Graph orchestration is the heart of Azcore's multi-agent system. The GraphOrchestrator class manages the construction and execution of complex workflow graphs, coordinating nodes, teams, and supervisors into cohesive AI applications.
🏗️ GraphOrchestrator Basics
What is Graph Orchestration?
Graph orchestration in Azcore:
- Constructs workflow graphs with nodes and edges
- Manages state flow between components
- Handles routing and conditional branching
- Provides cycle detection and validation
- Enables checkpointing and persistence
Creating an Orchestrator
from azcore.core.orchestrator import GraphOrchestrator
from azcore.core.state import State
# Basic orchestrator
orchestrator = GraphOrchestrator()
# With custom configuration
orchestrator = GraphOrchestrator(
state_class=State,
checkpointer=None, # Optional: MemorySaver, etc.
max_iterations=20,
enable_cycle_detection=True
)
🔷 Building Graphs
Adding Nodes
from azcore.nodes.coordinator import CoordinatorNode
from azcore.nodes.planner import PlannerNode
# Create nodes
coordinator = CoordinatorNode(llm=llm)
planner = PlannerNode(llm=llm)
# Add to graph
orchestrator.add_node("coordinator", coordinator)
orchestrator.add_node("planner", planner)
# Plain callables can be nodes too: they take the state and return a Command
from langgraph.types import Command


def custom_processor(state):
    """Flag the state as processed and hand off to the "next" node.

    Args:
        state: Current graph state; only its optional "context" mapping is read.

    Returns:
        A Command whose update merges ``"processed": True`` into the existing
        context and whose goto is ``"next"``.
    """
    # `update` must be a dict mapping state keys to their new values
    return Command(
        update={"context": {**state.get("context", {}), "processed": True}},
        goto="next",
    )


orchestrator.add_node("processor", custom_processor)
Adding Teams
from azcore.agents.team_builder import TeamBuilder
# Create teams
research_team = (TeamBuilder("research")
.with_llm(llm)
.with_tools([search, analyze])
.build())
security_team = (TeamBuilder("security")
.with_llm(llm)
.with_tools([monitor, alert])
.build())
# Add to graph (automatically builds and registers)
orchestrator.add_team(research_team)
orchestrator.add_team(security_team)
Adding Edges
# Add directed edges
orchestrator.add_edge("coordinator", "planner")
orchestrator.add_edge("planner", "supervisor")
orchestrator.add_edge("supervisor", "research")
# Set entry point
orchestrator.set_entry_point("coordinator")
# Add edges to END
from langgraph.graph import END
orchestrator.add_edge("final_node", END)
Setting Supervisor
from azcore.core.supervisor import Supervisor
# Create supervisor
supervisor = Supervisor(
llm=llm,
members=["research", "security", "analysis"]
)
# Add to graph
orchestrator.set_supervisor(supervisor, name="supervisor")
🔄 Graph Patterns
Linear Sequential Graph
# Simple sequential workflow
orchestrator = GraphOrchestrator()
# Add nodes
orchestrator.add_node("step1", node1)
orchestrator.add_node("step2", node2)
orchestrator.add_node("step3", node3)
# Chain them
orchestrator.set_entry_point("step1")
orchestrator.add_edge("step1", "step2")
orchestrator.add_edge("step2", "step3")
orchestrator.add_edge("step3", END)
# Compile
graph = orchestrator.compile()
Hierarchical Graph
# Coordinator -> Planner -> Supervisor -> Teams -> Generator
orchestrator = GraphOrchestrator()
# Add components
orchestrator.add_node("coordinator", coordinator)
orchestrator.add_node("planner", planner)
orchestrator.set_supervisor(supervisor)
orchestrator.add_team(team1)
orchestrator.add_team(team2)
orchestrator.add_node("generator", generator)
# Build structure
orchestrator.set_entry_point("coordinator")
orchestrator.add_edge("coordinator", "planner")
orchestrator.add_edge("planner", "supervisor")
# Supervisor routes to teams dynamically
orchestrator.add_edge("team1", "supervisor") # Back to supervisor
orchestrator.add_edge("team2", "supervisor")
orchestrator.add_edge("supervisor", "generator")
orchestrator.add_edge("generator", END)
graph = orchestrator.compile()
Conditional Branching Graph
# Graph with conditional routing
orchestrator = GraphOrchestrator()
# Router node decides path
def router_node(state):
    """Choose the next handler from keywords in the most recent message.

    The content of the last entry in ``state["messages"]`` is matched
    case-insensitively: "urgent" takes precedence over "query", and anything
    else falls through to the default handler. The state itself is not modified.
    """
    text = state["messages"][-1].content.lower()
    if "urgent" in text:
        return Command(update={}, goto="priority_handler")
    if "query" in text:
        return Command(update={}, goto="query_processor")
    return Command(update={}, goto="default_handler")
# Register the router, every branch, and the node where the branches converge
orchestrator.add_node("router", router_node)
orchestrator.add_node("priority_handler", priority_handler)
orchestrator.add_node("query_processor", query_processor)
orchestrator.add_node("default_handler", default_handler)
orchestrator.add_node("finalizer", finalizer)
# Execution starts at the router, which picks a branch via Command(goto=...)
orchestrator.set_entry_point("router")
# All paths converge
orchestrator.add_edge("priority_handler", "finalizer")
orchestrator.add_edge("query_processor", "finalizer")
orchestrator.add_edge("default_handler", "finalizer")
orchestrator.add_edge("finalizer", END)
graph = orchestrator.compile()
Parallel Processing Graph
# Multiple parallel branches
orchestrator = GraphOrchestrator()
# Dispatcher sends to multiple processors
def dispatcher(state):
    """Mark the run as parallel in ``context`` and hand off to "processor1".

    Existing context keys are preserved; only ``parallel_mode`` is set.
    """
    # Copy the current context (if any) so its other keys survive the update
    context = dict(state.get("context", {}))
    context["parallel_mode"] = True
    # Start with the first processor
    return Command(update={"context": context}, goto="processor1")
# Aggregator combines results
def aggregator(state):
    """Combine the results stored in ``context`` into one message and finish.

    Reads ``state["context"]["results"]`` (an empty list when absent), passes it
    to ``combine_results``, and emits the outcome as a single HumanMessage
    before routing to END.
    """
    context = state.get("context", {})
    merged = combine_results(context.get("results", []))
    reply = HumanMessage(content=merged)
    return Command(update={"messages": [reply]}, goto=END)
orchestrator.add_node("dispatcher", dispatcher)
orchestrator.add_node("processor1", processor1)
orchestrator.add_node("processor2", processor2)
orchestrator.add_node("aggregator", aggregator)
# Set up parallel flow
orchestrator.set_entry_point("dispatcher")
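# NOTE: as wired here, the dispatcher only routes to "processor1"; "processor2"
# and "aggregator" are registered, but nothing routes to them yet.
# True parallel execution needs custom fan-out logic,
# or a supervisor to coordinate the branches.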
🛠️ Convenience Methods
build_hierarchical_graph
Quickly build the standard hierarchical pattern:
from azcore.core.orchestrator import GraphOrchestrator
from azcore.nodes.coordinator import CoordinatorNode
from azcore.nodes.planner import PlannerNode
from azcore.nodes.generator import GeneratorNode
from azcore.core.supervisor import Supervisor
# Create components
coordinator = CoordinatorNode(llm=llm)
planner = PlannerNode(llm=llm)
supervisor = Supervisor(llm=llm, members=[])
teams = [research_team, security_team, analysis_team]
generator = GeneratorNode(llm=llm)
# Build complete hierarchical graph in one call
orchestrator = GraphOrchestrator()
graph = orchestrator.build_hierarchical_graph(
coordinator=coordinator,
planner=planner,
supervisor=supervisor,
teams=teams,
generator=generator
)
# Graph is ready to use!
result = graph.invoke({"messages": [HumanMessage(content="Task")]})
🔍 Graph Inspection
Accessing Components
# Get specific node
coordinator_node = orchestrator.get_node("coordinator")
# Get specific team
research_team = orchestrator.get_team("research")
# Get all nodes
all_nodes = orchestrator.get_all_nodes()
print(f"Graph has {len(all_nodes)} nodes")
# Get all teams
all_teams = orchestrator.get_all_teams()
for team_name, team in all_teams.items():
print(f"Team: {team_name}, Tools: {len(team.tools)}")
Graph Validation
# Validate graph structure
try:
orchestrator.validate_graph()
print("Graph is valid")
except GraphError as e:
print(f"Graph validation failed: {e.message}")
print(f"Details: {e.details}")
Cycle Detection
# Orchestrator automatically detects cycles
orchestrator = GraphOrchestrator(enable_cycle_detection=True)
orchestrator.add_node("node1", node1)
orchestrator.add_node("node2", node2)
orchestrator.add_node("node3", node3)
# Create cycle
orchestrator.add_edge("node1", "node2")
orchestrator.add_edge("node2", "node3")
try:
orchestrator.add_edge("node3", "node1") # Creates cycle!
except GraphCycleError as e:
print(f"Cycle detected: {e.message}")
# Don't add the problematic edge
🚀 Executing Graphs
Basic Invocation
from langchain_core.messages import HumanMessage
# Compile graph
graph = orchestrator.compile()
# Create initial state
initial_state = {
"messages": [HumanMessage(content="Analyze security logs")]
}
# Invoke graph
result = graph.invoke(initial_state)
# Access results
final_message = result["messages"][-1].content
print(f"Result: {final_message}")
Streaming Execution
# Stream graph execution
for chunk in graph.stream(initial_state):
print(f"Node: {chunk}")
# Process each node's output as it completes
Async Execution
import asyncio
async def run_graph():
    """Invoke the compiled graph asynchronously and return its result."""
    return await graph.ainvoke(initial_state)
# Run async
result = asyncio.run(run_graph())
With Configuration
# Invoke with runtime configuration
result = graph.invoke(
initial_state,
config={
"configurable": {
"thread_id": "user_123",
"max_iterations": 15
}
}
)
💾 Checkpointing & Persistence
Enable Checkpointing
from langgraph.checkpoint.memory import MemorySaver
# Create checkpointer
checkpointer = MemorySaver()
# Build graph with checkpointing
orchestrator = GraphOrchestrator(checkpointer=checkpointer)
# ... add nodes ...
graph = orchestrator.compile()
# Use with thread ID
result = graph.invoke(
initial_state,
config={"configurable": {"thread_id": "conversation_1"}}
)
# Continue conversation
continued = graph.invoke(
{"messages": [HumanMessage(content="Continue...")]},
config={"configurable": {"thread_id": "conversation_1"}}
)
SQLite Checkpointer
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
# Persistent checkpointing: give SqliteSaver an open connection.
# (In recent langgraph-checkpoint-sqlite releases, SqliteSaver.from_conn_string()
# is a context manager, so its return value cannot be used as the saver directly.)
# check_same_thread=False lets the connection be used from threads other than
# the one that created it.
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
orchestrator = GraphOrchestrator(checkpointer=checkpointer)
# ... build graph ...
graph = orchestrator.compile()
# Conversations persist across restarts
🎯 Advanced Patterns
Dynamic Node Addition
class DynamicOrchestrator:
    """Wrap a GraphOrchestrator so domain-specific teams can be added on demand."""

    def __init__(self, llm):
        self.orchestrator = GraphOrchestrator()
        # Shared by every team and core node this wrapper creates
        self.llm = llm

    def add_specialized_team(self, domain: str, tools: list):
        """Create a team named "<domain>_team", register it, and return it.

        Args:
            domain: Domain label; used for the team name and its prompt.
            tools: Tools handed to the team builder.
        """
        builder = TeamBuilder(f"{domain}_team")
        builder = builder.with_llm(self.llm)
        builder = builder.with_tools(tools)
        builder = builder.with_prompt(f"You specialize in {domain}")
        specialized = builder.build()
        self.orchestrator.add_team(specialized)
        return specialized

    def build(self):
        """Register the coordinator and planner nodes, then compile the graph."""
        core_nodes = {
            "coordinator": CoordinatorNode(llm=self.llm),
            "planner": PlannerNode(llm=self.llm),
        }
        for node_name, node in core_nodes.items():
            self.orchestrator.add_node(node_name, node)
        self.orchestrator.set_entry_point("coordinator")
        # NOTE(review): teams registered after this call are presumably not part
        # of the graph returned here - confirm against GraphOrchestrator.compile()
        return self.orchestrator.compile()
# Use dynamic orchestrator
dynamic = DynamicOrchestrator(llm)
# Register teams BEFORE building: build() compiles the graph, and anything
# added after compilation is not reflected in the compiled graph
dynamic.add_specialized_team("data_analysis", [query_tool, viz_tool])
dynamic.add_specialized_team("report_generation", [write_tool, format_tool])
graph = dynamic.build()
Error Recovery Nodes
def error_handler_node(state):
    """Send the workflow to recovery when the state's metadata holds an error.

    Looks up ``state["metadata"]["error"]``. With no (or a falsy) error the
    state is left untouched and execution continues at "normal_flow".
    Otherwise the error is logged, a notice message is appended, metadata is
    set to ``{"error_handled": True}``, and execution moves to "recovery_node".
    """
    failure = state.get("metadata", {}).get("error")
    if not failure:
        # Nothing to recover from
        return Command(update={}, goto="normal_flow")

    logger.error(f"Workflow error: {failure}")
    notice = HumanMessage(
        content=f"Error occurred: {failure}. Attempting recovery...",
        name="error_handler",
    )
    return Command(
        update={"messages": [notice], "metadata": {"error_handled": True}},
        goto="recovery_node",
    )
# Add to graph
orchestrator.add_node("error_handler", error_handler_node)
# Nodes can route to error handler
def risky_node(state):
    """Run the risky operation; on any exception, divert to "error_handler".

    On success the operation's result is emitted as a message and execution
    continues at "next". On failure the exception text is stored under
    ``metadata["error"]``.
    """
    try:
        outcome = risky_operation()
        return Command(update={"messages": [outcome]}, goto="next")
    except Exception as exc:
        failure_update = {"metadata": {"error": str(exc)}}
        return Command(update=failure_update, goto="error_handler")
State Transformers
def state_transformer_node(state):
    """Convert every message with ``transform_message`` and annotate the context.

    The update carries the converted messages under "messages" and extends the
    existing context with ``transformed=True`` and ``original_count`` (the
    number of messages before conversion). Execution continues at "next_node".
    """
    originals = state["messages"]
    converted = [transform_message(message) for message in originals]
    enriched_context = {
        **state.get("context", {}),
        "transformed": True,
        "original_count": len(originals),
    }
    return Command(
        update={"messages": converted, "context": enriched_context},
        goto="next_node",
    )
📊 Graph Monitoring
Execution Metrics
import time
from typing import Callable


class MonitoredOrchestrator:
    """Orchestrator with execution monitoring.

    Wraps a GraphOrchestrator and collects metrics in ``self.metrics``:

    - ``node_executions``: node name -> number of times the node was called
    - ``execution_times``: node name -> duration in seconds of the most
      recent call that completed without raising
    - ``error_count``: total number of node calls that raised
    """

    def __init__(self):
        self.orchestrator = GraphOrchestrator()
        self.metrics = {
            "node_executions": {},
            "execution_times": {},
            "error_count": 0,
        }

    def add_monitored_node(self, name: str, node: Callable):
        """Register ``node`` under ``name``, wrapped so each call is measured.

        Args:
            name: Node name; also the key used in the metrics dictionaries.
            node: Callable taking the state and returning the node's result.
        """

        def monitored_node(state):
            # perf_counter is monotonic, so durations are unaffected by
            # wall-clock adjustments
            started = time.perf_counter()
            executions = self.metrics["node_executions"]
            executions[name] = executions.get(name, 0) + 1
            try:
                result = node(state)
            except Exception:
                # Count the failure, then let the caller see the original error
                self.metrics["error_count"] += 1
                raise
            self.metrics["execution_times"][name] = time.perf_counter() - started
            return result

        self.orchestrator.add_node(name, monitored_node)

    def get_metrics(self):
        """Return the live metrics dictionary (not a copy)."""
        return self.metrics
# Use monitored orchestrator
monitored = MonitoredOrchestrator()
monitored.add_monitored_node("processor", processor_node)
# ... build graph ...
# Check metrics after execution
print(monitored.get_metrics())
Visualization
def visualize_graph(orchestrator: "GraphOrchestrator"):
    """Print a plain-text summary of the orchestrator's nodes and teams.

    Args:
        orchestrator: Object exposing ``get_all_nodes()`` and
            ``get_all_teams()``, each returning a mapping keyed by name.
            Team values must have a sized ``tools`` attribute.
    """
    nodes = orchestrator.get_all_nodes()
    teams = orchestrator.get_all_teams()
    print("Graph Structure:")
    print(f"Nodes: {len(nodes)}")
    for name in nodes:  # iterating a mapping yields its keys
        print(f" - {name}")
    print(f"\nTeams: {len(teams)}")
    for name, team in teams.items():
        print(f" - {name} ({len(team.tools)} tools)")
    # Could also generate graphviz diagram, etc.
🎯 Best Practices
1. Clear Graph Structure
# ✅ GOOD: Clear, understandable flow
orchestrator = GraphOrchestrator()
orchestrator.add_node("input_validator", validator)
orchestrator.add_node("processor", processor)
orchestrator.add_node("output_formatter", formatter)
orchestrator.set_entry_point("input_validator")
orchestrator.add_edge("input_validator", "processor")
orchestrator.add_edge("processor", "output_formatter")
orchestrator.add_edge("output_formatter", END)
# ❌ BAD: Complex, hard to follow
orchestrator.add_node("n1", node1)
orchestrator.add_node("n2", node2)
# Many edges creating spaghetti...
2. Validate Before Compiling
# ✅ GOOD: Validate graph structure
orchestrator = GraphOrchestrator()
# ... add nodes ...
try:
orchestrator.validate_graph()
graph = orchestrator.compile()
except GraphError as e:
print(f"Graph invalid: {e}")
# Fix issues before compiling
3. Enable Cycle Detection
# ✅ GOOD: Catch cycles early
orchestrator = GraphOrchestrator(enable_cycle_detection=True)
# Raises GraphCycleError as soon as an added edge would create a cycle
4. Use Hierarchical Pattern for Complex Workflows
# ✅ GOOD: Use build_hierarchical_graph for standard pattern
graph = orchestrator.build_hierarchical_graph(
coordinator=coordinator,
planner=planner,
supervisor=supervisor,
teams=teams,
generator=generator
)
# Don't manually wire up common patterns
🚀 Complete Orchestration Example
from azcore.core.orchestrator import GraphOrchestrator
from azcore.nodes.coordinator import CoordinatorNode
from azcore.nodes.planner import PlannerNode
from azcore.nodes.generator import GeneratorNode
from azcore.core.supervisor import Supervisor
from azcore.agents.team_builder import TeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
# Setup LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Create nodes
coordinator = CoordinatorNode(llm=llm)
planner = PlannerNode(llm=llm)
generator = GeneratorNode(llm=llm)
# Create teams
research_team = (TeamBuilder("research")
.with_llm(llm)
.with_tools([web_search, analyze_docs])
.with_prompt("You are a research specialist.")
.build())
security_team = (TeamBuilder("security")
.with_llm(llm)
.with_tools([scan_logs, alert_team])
.with_prompt("You are a security specialist.")
.build())
# Create supervisor
supervisor = Supervisor(
llm=llm,
members=["research", "security"]
)
# Build graph with checkpointing
checkpointer = MemorySaver()
orchestrator = GraphOrchestrator(
checkpointer=checkpointer,
max_iterations=20,
enable_cycle_detection=True
)
# Use convenience method
graph = orchestrator.build_hierarchical_graph(
coordinator=coordinator,
planner=planner,
supervisor=supervisor,
teams=[research_team, security_team],
generator=generator
)
# Execute
result = graph.invoke(
{"messages": [HumanMessage(content="Analyze recent security incidents")]},
config={"configurable": {"thread_id": "session_1"}}
)
# Get result
print(result["messages"][-1].content)
🎓 Summary
Azcore's graph orchestration provides:
- GraphOrchestrator: High-level API for graph construction
- Flexible Patterns: Linear, hierarchical, conditional, parallel workflows
- Validation: Cycle detection and structure validation
- Checkpointing: Conversation persistence and resumption
- Monitoring: Track execution and performance
- Composability: Mix nodes, teams, and custom components
Use the GraphOrchestrator to build sophisticated multi-agent workflows with clean architecture, proper error handling, and robust state management.