Complete guide for deploying Az Core framework applications to production environments.
Overview
This guide covers everything you need to deploy Arc-based agent systems to production, including infrastructure setup, deployment strategies, health checks, rollback procedures, and production best practices.
Pre-Deployment Checklist
Code Quality
- All tests passing (unit, integration, end-to-end)
- Code review completed
- Security audit passed
- Performance benchmarks meet requirements
- Documentation updated
- Changelog updated
Configuration
- Environment variables configured (a pre-flight check script is sketched at the end of this checklist)
- API keys securely stored
- Database connections tested
- LLM provider credentials validated
- Rate limits configured
- Timeout values set appropriately
Monitoring
- Logging configured
- Metrics collection enabled
- Alerting rules defined
- Dashboard created
- On-call rotation established
Dependencies
- All dependencies pinned to specific versions
- Security vulnerabilities scanned
- License compliance verified
- External service availability confirmed
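Many of the configuration items above can be verified automatically before a release. A minimal pre-flight sketch; the variable names are illustrative and should be replaced with the ones your deployment actually requires:

# preflight.py - fail fast when required settings are missing (variable list is illustrative)
import os
import sys

REQUIRED_VARS = ["OPENAI_API_KEY", "APP_ENV", "DATABASE_URL"]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    print(f"Missing required environment variables: {', '.join(missing)}")
    sys.exit(1)

print("Pre-flight configuration check passed")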
Infrastructure Setup
Minimum Requirements
CPU & Memory
# Development
cpu: 2 cores
memory: 4GB
storage: 20GB
# Production (Small)
cpu: 4 cores
memory: 8GB
storage: 50GB
# Production (Medium)
cpu: 8 cores
memory: 16GB
storage: 100GB
# Production (Large)
cpu: 16+ cores
memory: 32GB+
storage: 200GB+
Network
- Stable internet connection for LLM API calls
- Low latency to LLM provider endpoints (see the probe sketch after this list)
- Bandwidth: 100Mbps minimum
- Load balancer for high availability
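To confirm the latency requirement before go-live, a rough probe can be run from the production network. A minimal sketch; the endpoint URLs are examples, not an exhaustive list:

# latency_probe.py - rough round-trip timing to provider endpoints (example URLs)
import time
import requests

ENDPOINTS = [
    "https://api.openai.com/v1/models",
    "https://api.anthropic.com",
]

for url in ENDPOINTS:
    start = time.time()
    try:
        requests.get(url, timeout=10)
        print(f"{url}: {(time.time() - start) * 1000:.0f} ms")
    except requests.RequestException as exc:
        print(f"{url}: unreachable ({exc})")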
Cloud Providers
AWS Setup
# Install AWS CLI
pip install awscli
# Configure credentials
aws configure
# Create EC2 instance
aws ec2 run-instances \
--image-id ami-0c55b159cbfafe1f0 \
--instance-type t3.xlarge \
--key-name arc-production \
--security-group-ids sg-xxx \
--subnet-id subnet-xxx \
--tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=arc-production}]'
# Create RDS database (optional)
aws rds create-db-instance \
--db-instance-identifier arc-db \
--db-instance-class db.t3.medium \
--engine postgres \
--master-username admin \
--master-user-password <password> \
--allocated-storage 100
GCP Setup
# Install gcloud CLI
curl https://sdk.cloud.google.com | bash
# Initialize
gcloud init
# Create Compute Engine instance
gcloud compute instances create arc-production \
--machine-type=n1-standard-4 \
--zone=us-central1-a \
--image-family=ubuntu-2004-lts \
--image-project=ubuntu-os-cloud \
--boot-disk-size=100GB
# Create Cloud SQL instance (optional)
gcloud sql instances create arc-db \
--database-version=POSTGRES_13 \
--tier=db-n1-standard-2 \
--region=us-central1
Azure Setup
# Install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
# Login
az login
# Create resource group
az group create --name arc-production --location eastus
# Create VM
az vm create \
--resource-group arc-production \
--name arc-vm \
--image UbuntuLTS \
--size Standard_D4s_v3 \
--admin-username azureuser \
--generate-ssh-keys
# Create Azure Database for PostgreSQL (optional)
az postgres server create \
--resource-group arc-production \
--name arc-db \
--location eastus \
--admin-user admin \
--admin-password <password> \
--sku-name GP_Gen5_2
Deployment Strategies
1. Blue-Green Deployment
Deploy the new version alongside the old one, then switch traffic between them.
# deployment_manager.py
from typing import Dict, Any
import time
import requests
class BlueGreenDeployment:
def __init__(
self,
blue_url: str,
green_url: str,
load_balancer_url: str
):
self.blue_url = blue_url
self.green_url = green_url
self.load_balancer_url = load_balancer_url
self.current_env = "blue"
def deploy(self, new_version: str) -> bool:
"""Deploy new version to inactive environment."""
target_env = "green" if self.current_env == "blue" else "blue"
target_url = self.green_url if target_env == "green" else self.blue_url
print(f"Deploying version {new_version} to {target_env}...")
# Deploy to inactive environment
success = self._deploy_to_env(target_url, new_version)
if not success:
print("Deployment failed")
return False
# Health check
if not self._health_check(target_url):
print("Health check failed")
return False
# Smoke tests
if not self._run_smoke_tests(target_url):
print("Smoke tests failed")
return False
# Switch traffic
self._switch_traffic(target_env)
self.current_env = target_env
print(f"Successfully deployed to {target_env}")
return True
def rollback(self) -> bool:
"""Rollback to previous environment."""
previous_env = "green" if self.current_env == "blue" else "blue"
self._switch_traffic(previous_env)
self.current_env = previous_env
return True
def _deploy_to_env(self, url: str, version: str) -> bool:
"""Deploy application to environment."""
# Implementation depends on deployment method
# (Docker, Kubernetes, direct deployment, etc.)
return True
def _health_check(self, url: str, timeout: int = 300) -> bool:
"""Check if service is healthy."""
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(f"{url}/health", timeout=5)
if response.status_code == 200:
return True
            except requests.RequestException:
                pass
time.sleep(5)
return False
def _run_smoke_tests(self, url: str) -> bool:
"""Run basic smoke tests."""
tests = [
("GET", f"{url}/health"),
("POST", f"{url}/api/v1/agents", {"task": "test"}),
]
        for method, endpoint, *data in tests:
            try:
                if method == "GET":
                    response = requests.get(endpoint, timeout=10)
                else:
                    response = requests.post(endpoint, json=data[0] if data else None, timeout=10)
                if response.status_code >= 500:
                    return False
            except requests.RequestException:
                return False
return True
def _switch_traffic(self, target_env: str) -> None:
"""Switch load balancer traffic to target environment."""
# Update load balancer configuration
# Implementation depends on load balancer type
pass
Usage:
deployer = BlueGreenDeployment(
blue_url="http://arc-blue.example.com",
green_url="http://arc-green.example.com",
load_balancer_url="http://arc.example.com"
)
# Deploy new version
success = deployer.deploy("v2.0.0")
# Rollback if needed
if not success:
deployer.rollback()
2. Canary Deployment
Gradually roll out the new version to a subset of users.
# canary_deployment.py
import time
from typing import List, Optional
class CanaryDeployment:
    def __init__(self, stages: Optional[List[int]] = None):
        """
        Initialize canary deployment.

        Args:
            stages: Traffic percentages for each stage (defaults to [10, 25, 50, 100])
        """
        self.stages = stages if stages is not None else [10, 25, 50, 100]
self.current_stage = 0
def deploy(self, new_version: str) -> bool:
"""Deploy using canary strategy."""
print(f"Starting canary deployment for {new_version}")
for stage_pct in self.stages:
print(f"\nStage: Routing {stage_pct}% traffic to new version")
# Update traffic split
self._update_traffic_split(stage_pct)
# Monitor for issues
wait_time = 300 # 5 minutes per stage
print(f"Monitoring for {wait_time}s...")
if not self._monitor_health(wait_time):
print("Issues detected! Rolling back...")
self._update_traffic_split(0)
return False
self.current_stage += 1
print("\nCanary deployment successful!")
return True
def _update_traffic_split(self, percentage: int) -> None:
"""Update traffic split percentage."""
# Update load balancer or service mesh configuration
# Example for Kubernetes with Istio:
# kubectl apply -f virtual-service.yaml
pass
def _monitor_health(self, duration: int) -> bool:
"""Monitor health metrics during canary stage."""
start_time = time.time()
while time.time() - start_time < duration:
metrics = self._get_metrics()
# Check error rate
if metrics.get("error_rate", 0) > 0.05: # 5% threshold
return False
# Check latency
if metrics.get("p99_latency", 0) > 5000: # 5s threshold
return False
# Check availability
if metrics.get("availability", 1.0) < 0.99: # 99% threshold
return False
time.sleep(10)
return True
def _get_metrics(self) -> dict:
"""Get current metrics from monitoring system."""
# Fetch from Prometheus, CloudWatch, etc.
return {
"error_rate": 0.01,
"p99_latency": 1500,
"availability": 0.999
}
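Usage (mirroring the blue-green example above):

deployer = CanaryDeployment(stages=[10, 25, 50, 100])

if not deployer.deploy("v2.0.0"):
    print("Canary aborted; traffic stays on the current version")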
3. Rolling Deployment
Update instances one by one.
#!/bin/bash
# rolling_deploy.sh
INSTANCES=("instance1" "instance2" "instance3" "instance4")
VERSION="$1"
for instance in "${INSTANCES[@]}"; do
echo "Updating $instance to version $VERSION..."
# Remove from load balancer
aws elb deregister-instances-from-load-balancer \
--load-balancer-name arc-lb \
--instances "$instance"
# Wait for connections to drain
sleep 30
# Deploy new version
ssh "$instance" "cd /app && git pull && git checkout $VERSION && sudo systemctl restart arc"
# Health check
for i in {1..30}; do
if curl -f "http://$instance:8000/health"; then
break
fi
sleep 10
done
# Add back to load balancer
aws elb register-instances-with-load-balancer \
--load-balancer-name arc-lb \
--instances "$instance"
echo "$instance updated successfully"
sleep 60 # Wait before next instance
done
echo "Rolling deployment complete"
Environment Configuration
Environment Variables
# .env.production
# Application
APP_NAME=arc-production
APP_ENV=production
APP_DEBUG=false
LOG_LEVEL=INFO
# Server
HOST=0.0.0.0
PORT=8000
WORKERS=4
# LLM Configuration
OPENAI_API_KEY=sk-xxx
ANTHROPIC_API_KEY=sk-ant-xxx
LLM_MODEL=gpt-4o-mini
LLM_TEMPERATURE=0.5
LLM_MAX_TOKENS=4096
LLM_TIMEOUT=60
# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=60
# Caching
CACHE_ENABLED=true
CACHE_TTL=3600
CACHE_MAX_SIZE=1000
# Database (if using)
DATABASE_URL=postgresql://user:pass@host:5432/arc
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=10
# Monitoring
SENTRY_DSN=https://xxx@sentry.io/xxx
DATADOG_API_KEY=xxx
PROMETHEUS_PORT=9090
# Security
CORS_ORIGINS=https://app.example.com
API_KEY_REQUIRED=true
JWT_SECRET=xxx
Configuration Management
# config.py
from pydantic import BaseSettings, Field  # Pydantic v2: import BaseSettings from the pydantic-settings package instead
from typing import Optional
class ProductionConfig(BaseSettings):
"""Production configuration with validation."""
# Application
app_name: str = Field(..., env="APP_NAME")
app_env: str = Field("production", env="APP_ENV")
debug: bool = Field(False, env="APP_DEBUG")
log_level: str = Field("INFO", env="LOG_LEVEL")
# Server
host: str = Field("0.0.0.0", env="HOST")
port: int = Field(8000, env="PORT")
workers: int = Field(4, env="WORKERS")
# LLM
openai_api_key: str = Field(..., env="OPENAI_API_KEY")
llm_model: str = Field("gpt-4o-mini", env="LLM_MODEL")
llm_temperature: float = Field(0.5, env="LLM_TEMPERATURE")
llm_max_tokens: int = Field(4096, env="LLM_MAX_TOKENS")
llm_timeout: int = Field(60, env="LLM_TIMEOUT")
# Rate Limiting
rate_limit_enabled: bool = Field(True, env="RATE_LIMIT_ENABLED")
rate_limit_requests: int = Field(100, env="RATE_LIMIT_REQUESTS")
rate_limit_window: int = Field(60, env="RATE_LIMIT_WINDOW")
# Caching
cache_enabled: bool = Field(True, env="CACHE_ENABLED")
cache_ttl: int = Field(3600, env="CACHE_TTL")
cache_max_size: int = Field(1000, env="CACHE_MAX_SIZE")
# Database
database_url: Optional[str] = Field(None, env="DATABASE_URL")
database_pool_size: int = Field(20, env="DATABASE_POOL_SIZE")
# Monitoring
sentry_dsn: Optional[str] = Field(None, env="SENTRY_DSN")
datadog_api_key: Optional[str] = Field(None, env="DATADOG_API_KEY")
class Config:
env_file = ".env.production"
case_sensitive = False
# Load configuration
config = ProductionConfig()
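The validated settings can then feed directly into the LLM client so all model parameters live in one place. A brief sketch, assuming config.py sits on the import path; the module name app/llm.py is illustrative:

# app/llm.py (illustrative)
from langchain_openai import ChatOpenAI

from config import config

llm = ChatOpenAI(
    model=config.llm_model,
    temperature=config.llm_temperature,
    max_tokens=config.llm_max_tokens,
    timeout=config.llm_timeout,
    api_key=config.openai_api_key,
)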
Container Deployment
Dockerfile
# Dockerfile
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Install Arc
RUN pip install -e .
# Create non-root user
RUN useradd -m -u 1000 arc && chown -R arc:arc /app
USER arc
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Docker Compose
# docker-compose.prod.yml
version: '3.8'
services:
arc:
build:
context: .
dockerfile: Dockerfile
container_name: arc-production
restart: unless-stopped
ports:
- "8000:8000"
environment:
- APP_ENV=production
- LOG_LEVEL=INFO
env_file:
- .env.production
volumes:
- ./logs:/app/logs
- ./cache:/app/cache
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
networks:
- arc-network
deploy:
resources:
limits:
cpus: '4'
memory: 8G
reservations:
cpus: '2'
memory: 4G
nginx:
image: nginx:alpine
container_name: arc-nginx
restart: unless-stopped
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- arc
networks:
- arc-network
redis:
image: redis:7-alpine
container_name: arc-redis
restart: unless-stopped
command: redis-server --appendonly yes
volumes:
- redis-data:/data
networks:
- arc-network
prometheus:
image: prom/prometheus:latest
container_name: arc-prometheus
restart: unless-stopped
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
networks:
- arc-network
volumes:
redis-data:
prometheus-data:
networks:
arc-network:
driver: bridge
Build and Deploy
# Build image
docker build -t arc:v1.0.0 .
# Tag for registry
docker tag arc:v1.0.0 myregistry.com/arc:v1.0.0
docker tag arc:v1.0.0 myregistry.com/arc:latest
# Push to registry
docker push myregistry.com/arc:v1.0.0
docker push myregistry.com/arc:latest
# Deploy with docker-compose
docker-compose -f docker-compose.prod.yml up -d
# View logs
docker-compose -f docker-compose.prod.yml logs -f arc
# Scale service (remove container_name and the fixed host port mapping first;
# multiple replicas cannot share the same container name or host port)
docker-compose -f docker-compose.prod.yml up -d --scale arc=4
Kubernetes Deployment
Deployment Manifest
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: arc-deployment
namespace: production
labels:
app: arc
version: v1.0.0
spec:
replicas: 4
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: arc
template:
metadata:
labels:
app: arc
version: v1.0.0
spec:
containers:
- name: arc
image: myregistry.com/arc:v1.0.0
imagePullPolicy: Always
ports:
- containerPort: 8000
name: http
- containerPort: 9090
name: metrics
env:
- name: APP_ENV
value: "production"
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: arc-secrets
- configMapRef:
name: arc-config
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
failureThreshold: 3
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
volumeMounts:
- name: cache
mountPath: /app/cache
- name: logs
mountPath: /app/logs
volumes:
- name: cache
emptyDir: {}
- name: logs
emptyDir: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: arc
topologyKey: kubernetes.io/hostname
Service Manifest
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
name: arc-service
namespace: production
labels:
app: arc
spec:
type: ClusterIP
selector:
app: arc
ports:
- port: 80
targetPort: 8000
protocol: TCP
name: http
- port: 9090
targetPort: 9090
protocol: TCP
name: metrics
Ingress Manifest
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: arc-ingress
namespace: production
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/limit-rpm: "100"
spec:
tls:
- hosts:
- arc.example.com
secretName: arc-tls
rules:
- host: arc.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: arc-service
port:
number: 80
ConfigMap and Secrets
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: arc-config
namespace: production
data:
LLM_MODEL: "gpt-4o-mini"
LLM_TEMPERATURE: "0.5"
LLM_MAX_TOKENS: "4096"
CACHE_ENABLED: "true"
CACHE_TTL: "3600"
---
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
name: arc-secrets
namespace: production
type: Opaque
stringData:
OPENAI_API_KEY: "sk-xxx"
ANTHROPIC_API_KEY: "sk-ant-xxx"
DATABASE_URL: "postgresql://user:pass@host:5432/arc"
SENTRY_DSN: "https://xxx@sentry.io/xxx"
Horizontal Pod Autoscaler
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: arc-hpa
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: arc-deployment
minReplicas: 4
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60
Deploy to Kubernetes
# Create namespace
kubectl create namespace production
# Apply configurations
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/secrets.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/ingress.yaml
kubectl apply -f k8s/hpa.yaml
# Check status
kubectl get pods -n production
kubectl get svc -n production
kubectl get ingress -n production
# View logs
kubectl logs -f deployment/arc-deployment -n production
# Scale manually
kubectl scale deployment/arc-deployment --replicas=8 -n production
# Rolling update
kubectl set image deployment/arc-deployment arc=myregistry.com/arc:v1.1.0 -n production
# Rollback
kubectl rollout undo deployment/arc-deployment -n production
Serverless Deployment
AWS Lambda
# lambda_handler.py
import json
from azcore.agents import ReactAgent
from langchain_openai import ChatOpenAI
# Initialize the agent at module load so warm invocations reuse it (runs once per cold start)
llm = ChatOpenAI(model="gpt-4o-mini")
agent = ReactAgent(
name="lambda-agent",
llm=llm,
prompt="You are a helpful assistant."
)
def lambda_handler(event, context):
"""AWS Lambda handler."""
try:
# Parse input
body = json.loads(event.get("body", "{}"))
task = body.get("task", "")
# Execute agent
result = agent.invoke({
"messages": [{"role": "user", "content": task}]
})
# Return response
response = result["messages"][-1]["content"]
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*"
},
"body": json.dumps({
"response": response,
"status": "success"
})
}
except Exception as e:
return {
"statusCode": 500,
"body": json.dumps({
"error": str(e),
"status": "error"
})
}
# serverless.yml
service: arc-lambda
provider:
name: aws
runtime: python3.11
stage: production
region: us-east-1
memorySize: 1024
timeout: 60
environment:
OPENAI_API_KEY: ${env:OPENAI_API_KEY}
LOG_LEVEL: INFO
functions:
agent:
handler: lambda_handler.lambda_handler
events:
- http:
path: /agent
method: post
cors: true
layers:
- arn:aws:lambda:us-east-1:xxx:layer:arc-dependencies:1
plugins:
- serverless-python-requirements
custom:
pythonRequirements:
dockerizePip: true
slim: true
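Before deploying, the handler can be exercised locally with a fake API Gateway event; the task text below is just an example:

# local_test.py - call the handler without AWS (the Lambda context is unused here)
import json

from lambda_handler import lambda_handler

event = {"body": json.dumps({"task": "Say hello"})}
print(lambda_handler(event, None))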
Health Checks
Basic Health Check
# health.py
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Dict, Any
import time
import psutil
app = FastAPI()
START_TIME = time.time()
@app.get("/health")
async def health_check() -> Dict[str, Any]:
"""Basic health check endpoint."""
return {
"status": "healthy",
"timestamp": time.time(),
"uptime": time.time() - START_TIME
}
@app.get("/ready")
async def readiness_check() -> Dict[str, Any]:
"""Readiness check with dependencies."""
checks = {
"api": _check_api(),
"llm": _check_llm_connection(),
"cache": _check_cache(),
"memory": _check_memory()
}
all_healthy = all(checks.values())
status_code = 200 if all_healthy else 503
return Response(
content={
"status": "ready" if all_healthy else "not_ready",
"checks": checks
},
status_code=status_code
)
def _check_api() -> bool:
"""Check if API is responding."""
return True
def _check_llm_connection() -> bool:
    """Check LLM provider connectivity (note: this issues a real, billable call on each probe)."""
    try:
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI()
        llm.invoke("test")
        return True
    except Exception:
        return False
def _check_cache() -> bool:
"""Check cache availability."""
try:
from azcore.utils.caching import get_llm_cache
cache = get_llm_cache()
cache.get("test")
return True
    except Exception:
return False
def _check_memory() -> bool:
"""Check memory usage."""
memory = psutil.virtual_memory()
return memory.percent < 90 # Under 90% usage
Rollback Procedures
Automated Rollback
#!/bin/bash
# rollback.sh
VERSION_TO_ROLLBACK="$1"
echo "Rolling back to version: $VERSION_TO_ROLLBACK"
# Kubernetes rollback
if command -v kubectl &> /dev/null; then
kubectl rollout undo deployment/arc-deployment -n production
kubectl rollout status deployment/arc-deployment -n production
fi
# Docker rollback (assumes the compose file reads its image tag from an ARC_VERSION variable)
if command -v docker &> /dev/null; then
    docker-compose -f docker-compose.prod.yml down
    ARC_VERSION="$VERSION_TO_ROLLBACK" docker-compose -f docker-compose.prod.yml up -d
fi
# Verify rollback
sleep 30
if curl -f http://localhost:8000/health; then
echo "Rollback successful"
else
echo "Rollback failed - manual intervention required"
exit 1
fi
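The same health check can drive a small watchdog that invokes the rollback script automatically when failures persist. A sketch; the threshold, script path, and version argument are illustrative:

# rollback_watchdog.py - run rollback.sh after repeated failed health checks (illustrative)
import subprocess
import time

import requests

FAILURES_BEFORE_ROLLBACK = 3
failures = 0

while True:
    try:
        healthy = requests.get("http://localhost:8000/health", timeout=5).status_code == 200
    except requests.RequestException:
        healthy = False

    failures = 0 if healthy else failures + 1
    if failures >= FAILURES_BEFORE_ROLLBACK:
        # "previous" is a placeholder for the version argument rollback.sh expects
        subprocess.run(["./rollback.sh", "previous"], check=False)
        break
    time.sleep(30)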
Production Best Practices
1. Use a Production-Grade ASGI Server
# Use Gunicorn with Uvicorn workers (or standalone Uvicorn) instead of the development server
# gunicorn_config.py
import multiprocessing
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"
keepalive = 120
timeout = 120
max_requests = 1000
max_requests_jitter = 100
preload_app = True
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"
# Start with Gunicorn
gunicorn -c gunicorn_config.py app.main:app
2. Enable HTTPS
# nginx.conf
server {
listen 80;
server_name arc.example.com;
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name arc.example.com;
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
location / {
proxy_pass http://arc:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
3. Implement Rate Limiting
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/api/agent")
@limiter.limit("100/minute")
async def agent_endpoint(request: Request, task: str):
    # slowapi requires the Request parameter to be present to apply the limit
    ...
4. Use Connection Pooling
from langchain_openai import ChatOpenAI
# Reuse LLM instances
llm = ChatOpenAI(
model="gpt-4o-mini",
max_retries=3,
request_timeout=60
)
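The same idea applies to the database: create one engine per process and let its pool reuse connections, sized from the settings in the configuration section. A minimal sketch using SQLAlchemy; the environment variable names follow .env.production above and the module name is illustrative:

# db.py (illustrative)
import os

from sqlalchemy import create_engine

engine = create_engine(
    os.environ["DATABASE_URL"],
    pool_size=int(os.getenv("DATABASE_POOL_SIZE", "20")),
    max_overflow=int(os.getenv("DATABASE_MAX_OVERFLOW", "10")),
    pool_pre_ping=True,  # discard dead connections before handing them out
)

# Reuse the engine everywhere; each connect() borrows a connection from the pool
with engine.connect() as conn:
    conn.exec_driver_sql("SELECT 1")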
5. Implement Graceful Shutdown
import signal
import sys
def signal_handler(sig, frame):
"""Handle shutdown gracefully."""
print("Shutting down gracefully...")
# Close connections
# Finish pending requests
# Save state
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
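When the app runs under Uvicorn or Gunicorn, the server already traps SIGTERM, so application cleanup is often easier to attach to FastAPI's lifespan hook instead of a raw signal handler. A sketch; the cleanup steps are placeholders:

# lifespan.py (illustrative)
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: open pools, warm caches, load models
    yield
    # shutdown: runs after the server stops accepting new requests,
    # e.g. flush metrics, close LLM/database clients, persist state

app = FastAPI(lifespan=lifespan)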
6. Monitor Resource Usage
import psutil
def log_resource_usage():
"""Log current resource usage."""
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
print(f"CPU: {cpu_percent}%")
print(f"Memory: {memory.percent}%")
print(f"Disk: {disk.percent}%")
7. Use Environment-Specific Configurations
# config/__init__.py
from .development import DevelopmentConfig
from .production import ProductionConfig
from .testing import TestingConfig
import os
config_map = {
"development": DevelopmentConfig,
"production": ProductionConfig,
"testing": TestingConfig
}
env = os.getenv("APP_ENV", "development")
config = config_map[env]()