Overview
LangGraph enables sophisticated multi-agent architectures where specialized agents (Research, Writer, Reviewer) work together as nodes in a graph. With Archil, you can:
- Persist agent memory across restarts and deployments
- Share state between multiple agent instances running on different servers
- Store checkpoints for complex workflows using LangChain’s checkpointer libraries
- Scale horizontally while maintaining consistent agent memory
Create an Archil disk
First, follow the Archil Getting Started Guide to create an Archil disk for storing your agent memory and checkpoints.
# Mount your Archil disk with shared access
sudo mkdir -p /mnt/archil
sudo archil mount <disk-name> /mnt/archil --region aws-us-east-1 --auth-token TOKEN --shared
# Create directories for agent data
sudo mkdir -p /mnt/archil/langchain/{checkpoints,memory,agents,models}
sudo chown -R $USER:$USER /mnt/archil/langchain
The --shared flag enables multiple servers to access the same agent memory simultaneously, which is crucial for distributed multi-agent systems.
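To confirm that two servers really see the same mount, one quick sanity check is to write a heartbeat file from each instance and list them from any other. A minimal sketch, assuming the directories created above:
# shared_check.py - verify shared visibility across servers (sketch)
import socket
import time
from pathlib import Path

agents_dir = Path("/mnt/archil/langchain/agents")

# Write this server's heartbeat
heartbeat = agents_dir / f"{socket.gethostname()}.heartbeat"
heartbeat.write_text(str(time.time()))

# Run on any other server with the same mount to see all heartbeats
for beat in sorted(agents_dir.glob("*.heartbeat")):
    print(beat.stem, beat.read_text())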
Install Dependencies
Set up your Python environment with LangChain and LangGraph:
cd /mnt/archil/langchain
python -m venv langchain-env
source langchain-env/bin/activate
# Core LangChain and LangGraph packages
pip install langchain langgraph langchain-openai
# SQLite for lightweight checkpointing (built into Python)
# No additional database drivers needed - using file-based storage
# Additional utilities
pip install python-dotenv pydantic
# Set your OpenAI API key
export OPENAI_API_KEY="your-api-key-here"
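If you would rather keep the key out of your shell history, python-dotenv (installed above) can load it from a local .env file. Keep that file on local disk rather than the shared mount, in line with the security note at the end of this guide; the path below is a hypothetical example:
# Hypothetical .env loading; ~/.env is a local, non-shared path
import os
from dotenv import load_dotenv

load_dotenv(os.path.expanduser("~/.env"))
assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is not set"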
Multi-Agent Architecture Setup
LangGraph uses a “divide-and-conquer” approach where specialized agents handle different aspects of complex tasks. Here’s how to set up a multi-agent system with Research, Writer, and Reviewer agents:
# /mnt/archil/langchain/multi_agent_system.py
import os
from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END
from datetime import datetime
# Define the shared state structure
class AgentState(TypedDict):
    messages: List[str]
    research_data: str
    draft_content: str
    final_content: str
    feedback: str
    iteration: int
    next: str  # routing field used by the supervisor pattern below
# Initialize the language model
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
def research_agent(state: AgentState) -> AgentState:
"""Research agent gathers information on the topic"""
prompt = f"""You are a research agent. Based on the user's request: {state['messages'][-1]}
Conduct thorough research and provide key findings, statistics, and relevant information.
Focus on accuracy and comprehensiveness."""
response = llm.invoke([SystemMessage(content=prompt)])
return {
**state,
"research_data": response.content,
"messages": state["messages"] + [f"Research completed: {response.content[:100]}..."]
}
def writer_agent(state: AgentState) -> AgentState:
"""Writer agent creates content based on research"""
prompt = f"""You are a writing agent. Using this research data:
{state['research_data']}
Create well-structured, engaging content that addresses the original request:
{state['messages'][0]}
Make it informative and well-organized."""
response = llm.invoke([SystemMessage(content=prompt)])
return {
**state,
"draft_content": response.content,
"messages": state["messages"] + [f"Draft completed: {len(response.content)} characters"]
}
def reviewer_agent(state: AgentState) -> AgentState:
"""Reviewer agent evaluates and provides feedback"""
prompt = f"""You are a review agent. Evaluate this content:
{state['draft_content']}
Original request: {state['messages'][0]}
Research data: {state['research_data'][:200]}...
Provide specific feedback on:
1. Accuracy and completeness
2. Structure and clarity
3. Suggestions for improvement
If the content is good enough, say "APPROVED". Otherwise, provide detailed feedback."""
response = llm.invoke([SystemMessage(content=prompt)])
# Determine if we need another iteration
is_approved = "APPROVED" in response.content.upper()
return {
**state,
"feedback": response.content,
"final_content": state["draft_content"] if is_approved else "",
"iteration": state["iteration"] + 1,
"messages": state["messages"] + [f"Review completed: {'Approved' if is_approved else 'Needs revision'}"]
}
def should_continue(state: AgentState) -> str:
"""Determine the next step in the workflow"""
if state["final_content"]:
return "end"
elif state["iteration"] >= 3: # Max 3 iterations
return "end"
else:
return "revise"
# Create the workflow graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("research", research_agent)
workflow.add_node("write", writer_agent)
workflow.add_node("review", reviewer_agent)
# Define the flow
workflow.set_entry_point("research")
workflow.add_edge("research", "write")
workflow.add_edge("write", "review")
workflow.add_conditional_edges(
"review",
should_continue,
{
"revise": "write",
"end": END
}
)
# Compile the workflow
app = workflow.compile()
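With the graph compiled, a single invoke call runs research → write → review until should_continue returns "end". A minimal smoke test, assuming OPENAI_API_KEY is set:
# Run the workflow once end-to-end
initial_state = {
    "messages": ["Explain the benefits of battery storage for home solar"],
    "research_data": "",
    "draft_content": "",
    "final_content": "",
    "feedback": "",
    "iteration": 0,
    "next": ""
}
result = app.invoke(initial_state)
print(result["final_content"] or result["feedback"])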
Supervisor Pattern Implementation
The supervisor pattern coordinates multiple agents and manages the overall workflow. Here’s how to implement it with LangGraph:
# /mnt/archil/langchain/supervisor_pattern.py
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from multi_agent_system import AgentState, research_agent, writer_agent, reviewer_agent
# Define available agents
members = ["Researcher", "Writer", "Reviewer"]
system_prompt = (
"You are a supervisor tasked with managing a conversation between the"
" following workers: {members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH."
)
options = ["FINISH"] + members
# Create supervisor prompt
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
MessagesPlaceholder(variable_name="messages"),
("system", "Given the conversation above, who should act next? Or should we FINISH? Select one of: {options}"),
]).partial(options=str(options), members=", ".join(members))
llm = ChatOpenAI(model="gpt-4")
def supervisor_agent(state):
"""Supervisor decides which agent should act next"""
supervisor_chain = prompt | llm
response = supervisor_chain.invoke(state)
return {"next": response.content}
# Create supervisor workflow
supervisor_graph = StateGraph(AgentState)
supervisor_graph.add_node("supervisor", supervisor_agent)
supervisor_graph.add_node("research", research_agent)
supervisor_graph.add_node("write", writer_agent)
supervisor_graph.add_node("review", reviewer_agent)
# Define conditional routing
def route_to_agent(state) -> Literal["research", "write", "review", "__end__"]:
"""Route to the appropriate agent based on supervisor decision"""
next_agent = state.get("next", "").lower()
if next_agent == "finish":
return "__end__"
elif "researcher" in next_agent:
return "research"
elif "writer" in next_agent:
return "write"
elif "reviewer" in next_agent:
return "review"
else:
return "__end__"
supervisor_graph.set_entry_point("supervisor")
supervisor_graph.add_conditional_edges("supervisor", route_to_agent)
# Add edges back to supervisor after each agent
for member in ["research", "write", "review"]:
supervisor_graph.add_edge(member, "supervisor")
# Compile the supervisor workflow
supervisor_app = supervisor_graph.compile()
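Because the supervisor loop only terminates when the model answers FINISH, it is worth capping the number of graph steps with LangGraph's recursion_limit when you invoke it. A sketch:
# Invoke the supervisor workflow with a step cap as a safety net
result = supervisor_app.invoke(
    {
        "messages": ["Summarize recent advances in grid-scale batteries"],
        "research_data": "", "draft_content": "", "final_content": "",
        "feedback": "", "iteration": 0, "next": ""
    },
    config={"recursion_limit": 25},  # stop a runaway supervisor loop
)
print(result["messages"][-1])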
Checkpointer Integration
LangGraph supports persistent state management through checkpointers. With Archil, you can use simple file-based checkpointing or SQLite for lightweight, persistent storage.
File-Based Checkpointer
Simple JSON file-based checkpointing for lightweight persistence:
# /mnt/archil/langchain/file_checkpointer.py
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
class FileCheckpointer:
"""Simple file-based checkpointer using Archil persistent storage"""
    def __init__(self, checkpoint_dir="/mnt/archil/langchain/checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)  # create on first use
def save_checkpoint(self, thread_id: str, state: Dict[str, Any]):
"""Save agent state to a JSON file"""
checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
checkpoint_data = {
"state": state,
"timestamp": datetime.utcnow().isoformat(),
"thread_id": thread_id
}
# Atomic write using temporary file
temp_file = checkpoint_file.with_suffix('.tmp')
with open(temp_file, 'w') as f:
json.dump(checkpoint_data, f, indent=2)
# Atomic rename
temp_file.rename(checkpoint_file)
def load_checkpoint(self, thread_id: str) -> Optional[Dict[str, Any]]:
"""Load agent state from JSON file"""
checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
if checkpoint_file.exists():
try:
with open(checkpoint_file, 'r') as f:
data = json.load(f)
return data.get("state")
except (json.JSONDecodeError, IOError):
return None
return None
def list_checkpoints(self) -> list:
"""List all available checkpoints"""
return [f.stem for f in self.checkpoint_dir.glob("*.json")]
def delete_checkpoint(self, thread_id: str):
"""Delete a checkpoint"""
checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
checkpoint_file.unlink(missing_ok=True)
# Usage with LangGraph
file_checkpointer = FileCheckpointer()
# Custom workflow compilation with file checkpointer
class FileCheckpointWorkflow:
def __init__(self, workflow, checkpointer):
self.workflow = workflow
self.checkpointer = checkpointer
def invoke(self, initial_state: dict, config: dict):
thread_id = config.get("configurable", {}).get("thread_id")
if thread_id:
# Load existing state if available
saved_state = self.checkpointer.load_checkpoint(thread_id)
if saved_state:
# Merge with initial state
initial_state.update(saved_state)
# Run the workflow
result = self.workflow.invoke(initial_state)
if thread_id:
# Save the final state
self.checkpointer.save_checkpoint(thread_id, result)
return result
# Use compiled workflow with file checkpointer
checkpoint_app = FileCheckpointWorkflow(app, file_checkpointer)
# Run with persistent state
config = {"configurable": {"thread_id": "agent_session_1"}}
result = checkpoint_app.invoke({
"messages": ["Write a comprehensive guide about renewable energy"],
"research_data": "",
"draft_content": "",
"final_content": "",
"feedback": "",
"iteration": 0
}, config)
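If you outgrow JSON files, LangGraph also ships a SQLite checkpointer whose database file can live on the same Archil mount. The import path varies across langgraph versions (recent releases split it into the separate langgraph-checkpoint-sqlite package), so treat the sketch below as an assumption to verify against your installed version:
# SQLite-backed checkpointing on the Archil disk (verify the import for your langgraph version)
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect(
    "/mnt/archil/langchain/checkpoints/graph.db",
    check_same_thread=False,
)
sqlite_app = workflow.compile(checkpointer=SqliteSaver(conn))

# Reuse the initial state shape from the example above
result = sqlite_app.invoke(initial_state, config={"configurable": {"thread_id": "agent_session_1"}})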
Stateless Agent Memory Persistence
Create a comprehensive memory management system for stateless agents:
# /mnt/archil/langchain/agent_memory.py
import json
import os
from datetime import datetime
from typing import Dict, List, Any
from pathlib import Path
class ArchiledAgentMemory:
"""Persistent memory system for stateless agents using Archil storage"""
    def __init__(self, base_path="/mnt/archil/langchain/memory"):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)  # create on first use
def save_agent_memory(self, agent_id: str, memory_data: Dict[str, Any]):
"""Save agent memory to persistent storage"""
        agent_dir = self.base_path / agent_id
        agent_dir.mkdir(parents=True, exist_ok=True)  # ensure the per-agent directory exists
        memory_file = agent_dir / "memory.json"
# Load existing memory and merge
existing_memory = self.load_agent_memory(agent_id)
existing_memory.update(memory_data)
existing_memory["last_updated"] = datetime.utcnow().isoformat()
with open(memory_file, 'w') as f:
json.dump(existing_memory, f, indent=2)
def load_agent_memory(self, agent_id: str) -> Dict[str, Any]:
"""Load agent memory from persistent storage"""
memory_file = self.base_path / agent_id / "memory.json"
if memory_file.exists():
with open(memory_file, 'r') as f:
return json.load(f)
return {}
def save_conversation_history(self, session_id: str, messages: List[Dict]):
"""Save conversation history"""
        session_dir = self.base_path / "conversations" / session_id
        session_dir.mkdir(parents=True, exist_ok=True)  # ensure the session directory exists
        history_file = session_dir / "history.json"
with open(history_file, 'w') as f:
json.dump({
"messages": messages,
"created_at": datetime.utcnow().isoformat()
}, f, indent=2)
def load_conversation_history(self, session_id: str) -> List[Dict]:
"""Load conversation history"""
history_file = self.base_path / "conversations" / session_id / "history.json"
if history_file.exists():
with open(history_file, 'r') as f:
data = json.load(f)
return data.get("messages", [])
return []
# Usage example
memory_manager = ArchiledAgentMemory()
# Save agent-specific memory
memory_manager.save_agent_memory("research_agent_1", {
"expertise_areas": ["renewable_energy", "climate_science"],
"recent_queries": ["solar panel efficiency", "wind turbine technology"],
"learned_facts": {
"solar_efficiency_2024": "Modern panels achieve 22-26% efficiency",
"wind_capacity_growth": "Global capacity grew 77% in 2023"
}
})
# Load memory for stateless agent
agent_memory = memory_manager.load_agent_memory("research_agent_1")
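Loaded memory only helps if it reaches the prompt. One way to wire it in, sketched below against the research agent from earlier (llm, SystemMessage, and AgentState come from multi_agent_system.py):
def research_agent_with_memory(state: AgentState) -> AgentState:
    """Variant of research_agent that primes its prompt with persisted facts"""
    memory = memory_manager.load_agent_memory("research_agent_1")
    known_facts = json.dumps(memory.get("learned_facts", {}), indent=2)
    prompt = f"""You are a research agent. Facts learned in previous sessions:
{known_facts}

User request: {state['messages'][-1]}
Conduct thorough research and provide key findings."""
    response = llm.invoke([SystemMessage(content=prompt)])
    return {**state, "research_data": response.content}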
Complete Multi-Agent Example
Here’s a complete example that ties everything together:
# /mnt/archil/langchain/complete_example.py
import os
from datetime import datetime
from dotenv import load_dotenv
from agent_memory import ArchiledAgentMemory
from file_checkpointer import FileCheckpointer, FileCheckpointWorkflow
from supervisor_pattern import supervisor_app
# Load environment variables
load_dotenv()
# Initialize components
memory_manager = ArchiledAgentMemory()
file_checkpointer = FileCheckpointer()
# Compile the complete workflow using the file-based checkpointer defined earlier
complete_app = FileCheckpointWorkflow(supervisor_app, file_checkpointer)
def run_multi_agent_task(user_request: str, session_id: str = None):
"""Run a complete multi-agent task with persistent memory"""
if not session_id:
session_id = f"session_{int(datetime.utcnow().timestamp())}"
# Load conversation history
conversation_history = memory_manager.load_conversation_history(session_id)
# Initial state
initial_state = {
"messages": [user_request],
"research_data": "",
"draft_content": "",
"final_content": "",
"feedback": "",
"iteration": 0,
"next": ""
}
# Configuration for persistent checkpoints
config = {"configurable": {"thread_id": session_id}}
try:
# Run the workflow
result = complete_app.invoke(initial_state, config)
# Save conversation history
conversation_history.append({
"user_request": user_request,
"result": result,
"timestamp": datetime.utcnow().isoformat()
})
memory_manager.save_conversation_history(session_id, conversation_history)
# Update agent memories based on the interaction
memory_manager.save_agent_memory("research_agent", {
"last_research_topic": user_request,
"research_quality_score": len(result.get("research_data", "")) / 100
})
return result
except Exception as e:
print(f"Error in multi-agent task: {e}")
return {"error": str(e)}
# Example usage
if __name__ == "__main__":
result = run_multi_agent_task(
"Create a comprehensive analysis of renewable energy trends in 2024",
session_id="renewable_energy_analysis_1"
)
print("Final Content:")
print(result.get("final_content", "No content generated"))
Advanced Features and Best Practices
Scaling Across Multiple Servers
When running agents across multiple servers, use Archil’s shared disk capabilities:
# /mnt/archil/langchain/distributed_agents.py
import json
import os
import time
from pathlib import Path
from multi_agent_system import AgentState, research_agent
class DistributedAgentCoordinator:
"""Coordinate agents across multiple servers using file-based locking"""
    def __init__(self, coordination_path="/mnt/archil/langchain/coordination"):
        self.coordination_path = Path(coordination_path)
        self.coordination_path.mkdir(parents=True, exist_ok=True)  # create on first use
def acquire_agent_lock(self, agent_type: str, timeout: int = 30):
"""Acquire exclusive lock for an agent type"""
lock_file = self.coordination_path / f"{agent_type}.lock"
start_time = time.time()
while time.time() - start_time < timeout:
try:
fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR)
return fd
except FileExistsError:
time.sleep(0.1)
raise TimeoutError(f"Could not acquire lock for {agent_type}")
def release_agent_lock(self, fd: int, agent_type: str):
"""Release agent lock"""
os.close(fd)
lock_file = self.coordination_path / f"{agent_type}.lock"
lock_file.unlink(missing_ok=True)
# Usage in distributed setup
coordinator = DistributedAgentCoordinator()
def distributed_research_agent(state: AgentState) -> AgentState:
"""Research agent with distributed coordination"""
lock_fd = coordinator.acquire_agent_lock("research")
try:
        # Load any shared research cache
        cache_file = Path("/mnt/archil/langchain/cache/research_cache.json")
        cache_file.parent.mkdir(parents=True, exist_ok=True)  # ensure the cache directory exists
if cache_file.exists():
with open(cache_file, 'r') as f:
research_cache = json.load(f)
else:
research_cache = {}
# Perform research (implementation from earlier)
result = research_agent(state)
# Update shared cache
research_cache[state['messages'][-1][:50]] = result['research_data']
with open(cache_file, 'w') as f:
json.dump(research_cache, f)
return result
finally:
coordinator.release_agent_lock(lock_fd, "research")
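Pairing acquire and release by hand is easy to get wrong; a small context manager keeps the release in a finally block automatically. A sketch:
from contextlib import contextmanager

@contextmanager
def agent_lock(coord: DistributedAgentCoordinator, agent_type: str, timeout: int = 30):
    """Hold an agent-type lock for the duration of a with-block"""
    fd = coord.acquire_agent_lock(agent_type, timeout)
    try:
        yield
    finally:
        coord.release_agent_lock(fd, agent_type)

# Usage
with agent_lock(coordinator, "research"):
    result = research_agent(state)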
Monitoring and Observability
Track agent performance and system health:
# /mnt/archil/langchain/monitoring.py
import json
import time
from datetime import datetime
from pathlib import Path
class AgentMonitor:
"""Monitor agent performance and system health"""
    def __init__(self, metrics_path="/mnt/archil/langchain/metrics"):
        self.metrics_path = Path(metrics_path)
        self.metrics_path.mkdir(parents=True, exist_ok=True)  # create on first use
def log_agent_execution(self, agent_name: str, execution_time: float,
success: bool, input_size: int, output_size: int):
"""Log agent execution metrics"""
metrics = {
"timestamp": datetime.utcnow().isoformat(),
"agent_name": agent_name,
"execution_time": execution_time,
"success": success,
"input_size": input_size,
"output_size": output_size
}
# Append to daily metrics file
date_str = datetime.utcnow().strftime("%Y-%m-%d")
metrics_file = self.metrics_path / f"agent_metrics_{date_str}.jsonl"
with open(metrics_file, 'a') as f:
f.write(json.dumps(metrics) + '\n')
    def get_agent_performance_summary(self, days: int = 7) -> dict:
        """Aggregate the last N days of JSONL metrics into a summary"""
        from datetime import timedelta
        entries = []
        for offset in range(days):
            date_str = (datetime.utcnow() - timedelta(days=offset)).strftime("%Y-%m-%d")
            metrics_file = self.metrics_path / f"agent_metrics_{date_str}.jsonl"
            if metrics_file.exists():
                with open(metrics_file) as f:
                    entries.extend(json.loads(line) for line in f if line.strip())
        if not entries:
            return {"total_executions": 0, "success_rate": 0.0,
                    "average_execution_time": 0.0, "agent_breakdown": {}}
        breakdown = {}
        for entry in entries:
            breakdown[entry["agent_name"]] = breakdown.get(entry["agent_name"], 0) + 1
        return {
            "total_executions": len(entries),
            "success_rate": sum(e["success"] for e in entries) / len(entries),
            "average_execution_time": sum(e["execution_time"] for e in entries) / len(entries),
            "agent_breakdown": breakdown,
        }
# Decorator for monitoring agent functions
def monitor_agent(agent_name: str):
def decorator(func):
def wrapper(state):
monitor = AgentMonitor()
start_time = time.time()
try:
input_size = len(str(state))
result = func(state)
output_size = len(str(result))
execution_time = time.time() - start_time
monitor.log_agent_execution(
agent_name, execution_time, True, input_size, output_size
)
return result
except Exception as e:
execution_time = time.time() - start_time
monitor.log_agent_execution(
agent_name, execution_time, False, 0, 0
)
raise
return wrapper
return decorator
# Apply monitoring to agents
@monitor_agent("research")
def monitored_research_agent(state: AgentState) -> AgentState:
return research_agent(state)
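Register the monitored wrapper in place of the raw agent when wiring the graph, then pull a summary whenever you need one:
# Hypothetical re-wiring: use the monitored wrapper as the graph node
#   workflow.add_node("research", monitored_research_agent)

monitor = AgentMonitor()
summary = monitor.get_agent_performance_summary(days=7)
print(f"Executions: {summary['total_executions']}, success rate: {summary['success_rate']:.0%}")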
Configuration Management
Store configuration in Archil for consistent settings across deployments:
# /mnt/archil/langchain/config.py
import json
from pathlib import Path
from typing import Dict, Any
class ArchiledConfig:
"""Configuration management using Archil persistent storage"""
    def __init__(self, config_path="/mnt/archil/langchain/config"):
        self.config_path = Path(config_path)
        self.config_path.mkdir(parents=True, exist_ok=True)  # create on first use
        self.config_file = self.config_path / "agent_config.json"
# Initialize with defaults if config doesn't exist
if not self.config_file.exists():
self._create_default_config()
def _create_default_config(self):
"""Create default configuration"""
default_config = {
"agents": {
"research": {
"model": "gpt-4",
"temperature": 0.3,
"max_tokens": 2000,
"timeout": 30
},
"writer": {
"model": "gpt-4",
"temperature": 0.7,
"max_tokens": 3000,
"timeout": 45
},
"reviewer": {
"model": "gpt-4",
"temperature": 0.2,
"max_tokens": 1500,
"timeout": 30
}
},
"workflow": {
"max_iterations": 3,
"checkpoint_interval": 1,
"enable_monitoring": True
},
"storage": {
"checkpointer_type": "postgres",
"memory_retention_days": 30,
"cache_size_mb": 100
}
}
with open(self.config_file, 'w') as f:
json.dump(default_config, f, indent=2)
def get_config(self, section: str = None) -> Dict[str, Any]:
"""Get configuration section or entire config"""
with open(self.config_file, 'r') as f:
config = json.load(f)
return config.get(section) if section else config
def update_config(self, section: str, updates: Dict[str, Any]):
"""Update configuration section"""
with open(self.config_file, 'r') as f:
config = json.load(f)
if section not in config:
config[section] = {}
config[section].update(updates)
with open(self.config_file, 'w') as f:
json.dump(config, f, indent=2)
# Usage
config_manager = ArchiledConfig()
agent_config = config_manager.get_config("agents")
workflow_config = config_manager.get_config("workflow")
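These values can then drive model construction, so every deployment reading the same disk builds identical agents. A sketch using the research settings:
# Build the research model from the shared configuration
from langchain_openai import ChatOpenAI

research_cfg = agent_config["research"]
research_llm = ChatOpenAI(
    model=research_cfg["model"],
    temperature=research_cfg["temperature"],
    max_tokens=research_cfg["max_tokens"],
    timeout=research_cfg["timeout"],
)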
Testing Your Setup
Validate your LangChain/LangGraph integration with Archil:
# Test the complete setup
cd /mnt/archil/langchain
source langchain-env/bin/activate
# Run a simple test
python -c "
from complete_example import run_multi_agent_task
result = run_multi_agent_task('Write a brief summary of solar energy benefits')
print('Success!' if result.get('final_content') else 'Failed!')
"
# Check persistent storage
ls -la /mnt/archil/langchain/
ls -la /mnt/archil/langchain/memory/
ls -la /mnt/archil/langchain/checkpoints/
Best Practices
- Memory Management: Regularly clean up old conversation histories and agent memories to prevent storage bloat (see the retention sketch after this list)
- Error Handling: Implement robust error handling and recovery mechanisms for distributed agent failures
- Security: Store API keys and sensitive configuration in environment variables, not in Archil storage
- Performance: Use appropriate checkpointer backends based on your performance requirements (file-based for simplicity, SQLite for structured queries and history)
- Monitoring: Implement comprehensive logging and monitoring to track agent performance and system health
- Scaling: Use Archil’s shared disk capabilities to enable horizontal scaling of your agent systems
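For the memory-management point above, a minimal retention sweep might look like the following; the 30-day window mirrors the memory_retention_days default from the configuration section:
# cleanup.py - remove conversation histories older than the retention window (sketch)
import time
from pathlib import Path

RETENTION_DAYS = 30
cutoff = time.time() - RETENTION_DAYS * 86400

conversations = Path("/mnt/archil/langchain/memory/conversations")
for history in conversations.glob("*/history.json"):
    if history.stat().st_mtime < cutoff:
        history.unlink()
        print(f"Removed stale history: {history.parent.name}")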