
Batch Requests
The PI Web API batch endpoint combines multiple API calls into a single HTTP request. This guide covers parallel and sequential execution, the RequestTemplate pattern for efficient multi-point operations, error handling for partial failures, performance tuning, and real-world bulk read/write patterns.
Why use batch requests?
Every HTTP request to PI Web API has overhead: TCP handshake, TLS negotiation, authentication, and network latency. When you need to read 200 PI points, making 200 individual requests takes 20-60 seconds over a WAN. The /batch endpoint sends all 200 sub-requests in a single HTTP call, and PI Web API executes them server-side.
| Approach | 200 points | Network calls | Auth negotiations |
|---|---|---|---|
| Individual requests | 20-60 seconds | 200 | 200 (or 1 with session reuse) |
| Batch request | 0.5-3 seconds | 1 | 1 |
Parallel vs sequential execution
PI Web API supports two execution modes within a batch. Understanding the difference is critical for correctness.
| Mode | How to use | When to use |
|---|---|---|
| Parallel (default) | Omit ParentIds. All sub-requests run concurrently. | Independent reads or writes where no sub-request depends on another. |
| Sequential (chained) | Add ParentIds to create dependencies between sub-requests. | Workflows where one result feeds into the next (e.g., look up element, then read its attributes). |
Parallel is always faster
Sub-requests without ParentIds run in parallel on the server. Only add ParentIds when a sub-request genuinely depends on the output of another. Unnecessary chaining serializes execution and slows the entire batch.
Basic batch structure
A batch request is a POST to /batch with a JSON body. Each key is a unique name you assign, and each value describes one sub-request with Method, Resource, and optionally Content (for POST/PUT), Headers, and Parameters.
# Each sub-request specifies Method, Resource, and optionally Content
batch_request = {
"GetTemp": {
"Method": "GET",
"Resource": f"{BASE_URL}/streams/{TEMP_WEB_ID}/value"
+ "?selectedFields=Timestamp;Value;Good",
},
"GetPressure": {
"Method": "GET",
"Resource": f"{BASE_URL}/streams/{PRESSURE_WEB_ID}/value"
+ "?selectedFields=Timestamp;Value;Good",
},
"GetFlow": {
"Method": "GET",
"Resource": f"{BASE_URL}/streams/{FLOW_WEB_ID}/value"
+ "?selectedFields=Timestamp;Value;Good",
},
}
response = session.post(f"{BASE_URL}/batch", json=batch_request)
results = response.json()
# Each result has its own Status, Headers, and Content
for name, result in results.items():
if result["Status"] == 200:
content = result["Content"]
print(f"{name}: {content['Value']} at {content['Timestamp']}")
else:
print(f"{name}: Error {result['Status']}")
if "Content" in result:
print(f" Details: {result['Content']}")

Expected response structure:
{
"GetTemp": {
"Status": 200,
"Headers": { "Content-Type": "application/json" },
"Content": {
"Timestamp": "2025-03-15T14:30:00Z",
"Value": 72.5,
"Good": true
}
},
"GetPressure": {
"Status": 200,
"Headers": { "Content-Type": "application/json" },
"Content": {
"Timestamp": "2025-03-15T14:30:00Z",
"Value": 14.7,
"Good": true
}
}
}

Bulk tag reads: current values
The most common batch pattern: reading the current snapshot of many PI points at once.
def read_current_values_batch(
session, base_url: str, web_ids: list[str]
) -> dict[str, dict]:
"""Read current values for multiple PI points in one HTTP call.
Returns a dict keyed by WebID with value, timestamp, and quality.
"""
batch = {}
for web_id in web_ids:
batch[web_id] = {
"Method": "GET",
"Resource": (
f"{base_url}/streams/{web_id}/value"
"?selectedFields=Timestamp;Value;Good"
),
}
response = session.post(f"{base_url}/batch", json=batch)
response.raise_for_status()
results = {}
for web_id, result in response.json().items():
if result["Status"] == 200:
c = result["Content"]
results[web_id] = {
"value": c["Value"],
"timestamp": c["Timestamp"],
"good": c.get("Good", True),
}
else:
results[web_id] = {
"error": result["Status"],
"message": result.get("Content", {}).get("Message", ""),
}
return results
# Usage — read 150 points in one call
web_ids = ["F1DPaH...", "F1DPbX...", "F1DPcZ..."] # up to hundreds
values = read_current_values_batch(session, BASE_URL, web_ids)
for wid, val in values.items():
if "error" in val:
print(f"{wid}: ERROR {val['error']} - {val['message']}")
else:
print(f"{wid}: {val['value']} (good={val['good']})")

Bulk tag reads: recorded history
Batch historical queries for multiple points. Each sub-request can have its own time range and parameters.
from datetime import datetime, timedelta, timezone
now = datetime.now(timezone.utc)
# Use a "Z" suffix: isoformat() yields "+00:00", and "+" decodes to a
# space inside a URL query string
start = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
end = now.strftime("%Y-%m-%dT%H:%M:%SZ")
batch = {}
for web_id in web_ids:
batch[web_id] = {
"Method": "GET",
"Resource": (
f"{BASE_URL}/streams/{web_id}/recorded"
f"?startTime={start}&endTime={end}"
f"&maxCount=10000"
f"&selectedFields=Items.Timestamp;Items.Value;Items.Good"
),
}
response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()
for web_id, result in results.items():
if result["Status"] == 200:
items = result["Content"]["Items"]
print(f"{web_id}: {len(items)} values")
# Check for truncation
if len(items) == 10000:
print(f" WARNING: results may be truncated at maxCount")
else:
print(f"{web_id}: Error {result['Status']}")

Watch for maxCount truncation
If a sub-request returns exactly maxCount values, the data is likely truncated. Either increase maxCount, narrow the time range, or paginate each sub-request individually. Batch does not paginate individual sub-requests for you.
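When a sub-request does come back truncated, one option is to follow up outside the batch with a per-point pager. This is a hedged sketch: fetch_all_recorded is a hypothetical helper, and it assumes the standard /streams/{webId}/recorded response shape with an Items array and an inclusive startTime.

```python
def fetch_all_recorded(session, base_url, web_id, start, end,
                       max_count=10000):
    """Page through recorded data for one point until a page comes
    back smaller than max_count (i.e., no longer truncated)."""
    items = []
    page_start = start
    while True:
        resp = session.get(
            f"{base_url}/streams/{web_id}/recorded"
            f"?startTime={page_start}&endTime={end}&maxCount={max_count}"
        )
        raw = resp.json().get("Items", [])
        page = raw
        # startTime is inclusive, so the first sample of a follow-up
        # page duplicates the last sample of the previous one
        if items and page and page[0]["Timestamp"] == items[-1]["Timestamp"]:
            page = page[1:]
        items.extend(page)
        if len(raw) < max_count:
            return items
        page_start = items[-1]["Timestamp"]
```

Note the truncation test runs on the raw page length, before the duplicate boundary sample is dropped; otherwise a full page would look one short and end pagination early.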
Multi-point writes
Write current values to multiple PI points within one HTTP call. Note that batch writes are not atomic: some sub-requests can succeed while others fail, so check every result. The Content field must be a JSON object, not a string.
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
points_to_write = [
{"web_id": "F1DPaH...", "value": 72.5, "name": "temperature"},
{"web_id": "F1DPbX...", "value": 14.7, "name": "pressure"},
{"web_id": "F1DPcZ...", "value": 150.0, "name": "flow_rate"},
]
batch = {}
for point in points_to_write:
batch[point["name"]] = {
"Method": "POST",
"Resource": f"{BASE_URL}/streams/{point['web_id']}/value",
# Content MUST be a JSON object, not a string
"Content": {
"Value": point["value"],
"Timestamp": now,
},
}
response = session.post(f"{BASE_URL}/batch", json=batch)
# Check each sub-request individually
succeeded = 0
failed = 0
for name, result in response.json().items():
status = result["Status"]
if status in (200, 202, 204):
succeeded += 1
else:
failed += 1
msg = result.get("Content", {})
if isinstance(msg, dict):
msg = msg.get("Message", str(msg))
print(f"FAILED {name}: HTTP {status} - {msg}")
print(f"Writes: {succeeded} succeeded, {failed} failed")

Content must be a JSON object
A common mistake is setting Content to a JSON string (e.g., using json.dumps() or an f-string). PI Web API expects the Content field to be an actual JSON object in the batch body, not a serialized string. If you pass a string, you will get 400 Bad Request on that sub-request.
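To make the difference concrete, here is a minimal sketch contrasting the two forms (the server URL and WebID are placeholders):

```python
import json

value_payload = {"Value": 72.5, "Timestamp": "2025-03-15T14:30:00Z"}

# WRONG: Content is a serialized string -- this sub-request will be
# rejected with 400 Bad Request
bad_sub_request = {
    "Method": "POST",
    "Resource": "https://server/piwebapi/streams/F1DPaH.../value",
    "Content": json.dumps(value_payload),  # a str, not an object
}

# RIGHT: Content is the JSON object itself; it is embedded directly
# in the batch request body
good_sub_request = {
    "Method": "POST",
    "Resource": "https://server/piwebapi/streams/F1DPaH.../value",
    "Content": value_payload,
}
```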
Bulk historical writes
Write multiple historical values to multiple points in one batch call. Each sub-request writes an array of timestamped values to one point.
from datetime import datetime, timedelta, timezone
# Generate sample historical data for two points
base_time = datetime(2025, 3, 15, 0, 0, 0, tzinfo=timezone.utc)
batch = {
"WriteTemp": {
"Method": "POST",
"Resource": f"{BASE_URL}/streams/{TEMP_WEB_ID}/recorded",
"Content": [
{
"Timestamp": (base_time + timedelta(minutes=i * 5)).isoformat(),
"Value": 70.0 + (i * 0.1),
}
for i in range(288) # 24 hours at 5-minute intervals
],
},
"WritePressure": {
"Method": "POST",
"Resource": f"{BASE_URL}/streams/{PRESSURE_WEB_ID}/recorded",
"Content": [
{
"Timestamp": (base_time + timedelta(minutes=i * 5)).isoformat(),
"Value": 14.5 + (i * 0.005),
}
for i in range(288)
],
},
}
response = session.post(f"{BASE_URL}/batch", json=batch)
for name, result in response.json().items():
if result["Status"] in (200, 202, 204):
print(f"{name}: OK")
else:
# Check for per-item errors in the response
content = result.get("Content", {})
if isinstance(content, dict) and "Errors" in content:
print(f"{name}: {len(content['Errors'])} item errors")
for err in content["Errors"][:3]:
print(f" {err}")
else:
print(f"{name}: Error {result['Status']} - {content}")

Dependent requests with ParentIds
Some batch workflows require chaining: first look up a resource, then use its WebID in a follow-up request. Use ParentIds and Parameters with JSONPath expressions to create dependencies.
Simple chain: look up point, then read value
batch = {
"FindPoint": {
"Method": "GET",
"Resource": (
f"{BASE_URL}/points"
f"?path=\\\\YOUR-SERVER\\sinusoid"
),
},
"ReadValue": {
"Method": "GET",
"Resource": "{0}?selectedFields=Timestamp;Value;Good",
"ParentIds": ["FindPoint"],
"Parameters": ["$.FindPoint.Content.Links.Value"],
},
}
response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()
if results["ReadValue"]["Status"] == 200:
content = results["ReadValue"]["Content"]
print(f"Value: {content['Value']} at {content['Timestamp']}")
else:
print(f"Error: {results['ReadValue']['Status']}")

Realistic chain: element, then attributes, then values
# Three-step chain: find AF element -> get its attributes -> read values
batch = {
# Step 1: Look up an AF element by path
"FindElement": {
"Method": "GET",
"Resource": (
f"{BASE_URL}/elements"
f"?path=\\\\YOUR-AF-SERVER\\Database\\Plant1\\Reactor01"
),
},
# Step 2: Get the element's attributes (depends on Step 1)
"GetAttributes": {
"Method": "GET",
"Resource": "{0}?selectedFields=Items.Name;Items.WebId;Items.Links",
"ParentIds": ["FindElement"],
"Parameters": ["$.FindElement.Content.Links.Attributes"],
},
}
response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()
# Parse the attribute list
if results["GetAttributes"]["Status"] == 200:
attributes = results["GetAttributes"]["Content"]["Items"]
for attr in attributes:
print(f" {attr['Name']}: {attr['WebId']}")
# Now batch-read all attribute values in a second batch call
value_batch = {}
for attr in attributes:
value_batch[attr["Name"]] = {
"Method": "GET",
"Resource": (
f"{BASE_URL}/streams/{attr['WebId']}/value"
"?selectedFields=Timestamp;Value;Good"
),
}
value_response = session.post(f"{BASE_URL}/batch", json=value_batch)
for attr_name, result in value_response.json().items():
if result["Status"] == 200:
print(f" {attr_name} = {result['Content']['Value']}")
else:
print(f"Error: {results['GetAttributes']['Status']}")

How ParentIds and Parameters work
When a sub-request has ParentIds, it waits for those parents to complete first. The Parameters array uses JSONPath expressions (prefixed with $.) to extract values from parent responses. These are injected into the child request's Resource URL using positional placeholders: {0}, {1}, etc. If a parent fails, the dependent sub-request is skipped and returns a 409 Conflict status.
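As a minimal illustration of the placeholder mechanics (the names here are hypothetical, not a complete batch), a child sub-request with two parents might look like this:

```python
# Hypothetical child sub-request: each JSONPath result fills the
# positional placeholder of the same index in Resource
child = {
    "Method": "GET",
    "Resource": "{0}/recorded?startTime={1}",
    "ParentIds": ["FindPoint", "GetConfig"],
    "Parameters": [
        "$.FindPoint.Content.Links.Self",  # fills {0}
        "$.GetConfig.Content.StartTime",   # fills {1}
    ],
}
```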
RequestTemplate for efficient multi-point reads
When you need to make the same type of request for many resources (e.g., reading the current value of every point a search returns), the RequestTemplate pattern is more compact than writing hundreds of separate sub-request objects: a parent sub-request produces a list, and a wildcard JSONPath in Parameters expands the template once per item.
# RequestTemplate: define the request pattern once and let the server
# expand it for every value matched by the JSONPath in Parameters.
# Step 1 finds points; step 2 reads each matched point's current value.
# DATA_SERVER_WEB_ID is a placeholder for your Data Archive's WebID.
batch = {
    "FindPoints": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/dataservers/{DATA_SERVER_WEB_ID}/points"
            "?nameFilter=SINUSOID*&selectedFields=Items.WebId"
        ),
    },
    "ReadValues": {
        "Method": "GET",
        # The template Resource is expanded once per value extracted
        # by the wildcard JSONPath below; {0} is the positional slot
        "RequestTemplate": {
            "Resource": (
                f"{BASE_URL}/streams/{{0}}/value"
                "?selectedFields=Timestamp;Value;Good"
            ),
        },
        "ParentIds": ["FindPoints"],
        "Parameters": ["$.FindPoints.Content.Items[*].WebId"],
    },
}
response = session.post(f"{BASE_URL}/batch", json=batch)
# The template sub-request's Content holds one sub-response per
# expansion, each with its own Status and Content
items = response.json()["ReadValues"]["Content"]["Items"]
print(f"Read {len(items)} point values in one call")
# For very large lists, you can also use the streamsets endpoint
# which is purpose-built for multi-point reads:
#
# GET /streamsets/{webId}/value (for AF element children)
# POST /streamsets/value?selectedFields=Items.Name;Items.Items
# with body: [webId1, webId2, ...]
#
# The streamsets endpoint is often faster than batch for pure
# multi-point reads because it is a single atomic operation.

Consider streamsets for pure multi-point reads
If you are reading the same type of value (current, recorded, interpolated) for many points, the /streamsets endpoints are often faster than batch. The POST /streamsets/value endpoint accepts an array of WebIDs and returns all values in one call, without the overhead of named sub-requests. Use batch when you need mixed operations (reads + writes + lookups) in one call.
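As a sketch of the ad-hoc form, assuming a GET variant that accepts repeated webId query parameters and returns an Items array (verify which forms your PI Web API version exposes):

```python
def read_streamset_values(session, base_url, web_ids):
    """Read current values for several points via /streamsets/value.
    Assumes repeated webId parameters and an Items response array."""
    query = "&".join(f"webId={w}" for w in web_ids)
    resp = session.get(
        f"{base_url}/streamsets/value?{query}"
        "&selectedFields=Items.WebId;Items.Value.Timestamp;Items.Value.Value"
    )
    return {
        item["WebId"]: item["Value"]["Value"]
        for item in resp.json().get("Items", [])
    }
```

The return value maps WebID to current value, which drops timestamps; widen the dict if you need them.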
Batch size limits and server configuration
PI Web API limits batch requests at several levels. These defaults are configurable by the PI administrator in the PI Web API Admin utility.
| Setting | Location | Default | Notes |
|---|---|---|---|
| BatchLimit | System Configuration | Varies (often 500) | Maximum sub-requests per batch call |
| Request body size | IIS / PI Web API config | 4 MB - 28 MB | Total JSON payload size limit |
| Request timeout | IIS configuration | 110 seconds | Large batches may exceed this |
| MaxItemsPerCall | System Configuration | Varies | Limits items returned per individual sub-request |
Finding your server's limits
If you exceed the batch limit, PI Web API returns 413 Request Entity Too Large or 400 Bad Request with a message indicating the limit. Ask your PI administrator to check PI Web API Admin > System Configuration for the BatchLimit setting. On most production servers, the default is sufficient for 200-500 sub-requests per batch.
Chunked batch for large operations
When you need to process more items than the server allows in a single batch, split into chunks. Optimal chunk size depends on your server and the complexity of each sub-request.
import time
import logging
logger = logging.getLogger(__name__)
def chunked_batch(
session,
base_url: str,
batch_items: dict,
chunk_size: int = 200,
delay_between_chunks: float = 0.1,
) -> dict:
"""Execute a large batch in chunks, respecting server limits.
Args:
batch_items: Full batch dict (may exceed server limit)
chunk_size: Sub-requests per chunk (tune based on server config)
delay_between_chunks: Seconds to wait between chunks (avoids
overwhelming the server)
Returns:
Combined results from all chunks
"""
keys = list(batch_items.keys())
all_results = {}
total_chunks = (len(keys) + chunk_size - 1) // chunk_size
for chunk_num, i in enumerate(range(0, len(keys), chunk_size)):
chunk_keys = keys[i : i + chunk_size]
chunk = {k: batch_items[k] for k in chunk_keys}
logger.info(
f"Batch chunk {chunk_num + 1}/{total_chunks}: "
f"{len(chunk)} sub-requests"
)
response = session.post(f"{base_url}/batch", json=chunk)
response.raise_for_status()
all_results.update(response.json())
# Small delay to avoid overwhelming the server
if i + chunk_size < len(keys) and delay_between_chunks > 0:
time.sleep(delay_between_chunks)
return all_results
# Example: read current values for 2000 points
big_batch = {}
for i, web_id in enumerate(large_web_id_list):
big_batch[f"read_{i}"] = {
"Method": "GET",
"Resource": (
f"{BASE_URL}/streams/{web_id}/value"
"?selectedFields=Timestamp;Value;Good"
),
}
# Process in chunks of 200 (safe default for most servers)
results = chunked_batch(session, BASE_URL, big_batch, chunk_size=200)
n_chunks = (len(big_batch) + 199) // 200  # ceiling division
print(f"Read {len(results)} points across {n_chunks} chunks")

Tuning chunk size
Start with a chunk size of 200. If your server is fast and returns quickly, increase to 400-500. If you see timeouts or 503 errors, decrease to 100. For sub-requests that return large payloads (historical data), use a smaller chunk size (50-100) because the response body can be very large. For simple current-value reads, you can often go up to 500 per chunk.
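The guidance above can be folded into a small heuristic that adjusts chunk size between chunks. This is a sketch with illustrative thresholds, not a tuned policy:

```python
def adaptive_chunk_size(current: int, status: int, elapsed_s: float,
                        floor: int = 50, ceiling: int = 500) -> int:
    """Shrink the chunk on errors or slow responses; grow cautiously
    when the server answers quickly. Thresholds are illustrative."""
    if status in (413, 503) or elapsed_s > 30:
        return max(floor, current // 2)
    if status in (200, 207) and elapsed_s < 2:
        return min(ceiling, current * 2)
    return current
```

Call it after each chunk with the top-level status and elapsed time, feeding the result into the next chunk's size.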
Error handling for partial batch failures
A batch response can be a mix of successes and failures. The top-level HTTP status is typically 207 Multi-Status when there are mixed results, but you should always check each sub-request individually.
from dataclasses import dataclass
@dataclass
class BatchResult:
"""Parsed result of a batch sub-request."""
name: str
status: int
content: dict | list | None
success: bool
@property
def error_message(self) -> str:
if self.success:
return ""
if isinstance(self.content, dict):
return self.content.get("Message", str(self.content))
return str(self.content) if self.content else f"HTTP {self.status}"
def execute_batch(session, base_url: str, batch: dict) -> list[BatchResult]:
"""Execute a batch request and return parsed results.
Raises on transport-level errors (no response at all).
Does NOT raise on individual sub-request failures.
"""
response = session.post(f"{base_url}/batch", json=batch)
# If the batch endpoint itself fails (not individual sub-requests)
if response.status_code not in (200, 207, 202):
response.raise_for_status()
results = []
for name, result in response.json().items():
status = result["Status"]
results.append(BatchResult(
name=name,
status=status,
content=result.get("Content"),
success=status in (200, 202, 204),
))
return results
# Usage
results = execute_batch(session, BASE_URL, my_batch)
succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]
print(f"Succeeded: {len(succeeded)}")
print(f"Failed: {len(failed)}")
for r in failed:
print(f" {r.name}: HTTP {r.status} - {r.error_message}")
# Common failure patterns:
# 404 - Point or element does not exist (wrong WebID or deleted)
# 403 - No permission on this specific point
# 409 - Dependent request failed (parent had an error)
# 502 - PI Data Archive is unreachable from PI Web API server

A 200 batch response does not mean all sub-requests succeeded
PI Web API returns a success-class top-level status (200, or 207 Multi-Status) even when individual sub-requests have failed. You must iterate through every result and check each Status field. A common bug is to check only response.status_code and assume all is well.
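A small guard that encodes this check (the helper name is hypothetical):

```python
def failed_sub_requests(batch_results: dict) -> dict:
    """Map each failed sub-request name to its status code.
    Success is judged per sub-request, never by the top-level code."""
    return {
        name: result["Status"]
        for name, result in batch_results.items()
        if result["Status"] not in (200, 202, 204)
    }
```

Raising when this dict is non-empty turns silent partial failures into visible errors.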
Retry failed sub-requests
When some sub-requests in a batch fail (e.g., 503 from a temporarily overloaded server), you can retry just the failed ones without re-executing the successful ones.
import time
def batch_with_retry(
session, base_url: str, batch: dict,
max_retries: int = 3, retry_statuses: frozenset = frozenset({502, 503}),
) -> dict:
"""Execute batch with automatic retry for transient failures.
Only retries sub-requests that failed with retryable status codes.
Sub-requests that succeeded or failed permanently are not retried.
"""
remaining = dict(batch)
all_results = {}
for attempt in range(max_retries + 1):
if not remaining:
break
response = session.post(f"{base_url}/batch", json=remaining)
response.raise_for_status()
retry_batch = {}
for name, result in response.json().items():
status = result["Status"]
if status in retry_statuses and attempt < max_retries:
# This sub-request can be retried
retry_batch[name] = remaining[name]
else:
# Final result (success or permanent failure)
all_results[name] = result
remaining = retry_batch
if remaining:
wait = 2 ** attempt
print(f"Retrying {len(remaining)} sub-requests in {wait}s...")
time.sleep(wait)
return all_results

Real-world pattern: dashboard snapshot
A common production pattern: reading current values, 24-hour summaries, and alarm status for a set of points in a single batch call to populate a dashboard.
from datetime import datetime, timedelta, timezone
now = datetime.now(timezone.utc)
# Use a "Z" suffix: isoformat() yields "+00:00", and "+" decodes to a
# space inside a URL query string
yesterday = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
# Define the dashboard points
dashboard_points = {
"reactor_temp": REACTOR_TEMP_WEB_ID,
"reactor_pressure": REACTOR_PRESSURE_WEB_ID,
"feed_flow": FEED_FLOW_WEB_ID,
"product_flow": PRODUCT_FLOW_WEB_ID,
}
# Build a batch with BOTH current values AND 24h summaries
batch = {}
for name, web_id in dashboard_points.items():
# Current value
batch[f"{name}_current"] = {
"Method": "GET",
"Resource": (
f"{BASE_URL}/streams/{web_id}/value"
"?selectedFields=Timestamp;Value;Good"
),
}
# 24-hour summary (min, max, average)
batch[f"{name}_summary"] = {
"Method": "GET",
"Resource": (
f"{BASE_URL}/streams/{web_id}/summary"
f"?startTime={yesterday}&endTime={now.strftime('%Y-%m-%dT%H:%M:%SZ')}"
f"&summaryType=Minimum&summaryType=Maximum&summaryType=Average"
f"&selectedFields=Items.Type;Items.Value.Value"
),
}
# One HTTP call for 4 current values + 4 summaries = 8 sub-requests
response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()
# Parse into a dashboard-friendly structure
for name in dashboard_points:
current = results.get(f"{name}_current", {})
summary = results.get(f"{name}_summary", {})
if current.get("Status") == 200:
val = current["Content"]["Value"]
print(f"{name}: current = {val}")
if summary.get("Status") == 200:
for item in summary["Content"].get("Items", []):
stype = item["Type"]
sval = item["Value"]["Value"]
print(f" {stype}: {sval:.2f}")

Performance guidelines
| Guideline | Details |
|---|---|
| Use selectedFields in every sub-request | Reduces response size by 50-80%. Especially important in batch because savings multiply across all sub-requests. |
| Keep chunks under 200 for historical data | Sub-requests with large responses (recorded values) consume more server memory. Smaller chunks prevent timeouts. |
| Use /streamsets for same-type reads | If all sub-requests read the same endpoint type (e.g., current value), the streamsets endpoint is faster than batch. |
| Avoid unnecessary ParentIds | Parallel execution is always faster. Only chain requests when genuinely needed. |
| Cache WebIDs | Do not include WebID lookups in every batch. Look up once, cache, and reuse. |
| Monitor response time | If batch calls take more than 10 seconds, reduce chunk size or narrow time ranges for historical queries. |