Transport types define how the MCP client communicates with MCP servers. az-core supports two transport mechanisms: STDIO (Standard Input/Output) and SSE (Server-Sent Events). This page provides a comprehensive comparison of the two transports and shows how to use each one.
Transport Overview
MCP (Model Context Protocol) supports multiple transport mechanisms for communication between clients and servers. The choice of transport affects:
- Performance: Latency, throughput, overhead
- Deployment: Complexity, scalability, infrastructure
- Security: Authentication, encryption, isolation
- Reliability: Error handling, connection management
Architecture Diagram
Client (MCPTeamBuilder)
|
v
Transport Layer
|
+----+----+
| |
STDIO SSE
| |
v v
Local Process HTTP Server
(IPC) (Network)
| |
v v
MCP Server MCP Server
STDIO Transport
STDIO (Standard Input/Output) transport uses inter-process communication (IPC) to connect to MCP servers running as local processes.
How STDIO Works
- Process Spawning: Client spawns MCP server as child process
- IPC Channel: Communication via stdin/stdout pipes
- JSON-RPC: Messages serialized as JSON-RPC 2.0
- Synchronous: Blocking read/write operations
- Process Lifecycle: Server lifecycle managed by client
Architecture
MCPTeamBuilder
|
v
subprocess.Popen()
|
v
+------------------+
| Child Process |
| (MCP Server) |
| |
| stdin <----> |
| stdout <----> |
| stderr <----> |
+------------------+
Configuration
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
team = (MCPTeamBuilder("stdio_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server(
command="python", # Executable command
args=["server.py"], # Command arguments
env={ # Environment variables
"API_KEY": "secret",
"LOG_LEVEL": "INFO"
},
timeout=10 # Connection timeout (seconds)
)
.build()
)
Internal Configuration
# Internal server configuration for STDIO
{
"command": "python",
"args": ["server.py", "--port", "8080"],
"env": {
"API_KEY": "secret",
"HOME": "/home/user"
},
"transport": "stdio",
"timeout": 10
}
Server Implementation
# Example STDIO MCP Server
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import sys
app = Server("file-operations-server")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="read_file",
description="Read contents of a file",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path to read"
}
},
"required": ["path"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute a tool by name.

    Args:
        name: Tool identifier (only "read_file" is supported here).
        arguments: Tool arguments; read_file requires a "path" key.

    Returns:
        A single TextContent holding the file contents, or an error message.

    Raises:
        ValueError: If the tool name is unknown.
    """
    if name == "read_file":
        try:
            # Look up the argument inside the try so a missing "path" key is
            # reported as tool output instead of crashing the server.
            path = arguments["path"]
            # Explicit encoding keeps behavior consistent across platforms.
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()
            return [TextContent(type="text", text=content)]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]
    raise ValueError(f"Unknown tool: {name}")
async def main():
"""Run STDIO server."""
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
Advanced STDIO Configuration
# Multiple STDIO servers
team = (MCPTeamBuilder("multi_stdio_team")
.with_llm(llm)
# File operations server
.with_mcp_server(
command="python",
args=["servers/file_ops.py"],
env={"WORKSPACE": "/data"},
timeout=10
)
# Database server
.with_mcp_server(
command="python",
args=["servers/database.py"],
env={
"DB_URL": "postgresql://localhost/mydb",
"DB_POOL_SIZE": "10"
},
timeout=15 # Longer timeout for DB connection
)
# Python interpreter server
.with_mcp_server(
command="python",
args=["servers/python_executor.py"],
env={"PYTHON_PATH": "/usr/bin/python3"},
timeout=5
)
.build()
)
STDIO with Custom Interpreters
# Node.js MCP server
team_node = (MCPTeamBuilder("node_team")
.with_llm(llm)
.with_mcp_server(
command="node",
args=["server.js"],
env={"NODE_ENV": "production"}
)
.build()
)
# Compiled binary server
team_binary = (MCPTeamBuilder("binary_team")
.with_llm(llm)
.with_mcp_server(
command="./mcp_server",
args=["--config", "config.json"],
env={"LD_LIBRARY_PATH": "/usr/local/lib"}
)
.build()
)
# Docker containerized server
team_docker = (MCPTeamBuilder("docker_team")
.with_llm(llm)
.with_mcp_server(
command="docker",
args=[
"run",
"--rm",
"-i",
"mcp-server:latest"
],
timeout=30 # Longer timeout for container startup
)
.build()
)
STDIO Process Management
import subprocess
import signal
import time
class MCPServerManager:
    """Manage the lifecycle of a STDIO MCP server subprocess.

    Spawns the server with piped stdin/stdout/stderr, and provides graceful
    stop (SIGTERM first, SIGKILL after a timeout) plus restart.
    """

    def __init__(self, command: str, args: list):
        self.command = command  # executable to spawn, e.g. "python"
        self.args = args        # argv passed after the command
        self.process = None     # subprocess.Popen handle; None until start()

    def start(self):
        """Start the MCP server process with piped stdio."""
        self.process = subprocess.Popen(
            [self.command] + self.args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        print(f"Started MCP server: PID {self.process.pid}")

    def stop(self, timeout: int = 5):
        """Stop the MCP server gracefully; force-kill if it ignores SIGTERM."""
        if self.process is None:
            return  # nothing to stop
        self.process.terminate()  # SIGTERM for graceful shutdown
        try:
            self.process.wait(timeout=timeout)
            print("Server stopped gracefully")
        except subprocess.TimeoutExpired:
            # Server ignored SIGTERM: force-kill, then reap to avoid a zombie.
            self.process.kill()
            self.process.wait()
            print("Server force killed after timeout")
        finally:
            # Drop the stale handle so is_alive()/restart() see a clean state.
            self.process = None

    def restart(self):
        """Restart the MCP server (short pause lets the OS release the pipes)."""
        self.stop()
        time.sleep(1)
        self.start()

    def is_alive(self) -> bool:
        """Return True if the server process exists and has not exited."""
        # poll() is None only while the process is still running.
        return self.process is not None and self.process.poll() is None
# Usage
manager = MCPServerManager("python", ["server.py"])
manager.start()
# ... use the server ...
manager.stop()
STDIO Advantages
- Low Latency: Direct IPC communication
- Simple Setup: No network configuration required
- Process Isolation: Each server runs in isolated process
- Resource Control: Easy to limit CPU/memory per process
- No Network Overhead: No HTTP headers, connection pooling, etc.
- Built-in Logging: stderr available for logs
STDIO Disadvantages
- Not Scalable: Limited to single machine
- No Load Balancing: Cannot distribute across servers
- Process Management: Must handle process lifecycle
- Platform Dependent: Process spawning differs by OS
- No Remote Access: Cannot connect from other machines
- Limited Parallelism: One process per connection
Best Use Cases for STDIO
- Development: Fast iteration and debugging
- Single-User Applications: Desktop apps, CLI tools
- Local Processing: File operations, local databases
- Low-Latency Requirements: Real-time processing
- Simple Deployment: No server infrastructure needed
- Resource-Constrained: Minimal network overhead
SSE Transport
SSE (Server-Sent Events) transport uses HTTP for communication with remote MCP servers.
How SSE Works
- HTTP Connection: Client connects to server via HTTP
- Event Stream: Server sends events over persistent connection
- JSON-RPC: Messages serialized as JSON-RPC 2.0
- Asynchronous: Non-blocking event handling
- Server Lifecycle: Server runs independently of client
Architecture
MCPTeamBuilder
|
v
HTTP Client
|
v (HTTPS)
+------------------+
| HTTP Server |
| |
| SSE Endpoint |
| /mcp/sse |
| |
| MCP Server |
+------------------+
Configuration
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
team = (MCPTeamBuilder("sse_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server_sse(
url="http://localhost:8000/sse", # Server URL
env={ # Auth/config headers
"AUTH_TOKEN": "Bearer secret",
"API_VERSION": "v1"
},
timeout=30, # Connection timeout
sse_read_timeout=60 # Read timeout
)
.build()
)
Internal Configuration
# Internal server configuration for SSE
{
"url": "http://localhost:8000/sse",
"transport": "sse",
"env": {
"AUTH_TOKEN": "Bearer secret",
"API_VERSION": "v1"
},
"timeout": 30,
"sse_read_timeout": 60
}
Server Implementation
# Example SSE MCP Server with FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from mcp.server import Server
from mcp.types import Tool, TextContent
import asyncio
import json
app = FastAPI()
mcp_server = Server("api-server")
@mcp_server.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="api_call",
description="Make API call to external service",
inputSchema={
"type": "object",
"properties": {
"endpoint": {"type": "string"},
"method": {"type": "string"}
},
"required": ["endpoint", "method"]
}
)
]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Execute tool."""
if name == "api_call":
# Implement API call logic
result = await make_api_call(
arguments["endpoint"],
arguments["method"]
)
return [TextContent(type="text", text=str(result))]
raise ValueError(f"Unknown tool: {name}")
async def event_generator(request: Request):
"""Generate SSE events."""
while True:
if await request.is_disconnected():
break
# Send events to client
event_data = {
"type": "tool_response",
"data": "..."
}
yield f"data: {json.dumps(event_data)}\n\n"
await asyncio.sleep(0.1)
@app.get("/sse")
async def sse_endpoint(request: Request):
"""SSE endpoint for MCP communication."""
return StreamingResponse(
event_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Running the SSE Server
# Install dependencies
pip install fastapi uvicorn mcp
# Run the server
python sse_server.py
# Or with uvicorn directly
uvicorn sse_server:app --host 0.0.0.0 --port 8000 --reload
Advanced SSE Configuration
# Multiple SSE servers
team = (MCPTeamBuilder("multi_sse_team")
.with_llm(llm)
# Production API server
.with_mcp_server_sse(
url="https://api.production.com/mcp/sse",
env={
"AUTH_TOKEN": os.getenv("PROD_API_TOKEN"),
"TENANT_ID": "tenant-123"
},
timeout=45,
sse_read_timeout=90
)
# Analytics server
.with_mcp_server_sse(
url="https://analytics.example.com/sse",
env={
"API_KEY": os.getenv("ANALYTICS_API_KEY")
},
timeout=30,
sse_read_timeout=60
)
# External service
.with_mcp_server_sse(
url="https://external-service.com/mcp",
env={
"CLIENT_ID": os.getenv("CLIENT_ID"),
"CLIENT_SECRET": os.getenv("CLIENT_SECRET")
},
timeout=60,
sse_read_timeout=120
)
.build()
)
SSE with Authentication
# Bearer token authentication
team = (MCPTeamBuilder("auth_team")
.with_llm(llm)
.with_mcp_server_sse(
url="https://api.example.com/sse",
env={
"AUTH_TOKEN": f"Bearer {os.getenv('API_TOKEN')}"
}
)
.build()
)
# API key authentication
team = (MCPTeamBuilder("api_key_team")
.with_llm(llm)
.with_mcp_server_sse(
url="https://api.example.com/sse",
env={
"X-API-KEY": os.getenv('API_KEY'),
"X-API-SECRET": os.getenv('API_SECRET')
}
)
.build()
)
# OAuth2 authentication
import requests
def get_oauth_token():
    """Fetch an OAuth2 access token via the client-credentials flow.

    Returns:
        str: The access token issued by the auth server.

    Raises:
        requests.HTTPError: If the token endpoint returns an error status.
        requests.Timeout: If the endpoint does not respond within 30s.
    """
    response = requests.post(
        "https://auth.example.com/token",
        data={
            "grant_type": "client_credentials",
            "client_id": os.getenv("CLIENT_ID"),
            "client_secret": os.getenv("CLIENT_SECRET")
        },
        timeout=30,  # fail fast instead of hanging indefinitely
    )
    # Surface auth failures explicitly instead of a confusing KeyError below.
    response.raise_for_status()
    return response.json()["access_token"]
team = (MCPTeamBuilder("oauth_team")
.with_llm(llm)
.with_mcp_server_sse(
url="https://api.example.com/sse",
env={
"AUTH_TOKEN": f"Bearer {get_oauth_token()}"
},
timeout=60
)
.build()
)
SSE Connection Management
import aiohttp
import asyncio
from typing import AsyncIterator
class SSEConnectionManager:
    """Manage an SSE connection with reconnection logic.

    NOTE(review): assumes the server serves `text/event-stream` and that each
    event payload is a single JSON object on a `data:` line — confirm against
    the target server.
    """

    def __init__(self, url: str, auth_token: str):
        self.url = url
        self.auth_token = auth_token  # bearer token sent on connect
        self.session = None           # aiohttp.ClientSession, set in connect()
        self.response = None          # streaming response, set in connect()
        self.connected = False

    async def connect(self):
        """Establish the SSE connection, or clean up and re-raise on failure."""
        self.session = aiohttp.ClientSession()
        headers = {
            "Authorization": f"Bearer {self.auth_token}",
            "Accept": "text/event-stream"
        }
        try:
            self.response = await self.session.get(
                self.url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            )
            if self.response.status == 200:
                self.connected = True
                print(f"Connected to {self.url}")
            else:
                raise Exception(f"Connection failed: {self.response.status}")
        except Exception as e:
            print(f"Connection error: {e}")
            # Close the half-open session before propagating the error.
            await self.disconnect()
            raise

    async def disconnect(self):
        """Close the SSE connection and its underlying session."""
        self.connected = False
        if self.session:
            await self.session.close()
            print(f"Disconnected from {self.url}")

    async def read_events(self) -> AsyncIterator[dict]:
        """Yield parsed JSON payloads from the SSE stream."""
        import json  # local import keeps this example snippet self-contained

        async for line in self.response.content:
            line = line.decode('utf-8').strip()
            if line.startswith('data: '):
                data = line[6:]  # Remove 'data: ' prefix
                yield json.loads(data)

    async def with_reconnection(self, max_retries: int = 3):
        """Connect with automatic reconnection and exponential backoff."""
        retries = 0
        while retries < max_retries:
            try:
                await self.connect()
                return
            except Exception:
                retries += 1
                wait_time = 2 ** retries  # Exponential backoff: 2s, 4s, 8s...
                print(f"Retry {retries}/{max_retries} in {wait_time}s...")
                await asyncio.sleep(wait_time)
        raise Exception(f"Failed to connect after {max_retries} retries")
# Usage
async def main():
manager = SSEConnectionManager(
url="https://api.example.com/sse",
auth_token="your-token"
)
await manager.with_reconnection()
async for event in manager.read_events():
print(f"Received event: {event}")
await manager.disconnect()
asyncio.run(main())
SSE Load Balancing
# Multiple SSE servers for load balancing
class LoadBalancedSSETeam:
    """MCP team that distributes tasks across several SSE servers.

    One team is built per server URL; execute() rotates through them in
    round-robin order.
    """

    def __init__(self, server_urls: list[str]):
        self.teams = [
            (MCPTeamBuilder(f"team_{idx}")
             .with_llm(ChatOpenAI(model="gpt-4o-mini"))
             .with_mcp_server_sse(url=server_url)
             .build())
            for idx, server_url in enumerate(server_urls)
        ]
        self.current_index = 0

    def get_next_team(self):
        """Return the next team in round-robin rotation."""
        selected = self.teams[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.teams)
        return selected

    def execute(self, task: str):
        """Dispatch the task to whichever team is next in rotation."""
        return self.get_next_team()({"messages": [HumanMessage(content=task)]})
# Usage
load_balanced_team = LoadBalancedSSETeam([
"https://server1.example.com/sse",
"https://server2.example.com/sse",
"https://server3.example.com/sse"
])
# Requests distributed across servers
result1 = load_balanced_team.execute("Task 1") # -> server1
result2 = load_balanced_team.execute("Task 2") # -> server2
result3 = load_balanced_team.execute("Task 3") # -> server3
result4 = load_balanced_team.execute("Task 4") # -> server1
SSE Advantages
- Scalable: Can serve multiple clients
- Remote Access: Connect from anywhere
- Load Balancing: Distribute across multiple servers
- Centralized Management: Single server for all clients
- Independent Lifecycle: Server runs independently
- Standard Protocol: HTTP/HTTPS compatibility
SSE Disadvantages
- Higher Latency: Network overhead
- Complex Setup: Requires web server infrastructure
- Network Dependencies: Requires stable network
- Authentication Overhead: Token management
- Connection Limits: HTTP connection pooling limits
- More Resources: Network and server resources
Best Use Cases for SSE
- Production Systems: Multi-user, scalable applications
- Microservices: Distributed architecture
- Cloud Deployment: Containerized services
- Remote Access: Cross-network communication
- High Availability: Load balancing and failover
- Multi-Tenant: Serve multiple customers
Comparison Matrix
| Feature | STDIO | SSE |
|---|---|---|
| Latency | Very Low (< 1ms IPC) | Higher (10-100ms network) |
| Throughput | High (direct memory) | Medium (network bandwidth) |
| Scalability | Low (single machine) | High (distributed) |
| Deployment | Simple (single process) | Complex (web server) |
| Security | Process isolation | HTTPS + Auth |
| Remote Access | No | Yes |
| Load Balancing | No | Yes |
| Failover | No | Yes |
| Resource Usage | Low | Medium-High |
| Setup Complexity | Low | High |
| Debugging | Easy (local logs) | Medium (distributed logs) |
| Production Ready | Single-user only | Multi-user |
Performance Comparison
import time
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
# STDIO team
stdio_team = (MCPTeamBuilder("stdio_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server("python", ["server.py"])
.build()
)
# SSE team
sse_team = (MCPTeamBuilder("sse_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server_sse("http://localhost:8000/sse")
.build()
)
# Benchmark
task = "List files in current directory"
# STDIO timing
start = time.time()
stdio_result = stdio_team({"messages": [HumanMessage(content=task)]})
stdio_time = time.time() - start
# SSE timing
start = time.time()
sse_result = sse_team({"messages": [HumanMessage(content=task)]})
sse_time = time.time() - start
print(f"STDIO: {stdio_time:.3f}s")
print(f"SSE: {sse_time:.3f}s")
print(f"Difference: {(sse_time - stdio_time):.3f}s")
# Typical results:
# STDIO: 0.012s
# SSE: 0.045s
# Difference: 0.033s (SSE ~3-4x slower for simple operations)
Performance Considerations
STDIO Performance Optimization
# 1. Reuse processes instead of spawning new ones
class PooledSTDIOTeam:
"""Pool of reusable STDIO servers."""
def __init__(self, pool_size: int = 5):
self.pool = [
(MCPTeamBuilder(f"pool_{i}")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.build())
for i in range(pool_size)
]
self.current = 0
def execute(self, task: str):
team = self.pool[self.current]
self.current = (self.current + 1) % len(self.pool)
return team({"messages": [HumanMessage(content=task)]})
# 2. Use compiled binaries instead of interpreted languages
team = (MCPTeamBuilder("fast_team")
.with_mcp_server(
command="./mcp_server_binary", # Compiled Go/Rust binary
args=["--fast-mode"]
)
.build()
)
# 3. Batch operations when possible
def batch_execute(team, tasks: list[str]):
"""Execute multiple tasks in one go."""
combined_task = "; ".join(tasks)
return team({"messages": [HumanMessage(content=combined_task)]})
SSE Performance Optimization
# 1. Connection pooling
from aiohttp import ClientSession, TCPConnector
connector = TCPConnector(
limit=100, # Max connections
limit_per_host=10, # Max per host
keepalive_timeout=30 # Keep-alive duration
)
session = ClientSession(connector=connector)
# 2. HTTP/2 for multiplexing
team = (MCPTeamBuilder("http2_team")
.with_mcp_server_sse(
url="https://api.example.com/sse", # HTTP/2 enabled
timeout=30
)
.build()
)
# 3. Regional deployment
teams_by_region = {
"us-east": MCPTeamBuilder("team_us_east")
.with_mcp_server_sse("https://us-east.example.com/sse")
.build(),
"eu-west": MCPTeamBuilder("team_eu_west")
.with_mcp_server_sse("https://eu-west.example.com/sse")
.build(),
"ap-south": MCPTeamBuilder("team_ap_south")
.with_mcp_server_sse("https://ap-south.example.com/sse")
.build()
}
def get_team_for_user(user_region: str):
"""Get nearest team for user."""
return teams_by_region.get(user_region, teams_by_region["us-east"])
Security Considerations
STDIO Security
# 1. Limit server capabilities
team = (MCPTeamBuilder("secure_stdio")
.with_mcp_server(
command="python",
args=["server.py"],
env={
"ALLOWED_PATHS": "/safe/directory", # Restrict file access
"READ_ONLY": "true", # Prevent modifications
"MAX_FILE_SIZE": "10485760" # 10MB limit
}
)
.build()
)
# 2. Run in sandboxed environment
team = (MCPTeamBuilder("sandboxed_team")
.with_mcp_server(
command="firejail", # Sandbox tool
args=[
"--private",
"--net=none",
"python", "server.py"
]
)
.build()
)
# 3. Resource limits
import resource
def limit_resources():
"""Set resource limits for server process."""
# Max 1GB memory
resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024, 1024*1024*1024))
# Max 100 open files
resource.setrlimit(resource.RLIMIT_NOFILE, (100, 100))
# Max 60 seconds CPU time
resource.setrlimit(resource.RLIMIT_CPU, (60, 60))
# Apply before spawning server
limit_resources()
team = builder.build()
SSE Security
# 1. HTTPS only in production
team = (MCPTeamBuilder("secure_sse")
.with_mcp_server_sse(
url="https://api.example.com/sse", # HTTPS, not HTTP
env={
"AUTH_TOKEN": f"Bearer {os.getenv('API_TOKEN')}"
}
)
.build()
)
# 2. Token rotation
import time
import jwt
class TokenRotatingTeam:
    """Team builder that rotates its JWT automatically.

    Tokens are self-signed with HS256 and cached until their expiry time.
    """

    def __init__(self, url: str, secret: str):
        self.url = url
        self.secret = secret        # HMAC secret used to sign tokens
        self.current_token = None   # cached JWT; regenerated when expired
        self.token_expiry = 0       # epoch seconds; 0 forces initial generation

    def get_fresh_token(self) -> str:
        """Return the cached JWT, generating a new one if it has expired."""
        if time.time() >= self.token_expiry:
            # Generate new token
            payload = {
                "exp": time.time() + 3600,  # 1 hour expiry
                "iat": time.time()
            }
            self.current_token = jwt.encode(payload, self.secret, algorithm="HS256")
            self.token_expiry = payload["exp"]
        return self.current_token

    def build_team(self):
        """Build an MCP team carrying a freshly rotated bearer token."""
        return (MCPTeamBuilder("rotating_team")
            .with_llm(llm)
            .with_mcp_server_sse(
                url=self.url,
                env={"AUTH_TOKEN": f"Bearer {self.get_fresh_token()}"}
            )
            .build()
        )
# 3. Rate limiting
from functools import wraps
import time
def rate_limit(max_calls: int, time_window: int):
    """Decorator that caps invocations at max_calls per time_window seconds.

    The call history lives in a closure, so each decorated function gets its
    own independent limit. Exceeding the limit raises an Exception.
    """
    timestamps = []

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            cutoff = now - time_window
            # Drop timestamps that have aged out of the sliding window.
            timestamps[:] = [t for t in timestamps if t > cutoff]
            if len(timestamps) >= max_calls:
                raise Exception(f"Rate limit exceeded: {max_calls} calls per {time_window}s")
            timestamps.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
@rate_limit(max_calls=100, time_window=60)
def execute_task(team, task: str):
"""Execute with rate limiting."""
return team({"messages": [HumanMessage(content=task)]})
# 4. IP whitelisting (server-side)
# In your SSE server:
ALLOWED_IPS = ["192.168.1.0/24", "10.0.0.0/8"]
@app.get("/sse")
async def sse_endpoint(request: Request):
client_ip = request.client.host
if not is_ip_allowed(client_ip, ALLOWED_IPS):
raise HTTPException(status_code=403, detail="IP not allowed")
return StreamingResponse(...)
Use Case Selection
When to Use STDIO
Development & Testing:
# Quick iteration during development
dev_team = (MCPTeamBuilder("dev_team")
.with_llm(llm)
.with_mcp_server("python", ["dev_server.py"])
.build()
)
Desktop Applications:
# Single-user desktop app
desktop_app = (MCPTeamBuilder("desktop_app")
.with_llm(llm)
.with_mcp_server("python", ["local_tools.py"])
.with_prompt("You are a desktop assistant.")
.build()
)
CLI Tools:
# Command-line tool
cli_tool = (MCPTeamBuilder("cli_tool")
.with_llm(llm)
.with_mcp_server("python", ["cli_server.py"])
.build()
)
# Usage: python cli.py "analyze this file"
When to Use SSE
Production Web Services:
# Multi-user web application
web_service = (MCPTeamBuilder("web_service")
.with_llm(llm)
.with_mcp_server_sse(
url="https://api.production.com/sse",
env={"AUTH_TOKEN": os.getenv("API_TOKEN")}
)
.build()
)
Microservices Architecture:
# Service mesh with multiple MCP services
microservices_team = (MCPTeamBuilder("microservices")
.with_llm(llm)
.with_mcp_server_sse("https://user-service.internal/sse")
.with_mcp_server_sse("https://data-service.internal/sse")
.with_mcp_server_sse("https://analytics-service.internal/sse")
.build()
)
Cloud-Native Applications:
# Kubernetes-deployed MCP servers
k8s_team = (MCPTeamBuilder("k8s_team")
.with_llm(llm)
.with_mcp_server_sse(
url="https://mcp.cluster.local/sse",
env={"SERVICE_ACCOUNT_TOKEN": os.getenv("K8S_TOKEN")}
)
.build()
)
Hybrid Approach
Combine both transports for optimal performance:
# Local tools via STDIO, remote APIs via SSE
hybrid_team = (MCPTeamBuilder("hybrid_team")
.with_llm(llm)
# Fast local operations via STDIO
.with_mcp_server(
command="python",
args=["local_tools.py"]
)
# Remote services via SSE
.with_mcp_server_sse(
url="https://api.example.com/sse",
env={"AUTH_TOKEN": os.getenv("API_TOKEN")}
)
.build()
)
# Tools from both transports available simultaneously
Advanced Configuration
Timeout Configuration
# STDIO timeouts
stdio_team = (MCPTeamBuilder("stdio_team")
.with_mcp_server(
command="python",
args=["server.py"],
timeout=10 # Server startup timeout
)
.build()
)
# SSE timeouts
sse_team = (MCPTeamBuilder("sse_team")
.with_mcp_server_sse(
url="http://localhost:8000/sse",
timeout=30, # Initial connection timeout
sse_read_timeout=60 # Event read timeout
)
.build()
)
# Adaptive timeouts based on operation
class AdaptiveTimeoutTeam:
"""Team with adaptive timeout configuration."""
def __init__(self):
self.fast_team = (MCPTeamBuilder("fast")
.with_llm(llm)
.with_mcp_server_sse(
url="https://api.example.com/sse",
timeout=10,
sse_read_timeout=20
)
.build()
)
self.slow_team = (MCPTeamBuilder("slow")
.with_llm(llm)
.with_mcp_server_sse(
url="https://api.example.com/sse",
timeout=60,
sse_read_timeout=120
)
.build()
)
def execute(self, task: str, expected_duration: str):
"""Select team based on expected duration."""
if expected_duration == "fast":
return self.fast_team({"messages": [HumanMessage(content=task)]})
else:
return self.slow_team({"messages": [HumanMessage(content=task)]})
Connection Pooling
# SSE connection pool
from contextlib import asynccontextmanager
class SSEConnectionPool:
    """Pool of reusable SSE connections.

    Connections are pre-created by initialize(); get_connection() lends one
    out (creating a fresh one if the pool is empty) and returns it to the
    pool when the context exits.
    """

    def __init__(self, url: str, pool_size: int = 10):
        self.url = url
        self.pool_size = pool_size
        self.connections = []

    async def initialize(self):
        """Pre-create pool_size connections."""
        for _ in range(self.pool_size):
            self.connections.append(await self.create_connection())

    async def create_connection(self):
        """Create a new SSE connection (placeholder in this example)."""
        pass

    @asynccontextmanager
    async def get_connection(self):
        """Borrow a connection; it is returned to the pool on exit."""
        conn = self.connections.pop() if self.connections else await self.create_connection()
        try:
            yield conn
        finally:
            self.connections.append(conn)
# Usage
pool = SSEConnectionPool("https://api.example.com/sse", pool_size=10)
await pool.initialize()
async with pool.get_connection() as conn:
# Use connection
pass
Troubleshooting
STDIO Issues
Problem: Server process doesn't start
# Check server script is executable
import os
os.chmod("server.py", 0o755)
# Use absolute path
import os
server_path = os.path.abspath("server.py")
team = (MCPTeamBuilder("team")
.with_mcp_server("python", [server_path])
.build()
)
# Check Python interpreter
team = (MCPTeamBuilder("team")
.with_mcp_server(
command="/usr/bin/python3", # Explicit interpreter
args=["server.py"]
)
.build()
)
Problem: Server crashes silently
# Capture stderr for debugging
import subprocess
import sys
process = subprocess.Popen(
["python", "server.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Monitor stderr
stderr_output = process.stderr.read()
print(f"Server error: {stderr_output.decode()}")
SSE Issues
Problem: Connection timeout
# Increase timeouts
team = (MCPTeamBuilder("team")
.with_mcp_server_sse(
url="http://slow-server.com/sse",
timeout=90, # Longer connection timeout
sse_read_timeout=180 # Longer read timeout
)
.build()
)
# Add retry logic
def connect_with_retry(url, max_retries=3):
    """Build an SSE team, retrying with exponential backoff on failure.

    Re-raises the last exception once all attempts are exhausted.
    """
    for attempt in range(max_retries):
        try:
            return (MCPTeamBuilder(f"team_attempt_{attempt}")
                .with_llm(llm)
                .with_mcp_server_sse(url=url)
                .build()
            )
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)
Problem: Authentication fails
# Debug authentication
import requests
# Test endpoint directly
response = requests.get(
"https://api.example.com/sse",
headers={"Authorization": f"Bearer {token}"},
stream=True
)
print(f"Status: {response.status_code}")
print(f"Headers: {response.headers}")
# Verify token format
assert token.startswith("Bearer "), "Token must start with 'Bearer '"
# Check token expiry (PyJWT 2.x removed the `verify` kwarg; use `options`)
import jwt
decoded = jwt.decode(token.split()[1], options={"verify_signature": False})
print(f"Token expires: {decoded.get('exp')}")
Migration Guide
STDIO to SSE Migration
Step 1: Deploy SSE server
# Convert STDIO server to SSE
# Before (STDIO):
# server.py with stdio_server()
# After (SSE):
# sse_server.py with FastAPI
from fastapi import FastAPI
from mcp.server import Server
app = FastAPI()
mcp_server = Server("migrated-server")
# ... (same tool definitions) ...
@app.get("/sse")
async def sse_endpoint():
return StreamingResponse(...)
Step 2: Update client configuration
# Before (STDIO)
team_old = (MCPTeamBuilder("team")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.build()
)
# After (SSE)
team_new = (MCPTeamBuilder("team")
.with_llm(llm)
.with_mcp_server_sse("http://localhost:8000/sse")
.build()
)
Step 3: Gradual rollout
# Support both during transition
USE_SSE = os.getenv("USE_SSE_TRANSPORT", "false") == "true"
if USE_SSE:
team = (MCPTeamBuilder("team")
.with_mcp_server_sse("http://localhost:8000/sse")
.build()
)
else:
team = (MCPTeamBuilder("team")
.with_mcp_server("python", ["server.py"])
.build()
)
Summary
Choose STDIO when:
- Developing locally
- Building single-user applications
- Need minimum latency
- Want simple deployment
Choose SSE when:
- Building production services
- Need scalability
- Require remote access
- Want centralized management
Use Both when:
- Local operations need low latency
- Remote services required for some features
- Hybrid architecture (local + cloud)