Guidelines for deploying advanced agent patterns in production environments. Covers monitoring, optimization, error handling, and scalability.
Overview
Moving advanced patterns from development to production requires careful attention to performance, reliability, cost, and monitoring. This guide provides battle-tested practices for production deployments.
Architecture Considerations
Pattern Selection Strategy
class ProductionPatternSystem:
    """Routes each task to the cheapest pattern that can handle it."""

    def __init__(self):
        # Fast path for simple tasks
        self.simple_agent = Agent(...)
        # Moderate complexity
        self.self_consistency = SelfConsistencyAgent(num_generations=3)
        # High complexity
        self.reflexion = ReflexionAgent(max_iterations=3)
        # Critical tasks
        self.full_pipeline = CombinedPattern(...)

    def route(self, task, metadata):
        """Pick an agent based on complexity/priority/budget metadata."""
        task_complexity = metadata.get("complexity", "medium")
        task_priority = metadata.get("priority", "normal")
        task_budget = metadata.get("budget", "standard")

        # Urgent or low-budget requests always take the fast path.
        if task_priority == "urgent" or task_budget == "low":
            return self.simple_agent
        if task_complexity == "high":
            # Spend on the full pipeline only when the budget allows it.
            return self.full_pipeline if task_budget == "high" else self.reflexion
        return self.self_consistency
Failover and Fallbacks
class ResilientPatternExecution:
    """Runs the preferred pattern, degrading gracefully on failure."""

    def run(self, task):
        """Try the optimal pattern; fall back on timeout, resource, or any error."""
        try:
            # Happy path: the best (and most expensive) pattern.
            return self.optimal_pattern.run(task)
        except TimeoutError:
            # Too slow -- trade quality for latency.
            logger.warning("Pattern timeout, falling back")
            return self.fast_fallback.run(task)
        except ResourceError:
            # Out of capacity -- use the lightest option.
            logger.warning("Resource limit, using simple pattern")
            return self.simple_agent.run(task)
        except Exception as e:
            # Unknown failure: log it and return a canned safe answer.
            logger.error(f"Pattern failed: {e}")
            return self.safe_default_response()
Performance Optimization
Caching Strategy
from functools import lru_cache
import hashlib
import json
import logging

logger = logging.getLogger(__name__)


class CachedPatternExecution:
    """Memoizes pattern results keyed by (task, pattern config).

    NOTE(review): the cache is unbounded and in-process only; bound it
    (e.g. an LRU) or move it to a shared store before high-volume use.
    """

    def __init__(self):
        # Maps sha256 hex digest -> previously computed pattern result.
        self.cache = {}

    def cache_key(self, task, pattern_config):
        """Return a deterministic cache key for a task + config pair."""
        # sort_keys makes semantically-equal configs hash identically.
        content = f"{task}:{json.dumps(pattern_config, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()

    def run(self, task, pattern, config):
        """Run `pattern` on `task`, reusing a cached result when available."""
        key = self.cache_key(task, config)
        if key in self.cache:
            logger.info("Cache hit")
            return self.cache[key]
        result = pattern.run(task, **config)
        self.cache[key] = result
        return result
Parallel Execution
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeout


class ParallelPatternExecution:
    """Fans a batch of tasks out across a thread pool."""

    def batch_run(self, tasks, pattern, timeout=30):
        """Run `pattern.run` over `tasks` concurrently.

        Returns {task: result}; a task that raised (or missed the batch
        deadline) maps to None. NOTE: tasks are used as dict keys, so
        they must be hashable and distinct.
        """
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Submit all tasks up front.
            futures = {
                executor.submit(pattern.run, task): task
                for task in tasks
            }
            results = {}
            try:
                # The original passed timeout=30 to future.result() AFTER
                # as_completed yielded it -- a dead timeout, since the future
                # was already done. Apply the deadline to the whole batch.
                for future in as_completed(futures, timeout=timeout):
                    task = futures[future]
                    try:
                        results[task] = future.result()
                    except Exception as e:
                        logger.error(f"Task {task} failed: {e}")
                        results[task] = None
                return results
            except FuturesTimeout:
                # Deadline hit: mark every task that never finished as failed.
                for future, task in futures.items():
                    results.setdefault(task, None)
                return results
Resource Pooling
class PatternPool:
    """Fixed-size pool of reusable pattern instances."""

    def __init__(self, pattern_class, pool_size=5):
        # Pre-build the instances; Queue gives us thread-safe checkout.
        self.pool = Queue()
        for _ in range(pool_size):
            self.pool.put(pattern_class())

    def run(self, task):
        """Check a pattern out, run the task, and always check it back in."""
        pattern = self.pool.get()
        try:
            return pattern.run(task)
        finally:
            # Return the instance even if run() raised.
            self.pool.put(pattern)
Monitoring and Observability
Comprehensive Metrics
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class PatternMetrics:
    """One pattern execution's worth of observability data.

    `Optional` was referenced but never imported in the original,
    which raises NameError at class-creation time.
    """

    pattern_name: str
    latency_ms: float
    token_count: int
    cost_usd: float
    quality_score: float
    iterations_used: int
    cache_hit: bool
    error: Optional[str] = None  # None on success, str(exc) on failure
class MonitoredPattern:
    """Wrapper that records a PatternMetrics sample for every run."""

    def __init__(self, pattern, metrics_client):
        self.pattern = pattern
        self.metrics = metrics_client

    def run(self, task):
        """Execute the wrapped pattern; metrics are emitted whether the
        run succeeds or raises (the exception is re-raised to the caller)."""
        started = time.time()
        failure_msg = None
        result = None
        try:
            result = self.pattern.run(task)
            return result
        except Exception as exc:
            failure_msg = str(exc)
            raise
        finally:
            # Runs on both the success and failure paths.
            self.metrics.record(PatternMetrics(
                pattern_name=self.pattern.__class__.__name__,
                latency_ms=(time.time() - started) * 1000,
                token_count=getattr(result, 'token_count', 0),
                cost_usd=self.calculate_cost(result),
                quality_score=self.evaluate_quality(result) if result else 0,
                iterations_used=getattr(result, 'iterations_used', 1),
                cache_hit=getattr(result, 'from_cache', False),
                error=failure_msg,
            ))
Distributed Tracing
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


class TracedPattern:
    """Adds OpenTelemetry spans around pattern execution."""

    def run(self, task):
        """Run the pattern inside a traced span hierarchy."""
        with tracer.start_as_current_span("pattern_execution") as span:
            # Record what we know before execution.
            for attr, value in (
                ("pattern.name", self.pattern_name),
                ("task.length", len(task)),
            ):
                span.set_attribute(attr, value)
            # Child span isolates generation time from wrapper overhead.
            with tracer.start_as_current_span("pattern.generation"):
                result = self.pattern.run(task)
            # Record what came out.
            span.set_attribute("result.quality", self.evaluate(result))
            span.set_attribute("result.tokens", result.token_count)
            return result
Alerting
class AlertingPattern:
    """Fires alerts when a run's metrics cross configured thresholds."""

    def __init__(self, pattern, alert_client):
        self.pattern = pattern
        self.alerts = alert_client
        # Trip wires; tune these per deployment.
        self.thresholds = {
            "latency_p95_ms": 5000,
            "error_rate": 0.05,
            "cost_per_hour": 100,
        }

    def check_alerts(self, metrics):
        """Compare one run's metrics against the thresholds and alert."""
        latency_limit = self.thresholds["latency_p95_ms"]
        if metrics.latency_ms > latency_limit:
            self.alerts.send(
                severity="warning",
                message=f"High latency: {metrics.latency_ms}ms",
            )
        # Hourly budget prorated to a per-second (per-run) figure.
        cost_limit = self.thresholds["cost_per_hour"] / 3600
        if metrics.cost_usd > cost_limit:
            self.alerts.send(
                severity="critical",
                message=f"High cost: ${metrics.cost_usd}",
            )
Cost Management
Budget Enforcement
class BudgetEnforcer:
    """Tracks spend against a rolling one-hour budget window."""

    def __init__(self, hourly_budget_usd):
        self.hourly_budget = hourly_budget_usd
        self.current_hour_spend = 0
        self.hour_start = time.time()

    def _roll_window(self):
        # Lazily reset the counters once the hour has elapsed.
        if time.time() - self.hour_start > 3600:
            self.current_hour_spend = 0
            self.hour_start = time.time()

    def check_budget(self, estimated_cost):
        """Return True if the spend fits; raise BudgetExceededError if not."""
        self._roll_window()
        if self.current_hour_spend + estimated_cost > self.hourly_budget:
            raise BudgetExceededError(
                f"Would exceed hourly budget: ${self.hourly_budget}"
            )
        return True

    def record_spend(self, actual_cost):
        """Record spend that actually happened (call after execution)."""
        self.current_hour_spend += actual_cost
Cost Optimization
class CostOptimizedPattern:
    """Routes a task to a model tier based on its estimated cost."""

    def run(self, task, max_cost_usd=None):
        """Run the task, optionally capped at `max_cost_usd`.

        Falls back to the cheap pattern when the estimate exceeds the cap;
        otherwise picks a model tier by estimated cost.
        """
        estimated_cost = self.estimate_cost(task)
        if max_cost_usd and estimated_cost > max_cost_usd:
            # Over the caller's cap: degrade to the cheaper pattern.
            logger.info(f"Using cheaper pattern (est: ${estimated_cost})")
            return self.cheap_fallback.run(task)
        # The original elif/else branches BOTH selected "gpt-4", so the
        # three-way split was dead code; collapse to the two real tiers
        # (behavior unchanged).
        model = "gpt-3.5-turbo" if estimated_cost < 0.01 else "gpt-4"
        return self.pattern.run(task, model=model)
Error Handling
Retry Logic
from tenacity import retry, stop_after_attempt, wait_exponential


class RetryablePattern:
    """Wraps pattern execution in tenacity-driven retries.

    NOTE(review): with no `retry=` predicate, tenacity retries on EVERY
    exception type -- including PermanentError -- so the "Won't retry"
    comment below is not what this code does. Consider
    `retry=retry_if_not_exception_type(PermanentError)` in the decorator;
    confirm where PermanentError is defined before moving it to
    decoration time.
    """

    # Up to 3 attempts, exponential backoff between 2s and 10s;
    # reraise=True surfaces the final exception instead of RetryError.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True,
    )
    def run(self, task):
        """Run the wrapped pattern; re-raises so tenacity can retry."""
        try:
            return self.pattern.run(task)
        except RateLimitError:
            logger.warning("Rate limit hit, retrying...")
            raise  # Will trigger retry
        except TransientError:
            logger.warning("Transient error, retrying...")
            raise
        except PermanentError:
            logger.error("Permanent error, not retrying")
            raise  # Won't retry
Circuit Breaker
class CircuitBreaker:
    """Classic three-state circuit breaker around a callable.

    States: "closed" (normal), "open" (rejecting calls), and
    "half-open" (probing again after the cool-down timeout).
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def _gate(self):
        # Reject while open, unless the cool-down has elapsed.
        if self.state != "open":
            return
        if time.time() - self.last_failure_time > self.timeout:
            self.state = "half-open"
        else:
            raise CircuitOpenError("Circuit breaker is open")

    def _record_failure(self):
        # Count the failure; trip the breaker at the threshold.
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
            logger.error("Circuit breaker opened")

    def call(self, func, *args, **kwargs):
        """Invoke func through the breaker, tracking failures."""
        self._gate()
        try:
            outcome = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        if self.state == "half-open":
            # The probe succeeded: close the circuit again.
            self.state = "closed"
            self.failures = 0
        return outcome
Scalability
Async Execution
import asyncio


class AsyncPattern:
    """Async facade over a pattern exposing run_async()."""

    async def run_async(self, task):
        """Await a single pattern execution."""
        return await self.pattern.run_async(task)

    async def batch_run_async(self, tasks):
        """Execute all tasks concurrently; failures come back as exception
        objects in the result list (return_exceptions=True)."""
        coros = [self.run_async(t) for t in tasks]
        return await asyncio.gather(*coros, return_exceptions=True)
Queue-Based Processing
from queue import Queue
from threading import Thread
class QueuedPatternProcessor:
def __init__(self, pattern, num_workers=5):
self.pattern = pattern
self.task_queue = Queue()
self.result_queue = Queue()
self.workers = []
# Start workers
for _ in range(num_workers):
worker = Thread(target=self._worker)
worker.daemon = True
worker.start()
self.workers.append(worker)
def _worker(self):
while True:
task_id, task = self.task_queue.get()
try:
result = self.pattern.run(task)
self.result_queue.put((task_id, result, None))
except Exception as e:
self.result_queue.put((task_id, None, e))
finally:
self.task_queue.task_done()
def submit(self, task):
task_id = generate_id()
self.task_queue.put((task_id, task))
return task_id
def get_result(self, task_id, timeout=None):
# Poll result queue for specific task_id
while True:
result_id, result, error = self.result_queue.get(timeout=timeout)
if result_id == task_id:
if error:
raise error
return result
Testing
Integration Tests
import pytest
from unittest import mock  # was used below but never imported (NameError)


class TestProductionPatterns:
    """Integration tests pinning the latency/quality/cost SLAs."""

    def test_pattern_latency(self):
        """Ensure patterns meet latency SLAs"""
        pattern = ProductionPattern()
        start = time.time()
        result = pattern.run("test task")
        latency = time.time() - start
        assert latency < 5.0, f"Latency {latency}s exceeds 5s SLA"

    def test_pattern_quality(self):
        """Ensure patterns meet quality standards"""
        pattern = ProductionPattern()
        result = pattern.run("test task")
        quality = evaluate_quality(result)
        assert quality > 0.8, f"Quality {quality} below 0.8 threshold"

    def test_pattern_cost(self):
        """Ensure patterns stay within cost budget"""
        pattern = ProductionPattern()
        result = pattern.run("test task")
        cost = calculate_cost(result)
        assert cost < 0.50, f"Cost ${cost} exceeds $0.50 budget"

    def test_pattern_failover(self):
        """Ensure failover works correctly"""
        pattern = ResilientPattern()
        # Simulate a hard failure in the primary pattern.
        with mock.patch.object(pattern.primary, 'run', side_effect=Exception):
            result = pattern.run("test task")
            # `is True` (not `== True`) is the idiomatic identity check.
            assert result.used_fallback is True
Load Tests
import locust


class PatternLoadTest(locust.HttpUser):
    """Locust user that hammers the pattern-execution endpoint."""

    @locust.task
    def run_pattern(self):
        payload = {"task": "Sample task for load testing"}
        self.client.post("/pattern/run", json=payload)

# Run: locust -f load_test.py --users 100 --spawn-rate 10
Deployment Checklist
- Implement comprehensive monitoring and alerts
- Set up distributed tracing
- Configure budget limits and cost alerts
- Implement retry logic and circuit breakers
- Add caching for repeated queries
- Set up failover and fallback patterns
- Load test under expected traffic
- Document runbooks for common issues
- Set up gradual rollout/canary deployment
- Configure rate limiting
- Implement request queuing for bursts
- Set up log aggregation
- Create dashboards for key metrics
- Test disaster recovery procedures
- Document escalation procedures
Common Pitfalls
1. Over-Engineering
# Bad: Too complex for simple tasks
result = reflexion_sc_duo_judge.run("What is 2+2?")
# Good: Match complexity to task
result = simple_agent.run("What is 2+2?")
2. No Timeout Handling
# Bad: Can hang indefinitely
result = pattern.run(task)
# Good: Always set timeouts
result = pattern.run(task, timeout=30)
3. Ignoring Costs
# Bad: Unlimited spending
for task in tasks:
result = expensive_pattern.run(task)
# Good: Monitor and limit costs
for task in tasks:
if budget_checker.can_afford(task):
result = expensive_pattern.run(task)
else:
result = cheap_fallback.run(task)