Build multi-agent AI systems with LangChain and LangGraph, using Archil for persistent memory and a stateless agent architecture.
# Mount your Archil disk with shared access
sudo mkdir -p /mnt/archil
sudo archil mount dsk-DISKID /mnt/archil --region aws-us-east-1 --auth-token TOKEN --shared
# Create directories for agent data
sudo mkdir -p /mnt/archil/langchain/{checkpoints,memory,agents,models}
sudo chown -R $USER:$USER /mnt/archil/langchain
The --shared flag enables multiple servers to access the same agent memory simultaneously, which is crucial for distributed multi-agent systems.
cd /mnt/archil/langchain
python -m venv langchain-env
source langchain-env/bin/activate
# Core LangChain and LangGraph packages
pip install langchain langgraph langchain-openai
# File-based JSON checkpointing (no database required)
# No additional database drivers needed - state is stored as files on the Archil disk
# Additional utilities
pip install python-dotenv pydantic
# Set your OpenAI API key
export OPENAI_API_KEY="your-api-key-here"
# /mnt/archil/langchain/multi_agent_system.py
import os
from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END
from datetime import datetime
# Define the shared state structure
class AgentState(TypedDict):
messages: List[str]
research_data: str
draft_content: str
final_content: str
feedback: str
    iteration: int
    next: str  # which agent acts next (used by the supervisor pattern below)
# Initialize the language model
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
def research_agent(state: AgentState) -> AgentState:
"""Research agent gathers information on the topic"""
prompt = f"""You are a research agent. Based on the user's request: {state['messages'][-1]}
Conduct thorough research and provide key findings, statistics, and relevant information.
Focus on accuracy and comprehensiveness."""
response = llm.invoke([SystemMessage(content=prompt)])
return {
**state,
"research_data": response.content,
"messages": state["messages"] + [f"Research completed: {response.content[:100]}..."]
}
def writer_agent(state: AgentState) -> AgentState:
"""Writer agent creates content based on research"""
prompt = f"""You are a writing agent. Using this research data:
{state['research_data']}
Create well-structured, engaging content that addresses the original request:
{state['messages'][0]}
Make it informative and well-organized."""
response = llm.invoke([SystemMessage(content=prompt)])
return {
**state,
"draft_content": response.content,
"messages": state["messages"] + [f"Draft completed: {len(response.content)} characters"]
}
def reviewer_agent(state: AgentState) -> AgentState:
"""Reviewer agent evaluates and provides feedback"""
prompt = f"""You are a review agent. Evaluate this content:
{state['draft_content']}
Original request: {state['messages'][0]}
Research data: {state['research_data'][:200]}...
Provide specific feedback on:
1. Accuracy and completeness
2. Structure and clarity
3. Suggestions for improvement
If the content is good enough, say "APPROVED". Otherwise, provide detailed feedback."""
response = llm.invoke([SystemMessage(content=prompt)])
# Determine if we need another iteration
is_approved = "APPROVED" in response.content.upper()
return {
**state,
"feedback": response.content,
"final_content": state["draft_content"] if is_approved else "",
"iteration": state["iteration"] + 1,
"messages": state["messages"] + [f"Review completed: {'Approved' if is_approved else 'Needs revision'}"]
}
def should_continue(state: AgentState) -> str:
"""Determine the next step in the workflow"""
if state["final_content"]:
return "end"
elif state["iteration"] >= 3: # Max 3 iterations
return "end"
else:
return "revise"
# Create the workflow graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("research", research_agent)
workflow.add_node("write", writer_agent)
workflow.add_node("review", reviewer_agent)
# Define the flow
workflow.set_entry_point("research")
workflow.add_edge("research", "write")
workflow.add_edge("write", "review")
workflow.add_conditional_edges(
"review",
should_continue,
{
"revise": "write",
"end": END
}
)
# Compile the workflow
app = workflow.compile()
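With the graph compiled, a single invocation runs the full research → write → review loop. A minimal sketch (the topic string is just an example; every AgentState key must be present in the initial state):
# Run the sequential workflow once; the review loop iterates internally (max 3 passes)
result = app.invoke({
    "messages": ["Summarize the benefits of battery storage for solar homes"],
    "research_data": "",
    "draft_content": "",
    "final_content": "",
    "feedback": "",
    "iteration": 0,
    "next": ""
})
print(result["final_content"] or result["draft_content"])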
# /mnt/archil/langchain/supervisor_pattern.py
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# Reuse the state and agents defined in multi_agent_system.py
from multi_agent_system import AgentState, research_agent, writer_agent, reviewer_agent
# Define available agents
members = ["Researcher", "Writer", "Reviewer"]
system_prompt = (
"You are a supervisor tasked with managing a conversation between the"
" following workers: {members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH."
)
options = ["FINISH"] + members
# Create supervisor prompt
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
MessagesPlaceholder(variable_name="messages"),
("system", "Given the conversation above, who should act next? Or should we FINISH? Select one of: {options}"),
]).partial(options=str(options), members=", ".join(members))
llm = ChatOpenAI(model="gpt-4")
def supervisor_agent(state):
"""Supervisor decides which agent should act next"""
supervisor_chain = prompt | llm
response = supervisor_chain.invoke(state)
return {"next": response.content}
# Create supervisor workflow
supervisor_graph = StateGraph(AgentState)
supervisor_graph.add_node("supervisor", supervisor_agent)
supervisor_graph.add_node("research", research_agent)
supervisor_graph.add_node("write", writer_agent)
supervisor_graph.add_node("review", reviewer_agent)
# Define conditional routing
def route_to_agent(state) -> Literal["research", "write", "review", "__end__"]:
"""Route to the appropriate agent based on supervisor decision"""
next_agent = state.get("next", "").lower()
if next_agent == "finish":
return "__end__"
elif "researcher" in next_agent:
return "research"
elif "writer" in next_agent:
return "write"
elif "reviewer" in next_agent:
return "review"
else:
return "__end__"
supervisor_graph.set_entry_point("supervisor")
supervisor_graph.add_conditional_edges("supervisor", route_to_agent)
# Add edges back to supervisor after each agent
for member in ["research", "write", "review"]:
supervisor_graph.add_edge(member, "supervisor")
# Compile the supervisor workflow
supervisor_app = supervisor_graph.compile()
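The supervisor routes work until it answers FINISH, so it pays to bound the number of graph steps. A minimal sketch using LangGraph's recursion_limit config value:
# Cap total supervisor/agent steps so a confused supervisor can't loop forever
result = supervisor_app.invoke(
    {
        "messages": ["Draft a short explainer on offshore wind"],
        "research_data": "", "draft_content": "", "final_content": "",
        "feedback": "", "iteration": 0, "next": ""
    },
    config={"recursion_limit": 25}
)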
# /mnt/archil/langchain/file_checkpointer.py
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
class FileCheckpointer:
"""Simple file-based checkpointer using Archil persistent storage"""
    def __init__(self, checkpoint_dir="/mnt/archil/langchain/checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        # Ensure the checkpoint directory exists on the shared disk
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
def save_checkpoint(self, thread_id: str, state: Dict[str, Any]):
"""Save agent state to a JSON file"""
checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
checkpoint_data = {
"state": state,
"timestamp": datetime.utcnow().isoformat(),
"thread_id": thread_id
}
# Atomic write using temporary file
temp_file = checkpoint_file.with_suffix('.tmp')
with open(temp_file, 'w') as f:
json.dump(checkpoint_data, f, indent=2)
# Atomic rename
temp_file.rename(checkpoint_file)
def load_checkpoint(self, thread_id: str) -> Optional[Dict[str, Any]]:
"""Load agent state from JSON file"""
checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
if checkpoint_file.exists():
try:
with open(checkpoint_file, 'r') as f:
data = json.load(f)
return data.get("state")
except (json.JSONDecodeError, IOError):
return None
return None
def list_checkpoints(self) -> list:
"""List all available checkpoints"""
return [f.stem for f in self.checkpoint_dir.glob("*.json")]
def delete_checkpoint(self, thread_id: str):
"""Delete a checkpoint"""
checkpoint_file = self.checkpoint_dir / f"{thread_id}.json"
checkpoint_file.unlink(missing_ok=True)
# Usage with LangGraph
file_checkpointer = FileCheckpointer()
# Custom workflow compilation with file checkpointer
class FileCheckpointWorkflow:
def __init__(self, workflow, checkpointer):
self.workflow = workflow
self.checkpointer = checkpointer
def invoke(self, initial_state: dict, config: dict):
thread_id = config.get("configurable", {}).get("thread_id")
if thread_id:
# Load existing state if available
saved_state = self.checkpointer.load_checkpoint(thread_id)
if saved_state:
# Merge with initial state
initial_state.update(saved_state)
# Run the workflow
result = self.workflow.invoke(initial_state)
if thread_id:
# Save the final state
self.checkpointer.save_checkpoint(thread_id, result)
return result
# Use compiled workflow with file checkpointer
checkpoint_app = FileCheckpointWorkflow(app, file_checkpointer)
# Run with persistent state
config = {"configurable": {"thread_id": "agent_session_1"}}
result = checkpoint_app.invoke({
"messages": ["Write a comprehensive guide about renewable energy"],
"research_data": "",
"draft_content": "",
"final_content": "",
"feedback": "",
"iteration": 0
}, config)
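After a run, the session state is a plain JSON file on the Archil disk, so any server that mounts the disk can inspect or resume it. A short sketch (recall that FileCheckpointWorkflow merges the saved state over the initial state you pass in):
# Inspect persisted sessions from any server sharing the mount
print(file_checkpointer.list_checkpoints())  # e.g. ['agent_session_1']
saved = file_checkpointer.load_checkpoint("agent_session_1")
if saved:
    print(f"Resuming at iteration {saved['iteration']}")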
# /mnt/archil/langchain/agent_memory.py
import json
import os
from datetime import datetime
from typing import Dict, List, Any
from pathlib import Path
class ArchiledAgentMemory:
"""Persistent memory system for stateless agents using Archil storage"""
    def __init__(self, base_path="/mnt/archil/langchain/memory"):
        self.base_path = Path(base_path)
        # Ensure the memory root exists on the shared disk
        self.base_path.mkdir(parents=True, exist_ok=True)
def save_agent_memory(self, agent_id: str, memory_data: Dict[str, Any]):
"""Save agent memory to persistent storage"""
        agent_dir = self.base_path / agent_id
        agent_dir.mkdir(parents=True, exist_ok=True)
        memory_file = agent_dir / "memory.json"
# Load existing memory and merge
existing_memory = self.load_agent_memory(agent_id)
existing_memory.update(memory_data)
existing_memory["last_updated"] = datetime.utcnow().isoformat()
with open(memory_file, 'w') as f:
json.dump(existing_memory, f, indent=2)
def load_agent_memory(self, agent_id: str) -> Dict[str, Any]:
"""Load agent memory from persistent storage"""
memory_file = self.base_path / agent_id / "memory.json"
if memory_file.exists():
with open(memory_file, 'r') as f:
return json.load(f)
return {}
def save_conversation_history(self, session_id: str, messages: List[Dict]):
"""Save conversation history"""
        session_dir = self.base_path / "conversations" / session_id
        session_dir.mkdir(parents=True, exist_ok=True)
        history_file = session_dir / "history.json"
with open(history_file, 'w') as f:
json.dump({
"messages": messages,
"created_at": datetime.utcnow().isoformat()
}, f, indent=2)
def load_conversation_history(self, session_id: str) -> List[Dict]:
"""Load conversation history"""
history_file = self.base_path / "conversations" / session_id / "history.json"
if history_file.exists():
with open(history_file, 'r') as f:
data = json.load(f)
return data.get("messages", [])
return []
# Usage example
memory_manager = ArchiledAgentMemory()
# Save agent-specific memory
memory_manager.save_agent_memory("research_agent_1", {
"expertise_areas": ["renewable_energy", "climate_science"],
"recent_queries": ["solar panel efficiency", "wind turbine technology"],
"learned_facts": {
"solar_efficiency_2024": "Modern panels achieve 22-26% efficiency",
"wind_capacity_growth": "Global capacity grew 77% in 2023"
}
})
# Load memory for stateless agent
agent_memory = memory_manager.load_agent_memory("research_agent_1")
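A stateless agent can fold this persisted memory back into its prompt on every run. A hypothetical sketch (the prompt wording is illustrative, not part of the agents defined above):
# Hypothetical: prime a research prompt with facts learned in earlier sessions
known_facts = json.dumps(agent_memory.get("learned_facts", {}), indent=2)
primed_prompt = (
    f"You are a research agent. Facts learned in previous sessions:\n{known_facts}\n\n"
    "Use them where relevant, and verify anything that may be stale."
)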
# /mnt/archil/langchain/complete_example.py
from datetime import datetime
from dotenv import load_dotenv
# Reuse the pieces defined in the previous files
from supervisor_pattern import supervisor_app
from file_checkpointer import FileCheckpointer, FileCheckpointWorkflow
from agent_memory import ArchiledAgentMemory
# Load environment variables (OPENAI_API_KEY)
load_dotenv()
# Initialize components
memory_manager = ArchiledAgentMemory()
file_checkpointer = FileCheckpointer()
# Compile the complete workflow
complete_app = FileCheckpointWorkflow(supervisor_app, file_checkpointer)
def run_multi_agent_task(user_request: str, session_id: str = None):
"""Run a complete multi-agent task with persistent memory"""
if not session_id:
session_id = f"session_{int(datetime.utcnow().timestamp())}"
# Load conversation history
conversation_history = memory_manager.load_conversation_history(session_id)
# Initial state
initial_state = {
"messages": [user_request],
"research_data": "",
"draft_content": "",
"final_content": "",
"feedback": "",
"iteration": 0,
"next": ""
}
# Configuration for persistent checkpoints
config = {"configurable": {"thread_id": session_id}}
try:
# Run the workflow
result = complete_app.invoke(initial_state, config)
# Save conversation history
conversation_history.append({
"user_request": user_request,
"result": result,
"timestamp": datetime.utcnow().isoformat()
})
memory_manager.save_conversation_history(session_id, conversation_history)
# Update agent memories based on the interaction
memory_manager.save_agent_memory("research_agent", {
"last_research_topic": user_request,
"research_quality_score": len(result.get("research_data", "")) / 100
})
return result
except Exception as e:
print(f"Error in multi-agent task: {e}")
return {"error": str(e)}
# Example usage
if __name__ == "__main__":
result = run_multi_agent_task(
"Create a comprehensive analysis of renewable energy trends in 2024",
session_id="renewable_energy_analysis_1"
)
print("Final Content:")
print(result.get("final_content", "No content generated"))
# /mnt/archil/langchain/distributed_agents.py
import json
import os
import time
from pathlib import Path
# Reuse the research agent and state from multi_agent_system.py
from multi_agent_system import AgentState, research_agent
class DistributedAgentCoordinator:
"""Coordinate agents across multiple servers using file-based locking"""
    def __init__(self, coordination_path="/mnt/archil/langchain/coordination"):
        self.coordination_path = Path(coordination_path)
        # Lock files live on the shared disk so every server sees them
        self.coordination_path.mkdir(parents=True, exist_ok=True)
def acquire_agent_lock(self, agent_type: str, timeout: int = 30):
"""Acquire exclusive lock for an agent type"""
lock_file = self.coordination_path / f"{agent_type}.lock"
start_time = time.time()
while time.time() - start_time < timeout:
try:
fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR)
return fd
except FileExistsError:
time.sleep(0.1)
raise TimeoutError(f"Could not acquire lock for {agent_type}")
def release_agent_lock(self, fd: int, agent_type: str):
"""Release agent lock"""
os.close(fd)
lock_file = self.coordination_path / f"{agent_type}.lock"
lock_file.unlink(missing_ok=True)
# Usage in distributed setup
coordinator = DistributedAgentCoordinator()
def distributed_research_agent(state: AgentState) -> AgentState:
"""Research agent with distributed coordination"""
lock_fd = coordinator.acquire_agent_lock("research")
try:
# Load any shared research cache
        cache_file = Path("/mnt/archil/langchain/cache/research_cache.json")
        cache_file.parent.mkdir(parents=True, exist_ok=True)
if cache_file.exists():
with open(cache_file, 'r') as f:
research_cache = json.load(f)
else:
research_cache = {}
# Perform research (implementation from earlier)
result = research_agent(state)
# Update shared cache
research_cache[state['messages'][-1][:50]] = result['research_data']
with open(cache_file, 'w') as f:
json.dump(research_cache, f)
return result
finally:
coordinator.release_agent_lock(lock_fd, "research")
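One caveat with O_CREAT | O_EXCL lock files: if a process crashes while holding a lock, the file lingers and blocks every other server. A common mitigation is to break locks older than a TTL; a hedged sketch (break_stale_lock and the 300-second threshold are assumptions, not part of the coordinator above):
def break_stale_lock(agent_type: str, max_age_seconds: int = 300):
    """Remove a lock file left behind by a crashed process (assumed TTL)"""
    lock_file = coordinator.coordination_path / f"{agent_type}.lock"
    if lock_file.exists() and time.time() - lock_file.stat().st_mtime > max_age_seconds:
        lock_file.unlink(missing_ok=True)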
# /mnt/archil/langchain/monitoring.py
import json
import time
from datetime import datetime, timedelta
from pathlib import Path
class AgentMonitor:
"""Monitor agent performance and system health"""
    def __init__(self, metrics_path="/mnt/archil/langchain/metrics"):
        self.metrics_path = Path(metrics_path)
        self.metrics_path.mkdir(parents=True, exist_ok=True)
def log_agent_execution(self, agent_name: str, execution_time: float,
success: bool, input_size: int, output_size: int):
"""Log agent execution metrics"""
metrics = {
"timestamp": datetime.utcnow().isoformat(),
"agent_name": agent_name,
"execution_time": execution_time,
"success": success,
"input_size": input_size,
"output_size": output_size
}
# Append to daily metrics file
date_str = datetime.utcnow().strftime("%Y-%m-%d")
metrics_file = self.metrics_path / f"agent_metrics_{date_str}.jsonl"
with open(metrics_file, 'a') as f:
f.write(json.dumps(metrics) + '\n')
    def get_agent_performance_summary(self, days: int = 7) -> dict:
        """Aggregate execution metrics from the last N days of JSONL files"""
        records = []
        for offset in range(days):
            date_str = (datetime.utcnow() - timedelta(days=offset)).strftime("%Y-%m-%d")
            metrics_file = self.metrics_path / f"agent_metrics_{date_str}.jsonl"
            if metrics_file.exists():
                with open(metrics_file, 'r') as f:
                    records.extend(json.loads(line) for line in f if line.strip())
        if not records:
            return {"total_executions": 0, "success_rate": 0.0,
                    "average_execution_time": 0.0, "agent_breakdown": {}}
        breakdown = {}
        for record in records:
            breakdown[record["agent_name"]] = breakdown.get(record["agent_name"], 0) + 1
        return {
            "total_executions": len(records),
            "success_rate": sum(r["success"] for r in records) / len(records),
            "average_execution_time": sum(r["execution_time"] for r in records) / len(records),
            "agent_breakdown": breakdown
        }
# Decorator for monitoring agent functions
def monitor_agent(agent_name: str):
def decorator(func):
def wrapper(state):
monitor = AgentMonitor()
start_time = time.time()
try:
input_size = len(str(state))
result = func(state)
output_size = len(str(result))
execution_time = time.time() - start_time
monitor.log_agent_execution(
agent_name, execution_time, True, input_size, output_size
)
return result
except Exception as e:
execution_time = time.time() - start_time
monitor.log_agent_execution(
agent_name, execution_time, False, 0, 0
)
raise
return wrapper
return decorator
# Apply monitoring to agents
@monitor_agent("research")
def monitored_research_agent(state: AgentState) -> AgentState:
return research_agent(state)
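Because metrics accumulate as JSONL on the shared disk, any server can pull an aggregate view:
# Pull a 7-day summary aggregated from the shared metrics files
summary = AgentMonitor().get_agent_performance_summary(days=7)
print(f"{summary['total_executions']} runs, "
      f"{summary['success_rate']:.0%} success, "
      f"{summary['average_execution_time']:.2f}s avg")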
# /mnt/archil/langchain/config.py
import json
from pathlib import Path
from typing import Dict, Any
class ArchiledConfig:
"""Configuration management using Archil persistent storage"""
    def __init__(self, config_path="/mnt/archil/langchain/config"):
        self.config_path = Path(config_path)
        self.config_path.mkdir(parents=True, exist_ok=True)
        self.config_file = self.config_path / "agent_config.json"
        # Initialize with defaults if config doesn't exist
        if not self.config_file.exists():
            self._create_default_config()
def _create_default_config(self):
"""Create default configuration"""
default_config = {
"agents": {
"research": {
"model": "gpt-4",
"temperature": 0.3,
"max_tokens": 2000,
"timeout": 30
},
"writer": {
"model": "gpt-4",
"temperature": 0.7,
"max_tokens": 3000,
"timeout": 45
},
"reviewer": {
"model": "gpt-4",
"temperature": 0.2,
"max_tokens": 1500,
"timeout": 30
}
},
"workflow": {
"max_iterations": 3,
"checkpoint_interval": 1,
"enable_monitoring": True
},
"storage": {
"checkpointer_type": "postgres",
"memory_retention_days": 30,
"cache_size_mb": 100
}
}
with open(self.config_file, 'w') as f:
json.dump(default_config, f, indent=2)
def get_config(self, section: str = None) -> Dict[str, Any]:
"""Get configuration section or entire config"""
with open(self.config_file, 'r') as f:
config = json.load(f)
return config.get(section) if section else config
def update_config(self, section: str, updates: Dict[str, Any]):
"""Update configuration section"""
with open(self.config_file, 'r') as f:
config = json.load(f)
if section not in config:
config[section] = {}
config[section].update(updates)
with open(self.config_file, 'w') as f:
json.dump(config, f, indent=2)
# Usage
config_manager = ArchiledConfig()
agent_config = config_manager.get_config("agents")
workflow_config = config_manager.get_config("workflow")
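The stored settings can drive model construction directly. A hypothetical helper (llm_for is not a LangChain API; it simply maps the config keys above onto ChatOpenAI parameters):
from langchain_openai import ChatOpenAI

def llm_for(agent_name: str) -> ChatOpenAI:
    """Build a ChatOpenAI client from the persisted per-agent settings"""
    settings = agent_config[agent_name]
    return ChatOpenAI(
        model=settings["model"],
        temperature=settings["temperature"],
        max_tokens=settings["max_tokens"],
        timeout=settings["timeout"],
    )

research_llm = llm_for("research")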
# Test the complete setup
cd /mnt/archil/langchain
source langchain-env/bin/activate
# Run a simple test
python -c "
from complete_example import run_multi_agent_task
result = run_multi_agent_task('Write a brief summary of solar energy benefits')
print('Success!' if result.get('final_content') else 'Failed!')
"
# Check persistent storage
ls -la /mnt/archil/langchain/
ls -la /mnt/archil/langchain/memory/
ls -la /mnt/archil/langchain/checkpoints/