Comprehensive guide to deploying, scaling, and operating MCP-enabled agents in production environments with best practices for reliability, security, and performance.
Overview
Deploying MCP-enabled agents to production requires careful attention to reliability, security, performance, and day-to-day operations. This guide collects proven patterns for each of these areas.
Production vs. Development
# ❌ Development configuration - Not production ready
dev_team = (
    MCPTeamBuilder("dev_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server("python", ["server.py"])
    .build()
)

# ✅ Production configuration - Hardened and monitored
prod_team = (
    MCPTeamBuilder("prod_team")
    .with_llm(ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0.7,
        timeout=30,
        max_retries=3,
        request_timeout=60
    ))
    .with_mcp_server(
        "python",
        ["/opt/mcp/servers/server.py"],
        env={
            "LOG_LEVEL": "INFO",
            "MAX_RETRIES": "3",
            "TIMEOUT": "30",
            "ENVIRONMENT": "production"
        },
        timeout=30
    )
    .skip_failed_servers(True)  # Graceful degradation
    .build()
)
Key Production Requirements
- Reliability: 99.9%+ uptime, fault tolerance
- Security: Authentication, encryption, audit logs
- Performance: Low latency, high throughput
- Monitoring: Metrics, logs, alerts
- Scalability: Handle increasing load
- Maintainability: Easy updates and debugging
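Several of these requirements map directly onto code. Reliability, for instance, usually begins with retrying transient failures; the sketch below shows the pattern with exponential backoff and jitter (the `flaky_call` stand-in is hypothetical, used only to demonstrate the wrapper):

```python
import time
import random

def with_retries(fn, max_retries=3, base_delay=0.1):
    """Call fn, retrying transient failures with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            # Exponential backoff plus jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.05)
            time.sleep(delay)

# Hypothetical flaky call: fails twice, then succeeds
attempts = {"n": 0}
def flaky_call():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(with_retries(flaky_call))  # → ok
```

The same wrapper applies equally to LLM calls and MCP tool invocations; production code would typically restrict the caught exception types to known-transient errors.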
Pre-Production Checklist
Infrastructure Checklist
# production_checklist.yaml
infrastructure:
  compute:
    - [ ] Production servers provisioned
    - [ ] Resource limits configured (CPU, memory)
    - [ ] Auto-scaling configured
    - [ ] Load balancer set up
  networking:
    - [ ] VPC/network configured
    - [ ] Firewall rules configured
    - [ ] SSL/TLS certificates installed
    - [ ] DNS configured
  storage:
    - [ ] Database provisioned and backed up
    - [ ] File storage configured
    - [ ] Backup strategy implemented
    - [ ] Data retention policies defined

security:
  - [ ] Secrets management configured
  - [ ] API keys rotated
  - [ ] SSL/TLS enabled
  - [ ] Rate limiting configured
  - [ ] WAF/DDoS protection enabled
  - [ ] Security audit completed
  - [ ] Compliance requirements met

monitoring:
  - [ ] Logging infrastructure set up
  - [ ] Metrics collection configured
  - [ ] Alerting rules defined
  - [ ] Dashboard created
  - [ ] On-call rotation established
  - [ ] Runbook created

deployment:
  - [ ] CI/CD pipeline configured
  - [ ] Blue-green/canary deployment ready
  - [ ] Rollback procedure tested
  - [ ] Health checks implemented
  - [ ] Load testing completed
  - [ ] Disaster recovery tested

documentation:
  - [ ] Architecture documented
  - [ ] API documentation complete
  - [ ] Operational runbooks created
  - [ ] Incident response plan defined
  - [ ] SLA defined and communicated
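One way to make this checklist enforceable is to parse it in CI and block deployment while items remain open. A minimal sketch, assuming the `- [ ]` / `- [x]` item notation used above:

```python
def checklist_progress(text: str) -> tuple[int, int]:
    """Count completed vs. total '- [ ]' / '- [x]' items in a checklist."""
    done = total = 0
    for line in text.splitlines():
        item = line.strip()
        if item.startswith("- ["):
            total += 1
            if item.startswith("- [x]"):
                done += 1
    return done, total

# Hypothetical excerpt of the checklist file
sample = """
infrastructure:
  compute:
    - [x] Production servers provisioned
    - [ ] Auto-scaling configured
"""
done, total = checklist_progress(sample)
print(f"{done}/{total} items complete")  # → 1/2 items complete
```

A CI gate would then fail the job whenever `done < total`.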
Code Quality Checklist
"""
Pre-production code quality checklist.
"""
def production_readiness_check():
"""Verify production readiness."""
checks = {
"error_handling": False,
"logging": False,
"monitoring": False,
"configuration": False,
"security": False,
"testing": False,
"documentation": False
}
# 1. Error handling
try:
# Check comprehensive error handling
checks["error_handling"] = verify_error_handling()
except:
pass
# 2. Logging
try:
# Check structured logging
checks["logging"] = verify_logging_setup()
except:
pass
# 3. Monitoring
try:
# Check metrics and health endpoints
checks["monitoring"] = verify_monitoring()
except:
pass
# 4. Configuration
try:
# Check environment-based config
checks["configuration"] = verify_configuration()
except:
pass
# 5. Security
try:
# Check secrets management
checks["security"] = verify_security()
except:
pass
# 6. Testing
try:
# Check test coverage
checks["testing"] = verify_test_coverage()
except:
pass
# 7. Documentation
try:
# Check documentation completeness
checks["documentation"] = verify_documentation()
except:
pass
# Report results
passed = sum(checks.values())
total = len(checks)
print(f"Production Readiness: {passed}/{total} checks passed")
for check, status in checks.items():
status_str = "✅" if status else "❌"
print(f"{status_str} {check}")
return all(checks.values())
Infrastructure Setup
Docker Deployment
# Dockerfile
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# Install Node.js (for npx-based MCP servers)
RUN curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create necessary directories
RUN mkdir -p /app/logs /app/data /app/rl_data
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV LOG_LEVEL=INFO
ENV ENVIRONMENT=production
# Expose port (if using HTTP/SSE)
EXPOSE 8000
# Health check (raise_for_status ensures a non-200 response fails the check)
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"
# Run application
CMD ["python", "main.py"]
# docker-compose.yml
version: '3.8'

services:
  mcp-agent:
    build: .
    container_name: mcp-agent-prod
    restart: unless-stopped
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=INFO
      - ENVIRONMENT=production
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GITHUB_TOKEN=${GITHUB_TOKEN}
      - DATABASE_URL=${DATABASE_URL}
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
      - ./rl_data:/app/rl_data
      - ./config:/app/config:ro
    networks:
      - mcp-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '1'
          memory: 2G

  # Redis for caching (optional)
  redis:
    image: redis:7-alpine
    container_name: mcp-redis
    restart: unless-stopped
    networks:
      - mcp-network
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes

  # PostgreSQL for persistence (optional)
  postgres:
    image: postgres:15-alpine
    container_name: mcp-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_DB=mcp_db
      - POSTGRES_USER=mcp_user
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
    networks:
      - mcp-network
    volumes:
      - postgres-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mcp_user"]
      interval: 10s
      timeout: 5s
      retries: 5

networks:
  mcp-network:
    driver: bridge

volumes:
  redis-data:
  postgres-data:
Kubernetes Deployment
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-agent
  namespace: production
  labels:
    app: mcp-agent
    version: v1.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: mcp-agent
  template:
    metadata:
      labels:
        app: mcp-agent
        version: v1.0.0
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: mcp-agent
          image: myregistry/mcp-agent:v1.0.0
          imagePullPolicy: Always
          ports:
            - containerPort: 8000
              name: http
              protocol: TCP
          env:
            - name: LOG_LEVEL
              value: "INFO"
            - name: ENVIRONMENT
              value: "production"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: openai-api-key
            - name: GITHUB_TOKEN
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: github-token
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
          volumeMounts:
            - name: config
              mountPath: /app/config
              readOnly: true
            - name: data
              mountPath: /app/data
            - name: logs
              mountPath: /app/logs
      volumes:
        - name: config
          configMap:
            name: mcp-config
        - name: data
          persistentVolumeClaim:
            claimName: mcp-data-pvc
        - name: logs
          emptyDir: {}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
---
# kubernetes/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: mcp-agent-service
  namespace: production
  labels:
    app: mcp-agent
spec:
  type: ClusterIP
  ports:
    - port: 80
      targetPort: 8000
      protocol: TCP
      name: http
  selector:
    app: mcp-agent
---
# kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-agent-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-agent
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
Configuration Management
Environment-Based Configuration
"""
production_config.py - Production configuration management.
"""
import os
from enum import Enum
from typing import Dict, Any, Optional
from pydantic import BaseSettings, Field, validator
import logging
logger = logging.getLogger(__name__)
class Environment(str, Enum):
"""Deployment environment."""
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
class MCPServerConfig(BaseSettings):
"""Configuration for MCP server."""
name: str
command: Optional[str] = None
args: list = []
url: Optional[str] = None
transport: str = "stdio"
timeout: int = 30
env_vars: Dict[str, str] = {}
optional: bool = False
class Config:
env_prefix = "MCP_SERVER_"
class ProductionConfig(BaseSettings):
"""Production configuration."""
# Environment
environment: Environment = Field(default=Environment.PRODUCTION)
debug: bool = Field(default=False)
# LLM Configuration
llm_model: str = Field(default="gpt-4o-mini")
llm_temperature: float = Field(default=0.7)
llm_max_tokens: int = Field(default=2000)
llm_timeout: int = Field(default=60)
llm_max_retries: int = Field(default=3)
# API Keys (from environment)
openai_api_key: str = Field(..., env="OPENAI_API_KEY")
github_token: Optional[str] = Field(None, env="GITHUB_TOKEN")
# Database
database_url: Optional[str] = Field(None, env="DATABASE_URL")
database_pool_size: int = Field(default=10)
database_max_overflow: int = Field(default=20)
# Redis
redis_url: Optional[str] = Field(None, env="REDIS_URL")
redis_ttl: int = Field(default=3600)
# Logging
log_level: str = Field(default="INFO")
log_format: str = Field(default="json")
log_file: Optional[str] = Field(default="/app/logs/mcp.log")
# Monitoring
enable_metrics: bool = Field(default=True)
metrics_port: int = Field(default=9090)
enable_tracing: bool = Field(default=True)
tracing_endpoint: Optional[str] = Field(None, env="TRACING_ENDPOINT")
# Performance
max_concurrent_requests: int = Field(default=100)
request_timeout: int = Field(default=300)
enable_caching: bool = Field(default=True)
cache_ttl: int = Field(default=3600)
# Security
enable_auth: bool = Field(default=True)
jwt_secret: Optional[str] = Field(None, env="JWT_SECRET")
allowed_origins: list = Field(default=["*"])
rate_limit_per_minute: int = Field(default=60)
# MCP Servers
mcp_servers: list = Field(default_factory=list)
# RL Configuration
enable_rl: bool = Field(default=True)
rl_exploration_rate: float = Field(default=0.15)
rl_learning_rate: float = Field(default=0.1)
rl_q_table_path: str = Field(default="/app/rl_data/q_table.pkl")
# Health Check
health_check_interval: int = Field(default=30)
class Config:
env_file = ".env.production"
env_file_encoding = "utf-8"
@validator("environment", pre=True)
def validate_environment(cls, v):
"""Validate environment."""
if isinstance(v, str):
return Environment(v.lower())
return v
@validator("log_level")
def validate_log_level(cls, v):
"""Validate log level."""
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if v.upper() not in valid_levels:
raise ValueError(f"Invalid log level. Must be one of {valid_levels}")
return v.upper()
def get_llm_config(self) -> Dict[str, Any]:
"""Get LLM configuration."""
return {
"model": self.llm_model,
"temperature": self.llm_temperature,
"max_tokens": self.llm_max_tokens,
"timeout": self.llm_timeout,
"max_retries": self.llm_max_retries,
"api_key": self.openai_api_key
}
def get_database_url(self) -> str:
"""Get database connection URL."""
if not self.database_url:
raise ValueError("DATABASE_URL not configured")
return self.database_url
def is_production(self) -> bool:
"""Check if running in production."""
return self.environment == Environment.PRODUCTION
def get_log_config(self) -> Dict[str, Any]:
"""Get logging configuration."""
return {
"level": self.log_level,
"format": self.log_format,
"file": self.log_file
}
def load_config() -> ProductionConfig:
"""Load production configuration."""
try:
config = ProductionConfig()
logger.info(f"Loaded configuration for environment: {config.environment.value}")
return config
except Exception as e:
logger.error(f"Failed to load configuration: {e}")
raise
# Global configuration instance
config = load_config()
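`ProductionConfig` reads overrides from `.env.production` (per its `Config.env_file`). A hedged example of that file — every value below is a placeholder, and the hostnames assume the docker-compose service names from earlier:

```shell
# .env.production - all values are placeholders
OPENAI_API_KEY=replace-me
GITHUB_TOKEN=replace-me
DATABASE_URL=postgresql://mcp_user:replace-me@postgres:5432/mcp_db
REDIS_URL=redis://redis:6379/0
JWT_SECRET=replace-me
TRACING_ENDPOINT=http://otel-collector:4317
LOG_LEVEL=INFO
ENVIRONMENT=production
```

Keep this file out of version control; in Kubernetes the same values come from `mcp-secrets` instead.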
Secrets Management
"""
secrets_manager.py - Secure secrets management.
"""
import os
import boto3
from typing import Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
class SecretsManager:
"""Manage secrets securely."""
def __init__(self, provider: str = "env"):
"""
Initialize secrets manager.
Args:
provider: Secrets provider ('env', 'aws', 'vault', 'k8s')
"""
self.provider = provider
self._cache: Dict[str, str] = {}
if provider == "aws":
self.client = boto3.client('secretsmanager')
elif provider == "vault":
# Initialize HashiCorp Vault client
pass
elif provider == "k8s":
# Initialize Kubernetes secrets client
pass
def get_secret(self, key: str, required: bool = True) -> Optional[str]:
"""
Get secret value.
Args:
key: Secret key
required: Whether secret is required
Returns:
Secret value or None
"""
# Check cache first
if key in self._cache:
return self._cache[key]
# Fetch from provider
value = None
if self.provider == "env":
value = os.getenv(key)
elif self.provider == "aws":
try:
response = self.client.get_secret_value(SecretId=key)
value = response['SecretString']
except Exception as e:
logger.error(f"Failed to fetch secret from AWS: {e}")
elif self.provider == "vault":
# Fetch from Vault
pass
elif self.provider == "k8s":
# Fetch from Kubernetes secrets
pass
# Validate
if required and not value:
raise ValueError(f"Required secret not found: {key}")
# Cache
if value:
self._cache[key] = value
return value
def get_database_credentials(self) -> Dict[str, str]:
"""Get database credentials."""
return {
"host": self.get_secret("DB_HOST"),
"port": self.get_secret("DB_PORT"),
"database": self.get_secret("DB_NAME"),
"user": self.get_secret("DB_USER"),
"password": self.get_secret("DB_PASSWORD")
}
def get_api_keys(self) -> Dict[str, str]:
"""Get API keys."""
return {
"openai": self.get_secret("OPENAI_API_KEY"),
"github": self.get_secret("GITHUB_TOKEN", required=False),
"slack": self.get_secret("SLACK_BOT_TOKEN", required=False)
}
def rotate_secret(self, key: str, new_value: str):
"""Rotate a secret."""
if self.provider == "aws":
try:
self.client.update_secret(SecretId=key, SecretString=new_value)
logger.info(f"Rotated secret: {key}")
# Clear cache
if key in self._cache:
del self._cache[key]
except Exception as e:
logger.error(f"Failed to rotate secret: {e}")
raise
def clear_cache(self):
"""Clear secrets cache."""
self._cache.clear()
logger.info("Cleared secrets cache")
# Global secrets manager
secrets = SecretsManager(provider=os.getenv("SECRETS_PROVIDER", "env"))
Deployment Strategies
Blue-Green Deployment
"""
blue_green_deployment.py - Blue-green deployment strategy.
"""
import time
import logging
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class DeploymentColor(Enum):
"""Deployment color."""
BLUE = "blue"
GREEN = "green"
class BlueGreenDeployment:
"""Manage blue-green deployments."""
def __init__(self):
self.active_color = DeploymentColor.BLUE
self.blue_team = None
self.green_team = None
def deploy_new_version(self, new_team):
"""Deploy new version to inactive environment."""
inactive_color = self._get_inactive_color()
logger.info(f"Deploying new version to {inactive_color.value} environment")
if inactive_color == DeploymentColor.BLUE:
self.blue_team = new_team
else:
self.green_team = new_team
logger.info(f"New version deployed to {inactive_color.value}")
def health_check(self, team) -> bool:
"""Perform health check on team."""
try:
# Test basic functionality
result = team({
"messages": [HumanMessage(content="health check")]
})
return True
except Exception as e:
logger.error(f"Health check failed: {e}")
return False
def switch_traffic(self):
"""Switch traffic to new version."""
inactive_color = self._get_inactive_color()
inactive_team = self._get_team(inactive_color)
if not inactive_team:
raise ValueError(f"No team deployed to {inactive_color.value}")
# Health check before switching
logger.info(f"Performing health check on {inactive_color.value}")
if not self.health_check(inactive_team):
raise RuntimeError(f"Health check failed for {inactive_color.value}")
# Switch traffic
logger.info(f"Switching traffic from {self.active_color.value} to {inactive_color.value}")
self.active_color = inactive_color
logger.info(f"Traffic switched to {self.active_color.value}")
def rollback(self):
"""Rollback to previous version."""
previous_color = self._get_inactive_color()
logger.warning(f"Rolling back to {previous_color.value}")
self.active_color = previous_color
logger.info(f"Rolled back to {self.active_color.value}")
def get_active_team(self):
"""Get currently active team."""
return self._get_team(self.active_color)
def _get_inactive_color(self) -> DeploymentColor:
"""Get inactive color."""
return (
DeploymentColor.GREEN
if self.active_color == DeploymentColor.BLUE
else DeploymentColor.BLUE
)
def _get_team(self, color: DeploymentColor):
"""Get team by color."""
return self.blue_team if color == DeploymentColor.BLUE else self.green_team
# Usage example
def perform_blue_green_deployment():
"""Perform blue-green deployment."""
deployment = BlueGreenDeployment()
# Current active version (blue)
logger.info("Current version running on blue")
# Deploy new version to green
logger.info("Building new version...")
new_team = build_new_team_version()
deployment.deploy_new_version(new_team)
# Run smoke tests
logger.info("Running smoke tests...")
time.sleep(5)
# Switch traffic
try:
deployment.switch_traffic()
logger.info("Deployment successful!")
except Exception as e:
logger.error(f"Deployment failed: {e}")
deployment.rollback()
logger.info("Rolled back to previous version")
Canary Deployment
"""
canary_deployment.py - Canary deployment strategy.
"""
import random
import logging
from typing import List, Dict, Any
logger = logging.getLogger(__name__)
class CanaryDeployment:
"""Manage canary deployments."""
def __init__(
self,
stable_team,
canary_percentage: int = 10
):
self.stable_team = stable_team
self.canary_team = None
self.canary_percentage = canary_percentage
self.metrics: Dict[str, List[float]] = {
"stable": [],
"canary": []
}
def deploy_canary(self, canary_team):
"""Deploy canary version."""
logger.info(f"Deploying canary with {self.canary_percentage}% traffic")
self.canary_team = canary_team
def route_request(self, request: Dict[str, Any]):
"""Route request to stable or canary."""
# Decide which version to use
use_canary = (
self.canary_team is not None and
random.random() * 100 < self.canary_percentage
)
team = self.canary_team if use_canary else self.stable_team
version = "canary" if use_canary else "stable"
logger.info(f"Routing to {version} version")
# Execute request
try:
start_time = time.time()
result = team(request)
duration = time.time() - start_time
# Record metrics
self.metrics[version].append(duration)
return {
"result": result,
"version": version,
"duration": duration
}
except Exception as e:
logger.error(f"Request failed on {version}: {e}")
raise
def increase_canary_traffic(self, increment: int = 10):
"""Gradually increase canary traffic."""
if not self.canary_team:
raise ValueError("No canary deployed")
self.canary_percentage = min(100, self.canary_percentage + increment)
logger.info(f"Increased canary traffic to {self.canary_percentage}%")
def analyze_metrics(self) -> Dict[str, Any]:
"""Analyze performance metrics."""
if not self.metrics["stable"] or not self.metrics["canary"]:
return {"status": "insufficient_data"}
stable_avg = sum(self.metrics["stable"]) / len(self.metrics["stable"])
canary_avg = sum(self.metrics["canary"]) / len(self.metrics["canary"])
# Calculate error rates (simplified)
stable_errors = 0 # Track separately
canary_errors = 0 # Track separately
analysis = {
"stable_avg_latency": stable_avg,
"canary_avg_latency": canary_avg,
"latency_diff_percent": ((canary_avg - stable_avg) / stable_avg) * 100,
"stable_error_rate": stable_errors,
"canary_error_rate": canary_errors,
"recommendation": "proceed"
}
# Decision logic
if canary_avg > stable_avg * 1.2: # 20% slower
analysis["recommendation"] = "rollback"
analysis["reason"] = "Canary latency too high"
elif canary_errors > stable_errors * 1.5:
analysis["recommendation"] = "rollback"
analysis["reason"] = "Canary error rate too high"
return analysis
def promote_canary(self):
"""Promote canary to stable."""
if not self.canary_team:
raise ValueError("No canary to promote")
logger.info("Promoting canary to stable")
self.stable_team = self.canary_team
self.canary_team = None
self.canary_percentage = 0
logger.info("Canary promoted successfully")
def rollback_canary(self):
"""Rollback canary deployment."""
logger.warning("Rolling back canary")
self.canary_team = None
self.canary_percentage = 0
logger.info("Canary rolled back")
# Usage example
def perform_canary_deployment():
"""Perform canary deployment."""
stable_team = build_current_team()
canary = CanaryDeployment(stable_team, canary_percentage=10)
# Deploy canary
new_team = build_new_team_version()
canary.deploy_canary(new_team)
# Gradually increase traffic
for stage in [10, 25, 50, 100]:
logger.info(f"Setting canary traffic to {stage}%")
canary.canary_percentage = stage
# Monitor for 10 minutes
time.sleep(600)
# Analyze metrics
analysis = canary.analyze_metrics()
if analysis["recommendation"] == "rollback":
logger.error(f"Rolling back: {analysis['reason']}")
canary.rollback_canary()
break
if stage == 100:
logger.info("Canary successful, promoting to stable")
canary.promote_canary()
Monitoring and Observability
Structured Logging
"""
production_logging.py - Production logging setup.
"""
import logging
import json
import sys
from datetime import datetime
from typing import Dict, Any
from pythonjsonlogger import jsonlogger
class ProductionLogger:
"""Production-ready logging."""
def __init__(
self,
name: str,
level: str = "INFO",
log_file: str = None
):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# JSON formatter
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
json_encoder=json.JSONEncoder
)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# File handler (if specified)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log_request(
self,
request_id: str,
method: str,
path: str,
**kwargs
):
"""Log incoming request."""
self.logger.info(
"Request received",
extra={
"request_id": request_id,
"method": method,
"path": path,
**kwargs
}
)
def log_response(
self,
request_id: str,
status_code: int,
duration_ms: float,
**kwargs
):
"""Log response."""
self.logger.info(
"Response sent",
extra={
"request_id": request_id,
"status_code": status_code,
"duration_ms": duration_ms,
**kwargs
}
)
def log_error(
self,
request_id: str,
error: Exception,
**kwargs
):
"""Log error."""
self.logger.error(
"Error occurred",
extra={
"request_id": request_id,
"error_type": type(error).__name__,
"error_message": str(error),
**kwargs
},
exc_info=True
)
def log_metric(
self,
metric_name: str,
value: float,
**kwargs
):
"""Log metric."""
self.logger.info(
"Metric recorded",
extra={
"metric_name": metric_name,
"value": value,
**kwargs
}
)
# Setup production logger
prod_logger = ProductionLogger(
name="mcp_production",
level="INFO",
log_file="/app/logs/mcp.log"
)
Metrics Collection
"""
production_metrics.py - Production metrics collection.
"""
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Define metrics
request_count = Counter(
'mcp_requests_total',
'Total number of requests',
['method', 'endpoint', 'status']
)
request_duration = Histogram(
'mcp_request_duration_seconds',
'Request duration in seconds',
['method', 'endpoint']
)
active_requests = Gauge(
'mcp_active_requests',
'Number of active requests'
)
tool_execution_count = Counter(
'mcp_tool_executions_total',
'Total tool executions',
['tool_name', 'status']
)
tool_execution_duration = Histogram(
'mcp_tool_execution_duration_seconds',
'Tool execution duration',
['tool_name']
)
rl_q_table_size = Gauge(
'mcp_rl_q_table_size',
'Size of RL Q-table'
)
rl_exploration_rate = Gauge(
'mcp_rl_exploration_rate',
'Current RL exploration rate'
)
def track_request(method: str, endpoint: str):
"""Decorator to track requests."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
active_requests.inc()
start_time = time.time()
try:
result = func(*args, **kwargs)
status = "success"
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
active_requests.dec()
request_count.labels(method=method, endpoint=endpoint, status=status).inc()
request_duration.labels(method=method, endpoint=endpoint).observe(duration)
return wrapper
return decorator
def track_tool_execution(tool_name: str):
"""Decorator to track tool execution."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
status = "success"
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
tool_execution_count.labels(tool_name=tool_name, status=status).inc()
tool_execution_duration.labels(tool_name=tool_name).observe(duration)
return wrapper
return decorator
def update_rl_metrics(rl_manager):
"""Update RL metrics."""
stats = rl_manager.get_statistics()
rl_q_table_size.set(stats["total_states"])
rl_exploration_rate.set(stats["exploration_rate"])
def start_metrics_server(port: int = 9090):
"""Start Prometheus metrics server."""
start_http_server(port)
logger.info(f"Metrics server started on port {port}")
Health Checks
"""
production_health.py - Production health checks.
"""
import json
import time
from typing import Dict, Any

from fastapi import FastAPI, Response

app = FastAPI()
class HealthChecker:
"""Comprehensive health checker."""
def __init__(self, team):
self.team = team
self.last_check = None
self.health_status = "unknown"
def check_components(self) -> Dict[str, Any]:
"""Check all system components."""
checks = {}
# 1. MCP Servers
try:
server_count = self.team.get_mcp_server_count()
checks["mcp_servers"] = {
"status": "healthy",
"count": server_count
}
except Exception as e:
checks["mcp_servers"] = {
"status": "unhealthy",
"error": str(e)
}
# 2. Tools
try:
tools = self.team.get_mcp_tool_names()
checks["tools"] = {
"status": "healthy",
"count": len(tools)
}
except Exception as e:
checks["tools"] = {
"status": "unhealthy",
"error": str(e)
}
# 3. Database (if configured)
try:
if config.database_url:
# Test database connection
checks["database"] = {
"status": "healthy"
}
except Exception as e:
checks["database"] = {
"status": "unhealthy",
"error": str(e)
}
# 4. Redis (if configured)
try:
if config.redis_url:
# Test Redis connection
checks["redis"] = {
"status": "healthy"
}
except Exception as e:
checks["redis"] = {
"status": "unhealthy",
"error": str(e)
}
# Determine overall health
all_healthy = all(
check.get("status") == "healthy"
for check in checks.values()
)
self.health_status = "healthy" if all_healthy else "unhealthy"
self.last_check = time.time()
return {
"status": self.health_status,
"timestamp": self.last_check,
"checks": checks
}
health_checker = None # Initialize after team creation
@app.get("/health")
async def health():
"""Health check endpoint."""
if health_checker:
result = health_checker.check_components()
status_code = 200 if result["status"] == "healthy" else 503
return Response(
content=json.dumps(result),
status_code=status_code,
media_type="application/json"
)
return {"status": "starting"}
@app.get("/ready")
async def readiness():
"""Readiness check endpoint."""
# Check if system is ready to serve traffic
if health_checker and health_checker.health_status == "healthy":
return {"status": "ready"}
return Response(
content=json.dumps({"status": "not_ready"}),
status_code=503,
media_type="application/json"
)
@app.get("/metrics")
async def metrics():
"""Metrics endpoint for Prometheus."""
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
Scaling and Performance
Horizontal Scaling
"""
horizontal_scaling.py - Horizontal scaling implementation.
"""
from typing import List
import random
class LoadBalancer:
"""Simple round-robin load balancer."""
def __init__(self, instances: List):
self.instances = instances
self.current_index = 0
def get_instance(self):
"""Get next instance (round-robin)."""
instance = self.instances[self.current_index]
self.current_index = (self.current_index + 1) % len(self.instances)
return instance
def add_instance(self, instance):
"""Add new instance."""
self.instances.append(instance)
logger.info(f"Added instance. Total: {len(self.instances)}")
def remove_instance(self, instance):
"""Remove instance."""
self.instances.remove(instance)
logger.info(f"Removed instance. Total: {len(self.instances)}")
# Create multiple instances
instances = [
build_team_instance(f"instance_{i}")
for i in range(3)
]
load_balancer = LoadBalancer(instances)
# Route requests
def handle_request(request):
"""Handle request with load balancing."""
instance = load_balancer.get_instance()
return instance(request)
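Round-robin alone keeps routing to instances that have gone unhealthy. A common refinement is to skip instances that fail a health predicate; a sketch (the `is_healthy` callable is a hypothetical stand-in for a real health probe):

```python
class HealthAwareBalancer:
    """Round-robin balancer that skips instances failing a health predicate."""

    def __init__(self, instances, is_healthy=lambda instance: True):
        self.instances = instances
        self.is_healthy = is_healthy  # hypothetical health probe
        self.current_index = 0

    def get_instance(self):
        """Return the next healthy instance, or raise if none remain."""
        for _ in range(len(self.instances)):
            instance = self.instances[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.instances)
            if self.is_healthy(instance):
                return instance
        raise RuntimeError("No healthy instances available")

# Example: instance "b" is down, so traffic alternates between "a" and "c"
balancer = HealthAwareBalancer(["a", "b", "c"], is_healthy=lambda i: i != "b")
print([balancer.get_instance() for _ in range(4)])  # → ['a', 'c', 'a', 'c']
```

In a real deployment the predicate would consult the `/health` endpoint results rather than a lambda.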
Caching Strategy
"""
production_cache.py - Production caching strategy.
"""
import redis
import pickle
import hashlib
from typing import Any, Optional
class ProductionCache:
"""Production caching with Redis."""
def __init__(self, redis_url: str, ttl: int = 3600):
self.redis_client = redis.from_url(redis_url)
self.ttl = ttl
def _generate_key(self, prefix: str, data: Any) -> str:
"""Generate cache key."""
data_str = str(data)
hash_obj = hashlib.md5(data_str.encode())
return f"{prefix}:{hash_obj.hexdigest()}"
def get(self, key: str) -> Optional[Any]:
"""Get from cache."""
try:
data = self.redis_client.get(key)
if data:
return pickle.loads(data)
except Exception as e:
logger.error(f"Cache get error: {e}")
return None
def set(self, key: str, value: Any, ttl: int = None):
"""Set in cache."""
try:
ttl = ttl or self.ttl
data = pickle.dumps(value)
self.redis_client.setex(key, ttl, data)
except Exception as e:
logger.error(f"Cache set error: {e}")
def delete(self, key: str):
"""Delete from cache."""
try:
self.redis_client.delete(key)
except Exception as e:
logger.error(f"Cache delete error: {e}")
def clear_all(self):
"""Clear all cache."""
try:
self.redis_client.flushdb()
logger.info("Cleared all cache")
except Exception as e:
logger.error(f"Cache clear error: {e}")
# Initialize cache
cache = ProductionCache(
redis_url=config.redis_url,
ttl=config.cache_ttl
) if config.redis_url else None
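Caching pays off when identical requests recur. The sketch below shows a response-level cache keyed by a hash of the request payload; it uses a plain dict as the backing store so the idea is independent of Redis, but the `ProductionCache` above could be substituted in production:

```python
import hashlib
import json

class ResponseCache:
    """Cache agent responses keyed by a hash of the request payload."""

    def __init__(self, backend=None):
        self.backend = backend if backend is not None else {}  # dict stand-in for Redis

    def _key(self, request: dict) -> str:
        # Stable key: hash the canonical (sorted-keys) JSON form of the request
        canonical = json.dumps(request, sort_keys=True)
        return "resp:" + hashlib.sha256(canonical.encode()).hexdigest()

    def get_or_compute(self, request: dict, compute):
        """Return a cached response, computing and storing it on a miss."""
        key = self._key(request)
        if key in self.backend:
            return self.backend[key]
        result = compute(request)
        self.backend[key] = result
        return result

# Hypothetical expensive call, invoked once despite two identical requests
calls = {"n": 0}
def expensive(request):
    calls["n"] += 1
    return f"answer to {request['query']}"

cache = ResponseCache()
cache.get_or_compute({"query": "status"}, expensive)
cache.get_or_compute({"query": "status"}, expensive)  # served from cache
print(calls["n"])  # → 1
```

Note that caching is only safe for deterministic, side-effect-free requests; tool calls that mutate state should bypass it.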
Security Hardening
Authentication and Authorization
"""
production_security.py - Production security implementation.
"""
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

security = HTTPBearer()

class SecurityManager:
    """Manage authentication and authorization."""

    def __init__(self, jwt_secret: str):
        self.jwt_secret = jwt_secret
        self.algorithm = "HS256"

    def create_token(self, user_id: str, role: str) -> str:
        """Create a JWT token with a 24-hour expiry."""
        payload = {
            "user_id": user_id,
            "role": role,
            # Timezone-aware UTC; datetime.utcnow() is deprecated
            "exp": datetime.now(timezone.utc) + timedelta(hours=24)
        }
        return jwt.encode(payload, self.jwt_secret, algorithm=self.algorithm)
def verify_token(self, token: str) -> Dict:
"""Verify JWT token."""
try:
payload = jwt.decode(
token,
self.jwt_secret,
algorithms=[self.algorithm]
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
def require_auth(
self,
credentials: HTTPAuthorizationCredentials = Security(security)
) -> Dict:
"""Require authentication."""
token = credentials.credentials
return self.verify_token(token)
def require_role(self, required_role: str):
"""Require specific role."""
def decorator(user: Dict = Depends(self.require_auth)):
if user.get("role") != required_role:
raise HTTPException(
status_code=403,
detail="Insufficient permissions"
)
return user
return decorator
# Initialize security
security_manager = SecurityManager(jwt_secret=config.jwt_secret)
# Use in endpoints
@app.post("/execute")
async def execute_task(
request: Dict,
user: Dict = Depends(security_manager.require_auth)
):
"""Execute task with authentication."""
# Process request
pass
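`SecurityManager` delegates signing to PyJWT's HS256. The underlying mechanics — sign a payload with HMAC-SHA256, then reject tampered or expired tokens — can be shown with only the standard library; the function names here are illustrative, not part of the API above.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"test-secret"  # illustrative; load from a secrets manager in production

def sign_token(payload: dict, ttl_seconds: int = 3600) -> str:
    """Create an HMAC-SHA256-signed token (same idea as an HS256 JWT)."""
    body = dict(payload, exp=time.time() + ttl_seconds)
    raw = base64.urlsafe_b64encode(json.dumps(body).encode()).decode()
    sig = hmac.new(SECRET, raw.encode(), hashlib.sha256).hexdigest()
    return f"{raw}.{sig}"

def verify_token(token: str) -> dict:
    """Verify signature and expiry; raise ValueError on failure."""
    raw, _, sig = token.rpartition(".")
    expected = hmac.new(SECRET, raw.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        raise ValueError("Invalid token")
    payload = json.loads(base64.urlsafe_b64decode(raw))
    if payload["exp"] < time.time():
        raise ValueError("Token expired")
    return payload

token = sign_token({"user_id": "u1", "role": "admin"})
assert verify_token(token)["role"] == "admin"
try:
    verify_token(token[:-1] + ("0" if token[-1] != "0" else "1"))  # flip last char
    assert False, "tampered token should fail"
except ValueError:
    pass
```

The constant-time `hmac.compare_digest` matters: a naive `==` comparison can leak signature bytes through timing.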
Rate Limiting
"""
rate_limiting.py - Production rate limiting.
"""
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request
# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
# Add to FastAPI app
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/execute")
@limiter.limit("60/minute") # 60 requests per minute
async def execute_task(request: Request):
"""Execute task with rate limiting."""
# Process request
pass
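`slowapi` enforces the limit per client IP. The same policy can also be expressed as a token bucket, which additionally permits short bursts up to a configured capacity; this is a stdlib sketch of the concept, not slowapi's internal algorithm.

```python
import time

class TokenBucket:
    """Token-bucket limiter: refills `rate` tokens/second, bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=1.0, capacity=3)  # ~60/minute with a burst of 3
results = [bucket.allow() for _ in range(5)]
assert results == [True, True, True, False, False]
```

A per-client registry of buckets (keyed by IP or user ID) turns this into the equivalent of the decorator-based limit above.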
High Availability
Multi-Region Deployment
"""
high_availability.py - High availability implementation.
"""
from typing import List, Dict, Any
import logging
from langchain_core.messages import HumanMessage
logger = logging.getLogger(__name__)
class MultiRegionDeployment:
"""Manage multi-region deployment for high availability."""
def __init__(self, regions: List[str]):
self.regions = regions
self.region_instances: Dict[str, Any] = {}
self.health_status: Dict[str, bool] = {}
def deploy_to_region(self, region: str, team):
"""Deploy team to specific region."""
logger.info(f"Deploying to region: {region}")
self.region_instances[region] = team
self.health_status[region] = True
def get_healthy_regions(self) -> List[str]:
"""Get list of healthy regions."""
return [
region
for region, healthy in self.health_status.items()
if healthy
]
def route_to_nearest_region(self, user_location: str):
"""Route request to the nearest healthy region."""
# Rank the configured regions by distance (simplified routing logic)
sorted_regions = sorted(
self.regions,
key=lambda region: self._calculate_distance(user_location, region)
)
# Find the nearest healthy region
for region in sorted_regions:
if self.health_status.get(region):
logger.info(f"Routing to region: {region}")
return self.region_instances[region]
raise RuntimeError("No healthy regions available")
def _calculate_distance(self, from_loc: str, to_region: str) -> float:
"""Calculate distance between locations."""
# Simplified distance calculation
# In production, use actual geolocation
return 0.0
def perform_health_check(self, region: str) -> bool:
"""Perform health check on region."""
try:
team = self.region_instances[region]
# Test basic functionality
result = team({
"messages": [HumanMessage(content="health check")]
})
self.health_status[region] = True
return True
except Exception as e:
logger.error(f"Region {region} health check failed: {e}")
self.health_status[region] = False
return False
def failover_to_backup(self, failed_region: str):
"""Failover to backup region."""
logger.warning(f"Initiating failover from {failed_region}")
# Mark failed region as unhealthy
self.health_status[failed_region] = False
# Find healthy backup region
healthy_regions = self.get_healthy_regions()
if not healthy_regions:
raise RuntimeError("No healthy backup regions available")
backup_region = healthy_regions[0]
logger.info(f"Failed over to backup region: {backup_region}")
return backup_region
Database Replication
# database_replication.yaml - PostgreSQL replication setup
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-replication-config
namespace: production
data:
postgresql.conf: |
# Replication settings
wal_level = replica
max_wal_senders = 10
max_replication_slots = 10
hot_standby = on
pg_hba.conf: |
# Replication connections (restrict the source CIDR in real deployments)
host replication replicator 0.0.0.0/0 md5
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres-primary
namespace: production
spec:
serviceName: postgres-primary
replicas: 1
selector:
matchLabels:
app: postgres
role: primary
template:
metadata:
labels:
app: postgres
role: primary
spec:
containers:
- name: postgres
image: postgres:15
env:
- name: POSTGRES_USER
value: mcp_user
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secrets
key: password
- name: POSTGRES_DB
value: mcp_db
ports:
- containerPort: 5432
name: postgres
volumeMounts:
- name: data
mountPath: /var/lib/postgresql/data
- name: config
mountPath: /etc/postgresql
volumes:
- name: config
configMap:
name: postgres-replication-config
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres-replica
namespace: production
spec:
serviceName: postgres-replica
replicas: 2
selector:
matchLabels:
app: postgres
role: replica
template:
metadata:
labels:
app: postgres
role: replica
spec:
containers:
- name: postgres
image: postgres:15
env:
- name: POSTGRES_USER
value: mcp_user
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secrets
key: password
- name: POSTGRES_PRIMARY_HOST
value: postgres-primary
ports:
- containerPort: 5432
name: postgres
volumeMounts:
- name: data
mountPath: /var/lib/postgresql/data
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
Load Balancer Configuration
"""
advanced_load_balancer.py - Advanced load balancing strategies.
"""
import time
from typing import List, Dict, Any
from collections import defaultdict
import heapq
class WeightedLoadBalancer:
"""Weighted round-robin load balancer."""
def __init__(self, instances: List[tuple]):
"""
Initialize with weighted instances.
Args:
instances: List of (instance, weight) tuples
"""
self.instances = instances
self.current_weight = 0
self.current_index = -1
self.max_weight = max(weight for _, weight in instances)
self.gcd_weight = self._gcd_weights()
def _gcd_weights(self) -> int:
"""Calculate GCD of all weights."""
from math import gcd
weights = [weight for _, weight in self.instances]
result = weights[0]
for weight in weights[1:]:
result = gcd(result, weight)
return result
def get_instance(self):
"""Get next instance using weighted round-robin."""
while True:
self.current_index = (self.current_index + 1) % len(self.instances)
if self.current_index == 0:
self.current_weight = self.current_weight - self.gcd_weight
if self.current_weight <= 0:
self.current_weight = self.max_weight
instance, weight = self.instances[self.current_index]
if weight >= self.current_weight:
return instance
class LeastConnectionsLoadBalancer:
"""Least connections load balancer."""
def __init__(self, instances: List):
self.instances = instances
self.connections: Dict[int, int] = defaultdict(int)
def get_instance(self):
"""Get instance with least connections."""
min_connections = min(
self.connections[id(instance)]
for instance in self.instances
)
for instance in self.instances:
if self.connections[id(instance)] == min_connections:
self.connections[id(instance)] += 1
return instance
def release_instance(self, instance):
"""Release instance connection."""
instance_id = id(instance)
if instance_id in self.connections:
self.connections[instance_id] = max(
0,
self.connections[instance_id] - 1
)
class ConsistentHashLoadBalancer:
"""Consistent hashing load balancer."""
def __init__(self, instances: List, virtual_nodes: int = 150):
self.virtual_nodes = virtual_nodes
self.ring: Dict[int, Any] = {}
self.sorted_keys: List[int] = []
for instance in instances:
self.add_instance(instance)
def _hash(self, key: str) -> int:
"""Hash function."""
import hashlib
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_instance(self, instance):
"""Add instance to hash ring."""
import bisect
for i in range(self.virtual_nodes):
virtual_key = f"{id(instance)}:{i}"
hash_val = self._hash(virtual_key)
self.ring[hash_val] = instance
# bisect.insort keeps the key list fully sorted; a heap only
# guarantees heap order, which would break the lookup below
bisect.insort(self.sorted_keys, hash_val)
def get_instance(self, key: str):
"""Get instance for given key."""
if not self.ring:
return None
import bisect
hash_val = self._hash(key)
# First ring position at or clockwise of the key's hash, wrapping around
idx = bisect.bisect_left(self.sorted_keys, hash_val)
return self.ring[self.sorted_keys[idx % len(self.sorted_keys)]]
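A quick way to sanity-check a consistent-hash ring is to add a node and confirm that only the keys claimed by the new node change owner. The `HashRing` below is a minimal illustrative re-implementation using `bisect` to keep the ring keys sorted.

```python
import bisect
import hashlib
from typing import Dict, List

class HashRing:
    """Minimal consistent-hash ring; ring keys stay sorted via bisect.insort."""
    def __init__(self, nodes: List[str], virtual_nodes: int = 100):
        self.virtual_nodes = virtual_nodes
        self.ring: Dict[int, str] = {}
        self.keys: List[int] = []
        for node in nodes:
            self.add(node)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add(self, node: str) -> None:
        for i in range(self.virtual_nodes):
            h = self._hash(f"{node}:{i}")
            self.ring[h] = node
            bisect.insort(self.keys, h)

    def get(self, key: str) -> str:
        # First ring position at or clockwise of the key, wrapping to the start
        idx = bisect.bisect_left(self.keys, self._hash(key))
        return self.ring[self.keys[idx % len(self.keys)]]

ring = HashRing(["node-a", "node-b", "node-c"])
owners_before = {f"user-{i}": ring.get(f"user-{i}") for i in range(200)}
ring.add("node-d")
owners_after = {k: ring.get(k) for k in owners_before}
# Consistent hashing: every key that moved now belongs to the new node
moved = [k for k in owners_before if owners_before[k] != owners_after[k]]
assert all(owners_after[k] == "node-d" for k in moved)
assert len(moved) < len(owners_before)
```

This stability property is what makes consistent hashing attractive for sticky routing: scaling out invalidates only a fraction of existing key-to-instance assignments.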
Disaster Recovery
Backup Strategy
"""
backup_strategy.py - Comprehensive backup implementation.
"""
import os
import time
import shutil
import boto3
from datetime import datetime, timedelta
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class BackupManager:
"""Manage backups for disaster recovery."""
def __init__(
self,
backup_dir: str = "/backups",
s3_bucket: str = None,
retention_days: int = 30
):
self.backup_dir = backup_dir
self.s3_bucket = s3_bucket
self.retention_days = retention_days
if s3_bucket:
self.s3_client = boto3.client('s3')
os.makedirs(backup_dir, exist_ok=True)
def backup_database(self, db_url: str) -> str:
"""Backup database."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"{self.backup_dir}/db_backup_{timestamp}.sql"
logger.info(f"Starting database backup: {backup_file}")
try:
# Use pg_dump for PostgreSQL
import subprocess
subprocess.run([
"pg_dump",
db_url,
"-f", backup_file
], check=True)
logger.info(f"Database backup completed: {backup_file}")
# Upload to S3 if configured
if self.s3_bucket:
self._upload_to_s3(backup_file)
return backup_file
except Exception as e:
logger.error(f"Database backup failed: {e}")
raise
def backup_rl_data(self, rl_data_dir: str) -> str:
"""Backup RL Q-table and training data."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"{self.backup_dir}/rl_backup_{timestamp}.tar.gz"
logger.info(f"Starting RL data backup: {backup_file}")
try:
import tarfile
with tarfile.open(backup_file, "w:gz") as tar:
tar.add(rl_data_dir, arcname="rl_data")
logger.info(f"RL data backup completed: {backup_file}")
# Upload to S3 if configured
if self.s3_bucket:
self._upload_to_s3(backup_file)
return backup_file
except Exception as e:
logger.error(f"RL data backup failed: {e}")
raise
def backup_configuration(self, config_dir: str) -> str:
"""Backup configuration files."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"{self.backup_dir}/config_backup_{timestamp}.tar.gz"
logger.info(f"Starting configuration backup: {backup_file}")
try:
import tarfile
with tarfile.open(backup_file, "w:gz") as tar:
tar.add(config_dir, arcname="config")
logger.info(f"Configuration backup completed: {backup_file}")
# Upload to S3 if configured
if self.s3_bucket:
self._upload_to_s3(backup_file)
return backup_file
except Exception as e:
logger.error(f"Configuration backup failed: {e}")
raise
def _upload_to_s3(self, file_path: str):
"""Upload backup to S3."""
try:
file_name = os.path.basename(file_path)
s3_key = f"backups/{file_name}"
logger.info(f"Uploading to S3: {s3_key}")
self.s3_client.upload_file(
file_path,
self.s3_bucket,
s3_key
)
logger.info(f"S3 upload completed: {s3_key}")
except Exception as e:
logger.error(f"S3 upload failed: {e}")
raise
def list_backups(self) -> List[Dict[str, Any]]:
"""List all backups."""
backups = []
for file_name in os.listdir(self.backup_dir):
file_path = os.path.join(self.backup_dir, file_name)
if os.path.isfile(file_path):
stat = os.stat(file_path)
backups.append({
"file": file_name,
"path": file_path,
"size": stat.st_size,
"created": datetime.fromtimestamp(stat.st_ctime)
})
return sorted(backups, key=lambda x: x["created"], reverse=True)
def cleanup_old_backups(self):
"""Remove backups older than retention period."""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
logger.info(f"Cleaning up backups older than {cutoff_date}")
for backup in self.list_backups():
if backup["created"] < cutoff_date:
logger.info(f"Removing old backup: {backup['file']}")
os.remove(backup["path"])
def restore_database(self, backup_file: str, db_url: str):
"""Restore database from backup."""
logger.info(f"Restoring database from: {backup_file}")
try:
import subprocess
subprocess.run([
"psql",
db_url,
"-f", backup_file
], check=True)
logger.info("Database restore completed")
except Exception as e:
logger.error(f"Database restore failed: {e}")
raise
def restore_rl_data(self, backup_file: str, target_dir: str):
"""Restore RL data from backup."""
logger.info(f"Restoring RL data from: {backup_file}")
try:
import tarfile
with tarfile.open(backup_file, "r:gz") as tar:
# filter="data" guards against path traversal in crafted archives
tar.extractall(path=target_dir, filter="data")
logger.info("RL data restore completed")
except Exception as e:
logger.error(f"RL data restore failed: {e}")
raise
# Automated backup schedule
def schedule_backups():
"""Schedule automated backups."""
import schedule
backup_manager = BackupManager(
backup_dir="/backups",
s3_bucket="my-mcp-backups",
retention_days=30
)
# Daily database backup at 2 AM
schedule.every().day.at("02:00").do(
backup_manager.backup_database,
db_url=config.database_url
)
# Daily RL data backup at 3 AM
schedule.every().day.at("03:00").do(
backup_manager.backup_rl_data,
rl_data_dir="/app/rl_data"
)
# Weekly configuration backup on Sunday at 1 AM
schedule.every().sunday.at("01:00").do(
backup_manager.backup_configuration,
config_dir="/app/config"
)
# Daily cleanup of old backups at 4 AM
schedule.every().day.at("04:00").do(
backup_manager.cleanup_old_backups
)
logger.info("Backup schedule configured")
# Run scheduled tasks
while True:
schedule.run_pending()
time.sleep(60)
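The retention logic in `cleanup_old_backups` reduces to a cutoff comparison. Factoring it into a pure function (illustrative name `backups_to_delete`) makes it unit-testable without touching the filesystem; the dict shape matches what `list_backups` returns.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional

def backups_to_delete(
    backups: List[Dict],
    retention_days: int = 30,
    now: Optional[datetime] = None,
) -> List[str]:
    """Return the file names of backups older than the retention window."""
    now = now or datetime.now()
    cutoff = now - timedelta(days=retention_days)
    return [b["file"] for b in backups if b["created"] < cutoff]

now = datetime(2024, 6, 30)
backups = [
    {"file": "db_20240629.sql", "created": datetime(2024, 6, 29)},
    {"file": "db_20240501.sql", "created": datetime(2024, 5, 1)},
]
assert backups_to_delete(backups, retention_days=30, now=now) == ["db_20240501.sql"]
```

Passing `now` explicitly keeps the function deterministic, which is exactly what you want when asserting retention behavior in CI.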
Disaster Recovery Plan
# disaster_recovery_plan.yaml
disaster_recovery:
rto: 4 hours # Recovery Time Objective
rpo: 1 hour # Recovery Point Objective
scenarios:
- name: Database Failure
impact: High
probability: Medium
recovery_steps:
- Verify database is down
- Promote replica to primary
- Update DNS/connection strings
- Verify application connectivity
- Start new replica
estimated_time: 30 minutes
- name: Complete Region Failure
impact: Critical
probability: Low
recovery_steps:
- Verify region is down
- Route traffic to backup region
- Update DNS records
- Verify all services operational
- Monitor performance
- Investigate root cause
estimated_time: 2 hours
- name: Data Corruption
impact: High
probability: Low
recovery_steps:
- Identify corruption extent
- Stop write operations
- Restore from latest backup
- Replay transaction logs
- Verify data integrity
- Resume operations
estimated_time: 4 hours
- name: Application Crash
impact: Medium
probability: Medium
recovery_steps:
- Review logs and metrics
- Identify crash cause
- Rollback to previous version
- Verify health checks pass
- Route traffic back
estimated_time: 15 minutes
backup_procedures:
- type: Database
frequency: Daily
retention: 30 days
location: S3
encryption: AES-256
- type: RL Data
frequency: Daily
retention: 30 days
location: S3
encryption: AES-256
- type: Configuration
frequency: Weekly
retention: 90 days
location: S3 + Git
encryption: AES-256
testing:
- Full DR test: Quarterly
- Partial failover test: Monthly
- Backup restore test: Monthly
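The RPO above implies a concrete monitoring rule: alert whenever the newest backup is older than one hour. A minimal check (function name is illustrative):

```python
from datetime import datetime, timedelta

def rpo_violated(latest_backup_at: datetime, rpo: timedelta, now: datetime) -> bool:
    """True when the newest backup is older than the Recovery Point Objective."""
    return now - latest_backup_at > rpo

now = datetime(2024, 6, 30, 12, 0)
# 45-minute-old backup is within a 1-hour RPO
assert not rpo_violated(now - timedelta(minutes=45), timedelta(hours=1), now)
# A 2-hour-old backup violates it and should page
assert rpo_violated(now - timedelta(hours=2), timedelta(hours=1), now)
```

Wiring this into the monthly backup-restore test verifies both that backups exist and that they are fresh enough to meet the stated objective.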
Operational Procedures
Deployment Runbook
# Deployment Runbook
## Pre-Deployment Checklist
- [ ] Code reviewed and approved
- [ ] All tests passing (unit, integration, e2e)
- [ ] Security scan completed
- [ ] Performance testing completed
- [ ] Documentation updated
- [ ] Rollback plan prepared
- [ ] Stakeholders notified
- [ ] Maintenance window scheduled (if needed)
## Deployment Steps
### 1. Pre-Deployment
```bash
# Verify current state
kubectl get pods -n production
kubectl get deployments -n production
# Create backup
python scripts/backup.py --type all
# Tag release
git tag -a v1.2.0 -m "Release v1.2.0"
git push origin v1.2.0
```
### 2. Build and Push Image
```bash
# Build Docker image
docker build -t myregistry/mcp-agent:v1.2.0 .
# Run security scan
docker scan myregistry/mcp-agent:v1.2.0
# Push to registry
docker push myregistry/mcp-agent:v1.2.0
```
### 3. Deploy to Staging
```bash
# Deploy to staging
kubectl set image deployment/mcp-agent \
mcp-agent=myregistry/mcp-agent:v1.2.0 \
-n staging
# Wait for rollout
kubectl rollout status deployment/mcp-agent -n staging
# Run smoke tests
python tests/smoke_tests.py --env staging
```
### 4. Deploy to Production (Canary)
```bash
# Deploy canary (10% traffic)
kubectl apply -f k8s/canary-deployment.yaml
# Monitor metrics for 30 minutes
kubectl top pods -n production
# Check Grafana dashboards
# Increase to 50%
kubectl patch deployment mcp-agent-canary \
-p '{"spec":{"replicas":5}}' -n production
# Monitor for 30 minutes
# Full rollout
kubectl set image deployment/mcp-agent \
mcp-agent=myregistry/mcp-agent:v1.2.0 \
-n production
# Wait for rollout
kubectl rollout status deployment/mcp-agent -n production
```
### 5. Post-Deployment Verification
```bash
# Check health endpoints
curl https://api.example.com/health
curl https://api.example.com/ready
# Verify metrics
curl https://api.example.com/metrics
# Check logs
kubectl logs -f deployment/mcp-agent -n production
# Run integration tests
python tests/integration_tests.py --env production
```
### 6. Rollback (If Needed)
```bash
# Quick rollback
kubectl rollout undo deployment/mcp-agent -n production
# Or rollback to specific revision
kubectl rollout undo deployment/mcp-agent \
--to-revision=2 -n production
# Verify rollback
kubectl rollout status deployment/mcp-agent -n production
```
## Post-Deployment
- Update status page
- Notify stakeholders
- Update documentation
- Create post-mortem (if issues occurred)
- Update runbook with lessons learned
Incident Response Procedure
"""
incident_response.py - Incident response automation.
"""
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class Severity(Enum):
"""Incident severity levels."""
P1 = "critical" # Service down
P2 = "high" # Major functionality impaired
P3 = "medium" # Minor functionality impaired
P4 = "low" # Minimal impact
class IncidentStatus(Enum):
"""Incident status."""
DETECTED = "detected"
INVESTIGATING = "investigating"
IDENTIFIED = "identified"
RESOLVING = "resolving"
RESOLVED = "resolved"
CLOSED = "closed"
@dataclass
class Incident:
"""Incident record."""
id: str
title: str
severity: Severity
status: IncidentStatus
detected_at: datetime
description: str
affected_services: List[str]
assigned_to: str = None
resolved_at: datetime = None
root_cause: str = None
remediation: str = None
class IncidentManager:
"""Manage incident response."""
def __init__(self):
self.incidents: Dict[str, Incident] = {}
self.on_call_rotation: List[str] = []
def create_incident(
self,
title: str,
severity: Severity,
description: str,
affected_services: List[str]
) -> Incident:
"""Create new incident."""
incident_id = f"INC-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
incident = Incident(
id=incident_id,
title=title,
severity=severity,
status=IncidentStatus.DETECTED,
detected_at=datetime.now(),
description=description,
affected_services=affected_services
)
self.incidents[incident_id] = incident
logger.critical(
f"Incident created: {incident_id} - {title}",
extra={
"incident_id": incident_id,
"severity": severity.value,
"services": affected_services
}
)
# Notify on-call engineer
self._notify_on_call(incident)
# Auto-remediation for known issues
self._attempt_auto_remediation(incident)
return incident
def _notify_on_call(self, incident: Incident):
"""Notify on-call engineer."""
if not self.on_call_rotation:
logger.error("No on-call engineer configured")
return
engineer = self.on_call_rotation[0]
logger.info(f"Notifying on-call engineer: {engineer}")
# Send notifications (PagerDuty, Slack, Email, SMS)
# Implementation depends on notification service
incident.assigned_to = engineer
def _attempt_auto_remediation(self, incident: Incident):
"""Attempt automatic remediation."""
logger.info(f"Attempting auto-remediation for {incident.id}")
# Common auto-remediation patterns
if "database connection" in incident.description.lower():
self._restart_database_connection_pool()
elif "high latency" in incident.description.lower():
self._scale_up_instances()
elif "memory leak" in incident.description.lower():
self._restart_affected_pods()
elif "rate limit" in incident.description.lower():
self._increase_rate_limits()
def _restart_database_connection_pool(self):
"""Restart database connection pool."""
logger.info("Restarting database connection pool")
# Implementation specific to your setup
def _scale_up_instances(self):
"""Scale up instances."""
logger.info("Scaling up instances")
# kubectl scale deployment mcp-agent --replicas=10 -n production
def _restart_affected_pods(self):
"""Restart affected pods."""
logger.info("Restarting affected pods")
# kubectl rollout restart deployment mcp-agent -n production
def _increase_rate_limits(self):
"""Temporarily increase rate limits."""
logger.info("Increasing rate limits")
# Update rate limiter configuration
def update_status(self, incident_id: str, status: IncidentStatus):
"""Update incident status."""
if incident_id not in self.incidents:
raise ValueError(f"Incident not found: {incident_id}")
incident = self.incidents[incident_id]
incident.status = status
logger.info(
f"Incident {incident_id} status updated to {status.value}",
extra={"incident_id": incident_id, "status": status.value}
)
def resolve_incident(
self,
incident_id: str,
root_cause: str,
remediation: str
):
"""Resolve incident."""
if incident_id not in self.incidents:
raise ValueError(f"Incident not found: {incident_id}")
incident = self.incidents[incident_id]
incident.status = IncidentStatus.RESOLVED
incident.resolved_at = datetime.now()
incident.root_cause = root_cause
incident.remediation = remediation
duration = incident.resolved_at - incident.detected_at
logger.info(
f"Incident {incident_id} resolved",
extra={
"incident_id": incident_id,
"duration_seconds": duration.total_seconds(),
"root_cause": root_cause
}
)
def generate_post_mortem(self, incident_id: str) -> str:
"""Generate post-mortem report."""
if incident_id not in self.incidents:
raise ValueError(f"Incident not found: {incident_id}")
incident = self.incidents[incident_id]
duration = (
incident.resolved_at - incident.detected_at
if incident.resolved_at else None
)
report = f"""
# Post-Mortem: {incident.title}
**Incident ID:** {incident.id}
**Severity:** {incident.severity.value}
**Detected:** {incident.detected_at}
**Resolved:** {incident.resolved_at}
**Duration:** {duration}
## Summary
{incident.description}
## Impact
**Affected Services:**
{chr(10).join(f'- {service}' for service in incident.affected_services)}
## Root Cause
{incident.root_cause or 'TBD'}
## Resolution
{incident.remediation or 'TBD'}
## Timeline
- {incident.detected_at}: Incident detected
- {incident.detected_at}: On-call engineer notified
- {incident.resolved_at}: Incident resolved
## Action Items
- [ ] Update monitoring to detect similar issues earlier
- [ ] Implement additional safeguards
- [ ] Update runbooks
- [ ] Schedule review meeting
## Lessons Learned
TBD - To be filled during post-mortem review meeting
"""
return report
# Global incident manager
incident_manager = IncidentManager()
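Time-to-resolve is what drives the MTTR figures a post-mortem reports. Below is a pared-down sketch of the duration computation that `resolve_incident` performs; `MiniIncident` and `mttr` are illustrative names, not part of the module above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

@dataclass
class MiniIncident:
    """Pared-down incident record: just enough to compute time-to-resolve."""
    title: str
    detected_at: datetime
    resolved_at: Optional[datetime] = None

    def resolve(self, at: datetime) -> timedelta:
        self.resolved_at = at
        return at - self.detected_at

def mttr(durations: List[timedelta]) -> timedelta:
    """Mean time to resolve across closed incidents."""
    return sum(durations, timedelta()) / len(durations)

inc = MiniIncident("DB connection pool exhausted", datetime(2024, 6, 30, 2, 0))
d1 = inc.resolve(datetime(2024, 6, 30, 2, 45))
assert d1 == timedelta(minutes=45)
assert mttr([d1, timedelta(minutes=15)]) == timedelta(minutes=30)
```

Tracking MTTR per severity level (P1 vs. P3) gives a more honest picture than a single blended number.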
Cost Optimization
Resource Right-Sizing
"""
cost_optimization.py - Cost optimization strategies.
"""
import logging
from typing import Dict, List, Any
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class CostOptimizer:
"""Optimize infrastructure costs."""
def __init__(self):
self.metrics_history: List[Dict] = []
self.recommendations: List[Dict] = []
def analyze_resource_usage(self, lookback_days: int = 7) -> Dict[str, Any]:
"""Analyze resource usage patterns."""
logger.info(f"Analyzing resource usage for past {lookback_days} days")
analysis = {
"cpu_utilization": {
"average": 0.0,
"peak": 0.0,
"idle_periods": []
},
"memory_utilization": {
"average": 0.0,
"peak": 0.0,
"idle_periods": []
},
"request_patterns": {
"peak_hours": [],
"low_traffic_hours": []
}
}
# Analyze metrics from monitoring system
# This would integrate with Prometheus/CloudWatch
return analysis
def generate_recommendations(self) -> List[Dict[str, Any]]:
"""Generate cost optimization recommendations."""
recommendations = []
# 1. Right-size instances
analysis = self.analyze_resource_usage()
if analysis["cpu_utilization"]["average"] < 30:
recommendations.append({
"type": "downsize",
"resource": "cpu",
"current": "2000m",
"recommended": "1000m",
"estimated_savings": "$200/month",
"impact": "low"
})
if analysis["memory_utilization"]["average"] < 40:
recommendations.append({
"type": "downsize",
"resource": "memory",
"current": "4Gi",
"recommended": "2Gi",
"estimated_savings": "$150/month",
"impact": "low"
})
# 2. Auto-scaling optimization
if len(analysis["request_patterns"]["low_traffic_hours"]) > 0:
recommendations.append({
"type": "auto_scaling",
"resource": "pods",
"suggestion": "Reduce min replicas during off-peak hours",
"current_min": 3,
"recommended_min": 1,
"estimated_savings": "$300/month",
"impact": "medium"
})
# 3. Reserved instances
recommendations.append({
"type": "reserved_instances",
"suggestion": "Purchase 1-year reserved instances for base load",
"estimated_savings": "$500/month",
"commitment": "1 year",
"impact": "none"
})
# 4. Spot instances
recommendations.append({
"type": "spot_instances",
"suggestion": "Use spot instances for non-critical workloads",
"estimated_savings": "$400/month",
"impact": "low"
})
# 5. Cache optimization
recommendations.append({
"type": "caching",
"suggestion": "Increase Redis cache TTL for static data",
"current_ttl": "1 hour",
"recommended_ttl": "6 hours",
"estimated_savings": "$50/month (reduced LLM API calls)",
"impact": "none"
})
self.recommendations = recommendations
return recommendations
def implement_recommendation(self, recommendation: Dict[str, Any]):
"""Implement cost optimization recommendation."""
rec_type = recommendation["type"]
logger.info(f"Implementing recommendation: {rec_type}")
if rec_type == "downsize":
self._downsize_resource(recommendation)
elif rec_type == "auto_scaling":
self._optimize_autoscaling(recommendation)
elif rec_type == "caching":
self._optimize_caching(recommendation)
logger.info(f"Recommendation implemented: {rec_type}")
def _downsize_resource(self, recommendation: Dict):
"""Downsize resource allocation."""
# Update Kubernetes deployment with new resource limits
pass
def _optimize_autoscaling(self, recommendation: Dict):
"""Optimize auto-scaling configuration."""
# Update HPA configuration
pass
def _optimize_caching(self, recommendation: Dict):
"""Optimize caching strategy."""
# Update cache TTL configuration
pass
def generate_cost_report(self) -> str:
"""Generate cost report."""
recommendations = self.generate_recommendations()
import re  # tolerant parse: "$50/month (reduced LLM API calls)" -> 50.0
total_savings = sum(
float(match.group(1).replace(",", ""))
for rec in recommendations
for match in [re.search(r"\$([\d,]+(?:\.\d+)?)", rec.get("estimated_savings", ""))]
if match
)
report = f"""
# Cost Optimization Report
Generated: {datetime.now()}
## Current Monthly Costs
- Compute: $1,500
- Storage: $300
- Data Transfer: $200
- LLM API: $800
- **Total: $2,800/month**
## Optimization Opportunities
Total Potential Savings: **${total_savings:.2f}/month** ({(total_savings/2800)*100:.1f}%)
"""
for i, rec in enumerate(recommendations, 1):
report += f"""
### {i}. {rec['type'].replace('_', ' ').title()}
**Suggestion:** {rec.get('suggestion', 'Optimize ' + rec['type'])}
**Estimated Savings:** {rec.get('estimated_savings', 'TBD')}
**Impact:** {rec.get('impact', 'TBD')}
"""
report += """
## Recommendations Priority
1. Reserved Instances (High savings, no impact)
2. Spot Instances (High savings, low impact)
3. Auto-scaling optimization (Medium savings, medium impact)
4. Resource right-sizing (Medium savings, low impact)
5. Cache optimization (Low savings, no impact)
## Next Steps
1. Review and approve recommendations
2. Implement in staging environment
3. Monitor for 1 week
4. Roll out to production
5. Track actual savings
"""
return report
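Strings like `$50/month (reduced LLM API calls)` mix a dollar figure with free-text annotations, so naive `replace`-based parsing breaks. A regex that grabs just the leading dollar amount handles every variant in the recommendations above; the helper name is illustrative.

```python
import re

def parse_monthly_savings(text: str) -> float:
    """Extract the dollar amount from strings like '$50/month (reduced LLM API calls)'."""
    match = re.search(r"\$([\d,]+(?:\.\d+)?)", text)
    return float(match.group(1).replace(",", "")) if match else 0.0

assert parse_monthly_savings("$200/month") == 200.0
assert parse_monthly_savings("$50/month (reduced LLM API calls)") == 50.0
assert parse_monthly_savings("TBD") == 0.0
```

Returning 0.0 for unparseable values keeps the report total conservative rather than crashing on a new recommendation format.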
Auto-Scaling Policies
# advanced_autoscaling.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-agent-hpa-advanced
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-agent
# Dynamic scaling based on time of day
minReplicas: 2
maxReplicas: 20
metrics:
# CPU-based scaling
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# Memory-based scaling
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
# Custom metric: Request rate
- type: Pods
pods:
metric:
name: requests_per_second
target:
type: AverageValue
averageValue: "100"
# Custom metric: Queue depth
- type: Object
object:
metric:
name: queue_depth
describedObject:
apiVersion: v1
kind: Service
name: mcp-agent-service
target:
type: Value
value: "30"
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 50
periodSeconds: 60
- type: Pods
value: 2
periodSeconds: 60
selectPolicy: Max
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
- type: Pods
value: 1
periodSeconds: 60
selectPolicy: Min
---
# Vertical Pod Autoscaler (VPA)
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: mcp-agent-vpa
namespace: production
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-agent
updatePolicy:
updateMode: "Auto"
resourcePolicy:
containerPolicies:
- containerName: mcp-agent
minAllowed:
cpu: 500m
memory: 1Gi
maxAllowed:
cpu: 4000m
memory: 8Gi
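The HPA sizes the deployment with `desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric)`, clamped to the min/max bounds above. A direct transcription of that formula makes the scaling behavior easy to check before changing thresholds (function name is illustrative):

```python
import math

def desired_replicas(
    current: int,
    current_metric: float,
    target_metric: float,
    min_replicas: int = 2,
    max_replicas: int = 20,
) -> int:
    """Core HPA formula: ceil(current * current/target), clamped to bounds."""
    desired = math.ceil(current * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, desired))

# 4 pods at 90% CPU against a 70% target -> scale up to 6
assert desired_replicas(4, current_metric=90, target_metric=70) == 6
# 4 pods at 20% CPU -> would shrink to 2 (the configured minReplicas floor)
assert desired_replicas(4, current_metric=20, target_metric=70) == 2
```

With several metrics configured, the HPA evaluates this per metric and takes the largest result, which is why adding the queue-depth metric can only make the deployment scale up earlier, never later.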
Complete Production Examples
Full Production Setup
"""
production_setup.py - Complete production setup example.
"""
import os
import logging
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
# Import production components
from azcore.mcp import MCPTeamBuilder
from azcore.rl import RLManager
from production_config import ProductionConfig
from production_logging import ProductionLogger
from production_metrics import start_metrics_server, track_request
from production_health import HealthChecker
from production_cache import ProductionCache
from backup_strategy import BackupManager
from incident_response import IncidentManager
from cost_optimization import CostOptimizer
# Initialize logger
logger = ProductionLogger(
name="mcp_production",
level="INFO",
log_file="/app/logs/mcp.log"
)
# Load configuration
config = ProductionConfig()
def build_production_team():
"""Build production-ready MCP team."""
logger.logger.info("Building production MCP team")
# Initialize LLM with production settings
llm = ChatOpenAI(
model=config.llm_model,
temperature=config.llm_temperature,
max_tokens=config.llm_max_tokens,
timeout=config.llm_timeout,
max_retries=config.llm_max_retries,
api_key=config.openai_api_key
)
# Initialize RL Manager
rl_manager = RLManager(
tool_names=[], # Will be populated after team building
q_table_path=config.rl_q_table_path,
exploration_rate=config.rl_exploration_rate,
learning_rate=config.rl_learning_rate,
use_embeddings=True
) if config.enable_rl else None
# Build MCP team step by step so optional servers can be added conditionally
# (a ternary inside the builder chain would silently discard earlier config)
builder = (MCPTeamBuilder("production_team")
.with_llm(llm)
# GitHub server
.with_mcp_server(
"npx",
["-y", "@modelcontextprotocol/server-github"],
env={"GITHUB_TOKEN": config.github_token},
timeout=30
)
# Filesystem server
.with_mcp_server(
"npx",
["-y", "@modelcontextprotocol/server-filesystem",
"/app/data"],
timeout=30
)
)
# PostgreSQL server (if configured)
if config.database_url:
builder = builder.with_mcp_server(
"npx",
["-y", "@modelcontextprotocol/server-postgres"],
env={"DATABASE_URL": config.database_url},
timeout=30
)
# Custom analytics server (example)
builder = builder.with_mcp_server(
"python",
["/app/servers/analytics_server.py"],
env={"ENVIRONMENT": "production"},
timeout=30
)
# Configuration
builder = builder.skip_failed_servers(True)  # Graceful degradation
# Prompt
builder = builder.with_prompt("""You are a production AI assistant with access to multiple tools.
Available capabilities:
- GitHub: Repository management, issues, PRs
- Filesystem: Read/write files
- Database: Query and update data
- Analytics: Generate reports and insights
Always:
1. Verify inputs before executing actions
2. Log all operations
3. Handle errors gracefully
4. Provide clear status updates
""")
# RL Manager (if enabled)
if config.enable_rl:
builder = builder.with_rl_manager(rl_manager)
team = builder.build()
logger.logger.info(f"Production team built with {team.get_mcp_server_count()} servers")
return team
def setup_monitoring(team):
    """Setup monitoring and health checks."""
    logger.logger.info("Setting up monitoring")

    # Start metrics server
    if config.enable_metrics:
        start_metrics_server(port=config.metrics_port)

    # Initialize health checker
    health_checker = HealthChecker(team)

    # Schedule periodic health checks
    import schedule
    schedule.every(config.health_check_interval).seconds.do(
        health_checker.check_components
    )

    logger.logger.info("Monitoring setup complete")
    return health_checker
def setup_backups():
    """Setup automated backups."""
    logger.logger.info("Setting up backups")

    backup_manager = BackupManager(
        backup_dir="/backups",
        s3_bucket=os.getenv("BACKUP_S3_BUCKET"),
        retention_days=30
    )

    # Schedule backups
    import schedule

    # Daily database backup
    if config.database_url:
        schedule.every().day.at("02:00").do(
            backup_manager.backup_database,
            db_url=config.database_url
        )

    # Daily RL data backup
    if config.enable_rl:
        schedule.every().day.at("03:00").do(
            backup_manager.backup_rl_data,
            rl_data_dir="/app/rl_data"
        )

    # Daily cleanup
    schedule.every().day.at("04:00").do(
        backup_manager.cleanup_old_backups
    )

    logger.logger.info("Backup schedule configured")
    return backup_manager
def setup_cost_optimization():
    """Setup cost optimization."""
    logger.logger.info("Setting up cost optimization")

    optimizer = CostOptimizer()

    # Weekly cost analysis
    import schedule
    schedule.every().monday.at("09:00").do(
        lambda: logger.logger.info(optimizer.generate_cost_report())
    )

    return optimizer
@track_request("POST", "/execute")
def handle_request(request: dict):
    """Handle incoming request."""
    request_id = request.get("id", "unknown")

    logger.log_request(
        request_id=request_id,
        method="POST",
        path="/execute",
        task=request.get("task")
    )

    try:
        # Get cached result if available
        if config.enable_caching and cache:
            cached_result = cache.get(f"request:{request_id}")
            if cached_result:
                logger.logger.info(f"Cache hit for request {request_id}")
                return cached_result

        # Execute request
        result = team({
            "messages": [
                HumanMessage(content=request.get("task", ""))
            ]
        })

        # Cache result
        if config.enable_caching and cache:
            cache.set(f"request:{request_id}", result, ttl=config.cache_ttl)

        logger.log_response(
            request_id=request_id,
            status_code=200,
            duration_ms=0  # Would be measured in actual implementation
        )

        return result

    except Exception as e:
        logger.log_error(
            request_id=request_id,
            error=e
        )

        # Create incident for critical errors
        if config.is_production():
            incident_manager.create_incident(
                title=f"Request execution failed: {request_id}",
                severity=Severity.P3,
                description=str(e),
                affected_services=["mcp-agent"]
            )

        raise
def main():
    """Main production setup."""
    logger.logger.info("Starting production MCP agent")
    logger.logger.info(f"Environment: {config.environment.value}")

    # Build team
    global team
    team = build_production_team()

    # Setup cache
    global cache
    cache = ProductionCache(
        redis_url=config.redis_url,
        ttl=config.cache_ttl
    ) if config.enable_caching and config.redis_url else None

    # Setup monitoring
    health_checker = setup_monitoring(team)

    # Setup backups
    backup_manager = setup_backups()

    # Setup cost optimization
    optimizer = setup_cost_optimization()

    # Setup incident management
    global incident_manager
    incident_manager = IncidentManager()

    # Run scheduled jobs (health checks, backups, cost reports) in a
    # background thread; `schedule` only fires jobs when run_pending()
    # is called, so without this loop nothing scheduled above would run.
    import threading
    import time

    def _run_scheduler():
        import schedule
        while True:
            schedule.run_pending()
            time.sleep(1)

    threading.Thread(target=_run_scheduler, daemon=True).start()

    logger.logger.info("Production MCP agent ready")

    # Start FastAPI server (from production_health.py).
    # Note: with workers > 1, uvicorn forks worker processes that do not
    # share the globals initialized above. Keep workers=1 here, or move
    # initialization into the app's startup hook and scale with replicas.
    import uvicorn
    uvicorn.run(
        "production_health:app",
        host="0.0.0.0",
        port=8000,
        workers=1,
        log_level="info"
    )


if __name__ == "__main__":
    main()
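The `handle_request` function above logs `duration_ms=0` as a placeholder. One way to fill it in (a sketch; the idea is simply to wrap the handler and measure wall-clock time with `time.perf_counter()`, then pass the result to `log_response`) is a small timing decorator:

```python
import functools
import time

def timed(fn):
    """Measure a handler's wall-clock duration in milliseconds."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        duration_ms = (time.perf_counter() - start) * 1000
        # In handle_request, this is where duration_ms would be
        # passed to logger.log_response instead of the hardcoded 0.
        if isinstance(result, dict):
            result.setdefault("duration_ms", duration_ms)
        return result
    return wrapper

@timed
def slow_task():
    time.sleep(0.05)  # stand-in for team execution
    return {"status": "ok"}
```

Stacking `@timed` under `@track_request` keeps the metrics decorator outermost, so the recorded latency excludes metrics overhead.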
Complete Kubernetes Production Deployment
#!/bin/bash
# deploy_production.sh - Complete production deployment script
set -e
echo "=== Production Deployment Script ==="
# Configuration
NAMESPACE="production"
IMAGE_TAG="${1:-latest}"
REGISTRY="myregistry"
APP_NAME="mcp-agent"
# 1. Pre-deployment checks
echo "1. Running pre-deployment checks..."
# Verify kubectl access
kubectl cluster-info
# Verify namespace exists
kubectl get namespace $NAMESPACE || kubectl create namespace $NAMESPACE
# Run tests
python -m pytest tests/
echo "✓ Tests passed"
# 2. Build and push image
echo "2. Building Docker image..."
docker build -t $REGISTRY/$APP_NAME:$IMAGE_TAG .
echo "Running security scan..."
docker scout cves $REGISTRY/$APP_NAME:$IMAGE_TAG || true  # older Docker CLIs: `docker scan`
echo "Pushing image to registry..."
docker push $REGISTRY/$APP_NAME:$IMAGE_TAG
echo "✓ Image pushed"
# 3. Create/update secrets
echo "3. Managing secrets..."
kubectl create secret generic mcp-secrets \
--from-env-file=.env.production \
--namespace=$NAMESPACE \
--dry-run=client -o yaml | kubectl apply -f -
echo "✓ Secrets updated"
# 4. Apply configurations
echo "4. Applying configurations..."
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/pvc.yaml
kubectl apply -f k8s/service.yaml
echo "✓ Configurations applied"
# 5. Deploy application
echo "5. Deploying application..."
# Update deployment with new image
kubectl set image deployment/$APP_NAME \
$APP_NAME=$REGISTRY/$APP_NAME:$IMAGE_TAG \
--namespace=$NAMESPACE
# Alternatively, apply the full deployment manifest instead of `set image`
# (don't do both -- the manifest's pinned tag would override the new image):
# kubectl apply -f k8s/deployment.yaml
echo "Waiting for rollout..."
kubectl rollout status deployment/$APP_NAME -n $NAMESPACE
echo "✓ Deployment complete"
# 6. Apply autoscaling
echo "6. Applying autoscaling..."
kubectl apply -f k8s/hpa.yaml
echo "✓ Autoscaling configured"
# 7. Verify deployment
echo "7. Verifying deployment..."
# Check pods
kubectl get pods -n $NAMESPACE
# Check health endpoint
EXTERNAL_IP=$(kubectl get service $APP_NAME-service -n $NAMESPACE -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
if [ -n "$EXTERNAL_IP" ]; then
echo "Checking health endpoint..."
curl -f http://$EXTERNAL_IP/health || echo "Warning: Health check failed"
fi
echo "✓ Deployment verified"
# 8. Post-deployment tasks
echo "8. Running post-deployment tasks..."
# Create backup
kubectl exec -n $NAMESPACE deployment/$APP_NAME -- python scripts/backup.py
# Update monitoring dashboards
echo "TODO: Update Grafana dashboards"
echo "✓ Post-deployment tasks complete"
echo ""
echo "=== Deployment Complete ==="
echo "Namespace: $NAMESPACE"
echo "Image: $REGISTRY/$APP_NAME:$IMAGE_TAG"
echo "External IP: $EXTERNAL_IP"
echo ""
echo "Monitor deployment:"
echo " kubectl get pods -n $NAMESPACE -w"
echo " kubectl logs -f deployment/$APP_NAME -n $NAMESPACE"
echo ""
echo "Rollback if needed:"
echo " kubectl rollout undo deployment/$APP_NAME -n $NAMESPACE"
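The one-shot `curl` health check in step 7 can report a false failure right after a rollout, while pods are still warming up. A retry loop is more robust; here is a minimal, generic poller (a sketch — `check` stands in for any callable that returns True once the service is healthy, e.g. an HTTP GET against `/health`):

```python
import time

def wait_for_healthy(check, attempts=30, delay=2.0):
    """Poll check() until it returns True or attempts run out."""
    for _ in range(attempts):
        try:
            if check():
                return True
        except Exception:
            pass  # treat connection errors as "not ready yet"
        time.sleep(delay)
    return False

# Example with a stub that becomes healthy on the third poll:
state = {"calls": 0}
def fake_check():
    state["calls"] += 1
    return state["calls"] >= 3

ok = wait_for_healthy(fake_check, attempts=5, delay=0.01)
```

Swallowing exceptions inside the loop is deliberate: during a rollout, connection refusals are expected and should count as retries, not hard failures.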
Summary
Production deployment requires comprehensive planning and implementation across multiple areas:
Key Components
- Infrastructure
  - Docker containerization
  - Kubernetes orchestration
  - Multi-region deployment
  - Database replication
- Configuration
  - Environment-based settings
  - Secrets management (AWS Secrets Manager, Vault)
  - Pydantic validation
  - Dynamic configuration
- Deployment
  - Blue-green deployment
  - Canary deployment
  - Automated rollback
  - CI/CD integration
- Monitoring
  - Structured JSON logging
  - Prometheus metrics
  - Health checks (liveness, readiness)
  - Distributed tracing
- Scaling
  - Horizontal pod autoscaling (HPA)
  - Vertical pod autoscaling (VPA)
  - Load balancing (round-robin, least connections, consistent hashing)
  - Redis caching
- Security
  - JWT authentication
  - Role-based access control
  - Rate limiting
  - SSL/TLS encryption
  - Network policies
- High Availability
  - Multi-region deployment
  - Database replication
  - Automated failover
  - Redundant load balancers
- Disaster Recovery
  - Automated backups (database, RL data, configuration)
  - S3 backup storage
  - Restore procedures
  - RTO/RPO targets
  - DR testing schedule
- Operations
  - Deployment runbooks
  - Incident response procedures
  - Automated remediation
  - Post-mortem process
  - On-call rotation
- Cost Optimization
  - Resource right-sizing
  - Reserved instances
  - Spot instances
  - Cache optimization
  - Auto-scaling policies
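Of the load-balancing strategies listed under Scaling, consistent hashing is the least obvious. A minimal hash ring with virtual nodes (a sketch for illustration, not the balancer used in production) shows its key property: when a server is added, only roughly 1/N of the keys move, instead of nearly all of them as with modulo hashing:

```python
import bisect
import hashlib

class HashRing:
    """Consistent-hash ring with virtual nodes."""
    def __init__(self, nodes, replicas=100):
        self.replicas = replicas
        self._keys = []   # sorted hash positions on the ring
        self._ring = {}   # hash position -> node name
        for node in nodes:
            self.add(node)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add(self, node: str):
        # Each virtual node spreads the server around the ring.
        for i in range(self.replicas):
            h = self._hash(f"{node}#{i}")
            self._ring[h] = node
            bisect.insort(self._keys, h)

    def get(self, key: str) -> str:
        # Walk clockwise to the first virtual node at or after the key.
        h = self._hash(key)
        idx = bisect.bisect(self._keys, h) % len(self._keys)
        return self._ring[self._keys[idx]]

ring = HashRing(["agent-1", "agent-2", "agent-3"])
before = {f"req-{i}": ring.get(f"req-{i}") for i in range(1000)}
ring.add("agent-4")  # scale out by one node
moved = sum(1 for k, v in before.items() if ring.get(k) != v)
# moved is roughly 1000/4, far fewer than modulo hashing would remap
```

This stability is what makes consistent hashing attractive for sticky routing and cache sharding: scaling events invalidate only a small fraction of cached or session-affine keys.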
Best Practices
- Start small, scale gradually: Begin with basic setup, add complexity as needed
- Automate everything: Deployments, backups, monitoring, incident response
- Monitor continuously: Logs, metrics, traces, health checks
- Plan for failure: Circuit breakers, retries, fallbacks, graceful degradation
- Test disaster recovery: Regular DR drills, backup restore tests
- Document thoroughly: Runbooks, architecture diagrams, API docs
- Optimize costs: Regular cost analysis, right-sizing, reserved instances
- Secure by default: Authentication, encryption, least privilege
Production Checklist
Before going to production:
- [ ] All infrastructure provisioned and tested
- [ ] Secrets management configured
- [ ] Monitoring and alerting set up
- [ ] Backup and restore procedures tested
- [ ] Disaster recovery plan documented
- [ ] Load testing completed
- [ ] Security audit passed
- [ ] Documentation complete
- [ ] Runbooks created
- [ ] On-call rotation established
- [ ] Stakeholders trained
- [ ] Rollback procedure tested
Next Steps
- Review this guide and adapt to your specific requirements
- Set up staging environment first
- Run load tests and validate performance
- Conduct security audit
- Test disaster recovery procedures
- Train team on operational procedures
- Deploy to production with monitoring
- Continuously improve based on operational feedback