
Scaling Guide

Scale Azcore applications to handle increased load and concurrent users.

A comprehensive guide to scaling Azcore applications from a single server to distributed systems, covering load handling at every stage.

Overview

This guide covers horizontal and vertical scaling strategies, load balancing, caching, database scaling, and architectural patterns for high-scale deployments.

Scaling Strategies

When to Scale

Indicators that you need to scale (combined into a single check in the sketch after this list):

  • Response time > 2s (P95)
  • CPU usage > 70% sustained
  • Memory usage > 80%
  • Error rate > 1%
  • Queue depth growing
  • Customer complaints
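
A minimal sketch of combining these indicators into one scale-up signal; the thresholds mirror the list above, and the field names are assumptions:

# scaling/scale_signal.py (illustrative sketch)
from dataclasses import dataclass

@dataclass
class ScaleSignals:
    p95_latency_s: float
    cpu_percent: float
    memory_percent: float
    error_rate_percent: float
    queue_depth_trend: float  # positive = queue is growing

def should_scale_up(signals: ScaleSignals) -> bool:
    """Return True if any scaling indicator is breached."""
    return any([
        signals.p95_latency_s > 2.0,
        signals.cpu_percent > 70,
        signals.memory_percent > 80,
        signals.error_rate_percent > 1.0,
        signals.queue_depth_trend > 0,
    ])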

Scaling Decision Matrix

┌──────────────────┬────────────────┬──────────────────┐
│ Metric           │ Vertical Scale │ Horizontal Scale │
├──────────────────┼────────────────┼──────────────────┤
│ CPU Bound        │ ✓ Initially    │ ✓ Long-term      │
│ Memory Bound     │ ✓ Good         │ ✓ Better         │
│ I/O Bound        │ ✗ Limited      │ ✓ Good           │
│ Network Bound    │ ✗ Limited      │ ✓ Best           │
│ LLM Rate Limits  │ ✗ No help      │ ✓ Best           │
└──────────────────┴────────────────┴──────────────────┘

Vertical Scaling

Resource Optimization

# config/resource_config.py
from dataclasses import dataclass

@dataclass
class ResourceConfig:
    """Resource configuration for different scales."""

    # Small (dev/staging)
    SMALL = {
        "workers": 2,
        "cpu_limit": "1",
        "memory_limit": "2Gi",
        "llm_concurrent_requests": 5,
        "cache_size_mb": 500
    }

    # Medium (small production)
    MEDIUM = {
        "workers": 4,
        "cpu_limit": "2",
        "memory_limit": "4Gi",
        "llm_concurrent_requests": 10,
        "cache_size_mb": 2000
    }

    # Large (production)
    LARGE = {
        "workers": 8,
        "cpu_limit": "4",
        "memory_limit": "8Gi",
        "llm_concurrent_requests": 20,
        "cache_size_mb": 4000
    }

    # XLarge (high traffic)
    XLARGE = {
        "workers": 16,
        "cpu_limit": "8",
        "memory_limit": "16Gi",
        "llm_concurrent_requests": 50,
        "cache_size_mb": 8000
    }


def get_config_for_scale(scale: str = "medium") -> dict:
    """Get resource configuration for scale."""
    return getattr(ResourceConfig, scale.upper())


# Apply configuration
config = get_config_for_scale("large")

# Use in application
workers = config["workers"]
memory_limit = config["memory_limit"]

Optimizing Worker Count

# Calculate optimal worker count
import multiprocessing
import os

def calculate_workers():
    """Calculate optimal number of workers."""
    # Get CPU count
    cpu_count = multiprocessing.cpu_count()

    # For I/O bound (LLM calls): 2-4x CPU count
    # For CPU bound: 1x CPU count

    # LLM applications are I/O bound
    workers = cpu_count * 2

    # Cap at reasonable limit
    max_workers = int(os.getenv("MAX_WORKERS", "32"))
    workers = min(workers, max_workers)

    return workers


# Use with Gunicorn
workers = calculate_workers()
# gunicorn -w $workers app:app
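
The same heuristic can live in a Gunicorn config file instead of the command line. A sketch, assuming a hypothetical gunicorn.conf.py and an ASGI app served with Uvicorn workers:

# gunicorn.conf.py (hypothetical; adjust bind address and worker class to your setup)
import multiprocessing
import os

bind = "0.0.0.0:8000"
# 2x CPU count for I/O-bound LLM workloads, capped by MAX_WORKERS
workers = min(multiprocessing.cpu_count() * 2, int(os.getenv("MAX_WORKERS", "32")))
worker_class = "uvicorn.workers.UvicornWorker"  # async workers suit I/O-bound LLM calls
timeout = 120  # LLM calls can be slow; keep this above typical request latency

Start with: gunicorn -c gunicorn.conf.py app:app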

Horizontal Scaling

Stateless Architecture

# design/stateless.py
import json
import redis

# Bad: Stateful (state lives in process memory, doesn't scale)
class StatefulAgent:
    def __init__(self):
        self.conversation_history = []  # State in memory

    def invoke(self, message):
        self.conversation_history.append(message)
        # Process with the full in-memory history
        return self.process(self.conversation_history)


# Good: Stateless (state lives in Redis, scales horizontally)
class StatelessAgent:
    def __init__(self):
        self.cache = redis.Redis()  # External state store

    def invoke(self, session_id, message):
        # Load history from cache (empty list for a new session)
        raw = self.cache.get(f"history:{session_id}")
        history = json.loads(raw) if raw else []
        history.append(message)

        # Process
        response = self.process(history)

        # Save updated history back to cache
        self.cache.set(f"history:{session_id}", json.dumps(history))

        return response

Session Management

# scaling/session_manager.py
import redis
import json
from typing import List, Dict, Any

class DistributedSessionManager:
    """Manage sessions across multiple servers."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # 1 hour

    def get_session(self, session_id: str) -> Dict[str, Any]:
        """Get session data."""
        data = self.redis.get(f"session:{session_id}")
        if not data:
            return {"messages": []}

        return json.loads(data)

    def save_session(self, session_id: str, data: Dict[str, Any]):
        """Save session data."""
        self.redis.setex(
            f"session:{session_id}",
            self.ttl,
            json.dumps(data)
        )

    def delete_session(self, session_id: str):
        """Delete session."""
        self.redis.delete(f"session:{session_id}")

    def extend_session(self, session_id: str):
        """Extend session TTL."""
        self.redis.expire(f"session:{session_id}", self.ttl)


# Usage
session_manager = DistributedSessionManager()

@app.post("/api/chat")
async def chat(session_id: str, message: str):
    """Chat endpoint with distributed sessions."""

    # Get session
    session = session_manager.get_session(session_id)

    # Add message
    session["messages"].append({"role": "user", "content": message})

    # Process
    result = agent.invoke({"messages": session["messages"]})

    # Update session
    session["messages"].append(result["messages"][-1])
    session_manager.save_session(session_id, session)

    return result

Multi-Region Deployment

# k8s/multi-region.yaml
apiVersion: v1
kind: Service
metadata:
  name: arc-global
  annotations:
    external-dns.alpha.kubernetes.io/hostname: arc.example.com
spec:
  type: LoadBalancer
  selector:
    app: arc
  ports:
  - port: 80
    targetPort: 8000

---
# Deploy to multiple regions
apiVersion: apps/v1
kind: Deployment
metadata:
  name: arc-us-east
  namespace: production
spec:
  replicas: 10
  selector:
    matchLabels:
      app: arc
      region: us-east
  template:
    metadata:
      labels:
        app: arc
        region: us-east
    spec:
      containers:
      - name: arc
        image: arc:latest
        env:
        - name: REGION
          value: "us-east"

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: arc-eu-west
  namespace: production
spec:
  replicas: 10
  selector:
    matchLabels:
      app: arc
      region: eu-west
  template:
    metadata:
      labels:
        app: arc
        region: eu-west
    spec:
      containers:
      - name: arc
        image: arc:latest
        env:
        - name: REGION
          value: "eu-west"

Load Balancing

Nginx Load Balancer

# nginx.conf
upstream arc_backend {
    least_conn;  # Route to server with least connections

    server arc-1:8000 weight=1 max_fails=3 fail_timeout=30s;
    server arc-2:8000 weight=1 max_fails=3 fail_timeout=30s;
    server arc-3:8000 weight=1 max_fails=3 fail_timeout=30s;
    server arc-4:8000 weight=1 max_fails=3 fail_timeout=30s;

    # Active health checks: the "check" directive requires the third-party
    # nginx_upstream_check_module; stock nginx relies on max_fails/fail_timeout above
    check interval=5000 rise=2 fall=3 timeout=1000;
}

server {
    listen 80;
    server_name arc.example.com;

    location / {
        proxy_pass http://arc_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Connection pooling
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
    }
}

Session Affinity (Sticky Sessions)

# Sticky sessions based on cookie
upstream arc_backend {
    ip_hash;  # Or use sticky cookie

    server arc-1:8000;
    server arc-2:8000;
    server arc-3:8000;
}

# Or with cookie-based stickiness (the "sticky" directive requires NGINX Plus or a third-party sticky module)
upstream arc_backend {
    server arc-1:8000;
    server arc-2:8000;
    server arc-3:8000;

    sticky cookie srv_id expires=1h domain=.example.com path=/;
}

Distributed Caching

Redis Cluster

# scaling/redis_cluster.py
import hashlib
from typing import List, Optional

from redis.cluster import RedisCluster, ClusterNode

class DistributedCache:
    """Distributed cache using Redis Cluster."""

    def __init__(self, nodes: List[dict]):
        self.client = RedisCluster(
            startup_nodes=[ClusterNode(n["host"], n["port"]) for n in nodes],
            decode_responses=True
        )

    def get(self, key: str) -> Optional[str]:
        """Get value from cache."""
        return self.client.get(key)

    def set(self, key: str, value: str, ttl: int = 3600):
        """Set value in cache."""
        self.client.setex(key, ttl, value)

    def delete(self, key: str):
        """Delete key from cache."""
        self.client.delete(key)

    def get_batch(self, keys: List[str]) -> List[Optional[str]]:
        """Get multiple values (pipelined for efficiency)."""
        pipeline = self.client.pipeline()
        for key in keys:
            pipeline.get(key)
        return pipeline.execute()


# Initialize with cluster nodes
cache = DistributedCache(
    nodes=[
        {"host": "redis-1", "port": 6379},
        {"host": "redis-2", "port": 6379},
        {"host": "redis-3", "port": 6379},
    ]
)

# Use for LLM response caching
def cached_llm_call(prompt: str) -> str:
    """LLM call with distributed caching."""
    cache_key = f"llm:{hashlib.sha256(prompt.encode()).hexdigest()}"

    # Try cache
    cached = cache.get(cache_key)
    if cached:
        return cached

    # Call LLM
    response = llm.invoke(prompt)

    # Cache response
    cache.set(cache_key, response, ttl=3600)

    return response

Database Scaling

Read Replicas

# scaling/db_replicas.py
from typing import List

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class DatabaseRouter:
    """Route queries to master or replicas."""

    def __init__(self, master_url: str, replica_urls: List[str]):
        # Master for writes
        self.master_engine = create_engine(master_url)
        self.MasterSession = sessionmaker(bind=self.master_engine)

        # Replicas for reads (round-robin)
        self.replica_engines = [
            create_engine(url) for url in replica_urls
        ]
        self.ReplicaSessions = [
            sessionmaker(bind=engine) for engine in self.replica_engines
        ]
        self.current_replica = 0

    def get_write_session(self):
        """Get session for writes (master)."""
        return self.MasterSession()

    def get_read_session(self):
        """Get session for reads (replica, round-robin)."""
        session = self.ReplicaSessions[self.current_replica]()
        self.current_replica = (self.current_replica + 1) % len(self.ReplicaSessions)
        return session


# Usage
db = DatabaseRouter(
    master_url="postgresql://master:5432/arc",
    replica_urls=[
        "postgresql://replica1:5432/arc",
        "postgresql://replica2:5432/arc",
    ]
)

# Read operations use replicas
read_session = db.get_read_session()
users = read_session.query(User).all()

# Write operations use master
write_session = db.get_write_session()
write_session.add(new_user)
write_session.commit()

Connection Pooling

# Efficient connection pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://user:pass@host/db",
    poolclass=QueuePool,
    pool_size=20,          # Number of connections to maintain
    max_overflow=10,        # Additional connections under load
    pool_timeout=30,        # Timeout waiting for connection
    pool_recycle=3600,      # Recycle connections after 1 hour
    pool_pre_ping=True      # Verify connections before use
)

Async Processing

Task Queue

# scaling/task_queue.py
from celery import Celery
from typing import Dict, Any

# Initialize Celery
celery_app = Celery(
    "arc_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def process_agent_task(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """Process agent task asynchronously."""
    from azcore.agents import ReactAgent

    agent = ReactAgent(...)

    result = agent.invoke({
        "messages": [{"role": "user", "content": task_data["query"]}]
    })

    return result


# Usage in API
@app.post("/api/agent/async")
async def agent_async(task: str):
    """Submit agent task to queue."""

    # Submit to queue
    task_result = process_agent_task.delay({"query": task})

    return {
        "task_id": task_result.id,
        "status": "processing"
    }

@app.get("/api/task/{task_id}")
async def get_task_result(task_id: str):
    """Get task result."""
    result = celery_app.AsyncResult(task_id)

    if result.ready():
        return {
            "status": "completed",
            "result": result.get()
        }
    else:
        return {
            "status": "processing"
        }

Background Workers

# scaling/background_worker.py
import logging
import threading
from queue import Queue, Empty

logger = logging.getLogger(__name__)

class BackgroundWorker:
    """Background worker for processing tasks."""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers
        self.task_queue = Queue()
        self.workers = []
        self.running = False

    def start(self):
        """Start worker threads."""
        self.running = True

        for i in range(self.num_workers):
            worker = threading.Thread(
                target=self._worker_loop,
                name=f"worker-{i}",
                daemon=True
            )
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Stop all workers."""
        self.running = False

    def submit_task(self, task_id: str, task_data: dict):
        """Submit task to queue."""
        self.task_queue.put((task_id, task_data))

    def _worker_loop(self):
        """Worker loop - processes tasks from queue."""
        while self.running:
            try:
                # Get task (timeout so we can check self.running)
                task_id, task_data = self.task_queue.get(timeout=1)

                # Process task
                result = self._process_task(task_data)

                # Store result
                self._store_result(task_id, result)

            except Empty:
                continue
            except Exception as e:
                logger.error(f"Worker error: {e}")

    def _process_task(self, task_data: dict) -> dict:
        """Process individual task."""
        # Implement task processing
        pass

    def _store_result(self, task_id: str, result: dict):
        """Store task result."""
        # Store in Redis or database
        pass


# Usage
worker_pool = BackgroundWorker(num_workers=10)
worker_pool.start()

# Submit tasks
worker_pool.submit_task("task_123", {"query": "..."})

Auto-Scaling

Kubernetes HPA

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: arc-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: arc-deployment
  minReplicas: 4
  maxReplicas: 50
  metrics:
  # CPU-based scaling
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

  # Memory-based scaling
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

  # Custom metric: Request rate (requires a custom-metrics adapter, e.g. Prometheus Adapter)
  - type: Pods
    pods:
      metric:
        name: http_requests_per_second
      target:
        type: AverageValue
        averageValue: "100"

  # Custom metric: Queue depth (served through an external-metrics adapter)
  - type: External
    external:
      metric:
        name: queue_depth
      target:
        type: AverageValue
        averageValue: "30"

  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
      - type: Pods
        value: 5
        periodSeconds: 60
      selectPolicy: Max

    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
      selectPolicy: Min

AWS Auto Scaling

# aws/autoscaling.py
import boto3

def setup_autoscaling(
    cluster_name: str,
    service_name: str,
    alb_resource_label: str,
    min_capacity: int = 4,
    max_capacity: int = 50
):
    """Setup ECS auto scaling."""
    client = boto3.client('application-autoscaling')

    # Register scalable target
    client.register_scalable_target(
        ServiceNamespace='ecs',
        ResourceId=f'service/{cluster_name}/{service_name}',
        ScalableDimension='ecs:service:DesiredCount',
        MinCapacity=min_capacity,
        MaxCapacity=max_capacity
    )

    # CPU-based scaling policy
    client.put_scaling_policy(
        PolicyName=f'{service_name}-cpu-scaling',
        ServiceNamespace='ecs',
        ResourceId=f'service/{cluster_name}/{service_name}',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 70.0,
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'ECSServiceAverageCPUUtilization'
            },
            'ScaleInCooldown': 300,
            'ScaleOutCooldown': 60
        }
    )

    # Request count scaling policy
    client.put_scaling_policy(
        PolicyName=f'{service_name}-request-scaling',
        ServiceNamespace='ecs',
        ResourceId=f'service/{cluster_name}/{service_name}',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 1000.0,  # 1000 requests/target
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'ALBRequestCountPerTarget',
                # Format: '<alb-arn-suffix>/targetgroup/<tg-arn-suffix>', e.g.
                # 'app/my-alb/0123456789abcdef/targetgroup/my-tg/0123456789abcdef'
                'ResourceLabel': alb_resource_label
            }
        }
    )

Monitoring at Scale

Distributed Tracing

# monitoring/distributed_tracing.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_distributed_tracing(service_name: str):
    """Setup distributed tracing for scaled system."""

    # Create tracer provider
    tracer_provider = TracerProvider(
        resource=Resource.create({
            "service.name": service_name,
            "deployment.environment": "production"
        })
    )

    # Add Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name="jaeger-agent",
        agent_port=6831,
    )

    tracer_provider.add_span_processor(
        BatchSpanProcessor(jaeger_exporter)
    )

    trace.set_tracer_provider(tracer_provider)


# Use in requests
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("process_request") as span:
    span.set_attribute("user_id", user_id)
    span.set_attribute("request_id", request_id)

    result = agent.invoke(task)

Cost Optimization

Cost-Aware Scaling

# scaling/cost_optimization.py
import logging

logger = logging.getLogger(__name__)


class CostOptimizer:
    """Optimize costs while scaling."""

    def __init__(self):
        self.hourly_budget = 100.0  # $100/hour
        self.current_spend = 0.0

    def should_scale_up(
        self,
        current_instances: int,
        cpu_usage: float,
        cost_per_instance: float
    ) -> bool:
        """Decide if should scale up based on cost."""

        # Calculate additional cost
        additional_cost = cost_per_instance

        # Check budget
        projected_spend = self.current_spend + additional_cost

        if projected_spend > self.hourly_budget:
            logger.warning("Budget exceeded, not scaling up")
            return False

        # Scale if CPU high
        return cpu_usage > 70

    def optimize_instance_types(self, workload_type: str) -> str:
        """Choose optimal instance type for workload."""

        if workload_type == "cpu_intensive":
            return "c5.xlarge"  # CPU optimized
        elif workload_type == "memory_intensive":
            return "r5.xlarge"  # Memory optimized
        else:
            return "t3.xlarge"  # General purpose


# Usage
optimizer = CostOptimizer()

if optimizer.should_scale_up(current_instances=10, cpu_usage=75, cost_per_instance=0.20):
    scale_up()

Best Practices

1. Design for Horizontal Scaling

# Good: Stateless, scales horizontally
def process_request(session_id, request):
    # Load state from an external store (redis_client is a shared Redis connection)
    state = redis_client.get(f"session:{session_id}")
    # Process
    result = process(state, request)
    # Save state
    redis_client.set(f"session:{session_id}", result)
    return result

2. Use Connection Pooling

# Reuse connections across requests
import httpx

http_client = httpx.Client(
    limits=httpx.Limits(max_connections=100)
)

3. Implement Circuit Breakers

from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=60)
def call_external_service():
    # Fails fast if service is down
    pass

4. Cache Aggressively

# Cache expensive operations (e.g. with cachetools' TTLCache)
from cachetools import cached, TTLCache

@cached(cache=TTLCache(maxsize=1024, ttl=3600))
def expensive_operation():
    pass

5. Monitor Everything

# Track all metrics
metrics.record("requests", 1)
metrics.record("latency", duration)
metrics.record("errors", error_count)