<< goback()

How to Augment LangChain Agents with Typedef.ai Preprocessing Layers

Typedef Team

How to Augment LangChain Agents with Typedef.ai Preprocessing Layers

LangChain agents excel at reasoning and orchestration, but their effectiveness depends entirely on the quality and structure of data they can access. Raw unstructured data creates friction—agents waste tokens parsing formats, struggle with context window limits, and produce inconsistent results.

Typedef.ai solves this with Fenic, a PySpark-inspired DataFrame framework that provides semantic preprocessing infrastructure for AI applications. By handling data preparation, enrichment, and structuring before agents interact with it, Fenic transforms brittle LLM pipelines into production-grade systems.

The Data Quality Problem in Agent Workflows

LangChain agents face three core challenges when working with real-world data:

Unstructured inputs require preprocessing: Documents, transcripts, CSVs, and logs need extraction, cleaning, and normalization before agents can reason effectively. Without preprocessing, agents spend valuable context window space on format parsing instead of decision-making.

Context window limitations: Agents need pre-filtered, relevant data rather than entire datasets. Even with extended context windows, sending raw data to agents is inefficient and expensive.

Inconsistent tool interfaces: Custom Python functions for data access create maintenance overhead and lack type safety. Each function becomes a potential failure point in production.

What is Fenic

Fenic is an open-source DataFrame framework built specifically for AI workflows. It provides:

  • DataFrame operations with AI-native types for standard transformations that work alongside semantic functions
  • Semantic operators for extraction, classification, clustering, and semantic joins with automatic embedding and LLM call management
  • Model Context Protocol integration for declarative tool creation that exposes DataFrames as callable functions
  • Multi-provider inference with unified interfaces for OpenAI, Anthropic, Google, and other providers

The framework handles rate limiting, retries, and concurrent request batching automatically, allowing developers to focus on pipeline logic rather than infrastructure.

Setting Up Fenic for LangChain Integration

Install Fenic and configure a session with your preferred language models:

python
pip install fenic

Create a session with semantic capabilities:

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

Core Semantic Operations for Data Preparation

Semantic Extraction

Transform unstructured text into typed data structures using Pydantic models. This operation converts messy documents into queryable structured data:

python
from pydantic import BaseModel, Field
from typing import List
import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

# Load documents and extract structured data
df = session.read.docs("./product_descriptions/**/*.md",
                       content_type="markdown",
                       recursive=True)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)

The extracted data becomes queryable through standard DataFrame operations, enabling precise filtering and transformation before agents access it. More details available in this resource.

Semantic Joins

Join datasets based on meaning rather than exact string matches. Semantic joins evaluate natural language predicates between row pairs:

python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create examples to guide the join
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))

examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Perform semantic join
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)

This creates high-quality matches that agents can use for recommendations without needing to evaluate every combination themselves.

Semantic Clustering

Group similar items automatically using embeddings and K-means clustering:

python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(fc.col("description")).alias("sample_description")
)

Agents can query pre-clustered data to identify patterns and trends without processing raw records.

Semantic Mapping

Transform text using LLM-powered generation with few-shot examples:

python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))

examples.create_example(MapExample(
    input={"title": "Slow page load", "body": "Dashboard takes 10s to load"},
    output="Performance"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)

This pre-classifies data, allowing agents to work with clean categorical variables instead of raw text.

Creating MCP Tools from Fenic DataFrames

The Model Context Protocol enables seamless integration between Fenic's data pipelines and LangChain agents. Create tools that agents can call to access processed data.

Building a Parameterized Tool

Define a DataFrame query with parameters using tool_param:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, FloatType

# Load and prepare customer data
customers_df = session.read.csv("./data/customers.csv")

# Create parameterized query
search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    ) &
    (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

# Register as a tool
session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by (e.g., 'technology', 'healthcare')"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)

Multi-Step Semantic Tool

Build sophisticated tools that chain semantic operations:

python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

# Create search tool with similarity matching
from fenic.core.types import EmbeddingType

similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", EmbeddingType(dimensions=1536, embedding_model="text-embedding-3-small")),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)

Running an MCP Server for LangChain Integration

Deploy your Fenic tools as an MCP server that LangChain agents can connect to.

Synchronous Server Setup

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all tools from catalog
tools = session.catalog.list_tools()

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

# Run synchronously
run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production ASGI Deployment

For production environments, use the ASGI interface:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000

CLI-Based Server

Use the fenic-serve command for rapid deployment:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport for direct integration
fenic-serve --transport stdio

Integration Architecture Patterns

Pattern 1: Preprocessing Layer

Use Fenic to prepare data before LangChain processing:

Raw Data → Fenic (parse, structure, enrich) → LangChain (orchestration, decisions)

Fenic handles batch operations while LangChain manages interactive flows. This pattern works well for applications that need significant data preparation before agent interaction.

Pattern 2: MCP Integration

Expose Fenic operations as tools that LangChain agents call:

LangChain Agent → MCP Server (Fenic tools) → Structured Data

Agents access data analysis without leaving their execution context. More details in this guide.

Pattern 3: Hybrid Processing

Combine both approaches for advanced workflows:

Fenic (batch enrichment) → Data Store → LangChain (interactive) ↔ MCP (Fenic tools)

Advanced Semantic Pipeline Patterns

Hybrid Search with Embeddings

Combine semantic similarity with structured filters:

python
products_df = session.read.csv("./data/products.csv")

# Add embeddings
products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

from fenic.core.types import StringType, FloatType, EmbeddingType

hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", EmbeddingType(dimensions=1536, embedding_model="text-embedding-3-small")),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)

Dynamic Data Enrichment

Process and enrich data on-demand using async UDFs:

python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

# Apply to DataFrame
enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)

Multi-Source Data Aggregation

Combine data from multiple sources with semantic operations:

python
# Load from different sources
sales_df = session.read.csv("s3://my-bucket/sales/*.csv")
feedback_df = session.read.docs("./feedback/**/*.md",
                                content_type="markdown",
                                recursive=True)
support_df = session.read.parquet("hf://datasets/company/support/data/*.parquet")

# Extract structured feedback
feedback_structured = feedback_df.select(
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

# Join sales with feedback using semantic join
combined = sales_df.semantic.join(
    feedback_structured,
    predicate="The customer {{ left_on }} mentioned product {{ right_on }}",
    left_on=fc.col("customer_email"),
    right_on=fc.col("feedback_data")["customer_contact"]
)

System Tool Configuration

Fenic provides automatic tool generation for common operations:

python
from fenic.api.mcp.tools import SystemToolConfig

# Save DataFrames as tables
products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

# Add descriptions
session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)

session.catalog.set_table_description(
    "customers",
    "Customer database with contact information and purchase history"
)

# Create server with automatic system tools
server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)

This generates tools for schema inspection, data profiling, regex search, and SQL analysis across all tables. Agents can explore data without custom tool definitions.

Text Processing for Unstructured Data

Text Chunking Strategies

Proper chunking is critical for RAG applications. Fenic provides multiple strategies:

python
from fenic.api.functions import text

# Recursive token chunking - preserves structure at natural boundaries
chunked_df = docs_df.select(
    fc.col("file_path"),
    text.recursive_token_chunk(
        fc.col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Explode chunks into individual rows
chunked_df = chunked_df.select(
    fc.col("file_path"),
    fc.col("chunks")
).explode("chunks").select(
    fc.col("file_path"),
    fc.col("chunks").alias("chunk_text")
)

Additional chunking methods include character-based and word-based chunking. Full implementation details in this resource.

Transcript Processing

Parse multiple transcript formats into unified schema:

python
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.col("segments")
).explode("segments").select(
    fc.col("file_path"),
    fc.col("segments").index.alias("index"),
    fc.col("segments").speaker.alias("speaker"),
    fc.col("segments").start_time.alias("start_time"),
    fc.col("segments").end_time.alias("end_time"),
    fc.col("segments").content.alias("text")
)

Supported formats include SRT, WebVTT, and generic conversation formats. See transcript processing guide for complete examples.

Performance Optimization

Caching and Materialization

Save intermediate results to avoid recomputation:

python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

# Materialize to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")

Batch Processing

Process data in batches to control memory and API usage:

python
# Process large datasets in chunks
large_df = session.read.csv("./data/large_dataset.csv")

batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )

    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break

    offset += batch_size

Rate Limit Management

Configure per-model rate limits in session config:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)

Monitoring and Metrics

Track query performance and costs through Fenic's built-in metrics:

python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("total_rm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by time window
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()

Complete Integration Example

End-to-end implementation combining all components:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# 1. Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# 2. Load and process data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs("./feedback/**/*.md",
                                content_type="markdown",
                                recursive=True)

# 3. Define extraction schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# 4. Process feedback with semantic extraction
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# 5. Create semantic search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# 6. Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# 7. Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Key Implementation Principles

Preprocess data with semantic operations: Use extraction, classification, clustering, and semantic joins to structure data before agents access it. This reduces agent complexity and improves response quality.

Build type-safe tools: Create parameterized tools using tool_param for compile-time safety and runtime validation. Type-safe interfaces prevent runtime errors and improve agent reliability.

Deploy via MCP: Expose Fenic DataFrames through MCP servers that LangChain agents can call directly. This separation of concerns keeps agent logic clean and data operations maintainable.

Optimize for production: Cache embeddings, batch process large datasets, and monitor costs through Fenic's metrics system. Production systems require observability and cost management.

Use system tools: Automatic tool generation for common operations like schema inspection and SQL analysis reduces development time and provides agents with flexible data exploration capabilities.

Additional Resources

For more implementation patterns and use cases, see:

Conclusion

Separating data preparation from agent reasoning creates cleaner architectures where agents focus on decisions while semantic pipelines handle data transformation challenges. This separation improves reliability, reduces costs, and makes agentic systems easier to maintain and scale.

Fenic provides the production-grade preprocessing layer that LangChain applications need, handling parsing, semantic enrichment, and batch inference within a familiar DataFrame API. The Model Context Protocol integration enables seamless agent access to processed data, while semantic operations eliminate custom prompt engineering for common tasks.

Explore the Fenic repository and Typedef platform to build semantic data pipelines for your LangChain agents. How to Augment LangChain A ... cf08090a0e0dfcfa53ab647.md External Displaying How to Augment LangChain Agents with Typedef ai Pr 29cdf41efcf08090a0e0dfcfa53ab647.md.

Share this page
the next generation of

data processingdata processingdata processing

Join us in igniting a new paradigm in data infrastructure. Enter your email to get early access and redefine how you build and scale data workflows with typedef.