Capstone 6 — Parallel State Testing Agent
Build a coordinator agent that spawns 50 parallel subagents to validate the Bronze Canonical Load across all US states — checking record counts, date normalization, duplicate detection, and schema compliance in one automated sweep.
Project Brief
Before standardized building codes, every US city had its own rules for wiring, plumbing, and structural loads. A contractor who built safely in New York might install something dangerous in Miami — not because they were careless, but because each jurisdiction defined "correct" differently. Inspectors had to know every local code by heart, and a single missed rule could mean a condemned building.
The pain is the same with UCC filing data. Every quarter, 50 state Secretary of State offices publish bulk filing data, but New York sends XML, California sends pipe-delimited CSV, Texas sends fixed-width text, Nevada sends JSON, and so on. There is no federal standard. Your Canonical Loader (a data pipeline tool that reads each state's file format, applies state-specific transformation rules, and loads records into a single Bronze table with a universal schema) reads each state's file and normalizes it into one table. But how do you know every transformation was correct?
This capstone is the "building inspector" for your data pipeline. You will build an agent that validates all 50 states in parallel — one subagent per state, each running 12 checks, all at the same time. Instead of manually spot-checking a few states and hoping the rest are fine, your agent systematically tests every single one and produces a dashboard showing exactly what passed and what broke.
- Coordinator Agent — reads the load manifest, spawns 50 parallel state tester subagents, collects results, and generates a dashboard
- State Tester Subagent — one per state; reads the source file, queries the Bronze table, runs 12 validation checks, returns a structured pass/fail report
- 5-Format Parser — auto-detects and parses XML, pipe-CSV, comma-CSV, fixed-width, and JSON source files
- 12 Validation Checks — record counts, filing number integrity, duplicates, date normalization, status normalization, name normalization, null checks, and spot checks
- 3 Load Scenarios — full seed, incremental, and change detection — each with scenario-specific checks
- Tier 1 Local Deployment — Docker Compose with DuckDB, file watcher, and HTML dashboard
In production data pipelines, loading data is only half the job. Validating the load is what prevents bad data from reaching downstream consumers. At scale, a single undetected date format issue in Georgia's data (DD/MM/YYYY instead of YYYY-MM-DD) can cause thousands of lien risk calculations to be wrong. Manual validation of 50 states takes a data engineer 2–3 days. This agent does it in under 20 seconds, catches errors humans would miss, and runs automatically after every load.
Prerequisites
- M05 — Tool Use: How agents call tools (the state tester is a tool-calling agent)
- M06 — Multi-Tool Orchestration: Agents calling multiple tools in sequence
- M12 — ReAct Agents: The think→act→observe loop used by each state tester
- M13 — Planning & Decomposition: How the coordinator breaks a big task into 50 parallel subtasks
- M14 — Multi-Agent Systems: Coordinator + subagent pattern
- M15B — Build Complete Agent: Hands-on experience building a multi-agent system
- M16–M17 — Guardrails: Input validation and error handling patterns
- Python 3.10+ and Docker Desktop installed
"Testing data pipelines is just SELECT COUNT(*)" — Count matching is check #2 of 12. Counts can match perfectly while dates are wrong, names are unnormalized, duplicates exist, and required fields are null. You need structural, content, and referential integrity checks.
"50 agents means 50x the API cost" — The base capstone runs zero API calls per state (all 12 checks are deterministic Python), so 50 states costs $0. The optional Claude-driven failure-analysis stretch goal would add ~2–3 Haiku calls per failed state (~$0.003 each); even at 100% failure rate that's under $0.50 for the entire 50-state run. The parallelism saves time (20 seconds vs 2 days manual) at near-zero cost.
"I should test the Canonical Loader itself" — No. Your agent tests the OUTPUT of the loader — the Bronze table — not the loader code. This is black-box testing: you compare source files to what ended up in the table, regardless of how the loader works internally.
Architecture Diagram
Before writing any code, study the system blueprint. The coordinator agent is the brain; each state tester is a specialist that knows how to validate one state's data. The coordinator doesn't know how to parse XML or check dates — it knows how to delegate and aggregate.
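The delegate-and-aggregate idea can be sketched with asyncio. This is a minimal illustration under stated assumptions: run_state_tester is a hypothetical stand-in for a real state tester, the state list is a small subset, and asyncio is one possible concurrency choice rather than a requirement of the capstone.

```python
import asyncio
import random

STATE_CODES = ["NY", "CA", "TX", "NV", "GA"]  # illustrative subset, not all 50


async def run_state_tester(state_code: str) -> dict:
    """Hypothetical stand-in for a state tester subagent."""
    await asyncio.sleep(random.uniform(0.01, 0.05))  # simulate uneven workloads
    return {"state": state_code, "status": "PASS"}


async def coordinate(state_codes: list[str]) -> list[dict]:
    """Delegate one task per state, then aggregate the results."""
    tasks = [run_state_tester(code) for code in state_codes]
    # return_exceptions=True keeps one crashed tester from cancelling the others
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    report = []
    for code, outcome in zip(state_codes, outcomes):
        if isinstance(outcome, Exception):
            report.append({"state": code, "status": "FAIL", "error": str(outcome)})
        else:
            report.append(outcome)
    return report


report = asyncio.run(coordinate(STATE_CODES))
print(report)
```

The coordinator never inspects a source file itself. It only maps states to tasks and folds the returned reports into one list, in the same order as the input states.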
Animation 1: The 50-State Problem
Each US state uses its own file format for UCC filing data. This animation shows the format distribution across all 50 states and how they all converge into one canonical Bronze table. Five colors, five formats, one target schema.
50 states, 5 formats → 1 schema. Your agent validates every transformation.
Animation 2: Parallel Agent Swarm
Watch the coordinator spawn 50 subagents simultaneously. They don't run one-by-one — they all burst outward at once, each validating its assigned state. Results stream back asynchronously: small states finish faster, large states like California take longer. Some turn green (pass), some red (fail), one yellow (warning).
Animation 3: Single State Validation Flow
Zoom into one state tester — New York. It reads the source XML file, queries the Bronze table for NY records, then runs all 12 checks in sequence. Watch each check gate turn green (pass) or red (fail). NY has a known issue: 2 records where the lapse date is before the filing date, so check DT-02 will fail.
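A check like DT-02 comes down to a date comparison per record. The sketch below is illustrative only: the function name, return shape, and sample rows are assumptions, not the capstone's implementation.

```python
from datetime import date


def lapse_before_filing(records: list[dict]) -> list[str]:
    """Return filing numbers whose lapse_date precedes filing_date."""
    bad = []
    for r in records:
        if not r.get("lapse_date"):
            continue  # lapse_date is nullable in the Bronze schema
        filed = date.fromisoformat(r["filing_date"])
        lapses = date.fromisoformat(r["lapse_date"])
        if lapses < filed:
            bad.append(r["filing_number"])
    return bad


# Made-up sample rows for illustration
sample = [
    {"filing_number": "NY-2024-0001", "filing_date": "2024-03-15", "lapse_date": "2020-01-01"},
    {"filing_number": "NY-2024-0002", "filing_date": "2024-04-20", "lapse_date": "2029-04-19"},
    {"filing_number": "NY-2024-0003", "filing_date": "2024-05-01", "lapse_date": None},
]
violations = lapse_before_filing(sample)
print(violations)
```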
Animation 4: Format Parsing Comparison
The same UCC filing — filing number NY-2024-0847, debtor ACME CORPORATION, secured party FIRST NATIONAL BANK — looks completely different in each of the 5 source formats. Click each tab to see how the parser extracts the same canonical fields from wildly different structures.
Target: filing_number filing_date debtor_name secured_party status
Animation 5: The 12 Validation Checks
Click any card to expand its details — what it checks, a PASS example, a FAIL example, and which states trigger failures in the mock data. Cards with a red border are checks that have known failures baked into the test data.
Animation 6: Dashboard Assembly
Watch 50 individual state results fly in and sort themselves into three columns: pass (green), fail (red), and warning (yellow). The counter tallies up the final score.
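The sorting and tallying behind the dashboard can be sketched in a few lines. The function name and the sample results are hypothetical and do not reproduce the real 50-state output.

```python
from collections import Counter


def assemble_dashboard(results: list[dict]) -> dict:
    """Group state codes by status and tally each column."""
    columns = {"PASS": [], "FAIL": [], "WARN": []}
    for r in results:
        columns[r["status"]].append(r["state"])
    tally = Counter(r["status"] for r in results)
    return {"columns": columns, "tally": dict(tally)}


# Illustrative results only
sample_results = [
    {"state": "CA", "status": "PASS"},
    {"state": "NY", "status": "FAIL"},
    {"state": "GA", "status": "FAIL"},
    {"state": "NV", "status": "WARN"},
    {"state": "TX", "status": "PASS"},
]
dashboard = assemble_dashboard(sample_results)
print(dashboard["tally"])
```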
Animation 7: Error Scenario Gallery
The mock data includes deliberate errors. Click each tab to see the problematic file, the check that catches it, and the agent's structured error response.
File Structure
Here is every file you will create. Colors indicate the file's role: agents, tools, mock data, config.
The Bronze Canonical Schema
Every state's data normalizes into this ONE table. Regardless of whether the source was XML from New York or fixed-width from Texas, every record ends up with the same 15 fields. This is the contract your agent validates.
{
  "filing_number": "string: unique within state (e.g., NY-2024-0847)",
  "source_state": "string: 2-letter code (e.g., NY)",
  "filing_type": "string: UCC1 | UCC3_AMENDMENT | UCC3_CONTINUATION | UCC3_TERMINATION",
  "filing_date": "date: YYYY-MM-DD",
  "lapse_date": "date: YYYY-MM-DD, nullable",
  "status": "string: ACTIVE | TERMINATED | LAPSED",
  "debtor_name": "string: uppercase, trimmed",
  "debtor_address": "string",
  "debtor_org_type": "string: CORPORATION | LLC | PARTNERSHIP | INDIVIDUAL | UNKNOWN",
  "secured_party_name": "string: uppercase, trimmed",
  "secured_party_address": "string",
  "collateral_description": "string",
  "source_file": "string: original filename",
  "load_id": "string: batch identifier (e.g., LOAD-2024Q4-SEED)",
  "load_timestamp": "timestamp"
}
CREATE TABLE bronze_filings (
    filing_number          VARCHAR NOT NULL,
    source_state           VARCHAR(2) NOT NULL,
    filing_type            VARCHAR NOT NULL,
    filing_date            DATE NOT NULL,
    lapse_date             DATE,
    status                 VARCHAR NOT NULL,
    debtor_name            VARCHAR NOT NULL,
    debtor_address         VARCHAR,
    debtor_org_type        VARCHAR,
    secured_party_name     VARCHAR NOT NULL,
    secured_party_address  VARCHAR,
    collateral_description VARCHAR,
    source_file            VARCHAR NOT NULL,
    load_id                VARCHAR NOT NULL,
    load_timestamp         TIMESTAMP NOT NULL
);
Step-by-Step Build Guide
What You'll Build: A coordinator agent that spawns 50 parallel state tester subagents to validate Bronze canonical load data across all US states.
Time Estimate: 4–6 hours across 2–3 sessions
Prerequisites: Python 3.10+, Docker Desktop, Anthropic API key
Files You'll Create: 15+ files (see File Structure above)
What & Why: Create the project directory structure and install dependencies. We use anthropic for the Claude API, pydantic for structured validation results, and rich for terminal dashboard formatting.
Create your project folder and install dependencies:
macOS / Linux:
mkdir -p capstone-6-bronze-testing/{tools,mock_data/source_files}
cd capstone-6-bronze-testing
python -m venv venv && source venv/bin/activate
pip install anthropic pydantic rich duckdb fastapi uvicorn httpx
export ANTHROPIC_API_KEY=your-key-here
Windows:
mkdir capstone-6-bronze-testing\tools
mkdir capstone-6-bronze-testing\mock_data\source_files
cd capstone-6-bronze-testing
python -m venv venv && venv\Scripts\activate
pip install anthropic pydantic rich duckdb fastapi uvicorn httpx
set ANTHROPIC_API_KEY=your-key-here
Run: python -c "import anthropic; print('OK')"
Expected output:
OK
If you see OK, your environment is ready. If not, check that you activated the virtual environment and that pip install completed without errors.
- ModuleNotFoundError: No module named 'anthropic' → Run pip install anthropic again, and make sure the venv is activated
- command not found: python → Try python3 instead, or install Python 3.10+
What & Why: The state registry is the single source of truth — it maps every US state to its file format type and expected record count. When the coordinator spawns 50 testers, it reads this registry to know what format parser each tester should use. Without it, the testers would have to guess the format from the file extension, which is error-prone.
Create a new file called config.py:
"""50-state format registry for Bronze canonical validation."""
from dataclasses import dataclass


@dataclass
class StateConfig:
    code: str
    name: str
    format_type: str       # xml | pipe_csv | comma_csv | fixed_width | json
    expected_records: int  # expected count for Q4 2024 seed load


STATES: dict[str, StateConfig] = {
    # XML states (15)
    "NY": StateConfig("NY", "New York", "xml", 847),
    "OH": StateConfig("OH", "Ohio", "xml", 423),
    "PA": StateConfig("PA", "Pennsylvania", "xml", 612),
    "MI": StateConfig("MI", "Michigan", "xml", 389),
    "NJ": StateConfig("NJ", "New Jersey", "xml", 501),
    "VA": StateConfig("VA", "Virginia", "xml", 445),
    "MA": StateConfig("MA", "Massachusetts", "xml", 367),
    "WA": StateConfig("WA", "Washington", "xml", 298),
    "NC": StateConfig("NC", "North Carolina", "xml", 512),
    "MD": StateConfig("MD", "Maryland", "xml", 276),
    "WI": StateConfig("WI", "Wisconsin", "xml", 198),
    "SC": StateConfig("SC", "South Carolina", "xml", 167),
    "MN": StateConfig("MN", "Minnesota", "xml", 234),
    "OR": StateConfig("OR", "Oregon", "xml", 189),
    "CT": StateConfig("CT", "Connecticut", "xml", 156),
    # Pipe-delimited CSV states (15)
    "CA": StateConfig("CA", "California", "pipe_csv", 3421),
    "IL": StateConfig("IL", "Illinois", "pipe_csv", 756),
    "GA": StateConfig("GA", "Georgia", "pipe_csv", 987),
    "CO": StateConfig("CO", "Colorado", "pipe_csv", 345),
    "AZ": StateConfig("AZ", "Arizona", "pipe_csv", 289),
    "MO": StateConfig("MO", "Missouri", "pipe_csv", 312),
    "IN": StateConfig("IN", "Indiana", "pipe_csv", 267),
    "TN": StateConfig("TN", "Tennessee", "pipe_csv", 298),
    "LA": StateConfig("LA", "Louisiana", "pipe_csv", 234),
    "KY": StateConfig("KY", "Kentucky", "pipe_csv", 201),
    "AL": StateConfig("AL", "Alabama", "pipe_csv", 178),
    "MS": StateConfig("MS", "Mississippi", "pipe_csv", 145),
    "AR": StateConfig("AR", "Arkansas", "pipe_csv", 123),
    "OK": StateConfig("OK", "Oklahoma", "pipe_csv", 167),
    "IA": StateConfig("IA", "Iowa", "pipe_csv", 156),
    # Comma CSV with header (15)
    "DE": StateConfig("DE", "Delaware", "comma_csv", 890),
    "NH": StateConfig("NH", "New Hampshire", "comma_csv", 98),
    "VT": StateConfig("VT", "Vermont", "comma_csv", 67),
    "ME": StateConfig("ME", "Maine", "comma_csv", 89),
    "RI": StateConfig("RI", "Rhode Island", "comma_csv", 76),
    "HI": StateConfig("HI", "Hawaii", "comma_csv", 54),
    "AK": StateConfig("AK", "Alaska", "comma_csv", 43),
    "MT": StateConfig("MT", "Montana", "comma_csv", 78),
    "WY": StateConfig("WY", "Wyoming", "comma_csv", 34),
    "SD": StateConfig("SD", "South Dakota", "comma_csv", 45),
    "ND": StateConfig("ND", "North Dakota", "comma_csv", 56),
    "NE": StateConfig("NE", "Nebraska", "comma_csv", 112),
    "ID": StateConfig("ID", "Idaho", "comma_csv", 87),
    "NM": StateConfig("NM", "New Mexico", "comma_csv", 98),
    "WV": StateConfig("WV", "West Virginia", "comma_csv", 67),
    # Fixed-width states (4)
    "TX": StateConfig("TX", "Texas", "fixed_width", 1205),
    "FL": StateConfig("FL", "Florida", "fixed_width", 978),
    "KS": StateConfig("KS", "Kansas", "fixed_width", 156),
    "UT": StateConfig("UT", "Utah", "fixed_width", 134),
    # JSON state(s)
    "NV": StateConfig("NV", "Nevada", "json", 543),
}

# Derived constants
TOTAL_EXPECTED_RECORDS = sum(s.expected_records for s in STATES.values())
FORMAT_GROUPS: dict[str, list[str]] = {}
for s in STATES.values():
    FORMAT_GROUPS.setdefault(s.format_type, []).append(s.code)
Run: python -c "from config import STATES, TOTAL_EXPECTED_RECORDS; print(f'{len(STATES)} states, {TOTAL_EXPECTED_RECORDS} expected records')"
Expected output:
50 states, 18403 expected records
What & Why: We need realistic source files in all 5 formats so the parser and validators have something to test against. We'll generate 9 source files: one per format type (5 representative files: NY/CA/DE/TX/NV), GA with bad-date Bronze records, plus 3 error files (truncated, bad encoding, empty). This step creates a Python script that generates the mock data; run it once and you have all your test files.
This step depends on Step 2 (the state registry). If you skipped Step 2, go back and create config.py first.
Create a new file called generate_mock_data.py:
"""Generate mock source files and Bronze table data for testing."""
import json
import os
import random
from datetime import datetime, timedelta

from config import STATES

random.seed(42)  # Reproducible mock data

OUT = "mock_data"

DEBTOR_NAMES = [
    "ACME CORPORATION", "GLOBEX INDUSTRIES", "INITECH LLC",
    "STARK ENTERPRISES", "WAYNE INDUSTRIES", "UMBRELLA CORP",
    "CYBERDYNE SYSTEMS", "OSCORP TECHNOLOGIES", "LEXCORP",
    "MASSIVE DYNAMIC", "TYRELL CORPORATION", "SOYLENT CORP",
    "WEYLAND-YUTANI", "APERTURE SCIENCE", "BLACK MESA",
]

SECURED_PARTIES = [
    "FIRST NATIONAL BANK", "JPMORGAN CHASE", "WELLS FARGO",
    "BANK OF AMERICA", "CITIBANK NA", "US BANCORP",
    "PNC FINANCIAL", "TRUIST FINANCIAL", "TD BANK",
]

COLLATERAL = [
    "All inventory and accounts receivable",
    "Equipment, machinery, and fixtures",
    "All assets of the debtor",
    "Accounts, chattel paper, and general intangibles",
    "Motor vehicles and titled goods",
]

ORG_TYPES = ["CORPORATION", "LLC", "PARTNERSHIP", "INDIVIDUAL"]


def rand_date(year=2024):
    d = datetime(year, 1, 1) + timedelta(days=random.randint(0, 364))
    return d.strftime("%Y-%m-%d")


def rand_lapse(filing_date_str):
    fd = datetime.strptime(filing_date_str, "%Y-%m-%d")
    return (fd + timedelta(days=5*365)).strftime("%Y-%m-%d")


def make_record(state, idx):
    fn = f"{state}-2024-{idx:04d}"
    fd = rand_date()
    return {
        "filing_number": fn,
        "source_state": state,
        "filing_type": random.choice(["UCC1", "UCC1", "UCC1", "UCC3_AMENDMENT"]),
        "filing_date": fd,
        "lapse_date": rand_lapse(fd),
        "status": "ACTIVE",
        "debtor_name": random.choice(DEBTOR_NAMES),
        "debtor_address": f"{random.randint(1,999)} Main St, Anytown, {state}",
        "debtor_org_type": random.choice(ORG_TYPES),
        "secured_party_name": random.choice(SECURED_PARTIES),
        "secured_party_address": f"{random.randint(1,999)} Bank Ave, Finance City, {state}",
        "collateral_description": random.choice(COLLATERAL),
        "source_file": "",
        "load_id": "LOAD-2024Q4-SEED",
        "load_timestamp": "2024-12-15T08:00:00Z",
    }
def write_xml(state, records, path):
    lines = ['<?xml version="1.0" encoding="UTF-8"?>', '<Filings xmlns="urn:sos:ucc">']
    for r in records:
        lines.append(" <Filing>")
        lines.append(f" <FileNumber>{r['filing_number']}</FileNumber>")
        lines.append(f" <Type>{r['filing_type']}</Type>")
        lines.append(f" <FilingDate>{r['filing_date']}</FilingDate>")
        lines.append(f" <LapseDate>{r['lapse_date']}</LapseDate>")
        lines.append(f" <Status>{r['status']}</Status>")
        lines.append(f" <Debtor><Name>{r['debtor_name']}</Name></Debtor>")
        lines.append(f" <SecuredParty><Name>{r['secured_party_name']}</Name></SecuredParty>")
        lines.append(f" <Collateral>{r['collateral_description']}</Collateral>")
        lines.append(" </Filing>")
    lines.append("</Filings>")
    with open(path, "w") as f:
        f.write("\n".join(lines))


def write_pipe_csv(state, records, path):
    # No header row; status and org type are abbreviated, as some states do.
    lines = []
    for r in records:
        lines.append("|".join([
            r["filing_number"], r["filing_type"], r["filing_date"],
            r["status"][0], r["debtor_name"], r["debtor_address"],
            r["debtor_org_type"][:4], r["secured_party_name"],
            r["secured_party_address"], r["collateral_description"],
        ]))
    with open(path, "w") as f:
        f.write("\n".join(lines))


def write_comma_csv(state, records, path):
    header = "filing_number,type,filing_date,lapse_date,status,debtor_name,debtor_addr,org_type,sp_name,sp_addr,collateral"
    lines = [header]
    for r in records:
        row = ",".join([
            r["filing_number"], r["filing_type"], r["filing_date"],
            r["lapse_date"] or "", r["status"],
            f'"{r["debtor_name"]}"', f'"{r["debtor_address"]}"',
            r["debtor_org_type"], f'"{r["secured_party_name"]}"',
            f'"{r["secured_party_address"]}"', f'"{r["collateral_description"]}"',
        ])
        lines.append(row)
    with open(path, "w") as f:
        f.write("\n".join(lines))


def write_fixed_width(state, records, path):
    # Column layout: 0-15 filing number, 15-35 type, 35-45 date (YYYYMMDD),
    # 45 status letter, 46-86 debtor name, 86-126 secured party name.
    lines = []
    for r in records:
        line = (
            r["filing_number"].ljust(15)
            + r["filing_type"].ljust(20)
            + r["filing_date"].replace("-", "").ljust(10)
            + r["status"][0]
            + r["debtor_name"].ljust(40)
            + r["secured_party_name"].ljust(40)
        )
        lines.append(line)
    with open(path, "w") as f:
        f.write("\n".join(lines))


def write_json_format(state, records, path):
    entries = []
    for r in records:
        entries.append({
            "fileNum": r["filing_number"],
            "filingType": r["filing_type"],
            "dateFiled": r["filing_date"] + "T00:00:00Z",
            "currentStatus": r["status"].lower(),
            "parties": {
                "debtor": {"legalName": r["debtor_name"]},
                "securedParty": {"legalName": r["secured_party_name"]},
            },
            "collateral": {"description": r["collateral_description"]},
        })
    with open(path, "w") as f:
        json.dump(entries, f, indent=2)


# format_type -> (file extension, writer function)
WRITERS = {
    "xml": ("xml", write_xml),
    "pipe_csv": ("csv", write_pipe_csv),
    "comma_csv": ("csv", write_comma_csv),
    "fixed_width": ("dat", write_fixed_width),
    "json": ("json", write_json_format),
}
def generate():
    os.makedirs(f"{OUT}/source_files", exist_ok=True)
    all_bronze = []

    # Pick one representative state per format for source files
    reps = {"xml": "NY", "pipe_csv": "CA", "comma_csv": "DE",
            "fixed_width": "TX", "json": "NV"}

    for state_code, cfg in STATES.items():
        n = min(cfg.expected_records, 50)  # Cap at 50 for mock data
        records = [make_record(state_code, i+1) for i in range(n)]
        ext, writer = WRITERS[cfg.format_type]
        fname = f"{state_code}_2024_Q4.{ext}"

        # Only write source files for representative + error states
        if state_code in reps.values() or state_code == "GA":
            writer(state_code, records, f"{OUT}/source_files/{fname}")

        # Add to Bronze table
        for r in records:
            r["source_file"] = fname
            all_bronze.append(r)

    # === Error cases ===
    # GA: inject bad date format in Bronze (DD/MM/YYYY not converted)
    for r in all_bronze:
        if r["source_state"] == "GA":
            parts = r["filing_date"].split("-")
            r["filing_date"] = f"{parts[2]}/{parts[1]}/{parts[0]}"  # DD/MM/YYYY

    # NY: inject 2 records with lapse before filing
    # Use 2020-01-01: guaranteed before any 2024 filing_date the
    # generator could ever produce. "2024-01-01" risked colliding
    # with a randomly-generated filing_date of the same day.
    ny_recs = [r for r in all_bronze if r["source_state"] == "NY"]
    if len(ny_recs) >= 2:
        ny_recs[0]["lapse_date"] = "2020-01-01"
        ny_recs[1]["lapse_date"] = "2020-01-01"

    # NV: inject 3 duplicates into BOTH Bronze and the source file
    # so CNT-01 stays PASS (counts match) and DUP-01 becomes WARN
    # (the duplicates are a Bronze-level data quality issue, not a count mismatch).
    nv_recs = [r for r in all_bronze if r["source_state"] == "NV"]
    if len(nv_recs) >= 3:
        for i in range(3):
            dup = dict(nv_recs[i])
            all_bronze.append(dup)
        # Re-write NV source file with the duplicated records included
        nv_full = [r for r in all_bronze if r["source_state"] == "NV"]
        write_json_format("NV", nv_full, f"{OUT}/source_files/NV_2024_Q4.json")

    # Write Bronze table
    with open(f"{OUT}/bronze_table.json", "w") as f:
        json.dump(all_bronze, f, indent=2)

    # Write error source files
    with open(f"{OUT}/source_files/TX_BAD_truncated.dat", "w") as f:
        f.write("TX-2024-0001 UCC1 20240315A" + "X"*40 + "\n")
        f.write("TX-2024-0002 UCC1 20240420A" + "X"*40 + "\n")
        f.write("TX-2024-0003 UCC1 05")  # Truncated
    with open(f"{OUT}/source_files/FL_BAD_encoding.json", "wb") as f:
        f.write(b'\xff\xfe' + '{"fileNum": "FL-BAD"}'.encode('utf-16-le'))
    with open(f"{OUT}/source_files/EMPTY_STATE.csv", "w") as f:
        f.write("")

    # Write load manifest
    manifest = {
        "load_id": "LOAD-2024Q4-SEED",
        "load_date": "2024-12-15",
        "states": {code: {"expected_records": cfg.expected_records,
                          "format": cfg.format_type}
                   for code, cfg in STATES.items()},
    }
    with open(f"{OUT}/load_manifest.json", "w") as f:
        json.dump(manifest, f, indent=2)

    # Write format registry
    registry = {code: cfg.format_type for code, cfg in STATES.items()}
    with open(f"{OUT}/state_format_registry.json", "w") as f:
        json.dump(registry, f, indent=2)

    print(f"Generated {len(all_bronze)} Bronze records across {len(STATES)} states")
    print(f"Source files: {len(os.listdir(f'{OUT}/source_files'))}")


if __name__ == "__main__":
    generate()
Run: python generate_mock_data.py
Expected output:
Generated 2475 Bronze records across 50 states
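Source files: 9
Check that mock_data/bronze_table.json exists and mock_data/source_files/ contains files.
What & Why: An __init__.py file marks a directory as a regular Python package. Python 3.3+ can import a directory without one as a namespace package, but an explicit __init__.py makes tools/ a regular package, which is what packaging and test tools generally expect, and it keeps from tools.file_parser import parse resolving to this project's tools/ directory.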
Create an empty file called tools/__init__.py:
# tools/__init__.py
# This file makes the tools/ directory a Python package.
Verify: python -c "import tools; print('tools package OK')"
If you see tools package OK, the package is importable. If you see ModuleNotFoundError, make sure the file is at tools/__init__.py (not tool/__init__.py).
What & Why: Each state tester needs to read its state's source file and extract records into a common format. The parser looks up the file format in the registry and applies the correct parsing strategy. This is the "input side" of each validation: what the source says should be in Bronze.
The interesting engineering challenge here is that each format has its own gotchas: XML has namespaces, pipe-CSV can have pipes inside collateral text, fixed-width requires exact character positions, and JSON keys vary by state. The parser must handle all of these correctly or the validation results will be wrong — not because Bronze is wrong, but because the parser misread the source.
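The XML namespace gotcha is easy to reproduce with the standard library. In this minimal sketch (the inline document is made up for illustration), an unqualified search finds nothing in a namespaced file. It also shows why lookups should be compared against None: a leaf element such as Name has no children and is falsy when truth-tested, so chaining two find() calls with or can silently discard a match.

```python
import xml.etree.ElementTree as ET

NS = {"ucc": "urn:sos:ucc"}
DOC = """<Filings xmlns="urn:sos:ucc">
  <Filing>
    <FileNumber>NY-2024-0847</FileNumber>
    <Debtor><Name>ACME CORPORATION</Name></Debtor>
  </Filing>
</Filings>"""

root = ET.fromstring(DOC)

# Unqualified tag names do not match elements in a default namespace.
unqualified = root.findall(".//Filing")
qualified = root.findall(".//ucc:Filing", NS)

# find() returns None when nothing matches; test for that explicitly,
# because an element with no child elements evaluates as falsy.
name_el = qualified[0].find("ucc:Debtor/ucc:Name", NS)
debtor = name_el.text.strip() if name_el is not None and name_el.text else ""

print(len(unqualified), len(qualified), debtor)
```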
Create a new file called tools/file_parser.py:
"""Auto-detecting 5-format parser for UCC source files."""
import csv
import json
from xml.etree import ElementTree as ET
from pathlib import Path

NS = {"ucc": "urn:sos:ucc"}


def parse(state_code: str, file_path: str, format_type: str) -> list[dict]:
    """Parse a source file into canonical record dicts.

    Args:
        state_code: 2-letter state code (e.g., "NY")
        file_path: Path to the source file
        format_type: One of xml, pipe_csv, comma_csv, fixed_width, json

    Returns:
        List of dicts with canonical field names

    Raises:
        FileNotFoundError: If the source file does not exist
        ValueError: If file is truncated, wrong encoding, or unparseable
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Source file not found: {file_path}")
    if path.stat().st_size == 0:
        return []  # Empty file is valid (0 records)

    # Encoding check
    with open(file_path, "rb") as f:
        head = f.read(4)
    if head[:2] in (b'\xff\xfe', b'\xfe\xff'):
        raise ValueError(f"Expected UTF-8, detected UTF-16 (BOM: {head[:2].hex().upper()})")

    parsers = {
        "xml": _parse_xml,
        "pipe_csv": _parse_pipe_csv,
        "comma_csv": _parse_comma_csv,
        "fixed_width": _parse_fixed_width,
        "json": _parse_json,
    }
    parser = parsers.get(format_type)
    if not parser:
        raise ValueError(f"Unknown format: {format_type}")
    return parser(state_code, file_path)
def _parse_xml(state: str, path: str) -> list[dict]:
    tree = ET.parse(path)
    root = tree.getroot()
    records = []
    # Try namespaced tags first, then fall back to files without a namespace.
    for filing in root.findall(".//ucc:Filing", NS) or root.findall(".//Filing"):
        fn = (filing.findtext("ucc:FileNumber", "", NS)
              or filing.findtext("FileNumber", ""))
        records.append({
            "filing_number": fn.strip(),
            "filing_type": (filing.findtext("ucc:Type", "", NS)
                            or filing.findtext("Type", "")).strip(),
            "filing_date": (filing.findtext("ucc:FilingDate", "", NS)
                            or filing.findtext("FilingDate", "")).strip(),
            "lapse_date": (filing.findtext("ucc:LapseDate", "", NS)
                           or filing.findtext("LapseDate", "")).strip() or None,
            "status": (filing.findtext("ucc:Status", "", NS)
                       or filing.findtext("Status", "")).strip().upper(),
            "debtor_name": _extract_name(filing, "Debtor"),
            "secured_party_name": _extract_name(filing, "SecuredParty"),
        })
    return records


def _extract_name(filing, tag):
    # Compare against None explicitly: an Element with no children is falsy,
    # so `find(...) or find(...)` would discard a matching <Name> leaf and
    # return an empty name for every namespaced record.
    el = filing.find(f"ucc:{tag}/ucc:Name", NS)
    if el is None:
        el = filing.find(f"{tag}/Name")
    return el.text.strip().upper() if el is not None and el.text else ""
def _parse_pipe_csv(state: str, path: str) -> list[dict]:
    status_map = {"A": "ACTIVE", "T": "TERMINATED", "L": "LAPSED"}
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            parts = line.split("|")
            if len(parts) < 8:
                raise ValueError(f"Line {line_num}: expected 8+ pipe fields, got {len(parts)}")
            records.append({
                "filing_number": parts[0].strip(),
                "filing_type": parts[1].strip(),
                "filing_date": parts[2].strip(),
                "status": status_map.get(parts[3].strip(), parts[3].strip().upper()),
                "debtor_name": parts[4].strip().upper(),
                "secured_party_name": parts[7].strip().upper(),
            })
    return records


def _parse_comma_csv(state: str, path: str) -> list[dict]:
    records = []
    with open(path, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            records.append({
                "filing_number": row.get("filing_number", "").strip(),
                "filing_type": row.get("type", "").strip(),
                "filing_date": row.get("filing_date", "").strip(),
                "lapse_date": row.get("lapse_date", "").strip() or None,
                "status": row.get("status", "").strip().upper(),
                "debtor_name": row.get("debtor_name", "").strip().strip('"').upper(),
                "secured_party_name": row.get("sp_name", "").strip().strip('"').upper(),
            })
    return records


def _parse_fixed_width(state: str, path: str) -> list[dict]:
    status_map = {"A": "ACTIVE", "T": "TERMINATED", "L": "LAPSED"}
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            if len(line.rstrip()) < 30:
                raise ValueError(f"Line {line_num}: record truncated ({len(line.rstrip())} chars)")
            raw_date = line[35:45].strip()
            # Convert YYYYMMDD to YYYY-MM-DD
            if len(raw_date) == 8 and raw_date.isdigit():
                filing_date = f"{raw_date[:4]}-{raw_date[4:6]}-{raw_date[6:]}"
            else:
                filing_date = raw_date
            records.append({
                "filing_number": line[0:15].strip(),
                "filing_type": line[15:35].strip(),
                "filing_date": filing_date,
                "status": status_map.get(line[45:46], line[45:46].upper()),
                "debtor_name": line[46:86].strip().upper(),
                "secured_party_name": line[86:126].strip().upper(),
            })
    return records
def _parse_json(state: str, path: str) -> list[dict]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    records = []
    for entry in data:
        date_raw = entry.get("dateFiled", "")
        # Keep only the date part of an ISO timestamp such as 2024-03-15T00:00:00Z
        filing_date = date_raw[:10] if "T" in date_raw else date_raw
        records.append({
            "filing_number": entry.get("fileNum", ""),
            "filing_type": entry.get("filingType", "").replace("-", ""),
            "filing_date": filing_date,
            "status": entry.get("currentStatus", "").upper(),
            "debtor_name": entry.get("parties", {}).get("debtor", {}).get("legalName", "").upper(),
            "secured_party_name": entry.get("parties", {}).get("securedParty", {}).get("legalName", "").upper(),
        })
    return records
Run: python -c "from tools.file_parser import parse; r = parse('NV', 'mock_data/source_files/NV_2024_Q4.json', 'json'); print(f'{len(r)} records, first: {r[0][\"filing_number\"]}')"
Expected output:
53 records, first: NV-2024-0001
To test another format, try parse('NY', 'mock_data/source_files/NY_2024_Q4.xml', 'xml').
What & Why: This is the "output side" of each validation: what actually ended up in the Bronze table. For the local mock version, it reads from bronze_table.json. In production (Tier 2/3), this same interface would query BigQuery or DuckDB. The key insight: the state tester agent calls get_state_records("NY") regardless of whether the backend is JSON, DuckDB, or BigQuery.
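One way to keep the state tester independent of the storage backend is a small interface that each backend implements. The sketch below uses typing.Protocol. BronzeBackend, InMemoryBronze, and count_for_state are hypothetical names for illustration and are not part of the capstone files.

```python
from typing import Protocol


class BronzeBackend(Protocol):
    """Anything that can return Bronze rows for one state and load."""
    def get_state_records(self, state_code: str, load_id: str) -> list[dict]: ...


class InMemoryBronze:
    """Stand-in backend holding rows in a list, similar to the JSON mock."""
    def __init__(self, rows: list[dict]):
        self._rows = rows

    def get_state_records(self, state_code: str, load_id: str) -> list[dict]:
        return [r for r in self._rows
                if r["source_state"] == state_code and r["load_id"] == load_id]


def count_for_state(backend: BronzeBackend, state_code: str, load_id: str) -> int:
    """Caller code depends only on the interface, not on the storage engine."""
    return len(backend.get_state_records(state_code, load_id))


# Made-up rows for illustration
rows = [
    {"source_state": "NY", "load_id": "LOAD-2024Q4-SEED"},
    {"source_state": "NY", "load_id": "LOAD-2024Q4-SEED"},
    {"source_state": "CA", "load_id": "LOAD-2024Q4-SEED"},
]
ny_count = count_for_state(InMemoryBronze(rows), "NY", "LOAD-2024Q4-SEED")
print(ny_count)
```

A DuckDB or BigQuery backend would supply its own get_state_records with the same signature, and the caller would not change.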
Create a new file called tools/bronze_query.py:
"""Mock Bronze table query tool — reads from JSON file."""
import json
from functools import lru_cache

BRONZE_PATH = "mock_data/bronze_table.json"


@lru_cache(maxsize=1)
def _load_bronze() -> list[dict]:
    """Load Bronze table once and cache in memory."""
    with open(BRONZE_PATH, "r") as f:
        return json.load(f)


def get_state_records(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> list[dict]:
    """Get all Bronze records for a specific state and load.

    This is the function the state tester calls. In production,
    this would query BigQuery instead of a JSON file.
    """
    return [r for r in _load_bronze()
            if r["source_state"] == state_code and r["load_id"] == load_id]


def get_record_count(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> int:
    """Count Bronze records for a state."""
    return len(get_state_records(state_code, load_id))


def get_all_states(load_id: str = "LOAD-2024Q4-SEED") -> list[str]:
    """Get all states present in Bronze for a given load."""
    return list(set(r["source_state"] for r in _load_bronze() if r["load_id"] == load_id))


def get_total_count(load_id: str = "LOAD-2024Q4-SEED") -> int:
    """Total records across all states."""
    return len([r for r in _load_bronze() if r["load_id"] == load_id])
Run: python -c "from tools.bronze_query import get_record_count, get_total_count; print(f'NY: {get_record_count(\"NY\")} records, Total: {get_total_count()}')"
Expected output:
NY: 50 records, Total: 2475
These counts match what generate_mock_data.py created (capped at 50 per state for mock data).
What & Why: This is the heart of the system: 12 functions that each test one specific aspect of data quality. Each check takes source records and Bronze records as input and returns a structured result (PASS/FAIL/WARN with details). The checks are independent, so they can run in any order and a failure in one doesn't block the others.
Let's walk through the checks in logical groups. First, the "can we even read the data?" check (SRC-01). Then structural checks (counts, duplicates). Then content checks (dates, statuses, names). Finally, the spot check that does a field-by-field comparison on random records.
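Because the checks are independent, a small runner can execute each one in isolation and turn an unexpected exception into a FAIL result instead of aborting the run. This is a sketch under stated assumptions: run_checks and the two toy checks are hypothetical, and plain dicts stand in for a structured result model.

```python
from typing import Callable


def run_checks(checks: dict[str, Callable[[], dict]]) -> list[dict]:
    """Run every check; a crash in one check never blocks the others."""
    results = []
    for check_id, check in checks.items():
        try:
            result = check()
        except Exception as exc:  # record the crash as a failed check
            result = {"status": "FAIL", "details": f"check crashed: {exc}"}
        results.append({"check_id": check_id, **result})
    return results


def count_check() -> dict:
    """Toy check that always passes."""
    return {"status": "PASS", "details": "50 source = 50 bronze"}


def broken_check() -> dict:
    """Toy check that raises, simulating a missing field."""
    raise KeyError("filing_date")


results = run_checks({"CNT-01": count_check, "DT-02": broken_check})
for r in results:
    print(r["check_id"], r["status"], r["details"])
```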
Create a new file called tools/validation_checks.py:
"""12 Bronze canonical validation checks."""
import re, random
from datetime import datetime
from pydantic import BaseModel


class CheckResult(BaseModel):
    check_id: str
    check_name: str
    status: str  # PASS | FAIL | WARN
    details: str
    records_checked: int = 0
    records_failed: int = 0


REQUIRED_FIELDS = [
    "filing_number", "source_state", "filing_type",
    "filing_date", "status", "debtor_name",
]
VALID_TYPES = {"UCC1", "UCC3_AMENDMENT", "UCC3_CONTINUATION", "UCC3_TERMINATION"}
VALID_STATUSES = {"ACTIVE", "TERMINATED", "LAPSED"}
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def src_01_parseable(source_records: list, parse_error: str | None) -> CheckResult:
    """SRC-01: Source file parseable without errors."""
    if parse_error:
        return CheckResult(check_id="SRC-01", check_name="Source file parseable",
                           status="FAIL", details=parse_error)
    return CheckResult(check_id="SRC-01", check_name="Source file parseable",
                       status="PASS", details=f"Parsed {len(source_records)} records",
                       records_checked=len(source_records))


def cnt_01_record_count(source: list, bronze: list) -> CheckResult:
    """CNT-01: Source record count matches Bronze record count."""
    diff = len(source) - len(bronze)
    if diff == 0:
        return CheckResult(check_id="CNT-01", check_name="Record count match",
                           status="PASS", details=f"{len(source)} source = {len(bronze)} bronze",
                           records_checked=len(source))
    return CheckResult(check_id="CNT-01", check_name="Record count match",
                       status="FAIL", details=f"source={len(source)}, bronze={len(bronze)}, diff={diff}",
                       records_checked=len(source), records_failed=abs(diff))


def fn_01_filing_integrity(source: list, bronze: list) -> CheckResult:
    """FN-01: Every source filing number exists in Bronze, no extras."""
    src_fns = {r["filing_number"] for r in source}
    brz_fns = {r["filing_number"] for r in bronze}
    missing = src_fns - brz_fns
    extra = brz_fns - src_fns
    if not missing and not extra:
        return CheckResult(check_id="FN-01", check_name="Filing number integrity",
                           status="PASS", details=f"All {len(src_fns)} filing numbers match",
                           records_checked=len(src_fns))
    parts = []
    if missing:
        parts.append(f"{len(missing)} missing from Bronze")
    if extra:
        parts.append(f"{len(extra)} extra in Bronze")
    return CheckResult(check_id="FN-01", check_name="Filing number integrity",
                       status="FAIL", details="; ".join(parts),
                       records_checked=len(src_fns), records_failed=len(missing)+len(extra))


def dup_01_no_duplicates(bronze: list) -> CheckResult:
    """DUP-01: No duplicate filing numbers in Bronze for this state."""
    fns = [r["filing_number"] for r in bronze]
    seen, dups = set(), set()
    for fn in fns:
        if fn in seen:
            dups.add(fn)
        seen.add(fn)
    if not dups:
        return CheckResult(check_id="DUP-01", check_name="No duplicates in Bronze",
                           status="PASS", details=f"{len(fns)} records, 0 duplicates",
                           records_checked=len(fns))
    return CheckResult(check_id="DUP-01", check_name="No duplicates in Bronze",
                       status="WARN", details=f"{len(dups)} duplicate filing numbers: {', '.join(list(dups)[:5])}",
                       records_checked=len(fns), records_failed=len(dups))


def ft_01_filing_type(bronze: list) -> CheckResult:
    """FT-01: All filing types are one of 4 canonical types."""
    bad = [r for r in bronze if r.get("filing_type") not in VALID_TYPES]
    if not bad:
        return CheckResult(check_id="FT-01", check_name="Filing type normalized",
                           status="PASS", details=f"All {len(bronze)} records have valid types",
                           records_checked=len(bronze))
    bad_types = set(r.get("filing_type") for r in bad)
    return CheckResult(check_id="FT-01", check_name="Filing type normalized",
                       status="FAIL", details=f"{len(bad)} records with invalid types: {bad_types}",
                       records_checked=len(bronze), records_failed=len(bad))


def dt_01_date_format(bronze: list) -> CheckResult:
    """DT-01: All dates in YYYY-MM-DD format."""
    bad = []
    for r in bronze:
        for field in ["filing_date", "lapse_date"]:
            val = r.get(field)
            if val and not DATE_RE.match(str(val)):
                bad.append(r["filing_number"])
                break  # count each record once
    if not bad:
        return CheckResult(check_id="DT-01", check_name="Date format normalized",
                           status="PASS", details="All dates in YYYY-MM-DD",
                           records_checked=len(bronze))
    return CheckResult(check_id="DT-01", check_name="Date format normalized",
                       status="FAIL", details=f"{len(bad)} records have non-YYYY-MM-DD dates",
                       records_checked=len(bronze), records_failed=len(bad))


def dt_02_date_values(bronze: list) -> CheckResult:
    """DT-02: No future filing dates, no lapse before filing."""
    bad = []
    today = datetime.now().date()
    for r in bronze:
        fd_str = str(r.get("filing_date", ""))
        ld_str = r.get("lapse_date")
        if not DATE_RE.match(fd_str):
            continue  # DT-01 catches format issues
        try:
            fd = datetime.strptime(fd_str, "%Y-%m-%d").date()
        except ValueError:
            # Matches the pattern but is not a real calendar date (e.g. 2024-02-30)
            bad.append(f"{r['filing_number']}: impossible filing date {fd_str}")
            continue
        if fd > today:
            bad.append(f"{r['filing_number']}: future filing date {fd_str}")
        if ld_str and DATE_RE.match(str(ld_str)):
            try:
                ld = datetime.strptime(str(ld_str), "%Y-%m-%d").date()
            except ValueError:
                bad.append(f"{r['filing_number']}: impossible lapse date {ld_str}")
                continue
            if ld < fd:
                bad.append(f"{r['filing_number']}: lapse {ld_str} before filing {fd_str}")
    if not bad:
        return CheckResult(check_id="DT-02", check_name="Date values valid",
                           status="PASS", details="No future dates or lapse-before-filing",
                           records_checked=len(bronze))
    return CheckResult(check_id="DT-02", check_name="Date values valid",
                       status="FAIL", details=f"{len(bad)} issues: {'; '.join(bad[:3])}",
                       records_checked=len(bronze), records_failed=len(bad))


def st_01_status(bronze: list) -> CheckResult:
    """ST-01: All status values are ACTIVE, TERMINATED, or LAPSED."""
    bad = [r for r in bronze if r.get("status") not in VALID_STATUSES]
    if not bad:
        return CheckResult(check_id="ST-01", check_name="Status normalized",
                           status="PASS", details=f"All {len(bronze)} records have valid status",
                           records_checked=len(bronze))
    return CheckResult(check_id="ST-01", check_name="Status normalized",
                       status="FAIL", details=f"{len(bad)} records with invalid status",
                       records_checked=len(bronze), records_failed=len(bad))


def nm_01_name_normalization(bronze: list) -> CheckResult:
    """NM-01: Names are uppercase, trimmed, no double spaces."""
    bad = []
    for r in bronze:
        for field in ["debtor_name", "secured_party_name"]:
            val = r.get(field) or ""  # treat a missing or null name as empty
            # "  " is two consecutive spaces; a single space between words is fine
            if val != val.upper() or val != val.strip() or "  " in val:
                bad.append(r["filing_number"])
                break  # count each record once
    if not bad:
        return CheckResult(check_id="NM-01", check_name="Name normalization",
                           status="PASS", details="All names uppercase, trimmed",
                           records_checked=len(bronze))
    return CheckResult(check_id="NM-01", check_name="Name normalization",
                       status="FAIL", details=f"{len(bad)} records with unnormalized names",
                       records_checked=len(bronze), records_failed=len(bad))


def nl_01_required_not_null(bronze: list) -> CheckResult:
    """NL-01: 6 required fields are never null/empty."""
    bad = []
    for r in bronze:
        for field in REQUIRED_FIELDS:
            if not r.get(field):
                bad.append(f"{r.get('filing_number', '?')}: {field} is null")
                break  # count each record once
    if not bad:
        return CheckResult(check_id="NL-01", check_name="Required fields not null",
                           status="PASS", details=f"All {len(REQUIRED_FIELDS)} required fields populated",
                           records_checked=len(bronze))
    return CheckResult(check_id="NL-01", check_name="Required fields not null",
                       status="FAIL", details=f"{len(bad)} records with null required fields",
                       records_checked=len(bronze), records_failed=len(bad))


def lm_01_load_metadata(bronze: list) -> CheckResult:
    """LM-01: source_file, load_id, load_timestamp populated."""
    bad = []
    for r in bronze:
        if not r.get("source_file") or not r.get("load_id") or not r.get("load_timestamp"):
            bad.append(r.get("filing_number", "?"))
    if not bad:
        return CheckResult(check_id="LM-01", check_name="Load metadata present",
                           status="PASS", details="All records have source_file, load_id, load_timestamp",
                           records_checked=len(bronze))
    return CheckResult(check_id="LM-01", check_name="Load metadata present",
                       status="FAIL", details=f"{len(bad)} records missing load metadata",
                       records_checked=len(bronze), records_failed=len(bad))


def sp_01_spot_check(source: list, bronze: list, n: int = 3) -> CheckResult:
    """SP-01: Random records match field-by-field source to Bronze."""
    if not source or not bronze:
        return CheckResult(check_id="SP-01", check_name="Spot check (3 records)",
                           status="PASS", details="No records to spot-check")
    brz_map = {r["filing_number"]: r for r in bronze}
    sample = random.sample(source, min(n, len(source)))
    mismatches = []
    for src in sample:
        brz = brz_map.get(src["filing_number"])
        if not brz:
            mismatches.append(f"{src['filing_number']}: not in Bronze")
            continue
        if src.get("debtor_name", "").upper() != brz.get("debtor_name", "").upper():
            mismatches.append(f"{src['filing_number']}: debtor name mismatch")
    if not mismatches:
        return CheckResult(check_id="SP-01", check_name="Spot check (3 records)",
                           status="PASS", details=f"{len(sample)} random records match",
                           records_checked=len(sample))
    return CheckResult(check_id="SP-01", check_name="Spot check (3 records)",
                       status="FAIL", details="; ".join(mismatches),
                       records_checked=len(sample), records_failed=len(mismatches))


# Registry of all 12 checks for easy iteration
ALL_CHECKS = [
    "SRC-01", "CNT-01", "FN-01", "DUP-01", "FT-01", "DT-01",
    "DT-02", "ST-01", "NM-01", "NL-01", "LM-01", "SP-01",
]

Run: python -c "from tools.validation_checks import ALL_CHECKS; print(f'{len(ALL_CHECKS)} checks: {ALL_CHECKS}')"
Expected output:
12 checks: ['SRC-01', 'CNT-01', 'FN-01', 'DUP-01', 'FT-01', 'DT-01', 'DT-02', 'ST-01', 'NM-01', 'NL-01', 'LM-01', 'SP-01']
Each check returns a CheckResult with status, details, and counts.
What & Why: This is the subagent: one instance runs per state. It runs the deterministic validation pipeline: parse the source file, query Bronze, execute all 12 checks, and emit a structured report. The "agent" label here is the parallel-coordinator pattern from M14B/M15B, not a Claude-loop agent. Everything inside test_state() is pure Python, so the harness is fast (~20s for 50 states) and free to run on every commit. A Claude-driven failure-analysis step is offered as a stretch goal in the "Going Further" section.
Create a new file called state_tester.py:
"""State tester subagent: validates one state's Bronze load."""
import sys, json, time
from pathlib import Path
from config import STATES
from tools.file_parser import parse
from tools.bronze_query import get_state_records
from tools import validation_checks as vc


def test_state(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> dict:
    """Run all 12 validation checks for a single state.

    Returns a dict with state, status, results list, and timing.
    """
    start = time.time()
    cfg = STATES.get(state_code)
    if not cfg:
        # Same keys as the normal return value, so callers can rely on record_count
        return {"state": state_code, "status": "FAIL",
                "results": [{"check_id": "CFG", "status": "FAIL",
                             "details": f"Unknown state: {state_code}"}],
                "record_count": 0, "duration": 0}

    # Step 1: Parse source file
    source_dir = Path("mock_data/source_files")
    ext_map = {"xml": "xml", "pipe_csv": "csv", "comma_csv": "csv",
               "fixed_width": "dat", "json": "json"}
    ext = ext_map[cfg.format_type]
    source_path = source_dir / f"{state_code}_2024_Q4.{ext}"
    source_records = []
    parse_error = None
    if source_path.exists():
        try:
            source_records = parse(state_code, str(source_path), cfg.format_type)
        except Exception as e:
            parse_error = str(e)
    else:
        # No source file for this state in mock data; use Bronze count as proxy
        pass

    # Step 2: Query Bronze
    bronze_records = get_state_records(state_code, load_id)

    # Step 3: Run all 12 checks
    results = []
    results.append(vc.src_01_parseable(source_records, parse_error))
    # If no source file, skip source-vs-bronze checks
    if source_path.exists() and not parse_error:
        results.append(vc.cnt_01_record_count(source_records, bronze_records))
        results.append(vc.fn_01_filing_integrity(source_records, bronze_records))
    else:
        results.append(vc.CheckResult(check_id="CNT-01", check_name="Record count match",
                                      status="PASS", details="No source file; Bronze-only validation"))
        results.append(vc.CheckResult(check_id="FN-01", check_name="Filing number integrity",
                                      status="PASS", details="No source file; Bronze-only validation"))
    results.append(vc.dup_01_no_duplicates(bronze_records))
    results.append(vc.ft_01_filing_type(bronze_records))
    results.append(vc.dt_01_date_format(bronze_records))
    results.append(vc.dt_02_date_values(bronze_records))
    results.append(vc.st_01_status(bronze_records))
    results.append(vc.nm_01_name_normalization(bronze_records))
    results.append(vc.nl_01_required_not_null(bronze_records))
    results.append(vc.lm_01_load_metadata(bronze_records))
    if source_path.exists() and not parse_error:
        results.append(vc.sp_01_spot_check(source_records, bronze_records))
    else:
        results.append(vc.CheckResult(check_id="SP-01", check_name="Spot check",
                                      status="PASS", details="No source file for spot check"))

    # Determine overall status: any FAIL wins, then any WARN, else PASS
    statuses = [r.status for r in results]
    if "FAIL" in statuses:
        overall = "FAIL"
    elif "WARN" in statuses:
        overall = "WARN"
    else:
        overall = "PASS"
    duration = time.time() - start
    return {
        "state": state_code,
        "status": overall,
        "results": [r.model_dump() for r in results],
        "record_count": len(bronze_records),
        "duration": round(duration, 3),
    }


if __name__ == "__main__":
    state = sys.argv[1] if len(sys.argv) > 1 else "NY"
    load = sys.argv[2] if len(sys.argv) > 2 else "LOAD-2024Q4-SEED"
    result = test_state(state, load)
    print(json.dumps(result, indent=2))

Run: python state_tester.py NY
Expected output: A JSON object showing NY's results — 11 PASS and 1 FAIL on DT-02 (the two records with lapse before filing date).
Run: python state_tester.py GA
Expected output: GA fails DT-01 (date format not normalized).
What & Why: The coordinator is the top-level agent. It reads the load manifest, spawns one state tester per state (all 50 in parallel using Python's concurrent.futures), collects all results, and generates the final dashboard. This is the core multi-agent orchestration pattern: one boss, many workers, results aggregated into a single view.
The coordinator uses ThreadPoolExecutor for parallelism. Why threads instead of async? Each state tester is CPU-light (JSON parsing) and I/O-light (file reads), so a thread pool is sufficient and keeps the code simple. In production with real BigQuery calls the work becomes I/O-bound; threads still work there, and you could move to asyncio if you need more I/O concurrency.
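The fan-out pattern can be tried in isolation before wiring it to the real tester. This is a minimal sketch: fake_test_state and fan_out are hypothetical stand-ins (the real worker is test_state), and the sleep only simulates a little file I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_test_state(state_code: str) -> dict:
    """Hypothetical stand-in for test_state(): sleeps briefly, then reports."""
    time.sleep(0.05)  # simulate a small amount of file I/O
    if state_code == "ZZ":
        raise ValueError(f"Unknown state: {state_code}")
    return {"state": state_code, "status": "PASS"}


def fan_out(states: list[str], max_workers: int = 50) -> dict[str, dict]:
    """Submit one task per state and collect results as they complete."""
    results: dict[str, dict] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fake_test_state, s): s for s in states}
        for future in as_completed(futures):
            state = futures[future]
            try:
                results[state] = future.result()
            except Exception as exc:
                # A crashed worker is recorded as a FAIL rather than lost.
                results[state] = {"state": state, "status": "FAIL",
                                  "details": str(exc)}
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    out = fan_out(["NY", "CA", "TX", "ZZ"])
    print({s: r["status"] for s, r in sorted(out.items())})
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Mapping each future back to its state code is what lets the coordinator attribute a failure to the right state even though results arrive in completion order, not submission order.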
Create a new file called coordinator.py:
"""Coordinator agent: orchestrates 50 parallel state testers."""
import sys, json, time, argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import STATES, TOTAL_EXPECTED_RECORDS
from state_tester import test_state


def run_coordinator(load_id: str, mode: str = "full_seed",
                    baseline: str | None = None, max_workers: int = 50) -> dict:
    """Run Bronze validation across all 50 states in parallel.

    Args:
        load_id: The load batch ID to validate
        mode: full_seed | incremental | change_detection
        baseline: Previous load_id for incremental comparison
        max_workers: Thread pool size (50 = all 50 states concurrent;
            drop to 10 if your runtime has memory pressure or
            quota-limited downstream APIs)

    Returns:
        Aggregated results dict with summary and per-state details
    """
    start = time.time()
    print(f"\n{'='*60}")
    print("BRONZE CANONICAL LOAD VALIDATION")
    print(f"Mode: {mode.upper()} | Load ID: {load_id}")
    if baseline:
        print(f"Baseline: {baseline}")
    print(f"{'='*60}\n")

    # Determine which states to test
    states_to_test = list(STATES.keys())
    print(f"Spawning {len(states_to_test)} state testers...\n")

    # Run all state testers in parallel
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(test_state, state, load_id): state
            for state in states_to_test
        }
        for future in as_completed(futures):
            state = futures[future]
            try:
                result = future.result()
                results[state] = result
                icon = {"PASS": "+", "FAIL": "x", "WARN": "!"}[result["status"]]
                print(f" [{icon}] {state}: {result['status']} "
                      f"({result['record_count']} records, {result['duration']}s)")
            except Exception as e:
                # A crashed tester is reported as a failed state, not dropped
                results[state] = {
                    "state": state, "status": "FAIL",
                    "results": [{"check_id": "ERR", "status": "FAIL", "details": str(e)}],
                    "record_count": 0, "duration": 0,
                }
                print(f" [x] {state}: ERROR - {e}")
    duration = time.time() - start

    # Aggregate results
    pass_count = sum(1 for r in results.values() if r["status"] == "PASS")
    fail_count = sum(1 for r in results.values() if r["status"] == "FAIL")
    warn_count = sum(1 for r in results.values() if r["status"] == "WARN")
    total_records = sum(r["record_count"] for r in results.values())
    failed_states = [s for s, r in results.items() if r["status"] == "FAIL"]
    warn_states = [s for s, r in results.items() if r["status"] == "WARN"]

    # Print dashboard
    print(f"\n{'='*60}")
    print(f"SUMMARY: {pass_count} PASS | {fail_count} FAIL | {warn_count} WARN")
    print(f"Records: {total_records} bronze")
    print(f"Duration: {duration:.1f} seconds ({len(states_to_test)} states in parallel)")
    if failed_states:
        print("\nFAILURES:")
        for state in failed_states:
            failed_checks = [r for r in results[state]["results"]
                             if r["status"] == "FAIL"]
            for fc in failed_checks:
                print(f" x {state} - {fc['check_id']}: {fc['details']}")
    if warn_states:
        print("\nWARNINGS:")
        for state in warn_states:
            warn_checks = [r for r in results[state]["results"]
                           if r["status"] == "WARN"]
            for wc in warn_checks:
                print(f" ! {state} - {wc['check_id']}: {wc['details']}")
    print("\nPER-STATE:")
    print(f" {'State':<6} {'Records':>8} {'Status':<6} Failed Checks")
    for state in sorted(results.keys()):
        r = results[state]
        failed = [c["check_id"] for c in r["results"] if c["status"] == "FAIL"]
        failed_str = ", ".join(failed) if failed else ""
        print(f" {state:<6} {r['record_count']:>8} {r['status']:<6} {failed_str}")
    print(f"{'='*60}\n")
    return {
        "load_id": load_id,
        "mode": mode,
        "overall_status": "FAIL" if fail_count > 0 else ("WARN" if warn_count > 0 else "PASS"),
        "summary": {
            "pass_count": pass_count, "fail_count": fail_count, "warn_count": warn_count,
            "total_states": len(states_to_test), "total_records": total_records,
            "duration_seconds": round(duration, 1),
            "failed_states": failed_states, "warn_states": warn_states,
        },
        "per_state": results,
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Bronze Canonical Load Validator")
    parser.add_argument("--load-id", default="LOAD-2024Q4-SEED")
    parser.add_argument("--mode", default="full_seed",
                        choices=["full_seed", "incremental", "change_detection"])
    parser.add_argument("--baseline", default=None)
    parser.add_argument("--workers", type=int, default=50)
    args = parser.parse_args()
    result = run_coordinator(args.load_id, args.mode, args.baseline, args.workers)
    # Save report
    with open(f"report_{args.load_id}.json", "w") as f:
        json.dump(result, f, indent=2)
    print(f"Report saved to report_{args.load_id}.json")

Run: python coordinator.py --load-id LOAD-2024Q4-SEED --mode full_seed
Expected output: A dashboard showing results for all 50 states. NY should FAIL (DT-02), GA should FAIL (DT-01), NV should WARN (DUP-01), and the remaining 47 states should PASS.
- All states PASS (no failures) → Re-run python generate_mock_data.py to regenerate mock data with injected errors
- FileNotFoundError: mock_data/bronze_table.json → You must run Step 3 (generate_mock_data.py) before the coordinator
- Results are slow → Reduce --workers or check that mock data files are small
Three Load Scenarios
The coordinator supports three validation modes. Each mirrors a real production scenario: is this the first-ever load, an incremental update, or a format change?
Scenario A: Full Seed Load
When it runs: Initial load or full reload into an empty Bronze table.
Runs the 12 standard per-state checks plus 4 seed-specific checks (SEED-01 through SEED-04): verifying no pre-existing data, all 50 states present, total record reconciliation, and manifest accuracy.
Command: python coordinator.py --load-id LOAD-2024Q4-SEED --mode full_seed
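The coordinator listing in this capstone runs only the 12 per-state checks, so the seed-specific checks are left to you. As a rough sketch of two of them (all states present, and count reconciliation against the manifest), assuming a manifest shaped as a dict of state code to expected record count; the function names and the manifest shape are assumptions, not part of the project files:

```python
def missing_states(expected_states: set[str], bronze: list[dict]) -> set[str]:
    """States named in the manifest that have no Bronze rows at all."""
    present = {r["source_state"] for r in bronze}
    return expected_states - present


def count_mismatches(manifest: dict[str, int],
                     bronze: list[dict]) -> dict[str, tuple[int, int]]:
    """Per-state (expected, actual) pairs where manifest and Bronze disagree."""
    actual: dict[str, int] = {}
    for r in bronze:
        actual[r["source_state"]] = actual.get(r["source_state"], 0) + 1
    return {
        state: (expected, actual.get(state, 0))
        for state, expected in manifest.items()
        if actual.get(state, 0) != expected
    }


if __name__ == "__main__":
    # Hypothetical manifest: state code -> record count the loader claims it wrote
    manifest = {"NY": 2, "CA": 1, "TX": 1}
    bronze = [
        {"source_state": "NY", "filing_number": "NY-1"},
        {"source_state": "NY", "filing_number": "NY-2"},
        {"source_state": "CA", "filing_number": "CA-1"},
    ]
    print(missing_states(set(manifest), bronze))   # {'TX'}
    print(count_mismatches(manifest, bronze))      # {'TX': (1, 0)}
```

Unlike the per-state checks, these run once over the whole load, so they belong in the coordinator after all subagents have returned.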
Scenario B: Incremental Load
When it runs: Quarterly update — adding new data on top of existing Bronze records.
Runs 12 standard checks plus 8 incremental checks (INC-01 through INC-08): verifying new records added correctly, no duplicates from re-load, existing records untouched, UCC-3 amendments/continuations/terminations link properly, and unchanged states have no phantom inserts.
Command: python coordinator.py --load-id LOAD-2025Q1-INC --mode incremental --baseline LOAD-2024Q4-SEED
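One way to approach the "existing records untouched" and "new records added correctly" checks is to diff two snapshots of Bronze keyed by filing number. The sketch below is an assumption about how such a check could work, not the INC-01 through INC-08 implementation; diff_loads is a hypothetical helper.

```python
def diff_loads(baseline: list[dict], current: list[dict]) -> dict[str, list[str]]:
    """Compare two snapshots keyed by filing_number.

    Returns the filing numbers that were added, removed, or changed
    between the baseline snapshot and the current one.
    """
    base = {r["filing_number"]: r for r in baseline}
    curr = {r["filing_number"]: r for r in current}
    return {
        "added": sorted(curr.keys() - base.keys()),
        "removed": sorted(base.keys() - curr.keys()),
        "changed": sorted(fn for fn in base.keys() & curr.keys()
                          if base[fn] != curr[fn]),
    }


if __name__ == "__main__":
    before = [
        {"filing_number": "NY-1", "status": "ACTIVE"},
        {"filing_number": "NY-2", "status": "ACTIVE"},
    ]
    after = [
        {"filing_number": "NY-1", "status": "ACTIVE"},
        {"filing_number": "NY-2", "status": "TERMINATED"},  # existing row was modified
        {"filing_number": "NY-3", "status": "ACTIVE"},      # genuinely new row
    ]
    print(diff_loads(before, after))
    # {'added': ['NY-3'], 'removed': [], 'changed': ['NY-2']}
```

For an incremental load, a non-empty "removed" or "changed" list would mean baseline records were touched; for a state with no new source file, a non-empty "added" list would be a phantom insert.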
Scenario C: Change Detection
When it runs: Pre-validation step when a state's source file looks different from previous quarters.
Runs 10 change-specific checks (CHG-01 through CHG-10): format type, column count, column names, new/removed columns, data type consistency, delimiter, record volume, date format, and encoding. This catches schema drift before bad data enters Bronze.
Command: python coordinator.py --load-id LOAD-2025Q1-INC --mode change_detection
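Several of the change-detection checks (column count, column names, new and removed columns) reduce to comparing a file's header row with the one seen last quarter. A minimal sketch for delimited files follows; header_drift and the column names in the example are hypothetical.

```python
import csv
import io


def header_drift(previous_header: list[str], sample_text: str,
                 delimiter: str = "|") -> dict:
    """Compare a new file's header row with last quarter's column list."""
    reader = csv.reader(io.StringIO(sample_text), delimiter=delimiter)
    current_header = [c.strip() for c in next(reader)]
    return {
        "column_count_changed": len(current_header) != len(previous_header),
        "new_columns": [c for c in current_header if c not in previous_header],
        "removed_columns": [c for c in previous_header if c not in current_header],
    }


if __name__ == "__main__":
    last_quarter = ["FILE_NO", "FILE_DT", "DEBTOR"]  # hypothetical column names
    this_quarter = "FILE_NO|FILE_DT|DEBTOR|EMAIL\n123|2025-01-02|ACME|a@b.c\n"
    print(header_drift(last_quarter, this_quarter))
    # {'column_count_changed': True, 'new_columns': ['EMAIL'], 'removed_columns': []}
```

Running this before the loader touches the file is what lets schema drift be caught without writing anything to Bronze.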
The coordinator agent and state tester agents are identical across all three scenarios. Only the set of checks changes. This is the power of the tool abstraction pattern — the agent's reasoning loop stays the same, the tools determine what it validates.
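The "same loop, different checks" idea can be expressed as a registry that maps each mode to a list of check functions. This is a simplified sketch with toy checks that return a bare status string; CHECKS_BY_MODE, run_mode, and the two checks are hypothetical names chosen for illustration.

```python
from typing import Callable

Check = Callable[[list[dict]], str]  # each check returns "PASS" or "FAIL"


def has_rows(bronze: list[dict]) -> str:
    """Toy check: the state has at least one Bronze record."""
    return "PASS" if bronze else "FAIL"


def unique_filing_numbers(bronze: list[dict]) -> str:
    """Toy check: no filing number appears twice."""
    fns = [r["filing_number"] for r in bronze]
    return "PASS" if len(fns) == len(set(fns)) else "FAIL"


# Hypothetical registry: each mode maps to the checks it should run.
CHECKS_BY_MODE: dict[str, list[Check]] = {
    "full_seed": [has_rows, unique_filing_numbers],
    "incremental": [unique_filing_numbers],
    "change_detection": [],
}


def run_mode(mode: str, bronze: list[dict]) -> dict[str, str]:
    """Same loop for every mode; only the selected checks differ."""
    return {check.__name__: check(bronze) for check in CHECKS_BY_MODE[mode]}


if __name__ == "__main__":
    rows = [{"filing_number": "NV-1"}, {"filing_number": "NV-1"}]
    print(run_mode("full_seed", rows))
    # {'has_rows': 'PASS', 'unique_filing_numbers': 'FAIL'}
```

Adding a new scenario then means adding one dictionary entry rather than editing the coordinator or the tester.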
You may notice the core pipeline (Steps 2–8) uses pure Python — no Claude API calls. This is intentional. The agent pattern is the architecture (coordinator + parallel subagents + tool abstraction), not the LLM calls. By building the validation logic in deterministic Python first, you can test and debug it without API costs or latency. The "Going Further" section shows how to add Claude-powered error analysis on top of this foundation — using the LLM where it adds value (interpreting failures, diagnosing root causes) rather than for deterministic checks where Python is faster and cheaper.
Tier 1: Local Production Deployment
No cloud account needed. Uses Docker + DuckDB + local filesystem. This is a real production setup — containerized, with a database, API server, file watcher, and dashboard — just running on your laptop.
What & Why: DuckDB (an embedded analytical database, like SQLite but optimized for column-oriented analytics queries; it runs in-process with zero setup, just pip install and go) replaces the JSON file with a real SQL database. This is important because the production version uses BigQuery, and DuckDB lets you write and test very similar SQL queries locally. The agent calls get_state_records("NY") either way; only the backend changes.
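One possible way to keep callers unaware of the backend is to dispatch on an environment variable. The sketch below uses two placeholder backends so it runs without any database; the DEPLOYMENT_TIER variable appears in this capstone's Compose file, but the "mock" value, the BACKENDS table, and the placeholder functions are assumptions made for illustration.

```python
import os
from typing import Callable

Record = dict[str, str]


def json_backend(state_code: str, load_id: str) -> list[Record]:
    """Placeholder for the JSON-file implementation."""
    return [{"source_state": state_code, "load_id": load_id, "backend": "json"}]


def duckdb_backend(state_code: str, load_id: str) -> list[Record]:
    """Placeholder for the DuckDB implementation."""
    return [{"source_state": state_code, "load_id": load_id, "backend": "duckdb"}]


# Hypothetical mapping from tier name to query function.
BACKENDS: dict[str, Callable[[str, str], list[Record]]] = {
    "mock": json_backend,
    "local": duckdb_backend,
}


def get_state_records(state_code: str,
                      load_id: str = "LOAD-2024Q4-SEED") -> list[Record]:
    """Dispatch to the backend named by DEPLOYMENT_TIER (default: mock)."""
    tier = os.environ.get("DEPLOYMENT_TIER", "mock")
    return BACKENDS[tier](state_code, load_id)


if __name__ == "__main__":
    os.environ["DEPLOYMENT_TIER"] = "local"
    print(get_state_records("NY")[0]["backend"])  # duckdb
```

The state tester keeps calling get_state_records with the same arguments in every tier, which is the property the drop-in replacement relies on.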
Install DuckDB: pip install duckdb
Create a new file called tools/bronze_query_local.py:
"""DuckDB Bronze table: local production replacement for BigQuery."""
import duckdb, json, os

DB_PATH = os.environ.get("DB_PATH", "data/bronze.duckdb")


def init_db():
    """Create Bronze table and load mock data into DuckDB."""
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    con = duckdb.connect(DB_PATH)
    con.execute("""
        CREATE TABLE IF NOT EXISTS bronze_filings (
            filing_number VARCHAR, source_state VARCHAR(2),
            filing_type VARCHAR, filing_date VARCHAR,
            lapse_date VARCHAR, status VARCHAR,
            debtor_name VARCHAR, debtor_address VARCHAR,
            debtor_org_type VARCHAR, secured_party_name VARCHAR,
            secured_party_address VARCHAR, collateral_description VARCHAR,
            source_file VARCHAR, load_id VARCHAR, load_timestamp VARCHAR
        )
    """)
    con.execute("DELETE FROM bronze_filings")  # Clean reload
    with open("mock_data/bronze_table.json") as f:
        records = json.load(f)
    for r in records:
        con.execute(
            "INSERT INTO bronze_filings VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            [r.get(k, "") for k in [
                "filing_number", "source_state", "filing_type", "filing_date",
                "lapse_date", "status", "debtor_name", "debtor_address",
                "debtor_org_type", "secured_party_name", "secured_party_address",
                "collateral_description", "source_file", "load_id", "load_timestamp",
            ]]
        )
    con.close()
    print(f"Loaded {len(records)} records into DuckDB at {DB_PATH}")
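

def get_state_records(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> list[dict]:
    """Same interface as JSON version; drop-in replacement."""
    con = duckdb.connect(DB_PATH, read_only=True)
    cur = con.execute(
        "SELECT * FROM bronze_filings WHERE source_state = ? AND load_id = ?",
        [state_code, load_id]
    )
    # Build dicts from the cursor directly; fetchdf() would require pandas,
    # which is not listed in requirements.txt
    columns = [col[0] for col in cur.description]
    result = [dict(zip(columns, row)) for row in cur.fetchall()]
    con.close()
    return result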


def get_record_count(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> int:
    con = duckdb.connect(DB_PATH, read_only=True)
    count = con.execute(
        "SELECT COUNT(*) FROM bronze_filings WHERE source_state = ? AND load_id = ?",
        [state_code, load_id]
    ).fetchone()[0]
    con.close()
    return count


if __name__ == "__main__":
    init_db()

Run: python -m tools.bronze_query_local
Expected output:
Loaded 2475 records into DuckDB at data/bronze.duckdb
Verify: python -c "from tools.bronze_query_local import get_record_count; print(get_record_count('NY'))"
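What & Why: Docker Compose runs the local stack in containers: the test agent API (FastAPI), a file watcher that auto-triggers validation when new source files appear, and an HTML dashboard that shows the latest saved validation report. The docker-compose.yml in this step defines the test-agent and dashboard services; a watcher service is not part of that listing. This mirrors the production architecture (Cloud Run + Pub/Sub + Grafana) but runs entirely on your machine.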
Create Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-m", "uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]

Create requirements.txt:
anthropic>=0.39.0
pydantic>=2.0
rich>=13.0
duckdb>=1.0
fastapi>=0.110
uvicorn>=0.27
httpx>=0.27

Create docker-compose.yml:
version: '3.8'
services:
  test-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - DEPLOYMENT_TIER=local
      - DB_PATH=/data/bronze.duckdb
    volumes:
      - ./data:/data
      - ./mock_data:/app/mock_data
      - ./reports:/app/reports
  dashboard:
    build: .
    command: python -m uvicorn dashboard_server:app --host 0.0.0.0 --port 8080
    ports:
      - "8080:8080"
    environment:
      - DB_PATH=/data/bronze.duckdb
    volumes:
      - ./data:/data
      - ./reports:/app/reports  # dashboard_server.py reads reports/report_*.json

Create server.py:
"""FastAPI server for Bronze validation agent."""
import json, os
from fastapi import FastAPI
from coordinator import run_coordinator

app = FastAPI(title="Bronze Validation Agent")


@app.get("/health")
def health():
    return {"status": "healthy"}


@app.post("/validate")
def validate(load_id: str = "LOAD-2024Q4-SEED", mode: str = "full_seed"):
    """Run Bronze validation, save the report, and return results."""
    result = run_coordinator(load_id, mode)
    # Persist the report so /results/{load_id} and the dashboard can find it
    os.makedirs("reports", exist_ok=True)
    with open(f"reports/report_{load_id}.json", "w") as f:
        json.dump(result, f, indent=2)
    return result


@app.get("/results/{load_id}")
def get_results(load_id: str):
    """Retrieve saved report for a previous run."""
    try:
        with open(f"reports/report_{load_id}.json") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"error": f"No report found for {load_id}"}

Create dashboard_server.py:
"""Dashboard API: serves the latest saved validation report as HTML."""
import os, json, glob
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI(title="Bronze Validation Dashboard")


@app.get("/health")
def health():
    return {"status": "healthy"}


@app.get("/", response_class=HTMLResponse)
def dashboard():
    """Serve a simple HTML dashboard of the latest results."""
    # Reports are sorted by filename, newest load ID last alphabetically comes first
    reports = sorted(glob.glob("reports/report_*.json"), reverse=True)
    if not reports:
        return "<h1>No validation reports found</h1><p>Run a validation first.</p>"
    with open(reports[0]) as f:
        data = json.load(f)
    summary = data.get("summary", {})
    html = f"""<html><body style="font-family:monospace;background:#0A1628;color:#E8ECF1;padding:2rem;">
<h1>Bronze Validation Dashboard</h1>
<p>Load: {data.get('load_id')} | Mode: {data.get('mode')}</p>
<p style="font-size:1.5rem;">
<span style="color:#10B981;">{summary.get('pass_count',0)} PASS</span> |
<span style="color:#F43F5E;">{summary.get('fail_count',0)} FAIL</span> |
<span style="color:#F59E0B;">{summary.get('warn_count',0)} WARN</span>
</p>
<p>{summary.get('total_records',0)} records | {summary.get('duration_seconds',0)}s</p>
</body></html>"""
    return html

Run: docker compose up -d
Verify: curl http://localhost:8000/health (should return {"status":"healthy"})
Without Docker, the API can also be started directly with uvicorn server:app --port 8000.
Test Cases
Run these 5 scenarios to verify your system handles all cases correctly.
| # | Scenario | Command | Expected | Type |
|---|---|---|---|---|
| 1 | Clean state (CA) | python state_tester.py CA | All 12 checks PASS | Happy |
| 2 | Full coordinator run | python coordinator.py | 47 PASS, 2 FAIL, 1 WARN | Happy |
| 3 | Known failure (NY) | python state_tester.py NY | DT-02 FAIL (lapse before filing) | Happy |
| 4 | Duplicate detection (NV) | python state_tester.py NV | DUP-01 WARN (3 duplicates) | Edge |
| 5 | Unknown state code | python state_tester.py ZZ | CFG FAIL (unknown state) | Error |
Run the complete pipeline end-to-end:
python coordinator.py --load-id LOAD-2024Q4-SEED --mode full_seed
You should see a full dashboard with 47 PASS, 2 FAIL (NY on DT-02, GA on DT-01), and 1 WARN (NV on DUP-01). A JSON report is saved automatically.
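Since the harness is meant to run on every commit, the saved report can also drive a pass/fail gate in CI. A minimal sketch, assuming the report layout returned by run_coordinator (an overall_status key at the top level); the script name ci_gate.py and the fail_on_warn option are hypothetical.

```python
import json
import sys


def exit_code_for(report: dict, fail_on_warn: bool = False) -> int:
    """Map a coordinator report to a process exit code (0 = ok, 1 = block)."""
    status = report.get("overall_status", "FAIL")  # a missing status counts as failure
    if status == "FAIL":
        return 1
    if status == "WARN" and fail_on_warn:
        return 1
    return 0


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python ci_gate.py report_LOAD-2024Q4-SEED.json
    with open(sys.argv[1]) as f:
        sys.exit(exit_code_for(json.load(f)))
```

With the mock data described above (2 FAIL states), this gate would exit with code 1, which is the signal most CI systems use to mark a job as failed.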
Congratulations! You've built a parallel state testing agent that validates 50 states simultaneously, catches data quality issues, and produces a structured dashboard — all in under 20 seconds.
Going Further (Optional)
- MCP Server for Conversational Testing — Build an MCP server so you can ask Claude: "Hey, how did Georgia's load go?" and get an answer from the test results database.
- Fix Recommendation Agent — Add a second agent that analyzes FAIL results and suggests specific transformation rule updates (e.g., "GA's date parser should handle DD/MM/YYYY input").
- Trend Analysis — Store results from multiple quarters and flag anomalies (e.g., "Ohio's record count dropped 40% from Q3 to Q4 — is this real or a data feed issue?").
- Upgrade to Tier 2 (GCP) — Swap DuckDB for BigQuery, local files for GCS, file watcher for Pub/Sub. The coordinator and state tester code stays identical.
- Add Claude-Powered Error Analysis — When a check fails, send the failed records to Claude and ask it to diagnose the root cause and suggest a fix.