• Getting Started
  • Core Concepts
  • Reinforcement Learning
  • Model Context Protocol (MCP)
  • Workflow Patterns
  • Advanced Agent Patterns
  • Guides

Core Concepts

Graph Orchestration

Building and managing workflow graphs in Azcore.

Graph orchestration is the heart of Azcore's multi-agent system. The GraphOrchestrator class manages the construction and execution of complex workflow graphs, coordinating nodes, teams, and supervisors into cohesive AI applications.

🏗️ GraphOrchestrator Basics

What is Graph Orchestration?

Graph orchestration in Azcore:

  • Constructs workflow graphs with nodes and edges
  • Manages state flow between components
  • Handles routing and conditional branching
  • Provides cycle detection and validation
  • Enables checkpointing and persistence

Creating an Orchestrator

from azcore.core.orchestrator import GraphOrchestrator
from azcore.core.state import State

# Basic orchestrator
orchestrator = GraphOrchestrator()

# With custom configuration
orchestrator = GraphOrchestrator(
    state_class=State,
    checkpointer=None,  # Optional: MemorySaver, etc.
    max_iterations=20,
    enable_cycle_detection=True
)

🔷 Building Graphs

Adding Nodes

from azcore.nodes.coordinator import CoordinatorNode
from azcore.nodes.planner import PlannerNode

# Create nodes
coordinator = CoordinatorNode(llm=llm)
planner = PlannerNode(llm=llm)

# Add to graph
orchestrator.add_node("coordinator", coordinator)
orchestrator.add_node("planner", planner)

# Can also add custom callables
def custom_processor(state):
    """Example custom node: inspect/transform state, then route onward."""
    # `update` carries state changes; `goto` names the next node to run.
    # ({...} is a placeholder — fill in the actual state updates.)
    return Command(update={...}, goto="next")

orchestrator.add_node("processor", custom_processor)

Adding Teams

from azcore.agents.team_builder import TeamBuilder

# Create teams
research_team = (TeamBuilder("research")
    .with_llm(llm)
    .with_tools([search, analyze])
    .build())

security_team = (TeamBuilder("security")
    .with_llm(llm)
    .with_tools([monitor, alert])
    .build())

# Add to graph (automatically builds and registers)
orchestrator.add_team(research_team)
orchestrator.add_team(security_team)

Adding Edges

# Add directed edges
orchestrator.add_edge("coordinator", "planner")
orchestrator.add_edge("planner", "supervisor")
orchestrator.add_edge("supervisor", "research")

# Set entry point
orchestrator.set_entry_point("coordinator")

# Add edges to END
from langgraph.graph import END
orchestrator.add_edge("final_node", END)

Setting Supervisor

from azcore.core.supervisor import Supervisor

# Create supervisor
supervisor = Supervisor(
    llm=llm,
    members=["research", "security", "analysis"]
)

# Add to graph
orchestrator.set_supervisor(supervisor, name="supervisor")

🔄 Graph Patterns

Linear Sequential Graph

# Simple sequential workflow
orchestrator = GraphOrchestrator()

# Add nodes
orchestrator.add_node("step1", node1)
orchestrator.add_node("step2", node2)
orchestrator.add_node("step3", node3)

# Chain them
orchestrator.set_entry_point("step1")
orchestrator.add_edge("step1", "step2")
orchestrator.add_edge("step2", "step3")
orchestrator.add_edge("step3", END)

# Compile
graph = orchestrator.compile()

Hierarchical Graph

# Coordinator -> Planner -> Supervisor -> Teams -> Generator
orchestrator = GraphOrchestrator()

# Add components
orchestrator.add_node("coordinator", coordinator)
orchestrator.add_node("planner", planner)
orchestrator.set_supervisor(supervisor)
orchestrator.add_team(team1)
orchestrator.add_team(team2)
orchestrator.add_node("generator", generator)

# Build structure
orchestrator.set_entry_point("coordinator")
orchestrator.add_edge("coordinator", "planner")
orchestrator.add_edge("planner", "supervisor")
# Supervisor routes to teams dynamically
orchestrator.add_edge("team1", "supervisor")  # Back to supervisor
orchestrator.add_edge("team2", "supervisor")
orchestrator.add_edge("supervisor", "generator")
orchestrator.add_edge("generator", END)

graph = orchestrator.compile()

Conditional Branching Graph

# Graph with conditional routing
orchestrator = GraphOrchestrator()

# Router node decides path
def router_node(state):
    """Inspect the latest message and route to the matching handler node."""
    text = state["messages"][-1].content.lower()

    # First matching keyword wins; otherwise fall back to the default handler.
    destination = "default_handler"
    if "urgent" in text:
        destination = "priority_handler"
    elif "query" in text:
        destination = "query_processor"

    return Command(update={}, goto=destination)

orchestrator.add_node("router", router_node)
orchestrator.add_node("priority_handler", priority_handler)
orchestrator.add_node("query_processor", query_processor)
orchestrator.add_node("default_handler", default_handler)

# All paths converge
orchestrator.add_edge("priority_handler", "finalizer")
orchestrator.add_edge("query_processor", "finalizer")
orchestrator.add_edge("default_handler", "finalizer")
orchestrator.add_edge("finalizer", END)

graph = orchestrator.compile()

Parallel Processing Graph

# Multiple parallel branches
orchestrator = GraphOrchestrator()

# Dispatcher sends to multiple processors
def dispatcher(state):
    """Flag parallel mode in the shared context and hand off to processor1."""
    # Copy the existing context (if any) and switch on the parallel flag.
    context = dict(state.get("context", {}))
    context["parallel_mode"] = True
    return Command(update={"context": context}, goto="processor1")

# Aggregator combines results
def aggregator(state):
    """Merge the parallel processors' results into one final message."""
    collected = state.get("context", {}).get("results", [])
    summary = combine_results(collected)

    return Command(
        update={"messages": [HumanMessage(content=summary)]},
        goto=END
    )

orchestrator.add_node("dispatcher", dispatcher)
orchestrator.add_node("processor1", processor1)
orchestrator.add_node("processor2", processor2)
orchestrator.add_node("aggregator", aggregator)

# Set up parallel flow
orchestrator.set_entry_point("dispatcher")
# Custom logic needed for true parallel execution
# or use supervisor to coordinate

🛠️ Convenience Methods

build_hierarchical_graph

Quickly build the standard hierarchical pattern:

from azcore.core.orchestrator import GraphOrchestrator
from azcore.nodes.coordinator import CoordinatorNode
from azcore.nodes.planner import PlannerNode
from azcore.nodes.generator import GeneratorNode
from azcore.core.supervisor import Supervisor

# Create components
coordinator = CoordinatorNode(llm=llm)
planner = PlannerNode(llm=llm)
supervisor = Supervisor(llm=llm, members=[])
teams = [research_team, security_team, analysis_team]
generator = GeneratorNode(llm=llm)

# Build complete hierarchical graph in one call
orchestrator = GraphOrchestrator()
graph = orchestrator.build_hierarchical_graph(
    coordinator=coordinator,
    planner=planner,
    supervisor=supervisor,
    teams=teams,
    generator=generator
)

# Graph is ready to use!
result = graph.invoke({"messages": [HumanMessage(content="Task")]})

🔍 Graph Inspection

Accessing Components

# Get specific node
coordinator_node = orchestrator.get_node("coordinator")

# Get specific team
research_team = orchestrator.get_team("research")

# Get all nodes
all_nodes = orchestrator.get_all_nodes()
print(f"Graph has {len(all_nodes)} nodes")

# Get all teams
all_teams = orchestrator.get_all_teams()
for team_name, team in all_teams.items():
    print(f"Team: {team_name}, Tools: {len(team.tools)}")

Graph Validation

# Validate graph structure
try:
    orchestrator.validate_graph()
    print("Graph is valid")
except GraphError as e:
    print(f"Graph validation failed: {e.message}")
    print(f"Details: {e.details}")

Cycle Detection

# Orchestrator automatically detects cycles
orchestrator = GraphOrchestrator(enable_cycle_detection=True)

orchestrator.add_node("node1", node1)
orchestrator.add_node("node2", node2)
orchestrator.add_node("node3", node3)

# Create cycle
orchestrator.add_edge("node1", "node2")
orchestrator.add_edge("node2", "node3")

try:
    orchestrator.add_edge("node3", "node1")  # Creates cycle!
except GraphCycleError as e:
    print(f"Cycle detected: {e.message}")
    # Don't add the problematic edge

🚀 Executing Graphs

Basic Invocation

from langchain_core.messages import HumanMessage

# Compile graph
graph = orchestrator.compile()

# Create initial state
initial_state = {
    "messages": [HumanMessage(content="Analyze security logs")]
}

# Invoke graph
result = graph.invoke(initial_state)

# Access results
final_message = result["messages"][-1].content
print(f"Result: {final_message}")

Streaming Execution

# Stream graph execution
for chunk in graph.stream(initial_state):
    print(f"Node: {chunk}")
    # Process each node's output as it completes

Async Execution

import asyncio

async def run_graph():
    """Run the compiled graph asynchronously and return the final state."""
    # `ainvoke` is the async counterpart of `invoke` on a compiled graph.
    result = await graph.ainvoke(initial_state)
    return result

# Run async
result = asyncio.run(run_graph())

With Configuration

# Invoke with runtime configuration
result = graph.invoke(
    initial_state,
    config={
        "configurable": {
            "thread_id": "user_123",
            "max_iterations": 15
        }
    }
)

💾 Checkpointing & Persistence

Enable Checkpointing

from langgraph.checkpoint.memory import MemorySaver

# Create checkpointer
checkpointer = MemorySaver()

# Build graph with checkpointing
orchestrator = GraphOrchestrator(checkpointer=checkpointer)
# ... add nodes ...
graph = orchestrator.compile()

# Use with thread ID
result = graph.invoke(
    initial_state,
    config={"configurable": {"thread_id": "conversation_1"}}
)

# Continue conversation
continued = graph.invoke(
    {"messages": [HumanMessage(content="Continue...")]},
    config={"configurable": {"thread_id": "conversation_1"}}
)

SQLite Checkpointer

from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent checkpointing
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")

orchestrator = GraphOrchestrator(checkpointer=checkpointer)
# ... build graph ...
graph = orchestrator.compile()

# Conversations persist across restarts

🎯 Advanced Patterns

Dynamic Node Addition

class DynamicOrchestrator:
    """Wraps a GraphOrchestrator so specialized teams can be added at runtime."""

    def __init__(self, llm):
        # Shared LLM handed to every node/team created through this wrapper.
        self.llm = llm
        self.orchestrator = GraphOrchestrator()

    def add_specialized_team(self, domain: str, tools: list):
        """Register a domain-focused team on the underlying orchestrator."""
        built_team = (
            TeamBuilder(f"{domain}_team")
            .with_llm(self.llm)
            .with_tools(tools)
            .with_prompt(f"You specialize in {domain}")
            .build()
        )
        self.orchestrator.add_team(built_team)
        return built_team

    def build(self):
        """Wire up the core nodes and compile the graph."""
        self.orchestrator.add_node("coordinator", CoordinatorNode(llm=self.llm))
        self.orchestrator.add_node("planner", PlannerNode(llm=self.llm))
        self.orchestrator.set_entry_point("coordinator")

        # The graph can still be extended later via add_specialized_team.
        return self.orchestrator.compile()

# Use dynamic orchestrator
dynamic = DynamicOrchestrator(llm)
graph = dynamic.build()

# Add teams as needed
dynamic.add_specialized_team("data_analysis", [query_tool, viz_tool])
dynamic.add_specialized_team("report_generation", [write_tool, format_tool])

Error Recovery Nodes

def error_handler_node(state):
    """Route to recovery when metadata records an error, else continue normally."""
    error = state.get("metadata", {}).get("error")

    if not error:
        # Nothing went wrong; resume the ordinary path untouched.
        return Command(update={}, goto="normal_flow")

    # Log, announce the failure in the conversation, and mark it handled.
    logger.error(f"Workflow error: {error}")

    notice = HumanMessage(
        content=f"Error occurred: {error}. Attempting recovery...",
        name="error_handler"
    )
    return Command(
        update={
            "messages": [notice],
            "metadata": {"error_handled": True}
        },
        goto="recovery_node"
    )

# Add to graph
orchestrator.add_node("error_handler", error_handler_node)

# Nodes can route to error handler
def risky_node(state):
    """Attempt the risky operation, diverting to the error handler on failure."""
    try:
        outcome = risky_operation()
        return Command(update={"messages": [outcome]}, goto="next")
    except Exception as exc:
        # Record the failure in metadata so error_handler can react to it.
        return Command(
            update={"metadata": {"error": str(exc)}},
            goto="error_handler"
        )

State Transformers

def state_transformer_node(state):
    """Convert every message and annotate the context with transform metadata."""
    original = state["messages"]

    # Apply the per-message transformation.
    converted = []
    for item in original:
        converted.append(transform_message(item))

    # Enrich the context without mutating the incoming state.
    enriched_context = dict(state.get("context", {}))
    enriched_context["transformed"] = True
    enriched_context["original_count"] = len(original)

    return Command(
        update={"messages": converted, "context": enriched_context},
        goto="next_node"
    )

📊 Graph Monitoring

Execution Metrics

import time

class MonitoredOrchestrator:
    """Orchestrator wrapper that records per-node execution metrics.

    Metrics tracked:
      - node_executions: call count per node name (failed calls included)
      - execution_times: duration in seconds of the most recent *successful*
        call per node (earlier durations are overwritten)
      - error_count: total number of exceptions raised by wrapped nodes
    """

    def __init__(self):
        self.orchestrator = GraphOrchestrator()
        self.metrics = {
            "node_executions": {},
            "execution_times": {},
            "error_count": 0
        }

    def add_monitored_node(self, name: str, node: Callable):
        """Register `node` under `name`, wrapped so every call updates metrics."""

        def monitored_node(state):
            start_time = time.time()
            # Count the call up front so failed executions are counted too.
            self.metrics["node_executions"][name] = \
                self.metrics["node_executions"].get(name, 0) + 1

            try:
                result = node(state)
            except Exception:
                # Don't bind the exception — it's re-raised untouched.
                self.metrics["error_count"] += 1
                raise
            self.metrics["execution_times"][name] = time.time() - start_time
            return result

        self.orchestrator.add_node(name, monitored_node)

    def get_metrics(self):
        """Return the live metrics dict (not a copy)."""
        return self.metrics

# Use monitored orchestrator
monitored = MonitoredOrchestrator()
monitored.add_monitored_node("processor", processor_node)
# ... build graph ...

# Check metrics after execution
print(monitored.get_metrics())

Visualization

def visualize_graph(orchestrator: GraphOrchestrator):
    """Print a plain-text summary of the orchestrator's nodes and teams."""
    node_map = orchestrator.get_all_nodes()
    team_map = orchestrator.get_all_teams()

    print("Graph Structure:")
    print(f"Nodes: {len(node_map)}")
    for node_name in node_map:
        print(f"  - {node_name}")

    print(f"\nTeams: {len(team_map)}")
    for team_name, team in team_map.items():
        print(f"  - {team_name} ({len(team.tools)} tools)")

    # Could also generate graphviz diagram, etc.

🎯 Best Practices

1. Clear Graph Structure

# ✅ GOOD: Clear, understandable flow
orchestrator = GraphOrchestrator()
orchestrator.add_node("input_validator", validator)
orchestrator.add_node("processor", processor)
orchestrator.add_node("output_formatter", formatter)
orchestrator.set_entry_point("input_validator")
orchestrator.add_edge("input_validator", "processor")
orchestrator.add_edge("processor", "output_formatter")
orchestrator.add_edge("output_formatter", END)

# ❌ BAD: Complex, hard to follow
orchestrator.add_node("n1", node1)
orchestrator.add_node("n2", node2)
# Many edges creating spaghetti...

2. Validate Before Compiling

# ✅ GOOD: Validate graph structure
orchestrator = GraphOrchestrator()
# ... add nodes ...

try:
    orchestrator.validate_graph()
    graph = orchestrator.compile()
except GraphError as e:
    print(f"Graph invalid: {e}")
    # Fix issues before compiling

3. Enable Cycle Detection

# ✅ GOOD: Catch cycles early
orchestrator = GraphOrchestrator(enable_cycle_detection=True)
# Will raise GraphCycleError if cycle created

4. Use Hierarchical Pattern for Complex Workflows

# ✅ GOOD: Use build_hierarchical_graph for standard pattern
graph = orchestrator.build_hierarchical_graph(
    coordinator=coordinator,
    planner=planner,
    supervisor=supervisor,
    teams=teams,
    generator=generator
)

# Don't manually wire up common patterns

🚀 Complete Orchestration Example

from azcore.core.orchestrator import GraphOrchestrator
from azcore.nodes.coordinator import CoordinatorNode
from azcore.nodes.planner import PlannerNode
from azcore.nodes.generator import GeneratorNode
from azcore.core.supervisor import Supervisor
from azcore.agents.team_builder import TeamBuilder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# Setup LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)

# Create nodes
coordinator = CoordinatorNode(llm=llm)
planner = PlannerNode(llm=llm)
generator = GeneratorNode(llm=llm)

# Create teams
research_team = (TeamBuilder("research")
    .with_llm(llm)
    .with_tools([web_search, analyze_docs])
    .with_prompt("You are a research specialist.")
    .build())

security_team = (TeamBuilder("security")
    .with_llm(llm)
    .with_tools([scan_logs, alert_team])
    .with_prompt("You are a security specialist.")
    .build())

# Create supervisor
supervisor = Supervisor(
    llm=llm,
    members=["research", "security"]
)

# Build graph with checkpointing
checkpointer = MemorySaver()
orchestrator = GraphOrchestrator(
    checkpointer=checkpointer,
    max_iterations=20,
    enable_cycle_detection=True
)

# Use convenience method
graph = orchestrator.build_hierarchical_graph(
    coordinator=coordinator,
    planner=planner,
    supervisor=supervisor,
    teams=[research_team, security_team],
    generator=generator
)

# Execute
result = graph.invoke(
    {"messages": [HumanMessage(content="Analyze recent security incidents")]},
    config={"configurable": {"thread_id": "session_1"}}
)

# Get result
print(result["messages"][-1].content)

🎓 Summary

Azcore's graph orchestration provides:

  • GraphOrchestrator: High-level API for graph construction
  • Flexible Patterns: Linear, hierarchical, conditional, parallel workflows
  • Validation: Cycle detection and structure validation
  • Checkpointing: Conversation persistence and resumption
  • Monitoring: Track execution and performance
  • Composability: Mix nodes, teams, and custom components

Use the GraphOrchestrator to build sophisticated multi-agent workflows with clean architecture, proper error handling, and robust state management.

Edit this page on GitHub
AzrienLabs logo

AzrienLabs

Crafted by Team AzrienLabs