M21: API Design & Deployment
Package your agent as a production API with streaming, containers, and cloud deployment.
Your agent works on your laptop. Now it needs to work for thousands of users, 24/7, without you babysitting it. This module takes you from a local script to a production API — choosing the right protocol, packaging it in a container, deploying it to the cloud, and scaling it under real-world load.
Learning Objectives
- Choose the right API protocol (REST, SSE, WebSocket) for your agent’s communication pattern
- Build a streaming agent API with health checks and structured event types
- Containerize an agent with Docker using multi-stage builds, cached layers, and non-root users
- Deploy to Cloud Run, Lambda, or Railway and understand the trade-offs of each
- Scale with queue-based processing and handle rate limits from the Anthropic API
Designing the Agent API: REST, WebSocket, SSE
Here is what an SSE stream actually looks like on the wire — this is the raw HTTP response your client receives when it calls POST /chat:
Each chunk arrives as soon as it is generated. The client sees tokens one by one, tool calls as they happen, and a final done event with usage stats. Compare this to REST polling, where you would need to send a new request every 500ms asking “is there anything new yet?”
Server-Sent Events (SSE) is a protocol built on top of standard HTTP. The server responds with the text/event-stream content type and sends named events (like event: token) as they become available. Unlike REST, where the client must repeatedly ask “anything new?” (polling), SSE lets the server push data to the client. The client opens one HTTP connection, and the server writes events to it over time. Each event has an optional event: type (like token, tool_call, or done) and a data: payload. Because SSE uses plain HTTP, it works through most proxies, load balancers, and CDNs and reconnects automatically on failure, unlike WebSockets, which require protocol upgrades that some infrastructure cannot handle.
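The framing described above can be sketched in a few lines of Python. This is a minimal illustration, not the module's own example: the helper name `format_sse` and the sample payloads are assumptions made up for this sketch.

```python
import json


def format_sse(event: str, data: dict) -> str:
    """Serialize one SSE frame: an event name, a JSON payload, then a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


# Hypothetical frames for one short agent reply.
frames = [
    format_sse("token", {"text": "Hello"}),
    format_sse("token", {"text": " world"}),
    format_sse("done", {"usage": {"input_tokens": 12, "output_tokens": 2}}),
]

# Joined together, this is the body a client would read from the open connection.
wire = "".join(frames)
print(wire)
```

The blank line after each `data:` line is what tells the client that one event has ended and the next may begin.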
When to Use Each Protocol
- REST (request/response) — Best for simple, stateless operations: health checks, submitting feedback, retrieving past conversations. The client sends a request, waits for the full response, and closes the connection.
- SSE (server push) — Best for most agent interactions. The user sends a message, and tokens stream back as they are generated. The client only listens; it does not need to send data during the stream. This is how the Anthropic Messages API works in streaming mode.
- WebSocket (bidirectional) — Best when the client must send data during the stream: canceling a generation, providing real-time tool results, or collaborative editing. WebSockets add complexity — you must handle heartbeats, reconnection, and protocol upgrades — so only choose them when SSE is genuinely insufficient.
Browsers support SSE natively through the EventSource API. Note that EventSource only issues GET requests, so a POST /chat stream is read with fetch and a streaming response body instead. The Anthropic Messages API itself uses SSE for streaming, so your agent’s protocol matches the upstream provider. Reserve WebSockets for the rare case where you need to push data to the server mid-stream (like a “stop generating” button that requires server acknowledgment beyond simply closing the connection).
“WebSockets are always better because they support bidirectional communication.” — More capability does not mean better. WebSockets require protocol upgrades that some proxies and CDNs cannot handle. They need heartbeat management, reconnection logic, and careful connection lifecycle handling. For the 95% of agent interactions that follow a simple send-message-receive-stream pattern, SSE is simpler, more reliable, and works everywhere. Only reach for WebSockets when you genuinely need the client to push data during an active stream.
“I need WebSockets for a ‘stop generating’ button.” — Not necessarily. The simplest “stop” implementation just closes the SSE connection from the client side. The server detects the disconnect and stops generating. You only need WebSockets if the server must acknowledge the cancellation or send a final summary before closing.
“REST polling is fine if I poll frequently enough.” — Polling every 200ms means 5 requests per second per user. At 100 concurrent users, that is 500 requests per second just for status checks — most returning empty responses. SSE uses one connection per user with zero wasted requests. The infrastructure cost difference is enormous at scale.
“I should build my own streaming protocol.” — SSE is a W3C standard with native browser support via EventSource. It handles reconnection, event IDs, and content encoding automatically. Building a custom protocol means rebuilding all of this from scratch and debugging edge cases that SSE solved years ago.
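The “stop by closing the connection” idea can be sketched without any web framework. The example below is an assumption-laden illustration: `token_stream` is a hypothetical stand-in for an agent, and closing the async generator plays the role of the server noticing that the client disconnected.

```python
import asyncio

stopped = []  # records that cleanup ran, so the effect is observable


async def token_stream():
    """Hypothetical agent: yields tokens until the consumer goes away."""
    try:
        for i in range(1000):
            await asyncio.sleep(0)  # pretend to wait on the model
            yield f"token-{i}"
    finally:
        # Runs when the generator is closed, i.e. when the client has disconnected.
        stopped.append(True)


async def main() -> list[str]:
    stream = token_stream()
    received = []
    async for tok in stream:
        received.append(tok)
        if len(received) == 3:  # the user clicks "stop" after three tokens
            break
    await stream.aclose()  # stands in for the server reacting to the disconnect
    return received


received = asyncio.run(main())
print(received, stopped)
```

No cancel message is sent in this sketch. Generation stops because the consumer stopped reading, which is the behavior the SSE-only “stop” button relies on.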
Containerization: Docker
Here is what that “standardized box” actually looks like in practice. A Dockerfile is just a text file with step-by-step instructions:
That is a complete Dockerfile. Five lines. It produces an image — a snapshot of your agent and everything it needs — that runs identically everywhere.
A Docker image is a read-only template containing your application code, runtime (Python, Node.js), system libraries, and configuration. Think of it as a frozen snapshot of everything your agent needs to run. It is built from a Dockerfile — a simple text file with step-by-step instructions like “start from Python 3.12, install these packages, copy my code.”
Each instruction in the Dockerfile creates a layer. Layers are Docker’s secret weapon for speed: they are cached and reused across builds. If you change your application code but not your dependencies, Docker reuses the cached dependency layer and only rebuilds the code layer. This turns a 3-minute build into a 10-second build.
A container is a running instance of an image — an isolated process with its own filesystem, network, and process namespace. It is not a virtual machine (a common confusion). Containers share the host OS kernel, making them much lighter than VMs — they start in milliseconds, not minutes. Multiple containers can run from the same image simultaneously, which is exactly how auto-scaling works: Cloud Run launches more containers of your image when traffic increases.
A container registry (Docker Hub, Google Artifact Registry, AWS ECR) stores and distributes images. You push your built image to a registry, and cloud platforms pull from it when deploying. It is like a package repository (npm, PyPI) but for entire application environments.
Dockerfile Best Practices for Agents
- Use slim base images — python:3.12-slim (150MB) instead of python:3.12 (900MB), or node:20-alpine (180MB) instead of node:20 (1GB).
- Cache dependency layers — Copy requirements.txt / package.json before copying source code. Dependencies change rarely; code changes on every commit. This saves minutes on every build.
- Multi-stage builds — Use one stage to install build tools (compilers, headers), then copy only the artifacts into a clean final stage. The result is a smaller, more secure image.
- Non-root user — Run the application as a non-root user. If an attacker exploits a vulnerability, they get limited permissions instead of root access to the host.
- Health checks — Add a HEALTHCHECK instruction so Docker (and your orchestrator) knows when the container is ready to serve traffic.
Use .dockerignore to prevent .env files from being copied into the build context.
“Docker containers are like virtual machines.” — This is the most common confusion. A VM runs a full operating system with its own kernel, consuming gigabytes of RAM and taking minutes to boot. A container shares the host OS kernel and runs as an isolated process — it starts in milliseconds and uses megabytes, not gigabytes. Think of VMs as separate houses on a street, and containers as apartments in one building sharing the same foundation.
“I should use the full base image for maximum compatibility.” — The full python:3.12 image is 900MB and includes compilers, man pages, and tools you will never use. The python:3.12-slim image at 150MB has everything your agent actually needs. A smaller image means faster deploys, faster scaling, and fewer vulnerabilities to patch.
“I deleted the .env file in a later Dockerfile step, so it’s safe.” — No. Docker images are layered like geological strata. Each layer is immutable. Running RUN rm .env creates a new layer that hides the file, but the original layer still contains it. Anyone with docker history or docker save can extract it. Use .dockerignore to prevent secrets from entering the build context in the first place.
Cloud Deployment: Lambda, Cloud Run, Railway
Here is what the “leasing” option looks like in practice — a single command deploys your container to Cloud Run:
One command. You get a URL with TLS, auto-scaling, and zero cost when idle. That is the power of managed containers.
Second: managed container platforms like Cloud Run. These take a different approach. You give them a Docker container image, and they handle scaling, load balancing, and TLS certificates for you. Cloud Run itself is a Google Cloud managed container platform that automatically scales container instances from zero to thousands based on incoming requests. It supports HTTP/1.1, HTTP/2, WebSocket, and SSE streaming with request timeouts up to 60 minutes. You pay only for CPU and memory consumed while handling requests.
One pain point shared by both models is the cold start: the delay when a serverless function or container must be initialized from scratch because no warm instance is available. The platform must spin up a fresh one, which means downloading the container image, starting the runtime, and loading your application code. This typically takes 500ms–5s depending on image size and runtime. Users feel this as a 1–5 second delay on the first request.
Platform Comparison
| Feature | Lambda | Cloud Run | Railway / VM |
|---|---|---|---|
| Max timeout | 15 min | 60 min | Unlimited |
| Streaming (SSE) | Limited | Native | Native |
| Scale to zero | Yes | Yes | No |
| Cold start | 0.5–3s | 1–5s | None |
| Cost when idle | $0 | $0 | $5–50/mo |
“Serverless is always cheaper because you only pay for what you use.” — It depends on your traffic pattern. At high sustained load (thousands of requests per hour, 24/7), serverless per-invocation pricing can exceed the cost of a fixed VM. Serverless wins for bursty, low-traffic workloads. For always-on high-traffic APIs, a VM or reserved instance may be cheaper.
“Cold starts don’t matter because they only happen once.” — Cold starts happen every time a new instance is created. With scale-to-zero, that means the first request after a quiet period always gets a cold start. With auto-scaling, a traffic spike creates multiple new instances, each with its own cold start. Users feel 2–5 seconds of latency that can be frustrating for chat interfaces.
“Lambda can’t handle agent tasks at all.” — Lambda works fine for short, synchronous agent tasks (single-turn Q&A, classification, extraction) that complete in under 30 seconds. Its limitation is streaming and long multi-step reasoning chains, not agent tasks in general.
Model Host Choice: Direct API vs Bedrock vs Vertex AI
Picking where your code runs (Cloud Run, Lambda, VM) is one decision. Picking where Claude itself runs is a second, independent one. The same agent code can call Claude through three different model hosts, and the choice usually depends on which cloud already holds your data, your IAM policies, and your billing relationship.
| Dimension | Direct Anthropic API | AWS Bedrock | Google Vertex AI |
|---|---|---|---|
| Auth | API key | AWS IAM / SigV4 | Google IAM / ADC |
| Billing | Anthropic invoice | Rolls into AWS bill | Rolls into GCP bill |
| Data residency | US / EU regions | Any AWS region with Claude | Any GCP region with Claude |
| Newest model lag | Day 0 | Days to weeks | Days to weeks |
| Procurement | New vendor | Existing AWS contract | Existing GCP contract |
| Best for | Fastest model access, simplest auth | AWS-native shops, VPC-only data | GCP-native shops, Vertex pipelines |
Code-wise, switching hosts is usually a constructor change — the message-shape and tool-use protocol are identical across all three:
# Direct Anthropic API — uses ANTHROPIC_API_KEY from env
from anthropic import Anthropic

client = Anthropic()
msg = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)

# AWS Bedrock — uses your AWS IAM credentials, no API key
from anthropic import AnthropicBedrock

bedrock = AnthropicBedrock(aws_region="us-west-2")
msg = bedrock.messages.create(
    model="anthropic.claude-opus-4-7-v1:0",  # Bedrock model ID
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)

# Google Vertex AI — uses Application Default Credentials
from anthropic import AnthropicVertex

vertex = AnthropicVertex(project_id="my-gcp-project", region="us-east5")
msg = vertex.messages.create(
    model="claude-opus-4-7@20260101",  # Vertex publisher model ID
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)
// Direct Anthropic API
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();
const msg = await client.messages.create({
  model: "claude-opus-4-7",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Hello" }],
});

// AWS Bedrock
import { AnthropicBedrock } from "@anthropic-ai/bedrock-sdk";

const bedrock = new AnthropicBedrock({ awsRegion: "us-west-2" });
const msg2 = await bedrock.messages.create({
  model: "anthropic.claude-opus-4-7-v1:0",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Hello" }],
});

// Google Vertex AI
import { AnthropicVertex } from "@anthropic-ai/vertex-sdk";

const vertex = new AnthropicVertex({ projectId: "my-gcp-project", region: "us-east5" });
const msg3 = await vertex.messages.create({
  model: "claude-opus-4-7@20260101",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Hello" }],
});
Healthcare and finance teams often cannot use the direct Anthropic API at all — their compliance teams have already approved AWS or GCP under a BAA / DPA, and adding another third-party data processor restarts that review. Bedrock and Vertex AI serve the same Claude models, but invoked through an already-approved cloud account. The data never leaves your AWS or GCP perimeter.
Startups usually go direct — new models land on the Anthropic API days to weeks before they appear on Bedrock or Vertex, and shipping on the latest model is often a competitive edge.
Managed Agent Platforms (Skip the Container Entirely)
Cloud Run and Lambda host your agent code. Managed agent platforms host a preconfigured agent runtime — you upload a system prompt, a list of tools (called “action groups” on Bedrock or “tools” on Vertex Agent Engine), and the platform runs the tool-use loop, persists session state, and exposes a single InvokeAgent endpoint. You never write a FastAPI server.
| Platform | What it gives you | Use when |
|---|---|---|
| Amazon Bedrock Agents | Hosted agent runtime, session memory, action groups (tools defined as Lambda functions or OpenAPI specs), Bedrock Knowledge Bases for built-in RAG, Bedrock Guardrails for safety filters. | You are AWS-native and want session state, tool execution, and RAG without writing or operating any of it. |
| Vertex AI Agent Engine | Managed runtime for LangChain / LangGraph / CrewAI / Agent Development Kit agents. Handles deployment, autoscaling, sessions, and Cloud Trace integration. | You wrote your agent in a Python framework and want to deploy without containerizing or running infrastructure. |
| Claude Agent SDK (Anthropic) | A code SDK (not a hosted runtime) that bundles the agent loop, sub-agents, file system tools, and bash/code execution. Run it inside your Cloud Run / Lambda / VM. | You want Anthropic’s reference agent loop but keep full control over hosting and data flow. |
Working example: here is what calling a deployed Bedrock agent looks like from your application code. The agent itself was configured through the Bedrock console or Terraform — system prompt, foundation model (Claude), action groups, and an optional knowledge base ID. Your code just invokes it:
import uuid

import boto3

# bedrock-agent-runtime is the data-plane client (invoke a deployed agent).
# bedrock-agent (without -runtime) is the control-plane client (create/update agents).
client = boto3.client("bedrock-agent-runtime", region_name="us-west-2")


def ask_agent(question: str, session_id: str) -> str:
    response = client.invoke_agent(
        agentId="ABCDE12345",  # set when you created the agent
        agentAliasId="TSTALIASID",  # "TSTALIASID" = the auto-created test alias
        sessionId=session_id,  # reuse to keep conversation state on Bedrock's side
        inputText=question,
    )
    # invoke_agent returns a streaming event iterator, not a JSON blob
    chunks = []
    for event in response["completion"]:
        if "chunk" in event:
            chunks.append(event["chunk"]["bytes"].decode("utf-8"))
    return "".join(chunks)


# Same session_id across calls = Bedrock remembers the conversation
session = str(uuid.uuid4())
print(ask_agent("What were Q4 sales for Acme Corp?", session))
print(ask_agent("And how does that compare to Q3?", session))  # context is preserved
import {
  BedrockAgentRuntimeClient,
  InvokeAgentCommand,
} from "@aws-sdk/client-bedrock-agent-runtime";
import { randomUUID } from "node:crypto";

const client = new BedrockAgentRuntimeClient({ region: "us-west-2" });

async function askAgent(question: string, sessionId: string): Promise<string> {
  const cmd = new InvokeAgentCommand({
    agentId: "ABCDE12345",
    agentAliasId: "TSTALIASID",
    sessionId,
    inputText: question,
  });
  const response = await client.send(cmd);
  let out = "";
  // response.completion is an async iterable of events
  for await (const event of response.completion ?? []) {
    if (event.chunk?.bytes) {
      out += new TextDecoder().decode(event.chunk.bytes);
    }
  }
  return out;
}

const session = randomUUID();
console.log(await askAgent("What were Q4 sales for Acme Corp?", session));
console.log(await askAgent("And how does that compare to Q3?", session));
Scaling: Concurrent Requests and Queues
Here is what the “reservation system” looks like as a queue message. When a user sends a chat request, your API server pushes a task like this onto Redis or SQS:
A worker picks this up, calls the Claude API, and streams the result back. If the worker gets a 429 rate limit, it re-enqueues the task with an incremented retry_count and a backoff delay. The user sees “processing...” instead of an error.
On the other side, worker processes pull tasks from the queue at a controlled rate. This architecture handles traffic spikes gracefully: the queue absorbs bursts, and workers process them at a sustainable pace.
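As a rough sketch of the queue message and the re-enqueue step described above, the example below uses a `collections.deque` as a stand-in for Redis or SQS. The field names (`task_id`, `not_before`), the retry cap, and the backoff formula are assumptions chosen for illustration, not a schema taken from this module.

```python
import random
import uuid
from collections import deque

MAX_RETRIES = 5  # assumed cap; tune for your workload


def make_task(conversation_id: str, message: str) -> dict:
    """Build the message the API server would push onto the queue."""
    return {
        "task_id": str(uuid.uuid4()),
        "conversation_id": conversation_id,
        "message": message,
        "retry_count": 0,
        "not_before": 0.0,  # earliest time a worker may pick the task up
    }


def requeue_after_429(queue: deque, task: dict, now: float) -> bool:
    """Re-enqueue a rate-limited task with a backoff delay. Returns False when giving up."""
    retry = task["retry_count"] + 1
    if retry > MAX_RETRIES:
        return False
    # 1s, 2s, 4s, ... capped at 60s, plus up to 1s of jitter
    delay = min(2 ** (retry - 1), 60) + random.uniform(0, 1)
    queue.append({**task, "retry_count": retry, "not_before": now + delay})
    return True


queue: deque = deque()
task = make_task("conv-1", "Summarize my inbox")
requeue_after_429(queue, task, now=100.0)
print(queue[0]["retry_count"], queue[0]["not_before"])
```

A worker would skip any task whose `not_before` is still in the future, which is what keeps a burst of 429s from turning into a burst of retries.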
There is a related concept called autoscaling on concurrency. Instead of watching CPU usage to decide when to add more instances, you watch the number of in-flight requests per instance. Why? Because agent work is I/O-bound — your server spends most of its time waiting for Claude’s API to respond, not crunching numbers. CPU usage stays low even when the instance is fully saturated with requests. If you scale on CPU, you will never add instances and your queue will grow unbounded.
Cloud Run makes this easy with the --concurrency=10 flag. This tells Cloud Run: “each instance handles at most 10 simultaneous requests.” When the 11th request arrives, Cloud Run automatically spins up a new instance to handle it.
How is this different from traditional auto-scaling? In a web application, you might scale when CPU exceeds 70%. But an agent handler waiting for the Claude API to respond uses almost zero CPU — it is just sitting idle, holding an open HTTP connection. CPU-based scaling would never trigger, even though your instance is fully saturated with waiting requests. Concurrency-based scaling measures what actually matters: how many requests is this instance juggling right now?
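The scaling rule behind concurrency-based autoscaling is simple arithmetic. The sketch below is an illustration of that rule only; the function name is hypothetical and it does not model how Cloud Run schedules instances internally.

```python
import math


def instances_needed(in_flight_requests: int, concurrency_per_instance: int) -> int:
    """Instances required so that none handles more than its concurrency limit."""
    if concurrency_per_instance <= 0:
        raise ValueError("concurrency_per_instance must be positive")
    return math.ceil(in_flight_requests / concurrency_per_instance)


# With a limit of 10 requests per instance:
print(instances_needed(10, 10))  # 1 instance is enough
print(instances_needed(11, 10))  # the 11th request forces a second instance
```

CPU utilization does not appear anywhere in this calculation, which is the point: the input is the number of requests in flight.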
Handling Anthropic API Rate Limits
The Anthropic API enforces rate limits on requests per minute (RPM) and tokens per minute (TPM). When you exceed them, you get a 429 Too Many Requests response with a retry-after header. The correct response is not to retry immediately:
- Respect retry-after — Wait the number of seconds specified in the header before retrying.
- Exponential backoff — If no header is present, wait 1s, 2s, 4s, 8s (doubling each time) with random jitter to prevent a thundering herd.
- Proactive rate tracking — Track your usage in-process and slow down before hitting the limit. The anthropic-ratelimit-requests-remaining and anthropic-ratelimit-tokens-remaining response headers tell you how much headroom you have left.
- Circuit breaker — If you receive 5 consecutive 429s, stop sending requests entirely for 30 seconds. This prevents cascading failures and gives the API time to recover.
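The retry rules in the list above can be sketched as two small pieces: a delay calculation and a circuit breaker. This is a minimal sketch under stated assumptions. The names `backoff_delay` and `CircuitBreaker` are hypothetical, and the thresholds simply mirror the numbers in the list.

```python
import random


def backoff_delay(attempt: int, retry_after=None, base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        return float(retry_after)  # the server's hint always wins
    # 1s, 2s, 4s, 8s, ... capped, plus jitter so clients do not retry in lockstep
    return min(cap, base * 2 ** attempt) + random.uniform(0, base)


class CircuitBreaker:
    """Stops all calls for `cooldown` seconds after `threshold` consecutive 429s."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.consecutive_429s = 0
        self.open_until = 0.0

    def allow(self, now: float) -> bool:
        return now >= self.open_until

    def record_429(self, now: float) -> None:
        self.consecutive_429s += 1
        if self.consecutive_429s >= self.threshold:
            self.open_until = now + self.cooldown
            self.consecutive_429s = 0

    def record_success(self) -> None:
        self.consecutive_429s = 0


breaker = CircuitBreaker()
for _ in range(5):
    breaker.record_429(now=0.0)
print(breaker.allow(now=10.0), breaker.allow(now=30.0))
```

Passing `now` explicitly keeps the breaker easy to test. A caller in production would pass `time.monotonic()`.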
Handling Long-Running Agents
The problem. Real agent runs — multi-tool loops, deep retrieval, multi-step reasoning — routinely take 2–5 minutes. But the HTTP world disagrees: browsers, CDNs, API gateways, and load balancers idle out connections at 30–60 seconds. The user sees a 504 Gateway Timeout while the agent is still happily working in the background. You cannot solve this by raising timeouts everywhere; you must decouple accepting a request from delivering its result.
Three patterns handle this cleanly:
- Pattern A — Async job queue. POST /chat enqueues the task and immediately returns 202 Accepted with a job_id. The client polls GET /jobs/{job_id} every few seconds until status: done. Simple, survives client disconnects, works through any proxy. Trade-off: polling latency and no progressive output.
- Pattern B — Server-Sent Events (SSE). One-way streaming over a long-lived HTTP connection. The server pushes incremental events (token, tool_call, step_complete) as they happen. Best UX for progressive output, traverses most proxies, native browser support via EventSource.
- Pattern C — WebSocket. Full duplex, so the client can send cancel mid-run, adjust parameters, or interject. Use when you need bidirectional control. Costlier to scale (sticky sessions) and harder to debug.
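Pattern A is easy to picture with the HTTP layer removed. The sketch below keeps jobs in an in-memory dict, which is an assumption made for brevity, and the three functions are hypothetical stand-ins for the POST /chat handler, the worker, and the GET /jobs/{job_id} handler.

```python
import uuid

jobs: dict[str, dict] = {}  # assumed in-memory store; use a database or Redis in practice


def submit_job(message: str) -> tuple[int, dict]:
    """What POST /chat would do: record the task and answer immediately with 202."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "queued", "message": message, "result": None}
    return 202, {"job_id": job_id}


def complete_job(job_id: str, result: str) -> None:
    """What a worker would do once the agent run finishes."""
    jobs[job_id].update(status="done", result=result)


def get_job(job_id: str) -> tuple[int, dict]:
    """What GET /jobs/{job_id} would do: report status, plus the result when ready."""
    job = jobs.get(job_id)
    if job is None:
        return 404, {"error": "unknown job"}
    return 200, {"status": job["status"], "result": job["result"]}


status, body = submit_job("Research competitors and draft a summary")
print(status, get_job(body["job_id"]))
complete_job(body["job_id"], "Draft summary...")
print(get_job(body["job_id"]))
```

Because the request that accepts the job returns right away, no proxy or gateway ever holds a connection open for the length of the agent run.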
Pattern B with FastAPI looks like this:
Timeout handling. Cap every run at 5 minutes via asyncio.wait_for. On timeout, do not raise — emit a partial event containing whatever the agent produced so far (tool results, draft answer) plus a truncated: true flag. A partial answer the user can act on is far more useful than a 504.
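The timeout behavior described above could look roughly like the following. It is a sketch under assumptions: `run_agent` is a hypothetical stand-in that appends results to a shared list, and the timings are shortened so the example finishes quickly (a production deadline would be 300 seconds).

```python
import asyncio
import json


async def run_agent(partial: list[str], steps: int, step_seconds: float) -> None:
    """Hypothetical agent loop: each finished step is appended to `partial`."""
    for i in range(steps):
        await asyncio.sleep(step_seconds)
        partial.append(f"step {i} result")


async def run_with_deadline(steps: int, step_seconds: float, timeout: float) -> str:
    """Return one SSE frame: `done` on success, `partial` with truncated=true on timeout."""
    partial: list[str] = []
    try:
        await asyncio.wait_for(run_agent(partial, steps, step_seconds), timeout=timeout)
        payload = {"results": partial, "truncated": False}
        return f"event: done\ndata: {json.dumps(payload)}\n\n"
    except asyncio.TimeoutError:
        # wait_for cancelled the run; whatever reached `partial` is still usable.
        payload = {"results": partial, "truncated": True}
        return f"event: partial\ndata: {json.dumps(payload)}\n\n"


print(asyncio.run(run_with_deadline(steps=2, step_seconds=0.01, timeout=1.0)))
print(asyncio.run(run_with_deadline(steps=100, step_seconds=0.05, timeout=0.12)))
```

The shared list is the design choice that matters here. `asyncio.wait_for` cancels the task on timeout, so any partial output has to live somewhere the caller can still read it afterwards.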
Code Walkthrough
Building the Agent API Server
We will build a complete agent API with three endpoints:
- POST /chat — The main endpoint. Accepts a user message and streams the agent’s response via SSE with structured event types (token, tool_call, done).
- GET /health — Returns the service status. Used by load balancers and orchestrators to know when the container is ready.
- POST /feedback — Accepts user feedback (thumbs up/down) for continuous improvement.
Let’s build the server. The core idea is simple: when a user sends a message to /chat, we open a streaming connection to the Claude API and forward each token to the client as an SSE event. FastAPI makes this especially clean with its built-in StreamingResponse, while Express requires setting the SSE headers manually.
Why SSE instead of a regular JSON response? Because agent responses take 10–45 seconds. Without streaming, the user stares at a blank screen for that entire time. With SSE, tokens appear as they are generated — just like the ChatGPT or Claude.ai interfaces you have used. Each event has a type (token, tool_call, done, error) so the client knows exactly what to render.
One gotcha that trips up nearly every team on their first deploy: you must set Cache-Control: no-cache and Connection: keep-alive in the response headers. Without these, proxies and CDNs (like Cloudflare or nginx) will buffer the entire stream and deliver it all at once — your user gets nothing for 30 seconds and then the full response dumps at once. The X-Accel-Buffering: no header specifically tells nginx to stop buffering.
"""
Agent API Server — FastAPI + SSE Streaming
Endpoints: POST /chat, GET /health, POST /feedback
"""
import json
import time
from datetime import datetime, timezone
from typing import AsyncGenerator

import anthropic
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# --- Configuration ---
app = FastAPI(title="Agent API", version="1.0.0")
client = anthropic.AsyncAnthropic()  # Uses ANTHROPIC_API_KEY env var

# --- Rate limiting (in-memory, use Redis in production) ---
request_counts: dict[str, list[float]] = {}
RATE_LIMIT = 20  # requests per minute per IP


def check_rate_limit(ip: str) -> bool:
    """Return True if the request is allowed."""
    now = time.time()
    if ip not in request_counts:
        request_counts[ip] = []
    # Remove timestamps older than 60 seconds
    request_counts[ip] = [t for t in request_counts[ip] if now - t < 60]
    if len(request_counts[ip]) >= RATE_LIMIT:
        return False
    request_counts[ip].append(now)
    return True


# --- Models ---
class ChatRequest(BaseModel):
    message: str
    conversation_id: str | None = None


class FeedbackRequest(BaseModel):
    conversation_id: str
    rating: str  # "thumbs_up" or "thumbs_down"
    comment: str | None = None


# --- SSE Streaming ---
async def stream_agent_response(
    message: str,
) -> AsyncGenerator[str, None]:
    """Stream agent response as SSE events."""
    try:
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        # Stream each token as an SSE event
                        yield f"event: token\ndata: {json.dumps({'text': event.delta.text})}\n\n"
                    elif event.delta.type == "input_json_delta":
                        yield f"event: tool_call\ndata: {json.dumps({'partial_json': event.delta.partial_json})}\n\n"
                elif event.type == "message_stop":
                    # Get final usage stats (a coroutine on the async client, so it must be awaited)
                    final = await stream.get_final_message()
                    usage = {
                        "input_tokens": final.usage.input_tokens,
                        "output_tokens": final.usage.output_tokens,
                    }
                    yield f"event: done\ndata: {json.dumps({'usage': usage})}\n\n"
    except anthropic.RateLimitError as e:
        retry_after = e.response.headers.get("retry-after", "30")
        yield f"event: error\ndata: {json.dumps({'error': 'rate_limited', 'retry_after': int(retry_after)})}\n\n"
    except anthropic.APIError as e:
        yield f"event: error\ndata: {json.dumps({'error': 'api_error', 'message': str(e)})}\n\n"


# --- Endpoints ---
@app.post("/chat")
async def chat(req: ChatRequest, request: Request):
    """Stream agent response via SSE."""
    client_ip = request.client.host if request.client else "unknown"
    if not check_rate_limit(client_ip):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Try again in 60 seconds.",
        )
    return StreamingResponse(
        stream_agent_response(req.message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )


@app.get("/health")
async def health():
    """Health check for load balancers and orchestrators."""
    return {
        "status": "healthy",
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": "1.0.0",
    }


@app.post("/feedback")
async def feedback(req: FeedbackRequest):
    """Record user feedback for continuous improvement."""
    # In production: write to database or analytics pipeline
    print(f"Feedback: {req.conversation_id} -> {req.rating}")
    return {"status": "recorded"}


# --- Run with: uvicorn server:app --host 0.0.0.0 --port 8080 ---
/**
* Agent API Server — Express + SSE Streaming
* Endpoints: POST /chat, GET /health, POST /feedback
*/
import express from "express";
import Anthropic from "@anthropic-ai/sdk";

const app = express();
app.use(express.json());
const client = new Anthropic(); // Uses ANTHROPIC_API_KEY env var
const PORT = process.env.PORT || 8080;

// --- Rate limiting (in-memory, use Redis in production) ---
const requestCounts = new Map();
const RATE_LIMIT = 20; // requests per minute per IP

function checkRateLimit(ip) {
  const now = Date.now();
  if (!requestCounts.has(ip)) requestCounts.set(ip, []);
  const timestamps = requestCounts
    .get(ip)
    .filter((t) => now - t < 60_000);
  requestCounts.set(ip, timestamps);
  if (timestamps.length >= RATE_LIMIT) return false;
  timestamps.push(now);
  return true;
}

// --- POST /chat — SSE Streaming ---
app.post("/chat", async (req, res) => {
  const clientIp = req.ip || "unknown";
  if (!checkRateLimit(clientIp)) {
    return res.status(429).json({
      error: "Rate limit exceeded. Try again in 60 seconds.",
    });
  }
  const { message } = req.body;
  if (!message) {
    return res.status(400).json({ error: "message is required" });
  }

  // Set SSE headers — CRITICAL for streaming
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // nginx
  res.flushHeaders();

  try {
    const stream = client.messages.stream({
      model: "claude-sonnet-4-6",
      max_tokens: 4096,
      messages: [{ role: "user", content: message }],
    });

    stream.on("text", (text) => {
      res.write(
        `event: token\ndata: ${JSON.stringify({ text })}\n\n`
      );
    });

    stream.on("inputJson", (partialJson) => {
      res.write(
        `event: tool_call\ndata: ${JSON.stringify({
          partial_json: partialJson,
        })}\n\n`
      );
    });

    stream.on("finalMessage", (msg) => {
      res.write(
        `event: done\ndata: ${JSON.stringify({
          usage: {
            input_tokens: msg.usage.input_tokens,
            output_tokens: msg.usage.output_tokens,
          },
        })}\n\n`
      );
      res.end();
    });

    stream.on("error", (err) => {
      const isRateLimit = err.status === 429;
      res.write(
        `event: error\ndata: ${JSON.stringify({
          error: isRateLimit ? "rate_limited" : "api_error",
          message: err.message,
          retry_after: isRateLimit ? 30 : undefined,
        })}\n\n`
      );
      res.end();
    });
  } catch (err) {
    res.write(
      `event: error\ndata: ${JSON.stringify({
        error: "server_error",
        message: err.message,
      })}\n\n`
    );
    res.end();
  }
});

// --- GET /health ---
app.get("/health", (req, res) => {
  res.json({
    status: "healthy",
    timestamp: new Date().toISOString(),
    version: "1.0.0",
  });
});

// --- POST /feedback ---
app.post("/feedback", (req, res) => {
  const { conversation_id, rating, comment } = req.body;
  if (!conversation_id || !rating) {
    return res
      .status(400)
      .json({ error: "conversation_id and rating required" });
  }
  // In production: write to database or analytics pipeline
  console.log(`Feedback: ${conversation_id} -> ${rating}`);
  res.json({ status: "recorded" });
});

app.listen(PORT, () => {
  console.log(`Agent API listening on port ${PORT}`);
});
The /chat endpoint streams tokens via SSE with typed events (token, tool_call, done, error). Rate limiting prevents abuse. The /health endpoint tells orchestrators when the service is ready. The code handles Claude API rate limits gracefully, surfacing them as SSE error events instead of crashing.
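On the client side, the typed events have to be split back out of the stream. The parser below is a simplified sketch that assumes the exact framing the server above emits (one `event:` line, one or more `data:` lines carrying JSON, and a blank line between frames). It is not a full implementation of the SSE specification, and `parse_sse` is a hypothetical helper name.

```python
import json


def parse_sse(raw: str) -> list[tuple[str, dict]]:
    """Split a text/event-stream body into (event_name, payload) pairs."""
    events = []
    for frame in raw.split("\n\n"):
        name = "message"  # the default event name when no event: line is present
        data_lines = []
        for line in frame.split("\n"):
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:  # skip empty frames such as the trailing one
            events.append((name, json.loads("\n".join(data_lines))))
    return events


raw = (
    'event: token\ndata: {"text": "Hi"}\n\n'
    'event: done\ndata: {"usage": {"input_tokens": 3, "output_tokens": 1}}\n\n'
)
for name, payload in parse_sse(raw):
    print(name, payload)
```

A real client would parse incrementally as bytes arrive rather than waiting for the whole body, but the dispatch on event name works the same way.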
Production Dockerfile
Now let’s look at a production-quality Dockerfile. This uses a multi-stage build. Here’s the idea: the first stage is a “workshop” where we install compilers, build tools, and all our dependencies. The second stage is a clean “showroom” — we copy only the finished artifacts into it. The result is a smaller image (under 200MB) that also has a smaller attack surface — no compilers or build tools for an attacker to exploit.
The most important detail — and the one that saves you the most time day-to-day — is the order of COPY instructions. We copy requirements.txt first and install dependencies, then copy the application code. Why does order matter? Docker caches each layer. If you copy everything at once, every code change — even fixing a typo in a print statement — invalidates the dependency layer, and Docker re-downloads and reinstalls all packages. That turns a 5-second rebuild into a 2-minute rebuild. By copying the dependency file first, Docker reuses the cached dependency layer whenever only your code changes. You will appreciate this the fifteenth time you tweak a prompt and rebuild.
# ---- Stage 1: Build dependencies ----
FROM python:3.12-slim AS builder
WORKDIR /app
# Install dependencies first (cached layer)
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
# ---- Stage 2: Production image ----
FROM python:3.12-slim
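# Security: create non-root user whose home directory matches the package path below
RUN groupadd -r agent && useradd -r -g agent -d /home/agent -m -s /bin/false agent
WORKDIR /app
# Copy only installed packages from builder into the agent user's ~/.local,
# which is where Python looks for --user installs
COPY --from=builder --chown=agent:agent /root/.local /home/agent/.local
ENV PATH="/home/agent/.local/bin:$PATH"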
# Copy application code
COPY . .
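# Defense in depth only: a file deleted here still exists in the COPY layer above,
# so .dockerignore is what actually keeps .env files out of the image
RUN rm -f .env .env.* || true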
# Set ownership
RUN chown -R agent:agent /app
# Switch to non-root user
USER agent
# Health check — used by Docker and orchestrators
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')" || exit 1
EXPOSE 8080
# Run with uvicorn — 4 workers for production
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]
# ---- Node.js version, Stage 1: Install dependencies ----
FROM node:20-alpine AS builder
WORKDIR /app
# Install dependencies first (cached layer)
COPY package.json package-lock.json ./
RUN npm ci --omit=dev
# ---- Stage 2: Production image ----
FROM node:20-alpine
# Security: create non-root user
RUN addgroup -S agent && adduser -S agent -G agent
WORKDIR /app
# Copy node_modules from builder
COPY --from=builder /app/node_modules ./node_modules
# Copy application code
COPY . .
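# Defense in depth only: a file deleted here still exists in the COPY layer above,
# so .dockerignore is what actually keeps .env files out of the image
RUN rm -f .env .env.* || true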
# Set ownership
RUN chown -R agent:agent /app
# Switch to non-root user
USER agent
# Health check
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD wget --spider --quiet http://localhost:8080/health || exit 1
EXPOSE 8080
CMD ["node", "server.js"]
Run the resulting image with docker run, injecting the API key via an environment variable.
Deploying to Cloud Run
With the Dockerfile ready, deploying to Cloud Run takes two commands. The first builds your image and pushes it to Google’s container registry. The second tells Cloud Run to run that image with your scaling configuration. Cloud Run handles everything else — TLS certificates, load balancing, auto-scaling, health monitoring. You focus on your agent code, not infrastructure.
# Build and push to Google Artifact Registry
gcloud builds submit --tag gcr.io/MY_PROJECT/agent-api:v1
# Deploy to Cloud Run
gcloud run deploy agent-api \
--image gcr.io/MY_PROJECT/agent-api:v1 \
--platform managed \
--region us-central1 \
--port 8080 \
--memory 512Mi \
--cpu 1 \
--concurrency 10 \
--min-instances 0 \
--max-instances 20 \
--timeout 300 \
--set-env-vars "ANTHROPIC_API_KEY=$(gcloud secrets versions access latest --secret=anthropic-key)" \
--allow-unauthenticated
# Output: Service URL: https://agent-api-xxxx-uc.a.run.app
Key flags explained:
- --concurrency 10: Each instance handles at most 10 simultaneous requests. An 11th concurrent request triggers a new instance.
- --min-instances 0: Scale to zero when idle (no cost).
- --max-instances 20: Cap at 20 instances to stay within Anthropic API rate limits.
- --timeout 300: Allow requests up to 5 minutes (long agent tasks).
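The ceiling these flags imply can be checked with quick arithmetic. The sketch below is only a back-of-the-envelope bound using the flag values above; the function names are illustrative, and Cloud Run's autoscaler may add instances before an instance is completely full.

```python
def max_concurrent_requests(concurrency: int, max_instances: int) -> int:
    """Upper bound on requests the whole service can process at once."""
    return concurrency * max_instances


def instances_needed(in_flight: int, concurrency: int) -> int:
    """Minimum instances required to hold `in_flight` simultaneous requests."""
    return -(-in_flight // concurrency)  # ceiling division


print(max_concurrent_requests(10, 20))  # 200
print(instances_needed(10, 10))         # 1
print(instances_needed(11, 10))         # 2
```

With these settings the service never sends more than 200 simultaneous calls toward the Claude API, which is the number to compare against your Anthropic rate limit.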
Streaming Responses
SSE vs WebSocket: Choosing the Right Channel
SSE is the default choice for agent streaming because agent conversations are inherently server-push: the client sends one request and the server streams back many chunks. SSE rides on plain HTTP, works through most proxies and CDNs without special configuration, and automatically reconnects on drop. WebSocket adds bidirectional communication but introduces connection management complexity (heartbeats, reconnection logic, sticky sessions behind load balancers). Use WebSocket only when the client must send mid-stream signals like “cancel generation” or “inject a follow-up tool result.” Long polling — the client repeatedly asks “anything new?” — wastes bandwidth and adds latency; avoid it for agent APIs.
Claude Streaming API: stream=True
The Anthropic SDK makes streaming a one-line change. Set stream=True (Python) or call .stream() (Node) and iterate over delta events. Each event carries a type field so your server can decide what to forward to the client:
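# WHAT: Stream Claude response and forward events as SSE
# WHY: Gives the user token-by-token feedback instead of a 15-second blank screen
FUNCTION stream_agent_response(user_message):
    open SSE connection to client
    pending_tool = NONE
    FOR EACH event IN claude.messages.create(stream=True, ...):
        IF event.type == "content_block_start" AND event.content_block.type == "tool_use":
            # Agent is calling a tool: show a progress indicator
            pending_tool = event.content_block
            tool_json = ""
            YIELD SSE event: { type: "tool_start", name: pending_tool.name }
        ELSE IF event.type == "content_block_delta" AND event.delta.type == "text_delta":
            # Regular text token: send immediately
            YIELD SSE event: { type: "token", text: event.delta.text }
        ELSE IF event.type == "content_block_delta" AND event.delta.type == "input_json_delta":
            # Tool input streams in as JSON fragments; it is still empty at content_block_start
            tool_json = tool_json + event.delta.partial_json
        ELSE IF event.type == "content_block_stop" AND pending_tool is set:
            # The tool input is complete only now, so this is where the tool runs
            result = EXECUTE tool(pending_tool.name, PARSE_JSON(tool_json))
            YIELD SSE event: { type: "tool_result", data: result }
            pending_tool = NONE
        ELSE IF event.type == "message_stop":
            YIELD SSE event: { type: "done" }
# GOTCHA: Set Cache-Control: no-cache and X-Accel-Buffering: no
# or proxies will buffer the entire stream and deliver it all at once.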
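The translation step in the pseudocode above can be exercised without calling the API. The following is a minimal sketch, not the SDK's own interface: the helper names sse and translate are hypothetical, and the events are hand-built stand-ins that carry only the fields the loop reads (type, delta, content_block). Tool execution is left out to keep the example focused on the event mapping.

```python
import json
from types import SimpleNamespace as NS
from typing import Any, Dict, Iterable, Iterator


def sse(event: str, data: Dict[str, Any]) -> str:
    """Format one SSE frame: an event line, a data line, then a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def translate(events: Iterable[Any]) -> Iterator[str]:
    """Map Anthropic-style stream events to the typed SSE frames used above."""
    for ev in events:
        if ev.type == "content_block_delta" and ev.delta.type == "text_delta":
            yield sse("token", {"text": ev.delta.text})
        elif ev.type == "content_block_start" and ev.content_block.type == "tool_use":
            yield sse("tool_start", {"name": ev.content_block.name})
        elif ev.type == "message_stop":
            yield sse("done", {})
        # Other event types (for example input_json_delta) are not forwarded.


# Hand-built stand-ins for SDK events (assumption: only these fields are needed).
fake_stream = [
    NS(type="content_block_start", content_block=NS(type="text")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Hi")),
    NS(type="content_block_start",
       content_block=NS(type="tool_use", name="search_filings")),
    NS(type="content_block_delta",
       delta=NS(type="input_json_delta", partial_json="{}")),
    NS(type="message_stop"),
]

frames = list(translate(fake_stream))
print("".join(frames), end="")
```

Keeping the mapping in a plain generator like this makes it easy to unit test separately from the web framework that writes the frames to the response.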
Client-Side Consumption & Progress Indicators
On the browser side, use the fetch API with a ReadableStream reader. The native EventSource API is simpler, but it only issues GET requests, so it cannot call a POST endpoint such as /chat directly. Parse each SSE line, switch on the event type, and update the UI: append text tokens to the message bubble, show a spinner with the tool name during tool_start, and render the final result on done. For progress indicators, map tool names to human-friendly labels: search_filings → “Searching filings…”, analyze_risk → “Analyzing risk factors…”. This turns a black-box wait into a transparent workflow the user can follow in real time.
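The parsing and labeling logic is the same in any client language. Here is a rough Python sketch of it, assuming each frame carries a JSON data payload; parse_sse, render, and TOOL_LABELS are illustrative names, and a browser client would do the equivalent in JavaScript.

```python
import json
from typing import Dict, Iterable, Iterator, List, Tuple

# Tool name -> human-friendly progress label (labels taken from the text above).
TOOL_LABELS = {
    "search_filings": "Searching filings...",
    "analyze_risk": "Analyzing risk factors...",
}


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, Dict]]:
    """Group raw SSE lines into (event_type, payload) pairs, one per frame."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\n")
        if line == "":  # a blank line terminates the current frame
            if data:
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())


def render(events: Iterable[Tuple[str, Dict]]) -> List[str]:
    """Return the UI updates a client would show, in order."""
    updates, text = [], ""
    for kind, payload in events:
        if kind == "token":
            text += payload["text"]
        elif kind == "tool_start":
            name = payload["name"]
            updates.append(TOOL_LABELS.get(name, f"Running {name}..."))
        elif kind == "done":
            updates.append(text)
    return updates


wire = [
    "event: tool_start\n", 'data: {"name": "search_filings"}\n', "\n",
    "event: token\n", 'data: {"text": "Found "}\n', "\n",
    "event: token\n", 'data: {"text": "3 filings."}\n', "\n",
    "event: done\n", "data: {}\n", "\n",
]
ui_updates = render(parse_sse(wire))
print(ui_updates)  # ['Searching filings...', 'Found 3 filings.']
```

Unknown tool names fall back to a generic "Running ..." label, so adding a tool on the server never leaves the user staring at a blank spinner.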
The stream=True parameter gives you delta events you can forward as typed SSE messages, and client-side progress indicators keep users engaged during multi-step agent execution.
Authentication & Authorization
API Key Authentication
The simplest approach: issue a static API key per client. The server checks the key against a lookup table and rejects unrecognized keys with a 401 Unauthorized. API keys work well for internal tools and service-to-service calls. The downside: keys don’t expire automatically, can’t carry user identity claims, and if leaked they grant full access until manually revoked.
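A lookup-table check might look like the sketch below. The key values and client names are made up for illustration, and authenticate_api_key is a hypothetical helper rather than part of any framework. It uses hmac.compare_digest so the comparison time does not reveal how many leading characters of a guess were correct.

```python
import hmac
from typing import Optional, Tuple

# Hypothetical key table; in practice this would live in a secret store or database.
API_KEYS = {
    "key-abc123": "billing-service",
    "key-def456": "internal-dashboard",
}


def authenticate_api_key(presented: Optional[str]) -> Tuple[int, str]:
    """Return (HTTP status, client name or error message) for a presented key."""
    if not presented:
        return 401, "Missing API key"
    for known, client in API_KEYS.items():
        # Constant-time comparison of the presented key against each known key.
        if hmac.compare_digest(presented.encode(), known.encode()):
            return 200, client
    return 401, "Unrecognized API key"


print(authenticate_api_key("key-abc123"))  # (200, 'billing-service')
print(authenticate_api_key("nope"))        # (401, 'Unrecognized API key')
```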
OAuth 2.0 / JWT for User-Facing Agents
For user-facing agents, use JWT tokens issued by an OAuth 2.0 provider (Auth0, Cognito, Firebase Auth). The token is short-lived (15–60 minutes), carries the user’s identity and roles in its payload, and is verified by checking the cryptographic signature — no database round-trip required. When the token expires, the client uses a refresh token to get a new one.
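To make "verified by checking the cryptographic signature" concrete, here is a standard-library sketch of signing and verifying an HS256 token. It is a simplification for illustration only: hosted OAuth providers commonly sign with asymmetric keys (for example RS256), and production code should rely on a maintained JWT library rather than this hand-rolled version. The function names and the shared secret are assumptions.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()


def sign_jwt(claims: dict, secret: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    sig = _signature(f"{header}.{payload}", secret)
    return f"{header}.{payload}.{_b64url_encode(sig)}"


def verify_jwt(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Return the claims if signature and expiry check out, else raise ValueError."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(payload_b64))
        sig = _b64url_decode(sig_b64)
    except ValueError as exc:  # wrong part count, bad base64, or bad JSON
        raise ValueError("malformed token") from exc
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    if not hmac.compare_digest(sig, _signature(f"{header_b64}.{payload_b64}", secret)):
        raise ValueError("bad signature")
    current = time.time() if now is None else now
    if not isinstance(claims, dict) or claims.get("exp", 0) <= current:
        raise ValueError("token expired")
    return claims


SECRET = b"demo-secret"  # assumption: a shared secret, for illustration only
token = sign_jwt({"sub": "user_42", "role": "analyst", "exp": 1000}, SECRET)
print(verify_jwt(token, SECRET, now=999)["role"])  # analyst
```

Note that verification needs only the token and the key: no session table is consulted, which is why the check adds no database round-trip.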
Role-Based Access & Tool-Level Permissions
Not every user should trigger every agent action. An RBAC layer maps user roles to allowed tools. For example, an analyst can call search_filings and generate_report, but only an admin can call delete_filing or override_risk_score. This is enforced in middleware before the agent executes the tool:
# WHAT: Authenticate requests and enforce tool-level permissions
# WHY: Prevents unauthorized access and limits blast radius per user role
MIDDLEWARE authenticate(request):
    token = EXTRACT Bearer token FROM request.headers["Authorization"]
    IF token is missing: RETURN 401 "Missing credentials"
    user = VERIFY token (check signature, expiry)
    IF not valid: RETURN 401 "Invalid or expired token"
    request.user = user
    request.permissions = LOOKUP roles for user  # e.g., ["analyst"]
    NEXT()

# WHAT: Check if the user's role allows the requested tool
# WHY: An analyst should never trigger delete_filing even if the agent tries
MIDDLEWARE authorize_tool(tool_name, user):
    allowed_tools = ROLE_TOOL_MAP[user.role]
    # e.g., { "analyst": ["search_filings", "generate_report"],
    #         "admin": ["search_filings", "generate_report", "delete_filing"] }
    IF tool_name NOT IN allowed_tools:
        LOG warning: "Blocked {tool_name} for user {user.id} (role: {user.role})"
        RETURN 403 "Your role cannot use {tool_name}"
    NEXT()

# GOTCHA: Enforce permissions at the middleware layer, NOT inside the agent prompt.
# Prompt-level restrictions can be bypassed with prompt injection.
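The authorization check above translates almost line for line into Python. This is a minimal sketch, assuming each user has a single role; ToolNotAllowed, run_tool, and the stub tool functions are hypothetical names, and a web framework would turn the exception into a 403 response.

```python
from typing import Any, Callable, Dict, Set

ROLE_TOOL_MAP: Dict[str, Set[str]] = {
    "analyst": {"search_filings", "generate_report"},
    "admin": {"search_filings", "generate_report",
              "delete_filing", "override_risk_score"},
}


class ToolNotAllowed(PermissionError):
    """Raised when a role tries to use a tool it is not permitted to call."""


def authorize_tool(tool_name: str, role: str) -> None:
    """Raise unless `role` may call `tool_name`; unknown roles get no tools."""
    if tool_name not in ROLE_TOOL_MAP.get(role, set()):
        raise ToolNotAllowed(f"Your role cannot use {tool_name}")


def run_tool(tool_name: str, role: str,
             tools: Dict[str, Callable[..., Any]], **kwargs: Any) -> Any:
    """Check permissions first, then execute: the model never bypasses this gate."""
    authorize_tool(tool_name, role)
    return tools[tool_name](**kwargs)


# Stub tools standing in for real implementations.
TOOLS = {
    "search_filings": lambda query: f"results for {query}",
    "delete_filing": lambda filing_id: f"deleted {filing_id}",
}

print(run_tool("search_filings", "analyst", TOOLS, query="10-K"))
try:
    run_tool("delete_filing", "analyst", TOOLS, filing_id="F-1")
except ToolNotAllowed as err:
    print(err)  # Your role cannot use delete_filing
```

Because the check sits in the code path that executes the tool, a prompt-injected request for delete_filing fails the same way as a direct one.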
Rate Limiting per User
IP-based rate limiting is not enough — a single corporate IP can have hundreds of legitimate users, and a bad actor can rotate IPs. Rate limit by authenticated user ID instead: track requests per user in Redis or an in-memory store, enforce a sliding window (e.g., 20 requests per minute), and return 429 Too Many Requests with a Retry-After header when the limit is hit. For agents with expensive tool calls, consider token-based rate limiting: each user gets a daily token budget, and each request deducts the tokens consumed by the Claude API call.
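A sliding-window limiter keyed by user ID can be sketched in a few lines. This version keeps state in process memory and takes an injectable clock so it can be tested; the class name and parameters are assumptions, and a multi-instance deployment would need a shared store such as Redis instead.

```python
import math
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple


class SlidingWindowLimiter:
    """Allow at most `limit` requests per user in any `window_seconds` span."""

    def __init__(self, limit: int = 20, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def check(self, user_id: str) -> Tuple[bool, int]:
        """Return (allowed, retry_after_seconds); retry_after is 0 when allowed."""
        now = self.clock()
        hits = self._hits[user_id]
        # Drop timestamps that have slid out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) < self.limit:
            hits.append(now)
            return True, 0
        # Blocked: the oldest hit leaving the window frees the next slot.
        return False, max(1, math.ceil(hits[0] + self.window - now))


# Demo with a fake clock so the timeline is explicit.
fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window_seconds=60, clock=lambda: fake_now[0])
print(limiter.check("user_42"))  # (True, 0)
fake_now[0] = 10.0
print(limiter.check("user_42"))  # (True, 0)
fake_now[0] = 20.0
print(limiter.check("user_42"))  # (False, 40)
```

The second element of a blocked result is the value to send in the Retry-After header alongside the 429 response.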
Hands-On Lab: Deploy a Streaming Agent API
What You’ll Build
A streaming agent API with SSE, health checks, and a production Docker container — tested locally and ready for cloud deployment. Time estimate: 30–40 minutes.
Prerequisites
- Python 3.10+ installed
- Docker Desktop installed and running
- An Anthropic API key (set as environment variable)
Files You’ll Create
- server.py: FastAPI agent API with SSE streaming
- requirements.txt: Python dependencies
- Dockerfile: Multi-stage production image
- .dockerignore: Prevents secrets from entering the image
Environment Setup
mkdir agent-api-lab && cd agent-api-lab
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install fastapi uvicorn anthropic
export ANTHROPIC_API_KEY=your-key-here # Windows: set ANTHROPIC_API_KEY=your-key-here
Step 1: Create the API Server
What & Why: This is the core of your agent API. It defines three endpoints: /chat for streaming agent responses, /health for load balancer checks, and /feedback for recording user ratings. The /chat endpoint uses SSE to stream tokens as they are generated — this is what gives users the “typing” effect instead of a blank screen for 30 seconds.
Create: Create a new file called server.py and paste the complete Python server code from the Code Walkthrough section above.
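Run: Start the server from the lab directory, for example with uvicorn server:app --port 8080. If you see ModuleNotFoundError, run pip install fastapi uvicorn anthropic. If you see a port conflict, change --port 8080 to another port like 8081.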
Step 2: Test the Health Endpoint
What & Why: Before testing the streaming chat, verify the simplest endpoint works. This confirms the server is running and reachable. Open a second terminal (keep the server running in the first).
Run curl http://localhost:8080/health in the second terminal. If the response contains {"status":"healthy"}, your server is running correctly. If you see Connection refused, make sure the server is still running in the first terminal. If you see 404 Not Found, check that server.py has the @app.get("/health") endpoint.
Step 3: Test SSE Streaming
What & Why: Now test the main /chat endpoint. This is the core feature — streaming tokens as the agent generates them. The -N flag tells curl not to buffer the output, so you see tokens as they arrive rather than all at once.
If you see event: token lines appearing one by one (not all at once), streaming is working correctly. If you see event: error with "rate_limited", wait 60 seconds and try again. If you see AuthenticationError, check that ANTHROPIC_API_KEY is set in the terminal running the server.
Step 4: Create the Dockerfile
What & Why: Now package your agent for production. The Dockerfile creates a portable, reproducible image that runs identically on any machine. This step also creates the .dockerignore file to keep secrets out of the image.
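Create: Create a file called requirements.txt that lists the three packages installed during Environment Setup (fastapi, uvicorn, anthropic), one per line.
Create a file called .dockerignore to prevent secrets from entering the image. At minimum it should exclude .env, .env.* and the venv directory.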
Create a file called Dockerfile and paste the Python Dockerfile from the Code Walkthrough section above.
Build the image with docker build -t agent-api:v1 . and confirm it exists with docker images agent-api:v1. If you see “Cannot connect to Docker daemon”, make sure Docker Desktop is running. If the build fails at pip install, check your requirements.txt for typos.
Step 5: Run the Container
What & Why: Stop the uvicorn server from Step 1 (Ctrl+C). Now run your agent inside the Docker container. This validates that your container image works end-to-end before deploying to the cloud. Note that this step depends on the image built in Step 4.
Important: The API key is injected as an environment variable at runtime — it is not in the image. This is the correct, secure pattern.
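A typical invocation is docker run -d --name agent-api -p 8080:8080 -e ANTHROPIC_API_KEY agent-api:v1, which passes the key through from your shell. If the container does not start or requests fail, check the logs with docker logs agent-api. Common issue: ANTHROPIC_API_KEY not passed correctly. Verify with docker exec agent-api env | grep ANTHROPIC.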
Verify Everything Works
Run this complete end-to-end verification:
echo "=== Testing Health ==="
curl -s http://localhost:8080/health | python -m json.tool
echo ""
echo "=== Testing Chat (first 5 events) ==="
curl -s -N -X POST http://localhost:8080/chat \
-H "Content-Type: application/json" \
-d '{"message": "Say hello in exactly 3 words."}' | head -15
echo ""
echo "=== Testing Feedback ==="
curl -s -X POST http://localhost:8080/feedback \
-H "Content-Type: application/json" \
-d '{"conversation_id": "test_001", "rating": "thumbs_up"}' | python -m json.tool
echo ""
echo "=== Docker Image Size ==="
docker images agent-api:v1 --format "Size: {{.Size}}"
echo ""
echo "Verification run complete. Review the output above for errors."
When you are finished, clean up with docker stop agent-api && docker rm agent-api.
Knowledge Check
Test your understanding of API design and deployment concepts.
1. What is the main advantage of SSE over REST polling for agent APIs?
2. Why should API keys never be placed in Docker images?
3. An agent task takes 45 seconds (multi-step reasoning + tool calls). Which platform is the best fit?
4. Your agent API starts receiving 429 errors from the Anthropic API. What is the correct response?
5. In this architecture — Client → API Server → Claude API — what is the single point of failure?
6. When should you choose WebSocket over SSE for an agent API?
Summary
In this module, you learned to take an agent from a local script to a production API:
- Protocol choice: SSE is the sweet spot for most agent APIs — one-way streaming over plain HTTP, with WebSocket reserved for bidirectional needs.
- Containerization: Docker packages your agent with its dependencies into a portable image. Use slim base images, cached layers, multi-stage builds, non-root users, and never bake secrets.
- Cloud deployment: Cloud Run is the sweet spot — native streaming, generous timeouts, scale-to-zero, and pay-per-use. Lambda works for short tasks; VMs for always-on workloads.
- Scaling: Scale on concurrent requests (not CPU). Use queues to absorb traffic spikes. Handle rate limits with exponential backoff, jitter, and circuit breakers.
Next in M22: Cost Optimization, you will learn to control the biggest cost driver in agent systems — API token usage — and build dashboards to track spend per conversation, per user, and per feature.