Core Concepts

Nodes

Understanding nodes and workflow building blocks in Azcore.

Nodes are the fundamental building blocks of workflows in Azcore. Each node represents a discrete processing unit in the agent workflow graph, performing specific tasks like coordination, planning, supervision, or agent execution.

🔷 Node Architecture

BaseNode Abstract Class

All nodes inherit from BaseNode:

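# BaseNode is importable with: from azcore.core.base import BaseNode
# Its definition is reproduced here for reference.
from abc import ABC, abstractmethod
from typing import Any, Dict

from langgraph.types import Command

class BaseNode(ABC):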
    """
    Abstract base class for all nodes.

    Nodes represent individual processing units in the workflow graph.

    Attributes:
        name: Unique identifier for the node
        description: Human-readable description of the node's purpose
    """

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description

    @abstractmethod
    def execute(self, state: Dict[str, Any]) -> Command:
        """
        Execute the node's logic.

        Args:
            state: Current workflow state

        Returns:
            Command object with updates and next node
        """
        pass

Node Execution Pattern

All nodes follow the same execution pattern:

  1. Receive current state
  2. Process state according to node logic
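  3. Return Command with state updates and routing decision

from typing import Any, Dict

from langchain_core.messages import HumanMessage
from langgraph.types import Command

# process_messages is a placeholder for your own processing function
def my_node(state: Dict[str, Any]) -> Command: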
    """Example node implementation."""

    # 1. Extract data from state
    messages = state["messages"]

    # 2. Process
    result = process_messages(messages)

    # 3. Return Command with updates
    return Command(
        update={
            "messages": [HumanMessage(content=result, name="my_node")]
        },
        goto="next_node"
    )
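
To make the three steps concrete, the sketch below runs a tiny dispatcher loop over two node functions. It is not how Azcore or LangGraph execute graphs. `Command` here is a stand-in dataclass with the same `update` and `goto` fields, and `END`, `run`, `greeter`, and `closer` are hypothetical names used only for illustration. The sketch also assumes that `messages` are appended while other keys are replaced.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict

END = "__end__"  # stand-in sentinel marking the end of the workflow


@dataclass
class Command:
    """Stand-in for langgraph.types.Command (update and goto only)."""
    update: Dict[str, Any] = field(default_factory=dict)
    goto: str = END


def greeter(state: Dict[str, Any]) -> Command:
    # Steps 1-2: read the state and process it
    text = f"hello, {state['messages'][-1]}"
    # Step 3: return the update plus the routing decision
    return Command(update={"messages": [text]}, goto="closer")


def closer(state: Dict[str, Any]) -> Command:
    return Command(update={"messages": ["bye"]}, goto=END)


def run(nodes: Dict[str, Callable[[Dict[str, Any]], Command]],
        entry: str, state: Dict[str, Any]) -> Dict[str, Any]:
    """Follow `goto` from node to node, merging each update into the state."""
    current = entry
    while current != END:
        command = nodes[current](state)
        for key, value in command.update.items():
            if key == "messages":
                # Assumed reducer: messages are appended, not replaced
                state = {**state, key: state.get(key, []) + value}
            else:
                state = {**state, key: value}
        current = command.goto
    return state


final = run({"greeter": greeter, "closer": closer}, "greeter",
            {"messages": ["world"]})
print(final["messages"])  # ['world', 'hello, world', 'bye']
```

Each node only sees the state and returns a `Command`; the loop, not the node, applies the update and moves on to the node named in `goto`.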

🎯 Core Node Types

1. CoordinatorNode

The coordinator is the first point of contact, handling user interactions and deciding whether to route to the planner or end the conversation.

from azcore.nodes.coordinator import CoordinatorNode
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)

coordinator = CoordinatorNode(
    llm=llm,
    system_prompt="You are a helpful coordinator...",
    handoff_keyword="handoff_to_planner",
    name="coordinator"
)

# Use in graph
from azcore.core.orchestrator import GraphOrchestrator

orchestrator = GraphOrchestrator()
orchestrator.add_node("coordinator", coordinator)
orchestrator.set_entry_point("coordinator")

Coordinator Behavior

# Default coordinator prompt
coordinator_prompt = """You are a friendly and helpful coordinator agent.

Your responsibilities:
1. Greet users warmly and understand their requests
2. Determine if task requires complex planning
3. Delegate to planner for complex tasks
4. Handle simple queries directly

Guidelines:
- Keep responses concise and clear
- Ask clarifying questions when needed
- Use "handoff_to_planner" for complex tasks

Remember: You are the user's first point of contact!
"""

coordinator = CoordinatorNode(
    llm=llm,
    system_prompt=coordinator_prompt
)

Coordinator Routing

from langchain_core.messages import HumanMessage

# Coordinator decides routing based on response
state = {"messages": [HumanMessage(content="Hello!")]}

result = coordinator.execute(state)

# If simple query:
# result.goto = END

# If complex task:
# result.goto = "planner"
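
The routing decision above can be pictured as a keyword check on the model's reply. The sketch below is an assumption about how a handoff keyword could drive routing, not CoordinatorNode's actual implementation; `route_from_reply` and the `END` string are hypothetical stand-ins.

```python
END = "__end__"  # stand-in for the graph's END sentinel


def route_from_reply(reply: str,
                     handoff_keyword: str = "handoff_to_planner") -> str:
    """Return "planner" when the reply contains the handoff keyword, else END."""
    if handoff_keyword in reply:
        return "planner"
    return END


print(route_from_reply("Hi! How can I help you today?"))  # __end__
print(route_from_reply("handoff_to_planner"))             # planner
```

Keeping the keyword configurable, as the `handoff_keyword` parameter does, lets the system prompt and the routing check stay in sync.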

2. PlannerNode

The planner analyzes complex requests and creates structured execution plans.

from azcore.nodes.planner import PlannerNode

planner = PlannerNode(
    llm=llm,
    system_prompt=None,  # Uses default
    validate_json=True,
    name="planner"
)

# Custom planner prompt
custom_planner_prompt = """You are an expert task planner.

Analyze the request and create a JSON execution plan:
{
    "task": "Task description",
    "steps": [
        {
            "step": 1,
            "description": "Step description",
            "assigned_team": "team_name",
            "tool_selection": ["tool1", "tool2"],
            "expected_output": "Expected result"
        }
    ],
    "success_criteria": "How to know when complete"
}
"""

planner = PlannerNode(
    llm=llm,
    system_prompt=custom_planner_prompt,
    validate_json=True
)

Planner Output

# Planner creates structured plan
state = {"messages": [HumanMessage(content="Create a security report")]}

result = planner.execute(state)

# Result includes JSON plan
plan = result.update["full_plan"]
print(plan)
# {
#   "task": "Create security report",
#   "steps": [
#     {
#       "step": 1,
#       "description": "Collect security logs",
#       "assigned_team": "security_team",
#       "tool_selection": ["fetch_logs"],
#       "expected_output": "Recent security logs"
#     },
#     ...
#   ]
# }
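
A plan in this shape can be checked before it is handed to downstream nodes. The sketch below shows one way such a check might look; it is not the validation that `validate_json=True` performs, which this page does not describe. `validate_plan` and `REQUIRED_STEP_KEYS` are hypothetical names, and the required fields are taken from the prompt template above.

```python
import json
from typing import Any, Dict

# Step fields named in the planner prompt template
REQUIRED_STEP_KEYS = {"step", "description", "assigned_team",
                      "tool_selection", "expected_output"}


def validate_plan(raw: str) -> Dict[str, Any]:
    """Parse a plan string and check the fields named in the prompt."""
    plan = json.loads(raw)  # raises json.JSONDecodeError on malformed JSON
    if not isinstance(plan, dict):
        raise ValueError("plan must be a JSON object")
    if "task" not in plan or "steps" not in plan:
        raise ValueError("plan needs 'task' and 'steps'")
    steps = plan["steps"]
    if not isinstance(steps, list) or not steps:
        raise ValueError("'steps' must be a non-empty list")
    for index, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise ValueError(f"step {index} must be a JSON object")
        missing = REQUIRED_STEP_KEYS - set(step)
        if missing:
            raise ValueError(f"step {index} is missing {sorted(missing)}")
    return plan


raw_plan = json.dumps({
    "task": "Create security report",
    "steps": [{
        "step": 1,
        "description": "Collect security logs",
        "assigned_team": "security_team",
        "tool_selection": ["fetch_logs"],
        "expected_output": "Recent security logs",
    }],
})
plan = validate_plan(raw_plan)
print(plan["steps"][0]["assigned_team"])  # security_team
```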

3. Supervisor Node

The supervisor routes between teams based on the execution plan.

from azcore.core.supervisor import Supervisor

supervisor = Supervisor(
    llm=llm,
    members=["research_team", "analysis_team", "writing_team"]
)

# Create supervisor node
supervisor_node = supervisor.create_node()

# Add to graph
orchestrator.add_node("supervisor", supervisor_node)

Supervisor Routing

# Supervisor decides which team to invoke
state = {
    "messages": [...],
    "full_plan": json_plan  # Plan from planner
}

result = supervisor_node(state)

# Routes to appropriate team
# result.goto = "research_team"
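
The supervisor makes this choice with the LLM. As a deterministic illustration of routing from a plan, the sketch below picks the team assigned to the first unfinished step. `next_team` and the `completed` set are hypothetical and not part of the Supervisor API.

```python
from typing import Any, Dict, List, Optional, Set


def next_team(plan: Dict[str, Any], completed: Set[int],
              members: List[str]) -> Optional[str]:
    """Return the team for the first unfinished step, or None when done."""
    for step in plan["steps"]:
        if step["step"] in completed:
            continue
        team = step["assigned_team"]
        if team not in members:
            # Guard against plans that name a team the supervisor does not know
            raise ValueError(f"unknown team: {team}")
        return team
    return None


plan = {"steps": [
    {"step": 1, "assigned_team": "research_team"},
    {"step": 2, "assigned_team": "writing_team"},
]}
members = ["research_team", "analysis_team", "writing_team"]

print(next_team(plan, set(), members))   # research_team
print(next_team(plan, {1}, members))     # writing_team
print(next_team(plan, {1, 2}, members))  # None
```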

4. GeneratorNode

The generator formats final responses for users.

from azcore.nodes.generator import GeneratorNode

generator = GeneratorNode(
    llm=llm,
    system_prompt="""You are a response generator.

    Format team outputs into clear, user-friendly responses.

    Guidelines:
    - Synthesize information from all teams
    - Present results clearly
    - Be concise but complete
    """
)

# Add to graph
orchestrator.add_node("response_generator", generator)

🔧 Creating Custom Nodes

Basic Custom Node

from typing import Any, Dict

from azcore.core.base import BaseNode
from langgraph.types import Command
from langchain_core.messages import HumanMessage

class CustomProcessorNode(BaseNode):
    """Custom node that processes data."""

    def __init__(self, name: str = "processor"):
        super().__init__(name=name, description="Processes data")
        # MyDataProcessor is a placeholder for your own processing class
        self.processor = MyDataProcessor()

    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute processing logic."""

        # Extract data
        messages = state["messages"]
        last_message = messages[-1].content

        # Process
        result = self.processor.process(last_message)

        # Return Command
        return Command(
            update={
                "messages": [
                    HumanMessage(content=result, name=self.name)
                ],
                "metadata": {"processed": True}
            },
            goto="next_node"
        )

Node with Conditional Routing

class ConditionalNode(BaseNode):
    """Node with conditional routing."""

    def __init__(self, llm, name: str = "conditional"):
        super().__init__(name=name, description="Routes conditionally")
        self.llm = llm

    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute with conditional routing."""

        messages = state["messages"]
        last_content = messages[-1].content

        # Analyze and decide
        if "urgent" in last_content.lower():
            next_node = "priority_handler"
            update = {"metadata": {"priority": "high"}}
        elif "error" in last_content.lower():
            next_node = "error_handler"
            update = {"metadata": {"error": True}}
        else:
            next_node = "normal_processor"
            update = {"metadata": {"priority": "normal"}}

        return Command(update=update, goto=next_node)

Node with Error Handling

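import logging
from typing import Any, Dict

from langchain_core.messages import HumanMessage

from azcore.exceptions import NodeExecutionError
from azcore.utils.retry import retry_with_timeout

class RobustNode(BaseNode):
    """Node with error handling and retries."""

    def __init__(self, llm, name: str = "robust"):
        super().__init__(name=name, description="Robust node with retries")
        self.llm = llm
        # Logger used by the error handler in execute()
        self._logger = logging.getLogger(self.__class__.__name__)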

    @retry_with_timeout(max_retries=3, timeout=30.0)
    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute with error handling."""

        try:
            # Validate state
            if not state or "messages" not in state:
                raise NodeExecutionError(
                    "Invalid state structure",
                    details={"node": self.name}
                )

            # Process
            result = self._process(state)

            return Command(
                update={"messages": [result]},
                goto="next_node"
            )

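        except NodeExecutionError:
            # Already the right type; re-raise without wrapping it again
            raise
        except Exception as e:
            self._logger.error(f"Node execution failed: {e}")
            raise NodeExecutionError(
                f"Node {self.name} failed",
                details={"error": str(e)}
            ) from e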

    def _process(self, state: Dict[str, Any]) -> HumanMessage:
        """Internal processing logic."""
        # Replace with your own implementation
        raise NotImplementedError
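
For intuition about what a retry decorator does around `execute`, here is a minimal sketch of the retry half only. It is not the implementation of `retry_with_timeout`: the timeout is omitted, `retry` and `flaky` are hypothetical names, and treating `max_retries` as the total number of attempts is an assumption.

```python
import functools
import time
from typing import Any, Callable


def retry(max_retries: int = 3, delay: float = 0.0) -> Callable:
    """Call the wrapped function up to `max_retries` times before giving up."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay)
        return wrapper
    return decorator


calls = {"count": 0}


@retry(max_retries=3)
def flaky() -> str:
    """Fail twice, then succeed, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = flaky()
print(result, calls["count"])  # ok 3
```

A decorator like this retries every exception, including ones that will never succeed on a second attempt, such as an invalid state. Restricting retries to transient error types is usually worth considering.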

🔄 Node Composition Patterns

Sequential Node Chain

from langgraph.graph import END

# Create nodes (CustomNode1..3 stand in for your own BaseNode subclasses)
node1 = CustomNode1("node1")
node2 = CustomNode2("node2")
node3 = CustomNode3("node3")

# Build sequential graph
orchestrator = GraphOrchestrator()
orchestrator.add_node("node1", node1)
orchestrator.add_node("node2", node2)
orchestrator.add_node("node3", node3)

# Chain nodes
orchestrator.set_entry_point("node1")
orchestrator.add_edge("node1", "node2")
orchestrator.add_edge("node2", "node3")
orchestrator.add_edge("node3", END)

graph = orchestrator.compile()

Parallel Node Execution

class AggregatorNode(BaseNode):
    """Node that aggregates results from parallel nodes."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Aggregate results."""

        # Collect results from context
        results = state.get("context", {}).get("parallel_results", [])

        # Aggregate
        combined = self._combine_results(results)

        return Command(
            update={
                "messages": [HumanMessage(content=combined, name=self.name)]
            },
            goto=END
        )

# Setup parallel execution
orchestrator.add_node("parallel1", parallel_node1)
orchestrator.add_node("parallel2", parallel_node2)
orchestrator.add_node("aggregator", aggregator_node)

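# Both parallel nodes feed the aggregator (fan-in). The fan-out edges from
# an upstream node to parallel1 and parallel2 are not shown here.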
orchestrator.add_edge("parallel1", "aggregator")
orchestrator.add_edge("parallel2", "aggregator")
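
The fan-out/fan-in idea can be shown without a graph at all. The sketch below runs two branch functions concurrently with the standard library and then combines their results, which is the role `AggregatorNode` plays above. It does not reflect how the orchestrator schedules nodes; `run_parallel`, `combine`, and the two branch functions are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def count_words(state: Dict[str, Any]) -> str:
    return f"words={len(state['text'].split())}"


def count_chars(state: Dict[str, Any]) -> str:
    return f"chars={len(state['text'])}"


def run_parallel(branches: List[Callable[[Dict[str, Any]], str]],
                 state: Dict[str, Any]) -> List[str]:
    """Fan out: run every branch on the same state, keep submission order."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = [pool.submit(branch, state) for branch in branches]
        return [future.result() for future in futures]


def combine(results: List[str]) -> str:
    """Fan in: merge the branch outputs into one message."""
    return "; ".join(results)


state = {"text": "nodes are building blocks"}
parallel_results = run_parallel([count_words, count_chars], state)
print(combine(parallel_results))  # words=4; chars=25
```

Collecting results in submission order keeps the aggregated output deterministic even when branches finish at different times.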

Conditional Node Branching

class RouterNode(BaseNode):
    """Routes to different branches based on conditions."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Route based on state."""

        message = state["messages"][-1].content

        # Determine route
        if self._is_query(message):
            goto = "query_handler"
        elif self._is_command(message):
            goto = "command_executor"
        elif self._is_error(message):
            goto = "error_handler"
        else:
            goto = "default_handler"

        return Command(
            update={"metadata": {"route": goto}},
            goto=goto
        )

# Add branches to graph
orchestrator.add_node("router", router_node)
orchestrator.add_node("query_handler", query_handler)
orchestrator.add_node("command_executor", command_executor)
orchestrator.add_node("error_handler", error_handler)
orchestrator.add_node("default_handler", default_handler)

📊 Node State Management

Accessing State in Nodes

class StateAwareNode(BaseNode):
    """Node that properly accesses state."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute with proper state access."""

        # Access messages
        messages = state.get("messages", [])

        # Access context safely
        context = state.get("context", {})
        user_id = context.get("user_id", "unknown")

        # Access metadata
        metadata = state.get("metadata", {})
        session_id = metadata.get("session_id")

        # Access RL metadata
        rl_metadata = state.get("rl_metadata", {})
        selected_tools = rl_metadata.get("selected_tools", [])

        # Process with state data
        result = self._process(messages, user_id, selected_tools)

        return Command(update={"messages": [result]}, goto="next")

Updating State from Nodes

import time

class StateUpdaterNode(BaseNode):
    """Node that updates various state fields."""

    def execute(self, state: Dict[str, Any]) -> Command:
        """Update multiple state fields."""

        result = self._process(state)

        # Update multiple fields
        return Command(
            update={
                "messages": [
                    HumanMessage(content=result, name=self.name)
                ],
                "context": {
                    **state.get("context", {}),
                    "last_processor": self.name,
                    "processed_at": time.time()
                },
                "metadata": {
                    **state.get("metadata", {}),
                    "nodes_executed": (
                        state.get("metadata", {}).get("nodes_executed", [])
                        + [self.name]
                    )
                }
            },
            goto="next_node"
        )
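
The `**state.get(...)` spreads above matter whenever an update replaces a top-level key wholesale. The sketch below demonstrates the difference with plain dictionaries. It assumes replace-on-update semantics for `metadata`; whether Azcore's state applies a merging reducer to that key is not stated on this page. `apply_update` is a hypothetical helper.

```python
from typing import Any, Dict


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Replace each top-level key in `state` with the value from `update`."""
    return {**state, **update}


state = {"metadata": {"session_id": "abc", "nodes_executed": ["validator"]}}

# Without the spread, the previous metadata keys are lost
overwritten = apply_update(state, {"metadata": {"analyzed": True}})

# With the spread, existing keys are carried over and the new one is added
merged = apply_update(
    state, {"metadata": {**state["metadata"], "analyzed": True}}
)

print(overwritten["metadata"])  # {'analyzed': True}
print(merged["metadata"])
# {'session_id': 'abc', 'nodes_executed': ['validator'], 'analyzed': True}
```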

🎯 Node Best Practices

1. Single Responsibility

# ✅ GOOD: Focused node
class ValidationNode(BaseNode):
    """Validates input data only."""

    def execute(self, state: Dict[str, Any]) -> Command:
        # Only validates
        is_valid = self._validate(state)
        return Command(
            update={"metadata": {"valid": is_valid}},
            goto="processor" if is_valid else "error"
        )

# ❌ BAD: Too many responsibilities
class DoEverythingNode(BaseNode):
    """Does validation, processing, formatting, and routing."""
    # Too complex!

2. Clear Naming

# ✅ GOOD: Descriptive names
class UserInputValidatorNode(BaseNode): pass
class DataTransformationNode(BaseNode): pass
class ResponseFormatterNode(BaseNode): pass

# ❌ BAD: Vague names
class Node1(BaseNode): pass
class ProcessorNode(BaseNode): pass  # Too generic

3. Error Handling

# ✅ GOOD: Proper error handling
class SafeNode(BaseNode):
    def execute(self, state: Dict[str, Any]) -> Command:
        try:
            result = self._process(state)
            return Command(update={"messages": [result]}, goto="next")
        except ValueError as e:
            self._logger.error(f"Validation error: {e}")
            return Command(
                update={"metadata": {"error": str(e)}},
                goto="error_handler"
            )
        except Exception as e:
            self._logger.error(f"Unexpected error: {e}")
            # Chain the original exception so the traceback is preserved
            raise NodeExecutionError(f"Node failed: {e}") from e

4. State Validation

# ✅ GOOD: Validate state structure
class ValidatingNode(BaseNode):
    def execute(self, state: Dict[str, Any]) -> Command:
        # Validate required fields
        required = ["messages", "context"]
        for field in required:
            if field not in state:
                raise ValidationError(f"Missing field: {field}")

        # Proceed with processing
        return Command(update={...}, goto="next")

🚀 Complete Node Example

import logging
from typing import Any, Dict

from azcore.core.base import BaseNode
from langgraph.types import Command
from langchain_core.messages import HumanMessage
from azcore.exceptions import NodeExecutionError, ValidationError
from azcore.utils.retry import retry_with_timeout

class DataAnalysisNode(BaseNode):
    """Node that analyzes data and routes based on results."""

    def __init__(
        self,
        llm,
        analyzer,
        name: str = "data_analyzer"
    ):
        super().__init__(
            name=name,
            description="Analyzes data and determines next steps"
        )
        self.llm = llm
        self.analyzer = analyzer
        self._logger = logging.getLogger(self.__class__.__name__)

    @retry_with_timeout(max_retries=3, timeout=30.0)
    def execute(self, state: Dict[str, Any]) -> Command:
        """Execute analysis with error handling."""

        try:
            # Validate state
            self._validate_state(state)

            # Extract data
            messages = state["messages"]
            data = messages[-1].content

            # Analyze
            self._logger.info(f"Analyzing data: {data[:50]}...")
            analysis_result = self.analyzer.analyze(data)

            # Determine routing
            if analysis_result["confidence"] > 0.8:
                goto = "high_confidence_processor"
            elif analysis_result["requires_human"]:
                goto = "human_review"
            else:
                goto = "standard_processor"

            # Format result
            result_message = self._format_result(analysis_result)

            return Command(
                update={
                    "messages": [
                        HumanMessage(content=result_message, name=self.name)
                    ],
                    "context": {
                        **state.get("context", {}),
                        "analysis": analysis_result
                    },
                    "metadata": {
                        **state.get("metadata", {}),
                        "analyzed": True,
                        "confidence": analysis_result["confidence"]
                    }
                },
                goto=goto
            )

        except ValidationError:
            raise
        except Exception as e:
            self._logger.error(f"Analysis failed: {e}")
            raise NodeExecutionError(
                f"Data analysis node failed: {str(e)}",
                details={"node": self.name, "error": str(e)}
            )

    def _validate_state(self, state: Dict[str, Any]) -> None:
        """Validate state structure."""
        if not state or "messages" not in state:
            raise ValidationError(
                "Invalid state: missing 'messages'",
                details={"state_keys": list(state.keys()) if state else []}
            )

        if not state["messages"]:
            raise ValidationError("State has no messages")

    def _format_result(self, analysis: Dict[str, Any]) -> str:
        """Format analysis result for output."""
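        # Build the message line by line so the output carries no
        # stray indentation from the source file
        return (
            "Analysis Complete:\n"
            f"- Confidence: {analysis['confidence']:.2f}\n"
            f"- Category: {analysis['category']}\n"
            f"- Recommendation: {analysis['recommendation']}\n"
        )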

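The routing branch in `execute` is easy to check in isolation once it is pulled into a pure function. The sketch below copies the three conditions from the example above into a hypothetical `choose_route` helper and exercises the boundary cases. Extracting the function is a suggestion, not something the example itself does.

```python
from typing import Any, Dict


def choose_route(analysis: Dict[str, Any]) -> str:
    """Mirror the routing conditions used in DataAnalysisNode.execute."""
    if analysis["confidence"] > 0.8:
        return "high_confidence_processor"
    if analysis["requires_human"]:
        return "human_review"
    return "standard_processor"


# High confidence wins even when human review is requested
print(choose_route({"confidence": 0.95, "requires_human": True}))
# high_confidence_processor

# Exactly 0.8 is not "greater than 0.8", so this falls through
print(choose_route({"confidence": 0.8, "requires_human": True}))
# human_review

print(choose_route({"confidence": 0.5, "requires_human": False}))
# standard_processor
```

Note the precedence: because the confidence check comes first, a high-confidence result never reaches human review. If that is not the intended behavior, swap the first two conditions.
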
🎓 Summary

Azcore nodes provide:

  • BaseNode: Abstract foundation for all nodes
  • Built-in Nodes: Coordinator, Planner, Supervisor, Generator
  • Custom Nodes: Easy to create domain-specific nodes
  • Command Pattern: Consistent state updates and routing
  • Error Handling: Built-in retry and error recovery
  • Composability: Nodes combine into complex workflows

Use nodes as modular building blocks to create sophisticated multi-agent workflows with clear separation of concerns and robust error handling.
