Complete guide to building resilient MCP-enabled agents with comprehensive error handling, fault tolerance, and graceful degradation strategies.
Overview
MCP integrations involve multiple failure points: network issues, server crashes, authentication failures, and tool execution errors. Robust error handling is essential for production systems.
Why Error Handling Matters
# ❌ Without error handling - Fragile
team = (MCPTeamBuilder("fragile_team")
.with_llm(llm)
.with_mcp_server_sse("https://api.example.com/sse")
.build()
)
result = team({"messages": [HumanMessage(content="Task")]})
# Fails completely if server is down
# ✅ With error handling - Resilient
team = (MCPTeamBuilder("resilient_team")
.with_llm(llm)
.with_mcp_server_sse(
"https://api.example.com/sse",
timeout=30
)
.skip_failed_servers(True) # Graceful degradation
.build()
)
try:
result = team({"messages": [HumanMessage(content="Task")]})
except Exception as e:
logger.error(f"Task failed: {e}")
# Fallback to alternative approach
result = fallback_execution()
Common Failure Scenarios
- Connection Failures: Server unreachable, network issues
- Authentication Errors: Invalid credentials, expired tokens
- Timeout Errors: Slow operations, unresponsive servers
- Tool Execution Errors: Invalid inputs, runtime exceptions
- Resource Exhaustion: Memory limits, connection limits
- Protocol Errors: Invalid MCP messages, version mismatches
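To keep log messages and metrics consistent, these scenario categories can be mapped from exception types with a small lookup table. The sketch below is illustrative only; the category labels and the classify_failure helper are not part of any MCP or azcore API.
# Hypothetical mapping from exception types to the failure categories above.
FAILURE_CATEGORIES = {
    ConnectionError: "connection_failure",
    PermissionError: "authentication_error",
    TimeoutError: "timeout",
    ValueError: "tool_execution_error",
    RuntimeError: "tool_execution_error",
    MemoryError: "resource_exhaustion",
    OSError: "resource_exhaustion",
}

def classify_failure(exc: Exception) -> str:
    """Map an exception to a coarse failure category for logging and metrics."""
    # Order matters: the more specific OSError subclasses are listed first.
    for exc_type, category in FAILURE_CATEGORIES.items():
        if isinstance(exc, exc_type):
            return category
    return "protocol_or_unknown_error"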
Error Types
1. Connection Errors
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
import logging
logger = logging.getLogger(__name__)
try:
team = (MCPTeamBuilder("connection_test")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server_sse(
url="https://unreachable-server.example.com/sse",
timeout=10
)
.build()
)
except ConnectionError as e:
logger.error(f"Connection failed: {e}")
# Server unreachable
except TimeoutError as e:
logger.error(f"Connection timeout: {e}")
# Server not responding
except Exception as e:
logger.error(f"Unexpected connection error: {e}")
2. Authentication Errors
import os
try:
team = (MCPTeamBuilder("auth_test")
.with_llm(llm)
.with_mcp_server(
"npx",
["-y", "@modelcontextprotocol/server-github"],
env={"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")}
)
.build()
)
except PermissionError as e:
logger.error(f"Authentication failed: {e}")
# Invalid or missing credentials
except ValueError as e:
logger.error(f"Invalid credentials format: {e}")
# Malformed token or credentials
3. Tool Execution Errors
from langchain_core.messages import HumanMessage
team = (MCPTeamBuilder("tool_test")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.build()
)
try:
result = team({
"messages": [HumanMessage(content="Execute task")]
})
except ValueError as e:
logger.error(f"Invalid input: {e}")
# Tool received invalid arguments
except RuntimeError as e:
logger.error(f"Tool execution failed: {e}")
# Tool encountered runtime error
except Exception as e:
logger.error(f"Unexpected tool error: {e}")
4. Timeout Errors
import asyncio
try:
result = team({
"messages": [HumanMessage(content="Long running task")]
})
except asyncio.TimeoutError as e:
    # Note: on Python 3.11+, asyncio.TimeoutError is an alias of the built-in
    # TimeoutError, so this branch and the next collapse into one.
logger.error(f"Operation timeout: {e}")
# Operation took too long
except TimeoutError as e:
logger.error(f"Server timeout: {e}")
# Server didn't respond in time
5. Resource Errors
try:
team = (MCPTeamBuilder("resource_test")
.with_llm(llm)
.with_mcp_server("python", ["memory_intensive_server.py"])
.build()
)
except MemoryError as e:
logger.error(f"Out of memory: {e}")
# Insufficient memory for operation
except OSError as e:
logger.error(f"Resource error: {e}")
# File descriptor limit, disk space, etc.
Basic Error Handling
Try-Catch Pattern
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def execute_with_basic_error_handling(team, task: str):
"""Execute task with basic error handling."""
try:
result = team({
"messages": [HumanMessage(content=task)]
})
logger.info("Task completed successfully")
return result
except ConnectionError as e:
logger.error(f"Connection error: {e}")
return {"error": "Server unreachable", "details": str(e)}
except TimeoutError as e:
logger.error(f"Timeout error: {e}")
return {"error": "Operation timed out", "details": str(e)}
except PermissionError as e:
logger.error(f"Permission error: {e}")
return {"error": "Authentication failed", "details": str(e)}
except ValueError as e:
logger.error(f"Value error: {e}")
return {"error": "Invalid input", "details": str(e)}
except Exception as e:
logger.exception(f"Unexpected error: {e}")
return {"error": "Unexpected error", "details": str(e)}
# Usage
team = (MCPTeamBuilder("basic_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server("python", ["server.py"])
.build()
)
result = execute_with_basic_error_handling(team, "Process data")
if "error" in result:
print(f"Task failed: {result['error']}")
else:
print(f"Task succeeded: {result['messages'][-1].content}")
Error Context Managers
from contextlib import contextmanager
from typing import Generator
import time
@contextmanager
def error_handler(
operation: str,
logger: logging.Logger = None
) -> Generator[None, None, None]:
"""Context manager for error handling with timing."""
if logger is None:
logger = logging.getLogger(__name__)
start_time = time.time()
logger.info(f"Starting operation: {operation}")
try:
yield
except ConnectionError as e:
logger.error(f"Connection failed in {operation}: {e}")
raise
except TimeoutError as e:
logger.error(f"Timeout in {operation}: {e}")
raise
except Exception as e:
logger.exception(f"Error in {operation}: {e}")
raise
finally:
elapsed = time.time() - start_time
logger.info(f"Operation {operation} took {elapsed:.2f}s")
# Usage
with error_handler("team_execution"):
result = team({"messages": [HumanMessage(content="Task")]})
Defensive Team Building
def build_resilient_team(
name: str,
llm,
server_configs: list,
max_retries: int = 3
):
"""Build team with defensive error handling."""
builder = MCPTeamBuilder(name).with_llm(llm)
# Add servers with error handling
for config in server_configs:
try:
if "url" in config:
# SSE server
builder = builder.with_mcp_server_sse(
url=config["url"],
env=config.get("env", {}),
timeout=config.get("timeout", 30)
)
else:
# STDIO server
builder = builder.with_mcp_server(
command=config["command"],
args=config["args"],
env=config.get("env", {}),
timeout=config.get("timeout", 10)
)
logger.info(f"Added server: {config.get('name', 'unnamed')}")
except Exception as e:
logger.error(f"Failed to add server {config.get('name')}: {e}")
if not config.get("optional", False):
raise # Re-raise if server is required
# Enable graceful degradation
builder = builder.skip_failed_servers(True)
# Build with error handling
try:
team = builder.build()
logger.info(f"Team {name} built successfully")
return team
except Exception as e:
logger.error(f"Failed to build team {name}: {e}")
raise
# Usage
server_configs = [
{
"name": "filesystem",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
"optional": False # Required
},
{
"name": "github",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-github"],
"env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")},
"optional": True # Optional
},
{
"name": "search_api",
"url": "https://search.example.com/sse",
"env": {"API_KEY": os.getenv("SEARCH_API_KEY")},
"timeout": 45,
"optional": True # Optional
}
]
team = build_resilient_team("resilient_team", llm, server_configs)
Connection Errors
Pre-Connection Testing
import requests
from urllib.parse import urlparse
def test_server_connectivity(url: str, timeout: int = 5) -> bool:
"""Test if server is reachable before connecting."""
try:
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
        # Probe the server (assumes it exposes a /health endpoint)
response = requests.get(
f"{base_url}/health",
timeout=timeout
)
return response.status_code == 200
except requests.exceptions.ConnectionError:
logger.warning(f"Server {url} unreachable")
return False
except requests.exceptions.Timeout:
logger.warning(f"Server {url} timeout")
return False
except Exception as e:
logger.error(f"Connectivity test failed: {e}")
return False
# Build team with connectivity check
def build_team_with_connection_test(server_url: str):
"""Build team only if server is reachable."""
if not test_server_connectivity(server_url):
logger.error(f"Server {server_url} not available")
# Use fallback configuration
return build_fallback_team()
return (MCPTeamBuilder("tested_team")
.with_llm(llm)
.with_mcp_server_sse(url=server_url, timeout=30)
.build()
)
Connection Retry
import time
from typing import Optional
def connect_with_retry(
builder: MCPTeamBuilder,
max_attempts: int = 3,
backoff_factor: float = 2.0
) -> Optional[object]:
"""Build team with connection retry."""
for attempt in range(max_attempts):
try:
team = builder.build()
logger.info(f"Connected successfully on attempt {attempt + 1}")
return team
except ConnectionError as e:
if attempt < max_attempts - 1:
wait_time = backoff_factor ** attempt
logger.warning(
f"Connection failed (attempt {attempt + 1}/{max_attempts}). "
f"Retrying in {wait_time}s..."
)
time.sleep(wait_time)
else:
logger.error(f"Failed to connect after {max_attempts} attempts")
raise
except Exception as e:
logger.error(f"Unexpected error during connection: {e}")
raise
return None
# Usage
builder = (MCPTeamBuilder("retry_team")
.with_llm(llm)
.with_mcp_server_sse("https://unstable-api.example.com/sse")
)
team = connect_with_retry(builder, max_attempts=3)
if team:
result = team({"messages": [HumanMessage(content="Task")]})
Connection Pooling
class ConnectionPool:
"""Manage pool of MCP team connections."""
def __init__(self, size: int = 5):
self.size = size
self.teams = []
self.available = []
self.in_use = []
def initialize(self, builder_factory):
"""Initialize connection pool."""
for i in range(self.size):
try:
team = builder_factory(f"pool_team_{i}")
self.teams.append(team)
self.available.append(team)
logger.info(f"Initialized team {i+1}/{self.size}")
except Exception as e:
logger.error(f"Failed to initialize team {i+1}: {e}")
def acquire(self, timeout: int = 30):
"""Acquire team from pool."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.available:
team = self.available.pop(0)
self.in_use.append(team)
return team
time.sleep(0.1)
raise TimeoutError("No teams available in pool")
def release(self, team):
"""Release team back to pool."""
if team in self.in_use:
self.in_use.remove(team)
self.available.append(team)
def close_all(self):
"""Close all connections."""
for team in self.teams:
try:
team.cleanup()
except Exception as e:
logger.error(f"Error closing team: {e}")
# Usage
def builder_factory(name: str):
return (MCPTeamBuilder(name)
.with_llm(llm)
.with_mcp_server_sse("https://api.example.com/sse")
.build()
)
pool = ConnectionPool(size=5)
pool.initialize(builder_factory)
# Acquire and use
team = pool.acquire()
try:
result = team({"messages": [HumanMessage(content="Task")]})
finally:
pool.release(team)
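Note that the list-based pool above polls for availability and is not safe to share across threads. If multiple workers acquire teams concurrently, a queue-backed variant is one option; this is a sketch under the same assumptions (a builder_factory returning built teams), not an azcore feature.
import queue

class ThreadSafeConnectionPool:
    """Blocking, thread-safe pool of MCP teams backed by queue.Queue."""
    def __init__(self, size: int = 5):
        self.size = size
        self._pool = queue.Queue(maxsize=size)
    def initialize(self, builder_factory):
        """Build teams up front; failures are logged and shrink the pool."""
        for i in range(self.size):
            try:
                self._pool.put_nowait(builder_factory(f"pool_team_{i}"))
            except Exception as e:
                logger.error(f"Failed to initialize team {i + 1}: {e}")
    def acquire(self, timeout: int = 30):
        """Block until a team is available or raise TimeoutError."""
        try:
            return self._pool.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("No teams available in pool")
    def release(self, team):
        """Return a team to the pool."""
        self._pool.put(team)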
Tool Execution Errors
Input Validation
from typing import Any, Dict
import re
class InputValidator:
"""Validate tool inputs before execution."""
@staticmethod
def validate_string(
value: Any,
min_length: int = 0,
max_length: int = 10000,
pattern: str = None
) -> str:
"""Validate string input."""
if not isinstance(value, str):
raise ValueError(f"Expected string, got {type(value).__name__}")
if len(value) < min_length:
raise ValueError(f"String too short (min {min_length})")
if len(value) > max_length:
raise ValueError(f"String too long (max {max_length})")
if pattern and not re.match(pattern, value):
raise ValueError(f"String doesn't match pattern: {pattern}")
return value
@staticmethod
def validate_number(
value: Any,
min_value: float = None,
max_value: float = None
) -> float:
"""Validate numeric input."""
try:
num = float(value)
except (ValueError, TypeError):
raise ValueError(f"Expected number, got {type(value).__name__}")
if min_value is not None and num < min_value:
raise ValueError(f"Number too small (min {min_value})")
if max_value is not None and num > max_value:
raise ValueError(f"Number too large (max {max_value})")
return num
@staticmethod
def validate_dict(
value: Any,
required_keys: list = None,
allowed_keys: list = None
) -> Dict:
"""Validate dictionary input."""
if not isinstance(value, dict):
raise ValueError(f"Expected dict, got {type(value).__name__}")
if required_keys:
missing = set(required_keys) - set(value.keys())
if missing:
raise ValueError(f"Missing required keys: {missing}")
if allowed_keys:
invalid = set(value.keys()) - set(allowed_keys)
if invalid:
raise ValueError(f"Invalid keys: {invalid}")
return value
# Usage in tool execution
validator = InputValidator()
def execute_with_validation(team, task: str, params: Dict):
"""Execute with input validation."""
try:
# Validate inputs
task = validator.validate_string(
task,
min_length=1,
max_length=5000
)
params = validator.validate_dict(
params,
required_keys=["action"],
allowed_keys=["action", "target", "options"]
)
# Execute task
result = team({
"messages": [HumanMessage(content=task)]
})
return result
except ValueError as e:
logger.error(f"Validation failed: {e}")
return {"error": "Invalid input", "details": str(e)}
Safe Tool Execution
import signal
from contextlib import contextmanager
@contextmanager
def timeout_handler(seconds: int):
"""Context manager for execution timeout."""
def timeout_signal_handler(signum, frame):
raise TimeoutError(f"Execution exceeded {seconds} seconds")
# Set signal handler
old_handler = signal.signal(signal.SIGALRM, timeout_signal_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
def safe_execute(
team,
task: str,
timeout: int = 60,
max_retries: int = 3
):
"""Execute with timeout and retry."""
for attempt in range(max_retries):
try:
with timeout_handler(timeout):
result = team({
"messages": [HumanMessage(content=task)]
})
return result
except TimeoutError as e:
logger.warning(f"Attempt {attempt + 1} timed out: {e}")
if attempt < max_retries - 1:
continue
raise
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise
return None
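signal.SIGALRM is Unix-only and must run in the main thread, so timeout_handler will not work on Windows or inside worker threads. A portable alternative is to bound how long the caller waits by running the call in a worker thread via concurrent.futures; note that the underlying call keeps running in the background after the timeout fires, so this sketch limits waiting, it does not cancel work.
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def safe_execute_threaded(team, task: str, timeout: int = 60):
    """Bound how long the caller waits, without relying on SIGALRM."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(team, {"messages": [HumanMessage(content=task)]})
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        raise TimeoutError(f"Execution exceeded {timeout} seconds")
    finally:
        # Don't block on a stuck call; the worker thread is abandoned, not killed.
        executor.shutdown(wait=False)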
Error Recovery
class ErrorRecovery:
"""Automatic error recovery strategies."""
def __init__(self, team):
self.team = team
self.error_history = []
def execute_with_recovery(self, task: str) -> Dict:
"""Execute with automatic error recovery."""
try:
result = self.team({
"messages": [HumanMessage(content=task)]
})
return {"status": "success", "result": result}
except ValueError as e:
# Input validation error - try to fix
logger.warning(f"Input error: {e}. Attempting to fix...")
fixed_task = self._fix_input_error(task, e)
if fixed_task:
return self.execute_with_recovery(fixed_task)
return {"status": "error", "type": "input_error", "details": str(e)}
except TimeoutError as e:
# Timeout - break into smaller tasks
logger.warning(f"Timeout: {e}. Breaking into smaller tasks...")
return self._break_into_subtasks(task)
except RuntimeError as e:
# Runtime error - try alternative approach
logger.warning(f"Runtime error: {e}. Trying alternative...")
return self._try_alternative_approach(task)
except Exception as e:
# Unknown error - log and fail
logger.error(f"Unrecoverable error: {e}")
self.error_history.append({
"task": task,
"error": str(e),
"timestamp": time.time()
})
return {"status": "error", "type": "unknown", "details": str(e)}
def _fix_input_error(self, task: str, error: Exception) -> str:
"""Attempt to fix input error."""
# Implement input fixing logic
# For example: clean special characters, truncate length, etc.
return None
def _break_into_subtasks(self, task: str) -> Dict:
"""Break task into smaller subtasks."""
# Implement task breakdown logic
return {"status": "partial", "message": "Task broken into subtasks"}
def _try_alternative_approach(self, task: str) -> Dict:
"""Try alternative execution approach."""
# Implement alternative strategies
return {"status": "alternative", "message": "Used alternative approach"}
# Usage
recovery = ErrorRecovery(team)
result = recovery.execute_with_recovery("Complex task")
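The three recovery hooks above are intentionally left as stubs. Purely as an illustration, an input-fixing hook might do conservative cleanup such as stripping control characters and truncating overly long input; anything smarter (schema repair, LLM-based rewriting) is application-specific. The helper name below is hypothetical.
import re

def fix_input_error_example(task: str, error: Exception, max_length: int = 5000):
    """Illustrative cleanup: strip control characters and truncate long input."""
    cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", task).strip()
    cleaned = cleaned[:max_length]
    # Only report a "fixed" task if something actually changed; otherwise give up.
    return cleaned if cleaned and cleaned != task else None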
Graceful Degradation
Skip Failed Servers
# Enable graceful degradation during build
resilient_team = (MCPTeamBuilder("resilient_team")
.with_llm(llm)
# Critical server (must succeed)
.with_mcp_server(
"npx",
["-y", "@modelcontextprotocol/server-filesystem", "/data"]
)
# Optional servers (can fail)
.with_mcp_server_sse(
url="https://optional-api-1.example.com/sse",
env={"API_KEY": os.getenv("API_KEY_1")},
timeout=30
)
.with_mcp_server_sse(
url="https://optional-api-2.example.com/sse",
env={"API_KEY": os.getenv("API_KEY_2")},
timeout=30
)
# Enable graceful degradation
.skip_failed_servers(True)
.with_prompt("""You are a resilient assistant.
Available capabilities depend on which servers are online:
- Filesystem (always available)
- Optional API 1 (may be unavailable)
- Optional API 2 (may be unavailable)
If optional services are unavailable, work with available tools only.
Inform the user about limited capabilities when needed.
""")
.build()
)
# Check which tools are available
available_tools = resilient_team.get_mcp_tool_names()
logger.info(f"Available tools: {available_tools}")
# Execute with awareness of available capabilities
result = resilient_team({
"messages": [HumanMessage(content="Process data using available tools")]
})
Capability Detection
class CapabilityAwareTeam:
"""Team that adapts to available capabilities."""
def __init__(self, builder: MCPTeamBuilder):
self.team = builder.skip_failed_servers(True).build()
self.capabilities = self._detect_capabilities()
def _detect_capabilities(self) -> Dict[str, bool]:
"""Detect available capabilities."""
available_tools = self.team.get_mcp_tool_names()
return {
"filesystem": any("file" in tool for tool in available_tools),
"database": any("db" in tool or "sql" in tool for tool in available_tools),
"github": any("github" in tool for tool in available_tools),
"search": any("search" in tool for tool in available_tools),
}
def execute_adaptive(self, task: str) -> Dict:
"""Execute task adapting to available capabilities."""
# Check if required capabilities are available
required = self._required_capabilities(task)
available = all(self.capabilities.get(cap, False) for cap in required)
if not available:
missing = [c for c in required if not self.capabilities.get(c, False)]
logger.warning(f"Missing capabilities: {missing}")
return self._execute_with_fallback(task, missing)
# Execute normally
result = self.team({
"messages": [HumanMessage(content=task)]
})
return {"status": "success", "result": result}
def _required_capabilities(self, task: str) -> list:
"""Determine required capabilities from task."""
required = []
task_lower = task.lower()
if any(word in task_lower for word in ["file", "read", "write"]):
required.append("filesystem")
if any(word in task_lower for word in ["database", "query", "sql"]):
required.append("database")
if any(word in task_lower for word in ["github", "repo", "issue"]):
required.append("github")
if any(word in task_lower for word in ["search", "find", "look up"]):
required.append("search")
return required
def _execute_with_fallback(self, task: str, missing: list) -> Dict:
"""Execute with fallback for missing capabilities."""
logger.info(f"Using fallback for missing: {missing}")
# Modify task to work with available capabilities
modified_task = self._modify_task_for_fallback(task, missing)
result = self.team({
"messages": [HumanMessage(content=modified_task)]
})
return {
"status": "degraded",
"missing_capabilities": missing,
"result": result
}
def _modify_task_for_fallback(self, task: str, missing: list) -> str:
"""Modify task to work without missing capabilities."""
modifications = {
"database": "Note: Database unavailable. Use filesystem for data.",
"github": "Note: GitHub unavailable. Save results locally.",
"search": "Note: Search unavailable. Use cached information."
}
prefix = "\n".join(modifications[cap] for cap in missing if cap in modifications)
return f"{prefix}\n\n{task}"
# Usage
builder = (MCPTeamBuilder("adaptive_team")
.with_llm(llm)
.with_mcp_server("npx", ["-y", "@modelcontextprotocol/server-filesystem", "/data"])
.with_mcp_server_sse("https://api.example.com/sse") # May fail
)
adaptive_team = CapabilityAwareTeam(builder)
result = adaptive_team.execute_adaptive("Search and save results to file")
Fallback Chains
class FallbackChain:
"""Execute with chain of fallback options."""
def __init__(self, teams: list, names: list = None):
self.teams = teams
self.names = names or [f"team_{i}" for i in range(len(teams))]
def execute(self, task: str) -> Dict:
"""Execute with fallback chain."""
for i, (team, name) in enumerate(zip(self.teams, self.names)):
try:
logger.info(f"Trying {name} (option {i+1}/{len(self.teams)})")
result = team({
"messages": [HumanMessage(content=task)]
})
logger.info(f"Success with {name}")
return {
"status": "success",
"team": name,
"result": result
}
except Exception as e:
logger.warning(f"{name} failed: {e}")
if i < len(self.teams) - 1:
logger.info(f"Trying next option...")
continue
else:
logger.error(f"All {len(self.teams)} options failed")
return {
"status": "failed",
"error": "All fallback options exhausted",
"last_error": str(e)
}
return {"status": "failed", "error": "No teams available"}
# Build fallback chain
primary_team = (MCPTeamBuilder("primary")
.with_llm(llm)
.with_mcp_server_sse("https://primary-api.example.com/sse")
.build()
)
secondary_team = (MCPTeamBuilder("secondary")
.with_llm(llm)
.with_mcp_server_sse("https://backup-api.example.com/sse")
.build()
)
tertiary_team = (MCPTeamBuilder("tertiary")
.with_llm(llm)
.with_mcp_server("python", ["local_server.py"])
.build()
)
fallback = FallbackChain(
teams=[primary_team, secondary_team, tertiary_team],
names=["Primary API", "Backup API", "Local Server"]
)
result = fallback.execute("Process data")
Retry Strategies
Exponential Backoff
import time
from typing import Callable, Any
def exponential_backoff_retry(
func: Callable,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0
) -> Any:
"""Execute function with exponential backoff retry."""
for attempt in range(max_retries):
try:
result = func()
if attempt > 0:
logger.info(f"Succeeded on attempt {attempt + 1}")
return result
except Exception as e:
if attempt < max_retries - 1:
# Calculate delay with exponential backoff
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
logger.warning(
f"Attempt {attempt + 1}/{max_retries} failed: {e}. "
f"Retrying in {delay:.1f}s..."
)
time.sleep(delay)
else:
logger.error(f"All {max_retries} attempts failed")
raise
# Usage
def execute_task():
return team({"messages": [HumanMessage(content="Task")]})
result = exponential_backoff_retry(
execute_task,
max_retries=5,
base_delay=1.0,
max_delay=60.0
)
Jittered Backoff
import random
def jittered_backoff_retry(
func: Callable,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
) -> Any:
"""Execute with jittered exponential backoff."""
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt < max_retries - 1:
# Calculate delay with jitter
exponential_delay = base_delay * (2 ** attempt)
jitter = random.uniform(0, exponential_delay * 0.1)
delay = min(exponential_delay + jitter, max_delay)
logger.warning(
f"Attempt {attempt + 1} failed. "
f"Retrying in {delay:.2f}s..."
)
time.sleep(delay)
else:
raise
# Usage
result = jittered_backoff_retry(
lambda: team({"messages": [HumanMessage(content="Task")]}),
max_retries=5
)
Adaptive Retry
class AdaptiveRetry:
"""Adaptive retry strategy based on error type."""
def __init__(self):
self.retry_config = {
ConnectionError: {"retries": 5, "delay": 2.0},
TimeoutError: {"retries": 3, "delay": 5.0},
ValueError: {"retries": 1, "delay": 0.0}, # Don't retry input errors
RuntimeError: {"retries": 3, "delay": 1.0}
}
    def execute(self, func: Callable) -> Any:
        """Execute with a retry budget that depends on the error type."""
        attempts = {}
        while True:
            try:
                return func()
            except Exception as e:
                config = next(
                    (cfg for err_type, cfg in self.retry_config.items()
                     if isinstance(e, err_type)),
                    None
                )
                if config is None:
                    # Unknown error type - fail immediately
                    logger.error(f"Unknown error type: {type(e).__name__}")
                    raise
                attempts[type(e)] = attempts.get(type(e), 0) + 1
                if attempts[type(e)] >= config["retries"]:
                    logger.error(
                        f"Max retries ({config['retries']}) reached for {type(e).__name__}"
                    )
                    raise
                logger.warning(
                    f"{type(e).__name__} on attempt {attempts[type(e)]}. "
                    f"Retrying in {config['delay']}s..."
                )
                time.sleep(config["delay"])
# Usage
adaptive = AdaptiveRetry()
result = adaptive.execute(
lambda: team({"messages": [HumanMessage(content="Task")]})
)
Circuit Breaker Pattern
Basic Circuit Breaker
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Circuit tripped, rejecting requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
"""Circuit breaker pattern implementation."""
def __init__(
self,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: int = 60
):
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.timeout = timeout
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func: Callable) -> Any:
"""Execute function through circuit breaker."""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
logger.info("Circuit breaker: OPEN -> HALF_OPEN")
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func()
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt reset."""
if not self.last_failure_time:
return False
elapsed = (datetime.now() - self.last_failure_time).total_seconds()
return elapsed >= self.timeout
def _on_success(self):
"""Handle successful call."""
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
logger.info("Circuit breaker: HALF_OPEN -> CLOSED")
def _on_failure(self):
"""Handle failed call."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
logger.warning("Circuit breaker: HALF_OPEN -> OPEN")
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.warning("Circuit breaker: CLOSED -> OPEN")
def get_state(self) -> CircuitState:
"""Get current circuit state."""
return self.state
# Usage
circuit_breaker = CircuitBreaker(
failure_threshold=5,
success_threshold=2,
timeout=60
)
def execute_with_circuit_breaker(task: str):
"""Execute task through circuit breaker."""
try:
return circuit_breaker.call(
lambda: team({"messages": [HumanMessage(content=task)]})
)
except Exception as e:
logger.error(f"Circuit breaker prevented call or execution failed: {e}")
# Use fallback
return fallback_execution(task)
# Execute tasks
for i in range(10):
try:
result = execute_with_circuit_breaker(f"Task {i}")
logger.info(f"Task {i} completed")
except Exception as e:
logger.error(f"Task {i} failed: {e}")
Multi-Service Circuit Breaker
class MultiServiceCircuitBreaker:
"""Circuit breaker for multiple services."""
def __init__(self):
self.breakers = {}
def get_breaker(self, service_id: str) -> CircuitBreaker:
"""Get or create circuit breaker for service."""
if service_id not in self.breakers:
self.breakers[service_id] = CircuitBreaker(
failure_threshold=5,
success_threshold=2,
timeout=60
)
return self.breakers[service_id]
def call(self, service_id: str, func: Callable) -> Any:
"""Execute function through service-specific breaker."""
breaker = self.get_breaker(service_id)
return breaker.call(func)
def get_status(self) -> Dict:
"""Get status of all circuit breakers."""
return {
service_id: breaker.get_state().value
for service_id, breaker in self.breakers.items()
}
# Usage
multi_breaker = MultiServiceCircuitBreaker()
# Execute against different services
try:
result1 = multi_breaker.call(
"github_api",
lambda: github_team({"messages": [HumanMessage(content="Task")]})
)
except Exception as e:
logger.error(f"GitHub API failed: {e}")
try:
result2 = multi_breaker.call(
"search_api",
lambda: search_team({"messages": [HumanMessage(content="Task")]})
)
except Exception as e:
logger.error(f"Search API failed: {e}")
# Check status
status = multi_breaker.get_status()
logger.info(f"Circuit breaker status: {status}")
Fallback Mechanisms
Static Fallback
def execute_with_static_fallback(team, task: str, fallback_response: str):
"""Execute with static fallback response."""
try:
result = team({"messages": [HumanMessage(content=task)]})
return result["messages"][-1].content
except Exception as e:
logger.error(f"Execution failed: {e}. Using fallback response.")
return fallback_response
# Usage
result = execute_with_static_fallback(
team,
"Fetch user data",
fallback_response="Unable to fetch data. Please try again later."
)
Dynamic Fallback
class DynamicFallback:
"""Dynamic fallback based on error type and context."""
def __init__(self, primary_team, fallback_team):
self.primary_team = primary_team
self.fallback_team = fallback_team
def execute(self, task: str) -> Dict:
"""Execute with dynamic fallback."""
try:
result = self.primary_team({
"messages": [HumanMessage(content=task)]
})
return {
"status": "success",
"source": "primary",
"result": result
}
except ConnectionError as e:
logger.warning(f"Primary unreachable: {e}. Using fallback...")
return self._use_fallback(task, "connection_error")
except TimeoutError as e:
logger.warning(f"Primary timeout: {e}. Using fallback...")
return self._use_fallback(task, "timeout")
except Exception as e:
logger.error(f"Primary failed: {e}")
return self._use_fallback(task, "general_error")
def _use_fallback(self, task: str, reason: str) -> Dict:
"""Use fallback team."""
try:
result = self.fallback_team({
"messages": [HumanMessage(content=task)]
})
return {
"status": "degraded",
"source": "fallback",
"reason": reason,
"result": result
}
except Exception as e:
logger.error(f"Fallback also failed: {e}")
return {
"status": "failed",
"reason": reason,
"error": str(e)
}
# Usage
primary = (MCPTeamBuilder("primary")
.with_llm(llm)
.with_mcp_server_sse("https://primary.example.com/sse")
.build()
)
fallback = (MCPTeamBuilder("fallback")
.with_llm(llm)
.with_mcp_server("python", ["local_server.py"])
.build()
)
dynamic_fallback = DynamicFallback(primary, fallback)
result = dynamic_fallback.execute("Process data")
Cached Fallback
import pickle
from pathlib import Path
class CachedFallback:
"""Use cached results as fallback."""
def __init__(self, team, cache_dir: str = "/tmp/mcp_cache"):
self.team = team
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_key(self, task: str) -> str:
"""Generate cache key from task."""
import hashlib
return hashlib.md5(task.encode()).hexdigest()
def _load_from_cache(self, cache_key: str):
"""Load result from cache."""
cache_file = self.cache_dir / f"{cache_key}.pkl"
if cache_file.exists():
with open(cache_file, 'rb') as f:
return pickle.load(f)
return None
def _save_to_cache(self, cache_key: str, result):
"""Save result to cache."""
cache_file = self.cache_dir / f"{cache_key}.pkl"
with open(cache_file, 'wb') as f:
pickle.dump(result, f)
def execute(self, task: str, use_cache: bool = True) -> Dict:
"""Execute with cached fallback."""
cache_key = self._get_cache_key(task)
try:
result = self.team({
"messages": [HumanMessage(content=task)]
})
# Save to cache
if use_cache:
self._save_to_cache(cache_key, result)
return {
"status": "success",
"source": "live",
"result": result
}
except Exception as e:
logger.error(f"Live execution failed: {e}")
if use_cache:
# Try cache
cached_result = self._load_from_cache(cache_key)
if cached_result:
logger.info("Using cached result")
return {
"status": "degraded",
"source": "cache",
"result": cached_result
}
return {
"status": "failed",
"error": str(e)
}
# Usage
cached_fallback = CachedFallback(team, cache_dir="/tmp/mcp_cache")
result = cached_fallback.execute("Analyze data", use_cache=True)
Timeout Management
Per-Operation Timeouts
import asyncio
from functools import wraps
def with_timeout(seconds: int):
"""Decorator for operation timeout."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds
)
except asyncio.TimeoutError:
logger.error(f"{func.__name__} timed out after {seconds}s")
raise TimeoutError(f"Operation timed out after {seconds}s")
return wrapper
return decorator
# Usage with async
@with_timeout(30)
async def execute_async(team, task: str):
"""Execute with timeout."""
return await team.ainvoke({
"messages": [HumanMessage(content=task)]
})
# Run
result = asyncio.run(execute_async(team, "Long task"))
Adaptive Timeouts
class AdaptiveTimeout:
"""Adaptive timeout based on historical performance."""
def __init__(self, initial_timeout: int = 30):
self.timeout = initial_timeout
self.execution_times = []
self.max_history = 100
def get_timeout(self) -> int:
"""Get current timeout value."""
if not self.execution_times:
return self.timeout
# Calculate adaptive timeout
import statistics
mean_time = statistics.mean(self.execution_times)
std_dev = statistics.stdev(self.execution_times) if len(self.execution_times) > 1 else 0
# Timeout = mean + 3 * std_dev (covers ~99.7% of cases)
adaptive_timeout = int(mean_time + 3 * std_dev)
# Bound timeout
return max(10, min(adaptive_timeout, 300)) # Between 10s and 5min
def record_execution(self, elapsed_time: float):
"""Record execution time."""
self.execution_times.append(elapsed_time)
# Keep only recent history
if len(self.execution_times) > self.max_history:
self.execution_times.pop(0)
    def execute(self, func: Callable) -> Any:
        """Time the call against the adaptive budget (enforcement is up to func)."""
        timeout = self.get_timeout()
        logger.info(f"Using adaptive timeout: {timeout}s")
        start_time = time.time()
        try:
            # The computed timeout is advisory here; pass it into func or wrap
            # func with a timeout helper if hard enforcement is needed.
            result = func()
elapsed = time.time() - start_time
self.record_execution(elapsed)
logger.info(f"Execution took {elapsed:.2f}s")
return result
except TimeoutError:
logger.error(f"Execution exceeded {timeout}s")
# Don't record timeout in history
raise
# Usage
adaptive = AdaptiveTimeout(initial_timeout=30)
for i in range(10):
try:
result = adaptive.execute(
lambda: team({"messages": [HumanMessage(content=f"Task {i}")]})
)
except TimeoutError:
logger.error(f"Task {i} timed out")
Hierarchical Timeouts
class HierarchicalTimeout:
"""Manage nested timeouts for complex operations."""
def __init__(self, total_timeout: int):
self.total_timeout = total_timeout
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
elapsed = time.time() - self.start_time
logger.info(f"Total operation took {elapsed:.2f}s")
def get_remaining_time(self) -> int:
"""Get remaining time in timeout."""
if not self.start_time:
return self.total_timeout
elapsed = time.time() - self.start_time
remaining = self.total_timeout - elapsed
if remaining <= 0:
raise TimeoutError("Total timeout exceeded")
return int(remaining)
def execute_step(self, func: Callable, step_name: str) -> Any:
"""Execute step with remaining time."""
remaining = self.get_remaining_time()
logger.info(f"Executing {step_name} with {remaining}s remaining")
# Execute with remaining time as timeout
# (Implementation depends on how you pass timeout to func)
return func()
# Usage
with HierarchicalTimeout(total_timeout=120) as ht:
# Step 1: Data collection (max 30s)
data = ht.execute_step(
lambda: collect_data(team),
"data_collection"
)
# Step 2: Processing (uses remaining time)
processed = ht.execute_step(
lambda: process_data(team, data),
"processing"
)
# Step 3: Storage (uses remaining time)
result = ht.execute_step(
lambda: store_data(team, processed),
"storage"
)
Logging and Monitoring
Structured Logging
import logging
import json
from datetime import datetime
class StructuredLogger:
"""Structured logging for MCP operations."""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON formatter
handler = logging.StreamHandler()
handler.setFormatter(self._json_formatter())
self.logger.addHandler(handler)
def _json_formatter(self):
"""Create JSON formatter."""
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
if hasattr(record, "extra_data"):
log_data.update(record.extra_data)
return json.dumps(log_data)
return JSONFormatter()
def log_execution(
self,
operation: str,
status: str,
duration: float = None,
**kwargs
):
"""Log operation execution."""
extra = {
"operation": operation,
"status": status,
"duration_ms": int(duration * 1000) if duration else None,
**kwargs
}
log_record = self.logger.makeRecord(
self.logger.name,
logging.INFO,
"",
0,
f"Operation: {operation}",
(),
None
)
log_record.extra_data = extra
self.logger.handle(log_record)
# Usage
struct_logger = StructuredLogger("mcp_team")
start_time = time.time()
try:
result = team({"messages": [HumanMessage(content="Task")]})
duration = time.time() - start_time
struct_logger.log_execution(
operation="team_execution",
status="success",
duration=duration,
team_name="my_team",
tool_count=len(team.get_tool_names())
)
except Exception as e:
duration = time.time() - start_time
struct_logger.log_execution(
operation="team_execution",
status="failed",
duration=duration,
error_type=type(e).__name__,
error_message=str(e)
)
Metrics Collection
from collections import defaultdict
from typing import Dict
import time
class MetricsCollector:
"""Collect execution metrics."""
def __init__(self):
self.metrics = defaultdict(lambda: {
"count": 0,
"success": 0,
"failure": 0,
"total_duration": 0.0,
"errors": {}
})
def record_execution(
self,
operation: str,
success: bool,
duration: float,
error: str = None
):
"""Record execution metrics."""
m = self.metrics[operation]
m["count"] += 1
if success:
m["success"] += 1
else:
m["failure"] += 1
if error:
error_type = error.split(":")[0]
m["errors"][error_type] = m["errors"].get(error_type, 0) + 1
m["total_duration"] += duration
def get_metrics(self, operation: str = None) -> Dict:
"""Get collected metrics."""
if operation:
m = self.metrics[operation]
return {
"count": m["count"],
"success_rate": m["success"] / m["count"] if m["count"] > 0 else 0,
"failure_rate": m["failure"] / m["count"] if m["count"] > 0 else 0,
"avg_duration": m["total_duration"] / m["count"] if m["count"] > 0 else 0,
"errors": dict(m["errors"])
}
else:
return {op: self.get_metrics(op) for op in self.metrics}
def execute_with_metrics(
self,
operation: str,
func: Callable
) -> Any:
"""Execute function and collect metrics."""
start_time = time.time()
try:
result = func()
duration = time.time() - start_time
self.record_execution(operation, True, duration)
return result
except Exception as e:
duration = time.time() - start_time
self.record_execution(operation, False, duration, str(e))
raise
# Usage
metrics = MetricsCollector()
for i in range(100):
try:
result = metrics.execute_with_metrics(
"team_execution",
lambda: team({"messages": [HumanMessage(content=f"Task {i}")]})
)
except Exception as e:
logger.error(f"Task {i} failed: {e}")
# Get metrics
execution_metrics = metrics.get_metrics("team_execution")
print(f"Success rate: {execution_metrics['success_rate']:.2%}")
print(f"Average duration: {execution_metrics['avg_duration']:.2f}s")
print(f"Errors: {execution_metrics['errors']}")
Health Checks
class HealthChecker:
"""Monitor system health."""
def __init__(self, team):
self.team = team
self.last_check = None
self.status = "unknown"
def check_health(self) -> Dict:
"""Perform health check."""
health_status = {
"timestamp": datetime.utcnow().isoformat(),
"status": "healthy",
"checks": {}
}
# Check tool availability
try:
tools = self.team.get_mcp_tool_names()
health_status["checks"]["tools"] = {
"status": "pass",
"count": len(tools)
}
except Exception as e:
health_status["checks"]["tools"] = {
"status": "fail",
"error": str(e)
}
health_status["status"] = "unhealthy"
# Check server connectivity
try:
server_count = self.team.get_mcp_server_count()
health_status["checks"]["servers"] = {
"status": "pass",
"count": server_count
}
except Exception as e:
health_status["checks"]["servers"] = {
"status": "fail",
"error": str(e)
}
health_status["status"] = "unhealthy"
# Test execution
try:
start = time.time()
result = self.team({
"messages": [HumanMessage(content="health check")]
})
duration = time.time() - start
health_status["checks"]["execution"] = {
"status": "pass",
"duration_ms": int(duration * 1000)
}
except Exception as e:
health_status["checks"]["execution"] = {
"status": "fail",
"error": str(e)
}
health_status["status"] = "unhealthy"
self.last_check = health_status
self.status = health_status["status"]
return health_status
def is_healthy(self) -> bool:
"""Check if system is healthy."""
if not self.last_check:
self.check_health()
return self.status == "healthy"
# Usage
health_checker = HealthChecker(team)
# Periodic health checks
while True:
health = health_checker.check_health()
logger.info(f"Health check: {health['status']}")
if not health_checker.is_healthy():
logger.error("System unhealthy!")
# Take corrective action
time.sleep(60) # Check every minute
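Blocking the main thread in a while True loop is fine for a demo, but a service would normally run the check in the background. A minimal sketch using a daemon thread follows; the interval and thread name are illustrative.
import threading

def start_health_check_loop(checker: HealthChecker, interval: int = 60) -> threading.Thread:
    """Run periodic health checks without blocking the caller."""
    def _loop():
        while True:
            health = checker.check_health()
            if health["status"] != "healthy":
                logger.error(f"System unhealthy: {health['checks']}")
            time.sleep(interval)
    thread = threading.Thread(target=_loop, daemon=True, name="mcp-health-check")
    thread.start()
    return thread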
Production Patterns
Complete Production Setup
"""
production_mcp_team.py - Production-ready MCP team with comprehensive error handling.
"""
import logging
import os
import time
from typing import Dict, Any
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp_production.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class ProductionMCPTeam:
"""Production-ready MCP team with comprehensive error handling."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.team = None
self.circuit_breaker = CircuitBreaker()
self.metrics = MetricsCollector()
self.health_checker = None
self._initialize()
def _initialize(self):
"""Initialize team with error handling."""
try:
self.team = self._build_team()
self.health_checker = HealthChecker(self.team)
logger.info("Team initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize team: {e}")
raise
def _build_team(self):
"""Build team with resilience features."""
builder = (MCPTeamBuilder(self.config["name"])
.with_llm(ChatOpenAI(
model=self.config["llm_model"],
temperature=self.config.get("temperature", 0.7)
))
)
# Add servers with error handling
for server in self.config["servers"]:
try:
if "url" in server:
builder = builder.with_mcp_server_sse(
url=server["url"],
env=server.get("env", {}),
timeout=server.get("timeout", 30)
)
else:
builder = builder.with_mcp_server(
command=server["command"],
args=server["args"],
env=server.get("env", {}),
timeout=server.get("timeout", 10)
)
logger.info(f"Added server: {server.get('name', 'unnamed')}")
except Exception as e:
logger.error(f"Failed to add server {server.get('name')}: {e}")
if not server.get("optional", False):
raise
# Enable graceful degradation
builder = builder.skip_failed_servers(True)
return builder.build()
def execute(self, task: str, max_retries: int = 3) -> Dict:
"""Execute task with comprehensive error handling."""
# Check health first
if not self.health_checker.is_healthy():
logger.warning("System unhealthy, running health check...")
self.health_checker.check_health()
# Execute with retry and circuit breaker
def execute_task():
return self.team({
"messages": [HumanMessage(content=task)]
})
try:
result = self.metrics.execute_with_metrics(
"task_execution",
lambda: self.circuit_breaker.call(
lambda: exponential_backoff_retry(
execute_task,
max_retries=max_retries
)
)
)
return {
"status": "success",
"result": result["messages"][-1].content,
"metadata": {
"circuit_state": self.circuit_breaker.get_state().value,
"tools_used": len(self.team.get_tool_names())
}
}
except Exception as e:
logger.error(f"Task execution failed: {e}")
return {
"status": "failed",
"error": str(e),
"error_type": type(e).__name__,
"metadata": {
"circuit_state": self.circuit_breaker.get_state().value
}
}
def get_status(self) -> Dict:
"""Get system status."""
health = self.health_checker.check_health()
metrics = self.metrics.get_metrics()
return {
"health": health,
"metrics": metrics,
"circuit_breaker": self.circuit_breaker.get_state().value,
"tools": self.team.get_mcp_tool_names() if self.team else []
}
# Configuration
config = {
"name": "production_team",
"llm_model": "gpt-4o-mini",
"temperature": 0.7,
"servers": [
{
"name": "filesystem",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
"optional": False
},
{
"name": "github",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-github"],
"env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")},
"optional": True
},
{
"name": "search_api",
"url": "https://search.example.com/sse",
"env": {"API_KEY": os.getenv("SEARCH_API_KEY")},
"timeout": 30,
"optional": True
}
]
}
# Usage
prod_team = ProductionMCPTeam(config)
# Execute task
result = prod_team.execute("Process production data")
print(f"Result: {result}")
# Get status
status = prod_team.get_status()
print(f"System status: {status}")
Testing Error Scenarios
Error Simulation
import pytest
from unittest.mock import Mock, patch
class ErrorSimulator:
"""Simulate various error scenarios for testing."""
@staticmethod
def simulate_connection_error():
"""Simulate connection error."""
raise ConnectionError("Server unreachable")
@staticmethod
def simulate_timeout():
"""Simulate timeout."""
time.sleep(100) # Exceeds timeout
@staticmethod
def simulate_auth_failure():
"""Simulate authentication failure."""
raise PermissionError("Invalid credentials")
@staticmethod
def simulate_intermittent_failure(failure_rate: float = 0.3):
"""Simulate intermittent failures."""
import random
if random.random() < failure_rate:
raise RuntimeError("Intermittent failure")
return {"success": True}
# Test error handling
def test_connection_error_handling():
    """Test handling of connection errors."""
    # A Mock team whose call raises lets us exercise the handler without a live
    # server (patching __call__ on an instance would not intercept team(...)).
    failing_team = Mock(side_effect=ConnectionError("Test"))
    result = execute_with_basic_error_handling(failing_team, "Task")
    assert "error" in result
    assert result["error"] == "Server unreachable"
def test_timeout_handling():
    """Test handling of timeouts."""
    failing_team = Mock(side_effect=TimeoutError("Test"))
    result = execute_with_basic_error_handling(failing_team, "Task")
    assert "error" in result
    assert result["error"] == "Operation timed out"
def test_retry_mechanism():
"""Test retry mechanism."""
mock_func = Mock(side_effect=[
RuntimeError("Attempt 1"),
RuntimeError("Attempt 2"),
{"success": True}
])
result = exponential_backoff_retry(mock_func, max_retries=3)
assert result["success"]
assert mock_func.call_count == 3
def test_circuit_breaker():
    """Test circuit breaker."""
    breaker = CircuitBreaker(failure_threshold=3)
    def always_fail():
        raise RuntimeError("Test")
    # Cause failures to trip circuit
    for _ in range(3):
        try:
            breaker.call(always_fail)
        except RuntimeError:
            pass
    # Circuit should be open
    assert breaker.get_state() == CircuitState.OPEN
    # Should reject calls
    with pytest.raises(Exception, match="Circuit breaker is OPEN"):
        breaker.call(lambda: None)
Summary
Comprehensive error handling for MCP integrations requires:
- Error Detection: Identify and classify different error types
- Graceful Degradation: Continue operating with reduced functionality
- Retry Strategies: Exponential backoff, jittered retry, adaptive retry
- Circuit Breakers: Prevent cascading failures
- Fallback Mechanisms: Alternative execution paths
- Timeout Management: Prevent hanging operations
- Monitoring: Structured logging, metrics, health checks
- Testing: Simulate and test error scenarios
Key Takeaways:
- Always handle errors at multiple levels (connection, execution, tool)
- Use graceful degradation to maintain core functionality
- Implement retry strategies with backoff for transient failures
- Use circuit breakers to protect against cascading failures
- Provide fallback mechanisms for critical operations
- Set appropriate timeouts at all levels
- Monitor and log all errors for debugging
- Test error scenarios thoroughly before production