High-Level Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ Azcore │
│ Hierarchical Multi-Agent System │
│ │
└─────────────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌─────────▼────────┐ ┌─────▼──────┐ ┌───────▼────────┐
│ Orchestration │ │ Agent │ │ Integration │
│ Layer │ │ Layer │ │ Layer │
└─────────┬────────┘ └─────┬──────┘ └───────┬────────┘
│ │ │
┌─────────▼────────┐ ┌─────▼──────┐ ┌───────▼────────┐
│ - GraphOrch │ │ - Agents │ │ - MCP │
│ - Coordinator │ │ - Teams │ │ - External │
│ - Planner │ │ - Tools │ │ - APIs │
│ - Supervisor │ │ - Patterns │ │ - Services │
└──────────────────┘ └────────────┘ └────────────────┘
│ │ │
└─────────────────┼─────────────────┘
│
┌─────────────────▼─────────────────┐
│ │
│ Foundation Layer │
│ │
│ ┌──────────────────────────────┐ │
│ │ Workflows │ │
│ └──────────────────────────────┘ │
│ ┌──────────────────────────────┐ │
                    │ │         LLM + Tools          │ │
│ └──────────────────────────────┘ │
│ ┌──────────────────────────────┐ │
│ │ State Management │ │
│ └──────────────────────────────┘ │
│ │
└────────────────────────────────────┘
│
┌─────────────────▼─────────────────┐
│ │
│ Optional Enhancement Layer │
│ │
│ ┌──────────────────────────────┐ │
│ │ Reinforcement Learning │ │
                    │ │ (Q-learning, Tool Selection) │ │
│ └──────────────────────────────┘ │
│ ┌──────────────────────────────┐ │
│ │ Caching Layer │ │
│ │ (LLM + Embedding Cache) │ │
│ └──────────────────────────────┘ │
│ ┌──────────────────────────────┐ │
│ │ Monitoring & Observability │ │
│ │ (Logging, Metrics, Tracing) │ │
│ └──────────────────────────────┘ │
│ │
└────────────────────────────────────┘
Layered Architecture
Layer 1: Foundation Layer (What Azcore is built on)
- LangGraph: Workflow orchestration, state management
- LangChain: LLM communication, tool integration
- Python 3.12+: Runtime environment
Layer 2: Core Layer (Azcore's main components)
- GraphOrchestrator: Workflow compilation and execution
- Nodes: Coordinator, Planner, Generator
- Supervisor: Routing and delegation
- State: Enhanced state management
Layer 3: Agent Layer (Agent implementations)
- AgentFactory: Agent creation and configuration
- ReactAgent: ReAct-style agents
- TeamBuilder: Multi-agent teams
- Advanced Patterns: Self-consistency, Reflexion, etc.
Layer 4: Integration Layer (External connections)
- MCP Integration: Model Context Protocol
- Tool System: Custom tool integration
- External APIs: Third-party services
Layer 5: Enhancement Layer (Optional optimizations)
- RL System: Q-learning for tool selection
- Caching: Response and embedding caching
- Monitoring: Logging, metrics, tracing
Layer 6: Workflow Layer (Orchestration patterns)
- Sequential: Linear execution
- Concurrent: Parallel execution
- Hierarchical: Director-worker coordination
- Mixture: Expert synthesis
- Forest: Dynamic tree selection
- Heavy: Five-phase comprehensive analysis
- Group Chat: Multi-agent conversation
- Swarm Router: Automatic pattern selection
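For a taste of how these patterns compose, here is a minimal sketch (the import path, ConcurrentWorkflow, and route() are assumptions; SequentialWorkflow, SwarmRouter, and register_workflow() mirror the examples later in this document):
# Build two patterns, then let SwarmRouter pick one per query
from azcore.workflows import SequentialWorkflow, ConcurrentWorkflow, SwarmRouter

seq_wf = SequentialWorkflow(agents=[agent1, agent2], llm=llm)   # linear
conc_wf = ConcurrentWorkflow(agents=[agent1, agent2], llm=llm)  # parallel

router = SwarmRouter(llm)
router.register_workflow("sequential", seq_wf)
router.register_workflow("concurrent", conc_wf)

# The router analyzes the query and dispatches to the best-fit pattern
result = router.route("Compare Q3 revenue across three sources")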
Design Principles
1. Hierarchical Organization
Principle: Complex systems are managed through clear hierarchies with delegation.
Implementation:
User Request
↓
Coordinator (Triage)
↓
Planner (Decomposition)
↓
Supervisor (Routing)
↓
Teams (Execution)
↓
Agents (Actions)
↓
Tools (Operations)
Benefits:
- Clear responsibility boundaries
- Easy to understand and debug
- Scalable architecture
- Modular composition
Example:
# Hierarchical workflow
orchestrator = GraphOrchestrator()
# Level 1: Coordination
orchestrator.add_node("coordinator", CoordinatorNode(llm))
# Level 2: Planning
orchestrator.add_node("planner", PlannerNode(llm))
# Level 3: Supervision
orchestrator.set_supervisor(Supervisor(llm, team_names))
# Level 4: Teams (contains agents)
orchestrator.add_team(research_team)
orchestrator.add_team(analysis_team)
# Level 5: Agents (contains tools)
# Defined within teams
2. Separation of Concerns
Principle: Each component has a single, well-defined responsibility.
Implementation:
- Coordinator: User interaction and triage
- Planner: Task decomposition
- Supervisor: Routing decisions
- Teams: Domain specialization
- Agents: Task execution
- Tools: Specific operations
Anti-Pattern:
# Bad: Everything in one place
def do_everything(task):
    triage(task)
    plan(task)
    route(task)
    execute(task)
    generate_response(task)
Good Pattern:
# Good: Clear separation
coordinator = CoordinatorNode(llm) # Triage
planner = PlannerNode(llm) # Planning
supervisor = Supervisor(llm) # Routing
team = TeamBuilder() # Execution
3. Composability
Principle: Components can be combined in flexible ways to create complex behaviors.
Implementation:
# Compose agents into teams
team = (TeamBuilder("research_team")
    .with_llm(llm)
    .with_tools([search, analyze])
    .build())

# Compose teams into workflows
workflow = SequentialWorkflow(
    agents=[agent1, agent2, agent3],
    llm=llm
)

# Compose workflows into systems
router = SwarmRouter(llm)
router.register_workflow("sequential", seq_wf)
router.register_workflow("concurrent", conc_wf)
4. State Immutability (Functional Style)
Principle: State updates create new states rather than mutating existing ones.
Implementation:
# LangGraph Command pattern
return Command(
    update={"messages": [new_message]},  # New state
    goto="next_node"
)

# State accumulation
State = {
    'messages': Annotated[List[BaseMessage], operator.add]  # Accumulate
}
Benefits:
- No side effects
- Easy to debug (state history preserved)
- Supports time travel debugging
- Concurrent execution safe
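To make the reducer semantics concrete, here is a framework-free sketch (apply_update stands in for LangGraph's internal merge; it is not the library's actual code):
import operator

def apply_update(state: dict, update: dict) -> dict:
    """Merge an update into state without mutating the original."""
    new_state = dict(state)                        # shallow copy, no mutation
    for key, value in update.items():
        if key == "messages":                      # accumulating field
            new_state[key] = operator.add(state.get(key, []), value)
        else:                                      # last-write-wins field
            new_state[key] = value
    return new_state

s0 = {"messages": [("user", "Query")], "next": ""}
s1 = apply_update(s0, {"messages": [("ai", "Routing to planner")],
                       "next": "planner"})

assert s0["messages"] == [("user", "Query")]       # old state preserved
assert len(s1["messages"]) == 2                    # history accumulated
Because every update yields a fresh state, the full history of states remains available for inspection, which is what enables time travel debugging.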
5. Fail-Safe Defaults
Principle: System should degrade gracefully when components fail.
Implementation:
# Graceful degradation
try:
    result = agent.invoke(state)
except Exception as e:
    logger.error(f"Agent failed: {e}")
    return fallback_response

# Optional RL
if rl_enabled and rl_manager:
    tools = rl_manager.select_tools(query)
else:
    tools = all_tools  # Fallback to all tools

# Skip failed MCP servers
builder = (MCPTeamBuilder("team")
    .with_mcp_server(server_config)
    .skip_failed_servers(True))  # Continue if server fails
6. Progressive Enhancement
Principle: Core functionality works without optional features; enhancements improve experience.
Implementation:
# Core: Works without RL
agent = factory.create_react_agent(
    name="agent",
    tools=tools,
    llm=llm
)

# Enhanced: With RL (optional)
agent = factory.create_react_agent(
    name="agent",
    tools=tools,
    llm=llm,
    rl_enabled=True,  # Enhancement
    rl_manager=rl_manager,
    reward_calculator=reward_calc
)

# Core: Works without caching
llm = ChatOpenAI(model="gpt-4o-mini")

# Enhanced: With caching (optional)
agent = factory.create_react_agent(
    name="agent",
    tools=tools,
    llm=llm,
    enable_caching=True,  # Enhancement
    cache_type="semantic"
)
7. Observable by Default
Principle: System behavior should be visible and debuggable.
Implementation:
# Automatic logging
logger = logging.getLogger(__name__)
logger.info(f"Agent {name} invoked with query: {query}")
# State tracking
state["metadata"]["tool_calls"] += 1
state["metadata"]["execution_time"] = duration
# Message history preserved
state["messages"] # Full conversation history
# LangGraph visualization
graph.get_graph().draw_mermaid()
8. Convention over Configuration
Principle: Sensible defaults with ability to customize when needed.
Implementation:
# Default configuration (works immediately)
agent = factory.create_react_agent(
    name="agent",
    tools=tools
)

# Custom configuration (when needed)
agent = factory.create_react_agent(
    name="agent",
    tools=tools,
    llm=custom_llm,
    prompt=custom_prompt,
    enable_caching=True,
    cache_type="semantic",
    rl_enabled=True,
    max_iterations=50
)
System Architecture
Complete System Diagram
┌─────────────────────────────────────────────────────────────────────────────────┐
│ │
│ USER APPLICATION │
│ (Your code using Azcore) │
│ │
└────────────────────────────────────┬────────────────────────────────────────────┘
│
│ API Calls
│
┌────────────────────────────────────▼────────────────────────────────────────────┐
│ │
│ Azcore API LAYER │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ AgentFactory │ │ TeamBuilder │ │ MCPTeamBuilder │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └─────────────────────┼─────────────────────┘ │
│ │ │
└─────────────────────────────────┼───────────────────────────────────────────────┘
│
│ Creates
│
┌─────────────────────────────────▼───────────────────────────────────────────────┐
│ │
│ ORCHESTRATION LAYER │
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ GraphOrchestrator │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Coordinator │→ │ Planner │→ │ Supervisor │ │ │
│ │ │ Node │ │ Node │ │ (Routing) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────┬───────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────────────────┼────────────┐ │ │
│ │ │ │ │ │ │ │
│ │ ┌──────▼─────┐ ┌─────▼──────┐ ┌──────▼─────┐ ┌──▼──────┐ │ │
│ │ │ Team 1 │ │ Team 2 │ ... │ Team N │ │Response │ │ │
│ │ │(Agents) │ │ (Agents) │ │ (Agents) │ │Generator│ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └─────────┘ │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────┬───────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
┌─────────────────────▼────┐ ┌───────▼──────┐ ┌────▼────────────────┐
│ │ │ │ │ │
│ AGENT LAYER │ │ STATE LAYER │ │ INTEGRATION LAYER │
│ │ │ │ │ │
│ ┌────────────────────┐ │ │ ┌──────────┐ │ │ ┌─────────────────┐ │
│ │ ReactAgent │ │ │ │ State │ │ │ │ MCP Toolkit │ │
│ │ (ReAct Loop) │ │ │ │ Manager │ │ │ │ - STDIO │ │
│ └─────────┬──────────┘ │ │ └──────────┘ │ │ │ - SSE │ │
│ │ │ │ │ │ └─────────────────┘ │
│ ┌─────────▼──────────┐ │ │ ┌──────────┐ │ │ │
│ │ Advanced Patterns │ │ │ │Messages │ │ │ ┌─────────────────┐ │
│ │ - Self-Consist │ │ │ │Context │ │ │ │ External APIs │ │
│ │ - Reflexion │ │ │ │Metadata │ │ │ │ - REST │ │
│ │ - Reasoning Duo │ │ │ └──────────┘ │ │ │ - GraphQL │ │
│ └────────────────────┘ │ │ │ │ └─────────────────┘ │
│ │ │ │ │ │
└──────────────────────────┘ └──────────────┘ └─────────────────────┘
│ │ │
└───────────────┼───────────────┘
│
┌─────────────────────────────────────▼───────────────────────────────────────────┐
│ │
│ FOUNDATION LAYER │
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ LangGraph │ │
│ │ - StateGraph compilation │ │
│ │ - Workflow execution │ │
│ │ - Command routing │ │
│ │ - Checkpointing │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ LangChain │ │
│ │ - LLM communication │ │
│ │ - Tool calling │ │
│ │ - Message formatting │ │
│ │ - Prompt management │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────┬───────────────────────────────────────────┘
│
┌─────────────────────────────────────▼───────────────────────────────────────────┐
│ │
│ ENHANCEMENT LAYER (Optional) │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ RL System │ │ Caching Layer │ │ Monitoring │ │
│ │ - Q-learning │ │ - LLM Cache │ │ - Logging │ │
│ │ - Tool Select │ │ - Embed Cache │ │ - Metrics │ │
│ │ - Rewards │ │ - Semantic │ │ - Tracing │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
Component Interaction Matrix
| Component | Interacts With | Purpose | Protocol |
|---|---|---|---|
| GraphOrchestrator | All Nodes, Teams | Compilation, Execution | LangGraph Commands |
| CoordinatorNode | Planner, User | Triage | State updates |
| PlannerNode | Supervisor | Task decomposition | JSON plan |
| Supervisor | Teams, Generator | Routing | LLM decisions |
| Teams | Agents | Task delegation | State passing |
| Agents | Tools, LLM | Task execution | Tool calls |
| ReactAgent | LangGraph | ReAct loop | Internal graph |
| StateManager | All components | State operations | Dictionary ops |
| RLManager | Agents | Tool selection | Q-values |
| MCPToolkit | External servers | Tool integration | MCP protocol |
Component Deep Dive
1. GraphOrchestrator
Purpose: Central workflow compiler and executor
Architecture:
class GraphOrchestrator:
    """
    Compiles and manages multi-agent workflow graphs.

    Internal Structure:
    ┌─────────────────────────────────┐
    │       GraphOrchestrator         │
    │                                 │
    │  _nodes: Dict[str, Callable]    │ ← Node registry
    │  _teams: List[BaseTeam]         │ ← Team registry
    │  _supervisor: Supervisor        │ ← Routing logic
    │  _edges: List[Tuple]            │ ← Graph edges
    │  _entry_point: str              │ ← Start node
    │  _graph: StateGraph             │ ← Compiled graph
    │                                 │
    │  Methods:                       │
    │  - add_node()                   │
    │  - add_team()                   │
    │  - set_supervisor()             │
    │  - add_edge()                   │
    │  - compile()                    │
    │  - validate_graph()             │
    └─────────────────────────────────┘
    """
Compilation Process:
1. Node Registration Phase
   ├── Validate node names (unique)
   ├── Store node callables
   └── Track dependencies

2. Edge Definition Phase
   ├── Define connections between nodes
   ├── Validate node existence
   └── Check for cycles (if enabled)

3. Supervisor Integration Phase
   ├── Add supervisor node
   ├── Connect to teams
   └── Configure routing logic

4. Graph Compilation Phase (LangGraph)
   ├── Create StateGraph
   ├── Add all nodes
   ├── Add all edges
   ├── Set entry point
   └── Compile to executable

5. Validation Phase
   ├── Check graph connectivity
   ├── Verify all nodes reachable
   ├── Validate state schema
   └── Ensure no orphaned nodes
Usage Example:
# Create orchestrator
orchestrator = GraphOrchestrator(
    state_class=State,
    checkpointer=MemorySaver(),
    max_iterations=20,
    enable_cycle_detection=True
)
# Add nodes
orchestrator.add_node("coordinator", coordinator_node)
orchestrator.add_node("planner", planner_node)
# Add teams
orchestrator.add_team(research_team)
orchestrator.add_team(analysis_team)
# Set supervisor
orchestrator.set_supervisor(Supervisor(llm, ["research_team", "analysis_team"]))
# Add edges
orchestrator.add_edge("coordinator", "planner")
orchestrator.add_edge("planner", "supervisor")
# Set entry point
orchestrator.set_entry_point("coordinator")
# Compile
graph = orchestrator.compile()
# Execute
result = graph.invoke({"messages": [("user", "Query")]})
Key Responsibilities:
- Node Management: Register and organize workflow nodes
- Team Integration: Incorporate multi-agent teams
- Graph Construction: Build LangGraph workflows
- Validation: Ensure graph correctness
- Compilation: Create executable workflows
- Execution Management: Control workflow execution
2. Nodes (Coordinator, Planner, Generator)
Node Architecture:
┌────────────────────────────────────────────────────────┐
│ BaseNode (Abstract) │
│ │
│ Properties: │
│ - name: str │
│ - description: str │
│ - _logger: Logger │
│ │
│ Methods: │
│ - execute(state) → Command (abstract) │
│ - __call__(state) → Command │
└────────────────────┬───────────────────────────────────┘
│
│ Inherits
│
┌────────────┴────────────┬──────────────────────┐
│ │ │
┌───────▼────────┐ ┌───────────▼──────┐ ┌─────────▼────────┐
│ CoordinatorNode│ │ PlannerNode │ │ResponseGenerator │
│ │ │ │ │ Node │
│ - Triage │ │ - Decomposition │ │ │
│ - Simple tasks │ │ - Plan creation │ │ - Synthesis │
│ - Handoff │ │ - Team assign │ │ - Formatting │
└────────────────┘ └──────────────────┘ └──────────────────┘
CoordinatorNode
Purpose: Front-line triage and simple query handling
Decision Tree:
User Query
    ↓
Is it a simple greeting/query?
├─ Yes → Respond directly → END
└─ No → Complex task?
    ├─ Yes → Handoff to Planner → planner
    └─ No → Try to handle → response or planner
Implementation Details:
from typing import Any, Dict, Optional

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import END
from langgraph.types import Command


class CoordinatorNode(BaseNode):
    """
    Front-line coordinator for user interaction.

    Responsibilities:
    1. Receive user input
    2. Determine complexity
    3. Handle simple queries directly
    4. Route complex tasks to planner

    Decision Logic:
    - Simple greetings → Direct response
    - FAQ-style questions → Direct response
    - Complex tasks → Handoff to planner
    - Ambiguous queries → Ask for clarification
    """

    def __init__(
        self,
        llm: BaseChatModel,
        system_prompt: Optional[str] = None,
        handoff_keyword: str = "handoff_to_planner"
    ):
        super().__init__("coordinator", "Front-line coordinator")
        self.llm = llm
        self.handoff_keyword = handoff_keyword
        self.system_prompt = system_prompt or self._default_prompt()

    def execute(self, state: Dict[str, Any]) -> Command:
        """
        Execute coordinator logic.

        Process:
        1. Extract latest user message
        2. Analyze complexity
        3. Decide: respond or handoff
        4. Return Command with routing
        """
        # Extract user message
        messages = state["messages"]
        user_message = messages[-1].content

        # Call LLM with instructions
        response = self.llm.invoke([
            SystemMessage(content=self.system_prompt),
            HumanMessage(content=user_message)
        ])

        # Check for handoff keyword
        if self.handoff_keyword in response.content.lower():
            # Complex task → Route to planner
            return Command(
                update={"messages": [response]},
                goto="planner"
            )
        else:
            # Simple task → Direct response → END
            return Command(
                update={"messages": [response]},
                goto=END
            )
System Prompt:
You are a coordinator agent. Your role is to triage user requests.
For SIMPLE requests (greetings, simple questions):
- Respond directly
- Be helpful and friendly
- Keep responses brief
For COMPLEX requests (research, analysis, multi-step tasks):
- Respond with: "{handoff_keyword}"
- The planner will handle the detailed work
Examples:
- "Hello" → Respond directly
- "What time is it?" → Respond directly
- "Research AI trends and create a report" → handoff_to_planner
- "Analyze data and provide recommendations" → handoff_to_planner
PlannerNode
Purpose: Task decomposition and execution planning
Planning Process:
Complex Task Input
        ↓
1. Analyze Task
   ├── Identify sub-tasks
   ├── Determine dependencies
   └── Estimate complexity

2. Decompose into Steps
   ├── Step 1: Research
   ├── Step 2: Analysis
   ├── Step 3: Synthesis
   └── Step N: Presentation

3. Assign Teams
   ├── Match steps to team capabilities
   ├── Consider team specializations
   └── Optimize for efficiency

4. Create Execution Plan (JSON)
   {
     "task": "High-level description",
     "steps": [
       {
         "step": 1,
         "description": "...",
         "assigned_team": "research_team",
         "dependencies": [],
         "tool_selection": ["search", "scrape"],
         "expected_output": "..."
       }
     ],
     "success_criteria": "..."
   }

5. Store in State
   state["full_plan"] = plan

6. Route to Supervisor
   goto="supervisor"
JSON Plan Schema:
{
  "task": "Create comprehensive market analysis",
  "steps": [
    {
      "step": 1,
      "description": "Research market trends",
      "assigned_team": "research_team",
      "dependencies": [],
      "tool_selection": ["search_web", "scrape_data"],
      "expected_output": "Market trend data and statistics",
      "estimated_duration": "2-3 minutes",
      "priority": "high"
    },
    {
      "step": 2,
      "description": "Analyze competitive landscape",
      "assigned_team": "analysis_team",
      "dependencies": [1],
      "tool_selection": ["analyze_data", "compare"],
      "expected_output": "Competitive analysis report",
      "estimated_duration": "3-4 minutes",
      "priority": "high"
    },
    {
      "step": 3,
      "description": "Generate recommendations",
      "assigned_team": "synthesis_team",
      "dependencies": [1, 2],
      "tool_selection": ["synthesize", "format_report"],
      "expected_output": "Strategic recommendations",
      "estimated_duration": "2 minutes",
      "priority": "medium"
    }
  ],
  "success_criteria": "Complete market analysis with trends, competition, and recommendations",
  "total_estimated_duration": "7-9 minutes",
  "fallback_strategy": "If research fails, provide general market insights"
}
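A hypothetical planner node that produces a plan in this schema might look like the following (the prompt wording, JSON parsing, and fallback are illustrative assumptions, not Azcore's actual implementation):
import json

from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.types import Command

def planner_node(state: dict) -> Command:
    """Decompose the latest task into a structured plan, then hand off."""
    task = state["messages"][-1].content
    response = llm.invoke([
        SystemMessage(content="Decompose the task into the JSON plan schema "
                              "shown above. Respond with JSON only."),
        HumanMessage(content=task),
    ])
    try:
        plan = json.loads(response.content)        # parse the structured plan
    except json.JSONDecodeError:
        plan = {"task": task, "steps": []}         # degrade gracefully
    return Command(
        update={"full_plan": plan, "messages": [response]},
        goto="supervisor",                         # next stop: routing
    )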
ResponseGeneratorNode
Purpose: Final response synthesis and formatting
Generation Process:
All Team Outputs
        ↓
1. Collect Context
   ├── Full message history
   ├── Team outputs
   ├── Original user query
   └── Execution plan

2. Analyze Quality
   ├── Check completeness
   ├── Verify accuracy
   ├── Assess coherence
   └── Identify gaps

3. Synthesize Response
   ├── Combine information
   ├── Remove redundancy
   ├── Organize logically
   └── Add insights

4. Format Output
   ├── Structure with sections
   ├── Add headings
   ├── Include citations
   └── Ensure readability

5. Quality Check
   ├── Verify answers user query
   ├── Check professionalism
   ├── Ensure completeness
   └── Validate format

6. Return Final Response
   goto=END
Quality Standards:
- Accuracy: All facts correct and cited
- Completeness: Addresses all aspects of query
- Clarity: Easy to understand
- Structure: Well-organized with sections
- Relevance: On-topic and useful
- Actionability: Provides next steps when appropriate
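A minimal sketch of a generator node built to these standards (the synthesis prompt is illustrative, not Azcore's actual prompt):
from langchain_core.messages import SystemMessage
from langgraph.graph import END
from langgraph.types import Command

def response_generator_node(state: dict) -> Command:
    """Synthesize the full history (team outputs included) into one answer."""
    synthesis_prompt = SystemMessage(content=(
        "Synthesize all prior messages into a single final response: "
        "remove redundancy, organize with clear sections, cite sources, "
        "and make sure the original query is fully addressed."
    ))
    final = llm.invoke([synthesis_prompt, *state["messages"]])
    return Command(update={"messages": [final]}, goto=END)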
3. Supervisor
Purpose: Intelligent routing between teams and workflow control
Architecture:
┌────────────────────────────────────────────────────────┐
│ Supervisor │
│ │
│ Components: │
│ ┌──────────────────────────────────────────────────┐ │
│ │ LLM Brain │ │
│ │ - Analyzes current state │ │
│ │ - Evaluates team capabilities │ │
│ │ - Makes routing decisions │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Member Registry │ │
│ │ - List of available teams │ │
│ │ - Team capabilities │ │
│ │ - Team descriptions │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Routing Logic │ │
│ │ - Parse LLM routing decision │ │
│ │ - Validate team selection │ │
│ │ - Return Command with goto │ │
│ └──────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────┘
Routing Decision Process:
Current State
      ↓
1. Context Analysis
   ├── Read message history
   ├── Check execution plan
   ├── Review team outputs
   └── Identify remaining work

2. Team Capability Assessment
   ├── List available teams
   ├── Evaluate team specializations
   ├── Consider team workload
   └── Check dependencies

3. LLM Decision Making
   ├── Analyze state + capabilities
   ├── Determine best team for next step
   ├── Consider: FINISH vs continue
   └── Generate routing decision

4. Decision Validation
   ├── Verify team exists
   ├── Check circular routing
   ├── Validate against plan
   └── Ensure progress

5. Routing Execution
   ├── Create Command
   ├── Set goto=team_name or response_generator
   └── Update metadata
Supervisor Types:
Standard Supervisor:
supervisor = Supervisor(
    llm=llm,
    members=["team1", "team2", "team3"]
)
# Routes to teams or "FINISH"
MainSupervisor (for hierarchical graphs):
main_supervisor = MainSupervisor(
    llm=llm,
    members=["team1", "team2", "team3"]
)
# Routes to teams or "response_generator"
Routing Prompt Template:
You are a supervisor managing a workflow. You have access to these teams:
Teams:
- research_team: Specializes in information gathering and research
- analysis_team: Specializes in data analysis and insights
- synthesis_team: Specializes in combining information and creating reports
Current State:
{message_history}
Execution Plan:
{plan}
Your task:
1. Analyze what has been done
2. Determine what needs to be done next
3. Select the most appropriate team
Respond with JSON:
{
  "next": "team_name" or "response_generator",
  "reasoning": "Why this team is best for the next step"
}
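The decision-validation step can be sketched as follows (the parsing and fallback behavior are assumptions; only the member names come from the template above):
import json

VALID_TARGETS = {"research_team", "analysis_team", "synthesis_team",
                 "response_generator"}

def parse_routing_decision(raw: str) -> str:
    """Parse the supervisor LLM's JSON reply into a validated target node."""
    try:
        target = json.loads(raw).get("next", "")
    except json.JSONDecodeError:
        target = ""
    # Unknown or malformed targets degrade to final-response generation
    return target if target in VALID_TARGETS else "response_generator"

assert parse_routing_decision('{"next": "analysis_team"}') == "analysis_team"
assert parse_routing_decision("garbage") == "response_generator"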
4. Teams and Agents
Team Architecture:
┌────────────────────────────────────────────────────────┐
│ Team │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Supervisor (Optional) │ │
│ │ - Routes within team │ │
│ └──────────────────┬───────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ┌──▼────┐ ┌───▼───┐ ┌───▼───┐ │
│ │Agent 1│ │Agent 2│ │Agent 3│ │
│ │ │ │ │ │ │ │
│ │Tools: │ │Tools: │ │Tools: │ │
│ │[a,b,c]│ │[d,e,f]│ │[g,h,i]│ │
│ └───────┘ └───────┘ └───────┘ │
│ │
│ Configuration: │
│ - Team name and description │
│ - Shared LLM │
│ - Shared tools (optional) │
│ - Team-level configuration │
└────────────────────────────────────────────────────────┘
Team Building:
# Using TeamBuilder
team = (TeamBuilder("research_team")
    .with_llm(llm)
    .with_tools([search, scrape, analyze])
    .with_prompt("You are a research specialist...")
    .with_description("Research and information gathering")
    .build())

# Using MCPTeamBuilder (with external tools)
mcp_team = (MCPTeamBuilder("external_team")
    .with_llm(llm)
    .with_tools([local_tool1, local_tool2])
    .with_mcp_server("python", ["mcp_server.py"])
    .with_mcp_server(url="http://remote:8000/sse", transport="sse")
    .skip_failed_servers(True)
    .build())
Agent Architecture (ReactAgent):
┌────────────────────────────────────────────────────────┐
│ ReactAgent │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Configuration │ │
│ │ - name, description │ │
│ │ - llm (brain) │ │
│ │ - tools (hands) │ │
│ │ - system prompt (personality) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Internal ReAct Graph (LangGraph) │ │
│ │ │ │
│ │ User Input │ │
│ │ ↓ │ │
│ │ Reasoning (LLM) │ │
│ │ ↓ │ │
│ │ Tool Selection │ │
│ │ ↓ │ │
│ │ Tool Execution │ │
│ │ ↓ │ │
│ │ Observation │ │
│ │ ↓ │ │
│ │ Loop until done │ │
│ │ ↓ │ │
│ │ Final Response │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Optional Enhancements │ │
│ │ - RL-based tool selection │ │
│ │ - Response caching │ │
│ │ - Custom checkpointing │ │
│ └──────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────┘
Agent Creation:
factory = AgentFactory(default_llm=llm)
agent = factory.create_react_agent(
    name="research_agent",
    tools=[search, analyze, summarize],
    prompt="You are a research specialist...",
    description="Handles research tasks",
    enable_caching=True,
    cache_type="semantic",
    rl_enabled=False  # Optional RL enhancement
)
5. State Management System
State Architecture:
State = TypedDict('State', {
    # Core fields
    'messages': Annotated[List[BaseMessage], operator.add],  # Accumulate
    'next': str,                    # Mutable
    'full_plan': Dict[str, Any],    # Mutable
    'context': Dict[str, Any],      # Mutable
    'metadata': Dict[str, Any],     # Mutable
    'rl_metadata': Dict[str, Any]   # Mutable
})
State Evolution Example:
# Initial State
{
    "messages": [HumanMessage("Query")],
    "next": "",
    "full_plan": {},
    "context": {},
    "metadata": {},
    "rl_metadata": {}
}

# After Coordinator
{
    "messages": [
        HumanMessage("Query"),
        AIMessage("Routing to planner")
    ],
    "next": "planner",
    "full_plan": {},
    "context": {},
    "metadata": {"coordinator_decision": "complex_task"},
    "rl_metadata": {}
}

# After Planner
{
    "messages": [...],
    "next": "supervisor",
    "full_plan": {
        "task": "...",
        "steps": [...]
    },
    "context": {"planning_time": 1.5},
    "metadata": {...},
    "rl_metadata": {}
}

# After Team Execution
{
    "messages": [
        ...,
        ToolMessage("Search results..."),
        AIMessage("Analysis complete...")
    ],
    "next": "supervisor",
    "full_plan": {...},
    "context": {
        "planning_time": 1.5,
        "research_time": 3.2
    },
    "metadata": {
        ...,
        "tools_used": ["search_web", "analyze_text"]
    },
    "rl_metadata": {
        "state_key": "hash_123",
        "selected_tools": ["search_web"],
        "rewards": {"search_web": 0.8}
    }
}
StateManager Utilities:
# Create initial state
state = StateManager.create_initial_state(
    messages=[("user", "Query")]
)

# Add context
StateManager.add_context(state, "start_time", time.time())

# Add metadata
StateManager.add_metadata(state, "execution_id", uuid.uuid4())

# Add RL metadata
StateManager.add_rl_metadata(
    state,
    state_key="hash_123",
    selected_tools=["search_web"],
    rewards={"search_web": 0.8}
)

# Get context
start_time = StateManager.get_context(state, "start_time")

# Validate state
StateManager.validate_state(state)  # Raises if invalid
Data Flow Architecture
Request-Response Flow
USER REQUEST
│
│ {"messages": [("user", "Query")]}
▼
┌─────────────────────────────────────────┐
│ GraphOrchestrator.invoke() │
└─────────────┬───────────────────────────┘
│
│ State initialization
▼
┌─────────────────────────────────────────┐
│ CoordinatorNode.execute() │
│ ┌────────────────────────────────────┐ │
│ │ 1. Extract user message │ │
│ │ 2. Call LLM for triage │ │
│ │ 3. Decide: simple or complex │ │
│ │ 4. Return Command(goto=...) │ │
│ └────────────────────────────────────┘ │
└─────────────┬───────────────────────────┘
│
│ If complex → goto="planner"
▼
┌─────────────────────────────────────────┐
│ PlannerNode.execute() │
│ ┌────────────────────────────────────┐ │
│ │ 1. Analyze task complexity │ │
│ │ 2. Break into steps │ │
│ │ 3. Assign teams to steps │ │
│ │ 4. Create JSON plan │ │
│ │ 5. Store in state["full_plan"] │ │
│ │ 6. Return Command(goto="supervisor")│ │
│ └────────────────────────────────────┘ │
└─────────────┬───────────────────────────┘
│
│ goto="supervisor"
▼
┌─────────────────────────────────────────┐
│ Supervisor.create_node() │
│ ┌────────────────────────────────────┐ │
│ │ 1. Read state + plan │ │
│ │ 2. Determine next team │ │
│ │ 3. Call LLM for routing │ │
│ │ 4. Parse routing decision │ │
│ │ 5. Return Command(goto=team) │ │
│ └────────────────────────────────────┘ │
└─────────────┬───────────────────────────┘
│
│ goto="research_team"
▼
┌─────────────────────────────────────────┐
│ Team.build() │
│ ┌────────────────────────────────────┐ │
│ │ Internal agent execution │ │
│ │ ┌──────────────────────────────┐ │ │
│ │ │ Agent 1 (with tools) │ │ │
│ │ │ - ReAct loop │ │ │
│ │ │ - Tool calls │ │ │
│ │ │ - Response generation │ │ │
│ │ └──────────────────────────────┘ │ │
│ │ │ │
│ │ Returns updated state │ │
│ └────────────────────────────────────┘ │
└─────────────┬───────────────────────────┘
│
│ Back to Supervisor
▼
┌─────────────────────────────────────────┐
│ Supervisor (again) │
│ ┌────────────────────────────────────┐ │
│ │ Check if more work needed │ │
│ │ ├─ Yes → goto=another_team │ │
│ │ └─ No → goto=response_generator │ │
│ └────────────────────────────────────┘ │
└─────────────┬───────────────────────────┘
│
│ goto="response_generator"
▼
┌─────────────────────────────────────────┐
│ ResponseGeneratorNode.execute() │
│ ┌────────────────────────────────────┐ │
│ │ 1. Collect all outputs │ │
│ │ 2. Synthesize final response │ │
│ │ 3. Format and structure │ │
│ │ 4. Return Command(goto=END) │ │
│ └────────────────────────────────────┘ │
└─────────────┬───────────────────────────┘
│
│ goto=END
▼
┌─────────────────────────────────────────┐
│ Return Final State │
│ { │
│ "messages": [ │
│ HumanMessage("Query"), │
│ ..., │
│ AIMessage("Final Response") │
│ ], │
│ "full_plan": {...}, │
│ "context": {...}, │
│ "metadata": {...} │
│ } │
└─────────────┬───────────────────────────┘
│
▼
USER RECEIVES RESPONSE
Data Transformation Flow
User Input (String)
↓
HumanMessage object
↓
State["messages"] list
↓
LLM Processing (Text → Reasoning)
↓
AIMessage object (with optional tool_calls)
↓
Tool Execution (if needed)
↓
ToolMessage object (results)
↓
Back to LLM (Observation)
↓
Final AIMessage (response)
↓
State["messages"] list (accumulated)
↓
Extract: result["messages"][-1].content
↓
User Output (String)
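Putting the chain together, this is what the transformation looks like from the caller's side (graph comes from orchestrator.compile() as shown earlier; the query text is illustrative):
# String in
result = graph.invoke({"messages": [("user", "Summarize this week's AI news")]})

# Every hop is preserved in the accumulated history
for msg in result["messages"]:
    print(type(msg).__name__, "→", msg.content[:60])

# String out: the last message holds the final response
final_text = result["messages"][-1].content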
Execution Architecture
Agent Execution Model (ReAct Pattern)
┌─────────────────────────────────────────────────────────────┐
│ Agent Execution Loop │
│ │
│ Entry: agent.invoke(state) │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 1: Pre-Processing │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Optional: RL Tool Selection │ │ │
│ │ │ if rl_enabled: │ │ │
│ │ │ selected_tools = rl_manager.select_tools() │ │ │
│ │ │ agent.tools = filter(all_tools, selected) │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 2: Reasoning Phase (LLM Call #1) │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Input to LLM: │ │ │
│ │ │ - System prompt │ │ │
│ │ │ - Tool descriptions │ │ │
│ │ │ - Message history │ │ │
│ │ │ │ │ │
│ │ │ LLM Thinks: │ │ │
│ │ │ "What should I do?" │ │ │
│ │ │ │ │ │
│ │ │ LLM Decides: │ │ │
│ │ │ - Call tool X with params Y │ │ │
│ │ │ OR │ │ │
│ │ │ - Provide final answer │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 3: Decision Branch │ │
│ │ │ │
│ │ Does LLM want to call a tool? │ │
│ │ ├─ Yes → Go to Step 4 │ │
│ │ └─ No → Go to Step 6 (Final Response) │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 4: Tool Execution │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ 1. Extract tool call from LLM response │ │ │
│ │ │ - tool_name = "search_web" │ │ │
│ │ │ - parameters = {"query": "AI trends"} │ │ │
│ │ │ │ │ │
│ │ │ 2. Validate tool exists │ │ │
│ │ │ │ │ │
│ │ │ 3. Execute tool │ │ │
│ │ │ result = tool_function(**parameters) │ │ │
│ │ │ │ │ │
│ │ │ 4. Create ToolMessage │ │ │
│ │ │ ToolMessage(content=result) │ │ │
│ │ │ │ │ │
│ │ │ 5. Add to message history │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 5: Observation Phase (LLM Call #2) │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Input to LLM: │ │ │
│ │ │ - System prompt │ │ │
│ │ │ - Tool descriptions │ │ │
│ │ │ - Message history (including tool result) │ │ │
│ │ │ │ │ │
│ │ │ LLM Thinks: │ │ │
│ │ │ "What do these results mean?" │ │ │
│ │ │ "Do I need more information?" │ │ │
│ │ │ │ │ │
│ │ │ LLM Decides: │ │ │
│ │ │ - Call another tool │ │ │
│ │ │ OR │ │ │
│ │ │ - Provide final answer │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Loop back to Step 3 if more tools needed │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 6: Final Response Generation │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ LLM synthesizes all information: │ │ │
│ │ │ - Original query │ │ │
│ │ │ - Tool results │ │ │
│ │ │ - System prompt instructions │ │ │
│ │ │ │ │ │
│ │ │ Generates comprehensive response │ │ │
│ │ │ │ │ │
│ │ │ Creates AIMessage(content=response) │ │ │
│ │ │ │ │ │
│ │ │ Marks execution as complete │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STEP 7: Post-Processing │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Optional: RL Update │ │ │
│ │ │ if rl_enabled: │ │ │
│ │ │ reward = calculate_reward(query, response) │ │ │
│ │ │ rl_manager.update(state_key, tools, reward) │ │ │
│ │ │ │ │ │
│ │ │ Optional: Cache Response │ │ │
│ │ │ if caching_enabled: │ │ │
│ │ │ cache.set(query, response) │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ Return: Updated state with all messages │
│ │
└─────────────────────────────────────────────────────────────┘
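The loop above, condensed into a framework-free sketch (tool-call extraction follows LangChain's AIMessage.tool_calls shape; llm is assumed to be already bound to tools via llm.bind_tools(); the RL and caching hooks from Steps 1 and 7 are elided to keep the control flow visible):
from langchain_core.messages import ToolMessage

def react_loop(llm, tools_by_name, messages, max_iterations=10):
    """Reason → act → observe until the LLM stops requesting tools."""
    for _ in range(max_iterations):
        response = llm.invoke(messages)            # reasoning phase
        messages.append(response)
        if not getattr(response, "tool_calls", None):
            return messages                        # final answer reached
        for call in response.tool_calls:           # tool execution phase
            result = tools_by_name[call["name"]].invoke(call["args"])
            messages.append(ToolMessage(content=str(result),
                                        tool_call_id=call["id"]))
        # next iteration is the observation phase: the LLM sees tool results
    return messages                                # iteration cap reached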
Execution Patterns
Pattern 1: Simple Query (No Tools)
User: "Hello"
↓
LLM Reasoning → Decides: Direct response
↓
Final Response: "Hello! How can I help you?"
↓
Done (1 LLM call)
Pattern 2: Single Tool Call
User: "What's the weather in Paris?"
↓
LLM Reasoning → Decides: Use weather_tool
↓
Tool Execution → Returns: "Sunny, 22°C"
↓
LLM Observation → Synthesizes result
↓
Final Response: "The weather in Paris is sunny at 22°C"
↓
Done (2 LLM calls, 1 tool call)
Pattern 3: Multiple Tool Calls (Chain)
User: "Research AI and create a summary"
↓
LLM Reasoning → Decides: Use search_tool
↓
Tool Execution → Returns: AI research data
↓
LLM Observation → Decides: Use summarize_tool
↓
Tool Execution → Returns: Summary
↓
LLM Observation → Synthesizes
↓
Final Response: "Here's the summary: ..."
↓
Done (3 LLM calls, 2 tool calls)
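As a concrete illustration of Pattern 3, here is the chain run through the react_loop sketch from the previous section (the two stub tools are hypothetical placeholders; llm is a tool-calling chat model as in earlier examples):
from langchain_core.tools import tool

@tool
def search_tool(query: str) -> str:
    """Search for information on a topic."""
    return f"Raw research data about {query}"

@tool
def summarize_tool(text: str) -> str:
    """Summarize a block of text."""
    return f"Summary of: {text[:40]}..."

tools = [search_tool, summarize_tool]
llm_with_tools = llm.bind_tools(tools)

messages = react_loop(
    llm_with_tools,
    {t.name: t for t in tools},
    [("user", "Research AI and create a summary")],
)
print(messages[-1].content)  # "Here's the summary: ..."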