Build your own workflow patterns tailored to your specific needs. Extend Arc's workflow system with custom logic and orchestration.
Overview
While Arc provides many built-in patterns, you can create custom workflows for specialized use cases. This guide shows how to build custom workflows using LangGraph and Arc's agent system.
When to Build Custom Workflows
- Unique requirements: Built-in patterns don't fit your needs
- Complex orchestration: Sophisticated control flow needed
- Special integrations: Custom systems or protocols
- Domain-specific patterns: Industry-specific workflows
- Experimental patterns: Research new architectures
Basic Custom Workflow
from azcore import Agent
from langgraph.graph import StateGraph, END
from typing import TypedDict
# Define state schema
class WorkflowState(TypedDict):
task: str
messages: list
current_agent: str
result: str
# Create custom workflow
class CustomWorkflow:
def __init__(self, agents):
self.agents = {agent.agent_name: agent for agent in agents}
self.graph = self._build_graph()
def _build_graph(self):
# Create graph
workflow = StateGraph(WorkflowState)
# Add nodes
workflow.add_node("start", self.start_node)
workflow.add_node("process", self.process_node)
workflow.add_node("review", self.review_node)
workflow.add_node("finalize", self.finalize_node)
# Add edges
workflow.add_edge("start", "process")
workflow.add_edge("process", "review")
workflow.add_conditional_edges(
"review",
self.should_revise,
{
"revise": "process",
"approve": "finalize",
}
)
workflow.add_edge("finalize", END)
# Set entry point
workflow.set_entry_point("start")
return workflow.compile()
def start_node(self, state):
return state
def process_node(self, state):
agent = self.agents["processor"]
result = agent.run(state["task"])
state["messages"].append(result)
return state
def review_node(self, state):
agent = self.agents["reviewer"]
review = agent.run(f"Review: {state['messages'][-1]}")
state["messages"].append(review)
return state
def should_revise(self, state):
last_message = state["messages"][-1]
if "needs revision" in last_message.lower():
return "revise"
return "approve"
def finalize_node(self, state):
agent = self.agents["finalizer"]
result = agent.run(f"Finalize: {state['messages']}")
state["result"] = result
return state
def run(self, task):
initial_state = {
"task": task,
"messages": [],
"current_agent": "",
"result": "",
}
final_state = self.graph.invoke(initial_state)
return final_state["result"]
# Usage (assumes an LLM instance `llm` has already been created)
processor = Agent(agent_name="processor", system_prompt="Process tasks", llm=llm)
reviewer = Agent(agent_name="reviewer", system_prompt="Review outputs", llm=llm)
finalizer = Agent(agent_name="finalizer", system_prompt="Finalize results", llm=llm)
workflow = CustomWorkflow([processor, reviewer, finalizer])
result = workflow.run("Create a project plan")
Advanced Examples
Adaptive Workflow
Workflow that adapts based on task complexity:
from azcore import Agent
from langgraph.graph import StateGraph, END
class AdaptiveWorkflow:
"""Workflow that adjusts based on task complexity"""
def __init__(self, simple_agent, complex_agent, complexity_analyzer):
self.simple_agent = simple_agent
self.complex_agent = complex_agent
self.analyzer = complexity_analyzer
self.graph = self._build_graph()
def _build_graph(self):
workflow = StateGraph(dict)
# Nodes
workflow.add_node("analyze", self.analyze_complexity)
workflow.add_node("simple_path", self.simple_processing)
workflow.add_node("complex_path", self.complex_processing)
# Routing based on complexity
workflow.add_conditional_edges(
"analyze",
self.route_by_complexity,
{
"simple": "simple_path",
"complex": "complex_path",
}
)
workflow.add_edge("simple_path", END)
workflow.add_edge("complex_path", END)
workflow.set_entry_point("analyze")
return workflow.compile()
def analyze_complexity(self, state):
analysis = self.analyzer.run(f"Analyze complexity: {state['task']}")
state["complexity"] = "complex" if "complex" in analysis.lower() else "simple"
return state
def route_by_complexity(self, state):
return state["complexity"]
def simple_processing(self, state):
result = self.simple_agent.run(state["task"])
state["result"] = result
return state
def complex_processing(self, state):
# Multi-step processing for complex tasks
plan = self.complex_agent.run(f"Plan: {state['task']}")
execution = self.complex_agent.run(f"Execute: {plan}")
review = self.complex_agent.run(f"Review: {execution}")
state["result"] = review
return state
def run(self, task):
initial_state = {"task": task, "complexity": "", "result": ""}
final_state = self.graph.invoke(initial_state)
return final_state["result"]
Pipeline with Branching
Workflow with conditional branches:
class BranchingPipeline:
"""Pipeline that branches based on intermediate results"""
def __init__(self, agents):
self.agents = agents
self.graph = self._build_graph()
def _build_graph(self):
workflow = StateGraph(dict)
# Main pipeline
workflow.add_node("input_processing", self.process_input)
workflow.add_node("classification", self.classify_task)
# Branch A: Technical path
workflow.add_node("technical_analysis", self.technical_path)
workflow.add_node("technical_implementation", self.implement_technical)
# Branch B: Creative path
workflow.add_node("creative_brainstorm", self.creative_path)
workflow.add_node("creative_refinement", self.refine_creative)
# Convergence
workflow.add_node("synthesis", self.synthesize_results)
# Edges
workflow.add_edge("input_processing", "classification")
workflow.add_conditional_edges(
"classification",
self.route_by_type,
{
"technical": "technical_analysis",
"creative": "creative_brainstorm",
}
)
workflow.add_edge("technical_analysis", "technical_implementation")
workflow.add_edge("technical_implementation", "synthesis")
workflow.add_edge("creative_brainstorm", "creative_refinement")
workflow.add_edge("creative_refinement", "synthesis")
workflow.add_edge("synthesis", END)
workflow.set_entry_point("input_processing")
return workflow.compile()
def process_input(self, state):
agent = self.agents["input_processor"]
processed = agent.run(state["task"])
state["processed_input"] = processed
return state
def classify_task(self, state):
agent = self.agents["classifier"]
classification = agent.run(f"Classify: {state['processed_input']}")
state["task_type"] = "technical" if "technical" in classification.lower() else "creative"
return state
def route_by_type(self, state):
return state["task_type"]
def technical_path(self, state):
agent = self.agents["technical_analyst"]
analysis = agent.run(state["processed_input"])
state["branch_result"] = analysis
return state
def implement_technical(self, state):
agent = self.agents["implementer"]
implementation = agent.run(state["branch_result"])
state["branch_result"] = implementation
return state
def creative_path(self, state):
agent = self.agents["creative_agent"]
ideas = agent.run(state["processed_input"])
state["branch_result"] = ideas
return state
def refine_creative(self, state):
agent = self.agents["refiner"]
refined = agent.run(state["branch_result"])
state["branch_result"] = refined
return state
def synthesize_results(self, state):
agent = self.agents["synthesizer"]
final = agent.run(f"Synthesize: {state['branch_result']}")
state["result"] = final
return state
def run(self, task):
initial_state = {
"task": task,
"processed_input": "",
"task_type": "",
"branch_result": "",
"result": "",
}
final_state = self.graph.invoke(initial_state)
return final_state["result"]
Iterative Refinement Workflow
Workflow with quality-based iteration:
class IterativeRefinementWorkflow:
"""Iterates until quality threshold is met"""
def __init__(self, generator, critic, improver, max_iterations=5):
self.generator = generator
self.critic = critic
self.improver = improver
self.max_iterations = max_iterations
self.graph = self._build_graph()
def _build_graph(self):
workflow = StateGraph(dict)
workflow.add_node("generate", self.generate_initial)
workflow.add_node("critique", self.critique_output)
workflow.add_node("improve", self.improve_output)
workflow.add_node("finalize", self.finalize_output)
workflow.add_edge("generate", "critique")
workflow.add_conditional_edges(
"critique",
self.check_quality,
{
"acceptable": "finalize",
"needs_improvement": "improve",
"max_iterations_reached": "finalize",
}
)
workflow.add_edge("improve", "critique")
workflow.add_edge("finalize", END)
workflow.set_entry_point("generate")
return workflow.compile()
def generate_initial(self, state):
output = self.generator.run(state["task"])
state["current_output"] = output
state["iteration"] = 1
state["history"] = [output]
return state
def critique_output(self, state):
critique = self.critic.run(f"Critique: {state['current_output']}")
state["last_critique"] = critique
# Extract quality score (0-10)
# In practice, use more robust extraction
if "score:" in critique.lower():
score_str = critique.lower().split("score:")[1].split()[0]
try:
state["quality_score"] = float(score_str)
            except ValueError:
state["quality_score"] = 5.0
else:
state["quality_score"] = 5.0
return state
def check_quality(self, state):
if state["quality_score"] >= 8.0:
return "acceptable"
elif state["iteration"] >= self.max_iterations:
return "max_iterations_reached"
else:
return "needs_improvement"
def improve_output(self, state):
improvement_prompt = f"""
Current output: {state['current_output']}
Critique: {state['last_critique']}
Previous attempts: {len(state['history'])}
Improve the output based on the critique.
"""
improved = self.improver.run(improvement_prompt)
state["current_output"] = improved
state["iteration"] += 1
state["history"].append(improved)
return state
def finalize_output(self, state):
state["result"] = state["current_output"]
state["iterations_used"] = state["iteration"]
return state
def run(self, task):
initial_state = {
"task": task,
"current_output": "",
"iteration": 0,
"history": [],
"quality_score": 0.0,
"result": "",
}
final_state = self.graph.invoke(initial_state)
return {
"result": final_state["result"],
"iterations": final_state["iterations_used"],
"history": final_state["history"],
}
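The score extraction in critique_output is deliberately simple, and the code comments note that a more robust approach is needed in practice. A small regex handles formats like "Score: 7/10" or "score = 8.5"; this is a sketch of a drop-in helper (the 5.0 default mirrors the example above):

import re

def extract_quality_score(critique, default=5.0):
    """Pull the first number following the word 'score' out of a critique."""
    match = re.search(r"score\s*[:=]?\s*(\d+(?:\.\d+)?)", critique, re.IGNORECASE)
    return float(match.group(1)) if match else default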
Parallel with Synchronization
Parallel execution with sync points:
class ParallelSyncWorkflow:
"""Execute phases in parallel with synchronization points"""
def __init__(self, phase1_agents, phase2_agents, coordinator):
self.phase1_agents = phase1_agents
self.phase2_agents = phase2_agents
self.coordinator = coordinator
self.graph = self._build_graph()
def _build_graph(self):
        from langgraph.graph import StateGraph, END, START
workflow = StateGraph(dict)
# Phase 1: Parallel execution
for i, agent in enumerate(self.phase1_agents):
workflow.add_node(f"phase1_{i}", self._make_phase1_node(agent, i))
# Sync point
workflow.add_node("sync1", self.sync_phase1)
# Phase 2: Parallel execution
for i, agent in enumerate(self.phase2_agents):
workflow.add_node(f"phase2_{i}", self._make_phase2_node(agent, i))
# Final sync
workflow.add_node("sync2", self.sync_phase2)
workflow.add_node("finalize", self.finalize)
        # Entry point fans out to all phase 1 nodes, which converge at sync1
        for i in range(len(self.phase1_agents)):
            workflow.add_edge(START, f"phase1_{i}")
            workflow.add_edge(f"phase1_{i}", "sync1")
        # sync1 fans out to all phase 2 nodes, which converge at sync2
        for i in range(len(self.phase2_agents)):
            workflow.add_edge("sync1", f"phase2_{i}")
            workflow.add_edge(f"phase2_{i}", "sync2")
        # Note: parallel branches that write the same state key may need a
        # reducer on the state schema (see State Management below)
        workflow.add_edge("sync2", "finalize")
        workflow.add_edge("finalize", END)
return workflow.compile()
def _make_phase1_node(self, agent, index):
def node(state):
result = agent.run(state["task"])
if "phase1_results" not in state:
state["phase1_results"] = {}
state["phase1_results"][index] = result
return state
return node
def sync_phase1(self, state):
        # Runs after every phase 1 branch has completed;
        # the coordinator synthesizes the combined results
if "phase1_results" in state and len(state["phase1_results"]) == len(self.phase1_agents):
synthesis = self.coordinator.run(f"Synthesize phase 1: {state['phase1_results']}")
state["phase1_synthesis"] = synthesis
return state
def _make_phase2_node(self, agent, index):
def node(state):
prompt = f"Task: {state['task']}\nPhase 1 results: {state['phase1_synthesis']}"
result = agent.run(prompt)
if "phase2_results" not in state:
state["phase2_results"] = {}
state["phase2_results"][index] = result
return state
return node
def sync_phase2(self, state):
if "phase2_results" in state and len(state["phase2_results"]) == len(self.phase2_agents):
synthesis = self.coordinator.run(f"Synthesize phase 2: {state['phase2_results']}")
state["phase2_synthesis"] = synthesis
return state
def finalize(self, state):
final = self.coordinator.run(f"Final: {state['phase2_synthesis']}")
state["result"] = final
return state
def run(self, task):
initial_state = {"task": task}
final_state = self.graph.invoke(initial_state)
return final_state["result"]
Design Patterns
State Management
# Use TypedDict for type safety
from typing import TypedDict, List
class MyWorkflowState(TypedDict):
task: str
messages: List[str]
current_step: str
result: str
metadata: dict
# Access and update state
def my_node(state: MyWorkflowState) -> MyWorkflowState:
state["messages"].append("Processing...")
state["current_step"] = "processing"
return state
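The nodes in this guide mutate the state dict and return it whole, which works well for sequential graphs. LangGraph also accepts partial updates: a node can return only the keys it changed, and a reducer annotation on the schema controls how repeated or concurrent writes to a key are merged. A minimal sketch of that style, assuming langgraph is installed:

import operator
from typing import Annotated, List, TypedDict

class ReducedState(TypedDict):
    task: str
    # operator.add concatenates list updates from successive (or parallel) nodes
    messages: Annotated[List[str], operator.add]
    current_step: str
    result: str

def my_partial_node(state: ReducedState) -> dict:
    # Return only the changed keys; LangGraph merges them into the state
    return {"messages": ["Processing..."], "current_step": "processing"}

This reducer style is also what keeps fan-in points safe when several parallel branches append to the same key, as in the ParallelSyncWorkflow above.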
Conditional Routing
def route_by_condition(state):
"""Return next node name based on state"""
if state["confidence"] > 0.8:
return "high_confidence_path"
elif state["confidence"] > 0.5:
return "medium_confidence_path"
else:
return "low_confidence_path"
# Add to graph
workflow.add_conditional_edges(
"evaluation",
route_by_condition,
{
"high_confidence_path": "fast_process",
"medium_confidence_path": "standard_process",
"low_confidence_path": "thorough_process",
}
)
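A route can also point straight at END, which terminates the graph without a pass-through node. A small sketch using the same API (the node names here are illustrative):

from langgraph.graph import END

workflow.add_conditional_edges(
    "quality_check",
    lambda state: "stop" if state["quality_score"] >= 8.0 else "retry",
    {
        "stop": END,
        "retry": "standard_process",
    },
)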
Error Handling
def safe_agent_call(agent, task, state):
"""Wrapper for agent calls with error handling"""
try:
result = agent.run(task)
state["error"] = None
return result
except Exception as e:
state["error"] = str(e)
state["failed_agent"] = agent.agent_name
return f"Error: {e}"
def error_handling_node(state):
if state.get("error"):
# Handle error
state["recovery_attempted"] = True
# Try alternative approach
return state
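To act on the error flag that safe_agent_call sets, route to a recovery node with a conditional edge, using the same LangGraph API as the workflows above (the "process" and "next_step" node names are illustrative):

def route_on_error(state):
    return "recover" if state.get("error") else "continue"

workflow.add_node("recover", error_handling_node)
workflow.add_conditional_edges(
    "process",
    route_on_error,
    {
        "recover": "recover",
        "continue": "next_step",
    },
)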
Parallel Execution
from concurrent.futures import ThreadPoolExecutor
def parallel_execution_node(state):
"""Execute multiple agents in parallel"""
with ThreadPoolExecutor(max_workers=len(agents)) as executor:
futures = [
executor.submit(agent.run, state["task"])
for agent in agents
]
results = [f.result() for f in futures]
state["parallel_results"] = results
return state
Best Practices
1. Define Clear State Schema
class WorkflowState(TypedDict):
# Required fields
task: str
result: str
# Optional tracking
current_node: str
iteration: int
errors: List[str]
# Domain-specific
context: dict
intermediate_results: List[str]
2. Modular Node Functions
# Good: Single responsibility
def analyze_node(state):
"""Analyze task complexity"""
analysis = analyzer.run(state["task"])
state["analysis"] = analysis
return state
# Bad: Multiple responsibilities
def analyze_and_execute_and_review(state):
# Too much in one node
pass
3. Clear Edge Conditions
# Good: Explicit conditions
def should_continue(state):
if state["iteration"] >= max_iterations:
return "stop"
elif state["quality_score"] >= threshold:
return "stop"
else:
return "continue"
# Bad: Implicit logic
def check(state):
return state["done"] # Unclear what "done" means
4. Logging and Observability
import logging
import time

logger = logging.getLogger(__name__)

def instrumented_node(state):
logger.info(f"Entering node: {state['current_node']}")
start_time = time.time()
# Node logic
result = process(state)
duration = time.time() - start_time
logger.info(f"Node completed in {duration:.2f}s")
return result
5. Testing Custom Workflows
def test_workflow():
    workflow = CustomWorkflow(agents)
    # Happy path: run() returns the final result string
    result = workflow.run("Simple task")
    assert isinstance(result, str) and result
    # Edge case: an empty task should still complete without raising
    result = workflow.run("")
    assert result is not None
    # Error handling: stub an agent to raise and check the workflow degrades gracefully
    # ...
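Because workflow nodes are plain functions over state, they can also be unit tested in isolation, without calling an LLM. A sketch using a hypothetical StubAgent that returns a canned response in place of a real Arc agent:

class StubAgent:
    """Hypothetical stand-in for an Arc Agent that returns a fixed response."""
    def __init__(self, agent_name, response):
        self.agent_name = agent_name
        self.response = response

    def run(self, task):
        return self.response

def test_review_node_flags_revision():
    # Build the workflow with stubs so no LLM calls are made
    stubs = [
        StubAgent("processor", "draft"),
        StubAgent("reviewer", "Needs revision: missing details"),
        StubAgent("finalizer", "final"),
    ]
    workflow = CustomWorkflow(stubs)
    state = {"task": "t", "messages": ["draft"], "current_agent": "", "result": ""}
    state = workflow.review_node(state)
    assert workflow.should_revise(state) == "revise"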
Integration with Arc Agents
Use Arc agents in custom workflows:
from azcore import Agent
# Create Arc agents
agent1 = Agent(
agent_name="Agent1",
system_prompt="...",
llm=llm,
tools=[...],
)
# Use in custom workflow nodes
def custom_node(state):
result = agent1.run(state["task"])
state["result"] = result
return state
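When several nodes follow this same shape, a small factory can bind each Arc agent to its own node function, the same idea as the _make_phase1_node helper above. A sketch; the make_agent_node helper and the key names are illustrative:

def make_agent_node(agent, input_key, output_key):
    """Return a node function that runs `agent` on state[input_key]."""
    def node(state):
        state[output_key] = agent.run(state[input_key])
        return state
    return node

# Bind agents to nodes without writing a separate function for each one
workflow.add_node("analysis", make_agent_node(agent1, "task", "analysis"))
workflow.add_node("summary", make_agent_node(agent1, "analysis", "result"))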
Performance Optimization
Caching
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_agent_call(task):
    # Identical task strings reuse the cached result instead of re-calling the agent
    return agent.run(task)

def optimized_node(state):
    state["result"] = cached_agent_call(state["task"])
    return state
Lazy Evaluation
def lazy_node(state):
# Only execute if needed
if state.get("skip_analysis"):
return state
result = expensive_analysis(state["task"])
state["analysis"] = result
return state
Debugging Custom Workflows
import logging

def traced(node_fn):
    """Wrap a node function so each call records its name in state['trace']."""
    def wrapper(state):
        state.setdefault("trace", []).append(node_fn.__name__)
        return node_fn(state)
    return wrapper

def debug_workflow(workflow, task):
    # Enable verbose logging
    logging.basicConfig(level=logging.DEBUG)
    # To trace execution, wrap node functions with `traced` when building the
    # graph (e.g. workflow.add_node("process", traced(self.process_node)))
    # and include a "trace" list in the initial state.
    result = workflow.run(task)
    return result
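Because the compiled LangGraph graph is exposed as the .graph attribute in the examples above, its stream() method can also be used to observe execution node by node. Depending on the installed LangGraph version and the stream_mode, each step yields either a per-node state update or a full state snapshot; a minimal sketch against the basic CustomWorkflow:

initial_state = {
    "task": "Create a project plan",
    "messages": [],
    "current_agent": "",
    "result": "",
}
for step in workflow.graph.stream(initial_state):
    # Each step describes what a single node produced during execution
    print(step)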