State Management

Understanding state management and data flow in Azcore.

Azcore's state management builds on LangGraph's MessagesState and adds fields for workflow coordination, planning, and reinforcement learning.

🔄 Core State Architecture

State Class

Every node in an Azcore workflow reads from and writes to a State object. The State class extends LangGraph's MessagesState with the following fields:

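from typing import Any, Dict

from langgraph.graph import MessagesState

# Importable as: from azcore.core.state import State
# The definition is reproduced here for reference.
class State(MessagesState):
    """
    Enhanced state class for Azcore.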

    Attributes:
        messages: List of messages in the conversation
        next: Next node to visit in the graph
        full_plan: Complete execution plan from planner
        context: Additional context data
        metadata: Metadata for tracking and logging
        rl_metadata: RL-specific metadata (state keys, selected tools, rewards)
    """

    next: str = ""
    full_plan: str = ""
    context: Dict[str, Any] = {}
    metadata: Dict[str, Any] = {}
    rl_metadata: Dict[str, Any] = {}

State Fields

| Field | Type | Purpose |
|---|---|---|
| messages | List[AnyMessage] | Conversation history and agent communications |
| next | str | Next node to execute in the workflow |
| full_plan | str | Execution plan from the planner (typically JSON) |
| context | Dict[str, Any] | Custom context data for teams and agents |
| metadata | Dict[str, Any] | Tracking and logging metadata |
| rl_metadata | Dict[str, Any] | Reinforcement learning state keys, tool selections, rewards |
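
To make the shape in the table concrete, here is a minimal sketch using only the standard library. `SketchState` and `make_state` are hypothetical stand-ins written for this illustration, not part of Azcore; the sketch assumes only the field names and types listed above. It also shows one way to give every state its own fresh dictionaries so that two states never share a `context`.

```python
from typing import Any, Dict, List, TypedDict


class SketchState(TypedDict):
    """Stand-in mirroring the documented fields; not the real azcore State."""

    messages: List[Any]
    next: str
    full_plan: str
    context: Dict[str, Any]
    metadata: Dict[str, Any]
    rl_metadata: Dict[str, Any]


def make_state(messages: List[Any]) -> SketchState:
    """Build a state with fresh, unshared containers for every field."""
    return SketchState(
        messages=list(messages),
        next="",
        full_plan="",
        context={},
        metadata={},
        rl_metadata={},
    )


first = make_state(["hello"])
second = make_state([])

# Writing to one state's context does not leak into the other.
first["context"]["user_id"] = "123"
print(second["context"])  # prints {}
```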

🛠️ StateManager

The StateManager class provides utilities for state manipulation, validation, and tracking:

from langchain_core.messages import HumanMessage

from azcore.core.state import StateManager

# Initialize manager
state_manager = StateManager()

# Create initial state
state = state_manager.create_initial_state(
    messages=[HumanMessage(content="Hello")],
    context={"user_id": "123"},
    metadata={"session_id": "abc"}
)

StateManager Methods

Creating and Updating State

# Create initial state
state = state_manager.create_initial_state(
    messages=[HumanMessage(content="User query")],
    context={"key": "value"},
    metadata={"timestamp": "2024-01-01"}
)

# Update state
updated_state = state_manager.update_state(
    state,
    {"next": "planner", "metadata": {"step": "planning"}}
)

Context Management

# Add context value
state = state_manager.add_context(state, "user_role", "admin")

# Get context value
user_role = state_manager.get_context(state, "user_role", default="user")
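
This page does not say whether `add_context` copies the state or modifies it in place. The sketch below shows what a copy-on-write version of the two helpers could look like on plain dictionaries. The function bodies are assumptions made for illustration and are not StateManager's actual implementation.

```python
from typing import Any, Dict


def add_context(state: Dict[str, Any], key: str, value: Any) -> Dict[str, Any]:
    """Return a copy of state whose context has key set; the input is untouched."""
    new_context = {**state.get("context", {}), key: value}
    return {**state, "context": new_context}


def get_context(state: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Read a context value, falling back to default if the key or context is absent."""
    return state.get("context", {}).get(key, default)


original = {"messages": [], "context": {"user_id": "123"}}
updated = add_context(original, "user_role", "admin")
```

Because the helper returns a new dictionary, `original` can still be inspected or reused after the call, which tends to make node logic easier to test.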

Metadata Management

# Add metadata
state = state_manager.add_metadata(state, "execution_time", 1.5)

# For RL metadata
state = state_manager.add_rl_metadata(state, "state_key", "query_embedding_123")

# Get RL metadata
state_key = state_manager.get_rl_metadata(state, "state_key")

State Validation

# Validate state structure
is_valid = state_manager.validate_state(state)

if not is_valid:
    # Handle invalid state
    state = state_manager.clear_state(state)
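
The checks that `validate_state` performs are not listed here. As a rough sketch of what structural validation might involve, the hypothetical `find_state_problems` function below treats `messages` as required and checks the type of every documented field that is present. Both choices are assumptions for illustration.

```python
from typing import Any, List

# Expected Python type for each documented field.
EXPECTED_TYPES = {
    "messages": list,
    "next": str,
    "full_plan": str,
    "context": dict,
    "metadata": dict,
    "rl_metadata": dict,
}


def find_state_problems(state: Any) -> List[str]:
    """Return a list of human-readable problems; an empty list means the state looks valid."""
    if not isinstance(state, dict):
        return ["state is not a dict"]

    problems = []
    if "messages" not in state:
        problems.append("missing 'messages'")

    # Only fields that are present get a type check.
    for key, expected in EXPECTED_TYPES.items():
        if key in state and not isinstance(state[key], expected):
            actual = type(state[key]).__name__
            problems.append(f"'{key}' should be {expected.__name__}, got {actual}")
    return problems
```

Returning the list of problems rather than a bare boolean makes it easier to log why a state was rejected.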

Message History

# Get all messages
messages = state_manager.get_message_history(state)

# Get last message only
last_msg = state_manager.get_last_message(state)

📊 State Flow Through the System

Hierarchical Workflow State Flow

1. User Query → Initial State Created
   state = {
       "messages": [HumanMessage(content="query")],
       "next": "",
       "context": {},
       "metadata": {},
       "rl_metadata": {}
   }

2. Coordinator → Updates State
   state["messages"].append(AIMessage(...))
   state["next"] = "planner"

3. Planner → Adds Plan
   state["full_plan"] = json_plan
   state["next"] = "supervisor"

4. Supervisor → Routes to Teams
   state["next"] = "team_name"

5. Teams → Add Results
   state["messages"].append(team_response)

6. Generator → Formats Final Response
   state["messages"].append(final_response)
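
The six steps above can be simulated with plain functions. This is a toy model rather than Azcore's runtime: each node returns a partial update plus the name of the next node, and a small loop applies the update and follows `next`. The node bodies, the `done` list kept in `context`, and the assumption that a team hands control back to the supervisor are all invented for this sketch.

```python
import json
from typing import Any, Callable, Dict, List, Tuple

StateDict = Dict[str, Any]
NodeResult = Tuple[StateDict, str]  # (partial update, name of the next node)

END = "__end__"


def coordinator(state: StateDict) -> NodeResult:
    return {"messages": ["coordinator: handing off to planner"]}, "planner"


def planner(state: StateDict) -> NodeResult:
    plan = json.dumps({"steps": [{"team": "research_team"}]})
    return {"full_plan": plan}, "supervisor"


def supervisor(state: StateDict) -> NodeResult:
    # Route to the first planned team that has not reported yet, else finish.
    done = state["context"].get("done", [])
    for step in json.loads(state["full_plan"])["steps"]:
        if step["team"] not in done:
            return {}, step["team"]
    return {}, "generator"


def research_team(state: StateDict) -> NodeResult:
    done = [*state["context"].get("done", []), "research_team"]
    update = {
        "messages": ["research_team: findings"],
        "context": {**state["context"], "done": done},
    }
    return update, "supervisor"


def generator(state: StateDict) -> NodeResult:
    return {"messages": ["generator: final answer"]}, END


NODES: Dict[str, Callable[[StateDict], NodeResult]] = {
    "coordinator": coordinator,
    "planner": planner,
    "supervisor": supervisor,
    "research_team": research_team,
    "generator": generator,
}


def run(query: str) -> Tuple[StateDict, List[str]]:
    state: StateDict = {
        "messages": [query], "next": "coordinator", "full_plan": "",
        "context": {}, "metadata": {}, "rl_metadata": {},
    }
    visited: List[str] = []
    while state["next"] != END:
        name = state["next"]
        visited.append(name)
        update, goto = NODES[name](state)
        # Messages are appended; every other key in the update overwrites.
        merged_messages = state["messages"] + update.pop("messages", [])
        state = {**state, **update, "messages": merged_messages, "next": goto}
    return state, visited


final_state, visited = run("query")
```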

State Updates via Command

Nodes update state by returning LangGraph's Command object, which carries both the state update and the name of the next node to visit:

from langgraph.types import Command
from langchain_core.messages import HumanMessage

from azcore.core.state import State

def my_node(state: State) -> Command:
    """Node that updates state."""

    # process() is a placeholder for this node's own logic; it should return a string
    result = process(state)

    # Return Command with updates
    return Command(
        update={
            "messages": [HumanMessage(content=result, name="my_node")],
            "metadata": {"processed": True}
        },
        goto="next_node"
    )
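
How an update is merged matters when choosing what to return. In LangGraph, the `messages` key of MessagesState uses a reducer that appends new messages, while a key with no reducer is overwritten by the update. The sketch below models that behaviour on plain dictionaries. It assumes that `metadata` has no reducer, as the State definition shown earlier suggests, and `apply_update` is a hypothetical helper, not a LangGraph or Azcore function.

```python
from typing import Any, Dict


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a node's update: append to messages, overwrite every other key."""
    merged = {**state, **update}
    if "messages" in update:
        merged["messages"] = [*state.get("messages", []), *update["messages"]]
    return merged


state = {"messages": ["hi"], "metadata": {"session_id": "abc"}}

# Returning a fresh metadata dict drops the keys that were there before.
overwritten = apply_update(state, {"messages": ["done"], "metadata": {"processed": True}})

# Spreading the existing metadata into the update keeps them.
preserved = apply_update(
    state,
    {"messages": ["done"], "metadata": {**state["metadata"], "processed": True}},
)
```

Under these assumptions, a node that wants to add one metadata key should spread the existing dictionary into its update, the same pattern the State Accumulation example uses for `context`.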

🎯 State Best Practices

1. Validate State at Boundaries

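from langgraph.types import Command

from azcore.core.state import State
from azcore.exceptions import ValidationError

def safe_node(state: State) -> Command:
    """Node with state validation."""

    # Validate required fields
    if not state or "messages" not in state:
        raise ValidationError(
            "Invalid state: missing 'messages' key",
            # Guard against state being None or empty before listing its keys
            details={"state_keys": list(state.keys()) if state else []}
        )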

    # Process...
    return Command(update={"messages": [...]}, goto="next")

2. Use Context for Shared Data

# Store shared configuration in context
state["context"]["max_iterations"] = 5
state["context"]["user_preferences"] = {"theme": "dark"}

# Access in agents/teams
max_iter = state.get("context", {}).get("max_iterations", 10)

3. Track Execution with Metadata

import time

# Track execution metadata
state["metadata"]["start_time"] = time.time()
state["metadata"]["nodes_executed"] = ["coordinator", "planner"]
state["metadata"]["current_step"] = "planning"
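
When metadata is updated from inside a node, it can be convenient to build the new dictionary in one place. The `record_step` helper below is a hypothetical sketch. The `nodes_executed` and `current_step` keys come from the example above, while `total_seconds` is an invented key used to accumulate elapsed time.

```python
from typing import Any, Dict


def record_step(metadata: Dict[str, Any], node: str, elapsed: float) -> Dict[str, Any]:
    """Return new metadata with node appended to the trail and elapsed time added."""
    executed = [*metadata.get("nodes_executed", []), node]
    total = metadata.get("total_seconds", 0.0) + elapsed
    return {
        **metadata,
        "nodes_executed": executed,
        "current_step": node,
        "total_seconds": total,
    }


after_coordinator = record_step({}, "coordinator", 0.5)
after_planner = record_step(after_coordinator, "planner", 0.25)
```

A node could measure `elapsed` with `time.perf_counter()` around its own work and return the result as its `metadata` update.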

4. RL Metadata for Learning

# Store RL-specific information
state["rl_metadata"]["state_key"] = embedding_key
state["rl_metadata"]["selected_tools"] = ["tool1", "tool2"]
state["rl_metadata"]["query"] = original_query

🔍 State Debugging

Inspecting State

# Print state structure
print(f"Messages: {len(state['messages'])}")
print(f"Next: {state.get('next', 'None')}")
print(f"Context: {state.get('context', {})}")
print(f"Metadata: {state.get('metadata', {})}")

# Get last message content
if state["messages"]:
    last_msg = state["messages"][-1]
    print(f"Last message: {last_msg.content[:100]}...")

State Validation Errors

from azcore.core.state import StateManager
from azcore.exceptions import StateError

state_manager = StateManager()

try:
    # Validate state
    if not state_manager.validate_state(state):
        raise StateError("State validation failed")
except StateError as e:
    print(f"State error: {e.message}")
    print(f"Details: {e.details}")

📈 Advanced State Patterns

State Accumulation

def accumulator_node(state: State) -> Command:
    """Node that accumulates results."""

    # Copy the existing results instead of appending to the list held in state
    context = state.get("context", {})
    new_result = {"team": "security", "status": "complete"}
    results = [*context.get("results", []), new_result]

    # Update state, preserving the other context keys
    return Command(
        update={"context": {**context, "results": results}},
        goto="next_node"
    )

Conditional State Updates

def conditional_node(state: State) -> Command:
    """Node with conditional updates."""

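    messages = state["messages"]
    # Treat an empty history or non-text content as "no error"
    last_content = messages[-1].content if messages else ""
    last_text = last_content if isinstance(last_content, str) else ""

    # Keep existing metadata; assuming no reducer is defined for this key,
    # the update replaces the whole dict
    metadata = state.get("metadata", {})

    # Decide based on state
    if "error" in last_text.lower():
        goto = "error_handler"
        update = {"metadata": {**metadata, "error": True}}
    else:
        goto = "next_node"
        update = {"metadata": {**metadata, "success": True}}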

    return Command(update=update, goto=goto)

State Checkpointing

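from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# Enable state persistence (in memory only; checkpoints are lost when the process exits)
checkpointer = MemorySaver()

# Build graph with checkpointing
# graph_builder is assumed to be a StateGraph whose nodes and edges are already defined
graph = graph_builder.compile(checkpointer=checkpointer)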

# Invoke with thread ID for persistence
result = graph.invoke(
    initial_state,
    config={"configurable": {"thread_id": "conversation_123"}}
)

# Resume from checkpoint
continued = graph.invoke(
    {"messages": [HumanMessage(content="Continue...")]},
    config={"configurable": {"thread_id": "conversation_123"}}
)
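
Conceptually, the thread ID acts as a key into saved state: a second invocation with the same ID starts from what the first one left behind, and a different ID starts fresh. The toy model below illustrates that idea with the standard library only. `InMemoryCheckpoints` and `invoke` are hypothetical names for this sketch and do not reflect how MemorySaver is implemented.

```python
import copy
from typing import Any, Dict, List


class InMemoryCheckpoints:
    """Toy checkpoint store keyed by thread ID."""

    def __init__(self) -> None:
        self._saved: Dict[str, Dict[str, Any]] = {}

    def load(self, thread_id: str) -> Dict[str, Any]:
        # Unknown threads start with an empty history.
        return copy.deepcopy(self._saved.get(thread_id, {"messages": []}))

    def save(self, thread_id: str, state: Dict[str, Any]) -> None:
        self._saved[thread_id] = copy.deepcopy(state)


def invoke(store: InMemoryCheckpoints, thread_id: str, new_messages: List[str]) -> Dict[str, Any]:
    """Load the thread's state, add the input, append a fake reply, and save."""
    state = store.load(thread_id)
    state["messages"].extend(new_messages)
    state["messages"].append(f"reply #{len(state['messages'])}")
    store.save(thread_id, state)
    return state


store = InMemoryCheckpoints()
first = invoke(store, "conversation_123", ["Hello"])
second = invoke(store, "conversation_123", ["Continue..."])
other = invoke(store, "another_thread", ["Hi"])
```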

🚨 Common State Issues

Issue 1: State Mutation

# ❌ BAD: Direct mutation
def bad_node(state: State) -> Command:
    state["messages"].append(new_msg)  # Mutates original
    return Command(update={}, goto="next")

# ✅ GOOD: Return updates
def good_node(state: State) -> Command:
    return Command(
        update={"messages": [new_msg]},  # Proper update
        goto="next"
    )

Issue 2: Missing State Fields

# ❌ BAD: Assume field exists
def bad_node(state: State) -> Command:
    user_id = state["context"]["user_id"]  # May not exist!

# ✅ GOOD: Safe access with defaults
def good_node(state: State) -> Command:
    user_id = state.get("context", {}).get("user_id", "default")

Issue 3: State Size Growth

# ❌ BAD: Unbounded accumulation
def bad_node(state: State) -> Command:
    # Messages grow indefinitely
    return Command(update={"messages": [...]}, goto="next")

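# ✅ GOOD: Limit state size
from langchain_core.messages import RemoveMessage

MAX_MESSAGES = 50

def good_node(state: State) -> Command:
    messages = state["messages"]
    # MessagesState appends whatever a node returns, so trimming means
    # returning RemoveMessage entries for the messages to drop
    stale = messages[:-MAX_MESSAGES] if len(messages) > MAX_MESSAGES else []

    return Command(
        update={
            # Any new messages for this step can be added to this list as well
            "messages": [RemoveMessage(id=m.id) for m in stale],
            "context": {**state.get("context", {}), "trimmed": bool(stale)},
        },
        goto="next"
    )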

🎓 Summary

Azcore's state management system provides:

  • Structured State: Extended MessagesState with workflow fields
  • StateManager: Utilities for safe state manipulation
  • Context & Metadata: Flexible data storage for custom needs
  • RL Integration: Built-in support for reinforcement learning metadata
  • Validation: Tools to ensure state integrity
  • Checkpointing: Conversation persistence and resumption

Use the StateManager for safe state operations, leverage context for shared data, and always validate state at node boundaries for robust workflows.
