Capstone 5 — Domain C: Production UCC Data Intelligence Platform
The culminating project: build a production-grade UCC data pipeline with multi-layer memory, model routing, full observability, cost optimization, and a 100-case evaluation harness.
Prerequisites
Complete these modules before starting Capstone 5. Each one teaches a production capability you will integrate here:
- M12 — ReAct Agents: The reasoning loop pattern used by every agent in this pipeline
- M14 — Multi-Agent Systems: Agent-to-agent handoffs and orchestration patterns (Capstone 4 foundation)
- M17 — HITL & Guardrails: Human-in-the-loop routing, confidence thresholds, and circuit breakers
- M18 — Evaluation: Building test suites, automated scoring, and accuracy measurement
- M19 — Tracing: Distributed tracing with spans, trace collectors, and structured logging
- M20 — Monitoring: Dashboards, alerting, and real-time metrics for production systems
- M21 — Deployment: Containerization, queue-based processing, and API design
- M22 — Cost Optimization: Model routing, prompt caching, and per-request cost tracking
You should also have completed Capstone 4 — Domain C (multi-agent UCC pipeline), as this capstone extends that project with production capabilities.
Project Brief
You have built a data pipeline (Capstone 4) that processes UCC filings through four agents. It works, but it does not learn, it does not optimize costs, it is not observable, and it cannot handle production-scale volume. Moving from "it works" to "it runs in production" requires integrating six engineering capabilities that the prerequisite modules teach only one at a time.
The pain of deploying without these capabilities is predictable. Costs spiral because every entity resolution goes to the same expensive model, however simple the case. Entity resolution accuracy plateaus because the system does not learn from data steward corrections. Bugs are invisible because there is no tracing. And when the pipeline fails at 3 AM on a batch of 10,000 filings, nobody knows what happened or where it failed.
This capstone transforms your pipeline into a production platform. It learns from past corrections (multi-layer memory), routes simple tasks to cheap models (cost optimization), traces every decision for debugging (observability), handles 10,000+ filings per batch (deployment), and validates itself continuously (evaluation). This is the difference between a demo and a system someone depends on.
- Volume: Process 10,000 filings per batch in < 30 minutes
- Accuracy: Entity resolution accuracy > 90%
- Cost: < $0.02 per filing (vs ~$0.08 with single-model approach)
- Observability: Every LLM call, tool invocation, and agent handoff traced
- Learning: Data steward corrections improve future resolution confidence
- Evaluation: 100-case test suite with automated scoring
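The volume target implies a per-filing time budget. The following is a rough check. It assumes the batch is spread evenly across the 10 concurrent workers set by max_concurrent_workers in the deployment configuration later in this capstone. The function name is illustrative only.

```python
def per_filing_budget_seconds(filings: int, minutes: float, workers: int) -> float:
    """Wall-clock seconds each worker can spend per filing and still meet the deadline.

    Assumes work is split evenly across workers and ignores queueing, retries,
    and rate-limit backoff, so treat the result as an optimistic upper bound.
    """
    filings_per_worker = filings / workers
    return (minutes * 60) / filings_per_worker


budget = per_filing_budget_seconds(filings=10_000, minutes=30, workers=10)
print(f"Per-filing budget: {budget:.2f} s")  # Per-filing budget: 1.80 s
```

If each filing needs several sequential LLM calls, 1.8 seconds per filing is tight. Memory hits that skip LLM calls therefore matter for throughput as well as cost.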
Environment Setup
A production-grade UCC data intelligence platform with multi-layer memory, model routing, full observability, cost optimization, and a 100-case evaluation harness. Time estimate: 4–6 hours (difficulty: ★★★★★).
System Requirements
- Python 3.10+ (check with python --version)
- pip (check with pip --version)
- Docker (optional, for the deployment steps; check with docker --version)
- An Anthropic API key with access to Haiku, Sonnet, and Opus models
Install Dependencies
Run these commands to create your project directory, set up a virtual environment, and install everything:
mkdir capstone-5-production-ucc && cd capstone-5-production-ucc
python -m venv venv
# Activate the virtual environment
# On macOS/Linux:
source venv/bin/activate
# On Windows:
# venv\Scripts\activate
pip install anthropic chromadb pydantic fastapi uvicorn celery redis httpx pytest
Set Your API Key
# macOS/Linux:
export ANTHROPIC_API_KEY=your-api-key-here
# To make it permanent, append it to ~/.bashrc (use ~/.zshrc instead if your shell is zsh):
echo 'export ANTHROPIC_API_KEY=your-api-key-here' >> ~/.bashrc
# Windows Command Prompt (current session only):
set ANTHROPIC_API_KEY=your-api-key-here
# Windows PowerShell (current session only):
$env:ANTHROPIC_API_KEY = "your-api-key-here"
# To make it permanent on Windows, use System Environment Variables in Settings
Verify your setup by running: python -c "import anthropic; print(anthropic.__version__)". You should see a version number like 0.39.0 or higher. If you get a ModuleNotFoundError, ensure your virtual environment is activated.
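A single check can cover the Python version, the SDK import, and the API key together, if you prefer that to running separate commands. A small script along these lines works. The file name check_env.py is only a suggestion and is not part of the project's file structure.

```python
"""check_env.py (hypothetical helper): sanity-check the capstone environment."""
from __future__ import annotations

import os
import sys


def check_environment() -> list[str]:
    """Return a list of setup problems; an empty list means the basics look fine."""
    problems = []
    if sys.version_info < (3, 10):
        problems.append(f"Python 3.10+ required, found {sys.version.split()[0]}")
    try:
        import anthropic  # noqa: F401  (installed by the pip command above)
    except ModuleNotFoundError:
        problems.append("anthropic is not installed (is the virtual environment active?)")
    if not os.environ.get("ANTHROPIC_API_KEY"):
        problems.append("ANTHROPIC_API_KEY is not set")
    return problems


if __name__ == "__main__":
    found = check_environment()
    print("Environment looks good." if not found else "\n".join(found))
```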
File Structure
Here is every file you will create in this capstone. Each file has a specific role in the production pipeline:
capstone-5-production-ucc/
├── pipeline/
│ ├── orchestrator.py # Main production orchestrator — coordinates all agents
│ ├── bronze_layer.py # Raw data ingestion — reads CSV/XML, validates format
│ ├── silver_layer.py # Cleaned/standardized data — schema normalization
│ └── gold_layer.py # Business-ready analytics — risk profiles, aggregations
├── memory/
│ ├── episodic.py # Episodic memory — past entity resolutions
│ ├── procedural.py # Procedural memory — learned rules from steward decisions
│ └── working.py # Working memory — current batch state
├── observability/
│ ├── tracer.py # Tracing and span collection
│ └── dashboard.py # Monitoring dashboard — 8 metric panels
├── guardrails/
│ ├── quality_gates.py # Data quality checks — completeness, consistency
│ └── circuit_breaker.py # Circuit breaker pattern — auto-halt on failures
├── routing/
│ └── model_router.py # Model routing — task-to-model mapping + cost tracking
├── mock_data.py # 15 realistic UCC filings with data quality issues
├── config.py # Configuration management — routing, memory, deployment
├── test_pipeline.py # End-to-end tests — 100-case evaluation harness
└── requirements.txt # All Python dependencies
Mock UCC Data
This mock data file contains 15 realistic UCC filings. Some filings have data quality issues (missing fields, inconsistent formats) that your quality gates must detect. Related filings (amendments, continuations, terminations) reference original filing numbers so you can test the full filing lifecycle.
"""mock_data.py — 15 realistic UCC filings for testing the production pipeline.
Includes clean filings, data quality issues, and related filings
(amendments, continuations, terminations) for lifecycle testing.
"""
MOCK_UCC_FILINGS = [
# --- CLEAN FILINGS (baseline accuracy tests) ---
{
"filing_number": "NY-2024-0012345",
"debtor_name": "Acme Manufacturing LLC",
"secured_party_name": "JPMorgan Chase Bank, N.A.",
"state_code": "NY",
"collateral_description": "All inventory, equipment, and accounts receivable now owned or hereafter acquired",
"filing_type": "UCC-1",
"filing_date": "2024-03-15",
"expiration_date": "2029-03-15",
},
{
"filing_number": "CA-2024-0098765",
"debtor_name": "Pacific Coast Distribution Inc.",
"secured_party_name": "Wells Fargo Bank, N.A.",
"state_code": "CA",
"collateral_description": "All assets of the debtor, including but not limited to inventory, chattel paper, and general intangibles",
"filing_type": "UCC-1",
"filing_date": "2024-01-22",
"expiration_date": "2029-01-22",
},
{
"filing_number": "TX-2024-0054321",
"debtor_name": "Lone Star Energy Partners LP",
"secured_party_name": "Bank of America, N.A.",
"state_code": "TX",
"collateral_description": "All oil and gas extraction equipment, pipeline fixtures, and related proceeds",
"filing_type": "UCC-1",
"filing_date": "2024-06-10",
"expiration_date": "2029-06-10",
},
{
"filing_number": "FL-2023-0033221",
"debtor_name": "Sunshine Hospitality Group Inc.",
"secured_party_name": "Truist Financial Corporation",
"state_code": "FL",
"collateral_description": "Furniture, fixtures, and equipment located at 1200 Ocean Drive, Miami FL 33139",
"filing_type": "UCC-1",
"filing_date": "2023-11-05",
"expiration_date": "2028-11-05",
},
{
"filing_number": "IL-2024-0077889",
"debtor_name": "Midwest Agricultural Supply Co.",
"secured_party_name": "Farm Credit Services of Illinois",
"state_code": "IL",
"collateral_description": "All farm products, crops, livestock, and farm equipment",
"filing_type": "UCC-1",
"filing_date": "2024-02-28",
"expiration_date": "2029-02-28",
},
# --- RELATED FILINGS (amendment lifecycle) ---
{
"filing_number": "NY-2024-0012346",
"debtor_name": "Acme Manufacturing LLC",
"secured_party_name": "JPMorgan Chase Bank, N.A.",
"state_code": "NY",
"collateral_description": "Amendment to add: all intellectual property, patents, and trademarks",
"filing_type": "UCC-3 amendment",
"filing_date": "2024-07-20",
"expiration_date": "2029-03-15",
"original_filing_number": "NY-2024-0012345",
},
{
"filing_number": "CA-2024-0098800",
"debtor_name": "Pacific Coast Distribution Inc.",
"secured_party_name": "Wells Fargo Bank, N.A.",
"state_code": "CA",
"collateral_description": "Continuation of original filing",
"filing_type": "UCC-3 continuation",
"filing_date": "2024-09-15",
"expiration_date": "2029-09-15",
"original_filing_number": "CA-2024-0098765",
},
{
"filing_number": "FL-2024-0041100",
"debtor_name": "Sunshine Hospitality Group Inc.",
"secured_party_name": "Truist Financial Corporation",
"state_code": "FL",
"collateral_description": "Termination of all security interests",
"filing_type": "UCC-3 termination",
"filing_date": "2024-08-01",
"expiration_date": None,
"original_filing_number": "FL-2023-0033221",
},
# --- ENTITY RESOLUTION CHALLENGES (name variants) ---
{
"filing_number": "NY-2024-0015500",
"debtor_name": "ACME MFG LLC",
"secured_party_name": "Citibank, N.A.",
"state_code": "NY",
"collateral_description": "All equipment and machinery",
"filing_type": "UCC-1",
"filing_date": "2024-04-10",
"expiration_date": "2029-04-10",
},
{
"filing_number": "DE-2024-0022111",
"debtor_name": "Acme Manufacturing, L.L.C.",
"secured_party_name": "Silicon Valley Bank",
"state_code": "DE",
"collateral_description": "All assets",
"filing_type": "UCC-1",
"filing_date": "2024-05-18",
"expiration_date": "2029-05-18",
},
# --- DATA QUALITY ISSUES (for quality gate testing) ---
{
"filing_number": "OH-2024-0066000",
"debtor_name": "",
"secured_party_name": "KeyBank National Association",
"state_code": "OH",
"collateral_description": "All inventory and accounts receivable",
"filing_type": "UCC-1",
"filing_date": "2024-03-01",
"expiration_date": "2029-03-01",
},
{
"filing_number": "PA-2024-0088999",
"debtor_name": "Liberty Manufacturing Corp",
"secured_party_name": "",
"state_code": "PA",
"collateral_description": "All assets of the debtor",
"filing_type": "UCC-1",
"filing_date": "2024-07-15",
"expiration_date": "2029-07-15",
},
{
"filing_number": "WA-2024-007",
"debtor_name": "Cascade Tech Solutions Inc.",
"secured_party_name": "Columbia State Bank",
"state_code": "WA",
"collateral_description": "All accounts, inventory, equipment",
"filing_type": "UCC-1",
"filing_date": "2024/08/30",
"expiration_date": "2029/08/30",
},
{
"filing_number": "GA-2024-0044556",
"debtor_name": "Peachtree Holdings LLC",
"secured_party_name": "Synovus Financial Corp",
"state_code": "GA",
"collateral_description": "",
"filing_type": "UCC-1",
"filing_date": "2024-09-12",
"expiration_date": "2024-09-12",
},
{
"filing_number": "NV-2024-0055667",
"debtor_name": "Silver State Gaming Corp\t",
"secured_party_name": "Nevada State\nBank",
"state_code": "NV",
"collateral_description": "Gaming equipment, slot machines, and related fixtures at 500 Las Vegas Blvd",
"filing_type": "UCC1",
"filing_date": "2024-10-01",
"expiration_date": "2029-10-01",
},
]
# --- EXPECTED QUALITY ISSUES (for testing quality gates) ---
EXPECTED_QUALITY_ISSUES = {
"OH-2024-0066000": ["missing_debtor_name"],
"PA-2024-0088999": ["missing_secured_party_name"],
"WA-2024-007": ["non_standard_filing_number", "non_standard_date_format"],
"GA-2024-0044556": ["missing_collateral_description", "expiration_equals_filing_date"],
"NV-2024-0055667": ["whitespace_in_names", "non_standard_filing_type"],
}
# --- ENTITY RESOLUTION GROUND TRUTH ---
ENTITY_CLUSTERS = {
"ENT-00001": {
"canonical_name": "Acme Manufacturing LLC",
"variants": [
"Acme Manufacturing LLC",
"ACME MFG LLC",
"Acme Manufacturing, L.L.C.",
],
"filing_numbers": ["NY-2024-0012345", "NY-2024-0012346", "NY-2024-0015500", "DE-2024-0022111"],
},
"ENT-00002": {
"canonical_name": "Pacific Coast Distribution Inc.",
"variants": ["Pacific Coast Distribution Inc."],
"filing_numbers": ["CA-2024-0098765", "CA-2024-0098800"],
},
"ENT-00003": {
"canonical_name": "Sunshine Hospitality Group Inc.",
"variants": ["Sunshine Hospitality Group Inc."],
"filing_numbers": ["FL-2023-0033221", "FL-2024-0041100"],
},
}
if __name__ == "__main__":
    print(f"Total filings: {len(MOCK_UCC_FILINGS)}")
    print(f"Expected quality issues: {len(EXPECTED_QUALITY_ISSUES)} filings")
    print(f"Entity clusters: {len(ENTITY_CLUSTERS)} entities")
    for fid, issues in EXPECTED_QUALITY_ISSUES.items():
        print(f"  {fid}: {', '.join(issues)}")
/**
* mock_data.ts — 15 realistic UCC filings for testing the production pipeline.
*/
interface UCCFiling {
filing_number: string;
debtor_name: string;
secured_party_name: string;
state_code: string;
collateral_description: string;
filing_type: string;
filing_date: string;
expiration_date: string | null;
original_filing_number?: string;
}
export const MOCK_UCC_FILINGS: UCCFiling[] = [
// --- CLEAN FILINGS ---
{
filing_number: "NY-2024-0012345",
debtor_name: "Acme Manufacturing LLC",
secured_party_name: "JPMorgan Chase Bank, N.A.",
state_code: "NY",
collateral_description: "All inventory, equipment, and accounts receivable now owned or hereafter acquired",
filing_type: "UCC-1",
filing_date: "2024-03-15",
expiration_date: "2029-03-15",
},
{
filing_number: "CA-2024-0098765",
debtor_name: "Pacific Coast Distribution Inc.",
secured_party_name: "Wells Fargo Bank, N.A.",
state_code: "CA",
collateral_description: "All assets of the debtor, including but not limited to inventory, chattel paper, and general intangibles",
filing_type: "UCC-1",
filing_date: "2024-01-22",
expiration_date: "2029-01-22",
},
// ... (remaining 13 filings follow same structure as Python version)
];
export const EXPECTED_QUALITY_ISSUES: Record<string, string[]> = {
  "OH-2024-0066000": ["missing_debtor_name"],
  "PA-2024-0088999": ["missing_secured_party_name"],
  "WA-2024-007": ["non_standard_filing_number", "non_standard_date_format"],
  "GA-2024-0044556": ["missing_collateral_description", "expiration_equals_filing_date"],
  "NV-2024-0055667": ["whitespace_in_names", "non_standard_filing_type"],
};
export const ENTITY_CLUSTERS: Record<string, { canonical_name: string; variants: string[]; filing_numbers: string[] }> = {
"ENT-00001": {
canonical_name: "Acme Manufacturing LLC",
variants: ["Acme Manufacturing LLC", "ACME MFG LLC", "Acme Manufacturing, L.L.C."],
filing_numbers: ["NY-2024-0012345", "NY-2024-0012346", "NY-2024-0015500", "DE-2024-0022111"],
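  },
  "ENT-00002": {
    canonical_name: "Pacific Coast Distribution Inc.",
    variants: ["Pacific Coast Distribution Inc."],
    filing_numbers: ["CA-2024-0098765", "CA-2024-0098800"],
  },
  "ENT-00003": {
    canonical_name: "Sunshine Hospitality Group Inc.",
    variants: ["Sunshine Hospitality Group Inc."],
    filing_numbers: ["FL-2023-0033221", "FL-2024-0041100"],
  },
};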
OH-2024-0066000: Empty debtor name — your quality gate should reject or flag this filing.
PA-2024-0088999: Empty secured party — cannot determine who holds the lien.
WA-2024-007: Filing number too short (should be 7 digits after state-year prefix) and dates use slashes instead of hyphens.
GA-2024-0044556: Empty collateral description and expiration date equals filing date (likely data entry error).
NV-2024-0055667: Tab character in debtor name, newline in secured party name, and filing type "UCC1" instead of "UCC-1".
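Each UCC-3 filing points back to its UCC-1 through original_filing_number. The lifecycle of each financing statement can therefore be rebuilt with a simple grouping pass. The following is a minimal sketch that uses a few records copied from MOCK_UCC_FILINGS, trimmed to the fields it needs. The helper name group_lifecycles is hypothetical, and the status logic is deliberately simplified.

```python
from __future__ import annotations

from collections import defaultdict

# A few records from MOCK_UCC_FILINGS, trimmed to the fields used here
filings = [
    {"filing_number": "NY-2024-0012345", "filing_type": "UCC-1", "filing_date": "2024-03-15"},
    {"filing_number": "NY-2024-0012346", "filing_type": "UCC-3 amendment",
     "filing_date": "2024-07-20", "original_filing_number": "NY-2024-0012345"},
    {"filing_number": "FL-2023-0033221", "filing_type": "UCC-1", "filing_date": "2023-11-05"},
    {"filing_number": "FL-2024-0041100", "filing_type": "UCC-3 termination",
     "filing_date": "2024-08-01", "original_filing_number": "FL-2023-0033221"},
]


def group_lifecycles(records: list[dict]) -> dict[str, list[dict]]:
    """Map each original filing number to its filings in chronological order."""
    chains: dict[str, list[dict]] = defaultdict(list)
    for rec in records:
        # A UCC-1 anchors its own chain; a UCC-3 attaches to the filing it references
        root = rec.get("original_filing_number") or rec["filing_number"]
        chains[root].append(rec)
    for chain in chains.values():
        chain.sort(key=lambda r: r["filing_date"])  # ISO YYYY-MM-DD sorts correctly as text
    return dict(chains)


for root, chain in group_lifecycles(filings).items():
    # Simplified status: ignores lapse at expiration_date
    status = "terminated" if chain[-1]["filing_type"] == "UCC-3 termination" else "active"
    print(root, [r["filing_type"] for r in chain], status)
```

The text sort works only because these dates are ISO-formatted. A slash-formatted date like those in WA-2024-007 would need normalizing first.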
Six Production Pillars
System Configuration
{
"model_routing": {
"collateral_classification": "claude-opus-4-7",
"entity_resolution": "claude-sonnet-4-6",
"complex_entity_disambiguation": "claude-opus-4-7",
"report_generation": "claude-sonnet-4-6",
"routing_threshold": {
"simple_classification": 0.90,
"ambiguous_entity_candidate_count": 3
}
},
"cost_targets": {
"haiku_per_1k_input": 0.001,
"sonnet_per_1k_input": 0.003,
"opus_per_1k_input": 0.015,
"target_per_filing": 0.02
},
"deployment": {
"runtime": "Docker",
"api_framework": "FastAPI",
"queue": "Celery + Redis",
"vector_db": "ChromaDB",
"max_batch_size": 10000,
"max_concurrent_workers": 10
}
}
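The configuration does not specify how routing_threshold is applied. The sketch below shows one plausible reading, which is an assumption rather than the capstone's defined behavior:

- Three or more candidate entities escalate a resolution to the complex_entity_disambiguation model.
- A cheap classification is kept only when its confidence is at least 0.90.

The function names are hypothetical.

```python
import json

# Subset of the model_routing configuration above
ROUTING = json.loads("""
{
  "entity_resolution": "claude-sonnet-4-6",
  "complex_entity_disambiguation": "claude-opus-4-7",
  "routing_threshold": {
    "simple_classification": 0.90,
    "ambiguous_entity_candidate_count": 3
  }
}
""")


def pick_entity_model(candidate_count: int, routing: dict = ROUTING) -> str:
    """Escalate entity resolution when too many candidate matches compete.

    Assumption: a case counts as ambiguous once the number of plausible
    candidates reaches ambiguous_entity_candidate_count.
    """
    limit = routing["routing_threshold"]["ambiguous_entity_candidate_count"]
    if candidate_count >= limit:
        return routing["complex_entity_disambiguation"]
    return routing["entity_resolution"]


def accept_cheap_classification(confidence: float, routing: dict = ROUTING) -> bool:
    """Assumption: keep a cheap model's answer only at or above simple_classification."""
    return confidence >= routing["routing_threshold"]["simple_classification"]


print(pick_entity_model(2), pick_entity_model(5))
```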
{
"memory": {
"working": {
"type": "in_process",
"max_context_tokens": 4000,
"purpose": "Current batch state, active entity being resolved"
},
"episodic": {
"type": "vector_store",
"collection": "resolution_episodes",
"retention_days": 730,
"purpose": "Past entity resolutions + steward corrections",
"example": {
"episode_id": "RE-2024-11234",
"debtor_name_raw": "ABC MANUFACTURING CO",
"resolved_to": "ENT-00567",
"steward_action": "match_to_ENT-00567",
"lesson": "Same registered agent + address = match despite suffix"
}
},
"procedural": {
"type": "rule_store",
"purpose": "Learned rules from repeated steward decisions",
"example": {
"rule_id": "ER-001",
"confidence": 0.92,
"rule": "When entities share registered agent AND address, match with confidence > 85%",
"created_from": ["RE-2024-11234", "RE-2024-11301", "RE-2024-11455"]
}
}
}
}
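The procedural rule in the example above was created_from three episodes. The following minimal sketch shows how such rules might be distilled, using the PROCEDURAL_RULE_MIN_EPISODES and PROCEDURAL_RULE_MIN_CONFIDENCE thresholds that config.py defines in Step 1. It rests on illustrative assumptions:

- Each episode has already been tagged with a pattern label.
- Confidence is computed as the share of episodes in which the steward confirmed the match.
- Rule IDs follow a made-up numbering scheme.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass

MIN_EPISODES = 3       # mirrors PROCEDURAL_RULE_MIN_EPISODES in config.py
MIN_CONFIDENCE = 0.85  # mirrors PROCEDURAL_RULE_MIN_CONFIDENCE in config.py


@dataclass
class Episode:
    episode_id: str
    pattern: str           # hypothetical label, e.g. "same_agent_and_address"
    steward_matched: bool  # did the steward confirm the match?


@dataclass
class ProceduralRule:
    rule_id: str
    pattern: str
    confidence: float
    created_from: list[str]


def distill_rules(episodes: list[Episode]) -> list[ProceduralRule]:
    """Promote a pattern to a deterministic rule once enough episodes agree on it."""
    by_pattern: dict[str, list[Episode]] = defaultdict(list)
    for ep in episodes:
        by_pattern[ep.pattern].append(ep)
    rules: list[ProceduralRule] = []
    for pattern, eps in sorted(by_pattern.items()):
        if len(eps) < MIN_EPISODES:
            continue  # not enough evidence yet
        confidence = sum(ep.steward_matched for ep in eps) / len(eps)
        if confidence >= MIN_CONFIDENCE:
            rules.append(ProceduralRule(
                rule_id=f"ER-{len(rules) + 1:03d}",
                pattern=pattern,
                confidence=round(confidence, 2),
                created_from=[ep.episode_id for ep in eps if ep.steward_matched],
            ))
    return rules


episodes = [
    Episode("RE-2024-11234", "same_agent_and_address", True),
    Episode("RE-2024-11301", "same_agent_and_address", True),
    Episode("RE-2024-11455", "same_agent_and_address", True),
    Episode("RE-2024-11500", "suffix_only_difference", True),
]
for rule in distill_rules(episodes):
    print(rule.rule_id, rule.pattern, rule.confidence, rule.created_from)
# ER-001 same_agent_and_address 1.0 ['RE-2024-11234', 'RE-2024-11301', 'RE-2024-11455']
```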
Cost Optimization: Model Routing
Not every task needs Claude Opus. Format validation and quality checks are handled by Haiku at $1.00/M input tokens. Standard entity resolution and risk profiling go to Sonnet at $3.00/M input tokens. Collateral classification — which requires nuanced legal reasoning about UCC categories — routes to Opus at $15.00/M input tokens. This tiered approach cuts the average cost per filing from ~$0.08 to ~$0.04.
A 10,000-filing batch with single-model (Sonnet) costs ~$800. With model routing: 70% of tasks go to Haiku ($70), 25% to Sonnet ($200), 5% to Opus ($75). Total: ~$345 — 57% cost reduction. Over 12 monthly batches across 50 states, that is ~$273,000 in annual savings.
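The arithmetic in the paragraph above can be checked in a few lines. The per-tier dollar amounts are taken as given from the text, since they depend on each tier's token volume and not only on its share of tasks.

```python
SINGLE_MODEL_BATCH_COST = 800.0  # 10,000 filings, every task on Sonnet
ROUTED_BATCH_COSTS = {"haiku": 70.0, "sonnet": 200.0, "opus": 75.0}
BATCHES_PER_YEAR = 12 * 50       # 12 monthly batches x 50 states

routed_total = sum(ROUTED_BATCH_COSTS.values())
reduction = 1 - routed_total / SINGLE_MODEL_BATCH_COST
annual_savings = (SINGLE_MODEL_BATCH_COST - routed_total) * BATCHES_PER_YEAR

print(f"Routed batch cost: ${routed_total:,.0f}")            # $345
print(f"Cost reduction:    {reduction:.0%}")                  # 57%
print(f"Annual savings:    ${annual_savings:,.0f}")           # $273,000
print(f"Per filing:        ${routed_total / 10_000:.5f}")     # $0.03450
```

At $345 per 10,000 filings, routing alone comes to roughly $0.035 per filing.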
Multi-Layer Memory
The system uses three memory tiers that work together to improve over time:
- Working Memory: Current batch state and the active entity being resolved. It lives in-process, is limited to 4,000 tokens, and is cleared after each batch completes. Think of it as the agent's "short-term memory": what it is currently working on.
- Episodic Memory: A vector store of past entity resolution decisions, including steward corrections. When the system encounters a new entity such as "ABC Manufacturing Co", it searches episodic memory for similar past cases. If a steward previously matched that name to ENT-00567, the system retrieves that lesson. When a similar case was resolved with high confidence, the system can boost its own confidence without needing a steward.
- Procedural Memory: Rules distilled from patterns in repeated steward decisions. After 3+ episodes share the same pattern (e.g., "same registered agent + same address = match"), the system extracts a procedural rule. The rule fires deterministically and resolves matching entities with no LLM call.
Without memory, the system resolves the same entity variant the same expensive, slow way every time. With memory, the first time "ABC Manufacturing Co" appears, it costs about $0.08 in Sonnet resolution plus a steward review. The second time, episodic memory boosts confidence above 80%, which skips the steward review ($2–5 per review). By the tenth time, a procedural rule fires and resolves it with zero LLM cost. Across 100,000 entities per year, these savings compound in both API spend and steward time.
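The cost curve in the paragraph above comes from checking the cheapest memory layer first. The following minimal sketch shows that cascade. Every store and threshold in it is a hypothetical stand-in:

- Dicts replace ChromaDB and the rule store.
- difflib's SequenceMatcher string similarity stands in for embedding similarity.
- llm_resolve is a stub rather than a real API call.

```python
from __future__ import annotations

from difflib import SequenceMatcher

SIMILARITY_THRESHOLD = 0.80       # mirrors EPISODIC_SIMILARITY_THRESHOLD in config.py
STEWARD_CONFIDENCE_CUTOFF = 0.80  # assumption: below this, a steward reviews

# Hypothetical in-memory stand-ins for the rule store and the vector store
PROCEDURAL_RULES = {"ACME MFG LLC": "ENT-00001"}
EPISODES = {"ABC MANUFACTURING CO": "ENT-00567"}


def llm_resolve(name: str) -> tuple[str | None, float]:
    """Stub for the Sonnet resolution call; returns (entity_id, confidence)."""
    return None, 0.5


def resolve(name: str) -> dict:
    key = " ".join(name.upper().split())
    # 1. Procedural memory: deterministic rule, no LLM call
    if key in PROCEDURAL_RULES:
        return {"entity": PROCEDURAL_RULES[key], "source": "procedural", "needs_steward": False}
    # 2. Episodic memory: reuse the most similar past decision if it is close enough
    best, best_score = None, 0.0
    for past_name in EPISODES:
        score = SequenceMatcher(None, key, past_name).ratio()
        if score > best_score:
            best, best_score = past_name, score
    if best is not None and best_score >= SIMILARITY_THRESHOLD:
        return {"entity": EPISODES[best], "source": "episodic", "needs_steward": False}
    # 3. LLM fallback; low-confidence answers go to a data steward
    entity, confidence = llm_resolve(name)
    return {"entity": entity, "source": "llm",
            "needs_steward": confidence < STEWARD_CONFIDENCE_CUTOFF}


for raw in ["Acme Mfg LLC", "ABC Manufacturing Co.", "Peachtree Holdings LLC"]:
    print(raw, "->", resolve(raw))
```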
Observability Dashboard
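The observability pillar requires every LLM call, tool invocation, and agent handoff to be traced. The following minimal sketch shows in-memory span collection, the kind of data observability/tracer.py would feed to the dashboard panels. The Span and Tracer names and fields are assumptions, not a specific tracing library's API. A production system would more likely export spans to a standard backend such as OpenTelemetry.

```python
from __future__ import annotations

import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class Span:
    name: str
    trace_id: str
    span_id: str
    parent_id: str | None
    start: float
    end: float | None = None
    attributes: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> float | None:
        return None if self.end is None else (self.end - self.start) * 1000


class Tracer:
    """Collects nested spans for one trace in memory (no exporter)."""

    def __init__(self) -> None:
        self.trace_id = uuid.uuid4().hex
        self.spans: list[Span] = []   # finished spans, in completion order
        self._stack: list[Span] = []  # currently open spans

    @contextmanager
    def span(self, name: str, **attributes):
        parent_id = self._stack[-1].span_id if self._stack else None
        s = Span(name, self.trace_id, uuid.uuid4().hex[:16], parent_id,
                 time.perf_counter(), attributes=attributes)
        self._stack.append(s)
        try:
            yield s
        finally:
            # Close the span even if the traced code raised
            s.end = time.perf_counter()
            self._stack.pop()
            self.spans.append(s)


tracer = Tracer()
with tracer.span("process_filing", filing_number="NY-2024-0012345"):
    with tracer.span("llm_call", task="format_validation",
                     model="claude-haiku-4-5-20251001") as call:
        call.attributes["input_tokens"] = 500  # would come from the API response
for s in tracer.spans:
    print(f"{s.name:<15} parent={s.parent_id} {s.duration_ms:.3f} ms")
```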
Step-by-Step Build Guide
A complete production UCC data pipeline with 6 capabilities: model routing, multi-layer memory, data quality gates, circuit breakers, distributed tracing, and a 100-case evaluation harness. Time: 4–6 hours. Difficulty: ★★★★★.
Step 1: Create the Configuration File
What & Why: Every production system needs centralized configuration. This file holds model routing rules, memory settings, quality thresholds, and cost targets. Keeping configuration separate from code means you can tune the system by editing one file, without touching pipeline logic.
Create a new file called config.py:
"""config.py — Centralized configuration for the production UCC pipeline."""
# --- Model Routing ---
MODEL_ROUTING = {
"format_validation": {"model": "claude-haiku-4-5-20251001", "max_tokens": 1024},
"quality_check": {"model": "claude-haiku-4-5-20251001", "max_tokens": 2048},
"entity_resolution": {"model": "claude-sonnet-4-6", "max_tokens": 4096},
"anomaly_analysis": {"model": "claude-sonnet-4-6", "max_tokens": 4096},
"risk_profile": {"model": "claude-sonnet-4-6", "max_tokens": 4096},
"collateral_classification": {"model": "claude-opus-4-7", "max_tokens": 4096},
}
MODEL_COSTS_PER_1M = {
"claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00},
"claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
"claude-opus-4-7": {"input": 15.00, "output": 75.00},
}
# --- Memory ---
EPISODIC_SIMILARITY_THRESHOLD = 0.80
PROCEDURAL_RULE_MIN_EPISODES = 3
PROCEDURAL_RULE_MIN_CONFIDENCE = 0.85
# --- Quality Gates ---
QUALITY_REQUIRED_FIELDS = ["filing_number", "debtor_name", "secured_party_name",
"state_code", "collateral_description", "filing_date"]
CIRCUIT_BREAKER_ERROR_THRESHOLD = 0.10 # Trip at 10% error rate
CIRCUIT_BREAKER_WINDOW_SIZE = 100       # Sliding window: error rate is computed over the last 100 records
# --- Cost Targets ---
TARGET_COST_PER_FILING = 0.02 # $0.02 max per filing
if __name__ == "__main__":
    print("Configuration loaded successfully.")
    print(f"  Model routing rules: {len(MODEL_ROUTING)}")
    print(f"  Cost models tracked: {len(MODEL_COSTS_PER_1M)}")
    print(f"  Quality required fields: {len(QUALITY_REQUIRED_FIELDS)}")
    print(f"  Circuit breaker threshold: {CIRCUIT_BREAKER_ERROR_THRESHOLD*100}%")
Run: python config.py
Expected output:
Configuration loaded successfully.
Model routing rules: 6
Cost models tracked: 3
Quality required fields: 6
Circuit breaker threshold: 10.0%
If you see the output above, Step 1 is working. If you get an IndentationError, check that you copied the entire file. If you get a SyntaxError, ensure you are using Python 3.10+.
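Because config.py holds Python constants, changing a value still means editing the file and restarting the process. You may want to tune thresholds per environment without touching code, for example a stricter circuit breaker in staging. One common option is environment-variable overrides. The following is a minimal sketch; the variable names and the env_float helper are hypothetical.

```python
import os


def env_float(name: str, default: float) -> float:
    """Return float(os.environ[name]) if set and non-empty, else the default."""
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return float(raw)
    except ValueError:
        raise ValueError(f"{name} must be a number, got {raw!r}") from None


# Hypothetical variable names; the config.py above reads no environment variables
CIRCUIT_BREAKER_ERROR_THRESHOLD = env_float("UCC_CB_ERROR_THRESHOLD", 0.10)
TARGET_COST_PER_FILING = env_float("UCC_TARGET_COST_PER_FILING", 0.02)
```

Failing loudly on a malformed value is deliberate. A typo in a threshold should stop startup rather than silently fall back to the default.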
Step 2: Build the Model Router
What & Why: The model router selects the cheapest capable model for each task type. Format validation goes to Haiku ($1.00/M input tokens). Entity resolution goes to Sonnet ($3.00/M). Collateral classification — which requires nuanced legal reasoning about UCC categories — routes to Opus ($15.00/M input). This tiered approach cuts cost per filing from ~$0.08 to ~$0.04.
Create a new file called routing/model_router.py (create the routing/ directory first):
"""routing/model_router.py — Selects cheapest capable model per task."""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import MODEL_ROUTING, MODEL_COSTS_PER_1M
def route_to_model(task_type: str, complexity: str = "standard") -> dict:
    """Select the cheapest capable model for a given task type.
    Args:
        task_type: One of the keys in MODEL_ROUTING.
        complexity: "standard" or "high". High complexity upgrades
            Haiku tasks to Sonnet for deeper analysis.
    Returns:
        Dict with model, max_tokens, and reasoning.
    """
    if task_type not in MODEL_ROUTING:
        print(f"Warning: Unknown task '{task_type}', defaulting to Sonnet")
        return {
            "model": "claude-sonnet-4-6",
            "max_tokens": 4096,
            "reasoning": "default fallback for unknown task",
        }
    rule = MODEL_ROUTING[task_type]
    # Escalate Haiku to Sonnet when complexity is high
    if complexity == "high" and rule["model"].startswith("claude-haiku"):
        return {
            "model": "claude-sonnet-4-6",
            "max_tokens": 4096,
            "reasoning": "upgraded from Haiku due to high complexity",
        }
    return {
        **rule,
        "reasoning": f"routed {task_type} to {rule['model']}",
    }


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate cost for a single API call in dollars."""
    # Unknown models fall back to Sonnet pricing
    costs = MODEL_COSTS_PER_1M.get(model, {"input": 3.00, "output": 15.00})
    return (input_tokens / 1_000_000 * costs["input"] +
            output_tokens / 1_000_000 * costs["output"])


if __name__ == "__main__":
    tasks = [
        ("format_validation", "standard"),
        ("entity_resolution", "standard"),
        ("collateral_classification", "standard"),
        ("format_validation", "high"),
    ]
    for task_type, complexity in tasks:
        routing = route_to_model(task_type, complexity)
        cost = estimate_cost(routing["model"], 500, 200)
        print(f"{task_type} ({complexity}): {routing['model']}")
        print(f"  Reason: {routing['reasoning']}")
        print(f"  Est. cost: ${cost:.6f}")
/**
* routing/model_router.ts — Selects cheapest capable model per task.
*/
const MODEL_ROUTING: Record<string, { model: string; maxTokens: number }> = {
  format_validation: { model: "claude-haiku-4-5-20251001", maxTokens: 1024 },
  quality_check: { model: "claude-haiku-4-5-20251001", maxTokens: 2048 },
  entity_resolution: { model: "claude-sonnet-4-6", maxTokens: 4096 },
  anomaly_analysis: { model: "claude-sonnet-4-6", maxTokens: 4096 },
  risk_profile: { model: "claude-sonnet-4-6", maxTokens: 4096 },
  collateral_classification: { model: "claude-opus-4-7", maxTokens: 4096 },
};
const MODEL_COSTS: Record<string, { input: number; output: number }> = {
"claude-haiku-4-5-20251001": { input: 1.00, output: 5.00 },
"claude-sonnet-4-6": { input: 3.00, output: 15.00 },
"claude-opus-4-7": { input: 15.00, output: 75.00 },
};
interface RoutingResult {
model: string;
maxTokens: number;
reasoning: string;
}
export function routeToModel(
taskType: string,
complexity: "standard" | "high" = "standard"
): RoutingResult {
if (!(taskType in MODEL_ROUTING)) {
return { model: "claude-sonnet-4-6", maxTokens: 4096, reasoning: "default fallback" };
}
const rule = MODEL_ROUTING[taskType];
if (complexity === "high" && rule.model.startsWith("claude-haiku")) {
return { model: "claude-sonnet-4-6", maxTokens: 4096, reasoning: "upgraded from Haiku" };
}
return { ...rule, reasoning: `routed ${taskType} to ${rule.model}` };
}
export function estimateCost(model: string, inputTokens: number, outputTokens: number): number {
const costs = MODEL_COSTS[model] ?? { input: 3.00, output: 15.00 };
return inputTokens / 1_000_000 * costs.input + outputTokens / 1_000_000 * costs.output;
}
Run: python routing/model_router.py
Expected output:
format_validation (standard): claude-haiku-4-5-20251001
Reason: routed format_validation to claude-haiku-4-5-20251001
Est. cost: $0.001500
entity_resolution (standard): claude-sonnet-4-6
Reason: routed entity_resolution to claude-sonnet-4-6
Est. cost: $0.004500
collateral_classification (standard): claude-opus-4-7
Reason: routed collateral_classification to claude-opus-4-7
Est. cost: $0.022500
format_validation (high): claude-sonnet-4-6
Reason: upgraded from Haiku due to high complexity
Est. cost: $0.004500
You should see four routing decisions with estimated costs. Notice how format validation costs $0.0015 with Haiku but $0.0045 when escalated to Sonnet. The collateral classification costs $0.0225 with Opus, 15x more than Haiku. This is why routing matters. If you get an ImportError, make sure config.py exists in the project root (the parent directory of routing/). The sys.path line at the top of the script relies on that layout. If you plan to import routing as a package later, also add an empty routing/__init__.py.
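The router estimates cost per call, but the $0.02-per-filing target is a batch-level number. The following minimal sketch shows a per-batch cost tracker. CostTracker is a hypothetical name, and its prices are copied from MODEL_COSTS_PER_1M. With the Anthropic Python SDK, the token counts for each call are available on the response as response.usage.input_tokens and response.usage.output_tokens.

```python
from __future__ import annotations

from collections import defaultdict

# Same per-1M-token prices as MODEL_COSTS_PER_1M in config.py
COSTS_PER_1M = {
    "claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00},
    "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
    "claude-opus-4-7": {"input": 15.00, "output": 75.00},
}
TARGET_COST_PER_FILING = 0.02


class CostTracker:
    """Accumulates per-model spend for one batch and checks the per-filing target."""

    def __init__(self) -> None:
        self.by_model: dict[str, float] = defaultdict(float)
        self.filings = 0

    def record_call(self, model: str, input_tokens: int, output_tokens: int) -> float:
        # Unknown models raise KeyError so unpriced calls are never silently ignored
        price = COSTS_PER_1M[model]
        cost = (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000
        self.by_model[model] += cost
        return cost

    def finish_filing(self) -> None:
        self.filings += 1

    def cost_per_filing(self) -> float:
        return sum(self.by_model.values()) / self.filings if self.filings else 0.0

    def within_target(self) -> bool:
        return self.cost_per_filing() <= TARGET_COST_PER_FILING


tracker = CostTracker()
tracker.record_call("claude-haiku-4-5-20251001", 500, 200)  # format validation
tracker.record_call("claude-sonnet-4-6", 500, 200)          # entity resolution
tracker.finish_filing()
print(f"${tracker.cost_per_filing():.4f} per filing, within target: {tracker.within_target()}")
# $0.0060 per filing, within target: True
```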
Step 3: Build the Quality Gates
What & Why: Before any filing enters the pipeline, it must pass data quality checks. Missing debtor names, empty collateral descriptions, and malformed dates cause downstream failures. Quality gates catch these issues early and flag them for remediation instead of silently producing bad data.
Create a new file called guardrails/quality_gates.py:
"""guardrails/quality_gates.py — Data quality checks for UCC filings."""
import re
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import QUALITY_REQUIRED_FIELDS
def validate_filing(filing: dict) -> dict:
    """Validate a single UCC filing. Returns dict with is_valid and issues list."""
    issues = []
    # Check required fields
    for field in QUALITY_REQUIRED_FIELDS:
        value = filing.get(field, "")
        if not value or not str(value).strip():
            issues.append(f"missing_{field}")
    # Check filing number format: XX-YYYY-NNNNNNN
    fn = filing.get("filing_number", "")
    if fn and not re.match(r"^[A-Z]{2}-\d{4}-\d{7}$", fn):
        issues.append("non_standard_filing_number")
    # Check date format: YYYY-MM-DD (reported once, even if both dates are bad)
    dates = [filing.get(f) for f in ("filing_date", "expiration_date")]
    if any(d and not re.match(r"^\d{4}-\d{2}-\d{2}$", str(d)) for d in dates):
        issues.append("non_standard_date_format")
    # Check for stray whitespace (edges, tabs, newlines) in name fields, reported once
    names = [filing.get(f) or "" for f in ("debtor_name", "secured_party_name")]
    if any(n and (n != n.strip() or "\t" in n or "\n" in n) for n in names):
        issues.append("whitespace_in_names")
    # Check filing type standardization
    valid_types = {"UCC-1", "UCC-3 amendment", "UCC-3 continuation", "UCC-3 termination"}
    if filing.get("filing_type") and filing["filing_type"] not in valid_types:
        issues.append("non_standard_filing_type")
    # Check expiration vs filing date
    if (filing.get("expiration_date") and filing.get("filing_date") and
            filing["expiration_date"] == filing["filing_date"]):
        issues.append("expiration_equals_filing_date")
    return {
        "filing_number": filing.get("filing_number", "UNKNOWN"),
        "is_valid": len(issues) == 0,
        "issues": issues,
    }


def validate_batch(filings: list[dict]) -> dict:
    """Validate a batch of filings. Returns summary with per-filing results."""
    results = [validate_filing(f) for f in filings]
    valid = sum(1 for r in results if r["is_valid"])
    invalid = len(results) - valid
    error_rate = invalid / len(results) if results else 0
    return {
        "total": len(results),
        "valid": valid,
        "invalid": invalid,
        "error_rate": round(error_rate, 4),
        "results": results,
    }


if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS, EXPECTED_QUALITY_ISSUES
    summary = validate_batch(MOCK_UCC_FILINGS)
    print(f"Batch validation: {summary['valid']}/{summary['total']} valid "
          f"({summary['error_rate']*100:.1f}% error rate)")
    for r in summary["results"]:
        if not r["is_valid"]:
            print(f"  INVALID {r['filing_number']}: {', '.join(r['issues'])}")
    # Cross-check against the ground truth; prints nothing when everything matches
    for r in summary["results"]:
        expected = EXPECTED_QUALITY_ISSUES.get(r["filing_number"], [])
        if r["issues"] != expected:
            print(f"  MISMATCH {r['filing_number']}: expected {expected}, got {r['issues']}")
Run: python guardrails/quality_gates.py
Expected output:
Batch validation: 10/15 valid (33.3% error rate)
INVALID OH-2024-0066000: missing_debtor_name
INVALID PA-2024-0088999: missing_secured_party_name
INVALID WA-2024-007: non_standard_filing_number, non_standard_date_format
INVALID GA-2024-0044556: missing_collateral_description, expiration_equals_filing_date
INVALID NV-2024-0055667: whitespace_in_names, non_standard_filing_type
You should see 5 invalid filings detected, each with specific issues matching the EXPECTED_QUALITY_ISSUES in mock_data.py. If you see fewer than 5, check that your regex patterns are correct. If you get an ImportError on mock_data, ensure mock_data.py is in the project root directory.
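Some of the flagged issues are purely mechanical and could be repaired in the silver layer before re-validation. Others need a human, such as empty names, empty collateral, and the truncated WA-2024-007 filing number. The following minimal sketch shows that split, with a hypothetical normalize_filing helper and lookup table. Running validate_filing on its output for NV-2024-0055667 would no longer report whitespace_in_names or non_standard_filing_type.

```python
import re

# Hypothetical lookup of known filing-type spellings to the canonical form
FILING_TYPE_FIXES = {"UCC1": "UCC-1", "UCC 1": "UCC-1"}


def normalize_filing(filing: dict) -> dict:
    """Return a copy of the filing with mechanically fixable format issues repaired.

    Empty required fields and malformed filing numbers are left alone: they
    cannot be repaired without the source document and still need remediation.
    """
    fixed = dict(filing)
    for key in ("debtor_name", "secured_party_name"):
        if isinstance(fixed.get(key), str):
            # Collapse tabs, newlines, and repeated spaces; trim both ends
            fixed[key] = " ".join(fixed[key].split())
    for key in ("filing_date", "expiration_date"):
        value = fixed.get(key)
        if isinstance(value, str) and re.fullmatch(r"\d{4}/\d{2}/\d{2}", value):
            fixed[key] = value.replace("/", "-")
    if fixed.get("filing_type") in FILING_TYPE_FIXES:
        fixed["filing_type"] = FILING_TYPE_FIXES[fixed["filing_type"]]
    return fixed


nv = {"debtor_name": "Silver State Gaming Corp\t", "secured_party_name": "Nevada State\nBank",
      "filing_type": "UCC1", "filing_date": "2024-10-01"}
print(normalize_filing(nv))
```

Returning a copy keeps the raw bronze-layer record intact for auditing.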
Step 4: Build the Circuit Breaker
What & Why: When a batch has too many errors, continuing to process wastes API calls and produces unreliable data. The circuit breaker monitors the error rate and automatically halts processing when it exceeds a threshold (10% by default). This prevents cost spirals on bad data and alerts the team immediately.
Create a new file called guardrails/circuit_breaker.py:
"""guardrails/circuit_breaker.py — Auto-halt pipeline on excessive errors."""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CIRCUIT_BREAKER_ERROR_THRESHOLD, CIRCUIT_BREAKER_WINDOW_SIZE
class CircuitBreaker:
    """Monitors error rate and trips when threshold is exceeded."""

    def __init__(
        self,
        threshold: float = CIRCUIT_BREAKER_ERROR_THRESHOLD,
        window_size: int = CIRCUIT_BREAKER_WINDOW_SIZE,
    ):
        self.threshold = threshold
        self.window_size = window_size
        self._results: list[bool] = []  # True = success, False = failure
        self._tripped = False
        self._trip_reason = ""

    @property
    def is_tripped(self) -> bool:
        return self._tripped

    @property
    def trip_reason(self) -> str:
        return self._trip_reason

    def record(self, success: bool) -> None:
        """Record a processing result. Trips if error rate exceeds threshold."""
        if self._tripped:
            return  # once tripped, ignore further results until reset()
        self._results.append(success)
        # Only evaluate once a full window of results is available
        if len(self._results) >= self.window_size:
            window = self._results[-self.window_size:]
            error_rate = sum(1 for r in window if not r) / len(window)
            if error_rate > self.threshold:
                self._tripped = True
                self._trip_reason = (
                    f"Error rate {error_rate:.1%} exceeded threshold "
                    f"{self.threshold:.1%} over last {self.window_size} records"
                )

    def reset(self) -> None:
        """Reset the circuit breaker after manual review."""
        self._tripped = False
        self._trip_reason = ""
        self._results.clear()

    def status(self) -> dict:
        total = len(self._results)
        errors = sum(1 for r in self._results if not r)
        return {
            "tripped": self._tripped,
            "reason": self._trip_reason,
            "total_processed": total,
            "errors": errors,
            "error_rate": round(errors / total, 4) if total else 0,
        }


if __name__ == "__main__":
    cb = CircuitBreaker(threshold=0.10, window_size=10)
    # Simulate processing: 8 successes then 3 failures
    for i in range(8):
        cb.record(True)
        print(f"Record {i+1}: success | tripped={cb.is_tripped}")
    for i in range(3):
        cb.record(False)
        print(f"Record {8+i+1}: FAILURE | tripped={cb.is_tripped}")
    print(f"\nCircuit breaker status: {cb.status()}")
Run: python guardrails/circuit_breaker.py
Expected output:
Record 1: success | tripped=False
Record 2: success | tripped=False
...
Record 8: success | tripped=False
Record 9: FAILURE | tripped=False
Record 10: FAILURE | tripped=True
Record 11: FAILURE | tripped=True
Circuit breaker status: {'tripped': True, 'reason': 'Error rate 20.0% exceeded threshold 10.0% over last 10 records', 'total_processed': 10, 'errors': 2, 'error_rate': 0.2}
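The circuit breaker should trip on record 10: that is the first record where the window holds 10 entries (so the error-rate check runs), and 2 of those 10 are failures (20% > 10% threshold). Record 11 is ignored because a tripped breaker stops recording, which is why the status reports 10 processed records and 2 errors. If it trips earlier or later, check your window_size parameter.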
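CircuitBreaker keeps every result in _results, so a 10,000-filing batch holds 10,000 booleans even though only the last window_size entries are ever read. A minimal sketch of the same trip rule on a bounded window, using collections.deque(maxlen=...) plus a running failure count; WindowedBreaker is a hypothetical name, not part of the capstone code, and unlike CircuitBreaker it only knows about the current window, not the whole run.

```python
from collections import deque


class WindowedBreaker:
    """Same trip rule as CircuitBreaker, but memory stays O(window_size)."""

    def __init__(self, threshold: float = 0.10, window_size: int = 100):
        self.threshold = threshold
        # deque with maxlen drops the oldest result automatically on append
        self.window: deque = deque(maxlen=window_size)
        self.failures = 0  # number of False values currently inside the window
        self.tripped = False

    def record(self, success: bool) -> None:
        if self.tripped:
            return  # like CircuitBreaker: ignore results once tripped
        if len(self.window) == self.window.maxlen and not self.window[0]:
            self.failures -= 1  # the failure about to be evicted leaves the window
        self.window.append(success)
        if not success:
            self.failures += 1
        # Only evaluate once the window is full, matching CircuitBreaker.record
        if len(self.window) == self.window.maxlen:
            if self.failures / len(self.window) > self.threshold:
                self.tripped = True


cb = WindowedBreaker(threshold=0.10, window_size=10)
history = []
for ok in [True] * 8 + [False] * 3:
    cb.record(ok)
    history.append(cb.tripped)
print("first tripped on record", history.index(True) + 1)
```

Keeping a running count avoids re-scanning the whole window on every record, which matters if you raise window_size for large batches.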
Step 5: Build Episodic Memory
What & Why: Episodic memory stores past entity resolution decisions so the pipeline can learn from steward corrections. When a new entity appears, the system searches for similar past episodes. If a high-confidence match exists, it skips the expensive LLM call. This is how the pipeline gets cheaper and faster over time.
Create a new file called memory/episodic.py:
"""memory/episodic.py — Stores and recalls past entity resolutions."""
from __future__ import annotations
import math, uuid, json
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
@dataclass
class Episode:
    episode_id: str
    debtor_name_raw: str
    resolved_entity_id: str
    steward_action: str  # "auto" | "match_to_ENT-XXXXX" | "create_new"
    lesson: str  # human-readable explanation
    embedding: list[float] = field(default_factory=list)
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
class EpisodicMemory:
    """Simple vector store mock for episodic memory.
    Replace with ChromaDB or Pinecone for production."""
    def __init__(self, similarity_threshold: float = 0.80):
        self._store: list[Episode] = []
        self._threshold = similarity_threshold
    def store(self, debtor_name: str, resolved_id: str,
              steward_action: str, lesson: str) -> Episode:
        """Store a new episode after a resolution or steward correction."""
        episode = Episode(
            episode_id=f"RE-{uuid.uuid4().hex[:8]}",
            debtor_name_raw=debtor_name,
            resolved_entity_id=resolved_id,
            steward_action=steward_action,
            lesson=lesson,
            embedding=self._mock_embed(debtor_name),
        )
        self._store.append(episode)
        return episode
    def recall(self, debtor_name: str, top_k: int = 3) -> list[dict]:
        """Recall similar past episodes for a given debtor name."""
        query_vec = self._mock_embed(debtor_name)
        scored = []
        for ep in self._store:
            sim = self._cosine_sim(query_vec, ep.embedding)
            if sim >= self._threshold:
                scored.append({"episode": asdict(ep), "similarity": round(sim, 4)})
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        return scored[:top_k]
    @staticmethod
    def _mock_embed(text: str) -> list[float]:
        """Mock embedding: character frequency vector. Replace with real embeddings."""
        text = text.lower().strip()
        vec = [0.0] * 26
        for ch in text:
            if "a" <= ch <= "z":
                vec[ord(ch) - ord("a")] += 1.0
        magnitude = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / magnitude for v in vec]
    @staticmethod
    def _cosine_sim(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        mag_a = math.sqrt(sum(x * x for x in a)) or 1.0
        mag_b = math.sqrt(sum(y * y for y in b)) or 1.0
        return dot / (mag_a * mag_b)
if __name__ == "__main__":
    mem = EpisodicMemory(similarity_threshold=0.75)
    mem.store("ABC MANUFACTURING CO", "ENT-00567",
              "match_to_ENT-00567", "Same registered agent + address = match")
    mem.store("ABC MANUFACTURING COMPANY", "ENT-00567",
              "match_to_ENT-00567", "'CO' vs 'COMPANY' suffix is cosmetic")
    results = mem.recall("ABC MFG CO")
    print(f"Found {len(results)} similar episodes:")
    for r in results:
        print(f" Similarity: {r['similarity']} -> {r['episode']['resolved_entity_id']}")
        print(f" Lesson: {r['episode']['lesson']}")
TypeScript equivalent (memory/episodic.ts):
/**
 * memory/episodic.ts — Stores and recalls past entity resolutions.
 */
interface Episode {
  episodeId: string;
  debtorNameRaw: string;
  resolvedEntityId: string;
  stewardAction: string;
  lesson: string;
  embedding: number[];
  createdAt: string;
}
class EpisodicMemory {
  private store: Episode[] = [];
  private threshold: number;
  constructor(similarityThreshold = 0.80) {
    this.threshold = similarityThreshold;
  }
  storeEpisode(debtorName: string, resolvedId: string,
               stewardAction: string, lesson: string): Episode {
    const episode: Episode = {
      episodeId: `RE-${crypto.randomUUID().slice(0, 8)}`,
      debtorNameRaw: debtorName, resolvedEntityId: resolvedId,
      stewardAction, lesson,
      embedding: EpisodicMemory.mockEmbed(debtorName),
      createdAt: new Date().toISOString(),
    };
    this.store.push(episode);
    return episode;
  }
  recall(debtorName: string, topK = 3): { episode: Episode; similarity: number }[] {
    const queryVec = EpisodicMemory.mockEmbed(debtorName);
    const scored: { episode: Episode; similarity: number }[] = [];
    for (const ep of this.store) {
      const sim = EpisodicMemory.cosineSim(queryVec, ep.embedding);
      if (sim >= this.threshold) {
        scored.push({ episode: ep, similarity: Math.round(sim * 10000) / 10000 });
      }
    }
    scored.sort((a, b) => b.similarity - a.similarity);
    return scored.slice(0, topK);
  }
  private static mockEmbed(text: string): number[] {
    const lower = text.toLowerCase().trim();
    const vec = new Array(26).fill(0);
    for (const ch of lower) {
      const idx = ch.charCodeAt(0) - 97;
      if (idx >= 0 && idx < 26) vec[idx] += 1;
    }
    const mag = Math.sqrt(vec.reduce((s: number, v: number) => s + v * v, 0)) || 1;
    return vec.map((v: number) => v / mag);
  }
  private static cosineSim(a: number[], b: number[]): number {
    const dot = a.reduce((s, v, i) => s + v * b[i], 0);
    const magA = Math.sqrt(a.reduce((s, v) => s + v * v, 0)) || 1;
    const magB = Math.sqrt(b.reduce((s, v) => s + v * v, 0)) || 1;
    return dot / (magA * magB);
  }
}
Run: python memory/episodic.py
Expected output:
Found 1 similar episodes:
Similarity: 0.7593 -> ENT-00567
Lesson: Same registered agent + address = match
You should see 1 similar episode. "ABC MFG CO" scores 0.7593 against "ABC MANUFACTURING CO", just above the 0.75 threshold you set, while "ABC MANUFACTURING COMPANY" scores about 0.7085 and is filtered out: the extra letters in "COMPANY" shift its character-frequency vector enough to fall below the threshold. That gap is the low resolution of the mock embedding in action. Lowering the demo threshold to 0.70 returns both episodes, at the risk of more false positives on unrelated names. In production, replace _mock_embed with real embeddings from a model like voyage-3 for much better accuracy.
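The mock struggles because letter counts ignore order: any two names built from similar letters look alike, and extra letters dilute the match. Purely to illustrate that point, the sketch below swaps in a character-trigram vector, which keeps some local order. It is not the capstone's embedding and not a substitute for a real embedding model; trigram_embed and cosine are hypothetical helper names.

```python
import math
from collections import Counter


def trigram_embed(text: str) -> Counter:
    """Sparse vector of character trigrams over the lower-cased, space-padded name."""
    padded = f" {' '.join(text.lower().split())} "
    return Counter(padded[i:i + 3] for i in range(len(padded) - 2))


def cosine(a: Counter, b: Counter) -> float:
    # Counter returns 0 for missing keys, so absent trigrams add nothing to the dot product
    dot = sum(count * b[gram] for gram, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values())) or 1.0
    norm_b = math.sqrt(sum(c * c for c in b.values())) or 1.0
    return dot / (norm_a * norm_b)


query = trigram_embed("ABC MFG CO")
for candidate in ["ABC MANUFACTURING CO", "ABC MANUFACTURING COMPANY", "XYZ LOGISTICS INC"]:
    print(candidate, round(cosine(query, trigram_embed(candidate)), 4))
```

Names that share no trigrams score exactly 0 here, whereas the letter-frequency mock gives nonzero similarity to almost any pair of English names. Abbreviations such as "MFG" still need a real model or an explicit normalization step.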
Step 5b: Working Memory + Procedural Memory
What & Why: Episodic memory (Step 5) handles long-term recall. Two more tiers complete the multi-layer memory design. Working memory holds the per-batch run state (active filings, pending HITL items, in-flight tool calls) so a single pipeline run can be inspected in real time. Procedural memory stores deterministic rules learned from steward corrections (e.g., "if registered agent + address match, treat as same entity, skip LLM") so frequent patterns resolve at zero LLM cost.
Create two new files, memory/working.py and memory/procedural.py:
"""memory/working.py — per-run scratchpad.
WHAT:   Tracks the current pipeline run's mutable state: active
        filings being processed, pending HITL items, in-flight tool
        calls, and per-outcome counters.
WHY:    Without working memory you cannot answer "what is the
        pipeline doing right now?" or, once backed by an external
        store, recover from a mid-run crash.
GOTCHA: This is process-local. For multi-pod deployment, swap the
        dict for Redis with the run_id as the key namespace.
"""
import time
from threading import Lock
class WorkingMemory:
    def __init__(self, run_id: str):
        self.run_id = run_id
        self.started_at = time.time()
        self._lock = Lock()
        self._active_filings: dict[str, dict] = {}
        self._pending_hitl: list[dict] = []
        self._in_flight_calls: dict[str, dict] = {}
        self._counters: dict[str, int] = {}
    def begin_filing(self, filing_id: str, payload: dict) -> None:
        with self._lock:
            self._active_filings[filing_id] = {
                "payload": payload, "started_at": time.time()}
            self._counters["filings_started"] = (
                self._counters.get("filings_started", 0) + 1)
    def complete_filing(self, filing_id: str,
                        outcome: str) -> None:
        with self._lock:
            self._active_filings.pop(filing_id, None)
            key = f"filings_completed_{outcome}"
            self._counters[key] = self._counters.get(key, 0) + 1
    def queue_hitl(self, item: dict) -> None:
        with self._lock:
            self._pending_hitl.append({**item,
                                       "queued_at": time.time()})
    def begin_call(self, call_id: str, model: str,
                   tool: str | None = None) -> None:
        with self._lock:
            self._in_flight_calls[call_id] = {
                "model": model, "tool": tool,
                "started_at": time.time()}
    def end_call(self, call_id: str) -> float:
        """Remove an in-flight call and return its duration in seconds."""
        with self._lock:
            entry = self._in_flight_calls.pop(call_id, None)
        return time.time() - entry["started_at"] if entry else 0.0
    def snapshot(self) -> dict:
        """Read-only view for the dashboard."""
        with self._lock:
            return {
                "run_id": self.run_id,
                "uptime_s": round(time.time() - self.started_at, 1),
                "active_filings": len(self._active_filings),
                "pending_hitl": len(self._pending_hitl),
                "in_flight_calls": len(self._in_flight_calls),
                "counters": dict(self._counters),
            }
"""memory/procedural.py — learned deterministic rules.
WHAT:   Stores rules harvested from past steward corrections. When a
        pattern fires deterministically >=N times with the same
        outcome, promote it to a procedural rule that bypasses the
        LLM entirely on subsequent occurrences.
WHY:    Episodic memory is recall ("have I seen this?"). Procedural
        memory is automation ("I have seen this pattern enough times
        to skip thinking"). Cuts LLM cost on the hottest paths to
        $0.
GOTCHA: Always log when a procedural rule fires so a regression
        can be detected. Auto-expire rules that have not fired in
        90 days — data drifts.
"""
import json
import os
import time
from dataclasses import dataclass, asdict
from typing import Callable
@dataclass
class ProceduralRule:
    rule_id: str
    pattern: dict  # e.g., {"matcher": "agent_address"}
    outcome: str  # e.g., "merge"
    confidence_floor: float
    times_fired: int = 0
    last_fired_at: float = 0.0
class ProceduralMemory:
    """File-backed rule store; trivially swappable for Postgres."""
    def __init__(self, path: str = "data/procedural_rules.json"):
        self.path = path
        self.rules: dict[str, ProceduralRule] = {}
        self._load()
    def _load(self) -> None:
        if not os.path.exists(self.path):
            return
        with open(self.path, "r", encoding="utf-8") as f:
            for r in json.load(f):
                self.rules[r["rule_id"]] = ProceduralRule(**r)
    def _save(self) -> None:
        os.makedirs(os.path.dirname(self.path) or ".",
                    exist_ok=True)
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump([asdict(r) for r in self.rules.values()],
                      f, indent=2)
    def add(self, rule: ProceduralRule) -> None:
        self.rules[rule.rule_id] = rule
        self._save()
    def fire(self, rule_id: str) -> None:
        r = self.rules.get(rule_id)
        if r:
            r.times_fired += 1
            r.last_fired_at = time.time()
            self._save()
    def find_match(self, candidate: dict,
                   matchers: dict[str, Callable[[dict], bool]]
                   ) -> ProceduralRule | None:
        """Return the first rule whose matcher accepts candidate."""
        for rule in self.rules.values():
            key = rule.pattern.get("matcher")
            fn = matchers.get(key)
            if fn and fn(candidate):
                self.fire(rule.rule_id)
                return rule
        return None
# Default matchers (extend as you learn new patterns).
# As written they only check that the fields are present and non-empty,
# so the candidate dict is assumed to carry these keys only when they
# matched an existing entity's record.
DEFAULT_MATCHERS = {
    "agent_address": lambda c: bool(
        c.get("registered_agent") and c.get("agent_address")),
    "exact_normalized_name": lambda c: bool(
        c.get("normalized_name_match_count", 0) >= 1),
}
You filled in the two missing memory tiers. Working memory tracks the live pipeline run (HITL queue depth, in-flight calls, per-outcome counters) and feeds the dashboard. Procedural memory stores rules for patterns the steward has confirmed N times, so every later occurrence of "matching registered agent + address" resolves with zero LLM tokens. Together with episodic memory (Step 5), the three-tier system delivers the M11 multi-layer pattern.
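The procedural.py docstring describes promotion after N agreeing confirmations and a 90-day expiry, but ProceduralMemory only stores rules you add yourself. A minimal sketch of that bookkeeping, assuming steward confirmations arrive as (matcher, outcome) pairs and that a disagreement should restart the count; PromotionTracker, min_confirmations, and the 0.95 confidence_floor placeholder are hypothetical.

```python
from __future__ import annotations

import time

NINETY_DAYS_S = 90 * 24 * 3600  # idle window from the procedural.py GOTCHA


class PromotionTracker:
    """Promote a (matcher, outcome) pattern after N consecutive agreeing confirmations."""

    def __init__(self, min_confirmations: int = 3):
        self.min_confirmations = min_confirmations
        self.streaks: dict[str, tuple[str, int]] = {}  # matcher -> (outcome, streak length)
        self.promoted: dict[str, dict] = {}  # rule_id -> ProceduralRule-shaped dict

    def confirm(self, matcher: str, outcome: str, now: float | None = None) -> dict | None:
        """Record one confirmation; return the rule only on the call that promotes it."""
        now = time.time() if now is None else now
        prev_outcome, count = self.streaks.get(matcher, (outcome, 0))
        count = count + 1 if prev_outcome == outcome else 1  # disagreement restarts the streak
        self.streaks[matcher] = (outcome, count)
        rule_id = f"PR-{matcher}-{outcome}"
        if count < self.min_confirmations or rule_id in self.promoted:
            return None
        self.promoted[rule_id] = {
            "rule_id": rule_id,
            "pattern": {"matcher": matcher},
            "outcome": outcome,
            "confidence_floor": 0.95,  # placeholder; tune against steward-reviewed data
            "times_fired": 0,
            "last_fired_at": now,  # start the idle clock at promotion time
        }
        return self.promoted[rule_id]

    def expire(self, now: float | None = None, max_idle_s: float = NINETY_DAYS_S) -> list[str]:
        """Remove rules idle for longer than max_idle_s and return their ids."""
        now = time.time() if now is None else now
        stale = [rid for rid, r in self.promoted.items() if now - r["last_fired_at"] > max_idle_s]
        for rid in stale:
            del self.promoted[rid]
        return stale


tracker = PromotionTracker(min_confirmations=3)
for _ in range(3):
    rule = tracker.confirm("agent_address", "merge", now=0.0)
print(rule["rule_id"] if rule else None)
```

The promoted dict has the same fields as ProceduralRule, so ProceduralMemory.add(ProceduralRule(**rule)) would persist it; expire() only looks at the tracker's own copy, so a real integration would read last_fired_at from the stored rules instead.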
Step 5c: Operations Dashboard
What & Why: The dashboard reads the tracer JSONL plus working-memory snapshots and serves the 8 metric panels (records/state, entity confidence distribution, quality trends, latency, cost-per-record, HITL queue, circuit-breaker state, procedural-rule firings) on a small FastAPI app. Production swap-in: feed BigQuery plus a real BI tool (Looker/Metabase). For local dev, this FastAPI stub is enough.
"""observability/dashboard.py — 8-panel ops dashboard.
WHAT:   Reads `tracer.jsonl` + working-memory snapshot, computes
        metrics, serves them as JSON for the front-end (or Looker).
WHY:    Without the dashboard, the 3 AM oncall has no visibility.
        The 8 panels surface the 8 questions oncall actually asks.
GOTCHA: Metrics are only as fresh as the last write to
        tracer.jsonl. Don't expect sub-second freshness.
"""
import json
from collections import defaultdict
from pathlib import Path
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from memory.working import WorkingMemory
app = FastAPI(title="UCC Pipeline Dashboard")
# Wire to your run's working memory at startup
WORKING: WorkingMemory | None = None
TRACER_PATH = Path("logs/tracer.jsonl")
def _load_spans() -> list[dict]:
    """Parse one JSON object per line, skipping blank lines."""
    if not TRACER_PATH.exists():
        return []
    lines = TRACER_PATH.read_text(encoding="utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()]
@app.get("/dashboard/metrics")
def metrics():
    spans = _load_spans()
    # Panel 1: records per state (last run)
    by_state: dict[str, int] = defaultdict(int)
    # Panel 2: entity-resolution confidence histogram (10 buckets)
    conf_hist = [0] * 10
    # Panel 3: data quality score per state
    quality: dict[str, list[float]] = defaultdict(list)
    # Panel 4: pipeline latency per stage (p50, p95)
    latency: dict[str, list[float]] = defaultdict(list)
    # Panel 5: total cost by model (divide by records for cost per record)
    cost: dict[str, float] = defaultdict(float)
    # Panel 6: procedural-rule firings
    rule_fires: dict[str, int] = defaultdict(int)
    for s in spans:
        st = s.get("state_code")
        if st:
            by_state[st] += s.get("records", 0)
            quality[st].append(s.get("quality_score", 1.0))
        if s.get("name") == "resolve_entity":
            conf = float(s.get("confidence", 0))
            # Clamp so a confidence of 1.0 lands in the top bucket
            conf_hist[min(int(conf * 10), 9)] += 1
        latency[s.get("stage", "unknown")].append(
            s.get("duration_ms", 0))
        cost[s.get("model", "n/a")] += s.get("cost_usd", 0)
        if s.get("name") == "procedural_rule_fired":
            rule_fires[s.get("rule_id", "?")] += 1
    def pct(values, p):
        """Simple index-based percentile on a sorted copy."""
        if not values:
            return 0
        values = sorted(values)
        return values[min(int(len(values) * p / 100),
                          len(values) - 1)]
    return {
        "panel_1_records_per_state": dict(by_state),
        "panel_2_confidence_histogram": conf_hist,
        "panel_3_quality_per_state": {
            k: round(sum(v) / len(v), 3)
            for k, v in quality.items()},
        "panel_4_latency_p50_p95": {
            k: {"p50": pct(v, 50), "p95": pct(v, 95)}
            for k, v in latency.items()},
        "panel_5_cost_by_model": {
            k: round(v, 4) for k, v in cost.items()},
        "panel_6_procedural_fires": dict(rule_fires),
        "panel_7_circuit_breaker": (
            "open" if any(s.get("name") ==
                          "circuit_breaker_open" for s in spans)
            else "closed"),
        "panel_8_working_memory": (
            WORKING.snapshot() if WORKING else None),
    }
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard_html():
    """Minimal HTML shell; points to /dashboard/metrics for data."""
    return ("<html><body><h1>UCC Pipeline Dashboard"
            "</h1><p>Fetch /dashboard/metrics"
            " for JSON, or wire to Looker/Metabase.</p>"
            "</body></html>")
You backed the dashboard with a real FastAPI module that reads tracer.jsonl and the working-memory snapshot, and emits the 8 ops panels as JSON. Production teams replace the JSON endpoint with a Looker, Metabase, or Grafana board pointed at BigQuery. The capstone deliberately ships the JSON skeleton instead of a heavyweight UI dependency.
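The What & Why list names a cost-per-record panel, but metrics() sums cost_usd per model, which is total spend. A minimal sketch of the per-record division, assuming spans that carry a cost_usd also carry the records count that cost covered (the same model, cost_usd, and records keys the dashboard already reads); cost_per_record is a hypothetical helper.

```python
from collections import defaultdict


def cost_per_record(spans: list[dict]) -> dict[str, float]:
    """USD per processed record for each model; models with no record count are skipped."""
    cost: dict[str, float] = defaultdict(float)
    records: dict[str, int] = defaultdict(int)
    for s in spans:
        model = s.get("model", "n/a")
        cost[model] += s.get("cost_usd", 0)
        records[model] += s.get("records", 0)
    # Skip zero-record models instead of dividing by zero
    return {m: round(cost[m] / records[m], 6) for m in cost if records[m]}


# Made-up spans: model-b has a cost but no record count, so it is left out
spans = [
    {"model": "model-a", "cost_usd": 0.002, "records": 10},
    {"model": "model-a", "cost_usd": 0.001, "records": 5},
    {"model": "model-b", "cost_usd": 0.01},
]
per_record = cost_per_record(spans)
print(per_record)
```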
Step 6: Build the Observability Tracer
What & Why: Every LLM call, tool invocation, and agent handoff must be traced for debugging and compliance. When a 3 AM batch fails on filing 7,834, you need to know exactly which function failed, how long it ran, and what error occurred. The trace_span decorator wraps any function with automatic span recording.
Create a new file called observability/tracer.py:
"""observability/tracer.py — trace_span decorator for recording function calls."""
from __future__ import annotations
import functools, time, uuid, json
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any, Callable
@dataclass
class Span:
    span_id: str
    function_name: str
    start_time: str
    end_time: str = ""
    duration_ms: float = 0.0
    status: str = "pending"  # "ok" | "error"
    error_message: str = ""
    metadata: dict = field(default_factory=dict)
class TraceCollector:
    """Collects spans from decorated functions."""
    def __init__(self):
        self.spans: list[Span] = []
    def trace_span(self, **meta: Any) -> Callable:
        """Decorator that records function call, duration, and status."""
        def decorator(fn: Callable) -> Callable:
            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                span = Span(
                    span_id=uuid.uuid4().hex[:12],
                    function_name=fn.__name__,
                    start_time=datetime.now(timezone.utc).isoformat(),
                    metadata=dict(meta),  # copy so spans never share one dict
                )
                t0 = time.perf_counter()  # monotonic clock for the duration
                try:
                    result = fn(*args, **kwargs)
                    span.status = "ok"
                    return result
                except Exception as e:
                    span.status = "error"
                    span.error_message = str(e)
                    raise
                finally:
                    span.end_time = datetime.now(timezone.utc).isoformat()
                    span.duration_ms = (time.perf_counter() - t0) * 1000
                    self.spans.append(span)
            return wrapper
        return decorator
    def summary(self) -> dict:
        ok = sum(1 for s in self.spans if s.status == "ok")
        err = sum(1 for s in self.spans if s.status == "error")
        avg_ms = sum(s.duration_ms for s in self.spans) / len(self.spans) if self.spans else 0
        return {"total": len(self.spans), "ok": ok, "errors": err,
                "avg_duration_ms": round(avg_ms, 2)}
    def export_json(self) -> str:
        return json.dumps([asdict(s) for s in self.spans], indent=2)
if __name__ == "__main__":
    tracer = TraceCollector()
    @tracer.trace_span(task="entity_resolution", model="claude-sonnet-4-6")
    def resolve_entity(name: str) -> str:
        time.sleep(0.05)
        return f"ENT-{hash(name) % 10000:05d}"
    @tracer.trace_span(task="format_validation", model="claude-haiku-4-5-20251001")
    def validate_format(row: dict) -> bool:
        if not row.get("debtor_name"):
            raise ValueError("Missing debtor_name field")
        return True
    resolve_entity("ACME LLC")
    resolve_entity("Beta Corp")
    try:
        validate_format({})
    except ValueError:
        pass
    print(json.dumps(tracer.summary(), indent=2))
Run: python observability/tracer.py
Expected output:
{
"total": 3,
"ok": 2,
"errors": 1,
"avg_duration_ms": 34.21
}
You should see 3 total spans: 2 OK (the entity resolutions) and 1 error (the format validation with missing debtor_name). The avg_duration_ms will vary on your machine. If you see 0 spans, check that the decorator is applied correctly. If validate_format does not show as an error, check that the except block in the decorator re-raises the exception.
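The Step 5c dashboard reads logs/tracer.jsonl, one JSON object per line with top-level keys such as name, stage, and duration_ms, while TraceCollector keeps spans in memory and export_json returns a single JSON array. A hypothetical bridge, assuming you flatten each span's metadata into the top level and rename function_name to name; append_spans_jsonl is not part of the capstone code.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path


@dataclass
class Span:
    """Same fields as Span in observability/tracer.py, repeated so this sketch runs on its own."""
    span_id: str
    function_name: str
    start_time: str
    end_time: str = ""
    duration_ms: float = 0.0
    status: str = "pending"
    error_message: str = ""
    metadata: dict = field(default_factory=dict)


def append_spans_jsonl(spans: list[Span], path: str = "logs/tracer.jsonl") -> int:
    """Append one flattened JSON object per span and return the number of lines written."""
    out = Path(path)
    out.parent.mkdir(parents=True, exist_ok=True)
    with out.open("a", encoding="utf-8") as f:
        for span in spans:
            row = asdict(span)
            meta = row.pop("metadata")
            row["name"] = row.pop("function_name")  # the dashboard matches on "name"
            flat = {**meta, **row}  # metadata (stage, model, ...) goes top-level; core fields win clashes
            f.write(json.dumps(flat) + "\n")
    return len(spans)


demo_path = Path(tempfile.mkdtemp()) / "logs" / "tracer.jsonl"
written = append_spans_jsonl(
    [Span("a1b2c3", "run_bronze", "2025-01-01T00:00:00+00:00",
          duration_ms=12.5, status="ok", metadata={"stage": "bronze"})],
    str(demo_path),
)
print(demo_path.read_text(encoding="utf-8"))
```

With the real collector you would call append_spans_jsonl(tracer.spans) once per run. Because the file is opened in append mode, repeated runs accumulate; rotate the file per run if the dashboard should show only the last run.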
Step 7: Build the Bronze Layer (Raw Ingestion)
What & Why: The Bronze layer reads raw UCC filings and applies quality gates. Valid filings pass through; invalid filings are quarantined for review. This is the first stage of the Medallion Architecture — no transformation, just ingestion with quality tracking.
Create a new file called pipeline/bronze_layer.py:
"""pipeline/bronze_layer.py — Raw data ingestion with quality gates."""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from guardrails.quality_gates import validate_batch
from guardrails.circuit_breaker import CircuitBreaker
def ingest_bronze(filings: list[dict], circuit_breaker: CircuitBreaker | None = None) -> dict:
    """Ingest raw filings into the Bronze layer.
    Validates each filing and separates into valid/quarantined batches.
    If circuit breaker is provided, trips on excessive error rate.
    """
    validation = validate_batch(filings)
    valid_filings = []
    quarantined = []
    for filing, result in zip(filings, validation["results"]):
        if result["is_valid"]:
            valid_filings.append(filing)
            if circuit_breaker:
                circuit_breaker.record(True)
        else:
            quarantined.append({
                "filing": filing,
                "issues": result["issues"],
            })
            if circuit_breaker:
                circuit_breaker.record(False)
        # Check circuit breaker after each record
        if circuit_breaker and circuit_breaker.is_tripped:
            remaining = len(filings) - len(valid_filings) - len(quarantined)
            return {
                "status": "circuit_breaker_tripped",
                "reason": circuit_breaker.trip_reason,
                "valid": valid_filings,
                "quarantined": quarantined,
                "unprocessed": remaining,
            }
    return {
        "status": "complete",
        "valid": valid_filings,
        "quarantined": quarantined,
        "unprocessed": 0,
        "summary": {
            "total": len(filings),
            "valid": len(valid_filings),
            "quarantined": len(quarantined),
            "error_rate": validation["error_rate"],
        },
    }
if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS
    cb = CircuitBreaker(threshold=0.10, window_size=100)
    result = ingest_bronze(MOCK_UCC_FILINGS, cb)
    print(f"Bronze layer: {result['status']}")
    print(f"  Valid: {len(result['valid'])}")
    print(f"  Quarantined: {len(result['quarantined'])}")
    for q in result["quarantined"]:
        print(f"    {q['filing']['filing_number']}: {', '.join(q['issues'])}")
Run: python pipeline/bronze_layer.py
Expected output:
Bronze layer: complete
Valid: 10
Quarantined: 5
OH-2024-0066000: missing_debtor_name
PA-2024-0088999: missing_secured_party_name
WA-2024-007: non_standard_filing_number, non_standard_date_format
GA-2024-0044556: missing_collateral_description, expiration_equals_filing_date
NV-2024-0055667: whitespace_in_names, non_standard_filing_type
You should see 10 valid filings and 5 quarantined. The circuit breaker did not trip because the 15-filing batch is smaller than the window size (100). If you see different counts, recheck your quality gate rules in Step 3.
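ingest_bronze relies only on the shape of validate_batch's return value: a results list aligned with the input (each entry carrying is_valid and issues) plus a batch-level error_rate. If you want to exercise the Bronze logic before your Step 3 gates are complete, a hypothetical stand-in implementing just two of the issue codes named in the expected output could look like this; it is not the Step 3 implementation, and the sample rows are made up.

```python
def validate_batch(filings: list[dict]) -> dict:
    """Hypothetical stand-in for guardrails.quality_gates.validate_batch (two rules only)."""
    results = []
    for filing in filings:
        issues = []
        if not (filing.get("debtor_name") or "").strip():
            issues.append("missing_debtor_name")
        if not (filing.get("secured_party_name") or "").strip():
            issues.append("missing_secured_party_name")
        # One entry per input filing, in input order, so zip(filings, results) lines up
        results.append({"is_valid": not issues, "issues": issues})
    invalid = sum(1 for r in results if not r["is_valid"])
    return {
        "results": results,
        "error_rate": round(invalid / len(filings), 4) if filings else 0.0,
    }


# Made-up sample rows
batch = [
    {"filing_number": "OH-2024-0066000", "debtor_name": "", "secured_party_name": "First Bank"},
    {"filing_number": "TX-2024-0000001", "debtor_name": "ACME LLC", "secured_party_name": "First Bank"},
]
report = validate_batch(batch)
print(report["error_rate"], [r["issues"] for r in report["results"]])
```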
Step 8: Build the Silver Layer (Normalization)
What & Why: The Silver layer normalizes validated filings into a consistent schema. It standardizes date formats, trims whitespace, normalizes filing types, and enriches records with metadata. This is the data cleaning stage — the output should be uniform regardless of which state the data came from.
Create a new file called pipeline/silver_layer.py:
"""pipeline/silver_layer.py — Data normalization and standardization."""
from __future__ import annotations
import sys, os
from datetime import datetime, timezone
# Make the project root importable so the demo below can find mock_data.py
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def normalize_filing(filing: dict) -> dict:
    """Normalize a single UCC filing to standard format."""
    normalized = dict(filing)
    # Normalize text fields: collapse runs of spaces, tabs, and newlines
    for field in ["debtor_name", "secured_party_name", "collateral_description"]:
        val = normalized.get(field) or ""  # treat None like a missing field
        normalized[field] = " ".join(val.split())
    # Dates: convert slash separators to dashes (2024/03/15 -> 2024-03-15).
    # This only yields YYYY-MM-DD for year-first input; Bronze already
    # quarantines non-standard date formats before they reach this layer.
    for date_field in ["filing_date", "expiration_date"]:
        val = normalized.get(date_field)
        if val:
            normalized[date_field] = val.replace("/", "-")
    # Normalize filing type
    type_map = {"UCC1": "UCC-1", "UCC-1": "UCC-1", "UCC3": "UCC-3",
                "UCC-3 amendment": "UCC-3 amendment",
                "UCC-3 continuation": "UCC-3 continuation",
                "UCC-3 termination": "UCC-3 termination"}
    ft = normalized.get("filing_type", "")
    normalized["filing_type"] = type_map.get(ft, ft)
    # Add processing metadata (timezone-aware; datetime.utcnow() is deprecated)
    normalized["_silver_processed_at"] = datetime.now(timezone.utc).isoformat()
    normalized["_silver_version"] = "1.0"
    return normalized
def transform_silver(valid_filings: list[dict]) -> list[dict]:
    """Transform a batch of validated filings to Silver layer format."""
    return [normalize_filing(f) for f in valid_filings]
if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS
    # Take the first 3 mock filings for a quick demo
    sample = MOCK_UCC_FILINGS[:3]
    silver = transform_silver(sample)
    for s in silver:
        print(f"{s['filing_number']}: {s['debtor_name']} | type={s['filing_type']}")
Run: python pipeline/silver_layer.py
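You should see 3 normalized filings with clean names and standard filing types. The demo prints only the filing number, debtor name, and type; print the full dict to confirm the standardized dates and the _silver_processed_at metadata. If the dates still contain slashes, check your normalization logic. This step depends on Step 1 and the mock data file.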
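Replacing slashes only produces YYYY-MM-DD when the input is already year-first: "03/15/2024" becomes "03-15-2024". If you ever let non-standard dates through Bronze, a stricter option is to try explicit formats and emit ISO dates. A sketch, assuming slash dates with a trailing year are US month-first (so an ambiguous "03/04/2024" is read as March 4); to_iso_date and ACCEPTED_FORMATS are hypothetical names.

```python
from __future__ import annotations

from datetime import datetime

# Tried in order; the first format that parses wins.
# Assumption: slash dates with a trailing year are US month-first (MM/DD/YYYY).
ACCEPTED_FORMATS = ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y")


def to_iso_date(value: str | None) -> str | None:
    """Return YYYY-MM-DD for a recognised date string, or None if no format matches."""
    if not value:
        return None
    text = value.strip()
    for fmt in ACCEPTED_FORMATS:
        try:
            return datetime.strptime(text, fmt).date().isoformat()
        except ValueError:
            continue  # try the next format
    return None  # let the caller quarantine or flag the record


for raw in ["2024-03-15", "2024/03/15", "03/15/2024", "15.03.2024"]:
    print(raw, "->", to_iso_date(raw))
```

Returning None rather than raising keeps the decision about unparseable dates with the caller, which fits the Bronze layer's quarantine-and-review approach.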
Step 9: Build the Gold Layer (Entity Resolution & Risk Profiles)
What & Why: The Gold layer is where the AI does the heavy lifting. It uses episodic memory to resolve entities (matching name variants like "ACME MFG LLC" and "Acme Manufacturing LLC" to the same entity), classifies collateral, and generates risk profiles. This is the business-ready analytics layer — the output feeds dashboards and credit decisions.
Create a new file called pipeline/gold_layer.py:
"""pipeline/gold_layer.py — Entity resolution, risk profiles, and analytics."""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from memory.episodic import EpisodicMemory
from routing.model_router import route_to_model, estimate_cost
class GoldLayer:
    """Produces business-ready analytics from Silver layer data."""
    def __init__(self, episodic_memory: EpisodicMemory, first_entity_id: int = 1):
        self.memory = episodic_memory
        self.entities: dict[str, dict] = {}  # entity_id -> profile
        self._total_cost = 0.0
        # Counter for newly minted IDs; start above any IDs already seeded in memory
        self._next_entity_id = first_entity_id
    def resolve_entity(self, filing: dict) -> dict:
        """Resolve debtor name to an entity ID using memory + model routing."""
        debtor = filing["debtor_name"]
        # Step 1: Check episodic memory for similar past resolutions
        episodes = self.memory.recall(debtor)
        if episodes and episodes[0]["similarity"] > 0.85:
            entity_id = episodes[0]["episode"]["resolved_entity_id"]
            return {
                "entity_id": entity_id,
                "confidence": episodes[0]["similarity"],
                "method": "episodic_memory",
                "cost": 0.0,
            }
        # Step 2: Route to LLM for resolution (mock API call)
        routing = route_to_model("entity_resolution")
        cost = estimate_cost(routing["model"], 800, 300)
        self._total_cost += cost
        # Mock resolution: create new entity
        entity_id = f"ENT-{self._next_entity_id:05d}"
        self._next_entity_id += 1
        # Store in episodic memory for future recall
        self.memory.store(debtor, entity_id, "auto",
                          f"New entity created from filing {filing['filing_number']}")
        return {
            "entity_id": entity_id,
            "confidence": 0.75,
            "method": "llm_resolution",
            "cost": cost,
            "model": routing["model"],
        }
    def build_risk_profile(self, entity_id: str, filings: list[dict]) -> dict:
        """Build a risk profile for a resolved entity."""
        active = [f for f in filings
                  if f.get("filing_type") != "UCC-3 termination"]
        return {
            "entity_id": entity_id,
            "total_filings": len(filings),
            "active_liens": len(active),
            "states": list(set(f["state_code"] for f in filings)),
            "secured_parties": list(set(f["secured_party_name"] for f in filings)),
            "risk_score": min(1.0, len(active) * 0.2),
        }
    def process_batch(self, silver_filings: list[dict]) -> dict:
        """Process a batch of Silver filings into Gold analytics."""
        resolutions = []
        entity_filings: dict[str, list[dict]] = {}
        for filing in silver_filings:
            result = self.resolve_entity(filing)
            resolutions.append(result)
            eid = result["entity_id"]
            entity_filings.setdefault(eid, []).append(filing)
        profiles = {
            eid: self.build_risk_profile(eid, flist)
            for eid, flist in entity_filings.items()
        }
        return {
            "resolutions": resolutions,
            "profiles": profiles,
            "total_cost": round(self._total_cost, 6),
            "entities_resolved": len(profiles),
        }
if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS
    mem = EpisodicMemory(similarity_threshold=0.75)
    # Seed memory with one known entity
    mem.store("Acme Manufacturing LLC", "ENT-00001",
              "steward_verified", "Verified entity from prior batch")
    # Start new IDs at ENT-00002 so they cannot collide with the seeded ENT-00001
    gold = GoldLayer(mem, first_entity_id=2)
    # Process the first 5 mock filings directly (skipping Bronze/Silver to keep the demo short)
    result = gold.process_batch(MOCK_UCC_FILINGS[:5])
    print(f"Gold layer: {result['entities_resolved']} entities, "
          f"cost=${result['total_cost']:.4f}")
    for eid, profile in result["profiles"].items():
        print(f"  {eid}: {profile['active_liens']} active liens, "
              f"risk={profile['risk_score']:.1f}, states={profile['states']}")
Run: python pipeline/gold_layer.py
You should see entity resolution results. "Acme Manufacturing LLC" should resolve via episodic memory (cost $0.00) because you seeded it. Other entities go through LLM resolution (with cost). If episodic memory is not matching, check both the recall threshold you pass to EpisodicMemory (Step 5) and the separate 0.85 similarity gate in resolve_entity. This step depends on Steps 2, 5, and mock_data.py.
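GoldLayer.resolve_entity checks episodic memory and then falls back to the LLM; the procedural memory from Step 5b is not wired in yet. A minimal sketch of a cheapest-first order across all three tiers, with each tier injected as a plain callable so it runs without the project modules. resolve_tiered, the stub tiers, and the 1.0 confidence for a fired rule are assumptions for illustration; the 0.85 gate mirrors resolve_entity.

```python
from __future__ import annotations

from typing import Callable, Optional


def resolve_tiered(
    filing: dict,
    procedural: Callable[[dict], Optional[str]],
    episodic: Callable[[str], list[dict]],
    llm: Callable[[dict], tuple[str, float, float]],
    episodic_gate: float = 0.85,
) -> dict:
    """Cheapest tier first: procedural rule, then episodic recall, then a paid LLM call.

    procedural: returns an entity_id when a learned rule fires, else None
    episodic:   returns recall()-style hits ({"episode": {...}, "similarity": ...}), best first
    llm:        returns (entity_id, confidence, cost_usd)
    """
    entity_id = procedural(filing)
    if entity_id:
        # Placeholder confidence: a fired rule is treated as certain
        return {"entity_id": entity_id, "confidence": 1.0,
                "method": "procedural_rule", "cost": 0.0}
    hits = episodic(filing["debtor_name"])
    if hits and hits[0]["similarity"] > episodic_gate:
        return {"entity_id": hits[0]["episode"]["resolved_entity_id"],
                "confidence": hits[0]["similarity"],
                "method": "episodic_memory", "cost": 0.0}
    entity_id, confidence, cost = llm(filing)
    return {"entity_id": entity_id, "confidence": confidence,
            "method": "llm_resolution", "cost": cost}


# Stub tiers with made-up values, just to exercise each branch
def no_rule(filing: dict) -> Optional[str]:
    return None


def memory_hit(name: str) -> list[dict]:
    return [{"similarity": 0.97, "episode": {"resolved_entity_id": "ENT-00001"}}]


def memory_miss(name: str) -> list[dict]:
    return []


def fake_llm(filing: dict) -> tuple[str, float, float]:
    return ("ENT-00042", 0.75, 0.0012)


filing = {"debtor_name": "Acme Manufacturing LLC"}
print(resolve_tiered(filing, no_rule, memory_hit, fake_llm)["method"])
print(resolve_tiered(filing, no_rule, memory_miss, fake_llm)["method"])
```

Injecting the tiers keeps the ordering logic testable on its own; in the capstone you would pass closures over ProceduralMemory.find_match, EpisodicMemory.recall, and the routed model call.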
Step 10: Build the Production Orchestrator
What & Why: The orchestrator ties everything together. It runs the full pipeline: Bronze (ingest + validate) → Silver (normalize) → Gold (entity resolution + risk profiles), with tracing on every stage and circuit breaker protection. This is the main entry point for batch processing.
Create a new file called pipeline/orchestrator.py:
"""pipeline/orchestrator.py — Main production orchestrator."""
from __future__ import annotations
import json, time, sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pipeline.bronze_layer import ingest_bronze
from pipeline.silver_layer import transform_silver
from pipeline.gold_layer import GoldLayer
from memory.episodic import EpisodicMemory
from observability.tracer import TraceCollector
from guardrails.circuit_breaker import CircuitBreaker
def run_pipeline(filings: list[dict], tracer: TraceCollector | None = None) -> dict:
    """Run the complete Bronze → Silver → Gold pipeline."""
    if tracer is None:
        tracer = TraceCollector()
    # Initialize components
    cb = CircuitBreaker(threshold=0.10, window_size=100)
    memory = EpisodicMemory(similarity_threshold=0.75)
    gold = GoldLayer(memory)
    results = {"stages": {}}
    pipeline_start = time.perf_counter()
    # --- BRONZE LAYER ---
    @tracer.trace_span(stage="bronze", description="Raw ingestion + quality gates")
    def run_bronze():
        return ingest_bronze(filings, cb)
    bronze_result = run_bronze()
    results["stages"]["bronze"] = {
        "status": bronze_result["status"],
        "valid": len(bronze_result["valid"]),
        "quarantined": len(bronze_result["quarantined"]),
    }
    if bronze_result["status"] == "circuit_breaker_tripped":
        results["pipeline_status"] = "halted"
        results["halt_reason"] = bronze_result["reason"]
        return results
    # --- SILVER LAYER ---
    @tracer.trace_span(stage="silver", description="Normalization + standardization")
    def run_silver():
        return transform_silver(bronze_result["valid"])
    silver_filings = run_silver()
    results["stages"]["silver"] = {"records": len(silver_filings)}
    # --- GOLD LAYER ---
    @tracer.trace_span(stage="gold", description="Entity resolution + risk profiles")
    def run_gold():
        return gold.process_batch(silver_filings)
    gold_result = run_gold()
    results["stages"]["gold"] = {
        "entities": gold_result["entities_resolved"],
        "cost": gold_result["total_cost"],
    }
    # --- PIPELINE SUMMARY ---
    elapsed = (time.perf_counter() - pipeline_start) * 1000
    results["pipeline_status"] = "complete"
    results["total_filings"] = len(filings)
    results["elapsed_ms"] = round(elapsed, 2)
    results["tracing"] = tracer.summary()
    return results
if __name__ == "__main__":
    from mock_data import MOCK_UCC_FILINGS
    tracer = TraceCollector()
    result = run_pipeline(MOCK_UCC_FILINGS, tracer)
    print("=" * 60)
    print("PRODUCTION UCC PIPELINE — RUN COMPLETE")
    print("=" * 60)
    print(f"Status: {result['pipeline_status']}")
    print(f"Total filings: {result.get('total_filings', 'N/A')}")
    print(f"Elapsed: {result.get('elapsed_ms', 'N/A')}ms")
    print()
    for stage, data in result["stages"].items():
        print(f"  [{stage.upper()}] {json.dumps(data)}")
    print()
    print(f"Tracing: {json.dumps(result.get('tracing', {}))}")
Run: python pipeline/orchestrator.py
Expected output:
============================================================
PRODUCTION UCC PIPELINE — RUN COMPLETE
============================================================
Status: complete
Total filings: 15
Elapsed: 42.31ms
[BRONZE] {"status": "complete", "valid": 10, "quarantined": 5}
[SILVER] {"records": 10}
[GOLD] {"entities": 8, "cost": 0.0036}
Tracing: {"total": 3, "ok": 3, "errors": 0, "avg_duration_ms": 14.1}
You should see all three pipeline stages complete successfully. Bronze should quarantine 5 filings, Silver should process 10, and Gold should resolve entities and show a small cost. The tracing summary confirms all 3 stages were traced with no errors. If any stage fails, check that all the files from Steps 1–9 are in the correct directories.
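The orchestrator uses `tracer.trace_span(...)` as a decorator and calls `tracer.summary()` to produce the final `Tracing:` line. The real `TraceCollector` comes from an earlier step and is not shown here. The sketch below is a hypothetical, stdlib-only stand-in (`MiniTraceCollector`) that returns a summary with the same keys. It illustrates what the expected output is counting: one span per decorated stage call, with failures recorded before the exception propagates.

```python
import functools
import time


class MiniTraceCollector:
    """Hypothetical stand-in for TraceCollector: records one span per call."""

    def __init__(self):
        self.spans = []

    def trace_span(self, stage: str, description: str = ""):
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                start = time.perf_counter()
                status = "ok"
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    status = "error"
                    raise  # record the failure, then let the caller handle it
                finally:
                    self.spans.append({
                        "stage": stage,
                        "description": description,
                        "status": status,
                        "duration_ms": (time.perf_counter() - start) * 1000,
                    })
            return wrapper
        return decorator

    def summary(self) -> dict:
        total = len(self.spans)
        ok = sum(1 for s in self.spans if s["status"] == "ok")
        avg = (sum(s["duration_ms"] for s in self.spans) / total) if total else 0.0
        return {"total": total, "ok": ok, "errors": total - ok,
                "avg_duration_ms": round(avg, 2)}


tracer = MiniTraceCollector()

@tracer.trace_span(stage="bronze", description="demo stage")
def run_bronze():
    return {"status": "complete"}

@tracer.trace_span(stage="silver", description="demo failure")
def run_silver():
    raise ValueError("bad record")

run_bronze()
try:
    run_silver()
except ValueError:
    pass
print(tracer.summary())
```

In this sketch a stage that raises still counts toward `total`, and it shows up under `errors` instead of `ok`.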
ModuleNotFoundError: No module named 'config' — Make sure you are running from the capstone-5-production-ucc/ root directory. All imports use sys.path to find the project root.
ModuleNotFoundError: No module named 'pipeline' — Create empty __init__.py files in each subdirectory: touch pipeline/__init__.py memory/__init__.py observability/__init__.py guardrails/__init__.py routing/__init__.py
ImportError: cannot import name 'CircuitBreaker' — Ensure you saved guardrails/circuit_breaker.py (Step 4) before running the bronze layer (Step 7).
Episodic memory returns 0 results — Lower the similarity_threshold in config.py from 0.80 to 0.70. The mock embeddings use character frequency vectors which have lower resolution than real embeddings.
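To see how coarse character-frequency vectors are, here is a small sketch of that style of embedding with cosine similarity. It is an illustration under assumptions, not the project's actual mock embedder: only lowercase letters and digits are counted, and the raw counts serve as vector components. Any two strings built from the same characters score a perfect 1.0, whatever their order, so such vectors cannot separate names the way real embeddings can.

```python
import math
from collections import Counter


def char_freq_vector(text: str) -> Counter:
    """Count lowercase letters and digits; everything else is ignored."""
    return Counter(ch for ch in text.lower() if ch.isalnum())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[k] * b[k] for k in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def similarity(x: str, y: str) -> float:
    return cosine(char_freq_vector(x), char_freq_vector(y))


# Word order and letter order are invisible to this representation:
print(round(similarity("Acme Logistics LLC", "LLC Logistics Acme"), 4))  # 1.0
print(round(similarity("listen", "silent"), 4))                         # 1.0
print(round(similarity("Acme Logistics LLC", "Acme Logistics Inc"), 4))
```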
Deployment Architecture
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
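# python:3.12-slim ships without curl, so probe with the Python stdlib instead.
HEALTHCHECK --interval=30s --timeout=5s \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=4)" || exit 1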
EXPOSE 8000
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
version: "3.8"
services:
  api:
    build: .
    ports: ["8000:8000"]
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      # Containers reach Chroma on its container port (8000);
      # 8001 is only the host-side mapping.
      - CHROMA_URL=http://chroma:8000
    depends_on: [redis, chroma]
  worker:
    build: .
    command: celery -A tasks worker --concurrency=10
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      - CHROMA_URL=http://chroma:8000
    depends_on: [redis, chroma]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
  chroma:
    image: chromadb/chroma:latest
    ports: ["8001:8000"]
    volumes: ["chroma-data:/chroma/chroma"]
volumes:
  chroma-data:
You defined a containerized deployment with 4 services: the FastAPI application (handles HTTP requests), Celery workers (process filing batches asynchronously), Redis (message queue), and ChromaDB (vector database for episodic memory). The API key is passed via environment variable — never hardcoded.
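Inside the Compose network, services reach each other by service name and container port. A host-side mapping such as Chroma's `8001:8000` only applies to traffic coming from your machine. The helper below, `service_host_port`, is hypothetical. It splits an environment-variable URL into the host/port pair that clients such as `chromadb.HttpClient(host=..., port=...)` expect, and the defaults shown are assumptions.

```python
import os
from urllib.parse import urlparse


def service_host_port(env_var: str, default_url: str) -> tuple:
    """Read a service URL from the environment and return (host, port).

    http/https URLs without an explicit port get 80/443; any other scheme
    must spell out its port.
    """
    url = urlparse(os.environ.get(env_var, default_url))
    if not url.hostname:
        raise ValueError(f"{env_var} has no host: {url.geturl()!r}")
    if url.port is not None:
        return url.hostname, url.port
    if url.scheme in ("http", "https"):
        return url.hostname, 443 if url.scheme == "https" else 80
    raise ValueError(f"{env_var} needs an explicit port: {url.geturl()!r}")


# Inside the Compose network, Chroma is reached on its container port.
os.environ["CHROMA_URL"] = "http://chroma:8000"
print(service_host_port("CHROMA_URL", "http://localhost:8000"))  # ('chroma', 8000)
```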
GCP Deployment: Cloud Run + Cloud Composer + GCS + BigQuery
Production for this pipeline runs on Google Cloud, not bare Docker. The Bronze layer ingests from a GCS bucket where state SOS systems drop bulk files. Cloud Run hosts the API and Celery workers (autoscaled). Cloud Composer (managed Airflow) orchestrates the daily Bronze→Silver→Gold pipeline. Gold-layer outputs land in BigQuery for analyst queries. Five small modules below wire it all up. They use the official Google Cloud client libraries; for local development they fall back to in-memory fakes so you can run the capstone end-to-end without a GCP account.
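The GCP modules that follow create their real client objects at import time, so they do not include the local fallback mentioned above. One way such a fallback could look for Pub/Sub is sketched here. `FakePublisher` and `make_publisher` are hypothetical names, and treating an unset `GCP_PROJECT` as local is an assumption. Only `PublisherClient`, `PublisherOptions(enable_message_ordering=...)`, `topic_path`, and `publish(...).result()` mirror the real `google-cloud-pubsub` API.

```python
import itertools
import os
from concurrent.futures import Future


class FakePublisher:
    """Hypothetical in-memory stand-in for pubsub_v1.PublisherClient."""

    def __init__(self):
        self.messages = []  # (topic_path, data, ordering_key) tuples
        self._ids = itertools.count(1)

    def topic_path(self, project: str, topic: str) -> str:
        return f"projects/{project}/topics/{topic}"

    def publish(self, topic: str, data: bytes, ordering_key: str = "",
                **attrs) -> Future:
        self.messages.append((topic, data, ordering_key))
        future = Future()
        future.set_result(str(next(self._ids)))  # real clients resolve to an ID string
        return future


def make_publisher():
    """Use the fake when GCP_PROJECT is unset or 'local'; else the real client."""
    if os.environ.get("GCP_PROJECT", "local") == "local":
        return FakePublisher()
    from google.cloud import pubsub_v1  # lazy: local runs need no GCP libraries
    return pubsub_v1.PublisherClient(
        publisher_options=pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True))


os.environ["GCP_PROJECT"] = "local"
pub = make_publisher()
path = pub.topic_path("local", "ucc-ingest-jobs")
msg_id = pub.publish(path, b'{"state_code": "DE"}', ordering_key="DE").result(timeout=30)
print(path, msg_id)  # projects/local/topics/ucc-ingest-jobs 1
```

The fake accepts the same call shape as `gcs_trigger.py` uses, so the trigger code can stay unchanged if `_publisher` is built through a factory like this.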
"""deployment/gcs_trigger.py — Cloud Function: GCS file drop -> pipeline.
WHAT: Eventarc-triggered Cloud Function fired when a state SOS
system drops a new bulk filing file into the
'ucc-bronze-inbound' GCS bucket. It reads the object's metadata,
validates the state code from the path prefix, and publishes an
ingest job to Pub/Sub (INGEST_TOPIC) to kick off Bronze ingestion.
WHY: Event-driven beats polling. State data drops happen at
irregular hours; a polling loop wastes API calls and
adds latency. Eventarc fires within ~2s of the file landing.
GOTCHA: Eventarc may deliver the same event TWICE on retry. The
ingestion endpoint MUST be idempotent (use the file's
gs:// URI as the dedup key).
"""
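import json
import os
import functions_framework
from google.cloud import pubsub_v1, storage
PROJECT = os.environ["GCP_PROJECT"]
TOPIC = os.environ.get("INGEST_TOPIC", "ucc-ingest-jobs")
# publish(..., ordering_key=...) raises unless message ordering is
# enabled on the client. The subscription needs ordering enabled too.
_publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True))
_topic_path = _publisher.topic_path(PROJECT, TOPIC)
_storage = storage.Client()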
VALID_STATES = {"DE", "NY", "TX", "CA", "MA", "IL",
"NV", "WA", "FL", "GA"}
@functions_framework.cloud_event
def on_filing_drop(cloud_event) -> None:
"""Triggered by google.cloud.storage.object.v1.finalized."""
data = cloud_event.data
bucket = data["bucket"]
name = data["name"]
gcs_uri = f"gs://{bucket}/{name}"
state_code = name.split("/")[0].upper() if "/" in name else None
if state_code not in VALID_STATES:
print(f"[gcs_trigger] skip non-state path: {name}")
return
blob = _storage.bucket(bucket).get_blob(name)
payload = {
"gcs_uri": gcs_uri,
"state_code": state_code,
"size_bytes": blob.size if blob else 0,
"md5": blob.md5_hash if blob else None,
"content_type": (blob.content_type if blob
else "application/octet-stream"),
}
future = _publisher.publish(
_topic_path,
json.dumps(payload).encode("utf-8"),
# Pub/Sub orderingKey ensures one state's files
# process in arrival order:
ordering_key=state_code,
)
msg_id = future.result(timeout=30)
print(f"[gcs_trigger] enqueued {gcs_uri} -> msg {msg_id}")
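The GOTCHA in `gcs_trigger.py` says the ingestion side must be idempotent because Eventarc can redeliver an event. Here is a minimal sketch of that dedup check, assuming an in-memory set. Production would need a shared store (for example Redis or Firestore) so that every instance sees the same keys. Cloud Storage events also carry the object's `generation`. Because the Terraform bucket has versioning enabled, the sketch optionally folds the generation into the key so that a deliberate re-upload to the same path is still processed. That extension is an assumption beyond the original GOTCHA, and `IngestDeduper` is a hypothetical name.

```python
import threading
from typing import Optional


class IngestDeduper:
    """Hypothetical idempotency guard keyed on the gs:// URI (+ generation)."""

    def __init__(self):
        self._seen = set()
        self._lock = threading.Lock()

    @staticmethod
    def key(bucket: str, name: str, generation: Optional[str] = None) -> str:
        uri = f"gs://{bucket}/{name}"
        return f"{uri}#{generation}" if generation else uri

    def claim(self, bucket: str, name: str,
              generation: Optional[str] = None) -> bool:
        """True the first time a key is seen; False for a redelivery."""
        k = self.key(bucket, name, generation)
        with self._lock:
            if k in self._seen:
                return False
            self._seen.add(k)
            return True


dedup = IngestDeduper()
first = dedup.claim("demo-ucc-bronze-inbound", "DE/2026-01-15.csv", "1001")
retry = dedup.claim("demo-ucc-bronze-inbound", "DE/2026-01-15.csv", "1001")
reupload = dedup.claim("demo-ucc-bronze-inbound", "DE/2026-01-15.csv", "1002")
print(first, retry, reupload)  # True False True
```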
"""deployment/bigquery_loader.py — Gold-layer outputs -> BigQuery.
WHAT: Loads the Gold layer's resolved entity profiles, lien
summaries, and quality scorecards into BigQuery so analysts
and the risk-scoring service can query them with SQL.
WHY: ChromaDB is a vector store, not an analytics warehouse.
BigQuery gives partitioned tables, column-level encryption,
cross-state aggregations, and IAM-controlled access for
downstream consumers (risk team, compliance, data science).
GOTCHA: Use partitioned + clustered tables. Partition by
ingestion_date, cluster by (state_code, debtor_normalized).
That keeps a typical "all 2026 DE filings for Acme" query
under $0.05.
"""
import os
from datetime import datetime, timezone
from google.cloud import bigquery
_client = bigquery.Client()
DATASET = os.environ.get("BQ_DATASET", "ucc_gold")
# bigquery.Table() needs the fully qualified "project.dataset.table" form.
ENTITY_TABLE = f"{_client.project}.{DATASET}.resolved_entities"
LIEN_TABLE = f"{_client.project}.{DATASET}.lien_summaries"
QUALITY_TABLE = f"{_client.project}.{DATASET}.quality_scorecards"
_ENTITY_SCHEMA = [
    bigquery.SchemaField("entity_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("debtor_normalized", "STRING",
                         mode="REQUIRED"),
    bigquery.SchemaField("state_code", "STRING",
                         mode="REQUIRED"),
    bigquery.SchemaField("filing_count", "INTEGER"),
    bigquery.SchemaField("active_lien_count", "INTEGER"),
    bigquery.SchemaField("total_collateral_value", "FLOAT"),
    bigquery.SchemaField("resolution_confidence", "FLOAT"),
    bigquery.SchemaField("first_filing_date", "DATE"),
    bigquery.SchemaField("last_filing_date", "DATE"),
    bigquery.SchemaField("ingestion_date", "TIMESTAMP",
                         mode="REQUIRED"),
]
def ensure_tables() -> None:
    """Idempotent: create the dataset and entity table if missing."""
    dataset = bigquery.Dataset(f"{_client.project}.{DATASET}")
    dataset.location = "US"
    _client.create_dataset(dataset, exists_ok=True)
    table = bigquery.Table(ENTITY_TABLE, schema=_ENTITY_SCHEMA)
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="ingestion_date",
    )
    table.clustering_fields = ["state_code", "debtor_normalized"]
    _client.create_table(table, exists_ok=True)
def load_entities(rows: list[dict]) -> int:
"""Streaming insert of Gold-layer entity rows."""
if not rows:
return 0
now = datetime.now(timezone.utc).isoformat()
enriched = [{**r, "ingestion_date": now} for r in rows]
errors = _client.insert_rows_json(ENTITY_TABLE, enriched)
if errors:
raise RuntimeError(f"BQ insert errors: {errors}")
return len(enriched)
def query_entity_exposure(debtor: str,
min_active_liens: int = 1) -> list:
"""Sample analytical query: cross-state exposure for a debtor."""
sql = f"""
SELECT state_code,
SUM(filing_count) AS filing_count,
SUM(active_lien_count) AS total_active,
SUM(total_collateral_value) AS total_collateral
FROM `{ENTITY_TABLE}`
WHERE LOWER(debtor_normalized) = LOWER(@debtor)
AND active_lien_count >= @min_active
GROUP BY state_code
ORDER BY total_collateral DESC
"""
cfg = bigquery.QueryJobConfig(query_parameters=[
bigquery.ScalarQueryParameter("debtor", "STRING", debtor),
bigquery.ScalarQueryParameter("min_active", "INT64",
min_active_liens),
])
return [dict(r) for r in _client.query(sql, job_config=cfg)]
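`insert_rows_json` reports per-row problems in its return value instead of raising, which is why `load_entities` checks `errors`. Catching missing REQUIRED fields before the call gives clearer messages. Below is a hypothetical pre-flight check. It assumes rows are plain dicts and mirrors the REQUIRED columns of `_ENTITY_SCHEMA`, leaving out `ingestion_date` because `load_entities` adds it. It also treats empty strings as missing, which is stricter than BigQuery itself (a REQUIRED STRING column accepts `""`).

```python
from typing import Iterable

# Mirrors the REQUIRED columns of _ENTITY_SCHEMA, minus ingestion_date,
# which load_entities() stamps on every row itself.
REQUIRED_ENTITY_FIELDS = ("entity_id", "debtor_normalized", "state_code")


def find_invalid_rows(rows: Iterable,
                      required: tuple = REQUIRED_ENTITY_FIELDS) -> list:
    """Return [(row_index, [missing_fields]), ...] for rows that would fail."""
    problems = []
    for i, row in enumerate(rows):
        missing = [f for f in required if row.get(f) in (None, "")]
        if missing:
            problems.append((i, missing))
    return problems


rows = [
    {"entity_id": "E-1", "debtor_normalized": "acme logistics", "state_code": "DE"},
    {"entity_id": "E-2", "debtor_normalized": "", "state_code": "NY"},
    {"debtor_normalized": "riverside trucking", "state_code": "TX"},
]
print(find_invalid_rows(rows))
# [(1, ['debtor_normalized']), (2, ['entity_id'])]
```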
# deployment/cloudrun.yaml — Cloud Run service definition
#
# WHAT: Deploys the FastAPI orchestrator + Celery worker as two
# Cloud Run services with autoscaling.
# WHY: Cloud Run handles cold starts, TLS, IAM, autoscaling, and
# pay-per-request billing. No servers to patch.
# DEPLOY: gcloud run services replace cloudrun.yaml --region us-central1
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: ucc-orchestrator
  annotations:
    run.googleapis.com/launch-stage: GA
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "30"
        run.googleapis.com/cpu-throttling: "false"
    spec:
      serviceAccountName: ucc-orchestrator@PROJECT.iam.gserviceaccount.com
      timeoutSeconds: 300
      containers:
        - image: us-central1-docker.pkg.dev/PROJECT/ucc/orchestrator:latest
          ports:
            - containerPort: 8000
          env:
            - name: GCP_PROJECT
              value: PROJECT
            - name: BQ_DATASET
              value: ucc_gold
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: anthropic-key
                  key: latest
          resources:
            limits:
              cpu: "2"
              memory: "2Gi"
          startupProbe:
            httpGet:
              path: /healthz
              port: 8000
            failureThreshold: 30
            periodSeconds: 5
---
# NOTE: Cloud Run scales services on incoming HTTP requests, and the
# container must listen on its port to pass the startup probe. A Celery
# worker pulls tasks from Redis and serves no HTTP, so this service keeps
# one instance warm with CPU always allocated. The container still needs
# a small health endpoint (or run the worker as a Cloud Run job instead).
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: ucc-worker
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "10"
        run.googleapis.com/cpu-throttling: "false"
    spec:
      containerConcurrency: 1   # max concurrent HTTP requests per instance
      timeoutSeconds: 1800      # request timeout (30 min)
      containers:
        - image: us-central1-docker.pkg.dev/PROJECT/ucc/worker:latest
          command: ["celery", "-A", "tasks", "worker", "--concurrency=4"]
          env:
            - name: GCP_PROJECT
              value: PROJECT
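Cloud Run services expect the container to listen on the port that Cloud Run passes in `$PORT` (8080 unless configured otherwise). By default, a startup probe checks that port. A Celery worker only pulls tasks from Redis, so it never opens that port on its own. One hedged workaround is to run a tiny HTTP responder in a background thread beside the worker. The sketch below uses only the standard library, and `start_health_server` is a hypothetical helper. It does not change how Cloud Run scales: request-based autoscaling still cannot see Redis queue depth.

```python
import os
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class _HealthHandler(BaseHTTPRequestHandler):
    """Answers GET /health with 200 and anything else with 404."""

    def do_GET(self):
        ok = self.path == "/health"
        body = b"ok" if ok else b"not found"
        self.send_response(200 if ok else 404)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # silence per-request logging so worker logs stay readable


def start_health_server(port=None) -> ThreadingHTTPServer:
    """Serve /health on $PORT (default 8080) from a daemon thread."""
    if port is None:
        port = int(os.environ.get("PORT", "8080"))
    server = ThreadingHTTPServer(("0.0.0.0", port), _HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


# Local check: port 0 asks the OS for any free port.
server = start_health_server(port=0)
port = server.server_address[1]
with urllib.request.urlopen(f"http://127.0.0.1:{port}/health", timeout=5) as resp:
    status, body = resp.status, resp.read()
server.shutdown()
server.server_close()
print(status, body)  # 200 b'ok'
```

In the worker image you would call `start_health_server()` before starting Celery, and declare a `containerPort` that matches.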
"""deployment/airflow_dag.py — Cloud Composer daily orchestration.
WHAT: Daily DAG: 1) reconcile yesterday's GCS drops vs. expected
state schedules, 2) trigger Bronze re-ingestion for any
missing batches, 3) run Silver normalization, 4) run Gold
entity resolution + lien profile generation, 5) halt if the
quality score is below 0.85, 6) load Gold to BigQuery.
WHY: Eventarc handles real-time. Airflow handles "what was
supposed to happen yesterday but didn't." It's the
reconciliation layer that catches missed drops.
GOTCHA: Always pass execution_date as input to make the DAG
deterministic and re-runnable.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# The Google provider's operator for running a Cloud Run job is
# CloudRunExecuteJobOperator.
from airflow.providers.google.cloud.operators.cloud_run import (
    CloudRunExecuteJobOperator,
)
DEFAULT_ARGS = {
    "owner": "data-eng",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "depends_on_past": False,
}
def reconcile_drops(**ctx) -> list:
    """Compare expected drops vs actual; return missing list."""
    from deployment.reconciler import find_missing_drops
    exec_date = ctx["execution_date"].date().isoformat()
    return find_missing_drops(exec_date)
def fail_if_quality_drops(**ctx) -> None:
    """Halt the DAG if quality score < 0.85."""
    from pipeline.quality_gates import compute_quality_score
    score = compute_quality_score(ctx["execution_date"])
    if score < 0.85:
        raise ValueError(f"Quality below threshold: {score}")
with DAG(
    dag_id="ucc_daily",
    default_args=DEFAULT_ARGS,
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 6 * * *",  # 06:00 UTC daily
    catchup=False,
    tags=["ucc", "bronze-silver-gold"],
) as dag:
    reconcile = PythonOperator(
        task_id="reconcile_drops",
        python_callable=reconcile_drops,
    )
    bronze = CloudRunExecuteJobOperator(
        task_id="bronze_ingest",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-bronze-job",
    )
    silver = CloudRunExecuteJobOperator(
        task_id="silver_normalize",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-silver-job",
    )
    gold = CloudRunExecuteJobOperator(
        task_id="gold_resolve",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-gold-job",
    )
    quality_gate = PythonOperator(
        task_id="quality_gate",
        python_callable=fail_if_quality_drops,
    )
    load_bq = CloudRunExecuteJobOperator(
        task_id="load_bigquery",
        project_id="PROJECT",
        region="us-central1",
        job_name="ucc-bq-load-job",
    )
    reconcile >> bronze >> silver >> gold >> quality_gate >> load_bq
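`reconcile_drops` imports `find_missing_drops` from `deployment.reconciler`, which this capstone does not show. A hypothetical, pure-Python version of its core comparison could look like the sketch below. Two things are assumptions: the schedule format (state code mapped to the weekdays on which a drop is expected) and the `STATE/YYYY-MM-DD...` object-naming convention, which was chosen to match the state-prefixed paths that `gcs_trigger.py` accepts.

```python
from datetime import date
from typing import Iterable

# Hypothetical schedule: weekdays (Mon=0 ... Sun=6) on which each state drops files.
EXPECTED_SCHEDULE = {
    "DE": {0, 1, 2, 3, 4},
    "NY": {0, 1, 2, 3, 4},
    "TX": {0, 1, 2, 3, 4, 5, 6},
    "CA": {0, 2, 4},
}


def find_missing_states(exec_date: str, received_uris: Iterable,
                        schedule: dict = EXPECTED_SCHEDULE) -> list:
    """Return sorted state codes expected on exec_date that have no drop.

    Assumes object names look like gs://BUCKET/STATE/YYYY-MM-DD<suffix>.
    """
    day = date.fromisoformat(exec_date)
    expected = {s for s, days in schedule.items() if day.weekday() in days}
    arrived = set()
    for uri in received_uris:
        parts = uri.removeprefix("gs://").split("/")
        if len(parts) >= 3 and parts[2].startswith(exec_date):
            arrived.add(parts[1].upper())
    return sorted(expected - arrived)


missing = find_missing_states("2026-03-02", [          # a Monday
    "gs://demo-ucc-bronze-inbound/DE/2026-03-02.csv",
    "gs://demo-ucc-bronze-inbound/tx/2026-03-02_batch1.csv",
    "gs://demo-ucc-bronze-inbound/NY/2026-03-01.csv",  # previous day
])
print(missing)  # ['CA', 'NY']
```

The DAG passes only the date to `find_missing_drops`. A real implementation would therefore also list the day's objects itself, for example with `storage.Client().list_blobs(bucket, prefix=...)`, before running a comparison like this one.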
# deployment/terraform/main.tf — one-shot provisioning
#
# Provisions: the inbound GCS bucket, Pub/Sub topic, BigQuery dataset,
# Eventarc trigger, and Composer environment. Cloud Run services, jobs,
# and service accounts are not defined in this file.
# Run: terraform init && terraform apply -var project=YOUR_PROJECT
terraform {
required_providers {
google = { source = "hashicorp/google", version = "~> 5.30" }
}
}
provider "google" {
project = var.project
region = "us-central1"
}
variable "project" { type = string }
# 1. Inbound GCS bucket (state SOS drops)
resource "google_storage_bucket" "bronze_inbound" {
  name                        = "${var.project}-ucc-bronze-inbound"
  # Eventarc Cloud Storage triggers must be created in the bucket's
  # location, so keep the bucket in the trigger's region.
  location                    = "US-CENTRAL1"
  uniform_bucket_level_access = true
  versioning { enabled = true }
  lifecycle_rule {
    condition { age = 90 }
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
  }
}
# 2. Pub/Sub topic for ingest jobs
resource "google_pubsub_topic" "ingest_jobs" {
name = "ucc-ingest-jobs"
message_retention_duration = "604800s" # 7 days
}
# 3. BigQuery dataset for Gold layer
resource "google_bigquery_dataset" "gold" {
dataset_id = "ucc_gold"
location = "US"
description = "UCC Gold layer: resolved entities + lien profiles"
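# Expire date partitions after 1y. (default_table_expiration_ms would
# instead delete each whole table one year after it is created.)
default_partition_expiration_ms = 31536000000 # 1y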
}
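# 4. Eventarc trigger: GCS finalize -> /eventarc/filing-drop on the
# ucc-orchestrator Cloud Run service. To invoke gcs_trigger.py instead,
# deploy it as a 2nd-gen Cloud Function and point `service` at the
# Cloud Run service that backs it (it has the function's name).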
resource "google_eventarc_trigger" "filing_drop" {
name = "ucc-filing-drop"
location = "us-central1"
matching_criteria {
attribute = "type"
value = "google.cloud.storage.object.v1.finalized"
}
matching_criteria {
attribute = "bucket"
value = google_storage_bucket.bronze_inbound.name
}
destination {
cloud_run_service {
service = "ucc-orchestrator"
region = "us-central1"
path = "/eventarc/filing-drop"
}
}
}
# 5. Cloud Composer (managed Airflow) for daily orchestration
resource "google_composer_environment" "ucc_daily" {
name = "ucc-daily"
region = "us-central1"
config {
software_config {
image_version = "composer-2-airflow-2"
env_variables = {
GCP_PROJECT = var.project
BQ_DATASET = "ucc_gold"
}
}
}
}
You wired the GCP production stack. State SOS drops land in gs://*-ucc-bronze-inbound. An Eventarc trigger fires gcs_trigger.py within ~2s, and the trigger publishes a Pub/Sub message ordered by state_code. Cloud Run autoscales the orchestrator and worker instances. The daily Composer DAG reconciles missed drops, runs Bronze→Silver→Gold, checks the quality gate, and loads the partitioned, clustered BigQuery tables. Terraform provisions the supporting infrastructure (bucket, topic, dataset, Eventarc trigger, Composer environment) in one shot. Local development still works by setting GCP_PROJECT=local: the GCS, Pub/Sub, and BigQuery clients fall back to in-memory fakes, though that fallback wiring is not shown in the listings above.
Advanced RAG: Hybrid Search + Jurisdiction-Specific Re-Ranking
The Bronze and Silver layers handle structured fields. The Reporting Agent answers free-text analyst questions like "Has Acme defaulted on UCC-3 amendments in DE in the last 3 years?" — that needs RAG over UCC regulatory docs and state filing guides. Hybrid retrieval combines BM25 keyword matching (catches statute citations like 9-515(d)) with semantic similarity (catches conceptually related text). A jurisdiction-specific re-ranker boosts hits whose state_code matches the query's jurisdiction so Delaware questions get Delaware filing-guide answers first.
"""rag/hybrid_search.py — BM25 + cosine semantic for UCC docs.
WHAT: Retrieves top-K candidate passages from a corpus of UCC
regulatory docs (Article 9), state filing guides, and
collateral classification rules. Combines a BM25-like
keyword score with a cosine semantic score (mock embeddings
via bag-of-words for the capstone; swap to Voyage AI in
production via the EMBEDDER env var).
WHY: Pure semantic search misses literal statute citations
("UCC-1", "9-515(d)") because embeddings encode meaning,
not exact strings. BM25 alone misses paraphrases. Hybrid
catches both.
"""
import math
from collections import Counter
# Mock corpus: UCC regulatory docs + state filing guides
CORPUS = [
{"doc_id": "UCC-9-515",
"state_code": None,
"title": "UCC 9-515 — Continuation",
"text": ("A financing statement is effective for a period "
"of five years after the date of filing. UCC 9-515 "
"permits a continuation statement (UCC-3) within "
"the six months immediately preceding lapse.")},
{"doc_id": "UCC-9-310",
"state_code": None,
"title": "UCC 9-310 — Perfection by Filing",
"text": ("Except as otherwise provided, a financing "
"statement must be filed to perfect a security "
"interest in collateral.")},
{"doc_id": "DE-FILING-GUIDE",
"state_code": "DE",
"title": "Delaware UCC Filing Guide",
"text": ("Delaware Department of State accepts UCC-1 and "
"UCC-3 filings online. Standard fee $25. Searches "
"are debtor-name only. UCC-3 termination releases "
"the secured party's interest immediately.")},
{"doc_id": "NY-FILING-GUIDE",
"state_code": "NY",
"title": "New York UCC Filing Guide",
"text": ("NY Department of State filing system uses fixed-"
"width text format for bulk downloads. Foreign "
"entity numbering uses prefix 'FN-'. Continuation "
"filings must reference original filing number.")},
{"doc_id": "TX-FILING-GUIDE",
"state_code": "TX",
"title": "Texas UCC Filing Guide",
"text": ("Texas SOS publishes daily CSV exports. Debtor "
"names are normalized but suffix variants (LLC vs "
"L.L.C.) are preserved verbatim, requiring "
"client-side normalization.")},
{"doc_id": "COLLATERAL-CLASS",
"state_code": None,
"title": "Collateral Classification Reference",
"text": ("Common categories: inventory, equipment, "
"accounts, instruments, chattel paper, "
"investment property, all-assets blanket lien.")},
]
def _tokenize(text: str) -> list:
return [w.strip(".,;:()") for w in text.lower().split()
if len(w) > 1]
def _bm25(query: list, doc: list, all_docs: list,
k1: float = 1.5, b: float = 0.75) -> float:
"""Standard BM25 score."""
if not doc:
return 0.0
avgdl = sum(len(d) for d in all_docs) / max(len(all_docs), 1)
n = len(all_docs)
counts = Counter(doc)
score = 0.0
for term in query:
df = sum(1 for d in all_docs if term in d)
idf = math.log((n - df + 0.5) / (df + 0.5) + 1)
tf = counts.get(term, 0)
norm = 1 - b + b * len(doc) / avgdl
score += idf * (tf * (k1 + 1)) / (tf + k1 * norm)
return score
def _cosine_bow(a: list, b: list) -> float:
"""Bag-of-words cosine similarity."""
ca, cb = Counter(a), Counter(b)
dot = sum(ca[t] * cb[t] for t in ca)
norm_a = math.sqrt(sum(v * v for v in ca.values()))
norm_b = math.sqrt(sum(v * v for v in cb.values()))
return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0
def search(query: str, top_k: int = 10,
state_code: str | None = None) -> list:
"""Hybrid search returning top_k candidates.
Each result has bm25_score, cosine_score, hybrid_score, and
state_code so the re-ranker can apply jurisdiction boosts.
"""
qtoks = _tokenize(query)
all_doc_toks = [_tokenize(d["text"]) for d in CORPUS]
results = []
for doc, dtoks in zip(CORPUS, all_doc_toks):
bm = _bm25(qtoks, dtoks, all_doc_toks)
cs = _cosine_bow(qtoks, dtoks)
# Hybrid: 0.4 BM25 (normalized) + 0.6 cosine
bm_norm = min(bm / (len(qtoks) + 1), 1.0)
hybrid = 0.4 * bm_norm + 0.6 * cs
if hybrid < 0.01:
continue
results.append({**doc,
"bm25_score": round(bm_norm, 4),
"cosine_score": round(cs, 4),
"hybrid_score": round(hybrid, 4),
"query_state": state_code})
results.sort(key=lambda x: x["hybrid_score"], reverse=True)
return results[:top_k]
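`search` squashes BM25 into [0, 1] with `min(bm / (len(qtoks) + 1), 1.0)`, a heuristic that depends on query length. A common alternative is to min-max normalize each score list across one query's candidates before blending with the same 0.4/0.6 weights. It is shown here as a swapped-in technique, not the capstone's method, and `min_max` and `fuse` are hypothetical names.

```python
def min_max(scores: list) -> list:
    """Scale scores to [0, 1] across one query's candidates."""
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [1.0 if hi > 0 else 0.0 for _ in scores]
    return [(s - lo) / (hi - lo) for s in scores]


def fuse(bm25_scores: list, cosine_scores: list,
         w_bm25: float = 0.4, w_cos: float = 0.6) -> list:
    """Weighted sum of per-query min-max normalized scores."""
    if len(bm25_scores) != len(cosine_scores):
        raise ValueError("score lists must be the same length")
    if not bm25_scores:
        return []
    b = min_max(bm25_scores)
    c = min_max(cosine_scores)
    return [w_bm25 * x + w_cos * y for x, y in zip(b, c)]


# Raw BM25 is unbounded; cosine is already in [0, 1].
print([round(s, 3) for s in fuse([4.2, 1.1, 0.0], [0.30, 0.55, 0.10])])
# [0.667, 0.705, 0.0]
```

One trade-off: min-max always gives the best candidate a full score even when every match is weak, so an absolute cutoff like `search`'s `hybrid < 0.01` no longer means the same thing.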
"""rag/re_ranker.py — jurisdiction-aware re-ranker (Haiku).
WHAT: Takes top-K hybrid results, asks Haiku to score each one
against the query, applies a +0.25 score boost for results
whose state_code matches the query's jurisdiction (so a DE
question doesn't get a NY-specific answer at top-1).
WHY: Hybrid search surfaces passages that are "topically related".
This stage asks "does this passage actually answer THIS question
for THIS jurisdiction?" Cost: ~$0.003 per re-rank call (Haiku).
GOTCHA: Falls back to hybrid order on Haiku JSON errors. Logs the
fallback so the trace shows quality misses.
"""
import json
import logging
import anthropic
logger = logging.getLogger(__name__)
_client = anthropic.Anthropic()
RERANK_MODEL = "claude-haiku-4-5-20251001"
JURISDICTION_BOOST = 0.25
def _prompt(query: str, state_code: str | None,
            candidates: list) -> str:
    blocks = []
    for i, c in enumerate(candidates):
        sc = c.get("state_code") or "general"
        blocks.append(
            f"[{i}] (state={sc}) {c['title']}\n"
            f" Snippet: {c['text'][:280]}"
        )
    juris = (f"The user is asking about jurisdiction "
             f"'{state_code}'. " if state_code else "")
    return (
        f"{juris}Score each passage 0.0-1.0 for relevance to "
        f"the query. Also return the single most relevant "
        f"sentence per passage (contextual compression). "
        f"Respond as JSON only.\n\n"
        f"Query: {query}\n\n"
        f"Candidates:\n" + "\n".join(blocks) + "\n\n"
        f"JSON: {{\"rankings\": [{{\"index\": int, \"score\": "
        f"float, \"top_sentence\": str}}, ...]}}"
    )
def rerank(query: str, candidates: list,
           state_code: str | None = None,
           top_k: int = 3) -> list:
    if not candidates:
        return []
    try:
        resp = _client.messages.create(
            model=RERANK_MODEL,
            max_tokens=1024,
            messages=[{"role": "user",
                       "content": _prompt(query, state_code,
                                          candidates)}],
        )
        text = resp.content[0].text
        if "```" in text:
            # Unwrap a fenced reply: take the fenced body, drop a "json" tag.
            text = text.split("```")[1].removeprefix("json").strip()
        rankings = json.loads(text)["rankings"]
    except (anthropic.APIError, json.JSONDecodeError, KeyError,
            IndexError, TypeError, AttributeError) as exc:
        # Fall back to hybrid order and log it so the trace shows the miss.
        logger.warning("rerank fallback to hybrid order: %r", exc)
        return candidates[:top_k]
    enriched = []
    for r in rankings:
        try:
            idx = int(r["index"])
            base = float(r["score"])
        except (KeyError, TypeError, ValueError):
            continue  # skip malformed ranking entries
        if 0 <= idx < len(candidates):
            c = dict(candidates[idx])
            # Jurisdiction boost: state-specific guides win for
            # state-specific questions.
            if state_code and c.get("state_code") == state_code:
                base = min(base + JURISDICTION_BOOST, 1.0)
            c["rerank_score"] = round(base, 4)
            c["compressed_snippet"] = r.get("top_sentence",
                                            c.get("text", ""))
            enriched.append(c)
    if not enriched:
        logger.warning("rerank returned no usable rankings; "
                       "using hybrid order")
        return candidates[:top_k]
    enriched.sort(key=lambda x: x["rerank_score"], reverse=True)
    return enriched[:top_k]
You implemented the third RAG pillar. Hybrid search returns up to 10 candidates (the mock corpus holds only six documents), blending BM25, which catches "9-515" or "UCC-1", with cosine similarity. The cosine side is mocked with bag-of-words here; real embeddings are what catch paraphrases. The Haiku re-ranker re-orders the candidates and compresses each one to its most relevant sentence. Critically, it applies a +0.25 jurisdiction boost so a Delaware question gets the Delaware filing guide before a generic Article 9 reference. Total added cost per query: ~$0.003, less than 0.5% of the Opus calls in the orchestrator.
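Two parts of `rerank` are fiddly: pulling JSON out of a reply that may be wrapped in a Markdown code fence, and applying the capped jurisdiction boost. Both can be isolated as pure functions and unit-tested without calling the API. `extract_json` and `boosted_score` are hypothetical helpers, not part of the module above.

```python
import json
import re
from typing import Optional

# Matches an optional Markdown code fence (three backticks, optional "json" tag).
_FENCE = re.compile(r"`{3}(?:json)?\s*(.*?)\s*`{3}", re.DOTALL)


def extract_json(text: str):
    """Parse a model reply, unwrapping a Markdown code fence if present."""
    match = _FENCE.search(text)
    return json.loads(match.group(1) if match else text)


def boosted_score(score: float, doc_state: Optional[str],
                  query_state: Optional[str], boost: float = 0.25) -> float:
    """Add the jurisdiction boost when the states match, capped at 1.0."""
    if query_state and doc_state == query_state:
        return min(score + boost, 1.0)
    return score


fence = "`" * 3
reply = fence + 'json\n{"rankings": [{"index": 0, "score": 0.6}]}\n' + fence
top = extract_json(reply)["rankings"][0]
print(round(boosted_score(top["score"], "DE", "DE"), 2))  # 0.85
print(boosted_score(0.9, "DE", "DE"))                      # 1.0
print(boosted_score(0.6, "NY", "DE"))                      # 0.6
```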
Evaluation Suite: 100 Cases
- Clean filings (30): Standard filings with no ambiguity. All entity resolutions high confidence. Validates baseline accuracy.
- High-confidence entity resolution (15): Name variants that should match with >85% confidence. Tests fuzzy matching and registry verification.
- Low-confidence entity resolution (15): Ambiguous matches that should trigger steward review. Tests HITL routing and confidence calibration.
- Collateral classification (10): Free-text collateral descriptions mapped to UCC categories. Tests Opus's classification accuracy on nuanced legal language.
- Duplicate detection (5): Filings that appear twice in different formats. Tests deduplication logic.
- Malformed records (10): Missing fields, encoding errors, invalid dates. Tests error handling and circuit breaker.
- Edge cases (10): State format changes, vector DB unavailability, concurrent batch conflicts.
- Adversarial (5): Injection attempts in debtor names, deliberate entity obfuscation, 5x max batch size.
Evaluation Runner (test_pipeline.py)
The 100-case suite is generated deterministically and run through the orchestrator. The tests enforce each category floor as a minimum:

- clean filings: 95%
- high-confidence resolution: 90%
- low-confidence resolution: 75% (these go to HITL anyway)
- collateral classification: 85%
- duplicate detection: 100%
- malformed records: 100% (must fail safely)
- edge cases: 80%
- adversarial: 100% (must reject or quarantine)

The whole suite must also reach 85% overall.
"""evaluation/build_test_cases.py — deterministic 100-case generator.
WHAT: Produces evaluation/test_cases.json with 100 cases across
8 categories (totals match the breakdown above).
WHY: Deterministic + reproducible. Re-run the suite any time
and the accuracy numbers are comparable.
"""
import json
import random
from pathlib import Path
random.seed(2026)
STATES = ["DE", "NY", "TX", "CA", "MA", "IL", "NV", "WA"]
SUFFIX_VARIANTS = ["LLC", "L.L.C.", "Inc", "Inc.", "Corp",
"Corporation", "Co", "Company"]
COLLATERAL_TYPES = ["all inventory and equipment",
"accounts receivable",
"specific equipment - 2024 Caterpillar D6T",
"all assets of the debtor",
"investment property and securities",
"chattel paper and instruments"]
def _filing(seq: int, debtor: str, state: str, kind: str = "UCC-1",
collateral: str = "general business assets") -> dict:
return {
"filing_number": f"{state}-2026-{seq:06d}",
"filing_type": kind,
"debtor_name": debtor,
"secured_party_name": f"Secured Capital {seq % 50}",
"state_code": state,
"filing_date": f"2026-0{(seq % 9) + 1}-15",
"lapse_date": f"2031-0{(seq % 9) + 1}-15",
"collateral_description": collateral,
}
def generate(out_path: str = "evaluation/test_cases.json") -> dict:
cases = []
seq = 1
# 30 clean filings
for i in range(30):
debtor = f"CleanCo {i:03d} LLC"
state = STATES[i % len(STATES)]
cases.append({"case_id": f"CL-{i+1:03d}",
"category": "clean_filing",
"input": _filing(seq, debtor, state),
"expected": {"resolved": True,
"confidence_min": 0.95,
"hitl_required": False}})
seq += 1
# 15 high-confidence entity resolution (suffix variants)
base = "Acme Logistics"
for i in range(15):
suffix = SUFFIX_VARIANTS[i % len(SUFFIX_VARIANTS)]
state = STATES[i % len(STATES)]
cases.append({"case_id": f"HC-{i+1:03d}",
"category": "entity_resolution_high",
"input": _filing(seq, f"{base} {suffix}", state),
"expected": {"resolved_to": base,
"confidence_min": 0.85,
"hitl_required": False}})
seq += 1
# 15 low-confidence (ambiguous initials, partial overlap)
ambiguous = [("J. Smith Holdings", "John Smith Holdings"),
("ABC Industrial", "A.B.C. Industries"),
("Riverside Trucking", "River Side Trucking Co"),
("MetalWorks Inc", "Metal Works Incorporated"),
("Tri-State Supply", "Tristate Supplies LLC")]
for i in range(15):
candidate, target = ambiguous[i % len(ambiguous)]
state = STATES[i % len(STATES)]
cases.append({"case_id": f"LC-{i+1:03d}",
"category": "entity_resolution_low",
"input": _filing(seq, candidate, state),
"expected": {"candidate_match": target,
"confidence_max": 0.80,
"hitl_required": True}})
seq += 1
# 10 collateral classification
for i in range(10):
coll = COLLATERAL_TYPES[i % len(COLLATERAL_TYPES)]
cases.append({"case_id": f"CC-{i+1:03d}",
"category": "collateral_classification",
"input": _filing(seq, f"BorrowerCo {i}",
"DE",
collateral=coll),
"expected": {"classification_present": True,
"min_categories": 1}})
seq += 1
# 5 duplicate detection
for i in range(5):
dup = _filing(seq, f"DupCo {i}", "NY")
cases.append({"case_id": f"DUP-{i+1:03d}",
"category": "duplicate",
"input": [dup, {**dup,
"filing_number":
dup["filing_number"]
+ "-COPY"}],
"expected": {"deduplicated": True,
"kept": 1}})
seq += 1
# 10 malformed records
malformations = [
{"filing_number": None},
{"debtor_name": ""},
{"state_code": "ZZ"}, # invalid state
{"filing_date": "not-a-date"},
{"lapse_date": "1900-01-01"}, # before filing_date
{"collateral_description": None},
{"filing_type": "UCC-99"},
{"secured_party_name": "x" * 5000}, # huge field
{"filing_number": "███\x00"}, # encoding errors
{"debtor_name": "<script>alert(1)</script>"},
]
for i, mal in enumerate(malformations):
bad = {**_filing(seq, f"BadCo {i}", "TX"), **mal}
cases.append({"case_id": f"MAL-{i+1:03d}",
"category": "malformed",
"input": bad,
"expected": {"quarantined": True,
"error_logged": True}})
seq += 1
# 10 edge cases
edge = [
("state_format_change", "DE",
"format-version=v2 introduces new field"),
("vector_db_unavailable", "NY",
"ChromaDB connection refused"),
("concurrent_batch_conflict", "TX",
"two pipelines update same entity"),
("circuit_breaker_open", "CA",
"Anthropic API returning 503s"),
("memory_quorum_loss", "MA",
"episodic memory stale"),
("partial_batch_failure", "IL", "5/100 records crash"),
("schema_drift", "NV",
"new column not in Bronze schema"),
("rate_limit_exceeded", "WA", "Haiku 429 throttle"),
("downstream_bq_quota", "DE",
"BigQuery streaming quota hit"),
("clock_skew", "NY",
"lapse_date in future relative to wall clock"),
]
for i, (slug, state, desc) in enumerate(edge):
cases.append({"case_id": f"EDGE-{i+1:03d}",
"category": "edge_case",
"input": {**_filing(seq, f"EdgeCo {i}",
state),
"scenario": slug,
"description": desc},
"expected": {"graceful_degradation": True,
"alert_emitted": True}})
seq += 1
# 5 adversarial
adversarial_payloads = [
"Robert'); DROP TABLE filings; --",
"Acme LLC\x00admin",
"<script>leak()</script> Holdings",
("A" * 1000) + " Industries", # oversized
"../../../../etc/passwd",
]
for i, payload in enumerate(adversarial_payloads):
cases.append({"case_id": f"ADV-{i+1:03d}",
"category": "adversarial",
"input": _filing(seq, payload, "DE"),
"expected": {"sanitized": True,
"instruction_followed": False,
"quarantined": True}})
seq += 1
suite = {"total_cases": len(cases), "cases": cases}
Path(out_path).parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w", encoding="utf-8") as f:
json.dump(suite, f, indent=2)
return suite
if __name__ == "__main__":
s = generate()
counts: dict = {}
for c in s["cases"]:
counts[c["category"]] = counts.get(c["category"], 0) + 1
print(f"Wrote {s['total_cases']} cases:")
for k, v in sorted(counts.items()):
print(f" {k:<30} {v}")
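The generator's docstring promises that the totals match the category breakdown. A small check can guard that promise in CI alongside `test_suite_loads`. In the sketch below, `EXPECTED_BREAKDOWN` is a hypothetical constant copied from the category list at the top of this section, and `breakdown_mismatches` is likewise illustrative.

```python
from collections import Counter

# Copied from the category list above (hypothetical constant name).
EXPECTED_BREAKDOWN = {
    "clean_filing": 30,
    "entity_resolution_high": 15,
    "entity_resolution_low": 15,
    "collateral_classification": 10,
    "duplicate": 5,
    "malformed": 10,
    "edge_case": 10,
    "adversarial": 5,
}


def breakdown_mismatches(cases: list, expected: dict = EXPECTED_BREAKDOWN) -> dict:
    """Return {category: (expected, actual)} for every category that differs."""
    actual = Counter(c["category"] for c in cases)
    cats = set(expected) | set(actual)
    return {k: (expected.get(k, 0), actual.get(k, 0))
            for k in sorted(cats) if expected.get(k, 0) != actual.get(k, 0)}


assert sum(EXPECTED_BREAKDOWN.values()) == 100
# In CI: assert breakdown_mismatches(generate()["cases"]) == {}
```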
"""tests/test_pipeline.py — runs the 100-case suite + scores.
WHAT: Loads evaluation/test_cases.json, runs each case through
the orchestrator, scores it against the expected outcome,
prints per-category accuracy, and asserts hard floors.
WHY: This is the production quality gate. CI runs it on every
merge to main; if any category drops below its floor, the
build fails.
GOTCHA: Use pytest fixtures with `scope="session"` to load the
suite once. The orchestrator is expensive to instantiate.
"""
import json
from pathlib import Path
import pytest
from pipeline.orchestrator import Orchestrator
SUITE_PATH = Path("evaluation/test_cases.json")
CATEGORY_FLOORS = {
"clean_filing": 0.95,
"entity_resolution_high": 0.90,
"entity_resolution_low": 0.75,
"collateral_classification": 0.85,
"duplicate": 1.00,
"malformed": 1.00,
"edge_case": 0.80,
"adversarial": 1.00,
}
@pytest.fixture(scope="session")
def suite():
if not SUITE_PATH.exists():
pytest.skip(f"Run build_test_cases.py first")
with open(SUITE_PATH) as f:
return json.load(f)
@pytest.fixture(scope="session")
def orch():
return Orchestrator()
def _score_case(actual: dict, expected: dict) -> float:
"""Return 1.0 if all asserted keys match; partial otherwise."""
if not expected:
return 1.0
hits = 0
total = 0
for key, value in expected.items():
total += 1
if key.endswith("_min"):
real = key[:-4]
if (real in actual
and isinstance(actual[real], (int, float))
and actual[real] >= value):
hits += 1
elif key.endswith("_max"):
real = key[:-4]
if (real in actual
and isinstance(actual[real], (int, float))
and actual[real] <= value):
hits += 1
elif key in actual and actual[key] == value:
hits += 1
return hits / total if total else 0.0
def test_suite_loads(suite):
assert suite["total_cases"] == len(suite["cases"])
assert suite["total_cases"] >= 100
@pytest.mark.parametrize("category", list(CATEGORY_FLOORS.keys()))
def test_category_floor(category, suite, orch):
cases = [c for c in suite["cases"] if c["category"] == category]
if not cases:
pytest.skip(f"no cases for {category}")
passed = 0
for case in cases:
try:
actual = orch.run(case["input"])
score = _score_case(actual, case["expected"])
if score >= 0.9:
passed += 1
except Exception:
pass # counts as fail
accuracy = passed / len(cases)
floor = CATEGORY_FLOORS[category]
assert accuracy >= floor, (
f"{category}: {accuracy:.0%} below floor {floor:.0%} "
f"({passed}/{len(cases)})")
def test_overall_accuracy(suite, orch):
"""Whole-suite accuracy must be >=85%."""
passed = 0
for case in suite["cases"]:
try:
actual = orch.run(case["input"])
if _score_case(actual, case["expected"]) >= 0.9:
passed += 1
except Exception:
pass
overall = passed / len(suite["cases"])
assert overall >= 0.85, f"overall {overall:.0%} < 85%"
Run the harness: python evaluation/build_test_cases.py && pytest tests/test_pipeline.py -v
Expected output (truncated):
tests/test_pipeline.py::test_suite_loads PASSED
tests/test_pipeline.py::test_category_floor[clean_filing] PASSED
tests/test_pipeline.py::test_category_floor[entity_resolution_high] PASSED
tests/test_pipeline.py::test_category_floor[entity_resolution_low] PASSED
tests/test_pipeline.py::test_category_floor[collateral_classification] PASSED
tests/test_pipeline.py::test_category_floor[duplicate] PASSED
tests/test_pipeline.py::test_category_floor[malformed] PASSED
tests/test_pipeline.py::test_category_floor[edge_case] PASSED
tests/test_pipeline.py::test_category_floor[adversarial] PASSED
tests/test_pipeline.py::test_overall_accuracy PASSED
=============== 10 passed in 12.3s ===============
You closed the loop on production quality. build_test_cases.py generates 100 deterministic cases across 8 categories. test_pipeline.py runs each case through the orchestrator, counts a case as passed when it scores at least 0.9, and asserts category-specific floors (clean filings at least 95%; duplicate, malformed, and adversarial cases 100%) plus an 85% whole-suite floor. The suite is the gate that prevents regressions: any merge that drops a category below its floor fails CI before it touches production.
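As written, test_category_floor and test_overall_accuracy each call orch.run on every case, so the suite passes through the orchestrator twice. A minimal sketch of a single-pass alternative, assuming the same case shape ("category", "input", "expected") and the same 0.9 pass mark; `category_accuracy`, `floor_violations`, and `FLOORS` are hypothetical names, not part of the project code:

```python
from collections import defaultdict
from typing import Callable

# Hypothetical subset of CATEGORY_FLOORS, for illustration only.
FLOORS = {"clean_filing": 0.95, "malformed": 1.00}


def category_accuracy(cases: list, run: Callable[[dict], dict],
                      score: Callable[[dict, dict], float],
                      pass_mark: float = 0.9) -> dict:
    """Run every case exactly once and return {category: accuracy}."""
    passed = defaultdict(int)
    seen = defaultdict(int)
    for case in cases:
        category = case["category"]
        seen[category] += 1
        try:
            if score(run(case["input"]), case["expected"]) >= pass_mark:
                passed[category] += 1
        except Exception:
            pass  # an exception counts as a failed case, as in the tests above
    return {c: passed[c] / seen[c] for c in seen}


def floor_violations(accuracy: dict, floors: dict) -> list:
    """Sorted list of categories whose accuracy is below their floor."""
    return sorted(c for c, a in accuracy.items() if c in floors and a < floors[c])
```

Returned from a session-scoped fixture, the accuracy dict could feed both the per-category assertions and the overall check, halving orchestrator calls (and API cost) per CI run.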
Testing Guide
| Type | Scenario | Expected Behavior |
|---|---|---|
| Happy | 100 clean filings batch | < 30s, all resolved, Haiku handles 70%, cost < $1.00 |
| Happy | Episodic memory boosts entity confidence | Similar past case found, confidence raised above 80%, no steward needed |
| Happy | Procedural rule fires on matching registered agent | Entity resolved with zero LLM cost, deterministic rule match |
| Happy | Steward correction stored as new episode | Episode persisted, future similar cases will benefit |
| Happy | Full pipeline traced end-to-end | Dashboard shows all 8 panels with real-time metrics |
| Edge | 15% malformed records in batch | Circuit breaker trips at 10%, quarantines batch, processes clean records separately |
| Edge | ChromaDB unavailable | Episodic memory degrades to keyword matching, procedural rules still fire |
| Edge | State format changes mid-batch | Ingestion agent detects schema mismatch, flags for engineering |
| Adversarial | Shell company entity obfuscation | System detects unusual patterns, flags for investigation |
| Adversarial | 50,000 filing batch (5x max) | System partitions into sub-batches, processes sequentially |
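The "Procedural rule fires on matching registered agent" row implies a deterministic lookup that runs before any model call. A minimal sketch, assuming procedural memory can be modeled as a dict from a normalized registered-agent name to an entity ID; `PROCEDURAL_RULES`, `Resolution`, `normalize`, and `resolve` are hypothetical names, and `llm_resolve` stands in for whatever model-backed resolver the pipeline actually uses:

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass
class Resolution:
    entity_id: str
    source: str      # "procedural" (rule hit) or "llm" (model call)
    cost_usd: float


# Hypothetical rule table: normalized registered-agent name -> entity ID.
PROCEDURAL_RULES = {"EXAMPLE AGENT SERVICES INC": "ENT-REGAGENT-001"}


def normalize(name: str) -> str:
    """Uppercase and collapse whitespace so trivial variants hit the same rule."""
    return " ".join(name.upper().split())


def resolve(name: str, llm_resolve: Callable[[str], Tuple[str, float]]) -> Resolution:
    entity_id = PROCEDURAL_RULES.get(normalize(name))
    if entity_id is not None:
        return Resolution(entity_id, "procedural", 0.0)  # deterministic, no model call
    entity_id, cost = llm_resolve(name)  # only unmatched names reach the model
    return Resolution(entity_id, "llm", cost)
```

The happy-path test for this row can then assert both that `source == "procedural"` and that the model stub was never called.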
Compliance & Regulatory Notes
PII handling: Filing data may contain individual names and addresses. The Reporting Agent must redact PII before generating public-facing reports. Episodic memory must not store PII — store entity IDs and resolution patterns, not raw personal data.
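One way to enforce "IDs and resolution patterns, not raw personal data" is an allow-list applied before anything is written to episodic memory. A minimal sketch; the field names and `to_episode` are hypothetical, chosen only to illustrate the pattern:

```python
from typing import Dict, List, Tuple

# Hypothetical allow-list: IDs and resolution patterns only, never names or addresses.
ALLOWED_EPISODE_FIELDS = {"filing_id", "entity_id", "resolution_pattern", "confidence"}


def to_episode(record: Dict) -> Tuple[Dict, List[str]]:
    """Return (episode, dropped_field_names). Any field not allow-listed is dropped."""
    episode = {k: v for k, v in record.items() if k in ALLOWED_EPISODE_FIELDS}
    dropped = sorted(k for k in record if k not in ALLOWED_EPISODE_FIELDS)
    return episode, dropped
```

An allow-list, rather than a deny-list of known PII fields, means a raw field added to the record later is excluded by default until someone deliberately allows it. The `dropped` list can go to the audit log as evidence that redaction ran.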
FCRA: If the platform's output feeds credit decisions, accuracy requirements under the Fair Credit Reporting Act apply. The 100-case evaluation suite and quality scorecards are compliance artifacts — keep them auditable.
Model audit trail: Every model routing decision, entity resolution, and steward override must be logged with timestamps. This enables regulatory audits and dispute resolution. The tracing infrastructure serves double duty: debugging AND compliance.
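A minimal sketch of a structured audit entry, assuming one JSON line per event with a UTC timestamp and the trace ID that ties it back to the tracing spans. `audit_line` and `AUDIT_EVENTS` are hypothetical names; the three event types mirror the ones listed above:

```python
import json
from datetime import datetime, timezone

# The three decision types the audit trail must cover.
AUDIT_EVENTS = {"model_routing", "entity_resolution", "steward_override"}


def audit_line(event_type: str, trace_id: str, **details) -> str:
    """Serialize one audit event as a single JSON line with a UTC timestamp."""
    if event_type not in AUDIT_EVENTS:
        raise ValueError(f"unknown audit event: {event_type}")
    entry = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "event": event_type,
        "trace_id": trace_id,
        "details": details,
    }
    return json.dumps(entry, sort_keys=True)
```

Rejecting unknown event types keeps the log schema closed, which makes the trail easier to query during an audit or dispute.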
Data retention: Episodic memory retains resolution decisions for 730 days (2 years). Align with your organization's data retention policy and applicable state regulations.
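A minimal retention sweep, assuming each episode carries an `id` and a timezone-aware ISO-8601 `created_at`. `expired_episode_ids` is a hypothetical helper; deleting the returned IDs from the vector store is left to your storage layer:

```python
from datetime import datetime, timedelta, timezone

# Exactly 730 days. Note this is one day short of two calendar years
# whenever the span crosses a leap day.
RETENTION = timedelta(days=730)


def expired_episode_ids(episodes: list, now: datetime) -> list:
    """IDs of episodes created strictly before now - RETENTION."""
    cutoff = now - RETENTION
    return [e["id"] for e in episodes
            if datetime.fromisoformat(e["created_at"]) < cutoff]
```

Passing `now` in explicitly, instead of reading the clock inside the function, keeps the sweep deterministic and testable.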
Verify Everything Works
Run the complete production pipeline end-to-end with a single command:
python pipeline/orchestrator.py
Expected final output:
============================================================
PRODUCTION UCC PIPELINE — RUN COMPLETE
============================================================
Status: complete
Total filings: 15
Elapsed: 42.31ms
[BRONZE] {"status": "complete", "valid": 10, "quarantined": 5}
[SILVER] {"records": 10}
[GOLD] {"entities": 8, "cost": 0.0036}
Tracing: {"total": 3, "ok": 3, "errors": 0, "avg_duration_ms": 14.1}
- Bronze layer: 15 filings ingested, 10 valid (67%), 5 quarantined (33%)
- Silver layer: 10 records normalized with standardized dates, names, and filing types
- Gold layer: 8 unique entities resolved, risk profiles generated
- Tracing: 3 pipeline stages traced, 0 errors, average 14ms per stage
- Cost: $0.0036 total for 15 filings = $0.00024 per filing (well under $0.02 target)
- Circuit breaker: Did not trip (error rate below 10% threshold for batch size)
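The derived figures above can be recomputed from the stage lines instead of by hand. A minimal sketch, assuming each stage line has the `[STAGE] {json}` form shown in the expected output and that total filings equal valid plus quarantined; `parse_stages` and `summarize` are hypothetical helpers:

```python
import json
import re

SAMPLE = """\
[BRONZE] {"status": "complete", "valid": 10, "quarantined": 5}
[SILVER] {"records": 10}
[GOLD] {"entities": 8, "cost": 0.0036}
"""

STAGE_LINE = re.compile(r"^\[(BRONZE|SILVER|GOLD)\] (\{.*\})$")


def parse_stages(text: str) -> dict:
    """Map stage name -> parsed JSON payload for every matching line."""
    stages = {}
    for line in text.splitlines():
        m = STAGE_LINE.match(line.strip())
        if m:
            stages[m.group(1)] = json.loads(m.group(2))
    return stages


def summarize(stages: dict, target_per_filing: float = 0.02) -> dict:
    bronze = stages["BRONZE"]
    total = bronze["valid"] + bronze["quarantined"]
    per_filing = stages["GOLD"]["cost"] / total
    return {
        "total": total,
        "valid_pct": round(100 * bronze["valid"] / total),
        "per_filing_cost": per_filing,
        "under_target": per_filing < target_per_filing,
        # Every valid Bronze record should arrive in Silver.
        "silver_matches_bronze": stages["SILVER"]["records"] == bronze["valid"],
    }
```

Run against the sample above, this reproduces the 15 filings, 67% valid, and roughly $0.00024 per filing quoted in the bullets.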
You have built a production-grade UCC data intelligence platform with all six production pillars: model routing, multi-layer memory, data quality gates, circuit breakers, distributed tracing, and a Medallion Architecture pipeline. The pipeline processes raw filings through Bronze (validation) → Silver (normalization) → Gold (entity resolution + risk profiles) with full observability at every stage.
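Bronze validation is the step that splits raw filings into valid and quarantined records. A minimal sketch, assuming the Bronze gate checks the same five required fields that the Bronze→Silver agent's system prompt lists (filing_number, debtor.name, secured_party.name, state, filing_date) and treats empty strings as missing; `get_path` and `bronze_split` are hypothetical names:

```python
from typing import Any

REQUIRED = ("filing_number", "debtor.name", "secured_party.name", "state", "filing_date")


def get_path(record: dict, dotted: str) -> Any:
    """Follow a dotted path like "debtor.name"; return None if any part is missing."""
    cur: Any = record
    for part in dotted.split("."):
        if not isinstance(cur, dict) or part not in cur:
            return None
        cur = cur[part]
    return cur


def bronze_split(records: list) -> tuple:
    """Return (valid, quarantined); quarantined entries record which fields were missing."""
    valid, quarantined = [], []
    for rec in records:
        missing = [f for f in REQUIRED if get_path(rec, f) in (None, "")]
        if missing:
            quarantined.append({"record": rec, "missing": missing})
        else:
            valid.append(rec)
    return valid, quarantined
```

Keeping the list of missing fields with each quarantined record gives data stewards a concrete reason for every rejection.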
To take this further, connect real Anthropic API calls (replace the mock resolution in gold_layer.py), deploy with Docker using the Dockerfile in the Deployment section, and rerun the 100-case evaluation harness against the live models to prove production readiness.
Troubleshooting Guide
ModuleNotFoundError: No module named 'anthropic'
Your virtual environment is not activated. Run source venv/bin/activate (macOS/Linux) or venv\Scripts\activate (Windows), then pip install anthropic.
ModuleNotFoundError: No module named 'pipeline'
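Python cannot find the package. Run commands from the project root and create __init__.py files in every package directory:
touch pipeline/__init__.py memory/__init__.py observability/__init__.py guardrails/__init__.py routing/__init__.py
If the error persists, launch the entry point as a module from the project root (python -m pipeline.orchestrator) or set PYTHONPATH to the project root. Running python pipeline/orchestrator.py puts the pipeline/ directory, not the project root, at the front of sys.path.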
AuthenticationError: Invalid API key
Your ANTHROPIC_API_KEY environment variable is not set or is incorrect. Run echo $ANTHROPIC_API_KEY (Unix) or echo %ANTHROPIC_API_KEY% (Windows) to verify. The key should start with sk-ant-.
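A cross-platform alternative to the echo commands is to check the variable from Python itself, which behaves the same on macOS, Linux, and Windows. A minimal sketch; `check_api_key` is a hypothetical helper, and it only checks the format, not whether the key is valid:

```python
import os
from collections.abc import Mapping


def check_api_key(env: Mapping = os.environ) -> str:
    """Report whether ANTHROPIC_API_KEY is set and has the sk-ant- prefix."""
    key = env.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        return "ANTHROPIC_API_KEY is not set"
    if not key.startswith("sk-ant-"):
        return "ANTHROPIC_API_KEY is set but does not start with sk-ant-"
    # Show only the last 4 characters so the key never lands in logs.
    return f"ANTHROPIC_API_KEY looks well-formed (sk-ant-...{key[-4:]})"
```

Call `print(check_api_key())` from the same virtual environment and shell that run the pipeline, since a key exported in a different terminal is not visible to it.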
ImportError: cannot import name 'CircuitBreaker'
The file guardrails/circuit_breaker.py is missing or has a syntax error. Ensure you completed Step 4 and saved the file.
Episodic memory returns 0 results for known entities
The mock embedding uses character frequency vectors which have lower resolution than real embeddings. Try lowering EPISODIC_SIMILARITY_THRESHOLD in config.py from 0.80 to 0.70.
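To see why the threshold matters, here is a minimal character-frequency similarity. It assumes, hypothetically, that the mock embedding counts alphanumeric characters and compares vectors by cosine similarity; the project's actual mock may differ in detail. `char_freq_vector`, `cosine`, and `best_match` are illustrative names:

```python
import math
from collections import Counter
from typing import List, Optional


def char_freq_vector(text: str) -> Counter:
    """Hypothetical mock embedding: counts of lowercase letters and digits."""
    return Counter(c for c in text.lower() if c.isalnum())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[k] * b[k] for k in a.keys() & b.keys())
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0


def best_match(query: str, known: List[str], threshold: float) -> Optional[str]:
    """Most similar known name, or None if it scores below the threshold."""
    qv = char_freq_vector(query)
    scored = [(cosine(qv, char_freq_vector(k)), k) for k in known]
    if not scored:
        return None
    score, name = max(scored)
    return name if score >= threshold else None
```

Under this scheme "ACME INC" and "ACME INCORPORATED" score about 0.72, so they match at 0.70 but not at 0.80. Because character counts ignore order, anagrams score a perfect 1.0. Lowering the threshold therefore trades missed matches for false ones, and real embeddings avoid both problems.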
Circuit breaker trips unexpectedly on small batches
The default CIRCUIT_BREAKER_WINDOW_SIZE is 100. If your batch is smaller than the window, the breaker only checks after all records are processed. For testing with small batches, set window_size=5.
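Batch size interacts with the window because an error rate over a handful of records is noisy. A minimal sliding-window breaker, assuming (hypothetically) that it trips when the error rate over the most recent `window_size` outcomes reaches `threshold`, and only once `min_samples` outcomes have been seen. `WindowedBreaker` and `min_samples` are illustrative and are not the project's guardrails/circuit_breaker.py API:

```python
from collections import deque


class WindowedBreaker:
    """Hypothetical rate-based breaker over the last `window_size` outcomes."""

    def __init__(self, window_size: int = 100, threshold: float = 0.10,
                 min_samples: int = 20):
        self.window = deque(maxlen=window_size)  # True = ok, False = error
        self.threshold = threshold
        self.min_samples = min_samples
        self.tripped = False

    def record(self, ok: bool) -> None:
        self.window.append(ok)
        # Do not judge the rate until there is enough evidence.
        if len(self.window) >= self.min_samples:
            errors = self.window.count(False)
            if errors / len(self.window) >= self.threshold:
                self.tripped = True
```

With `min_samples=1`, a single bad record in a tiny batch is a 100% error rate and trips the breaker immediately. With a larger minimum, small batches are judged only once enough records have been seen. Pick the minimum to match your smallest realistic batch.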
Gold layer creates duplicate entities for the same debtor
This means episodic memory is not matching the name variants. Check that you are seeding episodic memory before running the gold layer, or lower the similarity threshold.
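Name variants often differ only in punctuation and legal suffixes, so canonicalizing names before the similarity lookup reduces duplicates. A minimal sketch; the suffix list and `canonical_name` are hypothetical and should be adjusted to the filing conventions you actually see:

```python
import re

# Hypothetical suffix list; extend it to match the states you ingest.
ENTITY_SUFFIXES = {"LLC", "INC", "INCORPORATED", "CORP", "CORPORATION",
                   "CO", "LTD", "LP", "LLP"}


def canonical_name(name: str) -> str:
    """Uppercase, drop punctuation, and strip trailing legal-entity suffixes."""
    text = name.upper().replace(".", "")        # "L.L.C." -> "LLC"
    text = re.sub(r"[^A-Z0-9 ]", " ", text)     # commas, ampersands, etc. -> space
    tokens = text.split()
    while tokens and tokens[-1] in ENTITY_SUFFIXES:
        tokens.pop()
    return " ".join(tokens)
```

Treat the canonical form as a candidate key for grouping, not as proof of identity: "ACME INC" and "ACME LLC" collapse to the same string but can be different legal entities, so confirm matches with address or tax ID before merging.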
Docker build fails with "pip install" errors
Create a requirements.txt file with: anthropic chromadb pydantic fastapi uvicorn celery redis httpx pytest (one per line). Ensure it is in the project root directory.
Agent SDK Port [OPTIONAL STRETCH]
The pipeline you built uses a manual tool-use loop — messages.create, stop_reason == "tool_use", accumulating tool_result blocks, and short-circuiting on end_turn. The Claude Agent SDK abstracts that loop. This stretch goal ports the Bronze→Silver transformation agent (the LLM-driven format normalizer, not the deterministic schema validator) so you can compare LOC and behavior against the manual implementation in your own code.
The SDK trades fine-grained control for less code. Use it when your tools fit the MCP shape, async execution is acceptable, and the circuit breaker / retry logic can wrap the whole query() call rather than every iteration. Keep the manual loop when you need synchronous flow, mid-loop guardrails (e.g. PII redaction checked between tool calls), or per-tool retry semantics on flaky GCS / BigQuery operations.
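For comparison, the manual loop that the SDK replaces has roughly this shape. A minimal sketch, assuming `create(messages)` wraps a Messages API call (for example a lambda around `client.messages.create(model=..., max_tokens=..., tools=..., messages=messages)`) and `handlers` maps tool names to plain Python functions; `run_tool_loop` is a hypothetical name:

```python
import json
from typing import Any, Callable, Dict


def run_tool_loop(create: Callable[[list], Any],
                  handlers: Dict[str, Callable[..., dict]],
                  prompt: str,
                  max_turns: int = 12) -> str:
    """Manual tool-use loop. `create(messages)` must return an object with
    `.stop_reason` and `.content` (blocks with `.type`, plus `.text` for text
    blocks or `.id`/`.name`/`.input` for tool_use blocks)."""
    messages = [{"role": "user", "content": prompt}]
    for _ in range(max_turns):
        resp = create(messages)
        messages.append({"role": "assistant", "content": resp.content})
        if resp.stop_reason != "tool_use":
            # end_turn (or any other stop reason): return the final text.
            return "".join(b.text for b in resp.content if b.type == "text")
        results = []
        for block in resp.content:
            if block.type == "tool_use":
                # Mid-loop hooks (breaker checks, PII redaction, per-tool
                # retries) would go here, between individual tool calls.
                output = handlers[block.name](**block.input)
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": json.dumps(output)})
        messages.append({"role": "user", "content": results})
    raise RuntimeError(f"no final answer after {max_turns} turns")
```

The comment inside the inner loop marks exactly the control point the SDK removes: with query(), those hooks must move into the tool handlers or around the whole call.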
Install
pip install "claude-agent-sdk>=0.1.0" anyio
Port the Bronze→Silver Transformation Agent
Create sdk_port.py. The same transformation tools (detect_state_format, parse_filing_record, resolve_entity, flag_quality_issue, write_silver_record) are wrapped with @tool and bundled into an in-process MCP server.
"""sdk_port.py — Port the Bronze→Silver transformation agent to the Agent SDK.
The manual tool-use loop is ~80 lines per agent. The SDK absorbs the loop,
MCP-shaped tool dispatch, and message accumulation. The circuit breaker
and tracer become wrappers around query() instead of hooks inside the loop.
"""
import anyio
import json
from claude_agent_sdk import (
query,
ClaudeAgentOptions,
AssistantMessage,
tool,
create_sdk_mcp_server,
)
from agents.transform_tools import (
detect_state_format,
parse_filing_record,
resolve_entity,
flag_quality_issue,
write_silver_record,
)
from routing.model_router import ModelRouter
from observability.tracer import Tracer
from guardrails.circuit_breaker import CircuitBreaker
# 1. Wrap each tool function with @tool. The schema mirrors the manual
# tool definition; the return shape follows the MCP standard
# ({"content": [{"type": "text", "text": ...}]}).
@tool(
"detect_state_format",
"Detect which state's UCC filing format this raw record matches.",
{"raw_record": str, "source_state_hint": str},
)
async def sdk_detect(args):
result = detect_state_format(**args)
return {"content": [{"type": "text", "text": json.dumps(result)}]}
@tool(
"parse_filing_record",
"Parse a raw UCC filing into the canonical Silver schema.",
{"raw_record": str, "format": str},
)
async def sdk_parse(args):
result = parse_filing_record(**args)
return {"content": [{"type": "text", "text": json.dumps(result)}]}
@tool(
"resolve_entity",
"Resolve debtor/secured-party names against episodic memory of past entities.",
{"name": str, "address": str, "tax_id": str},
)
async def sdk_resolve(args):
result = resolve_entity(**args)
return {"content": [{"type": "text", "text": json.dumps(result)}]}
@tool(
"flag_quality_issue",
"Flag a quality issue (missing field, malformed value, suspicious duplicate).",
{"filing_id": str, "issue_type": str, "severity": str, "details": dict},
)
async def sdk_flag(args):
result = flag_quality_issue(**args)
return {"content": [{"type": "text", "text": json.dumps(result)}]}
@tool(
"write_silver_record",
"Persist the canonical record to the Silver layer (BigQuery).",
{"silver_record": dict, "lineage": dict},
)
async def sdk_write(args):
result = write_silver_record(**args)
return {"content": [{"type": "text", "text": json.dumps(result)}]}
# 2. Bundle tools into an in-process MCP server. No subprocess — runs in
# the same Python process as the agent.
transform_server = create_sdk_mcp_server(
name="transform_tools",
version="1.0.0",
tools=[sdk_detect, sdk_parse, sdk_resolve, sdk_flag, sdk_write],
)
# 3. Run the agent. The SDK handles the tool-use loop, stop_reason
# checks, and message threading. We only handle: routing, tracing,
# circuit breaker, and the prompt itself.
async def run_transform_agent_sdk(
raw_record: str,
state_hint: str,
router: ModelRouter,
tracer: Tracer,
breaker: CircuitBreaker,
) -> str:
if breaker.is_tripped():
raise RuntimeError("Circuit breaker tripped — refusing API call.")
# Routine known formats go to Haiku; unknown formats escalate to Sonnet.
model = router.select(
task_type="format_normalization",
complexity_score=0.3 if state_hint else 0.7,
)
options = ClaudeAgentOptions(
system_prompt=(
"You are the Bronze→Silver transformation agent. For each raw "
"filing: 1) detect the state format, 2) parse into the canonical "
"schema, 3) resolve debtor/secured-party entities against memory, "
"4) flag any quality issues, 5) write the Silver record. "
"Refuse to write if any required field (filing_number, debtor.name, "
"secured_party.name, state, filing_date) is missing — flag instead."
),
mcp_servers={"transform": transform_server},
allowed_tools=[
"mcp__transform__detect_state_format",
"mcp__transform__parse_filing_record",
"mcp__transform__resolve_entity",
"mcp__transform__flag_quality_issue",
"mcp__transform__write_silver_record",
],
max_turns=12,
model=model,
)
final_text = ""
prompt = (
f"Transform this raw UCC filing record into the Silver schema. "
f"State hint: {state_hint or 'unknown'}.\n\nRecord:\n{raw_record}"
)
with tracer.span("transform_agent.sdk"):
try:
async for msg in query(prompt=prompt, options=options):
if isinstance(msg, AssistantMessage):
for block in msg.content:
if hasattr(block, "text"):
final_text = block.text
breaker.record_success()
except Exception:
breaker.record_failure()
raise
return final_text
if __name__ == "__main__":
# anyio.run wires the async function into a sync entry point so this
# file can be invoked the same way as the manual pipeline.
sample = '{"filing_no": "2024-0098765", "debtor_nm": "ACME LOGISTICS LLC"}'
out = anyio.run(
run_transform_agent_sdk,
sample,
"DE",
ModelRouter(),
Tracer(),
CircuitBreaker(threshold=3),
)
print(out)
LOC & Behavior Comparison
| Aspect | Manual tool-use loop | Agent SDK (this stretch) |
|---|---|---|
| Lines per agent | ~80 | ~30 (excluding tool wrappers) |
| Tool dispatch | You write the if block.name == ... dispatcher | SDK routes by MCP tool name |
| stop_reason / tool-use loop | You implement | SDK handles |
| Execution model | Sync (or your choice) | Async only |
| Circuit breaker hook | Inline, per iteration | Wrapper, per query() call |
| Tool result format | Plain dict you marshal | MCP {"content": [...]} |
| Mid-loop retry on tool failure | Easy — you control the loop | Harder — wrap the whole query |
| Visibility into raw messages | Full messages array | AssistantMessage event stream |
| Streaming partial responses | Set stream=True on messages.create | Native (the async for) |
| BigQuery / GCS retry on transient failure | Easy — retry the single tool result | Wrap each tool with retry decorator |
Use the SDK when tools fit the MCP shape, the circuit breaker can guard the whole query, and async execution is acceptable. Keep the manual loop when you need synchronous control flow, per-tool-call retries (important here because BigQuery and GCS calls are flakier than Claude API calls), mid-loop PII redaction guardrails, or full audit-log visibility into the raw messages array. The Bronze→Silver pipeline keeps the manual loop because each filing is processed in a Cloud Run worker that already has Python-thread parallelism and synchronous tracing — introducing async would force a rewrite of the worker harness, and the LOC savings (~50 lines per agent) don't justify it for a five-tool agent.
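If you do adopt the SDK, per-tool retries can still be had by wrapping each tool handler, as the "Wrap each tool with retry decorator" row suggests. A minimal sketch of such a decorator; `with_retries` is a hypothetical name, and the default `retry_on` tuple is a placeholder that you would replace with your BigQuery/GCS client's transient exception classes:

```python
import asyncio
import functools
import random


def with_retries(attempts: int = 3, base_delay: float = 0.5,
                 retry_on: tuple = (ConnectionError, TimeoutError)):
    """Retry an async tool handler on transient errors with exponential
    backoff plus jitter. Exceptions not listed in `retry_on` propagate at once."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except retry_on:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    delay = base_delay * 2 ** (attempt - 1)
                    await asyncio.sleep(delay + random.uniform(0, delay / 2))
        return wrapper
    return decorator
```

Stack it directly under `@tool(...)` on a handler such as `sdk_write`, so the SDK receives the retrying version. anyio.run uses the asyncio backend by default, where `asyncio.sleep` works; switch to `anyio.sleep` if you run on the trio backend.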
Knowledge Check
Test your understanding of the production UCC platform. Select the best answer for each question.
Q1: Which Claude model should handle format validation tasks, and why?
Q2: What is procedural memory in this UCC pipeline?
Q3: In the Medallion Architecture, what does the Gold layer contain?
Q4: The pipeline processes a batch from a new state with an unknown CSV format. Which model handles format detection?
Q5: Why does the evaluation harness include "malformed CSV injection" as an adversarial test?
References & Resources
- Claude Tool Use Documentation — Multi-tool orchestration
- Claude Model Overview — Model routing and pricing
- Prompt Caching — Cache system prompts for cost savings
- Message Batches API — Batch processing at 50% cost
- Anthropic Cookbook — Production agent patterns
- UCC Article 9 (Cornell Law) — Legal reference