
How to Persist and Search Letta Conversations with Versioned Stores on Typedef

Typedef Team


Letta agents require persistent memory to maintain context across sessions and learn from interactions. While Letta handles in-context memory blocks automatically, integrating Typedef's catalog system provides a production-grade storage layer with SQL queryability, DataFrame operations, and version-controlled conversation history.

This guide demonstrates how to build a durable conversation store for Letta agents using Typedef's Fenic framework, enabling queryable conversation history, semantic search capabilities, and reliable state management.

Architecture Overview

The integration combines three core components:

Letta Agent Layer - Manages stateful agents with memory blocks, tool execution, and conversation orchestration. Letta provides automatic persistence for agent state but benefits from external storage for conversation analytics and cross-session queries.

Typedef Catalog Layer - Provides persistent storage with full SQL support, schema management, and table versioning. The catalog stores conversation history, entity extractions, and interaction metadata in structured tables.

MCP Integration Layer - Exposes catalog operations as tools that Letta agents can call directly. This enables agents to query their own conversation history, search past interactions, and retrieve context from previous sessions.

Setting Up the Storage Infrastructure

Install Fenic and configure a session with persistent local storage.

bash
pip install --upgrade fenic

Create a Python file for your storage configuration:

python
from fenic.api.session.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, OpenAIEmbeddingModel
from pathlib import Path
import fenic.api.functions as fc

config = SessionConfig(
    app_name="letta_conversation_store",
    db_path=Path("./letta_conversations.db"),
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "openai": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        default_embedding_model="openai"
    )
)

session = Session.get_or_create(config)
session.catalog.create_database("letta_memory")
session.catalog.set_current_database("letta_memory")

The db_path parameter ensures all table data persists locally across sessions. Each save operation writes to this database, creating a versioned history of all conversations.

Designing the Conversation Schema

Define tables to capture different aspects of Letta agent interactions.

Conversation Messages Table

Store the complete message history with timestamps and metadata.

python
from fenic.core.types import StringType, IntegerType, TimestampType
from fenic.core.types.schema import Schema, ColumnField
from datetime import datetime

conversation_schema = Schema([
    ColumnField("message_id", StringType),
    ColumnField("agent_id", StringType),
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("tool_calls", StringType),
    ColumnField("memory_state", StringType)
])

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Complete message history for all Letta agents with tool calls and memory states"
)

Memory Block Snapshots Table

Track changes to Letta's editable memory blocks over time.

python
memory_snapshot_schema = Schema([
    ColumnField("snapshot_id", StringType),
    ColumnField("agent_id", StringType),
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("block_label", StringType),
    ColumnField("block_value", StringType),
    ColumnField("block_limit", IntegerType)
])

session.catalog.create_table(
    "memory_snapshots",
    memory_snapshot_schema,
    description="Versioned snapshots of Letta agent memory blocks showing evolution over time"
)

Extracted Entities Table

Store entities and facts extracted from conversations for semantic queries.

python
entity_schema = Schema([
    ColumnField("entity_id", StringType),
    ColumnField("agent_id", StringType),
    ColumnField("conversation_id", StringType),
    ColumnField("entity_type", StringType),
    ColumnField("entity_name", StringType),
    ColumnField("entity_value", StringType),
    ColumnField("first_mentioned", TimestampType),
    ColumnField("last_updated", TimestampType)
])

session.catalog.create_table(
    "entities",
    entity_schema,
    description="Named entities and facts extracted from agent conversations"
)
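
With all three tables in place, confirm they were registered in the current database:

python
print(session.catalog.list_tables())
# Expect the list to include: conversations, memory_snapshots, entities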

Writing Conversation Data

Create functions to persist Letta agent interactions to the catalog.

Storing Message Turns

python
import json

def store_message(
    message_id: str,
    agent_id: str,
    conversation_id: str,
    role: str,
    content: str,
    tool_calls: list = None,
    memory_state: dict = None
):
    """Store a single message turn in the catalog."""
    df = session.create_dataframe({
        "message_id": [message_id],
        "agent_id": [agent_id],
        "conversation_id": [conversation_id],
        "timestamp": [datetime.now()],
        "role": [role],
        "content": [content],
        "tool_calls": [json.dumps(tool_calls or [])],
        "memory_state": [json.dumps(memory_state or {})]
    })

    df.write.save_as_table("conversations", mode="append")
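
A quick smoke test, using illustrative IDs:

python
store_message(
    message_id="msg_001",
    agent_id="agent_123",
    conversation_id="session_001",
    role="user",
    content="Hello, can you remember this?"
)

# Read back the newest row to confirm the write landed
session.table("conversations").sort(fc.col("timestamp").desc()).limit(1).show()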

Versioning Memory Blocks

Capture memory block changes after each agent interaction.

python
def snapshot_memory_blocks(
    agent_id: str,
    conversation_id: str,
    memory_blocks: list
):
    """Create versioned snapshots of all memory blocks."""
    snapshots = []

    for block in memory_blocks:
        snapshots.append({
            # Include the block label so snapshot IDs stay unique per block
            "snapshot_id": f"{agent_id}_{block.get('label')}_{datetime.now().isoformat()}",
            "agent_id": agent_id,
            "conversation_id": conversation_id,
            "timestamp": datetime.now(),
            "block_label": block.get("label"),
            "block_value": block.get("value"),
            "block_limit": block.get("limit", 2000)
        })

    if not snapshots:
        return

    df = session.create_dataframe(snapshots)
    df.write.save_as_table("memory_snapshots", mode="append")
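
The entities table defined earlier is populated the same way. Below is a minimal helper, assuming entity extraction happens upstream (upsert/merge logic is omitted for brevity):

python
def store_entity(
    agent_id: str,
    conversation_id: str,
    entity_type: str,
    entity_name: str,
    entity_value: str
):
    """Store a single extracted entity as a new row."""
    now = datetime.now()
    df = session.create_dataframe({
        "entity_id": [f"{agent_id}_{entity_name}_{now.isoformat()}"],
        "agent_id": [agent_id],
        "conversation_id": [conversation_id],
        "entity_type": [entity_type],
        "entity_name": [entity_name],
        "entity_value": [entity_value],
        "first_mentioned": [now],
        "last_updated": [now]
    })
    df.write.save_as_table("entities", mode="append")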

Batch Write Pattern

For high-throughput scenarios, batch multiple messages before writing.

python
def store_conversation_batch(messages: list):
    """Store multiple messages in a single write operation."""
    df = session.create_dataframe(messages)
    df.write.save_as_table("conversations", mode="append")
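
Each dict in messages must match the conversations schema exactly. For example, with synthetic data:

python
messages = [
    {
        "message_id": f"agent_123_msg_{i}",
        "agent_id": "agent_123",
        "conversation_id": "session_001",
        "timestamp": datetime.now(),
        "role": "user" if i % 2 == 0 else "assistant",
        "content": f"message body {i}",
        "tool_calls": "[]",
        "memory_state": "{}"
    }
    for i in range(200)
]

store_conversation_batch(messages)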

Creating Query Tools for Letta Agents

Register catalog-backed tools that Letta can call through MCP to search its own history.

Recent Conversation Retrieval

python
from fenic.core.mcp.types import ToolParam

recent_messages = (
    session.table("conversations")
    .filter(
        (fc.col("agent_id") == fc.tool_param("agent_id", StringType)) &
        (fc.col("conversation_id") == fc.tool_param("conversation_id", StringType))
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="get_recent_messages",
    tool_description="Retrieve the most recent messages for a given agent and conversation",
    tool_query=recent_messages,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="agent_id",
            description="The agent ID"
        ),
        ToolParam(
            name="conversation_id",
            description="The conversation ID"
        ),
        ToolParam(
            name="limit",
            description="Maximum number of messages to return",
            has_default=True,
            default_value=10
        )
    ]
)

Memory Block History Query

Track how agent memory evolved over time.

python
memory_evolution = (
    session.table("memory_snapshots")
    .filter(
        (fc.col("agent_id") == fc.tool_param("agent_id", StringType)) &
        (fc.col("block_label") == fc.tool_param("block_label", StringType))
    )
    .sort(fc.col("timestamp").asc())
)

session.catalog.create_tool(
    tool_name="get_memory_history",
    tool_description="Retrieves the evolution of a specific memory block over time",
    tool_query=memory_evolution,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="agent_id",
            description="The agent ID"
        ),
        ToolParam(
            name="block_label",
            description="Memory block label (e.g., 'human', 'persona')"
        )
    ]
)

Cross-Conversation Search

Search for specific terms across all conversations for an agent.

python
conversation_search = (
    session.table("conversations")
    .filter(
        (fc.col("agent_id") == fc.tool_param("agent_id", StringType)) &
        fc.col("content").contains(fc.tool_param("search_term", StringType))
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_conversations",
    tool_description="Search for specific terms across all agent conversations",
    tool_query=conversation_search,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="agent_id",
            description="The agent ID"
        ),
        ToolParam(
            name="search_term",
            description="Term to search for in conversation content"
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            has_default=True,
            default_value=10
        )
    ]
)

Semantic Search with Embeddings

Enable semantic similarity search across conversation history.

Adding Embeddings to Conversations

python
conversations_df = session.table("conversations")

conversations_with_embeddings = conversations_df.with_column(
    "content_embedding",
    fc.semantic.embed(fc.col("content"))
)

conversations_with_embeddings.write.save_as_table(
    "conversations_embedded",
    mode="overwrite"
)

session.catalog.set_table_description(
    "conversations_embedded",
    "Conversation messages with semantic embeddings for similarity search"
)
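
With embeddings stored, you can rank messages by similarity to an ad-hoc query. The sketch below assumes your fenic version exposes fc.embedding.compute_similarity accepting a query vector; check the API reference for the exact name and signature.

python
import numpy as np

# Embed the query text by running semantic.embed over a one-row DataFrame
query_df = session.create_dataframe({"q": ["deadlines for the migration project"]})
query_vec = (
    query_df
    .select(fc.semantic.embed(fc.col("q")).alias("emb"))
    .to_pydict()["emb"][0]
)

# Rank stored messages by cosine similarity to the query vector
# (compute_similarity signature assumed; verify against your fenic version)
results = (
    session.table("conversations_embedded")
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(
            fc.col("content_embedding"),
            np.array(query_vec),
            metric="cosine"
        )
    )
    .sort(fc.col("similarity").desc())
    .limit(5)
    .select("content", "similarity")
)
results.show()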

Clustering Conversations by Topic

Group related conversations using semantic clustering.

python
clustered_conversations = (
    conversations_with_embeddings
    .semantic.with_cluster_labels(
        by=fc.col("content_embedding"),
        num_clusters=10,
        label_column="topic_cluster"
    )
)

clustered_conversations.write.save_as_table(
    "conversation_topics",
    mode="overwrite"
)
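
To sanity-check the clustering, inspect how many messages landed in each cluster:

python
session.table("conversation_topics").group_by("topic_cluster").count().show()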

Topic Distribution Analysis

Analyze conversation patterns across agents.

python
topic_distribution = session.sql("""
    SELECT
        agent_id,
        topic_cluster,
        COUNT(*) as message_count,
        MIN(timestamp) as first_message,
        MAX(timestamp) as last_message
    FROM {conversations}
    GROUP BY agent_id, topic_cluster
    ORDER BY agent_id, message_count DESC
""", conversations=session.table("conversation_topics"))

topic_distribution.show()

Running the MCP Server

Expose catalog tools to Letta agents through an MCP server.

Development Server

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "Letta Conversation Store",
    user_defined_tools=tools,
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="letta",
        max_result_rows=100
    ),
    concurrency_limit=10
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production Deployment

python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server,
    stateless_http=False,
    path="/mcp"
)

Deploy with uvicorn:

bash
uvicorn server:app --host 0.0.0.0 --port 8000 --workers 4

Integrating with Letta Agents

Configure Letta to use the conversation store tools through MCP.

Creating Tools in Letta

python
def get_conversation_history(agent_id: str, conversation_id: str, limit: int = 20):
    """Retrieve conversation history from the Typedef catalog.

    Imports live inside the function body because Letta serializes tool
    source code. This simplified POST assumes a thin HTTP wrapper in front
    of the MCP server; a full MCP client speaks JSON-RPC over the transport.
    """
    import requests

    response = requests.post(
        "http://localhost:8000/mcp",
        json={
            "tool": "get_recent_messages",
            "params": {
                "agent_id": agent_id,
                "conversation_id": conversation_id,
                "limit": limit
            }
        }
    )
    return response.json()

def search_past_conversations(agent_id: str, search_term: str, limit: int = 10):
    """Search for specific terms in past conversations."""
    import requests

    response = requests.post(
        "http://localhost:8000/mcp",
        json={
            "tool": "search_conversations",
            "params": {
                "agent_id": agent_id,
                "search_term": search_term,
                "limit": limit
            }
        }
    )
    return response.json()
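
To let the agent invoke these helpers on its own, register them as Letta tools and attach them at agent creation. The registration calls below (upsert_from_function, tool_ids) reflect the letta_client SDK and may differ across Letta versions:

python
from letta_client import Letta

# Adjust base_url (or pass a token) for your Letta deployment
client = Letta(base_url="http://localhost:8283")

# Register the helper functions as agent-callable tools
history_tool = client.tools.upsert_from_function(func=get_conversation_history)
search_tool = client.tools.upsert_from_function(func=search_past_conversations)

agent = client.agents.create(
    model="openai/gpt-4o-mini",
    embedding="openai/text-embedding-3-small",
    memory_blocks=[
        {"label": "persona", "value": "I can search my own conversation history."}
    ],
    tool_ids=[history_tool.id, search_tool.id]
)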

Wrapping Storage Operations

Create a wrapper that automatically persists Letta conversations.

python
class LettaConversationStore:
    def __init__(self, session, agent_id):
        self.session = session
        self.agent_id = agent_id

    def log_message(self, conversation_id, role, content, tool_calls=None):
        """Log a message to the catalog."""
        message_id = f"{self.agent_id}_{datetime.now().isoformat()}"
        store_message(
            message_id=message_id,
            agent_id=self.agent_id,
            conversation_id=conversation_id,
            role=role,
            content=content,
            tool_calls=tool_calls
        )

    def log_memory_state(self, conversation_id, memory_blocks):
        """Log current memory block state."""
        snapshot_memory_blocks(
            agent_id=self.agent_id,
            conversation_id=conversation_id,
            memory_blocks=memory_blocks
        )

    def get_conversation_context(self, conversation_id, limit=20):
        """Retrieve recent conversation context."""
        df = (
            self.session.table("conversations")
            .filter(
                (fc.col("agent_id") == self.agent_id) &
                (fc.col("conversation_id") == conversation_id)
            )
            .sort(fc.col("timestamp").desc())
            .limit(limit)
        )
        return df.to_pydict()

Usage with Letta Agents

python
from letta_client import Letta

# Client construction varies by deployment (local server vs. Letta Cloud);
# adjust base_url or credentials for your setup.
letta_client = Letta(base_url="http://localhost:8283")

agent_state = letta_client.agents.create(
    model="openai/gpt-4o",
    embedding="openai/text-embedding-3-small",
    memory_blocks=[
        {"label": "human", "value": "The user's name is Alex."},
        {"label": "persona", "value": "I am a helpful assistant with persistent memory."}
    ]
)

conversation_id = "session_001"
store = LettaConversationStore(session, agent_state.id)

user_message = "What projects did we discuss last week?"
store.log_message(conversation_id, "user", user_message)

response = letta_client.agents.messages.create(
    agent_id=agent_state.id,
    messages=[{"role": "user", "content": user_message}]
)

# The response holds a list of messages; pull out the assistant reply
# (the exact response shape varies by Letta SDK version).
assistant_reply = response.messages[-1].content
store.log_message(conversation_id, "assistant", assistant_reply)

# Snapshot the agent's current memory blocks
blocks = letta_client.agents.blocks.list(agent_id=agent_state.id)
store.log_memory_state(
    conversation_id,
    [{"label": b.label, "value": b.value, "limit": b.limit} for b in blocks]
)
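
When a new session starts, one simple strategy is to pull recent history from the catalog and inject it into the first message; the prompt format here is illustrative:

python
# to_pydict returns a dict of column lists, newest message first
context = store.get_conversation_context(conversation_id, limit=10)
history_lines = [
    f"{role}: {content}"
    for role, content in zip(context["role"], context["content"])
]
context_note = "Recent conversation history:\n" + "\n".join(reversed(history_lines))

response = letta_client.agents.messages.create(
    agent_id=agent_state.id,
    messages=[{
        "role": "user",
        "content": f"{context_note}\n\nNew question: What's next on the roadmap?"
    }]
)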

Advanced Query Patterns

Build sophisticated analytics on conversation data.

Conversation Metrics

python
conversation_stats = session.sql("""
    SELECT
        agent_id,
        conversation_id,
        COUNT(*) as total_messages,
        SUM(CASE WHEN role = 'user' THEN 1 ELSE 0 END) as user_messages,
        SUM(CASE WHEN role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
        MIN(timestamp) as conversation_start,
        MAX(timestamp) as conversation_end
    FROM {conversations}
    GROUP BY agent_id, conversation_id
    ORDER BY conversation_start DESC
""", conversations=session.table("conversations"))

conversation_stats.show()

Memory Evolution Analysis

Track how memory blocks change throughout a conversation.

python
memory_changes = session.sql("""
    SELECT
        m1.agent_id,
        m1.block_label,
        m1.timestamp as change_time,
        m1.block_value as new_value,
        LAG(m1.block_value) OVER (
            PARTITION BY m1.agent_id, m1.block_label
            ORDER BY m1.timestamp
        ) as previous_value
    FROM {snapshots} m1
    ORDER BY m1.agent_id, m1.block_label, m1.timestamp
""", snapshots=session.table("memory_snapshots"))

memory_changes.show()
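
To keep only the rows where a block actually changed, wrap the query and compare each value against its predecessor. IS DISTINCT FROM handles the NULL previous_value on the first snapshot; this assumes fenic's DuckDB-backed SQL engine:

python
actual_changes = session.sql("""
    SELECT * FROM (
        SELECT
            agent_id,
            block_label,
            timestamp as change_time,
            block_value as new_value,
            LAG(block_value) OVER (
                PARTITION BY agent_id, block_label
                ORDER BY timestamp
            ) as previous_value
        FROM {snapshots}
    )
    WHERE new_value IS DISTINCT FROM previous_value
""", snapshots=session.table("memory_snapshots"))

actual_changes.show()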

Entity Frequency Analysis

Identify the most frequently discussed entities.

python
entity_frequency = session.sql("""
    SELECT
        entity_type,
        entity_name,
        COUNT(DISTINCT conversation_id) as conversation_count,
        MIN(first_mentioned) as first_seen,
        MAX(last_updated) as last_seen
    FROM {entities}
    GROUP BY entity_type, entity_name
    ORDER BY conversation_count DESC
    LIMIT 20
""", entities=session.table("entities"))

entity_frequency.show()

Monitoring and Metrics

Track storage performance and costs with built-in metrics.

python
metrics_df = session.table("fenic_system.query_metrics")

storage_metrics = session.sql("""
    SELECT
        DATE(end_ts) as date,
        COUNT(*) as total_operations,
        SUM(total_lm_requests) as llm_calls,
        SUM(total_lm_cost) as total_cost,
        AVG(latency_ms) as avg_latency
    FROM {metrics}
    WHERE query_text LIKE '%conversations%'
    GROUP BY DATE(end_ts)
    ORDER BY date DESC
""", metrics=metrics_df)

storage_metrics.show()

Version Control and Rollback

Typedef's catalog enables point-in-time recovery for conversations.

Creating Conversation Snapshots

python
from datetime import datetime

snapshot_name = f"conversations_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

conversations_df = session.table("conversations")
conversations_df.write.save_as_table(snapshot_name, mode="overwrite")

session.catalog.set_table_description(
    snapshot_name,
    f"Conversation backup created at {datetime.now().isoformat()}"
)

Restoring from Snapshots

python
def restore_conversations(snapshot_table: str):
    """Restore conversations from a snapshot."""
    snapshot_df = session.table(snapshot_table)
    snapshot_df.write.save_as_table("conversations", mode="overwrite")
    print(f"Restored conversations from {snapshot_table}")

restore_conversations("conversations_backup_20250115_140000")
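
Before restoring, list the available snapshots by their naming prefix (assuming list_tables returns plain table names):

python
backups = [
    t for t in session.catalog.list_tables()
    if t.startswith("conversations_backup_")
]
print(backups)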

Comparing Versions

python
def compare_conversation_versions(table_v1: str, table_v2: str):
    """Compare two versions of conversation data."""
    df_v1 = session.table(table_v1)
    df_v2 = session.table(table_v2)

    result = session.sql("""
        SELECT
            'v1_only' as source,
            COUNT(*) as count
        FROM {v1}
        WHERE message_id NOT IN (SELECT message_id FROM {v2})
        UNION ALL
        SELECT
            'v2_only' as source,
            COUNT(*) as count
        FROM {v2}
        WHERE message_id NOT IN (SELECT message_id FROM {v1})
    """, v1=df_v1, v2=df_v2)

    return result

comparison = compare_conversation_versions(
    "conversations_backup_20250115_140000",
    "conversations"
)
comparison.show()

Production Best Practices

Data Retention Policies

Implement automatic cleanup of old conversations.

python
from datetime import datetime, timedelta

def cleanup_old_conversations(retention_days: int = 90):
    """Remove conversations older than the retention period.

    Note: this rewrites the table in place; take a snapshot first
    (see Version Control and Rollback above).
    """
    cutoff_date = datetime.now() - timedelta(days=retention_days)

    conversations_df = session.table("conversations")
    active_conversations = conversations_df.filter(
        fc.col("timestamp") >= fc.lit(cutoff_date)
    )

    active_conversations.write.save_as_table(
        "conversations",
        mode="overwrite"
    )

Batch Processing

For high-volume scenarios, use batch writes to reduce overhead.

python
class BatchedConversationStore:
    def __init__(self, session, batch_size=100):
        self.session = session
        self.batch_size = batch_size
        self.message_buffer = []

    def add_message(self, message_data):
        """Add message to buffer."""
        self.message_buffer.append(message_data)

        if len(self.message_buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write buffered messages to catalog."""
        if not self.message_buffer:
            return

        df = self.session.create_dataframe(self.message_buffer)
        df.write.save_as_table("conversations", mode="append")
        self.message_buffer = []
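
Flush any partially filled batch before the process exits; incoming_messages below is a placeholder for your message dicts:

python
batched_store = BatchedConversationStore(session, batch_size=100)

for msg in incoming_messages:  # placeholder iterable of message dicts
    batched_store.add_message(msg)

# Write out whatever remains in the buffer
batched_store.flush()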

Error Handling

Implement robust error handling for storage operations.

python
from fenic.core.error import ExecutionError, ValidationError

def safe_store_message(message_data):
    """Store message with error handling."""
    try:
        df = session.create_dataframe([message_data])
        df.write.save_as_table("conversations", mode="append")
        return True
    except ValidationError as e:
        print(f"Validation error: {e}")
        return False
    except ExecutionError as e:
        print(f"Execution error: {e}")
        return False

Performance Optimization

Table Indexing

While Fenic handles internal optimization, structure queries to minimize full table scans.

python
optimized_query = (
    session.table("conversations")
    .filter(fc.col("agent_id") == "agent_123")
    .filter(fc.col("timestamp") >= datetime.now() - timedelta(days=7))
    .select("conversation_id", "content", "timestamp")
)

Using Persist for Reused DataFrames

Cache intermediate results when running multiple queries.

python
recent_conversations = (
    session.table("conversations")
    .filter(fc.col("timestamp") >= datetime.now() - timedelta(days=7))
    .persist()
)

metrics_by_agent = recent_conversations.group_by("agent_id").count()
metrics_by_conversation = recent_conversations.group_by("conversation_id").count()

Next Steps

The combination of Letta's stateful agent architecture and Typedef's queryable catalog system provides a production-ready foundation for building agents that learn from persistent conversation history. Start with simple message logging, then scale to semantic search and cross-conversation analytics as your needs grow.
