
Hybrid Teams

A complete guide to building sophisticated agent teams that combine official MCP servers, custom servers, multiple transport types, and traditional LangChain tools.


Overview

Hybrid Teams are the most flexible and powerful agent configuration in az-core. They combine multiple capabilities:

  • Multiple MCP Servers: Both official and custom
  • Mixed Transport Types: STDIO and SSE simultaneously
  • Custom Tools: Traditional LangChain tools
  • RL Optimization: Optional reinforcement learning
  • Graceful Degradation: Continues working if some components fail

Why Hybrid Teams?

import os

from langchain_community.tools import DuckDuckGoSearchRun

# ❌ Limited approach - single capability
team = (MCPTeamBuilder("limited_team")
    .with_llm(llm)
    .with_mcp_server("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
    .build()
)

# ✅ Hybrid approach - comprehensive capabilities
team = (MCPTeamBuilder("hybrid_team")
    .with_llm(llm)

    # Local file operations (STDIO)
    .with_mcp_server("npx", ["-y", "@modelcontextprotocol/server-filesystem"])

    # Custom business logic (STDIO)
    .with_mcp_server("python", ["servers/crm_server.py"])

    # Remote search API (SSE)
    .with_mcp_server_sse(
        url="https://api.example.com/mcp/sse",
        env={"API_KEY": os.getenv("API_KEY")}
    )

    # Traditional tools
    .with_tools([DuckDuckGoSearchRun()])

    .build()
)

Benefits:

  1. Maximum Flexibility: Use the best tool for each job
  2. Optimal Performance: Local for speed, remote for scale
  3. Incremental Migration: Add capabilities gradually
  4. Fault Tolerance: Graceful degradation if components fail
  5. Best of All Worlds: Official servers + custom logic + traditional tools

What are Hybrid Teams?

Core Components

┌─────────────────────────────────────────────────────┐
│              Hybrid Team Architecture                │
│                                                       │
│  ┌───────────────────────────────────────────────┐  │
│  │            MCPTeamBuilder                     │  │
│  │         (Orchestration Layer)                 │  │
│  └─────────────────┬─────────────────────────────┘  │
│                    │                                 │
│        ┌───────────┼───────────┬──────────┐         │
│        │           │           │          │         │
│        ▼           ▼           ▼          ▼         │
│   ┌────────┐ ┌─────────┐ ┌────────┐ ┌──────────┐  │
│   │Official│ │ Custom  │ │  SSE   │ │LangChain │  │
│   │ MCP    │ │  MCP    │ │  MCP   │ │  Tools   │  │
│   │(STDIO) │ │(STDIO)  │ │(Remote)│ │(Direct)  │  │
│   └────────┘ └─────────┘ └────────┘ └──────────┘  │
│       │           │           │          │         │
│       ▼           ▼           ▼          ▼         │
│   ┌────────────────────────────────────────────┐   │
│   │        Unified Tool Interface              │   │
│   │    (All tools available to agent)          │   │
│   └────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────┘

Hybrid Team Characteristics

1. Multiple Data Sources:

  • Local files (filesystem MCP)
  • Remote APIs (SSE MCP)
  • Internal databases (custom MCP)
  • Web search (LangChain tools)

2. Mixed Latency Profiles:

  • Low latency: Local STDIO servers
  • Medium latency: Custom STDIO servers
  • Higher latency: Remote SSE servers
  • Variable latency: External APIs

3. Different Security Models:

  • Process isolation: STDIO servers
  • Network security: SSE servers
  • Token-based auth: Official servers
  • Custom auth: Internal servers

4. Varied Reliability:

  • High: Local file operations
  • Medium: Custom internal services
  • Lower: External APIs and services
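
These characteristics map directly onto builder configuration. A minimal sketch (using only the builder methods shown throughout this guide) that pairs tight timeouts with reliable local servers, generous timeouts with external services, and graceful degradation for the least reliable components:

import os
from azcore.agents import MCPTeamBuilder

# Reliability-aware configuration: short timeouts for fast, reliable
# local servers; longer timeouts and graceful degradation for externals
team = (MCPTeamBuilder("characteristics_demo")
    .with_llm(llm)  # llm defined as elsewhere in this guide

    # High reliability, low latency: a short timeout is safe
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
        timeout=5
    )

    # Lower reliability, variable latency: allow more time
    .with_mcp_server_sse(
        url="https://external-api.example.com/mcp/sse",
        env={"API_KEY": os.getenv("API_KEY")},
        timeout=60
    )

    # Keep working even if the external server fails to connect
    .skip_failed_servers(True)

    .build()
)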

Architecture Patterns

Pattern 1: Local + Remote

Combine local operations with remote services.

from azcore.agents import MCPTeamBuilder
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
import os

# Local + Remote hybrid team
local_remote_team = (MCPTeamBuilder("local_remote_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))

    # ===== LOCAL OPERATIONS (Fast, Private) =====

    # Local filesystem
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"],
        timeout=10
    )

    # Local database
    .with_mcp_server(
        "python",
        ["servers/local_db_server.py"],
        env={"DB_PATH": "/data/local.db"},
        timeout=10
    )

    # ===== REMOTE SERVICES (Scalable, Shared) =====

    # GitHub API (remote)
    .with_mcp_server_sse(
        url="https://github-mcp.example.com/sse",
        env={"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")},
        timeout=30
    )

    # Search API (remote)
    .with_mcp_server_sse(
        url="https://search-api.example.com/mcp/sse",
        env={"API_KEY": os.getenv("SEARCH_API_KEY")},
        timeout=30
    )

    .with_prompt("""You are a development assistant with access to:

LOCAL CAPABILITIES (fast):
- File operations (read, write, search local files)
- Local database queries

REMOTE CAPABILITIES (scalable):
- GitHub operations (issues, PRs, repos)
- Web search

STRATEGY:
- Use local tools for speed when possible
- Use remote tools for scalability and external data
- Combine both for comprehensive workflows
""")

    .build()
)

# Usage example
result = local_remote_team({
    "messages": [HumanMessage(content="""
    Create a bug report:
    1. Search local code for error patterns
    2. Check local logs for stack traces
    3. Search GitHub for similar issues
    4. Create new GitHub issue with findings
    """)]
})

Pattern 2: Official + Custom

Combine battle-tested official servers with custom business logic.

# Official + Custom hybrid team
official_custom_team = (MCPTeamBuilder("official_custom_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))

    # ===== OFFICIAL SERVERS (Standard Tools) =====

    # Filesystem operations
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )

    # PostgreSQL database
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL")}
    )

    # GitHub integration
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-github"],
        env={"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")}
    )

    # ===== CUSTOM SERVERS (Business Logic) =====

    # Custom CRM integration
    .with_mcp_server(
        "python",
        ["servers/crm_server.py"],
        env={"CRM_API_KEY": os.getenv("CRM_API_KEY")}
    )

    # Custom pricing engine
    .with_mcp_server(
        "python",
        ["servers/pricing_server.py"],
        env={"PRICING_CONFIG": "/config/pricing.json"}
    )

    # Custom notification service
    .with_mcp_server(
        "python",
        ["servers/notification_server.py"],
        env={
            "SLACK_TOKEN": os.getenv("SLACK_TOKEN"),
            "EMAIL_API_KEY": os.getenv("EMAIL_API_KEY")
        }
    )

    .with_prompt("""You are a business operations assistant.

STANDARD TOOLS (official servers):
- File operations
- Database queries
- GitHub management

BUSINESS TOOLS (custom servers):
- CRM customer management
- Pricing calculations
- Multi-channel notifications

Use standard tools for common operations and business tools for company-specific workflows.
""")

    .build()
)

# Usage example
result = official_custom_team({
    "messages": [HumanMessage(content="""
    Process new customer order:
    1. Look up customer in CRM
    2. Calculate pricing with custom rules
    3. Query database for inventory
    4. Create order record in database
    5. Send confirmation via notification service
    6. Log transaction to file
    """)]
})

Pattern 3: MCP + Traditional Tools

Combine MCP servers with traditional LangChain tools.

from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_core.tools import StructuredTool

# Create custom tools
def calculate_roi(investment: float, return_value: float) -> str:
    """Calculate return on investment."""
    roi = ((return_value - investment) / investment) * 100
    return f"ROI: {roi:.2f}%"

# Tool only supports single-input functions; StructuredTool infers a
# multi-argument schema from the function signature
roi_tool = StructuredTool.from_function(
    func=calculate_roi,
    name="calculate_roi",
    description="Calculate return on investment (ROI) given investment amount and return value"
)

# MCP + Traditional tools hybrid team
hybrid_tools_team = (MCPTeamBuilder("hybrid_tools_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))

    # ===== MCP SERVERS =====

    # Local file operations
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
    )

    # Custom analytics server
    .with_mcp_server(
        "python",
        ["servers/analytics_server.py"]
    )

    # ===== TRADITIONAL LANGCHAIN TOOLS =====

    .with_tools([
        DuckDuckGoSearchRun(),                          # Web search
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()),  # Wikipedia
        roi_tool                                        # Custom calculation
    ])

    .with_prompt("""You are a research and analysis assistant.

MCP CAPABILITIES:
- File operations (read/write data files)
- Advanced analytics (statistics, visualization)

TRADITIONAL TOOLS:
- Web search (DuckDuckGo)
- Wikipedia lookup
- ROI calculations

Combine all tools for comprehensive research and analysis.
""")

    .build()
)

# Usage example
result = hybrid_tools_team({
    "messages": [HumanMessage(content="""
    Research AI investment landscape:
    1. Search web for recent AI company funding
    2. Look up key companies on Wikipedia
    3. Load our investment data from CSV file
    4. Analyze trends using analytics tools
    5. Calculate ROI for our AI investments
    6. Save report to file
    """)]
})

Pattern 4: Multi-Layer Architecture

Organize servers into logical layers.

# Multi-layer hybrid team
multi_layer_team = (MCPTeamBuilder("multi_layer_team")
    .with_llm(ChatOpenAI(model="gpt-4o"))

    # ===== DATA LAYER =====

    # Local file storage
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )

    # PostgreSQL database
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL")}
    )

    # ===== BUSINESS LOGIC LAYER =====

    # Order processing
    .with_mcp_server(
        "python",
        ["servers/order_processor.py"],
        env={"CONFIG_PATH": "/config/orders.json"}
    )

    # Inventory management
    .with_mcp_server(
        "python",
        ["servers/inventory_manager.py"]
    )

    # ===== INTEGRATION LAYER =====

    # Payment gateway
    .with_mcp_server_sse(
        url="https://payments-mcp.example.com/sse",
        env={"PAYMENT_API_KEY": os.getenv("PAYMENT_API_KEY")}
    )

    # Shipping service
    .with_mcp_server_sse(
        url="https://shipping-mcp.example.com/sse",
        env={"SHIPPING_API_KEY": os.getenv("SHIPPING_API_KEY")}
    )

    # ===== COMMUNICATION LAYER =====

    # Slack notifications
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    .with_prompt("""You are an e-commerce operations assistant.

ARCHITECTURE LAYERS:

1. DATA LAYER: Storage and retrieval
   - Files, databases

2. BUSINESS LOGIC: Core operations
   - Order processing, inventory

3. INTEGRATION: External services
   - Payments, shipping

4. COMMUNICATION: Notifications
   - Slack alerts

Execute workflows top-to-bottom through layers.
""")

    .build()
)

Combining Transport Types

Why Mix STDIO and SSE?

# Strategic transport selection
strategic_team = (MCPTeamBuilder("strategic_transport")
    .with_llm(llm)

    # STDIO for:
    # - Low latency operations
    # - Private/sensitive data
    # - Local resources
    # - Development/testing

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/private"]
    )

    .with_mcp_server(
        "python",
        ["servers/sensitive_data_server.py"]
    )

    # SSE for:
    # - Scalable services
    # - Shared infrastructure
    # - External APIs
    # - Multi-tenant access

    .with_mcp_server_sse(
        url="https://api.shared-service.com/mcp/sse",
        env={"API_KEY": os.getenv("API_KEY")}
    )

    .with_mcp_server_sse(
        url="https://external-api.com/mcp/sse",
        env={"TOKEN": os.getenv("EXTERNAL_TOKEN")}
    )

    .build()
)

Performance Tiers

# Optimized hybrid team with performance considerations
performance_team = (MCPTeamBuilder("performance_optimized")
    .with_llm(ChatOpenAI(model="gpt-4o-mini", temperature=0))

    # ===== HOT PATH (Fast, frequent operations) =====
    # Use STDIO for lowest latency

    .with_mcp_server(
        "python",
        ["servers/cache_server.py"],  # In-memory cache
        timeout=5
    )

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/cache"],
        timeout=5
    )

    # ===== WARM PATH (Moderate frequency) =====
    # Use local STDIO or nearby SSE

    .with_mcp_server(
        "python",
        ["servers/database_server.py"],
        timeout=10
    )

    # ===== COLD PATH (Rare, expensive operations) =====
    # Use remote SSE

    .with_mcp_server_sse(
        url="https://ml-inference.example.com/sse",
        env={"API_KEY": os.getenv("ML_API_KEY")},
        timeout=60  # Longer timeout for ML inference
    )

    .with_mcp_server_sse(
        url="https://analytics.example.com/sse",
        env={"API_KEY": os.getenv("ANALYTICS_KEY")},
        timeout=45
    )

    .with_prompt("""You are a performance-optimized assistant.

PERFORMANCE TIERS:

HOT PATH (< 100ms): Use cache_server and local files
WARM PATH (< 1s): Use database_server
COLD PATH (< 5s): Use ML inference and analytics

Always prefer faster tiers when possible.
""")

    .build()
)

Geographic Distribution

# Geographically distributed hybrid team
distributed_team = (MCPTeamBuilder("geo_distributed")
    .with_llm(llm)

    # Local region (low latency)
    .with_mcp_server_sse(
        url="https://us-west.api.example.com/mcp/sse",
        env={"API_KEY": os.getenv("API_KEY")},
        timeout=30
    )

    # Remote region (higher latency)
    .with_mcp_server_sse(
        url="https://eu-central.api.example.com/mcp/sse",
        env={"API_KEY": os.getenv("API_KEY")},
        timeout=60  # Higher timeout for cross-region
    )

    # Global service (CDN)
    .with_mcp_server_sse(
        url="https://global-cdn.example.com/mcp/sse",
        env={"API_KEY": os.getenv("API_KEY")},
        timeout=45
    )

    # Local operations (always fast)
    .with_mcp_server(
        "python",
        ["servers/local_server.py"]
    )

    .build()
)

Mixing Official and Custom Servers

Complementary Capabilities

from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
import os

# Official servers for standard operations
# Custom servers for business-specific logic
complementary_team = (MCPTeamBuilder("complementary_team")
    .with_llm(ChatOpenAI(model="gpt-4o-mini"))

    # ===== OFFICIAL: Data Access =====

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL")}
    )

    # ===== OFFICIAL: External Services =====

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-github"],
        env={"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")}
    )

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    # ===== CUSTOM: Business Rules =====

    .with_mcp_server(
        "python",
        ["servers/validation_server.py"],
        env={"RULES_CONFIG": "/config/validation_rules.json"}
    )

    .with_mcp_server(
        "python",
        ["servers/workflow_server.py"],
        env={"WORKFLOW_CONFIG": "/config/workflows.yaml"}
    )

    # ===== CUSTOM: Domain Logic =====

    .with_mcp_server(
        "python",
        ["servers/pricing_engine.py"],
        env={"PRICING_RULES": "/config/pricing.json"}
    )

    .with_mcp_server(
        "python",
        ["servers/compliance_checker.py"],
        env={"COMPLIANCE_DB": "/data/compliance.db"}
    )

    .with_prompt("""You are a business process automation assistant.

STANDARD OPERATIONS (official servers):
- File and database operations
- GitHub and Slack integration

BUSINESS OPERATIONS (custom servers):
- Validation rules enforcement
- Workflow orchestration
- Pricing calculations
- Compliance checking

Use standard tools for data access and communication.
Use business tools for company-specific logic.
""")

    .build()
)

# Example workflow
result = complementary_team({
    "messages": [HumanMessage(content="""
    Process customer order #12345:
    1. Load order details from database (official)
    2. Validate order against business rules (custom)
    3. Calculate pricing with discounts (custom)
    4. Check regulatory compliance (custom)
    5. Execute approval workflow (custom)
    6. Update database with status (official)
    7. Notify customer via Slack (official)
    8. Log to audit file (official)
    """)]
})

Wrapper Pattern

Use custom servers to wrap and enhance official servers.

# Custom wrapper for enhanced functionality
wrapper_team = (MCPTeamBuilder("wrapper_team")
    .with_llm(llm)

    # Official GitHub server (standard operations)
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-github"],
        env={"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")}
    )

    # Custom wrapper adding business logic
    .with_mcp_server(
        "python",
        ["servers/github_wrapper.py"],
        env={
            "GITHUB_TOKEN": os.getenv("GITHUB_TOKEN"),
            "TEAM_CONFIG": "/config/team_rules.json"
        }
    )
    # This wrapper adds:
    # - Automatic PR template application
    # - Team-specific label management
    # - Custom approval workflows
    # - Automated testing triggers

    # Official Slack server (standard operations)
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    # Custom wrapper adding formatting and routing
    .with_mcp_server(
        "python",
        ["servers/slack_wrapper.py"],
        env={
            "SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN"),
            "ROUTING_CONFIG": "/config/message_routing.json"
        }
    )
    # This wrapper adds:
    # - Rich message formatting
    # - Smart channel routing
    # - Message templates
    # - Mention management

    .build()
)
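
The wrapper servers themselves are ordinary custom MCP servers. A minimal sketch of what servers/github_wrapper.py might look like, assuming the official MCP Python SDK's FastMCP helper; the tool body is illustrative, not a complete GitHub integration:

# servers/github_wrapper.py (illustrative sketch)
import json
import os

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("github-wrapper")

@mcp.tool()
def create_pr_with_template(repo: str, title: str, branch: str) -> str:
    """Create a PR with the team's template and labels applied."""
    with open(os.environ["TEAM_CONFIG"]) as f:
        rules = json.load(f)
    # ... call the GitHub API with os.environ["GITHUB_TOKEN"],
    # applying rules["pr_template"] and rules["labels"] ...
    return f"PR '{title}' opened on {repo}:{branch} with team template"

if __name__ == "__main__":
    mcp.run()  # STDIO transport by default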

Integrating LangChain Tools

Traditional + MCP Integration

from langchain_core.tools import tool
from langchain_community.tools import (
    DuckDuckGoSearchRun,
    WikipediaQueryRun
)
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_experimental.tools import PythonREPLTool
import os
import requests

# Custom LangChain tools
@tool
def fetch_weather(city: str) -> str:
    """Fetch current weather for a city."""
    api_key = os.getenv("WEATHER_API_KEY")
    url = f"https://api.weather.com/v1/current?city={city}&apikey={api_key}"
    response = requests.get(url, timeout=10)
    return str(response.json())

@tool
def calculate_tax(amount: float, tax_rate: float) -> str:
    """Calculate tax amount and total."""
    tax = amount * (tax_rate / 100)
    total = amount + tax
    return f"Tax: ${tax:.2f}, Total: ${total:.2f}"

# Comprehensive hybrid team
comprehensive_team = (MCPTeamBuilder("comprehensive_team")
    .with_llm(ChatOpenAI(model="gpt-4o"))

    # ===== MCP SERVERS (Structured Integration) =====

    # File operations
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
    )

    # Database
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL")}
    )

    # Custom business server
    .with_mcp_server(
        "python",
        ["servers/business_logic_server.py"]
    )

    # ===== LANGCHAIN TOOLS (Quick Integration) =====

    .with_tools([
        # Search and research
        DuckDuckGoSearchRun(),
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()),

        # Code execution
        PythonREPLTool(),

        # Custom tools
        fetch_weather,
        calculate_tax
    ])

    .with_prompt("""You are a versatile assistant with comprehensive capabilities.

MCP CAPABILITIES (Structured):
- File operations (read, write, manage files)
- Database queries (PostgreSQL)
- Business logic (custom rules and workflows)

LANGCHAIN TOOLS (Flexible):
- Web search (DuckDuckGo)
- Wikipedia research
- Python code execution
- Weather lookup
- Tax calculations

Use MCP for structured operations and LangChain tools for flexible, ad-hoc tasks.
""")

    .build()
)

# Usage example
result = comprehensive_team({
    "messages": [HumanMessage(content="""
    Prepare quarterly report:
    1. Query sales database for Q4 data
    2. Calculate tax implications
    3. Search web for market trends
    4. Look up competitor info on Wikipedia
    5. Use Python to generate charts
    6. Check weather for upcoming conference
    7. Save report to file
    """)]
})

Tool Priority and Routing

class ToolRouter:
    """Route tool calls based on priority and availability."""

    def __init__(self):
        self.tool_priorities = {
            # High priority: MCP servers (structured, maintained)
            "mcp_file_*": 1,
            "mcp_database_*": 1,
            "mcp_business_*": 1,

            # Medium priority: Custom tools (flexible)
            "custom_*": 2,

            # Low priority: External tools (less reliable)
            "external_*": 3
        }

    def get_priority(self, tool_name: str) -> int:
        """Get tool priority."""
        for pattern, priority in self.tool_priorities.items():
            if pattern.replace("*", "") in tool_name:
                return priority
        return 999  # Unknown tools lowest priority

# Team with tool routing
routed_team = (MCPTeamBuilder("routed_team")
    .with_llm(llm)

    # High priority: MCP servers
    .with_mcp_server("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
    .with_mcp_server("python", ["servers/business_server.py"])

    # Medium priority: Custom tools
    .with_tools([calculate_tax, fetch_weather])

    # Low priority: External tools (use as fallback)
    .with_tools([DuckDuckGoSearchRun()])

    .with_prompt("""You are a tool-routing assistant.

TOOL PRIORITY:
1. MCP servers (preferred for structured operations)
2. Custom tools (use for specific calculations)
3. External tools (fallback for general queries)

Always prefer higher priority tools when applicable.
""")

    .build()
)
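
Note that ToolRouter is illustrative and not wired into MCPTeamBuilder automatically; it can be used to order or filter candidate tools before registering them. A quick usage example:

router = ToolRouter()

# Order candidate tool names by priority before exposing them
tool_names = ["external_search", "mcp_file_read", "custom_calculate_tax"]
for name in sorted(tool_names, key=router.get_priority):
    print(name, router.get_priority(name))
# mcp_file_read 1, custom_calculate_tax 2, external_search 3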

Multi-Server Coordination

Sequential Workflows

# Team optimized for sequential workflows
sequential_team = (MCPTeamBuilder("sequential_workflow")
    .with_llm(ChatOpenAI(model="gpt-4o-mini", temperature=0))

    # Stage 1: Data Collection
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/input"]
    )

    .with_mcp_server_sse(
        url="https://api.example.com/data/sse",
        env={"API_KEY": os.getenv("DATA_API_KEY")}
    )

    # Stage 2: Processing
    .with_mcp_server(
        "python",
        ["servers/data_processor.py"]
    )

    .with_mcp_server(
        "python",
        ["servers/validator.py"]
    )

    # Stage 3: Analysis
    .with_mcp_server(
        "python",
        ["servers/analytics.py"]
    )

    # Stage 4: Output
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/output"]
    )

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    .with_prompt("""You are a data pipeline orchestrator.

WORKFLOW STAGES:

1. COLLECTION:
   - Read input files
   - Fetch API data

2. PROCESSING:
   - Process data
   - Validate results

3. ANALYSIS:
   - Run analytics

4. OUTPUT:
   - Write results
   - Send notifications

Execute stages in order. Each stage must complete before next begins.
""")

    .build()
)

# Usage
result = sequential_team({
    "messages": [HumanMessage(content="""
    Execute daily data pipeline:
    1. Read CSV files from /input
    2. Fetch supplementary data from API
    3. Process and merge datasets
    4. Validate data quality
    5. Run statistical analysis
    6. Write results to /output
    7. Send summary to Slack #data-team
    """)]
})

Parallel Operations

# Team optimized for parallel operations
parallel_team = (MCPTeamBuilder("parallel_operations")
    .with_llm(ChatOpenAI(model="gpt-4o"))

    # Multiple independent data sources
    .with_mcp_server_sse(
        url="https://api-1.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_1")},
        timeout=30
    )

    .with_mcp_server_sse(
        url="https://api-2.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_2")},
        timeout=30
    )

    .with_mcp_server_sse(
        url="https://api-3.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_3")},
        timeout=30
    )

    # Aggregation and output
    .with_mcp_server(
        "python",
        ["servers/aggregator.py"]
    )

    .with_prompt("""You are a parallel data aggregator.

DATA SOURCES (fetch in parallel):
- API 1: Financial data
- API 2: Market data
- API 3: News data

STRATEGY:
1. Fetch from all APIs simultaneously
2. Aggregate results
3. Combine into unified report

Optimize for parallel execution when possible.
""")

    .build()
)
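
The agent decides when to call tools concurrently, but if you orchestrate the fan-out yourself, the same pattern can be expressed with asyncio. A sketch using hypothetical async fetch helpers in place of the real API calls:

import asyncio

async def fetch_source(name: str) -> dict:
    """Hypothetical async fetch from one data source."""
    await asyncio.sleep(0.1)  # stand-in for a real network call
    return {"source": name, "data": "..."}

async def fetch_all() -> dict:
    # Fan out to all three sources concurrently, then aggregate
    results = await asyncio.gather(
        fetch_source("financial"),
        fetch_source("market"),
        fetch_source("news"),
    )
    return {r["source"]: r["data"] for r in results}

report = asyncio.run(fetch_all())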

Fallback Chains

# Team with fallback chains
fallback_team = (MCPTeamBuilder("fallback_chain")
    .with_llm(llm)

    # Primary: Fast local cache
    .with_mcp_server(
        "python",
        ["servers/cache_server.py"],
        timeout=5
    )

    # Secondary: Local database
    .with_mcp_server(
        "python",
        ["servers/db_server.py"],
        timeout=10
    )

    # Tertiary: Remote API (primary)
    .with_mcp_server_sse(
        url="https://api-primary.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY")},
        timeout=30
    )

    # Quaternary: Remote API (backup)
    .with_mcp_server_sse(
        url="https://api-backup.example.com/sse",
        env={"API_KEY": os.getenv("BACKUP_API_KEY")},
        timeout=30
    )

    .with_prompt("""You are a resilient data retrieval assistant.

FALLBACK CHAIN (try in order):
1. Cache (fastest)
2. Local database (fast)
3. Primary API (reliable)
4. Backup API (last resort)

If a source fails, automatically try the next in chain.
""")

    .skip_failed_servers(True)  # Continue if servers fail

    .build()
)

Use Case Patterns

Use Case 1: Full-Stack Development Assistant

from azcore.agents import MCPTeamBuilder
from langchain_core.messages import HumanMessage
from langchain_experimental.tools import PythonREPLTool
from langchain_openai import ChatOpenAI
import os

# Comprehensive development assistant
dev_assistant = (MCPTeamBuilder("fullstack_dev_assistant")
    .with_llm(ChatOpenAI(model="gpt-4o", temperature=0.3))

    # === LOCAL DEVELOPMENT ===

    # Code management
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/project"]
    )

    # Local database
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": "postgresql://localhost/dev_db"}
    )

    # Custom code analysis
    .with_mcp_server(
        "python",
        ["servers/code_analyzer.py"],
        env={"ANALYSIS_CONFIG": "/config/analysis.json"}
    )

    # === VERSION CONTROL ===

    # GitHub integration
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-github"],
        env={"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")}
    )

    # === TESTING & QA ===

    # Custom test runner
    .with_mcp_server(
        "python",
        ["servers/test_runner.py"],
        env={"TEST_CONFIG": "/config/test_settings.json"}
    )

    # === DEPLOYMENT ===

    # CI/CD integration (remote)
    .with_mcp_server_sse(
        url="https://ci-cd-mcp.example.com/sse",
        env={"CICD_TOKEN": os.getenv("CICD_TOKEN")},
        timeout=60
    )

    # === MONITORING ===

    # Application monitoring (remote)
    .with_mcp_server_sse(
        url="https://monitoring-mcp.example.com/sse",
        env={"MONITOR_API_KEY": os.getenv("MONITOR_KEY")},
        timeout=30
    )

    # === COMMUNICATION ===

    # Team communication
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    # === LANGCHAIN TOOLS ===

    .with_tools([
        PythonREPLTool()  # Quick code execution
    ])

    .with_prompt("""You are an expert full-stack development assistant.

CAPABILITIES:

LOCAL DEV:
- Code management (read, write, refactor)
- Database operations (queries, migrations)
- Code analysis (quality, security, performance)

VERSION CONTROL:
- GitHub (commits, branches, PRs, issues)

TESTING & QA:
- Test execution and coverage analysis

DEPLOYMENT:
- CI/CD pipeline management

MONITORING:
- Application health and performance

COMMUNICATION:
- Team updates via Slack

WORKFLOW:
1. Write/modify code locally
2. Analyze code quality
3. Run tests
4. Commit to GitHub
5. Trigger CI/CD
6. Monitor deployment
7. Notify team
""")

    .build()
)

# Example: Complete feature implementation
result = dev_assistant({
    "messages": [HumanMessage(content="""
    Implement user authentication feature:

    1. Create user model and migration
    2. Implement authentication endpoints
    3. Add JWT token generation
    4. Write unit tests (aim for 90% coverage)
    5. Update API documentation
    6. Run full test suite
    7. Analyze code for security issues
    8. Create GitHub PR with changes
    9. Trigger CI/CD pipeline
    10. Monitor deployment
    11. Notify #engineering channel
    """)]
})

Use Case 2: Data Science Workbench

from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

# Comprehensive data science workbench
data_science_team = (MCPTeamBuilder("data_science_workbench")
    .with_llm(ChatOpenAI(model="gpt-4o", temperature=0.1))

    # === DATA ACCESS ===

    # Local datasets
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/datasets"]
    )

    # Database access
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("DATA_WAREHOUSE_URL")}
    )

    # === DATA PROCESSING ===

    # Custom data processing
    .with_mcp_server(
        "python",
        ["servers/data_processor.py"],
        env={"PROCESSING_CONFIG": "/config/data_processing.json"}
    )

    # Feature engineering
    .with_mcp_server(
        "python",
        ["servers/feature_engineer.py"]
    )

    # === ANALYSIS & MODELING ===

    # Statistical analysis
    .with_mcp_server(
        "python",
        ["servers/stats_server.py"]
    )

    # ML model training (remote GPU)
    .with_mcp_server_sse(
        url="https://ml-training.example.com/mcp/sse",
        env={"ML_API_KEY": os.getenv("ML_API_KEY")},
        timeout=300  # 5 minutes for model training
    )

    # === VISUALIZATION ===

    # Plotting server
    .with_mcp_server(
        "python",
        ["servers/visualization_server.py"],
        env={"OUTPUT_DIR": "/outputs/plots"}
    )

    # === EXTERNAL DATA ===

    # Web search for research
    .with_tools([
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    ])

    # === COLLABORATION ===

    # Results sharing
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    .with_prompt("""You are a data science assistant.

FULL WORKFLOW:

1. DATA ACCESS:
   - Load from files or databases

2. PROCESSING:
   - Clean and transform data
   - Engineer features

3. ANALYSIS:
   - Statistical analysis
   - ML model training

4. VISUALIZATION:
   - Create plots and dashboards

5. RESEARCH:
   - External data lookup

6. COLLABORATION:
   - Share results with team

Always document methodology and results.
""")

    .build()
)

# Example: Complete ML pipeline
result = data_science_team({
    "messages": [HumanMessage(content="""
    Build customer churn prediction model:

    1. Load customer data from data warehouse
    2. Load historical churn data from CSV
    3. Clean and preprocess data
    4. Engineer features (RFM analysis)
    5. Run exploratory data analysis
    6. Create correlation heatmap
    7. Train multiple models (RF, XGBoost, Neural Net)
    8. Compare model performance
    9. Select best model
    10. Create ROC curve and confusion matrix
    11. Save model and results
    12. Research industry benchmarks on Wikipedia
    13. Share results in #data-science channel
    """)]
})

Use Case 3: Customer Support System

# Comprehensive customer support system
support_system = (MCPTeamBuilder("customer_support_system")
    .with_llm(ChatOpenAI(model="gpt-4o", temperature=0.7))

    # === CUSTOMER DATA ===

    # CRM integration
    .with_mcp_server(
        "python",
        ["servers/crm_server.py"],
        env={"CRM_API_KEY": os.getenv("CRM_API_KEY")}
    )

    # Order history
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("ORDERS_DB_URL")}
    )

    # === KNOWLEDGE BASE ===

    # Documentation access
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/knowledge_base"]
    )

    # FAQ database
    .with_mcp_server(
        "python",
        ["servers/faq_server.py"]
    )

    # === TICKETING ===

    # Ticket management (remote)
    .with_mcp_server_sse(
        url="https://tickets.example.com/mcp/sse",
        env={"TICKET_API_KEY": os.getenv("TICKET_API_KEY")},
        timeout=30
    )

    # === COMMUNICATION ===

    # Email support
    .with_mcp_server(
        "python",
        ["servers/email_server.py"],
        env={"EMAIL_API_KEY": os.getenv("EMAIL_API_KEY")}
    )

    # Slack for internal coordination
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    # === ANALYTICS ===

    # Support analytics
    .with_mcp_server(
        "python",
        ["servers/support_analytics.py"]
    )

    # === EXTERNAL ===

    # Web search for troubleshooting
    .with_tools([DuckDuckGoSearchRun()])

    .with_prompt("""You are an AI customer support assistant.

SUPPORT WORKFLOW:

1. IDENTIFY CUSTOMER:
   - Lookup in CRM
   - Get order history

2. UNDERSTAND ISSUE:
   - Search knowledge base
   - Check FAQ database

3. RESOLVE OR ESCALATE:
   - Provide solution if available
   - Create ticket if escalation needed

4. COMMUNICATE:
   - Send email response
   - Update ticket status
   - Alert team if urgent

5. TRACK:
   - Log interaction
   - Update analytics

Always be empathetic and solution-focused.
""")

    .build()
)

# Example: Handle customer inquiry
result = support_system({
    "messages": [HumanMessage(content="""
    Customer inquiry from sarah@example.com:
    "My order #54321 hasn't arrived yet. It's been 3 weeks!"

    Please:
    1. Look up customer profile
    2. Check order status
    3. Review shipping information
    4. Search knowledge base for shipping delays
    5. If issue unresolved, create support ticket
    6. Send empathetic response to customer
    7. Alert #support-urgent if critical
    8. Log interaction for analytics
    """)]
})

Performance Optimization

Caching Strategy

# Team with multi-level caching
cached_team = (MCPTeamBuilder("cached_team")
    .with_llm(llm)

    # L1 Cache: In-memory (fastest)
    .with_mcp_server(
        "python",
        ["servers/memory_cache.py"],
        timeout=5
    )

    # L2 Cache: Redis (fast)
    .with_mcp_server(
        "python",
        ["servers/redis_cache.py"],
        env={"REDIS_URL": os.getenv("REDIS_URL")},
        timeout=10
    )

    # L3 Cache: File system (medium)
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/cache"]
    )

    # Primary data source: Database (slower)
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL")}
    )

    # Remote API (slowest)
    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY")},
        timeout=60
    )

    .with_prompt("""You are a performance-optimized assistant.

CACHING STRATEGY:
1. Check L1 (memory) - < 10ms
2. Check L2 (Redis) - < 50ms
3. Check L3 (filesystem) - < 200ms
4. Query database - < 1s
5. Call remote API - < 5s

Always try caches before hitting slower sources.
Update caches when fetching from slower sources.
""")

    .build()
)
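
A minimal sketch of what the L1 servers/memory_cache.py server might look like, again assuming the MCP Python SDK's FastMCP helper; the TTL handling is illustrative:

# servers/memory_cache.py (illustrative sketch)
import time

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("memory-cache")
_store: dict[str, tuple[float, str]] = {}  # key -> (expiry, value)

@mcp.tool()
def cache_get(key: str) -> str:
    """Return the cached value for key, or '' on a miss/expiry."""
    entry = _store.get(key)
    if entry and entry[0] > time.time():
        return entry[1]
    return ""

@mcp.tool()
def cache_set(key: str, value: str, ttl_seconds: int = 300) -> str:
    """Store value under key with a time-to-live."""
    _store[key] = (time.time() + ttl_seconds, value)
    return "ok"

if __name__ == "__main__":
    mcp.run()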

Connection Pooling

# Team with connection pooling
pooled_team = (MCPTeamBuilder("pooled_team")
    .with_llm(llm)

    # Database with connection pooling
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={
            "POSTGRES_CONNECTION_STRING": (
                "postgresql://user:pass@localhost/db"
                "?pool_size=10"              # Connection pool
                "&max_overflow=20"           # Max extra connections
                "&pool_timeout=30"           # Connection timeout
                "&pool_recycle=3600"         # Recycle after 1 hour
            )
        }
    )

    # HTTP client with connection pooling
    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={
            "API_KEY": os.getenv("API_KEY"),
            "POOL_CONNECTIONS": "10",        # Connection pool size
            "POOL_MAXSIZE": "20",            # Max pool size
            "MAX_RETRIES": "3"               # Retry failed requests
        },
        timeout=30
    )

    .build()
)

Lazy Loading

class LazyLoadTeam:
    """Team with lazy server initialization."""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini")
        self.servers_initialized = False
        self.team = None

    def _initialize_servers(self):
        """Initialize servers on first use."""
        if self.servers_initialized:
            return

        self.team = (MCPTeamBuilder("lazy_team")
            .with_llm(self.llm)

            # Core servers (always load)
            .with_mcp_server(
                "npx",
                ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
            )

            # Additional servers loaded on demand
            .build()
        )

        self.servers_initialized = True

    def execute(self, task: str):
        """Execute with lazy initialization."""
        self._initialize_servers()
        return self.team({
            "messages": [HumanMessage(content=task)]
        })

    def add_server_if_needed(self, capability: str):
        """Dynamically add servers based on capability."""
        if capability == "database":
            # Add database server
            pass
        elif capability == "github":
            # Add GitHub server
            pass

# Usage
lazy_team = LazyLoadTeam()

# Servers initialized on first use
result = lazy_team.execute("List files")

Error Handling and Resilience

Graceful Degradation

# Team with graceful degradation
resilient_team = (MCPTeamBuilder("resilient_team")
    .with_llm(llm)

    # Critical servers (must be available)
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )

    .with_mcp_server(
        "python",
        ["servers/core_server.py"]
    )

    # Optional servers (can fail)
    .with_mcp_server_sse(
        url="https://optional-api-1.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_1")},
        timeout=30
    )

    .with_mcp_server_sse(
        url="https://optional-api-2.example.com/sse",
        env={"API_KEY": os.getenv("API_KEY_2")},
        timeout=30
    )

    # Enable graceful degradation
    .skip_failed_servers(True)  # Continue if optional servers fail

    .with_prompt("""You are a resilient assistant.

CRITICAL CAPABILITIES (always available):
- File operations
- Core business logic

OPTIONAL CAPABILITIES (may be unavailable):
- External API 1
- External API 2

If optional services are unavailable:
- Inform user
- Use alternative approaches
- Complete core functionality
""")

    .build()
)

Retry Logic

import logging
import time
from typing import Any

logger = logging.getLogger(__name__)

class RetryWrapper:
    """Wrap team execution with retry logic."""

    def __init__(self, team, max_retries: int = 3):
        self.team = team
        self.max_retries = max_retries

    def execute_with_retry(
        self,
        task: str,
        backoff_factor: float = 2.0
    ) -> Any:
        """Execute with exponential backoff retry."""

        for attempt in range(self.max_retries):
            try:
                result = self.team({
                    "messages": [HumanMessage(content=task)]
                })
                return result

            except Exception as e:
                if attempt < self.max_retries - 1:
                    wait_time = backoff_factor ** attempt
                    logger.warning(
                        f"Attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {wait_time}s..."
                    )
                    time.sleep(wait_time)
                else:
                    logger.error(f"All {self.max_retries} attempts failed")
                    raise

# Usage
team = (MCPTeamBuilder("retry_team")
    .with_llm(llm)
    .with_mcp_server_sse(
        url="https://unreliable-api.example.com/sse",
        timeout=30
    )
    .build()
)

retry_wrapper = RetryWrapper(team, max_retries=3)
result = retry_wrapper.execute_with_retry("Fetch data")

Circuit Breaker

import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class CircuitBreaker:
    """Circuit breaker pattern for server calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = {}
        self.last_failure_time = {}

    def is_open(self, server_id: str) -> bool:
        """Check if circuit is open (server unavailable)."""
        if server_id not in self.failures:
            return False

        # Check if timeout has passed
        if server_id in self.last_failure_time:
            time_since_failure = (
                datetime.now() - self.last_failure_time[server_id]
            ).total_seconds()

            if time_since_failure > self.timeout:
                # Reset circuit
                self.failures[server_id] = 0
                return False

        return self.failures[server_id] >= self.failure_threshold

    def record_failure(self, server_id: str):
        """Record a failure."""
        self.failures[server_id] = self.failures.get(server_id, 0) + 1
        self.last_failure_time[server_id] = datetime.now()

    def record_success(self, server_id: str):
        """Record a success (reset counter)."""
        self.failures[server_id] = 0

# Usage with team
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def execute_with_circuit_breaker(team, task: str, server_id: str):
    """Execute with circuit breaker protection."""

    if circuit_breaker.is_open(server_id):
        logger.warning(f"Circuit open for {server_id}, using fallback")
        # Use fallback logic
        return fallback_execution(task)

    try:
        result = team({"messages": [HumanMessage(content=task)]})
        circuit_breaker.record_success(server_id)
        return result
    except Exception as e:
        circuit_breaker.record_failure(server_id)
        raise

Security Considerations

Authentication and Authorization

# Team with comprehensive security
secure_team = (MCPTeamBuilder("secure_team")
    .with_llm(llm)

    # Servers with authentication
    .with_mcp_server_sse(
        url="https://secure-api.example.com/sse",
        env={
            "AUTH_TOKEN": os.getenv("SECURE_API_TOKEN"),
            "CLIENT_ID": os.getenv("CLIENT_ID"),
            "CLIENT_SECRET": os.getenv("CLIENT_SECRET")
        },
        timeout=30
    )

    # GitHub with fine-grained token
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-github"],
        env={
            "GITHUB_TOKEN": os.getenv("GITHUB_PAT"),  # Personal Access Token
            # Token scope: repo, read:org (minimal required)
        }
    )

    # Database with read-only user
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={
            "POSTGRES_CONNECTION_STRING": (
                f"postgresql://{os.getenv('DB_READONLY_USER')}:"
                f"{os.getenv('DB_READONLY_PASS')}@localhost/db"
            )
        }
    )

    # Filesystem with restricted paths
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem",
         "/data/safe_directory"]  # Only this directory accessible
    )

    .build()
)

Secrets Management

from dotenv import load_dotenv
import os

# Load secrets from environment
load_dotenv()  # Loads from .env file

def get_secret(key: str, required: bool = True) -> str | None:
    """Get secret from environment with validation."""
    value = os.getenv(key)

    if required and not value:
        raise ValueError(f"Required secret not found: {key}")

    return value

# Build team with secure secrets management
secure_secrets_team = (MCPTeamBuilder("secure_secrets")
    .with_llm(llm)

    .with_mcp_server_sse(
        url="https://api.example.com/sse",
        env={
            "API_KEY": get_secret("API_KEY"),
            "API_SECRET": get_secret("API_SECRET")
        }
    )

    .with_mcp_server(
        "python",
        ["servers/secure_server.py"],
        env={
            "DB_PASSWORD": get_secret("DB_PASSWORD"),
            "ENCRYPTION_KEY": get_secret("ENCRYPTION_KEY")
        }
    )

    .build()
)

# ❌ NEVER do this:
# .with_mcp_server_sse(
#     url="https://api.example.com/sse",
#     env={"API_KEY": "hardcoded-key-12345"}  # INSECURE!
# )

Input Validation

# Team with input validation
validated_team = (MCPTeamBuilder("validated_team")
    .with_llm(llm)

    # Custom validation server
    .with_mcp_server(
        "python",
        ["servers/validation_server.py"],
        env={"VALIDATION_RULES": "/config/validation.json"}
    )
    # This server validates all inputs before processing

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )

    .with_prompt("""You are a security-conscious assistant.

SECURITY RULES:
- Validate all inputs before processing
- Sanitize file paths (no ../ or absolute paths to sensitive areas)
- Validate SQL to prevent injection
- Check file extensions before reading
- Limit file sizes
- Validate URLs before fetching

Always use validation_server to check inputs first.
""")

    .build()
)
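
The path rule from the prompt is straightforward to enforce in code. A small illustrative helper of the kind validation_server.py might expose (ALLOWED_ROOT is an assumption for this sketch):

from pathlib import Path

ALLOWED_ROOT = Path("/data").resolve()

def safe_path(user_path: str) -> Path:
    """Resolve user_path and reject anything outside ALLOWED_ROOT."""
    candidate = (ALLOWED_ROOT / user_path).resolve()
    if not candidate.is_relative_to(ALLOWED_ROOT):  # Python 3.9+
        raise ValueError(f"Path escapes allowed root: {user_path}")
    return candidate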

Complete Examples

Example 1: Enterprise Integration Hub

"""
Enterprise Integration Hub - Complete hybrid team example.

Integrates multiple enterprise systems:
- CRM (Salesforce)
- ERP (SAP)
- HR (Workday)
- Communication (Slack, Email)
- Documentation (Confluence)
- Version Control (GitHub)
"""

from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import os

# Build enterprise integration hub
enterprise_hub = (MCPTeamBuilder("enterprise_integration_hub")
    .with_llm(ChatOpenAI(model="gpt-4o", temperature=0.2))

    # === CUSTOMER RELATIONSHIP (CRM) ===
    .with_mcp_server(
        "python",
        ["servers/salesforce_server.py"],
        env={
            "SALESFORCE_USERNAME": os.getenv("SF_USERNAME"),
            "SALESFORCE_PASSWORD": os.getenv("SF_PASSWORD"),
            "SALESFORCE_TOKEN": os.getenv("SF_SECURITY_TOKEN")
        },
        timeout=30
    )

    # === ENTERPRISE RESOURCE PLANNING (ERP) ===
    .with_mcp_server_sse(
        url="https://sap-mcp.enterprise.com/sse",
        env={
            "SAP_CLIENT_ID": os.getenv("SAP_CLIENT_ID"),
            "SAP_CLIENT_SECRET": os.getenv("SAP_CLIENT_SECRET")
        },
        timeout=60
    )

    # === HUMAN RESOURCES ===
    .with_mcp_server_sse(
        url="https://workday-mcp.enterprise.com/sse",
        env={"WORKDAY_API_KEY": os.getenv("WORKDAY_API_KEY")},
        timeout=45
    )

    # === DOCUMENTATION ===
    .with_mcp_server(
        "python",
        ["servers/confluence_server.py"],
        env={
            "CONFLUENCE_URL": os.getenv("CONFLUENCE_URL"),
            "CONFLUENCE_USER": os.getenv("CONFLUENCE_USER"),
            "CONFLUENCE_API_TOKEN": os.getenv("CONFLUENCE_TOKEN")
        }
    )

    # === VERSION CONTROL ===
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-github"],
        env={"GITHUB_TOKEN": os.getenv("GITHUB_ENTERPRISE_TOKEN")}
    )

    # === COMMUNICATION ===
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-slack"],
        env={"SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN")}
    )

    .with_mcp_server(
        "python",
        ["servers/email_server.py"],
        env={
            "SMTP_SERVER": os.getenv("SMTP_SERVER"),
            "SMTP_USER": os.getenv("SMTP_USER"),
            "SMTP_PASSWORD": os.getenv("SMTP_PASSWORD")
        }
    )

    # === LOCAL DATA ===
    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-filesystem", "/enterprise_data"]
    )

    .with_mcp_server(
        "npx",
        ["-y", "@modelcontextprotocol/server-postgres"],
        env={"POSTGRES_CONNECTION_STRING": os.getenv("ENTERPRISE_DB_URL")}
    )

    # === CUSTOM BUSINESS LOGIC ===
    .with_mcp_server(
        "python",
        ["servers/workflow_automation.py"],
        env={"WORKFLOW_CONFIG": "/config/enterprise_workflows.yaml"}
    )

    .skip_failed_servers(True)  # Graceful degradation

    .with_prompt("""You are an Enterprise Integration Assistant managing complex business workflows across multiple systems.

INTEGRATED SYSTEMS:
- CRM (Salesforce): Customer data, opportunities, leads
- ERP (SAP): Financial data, inventory, procurement
- HR (Workday): Employee data, payroll, time tracking
- Docs (Confluence): Knowledge base, documentation
- VCS (GitHub): Code repositories, issues, PRs
- Comm (Slack/Email): Team communication
- Data (Files/DB): Local storage and queries

CAPABILITIES:
1. Cross-system data synchronization
2. Automated workflow orchestration
3. Real-time reporting and analytics
4. Intelligent routing and escalation
5. Compliance and audit logging

BEST PRACTICES:
- Always verify data consistency across systems
- Log all cross-system transactions
- Handle errors gracefully with fallbacks
- Maintain data privacy and security
- Provide clear audit trails
""")

    .build()
)

# Example: Complete business process
result = enterprise_hub({
    "messages": [HumanMessage(content="""
    Execute new employee onboarding for John Smith:

    1. HR (Workday):
       - Verify employee record
       - Get start date, department, role

    2. IT Setup:
       - Create GitHub account
       - Add to appropriate teams/repos
       - Grant access levels based on role

    3. CRM (Salesforce):
       - Create user account if customer-facing role
       - Assign territories and accounts

    4. Documentation (Confluence):
       - Create personal onboarding page
       - Link to department documentation
       - Add to team spaces

    5. Communication:
       - Send welcome email with instructions
       - Add to Slack channels: #general, #department
       - Notify manager of completion

    6. Tracking:
       - Log onboarding workflow in database
       - Create onboarding checklist file
       - Schedule follow-up tasks

    7. ERP (SAP):
       - Verify budget allocation for new hire
       - Create procurement request for equipment

    Execute all steps and report status at each stage.
    """)]
})

print("Onboarding result:")
print(result["messages"][-1].content)

Summary

Hybrid Teams provide:

  1. Maximum Flexibility: Combine any tool sources
  2. Optimal Performance: Use best transport for each task
  3. Resilience: Graceful degradation and fallbacks
  4. Scalability: Mix local and remote resources
  5. Security: Layered authentication and authorization
  6. Maintainability: Modular architecture

Key Takeaways:

  • Combine official servers, custom servers, and LangChain tools
  • Mix STDIO (fast, local) and SSE (scalable, remote) transports
  • Implement caching, pooling, and lazy loading for performance
  • Use graceful degradation, retries, and circuit breakers for resilience
  • Follow security best practices for authentication and secrets
  • Design for specific use cases (dev, data science, support, enterprise)

When to Use Hybrid Teams:

  • Complex workflows requiring multiple capabilities
  • Need both speed (local) and scale (remote)
  • Combining standard tools with custom business logic
  • Production systems requiring high availability
  • Enterprise integrations across multiple systems