• Getting Started
  • Core Concepts
  • Reinforcement Learning
  • Model Context Protocol (MCP)
  • Workflow Patterns
  • Advanced Agent Patterns
  • Guides

Advanced Agent Patterns

Agent Patterns in Production

Deploy and manage agent patterns in production environments with monitoring and optimization.

Guidelines for deploying advanced agent patterns in production environments. Covers monitoring, optimization, error handling, and scalability.

Overview

Moving advanced patterns from development to production requires careful attention to performance, reliability, cost, and monitoring. This guide provides battle-tested practices for production deployments.

Architecture Considerations

Pattern Selection Strategy

class ProductionPatternSystem:
    """Routes each task to the cheapest pattern that can handle it.

    Patterns are ordered by cost/quality: a plain agent for the fast
    path, self-consistency for moderate work, reflexion for hard work,
    and a combined pipeline reserved for critical tasks.
    """

    def __init__(self):
        # Fast path for simple tasks
        self.simple_agent = Agent(...)
        # Moderate complexity
        self.self_consistency = SelfConsistencyAgent(num_generations=3)
        # High complexity
        self.reflexion = ReflexionAgent(max_iterations=3)
        # Critical tasks
        self.full_pipeline = CombinedPattern(...)

    def route(self, task, metadata):
        """Pick a pattern based on task metadata.

        Reads "complexity", "priority" and "budget" keys, falling back
        to sensible defaults when a key is absent.
        """
        complexity = metadata.get("complexity", "medium")
        priority = metadata.get("priority", "normal")
        budget = metadata.get("budget", "standard")

        # Urgency or a tight budget always wins: take the fast path.
        if priority == "urgent" or budget == "low":
            return self.simple_agent

        # Hard tasks get the full pipeline only when budget allows.
        if complexity == "high":
            return self.full_pipeline if budget == "high" else self.reflexion

        return self.self_consistency

Failover and Fallbacks

class ResilientPatternExecution:
    """Executes the optimal pattern with graceful degradation.

    Falls back in order: timeout -> fast fallback, resource pressure ->
    simple agent, anything else -> a safe canned response.
    """

    def run(self, task):
        try:
            # Happy path: the best (but slowest / costliest) pattern.
            result = self.optimal_pattern.run(task)
        except TimeoutError:
            logger.warning("Pattern timeout, falling back")
            result = self.fast_fallback.run(task)
        except ResourceError:
            logger.warning("Resource limit, using simple pattern")
            result = self.simple_agent.run(task)
        except Exception as e:
            # Last resort: never propagate, answer with a safe default.
            logger.error(f"Pattern failed: {e}")
            result = self.safe_default_response()
        return result

Performance Optimization

Caching Strategy

from functools import lru_cache
import hashlib
import json  # required by cache_key(); was missing from this snippet

class CachedPatternExecution:
    """Process-local result cache keyed on (task, pattern config).

    NOTE(review): self.cache grows without bound; long-lived processes
    should cap it (e.g. an LRU eviction policy).
    """

    def __init__(self):
        # sha256 hex digest -> cached pattern result
        self.cache = {}

    def cache_key(self, task, pattern_config):
        """Return a deterministic key for (task, config).

        sort_keys=True makes dict insertion order irrelevant, so equal
        configs always hash to the same key.
        """
        content = f"{task}:{json.dumps(pattern_config, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()

    def run(self, task, pattern, config):
        """Return a cached result when available, else execute and cache."""
        key = self.cache_key(task, config)

        if key in self.cache:
            # assumes a module-level `logger` is configured — defined elsewhere
            logger.info("Cache hit")
            return self.cache[key]

        result = pattern.run(task, **config)
        self.cache[key] = result
        return result

Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

class ParallelPatternExecution:
    """Runs a batch of tasks concurrently on a bounded thread pool.

    Tasks must be hashable (they are used as dict keys). A task whose
    execution raises maps to None in the returned dict.
    """

    def batch_run(self, tasks, pattern):
        results = {}
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Fan out: one future per task, remembering which is which.
            pending = {
                executor.submit(pattern.run, task): task
                for task in tasks
            }

            # Fan in: harvest results in completion order.
            for done in as_completed(pending):
                task = pending[done]
                try:
                    results[task] = done.result(timeout=30)
                except Exception as e:
                    logger.error(f"Task {task} failed: {e}")
                    results[task] = None
        return results

Resource Pooling

class PatternPool:
    """Fixed-size pool of reusable pattern instances.

    Queue.get() blocks when the pool is exhausted, which naturally
    throttles concurrent executions to pool_size.
    """

    def __init__(self, pattern_class, pool_size=5):
        # Function-scope import keeps this snippet self-contained:
        # the original relied on `Queue` being imported elsewhere.
        from queue import Queue

        self.pool = Queue()
        for _ in range(pool_size):
            self.pool.put(pattern_class())

    def run(self, task):
        # Borrow an instance; blocks until one is free.
        pattern = self.pool.get()
        try:
            return pattern.run(task)
        finally:
            # Always return the instance, even if run() raised.
            self.pool.put(pattern)

Monitoring and Observability

Comprehensive Metrics

import time
from dataclasses import dataclass
from typing import Dict, Any, Optional  # Optional was missing: NameError at class creation

@dataclass
class PatternMetrics:
    """Telemetry record for a single pattern execution."""
    pattern_name: str        # class name of the executed pattern
    latency_ms: float        # wall-clock duration of the run
    token_count: int         # tokens consumed (0 if unknown)
    cost_usd: float          # estimated dollar cost of the run
    quality_score: float     # evaluator score (0 when the run failed)
    iterations_used: int     # pattern-internal iterations (1 if N/A)
    cache_hit: bool          # True when served from cache
    error: Optional[str] = None  # stringified exception, None on success

class MonitoredPattern:
    """Wrapper that records a PatternMetrics entry for every run.

    Metrics are emitted in a finally block, so a record is produced
    whether the wrapped pattern succeeds or raises; on failure the
    original exception is re-raised after recording.

    NOTE(review): calculate_cost() and evaluate_quality() are not
    defined here — presumably provided by a subclass or mixin; confirm.
    """

    def __init__(self, pattern, metrics_client):
        self.pattern = pattern
        self.metrics = metrics_client

    def run(self, task):
        started = time.time()
        failure = None
        result = None

        try:
            result = self.pattern.run(task)
        except Exception as exc:
            failure = str(exc)
            raise
        finally:
            # Always record, success or failure. getattr defaults cover
            # result objects (or None) that lack the optional attributes.
            self.metrics.record(PatternMetrics(
                pattern_name=type(self.pattern).__name__,
                latency_ms=(time.time() - started) * 1000,
                token_count=getattr(result, 'token_count', 0),
                cost_usd=self.calculate_cost(result),
                quality_score=self.evaluate_quality(result) if result else 0,
                iterations_used=getattr(result, 'iterations_used', 1),
                cache_hit=getattr(result, 'from_cache', False),
                error=failure,
            ))

        return result

Distributed Tracing

from opentelemetry import trace

# Module-scoped tracer named after this module, per OpenTelemetry convention.
tracer = trace.get_tracer(__name__)

class TracedPattern:
    """Wraps pattern execution in OpenTelemetry spans.

    Emits an outer "pattern_execution" span annotated with pattern/task
    attributes and a nested "pattern.generation" span around the actual
    model call, then records quality and token usage on the outer span.
    """

    def run(self, task):
        with tracer.start_as_current_span("pattern_execution") as span:
            # Identify which pattern ran and how big the input was.
            span.set_attribute("pattern.name", self.pattern_name)
            span.set_attribute("task.length", len(task))

            # Child span isolates the generation step in the trace tree.
            with tracer.start_as_current_span("pattern.generation"):
                result = self.pattern.run(task)

            # Outcome attributes for dashboards / sampling decisions.
            span.set_attribute("result.quality", self.evaluate(result))
            span.set_attribute("result.tokens", result.token_count)

            return result

Alerting

class AlertingPattern:
    """Emits operational alerts when per-run metrics cross thresholds."""

    def __init__(self, pattern, alert_client):
        self.pattern = pattern
        self.alerts = alert_client
        # Static limits; tune per deployment.
        self.thresholds = {
            "latency_p95_ms": 5000,
            "error_rate": 0.05,
            "cost_per_hour": 100,
        }

    def check_alerts(self, metrics):
        """Inspect one PatternMetrics record and fire matching alerts."""
        latency_limit = self.thresholds["latency_p95_ms"]
        if metrics.latency_ms > latency_limit:
            self.alerts.send(
                severity="warning",
                message=f"High latency: {metrics.latency_ms}ms",
            )

        # Convert the hourly budget to a per-second allowance before comparing.
        per_second_budget = self.thresholds["cost_per_hour"] / 3600
        if metrics.cost_usd > per_second_budget:
            self.alerts.send(
                severity="critical",
                message=f"High cost: ${metrics.cost_usd}",
            )

Cost Management

Budget Enforcement

class BudgetEnforcer:
    """Tracks spend against an hourly budget with a rolling window reset."""

    def __init__(self, hourly_budget_usd):
        self.hourly_budget = hourly_budget_usd
        self.current_hour_spend = 0
        self.hour_start = time.time()

    def check_budget(self, estimated_cost):
        """Raise BudgetExceededError if the spend would blow the budget.

        Returns True when the estimated cost fits in the remaining budget.
        """
        # Roll the window: a fresh hour starts with zero spend.
        if time.time() - self.hour_start > 3600:
            self.current_hour_spend = 0
            self.hour_start = time.time()

        projected = self.current_hour_spend + estimated_cost
        if projected > self.hourly_budget:
            raise BudgetExceededError(
                f"Would exceed hourly budget: ${self.hourly_budget}"
            )

        return True

    def record_spend(self, actual_cost):
        """Accumulate the real cost after an execution completes."""
        self.current_hour_spend += actual_cost

Cost Optimization

class CostOptimizedPattern:
    """Routes execution through a pre-flight cost estimate."""

    def run(self, task, max_cost_usd=None):
        """Run the task, degrading to a cheaper pattern over the cost cap."""
        estimated_cost = self.estimate_cost(task)

        # Over the caller's cap: degrade to the cheap fallback pattern.
        if max_cost_usd and estimated_cost > max_cost_usd:
            logger.info(f"Using cheaper pattern (est: ${estimated_cost})")
            return self.cheap_fallback.run(task)

        # Tiered model choice by estimated spend.
        # NOTE(review): the original's medium (< $0.10) and expensive tiers
        # both selected "gpt-4", so this collapses to a two-way choice;
        # confirm whether the top tier was meant to use a different model.
        model = "gpt-3.5-turbo" if estimated_cost < 0.01 else "gpt-4"

        return self.pattern.run(task, model=model)

Error Handling

Retry Logic

from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)

class RetryablePattern:
    """Retries transient failures with exponential backoff.

    Fix: without a retry= predicate, tenacity retries on ANY exception,
    so PermanentError would have been retried despite the comments below.
    retry_if_not_exception_type excludes it explicitly; everything else
    (rate limits, transient errors) still retries up to 3 attempts.
    """

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_not_exception_type(PermanentError),
        reraise=True,
    )
    def run(self, task):
        try:
            return self.pattern.run(task)
        except RateLimitError:
            logger.warning("Rate limit hit, retrying...")
            raise  # Will trigger retry
        except TransientError:
            logger.warning("Transient error, retrying...")
            raise
        except PermanentError:
            logger.error("Permanent error, not retrying")
            raise  # Excluded from the retry predicate above

Circuit Breaker

class CircuitBreaker:
    """Three-state circuit breaker: closed -> open -> half-open -> closed.

    After failure_threshold consecutive-enough failures the circuit opens
    and calls fail fast with CircuitOpenError; once `timeout` seconds pass,
    a single probe call is allowed (half-open) and a success closes the
    circuit again.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed | open | half-open

    def call(self, func, *args, **kwargs):
        """Invoke func through the breaker, updating state on the outcome."""
        if self.state == "open":
            cooled_down = time.time() - self.last_failure_time > self.timeout
            if not cooled_down:
                raise CircuitOpenError("Circuit breaker is open")
            # Cool-down elapsed: let one probe call through.
            self.state = "half-open"

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            if self.failures >= self.failure_threshold:
                self.state = "open"
                logger.error("Circuit breaker opened")
            raise

        # Probe succeeded: close the circuit and reset the failure count.
        if self.state == "half-open":
            self.state = "closed"
            self.failures = 0
        return result

Scalability

Async Execution

import asyncio

class AsyncPattern:
    """Async wrapper around a pattern exposing single and batched runs."""

    async def run_async(self, task):
        """Await one task through the wrapped pattern."""
        return await self.pattern.run_async(task)

    async def batch_run_async(self, tasks):
        """Fan all tasks out concurrently.

        return_exceptions=True means a failing task yields its exception
        object in the result list instead of cancelling the whole batch.
        """
        coros = [self.run_async(t) for t in tasks]
        return await asyncio.gather(*coros, return_exceptions=True)

Queue-Based Processing

from queue import Queue
from threading import Thread

class QueuedPatternProcessor:
    """Worker-thread queue that executes tasks through a pattern.

    Fix: the original get_result() read tuples off result_queue and
    silently DISCARDED any whose task_id didn't match — those tasks'
    results were lost forever and later get_result calls would hang.
    Mismatched results are now parked in self._pending_results and
    handed out when their task_id is requested.
    """

    def __init__(self, pattern, num_workers=5):
        self.pattern = pattern
        self.task_queue = Queue()
        self.result_queue = Queue()
        # task_id -> (result, error) parked by get_result for other callers.
        self._pending_results = {}
        self.workers = []

        # Start daemon workers so they die with the process.
        for _ in range(num_workers):
            worker = Thread(target=self._worker)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker(self):
        # Endless loop: pull (task_id, task), run it, push (task_id, result, error).
        while True:
            task_id, task = self.task_queue.get()
            try:
                result = self.pattern.run(task)
                self.result_queue.put((task_id, result, None))
            except Exception as e:
                self.result_queue.put((task_id, None, e))
            finally:
                self.task_queue.task_done()

    def submit(self, task):
        """Enqueue a task; returns its id for later get_result()."""
        task_id = generate_id()  # project helper — defined elsewhere
        self.task_queue.put((task_id, task))
        return task_id

    def get_result(self, task_id, timeout=None):
        """Block until task_id's result arrives; re-raise its error if any.

        NOTE: `timeout` bounds each individual queue read, not the total
        wait (same behavior as the original).
        """
        while True:
            if task_id in self._pending_results:
                result, error = self._pending_results.pop(task_id)
            else:
                other_id, result, error = self.result_queue.get(timeout=timeout)
                if other_id != task_id:
                    # Not ours: park it instead of dropping it.
                    self._pending_results[other_id] = (result, error)
                    continue
            if error:
                raise error
            return result

Testing

Integration Tests

import pytest

class TestProductionPatterns:
    """Integration tests pinning production SLAs (latency, quality, cost)
    and verifying the failover path."""

    def test_pattern_latency(self):
        """Ensure patterns meet latency SLAs"""
        pattern = ProductionPattern()
        start = time.time()
        pattern.run("test task")
        latency = time.time() - start
        assert latency < 5.0, f"Latency {latency}s exceeds 5s SLA"

    def test_pattern_quality(self):
        """Ensure patterns meet quality standards"""
        pattern = ProductionPattern()
        quality = evaluate_quality(pattern.run("test task"))
        assert quality > 0.8, f"Quality {quality} below 0.8 threshold"

    def test_pattern_cost(self):
        """Ensure patterns stay within cost budget"""
        pattern = ProductionPattern()
        cost = calculate_cost(pattern.run("test task"))
        assert cost < 0.50, f"Cost ${cost} exceeds $0.50 budget"

    def test_pattern_failover(self):
        """Ensure failover works correctly"""
        pattern = ResilientPattern()
        # Force the primary to fail so the fallback path is exercised.
        with mock.patch.object(pattern.primary, 'run', side_effect=Exception):
            result = pattern.run("test task")
            assert result.used_fallback == True

Load Tests

import locust

class PatternLoadTest(locust.HttpUser):
    """Locust user that repeatedly posts a task to the pattern endpoint."""

    @locust.task
    def run_pattern(self):
        payload = {
            "task": "Sample task for load testing"
        }
        self.client.post("/pattern/run", json=payload)

# Run: locust -f load_test.py --users 100 --spawn-rate 10

Deployment Checklist

  • Implement comprehensive monitoring and alerts
  • Set up distributed tracing
  • Configure budget limits and cost alerts
  • Implement retry logic and circuit breakers
  • Add caching for repeated queries
  • Set up failover and fallback patterns
  • Load test under expected traffic
  • Document runbooks for common issues
  • Set up gradual rollout/canary deployment
  • Configure rate limiting
  • Implement request queuing for bursts
  • Set up log aggregation
  • Create dashboards for key metrics
  • Test disaster recovery procedures
  • Document escalation procedures

Common Pitfalls

1. Over-Engineering

# Bad: Too complex for simple tasks — a multi-pattern pipeline wastes
# latency and tokens on a question a single call answers.
result = reflexion_sc_duo_judge.run("What is 2+2?")

# Good: Match complexity to task — route trivial questions to the plain agent.
result = simple_agent.run("What is 2+2?")

2. No Timeout Handling

# Bad: Can hang indefinitely — a stalled model call blocks the caller forever.
result = pattern.run(task)

# Good: Always set timeouts so a stuck run fails fast and can be retried.
result = pattern.run(task, timeout=30)

3. Ignoring Costs

# Bad: Unlimited spending — every task hits the expensive pattern unchecked.
for task in tasks:
    result = expensive_pattern.run(task)

# Good: Monitor and limit costs — check the budget per task and degrade
# to the cheap fallback once it is exhausted.
for task in tasks:
    if budget_checker.can_afford(task):
        result = expensive_pattern.run(task)
    else:
        result = cheap_fallback.run(task)
Edit this page on GitHub
AzrienLabs logo

AzrienLabs

Crafted by Team AzrienLabs