Monitoring Guide

Monitor Azcore applications with logging, metrics, and observability tools.

A comprehensive guide to monitoring, logging, and observability for Azcore applications in production.

Overview

Observability is critical for understanding system behavior, diagnosing issues, and maintaining reliability in production. This guide covers logging, metrics, tracing, alerting, and visualization strategies.

Observability Pillars

Three Pillars of Observability

  1. Logs: Discrete events with context
  2. Metrics: Numerical measurements over time
  3. Traces: Request flows through the system

# observability/__init__.py
import logging
from prometheus_client import Counter, Histogram, Gauge
from opentelemetry import trace

class ObservabilityManager:
    """Unified observability management."""

    def __init__(self, service_name: str = "arc"):
        self.service_name = service_name

        # Setup logging
        self.logger = logging.getLogger(service_name)

        # Setup metrics
        self.request_counter = Counter(
            'arc_requests_total',
            'Total requests',
            ['method', 'endpoint', 'status']
        )

        self.request_duration = Histogram(
            'arc_request_duration_seconds',
            'Request duration',
            ['method', 'endpoint']
        )

        self.active_requests = Gauge(
            'arc_active_requests',
            'Active requests'
        )

        # Setup tracing
        self.tracer = trace.get_tracer(service_name)

    def log(self, level: str, message: str, **kwargs):
        """Log with structured context."""
        getattr(self.logger, level)(message, extra=kwargs)

    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Record request metrics."""
        self.request_counter.labels(
            method=method,
            endpoint=endpoint,
            status=status
        ).inc()

        self.request_duration.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)

    def trace_operation(self, operation_name: str):
        """Create trace span for operation."""
        return self.tracer.start_as_current_span(operation_name)


# Usage
obs = ObservabilityManager("arc-production")
obs.log("info", "Processing request", user_id="123", task="analyze")
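
The same manager also records request metrics and opens trace spans; continuing the example above (endpoint and timing values are illustrative):

obs.record_request(method="POST", endpoint="/agents/run", status=200, duration=0.42)

with obs.trace_operation("agent.run"):
    obs.log("info", "Agent run started", agent_name="researcher")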

Logging Strategy

Structured Logging

# logging/structured.py
import logging
import json
from datetime import datetime
from typing import Optional

class StructuredLogger:
    """Structured logging with JSON output."""

    def __init__(self, name: str, level: str = "INFO"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(getattr(logging, level))

        # JSON formatter
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        self.logger.addHandler(handler)

    def log(self, level: str, message: str, **context):
        """Log with structured context."""
        extra = {"context": context}
        getattr(self.logger, level)(message, extra=extra)

    def info(self, message: str, **context):
        self.log("info", message, **context)

    def warning(self, message: str, **context):
        self.log("warning", message, **context)

    def error(self, message: str, error: Optional[Exception] = None, **context):
        if error:
            context["error"] = str(error)
            context["error_type"] = type(error).__name__
        self.log("error", message, **context)

    def debug(self, message: str, **context):
        self.log("debug", message, **context)


class JSONFormatter(logging.Formatter):
    """Format logs as JSON."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add extra context
        if hasattr(record, 'context'):
            log_data.update(record.context)

        return json.dumps(log_data)


# Usage
logger = StructuredLogger("arc")

logger.info(
    "Agent invoked",
    agent_name="researcher",
    task_id="123",
    user_id="user_456"
)

# Inside an except block, pass the caught exception to get structured error fields
logger.error(
    "LLM call failed",
    error=exception,
    model="gpt-4o",
    retry_count=3,
    duration_ms=5000
)

Log Levels Strategy

# logging/config.py
import logging
from typing import Dict

class LogLevelManager:
    """Manage log levels per component."""

    def __init__(self):
        self.levels: Dict[str, int] = {
            # Core components
            "arc.agents": logging.INFO,
            "arc.workflows": logging.INFO,
            "arc.core": logging.INFO,

            # External libraries - reduce noise
            "httpx": logging.WARNING,
            "openai": logging.WARNING,
            "langchain": logging.WARNING,
            "urllib3": logging.WARNING,

            # Debug specific components
            "arc.cache": logging.DEBUG,
            "arc.rl": logging.DEBUG,
        }

    def configure(self):
        """Apply log levels."""
        for logger_name, level in self.levels.items():
            logging.getLogger(logger_name).setLevel(level)

    def set_level(self, logger_name: str, level: str):
        """Set log level for specific logger."""
        level_int = getattr(logging, level.upper())
        logging.getLogger(logger_name).setLevel(level_int)
        self.levels[logger_name] = level_int


# Apply configuration
log_manager = LogLevelManager()
log_manager.configure()

Contextual Logging

# logging/context.py
import contextvars
import logging
from typing import Dict, Any

# Context variables for request tracking
request_id_var = contextvars.ContextVar('request_id', default=None)
user_id_var = contextvars.ContextVar('user_id', default=None)

class ContextualLogger:
    """Logger with automatic context injection."""

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def _get_context(self) -> Dict[str, Any]:
        """Get current context."""
        context = {}

        request_id = request_id_var.get()
        if request_id:
            context['request_id'] = request_id

        user_id = user_id_var.get()
        if user_id:
            context['user_id'] = user_id

        return context

    def log(self, level: str, message: str, **kwargs):
        """Log with automatic context."""
        context = self._get_context()
        context.update(kwargs)

        extra = {"context": context}
        getattr(self.logger, level)(message, extra=extra)

    def info(self, message: str, **kwargs):
        self.log("info", message, **kwargs)

    def error(self, message: str, **kwargs):
        self.log("error", message, **kwargs)


# Middleware to set context
async def logging_middleware(request, call_next):
    """Set logging context from request."""
    import uuid

    request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
    user_id = request.headers.get('X-User-ID')

    request_id_var.set(request_id)
    if user_id:
        user_id_var.set(user_id)

    response = await call_next(request)
    response.headers['X-Request-ID'] = request_id

    return response


# Usage
logger = ContextualLogger(logging.getLogger("arc"))
logger.info("Processing request")  # Automatically includes request_id, user_id

Log Aggregation

# logging/aggregation.py
import logging
from logging.handlers import SysLogHandler
from pythonjsonlogger import jsonlogger

def setup_log_aggregation(
    service: str = "datadog",
    host: str = "localhost",
    port: int = 10514
):
    """Setup log aggregation to external service."""

    if service == "datadog":
        # Datadog over syslog
        handler = SysLogHandler(address=(host, port))
        formatter = jsonlogger.JsonFormatter(
            '%(timestamp)s %(level)s %(name)s %(message)s'
        )
        handler.setFormatter(formatter)

    elif service == "elasticsearch":
        # Elasticsearch via logstash
        from logstash import TCPLogstashHandler
        handler = TCPLogstashHandler(host, port, version=1)

    elif service == "cloudwatch":
        # AWS CloudWatch
        import boto3
        from watchtower import CloudWatchLogHandler
        handler = CloudWatchLogHandler(
            log_group='/aws/arc',
            stream_name='production'
        )

    else:
        raise ValueError(f"Unknown service: {service}")

    # Add handler to root logger
    logging.getLogger().addHandler(handler)
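
Call the setup function once at startup. For example, to ship logs to a local Datadog agent (host and port are illustrative):

setup_log_aggregation(service="datadog", host="datadog-agent", port=10514)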

Metrics Collection

Prometheus Metrics

# metrics/prometheus.py
from prometheus_client import Counter, Histogram, Gauge, Info
from prometheus_client import start_http_server

class MetricsCollector:
    """Collect and expose Prometheus metrics."""

    def __init__(self):
        # Request metrics
        self.requests_total = Counter(
            'arc_requests_total',
            'Total requests',
            ['method', 'endpoint', 'status']
        )

        self.request_duration = Histogram(
            'arc_request_duration_seconds',
            'Request duration',
            ['method', 'endpoint'],
            buckets=[.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10]
        )

        # LLM metrics
        self.llm_calls_total = Counter(
            'arc_llm_calls_total',
            'Total LLM calls',
            ['model', 'status']
        )

        self.llm_duration = Histogram(
            'arc_llm_duration_seconds',
            'LLM call duration',
            ['model'],
            buckets=[.1, .5, 1, 2, 5, 10, 30, 60]
        )

        self.llm_tokens = Counter(
            'arc_llm_tokens_total',
            'Total LLM tokens',
            ['model', 'type']  # type: prompt, completion
        )

        self.llm_cost = Counter(
            'arc_llm_cost_dollars',
            'Total LLM cost in dollars',
            ['model']
        )

        # Agent metrics
        self.agent_executions = Counter(
            'arc_agent_executions_total',
            'Total agent executions',
            ['agent_name', 'status']
        )

        self.agent_duration = Histogram(
            'arc_agent_duration_seconds',
            'Agent execution duration',
            ['agent_name']
        )

        # Cache metrics
        self.cache_hits = Counter(
            'arc_cache_hits_total',
            'Cache hits',
            ['cache_type']
        )

        self.cache_misses = Counter(
            'arc_cache_misses_total',
            'Cache misses',
            ['cache_type']
        )

        self.cache_size = Gauge(
            'arc_cache_size_bytes',
            'Cache size in bytes',
            ['cache_type']
        )

        # System metrics
        self.active_agents = Gauge(
            'arc_active_agents',
            'Number of active agents'
        )

        self.memory_usage = Gauge(
            'arc_memory_usage_bytes',
            'Memory usage in bytes'
        )

        # Application info
        self.app_info = Info('arc_app', 'Application information')
        self.app_info.info({
            'version': '1.0.0',
            'environment': 'production'
        })

    def record_request(
        self,
        method: str,
        endpoint: str,
        status: int,
        duration: float
    ):
        """Record HTTP request metrics."""
        self.requests_total.labels(
            method=method,
            endpoint=endpoint,
            status=status
        ).inc()

        self.request_duration.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)

    def record_llm_call(
        self,
        model: str,
        status: str,
        duration: float,
        prompt_tokens: int,
        completion_tokens: int,
        cost: float
    ):
        """Record LLM call metrics."""
        self.llm_calls_total.labels(model=model, status=status).inc()
        self.llm_duration.labels(model=model).observe(duration)
        self.llm_tokens.labels(model=model, type='prompt').inc(prompt_tokens)
        self.llm_tokens.labels(model=model, type='completion').inc(completion_tokens)
        self.llm_cost.labels(model=model).inc(cost)

    def record_agent_execution(
        self,
        agent_name: str,
        status: str,
        duration: float
    ):
        """Record agent execution metrics."""
        self.agent_executions.labels(
            agent_name=agent_name,
            status=status
        ).inc()

        self.agent_duration.labels(
            agent_name=agent_name
        ).observe(duration)

    def record_cache_operation(
        self,
        cache_type: str,
        hit: bool,
        size_bytes: int = 0
    ):
        """Record cache operation metrics."""
        if hit:
            self.cache_hits.labels(cache_type=cache_type).inc()
        else:
            self.cache_misses.labels(cache_type=cache_type).inc()

        if size_bytes > 0:
            self.cache_size.labels(cache_type=cache_type).set(size_bytes)

    def start_server(self, port: int = 9090):
        """Start Prometheus metrics server."""
        start_http_server(port)
        print(f"Metrics server started on port {port}")


# Global metrics collector
metrics = MetricsCollector()

# Start metrics server
metrics.start_server(port=9090)
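
The collector only exposes metrics; something still has to record them. One way is an HTTP middleware that times each request, sketched here assuming FastAPI/Starlette and the module layout shown above:

# metrics/middleware.py (illustrative)
import time

from metrics.prometheus import metrics  # global collector from above; import path is an assumption

async def metrics_middleware(request, call_next):
    """Time each HTTP request and record it with the global collector."""
    start = time.time()
    response = await call_next(request)

    metrics.record_request(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code,
        duration=time.time() - start,
    )
    return response

# Registered like the logging middleware: app.middleware("http")(metrics_middleware)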

Custom Metrics Decorator

# metrics/decorators.py
# Uses the global `metrics` collector (metrics/prometheus.py) and a
# calculate_cost() pricing helper (see LLMMonitor._calculate_cost later in this guide).
from functools import wraps
import time
from typing import Callable

def track_execution(agent_name: str):
    """Decorator to track agent execution metrics."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"

            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                status = "error"
                raise
            finally:
                duration = time.time() - start_time
                metrics.record_agent_execution(
                    agent_name=agent_name,
                    status=status,
                    duration=duration
                )

        return wrapper
    return decorator


def track_llm_call(model: str):
    """Decorator to track LLM call metrics."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"

            try:
                result = func(*args, **kwargs)

                # Extract token counts from result
                prompt_tokens = result.get("usage", {}).get("prompt_tokens", 0)
                completion_tokens = result.get("usage", {}).get("completion_tokens", 0)
                cost = calculate_cost(model, prompt_tokens, completion_tokens)

                metrics.record_llm_call(
                    model=model,
                    status=status,
                    duration=time.time() - start_time,
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    cost=cost
                )

                return result
            except Exception as e:
                status = "error"
                metrics.record_llm_call(
                    model=model,
                    status=status,
                    duration=time.time() - start_time,
                    prompt_tokens=0,
                    completion_tokens=0,
                    cost=0
                )
                raise

        return wrapper
    return decorator


# Usage
@track_execution("research_agent")
def execute_agent(task: str):
    # Agent logic
    pass

@track_llm_call("gpt-4o")
def call_llm(prompt: str):
    # LLM call logic
    pass

Distributed Tracing

OpenTelemetry Integration

# tracing/opentelemetry.py
from functools import wraps

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor

def setup_tracing(
    service_name: str = "arc",
    jaeger_host: str = "localhost",
    jaeger_port: int = 6831
):
    """Setup OpenTelemetry distributed tracing."""

    # Create resource
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": "production"
    })

    # Create tracer provider
    tracer_provider = TracerProvider(resource=resource)

    # Create Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name=jaeger_host,
        agent_port=jaeger_port,
    )

    # Add span processor
    span_processor = BatchSpanProcessor(jaeger_exporter)
    tracer_provider.add_span_processor(span_processor)

    # Set as global tracer provider
    trace.set_tracer_provider(tracer_provider)

    # Auto-instrument libraries
    RequestsInstrumentor().instrument()
    LoggingInstrumentor().instrument()

    print(f"Tracing configured: {service_name} -> {jaeger_host}:{jaeger_port}")


# Tracer instance
tracer = trace.get_tracer(__name__)


def trace_agent_execution(agent_name: str):
    """Decorator to trace agent execution."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(f"agent.{agent_name}") as span:
                span.set_attribute("agent.name", agent_name)
                span.set_attribute("agent.type", "react")

                try:
                    result = func(*args, **kwargs)
                    span.set_attribute("status", "success")
                    return result
                except Exception as e:
                    span.set_attribute("status", "error")
                    span.set_attribute("error.type", type(e).__name__)
                    span.set_attribute("error.message", str(e))
                    raise

        return wrapper
    return decorator


# Usage
@trace_agent_execution("researcher")
def execute_researcher(task: str):
    with tracer.start_as_current_span("llm.call") as llm_span:
        llm_span.set_attribute("model", "gpt-4o")
        # LLM call logic

    with tracer.start_as_current_span("tool.search") as tool_span:
        tool_span.set_attribute("tool", "search")
        # Tool call logic

    return result
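
Call setup_tracing() once at application startup, before any spans are created; the Jaeger host and port below are illustrative:

setup_tracing(service_name="arc", jaeger_host="jaeger", jaeger_port=6831)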

Trace Context Propagation

# tracing/propagation.py
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer(__name__)

def propagate_trace_context(headers: dict) -> dict:
    """Inject trace context into headers."""
    inject(headers)
    return headers


def extract_trace_context(headers: dict):
    """Extract trace context from headers."""
    ctx = extract(headers)
    return ctx


# Usage in HTTP requests
import requests

def make_traced_request(url: str, data: dict):
    """Make HTTP request with trace context."""
    headers = {}
    propagate_trace_context(headers)

    response = requests.post(url, json=data, headers=headers)
    return response


# Usage in FastAPI
from fastapi import Request

async def traced_endpoint(request: Request):
    """Endpoint with trace context extraction."""
    ctx = extract_trace_context(dict(request.headers))

    with tracer.start_as_current_span("process_request", context=ctx):
        # Process request
        pass

Application Performance Monitoring

APM Integration

# apm/datadog.py
from ddtrace import tracer, patch_all
from ddtrace.contrib.logging import patch as log_patch

def setup_datadog_apm(
    service_name: str = "arc",
    env: str = "production"
):
    """Setup Datadog APM."""

    # Patch libraries
    patch_all()

    # Patch logging
    log_patch()

    # Configure tracer
    tracer.configure(
        hostname="datadog-agent",
        port=8126,
        service=service_name,
        env=env
    )

    print(f"Datadog APM configured: {service_name}")


# apm/newrelic.py
import newrelic.agent

def setup_newrelic_apm(config_file: str = "newrelic.ini"):
    """Setup New Relic APM."""
    newrelic.agent.initialize(config_file)
    print("New Relic APM configured")


# apm/sentry.py
import logging

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

def setup_sentry(
    dsn: str,
    environment: str = "production",
    traces_sample_rate: float = 0.1
):
    """Setup Sentry error tracking and performance monitoring."""

    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        traces_sample_rate=traces_sample_rate,
        profiles_sample_rate=0.1,
        integrations=[
            LoggingIntegration(
                level=logging.INFO,
                event_level=logging.ERROR
            )
        ]
    )

    print(f"Sentry configured: {environment}")


# Usage
import os

from arc.apm import setup_datadog_apm, setup_sentry

setup_datadog_apm(service_name="arc", env="production")
setup_sentry(
    dsn=os.getenv("SENTRY_DSN"),
    environment="production"
)

LLM-Specific Monitoring

LLM Call Tracking

# monitoring/llm_tracker.py
# Uses the global `metrics` collector (metrics/prometheus.py) and the
# structured `logger` from earlier sections.
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
import json

@dataclass
class LLMCallMetadata:
    """Metadata for LLM call."""
    timestamp: datetime
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    duration_seconds: float
    cost_dollars: float
    status: str  # success, error, timeout
    error_message: Optional[str] = None
    prompt_preview: Optional[str] = None
    response_preview: Optional[str] = None

class LLMMonitor:
    """Monitor LLM calls for cost, performance, and quality."""

    def __init__(self):
        self.calls: List[LLMCallMetadata] = []
        self.total_cost = 0.0
        self.total_tokens = 0

    def track_call(
        self,
        model: str,
        prompt: str,
        response: str,
        usage: Dict[str, int],
        duration: float,
        status: str = "success",
        error: Optional[str] = None
    ):
        """Track individual LLM call."""

        # Calculate cost
        cost = self._calculate_cost(
            model,
            usage['prompt_tokens'],
            usage['completion_tokens']
        )

        metadata = LLMCallMetadata(
            timestamp=datetime.now(),
            model=model,
            prompt_tokens=usage['prompt_tokens'],
            completion_tokens=usage['completion_tokens'],
            total_tokens=usage['total_tokens'],
            duration_seconds=duration,
            cost_dollars=cost,
            status=status,
            error_message=error,
            prompt_preview=prompt[:100],
            response_preview=response[:100] if response else None
        )

        self.calls.append(metadata)
        self.total_cost += cost
        self.total_tokens += usage['total_tokens']

        # Log to metrics
        metrics.record_llm_call(
            model=model,
            status=status,
            duration=duration,
            prompt_tokens=usage['prompt_tokens'],
            completion_tokens=usage['completion_tokens'],
            cost=cost
        )

        # Log high-cost calls
        if cost > 1.0:  # $1 threshold
            logger.warning(
                "High-cost LLM call",
                model=model,
                cost=cost,
                tokens=usage['total_tokens']
            )

    def _calculate_cost(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> float:
        """Calculate cost based on model pricing."""

        # Pricing per 1M tokens (as of 2024)
        pricing = {
            "gpt-4o": {"prompt": 2.50, "completion": 10.00},
            "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
            "gpt-4-turbo": {"prompt": 10.00, "completion": 30.00},
            "claude-3-5-sonnet": {"prompt": 3.00, "completion": 15.00},
            "claude-3-haiku": {"prompt": 0.25, "completion": 1.25},
        }

        if model not in pricing:
            return 0.0

        prompt_cost = (prompt_tokens / 1_000_000) * pricing[model]["prompt"]
        completion_cost = (completion_tokens / 1_000_000) * pricing[model]["completion"]

        return prompt_cost + completion_cost

    def get_statistics(self) -> Dict[str, Any]:
        """Get aggregated statistics."""
        if not self.calls:
            return {}

        successful_calls = [c for c in self.calls if c.status == "success"]
        failed_calls = [c for c in self.calls if c.status == "error"]

        return {
            "total_calls": len(self.calls),
            "successful_calls": len(successful_calls),
            "failed_calls": len(failed_calls),
            "total_cost": self.total_cost,
            "total_tokens": self.total_tokens,
            "average_cost_per_call": self.total_cost / len(self.calls),
            "average_tokens_per_call": self.total_tokens / len(self.calls),
            "average_duration": sum(c.duration_seconds for c in self.calls) / len(self.calls),
            "models_used": list(set(c.model for c in self.calls))
        }

    def export_to_json(self, filepath: str):
        """Export call history to JSON."""
        data = [
            {
                "timestamp": c.timestamp.isoformat(),
                "model": c.model,
                "tokens": {
                    "prompt": c.prompt_tokens,
                    "completion": c.completion_tokens,
                    "total": c.total_tokens
                },
                "duration_seconds": c.duration_seconds,
                "cost_dollars": c.cost_dollars,
                "status": c.status
            }
            for c in self.calls
        ]

        with open(filepath, 'w') as f:
            json.dump(data, f, indent=2)


# Global LLM monitor
llm_monitor = LLMMonitor()
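
A minimal usage example with illustrative values, typically called from the code path that wraps the LLM client:

llm_monitor.track_call(
    model="gpt-4o",
    prompt="Summarize the quarterly report ...",
    response="The report highlights ...",
    usage={"prompt_tokens": 850, "completion_tokens": 200, "total_tokens": 1050},
    duration=2.3,
)

print(llm_monitor.get_statistics())
llm_monitor.export_to_json("llm_calls.json")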

Token Usage Dashboard

# monitoring/token_dashboard.py
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List

class TokenUsageDashboard:
    """Track token usage over time."""

    def __init__(self):
        self.usage_by_hour: Dict[datetime, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self.usage_by_model: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))

    def record_usage(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int
    ):
        """Record token usage."""
        hour = datetime.now().replace(minute=0, second=0, microsecond=0)

        self.usage_by_hour[hour]["prompt"] += prompt_tokens
        self.usage_by_hour[hour]["completion"] += completion_tokens
        self.usage_by_hour[hour]["total"] += prompt_tokens + completion_tokens

        self.usage_by_model[model]["prompt"] += prompt_tokens
        self.usage_by_model[model]["completion"] += completion_tokens
        self.usage_by_model[model]["total"] += prompt_tokens + completion_tokens

    def get_hourly_usage(self, hours: int = 24) -> List[Dict]:
        """Get token usage for last N hours."""
        cutoff = datetime.now() - timedelta(hours=hours)

        return [
            {
                "hour": hour.isoformat(),
                "prompt_tokens": usage["prompt"],
                "completion_tokens": usage["completion"],
                "total_tokens": usage["total"]
            }
            for hour, usage in sorted(self.usage_by_hour.items())
            if hour >= cutoff
        ]

    def get_model_usage(self) -> List[Dict]:
        """Get token usage by model."""
        return [
            {
                "model": model,
                "prompt_tokens": usage["prompt"],
                "completion_tokens": usage["completion"],
                "total_tokens": usage["total"]
            }
            for model, usage in self.usage_by_model.items()
        ]
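
The dashboard can be wired into the same code path as the LLM monitor; a short usage example with illustrative values:

dashboard = TokenUsageDashboard()
dashboard.record_usage(model="gpt-4o", prompt_tokens=850, completion_tokens=200)

print(dashboard.get_hourly_usage(hours=24))
print(dashboard.get_model_usage())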

Alerting

Alert Rules

# alerting/rules.py
# Notification helpers (send_slack_alert, send_pagerduty_alert) are defined
# in alerting/notifications.py below.
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Callable, Dict, List

class AlertSeverity(str, Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    """Alert rule definition."""
    name: str
    description: str
    condition: Callable[[], bool]
    severity: AlertSeverity
    cooldown_seconds: int = 300  # 5 minutes

class AlertManager:
    """Manage alerts and notifications."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.last_alert_times: Dict[str, datetime] = {}

    def add_rule(self, rule: AlertRule):
        """Add alert rule."""
        self.rules.append(rule)

    def check_rules(self):
        """Check all alert rules."""
        for rule in self.rules:
            try:
                if rule.condition():
                    self._trigger_alert(rule)
            except Exception as e:
                logger.error(f"Error checking rule {rule.name}: {e}")

    def _trigger_alert(self, rule: AlertRule):
        """Trigger alert if not in cooldown."""
        now = datetime.now()
        last_alert = self.last_alert_times.get(rule.name)

        if last_alert:
            time_since_last = (now - last_alert).total_seconds()
            if time_since_last < rule.cooldown_seconds:
                return  # Still in cooldown

        # Send alert
        self._send_alert(rule)
        self.last_alert_times[rule.name] = now

    def _send_alert(self, rule: AlertRule):
        """Send alert to configured channels."""
        message = f"[{rule.severity.upper()}] {rule.name}: {rule.description}"

        # Send to Slack
        send_slack_alert(message, severity=rule.severity)

        # Send to PagerDuty for critical alerts
        if rule.severity == AlertSeverity.CRITICAL:
            send_pagerduty_alert(rule.name, rule.description)

        logger.warning(f"Alert triggered: {rule.name}")


# Define alert rules
alert_manager = AlertManager()

# High error rate
alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    description="Error rate exceeds 5%",
    condition=lambda: get_error_rate() > 0.05,
    severity=AlertSeverity.CRITICAL
))

# High LLM cost
alert_manager.add_rule(AlertRule(
    name="high_llm_cost",
    description="LLM cost exceeds $100/hour",
    condition=lambda: get_hourly_llm_cost() > 100,
    severity=AlertSeverity.WARNING
))

# Low cache hit rate
alert_manager.add_rule(AlertRule(
    name="low_cache_hit_rate",
    description="Cache hit rate below 50%",
    condition=lambda: get_cache_hit_rate() < 0.5,
    severity=AlertSeverity.WARNING
))

# High latency
alert_manager.add_rule(AlertRule(
    name="high_latency",
    description="P99 latency exceeds 10s",
    condition=lambda: get_p99_latency() > 10,
    severity=AlertSeverity.WARNING
))
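
The conditions above reference helpers such as get_error_rate() that are not defined elsewhere in this guide. One possible implementation, sketched here, queries the same Prometheus metrics exposed earlier through the HTTP query API (the URL and PromQL expressions are illustrative and mirror the Grafana panels below):

# alerting/queries.py (illustrative)
import os
import requests

PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://prometheus:9090")

def query_prometheus(expr: str) -> float:
    """Run an instant PromQL query and return the first value (0.0 if empty)."""
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/query",
        params={"query": expr},
        timeout=10,
    )
    resp.raise_for_status()
    results = resp.json()["data"]["result"]
    return float(results[0]["value"][1]) if results else 0.0

def get_error_rate() -> float:
    return query_prometheus(
        'sum(rate(arc_requests_total{status=~"5.."}[5m])) / sum(rate(arc_requests_total[5m]))'
    )

def get_hourly_llm_cost() -> float:
    return query_prometheus("sum(increase(arc_llm_cost_dollars[1h]))")

def get_cache_hit_rate() -> float:
    return query_prometheus(
        "sum(rate(arc_cache_hits_total[5m])) / "
        "(sum(rate(arc_cache_hits_total[5m])) + sum(rate(arc_cache_misses_total[5m])))"
    )

def get_p99_latency() -> float:
    return query_prometheus(
        "histogram_quantile(0.99, sum(rate(arc_request_duration_seconds_bucket[5m])) by (le))"
    )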

Notification Channels

# alerting/notifications.py
import os
import time
from typing import List

import requests

def send_slack_alert(message: str, severity: str = "warning"):
    """Send alert to Slack."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if not webhook_url:
        return

    color = {
        "info": "#36a64f",
        "warning": "#ff9900",
        "critical": "#ff0000"
    }.get(severity, "#808080")

    payload = {
        "attachments": [{
            "color": color,
            "text": message,
            "footer": "Arc Monitoring",
            "ts": int(time.time())
        }]
    }

    requests.post(webhook_url, json=payload)


def send_pagerduty_alert(title: str, description: str):
    """Send alert to PagerDuty."""
    api_key = os.getenv("PAGERDUTY_API_KEY")
    if not api_key:
        return

    payload = {
        "routing_key": api_key,
        "event_action": "trigger",
        "payload": {
            "summary": title,
            "severity": "critical",
            "source": "arc-production",
            "custom_details": {
                "description": description
            }
        }
    }

    requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload
    )


def send_email_alert(subject: str, body: str, recipients: List[str]):
    """Send email alert."""
    import smtplib
    from email.mime.text import MIMEText

    smtp_server = os.getenv("SMTP_SERVER")
    smtp_port = int(os.getenv("SMTP_PORT", "587"))
    smtp_user = os.getenv("SMTP_USER")
    smtp_password = os.getenv("SMTP_PASSWORD")

    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = smtp_user
    msg['To'] = ', '.join(recipients)

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_user, smtp_password)
        server.send_message(msg)
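
Nothing calls check_rules() automatically. A simple sketch that evaluates the rules on a fixed interval in a daemon thread (a scheduler or cron job would work equally well); it assumes the alert_manager instance from alerting/rules.py is importable:

# alerting/scheduler.py (illustrative)
import threading
import time

from alerting.rules import alert_manager  # module path is an assumption

def start_alert_loop(interval_seconds: int = 60) -> threading.Thread:
    """Check all alert rules every interval_seconds in a background thread."""
    def _loop():
        while True:
            alert_manager.check_rules()
            time.sleep(interval_seconds)

    thread = threading.Thread(target=_loop, daemon=True)
    thread.start()
    return thread

start_alert_loop(interval_seconds=60)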

Dashboards

Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "Arc Production Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(arc_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(arc_requests_total{status=~\"5..\"}[5m]) / rate(arc_requests_total[5m])"
          }
        ]
      },
      {
        "title": "P99 Latency",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(arc_request_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "LLM Cost (Hourly)",
        "targets": [
          {
            "expr": "increase(arc_llm_cost_dollars[1h])"
          }
        ]
      },
      {
        "title": "Cache Hit Rate",
        "targets": [
          {
            "expr": "rate(arc_cache_hits_total[5m]) / (rate(arc_cache_hits_total[5m]) + rate(arc_cache_misses_total[5m]))"
          }
        ]
      }
    ]
  }
}

Observability Tools

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'arc'
    static_configs:
      - targets: ['arc:9090']
        labels:
          environment: 'production'

  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - 'alerts.yml'

Alert Rules (Prometheus)

# alerts.yml
groups:
  - name: arc_alerts
    interval: 30s
    rules:
      - alert: HighErrorRate
        expr: rate(arc_requests_total{status=~"5.."}[5m]) / rate(arc_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      - alert: HighLatency
        expr: histogram_quantile(0.99, rate(arc_request_duration_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P99 latency is {{ $value }}s"

      - alert: HighLLMCost
        expr: increase(arc_llm_cost_dollars[1h]) > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High LLM cost"
          description: "Hourly cost is ${{ $value }}"

Best Practices

1. Structured Logging

# Always log with structure
logger.info(
    "Agent executed",
    agent_name="researcher",
    duration_ms=1500,
    tokens=1000,
    status="success"
)

# Not: logger.info("Agent researcher took 1500ms with 1000 tokens")

2. Appropriate Log Levels

# DEBUG: Detailed information for diagnosing problems
logger.debug("Cache key computed", key=cache_key)

# INFO: General informational messages
logger.info("Agent started", agent_name="researcher")

# WARNING: Something unexpected but not critical
logger.warning("Cache miss", cache_type="llm", key=cache_key)

# ERROR: Error occurred but application continues
logger.error("LLM call failed", error=str(e), retry_count=3)

# CRITICAL: Critical error, application may not continue
logger.critical("Database connection lost", error=str(e))

3. Monitor What Matters

Focus on:

  • Request rate and latency
  • Error rates
  • LLM costs and token usage
  • Cache hit rates
  • Resource utilization (see the sketch after this list)
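
A short sketch for the last item: feed process memory into the arc_memory_usage_bytes gauge defined in the metrics collector. It assumes psutil is installed and that the global metrics object from the Prometheus Metrics section is importable:

import threading
import time

import psutil

def start_resource_monitor(interval_seconds: int = 30):
    """Periodically update the memory usage gauge for this process."""
    # `metrics` is the global MetricsCollector from the Prometheus Metrics section
    process = psutil.Process()

    def _loop():
        while True:
            metrics.memory_usage.set(process.memory_info().rss)
            time.sleep(interval_seconds)

    threading.Thread(target=_loop, daemon=True).start()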

4. Set Meaningful Alerts

# Good: Actionable alert
alert = AlertRule(
    name="high_error_rate_critical",
    description="Error rate > 5% for 5 minutes - immediate action required",
    condition=lambda: get_error_rate() > 0.05,
    severity=AlertSeverity.CRITICAL
)

# Bad: Too noisy
alert = AlertRule(
    name="any_error",
    description="An error occurred",
    condition=lambda: get_error_count() > 0,
    severity=AlertSeverity.CRITICAL
)

5. Use Distributed Tracing

# Trace entire request flow
with tracer.start_as_current_span("process_request") as span:
    span.set_attribute("user_id", user_id)

    with tracer.start_as_current_span("agent.execute"):
        agent.execute(task)

    with tracer.start_as_current_span("llm.call"):
        llm.invoke(prompt)

6. Monitor Cost

# Track and alert on LLM costs
def check_cost_budget():
    daily_cost = get_daily_llm_cost()
    budget = 1000  # $1000/day

    if daily_cost > budget * 0.8:
        logger.warning(
            "Approaching daily cost budget",
            current_cost=daily_cost,
            budget=budget,
            percentage=daily_cost / budget
        )

7. Retain Logs Appropriately

  • DEBUG logs: 1 day
  • INFO logs: 7 days
  • WARNING logs: 30 days
  • ERROR logs: 90 days
  • CRITICAL logs: 1 year
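
Retention is typically enforced by the log aggregation backend (Datadog, CloudWatch, or Elasticsearch lifecycle policies). If logs are also written to local files, per-level rotating handlers can approximate the table above; a sketch:

# logging/retention.py (illustrative)
import logging
import os
from logging.handlers import TimedRotatingFileHandler

class LevelFilter(logging.Filter):
    """Pass only records of a single level."""

    def __init__(self, level: int):
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        return record.levelno == self.level

RETENTION_DAYS = {
    logging.DEBUG: 1,
    logging.INFO: 7,
    logging.WARNING: 30,
    logging.ERROR: 90,
    logging.CRITICAL: 365,
}

def setup_retention(log_dir: str = "logs"):
    """Attach one daily-rotating file handler per level to the root logger."""
    os.makedirs(log_dir, exist_ok=True)
    root = logging.getLogger()

    for level, days in RETENTION_DAYS.items():
        name = logging.getLevelName(level).lower()
        handler = TimedRotatingFileHandler(
            f"{log_dir}/{name}.log", when="D", interval=1, backupCount=days
        )
        handler.setLevel(level)
        handler.addFilter(LevelFilter(level))
        root.addHandler(handler)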