<< goback()

How to Augment LangChain Memory with Fenic's Persistent Data Abstractions

Typedef Team

How to Augment LangChain Memory with Fenic's Persistent Data Abstractions

LangChain agents need persistent memory that survives restarts, supports semantic search, and enables SQL queries. Fenic provides this through its catalog system—a persistent, queryable storage layer built on DataFrame abstractions.

This guide shows how to build production-grade memory for LangChain using Fenic's catalog, semantic operators, and Model Context Protocol (MCP) integration.

The Memory Problem in LangChain

LangChain agents face three critical memory limitations:

  • No persistence - Conversation history disappears on restart
  • Unstructured storage - Plain text or JSON blobs make queries like "find Q3 pricing discussions" impossible
  • Limited semantic capabilities - Finding conversations by meaning requires separate vector stores and custom pipelines

Fenic solves this with catalog tables that provide:

  • Persistent storage across sessions (local or cloud)
  • Full SQL query support for temporal and relational analysis
  • Native semantic operations (embeddings, similarity search, clustering)
  • Type-safe schemas with automatic validation

Catalog Architecture for Persistent Memory

The catalog provides persistent, queryable storage through three core components:

Database organization - Multiple databases, each containing tables and views for logical separation of memory types (conversations, entities, tasks).

Schema enforcement - Typed columns ensure data consistency and enable efficient querying without parsing unstructured text.

SQL interface - Query memory using standard SQL operations: filters, joins, aggregations, window functions.

Unlike temporary DataFrames that exist only during execution, catalog tables persist in local SQLite databases or cloud storage.

Setting Up Persistent Storage

Install Fenic:

bash
pip install fenic

Configure a session with persistent storage:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, OpenAIEmbeddingModel
from pathlib import Path

config = SessionConfig(
    app_name="langchain_memory",
    db_path=Path("./agent_memory.db"),
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

The db_path parameter creates a SQLite database file. All catalog data persists here across application restarts.

Creating Memory Schemas

Define schemas for different memory types:

python
from fenic.core.types.datatypes import StringType, TimestampType
from fenic.core.types.schema import Schema, ColumnField

# Conversation history
conversation_schema = Schema([
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("metadata", StringType)
])

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Agent conversation history with timestamps"
)

# Entity memory
entity_schema = Schema([
    ColumnField("entity_id", StringType),
    ColumnField("entity_type", StringType),
    ColumnField("name", StringType),
    ColumnField("attributes", StringType),
    ColumnField("last_updated", TimestampType)
])

session.catalog.create_table(
    "entities",
    entity_schema,
    description="Extracted entities from conversations"
)

Schema benefits:

  • Type safety prevents data corruption
  • Explicit structure makes queries predictable
  • Descriptions provide self-documentation
  • Schema evolution supports adding columns without migration

Writing Memory Data

Store conversation turns using DataFrame writes:

python
from datetime import datetime
import fenic.api.functions as fc

def store_conversation_turn(conversation_id, role, content, metadata=None):
    df = session.create_dataframe([{
        "conversation_id": conversation_id,
        "timestamp": datetime.now(),
        "role": role,
        "content": content,
        "metadata": metadata or "{}"
    }])

    df.write.save_as_table("conversations", mode="append")

def store_entity(entity_id, entity_type, name, attributes):
    df = session.create_dataframe([{
        "entity_id": entity_id,
        "entity_type": entity_type,
        "name": name,
        "attributes": attributes,
        "last_updated": datetime.now()
    }])

    df.write.save_as_table("entities", mode="append")

Write modes:

  • mode="append" - Add new rows
  • mode="overwrite" - Replace entire table
  • Upsert logic requires SQL-based implementation

Reading Memory from Catalog

Retrieve conversation history with DataFrame operations:

python
# Load catalog table
conversations_df = session.table("conversations")

# Filter by conversation ID
conversation_history = conversations_df.filter(
    fc.col("conversation_id") == "user-123-session-456"
).sort(
    fc.col("timestamp").asc()
)

# Get recent messages
recent_messages = conversation_history.limit(10)

# Convert to list
messages = recent_messages.select(
    fc.col("role"),
    fc.col("content")
).collect()

The collect() method returns Python dictionaries for LangChain message history population.

Semantic Operations on Memory

Embedding Generation

Add embeddings for similarity-based retrieval:

python
from fenic.api.functions import semantic

conversations_df = session.table("conversations")

conversations_with_embeddings = conversations_df.with_column(
    "content_embedding",
    semantic.embed(fc.col("content"))
)

conversations_with_embeddings.write.save_as_table(
    "conversations_with_embeddings",
    mode="overwrite"
)

Semantic Similarity Search

Find conversations by meaning:

python
from fenic.api.functions import embedding

# Generate query embedding
query = "pricing and enterprise features discussion"
query_df = session.create_dataframe([{"query": query}])
query_embedding = query_df.select(
    semantic.embed(fc.col("query")).alias("query_emb")
).collect()[0]["query_emb"]

# Find similar conversations
embedded_conversations = session.table("conversations_with_embeddings")

similar_conversations = embedded_conversations.select(
    fc.col("conversation_id"),
    fc.col("content"),
    fc.col("timestamp"),
    embedding.compute_similarity(
        fc.col("content_embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
).filter(
    fc.col("similarity_score") > 0.7
).sort(
    fc.col("similarity_score").desc()
).limit(5)

Results show the five most semantically relevant conversations.

Topic Clustering

Automatically group conversations:

python
embedded_df = session.table("conversations_with_embeddings")

clustered_conversations = embedded_df.semantic.with_cluster_labels(
    by=fc.col("content_embedding"),
    num_clusters=10,
    label_column="topic_cluster",
    centroid_column="cluster_centroid"
)

clustered_conversations.write.save_as_table(
    "conversation_topics",
    mode="overwrite"
)

# Analyze clusters
cluster_summary = session.table("conversation_topics").group_by(
    "topic_cluster"
).agg(
    fc.count("*").alias("message_count"),
    fc.first(fc.col("content")).alias("sample_message")
)

Clustering enables topic-based retrieval without manual categorization.

Creating MCP Tools for Memory Access

MCP tools expose Fenic operations to LangChain agents through a clean, callable interface.

Defining Memory Retrieval Tools

Create parameterized tools for common operations:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Recent conversation history tool
recent_conversations_query = (
    session.table("conversations")
    .filter(
        fc.col("conversation_id") == fc.tool_param("conversation_id", StringType)
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.coalesce(
        fc.tool_param("limit", IntegerType),
        fc.lit(10)
    ))
)

session.catalog.create_tool(
    tool_name="get_recent_conversations",
    tool_description="Retrieve recent conversation history for a session",
    tool_query=recent_conversations_query,
    result_limit=100,
    tool_params=[
        ToolParam(
            name="conversation_id",
            description="The conversation session ID"
        ),
        ToolParam(
            name="limit",
            description="Maximum number of messages to retrieve",
            has_default=True,
            default_value=10
        )
    ]
)

# Entity search tool
entity_search_query = (
    session.table("entities")
    .filter(
        fc.col("name").contains(fc.tool_param("search_term", StringType)) |
        fc.col("attributes").contains(fc.tool_param("search_term", StringType))
    )
)

session.catalog.create_tool(
    tool_name="search_entities",
    tool_description="Search entities by name or attributes",
    tool_query=entity_search_query,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="search_term",
            description="Term to search in entity names or attributes"
        )
    ]
)

Tool features:

  • Type-safe parameters with validation
  • Default values for optional parameters
  • Result limits to prevent overwhelming agents
  • Declarative queries optimize automatically

Launching the MCP Server

Expose tools through HTTP:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "LangChain Memory Server",
    user_defined_tools=tools,
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="memory",
        max_result_rows=100
    ),
    concurrency_limit=10
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

System tools automatically generate:

  • Schema inspection tools
  • SQL execution tools
  • Data profiling tools
  • Regex search across text columns

For more on MCP server configuration, see the Fenic 0.4.0 release.

Integrating with LangChain

Creating LangChain Tools

Wrap MCP calls in LangChain tool format:

python
from langchain.tools import Tool
import requests

def call_fenic_tool(tool_name: str, **params):
    response = requests.post(
        "http://127.0.0.1:8000/mcp",
        json={
            "tool": tool_name,
            "parameters": params
        }
    )
    return response.json()

retrieve_memory_tool = Tool(
    name="retrieve_memory",
    func=lambda conversation_id, limit=10: call_fenic_tool(
        "get_recent_conversations",
        conversation_id=conversation_id,
        limit=limit
    ),
    description="Retrieve conversation history from persistent memory"
)

search_entities_tool = Tool(
    name="search_entities",
    func=lambda search_term: call_fenic_tool(
        "search_entities",
        search_term=search_term
    ),
    description="Search for entities by name or attributes"
)

Building Memory-Aware Agents

Create an agent with persistent context:

python
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

llm = ChatOpenAI(model="gpt-4", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an agent with persistent memory access.

    Use retrieve_memory to recall previous conversations.
    Use search_entities to find information about people, companies, or topics.
    Always check memory before responding for context-aware answers.
    """),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

tools = [retrieve_memory_tool, search_entities_tool]
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

conversation_id = "user-123-session-456"
response = agent_executor.invoke({
    "input": "What did we discuss about pricing last week?",
    "chat_history": [],
    "conversation_id": conversation_id
})

print(response["output"])

The agent queries Fenic's memory layer automatically to retrieve context.

Advanced Memory Patterns

Temporal Queries

Analyze conversation patterns over time:

python
from fenic.api.functions import dt

conversations_df = session.table("conversations")

# Get last week's conversations grouped by day
recent_week = conversations_df.filter(
    fc.col("timestamp") >= dt.date_sub(dt.current_timestamp(), 7)
).group_by(
    dt.date_trunc("day", fc.col("timestamp")).alias("date")
).agg(
    fc.count("*").alias("message_count")
)

recent_week.show()

Temporal operations:

  • dt.date_trunc() - Round timestamps to hour/day/week/month
  • dt.date_sub() - Calculate date offsets
  • dt.date_add() - Add time intervals
  • Window functions for time-series analysis

Semantic Memory Joins

Link related conversations across sessions:

python
conversation_1 = conversations_df.filter(
    fc.col("conversation_id") == "session-1"
).select(
    fc.col("content").alias("session_1_content")
)

conversation_2 = conversations_df.filter(
    fc.col("conversation_id") == "session-2"
).select(
    fc.col("content").alias("session_2_content")
)

related_discussions = conversation_1.semantic.join(
    other=conversation_2,
    predicate="""
    Session 1: {{ left_on }}
    Session 2: {{ right_on }}

    These messages discuss the same topic or decision.
    """,
    left_on=fc.col("session_1_content"),
    right_on=fc.col("session_2_content")
)

Semantic joins find conversations about the same topic without exact keyword matches.

Structured Entity Extraction

Transform conversations into structured entities:

python
from pydantic import BaseModel, Field

class ConversationEntity(BaseModel):
    entity_type: str = Field(description="Type: person, company, product, topic")
    name: str = Field(description="Entity name")
    context: str = Field(description="How this entity was discussed")
    sentiment: str = Field(description="Sentiment: positive, neutral, negative")

conversations_df = session.table("conversations")

extracted_entities = conversations_df.select(
    fc.col("conversation_id"),
    fc.col("timestamp"),
    semantic.extract(
        fc.col("content"),
        response_format=ConversationEntity
    ).alias("entity")
).filter(
    fc.col("entity").is_not_null()
)

extracted_entities.select(
    fc.col("entity.entity_type").alias("entity_type"),
    fc.col("entity.name").alias("name"),
    fc.col("entity.context").alias("attributes"),
    fc.col("timestamp").alias("last_updated")
).write.save_as_table("entities", mode="append")

This automatically populates entity memory from conversation history.

For more on extraction patterns, see enhancing LangChain agents with semantic pipelines.

Memory Summarization

Compress old conversations to save context window space:

python
old_conversations = session.table("conversations").filter(
    fc.col("timestamp") < dt.date_sub(dt.current_timestamp(), 30)
).group_by("conversation_id").agg(
    semantic.reduce(
        "Summarize key points from this conversation history",
        fc.col("content"),
        order_by=fc.col("timestamp")
    ).alias("summary")
)

summary_schema = Schema([
    ColumnField("conversation_id", StringType),
    ColumnField("summary", StringType),
    ColumnField("summary_date", TimestampType)
])

session.catalog.create_table(
    "conversation_summaries",
    summary_schema,
    description="Compressed summaries of old conversations"
)

old_conversations.with_column(
    "summary_date",
    fc.lit(datetime.now())
).write.save_as_table("conversation_summaries", mode="append")

Agents retrieve summaries for old conversations instead of full message histories.

Production Optimization

Batch Memory Writes

Reduce overhead with batched writes:

python
def store_conversation_batch(turns):
    df = session.create_dataframe(turns)
    df.write.save_as_table("conversations", mode="append")

# Accumulate turns
batch = []
for turn in conversation_turns:
    batch.append({
        "conversation_id": conversation_id,
        "timestamp": turn.timestamp,
        "role": turn.role,
        "content": turn.content,
        "metadata": turn.metadata
    })

# Write periodically
if len(batch) >= 100:
    store_conversation_batch(batch)
    batch = []

Batch benefits:

  • Reduced I/O overhead
  • Better throughput for high-volume applications
  • Lower latency per write operation
  • Efficient transaction handling

Query Performance

Optimize queries with indexed columns:

python
# Fast: Filter on indexed conversation_id
fast_query = session.table("conversations").filter(
    fc.col("conversation_id") == "session-123"
)

# Slower: Full table scan on content
slow_query = session.table("conversations").filter(
    fc.col("content").contains("pricing")
)

Performance tips:

  • Primary keys get automatic indexes
  • Filter on indexed columns first
  • Use embeddings for semantic search instead of text scanning
  • Pre-compute embeddings during writes, not reads

Metrics and Monitoring

Track memory operations:

python
metrics_df = session.table("fenic_system.query_metrics")

memory_stats = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        COUNT(*) as query_count,
        SUM(total_lm_cost) as total_cost
    FROM {metrics}
    WHERE query_text LIKE '%conversations%'
    GROUP BY hour
    ORDER BY hour DESC
    LIMIT 24
""", metrics=metrics_df)

memory_stats.show()

Metrics include:

  • Query count and frequency
  • LLM token usage and costs
  • Query latency and execution time
  • Operator-level performance breakdown

Cloud Deployment

Scale with cloud execution:

python
from fenic.api.session.config import CloudConfig, CloudExecutorSize

config = SessionConfig(
    app_name="langchain_memory",
    cloud=CloudConfig(
        size=CloudExecutorSize.MEDIUM
    ),
    semantic=SemanticConfig(...)
)

Cloud features:

  • Automatic scaling based on workload
  • Distributed query execution
  • Shared catalog across instances
  • Same DataFrame API, different backend

Complete Implementation

End-to-end example:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, TimestampType
from fenic.core.types.schema import Schema, ColumnField
import fenic.api.functions as fc
from pathlib import Path
from datetime import datetime

config = SessionConfig(
    app_name="production_memory",
    db_path=Path("./memory.db"),
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

conversation_schema = Schema([
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("metadata", StringType)
])

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Persistent conversation history"
)

def store_conversation(conversation_id, role, content):
    df = session.create_dataframe([{
        "conversation_id": conversation_id,
        "timestamp": datetime.now(),
        "role": role,
        "content": content,
        "metadata": "{}"
    }])
    df.write.save_as_table("conversations", mode="append")

recent_conversations = (
    session.table("conversations")
    .filter(
        fc.col("conversation_id") == fc.tool_param("conversation_id", StringType)
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="get_memory",
    tool_description="Retrieve conversation memory",
    tool_query=recent_conversations,
    tool_params=[
        ToolParam(name="conversation_id", description="Session ID"),
        ToolParam(name="limit", description="Message limit", has_default=True, default_value=10)
    ]
)

tools = session.catalog.list_tools()
server = create_mcp_server(
    session,
    "Memory Server",
    user_defined_tools=tools
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,
    port=8000
)

This implementation provides:

  • Persistent storage with automatic schema validation
  • Semantic search capabilities for context-aware retrieval
  • MCP tools for LangChain integration
  • Production-ready error handling and metrics

Key Takeaways

Fenic's catalog system solves LangChain's memory persistence problem through:

  • Persistent tables - Data survives restarts and scales across distributed deployments
  • SQL queries - Full relational operations on conversation history
  • Semantic operations - Find relevant memory by meaning, not keywords
  • MCP integration - Clean interfaces between LangChain orchestration and memory layer
  • Type safety - Schema enforcement prevents data corruption

The catalog treats memory as structured data with semantic capabilities. This enables LangChain applications to maintain rich, queryable context from prototype to production without separate vector stores or custom state management.

For more patterns, see using Fenic as a data layer in LangChain and building reliable AI pipelines. How to Augment LangChain Me ... fcf080b7a614fd2120e6f1b6.md External Displaying How to Augment LangChain Memory with Fenic's Persi 28fdf41efcf080b7a614fd2120e6f1b6.md.

Share this page
the next generation of

data processingdata processingdata processing

Join us in igniting a new paradigm in data infrastructure. Enter your email to get early access and redefine how you build and scale data workflows with typedef.