Building AI Agents with Claude
Capstone Project 6
Capstone 6 of 6 · 4–6 hours · Domain C: UCC Data Engineering

Capstone 6 — Parallel State Testing Agent

Build a coordinator agent that spawns 50 parallel subagents to validate the Bronze Canonical Load across all US states — checking record counts, date normalization, duplicate detection, and schema compliance in one automated sweep.

Project Brief

The 50-State Problem

Before standardized building codes, every US city had its own rules for wiring, plumbing, and structural loads. A contractor who built safely in New York might install something dangerous in Miami — not because they were careless, but because each jurisdiction defined "correct" differently. Inspectors had to know every local code by heart, and a single missed rule could mean a condemned building.

The pain is the same with UCC filing data. Every quarter, 50 state Secretary of State offices publish bulk filing data, but New York sends XML, California sends pipe-delimited CSV, Texas sends fixed-width text, Nevada sends JSON, and so on. There is no federal standard. Your Canonical Loader (a data pipeline tool that reads each state's file format, applies state-specific transformation rules, and loads records into a single Bronze table with a universal schema) reads each state's file and normalizes it into one table. But how do you know every transformation was correct?

This capstone is the "building inspector" for your data pipeline. You will build an agent that validates all 50 states in parallel — one subagent per state, each running 12 checks, all at the same time. Instead of manually spot-checking a few states and hoping the rest are fine, your agent systematically tests every single one and produces a dashboard showing exactly what passed and what broke.

What You'll Build
  • Coordinator Agent — reads the load manifest, spawns 50 parallel state tester subagents, collects results, and generates a dashboard
  • State Tester Subagent — one per state; reads the source file, queries the Bronze table, runs 12 validation checks, returns a structured pass/fail report
  • 5-Format Parser — auto-detects and parses XML, pipe-CSV, comma-CSV, fixed-width, and JSON source files
  • 12 Validation Checks — record counts, filing number integrity, duplicates, date normalization, status normalization, name normalization, null checks, and spot checks
  • 3 Load Scenarios — full seed, incremental, and change detection — each with scenario-specific checks
  • Tier 1 Local Deployment — Docker Compose with DuckDB, file watcher, and HTML dashboard
Why It Matters

In production data pipelines, loading data is only half the job. Validating the load is what prevents bad data from reaching downstream consumers. At scale, a single undetected date format issue in Georgia's data (DD/MM/YYYY instead of YYYY-MM-DD) can cause thousands of lien risk calculations to be wrong. Manual validation of 50 states takes a data engineer 2–3 days. This agent does it in under 20 seconds, catches errors humans would miss, and runs automatically after every load.

Prerequisites

Required Before Starting
  • M05 — Tool Use: How agents call tools (the state tester is a tool-calling agent)
  • M06 — Multi-Tool Orchestration: Agents calling multiple tools in sequence
  • M12 — ReAct Agents: The think→act→observe loop used by each state tester
  • M13 — Planning & Decomposition: How the coordinator breaks a big task into 50 parallel subtasks
  • M14 — Multi-Agent Systems: Coordinator + subagent pattern
  • M15B — Build Complete Agent: Hands-on experience building a multi-agent system
  • M16–M17 — Guardrails: Input validation and error handling patterns
  • Python 3.10+ and Docker Desktop installed
Common Misconceptions

"Testing data pipelines is just SELECT COUNT(*)" — Count matching is check #2 of 12. Counts can match perfectly while dates are wrong, names are unnormalized, duplicates exist, and required fields are null. You need structural, content, and referential integrity checks.

"50 agents means 50x the API cost" — The base capstone runs zero API calls per state (all 12 checks are deterministic Python), so 50 states costs $0. The optional Claude-driven failure-analysis stretch goal would add ~2–3 Haiku calls per failed state (~$0.003 each); even at 100% failure rate that's under $0.50 for the entire 50-state run. The parallelism saves time (20 seconds vs 2 days manual) at near-zero cost.

"I should test the Canonical Loader itself" — No. Your agent tests the OUTPUT of the loader — the Bronze table — not the loader code. This is black-box testing: you compare source files to what ended up in the table, regardless of how the loader works internally.

Architecture Diagram

Before writing any code, study the system blueprint. The coordinator agent is the brain; each state tester is a specialist that knows how to validate one state's data. The coordinator doesn't know how to parse XML or check dates — it knows how to delegate and aggregate.

System Architecture
User: "Run Bronze validation for 2024 Q4 load"
  |
  v
COORDINATOR AGENT
  1. Read load manifest
  2. Spawn 50 state testers IN PARALLEL
  3. Collect 50 results as they stream in
  4. Aggregate into dashboard
  |
  +---> [NY tester] ---> 12 checks ---> result
  +---> [CA tester] ---> 12 checks ---> result
  +---> [TX tester] ---> 12 checks ---> result
  +---> ... (all 50 concurrent) ...
  +---> [WY tester] ---> 12 checks ---> result
Now you see the shape of the system: one coordinator, 50 parallel workers, 12 checks each. Before building it, let's see WHY this matters through the animations below — the 50-state problem, the agent swarm, and what happens inside each tester.

Animation 1: The 50-State Problem

Each US state uses its own file format for UCC filing data. This animation shows the format distribution across all 50 states and how they all converge into one canonical Bronze table. Five colors, five formats, one target schema.

50 Formats → 1 Schema
XML (15)
Pipe CSV (15)
Comma CSV (15)
Fixed-width (4)
JSON (1+)

50 formats → 1 schema. Your agent validates every transformation.

Animation 2: Parallel Agent Swarm

Watch the coordinator spawn 50 subagents simultaneously. They don't run one-by-one — they all burst outward at once, each validating its assigned state. Results stream back asynchronously: small states finish faster, large states like California take longer. Some turn green (pass), some red (fail), one yellow (warning).

Coordinator → 50 Parallel State Testers
COORDINATOR
47 PASS  |  2 FAIL  |  1 WARN

Animation 3: Single State Validation Flow

Zoom into one state tester — New York. It reads the source XML file, queries the Bronze table for NY records, then runs all 12 checks in sequence. Watch each check gate turn green (pass) or red (fail). NY has a known issue: 2 records where the lapse date is before the filing date, so check DT-02 will fail.

NY State Tester — 12 Validation Checks
Result: 11 PASS | 1 FAIL (DT-02)
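
A check like DT-02 could look roughly like the sketch below. The function name `check_dt02` and the shape of the returned dict are assumptions made for illustration; the two failing filings are the NY records described in this capstone's mock data, and the other two rows are made-up passing cases.

```python
from datetime import date


def check_dt02(records: list[dict]) -> dict:
    """Flag records whose lapse_date is earlier than filing_date."""
    bad = []
    for r in records:
        lapse = r.get("lapse_date")
        if lapse is None:  # lapse_date is nullable in the Bronze schema
            continue
        if date.fromisoformat(lapse) < date.fromisoformat(r["filing_date"]):
            bad.append(r["filing_number"])
    return {"check": "DT-02", "status": "FAIL" if bad else "PASS", "filings": bad}


sample = [
    {"filing_number": "NY-2024-0312", "filing_date": "2024-06-01", "lapse_date": "2024-05-15"},
    {"filing_number": "NY-2024-0588", "filing_date": "2024-09-10", "lapse_date": "2024-08-22"},
    {"filing_number": "NY-2024-0847", "filing_date": "2024-03-15", "lapse_date": "2029-03-14"},
    {"filing_number": "NY-2024-0900", "filing_date": "2024-04-01", "lapse_date": None},
]
result = check_dt02(sample)
print(result["status"], result["filings"])
```
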

Animation 4: Format Parsing Comparison

The same UCC filing — filing number NY-2024-0847, debtor ACME CORPORATION, secured party FIRST NATIONAL BANK — looks completely different in each of the 5 source formats. Click each tab to see how the parser extracts the same canonical fields from wildly different structures.

One Filing, Five Formats

Target: filing_number filing_date debtor_name secured_party status

<Filing xmlns="urn:sos:ucc">
  <FileNumber>NY-2024-0847</FileNumber>
  <Type>UCC1</Type>
  <FilingDate>2024-03-15</FilingDate>
  <Status>Active</Status>
  <Debtor>
    <Name>Acme Corporation</Name>
    <Address>123 Broadway, New York, NY 10001</Address>
    <OrgType>Corporation</OrgType>
  </Debtor>
  <SecuredParty>
    <Name>First National Bank</Name>
  </SecuredParty>
  <Collateral>All inventory and accounts receivable</Collateral>
</Filing>

# Pipe-delimited: CA format (no header row)
NY-2024-0847|UCC1|03/15/2024|A|ACME CORPORATION|123 Broadway, New York, NY 10001|CORP|FIRST NATIONAL BANK|555 Wall St, New York, NY 10005|All inventory and accounts receivable
# Note: Date is MM/DD/YYYY (not ISO), Status is single-char code,
# Name is already uppercase. Pipes in collateral text would break naive parsing.

# Comma CSV with header row: DE/NH/VT format
filing_number,type,filing_date,status,debtor_name,debtor_addr,org_type,sp_name,sp_addr,collateral
NY-2024-0847,UCC1,2024-03-15,ACTIVE,"Acme Corporation","123 Broadway, New York, NY 10001",Corporation,First National Bank,"555 Wall St, New York, NY 10005","All inventory and accounts receivable"
# Note: Commas in addresses require quoting. Mixed case names need normalization.

# Fixed-width: TX/FL format (ruler shows column positions)
|---15---|--5-|---10----|1|----30---------...---|---20---...|
NY-2024-0847   UCC1 20240315  AACME CORPORATION              FIRST NATIONAL BANK
# Note: No delimiters. Fields are at exact character positions.
# Date is YYYYMMDD (no separators). Padded with spaces.

{
  "fileNum": "NY-2024-0847",
  "filingType": "UCC-1",
  "dateFiled": "2024-03-15T00:00:00Z",
  "currentStatus": "active",
  "parties": {
    "debtor": {
      "legalName": "Acme Corporation",
      "address": { "line1": "123 Broadway", "city": "New York", "state": "NY" },
      "entityType": "corp"
    },
    "securedParty": { "legalName": "First National Bank" }
  },
  "collateral": { "description": "All inventory and accounts receivable" }
}
// Note: Nested objects, different key names (fileNum vs filing_number),
// ISO 8601 timestamp, lowercase status. Key names vary by state.
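
To make the normalization concrete, here is a small sketch that maps the date and status variants from the five samples onto the canonical form. The helper names are hypothetical, and the status codes `T` and `L` are assumptions; only `A` appears in the samples above.

```python
from datetime import datetime

# Date layouts seen in the five samples; the list is illustrative, not exhaustive.
DATE_FORMATS = ["%Y-%m-%d", "%m/%d/%Y", "%Y%m%d", "%Y-%m-%dT%H:%M:%SZ"]
# Single-character status codes. "T" and "L" are assumed for illustration.
STATUS_CODES = {"A": "ACTIVE", "T": "TERMINATED", "L": "LAPSED"}


def normalize_date(raw: str) -> str:
    """Try each known layout and return the date as YYYY-MM-DD."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    raise ValueError(f"Unrecognized date format: {raw!r}")


def normalize_status(raw: str) -> str:
    """Uppercase the status and expand single-character codes."""
    value = raw.strip().upper()
    return STATUS_CODES.get(value, value)


for raw in ["2024-03-15", "03/15/2024", "20240315", "2024-03-15T00:00:00Z"]:
    print(raw, "->", normalize_date(raw))
```

Trying formats in sequence cannot tell MM/DD/YYYY from DD/MM/YYYY when the day is 12 or lower, which is one reason to take the expected layout from a per-state registry rather than guessing.
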

Animation 5: The 12 Validation Checks

Click any card to expand its details — what it checks, a PASS example, a FAIL example, and which states trigger failures in the mock data. Cards with a red border are checks that have known failures baked into the test data.

Interactive Check Reference

Animation 6: Dashboard Assembly

Watch 50 individual state results fly in and sort themselves into three columns: pass (green), fail (red), and warning (yellow). The counter tallies up the final score.

50 Results → One Dashboard
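
The sorting step can be sketched as a roll-up from per-check results to one status per state. The precedence rule used here (any FAIL wins, then any WARN, otherwise PASS) is an assumption, as are the function names and the sample results.

```python
def state_status(checks: list[dict]) -> str:
    """Roll check results up to one status (assumed rule: FAIL > WARN > PASS)."""
    statuses = {c["status"] for c in checks}
    if "FAIL" in statuses:
        return "FAIL"
    if "WARN" in statuses:
        return "WARN"
    return "PASS"


def build_dashboard(results: dict[str, list[dict]]) -> dict[str, list[str]]:
    """Sort states into the three dashboard columns."""
    columns = {"PASS": [], "FAIL": [], "WARN": []}
    for state, checks in sorted(results.items()):
        columns[state_status(checks)].append(state)
    return columns


results = {
    "NY": [{"check": "CNT-01", "status": "PASS"}, {"check": "DT-02", "status": "FAIL"}],
    "NV": [{"check": "CNT-01", "status": "PASS"}, {"check": "DUP-01", "status": "WARN"}],
    "GA": [{"check": "DT-01", "status": "FAIL"}],
    "CA": [{"check": "CNT-01", "status": "PASS"}],
}
dashboard = build_dashboard(results)
summary = " | ".join(f"{len(states)} {name}" for name, states in dashboard.items())
print(summary)
```
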

Animation 7: Error Scenario Gallery

The mock data includes deliberate errors. Click each tab to see the problematic file, the check that catches it, and the agent's structured error response.

Error Detection Showcase
TX_BAD_truncated.dat
TX-2024-0001 UCC1 20240315A...
TX-2024-0002 UCC1 20240420A...
TX-2024-0003 UCC1 20EOF
                    ^ File ends mid-record
Agent Response
Check: SRC-01 FAIL
State: TX
Message: "File truncated — last record incomplete. Expected 80 chars, got 23. Records parsed: 2 of 3 expected."

FL_BAD_encoding.json
FF FE 7B 00 22 00 66 00 69 00
^ BOM indicates UTF-16LE
Expected: UTF-8 (no BOM)
Agent Response
Check: SRC-01 FAIL
State: FL
Message: "Expected UTF-8 encoding, detected UTF-16LE (BOM: FF FE). File must be re-encoded before Bronze loading."

NV_2024_Q4.json (Bronze table)
{"filing_number": "NV-2024-0101", ...}
{"filing_number": "NV-2024-0101", ...}
{"filing_number": "NV-2024-0102", ...}
^ NV-2024-0101 appears twice
Agent Response
Check: DUP-01 WARN
State: NV
Message: "3 duplicate filing numbers found in Bronze: NV-2024-0101, NV-2024-0203, NV-2024-0417. Investigate source or loader."

GA_2024_Q4.csv (Bronze table)
GA-2024-0001|UCC1|15/03/2024|A|...
GA-2024-0002|UCC1|20/04/2024|A|...
                  ^ DD/MM/YYYY not normalized to YYYY-MM-DD canonical format
Agent Response
Check: DT-01 FAIL
State: GA
Message: "47 records have dates in DD/MM/YYYY format instead of canonical YYYY-MM-DD. Loader transformation rule needs fix."

NY records in Bronze
NY-2024-0312: filing=2024-06-01 lapse=2024-05-15
              ^ lapse BEFORE filing!
NY-2024-0588: filing=2024-09-10 lapse=2024-08-22
              ^ lapse BEFORE filing!
Agent Response
Check: DT-02 FAIL
State: NY
Message: "2 records have lapse_date before filing_date. Filings: NY-2024-0312, NY-2024-0588. Likely source data error."

EMPTY_STATE.csv
(file contains 0 data records)
(header row only, or completely empty)
Agent Response
Check: ALL PASS
State: EMPTY
Message: "0 source records, 0 Bronze records. Empty state is valid — no data to validate."
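
The encoding case can be detected from the first bytes of the file. The sketch below is one possible approach using the BOM constants in the standard `codecs` module; the function name and the result dict are assumptions, and only UTF-16 and UTF-32 byte-order marks are treated as failures here.

```python
import codecs

# UTF-32 LE must be tested before UTF-16 LE: its BOM (FF FE 00 00)
# begins with the UTF-16 LE BOM (FF FE).
BOMS = [
    (codecs.BOM_UTF32_LE, "UTF-32LE"),
    (codecs.BOM_UTF32_BE, "UTF-32BE"),
    (codecs.BOM_UTF16_LE, "UTF-16LE"),
    (codecs.BOM_UTF16_BE, "UTF-16BE"),
]


def check_src01_encoding(raw: bytes) -> dict:
    """Fail if the bytes carry a UTF-16/32 BOM or are not valid UTF-8."""
    for bom, name in BOMS:
        if raw.startswith(bom):
            marker = bom.hex(" ").upper()
            return {"check": "SRC-01", "status": "FAIL",
                    "message": f"Expected UTF-8 encoding, detected {name} (BOM: {marker})."}
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError as exc:
        return {"check": "SRC-01", "status": "FAIL",
                "message": f"Not valid UTF-8: {exc.reason}"}
    return {"check": "SRC-01", "status": "PASS", "message": "Valid UTF-8"}


bad_bytes = b"\xff\xfe" + '{"fileNum": "FL-BAD"}'.encode("utf-16-le")
good_bytes = b'{"fileNum": "FL-OK"}'
bad_result = check_src01_encoding(bad_bytes)
good_result = check_src01_encoding(good_bytes)
print(bad_result["message"])
```
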

File Structure

Here is every file you will create. Colors indicate each file's role: agents, tools, mock data, config.

capstone-6-bronze-testing/
├── coordinator.py             # Coordinator agent — spawns 50 parallel testers
├── state_tester.py            # Single state tester subagent
├── tools/
│   ├── __init__.py            # Makes tools/ a Python package
│   ├── file_parser.py         # Auto-detecting 5-format parser
│   ├── bronze_query.py        # Queries mock Bronze table (JSON)
│   ├── validation_checks.py   # All 12 check implementations
│   └── report_generator.py    # Dashboard + report output
├── mock_data/
│   ├── source_files/          # 9 files (5 formats + GA + 3 error cases)
│   │   ├── NY_2024_Q4.xml
│   │   ├── CA_2024_Q4.csv     # pipe-delimited
│   │   ├── DE_2024_Q4.csv     # comma with header
│   │   ├── TX_2024_Q4.dat     # fixed-width
│   │   ├── NV_2024_Q4.json
│   │   ├── GA_2024_Q4.csv     # bad date format
│   │   ├── TX_BAD_truncated.dat
│   │   ├── FL_BAD_encoding.json
│   │   └── EMPTY_STATE.csv
│   ├── bronze_table.json      # ~2,475 mock Bronze records
│   ├── state_format_registry.json
│   └── load_manifest.json
├── config.py                  # 50-state registry with format types
├── requirements.txt           # anthropic, pydantic, rich, duckdb
├── server.py                  # FastAPI server for Docker deployment
├── dashboard_server.py        # Dashboard API for Docker deployment
└── run_tests.py               # CLI entry point

The Bronze Canonical Schema

Every state's data normalizes into this ONE table. Regardless of whether the source was XML from New York or fixed-width from Texas, every record ends up with the same 15 fields. This is the contract your agent validates.

{
  "filing_number":        "string — unique within state (e.g., NY-2024-0847)",
  "source_state":         "string — 2-letter code (e.g., NY)",
  "filing_type":          "string — UCC1 | UCC3_AMENDMENT | UCC3_CONTINUATION | UCC3_TERMINATION",
  "filing_date":          "date   — YYYY-MM-DD",
  "lapse_date":           "date   — YYYY-MM-DD, nullable",
  "status":               "string — ACTIVE | TERMINATED | LAPSED",
  "debtor_name":          "string — uppercase, trimmed",
  "debtor_address":       "string",
  "debtor_org_type":      "string — CORPORATION | LLC | PARTNERSHIP | INDIVIDUAL | UNKNOWN",
  "secured_party_name":   "string — uppercase, trimmed",
  "secured_party_address":"string",
  "collateral_description":"string",
  "source_file":          "string — original filename",
  "load_id":              "string — batch identifier (e.g., LOAD-2024Q4-SEED)",
  "load_timestamp":       "timestamp"
}
CREATE TABLE bronze_filings (
    filing_number         VARCHAR NOT NULL,
    source_state          VARCHAR(2) NOT NULL,
    filing_type           VARCHAR NOT NULL,
    filing_date           DATE NOT NULL,
    lapse_date            DATE,
    status                VARCHAR NOT NULL,
    debtor_name           VARCHAR NOT NULL,
    debtor_address        VARCHAR,
    debtor_org_type       VARCHAR,
    secured_party_name    VARCHAR NOT NULL,
    secured_party_address VARCHAR,
    collateral_description VARCHAR,
    source_file           VARCHAR NOT NULL,
    load_id               VARCHAR NOT NULL,
    load_timestamp        TIMESTAMP NOT NULL
);
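
As a rough illustration of validating a record against this contract, the sketch below checks the NOT NULL columns, the enumerated values, the date layout, and the uppercase/trimmed rule using only the standard library. The function name and the wording of the messages are assumptions, not the capstone's actual check implementations.

```python
import re

ISO_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Columns declared NOT NULL in the table definition above.
REQUIRED = ["filing_number", "source_state", "filing_type", "filing_date", "status",
            "debtor_name", "secured_party_name", "source_file", "load_id", "load_timestamp"]
ALLOWED = {
    "filing_type": {"UCC1", "UCC3_AMENDMENT", "UCC3_CONTINUATION", "UCC3_TERMINATION"},
    "status": {"ACTIVE", "TERMINATED", "LAPSED"},
}


def schema_violations(record: dict) -> list[str]:
    """Return human-readable problems; an empty list means the record complies."""
    problems = []
    for field in REQUIRED:
        if record.get(field) in (None, ""):
            problems.append(f"{field}: missing or null")
    for field, allowed in ALLOWED.items():
        value = record.get(field)
        if value not in (None, "") and value not in allowed:
            problems.append(f"{field}: unexpected value {value!r}")
    for field in ("filing_date", "lapse_date"):
        value = record.get(field)
        if value and not ISO_DATE.match(value):
            problems.append(f"{field}: not YYYY-MM-DD ({value!r})")
    for field in ("debtor_name", "secured_party_name"):
        value = record.get(field)
        if value and value != value.strip().upper():
            problems.append(f"{field}: not uppercase and trimmed")
    return problems


good = {
    "filing_number": "NY-2024-0847", "source_state": "NY", "filing_type": "UCC1",
    "filing_date": "2024-03-15", "lapse_date": None, "status": "ACTIVE",
    "debtor_name": "ACME CORPORATION", "secured_party_name": "FIRST NATIONAL BANK",
    "source_file": "NY_2024_Q4.xml", "load_id": "LOAD-2024Q4-SEED",
    "load_timestamp": "2024-12-15T08:00:00Z",
}
bad = dict(good, filing_date="15/03/2024", debtor_name="Acme Corporation")
print(schema_violations(bad))
```
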
Now you know the target schema. Every one of the 12 validation checks compares source data against this schema. Let's build the system step by step.

Step-by-Step Build Guide

Lab Overview

What You'll Build: A coordinator agent that spawns 50 parallel state tester subagents to validate Bronze canonical load data across all US states.

Time Estimate: 4–6 hours across 2–3 sessions

Prerequisites: Python 3.10+, Docker Desktop, Anthropic API key

Files You'll Create: 15+ files (see File Structure above)

Step 1: Environment Setup
5 min · Setup

What & Why: Create the project directory structure and install dependencies. We use anthropic for the Claude API, pydantic for structured validation results, and rich for terminal dashboard formatting.

Create your project folder and install dependencies:

macOS / Linux:
mkdir -p capstone-6-bronze-testing/{tools,mock_data/source_files}
cd capstone-6-bronze-testing
python -m venv venv && source venv/bin/activate
pip install anthropic pydantic rich duckdb fastapi uvicorn httpx
export ANTHROPIC_API_KEY=your-key-here

Windows (Command Prompt):
mkdir capstone-6-bronze-testing\tools
mkdir capstone-6-bronze-testing\mock_data\source_files
cd capstone-6-bronze-testing
python -m venv venv && venv\Scripts\activate
pip install anthropic pydantic rich duckdb fastapi uvicorn httpx
set ANTHROPIC_API_KEY=your-key-here

Run: python -c "import anthropic; print('OK')"

Expected output:

OK
Checkpoint
If you see OK, your environment is ready. If not, check that you activated the virtual environment and that pip install completed without errors.
Troubleshooting
  • ModuleNotFoundError: No module named 'anthropic' → Run pip install anthropic again, make sure venv is activated
  • command not found: python → Try python3 instead, or install Python 3.10+
Step 2: Create the 50-State Registry
10 min · config.py

What & Why: The state registry is the single source of truth — it maps every US state to its file format type and expected record count. When the coordinator spawns 50 testers, it reads this registry to know what format parser each tester should use. Without it, the testers would have to guess the format from the file extension, which is error-prone.

Create a new file called config.py:

"""50-state format registry for Bronze canonical validation."""
from dataclasses import dataclass

@dataclass
class StateConfig:
    code: str
    name: str
    format_type: str  # xml | pipe_csv | comma_csv | fixed_width | json
    expected_records: int  # expected count for Q4 2024 seed load

STATES: dict[str, StateConfig] = {
    # XML states (15)
    "NY": StateConfig("NY", "New York",       "xml",         847),
    "OH": StateConfig("OH", "Ohio",            "xml",         423),
    "PA": StateConfig("PA", "Pennsylvania",    "xml",         612),
    "MI": StateConfig("MI", "Michigan",        "xml",         389),
    "NJ": StateConfig("NJ", "New Jersey",      "xml",         501),
    "VA": StateConfig("VA", "Virginia",        "xml",         445),
    "MA": StateConfig("MA", "Massachusetts",   "xml",         367),
    "WA": StateConfig("WA", "Washington",      "xml",         298),
    "NC": StateConfig("NC", "North Carolina",  "xml",         512),
    "MD": StateConfig("MD", "Maryland",        "xml",         276),
    "WI": StateConfig("WI", "Wisconsin",       "xml",         198),
    "SC": StateConfig("SC", "South Carolina",  "xml",         167),
    "MN": StateConfig("MN", "Minnesota",       "xml",         234),
    "OR": StateConfig("OR", "Oregon",          "xml",         189),
    "CT": StateConfig("CT", "Connecticut",     "xml",         156),
    # Pipe-delimited CSV states (15)
    "CA": StateConfig("CA", "California",      "pipe_csv",   3421),
    "IL": StateConfig("IL", "Illinois",        "pipe_csv",    756),
    "GA": StateConfig("GA", "Georgia",         "pipe_csv",    987),
    "CO": StateConfig("CO", "Colorado",        "pipe_csv",    345),
    "AZ": StateConfig("AZ", "Arizona",         "pipe_csv",    289),
    "MO": StateConfig("MO", "Missouri",        "pipe_csv",    312),
    "IN": StateConfig("IN", "Indiana",         "pipe_csv",    267),
    "TN": StateConfig("TN", "Tennessee",       "pipe_csv",    298),
    "LA": StateConfig("LA", "Louisiana",       "pipe_csv",    234),
    "KY": StateConfig("KY", "Kentucky",        "pipe_csv",    201),
    "AL": StateConfig("AL", "Alabama",         "pipe_csv",    178),
    "MS": StateConfig("MS", "Mississippi",     "pipe_csv",    145),
    "AR": StateConfig("AR", "Arkansas",        "pipe_csv",    123),
    "OK": StateConfig("OK", "Oklahoma",        "pipe_csv",    167),
    "IA": StateConfig("IA", "Iowa",            "pipe_csv",    156),
    # Comma CSV with header (15)
    "DE": StateConfig("DE", "Delaware",        "comma_csv",   890),
    "NH": StateConfig("NH", "New Hampshire",   "comma_csv",    98),
    "VT": StateConfig("VT", "Vermont",         "comma_csv",    67),
    "ME": StateConfig("ME", "Maine",           "comma_csv",    89),
    "RI": StateConfig("RI", "Rhode Island",    "comma_csv",    76),
    "HI": StateConfig("HI", "Hawaii",          "comma_csv",    54),
    "AK": StateConfig("AK", "Alaska",          "comma_csv",    43),
    "MT": StateConfig("MT", "Montana",         "comma_csv",    78),
    "WY": StateConfig("WY", "Wyoming",         "comma_csv",    34),
    "SD": StateConfig("SD", "South Dakota",    "comma_csv",    45),
    "ND": StateConfig("ND", "North Dakota",    "comma_csv",    56),
    "NE": StateConfig("NE", "Nebraska",        "comma_csv",   112),
    "ID": StateConfig("ID", "Idaho",           "comma_csv",    87),
    "NM": StateConfig("NM", "New Mexico",      "comma_csv",    98),
    "WV": StateConfig("WV", "West Virginia",   "comma_csv",    67),
    # Fixed-width states (4)
    "TX": StateConfig("TX", "Texas",           "fixed_width", 1205),
    "FL": StateConfig("FL", "Florida",         "fixed_width",  978),
    "KS": StateConfig("KS", "Kansas",          "fixed_width",  156),
    "UT": StateConfig("UT", "Utah",            "fixed_width",  134),
    # JSON state(s)
    "NV": StateConfig("NV", "Nevada",          "json",         543),
}

# Derived constants
TOTAL_EXPECTED_RECORDS = sum(s.expected_records for s in STATES.values())
FORMAT_GROUPS = {}
for s in STATES.values():
    FORMAT_GROUPS.setdefault(s.format_type, []).append(s.code)

Run: python -c "from config import STATES, TOTAL_EXPECTED_RECORDS; print(f'{len(STATES)} states, {TOTAL_EXPECTED_RECORDS} expected records')"

Expected output:

50 states, 18403 expected records
Checkpoint
You should see exactly 50 states. If the count is wrong, check that you didn't accidentally duplicate or omit a state entry.
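
The point about file extensions being error-prone can be shown in a few lines. The extension mapping below mirrors the file names used for the mock data in Step 3; the variable names are illustrative.

```python
from collections import defaultdict

# File extension used for each format type in this capstone's mock data.
EXTENSION_BY_FORMAT = {
    "xml": "xml",
    "pipe_csv": "csv",
    "comma_csv": "csv",
    "fixed_width": "dat",
    "json": "json",
}

# Invert the mapping to see which extensions map to more than one format.
formats_by_extension = defaultdict(list)
for fmt, ext in EXTENSION_BY_FORMAT.items():
    formats_by_extension[ext].append(fmt)

ambiguous = {ext: fmts for ext, fmts in formats_by_extension.items() if len(fmts) > 1}
print(ambiguous)
```

A `.csv` file could be either pipe-delimited without a header or comma-delimited with one, so 30 of the 50 states cannot be told apart by extension alone.
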
Step 3: Create Mock Source Files
20 min · mock_data/source_files/

What & Why: We need realistic source files in all 5 formats so the parser and validators have something to test against. We'll generate 9 source files: one per format type (5 clean representative files: NY/CA/DE/TX/NV), GA with bad-date Bronze records, plus 3 error files (truncated, bad encoding, empty). This step creates a Python script that generates the mock data — run it once and you have all your test files.

This step depends on Step 2 (the state registry). If you skipped Step 2, go back and create config.py first.

Create a new file called generate_mock_data.py:

"""Generate mock source files and Bronze table data for testing."""
import json
import os
import random
from datetime import datetime, timedelta
from config import STATES

random.seed(42)  # Reproducible mock data
OUT = "mock_data"

DEBTOR_NAMES = [
    "ACME CORPORATION", "GLOBEX INDUSTRIES", "INITECH LLC",
    "STARK ENTERPRISES", "WAYNE INDUSTRIES", "UMBRELLA CORP",
    "CYBERDYNE SYSTEMS", "OSCORP TECHNOLOGIES", "LEXCORP",
    "MASSIVE DYNAMIC", "TYRELL CORPORATION", "SOYLENT CORP",
    "WEYLAND-YUTANI", "APERTURE SCIENCE", "BLACK MESA",
]
SECURED_PARTIES = [
    "FIRST NATIONAL BANK", "JPMORGAN CHASE", "WELLS FARGO",
    "BANK OF AMERICA", "CITIBANK NA", "US BANCORP",
    "PNC FINANCIAL", "TRUIST FINANCIAL", "TD BANK",
]
COLLATERAL = [
    "All inventory and accounts receivable",
    "Equipment, machinery, and fixtures",
    "All assets of the debtor",
    "Accounts, chattel paper, and general intangibles",
    "Motor vehicles and titled goods",
]
ORG_TYPES = ["CORPORATION", "LLC", "PARTNERSHIP", "INDIVIDUAL"]

def rand_date(year=2024):
    d = datetime(year, 1, 1) + timedelta(days=random.randint(0, 364))
    return d.strftime("%Y-%m-%d")

def rand_lapse(filing_date_str):
    fd = datetime.strptime(filing_date_str, "%Y-%m-%d")
    return (fd + timedelta(days=5*365)).strftime("%Y-%m-%d")

def make_record(state, idx):
    fn = f"{state}-2024-{idx:04d}"
    fd = rand_date()
    return {
        "filing_number": fn,
        "source_state": state,
        "filing_type": random.choice(["UCC1", "UCC1", "UCC1", "UCC3_AMENDMENT"]),
        "filing_date": fd,
        "lapse_date": rand_lapse(fd),
        "status": "ACTIVE",
        "debtor_name": random.choice(DEBTOR_NAMES),
        "debtor_address": f"{random.randint(1,999)} Main St, Anytown, {state}",
        "debtor_org_type": random.choice(ORG_TYPES),
        "secured_party_name": random.choice(SECURED_PARTIES),
        "secured_party_address": f"{random.randint(1,999)} Bank Ave, Finance City, {state}",
        "collateral_description": random.choice(COLLATERAL),
        "source_file": "",
        "load_id": "LOAD-2024Q4-SEED",
        "load_timestamp": "2024-12-15T08:00:00Z",
    }

def write_xml(state, records, path):
    lines = ['<?xml version="1.0" encoding="UTF-8"?>', '<Filings xmlns="urn:sos:ucc">']
    for r in records:
        lines.append("  <Filing>")
        lines.append(f"    <FileNumber>{r['filing_number']}</FileNumber>")
        lines.append(f"    <Type>{r['filing_type']}</Type>")
        lines.append(f"    <FilingDate>{r['filing_date']}</FilingDate>")
        lines.append(f"    <LapseDate>{r['lapse_date']}</LapseDate>")
        lines.append(f"    <Status>{r['status']}</Status>")
        lines.append(f"    <Debtor><Name>{r['debtor_name']}</Name></Debtor>")
        lines.append(f"    <SecuredParty><Name>{r['secured_party_name']}</Name></SecuredParty>")
        lines.append(f"    <Collateral>{r['collateral_description']}</Collateral>")
        lines.append("  </Filing>")
    lines.append("</Filings>")
    with open(path, "w") as f:
        f.write("\n".join(lines))

def write_pipe_csv(state, records, path):
    lines = []
    for r in records:
        # Dates stay ISO (YYYY-MM-DD) in this mock file; no MM/DD/YYYY conversion is applied.
        lines.append("|".join([
            r["filing_number"], r["filing_type"], r["filing_date"],
            r["status"][0], r["debtor_name"], r["debtor_address"],
            r["debtor_org_type"][:4], r["secured_party_name"],
            r["secured_party_address"], r["collateral_description"],
        ]))
    with open(path, "w") as f:
        f.write("\n".join(lines))

def write_comma_csv(state, records, path):
    header = "filing_number,type,filing_date,lapse_date,status,debtor_name,debtor_addr,org_type,sp_name,sp_addr,collateral"
    lines = [header]
    for r in records:
        row = ",".join([
            r["filing_number"], r["filing_type"], r["filing_date"],
            r["lapse_date"] or "", r["status"],
            f'"{r["debtor_name"]}"', f'"{r["debtor_address"]}"',
            r["debtor_org_type"], f'"{r["secured_party_name"]}"',
            f'"{r["secured_party_address"]}"', f'"{r["collateral_description"]}"',
        ])
        lines.append(row)
    with open(path, "w") as f:
        f.write("\n".join(lines))

def write_fixed_width(state, records, path):
    lines = []
    for r in records:
        line = (
            r["filing_number"].ljust(15)
            + r["filing_type"].ljust(20)
            + r["filing_date"].replace("-", "").ljust(10)
            + r["status"][0]
            + r["debtor_name"].ljust(40)
            + r["secured_party_name"].ljust(40)
        )
        lines.append(line)
    with open(path, "w") as f:
        f.write("\n".join(lines))

def write_json_format(state, records, path):
    entries = []
    for r in records:
        entries.append({
            "fileNum": r["filing_number"],
            "filingType": r["filing_type"],
            "dateFiled": r["filing_date"] + "T00:00:00Z",
            "currentStatus": r["status"].lower(),
            "parties": {
                "debtor": {"legalName": r["debtor_name"]},
                "securedParty": {"legalName": r["secured_party_name"]},
            },
            "collateral": {"description": r["collateral_description"]},
        })
    with open(path, "w") as f:
        json.dump(entries, f, indent=2)

WRITERS = {
    "xml": ("xml", write_xml),
    "pipe_csv": ("csv", write_pipe_csv),
    "comma_csv": ("csv", write_comma_csv),
    "fixed_width": ("dat", write_fixed_width),
    "json": ("json", write_json_format),
}

def generate():
    os.makedirs(f"{OUT}/source_files", exist_ok=True)
    all_bronze = []
    # Pick one representative state per format for source files
    reps = {"xml": "NY", "pipe_csv": "CA", "comma_csv": "DE",
            "fixed_width": "TX", "json": "NV"}

    for state_code, cfg in STATES.items():
        n = min(cfg.expected_records, 50)  # Cap at 50 for mock data
        records = [make_record(state_code, i+1) for i in range(n)]
        ext, writer = WRITERS[cfg.format_type]
        fname = f"{state_code}_2024_Q4.{ext}"

        # Only write source files for representative + error states
        if state_code in reps.values() or state_code == "GA":
            writer(state_code, records, f"{OUT}/source_files/{fname}")

        # Add to Bronze table
        for r in records:
            r["source_file"] = fname
            all_bronze.append(r)

    # === Error cases ===
    # GA: inject bad date format in Bronze (DD/MM/YYYY not converted)
    for r in all_bronze:
        if r["source_state"] == "GA":
            parts = r["filing_date"].split("-")
            r["filing_date"] = f"{parts[2]}/{parts[1]}/{parts[0]}"  # DD/MM/YYYY

    # NY: inject 2 records with lapse before filing
    # Use 2020-01-01 — guaranteed before any 2024 filing_date the
    # generator could ever produce. "2024-01-01" risked colliding
    # with a randomly-generated filing_date of the same day.
    ny_recs = [r for r in all_bronze if r["source_state"] == "NY"]
    if len(ny_recs) >= 2:
        ny_recs[0]["lapse_date"] = "2020-01-01"
        ny_recs[1]["lapse_date"] = "2020-01-01"

    # NV: inject 3 duplicates into BOTH Bronze and the source file
    # so CNT-01 stays PASS (counts match) and DUP-01 becomes WARN
    # (the duplicates are a Bronze-level data quality issue, not a count mismatch).
    nv_recs = [r for r in all_bronze if r["source_state"] == "NV"]
    if len(nv_recs) >= 3:
        for i in range(3):
            dup = dict(nv_recs[i])
            all_bronze.append(dup)
        # Re-write NV source file with the duplicated records included
        nv_full = [r for r in all_bronze if r["source_state"] == "NV"]
        write_json_format("NV", nv_full, f"{OUT}/source_files/NV_2024_Q4.json")

    # Write Bronze table
    with open(f"{OUT}/bronze_table.json", "w") as f:
        json.dump(all_bronze, f, indent=2)

    # Write error source files
    with open(f"{OUT}/source_files/TX_BAD_truncated.dat", "w") as f:
        f.write("TX-2024-0001  UCC1 20240315A" + "X"*40 + "\n")
        f.write("TX-2024-0002  UCC1 20240420A" + "X"*40 + "\n")
        f.write("TX-2024-0003  UCC1 05")  # Truncated

    with open(f"{OUT}/source_files/FL_BAD_encoding.json", "wb") as f:
        f.write(b'\xff\xfe' + '{"fileNum": "FL-BAD"}'.encode('utf-16-le'))

    with open(f"{OUT}/source_files/EMPTY_STATE.csv", "w") as f:
        f.write("")

    # Write load manifest
    manifest = {
        "load_id": "LOAD-2024Q4-SEED",
        "load_date": "2024-12-15",
        "states": {code: {"expected_records": cfg.expected_records,
                          "format": cfg.format_type}
                   for code, cfg in STATES.items()},
    }
    with open(f"{OUT}/load_manifest.json", "w") as f:
        json.dump(manifest, f, indent=2)

    # Write format registry
    registry = {code: cfg.format_type for code, cfg in STATES.items()}
    with open(f"{OUT}/state_format_registry.json", "w") as f:
        json.dump(registry, f, indent=2)

    print(f"Generated {len(all_bronze)} Bronze records across {len(STATES)} states")
    print(f"Source files: {len(os.listdir(f'{OUT}/source_files'))}")

if __name__ == "__main__":
    generate()

Run: python generate_mock_data.py

Expected output:

Generated 2475 Bronze records across 50 states
Source files: 9
Checkpoint
You should see 2475 Bronze records (50 states × up to 50 records each, plus 3 injected NV duplicates) and 9 source files (5 representative format files + GA + 3 error files). Check that mock_data/bronze_table.json exists and mock_data/source_files/ contains files.
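
The expected totals follow from the registry and the generator's cap. As a quick worked check (the variable names are illustrative; the counts come from config.py and the generator):

```python
CAP = 50                # per-state cap applied by the generator
TOTAL_STATES = 50
# Only three states have a registry count below the cap.
below_cap = {"AK": 43, "WY": 34, "SD": 45}
injected_nv_duplicates = 3

capped_states = TOTAL_STATES - len(below_cap)  # 47 states contribute 50 records each
bronze_records = capped_states * CAP + sum(below_cap.values()) + injected_nv_duplicates

# Five representative format files, the GA file, and three error files.
source_files = 5 + 1 + 3

print(bronze_records, source_files)  # 2475 9
```
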
Step 3B: Create the tools Package
1 min · tools/__init__.py

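What & Why: An __init__.py file marks tools/ as a regular Python package. Python 3.3+ can often import a directory without one (as a namespace package), but an explicit __init__.py keeps imports such as from tools.file_parser import parse behaving consistently across test runners, linters, and packaging tools.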

Create an empty file called tools/__init__.py:

# tools/__init__.py
# This file makes the tools/ directory a Python package.

Verify: python -c "import tools; print('tools package OK')"

Checkpoint
If you see tools package OK, the package is importable. If you see ModuleNotFoundError, make sure you are running the command from the project root and that the file is at tools/__init__.py (not tool/__init__.py).
Step 4: Build the 5-Format Parser
30 min · tools/file_parser.py

What & Why: Each state tester needs to read its state's source file and extract records into a common format. The parser auto-detects the file format from the registry and applies the correct parsing strategy. This is the "input side" of each validation — what the source says should be in Bronze.

The interesting engineering challenge here is that each format has its own gotchas: XML has namespaces, pipe-CSV can have pipes inside collateral text, fixed-width requires exact character positions, and JSON keys vary by state. The parser must handle all of these correctly or the validation results will be wrong — not because Bronze is wrong, but because the parser misread the source.
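To make the XML gotcha concrete, here is a minimal sketch of namespace-aware lookup with ElementTree. The sample document and the find_name helper are invented for this illustration; only the urn:sos:ucc namespace URI is taken from the parser in this step. It also shows why a lookup result should be compared with None explicitly: an Element with no child elements is falsy, so chaining two find() calls with `or` can silently throw away a match.

```python
from xml.etree import ElementTree as ET

NS = {"ucc": "urn:sos:ucc"}

# Hypothetical sample document, not taken from the real NY file.
SAMPLE = """<Filings xmlns="urn:sos:ucc">
  <Filing>
    <FileNumber>NY-2024-0001</FileNumber>
    <Debtor><Name> Acme Holdings llc </Name></Debtor>
  </Filing>
</Filings>"""


def find_name(filing, tag):
    """Return the normalized <Name> text under <tag>, or "" if absent."""
    el = filing.find(f"ucc:{tag}/ucc:Name", NS)
    if el is None:  # explicit None check; do not rely on Element truthiness
        el = filing.find(f"{tag}/Name")
    return el.text.strip().upper() if el is not None and el.text else ""


root = ET.fromstring(SAMPLE)
filing = root.find("ucc:Filing", NS)
debtor = find_name(filing, "Debtor")

# A lookup without the namespace prefix does not see namespaced elements.
unqualified = root.findall(".//Filing")
print(debtor, len(unqualified))
```

The unqualified search comes back empty even though the file clearly contains a Filing element, which is exactly the kind of misread that would make a healthy Bronze load look broken.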

Create a new file called tools/file_parser.py:

"""Auto-detecting 5-format parser for UCC source files."""
import csv, json
from xml.etree import ElementTree as ET
from pathlib import Path

NS = {"ucc": "urn:sos:ucc"}

def parse(state_code: str, file_path: str, format_type: str) -> list[dict]:
    """Parse a source file into canonical record dicts.

    Args:
        state_code: 2-letter state code (e.g., "NY")
        file_path: Path to the source file
        format_type: One of xml, pipe_csv, comma_csv, fixed_width, json

    Returns:
        List of dicts with canonical field names

    Raises:
        ValueError: If file is truncated, wrong encoding, or unparseable
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Source file not found: {file_path}")
    if path.stat().st_size == 0:
        return []  # Empty file is valid (0 records)

    # Encoding check
    with open(file_path, "rb") as f:
        head = f.read(4)
    if head[:2] in (b'\xff\xfe', b'\xfe\xff'):
        raise ValueError(f"Expected UTF-8, detected UTF-16 (BOM: {head[:2].hex().upper()})")

    parsers = {
        "xml": _parse_xml,
        "pipe_csv": _parse_pipe_csv,
        "comma_csv": _parse_comma_csv,
        "fixed_width": _parse_fixed_width,
        "json": _parse_json,
    }
    parser = parsers.get(format_type)
    if not parser:
        raise ValueError(f"Unknown format: {format_type}")
    return parser(state_code, file_path)


def _parse_xml(state: str, path: str) -> list[dict]:
    tree = ET.parse(path)
    root = tree.getroot()
    records = []
    for filing in root.findall(".//ucc:Filing", NS) or root.findall(".//Filing"):
        fn = (filing.findtext("ucc:FileNumber", "", NS)
              or filing.findtext("FileNumber", ""))
        records.append({
            "filing_number": fn.strip(),
            "filing_type": (filing.findtext("ucc:Type", "", NS)
                            or filing.findtext("Type", "")).strip(),
            "filing_date": (filing.findtext("ucc:FilingDate", "", NS)
                            or filing.findtext("FilingDate", "")).strip(),
            "lapse_date": (filing.findtext("ucc:LapseDate", "", NS)
                           or filing.findtext("LapseDate", "")).strip() or None,
            "status": (filing.findtext("ucc:Status", "", NS)
                       or filing.findtext("Status", "")).strip().upper(),
            "debtor_name": _extract_name(filing, "Debtor"),
            "secured_party_name": _extract_name(filing, "SecuredParty"),
        })
    return records


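def _extract_name(filing, tag):
    # Compare with None explicitly: an Element with no child elements is falsy,
    # so `find(...) or find(...)` would discard a matched namespaced <Name>.
    el = filing.find(f"ucc:{tag}/ucc:Name", NS)
    if el is None:
        el = filing.find(f"{tag}/Name")
    return el.text.strip().upper() if el is not None and el.text else ""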


def _parse_pipe_csv(state: str, path: str) -> list[dict]:
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            parts = line.split("|")
            if len(parts) < 8:
                raise ValueError(f"Line {line_num}: expected 8+ pipe fields, got {len(parts)}")
            status_map = {"A": "ACTIVE", "T": "TERMINATED", "L": "LAPSED"}
            records.append({
                "filing_number": parts[0].strip(),
                "filing_type": parts[1].strip(),
                "filing_date": parts[2].strip(),
                "status": status_map.get(parts[3].strip(), parts[3].strip().upper()),
                "debtor_name": parts[4].strip().upper(),
                "secured_party_name": parts[7].strip().upper(),
            })
    return records


def _parse_comma_csv(state: str, path: str) -> list[dict]:
    records = []
    with open(path, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            records.append({
                "filing_number": row.get("filing_number", "").strip(),
                "filing_type": row.get("type", "").strip(),
                "filing_date": row.get("filing_date", "").strip(),
                "lapse_date": row.get("lapse_date", "").strip() or None,
                "status": row.get("status", "").strip().upper(),
                "debtor_name": row.get("debtor_name", "").strip().strip('"').upper(),
                "secured_party_name": row.get("sp_name", "").strip().strip('"').upper(),
            })
    return records


def _parse_fixed_width(state: str, path: str) -> list[dict]:
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            if len(line.rstrip()) < 30:
                raise ValueError(f"Line {line_num}: record truncated ({len(line.rstrip())} chars)")
            status_map = {"A": "ACTIVE", "T": "TERMINATED", "L": "LAPSED"}
            raw_date = line[35:45].strip()
            # Convert YYYYMMDD to YYYY-MM-DD
            if len(raw_date) == 8 and raw_date.isdigit():
                filing_date = f"{raw_date[:4]}-{raw_date[4:6]}-{raw_date[6:]}"
            else:
                filing_date = raw_date
            records.append({
                "filing_number": line[0:15].strip(),
                "filing_type": line[15:35].strip(),
                "filing_date": filing_date,
                "status": status_map.get(line[45:46], line[45:46].upper()),
                "debtor_name": line[46:86].strip().upper(),
                "secured_party_name": line[86:126].strip().upper(),
            })
    return records


def _parse_json(state: str, path: str) -> list[dict]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    records = []
    for entry in data:
        date_raw = entry.get("dateFiled", "")
        filing_date = date_raw[:10] if "T" in date_raw else date_raw
        records.append({
            "filing_number": entry.get("fileNum", ""),
            "filing_type": entry.get("filingType", "").replace("-", ""),
            "filing_date": filing_date,
            "status": entry.get("currentStatus", "").upper(),
            "debtor_name": entry.get("parties", {}).get("debtor", {}).get("legalName", "").upper(),
            "secured_party_name": entry.get("parties", {}).get("securedParty", {}).get("legalName", "").upper(),
        })
    return records

Run: python -c "from tools.file_parser import parse; r = parse('NV', 'mock_data/source_files/NV_2024_Q4.json', 'json'); print(f'{len(r)} records, first: {r[0][\"filing_number\"]}')"

Expected output:

53 records, first: NV-2024-0001
Checkpoint
The parser successfully reads a JSON source file and returns structured records. Test with the XML file too: parse('NY', 'mock_data/source_files/NY_2024_Q4.xml', 'xml').
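Fixed-width offsets are easy to get wrong by one character, so it can help to write the layout down as a named table and slice from that. A minimal sketch, reusing the column positions from _parse_fixed_width; the FIELDS table, the slice_record helper, and the sample line are illustrative only and not part of the capstone files.

```python
# Column layout copied from _parse_fixed_width: (start, end) offsets, end exclusive.
FIELDS = {
    "filing_number": (0, 15),
    "filing_type": (15, 35),
    "filing_date": (35, 45),
    "status": (45, 46),
    "debtor_name": (46, 86),
    "secured_party_name": (86, 126),
}


def slice_record(line: str) -> dict:
    """Cut one fixed-width line into raw, whitespace-trimmed fields."""
    return {name: line[start:end].strip() for name, (start, end) in FIELDS.items()}


# Build a made-up line by padding each value to its column width.
values = ["TX-2024-0001", "UCC1", "20240315", "A", "ACME LLC", "FIRST BANK"]
sample = "".join(
    value.ljust(end - start)
    for value, (start, end) in zip(values, FIELDS.values())
)

record = slice_record(sample)
print(len(sample), record["filing_date"], record["secured_party_name"])
```

Keeping the offsets in one place also makes it straightforward to assert that the columns are contiguous and that every line is at least as long as the last end offset.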
Step 5: Build the Bronze Query Tool
20 min · tools/bronze_query.py

What & Why: This is the "output side" of each validation — what actually ended up in the Bronze table. For the local mock version, it reads from bronze_table.json. In production (Tier 2/3), this same interface would query BigQuery or DuckDB. The key insight: the state tester agent calls get_state_records("NY") regardless of whether the backend is JSON, DuckDB, or BigQuery.

Create a new file called tools/bronze_query.py:

"""Mock Bronze table query tool — reads from JSON file."""
import json
from pathlib import Path
from functools import lru_cache

BRONZE_PATH = "mock_data/bronze_table.json"

@lru_cache(maxsize=1)
def _load_bronze() -> list[dict]:
    """Load Bronze table once and cache in memory."""
    with open(BRONZE_PATH, "r") as f:
        return json.load(f)

def get_state_records(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> list[dict]:
    """Get all Bronze records for a specific state and load.

    This is the function the state tester calls. In production,
    this would query BigQuery instead of a JSON file.
    """
    return [r for r in _load_bronze()
            if r["source_state"] == state_code and r["load_id"] == load_id]

def get_record_count(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> int:
    """Count Bronze records for a state."""
    return len(get_state_records(state_code, load_id))

def get_all_states(load_id: str = "LOAD-2024Q4-SEED") -> list[str]:
    """Get all states present in Bronze for a given load."""
    # sorted() gives a stable order; a bare set has no guaranteed ordering
    return sorted({r["source_state"] for r in _load_bronze() if r["load_id"] == load_id})

def get_total_count(load_id: str = "LOAD-2024Q4-SEED") -> int:
    """Total records across all states."""
    return len([r for r in _load_bronze() if r["load_id"] == load_id])

Run: python -c "from tools.bronze_query import get_record_count, get_total_count; print(f'NY: {get_record_count(\"NY\")} records, Total: {get_total_count()}')"

Expected output:

NY: 50 records, Total: 2475
Checkpoint
Bronze query returns records filtered by state. The NY count should match what generate_mock_data.py created (capped at 50 per state for mock data).
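The "same interface, different backend" idea from this step can be sketched with typing.Protocol. Everything here is hypothetical: BronzeBackend, ListBackend, SqliteBackend, and count_for are names made up for the example, and sqlite3 is only a stand-in for DuckDB or BigQuery so the sketch runs with the standard library alone.

```python
import sqlite3
from typing import Protocol


class BronzeBackend(Protocol):
    """Hypothetical interface; any backend with this method can be swapped in."""

    def get_state_records(self, state_code: str, load_id: str) -> list[dict]: ...


class ListBackend:
    """In-memory backend, similar in spirit to the JSON-file version."""

    def __init__(self, rows: list[dict]):
        self.rows = rows

    def get_state_records(self, state_code: str, load_id: str) -> list[dict]:
        return [r for r in self.rows
                if r["source_state"] == state_code and r["load_id"] == load_id]


class SqliteBackend:
    """SQL backend; sqlite3 stands in here for DuckDB or BigQuery."""

    def __init__(self, rows: list[dict]):
        self.conn = sqlite3.connect(":memory:")
        self.conn.row_factory = sqlite3.Row
        self.conn.execute(
            "CREATE TABLE bronze (filing_number TEXT, source_state TEXT, load_id TEXT)")
        self.conn.executemany(
            "INSERT INTO bronze VALUES (:filing_number, :source_state, :load_id)", rows)

    def get_state_records(self, state_code: str, load_id: str) -> list[dict]:
        cur = self.conn.execute(
            "SELECT * FROM bronze WHERE source_state = ? AND load_id = ?",
            (state_code, load_id))
        return [dict(row) for row in cur]


def count_for(backend: BronzeBackend, state_code: str, load_id: str) -> int:
    """Caller code depends only on the interface, not on the storage."""
    return len(backend.get_state_records(state_code, load_id))


ROWS = [
    {"filing_number": "NY-1", "source_state": "NY", "load_id": "L1"},
    {"filing_number": "NY-2", "source_state": "NY", "load_id": "L1"},
    {"filing_number": "CA-1", "source_state": "CA", "load_id": "L1"},
]
counts = [count_for(b, "NY", "L1") for b in (ListBackend(ROWS), SqliteBackend(ROWS))]
print(counts)
```

Because the validation code only calls get_state_records, swapping the storage layer should not require touching any of the checks.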
Step 6: Implement the 12 Validation Checks
45 min · tools/validation_checks.py

What & Why: This is the heart of the system — 12 functions that each test one specific aspect of data quality. Each check takes source records and Bronze records as input and returns a structured result (PASS/FAIL/WARN with details). The checks are independent, so they can run in any order and a failure in one doesn't block the others.

Let's walk through the checks in logical groups. First, the "can we even read the data?" check (SRC-01). Then structural checks (counts, duplicates). Then content checks (dates, statuses, names). Finally, a spot check that samples a few random source records and confirms each one exists in Bronze with a matching debtor name.
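One subtlety in the date checks is worth seeing in isolation: a pattern match only proves the text has the right shape, not that it names a real day. The sketch below contrasts the two; is_iso_shaped and is_real_date are hypothetical helper names used only for this illustration.

```python
import re
from datetime import datetime

DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def is_iso_shaped(value: str) -> bool:
    """True if the text looks like YYYY-MM-DD (shape only)."""
    return bool(DATE_RE.match(value))


def is_real_date(value: str) -> bool:
    """True if the text is YYYY-MM-DD and names an actual calendar day."""
    if not is_iso_shaped(value):
        return False
    try:
        datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        return False
    return True


samples = ["2024-03-15", "15/03/2024", "2024-02-30"]
report = [(s, is_iso_shaped(s), is_real_date(s)) for s in samples]
print(report)
```

The last sample passes the shape test but fails the calendar test, so any code that calls strptime after a regex match should still be ready for a ValueError.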

Create a new file called tools/validation_checks.py:

"""12 Bronze canonical validation checks."""
import re, random
from datetime import datetime
from pydantic import BaseModel

class CheckResult(BaseModel):
    check_id: str
    check_name: str
    status: str  # PASS | FAIL | WARN
    details: str
    records_checked: int = 0
    records_failed: int = 0

REQUIRED_FIELDS = [
    "filing_number", "source_state", "filing_type",
    "filing_date", "status", "debtor_name",
]
VALID_TYPES = {"UCC1", "UCC3_AMENDMENT", "UCC3_CONTINUATION", "UCC3_TERMINATION"}
VALID_STATUSES = {"ACTIVE", "TERMINATED", "LAPSED"}
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def src_01_parseable(source_records: list, parse_error: str | None) -> CheckResult:
    """SRC-01: Source file parseable without errors."""
    if parse_error:
        return CheckResult(check_id="SRC-01", check_name="Source file parseable",
                           status="FAIL", details=parse_error)
    return CheckResult(check_id="SRC-01", check_name="Source file parseable",
                       status="PASS", details=f"Parsed {len(source_records)} records",
                       records_checked=len(source_records))


def cnt_01_record_count(source: list, bronze: list) -> CheckResult:
    """CNT-01: Source record count matches Bronze record count."""
    diff = len(source) - len(bronze)
    if diff == 0:
        return CheckResult(check_id="CNT-01", check_name="Record count match",
                           status="PASS", details=f"{len(source)} source = {len(bronze)} bronze",
                           records_checked=len(source))
    return CheckResult(check_id="CNT-01", check_name="Record count match",
                       status="FAIL", details=f"source={len(source)}, bronze={len(bronze)}, diff={diff}",
                       records_checked=len(source), records_failed=abs(diff))


def fn_01_filing_integrity(source: list, bronze: list) -> CheckResult:
    """FN-01: Every source filing number exists in Bronze, no extras."""
    src_fns = {r["filing_number"] for r in source}
    brz_fns = {r["filing_number"] for r in bronze}
    missing = src_fns - brz_fns
    extra = brz_fns - src_fns
    if not missing and not extra:
        return CheckResult(check_id="FN-01", check_name="Filing number integrity",
                           status="PASS", details=f"All {len(src_fns)} filing numbers match",
                           records_checked=len(src_fns))
    parts = []
    if missing:
        parts.append(f"{len(missing)} missing from Bronze")
    if extra:
        parts.append(f"{len(extra)} extra in Bronze")
    return CheckResult(check_id="FN-01", check_name="Filing number integrity",
                       status="FAIL", details="; ".join(parts),
                       records_checked=len(src_fns), records_failed=len(missing)+len(extra))


def dup_01_no_duplicates(bronze: list) -> CheckResult:
    """DUP-01: No duplicate filing numbers in Bronze for this state."""
    fns = [r["filing_number"] for r in bronze]
    seen, dups = set(), set()
    for fn in fns:
        if fn in seen:
            dups.add(fn)
        seen.add(fn)
    if not dups:
        return CheckResult(check_id="DUP-01", check_name="No duplicates in Bronze",
                           status="PASS", details=f"{len(fns)} records, 0 duplicates",
                           records_checked=len(fns))
    return CheckResult(check_id="DUP-01", check_name="No duplicates in Bronze",
                       status="WARN", details=f"{len(dups)} duplicate filing numbers: {', '.join(list(dups)[:5])}",
                       records_checked=len(fns), records_failed=len(dups))


def ft_01_filing_type(bronze: list) -> CheckResult:
    """FT-01: All filing types are one of 4 canonical types."""
    bad = [r for r in bronze if r.get("filing_type") not in VALID_TYPES]
    if not bad:
        return CheckResult(check_id="FT-01", check_name="Filing type normalized",
                           status="PASS", details=f"All {len(bronze)} records have valid types",
                           records_checked=len(bronze))
    bad_types = set(r.get("filing_type") for r in bad)
    return CheckResult(check_id="FT-01", check_name="Filing type normalized",
                       status="FAIL", details=f"{len(bad)} records with invalid types: {bad_types}",
                       records_checked=len(bronze), records_failed=len(bad))


def dt_01_date_format(bronze: list) -> CheckResult:
    """DT-01: All dates in YYYY-MM-DD format."""
    bad = []
    for r in bronze:
        for field in ["filing_date", "lapse_date"]:
            val = r.get(field)
            if val and not DATE_RE.match(str(val)):
                bad.append(r["filing_number"])
                break
    if not bad:
        return CheckResult(check_id="DT-01", check_name="Date format normalized",
                           status="PASS", details=f"All dates in YYYY-MM-DD",
                           records_checked=len(bronze))
    return CheckResult(check_id="DT-01", check_name="Date format normalized",
                       status="FAIL", details=f"{len(bad)} records have non-YYYY-MM-DD dates",
                       records_checked=len(bronze), records_failed=len(bad))


def dt_02_date_values(bronze: list) -> CheckResult:
    """DT-02: No future filing dates, no lapse before filing."""
    bad = []
    today = datetime.now().date()
    for r in bronze:
        fd_str = r.get("filing_date", "")
        ld_str = r.get("lapse_date")
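        if not DATE_RE.match(str(fd_str)):
            continue  # DT-01 catches format issues
        try:
            fd = datetime.strptime(fd_str, "%Y-%m-%d").date()
        except ValueError:
            # Right shape but not a real calendar day (e.g. 2024-02-30)
            bad.append(f"{r['filing_number']}: invalid filing date {fd_str}")
            continue
        if fd > today:
            bad.append(f"{r['filing_number']}: future filing date {fd_str}")
        if ld_str and DATE_RE.match(str(ld_str)):
            try:
                ld = datetime.strptime(ld_str, "%Y-%m-%d").date()
            except ValueError:
                bad.append(f"{r['filing_number']}: invalid lapse date {ld_str}")
                continue
            if ld < fd:
                bad.append(f"{r['filing_number']}: lapse {ld_str} before filing {fd_str}")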
    if not bad:
        return CheckResult(check_id="DT-02", check_name="Date values valid",
                           status="PASS", details="No future dates or lapse-before-filing",
                           records_checked=len(bronze))
    return CheckResult(check_id="DT-02", check_name="Date values valid",
                       status="FAIL", details=f"{len(bad)} issues: {'; '.join(bad[:3])}",
                       records_checked=len(bronze), records_failed=len(bad))


def st_01_status(bronze: list) -> CheckResult:
    """ST-01: All status values are ACTIVE, TERMINATED, or LAPSED."""
    bad = [r for r in bronze if r.get("status") not in VALID_STATUSES]
    if not bad:
        return CheckResult(check_id="ST-01", check_name="Status normalized",
                           status="PASS", details=f"All {len(bronze)} records have valid status",
                           records_checked=len(bronze))
    return CheckResult(check_id="ST-01", check_name="Status normalized",
                       status="FAIL", details=f"{len(bad)} records with invalid status",
                       records_checked=len(bronze), records_failed=len(bad))


def nm_01_name_normalization(bronze: list) -> CheckResult:
    """NM-01: Names are uppercase, trimmed, no double spaces."""
    bad = []
    for r in bronze:
        for field in ["debtor_name", "secured_party_name"]:
            val = r.get(field) or ""  # a null name is treated as empty here
            if val != val.upper() or val != val.strip() or "  " in val:
                bad.append(r["filing_number"])
                break
    if not bad:
        return CheckResult(check_id="NM-01", check_name="Name normalization",
                           status="PASS", details="All names uppercase, trimmed",
                           records_checked=len(bronze))
    return CheckResult(check_id="NM-01", check_name="Name normalization",
                       status="FAIL", details=f"{len(bad)} records with unnormalized names",
                       records_checked=len(bronze), records_failed=len(bad))


def nl_01_required_not_null(bronze: list) -> CheckResult:
    """NL-01: 6 required fields are never null/empty."""
    bad = []
    for r in bronze:
        for field in REQUIRED_FIELDS:
            if not r.get(field):
                bad.append(f"{r.get('filing_number', '?')}: {field} is null")
                break
    if not bad:
        return CheckResult(check_id="NL-01", check_name="Required fields not null",
                           status="PASS", details=f"All {len(REQUIRED_FIELDS)} required fields populated",
                           records_checked=len(bronze))
    return CheckResult(check_id="NL-01", check_name="Required fields not null",
                       status="FAIL", details=f"{len(bad)} records with null required fields",
                       records_checked=len(bronze), records_failed=len(bad))


def lm_01_load_metadata(bronze: list) -> CheckResult:
    """LM-01: source_file, load_id, load_timestamp populated."""
    bad = []
    for r in bronze:
        if not r.get("source_file") or not r.get("load_id") or not r.get("load_timestamp"):
            bad.append(r.get("filing_number", "?"))
    if not bad:
        return CheckResult(check_id="LM-01", check_name="Load metadata present",
                           status="PASS", details="All records have source_file, load_id, load_timestamp",
                           records_checked=len(bronze))
    return CheckResult(check_id="LM-01", check_name="Load metadata present",
                       status="FAIL", details=f"{len(bad)} records missing load metadata",
                       records_checked=len(bronze), records_failed=len(bad))


def sp_01_spot_check(source: list, bronze: list, n: int = 3) -> CheckResult:
    """SP-01: Random source records exist in Bronze with a matching debtor name."""
    if not source or not bronze:
        return CheckResult(check_id="SP-01", check_name="Spot check (3 records)",
                           status="PASS", details="No records to spot-check")
    brz_map = {r["filing_number"]: r for r in bronze}
    sample = random.sample(source, min(n, len(source)))
    mismatches = []
    for src in sample:
        brz = brz_map.get(src["filing_number"])
        if not brz:
            mismatches.append(f"{src['filing_number']}: not in Bronze")
            continue
        if src.get("debtor_name", "").upper() != brz.get("debtor_name", "").upper():
            mismatches.append(f"{src['filing_number']}: debtor name mismatch")
    if not mismatches:
        return CheckResult(check_id="SP-01", check_name="Spot check (3 records)",
                           status="PASS", details=f"{len(sample)} random records match",
                           records_checked=len(sample))
    return CheckResult(check_id="SP-01", check_name="Spot check (3 records)",
                       status="FAIL", details="; ".join(mismatches),
                       records_checked=len(sample), records_failed=len(mismatches))


# Registry of all 12 checks for easy iteration
ALL_CHECKS = [
    "SRC-01", "CNT-01", "FN-01", "DUP-01", "FT-01", "DT-01",
    "DT-02", "ST-01", "NM-01", "NL-01", "LM-01", "SP-01",
]

Run: python -c "from tools.validation_checks import ALL_CHECKS; print(f'{len(ALL_CHECKS)} checks: {ALL_CHECKS}')"

Expected output:

12 checks: ['SRC-01', 'CNT-01', 'FN-01', 'DUP-01', 'FT-01', 'DT-01', 'DT-02', 'ST-01', 'NM-01', 'NL-01', 'LM-01', 'SP-01']
Checkpoint
All 12 checks are importable. Each returns a CheckResult with status, details, and counts.
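ALL_CHECKS above is a list of IDs. If you later want to iterate over the check functions themselves, one possible shape is a registry that maps each ID to a callable and isolates crashes so that one broken check cannot block the others. This is a simplified sketch under stated assumptions: Result is a dataclass stand-in for CheckResult, the two checks are cut-down versions of DUP-01 and ST-01, and CHECKS and run_checks are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Result:
    """Simplified stand-in for CheckResult (no pydantic dependency)."""
    check_id: str
    status: str  # PASS | FAIL | WARN
    details: str


def no_duplicates(bronze: list[dict]) -> Result:
    fns = [r["filing_number"] for r in bronze]
    dup_count = len(fns) - len(set(fns))
    return Result("DUP-01", "WARN" if dup_count else "PASS",
                  f"{dup_count} duplicate rows")


def valid_status(bronze: list[dict]) -> Result:
    allowed = {"ACTIVE", "TERMINATED", "LAPSED"}
    bad = [r for r in bronze if r["status"] not in allowed]
    return Result("ST-01", "FAIL" if bad else "PASS", f"{len(bad)} invalid")


# Hypothetical registry: check ID -> function taking the Bronze rows.
CHECKS: dict[str, Callable[[list[dict]], Result]] = {
    "DUP-01": no_duplicates,
    "ST-01": valid_status,
}


def run_checks(bronze: list[dict]) -> list[Result]:
    """Run every registered check; a crash in one becomes a FAIL, not an abort."""
    results = []
    for check_id, fn in CHECKS.items():
        try:
            results.append(fn(bronze))
        except Exception as exc:  # isolate failures so later checks still run
            results.append(Result(check_id, "FAIL", f"check raised: {exc!r}"))
    return results


rows = [
    {"filing_number": "NV-1", "status": "ACTIVE"},
    {"filing_number": "NV-1", "status": "ACTIVE"},
    {"filing_number": "NV-2"},  # missing "status" makes valid_status raise KeyError
]
results = run_checks(rows)
print([(r.check_id, r.status) for r in results])
```

Turning an unexpected exception into a FAIL result keeps the report complete: you still see eleven other outcomes for the state instead of a single stack trace.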
Step 7: Build the State Tester Agent
30 min · state_tester.py

What & Why: This is the subagent — one instance runs per state. It runs the deterministic validation pipeline: parse the source file, query Bronze, execute all 12 checks, and emit a structured report. The "agent" label here is the parallel-coordinator pattern from M14B/M15B, not a Claude-loop agent — everything inside test_state() is pure Python so the harness is fast (~20s for 50 states) and free to run on every commit. A Claude-driven failure-analysis step is offered as a stretch goal in the "Going Further" section.

Create a new file called state_tester.py:

"""State tester subagent — validates one state's Bronze load."""
import sys, json, time
from pathlib import Path
from config import STATES
from tools.file_parser import parse
from tools.bronze_query import get_state_records
from tools import validation_checks as vc

def test_state(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> dict:
    """Run all 12 validation checks for a single state.

    Returns a dict with state, status, results list, and timing.
    """
    start = time.time()
    cfg = STATES.get(state_code)
    if not cfg:
        # Include record_count so every return value has the same keys
        return {"state": state_code, "status": "FAIL",
                "results": [{"check_id": "CFG", "status": "FAIL",
                              "details": f"Unknown state: {state_code}"}],
                "record_count": 0, "duration": 0}

    # Step 1: Parse source file
    source_dir = Path("mock_data/source_files")
    ext_map = {"xml": "xml", "pipe_csv": "csv", "comma_csv": "csv",
               "fixed_width": "dat", "json": "json"}
    ext = ext_map[cfg.format_type]
    source_path = source_dir / f"{state_code}_2024_Q4.{ext}"

    source_records = []
    parse_error = None
    if source_path.exists():
        try:
            source_records = parse(state_code, str(source_path), cfg.format_type)
        except Exception as e:
            parse_error = str(e)
    else:
        # No source file for this state in the mock data; the
        # source-vs-Bronze checks below fall back to Bronze-only validation.
        pass

    # Step 2: Query Bronze
    bronze_records = get_state_records(state_code, load_id)

    # Step 3: Run all 12 checks
    results = []
    results.append(vc.src_01_parseable(source_records, parse_error))

    # If no source file, skip source-vs-bronze checks
    if source_path.exists() and not parse_error:
        results.append(vc.cnt_01_record_count(source_records, bronze_records))
        results.append(vc.fn_01_filing_integrity(source_records, bronze_records))
    else:
        results.append(vc.CheckResult(check_id="CNT-01", check_name="Record count match",
                                       status="PASS", details="No source file; Bronze-only validation"))
        results.append(vc.CheckResult(check_id="FN-01", check_name="Filing number integrity",
                                       status="PASS", details="No source file; Bronze-only validation"))

    results.append(vc.dup_01_no_duplicates(bronze_records))
    results.append(vc.ft_01_filing_type(bronze_records))
    results.append(vc.dt_01_date_format(bronze_records))
    results.append(vc.dt_02_date_values(bronze_records))
    results.append(vc.st_01_status(bronze_records))
    results.append(vc.nm_01_name_normalization(bronze_records))
    results.append(vc.nl_01_required_not_null(bronze_records))
    results.append(vc.lm_01_load_metadata(bronze_records))

    if source_path.exists() and not parse_error:
        results.append(vc.sp_01_spot_check(source_records, bronze_records))
    else:
        results.append(vc.CheckResult(check_id="SP-01", check_name="Spot check",
                                       status="PASS", details="No source file for spot check"))

    # Determine overall status
    statuses = [r.status for r in results]
    if "FAIL" in statuses:
        overall = "FAIL"
    elif "WARN" in statuses:
        overall = "WARN"
    else:
        overall = "PASS"

    duration = time.time() - start
    return {
        "state": state_code,
        "status": overall,
        "results": [r.model_dump() for r in results],
        "record_count": len(bronze_records),
        "duration": round(duration, 3),
    }


if __name__ == "__main__":
    state = sys.argv[1] if len(sys.argv) > 1 else "NY"
    load = sys.argv[2] if len(sys.argv) > 2 else "LOAD-2024Q4-SEED"
    result = test_state(state, load)
    print(json.dumps(result, indent=2))

Run: python state_tester.py NY

Expected output: A JSON object showing NY's results — 11 PASS and 1 FAIL on DT-02 (the two records with lapse before filing date).

Run: python state_tester.py GA

Expected output: GA fails DT-01 (date format not normalized).

Checkpoint
NY shows DT-02 FAIL (lapse before filing). GA shows DT-01 FAIL (DD/MM/YYYY dates). Both are the deliberate errors baked into mock data — your agent caught them!
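The overall status in test_state follows a simple severity rule: any FAIL wins, otherwise any WARN wins, otherwise PASS. The same rule can be written as a ranking, which is handy if you ever add more status levels. A minimal sketch; SEVERITY and roll_up are hypothetical names, not part of state_tester.py.

```python
SEVERITY = {"PASS": 0, "WARN": 1, "FAIL": 2}


def roll_up(statuses: list[str]) -> str:
    """Return the most severe status; an empty list counts as PASS."""
    return max(statuses, key=SEVERITY.__getitem__, default="PASS")


ny = roll_up(["PASS"] * 11 + ["FAIL"])
nv = roll_up(["PASS", "WARN", "PASS"])
print(ny, nv, roll_up([]))
```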
Step 8: Build the Coordinator Agent
45 min · coordinator.py

What & Why: The coordinator is the top-level agent. It reads the load manifest, spawns one state tester per state (all 50 in parallel using Python's concurrent.futures), collects all results, and generates the final dashboard. This is the core multi-agent orchestration pattern: one boss, many workers, results aggregated into a single view.

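The coordinator uses ThreadPoolExecutor for parallelism. Why threads instead of async? test_state is an ordinary synchronous function that does a small amount of parsing and file I/O, so a thread pool is the simplest way to run 50 of them concurrently. In standard CPython builds, threads do not speed up CPU-bound work because of the GIL, but they overlap I/O waits well. In production with real BigQuery calls, threads would still work; asyncio becomes worth considering when you need far more concurrent requests than a thread pool comfortably handles.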
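For comparison only, here is roughly what the asyncio alternative mentioned above could look like. This is a sketch under stated assumptions: fake_state_check is a made-up coroutine that sleeps to simulate a network call, and max_concurrent is a hypothetical knob playing the role that max_workers plays in the thread pool. The capstone itself sticks with ThreadPoolExecutor.

```python
import asyncio


async def fake_state_check(state: str, limit: asyncio.Semaphore) -> dict:
    """Stand-in for an async Bronze query; sleep simulates network latency."""
    async with limit:
        await asyncio.sleep(0.01)
        if state == "ZZ":
            raise ValueError("unknown state")
        return {"state": state, "status": "PASS"}


async def run_all(states: list[str], max_concurrent: int = 10) -> dict:
    limit = asyncio.Semaphore(max_concurrent)  # cap the number of in-flight calls
    outcomes = await asyncio.gather(
        *(fake_state_check(s, limit) for s in states),
        return_exceptions=True,  # collect errors instead of aborting the batch
    )
    results = {}
    for state, outcome in zip(states, outcomes):  # gather preserves input order
        if isinstance(outcome, Exception):
            results[state] = {"state": state, "status": "FAIL",
                              "details": str(outcome)}
        else:
            results[state] = outcome
    return results


results = asyncio.run(run_all(["NY", "CA", "TX", "ZZ"]))
print({s: r["status"] for s, r in results.items()})
```

As in the thread-based coordinator, a failure in one state is recorded as a FAIL entry rather than stopping the other states.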

Create a new file called coordinator.py:

"""Coordinator agent — orchestrates 50 parallel state testers."""
import sys, json, time, argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import STATES, TOTAL_EXPECTED_RECORDS
from state_tester import test_state

def run_coordinator(load_id: str, mode: str = "full_seed",
                    baseline: str | None = None, max_workers: int = 50) -> dict:
    """Run Bronze validation across all 50 states in parallel.

    Args:
        load_id: The load batch ID to validate
        mode: full_seed | incremental | change_detection
        baseline: Previous load_id for incremental comparison
        max_workers: Thread pool size (50 = all 50 states concurrent;
            drop to 10 if your runtime has memory pressure or
            quota-limited downstream APIs)

    Returns:
        Aggregated results dict with summary and per-state details
    """
    start = time.time()
    print(f"\n{'='*60}")
    print(f"BRONZE CANONICAL LOAD VALIDATION")
    print(f"Mode: {mode.upper()} | Load ID: {load_id}")
    if baseline:
        print(f"Baseline: {baseline}")
    print(f"{'='*60}\n")

    # Determine which states to test
    states_to_test = list(STATES.keys())
    print(f"Spawning {len(states_to_test)} state testers...\n")

    # Run all state testers in parallel
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(test_state, state, load_id): state
            for state in states_to_test
        }
        for future in as_completed(futures):
            state = futures[future]
            try:
                result = future.result()
                results[state] = result
                icon = {"PASS": "+", "FAIL": "x", "WARN": "!"}[result["status"]]
                print(f"  [{icon}] {state}: {result['status']} "
                      f"({result['record_count']} records, {result['duration']}s)")
            except Exception as e:
                results[state] = {
                    "state": state, "status": "FAIL",
                    "results": [{"check_id": "ERR", "status": "FAIL", "details": str(e)}],
                    "record_count": 0, "duration": 0,
                }
                print(f"  [x] {state}: ERROR - {e}")

    duration = time.time() - start

    # Aggregate results
    pass_count = sum(1 for r in results.values() if r["status"] == "PASS")
    fail_count = sum(1 for r in results.values() if r["status"] == "FAIL")
    warn_count = sum(1 for r in results.values() if r["status"] == "WARN")
    total_records = sum(r["record_count"] for r in results.values())

    failed_states = [s for s, r in results.items() if r["status"] == "FAIL"]
    warn_states = [s for s, r in results.items() if r["status"] == "WARN"]

    # Print dashboard
    print(f"\n{'='*60}")
    print(f"SUMMARY: {pass_count} PASS | {fail_count} FAIL | {warn_count} WARN")
    print(f"Records: {total_records} bronze")
    print(f"Duration: {duration:.1f} seconds ({len(states_to_test)} states in parallel)")

    if failed_states:
        print(f"\nFAILURES:")
        for state in failed_states:
            failed_checks = [r for r in results[state]["results"]
                             if r["status"] == "FAIL"]
            for fc in failed_checks:
                print(f"  x {state} - {fc['check_id']}: {fc['details']}")

    if warn_states:
        print(f"\nWARNINGS:")
        for state in warn_states:
            warn_checks = [r for r in results[state]["results"]
                           if r["status"] == "WARN"]
            for wc in warn_checks:
                print(f"  ! {state} - {wc['check_id']}: {wc['details']}")

    print(f"\nPER-STATE:")
    print(f"  {'State':<6} {'Records':>8} {'Status':<6} Failed Checks")
    for state in sorted(results.keys()):
        r = results[state]
        failed = [c["check_id"] for c in r["results"] if c["status"] == "FAIL"]
        failed_str = ", ".join(failed) if failed else ""
        print(f"  {state:<6} {r['record_count']:>8} {r['status']:<6} {failed_str}")

    print(f"{'='*60}\n")

    return {
        "load_id": load_id,
        "mode": mode,
        "overall_status": "FAIL" if fail_count > 0 else ("WARN" if warn_count > 0 else "PASS"),
        "summary": {
            "pass_count": pass_count, "fail_count": fail_count, "warn_count": warn_count,
            "total_states": len(states_to_test), "total_records": total_records,
            "duration_seconds": round(duration, 1),
            "failed_states": failed_states, "warn_states": warn_states,
        },
        "per_state": results,
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Bronze Canonical Load Validator")
    parser.add_argument("--load-id", default="LOAD-2024Q4-SEED")
    parser.add_argument("--mode", default="full_seed",
                        choices=["full_seed", "incremental", "change_detection"])
    parser.add_argument("--baseline", default=None)
    parser.add_argument("--workers", type=int, default=50)
    args = parser.parse_args()

    result = run_coordinator(args.load_id, args.mode, args.baseline, args.workers)

    # Save report
    with open(f"report_{args.load_id}.json", "w") as f:
        json.dump(result, f, indent=2)
    print(f"Report saved to report_{args.load_id}.json")

Run: python coordinator.py --load-id LOAD-2024Q4-SEED --mode full_seed

Expected output: A dashboard showing results for all 50 states. NY should FAIL (DT-02), GA should FAIL (DT-01), NV should WARN (DUP-01), and the remaining 47 states should PASS.

Checkpoint
The coordinator dashboard shows 47 PASS, 2 FAIL (NY, GA), 1 WARN (NV). All 50 states are tested in parallel. If you see different counts, check your mock data generation (Step 3) for the deliberate errors.
Troubleshooting
  • All states PASS (no failures) → Re-run python generate_mock_data.py to regenerate mock data with injected errors
  • FileNotFoundError: mock_data/bronze_table.json → You must run Step 3 (generate_mock_data.py) before the coordinator
  • Results are slow → Reduce --workers or check that mock data files are small
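
If you would rather check the counts programmatically than by eye, the comparison can be sketched as below. This is a minimal sketch, assuming the report dictionary has the `summary` shape returned by `run_coordinator`; the helper name `check_report` is hypothetical and not part of the capstone code.

```python
# Expected outcome, copied from the checkpoint text above.
EXPECTED_COUNTS = {"pass_count": 47, "fail_count": 2, "warn_count": 1}
EXPECTED_FAILED = {"NY", "GA"}
EXPECTED_WARN = {"NV"}


def check_report(report: dict) -> list[str]:
    """Return human-readable mismatches; an empty list means the run matches."""
    summary = report["summary"]
    problems = []
    for key, want in EXPECTED_COUNTS.items():
        if summary[key] != want:
            problems.append(f"{key}: expected {want}, got {summary[key]}")
    # Compare as sets so the order of states in the report does not matter.
    if set(summary["failed_states"]) != EXPECTED_FAILED:
        problems.append(f"failed_states: got {sorted(summary['failed_states'])}")
    if set(summary["warn_states"]) != EXPECTED_WARN:
        problems.append(f"warn_states: got {sorted(summary['warn_states'])}")
    return problems
```

To use it, load the saved report with `json.load` and pass the resulting dictionary to `check_report`; any entry in the returned list points at the injected error that is missing from your mock data.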

Three Load Scenarios

The coordinator supports three validation modes. Each mirrors a real production scenario: is this the first-ever load, an incremental update, or a format change?

Scenario A: Full Seed Load

When it runs: Initial load or full reload into an empty Bronze table.

Runs the 12 standard per-state checks plus 4 seed-specific checks (SEED-01 through SEED-04): verifying no pre-existing data, all 50 states present, total record reconciliation, and manifest accuracy.

Command: python coordinator.py --load-id LOAD-2024Q4-SEED --mode full_seed
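
Two of the seed-specific ideas above (all 50 states present, total record reconciliation) can be illustrated with a short sketch. The function names and the manifest shape (a dictionary of expected counts keyed by state code) are assumptions made for illustration; the capstone's own SEED checks may be structured differently.

```python
US_STATES = (
    "AL AK AZ AR CA CO CT DE FL GA HI ID IL IN IA KS KY LA ME MD "
    "MA MI MN MS MO MT NE NV NH NJ NM NY NC ND OH OK OR PA RI SC "
    "SD TN TX UT VT VA WA WV WI WY"
).split()


def missing_states(records: list[dict]) -> list[str]:
    """States with no Bronze records at all after a seed load."""
    present = {r["source_state"] for r in records}
    return sorted(set(US_STATES) - present)


def reconcile_total(records: list[dict], manifest_counts: dict[str, int]) -> dict:
    """Compare the Bronze row count with the total the manifest promised.

    manifest_counts is a hypothetical {state_code: expected_count} mapping.
    """
    expected = sum(manifest_counts.values())
    actual = len(records)
    return {"expected": expected, "actual": actual, "match": expected == actual}
```

An empty list from `missing_states` and `match: True` from `reconcile_total` together indicate the seed load covered every state with the promised volume.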

Scenario B: Incremental Load

When it runs: Quarterly update — adding new data on top of existing Bronze records.

Runs 12 standard checks plus 8 incremental checks (INC-01 through INC-08): verifying new records added correctly, no duplicates from re-load, existing records untouched, UCC-3 amendments/continuations/terminations link properly, and unchanged states have no phantom inserts.

Command: python coordinator.py --load-id LOAD-2025Q1-INC --mode incremental --baseline LOAD-2024Q4-SEED
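
The core of the incremental comparison (new records added, nothing re-loaded twice, existing records untouched) can be sketched as a diff between the baseline and the post-load table. This sketch assumes that `(source_state, filing_number)` identifies a filing; that key choice and the function names are illustrative assumptions, not the capstone's INC check implementations.

```python
def record_key(record: dict) -> tuple[str, str]:
    """Assumed identity of a filing: state plus filing number."""
    return (record["source_state"], record["filing_number"])


def compare_loads(baseline: list[dict], current: list[dict]) -> dict[str, list]:
    """Diff the Bronze table before (baseline) and after (current) a load."""
    base_by_key = {record_key(r): r for r in baseline}
    cur_by_key: dict[tuple[str, str], dict] = {}
    duplicates = []
    for r in current:
        key = record_key(r)
        if key in cur_by_key:
            duplicates.append(key)  # same filing appears more than once
        else:
            cur_by_key[key] = r
    added = [k for k in cur_by_key if k not in base_by_key]
    missing = [k for k in base_by_key if k not in cur_by_key]
    # A baseline record counts as modified if any field differs after the load.
    modified = [k for k, r in base_by_key.items()
                if k in cur_by_key and cur_by_key[k] != r]
    return {
        "added": sorted(added),
        "missing": sorted(missing),
        "modified": sorted(modified),
        "duplicates": sorted(duplicates),
    }
```

For a state whose source file did not change, every list should come back empty. Note that if a loader legitimately rewrites metadata fields such as `load_timestamp` on existing rows, those fields would need to be excluded before comparing.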

Scenario C: Change Detection

When it runs: Pre-validation step when a state's source file looks different from previous quarters.

Runs 10 change-specific checks (CHG-01 through CHG-10): format type, column count, column names, new/removed columns, data type consistency, delimiter, record volume, date format, and encoding. This catches schema drift before bad data enters Bronze.

Command: python coordinator.py --load-id LOAD-2025Q1-INC --mode change_detection
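
As an illustration of the column-level checks (column count, new and removed columns), a drift comparison between last quarter's header and this quarter's might look like the sketch below. The function name and return shape are hypothetical; the CHG checks in the capstone may report results differently.

```python
def detect_column_drift(baseline_columns: list[str],
                        current_columns: list[str]) -> dict:
    """Compare a source file's header against the previous quarter's header."""
    added = [c for c in current_columns if c not in baseline_columns]
    removed = [c for c in baseline_columns if c not in current_columns]
    return {
        "count_changed": len(baseline_columns) != len(current_columns),
        "added": added,
        "removed": removed,
        # Same columns in a different order still breaks positional parsers.
        "reordered": (not added and not removed
                      and list(baseline_columns) != list(current_columns)),
    }
```

A renamed column shows up as one entry in `added` and one in `removed`, which is usually the signal to update that state's transformation rules before loading.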

The Critical Insight

The coordinator agent and state tester agents are identical across all three scenarios. Only the set of checks changes. This is the power of the tool abstraction pattern — the agent's reasoning loop stays the same, the tools determine what it validates.
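
One way to picture "only the set of checks changes" is a lookup from mode to check IDs, sketched below. The SEED, INC, and CHG ID ranges come from the scenario descriptions above; the `STD-xx` IDs are placeholders for the 12 standard checks defined in earlier steps, and the registry itself is an illustrative assumption rather than the capstone's actual structure.

```python
def check_ids(prefix: str, count: int) -> list[str]:
    """Build IDs such as SEED-01 .. SEED-04."""
    return [f"{prefix}-{i:02d}" for i in range(1, count + 1)]


# Placeholder IDs: the real 12 standard checks (DT-01, DT-02, DUP-01, ...)
# are defined in the earlier steps of the capstone.
STANDARD_CHECKS = check_ids("STD", 12)

CHECKS_BY_MODE = {
    "full_seed": STANDARD_CHECKS + check_ids("SEED", 4),
    "incremental": STANDARD_CHECKS + check_ids("INC", 8),
    "change_detection": check_ids("CHG", 10),
}


def checks_for_mode(mode: str) -> list[str]:
    """Return the check IDs a state tester should run for this mode."""
    try:
        return CHECKS_BY_MODE[mode]
    except KeyError:
        raise ValueError(f"Unknown mode: {mode!r}") from None
```

With a table like this, adding a fourth scenario means adding one dictionary entry; the coordinator loop and the state tester loop do not need to change.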

Design Note: Why Pure Python First?

You may notice the core pipeline (Steps 2–8) uses pure Python — no Claude API calls. This is intentional. The agent pattern is the architecture (coordinator + parallel subagents + tool abstraction), not the LLM calls. By building the validation logic in deterministic Python first, you can test and debug it without API costs or latency. The "Going Further" section shows how to add Claude-powered error analysis on top of this foundation — using the LLM where it adds value (interpreting failures, diagnosing root causes) rather than for deterministic checks where Python is faster and cheaper.

Tier 1: Local Production Deployment

No cloud account needed. Uses Docker + DuckDB + local filesystem. This is a real production setup — containerized, with a database, API server, file watcher, and dashboard — just running on your laptop.

About Tier 2 (GCP) and Tier 3 (AWS): The capstone ships only Tier 1 as runnable code. The cloud tiers swap DuckDB → BigQuery, local files → GCS/S3, and the file watcher → Pub/Sub or EventBridge — but the coordinator and state-tester code remain byte-for-byte identical. See "Going Further" for the swap recipe and Q3 for the architectural takeaway.

Step 9: Replace JSON with DuckDB
15 min · tools/bronze_query_local.py

What & Why: DuckDB (an embedded analytical database, like SQLite but optimized for column-oriented analytics queries; it runs in-process with no setup beyond pip install) replaces the JSON file with a real SQL database. This matters because the production version uses BigQuery, and DuckDB lets you exercise the same kind of SQL queries locally. The agent calls get_state_records("NY") either way; only the backend changes.

Install DuckDB: pip install duckdb

Create a new file called tools/bronze_query_local.py:

"""DuckDB Bronze table — local production replacement for BigQuery."""
import duckdb, json, os

DB_PATH = os.environ.get("DB_PATH", "data/bronze.duckdb")

def init_db():
    """Create Bronze table and load mock data into DuckDB."""
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    con = duckdb.connect(DB_PATH)
    con.execute("""
        CREATE TABLE IF NOT EXISTS bronze_filings (
            filing_number VARCHAR, source_state VARCHAR(2),
            filing_type VARCHAR, filing_date VARCHAR,
            lapse_date VARCHAR, status VARCHAR,
            debtor_name VARCHAR, debtor_address VARCHAR,
            debtor_org_type VARCHAR, secured_party_name VARCHAR,
            secured_party_address VARCHAR, collateral_description VARCHAR,
            source_file VARCHAR, load_id VARCHAR, load_timestamp VARCHAR
        )
    """)
    con.execute("DELETE FROM bronze_filings")  # Clean reload
    with open("mock_data/bronze_table.json") as f:
        records = json.load(f)
    for r in records:
        con.execute(
            "INSERT INTO bronze_filings VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            [r.get(k, "") for k in [
                "filing_number", "source_state", "filing_type", "filing_date",
                "lapse_date", "status", "debtor_name", "debtor_address",
                "debtor_org_type", "secured_party_name", "secured_party_address",
                "collateral_description", "source_file", "load_id", "load_timestamp",
            ]]
        )
    con.close()
    print(f"Loaded {len(records)} records into DuckDB at {DB_PATH}")

def get_state_records(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> list[dict]:
    """Same interface as JSON version — drop-in replacement."""
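    con = duckdb.connect(DB_PATH, read_only=True)
    cur = con.execute(
        "SELECT * FROM bronze_filings WHERE source_state = ? AND load_id = ?",
        [state_code, load_id]
    )
    # Build dicts from the cursor metadata. fetchdf() would require pandas,
    # which is not listed in requirements.txt.
    columns = [col[0] for col in cur.description]
    result = [dict(zip(columns, row)) for row in cur.fetchall()]
    con.close()
    return result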

def get_record_count(state_code: str, load_id: str = "LOAD-2024Q4-SEED") -> int:
    con = duckdb.connect(DB_PATH, read_only=True)
    count = con.execute(
        "SELECT COUNT(*) FROM bronze_filings WHERE source_state = ? AND load_id = ?",
        [state_code, load_id]
    ).fetchone()[0]
    con.close()
    return count

if __name__ == "__main__":
    init_db()

Run: python -m tools.bronze_query_local

Expected output:

Loaded 2475 records into DuckDB at data/bronze.duckdb

Verify: python -c "from tools.bronze_query_local import get_record_count; print(get_record_count('NY'))"

Checkpoint
DuckDB returns the same record count as the JSON version. You now have a real SQL database backing your Bronze table.
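
To confirm the checkpoint for every state rather than just NY, the two backends can be compared side by side. The sketch below takes two count functions as arguments, so it does not assume any particular module name for the JSON version; `find_count_mismatches` is a hypothetical helper, not part of the capstone code.

```python
from typing import Callable, Iterable

CountFn = Callable[[str], int]


def find_count_mismatches(states: Iterable[str],
                          count_a: CountFn,
                          count_b: CountFn) -> dict[str, tuple[int, int]]:
    """Return {state: (count_a, count_b)} for every state where backends disagree."""
    mismatches = {}
    for state in states:
        a, b = count_a(state), count_b(state)
        if a != b:
            mismatches[state] = (a, b)
    return mismatches
```

In practice you would pass the JSON-backed count function from the earlier step as `count_a` and `get_record_count` from `tools/bronze_query_local.py` as `count_b`; an empty dictionary means the DuckDB load is a faithful copy.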
Step 10: Docker Compose Local Stack
20 min · docker-compose.yml + Dockerfile

What & Why: Docker Compose runs two containers: the test agent API (FastAPI) and an HTML dashboard that shows the latest saved validation report. The file watcher that auto-triggers validation when new source files appear is part of the Tier 1 design but is not defined in the compose file in this step. The stack mirrors the production architecture (Cloud Run + Pub/Sub + Grafana) but runs entirely on your machine.

Create Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-m", "uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]

Create requirements.txt:

anthropic>=0.39.0
pydantic>=2.0
rich>=13.0
duckdb>=1.0
fastapi>=0.110
uvicorn>=0.27
httpx>=0.27

Create docker-compose.yml:

version: '3.8'
services:
  test-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - DEPLOYMENT_TIER=local
      - DB_PATH=/data/bronze.duckdb
    volumes:
      - ./data:/data
      - ./mock_data:/app/mock_data
      - ./reports:/app/reports

  dashboard:
    build: .
    command: python -m uvicorn dashboard_server:app --host 0.0.0.0 --port 8080
    ports:
      - "8080:8080"
    environment:
      - DB_PATH=/data/bronze.duckdb
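    volumes:
      - ./data:/data
      # dashboard_server.py reads reports/report_*.json, so mount the reports too
      - ./reports:/app/reports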

Create server.py:

"""FastAPI server for Bronze validation agent."""
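import json
import os
from fastapi import FastAPI, HTTPException
from coordinator import run_coordinator

REPORTS_DIR = "reports"

app = FastAPI(title="Bronze Validation Agent")

@app.get("/health")
def health():
    return {"status": "healthy"}

@app.post("/validate")
def validate(load_id: str = "LOAD-2024Q4-SEED", mode: str = "full_seed"):
    """Run Bronze validation, save the report, and return results."""
    result = run_coordinator(load_id, mode)
    # Persist the report so /results/{load_id} and the dashboard can find it.
    os.makedirs(REPORTS_DIR, exist_ok=True)
    with open(os.path.join(REPORTS_DIR, f"report_{load_id}.json"), "w") as f:
        json.dump(result, f, indent=2)
    return result

@app.get("/results/{load_id}")
def get_results(load_id: str):
    """Retrieve saved report for a previous run."""
    try:
        with open(os.path.join(REPORTS_DIR, f"report_{load_id}.json")) as f:
            return json.load(f)
    except FileNotFoundError:
        # Return a real 404 instead of a 200 response carrying an error body.
        raise HTTPException(status_code=404, detail=f"No report found for {load_id}")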

Create dashboard_server.py:

"""Dashboard API: serves the latest saved validation report (JSON)."""
import os, json, glob
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI(title="Bronze Validation Dashboard")

@app.get("/health")
def health():
    return {"status": "healthy"}

@app.get("/", response_class=HTMLResponse)
def dashboard():
    """Serve a simple HTML dashboard of the latest results."""
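    reports = glob.glob("reports/report_*.json")
    if not reports:
        return "<h1>No validation reports found</h1><p>Run a validation first.</p>"
    # Pick the most recently written report; sorting by filename would
    # order by load_id rather than by run time.
    latest = max(reports, key=os.path.getmtime)
    with open(latest) as f:
        data = json.load(f)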
    summary = data.get("summary", {})
    html = f"""<html><body style="font-family:monospace;background:#0A1628;color:#E8ECF1;padding:2rem;">
    <h1>Bronze Validation Dashboard</h1>
    <p>Load: {data.get('load_id')} | Mode: {data.get('mode')}</p>
    <p style="font-size:1.5rem;">
      <span style="color:#10B981;">{summary.get('pass_count',0)} PASS</span> |
      <span style="color:#F43F5E;">{summary.get('fail_count',0)} FAIL</span> |
      <span style="color:#F59E0B;">{summary.get('warn_count',0)} WARN</span>
    </p>
    <p>{summary.get('total_records',0)} records | {summary.get('duration_seconds',0)}s</p>
    </body></html>"""
    return html

Run: docker compose up -d

Verify: curl http://localhost:8000/health (should return {"status":"healthy"})

Checkpoint
Two containers running — the test agent API on port 8000 and the dashboard on port 8080. If Docker isn't available, you can run the FastAPI server directly with uvicorn server:app --port 8000.
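
The file watcher mentioned in the tier overview is not shown in this step. One minimal way to sketch it is a polling loop over a source directory, as below. Everything here is an assumption for illustration: the function names, the polling approach, and the `on_new_file` callback, which in a real setup might send a POST to the `/validate` endpoint.

```python
import os
import time
from typing import Callable, Optional


def find_new_files(directory: str, seen: set[str]) -> list[str]:
    """Return file names not seen before, and remember them in `seen`."""
    with os.scandir(directory) as entries:
        current = {entry.name for entry in entries if entry.is_file()}
    new_files = sorted(current - seen)
    seen.update(new_files)
    return new_files


def watch(directory: str,
          on_new_file: Callable[[str], None],
          interval_seconds: float = 5.0,
          max_polls: Optional[int] = None) -> None:
    """Poll `directory` and call `on_new_file(path)` for each new file.

    Files already present on the first poll are treated as new.
    max_polls=None means poll forever.
    """
    seen: set[str] = set()
    polls = 0
    while max_polls is None or polls < max_polls:
        for name in find_new_files(directory, seen):
            on_new_file(os.path.join(directory, name))
        polls += 1
        if max_polls is None or polls < max_polls:
            time.sleep(interval_seconds)
```

Polling keeps the sketch dependency-free; an event-based watcher would react faster, which is roughly the role Pub/Sub or EventBridge plays in the cloud tiers.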

Test Cases

Run these 5 scenarios to verify your system handles all cases correctly.

| # | Scenario | Command | Expected | Type |
|---|----------|---------|----------|------|
| 1 | Clean state (CA) | python state_tester.py CA | All 12 checks PASS | Happy |
| 2 | Full coordinator run | python coordinator.py | 47 PASS, 2 FAIL, 1 WARN | Happy |
| 3 | Known failure (NY) | python state_tester.py NY | DT-02 FAIL (lapse before filing) | Happy |
| 4 | Duplicate detection (NV) | python state_tester.py NV | DUP-01 WARN (3 duplicates) | Edge |
| 5 | Unknown state code | python state_tester.py ZZ | CFG FAIL (unknown state) | Error |

Final Verification

Run the complete pipeline end-to-end:

python coordinator.py --load-id LOAD-2024Q4-SEED --mode full_seed

You should see a full dashboard with 47 PASS, 2 FAIL (NY on DT-02, GA on DT-01), and 1 WARN (NV on DUP-01). A JSON report is saved automatically.

Congratulations! You've built a parallel state testing agent that validates 50 states simultaneously, catches data quality issues, and produces a structured dashboard — all in under 20 seconds.

Going Further (Optional)

  1. MCP Server for Conversational Testing — Build an MCP server so you can ask Claude: "Hey, how did Georgia's load go?" and get an answer from the test results database.
  2. Fix Recommendation Agent — Add a second agent that analyzes FAIL results and suggests specific transformation rule updates (e.g., "GA's date parser should handle DD/MM/YYYY input").
  3. Trend Analysis — Store results from multiple quarters and flag anomalies (e.g., "Ohio's record count dropped 40% from Q3 to Q4 — is this real or a data feed issue?").
  4. Upgrade to Tier 2 (GCP) — Swap DuckDB for BigQuery, local files for GCS, file watcher for Pub/Sub. The coordinator and state tester code stays identical.
  5. Add Claude-Powered Error Analysis — When a check fails, send the failed records to Claude and ask it to diagnose the root cause and suggest a fix.
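
As a starting point for item 5, the sketch below gathers the failed checks from a coordinator report and turns them into a prompt. It assumes the report has the `per_state` shape returned by `run_coordinator`, and it sends check details rather than raw records to keep the example short. The helper names are hypothetical, and the model name is left as a parameter for you to supply.

```python
def collect_failures(report: dict) -> list[dict]:
    """Flatten every FAIL check in the report into a list of small dicts."""
    failures = []
    for state, state_result in sorted(report["per_state"].items()):
        for check in state_result["results"]:
            if check["status"] == "FAIL":
                failures.append({
                    "state": state,
                    "check_id": check["check_id"],
                    "details": check["details"],
                })
    return failures


def build_diagnosis_prompt(report: dict) -> str:
    """Build a prompt describing the failures; empty string if none failed."""
    failures = collect_failures(report)
    if not failures:
        return ""
    lines = [f"- {f['state']} {f['check_id']}: {f['details']}" for f in failures]
    return (
        f"These Bronze validation checks failed for load {report['load_id']}:\n"
        + "\n".join(lines)
        + "\n\nFor each failure, suggest the most likely root cause in the "
          "state's transformation rules and a specific fix."
    )


def ask_claude(prompt: str, model: str) -> str:
    """Send the prompt to Claude. Requires the anthropic package and
    ANTHROPIC_API_KEY in the environment; `model` is chosen by the caller."""
    import anthropic  # imported here so the helpers above work without it
    client = anthropic.Anthropic()
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text
```

Keeping prompt construction separate from the API call means the deterministic part can be unit-tested without cost, in line with the "pure Python first" design note.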

Knowledge Check

Q1: Why does the coordinator spawn 50 separate state testers instead of running one agent that tests all 50 states sequentially?

Q2: Check DT-01 verifies date FORMAT (YYYY-MM-DD). Check DT-02 verifies date VALUES (no future dates, no lapse before filing). Why are these separate checks?

Q3: The coordinator and state tester agents are identical across Tier 1 (local), Tier 2 (GCP), and Tier 3 (AWS). What changes between tiers?

Q4: Nevada's DUP-01 check returns WARN (not FAIL) for 3 duplicate filing numbers. Why WARN instead of FAIL?

Q5: In Scenario C (Change Detection), why does the agent check for format changes BEFORE loading data into Bronze?

Q6: The empty state file (EMPTY_STATE.csv) passes ALL checks. Why is "0 source = 0 Bronze" considered valid?