• Getting Started
  • Core Concepts
  • Reinforcement Learning
  • Model Context Protocol (MCP)
  • Workflow Patterns
  • Advanced Agent Patterns
  • Guides

Workflow Patterns

Concurrent Pattern

Execute multiple agents in parallel for faster task completion with the Concurrent workflow pattern.

Execute multiple agents in parallel and aggregate their results. Ideal for independent tasks that can run simultaneously.

Overview

Concurrent workflow runs agents in parallel, reducing overall execution time. Each agent works independently, and their outputs are combined at the end.

When to Use

  • Independent tasks: Agents don't need each other's outputs
  • Speed critical: Minimize total execution time
  • Diverse perspectives: Gather multiple viewpoints simultaneously
  • Parallel processing: Divide work among specialized agents

Basic Usage

from azcore import ConcurrentWorkflow

# Create specialized agents
researcher1 = Agent(
    agent_name="Market Researcher",
    system_prompt="Research market trends and competitors",
    llm=llm,
)

researcher2 = Agent(
    agent_name="Technical Researcher",
    system_prompt="Research technical requirements and feasibility",
    llm=llm,
)

researcher3 = Agent(
    agent_name="User Researcher",
    system_prompt="Research user needs and preferences",
    llm=llm,
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    agents=[researcher1, researcher2, researcher3],
    aggregator_agent=None,  # Simple concatenation
)

# Run in parallel
result = workflow.run("Research opportunities in AI education")
print(result)

Configuration Options

aggregator_agent

Combine outputs with a specialized agent:

# Create aggregator
aggregator = Agent(
    agent_name="Synthesizer",
    system_prompt="Synthesize all research into a coherent report",
    llm=llm,
)

workflow = ConcurrentWorkflow(
    agents=[researcher1, researcher2, researcher3],
    aggregator_agent=aggregator,  # Intelligent aggregation
)

aggregator_system_prompt

Quick aggregation without creating a separate agent:

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_system_prompt="Combine all outputs into a summary",
    aggregator_model=llm,
)

max_loops

Run multiple concurrent rounds:

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    max_loops=3,  # Run 3 parallel rounds
)

Advanced Examples

Multi-Perspective Analysis

from azcore import Agent, ConcurrentWorkflow

# Different analytical perspectives
financial_analyst = Agent(
    agent_name="Financial",
    system_prompt="Analyze from financial perspective: costs, ROI, revenue",
    llm=llm,
    tools=[financial_tools],
)

technical_analyst = Agent(
    agent_name="Technical",
    system_prompt="Analyze technical feasibility and implementation",
    llm=llm,
    tools=[code_analysis_tools],
)

market_analyst = Agent(
    agent_name="Market",
    system_prompt="Analyze market demand and competition",
    llm=llm,
    tools=[market_research_tools],
)

risk_analyst = Agent(
    agent_name="Risk",
    system_prompt="Identify and assess risks",
    llm=llm,
)

# Synthesizer
synthesizer = Agent(
    agent_name="Synthesizer",
    system_prompt="""Combine all analyses into a comprehensive report with:
    - Executive summary
    - Key findings from each perspective
    - Recommendations
    - Action items""",
    llm=llm,
)

# Create workflow
analysis_workflow = ConcurrentWorkflow(
    agents=[financial_analyst, technical_analyst, market_analyst, risk_analyst],
    aggregator_agent=synthesizer,
)

# Analyze business opportunity
report = analysis_workflow.run("Should we build an AI-powered CRM system?")

Parallel Content Generation

# Multiple content formats simultaneously
blog_writer = Agent(
    agent_name="Blog Writer",
    system_prompt="Write engaging blog post (800-1000 words)",
    llm=llm,
)

social_media = Agent(
    agent_name="Social Media",
    system_prompt="Create social media posts (Twitter, LinkedIn, Instagram)",
    llm=llm,
)

email_writer = Agent(
    agent_name="Email Writer",
    system_prompt="Write email newsletter",
    llm=llm,
)

video_script = Agent(
    agent_name="Video Script",
    system_prompt="Create video script with timestamps",
    llm=llm,
)

# Content coordinator
coordinator = Agent(
    agent_name="Coordinator",
    system_prompt="Organize all content formats and ensure consistent messaging",
    llm=llm,
)

# Create content suite
content_workflow = ConcurrentWorkflow(
    agents=[blog_writer, social_media, email_writer, video_script],
    aggregator_agent=coordinator,
)

# Generate all content at once
content_suite = content_workflow.run(
    "Create content about our new AI product launch"
)

Parallel Code Review

# Different review aspects
security_reviewer = Agent(
    agent_name="Security",
    system_prompt="Review code for security vulnerabilities",
    llm=llm,
    tools=[security_scanner],
)

performance_reviewer = Agent(
    agent_name="Performance",
    system_prompt="Identify performance issues and optimizations",
    llm=llm,
    tools=[profiler_tool],
)

style_reviewer = Agent(
    agent_name="Style",
    system_prompt="Check code style, readability, and best practices",
    llm=llm,
    tools=[linter_tool],
)

architecture_reviewer = Agent(
    agent_name="Architecture",
    system_prompt="Review system design and architecture decisions",
    llm=llm,
)

# Review coordinator
coordinator = Agent(
    agent_name="Lead Reviewer",
    system_prompt="""Combine all reviews into:
    - Critical issues (must fix)
    - Important suggestions
    - Nice-to-have improvements
    - Approval/rejection decision""",
    llm=llm,
)

# Create review workflow
review_workflow = ConcurrentWorkflow(
    agents=[security_reviewer, performance_reviewer, style_reviewer, architecture_reviewer],
    aggregator_agent=coordinator,
)

# Review pull request
review = review_workflow.run("Review PR #123: New authentication system")

Parallel Research and Fact-Checking

# Multiple sources simultaneously
academic_researcher = Agent(
    agent_name="Academic",
    system_prompt="Research academic papers and studies",
    llm=llm,
    tools=[arxiv_tool, scholar_tool],
)

news_researcher = Agent(
    agent_name="News",
    system_prompt="Research recent news and current events",
    llm=llm,
    tools=[news_api],
)

industry_researcher = Agent(
    agent_name="Industry",
    system_prompt="Research industry reports and trends",
    llm=llm,
    tools=[market_research_tool],
)

fact_checker = Agent(
    agent_name="Fact Checker",
    system_prompt="Verify claims and check facts",
    llm=llm,
    tools=[fact_check_tool],
)

# Research synthesizer
synthesizer = Agent(
    agent_name="Research Lead",
    system_prompt="""Create comprehensive research report:
    - Key findings
    - Sources and citations
    - Verified facts
    - Confidence levels""",
    llm=llm,
)

# Create research workflow
research_workflow = ConcurrentWorkflow(
    agents=[academic_researcher, news_researcher, industry_researcher, fact_checker],
    aggregator_agent=synthesizer,
)

# Conduct research
report = research_workflow.run("Research the impact of AI on job markets")

Aggregation Strategies

1. Simple Concatenation

No aggregator - just combine outputs:

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_agent=None,
)

2. Prompt-Based Aggregation

Quick aggregation with a prompt:

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_system_prompt="Summarize all outputs concisely",
    aggregator_model=llm,
)

3. Specialized Aggregator Agent

Full control with dedicated agent:

aggregator = Agent(
    agent_name="Synthesizer",
    system_prompt="Complex aggregation logic...",
    llm=llm,
    tools=[synthesis_tools],
)

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_agent=aggregator,
)

4. Voting/Consensus

Implement custom voting logic:

class VotingConcurrentWorkflow(ConcurrentWorkflow):
    def aggregate_outputs(self, outputs):
        # Custom voting logic
        votes = self.extract_votes(outputs)
        consensus = self.calculate_consensus(votes)
        return consensus

State Management

Agents run independently with shared input:

# All agents receive the same initial task
# Outputs are collected: [agent1_output, agent2_output, agent3_output]
# Aggregator sees all outputs

Access state for custom processing:

class CustomConcurrentWorkflow(ConcurrentWorkflow):
    def pre_execution_hook(self, state):
        # Modify state before parallel execution
        return state

    def post_execution_hook(self, outputs, state):
        # Process outputs before aggregation
        return outputs, state

Error Handling

Handle partial failures:

from azcore import ConcurrentWorkflow

try:
    result = workflow.run("Task")
except Exception as e:
    print(f"Workflow failed: {e}")
    # Some agents may have succeeded
    # Implement partial result recovery

Graceful degradation:

class RobustConcurrentWorkflow(ConcurrentWorkflow):
    def handle_agent_failure(self, agent, error):
        # Log error but continue with other agents
        logger.error(f"Agent {agent.agent_name} failed: {error}")
        return None  # Continue without this agent's output

Best Practices

1. Ensure Independence

Agents should not depend on each other:

# Good: Independent tasks
[market_research, technical_analysis, user_research]

# Bad: Dependent tasks
[data_collection, data_cleaning, data_analysis]  # Use Sequential instead

2. Balance Agent Count

Optimize for performance vs. cost:

# Good: 3-6 agents for most tasks
workflow = ConcurrentWorkflow(agents=[agent1, agent2, agent3, agent4])

# Consider cost: 10+ agents running simultaneously

3. Design Good Aggregation

The aggregator is critical:

# Good: Clear aggregation strategy
aggregator = Agent(
    system_prompt="""Synthesize outputs into:
    1. Executive summary
    2. Key findings from each agent
    3. Recommendations"""
)

# Bad: Vague aggregation
aggregator = Agent(system_prompt="Combine the outputs")

4. Handle Timeouts

Set appropriate timeouts:

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    timeout=300,  # 5 minutes max
)

Performance Considerations

Speed Improvement

Concurrent execution dramatically reduces latency:

Sequential time = Agent1 + Agent2 + Agent3
Concurrent time = max(Agent1, Agent2, Agent3) + Aggregation

Resource Usage

More resource-intensive than sequential:

# Peak memory/API usage = number of agents × per-agent usage
# Monitor costs with many agents

Optimization Tips

  1. Agent balance: Make agents similar in complexity
  2. Caching: Use cached LLM for duplicate calls
  3. Rate limits: Respect API rate limits
  4. Batching: Group similar tasks together

Comparison with Other Patterns

FeatureConcurrentSequentialMixture of Agents
SpeedFastSlowMedium
DependenciesNoneStrongWeak
Resource useHighLowHigh
AggregationRequiredNoneBuilt-in

Debugging

Monitor parallel execution:

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    verbose=True,  # Log parallel execution
)

result = workflow.run("Task")

# Check individual outputs before aggregation
for i, output in enumerate(result["agent_outputs"]):
    print(f"Agent {i}: {output}")

# Check aggregated result
print(f"Final: {result['aggregated']}")

Common Patterns

Map-Reduce

Divide work, then combine:

# Map: Parallel processing
mappers = [Agent(...) for _ in range(10)]

# Reduce: Aggregation
reducer = Agent(system_prompt="Combine all results")

workflow = ConcurrentWorkflow(agents=mappers, aggregator_agent=reducer)

Ensemble Methods

Multiple approaches, best result:

method1 = Agent(system_prompt="Approach 1...")
method2 = Agent(system_prompt="Approach 2...")
method3 = Agent(system_prompt="Approach 3...")

selector = Agent(system_prompt="Select the best solution")

workflow = ConcurrentWorkflow(
    agents=[method1, method2, method3],
    aggregator_agent=selector,
)
Edit this page on GitHub
AzrienLabs logo

AzrienLabs

Craftedby Team AzrienLabs