Cost Optimization

Publish at:

An agent that works perfectly but costs a dollar per task is a prototype. An agent that costs a tenth of a cent per task at the same quality level is a product. The difference is engineering around the model: intercepting redundant work before it reaches the API, compressing what you send, batching what you can defer, and building architectures that are aware of their own budget.

Cost in agent systems is dominated by token consumption. Every reasoning step, every tool call description, every context window stuffed with history — these are tokens flowing through the meter. The levers are straightforward: send fewer tokens, send them to cheaper models, avoid sending them at all when the answer already exists. But applying those levers without degrading quality requires careful design.

Where the Money Goes #

Before optimizing, you need to know what you are optimizing. Agent costs break down into a few dominant categories:

Input tokens. The system prompt, conversation history, tool schemas, retrieved context (RAG chunks), and the user's actual request. For a well-equipped agent with 20 tools and a detailed system prompt, the input can be 3,000-5,000 tokens before the user says a word.

Output tokens. The model's reasoning, tool call arguments, and final response. Output tokens are typically 3-5x more expensive than input tokens per unit.

Iteration multiplier. A ReAct agent making 8 tool calls means 8 round trips to the model. Each round trip carries the full (growing) context. By iteration 8, you are paying for the system prompt, all previous reasoning, all tool results, and the new step — accumulated.

Embedding calls. RAG systems embed queries and sometimes re-embed documents. At scale, embedding costs add up.

Tool execution. External API calls, database queries, and compute for code execution. Not LLM costs, but part of the total.

┌────────────────────────────────────────────────────────────────┐
│           Cost Anatomy of a Typical Agent Task                 │
│                                                                │
│  Step 1: Initial call                                          │
│  ├─ System prompt:        1,200 tokens                         │
│  ├─ Tool schemas:         1,800 tokens                         │
│  ├─ User message:           100 tokens                         │
│  ├─ Output (reasoning):     300 tokens                         │
│  └─ Subtotal:     3,100 in + 300 out                           │
│                                                                │
│  Step 2: After tool result                                     │
│  ├─ Previous context:     3,400 tokens                         │
│  ├─ Tool result:            500 tokens                         │
│  ├─ Output (next action):   200 tokens                         │
│  └─ Subtotal:     3,900 in + 200 out                           │
│                                                                │
│  Step 3-8: Accumulating context...                             │
│  └─ By step 8:  ~8,000 in + 400 out per call                   │
│                                                                │
│  Total for task: ~42,000 input tokens + ~2,500 output tokens   │
│                                                                │
│  At $3/M input, $15/M output:                                  │
│  Cost = $0.126 + $0.0375 = $0.16 per task                      │
│                                                                │
│  At 100,000 tasks/day = $16,000/day                            │
└────────────────────────────────────────────────────────────────┘

The accumulating context is the real killer. Each iteration pays for everything that came before. This is why a 10-step agent task does not cost 10x a single call — it costs closer to 50x due to the growing context window.

Semantic Caching #

The most effective cost optimization is not doing the work at all. Semantic caching stores model responses indexed by the meaning of the request in addition to the exact text. When a new request is semantically similar to one already answered, the cache returns the stored response without calling the model.

Unlike exact-match caching (which only helps with identical inputs), semantic caching handles the natural variation in how users phrase the same question.

import hashlib
import time
from dataclasses import dataclass, field

import numpy as np


@dataclass
class CacheEntry:
    """A cached model response with metadata."""
    key_embedding: list[float]
    request_text: str
    response: str
    model_id: str
    created_at: float
    hit_count: int = 0
    ttl_seconds: int = 3600
    metadata: dict = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        return time.time() - self.created_at > self.ttl_seconds


class SemanticCache:
    """Cache model responses by semantic similarity."""

    def __init__(
        self,
        embedding_model,
        similarity_threshold: float = 0.95,
        max_entries: int = 100_000,
    ):
        self.embedding_model = embedding_model
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.entries: list[CacheEntry] = []
        self._embeddings_matrix: np.ndarray | None = None

    async def get(self, request: str, model_id: str) -> str | None:
        """Look up a semantically similar cached response."""
        if not self.entries:
            return None

        query_embedding = await self.embedding_model.embed(request)

        # Find most similar cached request
        similarities = self._compute_similarities(query_embedding)
        best_idx = int(np.argmax(similarities))
        best_score = similarities[best_idx]

        if best_score >= self.threshold:
            entry = self.entries[best_idx]
            if not entry.is_expired and entry.model_id == model_id:
                entry.hit_count += 1
                return entry.response

        return None

    async def put(
        self, request: str, response: str, model_id: str, ttl: int = 3600
    ):
        """Store a response in the cache."""
        embedding = await self.embedding_model.embed(request)

        entry = CacheEntry(
            key_embedding=embedding,
            request_text=request,
            response=response,
            model_id=model_id,
            created_at=time.time(),
            ttl_seconds=ttl,
        )
        self.entries.append(entry)
        self._rebuild_matrix()

        # Evict if over capacity
        if len(self.entries) > self.max_entries:
            self._evict()

    def _compute_similarities(self, query: list[float]) -> np.ndarray:
        """Cosine similarity against all cached embeddings."""
        query_vec = np.array(query)
        return np.dot(self._embeddings_matrix, query_vec) / (
            np.linalg.norm(self._embeddings_matrix, axis=1)
            * np.linalg.norm(query_vec)
        )

    def _evict(self):
        """Remove expired entries, then least-used entries."""
        # Remove expired first
        self.entries = [e for e in self.entries if not e.is_expired]

        # If still over capacity, remove least-hit entries
        if len(self.entries) > self.max_entries:
            self.entries.sort(key=lambda e: e.hit_count, reverse=True)
            self.entries = self.entries[: self.max_entries]

        self._rebuild_matrix()

    def _rebuild_matrix(self):
        """Rebuild the embedding matrix for fast similarity search."""
        if self.entries:
            self._embeddings_matrix = np.array(
                [e.key_embedding for e in self.entries]
            )
        else:
            self._embeddings_matrix = None

When Semantic Caching Works #

Semantic caching is most effective when:

  • Many users ask similar questions (customer support, FAQ-style queries).
  • The agent performs repeatable tasks with stable inputs (daily report generation, standard analyses).
  • Tool results are deterministic for the same inputs (database lookups, API calls with stable data).

It is dangerous when:

  • Answers depend on real-time data (stock prices, live system state).
  • Subtle wording differences change the correct answer ("cancel my last order" vs. "cancel my first order").
  • The agent needs to maintain conversational context that differs between users.

The similarity threshold is the critical tuning parameter. Too high (0.99) and the cache rarely hits. Too low (0.85) and you return wrong answers for superficially similar but semantically different requests. Start at 0.95 and adjust based on your false-positive rate.

Caching at Multiple Levels #

A sophisticated agent system caches at several layers:

Cache Level What is Cached Hit Rate Risk
Full response Complete agent output for a task Low-medium Stale answers
Tool results Individual tool call responses High Stale data
Intermediate reasoning Partial chains for sub-problems Medium Context mismatch
Embeddings Vector representations of text Very high Minimal
Prompt prefix KV cache for static system prompts Very high None

Tool-result caching is particularly effective. If your agent calls a get_user_profile tool and the profile has not changed since the last call, there is no reason to hit the database again. A TTL-based cache on tool results can eliminate 30-50% of external calls in many workloads.

Prompt Compression #

Every token in the prompt costs money. Prompt compression reduces token count while preserving the information the model needs to do its job.

Static Compression - Trimming the System Prompt #

The system prompt is sent on every single call. A 200-token reduction here saves 200 tokens × every iteration × every task × every day. Small absolute savings compound enormously.

Techniques:

Remove redundant instructions. If the model already does something correctly without being told (because it was fine-tuned for it), delete that instruction.

Abbreviate tool schemas. Instead of verbose descriptions, use terse parameter names and minimal descriptions. A well-named tool (search_documents) needs less explanation than a generic one (run_query).

Conditional inclusion. Only include tool schemas for tools relevant to the current task. If the user is asking a factual question, do not inject the 15 tool schemas for code execution, file management, and database access.

class DynamicPromptAssembler:
    """Assemble prompts with only the components needed for this task."""

    def __init__(self, base_prompt: str, tool_registry: dict):
        self.base_prompt = base_prompt
        self.tool_registry = tool_registry

    def assemble(self, task: str, task_category: str) -> str:
        """Build a minimal prompt for the given task."""
        parts = [self.base_prompt]

        # Only include relevant tool schemas
        relevant_tools = self._select_tools(task_category)
        if relevant_tools:
            schemas = self._format_schemas(relevant_tools)
            parts.append(schemas)

        return "\n\n".join(parts)

    def _select_tools(self, category: str) -> list[str]:
        """Select tools relevant to the task category."""
        category_tools = {
            "research": ["web_search", "read_document", "summarize"],
            "coding": ["read_file", "write_file", "run_code", "search_code"],
            "data": ["query_db", "visualize", "export_csv"],
            "general": ["web_search", "calculator"],
        }
        return category_tools.get(category, list(self.tool_registry.keys()))

    def _format_schemas(self, tool_names: list[str]) -> str:
        """Format only selected tool schemas, minimizing token count."""
        schemas = []
        for name in tool_names:
            tool = self.tool_registry[name]
            # Compact format — no verbose descriptions
            schemas.append(
                f"{name}({', '.join(tool.parameters.keys())}): "
                f"{tool.short_description}"
            )
        return "Tools:\n" + "\n".join(schemas)

Dynamic Compression - Shrinking Context Between Iterations #

As the agent loop iterates, the conversation history grows. By iteration 8, you are carrying reasoning from steps 1-7 that may no longer be relevant. Dynamic compression summarizes or prunes old context to keep the window manageable.

class ContextCompressor:
    """Compress conversation history to reduce token usage."""

    def __init__(self, model, max_context_tokens: int = 4000):
        self.model = model
        self.max_tokens = max_context_tokens

    async def compress(self, messages: list[dict]) -> list[dict]:
        """Compress message history while preserving essential information."""
        current_tokens = self._count_tokens(messages)

        if current_tokens <= self.max_tokens:
            return messages  # No compression needed

        # Strategy 1: Summarize old tool results
        messages = self._truncate_tool_results(messages)

        if self._count_tokens(messages) <= self.max_tokens:
            return messages

        # Strategy 2: Summarize early reasoning steps
        messages = await self._summarize_early_steps(messages)

        if self._count_tokens(messages) <= self.max_tokens:
            return messages

        # Strategy 3: Drop intermediate steps, keep first and recent
        messages = self._keep_bookends(messages)
        return messages

    def _truncate_tool_results(self, messages: list[dict]) -> list[dict]:
        """Truncate verbose tool results, keeping only key findings."""
        compressed = []
        for msg in messages:
            if msg["role"] == "tool" and len(msg["content"]) > 500:
                # Keep first 200 chars + last 100 chars
                content = msg["content"]
                truncated = (
                    content[:200] + "\n...[truncated]...\n" + content[-100:]
                )
                compressed.append({**msg, "content": truncated})
            else:
                compressed.append(msg)
        return compressed

    async def _summarize_early_steps(
        self, messages: list[dict]
    ) -> list[dict]:
        """Replace early reasoning steps with a summary."""
        # Keep system prompt and last 3 exchanges intact
        system = messages[0]
        recent = messages[-6:]  # Last 3 turns (assistant + tool/user)
        middle = messages[1:-6]

        if not middle:
            return messages

        # Summarize the middle section
        middle_text = "\n".join(
            f"{m['role']}: {m['content'][:200]}" for m in middle
        )
        summary = await self.model.generate(
            f"Summarize these agent steps concisely, preserving key "
            f"findings and decisions:\n{middle_text}"
        )

        summary_msg = {
            "role": "system",
            "content": f"[Previous steps summary: {summary}]",
        }
        return [system, summary_msg] + recent

    def _keep_bookends(self, messages: list[dict]) -> list[dict]:
        """Keep first exchange and last few exchanges, drop middle."""
        system = messages[0]
        first_exchange = messages[1:3]  # First user + assistant
        recent = messages[-6:]
        return [system] + first_exchange + recent

The trade-off is clear: compression loses information. Aggressive compression saves tokens but risks the agent "forgetting" something important from earlier steps. Conservative compression preserves fidelity but saves less. A practical approach is to compress only tool results and early reasoning — the model rarely needs the full text of a tool result from 5 steps ago, but it does need to remember what it concluded from that result.

Response Deduplication #

Agents often re-derive the same intermediate result. In a multi-agent system, different agents may independently call the same tool with the same arguments, or compute the same sub-answer. Deduplication catches these redundancies.

import asyncio
import hashlib
import json
import time
from collections.abc import Awaitable, Callable


class RequestDeduplicator:
    """Prevent redundant in-flight and recently-completed requests."""

    def __init__(self, ttl_seconds: int = 60):
        self.in_flight: dict[str, asyncio.Future] = {}
        self.recent_results: dict[str, tuple[str, float]] = {}
        self.ttl = ttl_seconds

    def _make_key(self, model_id: str, messages: list[dict]) -> str:
        """Create a deduplication key from the request."""
        # Hash the meaningful parts of the request
        content = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(
            f"{model_id}:{content}".encode()
        ).hexdigest()

    async def deduplicated_call(
        self,
        model_id: str,
        messages: list[dict],
        call_fn: Callable[[str, list[dict]], Awaitable[str]],
    ) -> str:
        """Execute a model call, deduplicating concurrent identical requests."""
        key = self._make_key(model_id, messages)

        # Check recent results first
        if key in self.recent_results:
            result, timestamp = self.recent_results[key]
            if time.time() - timestamp < self.ttl:
                return result

        # Check if an identical request is already in flight
        if key in self.in_flight:
            return await self.in_flight[key]

        # Execute the call, sharing the result with any concurrent duplicates
        future = asyncio.get_event_loop().create_future()
        self.in_flight[key] = future

        try:
            result = await call_fn(model_id, messages)
            future.set_result(result)
            self.recent_results[key] = (result, time.time())
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            del self.in_flight[key]

Deduplication is especially valuable in parallel execution patterns. When a coordinator agent spawns five sub-agents that all need the same background context computed, deduplication ensures that computation happens once and the result is shared.

Batching #

Model API calls have per-request overhead — network latency, queue scheduling, and cold-start penalties. Batching groups multiple independent requests into a single API call (where the provider supports it) or a tight burst, amortizing that overhead.

Two batching strategies:

Micro-Batching - Collecting Requests Over a Short Window #

When multiple agent tasks arrive concurrently, their initial model calls are independent. Instead of sending them one by one, collect requests over a short window (10-50ms) and send them as a batch.

import asyncio
from dataclasses import dataclass


@dataclass
class PendingRequest:
    messages: list[dict]
    model_id: str
    future: asyncio.Future


class MicroBatcher:
    """Batch concurrent model requests for throughput and cost savings."""

    def __init__(
        self,
        model_client,
        max_batch_size: int = 8,
        window_ms: int = 20,
    ):
        self.client = model_client
        self.max_batch_size = max_batch_size
        self.window_ms = window_ms
        self.pending: list[PendingRequest] = []
        self._flush_task: asyncio.Task | None = None

    async def call(self, model_id: str, messages: list[dict]) -> str:
        """Submit a request for batched execution."""
        future = asyncio.get_event_loop().create_future()
        request = PendingRequest(
            messages=messages, model_id=model_id, future=future
        )
        self.pending.append(request)

        # Start the collection window if not already running
        if self._flush_task is None:
            self._flush_task = asyncio.create_task(self._flush_after_window())

        # Flush immediately if batch is full
        if len(self.pending) >= self.max_batch_size:
            await self._flush()

        return await future

    async def _flush_after_window(self):
        """Wait for the collection window, then flush."""
        await asyncio.sleep(self.window_ms / 1000)
        await self._flush()

    async def _flush(self):
        """Send all pending requests as a batch."""
        if not self.pending:
            return

        batch = self.pending[:]
        self.pending = []
        self._flush_task = None

        # Group by model for efficient batching
        by_model: dict[str, list[PendingRequest]] = {}
        for req in batch:
            by_model.setdefault(req.model_id, []).append(req)

        for model_id, requests in by_model.items():
            try:
                results = await self.client.batch_generate(
                    model_id=model_id,
                    messages_batch=[r.messages for r in requests],
                )
                for req, result in zip(requests, results):
                    req.future.set_result(result)
            except Exception as e:
                for req in requests:
                    req.future.set_exception(e)

Offline Batching - Deferred Processing for Non-Urgent Work #

Many model providers offer batch APIs at 50% discount — you submit a batch of requests and get results hours later. For any agent work that is not time-sensitive (nightly report generation, bulk document processing, periodic evaluations), offline batching cuts cost in half with no quality trade-off.

from collections.abc import Callable


class OfflineBatchQueue:
    """Queue non-urgent agent tasks for discounted batch processing."""

    def __init__(self, batch_client, max_batch_size: int = 1000):
        self.client = batch_client
        self.max_batch_size = max_batch_size
        self.queue: list[dict] = []
        self.callbacks: dict[str, Callable[[str], None]] = {}

    def enqueue(
        self,
        task_id: str,
        messages: list[dict],
        callback: Callable[[str], None],
    ):
        """Add a task to the batch queue."""
        self.queue.append({
            "custom_id": task_id,
            "messages": messages,
        })
        self.callbacks[task_id] = callback

        if len(self.queue) >= self.max_batch_size:
            self._submit_batch()

    def _submit_batch(self):
        """Submit the current queue as a batch job."""
        batch = self.queue[:]
        self.queue = []

        # Submit to provider's batch API
        job_id = self.client.create_batch(requests=batch)
        # Register a webhook or poll for completion
        self.client.on_complete(job_id, self._handle_results)

    def _handle_results(self, results: list[dict]):
        """Distribute batch results to waiting callbacks."""
        for result in results:
            task_id = result["custom_id"]
            if task_id in self.callbacks:
                self.callbacks[task_id](result["response"])
                del self.callbacks[task_id]

Budget-Aware Architectures #

The techniques above are optimizations applied to individual calls. Budget-aware architecture is a system-level concern: the agent knows what it is spending and adjusts its behavior accordingly.

Per-Task Budget Enforcement #

Each agent task gets a token or cost budget. The orchestrator tracks spending across iterations and takes action when the budget is running low.

from dataclasses import dataclass


@dataclass
class TaskBudget:
    """Token and cost budget for a single agent task."""
    max_input_tokens: int = 50_000
    max_output_tokens: int = 10_000
    max_cost_dollars: float = 0.50
    max_iterations: int = 15

    # Tracking
    input_tokens_used: int = 0
    output_tokens_used: int = 0
    cost_used: float = 0.0
    iterations_used: int = 0

    def record_usage(
        self, input_tokens: int, output_tokens: int, cost: float
    ):
        self.input_tokens_used += input_tokens
        self.output_tokens_used += output_tokens
        self.cost_used += cost
        self.iterations_used += 1

    @property
    def remaining_fraction(self) -> float:
        """What fraction of the budget remains (0 to 1)."""
        token_frac = 1 - (
            self.input_tokens_used / self.max_input_tokens
        )
        cost_frac = 1 - (self.cost_used / self.max_cost_dollars)
        iter_frac = 1 - (self.iterations_used / self.max_iterations)
        return min(token_frac, cost_frac, iter_frac)

    @property
    def is_exhausted(self) -> bool:
        return self.remaining_fraction <= 0

    @property
    def is_low(self) -> bool:
        return self.remaining_fraction < 0.2


class BudgetAwareAgent:
    """An agent that adapts behavior based on remaining budget."""

    def __init__(self, model_router, tools, budget: TaskBudget):
        self.router = model_router
        self.tools = tools
        self.budget = budget

    async def step(self, state: dict) -> dict:
        """Execute one agent step with budget awareness."""
        if self.budget.is_exhausted:
            return self._force_final_answer(state)

        # Adapt model selection based on remaining budget
        model_id = self._select_model_for_budget()

        # Adapt context size based on remaining budget
        messages = self._trim_context_for_budget(state["messages"])

        response = await self.router.call(model_id, messages)

        # Record usage
        self.budget.record_usage(
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cost=response.usage.cost,
        )

        return response

    def _select_model_for_budget(self) -> str:
        """Pick a model based on remaining budget."""
        if self.budget.remaining_fraction > 0.5:
            return "large-model"  # Plenty of budget — use the best
        elif self.budget.remaining_fraction > 0.2:
            return "medium-model"  # Getting tight — use mid-tier
        else:
            return "small-model"  # Almost out — use cheapest

    def _trim_context_for_budget(self, messages: list[dict]) -> list[dict]:
        """Aggressively compress context when budget is low."""
        if self.budget.is_low:
            # Keep only system prompt and last 2 exchanges
            return [messages[0]] + messages[-4:]
        return messages

    def _force_final_answer(self, state: dict) -> dict:
        """Budget exhausted — produce best answer with what we have."""
        return {
            "type": "final_answer",
            "content": self._synthesize_from_current_state(state),
            "metadata": {"budget_exhausted": True},
        }

Cascading Model Tiers #

A budget-aware architecture routes requests through a cascade of models, starting with the cheapest and escalating only when the cheap model cannot handle the task confidently.

┌──────────────────────────────────────────────────────────────┐
│                  Model Cascade Architecture                  │
│                                                              │
│  Request ──► Small Model (fast, cheap)                       │
│                   │                                          │
│                   ├── Confident? ──► Return response         │
│                   │                                          │
│                   └── Uncertain? ──► Medium Model            │
│                                         │                    │
│                                         ├── Confident? ──►   │
│                                         │   Return response  │
│                                         │                    │
│                                         └── Uncertain? ──►   │
│                                             Large Model      │
│                                               │              │
│                                               └── Return     │
│                                                   response   │
│                                                              │
│  Cost: 80% of requests resolved at tier 1 ($0.001/call)      │
│        15% escalated to tier 2 ($0.01/call)                  │
│         5% escalated to tier 3 ($0.05/call)                  │
│                                                              │
│  Blended cost: ~$0.004/call (vs. $0.05 for always-large)     │
└──────────────────────────────────────────────────────────────┘
class ModelCascade:
    """Route requests through a cascade of increasingly capable models."""

    def __init__(self, tiers: list[dict]):
        """
        tiers: [
            {"model_id": "small", "client": ..., "confidence_threshold": 0.8},
            {"model_id": "medium", "client": ..., "confidence_threshold": 0.7},
            {"model_id": "large", "client": ..., "confidence_threshold": 0.0},
        ]
        """
        self.tiers = tiers

    async def call(self, messages: list[dict]) -> dict:
        """Try each tier in order, escalating on low confidence."""
        for tier in self.tiers:
            response = await tier["client"].generate(
                model=tier["model_id"],
                messages=messages,
                return_confidence=True,
            )

            if response.confidence >= tier["confidence_threshold"]:
                return {
                    "content": response.content,
                    "model_used": tier["model_id"],
                    "confidence": response.confidence,
                    "escalated": tier != self.tiers[0],
                }

        # Final tier always returns regardless of confidence
        return {
            "content": response.content,
            "model_used": self.tiers[-1]["model_id"],
            "confidence": response.confidence,
            "escalated": True,
        }

The confidence signal can come from the model itself (logprobs on the output), a separate classifier, or heuristics (short responses for complex questions often indicate uncertainty).

Prompt Prefix Caching #

Most API providers now support prompt prefix caching — if consecutive requests share the same prefix (system prompt, tool schemas, static context), the provider caches the key-value computations for that prefix and charges reduced rates for cached tokens. This is free money for agent systems.

The requirement is simple: structure your prompts so the static parts come first and the dynamic parts come last.

class PrefixOptimizedPrompt:
    """Structure prompts to maximize prefix cache hits."""

    def __init__(self, system_prompt: str, tool_schemas: str):
        # Static prefix — cached across all calls
        self.prefix = f"{system_prompt}\n\n{tool_schemas}"

    def build_messages(
        self, conversation: list[dict], task_context: str = ""
    ) -> list[dict]:
        """Build messages with cacheable prefix ordering."""
        messages = [
            # Static system prompt — always the same, always first
            {"role": "system", "content": self.prefix},
        ]

        # Dynamic context goes in a separate system message or user message
        if task_context:
            messages.append({
                "role": "system",
                "content": task_context,
            })

        # Conversation history
        messages.extend(conversation)
        return messages

For an agent making 8 calls per task with a 3,000-token system prompt, prefix caching means the system prompt is processed (and paid for at full price) only on the first call. The remaining 7 calls pay the reduced cached-token rate — typically 75-90% cheaper for input tokens. On a 3,000-token prefix over 8 iterations, that saves ~18,000 tokens worth of full-price computation per task.

The key constraint: the prefix must be identical across requests. If you inject dynamic content (like a timestamp or user ID) into the system prompt, you break the cache. Move all dynamic content after the static prefix.

Putting It All Together #

A production agent system layers these techniques:

┌─────────────────────────────────────────────────────────┐
│              Cost-Optimized Agent Architecture          │
│                                                         │
│  Incoming task                                          │
│       │                                                 │
│       ▼                                                 │
│  ┌──────────────┐                                       │
│  │  Semantic    │── Hit ──► Return cached response      │
│  │  cache       │                                       │
│  └──────┬───────┘                                       │
│         │ Miss                                          │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │  Task        │── Route to category                   │
│  │  classifier  │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │  Prompt      │── Minimal tools + compressed context  │
│  │  assembler   │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │  Model       │── Cheapest model first                │
│  │  cascade     │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │  Dedup +     │── Batch concurrent calls              │
│  │  batcher     │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │  Budget      │── Track spending, adapt behavior      │
│  │  enforcer    │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │  Context     │── Compress history between iterations │
│  │  compressor  │                                       │
│  └──────────────┘                                       │
│                                                         │
└─────────────────────────────────────────────────────────┘

The compound effect is dramatic. Consider a workload of 100,000 agent tasks per day:

Optimization Savings How
Semantic cache (30% hit rate) 30% Avoid model calls entirely
Prefix caching 15-20% Reduced input token cost
Conditional tool schemas 10-15% Fewer tokens per call
Context compression 15-20% Shorter context in later iterations
Model cascade 40-60% Cheaper model handles most tasks
Offline batching (non-urgent) 50% on subset Discounted batch API
Deduplication 5-10% Eliminate redundant work

These savings multiply. A model cascade (60% saving) combined with semantic caching (30% of remaining) combined with prefix caching (15% of remaining) yields total savings of 75-85% compared to the naive approach of always using the largest model with full context.

Conclusion #

Cost optimization for agents is a layered architecture where each layer removes waste at a different level. Semantic caching eliminates redundant work entirely. Prompt compression reduces what you send. Model cascades route simple tasks to cheap models. Batching amortizes overhead. Budget enforcement prevents runaway spending. Context compression stops the accumulating-history problem from making later iterations exponentially expensive.

The foundational principle is that tokens are the unit of cost, and the agent loop is a token multiplier. Every iteration carries the weight of all previous iterations. Optimizations that reduce per-iteration tokens compound across the entire task. A system that saves 40% per call saves far more than 40% per task because it also reduces the context that future calls must carry.

Start with measurement — instrument your agent to report tokens consumed per task, cache hit rates, and cost per iteration. Once you see where the tokens go, the highest-leverage optimizations become obvious. In most systems, prefix caching and conditional tool inclusion are free wins that require only prompt restructuring. Semantic caching and model cascades require more infrastructure but deliver the largest absolute savings.