A comprehensive guide to optimizing Az Core framework applications for performance, cost efficiency, and scalability.
Overview
This guide covers optimization strategies for LLM calls, caching, concurrency, memory management, and overall system performance.
Performance Profiling
CPU Profiling
# profiling/cpu_profiler.py
import cProfile
import pstats
import io
from functools import wraps
from typing import Callable
def profile_cpu(sort_by: str = 'cumulative', top_n: int = 20):
"""Decorator to profile CPU usage."""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# Print statistics
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats(sort_by)
ps.print_stats(top_n)
print(s.getvalue())
return result
return wrapper
return decorator
# Usage
@profile_cpu(sort_by='cumulative', top_n=10)
def execute_workflow(task: str):
# Workflow logic
pass
Memory Profiling
# profiling/memory_profiler.py
from memory_profiler import profile
import tracemalloc
from typing import Callable
from functools import wraps
def profile_memory(func: Callable) -> Callable:
"""Decorator to profile memory usage."""
@wraps(func)
def wrapper(*args, **kwargs):
tracemalloc.start()
result = func(*args, **kwargs)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Memory usage:")
print(f" Current: {current / 1024 / 1024:.2f} MB")
print(f" Peak: {peak / 1024 / 1024:.2f} MB")
return result
return wrapper
# Line-by-line memory profiling
@profile
def process_large_dataset(data):
"""Process large dataset with memory profiling."""
results = []
for item in data:
processed = expensive_operation(item)
results.append(processed)
return results
Request Profiling
# profiling/request_profiler.py
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
@dataclass
class ProfilePoint:
"""A profiling checkpoint."""
name: str
timestamp: float
duration_from_start: float
memory_mb: Optional[float] = None
@dataclass
class RequestProfile:
"""Profile data for a single request."""
request_id: str
start_time: float = field(default_factory=time.time)
checkpoints: List[ProfilePoint] = field(default_factory=list)
metadata: Dict = field(default_factory=dict)
def checkpoint(self, name: str):
"""Add profiling checkpoint."""
now = time.time()
duration = now - self.start_time
# Get memory usage
try:
import psutil
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
        except Exception:
memory_mb = None
point = ProfilePoint(
name=name,
timestamp=now,
duration_from_start=duration,
memory_mb=memory_mb
)
self.checkpoints.append(point)
def get_duration(self) -> float:
"""Get total duration."""
return time.time() - self.start_time
def get_report(self) -> str:
"""Generate profiling report."""
lines = [
f"Request Profile: {self.request_id}",
f"Total Duration: {self.get_duration():.3f}s",
"\nCheckpoints:"
]
        for i, point in enumerate(self.checkpoints):
            mem_str = f" [{point.memory_mb:.1f} MB]" if point.memory_mb else ""
            if i > 0:
                prev_point = self.checkpoints[i - 1]
                delta = point.duration_from_start - prev_point.duration_from_start
                lines.append(
                    f"  {point.name}: {point.duration_from_start:.3f}s "
                    f"(+{delta:.3f}s){mem_str}"
                )
            else:
                lines.append(
                    f"  {point.name}: {point.duration_from_start:.3f}s{mem_str}"
                )
return "\n".join(lines)
# Usage
profile = RequestProfile(request_id="req_123")
profile.checkpoint("start")
# ... do work ...
profile.checkpoint("llm_call_complete")
# ... more work ...
profile.checkpoint("agent_complete")
print(profile.get_report())
LLM Optimization
Model Selection
# optimization/model_selector.py
from typing import Dict, Any
from dataclasses import dataclass
@dataclass
class ModelConfig:
"""Configuration for LLM model."""
name: str
cost_per_1m_input: float # USD per 1M tokens
cost_per_1m_output: float
max_tokens: int
speed: str # "fast", "medium", "slow"
quality: str # "high", "medium", "low"
# Model catalog
MODELS = {
"gpt-4o": ModelConfig(
name="gpt-4o",
cost_per_1m_input=2.50,
cost_per_1m_output=10.00,
max_tokens=128000,
speed="fast",
quality="high"
),
"gpt-4o-mini": ModelConfig(
name="gpt-4o-mini",
cost_per_1m_input=0.15,
cost_per_1m_output=0.60,
max_tokens=128000,
speed="fast",
quality="medium"
),
"claude-3-5-sonnet": ModelConfig(
name="claude-3-5-sonnet-20241022",
cost_per_1m_input=3.00,
cost_per_1m_output=15.00,
max_tokens=200000,
speed="medium",
quality="high"
),
"claude-3-haiku": ModelConfig(
name="claude-3-haiku-20240307",
cost_per_1m_input=0.25,
cost_per_1m_output=1.25,
max_tokens=200000,
speed="fast",
quality="medium"
),
}
def select_model(
task_complexity: str, # "simple", "medium", "complex"
latency_requirement: str, # "fast", "medium"
budget_constraint: str # "low", "medium", "high"
) -> ModelConfig:
"""Select optimal model based on requirements."""
if task_complexity == "simple":
# Use cheapest fast models for simple tasks
if budget_constraint == "low":
return MODELS["gpt-4o-mini"]
else:
return MODELS["claude-3-haiku"]
elif task_complexity == "medium":
# Balance of speed, quality, and cost
if latency_requirement == "fast":
return MODELS["gpt-4o-mini"]
else:
return MODELS["gpt-4o"]
else: # complex
# Use highest quality models
if budget_constraint == "low":
return MODELS["gpt-4o"]
else:
return MODELS["claude-3-5-sonnet"]
# Usage
model = select_model(
task_complexity="simple",
latency_requirement="fast",
budget_constraint="low"
)
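The per-token prices in ModelConfig also make it easy to estimate what a call will cost before it is sent. A minimal sketch (estimate_cost is an illustrative helper, not part of the framework):
# Estimate the USD cost of a single call from the catalog prices
def estimate_cost(model: ModelConfig, input_tokens: int, output_tokens: int) -> float:
    return (
        input_tokens / 1_000_000 * model.cost_per_1m_input
        + output_tokens / 1_000_000 * model.cost_per_1m_output
    )
# Example: ~2,000 input and ~500 output tokens on gpt-4o-mini
print(f"${estimate_cost(MODELS['gpt-4o-mini'], 2_000, 500):.6f}")  # ~= $0.000600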
Prompt Optimization
# optimization/prompt_optimizer.py
import os
from typing import List
class PromptOptimizer:
"""Optimize prompts for better performance."""
@staticmethod
def compress_prompt(prompt: str, max_tokens: int = 1000) -> str:
"""
Compress verbose prompts while preserving meaning.
Strategies:
- Remove redundant words
- Use concise phrasing
- Remove unnecessary examples
"""
# Remove extra whitespace
compressed = " ".join(prompt.split())
# Remove filler words (carefully)
fillers = ["please", "kindly", "very", "really", "just"]
for filler in fillers:
compressed = compressed.replace(f" {filler} ", " ")
# Estimate tokens (rough: 1 token ~= 4 characters)
estimated_tokens = len(compressed) // 4
if estimated_tokens > max_tokens:
# Truncate if still too long
max_chars = max_tokens * 4
compressed = compressed[:max_chars] + "..."
return compressed
@staticmethod
def add_caching_hints(prompt: str) -> str:
"""
Add hints for LLM provider caching.
Many providers cache common prefixes.
"""
# Add consistent prefix for caching
prefix = "You are a helpful AI assistant.\n\n"
if not prompt.startswith(prefix):
return prefix + prompt
return prompt
@staticmethod
def optimize_for_batch(prompts: List[str]) -> List[str]:
"""
Optimize multiple prompts for batch processing.
- Extract common prefix
- Ensure consistent formatting
"""
if not prompts:
return []
# Find common prefix
common_prefix = os.path.commonprefix(prompts)
if len(common_prefix) > 50: # Worth optimizing
# Remove common prefix from all prompts
optimized = [p[len(common_prefix):] for p in prompts]
return optimized
return prompts
# Usage
optimizer = PromptOptimizer()
# Compress verbose prompt
short_prompt = optimizer.compress_prompt(long_prompt)
# Add caching hints
cached_prompt = optimizer.add_caching_hints(prompt)
Streaming Responses
# optimization/streaming.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
async def stream_llm_response(prompt: str):
"""Stream LLM response for better perceived performance."""
llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True
)
    async for chunk in llm.astream([HumanMessage(content=prompt)]):
        # Yield each chunk as soon as it arrives so callers can
        # render or process partial output immediately
        yield chunk.content
# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/stream")
async def stream_endpoint(task: str):
"""Stream response to client."""
return StreamingResponse(
stream_llm_response(task),
media_type="text/plain"
)
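On the client side, the streamed response can also be consumed incrementally. A minimal sketch using httpx (the URL and query-parameter shape are assumptions matching the endpoint above):
import httpx
def consume_stream(task: str):
    """Print chunks as they arrive from the /stream endpoint."""
    with httpx.stream("POST", "http://localhost:8000/stream", params={"task": task}) as response:
        for chunk in response.iter_text():
            print(chunk, end="", flush=True)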
Batch Processing
# optimization/batching.py
import asyncio
from typing import List
async def batch_llm_calls(prompts: List[str], batch_size: int = 10):
"""Process multiple prompts in batches."""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i + batch_size]
# Process batch in parallel
batch_results = await asyncio.gather(*[
llm.ainvoke(prompt) for prompt in batch
])
results.extend(batch_results)
return results
# Usage
prompts = ["Question 1", "Question 2", ..., "Question 100"]
results = await batch_llm_calls(prompts, batch_size=10)
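If the provider enforces strict concurrency limits, an asyncio.Semaphore can cap the number of in-flight calls instead of (or in addition to) fixed-size batches. A sketch, assuming the same ChatOpenAI setup as above:
import asyncio
from typing import List
from langchain_openai import ChatOpenAI
async def bounded_llm_calls(prompts: List[str], max_concurrency: int = 10):
    """Run all prompts concurrently, but never more than max_concurrency at once."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    semaphore = asyncio.Semaphore(max_concurrency)
    async def call(prompt: str):
        async with semaphore:
            return await llm.ainvoke(prompt)
    return await asyncio.gather(*[call(p) for p in prompts])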
Caching Strategies
Multi-Level Caching
# optimization/multi_level_cache.py
from typing import Optional, Any
from azcore.utils.caching import LRUCache, TTLCache, PersistentCache
class MultiLevelCache:
"""
Multi-level cache with L1 (memory), L2 (Redis), L3 (disk).
Faster caches checked first, slower caches as fallback.
"""
def __init__(self):
# L1: Fast in-memory LRU cache
self.l1_cache = LRUCache(max_size=100, max_memory_mb=50)
# L2: Redis cache (if available)
try:
import redis
self.l2_cache = redis.Redis(host='localhost', port=6379, db=0)
self.has_l2 = True
        except ImportError:
self.has_l2 = False
# L3: Persistent disk cache
self.l3_cache = PersistentCache(cache_dir="cache", ttl=86400)
def get(self, key: str) -> Optional[Any]:
"""Get value from cache (checks L1, L2, L3 in order)."""
# Try L1 (fastest)
value = self.l1_cache.get(key)
if value is not None:
return value
# Try L2 (medium)
if self.has_l2:
try:
value_bytes = self.l2_cache.get(key)
if value_bytes:
import pickle
value = pickle.loads(value_bytes)
# Populate L1
self.l1_cache.put(key, value)
return value
            except Exception:
                # Redis read failed; fall back to the slower cache levels
                pass
# Try L3 (slowest)
value = self.l3_cache.get(key)
if value is not None:
# Populate L1 and L2
self.l1_cache.put(key, value)
if self.has_l2:
import pickle
self.l2_cache.set(key, pickle.dumps(value), ex=3600)
return value
return None
def put(self, key: str, value: Any, ttl: int = 3600):
"""Put value in all cache levels."""
# Store in L1
self.l1_cache.put(key, value)
# Store in L2
if self.has_l2:
import pickle
self.l2_cache.set(key, pickle.dumps(value), ex=ttl)
# Store in L3
self.l3_cache.put(key, value)
# Usage
cache = MultiLevelCache()
# Get (checks L1 -> L2 -> L3)
value = cache.get("key")
# Put (stores in all levels)
cache.put("key", value)
Semantic Caching
# optimization/semantic_cache.py
from azcore.utils.caching import SemanticCache
def setup_semantic_caching(threshold: float = 0.95):
"""
Setup semantic caching for similar queries.
Example: "What is 2+2?" and "Calculate 2+2" should hit same cache.
"""
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
def embed_func(text: str):
return model.encode(text).tolist()
cache = SemanticCache(
embedding_function=embed_func,
similarity_threshold=threshold,
max_size=500
)
return cache
# Usage with CachedLLM
from azcore.utils.cached_llm import CachedLLM
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
semantic_cache = setup_semantic_caching(threshold=0.95)
cached_llm = CachedLLM(
llm=llm,
cache_type="semantic",
cache=semantic_cache
)
# These will likely hit cache even with different wording
response1 = cached_llm.invoke("What is the capital of France?")
response2 = cached_llm.invoke("Tell me France's capital city") # Cache hit!
Cache Warming
# optimization/cache_warming.py
import asyncio
from typing import List
async def warm_cache(common_queries: List[str]):
"""Pre-populate cache with common queries."""
from langchain_openai import ChatOpenAI
from azcore.utils.cached_llm import CachedLLM
llm = ChatOpenAI(model="gpt-4o-mini")
cached_llm = CachedLLM(llm)
print(f"Warming cache with {len(common_queries)} queries...")
# Process in parallel
tasks = [cached_llm.ainvoke(query) for query in common_queries]
await asyncio.gather(*tasks)
print("Cache warming complete")
# Common queries for a documentation Q&A system
common_queries = [
"How do I install Arc?",
"What is a ReactAgent?",
"How do I create a workflow?",
"What models are supported?",
# ... more common queries
]
# Warm cache on startup
asyncio.run(warm_cache(common_queries))
Concurrency & Parallelism
Async Agent Execution
# optimization/async_agents.py
import asyncio
from typing import List, Dict, Any
async def execute_agents_parallel(
agents: List,
tasks: List[str]
) -> List[Dict[str, Any]]:
"""Execute multiple agents in parallel."""
async def execute_single(agent, task):
"""Execute single agent."""
try:
result = await agent.ainvoke({
"messages": [{"role": "user", "content": task}]
})
return {"status": "success", "result": result, "agent": agent.name}
except Exception as e:
return {"status": "error", "error": str(e), "agent": agent.name}
# Create tasks
tasks_list = [
execute_single(agent, task)
for agent, task in zip(agents, tasks)
]
# Execute in parallel
results = await asyncio.gather(*tasks_list)
return results
# Usage
agents = [researcher, analyzer, writer]
tasks = ["Research AI", "Analyze data", "Write report"]
results = await execute_agents_parallel(agents, tasks)
Connection Pooling
# optimization/connection_pool.py
from langchain_openai import ChatOpenAI
import httpx
# Create HTTP client with connection pooling
http_client = httpx.Client(
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30.0
),
timeout=60.0
)
# Use with LLM
llm = ChatOpenAI(
model="gpt-4o-mini",
http_client=http_client
)
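For async workloads, the same idea applies with httpx.AsyncClient; recent langchain_openai releases accept it via http_async_client (verify the parameter against your installed version):
import httpx
from langchain_openai import ChatOpenAI
async_http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    ),
    timeout=60.0
)
async_llm = ChatOpenAI(
    model="gpt-4o-mini",
    http_async_client=async_http_client  # reused across ainvoke/astream calls
)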
Rate Limiting Optimization
# optimization/rate_limiter.py
import time
import asyncio
from collections import deque
from typing import Callable, Any
class AdaptiveRateLimiter:
"""
Adaptive rate limiter that adjusts based on API responses.
Starts conservative, increases if successful, decreases on errors.
"""
def __init__(
self,
initial_rate: int = 10,
min_rate: int = 1,
max_rate: int = 100,
window_seconds: int = 60
):
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
self.window_seconds = window_seconds
self.requests: deque = deque()
self.success_count = 0
self.error_count = 0
async def acquire(self):
"""Wait until request can proceed."""
now = time.time()
# Remove old requests outside window
while self.requests and self.requests[0] < now - self.window_seconds:
self.requests.popleft()
# Wait if at limit
while len(self.requests) >= self.current_rate:
await asyncio.sleep(0.1)
now = time.time()
while self.requests and self.requests[0] < now - self.window_seconds:
self.requests.popleft()
# Add current request
self.requests.append(now)
def on_success(self):
"""Record successful request."""
self.success_count += 1
# Increase rate if consistently successful
if self.success_count > 20 and self.error_count == 0:
self.current_rate = min(self.current_rate + 5, self.max_rate)
self.success_count = 0
def on_error(self):
"""Record failed request."""
self.error_count += 1
# Decrease rate on errors
if self.error_count > 3:
self.current_rate = max(self.current_rate - 5, self.min_rate)
self.error_count = 0
self.success_count = 0
# Usage
rate_limiter = AdaptiveRateLimiter(initial_rate=10, max_rate=50)
async def rate_limited_call(func: Callable, *args, **kwargs) -> Any:
"""Execute function with rate limiting."""
await rate_limiter.acquire()
try:
result = await func(*args, **kwargs)
rate_limiter.on_success()
return result
except Exception as e:
rate_limiter.on_error()
raise
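For example, wrapping an LLM call (assuming the ChatOpenAI setup used elsewhere in this guide and the module-level rate_limiter defined above):
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
async def ask(prompt: str):
    # All callers share the adaptive limit, so bursts are smoothed automatically
    return await rate_limited_call(llm.ainvoke, prompt)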
Memory Optimization
Memory-Efficient Data Structures
# optimization/memory_efficient.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterator
import sys
import json
@dataclass(slots=True)  # __slots__ reduces per-instance memory (requires Python 3.10+)
class Message:
"""Memory-efficient message storage."""
role: str
content: str
def __sizeof__(self) -> int:
"""Get memory size."""
return (
sys.getsizeof(self.role) +
sys.getsizeof(self.content)
)
# Generator for large datasets (lazy evaluation)
def process_large_dataset(filepath: str) -> Iterator[dict]:
"""
Process large dataset without loading all into memory.
Uses generator to process one item at a time.
"""
with open(filepath, 'r') as f:
for line in f:
# Process line
data = json.loads(line)
yield process_item(data)
# Usage
for result in process_large_dataset("large_file.jsonl"):
# Process result
save_result(result)
# Previous results are garbage collected
Conversation History Management
# optimization/history_manager.py
from typing import List, Dict
from collections import deque
class ConversationHistoryManager:
"""
Manage conversation history with sliding window.
Keeps only recent messages to reduce token usage.
"""
def __init__(
self,
max_messages: int = 10,
max_tokens: int = 4000,
system_prompt: str = ""
):
self.max_messages = max_messages
self.max_tokens = max_tokens
self.system_prompt = system_prompt
self.messages: deque = deque(maxlen=max_messages)
def add_message(self, role: str, content: str):
"""Add message to history."""
self.messages.append({"role": role, "content": content})
def get_messages(self) -> List[Dict[str, str]]:
"""Get messages for LLM (with token limit)."""
messages = []
# Always include system prompt
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
        # Add most recent messages first until the token limit is reached
        insert_pos = 1 if self.system_prompt else 0  # keep the system prompt first
        total_tokens = self._estimate_tokens(self.system_prompt)
        for msg in reversed(self.messages):
            msg_tokens = self._estimate_tokens(msg["content"])
            if total_tokens + msg_tokens > self.max_tokens:
                break
            messages.insert(insert_pos, msg)  # preserves chronological order after the system prompt
            total_tokens += msg_tokens
return messages
def _estimate_tokens(self, text: str) -> int:
"""Estimate token count (rough: 1 token ~= 4 chars)."""
return len(text) // 4
def clear(self):
"""Clear conversation history."""
self.messages.clear()
# Usage
history = ConversationHistoryManager(max_messages=20, max_tokens=4000)
history.add_message("user", "Hello!")
history.add_message("assistant", "Hi! How can I help?")
history.add_message("user", "Tell me about Arc")
# Get messages for LLM (automatically truncated)
messages = history.get_messages()
Network Optimization
Request Retry Strategy
# optimization/retry.py
import asyncio
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
class RateLimitError(Exception):
"""Rate limit exceeded."""
pass
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=60),
retry=retry_if_exception_type((RateLimitError, TimeoutError))
)
async def resilient_llm_call(llm, prompt: str):
"""LLM call with automatic retry."""
try:
return await llm.ainvoke(prompt)
except Exception as e:
if "rate_limit" in str(e).lower():
raise RateLimitError(str(e))
elif "timeout" in str(e).lower():
raise TimeoutError(str(e))
else:
raise
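Usage is the same as a plain ainvoke call; retries with exponential backoff happen transparently:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
response = await resilient_llm_call(llm, "Summarize the latest run results")
print(response.content)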
Request Compression
# optimization/compression.py
import gzip
import json
def compress_request(data: dict) -> bytes:
"""Compress request payload."""
json_str = json.dumps(data)
return gzip.compress(json_str.encode('utf-8'))
def decompress_response(data: bytes) -> dict:
"""Decompress response payload."""
json_str = gzip.decompress(data).decode('utf-8')
return json.loads(json_str)
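A sketch of the round trip, including the Content-Encoding header a gzip-aware server would expect (the endpoint URL and payload are illustrative):
import httpx
payload = {"task": "Analyze this quarter's metrics", "metadata": {"priority": "low"}}
compressed = compress_request(payload)
response = httpx.post(
    "http://localhost:8000/execute",
    content=compressed,
    headers={"Content-Encoding": "gzip", "Content-Type": "application/json"},
)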
Database Optimization
Query Optimization
# optimization/db_queries.py
from sqlalchemy import select, func
from sqlalchemy.orm import joinedload
async def get_agent_history_optimized(agent_id: str, limit: int = 100):
"""
Get agent history with optimized queries.
Uses:
- Eager loading to avoid N+1 queries
- Proper indexing
- Pagination
"""
query = (
select(AgentExecution)
.where(AgentExecution.agent_id == agent_id)
.options(joinedload(AgentExecution.results)) # Eager load
.order_by(AgentExecution.created_at.desc())
.limit(limit)
)
    result = await session.execute(query)
    # joinedload on a collection requires unique() before extracting rows
    return result.unique().scalars().all()
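The query above filters on agent_id and sorts by created_at, so a composite index covering both columns keeps it fast as the table grows. A sketch with SQLAlchemy (table and column names mirror the AgentExecution model assumed above; descending order is honored on backends that support ordered indexes):
from sqlalchemy import Index
# Composite index matching the WHERE + ORDER BY of the query above
ix_agent_history = Index(
    "ix_agent_executions_agent_id_created_at",
    AgentExecution.agent_id,
    AgentExecution.created_at.desc(),
)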
Cost Optimization
Token Usage Optimization
# optimization/token_optimizer.py
from typing import List, Dict
class TokenOptimizer:
"""Optimize token usage to reduce costs."""
@staticmethod
def truncate_messages(
messages: List[Dict[str, str]],
max_tokens: int = 4000
) -> List[Dict[str, str]]:
"""Truncate messages to fit within token limit."""
total_tokens = 0
truncated = []
for msg in reversed(messages):
msg_tokens = len(msg["content"]) // 4 # Rough estimate
if total_tokens + msg_tokens > max_tokens:
# Truncate this message
remaining_tokens = max_tokens - total_tokens
remaining_chars = remaining_tokens * 4
truncated_content = msg["content"][:remaining_chars] + "..."
truncated.insert(0, {"role": msg["role"], "content": truncated_content})
break
truncated.insert(0, msg)
total_tokens += msg_tokens
return truncated
@staticmethod
def summarize_history(messages: List[Dict[str, str]]) -> str:
"""Summarize conversation history to reduce tokens."""
# Use smaller model to summarize
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
conversation = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in messages
])
summary_prompt = f"""Summarize this conversation in 2-3 sentences:
{conversation}
Summary:"""
summary = llm.invoke(summary_prompt)
return summary.content
# Usage
optimizer = TokenOptimizer()
# Truncate long messages
truncated = optimizer.truncate_messages(messages, max_tokens=2000)
# Summarize old messages
if len(messages) > 20:
summary = optimizer.summarize_history(messages[:-10])
recent_messages = messages[-10:]
# Use summary + recent messages
optimized_messages = [
{"role": "system", "content": f"Previous conversation: {summary}"},
*recent_messages
]
Model Routing by Cost
# optimization/cost_router.py
class CostAwareRouter:
"""Route requests to models based on cost constraints."""
def __init__(self, daily_budget: float = 100.0):
self.daily_budget = daily_budget
self.daily_spend = 0.0
def route_request(self, complexity: str, tokens_estimate: int) -> str:
"""Route request to appropriate model based on budget."""
remaining_budget = self.daily_budget - self.daily_spend
# Calculate cost for different models
        cost_gpt4o = (tokens_estimate / 1_000_000) * 6.25        # (2.50 + 10.00) / 2 from the model catalog
        cost_gpt4o_mini = (tokens_estimate / 1_000_000) * 0.375  # (0.15 + 0.60) / 2
if complexity == "high":
# High complexity needs good model
if cost_gpt4o < remaining_budget * 0.1: # 10% of budget
return "gpt-4o"
else:
# Fallback to cheaper model
return "gpt-4o-mini"
elif complexity == "medium":
# Medium can use either
if cost_gpt4o < remaining_budget * 0.05:
return "gpt-4o"
else:
return "gpt-4o-mini"
else: # low
# Always use cheap model for simple tasks
return "gpt-4o-mini"
def record_spend(self, amount: float):
"""Record spending."""
self.daily_spend += amount
# Usage
router = CostAwareRouter(daily_budget=100.0)
model = router.route_request(complexity="medium", tokens_estimate=1000)
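After each call, the actual spend can be recorded from the response's token usage so the budget reflects real consumption rather than estimates. A sketch, assuming the MODELS catalog from the model-selection example and a langchain_openai version whose responses expose usage_metadata:
from langchain_openai import ChatOpenAI
def invoke_and_record(router: CostAwareRouter, model_name: str, prompt: str):
    """Call the chosen model and charge real token usage against the daily budget."""
    llm = ChatOpenAI(model=model_name)
    response = llm.invoke(prompt)
    usage = response.usage_metadata or {}
    config = MODELS.get(model_name)
    if config and usage:
        cost = (
            usage.get("input_tokens", 0) / 1_000_000 * config.cost_per_1m_input
            + usage.get("output_tokens", 0) / 1_000_000 * config.cost_per_1m_output
        )
        router.record_spend(cost)
    return response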
Benchmarking
Performance Benchmarks
# benchmarking/benchmark.py
import time
import statistics
from typing import Callable, List, Dict, Any
class Benchmark:
"""Benchmark tool for performance testing."""
def __init__(self, name: str):
self.name = name
self.results: List[float] = []
def run(
self,
func: Callable,
iterations: int = 100,
warmup: int = 10
) -> Dict[str, Any]:
"""Run benchmark."""
print(f"Running benchmark: {self.name}")
# Warmup
for _ in range(warmup):
func()
# Benchmark
for i in range(iterations):
start = time.time()
func()
duration = time.time() - start
self.results.append(duration)
if (i + 1) % 10 == 0:
print(f" Progress: {i + 1}/{iterations}")
# Calculate statistics
return self.get_statistics()
def get_statistics(self) -> Dict[str, float]:
"""Get benchmark statistics."""
if not self.results:
return {}
return {
"name": self.name,
"iterations": len(self.results),
"mean": statistics.mean(self.results),
"median": statistics.median(self.results),
"stdev": statistics.stdev(self.results) if len(self.results) > 1 else 0,
"min": min(self.results),
"max": max(self.results),
"p95": statistics.quantiles(self.results, n=20)[18], # 95th percentile
"p99": statistics.quantiles(self.results, n=100)[98], # 99th percentile
}
def print_report(self):
"""Print benchmark report."""
stats = self.get_statistics()
print(f"\nBenchmark Results: {stats['name']}")
print(f" Iterations: {stats['iterations']}")
print(f" Mean: {stats['mean']:.3f}s")
print(f" Median: {stats['median']:.3f}s")
print(f" Std Dev: {stats['stdev']:.3f}s")
print(f" Min: {stats['min']:.3f}s")
print(f" Max: {stats['max']:.3f}s")
print(f" P95: {stats['p95']:.3f}s")
print(f" P99: {stats['p99']:.3f}s")
# Usage
def test_agent_execution():
agent.invoke({"messages": [{"role": "user", "content": "test"}]})
benchmark = Benchmark("Agent Execution")
results = benchmark.run(test_agent_execution, iterations=100)
benchmark.print_report()
Best Practices
1. Use Appropriate Models
# Use cheaper models for simple tasks
simple_llm = ChatOpenAI(model="gpt-4o-mini") # Fast & cheap
# Use powerful models for complex tasks
complex_llm = ChatOpenAI(model="gpt-4o") # Slower but better
2. Enable Caching
from azcore.utils.cached_llm import CachedLLM
# Always wrap LLMs with caching
llm = ChatOpenAI(model="gpt-4o-mini")
cached_llm = CachedLLM(llm, cache_type="exact")
3. Use Async When Possible
# Sequential (slow)
results = [agent.invoke(task) for task in tasks]
# Parallel (fast)
results = await asyncio.gather(*[agent.ainvoke(task) for task in tasks])
4. Optimize Prompts
# Bad: Verbose prompt with many examples
prompt = """You are a helpful assistant. Please be very helpful and answer
questions carefully. Here are some examples:
Example 1: ...
Example 2: ...
[many more examples]
Now answer: What is 2+2?"""
# Good: Concise prompt
prompt = "You are a helpful assistant.\n\nWhat is 2+2?"
5. Monitor and Profile
# Always monitor performance
from azcore.monitoring import metrics
with metrics.timer("agent_execution"):
result = agent.invoke(task)
6. Set Appropriate Timeouts
llm = ChatOpenAI(
model="gpt-4o-mini",
request_timeout=30, # 30 second timeout
max_retries=3
)
7. Batch When Possible
# Process multiple items in batch
results = llm.batch(prompts) # Much faster than loop