Comprehensive tools and techniques for monitoring RL performance, debugging issues, and ensuring system health in production.
📊 Real-Time Metrics
Monitor RL system health in real-time.
Basic Statistics
# Example: pull point-in-time health metrics from a production RL manager.
from azcore.rl.rl_manager import RLManager

# Create RL manager
rl_manager = RLManager(
    tool_names=["search", "calculate", "weather", "email"],
    q_table_path="rl_data/production.pkl",
    use_embeddings=True
)

# Get current statistics
# NOTE(review): keys below assume RLManager.get_statistics() returns this
# exact schema — confirm against the azcore API.
stats = rl_manager.get_statistics()
print("=== RL System Statistics ===")
print(f"Total States: {stats['total_states']}")
print(f"Exploration Rate: {stats['exploration_rate']:.2%}")
print(f"Exploration Strategy: {stats['exploration_strategy']}")
print(f"Total State Visits: {stats['total_state_visits']}")
print(f"Average Q-Value: {stats['avg_q_value']:.3f}")
print(f"Max Q-Value: {stats['max_q_value']:.3f}")
print(f"Min Q-Value: {stats['min_q_value']:.3f}")
Continuous Monitoring Dashboard
import time
from datetime import datetime
from collections import deque
class RLMonitoringDashboard:
    """Real-time monitoring dashboard for RL systems.

    Keeps rolling windows of recent rewards, exploration rates, and average
    Q-values, plus per-tool selection counts, and renders them as a text
    dashboard suitable for terminal monitoring.
    """

    def __init__(self, rl_manager, window_size=100):
        """
        Args:
            rl_manager: RLManager instance
            window_size: Size of rolling window for metrics
        """
        self.rl_manager = rl_manager
        self.window_size = window_size

        # Rolling windows for metrics (deque drops the oldest entries
        # automatically once maxlen is reached)
        self.reward_history = deque(maxlen=window_size)
        self.exploration_history = deque(maxlen=window_size)
        self.q_value_history = deque(maxlen=window_size)
        self.tool_selection_counts = {tool: 0 for tool in rl_manager.tool_names}

        # Timestamps
        self.start_time = datetime.now()
        self.last_update = datetime.now()

    def record_interaction(self, selected_tools, reward):
        """Record an interaction for monitoring.

        Args:
            selected_tools: Iterable of tool names that were selected
            reward: Scalar reward observed for this interaction
        """
        # Update histories
        self.reward_history.append(reward)
        self.exploration_history.append(self.rl_manager.exploration_rate)

        # Update tool selection counts. Tools unknown at construction time
        # (e.g. registered with the manager later) are counted under a new
        # key instead of raising KeyError, as a plain `+= 1` would.
        for tool in selected_tools:
            self.tool_selection_counts[tool] = self.tool_selection_counts.get(tool, 0) + 1

        # Get current average Q-value
        stats = self.rl_manager.get_statistics()
        self.q_value_history.append(stats['avg_q_value'])
        self.last_update = datetime.now()

    def get_dashboard_summary(self):
        """Get summary of current metrics as a formatted string."""
        if not self.reward_history:
            return "No data collected yet"

        # Calculate metrics over the rolling windows
        avg_reward = sum(self.reward_history) / len(self.reward_history)
        avg_exploration = sum(self.exploration_history) / len(self.exploration_history)
        avg_q_value = sum(self.q_value_history) / len(self.q_value_history)

        # Runtime since dashboard construction
        runtime = (datetime.now() - self.start_time).total_seconds()

        # Tool distribution (fraction of all selections per tool)
        total_selections = sum(self.tool_selection_counts.values())
        tool_distribution = {
            tool: (count / total_selections if total_selections > 0 else 0)
            for tool, count in self.tool_selection_counts.items()
        }

        summary = f"""
=== RL Monitoring Dashboard ===
Runtime: {runtime:.1f}s
Last Update: {self.last_update.strftime('%H:%M:%S')}
Recent Performance (last {len(self.reward_history)} interactions):
 Avg Reward: {avg_reward:.3f}
 Avg Exploration: {avg_exploration:.2%}
 Avg Q-Value: {avg_q_value:.3f}
Tool Selection Distribution:
"""
        for tool, pct in sorted(tool_distribution.items(), key=lambda x: x[1], reverse=True):
            bar = "█" * int(pct * 50)  # simple text bar: 50 chars == 100%
            summary += f" {tool:12s}: {pct:5.1%} {bar}\n"
        return summary

    def display(self, clear_screen=True):
        """Display dashboard (for terminal monitoring)."""
        if clear_screen:
            import os
            os.system('cls' if os.name == 'nt' else 'clear')
        print(self.get_dashboard_summary())
# Usage: drive the dashboard with simulated traffic.
dashboard = RLMonitoringDashboard(rl_manager, window_size=100)

# Simulate interactions
for i in range(50):
    query = f"Query {i}"
    selected, state_key = rl_manager.select_tools(query, top_n=2)
    # Simulate reward
    reward = 0.8 if i % 3 == 0 else 0.5
    # Update RL
    for tool in selected:
        rl_manager.update(state_key, tool, reward)
    # Record for monitoring
    dashboard.record_interaction(selected, reward)
    # Display dashboard every 10 interactions
    if (i + 1) % 10 == 0:
        dashboard.display(clear_screen=False)
        time.sleep(0.5)
📈 Q-Value Tracking
Track Q-value evolution over time to monitor learning progress.
Q-Value History Tracking
class QValueTracker:
    """Track Q-value evolution over time.

    Periodic ``snapshot()`` calls capture a copy of the manager's Q-table
    plus its summary statistics; the stored history then supports per-tool
    evolution queries, convergence checks, and plotting.
    """

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager
        self.history = []      # list of snapshot dicts, oldest first
        self.timestamps = []   # parallel list of snapshot datetimes

    def snapshot(self):
        """Take a snapshot of current Q-values."""
        snapshot = {
            "timestamp": datetime.now(),
            "q_table": {},
            "statistics": self.rl_manager.get_statistics()
        }
        # Deep copy Q-table so later updates don't mutate the snapshot
        for state_key, actions in self.rl_manager.q_table.items():
            snapshot["q_table"][state_key] = dict(actions)
        self.history.append(snapshot)
        self.timestamps.append(snapshot["timestamp"])

    def get_tool_q_evolution(self, tool_name: str, state_key: str = None):
        """
        Get Q-value evolution for a specific tool.

        Args:
            tool_name: Name of tool
            state_key: Optional specific state key
        Returns: List of Q-values over time (one entry per snapshot)
        """
        q_values = []
        for snapshot in self.history:
            if state_key:
                # Specific state: 0.0 when the state/tool is absent
                if state_key in snapshot["q_table"]:
                    q_values.append(snapshot["q_table"][state_key].get(tool_name, 0.0))
                else:
                    q_values.append(0.0)
            else:
                # Average across all states
                state_q_values = [
                    actions.get(tool_name, 0.0)
                    for actions in snapshot["q_table"].values()
                ]
                avg_q = sum(state_q_values) / len(state_q_values) if state_q_values else 0.0
                q_values.append(avg_q)
        return q_values

    def get_convergence_metrics(self):
        """Calculate convergence metrics over the last 10 snapshots.

        Returns: Dict with a 'converged' flag plus mean/std-dev/variance of
        the recent average Q-values, or a 'reason' when fewer than 10
        snapshots exist.
        """
        if len(self.history) < 10:
            return {"converged": False, "reason": "Insufficient data"}
        # Get recent Q-value averages
        recent_q_values = [snap["statistics"]["avg_q_value"] for snap in self.history[-10:]]
        # Calculate variance (population variance over the window)
        mean_q = sum(recent_q_values) / len(recent_q_values)
        variance = sum((q - mean_q) ** 2 for q in recent_q_values) / len(recent_q_values)
        std_dev = variance ** 0.5
        # Check convergence (low variance in recent history)
        converged = std_dev < 0.01
        return {
            "converged": converged,
            "mean_q": mean_q,
            "std_dev": std_dev,
            "variance": variance,
            "recent_q_values": recent_q_values
        }

    def plot_q_value_convergence(self, tool_names=None, save_path="q_convergence.png"):
        """Plot Q-value convergence and save the figure to ``save_path``."""
        import matplotlib.pyplot as plt
        if not self.history:
            print("No history to plot")
            return
        if tool_names is None:
            tool_names = self.rl_manager.tool_names
        plt.figure(figsize=(12, 6))
        for tool in tool_names:
            q_values = self.get_tool_q_evolution(tool)
            plt.plot(range(len(q_values)), q_values, marker='o', label=tool)
        plt.xlabel("Snapshot Index")
        plt.ylabel("Average Q-Value")
        plt.title("Q-Value Convergence Over Time")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Q-value convergence plot saved to {save_path}")
# Training with Q-value tracking
tracker = QValueTracker(rl_manager)
# (query, known-correct tools) pairs used as a tiny labeled training set
training_data = [
    ("Search for documentation", ["search"]),
    ("Calculate revenue", ["calculate"]),
    ("Send email report", ["email"]),
    ("Get weather forecast", ["weather"]),
]
print("=== Training with Q-Value Tracking ===\n")
for epoch in range(20):
    for query, correct_tools in training_data:
        selected, state_key = rl_manager.select_tools(query, top_n=2)
        # Reward selections that overlap the known-correct tools
        reward = 1.0 if any(t in correct_tools for t in selected) else -0.5
        for tool in selected:
            rl_manager.update(state_key, tool, reward)
    # Take snapshot every 5 epochs
    if (epoch + 1) % 5 == 0:
        tracker.snapshot()
        print(f"Epoch {epoch+1}: Snapshot taken")

# Check convergence
convergence = tracker.get_convergence_metrics()
print(f"\n=== Convergence Analysis ===")
print(f"Converged: {convergence['converged']}")
print(f"Mean Q-Value: {convergence['mean_q']:.3f}")
print(f"Std Dev: {convergence['std_dev']:.4f}")

# Plot Q-value evolution
tracker.plot_q_value_convergence()
🔍 Tool Performance Analysis
Analyze individual tool performance and selection patterns.
Tool Performance Analyzer
class ToolPerformanceAnalyzer:
    """Analyze tool performance in detail.

    Works directly off the manager's Q-table and visit counts, so it can be
    pointed at any live or loaded RLManager without extra instrumentation.
    """

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager

    def get_tool_statistics(self, tool_name: str):
        """
        Get detailed statistics for a specific tool.

        Args:
            tool_name: Name of tool
        Returns: Dict of statistics, or None if the tool appears in no state.
        """
        q_values = []
        visit_counts = []
        states_used = 0
        for state_key, actions in self.rl_manager.q_table.items():
            if tool_name in actions:
                q_values.append(actions[tool_name])
                # .get twice: the state itself may be missing from
                # visit_counts, not just the tool within it
                visits = self.rl_manager.visit_counts.get(state_key, {}).get(tool_name, 0)
                visit_counts.append(visits)
                if visits > 0:
                    states_used += 1
        if not q_values:
            return None
        return {
            "tool": tool_name,
            "avg_q_value": sum(q_values) / len(q_values),
            "max_q_value": max(q_values),
            "min_q_value": min(q_values),
            "total_visits": sum(visit_counts),
            "states_present": len(q_values),
            "states_used": states_used,
            "avg_visits_per_state": sum(visit_counts) / len(visit_counts) if visit_counts else 0
        }

    def compare_tools(self):
        """Compare all tools side-by-side, printed as a ranked table."""
        print("=== Tool Performance Comparison ===\n")
        all_stats = []
        for tool in self.rl_manager.tool_names:
            stats = self.get_tool_statistics(tool)
            if stats:
                all_stats.append(stats)
        # Sort by average Q-value, best first
        all_stats.sort(key=lambda x: x["avg_q_value"], reverse=True)
        # Print table
        print(f"{'Tool':<15} {'Avg Q':>8} {'Total Visits':>12} {'States Used':>12} {'Ranking':>10}")
        print("-" * 70)
        for i, stats in enumerate(all_stats, 1):
            ranking = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else f"#{i}"
            print(f"{stats['tool']:<15} {stats['avg_q_value']:>8.3f} "
                  f"{stats['total_visits']:>12} {stats['states_used']:>12} {ranking:>10}")

    def find_underutilized_tools(self, visit_threshold=5):
        """
        Find tools that are underutilized.

        Args:
            visit_threshold: Minimum visits to be considered utilized
        Returns: List of (tool, visits) tuples below the threshold.
        """
        underutilized = []
        for tool in self.rl_manager.tool_names:
            stats = self.get_tool_statistics(tool)
            if stats and stats["total_visits"] < visit_threshold:
                underutilized.append((tool, stats["total_visits"]))
        return underutilized

    def find_overselected_tools(self, threshold_percentile=75):
        """
        Find tools that are selected disproportionately often.

        Args:
            threshold_percentile: Percentile threshold
        Returns: List of (tool, visits) strictly above the percentile value,
        most-visited first. Empty when no tool has statistics yet.
        """
        all_visits = []
        for tool in self.rl_manager.tool_names:
            stats = self.get_tool_statistics(tool)
            if stats:
                all_visits.append((tool, stats["total_visits"]))
        if not all_visits:
            # No tool has any statistics yet; previously this crashed with
            # an IndexError on the empty percentile lookup below.
            return []
        # Calculate percentile cut-off over the sorted visit counts
        visit_counts = sorted(v for _, v in all_visits)
        percentile_index = int(len(visit_counts) * threshold_percentile / 100)
        percentile_value = visit_counts[percentile_index] if percentile_index < len(visit_counts) else visit_counts[-1]
        # Find tools above threshold
        overselected = [(tool, visits) for tool, visits in all_visits if visits > percentile_value]
        overselected.sort(key=lambda x: x[1], reverse=True)
        return overselected

    def generate_tool_report(self, save_path="tool_report.txt"):
        """Generate a comprehensive tool performance report and save it.

        Args:
            save_path: Path the plain-text report is written to
        Returns: The report text that was written.
        """
        report_lines = []
        report_lines.append("=" * 70)
        report_lines.append("RL TOOL PERFORMANCE REPORT")
        report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("=" * 70)
        report_lines.append("")
        # Overall statistics
        stats = self.rl_manager.get_statistics()
        report_lines.append("Overall System Statistics:")
        report_lines.append(f" Total States: {stats['total_states']}")
        report_lines.append(f" Exploration Rate: {stats['exploration_rate']:.2%}")
        report_lines.append(f" Average Q-Value: {stats['avg_q_value']:.3f}")
        report_lines.append("")
        # Top performers
        report_lines.append("Top Performing Tools:")
        top_tools = self.rl_manager.get_top_performing_tools(top_n=5)
        for i, (tool, avg_q) in enumerate(top_tools, 1):
            report_lines.append(f" {i}. {tool}: Q={avg_q:.3f}")
        report_lines.append("")
        # Detailed tool statistics
        report_lines.append("Detailed Tool Statistics:")
        for tool in self.rl_manager.tool_names:
            tool_stats = self.get_tool_statistics(tool)
            if tool_stats:
                report_lines.append(f"\n Tool: {tool}")
                report_lines.append(f" Avg Q-Value: {tool_stats['avg_q_value']:.3f}")
                report_lines.append(f" Total Visits: {tool_stats['total_visits']}")
                report_lines.append(f" States Used: {tool_stats['states_used']}/{tool_stats['states_present']}")
        report_lines.append("")
        # Underutilized tools
        underutilized = self.find_underutilized_tools()
        if underutilized:
            report_lines.append("⚠️ Underutilized Tools:")
            for tool, visits in underutilized:
                report_lines.append(f" - {tool}: only {visits} visits")
            report_lines.append("")
        # Overselected tools
        overselected = self.find_overselected_tools()
        if overselected:
            report_lines.append("⚠️ Potentially Overselected Tools:")
            for tool, visits in overselected:
                report_lines.append(f" - {tool}: {visits} visits")
        report_lines.append("\n" + "=" * 70)
        # Save report
        report_text = "\n".join(report_lines)
        with open(save_path, "w") as f:
            f.write(report_text)
        print(f"Tool performance report saved to {save_path}")
        return report_text
# Usage: run the analyzer against the live manager.
analyzer = ToolPerformanceAnalyzer(rl_manager)

# Compare tools
analyzer.compare_tools()

# Find issues
underutilized = analyzer.find_underutilized_tools(visit_threshold=10)
if underutilized:
    print("\n⚠️ Underutilized tools found:")
    for tool, visits in underutilized:
        print(f" - {tool}: {visits} visits")

# Generate comprehensive report
analyzer.generate_tool_report("tool_report.txt")
📋 State Quality Reports
Analyze specific states and query patterns.
State Quality Analyzer
class StateQualityAnalyzer:
    """Analyze quality of learned states.

    Answers "how well trained is the policy for this query?" by inspecting
    the state the query maps to: best tool, Q-value spread, visit counts,
    and a coarse high/low confidence label.
    """

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager

    def analyze_query(self, query: str):
        """
        Analyze the learned state for a query.

        Args:
            query: Query string
        Returns: Dict of quality metrics
        """
        # NOTE(review): relies on get_state_quality() returning the keys
        # 'exists', 'tool_q_values', 'avg_q_value', 'best_tool',
        # 'best_q_value' and 'total_visits' — confirm against RLManager.
        quality = self.rl_manager.get_state_quality(query)
        if not quality["exists"]:
            return {
                "query": query,
                "state_exists": False,
                "recommendation": "This query has not been seen during training"
            }
        # Calculate additional spread metrics over the per-tool Q-values
        q_values = list(quality["tool_q_values"].values())
        q_range = max(q_values) - min(q_values)
        q_variance = sum((q - quality["avg_q_value"]) ** 2 for q in q_values) / len(q_values)
        return {
            "query": query,
            "state_exists": True,
            "best_tool": quality["best_tool"],
            "best_q_value": quality["best_q_value"],
            "avg_q_value": quality["avg_q_value"],
            "total_visits": quality["total_visits"],
            "tool_q_values": quality["tool_q_values"],
            "q_range": q_range,
            "q_variance": q_variance,
            # High confidence = clear separation between tools AND enough visits
            "confidence": "high" if q_range > 0.3 and quality["total_visits"] > 5 else "low"
        }

    def batch_analyze(self, queries: list):
        """
        Analyze multiple queries.

        Args:
            queries: List of query strings
        Returns: List of quality analyses (one dict per query, same order)
        """
        results = []
        for query in queries:
            analysis = self.analyze_query(query)
            results.append(analysis)
        return results

    def find_low_confidence_states(self, min_visits=3):
        """
        Find states with low confidence (low visits or low Q-value variance).

        Args:
            min_visits: Minimum visits threshold
        Returns: List of dicts describing each low-confidence state
        (state_key, reason, and the supporting numbers).
        """
        low_confidence = []
        for state_key in self.rl_manager.q_table.keys():
            total_visits = sum(self.rl_manager.visit_counts[state_key].values())
            q_values = list(self.rl_manager.q_table[state_key].values())
            if total_visits < min_visits:
                low_confidence.append({
                    "state_key": state_key,
                    "reason": "insufficient_visits",
                    "visits": total_visits
                })
                continue
            # Check Q-value variance across tools for this state
            avg_q = sum(q_values) / len(q_values)
            variance = sum((q - avg_q) ** 2 for q in q_values) / len(q_values)
            if variance < 0.01:  # Very low variance = uncertain
                low_confidence.append({
                    "state_key": state_key,
                    "reason": "low_variance",
                    "variance": variance,
                    "visits": total_visits
                })
        return low_confidence

    def generate_state_quality_report(self, important_queries: list, save_path="state_quality.txt"):
        """
        Generate state quality report for important queries.

        Args:
            important_queries: List of important queries to analyze
            save_path: Path to save report
        Returns: The report text that was written to ``save_path``.
        """
        report_lines = []
        report_lines.append("=" * 70)
        report_lines.append("STATE QUALITY REPORT")
        report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("=" * 70)
        report_lines.append("")
        # Analyze important queries
        report_lines.append("Important Query Analysis:")
        for query in important_queries:
            analysis = self.analyze_query(query)
            report_lines.append(f"\nQuery: \"{query}\"")
            if analysis["state_exists"]:
                report_lines.append(f" Best Tool: {analysis['best_tool']} (Q={analysis['best_q_value']:.3f})")
                report_lines.append(f" Avg Q-Value: {analysis['avg_q_value']:.3f}")
                report_lines.append(f" Total Visits: {analysis['total_visits']}")
                report_lines.append(f" Confidence: {analysis['confidence'].upper()}")
                if analysis['confidence'] == 'low':
                    report_lines.append(" ⚠️ WARNING: Low confidence - consider more training")
            else:
                report_lines.append(" ❌ State not found - query not seen during training")
        report_lines.append("\n" + "-" * 70)
        # Low confidence states
        low_confidence = self.find_low_confidence_states()
        report_lines.append(f"\nLow Confidence States: {len(low_confidence)}")
        if low_confidence:
            report_lines.append("\nTop 10 Low Confidence States:")
            for state in low_confidence[:10]:
                report_lines.append(f" - State: {state['state_key'][:50]}...")
                report_lines.append(f" Reason: {state['reason']}")
                report_lines.append(f" Visits: {state.get('visits', 'N/A')}")
        report_lines.append("\n" + "=" * 70)
        # Save report
        report_text = "\n".join(report_lines)
        with open(save_path, "w") as f:
            f.write(report_text)
        print(f"State quality report saved to {save_path}")
        return report_text
# Usage: spot-check the queries that matter most to the product.
state_analyzer = StateQualityAnalyzer(rl_manager)

# Analyze important queries
important_queries = [
    "Search for Python documentation",
    "Calculate quarterly revenue",
    "Send status email",
    "Get weather forecast for London"
]
print("=== State Quality Analysis ===\n")
for query in important_queries:
    analysis = state_analyzer.analyze_query(query)
    print(f"Query: \"{query}\"")
    if analysis["state_exists"]:
        print(f" Best Tool: {analysis['best_tool']} (confidence: {analysis['confidence']})")
        print(f" Q-Value: {analysis['best_q_value']:.3f}")
    else:
        print(f" ❌ Not trained on this query")
    print()

# Find low-confidence states
low_conf = state_analyzer.find_low_confidence_states()
print(f"Found {len(low_conf)} low-confidence states")

# Generate full report
state_analyzer.generate_state_quality_report(important_queries, "state_quality_report.txt")
📉 Visualization Tools
Visualize RL metrics for better understanding.
Complete Visualization Suite
import matplotlib.pyplot as plt
import seaborn as sns
class RLVisualizationSuite:
    """Comprehensive visualization tools for RL monitoring.

    Each ``plot_*`` method renders one matplotlib figure and writes it to a
    PNG; ``create_comprehensive_dashboard`` bundles them into a static HTML
    page for quick browsing.
    """

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager
        sns.set_style("whitegrid")  # consistent seaborn look for all plots

    def plot_tool_q_values(self, save_path="tool_q_values.png"):
        """Plot average Q-value per tool as a bar chart saved to ``save_path``."""
        tool_avg_q = {}
        for tool in self.rl_manager.tool_names:
            q_values = []
            for state_key in self.rl_manager.q_table.keys():
                if tool in self.rl_manager.q_table[state_key]:
                    q_values.append(self.rl_manager.q_table[state_key][tool])
            # Tools absent from every state plot as 0.0
            tool_avg_q[tool] = sum(q_values) / len(q_values) if q_values else 0.0
        # Plot
        plt.figure(figsize=(10, 6))
        tools = list(tool_avg_q.keys())
        q_values = list(tool_avg_q.values())
        plt.bar(tools, q_values, color='skyblue', edgecolor='navy')
        plt.xlabel("Tool")
        plt.ylabel("Average Q-Value")
        plt.title("Average Q-Value by Tool")
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Tool Q-values plot saved to {save_path}")

    def plot_tool_usage_heatmap(self, save_path="tool_usage_heatmap.png"):
        """Plot heatmap of tool usage across the 10 most-visited states."""
        import numpy as np  # NOTE(review): imported but unused here
        # Get top 10 most visited states
        state_visits = {
            state_key: sum(self.rl_manager.visit_counts[state_key].values())
            for state_key in self.rl_manager.q_table.keys()
        }
        top_states = sorted(state_visits.items(), key=lambda x: x[1], reverse=True)[:10]
        # Build matrix: one row per state, one column per tool
        tools = self.rl_manager.tool_names
        matrix = []
        for state_key, _ in top_states:
            row = [self.rl_manager.visit_counts[state_key].get(tool, 0) for tool in tools]
            matrix.append(row)
        # Plot heatmap
        plt.figure(figsize=(12, 8))
        sns.heatmap(
            matrix,
            annot=True,
            fmt='d',
            cmap='YlOrRd',
            xticklabels=tools,
            yticklabels=[f"State {i+1}" for i in range(len(top_states))],
            cbar_kws={'label': 'Visit Count'}
        )
        plt.title("Tool Usage Heatmap (Top 10 States)")
        plt.xlabel("Tool")
        plt.ylabel("State")
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Tool usage heatmap saved to {save_path}")

    def plot_q_value_distribution(self, save_path="q_value_dist.png"):
        """Plot a histogram of every Q-value in the table."""
        all_q_values = []
        for state_key in self.rl_manager.q_table.keys():
            for tool, q_value in self.rl_manager.q_table[state_key].items():
                all_q_values.append(q_value)
        # NOTE(review): an empty Q-table makes the mean line below divide by
        # zero — call only after some training has happened.
        plt.figure(figsize=(10, 6))
        plt.hist(all_q_values, bins=50, color='lightblue', edgecolor='navy', alpha=0.7)
        plt.xlabel("Q-Value")
        plt.ylabel("Frequency")
        plt.title("Q-Value Distribution")
        plt.axvline(sum(all_q_values) / len(all_q_values), color='red', linestyle='--', label='Mean')
        plt.legend()
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Q-value distribution plot saved to {save_path}")

    def plot_exploration_vs_exploitation(self, history, save_path="exploration_vs_exploitation.png"):
        """
        Plot exploration vs exploitation over time.

        Args:
            history: List of dicts with 'exploration' and 'exploitation' counts
        """
        exploration = [h['exploration'] for h in history]
        exploitation = [h['exploitation'] for h in history]
        plt.figure(figsize=(12, 6))
        plt.plot(exploration, label='Exploration', marker='o', color='orange')
        plt.plot(exploitation, label='Exploitation', marker='s', color='blue')
        plt.xlabel("Iteration")
        plt.ylabel("Count")
        plt.title("Exploration vs Exploitation Over Time")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Exploration vs exploitation plot saved to {save_path}")

    def create_comprehensive_dashboard(self, save_dir="rl_dashboard"):
        """Create the full dashboard: all PNG plots plus an HTML index page."""
        import os
        os.makedirs(save_dir, exist_ok=True)
        # Generate all plots into save_dir
        self.plot_tool_q_values(f"{save_dir}/tool_q_values.png")
        self.plot_tool_usage_heatmap(f"{save_dir}/tool_usage_heatmap.png")
        self.plot_q_value_distribution(f"{save_dir}/q_value_distribution.png")
        # Generate HTML dashboard ({{ and }} are literal braces in the f-string)
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>RL Monitoring Dashboard</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
h1 {{ color: #333; }}
.plot {{ margin: 20px 0; padding: 10px; background-color: white; border-radius: 5px; }}
img {{ max-width: 100%; height: auto; }}
</style>
</head>
<body>
<h1>RL System Monitoring Dashboard</h1>
<p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<div class="plot">
<h2>Tool Q-Values</h2>
<img src="tool_q_values.png" alt="Tool Q-Values">
</div>
<div class="plot">
<h2>Tool Usage Heatmap</h2>
<img src="tool_usage_heatmap.png" alt="Tool Usage Heatmap">
</div>
<div class="plot">
<h2>Q-Value Distribution</h2>
<img src="q_value_distribution.png" alt="Q-Value Distribution">
</div>
</body>
</html>
"""
        with open(f"{save_dir}/dashboard.html", "w") as f:
            f.write(html_content)
        print(f"Comprehensive dashboard created in {save_dir}/")
        print(f"Open {save_dir}/dashboard.html in a browser to view")
# Usage: render every plot plus the HTML index in one call.
viz_suite = RLVisualizationSuite(rl_manager)

# Create comprehensive dashboard
viz_suite.create_comprehensive_dashboard("rl_monitoring_dashboard")
🚨 Alerting and Anomaly Detection
Detect and alert on anomalies in RL behavior.
Anomaly Detector
class RLAnomalyDetector:
    """Detect anomalies in RL system behavior.

    Each ``check_*`` method inspects the live RLManager and records an alert
    dict (type, severity, message, timestamp) whenever a configured threshold
    is exceeded. Alerts accumulate in ``self.alerts`` until the next
    ``run_full_check()`` clears them.
    """

    def __init__(self, rl_manager, alert_thresholds=None):
        """
        Args:
            rl_manager: RLManager instance
            alert_thresholds: Dict of threshold configurations
        """
        self.rl_manager = rl_manager
        # Fall back to conservative defaults when no (or an empty)
        # threshold mapping is supplied.
        self.thresholds = alert_thresholds if alert_thresholds else {
            "min_q_value": -0.5,
            "max_exploration_rate": 0.8,
            "min_state_visits": 3,
            "q_value_std_dev_max": 1.0,
            "tool_selection_imbalance_ratio": 10.0,
        }
        self.alerts = []

    def _record(self, alert_type, severity, message):
        """Append one timestamped alert entry."""
        self.alerts.append({
            "type": alert_type,
            "severity": severity,
            "message": message,
            "timestamp": datetime.now(),
        })

    def check_q_value_anomalies(self):
        """Check for Q-value anomalies: low mean or excessive spread."""
        stats = self.rl_manager.get_statistics()
        if stats["avg_q_value"] < self.thresholds["min_q_value"]:
            self._record(
                "LOW_Q_VALUE",
                "WARNING",
                f"Average Q-value ({stats['avg_q_value']:.3f}) below threshold ({self.thresholds['min_q_value']})",
            )
        # Pool every Q-value across all states to measure global spread.
        pooled = [q for actions in self.rl_manager.q_table.values() for q in actions.values()]
        if pooled:
            mean = sum(pooled) / len(pooled)
            std_dev = (sum((q - mean) ** 2 for q in pooled) / len(pooled)) ** 0.5
            if std_dev > self.thresholds["q_value_std_dev_max"]:
                self._record(
                    "HIGH_Q_VARIANCE",
                    "INFO",
                    f"Q-value std dev ({std_dev:.3f}) is high - model may be unstable",
                )

    def check_exploration_anomalies(self):
        """Check for exploration rate anomalies."""
        rate = self.rl_manager.exploration_rate
        if rate > self.thresholds["max_exploration_rate"]:
            self._record(
                "HIGH_EXPLORATION",
                "WARNING",
                f"Exploration rate ({rate:.2%}) is very high",
            )

    def check_tool_selection_imbalance(self):
        """Check for heavily imbalanced tool selection counts."""
        totals = {
            tool: sum(
                self.rl_manager.visit_counts[state].get(tool, 0)
                for state in self.rl_manager.q_table
            )
            for tool in self.rl_manager.tool_names
        }
        if not totals:
            return
        max_visits = max(totals.values())
        min_visits = min(totals.values())
        # A zero-visit tool would make the ratio undefined; skip that case.
        if min_visits > 0 and max_visits / min_visits > self.thresholds["tool_selection_imbalance_ratio"]:
            max_tool = max(totals, key=totals.get)
            min_tool = min(totals, key=totals.get)
            self._record(
                "TOOL_IMBALANCE",
                "WARNING",
                f"Tool selection imbalanced: {max_tool} ({max_visits} visits) vs {min_tool} ({min_visits} visits)",
            )

    def check_undervisited_states(self):
        """Check whether too many states have insufficient visits."""
        limit = self.thresholds["min_state_visits"]
        undervisited = sum(
            1
            for state in self.rl_manager.q_table
            if sum(self.rl_manager.visit_counts[state].values()) < limit
        )
        # Only alert when more than 20% of all states are under-explored.
        if undervisited > len(self.rl_manager.q_table) * 0.2:
            self._record(
                "UNDERVISITED_STATES",
                "INFO",
                f"{undervisited} states have fewer than {self.thresholds['min_state_visits']} visits",
            )

    def run_full_check(self):
        """Run every anomaly check and return the collected alerts."""
        self.alerts.clear()
        for check in (
            self.check_q_value_anomalies,
            self.check_exploration_anomalies,
            self.check_tool_selection_imbalance,
            self.check_undervisited_states,
        ):
            check()
        return self.alerts

    def print_alerts(self):
        """Print all alerts."""
        if not self.alerts:
            print("✅ No anomalies detected")
            return
        print(f"⚠️ Detected {len(self.alerts)} alert(s):\n")
        for alert in self.alerts:
            if alert["severity"] == "ERROR":
                severity_emoji = "🔴"
            elif alert["severity"] == "WARNING":
                severity_emoji = "🟡"
            else:
                severity_emoji = "🔵"
            print(f"{severity_emoji} [{alert['severity']}] {alert['type']}")
            print(f" {alert['message']}")
            print(f" Time: {alert['timestamp'].strftime('%H:%M:%S')}\n")
# Usage: one-shot check, then periodic checks inside a traffic loop.
detector = RLAnomalyDetector(rl_manager)

# Run anomaly detection
alerts = detector.run_full_check()
detector.print_alerts()

# Continuous monitoring
print("\n=== Continuous Monitoring (every 10 interactions) ===\n")
for i in range(30):
    query = f"Query {i}"
    selected, state_key = rl_manager.select_tools(query, top_n=2)
    reward = 0.7 if i % 2 == 0 else 0.3
    for tool in selected:
        rl_manager.update(state_key, tool, reward)
    # Check for anomalies every 10 interactions
    if (i + 1) % 10 == 0:
        alerts = detector.run_full_check()
        print(f"Checkpoint {i+1}:")
        detector.print_alerts()
🐛 Debugging Techniques
Advanced debugging for RL systems.
Debug Mode
class RLDebugger:
    """Advanced debugging tools for RL systems.

    Prints step-by-step traces of tool selection and Q-value updates, and
    keeps an in-memory interaction log that can be exported as JSON.
    """

    def __init__(self, rl_manager, verbose=True):
        """
        Args:
            rl_manager: RLManager instance
            verbose: Enable verbose logging
        """
        self.rl_manager = rl_manager
        self.verbose = verbose
        self.interaction_log = []  # one trace-info dict per traced selection

    def trace_selection(self, query: str, top_n: int = 2):
        """
        Trace tool selection process with detailed logging.

        Args:
            query: User query
            top_n: Number of tools to select
        Returns: (selected_tools, trace_info)
        """
        print(f"\n{'='*60}")
        print(f"TRACING TOOL SELECTION FOR: \"{query}\"")
        print(f"{'='*60}")
        # Get state key (semantic embedding key vs the raw query text)
        if self.rl_manager.use_embeddings:
            state_key = self.rl_manager._get_semantic_state_key(query)
            print(f"\n1. Semantic State Key: {state_key[:50]}...")
        else:
            state_key = query
            print(f"\n1. Direct State Key: {state_key}")
        # Check if state exists
        if state_key not in self.rl_manager.q_table:
            print(" ⚠️ New state (not in Q-table)")
            print(" Initializing with default Q-values...")
        else:
            print(" ✓ State found in Q-table")
        # Show Q-values, best first.
        # NOTE(review): assumes q_table yields an entry even for brand-new
        # states (e.g. a defaultdict) — otherwise this raises KeyError; confirm.
        print(f"\n2. Q-Values for this state:")
        q_table = self.rl_manager.q_table[state_key]
        for tool, q_value in sorted(q_table.items(), key=lambda x: x[1], reverse=True):
            visits = self.rl_manager.visit_counts[state_key].get(tool, 0)
            print(f" {tool:15s}: Q={q_value:6.3f} (visits: {visits})")
        # Exploration decision.
        # NOTE: this roll is illustrative only — select_tools() below makes
        # its own independent exploration decision.
        import random
        exploration_roll = random.random()
        will_explore = exploration_roll < self.rl_manager.exploration_rate
        print(f"\n3. Exploration Decision:")
        print(f" Exploration rate: {self.rl_manager.exploration_rate:.2%}")
        print(f" Random roll: {exploration_roll:.3f}")
        print(f" Decision: {'EXPLORE' if will_explore else 'EXPLOIT'}")
        # Tool selection (the real one)
        selected, _ = self.rl_manager.select_tools(query, top_n=top_n)
        print(f"\n4. Selected Tools:")
        for tool in selected:
            print(f" - {tool}")
        print(f"\n{'='*60}\n")
        trace_info = {
            "query": query,
            "state_key": state_key,
            "q_values": dict(q_table),
            "exploration_rate": self.rl_manager.exploration_rate,
            "explored": will_explore,
            "selected": selected
        }
        self.interaction_log.append(trace_info)
        return selected, trace_info

    def trace_update(self, state_key: str, tool: str, reward: float):
        """
        Trace Q-value update process.

        Args:
            state_key: State key
            tool: Tool name
            reward: Reward value
        """
        old_q = self.rl_manager.q_table[state_key].get(tool, 0.0)
        print(f"\n{'='*60}")
        print(f"TRACING Q-VALUE UPDATE")
        print(f"{'='*60}")
        print(f"Tool: {tool}")
        print(f"Reward: {reward:.3f}")
        print(f"Old Q-Value: {old_q:.3f}")
        print(f"Learning Rate: {self.rl_manager.learning_rate}")
        print(f"Discount Factor: {self.rl_manager.discount_factor}")
        # Perform update, then report the delta it produced
        self.rl_manager.update(state_key, tool, reward)
        new_q = self.rl_manager.q_table[state_key][tool]
        delta = new_q - old_q
        print(f"\nNew Q-Value: {new_q:.3f}")
        print(f"Change (Δ): {delta:+.3f}")
        print(f"{'='*60}\n")

    def export_debug_log(self, save_path="debug_log.json"):
        """Export interaction log for analysis as pretty-printed JSON."""
        import json
        with open(save_path, "w") as f:
            json.dump(self.interaction_log, f, indent=2)
        print(f"Debug log exported to {save_path}")
# Usage: trace one selection, then the resulting updates.
debugger = RLDebugger(rl_manager, verbose=True)

# Trace tool selection
query = "Search for machine learning tutorials"
selected, trace = debugger.trace_selection(query, top_n=2)

# Trace Q-value update
reward = 0.9
for tool in selected:
    debugger.trace_update(trace["state_key"], tool, reward)

# Export debug log
debugger.export_debug_log("rl_debug_log.json")
⚡ Performance Profiling
Profile RL system performance for optimization.
Performance Profiler
import time
class RLPerformanceProfiler:
    """Profile RL system performance.

    Wraps the manager's hot operations (selection, update, persistence) with
    wall-clock timing and accumulates the durations for later reporting.
    Timing uses ``time.perf_counter`` rather than ``time.time``:
    perf_counter is monotonic and has the highest available resolution, so
    short operations are not mis-measured by clock adjustments or coarse
    system-clock ticks.
    """

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager
        # Durations (seconds) per operation type, in call order
        self.metrics = {
            "select_times": [],
            "update_times": [],
            "persist_times": []
        }

    def profile_selection(self, query: str, top_n: int = 2):
        """Profile tool selection performance.

        Returns: (selected_tools, state_key, duration_seconds)
        """
        start = time.perf_counter()
        selected, state_key = self.rl_manager.select_tools(query, top_n=top_n)
        duration = time.perf_counter() - start
        self.metrics["select_times"].append(duration)
        return selected, state_key, duration

    def profile_update(self, state_key: str, tool: str, reward: float):
        """Profile Q-value update performance.

        Returns: duration in seconds.
        """
        start = time.perf_counter()
        self.rl_manager.update(state_key, tool, reward)
        duration = time.perf_counter() - start
        self.metrics["update_times"].append(duration)
        return duration

    def profile_persistence(self):
        """Profile Q-table persistence performance.

        Returns: duration in seconds.
        """
        start = time.perf_counter()
        self.rl_manager.force_persist()
        duration = time.perf_counter() - start
        self.metrics["persist_times"].append(duration)
        return duration

    def generate_performance_report(self):
        """Print a min/avg/max latency report for each profiled operation."""
        print("=== RL Performance Profile ===\n")
        # Selection performance
        if self.metrics["select_times"]:
            avg_select = sum(self.metrics["select_times"]) / len(self.metrics["select_times"])
            print(f"Tool Selection:")
            print(f" Avg: {avg_select*1000:.2f}ms")
            print(f" Min: {min(self.metrics['select_times'])*1000:.2f}ms")
            print(f" Max: {max(self.metrics['select_times'])*1000:.2f}ms")
            print()
        # Update performance
        if self.metrics["update_times"]:
            avg_update = sum(self.metrics["update_times"]) / len(self.metrics["update_times"])
            print(f"Q-Value Update:")
            print(f" Avg: {avg_update*1000:.2f}ms")
            print(f" Min: {min(self.metrics['update_times'])*1000:.2f}ms")
            print(f" Max: {max(self.metrics['update_times'])*1000:.2f}ms")
            print()
        # Persistence performance
        if self.metrics["persist_times"]:
            avg_persist = sum(self.metrics["persist_times"]) / len(self.metrics["persist_times"])
            print(f"Persistence:")
            print(f" Avg: {avg_persist*1000:.2f}ms")
# Usage: time 100 selection/update cycles, then one persistence pass.
profiler = RLPerformanceProfiler(rl_manager)

# Profile operations
for i in range(100):
    query = f"Test query {i}"
    # Profile selection
    selected, state_key, select_time = profiler.profile_selection(query)
    # Profile update
    reward = 0.8
    for tool in selected:
        update_time = profiler.profile_update(state_key, tool, reward)

# Profile persistence
persist_time = profiler.profile_persistence()

# Generate report
profiler.generate_performance_report()
🎓 Best Practices
1. Monitor Continuously
# ✅ Set up continuous monitoring
dashboard = RLMonitoringDashboard(rl_manager)
detector = RLAnomalyDetector(rl_manager)
# Run checks regularly
for interaction in production_loop():
dashboard.record_interaction(...)
if interaction % 100 == 0:
detector.run_full_check()
2. Alert on Anomalies
# ✅ Configure alerts
alert_thresholds = {
"min_q_value": -0.3,
"max_exploration_rate": 0.5,
"min_state_visits": 5
}
detector = RLAnomalyDetector(rl_manager, alert_thresholds)
3. Track Q-Value Convergence
# ✅ Monitor convergence
tracker = QValueTracker(rl_manager)
for epoch in training_loop():
train_epoch()
if epoch % 10 == 0:
tracker.snapshot()
convergence = tracker.get_convergence_metrics()
if convergence["converged"]:
break
4. Generate Regular Reports
# ✅ Schedule periodic reports
import schedule
def generate_reports():
analyzer.generate_tool_report()
state_analyzer.generate_state_quality_report(important_queries)
viz_suite.create_comprehensive_dashboard()
schedule.every().day.at("02:00").do(generate_reports)
🎯 Summary
Azcore provides comprehensive monitoring tools:
- Real-Time Metrics: Live system statistics and dashboards
- Q-Value Tracking: Monitor learning progress and convergence
- Tool Performance Analysis: Detailed tool-level analytics
- State Quality Reports: Analyze learned state quality
- Visualization Tools: Rich visualizations for insights
- Alerting: Automatic anomaly detection and alerting
- Debugging: Detailed trace logging and debugging
- Performance Profiling: Optimize system performance
Use these tools to ensure your RL system is healthy, learning effectively, and performing optimally in production.