<< goback()

How to Integrate Fenic for Context Engineering in LangChain Agents

Typedef Team

How to Integrate Fenic for Context Engineering in LangChain Agents

LangChain agents excel at reasoning and orchestration, but their effectiveness depends on the quality of data they receive. Raw, unstructured data creates friction—agents waste tokens parsing formats, struggle with context windows, and produce inconsistent results. Fenic addresses this by providing a DataFrame-based approach to prepare, enrich, and structure data before it reaches your agents.

This guide demonstrates how to leverage Fenic as a data layer for context engineering in LangChain applications, combining semantic operations with the Model Context Protocol to build production-grade agent tools.

The Context Engineering Challenge

LangChain agents face three core problems when working with real-world data:

Unstructured inputs require preprocessing: Documents, CSVs, and logs need extraction, cleaning, and normalization before agents can reason effectively over them.

Context window limitations: Agents need pre-filtered, relevant data rather than entire datasets that exceed token limits.

Inconsistent tool interfaces: Custom Python functions for data access create maintenance overhead and lack type safety.

Fenic addresses each challenge through its semantic DataFrame API, enabling batch preprocessing that produces clean, structured data ready for agent consumption.

Setting Up Fenic for Agent Integration

Install Fenic and configure a session with your preferred language models:

bash
pip install fenic

Create a session with semantic capabilities:

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

The session configuration supports multiple providers including OpenAI, Anthropic, Google Vertex, and others, with automatic rate limiting and retry logic built in.

Core Semantic Operations for Context Preparation

Semantic Extraction

Transform unstructured text into typed data structures using Pydantic models. This operation extracts structured information from messy documents:

python
from pydantic import BaseModel, Field
from typing import List
import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

# Load documents and extract structured data
df = session.read.docs(
    "./product_descriptions/**/*.md",
    content_type="markdown",
    recursive=True
)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)

The extracted data becomes queryable through standard DataFrame operations, enabling precise filtering and transformation before agents access it.

Semantic Joins

Join datasets based on meaning rather than exact string matches. Semantic joins evaluate natural language predicates between row pairs:

python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create examples to guide the join
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))

examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Perform semantic join
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}

        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)

This creates high-quality matches that agents can use for recommendations without needing to evaluate every combination themselves.

Semantic Clustering

Group similar items automatically using embeddings and K-means clustering:

python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(fc.col("description")).alias("sample_description")
)

Agents can query pre-clustered data to identify patterns and trends without processing raw records.

Semantic Mapping

Transform text using LLM-powered generation with few-shot examples:

python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))

examples.create_example(MapExample(
    input={"title": "Slow page load", "body": "Dashboard takes 10s to load"},
    output="Performance"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)

This pre-classifies data, allowing agents to work with clean categorical variables instead of raw text.

Creating MCP Tools from Fenic DataFrames

The Model Context Protocol enables seamless integration between Fenic's data pipelines and LangChain agents. Create tools that agents can call to access processed data.

Building a Parameterized Tool

Define a DataFrame query with parameters using tool_param:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Load and prepare customer data
customers_df = session.read.csv("./data/customers.csv")

# Create parameterized query
search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    ) &
    (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

# Register as a tool
session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by (e.g., 'technology', 'healthcare')"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)

Multi-Step Semantic Tool

Build sophisticated tools that chain semantic operations:

python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

# Create search tool with similarity matching
similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)

Running an MCP Server for LangChain Integration

Deploy your Fenic tools as an MCP server that LangChain agents can connect to.

Synchronous Server Setup

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all tools from catalog
tools = session.catalog.list_tools()

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

# Run synchronously
run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production ASGI Deployment

For production environments, use the ASGI interface:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000

CLI-Based Server

Use the fenic-serve command for rapid deployment:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport for direct integration
fenic-serve --transport stdio

Advanced Context Engineering Patterns

Hybrid Search with Embeddings

Combine semantic similarity with structured filters:

python
products_df = session.read.csv("./data/products.csv")

# Add embeddings
products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

from fenic.core.types import FloatType

# Hybrid search tool
hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)

Dynamic Data Enrichment

Process and enrich data on-demand using async UDFs:

python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

# Apply to DataFrame
enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)

Multi-Source Data Aggregation

Combine data from multiple sources with semantic operations:

python
# Load from different sources
sales_df = session.read.csv("s3://my-bucket/sales/*.csv")
feedback_df = session.read.docs(
    "./feedback/**/*.md",
    content_type="markdown",
    recursive=True
)
support_df = session.read.parquet("hf://datasets/company/support/data/*.parquet")

# Extract structured feedback
feedback_structured = feedback_df.select(
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

# Join sales with feedback using semantic join
combined = sales_df.semantic.join(
    feedback_structured,
    predicate="The customer {{ left_on }} mentioned product {{ right_on }}",
    left_on=fc.col("customer_email"),
    right_on=fc.col("feedback_data")["customer_contact"]
)

System Tool Configuration

Fenic provides automatic tool generation for common operations:

python
from fenic.api.mcp.tools import SystemToolConfig

# Save DataFrames as tables
products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

# Add descriptions
session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)

session.catalog.set_table_description(
    "customers",
    "Customer database with contact information and purchase history"
)

# Create server with automatic system tools
server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)

This generates tools for schema inspection, data profiling, regex search, and SQL analysis across all tables—agents can explore your data without custom tool definitions.

Text Processing for Context Preparation

Loading Unstructured Documents

Fenic provides specialized readers for various document formats:

python
# Load markdown documents
docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

# Load CSV data
csv_df = session.read.csv("data/*.csv")

# Extract PDF metadata
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

Text Chunking Strategies

Proper chunking is critical for context preparation:

python
from fenic.api.functions import text, col

# Recursive token chunking
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Explode chunks into individual rows
chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Generating Embeddings

Once text is chunked, generate embeddings for semantic search:

python
from fenic.api.functions import semantic

# Generate embeddings for all chunks
embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

# Token counting for monitoring
embedded_df = embedded_df.with_column(
    text.count_tokens(col("chunk_text")).alias("token_count")
)

Performance Optimization

Caching and Materialization

Save intermediate results to avoid recomputation:

python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

# Materialize to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")

Batch Processing

Process data in batches to control memory and API usage:

python
# Process large datasets in chunks
large_df = session.read.csv("./data/large_dataset.csv")

# Use limit and offset for pagination
batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )

    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break

    offset += batch_size

Rate Limit Management

Configure per-model rate limits in session config to prevent API throttling:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)

Monitoring and Metrics

Track query performance and costs through Fenic's built-in metrics:

python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("total_rm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by time window
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()

Complete Integration Example

Here's an end-to-end implementation combining context engineering with LangChain:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# 1. Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# 2. Load and process data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs(
    "./feedback/**/*.md",
    content_type="markdown",
    recursive=True
)

# 3. Define extraction schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# 4. Process feedback with semantic extraction
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# 5. Create semantic search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# 6. Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# 7. Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Best Practices for Production Deployments

Structure Extraction from Unstructured Data

Treat unstructured data as containing latent structure that LLMs can surface:

Leverage Semantic Chunking: Use intelligent chunking over naive character-count splitting. Respect document structure for semantically meaningful segments. Configure overlap to ensure context continuity.

Extract and Preserve Structure: Process markdown documents with header-based chunking. Maintain document hierarchy and temporal relationships in your data pipeline.

Dynamic Context Assembly Strategies

Implement adaptive context selection based on query requirements:

Hierarchical Context Management: Define multiple model tiers with appropriate rate limits and token budgets per tier. Route queries based on complexity and latency requirements to optimize both cost and performance.

Semantic Operations for Context Selection: Use semantic predicates to filter relevant context dynamically based on user queries. This reduces token consumption while improving response quality.

Monitoring and Debugging Workflows

Implement comprehensive observability for your context engineering pipelines:

Row-Level Lineage Tracking: Trace every transformation including non-deterministic outputs. This enables you to investigate unexpected agent behaviors and optimize context selection strategies.

Explicit Caching Strategically: Cache expensive operations during iterative development. Use .cache() on DataFrames to persist results of costly semantic operations.

Production Deployment Patterns

Develop Locally, Scale on Cloud

The same code runs seamlessly from laptop to production through the SessionConfig system:

python
import fenic as fc

config = fc.SessionConfig(
    app_name="production_agent",
    semantic=fc.SemanticConfig(
        language_models={
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=300,
                tpm=150_000
            )
        }
    ),
    cloud=fc.CloudConfig(size=fc.CloudExecutorSize.LARGE)
)

# Create session with config
session = fc.Session.get_or_create(config)

# Same pipeline code works locally and in cloud
production_context = (df
    .with_column(
        "extracted_context",
        fc.semantic.extract(
            fc.col("combined_context"),
            context_schema,
            model_alias="flash"
        )
    )
    .unnest("extracted_context")
    .collect()
)

Zero Code Changes from Development to Production

Fenic follows a "develop locally, deploy to cloud instantly" philosophy. The SessionConfig system enables you to prototype locally and scale to production without modifying your pipeline code.

Key Implementation Principles

Preprocess data with semantic operations: Use extraction, classification, clustering, and semantic joins to structure data before agents access it.

Build type-safe tools: Create parameterized tools using tool_param for compile-time safety and runtime validation.

Deploy via MCP: Expose Fenic DataFrames through MCP servers that LangChain agents can call directly.

Optimize for production: Cache embeddings, batch process large datasets, and monitor costs through Fenic's metrics system.

Leverage system tools: Use automatic tool generation for common operations like schema inspection and SQL analysis.

By separating data preparation from agent reasoning, Fenic enables cleaner architectures where agents focus on decisions while semantic pipelines handle the challenges of data transformation. This separation improves reliability, reduces costs, and makes agentic systems easier to maintain and scale.

Explore the Fenic repository to build semantic data pipelines for your LangChain agents. For more information about building production-grade AI applications, visit the Typedef platform.

Additional Resources

from fenic.core.types import FloatType

Hybrid search tool

hybrid_search = products_with_embeddings.filter( (fc.col("category") == fc.tool_param("category", StringType)) & (fc.col("price").between( fc.tool_param("min_price", FloatType), fc.tool_param("max_price", FloatType) )) ).with_column( "similarity_score", fc.embedding.compute_similarity( fc.col("desc_embeddings"), fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type), metric="cosine" ) ).filter( fc.col("similarity_score") > 0.6 ).order_by( fc.col("similarity_score").desc() ) How to Integrate Fenic for C ... efcf0806fb9a1d34b81541b2b.md External Displaying How to Integrate Fenic for Context Engineering in 29cdf41efcf0806fb9a1d34b81541b2b.md.

Share this page
the next generation of

data processingdata processingdata processing

Join us in igniting a new paradigm in data infrastructure. Enter your email to get early access and redefine how you build and scale data workflows with typedef.