Building AI Agents with Claude Track 4: Agent Architectures
Module 14 of 30 ~70 min Intermediate–Advanced

Planning & Task Decomposition

Teach your agent to think before it acts — break complex goals into structured execution plans.

Learning Objectives

  • Explain why complex tasks (5+ steps) need a planning layer beyond raw ReAct execution
  • Build an intent classifier that routes user requests to the appropriate planning strategy
  • Decompose complex goals into hierarchical sub-task trees with explicit dependencies
  • Implement a DAG executor that runs independent sub-tasks in parallel and respects sequential dependencies
  • Use dynamic tool discovery to select relevant tools per sub-task instead of loading all tools at once

Prerequisites: M12 (ReAct Pattern)  |  Level: Intermediate–Advanced

Why Complex Tasks Need Planning

In M12, you built a ReAct agent that could chain 2-3 tool calls to answer a multi-step question. That works beautifully for focused research tasks. But what happens when you ask it to do something bigger — like "research our top 3 competitors, compare their pricing, and draft a summary report"? That's at least 10 tool calls across multiple phases, with dependencies between them. A raw ReAct agent tackling this will often lose track of the overall goal, backtrack unnecessarily, or miss entire phases.

Everyday Analogy

Before planning: Imagine building IKEA furniture without reading the instructions. You open the box, see 47 pieces and a bag of screws, and just start connecting things that look like they fit. You might get the shelf assembled eventually, but you'll probably attach a side panel upside down, realize at step 15 that you missed a bracket at step 3, and end up with leftover screws that you can't place.

The pain: This is exactly how a ReAct-only agent handles complex tasks. It makes locally reasonable decisions ("this tool call seems right for what I'm doing now") but globally suboptimal ones ("I should have gathered all the competitor names first before researching each one"). It can't see the full picture because it never creates one.

The mapping: Planning is reading the instructions before you start building. The agent analyzes the entire goal, breaks it into phases ("gather competitor names" → "research each competitor" → "compare pricing" → "draft report"), identifies what depends on what, and creates a structured execution plan. Then it follows the plan step by step, tracking progress. This transforms "try things and hope for the best" into "understand the goal, make a plan, execute systematically."

Technical Definition

Task planning is a meta-strategy where the agent analyzes a complex goal and produces a structured execution plan before starting any tool calls. Instead of diving straight into a ReAct loop, the agent first generates a tree (or graph) of sub-tasks. It identifies dependencies between them. Then it executes the plan systematically, one wave at a time.

Why does this matter? Research on agent benchmarks shows that for tasks with more than 3-4 required steps, ReAct-only agents complete successfully about 40% of the time. The same agents with a planning step complete about 80% of the time. The planning step catches issues early: "Wait, I need the competitor names before I can research each one" is obvious when you write it down, but easy to miss in the middle of a ReAct loop.

Planning isn't a separate framework — it's a layer on top of ReAct. The plan generates the "what to do" list. ReAct handles the "how to do each item" execution. In this module, you'll build both layers and wire them together.

Here's what a plan looks like in practice — this is the actual JSON structure the decomposer produces:

{
  "tasks": [
    { "id": "t1", "description": "Find top 3 competitors", "dependencies": [] },
    { "id": "t2", "description": "Research Asana pricing", "dependencies": ["t1"] },
    { "id": "t3", "description": "Research Monday.com pricing", "dependencies": ["t1"] },
    { "id": "t4", "description": "Compare all pricing data", "dependencies": ["t2", "t3"] },
    { "id": "t5", "description": "Draft summary report", "dependencies": ["t4"] }
  ]
}

Unplanned vs. Planned Agent Execution

Without Planning (ReAct Only)

  1. Search "competitor analysis tools"
  2. Read first result (Asana)
  3. Search "Asana pricing"
  4. Search "project mgmt competitors"
  5. Read result... wait, already have Asana
  6. Lost context, starts over

With Planning (Decompose First)

  P. Plan: 4 phases, 8 sub-tasks
  1. Find top 3 competitors
  2. Research Asana features
  3. Research Monday.com features
  4. Research ClickUp features
  5. Compare pricing (all 3)
  6. Draft summary report

Why It Matters

Planning is what separates toy agents from production agents. A customer-facing agent that's supposed to "process a refund, update the CRM, and send a confirmation email" can't afford to get confused at step 2 and start over. In production deployments, adding a planning layer reduces task failure rates by 40-50% and cuts total token usage by 20-30% (because the agent doesn't waste tokens backtracking). For a team running 10,000 agent tasks per day with a 60% baseline success rate, halving the failure rate is the difference between 6,000 and 8,000 successful completions, plus $200-400/day saved in API costs.

Common Misconceptions

"Every request needs a plan" — No. Simple questions ("What's the weather?"), single-step tool calls, and direct generation tasks don't benefit from planning. The planning overhead (an extra API call to generate the plan) actually makes these slower. Use intent classification (next section) to skip planning for simple requests.

"The plan must be perfect before execution starts" — Plans are hypotheses, not contracts. A good agent executes the plan but adapts when sub-tasks produce unexpected results. Rigid plan-following is as bad as no planning. The best approach: plan upfront, but re-plan if a sub-task fails or returns surprising information.

"Planning requires a special planning model" — The same Claude model that reasons in ReAct loops can generate plans. You just need a different prompt: instead of "do this task," you say "break this task into sub-tasks with dependencies." It's a different use of the same capability.

Now you understand why planning matters. But not every user request needs a plan — "What time is it?" doesn't require task decomposition. The first step of any planning system is figuring out whether to plan at all. That's intent classification.

Intent Classification: Route Before You Plan

Everyday Analogy

Before intent classification: Imagine a hospital where every patient — whether they have a paper cut or chest pain — goes through the same full diagnostic workup: blood tests, MRI, specialist consultation. The paper-cut patient wastes hours and thousands of dollars on unnecessary procedures, while the chest-pain patient waits in line behind them.

The pain: Without intent classification, every user request triggers the full planning pipeline. "What's 2+2?" goes through task decomposition, DAG creation, and tool discovery — burning tokens and adding 3-5 seconds of latency for a task that should take 200 milliseconds.

The mapping: Intent classification is the triage nurse. She quickly assesses each patient's needs and routes them to the right level of care: paper cut → band-aid station (direct answer), sprained ankle → minor procedures (single tool call), chest pain → full diagnostic workup (planning pipeline). The agent's intent classifier does the same: simple query → direct Claude call, single-step task → ReAct loop, complex multi-step task → full planning pipeline.

Technical Definition

Intent classification is a lightweight first pass that categorizes the user's request and extracts key entities. It answers two questions: "What kind of task is this?" and "What are the important details?"

You implement it as a single Claude call with a structured system prompt. The call returns a JSON classification with five fields: an intent label (e.g., "simple_question", "multi_step_task", "creative_generation"), a complexity score (low/medium/high), the extracted entities (the key nouns and parameters from the request), a needs_planning flag, and a one-sentence reasoning string. Based on the intent, complexity, and needs_planning flag, your code routes to the appropriate execution strategy, skipping the planning overhead for simple requests.

Think of this as a router, not a filter. It doesn't block anything — it just sends each request down the most efficient path. A "simple_question" intent goes straight to a Claude call. A "multi_step_task" intent triggers the full planning pipeline. This routing typically adds only 300-500ms and 200-300 tokens of overhead, but saves 3-10x that on simple requests that would otherwise go through unnecessary planning.

Here's what the classifier actually returns — this is the JSON you'll parse to make routing decisions:

{
  "intent": "multi_step_task",
  "complexity": "high",
  "entities": ["competitors", "pricing", "report"],
  "needs_planning": true,
  "reasoning": "Requires research, comparison, and synthesis across multiple subjects"
}

Intent Classification & Routing

  Request: "Research top 3 competitors, compare pricing, draft a report"
  Intent Classifier (Claude) output: Intent: multi_step_task | Complexity: high | Entities: competitors, pricing, report
  Available routes: Direct Answer | ReAct Loop | Full Planning Pipeline

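The routing step can be sketched as a small function over the classifier's JSON. This is a minimal sketch rather than the module's implementation: `route_request` and the three route names are hypothetical, and sending creative_generation to a direct Claude call is an assumption.

```python
def route_request(classification: dict) -> str:
    """Pick an execution strategy from a classifier result.

    Route names are hypothetical labels for the three paths described above.
    """
    intent = classification.get("intent")
    # A missing flag is treated as "needs planning": over-planning is the cheap mistake
    needs_planning = classification.get("needs_planning", True)

    if needs_planning or intent == "multi_step_task":
        return "planning_pipeline"
    if intent == "single_tool":
        return "react_loop"
    if intent in ("simple_question", "creative_generation"):
        return "direct_answer"  # assumption: creative requests go straight to Claude
    return "planning_pipeline"  # unknown intent label: fall back to the safe path


print(route_request({"intent": "single_tool", "needs_planning": False}))
```

Keeping the router a pure function of the classification dict makes it easy to unit-test separately from the Claude call that produces the dict.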
Why It Matters

Intent classification saves both time and money. In a production agent handling 10,000 requests/day, roughly 60-70% are simple questions that don't need planning. Without classification, you'd burn ~300 tokens of planning overhead per simple request. Taking the upper figure, that's 7,000 requests × 300 tokens = 2.1 million wasted tokens/day: roughly $6/day if billed as input tokens and up to ~$32/day if billed as output tokens, assuming Sonnet-class rates of about $3 and $15 per million tokens respectively. With classification, those 7,000 requests skip planning and get answered in a single Claude call. The 3,000 complex requests get the full planning treatment they need. Everyone gets the right level of service.

Common Misconceptions

"Intent classification needs to be 100% accurate" — It doesn't. A misclassified simple task that goes through planning just wastes a few hundred tokens. A misclassified complex task that skips planning will fail and cost more in retries. That's why the fallback defaults to needs_planning: true — false positives (unnecessary planning) are cheap, false negatives (skipped planning) are expensive.

"I should fine-tune a model for intent classification" — For most agent systems, a well-crafted system prompt with Claude Haiku or Sonnet is sufficient. Fine-tuning only makes sense when you have thousands of labeled examples and sub-100ms latency requirements. Start with prompting; optimize later if classification becomes a bottleneck.

"More intent categories = better routing" — Actually, more categories increase misclassification risk. Start with 3-4 broad intents (direct answer, single tool, multi-step, creative). You can always split them later as you gather data on which requests fail.

Intent classification tells you whether to plan. For complex tasks that need planning, the next question is how to plan — specifically, how to break a big goal into concrete, executable sub-tasks. That's task decomposition.

Task Decomposition: Breaking Goals into Sub-Tasks

Everyday Analogy

Before decomposition: Imagine a project manager who receives the goal "Launch the new website" and just starts working on it — no breakdown, no task assignments, no timeline. They bounce between writing copy, configuring DNS, and designing the homepage, never finishing any one thing before starting the next.

The pain: When an agent tries to tackle a complex goal without decomposition, it faces the same problem. "Research competitors and draft a report" is too vague to execute directly. The agent doesn't know where to start, how to track progress, or when it's done. It makes progress on one aspect, then loses that context when it switches to another.

The mapping: A good project manager breaks "Launch the website" into concrete tasks: (1) finalize design mockups, (2) migrate content from old site, (3) configure DNS and hosting, (4) run load testing, (5) execute go-live checklist. Each task is specific, has clear inputs and outputs, and can be assigned to someone. Task decomposition does the same for an agent — it uses Claude to analyze the goal and generate a hierarchy of sub-tasks where each leaf task is small enough to accomplish with 1-2 tool calls.

Technical Definition

Task decomposition uses Claude to analyze a complex goal and generate a hierarchical tree of sub-tasks. Each leaf task should be atomic: small enough to accomplish in 1-2 tool calls. The tree includes dependency edges showing which tasks must complete before others can start. There are three decomposition strategies:

Top-down decomposition is the most common. Claude breaks the high-level goal into 3-5 phases, then breaks each phase into specific actions. For "research competitors and draft a report," the phases might be: (1) identify competitors, (2) research each competitor, (3) compare pricing, (4) draft report. Phase 2 further decomposes into per-competitor research tasks.

Bottom-up decomposition starts with the atomic actions: "search for X," "extract pricing from Y," "format data as table." Claude then groups these into logical phases. This works well when you know the available tools and want to plan in terms of specific capabilities.

Iterative decomposition only plans the next step after completing the current one. This is useful when later steps depend on earlier results — you can't plan "research competitor pricing" until you know which competitors exist. The tradeoff: less parallelism, but more adaptive plans.

Here's what a decomposed task tree looks like. Notice how each leaf node maps to a specific action, and dependencies form a hierarchy:

Goal: "Research competitors, compare pricing, draft report"
├── t1: Identify top 3 competitors (deps: none) → web_search
├── t2: Research Asana features & pricing (deps: t1) → web_search, extract
├── t3: Research Monday.com features (deps: t1) → web_search, extract
├── t4: Research ClickUp features (deps: t1) → web_search, extract
├── t5: Compare pricing across all three (deps: t2, t3, t4) → comparison_table
└── t6: Draft summary report (deps: t5) → text_generation

Task Decomposition Tree: t1 (identify competitors) feeds t2, t3, and t4, which run in parallel; t5 (compare pricing) waits for all three; t6 (draft summary report) produces the final output.

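Before a plan like this is executed, it can help to check that it is structurally sound. The sketch below assumes the plan uses the `tasks` / `id` / `dependencies` layout shown earlier; `validate_plan_structure` is a hypothetical helper, and it deliberately does not check for cycles.

```python
def validate_plan_structure(plan: dict) -> list[str]:
    """Return a list of problems found in a plan; an empty list means none were found."""
    problems = []
    tasks = plan.get("tasks", [])
    ids = [task.get("id") for task in tasks]
    known = set(ids)

    if not tasks:
        problems.append("plan has no tasks")
    if any(task_id is None for task_id in ids):
        problems.append("a task is missing its id")
    # Duplicate IDs make dependency references ambiguous
    for task_id in sorted({i for i in ids if i is not None and ids.count(i) > 1}):
        problems.append(f"duplicate id: {task_id}")

    for task in tasks:
        for dep in task.get("dependencies", []):
            if dep == task.get("id"):
                problems.append(f"{dep} depends on itself")
            elif dep not in known:
                problems.append(f"{task.get('id')} depends on unknown task {dep}")
    return problems


plan = {"tasks": [
    {"id": "t1", "dependencies": []},
    {"id": "t2", "dependencies": ["t1"]},
    {"id": "t3", "dependencies": ["t7"]},  # t7 does not exist in this plan
]}
print(validate_plan_structure(plan))
```

Returning a list of problems, instead of raising on the first one, lets the caller feed every issue back to Claude in a single re-planning request.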
Cert Tip — Domain 1.6

Overly narrow task decomposition creates coverage gaps — subtasks may not cover the full scope of the original goal. Overly broad decomposition overwhelms individual agents with tasks that are too large. The exam tests finding the right granularity: leaf tasks should be achievable in 1-2 tool calls, but collectively they must cover the entire goal.

Why It Matters

The quality of decomposition directly determines agent success. In production, we see a clear pattern: tasks decomposed into 5-10 sub-tasks with 1-2 tool calls each succeed about 85% of the time. Tasks decomposed into 15+ very fine-grained sub-tasks succeed only about 60% of the time (coordination overhead dominates). Tasks not decomposed at all succeed about 40% of the time on complex goals. The sweet spot is leaf tasks that take 1-2 tool calls — specific enough to execute reliably, but coarse enough to avoid drowning in coordination.

You now know how to break a complex goal into sub-tasks. But sub-tasks aren't always sequential — some can run in parallel (researching different competitors), while others must wait (can't write the report until all research is done). A DAG captures these dependencies and enables efficient execution.

DAG Execution: Parallel & Sequential Paths

Everyday Analogy

Before DAGs: Imagine cooking a complex dinner — roast chicken, mashed potatoes, salad, and dessert — and doing everything one step at a time. You peel potatoes, then wait. Chop salad, then wait. Preheat oven, then wait. Total time: 3 hours. But the oven can preheat while you chop, and the salad can be made while the chicken roasts.

The pain: Sequential execution wastes time on independent tasks. If researching Asana takes 5 seconds and researching Monday.com takes 5 seconds, running them sequentially costs 10 seconds. Running them in parallel costs 5 seconds. For an agent handling 8 sub-tasks, parallelism can cut total execution time by 40-60%.

The mapping: A DAG (Directed Acyclic Graph) is your cooking schedule. It captures which tasks can happen at the same time (chop vegetables while oven preheats = parallel) and which must wait (can't frost the cake before it's baked = sequential dependency). The DAG executor looks at the graph, starts all tasks with no unmet dependencies, and as each task completes, releases the tasks that were waiting for it.

Technical Definition

A Directed Acyclic Graph (DAG) represents tasks as nodes and dependencies as directed edges. "Directed" means the edges have a direction (A → B means A must finish before B can start). "Acyclic" means there are no circular dependencies: you can't have task A depending on task B and task B depending on task A, because that would be impossible to execute. This guarantees the graph can be executed in a finite number of steps.

A DAG executor works in waves. First, it identifies all "root" nodes — tasks with no incoming edges (no dependencies). These can all start immediately, in parallel. As each task completes, the executor checks: "Did this unlock any new tasks?" A task is "ready" when all its dependencies are satisfied. Ready tasks are dispatched for execution, potentially in parallel. This continues until all tasks are complete or a failure triggers re-planning.

Why "acyclic"? Because a cycle (A needs B, B needs C, C needs A) creates a deadlock — no task can ever start. During decomposition, you need to validate that the generated task graph is acyclic. If Claude accidentally creates a cycle, you catch it with a topological sort validation and ask Claude to regenerate the plan.

Here's what a valid DAG looks like versus an invalid one with a cycle:

✅ Valid DAG (acyclic):
  t1 → t2, t3   (t2 and t3 depend on t1, run in parallel after t1)
  t2, t3 → t4   (t4 waits for both t2 and t3)
  Execution order: [t1] → [t2, t3] → [t4]

❌ Invalid DAG (has cycle):
  t1 → t2   (t2 depends on t1)
  t2 → t3   (t3 depends on t2)
  t3 → t1   (t1 depends on t3: CYCLE! No task can start)

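Both graphs above can be checked mechanically. The sketch below is a wave-by-wave variant of Kahn's algorithm: it repeatedly peels off every task whose dependencies are already done, and reports a cycle if tasks remain but none is ready. `execution_waves` is a hypothetical name. The sketch assumes the graph arrives as a dict mapping each task ID to its prerequisite IDs, and it treats any dependency that is not itself a key as already satisfied.

```python
def execution_waves(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into parallel waves; raise ValueError if the graph has a cycle."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    waves = []
    while remaining:
        # A task is ready when none of its dependencies are still pending
        ready = sorted(t for t, deps in remaining.items() if deps.isdisjoint(remaining))
        if not ready:
            raise ValueError(f"cycle detected among: {sorted(remaining)}")
        waves.append(ready)
        for task in ready:
            del remaining[task]
    return waves


valid = {"t1": [], "t2": ["t1"], "t3": ["t1"], "t4": ["t2", "t3"]}
print(execution_waves(valid))  # [['t1'], ['t2', 't3'], ['t4']]

cyclic = {"t1": ["t3"], "t2": ["t1"], "t3": ["t2"]}
try:
    execution_waves(cyclic)
except ValueError as err:
    print(err)
```

This version rescans the remaining tasks on every wave, which is quadratic in the worst case. For plans of 5-10 sub-tasks that cost is negligible, and the returned waves double as the execution schedule.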
DAG Execution Graph: Structure & Timing

  Wave 1 (0–2s): t1 Identify
  Wave 2 (2–5s, parallel): t2 Asana, t3 Monday, t4 ClickUp
  Wave 3 (5–6s): t5 Compare
  Wave 4 (6–8s): t6 Report

  Parallel: ~8s total. Sequential: ~18s total (2.25x slower).

Why It Matters

DAG execution is how production agents achieve both reliability and speed. Consider the competitor research task: sequential execution takes ~25 seconds (5 seconds per sub-task × 5 sub-tasks). With DAG execution, the three parallel research tasks run simultaneously, cutting total time to ~15 seconds. But the real win is structure — dependencies prevent the "draft report" step from starting before all research is done, which is exactly the mistake an unplanned ReAct agent makes. In production systems handling thousands of agent tasks, DAG execution saves an average of 35% wall-clock time versus sequential execution.
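A wave-based executor along these lines can be sketched with asyncio. This is an illustration under stated assumptions, not the module's executor: `run_dag` and the caller-supplied `run_task` coroutine are hypothetical names, the cap of 3 concurrent tasks is an arbitrary default, and a failed task is recorded as None so that dependents can continue with partial data.

```python
import asyncio


async def run_dag(tasks, run_task, max_parallel=3):
    """Run tasks wave by wave. `tasks` maps each task id to its dependency ids.

    `run_task(task_id, dep_results)` is a caller-supplied coroutine (hypothetical).
    """
    limiter = asyncio.Semaphore(max_parallel)  # cap concurrent calls (rate limits)
    results = {}
    pending = dict(tasks)

    async def guarded(task_id):
        dep_results = {dep: results.get(dep) for dep in tasks[task_id]}
        async with limiter:
            try:
                return await run_task(task_id, dep_results)
            except Exception:
                return None  # partial completion: mark this task's output unavailable

    while pending:
        ready = [t for t, deps in pending.items() if all(d in results for d in deps)]
        if not ready:
            raise ValueError("cycle or missing dependency in task graph")
        outputs = await asyncio.gather(*(guarded(t) for t in ready))
        for task_id, output in zip(ready, outputs):
            results[task_id] = output
            del pending[task_id]
    return results


async def demo_task(task_id, dep_results):
    await asyncio.sleep(0)  # stands in for a real tool call
    return f"{task_id} done"


graph = {"t1": [], "t2": ["t1"], "t3": ["t1"], "t4": ["t2", "t3"]}
print(asyncio.run(run_dag(graph, demo_task)))
```

Each wave waits for all of its tasks before the next wave starts. A fully event-driven executor would release a task the moment its last dependency finishes, at the cost of more bookkeeping.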

Common Misconceptions

"More parallelism is always better" — Not necessarily. Each parallel task consumes API rate limit capacity. If you have 10 parallel sub-tasks but your API rate limit is 5 requests/minute, you're not actually running 10 at once — you're bottlenecked by rate limiting. In practice, 3-5 parallel tasks is the sweet spot for most API tiers.

"DAGs are only for big, complex tasks" — Even a 3-task pipeline benefits from DAG structure. The explicit dependency tracking prevents subtle bugs like "the summary task started before the data-gathering task finished." DAGs aren't complexity overhead — they're correctness infrastructure.

"If a sub-task fails, the whole DAG fails" — Not if you handle it right. A good DAG executor supports partial completion: if "Research ClickUp" fails, you can still compare Asana and Monday.com. The report just notes that ClickUp data was unavailable. This is much better than aborting the entire task.

DAG execution handles the "when to run what" question. But there's one more piece: as your tool inventory grows beyond 10-15 tools, giving all of them to Claude for every sub-task degrades its performance. Dynamic tool discovery solves this by selecting only the relevant tools for each sub-task.

Dynamic Tool Discovery

Everyday Analogy

Before tool discovery: Imagine a contractor who brings their entire tool truck to every job — plumbing tools, electrical tools, carpentry tools, landscaping tools. When they need a wrench, they have to sort through 200 tools to find it. The clutter slows them down and they sometimes grab the wrong tool because there are too many similar options.

The pain: When you give Claude 20+ tool definitions in every prompt, two things happen. First, Claude's tool selection accuracy drops — research shows a meaningful degradation above 5-7 tools per prompt. Second, you're wasting context window space on tool descriptions the model won't use. A typical tool definition is 100-200 tokens; 20 tools = 2,000-4,000 tokens of irrelevant context per call.

The mapping: Dynamic tool discovery is like the contractor assessing the job first: "This is a plumbing job, so I need pipe wrench, pipe cutter, and Teflon tape." They grab only those 3-4 tools from the truck. The agent does the same: given a sub-task like "compare competitor pricing," it searches its tool inventory and selects the 3-4 most relevant tools (web_search, price_extractor, comparison_table). Only those tools are injected into the Claude prompt for that sub-task.

Technical Definition

Dynamic tool discovery selects a relevant subset of tools at runtime based on the current sub-task. Instead of loading all available tools into every Claude prompt, the agent searches its tool inventory and injects only the 3-5 most relevant tools.

There are two main implementation approaches. Embedding-based: embed each tool's description as a vector, embed the sub-task description, and find the nearest tools by cosine similarity. This is flexible and handles novel sub-tasks well, but requires a vector store. Category-based: use a simple mapping from task categories to tool subsets (e.g., "data_retrieval" maps to [web_search, db_query, api_fetch]). This is simpler but less flexible — new tools or task types require updating the mapping.

The embedding approach is preferred for systems with 15+ tools. For smaller tool sets (5-10), category-based routing is often sufficient. The key insight: tool selection accuracy matters more than tool availability. It's better to give Claude 4 relevant tools than 20 tools where 4 are relevant and 16 are noise.
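The embedding-based approach can be sketched without a vector store. In the sketch below, `toy_embed` is a bag-of-words stand-in for a real embedding model, used only so the example runs on its own; a real system would call an embedding API and store the vectors. The function names and tool descriptions are hypothetical.

```python
import math
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: a bag-of-words count vector."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[word] * b[word] for word in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def select_tools(sub_task: str, tool_descriptions: dict[str, str], top_k: int = 3) -> list[str]:
    """Return the top_k tool names whose descriptions are most similar to the sub-task."""
    query = toy_embed(sub_task)
    scored = [(cosine(query, toy_embed(desc)), name) for name, desc in tool_descriptions.items()]
    scored.sort(key=lambda pair: (-pair[0], pair[1]))  # best score first; name breaks ties
    return [name for _, name in scored[:top_k]]


tools = {
    "price_extractor": "extract pricing plans from a competitor page",
    "deploy": "deploy a service to production",
    "email_sender": "send an email to a customer",
}
print(select_tools("compare competitor pricing plans", tools, top_k=1))
```

Word overlap is a crude proxy for semantic similarity, so treat the ranking logic, not the embedding, as the part that carries over to a real implementation.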

Here's what a category-based tool map looks like in practice — you'd define this in your agent's configuration:

{
  "data_retrieval": ["web_search", "db_query", "api_fetch"],
  "analysis": ["price_extractor", "comparison_table", "chart_gen"],
  "content": ["text_generator", "formatter", "email_sender"],
  "devops": ["deploy", "ci_cd", "config_manager"]
}

Sub-task "Compare competitor pricing" → category: "analysis"
  → Tools injected: [price_extractor, comparison_table, chart_gen]
  → 3 tools instead of 12 = faster, cheaper, more accurate

Dynamic Tool Discovery

  Sub-task: "Compare competitor pricing plans"
  Tool inventory (12): search, chart, pricing, deploy, email, docs, compare, code, config, auth, ci_cd, kb
  Selected: search, pricing, compare (3 of 12 tools injected into prompt)

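Category-based selection reduces to a dictionary lookup plus a filter over the full tool list. The sketch below reuses the category map shown above. `select_tool_definitions` is a hypothetical helper, and how a sub-task gets its category label (for example, from the decomposer's output) is left as an assumption.

```python
TOOL_MAP = {
    "data_retrieval": ["web_search", "db_query", "api_fetch"],
    "analysis": ["price_extractor", "comparison_table", "chart_gen"],
    "content": ["text_generator", "formatter", "email_sender"],
    "devops": ["deploy", "ci_cd", "config_manager"],
}


def select_tool_definitions(category: str, all_tools: list[dict]) -> list[dict]:
    """Keep only the tool definitions mapped to `category`.

    Unknown categories return every tool: an assumed fallback so that a bad
    category label never leaves the model with no tools at all.
    """
    allowed = TOOL_MAP.get(category)
    if allowed is None:
        return list(all_tools)
    return [tool for tool in all_tools if tool["name"] in allowed]


# Placeholder definitions; real ones would also carry a description and input schema
all_tools = [{"name": name} for names in TOOL_MAP.values() for name in names]
selected = select_tool_definitions("analysis", all_tools)
print([tool["name"] for tool in selected])
```

The filtered list is what you would pass as the `tools` argument of the Claude call for that sub-task, in place of the full inventory.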
Why It Matters

At scale, dynamic tool discovery is a significant cost and accuracy optimization. Consider an agent with 25 tools: loading all 25 definitions adds ~4,000 tokens to every prompt. Over 10 sub-tasks, that's 40,000 extra input tokens, or about $0.12 per task assuming roughly $3 per million input tokens (Claude Sonnet-class pricing). With 1,000 tasks/day, you're spending about $120/day on tool descriptions Claude never uses. Dynamic discovery cuts this to ~800 tokens per prompt (4 tools × 200 tokens each), saving 80% of that cost. And because Claude sees fewer, more relevant tools, its tool selection accuracy improves from ~85% to ~95%.

Common Misconceptions

"Embedding-based discovery is always better than category-based" — Not necessarily. Embedding search adds latency (50-100ms per lookup) and requires maintaining a vector store. For agents with fewer than 15 tools, a simple category map is faster, easier to debug, and just as accurate. Use embeddings when your tool inventory is large and evolving frequently.

"Tool descriptions don't matter if you're using discovery" — They matter even more. Discovery relies on matching sub-task descriptions to tool descriptions. A vague tool description like "does stuff with data" won't match well against any sub-task. Write tool descriptions the way you'd explain the tool to a new teammate: what it does, what it takes as input, what it returns.

"Just give Claude all tools and let it figure it out" — This works below 5-7 tools. Above that threshold, tool selection accuracy drops measurably. At 20+ tools, Claude may pick a plausible-but-wrong tool (e.g., web_search instead of db_query) because both descriptions mention "finding information." Fewer, more relevant tools = better decisions.

You now understand the four components of a planning agent (intent classification, task decomposition, DAG execution, and dynamic tool discovery) and how they build on the ReAct loop from M12. In a UCC filing pipeline, for example, these components would look like this: the classifier routes "What's the filing status for Acme Corp?" directly to a single database lookup, but routes "Run a comprehensive lien risk assessment across all Acme Corp subsidiaries" through the full planning pipeline, decomposing it into entity resolution, filing searches per subsidiary, risk scoring, and report generation. Let's wire these components together into a working implementation.

Code Walkthrough: Building a Planning Agent

Step 1: Intent Classifier

Let's start with the gatekeeper of the whole pipeline: the intent classifier. Its job is simple but critical — look at the user's request, decide if it's a quick question or a complex multi-step task, and route accordingly. We implement it as a single Claude call with a structured system prompt that forces JSON output. The interesting design choice here is the fallback: if JSON parsing fails, we default to needs_planning: true. Why? Because over-planning a simple task wastes a few hundred tokens, but under-planning a complex task wastes the entire task. It's cheaper to be cautious.

import anthropic
import json

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY env var

CLASSIFY_PROMPT = """Classify the user's request into one of these intents:
- simple_question: Can be answered directly without tools
- single_tool: Requires one tool call (search, calculate, etc.)
- multi_step_task: Requires multiple steps with dependencies
- creative_generation: Writing, brainstorming, or content creation

Return JSON with this exact format:
{
  "intent": "simple_question|single_tool|multi_step_task|creative_generation",
  "complexity": "low|medium|high",
  "entities": ["extracted", "key", "terms"],
  "needs_planning": true/false,
  "reasoning": "One sentence explaining your classification"
}"""


def classify_intent(user_request: str) -> dict:
    """Classify the user's intent and determine if planning is needed."""
    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=256,
            system=CLASSIFY_PROMPT,
            messages=[{"role": "user", "content": user_request}],
        )
        text = response.content[0].text

        # Extract JSON from the response
        # Claude may wrap it in ```json ... ``` or return plain JSON
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        return json.loads(text.strip())

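    except (json.JSONDecodeError, IndexError, AttributeError, anthropic.APIError) as e:
        # Fallback: on a parse failure, an unexpected response shape, or an API
        # error, treat the request as multi-step to be safe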
        return {
            "intent": "multi_step_task",
            "complexity": "medium",
            "entities": [],
            "needs_planning": True,
            "reasoning": f"Classification failed ({e}), defaulting to planning",
        }


# --- Usage ---
result = classify_intent(
    "Research the top 3 project management tools, compare their "
    "pricing, and draft a summary report for our team"
)
print(json.dumps(result, indent=2))

// JavaScript (Node.js) version of the same classifier
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic(); // reads ANTHROPIC_API_KEY env var

const CLASSIFY_PROMPT = `Classify the user's request into one of these intents:
- simple_question: Can be answered directly without tools
- single_tool: Requires one tool call (search, calculate, etc.)
- multi_step_task: Requires multiple steps with dependencies
- creative_generation: Writing, brainstorming, or content creation

Return JSON with this exact format:
{
  "intent": "simple_question|single_tool|multi_step_task|creative_generation",
  "complexity": "low|medium|high",
  "entities": ["extracted", "key", "terms"],
  "needs_planning": true/false,
  "reasoning": "One sentence explaining your classification"
}`;

async function classifyIntent(userRequest) {
  try {
    const response = await client.messages.create({
      model: "claude-sonnet-4-6",
      max_tokens: 256,
      system: CLASSIFY_PROMPT,
      messages: [{ role: "user", content: userRequest }],
    });
    let text = response.content[0].text;

    // Extract JSON — Claude may wrap in ```json ... ```
    if (text.includes("```")) {
      text = text.split("```")[1];
      if (text.startsWith("json")) text = text.slice(4);
    }
    return JSON.parse(text.trim());
  } catch (e) {
    // Fallback: treat as multi-step to be safe
    return {
      intent: "multi_step_task",
      complexity: "medium",
      entities: [],
      needs_planning: true,
      reasoning: `Classification failed (${e.message}), defaulting to planning`,
    };
  }
}

// --- Usage ---
const result = await classifyIntent(
  "Research the top 3 project management tools, compare their " +
    "pricing, and draft a summary report for our team"
);
console.log(JSON.stringify(result, null, 2));

What Just Happened?

You built a lightweight intent classifier that uses a single Claude call with a structured prompt to categorize user requests. The key design decisions: (1) the system prompt specifies exact JSON format so parsing is reliable, (2) the fallback on parsing failure defaults to needs_planning: true (it's safer to over-plan than to skip planning for a complex task), and (3) the entities extraction gives downstream components (like task decomposition) a head start on understanding the request.
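The fence-splitting logic in the classifier assumes the reply is either bare JSON or a single fenced block. A more tolerant extraction step might look like the sketch below. `extract_json_object` is a hypothetical helper, not part of the Anthropic SDK; it uses the standard library's `json.JSONDecoder.raw_decode` to parse the first JSON object it finds and ignore any text around it.

```python
import json


def extract_json_object(text: str) -> dict:
    """Parse the first JSON object found in `text`, ignoring fences or surrounding prose."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass  # this brace did not start valid JSON; try the next one
        start = text.find("{", start + 1)
    # Raise the same exception type the classifier's fallback already catches
    raise json.JSONDecodeError("no JSON object found in model output", text, 0)


reply = 'Here is the classification:\n{"intent": "single_tool", "needs_planning": false}\nLet me know!'
print(extract_json_object(reply))
```

Because the helper raises `json.JSONDecodeError` when nothing parses, it can replace the split-based extraction without changing the existing fallback path.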

Step 2: Task Decomposer

Now for the brains of the operation. The decomposer takes a complex goal like "research competitors and draft a report" and turns it into a structured task graph: a list of sub-tasks, each with a unique ID, a description, and a list of dependencies on other task IDs. The real challenge here isn't generating the tasks, which Claude is good at. The challenge is validation. What if Claude accidentally creates a circular dependency (task A depends on B, B depends on A)? Neither task could ever become ready, so the executor would stall. After every decomposition, we therefore validate the graph using Kahn's topological sort algorithm. If validation finds a cycle or a dependency on a non-existent task ID, we reject the plan and fall back to a single-task plan that hands the whole goal to one step.

DECOMPOSE_PROMPT = """Break the user's goal into a task graph.
Return JSON with this structure:
{
  "tasks": [
    {
      "id": "t1",
      "description": "What this task does",
      "type": "tool_call|generation|aggregation",
      "dependencies": [],
      "tools_hint": ["suggested_tool_names"]
    }
  ]
}

Rules:
- Each leaf task should be achievable in 1-2 tool calls
- Use dependencies to indicate which tasks must finish first
- Tasks with no dependencies can run in parallel
- IDs must be unique strings (t1, t2, etc.)
- No circular dependencies allowed"""


def decompose_task(goal: str, entities: list[str]) -> dict:
    """Decompose a complex goal into a DAG of sub-tasks."""
    context = f"Key entities: {', '.join(entities)}" if entities else ""

    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            system=DECOMPOSE_PROMPT,
            messages=[{
                "role": "user",
                "content": f"Goal: {goal}\n{context}",
            }],
        )
        text = response.content[0].text
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        plan = json.loads(text.strip())

        # Validate: check for cycles using topological sort
        if not _validate_dag(plan["tasks"]):
            raise ValueError("Task graph contains cycles")

        return plan

    except Exception as e:
        # Fallback: simple sequential plan
        return {
            "tasks": [
                {
                    "id": "t1",
                    "description": goal,
                    "type": "tool_call",
                    "dependencies": [],
                    "tools_hint": [],
                }
            ],
            "error": f"Decomposition failed: {e}",
        }


def _validate_dag(tasks: list[dict]) -> bool:
    """Validate that the task graph is acyclic using topological sort."""
    task_ids = {t["id"] for t in tasks}
    in_degree = {t["id"]: 0 for t in tasks}
    adj = {t["id"]: [] for t in tasks}

    for t in tasks:
        for dep in t.get("dependencies", []):
            if dep not in task_ids:
                return False  # dependency references non-existent task
            adj[dep].append(t["id"])
            in_degree[t["id"]] += 1

    # Kahn's algorithm
    queue = [tid for tid, deg in in_degree.items() if deg == 0]
    visited = 0
    while queue:
        node = queue.pop(0)
        visited += 1
        for neighbor in adj[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    return visited == len(tasks)  # All nodes visited = no cycles


# --- Usage ---
plan = decompose_task(
    "Research top 3 competitors, compare pricing, draft report",
    ["competitors", "pricing", "report"],
)
for task in plan["tasks"]:
    deps = ", ".join(task["dependencies"]) or "none"
    print(f"  {task['id']}: {task['description']} (deps: {deps})")
const DECOMPOSE_PROMPT = `Break the user's goal into a task graph.
Return JSON with this structure:
{
  "tasks": [
    {
      "id": "t1",
      "description": "What this task does",
      "type": "tool_call|generation|aggregation",
      "dependencies": [],
      "tools_hint": ["suggested_tool_names"]
    }
  ]
}

Rules:
- Each leaf task should be achievable in 1-2 tool calls
- Use dependencies to indicate which tasks must finish first
- Tasks with no dependencies can run in parallel
- IDs must be unique strings (t1, t2, etc.)
- No circular dependencies allowed`;

async function decomposeTask(goal, entities = []) {
  const context = entities.length
    ? `Key entities: ${entities.join(", ")}`
    : "";

  try {
    const response = await client.messages.create({
      model: "claude-sonnet-4-6",
      max_tokens: 1024,
      system: DECOMPOSE_PROMPT,
      messages: [{ role: "user", content: `Goal: ${goal}\n${context}` }],
    });
    let text = response.content[0].text;
    if (text.includes("```")) {
      text = text.split("```")[1];
      if (text.startsWith("json")) text = text.slice(4);
    }
    const plan = JSON.parse(text.trim());

    if (!validateDag(plan.tasks)) {
      throw new Error("Task graph contains cycles");
    }
    return plan;
  } catch (e) {
    return {
      tasks: [
        {
          id: "t1",
          description: goal,
          type: "tool_call",
          dependencies: [],
          tools_hint: [],
        },
      ],
      error: `Decomposition failed: ${e.message}`,
    };
  }
}

function validateDag(tasks) {
  const taskIds = new Set(tasks.map((t) => t.id));
  const inDegree = Object.fromEntries(tasks.map((t) => [t.id, 0]));
  const adj = Object.fromEntries(tasks.map((t) => [t.id, []]));

  for (const t of tasks) {
    for (const dep of t.dependencies || []) {
      if (!taskIds.has(dep)) return false;
      adj[dep].push(t.id);
      inDegree[t.id]++;
    }
  }

  // Kahn's algorithm — topological sort
  const queue = Object.entries(inDegree)
    .filter(([, d]) => d === 0)
    .map(([id]) => id);
  let visited = 0;

  while (queue.length > 0) {
    const node = queue.shift();
    visited++;
    for (const neighbor of adj[node]) {
      inDegree[neighbor]--;
      if (inDegree[neighbor] === 0) queue.push(neighbor);
    }
  }
  return visited === tasks.length;
}

// --- Usage ---
const plan = await decomposeTask(
  "Research top 3 competitors, compare pricing, draft report",
  ["competitors", "pricing", "report"]
);
for (const task of plan.tasks) {
  const deps = task.dependencies.join(", ") || "none";
  console.log(`  ${task.id}: ${task.description} (deps: ${deps})`);
}
What Just Happened?

You built a task decomposer that uses Claude to generate a DAG of sub-tasks from a complex goal, then validates the graph is acyclic using Kahn's topological sort algorithm. The cycle detection is critical — without it, a malformed plan could cause your DAG executor to deadlock. The fallback returns a single-task plan so execution can still proceed even if decomposition fails.
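To see how a validated graph maps onto execution order, the same Kahn-style bookkeeping can group tasks into "waves" whose members could run in parallel. This is a minimal sketch: `plan_waves` is a hypothetical helper, not part of the module's code, and it assumes the task shape from DECOMPOSE_PROMPT (`id` and `dependencies`).

```python
def plan_waves(tasks: list[dict]) -> list[list[str]]:
    """Group task IDs into parallel waves; raise ValueError on a bad graph."""
    in_degree = {t["id"]: 0 for t in tasks}
    dependents = {t["id"]: [] for t in tasks}
    for t in tasks:
        for dep in t.get("dependencies", []):
            if dep not in in_degree:
                raise ValueError(f"{t['id']} depends on unknown task {dep}")
            dependents[dep].append(t["id"])
            in_degree[t["id"]] += 1

    # Wave 1 is every task with no dependencies
    current = [tid for tid, deg in in_degree.items() if deg == 0]
    waves = []
    while current:
        waves.append(current)
        next_wave = []
        for tid in current:
            for child in dependents[tid]:
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    next_wave.append(child)
        current = next_wave

    # Tasks caught in a cycle never reach in-degree 0, so they are missing
    if sum(len(w) for w in waves) != len(tasks):
        raise ValueError("Task graph contains a cycle")
    return waves


tasks = [
    {"id": "t1", "dependencies": []},
    {"id": "t2", "dependencies": []},
    {"id": "t3", "dependencies": ["t1", "t2"]},
    {"id": "t4", "dependencies": ["t3"]},
]
print(plan_waves(tasks))  # [['t1', 't2'], ['t3'], ['t4']]
```

Each inner list is a set of tasks with no unmet dependencies at that point, which is the grouping a wave-based executor works through.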

Step 3: DAG Executor

Here's where everything comes together. The DAG executor takes the task graph from Step 2 and actually runs it. The core logic is a loop: find all tasks whose dependencies are satisfied, run them in parallel using asyncio.gather (Python) or Promise.allSettled (Node.js), mark completed tasks, and repeat. The elegant part is error handling: if a task fails, it doesn't crash the whole pipeline. Downstream tasks that depend on the failed task never become ready, so they are effectively blocked (this executor leaves them in the pending state), but independent tasks keep running. This partial completion strategy is much more useful than all-or-nothing: the user gets results from 7 out of 8 sub-tasks rather than nothing.

import asyncio
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"


class DAGExecutor:
    """Execute a DAG of tasks with parallel execution."""

    def __init__(self, tasks: list[dict], execute_fn):
        self.tasks = {t["id"]: t for t in tasks}
        self.status = {t["id"]: TaskStatus.PENDING for t in tasks}
        self.results = {}
        self.execute_fn = execute_fn  # async function(task) -> result

    def _get_ready_tasks(self) -> list[str]:
        """Find tasks whose dependencies are all satisfied."""
        ready = []
        for tid, task in self.tasks.items():
            if self.status[tid] != TaskStatus.PENDING:
                continue
            deps = task.get("dependencies", [])
            if all(self.status.get(d) == TaskStatus.DONE for d in deps):
                ready.append(tid)
        return ready

    async def execute(self, on_progress=None) -> dict:
        """Run the DAG to completion, executing parallel tasks concurrently."""
        while True:
            ready = self._get_ready_tasks()
            if not ready:
                # No ready tasks — either all done or deadlocked
                break

            # Execute all ready tasks in parallel
            self.status.update({tid: TaskStatus.RUNNING for tid in ready})
            if on_progress:
                on_progress(self.status, self.results)

            results = await asyncio.gather(*[
                self._run_task(tid) for tid in ready
            ], return_exceptions=True)

            for tid, result in zip(ready, results):
                if isinstance(result, Exception):
                    self.status[tid] = TaskStatus.FAILED
                    self.results[tid] = f"Error: {result}"
                else:
                    self.status[tid] = TaskStatus.DONE
                    self.results[tid] = result

            if on_progress:
                on_progress(self.status, self.results)

        return {
            "results": self.results,
            "status": {k: v.value for k, v in self.status.items()},
            "completed": sum(
                1 for v in self.status.values() if v == TaskStatus.DONE
            ),
            "total": len(self.tasks),
        }

    async def _run_task(self, task_id: str):
        """Execute a single task with error handling."""
        task = self.tasks[task_id]
        # Gather results from dependencies as context
        dep_context = {
            dep: self.results.get(dep, "")
            for dep in task.get("dependencies", [])
        }
        try:
            return await self.execute_fn(task, dep_context)
        except Exception as e:
            raise RuntimeError(
                f"Task {task_id} failed: {e}"
            ) from e


# --- Usage with a simple executor function ---
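async def execute_task(task: dict, dep_context: dict) -> str:
    """Execute a single sub-task using Claude."""
    context_str = "\n".join(
        f"Result of {k}: {v}" for k, v in dep_context.items()
    )
    # Assuming `client` is the synchronous anthropic.Anthropic() client:
    # messages.create blocks, so run it in a worker thread. Otherwise the
    # tasks in a wave would execute one after another despite asyncio.gather.
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": (
                f"Task: {task['description']}\n"
                f"Previous results:\n{context_str}"
            ),
        }],
    )
    return response.content[0].text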


async def main():
    plan = decompose_task(
        "Research top 3 competitors, compare pricing, draft report",
        ["competitors", "pricing", "report"],
    )
    executor = DAGExecutor(plan["tasks"], execute_task)
    result = await executor.execute(
        on_progress=lambda s, r: print(
            f"  Progress: {sum(1 for v in s.values() if v == TaskStatus.DONE)}"
            f"/{len(s)} done"
        )
    )
    print(f"\nCompleted: {result['completed']}/{result['total']}")

asyncio.run(main())
class DAGExecutor {
  constructor(tasks, executeFn) {
    this.tasks = Object.fromEntries(tasks.map((t) => [t.id, t]));
    this.status = Object.fromEntries(
      tasks.map((t) => [t.id, "pending"])
    );
    this.results = {};
    this.executeFn = executeFn; // async (task, depContext) => result
  }

  _getReadyTasks() {
    return Object.entries(this.tasks)
      .filter(([id]) => this.status[id] === "pending")
      .filter(([, task]) =>
        (task.dependencies || []).every(
          (dep) => this.status[dep] === "done"
        )
      )
      .map(([id]) => id);
  }

  async execute(onProgress) {
    while (true) {
      const ready = this._getReadyTasks();
      if (ready.length === 0) break;

      // Mark ready tasks as running
      for (const id of ready) this.status[id] = "running";
      if (onProgress) onProgress({ ...this.status }, { ...this.results });

      // Execute all ready tasks in parallel
      const results = await Promise.allSettled(
        ready.map((id) => this._runTask(id))
      );

      for (let i = 0; i < ready.length; i++) {
        const id = ready[i];
        if (results[i].status === "fulfilled") {
          this.status[id] = "done";
          this.results[id] = results[i].value;
        } else {
          this.status[id] = "failed";
          this.results[id] = `Error: ${results[i].reason?.message}`;
        }
      }
      if (onProgress) onProgress({ ...this.status }, { ...this.results });
    }

    const completed = Object.values(this.status).filter(
      (s) => s === "done"
    ).length;
    return {
      results: this.results,
      status: this.status,
      completed,
      total: Object.keys(this.tasks).length,
    };
  }

  async _runTask(taskId) {
    const task = this.tasks[taskId];
    const depContext = Object.fromEntries(
      (task.dependencies || []).map((dep) => [dep, this.results[dep] ?? ""])
    );
    return this.executeFn(task, depContext);
  }
}

// --- Usage ---
async function executeTask(task, depContext) {
  const contextStr = Object.entries(depContext)
    .map(([k, v]) => `Result of ${k}: ${v}`)
    .join("\n");
  const response = await client.messages.create({
    model: "claude-sonnet-4-6",
    max_tokens: 512,
    messages: [
      {
        role: "user",
        content: `Task: ${task.description}\nPrevious results:\n${contextStr}`,
      },
    ],
  });
  return response.content[0].text;
}

const plan = await decomposeTask(
  "Research top 3 competitors, compare pricing, draft report",
  ["competitors", "pricing", "report"]
);
const executor = new DAGExecutor(plan.tasks, executeTask);
const result = await executor.execute((status) => {
  const done = Object.values(status).filter((s) => s === "done").length;
  console.log(`  Progress: ${done}/${Object.keys(status).length} done`);
});
console.log(`\nCompleted: ${result.completed}/${result.total}`);
What Just Happened?

You built a DAG executor that runs tasks in parallel waves. The _get_ready_tasks() method (_getReadyTasks() in Node.js) finds tasks whose dependencies are all satisfied. asyncio.gather (Python) / Promise.allSettled (Node.js) runs those tasks concurrently. Failed tasks don't crash the whole pipeline: they're marked as failed, and tasks that depend on them never become ready, so they remain pending in the final status. The progress callback lets you display real-time status. This is the core of production-style orchestration: parallel where possible, sequential where required, resilient to partial failures.
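The executor above leaves tasks downstream of a failure in the pending state. If you want to report them explicitly as blocked, one option is to walk the dependency edges outward from each failed task. The sketch below is an illustration under assumptions: `find_blocked` is a hypothetical helper, not a method of DAGExecutor, and it expects the same `id`/`dependencies` task shape.

```python
def find_blocked(tasks: list[dict], failed: set[str]) -> set[str]:
    """Return IDs of tasks that transitively depend on any failed task."""
    # Reverse the edges: map each task to the tasks that depend on it
    dependents: dict[str, list[str]] = {t["id"]: [] for t in tasks}
    for t in tasks:
        for dep in t.get("dependencies", []):
            dependents.setdefault(dep, []).append(t["id"])

    blocked: set[str] = set()
    stack = list(failed)
    while stack:
        tid = stack.pop()
        for child in dependents.get(tid, []):
            if child not in blocked and child not in failed:
                blocked.add(child)
                stack.append(child)  # its dependents are blocked as well
    return blocked


tasks = [
    {"id": "t1", "dependencies": []},
    {"id": "t2", "dependencies": []},
    {"id": "t3", "dependencies": ["t1"]},
    {"id": "t4", "dependencies": ["t3"]},
    {"id": "t5", "dependencies": ["t2"]},
]
print(sorted(find_blocked(tasks, {"t1"})))  # ['t3', 't4']
```

Calling a helper like this after the main loop exits would let the final status distinguish "blocked by a failure" from "never scheduled".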

Hands-On Exercise

What You'll Build

A planning agent pipeline with intent classification, Claude-powered task decomposition into a DAG, and parallel execution with progress reporting. You'll test it with both simple (direct answer) and complex (multi-step research) tasks.

Time Estimate: 45–60 minutes  |  Files You'll Create: planning_agent.py

Prerequisites: Python 3.10+, an Anthropic API key, and the anthropic SDK installed. You should have completed M12 (ReAct Pattern) to understand the agent loop this module builds on.

Environment Setup

mkdir planning-lab && cd planning-lab
python -m venv venv && source venv/bin/activate   # Windows: venv\Scripts\activate
pip install "anthropic>=0.30.0"
export ANTHROPIC_API_KEY=your-key-here             # Windows: set ANTHROPIC_API_KEY=your-key-here

Step 1: Build the Planning Agent Pipeline

What: Create the entire planning system in a single file — an intent classifier that routes simple vs. complex tasks, a task decomposer that produces a DAG of sub-tasks, a parallel executor that runs independent tasks concurrently, and the pipeline that wires them together.

Why: Keeping everything in one file lets you see how the components connect. The classifier saves API costs by skipping the planning overhead for simple questions. The decomposer turns vague goals into structured task graphs. The executor runs independent sub-tasks in parallel, cutting wall-clock time by 35-60%.
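The wall-clock saving comes from overlapping independent waits. Here is a self-contained timing sketch that uses `asyncio.sleep` as a stand-in for real tool or API latency. The 0.1-second delay and the names `fake_task`, `run_sequential`, `run_parallel`, and `compare` are illustrative assumptions, and actual savings depend on the shape of your task graph.

```python
import asyncio
import time


async def fake_task(name: str, delay: float = 0.1) -> str:
    """Stand-in for a sub-task; sleeps instead of calling a tool."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_sequential(names: list[str]) -> float:
    """Await each task in turn and return the elapsed seconds."""
    start = time.perf_counter()
    for name in names:
        await fake_task(name)
    return time.perf_counter() - start


async def run_parallel(names: list[str]) -> float:
    """Start all tasks together with asyncio.gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_task(n) for n in names))
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    names = ["t1", "t2", "t3"]
    return await run_sequential(names), await run_parallel(names)


sequential, parallel = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

With three independent tasks, the sequential run takes roughly three delays and the parallel run roughly one. Tasks joined by dependencies still have to wait for each other, so a real plan sees a smaller gain.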

Create a new file called planning_agent.py and add the following:

import anthropic
import json
import asyncio
import time

client = anthropic.Anthropic()

# ── Intent Classifier ────────────────────────────────────────
def classify_intent(request: str) -> dict:
    """Classify whether a request needs planning or direct answer."""
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=256,
        system=(
            "Classify the user request. Respond with JSON only:\n"
            '{"intent": "direct|research|multi_step", '
            '"complexity": "simple|moderate|complex", '
            '"needs_planning": true/false, '
            '"reason": "one sentence why"}'
        ),
        messages=[{"role": "user", "content": request}],
    )
    text = response.content[0].text
    # Claude may wrap the JSON in a code fence or add preamble; keep only the {...} part
    text = text[text.find("{"): text.rfind("}") + 1]
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Safer to over-plan than to skip planning for a complex task
        return {"intent": "multi_step", "complexity": "moderate",
                "needs_planning": True, "reason": "Parse error, defaulting to planning"}

# ── Task Decomposer ─────────────────────────────────────────
def decompose_task(goal: str) -> list[dict]:
    """Break a complex goal into a DAG of sub-tasks."""
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system=(
            "Decompose this goal into 3-7 sub-tasks. Respond with JSON only:\n"
            '[{"id": "task_1", "description": "...", "depends_on": [], '
            '"tools_needed": ["search", "analyze", etc]}, ...]'
            "\n\nRules:\n"
            "- Each task should be achievable in 1-2 tool calls\n"
            "- depends_on lists task IDs that must complete first\n"
            "- Independent tasks should have empty depends_on (they run in parallel)\n"
            "- NO circular dependencies"
        ),
        messages=[{"role": "user", "content": f"Goal: {goal}"}],
    )
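    try:
        text = response.content[0].text
        # Claude may wrap the JSON in a code fence; keep only the outermost [...]
        text = text[text.find("["): text.rfind("]") + 1]
        tasks = json.loads(text)
        # Validate: check for cycles using topological sort
        if not _validate_dag(tasks):
            raise ValueError("task graph has a cycle or unknown dependency")
        return tasks
    except (ValueError, KeyError, TypeError) as e:
        # Fallback: run the whole goal as a single task
        print(f"  ⚠ WARNING: Decomposition failed ({e}), falling back to a single task")
        return [{"id": "task_1", "description": goal, "depends_on": [], "tools_needed": []}]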

def _validate_dag(tasks: list[dict]) -> bool:
    """Check for circular dependencies via topological sort."""
    task_ids = {t["id"] for t in tasks}
    in_degree = {t["id"]: len(t.get("depends_on", [])) for t in tasks}
    deps = {t["id"]: set(t.get("depends_on", [])) for t in tasks}
    queue = [tid for tid, deg in in_degree.items() if deg == 0]
    visited = 0
    while queue:
        tid = queue.pop(0)
        visited += 1
        for other_id, other_deps in deps.items():
            if tid in other_deps:
                in_degree[other_id] -= 1
                if in_degree[other_id] == 0:
                    queue.append(other_id)
    return visited == len(tasks)  # True if no cycle

# ── Task Executor (simulated) ────────────────────────────────
async def execute_task(task: dict) -> dict:
    """Execute a single sub-task (simulated)."""
    await asyncio.sleep(0.5)  # simulate work
    return {
        "task_id": task["id"],
        "status": "completed",
        "result": f"Completed: {task['description'][:60]}",
    }

# ── DAG Executor ─────────────────────────────────────────────
async def execute_dag(tasks: list[dict], verbose: bool = True) -> list[dict]:
    """Execute tasks respecting dependencies, parallelizing where possible."""
    completed = {}
    failed = set()
    results = []
    remaining = {t["id"]: t for t in tasks}
    wave = 0

    while remaining:
        wave += 1
        # Find tasks whose dependencies are all satisfied
        ready = [
            t for tid, t in remaining.items()
            if all(d in completed for d in t.get("depends_on", []))
            and not any(d in failed for d in t.get("depends_on", []))
        ]

        if not ready:
            # Remaining tasks are blocked by failures
            for tid in remaining:
                results.append({"task_id": tid, "status": "blocked", "result": "Blocked by failed dependency"})
            break

        if verbose:
            task_names = [t["id"] for t in ready]
            mode = "PARALLEL" if len(ready) > 1 else "SEQUENTIAL"
            print(f"    Wave {wave} [{mode}]: {task_names}")

        # Execute ready tasks in parallel
        wave_results = await asyncio.gather(
            *[execute_task(t) for t in ready],
            return_exceptions=True,
        )

        for task, result in zip(ready, wave_results):
            if isinstance(result, Exception):
                failed.add(task["id"])
                results.append({"task_id": task["id"], "status": "failed", "result": str(result)})
            else:
                completed[task["id"]] = result
                results.append(result)
            del remaining[task["id"]]

    return results

# ── Progress Visualization ───────────────────────────────────
def print_plan(tasks: list[dict], results: list[dict] | None = None) -> None:
    """Print a visual task tree with status indicators."""
    status_map = {}
    if results:
        for r in results:
            status_map[r["task_id"]] = r["status"]

    print("\n  📋 Execution Plan:")
    for t in tasks:
        status = status_map.get(t["id"], "pending")
        icon = {"completed": "✅", "failed": "❌", "blocked": "🚫", "pending": "⏳"}.get(status, "⏳")
        deps = f" (after: {', '.join(t.get('depends_on', []))})" if t.get("depends_on") else " (no deps)"
        print(f"    {icon} {t['id']}: {t['description'][:50]}{deps}")

# ── Full Pipeline ────────────────────────────────────────────
async def planning_pipeline(request: str, verbose: bool = True) -> str:
    """Classify → (Plan → Execute) or Direct Answer."""
    if verbose:
        print(f"\n{'═' * 55}")
        print(f"  Request: {request}")
        print(f"{'═' * 55}")

    # Step 1: Classify
    classification = classify_intent(request)
    if verbose:
        print(f"\n  📊 Classification: {classification['intent']} "
              f"({classification['complexity']})")
        print(f"     Needs planning: {classification['needs_planning']}")
        print(f"     Reason: {classification['reason']}")

    if not classification["needs_planning"]:
        # Direct answer — skip planning overhead
        if verbose:
            print("  → Routing to direct answer (no planning needed)")
        response = client.messages.create(
            model="claude-sonnet-4-6", max_tokens=1024,
            messages=[{"role": "user", "content": request}],
        )
        return response.content[0].text

    # Step 2: Decompose into DAG
    if verbose:
        print("\n  🔨 Decomposing into sub-tasks...")
    tasks = decompose_task(request)
    if verbose:
        print_plan(tasks)

    # Step 3: Execute DAG
    if verbose:
        print(f"\n  ▶ Executing {len(tasks)} tasks...")
    start = time.time()
    results = await execute_dag(tasks, verbose=verbose)
    elapsed = time.time() - start

    if verbose:
        print_plan(tasks, results)
        completed = sum(1 for r in results if r["status"] == "completed")
        print(f"\n  ⏱ Completed {completed}/{len(tasks)} tasks in {elapsed:.1f}s")

    # Step 4: Synthesize final answer
    result_text = "\n".join(f"- {r['task_id']}: {r['result']}" for r in results)
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system="Synthesize a final answer from these sub-task results.",
        messages=[{"role": "user", "content":
            f"Original request: {request}\n\nSub-task results:\n{result_text}"}],
    )
    return response.content[0].text


# ── Tests ────────────────────────────────────────────────────
async def main():
    # Test 1: Simple task (should skip planning)
    print("\n▶ TEST 1: Simple task (direct answer)")
    r1 = await planning_pipeline("What is 2 + 2?")
    print(f"\n  Answer: {r1[:120]}")

    # Test 2: Complex task (should trigger planning)
    print("\n▶ TEST 2: Complex task (planning + execution)")
    r2 = await planning_pipeline(
        "Research the top 3 AI agent frameworks, compare their features, "
        "and draft a recommendation for a startup team."
    )
    print(f"\n  Answer: {r2[:200]}...")

    # Test 3: DAG validation
    print("\n▶ TEST 3: DAG validation")
    valid = _validate_dag([
        {"id": "a", "depends_on": []},
        {"id": "b", "depends_on": ["a"]},
        {"id": "c", "depends_on": ["a"]},
        {"id": "d", "depends_on": ["b", "c"]},
    ])
    print(f"  Valid DAG (no cycles): {valid}")

    cyclic = _validate_dag([
        {"id": "a", "depends_on": ["c"]},
        {"id": "b", "depends_on": ["a"]},
        {"id": "c", "depends_on": ["b"]},
    ])
    print(f"  Cyclic DAG (should be False): {cyclic}")

if __name__ == "__main__":
    asyncio.run(main())

Step 2: Run and Verify the Pipeline

What: Execute all three tests — a simple task (should skip planning), a complex task (should decompose and execute in parallel waves), and a DAG validation test (should detect cycles).

Why: Running all three tests confirms that intent classification routes correctly, the decomposer generates valid DAGs, and the executor handles parallel waves.

Run Command
python planning_agent.py
Expected Output (abbreviated)
▶ TEST 1: Simple task (direct answer)

═══════════════════════════════════════════════════════
  Request: What is 2 + 2?
═══════════════════════════════════════════════════════

  📊 Classification: direct (simple)
     Needs planning: False
     Reason: Simple arithmetic question with immediate answer
  → Routing to direct answer (no planning needed)

  Answer: 2 + 2 = 4.

▶ TEST 2: Complex task (planning + execution)

═══════════════════════════════════════════════════════
  Request: Research the top 3 AI agent frameworks...
═══════════════════════════════════════════════════════

  📊 Classification: research (complex)
     Needs planning: True
     Reason: Requires multiple research steps and synthesis

  🔨 Decomposing into sub-tasks...

  📋 Execution Plan:
    ⏳ task_1: Research LangChain features and use cases (no deps)
    ⏳ task_2: Research CrewAI features and use cases (no deps)
    ⏳ task_3: Research AutoGen features and use cases (no deps)
    ⏳ task_4: Compare frameworks across key dimensions (after: task_1, task_2, task_3)
    ⏳ task_5: Draft recommendation for startup team (after: task_4)

  ▶ Executing 5 tasks...
    Wave 1 [PARALLEL]: ['task_1', 'task_2', 'task_3']
    Wave 2 [SEQUENTIAL]: ['task_4']
    Wave 3 [SEQUENTIAL]: ['task_5']

  📋 Execution Plan:
    ✅ task_1: Research LangChain features and use cases (no deps)
    ✅ task_2: Research CrewAI features and use cases (no deps)
    ✅ task_3: Research AutoGen features and use cases (no deps)
    ✅ task_4: Compare frameworks across key dimensions (after: task_1, task_2, task_3)
    ✅ task_5: Draft recommendation for startup team (after: task_4)

  ⏱ Completed 5/5 tasks in 1.5s

  Answer: Based on my research, here are the top 3 AI agent frameworks...

▶ TEST 3: DAG validation
  Valid DAG (no cycles): True
  Cyclic DAG (should be False): False
✅ Checkpoint

Verify these key behaviors:

  • Test 1: Classification should say needs_planning: False and route directly to Claude (no decomposition)
  • Test 2: Should produce 3–7 sub-tasks with correct dependencies, Wave 1 should run independent tasks in PARALLEL, later waves run sequentially
  • Test 3: Valid DAG returns True, cyclic DAG returns False
Troubleshooting
  • If you see ModuleNotFoundError: No module named 'anthropic' → Run pip install "anthropic>=0.30.0" and make sure your virtual environment is activated.
  • If you see AuthenticationError → Check that your ANTHROPIC_API_KEY environment variable is set. Run echo $ANTHROPIC_API_KEY (Linux/Mac) or echo %ANTHROPIC_API_KEY% (Windows) to verify.
  • Classification always returns needs_planning: False → Claude may classify some tasks differently. Try a more explicitly complex request: "Research, compare, and write a report about..."
  • JSON parse error in decompose_task → Claude occasionally wraps JSON in markdown code fences. The code falls back to a single task. Try running again — LLM outputs are non-deterministic.
  • All tasks run sequentially (no parallel waves) → Check that some tasks have empty depends_on arrays. If Claude generates serial dependencies for everything, add to the prompt: "Make independent tasks parallelizable with empty depends_on."
  • If you see RuntimeWarning: coroutine 'main' was never awaited, or RuntimeError: no running event loop → Make sure the script ends with asyncio.run(main()) rather than calling main() directly.

Wrap-Up

If all three tests passed, you have a working planning agent. Test 1 skipped planning for the simple question, Test 2 ran the full plan-then-execute flow with parallel waves, and Test 3 validated DAG cycle detection. To re-run at any time: python planning_agent.py.

🎉 Congratulations

You've built a planning agent that classifies intent, decomposes complex tasks into a validated DAG, executes them in parallel waves, and synthesizes results. This is the architecture used by production agent systems that handle tasks too complex for a single ReAct loop.

Stretch Goals (Optional)
  • Dynamic tool discovery: Embed tool descriptions and select top-3 per sub-task using cosine similarity
  • Adaptive re-planning: When a sub-task fails, have Claude generate an alternative plan for that branch
  • Plan preview: Add a /plan command that shows the execution plan before running it, letting the user approve or modify
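For the dynamic tool discovery stretch goal, the selection step can be prototyped before wiring in a real embedding model. The sketch below substitutes simple word-count vectors for embeddings, which is a deliberate simplification. The tool names and descriptions are made up for illustration, and `vectorize`, `cosine`, and `select_tools` are hypothetical helpers.

```python
import math
import re
from collections import Counter

# Hypothetical tool catalog; replace with your real tool descriptions
TOOLS = {
    "web_search": "search the web for pages articles and news",
    "price_lookup": "look up product pricing plans and price tiers",
    "calculator": "evaluate arithmetic and numeric expressions",
    "doc_writer": "draft and format a written report or summary document",
}


def vectorize(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts (stand-in for a real model)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def select_tools(task_description: str, k: int = 3) -> list[str]:
    """Return up to k tool names ranked by similarity to the sub-task."""
    query = vectorize(task_description)
    scored = [(cosine(query, vectorize(desc)), name) for name, desc in TOOLS.items()]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    # Drop tools with no overlap at all so irrelevant ones are never loaded
    return [name for score, name in scored[:k] if score > 0]


print(select_tools("compare pricing plans across competitors", k=2))
# ['price_lookup']
```

Swapping `vectorize` for calls to an embedding model keeps the rest of the ranking logic unchanged; the word-count version only matches on literal shared words, so it will miss synonyms.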
Cost Note

On the planning path, this exercise makes 3 Claude calls (classify, decompose, synthesize); the sub-tasks are simulated and make no API calls. Once each sub-task calls Claude for real, add at least one call per sub-task. At Claude Sonnet pricing, a typical run costs $0.05–0.15, about 3–5x more than a raw ReAct agent, but the success rate improvement (40% → 80%) often makes it worthwhile.

Knowledge Check

1. Why does a ReAct-only agent struggle with tasks that have more than 5 steps?

A. The Claude API has a hard limit of 5 tool calls per conversation
B. ReAct agents can only use one tool at a time
C. Without a plan, the agent loses track of the overall goal, makes locally reasonable but globally suboptimal choices, and misses dependencies between subtasks
D. The context window fills up after 5 tool calls
Correct! ReAct agents make step-by-step decisions without seeing the full picture. With 5+ steps, they frequently lose track of their progress, backtrack unnecessarily, or miss dependencies between tasks. A planning layer provides the "big picture" roadmap that keeps execution on track.
Not quite. There's no hard limit on tool calls. The real issue is cognitive: without a plan, the agent can't see the full picture. It makes locally reasonable choices ("this search seems useful") that are globally suboptimal ("I should have gathered all names first"). The planning step creates a roadmap that prevents these mistakes.

2. What is a DAG, and why must the graph be acyclic?

A. A Directed Acyclic Graph stores data efficiently; cycles waste memory
B. A Directed Acyclic Graph represents tasks and dependencies; cycles create deadlocks where no task can start because each waits for another
C. A Direct Access Gateway connects tools to the agent; acyclic means one-way data flow
D. DAG is a planning framework where acyclic means each task runs exactly once
Correct! A DAG has nodes (tasks) and directed edges (dependencies). "Acyclic" means no circular dependencies — if A depends on B and B depends on A, neither can ever start. Cycle detection (via topological sort) is essential when validating generated plans.
Not quite. A Directed Acyclic Graph has nodes (tasks) connected by directed edges (dependencies: A→B means "A must finish before B starts"). "Acyclic" is critical because a cycle (A depends on B, B depends on A) creates a deadlock — neither task can ever start since each is waiting for the other.

3. An agent has 20 available tools. Why not load all 20 into every prompt?

A. The Claude API rejects requests with more than 10 tools
B. More tools increase response time proportionally (20 tools = 20x slower)
C. Claude will call all 20 tools in parallel, causing API rate limit issues
D. Tool selection accuracy degrades with too many options, and irrelevant tool descriptions waste context window tokens
Correct! Two problems: (1) Claude's tool selection accuracy drops significantly above 5-7 tools per prompt, because too many similar options cause confusion. (2) Each tool definition costs 100-200 tokens, so 20 tools add 2,000-4,000 tokens to every call, most of them spent on tools the sub-task never uses. Dynamic tool discovery solves both by selecting only the 3-5 most relevant tools per sub-task.
Not quite. There's no hard API limit on tool count, and Claude doesn't call all tools at once. The real issues are: (1) tool selection accuracy degrades above 5-7 tools per prompt, and (2) irrelevant tool descriptions waste context window tokens. Dynamic tool discovery keeps the active tool set focused and relevant.
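
As a rough illustration of keeping the active tool set small, the sketch below ranks tools by naive keyword overlap with the sub-task description and keeps the top few. The scoring method, the `select_tools` name, and the tool dictionary shape (`name`, `description`) are all assumptions for this example; a real selector might use embeddings or a model call instead.

```python
import re

def _words(text):
    """Lowercase word set; splits snake_case names such as web_search."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def select_tools(subtask, tools, k=5):
    """Return up to k tools whose name and description best match the sub-task.

    Hypothetical helper: relevance here is plain word overlap, which is
    only a stand-in for a stronger relevance measure.
    """
    wanted = _words(subtask)
    scored = []
    for tool in tools:
        overlap = len(wanted & _words(tool["name"] + " " + tool["description"]))
        if overlap:  # drop tools with nothing in common with the sub-task
            scored.append((overlap, tool["name"], tool))
    # Highest overlap first; ties broken by name for a stable result.
    scored.sort(key=lambda item: (-item[0], item[1]))
    return [tool for _, _, tool in scored[:k]]
```

Only the returned subset would then be passed as the tool list for that sub-task's request, which keeps both the option count and the token cost down.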

4. A planning agent decomposes a task into 8 sub-tasks but sub-task 5 fails. What's the best approach?

A. Abort the entire task and return an error
B. Retry sub-task 5 indefinitely until it succeeds
C. Retry once; if it fails again, continue with remaining sub-tasks that don't depend on it, and return a partial result noting what's missing
D. Re-decompose the entire task from scratch
Correct! Partial completion is almost always better than total failure. Retry once to handle transient errors, then skip and continue. Sub-tasks that depend on the failed one are marked as skipped. The final result notes "ClickUp pricing data unavailable" rather than losing all the work from the other 7 sub-tasks.
Not quite. The best approach is resilient partial completion: retry once (handles transient errors), then skip if it fails again. Tasks that don't depend on it continue normally. Tasks that do depend on it are skipped with an explanation. The agent returns a partial result noting what's missing, which is far more valuable than aborting and losing all progress.
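
The retry-then-skip policy can be sketched in a few lines. This is a simplified, sequential illustration rather than the module's executor: `run_plan`, its return shape, and the assumption that `tasks` is already listed in a valid dependency order are all hypothetical.

```python
def run_plan(tasks, deps, max_attempts=2):
    """Run tasks with one retry each, skipping anything downstream of a failure.

    tasks: task id -> zero-argument callable, listed in dependency order
           (an assumption of this sketch).
    deps:  task id -> list of task ids it depends on.
    """
    status, results = {}, {}
    for task_id, fn in tasks.items():
        # A task whose dependency did not finish is skipped, not attempted.
        if any(status.get(d) != "done" for d in deps.get(task_id, [])):
            status[task_id] = "skipped"
            continue
        for _ in range(max_attempts):  # first try plus one retry
            try:
                results[task_id] = fn()
                status[task_id] = "done"
                break
            except Exception:
                status[task_id] = "failed"
    # Report what is absent so the final answer can say so explicitly.
    missing = [t for t, s in status.items() if s != "done"]
    return {"results": results, "status": status, "missing": missing}
```

The `missing` list is what lets the final report state which data is unavailable while still returning everything the other sub-tasks produced.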

5. In M12, you checked stop_reason to determine when the ReAct loop should end. How does the planning agent from this module decide when it's done?

A. It checks stop_reason after each sub-task
B. The DAG executor runs until no more tasks are ready — all tasks are either done, failed, or blocked by failed dependencies
C. It uses the same max_iterations stop condition as M12
D. Claude decides when enough sub-tasks have completed
Correct! The DAG executor's completion condition is structural, not heuristic. It stops when _get_ready_tasks() returns an empty list — meaning every task is either done, failed, or waiting on a failed dependency. This is deterministic and reliable, unlike checking natural language signals.
Not quite. The DAG executor uses a structural completion condition: it stops when no more tasks are in the "ready" state. That means every task is either done, failed, or blocked by a failed dependency. This is deterministic: the plan defines the scope, and the executor runs until that scope is exhausted.
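
A stripped-down version of that loop might look like the sketch below. It is a reconstruction under assumptions, not the module's code: `get_ready_tasks` is written here as a standalone function, tasks are plain callables, and each wave runs sequentially where a real executor could run it concurrently.

```python
def get_ready_tasks(status, deps):
    """Pending tasks whose dependencies have all completed successfully."""
    return [
        t for t, s in status.items()
        if s == "pending" and all(status[d] == "done" for d in deps.get(t, []))
    ]

def execute(tasks, deps):
    """Run waves of ready tasks until none remain; return final status and waves."""
    status = {t: "pending" for t in tasks}
    waves = []
    while True:
        ready = get_ready_tasks(status, deps)
        if not ready:
            break  # structural stop: nothing left that is able to run
        waves.append(ready)
        for t in ready:  # tasks in one wave are independent of each other
            try:
                tasks[t]()
                status[t] = "done"
            except Exception:
                status[t] = "failed"
    # Tasks still "pending" here are blocked by a failed dependency.
    return status, waves
```

Every pass moves at least one task out of the pending state, so the loop always terminates, and no model output is consulted to decide when to stop.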

Module Summary

Key Concepts

  • Planning layer: Analyzes the goal and generates a structured execution plan before starting. Improves complex task completion from ~40% to ~80%.
  • Intent classification: A lightweight router that sends simple requests directly to Claude and reserves the planning overhead for genuinely complex tasks.
  • Task decomposition: Uses Claude to break complex goals into a hierarchical DAG of sub-tasks. Each leaf task is achievable in 1-2 tool calls.
  • DAG execution: Runs independent sub-tasks in parallel while respecting sequential dependencies. Typically saves 35% wall-clock time versus sequential execution.
  • Dynamic tool discovery: Selects the 3-5 most relevant tools per sub-task instead of loading all tools into every prompt, improving selection accuracy and reducing token costs.

What We Built

A complete planning agent pipeline: intent classifier → task decomposer (with DAG validation) → parallel DAG executor → progress reporting. The agent can handle complex, multi-step tasks that would defeat a raw ReAct agent.

Next Module Preview

In M14: Multi-Agent Systems, you'll go beyond a single agent handling everything. Instead of one agent with one plan, you'll build a coordinator that delegates sub-tasks to specialized agents — each with their own tools, context, and expertise. This is how production systems handle tasks too complex for any single agent.