M20: Monitoring & Continuous Improvement
Build dashboards, alerts, and feedback loops that keep your agent improving every week.
This is the final module of Track 6: Observability. In M19, you learned to instrument your agent with traces and structured logs. Now you will learn to act on that data — building dashboards that surface problems, alerts that wake the right people, feedback loops that capture user satisfaction, and deployment patterns that minimize blast radius when you ship changes.
Learning Objectives
- Build production monitoring dashboards covering latency, tokens, success rates, and drift
- Configure a tiered alerting system (P1/Page, P2/Ticket, P3/Info) that prevents alert fatigue
- Implement feedback loops at three speeds: real-time, daily, and weekly
- Deploy prompt and model changes safely using canary deployments and A/B tests
- Establish a continuous improvement culture with eval datasets, failure reviews, and version-controlled prompts
Production Monitoring Dashboards
Before: Imagine a hospital where nurses check on patients by asking "How do you feel?" once an hour and writing a single word — "fine" — on a clipboard. They have no heart rate, no blood pressure, no oxygen saturation. Pain: A patient's blood pressure could spike dangerously, and no one would know until the patient collapsed. The delay between problem and detection could be fatal. Mapping: A production monitoring dashboard is that bedside vital-signs monitor for your AI agent. It displays four critical signals — latency, token burn rate, success/failure rates, and behavioral drift — all on one screen, updating in real time. When any signal crosses a threshold, alarms fire immediately, not hours later when a user complains.
Here is what a single metric snapshot looks like in practice — one JSON object your monitoring system emits every minute:
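As a rough sketch of how such a snapshot could be produced, the Python below collapses one minute of per-request records into a single object. The field names (`latency_ms`, `output_chars`, and so on) and the nearest-rank percentile method are assumptions chosen for illustration, not a fixed schema.

```python
import json
import math
from datetime import datetime, timezone


def percentile(values, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def build_snapshot(requests):
    """Collapse one minute of per-request records into a single snapshot dict."""
    total = len(requests)
    latencies = [r["latency_ms"] for r in requests]
    successes = sum(1 for r in requests if r["success"])
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "request_count": total,
        # Quadrant 1: latency percentiles
        "latency_ms": {
            "p50": percentile(latencies, 50),
            "p95": percentile(latencies, 95),
            "p99": percentile(latencies, 99),
        },
        # Quadrant 2: token usage and cost
        "total_tokens": sum(r["tokens"] for r in requests),
        "cost_usd": round(sum(r["cost_usd"] for r in requests), 6),
        # Quadrant 3: success rate
        "success_rate": successes / total if total else 1.0,
        # Quadrant 4: a simple drift signal (average output length)
        "avg_output_chars": sum(r["output_chars"] for r in requests) / total if total else 0.0,
    }


# Hypothetical records for one minute of traffic.
sample = [
    {"latency_ms": 900, "tokens": 450, "cost_usd": 0.004, "success": True, "output_chars": 600},
    {"latency_ms": 1200, "tokens": 520, "cost_usd": 0.005, "success": True, "output_chars": 640},
    {"latency_ms": 1500, "tokens": 610, "cost_usd": 0.006, "success": True, "output_chars": 700},
    {"latency_ms": 8000, "tokens": 300, "cost_usd": 0.002, "success": False, "output_chars": 0},
]
print(json.dumps(build_snapshot(sample), indent=2))
```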
That single object powers all four dashboard quadrants. Your dashboard just reads these snapshots and plots them on charts.
A production monitoring dashboard is a real-time visualization layer that sits on top of your agent's traces and logs. It pulls raw data — timestamps, token counts, error codes — and turns them into human-readable charts and numbers.
The dashboard answers four questions at a glance. First: How fast is the agent responding? That is the latency category. Second: How much is it costing? That is token usage and cost. Third: How often is it succeeding? That is the success/failure rate. Fourth: Is its behavior changing over time? That is drift detection.
Each of those four questions maps to what we call a metric category — a group of related measurements that together paint a picture of one aspect of agent health. Think of each category as one vital sign on a hospital monitor: individually useful, but most powerful when viewed together.
The Four Metric Categories
Every production agent dashboard should have four quadrants, each tracking a different dimension of health:
- Latency (p50 / p95 / p99): Percentile latency tells you how fast your agent responds. p50 is the median response time (50% of requests are faster), p95 is the time within which 95% of requests finish, and p99 captures the slowest 1%: the tail latency that your most unlucky users experience. The p50 is the typical experience; the p99 is close to the worst-case experience. If your p50 is 1.2s but your p99 is 15s, one in a hundred requests takes 15 seconds or longer.
- Token Usage & Cost: Track tokens consumed per conversation, cost per request, and your burn rate: the rate at which you are consuming tokens (and therefore spending money) over time, measured in tokens/minute or cost/hour. A sudden spike often means a prompt regression is causing longer outputs or more tool calls, or that the agent is stuck in a loop where it keeps calling the LLM without terminating.
- Success / Failure Rates: The percentage of requests that complete successfully versus those that hit errors (API failures, guardrail blocks, timeouts, malformed output). Track both the overall rate and the rate per error type so you can distinguish between a spike in timeouts (infrastructure) versus a spike in guardrail blocks (prompt regression).
- Drift Detection: Drift is a gradual, often invisible change in an agent's behavior over time, even when no code has been deployed. It can happen because the model provider updates their weights, because your data sources change, or because user behavior shifts. Drift detection compares current metric distributions against a baseline to catch these slow-moving changes before they become outages. You track three things over rolling windows. First, output length distribution: are responses getting longer or shorter? Second, tool-call frequency: is the agent calling tools more or fewer times per request? Third, sentiment scores: is the tone of responses shifting? For example, a 20% increase in average output length over a week might indicate the model is becoming more verbose, possibly because the provider updated their model weights behind the scenes. You did not change anything, but your agent's behavior shifted anyway. That is drift, and it is dangerous precisely because nobody deployed a change to trigger an investigation.
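To make the drift check concrete, here is a minimal sketch that compares the mean of a current rolling window against a baseline window for each signal. Comparing means is a deliberate simplification (a real system might compare full distributions), and the signal names and the 20% threshold are illustrative assumptions borrowed from the example above.

```python
from statistics import mean


def relative_change(baseline, current):
    """Relative change of the current window's mean versus the baseline mean."""
    base = mean(baseline)
    if base == 0:
        raise ValueError("baseline mean is zero; relative change is undefined")
    return (mean(current) - base) / base


def detect_drift(baseline, current, threshold=0.20):
    """Return the signals whose mean moved by more than `threshold` in either direction."""
    drifted = {}
    for name, base_values in baseline.items():
        change = relative_change(base_values, current[name])
        if abs(change) > threshold:
            drifted[name] = round(change, 3)
    return drifted


# Hypothetical rolling windows: last month (baseline) versus this week (current).
baseline_window = {
    "output_length": [400, 420, 380],
    "tool_calls_per_request": [2.0, 2.0, 2.0],
}
current_window = {
    "output_length": [500, 520, 480],
    "tool_calls_per_request": [2.0, 2.0, 2.2],
}
print(detect_drift(baseline_window, current_window))
```

In this sample data, output length grew by 25% and is flagged, while the small change in tool-call frequency stays under the threshold.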
Without a unified dashboard, teams discover problems from angry users. A study by Datadog found that organizations with real-time dashboards detect incidents 12x faster than those relying on user reports, reducing mean-time-to-detection from 45 minutes to under 4 minutes. For an AI agent handling 10,000 requests/hour, 45 minutes of undetected elevated error rates means roughly 7,500 requests (10,000 × 0.75 hours) are served in a degraded state before anyone notices.
"I can just check the dashboard once a day." — Dashboards are for real-time visibility, not daily check-ins. If you only look at your dashboard during morning standup, you will miss the 2 PM latency spike that resolved by 3 PM. Combine dashboards with automated alerts (next section) so problems find you.
"p50 latency is the number that matters." — The p50 (median) is the typical experience, but p95 and p99 reveal the experience of your most frustrated users. A p50 of 1.2s with a p99 of 25s means 1 in 100 users waits 25 seconds. Those users are the most likely to abandon your agent.
"If my success rate is above 95%, everything is fine." — A 95% overall success rate can mask a 50% failure rate for a specific question category. Always break down success rates by error type and, if possible, by query category. Aggregate numbers hide category-level disasters.
You learned the four pillars of agent monitoring: latency percentiles (how fast), token cost (how expensive), success/failure breakdown (how reliable), and drift (how stable over time). Each pillar is a quadrant on your dashboard, and together they give you a complete picture of agent health.
Alerting: Pages vs Tickets
Before: Imagine every smoke alarm in your house had the same deafening siren, whether you burned toast, left a candle too close to a curtain, or had an actual kitchen fire. Every alarm sounds identical. Pain: After the third false alarm from burnt toast, you rip the batteries out. Now you have no protection against real fires. This is exactly what happens with alert fatigue in production systems: engineers receive so many alerts (most of them low-priority or false positives) that they start ignoring ALL alerts, including critical ones. It is the leading cause of delayed incident response in production systems, and the cure is strict severity tiering and threshold tuning. Mapping: A tiered alerting system is like having three types of alarms: a gentle chime for burnt toast (P3/Info: check it next week), a loud beep for a suspicious candle (P2/Ticket: fix it tomorrow morning), and a full siren for an actual fire (P1/Page: wake someone up NOW).
Here is what an alert rule definition actually looks like in a configuration file. Notice how each rule specifies its severity, threshold, and routing destination:
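One possible way to express such rules, sketched in Python rather than a specific alerting tool's config format. The rule names, metric keys, and route labels are hypothetical; the thresholds mirror the conservative starting points used in this module (50% error rate for P1, 10% for P2, 30-second p95 latency for P2). When several rules fire for the same metric, only the most severe one is kept so a P1 page is not accompanied by a redundant P2 ticket.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AlertRule:
    name: str
    metric: str       # key looked up in the metrics dict
    threshold: float  # rule fires when the metric is strictly above this value
    severity: str     # "P1", "P2", or "P3"
    route: str        # hypothetical destination label


SEVERITY_ORDER = {"P1": 0, "P2": 1, "P3": 2}

RULES = [
    AlertRule("error_rate_critical", "error_rate", 0.50, "P1", "pagerduty"),
    AlertRule("error_rate_elevated", "error_rate", 0.10, "P2", "jira"),
    AlertRule("p95_latency_high", "p95_latency_s", 30.0, "P2", "jira"),
    AlertRule("output_length_drift", "output_length_drift", 0.20, "P3", "weekly_digest"),
]


def evaluate(metrics):
    """Return fired rules, keeping only the most severe rule per metric."""
    fired = {}
    for rule in RULES:
        if metrics.get(rule.metric, 0.0) > rule.threshold:
            best = fired.get(rule.metric)
            if best is None or SEVERITY_ORDER[rule.severity] < SEVERITY_ORDER[best.severity]:
                fired[rule.metric] = rule
    return sorted(fired.values(), key=lambda r: SEVERITY_ORDER[r.severity])


for alert in evaluate({"error_rate": 0.60, "p95_latency_s": 12.0, "output_length_drift": 0.25}):
    print(f"{alert.severity} -> {alert.route}: {alert.name}")
```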
Tiered alerting is a system that classifies every alert into a severity level and routes it to the appropriate response channel. The goal is to ensure that critical problems get immediate human attention while non-urgent issues are batched for efficient review. The three standard tiers are:
- P1 / Page: Conditions that require immediate human response, typically within minutes. Examples: error rate exceeds 50%, data breach detected, agent is completely unresponsive. Delivery: PagerDuty, phone call, SMS. Wakes people up at night.
- P2 / Ticket: Conditions that need attention during the next business day. Examples: error rate climbs above 10%, p95 latency exceeds 30 seconds, cost anomaly detected. Delivery: Jira ticket, Slack channel alert.
- P3 / Info: Trends and observations that should be reviewed periodically. Examples: drift detected in output length, gradual increase in token usage, new error types appearing at low frequency. Delivery: weekly email digest, dashboard annotation.
Many teams set all thresholds too aggressively at launch ("alert me if error rate exceeds 1%"). This creates a flood of P1 pages that erodes trust in the alerting system. Start with conservative thresholds (50% for P1, 10% for P2) and tighten them gradually as your system stabilizes. It is always safer to have a few missed low-severity alerts than to train your team to ignore all alerts.
You learned the three alert tiers: P1/Page for emergencies requiring immediate response, P2/Ticket for next-business-day issues, and P3/Info for weekly trend reviews. The key insight is that alert fatigue — caused by too many noisy alerts — is more dangerous than missing a P3, because it causes teams to ignore P1 pages too.
Feedback Loops
Before: Imagine a restaurant that serves hundreds of meals per day but never reads its online reviews, never asks diners if they enjoyed the food, and never tracks which dishes get sent back to the kitchen. The chef cooks the same menu year after year, convinced everything is great. Pain: Slowly, customers stop coming. The chef has no idea that the risotto has been underseasoned for months, or that a new competitor across the street is doing it better. By the time revenue drops enough to notice, the restaurant's reputation is ruined. Mapping: An AI agent without feedback loops is that restaurant. Users may be getting subtly wrong answers, hitting confusing edge cases, or finding the agent unhelpful — but without systematic feedback collection, the team has no signal to improve. Feedback loops at three speeds (real-time, daily, weekly) ensure you catch problems at every timescale.
Here is what a single feedback record looks like when a user clicks the thumbs-down button. This JSON gets attached to the trace in your observability platform:
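A minimal sketch of such a record, built in Python and serialized to JSON. The `trace_id`, `score` (1 = thumbs up, 0 = thumbs down), and `comment` fields follow the feedback endpoint later in this module; `failure_category` and `created_at` are assumed extra fields added for illustration.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FeedbackRecord:
    trace_id: str
    score: int                               # 1 = thumbs up, 0 = thumbs down
    comment: Optional[str] = None
    failure_category: Optional[str] = None   # assumed field, filled in during triage
    created_at: str = ""                     # assumed field, ISO 8601 UTC timestamp

    def __post_init__(self):
        if self.score not in (0, 1):
            raise ValueError("score must be 0 or 1")
        if not self.created_at:
            self.created_at = datetime.now(timezone.utc).isoformat()

    def to_json(self):
        return json.dumps(asdict(self), indent=2)


# Hypothetical thumbs-down click on one trace.
record = FeedbackRecord(trace_id="trace_4f9a12bc", score=0, comment="Showed the wrong order status")
print(record.to_json())
```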
And here is what the daily aggregation report looks like — a script groups the previous 24 hours of negative feedback by failure category:
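The aggregation step might look something like the sketch below. It assumes each feedback record is a dict with `trace_id`, `score`, and an optional `failure_category` assigned during triage; those names and the category labels are illustrative, not a required schema.

```python
from collections import Counter, defaultdict


def aggregate_failures(records, examples_per_category=2):
    """Group negative feedback (score == 0) by failure category."""
    negatives = [r for r in records if r["score"] == 0]
    counts = Counter(r.get("failure_category") or "unclassified" for r in negatives)

    # Keep a few representative trace IDs per category for morning triage.
    examples = defaultdict(list)
    for r in negatives:
        category = r.get("failure_category") or "unclassified"
        if len(examples[category]) < examples_per_category:
            examples[category].append(r["trace_id"])

    total = len(negatives)
    return {
        "total_negative": total,
        "categories": [
            {
                "category": category,
                "count": count,
                "share": round(count / total, 3),
                "example_trace_ids": examples[category],
            }
            for category, count in counts.most_common()
        ],
    }


# Hypothetical feedback collected over the previous 24 hours.
yesterday = [
    {"trace_id": "t-001", "score": 0, "failure_category": "wrong_order"},
    {"trace_id": "t-002", "score": 1},
    {"trace_id": "t-003", "score": 0, "failure_category": "wrong_order"},
    {"trace_id": "t-004", "score": 0, "failure_category": "timeout"},
]
report = aggregate_failures(yesterday)
print(report)
```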
A feedback loop is a system that collects signals about agent performance, aggregates them, and routes them to processes that improve the agent. The three speeds are:
- Real-time (seconds): Thumbs up/down buttons on every agent response. Each click is recorded as a score attached to the trace in your observability platform. This gives you immediate signal on individual responses.
- Daily (hours): Automated aggregation of failures from the previous 24 hours. A script groups failures by error type, extracts representative examples, and posts a summary to a Slack channel or dashboard. Engineers triage the top failures each morning.
- Weekly (days): A scheduled process that reviews the week's low-scoring responses, selects the most informative ones, and adds them to your evaluation dataset. This is how your test suite grows organically from real production failures.
[Diagram: feedback cycle. Thumbs up/down → daily failures → fix prompts/tools → weekly dataset, then the cycle repeats.]
Teams that implement all three feedback speeds see a 40-60% reduction in repeated failures within the first month. The real-time signal catches acute problems (a broken tool, a bad prompt change), the daily aggregation surfaces systemic issues (a category of questions the agent consistently fumbles), and the weekly eval growth prevents regressions by encoding every fix as a test case. Without the weekly loop, you fix bugs but never prove they stay fixed.
"We only need real-time feedback — daily and weekly are redundant." — Real-time feedback (thumbs down) tells you something went wrong, but not why. Daily aggregation reveals patterns ("23% of failures are wrong-order errors"), and weekly eval growth encodes fixes as permanent test cases. Each speed answers a different question: what broke, what pattern keeps breaking, and did we actually fix it?
"Low feedback response rate means the data is useless." — Even a 5% response rate on 10,000 daily requests yields 500 data points per day. At that volume, failure category patterns become statistically meaningful within a single day. The trick is aggregation and classification, not raw volume.
You learned the three speeds of feedback: real-time thumbs up/down for immediate signal, daily failure aggregation for triage, and weekly eval dataset growth for long-term quality. The circular nature is critical — each improvement becomes a test case, which validates future changes, which generates new feedback.
A/B Testing & Canary Deployments
Before: Imagine an airline that develops a new, more efficient jet engine. Instead of testing it on one plane first, they ground the entire fleet overnight and replace every engine on every aircraft simultaneously. Pain: If the new engine has a defect, every single plane is affected. There are no working aircraft to fall back on. Passengers are stranded, and the airline faces a catastrophic safety incident. Mapping: Deploying a new prompt or model version to 100% of your agent's traffic is the same gamble. A canary deployment sends just 1-5% of traffic to the new version while the remaining traffic continues to use the current stable version. The name comes from the "canary in a coal mine": a small, early-warning signal of danger. If the canary shows elevated errors or degraded metrics, traffic is rolled back (ideally automatically) with minimal user impact. If it looks healthy, you gradually increase traffic to 10%, 25%, 50%, and finally 100%.
Here is what a canary deployment configuration actually looks like. This JSON controls the traffic split and the auto-rollback thresholds:
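As an illustration, the sketch below expresses such a configuration as a Python dict and adds a deterministic traffic splitter. The key names, version tags, and the hash-bucket routing are assumptions made for this example; the 5% split, 5-point error-rate delta, and soak period follow the numbers used in this section.

```python
import hashlib

# Hypothetical configuration: key names and version tags are illustrative.
CANARY_CONFIG = {
    "stable_version": "v2.4.0",
    "canary_version": "v2.4.1",
    "canary_traffic_percent": 5,
    "auto_rollback": {
        "max_error_rate_delta": 0.05,  # canary may exceed control by at most 5 points
        "soak_minutes": 60,
    },
}


def pick_version(request_id, config=CANARY_CONFIG):
    """Deterministically route a request to the canary or the stable version.

    Hashing the request ID gives each request a stable bucket in [0, 100),
    so retries of the same request always land on the same version.
    """
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100
    if bucket < config["canary_traffic_percent"]:
        return config["canary_version"]
    return config["stable_version"]


canary_hits = sum(pick_version(f"req-{i}") == "v2.4.1" for i in range(10_000))
print(f"canary share: {canary_hits / 10_000:.1%}")
```

Hashing a user or session ID instead of a request ID would keep each user on one version for the whole test, which may be preferable when comparing user feedback between the two groups.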
A canary deployment is a risk-mitigation strategy: send a tiny percentage of traffic (typically 1-5%) to the new version, monitor for errors, and auto-rollback if metrics degrade. Its goal is safety — you are asking "does this change break anything?" before exposing it to all users.
An A/B test is an experiment: send a statistically meaningful split of traffic (often 50/50) to two versions and measure which one performs better on a specific metric such as user satisfaction, task completion rate, or cost per conversation. Its goal is learning — you are asking "which version is better?" The key difference is intent: a canary protects users from bad changes, while an A/B test generates data to inform future decisions.
In practice, you often start with a canary (1-5% to check for crashes and regressions) and then expand to an A/B test (50/50 to measure quality). If you skip the canary phase and go straight to 50/50, you risk exposing half your users to a broken change. If you only run canaries and never A/B test, you know your changes are safe but never know whether they are actually better. The two patterns are complementary, not interchangeable.
Traditional A/B tests in web development might need 1,000 samples to reach statistical significance. Agent A/B tests often need 3-5x more samples because LLM outputs are non-deterministic — the same input can produce different outputs on consecutive calls. This variance inflates confidence intervals. Plan for larger sample sizes and longer test durations than you would for a button-color test.
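To see why sample size matters, consider a standard two-proportion z-test (a general statistical tool, not something specific to this module or to LLMs). The sketch below applies it to a hypothetical task-completion metric: the same 5-point improvement is indistinguishable from noise at 100 samples per arm but clearly significant at 1,000 per arm.

```python
import math


def two_proportion_z_test(success_a, n_a, success_b, n_b):
    """Return (z, two-sided p-value) for the difference between two success rates."""
    p_a = success_a / n_a
    p_b = success_b / n_b
    pooled = (success_a + success_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0, 1.0
    z = (p_b - p_a) / se
    # Two-sided p-value under the normal approximation: 2 * (1 - Phi(|z|)).
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Hypothetical results: version A completes 80% of tasks, version B 85%.
for n in (100, 1000):
    z, p = two_proportion_z_test(int(0.80 * n), n, int(0.85 * n), n)
    print(f"n={n} per arm: z={z:.2f}, p={p:.4f}")
```

This test treats each request as an independent pass/fail outcome. Run-to-run variance from non-deterministic outputs makes individual scores noisier, which is one reason to budget for more samples than this calculation alone suggests.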
Canary Deployment Flow
- Deploy canary: Route 1-5% of traffic to the new version
- Monitor: Compare error rate, latency p95, and user feedback between canary and control
- Auto-rollback gate: If canary error rate exceeds control by more than 5 percentage points, automatically roll back
- Promote: If canary passes for the soak period (1-4 hours), increase traffic to 25%, then 50%, then 100%
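The gate and promotion steps above can be sketched as two small functions. The function names, the `min_canary_requests` guard, and the 5% starting step are assumptions for illustration; the 5-percentage-point rollback delta and the 25/50/100 ladder come from the flow above.

```python
TRAFFIC_STEPS = [5, 25, 50, 100]  # percent of traffic on the new version


def canary_decision(control_errors, control_total, canary_errors, canary_total,
                    max_delta=0.05, min_canary_requests=200):
    """Return "wait", "rollback", or "promote" for the current soak window."""
    # Too little data to judge: keep soaking instead of deciding on noise.
    if control_total == 0 or canary_total < min_canary_requests:
        return "wait"
    control_rate = control_errors / control_total
    canary_rate = canary_errors / canary_total
    if canary_rate - control_rate > max_delta:
        return "rollback"
    return "promote"


def next_traffic_percent(current_percent):
    """Next step on the promotion ladder, capped at 100%."""
    for step in TRAFFIC_STEPS:
        if step > current_percent:
            return step
    return 100


# Hypothetical soak window: control at 2% errors, canary at 10% errors.
print(canary_decision(control_errors=20, control_total=1000, canary_errors=30, canary_total=300))
```

A production gate would typically also compare p95 latency and user feedback, as the Monitor step describes; this sketch checks only error rate.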
You learned the difference between a canary deployment (safety — 1-5% traffic, auto-rollback) and an A/B test (learning — 50/50 split, measure quality). You also learned that agent A/B tests need more samples than traditional web tests because LLM outputs are non-deterministic. The key pattern: always start with a canary before expanding to a full A/B test.
Continuous Improvement Culture
Before: Imagine a professional basketball team that plays games but never watches game film afterward. They never analyze which plays worked, which defensive rotations failed, or which opponent strategies surprised them. They just show up for the next game and hope for the best. Pain: They keep making the same mistakes — turning the ball over against full-court presses, missing the same defensive switch. Teams that do review film identify these patterns and drill fixes. Over a season, the gap between film-reviewers and non-reviewers becomes enormous. Mapping: Continuous improvement for AI agents follows the same playbook: systematically review failures (game film), identify patterns (scouting report), implement fixes (practice), and verify them with evals (scrimmage). Teams that do this weekly compound improvements; teams that only react to crises plateau.
Here is what a weekly improvement ticket looks like in practice. This is the kind of structured record your team creates during each Friday review session:
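One plausible shape for such a ticket, sketched as a Python dataclass. All field names and the sample values are hypothetical; the four root-cause labels follow the classification used in the weekly failure review (prompt, tool, data, or model limitation).

```python
import json
from dataclasses import asdict, dataclass, field

ROOT_CAUSES = {"prompt", "tool", "data", "model_limitation"}


@dataclass
class ImprovementTicket:
    week: str                     # e.g. an ISO week label
    failure_category: str
    root_cause: str               # one of ROOT_CAUSES
    example_trace_ids: list = field(default_factory=list)
    proposed_fix: str = ""
    eval_cases_added: int = 0
    status: str = "open"

    def __post_init__(self):
        if self.root_cause not in ROOT_CAUSES:
            raise ValueError(f"root_cause must be one of {sorted(ROOT_CAUSES)}")


# Hypothetical ticket created during a Friday review session.
ticket = ImprovementTicket(
    week="2025-W14",
    failure_category="wrong_order",
    root_cause="tool",
    example_trace_ids=["t-001", "t-003"],
    proposed_fix="Order lookup tool should match on order ID, not customer name",
    eval_cases_added=2,
)
print(json.dumps(asdict(ticket), indent=2))
```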
The Five Pillars of Continuous Improvement
Continuous improvement is the practice of systematically making your agent better over time through structured processes, not ad-hoc heroics. Unlike traditional software where you ship a feature and move on, an AI agent's quality is a moving target — user expectations shift, data changes, and model providers update their weights. Without a repeatable improvement process, your agent's quality will degrade even if you never change a line of code.
How does it work internally? The process is a closed loop with five stages. Each week, you run automated scoring on production responses to measure quality. You pull the worst-performing responses and have humans classify the failures. Those classified failures become new test cases in your evaluation dataset. Your team then fixes the underlying issues — prompt edits, tool improvements, or guardrail changes — and verifies the fixes pass the new eval cases. Finally, you deploy the fixes via canary and track whether eval scores actually improved. The loop then repeats.
If you have worked in traditional software engineering, continuous improvement might sound like "just fixing bugs." The difference is scale and non-determinism. In traditional software, a bug is deterministic: the same input always produces the same wrong output. In an AI agent, the same input might produce the right answer 90% of the time and the wrong answer 10% of the time. You cannot just "fix the bug" — you need to improve the probability distribution. That requires statistical thinking. First, you track scores over time to spot trends. Second, you run evaluations on representative datasets — not just a handful of cherry-picked examples. Third, you measure whether your changes moved the needle across hundreds of test cases, not just one. In other words, you are managing a probability distribution, not patching a single code path.
- Auto-Scoring: Run automated evaluations (from M18) on every production response, not just test data. Score for correctness, format compliance, and safety. This turns production into a continuous evaluation environment.
- Weekly Failure Review: Every week, pull the lowest-scoring 5% of production responses. Have a human review them and classify failures: was it a prompt problem, a tool problem, a data problem, or a model limitation? This classification drives targeted fixes.
- Growing Eval Datasets: Every classified failure becomes a new test case. If the agent mishandled a shipping-status question, that exact input/expected-output pair gets added to the eval suite. Over time, your eval dataset becomes a comprehensive map of every failure mode your agent has encountered.
- Version-Controlled Prompts: Store prompts in version control (Git) alongside your code. Every prompt change gets a PR, a review, and an eval run before merging. This prevents "prompt drift" — where ad-hoc edits accumulate without documentation.
- Improvement Velocity Tracking: Track how quickly you close the loop: time from failure detection to fix deployed. Measure weekly eval score trends. The goal is not perfection — it is consistent, measurable improvement.
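The review-and-grow steps in the pillars above can be sketched in a few lines. This is a simplified illustration: the record fields (`input`, `score`), the eval-case shape, and the dedupe-by-input rule are assumptions, and the human classification step is represented only by the `root_cause` argument.

```python
import math


def lowest_scoring(responses, percent=5):
    """Return the lowest-scoring `percent` of responses (at least one), worst first."""
    if not responses:
        return []
    k = max(1, math.ceil(len(responses) * percent / 100))
    return sorted(responses, key=lambda r: r["score"])[:k]


def to_eval_case(response, expected_output, root_cause):
    """Turn a human-reviewed failure into a regression test case."""
    return {
        "input": response["input"],
        "expected_output": expected_output,
        "root_cause": root_cause,  # prompt, tool, data, or model limitation
    }


def add_to_dataset(dataset, case):
    """Append the case unless the same input is already covered. Returns True if added."""
    if any(existing["input"] == case["input"] for existing in dataset):
        return False
    dataset.append(case)
    return True


# Hypothetical week of auto-scored production responses.
week = [{"input": f"question {i}", "score": i / 100} for i in range(100)]
worst = lowest_scoring(week)
eval_dataset = []
for response in worst:
    add_to_dataset(eval_dataset, to_eval_case(response, "reviewed answer", "prompt"))
print(len(worst), len(eval_dataset))
```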
Teams that implement weekly failure reviews and growing eval datasets see their agent's eval scores improve by 2-5 percentage points per month. Over six months, that is a 12-30 point improvement — often the difference between an agent that users tolerate and one they love. The compounding effect is powerful: each fix prevents a class of failures, which means the team can focus on progressively harder problems instead of re-fighting the same battles.
"Once my agent works, I don't need monitoring — it's just an API call." — An LLM-based agent is fundamentally different from a traditional API. The model provider can update weights at any time, changing your agent's behavior without any code change on your side. User input patterns shift seasonally. Data sources go stale. Without monitoring, these changes are invisible until users start complaining — or worse, silently leaving.
"More alerts = better monitoring." — The opposite is true. Alert fatigue is the number one killer of effective monitoring. A team with 50 noisy alerts that all get ignored has worse monitoring than a team with 5 well-tuned alerts that always get acted on. Quality of alerts matters far more than quantity.
"User feedback is too noisy to be useful." — Individual thumbs-down clicks are noisy, yes. But when you aggregate hundreds of them daily and classify them by failure type, clear patterns emerge. The trick is aggregation and classification, not ignoring the signal. Even a 5% feedback response rate gives you thousands of data points per month at scale.
"We can just re-run our eval suite instead of monitoring production." — Eval suites test known scenarios with fixed inputs. Production traffic contains inputs your eval suite has never seen — novel phrasings, edge cases, adversarial probes. Eval suites tell you whether known problems are fixed; production monitoring tells you whether unknown problems exist. You need both.
"Canary deployments are overkill for prompt changes — it's just text." — A single word change in a system prompt can completely alter an agent's behavior for certain input categories. We have seen a prompt change that improved overall quality by 3% but caused a 40% regression for a specific question type. Without a canary phase, that regression would have hit 100% of users for hours before anyone noticed.
You learned the five pillars of continuous improvement: auto-scoring production responses, weekly failure reviews, growing eval datasets from real failures, version-controlling prompts, and tracking improvement velocity. The key insight is that improvement is a process, not an event — and the process must be systematic, not ad-hoc.
Agent Versioning and Rollback
When you update a system prompt, swap a tool implementation, or change routing logic, your agent's behavior shifts — sometimes in ways that only surface under real traffic. Agent versioning treats every combination of prompt text, tool configuration, and model parameters as an immutable, tagged release, just like a software version. This gives you a concrete target to roll back to when something goes wrong.
Canary Deployments for Agents
Before promoting a new agent version to all users, route a small slice of traffic — typically 5% — to the candidate version while the remaining 95% stays on the proven stable release. Monitor the canary's error rate, latency, and eval scores in real time. If any metric crosses a threshold (for example, error rate exceeds 3% or eval score drops below 85%), an automated rollback sends 100% of traffic back to the stable version within seconds. Only after the canary survives a soak period — usually 30–60 minutes of clean metrics — do you gradually ramp it to 25%, 50%, and finally 100%.
Feature Flags for Agent Behavior
Feature flags let you toggle individual capabilities without redeploying. You can enable a new tool for 10% of requests, activate a revised system prompt section for internal testers only, or disable a problematic guardrail instantly. Flags decouple deployment from release: the code is live in production, but the behavior is gated behind a flag you flip in your configuration store. This is especially powerful for agents because a single prompt clause can dramatically change output quality for certain input categories.
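A minimal in-memory sketch of the idea follows. In practice the flags would live in a configuration store; the flag names, the `Flag` fields, and the hash-based percentage rollout are all assumptions made for illustration.

```python
import hashlib
from dataclasses import dataclass, field


@dataclass
class Flag:
    enabled: bool = False                             # master switch
    rollout_percent: int = 0                          # share of users who get the behavior
    allow_groups: set = field(default_factory=set)    # groups that always get it


# Hypothetical flags mirroring the three examples above.
FLAGS = {
    "new_search_tool": Flag(enabled=True, rollout_percent=10),
    "revised_prompt_section": Flag(enabled=True, allow_groups={"internal_testers"}),
    "strict_guardrail": Flag(enabled=False),  # switched off without redeploying
}


def is_enabled(flag_name, user_id, group=""):
    """Decide whether a flag-gated behavior is active for this user."""
    flag = FLAGS.get(flag_name)
    if flag is None or not flag.enabled:
        return False
    if group in flag.allow_groups:
        return True
    # Hash flag name plus user ID so each flag gets an independent, stable bucket.
    digest = hashlib.sha256(f"{flag_name}:{user_id}".encode("utf-8")).hexdigest()
    return int(digest, 16) % 100 < flag.rollout_percent


print(is_enabled("revised_prompt_section", "user-7", group="internal_testers"))
```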
Rollback Strategy and Version Tagging
Every deployment should be tagged with a version identifier (for example, v2.4.1) that is recorded alongside its eval results in your observability platform. When you detect a regression, you do not debug under pressure — you revert to the last known-good tag and investigate later. Store the mapping of version → eval scores → deployment timestamp so that any team member can answer "what changed?" in under a minute. The rollback itself should be a single command or API call, not a manual process.
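The version-to-eval-score mapping and the "revert to last known-good" rule could be sketched as follows. The `Release` and `ReleaseRegistry` names, the sample tags, and the 0.85 minimum eval score (borrowed from the canary threshold example earlier) are illustrative assumptions; the actual traffic switch is left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Release:
    tag: str
    eval_score: float   # fraction of eval cases passed, 0.0 to 1.0
    deployed_at: str    # ISO 8601 timestamp


class ReleaseRegistry:
    """Keeps releases in deployment order so 'what changed?' is one lookup away."""

    def __init__(self, min_eval_score=0.85):
        self.min_eval_score = min_eval_score
        self._releases = []

    def record(self, release):
        self._releases.append(release)

    def last_known_good(self, before_tag) -> Optional[Release]:
        """Most recent release deployed before `before_tag` that met the eval bar."""
        tags = [r.tag for r in self._releases]
        if before_tag not in tags:
            raise KeyError(f"unknown release tag: {before_tag}")
        for release in reversed(self._releases[: tags.index(before_tag)]):
            if release.eval_score >= self.min_eval_score:
                return release
        return None


# Hypothetical deployment history.
registry = ReleaseRegistry()
registry.record(Release("v2.3.0", 0.91, "2025-03-01T10:00:00Z"))
registry.record(Release("v2.4.0", 0.80, "2025-03-08T10:00:00Z"))
registry.record(Release("v2.4.1", 0.78, "2025-03-09T10:00:00Z"))
target = registry.last_known_good("v2.4.1")
print(f"roll back to {target.tag}")
```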
Without versioning, a bad prompt edit can silently degrade your agent for hours before anyone notices. With tagged versions and automated canary gates, the blast radius of a bad change shrinks from "every user for hours" to "5% of users for minutes." Teams that adopt this pattern report 70% fewer production incidents caused by agent updates.
You learned three interlocking practices: canary deployments limit exposure to new versions, feature flags give you fine-grained control over agent behavior, and version tagging tied to eval results lets you roll back confidently. Together, they make agent updates as safe and reversible as traditional software releases.
Code Walkthrough
Let us build the monitoring, feedback, alerting, and A/B testing systems we have been discussing. All examples use Langfuse as the observability backend.
1. Metric Collection with Langfuse Scores
Let's start with the foundation: recording metrics. Every chart on your dashboard is ultimately an aggregation of individual scores — latency numbers, token counts, success/failure flags — attached to each trace. The function below wraps a standard Claude API call and records all of these as Langfuse scores after each response. The critical design decision here is the try/except around the score-recording block. Why? Because an observability outage should never break your agent. If Langfuse is down, your users should still get answers — you just lose visibility for that period. This is a principle you will see throughout production monitoring: the monitoring system is a passenger, not the driver.
import time

import anthropic
from langfuse import Langfuse

langfuse = Langfuse()
client = anthropic.Anthropic()


def run_agent_with_metrics(user_message: str, trace_id: str) -> dict:
    """Run the agent and record production metrics as Langfuse scores."""
    trace = langfuse.trace(id=trace_id, name="agent-request")
    start = time.time()
    try:
        # --- LLM Call ---
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}],
        )
        latency_ms = (time.time() - start) * 1000
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens

        # Cost estimation (Claude Sonnet pricing as of 2025).
        # Computed before the scoring block so the return value below
        # never depends on Langfuse being reachable.
        cost = (input_tokens * 3.0 / 1_000_000) + (output_tokens * 15.0 / 1_000_000)

        # --- Record Metrics as Scores ---
        # WHAT: Attach numeric scores to the trace for dashboard aggregation.
        # WHY: Langfuse scores power dashboard charts, alerting thresholds, and drift detection.
        # GOTCHA: Use try/except so a Langfuse outage never crashes your agent.
        try:
            trace.score(name="latency_ms", value=latency_ms)
            trace.score(name="input_tokens", value=input_tokens)
            trace.score(name="output_tokens", value=output_tokens)
            trace.score(name="total_tokens", value=input_tokens + output_tokens)
            trace.score(name="success", value=1)
            trace.score(name="cost_usd", value=round(cost, 6))
        except Exception as obs_err:
            # Observability failure must NEVER break the agent
            print(f"[WARN] Failed to record metrics: {obs_err}")

        return {
            "response": response.content[0].text,
            "latency_ms": round(latency_ms, 1),
            "tokens": input_tokens + output_tokens,
            "cost_usd": round(cost, 6),
        }
    except Exception as e:
        # The agent call itself failed: record the failure, then re-raise.
        latency_ms = (time.time() - start) * 1000
        try:
            trace.score(name="latency_ms", value=latency_ms)
            trace.score(name="success", value=0)
            # Scores are numeric, so the error class name goes in the comment.
            trace.score(name="error_type", value=0, comment=str(type(e).__name__))
        except Exception:
            pass
        raise
import Anthropic from "@anthropic-ai/sdk";
import { Langfuse } from "langfuse";

const langfuse = new Langfuse();
const anthropic = new Anthropic();

async function runAgentWithMetrics(userMessage, traceId) {
  /** Run the agent and record production metrics as Langfuse scores. */
  const trace = langfuse.trace({ id: traceId, name: "agent-request" });
  const start = Date.now();
  try {
    // --- LLM Call ---
    const response = await anthropic.messages.create({
      model: "claude-sonnet-4-6",
      max_tokens: 1024,
      messages: [{ role: "user", content: userMessage }],
    });
    const latencyMs = Date.now() - start;
    const inputTokens = response.usage.input_tokens;
    const outputTokens = response.usage.output_tokens;

    // --- Record Metrics as Scores ---
    // WHAT: Attach numeric scores to the trace for dashboard aggregation.
    // WHY: Langfuse scores power dashboard charts and alerting thresholds.
    // GOTCHA: Wrap in try/catch so observability failures never crash the agent.
    try {
      trace.score({ name: "latency_ms", value: latencyMs });
      trace.score({ name: "input_tokens", value: inputTokens });
      trace.score({ name: "output_tokens", value: outputTokens });
      trace.score({ name: "total_tokens", value: inputTokens + outputTokens });
      trace.score({ name: "success", value: 1 });
      const cost = (inputTokens * 3.0) / 1_000_000 + (outputTokens * 15.0) / 1_000_000;
      trace.score({ name: "cost_usd", value: Math.round(cost * 1_000_000) / 1_000_000 });
    } catch (obsErr) {
      console.warn(`[WARN] Failed to record metrics: ${obsErr.message}`);
    }

    return {
      response: response.content[0].text,
      latencyMs,
      tokens: inputTokens + outputTokens,
    };
  } catch (err) {
    const latencyMs = Date.now() - start;
    try {
      trace.score({ name: "latency_ms", value: latencyMs });
      trace.score({ name: "success", value: 0 });
      trace.score({ name: "error_type", value: 0, comment: err.constructor.name });
    } catch (_) {}
    throw err;
  }
}
You instrumented an agent call to record latency, token count, cost, and success/failure as Langfuse scores. Every score is attached to a trace, making it queryable for dashboards and alerting. The try/except wrapper ensures observability never breaks the agent.
2. User Feedback Endpoint
Next, we need a way for users to tell us when the agent got it wrong (or right). This REST endpoint accepts thumbs up/down clicks and attaches them to the original trace in Langfuse. User feedback is the most valuable signal you can collect — metrics can tell you that latency spiked, but only a human can tell you "this answer was completely wrong even though it returned in 1.2 seconds." One subtle but important detail: notice the trace ID validation with a regex pattern. Without this, an attacker could POST fake scores to your observability platform, poisoning your dashboard data. Always validate external input, even for feedback endpoints.
from flask import Flask, request, jsonify
from langfuse import Langfuse
import re

app = Flask(__name__)
langfuse = Langfuse()

# WHAT: Validate trace IDs to prevent injection of fake feedback.
# WHY: Without validation, anyone could POST fake scores to manipulate your metrics.
TRACE_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{8,64}$")


@app.route("/api/feedback", methods=["POST"])
def submit_feedback():
    """Accept user thumbs-up/down feedback and attach to the trace."""
    # silent=True returns None (instead of raising) when the body is not valid JSON.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    # --- Input Validation ---
    trace_id = data.get("trace_id", "")
    score = data.get("score")  # 1 = thumbs up, 0 = thumbs down
    comment = data.get("comment") or ""
    # fullmatch (not match) so an ID followed by a trailing newline is rejected too.
    if not isinstance(trace_id, str) or not TRACE_ID_PATTERN.fullmatch(trace_id):
        return jsonify({"error": "Invalid trace_id format"}), 400
    # bool is a subclass of int in Python, so exclude True/False explicitly.
    if isinstance(score, bool) or score not in (0, 1):
        return jsonify({"error": "Score must be 0 or 1"}), 400
    if not isinstance(comment, str) or len(comment) > 500:
        return jsonify({"error": "Comment too long (max 500 chars)"}), 400

    # --- Record Feedback ---
    # WHAT: Attach user feedback as a score on the original trace.
    # WHY: This links subjective quality signal to the full trace context.
    # GOTCHA: Always record the feedback even if you suspect it is spam;
    # filter during analysis, not collection.
    try:
        langfuse.score(
            trace_id=trace_id,
            name="user_feedback",
            value=score,
            comment=comment[:500] if comment else None,
        )
        return jsonify({"status": "recorded"}), 200
    except Exception as e:
        # Log but don't expose internal errors to the client
        app.logger.error(f"Failed to record feedback: {e}")
        return jsonify({"status": "recorded"}), 200  # Still return 200
import express from "express";
import { Langfuse } from "langfuse";

const app = express();
app.use(express.json());
const langfuse = new Langfuse();
const TRACE_ID_PATTERN = /^[a-zA-Z0-9_-]{8,64}$/;

app.post("/api/feedback", async (req, res) => {
  /** Accept user thumbs-up/down feedback and attach to the trace. */
  // Fall back to {} so a missing body produces a 400 below instead of a crash.
  const body = req.body ?? {};
  const { trace_id, score } = body;
  const comment = body.comment ?? "";

  // --- Input Validation ---
  if (typeof trace_id !== "string" || !TRACE_ID_PATTERN.test(trace_id)) {
    return res.status(400).json({ error: "Invalid trace_id format" });
  }
  if (score !== 0 && score !== 1) {
    return res.status(400).json({ error: "Score must be 0 or 1" });
  }
  if (typeof comment !== "string" || comment.length > 500) {
    return res.status(400).json({ error: "Comment too long (max 500 chars)" });
  }

  // --- Record Feedback ---
  // WHAT: Attach user feedback as a score on the original trace.
  // WHY: This links subjective quality signal to the full trace context.
  // GOTCHA: Always record the feedback; filter spam during analysis, not collection.
  try {
    langfuse.score({
      traceId: trace_id,
      name: "user_feedback",
      value: score,
      comment: comment.slice(0, 500) || undefined,
    });
    return res.json({ status: "recorded" });
  } catch (err) {
    console.error(`Failed to record feedback: ${err.message}`);
    return res.json({ status: "recorded" }); // Still return 200
  }
});

app.listen(3000, () => console.log("Feedback API on :3000"));
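If you want to see exactly what the trace ID pattern accepts and rejects, the following sketch runs it against a few made-up inputs. The helper name is_valid_trace_id and the sample IDs are hypothetical. One deliberate choice: the sketch uses fullmatch, because in Python re.match with a pattern ending in $ would still accept an ID followed by a single trailing newline.

```python
import re

TRACE_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{8,64}$")


def is_valid_trace_id(value: object) -> bool:
    """Return True only for strings of 8-64 letters, digits, '_' or '-'."""
    # fullmatch requires the whole string to match, including any trailing newline.
    return isinstance(value, str) and TRACE_ID_PATTERN.fullmatch(value) is not None


# Made-up inputs: one well-formed ID and several malformed ones.
samples = [
    "tr_1a2b3c4d",              # valid: 11 allowed characters
    "short",                    # too short
    "tr_1a2b3c4d; DROP TABLE",  # contains disallowed characters
    "x" * 65,                   # too long
    12345678,                   # not a string
    "tr_1a2b3c4d\n",            # trailing newline
]
for sample in samples:
    print(repr(sample), is_valid_trace_id(sample))
```

Only the first sample passes. Keep in mind that this check is about shape, not ownership: a well-formed ID can still belong to someone else's trace.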
3. Alerting Module with Configurable Thresholds
Now let's build the alerting system that turns those metrics into actionable notifications. The key architectural choice here is separating the alert rules from the alert routing. Each rule is a small object with a name, a severity level, a condition function, and a message template. The evaluate_alerts function loops through all rules, checks which ones fire against the current metrics, and returns a list of fired alerts. The route_alert function then sends each alert to the right channel based on severity. Why separate them? Because you will be adding and tuning alert rules constantly as your system matures, and you do not want to touch the routing code every time you adjust a threshold.
One subtle gotcha: in production, you would also add hysteresis. Hysteresis means requiring a condition to persist for N consecutive checks before firing an alert, so one-off spikes do not generate noise. Think of it like a smoke detector that waits for 10 seconds of continuous smoke before sounding: a brief puff from opening the oven does not trigger it, but a real fire does. Without hysteresis, a 2-second latency spike can trigger a P2 ticket that resolves before anyone reads it. An alert that fires and clears repeatedly like this is said to be "flapping," and flapping erodes trust in your alerting system. In practice, you would add a consecutive_failures counter to each rule and only fire when it reaches a threshold like 3 or 5.
from dataclasses import dataclass
from enum import Enum
from typing import Callable


class Severity(Enum):
    P1_PAGE = "P1"    # Immediate: PagerDuty / phone call
    P2_TICKET = "P2"  # Next business day: Jira ticket
    P3_INFO = "P3"    # Weekly review: dashboard annotation


@dataclass
class AlertRule:
    name: str
    severity: Severity
    check: Callable[[dict], bool]  # Returns True if alert should fire
    message_template: str


# WHAT: Define alert rules with severity and threshold.
# WHY: Separating rules from routing makes it easy to add/tune alerts.
# GOTCHA: Start with CONSERVATIVE thresholds. Tighten over time.
ALERT_RULES = [
    AlertRule(
        name="high_error_rate",
        severity=Severity.P1_PAGE,
        check=lambda m: m.get("error_rate", 0) > 0.50,
        message_template="CRITICAL: Error rate {error_rate:.1%} exceeds 50%",
    ),
    AlertRule(
        name="elevated_error_rate",
        severity=Severity.P2_TICKET,
        check=lambda m: 0.10 < m.get("error_rate", 0) <= 0.50,
        message_template="WARNING: Error rate {error_rate:.1%} exceeds 10%",
    ),
    AlertRule(
        name="high_latency",
        severity=Severity.P2_TICKET,
        check=lambda m: m.get("p95_latency_s", 0) > 30,
        message_template="WARNING: p95 latency {p95_latency_s:.1f}s exceeds 30s",
    ),
    AlertRule(
        name="cost_anomaly",
        severity=Severity.P2_TICKET,
        check=lambda m: m.get("cost_per_hour", 0) > 50,
        message_template="WARNING: Cost ${cost_per_hour:.2f}/hr exceeds $50/hr",
    ),
    AlertRule(
        name="drift_detected",
        severity=Severity.P3_INFO,
        check=lambda m: abs(m.get("output_length_drift_pct", 0)) > 15,
        message_template="INFO: Output length drifted {output_length_drift_pct:+.1f}%",
    ),
]


def evaluate_alerts(metrics: dict) -> list[dict]:
    """Evaluate all alert rules against current metrics. Return fired alerts."""
    fired = []
    for rule in ALERT_RULES:
        try:
            if rule.check(metrics):
                fired.append({
                    "name": rule.name,
                    "severity": rule.severity.value,
                    "message": rule.message_template.format(**metrics),
                })
        except Exception as e:
            # Never let a broken alert rule crash the alerting system
            print(f"[WARN] Alert rule '{rule.name}' failed: {e}")
    return fired


def route_alert(alert: dict):
    """Route alert to the appropriate channel based on severity."""
    sev = alert["severity"]
    if sev == "P1":
        send_pagerduty(alert["message"])      # Wake someone up
    elif sev == "P2":
        create_jira_ticket(alert["message"])  # Next business day
    elif sev == "P3":
        annotate_dashboard(alert["message"])  # Weekly review


# Placeholder implementations
def send_pagerduty(msg): print(f"[PAGERDUTY] {msg}")
def create_jira_ticket(msg): print(f"[JIRA] {msg}")
def annotate_dashboard(msg): print(f"[DASHBOARD] {msg}")
// Alert severity levels
const Severity = { P1_PAGE: "P1", P2_TICKET: "P2", P3_INFO: "P3" };

// WHAT: Define alert rules with severity and threshold.
// WHY: Separating rules from routing makes it easy to add/tune alerts.
// GOTCHA: Start with CONSERVATIVE thresholds. Tighten over time.
const ALERT_RULES = [
  {
    name: "high_error_rate",
    severity: Severity.P1_PAGE,
    check: (m) => (m.error_rate ?? 0) > 0.5,
    messageTemplate: (m) =>
      `CRITICAL: Error rate ${(m.error_rate * 100).toFixed(1)}% exceeds 50%`,
  },
  {
    name: "elevated_error_rate",
    severity: Severity.P2_TICKET,
    check: (m) => (m.error_rate ?? 0) > 0.1 && (m.error_rate ?? 0) <= 0.5,
    messageTemplate: (m) =>
      `WARNING: Error rate ${(m.error_rate * 100).toFixed(1)}% exceeds 10%`,
  },
  {
    name: "high_latency",
    severity: Severity.P2_TICKET,
    check: (m) => (m.p95_latency_s ?? 0) > 30,
    messageTemplate: (m) =>
      `WARNING: p95 latency ${m.p95_latency_s.toFixed(1)}s exceeds 30s`,
  },
  {
    name: "cost_anomaly",
    severity: Severity.P2_TICKET,
    check: (m) => (m.cost_per_hour ?? 0) > 50,
    messageTemplate: (m) =>
      `WARNING: Cost $${m.cost_per_hour.toFixed(2)}/hr exceeds $50/hr`,
  },
  {
    name: "drift_detected",
    severity: Severity.P3_INFO,
    check: (m) => Math.abs(m.output_length_drift_pct ?? 0) > 15,
    messageTemplate: (m) =>
      `INFO: Output length drifted ${m.output_length_drift_pct > 0 ? "+" : ""}${m.output_length_drift_pct.toFixed(1)}%`,
  },
];

function evaluateAlerts(metrics) {
  /** Evaluate all alert rules against current metrics. */
  const fired = [];
  for (const rule of ALERT_RULES) {
    try {
      if (rule.check(metrics)) {
        fired.push({
          name: rule.name,
          severity: rule.severity,
          message: rule.messageTemplate(metrics),
        });
      }
    } catch (err) {
      console.warn(`[WARN] Alert rule '${rule.name}' failed: ${err.message}`);
    }
  }
  return fired;
}

function routeAlert(alert) {
  /** Route alert to the appropriate channel based on severity. */
  if (alert.severity === "P1") sendPagerDuty(alert.message);
  else if (alert.severity === "P2") createJiraTicket(alert.message);
  else if (alert.severity === "P3") annotateDashboard(alert.message);
}

// Placeholder implementations
function sendPagerDuty(msg) { console.log(`[PAGERDUTY] ${msg}`); }
function createJiraTicket(msg) { console.log(`[JIRA] ${msg}`); }
function annotateDashboard(msg) { console.log(`[DASHBOARD] ${msg}`); }
You built a configurable alerting module that evaluates metrics against threshold-based rules and routes alerts to severity-appropriate channels. The key design choices: rules are separated from routing (easy to add new rules), thresholds start conservative (avoid alert fatigue), and broken rules never crash the system (try/except on each rule).
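The hysteresis idea mentioned in the introduction to this module can be sketched as a small gate that sits in front of any rule. This is one possible shape, not the only one: the class name ConsecutiveGate, the required parameter, and the latency readings are all hypothetical, and the gate fires once when the streak reaches the threshold rather than on every check after it.

```python
from collections import defaultdict
from typing import Callable, Dict


class ConsecutiveGate:
    """Fire a rule only after its condition holds for `required` checks in a row."""

    def __init__(self, required: int = 3) -> None:
        self.required = required
        self.streaks: Dict[str, int] = defaultdict(int)

    def should_fire(self, rule_name: str, condition_met: bool) -> bool:
        if condition_met:
            self.streaks[rule_name] += 1
        else:
            self.streaks[rule_name] = 0  # any healthy check resets the streak
        # Fire exactly once, at the moment the streak reaches the threshold.
        return self.streaks[rule_name] == self.required


gate = ConsecutiveGate(required=3)
high_latency: Callable[[dict], bool] = lambda m: m.get("p95_latency_s", 0) > 30

# Made-up p95 latency readings (seconds) from six consecutive checks.
readings = [35, 12, 40, 41, 42, 43]
fired_at = [
    i for i, p95 in enumerate(readings)
    if gate.should_fire("high_latency", high_latency({"p95_latency_s": p95}))
]
print(fired_at)  # prints [4]: the isolated spike at index 0 never fires
```

The single spike at index 0 is reset by the healthy reading that follows, so the alert only fires once three bad readings arrive back to back. Firing only at the exact threshold also keeps a long incident from creating a new ticket on every check.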
4. Simple A/B Traffic Router
The final piece is a traffic router that decides which version of your agent each user sees. The clever trick here is using a hash of the user ID instead of random assignment. Here is the dilemma: if you use random(), the same user might see the canary version on one request and the control version on the next. That creates a jarring experience ("why did the agent's personality just change?") and makes your A/B test results unreliable — you are measuring version differences but also injecting noise from inconsistent assignment. By hashing the user ID with SHA-256 and taking the result modulo 100, you get a stable number between 0 and 99 for each user. User "alice-123" will always get the same number, so she always sees the same variant. No database needed, no session state, and it works across multiple servers.
import hashlib
from dataclasses import dataclass


@dataclass
class PromptVariant:
    name: str
    system_prompt: str
    model: str


# WHAT: Define the control and canary prompt variants.
# WHY: Explicit variant definitions make it clear what changed.
VARIANTS = {
    "control": PromptVariant(
        name="v2.3-stable",
        system_prompt="You are a helpful customer support agent. Be concise.",
        model="claude-sonnet-4-6",
    ),
    "canary": PromptVariant(
        name="v2.4-canary",
        system_prompt="You are a helpful customer support agent. Be concise and empathetic. "
                      "Acknowledge the customer's frustration before solving.",
        model="claude-sonnet-4-6",
    ),
}


def get_variant(user_id: str, canary_pct: int = 5) -> PromptVariant:
    """Deterministically assign user to control or canary group.

    Uses a hash of user_id so the same user always gets the same variant.
    canary_pct: percentage of traffic routed to canary (1-50).
    """
    # WHAT: Hash the user ID to a number between 0-99.
    # WHY: Deterministic: same user always gets same variant.
    # GOTCHA: Don't use random(); it gives inconsistent UX.
    hash_val = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    if hash_val < canary_pct:
        variant = VARIANTS["canary"]
    else:
        variant = VARIANTS["control"]
    return variant


# Usage
variant = get_variant("user-12345", canary_pct=5)
print(f"User assigned to: {variant.name}")
# Output: User assigned to: v2.3-stable (or v2.4-canary for ~5% of users)
import { createHash } from "crypto";

// WHAT: Define the control and canary prompt variants.
// WHY: Explicit variant definitions make it clear what changed.
const VARIANTS = {
  control: {
    name: "v2.3-stable",
    systemPrompt: "You are a helpful customer support agent. Be concise.",
    model: "claude-sonnet-4-6",
  },
  canary: {
    name: "v2.4-canary",
    systemPrompt:
      "You are a helpful customer support agent. Be concise and empathetic. " +
      "Acknowledge the customer's frustration before solving.",
    model: "claude-sonnet-4-6",
  },
};

function getVariant(userId, canaryPct = 5) {
  /**
   * Deterministically assign user to control or canary group.
   * Uses a hash of userId so the same user always gets the same variant.
   */
  // WHAT: Hash the user ID to a number between 0-99.
  // WHY: Deterministic: same user always gets same variant.
  // GOTCHA: Don't use Math.random(); it gives inconsistent UX.
  // NOTE: Only the first 32 bits of the digest are used here, so a user's bucket
  // can differ from the Python version, which reduces the full digest modulo 100.
  // Pick one scheme if services in both languages route the same users.
  const hash = createHash("sha256").update(userId).digest("hex");
  const hashVal = parseInt(hash.slice(0, 8), 16) % 100;
  return hashVal < canaryPct ? VARIANTS.canary : VARIANTS.control;
}

// Usage
const variant = getVariant("user-12345", 5);
console.log(`User assigned to: ${variant.name}`);
// Output: User assigned to: v2.3-stable (or v2.4-canary for ~5% of users)
You built a deterministic A/B traffic router using SHA-256 hashing. The same user always gets the same variant (consistent UX), no state storage is needed, and the canary percentage is easily adjustable. This router powers both canary deployments (1-5%) and full A/B tests (50/50) by simply changing the percentage parameter.
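One consequence of bucketing users into 0-99 is worth spelling out: raising the canary percentage only adds users to the canary group, it never moves anyone back to control. The sketch below checks that property with the same SHA-256 scheme. The helper names bucket and in_canary and the user IDs are hypothetical.

```python
import hashlib


def bucket(user_id: str) -> int:
    """Map a user ID to a stable bucket in the range 0-99."""
    return int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100


def in_canary(user_id: str, canary_pct: int) -> bool:
    """A user is in the canary group when their bucket is below the percentage."""
    return bucket(user_id) < canary_pct


users = [f"user-{i}" for i in range(1000)]  # made-up user IDs
canary_at_5 = {u for u in users if in_canary(u, 5)}
canary_at_25 = {u for u in users if in_canary(u, 25)}

# Every user who was in the 5% canary is still in it at 25%.
print(canary_at_5 <= canary_at_25)  # prints True
print(len(canary_at_5), len(canary_at_25))
```

This holds because a bucket below 5 is also below 25, so ramping from 5% to 25% keeps early canary users on the new version throughout the rollout.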
Hands-On Lab: Build a Monitoring & Feedback Pipeline
What You'll Build
A complete monitoring pipeline that collects agent metrics, evaluates alert rules, records user feedback, and routes traffic via A/B testing — all running locally with mock data so you do not need a Langfuse account. Time estimate: 30-40 minutes.
Prerequisites: Python 3.9+ installed, a terminal, and a text editor. No API keys required for this lab (we use mock data).
Files You'll Create:
- metrics.py: Metric collection and mock agent runner
- alerts.py: Alert rule evaluation and routing
- feedback.py: Feedback collection and daily aggregation
- router.py: A/B traffic router with hash-based assignment
- run_pipeline.py: End-to-end pipeline that ties everything together
Environment Setup
# No external dependencies needed — this lab uses only the Python standard library
mkdir monitoring-lab && cd monitoring-lab
Step 1: Create the Metric Collector
Let's start with the foundation: getting raw numbers out of your agent. Every dashboard chart, every alert rule, and every drift detector is ultimately powered by individual metric records — one record per agent call, capturing latency, token counts, cost, and whether the call succeeded or failed. In production, you would send these to Langfuse or Datadog. Here, we store them in a Python list so you can inspect them directly and feed them into the alerting module in Step 2.
Create a new file called metrics.py:
"""metrics.py — Metric collection for the monitoring lab."""
import time
import random
import uuid
from dataclasses import dataclass
from typing import List


@dataclass
class MetricRecord:
    trace_id: str
    timestamp: float
    latency_ms: float
    input_tokens: int
    output_tokens: int
    success: bool
    error_type: str = ""
    cost_usd: float = 0.0


# In-memory store (replace with Langfuse in production)
metric_store: List[MetricRecord] = []


def simulate_agent_call(user_message: str) -> MetricRecord:
    """Simulate an agent call and record metrics.

    In production, this wraps a real Claude API call.
    Here we use random values to simulate realistic behavior.
    """
    trace_id = f"tr_{uuid.uuid4().hex[:8]}"

    # Simulate varying latency: normally distributed around 1500ms (std dev 500ms)
    latency_ms = random.gauss(1500, 500)
    latency_ms = max(300, latency_ms)  # Floor at 300ms
    # Sleep for a tenth of the simulated latency so the lab runs quickly
    time.sleep(latency_ms / 10000)

    # Simulate token usage
    input_tokens = random.randint(50, 300)
    output_tokens = random.randint(100, 800)

    # Simulate 5% error rate
    success = random.random() > 0.05
    error_type = ""
    if not success:
        error_type = random.choice(["timeout", "guardrail_block", "api_error"])

    # Cost estimation (Claude Sonnet pricing)
    cost_usd = (input_tokens * 3.0 / 1_000_000) + (output_tokens * 15.0 / 1_000_000)

    record = MetricRecord(
        trace_id=trace_id,
        timestamp=time.time(),
        latency_ms=round(latency_ms, 1),
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        success=success,
        error_type=error_type,
        cost_usd=round(cost_usd, 6),
    )
    metric_store.append(record)
    return record


def get_summary_metrics() -> dict:
    """Aggregate stored metrics into summary statistics."""
    if not metric_store:
        return {}

    latencies = sorted([m.latency_ms for m in metric_store])
    total = len(metric_store)
    errors = [m for m in metric_store if not m.success]

    def percentile(data, pct):
        idx = int(len(data) * pct / 100)
        return data[min(idx, len(data) - 1)]

    # Group errors by type
    error_breakdown = {}
    for e in errors:
        error_breakdown[e.error_type] = error_breakdown.get(e.error_type, 0) + 1

    # Project observed spend to an hourly rate. Use at least 1 second of elapsed
    # time so a short burst of calls does not divide by a near-zero duration.
    # NOTE: Because the simulation compresses time, this projected rate is
    # inflated and can exceed the $50/hr alert threshold used in alerts.py.
    total_cost = sum(m.cost_usd for m in metric_store)
    elapsed_s = metric_store[-1].timestamp - metric_store[0].timestamp
    cost_per_hour = total_cost * 3600 / max(1, elapsed_s) if total > 1 else 0

    return {
        "total_requests": total,
        "error_rate": len(errors) / total if total else 0,
        "error_breakdown": error_breakdown,
        "p50_latency_ms": percentile(latencies, 50),
        "p95_latency_ms": percentile(latencies, 95),
        "p99_latency_ms": percentile(latencies, 99),
        "p95_latency_s": percentile(latencies, 95) / 1000,
        "avg_tokens": sum(m.input_tokens + m.output_tokens for m in metric_store) / total,
        "total_cost_usd": total_cost,
        "cost_per_hour": cost_per_hour,
    }


if __name__ == "__main__":
    # Quick test: simulate 20 agent calls
    print("Simulating 20 agent calls...")
    for i in range(20):
        record = simulate_agent_call(f"Test message {i}")
        status = "OK" if record.success else f"FAIL ({record.error_type})"
        print(f" {record.trace_id}: {record.latency_ms}ms, {record.input_tokens + record.output_tokens} tokens, {status}")

    summary = get_summary_metrics()
    print("\nSummary:")
    print(f" Total requests: {summary['total_requests']}")
    print(f" Error rate: {summary['error_rate']:.1%}")
    print(f" p50 latency: {summary['p50_latency_ms']:.0f}ms")
    print(f" p95 latency: {summary['p95_latency_ms']:.0f}ms")
    print(f" Total cost: ${summary['total_cost_usd']:.4f}")
Run it:
python metrics.py
Expected output:
If you see a summary with total requests, error rate, and latency percentiles, Step 1 is working. If the error rate is 0%, that is fine — it is random and will vary between runs.
Troubleshooting:
- If you see ModuleNotFoundError: No module named 'dataclasses', upgrade to Python 3.7+ (dataclasses were added in 3.7). Run python --version to check.
- If you see ImportError: cannot import name 'List', check that the import line reads from typing import List. The capital-L List is the generic alias from typing (works on Python 3.7+); the lowercase list[...] built-in generic syntax requires Python 3.9+.
- If the script runs but prints no output, make sure you are running python metrics.py (not importing it). The if __name__ == "__main__" block only runs when the file is executed directly.
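If the percentile numbers in the summary look surprising, it helps to run the percentile helper by hand on a tiny dataset. The sketch below copies the index-based lookup used in metrics.py and applies it to ten made-up latencies, one of which is a slow outlier.

```python
def percentile(sorted_data, pct):
    """Index-based percentile lookup, matching the helper in metrics.py."""
    idx = int(len(sorted_data) * pct / 100)
    return sorted_data[min(idx, len(sorted_data) - 1)]


# Ten made-up latencies in milliseconds, already sorted.
latencies = [900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 9000]

p50 = percentile(latencies, 50)  # index int(10 * 0.50) = 5
p95 = percentile(latencies, 95)  # index int(10 * 0.95) = 9
mean = sum(latencies) / len(latencies)
print(p50, p95, mean)  # prints 1400 9000 2070.0
```

The mean (2070 ms) describes almost none of the requests, while p50 shows the typical case and p95 exposes the outlier. With only 20 calls, as in the quick test for Step 1, the p95 and p99 lookups both land on the last index, so both report the slowest call.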
Step 2: Create the Alert Evaluator
Now comes the part that saves your on-call engineer's sanity. The alerting module takes the summary metrics from Step 1 and checks them against a set of threshold-based rules. Here is the dilemma: if you alert on everything, you get alert fatigue and people start ignoring pages. If you alert on nothing, you discover outages from angry users. The solution is tiered severity — and that is exactly what this module implements. Each rule has a severity level (P1/P2/P3) and a routing destination (PagerDuty/Jira/dashboard).
Create a new file called alerts.py:
"""alerts.py — Tiered alerting with configurable thresholds."""
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List


class Severity(Enum):
    P1_PAGE = "P1"    # Immediate: PagerDuty / phone call
    P2_TICKET = "P2"  # Next business day: Jira ticket
    P3_INFO = "P3"    # Weekly review: dashboard annotation


@dataclass
class AlertRule:
    name: str
    severity: Severity
    check: Callable[[dict], bool]
    message_template: str


ALERT_RULES = [
    AlertRule(
        name="high_error_rate",
        severity=Severity.P1_PAGE,
        check=lambda m: m.get("error_rate", 0) > 0.50,
        message_template="CRITICAL: Error rate {error_rate:.1%} exceeds 50%",
    ),
    AlertRule(
        name="elevated_error_rate",
        severity=Severity.P2_TICKET,
        check=lambda m: 0.10 < m.get("error_rate", 0) <= 0.50,
        message_template="WARNING: Error rate {error_rate:.1%} exceeds 10%",
    ),
    AlertRule(
        name="high_latency",
        severity=Severity.P2_TICKET,
        check=lambda m: m.get("p95_latency_s", 0) > 30,
        message_template="WARNING: p95 latency {p95_latency_s:.1f}s exceeds 30s",
    ),
    AlertRule(
        name="cost_anomaly",
        severity=Severity.P2_TICKET,
        check=lambda m: m.get("cost_per_hour", 0) > 50,
        message_template="WARNING: Cost ${cost_per_hour:.2f}/hr exceeds $50/hr",
    ),
    AlertRule(
        name="drift_detected",
        severity=Severity.P3_INFO,
        check=lambda m: abs(m.get("output_length_drift_pct", 0)) > 15,
        message_template="INFO: Output length drifted {output_length_drift_pct:+.1f}%",
    ),
]


def evaluate_alerts(metrics: dict) -> List[dict]:
    """Evaluate all alert rules against current metrics. Return fired alerts."""
    fired = []
    for rule in ALERT_RULES:
        try:
            if rule.check(metrics):
                fired.append({
                    "name": rule.name,
                    "severity": rule.severity.value,
                    "message": rule.message_template.format(**metrics),
                })
        except Exception as e:
            print(f"[WARN] Alert rule '{rule.name}' failed: {e}")
    return fired


def route_alert(alert: dict) -> str:
    """Route alert to the appropriate channel. Returns the action taken."""
    sev = alert["severity"]
    if sev == "P1":
        return f"[PAGERDUTY] {alert['message']}"
    elif sev == "P2":
        return f"[JIRA TICKET] {alert['message']}"
    elif sev == "P3":
        return f"[DASHBOARD NOTE] {alert['message']}"
    return f"[UNKNOWN] {alert['message']}"


if __name__ == "__main__":
    # Test with a scenario that should trigger alerts
    test_metrics = {
        "error_rate": 0.15,
        "p95_latency_s": 4.2,
        "cost_per_hour": 22.50,
        "output_length_drift_pct": 18.5,
    }
    print("Testing alert rules with elevated error rate and drift...")
    print(f" Metrics: error_rate={test_metrics['error_rate']:.0%}, "
          f"p95={test_metrics['p95_latency_s']}s, "
          f"cost=${test_metrics['cost_per_hour']}/hr, "
          f"drift={test_metrics['output_length_drift_pct']:+.1f}%")
    print()

    alerts = evaluate_alerts(test_metrics)
    if alerts:
        for alert in alerts:
            action = route_alert(alert)
            print(f" {action}")
    else:
        print(" No alerts fired.")
    print(f"\n Total alerts fired: {len(alerts)}")
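Run it:
python alerts.py
Expected output:
Testing alert rules with elevated error rate and drift...
 Metrics: error_rate=15%, p95=4.2s, cost=$22.5/hr, drift=+18.5%

 [JIRA TICKET] WARNING: Error rate 15.0% exceeds 10%
 [DASHBOARD NOTE] INFO: Output length drifted +18.5%

 Total alerts fired: 2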
You should see exactly 2 alerts: one P2/Ticket for the elevated error rate (15% > 10% threshold) and one P3/Info for drift (+18.5% > 15% threshold). The latency and cost metrics are below their thresholds, so those rules do not fire. If you see 0 alerts, double-check the test_metrics values.
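The drift value in test_metrics is hard-coded, so it may help to see where such a number could come from. One simple definition, assumed here for illustration, is the percent change in mean output length between a baseline window and the current window. The function name output_length_drift_pct mirrors the metric key, and the token counts are made up.

```python
from statistics import mean
from typing import Sequence


def output_length_drift_pct(baseline: Sequence[int], current: Sequence[int]) -> float:
    """Percent change in mean output length from the baseline window to the current one."""
    if not baseline or not current:
        return 0.0  # not enough data to compare
    baseline_mean = mean(baseline)
    if baseline_mean == 0:
        return 0.0  # avoid dividing by zero
    return (mean(current) - baseline_mean) / baseline_mean * 100


# Made-up output token counts for last week (baseline) and today (current).
baseline_tokens = [400, 420, 380, 400]  # mean 400
current_tokens = [480, 470, 490, 456]   # mean 474

drift = output_length_drift_pct(baseline_tokens, current_tokens)
print(f"{drift:+.1f}%")   # prints +18.5%
print(abs(drift) > 15)    # prints True, so the P3 drift rule would fire
```

Comparing means is the simplest option and is sensitive to a few very long responses. Whatever definition you choose, keep the sign: a positive drift (longer answers) and a negative drift (shorter answers) usually point to different causes.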
Troubleshooting:
- If you see KeyError in the message formatting, make sure test_metrics contains all the keys referenced in the message_template strings (e.g., error_rate, output_length_drift_pct).
- If you see 0 alerts fired, verify the test_metrics dictionary has "error_rate": 0.15 (not 15). The threshold checks expect a decimal fraction, not a percentage.
- If you see 3 or more alerts, check that the lambda conditions use > (greater than), not >= (greater than or equal to), for the boundaries.
Step 3: Create the Feedback Collector
Metrics tell you how your agent is performing, but only humans can tell you whether it got the right answer. This module closes that gap. It simulates collecting thumbs-up/down feedback from users and then aggregates that feedback into a daily summary — showing you exactly which failure categories are hurting your users the most. In production, this would be a REST endpoint (like the one in the code walkthrough above); here we simulate the feedback locally so you can see the aggregation logic in action. This step uses the trace IDs from Step 1 to link feedback back to specific agent calls.
Create a new file called feedback.py:
"""feedback.py — User feedback collection and daily aggregation."""
import random
from dataclasses import dataclass
from typing import List, Dict
from collections import Counter

FAILURE_CATEGORIES = [
    "wrong_order_referenced",
    "outdated_shipping_status",
    "hallucinated_tracking_number",
    "incomplete_answer",
    "tone_too_formal",
    "other",
]


@dataclass
class FeedbackRecord:
    trace_id: str
    score: int  # 1 = thumbs up, 0 = thumbs down
    comment: str = ""
    failure_category: str = ""


# In-memory feedback store
feedback_store: List[FeedbackRecord] = []


def record_feedback(trace_id: str, score: int, comment: str = "") -> FeedbackRecord:
    """Record a single piece of user feedback."""
    if score not in (0, 1):
        raise ValueError("Score must be 0 (thumbs down) or 1 (thumbs up)")

    # Auto-classify negative feedback (in production, use an LLM for this)
    category = ""
    if score == 0:
        category = random.choice(FAILURE_CATEGORIES)

    record = FeedbackRecord(
        trace_id=trace_id,
        score=score,
        comment=comment[:500],
        failure_category=category,
    )
    feedback_store.append(record)
    return record


def daily_aggregation() -> Dict:
    """Aggregate feedback into a daily summary report."""
    if not feedback_store:
        # Return every key the callers read, so an empty day does not raise KeyError.
        return {
            "total": 0,
            "positive": 0,
            "negative": 0,
            "satisfaction_rate": 0,
            "top_failure_categories": [],
        }

    total = len(feedback_store)
    positive = sum(1 for f in feedback_store if f.score == 1)
    negative = sum(1 for f in feedback_store if f.score == 0)

    # Group negative feedback by category
    category_counts = Counter(
        f.failure_category for f in feedback_store
        if f.score == 0 and f.failure_category
    )
    return {
        "total": total,
        "positive": positive,
        "negative": negative,
        "satisfaction_rate": positive / total if total else 0,
        "top_failure_categories": category_counts.most_common(5),
    }


def generate_eval_candidates(min_negatives: int = 3) -> List[FeedbackRecord]:
    """Select negative feedback cases that should become eval test cases.

    Despite its name, min_negatives caps how many cases are returned.
    """
    negatives = [f for f in feedback_store if f.score == 0]
    # In production, you would de-duplicate and select diverse examples
    return negatives[:min_negatives]


if __name__ == "__main__":
    # Simulate feedback for 50 agent interactions
    print("Simulating feedback for 50 interactions...")
    for i in range(50):
        trace_id = f"tr_{i:04d}"
        # 85% positive, 15% negative
        score = 1 if random.random() > 0.15 else 0
        comment = "Great answer!" if score == 1 else "This was wrong"
        record_feedback(trace_id, score, comment)

    report = daily_aggregation()
    print("\nDaily Feedback Summary:")
    print(f" Total feedback: {report['total']}")
    print(f" Positive: {report['positive']} ({report['satisfaction_rate']:.0%})")
    print(f" Negative: {report['negative']}")
    print("\n Top failure categories:")
    for cat, count in report["top_failure_categories"]:
        print(f" {cat}: {count} cases")

    candidates = generate_eval_candidates()
    print(f"\n Eval candidates selected: {len(candidates)}")
    for c in candidates:
        print(f" {c.trace_id}: {c.failure_category}")
Run it:
python feedback.py
Expected output:
You should see a daily summary with positive/negative counts, a satisfaction rate around 80-90%, and a ranked list of failure categories. The exact numbers will vary because we use random data. The eval candidates at the bottom represent the traces that would become new test cases in your weekly review.
Troubleshooting:
- If you see ImportError: cannot import name 'Counter', make sure the import line reads from collections import Counter. The Counter class is in Python's standard library.
- If you see NameError: name 'FAILURE_CATEGORIES' is not defined, make sure the FAILURE_CATEGORIES list is defined at the top of the file before the record_feedback function references it.
- If all feedback is positive (0 negatives), this can happen by chance with random simulation. Run the script again, or change the negative probability from 0.15 to 0.30 temporarily to force more negative feedback.
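The lab's generate_eval_candidates simply takes the first few negatives, and its comment notes that production code would select diverse examples. One way to do that, sketched here under the assumption that failure_category is a good enough proxy for diversity, is to keep at most one negative record per category. The function name pick_diverse_candidates and the sample records are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FeedbackRecord:
    trace_id: str
    score: int  # 1 = thumbs up, 0 = thumbs down
    failure_category: str = ""


def pick_diverse_candidates(records: List[FeedbackRecord], limit: int = 3) -> List[FeedbackRecord]:
    """Pick at most one negative record per failure category, up to `limit`."""
    first_per_category: Dict[str, FeedbackRecord] = {}
    for record in records:
        if record.score == 0 and record.failure_category not in first_per_category:
            first_per_category[record.failure_category] = record
    # Dicts preserve insertion order, so earlier records win ties.
    return list(first_per_category.values())[:limit]


# Made-up feedback: three negatives share one category, one has another.
records = [
    FeedbackRecord("tr_0001", 0, "incomplete_answer"),
    FeedbackRecord("tr_0002", 1),
    FeedbackRecord("tr_0003", 0, "incomplete_answer"),
    FeedbackRecord("tr_0004", 0, "tone_too_formal"),
    FeedbackRecord("tr_0005", 0, "incomplete_answer"),
]
for candidate in pick_diverse_candidates(records):
    print(candidate.trace_id, candidate.failure_category)
```

Here the naive approach would return three cases, two of them in the same category. The per-category version returns tr_0001 and tr_0004, which gives your eval suite broader coverage for the same review effort.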
Step 4: Create the A/B Traffic Router
This step uses the hash-based routing approach from the code walkthrough. We will test that the same user always gets assigned to the same variant.
Create a new file called router.py:
"""router.py — Deterministic A/B traffic router."""
import hashlib
from dataclasses import dataclass


@dataclass
class PromptVariant:
    name: str
    system_prompt: str


VARIANTS = {
    "control": PromptVariant(
        name="v2.3-stable",
        system_prompt="You are a helpful customer support agent. Be concise.",
    ),
    "canary": PromptVariant(
        name="v2.4-canary",
        system_prompt="You are a helpful customer support agent. Be concise and empathetic. "
                      "Acknowledge the customer's frustration before solving.",
    ),
}


def get_variant(user_id: str, canary_pct: int = 5) -> PromptVariant:
    """Deterministically assign user to control or canary group."""
    hash_val = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    if hash_val < canary_pct:
        return VARIANTS["canary"]
    else:
        return VARIANTS["control"]


if __name__ == "__main__":
    # Test determinism: same user should always get the same variant
    print("Testing deterministic routing (same user = same variant):")
    for _ in range(3):
        v = get_variant("user-alice-123", canary_pct=5)
        print(f" user-alice-123 -> {v.name}")

    # Test distribution across many users
    print("\nTesting traffic split with 1000 users at 5% canary:")
    canary_count = 0
    for i in range(1000):
        v = get_variant(f"user-{i}", canary_pct=5)
        if v.name == "v2.4-canary":
            canary_count += 1
    control_count = 1000 - canary_count
    print(f" Control: {control_count} ({control_count/10:.1f}%)")
    print(f" Canary: {canary_count} ({canary_count/10:.1f}%)")

    # Test with 50% split (A/B test mode)
    print("\nTesting 50/50 A/B split with 1000 users:")
    canary_count = sum(
        1 for i in range(1000)
        if get_variant(f"user-{i}", canary_pct=50).name == "v2.4-canary"
    )
    print(f" Control: {1000 - canary_count} ({(1000 - canary_count)/10:.1f}%)")
    print(f" Canary: {canary_count} ({canary_count/10:.1f}%)")
Run it:
python router.py
Expected output:
Three things to verify: (1) The same user always gets the same variant (all three lines show the same version). (2) The 5% canary split routes roughly 5% of users to canary. (3) The 50/50 A/B split produces roughly equal groups. If the splits are wildly off (e.g., 30/70 for a 50/50 split), check that you are using SHA-256 hashing, not hash() which is randomized in Python 3.3+.
Troubleshooting:
- If you see ImportError: cannot import name 'dataclass', upgrade to Python 3.7+ (dataclasses were added in 3.7).
- If the percentage splits seem random on each run, make sure you are using hashlib.sha256, not Python's built-in hash() function, which is intentionally randomized for security.
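How far from the target can a split be before you should worry? A rough answer comes from treating each user's assignment as an independent coin flip, which is an approximation for hash-based bucketing, and computing a three-standard-deviation band around the expected count. The function name expected_canary_range is hypothetical.

```python
import math


def expected_canary_range(n_users: int, canary_pct: int, sigmas: float = 3.0):
    """Return (low, high) bounds for the canary count under a binomial model."""
    p = canary_pct / 100
    expected = n_users * p
    std_dev = math.sqrt(n_users * p * (1 - p))
    return expected - sigmas * std_dev, expected + sigmas * std_dev


for pct in (5, 50):
    low, high = expected_canary_range(1000, pct)
    print(f"{pct}% canary, 1000 users: expect roughly {low:.0f} to {high:.0f} in canary")
```

For 1000 users this gives about 29 to 71 canary users at 5% and about 453 to 547 at 50%. A result inside the band is ordinary sampling noise; a result far outside it, such as 300 canary users on a 50/50 split, suggests a bug in the routing code.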
Step 5: Run the Complete Pipeline
This final step ties all four modules together into a single pipeline that simulates a production monitoring cycle: collect metrics, evaluate alerts, gather feedback, and route traffic. This step uses the modules created in Steps 1-4.
Create a new file called run_pipeline.py:
"""run_pipeline.py — End-to-end monitoring pipeline."""
from metrics import simulate_agent_call, get_summary_metrics, metric_store
from alerts import evaluate_alerts, route_alert
from feedback import record_feedback, daily_aggregation, generate_eval_candidates
from router import get_variant
import random


def main():
    print("=" * 60)
    print(" MONITORING & CONTINUOUS IMPROVEMENT PIPELINE")
    print("=" * 60)

    # --- Phase 1: Simulate Traffic with A/B Routing ---
    print("\n--- Phase 1: Simulating 100 agent requests with A/B routing ---")
    users = [f"user-{i}" for i in range(100)]
    canary_count = 0
    for user_id in users:
        variant = get_variant(user_id, canary_pct=10)
        if variant.name == "v2.4-canary":
            canary_count += 1
        simulate_agent_call(f"Question from {user_id}")
    print(f" Requests processed: {len(metric_store)}")
    print(f" Control group: {100 - canary_count} users")
    print(f" Canary group: {canary_count} users")

    # --- Phase 2: Aggregate Metrics ---
    print("\n--- Phase 2: Aggregating metrics ---")
    summary = get_summary_metrics()
    print(f" Error rate: {summary['error_rate']:.1%}")
    print(f" p50 latency: {summary['p50_latency_ms']:.0f}ms")
    print(f" p95 latency: {summary['p95_latency_ms']:.0f}ms")
    print(f" Total cost: ${summary['total_cost_usd']:.4f}")
    # Add drift metric for alert evaluation
    summary["output_length_drift_pct"] = random.uniform(-5, 25)
    print(f" Output drift: {summary['output_length_drift_pct']:+.1f}%")

    # --- Phase 3: Evaluate Alerts ---
    print("\n--- Phase 3: Evaluating alert rules ---")
    alerts = evaluate_alerts(summary)
    if alerts:
        for alert in alerts:
            action = route_alert(alert)
            print(f" {action}")
    else:
        print(" No alerts fired. All metrics within thresholds.")

    # --- Phase 4: Collect User Feedback ---
    print("\n--- Phase 4: Simulating user feedback ---")
    for record in metric_store:
        # Simulate: 30% of users leave feedback, 85% positive
        if random.random() < 0.30:
            score = 1 if random.random() > 0.15 else 0
            record_feedback(record.trace_id, score)
    report = daily_aggregation()
    print(f" Feedback collected: {report['total']}")
    print(f" Satisfaction rate: {report['satisfaction_rate']:.0%}")
    if report["top_failure_categories"]:
        print(" Top failures:")
        for cat, count in report["top_failure_categories"][:3]:
            print(f" - {cat}: {count}")

    # --- Phase 5: Generate Eval Candidates ---
    print("\n--- Phase 5: Selecting eval candidates for weekly review ---")
    candidates = generate_eval_candidates(min_negatives=3)
    print(f" Candidates for new eval cases: {len(candidates)}")
    for c in candidates:
        print(f" {c.trace_id}: {c.failure_category}")

    # --- Summary ---
    print("\n" + "=" * 60)
    print(" PIPELINE COMPLETE")
    print("=" * 60)
    print(f" Requests processed: {len(metric_store)}")
    print(f" Alerts fired: {len(alerts)}")
    print(f" Feedback collected: {report['total']}")
    print(f" Eval candidates: {len(candidates)}")
    print()
    print(" In production, these results would feed into:")
    print(" - A Langfuse/Grafana dashboard (Phase 2)")
    print(" - PagerDuty/Jira/Slack (Phase 3)")
    print(" - Your eval test suite (Phase 5)")
    print(" - A weekly failure review meeting (Phase 5)")


if __name__ == "__main__":
    main()
Run it with python run_pipeline.py.
Expected output:
Verify Everything Works
Run the complete pipeline one more time (python run_pipeline.py) to confirm all five phases execute without errors.
You should see all five phases complete with metrics, alerts, feedback, and eval candidates. The exact numbers will vary due to random simulation, but the structure should match the expected output above.
You have built a complete monitoring and continuous improvement pipeline with four modules: metric collection, tiered alerting, feedback aggregation, and A/B traffic routing. In production, you would replace the simulated data with real Claude API calls and a Langfuse backend, but the architecture — collect, aggregate, alert, collect feedback, grow evals — remains identical.
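One simulated value worth replacing early is the drift figure, which run_pipeline.py draws from random.uniform. The sketch below shows one way it might be computed from real responses, assuming drift means the percentage change in mean output length between a baseline window and the current window. The function output_length_drift_pct and the sample lengths are illustrative and are not part of the modules built in Steps 1-4.

```python
from statistics import mean


def output_length_drift_pct(baseline_lengths, current_lengths):
    """Percentage change in mean output length versus a baseline window.

    Assumption: drift is measured on output length only; a real system
    would likely track other signals as well.
    """
    if not baseline_lengths or not current_lengths:
        raise ValueError("both windows need at least one sample")
    baseline_mean = mean(baseline_lengths)
    if baseline_mean == 0:
        raise ValueError("baseline mean length is zero")
    return (mean(current_lengths) - baseline_mean) / baseline_mean * 100


if __name__ == "__main__":
    baseline = [200, 220, 180, 200]  # mean length 200
    current = [250, 230, 240, 240]   # mean length 240
    drift = output_length_drift_pct(baseline, current)
    print(f"Output drift: {drift:+.1f}%")  # prints "Output drift: +20.0%"
```

The result could be stored in summary["output_length_drift_pct"] in place of the random value before calling evaluate_alerts(summary).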
Troubleshooting:
- If you see ModuleNotFoundError: No module named 'metrics', make sure all four files (metrics.py, alerts.py, feedback.py, router.py) are in the same directory as run_pipeline.py.
- If you see SyntaxError, ensure you are running Python 3.9+ (python --version).
- If the error rate seems too high or too low, this is expected with random simulation. Run the pipeline multiple times to see natural variance.
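If the run-to-run variance makes debugging harder, a seeded random number generator gives repeatable numbers. The sketch below is a minimal illustration: simulate_feedback is a hypothetical stand-in that mirrors the Phase 4 logic (30% response rate, 85% positive) and is not a function from the modules above.

```python
import random


def simulate_feedback(n_requests, seed=None):
    """Return simulated feedback scores (1 = positive, 0 = negative)."""
    rng = random.Random(seed)  # private generator; seed=None leaves it unseeded
    scores = []
    for _ in range(n_requests):
        if rng.random() < 0.30:  # 30% of users leave feedback
            scores.append(1 if rng.random() > 0.15 else 0)  # 85% positive
    return scores


if __name__ == "__main__":
    first = simulate_feedback(100, seed=42)
    second = simulate_feedback(100, seed=42)
    assert first == second  # same seed, same sequence of scores
    print(f"Feedback collected: {len(first)}")
    if first:
        print(f"Satisfaction rate:  {sum(first) / len(first):.0%}")
```

In run_pipeline.py, calling random.seed(42) at the top of main() should have a similar effect, provided metrics.py and the other modules draw from the module-level random functions; that is an assumption about code not shown here.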
Knowledge Check
Test your understanding of monitoring, alerting, feedback loops, and continuous improvement. 6 questions.
1. Which of the following is NOT one of the four key metric categories for a production agent dashboard?
2. Your agent's error rate has climbed from 2% to 8%. What alert severity should this trigger?
3. Why do agent A/B tests typically need more samples than traditional web A/B tests?
4. What is the primary risk of deploying a prompt change to 100% of traffic without a canary phase?
5. What is the correct order for the continuous improvement cycle?
6. What is alert fatigue, and what is the best way to prevent it?
Summary
You have completed the final module of Track 6: Observability. Here is what you learned:
- Production Dashboards: Monitor four quadrants — latency (p50/p95/p99), token usage and cost, success/failure rates, and drift detection. A single dashboard gives you a complete picture of agent health.
- Tiered Alerting: P1/Page for emergencies (>50% error rate), P2/Ticket for next-day issues (>10% error rate, high latency), P3/Info for weekly trends (drift). Start conservative to avoid alert fatigue.
- Feedback Loops: Collect at three speeds — real-time (thumbs up/down), daily (failure aggregation), weekly (eval dataset growth). Each speed catches problems at a different timescale.
- Canary Deployments & A/B Tests: Start with 1-5% canary traffic before expanding. Use hash-based routing for consistency. Agent A/B tests need 3-5x more samples due to LLM non-determinism.
- Continuous Improvement: Auto-score production responses, run weekly failure reviews, grow eval datasets from real failures, version-control prompts, and track improvement velocity.
With M19 (Tracing & Logging) and this module, you now have the complete observability toolkit: instrument, visualize, alert, collect feedback, and improve continuously. You are ready for Track 7.
Next up: M21: API Design & Deployment begins Track 7 (Deployment), where you will learn to package your agent as a production API with authentication, rate limiting, and infrastructure best practices.