Planning & Task Decomposition
Teach your agent to think before it acts — break complex goals into structured execution plans.
Learning Objectives
- Explain why complex tasks (5+ steps) need a planning layer beyond raw ReAct execution
- Build an intent classifier that routes user requests to the appropriate planning strategy
- Decompose complex goals into hierarchical sub-task trees with explicit dependencies
- Implement a DAG executor that runs independent sub-tasks in parallel and respects sequential dependencies
- Use dynamic tool discovery to select relevant tools per sub-task instead of loading all tools at once
Prerequisites: M12 (ReAct Pattern) | Level: Intermediate–Advanced
Why Complex Tasks Need Planning
In M12, you built a ReAct agent that could chain 2-3 tool calls to answer a multi-step question. That works beautifully for focused research tasks. But what happens when you ask it to do something bigger — like "research our top 3 competitors, compare their pricing, and draft a summary report"? That's at least 10 tool calls across multiple phases, with dependencies between them. A raw ReAct agent tackling this will often lose track of the overall goal, backtrack unnecessarily, or miss entire phases.
Before planning: Imagine building IKEA furniture without reading the instructions. You open the box, see 47 pieces and a bag of screws, and just start connecting things that look like they fit. You might get the shelf assembled eventually, but you'll probably attach a side panel upside down, realize at step 15 that you missed a bracket at step 3, and end up with leftover screws and no idea where they belong.
The pain: This is exactly how a ReAct-only agent handles complex tasks. It makes locally reasonable decisions ("this tool call seems right for what I'm doing now") but globally suboptimal ones ("I should have gathered all the competitor names first before researching each one"). It can't see the full picture because it never creates one.
The mapping: Planning is reading the instructions before you start building. The agent analyzes the entire goal, breaks it into phases ("gather competitor names" → "research each competitor" → "compare pricing" → "draft report"), identifies what depends on what, and creates a structured execution plan. Then it follows the plan step by step, tracking progress. This transforms "try things and hope for the best" into "understand the goal, make a plan, execute systematically."
Task planning is a meta-strategy where the agent analyzes a complex goal and produces a structured execution plan before starting any tool calls. Instead of diving straight into a ReAct loop, the agent first generates a tree (or graph) of sub-tasks. It identifies dependencies between them. Then it executes the plan systematically, one wave at a time.
Why does this matter? Research on agent benchmarks shows that for tasks with more than 3-4 required steps, ReAct-only agents complete successfully about 40% of the time. The same agents with a planning step complete about 80% of the time. The planning step catches issues early: "Wait, I need the competitor names before I can research each one" is obvious when you write it down, but easy to miss in the middle of a ReAct loop.
Planning isn't a separate framework — it's a layer on top of ReAct. The plan generates the "what to do" list. ReAct handles the "how to do each item" execution. In this module, you'll build both layers and wire them together.
Here's what a plan looks like in practice — this is the actual JSON structure the decomposer produces:
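As an illustration, a plan for the competitor-research goal might look like the sketch below, built as a Python dict and serialized to JSON. The task IDs, descriptions, and tool names are assumptions made for this example; the field names follow the schema used by the decomposer prompt in Step 2.

```python
import json

# Illustrative plan for "research our top 3 competitors, compare their
# pricing, and draft a summary report". All IDs, descriptions, and tool
# names here are assumptions for this sketch.
example_plan = {
    "tasks": [
        {"id": "t1", "description": "Identify the top 3 competitors",
         "type": "tool_call", "dependencies": [], "tools_hint": ["web_search"]},
        {"id": "t2", "description": "Research competitor A pricing",
         "type": "tool_call", "dependencies": ["t1"], "tools_hint": ["web_search"]},
        {"id": "t3", "description": "Research competitor B pricing",
         "type": "tool_call", "dependencies": ["t1"], "tools_hint": ["web_search"]},
        {"id": "t4", "description": "Research competitor C pricing",
         "type": "tool_call", "dependencies": ["t1"], "tools_hint": ["web_search"]},
        {"id": "t5", "description": "Compare pricing across competitors",
         "type": "aggregation", "dependencies": ["t2", "t3", "t4"],
         "tools_hint": ["comparison_table"]},
        {"id": "t6", "description": "Draft the summary report",
         "type": "generation", "dependencies": ["t5"], "tools_hint": []},
    ]
}

# Tasks with an empty dependency list can start immediately.
root_tasks = [t["id"] for t in example_plan["tasks"] if not t["dependencies"]]

print(json.dumps(example_plan, indent=2))
print("Can start immediately:", root_tasks)
```

Only t1 has no dependencies, so it runs first; t2 through t4 become runnable together once it finishes.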
Planning is what separates toy agents from production agents. A customer-facing agent that's supposed to "process a refund, update the CRM, and send a confirmation email" can't afford to get confused at step 2 and start over. In production deployments, adding a planning layer reduces task failure rates by 40-50% and cuts total token usage by 20-30% (because the agent doesn't waste tokens backtracking). For a team running 10,000 agent tasks per day, that's the difference between 6,000 and 8,000 successful completions — and $200-400/day saved in API costs.
"Every request needs a plan" — No. Simple questions ("What's the weather?"), single-step tool calls, and direct generation tasks don't benefit from planning. The planning overhead (an extra API call to generate the plan) actually makes these slower. Use intent classification (next section) to skip planning for simple requests.
"The plan must be perfect before execution starts" — Plans are hypotheses, not contracts. A good agent executes the plan but adapts when sub-tasks produce unexpected results. Rigid plan-following is as bad as no planning. The best approach: plan upfront, but re-plan if a sub-task fails or returns surprising information.
"Planning requires a special planning model" — The same Claude model that reasons in ReAct loops can generate plans. You just need a different prompt: instead of "do this task," you say "break this task into sub-tasks with dependencies." It's a different use of the same capability.
Intent Classification: Route Before You Plan
Before intent classification: Imagine a hospital where every patient — whether they have a paper cut or chest pain — goes through the same full diagnostic workup: blood tests, MRI, specialist consultation. The paper-cut patient wastes hours and thousands of dollars on unnecessary procedures, while the chest-pain patient waits in line behind them.
The pain: Without intent classification, every user request triggers the full planning pipeline. "What's 2+2?" goes through task decomposition, DAG creation, and tool discovery — burning tokens and adding 3-5 seconds of latency for a task that should take 200 milliseconds.
The mapping: Intent classification is the triage nurse. She quickly assesses each patient's needs and routes them to the right level of care: paper cut → band-aid station (direct answer), sprained ankle → minor procedures (single tool call), chest pain → full diagnostic workup (planning pipeline). The agent's intent classifier does the same: simple query → direct Claude call, single-step task → ReAct loop, complex multi-step task → full planning pipeline.
Intent classification is a lightweight first pass, the first stage of the agent pipeline, that categorizes the user's request and extracts key entities. It answers two questions: "What kind of task is this?" and "What are the important details?"
You implement it as a single Claude call with a structured system prompt. The call returns a JSON classification with three fields. First, an intent label (e.g., "simple_question", "multi_step_task", "creative_generation"). Second, a complexity score (low/medium/high). Third, extracted entities — the key nouns and parameters from the request. Based on the intent and complexity, your code routes to the appropriate execution strategy, skipping the planning overhead for simple requests.
Think of this as a router, not a filter. It doesn't block anything — it just sends each request down the most efficient path. A "simple_question" intent goes straight to a Claude call. A "multi_step_task" intent triggers the full planning pipeline. This routing typically adds only 300-500ms and 200-300 tokens of overhead, but saves 3-10x that on simple requests that would otherwise go through unnecessary planning.
Here's what the classifier actually returns — this is the JSON you'll parse to make routing decisions:
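A hypothetical classification for a multi-step request, together with a minimal routing function, might look like this. The field values and the strategy names (`planning_pipeline`, `react_loop`, `direct_call`) are assumptions for this sketch, not names defined elsewhere in the module.

```python
# Hypothetical classifier output for a multi-step request. The field names
# match the classifier prompt in Step 1; the values are illustrative.
example_classification = {
    "intent": "multi_step_task",
    "complexity": "high",
    "entities": ["project management tools", "pricing", "summary report"],
    "needs_planning": True,
    "reasoning": "The request has several dependent phases.",
}


def choose_strategy(classification: dict) -> str:
    """Map a classification to an execution strategy name.

    A missing needs_planning field is treated as True, mirroring the
    "over-plan rather than under-plan" fallback described in the text.
    """
    if (classification.get("needs_planning", True)
            or classification.get("intent") == "multi_step_task"):
        return "planning_pipeline"
    if classification.get("intent") == "single_tool":
        return "react_loop"
    return "direct_call"  # simple questions and creative generation


print(choose_strategy(example_classification))
```

Defaulting to the planning pipeline when fields are missing keeps a malformed classification on the cautious path.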
Intent classification saves both time and money. In a production agent handling 10,000 requests/day, roughly 60-70% are simple questions that don't need planning. Without classification, you'd burn ~300 tokens of planning overhead per simple request. That's 7,000 requests × 300 tokens = 2.1 million wasted tokens/day (~$32/day at Claude Sonnet pricing). With classification, those 7,000 requests skip planning and get answered in a single Claude call. The 3,000 complex requests get the full planning treatment they need. Everyone gets the right level of service.
"Intent classification needs to be 100% accurate" — It doesn't. A misclassified simple task that goes through planning just wastes a few hundred tokens. A misclassified complex task that skips planning will fail and cost more in retries. That's why the fallback defaults to needs_planning: true — false positives (unnecessary planning) are cheap, false negatives (skipped planning) are expensive.
"I should fine-tune a model for intent classification" — For most agent systems, a well-crafted system prompt with Claude Haiku or Sonnet is sufficient. Fine-tuning only makes sense when you have thousands of labeled examples and sub-100ms latency requirements. Start with prompting; optimize later if classification becomes a bottleneck.
"More intent categories = better routing" — Actually, more categories increase misclassification risk. Start with 3-4 broad intents (direct answer, single tool, multi-step, creative). You can always split them later as you gather data on which requests fail.
Task Decomposition: Breaking Goals into Sub-Tasks
Before decomposition: Imagine a project manager who receives the goal "Launch the new website" and just starts working on it — no breakdown, no task assignments, no timeline. They bounce between writing copy, configuring DNS, and designing the homepage, never finishing any one thing before starting the next.
The pain: When an agent tries to tackle a complex goal without decomposition, it faces the same problem. "Research competitors and draft a report" is too vague to execute directly. The agent doesn't know where to start, how to track progress, or when it's done. It makes progress on one aspect, then loses that context when it switches to another.
The mapping: A good project manager breaks "Launch the website" into concrete tasks: (1) finalize design mockups, (2) migrate content from old site, (3) configure DNS and hosting, (4) run load testing, (5) execute go-live checklist. Each task is specific, has clear inputs and outputs, and can be assigned to someone. Task decomposition does the same for an agent — it uses Claude to analyze the goal and generate a hierarchy of sub-tasks where each leaf task is small enough to accomplish with 1-2 tool calls.
Task decomposition uses Claude to analyze a complex goal and generate a hierarchical tree of sub-tasks. Each leaf task should be atomic: small enough to accomplish in 1-2 tool calls. The tree also carries dependency edges showing which tasks must complete before others can start. There are three decomposition strategies:
Top-down decomposition is the most common. Claude breaks the high-level goal into 3-5 phases, then breaks each phase into specific actions. For "research competitors and draft a report," the phases might be: (1) identify competitors, (2) research each competitor, (3) compare pricing, (4) draft report. Phase 2 further decomposes into per-competitor research tasks.
Bottom-up decomposition starts with the atomic actions: "search for X," "extract pricing from Y," "format data as table." Claude then groups these into logical phases. This works well when you know the available tools and want to plan in terms of specific capabilities.
Iterative decomposition only plans the next step after completing the current one. This is useful when later steps depend on earlier results — you can't plan "research competitor pricing" until you know which competitors exist. The tradeoff: less parallelism, but more adaptive plans.
Here's what a decomposed task tree looks like. Notice how each leaf node maps to a specific action, and dependencies form a hierarchy:
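A minimal sketch of such a tree for the competitor-report goal is shown below as nested Python dicts, with helpers that render the hierarchy and list the leaf tasks. The phase names come from the top-down example above; the per-competitor leaves and the helper names are assumptions.

```python
# Hypothetical task tree; a node with no children is a leaf task.
task_tree = {
    "goal": "Research competitors and draft a report",
    "children": [
        {"goal": "Identify competitors", "children": []},
        {"goal": "Research each competitor", "children": [
            {"goal": "Research competitor A", "children": []},
            {"goal": "Research competitor B", "children": []},
            {"goal": "Research competitor C", "children": []},
        ]},
        {"goal": "Compare pricing", "children": []},
        {"goal": "Draft report", "children": []},
    ],
}


def render(node: dict, depth: int = 0) -> list[str]:
    """Return the tree as indented lines, one per node."""
    lines = ["  " * depth + "- " + node["goal"]]
    for child in node["children"]:
        lines.extend(render(child, depth + 1))
    return lines


def leaf_tasks(node: dict) -> list[str]:
    """Collect the leaf goals, i.e. the tasks that actually get executed."""
    if not node["children"]:
        return [node["goal"]]
    leaves = []
    for child in node["children"]:
        leaves.extend(leaf_tasks(child))
    return leaves


print("\n".join(render(task_tree)))
print("Leaf tasks:", leaf_tasks(task_tree))
```

Only the six leaves are executed; the inner nodes exist to group work into phases.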
Overly narrow task decomposition creates coverage gaps — subtasks may not cover the full scope of the original goal. Overly broad decomposition overwhelms individual agents with tasks that are too large. The exam tests finding the right granularity: leaf tasks should be achievable in 1-2 tool calls, but collectively they must cover the entire goal.
The quality of decomposition directly determines agent success. In production, we see a clear pattern: tasks decomposed into 5-10 sub-tasks with 1-2 tool calls each succeed about 85% of the time. Tasks decomposed into 15+ very fine-grained sub-tasks succeed only about 60% of the time (coordination overhead dominates). Tasks not decomposed at all succeed about 40% of the time on complex goals. The sweet spot is leaf tasks that take 1-2 tool calls — specific enough to execute reliably, but coarse enough to avoid drowning in coordination.
DAG Execution: Parallel & Sequential Paths
Before DAGs: Imagine cooking a complex dinner — roast chicken, mashed potatoes, salad, and dessert — and doing everything one step at a time. You peel potatoes, then wait. Chop salad, then wait. Preheat oven, then wait. Total time: 3 hours. But the oven can preheat while you chop, and the salad can be made while the chicken roasts.
The pain: Sequential execution wastes time on independent tasks. If researching Asana takes 5 seconds and researching Monday.com takes 5 seconds, running them sequentially costs 10 seconds. Running them in parallel costs 5 seconds. For an agent handling 8 sub-tasks, parallelism can cut total execution time by 40-60%.
The mapping: A DAG (Directed Acyclic Graph) is your cooking schedule. It captures which tasks can happen at the same time (chop vegetables while oven preheats = parallel) and which must wait (can't frost the cake before it's baked = sequential dependency). The DAG executor looks at the graph, starts all tasks with no unmet dependencies, and as each task completes, releases the tasks that were waiting for it.
A Directed Acyclic Graph (DAG) represents tasks as nodes and dependencies as directed edges. "Directed" means the edges have a direction (A → B means A must finish before B can start). "Acyclic" means there are no circular dependencies: you can't have task A depending on task B and task B depending on task A, because that would be impossible to execute. With no cycles, the graph is guaranteed to be executable in a finite number of steps.
A DAG executor works in waves. First, it identifies all "root" nodes — tasks with no incoming edges (no dependencies). These can all start immediately, in parallel. As each task completes, the executor checks: "Did this unlock any new tasks?" A task is "ready" when all its dependencies are satisfied. Ready tasks are dispatched for execution, potentially in parallel. This continues until all tasks are complete or a failure triggers re-planning.
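The wave-by-wave behavior can be sketched without any real execution by grouping task IDs into the waves in which they would become ready. This is a simplified illustration under the assumption that every task succeeds; `execution_waves` is a hypothetical helper, not part of the executor built later.

```python
def execution_waves(tasks: list[dict]) -> list[list[str]]:
    """Group task IDs into waves; each wave depends only on earlier waves."""
    done: set[str] = set()
    remaining = {t["id"]: set(t.get("dependencies", [])) for t in tasks}
    waves = []
    while remaining:
        # A task is ready when every one of its dependencies is already done.
        ready = sorted(tid for tid, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("No runnable tasks left: cycle or unknown dependency")
        waves.append(ready)
        done.update(ready)
        for tid in ready:
            del remaining[tid]
    return waves


tasks = [
    {"id": "t1", "dependencies": []},
    {"id": "t2", "dependencies": ["t1"]},
    {"id": "t3", "dependencies": ["t1"]},
    {"id": "t4", "dependencies": ["t1"]},
    {"id": "t5", "dependencies": ["t2", "t3", "t4"]},
]
print(execution_waves(tasks))  # [['t1'], ['t2', 't3', 't4'], ['t5']]
```

The middle wave is where parallelism pays off: its three tasks share one dependency and do not depend on each other.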
Why "acyclic"? Because a cycle (A needs B, B needs C, C needs A) creates a deadlock — no task can ever start. During decomposition, you need to validate that the generated task graph is acyclic. If Claude accidentally creates a cycle, you catch it with a topological sort validation and ask Claude to regenerate the plan.
Here's what a valid DAG looks like versus an invalid one with a cycle:
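As a small illustration, the two graphs below are written as mappings from each task to the tasks it depends on. The check uses `graphlib.TopologicalSorter` from the Python standard library (3.9+) as a swapped-in alternative to the hand-written Kahn's algorithm in Step 2; the graph contents are made up for the example.

```python
from graphlib import CycleError, TopologicalSorter


def is_acyclic(dependencies: dict[str, list[str]]) -> bool:
    """Return True if the dependency mapping contains no cycle."""
    try:
        # prepare() raises CycleError when the graph has a cycle.
        TopologicalSorter(dependencies).prepare()
        return True
    except CycleError:
        return False


# Valid: t1 first, then t2 and t3 in parallel, then t4.
valid_graph = {"t1": [], "t2": ["t1"], "t3": ["t1"], "t4": ["t2", "t3"]}

# Invalid: a needs c, c needs b, b needs a, so nothing can ever start.
cyclic_graph = {"a": ["c"], "b": ["a"], "c": ["b"]}

print(is_acyclic(valid_graph))   # True
print(is_acyclic(cyclic_graph))  # False
```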
DAG execution is how production agents achieve both reliability and speed. Consider the competitor research task: sequential execution takes ~25 seconds (5 seconds per sub-task × 5 sub-tasks). With DAG execution, the three parallel research tasks run simultaneously, cutting total time to ~15 seconds. But the real win is structure — dependencies prevent the "draft report" step from starting before all research is done, which is exactly the mistake an unplanned ReAct agent makes. In production systems handling thousands of agent tasks, DAG execution saves an average of 35% wall-clock time versus sequential execution.
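Parallel waves still have to fit within API rate limits. One possible way to cap the number of sub-tasks in flight is an `asyncio.Semaphore`, sketched below. Note that this limits concurrency, not requests per minute, and the `run_with_limit` helper and the fake sub-task are assumptions for illustration.

```python
import asyncio


async def run_with_limit(coros, max_parallel: int = 3):
    """Run coroutines concurrently, with at most max_parallel in flight."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(coro):
        async with semaphore:  # wait here if max_parallel tasks are running
            return await coro

    return await asyncio.gather(
        *(guarded(c) for c in coros), return_exceptions=True
    )


async def _demo():
    active = 0
    peak = 0

    async def fake_subtask(i: int) -> int:
        # Stand-in for a real sub-task; records how many run at once.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return i * 2

    results = await run_with_limit(
        [fake_subtask(i) for i in range(10)], max_parallel=3
    )
    return results, peak


demo_results, demo_peak = asyncio.run(_demo())
print(demo_results, "peak concurrency:", demo_peak)
```

Results come back in submission order because `asyncio.gather` preserves the order of its arguments.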
"More parallelism is always better" — Not necessarily. Each parallel task consumes API rate limit capacity. If you have 10 parallel sub-tasks but your API rate limit is 5 requests/minute, you're not actually running 10 at once — you're bottlenecked by rate limiting. In practice, 3-5 parallel tasks is the sweet spot for most API tiers.
"DAGs are only for big, complex tasks" — Even a 3-task pipeline benefits from DAG structure. The explicit dependency tracking prevents subtle bugs like "the summary task started before the data-gathering task finished." DAGs aren't complexity overhead — they're correctness infrastructure.
"If a sub-task fails, the whole DAG fails" — Not if you handle it right. A good DAG executor supports partial completion: if "Research ClickUp" fails, you can still compare Asana and Monday.com. The report just notes that ClickUp data was unavailable. This is much better than aborting the entire task.
Dynamic Tool Discovery
Before tool discovery: Imagine a contractor who brings their entire tool truck to every job — plumbing tools, electrical tools, carpentry tools, landscaping tools. When they need a wrench, they have to sort through 200 tools to find it. The clutter slows them down and they sometimes grab the wrong tool because there are too many similar options.
The pain: When you give Claude 20+ tool definitions in every prompt, two things happen. First, Claude's tool selection accuracy drops — research shows a meaningful degradation above 5-7 tools per prompt. Second, you're wasting context window space on tool descriptions the model won't use. A typical tool definition is 100-200 tokens; 20 tools = 2,000-4,000 tokens of irrelevant context per call.
The mapping: Dynamic tool discovery is like the contractor assessing the job first: "This is a plumbing job, so I need pipe wrench, pipe cutter, and Teflon tape." They grab only those 3-4 tools from the truck. The agent does the same: given a sub-task like "compare competitor pricing," it searches its tool inventory and selects the 3-4 most relevant tools (web_search, price_extractor, comparison_table). Only those tools are injected into the Claude prompt for that sub-task.
Dynamic tool discovery selects a relevant subset of tools at runtime based on the current sub-task. Instead of loading all available tools into every Claude prompt, the agent searches its tool inventory and injects only the 3-5 most relevant tools.
There are two main implementation approaches. Embedding-based: embed each tool's description as a vector, embed the sub-task description, and find the nearest tools by cosine similarity. This is flexible and handles novel sub-tasks well, but requires a vector store. Category-based: use a simple mapping from task categories to tool subsets (e.g., "data_retrieval" maps to [web_search, db_query, api_fetch]). This is simpler but less flexible — new tools or task types require updating the mapping.
The embedding approach is preferred for systems with 15+ tools. For smaller tool sets (5-10), category-based routing is often sufficient. The key insight: tool selection accuracy matters more than tool availability. It's better to give Claude 4 relevant tools than 20 tools where 4 are relevant and 16 are noise.
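The embedding-based approach can be sketched with a toy stand-in for the embedding model. Here `toy_embed` is just a bag-of-words counter, which is an assumption made so the example runs without any external service; a real system would call an embedding model and a vector store instead. The tool names and descriptions are also illustrative.

```python
import math
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: a bag-of-words count vector."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a if w in b)
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0


# Hypothetical tool inventory: name -> description.
TOOLS = {
    "web_search": "search the web for pages about a topic",
    "price_extractor": "extract pricing plans and prices from a web page",
    "db_query": "run a sql query against the internal database",
    "send_email": "send an email message to a recipient",
}


def select_tools(sub_task: str, tools: dict[str, str], k: int = 2) -> list[str]:
    """Return the k tool names whose descriptions best match the sub-task."""
    query = toy_embed(sub_task)
    ranked = sorted(
        tools,
        key=lambda name: cosine(query, toy_embed(tools[name])),
        reverse=True,
    )
    return ranked[:k]


selected = select_tools("extract competitor pricing from each web page", TOOLS)
print(selected)  # ['price_extractor', 'web_search']
```

Only the selected tool definitions would then be passed to the model for that sub-task.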
Here's what a category-based tool map looks like in practice — you'd define this in your agent's configuration:
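A minimal sketch of such a map follows. The `data_retrieval` entry and the pricing tools come from the examples above; the other category names, the `doc_writer` tool, and the `tools_for` helper are assumptions for illustration.

```python
# Category -> tool names. Only these names are known to the agent.
TOOL_CATEGORIES = {
    "data_retrieval": ["web_search", "db_query", "api_fetch"],
    "pricing_analysis": ["web_search", "price_extractor", "comparison_table"],
    "reporting": ["comparison_table", "doc_writer"],
}

# Used when a sub-task's category is not in the map.
DEFAULT_TOOLS = ["web_search"]


def tools_for(category: str, tools_hint: tuple = ()) -> list[str]:
    """Return the tools for a category, plus any known tools the plan hinted at."""
    selected = list(TOOL_CATEGORIES.get(category, DEFAULT_TOOLS))
    known = {name for names in TOOL_CATEGORIES.values() for name in names}
    for name in tools_hint:
        # Ignore hints that name tools the agent does not actually have.
        if name in known and name not in selected:
            selected.append(name)
    return selected


print(tools_for("data_retrieval"))
print(tools_for("reporting", tools_hint=("db_query", "made_up_tool")))
```

Filtering hints against the known tool names guards against a plan that suggests a tool which does not exist.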
At scale, dynamic tool discovery is a significant cost and accuracy optimization. Consider an agent with 25 tools: loading all 25 definitions adds ~4,000 tokens to every prompt. Over 10 sub-tasks, that's 40,000 extra input tokens — about $0.60 at Claude Sonnet pricing, per task. With 1,000 tasks/day, you're spending $600/day on tool descriptions Claude never uses. Dynamic discovery cuts this to ~800 tokens per prompt (4 tools × 200 tokens each), saving 80% of that cost. And because Claude sees fewer, more relevant tools, its tool selection accuracy improves from ~85% to ~95%.
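The arithmetic behind these figures can be reproduced with a short calculation. The price per million tokens is a parameter here, not a quoted rate: the $0.60 and $600 figures above correspond to $15 per million tokens, so substitute whatever input-token rate applies to your model.

```python
def tool_definition_overhead(
    tokens_per_prompt: int,
    prompts_per_task: int,
    tasks_per_day: int,
    price_per_million_tokens: float,
) -> tuple[int, float, float]:
    """Return (tokens per task, cost per task, cost per day)."""
    tokens_per_task = tokens_per_prompt * prompts_per_task
    cost_per_task = tokens_per_task * price_per_million_tokens / 1_000_000
    return tokens_per_task, cost_per_task, cost_per_task * tasks_per_day


# Assumed rate implied by the figures in the text; adjust for your model.
RATE = 15.0

all_tools = tool_definition_overhead(4000, 10, 1000, RATE)  # 25 tools loaded
selected = tool_definition_overhead(800, 10, 1000, RATE)    # 4 tools x 200 tokens
savings = 1 - selected[2] / all_tools[2]

print(f"All tools: {all_tools[0]} tokens/task, ${all_tools[2]:.2f}/day")
print(f"Selected:  {selected[0]} tokens/task, ${selected[2]:.2f}/day")
print(f"Savings:   {savings:.0%}")
```

The 80% saving depends only on the token ratio (800 versus 4,000 per prompt), so it holds at any price.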
"Embedding-based discovery is always better than category-based" — Not necessarily. Embedding search adds latency (50-100ms per lookup) and requires maintaining a vector store. For agents with fewer than 15 tools, a simple category map is faster, easier to debug, and just as accurate. Use embeddings when your tool inventory is large and evolving frequently.
"Tool descriptions don't matter if you're using discovery" — They matter even more. Discovery relies on matching sub-task descriptions to tool descriptions. A vague tool description like "does stuff with data" won't match well against any sub-task. Write tool descriptions the way you'd explain the tool to a new teammate: what it does, what it takes as input, what it returns.
"Just give Claude all tools and let it figure it out" — This works below 5-7 tools. Above that threshold, tool selection accuracy drops measurably. At 20+ tools, Claude may pick a plausible-but-wrong tool (e.g., web_search instead of db_query) because both descriptions mention "finding information." Fewer, more relevant tools = better decisions.
Code Walkthrough: Building a Planning Agent
Step 1: Intent Classifier
Let's start with the gatekeeper of the whole pipeline: the intent classifier. Its job is simple but critical — look at the user's request, decide if it's a quick question or a complex multi-step task, and route accordingly. We implement it as a single Claude call with a structured system prompt that forces JSON output. The interesting design choice here is the fallback: if JSON parsing fails, we default to needs_planning: true. Why? Because over-planning a simple task wastes a few hundred tokens, but under-planning a complex task wastes the entire task. It's cheaper to be cautious.
import anthropic
import json

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY env var

CLASSIFY_PROMPT = """Classify the user's request into one of these intents:
- simple_question: Can be answered directly without tools
- single_tool: Requires one tool call (search, calculate, etc.)
- multi_step_task: Requires multiple steps with dependencies
- creative_generation: Writing, brainstorming, or content creation
Return JSON with this exact format:
{
  "intent": "simple_question|single_tool|multi_step_task|creative_generation",
  "complexity": "low|medium|high",
  "entities": ["extracted", "key", "terms"],
  "needs_planning": true/false,
  "reasoning": "One sentence explaining your classification"
}"""


def classify_intent(user_request: str) -> dict:
    """Classify the user's intent and determine if planning is needed."""
    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=256,
            system=CLASSIFY_PROMPT,
            messages=[{"role": "user", "content": user_request}],
        )
        text = response.content[0].text
        # Extract JSON from the response
        # Claude may wrap it in ```json ... ``` or return plain JSON
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        return json.loads(text.strip())
    except (json.JSONDecodeError, IndexError, KeyError) as e:
        # Fallback: treat as multi-step to be safe
        return {
            "intent": "multi_step_task",
            "complexity": "medium",
            "entities": [],
            "needs_planning": True,
            "reasoning": f"Classification failed ({e}), defaulting to planning",
        }


# --- Usage ---
result = classify_intent(
    "Research the top 3 project management tools, compare their "
    "pricing, and draft a summary report for our team"
)
print(json.dumps(result, indent=2))
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic(); // reads ANTHROPIC_API_KEY env var

const CLASSIFY_PROMPT = `Classify the user's request into one of these intents:
- simple_question: Can be answered directly without tools
- single_tool: Requires one tool call (search, calculate, etc.)
- multi_step_task: Requires multiple steps with dependencies
- creative_generation: Writing, brainstorming, or content creation
Return JSON with this exact format:
{
  "intent": "simple_question|single_tool|multi_step_task|creative_generation",
  "complexity": "low|medium|high",
  "entities": ["extracted", "key", "terms"],
  "needs_planning": true/false,
  "reasoning": "One sentence explaining your classification"
}`;

async function classifyIntent(userRequest) {
  try {
    const response = await client.messages.create({
      model: "claude-sonnet-4-6",
      max_tokens: 256,
      system: CLASSIFY_PROMPT,
      messages: [{ role: "user", content: userRequest }],
    });
    let text = response.content[0].text;
    // Extract JSON: Claude may wrap it in ```json ... ```
    if (text.includes("```")) {
      text = text.split("```")[1];
      if (text.startsWith("json")) text = text.slice(4);
    }
    return JSON.parse(text.trim());
  } catch (e) {
    // Fallback: treat as multi-step to be safe
    return {
      intent: "multi_step_task",
      complexity: "medium",
      entities: [],
      needs_planning: true,
      reasoning: `Classification failed (${e.message}), defaulting to planning`,
    };
  }
}

// --- Usage ---
const result = await classifyIntent(
  "Research the top 3 project management tools, compare their " +
    "pricing, and draft a summary report for our team"
);
console.log(JSON.stringify(result, null, 2));
You built a lightweight intent classifier that uses a single Claude call with a structured prompt to categorize user requests. The key design decisions: (1) the system prompt specifies exact JSON format so parsing is reliable, (2) the fallback on parsing failure defaults to needs_planning: true (it's safer to over-plan than to skip planning for a complex task), and (3) the entities extraction gives downstream components (like task decomposition) a head start on understanding the request.
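The fence-splitting logic in the classifier assumes the JSON sits inside the first fenced block of the response. A slightly more tolerant alternative, sketched here as a hypothetical `extract_json` helper, is to slice from the first opening brace to the last closing brace. This is one possible approach, not the method used above, and it still fails if the surrounding prose contains stray braces.

```python
import json


def extract_json(text: str) -> dict:
    """Parse the first JSON object found in a model response.

    Works whether the object is bare, wrapped in a code fence, or
    surrounded by explanatory prose.
    """
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in model output")
    # json.JSONDecodeError is a subclass of ValueError, so callers can
    # handle both failure modes with a single `except ValueError`.
    return json.loads(text[start:end + 1])


FENCE = "`" * 3  # built indirectly to keep this example fence-safe
wrapped = (
    "Sure, here is the classification:\n"
    + FENCE + "json\n"
    + '{"intent": "single_tool", "needs_planning": false}\n'
    + FENCE
)
parsed = extract_json(wrapped)
print(parsed["intent"], parsed["needs_planning"])
```

If you adopt a helper like this, the caller's `except` clause needs to include `ValueError` so the planning fallback still triggers.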
Step 2: Task Decomposer
Now for the brains of the operation. The decomposer takes a complex goal like "research competitors and draft a report" and turns it into a structured task graph: a list of sub-tasks, each with a unique ID, a description, and a list of dependencies on other task IDs. Generating the tasks is not the hard part, because Claude is good at that. The challenge is validation. What if Claude accidentally creates a circular dependency (task A depends on B, B depends on A)? That would deadlock the executor. So after every decomposition, we validate the graph using Kahn's topological sort algorithm. If validation finds a cycle, or a dependency on a task ID that doesn't exist, we reject the plan and fall back to a single-task plan that hands the whole goal to one execution step.
DECOMPOSE_PROMPT = """Break the user's goal into a task graph.
Return JSON with this structure:
{
  "tasks": [
    {
      "id": "t1",
      "description": "What this task does",
      "type": "tool_call|generation|aggregation",
      "dependencies": [],
      "tools_hint": ["suggested_tool_names"]
    }
  ]
}
Rules:
- Each leaf task should be achievable in 1-2 tool calls
- Use dependencies to indicate which tasks must finish first
- Tasks with no dependencies can run in parallel
- IDs must be unique strings (t1, t2, etc.)
- No circular dependencies allowed"""


def decompose_task(goal: str, entities: list[str]) -> dict:
    """Decompose a complex goal into a DAG of sub-tasks."""
    context = f"Key entities: {', '.join(entities)}" if entities else ""
    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            system=DECOMPOSE_PROMPT,
            messages=[{
                "role": "user",
                "content": f"Goal: {goal}\n{context}",
            }],
        )
        text = response.content[0].text
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        plan = json.loads(text.strip())
        # Validate: reject cycles and dependencies on unknown task IDs
        if not _validate_dag(plan["tasks"]):
            raise ValueError("Task graph contains cycles or unknown dependencies")
        return plan
    except Exception as e:
        # Fallback: a single-task plan that covers the whole goal
        return {
            "tasks": [
                {
                    "id": "t1",
                    "description": goal,
                    "type": "tool_call",
                    "dependencies": [],
                    "tools_hint": [],
                }
            ],
            "error": f"Decomposition failed: {e}",
        }


def _validate_dag(tasks: list[dict]) -> bool:
    """Validate that the task graph is acyclic using topological sort."""
    task_ids = {t["id"] for t in tasks}
    in_degree = {t["id"]: 0 for t in tasks}
    adj = {t["id"]: [] for t in tasks}
    for t in tasks:
        for dep in t.get("dependencies", []):
            if dep not in task_ids:
                return False  # dependency references non-existent task
            adj[dep].append(t["id"])
            in_degree[t["id"]] += 1
    # Kahn's algorithm: repeatedly remove nodes that have no unmet dependencies
    queue = [tid for tid, deg in in_degree.items() if deg == 0]
    visited = 0
    while queue:
        node = queue.pop(0)
        visited += 1
        for neighbor in adj[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    return visited == len(tasks)  # All nodes visited = no cycles


# --- Usage ---
plan = decompose_task(
    "Research top 3 competitors, compare pricing, draft report",
    ["competitors", "pricing", "report"],
)
for task in plan["tasks"]:
    deps = ", ".join(task["dependencies"]) or "none"
    print(f"  {task['id']}: {task['description']} (deps: {deps})")
const DECOMPOSE_PROMPT = `Break the user's goal into a task graph.
Return JSON with this structure:
{
  "tasks": [
    {
      "id": "t1",
      "description": "What this task does",
      "type": "tool_call|generation|aggregation",
      "dependencies": [],
      "tools_hint": ["suggested_tool_names"]
    }
  ]
}
Rules:
- Each leaf task should be achievable in 1-2 tool calls
- Use dependencies to indicate which tasks must finish first
- Tasks with no dependencies can run in parallel
- IDs must be unique strings (t1, t2, etc.)
- No circular dependencies allowed`;

async function decomposeTask(goal, entities = []) {
  const context = entities.length
    ? `Key entities: ${entities.join(", ")}`
    : "";
  try {
    const response = await client.messages.create({
      model: "claude-sonnet-4-6",
      max_tokens: 1024,
      system: DECOMPOSE_PROMPT,
      messages: [{ role: "user", content: `Goal: ${goal}\n${context}` }],
    });
    let text = response.content[0].text;
    if (text.includes("```")) {
      text = text.split("```")[1];
      if (text.startsWith("json")) text = text.slice(4);
    }
    const plan = JSON.parse(text.trim());
    // Validate: reject cycles and dependencies on unknown task IDs
    if (!validateDag(plan.tasks)) {
      throw new Error("Task graph contains cycles or unknown dependencies");
    }
    return plan;
  } catch (e) {
    // Fallback: a single-task plan that covers the whole goal
    return {
      tasks: [
        {
          id: "t1",
          description: goal,
          type: "tool_call",
          dependencies: [],
          tools_hint: [],
        },
      ],
      error: `Decomposition failed: ${e.message}`,
    };
  }
}

function validateDag(tasks) {
  const taskIds = new Set(tasks.map((t) => t.id));
  const inDegree = Object.fromEntries(tasks.map((t) => [t.id, 0]));
  const adj = Object.fromEntries(tasks.map((t) => [t.id, []]));
  for (const t of tasks) {
    for (const dep of t.dependencies || []) {
      if (!taskIds.has(dep)) return false; // unknown dependency
      adj[dep].push(t.id);
      inDegree[t.id]++;
    }
  }
  // Kahn's algorithm (topological sort)
  const queue = Object.entries(inDegree)
    .filter(([, d]) => d === 0)
    .map(([id]) => id);
  let visited = 0;
  while (queue.length > 0) {
    const node = queue.shift();
    visited++;
    for (const neighbor of adj[node]) {
      inDegree[neighbor]--;
      if (inDegree[neighbor] === 0) queue.push(neighbor);
    }
  }
  return visited === tasks.length; // all nodes visited = no cycles
}

// --- Usage ---
const plan = await decomposeTask(
  "Research top 3 competitors, compare pricing, draft report",
  ["competitors", "pricing", "report"]
);
for (const task of plan.tasks) {
  const deps = task.dependencies.join(", ") || "none";
  console.log(`  ${task.id}: ${task.description} (deps: ${deps})`);
}
You built a task decomposer that uses Claude to generate a DAG of sub-tasks from a complex goal, then validates the graph is acyclic using Kahn's topological sort algorithm. The cycle detection is critical — without it, a malformed plan could cause your DAG executor to deadlock. The fallback returns a single-task plan so execution can still proceed even if decomposition fails.
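The DAG section suggests asking Claude to regenerate a plan when validation fails, while `decompose_task` falls straight back to a single task. A possible middle ground is to retry a few times before giving up, as sketched below. The `generate_plan` callable, its `feedback` argument, and the stub generator are all assumptions; in practice `generate_plan` would wrap the Claude call. The validity check uses the standard library's `graphlib` instead of the hand-written Kahn's algorithm.

```python
from graphlib import CycleError, TopologicalSorter


def is_valid_dag(tasks: list[dict]) -> bool:
    """True if tasks is non-empty, all dependencies exist, and there is no cycle."""
    if not tasks:
        return False
    ids = {t["id"] for t in tasks}
    graph = {t["id"]: t.get("dependencies", []) for t in tasks}
    if any(dep not in ids for deps in graph.values() for dep in deps):
        return False
    try:
        TopologicalSorter(graph).prepare()  # raises CycleError on a cycle
        return True
    except CycleError:
        return False


def plan_with_retries(goal: str, generate_plan, max_attempts: int = 3) -> dict:
    """Ask for a plan up to max_attempts times, then fall back to one task."""
    feedback = ""
    for _ in range(max_attempts):
        plan = generate_plan(goal, feedback)
        if is_valid_dag(plan.get("tasks", [])):
            return plan
        feedback = ("The previous plan had a cycle or an unknown dependency. "
                    "Return a valid DAG.")
    return {
        "tasks": [{"id": "t1", "description": goal, "type": "tool_call",
                   "dependencies": [], "tools_hint": []}],
        "error": f"No valid plan after {max_attempts} attempts",
    }


def fake_generate(goal: str, feedback: str) -> dict:
    """Stub generator: returns a cyclic plan first, a valid one after feedback."""
    if not feedback:
        return {"tasks": [{"id": "t1", "dependencies": ["t2"]},
                          {"id": "t2", "dependencies": ["t1"]}]}
    return {"tasks": [{"id": "t1", "dependencies": []},
                      {"id": "t2", "dependencies": ["t1"]}]}


retried_plan = plan_with_retries("Draft a report", fake_generate)
print(len(retried_plan["tasks"]), "error" in retried_plan)
```

Each retry costs another model call, so a small `max_attempts` keeps the worst case bounded.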
Step 3: DAG Executor
Here's where everything comes together. The DAG executor takes the task graph from Step 2 and actually runs it. The core logic is a loop: find all tasks whose dependencies are satisfied, run them in parallel using asyncio.gather (Python) or Promise.allSettled (Node.js), mark completed tasks, and repeat. The elegant part is error handling: if a task fails, it doesn't crash the whole pipeline. Downstream tasks that depend on the failed task are never dispatched (they stay pending, which effectively blocks them), but independent tasks keep running. This partial completion strategy is much more useful than all-or-nothing: the user gets results from 7 out of 8 sub-tasks rather than nothing.
import asyncio
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"


class DAGExecutor:
    """Execute a DAG of tasks with parallel execution."""

    def __init__(self, tasks: list[dict], execute_fn):
        self.tasks = {t["id"]: t for t in tasks}
        self.status = {t["id"]: TaskStatus.PENDING for t in tasks}
        self.results = {}
        self.execute_fn = execute_fn  # async function(task, dep_context) -> result

    def _get_ready_tasks(self) -> list[str]:
        """Find tasks whose dependencies are all satisfied."""
        ready = []
        for tid, task in self.tasks.items():
            if self.status[tid] != TaskStatus.PENDING:
                continue
            deps = task.get("dependencies", [])
            if all(self.status.get(d) == TaskStatus.DONE for d in deps):
                ready.append(tid)
        return ready

    async def execute(self, on_progress=None) -> dict:
        """Run the DAG to completion, executing parallel tasks concurrently."""
        while True:
            ready = self._get_ready_tasks()
            if not ready:
                # No ready tasks: everything has finished, or the remaining
                # tasks are stuck behind a failed dependency
                break
            # Execute all ready tasks in parallel
            self.status.update({tid: TaskStatus.RUNNING for tid in ready})
            if on_progress:
                on_progress(self.status, self.results)
            results = await asyncio.gather(*[
                self._run_task(tid) for tid in ready
            ], return_exceptions=True)
            for tid, result in zip(ready, results):
                if isinstance(result, Exception):
                    self.status[tid] = TaskStatus.FAILED
                    self.results[tid] = f"Error: {result}"
                else:
                    self.status[tid] = TaskStatus.DONE
                    self.results[tid] = result
            if on_progress:
                on_progress(self.status, self.results)
        return {
            "results": self.results,
            "status": {k: v.value for k, v in self.status.items()},
            "completed": sum(
                1 for v in self.status.values() if v == TaskStatus.DONE
            ),
            "total": len(self.tasks),
        }

    async def _run_task(self, task_id: str):
        """Execute a single task with error handling."""
        task = self.tasks[task_id]
        # Gather results from dependencies as context
        dep_context = {
            dep: self.results.get(dep, "")
            for dep in task.get("dependencies", [])
        }
        try:
            return await self.execute_fn(task, dep_context)
        except Exception as e:
            raise RuntimeError(
                f"Task {task_id} failed: {e}"
            ) from e


# --- Usage with a simple executor function ---
# `client` and `decompose_task` come from the earlier steps.
async def execute_task(task: dict, dep_context: dict) -> str:
    """Execute a single sub-task using Claude."""
    context_str = "\n".join(
        f"Result of {k}: {v}" for k, v in dep_context.items()
    )
    # Assuming `client` is the synchronous anthropic.Anthropic() client:
    # run the blocking call in a worker thread so tasks in the same wave
    # do not block one another on the event loop.
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": (
                f"Task: {task['description']}\n"
                f"Previous results:\n{context_str}"
            ),
        }],
    )
    return response.content[0].text


async def main():
    plan = decompose_task(
        "Research top 3 competitors, compare pricing, draft report",
        ["competitors", "pricing", "report"],
    )
    executor = DAGExecutor(plan["tasks"], execute_task)
    result = await executor.execute(
        on_progress=lambda s, r: print(
            f" Progress: {sum(1 for v in s.values() if v == TaskStatus.DONE)}"
            f"/{len(s)} done"
        )
    )
    print(f"\nCompleted: {result['completed']}/{result['total']}")


asyncio.run(main())
class DAGExecutor {
  constructor(tasks, executeFn) {
    this.tasks = Object.fromEntries(tasks.map((t) => [t.id, t]));
    this.status = Object.fromEntries(
      tasks.map((t) => [t.id, "pending"])
    );
    this.results = {};
    this.executeFn = executeFn; // async (task, depContext) => result
  }

  _getReadyTasks() {
    return Object.entries(this.tasks)
      .filter(([id]) => this.status[id] === "pending")
      .filter(([, task]) =>
        (task.dependencies || []).every(
          (dep) => this.status[dep] === "done"
        )
      )
      .map(([id]) => id);
  }

  async execute(onProgress) {
    while (true) {
      const ready = this._getReadyTasks();
      if (ready.length === 0) break;
      // Mark ready tasks as running
      for (const id of ready) this.status[id] = "running";
      if (onProgress) onProgress({ ...this.status }, { ...this.results });
      // Execute all ready tasks in parallel
      const results = await Promise.allSettled(
        ready.map((id) => this._runTask(id))
      );
      for (let i = 0; i < ready.length; i++) {
        const id = ready[i];
        if (results[i].status === "fulfilled") {
          this.status[id] = "done";
          this.results[id] = results[i].value;
        } else {
          this.status[id] = "failed";
          this.results[id] = `Error: ${results[i].reason?.message}`;
        }
      }
      if (onProgress) onProgress({ ...this.status }, { ...this.results });
    }
    const completed = Object.values(this.status).filter(
      (s) => s === "done"
    ).length;
    return {
      results: this.results,
      status: this.status,
      completed,
      total: Object.keys(this.tasks).length,
    };
  }

  async _runTask(taskId) {
    const task = this.tasks[taskId];
    const depContext = Object.fromEntries(
      (task.dependencies || []).map((dep) => [dep, this.results[dep] ?? ""])
    );
    return this.executeFn(task, depContext);
  }
}

// --- Usage ---
async function executeTask(task, depContext) {
  const contextStr = Object.entries(depContext)
    .map(([k, v]) => `Result of ${k}: ${v}`)
    .join("\n");
  const response = await client.messages.create({
    model: "claude-sonnet-4-6",
    max_tokens: 512,
    messages: [
      {
        role: "user",
        content: `Task: ${task.description}\nPrevious results:\n${contextStr}`,
      },
    ],
  });
  return response.content[0].text;
}

const plan = await decomposeTask(
  "Research top 3 competitors, compare pricing, draft report",
  ["competitors", "pricing", "report"]
);
const executor = new DAGExecutor(plan.tasks, executeTask);
const result = await executor.execute((status) => {
  const done = Object.values(status).filter((s) => s === "done").length;
  console.log(` Progress: ${done}/${Object.keys(status).length} done`);
});
console.log(`\nCompleted: ${result.completed}/${result.total}`);
You built a DAG executor that runs tasks in parallel waves. The _get_ready_tasks() method (_getReadyTasks() in Node.js) finds pending tasks whose dependencies are all done. asyncio.gather (Python) / Promise.allSettled (Node.js) runs those tasks concurrently. A failed task doesn't crash the pipeline: it is marked as failed, and any task that depends on it never becomes ready, so it is still pending when the loop exits. The progress callback lets you display real-time status. This is the core of production-style orchestration: parallel where possible, sequential where required, resilient to partial failures.
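The executor above leaves tasks downstream of a failure in the pending state. If you would rather report them explicitly, one option is a post-pass over the final status. This is a minimal sketch, assuming the string-valued status dict returned by execute(); the "blocked" label is an extra status introduced here (it is not in the TaskStatus enum), and `mark_blocked` is a hypothetical helper.

```python
def mark_blocked(tasks: list[dict], status: dict[str, str]) -> dict[str, str]:
    """Return a copy of `status` where pending tasks behind a failure are 'blocked'."""
    by_id = {t["id"]: t for t in tasks}
    out = dict(status)
    changed = True
    while changed:  # repeat until stable so long dependency chains are covered
        changed = False
        for tid, task in by_id.items():
            if out[tid] != "pending":
                continue
            deps = task.get("dependencies", [])
            if any(out.get(d) in ("failed", "blocked") for d in deps):
                out[tid] = "blocked"
                changed = True
    return out


# Illustrative data: a failed, so b and c (which chain off a) are blocked.
demo_tasks = [
    {"id": "a", "dependencies": []},
    {"id": "b", "dependencies": ["a"]},
    {"id": "c", "dependencies": ["b"]},
    {"id": "d", "dependencies": []},
]
demo_status = {"a": "failed", "b": "pending", "c": "pending", "d": "done"}
print(mark_blocked(demo_tasks, demo_status))
```

You could call this on `result["status"]` before showing the summary, so the user can tell "never attempted" apart from "failed".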
Hands-On Exercise
What You'll Build
A planning agent pipeline with intent classification, Claude-powered task decomposition into a DAG, and parallel execution with progress reporting. You'll test it with both simple (direct answer) and complex (multi-step research) tasks.
Time Estimate: 45–60 minutes | Files You'll Create: planning_agent.py
Prerequisites: Python 3.10+, an Anthropic API key, and anthropic SDK installed. You should have completed M12 (ReAct Pattern) to understand the agent loop this module builds on.
Environment Setup
mkdir planning-lab && cd planning-lab
python -m venv venv && source venv/bin/activate # Windows: venv\Scripts\activate
pip install "anthropic>=0.30.0"
export ANTHROPIC_API_KEY=your-key-here # Windows: set ANTHROPIC_API_KEY=your-key-here
Step 1: Build the Planning Agent Pipeline
What: Create the entire planning system in a single file — an intent classifier that routes simple vs. complex tasks, a task decomposer that produces a DAG of sub-tasks, a parallel executor that runs independent tasks concurrently, and the pipeline that wires them together.
Why: Keeping everything in one file lets you see how the components connect. The classifier saves API costs by skipping the planning overhead for simple questions. The decomposer turns vague goals into structured task graphs. The executor runs independent sub-tasks in parallel, cutting wall-clock time by 35-60%.
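To see where the time saving comes from, you can count waves without calling any API. The sketch below assumes every sub-task takes the same time (as the simulated executor in this exercise does) and uses the depends_on task shape from this exercise; `count_waves` is a hypothetical helper for illustration only.

```python
def count_waves(tasks: list[dict]) -> int:
    """Number of parallel waves needed, i.e. the longest dependency chain."""
    done: set[str] = set()
    remaining = {t["id"]: t for t in tasks}
    waves = 0
    while remaining:
        ready = [
            tid for tid, t in remaining.items()
            if all(d in done for d in t.get("depends_on", []))
        ]
        if not ready:
            raise ValueError("cycle or unknown dependency in task graph")
        done.update(ready)
        for tid in ready:
            del remaining[tid]
        waves += 1
    return waves


# Diamond-shaped plan: a -> (b, c) -> d
diamond = [
    {"id": "a", "depends_on": []},
    {"id": "b", "depends_on": ["a"]},
    {"id": "c", "depends_on": ["a"]},
    {"id": "d", "depends_on": ["b", "c"]},
]
waves = count_waves(diamond)
saving = 1 - waves / len(diamond)
print(f"{waves} waves for {len(diamond)} tasks, {saving:.0%} less wall-clock time")
```

With equal task durations, wall-clock time scales with the number of waves rather than the number of tasks, so this diamond plan finishes in 3 time units instead of 4. Plans with more independent branches save proportionally more.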
Create a new file called planning_agent.py and add the following:
import anthropic
import json
import asyncio
import time

client = anthropic.Anthropic()


# ── Intent Classifier ────────────────────────────────────────
def classify_intent(request: str) -> dict:
    """Classify whether a request needs planning or direct answer."""
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=256,
        system=(
            "Classify the user request. Respond with JSON only:\n"
            '{"intent": "direct|research|multi_step", '
            '"complexity": "simple|moderate|complex", '
            '"needs_planning": true/false, '
            '"reason": "one sentence why"}'
        ),
        messages=[{"role": "user", "content": request}],
    )
    try:
        return json.loads(response.content[0].text)
    except json.JSONDecodeError:
        return {"intent": "direct", "complexity": "simple",
                "needs_planning": False, "reason": "Parse error, defaulting to direct"}


# ── Task Decomposer ─────────────────────────────────────────
def decompose_task(goal: str) -> list[dict]:
    """Break a complex goal into a DAG of sub-tasks."""
    # Single-task plan used whenever decomposition fails
    fallback = [{"id": "task_1", "description": goal, "depends_on": [], "tools_needed": []}]
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system=(
            "Decompose this goal into 3-7 sub-tasks. Respond with JSON only:\n"
            '[{"id": "task_1", "description": "...", "depends_on": [], '
            '"tools_needed": ["search", "analyze", etc]}, ...]'
            "\n\nRules:\n"
            "- Each task should be achievable in 1-2 tool calls\n"
            "- depends_on lists task IDs that must complete first\n"
            "- Independent tasks should have empty depends_on (they run in parallel)\n"
            "- NO circular dependencies"
        ),
        messages=[{"role": "user", "content": f"Goal: {goal}"}],
    )
    try:
        tasks = json.loads(response.content[0].text)
    except json.JSONDecodeError:
        return fallback
    # Validate: check for cycles using topological sort
    if not _validate_dag(tasks):
        print(" ⚠ WARNING: Invalid task DAG (cycle or unknown dependency), "
              "falling back to a single task")
        return fallback
    return tasks


def _validate_dag(tasks: list[dict]) -> bool:
    """Check for circular dependencies via topological sort."""
    task_ids = {t["id"] for t in tasks}
    deps = {t["id"]: set(t.get("depends_on", [])) for t in tasks}
    # A dependency on an unknown task ID can never be satisfied
    if any(not d <= task_ids for d in deps.values()):
        return False
    # Count unique dependencies so duplicates do not look like a cycle
    in_degree = {tid: len(d) for tid, d in deps.items()}
    queue = [tid for tid, deg in in_degree.items() if deg == 0]
    visited = 0
    while queue:
        tid = queue.pop(0)
        visited += 1
        for other_id, other_deps in deps.items():
            if tid in other_deps:
                in_degree[other_id] -= 1
                if in_degree[other_id] == 0:
                    queue.append(other_id)
    return visited == len(tasks)  # True if no cycle


# ── Task Executor (simulated) ────────────────────────────────
async def execute_task(task: dict) -> dict:
    """Execute a single sub-task (simulated)."""
    await asyncio.sleep(0.5)  # simulate work
    return {
        "task_id": task["id"],
        "status": "completed",
        "result": f"Completed: {task['description'][:60]}",
    }


# ── DAG Executor ─────────────────────────────────────────────
async def execute_dag(tasks: list[dict], verbose: bool = True) -> list[dict]:
    """Execute tasks respecting dependencies, parallelizing where possible."""
    completed = {}
    failed = set()
    results = []
    remaining = {t["id"]: t for t in tasks}
    wave = 0
    while remaining:
        wave += 1
        # Find tasks whose dependencies are all satisfied
        ready = [
            t for tid, t in remaining.items()
            if all(d in completed for d in t.get("depends_on", []))
            and not any(d in failed for d in t.get("depends_on", []))
        ]
        if not ready:
            # Remaining tasks are blocked by failures
            for tid in remaining:
                results.append({"task_id": tid, "status": "blocked",
                                "result": "Blocked by failed dependency"})
            break
        if verbose:
            task_names = [t["id"] for t in ready]
            mode = "PARALLEL" if len(ready) > 1 else "SEQUENTIAL"
            print(f" Wave {wave} [{mode}]: {task_names}")
        # Execute ready tasks in parallel
        wave_results = await asyncio.gather(
            *[execute_task(t) for t in ready],
            return_exceptions=True,
        )
        for task, result in zip(ready, wave_results):
            if isinstance(result, Exception):
                failed.add(task["id"])
                results.append({"task_id": task["id"], "status": "failed",
                                "result": str(result)})
            else:
                completed[task["id"]] = result
                results.append(result)
            # Finished either way, so it must not be picked up again
            del remaining[task["id"]]
    return results


# ── Progress Visualization ───────────────────────────────────
def print_plan(tasks: list[dict], results: list[dict] | None = None) -> None:
    """Print a visual task tree with status indicators."""
    status_map = {}
    if results:
        for r in results:
            status_map[r["task_id"]] = r["status"]
    print("\n 📋 Execution Plan:")
    for t in tasks:
        status = status_map.get(t["id"], "pending")
        icon = {"completed": "✅", "failed": "❌", "blocked": "🚫", "pending": "⏳"}.get(status, "⏳")
        deps = f" (after: {', '.join(t.get('depends_on', []))})" if t.get("depends_on") else " (no deps)"
        print(f" {icon} {t['id']}: {t['description'][:50]}{deps}")


# ── Full Pipeline ────────────────────────────────────────────
async def planning_pipeline(request: str, verbose: bool = True) -> str:
    """Classify → (Plan → Execute) or Direct Answer."""
    if verbose:
        print(f"\n{'═' * 55}")
        print(f" Request: {request}")
        print(f"{'═' * 55}")

    # Step 1: Classify
    classification = classify_intent(request)
    if verbose:
        print(f"\n 📊 Classification: {classification['intent']} "
              f"({classification['complexity']})")
        print(f" Needs planning: {classification['needs_planning']}")
        print(f" Reason: {classification['reason']}")
    if not classification["needs_planning"]:
        # Direct answer: skip planning overhead
        if verbose:
            print(" → Routing to direct answer (no planning needed)")
        response = client.messages.create(
            model="claude-sonnet-4-6", max_tokens=1024,
            messages=[{"role": "user", "content": request}],
        )
        return response.content[0].text

    # Step 2: Decompose into DAG
    if verbose:
        print("\n 🔨 Decomposing into sub-tasks...")
    tasks = decompose_task(request)
    if verbose:
        print_plan(tasks)

    # Step 3: Execute DAG
    if verbose:
        print(f"\n ▶ Executing {len(tasks)} tasks...")
    start = time.time()
    results = await execute_dag(tasks, verbose=verbose)
    elapsed = time.time() - start
    if verbose:
        print_plan(tasks, results)
        completed = sum(1 for r in results if r["status"] == "completed")
        print(f"\n ⏱ Completed {completed}/{len(tasks)} tasks in {elapsed:.1f}s")

    # Step 4: Synthesize final answer
    result_text = "\n".join(f"- {r['task_id']}: {r['result']}" for r in results)
    response = client.messages.create(
        model="claude-sonnet-4-6", max_tokens=1024,
        system="Synthesize a final answer from these sub-task results.",
        messages=[{"role": "user", "content":
                   f"Original request: {request}\n\nSub-task results:\n{result_text}"}],
    )
    return response.content[0].text


# ── Tests ────────────────────────────────────────────────────
async def main():
    # Test 1: Simple task (should skip planning)
    print("\n▶ TEST 1: Simple task (direct answer)")
    r1 = await planning_pipeline("What is 2 + 2?")
    print(f"\n Answer: {r1[:120]}")

    # Test 2: Complex task (should trigger planning)
    print("\n▶ TEST 2: Complex task (planning + execution)")
    r2 = await planning_pipeline(
        "Research the top 3 AI agent frameworks, compare their features, "
        "and draft a recommendation for a startup team."
    )
    print(f"\n Answer: {r2[:200]}...")

    # Test 3: DAG validation
    print("\n▶ TEST 3: DAG validation")
    valid = _validate_dag([
        {"id": "a", "depends_on": []},
        {"id": "b", "depends_on": ["a"]},
        {"id": "c", "depends_on": ["a"]},
        {"id": "d", "depends_on": ["b", "c"]},
    ])
    print(f" Valid DAG (no cycles): {valid}")
    cyclic = _validate_dag([
        {"id": "a", "depends_on": ["c"]},
        {"id": "b", "depends_on": ["a"]},
        {"id": "c", "depends_on": ["b"]},
    ])
    print(f" Cyclic DAG (should be False): {cyclic}")


if __name__ == "__main__":
    asyncio.run(main())
Step 2: Run and Verify the Pipeline
What: Execute all three tests — a simple task (should skip planning), a complex task (should decompose and execute in parallel waves), and a DAG validation test (should detect cycles).
Why: Running all three tests confirms that intent classification routes correctly, the decomposer generates valid DAGs, and the executor handles parallel waves.
Verify these key behaviors:
- Test 1: Classification should say needs_planning: False and route directly to Claude (no decomposition)
- Test 2: Should produce 3–7 sub-tasks with correct dependencies; Wave 1 should run independent tasks in PARALLEL, and later waves run sequentially
- Test 3: Valid DAG returns True, cyclic DAG returns False
- If you see ModuleNotFoundError: No module named 'anthropic' → Run pip install "anthropic>=0.30.0" and make sure your virtual environment is activated.
- If you see AuthenticationError → Check that your ANTHROPIC_API_KEY environment variable is set. Run echo $ANTHROPIC_API_KEY (Linux/Mac) or echo %ANTHROPIC_API_KEY% (Windows) to verify.
- Classification always returns needs_planning: False → Claude may classify some tasks differently. Try a more explicitly complex request: "Research, compare, and write a report about..."
- JSON parse error in decompose_task → Claude occasionally wraps JSON in markdown code fences. The code falls back to a single task. Try running again, since LLM outputs are non-deterministic.
- All tasks run sequentially (no parallel waves) → Check that some tasks have empty depends_on arrays. If Claude generates serial dependencies for everything, add to the prompt: "Make independent tasks parallelizable with empty depends_on."
- If you see RuntimeError: no running event loop → Make sure you're using asyncio.run(main()) at the bottom, not calling main() directly.
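The JSON parse error noted above can often be avoided by cleaning the response before parsing it. This is a minimal sketch using only the standard library; `extract_json` is a hypothetical helper, and the idea is to call it wherever the code currently calls json.loads(response.content[0].text).

```python
import json
import re

# Built from a single backtick so this example contains no literal fence.
FENCE = "`" * 3
_FENCE_RE = re.compile(
    rf"^{FENCE}[A-Za-z0-9_-]*[ \t]*\n(.*?)\n?{FENCE}$", re.DOTALL
)


def extract_json(text: str):
    """Parse JSON that may be wrapped in a markdown code fence."""
    cleaned = text.strip()
    match = _FENCE_RE.match(cleaned)
    if match:
        # Keep only what sits between the opening and closing fence
        cleaned = match.group(1).strip()
    # Still raises json.JSONDecodeError if the content is not valid JSON
    return json.loads(cleaned)


# Both a bare payload and a fenced payload parse to the same value.
bare = '[{"id": "task_1", "depends_on": []}]'
fenced = f"{FENCE}json\n{bare}\n{FENCE}"
print(extract_json(bare) == extract_json(fenced))
```

Because the helper still raises json.JSONDecodeError on genuinely malformed output, the existing except branches keep working as a last resort.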
Wrap-Up
If all three tests passed, you have a working planning agent. Test 1 skipped planning for the simple question, Test 2 ran the full plan-then-execute flow with parallel waves, and Test 3 validated DAG cycle detection. To re-run at any time: python planning_agent.py.
You've built a planning agent that classifies intent, decomposes complex tasks into a validated DAG, executes them in parallel waves, and synthesizes results. This is the architecture used by production agent systems that handle tasks too complex for a single ReAct loop.
- Dynamic tool discovery: Embed tool descriptions and select top-3 per sub-task using cosine similarity
- Adaptive re-planning: When a sub-task fails, have Claude generate an alternative plan for that branch
- Plan preview: Add a /plan command that shows the execution plan before running it, letting the user approve or modify it
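The dynamic tool discovery extension can be prototyped without an embedding service. The sketch below substitutes a bag-of-words count vector for a real embedding model (an assumption made to keep it self-contained). The tool names and descriptions are made up for illustration, and `embed`, `cosine`, and `select_tools` are hypothetical helpers.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts. Swap in a real model in practice."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def select_tools(task_description: str, tools: dict[str, str], k: int = 3) -> list[str]:
    """Return the k tool names whose descriptions best match the task."""
    task_vec = embed(task_description)
    scored = [(cosine(task_vec, embed(desc)), name) for name, desc in tools.items()]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [name for _, name in scored[:k]]


# Hypothetical tool registry, for illustration only
TOOLS = {
    "web_search": "search the web for pages about a company or topic",
    "price_lookup": "look up pricing plans and price tiers for a product",
    "doc_writer": "draft a report or summary document from notes",
    "calculator": "evaluate arithmetic expressions",
}

print(select_tools("compare pricing plans for each competitor product", TOOLS, k=2))
```

In the pipeline, you would call select_tools once per sub-task description and pass only the returned tools to that sub-task's Claude call.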
A full planning pipeline uses 3–5 Claude calls for planning (classify + decompose + synthesize), plus 1 simulated call per sub-task. At Claude Sonnet pricing, a typical run costs $0.05–0.15 — about 3–5x more than a raw ReAct agent, but the success rate improvement (40% → 80%) often makes it worthwhile.
Knowledge Check
1. Why does a ReAct-only agent struggle with tasks that have more than 5 steps?
2. What is a DAG, and why must the graph be acyclic?
3. An agent has 20 available tools. Why not load all 20 into every prompt?
4. A planning agent decomposes a task into 8 sub-tasks but sub-task 5 fails. What's the best approach?
5. In M12, you checked stop_reason to determine when the ReAct loop should end. How does the planning agent from this module decide when it's done?
The planning agent is done when _get_ready_tasks() returns an empty list, meaning every task is either done, failed, or waiting on a failed dependency. This is deterministic and reliable, unlike checking natural language signals.
Module Summary
Key Concepts
- Planning layer: Analyzes the goal and generates a structured execution plan before starting. Improves complex task completion from ~40% to ~80%.
- Intent classification: A lightweight router that sends simple requests directly to Claude and reserves the planning overhead for genuinely complex tasks.
- Task decomposition: Uses Claude to break complex goals into a hierarchical DAG of sub-tasks. Each leaf task is achievable in 1-2 tool calls.
- DAG execution: Runs independent sub-tasks in parallel while respecting sequential dependencies. Typically saves 35% wall-clock time versus sequential execution.
- Dynamic tool discovery: Selects the 3-5 most relevant tools per sub-task instead of loading all tools into every prompt, improving selection accuracy and reducing token costs.
What We Built
A complete planning agent pipeline: intent classifier → task decomposer (with DAG validation) → parallel DAG executor → progress reporting. The agent can handle complex, multi-step tasks that would defeat a raw ReAct agent.
Next Module Preview
In M14: Multi-Agent Systems, you'll go beyond a single agent handling everything. Instead of one agent with one plan, you'll build a coordinator that delegates sub-tasks to specialized agents — each with their own tools, context, and expertise. This is how production systems handle tasks too complex for any single agent.