How to Enhance LangChain Agents with Fenic's Semantic Data Pipelines

Typedef Team

LangChain agents excel at reasoning and decision-making, but their effectiveness depends on the quality and structure of data they can access. Raw, unstructured data creates friction—agents waste tokens parsing formats, struggle with context windows, and produce inconsistent results.

Fenic solves this by providing a DataFrame-based approach to prepare, enrich, and structure data before it reaches your agents. By combining semantic operations with the Model Context Protocol (MCP), Fenic transforms messy data pipelines into production-grade agent tools.

The Data Quality Problem in Agentic Workflows

LangChain agents face three core challenges when working with real-world data:

Unstructured inputs require preprocessing: Documents, CSVs, and logs need extraction, cleaning, and normalization before agents can reason effectively over them.

Context window limitations: Agents need pre-filtered, relevant data rather than entire datasets that exceed token limits.

Inconsistent tool interfaces: Custom Python functions for data access create maintenance overhead and lack type safety.

Fenic addresses each challenge through its semantic DataFrame API, enabling batch preprocessing that produces clean, structured data ready for agent consumption.

Fenic's Semantic Data Pipeline Architecture

Fenic provides a PySpark-inspired DataFrame framework with first-class support for semantic operations. The architecture includes:

DataFrame operations with AI-native types: Standard transformations (select, filter, join) work alongside semantic functions that use language models for intelligent data manipulation.

Semantic operators: Built-in functions for extraction, classification, clustering, and semantic joins that handle embeddings and LLM calls automatically.

MCP server integration: Declarative tool creation that exposes DataFrames as callable functions for agents to use.

Multi-provider inference: Unified interface for OpenAI, Anthropic, Google, and other model providers with automatic rate limiting and retry logic.
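
For example, a single session can register models from more than one provider side by side. The sketch below assumes Fenic exposes an AnthropicLanguageModel config class with separate input/output token limits; the exact class and field names may differ by version, so treat this as illustrative:

python
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    AnthropicLanguageModel  # assumed provider class; verify against your Fenic version
)

multi_provider_config = SessionConfig(
    app_name="multi_provider_demo",
    semantic=SemanticConfig(
        language_models={
            "gpt": OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
            # Anthropic rate limits are assumed to be split into input/output tokens
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100000,
                output_tpm=100000
            )
        },
        default_language_model="gpt"
    )
)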

Setting Up Fenic for Agent Integration

Install Fenic and configure a session with your preferred language models:

bash
pip install fenic

Create a session with semantic capabilities:

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
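
To confirm the session works end to end, run a small semantic operation over an in-memory DataFrame. This sketch assumes Session.create_dataframe accepts a list of dicts, as in Fenic's examples:

python
import fenic.api.functions as fc

# Quick smoke test: embed two rows and inspect the output
test_df = session.create_dataframe([
    {"text": "Fenic prepares data for agents"},
    {"text": "LangChain agents reason over tools"}
])

test_df.select(
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("embedding")
).show()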

Core Semantic Operations for Agent Data Preparation

Semantic Extraction

Extract structured information from unstructured text using Pydantic models. This operation transforms messy documents into typed data structures:

python
from pydantic import BaseModel, Field
from typing import List
import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

# Load documents and extract structured data
df = session.read.docs("./product_descriptions/**/*.md",
                       content_type="markdown",
                       recursive=True)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)

The extracted data becomes queryable through standard DataFrame operations, enabling precise filtering and transformation before agents access it.
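
For example, extracted struct fields are addressable with bracket syntax in ordinary filters and projections (the price threshold here is illustrative):

python
# Filter on a field of the extracted struct before any agent sees the data
affordable = df.filter(
    fc.col("product_data")["price"] < 100.0
).select(
    fc.col("file_path"),
    fc.col("product_data")["name"].alias("product_name"),
    fc.col("product_data")["price"].alias("price_usd")
)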

Semantic Joins

Join datasets based on meaning rather than exact string matches. Semantic joins evaluate natural language predicates between row pairs:

python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create examples to guide the join
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Perform semantic join
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)

This creates high-quality matches that agents can use for recommendations without needing to evaluate every combination themselves.
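
The join output can be persisted and exposed like any other DataFrame, so agents query precomputed matches instead of re-running the join. A short sketch using the catalog APIs covered later in this guide (table and tool names are illustrative):

python
# Persist the semantic join output for cheap downstream access
matched_df.write.save_as_table("job_candidate_matches", mode="overwrite")

# Expose the matches as a simple no-parameter tool
session.catalog.create_tool(
    tool_name="list_candidate_matches",
    tool_description="List candidates semantically matched to open roles",
    tool_query=session.table("job_candidate_matches"),
    tool_params=[],
    result_limit=50
)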

Semantic Clustering

Group similar items automatically using embeddings and K-means clustering:

python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(fc.col("description")).alias("sample_description")
)

Agents can query pre-clustered data to identify patterns and trends without processing raw records.
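
If you also want a natural-language digest per cluster, Fenic's aggregation-time semantic.reduce operator can condense each group. Shown as a sketch, since the operator's exact signature may vary by version:

python
# Summarize the common theme of each cluster's tickets
cluster_digests = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.semantic.reduce(
        "Summarize the common theme across these ticket descriptions",
        fc.col("description")
    ).alias("cluster_theme")
)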

Semantic Mapping

Transform text using LLM-powered generation with few-shot examples:

python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))
examples.create_example(MapExample(
    input={"title": "Slow page load", "body": "Dashboard takes 10s to load"},
    output="Performance"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)

This pre-classifies data, allowing agents to work with clean categorical variables instead of raw text.
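
When the label set is fixed, a dedicated classifier is often a better fit than open-ended mapping. This sketch assumes fc.semantic.classify takes a column plus a list of candidate labels:

python
# Closed-set classification: outputs are constrained to known categories
df = df.select(
    fc.col("ticket_id"),
    fc.semantic.classify(
        fc.col("description"),
        ["Authentication", "Performance", "Billing", "Other"]
    ).alias("category")
)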

Creating MCP Tools from Fenic DataFrames

The Model Context Protocol (MCP) is the bridge between Fenic's data pipelines and LangChain agents: you register DataFrame queries as tools that agents call to access processed data.

Building a Parameterized Tool

Define a DataFrame query with parameters using tool_param:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, FloatType

# Load and prepare customer data
customers_df = session.read.csv("./data/customers.csv")

# Create parameterized query
search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    ) &
    (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

# Register as a tool
session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by (e.g., 'technology', 'healthcare')"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)

Multi-Step Semantic Tool

Build sophisticated tools that chain semantic operations:

python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

# Create search tool with similarity matching
similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)
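
Note that find_similar_tickets expects an embedding rather than raw text, so the caller must embed the query with the same model used for the tickets. A sketch of one way to do that client-side with Fenic itself (the to_pandas collection step is an assumption; adjust to your version's collection API):

python
# Embed the user's free-text query with the session's embedding model
query_df = session.create_dataframe([{"q": "password reset emails not arriving"}])

query_vector = query_df.select(
    fc.semantic.embed(fc.col("q")).alias("v")
).to_pandas()["v"][0]  # assumed collection API

# query_vector can now be passed as the tool's query_vector parameter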

Running an MCP Server for LangChain Integration

Deploy your Fenic tools as an MCP server that LangChain agents can connect to:

Synchronous Server Setup

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all tools from catalog
tools = session.catalog.list_tools()

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

# Run synchronously
run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
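
On the LangChain side, one common way to consume this server is the langchain-mcp-adapters package. The sketch below is an assumption based on current releases of langchain-mcp-adapters and langgraph, not part of Fenic itself:

python
# pip install langchain-mcp-adapters langgraph
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def main():
    # Point the client at the Fenic MCP endpoint started above
    client = MultiServerMCPClient({
        "fenic_data": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable_http"
        }
    })
    tools = await client.get_tools()

    # Any LangChain-compatible chat model works; the identifier is illustrative
    agent = create_react_agent("openai:gpt-4o-mini", tools)
    result = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "Find technology customers with revenue over $1M"}]}
    )
    print(result["messages"][-1].content)

asyncio.run(main())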

Production ASGI Deployment

For production environments, use the ASGI interface:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000

CLI-Based Server

Use the fenic-serve command for rapid deployment:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport for direct integration
fenic-serve --transport stdio

Advanced Semantic Pipeline Patterns

Hybrid Search with Embeddings

Combine semantic similarity with structured filters:

python
products_df = session.read.csv("./data/products.csv")

# Add embeddings
products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

# Hybrid search tool
hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("price").between(
        fc.tool_param("min_price", fc.FloatType),
        fc.tool_param("max_price", fc.FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)

Dynamic Data Enrichment

Process and enrich data on-demand using async UDFs:

python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

# Apply to DataFrame
enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)
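
The enriched output can then be materialized and described like any other table, so agents reuse it without re-calling the external API (names are illustrative):

python
# Persist enrichment results; system tools can now discover and query them
enriched_df.write.save_as_table("review_sentiment", mode="overwrite")
session.catalog.set_table_description(
    "review_sentiment",
    "Customer reviews with externally computed sentiment labels and confidence scores"
)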

Multi-Source Data Aggregation

Combine data from multiple sources with semantic operations:

python
# Load from different sources
sales_df = session.read.csv("s3://my-bucket/sales/*.csv")
feedback_df = session.read.docs("./feedback/**/*.md",
                                content_type="markdown",
                                recursive=True)
support_df = session.read.parquet("hf://datasets/company/support/data/*.parquet")

# Assumed schema for the extraction below (not defined in the original snippet);
# BaseModel and Field come from pydantic, as imported earlier
class CustomerFeedback(BaseModel):
    customer_contact: str = Field(description="Customer email or name")
    summary: str = Field(description="One-sentence summary of the feedback")

# Extract structured feedback
feedback_structured = feedback_df.select(
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

# Join sales with feedback using semantic join
combined = sales_df.semantic.join(
    feedback_structured,
    predicate="The customer {{ left_on }} mentioned product {{ right_on }}",
    left_on=fc.col("customer_email"),
    right_on=fc.col("feedback_data")["customer_contact"]
)

System Tool Configuration

Fenic provides automatic tool generation for common operations:

python
from fenic.api.mcp.tools import SystemToolConfig

# Save DataFrames as tables
products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

# Add descriptions
session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)
session.catalog.set_table_description(
    "customers",
    "Customer database with contact information and purchase history"
)

# Create server with automatic system tools
server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)

This generates tools for schema inspection, data profiling, regex search, and SQL analysis across all tables—agents can explore your data without custom tool definitions.

Performance Optimization

Caching and Materialization

Save intermediate results to avoid recomputation:

python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

# Materialize to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")

Batch Processing

Process data in batches to control memory and API usage:

python
# Process large datasets in chunks
large_df = session.read.csv("./data/large_dataset.csv")

# Use limit and offset for pagination
batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )

    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break

    offset += batch_size

Rate Limit Management

Configure per-model rate limits in session config to prevent API throttling:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)
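
Individual semantic operators can then target a specific alias instead of the default. This sketch assumes the operators accept a model_alias argument (check the signature in your Fenic version); Schema stands in for a Pydantic model, as in the batch-processing example above:

python
# Route bulk summarization to the fast model and nuanced extraction to the accurate one
df = df.select(
    fc.semantic.map(
        "Give a one-line summary: {{ text }}",
        text=fc.col("text"),
        model_alias="fast"  # assumed parameter name
    ).alias("summary"),
    fc.semantic.extract(
        fc.col("text"),
        response_format=Schema,
        model_alias="accurate"  # assumed parameter name
    ).alias("details")
)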

Monitoring and Metrics

Track query performance and costs through Fenic's built-in metrics:

python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("total_rm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by time window
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()

Complete Integration Example

Here's an end-to-end implementation:

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# 1. Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

# 2. Load and process data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs("./feedback/**/*.md",
                                content_type="markdown",
                                recursive=True)

# 3. Define extraction schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# 4. Process feedback with semantic extraction
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# 5. Create semantic search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# 6. Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# 7. Deploy MCP server
tools = session.catalog.list_tools()
server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Key Takeaways

Preprocess data with semantic operations: Use extraction, classification, clustering, and semantic joins to structure data before agents access it.

Build type-safe tools: Create parameterized tools with tool_param so parameter types are declared up front and validated at runtime.

Deploy via MCP: Expose Fenic DataFrames through MCP servers that LangChain agents can call directly.

Optimize for production: Cache embeddings, batch process large datasets, and monitor costs through Fenic's metrics system.

Leverage system tools: Use automatic tool generation for common operations like schema inspection and SQL analysis.

By separating data preparation from agent reasoning, Fenic enables cleaner architectures where agents focus on decisions while semantic pipelines handle the challenges of data transformation. This separation improves reliability, reduces costs, and makes agentic systems easier to maintain and scale.

Explore the Fenic repository and documentation to build semantic data pipelines for your LangChain agents.
