Comprehensive troubleshooting guide for Az Core framework applications covering common issues, debugging techniques, and problem resolution strategies.
Overview
This guide helps you diagnose and resolve common issues in Az Core applications, from development to production environments.
Common Issues
Installation Issues
Issue: Dependencies fail to install
Symptoms:
ERROR: Could not find a version that satisfies the requirement
Solutions:
# Update pip
python -m pip install --upgrade pip
# Install with specific Python version
python3.11 -m pip install azcore
# Clear cache and reinstall
pip cache purge
pip install --no-cache-dir azcore
# Install from source
git clone https://github.com/yourusername/azcore.git
cd azcore
pip install -e .
Issue: Import errors
Symptoms:
ImportError: cannot import name 'ReactAgent' from 'azcore.agents'
Solutions:
# Verify installation
pip show azcore
# Check Python path
python -c "import sys; print('\n'.join(sys.path))"
# Reinstall in editable mode
pip uninstall azcore
pip install -e .
# Check for conflicting packages
pip list | grep azcore
Configuration Issues
Issue: API keys not found
Symptoms:
ValueError: Required environment variable OPENAI_API_KEY is not set
Solutions:
# Check environment variables
env | grep API_KEY
# Load from .env file
python -c "from dotenv import load_dotenv; load_dotenv(); import os; print(os.getenv('OPENAI_API_KEY'))"
# Set temporarily
export OPENAI_API_KEY="sk-..."
# Verify .env file exists and has correct format
cat .env
# Should have: OPENAI_API_KEY=sk-...
Issue: Invalid configuration
Symptoms:
ValidationError: Invalid configuration
Solutions:
# Validate configuration
from azcore.config import Settings

try:
    settings = Settings()
    print("Configuration valid")
    # Check specific values
    print(f"LLM Model: {settings.llm_model}")
    print(f"Temperature: {settings.llm_temperature}")
except Exception as e:
    print(f"Configuration error: {e}")
LLM Issues
Issue: Rate limit exceeded
Symptoms:
RateLimitError: Rate limit reached for requests
Solutions:
# 1. Implement exponential backoff
from tenacity import retry, wait_exponential, stop_after_attempt

@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
def call_llm(prompt):
    return llm.invoke(prompt)

# 2. Use rate limiting
from ratelimit import limits, sleep_and_retry

@sleep_and_retry
@limits(calls=50, period=60)  # 50 calls per minute
def call_llm(prompt):
    return llm.invoke(prompt)

# 3. Implement request queuing
import asyncio
from asyncio import Queue, Semaphore

class RateLimitedLLM:
    def __init__(self, max_concurrent=10):
        self.semaphore = Semaphore(max_concurrent)
        self.queue = Queue()

    async def invoke(self, prompt):
        async with self.semaphore:
            await asyncio.sleep(0.1)  # Small delay
            return await llm.ainvoke(prompt)
Issue: Context length exceeded
Symptoms:
InvalidRequestError: This model's maximum context length is 4097 tokens
Solutions:
# 1. Truncate conversation history
def truncate_messages(messages, max_tokens=3000):
    """Keep only recent messages that fit in context."""
    total_tokens = 0
    truncated = []
    for msg in reversed(messages):
        msg_tokens = len(msg["content"]) // 4  # Rough estimate
        if total_tokens + msg_tokens > max_tokens:
            break
        truncated.insert(0, msg)
        total_tokens += msg_tokens
    return truncated
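The length-divided-by-four heuristic above is only approximate, especially for code or non-English text. If tiktoken is installed, the same truncation can use real token counts; a sketch (the helper names are illustrative, not part of the framework):

import tiktoken

def count_tokens(text, encoding_name="cl100k_base"):
    """Count tokens with a real tokenizer; pick the encoding that matches your model."""
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))

def truncate_messages_exact(messages, max_tokens=3000):
    """Same strategy as truncate_messages, but with exact token counts."""
    total_tokens = 0
    truncated = []
    for msg in reversed(messages):
        msg_tokens = count_tokens(msg["content"])
        if total_tokens + msg_tokens > max_tokens:
            break
        truncated.insert(0, msg)
        total_tokens += msg_tokens
    return truncated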
# 2. Summarize old messages
from langchain_openai import ChatOpenAI

def summarize_history(messages):
    """Summarize old messages to reduce tokens."""
    if len(messages) < 10:
        return messages
    # Summarize everything except the most recent messages
    old_messages = messages[:-5]
    recent_messages = messages[-5:]
    summary_llm = ChatOpenAI(model="gpt-4o-mini")
    summary = summary_llm.invoke(
        f"Summarize this conversation:\n{old_messages}"
    )
    return [
        {"role": "system", "content": f"Previous: {summary.content}"},
        *recent_messages
    ]
# 3. Use a larger context model
llm = ChatOpenAI(model="gpt-4-turbo-preview") # 128k context
Issue: Timeout errors
Symptoms:
TimeoutError: Request timed out
Solutions:
# 1. Increase timeout
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o-mini",
    request_timeout=120,  # 2 minutes
    max_retries=3
)

# 2. Use streaming for long responses
async def stream_response(prompt):
    """Stream response to avoid timeouts."""
    async for chunk in llm.astream(prompt):
        yield chunk

# 3. Split large requests
def chunk_prompt(prompt, max_length=2000):
    """Split large prompt into chunks."""
    words = prompt.split()
    chunks = []
    current_chunk = []
    for word in words:
        current_chunk.append(word)
        if len(' '.join(current_chunk)) > max_length:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
Issue: Inconsistent responses
Symptoms:
- Same input produces different outputs
- Unpredictable behavior
Solutions:
# 1. Set temperature to 0 for deterministic outputs
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0  # Deterministic
)

# 2. Set seed for reproducibility (when supported)
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    model_kwargs={"seed": 42}
)

# 3. Use self-consistency pattern
from azcore.agents import SelfConsistencyAgent

agent = SelfConsistencyAgent(
    name="consistent",
    llm=llm,
    num_samples=5  # Multiple samples, vote on answer
)
Agent Issues
Issue: Agent not responding
Symptoms:
- Agent returns empty response
- No error message
Solutions:
# 1. Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("azcore")
logger.setLevel(logging.DEBUG)
# 2. Check agent state
result = agent.invoke(state)
print(f"State: {state}")
print(f"Result: {result}")
print(f"Messages: {result.get('messages', [])}")
# 3. Verify LLM is working
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
test_response = llm.invoke("test")
print(f"LLM Response: {test_response}")
# 4. Check for errors in result
if "error" in result:
    print(f"Error: {result['error']}")
Issue: Agent using wrong tools
Symptoms:
- Agent doesn't use available tools
- Agent uses wrong tool
Solutions:
# 1. Verify tools are properly configured
print(f"Agent tools: {[tool.name for tool in agent.tools]}")
# 2. Improve tool descriptions
from langchain.tools import Tool
better_tool = Tool(
    name="search",
    description="Search for current information on the internet. Use this when you need up-to-date facts, news, or information not in your training data.",
    func=search_function
)
# 3. Add tool usage examples to prompt
prompt = """You are a helpful assistant with access to tools.
Available tools:
- search: Use to find current information
- calculator: Use for mathematical calculations
Examples:
User: "What is the weather today?"
Assistant: [Uses search tool]
User: "What is 15% of 240?"
Assistant: [Uses calculator tool]
Now help the user:"""
# 4. Use tool-calling models
llm = ChatOpenAI(
    model="gpt-4o",  # Better at tool use
    temperature=0
)
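If your agent builds its own tool-calling loop, it can also help to bind the tools directly to the model so the API sees their schemas. A minimal sketch using LangChain's bind_tools, assuming a recent langchain-openai and the better_tool defined above:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools([better_tool])  # advertise the tool schema to the model

response = llm_with_tools.invoke("What is the weather in Paris today?")
print(response.tool_calls)  # which tool(s) the model chose and with what arguments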
Issue: Agent stuck in loop
Symptoms:
- Agent repeats same action
- Never completes task
Solutions:
# 1. Set max iterations
from azcore.agents import ReactAgent
agent = ReactAgent(
    name="agent",
    llm=llm,
    max_loops=5  # Limit iterations
)

# 2. Add loop detection
class LoopDetector:
    def __init__(self, max_repeats=3):
        self.action_history = []
        self.max_repeats = max_repeats

    def check_loop(self, action):
        self.action_history.append(action)
        # Check for repeated actions
        recent = self.action_history[-self.max_repeats:]
        if len(recent) == self.max_repeats and len(set(recent)) == 1:
            raise Exception("Agent stuck in loop")

# 3. Add state change detection
max_iterations = 10
previous_state = None
for i in range(max_iterations):
    current_state = agent.invoke(state)
    if current_state == previous_state:
        print("No progress, stopping")
        break
    previous_state = current_state
Workflow Issues
Issue: Workflow fails partway through
Symptoms:
- Some agents complete, others don't
- Incomplete results
Solutions:
# 1. Add error handling per agent
import logging
from azcore.workflows import SequentialWorkflow

logger = logging.getLogger(__name__)

class RobustWorkflow(SequentialWorkflow):
    def run(self, task):
        results = []
        for agent in self.agents:
            try:
                result = agent.invoke({"messages": [{"role": "user", "content": task}]})
                results.append(result)
                task = result["messages"][-1]["content"]
            except Exception as e:
                logger.error(f"Agent {agent.name} failed: {e}")
                # Record the failure and stop the workflow
                results.append({"error": str(e)})
                break
        return results

# 2. Implement checkpointing
from pathlib import Path

class CheckpointedWorkflow:
    def __init__(self, agents, checkpoint_dir="checkpoints"):
        self.agents = agents
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)

    def run(self, task, workflow_id):
        # Load checkpoint if it exists (empty dict on first run)
        checkpoint = self._load_checkpoint(workflow_id) or {}
        start_index = checkpoint.get("last_completed_agent", 0)
        results = checkpoint.get("results", [])
        for i, agent in enumerate(self.agents[start_index:], start=start_index):
            result = agent.invoke({"messages": [{"role": "user", "content": task}]})
            results.append(result)
            # Save checkpoint
            self._save_checkpoint(workflow_id, {
                "last_completed_agent": i + 1,
                "results": results
            })
            task = result["messages"][-1]["content"]
        return results

# 3. Add retry logic
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def run_workflow_with_retry(workflow, task):
    return workflow.run(task)
Performance Issues
Issue: Slow response times
Symptoms:
- Requests take > 10 seconds
- High latency
Solutions:
# 1. Profile performance
import time

def profile_execution(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        print(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper

@profile_execution
def slow_function():
    # Find bottleneck
    pass

# 2. Enable caching
from azcore.utils.cached_llm import CachedLLM

llm = ChatOpenAI(model="gpt-4o-mini")
cached_llm = CachedLLM(llm, cache_type="exact")

# 3. Use async for parallel operations
import asyncio

async def parallel_agents(tasks):
    """Run multiple agents in parallel."""
    results = await asyncio.gather(*[
        agent.ainvoke(task) for task in tasks
    ])
    return results

# 4. Check for network issues
import httpx

async def test_llm_latency():
    """Measure LLM API latency."""
    start = time.time()
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.openai.com")
    latency = time.time() - start
    print(f"API latency: {latency:.3f}s")
Issue: High memory usage
Symptoms:
MemoryError: Out of memory
Solutions:
# 1. Monitor memory usage
import psutil

def check_memory():
    """Check current memory usage."""
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"Memory usage: {memory_mb:.2f} MB")

# 2. Clear conversation history
def clear_old_messages(messages, keep_last=10):
    """Keep only recent messages."""
    return messages[-keep_last:]

# 3. Use generators for large datasets
def process_large_file(filepath):
    """Process file line by line instead of loading all."""
    with open(filepath, 'r') as f:
        for line in f:
            yield process_line(line)

# 4. Clear caches periodically
from azcore.utils.caching import clear_all_caches
import schedule
import time

# Clear caches every hour (jobs only run when run_pending() is called)
schedule.every(1).hour.do(clear_all_caches)
while True:
    schedule.run_pending()
    time.sleep(60)

# 5. Use __slots__ for classes
class Message:
    __slots__ = ['role', 'content']  # Reduces per-instance memory

    def __init__(self, role, content):
        self.role = role
        self.content = content
Memory Issues
Issue: Memory leaks
Symptoms:
- Memory usage grows over time
- Application crashes after running
Solutions:
# 1. Profile memory usage
from memory_profiler import profile

@profile
def potentially_leaking_function():
    # Find memory leaks
    pass

# 2. Use weak references for caches
import weakref

class WeakCache:
    def __init__(self):
        self.cache = weakref.WeakValueDictionary()

# 3. Explicitly delete large objects
def process_large_data():
    data = load_large_dataset()
    result = process(data)
    del data  # Free memory immediately
    return result

# 4. Use context managers
class ResourceManager:
    def __enter__(self):
        self.resource = acquire_resource()
        return self.resource

    def __exit__(self, exc_type, exc_val, exc_tb):
        release_resource(self.resource)
        del self.resource

with ResourceManager() as resource:
    use(resource)
# Resource automatically cleaned up
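The same cleanup guarantee can be written more compactly with contextlib; a sketch using the same placeholder acquire_resource/release_resource functions from above:

from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        release_resource(resource)  # runs even if the body raises

with managed_resource() as resource:
    use(resource)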
Network Issues
Issue: Connection errors
Symptoms:
ConnectionError: Failed to establish connection
Solutions:
# 1. Check network connectivity
import requests

def test_connectivity():
    """Test connection to LLM API."""
    try:
        response = requests.get(
            "https://api.openai.com",
            timeout=5
        )
        print(f"Connection OK: {response.status_code}")
    except Exception as e:
        print(f"Connection failed: {e}")

# 2. Configure retries
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
# 3. Use proxies if needed
import os

# Route OpenAI traffic through the proxy configured in the environment
llm = ChatOpenAI(
    model="gpt-4o-mini",
    openai_proxy=os.getenv('HTTPS_PROXY')
)
# 4. Check firewall settings
# Ensure ports 443 (HTTPS) and 80 (HTTP) are open
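As a quick way to confirm that outbound HTTPS is not blocked by a firewall, a standard-library sketch:

import socket

def check_port(host="api.openai.com", port=443, timeout=5):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError as e:
        print(f"Cannot reach {host}:{port} - {e}")
        return False

print(check_port())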
Debugging Techniques
Enable Debug Logging
# Set up comprehensive logging
import logging

# Configure root logger
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('debug.log'),
        logging.StreamHandler()
    ]
)

# Set specific loggers
logging.getLogger('azcore').setLevel(logging.DEBUG)
logging.getLogger('langchain').setLevel(logging.INFO)
logging.getLogger('httpx').setLevel(logging.WARNING)
Interactive Debugging
# Using pdb
import pdb

def problematic_function():
    result = some_operation()
    pdb.set_trace()  # Breakpoint here
    return result

# Using ipdb (better interface)
import ipdb

def debug_agent(agent, task):
    result = agent.invoke({"messages": [{"role": "user", "content": task}]})
    ipdb.set_trace()  # Inspect result
    return result

# Using breakpoint() (Python 3.7+)
def debug_function():
    result = operation()
    breakpoint()  # Drops into debugger
    return result
Trace Execution
# Trace function calls
import sys

def trace_calls(frame, event, arg):
    if event == 'call':
        filename = frame.f_code.co_filename
        funcname = frame.f_code.co_name
        print(f"Calling {funcname} in {filename}")
    return trace_calls

sys.settrace(trace_calls)
# Run code to trace
agent.invoke(task)
sys.settrace(None)  # Disable tracing
Log Analysis
Analyzing Error Patterns
# log_analyzer.py
import re
from collections import Counter

def analyze_logs(log_file):
    """Analyze log file for patterns."""
    errors = []
    warnings = []
    with open(log_file, 'r') as f:
        for line in f:
            if 'ERROR' in line:
                errors.append(line)
            elif 'WARNING' in line:
                warnings.append(line)

    # Count error types
    error_types = Counter()
    for error in errors:
        # Extract error type, e.g. "ValueError" from "ValueError: ..."
        match = re.search(r'(\w+Error):', error)
        if match:
            error_types[match.group(1)] += 1

    print(f"Total errors: {len(errors)}")
    print(f"Total warnings: {len(warnings)}")
    print("\nTop errors:")
    for error_type, count in error_types.most_common(5):
        print(f"  {error_type}: {count}")

# Usage
analyze_logs('app.log')
Recovery Procedures
Graceful Shutdown
# Handle graceful shutdown
import signal
import sys

def signal_handler(sig, frame):
    """Handle shutdown signal."""
    print("Shutting down gracefully...")
    # 1. Stop accepting new requests
    stop_accepting_requests()
    # 2. Wait for in-flight requests to complete
    wait_for_completion(timeout=30)
    # 3. Save state
    save_application_state()
    # 4. Close connections
    close_all_connections()
    print("Shutdown complete")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
State Recovery
# Implement state recovery
import pickle
from pathlib import Path

class StateManager:
    """Manage application state for recovery."""

    def __init__(self, state_dir="state"):
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(exist_ok=True)

    def save_state(self, state_id, state):
        """Save state to disk."""
        state_file = self.state_dir / f"{state_id}.pkl"
        with open(state_file, 'wb') as f:
            pickle.dump(state, f)

    def load_state(self, state_id):
        """Load state from disk."""
        state_file = self.state_dir / f"{state_id}.pkl"
        if not state_file.exists():
            return None
        with open(state_file, 'rb') as f:
            return pickle.load(f)

    def recover(self, state_id):
        """Recover from saved state."""
        state = self.load_state(state_id)
        if state:
            print(f"Recovering state: {state_id}")
            return state
        else:
            print(f"No saved state found: {state_id}")
            return None

# Usage
state_manager = StateManager()

# Save state periodically
state_manager.save_state("workflow_123", current_state)

# Recover after crash
recovered_state = state_manager.recover("workflow_123")
if recovered_state:
    continue_from_state(recovered_state)
Quick Reference
Common Commands
# Check installation
pip show azcore
# Run tests
pytest tests/
# Enable debug mode
export DEBUG=1
python app.py
# Check logs
tail -f logs/app.log
# Monitor memory
watch -n 1 'ps aux | grep python'
# Check ports
netstat -tulpn | grep :8000
# Test API
curl -X POST http://localhost:8000/api/agent \
-H "Content-Type: application/json" \
-d '{"task": "test"}'
Environment Variables
# Debug
export DEBUG=1
export LOG_LEVEL=DEBUG
# API Keys
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
# Configuration
export APP_ENV=development
export CACHE_ENABLED=true
# Performance
export WORKERS=4
export MAX_REQUESTS=1000
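If your application reads these variables itself, a minimal parsing sketch with defensive defaults (the defaults shown are illustrative assumptions, not framework behavior):

import os

DEBUG = os.getenv("DEBUG", "0") == "1"
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
WORKERS = int(os.getenv("WORKERS", "4"))
CACHE_ENABLED = os.getenv("CACHE_ENABLED", "false").lower() == "true"

# Fail fast if a required secret is missing
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set")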
Getting Help
Documentation
Community
- GitHub Issues: https://github.com/yourusername/azcore/issues
- Discord: https://discord.gg/azcore
- Stack Overflow: tag azcore
Support
- Email: support@azcore.com
- Enterprise Support: enterprise@azcore.com