
How to Build LangGraph Agents That Read/Write Versioned Memory on Typedef

Typedef Team

Production AI agents require persistent memory that maintains state across sessions, tracks changes over time, and enables reasoning about historical interactions. This guide demonstrates how to build LangGraph agents with versioned memory using Typedef's persistent catalog, combining deterministic data operations with stateful orchestration.

Memory Architecture Components

Typedef's catalog provides production-grade storage that persists across sessions. When paired with LangGraph's state management, this creates a complete memory system with versioning capabilities built on DataFrame operations.

The architecture consists of three interconnected layers:

  • Persistent Catalog Layer - Stores all memory data in tables with schemas, timestamps, and metadata through SQL operations
  • Tool Layer - Exposes catalog operations as MCP tools that agents invoke to read and write memory
  • Orchestration Layer - Manages agent execution flow using LangGraph's state graphs

Configure the Memory System

Configure a Fenic session with persistent storage. The catalog persists to disk, maintaining state across application restarts.

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel
from pathlib import Path

config = SessionConfig(
    app_name="langgraph_memory",
    db_path=Path("./agent_memory.db"),
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4"
    )
)

session = Session.get_or_create(config)

The db_path parameter creates a local SQLite database that persists all catalog operations. For production deployments, point db_path at durable cloud storage; the surrounding code stays identical.

Design Versioned Memory Tables

Design memory schemas to support versioning through timestamps and version fields. Each memory entry tracks creation time and version number.

python
from fenic.core.types import StringType, IntegerType, TimestampType, BooleanType
from fenic.core.types import Schema, ColumnField
from datetime import datetime

conversation_schema = Schema([
    ColumnField("conversation_id", StringType),
    ColumnField("version", IntegerType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("metadata", StringType),
    ColumnField("is_current", BooleanType)
])

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Versioned conversation history with temporal tracking"
)

entity_schema = Schema([
    ColumnField("entity_id", StringType),
    ColumnField("version", IntegerType),
    ColumnField("timestamp", TimestampType),
    ColumnField("entity_type", StringType),
    ColumnField("attributes", StringType),
    ColumnField("is_current", BooleanType)
])

session.catalog.create_table(
    "entities",
    entity_schema,
    description="Entity memory with version tracking"
)

The is_current flag distinguishes live entries from superseded ones, while the version field enables historical queries. Together they support both point-in-time lookups and temporal analysis, as sketched below.
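
As a quick illustration of the pattern, current state and point-in-time lookups are just filters over these columns (the entity_id value here is hypothetical):

python
import fenic.api.functions as fc

# Current state: the single live version of an entity
current = session.table("entities").filter(
    (fc.col("entity_id") == "acct-42") & (fc.col("is_current") == True)
)

# Point-in-time: a specific historical version
as_of_v2 = session.table("entities").filter(
    (fc.col("entity_id") == "acct-42") & (fc.col("version") == 2)
)
current.show()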

Write Versioned Memory

Implement functions that maintain version history when storing new data. Conversation turns are append-only, with each new turn incrementing the version; entity updates additionally mark the previous version as historical so that exactly one version per entity stays current.

python
import fenic.api.functions as fc

def write_conversation_turn(conversation_id, role, content, metadata=None):
    """Write a conversation turn with automatic versioning."""

    # Get current max version for this conversation
    existing = session.table("conversations").filter(
        fc.col("conversation_id") == conversation_id
    )

    max_version = existing.select(
        fc.coalesce(fc.max(fc.col("version")), fc.lit(0)).alias("max_version")
    ).to_pydict()["max_version"][0]

    new_version = max_version + 1

    # Conversation turns are append-only: every turn stays current so that
    # get_current_memory can return the recent transcript. Appending flipped
    # copies of old rows here would duplicate them rather than update them;
    # supersession is handled in write_entity below, where it belongs.

    # Write new version
    new_entry = session.create_dataframe({
        "conversation_id": [conversation_id],
        "version": [new_version],
        "timestamp": [datetime.now()],
        "role": [role],
        "content": [content],
        "metadata": [metadata or "{}"],
        "is_current": [True]
    })

    new_entry.write.save_as_table("conversations", mode="append")
    return new_version

def write_entity(entity_id, entity_type, attributes):
    """Update entity with versioning."""

    existing = session.table("entities").filter(
        fc.col("entity_id") == entity_id
    )

    max_version = existing.select(
        fc.coalesce(fc.max(fc.col("version")), fc.lit(0)).alias("max_version")
    ).to_pydict()["max_version"][0]

    new_version = max_version + 1

    # Mark the previous version as historical. Appending flipped copies with
    # mode="append" would duplicate rows instead of updating them, so
    # materialize the table with stale flags cleared and rewrite it
    # (assumes save_as_table accepts mode="overwrite").
    if max_version > 0:
        rewritten = session.table("entities").select(
            fc.col("entity_id"),
            fc.col("version"),
            fc.col("timestamp"),
            fc.col("entity_type"),
            fc.col("attributes"),
            (fc.col("is_current") & (fc.col("entity_id") != entity_id)).alias("is_current")
        ).to_pydict()

        session.create_dataframe(rewritten).write.save_as_table(
            "entities", mode="overwrite"
        )

    # Write new version
    new_entity = session.create_dataframe({
        "entity_id": [entity_id],
        "version": [new_version],
        "timestamp": [datetime.now()],
        "entity_type": [entity_type],
        "attributes": [attributes],
        "is_current": [True]
    })

    new_entity.write.save_as_table("entities", mode="append")
    return new_version

This strategy maintains complete history: conversation turns accumulate append-only, entity writes supersede the previous version, and the is_current flag keeps current-state queries fast.
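
For example, writing a turn and an entity update returns the new version numbers (the IDs here are hypothetical):

python
v_turn = write_conversation_turn("conv-12345", "user", "What plans do you offer?")
v_entity = write_entity("acct-42", "customer", '{"tier": "pro"}')
print(v_turn, v_entity)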

Create Memory Access Tools

Define catalog-backed tools that LangGraph agents call to interact with memory. Tools are declarative and type-safe.

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, BooleanType

# Tool to retrieve current conversation state
current_conversations = (
    session.table("conversations")
    .filter(
        (fc.col("conversation_id") == fc.tool_param("conversation_id", StringType)) &
        (fc.col("is_current") == True)
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.coalesce(fc.tool_param("limit", IntegerType), fc.lit(10)))
)

session.catalog.create_tool(
    tool_name="get_current_memory",
    tool_description="Retrieves current conversation memory for a session",
    tool_query=current_conversations,
    result_limit=100,
    tool_params=[
        ToolParam(
            name="conversation_id",
            description="Conversation session identifier"
        ),
        ToolParam(
            name="limit",
            description="Maximum messages to retrieve",
            has_default=True,
            default_value=10
        )
    ]
)

# Tool to retrieve historical versions
historical_conversations = (
    session.table("conversations")
    .filter(
        (fc.col("conversation_id") == fc.tool_param("conversation_id", StringType)) &
        (fc.col("version") == fc.tool_param("version", IntegerType))
    )
    .sort(fc.col("timestamp").asc())
)

session.catalog.create_tool(
    tool_name="get_memory_version",
    tool_description="Retrieves a specific version of conversation memory",
    tool_query=historical_conversations,
    result_limit=100,
    tool_params=[
        ToolParam(
            name="conversation_id",
            description="Conversation session identifier"
        ),
        ToolParam(
            name="version",
            description="Specific version number to retrieve"
        )
    ]
)

# Tool to search entities
entity_search = (
    session.table("entities")
    .filter(
        (fc.col("is_current") == True) &
        (fc.col("attributes").contains(fc.tool_param("search_term", StringType)))
    )
)

session.catalog.create_tool(
    tool_name="search_entities",
    tool_description="Search current entities by attributes",
    tool_query=entity_search,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="search_term",
            description="Term to search in entity attributes"
        )
    ]
)

The catalog validates parameters automatically and handles query execution.

Run the MCP Server

Expose tools through Typedef's MCP server implementation. The server makes catalog operations available to LangGraph agents over HTTP.

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig

# Get user-defined tools from catalog
tools = session.catalog.list_tools()

# Create server with both custom and system tools
server = create_mcp_server(
    session,
    "LangGraph Memory Server",
    user_defined_tools=tools,
    system_tools=SystemToolConfig(
        table_names=["conversations", "entities"],
        tool_namespace="memory",
        max_result_rows=100
    ),
    concurrency_limit=10
)

# Run server
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,
    port=8000,
    host="127.0.0.1"
)

System tools provide automatic operations:

  • Schema inspection
  • Data profiling
  • SQL queries

Setting stateless_http=False maintains session state across requests.

For production deployments, use ASGI:

python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server,
    stateless_http=False,
    path="/mcp"
)

# Deploy with: uvicorn server:app --workers 4 --port 8000

Build the LangGraph Agent

Create a LangGraph agent that integrates with the memory system. The agent uses tools to maintain context across interactions.

python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
import operator
import requests

# Define agent state with memory tracking
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    conversation_id: str
    current_version: int
    memory_context: dict
    entity_cache: dict

# Wrap memory operations as LangGraph tools. These POST to a plain HTTP
# facade over the MCP tools; adapt the endpoint paths to however your MCP
# client or gateway exposes them.
@tool
def retrieve_memory(conversation_id: str, limit: int = 10) -> dict:
    """Retrieve current conversation memory."""
    response = requests.post(
        "http://localhost:8000/tools/get_current_memory",
        json={"conversation_id": conversation_id, "limit": limit},
        timeout=30
    )
    return response.json()

@tool
def retrieve_version(conversation_id: str, version: int) -> dict:
    """Retrieve a specific memory version."""
    response = requests.post(
        "http://localhost:8000/tools/get_memory_version",
        json={"conversation_id": conversation_id, "version": version},
        timeout=30
    )
    return response.json()

@tool
def search_entities(search_term: str) -> dict:
    """Search for entities in memory."""
    response = requests.post(
        "http://localhost:8000/tools/search_entities",
        json={"search_term": search_term},
        timeout=30
    )
    return response.json()

@tool
def store_memory(conversation_id: str, role: str, content: str, metadata: str = "{}") -> dict:
    """Store a new memory entry with versioning."""
    version = write_conversation_turn(conversation_id, role, content, metadata)
    return {"version": version, "status": "stored"}

# Bind tools to LLM
llm = ChatOpenAI(model="gpt-4o")
tools = [retrieve_memory, retrieve_version, search_entities, store_memory]
llm_with_tools = llm.bind_tools(tools)

Implement the Agent Graph

Build the LangGraph state machine that orchestrates memory operations alongside decision-making.

python
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, SystemMessage

def load_memory(state: AgentState):
    """Load relevant memory before processing."""
    conversation_id = state["conversation_id"]

    # Retrieve current memory
    memory_result = retrieve_memory.invoke({
        "conversation_id": conversation_id,
        "limit": 20
    })

    return {
        "memory_context": memory_result,
        "messages": state["messages"]
    }

def call_model(state: AgentState):
    """Invoke LLM with memory context."""
    messages = state["messages"]
    memory = state.get("memory_context", {})

    # Inject memory context as a system message
    memory_context = f"Previous conversation history: {memory}"
    enhanced_messages = [SystemMessage(content=memory_context)] + list(messages)

    response = llm_with_tools.invoke(enhanced_messages)
    return {"messages": [response]}

def execute_tools(state: AgentState):
    """Execute tool calls from the LLM."""
    last_message = state["messages"][-1]
    tool_outputs = []

    # Dispatch through a registry so unknown tool names fail loudly
    tool_registry = {t.name: t for t in tools}

    for tool_call in last_message.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        try:
            if tool_name not in tool_registry:
                raise ValueError(f"Unknown tool: {tool_name}")
            result = tool_registry[tool_name].invoke(tool_args)

            tool_outputs.append(
                ToolMessage(
                    content=str(result),
                    tool_call_id=tool_call["id"]
                )
            )
        except Exception as e:
            tool_outputs.append(
                ToolMessage(
                    content=f"Error: {str(e)}",
                    tool_call_id=tool_call["id"]
                )
            )

    return {"messages": tool_outputs}

def save_interaction(state: AgentState):
    """Save the interaction to versioned memory."""
    conversation_id = state["conversation_id"]
    last_message = state["messages"][-1]

    # Store agent response
    if isinstance(last_message, AIMessage):
        version = write_conversation_turn(
            conversation_id,
            "assistant",
            last_message.content
        )
        return {"current_version": version}

    return {}

def should_continue(state: AgentState):
    """Determine if more tool calls are needed."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "continue"
    return "save"

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("load_memory", load_memory)
workflow.add_node("agent", call_model)
workflow.add_node("tools", execute_tools)
workflow.add_node("save", save_interaction)

# Define edges
workflow.set_entry_point("load_memory")
workflow.add_edge("load_memory", "agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "tools",
        "save": "save"
    }
)
workflow.add_edge("tools", "agent")
workflow.add_edge("save", END)

# Compile
app = workflow.compile()

The graph loads memory before each interaction and saves results afterward, maintaining version history.
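
LangGraph can render the compiled topology, which is a quick way to verify the wiring:

python
# Emit a Mermaid diagram of load_memory -> agent -> tools/save -> END
print(app.get_graph().draw_mermaid())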

Execute the Agent

Run the agent with automatic memory management.

python
from typing import Optional
import uuid

def run_agent(user_input: str, conversation_id: Optional[str] = None):
    """Run agent with versioned memory."""

    # Generate conversation ID if not provided
    if conversation_id is None:
        conversation_id = f"conv-{uuid.uuid4()}"

    # Store user input
    write_conversation_turn(conversation_id, "user", user_input)

    # Initialize state
    initial_state = {
        "messages": [HumanMessage(content=user_input)],
        "conversation_id": conversation_id,
        "current_version": 0,
        "memory_context": {},
        "entity_cache": {}
    }

    # Execute agent
    result = app.invoke(initial_state)

    # Extract response
    final_message = result["messages"][-1]
    version = result.get("current_version", 0)

    return {
        "response": final_message.content,
        "conversation_id": conversation_id,
        "version": version
    }

# Usage
response = run_agent(
    "What did we discuss about pricing last week?",
    conversation_id="conv-12345"
)

print(f"Response: {response['response']}")
print(f"Version: {response['version']}")

The agent retrieves relevant history, processes the query, and stores the interaction with version tracking.
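
Because the conversation_id is returned, follow-up turns can reuse it and build on the stored context:

python
follow_up = run_agent(
    "Summarize the pricing options we discussed.",
    conversation_id=response["conversation_id"]
)
print(follow_up["response"])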

Query Memory Across Versions

Access historical versions to analyze how context evolved over time.

python
def get_version_history(conversation_id: str):
    """Retrieve all versions for a conversation."""

    history = session.table("conversations").filter(
        fc.col("conversation_id") == conversation_id
    ).select(
        fc.col("version"),
        fc.col("timestamp"),
        fc.col("role"),
        fc.col("content"),
        fc.col("is_current")
    ).sort(
        fc.col("version").asc()
    )

    return history.to_pydict()

def compare_versions(conversation_id: str, v1: int, v2: int):
    """Compare two memory versions."""

    version1 = session.table("conversations").filter(
        (fc.col("conversation_id") == conversation_id) &
        (fc.col("version") == v1)
    ).to_pydict()

    version2 = session.table("conversations").filter(
        (fc.col("conversation_id") == conversation_id) &
        (fc.col("version") == v2)
    ).to_pydict()

    # to_pydict returns a dict of column lists, so compare column lengths
    return {
        "version1": version1,
        "version2": version2,
        "comparison": {
            "entries_added": len(version2["content"]) - len(version1["content"]),
            "time_delta": version2["timestamp"][0] - version1["timestamp"][0]
        }
    }

# Analyze version history (to_pydict returns a dict of column lists)
history = get_version_history("conv-12345")
print(f"Total versions: {len(set(history['version']))}")

# Compare specific versions
comparison = compare_versions("conv-12345", v1=1, v2=5)
print(f"Entries added: {comparison['comparison']['entries_added']}")

Version queries enable temporal analysis, debugging agent behavior, and reasoning about how context influenced decisions.
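
A rollback is just another versioned write: re-store an earlier turn's content as the newest version, preserving the audit trail. A hypothetical helper built on write_conversation_turn:

python
def rollback_to_version(conversation_id: str, target_version: int) -> int:
    """Re-store an earlier version's content as the new current version."""
    snapshot = session.table("conversations").filter(
        (fc.col("conversation_id") == conversation_id) &
        (fc.col("version") == target_version)
    ).to_pydict()

    # to_pydict returns a dict of column lists; assumes the version exists
    return write_conversation_turn(
        conversation_id,
        snapshot["role"][0],
        snapshot["content"][0],
        snapshot["metadata"][0]
    )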

Semantic Memory Search

Combine semantic operations with versioned memory for intelligent retrieval based on meaning rather than exact matches.

python
from pydantic import BaseModel
from typing import List

class MemoryRelevance(BaseModel):
    is_relevant: bool
    relevance_score: float
    key_topics: List[str]

def semantic_memory_search(query: str, conversation_id: str, threshold: float = 0.7):
    """Search memory using semantic similarity."""

    # Get current memory
    current_memory = session.table("conversations").filter(
        (fc.col("conversation_id") == conversation_id) &
        (fc.col("is_current") == True)
    )

    # Use semantic predicate to filter relevant entries
    relevant = current_memory.filter(
        fc.semantic.predicate(
            f"Is this memory relevant to the query: {query}? Content: {{{{content}}}}",
            content=fc.col("content")
        )
    )

    # Extract relevance information
    analyzed = relevant.select(
        fc.col("*"),
        fc.semantic.extract(
            fc.col("content"),
            MemoryRelevance
        ).alias("relevance")
    ).unnest("relevance")

    # Filter by threshold
    filtered = analyzed.filter(
        fc.col("relevance_score") >= threshold
    ).sort(
        fc.col("relevance_score").desc()
    )

    return filtered.to_pydict()

# Use semantic search
relevant_memories = semantic_memory_search(
    "pricing discussions from last week",
    conversation_id="conv-12345",
    threshold=0.8
)

Semantic search retrieves contextually relevant memories even when exact keywords don't match.

Monitor Memory Operations

Track memory usage and agent performance using Typedef's metrics system.

python
def analyze_memory_metrics():
    """Analyze memory system performance."""

    metrics_df = session.table("fenic_system.query_metrics")

    # Memory operation statistics
    memory_stats = session.sql("""
        SELECT
            DATE(end_ts) as date,
            COUNT(*) as total_operations,
            SUM(total_lm_requests) as llm_calls,
            SUM(total_lm_cost) as cost_usd,
            AVG(latency_ms) as avg_latency_ms
        FROM {metrics}
        WHERE query_text LIKE '%conversations%'
           OR query_text LIKE '%entities%'
        GROUP BY DATE(end_ts)
        ORDER BY date DESC
        LIMIT 30
    """, metrics=metrics_df)

    memory_stats.show()

    # Version distribution
    version_dist = session.table("conversations").group_by(
        fc.col("conversation_id")
    ).agg(
        fc.max(fc.col("version")).alias("max_version"),
        fc.count("*").alias("total_entries")
    ).sort(
        fc.col("max_version").desc()
    )

    version_dist.show(10)

# Run analytics
analyze_memory_metrics()

Metrics help optimize memory operations, identify bottlenecks, and control costs.

Production Deployment Strategies

Multi-Tier Memory Architecture

Implement different memory tiers for different retention policies.

python
def create_tiered_memory():
    """Create memory system with multiple tiers."""

    # Short-term memory (last 24 hours)
    short_term_schema = Schema([
        ColumnField("conversation_id", StringType),
        ColumnField("timestamp", TimestampType),
        ColumnField("content", StringType),
        ColumnField("expiry", TimestampType)
    ])

    session.catalog.create_table(
        "short_term_memory",
        short_term_schema,
        description="Recent interactions with automatic expiry"
    )

    # Long-term memory (consolidated summaries)
    long_term_schema = Schema([
        ColumnField("conversation_id", StringType),
        ColumnField("version", IntegerType),
        ColumnField("timestamp", TimestampType),
        ColumnField("summary", StringType),
        ColumnField("key_facts", StringType)
    ])

    session.catalog.create_table(
        "long_term_memory",
        long_term_schema,
        description="Consolidated conversation summaries"
    )

def consolidate_to_long_term(conversation_id: str):
    """Consolidate short-term memory into long-term storage."""

    # Get recent interactions
    recent = session.table("short_term_memory").filter(
        fc.col("conversation_id") == conversation_id
    )

    # Generate summary using semantic reduce
    consolidated = recent.group_by(
        fc.col("conversation_id")
    ).agg(
        fc.semantic.reduce(
            "Create a comprehensive summary of these interactions, extracting key facts and decisions",
            fc.col("content")
        ).alias("summary")
    )

    # Store in long-term memory
    consolidated.write.save_as_table("long_term_memory", mode="append")
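
The expiry column above is only useful with a pruning pass. A minimal sketch, assuming save_as_table accepts mode="overwrite" as in the versioned writes earlier:

python
from datetime import datetime

def prune_short_term():
    """Drop expired rows by rewriting the short-term table."""
    fresh = session.table("short_term_memory").filter(
        fc.col("expiry") > fc.lit(datetime.now())
    ).to_pydict()
    session.create_dataframe(fresh).write.save_as_table(
        "short_term_memory", mode="overwrite"
    )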

Cloud Execution

Scale the memory system with cloud execution.

python
from fenic.api.session.config import CloudConfig, CloudExecutorSize

production_config = SessionConfig(
    app_name="production_agent_memory",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=500,
                tpm=500000
            )
        }
    ),
    cloud=CloudConfig(
        size=CloudExecutorSize.LARGE
    )
)

# Same code runs locally and in cloud
production_session = Session.get_or_create(production_config)

The same code scales from laptop to production without modifications.

Best Practices

Version Management

Keep version numbers sequential and track them consistently. Use the is_current flag for fast current-state queries while maintaining complete history.
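
A single aggregation gives the latest version per conversation, which is useful for consistency checks:

python
latest = session.table("conversations").group_by(
    fc.col("conversation_id")
).agg(
    fc.max(fc.col("version")).alias("latest_version")
)
latest.show()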

Memory Pruning

Implement retention policies to manage storage growth. Archive old versions to cold storage while keeping recent versions in hot storage.

python
def archive_old_versions(conversation_id: str, keep_versions: int = 10):
    """Archive versions beyond the retention policy."""

    convo = session.table("conversations").filter(
        fc.col("conversation_id") == conversation_id
    )

    max_version = convo.select(
        fc.coalesce(fc.max(fc.col("version")), fc.lit(0)).alias("max_version")
    ).to_pydict()["max_version"][0]

    # Keep the newest N versions; everything at or below the cutoff is archived
    cutoff = max_version - keep_versions
    if cutoff <= 0:
        return

    archived = convo.filter(fc.col("version") <= cutoff)
    archived.write.save_as_table("conversations_archive", mode="append")
    # Removing archived rows from the hot table would follow the same
    # materialize-and-overwrite pattern used in write_entity above.

Error Handling

Wrap all memory operations in try/except blocks to handle network failures, timeouts, and constraint violations.
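
A minimal sketch of that pattern around a memory write:

python
def safe_store(conversation_id: str, role: str, content: str):
    """Return the new version number, or None if the write fails."""
    try:
        return write_conversation_turn(conversation_id, role, content)
    except Exception as exc:  # network failures, timeouts, constraint violations
        print(f"memory write failed: {exc}")
        return None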

Consistency

Use timestamps consistently across all memory tables to enable accurate temporal queries and version comparisons.
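
One simple way to enforce this is a single timestamp source used by every writer (a sketch):

python
from datetime import datetime, timezone

def utc_now() -> datetime:
    """All memory tables stamp rows from this one clock, always UTC."""
    return datetime.now(timezone.utc)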

Testing

Test version rollback scenarios and ensure agents handle missing or corrupted memory appropriately.
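
A sketch of such a test with pytest, exercising a brand-new conversation that has no prior memory:

python
import uuid

def test_agent_handles_empty_memory():
    result = run_agent("Hello", conversation_id=f"conv-{uuid.uuid4()}")
    assert result["response"]  # agent should answer without prior history
    assert result["conversation_id"].startswith("conv-")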

Support Agent Implementation

Build a customer support agent with versioned memory.

python
import json
from pydantic import BaseModel
from typing import List

class CustomerContext(BaseModel):
    customer_tier: str
    previous_issues: List[str]
    sentiment: str

def build_support_agent():
    """Build a customer support agent with versioned memory."""

    @tool
    def analyze_customer_history(customer_id: str) -> dict:
        """Analyze customer interaction history."""

        history = session.table("conversations").filter(
            (fc.col("conversation_id").contains(customer_id)) &
            (fc.col("is_current") == True)
        )

        # Extract structured context
        analyzed = history.select(
            fc.col("*"),
            fc.semantic.extract(
                fc.col("content"),
                CustomerContext
            ).alias("context")
        ).unnest("context")

        return analyzed.to_pydict()

    @tool
    def escalate_issue(conversation_id: str, reason: str) -> dict:
        """Escalate issue and update memory."""

        # Record the escalation with structured metadata
        version = write_conversation_turn(
            conversation_id,
            "system",
            f"Issue escalated: {reason}",
            metadata=json.dumps({"escalated": True, "reason": reason})
        )

        return {"status": "escalated", "version": version}

    # Build agent with memory tools
    support_tools = [analyze_customer_history, escalate_issue, retrieve_memory, store_memory]
    support_llm = llm.bind_tools(support_tools)

    return support_llm

# Use the agent
support_agent = build_support_agent()
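
Because bind_tools returns a plain model rather than an executor, invoking it yields only the first model turn, possibly containing tool calls; wire it into the graph above to actually execute them. A quick smoke test (the customer ID is made up):

python
reply = support_agent.invoke(
    [{"role": "user", "content": "Customer acct-42 reports a double charge."}]
)
print(reply.tool_calls or reply.content)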

Summary

The combination of versioned memory and agent orchestration enables AI applications that learn from history and maintain context across extended interactions: the persistent catalog supplies durable, versioned state, MCP tools expose it, and LangGraph decides when agents read and write it.
