Comprehensive guide for scaling Az Core framework applications to handle increased load, from single server to distributed systems.
Overview
This guide covers horizontal and vertical scaling strategies, load balancing, caching, database scaling, and architectural patterns for high-scale deployments.
Scaling Strategies
When to Scale
Indicators that you need to scale (a quick threshold check is sketched after this list):
- Response time > 2s (P95)
- CPU usage > 70% sustained
- Memory usage > 80%
- Error rate > 1%
- Queue depth growing
- Customer complaints
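These thresholds can be wired into a simple automated check. Below is a minimal sketch, assuming a hypothetical MetricsSnapshot gathered from your monitoring stack; the field names are illustrative and not part of Az Core, but the thresholds mirror the list above.

# scaling/scale_check.py (illustrative sketch; names are hypothetical)
from dataclasses import dataclass

@dataclass
class MetricsSnapshot:
    p95_latency_s: float
    cpu_percent: float
    memory_percent: float
    error_rate_percent: float
    queue_depth_delta: int  # change in queue depth over the sample window

def needs_scaling(m: MetricsSnapshot) -> bool:
    """Return True if any scaling indicator from the list above is breached."""
    return any([
        m.p95_latency_s > 2.0,
        m.cpu_percent > 70,
        m.memory_percent > 80,
        m.error_rate_percent > 1.0,
        m.queue_depth_delta > 0,  # queue depth growing
    ])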
Scaling Decision Matrix
┌──────────────────┬────────────────┬───────────────┐
│ Metric │ Vertical Scale │ Horizontal │
├──────────────────┼────────────────┼───────────────┤
│ CPU Bound │ ✓ Initially │ ✓ Long-term │
│ Memory Bound │ ✓ Good │ ✓ Better │
│ I/O Bound │ ✗ Limited │ ✓ Good │
│ Network Bound │ ✗ Limited │ ✓ Best │
│ LLM Rate Limits │ ✗ No help │ ✓ Best │
└──────────────────┴────────────────┴───────────────┘
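The matrix can also be expressed as a small lookup that recommends a strategy per bottleneck type. A minimal sketch; the categories come straight from the table and the helper name is hypothetical.

# Illustrative helper mirroring the decision matrix above
RECOMMENDED_STRATEGY = {
    "cpu_bound": "vertical first, horizontal long-term",
    "memory_bound": "horizontal (vertical also works)",
    "io_bound": "horizontal",
    "network_bound": "horizontal",
    "llm_rate_limits": "horizontal",
}

def recommend_scaling(bottleneck: str) -> str:
    """Map a bottleneck category to the scaling approach from the matrix."""
    return RECOMMENDED_STRATEGY.get(bottleneck, "profile first, then decide")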
Vertical Scaling
Resource Optimization
# config/resource_config.py
from dataclasses import dataclass


@dataclass
class ResourceConfig:
    """Resource configuration for different scales."""

    # Small (dev/staging)
    SMALL = {
        "workers": 2,
        "cpu_limit": "1",
        "memory_limit": "2Gi",
        "llm_concurrent_requests": 5,
        "cache_size_mb": 500
    }

    # Medium (small production)
    MEDIUM = {
        "workers": 4,
        "cpu_limit": "2",
        "memory_limit": "4Gi",
        "llm_concurrent_requests": 10,
        "cache_size_mb": 2000
    }

    # Large (production)
    LARGE = {
        "workers": 8,
        "cpu_limit": "4",
        "memory_limit": "8Gi",
        "llm_concurrent_requests": 20,
        "cache_size_mb": 4000
    }

    # XLarge (high traffic)
    XLARGE = {
        "workers": 16,
        "cpu_limit": "8",
        "memory_limit": "16Gi",
        "llm_concurrent_requests": 50,
        "cache_size_mb": 8000
    }


def get_config_for_scale(scale: str = "medium") -> dict:
    """Get resource configuration for a given scale."""
    return getattr(ResourceConfig, scale.upper())


# Apply configuration
config = get_config_for_scale("large")

# Use in application
workers = config["workers"]
memory_limit = config["memory_limit"]
Optimizing Worker Count
# Calculate optimal worker count
import multiprocessing
import os


def calculate_workers():
    """Calculate optimal number of workers."""
    # Get CPU count
    cpu_count = multiprocessing.cpu_count()

    # For I/O-bound work (LLM calls): 2-4x CPU count
    # For CPU-bound work: 1x CPU count
    # LLM applications are I/O bound
    workers = cpu_count * 2

    # Cap at a reasonable limit
    max_workers = int(os.getenv("MAX_WORKERS", "32"))
    workers = min(workers, max_workers)

    return workers


# Use with Gunicorn
workers = calculate_workers()
# gunicorn -w $workers app:app
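One hedged way to apply this is a `gunicorn.conf.py`: Gunicorn configuration files are plain Python, so the same calculation can set the worker count directly. The `uvicorn.workers.UvicornWorker` worker class and the bind address are assumptions (an ASGI app, as in the FastAPI-style endpoints later in this guide), not requirements of Az Core.

# gunicorn.conf.py (sketch; adjust bind address and worker class to your app)
import multiprocessing
import os

bind = "0.0.0.0:8000"
workers = min(multiprocessing.cpu_count() * 2, int(os.getenv("MAX_WORKERS", "32")))
worker_class = "uvicorn.workers.UvicornWorker"  # assumes an ASGI (FastAPI) app
timeout = 120  # LLM calls can be slow; raise Gunicorn's default 30s timeout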
Horizontal Scaling
Stateless Architecture
# design/stateless.py

# Bad: Stateful (doesn't scale)
class StatefulAgent:
    def __init__(self):
        self.conversation_history = []  # State held in process memory

    def invoke(self, message):
        self.conversation_history.append(message)
        # Process with history
        return response


# Good: Stateless (scales horizontally)
class StatelessAgent:
    def __init__(self):
        self.cache = Redis()  # External state store (e.g. redis.Redis())

    def invoke(self, session_id, message):
        # Load history from cache (may be empty for a new session)
        history = self.cache.get(f"history:{session_id}") or []

        # Process
        response = self.process(history + [message])

        # Save back to cache (serialize to JSON in real code)
        self.cache.set(f"history:{session_id}", history + [message])
        return response
Session Management
# scaling/session_manager.py
import json
from typing import Any, Dict

import redis


class DistributedSessionManager:
    """Manage sessions across multiple servers."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # 1 hour

    def get_session(self, session_id: str) -> Dict[str, Any]:
        """Get session data."""
        data = self.redis.get(f"session:{session_id}")
        if not data:
            return {"messages": []}
        return json.loads(data)

    def save_session(self, session_id: str, data: Dict[str, Any]):
        """Save session data."""
        self.redis.setex(
            f"session:{session_id}",
            self.ttl,
            json.dumps(data)
        )

    def delete_session(self, session_id: str):
        """Delete session."""
        self.redis.delete(f"session:{session_id}")

    def extend_session(self, session_id: str):
        """Extend session TTL."""
        self.redis.expire(f"session:{session_id}", self.ttl)


# Usage
session_manager = DistributedSessionManager()


@app.post("/api/chat")
async def chat(session_id: str, message: str):
    """Chat endpoint with distributed sessions."""
    # Get session
    session = session_manager.get_session(session_id)

    # Add message
    session["messages"].append({"role": "user", "content": message})

    # Process
    result = agent.invoke({"messages": session["messages"]})

    # Update session
    session["messages"].append(result["messages"][-1])
    session_manager.save_session(session_id, session)

    return result
Multi-Region Deployment
# k8s/multi-region.yaml
apiVersion: v1
kind: Service
metadata:
  name: arc-global
  annotations:
    external-dns.alpha.kubernetes.io/hostname: arc.example.com
spec:
  type: LoadBalancer
  selector:
    app: arc
  ports:
    - port: 80
      targetPort: 8000
---
# Deploy to multiple regions
apiVersion: apps/v1
kind: Deployment
metadata:
  name: arc-us-east
  namespace: production
spec:
  replicas: 10
  selector:
    matchLabels:
      app: arc
      region: us-east
  template:
    metadata:
      labels:
        app: arc
        region: us-east
    spec:
      containers:
        - name: arc
          image: arc:latest
          env:
            - name: REGION
              value: "us-east"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: arc-eu-west
  namespace: production
spec:
  replicas: 10
  selector:
    matchLabels:
      app: arc
      region: eu-west
  template:
    metadata:
      labels:
        app: arc
        region: eu-west
    spec:
      containers:
        - name: arc
          image: arc:latest
          env:
            - name: REGION
              value: "eu-west"
Load Balancing
Nginx Load Balancer
# nginx.conf
upstream arc_backend {
    least_conn;  # Route to the server with the fewest active connections

    server arc-1:8000 weight=1 max_fails=3 fail_timeout=30s;
    server arc-2:8000 weight=1 max_fails=3 fail_timeout=30s;
    server arc-3:8000 weight=1 max_fails=3 fail_timeout=30s;
    server arc-4:8000 weight=1 max_fails=3 fail_timeout=30s;

    # Keep idle upstream connections open (required for the connection pooling below)
    keepalive 32;

    # Active health check (requires the third-party nginx_upstream_check_module;
    # stock open-source nginx only does passive checks via max_fails/fail_timeout)
    check interval=5000 rise=2 fall=3 timeout=1000;
}

server {
    listen 80;
    server_name arc.example.com;

    location / {
        proxy_pass http://arc_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Connection pooling to upstreams
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
    }
}
Session Affinity (Sticky Sessions)
# Sticky sessions based on client IP
upstream arc_backend {
    ip_hash;  # Same client IP always hits the same backend
    server arc-1:8000;
    server arc-2:8000;
    server arc-3:8000;
}

# Or with cookie-based stickiness (the sticky directive is NGINX Plus only;
# open-source nginx can approximate it with "hash $cookie_srv_id consistent;")
upstream arc_backend {
    server arc-1:8000;
    server arc-2:8000;
    server arc-3:8000;
    sticky cookie srv_id expires=1h domain=.example.com path=/;
}
Distributed Caching
Redis Cluster
# scaling/redis_cluster.py
from typing import List, Optional

from redis.cluster import ClusterNode, RedisCluster


class DistributedCache:
    """Distributed cache using Redis Cluster."""

    def __init__(self, nodes: List[dict]):
        # redis-py 4+ expects ClusterNode objects rather than plain dicts
        self.client = RedisCluster(
            startup_nodes=[ClusterNode(n["host"], n["port"]) for n in nodes],
            decode_responses=True,
            require_full_coverage=False  # replaces skip_full_coverage_check from the older redis-py-cluster package
        )

    def get(self, key: str) -> Optional[str]:
        """Get value from cache."""
        return self.client.get(key)

    def set(self, key: str, value: str, ttl: int = 3600):
        """Set value in cache."""
        self.client.setex(key, ttl, value)

    def delete(self, key: str):
        """Delete key from cache."""
        self.client.delete(key)

    def get_batch(self, keys: List[str]) -> List[Optional[str]]:
        """Get multiple values (pipelined for efficiency)."""
        pipeline = self.client.pipeline()
        for key in keys:
            pipeline.get(key)
        return pipeline.execute()


# Initialize with cluster nodes
cache = DistributedCache(
    nodes=[
        {"host": "redis-1", "port": 6379},
        {"host": "redis-2", "port": 6379},
        {"host": "redis-3", "port": 6379},
    ]
)
# Use for LLM response caching
import hashlib


def cached_llm_call(prompt: str) -> str:
    """LLM call with distributed caching."""
    cache_key = f"llm:{hashlib.sha256(prompt.encode()).hexdigest()}"

    # Try cache first
    cached = cache.get(cache_key)
    if cached:
        return cached

    # Call LLM on a cache miss
    response = llm.invoke(prompt)

    # Cache response
    cache.set(cache_key, response, ttl=3600)

    return response
Database Scaling
Read Replicas
# scaling/db_replicas.py
from typing import List

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class DatabaseRouter:
    """Route queries to master or replicas."""

    def __init__(self, master_url: str, replica_urls: List[str]):
        # Master for writes
        self.master_engine = create_engine(master_url)
        self.MasterSession = sessionmaker(bind=self.master_engine)

        # Replicas for reads (round-robin)
        self.replica_engines = [
            create_engine(url) for url in replica_urls
        ]
        self.ReplicaSessions = [
            sessionmaker(bind=engine) for engine in self.replica_engines
        ]
        self.current_replica = 0

    def get_write_session(self):
        """Get session for writes (master)."""
        return self.MasterSession()

    def get_read_session(self):
        """Get session for reads (replica, round-robin)."""
        session = self.ReplicaSessions[self.current_replica]()
        self.current_replica = (self.current_replica + 1) % len(self.ReplicaSessions)
        return session


# Usage
db = DatabaseRouter(
    master_url="postgresql://master:5432/arc",
    replica_urls=[
        "postgresql://replica1:5432/arc",
        "postgresql://replica2:5432/arc",
    ]
)

# Read operations use replicas
read_session = db.get_read_session()
users = read_session.query(User).all()

# Write operations use master
write_session = db.get_write_session()
write_session.add(new_user)
write_session.commit()
Connection Pooling
# Efficient connection pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://user:pass@host/db",
    poolclass=QueuePool,
    pool_size=20,        # Number of connections to keep open
    max_overflow=10,     # Additional connections allowed under load
    pool_timeout=30,     # Seconds to wait for a free connection
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_pre_ping=True   # Verify connections before use
)
Async Processing
Task Queue
# scaling/task_queue.py
from typing import Any, Dict

from celery import Celery

# Initialize Celery
celery_app = Celery(
    "arc_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)


@celery_app.task
def process_agent_task(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """Process agent task asynchronously."""
    from azcore.agents import ReactAgent

    agent = ReactAgent(...)
    result = agent.invoke({
        "messages": [{"role": "user", "content": task_data["query"]}]
    })
    return result


# Usage in API
@app.post("/api/agent/async")
async def agent_async(task: str):
    """Submit agent task to queue."""
    # Submit to queue
    task_result = process_agent_task.delay({"query": task})

    return {
        "task_id": task_result.id,
        "status": "processing"
    }


@app.get("/api/task/{task_id}")
async def get_task_result(task_id: str):
    """Get task result."""
    result = celery_app.AsyncResult(task_id)

    if result.ready():
        return {
            "status": "completed",
            "result": result.get()
        }
    else:
        return {
            "status": "processing"
        }
Background Workers
# scaling/background_worker.py
import logging
import threading
from queue import Empty, Queue

logger = logging.getLogger(__name__)


class BackgroundWorker:
    """Background worker pool for processing tasks."""

    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers
        self.task_queue = Queue()
        self.workers = []
        self.running = False

    def start(self):
        """Start worker threads."""
        self.running = True

        for i in range(self.num_workers):
            worker = threading.Thread(
                target=self._worker_loop,
                name=f"worker-{i}",
                daemon=True
            )
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Stop all workers."""
        self.running = False

    def submit_task(self, task_id: str, task_data: dict):
        """Submit task to queue."""
        self.task_queue.put((task_id, task_data))

    def _worker_loop(self):
        """Worker loop: processes tasks from the queue."""
        while self.running:
            try:
                # Get task (timeout so we can re-check self.running)
                task_id, task_data = self.task_queue.get(timeout=1)

                # Process task
                result = self._process_task(task_data)

                # Store result
                self._store_result(task_id, result)
            except Empty:
                continue
            except Exception as e:
                logger.error(f"Worker error: {e}")

    def _process_task(self, task_data: dict) -> dict:
        """Process individual task."""
        # Implement task processing
        pass

    def _store_result(self, task_id: str, result: dict):
        """Store task result."""
        # Store in Redis or database
        pass


# Usage
worker_pool = BackgroundWorker(num_workers=10)
worker_pool.start()

# Submit tasks
worker_pool.submit_task("task_123", {"query": "..."})
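The `_process_task` and `_store_result` hooks are left for the application to fill in. One hedged way to complete `_store_result` so that any server can read results back is a small Redis-backed store; the class below is hypothetical (not part of Az Core) and assumes results are JSON-serializable.

# Hypothetical result store usable from _store_result (sketch)
import json
from typing import Optional

import redis


class TaskResultStore:
    """Persist worker results so any server can poll for them."""

    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def store(self, task_id: str, result: dict):
        # Keep results for `ttl` seconds so clients have time to poll
        self.redis.setex(f"task_result:{task_id}", self.ttl, json.dumps(result))

    def fetch(self, task_id: str) -> Optional[dict]:
        data = self.redis.get(f"task_result:{task_id}")
        return json.loads(data) if data else None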
Auto-Scaling
Kubernetes HPA
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: arc-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: arc-deployment
  minReplicas: 4
  maxReplicas: 50
  metrics:
    # CPU-based scaling
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Memory-based scaling
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Custom metric: request rate
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
    # Custom metric: queue depth
    - type: External
      external:
        metric:
          name: queue_depth
        target:
          type: AverageValue
          averageValue: "30"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
        - type: Pods
          value: 5
          periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 60
      selectPolicy: Min
AWS Auto Scaling
# aws/autoscaling.py
import boto3


def setup_autoscaling(
    cluster_name: str,
    service_name: str,
    min_capacity: int = 4,
    max_capacity: int = 50
):
    """Set up ECS service auto scaling."""
    client = boto3.client('application-autoscaling')

    # Register scalable target
    client.register_scalable_target(
        ServiceNamespace='ecs',
        ResourceId=f'service/{cluster_name}/{service_name}',
        ScalableDimension='ecs:service:DesiredCount',
        MinCapacity=min_capacity,
        MaxCapacity=max_capacity
    )

    # CPU-based scaling policy
    client.put_scaling_policy(
        PolicyName=f'{service_name}-cpu-scaling',
        ServiceNamespace='ecs',
        ResourceId=f'service/{cluster_name}/{service_name}',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 70.0,
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'ECSServiceAverageCPUUtilization'
            },
            'ScaleInCooldown': 300,
            'ScaleOutCooldown': 60
        }
    )

    # Request count scaling policy
    client.put_scaling_policy(
        PolicyName=f'{service_name}-request-scaling',
        ServiceNamespace='ecs',
        ResourceId=f'service/{cluster_name}/{service_name}',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 1000.0,  # 1000 requests per target
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'ALBRequestCountPerTarget',
                # ResourceLabel must identify the ALB and target group, in the form
                # app/<lb-name>/<lb-id>/targetgroup/<tg-name>/<tg-id>; the value
                # below is a placeholder.
                'ResourceLabel': f'app/{cluster_name}/{service_name}'
            }
        }
    )
Monitoring at Scale
Distributed Tracing
# monitoring/distributed_tracing.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def setup_distributed_tracing(service_name: str):
    """Set up distributed tracing for a scaled system."""
    # Create tracer provider with service metadata
    tracer_provider = TracerProvider(
        resource=Resource.create({
            "service.name": service_name,
            "deployment.environment": "production"
        })
    )

    # Add Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name="jaeger-agent",
        agent_port=6831,
    )
    tracer_provider.add_span_processor(
        BatchSpanProcessor(jaeger_exporter)
    )

    trace.set_tracer_provider(tracer_provider)


# Use in request handling
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("process_request") as span:
    span.set_attribute("user_id", user_id)
    span.set_attribute("request_id", request_id)
    result = agent.invoke(task)
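If the API layer is FastAPI (as the earlier endpoint examples suggest), incoming requests can also be traced automatically with the OpenTelemetry FastAPI instrumentation, so every replica emits spans with shared trace context. A hedged sketch; it assumes the opentelemetry-instrumentation-fastapi package is installed, `app` is your FastAPI instance, and the service name is illustrative.

# Optional: auto-instrument the FastAPI app so each request gets a span
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

setup_distributed_tracing("arc-api")     # from the snippet above; name is illustrative
FastAPIInstrumentor.instrument_app(app)  # `app` is the FastAPI instance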
Cost Optimization
Cost-Aware Scaling
# scaling/cost_optimization.py
import logging

logger = logging.getLogger(__name__)


class CostOptimizer:
    """Optimize costs while scaling."""

    def __init__(self):
        self.hourly_budget = 100.0  # $100/hour
        self.current_spend = 0.0

    def should_scale_up(
        self,
        current_instances: int,
        cpu_usage: float,
        cost_per_instance: float
    ) -> bool:
        """Decide whether to scale up, subject to the budget."""
        # Cost of one additional instance
        additional_cost = cost_per_instance

        # Check budget
        projected_spend = self.current_spend + additional_cost
        if projected_spend > self.hourly_budget:
            logger.warning("Budget exceeded, not scaling up")
            return False

        # Scale if CPU is high
        return cpu_usage > 70

    def optimize_instance_types(self, workload_type: str) -> str:
        """Choose optimal instance type for workload."""
        if workload_type == "cpu_intensive":
            return "c5.xlarge"  # CPU optimized
        elif workload_type == "memory_intensive":
            return "r5.xlarge"  # Memory optimized
        else:
            return "t3.xlarge"  # General purpose


# Usage
optimizer = CostOptimizer()

if optimizer.should_scale_up(current_instances=10, cpu_usage=75, cost_per_instance=0.20):
    scale_up()
Best Practices
1. Design for Horizontal Scaling
# Good: Stateless, scales horizontally
def process_request(session_id, request):
    # Load state from external store
    state = redis.get(f"session:{session_id}")

    # Process
    result = process(state, request)

    # Save state
    redis.set(f"session:{session_id}", result)

    return result
2. Use Connection Pooling
# Reuse connections
import httpx

http_client = httpx.Client(
    limits=httpx.Limits(max_connections=100)
)
3. Implement Circuit Breakers
from circuitbreaker import circuit


@circuit(failure_threshold=5, recovery_timeout=60)
def call_external_service():
    # Fails fast while the service is down
    pass
4. Cache Aggressively
# Cache expensive operations
@cached(ttl=3600)
def expensive_operation():
    pass
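`@cached` here stands in for whatever caching decorator you use; Az Core is not assumed to ship one. A minimal in-process TTL implementation follows as a sketch (for multi-server deployments, back it with the distributed Redis cache shown earlier instead).

# Hypothetical minimal TTL cache decorator (single-process only)
import functools
import time


def cached(ttl: int = 3600):
    def decorator(func):
        store = {}  # key -> (expires_at, value); assumes hashable arguments

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            hit = store.get(key)
            if hit and hit[0] > time.monotonic():
                return hit[1]
            value = func(*args, **kwargs)
            store[key] = (time.monotonic() + ttl, value)
            return value

        return wrapper

    return decorator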
5. Monitor Everything
# Track all metrics
metrics.record("requests", 1)
metrics.record("latency", duration)
metrics.record("errors", error_count)