A comprehensive guide to monitoring, logging, and observability for Arc framework applications in production.
Overview
Observability is critical for understanding system behavior, diagnosing issues, and maintaining reliability in production. This guide covers logging, metrics, tracing, alerting, and visualization strategies.
Observability Pillars
Three Pillars of Observability
- Logs: Discrete events with context
- Metrics: Numerical measurements over time
- Traces: Request flows through the system
# observability/__init__.py
from typing import Optional
import logging
from prometheus_client import Counter, Histogram, Gauge
from opentelemetry import trace
class ObservabilityManager:
"""Unified observability management."""
def __init__(self, service_name: str = "arc"):
self.service_name = service_name
# Setup logging
self.logger = logging.getLogger(service_name)
# Setup metrics
self.request_counter = Counter(
'arc_requests_total',
'Total requests',
['method', 'endpoint', 'status']
)
self.request_duration = Histogram(
'arc_request_duration_seconds',
'Request duration',
['method', 'endpoint']
)
self.active_requests = Gauge(
'arc_active_requests',
'Active requests'
)
# Setup tracing
self.tracer = trace.get_tracer(service_name)
def log(self, level: str, message: str, **kwargs):
"""Log with structured context."""
getattr(self.logger, level)(message, extra=kwargs)
def record_request(self, method: str, endpoint: str, status: int, duration: float):
"""Record request metrics."""
self.request_counter.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
self.request_duration.labels(
method=method,
endpoint=endpoint
).observe(duration)
def trace_operation(self, operation_name: str):
"""Create trace span for operation."""
return self.tracer.start_as_current_span(operation_name)
# Usage
obs = ObservabilityManager("arc-production")
obs.log("info", "Processing request", user_id="123", task="analyze")
Logging Strategy
Structured Logging
# logging/structured.py
import logging
import json
from datetime import datetime
from typing import Dict, Any
class StructuredLogger:
"""Structured logging with JSON output."""
def __init__(self, name: str, level: str = "INFO"):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, level))
# JSON formatter
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
self.logger.addHandler(handler)
def log(self, level: str, message: str, **context):
"""Log with structured context."""
extra = {"context": context}
getattr(self.logger, level)(message, extra=extra)
def info(self, message: str, **context):
self.log("info", message, **context)
def warning(self, message: str, **context):
self.log("warning", message, **context)
def error(self, message: str, error: Exception = None, **context):
if error:
context["error"] = str(error)
context["error_type"] = type(error).__name__
self.log("error", message, **context)
def debug(self, message: str, **context):
self.log("debug", message, **context)
class JSONFormatter(logging.Formatter):
"""Format logs as JSON."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add extra context
if hasattr(record, 'context'):
log_data.update(record.context)
return json.dumps(log_data)
# Usage
logger = StructuredLogger("arc")
logger.info(
"Agent invoked",
agent_name="researcher",
task_id="123",
user_id="user_456"
)
logger.error(
"LLM call failed",
error=exception,
model="gpt-4o",
retry_count=3,
duration_ms=5000
)
Log Levels Strategy
# logging/config.py
import logging
from typing import Dict
class LogLevelManager:
"""Manage log levels per component."""
def __init__(self):
self.levels: Dict[str, int] = {
# Core components
"arc.agents": logging.INFO,
"arc.workflows": logging.INFO,
"arc.core": logging.INFO,
# External libraries - reduce noise
"httpx": logging.WARNING,
"openai": logging.WARNING,
"langchain": logging.WARNING,
"urllib3": logging.WARNING,
# Debug specific components
"arc.cache": logging.DEBUG,
"arc.rl": logging.DEBUG,
}
def configure(self):
"""Apply log levels."""
for logger_name, level in self.levels.items():
logging.getLogger(logger_name).setLevel(level)
def set_level(self, logger_name: str, level: str):
"""Set log level for specific logger."""
level_int = getattr(logging, level.upper())
logging.getLogger(logger_name).setLevel(level_int)
self.levels[logger_name] = level_int
# Apply configuration
log_manager = LogLevelManager()
log_manager.configure()
Contextual Logging
# logging/context.py
import contextvars
import logging
from typing import Dict, Any
# Context variables for request tracking
request_id_var = contextvars.ContextVar('request_id', default=None)
user_id_var = contextvars.ContextVar('user_id', default=None)
class ContextualLogger:
"""Logger with automatic context injection."""
def __init__(self, logger: logging.Logger):
self.logger = logger
def _get_context(self) -> Dict[str, Any]:
"""Get current context."""
context = {}
request_id = request_id_var.get()
if request_id:
context['request_id'] = request_id
user_id = user_id_var.get()
if user_id:
context['user_id'] = user_id
return context
def log(self, level: str, message: str, **kwargs):
"""Log with automatic context."""
context = self._get_context()
context.update(kwargs)
extra = {"context": context}
getattr(self.logger, level)(message, extra=extra)
def info(self, message: str, **kwargs):
self.log("info", message, **kwargs)
def error(self, message: str, **kwargs):
self.log("error", message, **kwargs)
# Middleware to set context
async def logging_middleware(request, call_next):
"""Set logging context from request."""
import uuid
request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
user_id = request.headers.get('X-User-ID')
request_id_var.set(request_id)
if user_id:
user_id_var.set(user_id)
response = await call_next(request)
response.headers['X-Request-ID'] = request_id
return response
# Usage
logger = ContextualLogger(logging.getLogger("arc"))
logger.info("Processing request") # Automatically includes request_id, user_id
Log Aggregation
# logging/aggregation.py
import logging
from logging.handlers import SysLogHandler
from pythonjsonlogger import jsonlogger
def setup_log_aggregation(
service: str = "datadog",
host: str = "localhost",
port: int = 10514
):
"""Setup log aggregation to external service."""
if service == "datadog":
# Datadog over syslog
handler = SysLogHandler(address=(host, port))
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(levelname)s %(name)s %(message)s'
)
handler.setFormatter(formatter)
elif service == "elasticsearch":
# Elasticsearch via logstash
from logstash import TCPLogstashHandler
handler = TCPLogstashHandler(host, port, version=1)
elif service == "cloudwatch":
# AWS CloudWatch
import boto3
from watchtower import CloudWatchLogHandler
handler = CloudWatchLogHandler(
log_group='/aws/arc',
stream_name='production'
)
else:
raise ValueError(f"Unknown service: {service}")
# Add handler to root logger
logging.getLogger().addHandler(handler)
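Wired up once at application startup, for example (the host and port below are the function's defaults and purely illustrative):
# At application startup
setup_log_aggregation(service="datadog", host="localhost", port=10514)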
Metrics Collection
Prometheus Metrics
# metrics/prometheus.py
from prometheus_client import Counter, Histogram, Gauge, Summary, Info
from prometheus_client import start_http_server
from typing import Dict, Any
import time
class MetricsCollector:
"""Collect and expose Prometheus metrics."""
def __init__(self):
# Request metrics
self.requests_total = Counter(
'arc_requests_total',
'Total requests',
['method', 'endpoint', 'status']
)
self.request_duration = Histogram(
'arc_request_duration_seconds',
'Request duration',
['method', 'endpoint'],
buckets=[.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10]
)
# LLM metrics
self.llm_calls_total = Counter(
'arc_llm_calls_total',
'Total LLM calls',
['model', 'status']
)
self.llm_duration = Histogram(
'arc_llm_duration_seconds',
'LLM call duration',
['model'],
buckets=[.1, .5, 1, 2, 5, 10, 30, 60]
)
self.llm_tokens = Counter(
'arc_llm_tokens_total',
'Total LLM tokens',
['model', 'type'] # type: prompt, completion
)
self.llm_cost = Counter(
'arc_llm_cost_dollars',
'Total LLM cost in dollars',
['model']
)
# Agent metrics
self.agent_executions = Counter(
'arc_agent_executions_total',
'Total agent executions',
['agent_name', 'status']
)
self.agent_duration = Histogram(
'arc_agent_duration_seconds',
'Agent execution duration',
['agent_name']
)
# Cache metrics
self.cache_hits = Counter(
'arc_cache_hits_total',
'Cache hits',
['cache_type']
)
self.cache_misses = Counter(
'arc_cache_misses_total',
'Cache misses',
['cache_type']
)
self.cache_size = Gauge(
'arc_cache_size_bytes',
'Cache size in bytes',
['cache_type']
)
# System metrics
self.active_agents = Gauge(
'arc_active_agents',
'Number of active agents'
)
self.memory_usage = Gauge(
'arc_memory_usage_bytes',
'Memory usage in bytes'
)
# Application info
self.app_info = Info('arc_app', 'Application information')
self.app_info.info({
'version': '1.0.0',
'environment': 'production'
})
def record_request(
self,
method: str,
endpoint: str,
status: int,
duration: float
):
"""Record HTTP request metrics."""
self.requests_total.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
self.request_duration.labels(
method=method,
endpoint=endpoint
).observe(duration)
def record_llm_call(
self,
model: str,
status: str,
duration: float,
prompt_tokens: int,
completion_tokens: int,
cost: float
):
"""Record LLM call metrics."""
self.llm_calls_total.labels(model=model, status=status).inc()
self.llm_duration.labels(model=model).observe(duration)
self.llm_tokens.labels(model=model, type='prompt').inc(prompt_tokens)
self.llm_tokens.labels(model=model, type='completion').inc(completion_tokens)
self.llm_cost.labels(model=model).inc(cost)
def record_agent_execution(
self,
agent_name: str,
status: str,
duration: float
):
"""Record agent execution metrics."""
self.agent_executions.labels(
agent_name=agent_name,
status=status
).inc()
self.agent_duration.labels(
agent_name=agent_name
).observe(duration)
def record_cache_operation(
self,
cache_type: str,
hit: bool,
size_bytes: int = 0
):
"""Record cache operation metrics."""
if hit:
self.cache_hits.labels(cache_type=cache_type).inc()
else:
self.cache_misses.labels(cache_type=cache_type).inc()
if size_bytes > 0:
self.cache_size.labels(cache_type=cache_type).set(size_bytes)
def start_server(self, port: int = 9090):
"""Start Prometheus metrics server."""
start_http_server(port)
print(f"Metrics server started on port {port}")
# Global metrics collector
metrics = MetricsCollector()
# Start metrics server
metrics.start_server(port=9090)
Custom Metrics Decorator
# metrics/decorators.py
from functools import wraps
import time
from typing import Callable
from metrics.prometheus import metrics  # global MetricsCollector instance defined in metrics/prometheus.py
def track_execution(agent_name: str):
"""Decorator to track agent execution metrics."""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
status = "success"
try:
result = func(*args, **kwargs)
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
metrics.record_agent_execution(
agent_name=agent_name,
status=status,
duration=duration
)
return wrapper
return decorator
def track_llm_call(model: str):
"""Decorator to track LLM call metrics."""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
status = "success"
try:
result = func(*args, **kwargs)
# Extract token counts from result
prompt_tokens = result.get("usage", {}).get("prompt_tokens", 0)
completion_tokens = result.get("usage", {}).get("completion_tokens", 0)
cost = calculate_cost(model, prompt_tokens, completion_tokens)  # pricing helper; see LLMMonitor._calculate_cost below for an example implementation
metrics.record_llm_call(
model=model,
status=status,
duration=time.time() - start_time,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cost=cost
)
return result
except Exception as e:
status = "error"
metrics.record_llm_call(
model=model,
status=status,
duration=time.time() - start_time,
prompt_tokens=0,
completion_tokens=0,
cost=0
)
raise
return wrapper
return decorator
# Usage
@track_execution("research_agent")
def execute_agent(task: str):
# Agent logic
pass
@track_llm_call("gpt-4o")
def call_llm(prompt: str):
# LLM call logic
pass
Distributed Tracing
OpenTelemetry Integration
# tracing/opentelemetry.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from functools import wraps
def setup_tracing(
service_name: str = "arc",
jaeger_host: str = "localhost",
jaeger_port: int = 6831
):
"""Setup OpenTelemetry distributed tracing."""
# Create resource
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0",
"deployment.environment": "production"
})
# Create tracer provider
tracer_provider = TracerProvider(resource=resource)
# Create Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name=jaeger_host,
agent_port=jaeger_port,
)
# Add span processor
span_processor = BatchSpanProcessor(jaeger_exporter)
tracer_provider.add_span_processor(span_processor)
# Set as global tracer provider
trace.set_tracer_provider(tracer_provider)
# Auto-instrument libraries
RequestsInstrumentor().instrument()
LoggingInstrumentor().instrument()
print(f"Tracing configured: {service_name} -> {jaeger_host}:{jaeger_port}")
# Tracer instance
tracer = trace.get_tracer(__name__)
def trace_agent_execution(agent_name: str):
"""Decorator to trace agent execution."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("agent.type", "react")
try:
result = func(*args, **kwargs)
span.set_attribute("status", "success")
return result
except Exception as e:
span.set_attribute("status", "error")
span.set_attribute("error.type", type(e).__name__)
span.set_attribute("error.message", str(e))
raise
return wrapper
return decorator
# Usage
@trace_agent_execution("researcher")
def execute_researcher(task: str):
with tracer.start_as_current_span("llm.call") as llm_span:
llm_span.set_attribute("model", "gpt-4o")
# LLM call logic
with tracer.start_as_current_span("tool.search") as tool_span:
tool_span.set_attribute("tool", "search")
# Tool call logic
return result
Trace Context Propagation
# tracing/propagation.py
from opentelemetry import trace
from opentelemetry.propagate import inject, extract
def propagate_trace_context(headers: dict) -> dict:
"""Inject trace context into headers."""
inject(headers)
return headers
def extract_trace_context(headers: dict):
"""Extract trace context from headers."""
ctx = extract(headers)
return ctx
# Usage in HTTP requests
import requests
def make_traced_request(url: str, data: dict):
"""Make HTTP request with trace context."""
headers = {}
propagate_trace_context(headers)
response = requests.post(url, json=data, headers=headers)
return response
# Usage in FastAPI
from fastapi import Request
async def traced_endpoint(request: Request):
"""Endpoint with trace context extraction."""
ctx = extract_trace_context(dict(request.headers))
with tracer.start_as_current_span("process_request", context=ctx):
# Process request
pass
Application Performance Monitoring
APM Integration
# apm/datadog.py
from ddtrace import tracer, patch_all
from ddtrace.contrib.logging import patch as log_patch
def setup_datadog_apm(
service_name: str = "arc",
env: str = "production"
):
"""Setup Datadog APM."""
# Patch libraries
patch_all()
# Patch logging
log_patch()
# Configure tracer
tracer.configure(
hostname="datadog-agent",
port=8126,
service=service_name,
env=env
)
print(f"Datadog APM configured: {service_name}")
# apm/newrelic.py
import newrelic.agent
def setup_newrelic_apm(config_file: str = "newrelic.ini"):
"""Setup New Relic APM."""
newrelic.agent.initialize(config_file)
print("New Relic APM configured")
# apm/sentry.py
import logging
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
def setup_sentry(
dsn: str,
environment: str = "production",
traces_sample_rate: float = 0.1
):
"""Setup Sentry error tracking and performance monitoring."""
sentry_sdk.init(
dsn=dsn,
environment=environment,
traces_sample_rate=traces_sample_rate,
profiles_sample_rate=0.1,
integrations=[
LoggingIntegration(
level=logging.INFO,
event_level=logging.ERROR
)
]
)
print(f"Sentry configured: {environment}")
# Usage
import os
from arc.apm import setup_datadog_apm, setup_sentry
setup_datadog_apm(service_name="arc", env="production")
setup_sentry(
dsn=os.getenv("SENTRY_DSN"),
environment="production"
)
LLM-Specific Monitoring
LLM Call Tracking
# monitoring/llm_tracker.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any, List
import json
@dataclass
class LLMCallMetadata:
"""Metadata for LLM call."""
timestamp: datetime
model: str
prompt_tokens: int
completion_tokens: int
total_tokens: int
duration_seconds: float
cost_dollars: float
status: str # success, error, timeout
error_message: Optional[str] = None
prompt_preview: Optional[str] = None
response_preview: Optional[str] = None
class LLMMonitor:
"""Monitor LLM calls for cost, performance, and quality."""
def __init__(self):
self.calls: List[LLMCallMetadata] = []
self.total_cost = 0.0
self.total_tokens = 0
def track_call(
self,
model: str,
prompt: str,
response: str,
usage: Dict[str, int],
duration: float,
status: str = "success",
error: Optional[str] = None
):
"""Track individual LLM call."""
# Calculate cost
cost = self._calculate_cost(
model,
usage['prompt_tokens'],
usage['completion_tokens']
)
metadata = LLMCallMetadata(
timestamp=datetime.now(),
model=model,
prompt_tokens=usage['prompt_tokens'],
completion_tokens=usage['completion_tokens'],
total_tokens=usage['total_tokens'],
duration_seconds=duration,
cost_dollars=cost,
status=status,
error_message=error,
prompt_preview=prompt[:100],
response_preview=response[:100] if response else None
)
self.calls.append(metadata)
self.total_cost += cost
self.total_tokens += usage['total_tokens']
# Log to metrics
metrics.record_llm_call(
model=model,
status=status,
duration=duration,
prompt_tokens=usage['prompt_tokens'],
completion_tokens=usage['completion_tokens'],
cost=cost
)
# Log high-cost calls
if cost > 1.0: # $1 threshold
logger.warning(
"High-cost LLM call",
model=model,
cost=cost,
tokens=usage['total_tokens']
)
def _calculate_cost(
self,
model: str,
prompt_tokens: int,
completion_tokens: int
) -> float:
"""Calculate cost based on model pricing."""
# Pricing per 1M tokens (as of 2024)
pricing = {
"gpt-4o": {"prompt": 2.50, "completion": 10.00},
"gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
"gpt-4-turbo": {"prompt": 10.00, "completion": 30.00},
"claude-3-5-sonnet": {"prompt": 3.00, "completion": 15.00},
"claude-3-haiku": {"prompt": 0.25, "completion": 1.25},
}
if model not in pricing:
return 0.0
prompt_cost = (prompt_tokens / 1_000_000) * pricing[model]["prompt"]
completion_cost = (completion_tokens / 1_000_000) * pricing[model]["completion"]
return prompt_cost + completion_cost
def get_statistics(self) -> Dict[str, Any]:
"""Get aggregated statistics."""
if not self.calls:
return {}
successful_calls = [c for c in self.calls if c.status == "success"]
failed_calls = [c for c in self.calls if c.status == "error"]
return {
"total_calls": len(self.calls),
"successful_calls": len(successful_calls),
"failed_calls": len(failed_calls),
"total_cost": self.total_cost,
"total_tokens": self.total_tokens,
"average_cost_per_call": self.total_cost / len(self.calls),
"average_tokens_per_call": self.total_tokens / len(self.calls),
"average_duration": sum(c.duration_seconds for c in self.calls) / len(self.calls),
"models_used": list(set(c.model for c in self.calls))
}
def export_to_json(self, filepath: str):
"""Export call history to JSON."""
data = [
{
"timestamp": c.timestamp.isoformat(),
"model": c.model,
"tokens": {
"prompt": c.prompt_tokens,
"completion": c.completion_tokens,
"total": c.total_tokens
},
"duration_seconds": c.duration_seconds,
"cost_dollars": c.cost_dollars,
"status": c.status
}
for c in self.calls
]
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
# Global LLM monitor
llm_monitor = LLMMonitor()
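A sketch of recording one call; the usage dict mirrors the OpenAI-style token fields the tracker reads, and all values are illustrative:
llm_monitor.track_call(
    model="gpt-4o",
    prompt="Summarize the quarterly findings...",
    response="The findings indicate...",
    usage={"prompt_tokens": 850, "completion_tokens": 120, "total_tokens": 970},
    duration=2.3,
)
print(llm_monitor.get_statistics())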
Token Usage Dashboard
# monitoring/token_dashboard.py
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
class TokenUsageDashboard:
"""Track token usage over time."""
def __init__(self):
self.usage_by_hour: Dict[datetime, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
self.usage_by_model: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
def record_usage(
self,
model: str,
prompt_tokens: int,
completion_tokens: int
):
"""Record token usage."""
hour = datetime.now().replace(minute=0, second=0, microsecond=0)
self.usage_by_hour[hour]["prompt"] += prompt_tokens
self.usage_by_hour[hour]["completion"] += completion_tokens
self.usage_by_hour[hour]["total"] += prompt_tokens + completion_tokens
self.usage_by_model[model]["prompt"] += prompt_tokens
self.usage_by_model[model]["completion"] += completion_tokens
self.usage_by_model[model]["total"] += prompt_tokens + completion_tokens
def get_hourly_usage(self, hours: int = 24) -> List[Dict]:
"""Get token usage for last N hours."""
cutoff = datetime.now() - timedelta(hours=hours)
return [
{
"hour": hour.isoformat(),
"prompt_tokens": usage["prompt"],
"completion_tokens": usage["completion"],
"total_tokens": usage["total"]
}
for hour, usage in sorted(self.usage_by_hour.items())
if hour >= cutoff
]
def get_model_usage(self) -> List[Dict]:
"""Get token usage by model."""
return [
{
"model": model,
"prompt_tokens": usage["prompt"],
"completion_tokens": usage["completion"],
"total_tokens": usage["total"]
}
for model, usage in self.usage_by_model.items()
]
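Usage sketch (values are illustrative):
dashboard = TokenUsageDashboard()
dashboard.record_usage(model="gpt-4o", prompt_tokens=1200, completion_tokens=300)
print(dashboard.get_hourly_usage(hours=6))
print(dashboard.get_model_usage())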
Alerting
Alert Rules
# alerting/rules.py
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List
from enum import Enum
from alerting.notifications import send_slack_alert, send_pagerduty_alert  # defined in alerting/notifications.py below
class AlertSeverity(str, Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class AlertRule:
"""Alert rule definition."""
name: str
description: str
condition: Callable[[], bool]
severity: AlertSeverity
cooldown_seconds: int = 300 # 5 minutes
class AlertManager:
"""Manage alerts and notifications."""
def __init__(self):
self.rules: List[AlertRule] = []
self.last_alert_times: Dict[str, datetime] = {}
def add_rule(self, rule: AlertRule):
"""Add alert rule."""
self.rules.append(rule)
def check_rules(self):
"""Check all alert rules."""
for rule in self.rules:
try:
if rule.condition():
self._trigger_alert(rule)
except Exception as e:
logger.error(f"Error checking rule {rule.name}: {e}")
def _trigger_alert(self, rule: AlertRule):
"""Trigger alert if not in cooldown."""
now = datetime.now()
last_alert = self.last_alert_times.get(rule.name)
if last_alert:
time_since_last = (now - last_alert).total_seconds()
if time_since_last < rule.cooldown_seconds:
return # Still in cooldown
# Send alert
self._send_alert(rule)
self.last_alert_times[rule.name] = now
def _send_alert(self, rule: AlertRule):
"""Send alert to configured channels."""
message = f"[{rule.severity.upper()}] {rule.name}: {rule.description}"
# Send to Slack
send_slack_alert(message, severity=rule.severity)
# Send to PagerDuty for critical alerts
if rule.severity == AlertSeverity.CRITICAL:
send_pagerduty_alert(rule.name, rule.description)
logger.warning(f"Alert triggered: {rule.name}")
# Define alert rules
alert_manager = AlertManager()
# High error rate
alert_manager.add_rule(AlertRule(
name="high_error_rate",
description="Error rate exceeds 5%",
condition=lambda: get_error_rate() > 0.05,
severity=AlertSeverity.CRITICAL
))
# High LLM cost
alert_manager.add_rule(AlertRule(
name="high_llm_cost",
description="LLM cost exceeds $100/hour",
condition=lambda: get_hourly_llm_cost() > 100,
severity=AlertSeverity.WARNING
))
# Low cache hit rate
alert_manager.add_rule(AlertRule(
name="low_cache_hit_rate",
description="Cache hit rate below 50%",
condition=lambda: get_cache_hit_rate() < 0.5,
severity=AlertSeverity.WARNING
))
# High latency
alert_manager.add_rule(AlertRule(
name="high_latency",
description="P99 latency exceeds 10s",
condition=lambda: get_p99_latency() > 10,
severity=AlertSeverity.WARNING
))
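check_rules() only evaluates rules when it is called, so something must invoke it on a schedule; a minimal sketch, assuming the service already runs an asyncio event loop:
# alerting/scheduler.py (illustrative)
import asyncio

async def run_alert_checks(interval_seconds: int = 30):
    """Evaluate all registered alert rules on a fixed interval."""
    while True:
        alert_manager.check_rules()
        await asyncio.sleep(interval_seconds)

# Schedule alongside the application:
# asyncio.create_task(run_alert_checks())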
Notification Channels
# alerting/notifications.py
import os
import time
import requests
import json
from typing import List
def send_slack_alert(message: str, severity: str = "warning"):
"""Send alert to Slack."""
webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if not webhook_url:
return
color = {
"info": "#36a64f",
"warning": "#ff9900",
"critical": "#ff0000"
}.get(severity, "#808080")
payload = {
"attachments": [{
"color": color,
"text": message,
"footer": "Arc Monitoring",
"ts": int(time.time())
}]
}
requests.post(webhook_url, json=payload)
def send_pagerduty_alert(title: str, description: str):
"""Send alert to PagerDuty."""
api_key = os.getenv("PAGERDUTY_API_KEY")
if not api_key:
return
payload = {
"routing_key": api_key,
"event_action": "trigger",
"payload": {
"summary": title,
"severity": "critical",
"source": "arc-production",
"custom_details": {
"description": description
}
}
}
requests.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
)
def send_email_alert(subject: str, body: str, recipients: List[str]):
"""Send email alert."""
import smtplib
from email.mime.text import MIMEText
smtp_server = os.getenv("SMTP_SERVER")
smtp_port = int(os.getenv("SMTP_PORT", "587"))
smtp_user = os.getenv("SMTP_USER")
smtp_password = os.getenv("SMTP_PASSWORD")
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = smtp_user
msg['To'] = ', '.join(recipients)
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_password)
server.send_message(msg)
Dashboards
Grafana Dashboard Configuration
{
"dashboard": {
"title": "Arc Production Monitoring",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "rate(arc_requests_total[5m])"
}
]
},
{
"title": "Error Rate",
"targets": [
{
"expr": "rate(arc_requests_total{status=~\"5..\"}[5m]) / rate(arc_requests_total[5m])"
}
]
},
{
"title": "P99 Latency",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(arc_request_duration_seconds_bucket[5m]))"
}
]
},
{
"title": "LLM Cost (Hourly)",
"targets": [
{
"expr": "increase(arc_llm_cost_dollars[1h])"
}
]
},
{
"title": "Cache Hit Rate",
"targets": [
{
"expr": "rate(arc_cache_hits_total[5m]) / (rate(arc_cache_hits_total[5m]) + rate(arc_cache_misses_total[5m]))"
}
]
}
]
}
}
Observability Tools
Prometheus Configuration
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'arc'
static_configs:
- targets: ['arc:9090']
labels:
environment: 'production'
- job_name: 'node'
static_configs:
- targets: ['node-exporter:9100']
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
rule_files:
- 'alerts.yml'
Alert Rules (Prometheus)
# alerts.yml
groups:
- name: arc_alerts
interval: 30s
rules:
- alert: HighErrorRate
expr: rate(arc_requests_total{status=~"5.."}[5m]) / rate(arc_requests_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }}"
- alert: HighLatency
expr: histogram_quantile(0.99, rate(arc_request_duration_seconds_bucket[5m])) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "P99 latency is {{ $value }}s"
- alert: HighLLMCost
expr: increase(arc_llm_cost_dollars[1h]) > 100
for: 5m
labels:
severity: warning
annotations:
summary: "High LLM cost"
description: "Hourly cost is ${{ $value }}"
Best Practices
1. Structured Logging
# Always log with structure
logger.info(
"Agent executed",
agent_name="researcher",
duration_ms=1500,
tokens=1000,
status="success"
)
# Not: logger.info("Agent researcher took 1500ms with 1000 tokens")
2. Appropriate Log Levels
# DEBUG: Detailed information for diagnosing problems
logger.debug("Cache key computed", key=cache_key)
# INFO: General informational messages
logger.info("Agent started", agent_name="researcher")
# WARNING: Something unexpected but not critical
logger.warning("Cache miss", cache_type="llm", key=cache_key)
# ERROR: Error occurred but application continues
logger.error("LLM call failed", error=str(e), retry_count=3)
# CRITICAL: Critical error, application may not continue
logger.critical("Database connection lost", error=str(e))
3. Monitor What Matters
Focus on:
- Request rate and latency
- Error rates
- LLM costs and token usage
- Cache hit rates
- Resource utilization (a sampling sketch follows this list)
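For resource utilization, the arc_memory_usage_bytes gauge defined in MetricsCollector is only useful if something updates it; a minimal background sampler sketch, assuming psutil is installed and using the global metrics collector from metrics/prometheus.py:
# metrics/resources.py (illustrative)
import threading
import time

import psutil

from metrics.prometheus import metrics

def start_resource_sampler(interval_seconds: int = 15):
    """Periodically sample process RSS into the arc_memory_usage_bytes gauge."""
    process = psutil.Process()

    def _sample():
        while True:
            metrics.memory_usage.set(process.memory_info().rss)
            time.sleep(interval_seconds)

    threading.Thread(target=_sample, daemon=True, name="resource-sampler").start()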
4. Set Meaningful Alerts
# Good: Actionable alert
alert = AlertRule(
name="high_error_rate_critical",
description="Error rate > 5% for 5 minutes - immediate action required",
condition=lambda: get_error_rate() > 0.05,
severity=AlertSeverity.CRITICAL
)
# Bad: Too noisy
alert = AlertRule(
name="any_error",
description="An error occurred",
condition=lambda: get_error_count() > 0,
severity=AlertSeverity.CRITICAL
)
5. Use Distributed Tracing
# Trace entire request flow
with tracer.start_as_current_span("process_request") as span:
span.set_attribute("user_id", user_id)
with tracer.start_as_current_span("agent.execute"):
agent.execute(task)
with tracer.start_as_current_span("llm.call"):
llm.invoke(prompt)
6. Monitor Cost
# Track and alert on LLM costs
def check_cost_budget():
daily_cost = get_daily_llm_cost()
budget = 1000 # $1000/day
if daily_cost > budget * 0.8:
logger.warning(
"Approaching daily cost budget",
current_cost=daily_cost,
budget=budget,
percentage=daily_cost / budget
)
7. Retain Logs Appropriately
- DEBUG logs: 1 day
- INFO logs: 7 days
- WARNING logs: 30 days
- ERROR logs: 90 days
- CRITICAL logs: 1 year
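A minimal sketch of tiered retention with stdlib rotating file handlers, assuming logs are also written to local files (if logs ship to an aggregator, retention is configured there instead):
# logging/retention.py (illustrative)
import logging
from logging.handlers import TimedRotatingFileHandler

# Minimum level -> (file name, retention in days); each file also receives higher levels.
RETENTION_TIERS = [
    (logging.DEBUG, "debug.log", 1),
    (logging.INFO, "info.log", 7),
    (logging.WARNING, "warning.log", 30),
    (logging.ERROR, "error.log", 90),
    (logging.CRITICAL, "critical.log", 365),
]

def setup_retention(logger_name: str = "arc"):
    """Attach one daily-rotating handler per severity tier."""
    logger = logging.getLogger(logger_name)
    for level, filename, days in RETENTION_TIERS:
        handler = TimedRotatingFileHandler(filename, when="D", backupCount=days)
        handler.setLevel(level)
        logger.addHandler(handler)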