Error Handling and Recovery
Agents fail. Tools time out, APIs return 500s, models hallucinate malformed JSON, rate limits kick in mid-task, and external services go down without warning. In traditional software, a failure usually means a clear exception and a stack trace. In an agent, a failure is just another observation — and what the system does with that observation determines whether the agent recovers gracefully or spirals into a loop of confused retries.
The difference between a fragile agent and a resilient one is whether the system is designed to expect failures. Resilient agents treat errors as first-class data: they classify them, decide on a strategy (retry, fall back, escalate, abort), and continue with a clear plan. Fragile agents either crash on the first error or retry blindly until they exhaust their budget.
We touched on this briefly when we discussed feeding tool failures back as observations and system prompt rules for retry limits. Here we go deeper: the full taxonomy of agent failures and the engineering patterns for handling each one.
A Taxonomy of Agent Failures #
Before you can handle errors well, you need to classify them. Different failure types demand different responses. Retrying a rate-limited API call makes sense. Retrying a request that the model fundamentally misunderstood does not.
Agent failures fall into four categories:
Transient infrastructure failures. The API returned a 429 or 503. The network timed out. The database connection dropped. These are temporary — the same request will likely succeed if you wait and try again. This is the classic retry case.
Model output failures. The model returned malformed JSON, called a tool that does not exist, or produced arguments that fail schema validation. The model tried to do the right thing but got the format wrong. A retry with the error fed back as context often fixes these.
Semantic failures. The model followed instructions correctly but produced a wrong answer — a hallucinated fact, an incorrect calculation, a misinterpretation of the user's intent. Retrying the same call will likely produce the same (or equally wrong) result. These need a different strategy: a different prompt, a different model, or human escalation.
Unrecoverable failures. The tool is permanently unavailable. The user's request violates a hard policy. The agent has exhausted its budget. No amount of retrying will help. The right response is to stop, report the situation clearly, and let the user or the calling system decide what to do next.
┌──────────────────────────────────────────────────────────────┐
│ Agent Failure │
├──────────────┬───────────────┬───────────────┬───────────────┤
│ Transient │ Model Output │ Semantic │ Unrecoverable │
│ │ │ │ │
├──────────────┼───────────────┼───────────────┼───────────────┤
│ 429, 503, │ Bad JSON, │ Wrong │ Tool gone, │
│ timeout, │ unknown tool, │ answer, │ policy │
│ conn reset │ invalid args │ hallucination │ violation, │
│ │ │ │ budget │
├──────────────┼───────────────┼───────────────┼───────────────┤
│ Retry with │ Retry with │ Fallback, │ Abort │
│ backoff │ error context │ escalate, │ cleanly │
│ │ │ reroute │ │
└──────────────┴───────────────┴───────────────┴───────────────┘
The classification can often happen automatically. HTTP status codes map cleanly: 429 and 503 are transient; 400 and 404 are not. JSON parse errors and schema validation failures are model output errors. Semantic failures are harder to catch — they usually require a second opinion, either from a critic model or from a human.
Retries and Exponential Backoff #
Transient failures are the most common and the most straightforward to handle. The pattern is retry with exponential backoff and jitter: wait a short time, try again, and double the wait on each subsequent attempt. Jitter — adding randomness to the delay — prevents thundering herds when many agents hit the same API simultaneously.
import random
import asyncio
async def retry_with_backoff(
fn,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retryable: tuple = (TransientError, TimeoutError, RateLimitError),
):
"""Retry a function with exponential backoff and jitter."""
last_error = None
for attempt in range(max_retries + 1):
try:
return await fn()
except retryable as e:
last_error = e
if attempt == max_retries:
break
retry_after = getattr(e, "retry_after", None)
delay = retry_after if retry_after is not None else base_delay * (2 ** attempt)
delay = min(delay, max_delay)
jitter = random.uniform(0, delay * 0.5)
await asyncio.sleep(delay + jitter)
raise MaxRetriesExceeded(
f"Failed after {max_retries + 1} attempts",
last_error=last_error,
)
Three design decisions matter here:
What counts as retryable. Only transient errors belong in the retry list. If you retry a 400 Bad Request, you will get the same 400 every time. If you retry a schema validation failure with the same prompt, you will probably get the same bad JSON. Be specific about which exceptions trigger a retry.
How many retries. Three is a reasonable default for API calls. For expensive model calls, two might be enough — each retry costs tokens, adds latency, and burns budget. The right number depends on the cost of the operation versus the cost of failure.
When to respect Retry-After. Many APIs return a Retry-After header with rate-limit responses. If the header says "wait 30 seconds," your backoff should honor that instead of using its own schedule. Ignoring it leads to repeated rejections and, in some cases, longer lockouts.
async def call_tool_with_retry(tool_name: str, arguments: dict, tool_runner) -> dict:
"""Call a tool with automatic retry for transient failures."""
async def attempt():
result = await tool_runner.execute(tool_name, arguments)
if result.status_code in (429, 503):
retry_after = result.headers.get("Retry-After")
error = RateLimitError(
f"{tool_name} returned {result.status_code}",
retry_after=float(retry_after) if retry_after else None,
)
raise error
if result.status_code >= 500:
raise TransientError(f"{tool_name} returned {result.status_code}")
return result
return await retry_with_backoff(attempt)
Feeding Errors Back to the Model #
For model output failures — malformed JSON, invalid tool names, bad arguments — the most effective strategy is to tell the model what went wrong and let it fix the mistake. We covered this idea in the ReAct pattern: failures become observations. The model sees the error, adjusts, and tries again.
The key is structuring the error message so the model can act on it. A raw Python traceback is noise. A clear, specific message is a prompt the model can learn from in-context.
def format_tool_error(tool_name: str, error: Exception, arguments: dict) -> str:
"""Format a tool error as a model-readable observation."""
if isinstance(error, ToolNotFoundError):
return (
f"Error: tool '{tool_name}' does not exist. "
f"Available tools: {', '.join(tool_registry.keys())}. "
f"Pick one of these instead."
)
if isinstance(error, ValidationError):
return (
f"Error: invalid arguments for '{tool_name}'. "
f"Issues: {error.details}. "
f"Expected schema: {json.dumps(tool_registry[tool_name]['parameters'])}. "
f"Fix the arguments and try again."
)
if isinstance(error, TimeoutError):
return (
f"Error: '{tool_name}' timed out after {error.timeout}s. "
f"The service may be slow. You can retry or try a different approach."
)
return f"Error: '{tool_name}' failed with: {str(error)}"
Notice the pattern: each error message tells the model three things — what happened, why it happened, and what to do about it. "Tool not found" includes the list of valid tools. "Validation error" includes the expected schema. "Timeout" suggests both retry and alternative approaches. The model has enough context to make a reasonable next move.
Error Budgets Inside the Loop #
Feeding errors back to the model creates a risk: the model might get stuck in a retry loop, burning through tokens on the same mistake. You need a limit — an error budget that caps how many consecutive errors the agent tolerates before escalating or aborting.
class ErrorBudget:
"""Track consecutive errors and enforce limits."""
def __init__(self, max_consecutive: int = 3, max_total: int = 10):
self.max_consecutive = max_consecutive
self.max_total = max_total
self.consecutive = 0
self.total = 0
def record_error(self, error_type: str) -> str:
"""Record an error and return the action: 'continue', 'warn', or 'abort'."""
self.consecutive += 1
self.total += 1
if self.total >= self.max_total:
return "abort"
if self.consecutive >= self.max_consecutive:
return "warn"
return "continue"
def record_success(self):
"""Reset consecutive counter on success."""
self.consecutive = 0
When the budget returns "warn," the agent injects a message into the context: "You have failed 3 times in a row on this step. Try a fundamentally different approach or ask the user for clarification." When it returns "abort," the agent stops the loop and returns an error to the user. This is the difference between a resilient agent and an expensive infinite loop.
Circuit Breakers #
Retries handle individual call failures. Circuit breakers handle systemic failures — when an entire service is down and every call will fail. Without a circuit breaker, the agent will keep sending requests to a dead service, accumulating timeouts and wasting time it could spend on a fallback path.
The circuit breaker pattern borrows from distributed systems. It has three states: closed (normal operation, requests flow through), open (the service is known to be down, requests fail immediately without calling the service), and half-open (a probe request is sent to check if the service has recovered).
failure threshold
exceeded
┌─────────────┐ ┌──────────────────┐
│ CLOSED │ ─────────────────────────────▶ │ OPEN │
│ │ │ │
│Requests │ │ Requests │
│flow through │ │fail immediately │
│ │◀──── probe succeeds ────────── │ │
└─────────────┘ │ │
▲ └────┬─────────────┘
│ │
│ ┌───────────┐ │
│ │ HALF-OPEN │ │
└──────────────│ │◀───────────────────┘
probe │One probe │ timeout
succeeds │request │ expires
│allowed │
└───────────┘
import time
class CircuitBreaker:
"""Prevent repeated calls to a failing service."""
def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failure_count = 0
self.state = "closed"
self.last_failure_time = 0
def allow_request(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "half-open"
return True
return False
if self.state == "half-open":
return True # Allow the probe request
return False
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == "half-open":
self.state = "open"
elif self.failure_count >= self.failure_threshold:
self.state = "open"
In an agent context, you typically have one circuit breaker per external service (or per tool, if tools map to separate backends). When the circuit opens, the agent gets an immediate error observation: "Service X is currently unavailable (circuit open). Use an alternative approach or inform the user." The model can then fall back to a different tool, use cached data, or give a partial answer — instead of waiting 30 seconds for a timeout five times in a row.
Fallback Strategies #
When the primary path fails and retries are exhausted, the agent needs a Plan B. Fallbacks define what that plan looks like. The right fallback depends on what failed and how critical it is to the user's request.
Model Fallbacks #
If the primary model is down or returning errors, fall back to a secondary model. This is straightforward to implement — swap the model identifier and retry:
MODEL_CHAIN = ["primary-model", "fallback-model-a", "fallback-model-b"]
async def call_with_model_fallback(prompt: str, tools: list) -> ModelResponse:
"""Try models in order until one succeeds."""
last_error = None
for model_id in MODEL_CHAIN:
try:
return await call_model(model=model_id, prompt=prompt, tools=tools)
except (TransientError, RateLimitError) as e:
last_error = e
continue
raise AllModelsFailed(f"All models failed. Last error: {last_error}")
The trade-off is quality versus availability. A fallback model is typically smaller, cheaper, and less capable — it might lack tool-use support, handle fewer languages, or produce shallower reasoning. For simple queries (summarization, classification, FAQ lookup), the user may not notice the difference. For complex tasks (multi-step planning, code generation, nuanced analysis), a weaker model can produce subtly wrong results that are harder to catch than an outright failure. The decision comes down to your domain: if a wrong answer is worse than no answer, abort instead of falling back. If approximate is acceptable, fall back and log it. Either way, always log when a fallback fires so you can review the output quality and measure how often the primary path degrades.
Tool Fallbacks #
When a tool fails, the agent might have an alternative way to get the same information. A web search tool can fall back to a cached knowledge base. A real-time pricing API can fall back to a static price list with a staleness warning. The fallback is not as good, but it is better than nothing.
The cleanest way to implement tool fallbacks is in the tool registry — each tool can declare its fallback:
TOOL_REGISTRY = {
"get_live_price": {
"handler": get_live_price,
"fallback": "get_cached_price",
"fallback_warning": "Using cached pricing data (may be up to 1 hour stale).",
},
"get_cached_price": {
"handler": get_cached_price,
"fallback": None,
},
}
async def execute_with_fallback(tool_name: str, arguments: dict) -> ToolResult:
"""Execute a tool, falling back if the primary fails."""
try:
return await execute_tool(tool_name, arguments)
except (TransientError, TimeoutError):
meta = TOOL_REGISTRY[tool_name]
if meta.get("fallback"):
result = await execute_tool(meta["fallback"], arguments)
result.warning = meta.get("fallback_warning")
return result
raise
The warning is important. When the agent uses a fallback, the model — and ultimately the user — should know that the data comes from a less reliable source. Silently swapping a real-time API for a cache without disclosure erodes trust.
Graceful Degradation #
Sometimes the best fallback is a partial answer. If the agent needs three pieces of information and one tool is down, it can return the two it has with a clear note about what is missing.
async def gather_with_degradation(tasks: dict[str, Callable]) -> dict:
"""Run multiple data-gathering tasks. Collect successes, note failures."""
results = {}
failures = {}
for name, task_fn in tasks.items():
try:
results[name] = await task_fn()
except Exception as e:
failures[name] = str(e)
return {
"data": results,
"unavailable": failures,
"complete": len(failures) == 0,
}
The model receives both the available data and the list of what failed. Its response can then say: "I was able to retrieve your order history and shipping status, but the billing system is currently unavailable. Here is what I have..." This is almost always better than returning nothing or making the user wait for a full retry cycle.
Degradation is not always appropriate, though. If the missing piece is essential to the answer — a financial calculation missing a key input, a compliance check missing the policy document — returning a partial result can be worse than returning an error. The rule of thumb: degrade when the missing data is supplementary (nice to have, adds context), and block when it is foundational (the answer is meaningless without it). Tag each data-gathering task with a criticality level so the agent can make this distinction automatically.
Resumable Workflows #
Some agent tasks take minutes or hours — processing a batch of documents, running a multi-step analysis, migrating data across systems. If the agent fails on step 47 of 100, starting over from step 1 is expensive and frustrating. Resumable workflows solve this by checkpointing progress so the agent can pick up where it left off.
The idea is simple: after each step completes successfully, persist the step result. On restart, load the persisted state and skip steps that are already done.
class CheckpointedWorkflow:
"""A workflow that saves progress and can resume after failure."""
def __init__(self, workflow_id: str, steps: list, store: CheckpointStore):
self.workflow_id = workflow_id
self.steps = steps
self.store = store
async def run(self) -> dict:
state = await self.store.load(self.workflow_id) or {}
for i, step in enumerate(self.steps):
step_key = f"step_{i}_{step.name}"
if step_key in state:
# Already completed — skip
continue
try:
result = await step.execute(state)
state[step_key] = {
"status": "completed",
"result": result,
"timestamp": time.time(),
}
await self.store.save(self.workflow_id, state)
except Exception as e:
state[step_key] = {
"status": "failed",
"error": str(e),
"timestamp": time.time(),
}
await self.store.save(self.workflow_id, state)
raise WorkflowStepFailed(
step=step.name,
step_index=i,
total_steps=len(self.steps),
error=e,
)
return state
The checkpoint store can be a database, a file, or even a key-value store — anything durable. The important properties are: checkpoints survive process restarts, each step's result is stored with enough detail to reconstruct the workflow state, and step results are immutable once written (no accidental overwrites on retry).
Idempotent Steps #
Resumability only works if steps are idempotent — running a step twice with the same input produces the same result and the same side effects. If step 5 sends an email and the workflow crashes after sending but before checkpointing, a naive restart will send the email again.
We covered idempotency keys for tool calls. The same principle applies here: each step should have a deterministic key that downstream systems can use to deduplicate. For steps that call external APIs, include the idempotency key in the request. For steps that produce artifacts (files, database rows), check for existence before creating.
class IdempotentStep:
"""A workflow step that can safely be retried."""
def __init__(self, name: str, fn, key_fn):
self.name = name
self.fn = fn
self.key_fn = key_fn # generates idempotency key from input
async def execute(self, state: dict) -> dict:
idempotency_key = self.key_fn(state)
# Check if this step already produced a result externally
existing = await check_external_result(idempotency_key)
if existing:
return existing
return await self.fn(state, idempotency_key=idempotency_key)
This is the boring, essential plumbing that makes the difference between a workflow you can restart with confidence and one you restart while holding your breath.
Timeout Management #
Every external call an agent makes should have a timeout. This sounds obvious, but the default in many HTTP libraries is no timeout — meaning a single stalled API call can block the entire agent loop indefinitely.
Timeouts need to be set at three levels:
Individual tool call timeout. How long a single API call can take before the agent gives up. For a database query, 5-10 seconds is reasonable. For a document processing service, 30-60 seconds. For a code execution sandbox, 120 seconds.
Step timeout. How long the agent can spend on a single step, including retries. If a tool call times out and the agent retries three times, the step timeout prevents the retries from taking too long collectively.
Session timeout. The total time the agent has to complete the entire task. This is the outer budget. Even if individual steps are within limits, a session that has been running for 10 minutes on a 2-minute task has gone wrong somewhere.
class TimeoutManager:
"""Enforce timeouts at tool, step, and session levels."""
def __init__(self, tool_timeout: float, step_timeout: float, session_timeout: float):
self.tool_timeout = tool_timeout
self.step_timeout = step_timeout
self.step_start = time.time()
self.session_start = time.time()
self.session_timeout = session_timeout
def step_remaining(self) -> float:
elapsed = time.time() - self.step_start
remaining = self.step_timeout - elapsed
if remaining <= 0:
raise StepTimeoutError("Step time budget exceeded")
return remaining
def session_remaining(self) -> float:
elapsed = time.time() - self.session_start
remaining = self.session_timeout - elapsed
if remaining <= 0:
raise SessionTimeoutError("Session time budget exceeded")
return remaining
async def call_with_timeout(self, fn, timeout_override: float = None):
timeout = min(
timeout_override or self.tool_timeout,
self.step_remaining(),
self.session_remaining(),
)
try:
return await asyncio.wait_for(fn(), timeout=timeout)
except asyncio.TimeoutError:
raise ToolTimeoutError(f"Call timed out after {timeout}s")
The nested structure matters. A tool timeout of 10 seconds means one call cannot stall forever. A step timeout of 30 seconds means retries cannot accumulate indefinitely. A session timeout of 300 seconds means the entire task has a budget. Each level catches a different kind of runaway.
Putting It All Together #
In practice, these patterns do not exist in isolation. A resilient agent runtime stacks them into a layered defense:
┌───────────────────────────────────────────────────┐
│ Session Timeout │
│ ┌─────────────────────────────────────────────┐ │
│ │ Agent Loop │ │
│ │ │ │
│ │ ┌───────────────────────────────────────┐ │ │
│ │ │ Step Timeout │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────┐ │ │ │
│ │ │ │ Circuit Breaker per service │ │ │ │
│ │ │ │ ┌───────────────────────────┐ │ │ │ │
│ │ │ │ │ Retry with backoff │ │ │ │ │
│ │ │ │ │ ┌─────────────────────┐ │ │ │ │ │
│ │ │ │ │ │ Tool call + timeout │ │ │ │ │ │
│ │ │ │ │ └─────────────────────┘ │ │ │ │ │
│ │ │ │ └───────────────────────────┘ │ │ │ │
│ │ │ └─────────────────────────────────┘ │ │ │
│ │ └───────────────────────────────────────┘ │ │
│ │ │ │
│ │ Error budget: max consecutive / total │ │
│ │ Fallback chain: model → tool → degrade │ │
│ │ Checkpoint: save state after each step │ │
│ └─────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────┘
Here is how they compose for a single tool call inside the agent loop:
async def execute_tool_resilient(
tool_name: str,
arguments: dict,
breakers: dict[str, CircuitBreaker],
timeout_mgr: TimeoutManager,
error_budget: ErrorBudget,
) -> ToolResult:
"""Execute a tool with the full resilience stack."""
breaker = breakers[tool_name]
# 1. Circuit breaker — fail fast if service is known-down
if not breaker.allow_request():
fallback = TOOL_REGISTRY[tool_name].get("fallback")
if fallback:
return await execute_tool_resilient(
fallback, arguments, breakers, timeout_mgr, error_budget,
)
raise ServiceUnavailable(f"{tool_name}: circuit open, no fallback")
# 2. Retry with backoff, wrapped in a timeout
try:
result = await retry_with_backoff(
fn=lambda: timeout_mgr.call_with_timeout(
lambda: execute_tool(tool_name, arguments),
),
max_retries=2,
)
breaker.record_success()
error_budget.record_success()
return result
except MaxRetriesExceeded as e:
breaker.record_failure()
action = error_budget.record_error("tool_failure")
if action == "abort":
raise AgentAbort("Error budget exhausted")
# Try fallback
fallback = TOOL_REGISTRY[tool_name].get("fallback")
if fallback:
return await execute_tool_resilient(
fallback, arguments, breakers, timeout_mgr, error_budget,
)
# No fallback — return error as observation for the model
return ToolResult(
success=False,
error=format_tool_error(tool_name, e.last_error, arguments),
)
The order matters. The circuit breaker checks first (microseconds, saves you from pointless retries). The retry loop runs inside the timeout manager (caps total time). The error budget tracks cumulative failures across steps (prevents runaway loops). Fallbacks engage only after retries are exhausted (prefer the primary path). If everything fails, the error becomes an observation for the model to reason about.
Conclusion #
Error handling in agents is the difference between an agent that works in demos and one that works in production.
- Classify errors. Transient, model output, semantic, and unrecoverable failures each demand different responses. Retrying a semantic failure is a waste; aborting on a transient failure is premature.
- Retry with backoff. Exponential backoff with jitter handles transient failures. Be specific about what counts as retryable, cap the number of retries, and respect
Retry-Afterheaders. - Feed errors back. Model output failures are best fixed by telling the model what went wrong — clearly, specifically, with enough context to correct the mistake.
- Budget errors. Cap consecutive and total failures per session. Without a budget, a confused model will retry forever.
- Use circuit breakers. When a service is down, fail fast and fall back instead of accumulating timeouts.
- Design fallbacks. Model fallbacks, tool fallbacks, and graceful degradation with partial answers. A partial answer with a clear disclaimer is better than no answer.
- Checkpoint long workflows. Persist step results so multi-step tasks can resume after failure instead of starting over.
- Layer timeouts. Tool-level, step-level, and session-level timeouts prevent any single failure from stalling the entire agent.
The common thread across all of these patterns: make failure visible. Log it, surface it to the model as context, and surface it to the user when necessary. Hidden failures compound. Visible failures get fixed.