Custom MCP Servers

Complete guide to building, deploying, and integrating custom Model Context Protocol (MCP) servers with Azcore.


Overview

Custom MCP servers allow you to expose your own tools, data sources, and services to AI agents through the Model Context Protocol. This enables seamless integration of proprietary systems, internal APIs, and specialized functionality.

What are Custom Servers?

Custom MCP servers are:

  • User-Created: Built for specific business needs
  • Flexible: Implement any functionality you need
  • Standards-Compliant: Follow MCP protocol specifications
  • Reusable: Can be shared across projects and teams
  • Maintainable: Under your control and ownership

Why Build Custom Servers?

# ❌ Without Custom Server - Direct tool implementation
@tool
def query_internal_crm(customer_id: str) -> str:
    """Query internal CRM system"""
    # Tight coupling with agent code
    # Hard to reuse across projects
    # Difficult to version and maintain
    pass

# ✅ With Custom MCP Server - Clean separation
# Server handles all CRM logic
# Agent just connects to server
team = (MCPTeamBuilder("crm_team")
    .with_llm(llm)
    .with_mcp_server("python", ["servers/crm_server.py"])
    .build()
)

Benefits:

  1. Separation of Concerns: Tool logic separated from agent logic
  2. Reusability: One server, many agents
  3. Versioning: Independent server versioning
  4. Security: Centralized access control
  5. Scalability: Can run on separate infrastructure
  6. Testing: Test tools independently

When to Build Custom Servers

Use Cases for Custom Servers

1. Internal Systems Integration

# Custom server for internal CRM system
"""
crm_server.py - Exposes internal CRM capabilities
- Customer lookup
- Order history
- Contact management
- Sales pipeline access
"""

When to build:

  • Need to integrate proprietary systems
  • Internal APIs not publicly available
  • Business-specific data models
  • Custom authentication requirements

2. Specialized Domain Logic

# Custom server for financial calculations
"""
finance_server.py - Financial analysis tools
- Portfolio optimization
- Risk assessment
- Regulatory compliance checks
- Custom trading strategies
"""

When to build:

  • Domain-specific algorithms
  • Specialized calculations
  • Industry-specific workflows
  • Proprietary business logic

3. Data Pipeline Integration

# Custom server for data processing
"""
data_pipeline_server.py - Data processing tools
- ETL operations
- Data validation
- Schema transformations
- Quality checks
"""

When to build:

  • Custom data formats
  • Proprietary data sources
  • Specialized transformations
  • Internal data lakes

4. Third-Party Service Wrappers

# Custom server for legacy API
"""
legacy_api_server.py - Wrapper for old API
- Translate modern requests to legacy format
- Handle authentication quirks
- Rate limiting and retry logic
- Response normalization
"""

When to build:

  • Legacy systems without modern APIs
  • APIs with complex authentication
  • Need custom retry/error handling
  • Response transformation required

When NOT to Build Custom Servers

Use official servers when:

  • An official server already exists (GitHub, PostgreSQL, etc.)
  • Standard functionality is sufficient
  • No custom business logic is needed
  • Quick prototyping is the priority

Server Architecture

MCP Server Components

┌──────────────────────────────────────────┐
│         MCP Server Architecture          │
│                                          │
│  ┌────────────────────────────────────┐  │
│  │      Server Instance (app)         │  │
│  │  - Name and metadata               │  │
│  │  - Server lifecycle management     │  │
│  └────────────────────────────────────┘  │
│                    │                     │
│       ┌────────────┼────────────┐        │
│       ▼            ▼            ▼        │
│     Tools      Resources     Prompts     │
│  - Functions  - Data access - Templates  │
│  - Actions    - Files/DBs   - Examples   │
│       └────────────┬────────────┘        │
│                    ▼                     │
│  ┌────────────────────────────────────┐  │
│  │      Transport Layer               │  │
│  │  - STDIO (local IPC)               │  │
│  │  - SSE (HTTP streaming)            │  │
│  └────────────────────────────────────┘  │
└──────────────────────────────────────────┘

Core Components

1. Server Instance

from mcp.server import Server

# Create server with name
app = Server("my-custom-server")

Responsibilities:

  • Server initialization
  • Capability registration
  • Request routing
  • Lifecycle management

2. Tools

from mcp.types import Tool

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Register available tools"""
    return [...]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    """Execute tool by name"""
    pass

Responsibilities:

  • Define available functions
  • Validate input parameters
  • Execute operations
  • Return results

3. Resources

from mcp.types import Resource

@app.list_resources()
async def list_resources() -> list[Resource]:
    """Register available resources"""
    return [...]

@app.read_resource()
async def read_resource(uri: str):
    """Read resource by URI"""
    pass

Responsibilities:

  • Expose data sources
  • Provide content access
  • Handle different data types

4. Transport

# STDIO transport
from mcp.server.stdio import stdio_server

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

# SSE transport (FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

api = FastAPI()

@api.get("/sse")
async def sse_endpoint():
    return StreamingResponse(...)

Building Your First Server

Step 1: Setup Project

# Create project directory
mkdir my-mcp-server
cd my-mcp-server

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install mcp

Step 2: Create Basic Server

Create calculator_server.py:

"""
Simple calculator MCP server.
Demonstrates basic tool implementation.
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create server
app = Server("calculator-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """
    Register available calculator tools.
    Called when client connects to discover capabilities.
    """
    return [
        Tool(
            name="add",
            description="Add two numbers together",
            inputSchema={
                "type": "object",
                "properties": {
                    "a": {
                        "type": "number",
                        "description": "First number"
                    },
                    "b": {
                        "type": "number",
                        "description": "Second number"
                    }
                },
                "required": ["a", "b"]
            }
        ),
        Tool(
            name="multiply",
            description="Multiply two numbers",
            inputSchema={
                "type": "object",
                "properties": {
                    "a": {
                        "type": "number",
                        "description": "First number"
                    },
                    "b": {
                        "type": "number",
                        "description": "Second number"
                    }
                },
                "required": ["a", "b"]
            }
        ),
        Tool(
            name="power",
            description="Raise a number to a power",
            inputSchema={
                "type": "object",
                "properties": {
                    "base": {
                        "type": "number",
                        "description": "Base number"
                    },
                    "exponent": {
                        "type": "number",
                        "description": "Exponent"
                    }
                },
                "required": ["base", "exponent"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """
    Execute tool by name.

    Args:
        name: Tool name to execute
        arguments: Tool arguments as dict

    Returns:
        List of TextContent with results
    """
    logger.info(f"Executing tool: {name} with args: {arguments}")

    try:
        if name == "add":
            a = arguments["a"]
            b = arguments["b"]
            result = a + b
            return [TextContent(
                type="text",
                text=f"Result: {a} + {b} = {result}"
            )]

        elif name == "multiply":
            a = arguments["a"]
            b = arguments["b"]
            result = a * b
            return [TextContent(
                type="text",
                text=f"Result: {a} × {b} = {result}"
            )]

        elif name == "power":
            base = arguments["base"]
            exponent = arguments["exponent"]
            result = base ** exponent
            return [TextContent(
                type="text",
                text=f"Result: {base}^{exponent} = {result}"
            )]

        else:
            raise ValueError(f"Unknown tool: {name}")

    except Exception as e:
        logger.error(f"Tool execution failed: {e}")
        return [TextContent(
            type="text",
            text=f"Error: {str(e)}"
        )]

async def main():
    """Run the MCP server with STDIO transport."""
    logger.info("Starting calculator MCP server...")

    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Step 3: Test the Server

Create test_calculator_client.py:

"""
Test client for calculator server.
"""
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import os

# Setup LLM
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    api_key=os.getenv("OPENAI_API_KEY")
)

# Build team with calculator server
calculator_team = (MCPTeamBuilder("calculator_team")
    .with_llm(llm)
    .with_mcp_server(
        command="python",
        args=["calculator_server.py"],
        timeout=10
    )
    .with_prompt("You are a calculator assistant. Use the available math tools to help users.")
    .build()
)

# Check available tools
print("Available tools:", calculator_team.get_mcp_tool_names())

# Test calculations
test_queries = [
    "What is 25 + 17?",
    "Calculate 8 times 12",
    "What is 2 to the power of 10?",
    "Add 100 and 200, then multiply the result by 3"
]

for query in test_queries:
    print(f"\nQuery: {query}")
    result = calculator_team({
        "messages": [HumanMessage(content=query)]
    })
    print(f"Answer: {result['messages'][-1].content}")

Step 4: Run and Test

# Run test client
python test_calculator_client.py

# Expected output:
# Available tools: ['add', 'multiply', 'power']
# Query: What is 25 + 17?
# Answer: The result of 25 + 17 is 42.
# ...

Tool Implementation

Tool Definition Best Practices

1. Clear Descriptions

# ❌ Vague description
Tool(
    name="process",
    description="Process data",
    inputSchema={...}
)

# ✅ Clear description
Tool(
    name="process_csv",
    description="Process CSV file: validate format, clean data, and compute statistics. Returns summary of rows processed and any errors found.",
    inputSchema={...}
)

2. Detailed Input Schemas

Tool(
    name="search_products",
    description="Search product catalog with filters",
    inputSchema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query (product name, SKU, or description)",
                "minLength": 1
            },
            "category": {
                "type": "string",
                "description": "Product category (e.g., 'electronics', 'clothing')",
                "enum": ["electronics", "clothing", "food", "books"]
            },
            "min_price": {
                "type": "number",
                "description": "Minimum price in USD",
                "minimum": 0
            },
            "max_price": {
                "type": "number",
                "description": "Maximum price in USD",
                "minimum": 0
            },
            "in_stock": {
                "type": "boolean",
                "description": "Filter to only in-stock items",
                "default": False
            },
            "limit": {
                "type": "integer",
                "description": "Maximum number of results to return",
                "minimum": 1,
                "maximum": 100,
                "default": 10
            }
        },
        "required": ["query"]
    }
)
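
A schema like the one above is only advisory unless the server also checks incoming arguments against it. The sketch below hand-rolls a small subset of JSON Schema (`required`, `type`, `enum`, `minimum`, `maximum`, `minLength`) with the standard library. `validate_arguments` and `SEARCH_SCHEMA` are hypothetical names, not MCP SDK functions, and a dedicated validation library would likely be the more robust choice in practice.

```python
from typing import Any

# Map JSON Schema type names to Python types (subset only).
_TYPES: dict[str, tuple[type, ...]] = {
    "string": (str,),
    "number": (int, float),
    "integer": (int,),
    "boolean": (bool,),
}


def validate_arguments(schema: dict[str, Any], arguments: dict[str, Any]) -> list[str]:
    """Return a list of human-readable errors; an empty list means valid."""
    errors: list[str] = []
    properties = schema.get("properties", {})

    for key in schema.get("required", []):
        if key not in arguments:
            errors.append(f"Missing required argument: {key}")

    for key, value in arguments.items():
        rules = properties.get(key)
        if rules is None:
            continue  # unknown arguments are ignored in this sketch
        expected = _TYPES.get(rules.get("type", ""), ())
        # bool is a subclass of int in Python, so reject it for numeric types.
        bool_as_number = isinstance(value, bool) and rules.get("type") != "boolean"
        if expected and (not isinstance(value, expected) or bool_as_number):
            errors.append(f"{key}: expected {rules['type']}")
            continue
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{key}: must be one of {rules['enum']}")
        if isinstance(value, (int, float)):
            if "minimum" in rules and value < rules["minimum"]:
                errors.append(f"{key}: must be >= {rules['minimum']}")
            if "maximum" in rules and value > rules["maximum"]:
                errors.append(f"{key}: must be <= {rules['maximum']}")
        if isinstance(value, str) and len(value) < rules.get("minLength", 0):
            errors.append(f"{key}: must have at least {rules['minLength']} characters")
    return errors


# Trimmed copy of the search_products schema, for demonstration only.
SEARCH_SCHEMA = {
    "type": "object",
    "properties": {
        "query": {"type": "string", "minLength": 1},
        "category": {"type": "string", "enum": ["electronics", "clothing", "food", "books"]},
        "min_price": {"type": "number", "minimum": 0},
        "limit": {"type": "integer", "minimum": 1, "maximum": 100},
    },
    "required": ["query"],
}
```

Calling `validate_arguments(SEARCH_SCHEMA, arguments)` at the top of a tool handler and returning the joined errors as text keeps the per-tool code free of repeated checks.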

3. Input Validation

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool with comprehensive validation."""

    if name == "search_products":
        # Extract and validate arguments
        query = arguments.get("query", "").strip()
        if not query:
            return [TextContent(
                type="text",
                text="Error: Query cannot be empty"
            )]

        min_price = arguments.get("min_price", 0)
        max_price = arguments.get("max_price", float('inf'))

        # Validate price range
        if min_price < 0:
            return [TextContent(
                type="text",
                text="Error: min_price must be non-negative"
            )]

        if max_price < min_price:
            return [TextContent(
                type="text",
                text=f"Error: max_price ({max_price}) cannot be less than min_price ({min_price})"
            )]

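        # Execute search
        try:
            # Forward the remaining filters without repeating the keyword
            # arguments that are already passed explicitly; unpacking the
            # full `arguments` dict here would raise a TypeError.
            extra_filters = {
                k: v for k, v in arguments.items()
                if k not in ("query", "min_price", "max_price")
            }
            results = search_product_catalog(
                query=query,
                min_price=min_price,
                max_price=max_price,
                **extra_filters
            )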

            return [TextContent(
                type="text",
                text=format_search_results(results)
            )]

        except Exception as e:
            logger.error(f"Search failed: {e}")
            return [TextContent(
                type="text",
                text=f"Search error: {str(e)}"
            )]

4. Error Handling

async def safe_tool_execution(tool_func, name: str, arguments: dict) -> list[TextContent]:
    """Wrapper for safe tool execution with comprehensive error handling."""

    try:
        # Execute tool
        result = await tool_func(name, arguments)
        return result

    except KeyError as e:
        # Missing required argument
        logger.error(f"Missing argument: {e}")
        return [TextContent(
            type="text",
            text=f"Error: Missing required argument: {e}"
        )]

    except ValueError as e:
        # Invalid argument value
        logger.error(f"Invalid argument: {e}")
        return [TextContent(
            type="text",
            text=f"Error: Invalid argument value: {e}"
        )]

    except PermissionError as e:
        # Access denied
        logger.error(f"Permission denied: {e}")
        return [TextContent(
            type="text",
            text=f"Error: Permission denied: {e}"
        )]

    except TimeoutError as e:
        # Operation timeout
        logger.error(f"Timeout: {e}")
        return [TextContent(
            type="text",
            text=f"Error: Operation timed out: {e}"
        )]

    except Exception as e:
        # Unexpected error
        logger.exception(f"Unexpected error: {e}")
        return [TextContent(
            type="text",
            text=f"Error: Unexpected error occurred: {str(e)}"
        )]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Call tool with error handling."""
    return await safe_tool_execution(execute_tool, name, arguments)

5. Response Formatting

import json

def format_response(data: dict | list, format_type: str = "text") -> list[TextContent]:
    """Format response based on type."""

    if format_type == "json":
        # JSON format
        return [TextContent(
            type="text",
            text=json.dumps(data, indent=2)
        )]

    if format_type == "table" and isinstance(data, list) and data:
        # Table format: build a Markdown table from a non-empty list of dicts
        headers = list(data[0].keys())
        table = "| " + " | ".join(headers) + " |\n"
        table += "| " + " | ".join(["---"] * len(headers)) + " |\n"

        for row in data:
            table += "| " + " | ".join(str(row.get(h, "")) for h in headers) + " |\n"

        return [TextContent(type="text", text=table)]

    if format_type == "summary" and isinstance(data, dict):
        # Summary format
        summary = f"Results: {len(data)} items\n\n"
        summary += "\n".join(f"- {k}: {v}" for k, v in data.items())
        return [TextContent(type="text", text=summary)]

    # Default text format, also used when the data does not fit the
    # requested format (previously the function could return None here)
    return [TextContent(type="text", text=str(data))]

Resource Management

Resources provide access to data sources like files, databases, or APIs.

Basic Resource Implementation

from mcp.types import Resource

@app.list_resources()
async def list_resources() -> list[Resource]:
    """List available resources."""
    return [
        Resource(
            uri="file:///config/settings.json",
            name="Application Settings",
            description="Current application configuration",
            mimeType="application/json"
        ),
        Resource(
            uri="db://users/all",
            name="User Database",
            description="All registered users",
            mimeType="application/json"
        )
    ]

@app.read_resource()
async def read_resource(uri: str) -> str:
    """Read resource by URI."""
    # Imported once for both branches; importing inside only the first
    # branch would leave `json` unbound in the second.
    import json

    # Normalize to a plain string in case the SDK passes a URL object.
    uri = str(uri)

    if uri == "file:///config/settings.json":
        # Read configuration file
        with open("/path/to/settings.json", "r") as f:
            config = json.load(f)
        return json.dumps(config, indent=2)

    elif uri == "db://users/all":
        # Query database
        users = get_all_users_from_db()
        return json.dumps(users, indent=2)

    else:
        raise ValueError(f"Unknown resource URI: {uri}")

Dynamic Resources

@app.list_resources()
async def list_resources() -> list[Resource]:
    """List resources dynamically based on current state."""
    resources = []

    # List all available database tables
    tables = get_database_tables()
    for table in tables:
        resources.append(Resource(
            uri=f"db://tables/{table}",
            name=f"Table: {table}",
            description=f"Access to {table} table",
            mimeType="application/json"
        ))

    # List recent log files
    log_files = get_recent_logs()
    for log_file in log_files:
        resources.append(Resource(
            uri=f"file:///logs/{log_file}",
            name=f"Log: {log_file}",
            description=f"Application log from {log_file}",
            mimeType="text/plain"
        ))

    return resources

@app.read_resource()
async def read_resource(uri: str) -> str:
    """Read resource with pattern matching."""

    # Parse URI
    if uri.startswith("db://tables/"):
        table_name = uri.split("/")[-1]
        return read_database_table(table_name)

    elif uri.startswith("file:///logs/"):
        log_file = uri.split("/")[-1]
        return read_log_file(log_file)

    else:
        raise ValueError(f"Unknown resource URI: {uri}")
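
Taking the last path segment with `split("/")` accepts whatever string a client sends, so a table name from the URI would reach the database layer unchecked. A slightly more defensive sketch parses the URI with `urllib.parse` and checks the name against an allow-list. `ALLOWED_TABLES` and `parse_table_uri` are hypothetical names, and the `db://tables/<name>` layout is taken from the example above.

```python
from urllib.parse import urlparse

# Hypothetical allow-list; in practice it might come from get_database_tables().
ALLOWED_TABLES = {"users", "orders"}


def parse_table_uri(uri: str) -> str:
    """Extract and validate the table name from a db://tables/<name> URI."""
    parsed = urlparse(uri)
    if parsed.scheme != "db" or parsed.netloc != "tables":
        raise ValueError(f"Unknown resource URI: {uri}")
    table = parsed.path.lstrip("/")
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Unknown table: {table!r}")
    return table
```

With this check in place, a URI such as `db://tables/users/../secrets` is rejected because the whole remaining path, not just its last segment, must match an allowed name.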

Prompt Templates

Prompt templates allow servers to provide reusable prompt patterns.

Basic Prompt Template

from mcp.types import (
    GetPromptResult,
    Prompt,
    PromptArgument,
    PromptMessage,
    TextContent,
)

@app.list_prompts()
async def list_prompts() -> list[Prompt]:
    """List available prompt templates."""
    return [
        Prompt(
            name="code_review",
            description="Review code for quality, security, and best practices",
            arguments=[
                PromptArgument(
                    name="code",
                    description="Code to review",
                    required=True
                ),
                PromptArgument(
                    name="language",
                    description="Programming language",
                    required=True
                ),
                PromptArgument(
                    name="focus",
                    description="Review focus (security, performance, style)",
                    required=False
                )
            ]
        )
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict | None) -> GetPromptResult:
    """Get prompt by name with arguments."""
    arguments = arguments or {}

    if name == "code_review":
        code = arguments["code"]
        language = arguments["language"]
        focus = arguments.get("focus", "general")

        # Build the Markdown code fence at runtime so that this listing
        # contains no literal triple backticks.
        fence = "`" * 3

        prompt_text = f"""Review the following {language} code.

Focus: {focus}

Code:
{fence}{language}
{code}
{fence}

Provide:
1. Code quality assessment
2. Security vulnerabilities
3. Performance issues
4. Best practice recommendations
5. Suggested improvements"""

        # Message content is a TextContent item rather than a bare string.
        return GetPromptResult(
            description="Code review prompt",
            messages=[PromptMessage(
                role="user",
                content=TextContent(type="text", text=prompt_text)
            )]
        )

    raise ValueError(f"Unknown prompt: {name}")
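
Indexing `arguments["code"]` directly raises a bare `KeyError` when a client omits a required argument. The sketch below checks required arguments up front and reports every missing name at once. `REQUIRED_ARGS` and `check_prompt_arguments` are illustrative names, not part of the MCP SDK.

```python
from typing import Optional

# Hypothetical table of required argument names per prompt.
REQUIRED_ARGS: dict[str, list[str]] = {
    "code_review": ["code", "language"],
}


def check_prompt_arguments(name: str, arguments: Optional[dict]) -> dict:
    """Validate prompt arguments and return them as a dict."""
    if name not in REQUIRED_ARGS:
        raise ValueError(f"Unknown prompt: {name}")
    arguments = arguments or {}
    missing = [arg for arg in REQUIRED_ARGS[name] if arg not in arguments]
    if missing:
        raise ValueError(
            f"Missing required arguments for {name}: {', '.join(missing)}"
        )
    return arguments
```

A prompt handler could call `check_prompt_arguments(name, arguments)` as its first statement and then read the values without further guards.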


Advanced Features

1. Async Operations

import asyncio
import aiohttp

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool with async operations."""

    if name == "fetch_urls":
        urls = arguments["urls"]

        async def fetch_one(session, url):
            """Fetch single URL."""
            async with session.get(url) as response:
                return await response.text()

        # Fetch all URLs concurrently
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_one(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # Format results
        output = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                output.append(f"❌ {url}: {str(result)}")
            else:
                output.append(f"✅ {url}: {len(result)} bytes")

        return [TextContent(
            type="text",
            text="\n".join(output)
        )]

2. Streaming Responses

from mcp.types import TextContent

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Tool with streaming-like progress updates."""

    if name == "process_large_file":
        file_path = arguments["file_path"]

        # Process in chunks and report progress
        results = []
        total_lines = count_lines(file_path)

        with open(file_path, 'r') as f:
            for i, line in enumerate(f, 1):
                # Process line
                process_line(line)

                # Report progress every 1000 lines
                if i % 1000 == 0:
                    progress = (i / total_lines) * 100
                    results.append(TextContent(
                        type="text",
                        text=f"Progress: {progress:.1f}% ({i}/{total_lines} lines)"
                    ))

        results.append(TextContent(
            type="text",
            text=f"✅ Complete: Processed {total_lines} lines"
        ))

        return results

3. State Management

import time

from mcp.server import Server
from mcp.types import TextContent

class StatefulServer:
    """MCP server with state management."""

    def __init__(self):
        self.app = Server("stateful-server")
        self.state = {
            "sessions": {},
            "cache": {},
            "counters": {}
        }
        self._setup_tools()

    def _setup_tools(self):
        """Setup tools with state access."""

        @self.app.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[TextContent]:
            if name == "create_session":
                session_id = arguments["session_id"]
                self.state["sessions"][session_id] = {
                    "created": time.time(),
                    "data": {}
                }
                return [TextContent(
                    type="text",
                    text=f"Session {session_id} created"
                )]

            elif name == "store_data":
                session_id = arguments["session_id"]
                key = arguments["key"]
                value = arguments["value"]

                if session_id not in self.state["sessions"]:
                    return [TextContent(
                        type="text",
                        text=f"Error: Session {session_id} not found"
                    )]

                self.state["sessions"][session_id]["data"][key] = value
                return [TextContent(
                    type="text",
                    text=f"Stored {key} in session {session_id}"
                )]

            elif name == "get_data":
                session_id = arguments["session_id"]
                key = arguments["key"]

                session = self.state["sessions"].get(session_id)
                if not session:
                    return [TextContent(
                        type="text",
                        text=f"Error: Session {session_id} not found"
                    )]

                value = session["data"].get(key, "Not found")
                return [TextContent(
                    type="text",
                    text=f"{key}: {value}"
                )]
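
Sessions stored this way are never removed, so the `sessions` dict grows for as long as the process runs. A periodic cleanup pass is one option. The sketch below assumes the same `{"created": ..., "data": ...}` session layout as the class above; `expire_sessions` is a hypothetical helper.

```python
import time
from typing import Optional


def expire_sessions(sessions: dict, max_age_s: float, now: Optional[float] = None) -> list:
    """Remove sessions older than max_age_s seconds and return their IDs."""
    now = time.time() if now is None else now
    expired = [
        session_id
        for session_id, session in sessions.items()
        if now - session["created"] > max_age_s
    ]
    for session_id in expired:
        del sessions[session_id]
    return expired
```

The `now` parameter exists only to make the function easy to test with fixed timestamps; a server would normally call `expire_sessions(self.state["sessions"], max_age_s=3600)`.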

4. Caching

import hashlib
import logging
import time

from mcp.server import Server
from mcp.types import TextContent

logger = logging.getLogger(__name__)

class CachingServer:
    """MCP server with caching."""

    def __init__(self):
        self.app = Server("caching-server")
        self.cache = {}
        self._setup_tools()

    def _cache_key(self, name: str, arguments: dict) -> str:
        """Generate cache key from tool call."""
        key_data = f"{name}:{sorted(arguments.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    async def cached_call(
        self,
        name: str,
        arguments: dict,
        ttl: int = 300
    ) -> list[TextContent]:
        """Execute tool with caching."""

        # Check cache
        cache_key = self._cache_key(name, arguments)

        if cache_key in self.cache:
            cached_result, cached_time = self.cache[cache_key]

            # Check if cache is still valid
            if time.time() - cached_time < ttl:
                logger.info(f"Cache hit: {name}")
                return cached_result

        # Execute tool
        logger.info(f"Cache miss: {name}")
        result = await self.execute_tool(name, arguments)

        # Store in cache
        self.cache[cache_key] = (result, time.time())

        return result
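
The class above relies on `_setup_tools` and `execute_tool`, which are not shown. As a self-contained illustration of the same time-to-live idea, the sketch below caches the result of any async callable. `TTLCache`, `slow_lookup`, and `demo` are hypothetical names. Building the key from `json.dumps(..., sort_keys=True)` is a design choice of this sketch, made so that nested arguments produce a stable key; it assumes the arguments are JSON-serializable.

```python
import asyncio
import hashlib
import json
import time
from typing import Any, Awaitable, Callable


class TTLCache:
    """Cache results of async tool calls for a fixed number of seconds."""

    def __init__(self, ttl_s: float = 300.0) -> None:
        self.ttl_s = ttl_s
        self._entries: dict[str, tuple[Any, float]] = {}

    @staticmethod
    def make_key(name: str, arguments: dict) -> str:
        # sort_keys gives the same key regardless of argument order.
        payload = json.dumps({"tool": name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    async def call(
        self,
        func: Callable[[str, dict], Awaitable[Any]],
        name: str,
        arguments: dict,
    ) -> Any:
        key = self.make_key(name, arguments)
        entry = self._entries.get(key)
        if entry is not None and time.monotonic() - entry[1] < self.ttl_s:
            return entry[0]  # cache hit
        result = await func(name, arguments)
        self._entries[key] = (result, time.monotonic())
        return result


CALLS = {"count": 0}


async def slow_lookup(name: str, arguments: dict) -> str:
    """Stand-in for an expensive tool; counts how often it really runs."""
    CALLS["count"] += 1
    return f"{name}:{arguments['id']}"


async def demo() -> tuple:
    """Call the same tool twice; the second call is served from the cache."""
    cache = TTLCache(ttl_s=60)
    first = await cache.call(slow_lookup, "lookup", {"id": 1})
    second = await cache.call(slow_lookup, "lookup", {"id": 1})
    return first, second
```

`time.monotonic()` is used instead of `time.time()` so that wall-clock adjustments cannot extend or shorten an entry's lifetime.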

Testing and Debugging

Unit Testing

"""
test_calculator_server.py - Unit tests for calculator server
"""
import pytest

# Import the handler functions directly. `app.list_tools()` and
# `app.call_tool()` are decorator factories used for registration, so they
# cannot be awaited as if they were the handlers. This assumes the SDK
# decorators return the undecorated function.
# @pytest.mark.asyncio requires the pytest-asyncio plugin.
from calculator_server import call_tool, list_tools

@pytest.mark.asyncio
async def test_list_tools():
    """Test tool listing."""
    tools = await list_tools()

    assert len(tools) == 3
    tool_names = [t.name for t in tools]
    assert "add" in tool_names
    assert "multiply" in tool_names
    assert "power" in tool_names

@pytest.mark.asyncio
async def test_add_tool():
    """Test addition tool."""
    result = await call_tool("add", {"a": 5, "b": 3})

    assert len(result) == 1
    assert "8" in result[0].text

@pytest.mark.asyncio
async def test_multiply_tool():
    """Test multiplication tool."""
    result = await call_tool("multiply", {"a": 4, "b": 7})

    assert len(result) == 1
    assert "28" in result[0].text

@pytest.mark.asyncio
async def test_power_tool():
    """Test power tool."""
    result = await call_tool("power", {"base": 2, "exponent": 10})

    assert len(result) == 1
    assert "1024" in result[0].text

@pytest.mark.asyncio
async def test_invalid_tool():
    """Test invalid tool name."""
    result = await call_tool("invalid_tool", {})

    assert len(result) == 1
    assert "Error" in result[0].text or "Unknown" in result[0].text

@pytest.mark.asyncio
async def test_missing_arguments():
    """Test missing required arguments."""
    result = await call_tool("add", {"a": 5})  # Missing 'b'

    assert len(result) == 1
    assert "Error" in result[0].text

# Run tests
# pytest test_calculator_server.py -v

Integration Testing

"""
test_calculator_integration.py - Integration tests
"""
import pytest
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import os

@pytest.fixture
def calculator_team():
    """Create calculator team for testing."""
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0,
        api_key=os.getenv("OPENAI_API_KEY")
    )

    team = (MCPTeamBuilder("test_calculator")
        .with_llm(llm)
        .with_mcp_server("python", ["calculator_server.py"])
        .with_prompt("You are a calculator assistant.")
        .build()
    )

    return team

def test_team_creation(calculator_team):
    """Test team is created successfully."""
    assert calculator_team is not None
    tools = calculator_team.get_mcp_tool_names()
    assert len(tools) > 0

def test_simple_addition(calculator_team):
    """Test simple addition."""
    result = calculator_team({
        "messages": [HumanMessage(content="What is 5 + 3?")]
    })

    response = result["messages"][-1].content
    assert "8" in response

def test_multiplication(calculator_team):
    """Test multiplication."""
    result = calculator_team({
        "messages": [HumanMessage(content="Calculate 12 times 8")]
    })

    response = result["messages"][-1].content
    assert "96" in response

def test_complex_calculation(calculator_team):
    """Test complex calculation."""
    result = calculator_team({
        "messages": [HumanMessage(content="Add 10 and 20, then multiply by 2")]
    })

    response = result["messages"][-1].content
    assert "60" in response

Debugging Tools

"""
debug_server.py - Server with debugging capabilities
"""
import logging
import time
from functools import wraps

# Configure detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mcp_server_debug.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def debug_tool(func):
    """Decorator to debug tool execution."""
    @wraps(func)
    async def wrapper(name: str, arguments: dict):
        logger.debug(f"=== Tool Call Start ===")
        logger.debug(f"Tool: {name}")
        logger.debug(f"Arguments: {arguments}")

        start_time = time.time()

        try:
            result = await func(name, arguments)
            elapsed = time.time() - start_time

            logger.debug(f"Result: {result}")
            logger.debug(f"Execution time: {elapsed:.3f}s")
            logger.debug(f"=== Tool Call Success ===")

            return result

        except Exception as e:
            elapsed = time.time() - start_time

            logger.error(f"Error: {str(e)}")
            logger.error(f"Execution time: {elapsed:.3f}s")
            logger.error(f"=== Tool Call Failed ===")
            logger.exception("Full traceback:")

            raise

    return wrapper

# Apply to tool handler
@app.call_tool()
@debug_tool
async def call_tool(name: str, arguments: dict):
    """Tool execution with debugging."""
    # Implementation here
    pass

Deployment

STDIO Deployment (Local)

"""
Production STDIO server with proper error handling.
"""
import asyncio
import logging
import signal
import sys
from mcp.server import Server
from mcp.server.stdio import stdio_server

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/mcp_server.log'),
        logging.StreamHandler()  # Writes to stderr; stdout is reserved for MCP messages
    ]
)
logger = logging.getLogger(__name__)

app = Server("production-server")

# Setup tools
@app.list_tools()
async def list_tools():
    # Tool definitions
    pass

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    # Tool implementations
    pass

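async def shutdown(sig, loop):
    """Graceful shutdown handler."""
    # The parameter is named `sig` so it does not shadow the signal module
    logger.info(f"Received exit signal {sig.name}...")

    # Cancel every task except this one; main() unwinds via CancelledError
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    logger.info(f"Cancelling {len(tasks)} outstanding tasks")
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    # No loop.stop() here: asyncio.run() owns the loop and closes it
    # once main() has exited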

async def main():
    """Run server with graceful shutdown."""
    loop = asyncio.get_running_loop()

    # Setup signal handlers
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown(s, loop))
        )

    logger.info("Starting MCP server...")

    try:
        async with stdio_server() as (read_stream, write_stream):
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options()
            )
    except Exception as e:
        logger.error(f"Server error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, asyncio.CancelledError):
        # CancelledError surfaces when the signal handler cancels main()
        logger.info("Server stopped")

SSE Deployment (Remote)

"""
Production SSE server with FastAPI.
"""
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from mcp.server import Server
import asyncio
import logging
import json

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastAPI app
api = FastAPI(title="MCP Server API")

# Create MCP server
mcp_server = Server("sse-production-server")

# Setup MCP tools
@mcp_server.list_tools()
async def list_tools():
    # Tool definitions
    pass

@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict):
    # Tool implementations
    pass

# Health check endpoint
@api.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "server": "mcp-sse-server",
        "version": "1.0.0"
    }

# SSE endpoint
@api.get("/sse")
async def sse_endpoint(request: Request):
    """SSE endpoint for MCP communication."""

    # Authentication (if needed)
    auth_token = request.headers.get("Authorization")
    if not auth_token or not verify_token(auth_token):
        raise HTTPException(status_code=401, detail="Unauthorized")

    async def event_generator():
        """Generate SSE events."""
        try:
            while True:
                # Check if client disconnected
                if await request.is_disconnected():
                    logger.info("Client disconnected")
                    break

                # Handle MCP messages
                # This is simplified - actual implementation would
                # involve proper MCP message handling

                yield f"data: {json.dumps({'type': 'ping'})}\n\n"
                await asyncio.sleep(1)

        except Exception as e:
            logger.error(f"SSE error: {e}")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

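def verify_token(token: str) -> bool:
    """Verify authentication token."""
    # Placeholder check against a shared secret read from the
    # MCP_AUTH_TOKEN environment variable (an assumed name). Replace it
    # with your real scheme, such as JWT validation.
    import hmac
    import os

    expected = os.getenv("MCP_AUTH_TOKEN")
    if not expected:
        return False  # Fail closed when no secret is configured

    presented = token.removeprefix("Bearer ")
    # Constant-time comparison avoids leaking the secret through timing
    return hmac.compare_digest(presented.encode(), expected.encode())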

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        api,
        host="0.0.0.0",
        port=8000,
        log_level="info",
        access_log=True
    )
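
The `yield` in `event_generator` relies on the Server-Sent Events wire format: each event is one or more `field: value` lines terminated by a blank line. A small helper makes that explicit and also handles multi-line payloads. `format_sse_event` is a hypothetical name for this sketch, not part of FastAPI or the MCP SDK.

```python
import json
from typing import Any, Optional


def format_sse_event(data: Any, event: Optional[str] = None) -> str:
    """Serialize a payload as one SSE frame (hypothetical helper)."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload needs its own "data:" prefix.
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

Inside the generator, `yield format_sse_event({'type': 'ping'})` would then produce the same frame as the hand-written f-string.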

Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy server code
COPY server.py .
COPY config.py .

# Create log directory
RUN mkdir -p /var/log

# Expose port for SSE (if using SSE transport)
EXPOSE 8000

# Run server
CMD ["python", "server.py"]

# docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    container_name: mcp-server
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=INFO
      - DATABASE_URL=${DATABASE_URL}
      - API_KEY=${API_KEY}
    volumes:
      - ./logs:/var/log
      - ./config:/app/config
    restart: unless-stopped
    healthcheck:
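      # python:3.11-slim does not ship curl, so probe with the interpreter instead
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]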
      interval: 30s
      timeout: 10s
      retries: 3

# Build and run
docker-compose up -d

# View logs
docker-compose logs -f mcp-server

# Stop
docker-compose down

Security Best Practices

1. Input Validation

from typing import Any
import re

class InputValidator:
    """Validate and sanitize tool inputs."""

    @staticmethod
    def validate_string(value: str, max_length: int = 1000) -> str:
        """Validate string input."""
        if not isinstance(value, str):
            raise ValueError("Input must be string")

        if len(value) > max_length:
            raise ValueError(f"String too long (max {max_length})")

        return value.strip()

    @staticmethod
    def validate_integer(value: Any, min_val: int | None = None, max_val: int | None = None) -> int:
        """Validate integer input."""
        try:
            val = int(value)
        except (ValueError, TypeError) as exc:
            raise ValueError("Input must be integer") from exc

        if min_val is not None and val < min_val:
            raise ValueError(f"Value must be >= {min_val}")

        if max_val is not None and val > max_val:
            raise ValueError(f"Value must be <= {max_val}")

        return val

    @staticmethod
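    def validate_path(path: str, allowed_dirs: list[str]) -> str:
        """Validate file path."""
        import os

        # Resolve symlinks and ".." segments before checking containment
        real_path = os.path.realpath(path)

        # Compare whole path components; a plain startswith() check would
        # accept "/data_private/x" for an allowed directory of "/data"
        allowed = False
        for d in allowed_dirs:
            real_dir = os.path.realpath(d)
            if os.path.commonpath([real_path, real_dir]) == real_dir:
                allowed = True
                break

        if not allowed:
            raise ValueError("Path not in allowed directories")

        return real_path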

    @staticmethod
    def validate_sql_safe(query: str) -> str:
        """Basic SQL injection prevention."""
        # This is simplified - use parameterized queries instead
        dangerous_keywords = [
            'DROP', 'DELETE', 'TRUNCATE', 'ALTER',
            'EXEC', 'EXECUTE', '--', ';', '/*'
        ]

        query_upper = query.upper()
        for keyword in dangerous_keywords:
            if keyword in query_upper:
                raise ValueError(f"Dangerous SQL keyword: {keyword}")

        return query

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Tool with input validation."""
    validator = InputValidator()

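    if name == "read_file":
        try:
            # Validate file path (raises ValueError if outside allowed_dirs)
            file_path = validator.validate_path(
                arguments["path"],
                allowed_dirs=["/data", "/documents"]
            )

            # Read file safely
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return [TextContent(type="text", text=content)]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {e}")]

    # Unknown tools still need a response rather than an implicit None
    return [TextContent(type="text", text=f"Error: Unknown tool: {name}")]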
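
The keyword blocklist in `validate_sql_safe` is easy to bypass and also rejects legitimate input (a customer named "Drop Shipping Ltd", for example). Its own comment recommends parameterized queries. A minimal sketch with the standard-library `sqlite3` module follows; the `customers` table and the `find_customers` function are illustrative assumptions.

```python
import sqlite3


def find_customers(conn: sqlite3.Connection, name_fragment: str) -> list[tuple]:
    """Look up customers by name using a bound parameter."""
    # The "?" placeholder sends the value separately from the SQL text,
    # so quotes or keywords in name_fragment are treated as data.
    cursor = conn.execute(
        "SELECT id, name FROM customers WHERE name LIKE ? ORDER BY id",
        (f"%{name_fragment}%",),
    )
    return cursor.fetchall()


# In-memory database with two sample rows for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id TEXT PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [("C001", "Acme Corp"), ("C002", "Drop Shipping Ltd")],
)

print(find_customers(conn, "Drop"))
print(find_customers(conn, "'; DROP TABLE customers; --"))
```

The second call is a classic injection attempt. With a bound parameter it is only searched for as a literal string, and the table is left intact.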

2. Access Control

class AccessControl:
    """Role-based access control for tools."""

    def __init__(self):
        self.permissions = {
            "admin": ["*"],  # All tools
            "user": ["read_file", "list_files", "search"],
            "guest": ["list_files"]
        }

    def check_permission(self, role: str, tool_name: str) -> bool:
        """Check if role has permission for tool."""
        if role not in self.permissions:
            return False

        allowed_tools = self.permissions[role]

        # Check wildcard
        if "*" in allowed_tools:
            return True

        # Check specific tool
        return tool_name in allowed_tools

    def require_permission(self, role: str, tool_name: str):
        """Raise exception if permission denied."""
        if not self.check_permission(role, tool_name):
            raise PermissionError(
                f"Role '{role}' not authorized for tool '{tool_name}'"
            )

# Initialize access control
ac = AccessControl()

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Tool execution with access control."""

    # Get user role from context (implementation depends on auth method)
    user_role = get_current_user_role()

    # Check permission
    try:
        ac.require_permission(user_role, name)
    except PermissionError as e:
        logger.warning(f"Permission denied: {e}")
        return [TextContent(
            type="text",
            text=f"Error: {str(e)}"
        )]

    # Execute tool
    return await execute_tool_impl(name, arguments)
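
`get_current_user_role()` is left undefined above because it depends on how the transport authenticates callers. One possible shape, assuming the transport layer can bind a per-request value before dispatching the tool call, uses `contextvars` from the standard library. The variable name, the `guest` default, and the `run_as` helper are all assumptions for illustration.

```python
import asyncio
import contextvars

# Per-task role; falls back to the least-privileged role when nothing is set.
_current_role: contextvars.ContextVar[str] = contextvars.ContextVar(
    "current_role", default="guest"
)


def get_current_user_role() -> str:
    """Return the role bound to the current request context."""
    return _current_role.get()


async def run_as(role: str, coro_fn, *args):
    """Hypothetical helper: run a handler with `role` bound, then restore the old value."""
    token = _current_role.set(role)
    try:
        return await coro_fn(*args)
    finally:
        _current_role.reset(token)


async def whoami() -> str:
    return get_current_user_role()


async def demo() -> list[str]:
    # Each concurrent task sees only the role bound in its own context.
    return list(await asyncio.gather(
        run_as("admin", whoami),
        run_as("user", whoami),
        whoami(),
    ))
```

Defaulting to the least-privileged role means a request that skips authentication falls back to `guest` rather than inheriting another caller's role.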

3. Rate Limiting

import time
from collections import defaultdict

class RateLimiter:
    """Rate limiter for tool calls."""

    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        self.calls = defaultdict(list)

    def check_rate_limit(self, user_id: str) -> bool:
        """Check if user is within rate limit."""
        now = time.time()
        minute_ago = now - 60

        # Remove old calls
        self.calls[user_id] = [
            call_time for call_time in self.calls[user_id]
            if call_time > minute_ago
        ]

        # Check limit
        if len(self.calls[user_id]) >= self.calls_per_minute:
            return False

        # Record call
        self.calls[user_id].append(now)
        return True

# Initialize rate limiter
rate_limiter = RateLimiter(calls_per_minute=60)

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Tool execution with rate limiting."""

    # Get user ID
    user_id = get_current_user_id()

    # Check rate limit
    if not rate_limiter.check_rate_limit(user_id):
        logger.warning(f"Rate limit exceeded for user {user_id}")
        return [TextContent(
            type="text",
            text="Error: Rate limit exceeded. Please try again later."
        )]

    # Execute tool
    return await execute_tool_impl(name, arguments)
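
The sliding-window logic is easier to verify when the clock can be controlled. The variant below is a sketch under that assumption: it takes a `clock` callable (defaulting to `time.monotonic`, which is unaffected by system clock adjustments) and stores timestamps in a `deque`. `SlidingWindowLimiter` and its `allow` method are hypothetical names, not part of the server above.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` calls per `window` seconds for each user."""

    def __init__(self, limit: int, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.calls = defaultdict(deque)

    def allow(self, user_id: str) -> bool:
        now = self.clock()
        calls = self.calls[user_id]
        # Drop timestamps that have aged out of the window.
        while calls and calls[0] <= now - self.window:
            calls.popleft()
        if len(calls) >= self.limit:
            return False
        calls.append(now)
        return True


# Deterministic check with a fake clock.
fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window=60.0, clock=lambda: fake_now[0])

first = limiter.allow("alice")    # allowed
second = limiter.allow("alice")   # allowed
third = limiter.allow("alice")    # rejected: limit reached
fake_now[0] = 61.0
fourth = limiter.allow("alice")   # allowed again: earlier calls expired
```

Injecting the clock keeps the test instantaneous, with no need to sleep for a minute to see the window roll over.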

4. Secure Configuration

import json
import logging
import os
from typing import Dict, Any

logger = logging.getLogger(__name__)

class SecureConfig:
    """Secure configuration management."""

    def __init__(self, config_path: str = None):
        self.config_path = config_path or self._default_config_path()
        self.config = self._load_config()

    def _default_config_path(self) -> str:
        """Get default config path."""
        # Use secure location
        config_dir = os.getenv("MCP_CONFIG_DIR", "/etc/mcp")
        return os.path.join(config_dir, "config.json")

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration securely."""
        try:
            with open(self.config_path, 'r') as f:
                config = json.load(f)

            # Verify required fields
            required = ["server_name", "allowed_dirs", "log_level"]
            for field in required:
                if field not in config:
                    raise ValueError(f"Missing required config: {field}")

            return config

        except FileNotFoundError:
            logger.error(f"Config not found: {self.config_path}")
            return self._default_config()

    def _default_config(self) -> Dict[str, Any]:
        """Get default configuration."""
        return {
            "server_name": "mcp-server",
            "allowed_dirs": ["/data"],
            "log_level": "INFO",
            "rate_limit": 60,
            "max_file_size": 10485760  # 10MB
        }

    def get(self, key: str, default: Any = None) -> Any:
        """Get config value."""
        return self.config.get(key, default)

    def get_secret(self, key: str) -> str:
        """Get secret from environment."""
        # Secrets should come from environment, not config files
        env_key = f"MCP_{key.upper()}"
        value = os.getenv(env_key)

        if value is None:
            raise ValueError(f"Secret not found: {env_key}")

        return value

# Usage
config = SecureConfig()

# Get config values
server_name = config.get("server_name")
allowed_dirs = config.get("allowed_dirs")

# Get secrets from environment
api_key = config.get_secret("api_key")
db_password = config.get_secret("db_password")

Complete Examples

Example 1: CRM Integration Server

"""
crm_server.py - Custom MCP server for CRM integration.

Exposes internal CRM system to AI agents.
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import logging
import os
from typing import List, Dict, Any

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Mock CRM database (replace with actual CRM API)
class CRMDatabase:
    """Mock CRM database."""

    def __init__(self):
        self.customers = {
            "C001": {
                "id": "C001",
                "name": "Acme Corp",
                "email": "contact@acme.com",
                "phone": "555-0100",
                "status": "active",
                "total_orders": 45,
                "lifetime_value": 125000
            },
            "C002": {
                "id": "C002",
                "name": "TechStart Inc",
                "email": "hello@techstart.com",
                "phone": "555-0200",
                "status": "active",
                "total_orders": 12,
                "lifetime_value": 35000
            }
        }

    def search_customers(self, query: str) -> List[Dict]:
        """Search customers."""
        results = []
        query_lower = query.lower()

        for customer in self.customers.values():
            if (query_lower in customer["name"].lower() or
                query_lower in customer["email"].lower()):
                results.append(customer)

        return results

    def get_customer(self, customer_id: str) -> Dict:
        """Get customer by ID."""
        if customer_id not in self.customers:
            raise ValueError(f"Customer not found: {customer_id}")
        return self.customers[customer_id]

    def update_customer(self, customer_id: str, updates: Dict) -> Dict:
        """Update customer data."""
        if customer_id not in self.customers:
            raise ValueError(f"Customer not found: {customer_id}")

        self.customers[customer_id].update(updates)
        return self.customers[customer_id]

# Initialize CRM
crm_db = CRMDatabase()

# Create MCP server
app = Server("crm-integration-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Register CRM tools."""
    return [
        Tool(
            name="search_customers",
            description="Search for customers by name or email",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query (name or email)"
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="get_customer",
            description="Get detailed customer information by ID",
            inputSchema={
                "type": "object",
                "properties": {
                    "customer_id": {
                        "type": "string",
                        "description": "Customer ID (e.g., C001)"
                    }
                },
                "required": ["customer_id"]
            }
        ),
        Tool(
            name="update_customer_status",
            description="Update customer status (active, inactive, suspended)",
            inputSchema={
                "type": "object",
                "properties": {
                    "customer_id": {
                        "type": "string",
                        "description": "Customer ID"
                    },
                    "status": {
                        "type": "string",
                        "description": "New status",
                        "enum": ["active", "inactive", "suspended"]
                    }
                },
                "required": ["customer_id", "status"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute CRM tool."""
    logger.info(f"Executing: {name} with {arguments}")

    try:
        if name == "search_customers":
            query = arguments["query"]
            results = crm_db.search_customers(query)

            if not results:
                return [TextContent(
                    type="text",
                    text=f"No customers found for query: {query}"
                )]

            # Format results
            output = f"Found {len(results)} customer(s):\n\n"
            for customer in results:
                output += f"**{customer['name']}** (ID: {customer['id']})\n"
                output += f"- Email: {customer['email']}\n"
                output += f"- Status: {customer['status']}\n"
                output += f"- Lifetime Value: ${customer['lifetime_value']:,}\n\n"

            return [TextContent(type="text", text=output)]

        elif name == "get_customer":
            customer_id = arguments["customer_id"]
            customer = crm_db.get_customer(customer_id)

            # Format customer details
            output = f"# {customer['name']}\n\n"
            output += f"**Customer ID:** {customer['id']}\n"
            output += f"**Email:** {customer['email']}\n"
            output += f"**Phone:** {customer['phone']}\n"
            output += f"**Status:** {customer['status']}\n"
            output += f"**Total Orders:** {customer['total_orders']}\n"
            output += f"**Lifetime Value:** ${customer['lifetime_value']:,}\n"

            return [TextContent(type="text", text=output)]

        elif name == "update_customer_status":
            customer_id = arguments["customer_id"]
            new_status = arguments["status"]

            customer = crm_db.update_customer(
                customer_id,
                {"status": new_status}
            )

            return [TextContent(
                type="text",
                text=f"Updated {customer['name']} status to: {new_status}"
            )]

        else:
            raise ValueError(f"Unknown tool: {name}")

    except Exception as e:
        logger.error(f"Tool execution failed: {e}")
        return [TextContent(
            type="text",
            text=f"Error: {str(e)}"
        )]

async def main():
    """Run CRM MCP server."""
    logger.info("Starting CRM integration server...")

    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Using the CRM Server:

"""
test_crm_client.py - Test CRM server integration
"""
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import os

# Create CRM assistant
crm_assistant = (MCPTeamBuilder("crm_assistant")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))
    .with_mcp_server("python", ["crm_server.py"])
    .with_prompt("""You are a CRM assistant with access to customer data.

Available tools:
- search_customers: Find customers by name or email
- get_customer: Get detailed customer information
- update_customer_status: Change customer status

Always provide clear, professional responses about customer data.
""")
    .build()
)

# Test queries
queries = [
    "Search for customers with 'tech' in their name",
    "Get details for customer C001",
    "Update customer C002 status to inactive"
]

for query in queries:
    print(f"\nQuery: {query}")
    result = crm_assistant({
        "messages": [HumanMessage(content=query)]
    })
    print(f"Response: {result['messages'][-1].content}")

Example 2: Data Analytics Server

"""
analytics_server.py - Data analytics MCP server.

Provides statistical analysis and data processing tools.
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import logging
import pandas as pd
import numpy as np
from typing import List, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AnalyticsEngine:
    """Analytics engine with data processing capabilities."""

    def __init__(self):
        self.datasets = {}

    def load_csv(self, file_path: str, name: str) -> Dict:
        """Load CSV file."""
        try:
            df = pd.read_csv(file_path)
            self.datasets[name] = df

            return {
                "name": name,
                "rows": len(df),
                "columns": list(df.columns),
                "memory": f"{df.memory_usage(deep=True).sum() / 1024:.2f} KB"
            }
        except Exception as e:
            raise ValueError(f"Failed to load CSV: {e}")

    def describe_dataset(self, name: str) -> Dict:
        """Get dataset statistics."""
        if name not in self.datasets:
            raise ValueError(f"Dataset not found: {name}")

        df = self.datasets[name]

        return {
            "shape": df.shape,
            "columns": list(df.columns),
            "dtypes": df.dtypes.to_dict(),
            "missing": df.isnull().sum().to_dict(),
            "statistics": df.describe().to_dict()
        }

    def correlation_analysis(self, name: str) -> Dict:
        """Compute correlation matrix."""
        if name not in self.datasets:
            raise ValueError(f"Dataset not found: {name}")

        df = self.datasets[name]
        numeric_df = df.select_dtypes(include=[np.number])

        if numeric_df.empty:
            raise ValueError("No numeric columns for correlation")

        corr_matrix = numeric_df.corr()

        return {
            "correlation_matrix": corr_matrix.to_dict(),
            "strong_correlations": self._find_strong_correlations(corr_matrix)
        }

    def _find_strong_correlations(self, corr_matrix, threshold=0.7):
        """Find strong correlations."""
        strong_corr = []

        for i in range(len(corr_matrix.columns)):
            for j in range(i+1, len(corr_matrix.columns)):
                col1 = corr_matrix.columns[i]
                col2 = corr_matrix.columns[j]
                corr_value = corr_matrix.iloc[i, j]

                if abs(corr_value) >= threshold:
                    strong_corr.append({
                        "column1": col1,
                        "column2": col2,
                        "correlation": float(corr_value)
                    })

        return strong_corr

# Initialize analytics engine
analytics = AnalyticsEngine()

# Create MCP server
app = Server("analytics-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Register analytics tools."""
    return [
        Tool(
            name="load_csv",
            description="Load CSV file for analysis",
            inputSchema={
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to CSV file"
                    },
                    "name": {
                        "type": "string",
                        "description": "Dataset name for reference"
                    }
                },
                "required": ["file_path", "name"]
            }
        ),
        Tool(
            name="describe_dataset",
            description="Get dataset statistics and info",
            inputSchema={
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "Dataset name"
                    }
                },
                "required": ["name"]
            }
        ),
        Tool(
            name="correlation_analysis",
            description="Compute correlation matrix for numeric columns",
            inputSchema={
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "Dataset name"
                    }
                },
                "required": ["name"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute analytics tool."""
    logger.info(f"Executing: {name}")

    try:
        if name == "load_csv":
            result = analytics.load_csv(
                arguments["file_path"],
                arguments["name"]
            )

            output = f"✅ Loaded dataset: {result['name']}\n"
            output += f"- Rows: {result['rows']}\n"
            output += f"- Columns: {', '.join(result['columns'])}\n"
            output += f"- Memory: {result['memory']}"

            return [TextContent(type="text", text=output)]

        elif name == "describe_dataset":
            result = analytics.describe_dataset(arguments["name"])

            output = f"# Dataset: {arguments['name']}\n\n"
            output += f"**Shape:** {result['shape'][0]} rows × {result['shape'][1]} columns\n\n"
            output += f"**Columns:** {', '.join(result['columns'])}\n\n"

            # Missing values
            missing = result['missing']
            if any(missing.values()):
                output += "**Missing Values:**\n"
                for col, count in missing.items():
                    if count > 0:
                        output += f"- {col}: {count}\n"
                output += "\n"

            return [TextContent(type="text", text=output)]

        elif name == "correlation_analysis":
            result = analytics.correlation_analysis(arguments["name"])

            output = f"# Correlation Analysis\n\n"

            strong_corr = result["strong_correlations"]
            if strong_corr:
                output += "**Strong Correlations (|r| ≥ 0.7):**\n\n"
                for corr in strong_corr:
                    output += f"- {corr['column1']} ↔ {corr['column2']}: "
                    output += f"{corr['correlation']:.3f}\n"
            else:
                output += "No strong correlations found.\n"

            return [TextContent(type="text", text=output)]

        else:
            raise ValueError(f"Unknown tool: {name}")

    except Exception as e:
        logger.error(f"Tool failed: {e}")
        return [TextContent(type="text", text=f"Error: {str(e)}")]

async def main():
    """Run analytics server."""
    logger.info("Starting analytics server...")

    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Summary

Custom MCP servers provide:

  1. Flexibility: Build exactly what you need
  2. Separation of Concerns: Clean architecture
  3. Reusability: One server, many agents
  4. Maintainability: Independent versioning and updates
  5. Security: Centralized access control
  6. Scalability: Can run on separate infrastructure

Key Takeaways:

  • Build custom servers for proprietary systems and specialized logic
  • Follow MCP protocol specifications for compatibility
  • Implement comprehensive error handling and validation
  • Use security best practices (input validation, access control)
  • Test thoroughly before deployment
  • Deploy with proper logging and monitoring
  • Document tools clearly for effective agent usage