Skip to main content
Developer workstation with Python code connecting to industrial equipment
Tutorial

Python Guide

A production-grade guide to using PI Web API with Python. Covers session management with connection pooling, certificate handling for self-signed and corporate CAs, async patterns with aiohttp, pandas integration with quality-aware loading, retry logic, and ETL patterns you can adapt for production pipelines.

Project setup

Start with a clean virtual environment and install the dependencies you need.

setup.sh (bash)
# Create a virtual environment
# (use Python 3.10+ — the guide's code relies on `X | Y` union type syntax)
python -m venv .venv

# Activate it
# Linux/Mac:
source .venv/bin/activate
# Windows:
.venv\Scripts\activate

# Core dependencies
pip install requests pandas

# Optional: for Kerberos or NTLM auth
pip install requests-kerberos requests-ntlm

# Optional: for async operations
pip install aiohttp aiohttp-retry

# Optional: Windows certificate store integration
pip install python-certifi-win32
requirements.txt (text)
requests>=2.31.0
pandas>=2.0.0
urllib3>=2.0.0

# Authentication (install the ones you need)
requests-kerberos>=0.14.0   # Kerberos auth
requests-ntlm>=1.2.0        # NTLM auth

# Async (optional)
aiohttp>=3.9.0
aiohttp-retry>=2.8.0

# Windows cert store (optional, Windows only)
python-certifi-win32>=1.6.1

Session management with connection pooling

A properly configured requests.Session is the foundation of all PI Web API work in Python. The session holds authentication state, TLS settings, and a connection pool that reuses TCP connections across requests.

pi_session.py (python)
import os
import requests
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from urllib3.util.retry import Retry


def create_pi_session(
    base_url: str,
    username: str | None = None,
    password: str | None = None,
    verify: str | bool = True,
    auth_method: str = "basic",
    pool_connections: int = 10,
    pool_maxsize: int = 20,
    max_retries: int = 3,
) -> requests.Session:
    """Create a production-grade session for PI Web API.

    Features:
    - Connection pooling (reuses TCP connections)
    - Automatic retries with exponential backoff
    - Configurable auth (basic, kerberos, ntlm)
    - Proper certificate handling

    Args:
        base_url: PI Web API base URL (e.g., https://server/piwebapi)
        username: Username in DOMAIN\\user format (Basic/NTLM)
        password: Password (Basic/NTLM)
        verify: Path to CA bundle PEM file, or False to skip verification
        auth_method: One of 'basic', 'kerberos', 'ntlm'
        pool_connections: Number of connection pools to cache
        pool_maxsize: Maximum connections per pool (set higher for
            concurrent batch requests)
        max_retries: Automatic retries for 502, 503, 504 responses

    Raises:
        ValueError: If auth_method is not a supported method.
        requests.HTTPError: If the connectivity check at the end fails.
    """
    # Fail fast on a bad auth_method — previously an unknown value fell
    # through silently and produced an unauthenticated session.
    if auth_method not in ("basic", "kerberos", "ntlm"):
        raise ValueError(
            f"auth_method must be 'basic', 'kerberos', or 'ntlm', "
            f"got {auth_method!r}"
        )

    session = requests.Session()
    session.verify = verify
    session.headers.update({
        "Accept": "application/json",
        "Content-Type": "application/json",
        "X-Requested-With": "XMLHttpRequest",  # Required for some PI Web API configs
    })

    # Configure connection pooling and retries
    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=1,  # 1s, 2s, 4s between retries
        status_forcelist=[502, 503, 504],
        # NOTE: POST is retried too, which suits PI Web API batch reads but
        # can duplicate non-idempotent writes if the server applied the
        # first attempt before the connection failed.
        allowed_methods=["GET", "POST"],
    )
    adapter = HTTPAdapter(
        pool_connections=pool_connections,
        pool_maxsize=pool_maxsize,
        max_retries=retry_strategy,
    )
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    # Configure authentication
    if auth_method == "basic":
        session.auth = HTTPBasicAuth(username, password)
    elif auth_method == "kerberos":
        from requests_kerberos import HTTPKerberosAuth, OPTIONAL
        session.auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
    else:  # ntlm (validated above)
        from requests_ntlm import HttpNtlmAuth
        session.auth = HttpNtlmAuth(username, password)

    # Verify the connection works before handing the session back
    resp = session.get(f"{base_url}/", timeout=10)
    resp.raise_for_status()
    product = resp.json()
    print(f"Connected to {product.get('ProductTitle', 'PI Web API')} "
          f"v{product.get('ProductVersion', 'unknown')}")

    return session


# Usage with environment variables (recommended)
# NOTE: the fallback when PI_CA_BUNDLE is unset is True (validate against
# the default trust store), never False — disabling TLS verification by
# default contradicts the guidance later in this guide.
session = create_pi_session(
    base_url=os.environ["PI_WEB_API_URL"],
    username=os.environ.get("PI_USERNAME"),
    password=os.environ.get("PI_PASSWORD"),
    verify=os.environ.get("PI_CA_BUNDLE", True),
    auth_method=os.environ.get("PI_AUTH_METHOD", "basic"),
)

Connection pool sizing

The default pool_maxsize=10 in urllib3 means at most 10 concurrent connections to PI Web API. If you are using batch requests (each of which travels over a single connection), the default is fine. If you are making parallel requests with threads or asyncio, increase pool_maxsize to match your concurrency level. Setting it too high wastes resources; setting it too low causes connections to be closed and reopened.

Certificate handling

PI Web API servers often use self-signed certificates or certificates signed by an internal corporate CA. Here are the common scenarios and how to handle each one properly.

Option 1: Extract and trust the server certificate

extract_cert.sh (bash)
# Extract the server's certificate using openssl
# NOTE(review): if the host serves multiple certificates via SNI, add
# `-servername your-pi-server` to the s_client commands — confirm.
openssl s_client -connect your-pi-server:443 -showcerts </dev/null 2>/dev/null \
  | openssl x509 -outform PEM > pi_server.pem

# If there's an intermediate CA, extract the full chain:
# (the sed keeps every PEM block s_client printed, not just the first)
openssl s_client -connect your-pi-server:443 -showcerts </dev/null 2>/dev/null \
  | sed -n '/-----BEGIN CERTIFICATE-----/,/-----END CERTIFICATE-----/p' \
  > pi_chain.pem

# Verify the certificate
openssl x509 -in pi_server.pem -text -noout | head -20
cert_usage.py (python)
# Point the session at your extracted certificate
session = create_pi_session(
    base_url="https://your-pi-server/piwebapi",
    username="DOMAIN\\user",
    password="password",
    verify="/path/to/pi_chain.pem",  # Path to PEM bundle
)

# Or use the REQUESTS_CA_BUNDLE environment variable
# (works globally for all requests in the process)
# NOTE(review): requests honors this only while Session.trust_env is left
# at its default of True — confirm if you disable trust_env.
import os
os.environ["REQUESTS_CA_BUNDLE"] = "/path/to/pi_chain.pem"

Option 2: Trust the system certificate store (Windows)

cert_windows.py (python)
# On Windows, pip install python-certifi-win32 bridges the gap between
# Python's certifi CA bundle and the Windows certificate store.
# After installation, requests automatically trusts certificates that
# Windows trusts (including internal CA certs pushed via Group Policy).
# NOTE(review): python-certifi-win32 is no longer actively maintained;
# pip-system-certs and truststore are common successors — confirm which
# fits your environment.

# pip install python-certifi-win32

# No code changes needed -- just install the package and use verify=True
session = create_pi_session(
    base_url="https://your-pi-server/piwebapi",
    verify=True,  # Will use Windows cert store via python-certifi-win32
)

Option 3: Trust system store (Linux)

cert_linux.sh (bash)
# Debian/Ubuntu: add corporate CA to system trust store
# (update-ca-certificates only picks up files with a .crt extension)
sudo cp corporate-ca.pem /usr/local/share/ca-certificates/corporate-ca.crt
sudo update-ca-certificates

# RHEL/CentOS: add corporate CA to system trust store
sudo cp corporate-ca.pem /etc/pki/ca-trust/source/anchors/corporate-ca.pem
sudo update-ca-trust

# Then in Python, point requests at the system bundle:
# export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt  # Debian
# export REQUESTS_CA_BUNDLE=/etc/pki/tls/certs/ca-bundle.crt     # RHEL

Never use verify=False in production

Setting verify=False disables all TLS certificate validation, making you vulnerable to man-in-the-middle attacks. It is acceptable for local development, but production code should always validate certificates. Use one of the three options above to properly trust your server's certificate.

Reusable helper functions

Build a small library of typed helper functions that you reuse across scripts. This avoids duplicating URL construction and error handling.

pi_helpers.py (python)
"""Reusable helpers for common PI Web API operations.

All functions accept a configured session and base URL.
All functions use selectedFields for performance.
All functions handle digital states gracefully.
"""

from __future__ import annotations
from datetime import datetime, timezone
from typing import Any


def find_point_by_path(
    session, base_url: str, path: str
) -> dict[str, Any]:
    """Resolve a PI point from its full path.

    Args:
        path: Full PI point path, e.g. '\\\\SERVER\\sinusoid'

    Returns:
        Point metadata dict (WebId, Name, PointType, PointClass, Descriptor).

    Raises:
        requests.HTTPError: If the point is missing (404) or the server errors.
    """
    query = {
        "path": path,
        # selectedFields trims the payload to just what callers need
        "selectedFields": "WebId;Name;PointType;PointClass;Descriptor",
    }
    response = session.get(f"{base_url}/points", params=query)
    response.raise_for_status()
    return response.json()


def get_current_value(
    session, base_url: str, web_id: str
) -> dict[str, Any]:
    """Fetch the current snapshot value of a PI point or AF attribute.

    Returns:
        Dict with Timestamp, Value, Good, and UnitsAbbreviation.
    """
    url = f"{base_url}/streams/{web_id}/value"
    fields = "Timestamp;Value;Good;UnitsAbbreviation"
    response = session.get(url, params={"selectedFields": fields})
    response.raise_for_status()
    return response.json()


def get_recorded_values(
    session, base_url: str, web_id: str,
    start: str = "*-1d", end: str = "*",
    max_count: int = 10000,
    filter_expression: str | None = None,
    boundary_type: str = "Inside",
) -> list[dict]:
    """Get recorded (compressed) values for a time range.

    Args:
        start: Start time (PI time string, e.g., "*-7d", ISO 8601)
        end: End time
        max_count: Maximum values to return (watch for truncation)
        filter_expression: Server-side filter, e.g., "'.' > 50"
        boundary_type: "Inside", "Outside", or "Interpolated"

    Returns:
        List of value dicts with Timestamp, Value, Good
    """
    params: dict[str, Any] = {
        "startTime": start,
        "endTime": end,
        "maxCount": max_count,
        "boundaryType": boundary_type,
        "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
    }
    if filter_expression:
        params["filterExpression"] = filter_expression

    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params=params,
    )
    resp.raise_for_status()
    return resp.json().get("Items", [])


def get_interpolated_values(
    session, base_url: str, web_id: str,
    start: str = "*-1d", end: str = "*",
    interval: str = "1h",
) -> list[dict]:
    """Fetch values interpolated at regular intervals.

    Args:
        interval: PI time interval string (e.g., "5m", "1h", "1d")

    Returns:
        List of value dicts at evenly-spaced timestamps.
    """
    query = {
        "startTime": start,
        "endTime": end,
        "interval": interval,
        "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
    }
    response = session.get(
        f"{base_url}/streams/{web_id}/interpolated", params=query
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("Items", [])


def get_summary(
    session, base_url: str, web_id: str,
    start: str = "*-1d", end: str = "*",
    summary_types: list[str] | None = None,
    calculation_basis: str = "TimeWeighted",
) -> dict[str, float]:
    """Get summary statistics for a time range.

    Args:
        summary_types: List of types (e.g., ["Average", "Minimum", "Maximum"])
        calculation_basis: "TimeWeighted" or "EventWeighted"

    Returns:
        Dict mapping summary type to value
    """
    if summary_types is None:
        summary_types = ["Average", "Minimum", "Maximum", "Count"]

    params: dict[str, Any] = {
        "startTime": start,
        "endTime": end,
        "calculationBasis": calculation_basis,
        "selectedFields": "Items.Type;Items.Value.Value;Items.Value.Good",
    }
    # Add each summary type as a separate parameter
    for st in summary_types:
        params.setdefault("summaryType", [])
        if isinstance(params["summaryType"], list):
            params["summaryType"].append(st)

    resp = session.get(
        f"{base_url}/streams/{web_id}/summary",
        params=params,
    )
    resp.raise_for_status()

    result = {}
    for item in resp.json().get("Items", []):
        stype = item["Type"]
        value = item.get("Value", {}).get("Value")
        result[stype] = value

    return result


def write_value(
    session, base_url: str, web_id: str,
    value: Any, timestamp: str = "*",
) -> int:
    """Write a single value to a PI point.

    Note: errors are reported via the returned status code, not raised —
    callers are expected to inspect the result.

    Returns:
        HTTP status code (202 = buffered, 204 = direct write).
    """
    payload = {"Value": value, "Timestamp": timestamp}
    response = session.post(f"{base_url}/streams/{web_id}/value", json=payload)
    return response.status_code


def extract_numeric_value(item: dict) -> float | None:
    """Extract a numeric value from a PI Web API value item.

    Handles digital states (returned as dicts) and bad quality.
    Returns None for non-numeric or bad-quality values.
    """
    if not item.get("Good", True):
        return None

    value = item["Value"]
    if isinstance(value, dict):
        # Digital state: {"Name": "Active", "Value": 1}
        return None  # or value.get("Value") for the integer code
    if isinstance(value, (int, float)):
        return float(value)
    return None

Async patterns with aiohttp

For high-throughput scenarios (reading thousands of points, real-time dashboards), async I/O lets you make many concurrent requests without threads. Use aiohttp for async PI Web API access.

pi_async.py (python)
"""Async PI Web API access using aiohttp.

Use this when you need high concurrency (many points, low latency).
For most batch operations, synchronous requests with the batch endpoint
are simpler and often fast enough.
"""

import asyncio
import aiohttp
from aiohttp import BasicAuth, TCPConnector


async def create_async_session(
    base_url: str,
    username: str,
    password: str,
    verify_ssl: bool = False,
    max_connections: int = 20,
) -> aiohttp.ClientSession:
    """Create an async session with connection pooling.

    Args:
        verify_ssl: WARNING — the False default disables TLS certificate
            validation (acceptable only for local development; see the
            certificate-handling section). Pass True in production.
        max_connections: Maximum concurrent TCP connections.
            PI Web API servers typically handle 20-50 concurrent
            connections well. Going higher may trigger throttling.

    Raises:
        aiohttp.ClientResponseError: If the connectivity check fails
            (e.g., bad credentials); the session is closed before raising.
    """
    connector = TCPConnector(
        limit=max_connections,
        ssl=verify_ssl,
    )
    session = aiohttp.ClientSession(
        connector=connector,
        auth=BasicAuth(username, password),
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
    )

    # Verify the connection works. Previously a failed check (401, wrong
    # URL) went undetected and the half-broken session leaked; now we
    # check the status and close the session before propagating.
    try:
        async with session.get(f"{base_url}/") as resp:
            resp.raise_for_status()
            data = await resp.json()
            print(f"Connected to {data.get('ProductTitle', 'PI Web API')}")
    except Exception:
        await session.close()
        raise

    return session


async def read_current_values_async(
    session: aiohttp.ClientSession,
    base_url: str,
    web_ids: list[str],
    concurrency: int = 10,
) -> dict[str, dict]:
    """Read current values for many points using async concurrency.

    A semaphore caps the number of in-flight requests so the PI Web API
    server is not overwhelmed.
    """
    gate = asyncio.Semaphore(concurrency)
    snapshot: dict[str, dict] = {}

    async def fetch(point_id: str):
        target = (
            f"{base_url}/streams/{point_id}/value"
            "?selectedFields=Timestamp;Value;Good"
        )
        async with gate:
            async with session.get(target) as resp:
                if resp.status != 200:
                    snapshot[point_id] = {"error": resp.status}
                else:
                    snapshot[point_id] = await resp.json()

    # Fire everything at once; the semaphore throttles actual concurrency
    await asyncio.gather(*[fetch(wid) for wid in web_ids])
    return snapshot


# Usage
async def main():
    """Example: open a session, read three points, always close cleanly."""
    session = await create_async_session(
        base_url="https://your-server/piwebapi",
        username="DOMAIN\\user",
        password="password",
        max_connections=20,
    )
    try:
        # WebIds would normally come from a points lookup
        web_ids = ["F1DPaH...", "F1DPbX...", "F1DPcZ..."]
        values = await read_current_values_async(
            session, "https://your-server/piwebapi",
            web_ids, concurrency=10,
        )
        for point_id, snapshot in values.items():
            print(f"{point_id}: {snapshot}")
    finally:
        # The session owns the connection pool — always release it
        await session.close()


# asyncio.run(main())

When to use async vs batch

Use the batch endpoint when you need to combine different types of operations (reads + writes + lookups) or when all requests go to the same server. Use async when you need to read from multiple PI Web API servers in parallel, or when you need real-time streaming with low latency. For most use cases, batch is simpler and sufficient.

Pandas integration

A production-grade pattern for loading PI data into pandas with proper handling of quality flags, digital states, timezones, and multi-point alignment.

pi_pandas.py (python)
import pandas as pd
from typing import Any


def pi_to_dataframe(
    items: list[dict],
    column_name: str = "Value",
    include_quality: bool = False,
) -> pd.DataFrame:
    """Convert PI Web API value items to a pandas DataFrame.

    Handles:
    - UTC timezone-aware DatetimeIndex
    - Digital states (dict values) -> NaN with state name preserved
    - Bad quality values -> NaN
    - Proper numeric type conversion

    Args:
        items: The "Items" list from a recorded/interpolated response
        column_name: Name for the value column
        include_quality: If True, add "{column_name}_good" and
            "{column_name}_state" columns

    Returns:
        DataFrame with DatetimeIndex (UTC) and value column(s)
    """
    if not items:
        return pd.DataFrame(columns=[column_name])

    rows = []
    for item in items:
        value = item["Value"]
        good = item.get("Good", True)

        # Default: non-numeric (strings and anything unexpected stay NaN).
        # The original had identical str/else branches — collapsed here.
        numeric_value = None
        state_name = None
        if isinstance(value, dict):
            # Digital state, e.g. {"Name": "Active", "Value": 1}
            state_name = value.get("Name", "")
        elif isinstance(value, (int, float)) and good:
            numeric_value = float(value)

        row: dict[str, Any] = {
            "Timestamp": item["Timestamp"],
            column_name: numeric_value,
        }
        if include_quality:
            row[f"{column_name}_good"] = good
            row[f"{column_name}_state"] = state_name

        rows.append(row)

    df = pd.DataFrame(rows)
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
    return df.set_index("Timestamp").sort_index()


def load_multiple_points(
    session, base_url: str,
    points: dict[str, str],
    start: str = "*-7d", end: str = "*",
    interval: str = "1h",
    use_interpolated: bool = True,
) -> pd.DataFrame:
    """Load multiple PI points into a single aligned DataFrame.

    Args:
        points: Dict of {column_name: web_id}
        use_interpolated: If True, use the interpolated endpoint (aligned
            timestamps). If False, use the recorded endpoint (different
            timestamps per point, combined with an outer join).

    Returns:
        DataFrame with one column per point and aligned timestamps.
    """
    series = {}

    for label, web_id in points.items():
        # Both endpoints share the time window and field selection;
        # only the endpoint name and the extra parameter differ.
        if use_interpolated:
            endpoint = f"{base_url}/streams/{web_id}/interpolated"
            params = {
                "startTime": start,
                "endTime": end,
                "interval": interval,
                "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
            }
        else:
            endpoint = f"{base_url}/streams/{web_id}/recorded"
            params = {
                "startTime": start,
                "endTime": end,
                "maxCount": 50000,
                "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
            }

        resp = session.get(endpoint, params=params)
        resp.raise_for_status()
        frame = pi_to_dataframe(resp.json().get("Items", []), column_name=label)
        series[label] = frame[label]

    # pd.concat aligns on the DatetimeIndex (outer join for recorded data)
    return pd.concat(series, axis=1)


# Usage: load 24 hours of 3 points at 5-minute intervals
# NOTE(review): assumes `session` and BASE_URL are defined earlier in the
# script (BASE_URL is never defined in this guide's snippets — e.g. set
# BASE_URL = os.environ["PI_WEB_API_URL"]) — confirm.
points = {
    "Temperature": "web-id-1",
    "Pressure": "web-id-2",
    "Flow_Rate": "web-id-3",
}

df = load_multiple_points(
    session, BASE_URL, points,
    start="*-24h", end="*", interval="5m",
)

print(f"Shape: {df.shape}")
print(f"Time range: {df.index.min()} to {df.index.max()}")
print(f"NaN counts (digital states or bad quality):")
print(df.isna().sum())
print(f"\nStatistics:")
print(df.describe())

Use interpolated for analysis, recorded for audits

Use use_interpolated=True (default) when you need aligned timestamps for correlation analysis, machine learning, or charting. Use use_interpolated=False when you need the actual recorded events (e.g., for compliance audits or event detection). Recorded values from different points have different timestamps, so the resulting DataFrame will have many NaN values from the outer join.

Production error handling and retry logic

Production code needs proper error handling with retries, logging, and graceful degradation. This pattern wraps PI Web API calls with structured error handling.

pi_robust.py (python)
import logging
import time
from dataclasses import dataclass
from enum import Enum

import requests

logger = logging.getLogger("pi_web_api")


class PIErrorType(Enum):
    """Coarse classification of PI Web API failures (carried by PIError)."""
    AUTH = "authentication"        # 401 — credentials rejected
    PERMISSION = "permission"      # 403 — authenticated but not authorized
    NOT_FOUND = "not_found"        # 404 — WebId/path does not exist
    SERVER_ERROR = "server_error"  # 400, or 5xx after retries are exhausted
    TIMEOUT = "timeout"            # request timed out (not used by pi_request here)
    CONNECTION = "connection"      # transport failure after retries are exhausted


@dataclass
class PIError:
    """Structured description of a failed PI Web API request."""
    error_type: PIErrorType   # coarse failure category
    status_code: int | None   # HTTP status, or None for transport-level errors
    message: str              # human-readable summary
    url: str                  # the request URL that failed


def pi_request(
    session: requests.Session,
    method: str,
    url: str,
    max_retries: int = 3,
    timeout: int = 30,
    **kwargs,
) -> requests.Response:
    """Make a PI Web API request with structured error handling.

    Retries with exponential backoff (1s, 2s, 4s, ...) on:
    - 502 Bad Gateway (PI Data Archive connectivity)
    - 503 Service Unavailable (server overloaded)
    - 504 Gateway Timeout
    - ConnectionError, Timeout

    Fails immediately on:
    - 401 Unauthorized (auth problem — retrying won't help)
    - 403 Forbidden (permission problem)
    - 404 Not Found (resource doesn't exist)
    - 400 Bad Request (malformed query)

    Args:
        session: A configured requests.Session
        method: HTTP method name ("GET", "POST", ...)
        url: Full request URL
        max_retries: Total attempts for retryable failures
        timeout: Per-request timeout in seconds

    Returns:
        The response for any status not handled above.

    Raises:
        PIRequestError: On non-retryable errors or once retries are exhausted.
    """
    last_error = None

    for attempt in range(max_retries):
        # BUG FIX: previously we slept even after the FINAL failed attempt,
        # adding up to 2**(max_retries-1) seconds of dead time before
        # raising. Now we only sleep between attempts.
        final_attempt = attempt == max_retries - 1
        wait = 2 ** attempt

        try:
            response = session.request(
                method, url, timeout=timeout, **kwargs
            )
        except requests.ConnectionError as e:
            # Note: ConnectTimeout subclasses ConnectionError, so connect
            # timeouts land here rather than in the Timeout branch below.
            logger.warning(
                f"Connection error: {e}, retry {attempt + 1}/{max_retries} in {wait}s"
            )
            last_error = e
            if not final_attempt:
                time.sleep(wait)
            continue
        except requests.Timeout as e:
            logger.warning(
                f"Timeout: {e}, retry {attempt + 1}/{max_retries} in {wait}s"
            )
            last_error = e
            if not final_attempt:
                time.sleep(wait)
            continue

        # Non-retryable errors: fail immediately
        if response.status_code == 401:
            logger.error(f"Authentication failed: {url}")
            raise PIRequestError(PIError(
                PIErrorType.AUTH, 401, "Authentication failed", url
            ))

        if response.status_code == 403:
            logger.error(f"Permission denied: {url}")
            raise PIRequestError(PIError(
                PIErrorType.PERMISSION, 403, "Permission denied", url
            ))

        if response.status_code == 404:
            logger.warning(f"Not found: {url}")
            raise PIRequestError(PIError(
                PIErrorType.NOT_FOUND, 404, "Resource not found", url
            ))

        if response.status_code == 400:
            body = response.text[:500]
            logger.error(f"Bad request: {url} - {body}")
            raise PIRequestError(PIError(
                PIErrorType.SERVER_ERROR, 400, f"Bad request: {body}", url
            ))

        # Retryable server errors
        if response.status_code in (502, 503, 504):
            logger.warning(
                f"Server error {response.status_code} on {url}, "
                f"retry {attempt + 1}/{max_retries} in {wait}s"
            )
            last_error = response
            if not final_attempt:
                time.sleep(wait)
            continue

        return response

    # All retries exhausted
    if isinstance(last_error, requests.Response):
        raise PIRequestError(PIError(
            PIErrorType.SERVER_ERROR, last_error.status_code,
            f"Failed after {max_retries} retries", url
        ))
    raise PIRequestError(PIError(
        PIErrorType.CONNECTION, None,
        f"Failed after {max_retries} retries: {last_error}", url
    ))


class PIRequestError(Exception):
    """Raised when a PI Web API call fails permanently (details in .error)."""

    def __init__(self, error: PIError):
        super().__init__(f"[{error.error_type.value}] {error.message}")
        # Keep the structured error for programmatic inspection by callers
        self.error = error


# Configure logging for production
# NOTE(review): basicConfig at import time suits a standalone script; in a
# shared library, let the application configure logging instead — confirm.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    handlers=[
        logging.StreamHandler(),                 # console output
        logging.FileHandler("pi_web_api.log"),   # persistent log file in cwd
    ],
)

ETL pattern with watermark tracking

A production ETL pipeline needs incremental extraction (only new data since the last run), watermark tracking, and proper error recovery.

pi_etl.py (python)
"""Production ETL: incremental extraction with watermark tracking.

Extracts PI data since the last successful run, transforms it,
and saves to CSV (or database, cloud storage, etc.).
Watermark file ensures no data is missed or duplicated across runs.
"""

import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pandas as pd

logger = logging.getLogger("pi_etl")


class WatermarkTracker:
    """Track the last successful extraction timestamp per point.

    Watermarks live in a JSON file so they survive across runs.
    """

    def __init__(self, watermark_file: str = "watermarks.json"):
        self.file = Path(watermark_file)
        # Pick up where the previous run left off, if it left anything
        self.marks: dict[str, str] = (
            json.loads(self.file.read_text()) if self.file.exists() else {}
        )

    def get(self, point_name: str, default_start: str = "*-7d") -> str:
        """Return the last extracted timestamp, or the default start window."""
        return self.marks.get(point_name, default_start)

    def update(self, point_name: str, timestamp: str):
        """Record a successful extraction and persist it immediately."""
        self.marks[point_name] = timestamp
        self.file.write_text(json.dumps(self.marks, indent=2))


def incremental_extract(
    session, base_url: str,
    points: dict[str, str],
    watermarks: WatermarkTracker,
    interval: str = "5m",
    output_dir: str = "./data",
) -> pd.DataFrame | None:
    """Extract new data since the last watermark for each point.

    NOTE(review): relies on pi_to_dataframe from the pandas helpers being
    importable in this module — confirm the import when splitting these
    snippets into real files.

    Args:
        points: Dict of {column_name: web_id}
        watermarks: Tracker for last-extracted timestamps
        interval: Interpolation interval
        output_dir: Directory for output CSV files

    Returns:
        DataFrame with new data, or None if no new data
    """
    now = datetime.now(timezone.utc)
    frames = {}

    for name, web_id in points.items():
        start = watermarks.get(name)
        logger.info(f"Extracting {name}: {start} to {now.isoformat()}")

        resp = session.get(
            f"{base_url}/streams/{web_id}/interpolated",
            params={
                "startTime": start,
                "endTime": now.isoformat(),
                "interval": interval,
                "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
            },
        )
        resp.raise_for_status()
        items = resp.json().get("Items", [])

        if not items:
            logger.info(f"  {name}: no new data")
            continue

        logger.info(f"  {name}: {len(items)} new values")
        df = pi_to_dataframe(items, column_name=name)
        frames[name] = df[name]

        # Advance the watermark only after a successful fetch so the next
        # run resumes exactly where this one stopped
        last_ts = items[-1]["Timestamp"]
        watermarks.update(name, last_ts)

    if not frames:
        return None

    combined = pd.concat(frames, axis=1)

    # Save to CSV
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    filename = output_path / f"pi_data_{now.strftime('%Y%m%d_%H%M%S')}.csv"
    combined.to_csv(filename)
    # BUG FIX: this log line previously printed a literal placeholder
    # instead of the actual output path
    logger.info(f"Saved {len(combined)} rows to {filename}")

    return combined


# Usage
# NOTE(review): assumes `session` and BASE_URL are defined earlier
# (e.g. via create_pi_session and os.environ["PI_WEB_API_URL"]) — BASE_URL
# is never defined in this guide's snippets; confirm.
watermarks = WatermarkTracker("my_project_watermarks.json")
points = {
    "Temperature": "F1DPaH...",   # replace with real WebIds
    "Pressure": "F1DPbX...",
    "Flow_Rate": "F1DPcZ...",
}

df = incremental_extract(
    session, BASE_URL, points, watermarks,
    interval="5m", output_dir="./data/pi_extract",
)
if df is not None:
    print(f"Extracted {len(df)} rows")
    print(df.describe())
else:
    print("No new data since last extraction")

Environment variable configuration

A clean way to manage PI Web API configuration across environments (development, staging, production).

.env.example (text — commit this, not .env)
PI_WEB_API_URL=https://your-server/piwebapi
PI_USERNAME=DOMAIN\username
PI_PASSWORD=your-password
PI_AUTH_METHOD=basic
PI_CA_BUNDLE=/path/to/ca-bundle.pem
PI_POOL_SIZE=20
PI_MAX_RETRIES=3
config.py (python)
"""Load PI Web API configuration from environment variables."""

import os

from dataclasses import dataclass, field


@dataclass
class PIConfig:
    """Configuration for PI Web API connection."""
    base_url: str
    username: str | None = None
    password: str | None = None
    auth_method: str = "basic"
    verify: str | bool = True
    pool_size: int = 20
    max_retries: int = 3

    @classmethod
    def from_env(cls) -> "PIConfig":
        """Load configuration from environment variables."""
        verify_val = os.environ.get("PI_CA_BUNDLE", "")
        return cls(
            base_url=os.environ["PI_WEB_API_URL"],
            username=os.environ.get("PI_USERNAME"),
            password=os.environ.get("PI_PASSWORD"),
            auth_method=os.environ.get("PI_AUTH_METHOD", "basic"),
            verify=verify_val if verify_val else False,
            pool_size=int(os.environ.get("PI_POOL_SIZE", "20")),
            max_retries=int(os.environ.get("PI_MAX_RETRIES", "3")),
        )

    def create_session(self) -> "requests.Session":
        """Create a configured session from this config."""
        return create_pi_session(
            base_url=self.base_url,
            username=self.username,
            password=self.password,
            verify=self.verify,
            auth_method=self.auth_method,
            pool_maxsize=self.pool_size,
            max_retries=self.max_retries,
        )


# Usage
# Reads PI_WEB_API_URL (required) plus the optional PI_* variables from
# the environment, then opens a validated session.
config = PIConfig.from_env()
session = config.create_session()

Never commit .env files

Add .env to your .gitignore. Commit a .env.example with placeholder values so your team knows which variables are required. In CI/CD, inject secrets via environment variables or a secrets manager.

Need help?