
Monitoring

Monitoring and debugging RL systems in Azcore.

Comprehensive tools and techniques for monitoring RL performance, debugging issues, and ensuring system health in production.


📊 Real-Time Metrics

Monitor RL system health in real-time.

Basic Statistics

from azcore.rl.rl_manager import RLManager

# Create RL manager
rl_manager = RLManager(
    tool_names=["search", "calculate", "weather", "email"],
    q_table_path="rl_data/production.pkl",
    use_embeddings=True
)

# Get current statistics
stats = rl_manager.get_statistics()

print("=== RL System Statistics ===")
print(f"Total States: {stats['total_states']}")
print(f"Exploration Rate: {stats['exploration_rate']:.2%}")
print(f"Exploration Strategy: {stats['exploration_strategy']}")
print(f"Total State Visits: {stats['total_state_visits']}")
print(f"Average Q-Value: {stats['avg_q_value']:.3f}")
print(f"Max Q-Value: {stats['max_q_value']:.3f}")
print(f"Min Q-Value: {stats['min_q_value']:.3f}")

Continuous Monitoring Dashboard

import time
from datetime import datetime
from collections import deque

class RLMonitoringDashboard:
    """Real-time monitoring dashboard for RL systems."""

    def __init__(self, rl_manager, window_size=100):
        """
        Args:
            rl_manager: RLManager instance
            window_size: Size of rolling window for metrics
        """
        self.rl_manager = rl_manager
        self.window_size = window_size

        # Rolling windows for metrics
        self.reward_history = deque(maxlen=window_size)
        self.exploration_history = deque(maxlen=window_size)
        self.q_value_history = deque(maxlen=window_size)
        self.tool_selection_counts = {tool: 0 for tool in rl_manager.tool_names}

        # Timestamps
        self.start_time = datetime.now()
        self.last_update = datetime.now()

    def record_interaction(self, selected_tools, reward):
        """Record an interaction for monitoring."""
        # Update histories
        self.reward_history.append(reward)
        self.exploration_history.append(self.rl_manager.exploration_rate)

        # Update tool selection counts
        for tool in selected_tools:
            self.tool_selection_counts[tool] += 1

        # Get current average Q-value
        stats = self.rl_manager.get_statistics()
        self.q_value_history.append(stats['avg_q_value'])

        self.last_update = datetime.now()

    def get_dashboard_summary(self):
        """Get summary of current metrics."""
        if not self.reward_history:
            return "No data collected yet"

        # Calculate metrics
        avg_reward = sum(self.reward_history) / len(self.reward_history)
        avg_exploration = sum(self.exploration_history) / len(self.exploration_history)
        avg_q_value = sum(self.q_value_history) / len(self.q_value_history)

        # Runtime
        runtime = (datetime.now() - self.start_time).total_seconds()

        # Tool distribution
        total_selections = sum(self.tool_selection_counts.values())
        tool_distribution = {
            tool: (count / total_selections if total_selections > 0 else 0)
            for tool, count in self.tool_selection_counts.items()
        }

        summary = f"""
=== RL Monitoring Dashboard ===
Runtime: {runtime:.1f}s
Last Update: {self.last_update.strftime('%H:%M:%S')}

Recent Performance (last {len(self.reward_history)} interactions):
  Avg Reward: {avg_reward:.3f}
  Avg Exploration: {avg_exploration:.2%}
  Avg Q-Value: {avg_q_value:.3f}

Tool Selection Distribution:
"""
        for tool, pct in sorted(tool_distribution.items(), key=lambda x: x[1], reverse=True):
            bar = "█" * int(pct * 50)
            summary += f"  {tool:12s}: {pct:5.1%} {bar}\n"

        return summary

    def display(self, clear_screen=True):
        """Display dashboard (for terminal monitoring)."""
        if clear_screen:
            import os
            os.system('cls' if os.name == 'nt' else 'clear')

        print(self.get_dashboard_summary())

# Usage
dashboard = RLMonitoringDashboard(rl_manager, window_size=100)

# Simulate interactions
for i in range(50):
    query = f"Query {i}"
    selected, state_key = rl_manager.select_tools(query, top_n=2)

    # Simulate reward
    reward = 0.8 if i % 3 == 0 else 0.5

    # Update RL
    for tool in selected:
        rl_manager.update(state_key, tool, reward)

    # Record for monitoring
    dashboard.record_interaction(selected, reward)

    # Display dashboard every 10 interactions
    if (i + 1) % 10 == 0:
        dashboard.display(clear_screen=False)
        time.sleep(0.5)
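
For post-hoc analysis, it helps to persist each dashboard checkpoint to disk rather than only printing it. Below is a minimal sketch (the `log_dashboard_metrics` helper and `metrics.jsonl` path are illustrative, not part of Azcore) that appends one JSON line of rolling metrics per checkpoint:

import json
from datetime import datetime

def log_dashboard_metrics(dashboard, path="metrics.jsonl"):
    """Append one JSON line of current rolling metrics (hypothetical helper)."""
    if not dashboard.reward_history:
        return
    record = {
        "timestamp": datetime.now().isoformat(),
        "avg_reward": sum(dashboard.reward_history) / len(dashboard.reward_history),
        "avg_exploration": sum(dashboard.exploration_history) / len(dashboard.exploration_history),
        "tool_counts": dict(dashboard.tool_selection_counts),
    }
    with open(path, "a") as f:
        f.write(json.dumps(record) + "\n")

# Call alongside dashboard.display() at each checkpoint
log_dashboard_metrics(dashboard)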

📈 Q-Value Tracking

Track Q-value evolution over time to monitor learning progress.

Q-Value History Tracking

class QValueTracker:
    """Track Q-value evolution over time."""

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager
        self.history = []
        self.timestamps = []

    def snapshot(self):
        """Take a snapshot of current Q-values."""
        snapshot = {
            "timestamp": datetime.now(),
            "q_table": {},
            "statistics": self.rl_manager.get_statistics()
        }

        # Deep copy Q-table
        for state_key, actions in self.rl_manager.q_table.items():
            snapshot["q_table"][state_key] = dict(actions)

        self.history.append(snapshot)
        self.timestamps.append(snapshot["timestamp"])

    def get_tool_q_evolution(self, tool_name: str, state_key: str = None):
        """
        Get Q-value evolution for a specific tool.

        Args:
            tool_name: Name of tool
            state_key: Optional specific state key

        Returns: List of Q-values over time
        """
        q_values = []

        for snapshot in self.history:
            if state_key:
                # Specific state
                if state_key in snapshot["q_table"]:
                    q_values.append(snapshot["q_table"][state_key].get(tool_name, 0.0))
                else:
                    q_values.append(0.0)
            else:
                # Average across all states
                state_q_values = [
                    actions.get(tool_name, 0.0)
                    for actions in snapshot["q_table"].values()
                ]
                avg_q = sum(state_q_values) / len(state_q_values) if state_q_values else 0.0
                q_values.append(avg_q)

        return q_values

    def get_convergence_metrics(self):
        """Calculate convergence metrics."""
        if len(self.history) < 10:
            return {"converged": False, "reason": "Insufficient data"}

        # Get recent Q-value averages
        recent_q_values = [snap["statistics"]["avg_q_value"] for snap in self.history[-10:]]

        # Calculate variance
        mean_q = sum(recent_q_values) / len(recent_q_values)
        variance = sum((q - mean_q) ** 2 for q in recent_q_values) / len(recent_q_values)
        std_dev = variance ** 0.5

        # Check convergence (low variance in recent history)
        converged = std_dev < 0.01

        return {
            "converged": converged,
            "mean_q": mean_q,
            "std_dev": std_dev,
            "variance": variance,
            "recent_q_values": recent_q_values
        }

    def plot_q_value_convergence(self, tool_names=None, save_path="q_convergence.png"):
        """Plot Q-value convergence."""
        import matplotlib.pyplot as plt

        if not self.history:
            print("No history to plot")
            return

        if tool_names is None:
            tool_names = self.rl_manager.tool_names

        plt.figure(figsize=(12, 6))

        for tool in tool_names:
            q_values = self.get_tool_q_evolution(tool)
            plt.plot(range(len(q_values)), q_values, marker='o', label=tool)

        plt.xlabel("Snapshot Index")
        plt.ylabel("Average Q-Value")
        plt.title("Q-Value Convergence Over Time")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Q-value convergence plot saved to {save_path}")

# Training with Q-value tracking
tracker = QValueTracker(rl_manager)

training_data = [
    ("Search for documentation", ["search"]),
    ("Calculate revenue", ["calculate"]),
    ("Send email report", ["email"]),
    ("Get weather forecast", ["weather"]),
]

print("=== Training with Q-Value Tracking ===\n")

for epoch in range(20):
    for query, correct_tools in training_data:
        selected, state_key = rl_manager.select_tools(query, top_n=2)

        reward = 1.0 if any(t in correct_tools for t in selected) else -0.5

        for tool in selected:
            rl_manager.update(state_key, tool, reward)

    # Take snapshot every 5 epochs
    if (epoch + 1) % 5 == 0:
        tracker.snapshot()
        print(f"Epoch {epoch+1}: Snapshot taken")

# Check convergence
convergence = tracker.get_convergence_metrics()
print(f"\n=== Convergence Analysis ===")
print(f"Converged: {convergence['converged']}")
print(f"Mean Q-Value: {convergence['mean_q']:.3f}")
print(f"Std Dev: {convergence['std_dev']:.4f}")

# Plot Q-value evolution
tracker.plot_q_value_convergence()
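
Note that `snapshot()` deep-copies the entire Q-table, which can become expensive for large state spaces. If memory is a concern, a lighter variant (a sketch, not an Azcore API) can record only the per-tool average Q-values at each snapshot:

class LightweightQTracker:
    """Memory-light tracker: stores per-tool average Q-values only (sketch)."""

    def __init__(self, rl_manager):
        self.rl_manager = rl_manager
        self.history = []  # list of {tool: avg_q} dicts

    def snapshot(self):
        per_tool = {}
        for tool in self.rl_manager.tool_names:
            q_values = [
                actions[tool]
                for actions in self.rl_manager.q_table.values()
                if tool in actions
            ]
            per_tool[tool] = sum(q_values) / len(q_values) if q_values else 0.0
        self.history.append(per_tool)

Each snapshot then costs O(number of tools) memory instead of a full Q-table copy, at the price of losing per-state detail.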

🔍 Tool Performance Analysis

Analyze individual tool performance and selection patterns.

Tool Performance Analyzer

class ToolPerformanceAnalyzer:
    """Analyze tool performance in detail."""

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager

    def get_tool_statistics(self, tool_name: str):
        """
        Get detailed statistics for a specific tool.

        Args:
            tool_name: Name of tool

        Returns: Dict of statistics
        """
        q_values = []
        visit_counts = []
        states_used = 0

        for state_key in self.rl_manager.q_table.keys():
            if tool_name in self.rl_manager.q_table[state_key]:
                q_values.append(self.rl_manager.q_table[state_key][tool_name])
                visits = self.rl_manager.visit_counts[state_key].get(tool_name, 0)
                visit_counts.append(visits)
                if visits > 0:
                    states_used += 1

        if not q_values:
            return None

        return {
            "tool": tool_name,
            "avg_q_value": sum(q_values) / len(q_values),
            "max_q_value": max(q_values),
            "min_q_value": min(q_values),
            "total_visits": sum(visit_counts),
            "states_present": len(q_values),
            "states_used": states_used,
            "avg_visits_per_state": sum(visit_counts) / len(visit_counts) if visit_counts else 0
        }

    def compare_tools(self):
        """Compare all tools side-by-side."""
        print("=== Tool Performance Comparison ===\n")

        all_stats = []
        for tool in self.rl_manager.tool_names:
            stats = self.get_tool_statistics(tool)
            if stats:
                all_stats.append(stats)

        # Sort by average Q-value
        all_stats.sort(key=lambda x: x["avg_q_value"], reverse=True)

        # Print table
        print(f"{'Tool':<15} {'Avg Q':>8} {'Total Visits':>12} {'States Used':>12} {'Ranking':>10}")
        print("-" * 70)

        for i, stats in enumerate(all_stats, 1):
            ranking = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else f"#{i}"
            print(f"{stats['tool']:<15} {stats['avg_q_value']:>8.3f} "
                  f"{stats['total_visits']:>12} {stats['states_used']:>12} {ranking:>10}")

    def find_underutilized_tools(self, visit_threshold=5):
        """
        Find tools that are underutilized.

        Args:
            visit_threshold: Minimum visits to be considered utilized

        Returns: List of underutilized tools
        """
        underutilized = []

        for tool in self.rl_manager.tool_names:
            stats = self.get_tool_statistics(tool)
            if stats and stats["total_visits"] < visit_threshold:
                underutilized.append((tool, stats["total_visits"]))

        return underutilized

    def find_overselected_tools(self, threshold_percentile=75):
        """
        Find tools that are selected disproportionately often.

        Args:
            threshold_percentile: Percentile threshold

        Returns: List of overselected tools
        """
        all_visits = []
        for tool in self.rl_manager.tool_names:
            stats = self.get_tool_statistics(tool)
            if stats:
                all_visits.append((tool, stats["total_visits"]))

        # Calculate percentile threshold
        visit_counts = sorted(v for _, v in all_visits)
        if not visit_counts:
            return []
        percentile_index = int(len(visit_counts) * threshold_percentile / 100)
        percentile_value = visit_counts[min(percentile_index, len(visit_counts) - 1)]

        # Find tools above threshold
        overselected = [(tool, visits) for tool, visits in all_visits if visits > percentile_value]
        overselected.sort(key=lambda x: x[1], reverse=True)

        return overselected

    def generate_tool_report(self, save_path="tool_report.txt"):
        """Generate comprehensive tool performance report."""
        report_lines = []

        report_lines.append("=" * 70)
        report_lines.append("RL TOOL PERFORMANCE REPORT")
        report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("=" * 70)
        report_lines.append("")

        # Overall statistics
        stats = self.rl_manager.get_statistics()
        report_lines.append("Overall System Statistics:")
        report_lines.append(f"  Total States: {stats['total_states']}")
        report_lines.append(f"  Exploration Rate: {stats['exploration_rate']:.2%}")
        report_lines.append(f"  Average Q-Value: {stats['avg_q_value']:.3f}")
        report_lines.append("")

        # Top performers
        report_lines.append("Top Performing Tools:")
        top_tools = self.rl_manager.get_top_performing_tools(top_n=5)
        for i, (tool, avg_q) in enumerate(top_tools, 1):
            report_lines.append(f"  {i}. {tool}: Q={avg_q:.3f}")
        report_lines.append("")

        # Detailed tool statistics
        report_lines.append("Detailed Tool Statistics:")
        for tool in self.rl_manager.tool_names:
            tool_stats = self.get_tool_statistics(tool)
            if tool_stats:
                report_lines.append(f"\n  Tool: {tool}")
                report_lines.append(f"    Avg Q-Value: {tool_stats['avg_q_value']:.3f}")
                report_lines.append(f"    Total Visits: {tool_stats['total_visits']}")
                report_lines.append(f"    States Used: {tool_stats['states_used']}/{tool_stats['states_present']}")
        report_lines.append("")

        # Underutilized tools
        underutilized = self.find_underutilized_tools()
        if underutilized:
            report_lines.append("⚠️  Underutilized Tools:")
            for tool, visits in underutilized:
                report_lines.append(f"  - {tool}: only {visits} visits")
        report_lines.append("")

        # Overselected tools
        overselected = self.find_overselected_tools()
        if overselected:
            report_lines.append("⚠️  Potentially Overselected Tools:")
            for tool, visits in overselected:
                report_lines.append(f"  - {tool}: {visits} visits")

        report_lines.append("\n" + "=" * 70)

        # Save report
        report_text = "\n".join(report_lines)
        with open(save_path, "w") as f:
            f.write(report_text)

        print(f"Tool performance report saved to {save_path}")
        return report_text

# Usage
analyzer = ToolPerformanceAnalyzer(rl_manager)

# Compare tools
analyzer.compare_tools()

# Find issues
underutilized = analyzer.find_underutilized_tools(visit_threshold=10)
if underutilized:
    print("\n⚠️ Underutilized tools found:")
    for tool, visits in underutilized:
        print(f"  - {tool}: {visits} visits")

# Generate comprehensive report
analyzer.generate_tool_report("tool_report.txt")
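
The plain-text report is convenient for humans; for spreadsheets or pandas, CSV is easier to consume. A small stdlib sketch (the `export_tool_stats_csv` helper is illustrative) built on the analyzer above:

import csv

def export_tool_stats_csv(analyzer, path="tool_stats.csv"):
    """Write per-tool statistics to CSV (hypothetical helper)."""
    rows = []
    for tool in analyzer.rl_manager.tool_names:
        stats = analyzer.get_tool_statistics(tool)
        if stats:
            rows.append(stats)
    if not rows:
        return
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)

export_tool_stats_csv(analyzer)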

📋 State Quality Reports

Analyze specific states and query patterns.

State Quality Analyzer

class StateQualityAnalyzer:
    """Analyze quality of learned states."""

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager

    def analyze_query(self, query: str):
        """
        Analyze the learned state for a query.

        Args:
            query: Query string

        Returns: Dict of quality metrics
        """
        quality = self.rl_manager.get_state_quality(query)

        if not quality["exists"]:
            return {
                "query": query,
                "state_exists": False,
                "recommendation": "This query has not been seen during training"
            }

        # Calculate additional metrics
        q_values = list(quality["tool_q_values"].values())
        q_range = max(q_values) - min(q_values)
        q_variance = sum((q - quality["avg_q_value"]) ** 2 for q in q_values) / len(q_values)

        return {
            "query": query,
            "state_exists": True,
            "best_tool": quality["best_tool"],
            "best_q_value": quality["best_q_value"],
            "avg_q_value": quality["avg_q_value"],
            "total_visits": quality["total_visits"],
            "tool_q_values": quality["tool_q_values"],
            "q_range": q_range,
            "q_variance": q_variance,
            "confidence": "high" if q_range > 0.3 and quality["total_visits"] > 5 else "low"
        }

    def batch_analyze(self, queries: list):
        """
        Analyze multiple queries.

        Args:
            queries: List of query strings

        Returns: List of quality analyses
        """
        results = []
        for query in queries:
            analysis = self.analyze_query(query)
            results.append(analysis)
        return results

    def find_low_confidence_states(self, min_visits=3):
        """
        Find states with low confidence (low visits or low Q-value variance).

        Args:
            min_visits: Minimum visits threshold

        Returns: List of low-confidence states
        """
        low_confidence = []

        for state_key in self.rl_manager.q_table.keys():
            total_visits = sum(self.rl_manager.visit_counts[state_key].values())
            q_values = list(self.rl_manager.q_table[state_key].values())

            if total_visits < min_visits:
                low_confidence.append({
                    "state_key": state_key,
                    "reason": "insufficient_visits",
                    "visits": total_visits
                })
                continue

            # Check Q-value variance
            avg_q = sum(q_values) / len(q_values)
            variance = sum((q - avg_q) ** 2 for q in q_values) / len(q_values)

            if variance < 0.01:  # Near-identical Q-values = no clear tool preference
                low_confidence.append({
                    "state_key": state_key,
                    "reason": "low_variance",
                    "variance": variance,
                    "visits": total_visits
                })

        return low_confidence

    def generate_state_quality_report(self, important_queries: list, save_path="state_quality.txt"):
        """
        Generate state quality report for important queries.

        Args:
            important_queries: List of important queries to analyze
            save_path: Path to save report
        """
        report_lines = []

        report_lines.append("=" * 70)
        report_lines.append("STATE QUALITY REPORT")
        report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("=" * 70)
        report_lines.append("")

        # Analyze important queries
        report_lines.append("Important Query Analysis:")
        for query in important_queries:
            analysis = self.analyze_query(query)

            report_lines.append(f"\nQuery: \"{query}\"")

            if analysis["state_exists"]:
                report_lines.append(f"  Best Tool: {analysis['best_tool']} (Q={analysis['best_q_value']:.3f})")
                report_lines.append(f"  Avg Q-Value: {analysis['avg_q_value']:.3f}")
                report_lines.append(f"  Total Visits: {analysis['total_visits']}")
                report_lines.append(f"  Confidence: {analysis['confidence'].upper()}")

                if analysis['confidence'] == 'low':
                    report_lines.append("  ⚠️  WARNING: Low confidence - consider more training")
            else:
                report_lines.append("  ❌ State not found - query not seen during training")

        report_lines.append("\n" + "-" * 70)

        # Low confidence states
        low_confidence = self.find_low_confidence_states()
        report_lines.append(f"\nLow Confidence States: {len(low_confidence)}")

        if low_confidence:
            report_lines.append("\nTop 10 Low Confidence States:")
            for state in low_confidence[:10]:
                report_lines.append(f"  - State: {state['state_key'][:50]}...")
                report_lines.append(f"    Reason: {state['reason']}")
                report_lines.append(f"    Visits: {state.get('visits', 'N/A')}")

        report_lines.append("\n" + "=" * 70)

        # Save report
        report_text = "\n".join(report_lines)
        with open(save_path, "w") as f:
            f.write(report_text)

        print(f"State quality report saved to {save_path}")
        return report_text

# Usage
state_analyzer = StateQualityAnalyzer(rl_manager)

# Analyze important queries
important_queries = [
    "Search for Python documentation",
    "Calculate quarterly revenue",
    "Send status email",
    "Get weather forecast for London"
]

print("=== State Quality Analysis ===\n")

for query in important_queries:
    analysis = state_analyzer.analyze_query(query)

    print(f"Query: \"{query}\"")
    if analysis["state_exists"]:
        print(f"  Best Tool: {analysis['best_tool']} (confidence: {analysis['confidence']})")
        print(f"  Q-Value: {analysis['best_q_value']:.3f}")
    else:
        print(f"  ❌ Not trained on this query")
    print()

# Find low-confidence states
low_conf = state_analyzer.find_low_confidence_states()
print(f"Found {len(low_conf)} low-confidence states")

# Generate full report
state_analyzer.generate_state_quality_report(important_queries, "state_quality_report.txt")
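
Low-confidence results are natural candidates for targeted retraining. As a sketch (the helper below is illustrative, not an Azcore API), you can filter your important queries down to the ones that still need work:

def queries_needing_training(state_analyzer, queries):
    """Return queries whose learned state is missing or low-confidence (sketch)."""
    flagged = []
    for query in queries:
        analysis = state_analyzer.analyze_query(query)
        if not analysis["state_exists"] or analysis.get("confidence") == "low":
            flagged.append(query)
    return flagged

todo = queries_needing_training(state_analyzer, important_queries)
print(f"{len(todo)} queries need more training: {todo}")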

📉 Visualization Tools

Visualize RL metrics for better understanding.

Complete Visualization Suite

import matplotlib.pyplot as plt
import seaborn as sns

class RLVisualizationSuite:
    """Comprehensive visualization tools for RL monitoring."""

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager
        sns.set_style("whitegrid")

    def plot_tool_q_values(self, save_path="tool_q_values.png"):
        """Plot Q-values for all tools."""
        tool_avg_q = {}

        for tool in self.rl_manager.tool_names:
            q_values = []
            for state_key in self.rl_manager.q_table.keys():
                if tool in self.rl_manager.q_table[state_key]:
                    q_values.append(self.rl_manager.q_table[state_key][tool])

            tool_avg_q[tool] = sum(q_values) / len(q_values) if q_values else 0.0

        # Plot
        plt.figure(figsize=(10, 6))
        tools = list(tool_avg_q.keys())
        q_values = list(tool_avg_q.values())

        plt.bar(tools, q_values, color='skyblue', edgecolor='navy')
        plt.xlabel("Tool")
        plt.ylabel("Average Q-Value")
        plt.title("Average Q-Value by Tool")
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Tool Q-values plot saved to {save_path}")

    def plot_tool_usage_heatmap(self, save_path="tool_usage_heatmap.png"):
        """Plot heatmap of tool usage across states."""

        # Get top 10 most visited states
        state_visits = {
            state_key: sum(self.rl_manager.visit_counts[state_key].values())
            for state_key in self.rl_manager.q_table.keys()
        }
        top_states = sorted(state_visits.items(), key=lambda x: x[1], reverse=True)[:10]

        # Build matrix
        tools = self.rl_manager.tool_names
        matrix = []

        for state_key, _ in top_states:
            row = [self.rl_manager.visit_counts[state_key].get(tool, 0) for tool in tools]
            matrix.append(row)

        # Plot heatmap
        plt.figure(figsize=(12, 8))
        sns.heatmap(
            matrix,
            annot=True,
            fmt='d',
            cmap='YlOrRd',
            xticklabels=tools,
            yticklabels=[f"State {i+1}" for i in range(len(top_states))],
            cbar_kws={'label': 'Visit Count'}
        )
        plt.title("Tool Usage Heatmap (Top 10 States)")
        plt.xlabel("Tool")
        plt.ylabel("State")
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Tool usage heatmap saved to {save_path}")

    def plot_q_value_distribution(self, save_path="q_value_dist.png"):
        """Plot distribution of Q-values."""
        all_q_values = []

        for actions in self.rl_manager.q_table.values():
            all_q_values.extend(actions.values())

        if not all_q_values:
            print("No Q-values to plot")
            return

        plt.figure(figsize=(10, 6))
        plt.hist(all_q_values, bins=50, color='lightblue', edgecolor='navy', alpha=0.7)
        plt.xlabel("Q-Value")
        plt.ylabel("Frequency")
        plt.title("Q-Value Distribution")
        plt.axvline(sum(all_q_values) / len(all_q_values), color='red', linestyle='--', label='Mean')
        plt.legend()
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Q-value distribution plot saved to {save_path}")

    def plot_exploration_vs_exploitation(self, history, save_path="exploration_vs_exploitation.png"):
        """
        Plot exploration vs exploitation over time.

        Args:
            history: List of dicts with 'exploration' and 'exploitation' counts
        """
        exploration = [h['exploration'] for h in history]
        exploitation = [h['exploitation'] for h in history]

        plt.figure(figsize=(12, 6))
        plt.plot(exploration, label='Exploration', marker='o', color='orange')
        plt.plot(exploitation, label='Exploitation', marker='s', color='blue')
        plt.xlabel("Iteration")
        plt.ylabel("Count")
        plt.title("Exploration vs Exploitation Over Time")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(save_path)
        print(f"Exploration vs exploitation plot saved to {save_path}")

    def create_comprehensive_dashboard(self, save_dir="rl_dashboard"):
        """Create comprehensive monitoring dashboard."""
        import os
        os.makedirs(save_dir, exist_ok=True)

        # Generate all plots
        self.plot_tool_q_values(f"{save_dir}/tool_q_values.png")
        self.plot_tool_usage_heatmap(f"{save_dir}/tool_usage_heatmap.png")
        self.plot_q_value_distribution(f"{save_dir}/q_value_distribution.png")

        # Generate HTML dashboard
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <title>RL Monitoring Dashboard</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
        h1 {{ color: #333; }}
        .plot {{ margin: 20px 0; padding: 10px; background-color: white; border-radius: 5px; }}
        img {{ max-width: 100%; height: auto; }}
    </style>
</head>
<body>
    <h1>RL System Monitoring Dashboard</h1>
    <p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>

    <div class="plot">
        <h2>Tool Q-Values</h2>
        <img src="tool_q_values.png" alt="Tool Q-Values">
    </div>

    <div class="plot">
        <h2>Tool Usage Heatmap</h2>
        <img src="tool_usage_heatmap.png" alt="Tool Usage Heatmap">
    </div>

    <div class="plot">
        <h2>Q-Value Distribution</h2>
        <img src="q_value_distribution.png" alt="Q-Value Distribution">
    </div>
</body>
</html>
"""

        with open(f"{save_dir}/dashboard.html", "w") as f:
            f.write(html_content)

        print(f"Comprehensive dashboard created in {save_dir}/")
        print(f"Open {save_dir}/dashboard.html in a browser to view")

# Usage
viz_suite = RLVisualizationSuite(rl_manager)

# Create comprehensive dashboard
viz_suite.create_comprehensive_dashboard("rl_monitoring_dashboard")

🚨 Alerting and Anomaly Detection

Detect and alert on anomalies in RL behavior.

Anomaly Detector

class RLAnomalyDetector:
    """Detect anomalies in RL system behavior."""

    def __init__(self, rl_manager, alert_thresholds=None):
        """
        Args:
            rl_manager: RLManager instance
            alert_thresholds: Dict of threshold configurations
        """
        self.rl_manager = rl_manager

        # Default thresholds
        self.thresholds = alert_thresholds or {
            "min_q_value": -0.5,
            "max_exploration_rate": 0.8,
            "min_state_visits": 3,
            "q_value_std_dev_max": 1.0,
            "tool_selection_imbalance_ratio": 10.0
        }

        self.alerts = []

    def check_q_value_anomalies(self):
        """Check for Q-value anomalies."""
        stats = self.rl_manager.get_statistics()

        # Check if avg Q-value is too low
        if stats["avg_q_value"] < self.thresholds["min_q_value"]:
            self.alerts.append({
                "type": "LOW_Q_VALUE",
                "severity": "WARNING",
                "message": f"Average Q-value ({stats['avg_q_value']:.3f}) below threshold ({self.thresholds['min_q_value']})",
                "timestamp": datetime.now()
            })

        # Check Q-value variance
        all_q_values = []
        for state in self.rl_manager.q_table.values():
            all_q_values.extend(state.values())

        if all_q_values:
            avg_q = sum(all_q_values) / len(all_q_values)
            variance = sum((q - avg_q) ** 2 for q in all_q_values) / len(all_q_values)
            std_dev = variance ** 0.5

            if std_dev > self.thresholds["q_value_std_dev_max"]:
                self.alerts.append({
                    "type": "HIGH_Q_VARIANCE",
                    "severity": "INFO",
                    "message": f"Q-value std dev ({std_dev:.3f}) is high - model may be unstable",
                    "timestamp": datetime.now()
                })

    def check_exploration_anomalies(self):
        """Check for exploration rate anomalies."""
        exploration_rate = self.rl_manager.exploration_rate

        if exploration_rate > self.thresholds["max_exploration_rate"]:
            self.alerts.append({
                "type": "HIGH_EXPLORATION",
                "severity": "WARNING",
                "message": f"Exploration rate ({exploration_rate:.2%}) is very high",
                "timestamp": datetime.now()
            })

    def check_tool_selection_imbalance(self):
        """Check for imbalanced tool selection."""
        tool_visit_counts = {}

        for tool in self.rl_manager.tool_names:
            total_visits = 0
            for state_key in self.rl_manager.q_table.keys():
                total_visits += self.rl_manager.visit_counts[state_key].get(tool, 0)
            tool_visit_counts[tool] = total_visits

        if tool_visit_counts:
            max_visits = max(tool_visit_counts.values())
            min_visits = min(tool_visit_counts.values())

            if min_visits > 0:
                ratio = max_visits / min_visits

                if ratio > self.thresholds["tool_selection_imbalance_ratio"]:
                    max_tool = max(tool_visit_counts, key=tool_visit_counts.get)
                    min_tool = min(tool_visit_counts, key=tool_visit_counts.get)

                    self.alerts.append({
                        "type": "TOOL_IMBALANCE",
                        "severity": "WARNING",
                        "message": f"Tool selection imbalanced: {max_tool} ({max_visits} visits) vs {min_tool} ({min_visits} visits)",
                        "timestamp": datetime.now()
                    })

    def check_undervisited_states(self):
        """Check for states with insufficient visits."""
        undervisited = 0

        for state_key in self.rl_manager.q_table.keys():
            total_visits = sum(self.rl_manager.visit_counts[state_key].values())

            if total_visits < self.thresholds["min_state_visits"]:
                undervisited += 1

        if undervisited > len(self.rl_manager.q_table) * 0.2:  # More than 20%
            self.alerts.append({
                "type": "UNDERVISITED_STATES",
                "severity": "INFO",
                "message": f"{undervisited} states have fewer than {self.thresholds['min_state_visits']} visits",
                "timestamp": datetime.now()
            })

    def run_full_check(self):
        """Run all anomaly checks."""
        self.alerts.clear()

        self.check_q_value_anomalies()
        self.check_exploration_anomalies()
        self.check_tool_selection_imbalance()
        self.check_undervisited_states()

        return self.alerts

    def print_alerts(self):
        """Print all alerts."""
        if not self.alerts:
            print("✅ No anomalies detected")
            return

        print(f"⚠️  Detected {len(self.alerts)} alert(s):\n")

        for alert in self.alerts:
            severity_emoji = "🔴" if alert["severity"] == "ERROR" else "🟡" if alert["severity"] == "WARNING" else "🔵"
            print(f"{severity_emoji} [{alert['severity']}] {alert['type']}")
            print(f"   {alert['message']}")
            print(f"   Time: {alert['timestamp'].strftime('%H:%M:%S')}\n")

# Usage
detector = RLAnomalyDetector(rl_manager)

# Run anomaly detection
alerts = detector.run_full_check()
detector.print_alerts()

# Continuous monitoring
print("\n=== Continuous Monitoring (every 10 interactions) ===\n")

for i in range(30):
    query = f"Query {i}"
    selected, state_key = rl_manager.select_tools(query, top_n=2)
    reward = 0.7 if i % 2 == 0 else 0.3

    for tool in selected:
        rl_manager.update(state_key, tool, reward)

    # Check for anomalies every 10 interactions
    if (i + 1) % 10 == 0:
        alerts = detector.run_full_check()
        print(f"Checkpoint {i+1}:")
        detector.print_alerts()
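
Printed alerts are easy to miss in production. A common pattern is to forward them through Python's `logging` module so your existing handlers (files, syslog, pagers) pick them up; a minimal sketch using the detector above:

import logging

logger = logging.getLogger("azcore.rl.monitoring")  # logger name is illustrative

SEVERITY_TO_LEVEL = {
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
}

def forward_alerts(alerts):
    """Route detector alerts through standard logging (sketch)."""
    for alert in alerts:
        level = SEVERITY_TO_LEVEL.get(alert["severity"], logging.INFO)
        logger.log(level, "%s: %s", alert["type"], alert["message"])

forward_alerts(detector.run_full_check())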

🐛 Debugging Techniques

Advanced debugging for RL systems.

Debug Mode

class RLDebugger:
    """Advanced debugging tools for RL systems."""

    def __init__(self, rl_manager, verbose=True):
        """
        Args:
            rl_manager: RLManager instance
            verbose: Enable verbose logging
        """
        self.rl_manager = rl_manager
        self.verbose = verbose
        self.interaction_log = []

    def trace_selection(self, query: str, top_n: int = 2):
        """
        Trace tool selection process with detailed logging.

        Args:
            query: User query
            top_n: Number of tools to select

        Returns: (selected_tools, trace_info)
        """
        print(f"\n{'='*60}")
        print(f"TRACING TOOL SELECTION FOR: \"{query}\"")
        print(f"{'='*60}")

        # Get state key
        if self.rl_manager.use_embeddings:
            state_key = self.rl_manager._get_semantic_state_key(query)
            print(f"\n1. Semantic State Key: {state_key[:50]}...")
        else:
            state_key = query
            print(f"\n1. Direct State Key: {state_key}")

        # Check if state exists
        if state_key not in self.rl_manager.q_table:
            print("   ⚠️  New state (not in Q-table)")
            print("   Initializing with default Q-values...")
        else:
            print("   ✓ State found in Q-table")

        # Show Q-values (may be empty for a brand-new state)
        print(f"\n2. Q-Values for this state:")
        q_table = self.rl_manager.q_table.get(state_key, {})
        if not q_table:
            print("   (no Q-values recorded yet)")
        for tool, q_value in sorted(q_table.items(), key=lambda x: x[1], reverse=True):
            visits = self.rl_manager.visit_counts.get(state_key, {}).get(tool, 0)
            print(f"   {tool:15s}: Q={q_value:6.3f} (visits: {visits})")

        # Exploration decision (illustrative: select_tools() makes its own
        # internal roll, so this only approximates the actual decision)
        import random
        exploration_roll = random.random()
        will_explore = exploration_roll < self.rl_manager.exploration_rate

        print(f"\n3. Exploration Decision:")
        print(f"   Exploration rate: {self.rl_manager.exploration_rate:.2%}")
        print(f"   Random roll: {exploration_roll:.3f}")
        print(f"   Decision: {'EXPLORE' if will_explore else 'EXPLOIT'}")

        # Tool selection
        selected, _ = self.rl_manager.select_tools(query, top_n=top_n)

        print(f"\n4. Selected Tools:")
        for tool in selected:
            print(f"   - {tool}")

        print(f"\n{'='*60}\n")

        trace_info = {
            "query": query,
            "state_key": state_key,
            "q_values": dict(q_table),
            "exploration_rate": self.rl_manager.exploration_rate,
            "explored": will_explore,
            "selected": selected
        }

        self.interaction_log.append(trace_info)

        return selected, trace_info

    def trace_update(self, state_key: str, tool: str, reward: float):
        """
        Trace Q-value update process.

        Args:
            state_key: State key
            tool: Tool name
            reward: Reward value
        """
        old_q = self.rl_manager.q_table.get(state_key, {}).get(tool, 0.0)

        print(f"\n{'='*60}")
        print(f"TRACING Q-VALUE UPDATE")
        print(f"{'='*60}")
        print(f"Tool: {tool}")
        print(f"Reward: {reward:.3f}")
        print(f"Old Q-Value: {old_q:.3f}")
        print(f"Learning Rate: {self.rl_manager.learning_rate}")
        print(f"Discount Factor: {self.rl_manager.discount_factor}")

        # Perform update
        self.rl_manager.update(state_key, tool, reward)

        new_q = self.rl_manager.q_table[state_key][tool]
        delta = new_q - old_q

        print(f"\nNew Q-Value: {new_q:.3f}")
        print(f"Change (Δ): {delta:+.3f}")
        print(f"{'='*60}\n")

    def export_debug_log(self, save_path="debug_log.json"):
        """Export interaction log for analysis."""
        import json

        with open(save_path, "w") as f:
            json.dump(self.interaction_log, f, indent=2)

        print(f"Debug log exported to {save_path}")

# Usage
debugger = RLDebugger(rl_manager, verbose=True)

# Trace tool selection
query = "Search for machine learning tutorials"
selected, trace = debugger.trace_selection(query, top_n=2)

# Trace Q-value update
reward = 0.9
for tool in selected:
    debugger.trace_update(trace["state_key"], tool, reward)

# Export debug log
debugger.export_debug_log("rl_debug_log.json")
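
`trace_update` prints the learning rate and discount factor but leaves the arithmetic implicit. Assuming Azcore follows the standard tabular Q-learning rule (check your version's source to confirm), the expected value after a single-step update is Q_new = Q_old + α·(reward + γ·max_next_Q − Q_old), which you can sanity-check against the traced Δ:

def expected_q_update(old_q, reward, alpha, gamma, max_next_q=0.0):
    """Standard tabular Q-learning update (assumes Azcore uses this rule)."""
    td_target = reward + gamma * max_next_q
    return old_q + alpha * (td_target - old_q)

# Compare against the delta printed by trace_update()
predicted = expected_q_update(
    old_q=0.0,
    reward=0.9,
    alpha=rl_manager.learning_rate,
    gamma=rl_manager.discount_factor,
)
print(f"Predicted new Q: {predicted:.3f}")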

⚡ Performance Profiling

Profile RL system performance for optimization.

Performance Profiler

import time

class RLPerformanceProfiler:
    """Profile RL system performance."""

    def __init__(self, rl_manager):
        """
        Args:
            rl_manager: RLManager instance
        """
        self.rl_manager = rl_manager
        self.metrics = {
            "select_times": [],
            "update_times": [],
            "persist_times": []
        }

    def profile_selection(self, query: str, top_n: int = 2):
        """Profile tool selection performance."""
        start = time.perf_counter()  # monotonic, higher resolution than time.time()
        selected, state_key = self.rl_manager.select_tools(query, top_n=top_n)
        duration = time.perf_counter() - start

        self.metrics["select_times"].append(duration)
        return selected, state_key, duration

    def profile_update(self, state_key: str, tool: str, reward: float):
        """Profile Q-value update performance."""
        start = time.perf_counter()
        self.rl_manager.update(state_key, tool, reward)
        duration = time.perf_counter() - start

        self.metrics["update_times"].append(duration)
        return duration

    def profile_persistence(self):
        """Profile Q-table persistence performance."""
        start = time.perf_counter()
        self.rl_manager.force_persist()
        duration = time.perf_counter() - start

        self.metrics["persist_times"].append(duration)
        return duration

    def generate_performance_report(self):
        """Generate performance report."""
        print("=== RL Performance Profile ===\n")

        # Selection performance
        if self.metrics["select_times"]:
            avg_select = sum(self.metrics["select_times"]) / len(self.metrics["select_times"])
            print(f"Tool Selection:")
            print(f"  Avg: {avg_select*1000:.2f}ms")
            print(f"  Min: {min(self.metrics['select_times'])*1000:.2f}ms")
            print(f"  Max: {max(self.metrics['select_times'])*1000:.2f}ms")
            print()

        # Update performance
        if self.metrics["update_times"]:
            avg_update = sum(self.metrics["update_times"]) / len(self.metrics["update_times"])
            print(f"Q-Value Update:")
            print(f"  Avg: {avg_update*1000:.2f}ms")
            print(f"  Min: {min(self.metrics['update_times'])*1000:.2f}ms")
            print(f"  Max: {max(self.metrics['update_times'])*1000:.2f}ms")
            print()

        # Persistence performance
        if self.metrics["persist_times"]:
            avg_persist = sum(self.metrics["persist_times"]) / len(self.metrics["persist_times"])
            print(f"Persistence:")
            print(f"  Avg: {avg_persist*1000:.2f}ms")

# Usage
profiler = RLPerformanceProfiler(rl_manager)

# Profile operations
for i in range(100):
    query = f"Test query {i}"

    # Profile selection
    selected, state_key, select_time = profiler.profile_selection(query)

    # Profile update
    reward = 0.8
    for tool in selected:
        update_time = profiler.profile_update(state_key, tool, reward)

# Profile persistence
persist_time = profiler.profile_persistence()

# Generate report
profiler.generate_performance_report()
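
Averages and extremes can hide tail latency; in production the 95th percentile is often the number that matters. Since Python 3.8, `statistics.quantiles` computes it directly from the recorded samples:

import statistics

def p95(samples):
    """95th-percentile latency (needs at least 2 samples)."""
    return statistics.quantiles(samples, n=100)[94]

if len(profiler.metrics["select_times"]) >= 2:
    print(f"Selection p95: {p95(profiler.metrics['select_times']) * 1000:.2f}ms")
if len(profiler.metrics["update_times"]) >= 2:
    print(f"Update p95:    {p95(profiler.metrics['update_times']) * 1000:.2f}ms")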

🎓 Best Practices

1. Monitor Continuously

# ✅ Set up continuous monitoring
dashboard = RLMonitoringDashboard(rl_manager)
detector = RLAnomalyDetector(rl_manager)

# Run checks regularly
for i, interaction in enumerate(production_loop(), 1):
    dashboard.record_interaction(...)
    if i % 100 == 0:
        detector.run_full_check()

2. Alert on Anomalies

# ✅ Configure alerts
alert_thresholds = {
    "min_q_value": -0.3,
    "max_exploration_rate": 0.5,
    "min_state_visits": 5
}
detector = RLAnomalyDetector(rl_manager, alert_thresholds)

3. Track Q-Value Convergence

# ✅ Monitor convergence
tracker = QValueTracker(rl_manager)

for epoch in range(max_epochs):  # max_epochs: your training budget
    train_epoch()
    if (epoch + 1) % 10 == 0:
        tracker.snapshot()
        convergence = tracker.get_convergence_metrics()
        if convergence["converged"]:
            break

4. Generate Regular Reports

# ✅ Schedule periodic reports
import time
import schedule

def generate_reports():
    analyzer.generate_tool_report()
    state_analyzer.generate_state_quality_report(important_queries)
    viz_suite.create_comprehensive_dashboard()

schedule.every().day.at("02:00").do(generate_reports)

# schedule only fires jobs when polled
while True:
    schedule.run_pending()
    time.sleep(60)

🎯 Summary

Azcore provides comprehensive monitoring tools:

  1. Real-Time Metrics: Live system statistics and dashboards
  2. Q-Value Tracking: Monitor learning progress and convergence
  3. Tool Performance Analysis: Detailed tool-level analytics
  4. State Quality Reports: Analyze learned state quality
  5. Visualization Tools: Rich visualizations for insights
  6. Alerting: Automatic anomaly detection and alerting
  7. Debugging: Detailed trace logging and debugging
  8. Performance Profiling: Optimize system performance

Use these tools to ensure your RL system is healthy, learning effectively, and performing optimally in production.
