Building AI Agents with Claude
Capstone Project 5
Capstone 5 of 5 · 4–6 hours · Domain C: Public Records / UCC

Capstone 5 — Domain C: Production UCC Data Intelligence Platform

The culminating project: build a production-grade UCC data pipeline with multi-layer memory, model routing, full observability, cost optimization, and a 100-case evaluation harness.

Prerequisites

Required Modules

Complete these modules before starting Capstone 5. Each one teaches a production capability you will integrate here:

  • M12 — ReAct Agents: The reasoning loop pattern used by every agent in this pipeline
  • M14 — Multi-Agent Systems: Agent-to-agent handoffs and orchestration patterns (Capstone 4 foundation)
  • M17 — HITL & Guardrails: Human-in-the-loop routing, confidence thresholds, and circuit breakers
  • M18 — Evaluation: Building test suites, automated scoring, and accuracy measurement
  • M19 — Tracing: Distributed tracing with spans, trace collectors, and structured logging
  • M20 — Monitoring: Dashboards, alerting, and real-time metrics for production systems
  • M21 — Deployment: Containerization, queue-based processing, and API design
  • M22 — Cost Optimization: Model routing, prompt caching, and per-request cost tracking

You should also have completed Capstone 4 — Domain C (multi-agent UCC pipeline), as this capstone extends that project with production capabilities.

Project Brief

Business Context

You have built a data pipeline (Capstone 4) that processes UCC filings through four agents. It works — but it does not learn, it does not optimize costs, it is not observable, and it cannot handle production-scale volume. Moving from "it works" to "it runs in production" requires six additional engineering capabilities that no course module teaches in isolation.

The pain of deploying without these capabilities is predictable: costs spiral because every entity resolution uses the most expensive model. Entity resolution accuracy plateaus because the system does not learn from data steward corrections. Bugs are invisible because there is no tracing. And when the pipeline fails at 3 AM on a batch of 10,000 filings, nobody knows what happened or where it failed.

This capstone transforms your pipeline into a production platform. It learns from past corrections (multi-layer memory), routes simple tasks to cheap models (cost optimization), traces every decision for debugging (observability), handles 10,000+ filings per batch (deployment), and validates itself continuously (evaluation). This is the difference between a demo and a system someone depends on.

Production Targets
  • Volume: Process 10,000 filings per batch in < 30 minutes
  • Accuracy: Entity resolution accuracy > 90%
  • Cost: < $0.02 per filing (vs ~$0.08 with single-model approach)
  • Observability: Every LLM call, tool invocation, and agent handoff traced
  • Learning: Data steward corrections improve future resolution confidence
  • Evaluation: 100-case test suite with automated scoring
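
The volume target implies a sustained throughput that is worth working out before sizing workers. The sketch below is plain arithmetic: the worker count of 10 is borrowed from `max_concurrent_workers` in the System Configuration section, and the even split of filings across workers is a simplifying assumption, not a measured figure.

```python
# Back-of-envelope throughput implied by the volume target.
BATCH_SIZE = 10_000      # filings per batch (Production Targets)
TIME_BUDGET_MIN = 30     # minutes allowed per batch
WORKERS = 10             # assumption: max_concurrent_workers from the config


def required_throughput(batch_size: int, minutes: float) -> float:
    """Filings per second the whole pipeline must sustain."""
    return batch_size / (minutes * 60)


def per_filing_budget_seconds(batch_size: int, minutes: float, workers: int) -> float:
    """Wall-clock seconds one worker may spend on one filing,
    assuming filings are divided evenly across workers."""
    return (minutes * 60 * workers) / batch_size


print(f"{required_throughput(BATCH_SIZE, TIME_BUDGET_MIN):.2f} filings/sec")
print(f"{per_filing_budget_seconds(BATCH_SIZE, TIME_BUDGET_MIN, WORKERS):.1f} s per filing per worker")
```

Under these assumptions each worker has roughly 1.8 seconds per filing for all agent steps combined, which gives a rough latency budget to compare against the traces you collect later.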

Environment Setup

What You'll Build

A production-grade UCC data intelligence platform with multi-layer memory, model routing, full observability, cost optimization, and a 100-case evaluation harness. Time estimate: 4–6 hours (difficulty: ★★★★★).

System Requirements

  • Python 3.10+ (check with python --version)
  • pip (check with pip --version)
  • Docker (optional, for deployment steps — check with docker --version)
  • An Anthropic API key with access to Haiku, Sonnet, and Opus models

Install Dependencies

Run these commands to create your project directory, set up a virtual environment, and install everything:

mkdir capstone-5-production-ucc && cd capstone-5-production-ucc
python -m venv venv

# Activate the virtual environment
# On macOS/Linux:
source venv/bin/activate
# On Windows:
# venv\Scripts\activate

pip install anthropic chromadb pydantic fastapi uvicorn celery redis httpx pytest

Set Your API Key

# macOS/Linux:
export ANTHROPIC_API_KEY=your-api-key-here

# To make it permanent, add to ~/.bashrc or ~/.zshrc:
echo 'export ANTHROPIC_API_KEY=your-api-key-here' >> ~/.bashrc

# Windows (Command Prompt):
set ANTHROPIC_API_KEY=your-api-key-here

# Windows (PowerShell):
$env:ANTHROPIC_API_KEY = "your-api-key-here"

# To make it permanent on Windows, use System Environment Variables in Settings

Checkpoint

Verify your setup by running: python -c "import anthropic; print(anthropic.__version__)". You should see a version number like 0.39.0 or higher. If you get a ModuleNotFoundError, ensure your virtual environment is activated.

File Structure

Here is every file you will create in this capstone. Each file has a specific role in the production pipeline:

capstone-5-production-ucc/
├── pipeline/
│   ├── orchestrator.py      # Main production orchestrator — coordinates all agents
│   ├── bronze_layer.py      # Raw data ingestion — reads CSV/XML, validates format
│   ├── silver_layer.py      # Cleaned/standardized data — schema normalization
│   └── gold_layer.py        # Business-ready analytics — risk profiles, aggregations
├── memory/
│   ├── episodic.py          # Episodic memory — past entity resolutions
│   ├── procedural.py        # Procedural memory — learned rules from steward decisions
│   └── working.py           # Working memory — current batch state
├── observability/
│   ├── tracer.py            # Tracing and span collection
│   └── dashboard.py         # Monitoring dashboard — 8 metric panels
├── guardrails/
│   ├── quality_gates.py     # Data quality checks — completeness, consistency
│   └── circuit_breaker.py   # Circuit breaker pattern — auto-halt on failures
├── routing/
│   └── model_router.py      # Model routing — task-to-model mapping + cost tracking
├── mock_data.py             # 15 realistic UCC filings with data quality issues
├── config.py                # Configuration management — routing, memory, deployment
├── test_pipeline.py         # End-to-end tests — 100-case evaluation harness
└── requirements.txt         # All Python dependencies

Mock UCC Data

This mock data file contains 15 realistic UCC filings. Some filings have data quality issues (missing fields, inconsistent formats) that your quality gates must detect. Related filings (amendments, continuations, terminations) reference original filing numbers so you can test the full filing lifecycle.

"""mock_data.py — 15 realistic UCC filings for testing the production pipeline.

Includes clean filings, data quality issues, and related filings
(amendments, continuations, terminations) for lifecycle testing.
"""

MOCK_UCC_FILINGS = [
    # --- CLEAN FILINGS (baseline accuracy tests) ---
    {
        "filing_number": "NY-2024-0012345",
        "debtor_name": "Acme Manufacturing LLC",
        "secured_party_name": "JPMorgan Chase Bank, N.A.",
        "state_code": "NY",
        "collateral_description": "All inventory, equipment, and accounts receivable now owned or hereafter acquired",
        "filing_type": "UCC-1",
        "filing_date": "2024-03-15",
        "expiration_date": "2029-03-15",
    },
    {
        "filing_number": "CA-2024-0098765",
        "debtor_name": "Pacific Coast Distribution Inc.",
        "secured_party_name": "Wells Fargo Bank, N.A.",
        "state_code": "CA",
        "collateral_description": "All assets of the debtor, including but not limited to inventory, chattel paper, and general intangibles",
        "filing_type": "UCC-1",
        "filing_date": "2024-01-22",
        "expiration_date": "2029-01-22",
    },
    {
        "filing_number": "TX-2024-0054321",
        "debtor_name": "Lone Star Energy Partners LP",
        "secured_party_name": "Bank of America, N.A.",
        "state_code": "TX",
        "collateral_description": "All oil and gas extraction equipment, pipeline fixtures, and related proceeds",
        "filing_type": "UCC-1",
        "filing_date": "2024-06-10",
        "expiration_date": "2029-06-10",
    },
    {
        "filing_number": "FL-2023-0033221",
        "debtor_name": "Sunshine Hospitality Group Inc.",
        "secured_party_name": "Truist Financial Corporation",
        "state_code": "FL",
        "collateral_description": "Furniture, fixtures, and equipment located at 1200 Ocean Drive, Miami FL 33139",
        "filing_type": "UCC-1",
        "filing_date": "2023-11-05",
        "expiration_date": "2028-11-05",
    },
    {
        "filing_number": "IL-2024-0077889",
        "debtor_name": "Midwest Agricultural Supply Co.",
        "secured_party_name": "Farm Credit Services of Illinois",
        "state_code": "IL",
        "collateral_description": "All farm products, crops, livestock, and farm equipment",
        "filing_type": "UCC-1",
        "filing_date": "2024-02-28",
        "expiration_date": "2029-02-28",
    },

    # --- RELATED FILINGS (amendment lifecycle) ---
    {
        "filing_number": "NY-2024-0012346",
        "debtor_name": "Acme Manufacturing LLC",
        "secured_party_name": "JPMorgan Chase Bank, N.A.",
        "state_code": "NY",
        "collateral_description": "Amendment to add: all intellectual property, patents, and trademarks",
        "filing_type": "UCC-3 amendment",
        "filing_date": "2024-07-20",
        "expiration_date": "2029-03-15",
        "original_filing_number": "NY-2024-0012345",
    },
    {
        "filing_number": "CA-2024-0098800",
        "debtor_name": "Pacific Coast Distribution Inc.",
        "secured_party_name": "Wells Fargo Bank, N.A.",
        "state_code": "CA",
        "collateral_description": "Continuation of original filing",
        "filing_type": "UCC-3 continuation",
        "filing_date": "2024-09-15",
        "expiration_date": "2029-09-15",
        "original_filing_number": "CA-2024-0098765",
    },
    {
        "filing_number": "FL-2024-0041100",
        "debtor_name": "Sunshine Hospitality Group Inc.",
        "secured_party_name": "Truist Financial Corporation",
        "state_code": "FL",
        "collateral_description": "Termination of all security interests",
        "filing_type": "UCC-3 termination",
        "filing_date": "2024-08-01",
        "expiration_date": None,
        "original_filing_number": "FL-2023-0033221",
    },

    # --- ENTITY RESOLUTION CHALLENGES (name variants) ---
    {
        "filing_number": "NY-2024-0015500",
        "debtor_name": "ACME MFG LLC",
        "secured_party_name": "Citibank, N.A.",
        "state_code": "NY",
        "collateral_description": "All equipment and machinery",
        "filing_type": "UCC-1",
        "filing_date": "2024-04-10",
        "expiration_date": "2029-04-10",
    },
    {
        "filing_number": "DE-2024-0022111",
        "debtor_name": "Acme Manufacturing, L.L.C.",
        "secured_party_name": "Silicon Valley Bank",
        "state_code": "DE",
        "collateral_description": "All assets",
        "filing_type": "UCC-1",
        "filing_date": "2024-05-18",
        "expiration_date": "2029-05-18",
    },

    # --- DATA QUALITY ISSUES (for quality gate testing) ---
    {
        "filing_number": "OH-2024-0066000",
        "debtor_name": "",
        "secured_party_name": "KeyBank National Association",
        "state_code": "OH",
        "collateral_description": "All inventory and accounts receivable",
        "filing_type": "UCC-1",
        "filing_date": "2024-03-01",
        "expiration_date": "2029-03-01",
    },
    {
        "filing_number": "PA-2024-0088999",
        "debtor_name": "Liberty Manufacturing Corp",
        "secured_party_name": "",
        "state_code": "PA",
        "collateral_description": "All assets of the debtor",
        "filing_type": "UCC-1",
        "filing_date": "2024-07-15",
        "expiration_date": "2029-07-15",
    },
    {
        "filing_number": "WA-2024-007",
        "debtor_name": "Cascade Tech Solutions Inc.",
        "secured_party_name": "Columbia State Bank",
        "state_code": "WA",
        "collateral_description": "All accounts, inventory, equipment",
        "filing_type": "UCC-1",
        "filing_date": "2024/08/30",
        "expiration_date": "2029/08/30",
    },
    {
        "filing_number": "GA-2024-0044556",
        "debtor_name": "Peachtree Holdings LLC",
        "secured_party_name": "Synovus Financial Corp",
        "state_code": "GA",
        "collateral_description": "",
        "filing_type": "UCC-1",
        "filing_date": "2024-09-12",
        "expiration_date": "2024-09-12",
    },
    {
        "filing_number": "NV-2024-0055667",
        "debtor_name": "Silver State Gaming Corp\t",
        "secured_party_name": "Nevada State\nBank",
        "state_code": "NV",
        "collateral_description": "Gaming equipment, slot machines, and related fixtures at 500 Las Vegas Blvd",
        "filing_type": "UCC1",
        "filing_date": "2024-10-01",
        "expiration_date": "2029-10-01",
    },
]


# --- EXPECTED QUALITY ISSUES (for testing quality gates) ---
EXPECTED_QUALITY_ISSUES = {
    "OH-2024-0066000": ["missing_debtor_name"],
    "PA-2024-0088999": ["missing_secured_party_name"],
    "WA-2024-007": ["non_standard_filing_number", "non_standard_date_format"],
    "GA-2024-0044556": ["missing_collateral_description", "expiration_equals_filing_date"],
    "NV-2024-0055667": ["whitespace_in_names", "non_standard_filing_type"],
}


# --- ENTITY RESOLUTION GROUND TRUTH ---
ENTITY_CLUSTERS = {
    "ENT-00001": {
        "canonical_name": "Acme Manufacturing LLC",
        "variants": [
            "Acme Manufacturing LLC",
            "ACME MFG LLC",
            "Acme Manufacturing, L.L.C.",
        ],
        "filing_numbers": ["NY-2024-0012345", "NY-2024-0012346", "NY-2024-0015500", "DE-2024-0022111"],
    },
    "ENT-00002": {
        "canonical_name": "Pacific Coast Distribution Inc.",
        "variants": ["Pacific Coast Distribution Inc."],
        "filing_numbers": ["CA-2024-0098765", "CA-2024-0098800"],
    },
    "ENT-00003": {
        "canonical_name": "Sunshine Hospitality Group Inc.",
        "variants": ["Sunshine Hospitality Group Inc."],
        "filing_numbers": ["FL-2023-0033221", "FL-2024-0041100"],
    },
}


if __name__ == "__main__":
    print(f"Total filings: {len(MOCK_UCC_FILINGS)}")
    print(f"Expected quality issues: {len(EXPECTED_QUALITY_ISSUES)} filings")
    print(f"Entity clusters: {len(ENTITY_CLUSTERS)} entities")
    for fid, issues in EXPECTED_QUALITY_ISSUES.items():
        print(f"  {fid}: {', '.join(issues)}")
/**
 * mock_data.ts — 15 realistic UCC filings for testing the production pipeline.
 */

interface UCCFiling {
  filing_number: string;
  debtor_name: string;
  secured_party_name: string;
  state_code: string;
  collateral_description: string;
  filing_type: string;
  filing_date: string;
  expiration_date: string | null;
  original_filing_number?: string;
}

export const MOCK_UCC_FILINGS: UCCFiling[] = [
  // --- CLEAN FILINGS ---
  {
    filing_number: "NY-2024-0012345",
    debtor_name: "Acme Manufacturing LLC",
    secured_party_name: "JPMorgan Chase Bank, N.A.",
    state_code: "NY",
    collateral_description: "All inventory, equipment, and accounts receivable now owned or hereafter acquired",
    filing_type: "UCC-1",
    filing_date: "2024-03-15",
    expiration_date: "2029-03-15",
  },
  {
    filing_number: "CA-2024-0098765",
    debtor_name: "Pacific Coast Distribution Inc.",
    secured_party_name: "Wells Fargo Bank, N.A.",
    state_code: "CA",
    collateral_description: "All assets of the debtor, including but not limited to inventory, chattel paper, and general intangibles",
    filing_type: "UCC-1",
    filing_date: "2024-01-22",
    expiration_date: "2029-01-22",
  },
  // ... (remaining 13 filings follow same structure as Python version)
];

export const EXPECTED_QUALITY_ISSUES: Record<string, string[]> = {
  "OH-2024-0066000": ["missing_debtor_name"],
  "PA-2024-0088999": ["missing_secured_party_name"],
  "WA-2024-007": ["non_standard_filing_number", "non_standard_date_format"],
  "GA-2024-0044556": ["missing_collateral_description", "expiration_equals_filing_date"],
  "NV-2024-0055667": ["whitespace_in_names", "non_standard_filing_type"],
};

export const ENTITY_CLUSTERS: Record<string, { canonical_name: string; variants: string[]; filing_numbers: string[] }> = {
  "ENT-00001": {
    canonical_name: "Acme Manufacturing LLC",
    variants: ["Acme Manufacturing LLC", "ACME MFG LLC", "Acme Manufacturing, L.L.C."],
    filing_numbers: ["NY-2024-0012345", "NY-2024-0012346", "NY-2024-0015500", "DE-2024-0022111"],
  },
};
Data Quality Issues to Detect

OH-2024-0066000: Empty debtor name — your quality gate should reject or flag this filing.

PA-2024-0088999: Empty secured party — cannot determine who holds the lien.

WA-2024-007: Filing number too short (should be 7 digits after state-year prefix) and dates use slashes instead of hyphens.

GA-2024-0044556: Empty collateral description and expiration date equals filing date (likely data entry error).

NV-2024-0055667: Tab character in debtor name, newline in secured party name, and filing type "UCC1" instead of "UCC-1".
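
Some of these issues are purely cosmetic and could in principle be repaired before re-validation, while others (an empty debtor name, a truncated filing number) cannot be fixed without going back to the source. The sketch below illustrates the repairable subset. `normalize_filing` is a hypothetical helper, not part of the capstone files, and the mapping of "UCC1" to "UCC-1" covers only the one variant that appears in the mock data.

```python
import re


def normalize_filing(filing: dict) -> dict:
    """Return a cleaned copy of a filing; the input dict is not modified."""
    cleaned = dict(filing)

    # Collapse tabs, newlines, and repeated spaces in party names.
    for field in ("debtor_name", "secured_party_name"):
        value = cleaned.get(field)
        if isinstance(value, str):
            cleaned[field] = " ".join(value.split())

    # Rewrite YYYY/MM/DD dates as YYYY-MM-DD; leave anything else untouched.
    for field in ("filing_date", "expiration_date"):
        value = cleaned.get(field)
        if isinstance(value, str) and re.fullmatch(r"\d{4}/\d{2}/\d{2}", value):
            cleaned[field] = value.replace("/", "-")

    # Assumption: "UCC1" is the only non-standard filing type to map.
    if cleaned.get("filing_type") == "UCC1":
        cleaned["filing_type"] = "UCC-1"

    return cleaned


raw = {
    "debtor_name": "Silver State Gaming Corp\t",
    "secured_party_name": "Nevada State\nBank",
    "filing_type": "UCC1",
    "filing_date": "2024/08/30",
    "expiration_date": None,
}
print(normalize_filing(raw))
```

Whether to repair silently or to flag and repair is a design choice; keeping the original record alongside the cleaned copy makes either option auditable.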

Six Production Pillars

The Six Pillars of a Production Agent System

  • Multi-Layer Memory: Working + Episodic + Procedural memory tiers
  • Advanced RAG: Hybrid search, re-ranking, context compression
  • Observability: Distributed tracing, metrics dashboard, alerting
  • Cost Optimization: Model routing, prompt caching, compression
  • Deployment: Docker, queue-based async, streaming API
  • Evaluation: 100-case test suite, automated scoring

System Configuration

{
  "model_routing": {
    "collateral_classification": "claude-opus-4-7",
    "entity_resolution": "claude-sonnet-4-6",
    "complex_entity_disambiguation": "claude-opus-4-7",
    "report_generation": "claude-sonnet-4-6",
    "routing_threshold": {
      "simple_classification": 0.90,
      "ambiguous_entity_candidate_count": 3
    }
  },
  "cost_targets": {
    "haiku_per_1k_input": 0.001,
    "sonnet_per_1k_input": 0.003,
    "opus_per_1k_input": 0.015,
    "target_per_filing": 0.02
  },
  "deployment": {
    "runtime": "Docker",
    "api_framework": "FastAPI",
    "queue": "Celery + Redis",
    "vector_db": "ChromaDB",
    "max_batch_size": 10000,
    "max_concurrent_workers": 10
  }
}
{
  "memory": {
    "working": {
      "type": "in_process",
      "max_context_tokens": 4000,
      "purpose": "Current batch state, active entity being resolved"
    },
    "episodic": {
      "type": "vector_store",
      "collection": "resolution_episodes",
      "retention_days": 730,
      "purpose": "Past entity resolutions + steward corrections",
      "example": {
        "episode_id": "RE-2024-11234",
        "debtor_name_raw": "ABC MANUFACTURING CO",
        "resolved_to": "ENT-00567",
        "steward_action": "match_to_ENT-00567",
        "lesson": "Same registered agent + address = match despite suffix"
      }
    },
    "procedural": {
      "type": "rule_store",
      "purpose": "Learned rules from repeated steward decisions",
      "example": {
        "rule_id": "ER-001",
        "confidence": 0.92,
        "rule": "When entities share registered agent AND address, match with confidence > 85%",
        "created_from": ["RE-2024-11234", "RE-2024-11301", "RE-2024-11455"]
      }
    }
  }
}
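
One way the `routing_threshold` settings might be consumed is sketched below. The interpretation is an assumption: the configuration does not say whether `ambiguous_entity_candidate_count` is an inclusive limit, so this sketch treats "at or above the limit" as ambiguous. `pick_entity_model` is a hypothetical helper, and only the keys it needs are copied from the configuration above.

```python
import json

# Trimmed copy of the model_routing section shown above.
CONFIG_JSON = """
{
  "model_routing": {
    "entity_resolution": "claude-sonnet-4-6",
    "complex_entity_disambiguation": "claude-opus-4-7",
    "routing_threshold": {"ambiguous_entity_candidate_count": 3}
  }
}
"""


def pick_entity_model(config: dict, candidate_count: int) -> str:
    """Choose a model for entity resolution from the number of candidate matches."""
    routing = config["model_routing"]
    limit = routing["routing_threshold"]["ambiguous_entity_candidate_count"]
    # Assumption: a candidate count at or above the limit counts as ambiguous.
    if candidate_count >= limit:
        return routing["complex_entity_disambiguation"]
    return routing["entity_resolution"]


config = json.loads(CONFIG_JSON)
print(pick_entity_model(config, candidate_count=1))
print(pick_entity_model(config, candidate_count=4))
```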

Cost Optimization: Model Routing

Not every task needs Claude Opus. Format validation and quality checks are handled by Haiku at $1.00/M input tokens. Standard entity resolution and risk profiling go to Sonnet at $3.00/M input tokens. Collateral classification — which requires nuanced legal reasoning about UCC categories — routes to Opus at $15.00/M input tokens. This tiered approach cuts the average cost per filing from ~$0.08 to ~$0.04.

Model Routing — Right Model for the Right Task
Task                                                Model   Est. cost
Classify collateral: "All inventory and equipment"  Opus    $0.045
Classify collateral: "All assets of the debtor"     Opus    $0.045
Resolve: "Acme LLC" vs "ACME L.L.C." (high conf)    Sonnet  $0.008
Resolve: "Acme LLC" vs "Acme Inc" (ambiguous)       Sonnet  $0.012
Disambiguate: 4 candidates, conflicting registries  Opus    $0.045
Generate risk report for portfolio                  Sonnet  $0.015

Cost Impact

A 10,000-filing batch with single-model (Sonnet) costs ~$800. With model routing: 70% of tasks go to Haiku ($70), 25% to Sonnet ($200), 5% to Opus ($75). Total: ~$345 — 57% cost reduction. Over 12 monthly batches across 50 states, that is ~$273,000 in annual savings.
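
The figures in this paragraph can be reproduced with a few lines of arithmetic. The sketch below uses only the numbers quoted above; the variable names are illustrative.

```python
# Reproduce the Cost Impact arithmetic from the figures quoted above.
FILINGS_PER_BATCH = 10_000
SINGLE_MODEL_BATCH_COST = 800.0                                 # dollars per batch, Sonnet only
ROUTED_COSTS = {"haiku": 70.0, "sonnet": 200.0, "opus": 75.0}   # dollars per batch, by tier
BATCHES_PER_YEAR = 12
STATES = 50

routed_total = sum(ROUTED_COSTS.values())
reduction = 1 - routed_total / SINGLE_MODEL_BATCH_COST
annual_savings = (SINGLE_MODEL_BATCH_COST - routed_total) * BATCHES_PER_YEAR * STATES
cost_per_filing = routed_total / FILINGS_PER_BATCH

print(f"Routed batch cost: ${routed_total:,.0f}")
print(f"Cost reduction:    {reduction:.0%}")
print(f"Annual savings:    ${annual_savings:,.0f}")
print(f"Cost per filing:   ${cost_per_filing:.4f}")
```

At these figures the routed cost works out to about $0.0345 per filing, which is still above the $0.02 target, so the remaining reduction would presumably have to come from the memory and caching pillars.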

Multi-Layer Memory

The system uses three memory tiers that work together to improve over time:

  • Working Memory: In-process state for the current batch and the entity being resolved. Limited to 4,000 tokens and cleared after each batch completes. Think of it as the agent's "short-term memory": what it is currently working on.
  • Episodic Memory: A vector store of past entity resolution decisions, including steward corrections. When the system encounters a new entity such as "ABC Manufacturing Co", it searches for similar past cases. If a steward previously matched this name to ENT-00567, the system retrieves that lesson; if a similar case was resolved with high confidence, the system can boost its own confidence without needing a steward.
  • Procedural Memory: Rules distilled from repeated steward decisions. After 3+ episodes that share the same pattern (e.g., "same registered agent + same address = match"), the system creates a procedural rule that fires deterministically, with no LLM call needed.

Why It Matters

Without memory, the system resolves the same entity variant the same expensive, slow way every time. With memory, the first time "ABC Manufacturing Co" appears, it costs about $0.08 in model spend for the Sonnet resolution and also goes to a steward for review ($2–5 per review). The second time, episodic memory boosts confidence above 80%, so the steward review is skipped. By the tenth time, a procedural rule fires and resolves it with zero LLM cost. Across 100,000 entities per year, the avoided reviews and LLM calls add up in both cost and steward time.

Observability Dashboard

Real-Time Dashboard — 8 Metric Panels
  • Throughput: 1,247 filings/hr
  • Entity accuracy: 91.3%
  • Steward queue: 14
  • Batch quality: A-
  • Cost per filing: $0.018
  • Model mix (Haiku/Sonnet/Opus): 72/18/10 %
  • Cache hit rate: 48%
  • States covered: 42/50
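
Two of these panels, cost per filing and model mix, can be derived directly from per-call records. The sketch below shows one possible aggregation. The record shape (`model_tier`, `cost`) and the `summarize_calls` helper are assumptions made for illustration, and the sample numbers are made up rather than taken from a real run.

```python
from collections import Counter

TIERS = ("haiku", "sonnet", "opus")


def summarize_calls(calls: list[dict], filings_processed: int) -> dict:
    """Aggregate per-call records into cost-per-filing and model-mix percentages."""
    tier_counts = Counter(call["model_tier"] for call in calls)
    total_calls = sum(tier_counts.values())
    total_cost = sum(call["cost"] for call in calls)
    mix = {
        tier: round(100 * tier_counts[tier] / total_calls) if total_calls else 0
        for tier in TIERS
    }
    cost_per_filing = total_cost / filings_processed if filings_processed else 0.0
    return {"cost_per_filing": round(cost_per_filing, 4), "model_mix_pct": mix}


# Hypothetical sample: 100 calls spread over 20 filings.
sample_calls = (
    [{"model_tier": "haiku", "cost": 0.001}] * 72
    + [{"model_tier": "sonnet", "cost": 0.005}] * 18
    + [{"model_tier": "opus", "cost": 0.02}] * 10
)
summary = summarize_calls(sample_calls, filings_processed=20)
print(summary)
```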

Step-by-Step Build Guide

What You'll Build

A complete production UCC data pipeline with 6 capabilities: model routing, multi-layer memory, data quality gates, circuit breakers, distributed tracing, and a 100-case evaluation harness. Time: 4–6 hours. Difficulty: ★★★★★.

Now that you understand the architecture and have your environment set up, it is time to build. Each step creates one file, includes a run command, and shows expected output. Follow them in order — later steps depend on earlier ones.

Step 1: Create the Configuration File

What & Why: Every production system needs centralized configuration. This file holds model routing rules, memory settings, quality thresholds, and cost targets. By keeping configuration separate from code, you can tune the system without redeploying.

Create a new file called config.py:

"""config.py — Centralized configuration for the production UCC pipeline."""

# --- Model Routing ---
MODEL_ROUTING = {
    "format_validation":       {"model": "claude-haiku-4-5-20251001", "max_tokens": 1024},
    "quality_check":           {"model": "claude-haiku-4-5-20251001", "max_tokens": 2048},
    "entity_resolution":       {"model": "claude-sonnet-4-6",        "max_tokens": 4096},
    "anomaly_analysis":        {"model": "claude-sonnet-4-6",        "max_tokens": 4096},
    "risk_profile":            {"model": "claude-sonnet-4-6",        "max_tokens": 4096},
    "collateral_classification": {"model": "claude-opus-4-7",        "max_tokens": 4096},
}

MODEL_COSTS_PER_1M = {
    "claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00},
    "claude-sonnet-4-6":         {"input": 3.00, "output": 15.00},
    "claude-opus-4-7":           {"input": 15.00, "output": 75.00},
}

# --- Memory ---
EPISODIC_SIMILARITY_THRESHOLD = 0.80
PROCEDURAL_RULE_MIN_EPISODES = 3
PROCEDURAL_RULE_MIN_CONFIDENCE = 0.85

# --- Quality Gates ---
QUALITY_REQUIRED_FIELDS = ["filing_number", "debtor_name", "secured_party_name",
                           "state_code", "collateral_description", "filing_date"]
CIRCUIT_BREAKER_ERROR_THRESHOLD = 0.10  # Trip at 10% error rate
CIRCUIT_BREAKER_WINDOW_SIZE = 100       # Check every 100 records

# --- Cost Targets ---
TARGET_COST_PER_FILING = 0.02  # $0.02 max per filing


if __name__ == "__main__":
    print("Configuration loaded successfully.")
    print(f"  Model routing rules: {len(MODEL_ROUTING)}")
    print(f"  Cost models tracked: {len(MODEL_COSTS_PER_1M)}")
    print(f"  Quality required fields: {len(QUALITY_REQUIRED_FIELDS)}")
    print(f"  Circuit breaker threshold: {CIRCUIT_BREAKER_ERROR_THRESHOLD*100}%")

Run: python config.py

Expected output:

Configuration loaded successfully.
  Model routing rules: 6
  Cost models tracked: 3
  Quality required fields: 6
  Circuit breaker threshold: 10.0%
Checkpoint

If you see the output above, Step 1 is working. If you get an IndentationError, check that you copied the entire file. If you get a SyntaxError, ensure you are using Python 3.10+.
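
To make the two circuit-breaker settings concrete, here is one possible reading of them as a sliding-window breaker. It is a sketch under assumptions: the config comments say "trip at 10% error rate" and "check every 100 records", and this version evaluates the most recent 100 outcomes once the window is full. The `CircuitBreaker` class here is illustrative and is not the capstone's `guardrails/circuit_breaker.py`.

```python
from collections import deque


class CircuitBreaker:
    """Trips when the error rate over the last `window_size` records
    reaches `error_threshold`. Stays tripped until reset() is called."""

    def __init__(self, error_threshold: float = 0.10, window_size: int = 100) -> None:
        self.error_threshold = error_threshold
        self.window: deque[bool] = deque(maxlen=window_size)
        self.tripped = False

    def record(self, success: bool) -> bool:
        """Record one outcome; return True if the breaker is tripped."""
        self.window.append(success)
        # Assumption: only evaluate once a full window of records is available.
        if len(self.window) == self.window.maxlen:
            error_rate = self.window.count(False) / len(self.window)
            if error_rate >= self.error_threshold:
                self.tripped = True
        return self.tripped

    def reset(self) -> None:
        """Clear history after the underlying problem has been addressed."""
        self.window.clear()
        self.tripped = False


breaker = CircuitBreaker()
for i in range(100):
    breaker.record(success=(i % 10 != 0))   # every tenth record fails
print(f"Tripped: {breaker.tripped}")
```

Waiting for a full window avoids tripping on the first failure of a batch, at the cost of reacting more slowly to a burst of early errors.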

Step 2: Build the Model Router

What & Why: The model router selects the cheapest capable model for each task type. Format validation goes to Haiku ($1.00/M input tokens). Entity resolution goes to Sonnet ($3.00/M). Collateral classification — which requires nuanced legal reasoning about UCC categories — routes to Opus ($15.00/M input). This tiered approach cuts cost per filing from ~$0.08 to ~$0.04.

Create a new file called routing/model_router.py (create the routing/ directory first):

"""routing/model_router.py — Selects cheapest capable model per task."""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import MODEL_ROUTING, MODEL_COSTS_PER_1M


def route_to_model(task_type: str, complexity: str = "standard") -> dict:
    """Select the cheapest capable model for a given task type.

    Args:
        task_type: One of the keys in MODEL_ROUTING.
        complexity: "standard" or "high". High complexity upgrades
                    Haiku tasks to Sonnet for deeper analysis.

    Returns:
        Dict with model, max_tokens, and reasoning.
    """
    if task_type not in MODEL_ROUTING:
        print(f"Warning: Unknown task '{task_type}', defaulting to Sonnet")
        return {
            "model": "claude-sonnet-4-6",
            "max_tokens": 4096,
            "reasoning": "default fallback for unknown task",
        }

    rule = MODEL_ROUTING[task_type]

    # Escalate Haiku to Sonnet when complexity is high
    if complexity == "high" and rule["model"].startswith("claude-haiku"):
        return {
            "model": "claude-sonnet-4-6",
            "max_tokens": 4096,
            "reasoning": "upgraded from Haiku due to high complexity",
        }

    return {
        **rule,
        "reasoning": f"routed {task_type} to {rule['model']}",
    }


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate cost for a single API call in dollars."""
    costs = MODEL_COSTS_PER_1M.get(model, {"input": 3.00, "output": 15.00})
    return (input_tokens / 1_000_000 * costs["input"] +
            output_tokens / 1_000_000 * costs["output"])


if __name__ == "__main__":
    tasks = [
        ("format_validation", "standard"),
        ("entity_resolution", "standard"),
        ("collateral_classification", "standard"),
        ("format_validation", "high"),
    ]
    for task_type, complexity in tasks:
        routing = route_to_model(task_type, complexity)
        cost = estimate_cost(routing["model"], 500, 200)
        print(f"{task_type} ({complexity}): {routing['model']}")
        print(f"  Reason: {routing['reasoning']}")
        print(f"  Est. cost: ${cost:.6f}")
/**
 * routing/model_router.ts — Selects cheapest capable model per task.
 */

const MODEL_ROUTING: Record<string, { model: string; maxTokens: number }> = {
  format_validation:       { model: "claude-haiku-4-5-20251001", maxTokens: 1024 },
  quality_check:           { model: "claude-haiku-4-5-20251001", maxTokens: 2048 },
  entity_resolution:       { model: "claude-sonnet-4-6",        maxTokens: 4096 },
  collateral_classification: { model: "claude-opus-4-7",        maxTokens: 4096 },
};

const MODEL_COSTS: Record<string, { input: number; output: number }> = {
  "claude-haiku-4-5-20251001": { input: 1.00, output: 5.00 },
  "claude-sonnet-4-6":         { input: 3.00, output: 15.00 },
  "claude-opus-4-7":           { input: 15.00, output: 75.00 },
};

interface RoutingResult {
  model: string;
  maxTokens: number;
  reasoning: string;
}

export function routeToModel(
  taskType: string,
  complexity: "standard" | "high" = "standard"
): RoutingResult {
  if (!(taskType in MODEL_ROUTING)) {
    return { model: "claude-sonnet-4-6", maxTokens: 4096, reasoning: "default fallback" };
  }
  const rule = MODEL_ROUTING[taskType];
  if (complexity === "high" && rule.model.startsWith("claude-haiku")) {
    return { model: "claude-sonnet-4-6", maxTokens: 4096, reasoning: "upgraded from Haiku" };
  }
  return { ...rule, reasoning: `routed ${taskType} to ${rule.model}` };
}

export function estimateCost(model: string, inputTokens: number, outputTokens: number): number {
  const costs = MODEL_COSTS[model] ?? { input: 3.00, output: 15.00 };
  return inputTokens / 1_000_000 * costs.input + outputTokens / 1_000_000 * costs.output;
}

Run: python routing/model_router.py

Expected output:

format_validation (standard): claude-haiku-4-5-20251001
  Reason: routed format_validation to claude-haiku-4-5-20251001
  Est. cost: $0.001500
entity_resolution (standard): claude-sonnet-4-6
  Reason: routed entity_resolution to claude-sonnet-4-6
  Est. cost: $0.004500
collateral_classification (standard): claude-opus-4-7
  Reason: routed collateral_classification to claude-opus-4-7
  Est. cost: $0.022500
format_validation (high): claude-sonnet-4-6
  Reason: upgraded from Haiku due to high complexity
  Est. cost: $0.004500
Checkpoint

You should see four routing decisions with estimated costs. Notice how format validation costs $0.0015 with Haiku but $0.0045 when escalated to Sonnet. Collateral classification costs $0.0225 with Opus, 15x more than Haiku. This is why routing matters. If you get an ImportError, make sure config.py exists in the parent directory and you created an empty routing/__init__.py file.
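
Per-call estimates become more useful once they are accumulated per filing and compared with the cost target. The sketch below shows one way to do that. `CostTracker` is a hypothetical class, the price table and the $0.02 target are copied from config.py so the block runs on its own, and unknown models fall back to Sonnet pricing in the same way `estimate_cost` does.

```python
# Copied from config.py so this sketch is self-contained.
MODEL_COSTS_PER_1M = {
    "claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00},
    "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
    "claude-opus-4-7": {"input": 15.00, "output": 75.00},
}
TARGET_COST_PER_FILING = 0.02


class CostTracker:
    """Accumulates estimated API cost across calls and filings."""

    def __init__(self) -> None:
        self.total_cost = 0.0
        self.filings = 0
        self.cost_by_model: dict[str, float] = {}

    def add_call(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Add one call's estimated cost in dollars and return it."""
        rates = MODEL_COSTS_PER_1M.get(model, MODEL_COSTS_PER_1M["claude-sonnet-4-6"])
        cost = (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000
        self.total_cost += cost
        self.cost_by_model[model] = self.cost_by_model.get(model, 0.0) + cost
        return cost

    def finish_filing(self) -> None:
        """Mark one filing as fully processed."""
        self.filings += 1

    def cost_per_filing(self) -> float:
        return self.total_cost / self.filings if self.filings else 0.0

    def within_target(self) -> bool:
        return self.cost_per_filing() <= TARGET_COST_PER_FILING


tracker = CostTracker()
tracker.add_call("claude-haiku-4-5-20251001", 500, 200)   # format validation
tracker.add_call("claude-sonnet-4-6", 500, 200)           # entity resolution
tracker.finish_filing()
print(f"Cost per filing: ${tracker.cost_per_filing():.4f}, within target: {tracker.within_target()}")
```

Note that a single Opus call at 500 input and 200 output tokens already costs about $0.0225, so any filing routed through Opus exceeds the per-filing target on its own under these token counts.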

Step 3: Build the Quality Gates

What & Why: Before any filing enters the pipeline, it must pass data quality checks. Missing debtor names, empty collateral descriptions, and malformed dates cause downstream failures. Quality gates catch these issues early and flag them for remediation instead of silently producing bad data.

Create a new file called guardrails/quality_gates.py:

"""guardrails/quality_gates.py — Data quality checks for UCC filings."""
import re
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import QUALITY_REQUIRED_FIELDS


def validate_filing(filing: dict) -> dict:
    """Validate a single UCC filing. Returns dict with is_valid and issues list."""
    issues = []

    # Check required fields
    for field in QUALITY_REQUIRED_FIELDS:
        value = filing.get(field, "")
        if not value or not str(value).strip():
            issues.append(f"missing_{field}")

    # Check filing number format: XX-YYYY-NNNNNNN
    fn = filing.get("filing_number", "")
    if fn and not re.match(r"^[A-Z]{2}-\d{4}-\d{7}$", fn):
        issues.append("non_standard_filing_number")

    # Check date format: YYYY-MM-DD
    for date_field in ["filing_date", "expiration_date"]:
        val = filing.get(date_field)
        if val and not re.match(r"^\d{4}-\d{2}-\d{2}$", str(val)):
            issues.append("non_standard_date_format")

    # Check for whitespace issues in text fields
    for text_field in ["debtor_name", "secured_party_name"]:
        val = filing.get(text_field, "")
        if val and (val != val.strip() or "\t" in val or "\n" in val):
            issues.append("whitespace_in_names")

    # Check filing type standardization
    valid_types = {"UCC-1", "UCC-3 amendment", "UCC-3 continuation", "UCC-3 termination"}
    if filing.get("filing_type") and filing["filing_type"] not in valid_types:
        issues.append("non_standard_filing_type")

    # Check expiration vs filing date
    if (filing.get("expiration_date") and filing.get("filing_date") and
            filing["expiration_date"] == filing["filing_date"]):
        issues.append("expiration_equals_filing_date")

    return {
        "filing_number": filing.get("filing_number", "UNKNOWN"),
        "is_valid": len(issues) == 0,
        "issues": issues,
    }


def validate_batch(filings: list[dict]) -> dict:
    """Validate a batch of filings. Returns summary with per-filing results."""
    results = [validate_filing(f) for f in filings]
    valid = sum(1 for r in results if r["is_valid"])
    invalid = len(results) - valid
    error_rate = invalid / len(results) if results else 0
    return {
        "total": len(results),
        "valid": valid,
        "invalid": invalid,
        "error_rate": round(error_rate, 4),
        "results": results,
    }


if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS, EXPECTED_QUALITY_ISSUES
    summary = validate_batch(MOCK_UCC_FILINGS)
    print(f"Batch validation: {summary['valid']}/{summary['total']} valid "
          f"({summary['error_rate']*100:.1f}% error rate)")
    for r in summary["results"]:
        if not r["is_valid"]:
            print(f"  INVALID {r['filing_number']}: {', '.join(r['issues'])}")

Run: python guardrails/quality_gates.py

Expected output:

Batch validation: 10/15 valid (33.3% error rate)
  INVALID OH-2024-0066000: missing_debtor_name
  INVALID PA-2024-0088999: missing_secured_party_name
  INVALID WA-2024-007: non_standard_filing_number, non_standard_date_format
  INVALID GA-2024-0044556: missing_collateral_description, expiration_equals_filing_date
  INVALID NV-2024-0055667: whitespace_in_names, non_standard_filing_type
Checkpoint

You should see 5 invalid filings detected, each with specific issues matching the EXPECTED_QUALITY_ISSUES in mock_data.py. If you see fewer than 5, check that your regex patterns are correct. If you get an ImportError on mock_data, ensure mock_data.py is in the project root directory.
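One limitation worth knowing: the date regex in validate_filing checks only the shape of the string, so a value such as 2024-13-45 would pass. If calendar validity matters for your data, one option is to parse the value as well. A minimal sketch, assuming a hypothetical helper named is_calendar_date that is not part of the original module:

```python
from datetime import datetime


def is_calendar_date(value) -> bool:
    """Return True only if value is a real calendar date written as YYYY-MM-DD."""
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError covers None or non-string input; ValueError covers bad dates.
        return False
    # strptime tolerates unpadded fields such as "2024-1-5"; the round-trip rejects them.
    return parsed.strftime("%Y-%m-%d") == value


if __name__ == "__main__":
    for candidate in ("2024-02-29", "2023-02-29", "2024-13-45"):
        print(candidate, is_calendar_date(candidate))
```

Whether to treat an impossible date as a quarantine reason or only as a warning is a policy choice; the sketch only shows how the check could be made.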

Step 4: Build the Circuit Breaker

What & Why: When a batch has too many errors, continuing to process wastes API calls and produces unreliable data. The circuit breaker monitors the error rate and automatically halts processing when it exceeds a threshold (10% by default). This prevents cost spirals on bad data and alerts the team immediately.

Create a new file called guardrails/circuit_breaker.py:

"""guardrails/circuit_breaker.py — Auto-halt pipeline on excessive errors."""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CIRCUIT_BREAKER_ERROR_THRESHOLD, CIRCUIT_BREAKER_WINDOW_SIZE


class CircuitBreaker:
    """Monitors error rate and trips when threshold is exceeded."""

    def __init__(
        self,
        threshold: float = CIRCUIT_BREAKER_ERROR_THRESHOLD,
        window_size: int = CIRCUIT_BREAKER_WINDOW_SIZE,
    ):
        self.threshold = threshold
        self.window_size = window_size
        self._results: list[bool] = []  # True = success, False = failure
        self._tripped = False
        self._trip_reason = ""

    @property
    def is_tripped(self) -> bool:
        return self._tripped

    @property
    def trip_reason(self) -> str:
        return self._trip_reason

    def record(self, success: bool) -> None:
        """Record a processing result. Trips if error rate exceeds threshold."""
        if self._tripped:
            return
        self._results.append(success)
        if len(self._results) >= self.window_size:
            window = self._results[-self.window_size:]
            error_rate = sum(1 for r in window if not r) / len(window)
            if error_rate > self.threshold:
                self._tripped = True
                self._trip_reason = (
                    f"Error rate {error_rate:.1%} exceeded threshold "
                    f"{self.threshold:.1%} over last {self.window_size} records"
                )

    def reset(self) -> None:
        """Reset the circuit breaker after manual review."""
        self._tripped = False
        self._trip_reason = ""
        self._results.clear()

    def status(self) -> dict:
        total = len(self._results)
        errors = sum(1 for r in self._results if not r)
        return {
            "tripped": self._tripped,
            "reason": self._trip_reason,
            "total_processed": total,
            "errors": errors,
            "error_rate": round(errors / total, 4) if total else 0,
        }


if __name__ == "__main__":
    cb = CircuitBreaker(threshold=0.10, window_size=10)

    # Simulate processing: 8 successes then 3 failures
    for i in range(8):
        cb.record(True)
        print(f"Record {i+1}: success | tripped={cb.is_tripped}")
    for i in range(3):
        cb.record(False)
        print(f"Record {8+i+1}: FAILURE | tripped={cb.is_tripped}")

    print(f"\nCircuit breaker status: {cb.status()}")

Run: python guardrails/circuit_breaker.py

Expected output:

Record 1: success | tripped=False
Record 2: success | tripped=False
...
Record 8: success | tripped=False
Record 9: FAILURE | tripped=False
Record 10: FAILURE | tripped=True
Record 11: FAILURE | tripped=True

Circuit breaker status: {'tripped': True, 'reason': 'Error rate 20.0% exceeded threshold 10.0% over last 10 records', 'total_processed': 10, 'errors': 2, 'error_rate': 0.2}
Checkpoint

The circuit breaker should trip on record 10 — that is the first record where the window has 10 entries (so the error-rate check runs) and 2 of those 10 are failures (20% > 10% threshold). If it trips earlier or later, check your window_size parameter.
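The CircuitBreaker above keeps every result in a list, which is fine for a demo but grows without bound on a long-running batch. A possible variant, sketched here under the assumption that you only need the sliding window itself, uses collections.deque with maxlen. The class name WindowedBreaker is hypothetical and not part of the course code.

```python
from collections import deque


class WindowedBreaker:
    """Hypothetical variant of CircuitBreaker that stores only the last N results."""

    def __init__(self, threshold: float = 0.10, window_size: int = 10):
        self.threshold = threshold
        self.window_size = window_size
        self._window: deque = deque(maxlen=window_size)
        self.tripped = False

    def record(self, success: bool) -> None:
        if self.tripped:
            return
        self._window.append(success)  # the oldest entry drops off automatically
        if len(self._window) == self.window_size:
            failures = sum(1 for ok in self._window if not ok)
            # Strict comparison: exactly 10% failures does not trip a 10% threshold.
            if failures / self.window_size > self.threshold:
                self.tripped = True


if __name__ == "__main__":
    cb = WindowedBreaker(threshold=0.10, window_size=10)
    for outcome in [True] * 8 + [False] * 3:
        cb.record(outcome)
    print(f"tripped={cb.tripped}")
```

The trade-off is that status() style totals (all records processed, all errors) are no longer available from the window alone, so you would track those as separate counters if the dashboard needs them.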

Step 5: Build Episodic Memory

What & Why: Episodic memory stores past entity resolution decisions so the pipeline can learn from steward corrections. When a new entity appears, the system searches for similar past episodes. If a high-confidence match exists, it skips the expensive LLM call. This is how the pipeline gets cheaper and faster over time.

Create a new file called memory/episodic.py:

"""memory/episodic.py — Stores and recalls past entity resolutions."""
from __future__ import annotations
import math, uuid, json
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict


@dataclass
class Episode:
    episode_id: str
    debtor_name_raw: str
    resolved_entity_id: str
    steward_action: str          # "auto" | "match_to_ENT-XXXXX" | "create_new"
    lesson: str                  # human-readable explanation
    embedding: list[float] = field(default_factory=list)
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


class EpisodicMemory:
    """Simple vector store mock for episodic memory.
    Replace with ChromaDB or Pinecone for production."""

    def __init__(self, similarity_threshold: float = 0.80):
        self._store: list[Episode] = []
        self._threshold = similarity_threshold

    def store(self, debtor_name: str, resolved_id: str,
              steward_action: str, lesson: str) -> Episode:
        """Store a new episode after a resolution or steward correction."""
        episode = Episode(
            episode_id=f"RE-{uuid.uuid4().hex[:8]}",
            debtor_name_raw=debtor_name,
            resolved_entity_id=resolved_id,
            steward_action=steward_action,
            lesson=lesson,
            embedding=self._mock_embed(debtor_name),
        )
        self._store.append(episode)
        return episode

    def recall(self, debtor_name: str, top_k: int = 3) -> list[dict]:
        """Recall similar past episodes for a given debtor name."""
        query_vec = self._mock_embed(debtor_name)
        scored = []
        for ep in self._store:
            sim = self._cosine_sim(query_vec, ep.embedding)
            if sim >= self._threshold:
                scored.append({"episode": asdict(ep), "similarity": round(sim, 4)})
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        return scored[:top_k]

    @staticmethod
    def _mock_embed(text: str) -> list[float]:
        """Mock embedding: character frequency vector. Replace with real embeddings."""
        text = text.lower().strip()
        vec = [0.0] * 26
        for ch in text:
            if "a" <= ch <= "z":
                vec[ord(ch) - ord("a")] += 1.0
        magnitude = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / magnitude for v in vec]

    @staticmethod
    def _cosine_sim(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x * x for x in a)) or 1.0
        mag_b = math.sqrt(sum(y * y for y in b)) or 1.0
        return dot / (mag_a * mag_b)


if __name__ == "__main__":
    mem = EpisodicMemory(similarity_threshold=0.75)
    mem.store("ABC MANUFACTURING CO", "ENT-00567",
              "match_to_ENT-00567", "Same registered agent + address = match")
    mem.store("ABC MANUFACTURING COMPANY", "ENT-00567",
              "match_to_ENT-00567", "'CO' vs 'COMPANY' suffix is cosmetic")

    results = mem.recall("ABC MFG CO")
    print(f"Found {len(results)} similar episodes:")
    for r in results:
        print(f"  Similarity: {r['similarity']} -> {r['episode']['resolved_entity_id']}")
        print(f"  Lesson: {r['episode']['lesson']}")
/**
 * memory/episodic.ts — Stores and recalls past entity resolutions.
 */
interface Episode {
  episodeId: string;
  debtorNameRaw: string;
  resolvedEntityId: string;
  stewardAction: string;
  lesson: string;
  embedding: number[];
  createdAt: string;
}

class EpisodicMemory {
  private store: Episode[] = [];
  private threshold: number;

  constructor(similarityThreshold = 0.80) {
    this.threshold = similarityThreshold;
  }

  storeEpisode(debtorName: string, resolvedId: string,
    stewardAction: string, lesson: string): Episode {
    const episode: Episode = {
      episodeId: `RE-${crypto.randomUUID().slice(0, 8)}`,
      debtorNameRaw: debtorName, resolvedEntityId: resolvedId,
      stewardAction, lesson,
      embedding: EpisodicMemory.mockEmbed(debtorName),
      createdAt: new Date().toISOString(),
    };
    this.store.push(episode);
    return episode;
  }

  recall(debtorName: string, topK = 3): { episode: Episode; similarity: number }[] {
    const queryVec = EpisodicMemory.mockEmbed(debtorName);
    const scored: { episode: Episode; similarity: number }[] = [];
    for (const ep of this.store) {
      const sim = EpisodicMemory.cosineSim(queryVec, ep.embedding);
      if (sim >= this.threshold) {
        scored.push({ episode: ep, similarity: Math.round(sim * 10000) / 10000 });
      }
    }
    scored.sort((a, b) => b.similarity - a.similarity);
    return scored.slice(0, topK);
  }

  private static mockEmbed(text: string): number[] {
    const lower = text.toLowerCase().trim();
    const vec = new Array(26).fill(0);
    for (const ch of lower) {
      const idx = ch.charCodeAt(0) - 97;
      if (idx >= 0 && idx < 26) vec[idx] += 1;
    }
    const mag = Math.sqrt(vec.reduce((s: number, v: number) => s + v * v, 0)) || 1;
    return vec.map((v: number) => v / mag);
  }

  private static cosineSim(a: number[], b: number[]): number {
    const dot = a.reduce((s, v, i) => s + v * b[i], 0);
    const magA = Math.sqrt(a.reduce((s, v) => s + v * v, 0)) || 1;
    const magB = Math.sqrt(b.reduce((s, v) => s + v * v, 0)) || 1;
    return dot / (magA * magB);
  }
}

Run: python memory/episodic.py

Expected output:

Found 1 similar episodes:
  Similarity: 0.7593 -> ENT-00567
  Lesson: Same registered agent + address = match
Checkpoint

You should see 1 similar episode. The mock embedding is deterministic, so the score should match: "ABC MFG CO" scores about 0.7593 against "ABC MANUFACTURING CO", which clears the 0.75 threshold you set. "ABC MANUFACTURING COMPANY" scores about 0.7085 and is filtered out. The key insight: character-frequency vectors capture some of the overlap between an abbreviation and the full name, but they are crude, and a small spelling difference can push a true match below the threshold. In production, replace _mock_embed with real embeddings from a model like voyage-3 for much better accuracy.
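The character-frequency similarity can be checked by hand, which helps when tuning the threshold. The sketch below recomputes it outside the class; the function names char_freq_vector and cosine are assumptions for illustration and mirror what _mock_embed and _cosine_sim do.

```python
import math
from collections import Counter


def char_freq_vector(text: str) -> list:
    """Count a-z occurrences (case-insensitive), as the mock embedding does before normalizing."""
    counts = Counter(ch for ch in text.lower() if "a" <= ch <= "z")
    return [float(counts.get(chr(ord("a") + i), 0)) for i in range(26)]


def cosine(a: list, b: list) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


if __name__ == "__main__":
    query = char_freq_vector("ABC MFG CO")
    for stored in ("ABC MANUFACTURING CO", "ABC MANUFACTURING COMPANY"):
        score = cosine(query, char_freq_vector(stored))
        print(f"{stored}: {score:.4f}")
```

With this mock embedding the two stored names score roughly 0.76 and 0.71 against the query, which shows how sensitive a fixed threshold is to a few extra letters.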

Step 5b: Working Memory + Procedural Memory

What & Why: Episodic memory (Step 5) handles long-term recall. Two more tiers complete the multi-layer memory promise: Working memory holds the per-batch run state (active filings, pending HITL items, in-flight tool calls) so a single pipeline run is queryable in real time. Procedural memory stores deterministic rules learned from steward corrections — e.g., "if registered agent + address match, treat as same entity, skip LLM" — so frequent patterns resolve at zero cost.

"""memory/working.py — per-run scratchpad.

WHAT: Tracks the current pipeline run's mutable state: active
      filings being processed, pending HITL items, in-flight tool
      calls, and per-state quality counters.
WHY:  Without working memory you cannot answer "what is the
      pipeline doing right now?" or recover from a mid-run crash.
GOTCHA: This is process-local. For multi-pod deployment, swap the
        dict for Redis with the run_id as the key namespace.
"""

from __future__ import annotations

import time
from threading import Lock


class WorkingMemory:
    def __init__(self, run_id: str):
        self.run_id = run_id
        self.started_at = time.time()
        self._lock = Lock()
        self._active_filings: dict[str, dict] = {}
        self._pending_hitl: list[dict] = []
        self._in_flight_calls: dict[str, dict] = {}
        self._counters: dict[str, int] = {}

    def begin_filing(self, filing_id: str, payload: dict) -> None:
        with self._lock:
            self._active_filings[filing_id] = {
                "payload": payload, "started_at": time.time()}
            self._counters["filings_started"] = (
                self._counters.get("filings_started", 0) + 1)

    def complete_filing(self, filing_id: str,
                        outcome: str) -> None:
        with self._lock:
            self._active_filings.pop(filing_id, None)
            key = f"filings_completed_{outcome}"
            self._counters[key] = self._counters.get(key, 0) + 1

    def queue_hitl(self, item: dict) -> None:
        with self._lock:
            self._pending_hitl.append({**item,
                                       "queued_at": time.time()})

    def begin_call(self, call_id: str, model: str,
                   tool: str | None = None) -> None:
        with self._lock:
            self._in_flight_calls[call_id] = {
                "model": model, "tool": tool,
                "started_at": time.time()}

    def end_call(self, call_id: str) -> float:
        with self._lock:
            entry = self._in_flight_calls.pop(call_id, None)
            return time.time() - entry["started_at"] if entry else 0.0

    def snapshot(self) -> dict:
        """Read-only view for the dashboard."""
        with self._lock:
            return {
                "run_id": self.run_id,
                "uptime_s": round(time.time() - self.started_at, 1),
                "active_filings": len(self._active_filings),
                "pending_hitl": len(self._pending_hitl),
                "in_flight_calls": len(self._in_flight_calls),
                "counters": dict(self._counters),
            }
"""memory/procedural.py — learned deterministic rules.

WHAT: Stores rules harvested from past steward corrections. When a
      pattern fires deterministically >=N times with the same
      outcome, promote it to a procedural rule that bypasses the
      LLM entirely on subsequent occurrences.
WHY:  Episodic memory is recall ("have I seen this?"). Procedural
      memory is automation ("I have seen this pattern enough times
      to skip thinking"). Cuts LLM cost on the hottest paths to
      $0.
GOTCHA: Always log when a procedural rule fires so a regression
        can be detected. Auto-expire rules that have not fired in
        90 days — data drifts.
"""

from __future__ import annotations

import json
import os
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ProceduralRule:
    rule_id: str
    pattern: dict           # e.g., {"matcher": "agent_address"}
    outcome: str            # e.g., "merge"
    confidence_floor: float
    times_fired: int = 0
    last_fired_at: float = 0.0


class ProceduralMemory:
    """File-backed rule store; trivially swappable for Postgres."""

    def __init__(self, path: str = "data/procedural_rules.json"):
        self.path = path
        self.rules: dict[str, ProceduralRule] = {}
        self._load()

    def _load(self) -> None:
        if not os.path.exists(self.path):
            return
        with open(self.path, "r", encoding="utf-8") as f:
            for r in json.load(f):
                self.rules[r["rule_id"]] = ProceduralRule(**r)

    def _save(self) -> None:
        os.makedirs(os.path.dirname(self.path) or ".",
                    exist_ok=True)
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump([r.__dict__ for r in self.rules.values()],
                      f, indent=2)

    def add(self, rule: ProceduralRule) -> None:
        self.rules[rule.rule_id] = rule
        self._save()

    def fire(self, rule_id: str) -> None:
        r = self.rules.get(rule_id)
        if r:
            r.times_fired += 1
            r.last_fired_at = time.time()
            self._save()

    def find_match(self, candidate: dict,
                   matchers: dict[str, Callable[[dict], bool]]
                   ) -> ProceduralRule | None:
        """Return the first rule whose matcher accepts candidate."""
        for rule in self.rules.values():
            key = rule.pattern.get("matcher")
            fn = matchers.get(key)
            if fn and fn(candidate):
                self.fire(rule.rule_id)
                return rule
        return None


# Default matchers (extend as you learn new patterns)
DEFAULT_MATCHERS = {
    "agent_address": lambda c: bool(
        c.get("registered_agent") and c.get("agent_address")),
    "exact_normalized_name": lambda c: bool(
        c.get("normalized_name_match_count", 0) >= 1),
}
🎯 What Just Happened?

You filled in the two missing memory tiers. Working memory tracks the live pipeline run (HITL queue depth, in-flight calls, per-outcome counters) and feeds the dashboard. Procedural memory holds patterns that the steward has confirmed N times as deterministic rules: once "matching registered agent + address" has been promoted, later occurrences resolve with zero LLM tokens. The module above stores and fires rules; deciding when a pattern has earned promotion is left to the caller. Together with episodic memory (Step 5), the three-tier system delivers the M11 multi-layer pattern.
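The procedural module describes promotion ("fires >=N times with the same outcome") but does not show the counting step. One way it could look is sketched below. PromotionTracker, observe, and the default of 3 confirmations are all assumptions for illustration, not part of the course code.

```python
from collections import defaultdict


class PromotionTracker:
    """Hypothetical helper: counts steward decisions per pattern before promotion."""

    def __init__(self, min_confirmations: int = 3):
        # The value 3 is an assumed stand-in for the N mentioned in the module docstring.
        self.min_confirmations = min_confirmations
        self._outcomes = defaultdict(list)

    def observe(self, pattern_key: str, outcome: str) -> bool:
        """Record one steward decision; return True if the pattern now qualifies."""
        self._outcomes[pattern_key].append(outcome)
        return self.is_promotable(pattern_key)

    def is_promotable(self, pattern_key: str) -> bool:
        history = self._outcomes.get(pattern_key, [])
        # Promote only when there are enough decisions and every one of them agrees.
        return len(history) >= self.min_confirmations and len(set(history)) == 1


if __name__ == "__main__":
    tracker = PromotionTracker(min_confirmations=3)
    for _ in range(3):
        ready = tracker.observe("agent_address", "merge")
    print(f"promote agent_address rule: {ready}")
```

When observe returns True, the caller would build a ProceduralRule and pass it to ProceduralMemory.add. A single conflicting decision makes the pattern non-promotable again in this sketch, which is deliberately conservative.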

Step 5c: Operations Dashboard

What & Why: The dashboard reads the tracer JSONL plus working-memory snapshots and serves the 8 metric panels (records/state, entity confidence distribution, quality trends, latency, cost-per-record, HITL queue, circuit-breaker state, procedural-rule firings) on a small FastAPI app. Production swap-in: feed BigQuery + a real BI tool (Looker/Metabase). For local dev, this FastAPI stub works.

"""observability/dashboard.py — 8-panel ops dashboard.

WHAT: Reads `tracer.jsonl` + working-memory snapshot, computes
      metrics, serves them as JSON for the front-end (or Looker).
WHY:  Without the dashboard, the 3 AM oncall has no visibility.
      The 8 panels surface the 8 questions oncall actually asks.
GOTCHA: Metrics are eventually-consistent — tracer flushes
        every 5 seconds. Don't expect sub-second freshness.
"""

from __future__ import annotations

import json
from collections import defaultdict
from pathlib import Path

from fastapi import FastAPI

from memory.working import WorkingMemory

app = FastAPI(title="UCC Pipeline Dashboard")

# Wire to your run's working memory at startup
WORKING: WorkingMemory | None = None
TRACER_PATH = Path("logs/tracer.jsonl")


def _load_spans() -> list[dict]:
    if not TRACER_PATH.exists():
        return []
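    # Skip blank lines so a trailing newline in the log does not raise JSONDecodeError
    return [json.loads(line)
            for line in TRACER_PATH.read_text(encoding="utf-8").splitlines()
            if line.strip()]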


@app.get("/dashboard/metrics")
def metrics():
    spans = _load_spans()

    # Panel 1: records per state (last run)
    by_state: dict[str, int] = defaultdict(int)
    # Panel 2: entity-resolution confidence histogram (10 buckets)
    conf_hist = [0] * 10
    # Panel 3: data quality score per state
    quality: dict[str, list[float]] = defaultdict(list)
    # Panel 4: pipeline latency per stage (p50, p95)
    latency: dict[str, list[float]] = defaultdict(list)
    # Panel 5: total cost by model (divide by record count for cost per record)
    cost: dict[str, float] = defaultdict(float)
    # Panel 6: procedural-rule firings
    rule_fires: dict[str, int] = defaultdict(int)

    for s in spans:
        st = s.get("state_code")
        if st:
            by_state[st] += s.get("records", 0)
            quality[st].append(s.get("quality_score", 1.0))
        if s.get("name") == "resolve_entity":
            conf = float(s.get("confidence", 0))
            conf_hist[min(int(conf * 10), 9)] += 1
        latency[s.get("stage", "unknown")].append(
            s.get("duration_ms", 0))
        cost[s.get("model", "n/a")] += s.get("cost_usd", 0)
        if s.get("name") == "procedural_rule_fired":
            rule_fires[s.get("rule_id", "?")] += 1

    def pct(values, p):
        if not values:
            return 0
        values = sorted(values)
        return values[min(int(len(values) * p / 100),
                          len(values) - 1)]

    return {
        "panel_1_records_per_state": dict(by_state),
        "panel_2_confidence_histogram": conf_hist,
        "panel_3_quality_per_state": {
            k: round(sum(v) / len(v), 3)
            for k, v in quality.items()},
        "panel_4_latency_p50_p95": {
            k: {"p50": pct(v, 50), "p95": pct(v, 95)}
            for k, v in latency.items()},
        "panel_5_cost_by_model": {
            k: round(v, 4) for k, v in cost.items()},
        "panel_6_procedural_fires": dict(rule_fires),
        "panel_7_circuit_breaker": (
            "open" if any(s.get("name") ==
                          "circuit_breaker_open" for s in spans)
            else "closed"),
        "panel_8_working_memory": (
            WORKING.snapshot() if WORKING else None),
    }


from fastapi.responses import HTMLResponse  # so the route returns HTML, not JSON


@app.get("/dashboard", response_class=HTMLResponse)
def dashboard_html():
    """Minimal HTML shell that points to /dashboard/metrics for data."""
    return ("<html><body><h1>UCC Pipeline Dashboard"
            "</h1><p>Fetch /dashboard/metrics"
            " for JSON, or wire to Looker/Metabase.</p>"
            "</body></html>")
🎯 What Just Happened?

You backed the animated dashboard mockup with a real FastAPI module that reads tracer.jsonl and the working-memory snapshot, and emits the 8 ops panels as JSON. Production teams replace the JSON endpoint with a Looker / Metabase / Grafana board pointed at BigQuery. The capstone deliberately ships the JSON skeleton instead of a heavyweight UI dependency.
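Two small rules inside metrics() are easy to misread: how a confidence value maps to a histogram bucket, and how pct() picks a percentile. The sketch below isolates both so they can be tried on their own. The function names confidence_bucket and nearest_rank are assumptions for illustration; the index arithmetic is the same as in the dashboard code.

```python
def confidence_bucket(conf: float, buckets: int = 10) -> int:
    """Map a confidence in [0, 1] to a bucket index; 1.0 is clamped into the last bucket."""
    return min(int(conf * buckets), buckets - 1)


def nearest_rank(values: list, p: float) -> float:
    """Same index rule as pct(): floor(n * p / 100), capped at n - 1."""
    if not values:
        return 0
    ordered = sorted(values)
    return ordered[min(int(len(ordered) * p / 100), len(ordered) - 1)]


if __name__ == "__main__":
    latencies_ms = [120, 80, 100, 400]
    print("bucket for 0.95:", confidence_bucket(0.95))
    print("bucket for 1.0:", confidence_bucket(1.0))
    print("p50:", nearest_rank(latencies_ms, 50))
    print("p95:", nearest_rank(latencies_ms, 95))
```

With only four samples, p95 is simply the maximum, so percentile panels are noisy early in a run and become meaningful only once a stage has accumulated many spans.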

Step 6: Build the Observability Tracer

What & Why: Every LLM call, tool invocation, and agent handoff must be traced for debugging and compliance. When a 3 AM batch fails on filing 7,834, you need to know exactly which function failed, how long it ran, and what error occurred. The trace_span decorator wraps any function with automatic span recording.

Create a new file called observability/tracer.py:

"""observability/tracer.py — trace_span decorator for recording function calls."""
from __future__ import annotations
import functools, time, uuid, json
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any, Callable


@dataclass
class Span:
    span_id: str
    function_name: str
    start_time: str
    end_time: str = ""
    duration_ms: float = 0.0
    status: str = "pending"      # "ok" | "error"
    error_message: str = ""
    metadata: dict = field(default_factory=dict)


class TraceCollector:
    """Collects spans from decorated functions."""

    def __init__(self):
        self.spans: list[Span] = []

    def trace_span(self, **meta: Any) -> Callable:
        """Decorator that records function call, duration, and status."""
        def decorator(fn: Callable) -> Callable:
            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                span = Span(
                    span_id=uuid.uuid4().hex[:12],
                    function_name=fn.__name__,
                    start_time=datetime.now(timezone.utc).isoformat(),
                    metadata=dict(meta),  # copy so spans do not share one dict
                )
                started = time.perf_counter()  # monotonic clock for the duration
                try:
                    result = fn(*args, **kwargs)
                    span.status = "ok"
                    return result
                except Exception as e:
                    span.status = "error"
                    span.error_message = str(e)
                    raise
                finally:
                    span.end_time = datetime.now(timezone.utc).isoformat()
                    span.duration_ms = (time.perf_counter() - started) * 1000
                    self.spans.append(span)
            return wrapper
        return decorator

    def summary(self) -> dict:
        ok = sum(1 for s in self.spans if s.status == "ok")
        err = sum(1 for s in self.spans if s.status == "error")
        avg_ms = sum(s.duration_ms for s in self.spans) / len(self.spans) if self.spans else 0
        return {"total": len(self.spans), "ok": ok, "errors": err,
                "avg_duration_ms": round(avg_ms, 2)}

    def export_json(self) -> str:
        return json.dumps([asdict(s) for s in self.spans], indent=2)


if __name__ == "__main__":
    tracer = TraceCollector()

    @tracer.trace_span(task="entity_resolution", model="claude-sonnet-4-6")
    def resolve_entity(name: str) -> str:
        time.sleep(0.05)
        return f"ENT-{hash(name) % 10000:05d}"

    @tracer.trace_span(task="format_validation", model="claude-haiku-4-5-20251001")
    def validate_format(row: dict) -> bool:
        if not row.get("debtor_name"):
            raise ValueError("Missing debtor_name field")
        return True

    resolve_entity("ACME LLC")
    resolve_entity("Beta Corp")
    try:
        validate_format({})
    except ValueError:
        pass

    print(json.dumps(tracer.summary(), indent=2))

Run: python observability/tracer.py

Expected output:

{
  "total": 3,
  "ok": 2,
  "errors": 1,
  "avg_duration_ms": 34.21
}
Checkpoint

You should see 3 total spans: 2 OK (the entity resolutions) and 1 error (the format validation with missing debtor_name). The avg_duration_ms will vary on your machine. If you see 0 spans, check that the decorator is applied correctly. If validate_format does not show as an error, check that the except block in the decorator re-raises the exception.
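The dashboard in Step 5c reads logs/tracer.jsonl, while TraceCollector.export_json returns a single JSON array. One way to bridge the two is to append each span as its own line. The sketch below assumes a hypothetical helper named append_spans_jsonl that takes span dictionaries (for example, from asdict) and flattens the metadata keys to the top level. Fields the dashboard looks for, such as state_code, stage, or cost_usd, would still have to be supplied through the decorator's metadata.

```python
import json
import tempfile
from pathlib import Path


def append_spans_jsonl(spans: list, path: Path) -> int:
    """Append each span dict as one JSON line; return the number of lines written."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as fh:
        for span in spans:
            core = {k: v for k, v in span.items() if k != "metadata"}
            # Metadata keys (task, model, ...) move to the top level; core fields win on clashes.
            flat = {**span.get("metadata", {}), **core}
            fh.write(json.dumps(flat) + "\n")
    return len(spans)


if __name__ == "__main__":
    demo = [{
        "span_id": "abc123",
        "function_name": "resolve_entity",
        "duration_ms": 51.2,
        "status": "ok",
        "metadata": {"task": "entity_resolution", "model": "claude-sonnet-4-6"},
    }]
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "logs" / "tracer.jsonl"
        append_spans_jsonl(demo, out)
        print(out.read_text(encoding="utf-8").strip())
```

Appending line by line keeps the file readable while a run is still in progress, which a single JSON array written at the end would not allow.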

Step 7: Build the Bronze Layer (Raw Ingestion)

What & Why: The Bronze layer reads raw UCC filings and applies quality gates. Valid filings pass through; invalid filings are quarantined for review. This is the first stage of the Medallion Architecture — no transformation, just ingestion with quality tracking.

Create a new file called pipeline/bronze_layer.py:

"""pipeline/bronze_layer.py — Raw data ingestion with quality gates."""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from guardrails.quality_gates import validate_batch
from guardrails.circuit_breaker import CircuitBreaker


def ingest_bronze(filings: list[dict], circuit_breaker: CircuitBreaker | None = None) -> dict:
    """Ingest raw filings into the Bronze layer.

    Validates each filing and separates into valid/quarantined batches.
    If circuit breaker is provided, trips on excessive error rate.
    """
    validation = validate_batch(filings)

    valid_filings = []
    quarantined = []

    for filing, result in zip(filings, validation["results"]):
        if result["is_valid"]:
            valid_filings.append(filing)
            if circuit_breaker:
                circuit_breaker.record(True)
        else:
            quarantined.append({
                "filing": filing,
                "issues": result["issues"],
            })
            if circuit_breaker:
                circuit_breaker.record(False)

        # Check circuit breaker after each record
        if circuit_breaker and circuit_breaker.is_tripped:
            remaining = len(filings) - len(valid_filings) - len(quarantined)
            return {
                "status": "circuit_breaker_tripped",
                "reason": circuit_breaker.trip_reason,
                "valid": valid_filings,
                "quarantined": quarantined,
                "unprocessed": remaining,
            }

    return {
        "status": "complete",
        "valid": valid_filings,
        "quarantined": quarantined,
        "unprocessed": 0,
        "summary": {
            "total": len(filings),
            "valid": len(valid_filings),
            "quarantined": len(quarantined),
            "error_rate": validation["error_rate"],
        },
    }


if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS
    cb = CircuitBreaker(threshold=0.10, window_size=100)
    result = ingest_bronze(MOCK_UCC_FILINGS, cb)
    print(f"Bronze layer: {result['status']}")
    print(f"  Valid: {len(result['valid'])}")
    print(f"  Quarantined: {len(result['quarantined'])}")
    for q in result["quarantined"]:
        print(f"    {q['filing']['filing_number']}: {', '.join(q['issues'])}")

Run: python pipeline/bronze_layer.py

Expected output:

Bronze layer: complete
  Valid: 10
  Quarantined: 5
    OH-2024-0066000: missing_debtor_name
    PA-2024-0088999: missing_secured_party_name
    WA-2024-007: non_standard_filing_number, non_standard_date_format
    GA-2024-0044556: missing_collateral_description, expiration_equals_filing_date
    NV-2024-0055667: whitespace_in_names, non_standard_filing_type
Checkpoint

You should see 10 valid filings and 5 quarantined. The circuit breaker did not trip because the 15-filing batch is smaller than the window size (100). If you see different counts, recheck your quality gate rules in Step 3.
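Once filings are quarantined, it is often more useful to see which issue types dominate than to read the list record by record. A minimal sketch, assuming a hypothetical helper named summarize_issues that takes the quarantined list returned by ingest_bronze; the sample data reuses the issue codes from the expected output, with the filing payloads trimmed to the filing number.

```python
from collections import Counter


def summarize_issues(quarantined: list) -> Counter:
    """Count how often each issue code appears across quarantined filings."""
    return Counter(issue for q in quarantined for issue in q["issues"])


if __name__ == "__main__":
    sample = [
        {"filing": {"filing_number": "OH-2024-0066000"},
         "issues": ["missing_debtor_name"]},
        {"filing": {"filing_number": "PA-2024-0088999"},
         "issues": ["missing_secured_party_name"]},
        {"filing": {"filing_number": "WA-2024-007"},
         "issues": ["non_standard_filing_number", "non_standard_date_format"]},
        {"filing": {"filing_number": "GA-2024-0044556"},
         "issues": ["missing_collateral_description", "expiration_equals_filing_date"]},
        {"filing": {"filing_number": "NV-2024-0055667"},
         "issues": ["whitespace_in_names", "non_standard_filing_type"]},
    ]
    for issue, count in summarize_issues(sample).most_common():
        print(f"{count}  {issue}")
```

On a real batch, a single issue code accounting for most quarantines usually points to one upstream source with a formatting problem, which is cheaper to fix at the source than in the pipeline.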

Step 8: Build the Silver Layer (Normalization)

What & Why: The Silver layer normalizes validated filings into a consistent schema. It standardizes date formats, trims whitespace, normalizes filing types, and enriches records with metadata. This is the data cleaning stage — the output should be uniform regardless of which state the data came from.

Create a new file called pipeline/silver_layer.py:

"""pipeline/silver_layer.py — Data normalization and standardization."""
from __future__ import annotations
import re
from datetime import datetime, timezone


def normalize_filing(filing: dict) -> dict:
    """Normalize a single UCC filing to standard format."""
    normalized = dict(filing)

    # Normalize text fields: collapse runs of spaces, tabs, and newlines
    for field in ["debtor_name", "secured_party_name", "collateral_description"]:
        val = normalized.get(field) or ""  # treat None like an empty string
        normalized[field] = " ".join(str(val).split())

    # Normalize date separators toward YYYY-MM-DD
    for date_field in ["filing_date", "expiration_date"]:
        val = normalized.get(date_field)
        if val:
            # Swap slashes for dashes (2024/01/15 -> 2024-01-15). This assumes
            # year-first input; MM/DD/YYYY would need explicit parsing.
            normalized[date_field] = str(val).replace("/", "-")

    # Normalize filing type
    type_map = {"UCC1": "UCC-1", "UCC-1": "UCC-1", "UCC3": "UCC-3",
                "UCC-3 amendment": "UCC-3 amendment",
                "UCC-3 continuation": "UCC-3 continuation",
                "UCC-3 termination": "UCC-3 termination"}
    ft = normalized.get("filing_type", "")
    normalized["filing_type"] = type_map.get(ft, ft)

    # Add processing metadata (timezone-aware UTC; utcnow() is deprecated in Python 3.12+)
    normalized["_silver_processed_at"] = datetime.now(timezone.utc).isoformat()
    normalized["_silver_version"] = "1.0"

    return normalized


def transform_silver(valid_filings: list[dict]) -> list[dict]:
    """Transform a batch of validated filings to Silver layer format."""
    return [normalize_filing(f) for f in valid_filings]


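if __name__ == "__main__":
    import os
    import sys
    # Make the project root importable when this file is run directly
    sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from mock_data import MOCK_UCC_FILINGS
    # Take the first 3 filings for the demo
    sample = MOCK_UCC_FILINGS[:3]
    silver = transform_silver(sample)
    for s in silver:
        print(f"{s['filing_number']}: {s['debtor_name']} | type={s['filing_type']}")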

Run: python pipeline/silver_layer.py

Checkpoint

You should see 3 normalized filings with clean names, standard date formats, and _silver_processed_at metadata. If the dates still have slashes, check your normalization logic. This step depends on Step 1 and the mock data file.
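To see what the Silver rules do to a single messy record, here is a small standalone sketch. The sample record is made up for illustration, and `collapse_whitespace` is a hypothetical helper that mirrors the whitespace rule in `normalize_filing`.

```python
def collapse_whitespace(value: str) -> str:
    """Collapse runs of spaces, tabs, and newlines into single spaces."""
    return " ".join(value.split())


# Hypothetical messy input, not taken from mock_data.py
messy = {
    "debtor_name": "  Acme\tManufacturing \n LLC ",
    "filing_date": "2024/03/15",
    "filing_type": "UCC1",
}

clean = {
    "debtor_name": collapse_whitespace(messy["debtor_name"]),
    "filing_date": messy["filing_date"].replace("/", "-"),
    "filing_type": {"UCC1": "UCC-1", "UCC3": "UCC-3"}.get(
        messy["filing_type"], messy["filing_type"]),
}
print(clean)
# {'debtor_name': 'Acme Manufacturing LLC', 'filing_date': '2024-03-15', 'filing_type': 'UCC-1'}
```

Note that the date rule only swaps the separator. A value such as 03/15/2024 would come out as 03-15-2024, not as an ISO date, so the Silver layer relies on the Bronze quality gates to quarantine non-standard date formats first.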

Step 9: Build the Gold Layer (Entity Resolution & Risk Profiles)

What & Why: The Gold layer is where the AI does the heavy lifting. It uses episodic memory to resolve entities (matching name variants like "ACME MFG LLC" and "Acme Manufacturing LLC" to the same entity), classifies collateral, and generates risk profiles. This is the business-ready analytics layer — the output feeds dashboards and credit decisions.

Create a new file called pipeline/gold_layer.py:

"""pipeline/gold_layer.py — Entity resolution, risk profiles, and analytics."""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from memory.episodic import EpisodicMemory
from routing.model_router import route_to_model, estimate_cost


class GoldLayer:
    """Produces business-ready analytics from Silver layer data."""

    def __init__(self, episodic_memory: EpisodicMemory):
        self.memory = episodic_memory
        self.entities: dict[str, dict] = {}  # entity_id -> profile
        self._total_cost = 0.0
        self._next_entity_id = 1

    def resolve_entity(self, filing: dict) -> dict:
        """Resolve debtor name to an entity ID using memory + model routing."""
        debtor = filing["debtor_name"]

        # Step 1: Check episodic memory for similar past resolutions
        episodes = self.memory.recall(debtor)
        if episodes and episodes[0]["similarity"] > 0.85:
            entity_id = episodes[0]["episode"]["resolved_entity_id"]
            return {
                "entity_id": entity_id,
                "confidence": episodes[0]["similarity"],
                "method": "episodic_memory",
                "cost": 0.0,
            }

        # Step 2: Route to LLM for resolution (mock API call)
        routing = route_to_model("entity_resolution")
        cost = estimate_cost(routing["model"], 800, 300)
        self._total_cost += cost

        # Mock resolution: create new entity
        entity_id = f"ENT-{self._next_entity_id:05d}"
        self._next_entity_id += 1

        # Store in episodic memory for future recall
        self.memory.store(debtor, entity_id, "auto",
                         f"New entity created from filing {filing['filing_number']}")

        return {
            "entity_id": entity_id,
            "confidence": 0.75,
            "method": "llm_resolution",
            "cost": cost,
            "model": routing["model"],
        }

    def build_risk_profile(self, entity_id: str, filings: list[dict]) -> dict:
        """Build a risk profile for a resolved entity."""
        active = [f for f in filings
                  if f.get("filing_type") != "UCC-3 termination"]
        return {
            "entity_id": entity_id,
            "total_filings": len(filings),
            "active_liens": len(active),
            "states": list(set(f["state_code"] for f in filings)),
            "secured_parties": list(set(f["secured_party_name"] for f in filings)),
            "risk_score": min(1.0, len(active) * 0.2),
        }

    def process_batch(self, silver_filings: list[dict]) -> dict:
        """Process a batch of Silver filings into Gold analytics."""
        resolutions = []
        entity_filings: dict[str, list[dict]] = {}

        for filing in silver_filings:
            result = self.resolve_entity(filing)
            resolutions.append(result)
            eid = result["entity_id"]
            entity_filings.setdefault(eid, []).append(filing)

        profiles = {
            eid: self.build_risk_profile(eid, flist)
            for eid, flist in entity_filings.items()
        }

        return {
            "resolutions": resolutions,
            "profiles": profiles,
            "total_cost": round(self._total_cost, 6),
            "entities_resolved": len(profiles),
        }


if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS
    mem = EpisodicMemory(similarity_threshold=0.75)

    # Seed memory with one known entity
    mem.store("Acme Manufacturing LLC", "ENT-00001",
              "steward_verified", "Verified entity from prior batch")

    gold = GoldLayer(mem)
    # Process first 5 clean filings
    result = gold.process_batch(MOCK_UCC_FILINGS[:5])
    print(f"Gold layer: {result['entities_resolved']} entities, "
          f"cost=${result['total_cost']:.4f}")
    for eid, profile in result["profiles"].items():
        print(f"  {eid}: {profile['active_liens']} active liens, "
              f"risk={profile['risk_score']:.1f}, states={profile['states']}")

Run: python pipeline/gold_layer.py

Checkpoint

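You should see entity resolution results. "Acme Manufacturing LLC" should resolve via episodic memory (cost $0.00) because you seeded it. Other entities go through LLM resolution (with cost). If episodic memory is not matching, check the similarity threshold in Step 5, and note that resolve_entity only accepts a recalled episode whose similarity is above 0.85, so a match that clears the 0.75 recall threshold can still fall through to LLM resolution. This step depends on Steps 2, 5, and mock_data.py.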
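The risk score in `build_risk_profile` is simple arithmetic, and a worked example makes the cap visible. The sketch below repeats the same rule in a standalone function; the name `risk_score` and the sample filings are illustrative only.

```python
def risk_score(filings: list) -> float:
    """Same rule as build_risk_profile: 0.2 per non-termination filing, capped at 1.0."""
    active = [f for f in filings if f.get("filing_type") != "UCC-3 termination"]
    return min(1.0, len(active) * 0.2)


sample = [
    {"filing_type": "UCC-1"},
    {"filing_type": "UCC-3 amendment"},
    {"filing_type": "UCC-3 termination"},
]
print(risk_score(sample))                           # 0.4 (two active filings)
print(risk_score([{"filing_type": "UCC-1"}] * 7))   # 1.0 (capped)
```

This is a deliberate simplification: a termination filing is left out of the count, but the original lien it terminates is still counted as active.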

Step 10: Build the Production Orchestrator

What & Why: The orchestrator ties everything together. It runs the full pipeline: Bronze (ingest + validate) → Silver (normalize) → Gold (entity resolution + risk profiles), with tracing on every stage and circuit breaker protection. This is the main entry point for batch processing.

Create a new file called pipeline/orchestrator.py:

"""pipeline/orchestrator.py — Main production orchestrator."""
from __future__ import annotations
import json, time, sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from pipeline.bronze_layer import ingest_bronze
from pipeline.silver_layer import transform_silver
from pipeline.gold_layer import GoldLayer
from memory.episodic import EpisodicMemory
from observability.tracer import TraceCollector
from guardrails.circuit_breaker import CircuitBreaker


def run_pipeline(filings: list[dict], tracer: TraceCollector | None = None) -> dict:
    """Run the complete Bronze → Silver → Gold pipeline."""
    if tracer is None:
        tracer = TraceCollector()

    # Initialize components
    cb = CircuitBreaker(threshold=0.10, window_size=100)
    memory = EpisodicMemory(similarity_threshold=0.75)
    gold = GoldLayer(memory)

    results = {"stages": {}}
    pipeline_start = time.perf_counter()

    # --- BRONZE LAYER ---
    @tracer.trace_span(stage="bronze", description="Raw ingestion + quality gates")
    def run_bronze():
        return ingest_bronze(filings, cb)

    bronze_result = run_bronze()
    results["stages"]["bronze"] = {
        "status": bronze_result["status"],
        "valid": len(bronze_result["valid"]),
        "quarantined": len(bronze_result["quarantined"]),
    }

    if bronze_result["status"] == "circuit_breaker_tripped":
        results["pipeline_status"] = "halted"
        results["halt_reason"] = bronze_result["reason"]
        return results

    # --- SILVER LAYER ---
    @tracer.trace_span(stage="silver", description="Normalization + standardization")
    def run_silver():
        return transform_silver(bronze_result["valid"])

    silver_filings = run_silver()
    results["stages"]["silver"] = {"records": len(silver_filings)}

    # --- GOLD LAYER ---
    @tracer.trace_span(stage="gold", description="Entity resolution + risk profiles")
    def run_gold():
        return gold.process_batch(silver_filings)

    gold_result = run_gold()
    results["stages"]["gold"] = {
        "entities": gold_result["entities_resolved"],
        "cost": gold_result["total_cost"],
    }

    # --- PIPELINE SUMMARY ---
    elapsed = (time.perf_counter() - pipeline_start) * 1000
    results["pipeline_status"] = "complete"
    results["total_filings"] = len(filings)
    results["elapsed_ms"] = round(elapsed, 2)
    results["tracing"] = tracer.summary()

    return results


if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS

    tracer = TraceCollector()
    result = run_pipeline(MOCK_UCC_FILINGS, tracer)

    print("=" * 60)
    print("PRODUCTION UCC PIPELINE — RUN COMPLETE")
    print("=" * 60)
    print(f"Status: {result['pipeline_status']}")
    print(f"Total filings: {result.get('total_filings', 'N/A')}")
    print(f"Elapsed: {result.get('elapsed_ms', 'N/A')}ms")
    print()
    for stage, data in result["stages"].items():
        print(f"  [{stage.upper()}] {json.dumps(data)}")
    print()
    print(f"Tracing: {json.dumps(result.get('tracing', {}))}")

Run: python pipeline/orchestrator.py

Expected output:

============================================================
PRODUCTION UCC PIPELINE — RUN COMPLETE
============================================================
Status: complete
Total filings: 15
Elapsed: 42.31ms

  [BRONZE] {"status": "complete", "valid": 10, "quarantined": 5}
  [SILVER] {"records": 10}
  [GOLD] {"entities": 8, "cost": 0.0036}

Tracing: {"total": 3, "ok": 3, "errors": 0, "avg_duration_ms": 14.1}

Checkpoint

You should see all three pipeline stages complete successfully. Bronze should quarantine 5 filings, Silver should process 10, and Gold should resolve entities and show a small cost. The tracing summary confirms all 3 stages were traced with no errors. If any stage fails, check that all the files from Steps 1–9 are in the correct directories.
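If you want to exercise the orchestrator without the tracer from the tracing step, a stand-in only needs the two members that `run_pipeline` calls: `trace_span(...)` as a decorator factory and `summary()`. The class below is a hypothetical sketch, not the course's `TraceCollector`; its summary keys are copied from the expected output above.

```python
import functools
import time


class StubTraceCollector:
    """Hypothetical stand-in exposing only what the orchestrator calls."""

    def __init__(self) -> None:
        self.spans = []

    def trace_span(self, stage: str, description: str = ""):
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                start = time.perf_counter()
                status = "ok"
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    status = "error"
                    raise
                finally:
                    # Record the span whether the stage succeeded or raised
                    self.spans.append({
                        "stage": stage,
                        "description": description,
                        "status": status,
                        "duration_ms": (time.perf_counter() - start) * 1000,
                    })
            return wrapper
        return decorator

    def summary(self) -> dict:
        total = len(self.spans)
        ok = sum(1 for s in self.spans if s["status"] == "ok")
        avg = sum(s["duration_ms"] for s in self.spans) / total if total else 0.0
        return {"total": total, "ok": ok, "errors": total - ok,
                "avg_duration_ms": round(avg, 1)}


tracer = StubTraceCollector()


@tracer.trace_span(stage="bronze", description="demo stage")
def run_stage():
    return "done"


run_stage()
print(tracer.summary()["total"])  # 1
```

Because `run_pipeline` accepts a `tracer` argument, an object like this could be passed in place of the real collector during tests.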

Troubleshooting Steps 1–10

ModuleNotFoundError: No module named 'config' — Make sure you are running from the capstone-5-production-ucc/ root directory. All imports use sys.path to find the project root.

ModuleNotFoundError: No module named 'pipeline' — Create empty __init__.py files in each subdirectory: touch pipeline/__init__.py memory/__init__.py observability/__init__.py guardrails/__init__.py routing/__init__.py

ImportError: cannot import name 'CircuitBreaker' — Ensure you saved guardrails/circuit_breaker.py (Step 4) before running the bronze layer (Step 7).

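Episodic memory returns 0 results — Lower the similarity_threshold in config.py from 0.80 to 0.70. The mock embeddings use character frequency vectors, which have lower resolution than real embeddings. gold_layer.py and orchestrator.py also pass similarity_threshold=0.75 directly to EpisodicMemory, so lower that argument as well if the config change has no effect.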
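To get a feel for why character frequency vectors are coarse, here is a minimal sketch of that idea. It is an assumption about how such a mock embedding could work, not the code in memory/episodic.py; `char_freq_vector`, `cosine`, and `similarity` are hypothetical names.

```python
import math
from collections import Counter


def char_freq_vector(name: str) -> Counter:
    """Count letters and digits, ignoring case, spaces, and punctuation."""
    return Counter(ch for ch in name.lower() if ch.isalnum())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[k] * b[k] for k in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def similarity(x: str, y: str) -> float:
    return cosine(char_freq_vector(x), char_freq_vector(y))


for other in ["ACME MANUFACTURING LLC", "ACME MFG LLC", "Zenith Bank"]:
    print(f"{other!r}: {similarity('Acme Manufacturing LLC', other):.3f}")
```

In this sketch the abbreviated variant "ACME MFG LLC" scores about 0.79 against "Acme Manufacturing LLC". That would clear a 0.75 recall threshold but not the 0.85 cutoff in resolve_entity. Anagrams also score 1.0 here, since letter order is discarded, which is one reason real embeddings are preferable in production.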

Deployment Architecture

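# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# python:3.12-slim does not ship curl, so probe with the standard library
HEALTHCHECK --interval=30s --timeout=5s \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=3)" || exit 1
EXPOSE 8000
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

# docker-compose.yml
version: "3.8"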
services:
  api:
    build: .
    ports: ["8000:8000"]
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      - CHROMA_URL=http://chroma:8001
    depends_on: [redis, chroma]
  worker:
    build: .
    command: celery -A tasks worker --concurrency=10
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on: [redis, chroma]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
  chroma:
    image: chromadb/chroma:latest
    ports: ["8001:8000"]
    volumes: ["chroma-data:/chroma/chroma"]
volumes:
  chroma-data:

What Just Happened?

You defined a containerized deployment with 4 services: the FastAPI application (handles HTTP requests), Celery workers (process filing batches asynchronously), Redis (message queue), and ChromaDB (vector database for episodic memory). The API key is passed via environment variable — never hardcoded.
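On the application side, reading those variables might look like the sketch below. `Settings` and `load_settings` are hypothetical names, and the localhost defaults are assumptions for running outside Compose; only the three variable names come from the Compose file.

```python
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    anthropic_api_key: str
    redis_url: str
    chroma_url: str


def load_settings(env: dict[str, str] | None = None) -> Settings:
    """Build Settings from environment variables, failing fast on a missing key."""
    env = os.environ if env is None else env
    api_key = env.get("ANTHROPIC_API_KEY", "")
    if not api_key:
        raise RuntimeError("ANTHROPIC_API_KEY is not set")
    return Settings(
        anthropic_api_key=api_key,
        # Assumed defaults for running outside docker-compose
        redis_url=env.get("REDIS_URL", "redis://localhost:6379"),
        chroma_url=env.get("CHROMA_URL", "http://localhost:8001"),
    )


settings = load_settings({"ANTHROPIC_API_KEY": "test-key",
                          "REDIS_URL": "redis://redis:6379"})
print(settings.redis_url)   # redis://redis:6379
print(settings.chroma_url)  # http://localhost:8001
```

Failing at startup when the key is missing surfaces a misconfigured container immediately, rather than on the first model call.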

GCP Deployment: Cloud Run + Cloud Composer + GCS + BigQuery

Production for this pipeline runs on Google Cloud, not bare Docker. The Bronze layer ingests from a GCS bucket where state SOS systems drop bulk files. Cloud Run hosts the API and Celery workers (autoscaled). Cloud Composer (managed Airflow) orchestrates the daily Bronze→Silver→Gold pipeline. Gold-layer outputs land in BigQuery for analyst queries. Five small modules below wire it all up. They use the official Google Cloud client libraries; for local development they fall back to in-memory fakes so you can run the capstone end-to-end without a GCP account.

"""deployment/gcs_trigger.py — Cloud Function: GCS file drop -> pipeline.

WHAT: Eventarc-triggered Cloud Function fired when a state SOS
      system drops a new bulk filing file into the
      'ucc-bronze-inbound' GCS bucket. It reads metadata,
      validates the state code, and publishes a job message to
      Pub/Sub to kick off the Bronze ingestion.
WHY:  Event-driven beats polling. State data drops happen at
      irregular hours; a polling loop wastes API calls and
      adds latency. Eventarc fires within ~2s of the file landing.
GOTCHA: Eventarc may deliver the same event TWICE on retry. The
        ingestion endpoint MUST be idempotent (use the file's
        gs:// URI as the dedup key).
"""

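import json
import os

import functions_framework
from google.cloud import pubsub_v1, storage

PROJECT = os.environ["GCP_PROJECT"]
TOPIC = os.environ.get("INGEST_TOPIC", "ucc-ingest-jobs")
# Publishing with an ordering_key requires message ordering to be
# enabled on the publisher client.
_publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True,
    )
)
_topic_path = _publisher.topic_path(PROJECT, TOPIC)
_storage = storage.Client()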

VALID_STATES = {"DE", "NY", "TX", "CA", "MA", "IL",
                "NV", "WA", "FL", "GA"}


@functions_framework.cloud_event
def on_filing_drop(cloud_event) -> None:
    """Triggered by google.cloud.storage.object.v1.finalized."""
    data = cloud_event.data
    bucket = data["bucket"]
    name = data["name"]
    gcs_uri = f"gs://{bucket}/{name}"

    state_code = name.split("/")[0].upper() if "/" in name else None
    if state_code not in VALID_STATES:
        print(f"[gcs_trigger] skip non-state path: {name}")
        return

    blob = _storage.bucket(bucket).get_blob(name)
    payload = {
        "gcs_uri": gcs_uri,
        "state_code": state_code,
        "size_bytes": blob.size if blob else 0,
        "md5": blob.md5_hash if blob else None,
        "content_type": (blob.content_type if blob
                         else "application/octet-stream"),
    }
    future = _publisher.publish(
        _topic_path,
        json.dumps(payload).encode("utf-8"),
        # Pub/Sub orderingKey ensures one state's files
        # process in arrival order:
        ordering_key=state_code,
    )
    msg_id = future.result(timeout=30)
    print(f"[gcs_trigger] enqueued {gcs_uri} -> msg {msg_id}")
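The GOTCHA in the docstring above asks the ingestion side to be idempotent, keyed on the file's gs:// URI. A minimal in-memory sketch of that check follows; `IngestDeduplicator` is a hypothetical helper and the URI is an illustrative example.

```python
class IngestDeduplicator:
    """Hypothetical in-memory dedup keyed on the file's gs:// URI."""

    def __init__(self) -> None:
        self._seen = set()

    def should_process(self, gcs_uri: str) -> bool:
        """Return True the first time a URI is seen, False on redelivery."""
        if gcs_uri in self._seen:
            return False
        self._seen.add(gcs_uri)
        return True


dedup = IngestDeduplicator()
uri = "gs://ucc-bronze-inbound/DE/2026-01-15.csv"  # illustrative path
print(dedup.should_process(uri))  # True
print(dedup.should_process(uri))  # False (duplicate delivery)
```

A Python set only works within one process. Because autoscaled instances do not share memory, a real deployment would likely keep the seen keys in a shared store such as the Redis instance already in the stack.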
"""deployment/bigquery_loader.py — Gold-layer outputs -> BigQuery.

WHAT: Loads the Gold layer's resolved entity profiles, lien
      summaries, and quality scorecards into BigQuery so analysts
      and the risk-scoring service can query them with SQL.
WHY:  ChromaDB is a vector store, not an analytics warehouse.
      BigQuery gives partitioned tables, column-level encryption,
      cross-state aggregations, and IAM-controlled access for
      downstream consumers (risk team, compliance, data science).
GOTCHA: Use partitioned + clustered tables. Partition by
        ingestion_date, cluster by (state_code, debtor_normalized).
        That keeps a typical "all 2026 DE filings for Acme" query
        under $0.05.
"""

import os
from datetime import datetime, timezone

from google.cloud import bigquery

_client = bigquery.Client()
DATASET = os.environ.get("BQ_DATASET", "ucc_gold")
ENTITY_TABLE = f"{DATASET}.resolved_entities"
LIEN_TABLE = f"{DATASET}.lien_summaries"
QUALITY_TABLE = f"{DATASET}.quality_scorecards"


_ENTITY_SCHEMA = [
    bigquery.SchemaField("entity_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("debtor_normalized", "STRING",
                         mode="REQUIRED"),
    bigquery.SchemaField("state_code", "STRING",
                         mode="REQUIRED"),
    bigquery.SchemaField("filing_count", "INTEGER"),
    bigquery.SchemaField("active_lien_count", "INTEGER"),
    bigquery.SchemaField("total_collateral_value", "FLOAT"),
    bigquery.SchemaField("resolution_confidence", "FLOAT"),
    bigquery.SchemaField("first_filing_date", "DATE"),
    bigquery.SchemaField("last_filing_date", "DATE"),
    bigquery.SchemaField("ingestion_date", "TIMESTAMP",
                         mode="REQUIRED"),
]


def ensure_tables() -> None:
    """Idempotent: create dataset + tables if missing."""
    # Dataset and Table take fully qualified IDs: project.dataset[.table]
    dataset = bigquery.Dataset(f"{_client.project}.{DATASET}")
    dataset.location = "US"
    _client.create_dataset(dataset, exists_ok=True)

    table = bigquery.Table(f"{_client.project}.{ENTITY_TABLE}",
                           schema=_ENTITY_SCHEMA)
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="ingestion_date",
    )
    table.clustering_fields = ["state_code", "debtor_normalized"]
    _client.create_table(table, exists_ok=True)


def load_entities(rows: list[dict]) -> int:
    """Streaming insert of Gold-layer entity rows."""
    if not rows:
        return 0
    now = datetime.now(timezone.utc).isoformat()
    enriched = [{**r, "ingestion_date": now} for r in rows]
    errors = _client.insert_rows_json(ENTITY_TABLE, enriched)
    if errors:
        raise RuntimeError(f"BQ insert errors: {errors}")
    return len(enriched)


def query_entity_exposure(debtor: str,
                           min_active_liens: int = 1) -> list:
    """Sample analytical query: cross-state exposure for a debtor."""
    sql = f"""
        SELECT state_code,
               COUNT(*) AS filing_count,
               SUM(active_lien_count) AS total_active,
               SUM(total_collateral_value) AS total_collateral
        FROM `{ENTITY_TABLE}`
        WHERE LOWER(debtor_normalized) = LOWER(@debtor)
          AND active_lien_count >= @min_active
        GROUP BY state_code
        ORDER BY total_collateral DESC
    """
    cfg = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("debtor", "STRING", debtor),
        bigquery.ScalarQueryParameter("min_active", "INT64",
                                      min_active_liens),
    ])
    return [dict(r) for r in _client.query(sql, job_config=cfg)]
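The Gold layer's risk profiles do not have the same shape as `_ENTITY_SCHEMA`, so something has to map one to the other before `load_entities` is called. The sketch below shows one possible mapping, one row per entity and state. `entity_rows` is a hypothetical helper, the sample filings are invented, and the nullable columns (`total_collateral_value`, `resolution_confidence`) are simply left out.

```python
from collections import defaultdict


def entity_rows(entity_id: str, filings: list) -> list:
    """Hypothetical mapper: one row per (entity, state) in the _ENTITY_SCHEMA shape."""
    by_state = defaultdict(list)
    for f in filings:
        by_state[f["state_code"]].append(f)

    rows = []
    for state, group in sorted(by_state.items()):
        # YYYY-MM-DD strings sort chronologically
        dates = sorted(f["filing_date"] for f in group)
        rows.append({
            "entity_id": entity_id,
            "debtor_normalized": group[0]["debtor_name"],
            "state_code": state,
            "filing_count": len(group),
            "active_lien_count": sum(
                1 for f in group if f["filing_type"] != "UCC-3 termination"),
            "first_filing_date": dates[0],
            "last_filing_date": dates[-1],
        })
    return rows


filings = [
    {"debtor_name": "Acme Manufacturing LLC", "state_code": "DE",
     "filing_type": "UCC-1", "filing_date": "2024-03-15"},
    {"debtor_name": "Acme Manufacturing LLC", "state_code": "DE",
     "filing_type": "UCC-3 termination", "filing_date": "2025-01-10"},
    {"debtor_name": "Acme Manufacturing LLC", "state_code": "TX",
     "filing_type": "UCC-1", "filing_date": "2024-06-01"},
]
rows = entity_rows("ENT-00001", filings)
for row in rows:
    print(row["state_code"], row["filing_count"], row["active_lien_count"])
# DE 2 1
# TX 1 1
```

Emitting one row per state keeps the required `state_code` column single-valued and lines up with the clustering on (state_code, debtor_normalized).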

# deployment/cloudrun.yaml — Cloud Run service definition
#
# WHAT: Deploys the FastAPI orchestrator + Celery worker as two
#       Cloud Run services with autoscaling.
# WHY:  Cloud Run handles cold starts, TLS, IAM, autoscaling, and
#       pay-per-request billing. No servers to patch.
# DEPLOY: gcloud run services replace cloudrun.yaml --region us-central1
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: ucc-orchestrator
  annotations:
    run.googleapis.com/launch-stage: GA
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "30"
        run.googleapis.com/cpu-throttling: "false"
    spec:
      serviceAccountName: ucc-orchestrator@PROJECT.iam.gserviceaccount.com
      timeoutSeconds: 300
      containers:
        - image: us-central1-docker.pkg.dev/PROJECT/ucc/orchestrator:latest
          ports:
            - containerPort: 8000
          env:
            - name: GCP_PROJECT
              value: PROJECT
            - name: BQ_DATASET
              value: ucc_gold
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: anthropic-key
                  key: latest
          resources:
            limits:
              cpu: "2"
              memory: "2Gi"
          startupProbe:
            httpGet:
              path: /health  # same endpoint the Dockerfile HEALTHCHECK probes
              port: 8000
            failureThreshold: 30
            periodSeconds: 5
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: ucc-worker
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "0"
        autoscaling.knative.dev/maxScale: "10"
    spec:
      containerConcurrency: 1   # one job at a time per worker
      timeoutSeconds: 1800      # 30 min for long batches
      containers:
        - image: us-central1-docker.pkg.dev/PROJECT/ucc/worker:latest
          command: ["celery", "-A", "tasks", "worker",
                    "--concurrency=4"]
          env:
            - name: GCP_PROJECT
              value: PROJECT

"""deployment/airflow_dag.py — Cloud Composer daily orchestration.

WHAT: Daily DAG: 1) reconcile yesterday's GCS drops vs. expected
      state schedules, 2) trigger Bronze re-ingestion for any
      missing batches, 3) run Silver normalization, 4) run Gold
      entity resolution + lien profile generation, 5) halt if the
      quality score falls below 0.85, 6) load Gold to BigQuery.
WHY:  Eventarc handles real-time. Airflow handles "what was
      supposed to happen yesterday but didn't." It's the
      reconciliation layer that catches missed drops.
GOTCHA: Always pass execution_date as input to make the DAG
        deterministic and re-runnable.
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# The Google provider's operator for running a Cloud Run job is
# CloudRunExecuteJobOperator.
from airflow.providers.google.cloud.operators.cloud_run import (
    CloudRunExecuteJobOperator,
)

DEFAULT_ARGS = {
    "owner": "data-eng",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "depends_on_past": False,
}


def reconcile_drops(**ctx) -> list:
    """Compare expected drops vs actual; return missing list."""
    from deployment.reconciler import find_missing_drops
    exec_date = ctx["execution_date"].date().isoformat()
    return find_missing_drops(exec_date)


def fail_if_quality_drops(**ctx) -> None:
    """Halt the DAG if quality score < 0.85."""
    from pipeline.quality_gates import compute_quality_score
    score = compute_quality_score(ctx["execution_date"])
    if score < 0.85:
        raise ValueError(f"Quality below threshold: {score}")


with DAG(
    dag_id="ucc_daily",
    default_args=DEFAULT_ARGS,
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 6 * * *",  # 06:00 UTC daily
    catchup=False,
    tags=["ucc", "bronze-silver-gold"],
) as dag:

    reconcile = PythonOperator(
        task_id="reconcile_drops",
        python_callable=reconcile_drops,
    )

    bronze = CloudRunExecuteJobOperator(
        task_id="bronze_ingest",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-bronze-job",
    )

    silver = CloudRunExecuteJobOperator(
        task_id="silver_normalize",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-silver-job",
    )

    gold = CloudRunExecuteJobOperator(
        task_id="gold_resolve",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-gold-job",
    )

    quality_gate = PythonOperator(
        task_id="quality_gate",
        python_callable=fail_if_quality_drops,
    )

    load_bq = CloudRunExecuteJobOperator(
        task_id="load_bigquery",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-bq-load-job",
    )

    reconcile >> bronze >> silver >> gold >> quality_gate >> load_bq

# deployment/terraform/main.tf — one-shot provisioning
#
# Provisions: GCS buckets, Pub/Sub topic, BigQuery dataset, Cloud
# Run service accounts, Eventarc trigger, Composer environment.
# Run: terraform init && terraform apply -var project=YOUR_PROJECT

terraform {
  required_providers {
    google = { source = "hashicorp/google", version = "~> 5.30" }
  }
}

provider "google" {
  project = var.project
  region  = "us-central1"
}

variable "project" { type = string }

# 1. Inbound GCS bucket (state SOS drops)
resource "google_storage_bucket" "bronze_inbound" {
  name          = "${var.project}-ucc-bronze-inbound"
  location      = "US"
  uniform_bucket_level_access = true
  versioning { enabled = true }
  lifecycle_rule {
    condition { age = 90 }
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
  }
}

# 2. Pub/Sub topic for ingest jobs
resource "google_pubsub_topic" "ingest_jobs" {
  name = "ucc-ingest-jobs"
  message_retention_duration = "604800s"  # 7 days
}

# 3. BigQuery dataset for Gold layer
resource "google_bigquery_dataset" "gold" {
  dataset_id    = "ucc_gold"
  location      = "US"
  description   = "UCC Gold layer: resolved entities + lien profiles"
  default_table_expiration_ms = 31536000000  # 1y
}

# 4. Eventarc trigger: GCS finalize -> Cloud Function
resource "google_eventarc_trigger" "filing_drop" {
  name     = "ucc-filing-drop"
  location = "us-central1"
  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }
  matching_criteria {
    attribute = "bucket"
    value     = google_storage_bucket.bronze_inbound.name
  }
  destination {
    cloud_run_service {
      service = "ucc-orchestrator"
      region  = "us-central1"
      path    = "/eventarc/filing-drop"
    }
  }
}

# 5. Cloud Composer (managed Airflow) for daily orchestration
resource "google_composer_environment" "ucc_daily" {
  name   = "ucc-daily"
  region = "us-central1"
  config {
    software_config {
      image_version = "composer-2-airflow-2"
      env_variables = {
        GCP_PROJECT  = var.project
        BQ_DATASET   = "ucc_gold"
      }
    }
  }
}

What Just Happened?

You wired the full GCP production stack. State SOS drops land in gs://*-ucc-bronze-inbound, and an Eventarc trigger fires gcs_trigger.py within ~2s. The trigger publishes a Pub/Sub message ordered by state_code, and Cloud Run autoscales the orchestrator and worker services. The daily Composer DAG reconciles missed drops, runs Bronze→Silver→Gold, and loads partitioned, clustered BigQuery tables. Terraform provisions the whole thing in one shot. Local development still works by setting GCP_PROJECT=local: the GCS, Pub/Sub, and BigQuery clients fall back to in-memory fakes.
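The modules above do not show the local fallback itself, so here is one way it could look for Pub/Sub. This is a sketch under assumptions: `InMemoryPublisher` and `make_publisher` are hypothetical names, and the fake mimics only the `publish(...)` call and the `.result()` method that gcs_trigger.py relies on.

```python
import json
from concurrent.futures import Future


class InMemoryPublisher:
    """Hypothetical fake: records messages instead of calling Pub/Sub."""

    def __init__(self) -> None:
        self.messages = []

    def publish(self, topic: str, data: bytes, **attrs: str) -> Future:
        self.messages.append({"topic": topic, "data": data, "attrs": attrs})
        future = Future()
        future.set_result(f"local-{len(self.messages)}")  # fake message ID
        return future


def make_publisher(project: str):
    """Return the fake for GCP_PROJECT=local, otherwise the real client."""
    if project == "local":
        return InMemoryPublisher()
    from google.cloud import pubsub_v1  # imported lazily; not needed locally
    return pubsub_v1.PublisherClient()


publisher = make_publisher("local")
payload = {"gcs_uri": "gs://local-bucket/DE/sample.csv", "state_code": "DE"}
future = publisher.publish("ucc-ingest-jobs",
                           json.dumps(payload).encode("utf-8"),
                           ordering_key="DE")
print(future.result(timeout=30))  # local-1
```

Returning an already-completed `Future` lets calling code keep its `future.result(timeout=30)` line unchanged between local and cloud runs.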

Advanced RAG: Hybrid Search + Jurisdiction-Specific Re-Ranking

The Bronze and Silver layers handle structured fields. The Reporting Agent answers free-text analyst questions like "Has Acme defaulted on UCC-3 amendments in DE in the last 3 years?" — that needs RAG over UCC regulatory docs and state filing guides. Hybrid retrieval combines BM25 keyword matching (catches statute citations like 9-515(d)) with semantic similarity (catches conceptually related text). A jurisdiction-specific re-ranker boosts hits whose state_code matches the query's jurisdiction so Delaware questions get Delaware filing-guide answers first.
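The jurisdiction boost is easiest to see with numbers. The sketch below assumes an additive boost of 0.25, the value documented for rag/re_ranker.py, applied to hybrid scores that are invented for illustration. `apply_jurisdiction_boost` is a hypothetical helper, not the re-ranker itself.

```python
from __future__ import annotations

JURISDICTION_BOOST = 0.25  # assumed additive boost


def apply_jurisdiction_boost(results: list[dict],
                             query_state: str | None) -> list[dict]:
    """Add the boost to hits whose state_code matches, then re-sort."""
    boosted = []
    for r in results:
        matches = bool(query_state) and r.get("state_code") == query_state
        bonus = JURISDICTION_BOOST if matches else 0.0
        boosted.append({**r, "final_score": round(r["hybrid_score"] + bonus, 4)})
    return sorted(boosted, key=lambda r: r["final_score"], reverse=True)


# Illustrative scores only
hits = [
    {"doc_id": "NY-FILING-GUIDE", "state_code": "NY", "hybrid_score": 0.42},
    {"doc_id": "DE-FILING-GUIDE", "state_code": "DE", "hybrid_score": 0.30},
    {"doc_id": "UCC-9-515", "state_code": None, "hybrid_score": 0.35},
]
for r in apply_jurisdiction_boost(hits, "DE"):
    print(r["doc_id"], r["final_score"])
# DE-FILING-GUIDE 0.55
# NY-FILING-GUIDE 0.42
# UCC-9-515 0.35
```

Without the boost the New York guide would rank first for a Delaware question. With it, the Delaware guide moves to the top while general Article 9 passages keep their original scores.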

"""rag/hybrid_search.py — BM25 + cosine semantic for UCC docs.

WHAT: Retrieves top-K candidate passages from a corpus of UCC
      regulatory docs (Article 9), state filing guides, and
      collateral classification rules. Combines a BM25-like
      keyword score with a cosine semantic score (mock embeddings
      via bag-of-words for the capstone; swap to Voyage AI in
      production via the EMBEDDER env var).
WHY:  Pure semantic search misses literal statute citations
      ("UCC-1", "9-515(d)") because embeddings encode meaning,
      not exact strings. BM25 alone misses paraphrases. Hybrid
      catches both.
"""

import math
from collections import Counter

# Mock corpus: UCC regulatory docs + state filing guides
CORPUS = [
    {"doc_id": "UCC-9-515",
     "state_code": None,
     "title": "UCC 9-515 — Continuation",
     "text": ("A financing statement is effective for a period "
              "of five years after the date of filing. UCC 9-515 "
              "permits a continuation statement (UCC-3) within "
              "the six months immediately preceding lapse.")},
    {"doc_id": "UCC-9-310",
     "state_code": None,
     "title": "UCC 9-310 — Perfection by Filing",
     "text": ("Except as otherwise provided, a financing "
              "statement must be filed to perfect a security "
              "interest in collateral.")},
    {"doc_id": "DE-FILING-GUIDE",
     "state_code": "DE",
     "title": "Delaware UCC Filing Guide",
     "text": ("Delaware Department of State accepts UCC-1 and "
              "UCC-3 filings online. Standard fee $25. Searches "
              "are debtor-name only. UCC-3 termination releases "
              "the secured party's interest immediately.")},
    {"doc_id": "NY-FILING-GUIDE",
     "state_code": "NY",
     "title": "New York UCC Filing Guide",
     "text": ("NY Department of State filing system uses fixed-"
              "width text format for bulk downloads. Foreign "
              "entity numbering uses prefix 'FN-'. Continuation "
              "filings must reference original filing number.")},
    {"doc_id": "TX-FILING-GUIDE",
     "state_code": "TX",
     "title": "Texas UCC Filing Guide",
     "text": ("Texas SOS publishes daily CSV exports. Debtor "
              "names are normalized but suffix variants (LLC vs "
              "L.L.C.) are preserved verbatim, requiring "
              "client-side normalization.")},
    {"doc_id": "COLLATERAL-CLASS",
     "state_code": None,
     "title": "Collateral Classification Reference",
     "text": ("Common categories: inventory, equipment, "
              "accounts, instruments, chattel paper, "
              "investment property, all-assets blanket lien.")},
]


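def _tokenize(text: str) -> list:
    """Lowercase, split on whitespace, and trim surrounding punctuation."""
    tokens = (w.strip(".,;:()?!\"'") for w in text.lower().split())
    # Filter after stripping so punctuation-only fragments are dropped
    return [t for t in tokens if len(t) > 1]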


def _bm25(query: list, doc: list, all_docs: list,
          k1: float = 1.5, b: float = 0.75) -> float:
    """Standard BM25 score."""
    if not doc:
        return 0.0
    avgdl = sum(len(d) for d in all_docs) / max(len(all_docs), 1)
    n = len(all_docs)
    counts = Counter(doc)
    score = 0.0
    for term in query:
        df = sum(1 for d in all_docs if term in d)
        idf = math.log((n - df + 0.5) / (df + 0.5) + 1)
        tf = counts.get(term, 0)
        norm = 1 - b + b * len(doc) / avgdl
        score += idf * (tf * (k1 + 1)) / (tf + k1 * norm)
    return score


def _cosine_bow(a: list, b: list) -> float:
    """Bag-of-words cosine similarity."""
    ca, cb = Counter(a), Counter(b)
    dot = sum(ca[t] * cb[t] for t in ca)
    norm_a = math.sqrt(sum(v * v for v in ca.values()))
    norm_b = math.sqrt(sum(v * v for v in cb.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def search(query: str, top_k: int = 10,
           state_code: str | None = None) -> list:
    """Hybrid search returning top_k candidates.

    Each result has bm25_score, cosine_score, hybrid_score, and
    state_code so the re-ranker can apply jurisdiction boosts.
    """
    qtoks = _tokenize(query)
    all_doc_toks = [_tokenize(d["text"]) for d in CORPUS]
    results = []
    for doc, dtoks in zip(CORPUS, all_doc_toks):
        bm = _bm25(qtoks, dtoks, all_doc_toks)
        cs = _cosine_bow(qtoks, dtoks)
        # Hybrid: 0.4 BM25 (normalized) + 0.6 cosine
        bm_norm = min(bm / (len(qtoks) + 1), 1.0)
        hybrid = 0.4 * bm_norm + 0.6 * cs
        if hybrid < 0.01:
            continue
        results.append({**doc,
                        "bm25_score": round(bm_norm, 4),
                        "cosine_score": round(cs, 4),
                        "hybrid_score": round(hybrid, 4),
                        "query_state": state_code})
    results.sort(key=lambda x: x["hybrid_score"], reverse=True)
    return results[:top_k]
"""rag/re_ranker.py — jurisdiction-aware re-ranker (Haiku).

WHAT: Takes top-K hybrid results, asks Haiku to score each one
      against the query, applies a +0.25 score boost for results
      whose state_code matches the query's jurisdiction (so a DE
      question doesn't get a NY-specific answer at top-1).
WHY:  Hybrid surface is "topically related". This stage answers
      "does this passage actually answer THIS question for THIS
      jurisdiction?" Cost: ~$0.003 per re-rank call (Haiku).
GOTCHA: Falls back to hybrid order on Haiku JSON errors. Logs the
        fallback so the trace shows quality misses.
"""

import json
import logging

import anthropic

_log = logging.getLogger(__name__)
_client = anthropic.Anthropic()
RERANK_MODEL = "claude-haiku-4-5-20251001"
JURISDICTION_BOOST = 0.25


def _prompt(query: str, state_code: str | None,
            candidates: list) -> str:
    blocks = []
    for i, c in enumerate(candidates):
        sc = c.get("state_code") or "general"
        blocks.append(
            f"[{i}] (state={sc}) {c['title']}\n"
            f"    Snippet: {c['text'][:280]}"
        )
    juris = (f"The user is asking about jurisdiction "
             f"'{state_code}'. " if state_code else "")
    return (
        f"{juris}Score each passage 0.0-1.0 for relevance to "
        f"the query. Also return the single most relevant "
        f"sentence per passage (contextual compression). "
        f"Respond as JSON only.\n\n"
        f"Query: {query}\n\n"
        f"Candidates:\n" + "\n".join(blocks) + "\n\n"
        f"JSON: {{\"rankings\": [{{\"index\": int, \"score\": "
        f"float, \"top_sentence\": str}}, ...]}}"
    )


def rerank(query: str, candidates: list,
           state_code: str | None = None,
           top_k: int = 3) -> list:
    if not candidates:
        return []
    try:
        resp = _client.messages.create(
            model=RERANK_MODEL,
            max_tokens=1024,
            messages=[{"role": "user",
                       "content": _prompt(query, state_code,
                                          candidates)}],
        )
        text = resp.content[0].text
        if "```" in text:
            # Take the fenced body and drop an optional "json" tag.
            # removeprefix() removes the exact prefix, whereas
            # lstrip("json\n") strips any leading j/s/o/n characters.
            text = text.split("```")[1].strip().removeprefix("json")
        rankings = json.loads(text)["rankings"]
    except (anthropic.APIError, json.JSONDecodeError, KeyError,
            IndexError, TypeError) as exc:
        # Fall back to hybrid order and log it so the miss is visible.
        _log.warning("rerank fallback to hybrid order: %r", exc)
        return candidates[:top_k]

    enriched = []
    for r in rankings:
        try:
            idx = int(r["index"])
            base = float(r["score"])
        except (KeyError, TypeError, ValueError):
            continue  # skip malformed ranking entries
        if 0 <= idx < len(candidates):
            c = dict(candidates[idx])
            # Jurisdiction boost: state-specific guides win for
            # state-specific questions.
            if state_code and c.get("state_code") == state_code:
                base = min(base + JURISDICTION_BOOST, 1.0)
            c["rerank_score"] = round(base, 4)
            c["compressed_snippet"] = r.get("top_sentence",
                                            c.get("text", ""))
            enriched.append(c)
    enriched.sort(key=lambda x: x["rerank_score"], reverse=True)
    return enriched[:top_k]
🎯 What Just Happened?

You implemented the third RAG pillar. Hybrid search returns up to 10 candidates by blending BM25 (which catches exact tokens such as "9-515" or "UCC-1") with cosine similarity (a bag-of-words stand-in in this listing for the embedding similarity that catches paraphrases). The Haiku re-ranker re-scores the candidates and returns the single most relevant sentence per result (contextual compression). The re-rank step then adds a +0.25 jurisdiction boost, capped at 1.0, so a Delaware question gets the Delaware filing guide before a generic Article 9 reference. Total added cost per query: ~$0.003, less than 0.5% of the cost of the Opus calls in the orchestrator.
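To make the two scoring stages concrete, here is a minimal sketch that reuses the constants from the listings above (the 0.4/0.6 hybrid weights and the +0.25 boost capped at 1.0). The helper names `hybrid_score` and `boosted_score` and the sample numbers are illustrative assumptions, not part of the project code.

```python
"""Worked example of the hybrid blend and the jurisdiction boost."""

BM25_WEIGHT = 0.4          # same weights as search()
COSINE_WEIGHT = 0.6
JURISDICTION_BOOST = 0.25  # same constant as rag/re_ranker.py


def hybrid_score(bm25_raw: float, cosine: float, n_query_tokens: int) -> float:
    """Blend a length-normalized BM25 score with cosine similarity."""
    bm_norm = min(bm25_raw / (n_query_tokens + 1), 1.0)
    return BM25_WEIGHT * bm_norm + COSINE_WEIGHT * cosine


def boosted_score(model_score: float, doc_state, query_state) -> float:
    """Add the boost only when the query state is set and matches; cap at 1.0."""
    if query_state and doc_state == query_state:
        return min(model_score + JURISDICTION_BOOST, 1.0)
    return model_score


# Hypothetical numbers: a 4-token query, raw BM25 of 2.5, cosine of 0.5.
print(round(hybrid_score(2.5, 0.5, 4), 4))

# A Delaware guide scored 0.70 by the model versus a general
# reference scored 0.85, for a query known to be about DE.
print(round(boosted_score(0.70, "DE", "DE"), 2),
      boosted_score(0.85, None, "DE"))
```

With these sample numbers the hybrid score is 0.5, and the boosted Delaware guide (0.95) outranks the general reference (0.85), which is the reordering the boost is meant to produce.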

Evaluation Suite: 100 Cases

Test Case Breakdown
  • Clean filings (30): Standard filings with no ambiguity. All entity resolutions high confidence. Validates baseline accuracy.
  • High-confidence entity resolution (15): Name variants that should match with >85% confidence. Tests fuzzy matching and registry verification.
  • Low-confidence entity resolution (15): Ambiguous matches that should trigger steward review. Tests HITL routing and confidence calibration.
  • Collateral classification (10): Free-text collateral descriptions mapped to UCC categories. Tests Opus's classification accuracy on nuanced legal language.
  • Duplicate detection (5): Filings that appear twice in different formats. Tests deduplication logic.
  • Malformed records (10): Missing fields, encoding errors, invalid dates. Tests error handling and circuit breaker.
  • Edge cases (10): State format changes, vector DB unavailability, concurrent batch conflicts.
  • Adversarial (5): Injection attempts in debtor names, deliberate entity obfuscation, 5x max batch size.

Evaluation Runner (test_pipeline.py)

The 100-case suite is generated deterministically and run through the orchestrator. A category passes when its accuracy is at or above its floor: clean filings 95%, high-confidence resolution 90%, low-confidence resolution 75% (these go to HITL anyway), collateral classification 85%, edge cases 80%, duplicate detection 100%, malformed records 100% (must fail safely), adversarial 100% (must reject or quarantine).
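As a quick sanity check on the numbers above, the following sketch confirms that the category counts add up to 100 and converts each floor into the minimum number of cases that must pass. The `SUITE` table and `min_passing` helper are illustrative; the floors for duplicates and edge cases are taken from CATEGORY_FLOORS in the runner below.

```python
"""Sanity-check the suite size and translate floors into pass counts."""

# (case count, floor in whole percent) per category.
SUITE = {
    "clean_filing": (30, 95),
    "entity_resolution_high": (15, 90),
    "entity_resolution_low": (15, 75),
    "collateral_classification": (10, 85),
    "duplicate": (5, 100),
    "malformed": (10, 100),
    "edge_case": (10, 80),
    "adversarial": (5, 100),
}


def min_passing(n_cases: int, floor_pct: int) -> int:
    """Smallest pass count whose accuracy is >= floor (integer ceiling)."""
    return -(-n_cases * floor_pct // 100)


total = sum(n for n, _ in SUITE.values())
assert total == 100, total

for name, (n, pct) in SUITE.items():
    print(f"{name:<28} needs {min_passing(n, pct)}/{n}")
```

Integer arithmetic avoids floating-point rounding in the ceiling. The result is that clean filings tolerate one miss (29 of 30), high-confidence resolution one miss (14 of 15), low-confidence resolution three (12 of 15), collateral classification one (9 of 10), and edge cases two (8 of 10), while the 100% categories tolerate none.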

"""evaluation/build_test_cases.py — deterministic 100-case generator.

WHAT: Produces evaluation/test_cases.json with 100 cases across
      8 categories (totals match the breakdown above).
WHY:  Deterministic + reproducible. Re-run the suite any time
      and the accuracy numbers are comparable.
"""

import json
import random
from pathlib import Path

random.seed(2026)
STATES = ["DE", "NY", "TX", "CA", "MA", "IL", "NV", "WA"]
SUFFIX_VARIANTS = ["LLC", "L.L.C.", "Inc", "Inc.", "Corp",
                   "Corporation", "Co", "Company"]
COLLATERAL_TYPES = ["all inventory and equipment",
                    "accounts receivable",
                    "specific equipment - 2024 Caterpillar D6T",
                    "all assets of the debtor",
                    "investment property and securities",
                    "chattel paper and instruments"]


def _filing(seq: int, debtor: str, state: str, kind: str = "UCC-1",
            collateral: str = "general business assets") -> dict:
    return {
        "filing_number": f"{state}-2026-{seq:06d}",
        "filing_type": kind,
        "debtor_name": debtor,
        "secured_party_name": f"Secured Capital {seq % 50}",
        "state_code": state,
        "filing_date": f"2026-0{(seq % 9) + 1}-15",
        "lapse_date": f"2031-0{(seq % 9) + 1}-15",
        "collateral_description": collateral,
    }


def generate(out_path: str = "evaluation/test_cases.json") -> dict:
    cases = []
    seq = 1

    # 30 clean filings
    for i in range(30):
        debtor = f"CleanCo {i:03d} LLC"
        state = STATES[i % len(STATES)]
        cases.append({"case_id": f"CL-{i+1:03d}",
                      "category": "clean_filing",
                      "input": _filing(seq, debtor, state),
                      "expected": {"resolved": True,
                                   "confidence_min": 0.95,
                                   "hitl_required": False}})
        seq += 1

    # 15 high-confidence entity resolution (suffix variants)
    base = "Acme Logistics"
    for i in range(15):
        suffix = SUFFIX_VARIANTS[i % len(SUFFIX_VARIANTS)]
        state = STATES[i % len(STATES)]
        cases.append({"case_id": f"HC-{i+1:03d}",
                      "category": "entity_resolution_high",
                      "input": _filing(seq, f"{base} {suffix}", state),
                      "expected": {"resolved_to": base,
                                   "confidence_min": 0.85,
                                   "hitl_required": False}})
        seq += 1

    # 15 low-confidence (ambiguous initials, partial overlap)
    ambiguous = [("J. Smith Holdings", "John Smith Holdings"),
                 ("ABC Industrial", "A.B.C. Industries"),
                 ("Riverside Trucking", "River Side Trucking Co"),
                 ("MetalWorks Inc", "Metal Works Incorporated"),
                 ("Tri-State Supply", "Tristate Supplies LLC")]
    for i in range(15):
        candidate, target = ambiguous[i % len(ambiguous)]
        state = STATES[i % len(STATES)]
        cases.append({"case_id": f"LC-{i+1:03d}",
                      "category": "entity_resolution_low",
                      "input": _filing(seq, candidate, state),
                      "expected": {"candidate_match": target,
                                   "confidence_max": 0.80,
                                   "hitl_required": True}})
        seq += 1

    # 10 collateral classification
    for i in range(10):
        coll = COLLATERAL_TYPES[i % len(COLLATERAL_TYPES)]
        cases.append({"case_id": f"CC-{i+1:03d}",
                      "category": "collateral_classification",
                      "input": _filing(seq, f"BorrowerCo {i}",
                                       "DE",
                                       collateral=coll),
                      "expected": {"classification_present": True,
                                   "min_categories": 1}})
        seq += 1

    # 5 duplicate detection
    for i in range(5):
        dup = _filing(seq, f"DupCo {i}", "NY")
        cases.append({"case_id": f"DUP-{i+1:03d}",
                      "category": "duplicate",
                      "input": [dup, {**dup,
                                      "filing_number":
                                      dup["filing_number"]
                                      + "-COPY"}],
                      "expected": {"deduplicated": True,
                                   "kept": 1}})
        seq += 1

    # 10 malformed records
    malformations = [
        {"filing_number": None},
        {"debtor_name": ""},
        {"state_code": "ZZ"},     # invalid state
        {"filing_date": "not-a-date"},
        {"lapse_date": "1900-01-01"},  # before filing_date
        {"collateral_description": None},
        {"filing_type": "UCC-99"},
        {"secured_party_name": "x" * 5000},  # huge field
        {"filing_number": "███\x00"},        # encoding errors
        {"debtor_name": "<script>alert(1)</script>"},
    ]
    for i, mal in enumerate(malformations):
        bad = {**_filing(seq, f"BadCo {i}", "TX"), **mal}
        cases.append({"case_id": f"MAL-{i+1:03d}",
                      "category": "malformed",
                      "input": bad,
                      "expected": {"quarantined": True,
                                   "error_logged": True}})
        seq += 1

    # 10 edge cases
    edge = [
        ("state_format_change", "DE",
         "format-version=v2 introduces new field"),
        ("vector_db_unavailable", "NY",
         "ChromaDB connection refused"),
        ("concurrent_batch_conflict", "TX",
         "two pipelines update same entity"),
        ("circuit_breaker_open", "CA",
         "Anthropic API returning 503s"),
        ("memory_quorum_loss", "MA",
         "episodic memory stale"),
        ("partial_batch_failure", "IL", "5/100 records crash"),
        ("schema_drift", "NV",
         "new column not in Bronze schema"),
        ("rate_limit_exceeded", "WA", "Haiku 429 throttle"),
        ("downstream_bq_quota", "DE",
         "BigQuery streaming quota hit"),
        ("clock_skew", "NY",
         "lapse_date in future relative to wall clock"),
    ]
    for i, (slug, state, desc) in enumerate(edge):
        cases.append({"case_id": f"EDGE-{i+1:03d}",
                      "category": "edge_case",
                      "input": {**_filing(seq, f"EdgeCo {i}",
                                          state),
                                "scenario": slug,
                                "description": desc},
                      "expected": {"graceful_degradation": True,
                                   "alert_emitted": True}})
        seq += 1

    # 5 adversarial
    adversarial_payloads = [
        "Robert'); DROP TABLE filings; --",
        "Acme LLC\x00admin",
        "<script>leak()</script> Holdings",
        ("A" * 1000) + " Industries",          # oversized
        "../../../../etc/passwd",
    ]
    for i, payload in enumerate(adversarial_payloads):
        cases.append({"case_id": f"ADV-{i+1:03d}",
                      "category": "adversarial",
                      "input": _filing(seq, payload, "DE"),
                      "expected": {"sanitized": True,
                                   "instruction_followed": False,
                                   "quarantined": True}})
        seq += 1

    suite = {"total_cases": len(cases), "cases": cases}
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(suite, f, indent=2)
    return suite


if __name__ == "__main__":
    s = generate()
    counts: dict = {}
    for c in s["cases"]:
        counts[c["category"]] = counts.get(c["category"], 0) + 1
    print(f"Wrote {s['total_cases']} cases:")
    for k, v in sorted(counts.items()):
        print(f"  {k:<30} {v}")
"""tests/test_pipeline.py — runs the 100-case suite + scores.

WHAT: Loads evaluation/test_cases.json, runs each case through
      the orchestrator, scores it against the expected outcome,
      prints per-category accuracy, and asserts hard floors.
WHY:  This is the production quality gate. CI runs it on every
      merge to main; if any category drops below its floor, the
      build fails.
GOTCHA: Use pytest fixtures with `scope="session"` to load the
        suite once. The orchestrator is expensive to instantiate.
"""

import json
from pathlib import Path

import pytest

from pipeline.orchestrator import Orchestrator

SUITE_PATH = Path("evaluation/test_cases.json")

CATEGORY_FLOORS = {
    "clean_filing":             0.95,
    "entity_resolution_high":   0.90,
    "entity_resolution_low":    0.75,
    "collateral_classification": 0.85,
    "duplicate":                1.00,
    "malformed":                1.00,
    "edge_case":                0.80,
    "adversarial":              1.00,
}


@pytest.fixture(scope="session")
def suite():
    if not SUITE_PATH.exists():
        pytest.skip("Run evaluation/build_test_cases.py first")
    with open(SUITE_PATH) as f:
        return json.load(f)


@pytest.fixture(scope="session")
def orch():
    return Orchestrator()


def _score_case(actual: dict, expected: dict) -> float:
    """Return 1.0 if all asserted keys match; partial otherwise."""
    if not expected:
        return 1.0
    hits = 0
    total = 0
    for key, value in expected.items():
        total += 1
        if key.endswith("_min"):
            real = key[:-4]
            if (real in actual
                and isinstance(actual[real], (int, float))
                and actual[real] >= value):
                hits += 1
        elif key.endswith("_max"):
            real = key[:-4]
            if (real in actual
                and isinstance(actual[real], (int, float))
                and actual[real] <= value):
                hits += 1
        elif key in actual and actual[key] == value:
            hits += 1
    return hits / total if total else 0.0


def test_suite_loads(suite):
    assert suite["total_cases"] == len(suite["cases"])
    assert suite["total_cases"] >= 100


@pytest.mark.parametrize("category", list(CATEGORY_FLOORS.keys()))
def test_category_floor(category, suite, orch):
    cases = [c for c in suite["cases"] if c["category"] == category]
    if not cases:
        pytest.skip(f"no cases for {category}")
    passed = 0
    for case in cases:
        try:
            actual = orch.run(case["input"])
            score = _score_case(actual, case["expected"])
            if score >= 0.9:
                passed += 1
        except Exception:
            pass  # counts as fail
    accuracy = passed / len(cases)
    floor = CATEGORY_FLOORS[category]
    assert accuracy >= floor, (
        f"{category}: {accuracy:.0%} below floor {floor:.0%} "
        f"({passed}/{len(cases)})")


def test_overall_accuracy(suite, orch):
    """Whole-suite accuracy must be >=85%."""
    passed = 0
    for case in suite["cases"]:
        try:
            actual = orch.run(case["input"])
            if _score_case(actual, case["expected"]) >= 0.9:
                passed += 1
        except Exception:
            pass
    overall = passed / len(suite["cases"])
    assert overall >= 0.85, f"overall {overall:.0%} < 85%"

Run the harness: python evaluation/build_test_cases.py && pytest tests/test_pipeline.py -v

Expected output (truncated):

tests/test_pipeline.py::test_suite_loads PASSED
tests/test_pipeline.py::test_category_floor[clean_filing] PASSED
tests/test_pipeline.py::test_category_floor[entity_resolution_high] PASSED
tests/test_pipeline.py::test_category_floor[entity_resolution_low] PASSED
tests/test_pipeline.py::test_category_floor[collateral_classification] PASSED
tests/test_pipeline.py::test_category_floor[duplicate] PASSED
tests/test_pipeline.py::test_category_floor[malformed] PASSED
tests/test_pipeline.py::test_category_floor[edge_case] PASSED
tests/test_pipeline.py::test_category_floor[adversarial] PASSED
tests/test_pipeline.py::test_overall_accuracy PASSED
=============== 10 passed in 12.3s ===============
🎯 What Just Happened?

You closed the loop on production quality. build_test_cases.py generates 100 deterministic cases across 8 categories. test_pipeline.py runs each one through the orchestrator and asserts category-specific floors (clean filings at least 95%, adversarial and malformed 100%). A case counts as passed only when its score is 0.9 or higher, which in practice means every expected key must match. The suite is the gate that prevents regressions: any merge that drops a category below its floor fails CI before it touches production.
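The scoring rule is easier to see on a single case. The sketch below is a condensed restatement of the `_score_case` logic from the runner (same `_min` / `_max` suffix convention), applied to two hypothetical orchestrator outputs; the `good` and `weak` dictionaries are made-up examples.

```python
"""Condensed restatement of the _score_case rules, with one example."""


def score_case(actual: dict, expected: dict) -> float:
    """Fraction of expected keys satisfied by the actual result."""
    if not expected:
        return 1.0
    hits = 0
    for key, value in expected.items():
        if key.endswith("_min"):      # confidence_min -> confidence >= value
            got = actual.get(key[:-4])
            hits += isinstance(got, (int, float)) and got >= value
        elif key.endswith("_max"):    # confidence_max -> confidence <= value
            got = actual.get(key[:-4])
            hits += isinstance(got, (int, float)) and got <= value
        else:                         # everything else is an exact match
            hits += key in actual and actual[key] == value
    return hits / len(expected)


expected = {"resolved": True, "confidence_min": 0.95, "hitl_required": False}

# Hypothetical orchestrator outputs for one clean filing.
good = {"resolved": True, "confidence": 0.97, "hitl_required": False}
weak = {"resolved": True, "confidence": 0.91, "hitl_required": False}

print(score_case(good, expected))            # all three keys satisfied
print(round(score_case(weak, expected), 3))  # confidence misses the minimum
```

The second case satisfies two of three expected keys, so it scores about 0.667 and falls below the 0.9 pass threshold used in the runner.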

Testing Guide

Type | Scenario | Expected Behavior
Happy | 100 clean filings batch | < 30s, all resolved, Haiku handles 70%, cost < $1.00
Happy | Episodic memory boosts entity confidence | Similar past case found, confidence raised above 80%, no steward needed
Happy | Procedural rule fires on matching registered agent | Entity resolved with zero LLM cost, deterministic rule match
Happy | Steward correction stored as new episode | Episode persisted, future similar cases will benefit
Happy | Full pipeline traced end-to-end | Dashboard shows all 8 panels with real-time metrics
Edge | 15% malformed records in batch | Circuit breaker trips at 10%, quarantines batch, processes clean records separately
Edge | ChromaDB unavailable | Episodic memory degrades to keyword matching, procedural rules still fire
Edge | State format changes mid-batch | Ingestion agent detects schema mismatch, flags for engineering
Adversarial | Shell company entity obfuscation | System detects unusual patterns, flags for investigation
Adversarial | 50,000 filing batch (5x max) | System partitions into sub-batches, processes sequentially
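The last adversarial scenario above (an oversized batch split into sequential sub-batches) can be sketched in a few lines. The constant `MAX_BATCH_SIZE = 10_000` is an assumption inferred from "50,000 ... (5x max)", and `partition` is a hypothetical helper name, not a function from the project.

```python
"""Split an oversized batch into sequential sub-batches."""
from typing import Iterator, Sequence

MAX_BATCH_SIZE = 10_000  # assumption: 50,000 is described as 5x the max


def partition(filings: Sequence,
              max_size: int = MAX_BATCH_SIZE) -> Iterator[Sequence]:
    """Yield consecutive slices of at most max_size items, in order."""
    if max_size < 1:
        raise ValueError("max_size must be positive")
    for start in range(0, len(filings), max_size):
        yield filings[start:start + max_size]


oversized = list(range(50_000))  # stand-in for 50,000 filing dicts
sizes = [len(chunk) for chunk in partition(oversized)]
print(sizes)
```

Using a generator means only one sub-batch needs to be handed to the pipeline at a time, which matches the "processes sequentially" expectation.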

Compliance & Regulatory Notes

Production Compliance Requirements

PII handling: Filing data may contain individual names and addresses. The Reporting Agent must redact PII before generating public-facing reports. Episodic memory must not store PII — store entity IDs and resolution patterns, not raw personal data.

FCRA: If the platform's output feeds credit decisions, accuracy requirements under the Fair Credit Reporting Act apply. The 100-case evaluation suite and quality scorecards are compliance artifacts — keep them auditable.

Model audit trail: Every model routing decision, entity resolution, and steward override must be logged with timestamps. This enables regulatory audits and dispute resolution. The tracing infrastructure serves double duty: debugging AND compliance.

Data retention: Episodic memory retains resolution decisions for 730 days (2 years). Align with your organization's data retention policy and applicable state regulations.
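A retention rule like the 730-day window above usually comes down to one date comparison. The sketch below shows that check in isolation; `EPISODIC_RETENTION_DAYS` and `is_expired` are hypothetical names, and how expired episodes are actually purged from the memory store is not covered here.

```python
"""Retention check for episodic-memory records (730-day window)."""
from datetime import datetime, timedelta, timezone

EPISODIC_RETENTION_DAYS = 730  # from the data-retention note above


def is_expired(created_at: datetime, now: datetime,
               retention_days: int = EPISODIC_RETENTION_DAYS) -> bool:
    """True once a record is older than the retention window."""
    return now - created_at > timedelta(days=retention_days)


now = datetime(2026, 6, 1, tzinfo=timezone.utc)
fresh = datetime(2025, 1, 1, tzinfo=timezone.utc)   # 516 days old
stale = datetime(2024, 1, 1, tzinfo=timezone.utc)   # 882 days old
print(is_expired(fresh, now), is_expired(stale, now))
```

Passing `now` explicitly, rather than calling the clock inside the function, keeps the check deterministic and easy to test.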

Verify Everything Works

Run the complete production pipeline end-to-end with a single command:

python pipeline/orchestrator.py

Expected final output:

============================================================
PRODUCTION UCC PIPELINE — RUN COMPLETE
============================================================
Status: complete
Total filings: 15
Elapsed: 42.31ms

  [BRONZE] {"status": "complete", "valid": 10, "quarantined": 5}
  [SILVER] {"records": 10}
  [GOLD] {"entities": 8, "cost": 0.0036}

Tracing: {"total": 3, "ok": 3, "errors": 0, "avg_duration_ms": 14.1}
Performance Metrics
  • Bronze layer: 15 filings ingested, 10 valid (67%), 5 quarantined (33%)
  • Silver layer: 10 records normalized with standardized dates, names, and filing types
  • Gold layer: 8 unique entities resolved, risk profiles generated
  • Tracing: 3 pipeline stages traced, 0 errors, average 14ms per stage
  • Cost: $0.0036 total for 15 filings = $0.00024 per filing (well under $0.02 target)
  • Circuit breaker: Did not trip (error rate below 10% threshold for batch size)
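The percentages and the per-filing cost in the list above follow directly from the sample run output. This short sketch recomputes them; the variable names are illustrative and the inputs are copied from the expected output.

```python
"""Recompute the headline metrics from the sample run output."""

total_filings = 15
valid = 10
quarantined = 5
total_cost_usd = 0.0036
target_cost_per_filing = 0.02

valid_rate = valid / total_filings
quarantine_rate = quarantined / total_filings
cost_per_filing = total_cost_usd / total_filings

print(f"valid {valid_rate:.0%}, quarantined {quarantine_rate:.0%}")
print(f"cost per filing ${cost_per_filing:.5f}")
print("under target:", cost_per_filing < target_cost_per_filing)
```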
Congratulations!

You have built a production-grade UCC data intelligence platform with all six production pillars: model routing, multi-layer memory, data quality gates, circuit breakers, distributed tracing, and a Medallion Architecture pipeline. The pipeline processes raw filings through Bronze (validation) → Silver (normalization) → Gold (entity resolution + risk profiles) with full observability at every stage.

To take this further, connect real Anthropic API calls (replace the mock resolution in gold_layer.py), deploy with Docker using the Dockerfile in the Deployment section, and build the 100-case evaluation harness to prove production readiness.

Troubleshooting Guide

Common Errors & Fixes

ModuleNotFoundError: No module named 'anthropic'
Your virtual environment is not activated. Run source venv/bin/activate (macOS/Linux) or venv\Scripts\activate (Windows), then pip install anthropic.

ModuleNotFoundError: No module named 'pipeline'
Python cannot find the package. Create __init__.py files in every subdirectory:
touch pipeline/__init__.py memory/__init__.py observability/__init__.py guardrails/__init__.py routing/__init__.py

AuthenticationError: Invalid API key
Your ANTHROPIC_API_KEY environment variable is not set or is incorrect. Run echo $ANTHROPIC_API_KEY (Unix) or echo %ANTHROPIC_API_KEY% (Windows) to verify. The key should start with sk-ant-.

ImportError: cannot import name 'CircuitBreaker'
The file guardrails/circuit_breaker.py is missing or has a syntax error. Ensure you completed Step 4 and saved the file.

Episodic memory returns 0 results for known entities
The mock embedding uses character frequency vectors which have lower resolution than real embeddings. Try lowering EPISODIC_SIMILARITY_THRESHOLD in config.py from 0.80 to 0.70.

Circuit breaker trips unexpectedly on small batches
The default CIRCUIT_BREAKER_WINDOW_SIZE is 100. If your batch is smaller than the window, the breaker only checks after all records are processed. For testing with small batches, set window_size=5.

Gold layer creates duplicate entities for the same debtor
This means episodic memory is not matching the name variants. Check that you are seeding episodic memory before running the gold layer, or lower the similarity threshold.

Docker build fails with "pip install" errors
Create a requirements.txt file with: anthropic chromadb pydantic fastapi uvicorn celery redis httpx pytest (one per line). Ensure it is in the project root directory.
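Returning to the circuit breaker window entry above: one way to see why window size matters for small batches is a sliding-window breaker that only evaluates the error rate once its window is full. The class below is an illustrative sketch under that assumption; `WindowedBreaker` and `first_trip` are hypothetical names, and the project's own CircuitBreaker in guardrails/circuit_breaker.py may behave differently.

```python
"""Sliding-window error-rate breaker (illustrative, not the project class)."""
from collections import deque


class WindowedBreaker:
    def __init__(self, window_size: int = 100, max_error_rate: float = 0.10):
        self.window = deque(maxlen=window_size)  # True marks a failure
        self.max_error_rate = max_error_rate

    def record(self, failed: bool) -> None:
        self.window.append(failed)

    def is_tripped(self) -> bool:
        # Assumption: evaluate only once the window is full, so a batch
        # shorter than the window is never checked mid-run.
        if len(self.window) < self.window.maxlen:
            return False
        return sum(self.window) / len(self.window) > self.max_error_rate


def first_trip(breaker: WindowedBreaker, outcomes: list):
    """Index of the record at which the breaker first trips, else None."""
    for i, failed in enumerate(outcomes):
        breaker.record(failed)
        if breaker.is_tripped():
            return i
    return None


outcomes = [True] * 5 + [False] * 10  # 15 records, 5 failures up front

print(first_trip(WindowedBreaker(window_size=100), outcomes))
print(first_trip(WindowedBreaker(window_size=5), outcomes))
```

In this sketch the 100-record window never fills on a 15-record batch, so the breaker stays closed, while a 5-record window trips on the fifth record (index 4). That is the kind of difference the window_size=5 suggestion is meant to expose during testing.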

Agent SDK Port [OPTIONAL STRETCH]

The pipeline you built uses a manual tool-use loop — messages.create, stop_reason == "tool_use", accumulating tool_result blocks, and short-circuiting on end_turn. The Claude Agent SDK abstracts that loop. This stretch goal ports the Bronze→Silver transformation agent (the LLM-driven format normalizer, not the deterministic schema validator) so you can compare LOC and behavior against the manual implementation in your own code.
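For reference, the manual loop described above has roughly the following shape. This is a minimal sketch, not the project's agent code: `run_tool_loop` and the `dispatch` mapping are hypothetical names, and the client is passed in so the loop can be exercised with a stub as well as with a real `anthropic.Anthropic()` instance.

```python
"""Minimal manual tool-use loop (sketch; the client is injected)."""


def run_tool_loop(client, model: str, tools: list, dispatch: dict,
                  user_prompt: str, max_turns: int = 12) -> str:
    """Call the model until it stops asking for tools, then return its text.

    `dispatch` maps a tool name to a Python callable taking keyword args.
    """
    messages = [{"role": "user", "content": user_prompt}]
    for _ in range(max_turns):
        resp = client.messages.create(
            model=model, max_tokens=1024, tools=tools, messages=messages)
        if resp.stop_reason != "tool_use":
            # end_turn (or any other stop): return the text blocks.
            return "".join(b.text for b in resp.content if b.type == "text")
        # Echo the assistant turn, then answer every tool_use block.
        messages.append({"role": "assistant", "content": resp.content})
        results = []
        for block in resp.content:
            if block.type == "tool_use":
                output = dispatch[block.name](**block.input)
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": str(output)})
        messages.append({"role": "user", "content": results})
    raise RuntimeError("max_turns reached without end_turn")
```

Everything inside the `for` loop is what the SDK absorbs. It is also where the manual version can insert per-iteration hooks such as a circuit breaker check or a per-tool retry.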

Why port to the SDK?

The SDK trades fine-grained control for less code. Use it when your tools fit the MCP shape, async execution is acceptable, and the circuit breaker / retry logic can wrap the whole query() call rather than every iteration. Keep the manual loop when you need synchronous flow, mid-loop guardrails (e.g. PII redaction checked between tool calls), or per-tool retry semantics on flaky GCS / BigQuery operations.

Install

pip install "claude-agent-sdk>=0.1.0" anyio

Port the Bronze→Silver Transformation Agent

Create sdk_port.py. The same transformation tools (detect_state_format, parse_filing_record, resolve_entity, flag_quality_issue, write_silver_record) are wrapped with @tool and bundled into an in-process MCP server.

"""sdk_port.py — Port the Bronze→Silver transformation agent to the Agent SDK.

The manual tool-use loop is ~80 lines per agent. The SDK absorbs the loop,
MCP-shaped tool dispatch, and message accumulation. The circuit breaker
and tracer become wrappers around query() instead of hooks inside the loop.
"""
import anyio
import json
from claude_agent_sdk import (
    query,
    ClaudeAgentOptions,
    AssistantMessage,
    tool,
    create_sdk_mcp_server,
)
from agents.transform_tools import (
    detect_state_format,
    parse_filing_record,
    resolve_entity,
    flag_quality_issue,
    write_silver_record,
)
from routing.model_router import ModelRouter
from observability.tracer import Tracer
from guardrails.circuit_breaker import CircuitBreaker


# 1. Wrap each tool function with @tool. The schema mirrors the manual
#    tool definition; the return shape follows the MCP standard
#    ({"content": [{"type": "text", "text": ...}]}).

@tool(
    "detect_state_format",
    "Detect which state's UCC filing format this raw record matches.",
    {"raw_record": str, "source_state_hint": str},
)
async def sdk_detect(args):
    result = detect_state_format(**args)
    return {"content": [{"type": "text", "text": json.dumps(result)}]}


@tool(
    "parse_filing_record",
    "Parse a raw UCC filing into the canonical Silver schema.",
    {"raw_record": str, "format": str},
)
async def sdk_parse(args):
    result = parse_filing_record(**args)
    return {"content": [{"type": "text", "text": json.dumps(result)}]}


@tool(
    "resolve_entity",
    "Resolve debtor/secured-party names against episodic memory of past entities.",
    {"name": str, "address": str, "tax_id": str},
)
async def sdk_resolve(args):
    result = resolve_entity(**args)
    return {"content": [{"type": "text", "text": json.dumps(result)}]}


@tool(
    "flag_quality_issue",
    "Flag a quality issue (missing field, malformed value, suspicious duplicate).",
    {"filing_id": str, "issue_type": str, "severity": str, "details": dict},
)
async def sdk_flag(args):
    result = flag_quality_issue(**args)
    return {"content": [{"type": "text", "text": json.dumps(result)}]}


@tool(
    "write_silver_record",
    "Persist the canonical record to the Silver layer (BigQuery).",
    {"silver_record": dict, "lineage": dict},
)
async def sdk_write(args):
    result = write_silver_record(**args)
    return {"content": [{"type": "text", "text": json.dumps(result)}]}


# 2. Bundle tools into an in-process MCP server. No subprocess — runs in
#    the same Python process as the agent.

transform_server = create_sdk_mcp_server(
    name="transform_tools",
    version="1.0.0",
    tools=[sdk_detect, sdk_parse, sdk_resolve, sdk_flag, sdk_write],
)


# 3. Run the agent. The SDK handles the tool-use loop, stop_reason
#    checks, and message threading. We only handle: routing, tracing,
#    circuit breaker, and the prompt itself.

async def run_transform_agent_sdk(
    raw_record: str,
    state_hint: str,
    router: ModelRouter,
    tracer: Tracer,
    breaker: CircuitBreaker,
) -> str:
    if breaker.is_tripped():
        raise RuntimeError("Circuit breaker tripped — refusing API call.")

    # Routine known formats go to Haiku; unknown formats escalate to Sonnet.
    model = router.select(
        task_type="format_normalization",
        complexity_score=0.3 if state_hint else 0.7,
    )

    options = ClaudeAgentOptions(
        system_prompt=(
            "You are the Bronze→Silver transformation agent. For each raw "
            "filing: 1) detect the state format, 2) parse into the canonical "
            "schema, 3) resolve debtor/secured-party entities against memory, "
            "4) flag any quality issues, 5) write the Silver record. "
            "Refuse to write if any required field (filing_number, debtor.name, "
            "secured_party.name, state, filing_date) is missing — flag instead."
        ),
        mcp_servers={"transform": transform_server},
        allowed_tools=[
            "mcp__transform__detect_state_format",
            "mcp__transform__parse_filing_record",
            "mcp__transform__resolve_entity",
            "mcp__transform__flag_quality_issue",
            "mcp__transform__write_silver_record",
        ],
        max_turns=12,
        model=model,
    )

    final_text = ""
    prompt = (
        f"Transform this raw UCC filing record into the Silver schema. "
        f"State hint: {state_hint or 'unknown'}.\n\nRecord:\n{raw_record}"
    )

    with tracer.span("transform_agent.sdk"):
        try:
            async for msg in query(prompt=prompt, options=options):
                if isinstance(msg, AssistantMessage):
                    for block in msg.content:
                        if hasattr(block, "text"):
                            final_text = block.text
            breaker.record_success()
        except Exception:
            breaker.record_failure()
            raise

    return final_text


if __name__ == "__main__":
    # anyio.run wires the async function into a sync entry point so this
    # file can be invoked the same way as the manual pipeline.
    sample = '{"filing_no": "2024-0098765", "debtor_nm": "ACME LOGISTICS LLC"}'
    out = anyio.run(
        run_transform_agent_sdk,
        sample,
        "DE",
        ModelRouter(),
        Tracer(),
        CircuitBreaker(threshold=3),
    )
    print(out)

LOC & Behavior Comparison

Aspect | Manual tool-use loop | Agent SDK (this stretch)
Lines per agent | ~80 | ~30 (excluding tool wrappers)
Tool dispatch | You write the if block.name == ... dispatcher | SDK routes by MCP tool name
stop_reason / tool-use loop | You implement | SDK handles
Execution model | Sync (or your choice) | Async only
Circuit breaker hook | Inline, per iteration | Wrapper, per query() call
Tool result format | Plain dict you marshal | MCP {"content": [...]}
Mid-loop retry on tool failure | Easy: you control the loop | Harder: wrap the whole query
Visibility into raw messages | Full messages array | AssistantMessage event stream
Streaming partial responses | Set stream=True on messages.create | Native (the async for)
BigQuery / GCS retry on transient failure | Easy: retry the single tool result | Wrap each tool with retry decorator
Decision Heuristic

Use the SDK when tools fit the MCP shape, the circuit breaker can guard the whole query, and async execution is acceptable. Keep the manual loop when you need synchronous control flow, per-tool-call retries (important here because BigQuery and GCS calls are flakier than Claude API calls), mid-loop PII redaction guardrails, or full audit-log visibility into the raw messages array. The Bronze→Silver pipeline keeps the manual loop because each filing is processed in a Cloud Run worker that already has Python-thread parallelism and synchronous tracing — introducing async would force a rewrite of the worker harness, and the LOC savings (~50 lines per agent) don't justify it for a five-tool agent.
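If you do take the SDK route, per-tool retries move into a decorator around each async tool wrapper. The sketch below shows one possible shape with exponential backoff. `with_retry` and `flaky_write` are hypothetical names, the retried exception types are placeholders for whatever your BigQuery or GCS client raises, and how the decorator composes with the SDK's @tool decorator is not shown here and should be checked against the SDK documentation.

```python
"""Retry decorator for async tool wrappers (illustrative sketch)."""
import asyncio
import functools


def with_retry(attempts: int = 3, base_delay: float = 0.5,
               retry_on: tuple = (ConnectionError, TimeoutError)):
    """Retry an async callable with exponential backoff on selected errors."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except retry_on:
                    if attempt == attempts:
                        raise  # out of attempts: surface the error
                    await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator


calls = {"n": 0}


@with_retry(attempts=3, base_delay=0.01)
async def flaky_write(record: dict) -> dict:
    """Stand-in for a Silver-layer write that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return {"written": True, "attempts": calls["n"]}


result = asyncio.run(flaky_write({"filing_number": "DE-2026-000001"}))
print(result)
```

Errors outside `retry_on` propagate immediately, so validation failures are not retried and still reach the circuit breaker wrapper around the whole query.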

Knowledge Check

Test your understanding of the production UCC platform. Select the best answer for each question.

Q1: Which Claude model should handle format validation tasks, and why?

Q2: What is procedural memory in this UCC pipeline?

Q3: In the Medallion Architecture, what does the Gold layer contain?

Q4: The pipeline processes a batch from a new state with an unknown CSV format. Which model handles format detection?

Q5: Why does the evaluation harness include "malformed CSV injection" as an adversarial test?

References & Resources