
PI Web API Cookbook
Copy-paste Python recipes for every common PI Web API task. Each recipe is self-contained, production-tested, and includes error handling.
How to use this cookbook
Every recipe is designed to be copied directly into your project. Each one includes imports, configuration, the core logic, and error handling. Recipes are organized by use case: start with Session Setup, then pick the recipes you need.
All recipes assume you have created a session using Recipe 1. Variables like session, BASE_URL, and WEB_ID are shared across recipes.
Recipes by category
Foundation
1. Session Setup
Foundation — Reusable authenticated session with SSL handling, connection pooling, and retry logic.
2. Find a PI Point
Foundation — Look up a PI point by path or search and retrieve its WebID.
Reading data
3. Read Current Value
Reading — Get the latest snapshot value with quality checking and digital state handling.
4. Recorded Values to DataFrame
Reading — Pull recorded history into a pandas DataFrame with proper quality filtering.
5. Summary Statistics
Reading — Get min, max, average, and other summaries without pulling raw data.
Writing data
6. Write a Single Value
Writing — Write one timestamped value with buffer and update options.
7. Write Multiple Values
Writing — Write a batch of historical values with error checking.
Batch and bulk
8. Batch Read 100+ Points
Batch — Read current values for many points using chunked batch requests.
AF and hierarchy
9. AF Element Traversal
AF — Walk the AF hierarchy and read attribute values for elements.
10. Event Frame Search
AF — Search and create event frames for process events.
Production patterns
11. Incremental ETL with Watermarks
Production — Read only new data since the last run using watermark tracking.
12. Health Check
Production — Monitor PI Web API availability and data freshness.
Recipe 1: Session setup
Create a reusable session with authentication, SSL certificate handling, connection pooling, and automatic retries. Every other recipe depends on this.
import os
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_pi_session(
    base_url: str | None = None,
    username: str | None = None,
    password: str | None = None,
    ca_bundle: str | bool | None = None,
    pool_size: int = 10,
    max_retries: int = 3,
) -> requests.Session:
    """Create a configured session for PI Web API.

    Args:
        base_url: PI Web API URL (falls back to PI_WEB_API_URL env var).
            Resolved and stored on the session as ``session.base_url``.
        username: Auth username (falls back to PI_USERNAME env var)
        password: Auth password (falls back to PI_PASSWORD env var)
        ca_bundle: Path to CA cert, True for default, False to skip verify
        pool_size: Connection pool size (increase for high concurrency)
        max_retries: Number of retries for transient failures

    Returns:
        A ``requests.Session`` with auth, SSL verification, pooling,
        and retry behavior configured.
    """
    session = requests.Session()

    # Fix: the base_url parameter was previously accepted but silently
    # ignored. Resolve it and keep it on the session for convenience.
    session.base_url = base_url or os.environ.get("PI_WEB_API_URL", "")

    # Authentication — only attach credentials when a username is present,
    # leaving anonymous/Kerberos setups untouched.
    user = username or os.environ.get("PI_USERNAME", "")
    pwd = password or os.environ.get("PI_PASSWORD", "")
    if user:
        session.auth = (user, pwd)

    # SSL verification: explicit argument wins, then the PI_CA_BUNDLE
    # env var, then default certificate verification.
    if ca_bundle is not None:
        session.verify = ca_bundle
    else:
        env_bundle = os.environ.get("PI_CA_BUNDLE")
        session.verify = env_bundle if env_bundle else True

    # Connection pooling + retry logic for transient failures.
    retries = Retry(
        total=max_retries,
        backoff_factor=1,  # 1s, 2s, 4s between retries
        status_forcelist=[408, 429, 500, 502, 503, 504],
        allowed_methods=["GET", "HEAD"],  # Only retry safe (idempotent) methods
    )
    adapter = HTTPAdapter(
        max_retries=retries,
        pool_connections=pool_size,
        pool_maxsize=pool_size,
    )
    # Fix: mount on both schemes — previously only https:// received the
    # retry/pooling adapter, so plain-http dev servers silently lost it.
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    # Default headers for JSON round-trips.
    session.headers.update({
        "Accept": "application/json",
        "Content-Type": "application/json",
        "X-Requested-With": "PiSharp-Cookbook",
    })
    return session
# Usage
BASE_URL = os.environ.get("PI_WEB_API_URL", "https://myserver/piwebapi")
session = create_pi_session()
# Verify connectivity
resp = session.get(f"{BASE_URL}/")
if resp.ok:
info = resp.json()
print(f"Connected to {info['ProductTitle']} {info['ProductVersion']}")
else:
print(f"Connection failed: {resp.status_code}")

Recipe 2: Find a PI point
Look up a PI point by its full path (most reliable) or by search query. Returns the WebID needed for all data operations.
def find_point_by_path(session, base_url, server_name, point_name):
    """Look up a PI point by its full path (the most reliable method).

    Returns: (web_id, point_name) tuple
    """
    point_path = f"\\\\{server_name}\\{point_name}"
    query = {
        "path": point_path,
        "selectedFields": "WebId;Name;PointType;EngineeringUnits",
    }
    response = session.get(f"{base_url}/points", params=query)
    response.raise_for_status()
    body = response.json()
    return body["WebId"], body["Name"]
def find_points_by_filter(session, base_url, data_server_web_id,
                          name_filter, max_count=100):
    """Find PI points whose names match a wildcard pattern on one data server.

    Args:
        name_filter: Wildcard pattern like "Temperature*" or "Unit1_*"
    Returns: List of {WebId, Name, PointType} dicts
    """
    url = f"{base_url}/dataservers/{data_server_web_id}/points"
    response = session.get(url, params={
        "nameFilter": name_filter,
        "maxCount": max_count,
        "selectedFields": "Items.WebId;Items.Name;Items.PointType",
    })
    response.raise_for_status()
    return response.json().get("Items", [])
# Usage: path-based lookup (preferred)
web_id, name = find_point_by_path(session, BASE_URL, "MY-PI-SERVER", "sinusoid")
print(f"Found {name}: {web_id}")
# Usage: filter-based lookup (multiple points)
# First get the data server WebID
ds_resp = session.get(f"{BASE_URL}/dataservers",
params={"selectedFields": "Items.WebId;Items.Name"})
ds_web_id = ds_resp.json()["Items"][0]["WebId"]
points = find_points_by_filter(session, BASE_URL, ds_web_id, "Temperature*")
for p in points:
print(f" {p['Name']}: {p['WebId']}")

Recipe 3: Read current value
Get the most recent snapshot value for a PI point, with proper handling for digital states and bad quality.
def read_current_value(session, base_url, web_id):
    """Read the current snapshot value for a PI point.

    Handles digital states, bad quality, and returns a clean dict.
    """
    fields = "Timestamp;Value;Good;Questionable;Substituted"
    response = session.get(
        f"{base_url}/streams/{web_id}/value",
        params={"selectedFields": fields},
    )
    response.raise_for_status()
    payload = response.json()

    # Digital state values arrive as objects rather than plain numbers;
    # unwrap them to the state name when possible.
    raw = payload["Value"]
    digital = isinstance(raw, dict)
    unwrapped = raw.get("Name", raw.get("Value", str(raw))) if digital else raw

    return {
        "value": unwrapped,
        "timestamp": payload["Timestamp"],
        "good": payload.get("Good", True),
        "questionable": payload.get("Questionable", False),
        "substituted": payload.get("Substituted", False),
        "is_digital": digital,
    }
# Usage
result = read_current_value(session, BASE_URL, web_id)
if result["good"]:
print(f"Value: {result['value']} at {result['timestamp']}")
else:
print(f"Bad quality value: {result['value']} (questionable={result['questionable']})")

Recipe 4: Recorded values to DataFrame
Pull recorded historical values and load them into a pandas DataFrame with proper timestamps, quality filtering, and digital state handling.
import pandas as pd
from datetime import datetime, timedelta, timezone
def recorded_to_dataframe(
    session, base_url, web_id,
    start_time="*-24h", end_time="*",
    max_count=10000, boundary_type="Inside",
    filter_good_only=True,
):
    """Fetch recorded values and return as a pandas DataFrame.

    Args:
        boundary_type: "Inside" (default), "Outside", or "Interpolated"
        filter_good_only: If True, remove bad-quality values
    Returns: DataFrame with a UTC DatetimeIndex named "Timestamp" and a
        single "Value" column (same shape even when no data is returned).
    """
    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": max_count,
            "boundaryType": boundary_type,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])
    if not items:
        # Fix: return the empty frame with the documented Timestamp index
        # (previously a bare RangeIndex), so downstream code can rely on
        # a consistent shape.
        empty_index = pd.DatetimeIndex([], tz="UTC", name="Timestamp")
        return pd.DataFrame(columns=["Value"], index=empty_index)
    df = pd.DataFrame(items)
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
    df = df.set_index("Timestamp").sort_index()

    # Digital state values come back as dicts; unwrap to the state name.
    def extract_value(v):
        if isinstance(v, dict):
            return v.get("Name", v.get("Value"))
        return v

    df["Value"] = df["Value"].apply(extract_value)

    # Drop bad-quality rows when requested ("Good" may be absent if the
    # caller customizes selectedFields).
    if filter_good_only and "Good" in df.columns:
        df = df[df["Good"].astype(bool)]

    # Warn if data was truncated at maxCount.
    if len(items) == max_count:
        print(f"WARNING: Returned exactly {max_count} values. "
              f"Data may be truncated. Increase maxCount or narrow time range.")
    return df[["Value"]]
# Usage
df = recorded_to_dataframe(session, BASE_URL, web_id, start_time="*-48h")
print(f"Got {len(df)} values")
print(df.describe())

Recipe 5: Summary statistics
Get min, max, average, total, count, and other statistics without pulling all raw data. Much faster than reading recorded values and computing client-side.
def get_summaries(
    session, base_url, web_id,
    start_time="*-24h", end_time="*",
    summary_type="Average,Minimum,Maximum,Count,PercentGood",
    calculation_basis="TimeWeighted",
):
    """Get summary statistics for a PI point.

    Args:
        summary_type: Comma-separated list. Options: Average, Minimum,
            Maximum, Total, Count, StdDev, Range, PercentGood, All
        calculation_basis: "TimeWeighted" (default) or "EventWeighted"
            TimeWeighted: Weights values by duration (correct for flow/rate)
            EventWeighted: Equal weight per event (correct for batch counts)
    """
    response = session.get(
        f"{base_url}/streams/{web_id}/summary",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "summaryType": summary_type,
            "calculationBasis": calculation_basis,
        },
    )
    response.raise_for_status()
    # Reshape the Items list into {stat_name: {value, timestamp, good}}.
    return {
        item["Type"]: {
            "value": item["Value"].get("Value"),
            "timestamp": item["Value"].get("Timestamp"),
            "good": item["Value"].get("Good", True),
        }
        for item in response.json().get("Items", [])
    }
# Usage
stats = get_summaries(session, BASE_URL, web_id, start_time="*-7d")
for stat, data in stats.items():
print(f"{stat}: {data['value']}")
# Example output:
# Average: 72.34
# Minimum: 65.12
# Maximum: 81.90
# Count: 8640
# PercentGood: 99.87

Recipe 6: Write a single value
Write one value with a timestamp to a PI point, with control over update behavior and buffering.
from datetime import datetime, timezone
def write_value(
    session, base_url, web_id, value, timestamp=None,
    update_option="Replace", buffer_option="BufferIfPossible",
):
    """Write a single value to a PI point.

    Args:
        update_option: "Replace", "Insert", "NoReplace", "Remove",
            "InsertNoCompression"
        buffer_option: "BufferIfPossible", "Buffer", "DoNotBuffer"
            BufferIfPossible: Queue if Data Archive is temporarily unavailable
            DoNotBuffer: Fail immediately if Data Archive is down
    Returns: True if write succeeded
    """
    # Default to "now" in UTC when no timestamp is supplied.
    ts = timestamp if timestamp is not None else datetime.now(timezone.utc).isoformat()
    response = session.post(
        f"{base_url}/streams/{web_id}/value",
        json={"Value": value, "Timestamp": ts},
        params={
            "updateOption": update_option,
            "bufferOption": buffer_option,
        },
    )
    if response.status_code in (200, 202, 204):
        return True
    # Surface error details for troubleshooting before reporting failure.
    try:
        errors = response.json().get("Errors", [])
        print(f"Write failed ({response.status_code}): {errors}")
    except Exception:
        print(f"Write failed ({response.status_code}): {response.text[:200]}")
    return False
# Usage
success = write_value(session, BASE_URL, web_id, 72.5)
print(f"Write {'succeeded' if success else 'failed'}")

Write permissions required
Writing to PI points requires appropriate Data Access permissions on the PI Data Archive. If you get a 403 error, check the point security with your PI administrator.
Recipe 7: Write multiple values
Write a batch of historical values to a PI point with error checking for partial failures.
def write_values(session, base_url, web_id, values,
                 update_option="Replace"):
    """Write multiple values to a PI point.

    Args:
        values: List of {"Value": ..., "Timestamp": ...} dicts
    Returns: (success_count, error_count) tuple
    """
    response = session.post(
        f"{base_url}/streams/{web_id}/recorded",
        json=values,
        params={"updateOption": update_option},
    )
    status = response.status_code
    if status in (200, 202, 204):
        return len(values), 0
    if status == 207:  # Multi-status: some sub-writes failed
        failures = [e for e in response.json().get("Errors", []) if e]
        return len(values) - len(failures), len(failures)
    print(f"Write failed ({status}): {response.text[:300]}")
    return 0, len(values)
# Usage
values = [
{"Value": 70.1, "Timestamp": "2026-03-15T10:00:00Z"},
{"Value": 71.3, "Timestamp": "2026-03-15T10:05:00Z"},
{"Value": 72.0, "Timestamp": "2026-03-15T10:10:00Z"},
{"Value": 73.5, "Timestamp": "2026-03-15T10:15:00Z"},
]
ok, err = write_values(session, BASE_URL, web_id, values)
print(f"Written: {ok} succeeded, {err} failed")

Recipe 8: Batch read 100+ points
Read current values for a large number of PI points using batched requests with automatic chunking. One HTTP request per chunk instead of one per point.
def batch_read_current(session, base_url, web_ids, chunk_size=100):
    """Read current values for many points using chunked batch requests.

    Args:
        web_ids: List of (name, web_id) tuples or just web_id strings
        chunk_size: Max sub-requests per batch (100 is a safe default)
    Returns: Dict mapping name/index to {value, timestamp, good}
    """
    # Normalize plain WebID strings into (name, web_id) pairs.
    if web_ids and isinstance(web_ids[0], str):
        web_ids = [(f"point_{i}", wid) for i, wid in enumerate(web_ids)]

    results = {}
    for offset in range(0, len(web_ids), chunk_size):
        chunk = web_ids[offset : offset + chunk_size]
        # One batch sub-request per point in this chunk.
        payload = {
            name: {
                "Method": "GET",
                "Resource": (
                    f"{base_url}/streams/{wid}/value"
                    f"?selectedFields=Timestamp;Value;Good"
                ),
            }
            for name, wid in chunk
        }
        response = session.post(f"{base_url}/batch", json=payload)
        response.raise_for_status()
        for key, sub in response.json().items():
            if sub["Status"] != 200:
                results[key] = {"value": None, "error": sub["Status"]}
                continue
            content = sub["Content"]
            val = content["Value"]
            if isinstance(val, dict):  # digital state -> unwrap to name
                val = val.get("Name", val.get("Value"))
            results[key] = {
                "value": val,
                "timestamp": content["Timestamp"],
                "good": content.get("Good", True),
            }
    return results
# Usage
point_list = [
("Temperature", "F1DPaH..."),
("Pressure", "F1DPbX..."),
("Flow", "F1DPcY..."),
]
values = batch_read_current(session, BASE_URL, point_list)
for name, val in values.items():
if "error" not in val:
print(f"{name}: {val['value']}")
else:
print(f"{name}: ERROR {val['error']}")

Recipe 9: AF element traversal
Walk the AF element hierarchy and read attribute values. Useful for discovering equipment structure and reading configuration from the asset model.
def get_af_databases(session, base_url):
    """List all AF databases across every asset server."""
    response = session.get(f"{base_url}/assetservers")
    response.raise_for_status()
    databases = []
    for server in response.json().get("Items", []):
        db_resp = session.get(
            f"{base_url}/assetservers/{server['WebId']}/assetdatabases",
            params={"selectedFields": "Items.WebId;Items.Name"},
        )
        # Skip servers whose database listing fails rather than aborting.
        if db_resp.ok:
            databases.extend(db_resp.json().get("Items", []))
    return databases
def get_child_elements(session, base_url, parent_web_id, max_count=1000):
    """Get child elements of an AF element."""
    fields = "Items.WebId;Items.Name;Items.TemplateName;Items.HasChildren"
    response = session.get(
        f"{base_url}/elements/{parent_web_id}/elements",
        params={"maxCount": max_count, "selectedFields": fields},
    )
    response.raise_for_status()
    return response.json().get("Items", [])
def get_element_attributes(session, base_url, element_web_id):
    """Get all attributes and their current values for an element."""
    response = session.get(
        f"{base_url}/elements/{element_web_id}/attributes",
        params={"selectedFields": "Items.WebId;Items.Name;Items.Value;Items.Type"},
    )
    response.raise_for_status()
    return response.json().get("Items", [])
def walk_af_tree(session, base_url, element_web_id, depth=0, max_depth=3):
    """Recursively walk the AF hierarchy and print the tree."""
    for child in get_child_elements(session, base_url, element_web_id):
        template = child.get("TemplateName", "")
        label = f" [{template}]" if template else ""
        prefix = " " * depth
        print(f"{prefix}{child['Name']}{label}")
        # Recurse only while the child has children and the depth cap allows.
        if child.get("HasChildren") and depth < max_depth:
            walk_af_tree(session, base_url, child["WebId"],
                         depth + 1, max_depth)
# Usage
# List databases
dbs = get_af_databases(session, BASE_URL)
for db in dbs:
print(f"Database: {db['Name']}")
# Walk the tree from the database root
if dbs:
# Get root elements of first database
resp = session.get(
f"{BASE_URL}/assetdatabases/{dbs[0]['WebId']}/elements",
params={"selectedFields": "Items.WebId;Items.Name;Items.HasChildren"},
)
roots = resp.json().get("Items", [])
for root in roots:
print(f"\n{root['Name']}")
walk_af_tree(session, BASE_URL, root["WebId"])

Recipe 10: Event frame search and creation
Search for existing event frames and create new ones. Event frames represent time-bounded process events (batches, alarms, shifts).
def search_event_frames(session, base_url, database_web_id,
                        start_time="*-7d", end_time="*",
                        template_name=None, name_filter=None,
                        max_count=100):
    """Search for event frames in an AF database."""
    params = {
        "startTime": start_time,
        "endTime": end_time,
        "maxCount": max_count,
        "searchMode": "Overlapping",
        "selectedFields": (
            "Items.WebId;Items.Name;Items.TemplateName;"
            "Items.StartTime;Items.EndTime"
        ),
    }
    # Optional filters are only sent when provided.
    optional = {"templateName": template_name, "nameFilter": name_filter}
    params.update({k: v for k, v in optional.items() if v})
    response = session.get(
        f"{base_url}/assetdatabases/{database_web_id}/eventframes",
        params=params,
    )
    response.raise_for_status()
    return response.json().get("Items", [])
def create_event_frame(session, base_url, element_web_id,
                       name, start_time, end_time=None,
                       template_name=None, description=None):
    """Create a new event frame on an AF element.

    If end_time is None, creates an open (in-progress) event frame.
    """
    body = {"Name": name, "StartTime": start_time}
    # Optional fields are only included when provided.
    for key, value in (("EndTime", end_time),
                       ("TemplateName", template_name),
                       ("Description", description)):
        if value:
            body[key] = value
    response = session.post(
        f"{base_url}/elements/{element_web_id}/eventframes",
        json=body,
    )
    response.raise_for_status()
    # The Location header points at the newly created event frame.
    return response.headers.get("Location", "")
# Usage: search for recent event frames
frames = search_event_frames(session, BASE_URL, db_web_id,
template_name="Batch")
for ef in frames:
print(f"{ef['Name']}: {ef['StartTime']} -> {ef.get('EndTime', 'In progress')}")
# Usage: create a new event frame
location = create_event_frame(
session, BASE_URL, element_web_id,
name="Batch-2026-0315-001",
start_time="2026-03-15T08:00:00Z",
end_time="2026-03-15T16:00:00Z",
template_name="Batch",
description="Morning production batch",
)
print(f"Created event frame at: {location}")

Recipe 11: Incremental ETL with watermarks
A production-grade ETL pattern that reads new data since the last run using watermark tracking. Survives restarts and handles gaps gracefully.
import json
from pathlib import Path
from datetime import datetime, timezone
class WatermarkTracker:
    """Track extraction watermarks for incremental ETL.

    Stores the last successfully extracted timestamp per point
    so the next run picks up where the previous one left off.
    """

    def __init__(self, path: str = "watermarks.json"):
        """Load any previously persisted watermarks from *path*."""
        self.path = Path(path)
        # Mapping of point name -> last extracted timestamp string.
        self.marks: dict = {}
        if self.path.exists():
            self.marks = json.loads(self.path.read_text())

    def get(self, key: str, default: str = "*-24h") -> str:
        """Return the stored watermark for *key*, or *default* if none."""
        return self.marks.get(key, default)

    def set(self, key: str, timestamp: str):
        """Record a new watermark and persist the full map to disk.

        Fix: write to a temp file and atomically replace the target so a
        crash mid-write cannot leave a truncated/corrupt watermark file.
        """
        self.marks[key] = timestamp
        tmp = self.path.with_suffix(self.path.suffix + ".tmp")
        tmp.write_text(json.dumps(self.marks, indent=2))
        tmp.replace(self.path)
def incremental_extract(session, base_url, web_id, point_name,
                        tracker, max_count=10000):
    """Extract new recorded values since the last watermark.

    Fix: the recorded-values query includes the start boundary, so reusing
    the last extracted timestamp as the next start re-delivers that value
    on every run. When the stored watermark is a concrete ISO timestamp,
    rows at or before it are dropped to avoid duplicates. Relative PI
    times like "*-24h" (the first-run default) are passed through as-is.

    Returns: DataFrame of new values (empty if none)
    """
    import pandas as pd
    start = tracker.get(point_name)
    end = "*"
    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": start,
            "endTime": end,
            "maxCount": max_count,
            "boundaryType": "Outside",
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])
    if not items:
        return pd.DataFrame()
    df = pd.DataFrame(items)
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)

    # Deduplicate against the previous watermark when it parses as an
    # absolute timestamp (relative PI times raise ValueError; a naive
    # timestamp raises TypeError on comparison -> skip the filter).
    try:
        prev = datetime.fromisoformat(start.replace("Z", "+00:00"))
        df = df[df["Timestamp"] > prev]
    except (ValueError, TypeError):
        pass
    if df.empty:
        return pd.DataFrame()

    # Advance the watermark to the newest extracted timestamp.
    latest = df["Timestamp"].max().isoformat()
    tracker.set(point_name, latest)
    return df.set_index("Timestamp").sort_index()
# Usage
tracker = WatermarkTracker("my_pipeline_watermarks.json")
points = [
("Temperature", temp_web_id),
("Pressure", pressure_web_id),
]
for name, wid in points:
df = incremental_extract(session, BASE_URL, wid, name, tracker)
if not df.empty:
print(f"{name}: extracted {len(df)} new values")
# Save to your destination (database, CSV, data lake, etc.)
# df.to_csv(f"{name}_extract.csv", mode="a", header=False)
else:
print(f"{name}: no new data")

Production ETL
For production workloads, run this script via a task scheduler (cron, Windows Task Scheduler, Airflow) instead of a loop. Store data in a database rather than CSV. Add monitoring to detect when extraction falls behind.
Recipe 12: Health check
Monitor PI Web API availability and data freshness. Useful for operations dashboards and alerting.
from datetime import datetime, timezone, timedelta
def pi_health_check(session, base_url, watchlist_web_ids=None,
                    stale_threshold_minutes=30):
    """Check PI Web API health and data freshness.
    Args:
        watchlist_web_ids: Optional list of (name, web_id) to check freshness
        stale_threshold_minutes: Flag data older than this as stale
    Returns: dict with status, latency, and details
    """
    import time
    # Overall status starts optimistic and is only ever downgraded.
    result = {"status": "healthy", "checks": {}}
    # Check 1: API reachability
    start = time.perf_counter()
    try:
        resp = session.get(f"{base_url}/", timeout=10)
        latency = time.perf_counter() - start
        result["checks"]["api"] = {
            "status": "ok" if resp.ok else "error",
            "latency_ms": round(latency * 1000),
            "status_code": resp.status_code,
        }
        if not resp.ok:
            result["status"] = "degraded"
    except Exception as e:
        # Unreachable API is fatal — record the failure and skip the
        # remaining checks, which would also fail.
        result["checks"]["api"] = {"status": "error", "error": str(e)}
        result["status"] = "down"
        return result
    # Check 2: Data server connectivity
    resp = session.get(f"{base_url}/dataservers",
                       params={"selectedFields": "Items.Name;Items.IsConnected"})
    if resp.ok:
        servers = resp.json().get("Items", [])
        # Any server reporting IsConnected=False (or missing) counts as down.
        disconnected = [s["Name"] for s in servers if not s.get("IsConnected")]
        result["checks"]["data_servers"] = {
            "status": "ok" if not disconnected else "error",
            "total": len(servers),
            "disconnected": disconnected,
        }
        if disconnected:
            result["status"] = "degraded"
    # Check 3: Data freshness (optional watchlist)
    if watchlist_web_ids:
        stale = []
        # Anything last updated before this cutoff is flagged as stale.
        threshold = datetime.now(timezone.utc) - timedelta(
            minutes=stale_threshold_minutes
        )
        for name, wid in watchlist_web_ids:
            try:
                resp = session.get(
                    f"{base_url}/streams/{wid}/value",
                    params={"selectedFields": "Timestamp"},
                    timeout=5,
                )
                if resp.ok:
                    # PI returns "Z"-suffixed ISO timestamps; normalize for
                    # fromisoformat, which does not accept "Z" on older Pythons.
                    ts = datetime.fromisoformat(
                        resp.json()["Timestamp"].replace("Z", "+00:00")
                    )
                    if ts < threshold:
                        stale.append({"name": name, "last_update": ts.isoformat()})
            except Exception:
                # A failed read is treated as stale rather than crashing the check.
                stale.append({"name": name, "error": "read failed"})
        result["checks"]["data_freshness"] = {
            "status": "ok" if not stale else "warning",
            "stale_points": stale,
            "threshold_minutes": stale_threshold_minutes,
        }
        if stale:
            # Staleness only downgrades "healthy"; it never overwrites a
            # worse status ("degraded"/"down") set by earlier checks.
            result["status"] = "warning" if result["status"] == "healthy" else result["status"]
    return result
# Usage
health = pi_health_check(session, BASE_URL, watchlist_web_ids=[
("Reactor_Temp", reactor_temp_wid),
("Feed_Flow", feed_flow_wid),
])
print(f"Overall status: {health['status']}")
for check, details in health["checks"].items():
print(f" {check}: {details['status']}")

Get the complete cookbook
Download all recipes
Get the complete PI Web API Cookbook with all recipes above, plus additional patterns for connection pooling, multi-server setups, and channel subscriptions.
Get the full cookbook