
Model Context Protocol (MCP)

Production Deployment

Comprehensive guide to deploying, scaling, and operating MCP-enabled agents in production environments with best practices for reliability, security, and performance.


Overview

Production deployment of MCP-enabled agents requires careful consideration of reliability, security, performance, and operational requirements. This guide provides battle-tested patterns for production success.

Production vs. Development

# ❌ Development configuration - Not production ready
dev_team = (MCPTeamBuilder("dev_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server("python", ["server.py"])
    .build()
)

# ✅ Production configuration - Hardened and monitored
prod_team = (MCPTeamBuilder("prod_team")
    .with_llm(ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0.7,
        timeout=60,  # request_timeout is an alias of timeout; pass only one
        max_retries=3
    ))
    .with_mcp_server(
        "python",
        ["/opt/mcp/servers/server.py"],
        env={
            "LOG_LEVEL": "INFO",
            "MAX_RETRIES": "3",
            "TIMEOUT": "30",
            "ENVIRONMENT": "production"
        },
        timeout=30
    )
    .skip_failed_servers(True)  # Graceful degradation
    .build()
)

Key Production Requirements

  1. Reliability: 99.9%+ uptime, fault tolerance
  2. Security: Authentication, encryption, audit logs
  3. Performance: Low latency, high throughput
  4. Monitoring: Metrics, logs, alerts
  5. Scalability: Handle increasing load
  6. Maintainability: Easy updates and debugging
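
Requirement 1 (reliability/fault tolerance) usually starts with retrying transient failures. A minimal, framework-agnostic sketch of exponential backoff with jitter — the decorator name and defaults here are illustrative, not part of the MCP APIs shown elsewhere in this guide:

```python
import random
import time
from functools import wraps


def with_retries(max_attempts=3, base_delay=0.5,
                 retriable=(ConnectionError, TimeoutError)):
    """Retry a flaky call with exponential backoff plus jitter."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retriable:
                    if attempt == max_attempts:
                        raise  # exhausted: surface the last error
                    # Backoff: base, 2*base, 4*base... plus up to 100ms jitter
                    time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))
        return wrapper
    return decorator
```

Jitter matters under load: without it, many clients retrying a recovering server all wake at the same instant and knock it over again.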

Pre-Production Checklist

Infrastructure Checklist

# production_checklist.yaml
infrastructure:
  compute:
    - [ ] Production servers provisioned
    - [ ] Resource limits configured (CPU, memory)
    - [ ] Auto-scaling configured
    - [ ] Load balancer set up

  networking:
    - [ ] VPC/network configured
    - [ ] Firewall rules configured
    - [ ] SSL/TLS certificates installed
    - [ ] DNS configured

  storage:
    - [ ] Database provisioned and backed up
    - [ ] File storage configured
    - [ ] Backup strategy implemented
    - [ ] Data retention policies defined

security:
  - [ ] Secrets management configured
  - [ ] API keys rotated
  - [ ] SSL/TLS enabled
  - [ ] Rate limiting configured
  - [ ] WAF/DDoS protection enabled
  - [ ] Security audit completed
  - [ ] Compliance requirements met

monitoring:
  - [ ] Logging infrastructure set up
  - [ ] Metrics collection configured
  - [ ] Alerting rules defined
  - [ ] Dashboard created
  - [ ] On-call rotation established
  - [ ] Runbook created

deployment:
  - [ ] CI/CD pipeline configured
  - [ ] Blue-green/canary deployment ready
  - [ ] Rollback procedure tested
  - [ ] Health checks implemented
  - [ ] Load testing completed
  - [ ] Disaster recovery tested

documentation:
  - [ ] Architecture documented
  - [ ] API documentation complete
  - [ ] Operational runbooks created
  - [ ] Incident response plan defined
  - [ ] SLA defined and communicated

Code Quality Checklist

"""
Pre-production code quality checklist.
"""

import logging

logger = logging.getLogger(__name__)


def production_readiness_check():
    """Verify production readiness."""

    # Each check maps to a verifier; a verifier that raises marks the check failed
    verifiers = {
        "error_handling": verify_error_handling,    # comprehensive error handling
        "logging": verify_logging_setup,            # structured logging
        "monitoring": verify_monitoring,            # metrics and health endpoints
        "configuration": verify_configuration,      # environment-based config
        "security": verify_security,                # secrets management
        "testing": verify_test_coverage,            # test coverage
        "documentation": verify_documentation,      # documentation completeness
    }

    checks = {}
    for name, verify in verifiers.items():
        try:
            checks[name] = bool(verify())
        except Exception as e:
            # Never swallow failures silently; record why a check failed
            logger.warning(f"Readiness check '{name}' raised: {e}")
            checks[name] = False

    # Report results
    passed = sum(checks.values())
    total = len(checks)

    print(f"Production Readiness: {passed}/{total} checks passed")
    for check, status in checks.items():
        status_str = "✅" if status else "❌"
        print(f"{status_str} {check}")

    return all(checks.values())

Infrastructure Setup

Docker Deployment

# Dockerfile
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install Node.js (for npx-based MCP servers)
RUN curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
    && apt-get install -y nodejs \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create necessary directories
RUN mkdir -p /app/logs /app/data /app/rl_data

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV LOG_LEVEL=INFO
ENV ENVIRONMENT=production

# Expose port (if using HTTP/SSE)
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

# Run application
CMD ["python", "main.py"]

# docker-compose.yml
version: '3.8'

services:
  mcp-agent:
    build: .
    container_name: mcp-agent-prod
    restart: unless-stopped

    ports:
      - "8000:8000"

    environment:
      - LOG_LEVEL=INFO
      - ENVIRONMENT=production
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GITHUB_TOKEN=${GITHUB_TOKEN}
      - DATABASE_URL=${DATABASE_URL}

    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
      - ./rl_data:/app/rl_data
      - ./config:/app/config:ro

    networks:
      - mcp-network

    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '1'
          memory: 2G

  # Redis for caching (optional)
  redis:
    image: redis:7-alpine
    container_name: mcp-redis
    restart: unless-stopped

    networks:
      - mcp-network

    volumes:
      - redis-data:/data

    command: redis-server --appendonly yes

  # PostgreSQL for persistence (optional)
  postgres:
    image: postgres:15-alpine
    container_name: mcp-postgres
    restart: unless-stopped

    environment:
      - POSTGRES_DB=mcp_db
      - POSTGRES_USER=mcp_user
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}

    networks:
      - mcp-network

    volumes:
      - postgres-data:/var/lib/postgresql/data

    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mcp_user"]
      interval: 10s
      timeout: 5s
      retries: 5

networks:
  mcp-network:
    driver: bridge

volumes:
  redis-data:
  postgres-data:

Kubernetes Deployment

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-agent
  namespace: production
  labels:
    app: mcp-agent
    version: v1.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: mcp-agent
  template:
    metadata:
      labels:
        app: mcp-agent
        version: v1.0.0
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
      - name: mcp-agent
        image: myregistry/mcp-agent:v1.0.0
        imagePullPolicy: Always

        ports:
        - containerPort: 8000
          name: http
          protocol: TCP

        env:
        - name: LOG_LEVEL
          value: "INFO"
        - name: ENVIRONMENT
          value: "production"
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: openai-api-key
        - name: GITHUB_TOKEN
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: github-token

        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "2000m"
            memory: "4Gi"

        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3

        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 3

        volumeMounts:
        - name: config
          mountPath: /app/config
          readOnly: true
        - name: data
          mountPath: /app/data
        - name: logs
          mountPath: /app/logs

      volumes:
      - name: config
        configMap:
          name: mcp-config
      - name: data
        persistentVolumeClaim:
          claimName: mcp-data-pvc
      - name: logs
        emptyDir: {}

      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000

---
# kubernetes/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: mcp-agent-service
  namespace: production
  labels:
    app: mcp-agent
spec:
  type: ClusterIP
  ports:
  - port: 80
    targetPort: 8000
    protocol: TCP
    name: http
  selector:
    app: mcp-agent

---
# kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-agent-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-agent
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
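
The HPA above scales on CPU and memory utilization. The controller's core rule is `desiredReplicas = ceil(currentReplicas * currentMetric / targetMetric)`, clamped to the configured bounds. A quick sketch of that arithmetic (a simplification — the real controller also applies stabilization windows and tolerance):

```python
import math


def desired_replicas(current_replicas, current_utilization, target_utilization,
                     min_replicas=3, max_replicas=10):
    """Approximate the HPA scaling formula:
    desired = ceil(current * currentMetric / targetMetric), clamped to bounds."""
    desired = math.ceil(current_replicas * current_utilization / target_utilization)
    return max(min_replicas, min(max_replicas, desired))
```

For example, 3 replicas averaging 98% CPU against a 70% target gives `ceil(3 * 98/70) = 5` replicas, while a quiet period at 35% would compute 2 but stay clamped at the `minReplicas` of 3.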

Configuration Management

Environment-Based Configuration

"""
production_config.py - Production configuration management.
"""

import os
from enum import Enum
from typing import Dict, Any, Optional
from pydantic import BaseSettings, Field, validator  # Pydantic v1 style; in v2, BaseSettings moved to the pydantic-settings package
import logging

logger = logging.getLogger(__name__)


class Environment(str, Enum):
    """Deployment environment."""
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"


class MCPServerConfig(BaseSettings):
    """Configuration for MCP server."""
    name: str
    command: Optional[str] = None
    args: list = []
    url: Optional[str] = None
    transport: str = "stdio"
    timeout: int = 30
    env_vars: Dict[str, str] = {}
    optional: bool = False

    class Config:
        env_prefix = "MCP_SERVER_"


class ProductionConfig(BaseSettings):
    """Production configuration."""

    # Environment
    environment: Environment = Field(default=Environment.PRODUCTION)
    debug: bool = Field(default=False)

    # LLM Configuration
    llm_model: str = Field(default="gpt-4o-mini")
    llm_temperature: float = Field(default=0.7)
    llm_max_tokens: int = Field(default=2000)
    llm_timeout: int = Field(default=60)
    llm_max_retries: int = Field(default=3)

    # API Keys (from environment)
    openai_api_key: str = Field(..., env="OPENAI_API_KEY")
    github_token: Optional[str] = Field(None, env="GITHUB_TOKEN")

    # Database
    database_url: Optional[str] = Field(None, env="DATABASE_URL")
    database_pool_size: int = Field(default=10)
    database_max_overflow: int = Field(default=20)

    # Redis
    redis_url: Optional[str] = Field(None, env="REDIS_URL")
    redis_ttl: int = Field(default=3600)

    # Logging
    log_level: str = Field(default="INFO")
    log_format: str = Field(default="json")
    log_file: Optional[str] = Field(default="/app/logs/mcp.log")

    # Monitoring
    enable_metrics: bool = Field(default=True)
    metrics_port: int = Field(default=9090)
    enable_tracing: bool = Field(default=True)
    tracing_endpoint: Optional[str] = Field(None, env="TRACING_ENDPOINT")

    # Performance
    max_concurrent_requests: int = Field(default=100)
    request_timeout: int = Field(default=300)
    enable_caching: bool = Field(default=True)
    cache_ttl: int = Field(default=3600)

    # Security
    enable_auth: bool = Field(default=True)
    jwt_secret: Optional[str] = Field(None, env="JWT_SECRET")
    allowed_origins: list = Field(default=["*"])
    rate_limit_per_minute: int = Field(default=60)

    # MCP Servers
    mcp_servers: list = Field(default_factory=list)

    # RL Configuration
    enable_rl: bool = Field(default=True)
    rl_exploration_rate: float = Field(default=0.15)
    rl_learning_rate: float = Field(default=0.1)
    rl_q_table_path: str = Field(default="/app/rl_data/q_table.pkl")

    # Health Check
    health_check_interval: int = Field(default=30)

    class Config:
        env_file = ".env.production"
        env_file_encoding = "utf-8"

    @validator("environment", pre=True)
    def validate_environment(cls, v):
        """Validate environment."""
        if isinstance(v, str):
            return Environment(v.lower())
        return v

    @validator("log_level")
    def validate_log_level(cls, v):
        """Validate log level."""
        valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        if v.upper() not in valid_levels:
            raise ValueError(f"Invalid log level. Must be one of {valid_levels}")
        return v.upper()

    def get_llm_config(self) -> Dict[str, Any]:
        """Get LLM configuration."""
        return {
            "model": self.llm_model,
            "temperature": self.llm_temperature,
            "max_tokens": self.llm_max_tokens,
            "timeout": self.llm_timeout,
            "max_retries": self.llm_max_retries,
            "api_key": self.openai_api_key
        }

    def get_database_url(self) -> str:
        """Get database connection URL."""
        if not self.database_url:
            raise ValueError("DATABASE_URL not configured")
        return self.database_url

    def is_production(self) -> bool:
        """Check if running in production."""
        return self.environment == Environment.PRODUCTION

    def get_log_config(self) -> Dict[str, Any]:
        """Get logging configuration."""
        return {
            "level": self.log_level,
            "format": self.log_format,
            "file": self.log_file
        }


def load_config() -> ProductionConfig:
    """Load production configuration."""
    try:
        config = ProductionConfig()
        logger.info(f"Loaded configuration for environment: {config.environment.value}")
        return config
    except Exception as e:
        logger.error(f"Failed to load configuration: {e}")
        raise


# Global configuration instance
config = load_config()
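
`BaseSettings` resolves each field with a fixed precedence: explicit constructor arguments, then process environment variables, then the `.env.production` file, then the field default. A stdlib-only sketch of that lookup order (illustrative of the pattern, not pydantic's implementation):

```python
import os


def resolve_setting(name, env_var, defaults, overrides=None, dotenv=None):
    """Resolve one config value the way settings loaders typically do:
    explicit overrides > process environment > .env file > field default."""
    overrides = overrides or {}
    dotenv = dotenv or {}
    if name in overrides:
        return overrides[name]
    if env_var in os.environ:
        return os.environ[env_var]
    if env_var in dotenv:
        return dotenv[env_var]
    return defaults[name]
```

This precedence is why the checklist insists on secrets coming from the environment: a value baked into `.env.production` can always be overridden per-deployment without a code change.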

Secrets Management

"""
secrets_manager.py - Secure secrets management.
"""

import os
import boto3
from typing import Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)


class SecretsManager:
    """Manage secrets securely."""

    def __init__(self, provider: str = "env"):
        """
        Initialize secrets manager.

        Args:
            provider: Secrets provider ('env', 'aws', 'vault', 'k8s')
        """
        self.provider = provider
        self._cache: Dict[str, str] = {}

        if provider == "aws":
            self.client = boto3.client('secretsmanager')
        elif provider == "vault":
            # Initialize HashiCorp Vault client
            pass
        elif provider == "k8s":
            # Initialize Kubernetes secrets client
            pass

    def get_secret(self, key: str, required: bool = True) -> Optional[str]:
        """
        Get secret value.

        Args:
            key: Secret key
            required: Whether secret is required

        Returns:
            Secret value or None
        """
        # Check cache first
        if key in self._cache:
            return self._cache[key]

        # Fetch from provider
        value = None

        if self.provider == "env":
            value = os.getenv(key)

        elif self.provider == "aws":
            try:
                response = self.client.get_secret_value(SecretId=key)
                value = response['SecretString']
            except Exception as e:
                logger.error(f"Failed to fetch secret from AWS: {e}")

        elif self.provider == "vault":
            # Fetch from Vault
            pass

        elif self.provider == "k8s":
            # Fetch from Kubernetes secrets
            pass

        # Validate
        if required and not value:
            raise ValueError(f"Required secret not found: {key}")

        # Cache
        if value:
            self._cache[key] = value

        return value

    def get_database_credentials(self) -> Dict[str, str]:
        """Get database credentials."""
        return {
            "host": self.get_secret("DB_HOST"),
            "port": self.get_secret("DB_PORT"),
            "database": self.get_secret("DB_NAME"),
            "user": self.get_secret("DB_USER"),
            "password": self.get_secret("DB_PASSWORD")
        }

    def get_api_keys(self) -> Dict[str, str]:
        """Get API keys."""
        return {
            "openai": self.get_secret("OPENAI_API_KEY"),
            "github": self.get_secret("GITHUB_TOKEN", required=False),
            "slack": self.get_secret("SLACK_BOT_TOKEN", required=False)
        }

    def rotate_secret(self, key: str, new_value: str):
        """Rotate a secret."""
        if self.provider == "aws":
            try:
                self.client.update_secret(SecretId=key, SecretString=new_value)
                logger.info(f"Rotated secret: {key}")
                # Clear cache
                if key in self._cache:
                    del self._cache[key]
            except Exception as e:
                logger.error(f"Failed to rotate secret: {e}")
                raise

    def clear_cache(self):
        """Clear secrets cache."""
        self._cache.clear()
        logger.info("Cleared secrets cache")


# Global secrets manager
secrets = SecretsManager(provider=os.getenv("SECRETS_PROVIDER", "env"))

Deployment Strategies

Blue-Green Deployment

"""
blue_green_deployment.py - Blue-green deployment strategy.
"""

import time
import logging
from enum import Enum
from typing import Optional

from langchain_core.messages import HumanMessage

logger = logging.getLogger(__name__)


class DeploymentColor(Enum):
    """Deployment color."""
    BLUE = "blue"
    GREEN = "green"


class BlueGreenDeployment:
    """Manage blue-green deployments."""

    def __init__(self):
        self.active_color = DeploymentColor.BLUE
        self.blue_team = None
        self.green_team = None

    def deploy_new_version(self, new_team):
        """Deploy new version to inactive environment."""
        inactive_color = self._get_inactive_color()
        logger.info(f"Deploying new version to {inactive_color.value} environment")

        if inactive_color == DeploymentColor.BLUE:
            self.blue_team = new_team
        else:
            self.green_team = new_team

        logger.info(f"New version deployed to {inactive_color.value}")

    def health_check(self, team) -> bool:
        """Perform health check on team."""
        try:
            # Test basic functionality
            result = team({
                "messages": [HumanMessage(content="health check")]
            })
            return True
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return False

    def switch_traffic(self):
        """Switch traffic to new version."""
        inactive_color = self._get_inactive_color()
        inactive_team = self._get_team(inactive_color)

        if not inactive_team:
            raise ValueError(f"No team deployed to {inactive_color.value}")

        # Health check before switching
        logger.info(f"Performing health check on {inactive_color.value}")
        if not self.health_check(inactive_team):
            raise RuntimeError(f"Health check failed for {inactive_color.value}")

        # Switch traffic
        logger.info(f"Switching traffic from {self.active_color.value} to {inactive_color.value}")
        self.active_color = inactive_color
        logger.info(f"Traffic switched to {self.active_color.value}")

    def rollback(self):
        """Rollback to previous version."""
        previous_color = self._get_inactive_color()
        logger.warning(f"Rolling back to {previous_color.value}")
        self.active_color = previous_color
        logger.info(f"Rolled back to {self.active_color.value}")

    def get_active_team(self):
        """Get currently active team."""
        return self._get_team(self.active_color)

    def _get_inactive_color(self) -> DeploymentColor:
        """Get inactive color."""
        return (
            DeploymentColor.GREEN
            if self.active_color == DeploymentColor.BLUE
            else DeploymentColor.BLUE
        )

    def _get_team(self, color: DeploymentColor):
        """Get team by color."""
        return self.blue_team if color == DeploymentColor.BLUE else self.green_team


# Usage example
def perform_blue_green_deployment():
    """Perform blue-green deployment."""
    deployment = BlueGreenDeployment()

    # Current active version (blue)
    logger.info("Current version running on blue")

    # Deploy new version to green
    logger.info("Building new version...")
    new_team = build_new_team_version()

    deployment.deploy_new_version(new_team)

    # Run smoke tests
    logger.info("Running smoke tests...")
    time.sleep(5)

    # Switch traffic
    try:
        deployment.switch_traffic()
        logger.info("Deployment successful!")
    except Exception as e:
        logger.error(f"Deployment failed: {e}")
        deployment.rollback()
        logger.info("Rolled back to previous version")

Canary Deployment

"""
canary_deployment.py - Canary deployment strategy.
"""

import random
import time
import logging
from typing import List, Dict, Any

logger = logging.getLogger(__name__)


class CanaryDeployment:
    """Manage canary deployments."""

    def __init__(
        self,
        stable_team,
        canary_percentage: int = 10
    ):
        self.stable_team = stable_team
        self.canary_team = None
        self.canary_percentage = canary_percentage
        self.metrics: Dict[str, List[float]] = {
            "stable": [],
            "canary": []
        }

    def deploy_canary(self, canary_team):
        """Deploy canary version."""
        logger.info(f"Deploying canary with {self.canary_percentage}% traffic")
        self.canary_team = canary_team

    def route_request(self, request: Dict[str, Any]):
        """Route request to stable or canary."""
        # Decide which version to use
        use_canary = (
            self.canary_team is not None and
            random.random() * 100 < self.canary_percentage
        )

        team = self.canary_team if use_canary else self.stable_team
        version = "canary" if use_canary else "stable"

        logger.info(f"Routing to {version} version")

        # Execute request
        try:
            start_time = time.time()
            result = team(request)
            duration = time.time() - start_time

            # Record metrics
            self.metrics[version].append(duration)

            return {
                "result": result,
                "version": version,
                "duration": duration
            }

        except Exception as e:
            logger.error(f"Request failed on {version}: {e}")
            raise

    def increase_canary_traffic(self, increment: int = 10):
        """Gradually increase canary traffic."""
        if not self.canary_team:
            raise ValueError("No canary deployed")

        self.canary_percentage = min(100, self.canary_percentage + increment)
        logger.info(f"Increased canary traffic to {self.canary_percentage}%")

    def analyze_metrics(self) -> Dict[str, Any]:
        """Analyze performance metrics."""
        if not self.metrics["stable"] or not self.metrics["canary"]:
            return {"status": "insufficient_data"}

        stable_avg = sum(self.metrics["stable"]) / len(self.metrics["stable"])
        canary_avg = sum(self.metrics["canary"]) / len(self.metrics["canary"])

        # Calculate error rates (simplified)
        stable_errors = 0  # Track separately
        canary_errors = 0  # Track separately

        analysis = {
            "stable_avg_latency": stable_avg,
            "canary_avg_latency": canary_avg,
            "latency_diff_percent": ((canary_avg - stable_avg) / stable_avg) * 100,
            "stable_error_rate": stable_errors,
            "canary_error_rate": canary_errors,
            "recommendation": "proceed"
        }

        # Decision logic
        if canary_avg > stable_avg * 1.2:  # 20% slower
            analysis["recommendation"] = "rollback"
            analysis["reason"] = "Canary latency too high"
        elif canary_errors > stable_errors * 1.5:
            analysis["recommendation"] = "rollback"
            analysis["reason"] = "Canary error rate too high"

        return analysis

    def promote_canary(self):
        """Promote canary to stable."""
        if not self.canary_team:
            raise ValueError("No canary to promote")

        logger.info("Promoting canary to stable")
        self.stable_team = self.canary_team
        self.canary_team = None
        self.canary_percentage = 0
        logger.info("Canary promoted successfully")

    def rollback_canary(self):
        """Rollback canary deployment."""
        logger.warning("Rolling back canary")
        self.canary_team = None
        self.canary_percentage = 0
        logger.info("Canary rolled back")


# Usage example
def perform_canary_deployment():
    """Perform canary deployment."""
    stable_team = build_current_team()
    canary = CanaryDeployment(stable_team, canary_percentage=10)

    # Deploy canary
    new_team = build_new_team_version()
    canary.deploy_canary(new_team)

    # Gradually increase traffic
    for stage in [10, 25, 50, 100]:
        logger.info(f"Setting canary traffic to {stage}%")
        canary.canary_percentage = stage

        # Monitor for 10 minutes
        time.sleep(600)

        # Analyze metrics
        analysis = canary.analyze_metrics()

        if analysis["recommendation"] == "rollback":
            logger.error(f"Rolling back: {analysis['reason']}")
            canary.rollback_canary()
            break

        if stage == 100:
            logger.info("Canary successful, promoting to stable")
            canary.promote_canary()

Monitoring and Observability

Structured Logging

"""
production_logging.py - Production logging setup.
"""

import logging
import json
import sys
from typing import Optional
from pythonjsonlogger import jsonlogger


class ProductionLogger:
    """Production-ready logging."""

    def __init__(
        self,
        name: str,
        level: str = "INFO",
        log_file: Optional[str] = None
    ):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # JSON formatter
        formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            json_encoder=json.JSONEncoder
        )

        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        # File handler (if specified)
        if log_file:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def log_request(
        self,
        request_id: str,
        method: str,
        path: str,
        **kwargs
    ):
        """Log incoming request."""
        self.logger.info(
            "Request received",
            extra={
                "request_id": request_id,
                "method": method,
                "path": path,
                **kwargs
            }
        )

    def log_response(
        self,
        request_id: str,
        status_code: int,
        duration_ms: float,
        **kwargs
    ):
        """Log response."""
        self.logger.info(
            "Response sent",
            extra={
                "request_id": request_id,
                "status_code": status_code,
                "duration_ms": duration_ms,
                **kwargs
            }
        )

    def log_error(
        self,
        request_id: str,
        error: Exception,
        **kwargs
    ):
        """Log error."""
        self.logger.error(
            "Error occurred",
            extra={
                "request_id": request_id,
                "error_type": type(error).__name__,
                "error_message": str(error),
                **kwargs
            },
            exc_info=True
        )

    def log_metric(
        self,
        metric_name: str,
        value: float,
        **kwargs
    ):
        """Log metric."""
        self.logger.info(
            "Metric recorded",
            extra={
                "metric_name": metric_name,
                "value": value,
                **kwargs
            }
        )


# Setup production logger
prod_logger = ProductionLogger(
    name="mcp_production",
    level="INFO",
    log_file="/app/logs/mcp.log"
)
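
Every `ProductionLogger` method keys its output on `request_id`; generating that id once per request and carrying it implicitly down the call stack is commonly done with `contextvars`, which stays correct across async task boundaries where a module-level global would not. A minimal sketch (names are illustrative):

```python
import contextvars
import uuid

# One context-local slot per in-flight request
_request_id = contextvars.ContextVar("request_id", default=None)


def new_request_id() -> str:
    """Generate a request id and bind it to the current context."""
    rid = uuid.uuid4().hex
    _request_id.set(rid)
    return rid


def current_request_id() -> str:
    """Fetch the id bound by new_request_id(), or a placeholder if unbound."""
    return _request_id.get() or "unbound"
```

Middleware would call `new_request_id()` on entry, and any logging call deeper in the stack can pass `current_request_id()` without threading the id through every function signature.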

Metrics Collection

"""
production_metrics.py - Production metrics collection.
"""

import time
import logging
from functools import wraps

from prometheus_client import Counter, Histogram, Gauge, start_http_server

logger = logging.getLogger(__name__)

# Define metrics
request_count = Counter(
    'mcp_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'mcp_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

active_requests = Gauge(
    'mcp_active_requests',
    'Number of active requests'
)

tool_execution_count = Counter(
    'mcp_tool_executions_total',
    'Total tool executions',
    ['tool_name', 'status']
)

tool_execution_duration = Histogram(
    'mcp_tool_execution_duration_seconds',
    'Tool execution duration',
    ['tool_name']
)

rl_q_table_size = Gauge(
    'mcp_rl_q_table_size',
    'Size of RL Q-table'
)

rl_exploration_rate = Gauge(
    'mcp_rl_exploration_rate',
    'Current RL exploration rate'
)


def track_request(method: str, endpoint: str):
    """Decorator to track requests."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            active_requests.inc()
            start_time = time.time()

            try:
                result = func(*args, **kwargs)
                status = "success"
                return result
            except Exception as e:
                status = "error"
                raise
            finally:
                duration = time.time() - start_time
                active_requests.dec()
                request_count.labels(method=method, endpoint=endpoint, status=status).inc()
                request_duration.labels(method=method, endpoint=endpoint).observe(duration)

        return wrapper
    return decorator


def track_tool_execution(tool_name: str):
    """Decorator to track tool execution."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()

            try:
                result = func(*args, **kwargs)
                status = "success"
                return result
            except Exception as e:
                status = "error"
                raise
            finally:
                duration = time.time() - start_time
                tool_execution_count.labels(tool_name=tool_name, status=status).inc()
                tool_execution_duration.labels(tool_name=tool_name).observe(duration)

        return wrapper
    return decorator


def update_rl_metrics(rl_manager):
    """Update RL metrics."""
    stats = rl_manager.get_statistics()
    rl_q_table_size.set(stats["total_states"])
    rl_exploration_rate.set(stats["exploration_rate"])


def start_metrics_server(port: int = 9090):
    """Start Prometheus metrics server."""
    start_http_server(port)
    logger.info(f"Metrics server started on port {port}")

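In environments where `prometheus_client` is not installed (unit tests, local sandboxes), the same track-and-record decorator pattern can be exercised against plain dicts. A minimal sketch — the names below are illustrative and not part of the production module:

```python
import time
from collections import defaultdict
from functools import wraps

# In-process stand-ins for the Prometheus counters and histograms
request_counts = defaultdict(int)       # (endpoint, status) -> count
request_durations = defaultdict(list)   # endpoint -> list of durations in seconds


def track_request(endpoint: str):
    """Record a count and duration for every call, success or failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            status = "error"
            try:
                result = func(*args, **kwargs)
                status = "success"
                return result
            finally:
                request_counts[(endpoint, status)] += 1
                request_durations[endpoint].append(time.perf_counter() - start)
        return wrapper
    return decorator


@track_request("/execute")
def handle(task: str) -> str:
    if task == "boom":
        raise ValueError("failed task")
    return f"done: {task}"


handle("ok")
try:
    handle("boom")
except ValueError:
    pass  # the decorator still records the failed call
```

The `finally` block is the important part: instrumentation fires whether the handler succeeds or raises, which is what keeps error rates honest.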
### Health Checks

```python
"""
production_health.py - Production health checks.
"""

import json
import time
from typing import Dict, Any

from fastapi import FastAPI, Response

# `config` is the application settings object defined earlier in this guide
app = FastAPI()


class HealthChecker:
    """Comprehensive health checker."""

    def __init__(self, team):
        self.team = team
        self.last_check = None
        self.health_status = "unknown"

    def check_components(self) -> Dict[str, Any]:
        """Check all system components."""
        checks = {}

        # 1. MCP servers
        try:
            server_count = self.team.get_mcp_server_count()
            checks["mcp_servers"] = {
                "status": "healthy",
                "count": server_count
            }
        except Exception as e:
            checks["mcp_servers"] = {
                "status": "unhealthy",
                "error": str(e)
            }

        # 2. Tools
        try:
            tools = self.team.get_mcp_tool_names()
            checks["tools"] = {
                "status": "healthy",
                "count": len(tools)
            }
        except Exception as e:
            checks["tools"] = {
                "status": "unhealthy",
                "error": str(e)
            }

        # 3. Database (if configured)
        try:
            if config.database_url:
                # Placeholder: run a real connectivity query (e.g. SELECT 1) here
                checks["database"] = {
                    "status": "healthy"
                }
        except Exception as e:
            checks["database"] = {
                "status": "unhealthy",
                "error": str(e)
            }

        # 4. Redis (if configured)
        try:
            if config.redis_url:
                # Placeholder: run a real PING here
                checks["redis"] = {
                    "status": "healthy"
                }
        except Exception as e:
            checks["redis"] = {
                "status": "unhealthy",
                "error": str(e)
            }

        # Determine overall health
        all_healthy = all(
            check.get("status") == "healthy"
            for check in checks.values()
        )

        self.health_status = "healthy" if all_healthy else "unhealthy"
        self.last_check = time.time()

        return {
            "status": self.health_status,
            "timestamp": self.last_check,
            "checks": checks
        }


health_checker = None  # Initialize after team creation


@app.get("/health")
async def health():
    """Health check endpoint."""
    if health_checker:
        result = health_checker.check_components()
        status_code = 200 if result["status"] == "healthy" else 503
        return Response(
            content=json.dumps(result),
            status_code=status_code,
            media_type="application/json"
        )
    return {"status": "starting"}


@app.get("/ready")
async def readiness():
    """Readiness check endpoint."""
    # Check if system is ready to serve traffic
    if health_checker and health_checker.health_status == "healthy":
        return {"status": "ready"}
    return Response(
        content=json.dumps({"status": "not_ready"}),
        status_code=503,
        media_type="application/json"
    )


@app.get("/metrics")
async def metrics():
    """Metrics endpoint for Prometheus."""
    from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
```

## Scaling and Performance

### Horizontal Scaling

```python
"""
horizontal_scaling.py - Horizontal scaling implementation.
"""

import logging
from typing import List

logger = logging.getLogger(__name__)


class LoadBalancer:
    """Simple round-robin load balancer."""

    def __init__(self, instances: List):
        self.instances = instances
        self.current_index = 0

    def get_instance(self):
        """Get next instance (round-robin)."""
        instance = self.instances[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.instances)
        return instance

    def add_instance(self, instance):
        """Add new instance."""
        self.instances.append(instance)
        logger.info(f"Added instance. Total: {len(self.instances)}")

    def remove_instance(self, instance):
        """Remove instance."""
        self.instances.remove(instance)
        self.current_index = 0  # reset so the index cannot point past the shortened list
        logger.info(f"Removed instance. Total: {len(self.instances)}")


# Create multiple instances (build_team_instance constructs a team as shown earlier)
instances = [
    build_team_instance(f"instance_{i}")
    for i in range(3)
]

load_balancer = LoadBalancer(instances)


# Route requests
def handle_request(request):
    """Handle request with load balancing."""
    instance = load_balancer.get_instance()
    return instance(request)
```

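The simple balancer above is not thread-safe: two concurrent requests can read the same `current_index` before either writes it back. A minimal lock-guarded variant, as a sketch:

```python
import threading
from typing import Any, List


class ThreadSafeRoundRobin:
    """Round-robin selection that is safe for concurrent callers."""

    def __init__(self, instances: List[Any]):
        if not instances:
            raise ValueError("at least one instance is required")
        self._instances = list(instances)
        self._index = 0
        self._lock = threading.Lock()

    def get_instance(self) -> Any:
        # The read-modify-write on _index must be atomic under concurrency
        with self._lock:
            instance = self._instances[self._index]
            self._index = (self._index + 1) % len(self._instances)
            return instance


balancer = ThreadSafeRoundRobin(["instance_0", "instance_1", "instance_2"])
picks = [balancer.get_instance() for _ in range(4)]  # wraps back to the start
```

The lock only guards index arithmetic, so contention stays negligible even at high request rates.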
### Caching Strategy

```python
"""
production_cache.py - Production caching strategy.
"""

import hashlib
import logging
import pickle
from typing import Any, Optional

import redis

logger = logging.getLogger(__name__)


class ProductionCache:
    """Production caching with Redis."""

    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = ttl

    def _generate_key(self, prefix: str, data: Any) -> str:
        """Generate cache key."""
        data_str = str(data)
        hash_obj = hashlib.md5(data_str.encode())  # fingerprint only, not a security hash
        return f"{prefix}:{hash_obj.hexdigest()}"

    def get(self, key: str) -> Optional[Any]:
        """Get from cache."""
        try:
            data = self.redis_client.get(key)
            if data:
                # Note: pickle is only safe when you fully control what is written to the cache
                return pickle.loads(data)
        except Exception as e:
            logger.error(f"Cache get error: {e}")
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Set in cache."""
        try:
            ttl = ttl or self.ttl
            data = pickle.dumps(value)
            self.redis_client.setex(key, ttl, data)
        except Exception as e:
            logger.error(f"Cache set error: {e}")

    def delete(self, key: str):
        """Delete from cache."""
        try:
            self.redis_client.delete(key)
        except Exception as e:
            logger.error(f"Cache delete error: {e}")

    def clear_all(self):
        """Clear all cache."""
        try:
            self.redis_client.flushdb()
            logger.info("Cleared all cache")
        except Exception as e:
            logger.error(f"Cache clear error: {e}")


# Initialize cache (config is the settings object defined earlier)
cache = ProductionCache(
    redis_url=config.redis_url,
    ttl=config.cache_ttl
) if config.redis_url else None
```

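For unit tests and local development, where Redis may not be running, the same `get`/`set`/`delete` interface can be backed by an in-process dict with per-key expiry. A sketch — single-process only, the class name is illustrative:

```python
import time
from typing import Any, Optional


class InMemoryCache:
    """Dict-backed stand-in for ProductionCache with per-key TTL."""

    def __init__(self, ttl: float = 3600.0):
        self.ttl = ttl
        self._store = {}  # key -> (expires_at, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # lazily evict expired entries on read
            return None
        return value

    def set(self, key: str, value: Any, ttl: Optional[float] = None):
        ttl = ttl if ttl is not None else self.ttl
        self._store[key] = (time.monotonic() + ttl, value)

    def delete(self, key: str):
        self._store.pop(key, None)


cache = InMemoryCache(ttl=60.0)
cache.set("result:abc", {"answer": 42})
hit = cache.get("result:abc")

cache.set("ephemeral", "x", ttl=0.0)  # expires immediately
miss = cache.get("ephemeral")
```

Because both classes expose the same method signatures, the cache backend can be swapped by configuration without touching call sites.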
## Security Hardening

### Authentication and Authorization

```python
"""
production_security.py - Production security implementation.
"""

from datetime import datetime, timedelta, timezone
from typing import Dict

import jwt  # PyJWT
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()


class SecurityManager:
    """Manage authentication and authorization."""

    def __init__(self, jwt_secret: str):
        self.jwt_secret = jwt_secret
        self.algorithm = "HS256"

    def create_token(self, user_id: str, role: str) -> str:
        """Create JWT token."""
        payload = {
            "user_id": user_id,
            "role": role,
            "exp": datetime.now(timezone.utc) + timedelta(hours=24)
        }
        return jwt.encode(payload, self.jwt_secret, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Dict:
        """Verify JWT token."""
        try:
            payload = jwt.decode(
                token,
                self.jwt_secret,
                algorithms=[self.algorithm]
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token")

    def require_auth(
        self,
        credentials: HTTPAuthorizationCredentials = Security(security)
    ) -> Dict:
        """Require authentication."""
        token = credentials.credentials
        return self.verify_token(token)

    def require_role(self, required_role: str):
        """Require specific role."""
        def decorator(user: Dict = Depends(self.require_auth)):
            if user.get("role") != required_role:
                raise HTTPException(
                    status_code=403,
                    detail="Insufficient permissions"
                )
            return user
        return decorator


# Initialize security (config is the settings object defined earlier)
security_manager = SecurityManager(jwt_secret=config.jwt_secret)


# Use in endpoints (app is the FastAPI instance from the health-check module)
@app.post("/execute")
async def execute_task(
    request: Dict,
    user: Dict = Depends(security_manager.require_auth)
):
    """Execute task with authentication."""
    # Process request
    pass
```

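PyJWT does the heavy lifting above, but it is worth seeing that HS256 verification reduces to an HMAC-SHA256 comparison. A stdlib sketch of the signing core — conceptual only, it skips `exp` handling and should not replace PyJWT in production:

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """URL-safe base64 without padding, as JWT uses."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode()
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(signature)}"


def verify_signature(token: str, secret: str) -> bool:
    """Recompute the HMAC and compare in constant time."""
    header, body, signature = token.split(".")
    signing_input = f"{header}.{body}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    # compare_digest prevents timing side channels on the signature check
    return hmac.compare_digest(_b64url(expected), signature)


token = sign_token({"user_id": "u-123", "role": "admin"}, "topsecret")
valid = verify_signature(token, "topsecret")
forged = verify_signature(token, "wrong-secret")
```

The takeaway for operations: anyone holding `jwt_secret` can mint valid tokens, so treat it like a root credential and rotate it through your secrets manager.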
### Rate Limiting

```python
"""
rate_limiting.py - Production rate limiting.
"""

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Initialize rate limiter, keyed by client IP
limiter = Limiter(key_func=get_remote_address)

# Register with the FastAPI app (the instance from the health-check module)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/execute")
@limiter.limit("60/minute")  # 60 requests per minute per client
async def execute_task(request: Request):
    """Execute task with rate limiting."""
    # Process request
    pass
```

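`slowapi` covers per-route HTTP limits; for finer-grained limits inside the agent process (per tool, per tenant), a small token bucket is often enough. A stdlib sketch, with illustrative names:

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity` calls, refilled at `rate` tokens/second."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# A bucket with zero refill makes the burst behavior easy to see
bucket = TokenBucket(rate=0.0, capacity=2)
results = [bucket.allow() for _ in range(3)]
```

In practice you would keep one bucket per key (tool name, tenant ID) in a dict and check `allow()` before dispatching the tool call.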
## High Availability

### Multi-Region Deployment

```python
"""
high_availability.py - High availability implementation.
"""

import logging
from typing import List, Dict, Any

from langchain_core.messages import HumanMessage

logger = logging.getLogger(__name__)


class MultiRegionDeployment:
    """Manage multi-region deployment for high availability."""

    def __init__(self, regions: List[str]):
        self.regions = regions
        self.region_instances: Dict[str, Any] = {}
        self.health_status: Dict[str, bool] = {}

    def deploy_to_region(self, region: str, team):
        """Deploy team to specific region."""
        logger.info(f"Deploying to region: {region}")
        self.region_instances[region] = team
        self.health_status[region] = True

    def get_healthy_regions(self) -> List[str]:
        """Get list of healthy regions."""
        return [
            region
            for region, healthy in self.health_status.items()
            if healthy
        ]

    def route_to_nearest_region(self, user_location: str):
        """Route request to the nearest healthy region."""
        # Rank the configured regions by distance from the user
        region_distances = {
            region: self._calculate_distance(user_location, region)
            for region in self.regions
        }

        # Sort by distance
        sorted_regions = sorted(
            region_distances.items(),
            key=lambda x: x[1]
        )

        # Find nearest healthy region
        for region, _ in sorted_regions:
            if self.health_status.get(region):
                logger.info(f"Routing to region: {region}")
                return self.region_instances[region]

        raise RuntimeError("No healthy regions available")

    def _calculate_distance(self, from_loc: str, to_region: str) -> float:
        """Calculate distance between locations."""
        # Simplified placeholder; in production, use actual geolocation
        return 0.0

    def perform_health_check(self, region: str) -> bool:
        """Perform health check on region."""
        try:
            team = self.region_instances[region]
            # Exercise the team end to end with a trivial request
            team({
                "messages": [HumanMessage(content="health check")]
            })
            self.health_status[region] = True
            return True
        except Exception as e:
            logger.error(f"Region {region} health check failed: {e}")
            self.health_status[region] = False
            return False

    def failover_to_backup(self, failed_region: str):
        """Failover to backup region."""
        logger.warning(f"Initiating failover from {failed_region}")

        # Mark failed region as unhealthy
        self.health_status[failed_region] = False

        # Find healthy backup region
        healthy_regions = self.get_healthy_regions()

        if not healthy_regions:
            raise RuntimeError("No healthy backup regions available")

        backup_region = healthy_regions[0]
        logger.info(f"Failed over to backup region: {backup_region}")

        return backup_region
```

### Database Replication

```yaml
# database_replication.yaml - PostgreSQL replication setup
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-replication-config
  namespace: production
data:
  postgresql.conf: |
    # Replication settings
    wal_level = replica
    max_wal_senders = 10
    max_replication_slots = 10
    hot_standby = on

  pg_hba.conf: |
    # Replication connections
    host replication replicator 0.0.0.0/0 md5

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres-primary
  namespace: production
spec:
  serviceName: postgres-primary
  replicas: 1
  selector:
    matchLabels:
      app: postgres
      role: primary
  template:
    metadata:
      labels:
        app: postgres
        role: primary
    spec:
      containers:
      - name: postgres
        image: postgres:15
        env:
        - name: POSTGRES_USER
          value: mcp_user
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: postgres-secrets
              key: password
        - name: POSTGRES_DB
          value: mcp_db
        ports:
        - containerPort: 5432
          name: postgres
        volumeMounts:
        - name: data
          mountPath: /var/lib/postgresql/data
        - name: config
          mountPath: /etc/postgresql
      volumes:
      - name: config
        configMap:
          name: postgres-replication-config
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres-replica
  namespace: production
spec:
  serviceName: postgres-replica
  replicas: 2
  selector:
    matchLabels:
      app: postgres
      role: replica
  template:
    metadata:
      labels:
        app: postgres
        role: replica
    spec:
      containers:
      - name: postgres
        image: postgres:15
        env:
        - name: POSTGRES_USER
          value: mcp_user
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: postgres-secrets
              key: password
        - name: POSTGRES_PRIMARY_HOST
          value: postgres-primary
        ports:
        - containerPort: 5432
          name: postgres
        volumeMounts:
        - name: data
          mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi
```

### Load Balancer Configuration

```python
"""
advanced_load_balancer.py - Advanced load balancing strategies.
"""

import bisect
import hashlib
from collections import defaultdict
from math import gcd
from typing import Any, Dict, List


class WeightedLoadBalancer:
    """Weighted round-robin load balancer."""

    def __init__(self, instances: List[tuple]):
        """
        Initialize with weighted instances.

        Args:
            instances: List of (instance, weight) tuples
        """
        self.instances = instances
        self.current_weight = 0
        self.current_index = -1
        self.max_weight = max(weight for _, weight in instances)
        self.gcd_weight = self._gcd_weights()

    def _gcd_weights(self) -> int:
        """Calculate GCD of all weights."""
        weights = [weight for _, weight in self.instances]
        result = weights[0]
        for weight in weights[1:]:
            result = gcd(result, weight)
        return result

    def get_instance(self):
        """Get next instance using weighted round-robin."""
        while True:
            self.current_index = (self.current_index + 1) % len(self.instances)

            if self.current_index == 0:
                self.current_weight -= self.gcd_weight
                if self.current_weight <= 0:
                    self.current_weight = self.max_weight

            instance, weight = self.instances[self.current_index]
            if weight >= self.current_weight:
                return instance


class LeastConnectionsLoadBalancer:
    """Least connections load balancer."""

    def __init__(self, instances: List):
        self.instances = instances
        self.connections: Dict[int, int] = defaultdict(int)

    def get_instance(self):
        """Get the instance with the fewest active connections."""
        instance = min(self.instances, key=lambda inst: self.connections[id(inst)])
        self.connections[id(instance)] += 1
        return instance

    def release_instance(self, instance):
        """Release instance connection."""
        instance_id = id(instance)
        if instance_id in self.connections:
            self.connections[instance_id] = max(
                0,
                self.connections[instance_id] - 1
            )


class ConsistentHashLoadBalancer:
    """Consistent hashing load balancer."""

    def __init__(self, instances: List, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring: Dict[int, Any] = {}
        self.sorted_keys: List[int] = []

        for instance in instances:
            self.add_instance(instance)

    def _hash(self, key: str) -> int:
        """Hash function."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_instance(self, instance):
        """Add instance to hash ring."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{id(instance)}:{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = instance
            # insort keeps the key list sorted; a heap would not preserve
            # iteration order and would break the lookup below
            bisect.insort(self.sorted_keys, hash_val)

    def get_instance(self, key: str):
        """Get instance for given key."""
        if not self.ring:
            return None

        # Binary search for the first ring position >= hash(key)
        idx = bisect.bisect_left(self.sorted_keys, self._hash(key))
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around to the first node
        return self.ring[self.sorted_keys[idx]]
```

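The payoff of consistent hashing over `hash(key) % n` routing shows up when scaling out: adding one node remaps only the keys that land on its ring segments. A self-contained demo with illustrative node names:

```python
import bisect
import hashlib


class HashRing:
    """Minimal consistent-hash ring with virtual nodes."""

    def __init__(self, nodes, virtual_nodes=100):
        self.virtual_nodes = virtual_nodes
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        for i in range(self.virtual_nodes):
            h = self._hash(f"{node}:{i}")
            self.ring[h] = node
            bisect.insort(self.sorted_keys, h)

    def get_node(self, key: str) -> str:
        idx = bisect.bisect_left(self.sorted_keys, self._hash(key))
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]


ring = HashRing(["node-a", "node-b", "node-c"])
keys = [f"task-{i}" for i in range(1000)]
before = {k: ring.get_node(k) for k in keys}

ring.add_node("node-d")  # scale out by one node
after = {k: ring.get_node(k) for k in keys}

moved = sum(1 for k in keys if before[k] != after[k])
# Roughly a quarter of keys move to node-d; modulo routing would reshuffle most of them.
```

This stability is what makes the scheme attractive for session affinity and cache partitioning: scaling events invalidate only a proportional slice of cached state.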
## Disaster Recovery

### Backup Strategy

```python
"""
backup_strategy.py - Comprehensive backup implementation.
"""

import logging
import os
import subprocess
import tarfile
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import boto3

logger = logging.getLogger(__name__)


class BackupManager:
    """Manage backups for disaster recovery."""

    def __init__(
        self,
        backup_dir: str = "/backups",
        s3_bucket: Optional[str] = None,
        retention_days: int = 30
    ):
        self.backup_dir = backup_dir
        self.s3_bucket = s3_bucket
        self.retention_days = retention_days

        if s3_bucket:
            self.s3_client = boto3.client('s3')

        os.makedirs(backup_dir, exist_ok=True)

    def backup_database(self, db_url: str) -> str:
        """Backup database."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"{self.backup_dir}/db_backup_{timestamp}.sql"

        logger.info(f"Starting database backup: {backup_file}")

        try:
            # Use pg_dump for PostgreSQL
            subprocess.run([
                "pg_dump",
                db_url,
                "-f", backup_file
            ], check=True)

            logger.info(f"Database backup completed: {backup_file}")

            # Upload to S3 if configured
            if self.s3_bucket:
                self._upload_to_s3(backup_file)

            return backup_file

        except Exception as e:
            logger.error(f"Database backup failed: {e}")
            raise

    def backup_rl_data(self, rl_data_dir: str) -> str:
        """Backup RL Q-table and training data."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"{self.backup_dir}/rl_backup_{timestamp}.tar.gz"

        logger.info(f"Starting RL data backup: {backup_file}")

        try:
            with tarfile.open(backup_file, "w:gz") as tar:
                tar.add(rl_data_dir, arcname="rl_data")

            logger.info(f"RL data backup completed: {backup_file}")

            # Upload to S3 if configured
            if self.s3_bucket:
                self._upload_to_s3(backup_file)

            return backup_file

        except Exception as e:
            logger.error(f"RL data backup failed: {e}")
            raise

    def backup_configuration(self, config_dir: str) -> str:
        """Backup configuration files."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"{self.backup_dir}/config_backup_{timestamp}.tar.gz"

        logger.info(f"Starting configuration backup: {backup_file}")

        try:
            with tarfile.open(backup_file, "w:gz") as tar:
                tar.add(config_dir, arcname="config")

            logger.info(f"Configuration backup completed: {backup_file}")

            # Upload to S3 if configured
            if self.s3_bucket:
                self._upload_to_s3(backup_file)

            return backup_file

        except Exception as e:
            logger.error(f"Configuration backup failed: {e}")
            raise

    def _upload_to_s3(self, file_path: str):
        """Upload backup to S3."""
        try:
            file_name = os.path.basename(file_path)
            s3_key = f"backups/{file_name}"

            logger.info(f"Uploading to S3: {s3_key}")

            self.s3_client.upload_file(
                file_path,
                self.s3_bucket,
                s3_key
            )

            logger.info(f"S3 upload completed: {s3_key}")

        except Exception as e:
            logger.error(f"S3 upload failed: {e}")
            raise

    def list_backups(self) -> List[Dict[str, Any]]:
        """List all backups, newest first."""
        backups = []

        for file_name in os.listdir(self.backup_dir):
            file_path = os.path.join(self.backup_dir, file_name)
            if os.path.isfile(file_path):
                stat = os.stat(file_path)
                backups.append({
                    "file": file_name,
                    "path": file_path,
                    "size": stat.st_size,
                    "created": datetime.fromtimestamp(stat.st_ctime)
                })

        return sorted(backups, key=lambda x: x["created"], reverse=True)

    def cleanup_old_backups(self):
        """Remove backups older than the retention period."""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)

        logger.info(f"Cleaning up backups older than {cutoff_date}")

        for backup in self.list_backups():
            if backup["created"] < cutoff_date:
                logger.info(f"Removing old backup: {backup['file']}")
                os.remove(backup["path"])

    def restore_database(self, backup_file: str, db_url: str):
        """Restore database from backup."""
        logger.info(f"Restoring database from: {backup_file}")

        try:
            subprocess.run([
                "psql",
                db_url,
                "-f", backup_file
            ], check=True)

            logger.info("Database restore completed")

        except Exception as e:
            logger.error(f"Database restore failed: {e}")
            raise

    def restore_rl_data(self, backup_file: str, target_dir: str):
        """Restore RL data from backup."""
        logger.info(f"Restoring RL data from: {backup_file}")

        try:
            # Only extract archives you created yourself: extractall trusts member paths
            with tarfile.open(backup_file, "r:gz") as tar:
                tar.extractall(path=target_dir)

            logger.info("RL data restore completed")

        except Exception as e:
            logger.error(f"RL data restore failed: {e}")
            raise


# Automated backup schedule
def schedule_backups():
    """Schedule automated backups."""
    import schedule

    backup_manager = BackupManager(
        backup_dir="/backups",
        s3_bucket="my-mcp-backups",
        retention_days=30
    )

    # Daily database backup at 2 AM
    schedule.every().day.at("02:00").do(
        backup_manager.backup_database,
        db_url=config.database_url
    )

    # Daily RL data backup at 3 AM
    schedule.every().day.at("03:00").do(
        backup_manager.backup_rl_data,
        rl_data_dir="/app/rl_data"
    )

    # Weekly configuration backup on Sunday at 1 AM
    schedule.every().sunday.at("01:00").do(
        backup_manager.backup_configuration,
        config_dir="/app/config"
    )

    # Daily cleanup of old backups at 4 AM
    schedule.every().day.at("04:00").do(
        backup_manager.cleanup_old_backups
    )

    logger.info("Backup schedule configured")

    # Run scheduled tasks
    while True:
        schedule.run_pending()
        time.sleep(60)
```

### Disaster Recovery Plan

```yaml
# disaster_recovery_plan.yaml
disaster_recovery:
  rto: 4 hours  # Recovery Time Objective
  rpo: 1 hour   # Recovery Point Objective (met via transaction-log replay; daily dumps alone give 24 h)

  scenarios:
    - name: Database Failure
      impact: High
      probability: Medium
      recovery_steps:
        - Verify database is down
        - Promote replica to primary
        - Update DNS/connection strings
        - Verify application connectivity
        - Start new replica
      estimated_time: 30 minutes

    - name: Complete Region Failure
      impact: Critical
      probability: Low
      recovery_steps:
        - Verify region is down
        - Route traffic to backup region
        - Update DNS records
        - Verify all services operational
        - Monitor performance
        - Investigate root cause
      estimated_time: 2 hours

    - name: Data Corruption
      impact: High
      probability: Low
      recovery_steps:
        - Identify corruption extent
        - Stop write operations
        - Restore from latest backup
        - Replay transaction logs
        - Verify data integrity
        - Resume operations
      estimated_time: 4 hours

    - name: Application Crash
      impact: Medium
      probability: Medium
      recovery_steps:
        - Review logs and metrics
        - Identify crash cause
        - Rollback to previous version
        - Verify health checks pass
        - Route traffic back
      estimated_time: 15 minutes

  backup_procedures:
    - type: Database
      frequency: Daily
      retention: 30 days
      location: S3
      encryption: AES-256

    - type: RL Data
      frequency: Daily
      retention: 30 days
      location: S3
      encryption: AES-256

    - type: Configuration
      frequency: Weekly
      retention: 90 days
      location: S3 + Git
      encryption: AES-256

  testing:
    - Full DR test: Quarterly
    - Partial failover test: Monthly
    - Backup restore test: Monthly
```
## Operational Procedures

### Deployment Runbook

#### Pre-Deployment Checklist

- [ ] Code reviewed and approved
- [ ] All tests passing (unit, integration, e2e)
- [ ] Security scan completed
- [ ] Performance testing completed
- [ ] Documentation updated
- [ ] Rollback plan prepared
- [ ] Stakeholders notified
- [ ] Maintenance window scheduled (if needed)

#### Deployment Steps

**1. Pre-Deployment**

```bash
# Verify current state
kubectl get pods -n production
kubectl get deployments -n production

# Create backup
python scripts/backup.py --type all

# Tag release
git tag -a v1.2.0 -m "Release v1.2.0"
git push origin v1.2.0
```

**2. Build and Push Image**

```bash
# Build Docker image
docker build -t myregistry/mcp-agent:v1.2.0 .

# Run security scan
docker scan myregistry/mcp-agent:v1.2.0

# Push to registry
docker push myregistry/mcp-agent:v1.2.0
```

**3. Deploy to Staging**

```bash
# Deploy to staging
kubectl set image deployment/mcp-agent \
  mcp-agent=myregistry/mcp-agent:v1.2.0 \
  -n staging

# Wait for rollout
kubectl rollout status deployment/mcp-agent -n staging

# Run smoke tests
python tests/smoke_tests.py --env staging
```

**4. Deploy to Production (Canary)**

```bash
# Deploy canary (10% traffic)
kubectl apply -f k8s/canary-deployment.yaml

# Monitor metrics for 30 minutes (kubectl top plus Grafana dashboards)
kubectl top pods -n production

# Increase to 50%
kubectl patch deployment mcp-agent-canary \
  -p '{"spec":{"replicas":5}}' -n production

# Monitor for another 30 minutes, then do the full rollout
kubectl set image deployment/mcp-agent \
  mcp-agent=myregistry/mcp-agent:v1.2.0 \
  -n production

# Wait for rollout
kubectl rollout status deployment/mcp-agent -n production
```

**5. Post-Deployment Verification**

```bash
# Check health endpoints
curl https://api.example.com/health
curl https://api.example.com/ready

# Verify metrics
curl https://api.example.com/metrics

# Check logs
kubectl logs -f deployment/mcp-agent -n production

# Run integration tests
python tests/integration_tests.py --env production
```

**6. Rollback (If Needed)**

```bash
# Quick rollback
kubectl rollout undo deployment/mcp-agent -n production

# Or rollback to a specific revision
kubectl rollout undo deployment/mcp-agent \
  --to-revision=2 -n production

# Verify rollback
kubectl rollout status deployment/mcp-agent -n production
```

#### Post-Deployment

- Update status page
- Notify stakeholders
- Update documentation
- Create post-mortem (if issues occurred)
- Update runbook with lessons learned

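The "monitor for 30 minutes, then increase traffic" steps can be backed by an automated promotion gate. A sketch that compares canary and stable error rates — the function name and thresholds are illustrative:

```python
def should_promote_canary(
    canary_errors: int,
    canary_total: int,
    stable_errors: int,
    stable_total: int,
    max_ratio: float = 1.5,
    min_requests: int = 100,
) -> bool:
    """Promote only if the canary saw enough traffic and its error rate
    is not substantially worse than the stable deployment's."""
    if canary_total < min_requests:
        return False  # not enough data to judge the canary
    canary_rate = canary_errors / canary_total
    stable_rate = stable_errors / max(stable_total, 1)
    if stable_rate == 0:
        return canary_rate == 0  # stable is clean, so the canary must be too
    return canary_rate <= stable_rate * max_ratio


ok = should_promote_canary(canary_errors=1, canary_total=1000,
                           stable_errors=10, stable_total=10000)
bad = should_promote_canary(canary_errors=50, canary_total=1000,
                            stable_errors=10, stable_total=10000)
too_little_data = should_promote_canary(0, 50, 0, 10000)
```

Wired to the Prometheus counters from the monitoring section, this turns the manual "check Grafana" step into a reproducible go/no-go decision.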
### Incident Response Procedure

```python
"""
incident_response.py - Incident response automation.
"""

from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional
import logging

logger = logging.getLogger(__name__)


class Severity(Enum):
    """Incident severity levels."""
    P1 = "critical"      # Service down
    P2 = "high"          # Major functionality impaired
    P3 = "medium"        # Minor functionality impaired
    P4 = "low"           # Minimal impact


class IncidentStatus(Enum):
    """Incident status."""
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    RESOLVING = "resolving"
    RESOLVED = "resolved"
    CLOSED = "closed"


@dataclass
class Incident:
    """Incident record."""
    id: str
    title: str
    severity: Severity
    status: IncidentStatus
    detected_at: datetime
    description: str
    affected_services: List[str]
    assigned_to: Optional[str] = None
    resolved_at: Optional[datetime] = None
    root_cause: Optional[str] = None
    remediation: Optional[str] = None


class IncidentManager:
    """Manage incident response."""

    def __init__(self):
        self.incidents: Dict[str, Incident] = {}
        self.on_call_rotation: List[str] = []

    def create_incident(
        self,
        title: str,
        severity: Severity,
        description: str,
        affected_services: List[str]
    ) -> Incident:
        """Create new incident."""
        incident_id = f"INC-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

        incident = Incident(
            id=incident_id,
            title=title,
            severity=severity,
            status=IncidentStatus.DETECTED,
            detected_at=datetime.now(),
            description=description,
            affected_services=affected_services
        )

        self.incidents[incident_id] = incident

        logger.critical(
            f"Incident created: {incident_id} - {title}",
            extra={
                "incident_id": incident_id,
                "severity": severity.value,
                "services": affected_services
            }
        )

        # Notify on-call engineer
        self._notify_on_call(incident)

        # Auto-remediation for known issues
        self._attempt_auto_remediation(incident)

        return incident

    def _notify_on_call(self, incident: Incident):
        """Notify on-call engineer."""
        if not self.on_call_rotation:
            logger.error("No on-call engineer configured")
            return

        engineer = self.on_call_rotation[0]
        logger.info(f"Notifying on-call engineer: {engineer}")

        # Send notifications (PagerDuty, Slack, Email, SMS)
        # Implementation depends on notification service

        incident.assigned_to = engineer

    def _attempt_auto_remediation(self, incident: Incident):
        """Attempt automatic remediation."""
        logger.info(f"Attempting auto-remediation for {incident.id}")

        # Common auto-remediation patterns
        if "database connection" in incident.description.lower():
            self._restart_database_connection_pool()

        elif "high latency" in incident.description.lower():
            self._scale_up_instances()

        elif "memory leak" in incident.description.lower():
            self._restart_affected_pods()

        elif "rate limit" in incident.description.lower():
            self._increase_rate_limits()

    def _restart_database_connection_pool(self):
        """Restart database connection pool."""
        logger.info("Restarting database connection pool")
        # Implementation specific to your setup

    def _scale_up_instances(self):
        """Scale up instances."""
        logger.info("Scaling up instances")
        # kubectl scale deployment mcp-agent --replicas=10 -n production

    def _restart_affected_pods(self):
        """Restart affected pods."""
        logger.info("Restarting affected pods")
        # kubectl rollout restart deployment mcp-agent -n production

    def _increase_rate_limits(self):
        """Temporarily increase rate limits."""
        logger.info("Increasing rate limits")
        # Update rate limiter configuration

    def update_status(self, incident_id: str, status: IncidentStatus):
        """Update incident status."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident not found: {incident_id}")

        incident = self.incidents[incident_id]
        incident.status = status

        logger.info(
            f"Incident {incident_id} status updated to {status.value}",
            extra={"incident_id": incident_id, "status": status.value}
        )

    def resolve_incident(
        self,
        incident_id: str,
        root_cause: str,
        remediation: str
    ):
        """Resolve incident."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident not found: {incident_id}")

        incident = self.incidents[incident_id]
        incident.status = IncidentStatus.RESOLVED
        incident.resolved_at = datetime.now()
        incident.root_cause = root_cause
        incident.remediation = remediation

        duration = incident.resolved_at - incident.detected_at

        logger.info(
            f"Incident {incident_id} resolved",
            extra={
                "incident_id": incident_id,
                "duration_seconds": duration.total_seconds(),
                "root_cause": root_cause
            }
        )

    def generate_post_mortem(self, incident_id: str) -> str:
        """Generate post-mortem report."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident not found: {incident_id}")

        incident = self.incidents[incident_id]

        duration = (
            incident.resolved_at - incident.detected_at
            if incident.resolved_at else None
        )

        report = f"""
# Post-Mortem: {incident.title}

**Incident ID:** {incident.id}
**Severity:** {incident.severity.value}
**Detected:** {incident.detected_at}
**Resolved:** {incident.resolved_at}
**Duration:** {duration}

## Summary

{incident.description}

## Impact

**Affected Services:**
{chr(10).join(f'- {service}' for service in incident.affected_services)}

## Root Cause

{incident.root_cause or 'TBD'}

## Resolution

{incident.remediation or 'TBD'}

## Timeline

- {incident.detected_at}: Incident detected
- {incident.detected_at}: On-call engineer notified
- {incident.resolved_at}: Incident resolved

## Action Items

- [ ] Update monitoring to detect similar issues earlier
- [ ] Implement additional safeguards
- [ ] Update runbooks
- [ ] Schedule review meeting

## Lessons Learned

TBD - To be filled during post-mortem review meeting
"""

        return report


# Global incident manager
incident_manager = IncidentManager()
```

Cost Optimization

Resource Right-Sizing

"""
cost_optimization.py - Cost optimization strategies.
"""

import logging
from typing import Dict, List, Any
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class CostOptimizer:
    """Optimize infrastructure costs."""

    def __init__(self):
        self.metrics_history: List[Dict] = []
        self.recommendations: List[Dict] = []

    def analyze_resource_usage(self, lookback_days: int = 7) -> Dict[str, Any]:
        """Analyze resource usage patterns."""
        logger.info(f"Analyzing resource usage for past {lookback_days} days")

        analysis = {
            "cpu_utilization": {
                "average": 0.0,
                "peak": 0.0,
                "idle_periods": []
            },
            "memory_utilization": {
                "average": 0.0,
                "peak": 0.0,
                "idle_periods": []
            },
            "request_patterns": {
                "peak_hours": [],
                "low_traffic_hours": []
            }
        }

        # Analyze metrics from monitoring system
        # This would integrate with Prometheus/CloudWatch

        return analysis

    def generate_recommendations(self) -> List[Dict[str, Any]]:
        """Generate cost optimization recommendations."""
        recommendations = []

        # 1. Right-size instances
        analysis = self.analyze_resource_usage()

        if analysis["cpu_utilization"]["average"] < 30:
            recommendations.append({
                "type": "downsize",
                "resource": "cpu",
                "current": "2000m",
                "recommended": "1000m",
                "estimated_savings": "$200/month",
                "impact": "low"
            })

        if analysis["memory_utilization"]["average"] < 40:
            recommendations.append({
                "type": "downsize",
                "resource": "memory",
                "current": "4Gi",
                "recommended": "2Gi",
                "estimated_savings": "$150/month",
                "impact": "low"
            })

        # 2. Auto-scaling optimization
        if len(analysis["request_patterns"]["low_traffic_hours"]) > 0:
            recommendations.append({
                "type": "auto_scaling",
                "resource": "pods",
                "suggestion": "Reduce min replicas during off-peak hours",
                "current_min": 3,
                "recommended_min": 1,
                "estimated_savings": "$300/month",
                "impact": "medium"
            })

        # 3. Reserved instances
        recommendations.append({
            "type": "reserved_instances",
            "suggestion": "Purchase 1-year reserved instances for base load",
            "estimated_savings": "$500/month",
            "commitment": "1 year",
            "impact": "none"
        })

        # 4. Spot instances
        recommendations.append({
            "type": "spot_instances",
            "suggestion": "Use spot instances for non-critical workloads",
            "estimated_savings": "$400/month",
            "impact": "low"
        })

        # 5. Cache optimization
        recommendations.append({
            "type": "caching",
            "suggestion": "Increase Redis cache TTL for static data",
            "current_ttl": "1 hour",
            "recommended_ttl": "6 hours",
            "estimated_savings": "$50/month (reduced LLM API calls)",
            "impact": "none"
        })

        self.recommendations = recommendations
        return recommendations

    def implement_recommendation(self, recommendation: Dict[str, Any]):
        """Implement cost optimization recommendation."""
        rec_type = recommendation["type"]

        logger.info(f"Implementing recommendation: {rec_type}")

        if rec_type == "downsize":
            self._downsize_resource(recommendation)

        elif rec_type == "auto_scaling":
            self._optimize_autoscaling(recommendation)

        elif rec_type == "caching":
            self._optimize_caching(recommendation)

        logger.info(f"Recommendation implemented: {rec_type}")

    def _downsize_resource(self, recommendation: Dict):
        """Downsize resource allocation."""
        # Update Kubernetes deployment with new resource limits
        pass

    def _optimize_autoscaling(self, recommendation: Dict):
        """Optimize auto-scaling configuration."""
        # Update HPA configuration
        pass

    def _optimize_caching(self, recommendation: Dict):
        """Optimize caching strategy."""
        # Update cache TTL configuration
        pass

    def generate_cost_report(self) -> str:
        """Generate cost report."""
        import re  # local import keeps this method self-contained

        recommendations = self.generate_recommendations()

        # Pull the first dollar figure out of each estimate; some entries
        # carry trailing notes such as "(reduced LLM API calls)"
        total_savings = sum(
            float(re.search(r"\$([\d,]+(?:\.\d+)?)", rec["estimated_savings"])
                  .group(1).replace(",", ""))
            for rec in recommendations
            if "estimated_savings" in rec
        )

        report = f"""
# Cost Optimization Report
Generated: {datetime.now()}

## Current Monthly Costs
- Compute: $1,500
- Storage: $300
- Data Transfer: $200
- LLM API: $800
- **Total: $2,800/month**

## Optimization Opportunities

Total Potential Savings: **${total_savings:.2f}/month** ({(total_savings/2800)*100:.1f}%)

"""

        for i, rec in enumerate(recommendations, 1):
            report += f"""
### {i}. {rec['type'].replace('_', ' ').title()}

**Suggestion:** {rec.get('suggestion', 'Optimize ' + rec['type'])}
**Estimated Savings:** {rec.get('estimated_savings', 'TBD')}
**Impact:** {rec.get('impact', 'TBD')}

"""

        report += """
## Recommendations Priority

1. Reserved Instances (High savings, no impact)
2. Spot Instances (High savings, low impact)
3. Auto-scaling optimization (Medium savings, medium impact)
4. Resource right-sizing (Medium savings, low impact)
5. Cache optimization (Low savings, no impact)

## Next Steps

1. Review and approve recommendations
2. Implement in staging environment
3. Monitor for 1 week
4. Roll out to production
5. Track actual savings
"""

        return report
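The priority ordering in the report (highest savings first, lowest impact breaking ties) can be derived from the recommendation dicts rather than hand-written. A standalone sketch over the same dict shape used above; the impact ranking and helper names are illustrative:

```python
import re

IMPACT_RANK = {"none": 0, "low": 1, "medium": 2, "high": 3}


def monthly_savings(rec: dict) -> float:
    """Extract the first dollar figure from an 'estimated_savings' string."""
    match = re.search(r"\$([\d,]+(?:\.\d+)?)", rec.get("estimated_savings", ""))
    return float(match.group(1).replace(",", "")) if match else 0.0


def prioritize(recommendations: list[dict]) -> list[dict]:
    """Order by highest savings, breaking ties with lowest impact."""
    return sorted(
        recommendations,
        key=lambda rec: (-monthly_savings(rec), IMPACT_RANK.get(rec.get("impact"), 3)),
    )
```

Feeding the five recommendation types above through `prioritize` reproduces the reserved-instances-first ordering shown in the report.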

Auto-Scaling Policies

# advanced_autoscaling.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-agent-hpa-advanced
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-agent

  # Dynamic scaling based on time of day
  minReplicas: 2
  maxReplicas: 20

  metrics:
  # CPU-based scaling
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

  # Memory-based scaling
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

  # Custom metric: Request rate
  - type: Pods
    pods:
      metric:
        name: requests_per_second
      target:
        type: AverageValue
        averageValue: "100"

  # Custom metric: Queue depth
  - type: Object
    object:
      metric:
        name: queue_depth
      describedObject:
        apiVersion: v1
        kind: Service
        name: mcp-agent-service
      target:
        type: Value
        value: "30"

  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
      - type: Pods
        value: 2
        periodSeconds: 60
      selectPolicy: Max

    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
      - type: Pods
        value: 1
        periodSeconds: 60
      selectPolicy: Min

---
# Vertical Pod Autoscaler (VPA)
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: mcp-agent-vpa
  namespace: production
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-agent

  updatePolicy:
    updateMode: "Auto"

  resourcePolicy:
    containerPolicies:
    - containerName: mcp-agent
      minAllowed:
        cpu: 500m
        memory: 1Gi
      maxAllowed:
        cpu: 4000m
        memory: 8Gi
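The `behavior` stanza bounds how fast the HPA may move. A sketch of the scale-up arithmetic implied by the policies above (50% per minute or 2 pods per minute, with `selectPolicy: Max` taking the more permissive of the two); this mirrors the policy evaluation conceptually rather than reimplementing the controller:

```python
import math


def max_scale_up(current: int, percent: int = 50, pods: int = 2,
                 max_replicas: int = 20) -> int:
    """Upper bound on replica count after one scale-up period.

    selectPolicy: Max picks the larger allowed increase of the
    percent-based and pod-count-based policies, capped at maxReplicas."""
    by_percent = math.ceil(current * percent / 100)
    allowed_increase = max(by_percent, pods)
    return min(current + allowed_increase, max_replicas)
```

So a deployment at 2 replicas can reach at most 4 after one period, one at 10 can reach 15, and growth always stops at `maxReplicas: 20`.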

Complete Production Examples

Full Production Setup

"""
production_setup.py - Complete production setup example.
"""

import os
import logging
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# Import production components
from azcore.mcp import MCPTeamBuilder
from azcore.rl import RLManager
from production_config import ProductionConfig
from production_logging import ProductionLogger
from production_metrics import start_metrics_server, track_request
from production_health import HealthChecker
from production_cache import ProductionCache
from backup_strategy import BackupManager
from incident_response import IncidentManager, Severity
from cost_optimization import CostOptimizer

# Initialize logger
logger = ProductionLogger(
    name="mcp_production",
    level="INFO",
    log_file="/app/logs/mcp.log"
)

# Load configuration
config = ProductionConfig()


def build_production_team():
    """Build production-ready MCP team."""

    logger.logger.info("Building production MCP team")

    # Initialize LLM with production settings
    llm = ChatOpenAI(
        model=config.llm_model,
        temperature=config.llm_temperature,
        max_tokens=config.llm_max_tokens,
        timeout=config.llm_timeout,
        max_retries=config.llm_max_retries,
        api_key=config.openai_api_key
    )

    # Initialize RL Manager
    rl_manager = RLManager(
        tool_names=[],  # Will be populated after team building
        q_table_path=config.rl_q_table_path,
        exploration_rate=config.rl_exploration_rate,
        learning_rate=config.rl_learning_rate,
        use_embeddings=True
    ) if config.enable_rl else None

    # Build MCP team
    # Build the MCP team incrementally so optional servers can be
    # attached conditionally without breaking the fluent chain
    builder = (MCPTeamBuilder("production_team")
        .with_llm(llm)

        # GitHub server
        .with_mcp_server(
            "npx",
            ["-y", "@modelcontextprotocol/server-github"],
            env={"GITHUB_TOKEN": config.github_token},
            timeout=30
        )

        # Filesystem server
        .with_mcp_server(
            "npx",
            ["-y", "@modelcontextprotocol/server-filesystem",
             "/app/data"],
            timeout=30
        )

        # Custom analytics server (example)
        .with_mcp_server(
            "python",
            ["/app/servers/analytics_server.py"],
            env={"ENVIRONMENT": "production"},
            timeout=30
        )

        # Configuration
        .skip_failed_servers(True)  # Graceful degradation

        # Prompt
        .with_prompt("""You are a production AI assistant with access to multiple tools.

Available capabilities:
- GitHub: Repository management, issues, PRs
- Filesystem: Read/write files
- Database: Query and update data
- Analytics: Generate reports and insights

Always:
1. Verify inputs before executing actions
2. Log all operations
3. Handle errors gracefully
4. Provide clear status updates
""")
    )

    # PostgreSQL server (only if configured)
    if config.database_url:
        builder = builder.with_mcp_server(
            "npx",
            ["-y", "@modelcontextprotocol/server-postgres"],
            env={"DATABASE_URL": config.database_url},
            timeout=30
        )

    # RL manager (only if enabled)
    if config.enable_rl:
        builder = builder.with_rl_manager(rl_manager)

    team = builder.build()

    logger.logger.info(f"Production team built with {team.get_mcp_server_count()} servers")

    return team


def setup_monitoring(team):
    """Setup monitoring and health checks."""

    logger.logger.info("Setting up monitoring")

    # Start metrics server
    if config.enable_metrics:
        start_metrics_server(port=config.metrics_port)

    # Initialize health checker
    health_checker = HealthChecker(team)

    # Schedule periodic health checks
    import schedule
    schedule.every(config.health_check_interval).seconds.do(
        health_checker.check_components
    )

    logger.logger.info("Monitoring setup complete")

    return health_checker


def setup_backups():
    """Setup automated backups."""

    logger.logger.info("Setting up backups")

    backup_manager = BackupManager(
        backup_dir="/backups",
        s3_bucket=os.getenv("BACKUP_S3_BUCKET"),
        retention_days=30
    )

    # Schedule backups
    import schedule

    # Daily database backup
    if config.database_url:
        schedule.every().day.at("02:00").do(
            backup_manager.backup_database,
            db_url=config.database_url
        )

    # Daily RL data backup
    if config.enable_rl:
        schedule.every().day.at("03:00").do(
            backup_manager.backup_rl_data,
            rl_data_dir="/app/rl_data"
        )

    # Daily cleanup
    schedule.every().day.at("04:00").do(
        backup_manager.cleanup_old_backups
    )

    logger.logger.info("Backup schedule configured")

    return backup_manager


def setup_cost_optimization():
    """Setup cost optimization."""

    logger.logger.info("Setting up cost optimization")

    optimizer = CostOptimizer()

    # Weekly cost analysis
    import schedule
    schedule.every().monday.at("09:00").do(
        lambda: logger.logger.info(optimizer.generate_cost_report())
    )

    return optimizer


@track_request("POST", "/execute")
def handle_request(request: dict):
    """Handle incoming request."""

    request_id = request.get("id", "unknown")

    logger.log_request(
        request_id=request_id,
        method="POST",
        path="/execute",
        task=request.get("task")
    )

    import time  # local import: measure request duration
    start_time = time.time()

    try:
        # Get cached result if available
        if config.enable_caching and cache:
            cached_result = cache.get(f"request:{request_id}")
            if cached_result:
                logger.logger.info(f"Cache hit for request {request_id}")
                return cached_result

        # Execute request
        result = team({
            "messages": [
                HumanMessage(content=request.get("task", ""))
            ]
        })

        # Cache result
        if config.enable_caching and cache:
            cache.set(f"request:{request_id}", result, ttl=config.cache_ttl)

        logger.log_response(
            request_id=request_id,
            status_code=200,
            duration_ms=(time.time() - start_time) * 1000
        )

        return result

    except Exception as e:
        logger.log_error(
            request_id=request_id,
            error=e
        )

        # Create incident for critical errors
        if config.is_production():
            incident_manager.create_incident(
                title=f"Request execution failed: {request_id}",
                severity=Severity.P3,
                description=str(e),
                affected_services=["mcp-agent"]
            )

        raise


def main():
    """Main production setup."""

    logger.logger.info("Starting production MCP agent")
    logger.logger.info(f"Environment: {config.environment.value}")

    # Build team
    global team
    team = build_production_team()

    # Setup cache
    global cache
    cache = ProductionCache(
        redis_url=config.redis_url,
        ttl=config.cache_ttl
    ) if config.enable_caching and config.redis_url else None

    # Setup monitoring
    health_checker = setup_monitoring(team)

    # Setup backups
    backup_manager = setup_backups()

    # Setup cost optimization
    optimizer = setup_cost_optimization()

    # Setup incident management
    global incident_manager
    incident_manager = IncidentManager()

    logger.logger.info("Production MCP agent ready")

    # Start FastAPI server (from production_health.py)
    import uvicorn
    uvicorn.run(
        "production_health:app",
        host="0.0.0.0",
        port=8000,
        workers=4,
        log_level="info"
    )


if __name__ == "__main__":
    main()
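One caveat in the setup above: `schedule.every(...)` only registers jobs; nothing fires until something calls `schedule.run_pending()` in a loop. A stdlib-only background runner sketch (`run_periodically` is a hypothetical helper, not part of the modules above; with the `schedule` library, `fn` would be `schedule.run_pending`):

```python
import threading


def run_periodically(fn, interval_s: float, stop: threading.Event) -> threading.Thread:
    """Invoke `fn` every `interval_s` seconds on a daemon thread until
    `stop` is set."""
    def _loop():
        while not stop.is_set():
            fn()
            stop.wait(interval_s)  # sleeps, but wakes promptly on stop

    thread = threading.Thread(target=_loop, daemon=True, name="periodic-runner")
    thread.start()
    return thread
```

`main()` would start this runner once, before `uvicorn.run`, and set the stop event during shutdown so backups and health checks keep firing for the life of the process.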

Complete Kubernetes Production Deployment

#!/bin/bash
# deploy_production.sh - Complete production deployment script

set -e

echo "=== Production Deployment Script ==="

# Configuration
NAMESPACE="production"
IMAGE_TAG="${1:-latest}"
REGISTRY="myregistry"
APP_NAME="mcp-agent"

# 1. Pre-deployment checks
echo "1. Running pre-deployment checks..."

# Verify kubectl access
kubectl cluster-info

# Verify namespace exists
kubectl get namespace $NAMESPACE || kubectl create namespace $NAMESPACE

# Run tests
python -m pytest tests/
echo "✓ Tests passed"

# 2. Build and push image
echo "2. Building Docker image..."
docker build -t $REGISTRY/$APP_NAME:$IMAGE_TAG .

echo "Running security scan..."
docker scout cves $REGISTRY/$APP_NAME:$IMAGE_TAG || true

echo "Pushing image to registry..."
docker push $REGISTRY/$APP_NAME:$IMAGE_TAG
echo "✓ Image pushed"

# 3. Create/update secrets
echo "3. Managing secrets..."

kubectl create secret generic mcp-secrets \
  --from-env-file=.env.production \
  --namespace=$NAMESPACE \
  --dry-run=client -o yaml | kubectl apply -f -

echo "✓ Secrets updated"

# 4. Apply configurations
echo "4. Applying configurations..."

kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/pvc.yaml
kubectl apply -f k8s/service.yaml

echo "✓ Configurations applied"

# 5. Deploy application
echo "5. Deploying application..."

# Update deployment with new image
kubectl set image deployment/$APP_NAME \
  $APP_NAME=$REGISTRY/$APP_NAME:$IMAGE_TAG \
  --namespace=$NAMESPACE

# Or apply full deployment
kubectl apply -f k8s/deployment.yaml

echo "Waiting for rollout..."
kubectl rollout status deployment/$APP_NAME -n $NAMESPACE

echo "✓ Deployment complete"

# 6. Apply autoscaling
echo "6. Applying autoscaling..."
kubectl apply -f k8s/hpa.yaml
echo "✓ Autoscaling configured"

# 7. Verify deployment
echo "7. Verifying deployment..."

# Check pods
kubectl get pods -n $NAMESPACE

# Check health endpoint
EXTERNAL_IP=$(kubectl get service $APP_NAME-service -n $NAMESPACE -o jsonpath='{.status.loadBalancer.ingress[0].ip}')

if [ -n "$EXTERNAL_IP" ]; then
  echo "Checking health endpoint..."
  curl -f http://$EXTERNAL_IP/health || echo "Warning: Health check failed"
fi

echo "✓ Deployment verified"

# 8. Post-deployment tasks
echo "8. Running post-deployment tasks..."

# Create backup
kubectl exec -n $NAMESPACE deployment/$APP_NAME -- python scripts/backup.py

# Update monitoring dashboards
echo "TODO: Update Grafana dashboards"

echo "✓ Post-deployment tasks complete"

echo ""
echo "=== Deployment Complete ==="
echo "Namespace: $NAMESPACE"
echo "Image: $REGISTRY/$APP_NAME:$IMAGE_TAG"
echo "External IP: $EXTERNAL_IP"
echo ""
echo "Monitor deployment:"
echo "  kubectl get pods -n $NAMESPACE -w"
echo "  kubectl logs -f deployment/$APP_NAME -n $NAMESPACE"
echo ""
echo "Rollback if needed:"
echo "  kubectl rollout undo deployment/$APP_NAME -n $NAMESPACE"
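Note that the script defaults `IMAGE_TAG` to `latest`, which makes rollbacks ambiguous because the tag is mutable. A pre-flight check you might run before `kubectl set image` (stdlib-only sketch; the semver-style tag convention is an assumption about your registry):

```python
import re


def validate_image_tag(tag: str) -> bool:
    """Accept only immutable, semver-style tags like v1.2.0 or v1.2.0-rc1;
    reject mutable tags such as 'latest' that break reproducible rollbacks."""
    if tag in ("latest", "stable", ""):
        return False
    return re.fullmatch(r"v\d+\.\d+\.\d+(-[0-9A-Za-z.]+)?", tag) is not None
```

Wiring this in as a guard at step 1 of the script turns "deployed latest by accident" into a failed pre-deployment check instead of a production incident.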

Summary

Production deployment requires comprehensive planning and implementation across multiple areas:

Key Components

  1. Infrastructure

    • Docker containerization
    • Kubernetes orchestration
    • Multi-region deployment
    • Database replication
  2. Configuration

    • Environment-based settings
    • Secrets management (AWS Secrets Manager, Vault)
    • Pydantic validation
    • Dynamic configuration
  3. Deployment

    • Blue-green deployment
    • Canary deployment
    • Automated rollback
    • CI/CD integration
  4. Monitoring

    • Structured JSON logging
    • Prometheus metrics
    • Health checks (liveness, readiness)
    • Distributed tracing
  5. Scaling

    • Horizontal pod autoscaling (HPA)
    • Vertical pod autoscaling (VPA)
    • Load balancing (round-robin, least connections, consistent hashing)
    • Redis caching
  6. Security

    • JWT authentication
    • Role-based access control
    • Rate limiting
    • SSL/TLS encryption
    • Network policies
  7. High Availability

    • Multi-region deployment
    • Database replication
    • Automated failover
    • Redundant load balancers
  8. Disaster Recovery

    • Automated backups (database, RL data, configuration)
    • S3 backup storage
    • Restore procedures
    • RTO/RPO targets
    • DR testing schedule
  9. Operations

    • Deployment runbooks
    • Incident response procedures
    • Automated remediation
    • Post-mortem process
    • On-call rotation
  10. Cost Optimization

    • Resource right-sizing
    • Reserved instances
    • Spot instances
    • Cache optimization
    • Auto-scaling policies

Best Practices

  • Start small, scale gradually: Begin with basic setup, add complexity as needed
  • Automate everything: Deployments, backups, monitoring, incident response
  • Monitor continuously: Logs, metrics, traces, health checks
  • Plan for failure: Circuit breakers, retries, fallbacks, graceful degradation
  • Test disaster recovery: Regular DR drills, backup restore tests
  • Document thoroughly: Runbooks, architecture diagrams, API docs
  • Optimize costs: Regular cost analysis, right-sizing, reserved instances
  • Secure by default: Authentication, encryption, least privilege

Production Checklist

Before going to production:

  • All infrastructure provisioned and tested
  • Secrets management configured
  • Monitoring and alerting set up
  • Backup and restore procedures tested
  • Disaster recovery plan documented
  • Load testing completed
  • Security audit passed
  • Documentation complete
  • Runbooks created
  • On-call rotation established
  • Stakeholders trained
  • Rollback procedure tested

Next Steps

  1. Review this guide and adapt to your specific requirements
  2. Set up staging environment first
  3. Run load tests and validate performance
  4. Conduct security audit
  5. Test disaster recovery procedures
  6. Train team on operational procedures
  7. Deploy to production with monitoring
  8. Continuously improve based on operational feedback