Nodes are the fundamental building blocks of workflows in Azcore. Each node represents a discrete processing unit in the agent workflow graph, performing specific tasks like coordination, planning, supervision, or agent execution.
🔷 Node Architecture
BaseNode Abstract Class
All nodes inherit from BaseNode:
from azcore.core.base import BaseNode
from langgraph.types import Command
# NOTE(review): this excerpt uses ABC, abstractmethod, Dict and Any without
# importing them; only BaseNode and Command are imported above it.
class BaseNode(ABC):
    """
    Abstract base class for all nodes.

    Nodes represent individual processing units in the workflow graph.
    Subclasses must implement ``execute``.

    Attributes:
        name: Unique identifier for the node
        description: Human-readable description of the node's purpose
    """

    def __init__(self, name: str, description: str = "") -> None:
        # Both values are stored as given; no validation happens here.
        self.name = name
        self.description = description

    @abstractmethod
    def execute(self, state: Dict[str, Any]) -> Command:
        """
        Execute the node's logic.

        Args:
            state: Current workflow state

        Returns:
            Command object with updates and next node
        """
        pass
Node Execution Pattern
All nodes follow the same execution pattern:
1. Receive the current state
2. Process the state according to the node's logic
3. Return a Command with state updates and a routing decision
def my_node(state: State) -> Command:
    """Illustrate the receive -> process -> return-Command node pattern."""
    # Steps 1 and 2: read the message history and run the processing step.
    processed = process_messages(state["messages"])

    # Step 3: wrap the result in a message attributed to this node and
    # hand control to the next node.
    reply = HumanMessage(content=processed, name="my_node")
    return Command(update={"messages": [reply]}, goto="next_node")
🎯 Core Node Types
1. CoordinatorNode
The coordinator is the first point of contact, handling user interactions and deciding whether to route to the planner or end the conversation.
from azcore.nodes.coordinator import CoordinatorNode
from langchain_openai import ChatOpenAI

# Chat model reused by the other snippets in this guide via the name `llm`.
llm = ChatOpenAI(model="gpt-4", temperature=0)

# NOTE(review): handoff_keyword is presumably the marker the coordinator looks
# for in the model's reply to hand off to the planner — confirm in CoordinatorNode.
coordinator = CoordinatorNode(
    llm=llm,
    system_prompt="You are a helpful coordinator...",
    handoff_keyword="handoff_to_planner",
    name="coordinator"
)

# Use in graph: register the coordinator and make it the entry point
from azcore.core.orchestrator import GraphOrchestrator

orchestrator = GraphOrchestrator()
orchestrator.add_node("coordinator", coordinator)
orchestrator.set_entry_point("coordinator")
Coordinator Behavior
# Default coordinator prompt
# The prompt names the same "handoff_to_planner" keyword that the earlier
# setup example passes as handoff_keyword.
coordinator_prompt = """You are a friendly and helpful coordinator agent.
Your responsibilities:
1. Greet users warmly and understand their requests
2. Determine if task requires complex planning
3. Delegate to planner for complex tasks
4. Handle simple queries directly
Guidelines:
- Keep responses concise and clear
- Ask clarifying questions when needed
- Use "handoff_to_planner" for complex tasks
Remember: You are the user's first point of contact!
"""

# No handoff_keyword or name is passed here — presumably CoordinatorNode
# supplies defaults for both; verify against its signature.
coordinator = CoordinatorNode(
    llm=llm,
    system_prompt=coordinator_prompt
)
Coordinator Routing
# Coordinator decides routing based on response
# NOTE(review): HumanMessage and END are not imported in this snippet.
state = {"messages": [HumanMessage(content="Hello!")]}
result = coordinator.execute(state)

# Expected routing as described by this guide (not shown by the code itself):
# If simple query:
# result.goto = END
# If complex task:
# result.goto = "planner"
2. PlannerNode
The planner analyzes complex requests and creates structured execution plans.
from azcore.nodes.planner import PlannerNode

# NOTE(review): validate_json presumably makes the planner check that the
# model output parses as JSON — confirm in PlannerNode.
planner = PlannerNode(
    llm=llm,
    system_prompt=None,  # Uses default
    validate_json=True,
    name="planner"
)

# Custom planner prompt
# The JSON below is part of the prompt text sent to the model, not Python data.
custom_planner_prompt = """You are an expert task planner.
Analyze the request and create a JSON execution plan:
{
"task": "Task description",
"steps": [
{
"step": 1,
"description": "Step description",
"assigned_team": "team_name",
"tool_selection": ["tool1", "tool2"],
"expected_output": "Expected result"
}
],
"success_criteria": "How to know when complete"
}
"""

# Rebinds `planner`, replacing the default-prompt instance created above.
planner = PlannerNode(
    llm=llm,
    system_prompt=custom_planner_prompt,
    validate_json=True
)
Planner Output
# Planner creates structured plan
state = {"messages": [HumanMessage(content="Create a security report")]}
result = planner.execute(state)

# Result includes JSON plan, read from the Command's update under "full_plan"
plan = result.update["full_plan"]
print(plan)
# Illustrative output; further steps are elided with "...":
# {
# "task": "Create security report",
# "steps": [
# {
# "step": 1,
# "description": "Collect security logs",
# "assigned_team": "security_team",
# "tool_selection": ["fetch_logs"],
# "expected_output": "Recent security logs"
# },
# ...
# ]
# }
3. Supervisor Node
The supervisor routes between teams based on the execution plan.
from azcore.core.supervisor import Supervisor

# `members` lists the team names the supervisor is given to choose between.
supervisor = Supervisor(
    llm=llm,
    members=["research_team", "analysis_team", "writing_team"]
)

# Create supervisor node
supervisor_node = supervisor.create_node()

# Add to graph
orchestrator.add_node("supervisor", supervisor_node)
Supervisor Routing
# Supervisor decides which team to invoke
# `[...]` and `json_plan` are placeholders for real messages and a planner result.
state = {
    "messages": [...],
    "full_plan": json_plan  # Plan from planner
}
# The supervisor node is invoked as a callable here, not through .execute().
result = supervisor_node(state)
# Routes to appropriate team
# result.goto = "research_team"
4. GeneratorNode
The generator formats final responses for users.
from azcore.nodes.generator import GeneratorNode

# The system prompt is passed inline as a multi-line string.
generator = GeneratorNode(
    llm=llm,
    system_prompt="""You are a response generator.
Format team outputs into clear, user-friendly responses.
Guidelines:
- Synthesize information from all teams
- Present results clearly
- Be concise but complete
"""
)

# Add to graph under the node name "response_generator"
orchestrator.add_node("response_generator", generator)
🔧 Creating Custom Nodes
Basic Custom Node
from azcore.core.base import BaseNode
from langgraph.types import Command
from langchain_core.messages import HumanMessage
class CustomProcessorNode(BaseNode):
    """Example node that runs the newest message through a data processor."""

    def __init__(self, name: str = "processor"):
        super().__init__(name=name, description="Processes data")
        self.processor = MyDataProcessor()

    def execute(self, state: Dict[str, Any]) -> Command:
        """Process the latest message and emit the result as a new message.

        Args:
            state: Workflow state; ``state["messages"][-1].content`` is the
                processor input.

        Returns:
            Command that adds the processed message, sets
            ``metadata["processed"]`` and routes to ``"next_node"``.
        """
        latest_text = state["messages"][-1].content
        processed = self.processor.process(latest_text)

        # Attribute the outgoing message to this node via its name.
        outgoing = HumanMessage(content=processed, name=self.name)
        return Command(
            update={"messages": [outgoing], "metadata": {"processed": True}},
            goto="next_node",
        )
Node with Conditional Routing
class ConditionalNode(BaseNode):
    """Choose the next node from keywords found in the latest message."""

    def __init__(self, llm, name: str = "conditional"):
        super().__init__(name=name, description="Routes conditionally")
        self.llm = llm

    def execute(self, state: Dict[str, Any]) -> Command:
        """Route on the "urgent" / "error" keywords, case-insensitively.

        Args:
            state: Workflow state; the content of the last message is inspected.

        Returns:
            Command whose ``goto`` and ``metadata`` update depend on the
            keyword that matched.
        """
        text = state["messages"][-1].content.lower()

        # "urgent" is tested first, so it wins when both keywords are present.
        if "urgent" in text:
            return Command(
                update={"metadata": {"priority": "high"}},
                goto="priority_handler",
            )
        if "error" in text:
            return Command(
                update={"metadata": {"error": True}},
                goto="error_handler",
            )
        return Command(
            update={"metadata": {"priority": "normal"}},
            goto="normal_processor",
        )
Node with Error Handling
from azcore.exceptions import NodeExecutionError
from azcore.utils.retry import retry_with_timeout
class RobustNode(BaseNode):
    """Node with error handling and retries.

    ``execute`` validates the state, delegates to ``_process`` and converts
    unexpected failures into ``NodeExecutionError``.
    """

    def __init__(self, llm, name: str = "robust"):
        import logging  # local import: this snippet's import block has no logging

        super().__init__(name=name, description="Robust node with retries")
        self.llm = llm
        # The error path in execute() logs through this attribute, so it must
        # exist before execute() can fail.
        self._logger = logging.getLogger(self.__class__.__name__)

    # NOTE(review): retry semantics (which exceptions trigger a retry) live in
    # azcore.utils.retry and are not visible here.
    @retry_with_timeout(max_retries=3, timeout=30.0)
    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute with error handling.

        Args:
            state: Current workflow state; must contain a ``"messages"`` key.

        Returns:
            Command that adds the processed message and routes to
            ``"next_node"``.

        Raises:
            NodeExecutionError: If the state is empty or lacks ``"messages"``
                (raised as-is, keeping its ``details``), or if processing
                fails (wrapping the original exception as ``__cause__``).
        """
        try:
            # Validate state
            if not state or "messages" not in state:
                raise NodeExecutionError(
                    "Invalid state structure",
                    details={"node": self.name}
                )

            # Process
            result = self._process(state)
            return Command(
                update={"messages": [result]},
                goto="next_node"
            )
        except NodeExecutionError as e:
            # Already the right type: log it, but do not re-wrap it, so the
            # original message and details reach the caller.
            self._logger.error(f"Node execution failed: {e}")
            raise
        except Exception as e:
            self._logger.error(f"Node execution failed: {e}")
            raise NodeExecutionError(
                f"Node {self.name} failed",
                details={"error": str(e)}
            ) from e

    def _process(self, state: Dict[str, Any]) -> HumanMessage:
        """Internal processing logic (placeholder to be filled in)."""
        # Implementation
        pass
🔄 Node Composition Patterns
Sequential Node Chain
# Create nodes (CustomNode1..3 are placeholder node classes)
node1 = CustomNode1("node1")
node2 = CustomNode2("node2")
node3 = CustomNode3("node3")

# Build sequential graph
orchestrator = GraphOrchestrator()
orchestrator.add_node("node1", node1)
orchestrator.add_node("node2", node2)
orchestrator.add_node("node3", node3)

# Chain nodes: node1 -> node2 -> node3 -> END
# NOTE(review): END is not imported in this snippet — presumably
# langgraph.graph.END; confirm.
orchestrator.set_entry_point("node1")
orchestrator.add_edge("node1", "node2")
orchestrator.add_edge("node2", "node3")
orchestrator.add_edge("node3", END)

graph = orchestrator.compile()
Parallel Node Execution
class AggregatorNode(BaseNode):
    """Merge the outputs of parallel nodes into a single final message."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Combine ``context["parallel_results"]`` and end the workflow.

        Args:
            state: Workflow state; results are read from
                ``state["context"]["parallel_results"]``.

        Returns:
            Command that adds the combined message and routes to ``END``.
        """
        # A missing context or result list falls back to an empty list.
        context = state.get("context", {})
        branch_outputs = context.get("parallel_results", [])

        merged = self._combine_results(branch_outputs)
        summary = HumanMessage(content=merged, name=self.name)
        return Command(update={"messages": [summary]}, goto=END)
# Setup parallel execution
# parallel_node1, parallel_node2 and aggregator_node are assumed to be
# node instances created elsewhere.
orchestrator.add_node("parallel1", parallel_node1)
orchestrator.add_node("parallel2", parallel_node2)
orchestrator.add_node("aggregator", aggregator_node)

# Both parallel nodes go to aggregator
orchestrator.add_edge("parallel1", "aggregator")
orchestrator.add_edge("parallel2", "aggregator")
Conditional Node Branching
class RouterNode(BaseNode):
    """Send the latest message to the handler that matches its kind."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Route based on the content of the last message.

        Args:
            state: Workflow state; the content of the last message is classified.

        Returns:
            Command that records the chosen route in ``metadata["route"]``
            and routes to that same node.
        """
        target = self._pick_route(state["messages"][-1].content)
        return Command(update={"metadata": {"route": target}}, goto=target)

    def _pick_route(self, message) -> str:
        """Return the handler name for *message*; the first matching check wins."""
        if self._is_query(message):
            return "query_handler"
        if self._is_command(message):
            return "command_executor"
        if self._is_error(message):
            return "error_handler"
        return "default_handler"
# Add branches to graph
# The handler node names match the `goto` values RouterNode returns.
orchestrator.add_node("router", router_node)
orchestrator.add_node("query_handler", query_handler)
orchestrator.add_node("command_executor", command_executor)
orchestrator.add_node("error_handler", error_handler)
orchestrator.add_node("default_handler", default_handler)
📊 Node State Management
Accessing State in Nodes
class StateAwareNode(BaseNode):
    """Show defensive reads of each top-level state section."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Read state with defaults, process it, and route to ``"next"``.

        Args:
            state: Workflow state; every section is optional and falls back
                to an empty container.

        Returns:
            Command that adds the processed message and routes to ``"next"``.
        """
        # Every lookup supplies a default so an absent section cannot raise.
        history = state.get("messages", [])
        user_id = state.get("context", {}).get("user_id", "unknown")
        # Read to illustrate metadata access; not used further below.
        session_id = state.get("metadata", {}).get("session_id")
        selected_tools = state.get("rl_metadata", {}).get("selected_tools", [])

        outcome = self._process(history, user_id, selected_tools)
        return Command(update={"messages": [outcome]}, goto="next")
Updating State from Nodes
class StateUpdaterNode(BaseNode):
    """Node that updates various state fields."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Update messages, context and metadata in one Command.

        Args:
            state: Current workflow state; ``context`` and ``metadata`` are
                optional and default to empty dicts.

        Returns:
            Command that adds the processed message, stamps the context with
            this node's name and a timestamp, appends this node to
            ``metadata["nodes_executed"]`` and routes to ``"next_node"``.
        """
        import time  # local import: the snippet uses time.time() but never imports time

        result = self._process(state)

        context = state.get("context", {})
        metadata = state.get("metadata", {})
        # Build a new list rather than appending, so the incoming state is
        # not mutated.
        executed = metadata.get("nodes_executed", []) + [self.name]

        return Command(
            update={
                "messages": [
                    HumanMessage(content=result, name=self.name)
                ],
                # Existing keys are spread first so the new keys win on conflict.
                "context": {
                    **context,
                    "last_processor": self.name,
                    "processed_at": time.time()
                },
                "metadata": {
                    **metadata,
                    "nodes_executed": executed
                }
            },
            goto="next_node"
        )
🎯 Node Best Practices
1. Single Responsibility
# ✅ GOOD: Focused node
class ValidationNode(BaseNode):
    """Single-purpose node: it validates input and does nothing else."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Record the validation outcome and route on it.

        Returns:
            Command that stores the result in ``metadata["valid"]`` and
            routes to ``"processor"`` when valid, otherwise ``"error"``.
        """
        passed = self._validate(state)
        if passed:
            destination = "processor"
        else:
            destination = "error"
        return Command(update={"metadata": {"valid": passed}}, goto=destination)
# ❌ BAD: Too many responsibilities
class DoEverythingNode(BaseNode):
    """Does validation, processing, formatting, and routing."""
    # Too complex! Anti-example only: no execute() is defined here.
2. Clear Naming
# ✅ GOOD: Descriptive names — each one states what the node does
class UserInputValidatorNode(BaseNode): pass
class DataTransformationNode(BaseNode): pass
class ResponseFormatterNode(BaseNode): pass

# ❌ BAD: Vague names — they say nothing about the node's job
class Node1(BaseNode): pass
class ProcessorNode(BaseNode): pass  # Too generic
3. Error Handling
# ✅ GOOD: Proper error handling
class SafeNode(BaseNode):
    """Example node that separates recoverable from unexpected failures.

    A ``ValueError`` from processing is routed to ``error_handler``; any
    other exception is re-raised as ``NodeExecutionError``.
    """

    def execute(self, state: Dict[str, Any]) -> Command:
        """Process the state, routing validation failures to the error handler.

        Args:
            state: Current workflow state, passed through to ``_process``.

        Returns:
            Command routing to ``"next"`` on success, or to
            ``"error_handler"`` with the error text in metadata when
            processing raises ``ValueError``.

        Raises:
            NodeExecutionError: For any other exception; the original
                exception is attached as ``__cause__``.
        """
        import logging  # local import: the snippet has no module-level logging import

        # This class never sets ``_logger``; use it if a base class provides
        # one, otherwise fall back to a logger named after the class.
        logger = getattr(self, "_logger", None) or logging.getLogger(
            type(self).__name__
        )

        try:
            result = self._process(state)
            return Command(update={"messages": [result]}, goto="next")
        except ValueError as e:
            logger.error(f"Validation error: {e}")
            return Command(
                update={"metadata": {"error": str(e)}},
                goto="error_handler"
            )
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            # Chain explicitly so the traceback shows the root cause.
            raise NodeExecutionError(f"Node failed: {e}") from e
4. State Validation
# ✅ GOOD: Validate state structure
class ValidatingNode(BaseNode):
    """Reject states that lack a required top-level field."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Check required fields, then continue to ``"next"``.

        Raises:
            ValidationError: Naming the first of ``"messages"`` /
                ``"context"`` that is absent from the state.
        """
        # Fields are checked in order; only the first missing one is reported.
        missing = next(
            (key for key in ("messages", "context") if key not in state),
            None,
        )
        if missing is not None:
            raise ValidationError(f"Missing field: {missing}")

        # Placeholder update: real processing would go here.
        return Command(update={...}, goto="next")
🚀 Complete Node Example
import logging
from typing import Any, Dict

from langchain_core.messages import HumanMessage
from langgraph.types import Command

from azcore.core.base import BaseNode
from azcore.exceptions import NodeExecutionError, ValidationError
from azcore.utils.retry import retry_with_timeout
class DataAnalysisNode(BaseNode):
    """Node that analyzes data and routes based on results.

    The newest message's content is passed to ``analyzer.analyze``. The
    returned mapping is expected to provide ``confidence``,
    ``requires_human``, ``category`` and ``recommendation``, since those keys
    are read below.
    """

    def __init__(
        self,
        llm,
        analyzer,
        name: str = "data_analyzer"
    ):
        super().__init__(
            name=name,
            description="Analyzes data and determines next steps"
        )
        self.llm = llm
        self.analyzer = analyzer
        self._logger = logging.getLogger(self.__class__.__name__)

    # NOTE(review): retry semantics (which exceptions trigger a retry) live in
    # azcore.utils.retry and are not visible here.
    @retry_with_timeout(max_retries=3, timeout=30.0)
    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute analysis with error handling.

        Args:
            state: Current workflow state; must contain a non-empty
                ``"messages"`` list.

        Returns:
            Command that adds the formatted analysis message, stores the raw
            analysis in ``context["analysis"]``, marks the metadata as
            analyzed and routes by confidence.

        Raises:
            ValidationError: If the state is missing or has no messages.
            NodeExecutionError: For any other failure; the original exception
                is attached as ``__cause__``.
        """
        try:
            # Validate state
            self._validate_state(state)

            # Extract data
            messages = state["messages"]
            data = messages[-1].content

            # Analyze (only the first 50 characters are logged)
            self._logger.info(f"Analyzing data: {data[:50]}...")
            analysis_result = self.analyzer.analyze(data)

            # Determine routing
            goto = self._select_route(analysis_result)

            # Format result
            result_message = self._format_result(analysis_result)

            return Command(
                update={
                    "messages": [
                        HumanMessage(content=result_message, name=self.name)
                    ],
                    # Existing keys are spread first so the new keys win.
                    "context": {
                        **state.get("context", {}),
                        "analysis": analysis_result
                    },
                    "metadata": {
                        **state.get("metadata", {}),
                        "analyzed": True,
                        "confidence": analysis_result["confidence"]
                    }
                },
                goto=goto
            )
        except ValidationError:
            # Validation failures pass through unchanged.
            raise
        except Exception as e:
            self._logger.error(f"Analysis failed: {e}")
            # Chain explicitly so the traceback shows the root cause.
            raise NodeExecutionError(
                f"Data analysis node failed: {str(e)}",
                details={"node": self.name, "error": str(e)}
            ) from e

    @staticmethod
    def _select_route(analysis_result: Dict[str, Any]) -> str:
        """Return the next node name; high confidence is checked before human review."""
        if analysis_result["confidence"] > 0.8:
            return "high_confidence_processor"
        if analysis_result["requires_human"]:
            return "human_review"
        return "standard_processor"

    def _validate_state(self, state: Dict[str, Any]) -> None:
        """Validate state structure.

        Raises:
            ValidationError: If the state is empty, lacks ``"messages"``, or
                its message list is empty.
        """
        if not state or "messages" not in state:
            raise ValidationError(
                "Invalid state: missing 'messages'",
                details={"state_keys": list(state.keys()) if state else []}
            )
        if not state["messages"]:
            raise ValidationError("State has no messages")

    def _format_result(self, analysis: Dict[str, Any]) -> str:
        """Format analysis result for output as a short multi-line summary."""
        return (
            "Analysis Complete:\n"
            f"- Confidence: {analysis['confidence']:.2f}\n"
            f"- Category: {analysis['category']}\n"
            f"- Recommendation: {analysis['recommendation']}\n"
        )
🎓 Summary
Azcore nodes provide:
- BaseNode: Abstract foundation for all nodes
- Built-in Nodes: Coordinator, Planner, Supervisor, Generator
- Custom Nodes: Easy to create domain-specific nodes
- Command Pattern: Consistent state updates and routing
- Error Handling: Built-in retry and error recovery
- Composability: Nodes combine into complex workflows
Use nodes as modular building blocks to create sophisticated multi-agent workflows with clear separation of concerns and robust error handling.