The Episodic Memory system automatically stores and retrieves agent interaction history, enabling agents to learn from past experiences and demonstrate graduation readiness.
cd backend
alembic upgrade head

This creates the following tables:
- `episodes` - Main episode storage
- `episode_segments` - Episode segments
- `episode_access_logs` - Audit trail
from core.models import Episode
from core.database import get_db_session
with get_db_session() as db:
episodes = db.query(Episode).count()
print(f"Episodes in database: {episodes}")

Episodes are automatically created after agent sessions complete:
from core.episode_integration import trigger_episode_creation
# Non-blocking trigger
trigger_episode_creation(
session_id="chat_123",
agent_id="agent_456",
title="Optional custom title"
)

from core.episode_retrieval_service import EpisodeRetrievalService
from core.database import get_db_session
with get_db_session() as db:
service = EpisodeRetrievalService(db)
# Get episodes from last 7 days
result = await service.retrieve_temporal(
agent_id="agent_456",
time_range="7d"
)
for episode in result["episodes"]:
print(f"{episode['title']}: {episode['started_at']}")

# Find episodes similar to a query
result = await service.retrieve_semantic(
agent_id="agent_456",
query="HST tax calculations for machinery"
)
for episode in result["episodes"]:
print(f"{episode['title']} (similarity: {episode.get('score', 0)})")

# Get complete episode with all segments
result = await service.retrieve_sequential(
episode_id="episode_123",
agent_id="agent_456"
)
print(f"Episode: {result['episode']['title']}")
print(f"Summary: {result['episode']['summary']}")
for segment in result['segments']:
print(f"\n[{segment['segment_type']}] {segment['content']}")

# Find episodes relevant to current task
result = await service.retrieve_contextual(
agent_id="agent_456",
current_task="Calculate HST for invoice #12345",
limit=5
)
for episode in result["episodes"]:
print(f"{episode['title']} (relevance: {episode.get('relevance_score', 0)})")

Episodes now include canvas presentations and user feedback for enriched agent reasoning.
When retrieving full episodes, canvas and feedback context are included by default:
# Retrieve episode with canvas and feedback context
result = await service.retrieve_sequential(
episode_id="ep_123",
agent_id="agent_456",
include_canvas=True, # Default: True
include_feedback=True # Default: True
)
# Access enriched context
print(f"Canvas presentations: {len(result['canvas_context'])}")
print(f"User feedback: {len(result['feedback_context'])}")
for canvas in result['canvas_context']:
print(f" - {canvas['canvas_type']}: {canvas['action']}")
for feedback in result['feedback_context']:
print(f" - {feedback['feedback_type']}: {feedback.get('rating', 'N/A')}")

Find episodes where you presented specific canvas types:
# Find episodes with spreadsheet presentations
result = await service.retrieve_by_canvas_type(
agent_id="agent_456",
canvas_type="sheets",
action="present",
time_range="30d"
)
for episode in result["episodes"]:
print(f"{episode['title']}: {episode['canvas_action_count']} canvas actions")

Episodes with positive feedback are automatically boosted in relevance:
# Contextual retrieval applies feedback weighting
result = await service.retrieve_contextual(
agent_id="agent_456",
current_task="Analyze sales data"
)
# Episodes with positive feedback get +0.2 boost
# Episodes with negative feedback get -0.3 penalty
for episode in result["episodes"]:
score = episode.get('relevance_score', 0)
feedback = episode.get('aggregate_feedback_score', 0)
print(f"{episode['title']}: relevance={score:.2f}, feedback={feedback:.2f}")

Agents automatically use canvas and feedback context when recalling episodes:
from core.agent_world_model import WorldModelService
service = WorldModelService()
result = await service.recall_experiences(
agent=agent,
current_task_description="Show me sales data"
)
# Episodes include canvas_context and feedback_context
for episode in result["episodes"]:
canvas_context = episode.get("canvas_context", [])
# Agent reasoning: "User liked charts, didn't like sheets"
for canvas in canvas_context:
if canvas["action"] == "close":
print(f"User didn't like {canvas['canvas_type']}")

Complete Documentation: See CANVAS_FEEDBACK_EPISODIC_MEMORY.md for full details.
from core.agent_graduation_service import AgentGraduationService
from core.database import get_db_session
with get_db_session() as db:
service = AgentGraduationService(db)
result = await service.calculate_readiness_score(
agent_id="student_agent",
target_maturity="INTERN"
)
if result["ready"]:
print(f"✓ Ready for promotion! Score: {result['score']}/100")
else:
print(f"✗ Not ready. Score: {result['score']}/100")
print(f"Gaps: {', '.join(result['gaps'])}")
print(f"Recommendation: {result['recommendation']}")

| Level | Min Episodes | Max Interventions | Min Score |
|---|---|---|---|
| INTERN | 10 | 50% | 70/100 |
| SUPERVISED | 25 | 20% | 85/100 |
| AUTONOMOUS | 50 | 0% | 95/100 |
# After readiness check passes
await service.promote_agent(
agent_id="student_agent",
new_maturity="INTERN",
validated_by="admin_user"
)

# For governance compliance
audit = await service.get_graduation_audit_trail(agent_id="agent_456")
print(f"Agent: {audit['agent_name']}")
print(f"Maturity: {audit['current_maturity']}")
print(f"Total Episodes: {audit['total_episodes']}")
print(f"Total Interventions: {audit['total_interventions']}")
print(f"Constitutional Score: {audit['avg_constitutional_score']}")

# Create episode
curl -X POST http://localhost:8000/api/episodes/create \
-H "Content-Type: application/json" \
-d '{
"session_id": "chat_123",
"agent_id": "agent_456"
}'
# Temporal retrieval
curl -X POST http://localhost:8000/api/episodes/retrieve/temporal \
-H "Content-Type: application/json" \
-d '{
"agent_id": "agent_456",
"time_range": "7d"
}'
# Semantic retrieval
curl -X POST http://localhost:8000/api/episodes/retrieve/semantic \
-H "Content-Type: application/json" \
-d '{
"agent_id": "agent_456",
"query": "tax calculations"
}'
# Get full episode
curl http://localhost:8000/api/episodes/retrieve/episode_123?agent_id=agent_456

# Check readiness
curl http://localhost:8000/api/episodes/graduation/readiness/agent_456?target_maturity=INTERN
# Run exam
curl -X POST http://localhost:8000/api/episodes/graduation/exam \
-H "Content-Type: application/json" \
-d '{
"agent_id": "agent_456",
"edge_case_episodes": ["ep_1", "ep_2", "ep_3"]
}'
# Promote agent
curl -X POST http://localhost:8000/api/episodes/graduation/promote \
-H "Content-Type: application/json" \
-d '{
"agent_id": "agent_456",
"new_maturity": "INTERN",
"validated_by": "user_789"
}'
# Get audit trail
curl http://localhost:8000/api/episodes/graduation/audit/agent_456

from core.agent_world_model import WorldModelService
world_model = WorldModelService()
# Agent automatically recalls relevant past episodes
recall = await world_model.recall_experiences(
agent=agent,
current_task_description="Calculate HST for machinery sale",
limit=5
)
# Episodes included in recall results
for episode in recall["episodes"]:
# Agent can learn from past successes/failures
if episode["human_intervention_count"] == 0:
print(f"Autonomous success: {episode['title']}")

# Monitor agent's journey from STUDENT to AUTONOMOUS
audit = await service.get_graduation_audit_trail(agent_id="agent_456")
episodes_by_maturity = audit["episodes_by_maturity"]
print(f"Student episodes: {episodes_by_maturity.get('STUDENT', 0)}")
print(f"Intern episodes: {episodes_by_maturity.get('INTERN', 0)}")
print(f"Supervised episodes: {episodes_by_maturity.get('SUPERVISED', 0)}")
print(f"Autonomous episodes: {episodes_by_maturity.get('AUTONOMOUS', 0)}")

# Generate report for hospital board
audit = await service.get_graduation_audit_trail(agent_id="medscribe_agent")
report = f"""
MedScribe Graduation Report
============================
Total Episodes: {audit['total_episodes']}
Total Interventions: {audit['total_interventions']}
Avg Constitutional Score: {audit['avg_constitutional_score']:.2f}
Current Maturity: {audit['current_maturity']}
Compliance Status: {'✓ PASS' if audit['total_interventions'] == 0 else '✗ FAIL'}
"""
print(report)

- Use Temporal Retrieval for recent episodes (< 30 days) - fastest (~10ms)
- Use Semantic Retrieval for finding similar episodes across all time
- Use Contextual Retrieval as default - balances recency and relevance
- Cache Episode IDs for frequently accessed episodes
Problem: Episodes not appearing after agent sessions
Solution:
- Check if `agent_id` is provided in chat request
- Verify LanceDB is accessible: `python -c "from core.lancedb_handler import get_lancedb_handler; print(get_lancedb_handler().test_connection())"`
- Check session has sufficient content (min 2 messages or 1 execution)
Problem: Retrieval returns "governance_check": {"allowed": False}
Solution:
- Verify agent maturity level
- Check action complexity matches maturity (see Governance Requirements)
- Review `governance_check["reason"]` for specific failure reason
Problem: Semantic retrieval > 100ms
Solution:
- Verify LanceDB connection to S3/local storage
- Check embedding provider is configured
- Consider using temporal retrieval for recent episodes
- Read Full Implementation Guide
- Review Agent Graduation Guide
- Check API Documentation
- Run tests: `pytest tests/test_episode_*.py -v`