
MCP Transport

Configure transport layers for Model Context Protocol communication.

Transport types define how the MCP client communicates with MCP servers. az-core supports two transport mechanisms: STDIO (Standard Input/Output) and SSE (Server-Sent Events). This guide compares the two and shows how to configure each.


Transport Overview

MCP (Model Context Protocol) supports multiple transport mechanisms for communication between clients and servers. The choice of transport affects:

  • Performance: Latency, throughput, overhead
  • Deployment: Complexity, scalability, infrastructure
  • Security: Authentication, encryption, isolation
  • Reliability: Error handling, connection management

Architecture Diagram

Client (MCPTeamBuilder)
         |
         v
   Transport Layer
         |
    +----+----+
    |         |
  STDIO      SSE
    |         |
    v         v
Local Process  HTTP Server
(IPC)         (Network)
    |         |
    v         v
MCP Server   MCP Server

STDIO Transport

STDIO (Standard Input/Output) transport uses inter-process communication (IPC) to connect to MCP servers running as local processes.

How STDIO Works

  1. Process Spawning: Client spawns MCP server as child process
  2. IPC Channel: Communication via stdin/stdout pipes
  3. JSON-RPC: Messages serialized as JSON-RPC 2.0 (see the sketch after this list)
  4. Synchronous: Blocking read/write operations
  5. Process Lifecycle: Server lifecycle managed by client
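The JSON-RPC framing in step 3 can be seen with a few lines of Python: spawn the server, write one request to stdin, and read one response from stdout. This is a minimal sketch assuming a hypothetical server.py that speaks newline-delimited JSON-RPC; a real MCP session begins with an initialize exchange, omitted here.

import json
import subprocess

# Spawn the server and exchange one JSON-RPC 2.0 message over pipes
proc = subprocess.Popen(
    ["python", "server.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True
)

request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
proc.stdin.write(json.dumps(request) + "\n")
proc.stdin.flush()

# One response line, e.g. {"jsonrpc": "2.0", "id": 1, "result": ...}
response = json.loads(proc.stdout.readline())
print(response)
proc.terminate()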

Architecture

MCPTeamBuilder
      |
      v
subprocess.Popen()
      |
      v
+------------------+
|  Child Process   |
|  (MCP Server)    |
|                  |
|  stdin  <---->   |
|  stdout <---->   |
|  stderr <---->   |
+------------------+

Configuration

from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI

team = (MCPTeamBuilder("stdio_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server(
        command="python",           # Executable command
        args=["server.py"],         # Command arguments
        env={                       # Environment variables
            "API_KEY": "secret",
            "LOG_LEVEL": "INFO"
        },
        timeout=10                  # Connection timeout (seconds)
    )
    .build()
)

Internal Configuration

# Internal server configuration for STDIO
{
    "command": "python",
    "args": ["server.py", "--port", "8080"],
    "env": {
        "API_KEY": "secret",
        "HOME": "/home/user"
    },
    "transport": "stdio",
    "timeout": 10
}

Server Implementation

# Example STDIO MCP Server
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio

app = Server("file-operations-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools."""
    return [
        Tool(
            name="read_file",
            description="Read contents of a file",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "File path to read"
                    }
                },
                "required": ["path"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool."""
    if name == "read_file":
        path = arguments["path"]
        try:
            with open(path, 'r') as f:
                content = f.read()
            return [TextContent(type="text", text=content)]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    """Run STDIO server."""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Advanced STDIO Configuration

# Multiple STDIO servers
team = (MCPTeamBuilder("multi_stdio_team")
    .with_llm(llm)

    # File operations server
    .with_mcp_server(
        command="python",
        args=["servers/file_ops.py"],
        env={"WORKSPACE": "/data"},
        timeout=10
    )

    # Database server
    .with_mcp_server(
        command="python",
        args=["servers/database.py"],
        env={
            "DB_URL": "postgresql://localhost/mydb",
            "DB_POOL_SIZE": "10"
        },
        timeout=15  # Longer timeout for DB connection
    )

    # Python interpreter server
    .with_mcp_server(
        command="python",
        args=["servers/python_executor.py"],
        env={"PYTHON_PATH": "/usr/bin/python3"},
        timeout=5
    )

    .build()
)

STDIO with Custom Interpreters

# Node.js MCP server
team_node = (MCPTeamBuilder("node_team")
    .with_llm(llm)
    .with_mcp_server(
        command="node",
        args=["server.js"],
        env={"NODE_ENV": "production"}
    )
    .build()
)

# Compiled binary server
team_binary = (MCPTeamBuilder("binary_team")
    .with_llm(llm)
    .with_mcp_server(
        command="./mcp_server",
        args=["--config", "config.json"],
        env={"LD_LIBRARY_PATH": "/usr/local/lib"}
    )
    .build()
)

# Docker containerized server
team_docker = (MCPTeamBuilder("docker_team")
    .with_llm(llm)
    .with_mcp_server(
        command="docker",
        args=[
            "run",
            "--rm",
            "-i",
            "mcp-server:latest"
        ],
        timeout=30  # Longer timeout for container startup
    )
    .build()
)

STDIO Process Management

import subprocess
import time

class MCPServerManager:
    """Manage STDIO MCP server lifecycle."""

    def __init__(self, command: str, args: list):
        self.command = command
        self.args = args
        self.process = None

    def start(self):
        """Start the MCP server process."""
        self.process = subprocess.Popen(
            [self.command] + self.args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        print(f"Started MCP server: PID {self.process.pid}")

    def stop(self, timeout: int = 5):
        """Stop the MCP server gracefully."""
        if self.process:
            # Send SIGTERM for graceful shutdown
            self.process.terminate()

            try:
                self.process.wait(timeout=timeout)
                print("Server stopped gracefully")
            except subprocess.TimeoutExpired:
                # Force kill if timeout
                self.process.kill()
                print("Server force killed after timeout")

    def restart(self):
        """Restart the MCP server."""
        self.stop()
        time.sleep(1)
        self.start()

    def is_alive(self) -> bool:
        """Check if server is running."""
        return self.process is not None and self.process.poll() is None

# Usage
manager = MCPServerManager("python", ["server.py"])
manager.start()

# ... use the server ...

manager.stop()
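To guarantee cleanup, the manager can be wrapped in a context manager so stop() runs even if the code using the server raises:

from contextlib import contextmanager

@contextmanager
def managed_server(command: str, args: list):
    """Start an MCPServerManager and always stop it on exit."""
    manager = MCPServerManager(command, args)
    manager.start()
    try:
        yield manager
    finally:
        manager.stop()

# Usage
with managed_server("python", ["server.py"]) as server:
    assert server.is_alive()
    # ... use the server ...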

STDIO Advantages

  1. Low Latency: Direct IPC communication
  2. Simple Setup: No network configuration required
  3. Process Isolation: Each server runs in an isolated process
  4. Resource Control: Easy to limit CPU/memory per process
  5. No Network Overhead: No HTTP headers, connection pooling, etc.
  6. Built-in Logging: stderr available for logs

STDIO Disadvantages

  1. Not Scalable: Limited to single machine
  2. No Load Balancing: Cannot distribute across servers
  3. Process Management: Must handle process lifecycle
  4. Platform Dependent: Process spawning differs by OS
  5. No Remote Access: Cannot connect from other machines
  6. Limited Parallelism: One process per connection

Best Use Cases for STDIO

  • Development: Fast iteration and debugging
  • Single-User Applications: Desktop apps, CLI tools
  • Local Processing: File operations, local databases
  • Low-Latency Requirements: Real-time processing
  • Simple Deployment: No server infrastructure needed
  • Resource-Constrained: Minimal network overhead

SSE Transport

SSE (Server-Sent Events) transport uses HTTP for communication with remote MCP servers.

How SSE Works

  1. HTTP Connection: Client connects to server via HTTP
  2. Event Stream: Server sends events over a persistent connection (see the sketch after this list)
  3. JSON-RPC: Messages serialized as JSON-RPC 2.0
  4. Asynchronous: Non-blocking event handling
  5. Server Lifecycle: Server runs independently of client
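The event stream in step 2 is ordinary HTTP: the client keeps one GET request open and parses "data:" frames as they arrive. A minimal sketch with requests (the URL is a placeholder, and this shows the wire-level protocol, not az-core's internal client):

import json
import requests

response = requests.get(
    "http://localhost:8000/sse",
    headers={"Accept": "text/event-stream"},
    stream=True
)

# Each SSE frame arrives as a "data: <json>" line followed by a blank line
for line in response.iter_lines(decode_unicode=True):
    if line and line.startswith("data: "):
        event = json.loads(line[len("data: "):])
        print(event)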

Architecture

MCPTeamBuilder
      |
      v
HTTP Client
      |
      v (HTTPS)
+------------------+
|   HTTP Server    |
|                  |
|  SSE Endpoint    |
|  /mcp/sse        |
|                  |
|  MCP Server      |
+------------------+

Configuration

from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI

team = (MCPTeamBuilder("sse_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server_sse(
        url="http://localhost:8000/sse",   # Server URL
        env={                               # Auth/config headers
            "AUTH_TOKEN": "Bearer secret",
            "API_VERSION": "v1"
        },
        timeout=30,                         # Connection timeout
        sse_read_timeout=60                 # Read timeout
    )
    .build()
)

Internal Configuration

# Internal server configuration for SSE
{
    "url": "http://localhost:8000/sse",
    "transport": "sse",
    "env": {
        "AUTH_TOKEN": "Bearer secret",
        "API_VERSION": "v1"
    },
    "timeout": 30,
    "sse_read_timeout": 60
}

Server Implementation

# Example SSE MCP Server with FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from mcp.server import Server
from mcp.types import Tool, TextContent
import asyncio
import json

app = FastAPI()
mcp_server = Server("api-server")

@mcp_server.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools."""
    return [
        Tool(
            name="api_call",
            description="Make API call to external service",
            inputSchema={
                "type": "object",
                "properties": {
                    "endpoint": {"type": "string"},
                    "method": {"type": "string"}
                },
                "required": ["endpoint", "method"]
            }
        )
    ]

@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool."""
    if name == "api_call":
        # Implement API call logic (make_api_call is your own helper, not shown)
        result = await make_api_call(
            arguments["endpoint"],
            arguments["method"]
        )
        return [TextContent(type="text", text=str(result))]

    raise ValueError(f"Unknown tool: {name}")

async def event_generator(request: Request):
    """Generate SSE events."""
    while True:
        if await request.is_disconnected():
            break

        # Send events to client
        event_data = {
            "type": "tool_response",
            "data": "..."
        }

        yield f"data: {json.dumps(event_data)}\n\n"
        await asyncio.sleep(0.1)

@app.get("/sse")
async def sse_endpoint(request: Request):
    """SSE endpoint for MCP communication."""
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Running the SSE Server

# Install dependencies
pip install fastapi uvicorn mcp

# Run the server
python sse_server.py

# Or with uvicorn directly
uvicorn sse_server:app --host 0.0.0.0 --port 8000 --reload
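Before pointing a client at the server, it can help to probe the /health endpoint defined above. A quick check, assuming the server runs on localhost:8000:

import requests

# Fail fast if the SSE server is not reachable or unhealthy
response = requests.get("http://localhost:8000/health", timeout=5)
response.raise_for_status()
assert response.json()["status"] == "healthy"
print("SSE server is up")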

Advanced SSE Configuration

# Multiple SSE servers
team = (MCPTeamBuilder("multi_sse_team")
    .with_llm(llm)

    # Production API server
    .with_mcp_server_sse(
        url="https://api.production.com/mcp/sse",
        env={
            "AUTH_TOKEN": os.getenv("PROD_API_TOKEN"),
            "TENANT_ID": "tenant-123"
        },
        timeout=45,
        sse_read_timeout=90
    )

    # Analytics server
    .with_mcp_server_sse(
        url="https://analytics.example.com/sse",
        env={
            "API_KEY": os.getenv("ANALYTICS_API_KEY")
        },
        timeout=30,
        sse_read_timeout=60
    )

    # External service
    .with_mcp_server_sse(
        url="https://external-service.com/mcp",
        env={
            "CLIENT_ID": os.getenv("CLIENT_ID"),
            "CLIENT_SECRET": os.getenv("CLIENT_SECRET")
        },
        timeout=60,
        sse_read_timeout=120
    )

    .build()
)

SSE with Authentication

# Bearer token authentication
team = (MCPTeamBuilder("auth_team")
    .with_llm(llm)
    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={
            "AUTH_TOKEN": f"Bearer {os.getenv('API_TOKEN')}"
        }
    )
    .build()
)

# API key authentication
team = (MCPTeamBuilder("api_key_team")
    .with_llm(llm)
    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={
            "X-API-KEY": os.getenv('API_KEY'),
            "X-API-SECRET": os.getenv('API_SECRET')
        }
    )
    .build()
)

# OAuth2 authentication
import requests

def get_oauth_token():
    """Get OAuth2 token."""
    response = requests.post(
        "https://auth.example.com/token",
        data={
            "grant_type": "client_credentials",
            "client_id": os.getenv("CLIENT_ID"),
            "client_secret": os.getenv("CLIENT_SECRET")
        }
    )
    return response.json()["access_token"]

team = (MCPTeamBuilder("oauth_team")
    .with_llm(llm)
    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={
            "AUTH_TOKEN": f"Bearer {get_oauth_token()}"
        },
        timeout=60
    )
    .build()
)

SSE Connection Management

import aiohttp
import asyncio
import json
from typing import AsyncIterator

class SSEConnectionManager:
    """Manage SSE connections with reconnection logic."""

    def __init__(self, url: str, auth_token: str):
        self.url = url
        self.auth_token = auth_token
        self.session = None
        self.response = None
        self.connected = False

    async def connect(self):
        """Establish SSE connection."""
        self.session = aiohttp.ClientSession()

        headers = {
            "Authorization": f"Bearer {self.auth_token}",
            "Accept": "text/event-stream"
        }

        try:
            self.response = await self.session.get(
                self.url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            )

            if self.response.status == 200:
                self.connected = True
                print(f"Connected to {self.url}")
            else:
                raise Exception(f"Connection failed: {self.response.status}")

        except Exception as e:
            print(f"Connection error: {e}")
            await self.disconnect()
            raise

    async def disconnect(self):
        """Close SSE connection."""
        self.connected = False
        if self.session:
            await self.session.close()
            print(f"Disconnected from {self.url}")

    async def read_events(self) -> AsyncIterator[dict]:
        """Read events from SSE stream."""
        async for line in self.response.content:
            line = line.decode('utf-8').strip()

            if line.startswith('data: '):
                data = line[6:]  # Remove 'data: ' prefix
                yield json.loads(data)

    async def with_reconnection(self, max_retries: int = 3):
        """Connect with automatic reconnection."""
        retries = 0

        while retries < max_retries:
            try:
                await self.connect()
                return
            except Exception as e:
                retries += 1
                wait_time = 2 ** retries  # Exponential backoff
                print(f"Attempt failed ({e}); retry {retries}/{max_retries} in {wait_time}s...")
                await asyncio.sleep(wait_time)

        raise Exception(f"Failed to connect after {max_retries} retries")

# Usage
async def main():
    manager = SSEConnectionManager(
        url="https://api.example.com/sse",
        auth_token="your-token"
    )

    await manager.with_reconnection()

    async for event in manager.read_events():
        print(f"Received event: {event}")

    await manager.disconnect()

asyncio.run(main())

SSE Load Balancing

# Multiple SSE servers for load balancing
from azcore.agents import MCPTeamBuilder
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

class LoadBalancedSSETeam:
    """MCP team with load-balanced SSE servers."""

    def __init__(self, server_urls: list[str]):
        self.teams = []

        for i, url in enumerate(server_urls):
            team = (MCPTeamBuilder(f"team_{i}")
                .with_llm(ChatOpenAI(model="gpt-4o-mini"))
                .with_mcp_server_sse(url=url)
                .build()
            )
            self.teams.append(team)

        self.current_index = 0

    def get_next_team(self):
        """Round-robin team selection."""
        team = self.teams[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.teams)
        return team

    def execute(self, task: str):
        """Execute task with load balancing."""
        team = self.get_next_team()
        return team({"messages": [HumanMessage(content=task)]})

# Usage
load_balanced_team = LoadBalancedSSETeam([
    "https://server1.example.com/sse",
    "https://server2.example.com/sse",
    "https://server3.example.com/sse"
])

# Requests distributed across servers
result1 = load_balanced_team.execute("Task 1")  # -> server1
result2 = load_balanced_team.execute("Task 2")  # -> server2
result3 = load_balanced_team.execute("Task 3")  # -> server3
result4 = load_balanced_team.execute("Task 4")  # -> server1

SSE Advantages

  1. Scalable: Can serve multiple clients
  2. Remote Access: Connect from anywhere
  3. Load Balancing: Distribute across multiple servers
  4. Centralized Management: Single server for all clients
  5. Independent Lifecycle: Server runs independently
  6. Standard Protocol: HTTP/HTTPS compatibility

SSE Disadvantages

  1. Higher Latency: Network overhead
  2. Complex Setup: Requires web server infrastructure
  3. Network Dependencies: Requires stable network
  4. Authentication Overhead: Token management
  5. Connection Limits: HTTP connection pooling limits
  6. More Resources: Network and server resources

Best Use Cases for SSE

  • Production Systems: Multi-user, scalable applications
  • Microservices: Distributed architecture
  • Cloud Deployment: Containerized services
  • Remote Access: Cross-network communication
  • High Availability: Load balancing and failover
  • Multi-Tenant: Serve multiple customers

Comparison Matrix

| Feature          | STDIO                    | SSE                        |
|------------------|--------------------------|----------------------------|
| Latency          | Very low (< 1 ms IPC)    | Higher (10-100 ms network) |
| Throughput       | High (direct memory)     | Medium (network bandwidth) |
| Scalability      | Low (single machine)     | High (distributed)         |
| Deployment       | Simple (single process)  | Complex (web server)       |
| Security         | Process isolation        | HTTPS + auth               |
| Remote access    | No                       | Yes                        |
| Load balancing   | No                       | Yes                        |
| Failover         | No                       | Yes                        |
| Resource usage   | Low                      | Medium-high                |
| Setup complexity | Low                      | High                       |
| Debugging        | Easy (local logs)        | Medium (distributed logs)  |
| Production ready | Single-user only         | Multi-user                 |
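If you prefer the matrix as executable logic, a toy decision helper might look like this (the flags are illustrative, not an az-core API):

def choose_transport(remote_access: bool, multi_user: bool) -> str:
    """Toy decision rule distilled from the matrix above."""
    if remote_access or multi_user:
        return "sse"    # scalability and remote access require HTTP
    return "stdio"      # otherwise direct IPC wins on latency and simplicity

print(choose_transport(remote_access=False, multi_user=False))  # stdio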

Performance Comparison

import time
from azcore.agents import MCPTeamBuilder
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# STDIO team
stdio_team = (MCPTeamBuilder("stdio_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server("python", ["server.py"])
    .build()
)

# SSE team
sse_team = (MCPTeamBuilder("sse_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server_sse("http://localhost:8000/sse")
    .build()
)

# Benchmark
task = "List files in current directory"

# STDIO timing
start = time.time()
stdio_result = stdio_team({"messages": [HumanMessage(content=task)]})
stdio_time = time.time() - start

# SSE timing
start = time.time()
sse_result = sse_team({"messages": [HumanMessage(content=task)]})
sse_time = time.time() - start

print(f"STDIO: {stdio_time:.3f}s")
print(f"SSE:   {sse_time:.3f}s")
print(f"Difference: {(sse_time - stdio_time):.3f}s")

# Typical results:
# STDIO: 0.012s
# SSE:   0.045s
# Difference: 0.033s (SSE ~3-4x slower for simple operations)

Performance Considerations

STDIO Performance Optimization

# 1. Reuse processes instead of spawning new ones
class PooledSTDIOTeam:
    """Pool of reusable STDIO servers."""

    def __init__(self, pool_size: int = 5):
        self.pool = [
            (MCPTeamBuilder(f"pool_{i}")
                .with_llm(llm)
                .with_mcp_server("python", ["server.py"])
                .build())
            for i in range(pool_size)
        ]
        self.current = 0

    def execute(self, task: str):
        team = self.pool[self.current]
        self.current = (self.current + 1) % len(self.pool)
        return team({"messages": [HumanMessage(content=task)]})

# 2. Use compiled binaries instead of interpreted languages
team = (MCPTeamBuilder("fast_team")
    .with_mcp_server(
        command="./mcp_server_binary",  # Compiled Go/Rust binary
        args=["--fast-mode"]
    )
    .build()
)

# 3. Batch operations when possible
def batch_execute(team, tasks: list[str]):
    """Execute multiple tasks in one go."""
    combined_task = "; ".join(tasks)
    return team({"messages": [HumanMessage(content=combined_task)]})

SSE Performance Optimization

# 1. Connection pooling
# (create the ClientSession inside a running event loop; aiohttp expects this)
from aiohttp import ClientSession, TCPConnector

connector = TCPConnector(
    limit=100,              # Max connections
    limit_per_host=10,      # Max per host
    keepalive_timeout=30    # Keep-alive duration
)

session = ClientSession(connector=connector)

# 2. HTTP/2 for multiplexing
team = (MCPTeamBuilder("http2_team")
    .with_mcp_server_sse(
        url="https://api.example.com/sse",  # HTTP/2 enabled
        timeout=30
    )
    .build()
)

# 3. Regional deployment
teams_by_region = {
    "us-east": MCPTeamBuilder("team_us_east")
        .with_mcp_server_sse("https://us-east.example.com/sse")
        .build(),

    "eu-west": MCPTeamBuilder("team_eu_west")
        .with_mcp_server_sse("https://eu-west.example.com/sse")
        .build(),

    "ap-south": MCPTeamBuilder("team_ap_south")
        .with_mcp_server_sse("https://ap-south.example.com/sse")
        .build()
}

def get_team_for_user(user_region: str):
    """Get nearest team for user."""
    return teams_by_region.get(user_region, teams_by_region["us-east"])

Security Considerations

STDIO Security

# 1. Limit server capabilities
team = (MCPTeamBuilder("secure_stdio")
    .with_mcp_server(
        command="python",
        args=["server.py"],
        env={
            "ALLOWED_PATHS": "/safe/directory",  # Restrict file access
            "READ_ONLY": "true",                 # Prevent modifications
            "MAX_FILE_SIZE": "10485760"          # 10MB limit
        }
    )
    .build()
)
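Environment variables like ALLOWED_PATHS only restrict anything if the server enforces them. A hypothetical check inside the server's read_file tool might look like:

import os

# Hypothetical enforcement inside the server process
ALLOWED_ROOT = os.path.realpath(os.environ.get("ALLOWED_PATHS", "/safe/directory"))

def check_path(path: str) -> str:
    """Reject paths that resolve outside the allowed directory."""
    resolved = os.path.realpath(path)
    if not resolved.startswith(ALLOWED_ROOT + os.sep):
        raise PermissionError(f"Access outside {ALLOWED_ROOT} denied")
    return resolved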

# 2. Run in sandboxed environment
team = (MCPTeamBuilder("sandboxed_team")
    .with_mcp_server(
        command="firejail",  # Sandbox tool
        args=[
            "--private",
            "--net=none",
            "python", "server.py"
        ]
    )
    .build()
)

# 3. Resource limits
import resource

def limit_resources():
    """Set resource limits for server process."""
    # Max 1GB memory
    resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024, 1024*1024*1024))

    # Max 100 open files
    resource.setrlimit(resource.RLIMIT_NOFILE, (100, 100))

    # Max 60 seconds CPU time
    resource.setrlimit(resource.RLIMIT_CPU, (60, 60))

# Limits set in the parent are inherited by children, but also constrain
# the parent itself; to restrict only the server process, pass
# limit_resources as preexec_fn to subprocess.Popen (POSIX only)
limit_resources()
team = builder.build()

SSE Security

# 1. HTTPS only in production
team = (MCPTeamBuilder("secure_sse")
    .with_mcp_server_sse(
        url="https://api.example.com/sse",  # HTTPS, not HTTP
        env={
            "AUTH_TOKEN": f"Bearer {os.getenv('API_TOKEN')}"
        }
    )
    .build()
)

# 2. Token rotation
import time
import jwt

class TokenRotatingTeam:
    """Team with automatic token rotation."""

    def __init__(self, url: str, secret: str):
        self.url = url
        self.secret = secret
        self.token_expiry = 0

    def get_fresh_token(self) -> str:
        """Get or refresh JWT token."""
        if time.time() >= self.token_expiry:
            # Generate new token
            payload = {
                "exp": time.time() + 3600,  # 1 hour expiry
                "iat": time.time()
            }
            self.current_token = jwt.encode(payload, self.secret, algorithm="HS256")
            self.token_expiry = payload["exp"]

        return self.current_token

    def build_team(self):
        """Build team with fresh token."""
        return (MCPTeamBuilder("rotating_team")
            .with_llm(llm)
            .with_mcp_server_sse(
                url=self.url,
                env={"AUTH_TOKEN": f"Bearer {self.get_fresh_token()}"}
            )
            .build()
        )

# 3. Rate limiting
from functools import wraps
import time

def rate_limit(max_calls: int, time_window: int):
    """Decorator for rate limiting."""
    calls = []

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()

            # Remove old calls outside time window
            calls[:] = [c for c in calls if c > now - time_window]

            if len(calls) >= max_calls:
                raise Exception(f"Rate limit exceeded: {max_calls} calls per {time_window}s")

            calls.append(now)
            return func(*args, **kwargs)

        return wrapper
    return decorator

@rate_limit(max_calls=100, time_window=60)
def execute_task(team, task: str):
    """Execute with rate limiting."""
    return team({"messages": [HumanMessage(content=task)]})

# 4. IP whitelisting (server-side)
# In your SSE server:
import ipaddress

from fastapi import HTTPException

ALLOWED_IPS = ["192.168.1.0/24", "10.0.0.0/8"]

def is_ip_allowed(ip: str, allowed: list[str]) -> bool:
    """True if the address falls inside any allowed network."""
    addr = ipaddress.ip_address(ip)
    return any(addr in ipaddress.ip_network(net) for net in allowed)

@app.get("/sse")
async def sse_endpoint(request: Request):
    client_ip = request.client.host

    if not is_ip_allowed(client_ip, ALLOWED_IPS):
        raise HTTPException(status_code=403, detail="IP not allowed")

    return StreamingResponse(...)

Use Case Selection

When to Use STDIO

Development & Testing:

# Quick iteration during development
dev_team = (MCPTeamBuilder("dev_team")
    .with_llm(llm)
    .with_mcp_server("python", ["dev_server.py"])
    .build()
)

Desktop Applications:

# Single-user desktop app
desktop_app = (MCPTeamBuilder("desktop_app")
    .with_llm(llm)
    .with_mcp_server("python", ["local_tools.py"])
    .with_prompt("You are a desktop assistant.")
    .build()
)

CLI Tools:

# Command-line tool
cli_tool = (MCPTeamBuilder("cli_tool")
    .with_llm(llm)
    .with_mcp_server("python", ["cli_server.py"])
    .build()
)

# Usage: python cli.py "analyze this file"

When to Use SSE

Production Web Services:

# Multi-user web application
web_service = (MCPTeamBuilder("web_service")
    .with_llm(llm)
    .with_mcp_server_sse(
        url="https://api.production.com/sse",
        env={"AUTH_TOKEN": os.getenv("API_TOKEN")}
    )
    .build()
)

Microservices Architecture:

# Service mesh with multiple MCP services
microservices_team = (MCPTeamBuilder("microservices")
    .with_llm(llm)
    .with_mcp_server_sse("https://user-service.internal/sse")
    .with_mcp_server_sse("https://data-service.internal/sse")
    .with_mcp_server_sse("https://analytics-service.internal/sse")
    .build()
)

Cloud-Native Applications:

# Kubernetes-deployed MCP servers
k8s_team = (MCPTeamBuilder("k8s_team")
    .with_llm(llm)
    .with_mcp_server_sse(
        url="https://mcp.cluster.local/sse",
        env={"SERVICE_ACCOUNT_TOKEN": os.getenv("K8S_TOKEN")}
    )
    .build()
)

Hybrid Approach

Combine both transports for optimal performance:

# Local tools via STDIO, remote APIs via SSE
hybrid_team = (MCPTeamBuilder("hybrid_team")
    .with_llm(llm)

    # Fast local operations via STDIO
    .with_mcp_server(
        command="python",
        args=["local_tools.py"]
    )

    # Remote services via SSE
    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={"AUTH_TOKEN": os.getenv("API_TOKEN")}
    )

    .build()
)

# Tools from both transports available simultaneously

Advanced Configuration

Timeout Configuration

# STDIO timeouts
stdio_team = (MCPTeamBuilder("stdio_team")
    .with_mcp_server(
        command="python",
        args=["server.py"],
        timeout=10  # Server startup timeout
    )
    .build()
)

# SSE timeouts
sse_team = (MCPTeamBuilder("sse_team")
    .with_mcp_server_sse(
        url="http://localhost:8000/sse",
        timeout=30,              # Initial connection timeout
        sse_read_timeout=60      # Event read timeout
    )
    .build()
)

# Adaptive timeouts based on operation
class AdaptiveTimeoutTeam:
    """Team with adaptive timeout configuration."""

    def __init__(self):
        self.fast_team = (MCPTeamBuilder("fast")
            .with_llm(llm)
            .with_mcp_server_sse(
                url="https://api.example.com/sse",
                timeout=10,
                sse_read_timeout=20
            )
            .build()
        )

        self.slow_team = (MCPTeamBuilder("slow")
            .with_llm(llm)
            .with_mcp_server_sse(
                url="https://api.example.com/sse",
                timeout=60,
                sse_read_timeout=120
            )
            .build()
        )

    def execute(self, task: str, expected_duration: str):
        """Select team based on expected duration."""
        if expected_duration == "fast":
            return self.fast_team({"messages": [HumanMessage(content=task)]})
        else:
            return self.slow_team({"messages": [HumanMessage(content=task)]})

Connection Pooling

# SSE connection pool
from contextlib import asynccontextmanager

class SSEConnectionPool:
    """Pool of reusable SSE connections."""

    def __init__(self, url: str, pool_size: int = 10):
        self.url = url
        self.pool_size = pool_size
        self.connections = []

    async def initialize(self):
        """Initialize connection pool."""
        for i in range(self.pool_size):
            connection = await self.create_connection()
            self.connections.append(connection)

    async def create_connection(self):
        """Create new SSE connection (placeholder for your client logic)."""
        # Connection creation logic
        pass

    @asynccontextmanager
    async def get_connection(self):
        """Get connection from pool."""
        if not self.connections:
            connection = await self.create_connection()
        else:
            connection = self.connections.pop()

        try:
            yield connection
        finally:
            self.connections.append(connection)

# Usage (run inside an async function)
pool = SSEConnectionPool("https://api.example.com/sse", pool_size=10)
await pool.initialize()

async with pool.get_connection() as conn:
    # Use connection
    pass

Troubleshooting

STDIO Issues

Problem: Server process doesn't start

# Check server script is executable
import os
os.chmod("server.py", 0o755)

# Use absolute path
server_path = os.path.abspath("server.py")
team = (MCPTeamBuilder("team")
    .with_mcp_server("python", [server_path])
    .build()
)

# Check Python interpreter
team = (MCPTeamBuilder("team")
    .with_mcp_server(
        command="/usr/bin/python3",  # Explicit interpreter
        args=["server.py"]
    )
    .build()
)

Problem: Server crashes silently

# Capture stderr for debugging
import subprocess
import sys

process = subprocess.Popen(
    ["python", "server.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)

# Read stderr (blocks until the process exits or closes the stream)
stderr_output = process.stderr.read()
print(f"Server error: {stderr_output.decode()}")

SSE Issues

Problem: Connection timeout

# Increase timeouts
team = (MCPTeamBuilder("team")
    .with_mcp_server_sse(
        url="http://slow-server.com/sse",
        timeout=90,              # Longer connection timeout
        sse_read_timeout=180     # Longer read timeout
    )
    .build()
)

# Add retry logic
def connect_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            team = (MCPTeamBuilder(f"team_attempt_{attempt}")
                .with_llm(llm)
                .with_mcp_server_sse(url=url)
                .build()
            )
            return team
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise

Problem: Authentication fails

# Debug authentication
import requests

# Test endpoint directly
response = requests.get(
    "https://api.example.com/sse",
    headers={"Authorization": f"Bearer {token}"},
    stream=True
)

print(f"Status: {response.status_code}")
print(f"Headers: {response.headers}")

# Verify token format
assert token.startswith("Bearer "), "Token must start with 'Bearer '"

# Check token expiry (PyJWT 2.x: disable signature verification explicitly)
import jwt
decoded = jwt.decode(token.split()[1], options={"verify_signature": False})
print(f"Token expires: {decoded.get('exp')}")

Migration Guide

STDIO to SSE Migration

Step 1: Deploy SSE server

# Convert STDIO server to SSE
# Before (STDIO):
# server.py with stdio_server()

# After (SSE):
# sse_server.py with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from mcp.server import Server

app = FastAPI()
mcp_server = Server("migrated-server")

# ... (same tool definitions) ...

@app.get("/sse")
async def sse_endpoint():
    return StreamingResponse(...)

Step 2: Update client configuration

# Before (STDIO)
team_old = (MCPTeamBuilder("team")
    .with_llm(llm)
    .with_mcp_server("python", ["server.py"])
    .build()
)

# After (SSE)
team_new = (MCPTeamBuilder("team")
    .with_llm(llm)
    .with_mcp_server_sse("http://localhost:8000/sse")
    .build()
)

Step 3: Gradual rollout

# Support both during transition
USE_SSE = os.getenv("USE_SSE_TRANSPORT", "false") == "true"

if USE_SSE:
    team = (MCPTeamBuilder("team")
        .with_mcp_server_sse("http://localhost:8000/sse")
        .build()
    )
else:
    team = (MCPTeamBuilder("team")
        .with_mcp_server("python", ["server.py"])
        .build()
    )

Summary

Choose STDIO when:

  • Developing locally
  • Building single-user applications
  • Need minimum latency
  • Want simple deployment

Choose SSE when:

  • Building production services
  • Need scalability
  • Require remote access
  • Want centralized management

Use Both when:

  • Local operations need low latency
  • Remote services required for some features
  • Hybrid architecture (local + cloud)