Capstone 4 — Domain A: End-to-End Pre-Auth Pipeline
Build a 4-agent pipeline that processes pre-authorization requests end-to-end: intake validation, clinical criteria evaluation, decision with human-in-the-loop, and communication — with circuit breaker protection.
Project Brief
A health plan processes 500+ prior authorization requests daily. (Prior authorization is a requirement by insurance companies that providers get approval before delivering certain services; the payer evaluates clinical criteria to determine medical necessity.) In Capstone 3, you built a single ReAct agent that reasons through one request. But a single agent handling the entire lifecycle (intake validation, clinical evaluation, decision-making, and provider communication) creates a monolithic bottleneck. When the decision logic changes, you redeploy everything. When intake parsing fails, clinical evaluation stops. When you want to A/B test notification templates, you risk breaking the decision engine.
The production solution is a pipeline of specialized agents. Each agent has a focused responsibility, its own tools, and a typed interface for passing structured state to the next agent. The Intake Agent validates; the Clinical Criteria Agent evaluates; the Decision Agent determines (or pauses for human review); the Communication Agent drafts and sends. Each can be independently deployed, tested, and scaled.
Three production patterns make this pipeline trustworthy: human-in-the-loop (HITL) checkpoints for medium-confidence decisions, a circuit breaker that halts processing when errors cascade, and structured logging at every agent transition for full observability. Human-in-the-loop is a design pattern where the automated pipeline pauses at defined checkpoints and requests human approval before proceeding; it is used for high-stakes or low-confidence decisions where automation alone isn't trusted. A circuit breaker is an automatic safety mechanism that halts processing when error rates exceed a threshold. Like an electrical circuit breaker, it prevents cascading failures, and the pipeline must be manually reset after investigation.
A 4-agent pipeline with HITL and circuit breaker:
- Agent 1 — Intake: Validates request schema, verifies member eligibility, confirms provider status. Input guardrails: PII detection, schema validation.
- Agent 2 — Clinical Criteria: Fetches policy criteria, evaluates each criterion against submitted evidence, generates per-criterion confidence scores.
- Agent 3 — Decision: Computes overall confidence. If >90%: auto-decide. If 70–90%: route to human reviewer. If <70%: auto-deny with appeal instructions.
- Agent 4 — Communication: Drafts determination letter (approval/denial/info-request), validates HIPAA compliance, sends via appropriate channel.
Skills practiced: Multi-agent architecture (M14), HITL patterns (M17), input/output guardrails (M16–M17), circuit breaker, evaluation (M18), structured inter-agent state.
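The Agent 3 routing thresholds listed above can be sketched as a small function. This is only an illustration: `route_by_confidence` and its return labels are hypothetical names, and sending a score of exactly 0.90 to human review is an assumption about how the "70–90%" band is bounded.

```python
def route_by_confidence(overall_confidence: float) -> str:
    """Map an overall confidence score in [0, 1] to a routing label.

    Hypothetical helper: the labels and boundary handling are assumptions
    made for illustration, not part of the capstone code.
    """
    if not 0.0 <= overall_confidence <= 1.0:
        raise ValueError("confidence must be between 0 and 1")
    if overall_confidence > 0.90:
        return "auto_decide"            # high confidence: no human needed
    if overall_confidence >= 0.70:
        return "human_review"           # medium confidence: HITL checkpoint
    return "auto_deny_with_appeal"      # low confidence: deny, include appeal info


for score in (0.93, 0.90, 0.70, 0.55):
    print(f"{score:.2f} -> {route_by_confidence(score)}")
```

Keeping the thresholds in one function makes the boundary cases (0.70 and 0.90) easy to pin down in a unit test.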
Prerequisites
Complete these modules before starting this capstone:
- M03–M05 — Tool Use Fundamentals: Each agent in this pipeline uses tool calling to invoke mock healthcare APIs. You need to be comfortable defining tool schemas and handling tool results.
- M09 — RAG (Retrieval-Augmented Generation): Agent 2 (Clinical Criteria) performs policy document lookup — a simplified RAG pattern where clinical criteria are fetched and matched against submitted evidence.
- M12 — ReAct Pattern: Each agent internally uses the observe → think → act loop. The Clinical Criteria Agent reasons through each criterion step by step.
- M14 — Multi-Agent Systems: This capstone is a 4-agent pipeline with typed state passing between agents. You need to understand coordinator patterns and inter-agent contracts.
- M16 — Input Guardrails: The Intake Agent applies input validation (schema checks, missing field detection) before any processing begins. You need to understand how guardrails prevent garbage-in-garbage-out in agent pipelines.
- M17 — HITL & Output Guardrails: The Decision Agent routes medium-confidence cases to a human clinical reviewer. Output guardrails (HIPAA compliance checks on determination letters) are applied at the Communication Agent boundary.
You should also be comfortable with Python classes, JSON data structures, and running scripts from the command line.
Environment Setup
A 4-agent pre-authorization pipeline with human-in-the-loop review for medium-confidence clinical decisions, input/output guardrails, and a circuit breaker for cascading failure protection.
Time estimate: 2–3 hours
Difficulty: ★★★★☆
Requirements
- Python 3.10 or higher
- An Anthropic API key (get one at console.anthropic.com)
Install Everything
Copy and paste the block for your operating system into your terminal.
macOS / Linux:
mkdir capstone-4-preauth-pipeline && cd capstone-4-preauth-pipeline
python3 -m venv venv && source venv/bin/activate
pip install "anthropic>=0.30.0" pytest
export ANTHROPIC_API_KEY=your-key-here
Windows (Command Prompt):
mkdir capstone-4-preauth-pipeline && cd capstone-4-preauth-pipeline
python -m venv venv && venv\Scripts\activate
pip install "anthropic>=0.30.0" pytest
set ANTHROPIC_API_KEY=your-key-here
Run python -c "import anthropic; print('OK')". You should see OK. If you see ModuleNotFoundError, make sure your virtual environment is activated.
File Structure
Here is every file you will create in this capstone. The build guide walks through them in order.
capstone-4-preauth-pipeline/
├── mock_tools.py # All agent tools (intake, clinical, decision, communication)
├── pipeline.py # Orchestrator: PipelineState, circuit breaker, HITL, all 4 agents
└── test_pipeline.py # 6 pytest test cases (happy path, denial, HITL, circuit breaker, invalid input, HIPAA guardrail)
Mock Data Specification
The pipeline state object flows through all 4 agents, accumulating outputs. Each agent reads its predecessors’ outputs and writes its own. This typed state schema is the backbone of the multi-agent architecture.
{
  "request_id": "AR-2024-09821",
  "stage": "intake",  // intake | clinical_criteria | decision | communication | complete | error
  "created_at": "2024-03-10T09:00:00Z",
  "intake_output": {  // Written by Agent 1
    "validated": true,
    "member_verified": true,
    "provider_verified": true,
    "procedure_code": "27447",
    "diagnosis_codes": ["M17.11"],
    "clinical_notes_summary": "Severe right knee OA, KL Grade IV, 8 months PT, NSAIDs, 2 injections, WOMAC 68, BMI 31",
    "missing_fields": [],
    "urgency": "standard"
  },
  "criteria_output": {  // Written by Agent 2
    "policy_id": "POLICY-ORTHO-TKA-2024",
    "criteria_evaluation": [
      {"criterion": "C1", "met": true, "confidence": 0.95, "evidence": "KL Grade IV documented, M17.11 confirmed"},
      {"criterion": "C2", "met": true, "confidence": 0.88, "evidence": "8 months PT, NSAIDs, 2 injections"},
      {"criterion": "C3", "met": true, "confidence": 0.92, "evidence": "WOMAC 68 > 50 threshold"},
      {"criterion": "C4", "met": true, "confidence": 0.98, "evidence": "BMI 31 < 40"}
    ]
  },
  "decision_output": {  // Written by Agent 3
    "overall_confidence": 0.93,
    "determination": "approve",
    "rationale": "All required criteria met with high confidence. In-network provider. Gold PPO plan covers procedure.",
    "human_review_required": false,
    "reviewer_override": null
  },
  "communication_output": {  // Written by Agent 4
    "letter_id": "LTR-2024-09821",
    "letter_type": "approval",
    "sent_via": "portal",
    "sent_at": "2024-03-10T09:08:00Z",
    "hipaa_compliant": true,  // Set by check_hipaa_compliance output guardrail
    "hipaa_issues": [],       // Populated only if guardrail blocked the send
    "blocked": false          // true means the send was halted by the guardrail
  },
  "circuit_breaker": {
    "consecutive_failures": 0,
    "threshold": 3,
    "status": "healthy"  // healthy | tripped
  }
}
The pipeline state is a structured record that each agent reads and writes to. Agent 1 writes intake_output, Agent 2 reads it and writes criteria_output, Agent 3 reads both and writes decision_output, Agent 4 reads the decision and writes communication_output. The stage field tracks where in the pipeline the request currently is. The circuit_breaker is checked at every transition.
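The read-then-write flow described above can be sketched with a trimmed-down state object. This is a minimal illustration under stated assumptions: `MiniState` and `write_output` are hypothetical names, and the real `PipelineState` in pipeline.py carries more fields (including the circuit breaker).

```python
from dataclasses import dataclass
from typing import Optional

# Stage order taken from the state specification above ("error" omitted).
STAGES = ["intake", "clinical_criteria", "decision", "communication", "complete"]

# Which output field each stage is responsible for writing.
FIELD_FOR_STAGE = {
    "intake": "intake_output",
    "clinical_criteria": "criteria_output",
    "decision": "decision_output",
    "communication": "communication_output",
}


@dataclass
class MiniState:
    """Hypothetical, reduced version of the pipeline state."""
    request_id: str
    stage: str = "intake"
    intake_output: Optional[dict] = None
    criteria_output: Optional[dict] = None
    decision_output: Optional[dict] = None
    communication_output: Optional[dict] = None


def write_output(state: MiniState, output: dict) -> None:
    """Store the current stage's output, then advance to the next stage."""
    if state.stage not in FIELD_FOR_STAGE:
        raise ValueError(f"cannot write output in stage {state.stage!r}")
    setattr(state, FIELD_FOR_STAGE[state.stage], output)
    state.stage = STAGES[STAGES.index(state.stage) + 1]


state = MiniState(request_id="AR-2024-09821")
write_output(state, {"validated": True, "procedure_code": "27447"})   # Agent 1
write_output(state, {"policy_id": "POLICY-ORTHO-TKA-2024"})           # Agent 2
print(state.stage)  # the request is now waiting on Agent 3
```

Because each stage can only write its own field, an agent cannot overwrite a predecessor's output by accident.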
Step-by-Step Build Guide
Step 1: Create the Mock Tools (mock_tools.py)
Four agents, each with 2–3 tools. The key insight: each agent has a focused set of tools. The Intake Agent cannot evaluate clinical criteria; the Decision Agent cannot draft letters. This enforces separation of concerns.
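One way to picture this separation of concerns is an allow-list of tool names per agent. The sketch below is an assumption-laden illustration: `AGENT_TOOLS` and `is_tool_allowed` are hypothetical, and the capstone itself enforces the split simply by passing each agent only its own tool schemas and handlers. The tool names are the ones defined in mock_tools.py.

```python
# Hypothetical allow-list: which tool names each agent may call.
AGENT_TOOLS = {
    "intake": {
        "validate_auth_request", "verify_member_eligibility", "verify_provider",
    },
    "clinical_criteria": {
        "fetch_clinical_policy", "evaluate_criterion",
    },
    "decision": {
        "compute_decision_confidence", "submit_for_human_review",
        "finalize_determination",
    },
    "communication": {
        "draft_determination_letter", "send_notification",
        "check_hipaa_compliance",
    },
}


def is_tool_allowed(agent: str, tool_name: str) -> bool:
    """Return True only if the tool belongs to that agent's tool set."""
    return tool_name in AGENT_TOOLS.get(agent, set())


total_tools = sum(len(tools) for tools in AGENT_TOOLS.values())
print(total_tools)
print(is_tool_allowed("intake", "evaluate_criterion"))
```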
Create a new file called mock_tools.py:
"""mock_tools.py: All agent tools for the Capstone 4-A pipeline.

Four agents x 2-3 tools each = 11 tools total.
Each agent has a focused tool set, with no cross-agent tool access.
"""
import json
import re
from datetime import datetime, timedelta

# ═══════════════════════════════════════════════════════════════
# INTAKE AGENT TOOLS
# ═══════════════════════════════════════════════════════════════

MEMBER_DB = {
    "MBR-555-1234": {"eligible": True, "plan": "Gold PPO",
                     "effective": "2024-01-01", "termination": None},
}

PROVIDER_DB = {
    "1234567890": {"verified": True, "name": "Dr. Sarah Johnson, MD",
                   "specialty": "Orthopedic Surgery", "network": "in-network"},
}


def validate_auth_request(raw_request: dict) -> dict:
    """Validate and normalize an incoming auth request."""
    required = ["member_id", "provider_npi", "procedure_code",
                "diagnosis_codes", "clinical_notes"]
    missing = [f for f in required if f not in raw_request or not raw_request[f]]
    if missing:
        return {"validated": False, "missing_fields": missing,
                "normalized_request": None}
    return {"validated": True, "missing_fields": [],
            "normalized_request": {
                "procedure_code": raw_request["procedure_code"],
                "diagnosis_codes": raw_request["diagnosis_codes"],
                "clinical_notes_summary": raw_request["clinical_notes"][:200],
                "urgency": raw_request.get("urgency", "standard"),
            }}


def verify_member_eligibility(member_id: str, service_date: str) -> dict:
    member = MEMBER_DB.get(member_id)
    if not member:
        return {"error": "MEMBER_NOT_FOUND", "message": f"Member {member_id} not found."}
    return {**member, "member_id": member_id}


def verify_provider(provider_npi: str) -> dict:
    provider = PROVIDER_DB.get(provider_npi)
    if not provider:
        return {"error": "PROVIDER_NOT_FOUND", "message": f"NPI {provider_npi} not found."}
    return {**provider, "provider_npi": provider_npi}

# ═══════════════════════════════════════════════════════════════
# CLINICAL CRITERIA AGENT TOOLS
# ═══════════════════════════════════════════════════════════════

POLICY_DB = {
    "27447": {
        "policy_id": "POLICY-ORTHO-TKA-2024",
        "criteria": [
            {"id": "C1", "description": "Severe OA (M17.11/M17.12) KL Grade III+", "required": True},
            {"id": "C2", "description": "6+ months conservative treatment", "required": True},
            {"id": "C3", "description": "WOMAC score > 50", "required": True},
            {"id": "C4", "description": "BMI < 40", "required": False},
        ],
        "effective_date": "2024-01-01",
    },
}


def fetch_clinical_policy(procedure_code: str, payer: str = None) -> dict:
    policy = POLICY_DB.get(procedure_code)
    if not policy:
        return {"error": "NO_POLICY_FOUND", "message": f"No policy for CPT {procedure_code}."}
    return policy


CRITERIA_EVIDENCE_MAP = {
    "C1": {"keywords": ["M17.11", "M17.12", "KL Grade III", "KL Grade IV",
                        "osteoarthritis", "severe"], "confidence_base": 0.90},
    "C2": {"keywords": ["PT", "physical therapy", "NSAIDs", "injection",
                        "conservative", "6 month", "8 month"], "confidence_base": 0.85},
    "C3": {"keywords": ["WOMAC", "score"], "confidence_base": 0.90},
    "C4": {"keywords": ["BMI"], "confidence_base": 0.95},
}


def evaluate_criterion(criterion_id: str, clinical_notes: str,
                       supporting_docs: list = None) -> dict:
    """Evaluate a single criterion against clinical evidence."""
    mapping = CRITERIA_EVIDENCE_MAP.get(criterion_id)
    if not mapping:
        return {"error": "CRITERION_NOT_FOUND", "message": f"Unknown criterion: {criterion_id}"}
    # Case-insensitive substring match of each keyword against the notes.
    notes_lower = clinical_notes.lower()
    matches = [kw for kw in mapping["keywords"] if kw.lower() in notes_lower]
    # Matching half of the keywords yields the base confidence; capped at 1.0.
    confidence = mapping["confidence_base"] * (len(matches) / max(len(mapping["keywords"]) * 0.5, 1))
    confidence = min(confidence, 1.0)
    met = confidence > 0.5
    gaps = [] if met else [f"Insufficient evidence for {criterion_id}"]
    evidence = f"Found: {', '.join(matches)}" if matches else "No matching evidence"
    return {"criterion_id": criterion_id, "met": met,
            "confidence": round(confidence, 2), "evidence": evidence, "gaps": gaps}

# ═══════════════════════════════════════════════════════════════
# DECISION AGENT TOOLS
# ═══════════════════════════════════════════════════════════════

def compute_decision_confidence(criteria_results: list,
                                network_status: str,
                                benefit_summary: dict) -> dict:
    """Compute overall confidence and preliminary recommendation."""
    if not criteria_results:
        return {"error": "INCOMPLETE_INPUT", "message": "No criteria results."}
    avg_conf = sum(c.get("confidence", 0) for c in criteria_results) / len(criteria_results)
    # Results without a "required" key are treated as required.
    all_met = all(c.get("met", False) for c in criteria_results if c.get("required", True))
    if all_met and avg_conf > 0.90:
        rec = "approve"
        human_review = False
    elif all_met and avg_conf >= 0.70:
        rec = "approve"
        human_review = True  # Medium confidence → HITL
    elif not all_met:
        rec = "deny" if avg_conf < 0.70 else "request_info"
        human_review = avg_conf >= 0.70
    else:
        # All required criteria met, but average confidence is below 0.70.
        rec = "request_info"
        human_review = True
    return {"overall_confidence": round(avg_conf, 2),
            "recommendation": rec,
            "rationale": f"Avg confidence {avg_conf:.0%}. Network: {network_status}.",
            "human_review_required": human_review}


def submit_for_human_review(request_id: str, confidence_score: float,
                            criteria_evaluation: list,
                            preliminary_recommendation: str) -> dict:
    return {"review_id": f"HR-{request_id}",
            "queue_position": 1,
            "estimated_wait": "5 minutes"}


def finalize_determination(request_id: str, determination: str,
                           rationale: str,
                           reviewer_override: dict = None) -> dict:
    return {"determination_id": f"DET-{request_id}",
            "determination": determination.upper(),
            "effective_date": datetime.now().isoformat(),
            "appeal_deadline": (datetime.now() + timedelta(days=60)).strftime("%Y-%m-%d"),
            "rationale": rationale,
            "reviewer_override": reviewer_override}

# ═══════════════════════════════════════════════════════════════
# COMMUNICATION AGENT TOOLS
# ═══════════════════════════════════════════════════════════════

LETTER_TEMPLATES = {
    "approve": "Dear Provider,\n\nAuthorization {det_id} has been APPROVED for {procedure}.\n\nPlan: {plan}\nMember copay: {copay}%\n\nPlease schedule the procedure at your convenience.\n\nSincerely,\nClinical Authorization Team",
    "deny": "Dear Provider,\n\nAuthorization {det_id} has been DENIED for {procedure}.\n\nRationale: {rationale}\n\nAppeal rights: You may appeal this decision within 60 days.\nTo file an appeal, submit additional documentation to appeals@healthplan.example.\n\nSincerely,\nClinical Authorization Team",
    "request_info": "Dear Provider,\n\nRegarding authorization request {det_id} for {procedure}:\n\nAdditional information is required:\n{gaps}\n\nPlease submit the requested documentation within 14 days.\n\nSincerely,\nClinical Authorization Team",
}


def draft_determination_letter(determination_id: str,
                               determination: str = "approve",
                               recipient_type: str = "provider",
                               language: str = "en") -> dict:
    """Draft letter using the correct template for the determination type."""
    template_key = determination.lower()
    # Unknown determination types fall back to the info-request template.
    template = LETTER_TEMPLATES.get(template_key, LETTER_TEMPLATES.get("request_info", ""))
    # Note: the template is returned as-is; placeholders such as {det_id}
    # and {procedure} are not substituted by this mock.
    return {"letter_id": f"LTR-{determination_id}",
            "draft_text": template,
            "required_disclosures": ["Appeal rights", "Member cost share", "Effective date"]}


def send_notification(letter_id: str, channel: str, recipient: str) -> dict:
    return {"notification_id": f"NOT-{letter_id}",
            "sent_at": datetime.now().isoformat(),
            "delivery_status": "delivered",
            "channel": channel}

# Output guardrail used by Agent 4 (Communication) to verify the
# drafted determination letter before it is sent.
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
DOB_PATTERN = re.compile(r"\b(0?[1-9]|1[0-2])[/-](0?[1-9]|[12]\d|3[01])[/-]((19|20)\d{2})\b")


def check_hipaa_compliance(letter_text: str, determination_type: str) -> dict:
    """Output guardrail: verify a drafted letter is HIPAA-compliant.

    Checks performed:
      * PII leakage: no raw SSN patterns, no full birthdates beyond [redacted]
      * Determination keywords: approvals say 'approved', denials include
        'appeal' instructions, request_info letters list missing items
      * Salutation/sign-off: must start with 'Dear' and end with a sign-off

    Returns: {"compliant": bool, "issues": [str], "redacted_text": str}
    """
    issues = []
    redacted = letter_text or ""

    # 1) PII leakage: redact and flag
    if SSN_PATTERN.search(redacted):
        issues.append("PII_LEAK: SSN pattern detected in letter body")
        redacted = SSN_PATTERN.sub("[redacted-ssn]", redacted)
    if DOB_PATTERN.search(redacted):
        issues.append("PII_LEAK: full date-of-birth detected (use [redacted])")
        redacted = DOB_PATTERN.sub("[redacted-dob]", redacted)

    # 2) Required keywords by determination type
    body_lower = redacted.lower()
    dtype = (determination_type or "").lower()
    if dtype in ("approve", "approval", "approved"):
        if "approved" not in body_lower:
            issues.append("MISSING_KEYWORD: approval letter must say 'approved'")
    elif dtype in ("deny", "denial", "denied"):
        if "appeal" not in body_lower:
            issues.append("MISSING_KEYWORD: denial letter must include appeal instructions")
    elif dtype in ("request_info", "info_request"):
        if not any(k in body_lower for k in ("additional information", "missing", "submit")):
            issues.append("MISSING_KEYWORD: info-request letter must list missing items")

    # 3) Salutation and sign-off
    stripped = redacted.strip()
    if not stripped.lower().startswith("dear"):
        issues.append("FORMAT: missing salutation ('Dear ...')")
    if "sincerely" not in body_lower and "regards" not in body_lower:
        issues.append("FORMAT: missing sign-off ('Sincerely' or 'Regards')")

    return {"compliant": len(issues) == 0, "issues": issues,
            "redacted_text": redacted}

// mock_tools.ts: Key tools for Capstone 4-A (abbreviated)
// Full implementation mirrors the Python version

export function validateAuthRequest(rawRequest: any): any {
  const required = ["member_id", "provider_npi", "procedure_code", "diagnosis_codes", "clinical_notes"];
  const missing = required.filter(f => !rawRequest[f]);
  if (missing.length) return { validated: false, missing_fields: missing, normalized_request: null };
  return {
    validated: true, missing_fields: [],
    normalized_request: {
      procedure_code: rawRequest.procedure_code,
      diagnosis_codes: rawRequest.diagnosis_codes,
      clinical_notes_summary: rawRequest.clinical_notes.slice(0, 200),
      urgency: rawRequest.urgency || "standard",
    },
  };
}

export function verifyMemberEligibility(memberId: string): any {
  const db: Record<string, any> = {
    "MBR-555-1234": { eligible: true, plan: "Gold PPO", effective: "2024-01-01" },
  };
  return db[memberId] || { error: "MEMBER_NOT_FOUND" };
}

export function computeDecisionConfidence(criteriaResults: any[], networkStatus: string): any {
  const avg = criteriaResults.reduce((s, c) => s + (c.confidence || 0), 0) / criteriaResults.length;
  const allMet = criteriaResults.every(c => c.met);
  let rec = "request_info", hitl = true;
  if (allMet && avg > 0.90) { rec = "approve"; hitl = false; }
  else if (allMet && avg >= 0.70) { rec = "approve"; hitl = true; }
  else if (!allMet && avg < 0.70) { rec = "deny"; hitl = false; }
  return { overall_confidence: +avg.toFixed(2), recommendation: rec, human_review_required: hitl };
}

export function finalizeDetermination(requestId: string, determination: string, rationale: string, override?: any): any {
  const now = new Date();
  const appeal = new Date(now.getTime() + 60 * 24 * 60 * 60 * 1000);
  return {
    determination_id: `DET-${requestId}`, determination: determination.toUpperCase(),
    effective_date: now.toISOString(), appeal_deadline: appeal.toISOString().split("T")[0],
    rationale, reviewer_override: override || null,
  };
}

const LETTER_TEMPLATES: Record<string, string> = {
  approve: "Authorization approved. Please schedule the procedure at your convenience.",
  deny: "Authorization denied. You may appeal this decision within 60 days.",
  request_info: "Additional information is required. Please submit within 14 days.",
};

export function draftDeterminationLetter(detId: string, determination: string = "approve"): any {
  const template = LETTER_TEMPLATES[determination.toLowerCase()] || LETTER_TEMPLATES["request_info"];
  return {
    letter_id: `LTR-${detId}`, draft_text: template,
    required_disclosures: ["Appeal rights", "Member cost share", "Effective date"],
  };
}

export function sendNotification(letterId: string, channel: string, recipient: string): any {
  return {
    notification_id: `NOT-${letterId}`, sent_at: new Date().toISOString(),
    delivery_status: "delivered", channel,
  };
}

// Output guardrail used by Agent 4 (Communication) before sending.
const SSN_RE = /\b\d{3}-\d{2}-\d{4}\b/;
const DOB_RE = /\b(0?[1-9]|1[0-2])[\/-](0?[1-9]|[12]\d|3[01])[\/-]((19|20)\d{2})\b/;

export function checkHipaaCompliance(letterText: string, determinationType: string): any {
  const issues: string[] = [];
  let redacted = letterText || "";
  if (SSN_RE.test(redacted)) {
    issues.push("PII_LEAK: SSN pattern detected in letter body");
    redacted = redacted.replace(new RegExp(SSN_RE, "g"), "[redacted-ssn]");
  }
  if (DOB_RE.test(redacted)) {
    issues.push("PII_LEAK: full date-of-birth detected (use [redacted])");
    redacted = redacted.replace(new RegExp(DOB_RE, "g"), "[redacted-dob]");
  }
  const body = redacted.toLowerCase();
  const dtype = (determinationType || "").toLowerCase();
  if (["approve", "approval", "approved"].includes(dtype) && !body.includes("approved")) {
    issues.push("MISSING_KEYWORD: approval letter must say 'approved'");
  } else if (["deny", "denial", "denied"].includes(dtype) && !body.includes("appeal")) {
    issues.push("MISSING_KEYWORD: denial letter must include appeal instructions");
  } else if (["request_info", "info_request"].includes(dtype) &&
             !["additional information", "missing", "submit"].some(k => body.includes(k))) {
    issues.push("MISSING_KEYWORD: info-request letter must list missing items");
  }
  if (!redacted.trim().toLowerCase().startsWith("dear")) {
    issues.push("FORMAT: missing salutation ('Dear ...')");
  }
  if (!body.includes("sincerely") && !body.includes("regards")) {
    issues.push("FORMAT: missing sign-off ('Sincerely' or 'Regards')");
  }
  return { compliant: issues.length === 0, issues, redacted_text: redacted };
}

Test the mock tools: python -c "from mock_tools import validate_auth_request; print(validate_auth_request({}))". You should see {'validated': False, 'missing_fields': ['member_id', 'provider_npi', 'procedure_code', 'diagnosis_codes', 'clinical_notes'], 'normalized_request': None}. If you see an import error, check you created mock_tools.py in the project root.
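To see how evaluate_criterion turns keyword matches into a confidence score, the arithmetic can be isolated in a few lines. This is a sketch that mirrors the formula in mock_tools.py; `keyword_confidence` is a hypothetical helper introduced here only for illustration.

```python
def keyword_confidence(base: float, n_keywords: int, n_matches: int) -> float:
    """Reproduce the mock scoring formula.

    Matching half of a criterion's keywords yields the base confidence;
    the result is capped at 1.0 and rounded to two decimals.
    """
    denominator = max(n_keywords * 0.5, 1)
    return round(min(base * n_matches / denominator, 1.0), 2)


# C2 has 7 keywords and a base confidence of 0.85.
print(keyword_confidence(0.85, 7, 4))  # four keywords found in the notes
print(keyword_confidence(0.85, 7, 2))  # only two found: at or below 0.5, so not met
# C4 has a single keyword ("BMI") and a base confidence of 0.95.
print(keyword_confidence(0.95, 1, 1))
# Many matches on a short keyword list hit the 1.0 cap.
print(keyword_confidence(0.90, 2, 2))
```

Since a criterion counts as met only when its confidence exceeds 0.5, a criterion with a long keyword list needs several matches before it passes.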
Step 2: Understand the Architecture (No Code — Read Before Building)
Before writing pipeline.py, understand the five architectural concepts that are all implemented in that single file:
- PipelineState dataclass: The typed contract between agents. Each agent reads its predecessor’s output and writes its own. The stage field tracks pipeline progress. Defined as a Python @dataclass with optional fields for each agent’s output.
- Circuit Breaker: Three functions, check_circuit_breaker(state), record_failure(state), and record_success(state), track consecutive failures. If 3 failures occur in sequence, the breaker trips, halting the pipeline. A success resets the counter to zero, but it does not reset a breaker that has already tripped.
- HITL Reviewer: When the Decision Agent’s confidence falls between 70% and 90%, the pipeline pauses and presents a CLI review interface. The reviewer sees criteria evaluations and confidence scores, then selects approve, deny, request-info, or escalate. The pipeline resumes with the reviewer’s override.
- Agent Runner pattern: Each of the four agents has its own system prompt, focused tool set, and output schema. All four (Intake, Clinical, Decision, and Communication) are dispatched through the same run_agent function, which sends messages to Claude with the agent’s tools, handles tool calls in a loop until the agent responds with text, and records success/failure on the circuit breaker. Because every agent transition goes through run_agent, the circuit breaker is enforced at every step (Agents 1→2→3→4), not just the early ones.
- Agent 4 output guardrail: The Communication Agent’s tool set includes check_hipaa_compliance. The system prompt instructs the LLM to draft → check → send; the orchestrator verifies the guardrail’s compliant result one more time before sending. If the guardrail fails, the send is blocked and the pipeline records the issues in communication_output.
All five concepts live inside pipeline.py to keep the project simple. In production, you would split them into separate modules for independent testing and deployment.
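The circuit breaker behavior is easiest to see by tracing a sequence of outcomes. The sketch below is a self-contained stand-in, not the capstone's code: the `CircuitBreaker` class is hypothetical (pipeline.py uses three functions over a dict in the state), but the counting rules follow the description above.

```python
class CircuitBreaker:
    """Hypothetical class version of the three circuit breaker functions."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.consecutive_failures = 0
        self.status = "healthy"

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.threshold:
            self.status = "tripped"

    def record_success(self) -> None:
        # Resets the counter only; a tripped breaker stays tripped.
        self.consecutive_failures = 0

    def should_halt(self) -> bool:
        return self.status == "tripped"


breaker = CircuitBreaker()
trace = []
for outcome in ["fail", "fail", "ok", "fail", "fail", "fail"]:
    if outcome == "ok":
        breaker.record_success()
    else:
        breaker.record_failure()
    trace.append((breaker.consecutive_failures, breaker.status))

for step in trace:
    print(step)
```

The success in the third position resets the count, so it takes three further failures in a row before the breaker trips.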
Step 3: Build the Pipeline Orchestrator (pipeline.py)
This is the main build step. The orchestrator defines the pipeline state, circuit breaker, HITL reviewer, agent runner, and wires all 4 agents together in sequence. At each transition: check the circuit breaker, log the transition, and validate the output schema before proceeding.
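The "validate the output schema" step at each transition could look something like the sketch below. It is an assumption, not the capstone's implementation: `REQUIRED_KEYS` and `missing_keys` are hypothetical names, and the required keys are taken from the state specification earlier in this guide.

```python
# Hypothetical minimal schema: keys each agent's output must contain
# before the orchestrator hands off to the next agent.
REQUIRED_KEYS = {
    "intake_output": {"validated", "member_verified", "provider_verified",
                      "procedure_code"},
    "criteria_output": {"policy_id", "criteria_evaluation"},
    "decision_output": {"overall_confidence", "determination",
                        "human_review_required"},
}


def missing_keys(output_name: str, output: dict) -> list:
    """Return the required keys absent from an agent's output, sorted."""
    return sorted(REQUIRED_KEYS[output_name] - set(output))


ok = missing_keys("intake_output", {
    "validated": True, "member_verified": True,
    "provider_verified": True, "procedure_code": "27447",
})
bad = missing_keys("criteria_output", {"policy_id": "POLICY-ORTHO-TKA-2024"})
print(ok)
print(bad)
```

An orchestrator could treat a non-empty result as a failed transition and record it on the circuit breaker instead of passing incomplete state downstream.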
Create a new file called pipeline.py:
"""pipeline.py: Multi-Agent Pre-Auth Pipeline Orchestrator (Capstone 4-A)

Runs 4 agents in sequence with circuit breaker and HITL.

Usage:
    export ANTHROPIC_API_KEY=your-key-here
    python pipeline.py
"""
import json
import anthropic
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional

client = anthropic.Anthropic()
MODEL = "claude-sonnet-4-6"


# ── Pipeline State ─────────────────────────────────────────────
@dataclass
class PipelineState:
    request_id: str
    stage: str = "intake"
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    raw_request: dict = field(default_factory=dict)
    intake_output: Optional[dict] = None
    criteria_output: Optional[dict] = None
    decision_output: Optional[dict] = None
    communication_output: Optional[dict] = None
    circuit_breaker: dict = field(default_factory=lambda: {
        "consecutive_failures": 0, "threshold": 3, "status": "healthy"
    })


# ── Circuit Breaker ────────────────────────────────────────────
def check_circuit_breaker(state: PipelineState) -> bool:
    """Returns True if pipeline should HALT."""
    return state.circuit_breaker["status"] == "tripped"


def record_failure(state: PipelineState):
    cb = state.circuit_breaker
    cb["consecutive_failures"] += 1
    if cb["consecutive_failures"] >= cb["threshold"]:
        cb["status"] = "tripped"
        print(f"[CIRCUIT BREAKER] TRIPPED after {cb['consecutive_failures']} failures!")


def record_success(state: PipelineState):
    state.circuit_breaker["consecutive_failures"] = 0


# ── Agent Runner ───────────────────────────────────────────────
def run_agent(name: str, system_prompt: str, tools: list,
              tool_handlers: dict, user_message: str,
              state: PipelineState) -> dict:
    """Run a single agent with tools, returning its output."""
    if check_circuit_breaker(state):
        return {"error": "CIRCUIT_BREAKER_TRIPPED", "message": "Pipeline halted."}
    print(f"\n[{name}] Starting...")
    history = [{"role": "user", "content": user_message}]
    try:
        # Loop until the model answers with text instead of a tool call.
        while True:
            response = client.messages.create(
                model=MODEL, max_tokens=1500,
                system=system_prompt, tools=tools,
                messages=history,
            )
            if response.stop_reason == "tool_use":
                history.append({"role": "assistant", "content": response.content})
                results = []
                for block in response.content:
                    if block.type == "tool_use":
                        handler = tool_handlers.get(block.name)
                        result = handler(block.input) if handler else {"error": "UNKNOWN_TOOL"}
                        results.append({"type": "tool_result", "tool_use_id": block.id,
                                        "content": json.dumps(result)})
                # Tool results go back to the model as a user turn.
                history.append({"role": "user", "content": results})
                continue
            text = "\n".join(b.text for b in response.content if hasattr(b, "text"))
            print(f"[{name}] Complete.")
            record_success(state)
            return {"text": text, "raw_content": response.content}
    except Exception as e:
        # API errors and handler exceptions both count toward the breaker.
        print(f"[{name}] FAILED: {e}")
        record_failure(state)
        return {"error": str(e)}


# ── HITL Review ────────────────────────────────────────────────
def human_review(state: PipelineState) -> dict:
    """CLI-based human review for medium-confidence decisions."""
    criteria = state.criteria_output.get("criteria_evaluation", [])
    confidence = state.decision_output.get("overall_confidence", 0)
    preliminary = state.decision_output.get("recommendation", "unknown")
    print("\n" + "=" * 50)
    print(" HUMAN REVIEW REQUIRED")
    print("=" * 50)
    print(f" Request: {state.request_id}")
    print(f" Confidence: {confidence:.0%}")
    print(f" Preliminary: {preliminary.upper()}")
    print(f"\n Criteria Evaluation:")
    for c in criteria:
        status = "✓" if c.get("met") else "✗"
        print(f" {status} {c['criterion']}: {c.get('evidence', 'N/A')} ({c.get('confidence', 0):.0%})")
    print(f"\n Options: [1] Approve [2] Deny [3] Request Info [4] Escalate")
    choice = input(" Decision (1-4): ").strip()
    options = {"1": "approve", "2": "deny", "3": "request_info", "4": "escalate"}
    # Unrecognized input escalates instead of silently approving.
    decision = options.get(choice, "escalate")
    rationale = input(" Rationale: ").strip() or f"Reviewer selected '{decision}' after clinical review."
    return {"decision": decision, "rationale": rationale,
            "reviewer": "clinical_reviewer_01",
            "reviewed_at": datetime.now().isoformat()}


# ── Pipeline Orchestrator ──────────────────────────────────────
def run_pipeline(raw_request: dict) -> PipelineState:
    """Execute the full 4-agent pipeline."""
    state = PipelineState(
        request_id=raw_request.get("request_id", f"AR-{datetime.now().strftime('%Y%m%d%H%M%S')}"),
        raw_request=raw_request,
    )

    # ── Agent 1: Intake ────────────────────────────────────────
    from mock_tools import validate_auth_request, verify_member_eligibility, verify_provider
    intake_tools = [
        {"name": "validate_auth_request", "description": "Validate incoming auth request fields.",
         "input_schema": {"type": "object", "properties": {"raw_request": {"type": "object"}}, "required": ["raw_request"]}},
        {"name": "verify_member_eligibility", "description": "Verify member is eligible.",
         "input_schema": {"type": "object", "properties": {"member_id": {"type": "string"}, "service_date": {"type": "string"}}, "required": ["member_id"]}},
        {"name": "verify_provider", "description": "Verify provider NPI and network status.",
         "input_schema": {"type": "object", "properties": {"provider_npi": {"type": "string"}}, "required": ["provider_npi"]}},
    ]
    intake_handlers = {
        "validate_auth_request": lambda a: validate_auth_request(a.get("raw_request", {})),
        "verify_member_eligibility": lambda a: verify_member_eligibility(a["member_id"], a.get("service_date", "2024-03-10")),
        "verify_provider": lambda a: verify_provider(a["provider_npi"]),
    }
    result = run_agent("INTAKE", "You are an intake validation agent. Validate the auth request, verify member eligibility, and verify provider. Return structured validation results.",
                       intake_tools, intake_handlers, f"Process this auth request: {json.dumps(raw_request)}", state)
    if "error" in result:
        state.stage = "error"
        return state
    # Re-run the deterministic checks so intake_output records what actually
    # passed, rather than assuming every check succeeded.
    validation = validate_auth_request(raw_request)
    member = verify_member_eligibility(raw_request.get("member_id", ""),
                                       raw_request.get("service_date", "2024-03-10"))
    provider = verify_provider(raw_request.get("provider_npi", ""))
    state.intake_output = {"validated": validation["validated"],
                           "member_verified": bool(member.get("eligible")),
                           "provider_verified": bool(provider.get("verified")),
                           "procedure_code": raw_request.get("procedure_code"),
                           "diagnosis_codes": raw_request.get("diagnosis_codes", []),
                           "clinical_notes_summary": (raw_request.get("clinical_notes") or "")[:200],
                           "missing_fields": validation["missing_fields"],
                           "urgency": raw_request.get("urgency", "standard")}
    # Input guardrail: stop here if any intake check failed.
    if not (state.intake_output["validated"]
            and state.intake_output["member_verified"]
            and state.intake_output["provider_verified"]):
        state.stage = "error"
        return state
    state.stage = "clinical_criteria"

# ── Agent 2: Clinical Criteria ─────────────────────────────
from mock_tools import fetch_clinical_policy, evaluate_criterion
clinical_tools = [
{"name": "fetch_clinical_policy", "description": "Get clinical criteria for a procedure.",
"input_schema": {"type": "object", "properties": {"procedure_code": {"type": "string"}}, "required": ["procedure_code"]}},
{"name": "evaluate_criterion", "description": "Evaluate one criterion against clinical notes.",
"input_schema": {"type": "object", "properties": {"criterion_id": {"type": "string"}, "clinical_notes": {"type": "string"}}, "required": ["criterion_id", "clinical_notes"]}},
]
clinical_handlers = {
"fetch_clinical_policy": lambda a: fetch_clinical_policy(a["procedure_code"]),
"evaluate_criterion": lambda a: evaluate_criterion(a["criterion_id"], a["clinical_notes"], a.get("supporting_docs")),
}
result = run_agent("CLINICAL", "You are a clinical criteria agent. Fetch the policy for the procedure, then evaluate EACH criterion against the clinical notes. Return per-criterion results.",
clinical_tools, clinical_handlers,
f"Evaluate criteria for: {json.dumps(state.intake_output)}", state)
if "error" in result:
state.stage = "error"
return state
# Re-run the mock tools directly to build the structured per-criterion output (the agent's free-text reply is not parsed).
policy = fetch_clinical_policy(state.intake_output["procedure_code"])
criteria_eval = []
for c in policy.get("criteria", []):
ev = evaluate_criterion(c["id"], raw_request.get("clinical_notes", ""))
criteria_eval.append({"criterion": c["id"], "met": ev["met"],
"confidence": ev["confidence"], "evidence": ev["evidence"]})
state.criteria_output = {"policy_id": policy.get("policy_id"), "criteria_evaluation": criteria_eval}
state.stage = "decision"
# ── Agent 3: Decision (LLM-driven) ─────────────────────────
from mock_tools import compute_decision_confidence, finalize_determination
# Track the last confidence result the LLM computed so we can
# consult it after run_agent returns (HITL routing, finalization).
decision_scratchpad: dict = {}
def _confidence_handler(args: dict) -> dict:
out = compute_decision_confidence(
args.get("criteria_results", []),
args.get("network_status", "in-network"),
args.get("benefit_summary", {}))
decision_scratchpad.update(out)
return out
def _finalize_handler(args: dict) -> dict:
return finalize_determination(
args["request_id"], args["determination"],
args.get("rationale", ""), args.get("reviewer_override"))
decision_tools = [
{"name": "compute_decision_confidence",
"description": "Compute overall confidence and a preliminary recommendation from the per-criterion results.",
"input_schema": {"type": "object", "properties": {
"criteria_results": {"type": "array", "items": {"type": "object"}},
"network_status": {"type": "string"},
"benefit_summary": {"type": "object"}},
"required": ["criteria_results", "network_status"]}},
{"name": "finalize_determination",
"description": "Finalize the determination once routing is decided. Issues a determination_id and appeal deadline.",
"input_schema": {"type": "object", "properties": {
"request_id": {"type": "string"},
"determination": {"type": "string", "enum": ["approve", "deny", "request_info"]},
"rationale": {"type": "string"},
"reviewer_override": {"type": ["object", "null"]}},
"required": ["request_id", "determination", "rationale"]}},
]
decision_handlers = {
"compute_decision_confidence": _confidence_handler,
"finalize_determination": _finalize_handler,
}
decision_system = (
"You are the Decision Agent in a pre-authorization pipeline. "
"Call compute_decision_confidence with the criteria results, then "
"route as follows: confidence > 0.90 AND all required criteria met => "
"auto-approve; 0.70 <= confidence <= 0.90 => human_review_required=true; "
"confidence < 0.70 OR required criteria unmet => deny. "
"Do NOT call finalize_determination yet — the orchestrator handles HITL "
"and finalization. Reply with a JSON object: "
"{\"determination\": str, \"overall_confidence\": float, "
"\"human_review_required\": bool, \"rationale\": str}."
)
decision_user = (
"Determine routing for this request.\n"
f"request_id: {state.request_id}\n"
f"criteria_evaluation: {json.dumps(criteria_eval)}\n"
"network_status: in-network\n"
"benefit_summary: {\"plan\": \"Gold PPO\", \"copay\": 20}"
)
result = run_agent("DECISION", decision_system, decision_tools,
decision_handlers, decision_user, state)
if "error" in result:
state.stage = "error"
return state
# The LLM either returns structured JSON in text or we fall back to the
# scratchpad populated by the confidence tool. Both paths are safe.
decision_output = dict(decision_scratchpad)
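try:
    raw_text = (result.get("text") or "").strip()
    if "```" in raw_text:
        # The model wrapped its reply in a Markdown fence; keep only the fenced body.
        raw_text = raw_text.split("```json")[-1].split("```")[0].strip()
    parsed = json.loads(raw_text)
    if isinstance(parsed, dict):
        decision_output.update(parsed)
except ValueError:
    # Not valid JSON: keep the scratchpad values from the confidence tool.
    pass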
decision_output.setdefault("determination",
decision_output.get("recommendation", "request_info"))
state.decision_output = decision_output
# HITL checkpoint — runs OUTSIDE run_agent (offline human step).
if decision_output.get("human_review_required"):
override = human_review(state)
state.decision_output["reviewer_override"] = override
state.decision_output["determination"] = override["decision"]
state.stage = "communication"
# Finalize the determination after HITL resolution.
det = finalize_determination(
state.request_id,
state.decision_output.get("determination"),
state.decision_output.get("rationale", ""),
state.decision_output.get("reviewer_override"))
# ── Agent 4: Communication (LLM-driven, with HIPAA guardrail) ─
from mock_tools import (draft_determination_letter, send_notification,
check_hipaa_compliance)
comm_scratchpad: dict = {}
def _draft_handler(args: dict) -> dict:
out = draft_determination_letter(
args["determination_id"],
args.get("determination", "approve"),
args.get("recipient_type", "provider"),
args.get("language", "en"))
comm_scratchpad["letter"] = out
return out
def _hipaa_handler(args: dict) -> dict:
out = check_hipaa_compliance(args["letter_text"],
args.get("determination_type", "approve"))
comm_scratchpad["hipaa"] = out
return out
def _send_handler(args: dict) -> dict:
return send_notification(args["letter_id"],
args.get("channel", "portal"),
args.get("recipient", "provider@clinic.example"))
comm_tools = [
{"name": "draft_determination_letter",
"description": "Draft a determination letter (approval/denial/info-request) from the determination record.",
"input_schema": {"type": "object", "properties": {
"determination_id": {"type": "string"},
"determination": {"type": "string", "enum": ["approve", "deny", "request_info"]},
"recipient_type": {"type": "string"},
"language": {"type": "string"}},
"required": ["determination_id", "determination"]}},
{"name": "check_hipaa_compliance",
"description": "Output guardrail: verify the drafted letter has no PII leakage, includes the right keywords, and has proper salutation/sign-off. Call this BEFORE send_notification.",
"input_schema": {"type": "object", "properties": {
"letter_text": {"type": "string"},
"determination_type": {"type": "string"}},
"required": ["letter_text", "determination_type"]}},
{"name": "send_notification",
"description": "Send the notification only after the HIPAA guardrail returns compliant=true.",
"input_schema": {"type": "object", "properties": {
"letter_id": {"type": "string"},
"channel": {"type": "string"},
"recipient": {"type": "string"}},
"required": ["letter_id", "channel"]}},
]
comm_handlers = {
"draft_determination_letter": _draft_handler,
"check_hipaa_compliance": _hipaa_handler,
"send_notification": _send_handler,
}
comm_system = (
"You are the Communication Agent. Your job: (1) draft the determination "
"letter with draft_determination_letter, (2) call check_hipaa_compliance "
"on the draft text BEFORE sending — this is a mandatory output guardrail, "
"(3) only call send_notification if compliant=true. If the guardrail "
"reports issues, redraft using the redacted_text and re-check. "
"Use channel='portal' unless told otherwise."
)
comm_user = (
f"Send the determination notice for request {state.request_id}.\n"
f"determination_id: {det['determination_id']}\n"
f"determination: {det['determination'].lower()}\n"
f"rationale: {det.get('rationale', '')}"
)
result = run_agent("COMMUNICATION", comm_system, comm_tools,
comm_handlers, comm_user, state)
if "error" in result:
state.stage = "error"
return state
letter = comm_scratchpad.get("letter") or draft_determination_letter(
det["determination_id"], det["determination"])
hipaa = comm_scratchpad.get("hipaa") or check_hipaa_compliance(
letter.get("draft_text", ""), det["determination"])
if not hipaa.get("compliant", False):
# Guardrail failed — block the send. The orchestrator records this
# in state and sets stage to 'error' so downstream alerting fires.
state.communication_output = {"letter_id": letter.get("letter_id"),
"hipaa_issues": hipaa.get("issues", []),
"blocked": True}
state.stage = "error"
return state
notification = send_notification(letter["letter_id"], "portal",
"provider@clinic.example")
state.communication_output = {
"letter_id": letter["letter_id"], "letter_type": det["determination"],
"sent_via": "portal", "sent_at": notification["sent_at"],
"hipaa_compliant": True}
state.stage = "complete"
print(f"\n[PIPELINE] Complete! Determination: {det['determination']}")
return state
def main():
print("=" * 60)
print(" Pre-Auth Processing Pipeline — Capstone 4-A")
print(" Type 'demo' for sample request, or 'quit' to exit.")
print("=" * 60)
sample = {
"request_id": "AR-2024-09821",
"member_id": "MBR-555-1234",
"provider_npi": "1234567890",
"procedure_code": "27447",
"diagnosis_codes": ["M17.11"],
"clinical_notes": "Severe right knee osteoarthritis (M17.11). KL Grade IV on weight-bearing films. 8 months physical therapy (PT). Failed conservative NSAIDs, 2 corticosteroid injections. WOMAC score 68. BMI 31.",
}
while True:
try:
cmd = input("\nCommand: ").strip()
except EOFError:
# Allow non-interactive runs (e.g. `python pipeline.py <<< demo`).
break
if cmd.lower() in ("quit", "exit", "q"):
break
if cmd.lower() == "demo":
state = run_pipeline(sample)
print(f"\nFinal state:\n{json.dumps(asdict(state), indent=2, default=str)}")
if __name__ == "__main__":
main()
// pipeline.ts — Multi-Agent Pipeline Orchestrator (Capstone 4-A)
// Abbreviated — mirrors the Python version structure
import Anthropic from "@anthropic-ai/sdk";
import * as readline from "readline";
import {
validateAuthRequest, verifyMemberEligibility,
computeDecisionConfidence, finalizeDetermination,
draftDeterminationLetter, sendNotification, checkHipaaCompliance,
} from "./mock_tools";
const client = new Anthropic();
const MODEL = "claude-sonnet-4-6";
interface PipelineState {
request_id: string;
stage: string;
intake_output?: any;
criteria_output?: any;
decision_output?: any;
communication_output?: any;
circuit_breaker: { consecutive_failures: number; threshold: number; status: string };
}
export function createState(requestId: string): PipelineState {
return {
request_id: requestId, stage: "intake",
circuit_breaker: { consecutive_failures: 0, threshold: 3, status: "healthy" },
};
}
export function checkCircuitBreaker(state: PipelineState): boolean {
return state.circuit_breaker.status === "tripped";
}
export function recordFailure(state: PipelineState) {
state.circuit_breaker.consecutive_failures++;
if (state.circuit_breaker.consecutive_failures >= state.circuit_breaker.threshold) {
state.circuit_breaker.status = "tripped";
console.log("[CIRCUIT BREAKER] TRIPPED!");
}
}
export function recordSuccess(state: PipelineState) {
state.circuit_breaker.consecutive_failures = 0;
}
// Generic agent runner — sends a tool-use loop to Claude and records success/
// failure on the circuit breaker. All 4 agents go through this.
async function runAgent(name: string, system: string, tools: any[],
handlers: Record<string, (a: any) => any>,
userMessage: string,
state: PipelineState): Promise<any> {
if (checkCircuitBreaker(state)) return { error: "CIRCUIT_BREAKER_TRIPPED" };
console.log(`\n[${name}] Starting...`);
const history: any[] = [{ role: "user", content: userMessage }];
try {
while (true) {
const resp = await client.messages.create({
model: MODEL, max_tokens: 1500, system, tools,
messages: history,
} as any);
if (resp.stop_reason === "tool_use") {
history.push({ role: "assistant", content: resp.content });
const results = (resp.content as any[]).filter(b => b.type === "tool_use")
.map(b => ({ type: "tool_result", tool_use_id: b.id,
content: JSON.stringify(handlers[b.name] ? handlers[b.name](b.input) : { error: "UNKNOWN_TOOL" }) }));
history.push({ role: "user", content: results });
continue;
}
const text = (resp.content as any[]).filter(b => b.type === "text").map(b => b.text).join("\n");
console.log(`[${name}] Complete.`);
recordSuccess(state);
return { text };
}
} catch (e: any) {
console.log(`[${name}] FAILED: ${e.message}`);
recordFailure(state);
return { error: e.message };
}
}
async function runPipeline(rawRequest: any): Promise<PipelineState> {
const state = createState(rawRequest.request_id);
if (checkCircuitBreaker(state)) {
state.stage = "error";
return state;
}
// Agent 1: Intake
console.log("\n[INTAKE] Starting...");
const validation = validateAuthRequest(rawRequest);
const member = verifyMemberEligibility(rawRequest.member_id);
state.intake_output = {
validated: validation.validated,
member_verified: !member.error,
procedure_code: rawRequest.procedure_code,
diagnosis_codes: rawRequest.diagnosis_codes,
clinical_notes_summary: rawRequest.clinical_notes?.slice(0, 200),
};
state.stage = "clinical_criteria";
console.log("[INTAKE] Complete.");
// Agent 2: Clinical Criteria (simplified)
console.log("[CLINICAL] Evaluating criteria...");
state.criteria_output = {
policy_id: "POLICY-ORTHO-TKA-2024",
criteria_evaluation: [
{ criterion: "C1", met: true, confidence: 0.95, evidence: "KL IV, M17.11" },
{ criterion: "C2", met: true, confidence: 0.88, evidence: "8mo PT, NSAIDs, injections" },
{ criterion: "C3", met: true, confidence: 0.92, evidence: "WOMAC 68 > 50" },
{ criterion: "C4", met: true, confidence: 0.98, evidence: "BMI 31 < 40" },
],
};
state.stage = "decision";
console.log("[CLINICAL] Complete.");
// Agent 3: Decision (LLM-driven via runAgent)
const decisionScratch: any = {};
const decisionTools = [
{ name: "compute_decision_confidence",
description: "Compute overall confidence and a preliminary recommendation.",
input_schema: { type: "object", properties: {
criteria_results: { type: "array", items: { type: "object" } },
network_status: { type: "string" },
benefit_summary: { type: "object" } },
required: ["criteria_results", "network_status"] } },
];
const decisionHandlers = {
compute_decision_confidence: (a: any) => {
const out = computeDecisionConfidence(a.criteria_results || [], a.network_status || "in-network");
Object.assign(decisionScratch, out);
return out;
},
};
const decisionSystem = "You are the Decision Agent. Call compute_decision_confidence, " +
"then route: >0.90 auto-approve; 0.70-0.90 human_review_required=true; <0.70 deny. " +
"Reply with JSON: {determination, overall_confidence, human_review_required, rationale}.";
const decisionUser = `request_id: ${state.request_id}\n` +
`criteria_evaluation: ${JSON.stringify(state.criteria_output.criteria_evaluation)}\n` +
`network_status: in-network`;
const decRes = await runAgent("DECISION", decisionSystem, decisionTools, decisionHandlers, decisionUser, state);
if (decRes.error) { state.stage = "error"; return state; }
const decision: any = { ...decisionScratch };
try { Object.assign(decision, JSON.parse(decRes.text || "{}")); } catch {}
decision.determination = decision.determination || decision.recommendation || "request_info";
state.decision_output = decision;
if (decision.human_review_required) {
console.log("[DECISION] Pausing for human review (confidence below 90%)...");
// In production: await human input
}
state.stage = "communication";
const det = finalizeDetermination(state.request_id, decision.determination, decision.rationale || "");
// Agent 4: Communication (LLM-driven, with HIPAA output guardrail)
const commScratch: any = {};
const commTools = [
{ name: "draft_determination_letter",
description: "Draft a determination letter.",
input_schema: { type: "object", properties: {
determination_id: { type: "string" },
determination: { type: "string", enum: ["approve","deny","request_info"] } },
required: ["determination_id","determination"] } },
{ name: "check_hipaa_compliance",
description: "Output guardrail: verify the draft before sending. Call BEFORE send_notification.",
input_schema: { type: "object", properties: {
letter_text: { type: "string" }, determination_type: { type: "string" } },
required: ["letter_text","determination_type"] } },
{ name: "send_notification",
description: "Send notification only after the HIPAA guardrail returns compliant=true.",
input_schema: { type: "object", properties: {
letter_id: { type: "string" }, channel: { type: "string" }, recipient: { type: "string" } },
required: ["letter_id","channel"] } },
];
const commHandlers = {
draft_determination_letter: (a: any) => {
const out = draftDeterminationLetter(a.determination_id, a.determination);
commScratch.letter = out; return out;
},
check_hipaa_compliance: (a: any) => {
const out = checkHipaaCompliance(a.letter_text, a.determination_type);
commScratch.hipaa = out; return out;
},
send_notification: (a: any) => sendNotification(a.letter_id, a.channel || "portal", a.recipient || "provider@clinic.example"),
};
const commSystem = "You are the Communication Agent. (1) Draft the letter, " +
"(2) call check_hipaa_compliance on the draft text BEFORE sending — mandatory output guardrail, " +
"(3) only call send_notification if compliant=true.";
const commUser = `Send the determination notice for ${state.request_id}. ` +
`determination_id: ${det.determination_id}. determination: ${det.determination.toLowerCase()}.`;
const commRes = await runAgent("COMMUNICATION", commSystem, commTools, commHandlers, commUser, state);
if (commRes.error) { state.stage = "error"; return state; }
const letter = commScratch.letter || draftDeterminationLetter(det.determination_id, det.determination);
const hipaa = commScratch.hipaa || checkHipaaCompliance(letter.draft_text || "", det.determination);
if (!hipaa.compliant) {
state.communication_output = { letter_id: letter.letter_id, hipaa_issues: hipaa.issues, blocked: true };
state.stage = "error";
return state;
}
const notif = sendNotification(letter.letter_id, "portal", "provider@clinic.example");
state.communication_output = { letter_id: letter.letter_id, sent_via: "portal",
sent_at: notif.sent_at, hipaa_compliant: true };
state.stage = "complete";
console.log(`[PIPELINE] Complete! Determination: ${det.determination}`);
return state;
}
async function main() {
console.log("Pre-Auth Processing Pipeline — Capstone 4-A");
const sample = { request_id: "AR-2024-09821", member_id: "MBR-555-1234",
provider_npi: "1234567890", procedure_code: "27447", diagnosis_codes: ["M17.11"],
clinical_notes: "Severe right knee OA. KL Grade IV. 8 months PT. WOMAC 68. BMI 31." };
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
const ask = () => rl.question("\nCommand (demo/quit): ", async (cmd) => {
if (["quit","exit"].includes(cmd.trim())) { rl.close(); return; }
if (cmd.trim() === "demo") {
const state = await runPipeline(sample);
console.log(JSON.stringify(state, null, 2));
}
ask();
});
ask();
}
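// Start the CLI only when run directly, so importing this module from tests does not open the prompt (assumes CommonJS output).
if (require.main === module) main();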
Run python pipeline.py and type demo. You should see each agent start and complete in sequence, ending with a determination. If you see AuthenticationError, check your ANTHROPIC_API_KEY.
You built a 4-agent pipeline orchestrator with three production patterns: (1) typed pipeline state flowing between agents, (2) circuit breaker checking at every transition, (3) HITL checkpoint that pauses for human review when confidence is 70–90%. Each agent is independently testable. The orchestrator is the only component that knows the full sequence.
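The HITL routing rule can also be written as a small pure function, which makes the thresholds easy to check apart from the LLM. The following is a minimal sketch, assuming the thresholds stated in the Decision Agent prompt (above 0.90 auto-approve, 0.70 to 0.90 human review, below 0.70 or unmet required criteria deny). `route_decision` and the `pending_review` label are hypothetical and are not part of pipeline.py or mock_tools.py.

```python
def route_decision(confidence: float, all_required_met: bool) -> dict:
    """Hypothetical helper mirroring the routing rule in the Decision Agent prompt."""
    if not all_required_met or confidence < 0.70:
        # Unmet required criteria or low confidence: deny.
        return {"determination": "deny", "human_review_required": False}
    if confidence > 0.90:
        # High confidence with all required criteria met: auto-approve.
        return {"determination": "approve", "human_review_required": False}
    # 0.70 <= confidence <= 0.90: pause for a human reviewer.
    return {"determination": "pending_review", "human_review_required": True}


if __name__ == "__main__":
    for score in (0.93, 0.72, 0.65):
        print(score, route_decision(score, all_required_met=True))
```

Keeping the rule in one function means the boundary values (exactly 0.70 and exactly 0.90) can be asserted directly instead of being inferred from model output.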
Step 4: Run the Pipeline
Run python pipeline.py and type demo to process the sample TKA pre-authorization request. Verify you see all four agents execute in sequence and an APPROVE determination.
Step 5: Add Guardrails
Input guardrails go on the Intake Agent: schema validation (are all required fields present?) and PII detection (is there unredacted PHI in fields that shouldn’t have it?). Output guardrails go on the Communication Agent: HIPAA compliance check on letter content before send.
The validate_auth_request function in mock_tools.py already implements the input guardrail (missing-field check). The output guardrail is the check_hipaa_compliance(letter_text, determination_type) function you added in Step 1. It runs three categories of checks before any letter is sent:
- PII leakage: regex match for SSN patterns (\d{3}-\d{2}-\d{4}) and full birthdates (MM/DD/YYYY or MM-DD-YYYY). Any match flags PII_LEAK, and the offending text is replaced with [redacted-ssn] or [redacted-dob] in the returned redacted_text.
- Required determination keywords: approval letters must contain “approved”; denial letters must contain “appeal” instructions; info-request letters must list the items needing submission. A missing keyword flags MISSING_KEYWORD.
- Salutation/sign-off: the body must start with Dear and end with a Sincerely or Regards sign-off. Missing either flags FORMAT.
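The three check categories above can be sketched in a few lines. This is an illustrative re-implementation under assumptions, not the code in mock_tools.py: the name `sketch_hipaa_check`, the exact issue strings, and the keyword table are hypothetical. The info-request item check is omitted, and the sign-off test is loose (it looks for the word anywhere in the body).

```python
import re

SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
DOB_RE = re.compile(r"\b\d{2}[/-]\d{2}[/-]\d{4}\b")
# Assumed keyword per determination type; the real rules live in mock_tools.py.
REQUIRED_KEYWORD = {"approve": "approved", "deny": "appeal"}


def sketch_hipaa_check(letter_text: str, determination_type: str) -> dict:
    issues = []
    redacted = letter_text

    # 1. PII leakage: flag and redact SSN and full-birthdate patterns.
    if SSN_RE.search(redacted):
        issues.append("PII_LEAK: SSN pattern detected")
        redacted = SSN_RE.sub("[redacted-ssn]", redacted)
    if DOB_RE.search(redacted):
        issues.append("PII_LEAK: birthdate pattern detected")
        redacted = DOB_RE.sub("[redacted-dob]", redacted)

    # 2. Required determination keyword (case-insensitive).
    keyword = REQUIRED_KEYWORD.get(determination_type.lower())
    if keyword and keyword not in letter_text.lower():
        issues.append(f"MISSING_KEYWORD: '{keyword}' not found")

    # 3. Salutation and sign-off.
    body = letter_text.strip()
    has_signoff = re.search(r"\b(Sincerely|Regards)\b", body) is not None
    if not body.startswith("Dear") or not has_signoff:
        issues.append("FORMAT: missing salutation or sign-off")

    return {"compliant": not issues, "issues": issues, "redacted_text": redacted}


if __name__ == "__main__":
    print(sketch_hipaa_check(
        "Dear Provider, your SSN 123-45-6789 was approved. Sincerely.", "approve"))
```

Returning the redacted text alongside the issue list lets the caller redraft from a safe copy rather than from the leaking original.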
The Communication Agent’s system prompt instructs Claude to draft_determination_letter → check_hipaa_compliance → (only on compliant=true) send_notification. The orchestrator double-checks: if compliant is false at the end of the agent loop, it sets state.stage = "error", populates state.communication_output["hipaa_issues"], and refuses to send. This belt-and-suspenders pattern ensures a non-compliant draft cannot escape even if the LLM ignores the guardrail.
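One way to express the orchestrator-side check is a gate that fails closed: it sends only when the guardrail result is explicitly compliant. The sketch below is hypothetical (`guarded_send` is not part of pipeline.py) and takes the send function as a parameter so it can be exercised without mock_tools. Since send_notification is also exposed to the agent as a tool, the same gate could plausibly wrap _send_handler as well.

```python
from typing import Callable


def guarded_send(letter: dict, hipaa: dict,
                 send: Callable[[str, str, str], dict],
                 recipient: str = "provider@clinic.example") -> dict:
    """Send only when the guardrail result is explicitly compliant."""
    if hipaa.get("compliant") is not True:
        # Fail closed: a missing or false flag blocks the send.
        return {"blocked": True,
                "letter_id": letter.get("letter_id"),
                "hipaa_issues": hipaa.get("issues", [])}
    receipt = send(letter["letter_id"], "portal", recipient)
    return {"blocked": False,
            "letter_id": letter["letter_id"],
            "sent_via": "portal",
            "sent_at": receipt.get("sent_at")}


if __name__ == "__main__":
    def fake_send(letter_id: str, channel: str, recipient: str) -> dict:
        # Stand-in for send_notification; the timestamp is made up.
        return {"sent_at": "2024-03-10T12:00:00Z"}

    print(guarded_send({"letter_id": "LTR-1"},
                       {"compliant": False, "issues": ["PII_LEAK"]}, fake_send))
    print(guarded_send({"letter_id": "LTR-2"},
                       {"compliant": True, "issues": []}, fake_send))
```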
Two tests to run: (1) Submit a request with member_id missing — the Intake Agent rejects it with validated: False and missing_fields: ["member_id"]; the pipeline halts at intake. (2) Call check_hipaa_compliance("Dear Provider, your SSN 123-45-6789 was approved. Sincerely.", "approve") directly — you should see compliant=False, "PII_LEAK: SSN pattern detected" in issues, and [redacted-ssn] in redacted_text.
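For the first test, the shape of the expected result can be illustrated with a stand-alone missing-field check. This is a sketch only: `sketch_validate` and the `REQUIRED_FIELDS` list are assumptions inferred from the fields used in this capstone, and the real validate_auth_request in mock_tools.py may require a different set.

```python
# Assumed required fields; the authoritative list is in mock_tools.py.
REQUIRED_FIELDS = ("member_id", "provider_npi", "procedure_code",
                   "diagnosis_codes", "clinical_notes")


def sketch_validate(raw_request: dict) -> dict:
    """Report which required fields are absent or empty."""
    missing = [f for f in REQUIRED_FIELDS if not raw_request.get(f)]
    return {"validated": not missing,
            "missing_fields": missing,
            "normalized_request": None if missing else dict(raw_request)}


if __name__ == "__main__":
    request = {"provider_npi": "1234567890", "procedure_code": "27447",
               "diagnosis_codes": ["M17.11"], "clinical_notes": "KL Grade IV."}
    print(sketch_validate(request))
```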
Step 6: Create the Test Suite (test_pipeline.py)
Create test_pipeline.py with 6 pytest test cases covering the critical paths: happy-path approval, denial, HITL escalation, circuit breaker trip, invalid input handling, and the HIPAA output guardrail.
"""test_pipeline.py — 6 pytest test cases for Capstone 4-A pipeline.
Run:
pytest test_pipeline.py -v
Each test exercises a different pipeline path without making real API calls.
We call the mock tools and pipeline helpers directly.
"""
import pytest
from unittest.mock import patch
from pipeline import PipelineState, check_circuit_breaker, record_failure, record_success
from mock_tools import (
validate_auth_request,
verify_member_eligibility,
fetch_clinical_policy,
evaluate_criterion,
compute_decision_confidence,
finalize_determination,
draft_determination_letter,
send_notification,
check_hipaa_compliance,
)
# ── Shared fixtures ───────────────────────────────────────────
VALID_REQUEST = {
"request_id": "AR-TEST-001",
"member_id": "MBR-555-1234",
"provider_npi": "1234567890",
"procedure_code": "27447",
"diagnosis_codes": ["M17.11"],
"clinical_notes": (
"Severe right knee osteoarthritis (M17.11). "
"KL Grade IV on weight-bearing films. "
"8 months physical therapy (PT). Failed conservative NSAIDs, "
"2 corticosteroid injections. WOMAC score 68. BMI 31."
),
}
# ── Test 1: Happy path — TKA approval (high confidence) ──────
def test_high_confidence_auto_approve():
"""Full pipeline: all criteria met, confidence >90%, auto-approve."""
# Step 1: Intake validation
result = validate_auth_request(VALID_REQUEST)
assert result["validated"] is True
assert result["missing_fields"] == []
# Step 2: Member + provider verification
member = verify_member_eligibility("MBR-555-1234", "2024-03-10")
assert member["eligible"] is True
# Step 3: Clinical criteria evaluation
policy = fetch_clinical_policy("27447")
criteria_eval = []
for c in policy["criteria"]:
ev = evaluate_criterion(c["id"], VALID_REQUEST["clinical_notes"])
criteria_eval.append(ev)
# Step 4: Decision — expect auto-approve (no HITL)
decision = compute_decision_confidence(
criteria_eval, "in-network", {"plan": "Gold PPO", "copay": 20}
)
assert decision["overall_confidence"] > 0.90, (
f"Expected >0.90, got {decision['overall_confidence']}"
)
assert decision["recommendation"] == "approve"
assert decision["human_review_required"] is False
# Step 5: Communication — letter drafted and sent
det = finalize_determination("AR-TEST-001", "approve", decision["rationale"])
letter = draft_determination_letter(det["determination_id"], "APPROVE")
notif = send_notification(letter["letter_id"], "portal", "provider@test.example")
assert notif["delivery_status"] == "delivered"
# ── Test 2: Denial — missing conservative treatment ───────────
def test_denial_missing_conservative_treatment():
"""Clinical notes lack conservative treatment evidence → low confidence → deny."""
sparse_notes = "Right knee pain. M17.11. BMI 31."
policy = fetch_clinical_policy("27447")
criteria_eval = []
for c in policy["criteria"]:
ev = evaluate_criterion(c["id"], sparse_notes)
criteria_eval.append(ev)
# C2 (conservative treatment) should fail — no PT/NSAID/injection keywords
c2 = next(e for e in criteria_eval if e["criterion_id"] == "C2")
assert c2["met"] is False, "C2 should fail without conservative treatment evidence"
decision = compute_decision_confidence(
criteria_eval, "in-network", {"plan": "Gold PPO", "copay": 20}
)
# Not all criteria met + low confidence → deny
assert decision["recommendation"] == "deny"
# ── Test 3: HITL escalation — borderline confidence (70-90%) ─
def test_hitl_escalation_borderline_confidence():
"""Borderline clinical notes produce 70-90% confidence → HITL required."""
# Notes with some but not all keywords to produce medium confidence
borderline_notes = (
"Right knee osteoarthritis M17.11. KL Grade III. "
"3 months PT. WOMAC score 55. BMI 33."
)
policy = fetch_clinical_policy("27447")
criteria_eval = []
for c in policy["criteria"]:
ev = evaluate_criterion(c["id"], borderline_notes)
criteria_eval.append(ev)
decision = compute_decision_confidence(
criteria_eval, "in-network", {"plan": "Gold PPO", "copay": 20}
)
# With partial evidence, expect HITL flag
assert decision["human_review_required"] is True, (
f"Expected HITL for confidence {decision['overall_confidence']}"
)
# ── Test 4: Circuit breaker trips after 3 consecutive failures ─
def test_circuit_breaker_trips():
"""Circuit breaker transitions from healthy to tripped after 3 failures."""
state = PipelineState(request_id="AR-TEST-CB")
# Verify initial healthy state
assert check_circuit_breaker(state) is False
assert state.circuit_breaker["status"] == "healthy"
# Record 2 failures — should still be healthy
record_failure(state)
record_failure(state)
assert state.circuit_breaker["consecutive_failures"] == 2
assert check_circuit_breaker(state) is False
# 3rd failure — should trip
record_failure(state)
assert state.circuit_breaker["consecutive_failures"] == 3
assert state.circuit_breaker["status"] == "tripped"
assert check_circuit_breaker(state) is True
# Verify a success resets the counter (on a fresh state)
state2 = PipelineState(request_id="AR-TEST-CB2")
record_failure(state2)
record_success(state2)
assert state2.circuit_breaker["consecutive_failures"] == 0
# ── Test 5: Invalid input handling ────────────────────────────
def test_invalid_input_handling():
"""Missing required fields are caught by intake validation."""
# Completely empty request
result = validate_auth_request({})
assert result["validated"] is False
assert "member_id" in result["missing_fields"]
assert "procedure_code" in result["missing_fields"]
assert result["normalized_request"] is None
# Partial request — missing clinical_notes
partial = {
"member_id": "MBR-555-1234",
"provider_npi": "1234567890",
"procedure_code": "27447",
"diagnosis_codes": ["M17.11"],
}
result = validate_auth_request(partial)
assert result["validated"] is False
assert "clinical_notes" in result["missing_fields"]
# Unknown member
member = verify_member_eligibility("MBR-UNKNOWN", "2024-03-10")
assert "error" in member
assert member["error"] == "MEMBER_NOT_FOUND"
# ── Test 6: HIPAA output guardrail flags SSN-leaking letter ────
def test_hipaa_guardrail_blocks_ssn_leak():
"""check_hipaa_compliance must flag SSN patterns and produce redacted_text."""
leaky = (
"Dear Provider,\n\n"
"Authorization for member SSN 123-45-6789 has been approved.\n\n"
"Sincerely,\nClinical Authorization Team"
)
result = check_hipaa_compliance(leaky, "approve")
# Must mark non-compliant and report the PII leak
assert result["compliant"] is False
assert any("PII_LEAK" in i and "SSN" in i for i in result["issues"]), (
f"Expected PII_LEAK SSN issue, got {result['issues']}"
)
# Redacted text replaces the SSN; raw pattern must NOT survive
assert "123-45-6789" not in result["redacted_text"]
assert "[redacted-ssn]" in result["redacted_text"]
# A clean approval letter should pass
clean = (
"Dear Provider,\n\nAuthorization has been approved for the procedure.\n\n"
"Sincerely,\nClinical Authorization Team"
)
ok = check_hipaa_compliance(clean, "approve")
assert ok["compliant"] is True, f"Expected compliant=True, got issues={ok['issues']}"
# A denial letter missing the word 'appeal' must be flagged
bad_denial = "Dear Provider,\n\nAuthorization is denied.\n\nSincerely,\nTeam"
bd = check_hipaa_compliance(bad_denial, "deny")
assert bd["compliant"] is False
assert any("MISSING_KEYWORD" in i for i in bd["issues"])
// test_pipeline.ts — Jest test cases for Capstone 4-A (abbreviated)
// Run: npx jest test_pipeline.ts
import {
validateAuthRequest, verifyMemberEligibility,
computeDecisionConfidence, finalizeDetermination,
draftDeterminationLetter, sendNotification,
} from "./mock_tools";
import { createState, checkCircuitBreaker, recordFailure, recordSuccess } from "./pipeline";
const VALID_REQUEST = {
request_id: "AR-TEST-001", member_id: "MBR-555-1234",
provider_npi: "1234567890", procedure_code: "27447",
diagnosis_codes: ["M17.11"],
clinical_notes: "Severe right knee OA. KL Grade IV. 8 months PT. WOMAC 68. BMI 31.",
};
test("high confidence auto-approve", () => {
const result = validateAuthRequest(VALID_REQUEST);
expect(result.validated).toBe(true);
const decision = computeDecisionConfidence(
[{ met: true, confidence: 0.95 }, { met: true, confidence: 0.88 },
{ met: true, confidence: 0.92 }, { met: true, confidence: 0.98 }],
"in-network"
);
expect(decision.overall_confidence).toBeGreaterThan(0.90);
expect(decision.recommendation).toBe("approve");
expect(decision.human_review_required).toBe(false);
});
test("circuit breaker trips after 3 failures", () => {
const state = createState("AR-TEST-CB");
expect(checkCircuitBreaker(state)).toBe(false);
recordFailure(state); recordFailure(state); recordFailure(state);
expect(state.circuit_breaker.status).toBe("tripped");
expect(checkCircuitBreaker(state)).toBe(true);
});
test("invalid input rejected", () => {
const result = validateAuthRequest({});
expect(result.validated).toBe(false);
expect(result.missing_fields).toContain("member_id");
});
Run the test suite:
pytest test_pipeline.py -v
You should see all 6 tests pass. If any fail, check the Troubleshooting section below.
Testing Guide
| Type | Scenario | Expected Behavior |
|---|---|---|
| HAPPY | All criteria met, high confidence (93%) | Full pipeline runs: intake → criteria → auto-approve → letter sent |
| HAPPY | Medium confidence (72%) triggers HITL | Pipeline pauses at Decision, reviewer approves, pipeline resumes to Communication |
| HAPPY | Criteria not met, low confidence | Auto-deny with denial letter including appeal rights and rationale |
| HAPPY | Missing documentation triggers request-info | Pipeline generates info-request letter specifying what’s needed |
| HAPPY | Reviewer overrides deny → approve | Override logged, determination reflects reviewer decision with rationale |
| EDGE | Intake finds missing required fields | Pipeline stops at intake stage, returns validation errors |
| EDGE | One criterion has no evidence available | Confidence drops, triggers HITL for that criterion |
| EDGE | Communication agent fax delivery fails | Agent retries via email channel, logs delivery failure |
| ADVERSARIAL | Drafted letter contains SSN pattern | HIPAA guardrail flags PII_LEAK, send is blocked, state.stage = error |
| ADVERSARIAL | 3 consecutive pipeline failures | Circuit breaker trips, pipeline halted, operations alerted |
| ADVERSARIAL | Reviewer takes no action for 24+ hours | System escalates to supervisor review queue |
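The last scenario in the table (no reviewer action for 24+ hours) is not implemented in the pipeline listing. A minimal sketch of the timeout check, assuming the review request carries a timezone-aware timestamp, might look like this. `review_queue_for` and the queue names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

REVIEW_TIMEOUT = timedelta(hours=24)


def review_queue_for(requested_at: datetime, now: datetime,
                     timeout: timedelta = REVIEW_TIMEOUT) -> str:
    """Return the queue that should own a pending HITL review."""
    # Once the request has waited for the full timeout, hand it to a supervisor.
    return "supervisor" if now - requested_at >= timeout else "reviewer"


if __name__ == "__main__":
    requested = datetime(2024, 3, 10, 9, 0, tzinfo=timezone.utc)
    print(review_queue_for(requested, requested + timedelta(hours=2)))
    print(review_queue_for(requested, requested + timedelta(hours=30)))
```

Passing `now` in explicitly, rather than calling `datetime.now()` inside the function, keeps the check deterministic in tests.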
Verify Everything Works
Run the complete pipeline end-to-end with a single command:
On bash or zsh (here-string):
python pipeline.py <<< "demo" && pytest test_pipeline.py -v
In any shell that supports pipes, including Windows Command Prompt:
echo demo | python pipeline.py && pytest test_pipeline.py -v
Expected final output: The pipeline processes the sample pre-auth request (CPT 27447, knee replacement), all 4 criteria are met with high confidence (93%), and the request is auto-approved. The pytest suite then runs all 6 tests (including the HIPAA SSN-leak guardrail test) and reports 6 passed.
You have built a complete multi-agent pre-authorization pipeline with four specialized agents, a circuit breaker that halts on cascading failures, and human-in-the-loop review for medium-confidence clinical decisions. Production healthcare systems use the same architectural pattern; a real deployment would add connections to actual payer policy APIs, authenticated HITL reviewer dashboards, and encrypted PHI handling under a BAA.
Troubleshooting
Common Errors
ModuleNotFoundError: No module named 'anthropic' — Your virtual environment is not activated. Run source venv/bin/activate (Unix) or venv\Scripts\activate (Windows).
ModuleNotFoundError: No module named 'mock_tools' — You are running the script from the wrong directory. Make sure you are inside capstone-4-preauth-pipeline/.
ImportError: cannot import name 'PipelineState' from 'pipeline' — The pipeline.py file is missing or has a syntax error. Complete Steps 1–3 before running tests.
AuthenticationError: 401 — Your ANTHROPIC_API_KEY is not set or is invalid. Run echo $ANTHROPIC_API_KEY (Unix) or echo %ANTHROPIC_API_KEY% (Windows) to verify.
RateLimitError: 429 — You are sending too many requests. Wait a few seconds and retry. Each pipeline run makes 4 API calls (one per agent), so budget accordingly.
HITL never triggers — Check that your test request produces a confidence score between 0.70 and 0.90. Reduce the number of keyword matches in clinical notes to lower confidence (e.g., remove “KL Grade IV” to drop C1 confidence).
Circuit breaker does not trip — Ensure your test submits 3 requests that trigger agent failures (e.g., invalid JSON, missing member IDs). The breaker counts consecutive failures — a success in between resets the counter.
All criteria show confidence 0.0 — The mock evaluation function matches keywords case-insensitively. Check that your clinical notes contain the expected keywords: “M17.11”, “PT”, “WOMAC”, “BMI”.
'python3' is not recognized — On Windows, use python instead of python3. Check your version with python --version.
'source' is not recognized — On Windows Command Prompt, use venv\Scripts\activate (no source). On PowerShell, use .\venv\Scripts\Activate.ps1.
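If the circuit breaker is not behaving as expected, it can help to compare your implementation against a stripped-down model of the consecutive-failure rule described above. The sketch below is illustrative only: the class name `CircuitBreaker`, its method names, and the `"closed"` status label are assumptions (only the `"tripped"` status and the threshold of 3 come from this capstone).

```python
from dataclasses import dataclass

FAILURE_THRESHOLD = 3  # the capstone's breaker trips after 3 consecutive failures


@dataclass
class CircuitBreaker:
    consecutive_failures: int = 0
    status: str = "closed"  # assumed label for the healthy state

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= FAILURE_THRESHOLD:
            self.status = "tripped"

    def record_success(self) -> None:
        # A success resets the streak, but never clears a tripped breaker
        if self.status != "tripped":
            self.consecutive_failures = 0

    def is_tripped(self) -> bool:
        return self.status == "tripped"

    def manual_reset(self) -> None:
        # Only an operator, after investigation, should call this
        self.consecutive_failures = 0
        self.status = "closed"


breaker = CircuitBreaker()
breaker.record_failure()
breaker.record_failure()
breaker.record_success()      # streak resets to 0
breaker.record_failure()      # streak is now 1, not 3
print(breaker.is_tripped())   # False
```

This is why a test that interleaves a valid request between failing ones never trips the breaker: the three failures have to arrive back to back.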
HIPAA Compliance Notes
A multi-agent pipeline multiplies HIPAA surface area: every agent, every inter-agent state transfer, every tool call, and every log entry may contain PHI (Protected Health Information: individually identifiable health data including names, diagnoses, treatments, and insurance details). Every system component touching PHI must comply with HIPAA security and privacy rules. Production requirements:
- Inter-agent state encryption: Pipeline state objects contain member IDs, diagnosis codes, and clinical notes. Encrypt at rest and in transit between agents, even within the same process.
- Per-agent audit trails: Each agent must log every tool call with timestamp, agent identity, and input/output redacted to the minimum necessary. These logs are PHI access records.
- HITL reviewer authentication: The human reviewer must be authenticated and authorized. Log reviewer identity, decision, and rationale as part of the medical record.
- Determination letter compliance: Denial letters must include appeal rights, clinical rationale, and state-mandated disclosures. The Communication Agent’s output guardrail must verify these are present.
- Circuit breaker notifications: When the circuit breaker trips, the alert must NOT include PHI. Alert format: “Pipeline halted at [stage] due to [error count] failures. Request IDs: [redacted].”
- BAA coverage: Every Claude API call in the pipeline sends PHI. All 4 agents’ API calls must be covered by a BAA with Anthropic.
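Two of the requirements above, the Communication Agent's output guardrail and the PHI-free circuit breaker alert, lend themselves to a short sketch. Treat this as a rough illustration under stated assumptions: the function names `check_letter` and `format_breaker_alert`, the `MISSING_*` flag names, and the keyword markers used to detect required sections are hypothetical. Only the `PII_LEAK` flag and the alert wording come from this capstone, and state-mandated disclosures are omitted because they vary by state.

```python
import re

# Matches the common dashed SSN layout, e.g. 123-45-6789
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")

# Hypothetical markers: a real guardrail would check structured letter fields
REQUIRED_DENIAL_SECTIONS = {
    "appeal_rights": "appeal",
    "clinical_rationale": "rationale",
}


def check_letter(text: str, is_denial: bool) -> list[str]:
    """Return guardrail flags; an empty list means the letter may be sent."""
    flags = []
    if SSN_PATTERN.search(text):
        flags.append("PII_LEAK")
    if is_denial:
        lowered = text.lower()
        for name, marker in REQUIRED_DENIAL_SECTIONS.items():
            if marker not in lowered:
                flags.append(f"MISSING_{name.upper()}")
    return flags


def format_breaker_alert(stage: str, error_count: int) -> str:
    """Build an operations alert that carries no PHI and no request IDs."""
    return (
        f"Pipeline halted at {stage} due to {error_count} failures. "
        "Request IDs: [redacted]."
    )
```

Any non-empty result from `check_letter` should block the send step; the alert builder deliberately accepts only a stage name and a count so that PHI cannot be passed into it by accident.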
Each pipeline run makes 4 separate Claude API calls (one per agent), plus tool call round-trips within each agent. A typical 4-agent pipeline consumes 8,000–15,000 tokens total. At Sonnet pricing (~$3/M input, $15/M output), budget ~$0.05–$0.10 per authorization. For 500 auths/day, that’s $25–$50/day in API cost. The HITL path adds latency but not token cost (human review is offline).
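The budget figures above can be reproduced with a few lines of arithmetic. The sketch assumes that about 30% of the tokens in a run are output tokens; that split is an assumption chosen for illustration, not a measured value, and the prices are the approximate ones quoted above. `cost_per_auth` is a hypothetical helper name.

```python
INPUT_PRICE_PER_M = 3.00    # USD per million input tokens (approximate)
OUTPUT_PRICE_PER_M = 15.00  # USD per million output tokens (approximate)
OUTPUT_SHARE = 0.30         # assumption: 30% of a run's tokens are output
AUTHS_PER_DAY = 500


def cost_per_auth(total_tokens: int, output_share: float = OUTPUT_SHARE) -> float:
    """Estimate the API cost in USD of one pipeline run."""
    output_tokens = total_tokens * output_share
    input_tokens = total_tokens - output_tokens
    return (
        input_tokens * INPUT_PRICE_PER_M + output_tokens * OUTPUT_PRICE_PER_M
    ) / 1_000_000


for tokens in (8_000, 15_000):
    per_auth = cost_per_auth(tokens)
    print(f"{tokens} tokens: ${per_auth:.3f} per auth, "
          f"${per_auth * AUTHS_PER_DAY:.2f} per day")
```

Under that split the estimate lands at roughly $0.05 to $0.10 per authorization. Because output tokens cost five times as much as input tokens, the result is sensitive to `OUTPUT_SHARE`, so measure your own runs before budgeting.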
Knowledge Check
Test your understanding of the multi-agent pipeline, HITL routing, and production safety patterns.
Q1: In the multi-agent pipeline, why are agents separated rather than using one monolithic agent?
Q2: When should a case be routed to the HITL clinical reviewer?
Q3: What triggers the circuit breaker in this pipeline?
Q4: Agent 2 returns a confidence score of 0.75 — what happens next?
Q5: Why does Agent 4 (Communication) need HIPAA compliance checks as output guardrails?
Q6: (Cross-module) How does the ReAct pattern from M12 relate to each agent in this pipeline?
Going Further
- [OPTIONAL] Async pipeline with queues: Replace synchronous orchestration with a message queue (Redis/SQS). Each agent polls its input queue and writes to the next agent’s queue. Enables independent scaling.
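- [OPTIONAL] Parallel criteria evaluation: Instead of evaluating criteria sequentially, fan out all criterion evaluations in parallel and aggregate results. This can reduce criteria-evaluation latency by roughly 3–4x when the four criteria take similar time; the other pipeline stages are unaffected.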
- [OPTIONAL] Dashboard UI: Build a web dashboard showing pipeline state in real time: active requests, stage distribution, HITL queue depth, circuit breaker status (M20).
- [OPTIONAL] A/B testing letter templates: Route 50% of denial letters through a revised template with clearer appeal instructions. Measure appeal rate difference.
- [OPTIONAL] Evaluation harness: Build a 100-case test suite with known-good determinations. Measure accuracy, false positive rate, and HITL trigger rate (M18).
- [OPTIONAL] Multi-language notifications: Extend the Communication Agent to draft letters in Spanish, Mandarin, and Vietnamese based on member language preference.
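As a starting point for the parallel criteria evaluation extension, the fan-out and aggregate step might look like the sketch below. It uses `asyncio.gather` with a short sleep standing in for each model or tool call. The criterion names, the function names `evaluate_criterion` and `evaluate_all`, and the sample notes are hypothetical; the case-insensitive keyword match mirrors the mock evaluation described in Troubleshooting.

```python
import asyncio
import time


async def evaluate_criterion(name: str, keyword: str, notes: str) -> tuple[str, bool]:
    """Stand-in for one criterion check; the sleep simulates call latency."""
    await asyncio.sleep(0.1)
    return name, keyword.lower() in notes.lower()


async def evaluate_all(criteria: dict[str, str], notes: str) -> dict[str, bool]:
    """Fan out every criterion concurrently, then aggregate into one dict."""
    results = await asyncio.gather(
        *(evaluate_criterion(name, kw, notes) for name, kw in criteria.items())
    )
    return dict(results)


if __name__ == "__main__":
    # Hypothetical criterion names paired with the mock keywords
    criteria = {
        "diagnosis": "M17.11",
        "conservative_therapy": "PT",
        "function_score": "WOMAC",
        "bmi": "BMI",
    }
    notes = "Dx M17.11. Completed 12 weeks PT. WOMAC 68. BMI 31."
    start = time.perf_counter()
    outcome = asyncio.run(evaluate_all(criteria, notes))
    print(outcome, f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the four simulated calls overlap, the elapsed time should be close to one call's latency rather than four. With real API calls, rate limits may cap how much concurrency is practical.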