Execute multiple agents in parallel and aggregate their results. Ideal for independent tasks that can run simultaneously.
Overview
The concurrent workflow runs agents in parallel, reducing overall execution time. Each agent works independently on the same input task, and their outputs are combined at the end.
When to Use
- Independent tasks: Agents don't need each other's outputs
- Speed critical: Minimize total execution time
- Diverse perspectives: Gather multiple viewpoints simultaneously
- Parallel processing: Divide work among specialized agents
Basic Usage
from azcore import Agent, ConcurrentWorkflow

# "llm" is assumed to be a language model instance you have already configured

# Create specialized agents
researcher1 = Agent(
    agent_name="Market Researcher",
    system_prompt="Research market trends and competitors",
    llm=llm,
)

researcher2 = Agent(
    agent_name="Technical Researcher",
    system_prompt="Research technical requirements and feasibility",
    llm=llm,
)

researcher3 = Agent(
    agent_name="User Researcher",
    system_prompt="Research user needs and preferences",
    llm=llm,
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    agents=[researcher1, researcher2, researcher3],
    aggregator_agent=None,  # Simple concatenation
)

# Run in parallel
result = workflow.run("Research opportunities in AI education")
print(result)
Configuration Options
aggregator_agent
Combine outputs with a specialized agent:
# Create aggregator
aggregator = Agent(
    agent_name="Synthesizer",
    system_prompt="Synthesize all research into a coherent report",
    llm=llm,
)

workflow = ConcurrentWorkflow(
    agents=[researcher1, researcher2, researcher3],
    aggregator_agent=aggregator,  # Intelligent aggregation
)
aggregator_system_prompt
Quick aggregation without creating a separate agent:
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_system_prompt="Combine all outputs into a summary",
    aggregator_model=llm,
)
max_loops
Run multiple concurrent rounds:
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    max_loops=3,  # Run 3 parallel rounds
)
Advanced Examples
Multi-Perspective Analysis
from azcore import Agent, ConcurrentWorkflow

# Different analytical perspectives
financial_analyst = Agent(
    agent_name="Financial",
    system_prompt="Analyze from financial perspective: costs, ROI, revenue",
    llm=llm,
    tools=[financial_tools],
)

technical_analyst = Agent(
    agent_name="Technical",
    system_prompt="Analyze technical feasibility and implementation",
    llm=llm,
    tools=[code_analysis_tools],
)

market_analyst = Agent(
    agent_name="Market",
    system_prompt="Analyze market demand and competition",
    llm=llm,
    tools=[market_research_tools],
)

risk_analyst = Agent(
    agent_name="Risk",
    system_prompt="Identify and assess risks",
    llm=llm,
)

# Synthesizer
synthesizer = Agent(
    agent_name="Synthesizer",
    system_prompt="""Combine all analyses into a comprehensive report with:
    - Executive summary
    - Key findings from each perspective
    - Recommendations
    - Action items""",
    llm=llm,
)

# Create workflow
analysis_workflow = ConcurrentWorkflow(
    agents=[financial_analyst, technical_analyst, market_analyst, risk_analyst],
    aggregator_agent=synthesizer,
)

# Analyze business opportunity
report = analysis_workflow.run("Should we build an AI-powered CRM system?")
Parallel Content Generation
# Multiple content formats simultaneously
blog_writer = Agent(
    agent_name="Blog Writer",
    system_prompt="Write engaging blog post (800-1000 words)",
    llm=llm,
)

social_media = Agent(
    agent_name="Social Media",
    system_prompt="Create social media posts (Twitter, LinkedIn, Instagram)",
    llm=llm,
)

email_writer = Agent(
    agent_name="Email Writer",
    system_prompt="Write email newsletter",
    llm=llm,
)

video_script = Agent(
    agent_name="Video Script",
    system_prompt="Create video script with timestamps",
    llm=llm,
)

# Content coordinator
coordinator = Agent(
    agent_name="Coordinator",
    system_prompt="Organize all content formats and ensure consistent messaging",
    llm=llm,
)

# Create content suite
content_workflow = ConcurrentWorkflow(
    agents=[blog_writer, social_media, email_writer, video_script],
    aggregator_agent=coordinator,
)

# Generate all content at once
content_suite = content_workflow.run(
    "Create content about our new AI product launch"
)
Parallel Code Review
# Different review aspects
security_reviewer = Agent(
    agent_name="Security",
    system_prompt="Review code for security vulnerabilities",
    llm=llm,
    tools=[security_scanner],
)

performance_reviewer = Agent(
    agent_name="Performance",
    system_prompt="Identify performance issues and optimizations",
    llm=llm,
    tools=[profiler_tool],
)

style_reviewer = Agent(
    agent_name="Style",
    system_prompt="Check code style, readability, and best practices",
    llm=llm,
    tools=[linter_tool],
)

architecture_reviewer = Agent(
    agent_name="Architecture",
    system_prompt="Review system design and architecture decisions",
    llm=llm,
)

# Review coordinator
coordinator = Agent(
    agent_name="Lead Reviewer",
    system_prompt="""Combine all reviews into:
    - Critical issues (must fix)
    - Important suggestions
    - Nice-to-have improvements
    - Approval/rejection decision""",
    llm=llm,
)

# Create review workflow
review_workflow = ConcurrentWorkflow(
    agents=[security_reviewer, performance_reviewer, style_reviewer, architecture_reviewer],
    aggregator_agent=coordinator,
)

# Review pull request
review = review_workflow.run("Review PR #123: New authentication system")
Parallel Research and Fact-Checking
# Multiple sources simultaneously
academic_researcher = Agent(
    agent_name="Academic",
    system_prompt="Research academic papers and studies",
    llm=llm,
    tools=[arxiv_tool, scholar_tool],
)

news_researcher = Agent(
    agent_name="News",
    system_prompt="Research recent news and current events",
    llm=llm,
    tools=[news_api],
)

industry_researcher = Agent(
    agent_name="Industry",
    system_prompt="Research industry reports and trends",
    llm=llm,
    tools=[market_research_tool],
)

fact_checker = Agent(
    agent_name="Fact Checker",
    system_prompt="Verify claims and check facts",
    llm=llm,
    tools=[fact_check_tool],
)

# Research synthesizer
synthesizer = Agent(
    agent_name="Research Lead",
    system_prompt="""Create comprehensive research report:
    - Key findings
    - Sources and citations
    - Verified facts
    - Confidence levels""",
    llm=llm,
)

# Create research workflow
research_workflow = ConcurrentWorkflow(
    agents=[academic_researcher, news_researcher, industry_researcher, fact_checker],
    aggregator_agent=synthesizer,
)

# Conduct research
report = research_workflow.run("Research the impact of AI on job markets")
Aggregation Strategies
1. Simple Concatenation
No aggregator; the outputs are simply concatenated:
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_agent=None,
)
2. Prompt-Based Aggregation
Quick aggregation with a prompt:
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_system_prompt="Summarize all outputs concisely",
    aggregator_model=llm,
)
3. Specialized Aggregator Agent
Full control with a dedicated agent:
aggregator = Agent(
    agent_name="Synthesizer",
    system_prompt="Complex aggregation logic...",
    llm=llm,
    tools=[synthesis_tools],
)

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    aggregator_agent=aggregator,
)
4. Voting/Consensus
Implement custom voting logic:
class VotingConcurrentWorkflow(ConcurrentWorkflow):
    def aggregate_outputs(self, outputs):
        # Custom voting logic: extract_votes and calculate_consensus are
        # user-defined helpers you implement for your own vote format
        votes = self.extract_votes(outputs)
        consensus = self.calculate_consensus(votes)
        return consensus
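As a minimal sketch of what such helpers might do, assume each agent is prompted to end its answer with a line of the form VOTE: &lt;option&gt; (a convention you define yourself; not part of azcore):

from collections import Counter

class MajorityVoteWorkflow(ConcurrentWorkflow):
    def aggregate_outputs(self, outputs):
        # Assumed vote format: the last line of each output looks like "VOTE: <option>"
        votes = [out.strip().splitlines()[-1].split("VOTE:", 1)[-1].strip()
                 for out in outputs]
        # The most common option wins; ties resolve to the option seen first
        consensus, _ = Counter(votes).most_common(1)[0]
        return consensus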
State Management
Agents run independently with shared input:
# All agents receive the same initial task
# Outputs are collected: [agent1_output, agent2_output, agent3_output]
# Aggregator sees all outputs
Access state for custom processing:
class CustomConcurrentWorkflow(ConcurrentWorkflow):
    def pre_execution_hook(self, state):
        # Modify state before parallel execution
        return state

    def post_execution_hook(self, outputs, state):
        # Process outputs before aggregation
        return outputs, state
Error Handling
Handle partial failures:
from azcore import ConcurrentWorkflow

try:
    result = workflow.run("Task")
except Exception as e:
    print(f"Workflow failed: {e}")
    # Some agents may have succeeded
    # Implement partial result recovery
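One way to recover partial results is to fall back to running the agents one at a time. This is only a sketch: it assumes each Agent exposes a run(task) method, which you should check against the actual azcore Agent API.

def run_with_fallback(agents, task):
    # Collect whatever each agent can produce, skipping individual failures
    partial = {}
    for agent in agents:
        try:
            partial[agent.agent_name] = agent.run(task)  # assumed Agent.run() interface
        except Exception as e:
            print(f"{agent.agent_name} failed: {e}")
    return partial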
Graceful degradation:
import logging
logger = logging.getLogger(__name__)

class RobustConcurrentWorkflow(ConcurrentWorkflow):
    # Assumes the base class exposes a per-agent failure hook to override;
    # check the azcore source for the exact hook name and signature
    def handle_agent_failure(self, agent, error):
        # Log the error but continue with the other agents
        logger.error(f"Agent {agent.agent_name} failed: {error}")
        return None  # Continue without this agent's output
Best Practices
1. Ensure Independence
Agents should not depend on each other:
# Good: Independent tasks
[market_research, technical_analysis, user_research]
# Bad: Dependent tasks
[data_collection, data_cleaning, data_analysis] # Use Sequential instead
2. Balance Agent Count
Optimize for performance vs. cost:
# Good: 3-6 agents for most tasks
workflow = ConcurrentWorkflow(agents=[agent1, agent2, agent3, agent4])
# Consider cost: 10+ agents running simultaneously
3. Design Good Aggregation
The aggregator is critical:
# Good: Clear aggregation strategy
aggregator = Agent(
    system_prompt="""Synthesize outputs into:
    1. Executive summary
    2. Key findings from each agent
    3. Recommendations"""
)

# Bad: Vague aggregation
aggregator = Agent(system_prompt="Combine the outputs")
4. Handle Timeouts
Set appropriate timeouts:
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    timeout=300,  # 5 minutes max
)
Performance Considerations
Speed Improvement
Concurrent execution reduces total wall-clock time from the sum of all agent runtimes to roughly the slowest agent plus aggregation:
Sequential time = Agent1 + Agent2 + Agent3
Concurrent time = max(Agent1, Agent2, Agent3) + Aggregation
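A rough worked example with made-up runtimes (agents taking 30 s, 45 s, and 60 s, plus 10 s of aggregation):
Sequential time  = 30 + 45 + 60          = 135 s
Concurrent time  = max(30, 45, 60) + 10  =  70 s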
Resource Usage
More resource-intensive than sequential:
# Peak memory/API usage = number of agents × per-agent usage
# Monitor costs with many agents
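A rough worked example with hypothetical numbers:
# Example: 4 agents × ~3,000 tokens per call ≈ 12,000 tokens in flight per round
# Total token count matches a sequential run, but the peak request rate is 4× higher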
Optimization Tips
- Agent balance: Make agents similar in complexity
- Caching: Cache repeated LLM calls so duplicate prompts are not re-billed (see the sketch after this list)
- Rate limits: Respect API rate limits
- Batching: Group similar tasks together
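A minimal caching sketch, assuming your llm object can be called with a prompt string and returns text; adapt the wrapper to the actual azcore model interface:

from functools import lru_cache

def make_cached_llm(llm):
    # Wrap a prompt -> completion callable with an in-memory cache
    @lru_cache(maxsize=1024)
    def cached_call(prompt: str) -> str:
        return llm(prompt)  # assumed callable interface
    return cached_call

cached_llm = make_cached_llm(llm)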
Comparison with Other Patterns
| Feature | Concurrent | Sequential | Mixture of Agents |
|---|---|---|---|
| Speed | Fast | Slow | Medium |
| Dependencies | None | Strong | Weak |
| Resource use | High | Low | High |
| Aggregation | Required | None | Built-in |
Debugging
Monitor parallel execution:
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    verbose=True,  # Log parallel execution
)
result = workflow.run("Task")

# Check individual outputs before aggregation
for i, output in enumerate(result["agent_outputs"]):
    print(f"Agent {i}: {output}")

# Check aggregated result
print(f"Final: {result['aggregated']}")
Common Patterns
Map-Reduce
Divide work, then combine:
# Map: Parallel processing
mappers = [Agent(...) for _ in range(10)]
# Reduce: Aggregation
reducer = Agent(system_prompt="Combine all results")
workflow = ConcurrentWorkflow(agents=mappers, aggregator_agent=reducer)
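A slightly more concrete sketch: because every agent in a ConcurrentWorkflow receives the same input task, the mappers are differentiated by their system prompts (the aspects and prompts below are illustrative, not part of azcore):

aspects = ["pricing", "features", "support", "reliability"]
mappers = [
    Agent(
        agent_name=f"{aspect.title()} Mapper",
        system_prompt=f"Extract every point about {aspect} from the input",
        llm=llm,
    )
    for aspect in aspects
]
reducer = Agent(
    agent_name="Reducer",
    system_prompt="Merge all extracted points into one structured summary",
    llm=llm,
)
workflow = ConcurrentWorkflow(agents=mappers, aggregator_agent=reducer)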
Ensemble Methods
Run multiple approaches and select the best result:
method1 = Agent(system_prompt="Approach 1...")
method2 = Agent(system_prompt="Approach 2...")
method3 = Agent(system_prompt="Approach 3...")
selector = Agent(system_prompt="Select the best solution")
workflow = ConcurrentWorkflow(
    agents=[method1, method2, method3],
    aggregator_agent=selector,
)