How to Give Mastra Pipelines Durable, Queryable Memory Using Typedef's Catalog

Typedef Team

AI agents need persistent memory to maintain context across sessions and make informed decisions. While Mastra provides an excellent TypeScript framework for building agentic workflows, pairing it with Typedef's Catalog through the Model Context Protocol (MCP) creates a production-grade memory layer with SQL queryability and DataFrame operations.

This guide demonstrates how to build a memory-backed Mastra pipeline using Typedef's persistent catalog as the storage layer.

Overview of the Architecture

Mastra runs in TypeScript and manages agent workflows, while Typedef's Fenic provides Python-based DataFrame operations with a persistent catalog. The integration works through MCP servers that expose Typedef's catalog operations as tools that Mastra agents can call.

Key Components

Typedef's Catalog - Persistent storage system for structured data with full SQL support

  • Tables persist across sessions in local or cloud storage
  • Built-in metadata management and descriptions
  • Automatic type handling and schema evolution

Fenic MCP Server - Exposes catalog operations as callable tools

  • Read, write, and query operations
  • Profile and analyze data
  • Search across text columns

Mastra Workflows - Orchestrate agent tasks with memory persistence

  • Call MCP tools to store and retrieve data
  • Maintain conversation context
  • Build deterministic multi-step pipelines

Setting Up Typedef's Catalog

Start by installing Fenic and configuring a session with persistent storage.

bash
pip install --upgrade fenic

Create a Python file for your MCP server (fenic_server.py):

python
from fenic.api.session.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig
from pathlib import Path

# Configure session with persistent storage
config = SessionConfig(
    app_name="mastra_memory",
    db_path=Path("./memory.db"),  # Persistent local database
    semantic=SemanticConfig(
        language_models={
            "openai": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "openai": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# Create catalog structure
session.catalog.create_database("agent_memory")
session.catalog.set_current_database("agent_memory")
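
Because the catalog lives in the local memory.db file, a later run (or a separate process) can reattach with the same config and read whatever earlier runs stored. A minimal sketch of that durability check, using only the calls shown above:

python
# In a later run: reattach to the same on-disk catalog
session = Session.get_or_create(config)
session.catalog.set_current_database("agent_memory")

# Tables written by previous sessions are immediately queryable,
# e.g. the "conversations" table created in the next section
session.table("conversations").show()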

Creating Memory Tables

Define tables to store different types of agent memory.

python
from fenic.core.types.datatypes import StringType, IntegerType, TimestampType
from fenic.core.types.schema import Schema, ColumnField

# Conversation history table
conversation_schema = Schema([
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("metadata", StringType)
])

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Stores all agent conversation history with timestamps and metadata"
)

# Entity memory table
entity_schema = Schema([
    ColumnField("entity_id", StringType),
    ColumnField("entity_type", StringType),
    ColumnField("name", StringType),
    ColumnField("attributes", StringType),
    ColumnField("last_updated", TimestampType)
])

session.catalog.create_table(
    "entities",
    entity_schema,
    description="Stores extracted entities and their attributes from conversations"
)

# Task memory table
task_schema = Schema([
    ColumnField("task_id", StringType),
    ColumnField("description", StringType),
    ColumnField("status", StringType),
    ColumnField("created_at", TimestampType),
    ColumnField("completed_at", TimestampType)
])

session.catalog.create_table(
    "tasks",
    task_schema,
    description="Tracks tasks mentioned or created during agent interactions"
)
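
Before wiring these tables into tools, confirm they registered. The same session.catalog.list_tables() call the MCP server setup relies on later works as a quick sanity check:

python
# Verify the three memory tables exist in the current database
print(session.catalog.list_tables())
# Expected to include: conversations, entities, tasks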

Writing Data to the Catalog

Store conversation data and context using Fenic DataFrames.

python
from datetime import datetime

def store_conversation_turn(conversation_id, role, content, metadata=None):
    """Store a single conversation turn in the catalog."""
    df = session.create_dataframe({
        "conversation_id": [conversation_id],
        "timestamp": [datetime.now()],
        "role": [role],
        "content": [content],
        "metadata": [metadata or "{}"]
    })

    df.write.save_as_table("conversations", mode="append")

def store_entity(entity_id, entity_type, name, attributes):
    """Store or update an entity in memory."""
    df = session.create_dataframe({
        "entity_id": [entity_id],
        "entity_type": [entity_type],
        "name": [name],
        "attributes": [attributes],
        "last_updated": [datetime.now()]
    })

    df.write.save_as_table("entities", mode="append")
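
A quick usage sketch for these helpers; the IDs and JSON strings below are illustrative placeholders:

python
# Persist one user turn and an entity it mentions
store_conversation_turn(
    conversation_id="user-123-session-456",
    role="user",
    content="Schedule a follow-up call with Acme Corp next Tuesday.",
    metadata='{"channel": "web"}'
)

store_entity(
    entity_id="acme-corp",
    entity_type="organization",
    name="Acme Corp",
    attributes='{"relationship": "customer"}'
)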

Creating Queryable Tools

Register catalog-backed tools that Mastra can call through MCP.

python
from fenic.core.mcp.types import ToolParam
import fenic.api.functions.core as fc
from fenic.core.types.datatypes import StringType, IntegerType

# Tool to retrieve recent conversations
recent_conversations = (
    session.table("conversations")
    .filter(
        fc.col("conversation_id") == fc.tool_param("conversation_id", StringType)
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.coalesce(fc.tool_param("limit", IntegerType), fc.lit(10)))
)

session.catalog.create_tool(
    tool_name="get_recent_conversations",
    tool_description="Retrieves recent conversation history for a specific conversation ID",
    tool_query=recent_conversations,
    result_limit=100,
    tool_params=[
        ToolParam(
            name="conversation_id",
            description="The conversation ID to retrieve history for"
        ),
        ToolParam(
            name="limit",
            description="Maximum number of messages to retrieve",
            has_default=True,
            default_value=10
        )
    ]
)

# Tool to search for entities
entity_search = (
    session.table("entities")
    .filter(
        fc.col("name").contains(fc.tool_param("search_term", StringType)) |
        fc.col("attributes").contains(fc.tool_param("search_term", StringType))
    )
)

session.catalog.create_tool(
    tool_name="search_entities",
    tool_description="Search for entities by name or attributes",
    tool_query=entity_search,
    result_limit=50,
    tool_params=[
        ToolParam(
            name="search_term",
            description="Term to search for in entity names or attributes"
        )
    ]
)
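
You can exercise the same query logic directly in Python before exposing it over MCP. Here is a sketch that mirrors get_recent_conversations with the parameters filled in as concrete values:

python
# Ad-hoc check: run the filter the tool encodes, with literal values
(
    session.table("conversations")
    .filter(fc.col("conversation_id") == fc.lit("user-123-session-456"))
    .sort(fc.col("timestamp").desc())
    .limit(10)
    .show()
)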

Running the MCP Server

Launch the Fenic MCP server to expose catalog tools.

python
# Get all tools from catalog
tools = session.catalog.list_tools()

# Create MCP server with system tools for automatic operations
server = create_mcp_server(
    session,
    "Mastra Memory Server",
    user_defined_tools=tools,
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="memory",
        max_result_rows=100
    ),
    concurrency_limit=10
)

# Run server on HTTP
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,  # Keep state across requests
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Alternatively, use the CLI for quick server startup:

bash
fenic-serve --transport http --port 8000 --host 127.0.0.1 --tools get_recent_conversations search_entities

Connecting Mastra to the MCP Server

In your Mastra application, configure MCP tool integration to call the Fenic server.

tsx
import { Agent } from '@mastra/core/agent';
import { createTool } from '@mastra/core/tools';
import { z } from 'zod';

// MCP client tool that calls Fenic server
const storeConversationTool = createTool({
  id: 'store-conversation',
  description: 'Stores conversation history in persistent memory',
  inputSchema: z.object({
    conversation_id: z.string(),
    role: z.enum(['user', 'assistant']),
    content: z.string(),
    metadata: z.string().optional()
  }),
  execute: async ({ context }) => {
    // NOTE: this is a simplified sketch that POSTs JSON directly to the
    // endpoint. A production integration should use a real MCP client
    // (for example Mastra's @mastra/mcp package), which handles the MCP
    // handshake and message framing for you.
    const response = await fetch('http://127.0.0.1:8000/mcp', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        tool: 'memory_analyze',
        params: {
          // WARNING: interpolating user content into SQL like this is
          // vulnerable to SQL injection; escape or parameterize the values
          // before using this in production.
          query: `INSERT INTO conversations VALUES ('${context.conversation_id}', NOW(), '${context.role}', '${context.content}', '${context.metadata || "{}"}')`
        }
      })
    });

    return await response.json();
  }
});

const retrieveMemoryTool = createTool({
  id: 'retrieve-memory',
  description: 'Retrieves conversation history from persistent memory',
  inputSchema: z.object({
    conversation_id: z.string(),
    limit: z.number().default(10)
  }),
  execute: async ({ context }) => {
    const response = await fetch('http://127.0.0.1:8000/mcp', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        tool: 'get_recent_conversations',
        params: {
          conversation_id: context.conversation_id,
          limit: context.limit
        }
      })
    });

    const data = await response.json();
    // The response shape depends on the server transport; adjust as needed.
    return data.results;
  }
});

Building a Memory-Aware Mastra Agent

Create an agent that uses persistent memory for context.

tsx
import { openai } from '@ai-sdk/openai';

const memoryAgent = new Agent({
  name: 'Memory Agent',
  instructions: `
    You are an agent with access to persistent memory.
    Always store important conversation details and retrieve relevant context.
    Use the store-conversation tool after each interaction.
    Use the retrieve-memory tool to recall previous conversations.
  `,
  model: openai('gpt-4-turbo'),
  tools: {
    storeConversation: storeConversationTool,
    retrieveMemory: retrieveMemoryTool
  }
});

// Use the agent with automatic memory persistence
const conversationId = 'user-123-session-456';

const response = await memoryAgent.generate(
  'What did we discuss last time?',
  {
    context: {
      conversation_id: conversationId
    }
  }
);

Advanced Pattern: Semantic Memory Search

Combine Typedef's semantic operations with catalog storage for intelligent memory retrieval.

python
# Enable semantic operations in session config
from pathlib import Path

from fenic.api.session.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.functions import semantic
import fenic.api.functions.core as fc

config = SessionConfig(
    app_name="mastra_memory",
    db_path=Path("./memory.db"),
    semantic=SemanticConfig(
        language_models={
            "openai": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "openai": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="openai",
        default_embedding_model="openai"
    )
)

session = Session.get_or_create(config)

# Create semantic search tool
conversations_df = session.table("conversations")

# Semantic clustering for conversation topics
# First, add embeddings to the conversations
conversations_with_embeddings = conversations_df.with_column(
    "content_embeddings",
    semantic.embed(fc.col("content"))
)

# Cluster conversations by semantic similarity
clustered = conversations_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("content_embeddings"),
    num_clusters=5,
    label_column="cluster_label"
)

clustered.write.save_as_table("conversation_topics", mode="overwrite")

# Note: For semantic similarity search with dynamic query parameters,
# you would need to compute embeddings at query time, which requires
# a different approach than declarative DataFrame tools.
# Consider using the system tools with SQL queries that reference
# pre-computed embedding columns, or implement custom search logic
# in your MCP tool handler.
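
Once the clustered table is written, the same session.sql placeholder pattern used in the metrics section below gives a quick view of how stored turns distribute across topics. A sketch, assuming the conversation_topics table created above:

python
# Count conversation turns per semantic cluster
topics = session.table("conversation_topics")

cluster_counts = session.sql("""
    SELECT cluster_label, COUNT(*) AS num_messages
    FROM {topics}
    GROUP BY cluster_label
    ORDER BY num_messages DESC
""", topics=topics)

cluster_counts.show()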

Monitoring and Analytics

Track agent memory usage with Typedef's built-in metrics.

python
# Query metrics to analyze memory operations
metrics_df = session.table("fenic_system.query_metrics")

memory_stats = session.sql("""
    SELECT
        DATE(end_ts) as date,
        COUNT(*) as total_queries,
        SUM(total_lm_requests) as llm_calls,
        SUM(total_lm_cost) as total_cost
    FROM {metrics}
    WHERE query_text LIKE '%conversations%'
    GROUP BY DATE(end_ts)
    ORDER BY date DESC
""", metrics=metrics_df)

memory_stats.show()

Production Deployment

For production environments, use ASGI deployment with state management.

python
from fenic.api.mcp.server import run_mcp_server_asgi

# Create ASGI app
app = run_mcp_server_asgi(
    server,
    stateless_http=False,
    port=8000,
    host="0.0.0.0",
    path="/mcp"
)

# Deploy with uvicorn
# uvicorn fenic_server:app --host 0.0.0.0 --port 8000
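
If you prefer launching from Python rather than the uvicorn CLI, a minimal sketch (assuming uvicorn is installed in the environment):

python
import uvicorn

if __name__ == "__main__":
    # Equivalent to: uvicorn fenic_server:app --host 0.0.0.0 --port 8000
    uvicorn.run(app, host="0.0.0.0", port=8000)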

Benefits of This Architecture

Durability - Catalog data persists across agent restarts and deployments

Queryability - Full SQL support for advanced memory queries and analytics

Type Safety - Schema enforcement ensures data consistency

Scalability - Cloud execution support for distributed processing

Observability - Built-in metrics tracking for all memory operations

Semantic Operations - Native support for embeddings and semantic search

Wrapping Up

By combining Mastra's workflow orchestration with Typedef's persistent catalog, you build AI agents with production-grade memory that scales from prototype to production.
