Building AI Agents with Claude
Capstone Project 4
Capstone 4 of 5 | 2–3 hours | Domain B: B2B Ecommerce

Capstone 4 — Domain B: B2B Order Lifecycle Pipeline

Build a 4-agent pipeline that manages the full B2B order lifecycle: intake validation, fulfillment planning with HITL approval for high-value split shipments, exception monitoring protected by a carrier API circuit breaker, and proactive customer communication.

Project Brief

Business Context

A B2B distributor processes 200+ purchase orders daily. In Capstone 3, you built a ReAct agent that investigates individual exceptions. But the full order lifecycle — from PO arrival through delivery confirmation — involves four distinct operational stages, each requiring different skills and data sources. A single agent handling everything creates a monolith that can’t scale, can’t be independently tested, and can’t be safely modified without risk to the entire pipeline.

The production solution: four specialized agents in a pipeline.

  • Order Intake Agent: validates and enriches POs. It checks incoming POs (format, credit check, pricing verification), detects duplicates, and enriches line items with current catalog data. It acts as the gateway: garbage in at this stage means errors downstream.
  • Fulfillment Planning Agent: allocates inventory and plans shipments. It checks inventory across warehouses, determines the shipping strategy (single vs. split shipment), and requests operations manager approval for split shipments exceeding $50,000.
  • Exception Monitor Agent: watches for delays and issues. It continuously tracks shipment status, detects anomalies (delays, quality holds, missing scans), and triggers resolution workflows. A circuit breaker protects it by tripping if the carrier API fails repeatedly.
  • Customer Communication Agent: sends proactive updates. It drafts and sends status updates, exception notifications, and delivery confirmations, matching tone and channel to customer tier and SLA commitments.

Two production patterns make this pipeline trustworthy: human-in-the-loop (HITL) approval, in which the operations manager approves split shipments over $50K and any exception resolution that issues a credit, and a circuit breaker that trips when the carrier tracking API fails 5 or more times within 10 minutes and switches monitoring to fallback polling mode.
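The two HITL triggers reduce to simple predicates. The sketch below is illustrative: the helper names are hypothetical, and the `action` key with values `issue_credit` / `issue_refund` is an assumed shape for an exception resolution, not a field defined elsewhere in this capstone. The split rule mirrors the `requires_split and total > 50000` check used in the mock tools.

```python
# Hypothetical helpers; the "action" key and its values are assumptions.
SPLIT_APPROVAL_THRESHOLD = 50_000  # dollars; "over $50K" means strictly greater


def needs_split_approval(plan: dict) -> bool:
    """Ops manager must approve a split shipment whose total exceeds $50K."""
    return bool(plan.get("requires_split")) and plan.get("total_value", 0) > SPLIT_APPROVAL_THRESHOLD


def needs_credit_approval(resolution: dict) -> bool:
    """Ops manager must approve any exception resolution that issues a credit or refund."""
    return resolution.get("action") in {"issue_credit", "issue_refund"}


print(needs_split_approval({"requires_split": True, "total_value": 49750}))  # False
print(needs_split_approval({"requires_split": True, "total_value": 54700}))  # True
```

Keeping the thresholds in named predicates means the fulfillment agent, the exception monitor, and the tests all share one definition of when a human must step in.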

What You Will Build
  • Agent 1 — Order Intake: Validates PO format, checks customer credit, enriches line items with contract pricing. Input guardrails: duplicate PO detection, schema validation.
  • Agent 2 — Fulfillment Planning: Checks multi-warehouse inventory, creates shipment plans. If split shipment >$50K, pauses for operations manager HITL approval.
  • Agent 3 — Exception Monitor: Polls carrier tracking APIs, detects delays/exceptions. Circuit breaker: 5 carrier failures in 10 min → fallback polling mode. If resolution involves credit/refund issuance, requires ops manager HITL approval.
  • Agent 4 — Customer Communication: Drafts proactive updates (confirmation, shipment, delay, delivery). Output guardrails: SLA compliance check, tone matching per customer tier.
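Agent 4's output guardrails can be sketched as a check that runs on each drafted message before it is sent. This is a minimal sketch under stated assumptions: the per-tier notification windows (2/8/24 hours) and the list of "too casual" phrases are invented placeholders, since the brief does not specify them, and `check_outbound_update` is a hypothetical name.

```python
from dataclasses import dataclass, field

# Assumed SLA notification windows (hours since the triggering event); placeholders only.
NOTIFY_WITHIN_HOURS = {"Gold": 2, "Silver": 8, "Bronze": 24}
# Assumed tone rule: casual phrasing is blocked for Gold and Silver accounts.
CASUAL_PHRASES = ("great news!", "heads up", "oops")


@dataclass
class GuardrailResult:
    passed: bool
    violations: list = field(default_factory=list)


def check_outbound_update(tier: str, body: str, hours_since_event: float) -> GuardrailResult:
    violations = []
    window = NOTIFY_WITHIN_HOURS.get(tier)
    if window is None:
        violations.append(f"unknown tier: {tier}")
    elif hours_since_event > window:
        violations.append(f"SLA: {tier} update sent {hours_since_event}h after event (limit {window}h)")
    if tier in ("Gold", "Silver"):
        lowered = body.lower()
        for phrase in CASUAL_PHRASES:
            if phrase in lowered:
                violations.append(f"tone: '{phrase}' too casual for {tier}")
    return GuardrailResult(passed=not violations, violations=violations)


result = check_outbound_update("Gold", "Great news! Order PO-1 has been delivered.", 1)
print(result.passed)  # False
```

Under these assumed rules, the mock `delivery` template ("Great news! ...") would be flagged for a Gold customer, which is exactly the kind of mismatch a tone guardrail exists to catch.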

Skills practiced: Multi-agent architecture (M14), HITL (M17), guardrails (M16–M17), circuit breaker, evaluation (M18), structured inter-agent state passing.

Prerequisites

Complete these modules before starting this capstone:

  • M03–M05 — Tool Use Fundamentals: You need tool definitions, structured outputs, and basic tool orchestration. Every agent in this pipeline uses tool calls.
  • M09 — RAG (Retrieval-Augmented Generation): Agent 1 enriches line items with catalog and contract pricing data. In this capstone that lookup is mocked, but understanding RAG patterns helps you extend the pipeline with real catalog retrieval later.
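  • M12 — ReAct Pattern: The Claude-powered agents (Intake and Communication) run an observe → think → act tool-use loop. The Exception Monitor follows the same observe → detect → act shape; the reference solution implements it with deterministic rules, but you can replace it with a ReAct agent that reasons about anomalies.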
  • M14 — Multi-Agent Systems: The 4-agent pipeline architecture, structured state passing between agents, and agent handoff patterns come directly from M14.
  • M16 — Input Guardrails: Agent 1 uses input validation (PO schema checks, duplicate detection) and Agent 4 applies output guardrails (SLA compliance, tone matching). M16 teaches these patterns.
  • M17 — Human-in-the-Loop & Guardrails: The HITL approval flow for split shipments and credit resolutions, plus the circuit breaker pattern, are covered in M17.
  • M18 — Evaluation: The pytest-based test suite at the end of the capstone uses M18’s scoring and harness patterns to validate end-to-end accuracy.

You should also be comfortable with Python classes, JSON data structures, and running scripts from the command line.

Environment Setup

What You'll Build

A multi-agent B2B order lifecycle pipeline with four specialized agents, human-in-the-loop escalation for high-value splits and credit resolutions, and a time-windowed circuit breaker for carrier API failures.

Time estimate: 2–3 hours   Difficulty: ★★★★☆

Requirements

Install Everything

Copy and paste the block for your platform into your terminal.

macOS / Linux:

mkdir order-lifecycle-pipeline && cd order-lifecycle-pipeline
python3 -m venv venv && source venv/bin/activate
pip install "anthropic>=0.30.0" pytest
export ANTHROPIC_API_KEY=your-key-here

Windows (Command Prompt):

mkdir order-lifecycle-pipeline && cd order-lifecycle-pipeline
python -m venv venv && venv\Scripts\activate
pip install "anthropic>=0.30.0" pytest
set ANTHROPIC_API_KEY=your-key-here

Checkpoint

Run python -c "import anthropic; print('OK')". You should see OK. If you see ModuleNotFoundError, make sure your virtual environment is activated.

File Structure

Here is every file you will create in this capstone. The solution consolidates all agents, state, and circuit breaker logic into pipeline.py for simplicity — no separate agents/ directory needed.

order-lifecycle-pipeline/
├── mock_tools.py           # All mock tool implementations (ERP, inventory, carrier, messaging)
├── pipeline.py             # All 4 agents + state + circuit breaker + orchestrator
└── test_pipeline.py        # Test suite (5 scenarios, pytest)

Domain Glossary

Split Shipment
When an order is fulfilled from multiple warehouses due to inventory distribution. Creates multiple tracking numbers. Split shipments whose order total exceeds $50K require operations manager approval.
Circuit Breaker
Safety mechanism that trips after 5 carrier API failures in 10 minutes. Switches to fallback polling mode (check every 30 min instead of real-time). Prevents cascade failures.
EDI 850
Electronic Data Interchange format for Purchase Orders. Standardized B2B document format. The Intake Agent must parse and validate EDI-formatted POs alongside email and portal submissions.
Credit Check
Verification that the customer's account has sufficient credit for the order. Available credit = credit limit - outstanding balance. Failed credit checks block order processing.
Backorder
When a line item's requested quantity exceeds available inventory. The item is placed on backorder with an estimated restock date. May trigger a split shipment.
Customer Tier
Classification (Gold/Silver/Bronze) determining communication priority, SLA commitments, and escalation thresholds. Gold customers get proactive updates; Bronze gets standard notifications.
Fallback Polling
When the circuit breaker trips, the system switches from real-time carrier API calls to periodic batch polling (every 30 minutes). Degraded but functional monitoring.
Pipeline State
A typed object flowing through all 4 agents. Each agent reads predecessor outputs and writes its own. The single source of truth for order processing status.
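The credit check formula from the glossary (available credit = credit limit - outstanding balance), worked through with the two customers defined in `CUSTOMER_CREDIT` in mock_tools.py. The helper names here are illustrative; the comparison matches `verify_customer_credit`, which approves when available credit is at least the order value.

```python
# Limit and outstanding figures copied from CUSTOMER_CREDIT in mock_tools.py.
CUSTOMER_CREDIT = {
    "Acme Manufacturing": {"limit": 100000, "outstanding": 35000},
    "Beta Industrial": {"limit": 50000, "outstanding": 48000},
}


def available_credit(customer: str) -> float:
    acct = CUSTOMER_CREDIT[customer]
    return acct["limit"] - acct["outstanding"]


def credit_check_passes(customer: str, order_value: float) -> bool:
    return available_credit(customer) >= order_value


print(available_credit("Acme Manufacturing"))           # 65000
print(available_credit("Beta Industrial"))              # 2000
print(credit_check_passes("Acme Manufacturing", 49750))  # True
print(credit_check_passes("Beta Industrial", 49750))     # False
```

This is why the sample PO ($49,750) clears intake for Acme Manufacturing but would be blocked for Beta Industrial.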

Architecture

4-Agent Pipeline — B2B Order Lifecycle
📥 Order Intake (validate, credit, enrich)
→ 🚚 Fulfillment Planning (inventory, plan, HITL)
→ 🔎 Exception Monitor (poll, detect, circuit breaker)
→ ✉️ Customer Comms (draft, send, SLA check)

HITL — Split Shipment Approval (>$50K)
FULFILLMENT: Split required: SKU-4892 at East, SKU-7721 at West. Total: $62,000.
PAUSE: Total >$50K threshold. Submitting to operations manager queue.
OPS MGR: Reviews Ship 1 ($24,750 East) + Ship 2 ($25,000 West, +3 days).
OPTIONS: Approve split | Wait for consolidation | Partial ship + cancel backorder
RESUME: Manager approves split. Pipeline resumes with 2-shipment plan.
Circuit Breaker — Carrier API (5 failures / 10 min)
Initial state: 0 failures (threshold: 5 in 10 min). Status: HEALTHY.
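Fallback polling, as described in the glossary, can be sketched as a scheduling rule: while the breaker is tripped, the next carrier poll is scheduled 30 minutes out instead of near-real-time. The one-minute "real-time" interval and the `next_poll_time` name are assumptions for this sketch; the brief only fixes the 30-minute fallback.

```python
from datetime import datetime, timedelta

FALLBACK_INTERVAL = timedelta(minutes=30)  # from the glossary: batch poll every 30 min
REALTIME_INTERVAL = timedelta(minutes=1)   # assumption: stand-in for "real-time" polling


def next_poll_time(last_poll: datetime, breaker_status: str) -> datetime:
    """Schedule the next carrier poll based on the circuit breaker status."""
    interval = FALLBACK_INTERVAL if breaker_status == "tripped" else REALTIME_INTERVAL
    return last_poll + interval


last = datetime(2024, 3, 10, 8, 0)
print(next_poll_time(last, "healthy"))  # 2024-03-10 08:01:00
print(next_poll_time(last, "tripped"))  # 2024-03-10 08:30:00
```

Monitoring keeps running in degraded form instead of hammering a failing carrier API, which is what prevents the cascade the glossary mentions.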

Mock Data Specification

{
  "po_number": "PO-2024-11234",
  "stage": "intake",
  "created_at": "2024-03-10T08:00:00Z",

  "intake_output": {
    "validated": true,
    "customer_verified": true,
    "credit_check_passed": true,
    "duplicate_detected": false,
    "line_items_enriched": [
      {"sku": "SKU-4892", "qty": 50, "unit_price": 495.00,
       "contract_price_verified": true, "available": true},
      {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00,
       "contract_price_verified": true, "available": false}
    ],
    "total_value": 49750.00,
    "priority": "standard"
  },

  "fulfillment_output": {
    "plan_id": "FP-2024-11234",
    "requires_split": true,
    "shipments": [
      {"warehouse": "East", "skus": ["SKU-4892"], "value": 24750.00,
       "estimated_ship_date": "2024-03-12", "carrier": "UPS"},
      {"warehouse": "West", "skus": ["SKU-7721"], "value": 25000.00,
       "estimated_ship_date": "2024-03-15", "carrier": "FedEx"}
    ],
    "total_value": 49750.00,
    "human_approval_required": false
  },

  "exception_output": null,
  "communication_output": null,

  "circuit_breaker": {
    "carrier_api_failures": 0,
    "window_start": null,
    "threshold": 5,
    "window_minutes": 10,
    "status": "healthy"
  }
}
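The numbers in this spec are internally consistent, and a quick check makes the HITL rule concrete: 50 × $495 + 200 × $125 = $49,750, the two shipments sum to the same total, and because $49,750 is not over $50K, `human_approval_required` is false even though the plan is split. The `check_spec` helper below is a hypothetical sanity check over a trimmed copy of the spec.

```python
import json

# The spec above, trimmed to the fields this check reads.
spec = json.loads("""
{
  "intake_output": {"total_value": 49750.00,
    "line_items_enriched": [
      {"sku": "SKU-4892", "qty": 50, "unit_price": 495.00},
      {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00}]},
  "fulfillment_output": {"requires_split": true, "total_value": 49750.00,
    "human_approval_required": false,
    "shipments": [{"warehouse": "East", "value": 24750.00},
                  {"warehouse": "West", "value": 25000.00}]}
}
""")


def check_spec(spec: dict) -> list:
    problems = []
    intake, plan = spec["intake_output"], spec["fulfillment_output"]
    line_total = sum(li["qty"] * li["unit_price"] for li in intake["line_items_enriched"])
    if line_total != intake["total_value"]:
        problems.append(f"line items sum to {line_total}, not {intake['total_value']}")
    ship_total = sum(s["value"] for s in plan["shipments"])
    if ship_total != plan["total_value"]:
        problems.append(f"shipments sum to {ship_total}, not {plan['total_value']}")
    split = len({s["warehouse"] for s in plan["shipments"]}) > 1
    if split != plan["requires_split"]:
        problems.append("requires_split does not match the warehouses used")
    # Same rule as create_shipment_plan: split AND total over $50K.
    if (split and plan["total_value"] > 50000) != plan["human_approval_required"]:
        problems.append("human_approval_required does not match the $50K split rule")
    return problems


print(check_spec(spec))  # []
```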

Step-by-Step Build Guide

Now that you understand the architecture and data flow, let's build it piece by piece. Each step creates or exercises one file, gives you a command to test it, and shows the expected output. Follow the steps in order; later steps depend on earlier ones.

Step 1: Create the Mock Tools (mock_tools.py)

What & Why: All four agents need mock tool functions (ERP queries, inventory checks, carrier tracking, messaging). Building them first means you can test each agent independently. The full mock tools code is shown in the Mock Tool Implementations section below.

Create the file mock_tools.py using the code in the Mock Tool Implementations section.

Run:

python -c "
from mock_tools import validate_purchase_order, verify_customer_credit
po = {'po_number': 'PO-TEST', 'customer': 'Acme', 'line_items': []}
print(validate_purchase_order(po))
print(verify_customer_credit('Acme Manufacturing', 50000))
"

Expected output:

{'validated': True, 'errors': [], 'normalized_po': {'po_number': 'PO-TEST', 'customer': 'Acme', 'line_items': []}}
{'credit_approved': True, 'credit_limit': 100000, 'available_credit': 65000, 'payment_terms': 'Net 45'}
Checkpoint

If both outputs show validated: True and credit_approved: True, Step 1 is working. If you see CUSTOMER_NOT_FOUND, make sure you're passing the exact customer name "Acme Manufacturing".
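One behavior worth knowing before Step 2: `validate_purchase_order` both checks `PROCESSED_POS` and adds the PO number to it, so a second call with the same PO inside one Python process returns `DUPLICATE_PO` (for example, across tests in one pytest session, or when code re-validates a PO that was already validated). The Step 1 command is unaffected because each `python -c` run starts a fresh process. A hypothetical `PORegistry` sketch that separates the read-only check from the recording step:

```python
class PORegistry:
    """Hypothetical helper: separates the read-only duplicate check from the
    step that records a PO as processed."""

    def __init__(self) -> None:
        self._seen: set = set()

    def is_duplicate(self, po_number: str) -> bool:
        # Read-only: safe to call any number of times.
        return po_number in self._seen

    def register(self, po_number: str) -> None:
        # Call exactly once, after the order is accepted.
        if po_number in self._seen:
            raise ValueError(f"DUPLICATE_PO: {po_number}")
        self._seen.add(po_number)


registry = PORegistry()
print(registry.is_duplicate("PO-TEST"), registry.is_duplicate("PO-TEST"))  # False False
registry.register("PO-TEST")
print(registry.is_duplicate("PO-TEST"))  # True
```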

Step 2: Build the Complete Pipeline (pipeline.py)

What & Why: The pipeline file contains everything: the OrderPipelineState dataclass, the time-windowed CarrierCircuitBreaker, all four agent functions (Intake with Claude reasoning, Fulfillment, Exception Monitor, Communication with Claude drafting), HITL approval flows, and the orchestrator that wires them together. Consolidating into one file keeps the project simple — no import headaches, no package structure to manage.

Create pipeline.py using the complete solution code in the Complete Solution section below.

Run:

python pipeline.py

Then type demo at the prompt.

Expected output:

[INTAKE] Processing PO-2024-11234...
[INTAKE] Claude validated order and extracted key fields.
[INTAKE] Complete.
[FULFILLMENT] Planning shipments...
[FULFILLMENT] Complete. Split: True.
[MONITOR] Checking shipment status...
[MONITOR] Complete.
[COMMUNICATION] Drafting updates...
[COMMUNICATION] Claude drafted customer notification.
[PIPELINE] Complete! Order PO-2024-11234 processed.
Checkpoint

If you see [PIPELINE] Complete!, the orchestrator is working end-to-end. If the pipeline stops at [INTAKE] FAILED, the PO is missing a required field or was flagged as a duplicate (DUPLICATE_PO). If it stops at [INTAKE] Credit check failed, check that the sample PO uses customer name "Acme Manufacturing" (it must match the credit database). If you see AuthenticationError, verify your ANTHROPIC_API_KEY is set correctly.
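The pipeline records progress in `state.stage` (pipeline.py sets `"intake"`, `"fulfillment"`, and `"error"`). If you want to catch out-of-order handoffs, a small guard can enforce the stage sequence. This is a sketch under assumptions: the names `"monitoring"`, `"communication"`, and `"complete"` and the `StageTracker` class are hypothetical, not part of the reference solution.

```python
from dataclasses import dataclass, field

# "intake", "fulfillment" and "error" appear in pipeline.py; the later names are assumed.
STAGE_ORDER = ["intake", "fulfillment", "monitoring", "communication", "complete"]
TERMINAL = {"complete", "error"}


@dataclass
class StageTracker:
    stage: str = "intake"
    history: list = field(default_factory=list)

    def advance(self, to: str) -> None:
        """Move to the next stage in order, or to "error" from any live stage."""
        if self.stage in TERMINAL:
            raise ValueError(f"pipeline already ended in stage {self.stage!r}")
        if to != "error":
            if to not in STAGE_ORDER or STAGE_ORDER.index(to) != STAGE_ORDER.index(self.stage) + 1:
                raise ValueError(f"illegal transition {self.stage!r} -> {to!r}")
        self.history.append((self.stage, to))
        self.stage = to


tracker = StageTracker()
tracker.advance("fulfillment")
tracker.advance("monitoring")
print(tracker.stage)  # monitoring
```

Skipping a stage (for example, jumping from `"monitoring"` straight to `"complete"`) raises `ValueError`, which turns a silent handoff bug into a test failure.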

Step 3: Test HITL for Split Shipments & Credit Resolutions

What & Why: When the fulfillment plan requires a split shipment exceeding $50K, the pipeline pauses. Present the operations manager with three options: approve split, wait for consolidation, or partial ship with backorder cancellation. Additionally, when the Exception Monitor’s resolution involves issuing a credit or refund, the pipeline pauses for ops manager approval before proceeding.

The HITL functions (split_shipment_review and credit_resolution_review) are already included in pipeline.py. Test by modifying the sample PO to have a total value over $50K:

Run (to trigger HITL):

python -c "
from pipeline import run_order_pipeline
po = {'po_number': 'PO-HITL-TEST', 'customer': 'Acme Manufacturing', 'line_items': [
    {'sku': 'SKU-4892', 'qty': 60, 'unit_price': 495.00},
    {'sku': 'SKU-7721', 'qty': 200, 'unit_price': 125.00}]}
run_order_pipeline(po)
"

Expected behavior: This PO totals $54,700 (60 × $495 + 200 × $125), and SKU-7721 is stocked only in the West warehouse, so the plan requires a split shipment above the $50K threshold. The pipeline pauses and prompts for operations manager approval. Enter 1 to approve the split.

Checkpoint

If you see OPERATIONS MANAGER REVIEW REQUIRED, HITL is working. If the pipeline runs straight through without pausing, check the human_approval_required logic: it should be requires_split AND total > 50000.
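Because `split_shipment_review` reads from `input()` directly, automated tests must patch it. One option is to inject the prompt function instead. The sketch below is a hypothetical variant (`review_split`, its `ask` and `max_attempts` parameters are not part of pipeline.py); it also re-prompts on invalid input and, if no valid answer arrives, holds the order rather than approving it.

```python
from datetime import datetime
from typing import Callable

SPLIT_OPTIONS = {"1": "approve_split", "2": "wait_consolidation", "3": "partial_ship"}


def review_split(plan: dict, ask: Callable[[str], str] = input, max_attempts: int = 3) -> dict:
    """Hypothetical HITL review with an injectable prompt function."""
    for _ in range(max_attempts):
        choice = ask(f"Split total ${plan['total_value']:,.2f}. Decision (1-3): ").strip()
        if choice in SPLIT_OPTIONS:
            return {"decision": SPLIT_OPTIONS[choice], "reviewed_at": datetime.now().isoformat()}
    # No valid answer: hold the order instead of approving a high-value split by default.
    return {"decision": "wait_consolidation", "reviewed_at": datetime.now().isoformat()}


answers = iter(["x", "1"])  # a typo first, then a valid approval
print(review_split({"total_value": 54700}, ask=lambda _: next(answers))["decision"])  # approve_split
```

In a test you pass a lambda or iterator instead of patching the builtin, and the fail-safe default keeps a mistyped answer from approving a $50K+ split.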

Step 4: Create the Test Suite (test_pipeline.py)

What & Why: The test suite runs 5 scenarios covering happy path, edge cases, and adversarial inputs. The complete test code is in the Testing Guide section below.

Run:

pytest test_pipeline.py -v

Expected output:

test_pipeline.py::test_happy_path_standard_order PASSED
test_pipeline.py::test_credit_check_failure PASSED
test_pipeline.py::test_hitl_escalation_high_value PASSED
test_pipeline.py::test_circuit_breaker_trip PASSED
test_pipeline.py::test_invalid_input_missing_fields PASSED
========================= 5 passed =========================
Checkpoint

If all 5 tests pass, your pipeline handles the happy path, credit failure, HITL escalation, circuit breaker trip, and invalid input as specified. If a test fails, check the Testing Guide table for expected behavior and compare against your implementation.

Mock Tool Implementations

"""mock_tools.py — Tools for all 4 pipeline agents."""

from datetime import datetime, timedelta

# ═══ INTAKE AGENT ══════════════════════════════════════════════

CUSTOMER_CREDIT = {
    "Acme Manufacturing": {"limit": 100000, "outstanding": 35000, "terms": "Net 45"},
    "Beta Industrial": {"limit": 50000, "outstanding": 48000, "terms": "Net 30"},
}
PROCESSED_POS = set()

def validate_purchase_order(raw_po: dict) -> dict:
    required = ["po_number", "customer", "line_items"]
    missing = [f for f in required if f not in raw_po]
    if missing:
        return {"validated": False, "errors": missing}
    if raw_po["po_number"] in PROCESSED_POS:
        return {"validated": False, "errors": ["DUPLICATE_PO"]}
    PROCESSED_POS.add(raw_po["po_number"])
    return {"validated": True, "errors": [], "normalized_po": raw_po}

def verify_customer_credit(customer_id: str, order_value: float) -> dict:
    cust = CUSTOMER_CREDIT.get(customer_id)
    if not cust:
        return {"error": "CUSTOMER_NOT_FOUND"}
    available = cust["limit"] - cust["outstanding"]
    return {"credit_approved": available >= order_value,
            "credit_limit": cust["limit"], "available_credit": available,
            "payment_terms": cust["terms"]}

def enrich_line_items(line_items: list, customer_id: str) -> list:
    """Add contract pricing and availability to each line item."""
    enriched = []
    for item in line_items:
        enriched.append({**item,
            "contract_price_verified": True,
            "current_availability": item["sku"] != "SKU-7721",
            "lead_time": 14 if item["sku"] == "SKU-4892" else 7})
    return enriched

# ═══ FULFILLMENT AGENT ═════════════════════════════════════════

WAREHOUSE_INVENTORY = {
    ("SKU-4892", "East"): {"available": 120, "reserved": 0},
    ("SKU-4892", "West"): {"available": 30, "reserved": 0},
    ("SKU-7721", "East"): {"available": 0, "reserved": 0},
    ("SKU-7721", "West"): {"available": 250, "reserved": 0},
}

def check_inventory_allocation(sku: str, quantity: int, warehouse: str = None) -> dict:
    if warehouse:
        inv = WAREHOUSE_INVENTORY.get((sku, warehouse))
        if not inv:
            return {"error": "SKU_NOT_FOUND"}
        return {"sku": sku, "warehouse": warehouse,
                "available": inv["available"], "allocated": min(quantity, inv["available"]),
                "backorder_qty": max(0, quantity - inv["available"])}
    # Check all warehouses
    results = []
    for (s, w), inv in WAREHOUSE_INVENTORY.items():
        if s == sku:
            results.append({"warehouse": w, "available": inv["available"]})
    return {"sku": sku, "warehouses": results}

def create_shipment_plan(po_number: str, line_items: list,
                          shipping_priority: str = "standard") -> dict:
    shipments = []
    for item in line_items:
        qty = item.get("qty", item.get("quantity", 0))
        # Use the first warehouse that can fill the whole line. A line that no single
        # warehouse can fill is skipped: this mock does not create backorders.
        best_wh = None
        for (s, w), inv in WAREHOUSE_INVENTORY.items():
            if s == item["sku"] and inv["available"] >= qty:
                best_wh = w
                break
        if best_wh:
            shipments.append({"warehouse": best_wh, "skus": [item["sku"]],
                "value": qty * item.get("unit_price", 0),
                "estimated_ship_date": (datetime.now() + timedelta(days=2)).strftime("%Y-%m-%d"),
                "carrier": "UPS" if best_wh == "East" else "FedEx"})
    requires_split = len(set(s["warehouse"] for s in shipments)) > 1
    total = sum(s["value"] for s in shipments)
    return {"plan_id": f"FP-{po_number}", "shipments": shipments,
            "requires_split": requires_split, "total_value": total,
            "human_approval_required": requires_split and total > 50000}

def request_split_shipment_approval(po_number: str, plan_id: str,
                                     total_value: float, reason: str) -> dict:
    return {"approval_id": f"APR-{po_number}",
            "queue_position": 1, "estimated_response_time": "15 minutes"}

# ═══ EXCEPTION MONITOR ═════════════════════════════════════════

def poll_shipment_status(tracking_numbers: list) -> list:
    statuses = []
    for tn in tracking_numbers:
        statuses.append({"tracking_number": tn, "status": "in-transit",
                         "carrier": "UPS", "eta": "2024-03-12"})
    return statuses

def detect_exceptions(shipment_statuses: list, original_plan: dict) -> list:
    exceptions = []
    for s in shipment_statuses:
        if s.get("status") == "exception":
            exceptions.append({"type": "carrier_delay",
                "severity": "medium", "tracking": s["tracking_number"],
                "recommended_action": "Notify customer with revised ETA"})
    return exceptions

# ═══ COMMUNICATION AGENT ═══════════════════════════════════════

def draft_order_update(po_number: str, update_type: str, details: dict) -> dict:
    templates = {
        "confirmation": f"Your order {po_number} has been confirmed and is being processed.",
        "shipment": f"Your order {po_number} has shipped! Tracking: {details.get('tracking', 'N/A')}",
        "delay": f"We're writing to let you know about a delay on order {po_number}.",
        "delivery": f"Great news! Order {po_number} has been delivered.",
    }
    return {"draft_id": f"DRF-{po_number}-{update_type}",
            "subject": f"Order Update: {po_number}",
            "body": templates.get(update_type, "Order update."),
            "urgency": "high" if update_type == "delay" else "standard"}

def send_customer_update(draft_id: str, channel: str, customer_contact: str) -> dict:
    return {"message_id": f"MSG-{draft_id}",
            "sent_at": datetime.now().isoformat(),
            "delivery_status": "delivered"}
TypeScript equivalent (mock_tools.ts):

// mock_tools.ts — Tools for all 4 pipeline agents.

// ═══ INTAKE AGENT ══════════════════════════════════════════════

const CUSTOMER_CREDIT: Record<string, any> = {
  "Acme Manufacturing": { limit: 100000, outstanding: 35000, terms: "Net 45" },
  "Beta Industrial": { limit: 50000, outstanding: 48000, terms: "Net 30" },
};
const PROCESSED_POS = new Set<string>();

export function validatePurchaseOrder(rawPo: any): any {
  const required = ["po_number", "customer", "line_items"];
  const missing = required.filter(f => !rawPo[f]);
  if (missing.length) return { validated: false, errors: missing };
  if (PROCESSED_POS.has(rawPo.po_number)) {
    return { validated: false, errors: ["DUPLICATE_PO"] };
  }
  PROCESSED_POS.add(rawPo.po_number);
  return { validated: true, errors: [], normalized_po: rawPo };
}

export function verifyCustomerCredit(customerId: string, orderValue: number): any {
  const cust = CUSTOMER_CREDIT[customerId];
  if (!cust) return { error: "CUSTOMER_NOT_FOUND" };
  const available = cust.limit - cust.outstanding;
  return {
    credit_approved: available >= orderValue,
    credit_limit: cust.limit,
    available_credit: available,
    payment_terms: cust.terms,
  };
}

export function enrichLineItems(lineItems: any[], customerId: string): any[] {
  /** Add contract pricing and availability to each line item. */
  return lineItems.map(item => ({
    ...item,
    contract_price_verified: true,
    current_availability: item.sku !== "SKU-7721",
    lead_time: item.sku === "SKU-4892" ? 14 : 7,
  }));
}

// ═══ FULFILLMENT AGENT ═════════════════════════════════════════

const WAREHOUSE_INVENTORY: Record<string, { available: number; reserved: number }> = {
  "SKU-4892|East": { available: 120, reserved: 0 },
  "SKU-4892|West": { available: 30, reserved: 0 },
  "SKU-7721|East": { available: 0, reserved: 0 },
  "SKU-7721|West": { available: 250, reserved: 0 },
};

export function checkInventoryAllocation(
  sku: string, quantity: number, warehouse?: string
): any {
  if (warehouse) {
    const inv = WAREHOUSE_INVENTORY[`${sku}|${warehouse}`];
    if (!inv) return { error: "SKU_NOT_FOUND" };
    return {
      sku, warehouse,
      available: inv.available,
      allocated: Math.min(quantity, inv.available),
      backorder_qty: Math.max(0, quantity - inv.available),
    };
  }
  // Check all warehouses
  const results: any[] = [];
  for (const [key, inv] of Object.entries(WAREHOUSE_INVENTORY)) {
    const [s, w] = key.split("|");
    if (s === sku) results.push({ warehouse: w, available: inv.available });
  }
  return { sku, warehouses: results };
}

export function createShipmentPlan(
  poNumber: string, lineItems: any[], shippingPriority = "standard"
): any {
  const shipments: any[] = [];
  for (const item of lineItems) {
    let bestWh: string | null = null;
    const qty = item.qty ?? item.quantity ?? 0;
    for (const [key, inv] of Object.entries(WAREHOUSE_INVENTORY)) {
      const [s, w] = key.split("|");
      if (s === item.sku && inv.available >= qty) { bestWh = w; break; }
    }
    if (bestWh) {
      const shipDate = new Date();
      shipDate.setDate(shipDate.getDate() + 2);
      shipments.push({
        warehouse: bestWh, skus: [item.sku],
        value: qty * (item.unit_price ?? 0),
        estimated_ship_date: shipDate.toISOString().slice(0, 10),
        carrier: bestWh === "East" ? "UPS" : "FedEx",
      });
    }
  }
  const warehouses = new Set(shipments.map(s => s.warehouse));
  const requiresSplit = warehouses.size > 1;
  const total = shipments.reduce((s, sh) => s + sh.value, 0);
  return {
    plan_id: `FP-${poNumber}`, shipments, requires_split: requiresSplit,
    total_value: total, human_approval_required: requiresSplit && total > 50000,
  };
}

export function requestSplitShipmentApproval(
  poNumber: string, planId: string, totalValue: number, reason: string
): any {
  return {
    approval_id: `APR-${poNumber}`,
    queue_position: 1,
    estimated_response_time: "15 minutes",
  };
}

// ═══ EXCEPTION MONITOR ═════════════════════════════════════════

export function pollShipmentStatus(trackingNumbers: string[]): any[] {
  return trackingNumbers.map(tn => ({
    tracking_number: tn, status: "in-transit",
    carrier: "UPS", eta: "2024-03-12",
  }));
}

export function detectExceptions(shipmentStatuses: any[], originalPlan: any): any[] {
  const exceptions: any[] = [];
  for (const s of shipmentStatuses) {
    if (s.status === "exception") {
      exceptions.push({
        type: "carrier_delay", severity: "medium",
        tracking: s.tracking_number,
        recommended_action: "Notify customer with revised ETA",
      });
    }
  }
  return exceptions;
}

// ═══ COMMUNICATION AGENT ═══════════════════════════════════════

export function draftOrderUpdate(
  poNumber: string, updateType: string, details: any
): any {
  const templates: Record<string, string> = {
    confirmation: `Your order ${poNumber} has been confirmed and is being processed.`,
    shipment: `Your order ${poNumber} has shipped! Tracking: ${details?.tracking ?? "N/A"}`,
    delay: `We're writing to let you know about a delay on order ${poNumber}.`,
    delivery: `Great news! Order ${poNumber} has been delivered.`,
  };
  return {
    draft_id: `DRF-${poNumber}-${updateType}`,
    subject: `Order Update: ${poNumber}`,
    body: templates[updateType] ?? "Order update.",
    urgency: updateType === "delay" ? "high" : "standard",
  };
}

export function sendCustomerUpdate(
  draftId: string, channel: string, customerContact: string
): any {
  return {
    message_id: `MSG-${draftId}`,
    sent_at: new Date().toISOString(),
    delivery_status: "delivered",
  };
}

Complete Solution

The orchestrator runs agents sequentially, checking the circuit breaker and HITL conditions at each transition. The time-windowed circuit breaker is the key architectural difference from Capstone 3-B’s simple counter.
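To see why the time window matters, compare it with a plain failure counter. The sketch below applies the same rule as `CarrierCircuitBreaker` (trip at 5 failures inside 10 minutes, pruning with `f > cutoff`), except that `record_failure` takes the failure time as an argument. That parameter is a hypothetical change made so the window can be exercised without waiting; `WindowedBreaker` is an illustrative name.

```python
from datetime import datetime, timedelta


class WindowedBreaker:
    """Same rule as CarrierCircuitBreaker, but the failure time is passed in."""

    def __init__(self, threshold: int = 5, window_minutes: int = 10):
        self.threshold = threshold
        self.window = timedelta(minutes=window_minutes)
        self.failures: list = []
        self.status = "healthy"

    def record_failure(self, at: datetime) -> None:
        self.failures.append(at)
        # Keep only failures newer than (at - window), mirroring pipeline.py's pruning.
        cutoff = at - self.window
        self.failures = [f for f in self.failures if f > cutoff]
        if len(self.failures) >= self.threshold:
            self.status = "tripped"


t0 = datetime(2024, 3, 10, 8, 0)

# Five failures 3 minutes apart span 12 minutes: the 8:00 failure has aged out by
# the fifth one, so the breaker stays healthy. A plain counter would have tripped.
spread = WindowedBreaker()
for i in range(5):
    spread.record_failure(t0 + timedelta(minutes=3 * i))
print(spread.status, len(spread.failures))  # healthy 4

# Five failures 1 minute apart all fall inside the window and trip it.
burst = WindowedBreaker()
for i in range(5):
    burst.record_failure(t0 + timedelta(minutes=i))
print(burst.status)  # tripped
```

Sporadic carrier errors spread over a long period therefore never force fallback mode; only a burst does.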
"""pipeline.py — B2B Order Lifecycle Pipeline (Capstone 4-B)

Usage: export ANTHROPIC_API_KEY=... && python pipeline.py

Agents 1 (Intake) and 4 (Communication) use Claude API calls with tool use.
Agents 2 (Fulfillment) and 3 (Exception Monitor) are deterministic — they
don't need LLM reasoning because inventory allocation and status polling
are rule-based operations.
"""

import json, anthropic
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict

# ── Time-Windowed Circuit Breaker ──────────────────────────────
@dataclass
class CarrierCircuitBreaker:
    """Trips after 5 failures within a 10-minute window."""
    failures: list = field(default_factory=list)  # timestamps
    threshold: int = 5
    window_minutes: int = 10
    status: str = "healthy"  # healthy | tripped | fallback

    def record_failure(self):
        now = datetime.now()
        self.failures.append(now)
        # Prune old failures outside the window
        cutoff = now - timedelta(minutes=self.window_minutes)
        self.failures = [f for f in self.failures if f > cutoff]
        if len(self.failures) >= self.threshold:
            self.status = "tripped"
            print(f"[CIRCUIT BREAKER] TRIPPED! {len(self.failures)} failures in {self.window_minutes} min.")
            print("[CIRCUIT BREAKER] Switching to fallback polling mode.")

    def record_success(self):
        # Don't clear — let the window expire naturally
        pass

    def is_tripped(self) -> bool:
        return self.status == "tripped"

    def reset(self):
        self.failures.clear()
        self.status = "healthy"
        print("[CIRCUIT BREAKER] Reset to healthy.")


# ── Pipeline State ─────────────────────────────────────────────
@dataclass
class OrderPipelineState:
    po_number: str
    stage: str = "intake"
    raw_po: dict = field(default_factory=dict)
    intake_output: dict = field(default_factory=dict)
    fulfillment_output: dict = field(default_factory=dict)
    exception_output: dict = field(default_factory=dict)
    communication_output: dict = field(default_factory=dict)


# ── HITL for Split Shipments ───────────────────────────────────
def split_shipment_review(state: OrderPipelineState) -> dict:
    plan = state.fulfillment_output
    print("\n" + "=" * 50)
    print("  OPERATIONS MANAGER REVIEW REQUIRED")
    print("=" * 50)
    print(f"  PO: {state.po_number}")
    print(f"  Total Value: ${plan.get('total_value', 0):,.2f}")
    print(f"  Split: {len(plan.get('shipments', []))} shipments")
    for i, s in enumerate(plan.get("shipments", []), 1):
        print(f"    Ship {i}: {s['warehouse']} — SKUs: {s['skus']} — ${s['value']:,.2f}")
    print(f"\n  [1] Approve split  [2] Wait for consolidation  [3] Partial ship")
    choice = input("  Decision (1-3): ").strip()
    options = {"1": "approve_split", "2": "wait_consolidation", "3": "partial_ship"}
    # Unrecognized input must not approve a high-value split; hold the order instead.
    return {"decision": options.get(choice, "wait_consolidation"),
            "reviewer": "ops_manager_01",
            "reviewed_at": datetime.now().isoformat()}


# ── HITL for Credit/Refund Resolutions ────────────────────────
def credit_resolution_review(state: OrderPipelineState, exception: dict) -> dict:
    """Requires ops manager approval when exception resolution involves credit/refund."""
    print("\n" + "=" * 50)
    print("  OPERATIONS MANAGER REVIEW REQUIRED — CREDIT/REFUND")
    print("=" * 50)
    print(f"  PO: {state.po_number}")
    print(f"  Exception Type: {exception.get('type', 'unknown')}")
    print(f"  Severity: {exception.get('severity', 'unknown')}")
    print(f"  Recommended Action: {exception.get('recommended_action', 'N/A')}")
    print(f"\n  [1] Approve credit/refund  [2] Deny  [3] Escalate to finance")
    choice = input("  Decision (1-3): ").strip()
    options = {"1": "approve", "2": "deny", "3": "escalate_finance"}
    # Unrecognized input must not issue a credit; route it to finance instead.
    return {"decision": options.get(choice, "escalate_finance"),
            "reviewer": "ops_manager_01",
            "reviewed_at": datetime.now().isoformat(),
            "resolution_type": "credit_refund"}


# ── Main Pipeline ──────────────────────────────────────────────
def run_order_pipeline(raw_po: dict) -> OrderPipelineState:
    from mock_tools import (validate_purchase_order, verify_customer_credit,
        enrich_line_items, create_shipment_plan, poll_shipment_status,
        detect_exceptions, draft_order_update, send_customer_update)

    client = anthropic.Anthropic()
    state = OrderPipelineState(po_number=raw_po["po_number"], raw_po=raw_po)
    cb = CarrierCircuitBreaker()

    # ── Agent 1: Intake (Claude-powered) ──────────────────────
    print(f"\n[INTAKE] Processing {state.po_number}...")
    intake_tools = [
        {"name": "validate_order", "description": "Validate PO format and required fields",
         "input_schema": {"type": "object", "properties": {"po_number": {"type": "string"}}, "required": ["po_number"]}},
        {"name": "check_credit", "description": "Check customer credit availability",
         "input_schema": {"type": "object", "properties": {"customer_id": {"type": "string"}, "order_total": {"type": "number"}}, "required": ["customer_id", "order_total"]}}
    ]
    intake_msgs = [{"role": "user", "content": f"Validate this order and check credit: {json.dumps(raw_po, default=str)}"}]

    INTAKE_SYSTEM = ("You are an order intake agent. Validate the order format "
                     "and check customer credit. Use the available tools.")
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system=INTAKE_SYSTEM,
        tools=intake_tools, messages=intake_msgs
    )

    # validate_purchase_order records the PO number as processed, so it must run
    # only once per order: cache its result and reuse it both for Claude's tool
    # call and for the deterministic gate below (a second call returns DUPLICATE_PO).
    validation = None

    # Process tool calls in an agentic loop
    while response.stop_reason == "tool_use":
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "validate_order":
                    if validation is None:
                        validation = validate_purchase_order(raw_po)
                    result = validation
                elif block.name == "check_credit":
                    result = verify_customer_credit(block.input["customer_id"], block.input["order_total"])
                else:
                    result = {"error": f"Unknown tool: {block.name}"}
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result, default=str)})
        intake_msgs.append({"role": "assistant", "content": response.content})
        intake_msgs.append({"role": "user", "content": tool_results})
        response = client.messages.create(
            model="claude-sonnet-4-6", max_tokens=1024,
            system=INTAKE_SYSTEM,
            tools=intake_tools, messages=intake_msgs
        )

    # Extract Claude's text summary
    intake_text = ""
    for block in response.content:
        if hasattr(block, "text"):
            intake_text = block.text
            break

    # Gate on the deterministic results, not on Claude's prose: reuse the cached
    # validation if Claude already called the tool, otherwise run it now.
    if validation is None:
        validation = validate_purchase_order(raw_po)
    if not validation.get("validated"):
        print(f"[INTAKE] FAILED: {validation.get('errors')}")
        state.stage = "error"
        return state
    order_total = sum(li.get("qty", 0) * li.get("unit_price", 0) for li in raw_po.get("line_items", []))
    credit = verify_customer_credit(raw_po.get("customer", ""), order_total)
    if not credit.get("credit_approved"):
        print(f"[INTAKE] Credit check failed. Available: ${credit.get('available_credit', 0):,.2f}")
        state.stage = "error"
        return state
    enriched = enrich_line_items(raw_po["line_items"], raw_po["customer"])
    state.intake_output = {"validated": True, "credit_passed": True,
        "line_items": enriched, "total_value": order_total,
        "claude_summary": intake_text}
    state.stage = "fulfillment"
    print("[INTAKE] Claude validated order and extracted key fields.")
    print("[INTAKE] Complete.")

    # ── Agent 2: Fulfillment ───────────────────────────────────
    print("[FULFILLMENT] Planning shipments...")
    plan = create_shipment_plan(state.po_number, raw_po["line_items"])
    state.fulfillment_output = plan

    if plan.get("human_approval_required"):
        override = split_shipment_review(state)
        state.fulfillment_output["manager_override"] = override
        if override["decision"] == "wait_consolidation":
            print("[FULFILLMENT] Waiting for consolidation. Pipeline paused.")
            state.stage = "awaiting_consolidation"
            return state
    state.stage = "monitoring"
    print(f"[FULFILLMENT] Complete. Split: {plan.get('requires_split')}.")

    # ── Agent 3: Exception Monitor ─────────────────────────────
    print("[MONITOR] Checking shipment status...")
    tracking_numbers = [s.get("tracking", "TRK-MOCK") for s in plan.get("shipments", [])]
    statuses = None
    if cb.is_tripped():
        state.exception_output = {"fallback_mode": True,
            "message": "Circuit breaker tripped. Using fallback polling."}
    else:
        # Only the carrier API call belongs in the try block: an error in
        # detect_exceptions or the HITL prompt is not a carrier failure.
        try:
            statuses = poll_shipment_status(tracking_numbers)
            cb.record_success()
        except Exception:
            cb.record_failure()
            state.exception_output = {"fallback_mode": True}

    if statuses is not None:
        exceptions = detect_exceptions(statuses, plan)
        state.exception_output = {"statuses": statuses, "exceptions": exceptions}

        # HITL: credit/refund resolutions require ops manager approval
        for exc in exceptions:
            resolution_type = exc.get("recommended_action", "").lower()
            requires_approval = ("credit" in resolution_type
                                 or "refund" in resolution_type)
            if requires_approval:
                print("[MONITOR] Credit/refund resolution detected. Requesting ops manager approval.")
                approval = credit_resolution_review(state, exc)
                exc["manager_override"] = approval
                if approval["decision"] == "deny":
                    exc["resolution_blocked"] = True
                    print("[MONITOR] Credit resolution denied by ops manager.")
    state.stage = "communication"
    print("[MONITOR] Complete.")

    # ── Agent 4: Communication (Claude-powered) ────────────────
    print("[COMMUNICATION] Drafting updates...")
    update_type = "shipment" if not state.exception_output.get("exceptions") else "delay"
    comms_tools = [
        {"name": "draft_update", "description": "Draft an order status email to the customer",
         "input_schema": {"type": "object", "properties": {"po_number": {"type": "string"}, "update_type": {"type": "string", "enum": ["confirmation", "shipment", "delay", "delivery"]}, "details": {"type": "object"}}, "required": ["po_number", "update_type"]}},
        {"name": "send_update", "description": "Send the drafted update to the customer",
         "input_schema": {"type": "object", "properties": {"draft_id": {"type": "string"}, "channel": {"type": "string"}, "customer_contact": {"type": "string"}}, "required": ["draft_id", "channel", "customer_contact"]}}
    ]
    comms_context = {"po_number": state.po_number, "update_type": update_type,
        "tracking": tracking_numbers[0] if tracking_numbers else "N/A",
        "customer": raw_po["customer"], "shipments": plan.get("shipments", [])}
    comms_msgs = [{"role": "user", "content": f"Draft and send a {update_type} notification for this order: {json.dumps(comms_context, default=str)}"}]

    comms_system = ("You are a customer communication agent for B2B orders. Draft professional, "
                    "clear status updates. Use draft_update to create the message, then "
                    "send_update to deliver it.")
    comms_response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system=comms_system, tools=comms_tools, messages=comms_msgs
    )

    # Agentic loop (capped at 5 round trips); track what Claude actually drafted and sent
    draft_result = send_result = sent_channel = None
    for _turn in range(5):
        if comms_response.stop_reason != "tool_use":
            break
        tool_results = []
        for block in comms_response.content:
            if block.type == "tool_use":
                if block.name == "draft_update":
                    result = draft_order_update(block.input["po_number"], block.input["update_type"], block.input.get("details", {}))
                    draft_result = result
                elif block.name == "send_update":
                    result = send_customer_update(block.input["draft_id"], block.input["channel"], block.input["customer_contact"])
                    send_result, sent_channel = result, block.input["channel"]
                else:
                    result = {"error": f"Unknown tool: {block.name}"}
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result, default=str)})
        comms_msgs.append({"role": "assistant", "content": comms_response.content})
        comms_msgs.append({"role": "user", "content": tool_results})
        comms_response = client.messages.create(
            model="claude-sonnet-4-6", max_tokens=1024,
            system=comms_system, tools=comms_tools, messages=comms_msgs
        )

    # Fallbacks: draft and/or send deterministically if Claude skipped either step
    if draft_result is None:
        draft_result = draft_order_update(state.po_number, update_type,
            {"tracking": tracking_numbers[0] if tracking_numbers else "N/A"})
    if send_result is None:
        sent_channel = "email"
        send_customer_update(draft_result["draft_id"], sent_channel, f"{raw_po['customer']}@example.com")

    state.communication_output = {"draft_id": draft_result["draft_id"],
        "sent_via": sent_channel, "sent_at": datetime.now().isoformat()}
    state.stage = "complete"
    print("[COMMUNICATION] Claude drafted customer notification.")
    print(f"[PIPELINE] Complete! Order {state.po_number} processed.")
    return state


def main():
    print("=" * 60)
    print("  B2B Order Lifecycle Pipeline — Capstone 4-B")
    print("  Type 'demo' for sample PO, 'quit' to exit.")
    print("=" * 60)
    sample = {"po_number": "PO-2024-11234", "customer": "Acme Manufacturing",
        "line_items": [
            {"sku": "SKU-4892", "qty": 50, "unit_price": 495.00},
            {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00}]}
    while True:
        cmd = input("\nCommand: ").strip()
        if cmd.lower() in ("quit", "exit"): break
        if cmd.lower() == "demo":
            state = run_order_pipeline(sample)
            print(f"\nFinal state:\n{json.dumps(asdict(state), indent=2, default=str)}")

if __name__ == "__main__":
    main()
// pipeline.ts — B2B Order Lifecycle Pipeline (Capstone 4-B)
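// Usage: ANTHROPIC_API_KEY=... npx ts-node pipeline.ts
// Note: unlike pipeline.py, this port calls the mock tools directly for Agent 1
// (Intake) and Agent 4 (Communication); it has no Claude tool-use loop.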

import Anthropic from "@anthropic-ai/sdk";
import * as readline from "readline";
import {
  validatePurchaseOrder,
  verifyCustomerCredit,
  enrichLineItems,
  createShipmentPlan,
  pollShipmentStatus,
  detectExceptions,
  draftOrderUpdate,
  sendCustomerUpdate,
} from "./mock_tools";

// ── Types ────────────────────────────────────────────────────────

interface PipelineState {
  po_number: string;
  stage: string;
  raw_po: Record<string, any>;
  intake_output: Record<string, any>;
  fulfillment_output: Record<string, any>;
  exception_output: Record<string, any>;
  communication_output: Record<string, any>;
}

interface CircuitBreakerState {
  failures: number[];          // timestamps in ms
  threshold: number;
  windowMs: number;            // 10 min = 600_000 ms
  status: "healthy" | "tripped" | "fallback";
}

// ── Time-Windowed Circuit Breaker ────────────────────────────────

function createCircuitBreaker(): CircuitBreakerState {
  return { failures: [], threshold: 5, windowMs: 10 * 60 * 1000, status: "healthy" };
}

function recordFailure(cb: CircuitBreakerState): void {
  const now = Date.now();
  cb.failures.push(now);
  // Prune old failures outside the window
  const cutoff = now - cb.windowMs;
  cb.failures = cb.failures.filter(f => f > cutoff);
  if (cb.failures.length >= cb.threshold) {
    cb.status = "tripped";
    console.log(
      `[CIRCUIT BREAKER] TRIPPED! ${cb.failures.length} failures in ${cb.windowMs / 60000} min.`
    );
    console.log("[CIRCUIT BREAKER] Switching to fallback polling mode.");
  }
}

function recordSuccess(_cb: CircuitBreakerState): void {
  // Don't clear — let the window expire naturally
}

function resetCircuitBreaker(cb: CircuitBreakerState): void {
  cb.failures = [];
  cb.status = "healthy";
  console.log("[CIRCUIT BREAKER] Reset to healthy.");
}

// ── HITL Helper (readline-based) ─────────────────────────────────

function askUser(prompt: string): Promise<string> {
  const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
  return new Promise(resolve => {
    rl.question(prompt, (answer: string) => { rl.close(); resolve(answer.trim()); });
  });
}

// ── HITL for Split Shipments ─────────────────────────────────────

async function splitShipmentReview(
  state: PipelineState
): Promise<Record<string, any>> {
  const plan = state.fulfillment_output;
  console.log("\n" + "=".repeat(50));
  console.log("  OPERATIONS MANAGER REVIEW REQUIRED");
  console.log("=".repeat(50));
  console.log(`  PO: ${state.po_number}`);
  console.log(`  Total Value: $${(plan.total_value ?? 0).toLocaleString("en-US", { minimumFractionDigits: 2 })}`);
  console.log(`  Split: ${(plan.shipments ?? []).length} shipments`);
  (plan.shipments ?? []).forEach((s: any, i: number) => {
    console.log(`    Ship ${i + 1}: ${s.warehouse} — SKUs: ${JSON.stringify(s.skus)} — $${s.value.toLocaleString("en-US", { minimumFractionDigits: 2 })}`);
  });
  console.log("\n  [1] Approve split  [2] Wait for consolidation  [3] Partial ship");
  const choice = await askUser("  Decision (1-3): ");
  const options: Record<string, string> = {
    "1": "approve_split", "2": "wait_consolidation", "3": "partial_ship",
  };
  return {
    decision: options[choice] ?? "approve_split",
    reviewer: "ops_manager_01",
    reviewed_at: new Date().toISOString(),
  };
}

// ── HITL for Credit/Refund Resolutions ───────────────────────────

async function creditResolutionReview(
  state: PipelineState, exception: Record<string, any>
): Promise<Record<string, any>> {
  console.log("\n" + "=".repeat(50));
  console.log("  OPERATIONS MANAGER REVIEW REQUIRED — CREDIT/REFUND");
  console.log("=".repeat(50));
  console.log(`  PO: ${state.po_number}`);
  console.log(`  Exception Type: ${exception.type ?? "unknown"}`);
  console.log(`  Severity: ${exception.severity ?? "unknown"}`);
  console.log(`  Recommended Action: ${exception.recommended_action ?? "N/A"}`);
  console.log("\n  [1] Approve credit/refund  [2] Deny  [3] Escalate to finance");
  const choice = await askUser("  Decision (1-3): ");
  const options: Record<string, string> = {
    "1": "approve", "2": "deny", "3": "escalate_finance",
  };
  return {
    decision: options[choice] ?? "approve",
    reviewer: "ops_manager_01",
    reviewed_at: new Date().toISOString(),
    resolution_type: "credit_refund",
  };
}

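// ── Main Pipeline ────────────────────────────────────────────────

// One carrier breaker shared by every run. A breaker created per order sees at
// most one poll per run, so it could never reach its threshold of 5.
const carrierBreaker = createCircuitBreaker();

async function runOrderPipeline(
  rawPo: Record<string, any>,
  cb: CircuitBreakerState = carrierBreaker
): Promise<PipelineState> {
  const state: PipelineState = {
    po_number: rawPo.po_number,
    stage: "intake",
    raw_po: rawPo,
    intake_output: {},
    fulfillment_output: {},
    exception_output: {},
    communication_output: {},
  };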

  // ── Agent 1: Intake ────────────────────────────────────────
  console.log(`\n[INTAKE] Processing ${state.po_number}...`);
  const validation = validatePurchaseOrder(rawPo);
  if (!validation.validated) {
    console.log(`[INTAKE] FAILED: ${JSON.stringify(validation.errors)}`);
    state.stage = "error";
    return state;
  }
  const orderValue = rawPo.line_items.reduce(
    (sum: number, li: any) => sum + (li.qty ?? 0) * (li.unit_price ?? 0), 0
  );
  const credit = verifyCustomerCredit(rawPo.customer, orderValue);
  if (!credit.credit_approved) {
    console.log(
      `[INTAKE] Credit check failed. Available: $${(credit.available_credit ?? 0).toLocaleString("en-US", { minimumFractionDigits: 2 })}`
    );
    state.stage = "error";
    return state;
  }
  const enriched = enrichLineItems(rawPo.line_items, rawPo.customer);
  state.intake_output = {
    validated: true, credit_passed: true,
    line_items: enriched, total_value: orderValue,
  };
  state.stage = "fulfillment";
  console.log("[INTAKE] Complete.");

  // ── Agent 2: Fulfillment ───────────────────────────────────
  console.log("[FULFILLMENT] Planning shipments...");
  const plan = createShipmentPlan(state.po_number, rawPo.line_items);
  state.fulfillment_output = plan;

  if (plan.human_approval_required) {
    const override = await splitShipmentReview(state);
    state.fulfillment_output.manager_override = override;
    if (override.decision === "wait_consolidation") {
      console.log("[FULFILLMENT] Waiting for consolidation. Pipeline paused.");
      state.stage = "awaiting_consolidation";
      return state;
    }
  }
  state.stage = "monitoring";
  console.log(`[FULFILLMENT] Complete. Split: ${plan.requires_split}.`);

  // ── Agent 3: Exception Monitor ─────────────────────────────
  console.log("[MONITOR] Checking shipment status...");
  const trackingNumbers = (plan.shipments ?? []).map(
    (s: any) => s.tracking ?? "TRK-MOCK"
  );

  let statuses: ReturnType<typeof pollShipmentStatus> | null = null;
  if (cb.status === "tripped") {
    state.exception_output = {
      fallback_mode: true,
      message: "Circuit breaker tripped. Using fallback polling.",
    };
  } else {
    // Only the carrier API call belongs in the try block: an error in
    // detectExceptions or the HITL prompt is not a carrier failure.
    try {
      statuses = pollShipmentStatus(trackingNumbers);
      recordSuccess(cb);
    } catch {
      recordFailure(cb);
      state.exception_output = { fallback_mode: true };
    }
  }

  if (statuses !== null) {
    const exceptions = detectExceptions(statuses, plan);
    state.exception_output = { statuses, exceptions };

    // HITL: credit/refund resolutions require ops manager approval
    for (const exc of exceptions) {
      const resolutionType = (exc.recommended_action ?? "").toLowerCase();
      const requiresApproval =
        resolutionType.includes("credit") || resolutionType.includes("refund");
      if (requiresApproval) {
        console.log("[MONITOR] Credit/refund resolution detected. Requesting ops manager approval.");
        const approval = await creditResolutionReview(state, exc);
        exc.manager_override = approval;
        if (approval.decision === "deny") {
          exc.resolution_blocked = true;
          console.log("[MONITOR] Credit resolution denied by ops manager.");
        }
      }
    }
  }
  state.stage = "communication";
  console.log("[MONITOR] Complete.");

  // ── Agent 4: Communication ─────────────────────────────────
  console.log("[COMMUNICATION] Drafting updates...");
  // exception_output is already Record<string, any>, so no cast is needed
  const updateType = state.exception_output.exceptions?.length ? "delay" : "shipment";
  const draft = draftOrderUpdate(state.po_number, updateType, {
    tracking: trackingNumbers[0] ?? "N/A",
  });
  const notif = sendCustomerUpdate(
    draft.draft_id, "email", `${rawPo.customer}@example.com`
  );
  state.communication_output = {
    draft_id: draft.draft_id,
    sent_via: "email",
    sent_at: notif.sent_at,
  };
  state.stage = "complete";
  console.log(`[PIPELINE] Complete! Order ${state.po_number} processed.`);
  return state;
}

// ── Entry Point ──────────────────────────────────────────────────

async function main(): Promise<void> {
  console.log("=".repeat(60));
  console.log("  B2B Order Lifecycle Pipeline — Capstone 4-B");
  console.log("  Type 'demo' for sample PO, 'quit' to exit.");
  console.log("=".repeat(60));

  const sample = {
    po_number: "PO-2024-11234",
    customer: "Acme Manufacturing",
    line_items: [
      { sku: "SKU-4892", qty: 50, unit_price: 495.0 },
      { sku: "SKU-7721", qty: 200, unit_price: 125.0 },
    ],
  };

  while (true) {
    const cmd = await askUser("\nCommand: ");
    if (cmd.toLowerCase() === "quit" || cmd.toLowerCase() === "exit") break;
    if (cmd.toLowerCase() === "demo") {
      const state = await runOrderPipeline(sample);
      console.log(`\nFinal state:\n${JSON.stringify(state, null, 2)}`);
    }
  }
}

main().catch(console.error);
🎯 What Just Happened?

You built a 4-agent pipeline with Claude API integration and production safety patterns: (1) Agent 1 (Intake) uses Claude with tool use to validate orders and check credit through an agentic loop, while the pass/fail gate comes from the tool functions themselves rather than Claude's prose; (2) Agent 4 (Communication) uses Claude with tool use to draft and send professional customer notifications; (3) Agents 2 and 3 are deterministic, because inventory allocation and status polling don't need LLM reasoning; (4) HITL checkpoints stop for an operations manager's decision on split shipments over $50K and on credit/refund resolutions; (5) a time-windowed circuit breaker counts only failures within a 10-minute sliding window, so infrequent transient errors don't trip it. The Claude calls in (1) and (2) live in pipeline.py; the TypeScript port calls the mock tools directly for those two agents.
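Agents 1 and 4 repeat the same tool-use loop, and each copy has to stay in sync (turn cap, result formatting, message bookkeeping). A minimal sketch of factoring that loop into one helper follows. It is an illustration, not part of the capstone code: `run_tool_loop` and `scripted_create` are hypothetical names, and `create` is assumed to accept the same keyword arguments as `client.messages.create`, so a live call would look like `run_tool_loop(client.messages.create, dispatch, intake_msgs, model="claude-sonnet-4-6", max_tokens=1024, system=..., tools=intake_tools)`.

```python
import json
from typing import Any, Callable


def run_tool_loop(create: Callable[..., Any],
                  dispatch: dict[str, Callable[[dict], Any]],
                  messages: list[dict],
                  max_turns: int = 5,
                  **create_kwargs: Any) -> tuple[Any, list[tuple[str, Any]]]:
    """Call the model until it stops requesting tools or max_turns round trips pass.

    Assumes `create` behaves like client.messages.create and that each handler in
    `dispatch` takes a tool_use block's input dict. Appends to `messages` in place
    and returns (final_response, [(tool_name, tool_output), ...]).
    """
    response = create(messages=messages, **create_kwargs)
    calls: list[tuple[str, Any]] = []
    for _ in range(max_turns):
        if response.stop_reason != "tool_use":
            break
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            handler = dispatch.get(block.name)
            output = handler(block.input) if handler else {"error": f"Unknown tool: {block.name}"}
            calls.append((block.name, output))
            results.append({"type": "tool_result", "tool_use_id": block.id,
                            "content": json.dumps(output, default=str)})
        # Echo the assistant turn, then answer every tool_use with a tool_result
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": results})
        response = create(messages=messages, **create_kwargs)
    return response, calls


def scripted_create(responses: list[Any]) -> Callable[..., Any]:
    """Offline stand-in for client.messages.create that replays canned responses in order."""
    replay = iter(responses)
    return lambda **_kwargs: next(replay)
```

Because the cap lives in one place, a model that keeps requesting tools stops after `max_turns` round trips, and the caller can check `response.stop_reason` to see whether the loop ended naturally or hit the cap.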

Expected Output — Standard Flow
[INTAKE] Processing PO-2024-11234...
[INTAKE] Claude validated order and extracted key fields.
[INTAKE] Complete.
[FULFILLMENT] Planning shipments...
[FULFILLMENT] Complete. Split: True.
[MONITOR] Checking shipment status...
[MONITOR] Complete.
[COMMUNICATION] Drafting updates...
[COMMUNICATION] Claude drafted customer notification.
[PIPELINE] Complete! Order PO-2024-11234 processed.

Final state:
  po_number: PO-2024-11234
  stage: complete
  fulfillment_output.requires_split: true
  circuit_breaker.status: healthy

Testing Guide

test_pipeline.py — Complete Test Suite

Create test_pipeline.py with these 5 pytest test cases. They use unittest.mock.patch to stub Claude API calls so tests run without an API key:

"""test_pipeline.py — 5 test scenarios for B2B Order Lifecycle Pipeline."""

import pytest
from unittest.mock import patch, MagicMock
from pipeline import (
    run_order_pipeline, CarrierCircuitBreaker, OrderPipelineState
)
from mock_tools import PROCESSED_POS


def _stub_claude(*args, **kwargs):
    """Return a mock Claude response that skips tool use."""
    mock_resp = MagicMock()
    mock_resp.stop_reason = "end_turn"
    text_block = MagicMock()
    text_block.type = "text"
    text_block.text = "Order validated successfully."
    mock_resp.content = [text_block]
    return mock_resp


@pytest.fixture(autouse=True)
def reset_processed_pos():
    """Clear duplicate-detection set between tests."""
    PROCESSED_POS.clear()
    yield
    PROCESSED_POS.clear()


# ── Test 1: Happy path ──────────────────────────────────────
@patch("pipeline.anthropic.Anthropic")
def test_happy_path_standard_order(mock_anthropic_cls):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client

    po = {"po_number": "PO-TEST-001", "customer": "Acme Manufacturing",
          "line_items": [{"sku": "SKU-4892", "qty": 10, "unit_price": 100.00}]}
    state = run_order_pipeline(po)

    assert state.stage == "complete"
    assert state.intake_output["validated"] is True
    assert state.communication_output["sent_via"] == "email"


# ── Test 2: Credit check failure ─────────────────────────────
@patch("pipeline.anthropic.Anthropic")
def test_credit_check_failure(mock_anthropic_cls):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client

    # Beta Industrial has only $2K available credit
    po = {"po_number": "PO-TEST-002", "customer": "Beta Industrial",
          "line_items": [{"sku": "SKU-4892", "qty": 50, "unit_price": 495.00}]}
    state = run_order_pipeline(po)

    assert state.stage == "error", "Pipeline should stop when credit is insufficient"


# ── Test 3: HITL escalation (high-value split >$50K) ─────────
@patch("builtins.input", return_value="1")  # auto-approve
@patch("pipeline.anthropic.Anthropic")
def test_hitl_escalation_high_value(mock_anthropic_cls, mock_input):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client

    po = {"po_number": "PO-TEST-003", "customer": "Acme Manufacturing",
          "line_items": [
              {"sku": "SKU-4892", "qty": 60, "unit_price": 495.00},
              {"sku": "SKU-7721", "qty": 200, "unit_price": 125.00}]}
    state = run_order_pipeline(po)

    assert state.stage == "complete"
    assert state.fulfillment_output.get("human_approval_required") is True
    assert "manager_override" in state.fulfillment_output


# ── Test 4: Circuit breaker trip ─────────────────────────────
def test_circuit_breaker_trip():
    cb = CarrierCircuitBreaker()
    assert cb.status == "healthy"

    for _ in range(5):
        cb.record_failure()

    assert cb.status == "tripped"
    assert cb.is_tripped() is True


# ── Test 5: Invalid input (missing fields) ───────────────────
@patch("pipeline.anthropic.Anthropic")
def test_invalid_input_missing_fields(mock_anthropic_cls):
    mock_client = MagicMock()
    mock_client.messages.create.side_effect = _stub_claude
    mock_anthropic_cls.return_value = mock_client

    # Missing 'customer' and 'line_items' fields
    po = {"po_number": "PO-TEST-005"}
    state = run_order_pipeline(po)

    assert state.stage == "error", "Pipeline should reject orders with missing fields"

Run:

pytest test_pipeline.py -v

Expected output:

test_pipeline.py::test_happy_path_standard_order PASSED
test_pipeline.py::test_credit_check_failure PASSED
test_pipeline.py::test_hitl_escalation_high_value PASSED
test_pipeline.py::test_circuit_breaker_trip PASSED
test_pipeline.py::test_invalid_input_missing_fields PASSED
========================= 5 passed =========================

Test Scenario Matrix

| Type | Scenario | Expected Behavior |
|---|---|---|
| HAPPY | Standard order, single warehouse | All 4 agents run, confirmation email sent, no HITL |
| HAPPY | Split shipment under $50K | Auto-approved, two shipment plans created, customer notified |
| HAPPY | Split shipment over $50K | Pipeline pauses for ops manager, manager approves, pipeline resumes |
| HAPPY | Shipment delay detected | Exception monitor catches it, communication agent sends delay notification |
| HAPPY | Full lifecycle through delivery | All stages logged, delivery confirmation sent |
| EDGE | Credit check fails (Beta Industrial, $48K outstanding) | Pipeline stops at intake with rejection |
| EDGE | Manager selects “wait for consolidation” | Pipeline pauses at fulfillment stage, no shipment created |
| EDGE | SKU discontinued mid-fulfillment | Exception monitor catches it, suggests substitute |
| ADVERSARIAL | Carrier API fails 6 times in 10 minutes | Circuit breaker trips, switches to fallback polling, ops alerted |
| ADVERSARIAL | Duplicate PO submitted | Intake agent detects duplicate, prevents double-processing |

Verify Everything Works

Run the complete pipeline end-to-end with a single command:

python -c "
from pipeline import run_order_pipeline
from dataclasses import asdict
import json

sample = {'po_number': 'PO-2024-11234', 'customer': 'Acme Manufacturing',
    'line_items': [
        {'sku': 'SKU-4892', 'qty': 50, 'unit_price': 495.00},
        {'sku': 'SKU-7721', 'qty': 200, 'unit_price': 125.00}]}
state = run_order_pipeline(sample)
print(json.dumps(asdict(state), indent=2, default=str))
" && pytest test_pipeline.py -v
Equivalent single-line form:

python -c "from pipeline import run_order_pipeline; from dataclasses import asdict; import json; sample = {'po_number': 'PO-2024-11234', 'customer': 'Acme Manufacturing', 'line_items': [{'sku': 'SKU-4892', 'qty': 50, 'unit_price': 495.00}, {'sku': 'SKU-7721', 'qty': 200, 'unit_price': 125.00}]}; state = run_order_pipeline(sample); print(json.dumps(asdict(state), indent=2, default=str))" && pytest test_pipeline.py -v

Expected final output: The pipeline runs all 4 agents (with Claude API calls for Intake and Communication) on the sample PO, processes the split shipment, and completes. The pytest suite then runs all 5 tests (mocking Claude calls) and reports 5 passed.

[INTAKE] Processing PO-2024-11234...
[INTAKE] Claude validated order and extracted key fields.
[INTAKE] Complete.
[FULFILLMENT] Planning shipments...
[FULFILLMENT] Complete. Split: True.
[MONITOR] Checking shipment status...
[MONITOR] Complete.
[COMMUNICATION] Drafting updates...
[COMMUNICATION] Claude drafted customer notification.
[PIPELINE] Complete! Order PO-2024-11234 processed.

Final state:
  po_number: PO-2024-11234
  stage: complete
  fulfillment_output.requires_split: true

test_pipeline.py::test_happy_path_standard_order PASSED
test_pipeline.py::test_credit_check_failure PASSED
test_pipeline.py::test_hitl_escalation_high_value PASSED
test_pipeline.py::test_circuit_breaker_trip PASSED
test_pipeline.py::test_invalid_input_missing_fields PASSED
========================= 5 passed =========================
Congratulations!

You have built a complete multi-agent B2B order lifecycle pipeline with four specialized agents: two powered by Claude API calls with tool use (Intake and Communication) and two deterministic (Fulfillment and Exception Monitor). The pipeline includes a time-windowed circuit breaker that switches to fallback polling when the carrier API fails, and human-in-the-loop review for high-value split shipments and credit resolutions. The same architecture carries over to production order management systems, where the mock tools would be replaced by connections to actual ERP, WMS, and carrier APIs.

Compliance Notes

⚠️ PCI-DSS, EDI & Contract Confidentiality

A multi-agent order pipeline handles sensitive commercial data at every stage: customer credit limits, contract pricing tiers, inventory levels, and supplier quality information.

  • Credit data protection: Customer credit limits and outstanding balances are highly sensitive. Never expose one customer’s credit data to another. Encrypt at rest.
  • EDI audit trail: If POs arrive via EDI 850, every transformation (intake validation, enrichment) must be logged for EDI audit compliance.
  • HITL decision logging: Operations manager split shipment and credit/refund resolution decisions are business records. Log decision, rationale, reviewer identity, and timestamp.
  • Carrier API credentials: Carrier tracking API keys must be stored securely (environment variables or secrets manager), never in code or pipeline state.
  • Circuit breaker alerts: When the breaker trips, the alert to operations must NOT include order details or customer names — only the failure count and affected service.
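The HITL decision-logging requirement can be met with an append-only JSON Lines file. A minimal sketch, assuming a local file is an acceptable audit store (a production system would more likely write to a database or log pipeline); `log_hitl_decision` is hypothetical. The review helpers return `decision`, `reviewer`, and `reviewed_at` but no rationale, so the sketch takes the rationale as a separate argument, and it records only identifiers, never credit figures.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def log_hitl_decision(log_path: Path, po_number: str, stage: str,
                      override: dict, rationale: str) -> dict:
    """Append one HITL decision to a JSON Lines audit file and return the record."""
    record = {
        "po_number": po_number,
        "stage": stage,                      # e.g. "fulfillment" or "monitoring"
        "decision": override["decision"],
        "reviewer": override["reviewer"],
        "reviewed_at": override["reviewed_at"],
        "rationale": rationale,              # not returned by the review helpers
        "logged_at": datetime.now(timezone.utc).isoformat(),
    }
    # Open in append mode so earlier decisions are never rewritten
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return record
```

In the pipeline, this would be called right after `split_shipment_review` or `credit_resolution_review` returns, with the rationale collected at the same prompt.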
💰 Cost: 2 Claude-Powered Agents per Order

Each pipeline run uses Claude in two agents (Agent 1: Intake validation, Agent 4: Communication drafting). Every tool-use round trip is a separate Messages API request, so each agent makes one initial call plus one per round of tool results; a run where both agents use their tools makes at least four calls, not two. Agents 2 and 3 are deterministic: no LLM is needed for inventory allocation or status polling. Typical consumption: 3,000–6,000 tokens total. At Sonnet pricing, budget ~$0.02–$0.04 per order; for 200 orders/day, that is $4–$8/day. The HITL path adds human time but no additional API cost. The circuit breaker in fallback mode reduces carrier API cost (fewer polling calls).
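The per-order figure is token count times price, summed over input and output. A minimal sketch of that arithmetic: the $3 and $15 per million input/output token prices used in the example are assumptions for illustration (substitute the current published Sonnet rates), and the 5,000 input / 1,000 output split is a hypothetical point inside the 3,000–6,000 token range above.

```python
def cost_per_order(input_tokens: int, output_tokens: int,
                   input_usd_per_mtok: float, output_usd_per_mtok: float) -> float:
    """Dollar cost of one order's Claude usage, summed across all of its API calls."""
    return (input_tokens * input_usd_per_mtok
            + output_tokens * output_usd_per_mtok) / 1_000_000


def daily_cost(orders_per_day: int, per_order_usd: float) -> float:
    """Scale a per-order cost to a daily budget."""
    return orders_per_day * per_order_usd


# Assumed prices: $3 / MTok input, $15 / MTok output
per_order = cost_per_order(5_000, 1_000, 3.0, 15.0)   # (15,000 + 15,000) / 1e6 = $0.03
print(f"${per_order:.2f}/order, ${daily_cost(200, per_order):.2f}/day")
```

Each tool round trip resends the conversation so far, so input tokens grow with the number of calls; count tokens across all of an agent's calls, not just the first.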

Troubleshooting

Common Errors

Import Errors

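ModuleNotFoundError: No module named 'anthropic' — The anthropic package isn't available in the active Python environment. Activate your virtual environment with source venv/bin/activate (Unix) or venv\Scripts\activate (Windows); if the error persists, run pip install anthropic inside it.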

ModuleNotFoundError: No module named 'mock_tools' — You are running the script from the wrong directory. Make sure you are inside order-lifecycle-pipeline/ where both pipeline.py and mock_tools.py exist.

API Errors

AuthenticationError: 401 — Your ANTHROPIC_API_KEY is not set or is invalid. Run echo $ANTHROPIC_API_KEY (Unix) or echo %ANTHROPIC_API_KEY% (Windows) to verify.

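RateLimitError: 429 — You are sending too many requests. Wait a few seconds and retry; the Anthropic Python SDK already retries 429 responses a couple of times with backoff by default (configurable with the client's max_retries option). The pytest suite stubs the Claude client and makes no API calls, so only live pipeline runs count toward your rate limit.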
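For calls the client doesn't retry for you, or when you want longer waits than the default, a small exponential-backoff wrapper is enough. A minimal sketch; `with_backoff` is hypothetical, and with the real SDK you would pass `retry_on=(anthropic.RateLimitError,)`. The `sleep` parameter is injectable so tests don't actually wait.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_backoff(fn: Callable[[], T],
                 retry_on: tuple[type[BaseException], ...],
                 attempts: int = 4,
                 base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on the given exceptions with jittered exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt (base, 2x base, 4x base, ...), scaled by
            # random jitter in [0.5, 1.0] so parallel clients don't retry in lockstep
            sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.0))
    raise ValueError("attempts must be at least 1")
```

Usage against the live client would be `with_backoff(lambda: client.messages.create(...), (anthropic.RateLimitError,))`; keep the SDK's own retries in mind so the two layers don't multiply the total wait.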

Pipeline Logic Errors

Credit check always fails — Make sure the customer name in your PO exactly matches a key in CUSTOMER_CREDIT (e.g., "Acme Manufacturing", not "Acme"). The lookup is case-sensitive.

HITL never triggers — The approval check requires both conditions: requires_split == True AND total_value > 50000. If your test PO total is under $50K, HITL will be skipped even on a split shipment.
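You can check the rule by hand before suspecting the code. A minimal sketch that restates it, assuming the same totaling as run_order_pipeline (qty × unit_price summed over line items, missing fields as 0); `order_total` and `needs_split_approval` are hypothetical helpers, not the logic inside create_shipment_plan.

```python
def order_total(line_items: list[dict]) -> float:
    """Sum qty × unit_price over line items, treating missing fields as 0."""
    return sum(li.get("qty", 0) * li.get("unit_price", 0) for li in line_items)


def needs_split_approval(line_items: list[dict], requires_split: bool,
                         threshold: float = 50_000.0) -> bool:
    """HITL fires only for a split shipment whose total is strictly above the threshold."""
    return requires_split and order_total(line_items) > threshold
```

The demo PO totals $49,750 (50 × $495 + 200 × $125), so it splits without review; test 3 raises SKU-4892 to 60 units for a $54,700 total, which does trigger HITL.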

Circuit breaker doesn't trip — The breaker trips once 5 failures fall inside the same 10-minute window (test 4 records 5 back to back). If you add time.sleep() between failures for testing, older failures can age out of the window before the count reaches the threshold.
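A breaker with an injectable clock makes the window testable without time.sleep(). A minimal sketch: `WindowedCircuitBreaker` is a hypothetical stand-in, not the capstone's CarrierCircuitBreaker, with its threshold of 5 and 600-second window mirroring the TypeScript createCircuitBreaker. Unlike the TypeScript version, which stays tripped until resetCircuitBreaker is called, `is_tripped` here re-prunes on every check, so the breaker recovers once old failures age out.

```python
import time
from typing import Callable


class WindowedCircuitBreaker:
    """Hypothetical sliding-window breaker; `clock` returns seconds and can be faked in tests."""

    def __init__(self, threshold: int = 5, window_s: float = 600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.window_s = window_s
        self.clock = clock
        self.failures: list[float] = []

    def _prune(self) -> None:
        # Drop failures older than the window
        cutoff = self.clock() - self.window_s
        self.failures = [t for t in self.failures if t > cutoff]

    def record_failure(self) -> None:
        self.failures.append(self.clock())
        self._prune()

    def is_tripped(self) -> bool:
        # Re-prune on every check so the breaker recovers as failures expire
        self._prune()
        return len(self.failures) >= self.threshold
```

Five failures spaced three minutes apart never trip it: by the time the fifth lands, the first is more than 10 minutes old, so only four are in the window. That is the effect an over-long time.sleep() has in a test.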

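Duplicate PO not detected — The PROCESSED_POS set is module-level, so it lives only as long as the Python process: restart the process and the set is empty. The autouse reset_processed_pos fixture in test_pipeline.py also clears it before and after every test, so a duplicate-detection test must submit the same PO twice within a single test function.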
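Detecting duplicates across restarts requires storing processed PO numbers outside the process. A minimal sketch using Python's built-in sqlite3 module with a local database file; `mark_processed` is hypothetical and not part of mock_tools, and workers on several machines would need a shared database instead. The PRIMARY KEY makes the insert idempotent: a repeated PO number inserts no row.

```python
import sqlite3
from pathlib import Path


def mark_processed(db_path: Path, po_number: str) -> bool:
    """Record a PO number; return False if it was already recorded (a duplicate)."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("CREATE TABLE IF NOT EXISTS processed_pos (po_number TEXT PRIMARY KEY)")
            cur = conn.execute(
                "INSERT OR IGNORE INTO processed_pos (po_number) VALUES (?)", (po_number,))
        return cur.rowcount == 1  # 0 rows inserted means the PO was seen before
    finally:
        conn.close()
```

Each call opens a fresh connection, so a second call behaves like a restarted process: the earlier PO is still on disk.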

Going Further

  1. [OPTIONAL] Async queue-based processing — Replace synchronous orchestration with Redis/BullMQ queues. Each agent processes its queue independently.
  2. [OPTIONAL] Carrier webhook integration — Instead of polling, receive carrier push notifications via webhooks for real-time tracking updates.
  3. [OPTIONAL] Customer-tier-aware communication — Gold customers get phone call + email for delays; Bronze gets email only. Adjust channel and urgency per tier.
  4. [OPTIONAL] SLA violation detection — Automatically detect when delivery exceeds the contract SLA commitment and calculate credit amounts.
  5. [OPTIONAL] Dashboard (M20) — Build a real-time dashboard showing pipeline stage distribution, HITL queue depth, circuit breaker status, and exception rates.
  6. [OPTIONAL] Multi-carrier routing — Add logic to select the optimal carrier per shipment based on cost, speed, and reliability metrics.
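Item 1 can be prototyped in-process before bringing in Redis or BullMQ. A minimal sketch that swaps in asyncio.Queue for the external queue: each stage runs as its own worker reading one queue and writing the next, and a None sentinel shuts the chain down. `stage_worker` and `run_queued` are hypothetical, and `demo_intake` / `demo_fulfillment` are placeholder stages standing in for the real agents.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Stage = Callable[[dict], Awaitable[dict]]


async def stage_worker(handler: Stage, inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    """Run one agent stage: read orders from inbox, write results to outbox."""
    while True:
        order: Optional[dict] = await inbox.get()
        if order is None:            # sentinel: pass shutdown downstream and stop
            await outbox.put(None)
            return
        await outbox.put(await handler(order))


async def run_queued(orders: list[dict], stages: list[Stage]) -> list[dict]:
    """Chain stages through queues; each stage consumes independently of the others."""
    queues: list[asyncio.Queue] = [asyncio.Queue() for _ in range(len(stages) + 1)]
    workers = [asyncio.create_task(stage_worker(h, queues[i], queues[i + 1]))
               for i, h in enumerate(stages)]
    for order in orders:
        await queues[0].put(order)
    await queues[0].put(None)
    results = []
    while (item := await queues[-1].get()) is not None:
        results.append(item)
    await asyncio.gather(*workers)
    return results


# Hypothetical placeholder stages standing in for the intake and fulfillment agents
async def demo_intake(order: dict) -> dict:
    return {**order, "stage": "fulfillment"}


async def demo_fulfillment(order: dict) -> dict:
    return {**order, "stage": "complete"}
```

With a real broker, each stage_worker would typically become a separate process consuming its own queue, which is what lets each agent be scaled or redeployed without touching the others.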

Knowledge Check

Q1: Why is the B2B order pipeline split into 4 separate agents instead of one monolithic agent?

Q2: What triggers the circuit breaker in this pipeline?

Q3: When does the pipeline require human-in-the-loop approval from an operations manager?

Q4 (Applied): Agent 3 detects a carrier delay on a $75K split shipment order. What happens?

Q5: What is the purpose of output guardrails on Agent 4 (Customer Communication)?

References