Azcore implements a robust state management system built on LangGraph's MessagesState, extended with additional fields for workflow coordination, planning, and reinforcement learning.
🔄 Core State Architecture
State Class
The State class is the foundation of data flow in Azcore, extending LangGraph's MessagesState with additional capabilities:
from azcore.core.state import State
class State(MessagesState):
    """
    Enhanced state class for Azcore.

    Attributes:
        messages: List of messages in the conversation
        next: Next node to visit in the graph
        full_plan: Complete execution plan from planner
        context: Additional context data
        metadata: Metadata for tracking and logging
        rl_metadata: RL-specific metadata (state keys, selected tools, rewards)
    """

    next: str = ""
    full_plan: str = ""
    # NOTE(review): these `{}` literals are single objects created once at
    # class-definition time. Presumably new states get their own dicts via
    # StateManager.create_initial_state — confirm before relying on them.
    context: Dict[str, Any] = {}
    metadata: Dict[str, Any] = {}
    rl_metadata: Dict[str, Any] = {}
State Fields
| Field | Type | Purpose |
|---|---|---|
| messages | List[AnyMessage] | Conversation history and agent communications |
| next | str | Next node to execute in the workflow |
| full_plan | str | Execution plan from the planner (typically JSON) |
| context | Dict[str, Any] | Custom context data for teams and agents |
| metadata | Dict[str, Any] | Tracking and logging metadata |
| rl_metadata | Dict[str, Any] | Reinforcement learning state keys, tool selections, rewards |
🛠️ StateManager
The StateManager class provides utilities for state manipulation, validation, and tracking:
from azcore.core.state import StateManager
# Initialize manager
state_manager = StateManager()

# Create initial state, seeded with one user message plus context and
# metadata entries
state = state_manager.create_initial_state(
    messages=[HumanMessage(content="Hello")],
    context={"user_id": "123"},
    metadata={"session_id": "abc"}
)
StateManager Methods
Creating and Updating State
# Create initial state
state = state_manager.create_initial_state(
    messages=[HumanMessage(content="User query")],
    context={"key": "value"},
    metadata={"timestamp": "2024-01-01"}
)

# Update state: pass a dict of field changes and keep working with the
# returned state object
updated_state = state_manager.update_state(
    state,
    {"next": "planner", "metadata": {"step": "planning"}}
)
Context Management
# Add context value (the helper returns a state, which is rebound here)
state = state_manager.add_context(state, "user_role", "admin")

# Get context value; presumably `default` is returned when the key is
# absent — confirm against StateManager.get_context
user_role = state_manager.get_context(state, "user_role", default="user")
Metadata Management
# Add metadata (general tracking/logging entry)
state = state_manager.add_metadata(state, "execution_time", 1.5)

# For RL metadata (kept under the separate `rl_metadata` field)
state = state_manager.add_rl_metadata(state, "state_key", "query_embedding_123")

# Get RL metadata
state_key = state_manager.get_rl_metadata(state, "state_key")
State Validation
# Validate state structure
is_valid = state_manager.validate_state(state)

if not is_valid:
    # Handle invalid state by replacing it with whatever clear_state returns
    state = state_manager.clear_state(state)
Message History
# Get all messages
messages = state_manager.get_message_history(state)

# Get last message only
last_msg = state_manager.get_last_message(state)
📊 State Flow Through the System
Hierarchical Workflow State Flow
1. User Query → Initial State Created
state = {
    "messages": [HumanMessage(content="query")],
    "next": "",
    # Empty until the planner step fills it in
    "full_plan": "",
    "context": {},
    "metadata": {},
    "rl_metadata": {},
}
2. Coordinator → Updates State
state["messages"].append(AIMessage(...))
state["next"] = "planner"
3. Planner → Adds Plan
state["full_plan"] = json_plan
state["next"] = "supervisor"
4. Supervisor → Routes to Teams
state["next"] = "team_name"
5. Teams → Add Results
state["messages"].append(team_response)
6. Generator → Formats Final Response
state["messages"].append(final_response)
State Updates via Command
Nodes update state using LangGraph's Command object:
from langgraph.types import Command
from langchain_core.messages import HumanMessage
def my_node(state: State) -> Command:
    """Node that updates state.

    Runs ``process`` on the incoming state and returns a ``Command`` that
    publishes the result as a message, flags the state as processed, and
    routes to ``"next_node"``.
    """
    # Process state...
    result = process(state)

    # Return Command with updates
    return Command(
        update={
            "messages": [HumanMessage(content=result, name="my_node")],
            # `metadata` is a plain dict field on State, so spread the
            # existing entries to avoid replacing them with this one key.
            "metadata": {**state.get("metadata", {}), "processed": True},
        },
        goto="next_node",
    )
🎯 State Best Practices
1. Validate State at Boundaries
from azcore.exceptions import ValidationError
def safe_node(state: State) -> Command:
    """Node with state validation.

    Raises:
        ValidationError: If ``state`` is empty/None or has no "messages" key.
    """
    # Validate required fields
    if not state or "messages" not in state:
        raise ValidationError(
            "Invalid state: missing 'messages' key",
            # `state` may be None on this path, so only ask for its keys
            # when there is a mapping to ask.
            details={"state_keys": list(state.keys()) if state else []},
        )

    # Process...
    return Command(update={"messages": [...]}, goto="next")
2. Use Context for Shared Data
# Store shared configuration in context
# NOTE(review): these are in-place writes and assume state["context"]
# already exists; inside a graph node, prefer returning the change through
# Command(update=...) as described under "Issue 1: State Mutation".
state["context"]["max_iterations"] = 5
state["context"]["user_preferences"] = {"theme": "dark"}

# Access in agents/teams (falls back to 10 when context or the key is missing)
max_iter = state.get("context", {}).get("max_iterations", 10)
3. Track Execution with Metadata
import time

# Track execution metadata
state["metadata"]["start_time"] = time.time()
state["metadata"]["nodes_executed"] = ["coordinator", "planner"]
state["metadata"]["current_step"] = "planning"
4. RL Metadata for Learning
# Store RL-specific information
# (`embedding_key` and `original_query` are placeholders supplied by the caller)
state["rl_metadata"]["state_key"] = embedding_key
state["rl_metadata"]["selected_tools"] = ["tool1", "tool2"]
state["rl_metadata"]["query"] = original_query
🔍 State Debugging
Inspecting State
# Print state structure
print(f"Messages: {len(state['messages'])}")
print(f"Next: {state.get('next', 'None')}")
print(f"Context: {state.get('context', {})}")
print(f"Metadata: {state.get('metadata', {})}")

# Get last message content (first 100 characters), skipping empty histories
if state["messages"]:
    last_msg = state["messages"][-1]
    print(f"Last message: {last_msg.content[:100]}...")
State Validation Errors
from azcore.core.state import StateManager
from azcore.exceptions import StateError
state_manager = StateManager()

try:
    # Validate state; a falsy result is escalated to a StateError
    if not state_manager.validate_state(state):
        raise StateError("State validation failed")
except StateError as e:
    print(f"State error: {e.message}")
    print(f"Details: {e.details}")
📈 Advanced State Patterns
State Accumulation
def accumulator_node(state: State) -> Command:
    """Node that accumulates results.

    Appends one result record to ``context["results"]`` and routes to
    ``"next_node"``, without modifying the incoming state in place.
    """
    # Get existing results from context (tolerating a missing context)
    context = state.get("context", {})

    # Add new result; build a new list so the one held in `state` is not
    # mutated as a side effect
    new_result = {"team": "security", "status": "complete"}
    results = [*context.get("results", []), new_result]

    # Update state
    return Command(
        update={"context": {**context, "results": results}},
        goto="next_node",
    )
Conditional State Updates
def conditional_node(state: State) -> Command:
    """Node with conditional updates.

    Routes to ``"error_handler"`` when the latest message mentions "error"
    (case-insensitive), otherwise to ``"next_node"``, and records the outcome
    in ``metadata``.
    """
    messages = state["messages"]
    # An empty history has no last message; treat it as "no error".
    last_message = messages[-1].content if messages else ""
    if not isinstance(last_message, str):
        # Message content is not always plain text (it can be a list of
        # content blocks), so normalise before searching it.
        last_message = str(last_message)

    # Keep existing metadata entries instead of replacing the whole dict.
    metadata = state.get("metadata", {})

    # Decide based on state
    if "error" in last_message.lower():
        goto = "error_handler"
        update = {"metadata": {**metadata, "error": True}}
    else:
        goto = "next_node"
        update = {"metadata": {**metadata, "success": True}}

    return Command(update=update, goto=goto)
State Checkpointing
from langgraph.checkpoint.memory import MemorySaver
# Enable state persistence
checkpointer = MemorySaver()

# Build graph with checkpointing
graph = graph_builder.compile(checkpointer=checkpointer)

# Invoke with thread ID for persistence
result = graph.invoke(
    initial_state,
    config={"configurable": {"thread_id": "conversation_123"}}
)

# Resume from checkpoint: reuse the same thread_id and pass only the new message
continued = graph.invoke(
    {"messages": [HumanMessage(content="Continue...")]},
    config={"configurable": {"thread_id": "conversation_123"}}
)
🚨 Common State Issues
Issue 1: State Mutation
# ❌ BAD: Direct mutation
def bad_node(state: State) -> Command:
    """Anti-pattern: changes the incoming state and returns an empty update."""
    state["messages"].append(new_msg)  # Mutates original
    return Command(update={}, goto="next")
# ✅ GOOD: Return updates
def good_node(state: State) -> Command:
    """Leave the incoming state untouched and describe the change instead."""
    return Command(
        update={"messages": [new_msg]},  # Proper update
        goto="next",
    )
Issue 2: Missing State Fields
# ❌ BAD: Assume field exists
def bad_node(state: State) -> Command:
    """Anti-pattern: raises KeyError when either key is absent."""
    user_id = state["context"]["user_id"]  # May not exist!
# ✅ GOOD: Safe access with defaults
def good_node(state: State) -> Command:
    """Fall back to "default" when context or user_id is missing."""
    user_id = state.get("context", {}).get("user_id", "default")
Issue 3: State Size Growth
# ❌ BAD: Unbounded accumulation
def bad_node(state: State) -> Command:
    """Anti-pattern: every call adds messages and nothing is ever removed."""
    # Messages grow indefinitely
    return Command(update={"messages": [...]}, goto="next")
# ✅ GOOD: Limit state size
def good_node(state: State) -> Command:
    """Prune the history so only the most recent 50 messages remain.

    Returns a ``Command`` whose "messages" update removes everything older
    than the last 50 entries, marks the context as trimmed, and routes to
    ``"next"``.
    """
    from langchain_core.messages import RemoveMessage

    max_messages = 50
    messages = state["messages"]

    # Everything before the last `max_messages` entries; this slice is empty
    # when the history is already short enough.
    stale = messages[:-max_messages]

    return Command(
        update={
            # "messages" updates are merged into the existing history rather
            # than replacing it, so a local slice alone trims nothing.
            # Deletions are requested with RemoveMessage entries; add this
            # node's own new messages to the same list.
            "messages": [RemoveMessage(id=m.id) for m in stale],
            # Preserve existing context entries alongside the flag.
            "context": {**state.get("context", {}), "trimmed": True},
        },
        goto="next",
    )
🎓 Summary
Azcore's state management system provides:
- Structured State: Extended MessagesState with workflow fields
- StateManager: Utilities for safe state manipulation
- Context & Metadata: Flexible data storage for custom needs
- RL Integration: Built-in support for reinforcement learning metadata
- Validation: Tools to ensure state integrity
- Checkpointing: Conversation persistence and resumption
Use the StateManager for safe state operations, leverage context for shared data, and always validate state at node boundaries for robust workflows.