
Performance Optimization Guide

Optimize Azcore applications for better performance and efficiency.

A comprehensive guide to optimizing Azcore framework applications for maximum performance, cost-efficiency, and scalability.

Overview

This guide covers optimization strategies for LLM calls, caching, concurrency, memory management, networking, database access, and cost control.

Performance Profiling

CPU Profiling

# profiling/cpu_profiler.py
import cProfile
import pstats
import io
from functools import wraps
from typing import Callable

def profile_cpu(sort_by: str = 'cumulative', top_n: int = 20):
    """Decorator to profile CPU usage."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()

            result = func(*args, **kwargs)

            profiler.disable()

            # Print statistics
            s = io.StringIO()
            ps = pstats.Stats(profiler, stream=s).sort_stats(sort_by)
            ps.print_stats(top_n)

            print(s.getvalue())

            return result

        return wrapper
    return decorator


# Usage
@profile_cpu(sort_by='cumulative', top_n=10)
def execute_workflow(task: str):
    # Workflow logic
    pass
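
The `profile_cpu` decorator above wraps synchronous functions only. For the async code used throughout this guide, a hedged async-aware variant is sketched below; note that any other tasks scheduled on the same event loop while the profiler is enabled will also appear in the output.

# profiling/cpu_profiler_async.py (sketch, not part of the framework)
import cProfile
import io
import pstats
from functools import wraps
from typing import Callable

def profile_cpu_async(sort_by: str = 'cumulative', top_n: int = 20):
    """Decorator to profile CPU usage of an async function."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            try:
                return await func(*args, **kwargs)
            finally:
                profiler.disable()
                s = io.StringIO()
                pstats.Stats(profiler, stream=s).sort_stats(sort_by).print_stats(top_n)
                print(s.getvalue())
        return wrapper
    return decorator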

Memory Profiling

# profiling/memory_profiler.py
from memory_profiler import profile
import tracemalloc
from typing import Callable
from functools import wraps

def profile_memory(func: Callable) -> Callable:
    """Decorator to profile memory usage."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()

        result = func(*args, **kwargs)

        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()

        print(f"Memory usage:")
        print(f"  Current: {current / 1024 / 1024:.2f} MB")
        print(f"  Peak: {peak / 1024 / 1024:.2f} MB")

        return result

    return wrapper


# Line-by-line memory profiling
@profile
def process_large_dataset(data):
    """Process large dataset with memory profiling."""
    results = []
    for item in data:
        processed = expensive_operation(item)
        results.append(processed)
    return results
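
The `profile_memory` decorator defined above is not shown in use; here is a minimal usage sketch (the `summarize_documents` function is a hypothetical stand-in for your own workload):

@profile_memory
def summarize_documents(documents):
    """Hypothetical workload whose peak memory we want to measure."""
    return [doc.upper() for doc in documents]

summarize_documents(["report one", "report two"])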

Request Profiling

# profiling/request_profiler.py
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime

@dataclass
class ProfilePoint:
    """A profiling checkpoint."""
    name: str
    timestamp: float
    duration_from_start: float
    memory_mb: Optional[float] = None

@dataclass
class RequestProfile:
    """Profile data for a single request."""
    request_id: str
    start_time: float = field(default_factory=time.time)
    checkpoints: List[ProfilePoint] = field(default_factory=list)
    metadata: Dict = field(default_factory=dict)

    def checkpoint(self, name: str):
        """Add profiling checkpoint."""
        now = time.time()
        duration = now - self.start_time

        # Get memory usage
        try:
            import psutil
            process = psutil.Process()
            memory_mb = process.memory_info().rss / 1024 / 1024
        except Exception:
            memory_mb = None

        point = ProfilePoint(
            name=name,
            timestamp=now,
            duration_from_start=duration,
            memory_mb=memory_mb
        )

        self.checkpoints.append(point)

    def get_duration(self) -> float:
        """Get total duration."""
        return time.time() - self.start_time

    def get_report(self) -> str:
        """Generate profiling report."""
        lines = [
            f"Request Profile: {self.request_id}",
            f"Total Duration: {self.get_duration():.3f}s",
            "\nCheckpoints:"
        ]

        for i, point in enumerate(self.checkpoints):
            # Delta from the previous checkpoint (the first checkpoint has none)
            delta_str = ""
            if i > 0:
                prev_point = self.checkpoints[i - 1]
                delta = point.duration_from_start - prev_point.duration_from_start
                delta_str = f" (+{delta:.3f}s)"

            memory_str = f" [{point.memory_mb:.1f} MB]" if point.memory_mb else ""

            lines.append(
                f"  {point.name}: {point.duration_from_start:.3f}s"
                f"{delta_str}{memory_str}"
            )

        return "\n".join(lines)


# Usage
profile = RequestProfile(request_id="req_123")

profile.checkpoint("start")
# ... do work ...

profile.checkpoint("llm_call_complete")
# ... more work ...

profile.checkpoint("agent_complete")

print(profile.get_report())

LLM Optimization

Model Selection

# optimization/model_selector.py
from typing import Dict, Any
from dataclasses import dataclass

@dataclass
class ModelConfig:
    """Configuration for LLM model."""
    name: str
    cost_per_1m_input: float  # USD per 1M tokens
    cost_per_1m_output: float
    max_tokens: int
    speed: str  # "fast", "medium", "slow"
    quality: str  # "high", "medium", "low"

# Model catalog
MODELS = {
    "gpt-4o": ModelConfig(
        name="gpt-4o",
        cost_per_1m_input=2.50,
        cost_per_1m_output=10.00,
        max_tokens=128000,
        speed="fast",
        quality="high"
    ),
    "gpt-4o-mini": ModelConfig(
        name="gpt-4o-mini",
        cost_per_1m_input=0.15,
        cost_per_1m_output=0.60,
        max_tokens=128000,
        speed="fast",
        quality="medium"
    ),
    "claude-3-5-sonnet": ModelConfig(
        name="claude-3-5-sonnet-20241022",
        cost_per_1m_input=3.00,
        cost_per_1m_output=15.00,
        max_tokens=200000,
        speed="medium",
        quality="high"
    ),
    "claude-3-haiku": ModelConfig(
        name="claude-3-haiku-20240307",
        cost_per_1m_input=0.25,
        cost_per_1m_output=1.25,
        max_tokens=200000,
        speed="fast",
        quality="medium"
    ),
}

def select_model(
    task_complexity: str,  # "simple", "medium", "complex"
    latency_requirement: str,  # "fast", "medium"
    budget_constraint: str  # "low", "medium", "high"
) -> ModelConfig:
    """Select optimal model based on requirements."""

    if task_complexity == "simple":
        # Use cheapest fast models for simple tasks
        if budget_constraint == "low":
            return MODELS["gpt-4o-mini"]
        else:
            return MODELS["claude-3-haiku"]

    elif task_complexity == "medium":
        # Balance of speed, quality, and cost
        if latency_requirement == "fast":
            return MODELS["gpt-4o-mini"]
        else:
            return MODELS["gpt-4o"]

    else:  # complex
        # Use highest quality models
        if budget_constraint == "low":
            return MODELS["gpt-4o"]
        else:
            return MODELS["claude-3-5-sonnet"]


# Usage
model = select_model(
    task_complexity="simple",
    latency_requirement="fast",
    budget_constraint="low"
)

Prompt Optimization

# optimization/prompt_optimizer.py
import os
from typing import List

class PromptOptimizer:
    """Optimize prompts for better performance."""

    @staticmethod
    def compress_prompt(prompt: str, max_tokens: int = 1000) -> str:
        """
        Compress verbose prompts while preserving meaning.

        Strategies:
        - Remove redundant words
        - Use concise phrasing
        - Remove unnecessary examples
        """
        # Remove extra whitespace
        compressed = " ".join(prompt.split())

        # Remove filler words (carefully)
        fillers = ["please", "kindly", "very", "really", "just"]
        for filler in fillers:
            compressed = compressed.replace(f" {filler} ", " ")

        # Estimate tokens (rough: 1 token ~= 4 characters)
        estimated_tokens = len(compressed) // 4

        if estimated_tokens > max_tokens:
            # Truncate if still too long
            max_chars = max_tokens * 4
            compressed = compressed[:max_chars] + "..."

        return compressed

    @staticmethod
    def add_caching_hints(prompt: str) -> str:
        """
        Add hints for LLM provider caching.

        Many providers cache common prefixes.
        """
        # Add consistent prefix for caching
        prefix = "You are a helpful AI assistant.\n\n"
        if not prompt.startswith(prefix):
            return prefix + prompt
        return prompt

    @staticmethod
    def optimize_for_batch(prompts: List[str]) -> List[str]:
        """
        Optimize multiple prompts for batch processing.

        - Extract common prefix
        - Ensure consistent formatting
        """
        if not prompts:
            return []

        # Find common prefix
        common_prefix = os.path.commonprefix(prompts)

        if len(common_prefix) > 50:  # Worth optimizing
            # Remove common prefix from all prompts
            optimized = [p[len(common_prefix):] for p in prompts]
            return optimized

        return prompts


# Usage
optimizer = PromptOptimizer()

# Compress verbose prompt
short_prompt = optimizer.compress_prompt(long_prompt)

# Add caching hints
cached_prompt = optimizer.add_caching_hints(prompt)

Streaming Responses

# optimization/streaming.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

async def stream_llm_response(prompt: str):
    """Stream LLM response for better perceived performance."""
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        streaming=True
    )

    async for chunk in llm.astream([HumanMessage(content=prompt)]):
        # Yield chunk immediately
        yield chunk.content

        # Can start processing while waiting for rest
        process_partial_response(chunk.content)


# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/stream")
async def stream_endpoint(task: str):
    """Stream response to client."""
    return StreamingResponse(
        stream_llm_response(task),
        media_type="text/plain"
    )

Batch Processing

# optimization/batching.py
import asyncio
from typing import List

async def batch_llm_calls(prompts: List[str], batch_size: int = 10):
    """Process multiple prompts in batches."""
    from langchain_openai import ChatOpenAI

    llm = ChatOpenAI(model="gpt-4o-mini")

    results = []

    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]

        # Process batch in parallel
        batch_results = await asyncio.gather(*[
            llm.ainvoke(prompt) for prompt in batch
        ])

        results.extend(batch_results)

    return results


# Usage
prompts = ["Question 1", "Question 2", ..., "Question 100"]
results = await batch_llm_calls(prompts, batch_size=10)
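
A variant on the same idea: rather than fixed-size batches (where the whole batch must finish before the next one starts), cap concurrency with a semaphore so a new request begins as soon as any in-flight one completes. This is a sketch, not part of the framework API:

# optimization/bounded_concurrency.py (sketch)
import asyncio
from typing import List

async def bounded_llm_calls(prompts: List[str], max_concurrency: int = 10):
    """Run all prompts concurrently, but never more than max_concurrency at once."""
    from langchain_openai import ChatOpenAI

    llm = ChatOpenAI(model="gpt-4o-mini")
    semaphore = asyncio.Semaphore(max_concurrency)

    async def call(prompt: str):
        async with semaphore:
            return await llm.ainvoke(prompt)

    return await asyncio.gather(*[call(p) for p in prompts])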

Caching Strategies

Multi-Level Caching

# optimization/multi_level_cache.py
from typing import Optional, Any
from azcore.utils.caching import LRUCache, TTLCache, PersistentCache

class MultiLevelCache:
    """
    Multi-level cache with L1 (memory), L2 (Redis), L3 (disk).

    Faster caches checked first, slower caches as fallback.
    """

    def __init__(self):
        # L1: Fast in-memory LRU cache
        self.l1_cache = LRUCache(max_size=100, max_memory_mb=50)

        # L2: Redis cache (if available)
        try:
            import redis
            self.l2_cache = redis.Redis(host='localhost', port=6379, db=0)
            self.l2_cache.ping()  # Constructing Redis() does not connect; verify reachability
            self.has_l2 = True
        except Exception:
            self.has_l2 = False

        # L3: Persistent disk cache
        self.l3_cache = PersistentCache(cache_dir="cache", ttl=86400)

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache (checks L1, L2, L3 in order)."""

        # Try L1 (fastest)
        value = self.l1_cache.get(key)
        if value is not None:
            return value

        # Try L2 (medium)
        if self.has_l2:
            try:
                value_bytes = self.l2_cache.get(key)
                if value_bytes:
                    import pickle
                    value = pickle.loads(value_bytes)

                    # Populate L1
                    self.l1_cache.put(key, value)
                    return value
            except Exception:
                pass

        # Try L3 (slowest)
        value = self.l3_cache.get(key)
        if value is not None:
            # Populate L1 and L2
            self.l1_cache.put(key, value)

            if self.has_l2:
                import pickle
                self.l2_cache.set(key, pickle.dumps(value), ex=3600)

            return value

        return None

    def put(self, key: str, value: Any, ttl: int = 3600):
        """Put value in all cache levels."""

        # Store in L1
        self.l1_cache.put(key, value)

        # Store in L2
        if self.has_l2:
            import pickle
            self.l2_cache.set(key, pickle.dumps(value), ex=ttl)

        # Store in L3
        self.l3_cache.put(key, value)


# Usage
cache = MultiLevelCache()

# Get (checks L1 -> L2 -> L3)
value = cache.get("key")

# Put (stores in all levels)
cache.put("key", value)

Semantic Caching

# optimization/semantic_cache.py
from azcore.utils.caching import SemanticCache

def setup_semantic_caching(threshold: float = 0.95):
    """
    Setup semantic caching for similar queries.

    Example: "What is 2+2?" and "Calculate 2+2" should hit same cache.
    """
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer('all-MiniLM-L6-v2')

    def embed_func(text: str):
        return model.encode(text).tolist()

    cache = SemanticCache(
        embedding_function=embed_func,
        similarity_threshold=threshold,
        max_size=500
    )

    return cache


# Usage with CachedLLM
from azcore.utils.cached_llm import CachedLLM
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")
semantic_cache = setup_semantic_caching(threshold=0.95)

cached_llm = CachedLLM(
    llm=llm,
    cache_type="semantic",
    cache=semantic_cache
)

# These will likely hit cache even with different wording
response1 = cached_llm.invoke("What is the capital of France?")
response2 = cached_llm.invoke("Tell me France's capital city")  # Cache hit!

Cache Warming

# optimization/cache_warming.py
import asyncio
from typing import List

async def warm_cache(common_queries: List[str]):
    """Pre-populate cache with common queries."""
    from langchain_openai import ChatOpenAI
    from azcore.utils.cached_llm import CachedLLM

    llm = ChatOpenAI(model="gpt-4o-mini")
    cached_llm = CachedLLM(llm)

    print(f"Warming cache with {len(common_queries)} queries...")

    # Process in parallel
    tasks = [cached_llm.ainvoke(query) for query in common_queries]
    await asyncio.gather(*tasks)

    print("Cache warming complete")


# Common queries for a documentation Q&A system
common_queries = [
    "How do I install Arc?",
    "What is a ReactAgent?",
    "How do I create a workflow?",
    "What models are supported?",
    # ... more common queries
]

# Warm cache on startup
asyncio.run(warm_cache(common_queries))

Concurrency & Parallelism

Async Agent Execution

# optimization/async_agents.py
import asyncio
from typing import List, Dict, Any

async def execute_agents_parallel(
    agents: List,
    tasks: List[str]
) -> List[Dict[str, Any]]:
    """Execute multiple agents in parallel."""

    async def execute_single(agent, task):
        """Execute single agent."""
        try:
            result = await agent.ainvoke({
                "messages": [{"role": "user", "content": task}]
            })
            return {"status": "success", "result": result, "agent": agent.name}
        except Exception as e:
            return {"status": "error", "error": str(e), "agent": agent.name}

    # Create tasks
    tasks_list = [
        execute_single(agent, task)
        for agent, task in zip(agents, tasks)
    ]

    # Execute in parallel
    results = await asyncio.gather(*tasks_list)

    return results


# Usage
agents = [researcher, analyzer, writer]
tasks = ["Research AI", "Analyze data", "Write report"]

results = await execute_agents_parallel(agents, tasks)

Connection Pooling

# optimization/connection_pool.py
from langchain_openai import ChatOpenAI
import httpx

# Create HTTP client with connection pooling
http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    ),
    timeout=60.0
)

# Use with LLM
llm = ChatOpenAI(
    model="gpt-4o-mini",
    http_client=http_client
)
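
For the async call paths (`ainvoke`, `astream`) used elsewhere in this guide, the same pooling idea applies to the async client. A hedged sketch, assuming your `langchain_openai` version accepts the `http_async_client` parameter:

# Async variant: share one pooled AsyncClient across LLM calls
import httpx
from langchain_openai import ChatOpenAI

async_http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    ),
    timeout=60.0
)

async_llm = ChatOpenAI(
    model="gpt-4o-mini",
    http_async_client=async_http_client
)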

Rate Limiting Optimization

# optimization/rate_limiter.py
import time
import asyncio
from collections import deque
from typing import Callable, Any

class AdaptiveRateLimiter:
    """
    Adaptive rate limiter that adjusts based on API responses.

    Starts conservative, increases if successful, decreases on errors.
    """

    def __init__(
        self,
        initial_rate: int = 10,
        min_rate: int = 1,
        max_rate: int = 100,
        window_seconds: int = 60
    ):
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.window_seconds = window_seconds

        self.requests: deque = deque()
        self.success_count = 0
        self.error_count = 0

    async def acquire(self):
        """Wait until request can proceed."""
        now = time.time()

        # Remove old requests outside window
        while self.requests and self.requests[0] < now - self.window_seconds:
            self.requests.popleft()

        # Wait if at limit
        while len(self.requests) >= self.current_rate:
            await asyncio.sleep(0.1)

            now = time.time()
            while self.requests and self.requests[0] < now - self.window_seconds:
                self.requests.popleft()

        # Add current request
        self.requests.append(now)

    def on_success(self):
        """Record successful request."""
        self.success_count += 1

        # Increase rate if consistently successful
        if self.success_count > 20 and self.error_count == 0:
            self.current_rate = min(self.current_rate + 5, self.max_rate)
            self.success_count = 0

    def on_error(self):
        """Record failed request."""
        self.error_count += 1

        # Decrease rate on errors
        if self.error_count > 3:
            self.current_rate = max(self.current_rate - 5, self.min_rate)
            self.error_count = 0
            self.success_count = 0


# Usage
rate_limiter = AdaptiveRateLimiter(initial_rate=10, max_rate=50)

async def rate_limited_call(func: Callable, *args, **kwargs) -> Any:
    """Execute function with rate limiting."""
    await rate_limiter.acquire()

    try:
        result = await func(*args, **kwargs)
        rate_limiter.on_success()
        return result
    except Exception as e:
        rate_limiter.on_error()
        raise
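
Wiring the limiter into an actual LLM call might look like this (a sketch; `ainvoke` is the standard LangChain async call used elsewhere in this guide):

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

async def ask(prompt: str) -> str:
    # All calls share the adaptive limiter defined above
    response = await rate_limited_call(llm.ainvoke, prompt)
    return response.content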

Memory Optimization

Memory-Efficient Data Structures

# optimization/memory_efficient.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterator
import json
import sys

@dataclass(slots=True)  # __slots__ avoids a per-instance __dict__ (requires Python 3.10+)
class Message:
    """Memory-efficient message storage."""
    role: str
    content: str

    def __sizeof__(self) -> int:
        """Get memory size."""
        return (
            sys.getsizeof(self.role) +
            sys.getsizeof(self.content)
        )

# Generator for large datasets (lazy evaluation)
def process_large_dataset(filepath: str) -> Iterator[dict]:
    """
    Process large dataset without loading all into memory.

    Uses generator to process one item at a time.
    """
    with open(filepath, 'r') as f:
        for line in f:
            # Process line
            data = json.loads(line)
            yield process_item(data)

# Usage
for result in process_large_dataset("large_file.jsonl"):
    # Process result
    save_result(result)
    # Previous results are garbage collected

Conversation History Management

# optimization/history_manager.py
from typing import List, Dict
from collections import deque

class ConversationHistoryManager:
    """
    Manage conversation history with sliding window.

    Keeps only recent messages to reduce token usage.
    """

    def __init__(
        self,
        max_messages: int = 10,
        max_tokens: int = 4000,
        system_prompt: str = ""
    ):
        self.max_messages = max_messages
        self.max_tokens = max_tokens
        self.system_prompt = system_prompt
        self.messages: deque = deque(maxlen=max_messages)

    def add_message(self, role: str, content: str):
        """Add message to history."""
        self.messages.append({"role": role, "content": content})

    def get_messages(self) -> List[Dict[str, str]]:
        """Get messages for LLM (with token limit)."""
        messages = []

        # Always include system prompt
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})

        # Add messages until token limit
        total_tokens = self._estimate_tokens(self.system_prompt)

        for msg in reversed(self.messages):
            msg_tokens = self._estimate_tokens(msg["content"])

            if total_tokens + msg_tokens > self.max_tokens:
                break

            # Insert after the system prompt if present, else at the front,
            # so the final list stays in chronological order
            messages.insert(1 if self.system_prompt else 0, msg)
            total_tokens += msg_tokens

        return messages

    def _estimate_tokens(self, text: str) -> int:
        """Estimate token count (rough: 1 token ~= 4 chars)."""
        return len(text) // 4

    def clear(self):
        """Clear conversation history."""
        self.messages.clear()


# Usage
history = ConversationHistoryManager(max_messages=20, max_tokens=4000)

history.add_message("user", "Hello!")
history.add_message("assistant", "Hi! How can I help?")
history.add_message("user", "Tell me about Arc")

# Get messages for LLM (automatically truncated)
messages = history.get_messages()

Network Optimization

Request Retry Strategy

# optimization/retry.py
import asyncio
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

class RateLimitError(Exception):
    """Rate limit exceeded."""
    pass

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type((RateLimitError, TimeoutError))
)
async def resilient_llm_call(llm, prompt: str):
    """LLM call with automatic retry."""
    try:
        return await llm.ainvoke(prompt)
    except Exception as e:
        if "rate_limit" in str(e).lower():
            raise RateLimitError(str(e))
        elif "timeout" in str(e).lower():
            raise TimeoutError(str(e))
        else:
            raise

Request Compression

# optimization/compression.py
import gzip
import json

def compress_request(data: dict) -> bytes:
    """Compress request payload."""
    json_str = json.dumps(data)
    return gzip.compress(json_str.encode('utf-8'))

def decompress_response(data: bytes) -> dict:
    """Decompress response payload."""
    json_str = gzip.decompress(data).decode('utf-8')
    return json.loads(json_str)
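
A hedged usage sketch: sending the compressed payload with `httpx` and a `Content-Encoding: gzip` header. The endpoint URL is hypothetical, and the receiving server must be configured to decode gzip request bodies:

import httpx

payload = {"task": "Summarize the quarterly report", "metadata": {"priority": "high"}}

response = httpx.post(
    "https://api.example.com/agents/execute",  # hypothetical endpoint
    content=compress_request(payload),
    headers={
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    },
    timeout=30.0,
)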

Database Optimization

Query Optimization

# optimization/db_queries.py
from sqlalchemy import select
from sqlalchemy.orm import joinedload

async def get_agent_history_optimized(session, agent_id: str, limit: int = 100):
    """
    Get agent history with optimized queries.

    Uses:
    - Eager loading to avoid N+1 queries
    - Proper indexing
    - Pagination
    """
    query = (
        select(AgentExecution)
        .where(AgentExecution.agent_id == agent_id)
        .options(joinedload(AgentExecution.results))  # Eager load
        .order_by(AgentExecution.created_at.desc())
        .limit(limit)
    )

    result = await session.execute(query)
    return result.scalars().all()
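
The query above assumes an `AgentExecution` ORM model and an async `session` provided by your application. Because the docstring calls for proper indexing, here is a hedged sketch of how a matching composite index might be declared; the model and column names are assumptions, not part of the framework:

# optimization/db_models.py (illustrative)
from sqlalchemy import Column, DateTime, Index, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class AgentExecution(Base):
    __tablename__ = "agent_executions"

    id = Column(String, primary_key=True)
    agent_id = Column(String, nullable=False)
    created_at = Column(DateTime, nullable=False)

    # The `results` relationship eager-loaded above would be declared here,
    # e.g. results = relationship("AgentExecutionResult", back_populates="execution")

    # Composite index matching the WHERE + ORDER BY of the history query above
    __table_args__ = (
        Index("ix_agent_executions_agent_id_created_at", "agent_id", "created_at"),
    )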

Cost Optimization

Token Usage Optimization

# optimization/token_optimizer.py
from typing import List, Dict

class TokenOptimizer:
    """Optimize token usage to reduce costs."""

    @staticmethod
    def truncate_messages(
        messages: List[Dict[str, str]],
        max_tokens: int = 4000
    ) -> List[Dict[str, str]]:
        """Truncate messages to fit within token limit."""
        total_tokens = 0
        truncated = []

        for msg in reversed(messages):
            msg_tokens = len(msg["content"]) // 4  # Rough estimate

            if total_tokens + msg_tokens > max_tokens:
                # Truncate this message
                remaining_tokens = max_tokens - total_tokens
                remaining_chars = remaining_tokens * 4

                truncated_content = msg["content"][:remaining_chars] + "..."
                truncated.insert(0, {"role": msg["role"], "content": truncated_content})
                break

            truncated.insert(0, msg)
            total_tokens += msg_tokens

        return truncated

    @staticmethod
    def summarize_history(messages: List[Dict[str, str]]) -> str:
        """Summarize conversation history to reduce tokens."""
        # Use smaller model to summarize
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(model="gpt-4o-mini")

        conversation = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages
        ])

        summary_prompt = f"""Summarize this conversation in 2-3 sentences:

{conversation}

Summary:"""

        summary = llm.invoke(summary_prompt)

        return summary.content


# Usage
optimizer = TokenOptimizer()

# Truncate long messages
truncated = optimizer.truncate_messages(messages, max_tokens=2000)

# Summarize old messages
if len(messages) > 20:
    summary = optimizer.summarize_history(messages[:-10])
    recent_messages = messages[-10:]

    # Use summary + recent messages
    optimized_messages = [
        {"role": "system", "content": f"Previous conversation: {summary}"},
        *recent_messages
    ]

Model Routing by Cost

# optimization/cost_router.py

class CostAwareRouter:
    """Route requests to models based on cost constraints."""

    def __init__(self, daily_budget: float = 100.0):
        self.daily_budget = daily_budget
        self.daily_spend = 0.0

    def route_request(self, complexity: str, tokens_estimate: int) -> str:
        """Route request to appropriate model based on budget."""

        remaining_budget = self.daily_budget - self.daily_spend

        # Calculate cost for different models
        cost_gpt4o = (tokens_estimate / 1_000_000) * 6.25  # Avg of gpt-4o input/output prices
        cost_gpt4o_mini = (tokens_estimate / 1_000_000) * 0.375

        if complexity == "high":
            # High complexity needs good model
            if cost_gpt4o < remaining_budget * 0.1:  # 10% of budget
                return "gpt-4o"
            else:
                # Fallback to cheaper model
                return "gpt-4o-mini"

        elif complexity == "medium":
            # Medium can use either
            if cost_gpt4o < remaining_budget * 0.05:
                return "gpt-4o"
            else:
                return "gpt-4o-mini"

        else:  # low
            # Always use cheap model for simple tasks
            return "gpt-4o-mini"

    def record_spend(self, amount: float):
        """Record spending."""
        self.daily_spend += amount


# Usage
router = CostAwareRouter(daily_budget=100.0)

model = router.route_request(complexity="medium", tokens_estimate=1000)
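
To keep the budget tracking accurate, record actual spend after each completed request. A sketch using the per-token prices from the model catalog earlier in this guide (the token counts are illustrative):

# Hypothetical accounting step after a completed request
input_tokens, output_tokens = 800, 200

if model == "gpt-4o":
    cost = (input_tokens / 1_000_000) * 2.50 + (output_tokens / 1_000_000) * 10.00
else:  # gpt-4o-mini
    cost = (input_tokens / 1_000_000) * 0.15 + (output_tokens / 1_000_000) * 0.60

router.record_spend(cost)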

Benchmarking

Performance Benchmarks

# benchmarking/benchmark.py
import time
import statistics
from typing import Callable, List, Dict, Any

class Benchmark:
    """Benchmark tool for performance testing."""

    def __init__(self, name: str):
        self.name = name
        self.results: List[float] = []

    def run(
        self,
        func: Callable,
        iterations: int = 100,
        warmup: int = 10
    ) -> Dict[str, Any]:
        """Run benchmark."""
        print(f"Running benchmark: {self.name}")

        # Warmup
        for _ in range(warmup):
            func()

        # Benchmark
        for i in range(iterations):
            start = time.time()
            func()
            duration = time.time() - start
            self.results.append(duration)

            if (i + 1) % 10 == 0:
                print(f"  Progress: {i + 1}/{iterations}")

        # Calculate statistics
        return self.get_statistics()

    def get_statistics(self) -> Dict[str, float]:
        """Get benchmark statistics."""
        if not self.results:
            return {}

        return {
            "name": self.name,
            "iterations": len(self.results),
            "mean": statistics.mean(self.results),
            "median": statistics.median(self.results),
            "stdev": statistics.stdev(self.results) if len(self.results) > 1 else 0,
            "min": min(self.results),
            "max": max(self.results),
            "p95": statistics.quantiles(self.results, n=20)[18],  # 95th percentile
            "p99": statistics.quantiles(self.results, n=100)[98],  # 99th percentile
        }

    def print_report(self):
        """Print benchmark report."""
        stats = self.get_statistics()

        print(f"\nBenchmark Results: {stats['name']}")
        print(f"  Iterations: {stats['iterations']}")
        print(f"  Mean: {stats['mean']:.3f}s")
        print(f"  Median: {stats['median']:.3f}s")
        print(f"  Std Dev: {stats['stdev']:.3f}s")
        print(f"  Min: {stats['min']:.3f}s")
        print(f"  Max: {stats['max']:.3f}s")
        print(f"  P95: {stats['p95']:.3f}s")
        print(f"  P99: {stats['p99']:.3f}s")


# Usage
def test_agent_execution():
    agent.invoke({"messages": [{"role": "user", "content": "test"}]})

benchmark = Benchmark("Agent Execution")
results = benchmark.run(test_agent_execution, iterations=100)
benchmark.print_report()

Best Practices

1. Use Appropriate Models

# Use cheaper models for simple tasks
simple_llm = ChatOpenAI(model="gpt-4o-mini")  # Fast & cheap

# Use powerful models for complex tasks
complex_llm = ChatOpenAI(model="gpt-4o")  # Slower but better

2. Enable Caching

from azcore.utils.cached_llm import CachedLLM

# Always wrap LLMs with caching
llm = ChatOpenAI(model="gpt-4o-mini")
cached_llm = CachedLLM(llm, cache_type="exact")

3. Use Async When Possible

# Sequential (slow)
results = [agent.invoke(task) for task in tasks]

# Parallel (fast)
results = await asyncio.gather(*[agent.ainvoke(task) for task in tasks])

4. Optimize Prompts

# Bad: Verbose prompt with many examples
prompt = """You are a helpful assistant. Please be very helpful and answer
questions carefully. Here are some examples:
Example 1: ...
Example 2: ...
[many more examples]
Now answer: What is 2+2?"""

# Good: Concise prompt
prompt = "You are a helpful assistant.\n\nWhat is 2+2?"

5. Monitor and Profile

# Always monitor performance
from azcore.monitoring import metrics

with metrics.timer("agent_execution"):
    result = agent.invoke(task)

6. Set Appropriate Timeouts

llm = ChatOpenAI(
    model="gpt-4o-mini",
    request_timeout=30,  # 30 second timeout
    max_retries=3
)

7. Batch When Possible

# Process multiple items in batch
results = llm.batch(prompts)  # Much faster than loop