How-to Guide

Pagination & Limits

PI Web API limits response sizes to protect server performance. Understanding maxCount behavior, time-based pagination, continuation tokens, and memory-efficient streaming patterns is essential for retrieving large datasets reliably without missing data.

How PI Web API limits responses

PI Web API applies limits at multiple levels. These are configurable by the PI administrator.

| Setting | Default | Location | What it affects |
| --- | --- | --- | --- |
| maxCount (query parameter) | 1,000 | Per-request | Maximum items returned by stream endpoints (/recorded, /interpolated, etc.) |
| MaxItemsPerCall | Varies (often 150,000) | PI Web API Admin > System Configuration | Server-wide upper bound on maxCount; your request cannot exceed this |
| BatchLimit | Varies (often 500) | PI Web API Admin > System Configuration | Maximum sub-requests per batch call |
| Search results | 10 (default count) | Per-request | Indexed search results per page (use the count parameter; maximum varies) |
| Request body size | 4 MB - 28 MB | IIS configuration | Maximum POST body size (affects batch and write payloads) |
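Rather than hard-coding assumptions about these limits, you can often read them from the server itself. This is a sketch that assumes your deployment exposes the /system/configuration endpoint; access to it may be restricted to administrators, and the exact configuration item names vary by PI Web API version.

```python
def get_server_limits(session, base_url: str) -> dict:
    """Fetch the server's configuration document.

    Note: this endpoint may be locked down on some deployments, and
    configuration key names differ between PI Web API versions.
    """
    resp = session.get(f"{base_url}/system/configuration")
    resp.raise_for_status()
    return resp.json()


# config = get_server_limits(session, BASE_URL)
# print(config)  # look for the MaxItemsPerCall / batch-limit entries
```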

maxCount behavior and defaults

The maxCount parameter controls how many values the server returns from stream endpoints. If the time range contains more values than maxCount, the response is silently truncated -- no error, no warning header.

max_count_behavior.py
# Default behavior: maxCount=1000 (if not specified)
response = session.get(
    f"{BASE_URL}/streams/{WEB_ID}/recorded",
    params={"startTime": "*-30d", "endTime": "*"},
)
items = response.json()["Items"]
# If the point has 50,000 values in 30 days, you only get 1,000!

# Explicitly set maxCount higher
response = session.get(
    f"{BASE_URL}/streams/{WEB_ID}/recorded",
    params={
        "startTime": "*-30d",
        "endTime": "*",
        "maxCount": 50000,
        "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
    },
)
items = response.json()["Items"]
print(f"Returned {len(items)} values")

# CRITICAL: detect truncation
if len(items) == 50000:
    print("WARNING: Results may be truncated. Paginate to get all data.")

Silent truncation is the number one data quality bug

PI Web API does not return a header or flag indicating truncation. If you request maxCount=1000 and get exactly 1,000 results, your data is almost certainly truncated. Always check len(items) == maxCount and paginate if true. Many production bugs trace back to silently truncated data that was never detected.
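A small guard wrapper (our sketch, not a PI Web API feature) turns possible truncation into a loud failure instead of a silent data gap:

```python
def get_recorded_checked(session, base_url: str, web_id: str, params: dict) -> list[dict]:
    """GET /streams/{webId}/recorded, raising if the result may be truncated.

    If the server returns exactly maxCount items, more data probably
    exists and the caller should paginate instead.
    """
    max_count = int(params.get("maxCount", 1000))  # server default when omitted
    resp = session.get(f"{base_url}/streams/{web_id}/recorded", params=params)
    resp.raise_for_status()
    items = resp.json().get("Items", [])
    if len(items) >= max_count:
        raise RuntimeError(
            f"Got {len(items)} items with maxCount={max_count}: "
            "results may be truncated; paginate to get all data."
        )
    return items
```

Use this for one-shot queries where you expect the data to fit in a single response; for anything larger, go straight to pagination.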

Estimating data volume before querying

Use the Count summary type to estimate how many recorded values exist in a time range before retrieving them.

estimate_volume.py
def estimate_value_count(session, base_url, web_id, start, end):
    """Estimate the number of recorded values in a time range.

    Uses the Count summary to get an approximate count without
    transferring the actual data.
    """
    resp = session.get(
        f"{base_url}/streams/{web_id}/summary",
        params={
            "startTime": start,
            "endTime": end,
            "summaryType": "Count",
            # EventWeighted makes Count return the number of events;
            # the default TimeWeighted basis would not.
            "calculationBasis": "EventWeighted",
            "selectedFields": "Items.Value.Value",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])
    if items:
        return int(items[0]["Value"]["Value"])
    return 0


# Check before querying
count = estimate_value_count(session, BASE_URL, WEB_ID, "*-30d", "*")
print(f"Estimated values in 30 days: {count}")

if count > 100000:
    print("Large dataset — use paginated retrieval")
    # Use time-based pagination (see below)
elif count > 0:
    print(f"Setting maxCount={count + 1000} to ensure no truncation")
    # Safe to retrieve in one call

Time-based pagination for recorded values

The most reliable way to retrieve all recorded values from a large time range is to paginate by time, using the timestamp of the last returned value as the start of the next page.

time_pagination.py
from datetime import datetime, timedelta, timezone


def get_all_recorded_values(
    session, base_url: str, web_id: str,
    start: str, end: str,
    max_count: int = 10000,
    selected_fields: str = "Items.Timestamp;Items.Value;Items.Good",
) -> list[dict]:
    """Retrieve all recorded values using time-based pagination.

    Handles the duplicate-timestamp edge case by using the timestamp
    of the last value as the start of the next page, with boundary
    type set to exclude the boundary value.

    Args:
        start: Start time (PI time string or ISO 8601)
        end: End time
        max_count: Values per page (higher = fewer HTTP calls, more memory)

    Returns:
        Complete list of all recorded values in the time range
    """
    all_values = []
    current_start = start
    page = 0

    while True:
        page += 1
        response = session.get(
            f"{base_url}/streams/{web_id}/recorded",
            params={
                "startTime": current_start,
                "endTime": end,
                "maxCount": max_count,
                "selectedFields": selected_fields,
            },
        )
        response.raise_for_status()
        items = response.json().get("Items", [])

        if not items:
            break

        # On the first page, take all items.
        # On subsequent pages, skip the first item (it's the same as
        # the last item of the previous page due to inclusive start boundary).
        if page > 1 and items[0]["Timestamp"] == all_values[-1]["Timestamp"]:
            items = items[1:]

        if not items:
            break

        all_values.extend(items)

        # If fewer than maxCount, we have all the data
        if len(items) < max_count:
            break

        # Use the last timestamp as the start of the next page
        current_start = items[-1]["Timestamp"]

    return all_values


# Usage: get all values for the last 90 days
values = get_all_recorded_values(
    session, BASE_URL, WEB_ID,
    start="*-90d",
    end="*",
    max_count=50000,
)
print(f"Total values retrieved: {len(values)}")

The duplicate-timestamp edge case

When paginating by time, the last value of page N and the first value of page N+1 can have the same timestamp (PI can store multiple values at the exact same timestamp). The code above handles this by comparing timestamps at the page boundary. A simpler but less correct approach is to add 1 millisecond to the start time, but this risks missing values that genuinely share a timestamp.
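Note that skipping a single item still leaves a duplicate if more than one value at the boundary timestamp was already returned on the previous page. A fully robust variant (a sketch; the helper name is ours) counts how many trailing values of the previous page share the boundary timestamp and drops exactly that many from the start of the next page:

```python
def drop_boundary_duplicates(prev_page: list[dict], next_page: list[dict]) -> list[dict]:
    """Remove values from the start of next_page that were already
    returned at the end of prev_page (same boundary timestamp)."""
    if not prev_page or not next_page:
        return next_page
    boundary_ts = prev_page[-1]["Timestamp"]

    # How many trailing values of the previous page sit at the boundary?
    n_seen = 0
    for item in reversed(prev_page):
        if item["Timestamp"] != boundary_ts:
            break
        n_seen += 1

    # The next page (inclusive start) begins with ALL values at that
    # timestamp; skip only the ones we have already collected.
    n_skip = 0
    for item in next_page:
        if item["Timestamp"] != boundary_ts or n_skip >= n_seen:
            break
        n_skip += 1
    return next_page[n_skip:]
```

This assumes the server returns values at a shared timestamp in a stable order across pages, which holds for timestamp-ordered recorded data.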

Continuation tokens

Some PI Web API endpoints support continuation tokens for stateful pagination. When available, the response includes a Links.Next URL that contains a continuation token. This is more reliable than time-based pagination because the server tracks your position.

continuation_token.py
def get_all_with_continuation(session, initial_url: str) -> list[dict]:
    """Paginate using continuation tokens (Links.Next).

    Works with endpoints that support continuation:
    - /elements/{webId}/elements (child elements)
    - /dataservers/{webId}/points (server points)
    - /assetdatabases/{webId}/elements (database elements)
    - /search/query (indexed search results)

    The server includes a 'Links.Next' URL when more data is available.
    """
    all_items = []
    url = initial_url

    while url:
        response = session.get(url)
        response.raise_for_status()
        data = response.json()

        items = data.get("Items", [])
        all_items.extend(items)

        # Check for continuation link
        links = data.get("Links", {})
        url = links.get("Next")  # None if no more pages

        if url:
            print(f"  Page loaded: {len(items)} items (total: {len(all_items)})")

    return all_items


# Example: get all points on a data server (may be thousands)
server_web_id = "S1..."
all_points = get_all_with_continuation(
    session,
    f"{BASE_URL}/dataservers/{server_web_id}/points"
    f"?maxCount=500&selectedFields=Items.WebId;Items.Name;Items.PointType;Links",
)
print(f"Total points on server: {len(all_points)}")

# Example: get all child elements (full AF tree level)
all_elements = get_all_with_continuation(
    session,
    f"{BASE_URL}/elements/{ROOT_WEB_ID}/elements"
    f"?maxCount=200&selectedFields=Items.WebId;Items.Name;Items.HasChildren;Links",
)
print(f"Total child elements: {len(all_elements)}")

When continuation tokens are available

Continuation tokens are available on collection endpoints (elements, points, attributes, databases) but not on stream endpoints (/recorded, /interpolated, etc.). For stream data, use time-based pagination. You can detect whether an endpoint supports continuation by checking for a Links.Next field in the response.
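That check can be wrapped into a quick probe (a hypothetical helper; it requests a single item so that Links.Next appears whenever more data exists):

```python
def supports_continuation(session, url: str) -> bool:
    """Return True if the endpoint paginates via a Links.Next URL.

    Caveat: a collection holding 0 or 1 total items returns False
    even on endpoints that do support continuation, because there is
    no next page to link to.
    """
    resp = session.get(url, params={"maxCount": 1})
    resp.raise_for_status()
    return "Next" in resp.json().get("Links", {})
```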

Search pagination

The PI Web API search endpoint uses offset-based pagination with start and count parameters.

search_pagination.py
def search_all(
    session, base_url: str, query: str, page_size: int = 100
) -> list[dict]:
    """Search for PI resources with full pagination.

    Args:
        query: Search query (e.g., "name:*temperature*")
        page_size: Results per page (max depends on server config)

    Returns:
        All matching resources across all pages
    """
    all_results = []
    start = 0

    while True:
        response = session.get(
            f"{base_url}/search/query",
            params={
                "q": query,
                "count": page_size,
                "start": start,
                "selectedFields": (
                    "Items.Name;Items.WebId;Items.ItemType;"
                    "Items.Description;TotalHits"
                ),
            },
        )
        response.raise_for_status()
        data = response.json()
        items = data.get("Items", [])

        if not items:
            break

        all_results.extend(items)

        # Check if we've retrieved everything
        total_hits = data.get("TotalHits", 0)
        if total_hits > 0 and start + page_size >= total_hits:
            break

        # Also stop if we got fewer than requested
        if len(items) < page_size:
            break

        start += page_size
        print(f"  Search page: {len(all_results)}/{total_hits}")

    return all_results


# Usage
results = search_all(session, BASE_URL, "name:*temperature*")
print(f"Found {len(results)} items matching 'temperature'")

Memory-efficient streaming for large datasets

When retrieving millions of values, loading everything into a list can exhaust memory. Use a generator pattern to process data in chunks without holding it all in memory.

streaming_pattern.py
from typing import Generator
import csv
from pathlib import Path


def stream_recorded_values(
    session, base_url: str, web_id: str,
    start: str, end: str,
    page_size: int = 50000,
) -> Generator[list[dict], None, None]:
    """Yield pages of recorded values without loading all into memory.

    Each yield returns one page of values. The caller processes and
    discards each page before the next is loaded.

    Yields:
        Lists of value dicts, one list per page
    """
    current_start = start
    page = 0

    while True:
        page += 1
        response = session.get(
            f"{base_url}/streams/{web_id}/recorded",
            params={
                "startTime": current_start,
                "endTime": end,
                "maxCount": page_size,
                "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
            },
        )
        response.raise_for_status()
        items = response.json().get("Items", [])

        if not items:
            break

        # Handle the page-boundary duplicate: the inclusive start means
        # the first item can repeat the previous page's last value.
        if page > 1 and items[0]["Timestamp"] == current_start:
            items = items[1:]

        if not items:
            break

        yield items

        if len(items) < page_size:
            break

        current_start = items[-1]["Timestamp"]


def export_to_csv(
    session, base_url: str, web_id: str,
    start: str, end: str,
    output_file: str,
    page_size: int = 50000,
):
    """Stream PI data directly to CSV without loading all into memory.

    Memory usage stays constant regardless of dataset size.
    """
    output = Path(output_file)
    total_rows = 0

    with open(output, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Timestamp", "Value", "Good"])

        for page in stream_recorded_values(
            session, base_url, web_id, start, end, page_size
        ):
            for item in page:
                value = item["Value"]
                # Handle digital states
                if isinstance(value, dict):
                    value = value.get("Name", str(value))

                writer.writerow([
                    item["Timestamp"],
                    value,
                    item.get("Good", True),
                ])
                total_rows += 1

            print(f"  Written {total_rows} rows...")

    print(f"Export complete: {total_rows} rows to {output_file}")


# Usage: export 1 year of data (potentially millions of values)
export_to_csv(
    session, BASE_URL, WEB_ID,
    start="*-365d", end="*",
    output_file="reactor_temp_2024.csv",
    page_size=50000,
)
# Memory usage: ~constant at ~50K values * ~100 bytes = ~5 MB per page

Choosing page_size

For memory-efficient streaming, page_size=50000 is a good balance. Each page is roughly 5-10 MB in memory. Smaller pages (10,000) mean more HTTP calls but less memory. Larger pages (100,000+) are faster but may time out on slow servers. If exporting to a database, match the page size to your database's optimal batch insert size.
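The trade-off reduces to simple arithmetic. This sketch (plan_pages is our helper; 100 bytes per value is the same rule of thumb used in the comment above, not a measurement) estimates HTTP call count and per-page memory before an extraction:

```python
def plan_pages(
    estimated_count: int,
    page_size: int = 50000,
    bytes_per_value: int = 100,  # rough in-memory size of one value dict
) -> dict:
    """Estimate the cost of a paginated extraction before running it.

    Returns the number of HTTP calls needed and the approximate memory
    held per page, so page_size can be tuned to the environment.
    """
    pages = -(-estimated_count // page_size)  # ceiling division
    return {
        "pages": max(pages, 1),
        "page_memory_mb": page_size * bytes_per_value / 1_000_000,
    }


# For example, 3.2 million values at page_size=50000 means 64 calls
# holding roughly 5 MB per page.
```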

Rate limiting and throttling

PI Web API does not have formal rate-limiting headers like public APIs (no X-RateLimit-Remaining). However, several mechanisms can throttle or reject your requests.

| Throttle source | Symptom | Mitigation |
| --- | --- | --- |
| IIS connection limits | Connection refused or HTTP 503 | Limit concurrent connections to 10-20 per client; reuse sessions |
| PI Data Archive query limits | HTTP 502 or slow responses | Reduce maxCount, narrow time ranges, add delays between pages |
| PI Web API throttling config | HTTP 429 or 503 | Configured by the admin; back off and retry with exponential delay |
| Request timeout (IIS) | HTTP 504 or connection reset | Use smaller page sizes; complex queries (summaries over large ranges) are slower |
| Kerberos ticket expiry | HTTP 401 after hours of running | Refresh the Kerberos ticket in long-running ETL jobs; check klist |
rate_aware_client.py
import time
import logging


logger = logging.getLogger(__name__)


def paginate_with_backpressure(
    session, base_url: str, web_id: str,
    start: str, end: str,
    max_count: int = 10000,
    min_delay: float = 0.05,
    max_delay: float = 5.0,
) -> list[dict]:
    """Paginate with adaptive rate control.

    Starts with minimal delay and increases it when the server is slow.
    This prevents overwhelming the server during large extractions
    while staying fast when the server has capacity.
    """
    all_values = []
    current_start = start
    delay = min_delay
    page = 0

    while True:
        page += 1
        start_time = time.monotonic()

        response = session.get(
            f"{base_url}/streams/{web_id}/recorded",
            params={
                "startTime": current_start,
                "endTime": end,
                "maxCount": max_count,
                "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
            },
        )

        elapsed = time.monotonic() - start_time

        # Handle server errors with backoff
        if response.status_code in (502, 503, 504):
            delay = min(delay * 2, max_delay)
            logger.warning(
                f"Server returned {response.status_code}, "
                f"backing off to {delay:.1f}s"
            )
            time.sleep(delay)
            continue

        response.raise_for_status()
        items = response.json().get("Items", [])

        if not items:
            break

        if page > 1 and items[0]["Timestamp"] == all_values[-1]["Timestamp"]:
            items = items[1:]

        if not items:
            break

        all_values.extend(items)

        # Adaptive delay: if the server responded quickly, reduce delay
        if elapsed < 1.0:
            delay = max(delay * 0.8, min_delay)
        elif elapsed > 5.0:
            delay = min(delay * 1.5, max_delay)

        if len(items) < max_count:
            break

        current_start = items[-1]["Timestamp"]

        logger.info(
            f"Page {page}: {len(items)} values ({elapsed:.1f}s), "
            f"total: {len(all_values)}, next delay: {delay:.2f}s"
        )
        time.sleep(delay)

    return all_values

Best practices for high-volume retrieval

- Use batch requests to reduce connection count.
- Limit concurrent connections to 10-20 per server.
- Use selectedFields to minimize response size.
- Prefer interpolated values over recorded when you do not need every event.
- Add adaptive delays between paginated requests.
- Monitor PI Web API server CPU and memory if you control the infrastructure.
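The batch-request suggestion can be sketched as follows, assuming the standard batch controller format (named GET sub-requests in one POST body; the response mirrors the request keys). The helper name is ours:

```python
def batch_get_recorded(
    session, base_url: str, web_ids: list[str],
    start: str, end: str, max_count: int = 1000,
) -> dict[str, list[dict]]:
    """Fetch recorded values for several streams in one HTTP round trip.

    Builds one GET sub-request per WebId and POSTs them to the batch
    controller. Time strings are embedded directly; URL-encode them if
    they contain characters such as spaces.
    """
    body = {
        web_id: {
            "Method": "GET",
            "Resource": (
                f"{base_url}/streams/{web_id}/recorded"
                f"?startTime={start}&endTime={end}&maxCount={max_count}"
            ),
        }
        for web_id in web_ids
    }
    resp = session.post(f"{base_url}/batch", json=body)
    resp.raise_for_status()
    # Each sub-response carries its own Status and Content.
    return {
        web_id: sub["Content"].get("Items", [])
        for web_id, sub in resp.json().items()
        if sub.get("Status") == 200
    }
```

Keep the number of WebIds per call under the server's BatchLimit (see the configuration table above), and check each sub-response's Status rather than assuming the whole batch succeeded.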

Server configuration reference

If you are a PI administrator or can work with one, these settings control pagination behavior. They are configured in PI Web API Admin (the management utility installed alongside PI Web API).

| Setting | Path in Admin | Notes |
| --- | --- | --- |
| MaxItemsPerCall | System Configuration > API | Upper bound on maxCount for any single request; increase for large extraction jobs, but monitor server memory |
| BatchLimit | System Configuration > API | Maximum sub-requests per batch call; the default is typically sufficient |
| Request Timeout | IIS Manager > Sites > PI Web API | Default 110 seconds; increase if you see timeouts on large queries |
| Max Connections | IIS Manager > Application Pools | Maximum concurrent connections; the default is usually fine for small teams, increase for high-concurrency production workloads |
