Letta agents require persistent memory to maintain context across sessions and learn from interactions. While Letta handles in-context memory blocks automatically, integrating Typedef's catalog system provides a production-grade storage layer with SQL queryability, DataFrame operations, and version-controlled conversation history.
This guide demonstrates how to build a durable conversation store for Letta agents using Typedef's Fenic framework, enabling queryable conversation history, semantic search capabilities, and reliable state management.
Architecture Overview
The integration combines three core components:
Letta Agent Layer - Manages stateful agents with memory blocks, tool execution, and conversation orchestration. Letta provides automatic persistence for agent state but benefits from external storage for conversation analytics and cross-session queries.
Typedef Catalog Layer - Provides persistent storage with full SQL support, schema management, and table versioning. The catalog stores conversation history, entity extractions, and interaction metadata in structured tables.
MCP Integration Layer - Exposes catalog operations as tools that Letta agents can call directly. This enables agents to query their own conversation history, search past interactions, and retrieve context from previous sessions.
Setting Up the Storage Infrastructure
Install Fenic and configure a session with persistent local storage.
```bash
pip install --upgrade fenic
```
Create a Python file for your storage configuration:
```python
from pathlib import Path

import fenic.api.functions as fc
from fenic.api.session.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel,
)

config = SessionConfig(
    app_name="letta_conversation_store",
    db_path=Path("./letta_conversations.db"),
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "openai": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        default_embedding_model="openai"
    )
)

session = Session.get_or_create(config)
session.catalog.create_database("letta_memory")
session.catalog.set_current_database("letta_memory")
```
The db_path parameter ensures all table data persists locally across sessions. Each save operation writes to this database, creating a versioned history of all conversations.
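Because all table data lives in this single database file, a later process can reopen the same store and pick up where it left off. A minimal check, reusing the config defined above:

```python
# Reopen the store in a new process; tables created earlier are still available
session = Session.get_or_create(config)
session.catalog.set_current_database("letta_memory")
print(session.catalog.list_tables())
```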
Designing the Conversation Schema
Define tables to capture different aspects of Letta agent interactions.
Conversation Messages Table
Store the complete message history with timestamps and metadata.
```python
from datetime import datetime

from fenic.core.types import StringType, IntegerType, TimestampType
from fenic.core.types.schema import Schema, ColumnField

conversation_schema = Schema([
    ColumnField("message_id", StringType),
    ColumnField("agent_id", StringType),
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("tool_calls", StringType),
    ColumnField("memory_state", StringType)
])

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Complete message history for all Letta agents with tool calls and memory states"
)
```
Memory Block Snapshots Table
Track changes to Letta's editable memory blocks over time.
```python
memory_snapshot_schema = Schema([
    ColumnField("snapshot_id", StringType),
    ColumnField("agent_id", StringType),
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("block_label", StringType),
    ColumnField("block_value", StringType),
    ColumnField("block_limit", IntegerType)
])

session.catalog.create_table(
    "memory_snapshots",
    memory_snapshot_schema,
    description="Versioned snapshots of Letta agent memory blocks showing evolution over time"
)
```
Extracted Entities Table
Store entities and facts extracted from conversations for semantic queries.
```python
entity_schema = Schema([
    ColumnField("entity_id", StringType),
    ColumnField("agent_id", StringType),
    ColumnField("conversation_id", StringType),
    ColumnField("entity_type", StringType),
    ColumnField("entity_name", StringType),
    ColumnField("entity_value", StringType),
    ColumnField("first_mentioned", TimestampType),
    ColumnField("last_updated", TimestampType)
])

session.catalog.create_table(
    "entities",
    entity_schema,
    description="Named entities and facts extracted from agent conversations"
)
```
Writing Conversation Data
Create functions to persist Letta agent interactions to the catalog.
Storing Message Turns
```python
import json

def store_message(
    message_id: str,
    agent_id: str,
    conversation_id: str,
    role: str,
    content: str,
    tool_calls: list = None,
    memory_state: dict = None
):
    """Store a single message turn in the catalog."""
    df = session.create_dataframe({
        "message_id": [message_id],
        "agent_id": [agent_id],
        "conversation_id": [conversation_id],
        "timestamp": [datetime.now()],
        "role": [role],
        "content": [content],
        "tool_calls": [json.dumps(tool_calls or [])],
        "memory_state": [json.dumps(memory_state or {})]
    })
    df.write.save_as_table("conversations", mode="append")
```
Versioning Memory Blocks
Capture memory block changes after each agent interaction.
```python
def snapshot_memory_blocks(
    agent_id: str,
    conversation_id: str,
    memory_blocks: list
):
    """Create versioned snapshots of all memory blocks."""
    snapshots = []
    for block in memory_blocks:
        snapshots.append({
            # Include the block label so each block gets a distinct snapshot ID
            "snapshot_id": f"{agent_id}_{block.get('label')}_{datetime.now().isoformat()}",
            "agent_id": agent_id,
            "conversation_id": conversation_id,
            "timestamp": datetime.now(),
            "block_label": block.get("label"),
            "block_value": block.get("value"),
            "block_limit": block.get("limit", 2000)
        })
    df = session.create_dataframe(snapshots)
    df.write.save_as_table("memory_snapshots", mode="append")
```
Batch Write Pattern
For high-throughput scenarios, batch multiple messages before writing.
```python
def store_conversation_batch(messages: list):
    """Store multiple messages in a single write operation."""
    df = session.create_dataframe(messages)
    df.write.save_as_table("conversations", mode="append")
```
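For example, a batch of message dictionaries matching the conversations schema can be accumulated and written in one call (the message contents below are illustrative):

```python
batch = [
    {
        "message_id": f"msg_{i:04d}",
        "agent_id": "agent_123",
        "conversation_id": "session_001",
        "timestamp": datetime.now(),
        "role": "user" if i % 2 == 0 else "assistant",
        "content": f"Example message {i}",
        "tool_calls": json.dumps([]),
        "memory_state": json.dumps({})
    }
    for i in range(100)
]

store_conversation_batch(batch)
```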
Creating Query Tools for Letta Agents
Register catalog-backed tools that Letta can call through MCP to search its own history.
Recent Conversation Retrieval
```python
from fenic.core.mcp.types import ToolParam

recent_messages = (
    session.table("conversations")
    .filter(
        (fc.col("agent_id") == fc.tool_param("agent_id", StringType))
        & (fc.col("conversation_id") == fc.tool_param("conversation_id", StringType))
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="get_recent_messages",
    tool_description="Retrieve the most recent messages for a specific agent and conversation",
    tool_query=recent_messages,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="agent_id",
            description="The agent ID"
        ),
        ToolParam(
            name="conversation_id",
            description="The conversation ID"
        ),
        ToolParam(
            name="limit",
            description="Maximum number of messages to return",
            has_default=True,
            default_value=20
        )
    ]
)
```
Memory Block History Query
Track how agent memory evolved over time.
```python
memory_evolution = (
    session.table("memory_snapshots")
    .filter(
        (fc.col("agent_id") == fc.tool_param("agent_id", StringType))
        & (fc.col("block_label") == fc.tool_param("block_label", StringType))
    )
    .sort(fc.col("timestamp").asc())
)

session.catalog.create_tool(
    tool_name="get_memory_history",
    tool_description="Retrieves the evolution of a specific memory block over time",
    tool_query=memory_evolution,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="agent_id",
            description="The agent ID"
        ),
        ToolParam(
            name="block_label",
            description="Memory block label (e.g., 'human', 'persona')"
        )
    ]
)
```
Cross-Conversation Search
Search for specific terms across all conversations for an agent.
```python
conversation_search = (
    session.table("conversations")
    .filter(
        (fc.col("agent_id") == fc.tool_param("agent_id", StringType))
        & fc.col("content").contains(fc.tool_param("search_term", StringType))
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_conversations",
    tool_description="Search for specific terms across all agent conversations",
    tool_query=conversation_search,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="agent_id",
            description="The agent ID"
        ),
        ToolParam(
            name="search_term",
            description="Term to search for in conversation content"
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            has_default=True,
            default_value=10
        )
    ]
)
```
Semantic Search with Embeddings
Enable semantic similarity search across conversation history.
Adding Embeddings to Conversations
```python
conversations_df = session.table("conversations")

conversations_with_embeddings = conversations_df.with_column(
    "content_embedding",
    fc.semantic.embed(fc.col("content"))
)

conversations_with_embeddings.write.save_as_table(
    "conversations_embedded",
    mode="overwrite"
)

session.catalog.set_table_description(
    "conversations_embedded",
    "Conversation messages with semantic embeddings for similarity search"
)
```
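The embedded table can then back a simple similarity search. The sketch below is one way to do it without depending on a catalog-side similarity function: it embeds the query with the same model, pulls the agent's messages into Python with to_pydict, and ranks them by cosine similarity using numpy. This is a client-side approach suited to modest data volumes; the helper name and the assumption that the embedding column converts cleanly to arrays are ours, not part of Fenic.

```python
import numpy as np

def semantic_search(query: str, agent_id: str, top_k: int = 5):
    """Rank an agent's stored messages by cosine similarity to a query string."""
    # Embed the query with the same embedding model used for stored messages
    query_df = session.create_dataframe({"content": [query]}).with_column(
        "content_embedding", fc.semantic.embed(fc.col("content"))
    )
    query_vec = np.array(query_df.to_pydict()["content_embedding"][0], dtype=float)

    # Pull this agent's messages and embeddings into Python
    rows = (
        session.table("conversations_embedded")
        .filter(fc.col("agent_id") == agent_id)
        .select("message_id", "content", "content_embedding")
        .to_pydict()
    )
    embeddings = np.array(rows["content_embedding"], dtype=float)

    # Cosine similarity between the query vector and every stored message
    scores = embeddings @ query_vec / (
        np.linalg.norm(embeddings, axis=1) * np.linalg.norm(query_vec)
    )

    ranked = sorted(
        zip(rows["message_id"], rows["content"], scores),
        key=lambda item: item[2],
        reverse=True,
    )
    return ranked[:top_k]

results = semantic_search("project deadlines", agent_id="agent_123")
```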
Clustering Conversations by Topic
Group related conversations using semantic clustering.
```python
clustered_conversations = (
    conversations_with_embeddings
    .semantic.with_cluster_labels(
        by=fc.col("content_embedding"),
        num_clusters=10,
        label_column="topic_cluster"
    )
)

clustered_conversations.write.save_as_table(
    "conversation_topics",
    mode="overwrite"
)
```
Topic Distribution Analysis
Analyze conversation patterns across agents.
```python
topic_distribution = session.sql("""
    SELECT
        agent_id,
        topic_cluster,
        COUNT(*) as message_count,
        MIN(timestamp) as first_message,
        MAX(timestamp) as last_message
    FROM {conversations}
    GROUP BY agent_id, topic_cluster
    ORDER BY agent_id, message_count DESC
""", conversations=session.table("conversation_topics"))

topic_distribution.show()
```
Running the MCP Server
Expose catalog tools to Letta agents through an MCP server.
Development Server
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "Letta Conversation Store",
    user_defined_tools=tools,
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="letta",
        max_result_rows=100
    ),
    concurrency_limit=10
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production Deployment
```python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server,
    stateless_http=False,
    path="/mcp"
)
```
Save the ASGI app above as server.py, then deploy with uvicorn:
```bash
uvicorn server:app --host 0.0.0.0 --port 8000 --workers 4
```
Integrating with Letta Agents
Configure Letta to use the conversation store tools through MCP.
Creating Tools in Letta
```python
from letta import create_client

client = create_client()

def get_conversation_history(agent_id: str, conversation_id: str, limit: int = 20):
    """Retrieve conversation history from the Typedef catalog."""
    import requests

    response = requests.post(
        "http://localhost:8000/mcp",
        json={
            "tool": "get_recent_messages",
            "params": {
                "agent_id": agent_id,
                "conversation_id": conversation_id,
                "limit": limit
            }
        }
    )
    return response.json()

def search_past_conversations(agent_id: str, search_term: str, limit: int = 10):
    """Search for specific terms in past conversations."""
    import requests

    response = requests.post(
        "http://localhost:8000/mcp",
        json={
            "tool": "search_conversations",
            "params": {
                "agent_id": agent_id,
                "search_term": search_term,
                "limit": limit
            }
        }
    )
    return response.json()
```
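These functions still need to be registered with Letta before an agent can call them. The exact registration API depends on your Letta SDK version; the sketch below assumes the legacy client's tool-creation helper and should be treated as a starting point, not a definitive API reference.

```python
# Assumption: the legacy Letta client exposes a create_tool-style helper that
# wraps a plain Python function. Check your SDK version for the current method.
history_tool = client.create_tool(get_conversation_history)
search_tool = client.create_tool(search_past_conversations)

# The returned tool objects can then be attached to an agent when it is created.
```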
Wrapping Storage Operations
Create a wrapper that automatically persists Letta conversations.
```python
class LettaConversationStore:
    def __init__(self, session, agent_id):
        self.session = session
        self.agent_id = agent_id

    def log_message(self, conversation_id, role, content, tool_calls=None):
        """Log a message to the catalog."""
        message_id = f"{self.agent_id}_{datetime.now().isoformat()}"
        store_message(
            message_id=message_id,
            agent_id=self.agent_id,
            conversation_id=conversation_id,
            role=role,
            content=content,
            tool_calls=tool_calls
        )

    def log_memory_state(self, conversation_id, memory_blocks):
        """Log current memory block state."""
        snapshot_memory_blocks(
            agent_id=self.agent_id,
            conversation_id=conversation_id,
            memory_blocks=memory_blocks
        )

    def get_conversation_context(self, conversation_id, limit=20):
        """Retrieve recent conversation context."""
        df = (
            self.session.table("conversations")
            .filter(
                (fc.col("agent_id") == self.agent_id)
                & (fc.col("conversation_id") == conversation_id)
            )
            .sort(fc.col("timestamp").desc())
            .limit(limit)
        )
        return df.to_pydict()
```
Usage with Letta Agents
```python
from letta import LettaClient

letta_client = LettaClient()

agent_state = letta_client.agents.create(
    model="openai/gpt-4o",
    embedding="openai/text-embedding-3-small",
    memory_blocks=[
        {"label": "human", "value": "The user's name is Alex."},
        {"label": "persona", "value": "I am a helpful assistant with persistent memory."}
    ]
)

conversation_id = "session_001"
store = LettaConversationStore(session, agent_state.id)

user_message = "What projects did we discuss last week?"
store.log_message(conversation_id, "user", user_message)

response = letta_client.messages.create(
    agent_id=agent_state.id,
    messages=[{"role": "user", "content": user_message}]
)

store.log_message(conversation_id, "assistant", response.content)
store.log_memory_state(conversation_id, agent_state.memory_blocks)
```
Advanced Query Patterns
Build sophisticated analytics on conversation data.
Conversation Metrics
```python
conversation_stats = session.sql("""
    SELECT
        agent_id,
        conversation_id,
        COUNT(*) as total_messages,
        SUM(CASE WHEN role = 'user' THEN 1 ELSE 0 END) as user_messages,
        SUM(CASE WHEN role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
        MIN(timestamp) as conversation_start,
        MAX(timestamp) as conversation_end
    FROM {conversations}
    GROUP BY agent_id, conversation_id
    ORDER BY conversation_start DESC
""", conversations=session.table("conversations"))

conversation_stats.show()
```
Memory Evolution Analysis
Track how memory blocks change throughout a conversation.
```python
memory_changes = session.sql("""
    SELECT
        m1.agent_id,
        m1.block_label,
        m1.timestamp as change_time,
        m1.block_value as new_value,
        LAG(m1.block_value) OVER (
            PARTITION BY m1.agent_id, m1.block_label
            ORDER BY m1.timestamp
        ) as previous_value
    FROM {snapshots} m1
    ORDER BY m1.agent_id, m1.block_label, m1.timestamp
""", snapshots=session.table("memory_snapshots"))

memory_changes.show()
```
Entity Frequency Analysis
Identify the most frequently discussed entities.
```python
entity_frequency = session.sql("""
    SELECT
        entity_type,
        entity_name,
        COUNT(DISTINCT conversation_id) as conversation_count,
        MIN(first_mentioned) as first_seen,
        MAX(last_updated) as last_seen
    FROM {entities}
    GROUP BY entity_type, entity_name
    ORDER BY conversation_count DESC
    LIMIT 20
""", entities=session.table("entities"))

entity_frequency.show()
```
Monitoring and Metrics
Track storage performance and costs with built-in metrics.
```python
metrics_df = session.table("fenic_system.query_metrics")

storage_metrics = session.sql("""
    SELECT
        DATE(end_ts) as date,
        COUNT(*) as total_operations,
        SUM(total_lm_requests) as llm_calls,
        SUM(total_lm_cost) as total_cost,
        AVG(latency_ms) as avg_latency
    FROM {metrics}
    WHERE query_text LIKE '%conversations%'
    GROUP BY DATE(end_ts)
    ORDER BY date DESC
""", metrics=metrics_df)

storage_metrics.show()
```
Version Control and Rollback
Typedef's catalog enables point-in-time recovery for conversations.
Creating Conversation Snapshots
```python
from datetime import datetime

snapshot_name = f"conversations_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

conversations_df = session.table("conversations")
conversations_df.write.save_as_table(snapshot_name, mode="overwrite")

session.catalog.set_table_description(
    snapshot_name,
    f"Conversation backup created at {datetime.now().isoformat()}"
)
```
Restoring from Snapshots
```python
def restore_conversations(snapshot_table: str):
    """Restore conversations from a snapshot."""
    snapshot_df = session.table(snapshot_table)
    snapshot_df.write.save_as_table("conversations", mode="overwrite")
    print(f"Restored conversations from {snapshot_table}")

restore_conversations("conversations_backup_20250115_140000")
```
Comparing Versions
```python
def compare_conversation_versions(table_v1: str, table_v2: str):
    """Compare two versions of conversation data."""
    df_v1 = session.table(table_v1)
    df_v2 = session.table(table_v2)

    result = session.sql("""
        SELECT 'v1_only' as source, COUNT(*) as count
        FROM {v1}
        WHERE message_id NOT IN (SELECT message_id FROM {v2})
        UNION ALL
        SELECT 'v2_only' as source, COUNT(*) as count
        FROM {v2}
        WHERE message_id NOT IN (SELECT message_id FROM {v1})
    """, v1=df_v1, v2=df_v2)
    return result

comparison = compare_conversation_versions(
    "conversations_backup_20250115_140000",
    "conversations"
)
comparison.show()
```
Production Best Practices
Data Retention Policies
Implement automatic cleanup of old conversations.
```python
from datetime import timedelta

def cleanup_old_conversations(retention_days: int = 90):
    """Remove conversations older than the retention period."""
    cutoff_date = datetime.now() - timedelta(days=retention_days)

    conversations_df = session.table("conversations")
    active_conversations = conversations_df.filter(
        fc.col("timestamp") >= fc.lit(cutoff_date)
    )

    active_conversations.write.save_as_table(
        "conversations",
        mode="overwrite"
    )
```
Batch Processing
For high-volume scenarios, use batch writes to reduce overhead.
```python
class BatchedConversationStore:
    def __init__(self, session, batch_size=100):
        self.session = session
        self.batch_size = batch_size
        self.message_buffer = []

    def add_message(self, message_data):
        """Add a message to the buffer, flushing once the batch size is reached."""
        self.message_buffer.append(message_data)
        if len(self.message_buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write buffered messages to the catalog."""
        if not self.message_buffer:
            return
        df = self.session.create_dataframe(self.message_buffer)
        df.write.save_as_table("conversations", mode="append")
        self.message_buffer = []
```
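When the process shuts down, any partially filled buffer still needs to be written. One option (illustrative, using the standard library's atexit) is to register flush as an exit handler:

```python
import atexit

batched_store = BatchedConversationStore(session, batch_size=100)
# Write any remaining buffered messages when the interpreter exits
atexit.register(batched_store.flush)

batched_store.add_message({
    "message_id": "msg_0001",
    "agent_id": "agent_123",
    "conversation_id": "session_001",
    "timestamp": datetime.now(),
    "role": "user",
    "content": "Example message",
    "tool_calls": json.dumps([]),
    "memory_state": json.dumps({})
})
```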
Error Handling
Implement robust error handling for storage operations.
```python
from fenic.core.error import ExecutionError, ValidationError

def safe_store_message(message_data):
    """Store a message with error handling."""
    try:
        df = session.create_dataframe([message_data])
        df.write.save_as_table("conversations", mode="append")
        return True
    except ValidationError as e:
        print(f"Validation error: {e}")
        return False
    except ExecutionError as e:
        print(f"Execution error: {e}")
        return False
```
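For transient failures, such as a briefly locked local database file, a simple retry with exponential backoff can be layered on top of safe_store_message. This helper is an illustrative addition, not part of Fenic:

```python
import time

def store_message_with_retry(message_data, max_attempts: int = 3, base_delay: float = 0.5):
    """Retry safe_store_message with exponential backoff between attempts."""
    for attempt in range(max_attempts):
        if safe_store_message(message_data):
            return True
        # Back off before the next attempt: 0.5s, 1s, 2s, ...
        time.sleep(base_delay * (2 ** attempt))
    return False
```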
Performance Optimization
Table Indexing
While Fenic handles internal optimization, structure queries to minimize full table scans.
```python
optimized_query = (
    session.table("conversations")
    .filter(fc.col("agent_id") == "agent_123")
    .filter(fc.col("timestamp") >= datetime.now() - timedelta(days=7))
    .select("conversation_id", "content", "timestamp")
)
```
Using Persist for Reused DataFrames
Cache intermediate results when running multiple queries.
```python
recent_conversations = (
    session.table("conversations")
    .filter(fc.col("timestamp") >= datetime.now() - timedelta(days=7))
    .persist()
)

metrics_by_agent = recent_conversations.group_by("agent_id").count()
metrics_by_conversation = recent_conversations.group_by("conversation_id").count()
```
Next Steps
To start building with Typedef and Letta:
- Install Fenic: `pip install fenic`
- Review the Fenic documentation
- Explore the Mastra pipelines guide for additional memory patterns
- Check the LangGraph orchestration guide for agent integration patterns
- Read about building reliable AI pipelines with semantic operators
The combination of Letta's stateful agent architecture and Typedef's queryable catalog system provides a production-ready foundation for building agents that learn from persistent conversation history. Start with simple message logging, then scale to semantic search and cross-conversation analytics as your needs grow.

