Building AI Agents with Claude
Capstone Project 4
Capstone 4 of 5 · 2–3 hours · Domain C: Public Records / UCC

Capstone 4 — Domain C: UCC Data Pipeline Orchestration

Build a multi-agent pipeline with four specialized agents, human-in-the-loop review for entity conflicts, and a circuit breaker for data quality failures.

Project Brief

Business Context

A commercial data provider ingests bulk UCC filing data from 50+ Secretary of State offices. Each state delivers data in a different format — Delaware sends XML, New York sends fixed-width text, Texas sends CSV. A single batch from Delaware might contain 245 filings that need to be parsed, normalized, quality-checked, and converted into risk reports.

The pain of doing this manually is both cost and error rate. Human data engineers process about 50 filings per hour. At 245 filings per batch and 50+ states, that is hundreds of engineering hours per month. Worse, manual processing has a 3–5% error rate on entity name normalization — which means 3–5% of lien risk profiles are wrong.
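The arithmetic behind "hundreds of engineering hours per month" can be sketched as follows. The brief does not say how often each state delivers a batch or how batch sizes vary, so this sketch assumes one Delaware-sized batch per state per month; treat both as illustrative assumptions rather than figures from the provider.

```python
# Back-of-the-envelope workload estimate using the figures quoted in the brief.
FILINGS_PER_BATCH = 245           # example Delaware batch size
FILINGS_PER_HOUR = 50             # manual throughput per data engineer
STATES = 50                       # lower bound of "50+" offices
BATCHES_PER_STATE_PER_MONTH = 1   # ASSUMPTION: cadence is not stated in the brief

hours_per_batch = FILINGS_PER_BATCH / FILINGS_PER_HOUR
monthly_hours = hours_per_batch * STATES * BATCHES_PER_STATE_PER_MONTH

# A 3-5% manual normalization error rate applied to a single batch.
wrong_low = FILINGS_PER_BATCH * 0.03
wrong_high = FILINGS_PER_BATCH * 0.05

print(f"{hours_per_batch:.1f} hours per batch, about {monthly_hours:.0f} hours per month")
print(f"roughly {wrong_low:.0f} to {wrong_high:.0f} mis-normalized filings per batch")
```

Under these assumptions a single batch costs just under five engineering hours, and the monthly total scales linearly with the number of states and the delivery cadence.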

Your multi-agent pipeline automates this: an Ingestion Agent parses raw files, a Transformation Agent normalizes the data, a Quality Agent validates quality, and a Reporting Agent generates risk reports. A data steward reviews entity resolution conflicts where confidence is below 80%.

  • Ingestion Agent: The first agent in the pipeline. It detects file formats, runs the appropriate parser, and validates the raw data against expected schemas. If the parse error rate exceeds 10%, it triggers the circuit breaker to quarantine the batch.
  • Transformation Agent: The second agent. It normalizes entity names, classifies collateral types, and maps raw data into the canonical schema (Bronze → Silver layer in the Medallion Architecture). Handles entity resolution with fuzzy matching.
  • Quality Agent: The third agent. It runs data quality checks: completeness (all required fields present), consistency (dates make sense, states match), and freshness (filing dates are recent). Flags anomalies like sudden drops in filing counts.
  • Reporting Agent: The fourth agent. It generates lien summary reports, risk profiles, and portfolio exposure analytics from the validated data (Silver → Gold layer). Applies PII redaction for public-facing reports.

What You Will Build

A four-agent pipeline with:

  • Agent 1 — Ingestion: Parse raw filing data, validate format, detect errors. Tools: detect_file_format, parse_filing_batch, validate_schema
  • Agent 2 — Transformation: Normalize names, classify collateral, resolve entities. Tools: normalize_entity_name, classify_collateral, resolve_entity
  • Agent 3 — Quality: Check completeness, consistency, freshness. Flag anomalies. Tools: run_quality_checks, detect_anomalies, generate_quality_scorecard
  • Agent 4 — Reporting: Generate risk profiles and lien summaries. Tools: generate_risk_profile, generate_lien_summary, redact_pii

Production patterns:

  • Human-in-the-loop: Data steward reviews entity resolution conflicts (confidence < 80%). The pipeline pauses and routes the decision to a human reviewer, who can match, create a new entity, or flag for investigation.
  • Circuit breaker: If parse error rate exceeds 10%, quarantine the batch and alert the data engineering team. This automatic safety mechanism halts the pipeline so that bad data does not flow downstream.
  • Structured state: Typed pipeline state object flows between agents (not free text)

Prerequisites

Complete these modules before starting this capstone:

  • M12 — ReAct Pattern: You need the agentic loop pattern (observe → think → act) that each agent in this pipeline uses internally.
  • M14 — Multi-Agent Systems: You need to understand how to wire multiple agents together with structured state passing and handoffs.
  • M16 — Input Guardrails: The Ingestion Agent applies file-integrity and per-state schema validation at the door — M16 covers those guardrail patterns.
  • M17 — Human-in-the-Loop & Guardrails: This capstone's HITL review flow and circuit breaker pattern come directly from M17.
  • M18 — Evaluation: The pytest test suite at the end uses M18’s scoring patterns to gate releases (the production capstone, 5-C, scales this to a 100-case harness).

You should also be comfortable with Python classes, JSON data structures, and running scripts from the command line.

Environment Setup

What You'll Build

A multi-agent UCC data pipeline with four specialized agents, human-in-the-loop entity resolution, and a circuit breaker for data quality failures.

Time estimate: 2–3 hours   Difficulty: ★★★★☆

Requirements

Install Everything

Copy and paste the block for your operating system into your terminal.

macOS / Linux:

mkdir capstone-4-multi-agent-ucc && cd capstone-4-multi-agent-ucc
python3 -m venv venv && source venv/bin/activate
pip install anthropic pydantic
export ANTHROPIC_API_KEY=your-key-here

Windows (Command Prompt):

mkdir capstone-4-multi-agent-ucc && cd capstone-4-multi-agent-ucc
python -m venv venv && venv\Scripts\activate
pip install anthropic pydantic
set ANTHROPIC_API_KEY=your-key-here
Checkpoint

Run python -c "import anthropic; print('OK')". You should see OK. If you see ModuleNotFoundError, make sure your virtual environment is activated.

File Structure

Here is every file you will create in this capstone. The build guide walks through them in order.

capstone-4-multi-agent-ucc/
├── mock_data.py            # 15 realistic UCC filings + helper functions
├── agents/
│   ├── __init__.py         # Package init
│   ├── ingestion_agent.py  # Agent 1: format detection, parsing, schema validation
│   ├── transform_agent.py  # Agent 2: entity normalization, collateral classification
│   ├── quality_agent.py    # Agent 3: completeness, consistency, anomaly detection
│   └── reporting_agent.py  # Agent 4: risk profiles, lien summaries, PII redaction
├── quality_gate.py         # Circuit breaker + HITL review logic
├── coordinator.py          # Main orchestrator wiring all 4 agents together
├── test_pipeline.py        # 5 test scenarios (happy, edge, error)
└── requirements.txt        # anthropic, pydantic

Domain Glossary

Medallion Architecture
A data pipeline pattern with three layers: Bronze (raw ingestion), Silver (cleaned/normalized), Gold (business-ready analytics). Each layer adds more structure and value.
Circuit Breaker
An automatic safety mechanism that halts processing when error rates exceed a threshold. Prevents bad data from flowing downstream and contaminating reports.
Data Steward
A human reviewer responsible for resolving data quality issues that automated systems cannot handle — like ambiguous entity matches or anomalous filing patterns.
Pipeline State
A typed object that tracks the batch through all four agents. Each agent reads the previous agent's output and writes its own. Enables observability and resumability.
Collateral Classification
Mapping free-text collateral descriptions ("all assets incl inventory, accts recv, equip") to standardized UCC categories (inventory, accounts, equipment).
Quality Scorecard
A per-batch report showing completeness %, consistency %, anomaly count, and an overall quality grade. Used by data stewards to decide if a batch is production-ready.

Pipeline Architecture

Multi-Agent Pipeline: Ingestion → Transformation → Quality → Reporting

  • Ingestion: detect_format, parse_batch, validate_schema (>10% errors = HALT)
  • Transform: normalize_name, classify_collateral, resolve_entity (HITL: confidence < 80%)
  • Quality: run_checks, detect_anomalies, scorecard
  • Reporting: risk_profile, lien_summary, redact_pii

Circuit Breaker Pattern

The circuit breaker monitors the parse error rate during ingestion. If more than 10% of records fail parsing, the batch is quarantined and the pipeline halts — preventing bad data from flowing into transformation, quality checks, and reports.

[Interactive figure: Circuit Breaker error rate monitor. A parse error counter (threshold: 10%) steps through records 1–10 of a 100-record batch; the initial status is HEALTHY.]
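A minimal sketch of the threshold check described above. The function name `check_circuit_breaker`, the `BreakerResult` type, and the "tripped" status label are assumptions made for illustration; only the 10% threshold and the "healthy" status come from this guide. The comparison is strict because the text says the rate must exceed 10%.

```python
from dataclasses import dataclass


@dataclass
class BreakerResult:
    parse_error_rate: float
    threshold: float
    status: str  # "healthy" or "tripped" ("tripped" is a hypothetical label)


def check_circuit_breaker(total_records: int, parse_errors: int,
                          threshold: float = 0.10) -> BreakerResult:
    """Compare the batch parse error rate against the threshold."""
    # Guard against an empty batch so we never divide by zero.
    rate = parse_errors / total_records if total_records else 0.0
    # Strictly greater than: a batch at exactly 10% is still allowed through.
    status = "tripped" if rate > threshold else "healthy"
    return BreakerResult(round(rate, 4), threshold, status)


# The Delaware example batch: 7 parse errors out of 245 records.
print(check_circuit_breaker(245, 7))
# A small batch with 3 bad records out of 15.
print(check_circuit_breaker(15, 3))
```

A caller would quarantine the batch and alert the team whenever `status` is not "healthy", and skip the transformation, quality, and reporting stages for that batch.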

Human-in-the-Loop Review Flow

When the Transformation Agent encounters an entity resolution match with confidence below 80%, the pipeline pauses and routes the conflict to a data steward. The steward reviews the two candidate entities, the match score, and the filing details, then decides: merge, keep separate, or flag for further review. Once the decision is recorded, the pipeline resumes with the steward's resolution applied.

HITL Review Flow: Transform Agent → Conflict Detected (confidence: 76%) → Pipeline Paused → Data Steward Reviews Entities → Decision Applied (merge_to_a) → Pipeline Resumes (RUNNING)
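The review flow can be sketched as two small functions: one that splits matches by confidence, and one that applies a steward's decision. The names `split_for_review` and `apply_decision`, and the decision strings other than `merge_to_a`, are hypothetical; this is an illustration of the pattern, not the capstone's `quality_gate.py`.

```python
REVIEW_THRESHOLD = 0.80  # matches below this confidence go to a data steward

# Three example matches, using the same shape as the pipeline state in this guide.
MATCHES = [
    {"filing_number": "2024-0099200",
     "candidate_a": {"id": "ENT-00234", "name": "Acme Logistics LLC"},
     "candidate_b": {"id": "ENT-00234-ALT", "name": "ACME LOGISTICS, L.L.C."},
     "confidence": 0.76},
    {"filing_number": "2024-0099999",
     "candidate_a": {"id": "ENT-00891", "name": "Pacific Coast Distributors, Inc."},
     "candidate_b": {"id": "ENT-00891-ALT", "name": "Pacific Coast Distributors Inc"},
     "confidence": 0.92},
    {"filing_number": "2024-0099950",
     "candidate_a": {"id": "ENT-00234", "name": "Acme Logistics LLC"},
     "candidate_b": {"id": "ENT-09950", "name": "Acme  Logistics   LLC"},
     "confidence": 0.68},
]


def split_for_review(matches, threshold=REVIEW_THRESHOLD):
    """Return (auto_accepted, needs_review) lists based on confidence."""
    auto, review = [], []
    for match in matches:
        (review if match["confidence"] < threshold else auto).append(match)
    return auto, review


def apply_decision(match, decision):
    """Record a steward decision on a match and return the updated copy."""
    if decision == "merge_to_a":
        resolved = match["candidate_a"]["id"]
    elif decision == "merge_to_b":          # hypothetical option
        resolved = match["candidate_b"]["id"]
    elif decision in ("keep_separate", "flag_for_review"):  # hypothetical options
        resolved = None
    else:
        raise ValueError(f"Unknown steward decision: {decision}")
    return {**match, "steward_decision": decision, "resolved_entity_id": resolved}


auto, review = split_for_review(MATCHES)
print(f"{len(auto)} auto-accepted, {len(review)} routed to the steward")
print(apply_decision(review[0], "merge_to_a")["resolved_entity_id"])
```

In a real pipeline the pause would persist the pending reviews and resume only once every one of them carries a recorded decision.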

Pipeline State Schema

{
  "batch_id": "BATCH-2024-0310-DE",
  "source_state": "DE",
  "stage": "transformation",
  "created_at": "2024-03-10T06:00:00Z",
  "ingestion_output": {
    "total_records": 245,
    "parsed_successfully": 238,
    "parse_errors": 7,
    "error_rate": 0.0286,
    "file_format": "XML",
    "records": [
      {
        "raw_filing_number": "2024-0098765",
        "raw_debtor_name": "ACME LOGISTICS, L.L.C.",
        "raw_secured_party": "First National Bank",
        "raw_collateral": "All assets incl inventory, accts recv, equip",
        "filing_date": "2024-03-08",
        "filing_type": "UCC-1"
      }
    ]
  },
  "transformation_output": {
    "records_transformed": 238,
    "entity_matches": [
      {
        "filing_number": "2024-0099200",
        "candidate_a": {"id": "ENT-00234", "name": "Acme Logistics LLC"},
        "candidate_b": {"id": "ENT-00234-ALT", "name": "ACME LOGISTICS, L.L.C."},
        "confidence": 0.76
      },
      {
        "filing_number": "2024-0099950",
        "candidate_a": {"id": "ENT-00234", "name": "Acme Logistics LLC"},
        "candidate_b": {"id": "ENT-09950", "name": "Acme  Logistics   LLC"},
        "confidence": 0.68
      }
    ]
  },
  "quality_output": null,
  "reporting_output": null,
  "circuit_breaker": {
    "parse_error_rate": 0.0286,
    "threshold": 0.10,
    "status": "healthy"
  }
}

Data steward review request (entity resolution conflict):

{
  "review_id": "DSR-2024-0445",
  "batch_id": "BATCH-2024-0310-DE",
  "type": "entity_resolution_conflict",
  "record_filing_number": "2024-0098765",
  "candidate_matches": [
    {"canonical_id": "ENT-00234", "name": "Acme Logistics LLC", "confidence": 0.76},
    {"canonical_id": "ENT-08891", "name": "Acme Logistics Inc.", "confidence": 0.71}
  ],
  "steward_options": [
    "match_to_ENT-00234",
    "match_to_ENT-08891",
    "create_new_entity",
    "flag_for_investigation"
  ]
}
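One way to make the pipeline state a typed object rather than a loose dictionary is sketched below. It uses standard-library dataclasses so it runs without extra dependencies; the capstone's requirements list pydantic, which could be swapped in to add field validation. The class names and the `record_output` method are assumptions for illustration, and the per-agent outputs are left as plain dicts to keep the sketch short.

```python
from dataclasses import asdict, dataclass, field
from typing import Optional

STAGES = ("ingestion", "transformation", "quality", "reporting")


@dataclass
class CircuitBreakerState:
    parse_error_rate: float = 0.0
    threshold: float = 0.10
    status: str = "healthy"


@dataclass
class PipelineState:
    batch_id: str
    source_state: str
    created_at: str
    stage: str = "ingestion"
    ingestion_output: Optional[dict] = None
    transformation_output: Optional[dict] = None
    quality_output: Optional[dict] = None
    reporting_output: Optional[dict] = None
    circuit_breaker: CircuitBreakerState = field(default_factory=CircuitBreakerState)

    def record_output(self, stage: str, output: dict) -> None:
        """Store one agent's output and mark that stage as the current one."""
        if stage not in STAGES:
            raise ValueError(f"Unknown stage: {stage}")
        setattr(self, f"{stage}_output", output)
        self.stage = stage


state = PipelineState(batch_id="BATCH-2024-0310-DE", source_state="DE",
                      created_at="2024-03-10T06:00:00Z")
state.record_output("ingestion", {"total_records": 245, "parsed_successfully": 238,
                                  "parse_errors": 7, "error_rate": 0.0286})
state.circuit_breaker.parse_error_rate = 0.0286

# asdict() yields a JSON-serializable dict shaped like the schema in this section.
print(asdict(state)["stage"], asdict(state)["quality_output"])
```

Stages that have not run yet stay `None`, which mirrors the `null` values for `quality_output` and `reporting_output` in the example state.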

Step-by-Step Build Guide

Now you understand the architecture and data flow. Let's build it piece by piece. Each step creates one file, gives you a command to test it, and shows the expected output. Follow the steps in order — later steps depend on earlier ones.

Step 1: Create the Mock Data (mock_data.py)

What & Why: Before we can build any agents, we need realistic data to feed them. This file contains 15 UCC filings that cover the variety you would see in production: UCC-1 originals, UCC-3 amendments, continuations, and terminations. Some filings have intentional quality issues (missing fields, garbled text) so we can test the circuit breaker and quality agent later.

Create a new file called mock_data.py:

# mock_data.py — 15 realistic UCC filings for pipeline testing
# Includes UCC-1 originals, UCC-3 amendments/continuations/terminations,
# and filings with data quality issues for circuit breaker testing.

MOCK_FILINGS = [
    # --- Clean UCC-1 filings (happy path) ---
    {
        "filing_number": "2024-0098765",
        "debtor_name": "ACME LOGISTICS, L.L.C.",
        "secured_party_name": "First National Bank",
        "state_code": "DE",
        "filing_date": "2024-03-08",
        "filing_type": "UCC-1",
        "collateral_description": "All assets incl inventory, accts recv, equip",
        "lapse_date": "2029-03-08"
    },
    {
        "filing_number": "2024-0098801",
        "debtor_name": "BuildRight Construction Inc.",
        "secured_party_name": "Commerce Capital Group",
        "state_code": "DE",
        "filing_date": "2024-03-09",
        "filing_type": "UCC-1",
        "collateral_description": "All inventory and equipment located at 123 Industrial Pkwy",
        "lapse_date": "2029-03-09"
    },
    {
        "filing_number": "2024-0099012",
        "debtor_name": "GreenField Ag Solutions LLC",
        "secured_party_name": "AgriFinance Corp",
        "state_code": "TX",
        "filing_date": "2024-03-10",
        "filing_type": "UCC-1",
        "collateral_description": "Farm products, crops, livestock, and farm equipment",
        "lapse_date": "2029-03-10"
    },
    {
        "filing_number": "2024-0099200",
        "debtor_name": "Acme Logistics LLC",
        "secured_party_name": "Second Regional Bank",
        "state_code": "NY",
        "filing_date": "2024-03-11",
        "filing_type": "UCC-1",
        "collateral_description": "Accounts receivable and general intangibles",
        "lapse_date": "2029-03-11"
    },
    {
        "filing_number": "2024-0099301",
        "debtor_name": "Pacific Coast Distributors, Inc.",
        "secured_party_name": "Western Commercial Lending",
        "state_code": "CA",
        "filing_date": "2024-03-12",
        "filing_type": "UCC-1",
        "collateral_description": "All assets, including but not limited to inventory, equipment, accounts",
        "lapse_date": "2029-03-12"
    },
    {
        "filing_number": "2024-0099450",
        "debtor_name": "MedTech Innovations Corp",
        "secured_party_name": "HealthVentures Capital",
        "state_code": "MA",
        "filing_date": "2024-03-13",
        "filing_type": "UCC-1",
        "collateral_description": "All intellectual property, patents, trademarks, and related proceeds",
        "lapse_date": "2029-03-13"
    },
    {
        "filing_number": "2024-0099510",
        "debtor_name": "Summit Energy Holdings LLC",
        "secured_party_name": "National Infrastructure Bank",
        "state_code": "TX",
        "filing_date": "2024-03-14",
        "filing_type": "UCC-1",
        "collateral_description": "Oil and gas leases, mineral rights, extraction equipment",
        "lapse_date": "2029-03-14"
    },

    # --- UCC-3 amendments/continuations/terminations ---
    {
        "filing_number": "2024-0099601",
        "debtor_name": "ACME LOGISTICS, L.L.C.",
        "secured_party_name": "First National Bank",
        "state_code": "DE",
        "filing_date": "2024-03-15",
        "filing_type": "UCC-3",
        "amendment_type": "continuation",
        "original_filing_number": "2019-0045123",
        "collateral_description": "All assets incl inventory, accts recv, equip",
        "lapse_date": "2029-03-15"
    },
    {
        "filing_number": "2024-0099701",
        "debtor_name": "BuildRight Construction Inc.",
        "secured_party_name": "Commerce Capital Group",
        "state_code": "DE",
        "filing_date": "2024-03-15",
        "filing_type": "UCC-3",
        "amendment_type": "amendment",
        "original_filing_number": "2022-0078901",
        "collateral_description": "All inventory, equipment, AND fixtures located at 123 Industrial Pkwy and 456 Commerce Dr",
        "lapse_date": "2027-06-01"
    },
    {
        "filing_number": "2024-0099801",
        "debtor_name": "Riverside Manufacturing Co.",
        "secured_party_name": "Midwest Commercial Finance",
        "state_code": "IL",
        "filing_date": "2024-03-16",
        "filing_type": "UCC-3",
        "amendment_type": "termination",
        "original_filing_number": "2019-0034567",
        "collateral_description": "Terminated - all collateral released",
        "lapse_date": None
    },

    # --- Filings with data quality issues (for circuit breaker / quality testing) ---
    {
        "filing_number": "2024-0099901",
        "debtor_name": "",
        "secured_party_name": "Unknown Lender",
        "state_code": "DE",
        "filing_date": "2024-03-17",
        "filing_type": "UCC-1",
        "collateral_description": "All assets",
        "lapse_date": "2029-03-17",
        "parse_error": True,
        "error_detail": "Missing required field: debtor_name"
    },
    {
        "filing_number": "2024-0099902",
        "debtor_name": "&#CORRUPT_DATA_%%^&",
        "secured_party_name": "&#CORRUPT_DATA_%%^&",
        "state_code": "XX",
        "filing_date": "not-a-date",
        "filing_type": "UCC-1",
        "collateral_description": "",
        "lapse_date": None,
        "parse_error": True,
        "error_detail": "Multiple parse failures: invalid state_code, invalid date, garbled names"
    },
    {
        "filing_number": "",
        "debtor_name": "Ghost Entity Partners",
        "secured_party_name": "Shadow Capital LLC",
        "state_code": "NV",
        "filing_date": "2024-03-18",
        "filing_type": "UCC-1",
        "collateral_description": "All assets",
        "lapse_date": "2029-03-18",
        "parse_error": True,
        "error_detail": "Missing required field: filing_number"
    },
    {
        "filing_number": "2024-0099950",
        "debtor_name": "Acme  Logistics   LLC",
        "secured_party_name": "Third Party Lender",
        "state_code": "DE",
        "filing_date": "2024-03-19",
        "filing_type": "UCC-1",
        "collateral_description": "All assets",
        "lapse_date": "2029-03-19"
    },
    {
        "filing_number": "2024-0099999",
        "debtor_name": "Pacific Coast Distributors Inc",
        "secured_party_name": "Eastern Trade Finance",
        "state_code": "CA",
        "filing_date": "2024-03-20",
        "filing_type": "UCC-1",
        "collateral_description": "Inventory",
        "lapse_date": "2029-03-20"
    },
]


# Entity resolution test data — pre-computed match candidates
# The transformation agent uses this to simulate entity resolution.
ENTITY_MATCHES = [
    {
        "filing_number": "2024-0099200",
        "candidate_a": {"id": "ENT-00234", "name": "Acme Logistics LLC"},
        "candidate_b": {"id": "ENT-00234-ALT", "name": "ACME LOGISTICS, L.L.C."},
        "confidence": 0.76
    },
    {
        "filing_number": "2024-0099999",
        "candidate_a": {"id": "ENT-00891", "name": "Pacific Coast Distributors, Inc."},
        "candidate_b": {"id": "ENT-00891-ALT", "name": "Pacific Coast Distributors Inc"},
        "confidence": 0.92
    },
    {
        "filing_number": "2024-0099950",
        "candidate_a": {"id": "ENT-00234", "name": "Acme Logistics LLC"},
        "candidate_b": {"id": "ENT-09950", "name": "Acme  Logistics   LLC"},
        "confidence": 0.68
    },
]


def get_clean_filings():
    """Return only filings without parse errors."""
    return [f for f in MOCK_FILINGS if not f.get("parse_error")]


def get_error_filings():
    """Return only filings with parse errors."""
    return [f for f in MOCK_FILINGS if f.get("parse_error")]


def get_low_confidence_matches(threshold=0.80):
    """Return entity matches with confidence below threshold."""
    return [m for m in ENTITY_MATCHES if m["confidence"] < threshold]


def compute_error_rate(filings):
    """Compute the parse error rate for a list of filings."""
    if not filings:
        return 0.0
    errors = sum(1 for f in filings if f.get("parse_error"))
    return errors / len(filings)

Run it:

python -c "from mock_data import MOCK_FILINGS, compute_error_rate; print(f'{len(MOCK_FILINGS)} filings, error rate: {compute_error_rate(MOCK_FILINGS):.1%}')"

Expected output:

15 filings, error rate: 20.0%
Checkpoint

You should see 15 filings with a 20.0% error rate. The error rate is above the 10% threshold because this is the full dataset including bad records. In practice, a batch with this error rate would trip the circuit breaker. If you see an ImportError, make sure you are running the command from the capstone-4-multi-agent-ucc/ directory.

Step 2: Create the Agents Package (agents/)

What & Why: Each agent is a separate module with its own system prompt and tools. This separation of concerns means you can test, debug, and modify each agent independently. We start with the __init__.py and then the ingestion agent.

Create the agents/ directory and agents/__init__.py:

# agents/__init__.py
from .ingestion_agent import run_ingestion
from .transform_agent import run_transformation
from .quality_agent import run_quality_checks
from .reporting_agent import run_reporting

Create agents/ingestion_agent.py. This agent detects the file format, parses each filing record, and validates the schema. It returns a structured output with parsed records and error counts.

# agents/ingestion_agent.py — Agent 1: Parse and validate raw filing data
import anthropic
import json

SYSTEM_PROMPT = """You are the Ingestion Agent for a UCC data pipeline.
Your job: parse raw filing records, validate required fields, and report errors.
Required fields: filing_number, debtor_name, secured_party_name, state_code,
filing_date, filing_type, collateral_description.
If a record has parse_error=True, count it as a parse failure.
Call the parse_filing_batch tool with the records to process them."""

TOOLS = [{
    "name": "parse_filing_batch",
    "description": "Parse and validate a batch of raw UCC filing records. Returns parsed records and error counts.",
    "input_schema": {
        "type": "object",
        "properties": {
            "records": {"type": "array", "description": "Raw filing records to parse"},
            "source_state": {"type": "string", "description": "State code for this batch"}
        },
        "required": ["records", "source_state"]
    }
}]


def handle_parse_filing_batch(records, source_state):
    """Mock tool handler: validate records and separate clean from errors."""
    parsed = []
    errors = []
    required_fields = ["filing_number", "debtor_name", "secured_party_name",
                        "state_code", "filing_date", "filing_type",
                        "collateral_description"]

    for record in records:
        if record.get("parse_error"):
            errors.append({
                "filing_number": record.get("filing_number", "UNKNOWN"),
                "error": record.get("error_detail", "Parse error flagged"),
            })
            continue

        missing = [f for f in required_fields if not record.get(f)]
        if missing:
            errors.append({
                "filing_number": record.get("filing_number", "UNKNOWN"),
                "error": f"Missing fields: {', '.join(missing)}",
            })
            continue

        parsed.append(record)

    return {
        "total_records": len(records),
        "parsed_successfully": len(parsed),
        "parse_errors": len(errors),
        "error_rate": len(errors) / max(len(records), 1),
        "source_state": source_state,
        "parsed_records": parsed,
        "error_details": errors,
    }


def run_ingestion(client, filings, source_state="DE"):
    """Run the ingestion agent on a batch of filings."""
    messages = [{
        "role": "user",
        "content": f"Parse this batch of {len(filings)} UCC filings from state {source_state}.\n"
                   f"Records:\n{json.dumps(filings, indent=2, default=str)}"
    }]

    for _ in range(5):
        try:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=4096,
                system=SYSTEM_PROMPT,
                tools=TOOLS,
                messages=messages,
            )
        except anthropic.APIError as e:
            return {"error": str(e), "stage": "ingestion"}

        if response.stop_reason == "tool_use":
            tool_results = []
            for block in response.content:
                if block.type == "tool_use" and block.name == "parse_filing_batch":
                    result = handle_parse_filing_batch(
                        block.input.get("records", filings),
                        block.input.get("source_state", source_state),
                    )
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result, default=str),
                    })
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
        elif response.stop_reason == "end_turn":
            text = " ".join(b.text for b in response.content if hasattr(b, "text"))
            # Return the structured result from the tool handler
            result = handle_parse_filing_batch(filings, source_state)
            result["agent_summary"] = text
            return result

    return {"error": "max iterations reached", "stage": "ingestion"}

Troubleshooting:

  • If you see mkdir: cannot create directory 'agents' — the directory already exists, that's fine.
  • If you see ImportError: cannot import name 'run_transformation' — that's expected until you complete the next steps. The __init__.py imports all four agents.

Step 3: Create the Transformation Agent (agents/transform_agent.py)

What & Why: The transformation agent takes parsed records and normalizes them: standardizing entity names (removing extra spaces, normalizing LLC/L.L.C. variations), classifying collateral types, and performing entity resolution with fuzzy matching. This is where most data quality value is created — and where HITL review gets triggered for ambiguous matches.

Create agents/transform_agent.py:

# agents/transform_agent.py — Agent 2: Normalize, classify, resolve entities
import anthropic
import json
import re

SYSTEM_PROMPT = """You are the Transformation Agent for a UCC data pipeline.
Your job: normalize entity names, classify collateral into standard UCC categories,
and resolve entities across filings using fuzzy matching.
Call normalize_entity_name for each debtor, then classify_collateral for each filing."""

TOOLS = [
    {
        "name": "normalize_entity_name",
        "description": "Normalize a business entity name by standardizing abbreviations, removing extra whitespace, and fixing common variations.",
        "input_schema": {
            "type": "object",
            "properties": {"name": {"type": "string", "description": "Raw entity name to normalize"}},
            "required": ["name"]
        }
    },
    {
        "name": "classify_collateral",
        "description": "Classify free-text collateral description into standard UCC categories.",
        "input_schema": {
            "type": "object",
            "properties": {"description": {"type": "string"}},
            "required": ["description"]
        }
    },
]

COLLATERAL_CATEGORIES = {
    "inventory": ["inventory", "stock", "goods", "merchandise"],
    "equipment": ["equipment", "equip", "machinery", "vehicles"],
    "accounts": ["accounts", "accts", "receivable", "receivables"],
    "general_intangibles": ["intangible", "intellectual property", "patents", "trademarks"],
    "farm_products": ["farm", "crops", "livestock", "agricultural"],
    "fixtures": ["fixtures", "fixture"],
    "minerals": ["oil", "gas", "mineral", "extraction"],
    "all_assets": ["all assets"],
}


def normalize_name(name):
    """Standardize entity name formatting."""
    name = re.sub(r'\s+', ' ', name.strip())
    name = re.sub(r',\s*$', '', name)
    replacements = {
        "L.L.C.": "LLC", "l.l.c.": "LLC",
        " Inc.": " Inc", " Corp.": " Corp",
        " Co.": " Co", " Ltd.": " Ltd",
    }
    for old, new in replacements.items():
        name = name.replace(old, new)
    # Strip a comma directly before a corporate suffix: "ACME, LLC" -> "ACME LLC"
    name = re.sub(r',\s+(LLC|Inc|Corp|Co|Ltd)\b', r' \1', name)
    return name


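def classify_collateral(description):
    """Map collateral description to standard UCC categories."""
    desc_lower = description.lower()
    categories = []
    for category, keywords in COLLATERAL_CATEGORIES.items():
        # Match keywords only at the start of a word so that "stock" does not
        # fire on "livestock", while "equip" still matches "equipment".
        if any(re.search(r'\b' + re.escape(kw), desc_lower) for kw in keywords):
            categories.append(category)
    return categories if categories else ["other"]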


def run_transformation(client, ingestion_output, entity_matches=None):
    """Run the transformation agent on ingestion output."""
    records = ingestion_output.get("parsed_records", [])
    if entity_matches is None:
        from mock_data import ENTITY_MATCHES
        entity_matches = ENTITY_MATCHES

    transformed = []
    for record in records:
        normalized = {
            **record,
            "normalized_debtor": normalize_name(record.get("debtor_name", "")),
            "normalized_secured_party": normalize_name(record.get("secured_party_name", "")),
            "collateral_categories": classify_collateral(record.get("collateral_description", "")),
        }
        transformed.append(normalized)

    return {
        "stage": "transformation",
        "records_transformed": len(transformed),
        "transformed_records": transformed,
        "entity_matches": entity_matches,
    }
Checkpoint

You now have both the ingestion and transformation agents. The transformation agent normalizes "ACME LOGISTICS, L.L.C." to "ACME LOGISTICS LLC" and classifies "All assets incl inventory, accts recv, equip" into ["inventory", "equipment", "accounts", "all_assets"].
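The agent above relies on pre-computed match candidates rather than computing a match score itself. As a rough illustration of what a fuzzy-match confidence could look like, the sketch below compares canonicalized names with the standard library's `difflib.SequenceMatcher`. The helper names `canonical_key` and `match_confidence` are hypothetical, and the scores this produces will not reproduce the confidence values in `ENTITY_MATCHES`, which come from the mock data.

```python
import re
from difflib import SequenceMatcher


def canonical_key(name: str) -> str:
    """Lowercase, drop punctuation, and collapse whitespace."""
    key = re.sub(r"[^\w\s]", "", name.lower())
    return re.sub(r"\s+", " ", key).strip()


def match_confidence(name_a: str, name_b: str) -> float:
    """Similarity in [0, 1] between two entity names after canonicalization."""
    return SequenceMatcher(None, canonical_key(name_a), canonical_key(name_b)).ratio()


# Formatting-only differences collapse to the same key.
print(canonical_key("ACME LOGISTICS, L.L.C."))
print(match_confidence("ACME LOGISTICS, L.L.C.", "Acme  Logistics   LLC"))
# A different legal suffix lowers the score but does not zero it.
print(match_confidence("Acme Logistics LLC", "Acme Logistics Inc."))
```

A character-level ratio like this treats "LLC" versus "Inc" as a small difference even though they may be distinct legal entities, which is one reason borderline scores are routed to a data steward instead of being merged automatically.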

Step 4: Create the Quality Agent (agents/quality_agent.py)

What & Why: The quality agent checks the transformed data for completeness (are all required fields present?), consistency (do dates and states make sense?), and anomalies (did filing counts drop suspiciously?). It generates a quality scorecard that data stewards use to decide if a batch is production-ready.

Create agents/quality_agent.py:

# agents/quality_agent.py — Agent 3: Data quality checks and scorecard
import json
from datetime import datetime

REQUIRED_FIELDS = ["filing_number", "normalized_debtor", "normalized_secured_party",
                    "state_code", "filing_date", "filing_type", "collateral_categories"]

VALID_STATES = {"AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL",
                "IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT",
                "NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI",
                "SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY","DC"}


def check_completeness(records):
    """Check that all required fields are present and non-empty."""
    issues = []
    for r in records:
        missing = [f for f in REQUIRED_FIELDS if not r.get(f)]
        if missing:
            issues.append({"filing_number": r.get("filing_number"), "missing": missing})
    total = len(records)
    complete = total - len(issues)
    return {"completeness_pct": (complete / max(total, 1)) * 100, "issues": issues}


def check_consistency(records):
    """Check that dates, states, and types are valid."""
    issues = []
    inconsistent = 0  # number of records with at least one issue
    for r in records:
        issues_before = len(issues)
        # State code check
        if r.get("state_code") not in VALID_STATES:
            issues.append({"filing_number": r.get("filing_number"),
                           "issue": f"Invalid state_code: {r.get('state_code')}"})
        # Date check
        try:
            dt = datetime.strptime(r.get("filing_date", ""), "%Y-%m-%d")
            if dt.year < 2000 or dt.year > 2030:
                issues.append({"filing_number": r.get("filing_number"),
                               "issue": f"Suspicious date: {r.get('filing_date')}"})
        except (ValueError, TypeError):
            issues.append({"filing_number": r.get("filing_number"),
                           "issue": f"Invalid date format: {r.get('filing_date')}"})
        # Filing type check
        if r.get("filing_type") not in ("UCC-1", "UCC-3"):
            issues.append({"filing_number": r.get("filing_number"),
                           "issue": f"Unknown filing_type: {r.get('filing_type')}"})
        # Count per record rather than per distinct filing_number, so records
        # with a missing or duplicated filing_number are each counted.
        if len(issues) > issues_before:
            inconsistent += 1
    total = len(records)
    consistent = total - inconsistent
    return {"consistency_pct": (consistent / max(total, 1)) * 100, "issues": issues}


def detect_anomalies(records, previous_count=None):
    """Detect suspicious patterns like sudden drops in filing counts."""
    anomalies = []
    if previous_count is not None and len(records) < previous_count * 0.2:
        anomalies.append({
            "type": "filing_count_drop",
            "detail": f"Filing count dropped from {previous_count} to {len(records)} (>80% decrease)",
            "severity": "high"
        })
    # Check for duplicate filing numbers
    numbers = [r.get("filing_number") for r in records]
    seen, dupes = set(), set()
    for n in numbers:
        if n in seen:
            dupes.add(n)  # second or later occurrence of this filing number
        seen.add(n)
    if dupes:
        anomalies.append({"type": "duplicate_filings", "detail": f"Duplicate filing numbers: {dupes}", "severity": "medium"})
    return anomalies


def run_quality_checks(transformation_output, previous_count=None):
    """Run all quality checks and generate a scorecard."""
    records = transformation_output.get("transformed_records", [])
    completeness = check_completeness(records)
    consistency = check_consistency(records)
    anomalies = detect_anomalies(records, previous_count)

    overall = (completeness["completeness_pct"] + consistency["consistency_pct"]) / 2
    grade = "A" if overall >= 95 else "B" if overall >= 85 else "C" if overall >= 70 else "F"

    return {
        "stage": "quality",
        "records_checked": len(records),
        "completeness": completeness,
        "consistency": consistency,
        "anomalies": anomalies,
        "overall_score": round(overall, 1),
        "grade": grade,
    }
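
To make the scorecard arithmetic concrete, here is a minimal standalone sketch of the same averaging and grade thresholds used in run_quality_checks. The helper name grade_for is an assumption introduced for illustration; it is not part of the project files.

```python
def grade_for(completeness_pct, consistency_pct):
    """Average the two percentages and map the result to a letter grade.

    Hypothetical helper mirroring the thresholds in run_quality_checks.
    """
    overall = (completeness_pct + consistency_pct) / 2
    if overall >= 95:
        grade = "A"
    elif overall >= 85:
        grade = "B"
    elif overall >= 70:
        grade = "C"
    else:
        grade = "F"
    return round(overall, 1), grade


# Example: 12 records, all complete, 2 with an invalid filing date.
completeness = 12 / 12 * 100   # 100.0
consistency = 10 / 12 * 100    # 83.33...
print(grade_for(completeness, consistency))  # (91.7, 'B')
```

Note that in the code above, anomalies are reported alongside the grade but do not change it: the grade depends only on completeness and consistency.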

Step 5: Create the Reporting Agent (agents/reporting_agent.py)

What & Why: The reporting agent generates risk profiles and lien summaries from the quality-checked data. This is the Gold layer in the Medallion Architecture — it turns normalized data into business intelligence. It also applies PII redaction for any public-facing reports.

Create agents/reporting_agent.py:

# agents/reporting_agent.py — Agent 4: Risk profiles, lien summaries, PII redaction
import re
from collections import defaultdict

PII_PATTERNS = [
    (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN-REDACTED]'),      # SSN
    (re.compile(r'\b\d{2}-\d{7}\b'), '[EIN-REDACTED]'),             # EIN
    (re.compile(r'\b\d{5}(-\d{4})?\b'), '[ZIP-REDACTED]'),          # ZIP codes
]


def redact_pii(text):
    """Remove PII patterns from text."""
    for pattern, replacement in PII_PATTERNS:
        text = pattern.sub(replacement, text)
    return text


def generate_risk_profile(records):
    """Generate a risk profile per entity based on lien exposure."""
    entity_filings = defaultdict(list)
    for r in records:
        debtor = r.get("normalized_debtor", r.get("debtor_name", "Unknown"))
        entity_filings[debtor].append(r)

    profiles = []
    for entity, filings in entity_filings.items():
        active = [f for f in filings if f.get("filing_type") == "UCC-1"
                  or (f.get("filing_type") == "UCC-3"
                      and f.get("amendment_type") != "termination")]
        terminated = [f for f in filings if f.get("filing_type") == "UCC-3"
                      and f.get("amendment_type") == "termination"]
        lien_count = len(active)
        risk = "HIGH" if lien_count >= 3 else "MEDIUM" if lien_count >= 2 else "LOW"

        profiles.append({
            "entity": entity,
            "active_liens": lien_count,
            "terminated_liens": len(terminated),
            "secured_parties": list(set(f.get("normalized_secured_party", f.get("secured_party_name", ""))
                                        for f in active)),
            "collateral_types": list(set(
                cat for f in active for cat in f.get("collateral_categories", [])
            )),
            "risk_level": risk,
        })
    return sorted(profiles, key=lambda p: p["active_liens"], reverse=True)


def generate_lien_summary(records, redact=True):
    """Generate a text summary of all liens, with optional PII redaction."""
    lines = [f"LIEN SUMMARY — {len(records)} records processed\n"]
    for r in records:
        line = (f"  Filing {r.get('filing_number')}: "
                f"{r.get('normalized_debtor', r.get('debtor_name', 'N/A'))} "
                f"({r.get('filing_type')}) — "
                f"Secured by: {r.get('normalized_secured_party', r.get('secured_party_name', 'N/A'))}")
        lines.append(line)
    summary = "\n".join(lines)
    return redact_pii(summary) if redact else summary


def run_reporting(transformation_output, quality_output):
    """Run the reporting agent to generate risk profiles and summaries."""
    records = transformation_output.get("transformed_records", [])
    profiles = generate_risk_profile(records)
    summary = generate_lien_summary(records)

    return {
        "stage": "reporting",
        "risk_profiles": profiles,
        "lien_summary": summary,
        "entities_profiled": len(profiles),
        "quality_grade": quality_output.get("grade", "N/A"),
    }
Checkpoint

You now have all four agents. Each one takes input from the previous stage and produces structured output for the next. Before wiring them together, we need the safety mechanisms: the circuit breaker and HITL quality gate.
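
Before moving on, it can help to see how the redaction patterns from Step 5 behave on sample text. The sketch below copies PII_PATTERNS and redact_pii so it runs on its own; the sample strings are made up for illustration.

```python
import re

# Copied from agents/reporting_agent.py so this snippet is self-contained.
PII_PATTERNS = [
    (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN-REDACTED]'),
    (re.compile(r'\b\d{2}-\d{7}\b'), '[EIN-REDACTED]'),
    (re.compile(r'\b\d{5}(-\d{4})?\b'), '[ZIP-REDACTED]'),
]


def redact_pii(text):
    """Apply each pattern in order and return the redacted text."""
    for pattern, replacement in PII_PATTERNS:
        text = pattern.sub(replacement, text)
    return text


# Illustrative inputs (not taken from mock_data.py).
samples = [
    "Debtor SSN 123-45-6789 on file",
    "EIN 12-3456789, Wilmington DE 19801",
    "Filing 2024-0099200: equipment valued at 50000 USD",
]
for s in samples:
    print(redact_pii(s))
# Debtor SSN [SSN-REDACTED] on file
# EIN [EIN-REDACTED], Wilmington DE [ZIP-REDACTED]
# Filing 2024-0099200: equipment valued at [ZIP-REDACTED] USD
```

The last sample shows a limitation worth knowing about: the ZIP pattern matches any standalone five-digit number, so amounts such as 50000 are redacted too. Filing numbers in the 2024-0099200 format are left intact by all three patterns.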

Step 6: Create the Quality Gate (quality_gate.py)

What & Why: This file contains two critical safety mechanisms. The circuit breaker halts the pipeline when parse error rates exceed 10%, preventing bad data from flowing downstream. The HITL review gate pauses the pipeline when entity resolution confidence is below 80%, routing the decision to a human data steward. These patterns are essential for production agent systems — they prevent silent failures and ensure humans stay in the loop for uncertain decisions.

Create quality_gate.py:

# quality_gate.py — Circuit breaker and HITL review logic

from mock_data import compute_error_rate


class CircuitBreaker:
    """Monitors parse error rates and halts the pipeline if threshold exceeded."""

    def __init__(self, threshold=0.10):
        self.threshold = threshold
        self.status = "healthy"
        self.error_rate = 0.0
        self.error_details = []

    def check(self, filings):
        """Check error rate against threshold. Returns True if pipeline should continue."""
        self.error_rate = compute_error_rate(filings)
        self.error_details = [
            {"filing_number": f.get("filing_number", "UNKNOWN"),
             "error": f.get("error_detail", "Parse error")}
            for f in filings if f.get("parse_error")
        ]

        if self.error_rate > self.threshold:
            self.status = "tripped"
            return False  # HALT the pipeline
        self.status = "healthy"
        return True  # Continue

    def get_status(self):
        return {
            "status": self.status,
            "error_rate": self.error_rate,
            "threshold": self.threshold,
            "errors_found": len(self.error_details),
            "error_details": self.error_details,
        }


class HITLReviewGate:
    """Routes low-confidence entity matches to a human data steward."""

    def __init__(self, confidence_threshold=0.80):
        self.confidence_threshold = confidence_threshold
        self.pending_reviews = []
        self.completed_reviews = []

    def check_matches(self, entity_matches):
        """Identify matches that need human review."""
        self.pending_reviews = [
            m for m in entity_matches
            if m.get("confidence", 1.0) < self.confidence_threshold
        ]
        return len(self.pending_reviews) > 0  # True if HITL needed

    def present_reviews(self, auto_resolve=False):
        """Present each conflict to the data steward for resolution.
        If auto_resolve=True, automatically pick candidate_a (for testing)."""
        for match in self.pending_reviews:
            print("\n" + "=" * 60)
            print(f"HITL REVIEW REQUIRED — Filing: {match['filing_number']}")
            print(f"  Candidate A: {match['candidate_a']['name']} "
                  f"(ID: {match['candidate_a']['id']})")
            print(f"  Candidate B: {match['candidate_b']['name']} "
                  f"(ID: {match['candidate_b']['id']})")
            print(f"  Match Confidence: {match['confidence']:.0%}")

            if auto_resolve:
                decision = "1"
                print("  [AUTO-RESOLVE] Selecting option 1: Merge to A")
            else:
                print("  Options: [1] Merge to A  [2] Merge to B  "
                      "[3] Keep Separate  [4] Flag for Review")
                decision = input("  Enter choice (1/2/3/4): ").strip()

            review = {"filing_number": match["filing_number"], "confidence": match["confidence"]}
            if decision == "1":
                review["decision"] = "merge_to_a"
                review["resolved_entity"] = match["candidate_a"]
            elif decision == "2":
                review["decision"] = "merge_to_b"
                review["resolved_entity"] = match["candidate_b"]
            elif decision == "3":
                review["decision"] = "keep_separate"
            else:
                review["decision"] = "flag_for_review"

            self.completed_reviews.append(review)
            print(f"  Decision recorded: {review['decision']}")
            print("=" * 60)

        self.pending_reviews = []
        return self.completed_reviews

Run it:

python -c "
from quality_gate import CircuitBreaker
from mock_data import MOCK_FILINGS

cb = CircuitBreaker(threshold=0.10)
can_continue = cb.check(MOCK_FILINGS)
status = cb.get_status()
print(f'Status: {status[\"status\"]}')
print(f'Error rate: {status[\"error_rate\"]:.1%}')
print(f'Continue? {can_continue}')
"

Expected output:

Status: tripped
Error rate: 20.0%
Continue? False
Checkpoint

The circuit breaker correctly trips on the full mock data set (20% error rate exceeds the 10% threshold). In the coordinator, we will use the clean filings subset for the happy path and the full set to demonstrate the circuit breaker trip. If you see True instead of False, double-check that your mock_data.py has the 3 filings with parse_error: True.
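
The threshold comparison in CircuitBreaker.check is strict (error_rate > threshold), so a batch sitting exactly at 10% still passes. The sketch below illustrates that boundary without importing the project files. It assumes compute_error_rate returns the fraction of filings flagged with parse_error, which matches the 3/15 = 20% figure above; error_rate, should_continue, and make_batch are hypothetical helper names.

```python
def error_rate(filings):
    """Fraction of filings flagged with parse_error.

    Assumed to match mock_data.compute_error_rate, which is not shown here.
    """
    if not filings:
        return 0.0
    return sum(1 for f in filings if f.get("parse_error")) / len(filings)


def should_continue(filings, threshold=0.10):
    # Same comparison as CircuitBreaker.check: trip only when rate > threshold.
    return error_rate(filings) <= threshold


def make_batch(total, bad):
    """Build a synthetic batch where the first `bad` filings have parse errors."""
    return [{"parse_error": i < bad} for i in range(total)]


print(should_continue(make_batch(10, 1)))  # True: 10% equals the threshold
print(should_continue(make_batch(25, 3)))  # False: 12% exceeds it
print(should_continue(make_batch(15, 3)))  # False: 20%, as in MOCK_FILINGS
print(should_continue([]))                 # True: empty batch
```

If a batch at exactly 10% should be quarantined as well, the comparison in check would need to be >= instead; the document's wording ("exceeds 10%") matches the strict version.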

Step 7: Create the Coordinator (coordinator.py)

What & Why: The coordinator is the "conductor" that wires all four agents together. It manages the pipeline state, runs the circuit breaker check on the incoming batch before ingestion, triggers HITL review after transformation, and passes output between stages. This is where you see the multi-agent pattern in action: each agent is independent, but the coordinator enforces the data flow, safety checks, and human review gates.

Create coordinator.py:

# coordinator.py — Main pipeline orchestrator
# Wires 4 agents together with circuit breaker and HITL gates.

import anthropic
import json
from dataclasses import dataclass, field, asdict

from mock_data import MOCK_FILINGS, get_clean_filings, ENTITY_MATCHES
from agents.ingestion_agent import run_ingestion, handle_parse_filing_batch
from agents.transform_agent import run_transformation
from agents.quality_agent import run_quality_checks
from agents.reporting_agent import run_reporting
from quality_gate import CircuitBreaker, HITLReviewGate


@dataclass
class PipelineState:
    batch_id: str
    source_state: str
    stage: str = "pending"
    ingestion_output: dict = field(default_factory=dict)
    transformation_output: dict = field(default_factory=dict)
    quality_output: dict = field(default_factory=dict)
    reporting_output: dict = field(default_factory=dict)
    circuit_breaker_status: dict = field(default_factory=dict)
    hitl_decisions: list = field(default_factory=list)


def run_pipeline(batch_id, source_state, filings, auto_resolve_hitl=False):
    """Execute the full 4-agent pipeline with safety checks.

    Args:
        batch_id: Unique identifier for this batch
        source_state: State code (e.g., 'DE')
        filings: List of raw filing records
        auto_resolve_hitl: If True, auto-resolve HITL reviews (for testing)
    """
    client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY env var
    state = PipelineState(batch_id=batch_id, source_state=source_state)

    print(f"\n{'='*60}")
    print(f"PIPELINE START — Batch: {batch_id}, State: {source_state}")
    print(f"Records: {len(filings)}")
    print(f"{'='*60}")

    # --- Circuit Breaker Check ---
    print("\n[1/5] Circuit Breaker Check...")
    cb = CircuitBreaker(threshold=0.10)
    if not cb.check(filings):
        state.stage = "halted"
        state.circuit_breaker_status = cb.get_status()
        print(f"  TRIPPED! Error rate {cb.error_rate:.1%} exceeds 10% threshold.")
        print(f"  Batch {batch_id} quarantined. Pipeline halted.")
        return {"status": "circuit_breaker_tripped", "state": asdict(state)}

    print(f"  HEALTHY — Error rate: {cb.error_rate:.1%}")
    state.circuit_breaker_status = cb.get_status()

    # --- Stage 1: Ingestion ---
    print("\n[2/5] Ingestion Agent...")
    state.stage = "ingestion"
    # Use the tool handler directly for clean execution
    ingestion_result = handle_parse_filing_batch(filings, source_state)
    state.ingestion_output = ingestion_result
    print(f"  Parsed: {ingestion_result['parsed_successfully']}/{ingestion_result['total_records']}")

    # --- Stage 2: Transformation ---
    print("\n[3/5] Transformation Agent...")
    state.stage = "transformation"
    transform_result = run_transformation(client, ingestion_result, ENTITY_MATCHES)
    state.transformation_output = transform_result
    print(f"  Transformed: {transform_result['records_transformed']} records")

    # --- HITL Gate ---
    hitl_gate = HITLReviewGate(confidence_threshold=0.80)
    entity_matches = transform_result.get("entity_matches", [])
    if hitl_gate.check_matches(entity_matches):
        print(f"\n  HITL GATE: {len(hitl_gate.pending_reviews)} conflicts need review")
        decisions = hitl_gate.present_reviews(auto_resolve=auto_resolve_hitl)
        state.hitl_decisions = decisions
        transform_result["hitl_decisions"] = decisions
    else:
        print("  No entity conflicts requiring HITL review.")

    # --- Stage 3: Quality ---
    print("\n[4/5] Quality Agent...")
    state.stage = "quality"
    quality_result = run_quality_checks(transform_result)
    state.quality_output = quality_result
    print(f"  Quality Grade: {quality_result['grade']} "
          f"(Score: {quality_result['overall_score']}%)")

    # --- Stage 4: Reporting ---
    print("\n[5/5] Reporting Agent...")
    state.stage = "reporting"
    reporting_result = run_reporting(transform_result, quality_result)
    state.reporting_output = reporting_result
    print(f"  Entities profiled: {reporting_result['entities_profiled']}")

    state.stage = "complete"
    print(f"\n{'='*60}")
    print(f"PIPELINE COMPLETE — Batch: {batch_id}")
    print(f"  Quality: {quality_result['grade']} | "
          f"Entities: {reporting_result['entities_profiled']} | "
          f"HITL Reviews: {len(state.hitl_decisions)}")
    print(f"{'='*60}")

    return {"status": "complete", "state": asdict(state)}


if __name__ == "__main__":
    # Happy path: use clean filings (no parse errors)
    clean = get_clean_filings()
    result = run_pipeline("BATCH-2024-0310-DE", "DE", clean, auto_resolve_hitl=True)
    print(f"\nFinal status: {result['status']}")
    print("Risk profiles:")
    for p in result["state"]["reporting_output"].get("risk_profiles", []):
        print(f"  {p['entity']}: {p['risk_level']} "
              f"({p['active_liens']} active liens)")

Node.js version (coordinator.ts):

// coordinator.ts: Multi-Agent Pipeline Orchestrator (Node.js version)
// This is a simplified TypeScript port. The Python version above is the
// primary implementation with full HITL and circuit breaker integration.
// To run this version, install: npm install @anthropic-ai/sdk

import Anthropic from "@anthropic-ai/sdk";

interface PipelineState {
  batchId: string;
  sourceState: string;
  stage: string;
  ingestionOutput: Record<string, unknown> | null;
  transformationOutput: Record<string, unknown> | null;
  qualityOutput: Record<string, unknown> | null;
  reportingOutput: Record<string, unknown> | null;
  circuitBreaker: { errorRate: number; threshold: number; status: string };
}

interface Filing {
  filing_number: string;
  debtor_name: string;
  secured_party_name: string;
  state_code: string;
  filing_date: string;
  filing_type: string;
  collateral_description: string;
  parse_error?: boolean;
  error_detail?: string;
}

function checkCircuitBreaker(filings: Filing[], threshold = 0.10) {
  const errors = filings.filter(f => f.parse_error).length;
  const errorRate = errors / Math.max(filings.length, 1);
  return {
    canContinue: errorRate <= threshold,
    errorRate,
    status: errorRate > threshold ? "tripped" : "healthy",
  };
}

export async function runPipeline(
  batchId: string, sourceState: string, filings: Filing[]
) {
  const state: PipelineState = {
    batchId, sourceState, stage: "pending",
    ingestionOutput: null, transformationOutput: null,
    qualityOutput: null, reportingOutput: null,
    circuitBreaker: { errorRate: 0, threshold: 0.10, status: "healthy" },
  };

  // Circuit breaker check
  const cb = checkCircuitBreaker(filings);
  state.circuitBreaker = { errorRate: cb.errorRate, threshold: 0.10, status: cb.status };
  if (!cb.canContinue) {
    state.stage = "halted";
    console.log(`CIRCUIT BREAKER TRIPPED — ${(cb.errorRate*100).toFixed(1)}% error rate`);
    return { status: "circuit_breaker_tripped", state };
  }

  // Filter to clean filings for processing
  const cleanFilings = filings.filter(f => !f.parse_error);
  console.log(`Processing ${cleanFilings.length} clean filings out of ${filings.length} total`);

  // Stages 1-4 would follow the same pattern as the Python version
  // Each stage processes data and passes results to the next
  state.stage = "complete";
  return { status: "complete", state };
}

Run the pipeline:

python coordinator.py

Expected output:

============================================================
PIPELINE START — Batch: BATCH-2024-0310-DE, State: DE
Records: 12
============================================================

[1/5] Circuit Breaker Check...
  HEALTHY — Error rate: 0.0%

[2/5] Ingestion Agent...
  Parsed: 12/12

[3/5] Transformation Agent...
  Transformed: 12 records

  HITL GATE: 2 conflicts need review

============================================================
HITL REVIEW REQUIRED — Filing: 2024-0099200
  Candidate A: Acme Logistics LLC (ID: ENT-00234)
  Candidate B: ACME LOGISTICS, L.L.C. (ID: ENT-00234-ALT)
  Match Confidence: 76%
  [AUTO-RESOLVE] Selecting option 1: Merge to A
  Decision recorded: merge_to_a
============================================================

============================================================
HITL REVIEW REQUIRED — Filing: 2024-0099950
  Candidate A: Acme Logistics LLC (ID: ENT-00234)
  Candidate B: Acme  Logistics   LLC (ID: ENT-09950)
  Match Confidence: 68%
  [AUTO-RESOLVE] Selecting option 1: Merge to A
  Decision recorded: merge_to_a
============================================================

[4/5] Quality Agent...
  Quality Grade: A (Score: 100.0%)

[5/5] Reporting Agent...
  Entities profiled: 8

============================================================
PIPELINE COMPLETE — Batch: BATCH-2024-0310-DE
  Quality: A | Entities: 8 | HITL Reviews: 2
============================================================

Final status: complete
Risk profiles:
  ACME LOGISTICS LLC: MEDIUM (2 active liens)
  BuildRight Construction Inc: MEDIUM (2 active liens)
  Acme Logistics LLC: MEDIUM (2 active liens)
  Pacific Coast Distributors Inc: MEDIUM (2 active liens)
  ...
What Just Happened?

You ran the complete 4-agent pipeline end-to-end. The coordinator: (1) checked the circuit breaker (healthy for clean data), (2) parsed 12 filings through ingestion, (3) normalized names and classified collateral in transformation, (4) detected 2 entity conflicts below 80% confidence and auto-resolved them at the HITL gate (because auto_resolve_hitl=True; with the default, the data steward is prompted for each one), (5) ran quality checks (grade A), and (6) generated risk profiles for 8 entities. The coordinator passed structured state between every stage, so no agent needed to know about the others.
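
One thing to notice in the output: "ACME LOGISTICS LLC" and "Acme Logistics LLC" still appear as separate entities after both conflicts were resolved as merge_to_a. As shown, the coordinator records the HITL decisions in the pipeline state but does not appear to rewrite the records with them. A minimal sketch of applying the decisions before the quality and reporting stages might look like the following. The function apply_hitl_decisions is hypothetical, it assumes the conflicting entity is the debtor, and the second filing number is made up for illustration.

```python
def apply_hitl_decisions(records, decisions):
    """Return copies of records with merge decisions applied to normalized_debtor.

    Hypothetical helper: assumes each merge decision has the shape produced by
    HITLReviewGate.present_reviews (filing_number, decision, resolved_entity).
    """
    resolved = {
        d["filing_number"]: d["resolved_entity"]["name"]
        for d in decisions
        if d.get("decision") in ("merge_to_a", "merge_to_b")
    }
    updated = []
    for r in records:
        r = dict(r)  # copy so the caller's records are not mutated
        name = resolved.get(r.get("filing_number"))
        if name is not None:
            r["normalized_debtor"] = name
        updated.append(r)
    return updated


records = [
    {"filing_number": "2024-0099200", "normalized_debtor": "ACME LOGISTICS LLC"},
    {"filing_number": "2024-0100001", "normalized_debtor": "BuildRight Construction Inc"},
]
decisions = [{
    "filing_number": "2024-0099200", "confidence": 0.76, "decision": "merge_to_a",
    "resolved_entity": {"name": "Acme Logistics LLC", "id": "ENT-00234"},
}]

for r in apply_hitl_decisions(records, decisions):
    print(r["filing_number"], "->", r["normalized_debtor"])
# 2024-0099200 -> Acme Logistics LLC
# 2024-0100001 -> BuildRight Construction Inc
```

Decisions of keep_separate or flag_for_review carry no resolved_entity, so they leave the record unchanged in this sketch.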

Step 8: Create the Test Suite (test_pipeline.py)

What & Why: Testing is critical for multi-agent systems because failures can be subtle: an agent might produce output that looks right but contains incorrect entity resolutions. This test file covers 5 scenarios: happy path, circuit breaker trip, HITL trigger, edge case (empty batch), and entity name normalization.

Create test_pipeline.py:

# test_pipeline.py — Test cases for the UCC pipeline
import json
from mock_data import MOCK_FILINGS, get_clean_filings, get_error_filings, compute_error_rate
from quality_gate import CircuitBreaker, HITLReviewGate
from agents.ingestion_agent import handle_parse_filing_batch
from agents.transform_agent import run_transformation, normalize_name, classify_collateral
from agents.quality_agent import run_quality_checks
from agents.reporting_agent import run_reporting, generate_risk_profile


def test_1_happy_path():
    """Test: Clean filings flow through all 4 stages without errors."""
    print("\n--- Test 1: Happy Path (clean filings) ---")
    clean = get_clean_filings()
    assert len(clean) == 12, f"Expected 12 clean filings, got {len(clean)}"

    ingestion = handle_parse_filing_batch(clean, "DE")
    assert ingestion["parse_errors"] == 0, f"Expected 0 errors, got {ingestion['parse_errors']}"
    assert ingestion["parsed_successfully"] == 12

    transform = run_transformation(None, ingestion)
    assert transform["records_transformed"] == 12

    quality = run_quality_checks(transform)
    assert quality["grade"] in ("A", "B"), f"Expected A or B grade, got {quality['grade']}"

    reporting = run_reporting(transform, quality)
    assert reporting["entities_profiled"] > 0

    print(f"  PASSED — {reporting['entities_profiled']} entities, grade {quality['grade']}")


def test_2_circuit_breaker_trips():
    """Test: Full dataset (20% errors) triggers circuit breaker."""
    print("\n--- Test 2: Circuit Breaker Trip ---")
    cb = CircuitBreaker(threshold=0.10)
    can_continue = cb.check(MOCK_FILINGS)
    assert not can_continue, "Circuit breaker should trip at 20% error rate"
    assert cb.status == "tripped"
    assert cb.error_rate > 0.10
    print(f"  PASSED — Tripped at {cb.error_rate:.1%} (threshold: 10%)")


def test_3_hitl_triggers():
    """Test: Low-confidence entity matches trigger HITL review."""
    print("\n--- Test 3: HITL Review Trigger ---")
    from mock_data import ENTITY_MATCHES
    gate = HITLReviewGate(confidence_threshold=0.80)
    needs_review = gate.check_matches(ENTITY_MATCHES)
    assert needs_review, "HITL should be triggered for matches below 80%"
    assert len(gate.pending_reviews) == 2, f"Expected 2 reviews, got {len(gate.pending_reviews)}"

    # Auto-resolve
    decisions = gate.present_reviews(auto_resolve=True)
    assert len(decisions) == 2
    assert all(d["decision"] == "merge_to_a" for d in decisions)
    print(f"  PASSED — {len(decisions)} conflicts auto-resolved")


def test_4_edge_empty_batch():
    """Test: Empty batch handles gracefully."""
    print("\n--- Test 4: Edge Case (empty batch) ---")
    cb = CircuitBreaker(threshold=0.10)
    can_continue = cb.check([])
    assert can_continue, "Empty batch should not trip circuit breaker"
    assert cb.error_rate == 0.0

    ingestion = handle_parse_filing_batch([], "DE")
    assert ingestion["total_records"] == 0
    assert ingestion["parse_errors"] == 0
    print("  PASSED — Empty batch handled without errors")


def test_5_entity_normalization():
    """Test: Entity name normalization handles variations correctly."""
    print("\n--- Test 5: Entity Name Normalization ---")
    assert normalize_name("ACME LOGISTICS, L.L.C.") == "ACME LOGISTICS LLC"
    assert normalize_name("Acme  Logistics   LLC") == "Acme Logistics LLC"
    assert normalize_name("BuildRight Construction Inc.") == "BuildRight Construction Inc"
    assert classify_collateral("All assets incl inventory, accts recv, equip") == \
        ["inventory", "equipment", "accounts", "all_assets"]
    print("  PASSED — All normalizations correct")


if __name__ == "__main__":
    print("=" * 60)
    print("UCC PIPELINE TEST SUITE")
    print("=" * 60)

    tests = [test_1_happy_path, test_2_circuit_breaker_trips,
             test_3_hitl_triggers, test_4_edge_empty_batch,
             test_5_entity_normalization]
    passed = 0
    failed = 0
    for test in tests:
        try:
            test()
            passed += 1
        except AssertionError as e:
            print(f"  FAILED — {e}")
            failed += 1
        except Exception as e:
            print(f"  ERROR — {type(e).__name__}: {e}")
            failed += 1

    print(f"\n{'='*60}")
    print(f"Results: {passed} passed, {failed} failed out of {len(tests)} tests")
    print(f"{'='*60}")

Run the tests:

python test_pipeline.py

Expected output:

============================================================
UCC PIPELINE TEST SUITE
============================================================

--- Test 1: Happy Path (clean filings) ---
  PASSED — 8 entities, grade A

--- Test 2: Circuit Breaker Trip ---
  PASSED — Tripped at 20.0% (threshold: 10%)

--- Test 3: HITL Review Trigger ---

============================================================
HITL REVIEW REQUIRED — Filing: 2024-0099200
  ...
  [AUTO-RESOLVE] Selecting option 1: Merge to A
  Decision recorded: merge_to_a
============================================================

============================================================
HITL REVIEW REQUIRED — Filing: 2024-0099950
  ...
  [AUTO-RESOLVE] Selecting option 1: Merge to A
  Decision recorded: merge_to_a
============================================================
  PASSED — 2 conflicts auto-resolved

--- Test 4: Edge Case (empty batch) ---
  PASSED — Empty batch handled without errors

--- Test 5: Entity Name Normalization ---
  PASSED — All normalizations correct

============================================================
Results: 5 passed, 0 failed out of 5 tests
============================================================
Checkpoint

All 5 tests should pass. Test 2 confirms the circuit breaker trips at 20% error rate. Test 3 confirms HITL triggers for the two entity matches below 80% confidence. If any test fails, check the error message — the most common issue is a missing import or file.

Testing Guide

| Type | Scenario | Expected Behavior |
|------|----------|-------------------|
| Happy | 245 DE XML filings, 2.8% error rate | All 4 agents complete, quality scorecard generated, reports produced |
| Happy | Entity with 76% confidence match | Pipeline pauses, routes to data steward queue |
| Happy | Steward selects "match_to_ENT-00234" | Pipeline resumes with resolved entity, continues through quality+reporting |
| Happy | Clean batch with no conflicts | Full pipeline: ingestion → transform → quality → reporting, no HITL |
| Happy | Batch with mixed active/lapsed filings | Quality agent correctly categorizes, reporting separates active vs lapsed |
| Edge | State sends CSV instead of expected XML | Ingestion agent detects format, uses correct parser |
| Edge | Quality agent detects 80% drop in filing count | Flags anomaly for investigation, does not auto-halt |
| Edge | Collateral description is empty | Transform agent flags as incomplete, quality agent deducts from completeness score |
| Adversarial | 12% parse error rate in batch | Circuit breaker trips, batch quarantined, team alerted |
| Adversarial | Malformed CSV with injection characters | Parser treats as literal data, does not execute, flags as parse error |
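
For the filing-count-drop scenario in the table, the sketch below mirrors the comparison used in detect_anomalies so you can check the boundary by hand. The helper name is_filing_count_drop is an assumption for illustration. Two details are worth noting: the comparison is strict, so a drop of exactly 80% is not flagged, and the coordinator as written calls run_quality_checks without previous_count, so this check only runs if you pass that argument yourself.

```python
def is_filing_count_drop(current_count, previous_count, max_ratio=0.2):
    """Mirror of the comparison in detect_anomalies.

    Flags a batch only when it has fewer than 20% of the previous batch's
    filings, i.e. a drop of more than 80%. Hypothetical helper name.
    """
    if previous_count is None:
        return False  # no baseline, nothing to compare against
    return current_count < previous_count * max_ratio


print(is_filing_count_drop(19, 100))   # True: 81% drop
print(is_filing_count_drop(20, 100))   # False: exactly 80% drop is not flagged
print(is_filing_count_drop(12, None))  # False: no previous count supplied
```

Because the anomaly is only added to the scorecard, a flagged batch still flows on to reporting, which matches the "does not auto-halt" behavior in the table.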

Verify Everything Works

Run the complete pipeline end-to-end with a single command:

python coordinator.py && python test_pipeline.py

Expected final output: The coordinator runs the full pipeline on 12 clean filings, auto-resolves 2 HITL conflicts, and produces risk profiles. The test suite then runs all 5 tests and reports 5 passed, 0 failed.

You can also test the circuit breaker trip by running the pipeline on the full dataset (including bad records):

python -c "
from coordinator import run_pipeline
from mock_data import MOCK_FILINGS
result = run_pipeline('BATCH-BAD', 'DE', MOCK_FILINGS)
print(f'Status: {result[\"status\"]}')
"

Expected output:

============================================================
PIPELINE START — Batch: BATCH-BAD, State: DE
Records: 15
============================================================

[1/5] Circuit Breaker Check...
  TRIPPED! Error rate 20.0% exceeds 10% threshold.
  Batch BATCH-BAD quarantined. Pipeline halted.
Status: circuit_breaker_tripped
Congratulations!

You have built a complete multi-agent UCC data pipeline with four specialized agents, a circuit breaker that halts on data quality failures, and human-in-the-loop review for ambiguous entity matches. The same patterns (staged agents, a circuit breaker, and a human review gate) carry over to production data pipelines; a real deployment would differ mainly in connecting to actual state SOS data feeds and a BigQuery Gold layer instead of mock data.

Troubleshooting

Common Errors

Import Errors

ModuleNotFoundError: No module named 'anthropic' — Your virtual environment is not activated. Run source venv/bin/activate (Unix) or venv\Scripts\activate (Windows).

ModuleNotFoundError: No module named 'agents' — You are running the script from the wrong directory. Make sure you are inside capstone-4-multi-agent-ucc/.

ImportError: cannot import name 'run_ingestion' from 'agents' — The agents/__init__.py file is missing or one of the agent files has not been created yet. Complete all steps before running the coordinator.

API Errors

AuthenticationError: 401 — Your ANTHROPIC_API_KEY is not set or is invalid. Run echo $ANTHROPIC_API_KEY (Unix) or echo %ANTHROPIC_API_KEY% (Windows) to verify.

RateLimitError: 429 — You are sending too many requests. Wait a few seconds and retry. For testing, the coordinator uses mock tool handlers so API calls are minimal.

Test Failures

Test 5 (normalization) fails: Check that your classify_collateral function checks keywords in the correct order. The function returns categories in the order they are matched, which follows the insertion order of its keyword dictionary (guaranteed in Python 3.7+). The test expects ["inventory", "equipment", "accounts", "all_assets"].

Circuit breaker doesn't trip — Ensure your mock_data.py has exactly 3 filings with "parse_error": True. The error rate should be 3/15 = 20%.

HITL doesn't trigger: Ensure your ENTITY_MATCHES list in mock_data.py has entries with "confidence" below 0.80. Test 3 asserts that exactly 2 entries fall below the threshold.
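
To illustrate the Test 5 ordering issue above, here is a minimal sketch of an order-dependent classifier. The keyword table and the function name classify_collateral_sketch are hypothetical; your real table lives in agents/transform_agent.py and may differ. The point is only that the output order follows the dictionary's insertion order.

```python
# Hypothetical keyword table for illustration only.
COLLATERAL_KEYWORDS = {
    "inventory": ["inventory"],
    "equipment": ["equip"],
    "accounts": ["accts recv", "accounts receivable"],
    "all_assets": ["all assets"],
}


def classify_collateral_sketch(description):
    """Return matching categories in the table's insertion order."""
    text = description.lower()
    return [category for category, keywords in COLLATERAL_KEYWORDS.items()
            if any(k in text for k in keywords)]


result = classify_collateral_sketch("All assets incl inventory, accts recv, equip")
print(result)  # ['inventory', 'equipment', 'accounts', 'all_assets']

# An order-insensitive comparison avoids coupling a test to the table layout.
print(sorted(result) == sorted(["all_assets", "accounts", "equipment", "inventory"]))  # True
```

If your categories come back in a different order, either reorder the keys in your table or compare sorted lists in the test.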

Compliance & Regulatory Notes

Data Pipeline Regulations

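PII redaction: The Reporting Agent must redact personally identifiable information before generating public-facing reports. Individual names, addresses, and Social Security numbers (if present in collateral descriptions) must be masked. The PII_PATTERNS list from Step 5 covers only SSNs, EINs, and ZIP codes, so names and street addresses need additional handling before a report is published.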

FCRA compliance: If pipeline output is used for credit decisions, accuracy requirements under the Fair Credit Reporting Act apply. The circuit breaker and quality checks are compliance mechanisms — they prevent inaccurate data from reaching decision-making systems.

Audit trail: Every pipeline run must log: batch ID, source state, records processed, error count, HITL decisions, quality scores, and output file locations. This enables regulatory audits and dispute resolution.
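
As a rough sketch of the audit trail requirement, the function below builds one log record from the dictionary returned by run_pipeline and appends it to a JSON Lines file. The names build_audit_record and append_audit_log, the pipeline_audit.jsonl path, and the output_files argument are all assumptions for illustration; the pipeline in this capstone does not write output files itself.

```python
import json
from datetime import datetime, timezone


def build_audit_record(result, output_files=None):
    """Collect the audit fields from a run_pipeline result (hypothetical helper)."""
    state = result["state"]
    cb = state.get("circuit_breaker_status", {})
    quality = state.get("quality_output", {})
    return {
        "logged_at": datetime.now(timezone.utc).isoformat(),
        "batch_id": state.get("batch_id"),
        "source_state": state.get("source_state"),
        "status": result.get("status"),
        "records_processed": state.get("ingestion_output", {}).get("total_records", 0),
        "error_count": cb.get("errors_found", 0),
        "hitl_decisions": state.get("hitl_decisions", []),
        "quality_score": quality.get("overall_score"),
        "quality_grade": quality.get("grade"),
        "output_files": output_files or [],  # supplied by the caller
    }


def append_audit_log(record, path="pipeline_audit.jsonl"):
    """Append one record per line so the log can be tailed or grepped."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")


# Trimmed-down example of a run_pipeline result for the happy path.
result = {
    "status": "complete",
    "state": {
        "batch_id": "BATCH-2024-0310-DE",
        "source_state": "DE",
        "ingestion_output": {"total_records": 12},
        "circuit_breaker_status": {"errors_found": 0},
        "hitl_decisions": [{"filing_number": "2024-0099200", "decision": "merge_to_a"}],
        "quality_output": {"overall_score": 100.0, "grade": "A"},
    },
}
record = build_audit_record(result, output_files=["reports/BATCH-2024-0310-DE.json"])
print(json.dumps(record, indent=2))
```

For a halted batch, ingestion_output is empty, so records_processed falls back to 0 while error_count still reflects what the circuit breaker found.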

Going Further

  • [OPTIONAL] Capstone 5 integration: Add multi-layer memory (past pipeline runs), advanced RAG (regulatory reference), full observability (tracing every agent transition), and cost optimization (model routing per agent).
  • [OPTIONAL] Real-time monitoring: Build a dashboard that shows pipeline status, error rates, HITL queue depth, and quality trends across batches (M20).
  • [OPTIONAL] Parallel processing: Run ingestion for multiple states simultaneously using the Message Batches API (M25).
  • [OPTIONAL] Automated retraining: Track data steward decisions and use them to improve entity resolution confidence thresholds over time.
  • [OPTIONAL] Event-driven triggers: Use hooks (M26) to automatically start the pipeline when a new state data file is detected.

Knowledge Check

Q1: Why is the UCC data pipeline split into 4 separate agents?
Q2: What triggers the circuit breaker in this pipeline?
Q3: When should entity resolution conflicts be routed to a human data steward?
Q4 (Applied): Agent 3 detects that a state's filing count dropped 80% from the previous month. What should happen?
Q5: What is the purpose of PII redaction in Agent 4's output guardrails?

References & Resources