How to Boost LangChain Agents with Typedef.ai Enrichment Operators

Typedef Team

LangChain agents excel at orchestrating language model interactions and making decisions, but their effectiveness depends on the quality and structure of data they can access. When agents work with raw, unstructured data, they waste tokens parsing formats, struggle with context window limitations, and produce inconsistent results.

Typedef.ai's Fenic framework solves this by providing semantic enrichment operators that prepare, structure, and transform data before it reaches your agents. By treating LLM inference as a first-class DataFrame operation, Fenic enables batch preprocessing that produces clean, structured data ready for agent consumption.

The Data Quality Gap in LangChain Agent Workflows

LangChain agents face three core challenges when processing real-world data:

Unstructured inputs require extensive preprocessing. Documents, CSVs, logs, and transcripts need extraction, cleaning, and normalization before agents can reason effectively. Without proper structure, agents spend cognitive capacity on parsing rather than decision-making.

Context window constraints limit reasoning. Agents perform better with pre-filtered, relevant data rather than entire datasets that exceed token limits. Raw data dumps force agents to scan through irrelevant information, reducing accuracy and increasing costs.

Inconsistent tool interfaces create maintenance overhead. Custom Python functions for data access lack type safety and require manual error handling. Each data source needs its own integration code, leading to brittle pipelines.

Fenic's semantic operators address these challenges through a DataFrame-based architecture where semantic understanding operates as a native data transformation, not an afterthought.

Core Semantic Operators for Agent Data Enrichment

Typedef.ai provides eight semantic operators that transform how agents access and process data. These operators work within a PySpark-inspired DataFrame API, making them composable with traditional data operations.

semantic.extract: Schema-Driven Structured Extraction

The semantic.extract operator converts unstructured text into typed data structures using Pydantic schemas. This eliminates brittle prompt engineering and provides consistent, validated results.

python
from pydantic import BaseModel, Field
from typing import List
import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

df = session.read.docs("./product_descriptions/**/*.md",
                       content_type="markdown", recursive=True)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)

The schema acts as both documentation and validation. Agents query the extracted structured data using standard DataFrame operations, filtering on specific fields without needing to parse raw text.
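
For example, an agent-facing query can filter directly on the extracted fields. A minimal sketch (column names follow the example above; struct fields use the bracket access shown later in this guide):

python
affordable = df.filter(
    fc.col("product_data")["price"] < 50.0
).select(
    fc.col("file_path"),
    fc.col("product_data")["name"].alias("product_name")
)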

semantic.join: Meaning-Based Matching

Traditional joins require exact matches. Semantic joins evaluate natural language predicates to determine if rows should match, enabling sophisticated relationships based on meaning rather than string equality.

python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate='''
    Job Requirements: {{ left_on }}
    Candidate Skills: {{ right_on }}
    The candidate meets the core requirements for this role.
    ''',
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)

This creates high-quality matches that agents can use for recommendations without evaluating every combination themselves. The semantic join handles the heavy lifting of comparing text semantically, while agents focus on business logic.

semantic.predicate: Natural Language Filtering

The semantic.predicate operator enables filtering with natural language conditions instead of complex regex patterns or keyword matching.

python
applicants = df.filter(
    (fc.col("yoe") > 5) &
    fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume")
    )
)

This combines traditional column filtering with semantic understanding. The query engine optimizes both together, potentially filtering on cheap boolean conditions first before invoking expensive LLM predicates.

semantic.classify and semantic.map

Classification categorizes text with few-shot examples, while mapping applies natural language transformations:

python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

# Classification
df = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("text"),
        ["Technical Issue", "Billing Question", "Feature Request"]
    )
)

# Mapping with examples
examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)

These pre-classified columns allow agents to work with clean categorical variables instead of raw text, reducing processing time and improving consistency.
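
Because categories are materialized as a plain column, downstream aggregation is ordinary DataFrame work. A short sketch using the group_by and order_by patterns that appear elsewhere in this article:

python
# Count tickets per category and surface the most common ones first
category_counts = df.group_by("category").agg(
    fc.count("*").alias("ticket_count")
).order_by(fc.col("ticket_count").desc())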

semantic.with_cluster_labels: Automatic Grouping

Cluster similar items using embeddings and K-means without predefined categories:

python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(fc.col("description")).alias("sample_description")
)

Agents query pre-clustered data to identify patterns and trends without processing raw records, significantly reducing computational overhead.

Setting Up Fenic for LangChain Integration

Installation and Configuration

Install Fenic and configure a session with your model providers:

bash
pip install fenic

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

Rate-limit parameters (rpm and tpm) prevent API throttling; within those limits, Fenic automatically batches requests, retries transient failures, and self-throttles to maximize throughput.

Creating MCP Tools from Enriched DataFrames

The Model Context Protocol (MCP) enables seamless integration between Fenic's data pipelines and LangChain agents. Create parameterized tools that agents call to access processed data.

Building Parameterized Tools

Define DataFrame queries with parameters using tool_param:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

customers_df = session.read.csv("./data/customers.csv")

search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    ) &
    (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by (e.g., 'technology', 'healthcare')"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)

Multi-Step Semantic Tools

Build sophisticated tools that chain semantic operations:

python
tickets_df = session.read.csv("./data/support_tickets.csv")

processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)

Running MCP Servers for LangChain Agents

Deploy your Fenic tools as an MCP server that LangChain agents connect to directly.

Synchronous Server Setup

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
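
With the server running, a LangChain agent can consume these tools over MCP. Below is a sketch using the langchain-mcp-adapters and langgraph packages (neither ships with Fenic, and their APIs change between versions, so verify against your installed docs):

python
# Sketch only: connection keys and transports in langchain-mcp-adapters
# have changed across releases; treat this as a starting point.
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def main():
    client = MultiServerMCPClient({
        "customer_data": {
            "transport": "streamable_http",
            "url": "http://127.0.0.1:8000/mcp",
        }
    })
    tools = await client.get_tools()  # Fenic catalog tools surface as LangChain tools
    agent = create_react_agent("openai:gpt-4o-mini", tools)
    result = await agent.ainvoke({
        "messages": [{"role": "user",
                      "content": "Find technology customers with revenue over $1M"}]
    })
    print(result["messages"][-1].content)

asyncio.run(main())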

Production ASGI Deployment

For production environments, use the ASGI interface:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn (assuming this module is saved as app.py):
# uvicorn app:app --host 0.0.0.0 --port 8000

CLI-Based Server

Use the fenic-serve command for rapid deployment:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport for direct integration
fenic-serve --transport stdio

Advanced Enrichment Patterns for LangChain Agents

Hybrid Search with Embeddings

Combine semantic similarity with structured filters:

python
from fenic.core.types import StringType, FloatType

products_df = session.read.csv("./data/products.csv")

products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

This pattern enables agents to perform precise searches that combine business rules with semantic understanding.

Dynamic Data Enrichment with Async UDFs

Process and enrich data on-demand using async UDFs for external API integration:

python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)
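
The returned struct behaves like any other column, so agents can filter on its fields directly (bracket access as used elsewhere in this guide):

python
# Keep only confidently negative reviews for follow-up
negative_reviews = enriched_df.filter(
    (fc.col("sentiment_analysis")["sentiment"] == "negative") &
    (fc.col("sentiment_analysis")["confidence"] > 0.8)
)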

Multi-Source Data Aggregation

Combine data from multiple sources with semantic operations:

python
from pydantic import BaseModel, Field

# Example extraction schema; adjust fields to your feedback format
class CustomerFeedback(BaseModel):
    customer_contact: str = Field(description="Customer email mentioned in the feedback")
    product: str = Field(description="Product the feedback refers to")

sales_df = session.read.csv("s3://my-bucket/sales/*.csv")
feedback_df = session.read.docs("./feedback/**/*.md",
                                content_type="markdown", recursive=True)

feedback_structured = feedback_df.select(
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

combined = sales_df.semantic.join(
    feedback_structured,
    predicate='''
    Sales record contact: {{ left_on }}
    Feedback contact: {{ right_on }}
    These refer to the same customer.
    ''',
    left_on=fc.col("customer_email"),
    right_on=fc.col("feedback_data")["customer_contact"]
)

System Tool Configuration

Fenic provides automatic tool generation for common operations:

python
from fenic.api.mcp.tools import SystemToolConfig

products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)
session.catalog.set_table_description(
    "customers",
    "Customer database with contact information and purchase history"
)

server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)

This generates tools for schema inspection, data profiling, regex search, and SQL analysis across all tables. Agents explore your data without custom tool definitions.

Performance Optimization for Agent Workflows

Caching and Materialization

Save intermediate results to avoid recomputation:

python
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")

Batch Processing

Process large datasets in chunks to control memory and API usage:

python
large_df = session.read.csv("./data/large_dataset.csv")

batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)
    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )

    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break
    offset += batch_size

Rate Limit Management

Configure per-model rate limits in session config to prevent API throttling:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)
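
Individual operations can then select an alias explicitly, reserving the stronger model for harder tasks. A sketch assuming semantic.extract accepts the model_alias parameter shown in the batch pipeline example later in this article (TicketDetails is a hypothetical Pydantic schema):

python
detailed = df.select(
    fc.col("ticket_id"),
    fc.semantic.extract(
        fc.col("description"),
        response_format=TicketDetails,  # hypothetical schema for illustration
        model_alias="accurate"  # routes this operation to the gpt-4o alias above
    ).alias("details")
)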

Monitoring and Metrics

Track query performance and costs through Fenic's built-in metrics:

python
metrics_df = session.table("fenic_system.query_metrics")

recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()

Complete Integration Example

Here's an end-to-end implementation showing how to augment LangChain agents with Typedef enrichment operators:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

# Load and process data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs("./feedback/**/*.md",
                                content_type="markdown", recursive=True)

# Define extraction schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# Process feedback with semantic extraction
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# Create semantic search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# Deploy MCP server
tools = session.catalog.list_tools()
server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Architecture Patterns for Production

Batch Preprocessing Layer

Use Fenic for heavy lifting in batch pipelines that prepare clean data for real-time agents:

python
from fenic.core.types import MarkdownType

enriched_data = (
    raw_documents
    .with_column("raw_md", fc.col("content").cast(MarkdownType))
    .with_column("chunks", fc.markdown.extract_header_chunks("raw_md", header_level=2))
    .explode("chunks")
    .with_column("embedding", fc.semantic.embed(fc.col("chunks")["content"]))
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("chunks")["content"],
            response_format=DocumentMetadata,  # a Pydantic schema defined elsewhere
            model_alias="cheap"  # assumes a low-cost model alias in the session config
        )
    )
)

enriched_data.write.parquet("s3://my-bucket/enriched/")

Agents query the enriched data without expensive inference at request time (a sketch of the request path follows this list). This architecture provides:

  • Predictable and responsive agents with no LLM latency in user-facing paths
  • Better resource utilization through batch processing that amortizes fixed costs
  • Cleaner separation where planning and orchestration decouple from execution
  • Easier debugging since preprocessing happens once and can be validated offline
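
At request time, tools read the precomputed table instead of invoking models. A minimal sketch, assuming a parquet reader symmetric with the writer above and a topic field in the DocumentMetadata schema:

python
# No LLM calls on this path: chunks, embeddings, and metadata were
# computed by the batch pipeline above.
enriched = session.read.parquet("s3://my-bucket/enriched/")
api_docs = enriched.filter(
    fc.col("metadata")["topic"] == "api"  # assumes a 'topic' field in DocumentMetadata
)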

Separation of Concerns

LangChain excels at orchestrating reasoning and decision-making. Fenic excels at data preparation and semantic enrichment. Combining both frameworks creates a robust architecture:

Fenic handles:

  • Batch processing of documents, transcripts, and logs
  • Schema-driven extraction from unstructured text
  • Semantic classification and clustering
  • Embedding generation and similarity search
  • Multi-source data aggregation

LangChain handles:

  • Agent reasoning and planning
  • Tool orchestration and execution
  • Conversation management
  • Decision-making based on structured data

This separation enables cleaner code, better performance, and systems that scale reliably.

Key Takeaways

Typedef.ai enrichment operators transform how LangChain agents work with data by moving semantic processing into batch pipelines. The benefits:

Preprocess data with semantic operations: Use extraction, classification, clustering, and semantic joins to structure data before agents access it. This reduces agent complexity and improves response quality.

Build type-safe tools: Create parameterized tools using tool_param so parameters are typed, validated when the tool is defined, and checked again at runtime. Agents call well-defined interfaces with clear contracts.

Deploy via MCP: Expose Fenic DataFrames through MCP servers that LangChain agents call directly. The Model Context Protocol provides seamless integration without custom glue code.

Optimize for production: Cache embeddings, batch process large datasets, and monitor costs through Fenic's metrics system. The declarative API enables automatic optimization.

Leverage system tools: Use automatic tool generation for common operations like schema inspection and SQL analysis. Agents explore data without manual tool definitions.

By separating data preparation from agent reasoning, Fenic enables cleaner architectures where agents focus on decisions while semantic pipelines handle data transformation challenges. This separation improves reliability, reduces costs, and makes agentic systems easier to maintain and scale.

For teams building production LangChain applications, Typedef.ai's enrichment operators provide the missing data layer that transforms brittle prototypes into reliable systems. The DataFrame abstraction brings the same rigor to AI workloads that data engineers have applied to traditional pipelines for decades, now extended with semantic intelligence.
