
# Multi-Agent Systems

> Build multi-agent AI systems with LangChain and LangGraph using Archil for persistent memory and stateless agent architecture

[LangChain](https://python.langchain.com/) is a framework for developing applications powered by language models, while [LangGraph](https://langchain-ai.github.io/langgraph/) extends LangChain with graph-based orchestration for complex multi-agent workflows. This guide demonstrates how to use Archil disks to provide persistent memory storage for stateless agents, enabling scalable multi-agent systems with shared state across multiple servers.

## Overview

LangGraph enables sophisticated multi-agent architectures where specialized agents (Research, Writer, Reviewer) work together as nodes in a graph. With Archil, you can:

* **Persist agent memory** across restarts and deployments
* **Share state** between multiple agent instances running on different servers
* **Store checkpoints** for complex workflows in JSON files or SQLite on the shared disk
* **Scale horizontally** while maintaining consistent agent memory

## Create an Archil disk

First, follow the Archil [Getting Started Guide](/getting-started/quickstart) to create an Archil disk for storing your agent memory and checkpoints.

```bash theme={null}
# Mount your Archil disk with shared access
sudo mkdir -p /mnt/archil
export ARCHIL_MOUNT_TOKEN="<token>"
sudo --preserve-env=ARCHIL_MOUNT_TOKEN archil mount <disk-name> /mnt/archil --region aws-us-east-1 --shared

# Create directories for agent data
sudo mkdir -p /mnt/archil/langchain/{checkpoints,memory,agents,models}
sudo chown -R $USER:$USER /mnt/archil/langchain
```

The `--shared` flag enables multiple servers to access the same agent memory simultaneously, crucial for distributed multi-agent systems.
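
Later examples in this guide write to several directories under `/mnt/archil/langchain` beyond the four created above (for example `coordination`, `cache`, `metrics`, and `config`). The following is a minimal sketch, assuming the mount point used in this guide, that creates them and confirms the disk accepts writes. The helper name `ensure_layout` is hypothetical and not part of Archil or LangChain:

```python
from pathlib import Path

# Subdirectories referenced by the examples in this guide
SUBDIRS = ["checkpoints", "memory", "agents", "models",
           "coordination", "cache", "metrics", "config"]

def ensure_layout(base: str = "/mnt/archil/langchain") -> list:
    """Create the expected subdirectories and verify the disk accepts writes."""
    base_path = Path(base)
    created = []
    for name in SUBDIRS:
        target = base_path / name
        if not target.exists():
            target.mkdir(parents=True)
            created.append(name)
    # Write and remove a probe file to confirm the mount is writable
    probe = base_path / ".write_probe"
    probe.write_text("ok")
    probe.unlink()
    return created
```

Calling `ensure_layout()` once per server at startup returns the names it had to create, which is an empty list when the layout already exists.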

## Install Dependencies

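Set up your Python environment with LangChain and LangGraph. The checkpointers in this guide use JSON files and Python's built-in `sqlite3` module, so no separate database driver is required: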

```bash theme={null}
cd /mnt/archil/langchain
python -m venv langchain-env
source langchain-env/bin/activate

# Core LangChain and LangGraph packages
pip install langchain langgraph langchain-openai

# SQLite for lightweight checkpointing (built into Python)
# No additional database drivers needed - using file-based storage

# Additional utilities
pip install python-dotenv pydantic

# Set your OpenAI API key
export OPENAI_API_KEY="your-api-key-here"
```

## Multi-Agent Architecture Setup

LangGraph uses a "divide-and-conquer" approach where specialized agents handle different aspects of complex tasks. Here's how to set up a multi-agent system with Research, Writer, and Reviewer agents:

```python expandable theme={null}
# /mnt/archil/langchain/multi_agent_system.py
import os
from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END
from datetime import datetime

# Define the shared state structure
class AgentState(TypedDict):
    messages: List[str]
    research_data: str
    draft_content: str
    final_content: str
    feedback: str
    iteration: int

# Initialize the language model
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

def research_agent(state: AgentState) -> AgentState:
    """Research agent gathers information on the topic"""
    prompt = f"""You are a research agent. Based on the user's request: {state['messages'][-1]}
    
    Conduct thorough research and provide key findings, statistics, and relevant information.
    Focus on accuracy and comprehensiveness."""
    
    response = llm.invoke([SystemMessage(content=prompt)])
    
    return {
        **state,
        "research_data": response.content,
        "messages": state["messages"] + [f"Research completed: {response.content[:100]}..."]
    }

def writer_agent(state: AgentState) -> AgentState:
    """Writer agent creates content based on research"""
    prompt = f"""You are a writing agent. Using this research data:
    {state['research_data']}
    
    Create well-structured, engaging content that addresses the original request:
    {state['messages'][0]}
    
    Reviewer feedback on the previous draft (empty on the first pass):
    {state['feedback']}
    
    Make it informative and well-organized."""
    
    response = llm.invoke([SystemMessage(content=prompt)])
    
    return {
        **state,
        "draft_content": response.content,
        "messages": state["messages"] + [f"Draft completed: {len(response.content)} characters"]
    }

def reviewer_agent(state: AgentState) -> AgentState:
    """Reviewer agent evaluates and provides feedback"""
    prompt = f"""You are a review agent. Evaluate this content:
    {state['draft_content']}
    
    Original request: {state['messages'][0]}
    Research data: {state['research_data'][:200]}...
    
    Provide specific feedback on:
    1. Accuracy and completeness
    2. Structure and clarity
    3. Suggestions for improvement
    
    If the content is good enough, say "APPROVED". Otherwise, provide detailed feedback."""
    
    response = llm.invoke([SystemMessage(content=prompt)])
    
    # Determine if we need another iteration
    is_approved = "APPROVED" in response.content.upper()
    
    return {
        **state,
        "feedback": response.content,
        "final_content": state["draft_content"] if is_approved else "",
        "iteration": state["iteration"] + 1,
        "messages": state["messages"] + [f"Review completed: {'Approved' if is_approved else 'Needs revision'}"]
    }

def should_continue(state: AgentState) -> str:
    """Determine the next step in the workflow"""
    if state["final_content"]:
        return "end"
    elif state["iteration"] >= 3:  # Max 3 iterations
        return "end"
    else:
        return "revise"

# Create the workflow graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("research", research_agent)
workflow.add_node("write", writer_agent)
workflow.add_node("review", reviewer_agent)

# Define the flow
workflow.set_entry_point("research")
workflow.add_edge("research", "write")
workflow.add_edge("write", "review")
workflow.add_conditional_edges(
    "review",
    should_continue,
    {
        "revise": "write",
        "end": END
    }
)

# Compile the workflow
app = workflow.compile()
```
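
The review loop above ends either when the reviewer approves or after three review passes. A small dry run with stubbed agents (no LLM calls) can make that control flow concrete. The `dry_run` function below is illustrative and not part of LangGraph; it only mirrors the routing rule in `should_continue`:

```python
from typing import Dict, List

MAX_ITERATIONS = 3  # mirrors the cap in should_continue

def should_continue(state: Dict) -> str:
    """Same routing rule as the workflow: stop on approval or at the cap."""
    if state["final_content"] or state["iteration"] >= MAX_ITERATIONS:
        return "end"
    return "revise"

def dry_run(approve_on_pass: int) -> List[str]:
    """Simulate research -> write -> review, approving on the given review pass."""
    state = {"draft_content": "", "final_content": "", "iteration": 0}
    trace = ["research"]
    while True:
        trace.append("write")
        state["draft_content"] = f"draft v{state['iteration'] + 1}"
        trace.append("review")
        state["iteration"] += 1
        if state["iteration"] == approve_on_pass:
            state["final_content"] = state["draft_content"]
        if should_continue(state) == "end":
            return trace

print(dry_run(approve_on_pass=2))
```

Passing `approve_on_pass=0` simulates a reviewer that never approves: the loop stops after the third review with `final_content` still empty, so callers may want to fall back to `draft_content` in that case.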

## Supervisor Pattern Implementation

The supervisor pattern coordinates multiple agents and manages the overall workflow. Here's how to implement it with LangGraph:

```python expandable theme={null}
# /mnt/archil/langchain/supervisor_pattern.py
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
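from langgraph.graph import StateGraph, END

# Reuse the state and agent functions defined in multi_agent_system.py
from multi_agent_system import AgentState, research_agent, writer_agent, reviewer_agent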

# Define available agents
members = ["Researcher", "Writer", "Reviewer"]
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the"
    " following workers: {members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH."
)

options = ["FINISH"] + members

# Create supervisor prompt
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="messages"),
    ("system", "Given the conversation above, who should act next? Or should we FINISH? Select one of: {options}"),
]).partial(options=str(options), members=", ".join(members))

llm = ChatOpenAI(model="gpt-4")

def supervisor_agent(state):
    """Supervisor decides which agent should act next"""
    supervisor_chain = prompt | llm
    response = supervisor_chain.invoke(state)
    return {"next": response.content}

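# AgentState has no "next" field, so extend it to carry the supervisor's routing decision
class SupervisorState(AgentState):
    next: str

# Create supervisor workflow
supervisor_graph = StateGraph(SupervisorState)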
supervisor_graph.add_node("supervisor", supervisor_agent)
supervisor_graph.add_node("research", research_agent)
supervisor_graph.add_node("write", writer_agent)
supervisor_graph.add_node("review", reviewer_agent)

# Define conditional routing
def route_to_agent(state) -> Literal["research", "write", "review", "__end__"]:
    """Route to the appropriate agent based on supervisor decision"""
    next_agent = state.get("next", "").lower()
    if next_agent == "finish":
        return "__end__"
    elif "researcher" in next_agent:
        return "research"
    elif "writer" in next_agent:
        return "write"
    elif "reviewer" in next_agent:
        return "review"
    else:
        return "__end__"

supervisor_graph.set_entry_point("supervisor")
supervisor_graph.add_conditional_edges("supervisor", route_to_agent)

# Add edges back to supervisor after each agent
for member in ["research", "write", "review"]:
    supervisor_graph.add_edge(member, "supervisor")

# Compile the supervisor workflow
supervisor_app = supervisor_graph.compile()
```
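
Because the supervisor replies in free text, the router has to tolerate answers such as `Writer.` or `Next: Reviewer`. Below is a minimal sketch of a stricter parser, assuming the worker names used above. `parse_supervisor_choice` is a hypothetical helper, not a LangGraph API:

```python
import re
from typing import Optional

# Maps the supervisor's worker names to graph node names
WORKER_TO_NODE = {"researcher": "research", "writer": "write", "reviewer": "review"}

def parse_supervisor_choice(text: str) -> Optional[str]:
    """Return a node name, "__end__" for FINISH, or None if the reply is ambiguous."""
    words = set(re.findall(r"[a-z]+", text.lower()))
    matches = [node for worker, node in WORKER_TO_NODE.items() if worker in words]
    if "finish" in words and not matches:
        return "__end__"
    if len(matches) == 1 and "finish" not in words:
        return matches[0]
    return None  # ambiguous or unrecognized; the caller decides how to recover
```

Returning `None` for ambiguous replies lets the caller choose between ending the run and asking the supervisor again, rather than silently picking the first name that matches.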

## Checkpointer Integration

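LangGraph supports persistent state management through checkpointers. The examples below take a simpler route: thin wrappers that load any saved state before a run and store the final state afterwards, using either JSON files or SQLite on the Archil disk. They do not implement LangGraph's own checkpointer interface: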

<Tabs>
  <Tab title="File-Based">
    ### File-Based Checkpointer

    Simple JSON file-based checkpointing for lightweight persistence:

    ```python expandable theme={null}
    # /mnt/archil/langchain/file_checkpointer.py
    import json
    import os
    from datetime import datetime
    from pathlib import Path
    from typing import Dict, Any, Optional

    class FileCheckpointer:
        """Simple file-based checkpointer using Archil persistent storage"""
        
        def __init__(self, checkpoint_dir="/mnt/archil/langchain/checkpoints"):
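            self.checkpoint_dir = Path(checkpoint_dir)
            # Create the directory on first use so writes do not fail
            self.checkpoint_dir.mkdir(parents=True, exist_ok=True)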
        
        def save_checkpoint(self, thread_id: str, state: Dict[str, Any]):
            """Save agent state to a JSON file"""
            checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
            
            checkpoint_data = {
                "state": state,
                "timestamp": datetime.utcnow().isoformat(),
                "thread_id": thread_id
            }
            
            # Atomic write using temporary file
            temp_file = checkpoint_file.with_suffix('.tmp')
            with open(temp_file, 'w') as f:
                json.dump(checkpoint_data, f, indent=2)
            
            # Atomic rename; replace() overwrites an existing checkpoint on every platform
            temp_file.replace(checkpoint_file)
        
        def load_checkpoint(self, thread_id: str) -> Optional[Dict[str, Any]]:
            """Load agent state from JSON file"""
            checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
            
            if checkpoint_file.exists():
                try:
                    with open(checkpoint_file, 'r') as f:
                        data = json.load(f)
                    return data.get("state")
                except (json.JSONDecodeError, IOError):
                    return None
            return None
        
        def list_checkpoints(self) -> list:
            """List all available checkpoints"""
            return [f.stem for f in self.checkpoint_dir.glob("*.json")]
        
        def delete_checkpoint(self, thread_id: str):
            """Delete a checkpoint"""
            checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
            checkpoint_file.unlink(missing_ok=True)

    # Usage with LangGraph
    file_checkpointer = FileCheckpointer()

    # Custom workflow compilation with file checkpointer
    class FileCheckpointWorkflow:
        def __init__(self, workflow, checkpointer):
            self.workflow = workflow
            self.checkpointer = checkpointer
        
        def invoke(self, initial_state: dict, config: dict):
            thread_id = config.get("configurable", {}).get("thread_id")
            
            if thread_id:
                # Load existing state if available
                saved_state = self.checkpointer.load_checkpoint(thread_id)
                if saved_state:
                    # Saved values override the initial state; copy to avoid mutating the caller's dict
                    initial_state = {**initial_state, **saved_state}
            
            # Run the workflow
            result = self.workflow.invoke(initial_state)
            
            if thread_id:
                # Save the final state
                self.checkpointer.save_checkpoint(thread_id, result)
            
            return result

    # Use compiled workflow with file checkpointer
    checkpoint_app = FileCheckpointWorkflow(app, file_checkpointer)

    # Run with persistent state
    config = {"configurable": {"thread_id": "agent_session_1"}}
    result = checkpoint_app.invoke({
        "messages": ["Write a comprehensive guide about renewable energy"],
        "research_data": "",
        "draft_content": "",
        "final_content": "",
        "feedback": "",
        "iteration": 0
    }, config)
    ```
  </Tab>

  <Tab title="SQLite">
    ### SQLite Checkpointer

    SQLite-based checkpointing with history tracking and structured queries:

    ```python expandable theme={null}
    # /mnt/archil/langchain/sqlite_checkpointer.py
    import sqlite3
    import json
    from datetime import datetime
    from pathlib import Path
    from typing import Dict, Any, Optional

    class SQLiteCheckpointer:
        """SQLite-based checkpointer using Archil persistent storage"""
        
        def __init__(self, db_path="/mnt/archil/langchain/checkpoints/agent_checkpoints.db"):
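            self.db_path = Path(db_path)
            # Make sure the parent directory exists before SQLite opens the file
            self.db_path.parent.mkdir(parents=True, exist_ok=True)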
            self._init_db()
        
        def _init_db(self):
            """Initialize the SQLite database and tables"""
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS checkpoints (
                        thread_id TEXT PRIMARY KEY,
                        state TEXT NOT NULL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS checkpoint_history (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        thread_id TEXT NOT NULL,
                        state TEXT NOT NULL,
                        checkpoint_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (thread_id) REFERENCES checkpoints (thread_id)
                    )
                """)
                conn.commit()
        
        def save_checkpoint(self, thread_id: str, state: Dict[str, Any], keep_history: bool = True):
            """Save agent state to SQLite database"""
            state_json = json.dumps(state)
            
            with sqlite3.connect(self.db_path) as conn:
                # Save current state
                conn.execute("""
                    INSERT OR REPLACE INTO checkpoints (thread_id, state, updated_at)
                    VALUES (?, ?, ?)
                """, (thread_id, state_json, datetime.utcnow().isoformat()))
                
                # Optionally save to history
                if keep_history:
                    conn.execute("""
                        INSERT INTO checkpoint_history (thread_id, state)
                        VALUES (?, ?)
                    """, (thread_id, state_json))
                
                conn.commit()
        
        def load_checkpoint(self, thread_id: str) -> Optional[Dict[str, Any]]:
            """Load agent state from SQLite database"""
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute(
                    "SELECT state FROM checkpoints WHERE thread_id = ?",
                    (thread_id,)
                )
                row = cursor.fetchone()
                
                if row:
                    return json.loads(row[0])
                return None
        
        def get_checkpoint_history(self, thread_id: str, limit: int = 10) -> list:
            """Get checkpoint history for a thread"""
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute("""
                    SELECT state, checkpoint_time FROM checkpoint_history 
                    WHERE thread_id = ? 
                    ORDER BY checkpoint_time DESC, id DESC
                    LIMIT ?
                """, (thread_id, limit))
                
                return [
                    {"state": json.loads(row[0]), "timestamp": row[1]}
                    for row in cursor.fetchall()
                ]
        
        def list_threads(self) -> list:
            """List all thread IDs with checkpoints"""
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute("SELECT thread_id, updated_at FROM checkpoints ORDER BY updated_at DESC")
                return [{"thread_id": row[0], "last_updated": row[1]} for row in cursor.fetchall()]
        
        def delete_checkpoint(self, thread_id: str):
            """Delete a checkpoint and its history"""
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("DELETE FROM checkpoint_history WHERE thread_id = ?", (thread_id,))
                conn.execute("DELETE FROM checkpoints WHERE thread_id = ?", (thread_id,))
                conn.commit()

    # Usage with LangGraph
    sqlite_checkpointer = SQLiteCheckpointer()

    # Enhanced workflow with SQLite checkpointer
    class SQLiteCheckpointWorkflow:
        def __init__(self, workflow, checkpointer):
            self.workflow = workflow
            self.checkpointer = checkpointer
        
        def invoke(self, initial_state: dict, config: dict):
            thread_id = config.get("configurable", {}).get("thread_id")
            
            if thread_id:
                # Load existing state if available
                saved_state = self.checkpointer.load_checkpoint(thread_id)
                if saved_state:
                    # Saved values override the initial state; copy to avoid mutating the caller's dict
                    initial_state = {**initial_state, **saved_state}
            
            # Run the workflow
            result = self.workflow.invoke(initial_state)
            
            if thread_id:
                # Save checkpoint with history
                self.checkpointer.save_checkpoint(thread_id, result, keep_history=True)
            
            return result
        
        def get_thread_history(self, thread_id: str):
            """Get the execution history for a thread"""
            return self.checkpointer.get_checkpoint_history(thread_id)

    # Use compiled workflow with SQLite checkpointer
    checkpoint_app = SQLiteCheckpointWorkflow(app, sqlite_checkpointer)

    # Run with persistent state and history tracking
    config = {"configurable": {"thread_id": "renewable_energy_research_1"}}
    result = checkpoint_app.invoke({
        "messages": ["Write a comprehensive guide about renewable energy"],
        "research_data": "",
        "draft_content": "",
        "final_content": "",
        "feedback": "",
        "iteration": 0
    }, config)

    # View execution history
    history = checkpoint_app.get_thread_history("renewable_energy_research_1")
    print(f"Found {len(history)} checkpoints in history")
    ```
  </Tab>
</Tabs>
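
The file-based checkpointer writes to a temporary file and renames it so that readers never see a half-written checkpoint. Below is a slightly more defensive sketch of that step, using only the standard library. `atomic_write_json` is a hypothetical helper. Whether a rename is atomic across servers on a shared mount depends on the file system, so treat that as an assumption to verify:

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict

def atomic_write_json(path: Path, data: Dict[str, Any]) -> None:
    """Write JSON to a temp file in the same directory, then swap it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # A unique temp name avoids two writers clobbering the same .tmp file
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to storage before the rename
        os.replace(tmp_name, path)  # replaces any existing file
    except BaseException:
        # Clean up the temp file if anything failed before the swap
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Creating the temporary file in the destination directory keeps the rename on a single file system, which is what makes the swap a rename rather than a copy.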

## Stateless Agent Memory Persistence

Create a comprehensive memory management system for stateless agents:

```python expandable theme={null}
# /mnt/archil/langchain/agent_memory.py
import json
import os
from datetime import datetime
from typing import Dict, List, Any
from pathlib import Path

class ArchiledAgentMemory:
    """Persistent memory system for stateless agents using Archil storage"""
    
    def __init__(self, base_path="/mnt/archil/langchain/memory"):
        self.base_path = Path(base_path)
    
    def save_agent_memory(self, agent_id: str, memory_data: Dict[str, Any]):
        """Save agent memory to persistent storage"""
        agent_dir = self.base_path / agent_id
        # Create the per-agent directory on first save
        agent_dir.mkdir(parents=True, exist_ok=True)
        
        memory_file = agent_dir / "memory.json"
        
        # Load existing memory and merge
        existing_memory = self.load_agent_memory(agent_id)
        existing_memory.update(memory_data)
        existing_memory["last_updated"] = datetime.utcnow().isoformat()
        
        with open(memory_file, 'w') as f:
            json.dump(existing_memory, f, indent=2)
    
    def load_agent_memory(self, agent_id: str) -> Dict[str, Any]:
        """Load agent memory from persistent storage"""
        memory_file = self.base_path / agent_id / "memory.json"
        
        if memory_file.exists():
            with open(memory_file, 'r') as f:
                return json.load(f)
        return {}
    
    def save_conversation_history(self, session_id: str, messages: List[Dict]):
        """Save conversation history"""
        session_dir = self.base_path / "conversations" / session_id
        # Create the per-session directory on first save
        session_dir.mkdir(parents=True, exist_ok=True)
        
        history_file = session_dir / "history.json"
        with open(history_file, 'w') as f:
            json.dump({
                "messages": messages,
                "created_at": datetime.utcnow().isoformat()
            }, f, indent=2)
    
    def load_conversation_history(self, session_id: str) -> List[Dict]:
        """Load conversation history"""
        history_file = self.base_path / "conversations" / session_id / "history.json"
        
        if history_file.exists():
            with open(history_file, 'r') as f:
                data = json.load(f)
                return data.get("messages", [])
        return []

# Usage example
memory_manager = ArchiledAgentMemory()

# Save agent-specific memory
memory_manager.save_agent_memory("research_agent_1", {
    "expertise_areas": ["renewable_energy", "climate_science"],
    "recent_queries": ["solar panel efficiency", "wind turbine technology"],
    # Placeholder values for illustration only, not verified statistics
    "learned_facts": {
        "solar_efficiency_2024": "Modern panels achieve 22-26% efficiency",
        "wind_capacity_growth": "Global capacity grew 77% in 2023"
    }
})

# Load memory for stateless agent
agent_memory = memory_manager.load_agent_memory("research_agent_1")
```
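
`save_agent_memory` merges with `dict.update`, which replaces nested values wholesale: saving a new `learned_facts` dictionary discards the previous one. If accumulating nested facts is preferred, a recursive merge is one option. The sketch below is a hypothetical helper, not part of the class above:

```python
from typing import Any, Dict

def deep_merge(existing: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict: nested dicts are merged, lists gain items not already present."""
    merged = dict(existing)
    for key, value in update.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            merged[key] = deep_merge(current, value)
        elif isinstance(current, list) and isinstance(value, list):
            merged[key] = current + [item for item in value if item not in current]
        else:
            merged[key] = value  # scalars and mismatched types are overwritten
    return merged

old = {"learned_facts": {"a": 1}, "recent_queries": ["solar"]}
new = {"learned_facts": {"b": 2}, "recent_queries": ["solar", "wind"]}
print(deep_merge(old, new))
```

The function leaves both inputs unmodified, so the previously stored memory can still be compared against the merged result before it is written back.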

## Complete Multi-Agent Example

Here's a complete example that ties everything together:

```python expandable theme={null}
# /mnt/archil/langchain/complete_example.py
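import os
from datetime import datetime

from dotenv import load_dotenv

# These imports assume the earlier files sit next to this one and that their
# module-level usage examples are removed or guarded by `if __name__ == "__main__":`
from agent_memory import ArchiledAgentMemory
from sqlite_checkpointer import SQLiteCheckpointer, SQLiteCheckpointWorkflow
from supervisor_pattern import supervisor_app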

# Load environment variables
load_dotenv()

# Initialize components
memory_manager = ArchiledAgentMemory()
sqlite_checkpointer = SQLiteCheckpointer()

# Compile the complete workflow
complete_app = SQLiteCheckpointWorkflow(supervisor_app, sqlite_checkpointer)

def run_multi_agent_task(user_request: str, session_id: str = None):
    """Run a complete multi-agent task with persistent memory"""
    
    if not session_id:
        session_id = f"session_{int(datetime.utcnow().timestamp())}"
    
    # Load conversation history
    conversation_history = memory_manager.load_conversation_history(session_id)
    
    # Initial state
    initial_state = {
        "messages": [user_request],
        "research_data": "",
        "draft_content": "",
        "final_content": "",
        "feedback": "",
        "iteration": 0,
        "next": ""
    }
    
    # Configuration for persistent checkpoints
    config = {"configurable": {"thread_id": session_id}}
    
    try:
        # Run the workflow
        result = complete_app.invoke(initial_state, config)
        
        # Save conversation history
        conversation_history.append({
            "user_request": user_request,
            "result": result,
            "timestamp": datetime.utcnow().isoformat()
        })
        memory_manager.save_conversation_history(session_id, conversation_history)
        
        # Update agent memories based on the interaction
        memory_manager.save_agent_memory("research_agent", {
            "last_research_topic": user_request,
            "research_quality_score": len(result.get("research_data", "")) / 100
        })
        
        return result
        
    except Exception as e:
        print(f"Error in multi-agent task: {e}")
        return {"error": str(e)}

# Example usage
if __name__ == "__main__":
    result = run_multi_agent_task(
        "Create a comprehensive analysis of renewable energy trends in 2024",
        session_id="renewable_energy_analysis_1"
    )
    
    print("Final Content:")
    print(result.get("final_content", "No content generated"))
```

## Advanced Features and Best Practices

### Scaling Across Multiple Servers

When running agents across multiple servers, use Archil's shared disk capabilities:

```python expandable theme={null}
# /mnt/archil/langchain/distributed_agents.py
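import json
import os
import time
from pathlib import Path

# Reuse the state and research agent defined in multi_agent_system.py
from multi_agent_system import AgentState, research_agent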

class DistributedAgentCoordinator:
    """Coordinate agents across multiple servers using file-based locking"""
    
    def __init__(self, coordination_path="/mnt/archil/langchain/coordination"):
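        self.coordination_path = Path(coordination_path)
        # Lock files cannot be created unless the directory exists
        self.coordination_path.mkdir(parents=True, exist_ok=True)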
    
    def acquire_agent_lock(self, agent_type: str, timeout: int = 30):
        """Acquire exclusive lock for an agent type"""
        lock_file = self.coordination_path / f"{agent_type}.lock"
        
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                return fd
            except FileExistsError:
                time.sleep(0.1)
        
        raise TimeoutError(f"Could not acquire lock for {agent_type}")
    
    def release_agent_lock(self, fd: int, agent_type: str):
        """Release agent lock"""
        os.close(fd)
        lock_file = self.coordination_path / f"{agent_type}.lock"
        lock_file.unlink(missing_ok=True)

# Usage in distributed setup
coordinator = DistributedAgentCoordinator()

def distributed_research_agent(state: AgentState) -> AgentState:
    """Research agent with distributed coordination"""
    lock_fd = coordinator.acquire_agent_lock("research")
    
    try:
        # Load any shared research cache
        cache_file = Path("/mnt/archil/langchain/cache/research_cache.json")
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        if cache_file.exists():
            with open(cache_file, 'r') as f:
                research_cache = json.load(f)
        else:
            research_cache = {}
        
        # Perform research (implementation from earlier)
        result = research_agent(state)
        
        # Update shared cache
        research_cache[state['messages'][-1][:50]] = result['research_data']
        with open(cache_file, 'w') as f:
            json.dump(research_cache, f)
        
        return result
        
    finally:
        coordinator.release_agent_lock(lock_fd, "research")
```
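
A lock file created with `O_EXCL` stays behind if its holder crashes, which would block every other server until someone removes it. The sketch below wraps the same idea in a context manager and treats locks older than `stale_after` seconds as abandoned. It is a hypothetical helper under stated assumptions: exclusive create is assumed to be atomic on the shared mount, server clocks are assumed to be roughly in sync, and stale-lock removal is itself racy when several waiters act at once:

```python
import os
import time
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def file_lock(lock_file: Path, timeout: float = 30.0, stale_after: float = 300.0):
    """Hold an exclusive lock file for the duration of a with-block."""
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the file already exists
            fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            try:
                age = time.time() - lock_file.stat().st_mtime
            except FileNotFoundError:
                continue  # the holder released it in the meantime; retry
            if age > stale_after:
                # Assume the holder crashed; remove the lock and retry
                lock_file.unlink(missing_ok=True)
                continue
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Could not acquire {lock_file}")
            time.sleep(0.1)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(os.getpid()))  # record the owner for debugging
        yield
    finally:
        lock_file.unlink(missing_ok=True)
```

With this helper, the body of `distributed_research_agent` could run inside `with file_lock(Path("/mnt/archil/langchain/coordination/research.lock")):` instead of pairing explicit acquire and release calls.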

### Monitoring and Observability

Track agent performance and system health:

```python expandable theme={null}
# /mnt/archil/langchain/monitoring.py
import json
import time
from datetime import datetime
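from pathlib import Path

# Reuse the state and research agent defined in multi_agent_system.py
from multi_agent_system import AgentState, research_agent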

class AgentMonitor:
    """Monitor agent performance and system health"""
    
    def __init__(self, metrics_path="/mnt/archil/langchain/metrics"):
        self.metrics_path = Path(metrics_path)
        # Create the metrics directory on first use
        self.metrics_path.mkdir(parents=True, exist_ok=True)
    
    def log_agent_execution(self, agent_name: str, execution_time: float, 
                          success: bool, input_size: int, output_size: int):
        """Log agent execution metrics"""
        
        metrics = {
            "timestamp": datetime.utcnow().isoformat(),
            "agent_name": agent_name,
            "execution_time": execution_time,
            "success": success,
            "input_size": input_size,
            "output_size": output_size
        }
        
        # Append to daily metrics file
        date_str = datetime.utcnow().strftime("%Y-%m-%d")
        metrics_file = self.metrics_path / f"agent_metrics_{date_str}.jsonl"
        
        with open(metrics_file, 'a') as f:
            f.write(json.dumps(metrics) + '\n')
    
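    def get_agent_performance_summary(self, days: int = 7) -> dict:
        """Get performance summary for the last N days"""
        summary = {
            "total_executions": 0,
            "success_rate": 0.0,
            "average_execution_time": 0.0,
            "agent_breakdown": {}
        }
        
        now = datetime.utcnow()
        successes = 0
        total_time = 0.0
        
        # Aggregate every record whose timestamp falls within the window
        for metrics_file in sorted(self.metrics_path.glob("agent_metrics_*.jsonl")):
            with open(metrics_file, 'r') as f:
                for line in f:
                    if not line.strip():
                        continue
                    record = json.loads(line)
                    if (now - datetime.fromisoformat(record["timestamp"])).days >= days:
                        continue
                    summary["total_executions"] += 1
                    successes += int(record["success"])
                    total_time += record["execution_time"]
                    name = record["agent_name"]
                    # agent_breakdown maps each agent name to its execution count
                    summary["agent_breakdown"][name] = summary["agent_breakdown"].get(name, 0) + 1
        
        if summary["total_executions"]:
            summary["success_rate"] = successes / summary["total_executions"]
            summary["average_execution_time"] = total_time / summary["total_executions"]
        
        return summary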

# Decorator for monitoring agent functions
def monitor_agent(agent_name: str):
    def decorator(func):
        def wrapper(state):
            monitor = AgentMonitor()
            start_time = time.time()
            
            try:
                input_size = len(str(state))
                result = func(state)
                output_size = len(str(result))
                execution_time = time.time() - start_time
                
                monitor.log_agent_execution(
                    agent_name, execution_time, True, input_size, output_size
                )
                
                return result
                
            except Exception as e:
                execution_time = time.time() - start_time
                monitor.log_agent_execution(
                    agent_name, execution_time, False, 0, 0
                )
                raise
        
        return wrapper
    return decorator

# Apply monitoring to agents
@monitor_agent("research")
def monitored_research_agent(state: AgentState) -> AgentState:
    return research_agent(state)
```

## Configuration Management

Store configuration in Archil for consistent settings across deployments:

```python expandable theme={null}
# /mnt/archil/langchain/config.py
import json
from pathlib import Path
from typing import Dict, Any

class ArchiledConfig:
    """Configuration management using Archil persistent storage"""
    
    def __init__(self, config_path="/mnt/archil/langchain/config"):
        self.config_path = Path(config_path)
        # Create the config directory on first use
        self.config_path.mkdir(parents=True, exist_ok=True)
        self.config_file = self.config_path / "agent_config.json"
        
        # Initialize with defaults if config doesn't exist
        if not self.config_file.exists():
            self._create_default_config()
    
    def _create_default_config(self):
        """Create default configuration"""
        default_config = {
            "agents": {
                "research": {
                    "model": "gpt-4",
                    "temperature": 0.3,
                    "max_tokens": 2000,
                    "timeout": 30
                },
                "writer": {
                    "model": "gpt-4",
                    "temperature": 0.7,
                    "max_tokens": 3000,
                    "timeout": 45
                },
                "reviewer": {
                    "model": "gpt-4",
                    "temperature": 0.2,
                    "max_tokens": 1500,
                    "timeout": 30
                }
            },
            "workflow": {
                "max_iterations": 3,
                "checkpoint_interval": 1,
                "enable_monitoring": True
            },
            "storage": {
                "checkpointer_type": "sqlite",
                "memory_retention_days": 30,
                "cache_size_mb": 100
            }
        }
        
        with open(self.config_file, 'w') as f:
            json.dump(default_config, f, indent=2)
    
    def get_config(self, section: str = None) -> Dict[str, Any]:
        """Get configuration section or entire config"""
        with open(self.config_file, 'r') as f:
            config = json.load(f)
        
        return config.get(section) if section else config
    
    def update_config(self, section: str, updates: Dict[str, Any]):
        """Update configuration section"""
        with open(self.config_file, 'r') as f:
            config = json.load(f)
        
        if section not in config:
            config[section] = {}
        
        config[section].update(updates)
        
        with open(self.config_file, 'w') as f:
            json.dump(config, f, indent=2)

# Usage
config_manager = ArchiledConfig()
agent_config = config_manager.get_config("agents")
workflow_config = config_manager.get_config("workflow")
```
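
The per-agent settings read from `agent_config.json` can be validated before they reach the model constructor. Below is a minimal sketch, assuming the key names from the default config above. The `AgentSettings` dataclass and `load_agent_settings` function are hypothetical, and the temperature bounds reflect the range the OpenAI API accepts:

```python
from dataclasses import dataclass
from typing import Any, Dict

@dataclass(frozen=True)
class AgentSettings:
    model: str
    temperature: float
    max_tokens: int
    timeout: int

def load_agent_settings(agents_config: Dict[str, Dict[str, Any]], name: str) -> AgentSettings:
    """Build validated settings for one agent from the "agents" config section."""
    if name not in agents_config:
        raise KeyError(f"No configuration for agent '{name}'")
    # The dataclass constructor raises TypeError on unknown or missing keys
    settings = AgentSettings(**agents_config[name])
    if not 0.0 <= settings.temperature <= 2.0:
        raise ValueError(f"temperature out of range for '{name}': {settings.temperature}")
    if settings.max_tokens <= 0 or settings.timeout <= 0:
        raise ValueError(f"max_tokens and timeout must be positive for '{name}'")
    return settings

example = {"research": {"model": "gpt-4", "temperature": 0.3, "max_tokens": 2000, "timeout": 30}}
print(load_agent_settings(example, "research"))
```

Failing at load time keeps a typo in the shared config file from surfacing later as an API error on whichever server happens to run the agent.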

## Testing Your Setup

Validate your LangChain/LangGraph integration with Archil:

```bash theme={null}
# Test the complete setup
cd /mnt/archil/langchain
source langchain-env/bin/activate

# Run a simple test
python -c "
from complete_example import run_multi_agent_task
result = run_multi_agent_task('Write a brief summary of solar energy benefits')
print('Success!' if result.get('final_content') else 'Failed!')
"

# Check persistent storage
ls -la /mnt/archil/langchain/
ls -la /mnt/archil/langchain/memory/
ls -la /mnt/archil/langchain/checkpoints/
```

## Best Practices

1. **Memory Management**: Regularly clean up old conversation histories and agent memories to prevent storage bloat
2. **Error Handling**: Implement robust error handling and recovery mechanisms for distributed agent failures
3. **Security**: Store API keys and sensitive configuration in environment variables, not in Archil storage
4. **Performance**: Use appropriate checkpointer backends based on your performance requirements (file-based for simplicity, SQLite for structured queries and history)
5. **Monitoring**: Implement comprehensive logging and monitoring to track agent performance and system health
6. **Scaling**: Use Archil's shared disk capabilities to enable horizontal scaling of your agent systems
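
For the memory-management practice above, a cleanup job can remove files that have not been modified within the retention window (the default config uses `memory_retention_days: 30`). The following is a minimal sketch. `prune_old_files` is a hypothetical helper that decides by modification time only:

```python
import time
from pathlib import Path
from typing import List

def prune_old_files(root: Path, retention_days: int = 30, pattern: str = "*.json") -> List[Path]:
    """Delete matching files under root older than the retention window; return what was removed."""
    cutoff = time.time() - retention_days * 86400
    removed = []
    # Materialize the listing first so deletions do not disturb the directory walk
    for path in list(root.rglob(pattern)):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path)
    return removed
```

Running it against `/mnt/archil/langchain/memory` from a single scheduled job, rather than from every server, avoids several hosts racing to delete the same files.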

With this setup, agents keep their checkpoints, memory, and configuration on a shared Archil disk, so any server can resume a workflow and individual agent processes can remain stateless.
