Capstone 4 — Domain B: B2B Order Lifecycle Pipeline
Build a 4-agent pipeline that manages the full B2B order lifecycle: intake validation, fulfillment planning with HITL for high-value splits, exception monitoring with carrier API circuit breaker, and proactive customer communication.
Project Brief
A B2B distributor processes 200+ purchase orders daily. In Capstone 3, you built a ReAct agent that investigates individual exceptions. But the full order lifecycle — from PO arrival through delivery confirmation — involves four distinct operational stages, each requiring different skills and data sources. A single agent handling everything creates a monolith that can’t scale, can’t be independently tested, and can’t be safely modified without risk to the entire pipeline.
The production solution: four specialized agents in a pipeline. The Order Intake Agent validates and enriches POs: it checks format, credit, and pricing, detects duplicates, and enriches line items with current catalog data. It acts as the gateway, so garbage in at this stage means errors downstream. The Fulfillment Planning Agent allocates inventory and plans shipments: it checks inventory across warehouses, determines shipping strategy (single vs. split shipment), and requests operations manager approval for high-value split shipments exceeding $50,000. The Exception Monitor Agent watches for delays and issues: it continuously watches shipment status, detects anomalies (delays, quality holds, missing scans), and triggers resolution workflows. It is protected by a circuit breaker that trips if the carrier API fails repeatedly. The Customer Communication Agent sends proactive updates: it drafts and sends status updates, exception notifications, and delivery confirmations, matching communication tone and channel to customer tier and SLA commitments.
Two production patterns make this pipeline trustworthy: HITL where the operations manager approves split shipments over $50K or exception resolutions involving credit issuance, and a circuit breaker that trips when the carrier tracking API fails 5+ times in 10 minutes, switching to fallback polling mode.
- Agent 1 — Order Intake: Validates PO format, checks customer credit, enriches line items with contract pricing. Input guardrails: duplicate PO detection, schema validation.
- Agent 2 — Fulfillment Planning: Checks multi-warehouse inventory, creates shipment plans. If split shipment >$50K, pauses for operations manager HITL approval.
- Agent 3 — Exception Monitor: Polls carrier tracking APIs, detects delays/exceptions. Circuit breaker: 5 carrier failures in 10 min → fallback polling mode. If resolution involves credit/refund issuance, requires ops manager HITL approval.
- Agent 4 — Customer Communication: Drafts proactive updates (confirmation, shipment, delay, delivery). Output guardrails: SLA compliance check, tone matching per customer tier.
Skills practiced: Multi-agent architecture (M14), HITL (M17), guardrails (M16–M17), circuit breaker, evaluation (M18), structured inter-agent state passing.
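The two approval gates in the brief above reduce to small predicates. The sketch below is illustrative only: the function names `needs_split_approval` and `needs_credit_approval` are hypothetical, and the keyword match on the recommended action is one simple assumption about how a credit or refund resolution might be detected.

```python
SPLIT_APPROVAL_THRESHOLD = 50_000  # dollars, taken from the project brief


def needs_split_approval(requires_split: bool, total_value: float) -> bool:
    """Pause for the ops manager only when a split plan exceeds the threshold."""
    return requires_split and total_value > SPLIT_APPROVAL_THRESHOLD


def needs_credit_approval(recommended_action: str) -> bool:
    """Assumed heuristic: flag resolutions that mention a credit or refund."""
    action = recommended_action.lower()
    return "credit" in action or "refund" in action


print(needs_split_approval(True, 49_750.00))    # False: split, but under $50K
print(needs_split_approval(True, 54_700.00))    # True: split and over $50K
print(needs_split_approval(False, 80_000.00))   # False: single shipment
print(needs_credit_approval("Issue refund for damaged goods"))  # True
```

Keeping the gates as pure functions makes them easy to unit test separately from the console prompt that collects the manager's decision.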
Prerequisites
Complete these modules before starting this capstone:
- M03–M05 — Tool Use Fundamentals: You need tool definitions, structured outputs, and basic tool orchestration. Every agent in this pipeline uses tool calls.
- M09 — RAG (Retrieval-Augmented Generation): Agent 1 retrieves product catalog and contract data to verify pricing. Understanding RAG patterns helps you extend the pipeline later.
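- M12 — ReAct Pattern: The Claude-powered agents (Intake and Communication) run an observe → think → act tool loop. The Exception Monitor follows the same detect-then-act shape, although the reference solution implements it with deterministic rules rather than LLM reasoning.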
- M14 — Multi-Agent Systems: The 4-agent pipeline architecture, structured state passing between agents, and agent handoff patterns come directly from M14.
- M16 — Input Guardrails: Agent 1 uses input validation (PO schema checks, duplicate detection) and Agent 4 applies output guardrails (SLA compliance, tone matching). M16 teaches these patterns.
- M17 — Human-in-the-Loop & Guardrails: The HITL approval flow for split shipments and credit resolutions, plus the circuit breaker pattern, are covered in M17.
- M18 — Evaluation: The pytest-based test suite at the end of the capstone uses M18’s scoring + harness patterns to validate end-to-end accuracy.
You should also be comfortable with Python classes, JSON data structures, and running scripts from the command line.
Environment Setup
What you will build: a multi-agent B2B order lifecycle pipeline with four specialized agents, human-in-the-loop escalation for high-value splits and credit resolutions, and a time-windowed circuit breaker for carrier API failures.
Time estimate: 2–3 hours
Difficulty: ★★★★☆
Requirements
- Python 3.10 or higher
- An Anthropic API key (get one at console.anthropic.com)
Install Everything
Copy and paste the block for your platform into your terminal.
macOS / Linux:
mkdir order-lifecycle-pipeline && cd order-lifecycle-pipeline
python3 -m venv venv && source venv/bin/activate
pip install "anthropic>=0.30.0" pytest
export ANTHROPIC_API_KEY=your-key-here
Windows (Command Prompt):
mkdir order-lifecycle-pipeline && cd order-lifecycle-pipeline
python -m venv venv && venv\Scripts\activate
pip install "anthropic>=0.30.0" pytest
set ANTHROPIC_API_KEY=your-key-here
Run python -c "import anthropic; print('OK')". You should see OK. If you see ModuleNotFoundError, make sure your virtual environment is activated.
File Structure
Here is every file you will create in this capstone. The solution consolidates all agents, state, and circuit breaker logic into pipeline.py for simplicity — no separate agents/ directory needed.
order-lifecycle-pipeline/
├── mock_tools.py # All mock tool implementations (ERP, inventory, carrier, messaging)
├── pipeline.py # All 4 agents + state + circuit breaker + orchestrator
└── test_pipeline.py # Test suite (5 scenarios, pytest)
Domain Glossary
Architecture
Mock Data Specification
{
  "po_number": "PO-2024-11234",
  "stage": "intake",
  "created_at": "2024-03-10T08:00:00Z",
  "intake_output": {
    "validated": true,
    "customer_verified": true,
    "credit_check_passed": true,
    "duplicate_detected": false,
    "line_items_enriched": [
      {"sku": "SKU-4892", "qty": 50, "unit_price": 495.00,
       "contract_price_verified": true, "available": true},
      {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00,
       "contract_price_verified": true, "available": false}
    ],
    "total_value": 49750.00,
    "priority": "standard"
  },
  "fulfillment_output": {
    "plan_id": "FP-2024-11234",
    "requires_split": true,
    "shipments": [
      {"warehouse": "East", "skus": ["SKU-4892"], "value": 24750.00,
       "estimated_ship_date": "2024-03-12", "carrier": "UPS"},
      {"warehouse": "West", "skus": ["SKU-7721"], "value": 25000.00,
       "estimated_ship_date": "2024-03-15", "carrier": "FedEx"}
    ],
    "total_value": 49750.00,
    "human_approval_required": false
  },
  "exception_output": null,
  "communication_output": null,
  "circuit_breaker": {
    "carrier_api_failures": 0,
    "window_start": null,
    "threshold": 5,
    "window_minutes": 10,
    "status": "healthy"
  }
}
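A quick consistency check on the sample state can catch hand-editing mistakes in mock data. The sketch below assumes the field names shown in the specification above; `check_state_invariants` is a hypothetical helper and not part of the solution files.

```python
def check_state_invariants(state: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means consistent."""
    problems = []
    intake = state["intake_output"]
    plan = state["fulfillment_output"]

    # Line items must add up to the intake total.
    line_total = sum(li["qty"] * li["unit_price"] for li in intake["line_items_enriched"])
    if abs(line_total - intake["total_value"]) > 0.005:
        problems.append(f"intake total {intake['total_value']} != line items {line_total}")

    # Shipment values must add up to the plan total.
    shipment_total = sum(s["value"] for s in plan["shipments"])
    if abs(shipment_total - plan["total_value"]) > 0.005:
        problems.append(f"plan total {plan['total_value']} != shipments {shipment_total}")

    # The approval flag must follow the "split AND over $50K" rule.
    expected_flag = plan["requires_split"] and plan["total_value"] > 50_000
    if plan["human_approval_required"] != expected_flag:
        problems.append("human_approval_required does not match the split > $50K rule")
    return problems


sample_state = {
    "intake_output": {
        "line_items_enriched": [
            {"sku": "SKU-4892", "qty": 50, "unit_price": 495.00},
            {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00},
        ],
        "total_value": 49750.00,
    },
    "fulfillment_output": {
        "requires_split": True,
        "shipments": [{"value": 24750.00}, {"value": 25000.00}],
        "total_value": 49750.00,
        "human_approval_required": False,
    },
}

print(check_state_invariants(sample_state))  # []
```

The sample order totals 50 × 495.00 + 200 × 125.00 = 49,750.00, which sits just under the $50,000 threshold, so the split plan does not require approval.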
Step-by-Step Build Guide
Step 1: Create the Mock Tools (mock_tools.py)
What & Why: All four agents need mock tool functions (ERP queries, inventory checks, carrier tracking, messaging). Building them first means you can test each agent independently. The full mock tools code is shown in the Mock Tool Implementations section below.
Create the file mock_tools.py using the code in the Mock Tool Implementations section.
Run:
Expected output:
If both outputs show validated: True and credit_approved: True, Step 1 is working. If you see CUSTOMER_NOT_FOUND, make sure you're passing the exact customer name "Acme Manufacturing".
Step 2: Build the Complete Pipeline (pipeline.py)
What & Why: The pipeline file contains everything: the OrderPipelineState dataclass, the time-windowed CarrierCircuitBreaker, all four agent functions (Intake with Claude reasoning, Fulfillment, Exception Monitor, Communication with Claude drafting), HITL approval flows, and the orchestrator that wires them together. Consolidating into one file keeps the project simple — no import headaches, no package structure to manage.
Create pipeline.py using the complete solution code in the Complete Solution section below.
Run:
Then type demo at the prompt.
Expected output:
If you see [PIPELINE] Complete!, the orchestrator is working end-to-end. If the pipeline stops at [INTAKE] FAILED, check that the sample PO uses customer name "Acme Manufacturing" (must match the credit database). If you see AuthenticationError, verify your ANTHROPIC_API_KEY is set correctly.
Step 3: Test HITL for Split Shipments & Credit Resolutions
What & Why: When the fulfillment plan requires a split shipment exceeding $50K, the pipeline pauses. Present the operations manager with three options: approve split, wait for consolidation, or partial ship with backorder cancellation. Additionally, when the Exception Monitor’s resolution involves issuing a credit or refund, the pipeline pauses for ops manager approval before proceeding.
The HITL functions (split_shipment_review and credit_resolution_review) are already included in pipeline.py. Test by modifying the sample PO to have a total value over $50K:
Run (to trigger HITL):
Expected behavior: If the total value exceeds $50K and a split shipment is required, the pipeline pauses and prompts for operations manager approval. Enter 1 to approve the split.
If you see OPERATIONS MANAGER REVIEW REQUIRED, HITL is working. If the pipeline runs straight through without pausing, check the human_approval_required logic: it should be requires_split AND total > 50000.
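One way to exercise the approval gate without typing at the prompt is to patch `input`. The sketch below is self-contained and uses a stand-in `review` function rather than the real `split_shipment_review`; the modified quantity (60 units of SKU-4892 instead of 50) is an assumed example that pushes the sample PO to $54,700 while staying within Acme's available credit.

```python
from unittest.mock import patch

line_items = [
    {"sku": "SKU-4892", "qty": 60, "unit_price": 495.00},   # 50 in the sample PO
    {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00},
]
total = sum(li["qty"] * li["unit_price"] for li in line_items)
requires_split = True  # the two SKUs ship from different warehouses in the mock inventory


def review() -> str:
    """Stand-in for the console prompt: maps a menu choice to a decision."""
    options = {"1": "approve_split", "2": "wait_consolidation", "3": "partial_ship"}
    choice = input("Decision (1-3): ").strip()
    return options.get(choice, "invalid")


decision = None
if requires_split and total > 50_000:
    # Simulate the operations manager entering "1" at the prompt.
    with patch("builtins.input", return_value="1"):
        decision = review()

print(total, decision)  # 54700.0 approve_split
```

The same patching approach works in pytest, where the built-in `monkeypatch` fixture can replace `builtins.input` for the duration of a test.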
Step 4: Create the Test Suite (test_pipeline.py)
What & Why: The test suite runs 5 scenarios covering happy path, edge cases, and adversarial inputs. The complete test code is in the Testing Guide section below.
Run:
Expected output:
If all 5 tests pass, your pipeline handles the five covered scenarios correctly against the mock tools. If a test fails, check the Testing Guide table for expected behavior and compare against your implementation.
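Because the duplicate check keeps processed PO numbers in module-level state, the order in which tests run can change their results. The sketch below shows one way to isolate that state; `validate` and `PROCESSED` are simplified stand-ins for the real tool and its registry, and `reset_registry` is a hypothetical helper that could equally be called from an autouse pytest fixture.

```python
PROCESSED: set[str] = set()  # stand-in for the module-level registry of accepted POs


def validate(po: dict) -> dict:
    """Simplified duplicate check: the first submission passes, repeats are flagged."""
    if po["po_number"] in PROCESSED:
        return {"validated": False, "errors": ["DUPLICATE_PO"]}
    PROCESSED.add(po["po_number"])
    return {"validated": True, "errors": []}


def reset_registry() -> None:
    """Clear shared state so each test starts from an empty registry."""
    PROCESSED.clear()


def test_first_submission_passes():
    reset_registry()
    assert validate({"po_number": "PO-1"})["validated"] is True


def test_resubmission_is_flagged():
    reset_registry()
    validate({"po_number": "PO-1"})
    assert validate({"po_number": "PO-1"})["errors"] == ["DUPLICATE_PO"]
```

Without the reset, `test_first_submission_passes` would fail whenever another test had already submitted `PO-1`.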
Mock Tool Implementations
"""mock_tools.py — Tools for all 4 pipeline agents."""
from datetime import datetime, timedelta

# ═══ INTAKE AGENT ══════════════════════════════════════════════
CUSTOMER_CREDIT = {
    "Acme Manufacturing": {"limit": 100000, "outstanding": 35000, "terms": "Net 45"},
    "Beta Industrial": {"limit": 50000, "outstanding": 48000, "terms": "Net 30"},
}
PROCESSED_POS = set()  # PO numbers already accepted, used for duplicate detection


def validate_purchase_order(raw_po: dict) -> dict:
    """Check required fields and reject PO numbers that were already accepted."""
    required = ["po_number", "customer", "line_items"]
    missing = [f for f in required if f not in raw_po]
    if missing:
        return {"validated": False, "errors": missing}
    if raw_po["po_number"] in PROCESSED_POS:
        return {"validated": False, "errors": ["DUPLICATE_PO"]}
    PROCESSED_POS.add(raw_po["po_number"])
    return {"validated": True, "errors": [], "normalized_po": raw_po}


def verify_customer_credit(customer_id: str, order_value: float) -> dict:
    """Approve when (limit - outstanding) covers the order value."""
    cust = CUSTOMER_CREDIT.get(customer_id)
    if not cust:
        return {"error": "CUSTOMER_NOT_FOUND"}
    available = cust["limit"] - cust["outstanding"]
    return {"credit_approved": available >= order_value,
            "credit_limit": cust["limit"], "available_credit": available,
            "payment_terms": cust["terms"]}


def enrich_line_items(line_items: list, customer_id: str) -> list:
    """Add contract pricing and availability to each line item."""
    enriched = []
    for item in line_items:
        enriched.append({**item,
                         "contract_price_verified": True,
                         "current_availability": item["sku"] != "SKU-7721",
                         "lead_time": 14 if item["sku"] == "SKU-4892" else 7})
    return enriched

# ═══ FULFILLMENT AGENT ═════════════════════════════════════════
WAREHOUSE_INVENTORY = {
    ("SKU-4892", "East"): {"available": 120, "reserved": 0},
    ("SKU-4892", "West"): {"available": 30, "reserved": 0},
    ("SKU-7721", "East"): {"available": 0, "reserved": 0},
    ("SKU-7721", "West"): {"available": 250, "reserved": 0},
}


def check_inventory_allocation(sku: str, quantity: int, warehouse: str | None = None) -> dict:
    """Report allocation for one warehouse, or availability across all warehouses."""
    if warehouse:
        inv = WAREHOUSE_INVENTORY.get((sku, warehouse))
        if not inv:
            return {"error": "SKU_NOT_FOUND"}
        return {"sku": sku, "warehouse": warehouse,
                "available": inv["available"], "allocated": min(quantity, inv["available"]),
                "backorder_qty": max(0, quantity - inv["available"])}
    # Check all warehouses
    results = []
    for (s, w), inv in WAREHOUSE_INVENTORY.items():
        if s == sku:
            results.append({"warehouse": w, "available": inv["available"]})
    return {"sku": sku, "warehouses": results}


def create_shipment_plan(po_number: str, line_items: list,
                         shipping_priority: str = "standard") -> dict:
    """Assign each line item to the first warehouse that can fill it completely."""
    shipments = []
    for item in line_items:
        qty = item.get("qty", item.get("quantity", 0))
        best_wh = None
        for (s, w), inv in WAREHOUSE_INVENTORY.items():
            if s == item["sku"] and inv["available"] >= qty:
                best_wh = w
                break
        if best_wh:
            shipments.append({"warehouse": best_wh, "skus": [item["sku"]],
                              "value": qty * item.get("unit_price", 0),
                              "estimated_ship_date": (datetime.now() + timedelta(days=2)).strftime("%Y-%m-%d"),
                              "carrier": "UPS" if best_wh == "East" else "FedEx"})
    # A plan is a split when its shipments leave from more than one warehouse.
    requires_split = len(set(s["warehouse"] for s in shipments)) > 1
    total = sum(s["value"] for s in shipments)
    return {"plan_id": f"FP-{po_number}", "shipments": shipments,
            "requires_split": requires_split, "total_value": total,
            "human_approval_required": requires_split and total > 50000}


def request_split_shipment_approval(po_number: str, plan_id: str,
                                    total_value: float, reason: str) -> dict:
    return {"approval_id": f"APR-{po_number}",
            "queue_position": 1, "estimated_response_time": "15 minutes"}

# ═══ EXCEPTION MONITOR ═════════════════════════════════════════
def poll_shipment_status(tracking_numbers: list) -> list:
    """Return a mock in-transit status for every tracking number."""
    statuses = []
    for tn in tracking_numbers:
        statuses.append({"tracking_number": tn, "status": "in-transit",
                         "carrier": "UPS", "eta": "2024-03-12"})
    return statuses


def detect_exceptions(shipment_statuses: list, original_plan: dict) -> list:
    """Turn any status of "exception" into a carrier-delay exception record."""
    exceptions = []
    for s in shipment_statuses:
        if s.get("status") == "exception":
            exceptions.append({"type": "carrier_delay",
                               "severity": "medium", "tracking": s["tracking_number"],
                               "recommended_action": "Notify customer with revised ETA"})
    return exceptions


# ═══ COMMUNICATION AGENT ═══════════════════════════════════════
def draft_order_update(po_number: str, update_type: str, details: dict) -> dict:
    """Fill a fixed template for the given update type."""
    templates = {
        "confirmation": f"Your order {po_number} has been confirmed and is being processed.",
        "shipment": f"Your order {po_number} has shipped! Tracking: {details.get('tracking', 'N/A')}",
        "delay": f"We're writing to let you know about a delay on order {po_number}.",
        "delivery": f"Great news! Order {po_number} has been delivered.",
    }
    return {"draft_id": f"DRF-{po_number}-{update_type}",
            "subject": f"Order Update: {po_number}",
            "body": templates.get(update_type, "Order update."),
            "urgency": "high" if update_type == "delay" else "standard"}


def send_customer_update(draft_id: str, channel: str, customer_contact: str) -> dict:
    return {"message_id": f"MSG-{draft_id}",
            "sent_at": datetime.now().isoformat(),
            "delivery_status": "delivered"}
// mock_tools.ts: Tools for all 4 pipeline agents.

// ═══ INTAKE AGENT ══════════════════════════════════════════════
const CUSTOMER_CREDIT: Record<string, any> = {
  "Acme Manufacturing": { limit: 100000, outstanding: 35000, terms: "Net 45" },
  "Beta Industrial": { limit: 50000, outstanding: 48000, terms: "Net 30" },
};
const PROCESSED_POS = new Set<string>();

export function validatePurchaseOrder(rawPo: any): any {
  const required = ["po_number", "customer", "line_items"];
  const missing = required.filter(f => !rawPo[f]);
  if (missing.length) return { validated: false, errors: missing };
  if (PROCESSED_POS.has(rawPo.po_number)) {
    return { validated: false, errors: ["DUPLICATE_PO"] };
  }
  PROCESSED_POS.add(rawPo.po_number);
  return { validated: true, errors: [], normalized_po: rawPo };
}

export function verifyCustomerCredit(customerId: string, orderValue: number): any {
  const cust = CUSTOMER_CREDIT[customerId];
  if (!cust) return { error: "CUSTOMER_NOT_FOUND" };
  const available = cust.limit - cust.outstanding;
  return {
    credit_approved: available >= orderValue,
    credit_limit: cust.limit,
    available_credit: available,
    payment_terms: cust.terms,
  };
}

/** Add contract pricing and availability to each line item. */
export function enrichLineItems(lineItems: any[], customerId: string): any[] {
  return lineItems.map(item => ({
    ...item,
    contract_price_verified: true,
    current_availability: item.sku !== "SKU-7721",
    lead_time: item.sku === "SKU-4892" ? 14 : 7,
  }));
}

// ═══ FULFILLMENT AGENT ═════════════════════════════════════════
const WAREHOUSE_INVENTORY: Record<string, { available: number; reserved: number }> = {
  "SKU-4892|East": { available: 120, reserved: 0 },
  "SKU-4892|West": { available: 30, reserved: 0 },
  "SKU-7721|East": { available: 0, reserved: 0 },
  "SKU-7721|West": { available: 250, reserved: 0 },
};

export function checkInventoryAllocation(
  sku: string, quantity: number, warehouse?: string
): any {
  if (warehouse) {
    const inv = WAREHOUSE_INVENTORY[`${sku}|${warehouse}`];
    if (!inv) return { error: "SKU_NOT_FOUND" };
    return {
      sku, warehouse,
      available: inv.available,
      allocated: Math.min(quantity, inv.available),
      backorder_qty: Math.max(0, quantity - inv.available),
    };
  }
  // Check all warehouses
  const results: any[] = [];
  for (const [key, inv] of Object.entries(WAREHOUSE_INVENTORY)) {
    const [s, w] = key.split("|");
    if (s === sku) results.push({ warehouse: w, available: inv.available });
  }
  return { sku, warehouses: results };
}

export function createShipmentPlan(
  poNumber: string, lineItems: any[], shippingPriority = "standard"
): any {
  const shipments: any[] = [];
  for (const item of lineItems) {
    let bestWh: string | null = null;
    const qty = item.qty ?? item.quantity ?? 0;
    // Pick the first warehouse that can fill the line item completely.
    for (const [key, inv] of Object.entries(WAREHOUSE_INVENTORY)) {
      const [s, w] = key.split("|");
      if (s === item.sku && inv.available >= qty) { bestWh = w; break; }
    }
    if (bestWh) {
      const shipDate = new Date();
      shipDate.setDate(shipDate.getDate() + 2);
      shipments.push({
        warehouse: bestWh, skus: [item.sku],
        value: qty * (item.unit_price ?? 0),
        estimated_ship_date: shipDate.toISOString().slice(0, 10),
        carrier: bestWh === "East" ? "UPS" : "FedEx",
      });
    }
  }
  const warehouses = new Set(shipments.map(s => s.warehouse));
  const requiresSplit = warehouses.size > 1;
  const total = shipments.reduce((s, sh) => s + sh.value, 0);
  return {
    plan_id: `FP-${poNumber}`, shipments, requires_split: requiresSplit,
    total_value: total, human_approval_required: requiresSplit && total > 50000,
  };
}

export function requestSplitShipmentApproval(
  poNumber: string, planId: string, totalValue: number, reason: string
): any {
  return {
    approval_id: `APR-${poNumber}`,
    queue_position: 1,
    estimated_response_time: "15 minutes",
  };
}

// ═══ EXCEPTION MONITOR ═════════════════════════════════════════
export function pollShipmentStatus(trackingNumbers: string[]): any[] {
  return trackingNumbers.map(tn => ({
    tracking_number: tn, status: "in-transit",
    carrier: "UPS", eta: "2024-03-12",
  }));
}

export function detectExceptions(shipmentStatuses: any[], originalPlan: any): any[] {
  const exceptions: any[] = [];
  for (const s of shipmentStatuses) {
    if (s.status === "exception") {
      exceptions.push({
        type: "carrier_delay", severity: "medium",
        tracking: s.tracking_number,
        recommended_action: "Notify customer with revised ETA",
      });
    }
  }
  return exceptions;
}

// ═══ COMMUNICATION AGENT ═══════════════════════════════════════
export function draftOrderUpdate(
  poNumber: string, updateType: string, details: any
): any {
  const templates: Record<string, string> = {
    confirmation: `Your order ${poNumber} has been confirmed and is being processed.`,
    shipment: `Your order ${poNumber} has shipped! Tracking: ${details?.tracking ?? "N/A"}`,
    delay: `We're writing to let you know about a delay on order ${poNumber}.`,
    delivery: `Great news! Order ${poNumber} has been delivered.`,
  };
  return {
    draft_id: `DRF-${poNumber}-${updateType}`,
    subject: `Order Update: ${poNumber}`,
    body: templates[updateType] ?? "Order update.",
    urgency: updateType === "delay" ? "high" : "standard",
  };
}

export function sendCustomerUpdate(
  draftId: string, channel: string, customerContact: string
): any {
  return {
    message_id: `MSG-${draftId}`,
    sent_at: new Date().toISOString(),
    delivery_status: "delivered",
  };
}
Complete Solution
"""pipeline.py — B2B Order Lifecycle Pipeline (Capstone 4-B)
Usage: export ANTHROPIC_API_KEY=... && python pipeline.py
Agents 1 (Intake) and 4 (Communication) use Claude API calls with tool use.
Agents 2 (Fulfillment) and 3 (Exception Monitor) are deterministic — they
don't need LLM reasoning because inventory allocation and status polling
are rule-based operations.
"""
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta

import anthropic


# ── Time-Windowed Circuit Breaker ──────────────────────────────
@dataclass
class CarrierCircuitBreaker:
    """Trips after 5 failures within a 10-minute window."""
    failures: list = field(default_factory=list)  # timestamps
    threshold: int = 5
    window_minutes: int = 10
    status: str = "healthy"  # healthy | tripped | fallback

    def record_failure(self):
        now = datetime.now()
        self.failures.append(now)
        # Prune old failures outside the window
        cutoff = now - timedelta(minutes=self.window_minutes)
        self.failures = [f for f in self.failures if f > cutoff]
        if len(self.failures) >= self.threshold:
            self.status = "tripped"
            print(f"[CIRCUIT BREAKER] TRIPPED! {len(self.failures)} failures in {self.window_minutes} min.")
            print("[CIRCUIT BREAKER] Switching to fallback polling mode.")

    def record_success(self):
        # Don't clear: let the window expire naturally
        pass

    def is_tripped(self) -> bool:
        return self.status == "tripped"

    def reset(self):
        self.failures.clear()
        self.status = "healthy"
        print("[CIRCUIT BREAKER] Reset to healthy.")


# ── Pipeline State ─────────────────────────────────────────────
@dataclass
class OrderPipelineState:
    """Structured state handed from one agent to the next."""
    po_number: str
    stage: str = "intake"
    raw_po: dict = field(default_factory=dict)
    intake_output: dict = field(default_factory=dict)
    fulfillment_output: dict = field(default_factory=dict)
    exception_output: dict = field(default_factory=dict)
    communication_output: dict = field(default_factory=dict)


# ── HITL for Split Shipments ───────────────────────────────────
def split_shipment_review(state: OrderPipelineState) -> dict:
    """Show the plan to the operations manager and record the decision."""
    plan = state.fulfillment_output
    print("\n" + "=" * 50)
    print(" OPERATIONS MANAGER REVIEW REQUIRED")
    print("=" * 50)
    print(f" PO: {state.po_number}")
    print(f" Total Value: ${plan.get('total_value', 0):,.2f}")
    print(f" Split: {len(plan.get('shipments', []))} shipments")
    for i, s in enumerate(plan.get("shipments", []), 1):
        print(f" Ship {i}: {s['warehouse']} — SKUs: {s['skus']} — ${s['value']:,.2f}")
    print("\n [1] Approve split [2] Wait for consolidation [3] Partial ship")
    choice = input(" Decision (1-3): ").strip()
    options = {"1": "approve_split", "2": "wait_consolidation", "3": "partial_ship"}
    # NOTE: unrecognized input falls back to approval. A production gate
    # should re-prompt or default to the most conservative option instead.
    return {"decision": options.get(choice, "approve_split"),
            "reviewer": "ops_manager_01",
            "reviewed_at": datetime.now().isoformat()}


# ── HITL for Credit/Refund Resolutions ────────────────────────
def credit_resolution_review(state: OrderPipelineState, exception: dict) -> dict:
    """Requires ops manager approval when exception resolution involves credit/refund."""
    print("\n" + "=" * 50)
    print(" OPERATIONS MANAGER REVIEW REQUIRED — CREDIT/REFUND")
    print("=" * 50)
    print(f" PO: {state.po_number}")
    print(f" Exception Type: {exception.get('type', 'unknown')}")
    print(f" Severity: {exception.get('severity', 'unknown')}")
    print(f" Recommended Action: {exception.get('recommended_action', 'N/A')}")
    print("\n [1] Approve credit/refund [2] Deny [3] Escalate to finance")
    choice = input(" Decision (1-3): ").strip()
    options = {"1": "approve", "2": "deny", "3": "escalate_finance"}
    # NOTE: as above, unrecognized input falls back to approval.
    return {"decision": options.get(choice, "approve"),
            "reviewer": "ops_manager_01",
            "reviewed_at": datetime.now().isoformat(),
            "resolution_type": "credit_refund"}


# ── Main Pipeline ──────────────────────────────────────────────
def run_order_pipeline(raw_po: dict,
                       cb: CarrierCircuitBreaker | None = None) -> OrderPipelineState:
    from mock_tools import (validate_purchase_order, verify_customer_credit,
                            enrich_line_items, create_shipment_plan, poll_shipment_status,
                            detect_exceptions, draft_order_update, send_customer_update)
    client = anthropic.Anthropic()
    state = OrderPipelineState(po_number=raw_po["po_number"], raw_po=raw_po)
    # Pass in a shared breaker so failures accumulate across orders; a fresh
    # breaker per run sees at most one failure and can never reach the threshold.
    if cb is None:
        cb = CarrierCircuitBreaker()

    # ── Agent 1: Intake (Claude-powered) ──────────────────────
    print(f"\n[INTAKE] Processing {state.po_number}...")
    intake_tools = [
        {"name": "validate_order", "description": "Validate PO format and required fields",
         "input_schema": {"type": "object", "properties": {"po_number": {"type": "string"}}, "required": ["po_number"]}},
        {"name": "check_credit", "description": "Check customer credit availability",
         "input_schema": {"type": "object", "properties": {"customer_id": {"type": "string"}, "order_total": {"type": "number"}}, "required": ["customer_id", "order_total"]}}
    ]
    intake_msgs = [{"role": "user", "content": f"Validate this order and check credit: {json.dumps(raw_po, default=str)}"}]
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system="You are an order intake agent. Validate the order format and check customer credit. Use the available tools.",
        tools=intake_tools, messages=intake_msgs
    )
    # Process tool calls in an agentic loop.
    # validate_purchase_order records the PO number on its first success, so the
    # result is cached here; calling it a second time would report DUPLICATE_PO.
    validation = None
    while response.stop_reason == "tool_use":
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "validate_order":
                    if validation is None:
                        validation = validate_purchase_order(raw_po)
                    result = validation
                elif block.name == "check_credit":
                    result = verify_customer_credit(block.input["customer_id"], block.input["order_total"])
                else:
                    result = {"error": f"Unknown tool: {block.name}"}
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result, default=str)})
        intake_msgs.append({"role": "assistant", "content": response.content})
        intake_msgs.append({"role": "user", "content": tool_results})
        response = client.messages.create(
            model="claude-sonnet-4-6", max_tokens=1024,
            system="You are an order intake agent. Validate the order format and check customer credit.",
            tools=intake_tools, messages=intake_msgs
        )
    # Extract Claude's text summary
    intake_text = ""
    for block in response.content:
        if hasattr(block, "text"):
            intake_text = block.text
            break
    # Enforce validation and credit deterministically, whatever Claude concluded
    if validation is None:  # Claude never called validate_order
        validation = validate_purchase_order(raw_po)
    if not validation.get("validated"):
        print(f"[INTAKE] FAILED: {validation.get('errors')}")
        state.stage = "error"
        return state
    order_total = sum(li.get("qty", 0) * li.get("unit_price", 0) for li in raw_po.get("line_items", []))
    credit = verify_customer_credit(raw_po.get("customer", ""), order_total)
    if credit.get("error"):  # e.g. CUSTOMER_NOT_FOUND
        print(f"[INTAKE] FAILED: {credit['error']}")
        state.stage = "error"
        return state
    if not credit.get("credit_approved"):
        print(f"[INTAKE] Credit check failed. Available: ${credit.get('available_credit', 0):,.2f}")
        state.stage = "error"
        return state
    enriched = enrich_line_items(raw_po["line_items"], raw_po["customer"])
    state.intake_output = {"validated": True, "credit_passed": True,
                           "line_items": enriched, "total_value": order_total,
                           "claude_summary": intake_text}
    state.stage = "fulfillment"
    print("[INTAKE] Claude validated order and extracted key fields.")
    print("[INTAKE] Complete.")

    # ── Agent 2: Fulfillment ───────────────────────────────────
    print("[FULFILLMENT] Planning shipments...")
    plan = create_shipment_plan(state.po_number, raw_po["line_items"])
    state.fulfillment_output = plan
    if plan.get("human_approval_required"):
        override = split_shipment_review(state)
        state.fulfillment_output["manager_override"] = override
        if override["decision"] == "wait_consolidation":
            print("[FULFILLMENT] Waiting for consolidation. Pipeline paused.")
            state.stage = "awaiting_consolidation"
            return state
    state.stage = "monitoring"
    print(f"[FULFILLMENT] Complete. Split: {plan.get('requires_split')}.")

    # ── Agent 3: Exception Monitor ─────────────────────────────
    print("[MONITOR] Checking shipment status...")
    tracking_numbers = [s.get("tracking", "TRK-MOCK") for s in plan.get("shipments", [])]
    if not cb.is_tripped():
        # Only the carrier call sits inside the try, so errors raised during
        # the manager review are not counted as carrier API failures.
        try:
            statuses = poll_shipment_status(tracking_numbers)
        except Exception:
            cb.record_failure()
            state.exception_output = {"fallback_mode": True}
        else:
            cb.record_success()
            exceptions = detect_exceptions(statuses, plan)
            state.exception_output = {"statuses": statuses, "exceptions": exceptions}
            # HITL: credit/refund resolutions require ops manager approval
            for exc in exceptions:
                resolution_type = exc.get("recommended_action", "").lower()
                requires_approval = ("credit" in resolution_type
                                     or "refund" in resolution_type)
                if requires_approval:
                    print("[MONITOR] Credit/refund resolution detected. Requesting ops manager approval.")
                    approval = credit_resolution_review(state, exc)
                    exc["manager_override"] = approval
                    if approval["decision"] == "deny":
                        exc["resolution_blocked"] = True
                        print("[MONITOR] Credit resolution denied by ops manager.")
    else:
        state.exception_output = {"fallback_mode": True,
                                  "message": "Circuit breaker tripped. Using fallback polling."}
    state.stage = "communication"
    print("[MONITOR] Complete.")

    # ── Agent 4: Communication (Claude-powered) ────────────────
    print("[COMMUNICATION] Drafting updates...")
    update_type = "shipment" if not state.exception_output.get("exceptions") else "delay"
    comms_tools = [
        {"name": "draft_update", "description": "Draft an order status email to the customer",
         "input_schema": {"type": "object", "properties": {"po_number": {"type": "string"}, "update_type": {"type": "string", "enum": ["confirmation", "shipment", "delay", "delivery"]}, "details": {"type": "object"}}, "required": ["po_number", "update_type"]}},
        {"name": "send_update", "description": "Send the drafted update to the customer",
         "input_schema": {"type": "object", "properties": {"draft_id": {"type": "string"}, "channel": {"type": "string"}, "customer_contact": {"type": "string"}}, "required": ["draft_id", "channel", "customer_contact"]}}
    ]
    comms_context = {"po_number": state.po_number, "update_type": update_type,
                     "tracking": tracking_numbers[0] if tracking_numbers else "N/A",
                     "customer": raw_po["customer"], "shipments": plan.get("shipments", [])}
    comms_msgs = [{"role": "user", "content": f"Draft and send a {update_type} notification for this order: {json.dumps(comms_context, default=str)}"}]
    comms_response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system="You are a customer communication agent for B2B orders. Draft professional, clear status updates. Use draft_update to create the message, then send_update to deliver it.",
        tools=comms_tools, messages=comms_msgs
    )
    # Process tool calls in an agentic loop
    draft_result = None
    send_result = None
    while comms_response.stop_reason == "tool_use":
        tool_results = []
        for block in comms_response.content:
            if block.type == "tool_use":
                if block.name == "draft_update":
                    result = draft_order_update(block.input["po_number"], block.input["update_type"], block.input.get("details", {}))
                    draft_result = result
                elif block.name == "send_update":
                    result = send_customer_update(block.input["draft_id"], block.input["channel"], block.input["customer_contact"])
                    send_result = result
                else:
                    result = {"error": f"Unknown tool: {block.name}"}
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result, default=str)})
        comms_msgs.append({"role": "assistant", "content": comms_response.content})
        comms_msgs.append({"role": "user", "content": tool_results})
        comms_response = client.messages.create(
            model="claude-sonnet-4-6", max_tokens=1024,
            system="You are a customer communication agent for B2B orders. Draft professional, clear status updates.",
            tools=comms_tools, messages=comms_msgs
        )
    # Fallbacks: draft and send directly only for the steps Claude skipped,
    # so the customer is not messaged twice.
    if not draft_result:
        draft_result = draft_order_update(state.po_number, update_type,
                                          {"tracking": tracking_numbers[0] if tracking_numbers else "N/A"})
    if not send_result:
        send_customer_update(draft_result["draft_id"], "email", f"{raw_po['customer']}@example.com")
    state.communication_output = {"draft_id": draft_result["draft_id"],
                                  "sent_via": "email", "sent_at": datetime.now().isoformat()}
    state.stage = "complete"
    print("[COMMUNICATION] Claude drafted customer notification.")
    print(f"[PIPELINE] Complete! Order {state.po_number} processed.")
    return state

def main():
print("=" * 60)
print(" B2B Order Lifecycle Pipeline — Capstone 4-B")
print(" Type 'demo' for sample PO, 'quit' to exit.")
print("=" * 60)
sample = {"po_number": "PO-2024-11234", "customer": "Acme Manufacturing",
"line_items": [
{"sku": "SKU-4892", "qty": 50, "unit_price": 495.00},
{"sku": "SKU-7721", "qty": 200, "unit_price": 125.00}]}
while True:
cmd = input("\nCommand: ").strip()
if cmd.lower() in ("quit", "exit"): break
if cmd.lower() == "demo":
state = run_order_pipeline(sample)
print(f"\nFinal state:\n{json.dumps(asdict(state), indent=2, default=str)}")
if __name__ == "__main__":
main()
// pipeline.ts — B2B Order Lifecycle Pipeline (Capstone 4-B)
// Usage: ANTHROPIC_API_KEY=... npx ts-node pipeline.ts
import Anthropic from "@anthropic-ai/sdk";
import * as readline from "readline";
import {
validatePurchaseOrder,
verifyCustomerCredit,
enrichLineItems,
createShipmentPlan,
pollShipmentStatus,
detectExceptions,
draftOrderUpdate,
sendCustomerUpdate,
} from "./mock_tools";
// ── Types ────────────────────────────────────────────────────────
interface PipelineState {
po_number: string;
stage: string;
raw_po: Record<string, any>;
intake_output: Record<string, any>;
fulfillment_output: Record<string, any>;
exception_output: Record<string, any>;
communication_output: Record<string, any>;
}
interface CircuitBreakerState {
failures: number[]; // timestamps in ms
threshold: number;
windowMs: number; // 10 min = 600_000 ms
status: "healthy" | "tripped" | "fallback";
}
// ── Time-Windowed Circuit Breaker ────────────────────────────────
function createCircuitBreaker(): CircuitBreakerState {
return { failures: [], threshold: 5, windowMs: 10 * 60 * 1000, status: "healthy" };
}
function recordFailure(cb: CircuitBreakerState): void {
const now = Date.now();
cb.failures.push(now);
// Prune old failures outside the window
const cutoff = now - cb.windowMs;
cb.failures = cb.failures.filter(f => f > cutoff);
if (cb.failures.length >= cb.threshold) {
cb.status = "tripped";
console.log(
`[CIRCUIT BREAKER] TRIPPED! ${cb.failures.length} failures in ${cb.windowMs / 60000} min.`
);
console.log("[CIRCUIT BREAKER] Switching to fallback polling mode.");
}
}
function recordSuccess(_cb: CircuitBreakerState): void {
// Don't clear — let the window expire naturally
}
function resetCircuitBreaker(cb: CircuitBreakerState): void {
cb.failures = [];
cb.status = "healthy";
console.log("[CIRCUIT BREAKER] Reset to healthy.");
}
// ── HITL Helper (readline-based) ─────────────────────────────────
function askUser(prompt: string): Promise<string> {
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
return new Promise(resolve => {
rl.question(prompt, (answer: string) => { rl.close(); resolve(answer.trim()); });
});
}
// ── HITL for Split Shipments ─────────────────────────────────────
async function splitShipmentReview(
state: PipelineState
): Promise<Record<string, any>> {
const plan = state.fulfillment_output;
console.log("\n" + "=".repeat(50));
console.log(" OPERATIONS MANAGER REVIEW REQUIRED");
console.log("=".repeat(50));
console.log(` PO: ${state.po_number}`);
console.log(` Total Value: $${(plan.total_value ?? 0).toLocaleString("en-US", { minimumFractionDigits: 2 })}`);
console.log(` Split: ${(plan.shipments ?? []).length} shipments`);
(plan.shipments ?? []).forEach((s: any, i: number) => {
console.log(` Ship ${i + 1}: ${s.warehouse} — SKUs: ${JSON.stringify(s.skus)} — $${s.value.toLocaleString("en-US", { minimumFractionDigits: 2 })}`);
});
console.log("\n [1] Approve split [2] Wait for consolidation [3] Partial ship");
const choice = await askUser(" Decision (1-3): ");
const options: Record<string, string> = {
"1": "approve_split", "2": "wait_consolidation", "3": "partial_ship",
};
return {
decision: options[choice] ?? "approve_split",
reviewer: "ops_manager_01",
reviewed_at: new Date().toISOString(),
};
}
// ── HITL for Credit/Refund Resolutions ───────────────────────────
async function creditResolutionReview(
state: PipelineState, exception: Record<string, any>
): Promise<Record<string, any>> {
console.log("\n" + "=".repeat(50));
console.log(" OPERATIONS MANAGER REVIEW REQUIRED — CREDIT/REFUND");
console.log("=".repeat(50));
console.log(` PO: ${state.po_number}`);
console.log(` Exception Type: ${exception.type ?? "unknown"}`);
console.log(` Severity: ${exception.severity ?? "unknown"}`);
console.log(` Recommended Action: ${exception.recommended_action ?? "N/A"}`);
console.log("\n [1] Approve credit/refund [2] Deny [3] Escalate to finance");
const choice = await askUser(" Decision (1-3): ");
const options: Record<string, string> = {
"1": "approve", "2": "deny", "3": "escalate_finance",
};
return {
// Unrecognized input must not auto-approve a credit or refund; escalate instead
decision: options[choice] ?? "escalate_finance",
reviewer: "ops_manager_01",
reviewed_at: new Date().toISOString(),
resolution_type: "credit_refund",
};
}
// ── Main Pipeline ────────────────────────────────────────────────
async function runOrderPipeline(
rawPo: Record<string, any>
): Promise<PipelineState> {
const state: PipelineState = {
po_number: rawPo.po_number,
stage: "intake",
raw_po: rawPo,
intake_output: {},
fulfillment_output: {},
exception_output: {},
communication_output: {},
};
const cb = createCircuitBreaker();
// ── Agent 1: Intake ────────────────────────────────────────
console.log(`\n[INTAKE] Processing ${state.po_number}...`);
const validation = validatePurchaseOrder(rawPo);
if (!validation.validated) {
console.log(`[INTAKE] FAILED: ${JSON.stringify(validation.errors)}`);
state.stage = "error";
return state;
}
const orderValue = rawPo.line_items.reduce(
(sum: number, li: any) => sum + (li.qty ?? 0) * (li.unit_price ?? 0), 0
);
const credit = verifyCustomerCredit(rawPo.customer, orderValue);
if (!credit.credit_approved) {
console.log(
`[INTAKE] Credit check failed. Available: $${(credit.available_credit ?? 0).toLocaleString("en-US", { minimumFractionDigits: 2 })}`
);
state.stage = "error";
return state;
}
const enriched = enrichLineItems(rawPo.line_items, rawPo.customer);
state.intake_output = {
validated: true, credit_passed: true,
line_items: enriched, total_value: orderValue,
};
state.stage = "fulfillment";
console.log("[INTAKE] Complete.");
// ── Agent 2: Fulfillment ───────────────────────────────────
console.log("[FULFILLMENT] Planning shipments...");
const plan = createShipmentPlan(state.po_number, rawPo.line_items);
state.fulfillment_output = plan;
if (plan.human_approval_required) {
const override = await splitShipmentReview(state);
state.fulfillment_output.manager_override = override;
if (override.decision === "wait_consolidation") {
console.log("[FULFILLMENT] Waiting for consolidation. Pipeline paused.");
state.stage = "awaiting_consolidation";
return state;
}
}
state.stage = "monitoring";
console.log(`[FULFILLMENT] Complete. Split: ${plan.requires_split}.`);
// ── Agent 3: Exception Monitor ─────────────────────────────
console.log("[MONITOR] Checking shipment status...");
const trackingNumbers = (plan.shipments ?? []).map(
(s: any) => s.tracking ?? "TRK-MOCK"
);
if (cb.status !== "tripped") {
try {
const statuses = pollShipmentStatus(trackingNumbers);
recordSuccess(cb);
const exceptions = detectExceptions(statuses, plan);
state.exception_output = { statuses, exceptions };
// HITL: credit/refund resolutions require ops manager approval
for (const exc of exceptions) {
const resolutionType = (exc.recommended_action ?? "").toLowerCase();
const requiresApproval =
resolutionType.includes("credit") || resolutionType.includes("refund");
if (requiresApproval) {
console.log("[MONITOR] Credit/refund resolution detected. Requesting ops manager approval.");
const approval = await creditResolutionReview(state, exc);
exc.manager_override = approval;
if (approval.decision === "deny") {
exc.resolution_blocked = true;
console.log("[MONITOR] Credit resolution denied by ops manager.");
}
}
}
} catch {
recordFailure(cb);
state.exception_output = { fallback_mode: true };
}
} else {
state.exception_output = {
fallback_mode: true,
message: "Circuit breaker tripped. Using fallback polling.",
};
}
state.stage = "communication";
console.log("[MONITOR] Complete.");
// ── Agent 4: Communication ─────────────────────────────────
console.log("[COMMUNICATION] Drafting updates...");
const updateType = !(state.exception_output as any).exceptions?.length
? "shipment" : "delay";
const draft = draftOrderUpdate(state.po_number, updateType, {
tracking: trackingNumbers[0] ?? "N/A",
});
const notif = sendCustomerUpdate(
draft.draft_id, "email", `${rawPo.customer}@example.com`
);
state.communication_output = {
draft_id: draft.draft_id,
sent_via: "email",
sent_at: notif.sent_at,
};
state.stage = "complete";
console.log(`[PIPELINE] Complete! Order ${state.po_number} processed.`);
return state;
}
// ── Entry Point ──────────────────────────────────────────────────
async function main(): Promise<void> {
console.log("=".repeat(60));
console.log(" B2B Order Lifecycle Pipeline — Capstone 4-B");
console.log(" Type 'demo' for sample PO, 'quit' to exit.");
console.log("=".repeat(60));
const sample = {
po_number: "PO-2024-11234",
customer: "Acme Manufacturing",
line_items: [
{ sku: "SKU-4892", qty: 50, unit_price: 495.0 },
{ sku: "SKU-7721", qty: 200, unit_price: 125.0 },
],
};
while (true) {
const cmd = await askUser("\nCommand: ");
if (cmd.toLowerCase() === "quit" || cmd.toLowerCase() === "exit") break;
if (cmd.toLowerCase() === "demo") {
const state = await runOrderPipeline(sample);
console.log(`\nFinal state:\n${JSON.stringify(state, null, 2)}`);
}
}
}
main().catch(console.error);
You built a 4-agent pipeline with Claude API integration and production safety patterns:
- Agent 1 (Intake) uses Claude with tool use to validate orders and check credit through an agentic loop.
- Agent 4 (Communication) uses Claude with tool use to draft and send professional customer notifications.
- Agents 2 and 3 are deterministic: inventory allocation and status polling don't need LLM reasoning.
- HITL: the pipeline pauses for the operations manager when a split shipment exceeds $50K or when an exception resolution involves a credit or refund.
- Time-windowed circuit breaker: only failures within a 10-minute sliding window count toward the threshold of 5, so infrequent transient errors do not cause false trips.
The Claude integration applies to pipeline.py; the TypeScript listing above calls the intake and communication tools directly.
Testing Guide
test_pipeline.py — Complete Test Suite
Create test_pipeline.py with these 5 pytest test cases. They use unittest.mock.patch to stub Claude API calls so tests run without an API key:
"""test_pipeline.py — 5 test scenarios for B2B Order Lifecycle Pipeline."""
import pytest
from unittest.mock import patch, MagicMock

from pipeline import (
    run_order_pipeline, CarrierCircuitBreaker, OrderPipelineState
)
from mock_tools import PROCESSED_POS


def _stub_claude(*args, **kwargs):
    """Return a mock Claude response that skips tool use."""
    mock_resp = MagicMock()
    mock_resp.stop_reason = "end_turn"
    text_block = MagicMock()
    text_block.type = "text"
    text_block.text = "Order validated successfully."
    mock_resp.content = [text_block]
    return mock_resp


@pytest.fixture(autouse=True)
def reset_processed_pos():
    """Clear duplicate-detection set between tests."""
    PROCESSED_POS.clear()
    yield
    PROCESSED_POS.clear()


# ── Test 1: Happy path ──────────────────────────────────────
@patch("pipeline.anthropic.Anthropic")
def test_happy_path_standard_order(mock_anthropic_cls):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client
    po = {"po_number": "PO-TEST-001", "customer": "Acme Manufacturing",
          "line_items": [{"sku": "SKU-4892", "qty": 10, "unit_price": 100.00}]}
    state = run_order_pipeline(po)
    assert state.stage == "complete"
    assert state.intake_output["validated"] is True
    assert state.communication_output["sent_via"] == "email"


# ── Test 2: Credit check failure ─────────────────────────────
@patch("pipeline.anthropic.Anthropic")
def test_credit_check_failure(mock_anthropic_cls):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client
    # Beta Industrial has only $2K available credit
    po = {"po_number": "PO-TEST-002", "customer": "Beta Industrial",
          "line_items": [{"sku": "SKU-4892", "qty": 50, "unit_price": 495.00}]}
    state = run_order_pipeline(po)
    assert state.stage == "error", "Pipeline should stop when credit is insufficient"


# ── Test 3: HITL escalation (high-value split >$50K) ─────────
@patch("builtins.input", return_value="1")  # auto-approve
@patch("pipeline.anthropic.Anthropic")
def test_hitl_escalation_high_value(mock_anthropic_cls, mock_input):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client
    po = {"po_number": "PO-TEST-003", "customer": "Acme Manufacturing",
          "line_items": [
              {"sku": "SKU-4892", "qty": 60, "unit_price": 495.00},
              {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00}]}
    state = run_order_pipeline(po)
    assert state.stage == "complete"
    assert state.fulfillment_output.get("human_approval_required") is True
    assert "manager_override" in state.fulfillment_output


# ── Test 4: Circuit breaker trip ─────────────────────────────
def test_circuit_breaker_trip():
    cb = CarrierCircuitBreaker()
    assert cb.status == "healthy"
    for _ in range(5):
        cb.record_failure()
    assert cb.status == "tripped"
    assert cb.is_tripped() is True


# ── Test 5: Invalid input (missing fields) ───────────────────
@patch("pipeline.anthropic.Anthropic")
def test_invalid_input_missing_fields(mock_anthropic_cls):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client
    # Missing 'customer' and 'line_items' fields
    po = {"po_number": "PO-TEST-005"}
    state = run_order_pipeline(po)
    assert state.stage == "error", "Pipeline should reject orders with missing fields"
Run: pytest test_pipeline.py -v
Expected output: all 5 tests pass (pytest ends with a "5 passed" summary line).
Test Scenario Matrix
| Type | Scenario | Expected Behavior |
|---|---|---|
| HAPPY | Standard order, single warehouse | All 4 agents run, confirmation email sent, no HITL |
| HAPPY | Split shipment under $50K | Auto-approved, two shipment plans created, customer notified |
| HAPPY | Split shipment over $50K | Pipeline pauses for ops manager, manager approves, pipeline resumes |
| HAPPY | Shipment delay detected | Exception monitor catches it, communication agent sends delay notification |
| HAPPY | Full lifecycle through delivery | All stages logged, delivery confirmation sent |
| EDGE | Credit check fails (Beta Industrial, $48K outstanding) | Pipeline stops at intake with rejection |
| EDGE | Manager selects “wait for consolidation” | Pipeline pauses at fulfillment stage, no shipment created |
| EDGE | SKU discontinued mid-fulfillment | Exception monitor catches, suggests substitute |
| ADVERSARIAL | Carrier API fails 6 times in 10 minutes | Circuit breaker trips, switches to fallback polling, ops alerted |
| ADVERSARIAL | Duplicate PO submitted | Intake agent detects duplicate, prevents double-processing |
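The "Carrier API fails 6 times in 10 minutes" row can be exercised without waiting ten real minutes. The following is a minimal sketch and not the course's `CarrierCircuitBreaker`: `SlidingWindowBreaker`, `FakeClock`, and `failures_trip` are hypothetical names, and the threshold (5) and window (600 seconds) are taken from the project brief. Injecting the clock lets a test move time forward deterministically.

```python
import time
from typing import Callable, List


class SlidingWindowBreaker:
    """Hypothetical stand-in for a carrier circuit breaker (illustration only)."""

    def __init__(self, threshold: int = 5, window_s: float = 600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.window_s = window_s
        self.clock = clock  # injectable so tests can control time
        self.failures: List[float] = []
        self.status = "healthy"

    def record_failure(self) -> None:
        now = self.clock()
        self.failures.append(now)
        # Keep only failures that fall inside the sliding window.
        cutoff = now - self.window_s
        self.failures = [t for t in self.failures if t > cutoff]
        if len(self.failures) >= self.threshold:
            self.status = "tripped"


class FakeClock:
    """Manually advanced clock for deterministic tests."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def failures_trip(count: int, gap_s: float) -> bool:
    """Record `count` failures spaced `gap_s` seconds apart; report whether the breaker tripped."""
    clock = FakeClock()
    breaker = SlidingWindowBreaker(clock=clock)
    for _ in range(count):
        breaker.record_failure()
        clock.advance(gap_s)
    return breaker.status == "tripped"


print(failures_trip(6, 60.0))   # six failures one minute apart
print(failures_trip(6, 200.0))  # six failures spread over more than 16 minutes
```

With one-minute gaps the fifth failure lands inside the window and the breaker trips; with 200-second gaps no more than three failures are ever inside the window at once, so it stays healthy.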
Verify Everything Works
Run the complete pipeline end-to-end with a single command:
python -c "
from pipeline import run_order_pipeline
from dataclasses import asdict
import json
sample = {'po_number': 'PO-2024-11234', 'customer': 'Acme Manufacturing',
'line_items': [
{'sku': 'SKU-4892', 'qty': 50, 'unit_price': 495.00},
{'sku': 'SKU-7721', 'qty': 200, 'unit_price': 125.00}]}
state = run_order_pipeline(sample)
print(json.dumps(asdict(state), indent=2, default=str))
" && pytest test_pipeline.py -v
Expected final output: The pipeline runs all 4 agents (with Claude API calls for Intake and Communication) on the sample PO, processes the split shipment, and completes. The pytest suite then runs all 5 tests (mocking Claude calls) and reports 5 passed.
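Whether a run pauses for manager review comes down to the order total. The worked check below assumes the approval rule is `requires_split and total_value > 50000`, as described in Troubleshooting; the helper names `order_total` and `needs_manager_approval` are illustrative and not part of pipeline.py.

```python
from typing import Dict, List, Union

HITL_THRESHOLD = 50_000  # dollars, from the project brief

LineItem = Dict[str, Union[str, int, float]]


def order_total(line_items: List[LineItem]) -> float:
    """Sum qty * unit_price across all line items."""
    return sum(li["qty"] * li["unit_price"] for li in line_items)


def needs_manager_approval(requires_split: bool, total_value: float) -> bool:
    """Assumed rule: only split shipments above the threshold need review."""
    return requires_split and total_value > HITL_THRESHOLD


sample_po = [
    {"sku": "SKU-4892", "qty": 50, "unit_price": 495.00},
    {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00},
]
test3_po = [
    {"sku": "SKU-4892", "qty": 60, "unit_price": 495.00},
    {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00},
]

print(order_total(sample_po))  # 49750.0
print(order_total(test3_po))   # 54700.0
```

The sample PO totals $49,750, just under the threshold, so it completes without a prompt. Test 3 raises the first line item to 60 units, which brings the total to $54,700 and triggers the review.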
You have built a complete multi-agent B2B order lifecycle pipeline with four specialized agents: two powered by Claude API calls with tool use (Intake and Communication) and two deterministic (Fulfillment and Exception Monitor). The pipeline includes a time-windowed circuit breaker that switches to fallback polling when the carrier API fails, and human-in-the-loop review for high-value split shipments and credit resolutions. The staged design mirrors how production order management systems are commonly structured; a real deployment would connect to actual ERP, WMS, and carrier APIs instead of mock data.
Compliance Notes
A multi-agent order pipeline handles sensitive commercial data at every stage: customer credit limits, contract pricing tiers, inventory levels, and supplier quality information.
- Credit data protection: Customer credit limits and outstanding balances are highly sensitive. Never expose one customer’s credit data to another. Encrypt at rest.
- EDI audit trail: If POs arrive via EDI 850, every transformation (intake validation, enrichment) must be logged for EDI audit compliance.
- HITL decision logging: Operations manager split shipment and credit/refund resolution decisions are business records. Log decision, rationale, reviewer identity, and timestamp.
- Carrier API credentials: Carrier tracking API keys must be stored securely (environment variables or secrets manager), never in code or pipeline state.
- Circuit breaker alerts: When the breaker trips, the alert to operations must NOT include order details or customer names — only the failure count and affected service.
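The HITL logging and circuit breaker alert bullets above translate directly into data-shape decisions. The sketch below is an illustration under assumptions: `hitl_audit_record` and `breaker_alert` are hypothetical helpers, the field names are placeholders and not a required schema, and the rationale string is example data.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def hitl_audit_record(po_number: str, decision: str, rationale: str,
                      reviewer: str) -> Dict[str, Any]:
    """Capture decision, rationale, reviewer identity, and timestamp for one review."""
    return {
        "po_number": po_number,
        "decision": decision,
        "rationale": rationale,
        "reviewer": reviewer,
        "reviewed_at": datetime.now(timezone.utc).isoformat(),
    }


def breaker_alert(service: str, failure_count: int) -> Dict[str, Any]:
    """Build an ops alert that carries no order or customer details."""
    return {"service": service, "failure_count": failure_count}


record = hitl_audit_record(
    "PO-2024-11234", "approve_split",
    "Example rationale: customer requested earliest delivery",
    "ops_manager_01",
)
print(json.dumps(record))  # one JSON line per decision, suitable for an append-only log
print(json.dumps(breaker_alert("carrier_tracking_api", 5)))
```

Keeping the alert builder's signature limited to the service name and failure count makes it hard to leak order data by accident, because the function never receives any.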
Each pipeline run calls Claude from two agents (Agent 1: Intake validation, Agent 4: Communication drafting). Each of these runs a tool-use loop, so a single order produces one API request per round of tool calls and can exceed two requests in total. Agents 2 and 3 are deterministic: no LLM is needed for inventory allocation or status polling. Typical consumption: 3,000–6,000 tokens total. At Sonnet pricing, budget ~$0.02–$0.04 per order. For 200 orders/day: $4–$8/day. The HITL path adds human time but no additional API cost. The circuit breaker in fallback mode reduces carrier API cost (fewer polling calls).
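The daily figure follows directly from the per-order budget. A quick check of the arithmetic, using only the numbers quoted above (the function name `daily_budget` is illustrative):

```python
from typing import Tuple


def daily_budget(orders_per_day: int, low_per_order: float,
                 high_per_order: float) -> Tuple[float, float]:
    """Scale a per-order cost range to a daily range, rounded to cents."""
    return (round(orders_per_day * low_per_order, 2),
            round(orders_per_day * high_per_order, 2))


print(daily_budget(200, 0.02, 0.04))  # (4.0, 8.0)
```

Once you have real token counts from API responses, re-run this with your own order volume and measured per-order cost.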
Troubleshooting
Common Errors
ModuleNotFoundError: No module named 'anthropic' — Your virtual environment is not activated. Run source venv/bin/activate (Unix) or venv\Scripts\activate (Windows).
ModuleNotFoundError: No module named 'mock_tools' — You are running the script from the wrong directory. Make sure you are inside order-lifecycle-pipeline/ where both pipeline.py and mock_tools.py exist.
AuthenticationError: 401 — Your ANTHROPIC_API_KEY is not set or is invalid. Run echo $ANTHROPIC_API_KEY (Unix) or echo %ANTHROPIC_API_KEY% (Windows) to verify.
RateLimitError: 429 — You are sending too many requests. Wait a few seconds and retry. The pytest suite stubs the Claude client, so running the tests makes no API calls; only full pipeline runs count against your rate limit.
Credit check always fails — Make sure the customer name in your PO exactly matches a key in CUSTOMER_CREDIT (e.g., "Acme Manufacturing", not "Acme"). The lookup is case-sensitive.
HITL never triggers — The approval check requires both conditions: requires_split == True AND total_value > 50000. If your test PO total is under $50K, HITL will be skipped even on a split shipment.
Circuit breaker doesn't trip — Ensure you are recording failures in rapid succession (within 10 minutes). If you add time.sleep() between failures for testing, the window may expire before reaching the threshold.
Duplicate PO not detected — The PROCESSED_POS set is module-level. If you restart the Python process between tests, the set is empty. Duplicate detection only works within a single process run.
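For the RateLimitError: 429 entry above, "wait a few seconds and retry" is usually automated with exponential backoff. The helper below is a generic sketch and not part of pipeline.py: `retry_with_backoff` and its parameters are hypothetical, and it retries only on the exception types you pass in, so you would supply the SDK's rate-limit exception yourself.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T],
                       retry_on: Tuple[Type[BaseException], ...],
                       max_attempts: int = 4,
                       base_delay_s: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, waiting base_delay_s * 2**attempt after each failed attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            sleep(base_delay_s * (2 ** attempt))
    raise RuntimeError("not reached when max_attempts >= 1")
```

The `sleep` parameter exists so tests can record delays without actually waiting. The Anthropic Python SDK also retries some failures on its own; check its documentation for the client's retry settings before layering another retry on top.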
Going Further
- [OPTIONAL] Async queue-based processing — Replace synchronous orchestration with Redis/BullMQ queues. Each agent processes its queue independently.
- [OPTIONAL] Carrier webhook integration — Instead of polling, receive carrier push notifications via webhooks for real-time tracking updates.
- [OPTIONAL] Customer-tier-aware communication — Gold customers get phone call + email for delays; Bronze gets email only. Adjust channel and urgency per tier.
- [OPTIONAL] SLA violation detection — Automatically detect when delivery exceeds the contract SLA commitment and calculate credit amounts.
- [OPTIONAL] Dashboard (M20) — Build a real-time dashboard showing pipeline stage distribution, HITL queue depth, circuit breaker status, and exception rates.
- [OPTIONAL] Multi-carrier routing — Add logic to select the optimal carrier per shipment based on cost, speed, and reliability metrics.
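As a starting point for the customer-tier item in the list above, channel selection can be a small lookup. This sketch assumes tier labels are stored as strings such as "gold" and "bronze"; `channels_for` is a hypothetical helper, and only the Gold and Bronze delay behavior comes from the list. The email-only fallback for other tiers and for non-delay updates is an assumption.

```python
from typing import Dict, List

# Delay-notification channels per tier; only gold and bronze are specified in the text.
DELAY_CHANNELS: Dict[str, List[str]] = {
    "gold": ["phone", "email"],
    "bronze": ["email"],
}


def channels_for(tier: str, update_type: str) -> List[str]:
    """Return the channels to use for a given customer tier and update type."""
    if update_type != "delay":
        return ["email"]  # assumption: non-delay updates go by email for every tier
    # Unknown tiers fall back to email only (assumption).
    return list(DELAY_CHANNELS.get(tier.lower(), ["email"]))


print(channels_for("Gold", "delay"))    # ['phone', 'email']
print(channels_for("Bronze", "delay"))  # ['email']
```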