Skip to main content
Cookbook

Advanced Recipes

Production-grade patterns that go beyond the basics. Each recipe solves a real integration problem with complete, copy-paste-ready Python code.

Start with the basics first

These recipes assume you are comfortable with session setup, authentication, and basic reads/writes. If not, start with the PI Web API Cookbook first.

Recipe 1: Parallel reads with concurrent.futures

When you need to read hundreds of points, sequential requests are too slow. Use a thread pool to parallelize reads while respecting server limits. For even better performance, use the batch endpoint (Recipe 8 in the Cookbook) -- but parallel reads are useful when you need recorded/interpolated data per point.

parallel_reads.py (Python)
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd

def read_recorded_for_point(session, base_url, web_id, point_name,
                            start_time="*-24h", end_time="*"):
    """Fetch the recorded values of one stream as a DataFrame.

    Issues a single GET against ``{base_url}/streams/{web_id}/recorded``
    with ``maxCount`` fixed at 10000 and calls ``raise_for_status()`` on
    the response, so HTTP errors propagate to the caller.

    Args:
        session: Object exposing a requests-style ``get(url, params=...)``.
        base_url: Root URL of the PI Web API instance.
        web_id: WebId of the stream to read.
        point_name: Label written into the ``PointName`` column.
        start_time: Value sent as ``startTime``.
        end_time: Value sent as ``endTime``.

    Returns:
        DataFrame built from the response's ``Items``. When items were
        returned, ``Timestamp`` is converted to UTC datetimes and a
        ``PointName`` column is added; otherwise the frame is empty.
    """
    query = {
        "startTime": start_time,
        "endTime": end_time,
        "maxCount": 10000,
        "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
    }
    response = session.get(f"{base_url}/streams/{web_id}/recorded", params=query)
    response.raise_for_status()

    frame = pd.DataFrame(response.json().get("Items", []))
    if frame.empty:
        return frame

    frame["Timestamp"] = pd.to_datetime(frame["Timestamp"], utc=True)
    frame["PointName"] = point_name
    return frame


def read_many_recorded(session, base_url, points, max_workers=10,
                       start_time="*-24h", end_time="*"):
    """Fan out recorded-value reads over a thread pool and merge the results.

    One ``read_recorded_for_point`` task is submitted per point. A task
    that raises does not abort the run: its point name and error text are
    collected and printed as a warning once all tasks have finished.

    Args:
        session: Session object handed to every worker task.
        base_url: Root URL of the PI Web API instance.
        points: list of (name, web_id) tuples
        max_workers: Number of concurrent threads. Start with 10,
                     increase only if the server handles it well.
                     Monitor PI Web API CPU when increasing.
        start_time: Value forwarded as each read's start time.
        end_time: Value forwarded as each read's end time.

    Returns: Combined DataFrame with PointName column, or an empty
        DataFrame when no task produced any rows.
    """
    frames = []
    failures = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {}
        for point_name, web_id in points:
            task = executor.submit(
                read_recorded_for_point, session, base_url,
                web_id, point_name, start_time, end_time,
            )
            pending[task] = point_name

        # Consume tasks in completion order so slow points do not block fast ones.
        for task in as_completed(pending):
            point_name = pending[task]
            try:
                frame = task.result()
                if frame.empty:
                    continue
                frames.append(frame)
            except Exception as exc:
                failures.append({"point": point_name, "error": str(exc)})

    if failures:
        print(f"Warning: {len(failures)} reads failed:")
        for failure in failures:
            print(f"  {failure['point']}: {failure['error']}")

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


# Usage
points = [
    ("Temperature_R1", "F1DPaH..."),
    ("Pressure_R1", "F1DPbX..."),
    ("Flow_R1", "F1DPcY..."),
    # ... up to hundreds of points
]
df = read_many_recorded(session, BASE_URL, points, max_workers=10)
# read_many_recorded returns a column-less DataFrame when nothing was read,
# so indexing "PointName" unconditionally would raise KeyError.
if df.empty:
    print("No values were read for any point")
else:
    print(f"Read {len(df)} total values across {df['PointName'].nunique()} points")

Respect server capacity

Start with 10 concurrent workers and monitor PI Web API server CPU and response times. Most PI Web API servers handle 10-20 concurrent connections well. Going higher risks 503 errors and can affect other users of the same PI Web API server.

Recipe 2: Resilient requests with retries and circuit breaker

Network glitches, server restarts, and transient 503 errors happen in production. This recipe adds retry logic for reads and a circuit breaker pattern that backs off when the server is struggling.

resilient_session.py (Python)
import time
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger("piwebapi")

def create_resilient_session(username, password, ca_bundle=False):
    """Build a ``requests.Session`` with basic auth, GET retries and pooling.

    Args:
        username: User name placed in the session's ``auth`` tuple.
        password: Password placed in the session's ``auth`` tuple.
        ca_bundle: Assigned to ``session.verify``. NOTE: the default
            ``False`` switches TLS certificate verification off; pass a
            CA bundle path (or ``True``) outside of test environments.

    Returns:
        The configured session. The same adapter is mounted for both
        ``https://`` and ``http://`` URLs.
    """
    import requests

    retries = Retry(
        total=3,
        backoff_factor=1,           # exponential backoff between attempts
        status_forcelist=[408, 429, 500, 502, 503, 504],
        allowed_methods=["GET"],    # writes are never retried automatically
        raise_on_status=False,
    )
    pooled_adapter = HTTPAdapter(
        max_retries=retries,
        pool_connections=10,
        pool_maxsize=10,
    )

    session = requests.Session()
    session.auth = (username, password)
    session.verify = ca_bundle
    for scheme in ("https://", "http://"):
        session.mount(scheme, pooled_adapter)

    return session


class CircuitBreaker:
    """Minimal circuit breaker guarding calls to PI Web API.

    The breaker counts consecutive failures. Once the count reaches
    ``failure_threshold`` it switches to "open" and ``allow_request()``
    answers False until ``cooldown_seconds`` have passed since the most
    recent failure. After the cooldown it moves to "half-open" and lets
    requests through again; a recorded success closes it.
    """

    def __init__(self, failure_threshold=5, cooldown_seconds=30):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.last_failure_time = 0
        # "closed" = healthy, "open" = blocking, "half-open" = probing
        self.state = "closed"

    def record_success(self):
        """Reset the failure counter and close the breaker."""
        self.state = "closed"
        self.failures = 0

    def record_failure(self):
        """Count one failure and open the breaker once the threshold is hit."""
        self.last_failure_time = time.time()
        self.failures += 1
        if self.failures < self.failure_threshold:
            return
        self.state = "open"
        logger.warning(
            f"Circuit breaker OPEN after {self.failures} failures. "
            f"Blocking requests for {self.cooldown_seconds}s."
        )

    def allow_request(self) -> bool:
        """Tell the caller whether a request may be sent right now."""
        if self.state == "closed":
            return True

        # Not closed: only let traffic through once the cooldown has elapsed.
        seconds_since_failure = time.time() - self.last_failure_time
        if seconds_since_failure < self.cooldown_seconds:
            return False

        self.state = "half-open"
        logger.info("Circuit breaker half-open. Allowing test request.")
        return True


def resilient_get(session, url, circuit_breaker, **kwargs):
    """Send a GET through the session unless the circuit breaker forbids it.

    Responses with a status below 500 are recorded as a success on the
    breaker; 5xx responses and raised exceptions are recorded as failures.

    Args:
        session: Object exposing a requests-style ``get(url, **kwargs)``.
        url: Target URL.
        circuit_breaker: Object exposing ``allow_request()``,
            ``record_success()`` and ``record_failure()``.
        **kwargs: Forwarded unchanged to ``session.get``.

    Returns:
        The response object, or None when the breaker blocked the call or
        the request raised.
    """
    if not circuit_breaker.allow_request():
        logger.warning(f"Circuit breaker OPEN. Skipping request to {url}")
        return None

    try:
        response = session.get(url, **kwargs)
        server_error = response.status_code >= 500
        if server_error:
            circuit_breaker.record_failure()
        else:
            circuit_breaker.record_success()
        return response
    except Exception as exc:
        circuit_breaker.record_failure()
        logger.error(f"Request failed: {exc}")
        return None


# Usage
breaker = CircuitBreaker(failure_threshold=5, cooldown_seconds=30)
session = create_resilient_session("DOMAIN\\user", "password")

# Read the current value of one stream through the breaker-protected helper.
value_url = f"{BASE_URL}/streams/{WEB_ID}/value"
resp = resilient_get(session, value_url, breaker)
if resp and resp.ok:
    print(f"Value: {resp.json()['Value']}")

Why only retry GETs?

Retrying POST (write) requests can cause duplicate writes. If you need to retry writes, add idempotency logic: use updateOption=Replace so duplicate writes overwrite rather than conflict.

Recipe 3: Change detection with batch snapshots

Instead of polling every point individually, use the batch endpoint to take periodic snapshots and detect which values actually changed. Much more efficient than individual polling.

change_detection.py (Python)
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ChangeEvent:
    point_name: str
    web_id: str
    old_value: object
    new_value: object
    old_timestamp: str
    new_timestamp: str

class ChangeDetector:
    """Detect which PI points changed between polls using batch reads."""

    def __init__(self, session, base_url, points):
        """
        Args:
            points: list of (name, web_id) tuples
        """
        self.session = session
        self.base_url = base_url
        self.points = points
        self.last_snapshot: dict = {}

    def take_snapshot(self) -> list[ChangeEvent]:
        """Read current values via batch and return only changed points."""
        # Build batch request
        batch = {}
        for name, wid in self.points:
            batch[name] = {
                "Method": "GET",
                "Resource": (
                    f"{self.base_url}/streams/{wid}/value"
                    f"?selectedFields=Timestamp;Value;Good"
                ),
            }

        resp = self.session.post(f"{self.base_url}/batch", json=batch)
        if not resp.ok:
            return []

        # Parse results and detect changes
        changes = []
        current = {}

        for name, result in resp.json().items():
            if result["Status"] != 200:
                continue

            content = result["Content"]
            value = content["Value"]
            if isinstance(value, dict):
                value = value.get("Name", str(value))

            current[name] = {
                "value": value,
                "timestamp": content["Timestamp"],
            }

            # Compare with previous snapshot
            prev = self.last_snapshot.get(name)
            if prev is None or prev["timestamp"] != content["Timestamp"]:
                # Find web_id for this point
                wid = next(w for n, w in self.points if n == name)
                changes.append(ChangeEvent(
                    point_name=name,
                    web_id=wid,
                    old_value=prev["value"] if prev else None,
                    new_value=value,
                    old_timestamp=prev["timestamp"] if prev else None,
                    new_timestamp=content["Timestamp"],
                ))

        self.last_snapshot = current
        return changes


# Usage
import time

points = [
    ("Reactor_Temp", temp_wid),
    ("Reactor_Pressure", pressure_wid),
    ("Feed_Flow", flow_wid),
]
detector = ChangeDetector(session, BASE_URL, points)

# The first poll has nothing to compare against, so every point is reported.
changes = detector.take_snapshot()
print(f"Initial snapshot: {len(changes)} points")

# Later polls report only the points whose timestamp moved.
time.sleep(10)
changes = detector.take_snapshot()
print(f"Changed since last poll: {len(changes)} points")
for change in changes:
    print(f"  {change.point_name}: {change.old_value} -> {change.new_value}")

Recipe 4: Historical data backfill

Backfilling large time ranges requires chunking to avoid timeouts and memory issues. This recipe reads recorded values in configurable chunks with progress reporting and truncation detection.

backfill.py (Python)
import pandas as pd
from datetime import datetime, timedelta, timezone

def backfill_recorded(
    session, base_url, web_id, point_name,
    start, end,
    chunk_days=1, max_count=10000,
    selected_fields="Items.Timestamp;Items.Value;Items.Good",
):
    """Read recorded values in chunks to avoid timeouts.

    The range ``[start, end)`` is walked in windows of ``chunk_days``.
    Each window is fetched with one GET; progress and truncation
    warnings are printed as the windows complete.

    Args:
        session: Object exposing a requests-style ``get(url, params=...)``.
        base_url: Root URL of the PI Web API instance.
        web_id: WebId of the stream to read.
        point_name: Name used in the progress output.
        start: Start datetime (timezone-aware)
        end: End datetime (timezone-aware)
        chunk_days: Size of each time window in days (must be > 0)
        max_count: Max values per chunk request
        selected_fields: Value sent as ``selectedFields``.

    Returns: Combined DataFrame, de-duplicated on ``Timestamp`` and
        sorted by it; an empty DataFrame when no window returned data.

    Raises:
        ValueError: If ``chunk_days`` is zero or negative.
    """
    # A zero or negative window never advances chunk_start, so the loop
    # below would never terminate; reject it up front.
    if chunk_days <= 0:
        raise ValueError(f"chunk_days must be positive, got {chunk_days!r}")

    window = timedelta(days=chunk_days)
    all_frames = []
    chunk_start = start
    total_values = 0
    truncated_chunks = 0

    print(f"Backfilling {point_name}: {start.date()} to {end.date()}")

    while chunk_start < end:
        chunk_end = min(chunk_start + window, end)

        resp = session.get(
            f"{base_url}/streams/{web_id}/recorded",
            params={
                "startTime": chunk_start.isoformat(),
                "endTime": chunk_end.isoformat(),
                "maxCount": max_count,
                "boundaryType": "Inside",
                "selectedFields": selected_fields,
            },
        )
        resp.raise_for_status()
        items = resp.json().get("Items", [])

        if items:
            df = pd.DataFrame(items)
            df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
            all_frames.append(df)
            total_values += len(items)

            # A full page means the server may have cut the window short.
            if len(items) == max_count:
                truncated_chunks += 1
                print(f"  {chunk_start.date()}: {len(items)} values (TRUNCATED - reduce chunk_days)")
            else:
                print(f"  {chunk_start.date()}: {len(items)} values")

        chunk_start = chunk_end

    if truncated_chunks > 0:
        print(f"\nWARNING: {truncated_chunks} chunk(s) were truncated. "
              f"Reduce chunk_days or increase max_count to get all data.")

    if not all_frames:
        print("No data found in the specified range.")
        return pd.DataFrame()

    result = pd.concat(all_frames, ignore_index=True)
    # Adjacent windows share a boundary timestamp, so the same event can
    # come back twice.
    result = result.drop_duplicates(subset=["Timestamp"])
    result = result.sort_values("Timestamp").reset_index(drop=True)
    print(f"\nTotal: {len(result)} unique recorded values")
    return result


# Usage: backfill roughly 90 days of data (1 Jan - 31 Mar 2026)
backfill_start = datetime(2026, 1, 1, tzinfo=timezone.utc)
backfill_end = datetime(2026, 3, 31, tzinfo=timezone.utc)
df = backfill_recorded(
    session, BASE_URL, WEB_ID, "Temperature_R1",
    start=backfill_start,
    end=backfill_end,
    chunk_days=1,
    max_count=50000,
)

# Save to CSV
output_path = "temperature_backfill.csv"
if not df.empty:
    df.to_csv(output_path, index=False)
    print(f"Saved to {output_path}")

Recipe 5: Multi-server aggregation

Some organizations run multiple PI Data Archives across plants or regions. This recipe reads the same metric from multiple servers in parallel and combines the results for cross-plant comparison.

multi_server.py (Python)
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# One entry per PI Web API instance to query. Each entry carries:
#   "name"       - label written into the "Server" column of the results
#   "url"        - root URL of that instance's PI Web API
#   "point_path" - path sent to the /points lookup to resolve the WebId
SERVERS = [
    {
        "name": "Plant-A",
        "url": "https://pi-a/piwebapi",
        "point_path": "\\\\PI-A\\Flow_Rate",
    },
    {
        "name": "Plant-B",
        "url": "https://pi-b/piwebapi",
        "point_path": "\\\\PI-B\\Flow_Rate",
    },
    {
        "name": "Plant-C",
        "url": "https://pi-c/piwebapi",
        "point_path": "\\\\PI-C\\Flow_Rate",
    },
]


def read_from_server(session, server, start_time, end_time):
    """Resolve a point on one server and fetch its recorded values.

    Two GETs are issued: a ``/points`` lookup by path to obtain the WebId,
    then a ``/streams/{web_id}/recorded`` read capped at 10000 values.
    Both responses go through ``raise_for_status()``.

    Args:
        session: Object exposing a requests-style ``get(url, params=...)``.
        server: Dict with ``"name"``, ``"url"`` and ``"point_path"`` keys.
        start_time: Value sent as ``startTime``.
        end_time: Value sent as ``endTime``.

    Returns:
        DataFrame of the returned items with ``Timestamp`` converted to
        UTC datetimes and a ``Server`` column holding ``server["name"]``;
        an empty DataFrame when no items came back.
    """
    api_root = server["url"]

    # Step 1: resolve the point path to a WebId.
    lookup = session.get(
        f"{api_root}/points",
        params={"path": server["point_path"], "selectedFields": "WebId"},
    )
    lookup.raise_for_status()
    web_id = lookup.json()["WebId"]

    # Step 2: read the recorded values of that stream.
    recorded = session.get(
        f"{api_root}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": 10000,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    recorded.raise_for_status()

    items = recorded.json().get("Items", [])
    if items:
        frame = pd.DataFrame(items)
        frame["Timestamp"] = pd.to_datetime(frame["Timestamp"], utc=True)
        frame["Server"] = server["name"]
        return frame
    return pd.DataFrame()


def aggregate_servers(session, servers, start_time, end_time):
    """Read from all servers in parallel and combine.

    One ``read_from_server`` task is submitted per server. A per-server
    status line (row count, "no data", or the error) is printed; a
    failing server does not abort the others.

    Args:
        session: Session object handed to every worker task.
        servers: List of server dicts (each needs at least ``"name"``
            plus whatever ``read_from_server`` reads from it).
        start_time: Value forwarded as each read's start time.
        end_time: Value forwarded as each read's end time.

    Returns:
        Concatenation of all non-empty per-server DataFrames, or an empty
        DataFrame when there were no servers or none returned data.
    """
    # ThreadPoolExecutor raises ValueError for max_workers=0, so an empty
    # server list has to short-circuit before the pool is created.
    if not servers:
        return pd.DataFrame()

    frames = []

    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        futures = {
            pool.submit(read_from_server, session, s,
                        start_time, end_time): s["name"]
            for s in servers
        }
        # Results are collected in submission order.
        for future, name in futures.items():
            try:
                df = future.result()
            except Exception as e:
                print(f"  {name}: ERROR - {e}")
                continue

            if df.empty:
                print(f"  {name}: no data")
            else:
                frames.append(df)
                print(f"  {name}: {len(df)} values")

    if not frames:
        return pd.DataFrame()

    return pd.concat(frames, ignore_index=True)


# Usage
print("Reading from all plants...")
df = aggregate_servers(session, SERVERS, "*-24h", "*")

if not df.empty:
    # Cross-plant summary: one row of statistics per server
    values_by_server = df.groupby("Server")["Value"]
    summary = values_by_server.agg(["mean", "min", "max", "count"])
    print("\nCross-plant summary:")
    print(summary)

Recipe 6: Data quality validation

Validate PI data quality before downstream processing. Detect stale data, bad quality percentages, outliers, and gaps. Essential for production data pipelines.

data_quality.py (Python)
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
import pandas as pd

@dataclass
class QualityReport:
    point_name: str
    total_values: int
    good_count: int
    bad_count: int
    good_percent: float
    latest_timestamp: str
    is_stale: bool
    stale_minutes: float
    value_min: float | None
    value_max: float | None
    value_mean: float | None
    has_outliers: bool
    outlier_count: int
    gap_count: int
    max_gap_minutes: float


def validate_data_quality(
    session, base_url, web_id, point_name,
    start_time="*-24h", end_time="*",
    stale_threshold_minutes=30,
    outlier_std_threshold=4.0,
    expected_interval_minutes=5,
    max_gap_multiplier=3,
):
    """Validate data quality for a PI point.

    Args:
        stale_threshold_minutes: Flag if latest value is older than this
        outlier_std_threshold: Flag values more than N std devs from mean
        expected_interval_minutes: Expected time between events
        max_gap_multiplier: Flag gaps > this * expected_interval

    Returns: QualityReport
    """
    # Get summary statistics
    resp = session.get(
        f"{base_url}/streams/{web_id}/summary",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "summaryType": "Average,Minimum,Maximum,Count,PercentGood",
        },
    )
    resp.raise_for_status()

    stats = {}
    for item in resp.json().get("Items", []):
        stats[item["Type"]] = item["Value"].get("Value")

    # Get recorded values for gap and outlier analysis
    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": 50000,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])

    if not items:
        return QualityReport(
            point_name=point_name, total_values=0,
            good_count=0, bad_count=0, good_percent=0.0,
            latest_timestamp="N/A", is_stale=True, stale_minutes=999,
            value_min=None, value_max=None, value_mean=None,
            has_outliers=False, outlier_count=0,
            gap_count=0, max_gap_minutes=0,
        )

    df = pd.DataFrame(items)
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
    df = df.sort_values("Timestamp")

    # Staleness check
    latest_ts = df["Timestamp"].max()
    now = datetime.now(timezone.utc)
    stale_minutes = (now - latest_ts).total_seconds() / 60

    # Quality counts
    good_count = int(df["Good"].sum()) if "Good" in df.columns else len(df)
    bad_count = len(df) - good_count

    # Outlier detection (only on numeric, good-quality values)
    good_numeric = df[df.get("Good", True) == True]["Value"].apply(
        lambda v: v if isinstance(v, (int, float)) else None
    ).dropna()

    outlier_count = 0
    if len(good_numeric) > 10:
        mean = good_numeric.mean()
        std = good_numeric.std()
        if std > 0:
            outlier_count = int(
                ((good_numeric - mean).abs() > outlier_std_threshold * std).sum()
            )

    # Gap detection
    timestamps = df["Timestamp"].sort_values()
    gaps = timestamps.diff().dropna()
    max_gap = timedelta(minutes=expected_interval_minutes * max_gap_multiplier)
    large_gaps = gaps[gaps > max_gap]
    max_gap_minutes = gaps.max().total_seconds() / 60 if len(gaps) > 0 else 0

    return QualityReport(
        point_name=point_name,
        total_values=len(df),
        good_count=good_count,
        bad_count=bad_count,
        good_percent=stats.get("PercentGood", (good_count / len(df) * 100)),
        latest_timestamp=latest_ts.isoformat(),
        is_stale=stale_minutes > stale_threshold_minutes,
        stale_minutes=round(stale_minutes, 1),
        value_min=stats.get("Minimum"),
        value_max=stats.get("Maximum"),
        value_mean=stats.get("Average"),
        has_outliers=outlier_count > 0,
        outlier_count=outlier_count,
        gap_count=len(large_gaps),
        max_gap_minutes=round(max_gap_minutes, 1),
    )


# Usage
points = [
    ("Reactor_Temp", reactor_temp_wid),
    ("Feed_Pressure", feed_pressure_wid),
    ("Product_Flow", product_flow_wid),
]

# Fixed-width table: header, separator, then one row per point.
header = f"{'Point':<20} {'Values':>8} {'Good%':>8} {'Stale':>6} {'Outliers':>9} {'Gaps':>5}"
print(header)
print("-" * 62)
for name, wid in points:
    report = validate_data_quality(session, BASE_URL, wid, name)
    if report.is_stale:
        stale_flag = "YES"
    else:
        stale_flag = "no"
    if report.has_outliers:
        outlier_flag = str(report.outlier_count)
    else:
        outlier_flag = "0"
    print(f"{name:<20} {report.total_values:>8} {report.good_percent:>7.1f}% "
          f"{stale_flag:>6} {outlier_flag:>9} {report.gap_count:>5}")

Recipe 7: Monitoring and alerting integration

Integrate PI data monitoring with your alerting stack. This recipe checks point values against thresholds and sends alerts via webhooks (compatible with Slack, Teams, PagerDuty, etc.).

monitoring_alerts.py (Python)
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone

logger = logging.getLogger("pi_monitor")

@dataclass
class AlertRule:
    point_name: str
    web_id: str
    high_limit: float | None = None
    low_limit: float | None = None
    stale_minutes: int = 30
    description: str = ""


@dataclass
class Alert:
    rule: AlertRule
    alert_type: str  # "high", "low", "stale", "bad_quality"
    value: object
    timestamp: str
    message: str


def check_alerts(session, base_url, rules: list[AlertRule]) -> list[Alert]:
    """Check all alert rules against current PI values.

    Uses batch endpoint for efficiency (one request for all points).
    """
    # Batch read all points
    batch = {}
    for rule in rules:
        batch[rule.point_name] = {
            "Method": "GET",
            "Resource": (
                f"{base_url}/streams/{rule.web_id}/value"
                f"?selectedFields=Timestamp;Value;Good"
            ),
        }

    resp = session.post(f"{base_url}/batch", json=batch)
    if not resp.ok:
        logger.error(f"Batch read failed: {resp.status_code}")
        return []

    alerts = []
    now = datetime.now(timezone.utc)

    for rule in rules:
        result = resp.json().get(rule.point_name, {})
        if result.get("Status") != 200:
            alerts.append(Alert(
                rule=rule, alert_type="read_error",
                value=None, timestamp=now.isoformat(),
                message=f"Cannot read {rule.point_name}: HTTP {result.get('Status')}",
            ))
            continue

        content = result["Content"]
        value = content["Value"]
        timestamp = content["Timestamp"]
        good = content.get("Good", True)

        # Check quality
        if not good:
            alerts.append(Alert(
                rule=rule, alert_type="bad_quality",
                value=value, timestamp=timestamp,
                message=f"{rule.point_name} has bad quality data",
            ))

        # Check staleness
        try:
            ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            age_minutes = (now - ts).total_seconds() / 60
            if age_minutes > rule.stale_minutes:
                alerts.append(Alert(
                    rule=rule, alert_type="stale",
                    value=value, timestamp=timestamp,
                    message=f"{rule.point_name} is stale ({age_minutes:.0f}m old)",
                ))
        except Exception:
            pass

        # Check thresholds (only for numeric values)
        if isinstance(value, (int, float)):
            if rule.high_limit is not None and value > rule.high_limit:
                alerts.append(Alert(
                    rule=rule, alert_type="high",
                    value=value, timestamp=timestamp,
                    message=f"{rule.point_name} = {value} exceeds high limit {rule.high_limit}",
                ))
            if rule.low_limit is not None and value < rule.low_limit:
                alerts.append(Alert(
                    rule=rule, alert_type="low",
                    value=value, timestamp=timestamp,
                    message=f"{rule.point_name} = {value} below low limit {rule.low_limit}",
                ))

    return alerts


def send_webhook(url: str, alerts: list[Alert]):
    """Send alerts to a webhook (Slack, Teams, PagerDuty, etc.).

    One POST is sent per alert with a 10 second timeout. Delivery is
    best-effort: a transport error or an HTTP error status is logged and
    the remaining alerts are still sent.

    Args:
        url: Webhook endpoint that receives the JSON payloads.
        alerts: Alerts to deliver.
    """
    import requests as req

    for alert in alerts:
        payload = {
            "text": f"PI Alert [{alert.alert_type.upper()}]: {alert.message}",
            "point": alert.rule.point_name,
            "value": str(alert.value),
            "timestamp": alert.timestamp,
            "type": alert.alert_type,
        }
        try:
            response = req.post(url, json=payload, timeout=10)
            # A 4xx/5xx answer means the alert was not accepted; surface
            # it through the same error log as transport failures.
            response.raise_for_status()
        except req.RequestException as e:
            logger.error(f"Webhook failed: {e}")


# Usage
rules = [
    AlertRule(
        "Reactor_Temp", reactor_wid,
        high_limit=350.0, low_limit=200.0,
        stale_minutes=15, description="Reactor temperature",
    ),
    AlertRule(
        "Feed_Pressure", pressure_wid,
        high_limit=150.0,
        stale_minutes=10, description="Feed pressure",
    ),
    AlertRule(
        "Product_Flow", flow_wid,
        low_limit=10.0,
        stale_minutes=30, description="Product flow rate",
    ),
]

alerts = check_alerts(session, BASE_URL, rules)
if not alerts:
    print("All points within normal limits.")
else:
    print(f"{len(alerts)} alert(s) triggered:")
    for a in alerts:
        print(f"  [{a.alert_type}] {a.message}")
    # To forward the alerts, uncomment:
    # send_webhook("https://hooks.slack.com/services/...", alerts)

Recipe 8: Scheduled snapshot to database

A production-grade pattern that reads current values on a schedule and inserts them into a SQL database. More robust than CSV files for operational data capture.

scheduled_snapshot_db.py (Python)
"""Scheduled PI snapshot to SQLite database.

For production, replace SQLite with PostgreSQL, MySQL, or your
preferred database. The pattern is the same.
"""
import sqlite3

from datetime import datetime, timezone

def init_database(db_path="pi_snapshots.db"):
    """Open the SQLite database and ensure the snapshot schema is present.

    Creates the ``snapshots`` table and its (point_name, capture_time)
    index when they do not exist yet, then commits.

    Args:
        db_path: Path handed to ``sqlite3.connect``.

    Returns:
        The open connection; closing it is the caller's job.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            capture_time TEXT NOT NULL,
            point_name TEXT NOT NULL,
            value REAL,
            value_text TEXT,
            timestamp TEXT,
            good BOOLEAN,
            error TEXT
        )
    """
    create_index_sql = """
        CREATE INDEX IF NOT EXISTS idx_snapshots_point_time
        ON snapshots (point_name, capture_time)
    """

    connection = sqlite3.connect(db_path)
    for statement in (create_table_sql, create_index_sql):
        connection.execute(statement)
    connection.commit()
    return connection


def capture_snapshot(session, base_url, points, conn):
    """Batch-read the current values and store one row per result.

    Numeric values go to the ``value`` column. Dict values are stored in
    ``value_text`` using their ``"Name"`` entry (falling back to the
    dict's string form); any other value is stored in ``value_text`` as
    its string form. A sub-request whose status is not 200 is stored as
    a row with ``good`` false and an ``"HTTP <status>"`` error text.

    Args:
        session: Object exposing a requests-style ``post(url, json=...)``.
        base_url: Root URL of the PI Web API instance.
        points: list of (name, web_id) tuples
        conn: SQLite connection

    Returns:
        Number of rows inserted, or 0 when the batch request itself was
        not ``ok`` (a message is printed in that case).
    """
    capture_time = datetime.now(timezone.utc).isoformat()

    # One sub-request per point, keyed by point name.
    batch = {
        name: {
            "Method": "GET",
            "Resource": (
                f"{base_url}/streams/{wid}/value"
                f"?selectedFields=Timestamp;Value;Good"
            ),
        }
        for name, wid in points
    }

    resp = session.post(f"{base_url}/batch", json=batch)
    if not resp.ok:
        print(f"Batch read failed: {resp.status_code}")
        return 0

    rows = []
    for name, result in resp.json().items():
        status = result["Status"]
        if status != 200:
            rows.append((
                capture_time, name, None, None, None, False,
                f"HTTP {status}"
            ))
            continue

        content = result["Content"]
        raw = content["Value"]

        # Split the reading into a numeric column and a text column.
        if isinstance(raw, (int, float)):
            numeric_value, text_value = raw, None
        elif isinstance(raw, dict):
            numeric_value, text_value = None, raw.get("Name", str(raw))
        else:
            numeric_value, text_value = None, str(raw)

        rows.append((
            capture_time, name, numeric_value, text_value,
            content["Timestamp"], content.get("Good", True), None
        ))

    insert_sql = (
        "INSERT INTO snapshots "
        "(capture_time, point_name, value, value_text, timestamp, good, error) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    conn.executemany(insert_sql, rows)
    conn.commit()
    return len(rows)


# Usage (call from cron or Task Scheduler)
conn = init_database()
try:
    points = [
        ("Reactor_Temp", reactor_temp_wid),
        ("Feed_Pressure", feed_pressure_wid),
        ("Product_Flow", product_flow_wid),
    ]
    count = capture_snapshot(session, BASE_URL, points, conn)
    print(f"Captured {count} values at {datetime.now(timezone.utc).isoformat()}")
finally:
    # Release the database connection even when the capture raises.
    conn.close()

For production scheduling

Use cron (Linux) or Task Scheduler (Windows) to run this script on a schedule. For orchestrated pipelines, use Apache Airflow, Prefect, or Dagster. Do not use time.sleep() loops in production -- they do not survive restarts and are hard to monitor.

Next steps