Advanced RL techniques for deploying Azcore agents in production environments.
🤝 Multi-Agent RL
Deploy multiple specialized agents, each learning independently within its own domain.
Why Multi-Agent RL?
Different domains require different tool expertise:
# ❌ Single agent trying to handle everything
agent = create_agent(tools=[
"camera", "alert", "lock", # Security tools
"search", "analyze", "summarize", # Research tools
"calculate", "forecast", "visualize" # Analytics tools
])
# Result: Agent struggles to learn optimal tools for each domain
# ✅ Specialized agents for each domain
security_agent = create_agent(tools=["camera", "alert", "lock"])
research_agent = create_agent(tools=["search", "analyze", "summarize"])
analytics_agent = create_agent(tools=["calculate", "forecast", "visualize"])
# Result: Each agent becomes expert in its domain
Complete Multi-Agent Setup
from azcore.rl.rl_manager import RLManager
from azcore.agents.agent_factory import AgentFactory
# ExplorationStrategy and HumanMessage are used below; ExplorationStrategy is
# assumed to be importable from the RL module (adjust to your installation)
from azcore.rl.rl_manager import ExplorationStrategy
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
# Define domain-specific tools
@tool
def camera_tool(location: str) -> str:
"""Check security camera at location."""
return f"Camera feed from {location}"
@tool
def alert_tool(message: str) -> str:
"""Send security alert."""
return f"Alert sent: {message}"
@tool
def lock_tool(door: str, action: str) -> str:
"""Lock/unlock door."""
return f"Door {door} {action}ed"
@tool
def search_tool(query: str) -> str:
"""Search for information."""
return f"Search results for {query}"
@tool
def analyze_tool(text: str) -> str:
"""Analyze text content."""
return f"Analysis of {text}"
@tool
def summarize_tool(text: str) -> str:
"""Summarize text."""
return f"Summary of {text}"
# Create separate RL managers for different domains
security_rl = RLManager(
tool_names=["camera_tool", "alert_tool", "lock_tool"],
q_table_path="rl_data/security.pkl",
exploration_strategy=ExplorationStrategy.EPSILON_DECAY,
exploration_rate=0.15,
learning_rate=0.1,
use_embeddings=True
)
research_rl = RLManager(
tool_names=["search_tool", "analyze_tool", "summarize_tool"],
q_table_path="rl_data/research.pkl",
exploration_strategy=ExplorationStrategy.UCB,
ucb_c=2.0,
learning_rate=0.15,
use_embeddings=True
)
# Create specialized agents (assumes `llm` is an already-initialized chat model)
factory = AgentFactory(llm=llm)
security_agent = factory.create_react_agent(
name="security_agent",
tools=[camera_tool, alert_tool, lock_tool],
rl_manager=security_rl,
system_prompt="You are a security monitoring agent."
)
research_agent = factory.create_react_agent(
name="research_agent",
tools=[search_tool, analyze_tool, summarize_tool],
rl_manager=research_rl,
system_prompt="You are a research assistant agent."
)
# Route queries to appropriate agent
def route_query(query: str):
"""Route query to appropriate specialized agent."""
query_lower = query.lower()
if any(word in query_lower for word in ["security", "camera", "lock", "alert"]):
return security_agent, security_rl
elif any(word in query_lower for word in ["research", "search", "analyze"]):
return research_agent, research_rl
else:
# Default to research
return research_agent, research_rl
# Process queries with specialized agents
queries = [
"Check camera at front door",
"Search for recent AI papers",
"Send alert about suspicious activity",
"Analyze this document for key insights"
]
for query in queries:
agent, rl_manager = route_query(query)
# Agent automatically uses its RL manager
result = agent.invoke({"messages": [HumanMessage(content=query)]})
print(f"Query: {query}")
print(f"Agent: {agent.name}")
print(f"Result: {result['messages'][-1].content}\n")
Agent Coordination
Coordinate multiple agents for complex tasks:
from datetime import datetime

class AgentCoordinator:
"""Coordinate multiple specialized agents."""
def __init__(self, agents: dict):
"""
Args:
agents: Dict mapping domain -> (agent, rl_manager)
"""
self.agents = agents
self.routing_history = []
def process_complex_query(self, query: str, required_domains: list):
"""
Process query requiring multiple agents.
Args:
query: User query
required_domains: List of domains to involve
"""
results = {}
for domain in required_domains:
if domain not in self.agents:
continue
agent, rl_manager = self.agents[domain]
# Contextualize query for this agent
domain_query = f"{query} (Focus on {domain} aspects)"
# Process with specialized agent
result = agent.invoke({"messages": [HumanMessage(content=domain_query)]})
results[domain] = result['messages'][-1].content
# Track routing
self.routing_history.append({
"query": query,
"domain": domain,
"timestamp": datetime.now()
})
return results
def get_agent_statistics(self):
"""Get statistics for all agents."""
stats = {}
for domain, (agent, rl_manager) in self.agents.items():
stats[domain] = rl_manager.get_statistics()
return stats
# Usage
coordinator = AgentCoordinator({
"security": (security_agent, security_rl),
"research": (research_agent, research_rl)
})
# Complex query requiring multiple agents
results = coordinator.process_complex_query(
query="Investigate recent security papers and check camera feeds",
required_domains=["security", "research"]
)
for domain, result in results.items():
print(f"{domain.upper()}: {result}\n")
# Monitor agent performance
stats = coordinator.get_agent_statistics()
for domain, stat in stats.items():
print(f"{domain}: {stat['total_states']} states, "
f"{stat['exploration_rate']:.2%} exploration")
🔄 Transfer Learning
Transfer knowledge from one task to another to accelerate learning.
Why Transfer Learning?
When tasks share similar tool patterns:
# Scenario: Customer support agent learns tools
# Then: Sales agent can reuse that knowledge
# Task A: Customer support (learns email, ticket, kb_search)
# Task B: Sales (uses email, ticket, crm_search)
# Shared tools: email, ticket
# Transfer Q-values for these tools → faster convergence
Complete Transfer Learning Pipeline
import pickle
from pathlib import Path
def train_source_task():
"""Train on source task to build initial knowledge."""
# Source task: Customer support
support_tools = ["email", "ticket", "kb_search", "escalate"]
support_rl = RLManager(
tool_names=support_tools,
q_table_path="rl_data/support.pkl",
exploration_rate=0.2,
learning_rate=0.1,
use_embeddings=True
)
# Training data for support
support_queries = [
("How do I reset my password?", ["kb_search", "email"]),
("File a bug report", ["ticket"]),
("Account locked", ["kb_search", "escalate"]),
("Billing question", ["ticket", "email"]),
("Technical issue", ["kb_search", "ticket"]),
]
# Train source task
print("Training source task (Customer Support)...")
for epoch in range(20):
for query, correct_tools in support_queries:
selected, state_key = support_rl.select_tools(query, top_n=2)
# Reward if correct tools selected
reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
for tool in selected:
support_rl.update(state_key, tool, reward)
support_rl.anneal_exploration(decay_rate=0.95)
support_rl.force_persist()
print(f"Source task trained. States: {len(support_rl.q_table)}")
return support_rl
def transfer_to_target_task(source_rl):
"""Transfer knowledge to target task."""
# Target task: Sales
sales_tools = ["email", "ticket", "crm_search", "quote"]
sales_rl = RLManager(
tool_names=sales_tools,
q_table_path="rl_data/sales.pkl",
exploration_rate=0.2,
learning_rate=0.1,
use_embeddings=True
)
# Identify shared tools
shared_tools = set(source_rl.tool_names) & set(sales_tools)
print(f"\nShared tools: {shared_tools}")
# Transfer Q-values for shared tools
transferred_states = 0
for state_key in source_rl.q_table.keys():
for tool in shared_tools:
if tool in source_rl.q_table[state_key]:
# Copy Q-value from source
sales_rl.q_table[state_key][tool] = source_rl.q_table[state_key][tool]
# Copy visit count (optional, for exploration strategies)
                if state_key in source_rl.visit_counts:
                    sales_rl.visit_counts[state_key][tool] = source_rl.visit_counts[state_key].get(tool, 0)
transferred_states += 1
sales_rl.force_persist()
print(f"Transferred {transferred_states} state-tool pairs")
return sales_rl
def train_target_task(sales_rl):
"""Fine-tune on target task."""
# Training data for sales
sales_queries = [
("Follow up on quote", ["email", "crm_search"]),
("Customer wants demo", ["email", "ticket"]),
("Generate quote", ["quote", "crm_search"]),
("Lead inquiry", ["email", "crm_search"]),
("Contract question", ["ticket", "quote"]),
]
print("\nFine-tuning target task (Sales)...")
for epoch in range(10): # Fewer epochs needed!
for query, correct_tools in sales_queries:
selected, state_key = sales_rl.select_tools(query, top_n=2)
reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
for tool in selected:
sales_rl.update(state_key, tool, reward)
sales_rl.anneal_exploration(decay_rate=0.95)
sales_rl.force_persist()
print(f"Target task fine-tuned. Total states: {len(sales_rl.q_table)}")
return sales_rl
# Execute transfer learning pipeline
print("=== Transfer Learning Pipeline ===\n")
# Step 1: Train source task
source_rl = train_source_task()
# Step 2: Transfer to target task
target_rl = transfer_to_target_task(source_rl)
# Step 3: Fine-tune target task
target_rl = train_target_task(target_rl)
# Step 4: Compare with baseline (no transfer)
print("\n=== Comparison with Baseline ===")
baseline_rl = RLManager(
    tool_names=["email", "ticket", "crm_search", "quote"],
    q_table_path="rl_data/sales_baseline.pkl",
    exploration_rate=0.2,
    learning_rate=0.1,
    use_embeddings=True  # match the transfer setup for a fair comparison
)
# Train baseline from scratch (same 10 epochs)
sales_queries = [
("Follow up on quote", ["email", "crm_search"]),
("Customer wants demo", ["email", "ticket"]),
("Generate quote", ["quote", "crm_search"]),
("Lead inquiry", ["email", "crm_search"]),
("Contract question", ["ticket", "quote"]),
]
for epoch in range(10):
for query, correct_tools in sales_queries:
selected, state_key = baseline_rl.select_tools(query, top_n=2)
reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
for tool in selected:
baseline_rl.update(state_key, tool, reward)
baseline_rl.anneal_exploration(decay_rate=0.95)
# Evaluate both
test_queries = [
("Send quote to customer", ["email", "quote"]),
("Update CRM with lead", ["crm_search"]),
]
print("\nTest Performance:")
for query, correct_tools in test_queries:
transfer_selected, _ = target_rl.select_tools(query, top_n=2)
baseline_selected, _ = baseline_rl.select_tools(query, top_n=2)
transfer_correct = any(t in correct_tools for t in transfer_selected)
baseline_correct = any(t in correct_tools for t in baseline_selected)
print(f"\nQuery: {query}")
print(f" Transfer Learning: {transfer_selected} - {'✓' if transfer_correct else '✗'}")
print(f" Baseline: {baseline_selected} - {'✓' if baseline_correct else '✗'}")
Selective Transfer
Transfer only high-quality knowledge:
def selective_transfer(source_rl, target_rl, min_visits=5, min_q_value=0.3):
"""
Transfer only well-learned state-tool pairs.
Args:
source_rl: Source RL manager
target_rl: Target RL manager
min_visits: Minimum visits for a state-tool pair
min_q_value: Minimum Q-value to transfer
"""
shared_tools = set(source_rl.tool_names) & set(target_rl.tool_names)
transferred = 0
for state_key in source_rl.q_table.keys():
for tool in shared_tools:
q_value = source_rl.q_table[state_key].get(tool, 0)
visits = source_rl.visit_counts[state_key].get(tool, 0)
# Only transfer if well-learned
if visits >= min_visits and q_value >= min_q_value:
target_rl.q_table[state_key][tool] = q_value
target_rl.visit_counts[state_key][tool] = visits
transferred += 1
target_rl.force_persist()
print(f"Selectively transferred {transferred} high-quality pairs")
return transferred
# Usage
transferred = selective_transfer(
source_rl,
target_rl,
min_visits=10,
min_q_value=0.5
)
📚 Curriculum Learning
Train progressively from simple to complex queries.
Why Curriculum Learning?
Learning is more effective when starting with basics:
# ❌ Random training (all difficulties mixed)
queries = shuffle([easy, medium, hard, expert])
# Result: Agent confused, slow convergence
# ✅ Curriculum (progressive difficulty)
# Stage 1: easy queries (build foundation)
# Stage 2: medium queries (add complexity)
# Stage 3: hard queries (handle edge cases)
# Stage 4: expert queries (master domain)
# Result: Faster convergence, better performance
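If you already have a mixed pool of labeled queries, you can bucket it into stages automatically. The helper below is an illustrative sketch (not part of Azcore) that uses the number of required tools as a crude difficulty proxy:
from collections import defaultdict
from typing import Dict, List, Tuple

def bucket_by_difficulty(
    pool: List[Tuple[str, List[str]]]
) -> Dict[str, List[Tuple[str, List[str]]]]:
    """Group (query, correct_tools) pairs by a crude difficulty proxy: tool count."""
    buckets: Dict[str, List[Tuple[str, List[str]]]] = defaultdict(list)
    for query, tools in pool:
        if len(tools) == 1:
            buckets["easy"].append((query, tools))
        elif len(tools) == 2:
            buckets["medium"].append((query, tools))
        else:
            buckets["hard"].append((query, tools))
    return buckets

# Train in order of increasing difficulty, mirroring the staged pipeline below
pool = [
    ("Send email", ["email"]),
    ("Search and summarize AI papers", ["search", "summarize"]),
    ("Research topic, analyze findings, email report", ["search", "analyze", "email"]),
]
buckets = bucket_by_difficulty(pool)
for difficulty in ("easy", "medium", "hard"):
    print(difficulty, buckets[difficulty])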
Complete Curriculum Pipeline
from dataclasses import dataclass
from typing import List, Tuple
@dataclass
class CurriculumStage:
"""Define a curriculum stage."""
name: str
queries: List[Tuple[str, List[str]]] # (query, correct_tools)
epochs: int
difficulty: str
def create_curriculum():
"""Define curriculum stages."""
# Stage 1: Simple single-tool queries
stage1 = CurriculumStage(
name="Foundation",
queries=[
("Search for cats", ["search"]),
("Calculate 2+2", ["calculate"]),
("Get weather", ["weather"]),
("Send email", ["email"]),
],
epochs=5,
difficulty="easy"
)
# Stage 2: Two-tool combinations
stage2 = CurriculumStage(
name="Combinations",
queries=[
("Search and summarize AI papers", ["search", "summarize"]),
("Calculate tax and send email", ["calculate", "email"]),
("Get weather and search for activities", ["weather", "search"]),
("Analyze data and visualize", ["analyze", "visualize"]),
],
epochs=8,
difficulty="medium"
)
# Stage 3: Complex multi-step queries
stage3 = CurriculumStage(
name="Complex Tasks",
queries=[
("Research topic, analyze findings, summarize and email",
["search", "analyze", "summarize", "email"]),
("Calculate metrics, visualize results, generate report",
["calculate", "visualize", "email"]),
("Get weather forecast, search for events, email recommendations",
["weather", "search", "email"]),
],
epochs=10,
difficulty="hard"
)
# Stage 4: Expert ambiguous queries
stage4 = CurriculumStage(
name="Expert",
queries=[
("Help me with my project", ["search", "analyze"]), # Ambiguous
("What should I know?", ["search", "summarize"]), # Vague
("Process this", ["analyze", "visualize"]), # Underspecified
],
epochs=15,
difficulty="expert"
)
return [stage1, stage2, stage3, stage4]
def train_with_curriculum(rl_manager, curriculum):
"""
Train with curriculum learning.
Args:
rl_manager: RLManager instance
curriculum: List of CurriculumStage
"""
print("=== Curriculum Learning Pipeline ===\n")
all_metrics = {
"stage_names": [],
"stage_accuracy": [],
"stage_avg_q": [],
"cumulative_states": []
}
for stage_idx, stage in enumerate(curriculum, 1):
print(f"Stage {stage_idx}/{len(curriculum)}: {stage.name} ({stage.difficulty})")
print(f"Queries: {len(stage.queries)}, Epochs: {stage.epochs}")
stage_correct = 0
stage_total = 0
for epoch in range(stage.epochs):
epoch_correct = 0
for query, correct_tools in stage.queries:
selected, state_key = rl_manager.select_tools(query, top_n=2)
# Calculate reward
correct = any(tool in correct_tools for tool in selected)
reward = 1.0 if correct else -0.5
if correct:
epoch_correct += 1
stage_correct += 1
stage_total += 1
# Update Q-values
for tool in selected:
rl_manager.update(state_key, tool, reward)
# Decay exploration after each epoch
rl_manager.anneal_exploration(decay_rate=0.95)
# Print epoch progress
accuracy = epoch_correct / len(stage.queries)
print(f" Epoch {epoch+1}/{stage.epochs}: "
f"Accuracy={accuracy:.1%}, "
f"Exploration={rl_manager.exploration_rate:.2%}")
# Stage metrics
stage_accuracy = stage_correct / stage_total
stats = rl_manager.get_statistics()
all_metrics["stage_names"].append(stage.name)
all_metrics["stage_accuracy"].append(stage_accuracy)
all_metrics["stage_avg_q"].append(stats['avg_q_value'])
all_metrics["cumulative_states"].append(stats['total_states'])
print(f"Stage Complete: Accuracy={stage_accuracy:.1%}, "
f"Avg Q={stats['avg_q_value']:.3f}, "
f"States={stats['total_states']}\n")
rl_manager.force_persist()
return all_metrics
# Execute curriculum learning
rl_manager = RLManager(
tool_names=["search", "calculate", "weather", "email",
"summarize", "analyze", "visualize"],
q_table_path="rl_data/curriculum.pkl",
exploration_strategy=ExplorationStrategy.EPSILON_DECAY,
exploration_rate=0.3, # Start high for early stages
epsilon_decay_rate=0.95,
min_exploration_rate=0.01,
learning_rate=0.1,
use_embeddings=True
)
curriculum = create_curriculum()
metrics = train_with_curriculum(rl_manager, curriculum)
# Visualize curriculum progression
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Accuracy per stage
axes[0, 0].bar(metrics["stage_names"], metrics["stage_accuracy"])
axes[0, 0].set_ylabel("Accuracy")
axes[0, 0].set_title("Accuracy by Curriculum Stage")
axes[0, 0].set_ylim([0, 1])
# Average Q-value per stage
axes[0, 1].plot(metrics["stage_names"], metrics["stage_avg_q"], marker='o')
axes[0, 1].set_ylabel("Average Q-Value")
axes[0, 1].set_title("Q-Value Growth Across Stages")
# Cumulative states
axes[1, 0].plot(metrics["stage_names"], metrics["cumulative_states"], marker='s')
axes[1, 0].set_ylabel("Total States")
axes[1, 0].set_title("State Space Growth")
# Difficulty progression (stage index used as the numeric difficulty level)
axes[1, 1].bar(metrics["stage_names"], range(1, len(metrics["stage_names"]) + 1))
axes[1, 1].set_ylabel("Difficulty Level")
axes[1, 1].set_title("Curriculum Difficulty Progression")
plt.tight_layout()
plt.savefig("curriculum_progression.png")
print("Curriculum visualization saved to curriculum_progression.png")
Adaptive Curriculum
Automatically adjust difficulty based on performance:
class AdaptiveCurriculum:
"""Adaptive curriculum that adjusts based on performance."""
def __init__(self, rl_manager, stages, mastery_threshold=0.8):
"""
Args:
rl_manager: RLManager instance
stages: List of CurriculumStage
mastery_threshold: Accuracy needed to advance
"""
self.rl_manager = rl_manager
self.stages = stages
self.mastery_threshold = mastery_threshold
self.current_stage = 0
def train(self):
"""Train with adaptive curriculum."""
while self.current_stage < len(self.stages):
stage = self.stages[self.current_stage]
print(f"\n=== Stage {self.current_stage + 1}: {stage.name} ===")
# Train on current stage
accuracy = self._train_stage(stage)
# Check mastery
if accuracy >= self.mastery_threshold:
print(f"✓ Mastered {stage.name} (accuracy: {accuracy:.1%})")
self.current_stage += 1
else:
print(f"✗ Need more practice on {stage.name} (accuracy: {accuracy:.1%})")
# Repeat stage with more epochs
stage.epochs += 3
def _train_stage(self, stage):
"""Train on a single stage."""
correct = 0
total = 0
for epoch in range(stage.epochs):
for query, correct_tools in stage.queries:
selected, state_key = self.rl_manager.select_tools(query, top_n=2)
is_correct = any(tool in correct_tools for tool in selected)
reward = 1.0 if is_correct else -0.5
if is_correct:
correct += 1
total += 1
for tool in selected:
self.rl_manager.update(state_key, tool, reward)
self.rl_manager.anneal_exploration(decay_rate=0.95)
return correct / total if total > 0 else 0
# Usage
rl_manager = RLManager(
    tool_names=["search", "calculate", "weather", "email",
                "summarize", "analyze", "visualize"],  # must cover every tool the curriculum uses
    q_table_path="rl_data/adaptive_curriculum.pkl",
    exploration_rate=0.3,
    learning_rate=0.1
)
curriculum = create_curriculum()
adaptive = AdaptiveCurriculum(rl_manager, curriculum, mastery_threshold=0.85)
adaptive.train()
🎯 Multi-Objective RL
Optimize multiple objectives simultaneously (accuracy, speed, cost).
Why Multi-Objective RL?
Real-world scenarios require balancing trade-offs:
# Single objective: Maximize accuracy
# Problem: May select slow, expensive tools
# Multi-objective: Balance accuracy, speed, cost
# Solution: Find optimal trade-off
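As a quick worked example of the weighted combination used by the reward calculator below (the scores and weights here are illustrative):
# All scores are "higher is better" in [0, 1]; weights sum to 1.0
accuracy_score, speed_score, cost_score = 0.9, 0.4, 0.7
w_acc, w_speed, w_cost = 0.6, 0.3, 0.1

blended = w_acc * accuracy_score + w_speed * speed_score + w_cost * cost_score
# 0.6*0.9 + 0.3*0.4 + 0.1*0.7 = 0.73
reward = 2 * blended - 1  # rescale [0, 1] -> [-1, 1]
print(round(reward, 2))   # 0.46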
Complete Multi-Objective Implementation
from azcore.rl.rewards import RewardCalculator
import time
class MultiObjectiveRewardCalculator(RewardCalculator):
"""
Reward calculator that balances multiple objectives.
Objectives:
1. Accuracy: Did the tool provide correct/useful output?
2. Speed: How fast was the tool?
3. Cost: How expensive is the tool?
"""
def __init__(
self,
accuracy_weight: float = 0.6,
speed_weight: float = 0.3,
cost_weight: float = 0.1,
tool_costs: dict = None
):
"""
Args:
accuracy_weight: Weight for accuracy (0-1)
speed_weight: Weight for speed (0-1)
cost_weight: Weight for cost (0-1)
tool_costs: Dict mapping tool name -> cost (0-1)
"""
self.accuracy_weight = accuracy_weight
self.speed_weight = speed_weight
self.cost_weight = cost_weight
# Normalize weights
total = accuracy_weight + speed_weight + cost_weight
self.accuracy_weight /= total
self.speed_weight /= total
self.cost_weight /= total
# Default costs if not provided
self.tool_costs = tool_costs or {}
def calculate(
self,
state,
result,
query: str,
execution_time: float = None,
selected_tools: list = None,
**kwargs
) -> float:
"""
Calculate multi-objective reward.
Args:
state: Agent state
result: Execution result
query: Original query
execution_time: Time taken (seconds)
selected_tools: List of tools used
"""
# 1. Accuracy score
accuracy_score = self._calculate_accuracy(result)
# 2. Speed score (inverse of time)
if execution_time:
# Normalize: < 1s = 1.0, > 10s = 0.0
speed_score = max(0, 1.0 - (execution_time / 10.0))
else:
speed_score = 0.5 # Neutral if unknown
# 3. Cost score (inverse of cost)
if selected_tools:
avg_cost = sum(self.tool_costs.get(t, 0.5) for t in selected_tools) / len(selected_tools)
cost_score = 1.0 - avg_cost
else:
cost_score = 0.5 # Neutral if unknown
# Weighted combination
total_reward = (
self.accuracy_weight * accuracy_score +
self.speed_weight * speed_score +
self.cost_weight * cost_score
)
# Scale to [-1, 1]
return 2 * total_reward - 1
def _calculate_accuracy(self, result) -> float:
"""
Calculate accuracy score from result.
Returns: Score between 0 and 1
"""
if not result or 'messages' not in result:
return 0.0
last_message = result['messages'][-1].content.lower()
# Check for success indicators
success_indicators = ["success", "complete", "done", "found", "calculated"]
failure_indicators = ["error", "failed", "unable", "cannot", "sorry"]
        # Check failures first so messages like "unable to complete" are not
        # scored as successes via the "complete" indicator
        if any(ind in last_message for ind in failure_indicators):
            return 0.0
        elif any(ind in last_message for ind in success_indicators):
            return 1.0
        else:
            return 0.5  # Neutral
# Define tool costs
tool_costs = {
"search": 0.7, # Expensive (API calls)
"calculate": 0.1, # Cheap (local)
"weather": 0.5, # Medium (API call)
"email": 0.3, # Low (simple operation)
"analyze": 0.8, # Expensive (LLM call)
"visualize": 0.4, # Medium (computation)
}
# Create multi-objective RL system
reward_calculator = MultiObjectiveRewardCalculator(
accuracy_weight=0.6,
speed_weight=0.3,
cost_weight=0.1,
tool_costs=tool_costs
)
rl_manager = RLManager(
tool_names=list(tool_costs.keys()),
q_table_path="rl_data/multi_objective.pkl",
exploration_rate=0.15,
learning_rate=0.1,
use_embeddings=True
)
# Train with multi-objective rewards
training_data = [
("Search for Python tutorials", ["search"], 0.9), # Accurate but slow/expensive
("Calculate 15% of 200", ["calculate"], 1.0), # Fast, cheap, accurate
("Get weather forecast", ["weather"], 0.8), # Medium on all
("Send status email", ["email"], 0.95), # Fast, cheap
("Analyze sales data", ["analyze"], 0.85), # Slow, expensive
]
print("=== Multi-Objective Training ===\n")
for epoch in range(20):
epoch_rewards = []
for query, correct_tools, expected_accuracy in training_data:
start_time = time.time()
# Select tools
selected, state_key = rl_manager.select_tools(query, top_n=1)
# Simulate execution
execution_time = time.time() - start_time + (0.1 if selected[0] == "calculate" else 0.5)
# Mock result
result = {
'messages': [HumanMessage(content="Success" if selected[0] in correct_tools else "Unable to complete")]
}
# Calculate multi-objective reward
reward = reward_calculator.calculate(
state=None,
result=result,
query=query,
execution_time=execution_time,
selected_tools=selected
)
epoch_rewards.append(reward)
# Update Q-values
for tool in selected:
rl_manager.update(state_key, tool, reward)
rl_manager.anneal_exploration(decay_rate=0.95)
avg_reward = sum(epoch_rewards) / len(epoch_rewards)
print(f"Epoch {epoch+1}: Avg Reward = {avg_reward:.3f}")
# Analyze learned preferences
print("\n=== Learned Tool Preferences ===")
top_tools = rl_manager.get_top_performing_tools(top_n=len(tool_costs))
for tool, avg_q in top_tools:
cost = tool_costs[tool]
print(f"{tool:12s}: Q={avg_q:5.3f}, Cost={cost:.1f} "
f"({'Preferred' if avg_q > 0.5 else 'Avoided'})")
Pareto Optimization
Find Pareto-optimal solutions:
class ParetoOptimizer:
"""Find Pareto-optimal tool selections."""
def __init__(self, rl_manager, objectives):
"""
Args:
rl_manager: RLManager instance
objectives: Dict of objective_name -> callable(tool) -> score
"""
self.rl_manager = rl_manager
self.objectives = objectives
def find_pareto_front(self, query: str):
"""
Find Pareto-optimal tool selections for query.
Returns: List of (tools, objective_scores) on Pareto front
"""
# Generate all possible tool combinations
from itertools import combinations
candidates = []
for r in range(1, len(self.rl_manager.tool_names) + 1):
for combo in combinations(self.rl_manager.tool_names, r):
scores = {
name: obj(combo)
for name, obj in self.objectives.items()
}
candidates.append((list(combo), scores))
# Find Pareto front
pareto_front = []
for candidate in candidates:
is_dominated = False
for other in candidates:
if self._dominates(other[1], candidate[1]):
is_dominated = True
break
if not is_dominated:
pareto_front.append(candidate)
return pareto_front
def _dominates(self, scores1, scores2):
"""Check if scores1 dominates scores2."""
better_in_all = all(scores1[k] >= scores2[k] for k in scores1.keys())
better_in_some = any(scores1[k] > scores2[k] for k in scores1.keys())
return better_in_all and better_in_some
# Define objectives
objectives = {
"accuracy": lambda tools: sum(0.9 if t in ["search", "analyze"] else 0.5 for t in tools) / len(tools),
"speed": lambda tools: sum(0.9 if t == "calculate" else 0.3 for t in tools) / len(tools),
"cost": lambda tools: 1.0 - (sum(tool_costs[t] for t in tools) / len(tools))
}
optimizer = ParetoOptimizer(rl_manager, objectives)
pareto_front = optimizer.find_pareto_front("Analyze data")
print("\n=== Pareto-Optimal Solutions ===")
for tools, scores in pareto_front[:5]: # Top 5
print(f"Tools: {tools}")
print(f" Accuracy: {scores['accuracy']:.2f}, "
f"Speed: {scores['speed']:.2f}, "
f"Cost: {scores['cost']:.2f}\n")
🏗️ Hierarchical RL
Structure RL into high-level and low-level policies.
Why Hierarchical RL?
Complex tasks benefit from hierarchical decomposition:
# High-level policy: Choose sub-goal
# Low-level policy: Execute tools for sub-goal
# Example: "Research and report on AI"
# High-level: [Research] → [Analyze] → [Report]
# Low-level Research: [search, summarize]
# Low-level Analyze: [analyze, visualize]
# Low-level Report: [email]
Complete Hierarchical Implementation
class HierarchicalRLSystem:
"""Hierarchical RL with high-level and low-level policies."""
def __init__(self, tool_hierarchy: dict):
"""
Args:
tool_hierarchy: Dict mapping high-level action -> low-level tools
"""
self.tool_hierarchy = tool_hierarchy
# High-level RL: Choose sub-goals
self.high_level_rl = RLManager(
tool_names=list(tool_hierarchy.keys()),
q_table_path="rl_data/high_level.pkl",
exploration_rate=0.15,
learning_rate=0.1
)
# Low-level RL managers: Choose tools for each sub-goal
self.low_level_rls = {}
for subgoal, tools in tool_hierarchy.items():
self.low_level_rls[subgoal] = RLManager(
tool_names=tools,
q_table_path=f"rl_data/low_level_{subgoal}.pkl",
exploration_rate=0.2,
learning_rate=0.1
)
def select_hierarchical_tools(self, query: str, max_subgoals: int = 2):
"""
Select tools using hierarchical policy.
Returns: (selected_tools, hierarchy_info)
"""
# High-level: Select sub-goals
subgoals, hl_state = self.high_level_rl.select_tools(query, top_n=max_subgoals)
# Low-level: Select tools for each sub-goal
all_tools = []
hierarchy_info = {
"high_level_state": hl_state,
"subgoals": subgoals,
"subgoal_tools": {}
}
for subgoal in subgoals:
if subgoal in self.low_level_rls:
ll_rl = self.low_level_rls[subgoal]
tools, ll_state = ll_rl.select_tools(query, top_n=2)
all_tools.extend(tools)
hierarchy_info["subgoal_tools"][subgoal] = {
"tools": tools,
"state": ll_state
}
return all_tools, hierarchy_info
def update_hierarchical(self, hierarchy_info, reward: float):
"""Update both high-level and low-level policies."""
# Update high-level
hl_state = hierarchy_info["high_level_state"]
for subgoal in hierarchy_info["subgoals"]:
self.high_level_rl.update(hl_state, subgoal, reward)
# Update low-level
for subgoal, info in hierarchy_info["subgoal_tools"].items():
ll_state = info["state"]
for tool in info["tools"]:
self.low_level_rls[subgoal].update(ll_state, tool, reward)
# Define tool hierarchy
tool_hierarchy = {
"research": ["search", "summarize"],
"analyze": ["analyze", "visualize"],
"communicate": ["email", "report"],
"calculate": ["calculate", "forecast"]
}
hierarchical_system = HierarchicalRLSystem(tool_hierarchy)
# Train hierarchical system
training_queries = [
("Research AI trends and email summary", ["research", "communicate"], 1.0),
("Analyze sales data and visualize", ["analyze"], 0.9),
("Calculate revenue forecast", ["calculate"], 0.95),
("Research competitors and analyze", ["research", "analyze"], 0.85),
]
print("=== Hierarchical RL Training ===\n")
for epoch in range(15):
for query, correct_subgoals, expected_quality in training_queries:
# Select tools hierarchically
tools, hierarchy_info = hierarchical_system.select_hierarchical_tools(query)
# Evaluate
selected_subgoals = hierarchy_info["subgoals"]
is_correct = any(sg in correct_subgoals for sg in selected_subgoals)
reward = expected_quality if is_correct else -0.5
# Update hierarchical policies
hierarchical_system.update_hierarchical(hierarchy_info, reward)
print(f"Query: {query}")
print(f" Subgoals: {selected_subgoals}")
print(f" Tools: {tools}")
print(f" Reward: {reward:.2f}\n")
# Anneal exploration
hierarchical_system.high_level_rl.anneal_exploration(decay_rate=0.95)
for ll_rl in hierarchical_system.low_level_rls.values():
ll_rl.anneal_exploration(decay_rate=0.95)
print("Hierarchical training complete!")
🔬 Meta-Learning
Learn how to learn efficiently across tasks.
Why Meta-Learning?
Quickly adapt to new tasks using meta-knowledge:
# Traditional: Train from scratch for each new task
# Meta-learning: Learn initialization that adapts quickly
Complete Meta-Learning Implementation
class MetaLearningRLManager:
"""Meta-learning for RL that learns good initializations."""
def __init__(self, tool_names, meta_lr=0.01):
"""
Args:
tool_names: List of tool names
meta_lr: Meta-learning rate
"""
self.tool_names = tool_names
self.meta_lr = meta_lr
# Meta-parameters (learned across tasks)
self.meta_q_init = {tool: 0.0 for tool in tool_names}
self.task_history = []
def create_task_rl(self, task_name: str):
"""Create RL manager for specific task with meta-init."""
rl_manager = RLManager(
tool_names=self.tool_names,
q_table_path=f"rl_data/meta_{task_name}.pkl",
exploration_rate=0.2,
learning_rate=0.1
)
# Initialize with meta-learned values
for state in rl_manager.q_table.keys():
for tool in self.tool_names:
rl_manager.q_table[state][tool] = self.meta_q_init[tool]
return rl_manager
def meta_update(self, task_performances: dict):
"""
Update meta-parameters based on task performances.
Args:
task_performances: Dict of task_name -> (rl_manager, final_reward)
"""
# Aggregate Q-values from all tasks
tool_q_aggregates = {tool: [] for tool in self.tool_names}
for task_name, (rl_manager, reward) in task_performances.items():
# Weight by task performance
weight = max(0, reward)
for state in rl_manager.q_table.keys():
for tool in self.tool_names:
q_value = rl_manager.q_table[state].get(tool, 0)
tool_q_aggregates[tool].append(q_value * weight)
# Update meta-initialization
for tool in self.tool_names:
if tool_q_aggregates[tool]:
avg_q = sum(tool_q_aggregates[tool]) / len(tool_q_aggregates[tool])
self.meta_q_init[tool] += self.meta_lr * (avg_q - self.meta_q_init[tool])
self.task_history.append(task_performances)
print("Meta-parameters updated:")
for tool, q in self.meta_q_init.items():
print(f" {tool}: {q:.3f}")
# Meta-learning training
meta_learner = MetaLearningRLManager(
tool_names=["search", "calculate", "email", "analyze"],
meta_lr=0.1
)
# Simulate multiple tasks
tasks = {
"customer_support": [
("Help with account", ["search", "email"], 1.0),
("Calculate refund", ["calculate", "email"], 0.9),
],
"data_analysis": [
("Analyze trends", ["analyze"], 1.0),
("Calculate metrics", ["calculate", "analyze"], 0.95),
],
"research": [
("Search papers", ["search"], 1.0),
("Analyze findings", ["search", "analyze"], 0.9),
]
}
print("=== Meta-Learning Training ===\n")
for meta_epoch in range(5):
print(f"\nMeta-Epoch {meta_epoch + 1}")
task_performances = {}
# Train on each task
for task_name, task_data in tasks.items():
print(f"\n Training on task: {task_name}")
# Create task-specific RL with meta-init
rl_manager = meta_learner.create_task_rl(task_name)
# Train on task
task_rewards = []
for epoch in range(5): # Few-shot adaptation
for query, correct_tools, _ in task_data:
selected, state_key = rl_manager.select_tools(query, top_n=2)
reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
task_rewards.append(reward)
for tool in selected:
rl_manager.update(state_key, tool, reward)
final_reward = sum(task_rewards) / len(task_rewards)
task_performances[task_name] = (rl_manager, final_reward)
print(f" Final reward: {final_reward:.3f}")
# Meta-update
meta_learner.meta_update(task_performances)
print("\n=== Meta-Learning Complete ===")
print("Learned meta-initialization:")
for tool, q in meta_learner.meta_q_init.items():
print(f" {tool}: {q:.3f}")
🎲 Ensemble Methods
Combine multiple RL policies for robust decisions.
Why Ensemble Methods?
Reduce variance and improve robustness:
# Single policy: May be unstable or overfitted
# Ensemble: Majority vote or weighted average
Complete Ensemble Implementation
class RLEnsemble:
"""Ensemble of multiple RL managers."""
def __init__(self, tool_names, n_members=5):
"""
Args:
tool_names: List of tool names
n_members: Number of ensemble members
"""
self.tool_names = tool_names
self.members = []
# Create diverse ensemble members
for i in range(n_members):
rl_manager = RLManager(
tool_names=tool_names,
q_table_path=f"rl_data/ensemble_member_{i}.pkl",
exploration_strategy=ExplorationStrategy.EPSILON_DECAY if i % 2 == 0 else ExplorationStrategy.UCB,
exploration_rate=0.1 + (i * 0.05), # Varied exploration
learning_rate=0.05 + (i * 0.02), # Varied learning rates
use_embeddings=True
)
self.members.append(rl_manager)
def select_tools_ensemble(self, query: str, top_n: int = 2, method="voting"):
"""
Select tools using ensemble.
Args:
query: User query
top_n: Number of tools to select
method: "voting" or "averaging"
"""
if method == "voting":
return self._select_by_voting(query, top_n)
elif method == "averaging":
return self._select_by_averaging(query, top_n)
else:
raise ValueError(f"Unknown method: {method}")
def _select_by_voting(self, query: str, top_n: int):
"""Majority voting across ensemble members."""
votes = {tool: 0 for tool in self.tool_names}
member_selections = []
# Collect votes from each member
for member in self.members:
selected, state_key = member.select_tools(query, top_n=top_n)
member_selections.append((selected, state_key))
for tool in selected:
votes[tool] += 1
# Select top-voted tools
sorted_tools = sorted(votes.items(), key=lambda x: x[1], reverse=True)
ensemble_selected = [tool for tool, _ in sorted_tools[:top_n]]
return ensemble_selected, member_selections
def _select_by_averaging(self, query: str, top_n: int):
"""Average Q-values across ensemble members."""
avg_q_values = {tool: 0.0 for tool in self.tool_names}
member_selections = []
# Collect Q-values from each member
for member in self.members:
selected, state_key = member.select_tools(query, top_n=len(self.tool_names))
member_selections.append((selected, state_key))
# Get Q-values for this state
if state_key in member.q_table:
for tool, q_value in member.q_table[state_key].items():
avg_q_values[tool] += q_value / len(self.members)
# Select top Q-value tools
sorted_tools = sorted(avg_q_values.items(), key=lambda x: x[1], reverse=True)
ensemble_selected = [tool for tool, _ in sorted_tools[:top_n]]
return ensemble_selected, member_selections
def update_ensemble(self, member_selections, reward: float):
"""Update all ensemble members."""
for i, (selected, state_key) in enumerate(member_selections):
for tool in selected:
self.members[i].update(state_key, tool, reward)
# Train ensemble
ensemble = RLEnsemble(
tool_names=["search", "calculate", "email", "analyze"],
n_members=5
)
training_data = [
("Search for info", ["search"], 1.0),
("Calculate tax", ["calculate"], 1.0),
("Send report", ["email"], 0.9),
("Analyze data", ["analyze"], 0.95),
]
print("=== Ensemble Training ===\n")
for epoch in range(20):
for query, correct_tools, expected_quality in training_data:
# Ensemble selection (voting)
selected, member_selections = ensemble.select_tools_ensemble(query, top_n=2, method="voting")
# Evaluate
is_correct = any(t in correct_tools for t in selected)
reward = expected_quality if is_correct else -0.5
# Update ensemble
ensemble.update_ensemble(member_selections, reward)
print(f"Epoch {epoch+1} complete")
# Compare methods
test_query = "Search and analyze documents"
voting_tools, _ = ensemble.select_tools_ensemble(test_query, top_n=2, method="voting")
averaging_tools, _ = ensemble.select_tools_ensemble(test_query, top_n=2, method="averaging")
print(f"\nTest Query: {test_query}")
print(f" Voting: {voting_tools}")
print(f" Averaging: {averaging_tools}")
🌊 Online Learning
Continuously learn from production data.
Why Online Learning?
Adapt to changing patterns in real-time:
# Offline: Train once, deploy
# Online: Continuously update from production feedback
Complete Online Learning Implementation
import threading
import queue
from datetime import datetime
class OnlineLearningSystem:
"""Online learning system for continuous adaptation."""
def __init__(self, rl_manager, update_interval=100):
"""
Args:
rl_manager: RLManager instance
            update_interval: Persist the Q-table every N feedback updates
"""
self.rl_manager = rl_manager
self.update_interval = update_interval
# Online learning queues
self.experience_queue = queue.Queue()
self.interaction_count = 0
self.update_lock = threading.Lock()
# Metrics
self.online_metrics = {
"interactions": 0,
"updates": 0,
"avg_reward": 0.0,
"rewards_history": []
}
def interact(self, query: str, top_n: int = 2):
"""
Interact with system (production use).
Returns: (selected_tools, interaction_id)
"""
selected, state_key = self.rl_manager.select_tools(query, top_n=top_n)
# Generate interaction ID
interaction_id = f"{datetime.now().timestamp()}_{self.interaction_count}"
self.interaction_count += 1
# Store for later feedback
self.experience_queue.put({
"id": interaction_id,
"query": query,
"selected": selected,
"state_key": state_key,
"timestamp": datetime.now()
})
self.online_metrics["interactions"] += 1
return selected, interaction_id
def provide_feedback(self, interaction_id: str, reward: float):
"""
Provide feedback for an interaction.
Args:
interaction_id: ID from interact()
reward: Reward signal
"""
# Find interaction in queue
experiences = []
found = False
while not self.experience_queue.empty():
exp = self.experience_queue.get()
if exp["id"] == interaction_id:
# Update with reward
with self.update_lock:
for tool in exp["selected"]:
self.rl_manager.update(exp["state_key"], tool, reward)
self.online_metrics["updates"] += 1
self.online_metrics["rewards_history"].append(reward)
# Update running average
n = len(self.online_metrics["rewards_history"])
self.online_metrics["avg_reward"] = (
(self.online_metrics["avg_reward"] * (n - 1) + reward) / n
)
found = True
else:
experiences.append(exp)
# Re-add non-matched experiences
for exp in experiences:
self.experience_queue.put(exp)
# Periodic persistence
if self.online_metrics["updates"] % self.update_interval == 0:
self.rl_manager.force_persist()
print(f"Online update checkpoint: {self.online_metrics['updates']} updates, "
f"avg reward: {self.online_metrics['avg_reward']:.3f}")
return found
def get_metrics(self):
"""Get online learning metrics."""
return self.online_metrics.copy()
# Simulate online learning
rl_manager = RLManager(
tool_names=["search", "calculate", "email", "analyze"],
q_table_path="rl_data/online_learning.pkl",
exploration_strategy=ExplorationStrategy.EPSILON_DECAY,
exploration_rate=0.1, # Low exploration in production
learning_rate=0.05, # Conservative online updates
use_embeddings=True
)
online_system = OnlineLearningSystem(rl_manager, update_interval=10)
print("=== Online Learning Simulation ===\n")
# Simulate production usage with delayed feedback
import random
production_queries = [
("Search for documentation", ["search"], 1.0),
("Calculate monthly revenue", ["calculate"], 0.95),
("Send quarterly report", ["email"], 0.9),
("Analyze customer data", ["analyze"], 0.85),
]
for i in range(50):
# User interaction
query, correct_tools, base_reward = random.choice(production_queries)
selected, interaction_id = online_system.interact(query, top_n=2)
print(f"Interaction {i+1}: {query[:30]}...")
print(f" Selected: {selected}")
# Simulate user feedback (delayed)
is_correct = any(t in correct_tools for t in selected)
reward = base_reward if is_correct else -0.3
# Provide feedback immediately (could be delayed in real system)
online_system.provide_feedback(interaction_id, reward)
if (i + 1) % 10 == 0:
metrics = online_system.get_metrics()
print(f"\n--- Checkpoint at {i+1} interactions ---")
print(f"Total updates: {metrics['updates']}")
print(f"Avg reward: {metrics['avg_reward']:.3f}\n")
print("=== Online Learning Complete ===")
final_metrics = online_system.get_metrics()
print(f"Total interactions: {final_metrics['interactions']}")
print(f"Total updates: {final_metrics['updates']}")
print(f"Final avg reward: {final_metrics['avg_reward']:.3f}")
🎓 Best Practices
1. Start Simple, Then Advanced
# ✅ Progression
# 1. Single agent with basic RL
# 2. Add curriculum learning
# 3. Try multi-objective rewards
# 4. Scale to multi-agent if needed
# ❌ Don't start with complex hierarchical multi-objective meta-learning
2. Monitor Each Component
# ✅ Track metrics for each pattern
multi_agent_stats = {
agent_name: rl_manager.get_statistics()
for agent_name, rl_manager in agents.items()
}
# ❌ Don't deploy advanced patterns without monitoring
3. Validate Transfer Learning
# ✅ Compare transfer vs baseline
transfer_performance = evaluate(transfer_rl)
baseline_performance = evaluate(baseline_rl)
improvement = (transfer_performance - baseline_performance) / baseline_performance
print(f"Transfer learning improvement: {improvement:.1%}")
# ❌ Don't assume transfer always helps
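`evaluate` is not an Azcore helper; a minimal sketch, assuming a labeled test set and the `select_tools` API used throughout this page:
def evaluate(rl_manager, test_set, top_n=2):
    """Fraction of test queries where at least one selected tool is correct."""
    correct = 0
    for query, correct_tools in test_set:
        selected, _ = rl_manager.select_tools(query, top_n=top_n)
        if any(tool in correct_tools for tool in selected):
            correct += 1
    return correct / len(test_set) if test_set else 0.0

test_set = [
    ("Send quote to customer", ["email", "quote"]),
    ("Update CRM with lead", ["crm_search"]),
]
print(f"Transfer accuracy: {evaluate(target_rl, test_set):.1%}")
print(f"Baseline accuracy: {evaluate(baseline_rl, test_set):.1%}")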
4. Balance Complexity vs Benefit
# ✅ Use advanced patterns when they provide clear value
if dataset_size > 1000 and task_similarity > 0.7:
use_transfer_learning()
# ❌ Don't add complexity without justification
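`task_similarity` is likewise left abstract above; one simple, hypothetical proxy is the Jaccard overlap of the two tasks' tool sets:
def task_similarity(source_tools, target_tools):
    """Hypothetical proxy: Jaccard overlap of the two tasks' tool sets."""
    source, target = set(source_tools), set(target_tools)
    if not source or not target:
        return 0.0
    return len(source & target) / len(source | target)

# Support -> Sales example from the transfer learning section
sim = task_similarity(
    ["email", "ticket", "kb_search", "escalate"],
    ["email", "ticket", "crm_search", "quote"],
)
print(f"Task similarity: {sim:.2f}")  # 2 shared / 6 total = 0.33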
📊 Pattern Selection Guide
| Pattern | Use When | Benefit | Complexity |
|---|---|---|---|
| Multi-Agent | Multiple domains | Specialized expertise | ⭐⭐ Medium |
| Transfer Learning | Similar tasks | Faster convergence | ⭐⭐ Medium |
| Curriculum | Complex tasks | Better learning | ⭐⭐ Medium |
| Multi-Objective | Trade-offs needed | Balanced optimization | ⭐⭐⭐ High |
| Hierarchical | Decomposable tasks | Structured learning | ⭐⭐⭐⭐ Very High |
| Meta-Learning | Many tasks | Fast adaptation | ⭐⭐⭐⭐ Very High |
| Ensemble | High stakes | Robustness | ⭐⭐ Medium |
| Online Learning | Production | Continuous adaptation | ⭐⭐⭐ High |
🎯 Summary
Advanced RL patterns in Azcore enable:
- Multi-Agent RL: Specialized agents per domain
- Transfer Learning: Reuse knowledge across tasks
- Curriculum Learning: Progressive difficulty training
- Multi-Objective RL: Balance competing objectives
- Hierarchical RL: Structured decision-making
- Meta-Learning: Learn to learn efficiently
- Ensemble Methods: Robust predictions
- Online Learning: Continuous adaptation
Choose patterns based on your specific needs and gradually increase complexity as required.