The MCPTeamBuilder class provides a fluent API for constructing agent teams with Model Context Protocol (MCP) integration. This guide covers all available methods, configuration options, and advanced usage patterns.
Class Overview
from azcore.agents import MCPTeamBuilder
class MCPTeamBuilder(BaseTeam):
"""
Fluent builder for creating MCP-enabled agent teams.
Inherits from BaseTeam and adds MCP-specific functionality including:
- MCP server integration (STDIO and SSE transports)
- Automatic tool discovery from MCP servers
- Multi-server support
- RL-enabled tool selection
- Graceful degradation when MCP is unavailable
"""
Key Features
- Fluent API: Method chaining for readable team construction
- Multi-Server Support: Connect to multiple MCP servers simultaneously
- Transport Flexibility: Support for both STDIO and SSE transports
- Automatic Tool Discovery: MCP tools automatically registered with agents
- RL Integration: Optional reinforcement learning for tool selection
- Type Safety: Full type hints for IDE support
- Graceful Degradation: Works even when MCP dependencies are unavailable
Constructor
__init__(name: str)
Initialize a new MCP team builder.
Parameters:
name (str): Unique identifier for the team
Returns:
- MCPTeamBuilder instance for method chaining
Example:
from azcore.agents import MCPTeamBuilder
# Create a new builder
builder = MCPTeamBuilder("research_team")
# Or use method chaining immediately
team = MCPTeamBuilder("data_team").with_llm(llm).build()
Internal State Initialized:
self._mcp_servers: List[Dict[str, Any]] = [] # Server configurations
self._mcp_sessions: List[Any] = [] # Active MCP sessions
self._mcp_tools: List[BaseTool] = [] # Discovered tools
self._mcp_enabled: bool = False # MCP availability flag
self._mcp_client: Optional[Any] = None # MultiServerMCPClient
self._rl_enabled: bool = False # RL feature flag
self._rl_manager: Optional[Any] = None # RLManager instance
Core Configuration Methods
with_llm(llm: BaseLanguageModel) -> MCPTeamBuilder
Configure the language model for the team.
Parameters:
llm (BaseLanguageModel): LangChain-compatible LLM instance
Returns:
- Self for method chaining
Example:
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# OpenAI GPT-4
builder = MCPTeamBuilder("team1").with_llm(
ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
)
# Anthropic Claude
builder = MCPTeamBuilder("team2").with_llm(
ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=0.5)
)
# Custom configuration
llm = ChatOpenAI(
model="gpt-4o",
temperature=0.3,
max_tokens=2000,
timeout=30
)
builder = MCPTeamBuilder("team3").with_llm(llm)
Best Practices:
- Use lower temperature (0.0-0.3) for deterministic tool usage
- Use higher temperature (0.7-1.0) for creative tasks
- Set appropriate max_tokens based on expected response length
- Configure timeouts for production environments
with_prompt(prompt: str) -> MCPTeamBuilder
Set the system prompt for the agent.
Parameters:
prompt (str): System prompt defining agent behavior and capabilities
Returns:
- Self for method chaining
Example:
# Basic prompt
builder = MCPTeamBuilder("assistant").with_prompt(
"You are a helpful assistant with access to MCP tools."
)
# Detailed prompt with tool guidance
prompt = """You are a research assistant with access to the following capabilities:
- File system operations (read, write, list)
- Web search
- Data analysis
Always:
1. Confirm tool availability before using them
2. Provide clear explanations of your actions
3. Handle errors gracefully
4. Verify results before presenting to the user
When using file operations, always check if files exist before reading.
When searching the web, cite your sources.
When analyzing data, explain your methodology.
"""
builder = MCPTeamBuilder("research_assistant").with_prompt(prompt)
# Role-specific prompt
data_analyst_prompt = """You are a data analysis specialist with access to:
- Statistical analysis tools
- Data visualization capabilities
- Database query tools
Your task is to:
- Analyze datasets for patterns and insights
- Create clear visualizations
- Provide actionable recommendations
- Explain statistical significance
Always verify data quality before analysis.
"""
builder = MCPTeamBuilder("data_analyst").with_prompt(data_analyst_prompt)
Best Practices:
- Be specific about tool capabilities and usage guidelines
- Include error handling instructions
- Define expected output format
- Set clear boundaries and limitations
- Use structured format (numbered lists, sections) for complex prompts
with_tools(tools: List[BaseTool]) -> MCPTeamBuilder
Add custom LangChain tools to the team (in addition to MCP tools).
Parameters:
tools (List[BaseTool]): List of LangChain BaseTool instances
Returns:
- Self for method chaining
Example:
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
# Add search tool
search_tool = DuckDuckGoSearchRun()
builder = MCPTeamBuilder("research_team").with_tools([search_tool])
# Custom tool
def calculate_discount(price: float, discount_percent: float) -> float:
"""Calculate discounted price."""
return price * (1 - discount_percent / 100)
discount_tool = Tool(
name="calculate_discount",
func=calculate_discount,
description="Calculate final price after applying discount percentage"
)
builder = MCPTeamBuilder("sales_team").with_tools([discount_tool])
# Multiple custom tools
from langchain_community.tools import ShellTool
from langchain_experimental.tools import PythonREPLTool
tools = [
ShellTool(),
PythonREPLTool(),
DuckDuckGoSearchRun(),
discount_tool
]
builder = MCPTeamBuilder("dev_team").with_tools(tools)
Tool Combination:
- MCP tools are automatically added during build()
- Custom tools specified here are combined with MCP tools
- All tools are available to the agent simultaneously
- Tool names must be unique across MCP and custom tools (see the collision check sketched below)
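Since tool names must be unique across MCP and custom tools, a quick pre-flight check can surface collisions before they cause confusing behavior. A minimal sketch, assuming manual tool mode so the MCP tools can be fetched and inspected before registration:
custom_tools = [discount_tool, DuckDuckGoSearchRun()]
mcp_tools = team.fetch_mcp_tools()
# Names present in both sets would collide once registered together
collisions = {t.name for t in custom_tools} & {t.name for t in mcp_tools}
if collisions:
    raise ValueError(f"Tool name collisions between custom and MCP tools: {collisions}")
team.add_tools(custom_tools + mcp_tools)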
Best Practices:
- Provide clear tool descriptions for better agent decision-making
- Avoid tool name conflicts between custom and MCP tools
- Test tools independently before adding to team
- Document tool capabilities in the system prompt
MCP Server Methods
with_mcp_server(command: str, args: List[str], ...) -> MCPTeamBuilder
Add an MCP server with STDIO transport.
Parameters:
command (str): Executable command (e.g., "python", "node")
args (List[str]): Command arguments (e.g., ["server.py"])
env (Optional[Dict[str, str]]): Environment variables
timeout (Optional[int]): Connection timeout in seconds (default: 10)
Returns:
- Self for method chaining
Example:
# Basic Python MCP server
builder = MCPTeamBuilder("team1").with_mcp_server(
command="python",
args=["mcp_server.py"]
)
# Node.js MCP server
builder = MCPTeamBuilder("team2").with_mcp_server(
command="node",
args=["server.js"]
)
# With custom timeout
builder = MCPTeamBuilder("team3").with_mcp_server(
command="python",
args=["slow_server.py"],
timeout=30 # 30 seconds for slow startup
)
# With environment variables
builder = MCPTeamBuilder("team4").with_mcp_server(
command="python",
args=["api_server.py"],
env={
"API_KEY": "your-api-key",
"LOG_LEVEL": "DEBUG",
"DATABASE_URL": "postgresql://localhost/db"
}
)
# Absolute path to server
import os
server_path = os.path.join(os.getcwd(), "servers", "mcp_server.py")
builder = MCPTeamBuilder("team5").with_mcp_server(
command="python",
args=[server_path],
timeout=15
)
Server Configuration Storage:
# Internal representation
server_config = {
"command": "python",
"args": ["server.py"],
"env": {"API_KEY": "secret"},
"transport": "stdio",
"timeout": 10
}
self._mcp_servers.append(server_config)
Best Practices:
- Use absolute paths for server scripts in production
- Set appropriate timeouts based on server startup time
- Store sensitive data in environment variables
- Test server connectivity before deploying
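One way to act on the "test server connectivity" advice is a low-tech smoke test that launches the server command directly and confirms the process stays up. This sketch only checks that the command starts cleanly; it does not perform the MCP handshake:
from typing import List
import subprocess
import time

def smoke_test_stdio_server(command: str, args: List[str], wait: float = 2.0) -> bool:
    """Start the server process and confirm it is still alive after a short wait."""
    proc = subprocess.Popen(
        [command, *args],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    time.sleep(wait)                 # give the server time to fail on bad config
    alive = proc.poll() is None      # None means the process is still running
    proc.terminate()
    proc.wait(timeout=5)
    return alive

assert smoke_test_stdio_server("python", ["mcp_server.py"]), "Server failed to start"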
with_mcp_server_sse(url: str, ...) -> MCPTeamBuilder
Add an MCP server with SSE (Server-Sent Events) transport.
Parameters:
url (str): Server URL (e.g., "http://localhost:8000/sse")
env (Optional[Dict[str, str]]): Environment variables (for authentication)
timeout (Optional[int]): Connection timeout in seconds (default: 30)
sse_read_timeout (Optional[int]): SSE read timeout in seconds (default: 60)
Returns:
- Self for method chaining
Example:
# Basic SSE server
builder = MCPTeamBuilder("team1").with_mcp_server_sse(
url="http://localhost:8000/sse"
)
# Remote server with authentication
builder = MCPTeamBuilder("team2").with_mcp_server_sse(
url="https://api.example.com/mcp/sse",
env={
"AUTH_TOKEN": "Bearer your-token-here",
"API_VERSION": "v1"
}
)
# Custom timeouts for slow networks
builder = MCPTeamBuilder("team3").with_mcp_server_sse(
url="http://remote-server.com:8080/sse",
timeout=60, # 60 seconds connection timeout
sse_read_timeout=120 # 2 minutes read timeout
)
# Local development server
builder = MCPTeamBuilder("dev_team").with_mcp_server_sse(
url="http://127.0.0.1:5000/sse",
timeout=10,
sse_read_timeout=30
)
# Production server with full configuration
builder = MCPTeamBuilder("prod_team").with_mcp_server_sse(
url="https://mcp.production.com/sse",
env={
"AUTH_TOKEN": os.getenv("MCP_AUTH_TOKEN"),
"TENANT_ID": os.getenv("TENANT_ID"),
"REGION": "us-west-2"
},
timeout=45,
sse_read_timeout=90
)
Server Configuration Storage:
# Internal representation
server_config = {
"url": "http://localhost:8000/sse",
"transport": "sse",
"env": {"AUTH_TOKEN": "Bearer token"},
"timeout": 30,
"sse_read_timeout": 60
}
self._mcp_servers.append(server_config)
When to Use SSE vs STDIO:
| Criterion | STDIO | SSE |
|---|---|---|
| Location | Local process | Remote server |
| Latency | Low (IPC) | Higher (network) |
| Scalability | Limited | High |
| Deployment | Simple | Complex |
| Security | Process isolation | HTTPS/Auth |
| Use Case | Development, single-user | Production, multi-user |
Best Practices:
- Use HTTPS in production for security
- Implement proper authentication with tokens
- Set conservative timeouts for reliability
- Handle network errors gracefully
- Monitor connection health
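For "monitor connection health", a periodic reachability probe against the SSE endpoint is a reasonable baseline. A minimal sketch using the requests library (the URL and thresholds are illustrative):
import requests

def sse_endpoint_reachable(url: str, timeout: float = 5.0) -> bool:
    """Open a streaming GET against the SSE endpoint and check for HTTP 200."""
    try:
        with requests.get(url, stream=True, timeout=timeout) as resp:
            return resp.status_code == 200
    except requests.RequestException:
        return False

if not sse_endpoint_reachable("http://localhost:8000/sse"):
    print("SSE server unreachable -- check network and server status")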
Multiple Server Configuration
You can add multiple MCP servers to a single team:
builder = (MCPTeamBuilder("multi_server_team")
# Local file operations server
.with_mcp_server(
command="python",
args=["file_server.py"]
)
# Remote API server
.with_mcp_server_sse(
url="https://api.example.com/mcp/sse",
env={"API_KEY": os.getenv("API_KEY")}
)
# Local database server
.with_mcp_server(
command="python",
args=["db_server.py"],
env={"DB_URL": "postgresql://localhost/mydb"}
)
.with_llm(llm)
.build()
)
Tool Namespace Management:
When using multiple servers, tools from all servers are combined:
# Server 1 provides: file_read, file_write
# Server 2 provides: web_search, web_fetch
# Server 3 provides: db_query, db_insert
# All tools are available to the agent:
team = builder.build()
# team has access to: file_read, file_write, web_search, web_fetch, db_query, db_insert
Handling Name Conflicts:
If two servers provide tools with the same name, the last server added wins:
# Both servers provide a "search" tool
builder = (MCPTeamBuilder("team")
.with_mcp_server("python", ["server1.py"]) # search v1
.with_mcp_server("python", ["server2.py"]) # search v2 (will be used)
)
Best Practice: Use prefixed tool names in MCP servers to avoid conflicts: file_search, web_search, db_search instead of a generic search.
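If you cannot modify the servers themselves, one client-side workaround is to namespace tools after fetching them in manual mode. This sketch is hypothetical: it assumes each server is wrapped in its own team so tools can be fetched per server, and that the name attribute on the returned tool objects is writable (true for typical LangChain BaseTool subclasses):
# Hypothetical: one team per server, so tools can be renamed before merging
file_tools = file_team.fetch_mcp_tools()
for tool in file_tools:
    tool.name = f"file_{tool.name}"   # "search" -> "file_search"
web_tools = web_team.fetch_mcp_tools()
for tool in web_tools:
    tool.name = f"web_{tool.name}"    # "search" -> "web_search"
main_team.add_tools(file_tools + web_tools)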
Tool Management Methods
with_manual_tools() -> MCPTeamBuilder
Disable automatic tool discovery and fetch tools manually.
Parameters:
- None
Returns:
- Self for method chaining
Use Case:
- When you need to inspect tools before adding them
- When you want to filter or modify tools
- When you need custom tool initialization logic
Example:
# Manual tool management
builder = (MCPTeamBuilder("team")
.with_mcp_server("python", ["server.py"])
.with_manual_tools() # Disable auto-discovery
.with_llm(llm)
)
team = builder.build()
# Manually fetch tools after build
tools = team.fetch_mcp_tools()
# Inspect tools
print(f"Available tools: {[t.name for t in tools]}")
for tool in tools:
print(f" {tool.name}: {tool.description}")
# Filter tools
allowed_tools = ["file_read", "file_write"]
filtered_tools = [t for t in tools if t.name in allowed_tools]
# Add filtered tools
team.add_tools(filtered_tools)
Comparison:
| Mode | Auto-Discovery (default) | Manual Tools |
|---|---|---|
| Setup | Automatic | Explicit |
| Control | Limited | Full |
| Filtering | Not available | Available |
| Inspection | Not available | Available |
| Complexity | Simple | More complex |
Best Practices:
- Use manual tools for security-sensitive applications
- Inspect tool descriptions before adding
- Validate tool schemas
- Log tool additions for audit trails
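For the audit-trail recommendation, a thin wrapper around add_tools is enough. A sketch using the standard logging module (the logger name and message format are placeholders):
import logging

audit_log = logging.getLogger("mcp.tool_audit")

def add_tools_with_audit(team, tools):
    """Log every tool registration before handing the list to the team."""
    for tool in tools:
        audit_log.info("Registering tool %s: %s", tool.name, tool.description)
    team.add_tools(tools)

add_tools_with_audit(team, filtered_tools)  # filtered_tools from the example above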
fetch_mcp_tools() -> List[BaseTool]
Manually fetch tools from all configured MCP servers.
Returns:
- List of LangChain BaseTool instances
Example:
# Build team with manual tools
team = (MCPTeamBuilder("team")
.with_mcp_server("python", ["server.py"])
.with_manual_tools()
.with_llm(llm)
.build()
)
# Fetch tools
tools = team.fetch_mcp_tools()
# Analyze tools
print(f"Total tools: {len(tools)}")
for tool in tools:
print(f"\nTool: {tool.name}")
print(f"Description: {tool.description}")
print(f"Schema: {tool.args_schema}")
# Check if tool requires authentication
if "auth" in tool.description.lower():
print(" ⚠️ Requires authentication")
# Use only safe tools
safe_tools = [t for t in tools if "delete" not in t.name.lower()]
team.add_tools(safe_tools)
Advanced Filtering:
def filter_tools(tools: List[BaseTool],
allowed_categories: List[str],
forbidden_keywords: List[str]) -> List[BaseTool]:
"""Filter tools based on categories and forbidden keywords."""
filtered = []
for tool in tools:
# Check category (from tool description)
has_allowed_category = any(
cat.lower() in tool.description.lower()
for cat in allowed_categories
)
# Check forbidden keywords
has_forbidden = any(
keyword.lower() in tool.name.lower() or
keyword.lower() in tool.description.lower()
for keyword in forbidden_keywords
)
if has_allowed_category and not has_forbidden:
filtered.append(tool)
return filtered
# Usage
tools = team.fetch_mcp_tools()
safe_tools = filter_tools(
tools,
allowed_categories=["file", "search", "analysis"],
forbidden_keywords=["delete", "remove", "destroy", "drop"]
)
team.add_tools(safe_tools)
add_tools(tools: List[BaseTool]) -> None
Add tools to the team after build.
Parameters:
tools (List[BaseTool]): Tools to add
Returns:
- None
Example:
# Build team
team = builder.build()
# Add MCP tools manually
mcp_tools = team.fetch_mcp_tools()
team.add_tools(mcp_tools)
# Add custom tools
from langchain_community.tools import DuckDuckGoSearchRun
custom_tools = [DuckDuckGoSearchRun()]
team.add_tools(custom_tools)
# Conditional tool addition
if user_has_admin_access:
admin_tools = team.fetch_mcp_tools_from_server("admin_server")
team.add_tools(admin_tools)
Reinforcement Learning Methods
with_rl(rl_manager: RLManager) -> MCPTeamBuilder
Enable reinforcement learning for tool selection optimization.
Parameters:
rl_manager (RLManager): Configured RL manager instance
Returns:
- Self for method chaining
Example:
from azcore.rl import RLManager
# Create RL manager
rl_manager = RLManager(
state_dim=128, # State embedding dimension
action_dim=10, # Number of tools
learning_rate=0.001,
gamma=0.99, # Discount factor
epsilon=0.1 # Exploration rate
)
# Build team with RL
team = (MCPTeamBuilder("rl_team")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.with_rl(rl_manager) # Enable RL
.build()
)
# RL manager will optimize tool selection over time
for i in range(100):
result = team({"messages": [HumanMessage(content=f"Task {i}")]})
# RL manager learns from task outcomes
How RL Works with MCP:
- State: Current conversation context and available tools
- Action: Tool selection decision
- Reward: Based on task success and efficiency
- Learning: Q-learning or policy gradient updates
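The reward signal itself is application-defined; the training loops below call helpers such as evaluate_result and calculate_reward that you supply. A minimal sketch of one possible shape, assuming (these are assumptions, not a documented contract) that the result dict exposes an "output" string and an "intermediate_steps" list:
def evaluate_result(result: dict, task: str) -> float:
    """Toy reward: credit a non-empty answer, penalize each extra tool call."""
    output = result.get("output", "")
    success = 1.0 if output.strip() else 0.0                # did the agent answer?
    tool_calls = len(result.get("intermediate_steps", []))  # assumed key
    return success - 0.05 * tool_calls                      # favor efficient tool use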
RL Configuration:
# Advanced RL configuration
rl_manager = RLManager(
state_dim=256,
action_dim=len(mcp_tools),
learning_rate=0.0001,
gamma=0.95,
epsilon_start=0.3, # High exploration initially
epsilon_end=0.05, # Low exploration after training
epsilon_decay=0.995,
replay_buffer_size=10000,
batch_size=64,
target_update_freq=100
)
team = (MCPTeamBuilder("advanced_rl_team")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.with_rl(rl_manager)
.build()
)
Training Loop:
from langchain.schema import HumanMessage
# Training phase
for episode in range(1000):
task = generate_task() # Your task generation logic
result = team({
"messages": [HumanMessage(content=task)]
})
# Calculate reward
reward = evaluate_result(result, task)
# Update RL manager
rl_manager.update(reward)
if episode % 100 == 0:
print(f"Episode {episode}, Avg Reward: {rl_manager.avg_reward}")
# Save trained model
rl_manager.save("trained_model.pkl")
# Production usage with trained model
rl_manager_prod = RLManager.load("trained_model.pkl")
prod_team = (MCPTeamBuilder("prod_team")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.with_rl(rl_manager_prod)
.build()
)
Benefits of RL with MCP:
- Optimized Tool Selection: Learns which tools work best for different tasks
- Reduced Latency: Faster tool selection with experience
- Improved Success Rate: Higher task completion rate over time
- Adaptive Behavior: Adjusts to changing tool performance
Build and Execution
build() -> MCPTeam
Build the final MCP team with all configurations.
Returns:
- MCPTeam instance ready for execution
Build Process:
# Internal build steps
def build(self) -> MCPTeam:
# 1. Validate configuration
self._validate_config()
# 2. Initialize MCP client (if servers configured)
if self._mcp_servers:
self._init_mcp_client()
# 3. Discover tools (unless manual mode)
if not self._manual_tools:
self._discover_tools()
# 4. Combine MCP tools with custom tools
all_tools = self._mcp_tools + self._custom_tools
# 5. Create agent with tools
self._create_agent(all_tools)
# 6. Set up RL (if enabled)
if self._rl_enabled:
self._setup_rl()
# 7. Return configured team
return MCPTeam(self)
Example:
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.schema import HumanMessage
# Complete team build
team = (MCPTeamBuilder("research_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server("python", ["file_server.py"])
.with_mcp_server_sse("http://localhost:8000/sse")
.with_tools([DuckDuckGoSearchRun()])
.with_prompt("You are a research assistant.")
.build() # Returns MCPTeam instance
)
# Team is now ready for use
result = team({"messages": [HumanMessage(content="Research AI trends")]})
Build Validation:
The builder validates configuration during build():
# Required fields
assert self._llm is not None, "LLM must be configured"
assert self._prompt is not None, "Prompt must be set"
# MCP configuration
if self._mcp_servers:
assert all(
"command" in s or "url" in s
for s in self._mcp_servers
), "Invalid MCP server configuration"
# RL configuration
if self._rl_enabled:
assert self._rl_manager is not None, "RL manager required"
Error Handling:
try:
team = builder.build()
except ValueError as e:
print(f"Configuration error: {e}")
except ImportError as e:
print(f"Missing dependency: {e}")
except Exception as e:
print(f"Build failed: {e}")
Executing the Team
Once built, execute the team with input messages:
from langchain.schema import HumanMessage, AIMessage
# Single message
result = team({
"messages": [
HumanMessage(content="What files are in the current directory?")
]
})
print(result["output"])
# Conversation with history
result = team({
"messages": [
HumanMessage(content="List files in /data"),
AIMessage(content="Found 3 files: a.txt, b.txt, c.txt"),
HumanMessage(content="Read a.txt")
]
})
# Streaming responses
for chunk in team.stream({"messages": [HumanMessage(content="Search for AI papers")]}):
print(chunk["output"], end="", flush=True)
# Async execution
import asyncio
async def run_async():
result = await team.ainvoke({
"messages": [HumanMessage(content="Analyze this data")]
})
print(result["output"])
asyncio.run(run_async())
Advanced Patterns
Pattern 1: Multi-Server Team with Specialized Roles
import os
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
# Create team with multiple specialized MCP servers
research_team = (MCPTeamBuilder("research_team")
.with_llm(ChatOpenAI(model="gpt-4o"))
# File operations server
.with_mcp_server(
command="python",
args=["servers/file_ops_server.py"],
env={"WORKSPACE": "/data/research"}
)
# Web search server
.with_mcp_server_sse(
url="https://search-api.example.com/sse",
env={"API_KEY": os.getenv("SEARCH_API_KEY")}
)
# Database server
.with_mcp_server(
command="python",
args=["servers/db_server.py"],
env={"DB_URL": os.getenv("DATABASE_URL")}
)
# Academic API server
.with_mcp_server_sse(
url="https://academic-api.example.com/mcp",
env={"AUTH_TOKEN": os.getenv("ACADEMIC_API_TOKEN")}
)
.with_prompt("""You are a research assistant with access to:
- File system operations (file_*)
- Web search capabilities (web_search, web_fetch)
- Database queries (db_query, db_insert)
- Academic paper search (academic_search, academic_fetch)
When conducting research:
1. Search academic databases first
2. Verify with web search
3. Store findings in database
4. Save reports to files
Always cite sources and maintain data integrity.
""")
.build()
)
# Use the team
result = research_team({
"messages": [HumanMessage(
content="Research recent advances in transformer architectures and save a report"
)]
})
Pattern 2: RL-Optimized Tool Selection
from azcore.rl import RLManager
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
# Initialize RL manager
rl_manager = RLManager(
state_dim=256,
action_dim=20, # Number of available tools
learning_rate=0.0001,
gamma=0.95,
epsilon_start=0.3,
epsilon_end=0.05,
epsilon_decay=0.995
)
# Build RL-enabled team
rl_team = (MCPTeamBuilder("rl_optimized_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server("python", ["servers/multi_tool_server.py"])
.with_rl(rl_manager)
.with_prompt("You are an AI assistant optimizing tool usage through reinforcement learning.")
.build()
)
# Training loop
training_tasks = load_training_tasks() # Your task dataset
for epoch in range(10):
for task in training_tasks:
result = rl_team({
"messages": [HumanMessage(content=task["prompt"])]
})
# Calculate reward based on task success
reward = calculate_reward(result, task["expected_outcome"])
# Update RL manager
rl_manager.update(reward)
# Evaluate
avg_reward = rl_manager.get_average_reward()
print(f"Epoch {epoch}, Avg Reward: {avg_reward}")
# Save trained model
rl_manager.save("models/rl_team_trained.pkl")
Pattern 3: Dynamic Tool Loading
from typing import List
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
class DynamicToolTeam:
"""Team that loads tools dynamically based on task requirements."""
def __init__(self):
self.base_team = (MCPTeamBuilder("dynamic_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_manual_tools() # Manual tool management
.with_prompt("You are an adaptive AI assistant.")
.build()
)
self.server_configs = {
"files": {
"command": "python",
"args": ["servers/file_server.py"]
},
"database": {
"command": "python",
"args": ["servers/db_server.py"]
},
"api": {
"url": "http://api.example.com/sse"
}
}
def execute(self, task: str, required_capabilities: List[str]):
"""Execute task with dynamically loaded tools."""
# Load required servers
for capability in required_capabilities:
if capability in self.server_configs:
config = self.server_configs[capability]
if "command" in config:
self.base_team.add_mcp_server(
command=config["command"],
args=config["args"]
)
elif "url" in config:
self.base_team.add_mcp_server_sse(
url=config["url"]
)
# Fetch and add tools
tools = self.base_team.fetch_mcp_tools()
self.base_team.add_tools(tools)
# Execute task
result = self.base_team({
"messages": [HumanMessage(content=task)]
})
return result
# Usage
dynamic_team = DynamicToolTeam()
# Task 1: File operations only
result1 = dynamic_team.execute(
task="List all Python files in /src",
required_capabilities=["files"]
)
# Task 2: Database and API access
result2 = dynamic_team.execute(
task="Query user data and fetch external profile",
required_capabilities=["database", "api"]
)
Pattern 4: Fallback and Retry Logic
from typing import Optional
import time
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
class ResilientMCPTeam:
"""MCP team with fallback and retry logic."""
def __init__(self, primary_server: str, fallback_server: Optional[str] = None):
self.primary_team = self._build_team(primary_server)
self.fallback_team = (
self._build_team(fallback_server)
if fallback_server else None
)
def _build_team(self, server_path: str):
return (MCPTeamBuilder(f"team_{server_path}")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server("python", [server_path])
.with_prompt("You are a resilient AI assistant.")
.build()
)
def execute_with_retry(
self,
task: str,
max_retries: int = 3,
use_fallback: bool = True
):
"""Execute task with retry and fallback logic."""
last_error = None
# Try primary server with retries
for attempt in range(max_retries):
try:
result = self.primary_team({
"messages": [HumanMessage(content=task)]
})
return {
"status": "success",
"result": result,
"server": "primary",
"attempt": attempt + 1
}
except Exception as e:
last_error = e
print(f"Primary attempt {attempt + 1} failed: {e}")
time.sleep(2 ** attempt) # Exponential backoff
# Try fallback server if available
if use_fallback and self.fallback_team:
try:
result = self.fallback_team({
"messages": [HumanMessage(content=task)]
})
return {
"status": "success",
"result": result,
"server": "fallback",
"attempt": 1
}
except Exception as e:
last_error = e
print(f"Fallback failed: {e}")
# All attempts failed
return {
"status": "failure",
"error": str(last_error),
"task": task
}
# Usage
resilient_team = ResilientMCPTeam(
primary_server="servers/primary_server.py",
fallback_server="servers/fallback_server.py"
)
result = resilient_team.execute_with_retry(
task="Process critical data",
max_retries=3,
use_fallback=True
)
if result["status"] == "success":
print(f"Task completed using {result['server']} server")
else:
print(f"Task failed: {result['error']}")
Pattern 5: Tool Filtering and Security
from typing import List
from langchain.tools import BaseTool
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
class SecureMCPTeam:
"""MCP team with security-focused tool filtering."""
def __init__(self, user_role: str):
self.user_role = user_role
self.team = (MCPTeamBuilder("secure_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server("python", ["servers/all_tools_server.py"])
.with_manual_tools()
.with_prompt(f"You are an assistant with {user_role} permissions.")
.build()
)
self._setup_filtered_tools()
def _setup_filtered_tools(self):
"""Setup tools based on user role."""
all_tools = self.team.fetch_mcp_tools()
# Define role permissions
role_permissions = {
"admin": lambda tool: True, # All tools
"user": lambda tool: not any(
keyword in tool.name.lower()
for keyword in ["delete", "drop", "admin", "system"]
),
"guest": lambda tool: tool.name.lower() in [
"read_file", "list_files", "search"
]
}
# Filter tools based on role
filter_func = role_permissions.get(
self.user_role,
lambda tool: False # No tools for unknown roles
)
allowed_tools = [tool for tool in all_tools if filter_func(tool)]
# Log allowed tools
print(f"User role '{self.user_role}' has access to:")
for tool in allowed_tools:
print(f" - {tool.name}")
# Add filtered tools to team
self.team.add_tools(allowed_tools)
def execute(self, task: str):
"""Execute task with role-based tool access."""
return self.team({
"messages": [HumanMessage(content=task)]
})
# Usage
admin_team = SecureMCPTeam(user_role="admin")
user_team = SecureMCPTeam(user_role="user")
guest_team = SecureMCPTeam(user_role="guest")
# Admin can delete files
admin_result = admin_team.execute("Delete temporary files")
# User cannot delete files
user_result = user_team.execute("Delete temporary files")
# Will fail: "delete" tools not available
# Guest can only read
guest_result = guest_team.execute("Read the documentation")
Error Handling
Common Errors and Solutions
1. MCP Import Error
try:
team = (MCPTeamBuilder("team")
.with_mcp_server("python", ["server.py"])
.build()
)
except ImportError as e:
print("MCP dependencies not installed")
print("Install with: pip install langchain-mcp-adapters mcp")
2. Server Connection Timeout
try:
team = (MCPTeamBuilder("team")
.with_mcp_server(
command="python",
args=["server.py"],
timeout=30 # Increase timeout
)
.build()
)
except TimeoutError as e:
print(f"Server connection timed out: {e}")
print("Try increasing timeout or check server startup")
3. Missing Configuration
try:
team = MCPTeamBuilder("team").build() # Missing LLM!
except ValueError as e:
print(f"Configuration error: {e}")
# Add required configuration
team = (MCPTeamBuilder("team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_prompt("You are an assistant")
.build()
)
4. Tool Discovery Failure
team = (MCPTeamBuilder("team")
.with_mcp_server("python", ["server.py"])
.with_manual_tools()
.build()
)
try:
tools = team.fetch_mcp_tools()
except Exception as e:
print(f"Tool discovery failed: {e}")
# Fallback to custom tools
from langchain_community.tools import ShellTool
team.add_tools([ShellTool()])
5. Server Crash During Execution
import time
from langchain.schema import HumanMessage
def safe_execute(team, task: str, max_retries: int = 3):
"""Execute with retry on server errors."""
for attempt in range(max_retries):
try:
result = team({
"messages": [HumanMessage(content=task)]
})
return result
except Exception as e:
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed, retrying...")
time.sleep(2)
else:
print(f"All attempts failed: {e}")
raise
# Usage
result = safe_execute(team, "Process data")
Graceful Degradation
When MCP dependencies are unavailable:
try:
from langchain_mcp_adapters.client import MultiServerMCPClient
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
if MCP_AVAILABLE:
team = (MCPTeamBuilder("team")
.with_llm(llm)
.with_mcp_server("python", ["server.py"])
.build()
)
else:
# Fallback to standard tools
from langchain_community.tools import ShellTool
team = (MCPTeamBuilder("team")
.with_llm(llm)
.with_tools([ShellTool()])
.with_prompt("You are an assistant with basic tools")
.build()
)
Complete Examples
Example 1: Research Assistant with Multiple Data Sources
import os
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
# Create comprehensive research assistant
research_assistant = (MCPTeamBuilder("research_assistant")
.with_llm(ChatOpenAI(
model="gpt-4o",
temperature=0.3
))
# Local file operations
.with_mcp_server(
command="python",
args=["servers/file_operations_server.py"],
env={"WORKSPACE": "/data/research"}
)
# Academic database
.with_mcp_server_sse(
url="https://academic-db.example.com/mcp",
env={"API_KEY": os.getenv("ACADEMIC_API_KEY")},
timeout=45
)
# Web search
.with_mcp_server_sse(
url="https://search.example.com/mcp",
env={"API_KEY": os.getenv("SEARCH_API_KEY")}
)
.with_prompt("""You are a research assistant with access to:
1. **File Operations** (file_read, file_write, file_list):
- Read and write research documents
- Organize findings in structured files
2. **Academic Database** (academic_search, paper_fetch, citation_get):
- Search peer-reviewed papers
- Fetch full paper content
- Get citation information
3. **Web Search** (web_search, web_fetch):
- Search the general web for supplementary information
- Fetch web page content
**Research Process:**
1. Start with academic database search for authoritative sources
2. Use web search for recent developments and practical applications
3. Organize findings in files with proper citations
4. Synthesize information into clear, well-documented reports
**Citation Format:**
Always cite sources using: [AuthorYear] format and include full references.
""")
.build()
)
# Execute research task
research_task = """
Research the latest developments in large language model fine-tuning techniques.
Focus on:
1. Parameter-efficient methods (LoRA, adapters)
2. Instruction tuning approaches
3. Performance comparisons
Create a comprehensive report with:
- Executive summary
- Detailed findings for each topic
- Citations for all sources
- Recommendations for practitioners
Save the report to 'llm_finetuning_report.md'
"""
result = research_assistant({
"messages": [HumanMessage(content=research_task)]
})
print(result["output"])
Example 2: Data Analysis Pipeline
from azcore.agents import MCPTeamBuilder
from azcore.rl import RLManager
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
# Create RL-optimized data analysis team
rl_manager = RLManager(
state_dim=256,
action_dim=15,
learning_rate=0.0001,
gamma=0.95
)
data_analyst = (MCPTeamBuilder("data_analyst")
.with_llm(ChatOpenAI(
model="gpt-4o-mini",
temperature=0.0 # Deterministic for analysis
))
# Data processing server
.with_mcp_server(
command="python",
args=["servers/data_processing_server.py"],
env={
"DATA_PATH": "/data/datasets",
"CACHE_DIR": "/tmp/cache"
}
)
# Statistical analysis server
.with_mcp_server(
command="python",
args=["servers/statistics_server.py"]
)
# Visualization server
.with_mcp_server(
command="python",
args=["servers/visualization_server.py"],
env={"OUTPUT_DIR": "/data/plots"}
)
.with_rl(rl_manager)
.with_prompt("""You are a data analysis specialist with access to:
**Data Processing Tools:**
- load_csv, load_json, load_parquet
- clean_data, transform_data, aggregate_data
**Statistical Analysis Tools:**
- descriptive_stats, correlation_analysis, hypothesis_test
- regression_analysis, time_series_analysis
**Visualization Tools:**
- create_histogram, create_scatter, create_line_plot
- create_heatmap, create_box_plot
**Analysis Process:**
1. Load and inspect data
2. Clean and preprocess
3. Perform exploratory data analysis
4. Apply statistical tests
5. Create visualizations
6. Document findings
Always verify data quality before analysis and explain your methodology.
""")
.build()
)
# Analysis task
analysis_task = """
Analyze the sales_data.csv file:
1. Load and clean the data
2. Calculate descriptive statistics
3. Analyze sales trends over time
4. Identify correlations between variables
5. Create visualizations for key findings
6. Summarize insights and recommendations
"""
result = data_analyst({
"messages": [HumanMessage(content=analysis_task)]
})
print(result["output"])
Example 3: Multi-User Secure System
import os
import logging
from typing import Dict, List, Any
from azcore.agents import MCPTeamBuilder
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
class SecureMultiUserSystem:
"""Multi-user system with role-based access control."""
def __init__(self):
self.teams: Dict[str, Any] = {}
self._setup_logging()
self._create_role_teams()
def _setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='mcp_system.log'
)
self.logger = logging.getLogger(__name__)
def _create_role_teams(self):
"""Create teams for different user roles."""
# Admin team - full access
self.teams["admin"] = (MCPTeamBuilder("admin_team")
.with_llm(ChatOpenAI(model="gpt-4o"))
.with_mcp_server(
command="python",
args=["servers/admin_server.py"],
env={"ADMIN_KEY": os.getenv("ADMIN_KEY")}
)
.with_prompt("You are an admin assistant with full system access.")
.build()
)
# User team - limited access
self.teams["user"] = (MCPTeamBuilder("user_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server(
command="python",
args=["servers/user_server.py"]
)
.with_prompt("You are a user assistant with standard permissions.")
.build()
)
# Guest team - read-only
self.teams["guest"] = (MCPTeamBuilder("guest_team")
.with_llm(ChatOpenAI(model="gpt-4o-mini"))
.with_mcp_server(
command="python",
args=["servers/guest_server.py"]
)
.with_prompt("You are a guest assistant with read-only access.")
.build()
)
def execute_task(self, user_id: str, role: str, task: str) -> Dict:
"""Execute task with role-based team."""
self.logger.info(f"User {user_id} ({role}) executing: {task}")
if role not in self.teams:
self.logger.warning(f"Invalid role: {role}")
return {
"status": "error",
"message": f"Invalid role: {role}"
}
try:
team = self.teams[role]
result = team({
"messages": [HumanMessage(content=task)]
})
self.logger.info(f"Task completed successfully for user {user_id}")
return {
"status": "success",
"user_id": user_id,
"role": role,
"result": result["output"]
}
except Exception as e:
self.logger.error(f"Task failed for user {user_id}: {e}")
return {
"status": "error",
"user_id": user_id,
"role": role,
"error": str(e)
}
def get_team_capabilities(self, role: str) -> List[str]:
"""Get available tools for a role."""
if role not in self.teams:
return []
team = self.teams[role]
tools = team.fetch_mcp_tools()
return [tool.name for tool in tools]
# Usage
system = SecureMultiUserSystem()
# Admin user
admin_result = system.execute_task(
user_id="admin001",
role="admin",
task="Delete old log files and optimize database"
)
# Regular user
user_result = system.execute_task(
user_id="user042",
role="user",
task="Generate my monthly report"
)
# Guest user
guest_result = system.execute_task(
user_id="guest123",
role="guest",
task="View the public documentation"
)
# Check capabilities
print("Admin capabilities:", system.get_team_capabilities("admin"))
print("User capabilities:", system.get_team_capabilities("user"))
print("Guest capabilities:", system.get_team_capabilities("guest"))
Summary
The MCPTeamBuilder provides a comprehensive fluent API for creating MCP-enabled agent teams with:
- Flexible configuration through method chaining
- Multiple transport types (STDIO and SSE)
- Multi-server support for combining capabilities
- Automatic tool discovery with manual override option
- RL integration for optimized tool selection
- Robust error handling and graceful degradation
- Security features through tool filtering
- Production-ready patterns and best practices