• Getting Started
  • Core Concepts
  • Reinforcement Learning
  • Model Context Protocol (MCP)
  • Workflow Patterns
  • Advanced Agent Patterns
  • Guides

Workflow Patterns

Custom Workflows

Create custom workflow patterns tailored to your specific use cases and requirements.

Build your own workflow patterns tailored to your specific needs. Extend Arc's workflow system with custom logic and orchestration.

Overview

While Arc provides many built-in patterns, you can create custom workflows for specialized use cases. This guide shows how to build custom workflows using LangGraph and Arc's agent system.

When to Build Custom Workflows

  • Unique requirements: Built-in patterns don't fit your needs
  • Complex orchestration: Sophisticated control flow needed
  • Special integrations: Custom systems or protocols
  • Domain-specific patterns: Industry-specific workflows
  • Experimental patterns: Research new architectures

Basic Custom Workflow

from azcore import Agent
from langgraph.graph import StateGraph, END
from typing import TypedDict

# Define state schema
class WorkflowState(TypedDict):
    """State dictionary passed between the nodes of the basic custom workflow."""
    task: str  # prompt handed to the "processor" agent
    messages: list  # agent outputs, appended in execution order
    current_agent: str  # initialised to "" by run(); no node shown here updates it
    result: str  # final output, written by the finalize node

# Create custom workflow
class CustomWorkflow:
    """Process -> review -> finalize workflow with a bounded revision loop.

    Expects agents named "processor", "reviewer" and "finalizer". The
    reviewer sends work back to the processor by including the phrase
    "needs revision" in its reply. After ``max_revisions`` such round trips
    the latest output is finalized regardless, so a reviewer that never
    approves cannot keep the graph looping.
    """

    def __init__(self, agents, max_revisions=3):
        """Index the agents by name and compile the graph.

        Args:
            agents: Iterable of agents exposing ``agent_name`` and
                ``run(prompt)``.
            max_revisions: Maximum number of times the reviewer may send the
                work back to the processor before it is finalized anyway.
        """
        self.agents = {agent.agent_name: agent for agent in agents}
        self.max_revisions = max_revisions
        self.graph = self._build_graph()

    def _build_graph(self):
        """Wire the nodes together and return the compiled graph."""
        # Create graph
        workflow = StateGraph(WorkflowState)

        # Add nodes
        workflow.add_node("start", self.start_node)
        workflow.add_node("process", self.process_node)
        workflow.add_node("review", self.review_node)
        workflow.add_node("finalize", self.finalize_node)

        # Add edges
        workflow.add_edge("start", "process")
        workflow.add_edge("process", "review")
        workflow.add_conditional_edges(
            "review",
            self.should_revise,
            {
                "revise": "process",
                "approve": "finalize",
            }
        )
        workflow.add_edge("finalize", END)

        # Set entry point
        workflow.set_entry_point("start")

        return workflow.compile()

    def start_node(self, state):
        """Entry node; passes the state through unchanged."""
        return state

    def process_node(self, state):
        """Run the processor and append its output to ``messages``.

        On the first pass the prompt is the bare task. On a revision pass the
        previous draft and the reviewer's feedback are included, so the
        processor can actually act on the review instead of repeating itself.
        """
        agent = self.agents["processor"]
        messages = state["messages"]
        if len(messages) >= 2:
            # messages[-2] is the last draft, messages[-1] the review of it.
            prompt = (
                f"{state['task']}\n\n"
                f"Previous attempt: {messages[-2]}\n"
                f"Reviewer feedback: {messages[-1]}"
            )
        else:
            prompt = state["task"]
        messages.append(agent.run(prompt))
        return state

    def review_node(self, state):
        """Ask the reviewer to assess the latest message and record the review."""
        agent = self.agents["reviewer"]
        review = agent.run(f"Review: {state['messages'][-1]}")
        state["messages"].append(review)
        return state

    def should_revise(self, state):
        """Return "revise" or "approve" for the conditional edge after review."""
        messages = state["messages"]
        # Every process/review round appends two messages, so the number of
        # completed rounds is len(messages) // 2; the first is not a revision.
        revisions_done = len(messages) // 2 - 1
        if revisions_done >= self.max_revisions:
            return "approve"
        if "needs revision" in messages[-1].lower():
            return "revise"
        return "approve"

    def finalize_node(self, state):
        """Have the finalizer turn the message history into the final result."""
        agent = self.agents["finalizer"]
        result = agent.run(f"Finalize: {state['messages']}")
        state["result"] = result
        return state

    def run(self, task):
        """Execute the workflow for ``task`` and return the final result."""
        initial_state = {
            "task": task,
            "messages": [],
            "current_agent": "",
            "result": "",
        }
        final_state = self.graph.invoke(initial_state)
        return final_state["result"]

# Usage
# Agent names must match the keys the workflow nodes look up
# ("processor", "reviewer", "finalizer"). `llm` is assumed to be a model
# instance that was configured earlier.
processor = Agent(
    agent_name="processor",
    system_prompt="Process tasks",
    llm=llm,
)
reviewer = Agent(
    agent_name="reviewer",
    system_prompt="Review outputs",
    llm=llm,
)
finalizer = Agent(
    agent_name="finalizer",
    system_prompt="Finalize results",
    llm=llm,
)

workflow = CustomWorkflow([processor, reviewer, finalizer])
result = workflow.run("Create a project plan")

Advanced Examples

Adaptive Workflow

Workflow that adapts based on task complexity:

from azcore import Agent
from langgraph.graph import StateGraph, END

class AdaptiveWorkflow:
    """Workflow that adjusts based on task complexity.

    An analyzer agent labels the task "simple" or "complex"; simple tasks get
    a single agent call, complex tasks go through plan -> execute -> review.
    """

    def __init__(self, simple_agent, complex_agent, complexity_analyzer):
        """Store the three agents and compile the graph.

        Args:
            simple_agent: Agent used for the single-call simple path.
            complex_agent: Agent used for every step of the complex path.
            complexity_analyzer: Agent asked to label the task's complexity.
        """
        self.simple_agent = simple_agent
        self.complex_agent = complex_agent
        self.analyzer = complexity_analyzer
        self.graph = self._build_graph()

    def _build_graph(self):
        """Build the analyze -> (simple | complex) -> END graph."""
        workflow = StateGraph(dict)

        # Nodes
        workflow.add_node("analyze", self.analyze_complexity)
        workflow.add_node("simple_path", self.simple_processing)
        workflow.add_node("complex_path", self.complex_processing)

        # Routing based on complexity
        workflow.add_conditional_edges(
            "analyze",
            self.route_by_complexity,
            {
                "simple": "simple_path",
                "complex": "complex_path",
            }
        )

        workflow.add_edge("simple_path", END)
        workflow.add_edge("complex_path", END)

        workflow.set_entry_point("analyze")
        return workflow.compile()

    def analyze_complexity(self, state):
        """Label the task and store the label under ``state["complexity"]``.

        The label is "complex" only when the analyzer's reply contains
        "complex" as a whole word. A plain substring test would also match
        "complexity", which the analyzer is likely to echo from the prompt,
        and would then route almost every task down the complex path.
        Anything else defaults to "simple".
        """
        import re  # local import keeps this snippet self-contained

        analysis = self.analyzer.run(
            f"Analyze complexity: {state['task']}\n"
            "Answer with exactly one word: simple or complex."
        )
        is_complex = re.search(r"\bcomplex\b", analysis.lower()) is not None
        state["complexity"] = "complex" if is_complex else "simple"
        return state

    def route_by_complexity(self, state):
        """Return the label chosen by ``analyze_complexity`` as the route key."""
        return state["complexity"]

    def simple_processing(self, state):
        """Handle the task with a single call to the simple agent."""
        result = self.simple_agent.run(state["task"])
        state["result"] = result
        return state

    def complex_processing(self, state):
        """Handle the task in three chained calls: plan, execute, review."""
        # Multi-step processing for complex tasks
        plan = self.complex_agent.run(f"Plan: {state['task']}")
        execution = self.complex_agent.run(f"Execute: {plan}")
        review = self.complex_agent.run(f"Review: {execution}")
        state["result"] = review
        return state

    def run(self, task):
        """Execute the workflow for ``task`` and return the final result."""
        initial_state = {"task": task, "complexity": "", "result": ""}
        final_state = self.graph.invoke(initial_state)
        return final_state["result"]

Pipeline with Branching

Workflow with conditional branches:

class BranchingPipeline:
    """Pipeline that classifies its input and follows one of two branches.

    ``agents`` is a mapping keyed by role: "input_processor", "classifier",
    "technical_analyst", "implementer", "creative_agent", "refiner" and
    "synthesizer". Both branches write to ``branch_result`` and converge on
    the synthesis step.
    """

    def __init__(self, agents):
        self.agents = agents
        self.graph = self._build_graph()

    def _build_graph(self):
        """Register all nodes and edges and return the compiled graph."""
        graph = StateGraph(dict)

        # Node name -> handler, registered in pipeline order.
        handlers = {
            "input_processing": self.process_input,
            "classification": self.classify_task,
            "technical_analysis": self.technical_path,
            "technical_implementation": self.implement_technical,
            "creative_brainstorm": self.creative_path,
            "creative_refinement": self.refine_creative,
            "synthesis": self.synthesize_results,
        }
        for node_name, handler in handlers.items():
            graph.add_node(node_name, handler)

        graph.add_edge("input_processing", "classification")

        # The classifier's verdict selects the branch.
        graph.add_conditional_edges(
            "classification",
            self.route_by_type,
            {
                "technical": "technical_analysis",
                "creative": "creative_brainstorm",
            },
        )

        # Straight-line edges inside each branch, then convergence and exit.
        linear_edges = (
            ("technical_analysis", "technical_implementation"),
            ("technical_implementation", "synthesis"),
            ("creative_brainstorm", "creative_refinement"),
            ("creative_refinement", "synthesis"),
            ("synthesis", END),
        )
        for source, target in linear_edges:
            graph.add_edge(source, target)

        graph.set_entry_point("input_processing")
        return graph.compile()

    def process_input(self, state):
        """Pre-process the raw task and store it as ``processed_input``."""
        state["processed_input"] = self.agents["input_processor"].run(state["task"])
        return state

    def classify_task(self, state):
        """Set ``task_type`` to "technical" or "creative" from the classifier's reply."""
        verdict = self.agents["classifier"].run(f"Classify: {state['processed_input']}")
        if "technical" in verdict.lower():
            state["task_type"] = "technical"
        else:
            state["task_type"] = "creative"
        return state

    def route_by_type(self, state):
        """Return the stored task type as the route key."""
        return state["task_type"]

    def technical_path(self, state):
        """Technical branch, step 1: analyse the processed input."""
        state["branch_result"] = self.agents["technical_analyst"].run(state["processed_input"])
        return state

    def implement_technical(self, state):
        """Technical branch, step 2: implement from the analysis."""
        state["branch_result"] = self.agents["implementer"].run(state["branch_result"])
        return state

    def creative_path(self, state):
        """Creative branch, step 1: brainstorm from the processed input."""
        state["branch_result"] = self.agents["creative_agent"].run(state["processed_input"])
        return state

    def refine_creative(self, state):
        """Creative branch, step 2: refine the brainstormed ideas."""
        state["branch_result"] = self.agents["refiner"].run(state["branch_result"])
        return state

    def synthesize_results(self, state):
        """Turn whichever branch result exists into the final ``result``."""
        state["result"] = self.agents["synthesizer"].run(f"Synthesize: {state['branch_result']}")
        return state

    def run(self, task):
        """Execute the pipeline for ``task`` and return the final result."""
        final_state = self.graph.invoke(
            {
                "task": task,
                "processed_input": "",
                "task_type": "",
                "branch_result": "",
                "result": "",
            }
        )
        return final_state["result"]

Iterative Refinement Workflow

Workflow with quality-based iteration:

class IterativeRefinementWorkflow:
    """Iterates until quality threshold is met.

    A generator produces a first draft, a critic scores it, and an improver
    rewrites it until the score reaches 8.0 or ``max_iterations`` is hit.
    """

    def __init__(self, generator, critic, improver, max_iterations=5):
        """Store the agents and iteration budget, then compile the graph.

        Args:
            generator: Agent that produces the initial output.
            critic: Agent whose reply should contain ``score: <number>``.
            improver: Agent that rewrites the output from the critique.
            max_iterations: Iteration count at which the loop stops even if
                the quality threshold has not been reached.
        """
        self.generator = generator
        self.critic = critic
        self.improver = improver
        self.max_iterations = max_iterations
        self.graph = self._build_graph()

    def _build_graph(self):
        """Build the generate -> critique -> (improve -> critique)* -> finalize graph."""
        workflow = StateGraph(dict)

        workflow.add_node("generate", self.generate_initial)
        workflow.add_node("critique", self.critique_output)
        workflow.add_node("improve", self.improve_output)
        workflow.add_node("finalize", self.finalize_output)

        workflow.add_edge("generate", "critique")

        workflow.add_conditional_edges(
            "critique",
            self.check_quality,
            {
                "acceptable": "finalize",
                "needs_improvement": "improve",
                "max_iterations_reached": "finalize",
            }
        )

        workflow.add_edge("improve", "critique")
        workflow.add_edge("finalize", END)

        workflow.set_entry_point("generate")
        return workflow.compile()

    def generate_initial(self, state):
        """Produce the first draft and initialise the iteration bookkeeping."""
        output = self.generator.run(state["task"])
        state["current_output"] = output
        state["iteration"] = 1
        state["history"] = [output]
        return state

    @staticmethod
    def _parse_quality_score(critique, default=5.0):
        """Return the number following "score:" in ``critique``, else ``default``.

        Matching is case-insensitive and stops at the end of the number, so
        replies such as "Score: 8/10" or "score: 7.5," still parse. A reply
        with no "score:" or with nothing numeric after it yields ``default``.
        """
        import re  # local import keeps this snippet self-contained

        match = re.search(
            r"score:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+))", critique, re.IGNORECASE
        )
        return float(match.group(1)) if match else default

    def critique_output(self, state):
        """Ask the critic for feedback and record its critique and score."""
        critique = self.critic.run(f"Critique: {state['current_output']}")
        state["last_critique"] = critique

        # Extract quality score (0-10); falls back to a neutral 5.0.
        state["quality_score"] = self._parse_quality_score(critique)

        return state

    def check_quality(self, state):
        """Choose the next step: finalize when good enough or out of budget."""
        if state["quality_score"] >= 8.0:
            return "acceptable"
        elif state["iteration"] >= self.max_iterations:
            return "max_iterations_reached"
        else:
            return "needs_improvement"

    def improve_output(self, state):
        """Rewrite the current output from the last critique and count the iteration."""
        improvement_prompt = f"""
        Current output: {state['current_output']}
        Critique: {state['last_critique']}
        Previous attempts: {len(state['history'])}

        Improve the output based on the critique.
        """
        improved = self.improver.run(improvement_prompt)
        state["current_output"] = improved
        state["iteration"] += 1
        state["history"].append(improved)
        return state

    def finalize_output(self, state):
        """Publish the current output as the result and record iterations used."""
        state["result"] = state["current_output"]
        state["iterations_used"] = state["iteration"]
        return state

    def run(self, task):
        """Execute the workflow.

        Returns:
            A dict with the final ``result``, the number of ``iterations``
            used, and the ``history`` of all outputs produced.
        """
        initial_state = {
            "task": task,
            "current_output": "",
            "iteration": 0,
            "history": [],
            "quality_score": 0.0,
            "result": "",
        }
        final_state = self.graph.invoke(initial_state)
        return {
            "result": final_state["result"],
            "iterations": final_state["iterations_used"],
            "history": final_state["history"],
        }

Parallel with Synchronization

Parallel execution with sync points:

class ParallelSyncWorkflow:
    """Execute phases in parallel with synchronization points.

    Graph shape::

        START -> phase1_* -> sync1 -> phase2_* -> sync2 -> finalize -> END

    Every phase-1 agent receives the task. ``sync1`` waits for all of them
    and has the coordinator synthesize their results. Every phase-2 agent
    then receives the task plus that synthesis; ``sync2`` waits for all of
    them and ``finalize`` produces the final result.

    Node functions return only the keys they change (partial updates)
    rather than the whole state, so that nodes running side by side do not
    each write back every key.
    """

    def __init__(self, phase1_agents, phase2_agents, coordinator):
        """Store the agents and compile the graph.

        Args:
            phase1_agents: Non-empty sequence of agents for the first phase.
            phase2_agents: Non-empty sequence of agents for the second phase.
            coordinator: Agent that synthesizes each phase and the final answer.

        Raises:
            ValueError: If either phase has no agents, since the
                synchronization points would have nothing to wait for.
        """
        if not phase1_agents or not phase2_agents:
            raise ValueError("Each phase needs at least one agent")
        self.phase1_agents = phase1_agents
        self.phase2_agents = phase2_agents
        self.coordinator = coordinator
        self.graph = self._build_graph()

    @staticmethod
    def _merge_results(current, update):
        """Reducer that combines the result dicts written by parallel nodes."""
        return {**current, **update}

    def _build_graph(self):
        """Build the fan-out / fan-in graph and return it compiled."""
        from typing import Annotated, TypedDict

        from langgraph.graph import StateGraph, START, END

        merge = self._merge_results

        class State(TypedDict, total=False):
            task: str
            # Written by several nodes at once, so these need a reducer.
            phase1_results: Annotated[dict, merge]
            phase2_results: Annotated[dict, merge]
            phase1_synthesis: str
            phase2_synthesis: str
            result: str

        workflow = StateGraph(State)

        # Phase 1: every agent starts from the graph entry.
        phase1_nodes = []
        for i, agent in enumerate(self.phase1_agents):
            name = f"phase1_{i}"
            workflow.add_node(name, self._make_phase1_node(agent, i))
            workflow.add_edge(START, name)
            phase1_nodes.append(name)

        # Sync point: a list of sources means "wait for all of them".
        workflow.add_node("sync1", self.sync_phase1)
        workflow.add_edge(phase1_nodes, "sync1")

        # Phase 2: fan out again from the first sync point.
        phase2_nodes = []
        for i, agent in enumerate(self.phase2_agents):
            name = f"phase2_{i}"
            workflow.add_node(name, self._make_phase2_node(agent, i))
            workflow.add_edge("sync1", name)
            phase2_nodes.append(name)

        # Final sync
        workflow.add_node("sync2", self.sync_phase2)
        workflow.add_edge(phase2_nodes, "sync2")

        workflow.add_node("finalize", self.finalize)
        workflow.add_edge("sync2", "finalize")
        workflow.add_edge("finalize", END)

        return workflow.compile()

    def _make_phase1_node(self, agent, index):
        """Return a node that runs ``agent`` on the task and files the result under ``index``."""
        def node(state):
            return {"phase1_results": {index: agent.run(state["task"])}}
        return node

    def sync_phase1(self, state):
        """Have the coordinator synthesize all phase-1 results."""
        synthesis = self.coordinator.run(f"Synthesize phase 1: {state['phase1_results']}")
        return {"phase1_synthesis": synthesis}

    def _make_phase2_node(self, agent, index):
        """Return a node that runs ``agent`` on the task plus the phase-1 synthesis."""
        def node(state):
            prompt = f"Task: {state['task']}\nPhase 1 results: {state['phase1_synthesis']}"
            return {"phase2_results": {index: agent.run(prompt)}}
        return node

    def sync_phase2(self, state):
        """Have the coordinator synthesize all phase-2 results."""
        synthesis = self.coordinator.run(f"Synthesize phase 2: {state['phase2_results']}")
        return {"phase2_synthesis": synthesis}

    def finalize(self, state):
        """Produce the final result from the phase-2 synthesis."""
        return {"result": self.coordinator.run(f"Final: {state['phase2_synthesis']}")}

    def run(self, task):
        """Execute both phases for ``task`` and return the final result."""
        initial_state = {"task": task}
        final_state = self.graph.invoke(initial_state)
        return final_state["result"]

Design Patterns

State Management

# Use TypedDict for type safety
from typing import TypedDict, List

class MyWorkflowState(TypedDict):
    """Example state schema: one typed key per piece of workflow data."""
    task: str
    messages: List[str]  # log of messages; nodes append to it
    current_step: str  # name of the step currently running
    result: str
    metadata: dict

# Access and update state
def my_node(state: MyWorkflowState) -> MyWorkflowState:
    """Log a progress message, mark the step as "processing", and return the state."""
    log = state["messages"]
    log.append("Processing...")
    state.update(current_step="processing")
    return state

Conditional Routing

def route_by_condition(state):
    """Pick the route name matching the state's confidence level.

    Confidence above 0.8 is high, above 0.5 is medium, anything else is low.
    """
    # Thresholds are checked from highest to lowest; the first one exceeded wins.
    tiers = (
        (0.8, "high_confidence_path"),
        (0.5, "medium_confidence_path"),
    )
    for floor, route in tiers:
        if state["confidence"] > floor:
            return route
    return "low_confidence_path"

# Add to graph
# Maps each value route_by_condition can return to the node that handles it.
confidence_routes = {
    "high_confidence_path": "fast_process",
    "medium_confidence_path": "standard_process",
    "low_confidence_path": "thorough_process",
}
workflow.add_conditional_edges("evaluation", route_by_condition, confidence_routes)

Error Handling

def safe_agent_call(agent, task, state):
    """Run ``agent`` on ``task`` without letting its exceptions escape.

    On success ``state["error"]`` is set to None and the agent's result is
    returned. On failure the error text and the agent's name are recorded in
    ``state`` and an "Error: ..." string is returned instead.
    """
    try:
        outcome = agent.run(task)
    except Exception as exc:
        state["error"] = str(exc)
        state["failed_agent"] = agent.agent_name
        return f"Error: {exc}"
    else:
        state["error"] = None
        return outcome

def error_handling_node(state):
    """Flag that recovery was attempted when the state carries an error."""
    if not state.get("error"):
        return state

    # Handle error
    state["recovery_attempted"] = True
    # Try alternative approach
    return state

Parallel Execution

from concurrent.futures import ThreadPoolExecutor

def parallel_execution_node(state):
    """Execute multiple agents in parallel.

    Runs every agent in the module-level ``agents`` collection on
    ``state["task"]`` in its own thread and stores the results, in agent
    order, under ``state["parallel_results"]``. With no agents the result
    list is empty.
    """
    if not agents:
        # ThreadPoolExecutor rejects max_workers=0, so skip the pool entirely.
        state["parallel_results"] = []
        return state

    task = state["task"]
    with ThreadPoolExecutor(max_workers=len(agents)) as executor:
        # map() yields results in submission order, like the agents list.
        results = list(executor.map(lambda agent: agent.run(task), agents))

    state["parallel_results"] = results
    return state

Best Practices

1. Define Clear State Schema

class WorkflowState(TypedDict):
    """Example schema grouping state keys by purpose."""
    # Core fields
    task: str
    result: str

    # Tracking fields (still required keys here; use NotRequired or
    # total=False if they should really be optional)
    current_node: str
    iteration: int
    errors: List[str]

    # Domain-specific
    context: dict
    intermediate_results: List[str]

2. Modular Node Functions

# Good: Single responsibility
def analyze_node(state):
    """Run the analyzer on the task and keep its output under "analysis"."""
    state["analysis"] = analyzer.run(state["task"])
    return state

# Bad: Multiple responsibilities
def analyze_and_execute_and_review(state):
    """Anti-example placeholder: one node that would do three jobs."""
    # Too much in one node
    return None

3. Clear Edge Conditions

# Good: Explicit conditions
def should_continue(state):
    """Return "stop" once the iteration budget or quality threshold is reached."""
    # The iteration budget is checked first; quality is only consulted
    # while iterations remain.
    budget_exhausted = state["iteration"] >= max_iterations
    if budget_exhausted or state["quality_score"] >= threshold:
        return "stop"
    return "continue"

# Bad: Implicit logic
def check(state):
    """Anti-example: returns a raw flag whose meaning the reader must guess."""
    done = state["done"]  # Unclear what "done" means
    return done

4. Logging and Observability

def instrumented_node(state):
    """Run ``process`` on the state, logging entry and elapsed time.

    Relies on module-level ``logger``, ``time`` and ``process``. Returns
    whatever ``process`` returns.
    """
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("Entering node: %s", state["current_node"])
    # perf_counter() is meant for measuring intervals; differences between
    # time.time() readings can be skewed by system clock adjustments.
    start_time = time.perf_counter()

    # Node logic
    result = process(state)

    duration = time.perf_counter() - start_time
    logger.info("Node completed in %.2fs", duration)

    return result

5. Testing Custom Workflows

def test_workflow():
    """Smoke-test CustomWorkflow on a normal, an unusual and an empty task.

    ``CustomWorkflow.run`` returns the finalizer's output rather than the
    state dict, so the checks are made on that value directly. The string
    assertions assume ``Agent.run`` returns text, as the workflow's own
    ``.lower()`` call on agent output suggests.
    """
    workflow = CustomWorkflow(agents)

    # Test happy path
    result = workflow.run("Simple task")
    assert isinstance(result, str)
    assert result

    # Test error handling: an unusual task must still yield a result
    # instead of raising.
    result = workflow.run("Invalid task")
    assert isinstance(result, str)

    # Test edge cases
    result = workflow.run("")
    assert isinstance(result, str)

Integration with Arc Agents

Use Arc agents in custom workflows:

from azcore import Agent

# Create Arc agents
# Collect the constructor arguments first, then build the agent from them.
# `llm` is assumed to be a model instance configured earlier.
agent1_options = {
    "agent_name": "Agent1",
    "system_prompt": "...",
    "llm": llm,
    "tools": [...],
}
agent1 = Agent(**agent1_options)

# Use in custom workflow nodes
def custom_node(state):
    """Run agent1 on the task and store its output as the result."""
    state.update(result=agent1.run(state["task"]))
    return state

Performance Optimization

Caching

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_agent_call(task):
    """Run the module-level ``agent`` on ``task``, caching by task text.

    The cache is keyed on the task itself, which must therefore be hashable
    (a string is). Keying on ``hash(task)`` instead would discard the text
    the agent needs and let two tasks with colliding hashes share a result.
    """
    return agent.run(task)


def optimized_node(state):
    """Store the agent's (possibly cached) answer for the task as the result."""
    state["result"] = cached_agent_call(state["task"])
    return state

Lazy Evaluation

def lazy_node(state):
    """Run the expensive analysis unless the state asks to skip it."""
    # Only execute if needed
    if not state.get("skip_analysis"):
        state["analysis"] = expensive_analysis(state["task"])
    return state

Debugging Custom Workflows

def debug_workflow(workflow, task):
    """Run ``workflow`` on ``task`` with DEBUG logging and print its trace.

    A trace is printed only when ``workflow.run`` returns a dict containing
    a "trace" key; nodes have to append to that key themselves. Workflows
    whose ``run`` returns something else, such as a plain string, get an
    empty trace instead of an AttributeError.

    Returns:
        Whatever ``workflow.run(task)`` returned, unchanged.
    """
    # Enable verbose logging
    import logging
    logging.basicConfig(level=logging.DEBUG)

    # Run workflow
    result = workflow.run(task)

    # Print trace
    trace = result.get("trace", []) if isinstance(result, dict) else []
    print("Execution trace:", trace)
    return result
Edit this page on GitHub
AzrienLabs logo

AzrienLabs

Crafted by Team AzrienLabs