Troubleshooting Guide

Common issues, debugging techniques, and recovery procedures for Azcore applications and workflows.

Overview

This guide helps you diagnose and resolve common issues in Az Core applications, from development to production environments.

Common Issues

Installation Issues

Issue: Dependencies fail to install

Symptoms:

ERROR: Could not find a version that satisfies the requirement

Solutions:

# Update pip
python -m pip install --upgrade pip

# Install with specific Python version
python3.11 -m pip install azcore

# Clear cache and reinstall
pip cache purge
pip install --no-cache-dir azcore

# Install from source
git clone https://github.com/yourusername/azcore.git
cd azcore
pip install -e .

Issue: Import errors

Symptoms:

ImportError: cannot import name 'ReactAgent' from 'azcore.agents'

Solutions:

# Verify installation
pip show azcore

# Check Python path
python -c "import sys; print('\n'.join(sys.path))"

# Reinstall in editable mode (run from the root of the cloned repository)
pip uninstall -y azcore
pip install -e .

# Check for conflicting packages
pip list | grep -i azcore

Configuration Issues

Issue: API keys not found

Symptoms:

ValueError: Required environment variable OPENAI_API_KEY is not set

Solutions:

# Check environment variables
env | grep API_KEY

# Load from .env file (prints only whether the key is set, not its value)
python -c "from dotenv import load_dotenv; load_dotenv(); import os; print('OPENAI_API_KEY set:', bool(os.getenv('OPENAI_API_KEY')))"

# Set temporarily
export OPENAI_API_KEY="sk-..."

# Verify .env file exists and has correct format
cat .env
# Should have: OPENAI_API_KEY=sk-...
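The same check can be done from Python without echoing secrets to the terminal. The following is a minimal sketch, not part of Azcore: `missing_env_vars`, `mask_secret`, and the `REQUIRED_KEYS` list are hypothetical names chosen for illustration.

```python
import os


def missing_env_vars(required, environ=None):
    """Return the names in `required` that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


def mask_secret(value, visible=4):
    """Show only the first few characters of a secret, masking the rest."""
    if len(value) <= visible:
        return "*" * len(value)
    return value[:visible] + "*" * (len(value) - visible)


if __name__ == "__main__":
    # REQUIRED_KEYS is an illustrative list; adjust it to the providers you use
    REQUIRED_KEYS = ["OPENAI_API_KEY"]
    missing = missing_env_vars(REQUIRED_KEYS)
    if missing:
        print(f"Missing environment variables: {', '.join(missing)}")
    for name in REQUIRED_KEYS:
        if name not in missing:
            print(f"{name} = {mask_secret(os.environ[name])}")
```

Running this at application startup turns a late `ValueError` into an early, readable message listing every missing key at once.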

Issue: Invalid configuration

Symptoms:

ValidationError: Invalid configuration

Solutions:

# Validate configuration
from azcore.config import Settings

try:
    settings = Settings()
except Exception as e:
    print(f"Configuration error: {e}")
else:
    print("Configuration valid")
    # Check specific values (only reachable when Settings() succeeded)
    print(f"LLM Model: {settings.llm_model}")
    print(f"Temperature: {settings.llm_temperature}")

LLM Issues

Issue: Rate limit exceeded

Symptoms:

RateLimitError: Rate limit reached for requests

Solutions:

# 1. Implement exponential backoff
from tenacity import retry, wait_exponential, stop_after_attempt

@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
def call_llm(prompt):
    return llm.invoke(prompt)

# 2. Use rate limiting
from ratelimit import limits, sleep_and_retry

@sleep_and_retry
@limits(calls=50, period=60)  # 50 calls per minute
def call_llm(prompt):
    return llm.invoke(prompt)

# 3. Limit concurrent requests
import asyncio

class RateLimitedLLM:
    def __init__(self, max_concurrent=10):
        # At most max_concurrent requests are in flight; the rest wait their turn
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def invoke(self, prompt):
        async with self.semaphore:
            await asyncio.sleep(0.1)  # Small delay between requests
            return await llm.ainvoke(prompt)
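If adding `tenacity` is not an option, exponential backoff can be written with the standard library alone. This is a rough sketch under stated assumptions: `retry_with_backoff` and its parameters are hypothetical names, and `retry_on` should be narrowed to whatever rate-limit exception your LLM client actually raises.

```python
import random
import time


def retry_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=60.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func(), retrying with exponential backoff and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay ceiling doubles each attempt: base, 2*base, 4*base, ...
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: sleep a random time in [0, ceiling] so that
            # many clients do not retry in lockstep
            sleep(random.uniform(0, ceiling))
```

A call such as `retry_with_backoff(lambda: llm.invoke(prompt))` would wrap the `llm` object used in the snippets above. The `sleep` parameter exists only so the delay can be stubbed out in tests.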

Issue: Context length exceeded

Symptoms:

InvalidRequestError: This model's maximum context length is 4097 tokens

Solutions:

# 1. Truncate conversation history
def truncate_messages(messages, max_tokens=3000):
    """Keep only recent messages that fit in context."""
    total_tokens = 0
    truncated = []

    for msg in reversed(messages):
        msg_tokens = len(msg["content"]) // 4  # Rough estimate
        if total_tokens + msg_tokens > max_tokens:
            break
        truncated.insert(0, msg)
        total_tokens += msg_tokens

    return truncated

# 2. Summarize old messages
from langchain_openai import ChatOpenAI

def summarize_history(messages):
    """Summarize old messages to reduce tokens."""
    if len(messages) < 10:
        return messages

    # Summarize old messages
    old_messages = messages[:-5]
    recent_messages = messages[-5:]

    summary_llm = ChatOpenAI(model="gpt-4o-mini")
    summary = summary_llm.invoke(
        f"Summarize this conversation:\n{old_messages}"
    )

    # invoke() returns a message object; its text is in .content
    return [
        {"role": "system", "content": f"Previous: {summary.content}"},
        *recent_messages
    ]

# 3. Use a larger context model
llm = ChatOpenAI(model="gpt-4-turbo-preview")  # 128k context
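The truncation in step 1 drops the oldest messages first, which can also drop the system prompt. One possible variant keeps system messages pinned and spends the remaining budget on the newest messages. This is a sketch that reuses the same rough 4-characters-per-token estimate; `truncate_keep_system` is a hypothetical helper, not an Azcore function.

```python
def truncate_keep_system(messages, max_tokens=3000):
    """Keep all system messages plus the newest messages that fit the budget."""
    def estimate(msg):
        return len(msg["content"]) // 4  # Rough estimate, as in step 1

    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]

    # System messages are always kept, so they reduce the remaining budget
    budget = max_tokens - sum(estimate(m) for m in system)

    kept = []
    for msg in reversed(others):  # Walk from newest to oldest
        cost = estimate(msg)
        if cost > budget:
            break
        kept.append(msg)
        budget -= cost

    kept.reverse()  # Restore chronological order
    return system + kept


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "a" * 400},
    {"role": "assistant", "content": "b" * 400},
    {"role": "user", "content": "c" * 40},
]
trimmed = truncate_keep_system(history, max_tokens=120)
print([m["role"] for m in trimmed])  # ['system', 'assistant', 'user']
```

For exact budgets, replace the character-based estimate with a real tokenizer for your model.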

Issue: Timeout errors

Symptoms:

TimeoutError: Request timed out

Solutions:

# 1. Increase timeout
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o-mini",
    request_timeout=120,  # 2 minutes
    max_retries=3
)

# 2. Use streaming for long responses
async def stream_response(prompt):
    """Stream response to avoid timeouts."""
    async for chunk in llm.astream(prompt):
        yield chunk

# 3. Split large requests
def chunk_prompt(prompt, max_length=2000):
    """Split large prompt into chunks."""
    words = prompt.split()
    chunks = []
    current_chunk = []

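    for word in words:
        # Start a new chunk before adding a word that would exceed max_length
        candidate = ' '.join(current_chunk + [word])
        if current_chunk and len(candidate) > max_length:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
        else:
            current_chunk.append(word)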

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks

Issue: Inconsistent responses

Symptoms:

  • Same input produces different outputs
  • Unpredictable behavior

Solutions:

# 1. Set temperature to 0 to minimize sampling randomness
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0  # Near-deterministic; identical outputs are not guaranteed
)

# 2. Set seed for best-effort reproducibility (when supported)
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    model_kwargs={"seed": 42}
)

# 3. Use self-consistency pattern
from azcore.agents import SelfConsistencyAgent

agent = SelfConsistencyAgent(
    name="consistent",
    llm=llm,
    num_samples=5  # Multiple samples, vote on answer
)

Agent Issues

Issue: Agent not responding

Symptoms:

  • Agent returns empty response
  • No error message

Solutions:

# 1. Enable debug logging
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("azcore")
logger.setLevel(logging.DEBUG)

# 2. Check agent state
result = agent.invoke(state)
print(f"State: {state}")
print(f"Result: {result}")
print(f"Messages: {result.get('messages', [])}")

# 3. Verify LLM is working
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")
test_response = llm.invoke("test")
print(f"LLM Response: {test_response}")

# 4. Check for errors in result
if "error" in result:
    print(f"Error: {result['error']}")

Issue: Agent using wrong tools

Symptoms:

  • Agent doesn't use available tools
  • Agent uses wrong tool

Solutions:

# 1. Verify tools are properly configured
print(f"Agent tools: {[tool.name for tool in agent.tools]}")

# 2. Improve tool descriptions
from langchain.tools import Tool

better_tool = Tool(
    name="search",
    description="Search for current information on the internet. Use this when you need up-to-date facts, news, or information not in your training data.",
    func=search_function
)

# 3. Add tool usage examples to prompt
prompt = """You are a helpful assistant with access to tools.

Available tools:
- search: Use to find current information
- calculator: Use for mathematical calculations

Examples:
User: "What is the weather today?"
Assistant: [Uses search tool]

User: "What is 15% of 240?"
Assistant: [Uses calculator tool]

Now help the user:"""

# 4. Use tool-calling models
llm = ChatOpenAI(
    model="gpt-4o",  # Better at tool use
    temperature=0
)

Issue: Agent stuck in loop

Symptoms:

  • Agent repeats same action
  • Never completes task

Solutions:

# 1. Set max iterations
from azcore.agents import ReactAgent

agent = ReactAgent(
    name="agent",
    llm=llm,
    max_loops=5  # Limit iterations
)

# 2. Add loop detection
class LoopDetector:
    def __init__(self, max_repeats=3):
        self.action_history = []
        self.max_repeats = max_repeats

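    def check_loop(self, action):
        self.action_history.append(action)

        # Raise only once the last max_repeats actions are all identical
        # (actions must be hashable, for example strings or tuples)
        recent = self.action_history[-self.max_repeats:]
        if len(recent) == self.max_repeats and len(set(recent)) == 1:
            raise RuntimeError("Agent stuck in loop")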

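# 3. Add state change detection
max_iterations = 10  # Upper bound on agent steps
previous_state = None
for i in range(max_iterations):
    current_state = agent.invoke(state)

    if current_state == previous_state:
        print("No progress, stopping")
        break

    previous_state = current_state
    state = current_state  # Feed the new state into the next iteration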
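The loop detector in step 2 only catches the same action repeated back to back. Agents also get stuck alternating between two or three actions (search, calculate, search, calculate). A small sketch of a check for such short cycles follows; `has_repeating_cycle` and its parameters are hypothetical, and actions are assumed to be comparable values such as tool names.

```python
def has_repeating_cycle(actions, max_cycle_len=3, repeats=2):
    """Return True if `actions` ends with a short pattern repeated `repeats` times."""
    for cycle_len in range(1, max_cycle_len + 1):
        window = cycle_len * repeats
        if len(actions) < window:
            continue  # Not enough history to judge this cycle length
        tail = actions[-window:]
        pattern = tail[:cycle_len]
        # The tail is a cycle if every element matches the pattern position
        if all(tail[i] == pattern[i % cycle_len] for i in range(window)):
            return True
    return False


print(has_repeating_cycle(["search", "calc", "search", "calc"]))  # True
print(has_repeating_cycle(["search", "calc", "summarize"]))       # False
```

Raising `repeats` makes the check more tolerant of legitimate retries at the cost of detecting loops later.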

Workflow Issues

Issue: Workflow fails partway through

Symptoms:

  • Some agents complete, others don't
  • Incomplete results

Solutions:

# 1. Add error handling per agent
import logging

from azcore.workflows import SequentialWorkflow

logger = logging.getLogger(__name__)

class RobustWorkflow(SequentialWorkflow):
    def run(self, task):
        results = []

        for agent in self.agents:
            try:
                result = agent.invoke({"messages": [{"role": "user", "content": task}]})
                results.append(result)
                task = result["messages"][-1]["content"]
            except Exception as e:
                logger.error(f"Agent {agent.name} failed: {e}")
                # Record the failure and stop: later agents depend on this output
                results.append({"error": str(e)})
                break

        return results

# 2. Implement checkpointing
from pathlib import Path

class CheckpointedWorkflow:
    # _load_checkpoint and _save_checkpoint (not shown) read and write a dict
    # per workflow_id; _load_checkpoint returns {} when no checkpoint exists
    def __init__(self, agents, checkpoint_dir="checkpoints"):
        self.agents = agents
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)

    def run(self, task, workflow_id):
        # Load checkpoint if exists
        checkpoint = self._load_checkpoint(workflow_id)
        start_index = checkpoint.get("last_completed_agent", 0)
        results = checkpoint.get("results", [])
        # When resuming, continue from the last completed agent's output
        task = checkpoint.get("next_task", task)

        for i, agent in enumerate(self.agents[start_index:], start=start_index):
            result = agent.invoke({"messages": [{"role": "user", "content": task}]})
            results.append(result)
            task = result["messages"][-1]["content"]

            # Save checkpoint
            self._save_checkpoint(workflow_id, {
                "last_completed_agent": i + 1,
                "results": results,
                "next_task": task
            })

        return results

# 3. Add retry logic
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def run_workflow_with_retry(workflow, task):
    return workflow.run(task)
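The checkpointed workflow in step 2 calls `_load_checkpoint` and `_save_checkpoint` without defining them. One way they might look is sketched below as standalone functions. The names `save_checkpoint` and `load_checkpoint` are hypothetical, and the sketch assumes the checkpoint data is JSON-serializable, which may require converting agent results to plain dicts first.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint(checkpoint_dir, workflow_id, data):
    """Write checkpoint data as JSON, replacing any previous file atomically."""
    directory = Path(checkpoint_dir)
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{workflow_id}.json"

    # Write to a temp file in the same directory, then rename it over the
    # target, so a crash mid-write never leaves a half-written checkpoint
    fd, tmp_name = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(data, f)
    os.replace(tmp_name, target)
    return target


def load_checkpoint(checkpoint_dir, workflow_id):
    """Return the saved checkpoint dict, or {} if none exists."""
    target = Path(checkpoint_dir) / f"{workflow_id}.json"
    if not target.exists():
        return {}
    with open(target, encoding="utf-8") as f:
        return json.load(f)
```

Returning an empty dict for a missing checkpoint keeps the `checkpoint.get(...)` calls in the workflow working on a first run.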

Performance Issues

Issue: Slow response times

Symptoms:

  • Requests take > 10 seconds
  • High latency

Solutions:

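# 1. Profile performance
import functools
import time

def profile_execution(func):
    @functools.wraps(func)  # Preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()  # Monotonic clock suited to timing
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper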

@profile_execution
def slow_function():
    # Find bottleneck
    pass

# 2. Enable caching
from azcore.utils.cached_llm import CachedLLM

llm = ChatOpenAI(model="gpt-4o-mini")
cached_llm = CachedLLM(llm, cache_type="exact")

# 3. Use async for parallel operations
import asyncio

async def parallel_agents(tasks):
    """Run multiple agents in parallel."""
    results = await asyncio.gather(*[
        agent.ainvoke(task) for task in tasks
    ])
    return results

# 4. Check for network issues
import httpx

async def test_llm_latency():
    """Measure LLM API latency."""
    start = time.time()
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.openai.com")
    latency = time.time() - start
    print(f"API latency: {latency:.3f}s")
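The timing decorator in step 1 shows how long a call takes but not where the time goes. The standard library's `cProfile` can break a call down by function. A minimal sketch follows, in which `profile_top` is a hypothetical helper name:

```python
import cProfile
import io
import pstats


def profile_top(func, *args, limit=5, **kwargs):
    """Run func under cProfile and return (result, report of the top entries)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)

    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    # Sort by cumulative time so the slowest call paths are listed first
    stats.sort_stats("cumulative").print_stats(limit)
    return result, stream.getvalue()


result, report = profile_top(sum, range(1000))
print(report)
```

For LLM-backed code, expect most of the cumulative time to sit in network calls; the report is mainly useful for spotting unexpected local work around them.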

Issue: High memory usage

Symptoms:

MemoryError: Out of memory

Solutions:

# 1. Monitor memory usage
import psutil

def check_memory():
    """Check current memory usage."""
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"Memory usage: {memory_mb:.2f} MB")

# 2. Clear conversation history
def clear_old_messages(messages, keep_last=10):
    """Keep only recent messages."""
    return messages[-keep_last:]

# 3. Use generators for large datasets
def process_large_file(filepath):
    """Process file line by line instead of loading all."""
    with open(filepath, 'r') as f:
        for line in f:
            yield process_line(line)

# 4. Clear caches periodically
from azcore.utils.caching import clear_all_caches

# Clear caches every hour
import schedule

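schedule.every(1).hour.do(clear_all_caches)
# Scheduled jobs only run when schedule.run_pending() is called,
# so call it regularly from your main loop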

# 5. Use __slots__ for classes
class Message:
    __slots__ = ['role', 'content']  # Reduces memory

    def __init__(self, role, content):
        self.role = role
        self.content = content

Memory Issues

Issue: Memory leaks

Symptoms:

  • Memory usage grows over time
  • Application crashes after running

Solutions:

# 1. Profile memory usage
from memory_profiler import profile

@profile
def potentially_leaking_function():
    # Find memory leaks
    pass

# 2. Use weak references for caches
import weakref

class WeakCache:
    def __init__(self):
        self.cache = weakref.WeakValueDictionary()

# 3. Explicitly delete large objects
def process_large_data():
    data = load_large_dataset()
    result = process(data)
    del data  # Drop this reference; memory is freed once nothing else refers to it
    return result

# 4. Use context managers
class ResourceManager:
    def __enter__(self):
        self.resource = acquire_resource()
        return self.resource

    def __exit__(self, exc_type, exc_val, exc_tb):
        release_resource(self.resource)
        del self.resource

with ResourceManager() as resource:
    use(resource)
# Resource automatically cleaned up
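To find where leaked memory is allocated, the standard library's `tracemalloc` can compare snapshots taken before and after a suspect operation. The sketch below uses a deliberately leaky function for illustration; `top_allocation_growth` and `leaky` are hypothetical names.

```python
import tracemalloc


def top_allocation_growth(func, limit=3):
    """Run func and return the source lines whose allocations changed the most."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        func()
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Entries are sorted by the absolute size difference, largest first
    return after.compare_to(before, "lineno")[:limit]


_leak = []  # Module-level list that keeps growing: a typical leak


def leaky():
    _leak.extend(bytearray(1024) for _ in range(1000))


for stat in top_allocation_growth(leaky):
    print(stat)
```

In a long-running service, taking a snapshot every few minutes and comparing it to the first one shows which lines keep accumulating memory.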

Network Issues

Issue: Connection errors

Symptoms:

ConnectionError: Failed to establish connection

Solutions:

# 1. Check network connectivity
import requests

def test_connectivity():
    """Test connection to LLM API."""
    try:
        response = requests.get(
            "https://api.openai.com",
            timeout=5
        )
        print(f"Connection OK: {response.status_code}")
    except Exception as e:
        print(f"Connection failed: {e}")

# 2. Configure retries
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

# 3. Use proxies if needed
import os

# ChatOpenAI takes a single proxy URL via openai_proxy
# (verify the parameter against your installed langchain_openai version)
llm = ChatOpenAI(
    model="gpt-4o-mini",
    openai_proxy=os.getenv('HTTPS_PROXY')
)

# 4. Check firewall settings
# Ensure outbound HTTPS traffic (port 443) to the LLM API is allowed

Debugging Techniques

Enable Debug Logging

# Set up comprehensive logging
import logging

# Configure root logger
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('debug.log'),
        logging.StreamHandler()
    ]
)

# Set specific loggers
logging.getLogger('azcore').setLevel(logging.DEBUG)
logging.getLogger('langchain').setLevel(logging.INFO)
logging.getLogger('httpx').setLevel(logging.WARNING)

Interactive Debugging

# Using pdb
import pdb

def problematic_function():
    result = some_operation()
    pdb.set_trace()  # Breakpoint here
    return result

# Using ipdb (better interface)
import ipdb

def debug_agent(agent, task):
    result = agent.invoke({"messages": [{"role": "user", "content": task}]})
    ipdb.set_trace()  # Inspect result
    return result

# Using breakpoint() (Python 3.7+)
def debug_function():
    result = operation()
    breakpoint()  # Drops into debugger
    return result

Trace Execution

# Trace function calls
import sys

def trace_calls(frame, event, arg):
    if event == 'call':
        filename = frame.f_code.co_filename
        funcname = frame.f_code.co_name
        print(f"Calling {funcname} in {filename}")
    return trace_calls

sys.settrace(trace_calls)
try:
    # Run code to trace
    agent.invoke(task)
finally:
    sys.settrace(None)  # Always disable tracing, even if the call raises

Log Analysis

Analyzing Error Patterns

# log_analyzer.py
import re
from collections import Counter

def analyze_logs(log_file):
    """Analyze log file for patterns."""
    errors = []
    warnings = []

    with open(log_file, 'r') as f:
        for line in f:
            if 'ERROR' in line:
                errors.append(line)
            elif 'WARNING' in line:
                warnings.append(line)

    # Count error types
    error_types = Counter()
    for error in errors:
        # Extract error type
        match = re.search(r'(\w+Error):', error)
        if match:
            error_types[match.group(1)] += 1

    print(f"Total errors: {len(errors)}")
    print(f"Total warnings: {len(warnings)}")
    print(f"\nTop errors:")
    for error_type, count in error_types.most_common(5):
        print(f"  {error_type}: {count}")

# Usage
analyze_logs('app.log')
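When logs use the format configured under Enable Debug Logging (`%(asctime)s - %(name)s - %(levelname)s - %(message)s`), each line can be parsed into fields, which makes it possible to see which logger produces the most errors. This is a sketch under that assumption; the sample lines and logger names below are made up for illustration.

```python
import re
from collections import Counter

# Matches lines such as:
# 2024-01-01 12:00:00,123 - some.logger - ERROR - message text
LOG_LINE = re.compile(
    r"^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - "
    r"(?P<name>\S+) - (?P<level>[A-Z]+) - (?P<message>.*)$"
)


def count_by_logger(lines, level="ERROR"):
    """Count log lines of the given level, grouped by logger name."""
    counts = Counter()
    for line in lines:
        match = LOG_LINE.match(line.rstrip("\n"))
        if match and match.group("level") == level:
            counts[match.group("name")] += 1
    return counts


# Illustrative sample lines, not real Azcore output
sample = [
    "2024-01-01 12:00:00,123 - azcore.agents - ERROR - TimeoutError: Request timed out",
    "2024-01-01 12:00:01,456 - azcore.workflows - INFO - Step finished",
    "2024-01-01 12:00:02,789 - azcore.agents - ERROR - RateLimitError: Rate limit reached",
]
print(count_by_logger(sample))
```

Because the function accepts any iterable of lines, an open file object can be passed directly without loading the whole log into memory.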

Recovery Procedures

Graceful Shutdown

# Handle graceful shutdown
import signal
import sys

def signal_handler(sig, frame):
    """Handle shutdown signal."""
    print("Shutting down gracefully...")

    # 1. Stop accepting new requests
    stop_accepting_requests()

    # 2. Wait for in-flight requests to complete
    wait_for_completion(timeout=30)

    # 3. Save state
    save_application_state()

    # 4. Close connections
    close_all_connections()

    print("Shutdown complete")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
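Doing all cleanup inside the signal handler, as above, can interrupt whatever the main thread was doing at that moment. A common alternative is for the handler to set a flag and let the main loop finish its current unit of work before exiting. The sketch below illustrates that pattern; `request_shutdown`, `install_handlers`, and `run_worker` are hypothetical names.

```python
import signal
import threading

shutdown_requested = threading.Event()


def request_shutdown(signum, frame):
    """Signal handler: only record that a shutdown was requested."""
    shutdown_requested.set()


def install_handlers():
    """Register the handler; must be called from the main thread."""
    signal.signal(signal.SIGINT, request_shutdown)
    signal.signal(signal.SIGTERM, request_shutdown)


def run_worker(process_next, poll_interval=0.5):
    """Call process_next() repeatedly until a shutdown is requested."""
    while not shutdown_requested.is_set():
        process_next()  # One unit of work always runs to completion
        # Sleep between units, but wake immediately on shutdown
        shutdown_requested.wait(poll_interval)
```

Call `install_handlers()` once at startup, run `run_worker(...)`, and perform the save-state and close-connections steps after it returns.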

State Recovery

# Implement state recovery
# Note: pickle can execute arbitrary code on load; only load state files your application wrote
import pickle
from pathlib import Path

class StateManager:
    """Manage application state for recovery."""

    def __init__(self, state_dir="state"):
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(exist_ok=True)

    def save_state(self, state_id, state):
        """Save state to disk."""
        state_file = self.state_dir / f"{state_id}.pkl"
        with open(state_file, 'wb') as f:
            pickle.dump(state, f)

    def load_state(self, state_id):
        """Load state from disk."""
        state_file = self.state_dir / f"{state_id}.pkl"
        if not state_file.exists():
            return None

        with open(state_file, 'rb') as f:
            return pickle.load(f)

    def recover(self, state_id):
        """Recover from saved state."""
        state = self.load_state(state_id)
        if state:
            print(f"Recovering state: {state_id}")
            return state
        else:
            print(f"No saved state found: {state_id}")
            return None


# Usage
state_manager = StateManager()

# Save state periodically
state_manager.save_state("workflow_123", current_state)

# Recover after crash
recovered_state = state_manager.recover("workflow_123")
if recovered_state:
    continue_from_state(recovered_state)

Quick Reference

Common Commands

# Check installation
pip show azcore

# Run tests
pytest tests/

# Enable debug mode
export DEBUG=1
python app.py

# Check logs
tail -f logs/app.log

# Monitor memory
watch -n 1 'ps aux | grep python'

# Check ports
netstat -tulpn | grep :8000

# Test API
curl -X POST http://localhost:8000/api/agent \
  -H "Content-Type: application/json" \
  -d '{"task": "test"}'

Environment Variables

# Debug
export DEBUG=1
export LOG_LEVEL=DEBUG

# API Keys
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."

# Configuration
export APP_ENV=development
export CACHE_ENABLED=true

# Performance
export WORKERS=4
export MAX_REQUESTS=1000
