• Getting Started
  • Core Concepts
  • Reinforcement Learning
  • Model Context Protocol (MCP)
  • Workflow Patterns
  • Advanced Agent Patterns
  • Guides

Reinforcement Learning

Advanced Patterns

Advanced RL techniques and patterns in Azcore.

Advanced techniques for sophisticated RL deployments in production environments.


🤝 Multi-Agent RL

Deploy multiple specialized agents, each learning independently for their domain.

Why Multi-Agent RL?

Different domains require different tool expertise:

# ❌ Single agent trying to handle everything
agent = create_agent(tools=[
    "camera", "alert", "lock",          # Security tools
    "search", "analyze", "summarize",   # Research tools
    "calculate", "forecast", "visualize" # Analytics tools
])
# Result: Agent struggles to learn optimal tools for each domain

# ✅ Specialized agents for each domain
security_agent = create_agent(tools=["camera", "alert", "lock"])
research_agent = create_agent(tools=["search", "analyze", "summarize"])
analytics_agent = create_agent(tools=["calculate", "forecast", "visualize"])
# Result: Each agent becomes expert in its domain

Complete Multi-Agent Setup

from azcore.rl.rl_manager import RLManager
from azcore.agents.agent_factory import AgentFactory
from langchain_core.tools import tool

# Define domain-specific tools
@tool
def camera_tool(location: str) -> str:
    """Check security camera at location."""
    return f"Camera feed from {location}"

@tool
def alert_tool(message: str) -> str:
    """Send security alert."""
    return f"Alert sent: {message}"

@tool
def lock_tool(door: str, action: str) -> str:
    """Lock/unlock door."""
    return f"Door {door} {action}ed"

@tool
def search_tool(query: str) -> str:
    """Search for information."""
    return f"Search results for {query}"

@tool
def analyze_tool(text: str) -> str:
    """Analyze text content."""
    return f"Analysis of {text}"

@tool
def summarize_tool(text: str) -> str:
    """Summarize text."""
    return f"Summary of {text}"

# Create separate RL managers for different domains
security_rl = RLManager(
    tool_names=["camera_tool", "alert_tool", "lock_tool"],
    q_table_path="rl_data/security.pkl",
    exploration_strategy=ExplorationStrategy.EPSILON_DECAY,
    exploration_rate=0.15,
    learning_rate=0.1,
    use_embeddings=True
)

research_rl = RLManager(
    tool_names=["search_tool", "analyze_tool", "summarize_tool"],
    q_table_path="rl_data/research.pkl",
    exploration_strategy=ExplorationStrategy.UCB,
    ucb_c=2.0,
    learning_rate=0.15,
    use_embeddings=True
)

# Create specialized agents
factory = AgentFactory(llm=llm)

security_agent = factory.create_react_agent(
    name="security_agent",
    tools=[camera_tool, alert_tool, lock_tool],
    rl_manager=security_rl,
    system_prompt="You are a security monitoring agent."
)

research_agent = factory.create_react_agent(
    name="research_agent",
    tools=[search_tool, analyze_tool, summarize_tool],
    rl_manager=research_rl,
    system_prompt="You are a research assistant agent."
)

# Route queries to appropriate agent
def route_query(query: str):
    """Route query to appropriate specialized agent."""
    query_lower = query.lower()

    if any(word in query_lower for word in ["security", "camera", "lock", "alert"]):
        return security_agent, security_rl
    elif any(word in query_lower for word in ["research", "search", "analyze"]):
        return research_agent, research_rl
    else:
        # Default to research
        return research_agent, research_rl

# Process queries with specialized agents
queries = [
    "Check camera at front door",
    "Search for recent AI papers",
    "Send alert about suspicious activity",
    "Analyze this document for key insights"
]

for query in queries:
    agent, rl_manager = route_query(query)

    # Agent automatically uses its RL manager
    result = agent.invoke({"messages": [HumanMessage(content=query)]})

    print(f"Query: {query}")
    print(f"Agent: {agent.name}")
    print(f"Result: {result['messages'][-1].content}\n")

Agent Coordination

Coordinate multiple agents for complex tasks:

class AgentCoordinator:
    """Coordinate multiple specialized agents."""

    def __init__(self, agents: dict):
        """
        Args:
            agents: Dict mapping domain -> (agent, rl_manager)
        """
        self.agents = agents
        self.routing_history = []

    def process_complex_query(self, query: str, required_domains: list):
        """
        Process query requiring multiple agents.

        Args:
            query: User query
            required_domains: List of domains to involve
        """
        results = {}

        for domain in required_domains:
            if domain not in self.agents:
                continue

            agent, rl_manager = self.agents[domain]

            # Contextualize query for this agent
            domain_query = f"{query} (Focus on {domain} aspects)"

            # Process with specialized agent
            result = agent.invoke({"messages": [HumanMessage(content=domain_query)]})
            results[domain] = result['messages'][-1].content

            # Track routing
            self.routing_history.append({
                "query": query,
                "domain": domain,
                "timestamp": datetime.now()
            })

        return results

    def get_agent_statistics(self):
        """Get statistics for all agents."""
        stats = {}
        for domain, (agent, rl_manager) in self.agents.items():
            stats[domain] = rl_manager.get_statistics()
        return stats

# Usage
coordinator = AgentCoordinator({
    "security": (security_agent, security_rl),
    "research": (research_agent, research_rl)
})

# Complex query requiring multiple agents
results = coordinator.process_complex_query(
    query="Investigate recent security papers and check camera feeds",
    required_domains=["security", "research"]
)

for domain, result in results.items():
    print(f"{domain.upper()}: {result}\n")

# Monitor agent performance
stats = coordinator.get_agent_statistics()
for domain, stat in stats.items():
    print(f"{domain}: {stat['total_states']} states, "
          f"{stat['exploration_rate']:.2%} exploration")

🔄 Transfer Learning

Transfer knowledge from one task to another to accelerate learning.

Why Transfer Learning?

When tasks share similar tool patterns:

# Scenario: Customer support agent learns tools
# Then: Sales agent can reuse that knowledge

# Task A: Customer support (learns email, ticket, kb_search)
# Task B: Sales (uses email, ticket, crm_search)
# Shared tools: email, ticket
# Transfer Q-values for these tools → faster convergence

Complete Transfer Learning Pipeline

import pickle
from pathlib import Path

def train_source_task():
    """Train on source task to build initial knowledge."""

    # Source task: Customer support
    support_tools = ["email", "ticket", "kb_search", "escalate"]

    support_rl = RLManager(
        tool_names=support_tools,
        q_table_path="rl_data/support.pkl",
        exploration_rate=0.2,
        learning_rate=0.1,
        use_embeddings=True
    )

    # Training data for support
    support_queries = [
        ("How do I reset my password?", ["kb_search", "email"]),
        ("File a bug report", ["ticket"]),
        ("Account locked", ["kb_search", "escalate"]),
        ("Billing question", ["ticket", "email"]),
        ("Technical issue", ["kb_search", "ticket"]),
    ]

    # Train source task
    print("Training source task (Customer Support)...")
    for epoch in range(20):
        for query, correct_tools in support_queries:
            selected, state_key = support_rl.select_tools(query, top_n=2)

            # Reward if correct tools selected
            reward = 1.0 if any(t in correct_tools for t in selected) else -0.5

            for tool in selected:
                support_rl.update(state_key, tool, reward)

        support_rl.anneal_exploration(decay_rate=0.95)

    support_rl.force_persist()
    print(f"Source task trained. States: {len(support_rl.q_table)}")

    return support_rl

def transfer_to_target_task(source_rl):
    """Transfer knowledge to target task."""

    # Target task: Sales
    sales_tools = ["email", "ticket", "crm_search", "quote"]

    sales_rl = RLManager(
        tool_names=sales_tools,
        q_table_path="rl_data/sales.pkl",
        exploration_rate=0.2,
        learning_rate=0.1,
        use_embeddings=True
    )

    # Identify shared tools
    shared_tools = set(source_rl.tool_names) & set(sales_tools)
    print(f"\nShared tools: {shared_tools}")

    # Transfer Q-values for shared tools
    transferred_states = 0
    for state_key in source_rl.q_table.keys():
        for tool in shared_tools:
            if tool in source_rl.q_table[state_key]:
                # Copy Q-value from source
                sales_rl.q_table[state_key][tool] = source_rl.q_table[state_key][tool]

                # Copy visit count (optional, for exploration strategies)
                if state_key in source_rl.visit_counts:
                    sales_rl.visit_counts[state_key][tool] = source_rl.visit_counts[state_key][tool]

                transferred_states += 1

    sales_rl.force_persist()
    print(f"Transferred {transferred_states} state-tool pairs")

    return sales_rl

def train_target_task(sales_rl):
    """Fine-tune on target task."""

    # Training data for sales
    sales_queries = [
        ("Follow up on quote", ["email", "crm_search"]),
        ("Customer wants demo", ["email", "ticket"]),
        ("Generate quote", ["quote", "crm_search"]),
        ("Lead inquiry", ["email", "crm_search"]),
        ("Contract question", ["ticket", "quote"]),
    ]

    print("\nFine-tuning target task (Sales)...")
    for epoch in range(10):  # Fewer epochs needed!
        for query, correct_tools in sales_queries:
            selected, state_key = sales_rl.select_tools(query, top_n=2)

            reward = 1.0 if any(t in correct_tools for t in selected) else -0.5

            for tool in selected:
                sales_rl.update(state_key, tool, reward)

        sales_rl.anneal_exploration(decay_rate=0.95)

    sales_rl.force_persist()
    print(f"Target task fine-tuned. Total states: {len(sales_rl.q_table)}")

    return sales_rl

# Execute transfer learning pipeline
print("=== Transfer Learning Pipeline ===\n")

# Step 1: Train source task
source_rl = train_source_task()

# Step 2: Transfer to target task
target_rl = transfer_to_target_task(source_rl)

# Step 3: Fine-tune target task
target_rl = train_target_task(target_rl)

# Step 4: Compare with baseline (no transfer)
print("\n=== Comparison with Baseline ===")

baseline_rl = RLManager(
    tool_names=["email", "ticket", "crm_search", "quote"],
    q_table_path="rl_data/sales_baseline.pkl",
    exploration_rate=0.2,
    learning_rate=0.1
)

# Train baseline from scratch (same 10 epochs)
sales_queries = [
    ("Follow up on quote", ["email", "crm_search"]),
    ("Customer wants demo", ["email", "ticket"]),
    ("Generate quote", ["quote", "crm_search"]),
    ("Lead inquiry", ["email", "crm_search"]),
    ("Contract question", ["ticket", "quote"]),
]

for epoch in range(10):
    for query, correct_tools in sales_queries:
        selected, state_key = baseline_rl.select_tools(query, top_n=2)
        reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
        for tool in selected:
            baseline_rl.update(state_key, tool, reward)
    baseline_rl.anneal_exploration(decay_rate=0.95)

# Evaluate both
test_queries = [
    ("Send quote to customer", ["email", "quote"]),
    ("Update CRM with lead", ["crm_search"]),
]

print("\nTest Performance:")
for query, correct_tools in test_queries:
    transfer_selected, _ = target_rl.select_tools(query, top_n=2)
    baseline_selected, _ = baseline_rl.select_tools(query, top_n=2)

    transfer_correct = any(t in correct_tools for t in transfer_selected)
    baseline_correct = any(t in correct_tools for t in baseline_selected)

    print(f"\nQuery: {query}")
    print(f"  Transfer Learning: {transfer_selected} - {'✓' if transfer_correct else '✗'}")
    print(f"  Baseline: {baseline_selected} - {'✓' if baseline_correct else '✗'}")

Selective Transfer

Transfer only high-quality knowledge:

def selective_transfer(source_rl, target_rl, min_visits=5, min_q_value=0.3):
    """
    Transfer only well-learned state-tool pairs.

    Args:
        source_rl: Source RL manager
        target_rl: Target RL manager
        min_visits: Minimum visits for a state-tool pair
        min_q_value: Minimum Q-value to transfer
    """
    shared_tools = set(source_rl.tool_names) & set(target_rl.tool_names)
    transferred = 0

    for state_key in source_rl.q_table.keys():
        for tool in shared_tools:
            q_value = source_rl.q_table[state_key].get(tool, 0)
            visits = source_rl.visit_counts[state_key].get(tool, 0)

            # Only transfer if well-learned
            if visits >= min_visits and q_value >= min_q_value:
                target_rl.q_table[state_key][tool] = q_value
                target_rl.visit_counts[state_key][tool] = visits
                transferred += 1

    target_rl.force_persist()
    print(f"Selectively transferred {transferred} high-quality pairs")
    return transferred

# Usage
transferred = selective_transfer(
    source_rl,
    target_rl,
    min_visits=10,
    min_q_value=0.5
)

📚 Curriculum Learning

Train progressively from simple to complex queries.

Why Curriculum Learning?

Learning is more effective when starting with basics:

# ❌ Random training (all difficulties mixed)
queries = shuffle([easy, medium, hard, expert])
# Result: Agent confused, slow convergence

# ✅ Curriculum (progressive difficulty)
Stage 1: easy queries (build foundation)
Stage 2: medium queries (add complexity)
Stage 3: hard queries (handle edge cases)
Stage 4: expert queries (master domain)
# Result: Faster convergence, better performance

Complete Curriculum Pipeline

from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class CurriculumStage:
    """Define a curriculum stage."""
    name: str
    queries: List[Tuple[str, List[str]]]  # (query, correct_tools)
    epochs: int
    difficulty: str

def create_curriculum():
    """Define curriculum stages."""

    # Stage 1: Simple single-tool queries
    stage1 = CurriculumStage(
        name="Foundation",
        queries=[
            ("Search for cats", ["search"]),
            ("Calculate 2+2", ["calculate"]),
            ("Get weather", ["weather"]),
            ("Send email", ["email"]),
        ],
        epochs=5,
        difficulty="easy"
    )

    # Stage 2: Two-tool combinations
    stage2 = CurriculumStage(
        name="Combinations",
        queries=[
            ("Search and summarize AI papers", ["search", "summarize"]),
            ("Calculate tax and send email", ["calculate", "email"]),
            ("Get weather and search for activities", ["weather", "search"]),
            ("Analyze data and visualize", ["analyze", "visualize"]),
        ],
        epochs=8,
        difficulty="medium"
    )

    # Stage 3: Complex multi-step queries
    stage3 = CurriculumStage(
        name="Complex Tasks",
        queries=[
            ("Research topic, analyze findings, summarize and email",
             ["search", "analyze", "summarize", "email"]),
            ("Calculate metrics, visualize results, generate report",
             ["calculate", "visualize", "email"]),
            ("Get weather forecast, search for events, email recommendations",
             ["weather", "search", "email"]),
        ],
        epochs=10,
        difficulty="hard"
    )

    # Stage 4: Expert ambiguous queries
    stage4 = CurriculumStage(
        name="Expert",
        queries=[
            ("Help me with my project", ["search", "analyze"]),  # Ambiguous
            ("What should I know?", ["search", "summarize"]),    # Vague
            ("Process this", ["analyze", "visualize"]),          # Underspecified
        ],
        epochs=15,
        difficulty="expert"
    )

    return [stage1, stage2, stage3, stage4]

def train_with_curriculum(rl_manager, curriculum):
    """
    Train with curriculum learning.

    Args:
        rl_manager: RLManager instance
        curriculum: List of CurriculumStage
    """
    print("=== Curriculum Learning Pipeline ===\n")

    all_metrics = {
        "stage_names": [],
        "stage_accuracy": [],
        "stage_avg_q": [],
        "cumulative_states": []
    }

    for stage_idx, stage in enumerate(curriculum, 1):
        print(f"Stage {stage_idx}/{len(curriculum)}: {stage.name} ({stage.difficulty})")
        print(f"Queries: {len(stage.queries)}, Epochs: {stage.epochs}")

        stage_correct = 0
        stage_total = 0

        for epoch in range(stage.epochs):
            epoch_correct = 0

            for query, correct_tools in stage.queries:
                selected, state_key = rl_manager.select_tools(query, top_n=2)

                # Calculate reward
                correct = any(tool in correct_tools for tool in selected)
                reward = 1.0 if correct else -0.5

                if correct:
                    epoch_correct += 1
                    stage_correct += 1

                stage_total += 1

                # Update Q-values
                for tool in selected:
                    rl_manager.update(state_key, tool, reward)

            # Decay exploration after each epoch
            rl_manager.anneal_exploration(decay_rate=0.95)

            # Print epoch progress
            accuracy = epoch_correct / len(stage.queries)
            print(f"  Epoch {epoch+1}/{stage.epochs}: "
                  f"Accuracy={accuracy:.1%}, "
                  f"Exploration={rl_manager.exploration_rate:.2%}")

        # Stage metrics
        stage_accuracy = stage_correct / stage_total
        stats = rl_manager.get_statistics()

        all_metrics["stage_names"].append(stage.name)
        all_metrics["stage_accuracy"].append(stage_accuracy)
        all_metrics["stage_avg_q"].append(stats['avg_q_value'])
        all_metrics["cumulative_states"].append(stats['total_states'])

        print(f"Stage Complete: Accuracy={stage_accuracy:.1%}, "
              f"Avg Q={stats['avg_q_value']:.3f}, "
              f"States={stats['total_states']}\n")

    rl_manager.force_persist()
    return all_metrics

# Execute curriculum learning
rl_manager = RLManager(
    tool_names=["search", "calculate", "weather", "email",
                "summarize", "analyze", "visualize"],
    q_table_path="rl_data/curriculum.pkl",
    exploration_strategy=ExplorationStrategy.EPSILON_DECAY,
    exploration_rate=0.3,  # Start high for early stages
    epsilon_decay_rate=0.95,
    min_exploration_rate=0.01,
    learning_rate=0.1,
    use_embeddings=True
)

curriculum = create_curriculum()
metrics = train_with_curriculum(rl_manager, curriculum)

# Visualize curriculum progression
import matplotlib.pyplot as plt

fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# Accuracy per stage
axes[0, 0].bar(metrics["stage_names"], metrics["stage_accuracy"])
axes[0, 0].set_ylabel("Accuracy")
axes[0, 0].set_title("Accuracy by Curriculum Stage")
axes[0, 0].set_ylim([0, 1])

# Average Q-value per stage
axes[0, 1].plot(metrics["stage_names"], metrics["stage_avg_q"], marker='o')
axes[0, 1].set_ylabel("Average Q-Value")
axes[0, 1].set_title("Q-Value Growth Across Stages")

# Cumulative states
axes[1, 0].plot(metrics["stage_names"], metrics["cumulative_states"], marker='s')
axes[1, 0].set_ylabel("Total States")
axes[1, 0].set_title("State Space Growth")

# Difficulty progression
difficulties = ["easy", "medium", "hard", "expert"]
axes[1, 1].bar(metrics["stage_names"], range(1, len(metrics["stage_names"])+1))
axes[1, 1].set_ylabel("Difficulty Level")
axes[1, 1].set_title("Curriculum Difficulty Progression")

plt.tight_layout()
plt.savefig("curriculum_progression.png")
print("Curriculum visualization saved to curriculum_progression.png")

Adaptive Curriculum

Automatically adjust difficulty based on performance:

class AdaptiveCurriculum:
    """Adaptive curriculum that adjusts based on performance."""

    def __init__(self, rl_manager, stages, mastery_threshold=0.8):
        """
        Args:
            rl_manager: RLManager instance
            stages: List of CurriculumStage
            mastery_threshold: Accuracy needed to advance
        """
        self.rl_manager = rl_manager
        self.stages = stages
        self.mastery_threshold = mastery_threshold
        self.current_stage = 0

    def train(self):
        """Train with adaptive curriculum."""
        while self.current_stage < len(self.stages):
            stage = self.stages[self.current_stage]
            print(f"\n=== Stage {self.current_stage + 1}: {stage.name} ===")

            # Train on current stage
            accuracy = self._train_stage(stage)

            # Check mastery
            if accuracy >= self.mastery_threshold:
                print(f"✓ Mastered {stage.name} (accuracy: {accuracy:.1%})")
                self.current_stage += 1
            else:
                print(f"✗ Need more practice on {stage.name} (accuracy: {accuracy:.1%})")
                # Repeat stage with more epochs
                stage.epochs += 3

    def _train_stage(self, stage):
        """Train on a single stage."""
        correct = 0
        total = 0

        for epoch in range(stage.epochs):
            for query, correct_tools in stage.queries:
                selected, state_key = self.rl_manager.select_tools(query, top_n=2)

                is_correct = any(tool in correct_tools for tool in selected)
                reward = 1.0 if is_correct else -0.5

                if is_correct:
                    correct += 1
                total += 1

                for tool in selected:
                    self.rl_manager.update(state_key, tool, reward)

            self.rl_manager.anneal_exploration(decay_rate=0.95)

        return correct / total if total > 0 else 0

# Usage
rl_manager = RLManager(
    tool_names=["search", "calculate", "weather", "email"],
    q_table_path="rl_data/adaptive_curriculum.pkl",
    exploration_rate=0.3,
    learning_rate=0.1
)

curriculum = create_curriculum()
adaptive = AdaptiveCurriculum(rl_manager, curriculum, mastery_threshold=0.85)
adaptive.train()

🎯 Multi-Objective RL

Optimize multiple objectives simultaneously (accuracy, speed, cost).

Why Multi-Objective RL?

Real-world scenarios require balancing trade-offs:

# Single objective: Maximize accuracy
# Problem: May select slow, expensive tools

# Multi-objective: Balance accuracy, speed, cost
# Solution: Find optimal trade-off

Complete Multi-Objective Implementation

from azcore.rl.rewards import RewardCalculator
import time

class MultiObjectiveRewardCalculator(RewardCalculator):
    """
    Reward calculator that balances multiple objectives.

    Objectives:
    1. Accuracy: Did the tool provide correct/useful output?
    2. Speed: How fast was the tool?
    3. Cost: How expensive is the tool?
    """

    def __init__(
        self,
        accuracy_weight: float = 0.6,
        speed_weight: float = 0.3,
        cost_weight: float = 0.1,
        tool_costs: dict = None
    ):
        """
        Args:
            accuracy_weight: Weight for accuracy (0-1)
            speed_weight: Weight for speed (0-1)
            cost_weight: Weight for cost (0-1)
            tool_costs: Dict mapping tool name -> cost (0-1)
        """
        self.accuracy_weight = accuracy_weight
        self.speed_weight = speed_weight
        self.cost_weight = cost_weight

        # Normalize weights
        total = accuracy_weight + speed_weight + cost_weight
        self.accuracy_weight /= total
        self.speed_weight /= total
        self.cost_weight /= total

        # Default costs if not provided
        self.tool_costs = tool_costs or {}

    def calculate(
        self,
        state,
        result,
        query: str,
        execution_time: float = None,
        selected_tools: list = None,
        **kwargs
    ) -> float:
        """
        Calculate multi-objective reward.

        Args:
            state: Agent state
            result: Execution result
            query: Original query
            execution_time: Time taken (seconds)
            selected_tools: List of tools used
        """
        # 1. Accuracy score
        accuracy_score = self._calculate_accuracy(result)

        # 2. Speed score (inverse of time)
        if execution_time:
            # Normalize: < 1s = 1.0, > 10s = 0.0
            speed_score = max(0, 1.0 - (execution_time / 10.0))
        else:
            speed_score = 0.5  # Neutral if unknown

        # 3. Cost score (inverse of cost)
        if selected_tools:
            avg_cost = sum(self.tool_costs.get(t, 0.5) for t in selected_tools) / len(selected_tools)
            cost_score = 1.0 - avg_cost
        else:
            cost_score = 0.5  # Neutral if unknown

        # Weighted combination
        total_reward = (
            self.accuracy_weight * accuracy_score +
            self.speed_weight * speed_score +
            self.cost_weight * cost_score
        )

        # Scale to [-1, 1]
        return 2 * total_reward - 1

    def _calculate_accuracy(self, result) -> float:
        """
        Calculate accuracy score from result.

        Returns: Score between 0 and 1
        """
        if not result or 'messages' not in result:
            return 0.0

        last_message = result['messages'][-1].content.lower()

        # Check for success indicators
        success_indicators = ["success", "complete", "done", "found", "calculated"]
        failure_indicators = ["error", "failed", "unable", "cannot", "sorry"]

        if any(ind in last_message for ind in success_indicators):
            return 1.0
        elif any(ind in last_message for ind in failure_indicators):
            return 0.0
        else:
            return 0.5  # Neutral

# Define tool costs
tool_costs = {
    "search": 0.7,      # Expensive (API calls)
    "calculate": 0.1,   # Cheap (local)
    "weather": 0.5,     # Medium (API call)
    "email": 0.3,       # Low (simple operation)
    "analyze": 0.8,     # Expensive (LLM call)
    "visualize": 0.4,   # Medium (computation)
}

# Create multi-objective RL system
reward_calculator = MultiObjectiveRewardCalculator(
    accuracy_weight=0.6,
    speed_weight=0.3,
    cost_weight=0.1,
    tool_costs=tool_costs
)

rl_manager = RLManager(
    tool_names=list(tool_costs.keys()),
    q_table_path="rl_data/multi_objective.pkl",
    exploration_rate=0.15,
    learning_rate=0.1,
    use_embeddings=True
)

# Train with multi-objective rewards
training_data = [
    ("Search for Python tutorials", ["search"], 0.9),   # Accurate but slow/expensive
    ("Calculate 15% of 200", ["calculate"], 1.0),       # Fast, cheap, accurate
    ("Get weather forecast", ["weather"], 0.8),         # Medium on all
    ("Send status email", ["email"], 0.95),             # Fast, cheap
    ("Analyze sales data", ["analyze"], 0.85),          # Slow, expensive
]

print("=== Multi-Objective Training ===\n")

for epoch in range(20):
    epoch_rewards = []

    for query, correct_tools, expected_accuracy in training_data:
        start_time = time.time()

        # Select tools
        selected, state_key = rl_manager.select_tools(query, top_n=1)

        # Simulate execution
        execution_time = time.time() - start_time + (0.1 if selected[0] == "calculate" else 0.5)

        # Mock result
        result = {
            'messages': [HumanMessage(content="Success" if selected[0] in correct_tools else "Unable to complete")]
        }

        # Calculate multi-objective reward
        reward = reward_calculator.calculate(
            state=None,
            result=result,
            query=query,
            execution_time=execution_time,
            selected_tools=selected
        )

        epoch_rewards.append(reward)

        # Update Q-values
        for tool in selected:
            rl_manager.update(state_key, tool, reward)

    rl_manager.anneal_exploration(decay_rate=0.95)

    avg_reward = sum(epoch_rewards) / len(epoch_rewards)
    print(f"Epoch {epoch+1}: Avg Reward = {avg_reward:.3f}")

# Analyze learned preferences
print("\n=== Learned Tool Preferences ===")
top_tools = rl_manager.get_top_performing_tools(top_n=len(tool_costs))

for tool, avg_q in top_tools:
    cost = tool_costs[tool]
    print(f"{tool:12s}: Q={avg_q:5.3f}, Cost={cost:.1f} "
          f"({'Preferred' if avg_q > 0.5 else 'Avoided'})")

Pareto Optimization

Find Pareto-optimal solutions:

class ParetoOptimizer:
    """Find Pareto-optimal tool selections."""

    def __init__(self, rl_manager, objectives):
        """
        Args:
            rl_manager: RLManager instance
            objectives: Dict of objective_name -> callable(tool) -> score
        """
        self.rl_manager = rl_manager
        self.objectives = objectives

    def find_pareto_front(self, query: str):
        """
        Find Pareto-optimal tool selections for query.

        Returns: List of (tools, objective_scores) on Pareto front
        """
        # Generate all possible tool combinations
        from itertools import combinations

        candidates = []
        for r in range(1, len(self.rl_manager.tool_names) + 1):
            for combo in combinations(self.rl_manager.tool_names, r):
                scores = {
                    name: obj(combo)
                    for name, obj in self.objectives.items()
                }
                candidates.append((list(combo), scores))

        # Find Pareto front
        pareto_front = []
        for candidate in candidates:
            is_dominated = False
            for other in candidates:
                if self._dominates(other[1], candidate[1]):
                    is_dominated = True
                    break

            if not is_dominated:
                pareto_front.append(candidate)

        return pareto_front

    def _dominates(self, scores1, scores2):
        """Check if scores1 dominates scores2."""
        better_in_all = all(scores1[k] >= scores2[k] for k in scores1.keys())
        better_in_some = any(scores1[k] > scores2[k] for k in scores1.keys())
        return better_in_all and better_in_some

# Define objectives
objectives = {
    "accuracy": lambda tools: sum(0.9 if t in ["search", "analyze"] else 0.5 for t in tools) / len(tools),
    "speed": lambda tools: sum(0.9 if t == "calculate" else 0.3 for t in tools) / len(tools),
    "cost": lambda tools: 1.0 - (sum(tool_costs[t] for t in tools) / len(tools))
}

optimizer = ParetoOptimizer(rl_manager, objectives)
pareto_front = optimizer.find_pareto_front("Analyze data")

print("\n=== Pareto-Optimal Solutions ===")
for tools, scores in pareto_front[:5]:  # Top 5
    print(f"Tools: {tools}")
    print(f"  Accuracy: {scores['accuracy']:.2f}, "
          f"Speed: {scores['speed']:.2f}, "
          f"Cost: {scores['cost']:.2f}\n")

🏗️ Hierarchical RL

Structure RL into high-level and low-level policies.

Why Hierarchical RL?

Complex tasks benefit from hierarchical decomposition:

# High-level policy: Choose sub-goal
# Low-level policy: Execute tools for sub-goal

# Example: "Research and report on AI"
# High-level: [Research] → [Analyze] → [Report]
# Low-level Research: [search, summarize]
# Low-level Analyze: [analyze, visualize]
# Low-level Report: [email]

Complete Hierarchical Implementation

class HierarchicalRLSystem:
    """Hierarchical RL with high-level and low-level policies."""

    def __init__(self, tool_hierarchy: dict):
        """
        Args:
            tool_hierarchy: Dict mapping high-level action -> low-level tools
        """
        self.tool_hierarchy = tool_hierarchy

        # High-level RL: Choose sub-goals
        self.high_level_rl = RLManager(
            tool_names=list(tool_hierarchy.keys()),
            q_table_path="rl_data/high_level.pkl",
            exploration_rate=0.15,
            learning_rate=0.1
        )

        # Low-level RL managers: Choose tools for each sub-goal
        self.low_level_rls = {}
        for subgoal, tools in tool_hierarchy.items():
            self.low_level_rls[subgoal] = RLManager(
                tool_names=tools,
                q_table_path=f"rl_data/low_level_{subgoal}.pkl",
                exploration_rate=0.2,
                learning_rate=0.1
            )

    def select_hierarchical_tools(self, query: str, max_subgoals: int = 2):
        """
        Select tools using hierarchical policy.

        Returns: (selected_tools, hierarchy_info)
        """
        # High-level: Select sub-goals
        subgoals, hl_state = self.high_level_rl.select_tools(query, top_n=max_subgoals)

        # Low-level: Select tools for each sub-goal
        all_tools = []
        hierarchy_info = {
            "high_level_state": hl_state,
            "subgoals": subgoals,
            "subgoal_tools": {}
        }

        for subgoal in subgoals:
            if subgoal in self.low_level_rls:
                ll_rl = self.low_level_rls[subgoal]
                tools, ll_state = ll_rl.select_tools(query, top_n=2)
                all_tools.extend(tools)
                hierarchy_info["subgoal_tools"][subgoal] = {
                    "tools": tools,
                    "state": ll_state
                }

        return all_tools, hierarchy_info

    def update_hierarchical(self, hierarchy_info, reward: float):
        """Update both high-level and low-level policies."""
        # Update high-level
        hl_state = hierarchy_info["high_level_state"]
        for subgoal in hierarchy_info["subgoals"]:
            self.high_level_rl.update(hl_state, subgoal, reward)

        # Update low-level
        for subgoal, info in hierarchy_info["subgoal_tools"].items():
            ll_state = info["state"]
            for tool in info["tools"]:
                self.low_level_rls[subgoal].update(ll_state, tool, reward)

# Define tool hierarchy
tool_hierarchy = {
    "research": ["search", "summarize"],
    "analyze": ["analyze", "visualize"],
    "communicate": ["email", "report"],
    "calculate": ["calculate", "forecast"]
}

hierarchical_system = HierarchicalRLSystem(tool_hierarchy)

# Train hierarchical system
training_queries = [
    ("Research AI trends and email summary", ["research", "communicate"], 1.0),
    ("Analyze sales data and visualize", ["analyze"], 0.9),
    ("Calculate revenue forecast", ["calculate"], 0.95),
    ("Research competitors and analyze", ["research", "analyze"], 0.85),
]

print("=== Hierarchical RL Training ===\n")

for epoch in range(15):
    for query, correct_subgoals, expected_quality in training_queries:
        # Select tools hierarchically
        tools, hierarchy_info = hierarchical_system.select_hierarchical_tools(query)

        # Evaluate
        selected_subgoals = hierarchy_info["subgoals"]
        is_correct = any(sg in correct_subgoals for sg in selected_subgoals)
        reward = expected_quality if is_correct else -0.5

        # Update hierarchical policies
        hierarchical_system.update_hierarchical(hierarchy_info, reward)

        print(f"Query: {query}")
        print(f"  Subgoals: {selected_subgoals}")
        print(f"  Tools: {tools}")
        print(f"  Reward: {reward:.2f}\n")

    # Anneal exploration
    hierarchical_system.high_level_rl.anneal_exploration(decay_rate=0.95)
    for ll_rl in hierarchical_system.low_level_rls.values():
        ll_rl.anneal_exploration(decay_rate=0.95)

print("Hierarchical training complete!")

🔬 Meta-Learning

Learn how to learn efficiently across tasks.

Why Meta-Learning?

Quickly adapt to new tasks using meta-knowledge:

# Traditional: Train from scratch for each new task
# Meta-learning: Learn initialization that adapts quickly

Complete Meta-Learning Implementation

class MetaLearningRLManager:
    """Meta-learning for RL that learns good initializations."""

    def __init__(self, tool_names, meta_lr=0.01):
        """
        Args:
            tool_names: List of tool names
            meta_lr: Meta-learning rate
        """
        self.tool_names = tool_names
        self.meta_lr = meta_lr

        # Meta-parameters (learned across tasks)
        self.meta_q_init = {tool: 0.0 for tool in tool_names}
        self.task_history = []

    def create_task_rl(self, task_name: str):
        """Create RL manager for specific task with meta-init."""
        rl_manager = RLManager(
            tool_names=self.tool_names,
            q_table_path=f"rl_data/meta_{task_name}.pkl",
            exploration_rate=0.2,
            learning_rate=0.1
        )

        # Initialize with meta-learned values
        for state in rl_manager.q_table.keys():
            for tool in self.tool_names:
                rl_manager.q_table[state][tool] = self.meta_q_init[tool]

        return rl_manager

    def meta_update(self, task_performances: dict):
        """
        Update meta-parameters based on task performances.

        Args:
            task_performances: Dict of task_name -> (rl_manager, final_reward)
        """
        # Aggregate Q-values from all tasks
        tool_q_aggregates = {tool: [] for tool in self.tool_names}

        for task_name, (rl_manager, reward) in task_performances.items():
            # Weight by task performance
            weight = max(0, reward)

            for state in rl_manager.q_table.keys():
                for tool in self.tool_names:
                    q_value = rl_manager.q_table[state].get(tool, 0)
                    tool_q_aggregates[tool].append(q_value * weight)

        # Update meta-initialization
        for tool in self.tool_names:
            if tool_q_aggregates[tool]:
                avg_q = sum(tool_q_aggregates[tool]) / len(tool_q_aggregates[tool])
                self.meta_q_init[tool] += self.meta_lr * (avg_q - self.meta_q_init[tool])

        self.task_history.append(task_performances)

        print("Meta-parameters updated:")
        for tool, q in self.meta_q_init.items():
            print(f"  {tool}: {q:.3f}")

# Meta-learning training
meta_learner = MetaLearningRLManager(
    tool_names=["search", "calculate", "email", "analyze"],
    meta_lr=0.1
)

# Simulate multiple tasks
tasks = {
    "customer_support": [
        ("Help with account", ["search", "email"], 1.0),
        ("Calculate refund", ["calculate", "email"], 0.9),
    ],
    "data_analysis": [
        ("Analyze trends", ["analyze"], 1.0),
        ("Calculate metrics", ["calculate", "analyze"], 0.95),
    ],
    "research": [
        ("Search papers", ["search"], 1.0),
        ("Analyze findings", ["search", "analyze"], 0.9),
    ]
}

print("=== Meta-Learning Training ===\n")

for meta_epoch in range(5):
    print(f"\nMeta-Epoch {meta_epoch + 1}")

    task_performances = {}

    # Train on each task
    for task_name, task_data in tasks.items():
        print(f"\n  Training on task: {task_name}")

        # Create task-specific RL with meta-init
        rl_manager = meta_learner.create_task_rl(task_name)

        # Train on task
        task_rewards = []
        for epoch in range(5):  # Few-shot adaptation
            for query, correct_tools, _ in task_data:
                selected, state_key = rl_manager.select_tools(query, top_n=2)
                reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
                task_rewards.append(reward)

                for tool in selected:
                    rl_manager.update(state_key, tool, reward)

        final_reward = sum(task_rewards) / len(task_rewards)
        task_performances[task_name] = (rl_manager, final_reward)
        print(f"    Final reward: {final_reward:.3f}")

    # Meta-update
    meta_learner.meta_update(task_performances)

print("\n=== Meta-Learning Complete ===")
print("Learned meta-initialization:")
for tool, q in meta_learner.meta_q_init.items():
    print(f"  {tool}: {q:.3f}")

🎲 Ensemble Methods

Combine multiple RL policies for robust decisions.

Why Ensemble Methods?

Reduce variance and improve robustness:

# Single policy: May be unstable or overfitted
# Ensemble: Majority vote or weighted average

Complete Ensemble Implementation

class RLEnsemble:
    """Ensemble of multiple RL managers."""

    def __init__(self, tool_names, n_members=5):
        """
        Args:
            tool_names: List of tool names
            n_members: Number of ensemble members
        """
        self.tool_names = tool_names
        self.members = []

        # Create diverse ensemble members
        for i in range(n_members):
            rl_manager = RLManager(
                tool_names=tool_names,
                q_table_path=f"rl_data/ensemble_member_{i}.pkl",
                exploration_strategy=ExplorationStrategy.EPSILON_DECAY if i % 2 == 0 else ExplorationStrategy.UCB,
                exploration_rate=0.1 + (i * 0.05),  # Varied exploration
                learning_rate=0.05 + (i * 0.02),    # Varied learning rates
                use_embeddings=True
            )
            self.members.append(rl_manager)

    def select_tools_ensemble(self, query: str, top_n: int = 2, method="voting"):
        """
        Select tools using ensemble.

        Args:
            query: User query
            top_n: Number of tools to select
            method: "voting" or "averaging"
        """
        if method == "voting":
            return self._select_by_voting(query, top_n)
        elif method == "averaging":
            return self._select_by_averaging(query, top_n)
        else:
            raise ValueError(f"Unknown method: {method}")

    def _select_by_voting(self, query: str, top_n: int):
        """Majority voting across ensemble members."""
        votes = {tool: 0 for tool in self.tool_names}
        member_selections = []

        # Collect votes from each member
        for member in self.members:
            selected, state_key = member.select_tools(query, top_n=top_n)
            member_selections.append((selected, state_key))

            for tool in selected:
                votes[tool] += 1

        # Select top-voted tools
        sorted_tools = sorted(votes.items(), key=lambda x: x[1], reverse=True)
        ensemble_selected = [tool for tool, _ in sorted_tools[:top_n]]

        return ensemble_selected, member_selections

    def _select_by_averaging(self, query: str, top_n: int):
        """Average Q-values across ensemble members."""
        avg_q_values = {tool: 0.0 for tool in self.tool_names}
        member_selections = []

        # Collect Q-values from each member
        for member in self.members:
            selected, state_key = member.select_tools(query, top_n=len(self.tool_names))
            member_selections.append((selected, state_key))

            # Get Q-values for this state
            if state_key in member.q_table:
                for tool, q_value in member.q_table[state_key].items():
                    avg_q_values[tool] += q_value / len(self.members)

        # Select top Q-value tools
        sorted_tools = sorted(avg_q_values.items(), key=lambda x: x[1], reverse=True)
        ensemble_selected = [tool for tool, _ in sorted_tools[:top_n]]

        return ensemble_selected, member_selections

    def update_ensemble(self, member_selections, reward: float):
        """Update all ensemble members."""
        for i, (selected, state_key) in enumerate(member_selections):
            for tool in selected:
                self.members[i].update(state_key, tool, reward)

# Train ensemble
ensemble = RLEnsemble(
    tool_names=["search", "calculate", "email", "analyze"],
    n_members=5
)

training_data = [
    ("Search for info", ["search"], 1.0),
    ("Calculate tax", ["calculate"], 1.0),
    ("Send report", ["email"], 0.9),
    ("Analyze data", ["analyze"], 0.95),
]

print("=== Ensemble Training ===\n")

for epoch in range(20):
    for query, correct_tools, expected_quality in training_data:
        # Ensemble selection (voting)
        selected, member_selections = ensemble.select_tools_ensemble(query, top_n=2, method="voting")

        # Evaluate
        is_correct = any(t in correct_tools for t in selected)
        reward = expected_quality if is_correct else -0.5

        # Update ensemble
        ensemble.update_ensemble(member_selections, reward)

    print(f"Epoch {epoch+1} complete")

# Compare methods
test_query = "Search and analyze documents"
voting_tools, _ = ensemble.select_tools_ensemble(test_query, top_n=2, method="voting")
averaging_tools, _ = ensemble.select_tools_ensemble(test_query, top_n=2, method="averaging")

print(f"\nTest Query: {test_query}")
print(f"  Voting: {voting_tools}")
print(f"  Averaging: {averaging_tools}")

🌊 Online Learning

Continuously learn from production data.

Why Online Learning?

Adapt to changing patterns in real-time:

# Offline: Train once, deploy
# Online: Continuously update from production feedback

Complete Online Learning Implementation

import threading
import queue
from datetime import datetime

class OnlineLearningSystem:
    """Online learning system for continuous adaptation."""

    def __init__(self, rl_manager, update_interval=100):
        """
        Args:
            rl_manager: RLManager instance
            update_interval: Update Q-table every N interactions
        """
        self.rl_manager = rl_manager
        self.update_interval = update_interval

        # Online learning queues
        self.experience_queue = queue.Queue()
        self.interaction_count = 0
        self.update_lock = threading.Lock()

        # Metrics
        self.online_metrics = {
            "interactions": 0,
            "updates": 0,
            "avg_reward": 0.0,
            "rewards_history": []
        }

    def interact(self, query: str, top_n: int = 2):
        """
        Interact with system (production use).

        Returns: (selected_tools, interaction_id)
        """
        selected, state_key = self.rl_manager.select_tools(query, top_n=top_n)

        # Generate interaction ID
        interaction_id = f"{datetime.now().timestamp()}_{self.interaction_count}"
        self.interaction_count += 1

        # Store for later feedback
        self.experience_queue.put({
            "id": interaction_id,
            "query": query,
            "selected": selected,
            "state_key": state_key,
            "timestamp": datetime.now()
        })

        self.online_metrics["interactions"] += 1

        return selected, interaction_id

    def provide_feedback(self, interaction_id: str, reward: float):
        """
        Provide feedback for an interaction.

        Args:
            interaction_id: ID from interact()
            reward: Reward signal
        """
        # Find interaction in queue
        experiences = []
        found = False

        while not self.experience_queue.empty():
            exp = self.experience_queue.get()
            if exp["id"] == interaction_id:
                # Update with reward
                with self.update_lock:
                    for tool in exp["selected"]:
                        self.rl_manager.update(exp["state_key"], tool, reward)

                self.online_metrics["updates"] += 1
                self.online_metrics["rewards_history"].append(reward)

                # Update running average
                n = len(self.online_metrics["rewards_history"])
                self.online_metrics["avg_reward"] = (
                    (self.online_metrics["avg_reward"] * (n - 1) + reward) / n
                )

                found = True
            else:
                experiences.append(exp)

        # Re-add non-matched experiences
        for exp in experiences:
            self.experience_queue.put(exp)

        # Periodic persistence
        if self.online_metrics["updates"] % self.update_interval == 0:
            self.rl_manager.force_persist()
            print(f"Online update checkpoint: {self.online_metrics['updates']} updates, "
                  f"avg reward: {self.online_metrics['avg_reward']:.3f}")

        return found

    def get_metrics(self):
        """Get online learning metrics."""
        return self.online_metrics.copy()

# Simulate online learning
rl_manager = RLManager(
    tool_names=["search", "calculate", "email", "analyze"],
    q_table_path="rl_data/online_learning.pkl",
    exploration_strategy=ExplorationStrategy.EPSILON_DECAY,
    exploration_rate=0.1,  # Low exploration in production
    learning_rate=0.05,     # Conservative online updates
    use_embeddings=True
)

online_system = OnlineLearningSystem(rl_manager, update_interval=10)

print("=== Online Learning Simulation ===\n")

# Simulate production usage with delayed feedback
import random

production_queries = [
    ("Search for documentation", ["search"], 1.0),
    ("Calculate monthly revenue", ["calculate"], 0.95),
    ("Send quarterly report", ["email"], 0.9),
    ("Analyze customer data", ["analyze"], 0.85),
]

for i in range(50):
    # User interaction
    query, correct_tools, base_reward = random.choice(production_queries)
    selected, interaction_id = online_system.interact(query, top_n=2)

    print(f"Interaction {i+1}: {query[:30]}...")
    print(f"  Selected: {selected}")

    # Simulate user feedback (delayed)
    is_correct = any(t in correct_tools for t in selected)
    reward = base_reward if is_correct else -0.3

    # Provide feedback immediately (could be delayed in real system)
    online_system.provide_feedback(interaction_id, reward)

    if (i + 1) % 10 == 0:
        metrics = online_system.get_metrics()
        print(f"\n--- Checkpoint at {i+1} interactions ---")
        print(f"Total updates: {metrics['updates']}")
        print(f"Avg reward: {metrics['avg_reward']:.3f}\n")

print("=== Online Learning Complete ===")
final_metrics = online_system.get_metrics()
print(f"Total interactions: {final_metrics['interactions']}")
print(f"Total updates: {final_metrics['updates']}")
print(f"Final avg reward: {final_metrics['avg_reward']:.3f}")

🎓 Best Practices

1. Start Simple, Then Advanced

# ✅ Progression
# 1. Single agent with basic RL
# 2. Add curriculum learning
# 3. Try multi-objective rewards
# 4. Scale to multi-agent if needed

# ❌ Don't start with complex hierarchical multi-objective meta-learning

2. Monitor Each Component

# ✅ Track metrics for each pattern
multi_agent_stats = {
    agent_name: rl_manager.get_statistics()
    for agent_name, rl_manager in agents.items()
}

# ❌ Don't deploy advanced patterns without monitoring

3. Validate Transfer Learning

# ✅ Compare transfer vs baseline
transfer_performance = evaluate(transfer_rl)
baseline_performance = evaluate(baseline_rl)
improvement = (transfer_performance - baseline_performance) / baseline_performance
print(f"Transfer learning improvement: {improvement:.1%}")

# ❌ Don't assume transfer always helps

4. Balance Complexity vs Benefit

# ✅ Use advanced patterns when they provide clear value
if dataset_size > 1000 and task_similarity > 0.7:
    use_transfer_learning()

# ❌ Don't add complexity without justification

📊 Pattern Selection Guide

PatternUse WhenBenefitComplexity
Multi-AgentMultiple domainsSpecialized expertise⭐⭐ Medium
Transfer LearningSimilar tasksFaster convergence⭐⭐ Medium
CurriculumComplex tasksBetter learning⭐⭐ Medium
Multi-ObjectiveTrade-offs neededBalanced optimization⭐⭐⭐ High
HierarchicalDecomposable tasksStructured learning⭐⭐⭐⭐ Very High
Meta-LearningMany tasksFast adaptation⭐⭐⭐⭐ Very High
EnsembleHigh stakesRobustness⭐⭐ Medium
Online LearningProductionContinuous adaptation⭐⭐⭐ High

🎯 Summary

Advanced RL patterns in Azcore enable:

  1. Multi-Agent RL: Specialized agents per domain
  2. Transfer Learning: Reuse knowledge across tasks
  3. Curriculum Learning: Progressive difficulty training
  4. Multi-Objective RL: Balance competing objectives
  5. Hierarchical RL: Structured decision-making
  6. Meta-Learning: Learn to learn efficiently
  7. Ensemble Methods: Robust predictions
  8. Online Learning: Continuous adaptation

Choose patterns based on your specific needs and gradually increase complexity as required.

Edit this page on GitHub
AzrienLabs logo

AzrienLabs

Craftedby Team AzrienLabs