
Model Context Protocol (MCP)

Error Handling

A guide to building resilient MCP-enabled agents, covering error handling, fault tolerance, and graceful degradation strategies.


Overview

MCP integrations involve multiple failure points: network issues, server crashes, authentication failures, and tool execution errors. Robust error handling is essential for production systems.

Why Error Handling Matters

# ❌ Without error handling - Fragile
team = (MCPTeamBuilder("fragile_team")
    .with_llm(llm)
    .with_mcp_server_sse("https://api.example.com/sse")
    .build()
)

result = team({"messages": [HumanMessage(content="Task")]})
# Fails completely if server is down

# ✅ With error handling - Resilient
team = (MCPTeamBuilder("resilient_team")
    .with_llm(llm)
    .with_mcp_server_sse(
        "https://api.example.com/sse",
        timeout=30
    )
    .skip_failed_servers(True)  # Graceful degradation
    .build()
)

try:
    result = team({"messages": [HumanMessage(content="Task")]})
except Exception as e:
    logger.error(f"Task failed: {e}")
    # Fall back to an alternative approach (fallback_execution is a
    # placeholder for your own logic, e.g. a team with fewer servers)
    result = fallback_execution()

Common Failure Scenarios

  1. Connection Failures: Server unreachable, network issues
  2. Authentication Errors: Invalid credentials, expired tokens
  3. Timeout Errors: Slow operations, unresponsive servers
  4. Tool Execution Errors: Invalid inputs, runtime exceptions
  5. Resource Exhaustion: Memory limits, connection limits
  6. Protocol Errors: Invalid MCP messages, version mismatches
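The categories above can be made explicit in code so that logging, metrics, and retry decisions share one vocabulary. The sketch below is a hypothetical helper, not part of azcore: the category names and the use of `json.JSONDecodeError` as a stand-in for protocol errors are assumptions. Order matters because `isinstance` also matches subclasses (for example, `json.JSONDecodeError` is a subclass of `ValueError`).

```python
import asyncio
import json

# Hypothetical mapping from exception types to the failure categories above.
# Checked top to bottom, so subclasses must come before their base classes.
FAILURE_CATEGORIES = [
    (ConnectionError, "connection"),
    (PermissionError, "authentication"),          # the convention used in this guide
    ((TimeoutError, asyncio.TimeoutError), "timeout"),
    (json.JSONDecodeError, "protocol"),           # assumed stand-in for malformed messages
    ((ValueError, RuntimeError), "tool_execution"),
    ((MemoryError, OSError), "resource_exhaustion"),
]


def classify_error(error: BaseException) -> str:
    """Return the failure category for an exception, or "unknown"."""
    for error_types, category in FAILURE_CATEGORIES:
        if isinstance(error, error_types):
            return category
    return "unknown"


print(classify_error(ConnectionRefusedError()))  # connection
print(classify_error(ValueError("bad input")))   # tool_execution
```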

Error Types

1. Connection Errors

from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
import logging

logger = logging.getLogger(__name__)

try:
    team = (MCPTeamBuilder("connection_test")
        .with_llm(ChatOpenAI(model="gpt-4o-mini"))
        .with_mcp_server_sse(
            url="https://unreachable-server.example.com/sse",
            timeout=10
        )
        .build()
    )
except ConnectionError as e:
    logger.error(f"Connection failed: {e}")
    # Server unreachable
except TimeoutError as e:
    logger.error(f"Connection timeout: {e}")
    # Server not responding
except Exception as e:
    logger.error(f"Unexpected connection error: {e}")

2. Authentication Errors

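import os

try:
    github_token = os.getenv("GITHUB_TOKEN")
    if not github_token:
        # Fail fast: os.getenv() returns None for unset variables
        raise PermissionError("GITHUB_TOKEN is not set")

    team = (MCPTeamBuilder("auth_test")
        .with_llm(llm)
        .with_mcp_server(
            "npx",
            ["-y", "@modelcontextprotocol/server-github"],
            # The reference GitHub server reads GITHUB_PERSONAL_ACCESS_TOKEN
            env={"GITHUB_PERSONAL_ACCESS_TOKEN": github_token}
        )
        .build()
    )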
except PermissionError as e:
    logger.error(f"Authentication failed: {e}")
    # Invalid or missing credentials
except ValueError as e:
    logger.error(f"Invalid credentials format: {e}")
    # Malformed token or credentials

3. Tool Execution Errors

from langchain_core.messages import HumanMessage

team = (MCPTeamBuilder("tool_test")
    .with_llm(llm)
    .with_mcp_server("python", ["server.py"])
    .build()
)

try:
    result = team({
        "messages": [HumanMessage(content="Execute task")]
    })
except ValueError as e:
    logger.error(f"Invalid input: {e}")
    # Tool received invalid arguments
except RuntimeError as e:
    logger.error(f"Tool execution failed: {e}")
    # Tool encountered runtime error
except Exception as e:
    logger.error(f"Unexpected tool error: {e}")

4. Timeout Errors

import asyncio

try:
    result = team({
        "messages": [HumanMessage(content="Long running task")]
    })
except (asyncio.TimeoutError, TimeoutError) as e:
    # On Python 3.11+ asyncio.TimeoutError is an alias of the built-in
    # TimeoutError, so separate clauses for the two would never both run
    logger.error(f"Operation timed out: {e}")
    # Operation took too long or the server didn't respond in time
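When the MCP client runs inside an event loop, the usual way to bound an operation is `asyncio.wait_for`, which cancels the awaited coroutine and raises `asyncio.TimeoutError` once the timeout expires. In this minimal sketch `slow_tool_call` is a hypothetical stand-in for an async tool invocation; replace it with your client's real coroutine.

```python
import asyncio


async def slow_tool_call(delay: float) -> str:
    """Hypothetical stand-in for an async MCP tool call."""
    await asyncio.sleep(delay)
    return "done"


async def call_with_timeout(delay: float, timeout: float) -> str:
    try:
        # wait_for cancels slow_tool_call if it has not finished in time
        return await asyncio.wait_for(slow_tool_call(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


print(asyncio.run(call_with_timeout(0.01, timeout=1.0)))  # done
print(asyncio.run(call_with_timeout(1.0, timeout=0.05)))  # timed out
```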

5. Resource Errors

try:
    team = (MCPTeamBuilder("resource_test")
        .with_llm(llm)
        .with_mcp_server("python", ["memory_intensive_server.py"])
        .build()
    )
except MemoryError as e:
    logger.error(f"Out of memory: {e}")
    # Insufficient memory for operation
except OSError as e:
    logger.error(f"Resource error: {e}")
    # File descriptor limit, disk space, etc.
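The order of `except` clauses matters because `ConnectionError`, `TimeoutError`, and `PermissionError` are all subclasses of `OSError`, while `MemoryError` is not. This small sketch mimics how Python picks the first matching clause, using a hypothetical helper `first_matching_handler`, to show what happens when `OSError` is listed first.

```python
from typing import Iterable, Optional, Type


def first_matching_handler(
    error: BaseException,
    handler_order: Iterable[Type[BaseException]],
) -> Optional[Type[BaseException]]:
    """Return the first type in handler_order that would catch error,
    mirroring how except clauses are tried from top to bottom."""
    for exc_type in handler_order:
        if isinstance(error, exc_type):
            return exc_type
    return None


# Specific handlers first, broad OSError last
good_order = [ConnectionError, TimeoutError, PermissionError, OSError]
# Broad handler first: it shadows every OSError subclass listed after it
bad_order = [OSError, ConnectionError, TimeoutError, PermissionError]

error = ConnectionRefusedError("server refused connection")
print(first_matching_handler(error, good_order).__name__)  # ConnectionError
print(first_matching_handler(error, bad_order).__name__)   # OSError
print(issubclass(MemoryError, OSError))                    # False
```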

Basic Error Handling

Try-Catch Pattern

from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def execute_with_basic_error_handling(team, task: str):
    """Execute task with basic error handling."""
    try:
        result = team({
            "messages": [HumanMessage(content=task)]
        })
        logger.info("Task completed successfully")
        return result

    except ConnectionError as e:
        logger.error(f"Connection error: {e}")
        return {"error": "Server unreachable", "details": str(e)}

    except TimeoutError as e:
        logger.error(f"Timeout error: {e}")
        return {"error": "Operation timed out", "details": str(e)}

    except PermissionError as e:
        logger.error(f"Permission error: {e}")
        return {"error": "Authentication failed", "details": str(e)}

    except ValueError as e:
        logger.error(f"Value error: {e}")
        return {"error": "Invalid input", "details": str(e)}

    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        return {"error": "Unexpected error", "details": str(e)}

# Usage
team = (MCPTeamBuilder("basic_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server("python", ["server.py"])
    .build()
)

result = execute_with_basic_error_handling(team, "Process data")
if "error" in result:
    print(f"Task failed: {result['error']}")
else:
    print(f"Task succeeded: {result['messages'][-1].content}")

Error Context Managers

from contextlib import contextmanager
from typing import Generator
import time

@contextmanager
def error_handler(
    operation: str,
    logger: logging.Logger = None
) -> Generator[None, None, None]:
    """Context manager for error handling with timing."""
    if logger is None:
        logger = logging.getLogger(__name__)

    start_time = time.time()
    logger.info(f"Starting operation: {operation}")

    try:
        yield
    except ConnectionError as e:
        logger.error(f"Connection failed in {operation}: {e}")
        raise
    except TimeoutError as e:
        logger.error(f"Timeout in {operation}: {e}")
        raise
    except Exception as e:
        logger.exception(f"Error in {operation}: {e}")
        raise
    finally:
        elapsed = time.time() - start_time
        logger.info(f"Operation {operation} took {elapsed:.2f}s")

# Usage
with error_handler("team_execution"):
    result = team({"messages": [HumanMessage(content="Task")]})

Defensive Team Building

def build_resilient_team(
    name: str,
    llm,
    server_configs: list
):
    """Build team with defensive error handling.

    Servers marked "optional" may fail to be added without aborting the build.
    """
    builder = MCPTeamBuilder(name).with_llm(llm)

    # Add servers with error handling
    for config in server_configs:
        try:
            if "url" in config:
                # SSE server
                builder = builder.with_mcp_server_sse(
                    url=config["url"],
                    env=config.get("env", {}),
                    timeout=config.get("timeout", 30)
                )
            else:
                # STDIO server
                builder = builder.with_mcp_server(
                    command=config["command"],
                    args=config["args"],
                    env=config.get("env", {}),
                    timeout=config.get("timeout", 10)
                )
            logger.info(f"Added server: {config.get('name', 'unnamed')}")

        except Exception as e:
            logger.error(f"Failed to add server {config.get('name')}: {e}")
            if not config.get("optional", False):
                raise  # Re-raise if server is required

    # Enable graceful degradation
    builder = builder.skip_failed_servers(True)

    # Build with error handling
    try:
        team = builder.build()
        logger.info(f"Team {name} built successfully")
        return team
    except Exception as e:
        logger.error(f"Failed to build team {name}: {e}")
        raise

# Usage
server_configs = [
    {
        "name": "filesystem",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
        "optional": False  # Required
    },
    {
        "name": "github",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-github"],
        "env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")},
        "optional": True  # Optional
    },
    {
        "name": "search_api",
        "url": "https://search.example.com/sse",
        "env": {"API_KEY": os.getenv("SEARCH_API_KEY")},
        "timeout": 45,
        "optional": True  # Optional
    }
]

team = build_resilient_team("resilient_team", llm, server_configs)
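Because `os.getenv()` returns `None` for unset variables, a config can look complete and still fail when the server starts. One option is to check each config before handing it to the builder. The helpers below are a hypothetical sketch that uses the same keys as `server_configs` above; they catch missing credentials and malformed entries early, skipping optional servers and raising for required ones.

```python
import logging
from typing import Any, Dict, List, Tuple

logger = logging.getLogger(__name__)


def validate_server_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems with one server config; an empty list means OK."""
    problems = []
    name = config.get("name", "unnamed")

    # Exactly one transport: "url" for SSE, or "command" (+ "args") for STDIO
    if ("url" in config) == ("command" in config):
        problems.append(f"{name}: set exactly one of 'url' or 'command'")
    if "command" in config and not isinstance(config.get("args"), list):
        problems.append(f"{name}: 'args' must be a list")

    # os.getenv() returns None for unset variables
    for key, value in config.get("env", {}).items():
        if value is None:
            problems.append(f"{name}: environment value for '{key}' is not set")

    timeout = config.get("timeout")
    if timeout is not None and (not isinstance(timeout, (int, float)) or timeout <= 0):
        problems.append(f"{name}: 'timeout' must be a positive number")
    return problems


def partition_server_configs(
    configs: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split configs into (usable, skipped); raise if a required config is invalid."""
    usable, skipped = [], []
    for config in configs:
        problems = validate_server_config(config)
        if not problems:
            usable.append(config)
        elif config.get("optional", False):
            logger.warning("Skipping optional server: %s", "; ".join(problems))
            skipped.append(config)
        else:
            raise ValueError("; ".join(problems))
    return usable, skipped
```

With these helpers, `usable, skipped = partition_server_configs(server_configs)` could run before `build_resilient_team("resilient_team", llm, usable)`.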

Connection Errors

Pre-Connection Testing

import requests
from urllib.parse import urlparse

def test_server_connectivity(url: str, timeout: int = 5) -> bool:
    """Test if server is reachable before connecting."""
    try:
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

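        # Assumes the server exposes a /health endpoint; the MCP specification
        # does not define one, so adjust the path for your server
        response = requests.get(
            f"{base_url}/health",
            timeout=timeout
        )
        return response.status_code == 200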

    except requests.exceptions.ConnectionError:
        logger.warning(f"Server {url} unreachable")
        return False
    except requests.exceptions.Timeout:
        logger.warning(f"Server {url} timeout")
        return False
    except Exception as e:
        logger.error(f"Connectivity test failed: {e}")
        return False

# Build team with connectivity check
def build_team_with_connection_test(server_url: str):
    """Build team only if server is reachable."""
    if not test_server_connectivity(server_url):
        logger.error(f"Server {server_url} not available")
        # Use a fallback configuration (build_fallback_team is a placeholder
        # for your own fallback builder)
        return build_fallback_team()

    return (MCPTeamBuilder("tested_team")
        .with_llm(llm)
        .with_mcp_server_sse(url=server_url, timeout=30)
        .build()
    )
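The `/health` check above depends on the server exposing such an endpoint. A weaker check that needs no server cooperation is a plain TCP connection to the host and port. This sketch uses only the standard library; it confirms that something is listening, not that it speaks MCP, and the default-port table for `http`/`https` URLs is an assumption.

```python
import socket
from typing import Optional
from urllib.parse import urlparse

# Assumed defaults for URLs without an explicit port
DEFAULT_PORTS = {"http": 80, "https": 443}


def is_tcp_reachable(url: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    try:
        port: Optional[int] = parsed.port or DEFAULT_PORTS.get(parsed.scheme)
    except ValueError:
        return False  # port out of range or not a number
    if parsed.hostname is None or port is None:
        return False
    try:
        # create_connection resolves the host and tries each address in turn
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, DNS failures, and timeouts are all OSError subclasses
        return False
```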

Connection Retry

import time
from typing import Optional

def connect_with_retry(
    builder: MCPTeamBuilder,
    max_attempts: int = 3,
    backoff_factor: float = 2.0
) -> Optional[object]:
    """Build team with connection retry."""
    for attempt in range(max_attempts):
        try:
            team = builder.build()
            logger.info(f"Connected successfully on attempt {attempt + 1}")
            return team

        except ConnectionError as e:
            if attempt < max_attempts - 1:
                wait_time = backoff_factor ** attempt
                logger.warning(
                    f"Connection failed (attempt {attempt + 1}/{max_attempts}). "
                    f"Retrying in {wait_time}s..."
                )
                time.sleep(wait_time)
            else:
                logger.error(f"Failed to connect after {max_attempts} attempts")
                raise

        except Exception as e:
            logger.error(f"Unexpected error during connection: {e}")
            raise

    return None

# Usage
builder = (MCPTeamBuilder("retry_team")
    .with_llm(llm)
    .with_mcp_server_sse("https://unstable-api.example.com/sse")
)

team = connect_with_retry(builder, max_attempts=3)
if team:
    result = team({"messages": [HumanMessage(content="Task")]})

Connection Pooling

import queue
import threading

class ConnectionPool:
    """Manage a thread-safe pool of MCP team connections."""

    def __init__(self, size: int = 5):
        self.size = size
        self.teams = []
        self.available = queue.Queue()  # Thread-safe; acquire() blocks on it
        self.in_use = set()             # ids of teams currently checked out
        self._lock = threading.Lock()

    def initialize(self, builder_factory):
        """Initialize connection pool."""
        for i in range(self.size):
            try:
                team = builder_factory(f"pool_team_{i}")
                self.teams.append(team)
                self.available.put(team)
                logger.info(f"Initialized team {i+1}/{self.size}")
            except Exception as e:
                logger.error(f"Failed to initialize team {i+1}: {e}")

        if not self.teams:
            raise RuntimeError("Pool initialization failed: no teams could be built")

    def acquire(self, timeout: int = 30):
        """Acquire team from pool, waiting up to `timeout` seconds."""
        try:
            team = self.available.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("No teams available in pool") from None
        with self._lock:
            self.in_use.add(id(team))
        return team

    def release(self, team):
        """Release team back to pool (ignores teams that are not checked out)."""
        with self._lock:
            if id(team) not in self.in_use:
                return
            self.in_use.remove(id(team))
        self.available.put(team)

    def close_all(self):
        """Close all connections."""
        for team in self.teams:
            try:
                team.cleanup()
            except Exception as e:
                logger.error(f"Error closing team: {e}")

# Usage
def builder_factory(name: str):
    return (MCPTeamBuilder(name)
        .with_llm(llm)
        .with_mcp_server_sse("https://api.example.com/sse")
        .build()
    )

pool = ConnectionPool(size=5)
pool.initialize(builder_factory)

# Acquire and use
team = pool.acquire()
try:
    result = team({"messages": [HumanMessage(content="Task")]})
finally:
    pool.release(team)

Tool Execution Errors

Input Validation

from typing import Any, Dict
import re

class InputValidator:
    """Validate tool inputs before execution."""

    @staticmethod
    def validate_string(
        value: Any,
        min_length: int = 0,
        max_length: int = 10000,
        pattern: str = None
    ) -> str:
        """Validate string input."""
        if not isinstance(value, str):
            raise ValueError(f"Expected string, got {type(value).__name__}")

        if len(value) < min_length:
            raise ValueError(f"String too short (min {min_length})")

        if len(value) > max_length:
            raise ValueError(f"String too long (max {max_length})")

        # fullmatch: the whole string must match, not just a prefix (re.match)
        if pattern and not re.fullmatch(pattern, value):
            raise ValueError(f"String doesn't match pattern: {pattern}")

        return value

    @staticmethod
    def validate_number(
        value: Any,
        min_value: float = None,
        max_value: float = None
    ) -> float:
        """Validate numeric input."""
        try:
            num = float(value)
        except (ValueError, TypeError):
            raise ValueError(f"Expected number, got {type(value).__name__}")

        if min_value is not None and num < min_value:
            raise ValueError(f"Number too small (min {min_value})")

        if max_value is not None and num > max_value:
            raise ValueError(f"Number too large (max {max_value})")

        return num

    @staticmethod
    def validate_dict(
        value: Any,
        required_keys: list = None,
        allowed_keys: list = None
    ) -> Dict:
        """Validate dictionary input."""
        if not isinstance(value, dict):
            raise ValueError(f"Expected dict, got {type(value).__name__}")

        if required_keys:
            missing = set(required_keys) - set(value.keys())
            if missing:
                raise ValueError(f"Missing required keys: {missing}")

        if allowed_keys:
            invalid = set(value.keys()) - set(allowed_keys)
            if invalid:
                raise ValueError(f"Invalid keys: {invalid}")

        return value

# Usage in tool execution
validator = InputValidator()

def execute_with_validation(team, task: str, params: Dict):
    """Execute with input validation."""
    try:
        # Validate inputs
        task = validator.validate_string(
            task,
            min_length=1,
            max_length=5000
        )

        params = validator.validate_dict(
            params,
            required_keys=["action"],
            allowed_keys=["action", "target", "options"]
        )

        # Execute task
        result = team({
            "messages": [HumanMessage(content=task)]
        })
        return result

    except ValueError as e:
        logger.error(f"Validation failed: {e}")
        return {"error": "Invalid input", "details": str(e)}
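MCP servers describe each tool's parameters with a JSON Schema (`inputSchema` in the tool listing), so arguments can be checked against that schema instead of hand-written rules. The checker below is a deliberately small sketch that handles only top-level `required`, `properties.*.type`, and `additionalProperties: false`; for full JSON Schema support, a dedicated validator such as the `jsonschema` package is the safer choice.

```python
from typing import Any, Dict, List

# Subset of JSON Schema type names mapped to Python types
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
    "null": type(None),
}


def check_tool_arguments(input_schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with `arguments`; an empty list means OK."""
    errors = []
    properties = input_schema.get("properties", {})

    for key in input_schema.get("required", []):
        if key not in arguments:
            errors.append(f"missing required argument '{key}'")

    for key, value in arguments.items():
        spec = properties.get(key)
        if spec is None:
            if input_schema.get("additionalProperties") is False:
                errors.append(f"unexpected argument '{key}'")
            continue

        expected = spec.get("type")
        if not isinstance(expected, str) or expected not in JSON_TYPES:
            continue  # type unions and missing types are not checked here

        # bool is a subclass of int in Python, but JSON Schema keeps them distinct
        if isinstance(value, bool) and expected in ("integer", "number"):
            errors.append(f"argument '{key}' should be {expected}, got bool")
        elif not isinstance(value, JSON_TYPES[expected]):
            errors.append(f"argument '{key}' should be {expected}, got {type(value).__name__}")

    return errors
```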

Safe Tool Execution

import signal
from contextlib import contextmanager

@contextmanager
def timeout_handler(seconds: int):
    """Context manager for execution timeout.

    Uses SIGALRM, so it only works on Unix and only in the main thread.
    """
    def timeout_signal_handler(signum, frame):
        raise TimeoutError(f"Execution exceeded {seconds} seconds")

    # Set signal handler
    old_handler = signal.signal(signal.SIGALRM, timeout_signal_handler)
    signal.alarm(seconds)

    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

def safe_execute(
    team,
    task: str,
    timeout: int = 60,
    max_retries: int = 3
):
    """Execute with timeout and retry."""
    for attempt in range(max_retries):
        try:
            with timeout_handler(timeout):
                result = team({
                    "messages": [HumanMessage(content=task)]
                })
                return result

        except TimeoutError as e:
            logger.warning(f"Attempt {attempt + 1} timed out: {e}")
            if attempt < max_retries - 1:
                continue
            raise

        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise

    return None
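`timeout_handler` relies on `SIGALRM`, which is unavailable on Windows and cannot be used outside the main thread. A portable alternative is to run the call in a worker thread and wait on its future with a timeout. In this sketch, `run_with_timeout` is a hypothetical helper; note that Python cannot kill the worker thread, so a timed-out call keeps running in the background until it finishes on its own.

```python
import concurrent.futures
from typing import Any, Callable

# Shared worker pool; threads are reused across calls
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_with_timeout(func: Callable[[], Any], seconds: float) -> Any:
    """Run func in a worker thread and wait at most `seconds` for its result."""
    future = _executor.submit(func)
    try:
        return future.result(timeout=seconds)
    except concurrent.futures.TimeoutError:
        # cancel() only succeeds if the call has not started; a running call
        # keeps its worker thread busy until it returns on its own
        future.cancel()
        raise TimeoutError(f"Execution exceeded {seconds} seconds") from None


print(run_with_timeout(lambda: "fast result", seconds=1.0))  # fast result
```

Inside `safe_execute`, the `with timeout_handler(timeout):` block could then become `result = run_with_timeout(lambda: team({"messages": [HumanMessage(content=task)]}), timeout)`.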

Error Recovery

class ErrorRecovery:
    """Automatic error recovery strategies."""

    def __init__(self, team):
        self.team = team
        self.error_history = []

    def execute_with_recovery(self, task: str, max_fix_attempts: int = 2) -> Dict:
        """Execute with automatic error recovery."""
        try:
            result = self.team({
                "messages": [HumanMessage(content=task)]
            })
            return {"status": "success", "result": result}

        except ValueError as e:
            # Input validation error - try to fix, with a bounded number of attempts
            logger.warning(f"Input error: {e}. Attempting to fix...")
            fixed_task = self._fix_input_error(task, e)
            if fixed_task and fixed_task != task and max_fix_attempts > 0:
                return self.execute_with_recovery(fixed_task, max_fix_attempts - 1)
            return {"status": "error", "type": "input_error", "details": str(e)}

        except TimeoutError as e:
            # Timeout - break into smaller tasks
            logger.warning(f"Timeout: {e}. Breaking into smaller tasks...")
            return self._break_into_subtasks(task)

        except RuntimeError as e:
            # Runtime error - try alternative approach
            logger.warning(f"Runtime error: {e}. Trying alternative...")
            return self._try_alternative_approach(task)

        except Exception as e:
            # Unknown error - log and fail
            logger.error(f"Unrecoverable error: {e}")
            self.error_history.append({
                "task": task,
                "error": str(e),
                "timestamp": time.time()
            })
            return {"status": "error", "type": "unknown", "details": str(e)}

    def _fix_input_error(self, task: str, error: Exception) -> Optional[str]:
        """Attempt to fix input error; return None if the input cannot be fixed."""
        # Implement input fixing logic
        # For example: clean special characters, truncate length, etc.
        return None

    def _break_into_subtasks(self, task: str) -> Dict:
        """Break task into smaller subtasks."""
        # Implement task breakdown logic
        return {"status": "partial", "message": "Task broken into subtasks"}

    def _try_alternative_approach(self, task: str) -> Dict:
        """Try alternative execution approach."""
        # Implement alternative strategies
        return {"status": "alternative", "message": "Used alternative approach"}

# Usage
recovery = ErrorRecovery(team)
result = recovery.execute_with_recovery("Complex task")
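`_fix_input_error` is left as a stub above. One possible implementation, sketched here as a standalone hypothetical function, strips control characters and truncates overlong tasks; it returns `None` when cleaning changes nothing, so the caller does not retry the same failing input. The 5000-character limit mirrors `execute_with_validation` and is otherwise an assumption.

```python
import re
from typing import Optional

# Mirrors max_length in execute_with_validation; otherwise an assumption
MAX_TASK_LENGTH = 5000

# ASCII control characters except tab (\x09), newline (\x0a) and carriage return (\x0d)
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")


def fix_input_error(task: str, max_length: int = MAX_TASK_LENGTH) -> Optional[str]:
    """Return a cleaned-up task, or None if cleaning cannot help."""
    cleaned = _CONTROL_CHARS.sub("", task).strip()
    cleaned = cleaned[:max_length]
    # Returning None when nothing changed stops the caller from retrying
    # the exact input that already failed
    if not cleaned or cleaned == task:
        return None
    return cleaned
```

The method body could then simply be `return fix_input_error(task)`.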

Graceful Degradation

Skip Failed Servers

# Enable graceful degradation during build
resilient_team = (MCPTeamBuilder("resilient_team")
    .with_llm(llm)

    # Critical server (must succeed)
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )

    # Optional servers (can fail)
    .with_mcp_server_sse(
        url="https://optional-api-1.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_1")},
        timeout=30
    )

    .with_mcp_server_sse(
        url="https://optional-api-2.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_2")},
        timeout=30
    )

    # Enable graceful degradation
    .skip_failed_servers(True)

    .with_prompt("""You are a resilient assistant.

Available capabilities depend on which servers are online:
- Filesystem (always available)
- Optional API 1 (may be unavailable)
- Optional API 2 (may be unavailable)

If optional services are unavailable, work with available tools only.
Inform user about limited capabilities when needed.
""")

    .build()
)

# Check which tools are available
available_tools = resilient_team.get_mcp_tool_names()
logger.info(f"Available tools: {available_tools}")

# Execute with awareness of available capabilities
result = resilient_team({
    "messages": [HumanMessage(content="Process data using available tools")]
})

Capability Detection

class CapabilityAwareTeam:
    """Team that adapts to available capabilities."""

    def __init__(self, builder: MCPTeamBuilder):
        self.team = builder.skip_failed_servers(True).build()
        self.capabilities = self._detect_capabilities()

    def _detect_capabilities(self) -> Dict[str, bool]:
        """Detect available capabilities."""
        available_tools = self.team.get_mcp_tool_names()

        return {
            "filesystem": any("file" in tool for tool in available_tools),
            "database": any("db" in tool or "sql" in tool for tool in available_tools),
            "github": any("github" in tool for tool in available_tools),
            "search": any("search" in tool for tool in available_tools),
        }

    def execute_adaptive(self, task: str) -> Dict:
        """Execute task adapting to available capabilities."""
        # Check if required capabilities are available
        required = self._required_capabilities(task)
        available = all(self.capabilities.get(cap, False) for cap in required)

        if not available:
            missing = [c for c in required if not self.capabilities.get(c, False)]
            logger.warning(f"Missing capabilities: {missing}")
            return self._execute_with_fallback(task, missing)

        # Execute normally
        result = self.team({
            "messages": [HumanMessage(content=task)]
        })
        return {"status": "success", "result": result}

    def _required_capabilities(self, task: str) -> list:
        """Determine required capabilities from task."""
        required = []
        task_lower = task.lower()

        if any(word in task_lower for word in ["file", "read", "write"]):
            required.append("filesystem")
        if any(word in task_lower for word in ["database", "query", "sql"]):
            required.append("database")
        if any(word in task_lower for word in ["github", "repo", "issue"]):
            required.append("github")
        if any(word in task_lower for word in ["search", "find", "look up"]):
            required.append("search")

        return required

    def _execute_with_fallback(self, task: str, missing: list) -> Dict:
        """Execute with fallback for missing capabilities."""
        logger.info(f"Using fallback for missing: {missing}")

        # Modify task to work with available capabilities
        modified_task = self._modify_task_for_fallback(task, missing)

        result = self.team({
            "messages": [HumanMessage(content=modified_task)]
        })

        return {
            "status": "degraded",
            "missing_capabilities": missing,
            "result": result
        }

    def _modify_task_for_fallback(self, task: str, missing: list) -> str:
        """Modify task to work without missing capabilities."""
        modifications = {
            "database": "Note: Database unavailable. Use filesystem for data.",
            "github": "Note: GitHub unavailable. Save results locally.",
            "search": "Note: Search unavailable. Use cached information."
        }

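        prefix = "\n".join(modifications[cap] for cap in missing if cap in modifications)
        # Leave the task unchanged when no note applies (e.g. only "filesystem" is missing)
        return f"{prefix}\n\n{task}" if prefix else task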

# Usage
builder = (MCPTeamBuilder("adaptive_team")
    .with_llm(llm)
    .with_mcp_server("npx", ["-y", "@modelcontextprotocol/server-filesystem", "/data"])
    .with_mcp_server_sse("https://api.example.com/sse")  # May fail
)

adaptive_team = CapabilityAwareTeam(builder)
result = adaptive_team.execute_adaptive("Search and save results to file")
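The substring checks in `_required_capabilities` can misfire: `"read" in "already"` is `True`, so a task mentioning "already" would be flagged as needing the filesystem. A word-boundary match avoids this. The sketch below is a hypothetical standalone replacement using the same keyword lists, with an optional trailing "s" so plurals such as "files" still match.

```python
import re
from typing import Dict, List

# Same keyword lists as _required_capabilities above
CAPABILITY_KEYWORDS: Dict[str, List[str]] = {
    "filesystem": ["file", "read", "write"],
    "database": ["database", "query", "sql"],
    "github": ["github", "repo", "issue"],
    "search": ["search", "find", "look up"],
}


def required_capabilities(task: str) -> List[str]:
    """Return capabilities whose keywords appear as whole words (plural 's' allowed)."""
    text = task.lower()
    return [
        capability
        for capability, keywords in CAPABILITY_KEYWORDS.items()
        if any(re.search(rf"\b{re.escape(word)}s?\b", text) for word in keywords)
    ]


print("read" in "already")                              # True: substring false positive
print(required_capabilities("I have already checked"))  # []
```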

Fallback Chains

class FallbackChain:
    """Execute with chain of fallback options."""

    def __init__(self, teams: list, names: list = None):
        self.teams = teams
        self.names = names or [f"team_{i}" for i in range(len(teams))]

    def execute(self, task: str) -> Dict:
        """Execute with fallback chain."""
        for i, (team, name) in enumerate(zip(self.teams, self.names)):
            try:
                logger.info(f"Trying {name} (option {i+1}/{len(self.teams)})")
                result = team({
                    "messages": [HumanMessage(content=task)]
                })
                logger.info(f"Success with {name}")
                return {
                    "status": "success",
                    "team": name,
                    "result": result
                }

            except Exception as e:
                logger.warning(f"{name} failed: {e}")
                if i < len(self.teams) - 1:
                    logger.info("Trying next option...")
                    continue
                else:
                    logger.error(f"All {len(self.teams)} options failed")
                    return {
                        "status": "failed",
                        "error": "All fallback options exhausted",
                        "last_error": str(e)
                    }

        return {"status": "failed", "error": "No teams available"}

# Build fallback chain
primary_team = (MCPTeamBuilder("primary")
    .with_llm(llm)
    .with_mcp_server_sse("https://primary-api.example.com/sse")
    .build()
)

secondary_team = (MCPTeamBuilder("secondary")
    .with_llm(llm)
    .with_mcp_server_sse("https://backup-api.example.com/sse")
    .build()
)

tertiary_team = (MCPTeamBuilder("tertiary")
    .with_llm(llm)
    .with_mcp_server("python", ["local_server.py"])
    .build()
)

fallback = FallbackChain(
    teams=[primary_team, secondary_team, tertiary_team],
    names=["Primary API", "Backup API", "Local Server"]
)

result = fallback.execute("Process data")
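In the example above, all three teams are built before the chain runs, so an unreachable primary server may raise during `build()` and never reach the fallback logic. One way around this is to pass factories instead of built teams, so build errors are handled like execution errors. This is a hypothetical sketch: `execute_with_fallback_factories` is not part of azcore, and the demonstration uses stand-in factories rather than real MCP servers.

```python
import logging
from typing import Any, Callable, Dict, List, Tuple

logger = logging.getLogger(__name__)

# A factory builds a team; a team is a callable that takes the input payload
TeamFactory = Callable[[], Callable[[Dict[str, Any]], Any]]


def execute_with_fallback_factories(
    options: List[Tuple[str, TeamFactory]],
    payload: Dict[str, Any],
) -> Dict[str, Any]:
    """Build and run each option in turn; build errors count as failures too."""
    last_error = None
    for name, factory in options:
        try:
            team = factory()  # may raise, e.g. if the server is unreachable
            result = team(payload)
            return {"status": "success", "team": name, "result": result}
        except Exception as e:
            logger.warning(f"{name} failed: {e}")
            last_error = e
    return {
        "status": "failed",
        "error": "All fallback options exhausted",
        "last_error": str(last_error) if last_error else None,
    }


# Demonstration with stand-in factories (hypothetical, no MCP servers involved)
def unreachable_primary():
    raise ConnectionError("primary server unreachable")


def local_server():
    return lambda payload: {"handled": payload["task"]}


outcome = execute_with_fallback_factories(
    [("Primary API", unreachable_primary), ("Local Server", local_server)],
    {"task": "Process data"},
)
print(outcome["team"])  # Local Server
```

With azcore, a factory could be `lambda: MCPTeamBuilder("primary").with_llm(llm).with_mcp_server_sse("https://primary-api.example.com/sse").build()`, and the payload would be `{"messages": [HumanMessage(content="Process data")]}`.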

Retry Strategies

Exponential Backoff

import time
from typing import Callable, Any

def exponential_backoff_retry(
    func: Callable,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
) -> Any:
    """Execute function with exponential backoff retry."""

    for attempt in range(max_retries):
        try:
            result = func()
            if attempt > 0:
                logger.info(f"Succeeded on attempt {attempt + 1}")
            return result

        except Exception as e:
            if attempt < max_retries - 1:
                # Calculate delay with exponential backoff
                delay = min(
                    base_delay * (exponential_base ** attempt),
                    max_delay
                )

                logger.warning(
                    f"Attempt {attempt + 1}/{max_retries} failed: {e}. "
                    f"Retrying in {delay:.1f}s..."
                )
                time.sleep(delay)
            else:
                logger.error(f"All {max_retries} attempts failed")
                raise

# Usage
def execute_task():
    return team({"messages": [HumanMessage(content="Task")]})

result = exponential_backoff_retry(
    execute_task,
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0
)
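With the defaults above, the delay after failed attempt n (counting from 0) is `min(base_delay * exponential_base ** n, max_delay)`, and there is one sleep between consecutive attempts, so five attempts mean four sleeps. This small helper, written for illustration, lists that schedule so the worst-case total wait before the final failure is easy to see.

```python
from typing import List


def backoff_schedule(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
) -> List[float]:
    """Delays slept between attempts, matching exponential_backoff_retry."""
    # max_retries attempts leave max_retries - 1 gaps to sleep in
    return [
        min(base_delay * (exponential_base ** attempt), max_delay)
        for attempt in range(max_retries - 1)
    ]


print(backoff_schedule())                # [1.0, 2.0, 4.0, 8.0]
print(sum(backoff_schedule()))           # 15.0
print(backoff_schedule(max_retries=10))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0]
```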

Jittered Backoff

import random

def jittered_backoff_retry(
    func: Callable,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> Any:
    """Execute with jittered exponential backoff."""

    for attempt in range(max_retries):
        try:
            return func()

        except Exception as e:
            if attempt < max_retries - 1:
                # Calculate delay with jitter
                exponential_delay = base_delay * (2 ** attempt)
                jitter = random.uniform(0, exponential_delay * 0.1)
                delay = min(exponential_delay + jitter, max_delay)

                logger.warning(
                    f"Attempt {attempt + 1} failed. "
                    f"Retrying in {delay:.2f}s..."
                )
                time.sleep(delay)
            else:
                raise

# Usage
result = jittered_backoff_retry(
    lambda: team({"messages": [HumanMessage(content="Task")]}),
    max_retries=5
)
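The jitter above is at most 10% of the exponential delay, so clients that fail together still retry at nearly the same moments. A common alternative, often called "full jitter", draws each delay uniformly between zero and the capped exponential value, which spreads retries out much more. The sketch below only computes the delays; a seeded `random.Random` can be passed in for reproducible tests.

```python
import random
from typing import List, Optional


def full_jitter_delays(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one delay per retry (max_retries - 1 sleeps), using full jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries - 1):
        # Cap the exponential ceiling first, then pick uniformly below it
        ceiling = min(max_delay, base_delay * (2 ** attempt))
        delays.append(rng.uniform(0.0, ceiling))
    return delays
```

In `jittered_backoff_retry`, the same idea amounts to `delay = random.uniform(0, min(max_delay, base_delay * 2 ** attempt))`.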

Adaptive Retry

class AdaptiveRetry:
    """Adaptive retry strategy based on error type."""

    def __init__(self):
        # "retries" is the total number of attempts allowed for each error type
        self.retry_config = {
            ConnectionError: {"retries": 5, "delay": 2.0},
            TimeoutError: {"retries": 3, "delay": 5.0},
            ValueError: {"retries": 1, "delay": 0.0},  # Don't retry input errors
            RuntimeError: {"retries": 3, "delay": 1.0}
        }

    def _match(self, error: Exception):
        """Return (error_type, config) for the first configured type matching error."""
        for error_type, config in self.retry_config.items():
            if isinstance(error, error_type):
                return error_type, config
        return None, None

    def execute(self, func: Callable) -> Any:
        """Execute with adaptive retry strategy.

        Each failure is classified by the exception it raises, and each
        configured error type has its own attempt budget.
        """
        attempts = {}  # error_type -> attempts that failed with that type

        while True:
            try:
                return func()

            except Exception as e:
                error_type, config = self._match(e)
                if error_type is None:
                    # Unknown error type - fail immediately
                    logger.error(f"Unknown error type: {type(e).__name__}")
                    raise

                attempts[error_type] = attempts.get(error_type, 0) + 1
                if attempts[error_type] >= config["retries"]:
                    logger.error(
                        f"Max retries ({config['retries']}) reached for {error_type.__name__}"
                    )
                    raise

                logger.warning(
                    f"{error_type.__name__} on attempt {attempts[error_type]}. "
                    f"Retrying in {config['delay']}s..."
                )
                time.sleep(config["delay"])

# Usage
adaptive = AdaptiveRetry()
result = adaptive.execute(
    lambda: team({"messages": [HumanMessage(content="Task")]})
)
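When only a few exception types are worth retrying, a decorator keeps the policy next to the function it protects. This `retry_on` decorator is a hypothetical sketch, not an azcore API: it retries only the listed exception types with a fixed delay and lets everything else, such as input `ValueError`s, propagate on the first failure.

```python
import functools
import logging
import time
from typing import Any, Callable, Tuple, Type

logger = logging.getLogger(__name__)


def retry_on(
    exceptions: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    delay: float = 1.0,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Retry the decorated function only when it raises one of `exceptions`."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, attempts + 1):
                # Exception types not listed in `exceptions` are not caught
                # here, so they propagate on the first failure
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == attempts:
                        raise  # budget exhausted: surface the last error
                    logger.warning(
                        f"{type(e).__name__} on attempt {attempt}/{attempts}; "
                        f"retrying in {delay}s"
                    )
                    time.sleep(delay)
        return wrapper
    return decorator
```

Applied to this guide, a task runner could be decorated with `@retry_on((ConnectionError, TimeoutError), attempts=5, delay=2.0)`.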

Circuit Breaker Pattern

Basic Circuit Breaker

from datetime import datetime
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Circuit tripped, rejecting requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """Circuit breaker pattern implementation."""

    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: int = 60
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout

        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable) -> Any:
        """Execute function through circuit breaker."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker: OPEN -> HALF_OPEN")
            else:
                raise RuntimeError("Circuit breaker is OPEN")

        try:
            result = func()
            self._on_success()
            return result

        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset."""
        if not self.last_failure_time:
            return False

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
                logger.info("Circuit breaker: HALF_OPEN -> CLOSED")

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            # A failed probe reopens the circuit and discards partial progress
            self.state = CircuitState.OPEN
            self.success_count = 0
            logger.warning("Circuit breaker: HALF_OPEN -> OPEN")

        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning("Circuit breaker: CLOSED -> OPEN")

    def get_state(self) -> CircuitState:
        """Get current circuit state."""
        return self.state

# Usage
circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    success_threshold=2,
    timeout=60
)

def execute_with_circuit_breaker(task: str):
    """Execute task through circuit breaker."""
    try:
        return circuit_breaker.call(
            lambda: team({"messages": [HumanMessage(content=task)]})
        )
    except Exception as e:
        logger.error(f"Circuit breaker prevented call or execution failed: {e}")
        # Use fallback
        return fallback_execution(task)

# Execute tasks
for i in range(10):
    try:
        result = execute_with_circuit_breaker(f"Task {i}")
        logger.info(f"Task {i} completed")
    except Exception as e:
        logger.error(f"Task {i} failed: {e}")
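
Because `CircuitBreaker` reads the wall clock directly, testing the OPEN -> HALF_OPEN transition means really waiting `timeout` seconds. One common design choice is to inject the clock. The sketch below is a compact version of the same state machine with a `clock` parameter; `ClockedBreaker` and `clock` are illustrative names, not part of azcore:

```python
import time
from typing import Any, Callable


class ClockedBreaker:
    """Same CLOSED/OPEN/HALF_OPEN logic as above, with an injectable clock."""

    def __init__(self, failure_threshold: int = 5, success_threshold: int = 2,
                 timeout: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def call(self, func: Callable[[], Any]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.timeout:
                raise RuntimeError("Circuit breaker is OPEN")
            self.state = "half_open"  # timeout elapsed: allow a probe
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state, self.opened_at, self.successes = "open", self.clock(), 0
            raise
        self.failures = 0
        if self.state == "half_open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state, self.successes = "closed", 0
        return result


# Drive the breaker with a fake clock instead of sleeping
now = [0.0]
breaker = ClockedBreaker(failure_threshold=2, success_threshold=1,
                         timeout=30, clock=lambda: now[0])

def fail():
    raise ConnectionError("down")

for _ in range(2):
    try:
        breaker.call(fail)
    except ConnectionError:
        pass
print(breaker.state)                  # open
now[0] += 31                          # pretend 31 seconds have passed
print(breaker.call(lambda: "ok"))     # ok (the probe succeeds)
print(breaker.state)                  # closed
```

In production the default `time.monotonic` is used; only tests pass a fake clock.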

Multi-Service Circuit Breaker

class MultiServiceCircuitBreaker:
    """Circuit breaker for multiple services."""

    def __init__(self):
        self.breakers = {}

    def get_breaker(self, service_id: str) -> CircuitBreaker:
        """Get or create circuit breaker for service."""
        if service_id not in self.breakers:
            self.breakers[service_id] = CircuitBreaker(
                failure_threshold=5,
                success_threshold=2,
                timeout=60
            )
        return self.breakers[service_id]

    def call(self, service_id: str, func: Callable) -> Any:
        """Execute function through service-specific breaker."""
        breaker = self.get_breaker(service_id)
        return breaker.call(func)

    def get_status(self) -> Dict:
        """Get status of all circuit breakers."""
        return {
            service_id: breaker.get_state().value
            for service_id, breaker in self.breakers.items()
        }

# Usage
multi_breaker = MultiServiceCircuitBreaker()

# Execute against different services
try:
    result1 = multi_breaker.call(
        "github_api",
        lambda: github_team({"messages": [HumanMessage(content="Task")]})
    )
except Exception as e:
    logger.error(f"GitHub API failed: {e}")

try:
    result2 = multi_breaker.call(
        "search_api",
        lambda: search_team({"messages": [HumanMessage(content="Task")]})
    )
except Exception as e:
    logger.error(f"Search API failed: {e}")

# Check status
status = multi_breaker.get_status()
logger.info(f"Circuit breaker status: {status}")
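
If several threads share one `MultiServiceCircuitBreaker`, two of them can race in `get_breaker` and each create a breaker for the same service. A minimal sketch of a lock-protected registry that also allows per-service settings; `BreakerRegistry`, its `defaults` and `overrides` arguments, and the `factory` callable are illustrative assumptions (in real use you would pass `CircuitBreaker` as the factory):

```python
import threading
from typing import Any, Callable, Dict, Optional


class BreakerRegistry:
    """Create at most one breaker per service, even under concurrent access."""

    def __init__(self, factory: Callable[..., Any],
                 defaults: Optional[Dict[str, Any]] = None,
                 overrides: Optional[Dict[str, Dict[str, Any]]] = None):
        self._factory = factory                # e.g. CircuitBreaker
        self._defaults = defaults or {}
        self._overrides = overrides or {}      # per-service keyword arguments
        self._breakers: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, service_id: str) -> Any:
        # The lock makes check-then-create atomic across threads
        with self._lock:
            breaker = self._breakers.get(service_id)
            if breaker is None:
                kwargs = {**self._defaults, **self._overrides.get(service_id, {})}
                breaker = self._factory(**kwargs)
                self._breakers[service_id] = breaker
            return breaker


# Stand-in factory (dict) so the sketch runs on its own
registry = BreakerRegistry(
    factory=dict,
    defaults={"failure_threshold": 5, "timeout": 60},
    overrides={"search_api": {"failure_threshold": 10}},
)
print(registry.get("search_api"))   # {'failure_threshold': 10, 'timeout': 60}
print(registry.get("github_api") is registry.get("github_api"))  # True
```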

Fallback Mechanisms

Static Fallback

def execute_with_static_fallback(team, task: str, fallback_response: str):
    """Execute with static fallback response."""
    try:
        result = team({"messages": [HumanMessage(content=task)]})
        return result["messages"][-1].content

    except Exception as e:
        logger.error(f"Execution failed: {e}. Using fallback response.")
        return fallback_response

# Usage
result = execute_with_static_fallback(
    team,
    "Fetch user data",
    fallback_response="Unable to fetch data. Please try again later."
)

Dynamic Fallback

class DynamicFallback:
    """Dynamic fallback based on error type and context."""

    def __init__(self, primary_team, fallback_team):
        self.primary_team = primary_team
        self.fallback_team = fallback_team

    def execute(self, task: str) -> Dict:
        """Execute with dynamic fallback."""
        try:
            result = self.primary_team({
                "messages": [HumanMessage(content=task)]
            })
            return {
                "status": "success",
                "source": "primary",
                "result": result
            }

        except ConnectionError as e:
            logger.warning(f"Primary unreachable: {e}. Using fallback...")
            return self._use_fallback(task, "connection_error")

        except TimeoutError as e:
            logger.warning(f"Primary timeout: {e}. Using fallback...")
            return self._use_fallback(task, "timeout")

        except Exception as e:
            logger.error(f"Primary failed: {e}")
            return self._use_fallback(task, "general_error")

    def _use_fallback(self, task: str, reason: str) -> Dict:
        """Use fallback team."""
        try:
            result = self.fallback_team({
                "messages": [HumanMessage(content=task)]
            })
            return {
                "status": "degraded",
                "source": "fallback",
                "reason": reason,
                "result": result
            }
        except Exception as e:
            logger.error(f"Fallback also failed: {e}")
            return {
                "status": "failed",
                "reason": reason,
                "error": str(e)
            }

# Usage
primary = (MCPTeamBuilder("primary")
    .with_llm(llm)
    .with_mcp_server_sse("https://primary.example.com/sse")
    .build()
)

fallback = (MCPTeamBuilder("fallback")
    .with_llm(llm)
    .with_mcp_server("python", ["local_server.py"])
    .build()
)

dynamic_fallback = DynamicFallback(primary, fallback)
result = dynamic_fallback.execute("Process data")
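
`DynamicFallback` handles exactly one backup. The same idea generalizes to an ordered chain of providers, tried until one succeeds, with every failure kept for later logging. A minimal sketch; `run_fallback_chain` is a hypothetical helper, and in practice each entry would wrap a team call such as `lambda: primary({"messages": [...]})`:

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


def run_fallback_chain(
    providers: Sequence[Tuple[str, Callable[[], Any]]]
) -> Dict[str, Any]:
    """Try each (name, func) in order; return the first success plus the errors seen."""
    errors: List[Tuple[str, str]] = []
    for index, (name, func) in enumerate(providers):
        try:
            result = func()
        except Exception as e:
            errors.append((name, f"{type(e).__name__}: {e}"))
            continue
        return {
            # Anything other than the first provider counts as degraded service
            "status": "success" if index == 0 else "degraded",
            "source": name,
            "result": result,
            "errors": errors,
        }
    return {"status": "failed", "source": None, "result": None, "errors": errors}


def primary():
    raise ConnectionError("primary.example.com unreachable")

outcome = run_fallback_chain([
    ("primary", primary),
    ("local", lambda: "answer from local server"),
    ("static", lambda: "Please try again later."),
])
print(outcome["status"], outcome["source"])   # degraded local
print(outcome["errors"])  # [('primary', 'ConnectionError: primary.example.com unreachable')]
```

A static string as the last entry guarantees the chain always returns something, which mirrors the static fallback shown earlier.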

Cached Fallback

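import hashlib
import pickle
from pathlib import Path

class CachedFallback:
    """Use cached results as fallback.

    Entries never expire, so a cached result can be arbitrarily stale.
    Loading a pickle can execute code, so keep cache_dir somewhere only this
    service can write to; a world-writable location such as /tmp is risky
    on shared hosts.
    """

    def __init__(self, team, cache_dir: str = "/tmp/mcp_cache"):
        self.team = team
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, task: str) -> str:
        """Generate a stable cache key from the task text."""
        return hashlib.sha256(task.encode()).hexdigest()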

    def _load_from_cache(self, cache_key: str):
        """Load result from cache."""
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        if cache_file.exists():
            with open(cache_file, 'rb') as f:
                return pickle.load(f)
        return None

    def _save_to_cache(self, cache_key: str, result):
        """Save result to cache."""
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        with open(cache_file, 'wb') as f:
            pickle.dump(result, f)

    def execute(self, task: str, use_cache: bool = True) -> Dict:
        """Execute with cached fallback."""
        cache_key = self._get_cache_key(task)

        try:
            result = self.team({
                "messages": [HumanMessage(content=task)]
            })

            # Save to cache
            if use_cache:
                self._save_to_cache(cache_key, result)

            return {
                "status": "success",
                "source": "live",
                "result": result
            }

        except Exception as e:
            logger.error(f"Live execution failed: {e}")

            if use_cache:
                # Try cache
                cached_result = self._load_from_cache(cache_key)
                if cached_result is not None:
                    logger.info("Using cached result")
                    return {
                        "status": "degraded",
                        "source": "cache",
                        "result": cached_result
                    }

            return {
                "status": "failed",
                "error": str(e)
            }

# Usage
cached_fallback = CachedFallback(team, cache_dir="/tmp/mcp_cache")
result = cached_fallback.execute("Analyze data", use_cache=True)
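
`CachedFallback` serves a cached answer no matter how old it is, and loading pickles is only safe from a directory nobody else can write to. If caching the final message text is enough, a JSON file with a timestamp avoids both issues. A minimal sketch; `TTLTextCache`, `max_age`, and the injectable `clock` are assumptions for illustration:

```python
import hashlib
import json
import tempfile
import time
from pathlib import Path
from typing import Callable, Optional


class TTLTextCache:
    """Cache plain-text results as JSON, ignoring entries older than max_age seconds."""

    def __init__(self, cache_dir: str, max_age: float = 3600.0,
                 clock: Callable[[], float] = time.time):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.max_age = max_age
        # Wall-clock time (not monotonic) so timestamps survive process restarts
        self.clock = clock

    def _path(self, task: str) -> Path:
        return self.cache_dir / f"{hashlib.sha256(task.encode()).hexdigest()}.json"

    def save(self, task: str, text: str) -> None:
        payload = {"saved_at": self.clock(), "text": text}
        self._path(task).write_text(json.dumps(payload), encoding="utf-8")

    def load(self, task: str) -> Optional[str]:
        path = self._path(task)
        if not path.exists():
            return None
        payload = json.loads(path.read_text(encoding="utf-8"))
        if self.clock() - payload["saved_at"] > self.max_age:
            return None  # too stale to serve
        return payload["text"]


now = [1000.0]
cache = TTLTextCache(tempfile.mkdtemp(), max_age=60, clock=lambda: now[0])
cache.save("Analyze data", "cached summary")
print(cache.load("Analyze data"))   # cached summary
now[0] += 61
print(cache.load("Analyze data"))   # None (expired)
```

In `CachedFallback.execute`, you would save `result["messages"][-1].content` on success and call `load` on failure.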

Timeout Management

Per-Operation Timeouts

import asyncio
from functools import wraps

def with_timeout(seconds: int):
    """Decorator for operation timeout."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=seconds
                )
            except asyncio.TimeoutError:
                logger.error(f"{func.__name__} timed out after {seconds}s")
                raise TimeoutError(f"Operation timed out after {seconds}s")
        return wrapper
    return decorator

# Usage with async
@with_timeout(30)
async def execute_async(team, task: str):
    """Execute with timeout."""
    return await team.ainvoke({
        "messages": [HumanMessage(content=task)]
    })

# Run
result = asyncio.run(execute_async(team, "Long task"))
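
When `asyncio.wait_for` times out, it cancels the wrapped coroutine and waits for the cancellation to finish, so cleanup in `finally` blocks runs before the `TimeoutError` reaches the caller. A self-contained demonstration with a stand-in coroutine (`slow_tool_call` is hypothetical):

```python
import asyncio

events = []

async def slow_tool_call():
    """Stand-in for a slow MCP tool call."""
    try:
        await asyncio.sleep(10)
        return "done"
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # always re-raise so the cancellation completes
    finally:
        events.append("cleanup")

async def main():
    try:
        await asyncio.wait_for(slow_tool_call(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timed out")

asyncio.run(main())
print(events)  # ['cancelled', 'cleanup', 'timed out']
```

Swallowing `CancelledError` inside a tool call would defeat the timeout, which is why the handler re-raises it.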

Adaptive Timeouts

import concurrent.futures
import statistics
import time
from collections import deque
from typing import Any, Callable

class AdaptiveTimeout:
    """Adaptive timeout based on historical performance."""

    def __init__(self, initial_timeout: int = 30, max_history: int = 100):
        self.timeout = initial_timeout
        # deque(maxlen=...) drops the oldest sample automatically
        self.execution_times = deque(maxlen=max_history)

    def get_timeout(self) -> int:
        """Get current timeout value."""
        if not self.execution_times:
            return self.timeout

        # Calculate adaptive timeout
        mean_time = statistics.mean(self.execution_times)
        std_dev = statistics.stdev(self.execution_times) if len(self.execution_times) > 1 else 0

        # Timeout = mean + 3 * std_dev (~99.7% of runs if durations were
        # normally distributed; real latencies are usually right-skewed)
        adaptive_timeout = int(mean_time + 3 * std_dev)

        # Bound timeout between 10s and 5min
        return max(10, min(adaptive_timeout, 300))

    def record_execution(self, elapsed_time: float):
        """Record execution time (oldest samples are evicted automatically)."""
        self.execution_times.append(elapsed_time)

    def execute(self, func: Callable) -> Any:
        """Execute func, giving up after the adaptive timeout."""
        timeout = self.get_timeout()
        logger.info(f"Using adaptive timeout: {timeout}s")

        start_time = time.monotonic()
        # Run func in a worker thread so we can stop waiting after `timeout`.
        # Python cannot kill a thread: a timed-out call keeps running in the
        # background until it returns on its own.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(func)

        try:
            result = future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            logger.error(f"Execution exceeded {timeout}s")
            # Don't record timeouts in history
            raise TimeoutError(f"Execution exceeded {timeout}s") from None
        finally:
            # Return immediately instead of waiting for a still-running worker
            executor.shutdown(wait=False)

        elapsed = time.monotonic() - start_time
        self.record_execution(elapsed)
        logger.info(f"Execution took {elapsed:.2f}s")
        return result

# Usage
adaptive = AdaptiveTimeout(initial_timeout=30)

for i in range(10):
    try:
        result = adaptive.execute(
            lambda: team({"messages": [HumanMessage(content=f"Task {i}")]})
        )
    except TimeoutError:
        logger.error(f"Task {i} timed out")
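
To make the `mean + 3 * std_dev` rule concrete, here is the same calculation pulled out as a pure function (the name `compute_adaptive_timeout` is illustrative), with the 10 s floor and 300 s ceiling used above. `statistics.stdev` is the sample standard deviation: for durations of 10, 12 and 14 seconds the mean is 12 and the standard deviation is 2, giving 12 + 3 × 2 = 18 seconds.

```python
import statistics
from typing import Sequence


def compute_adaptive_timeout(times: Sequence[float], initial: int = 30,
                             floor: int = 10, ceiling: int = 300) -> int:
    """mean + 3 * sample std dev, truncated to int and clamped to [floor, ceiling]."""
    if not times:
        return initial
    mean_time = statistics.mean(times)
    std_dev = statistics.stdev(times) if len(times) > 1 else 0
    return max(floor, min(int(mean_time + 3 * std_dev), ceiling))


print(compute_adaptive_timeout([10, 12, 14]))     # 18
print(compute_adaptive_timeout([1.0, 1.2, 0.8]))  # 10 (raised to the floor)
print(compute_adaptive_timeout([200, 250, 300]))  # 300 (capped: 250 + 3 * 50 = 400)
print(compute_adaptive_timeout([]))               # 30 (no history yet)
```

The floor matters for fast operations: without it, a run of sub-second calls would produce a timeout of 1 s, and a single slow call would then fail.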

Hierarchical Timeouts

class HierarchicalTimeout:
    """Manage nested timeouts for complex operations."""

    def __init__(self, total_timeout: int):
        self.total_timeout = total_timeout
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.time() - self.start_time
        logger.info(f"Total operation took {elapsed:.2f}s")

    def get_remaining_time(self) -> float:
        """Get remaining seconds; raise TimeoutError once the budget is spent."""
        if not self.start_time:
            return self.total_timeout

        elapsed = time.time() - self.start_time
        remaining = self.total_timeout - elapsed

        if remaining <= 0:
            raise TimeoutError("Total timeout exceeded")

        # Return a float: int() would turn 0.6s left into a 0s timeout
        return remaining

    def execute_step(self, func: Callable, step_name: str) -> Any:
        """Execute a step after checking that the overall budget isn't spent."""
        remaining = self.get_remaining_time()
        logger.info(f"Executing {step_name} with {remaining:.1f}s remaining")

        # This class only tracks the clock: func is not interrupted if it runs
        # past `remaining`. To enforce the budget, forward it to func (for
        # example as a client timeout); how depends on what func calls.
        return func()

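# Usage (collect_data, process_data and store_data are placeholders for your own steps)
with HierarchicalTimeout(total_timeout=120) as ht:
    # Step 1: Data collection (starts with the full 120s budget)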
    data = ht.execute_step(
        lambda: collect_data(team),
        "data_collection"
    )

    # Step 2: Processing (uses remaining time)
    processed = ht.execute_step(
        lambda: process_data(team, data),
        "processing"
    )

    # Step 3: Storage (uses remaining time)
    result = ht.execute_step(
        lambda: store_data(team, processed),
        "storage"
    )
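
`HierarchicalTimeout` only tracks the clock, so each step still has to be told its budget. A minimal sketch of a deadline that hands every step `min(step_cap, time_left)`, so a step can be capped at, say, 30 s without ever outliving the overall budget. `Deadline` and `budget` are hypothetical names, the clock is injectable for testing, and forwarding the budget as a client timeout is left to each step:

```python
import time
from typing import Callable, Optional


class Deadline:
    """Track one overall budget and derive per-step timeouts from it."""

    def __init__(self, total_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.clock = clock
        self.expires_at = clock() + total_seconds

    def remaining(self) -> float:
        return max(0.0, self.expires_at - self.clock())

    def budget(self, step_cap: Optional[float] = None) -> float:
        """Seconds a step may use: the time left, optionally capped per step."""
        left = self.remaining()
        if left == 0:
            raise TimeoutError("Total timeout exceeded")
        return left if step_cap is None else min(float(step_cap), left)


now = [0.0]
deadline = Deadline(120, clock=lambda: now[0])
print(deadline.budget(step_cap=30))   # 30.0 (the step cap is tighter)
now[0] += 100                         # earlier steps took 100 s
print(deadline.budget(step_cap=30))   # 20.0 (the overall budget is tighter)
```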

Logging and Monitoring

Structured Logging

import json
import logging
from datetime import datetime, timezone

class StructuredLogger:
    """Structured (JSON) logging for MCP operations."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Attach the JSON handler only once; a second StructuredLogger with
        # the same name would otherwise duplicate every line
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(self._json_formatter())
            self.logger.addHandler(handler)
        # Don't also pass records to root handlers (e.g. a basicConfig setup)
        self.logger.propagate = False

    def _json_formatter(self):
        """Create JSON formatter."""
        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    # The record's creation time, in UTC
                    "timestamp": datetime.fromtimestamp(
                        record.created, tz=timezone.utc
                    ).isoformat(),
                    "level": record.levelname,
                    "logger": record.name,
                    "message": record.getMessage(),
                    "module": record.module,
                    "function": record.funcName,
                    "line": record.lineno
                }

                if hasattr(record, "extra_data"):
                    log_data.update(record.extra_data)

                # default=str keeps non-JSON values loggable
                return json.dumps(log_data, default=str)

        return JSONFormatter()

    def log_execution(
        self,
        operation: str,
        status: str,
        duration: float = None,
        **kwargs
    ):
        """Log operation execution (failures at ERROR, everything else at INFO)."""
        extra = {
            "operation": operation,
            "status": status,
            "duration_ms": int(duration * 1000) if duration is not None else None,
            **kwargs
        }

        # `extra` sets record.extra_data; stacklevel=2 reports the caller's
        # module/function/line rather than this method's
        level = logging.ERROR if status == "failed" else logging.INFO
        self.logger.log(
            level,
            f"Operation: {operation}",
            extra={"extra_data": extra},
            stacklevel=2
        )

# Usage
struct_logger = StructuredLogger("mcp_team")

start_time = time.time()
try:
    result = team({"messages": [HumanMessage(content="Task")]})
    duration = time.time() - start_time

    struct_logger.log_execution(
        operation="team_execution",
        status="success",
        duration=duration,
        team_name="my_team",
        tool_count=len(team.get_tool_names())
    )

except Exception as e:
    duration = time.time() - start_time

    struct_logger.log_execution(
        operation="team_execution",
        status="failed",
        duration=duration,
        error_type=type(e).__name__,
        error_message=str(e)
    )
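
The standard `extra=` argument is the usual way to attach structured fields: each key becomes an attribute of the `LogRecord`, which a formatter can then serialize. A self-contained sketch that routes a record into an in-memory stream and parses it back, which is handy for asserting on log output in tests (`JsonLineFormatter` and the `fields` attribute name are illustrative choices):

```python
import io
import json
import logging


class JsonLineFormatter(logging.Formatter):
    """Emit one JSON object per record, merging record.fields if present."""

    def format(self, record: logging.LogRecord) -> str:
        data = {"level": record.levelname, "message": record.getMessage()}
        data.update(getattr(record, "fields", {}))
        return json.dumps(data, default=str)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonLineFormatter())

log = logging.getLogger("mcp_demo")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.propagate = False  # keep the demo output out of the root logger

log.error("Operation: team_execution",
          extra={"fields": {"status": "failed", "error_type": "ConnectionError"}})

record = json.loads(stream.getvalue().splitlines()[-1])
print(record["status"], record["error_type"])  # failed ConnectionError
```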

Metrics Collection

from collections import defaultdict
from typing import Any, Callable, Dict, Optional
import time

class MetricsCollector:
    """Collect execution metrics."""

    def __init__(self):
        self.metrics = defaultdict(lambda: {
            "count": 0,
            "success": 0,
            "failure": 0,
            "total_duration": 0.0,
            "errors": {}
        })

    def record_execution(
        self,
        operation: str,
        success: bool,
        duration: float,
        error_type: Optional[str] = None
    ):
        """Record execution metrics."""
        m = self.metrics[operation]
        m["count"] += 1

        if success:
            m["success"] += 1
        else:
            m["failure"] += 1
            if error_type:
                # Count failures per exception class (e.g. "ConnectionError")
                m["errors"][error_type] = m["errors"].get(error_type, 0) + 1

        m["total_duration"] += duration

    def get_metrics(self, operation: Optional[str] = None) -> Dict:
        """Get collected metrics."""
        if operation is None:
            return {op: self.get_metrics(op) for op in list(self.metrics)}

        # .get() avoids creating an empty entry for an unknown operation
        m = self.metrics.get(operation)
        if not m or m["count"] == 0:
            return {"count": 0, "success_rate": 0, "failure_rate": 0,
                    "avg_duration": 0, "errors": {}}
        return {
            "count": m["count"],
            "success_rate": m["success"] / m["count"],
            "failure_rate": m["failure"] / m["count"],
            "avg_duration": m["total_duration"] / m["count"],
            "errors": dict(m["errors"])
        }

    def execute_with_metrics(
        self,
        operation: str,
        func: Callable
    ) -> Any:
        """Execute function and collect metrics."""
        start_time = time.monotonic()

        try:
            result = func()
            duration = time.monotonic() - start_time
            self.record_execution(operation, True, duration)
            return result

        except Exception as e:
            duration = time.monotonic() - start_time
            # str(e) holds only the message, so record the exception class
            self.record_execution(operation, False, duration, type(e).__name__)
            raise

# Usage
metrics = MetricsCollector()

for i in range(100):
    try:
        result = metrics.execute_with_metrics(
            "team_execution",
            lambda: team({"messages": [HumanMessage(content=f"Task {i}")]})
        )
    except Exception as e:
        logger.error(f"Task {i} failed: {e}")

# Get metrics
execution_metrics = metrics.get_metrics("team_execution")
print(f"Success rate: {execution_metrics['success_rate']:.2%}")
print(f"Average duration: {execution_metrics['avg_duration']:.2f}s")
print(f"Errors: {execution_metrics['errors']}")
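
An average duration can hide a slow tail: the mean of 90 two-second calls and 10 sixty-second calls is 7.8 s, yet one caller in ten waited a full minute. A minimal sketch of a rolling latency window that reports percentiles with the standard library's `statistics.quantiles` (the `LatencyWindow` name and window size are assumptions):

```python
import statistics
from collections import deque
from typing import Dict, Optional


class LatencyWindow:
    """Keep the most recent durations and report percentiles over them."""

    def __init__(self, size: int = 1000):
        self.samples = deque(maxlen=size)

    def record(self, seconds: float) -> None:
        self.samples.append(seconds)

    def percentiles(self) -> Optional[Dict[str, float]]:
        if len(self.samples) < 2:
            return None  # quantiles need at least two points
        # n=100 gives 99 cut points: index 49 is p50, 94 is p95, 98 is p99
        cuts = statistics.quantiles(self.samples, n=100, method="inclusive")
        return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


window = LatencyWindow()
for _ in range(90):
    window.record(2.0)
for _ in range(10):
    window.record(60.0)

print(statistics.mean(window.samples))  # 7.8
print(window.percentiles())             # {'p50': 2.0, 'p95': 60.0, 'p99': 60.0}
```

Feeding `duration` from `execute_with_metrics` into such a window alongside `total_duration` would let alerts key on p95 rather than the mean.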

Health Checks

import time
from datetime import datetime, timezone

class HealthChecker:
    """Monitor system health."""

    def __init__(self, team):
        self.team = team
        self.last_check = None
        self.status = "unknown"

    def check_health(self) -> Dict:
        """Perform health check."""
        health_status = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "healthy",
            "checks": {}
        }

        # Check tool availability
        try:
            tools = self.team.get_mcp_tool_names()
            health_status["checks"]["tools"] = {
                "status": "pass",
                "count": len(tools)
            }
        except Exception as e:
            health_status["checks"]["tools"] = {
                "status": "fail",
                "error": str(e)
            }
            health_status["status"] = "unhealthy"

        # Check server connectivity
        try:
            server_count = self.team.get_mcp_server_count()
            health_status["checks"]["servers"] = {
                "status": "pass",
                "count": server_count
            }
        except Exception as e:
            health_status["checks"]["servers"] = {
                "status": "fail",
                "error": str(e)
            }
            health_status["status"] = "unhealthy"

        # End-to-end test: this makes a real LLM call, so every check costs
        # tokens and latency
        try:
            start = time.time()
            result = self.team({
                "messages": [HumanMessage(content="health check")]
            })
            duration = time.time() - start

            health_status["checks"]["execution"] = {
                "status": "pass",
                "duration_ms": int(duration * 1000)
            }
        except Exception as e:
            health_status["checks"]["execution"] = {
                "status": "fail",
                "error": str(e)
            }
            health_status["status"] = "unhealthy"

        self.last_check = health_status
        self.status = health_status["status"]

        return health_status

    def is_healthy(self) -> bool:
        """Return the status of the most recent check, running one only if none exists yet."""
        if not self.last_check:
            self.check_health()
        return self.status == "healthy"

# Usage
health_checker = HealthChecker(team)

# Periodic health checks
while True:
    health = health_checker.check_health()
    logger.info(f"Health check: {health['status']}")

    if not health_checker.is_healthy():
        logger.error("System unhealthy!")
        # Take corrective action

    time.sleep(60)  # Check every minute
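
The execution check above spends a real LLM call on every probe. A common split is a cheap liveness check (is the process up and is its tool list intact?) run often, with the expensive end-to-end check run rarely or on demand. A minimal sketch of a runner that executes named check callables and aggregates them in the same shape as `check_health()`; `run_checks` is a hypothetical helper, and the lambdas stand in for calls such as `team.get_mcp_tool_names()`:

```python
import time
from typing import Any, Callable, Dict


def run_checks(checks: Dict[str, Callable[[], Any]]) -> Dict[str, Any]:
    """Run each named check; any exception marks that check, and the report, as failed."""
    report: Dict[str, Any] = {"status": "healthy", "checks": {}}
    for name, check in checks.items():
        start = time.monotonic()
        try:
            detail = check()
            report["checks"][name] = {"status": "pass", "detail": detail}
        except Exception as e:
            report["checks"][name] = {"status": "fail", "error": str(e)}
            report["status"] = "unhealthy"
        report["checks"][name]["duration_ms"] = int((time.monotonic() - start) * 1000)
    return report


def broken_server_count():
    raise ConnectionError("search server unreachable")

liveness = run_checks({
    "tools": lambda: len(["read_file", "search"]),  # stand-in for the tool list
    "servers": broken_server_count,
})
print(liveness["status"])                      # unhealthy
print(liveness["checks"]["servers"]["error"])  # search server unreachable
```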

Production Patterns

Complete Production Setup

"""
production_mcp_team.py - Production-ready MCP team with comprehensive error handling.
"""

import logging
import os
import time
from typing import Dict, Any
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
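from langchain_core.messages import HumanMessage

# CircuitBreaker, MetricsCollector, HealthChecker and exponential_backoff_retry
# are the helpers defined earlier in this guide; import them from wherever
# you keep them in your project.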

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mcp_production.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class ProductionMCPTeam:
    """Production-ready MCP team with comprehensive error handling."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.team = None
        self.circuit_breaker = CircuitBreaker()
        self.metrics = MetricsCollector()
        self.health_checker = None
        self._initialize()

    def _initialize(self):
        """Initialize team with error handling."""
        try:
            self.team = self._build_team()
            self.health_checker = HealthChecker(self.team)
            logger.info("Team initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize team: {e}")
            raise

    def _build_team(self):
        """Build team with resilience features."""
        builder = (MCPTeamBuilder(self.config["name"])
            .with_llm(ChatOpenAI(
                model=self.config["llm_model"],
                temperature=self.config.get("temperature", 0.7)
            ))
        )

        # Add servers with error handling
        for server in self.config["servers"]:
            try:
                if "url" in server:
                    builder = builder.with_mcp_server_sse(
                        url=server["url"],
                        env=server.get("env", {}),
                        timeout=server.get("timeout", 30)
                    )
                else:
                    builder = builder.with_mcp_server(
                        command=server["command"],
                        args=server["args"],
                        env=server.get("env", {}),
                        timeout=server.get("timeout", 10)
                    )
                logger.info(f"Added server: {server.get('name', 'unnamed')}")
            except Exception as e:
                logger.error(f"Failed to add server {server.get('name')}: {e}")
                if not server.get("optional", False):
                    raise

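        # Enable graceful degradation. This flag is set for the whole builder,
        # so it may also skip servers marked "optional": False if they fail
        # to connect at build time; check the tool list after build() if a
        # server is truly required.
        builder = builder.skip_failed_servers(True)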

        return builder.build()

    def execute(self, task: str, max_retries: int = 3) -> Dict:
        """Execute task with comprehensive error handling."""
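        # If the last health check failed, re-check before executing; the
        # circuit breaker below still decides whether the call goes through
        if not self.health_checker.is_healthy():
            logger.warning("Last health check failed, re-checking...")
            if self.health_checker.check_health()["status"] != "healthy":
                logger.warning("System still unhealthy; attempting task anyway")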

        # Execute with retry and circuit breaker
        def execute_task():
            return self.team({
                "messages": [HumanMessage(content=task)]
            })

        try:
            result = self.metrics.execute_with_metrics(
                "task_execution",
                lambda: self.circuit_breaker.call(
                    lambda: exponential_backoff_retry(
                        execute_task,
                        max_retries=max_retries
                    )
                )
            )

            return {
                "status": "success",
                "result": result["messages"][-1].content,
                "metadata": {
                    "circuit_state": self.circuit_breaker.get_state().value,
                    "tools_available": len(self.team.get_tool_names())
                }
            }

        except Exception as e:
            logger.error(f"Task execution failed: {e}")
            return {
                "status": "failed",
                "error": str(e),
                "error_type": type(e).__name__,
                "metadata": {
                    "circuit_state": self.circuit_breaker.get_state().value
                }
            }

    def get_status(self) -> Dict:
        """Get system status."""
        health = self.health_checker.check_health()
        metrics = self.metrics.get_metrics()

        return {
            "health": health,
            "metrics": metrics,
            "circuit_breaker": self.circuit_breaker.get_state().value,
            "tools": self.team.get_mcp_tool_names() if self.team else []
        }

# Configuration
config = {
    "name": "production_team",
    "llm_model": "gpt-4o-mini",
    "temperature": 0.7,
    "servers": [
        {
            "name": "filesystem",
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
            "optional": False
        },
        {
            "name": "github",
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-github"],
            "env": {"GITHUB_PERSONAL_ACCESS_TOKEN": os.getenv("GITHUB_TOKEN")},
            "optional": True
        },
        {
            "name": "search_api",
            "url": "https://search.example.com/sse",
            "env": {"API_KEY": os.getenv("SEARCH_API_KEY")},
            "timeout": 30,
            "optional": True
        }
    ]
}

# Usage
prod_team = ProductionMCPTeam(config)

# Execute task
result = prod_team.execute("Process production data")
print(f"Result: {result}")

# Get status
status = prod_team.get_status()
print(f"System status: {status}")
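
Problems in `config` otherwise surface only when the builder runs: a server with neither `url` nor `command`, or an environment variable that `os.getenv` returned as `None` because it was never set. A minimal pre-flight check, assuming the config shape used above (`validate_config` is a hypothetical helper):

```python
from typing import Any, Dict, List


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with a ProductionMCPTeam-style config."""
    problems: List[str] = []
    for key in ("name", "llm_model", "servers"):
        if key not in config:
            problems.append(f"missing top-level key '{key}'")
    for i, server in enumerate(config.get("servers", [])):
        label = server.get("name", f"servers[{i}]")
        if "url" not in server and not ("command" in server and "args" in server):
            problems.append(f"{label}: needs either 'url' or 'command' + 'args'")
        for var, value in server.get("env", {}).items():
            if value is None:  # os.getenv() found nothing
                kind = "optional" if server.get("optional", False) else "required"
                problems.append(f"{label}: env var {var} is unset ({kind} server)")
    return problems


example = {
    "name": "production_team",
    "llm_model": "gpt-4o-mini",
    "servers": [
        {"name": "filesystem", "command": "npx",
         "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"]},
        {"name": "search_api", "url": "https://search.example.com/sse",
         "env": {"API_KEY": None}, "optional": True},
        {"name": "broken"},
    ],
}
for problem in validate_config(example):
    print(problem)
# search_api: env var API_KEY is unset (optional server)
# broken: needs either 'url' or 'command' + 'args'
```

Running this before `ProductionMCPTeam(config)`, and refusing to start when a required server has problems, keeps a misconfiguration from looking like a runtime outage.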

Testing Error Scenarios

Error Simulation

import time
import pytest
from unittest.mock import Mock, patch

class ErrorSimulator:
    """Simulate various error scenarios for testing."""

    @staticmethod
    def simulate_connection_error():
        """Simulate connection error."""
        raise ConnectionError("Server unreachable")

    @staticmethod
    def simulate_timeout():
        """Simulate timeout."""
        time.sleep(100)  # Exceeds timeout

    @staticmethod
    def simulate_auth_failure():
        """Simulate authentication failure."""
        raise PermissionError("Invalid credentials")

    @staticmethod
    def simulate_intermittent_failure(failure_rate: float = 0.3):
        """Simulate intermittent failures."""
        import random
        if random.random() < failure_rate:
            raise RuntimeError("Intermittent failure")
        return {"success": True}

# Test error handling. Patching __call__ on an instance has no effect, because
# Python looks up special methods on the type, so pass a Mock as the team.
def test_connection_error_handling():
    """Test handling of connection errors."""
    failing_team = Mock(side_effect=ConnectionError("Test"))
    result = execute_with_basic_error_handling(failing_team, "Task")
    assert "error" in result
    assert result["error"] == "Server unreachable"

def test_timeout_handling():
    """Test handling of timeouts."""
    failing_team = Mock(side_effect=TimeoutError("Test"))
    result = execute_with_basic_error_handling(failing_team, "Task")
    assert "error" in result
    assert result["error"] == "Operation timed out"

def test_retry_mechanism():
    """Test retry mechanism."""
    mock_func = Mock(side_effect=[
        RuntimeError("Attempt 1"),
        RuntimeError("Attempt 2"),
        {"success": True}
    ])

    # Skip the real backoff delays (assumes the helper calls time.sleep)
    with patch("time.sleep"):
        result = exponential_backoff_retry(mock_func, max_retries=3)
    assert result["success"]
    assert mock_func.call_count == 3

def test_circuit_breaker():
    """Test circuit breaker."""
    breaker = CircuitBreaker(failure_threshold=3)

    # Cause failures to trip the circuit
    failing = Mock(side_effect=RuntimeError("Test"))
    for _ in range(3):
        with pytest.raises(RuntimeError):
            breaker.call(failing)

    # Circuit should be open
    assert breaker.get_state() == CircuitState.OPEN

    # Should reject calls
    with pytest.raises(Exception, match="Circuit breaker is OPEN"):
        breaker.call(lambda: None)
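
`simulate_intermittent_failure` draws from the global `random` module, so a test built on it can pass or fail from run to run. Seeding a private `random.Random` instance keeps the failure pattern reproducible. A minimal sketch; `make_flaky` and `outcomes` are hypothetical helpers:

```python
import random
from typing import Any, Callable


def make_flaky(failure_rate: float, seed: int,
               result: Any = None) -> Callable[[], Any]:
    """Return a callable that fails at roughly failure_rate, reproducibly for a given seed."""
    rng = random.Random(seed)  # private generator: unaffected by other random users
    value = {"success": True} if result is None else result

    def call() -> Any:
        if rng.random() < failure_rate:
            raise RuntimeError("Intermittent failure")
        return value

    return call


def outcomes(func: Callable[[], Any], n: int) -> str:
    """Summarize n calls as a string of '.' (success) and 'F' (failure)."""
    marks = []
    for _ in range(n):
        try:
            func()
            marks.append(".")
        except RuntimeError:
            marks.append("F")
    return "".join(marks)


first = outcomes(make_flaky(0.3, seed=42), 20)
second = outcomes(make_flaky(0.3, seed=42), 20)
print(first == second)  # True: same seed, same failure pattern
```

A flaky double like this pairs well with the retry and circuit breaker tests above, since the exact sequence of failures is fixed for a given seed.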

Summary

Comprehensive error handling for MCP integrations requires:

  1. Error Detection: Identify and classify different error types
  2. Graceful Degradation: Continue operating with reduced functionality
  3. Retry Strategies: Exponential backoff, jittered retry, adaptive retry
  4. Circuit Breakers: Prevent cascading failures
  5. Fallback Mechanisms: Alternative execution paths
  6. Timeout Management: Prevent hanging operations
  7. Monitoring: Structured logging, metrics, health checks
  8. Testing: Simulate and test error scenarios

Key Takeaways:

  • Always handle errors at multiple levels (connection, execution, tool)
  • Use graceful degradation to maintain core functionality
  • Implement retry strategies with backoff for transient failures
  • Use circuit breakers to protect against cascading failures
  • Provide fallback mechanisms for critical operations
  • Set appropriate timeouts at all levels
  • Monitor and log all errors for debugging
  • Test error scenarios thoroughly before production

Crafted by Team AzrienLabs