LangChain agents excel at reasoning and decision-making, but their effectiveness depends on the quality and structure of data they can access. Raw, unstructured data creates friction—agents waste tokens parsing formats, struggle with context windows, and produce inconsistent results.
Fenic solves this by providing a DataFrame-based approach to prepare, enrich, and structure data before it reaches your agents. By combining semantic operations with the Model Context Protocol (MCP), Fenic transforms messy data pipelines into production-grade agent tools.
The Data Quality Problem in Agentic Workflows
LangChain agents face three core challenges when working with real-world data:
Unstructured inputs require preprocessing: Documents, CSVs, and logs need extraction, cleaning, and normalization before agents can reason effectively over them.
Context window limitations: Agents need pre-filtered, relevant data rather than entire datasets that exceed token limits.
Inconsistent tool interfaces: Custom Python functions for data access create maintenance overhead and lack type safety.
Fenic addresses each challenge through its semantic DataFrame API, enabling batch preprocessing that produces clean, structured data ready for agent consumption.
Fenic's Semantic Data Pipeline Architecture
Fenic provides a PySpark-inspired DataFrame framework with first-class support for semantic operations. The architecture includes:
DataFrame operations with AI-native types: Standard transformations (select, filter, join) work alongside semantic functions that use language models for intelligent data manipulation.
Semantic operators: Built-in functions for extraction, classification, clustering, and semantic joins that handle embeddings and LLM calls automatically.
MCP server integration: Declarative tool creation that exposes DataFrames as callable functions for agents to use.
Multi-provider inference: Unified interface for OpenAI, Anthropic, Google, and other model providers with automatic rate limiting and retry logic.
Setting Up Fenic for Agent Integration
Install Fenic and configure a session with your preferred language models:
```bash
pip install fenic
```
Create a session with semantic capabilities:
```python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Core Semantic Operations for Agent Data Preparation
Semantic Extraction
Extract structured information from unstructured text using Pydantic models. This operation transforms messy documents into typed data structures:
```python
from typing import List

from pydantic import BaseModel, Field

import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

# Load documents and extract structured data
df = session.read.docs(
    "./product_descriptions/**/*.md",
    content_type="markdown",
    recursive=True
)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)
```
The extracted data becomes queryable through standard DataFrame operations, enabling precise filtering and transformation before agents access it.
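For example, once the extracted fields are available as a typed struct column, ordinary filters and projections apply. A minimal sketch, assuming the bracket-style struct field access used later in this article (the price threshold is illustrative):

```python
# Filter on a field of the extracted struct and flatten it into plain columns
budget_products = df.filter(
    fc.col("product_data")["price"] < 100.0
).select(
    fc.col("file_path"),
    fc.col("product_data")["name"].alias("product_name"),
    fc.col("product_data")["price"].alias("price_usd")
)
```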
Semantic Joins
Join datasets based on meaning rather than exact string matches. Semantic joins evaluate natural language predicates between row pairs:
```python
from textwrap import dedent

from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create examples to guide the join
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Perform semantic join
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)
```
This creates high-quality matches that agents can use for recommendations without needing to evaluate every combination themselves.
Semantic Clustering
Group similar items automatically using embeddings and K-means clustering:
```python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(fc.col("description")).alias("sample_description")
)
```
Agents can query pre-clustered data to identify patterns and trends without processing raw records.
Semantic Mapping
Transform text using LLM-powered generation with few-shot examples:
```python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))
examples.create_example(MapExample(
    input={"title": "Slow page load", "body": "Dashboard takes 10s to load"},
    output="Performance"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)
```
This pre-classifies data, allowing agents to work with clean categorical variables instead of raw text.
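Once categories are materialized as a plain string column, downstream summarization is ordinary DataFrame work. A quick sketch, reusing the aggregation pattern from the clustering example above (column names follow the preceding snippet):

```python
# Count tickets per pre-classified category so an agent reads a small
# summary table instead of scanning raw ticket text
category_counts = df.group_by("category").agg(
    fc.count("*").alias("ticket_count")
).order_by(fc.col("ticket_count").desc())
```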
Creating MCP Tools from Fenic DataFrames
The Model Context Protocol enables seamless integration between Fenic's data pipelines and LangChain agents. Create tools that agents can call to access processed data.
Building a Parameterized Tool
Define a DataFrame query with parameters using tool_param:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, FloatType

# Load and prepare customer data
customers_df = session.read.csv("./data/customers.csv")

# Create parameterized query
search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    )
    & (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

# Register as a tool
session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by (e.g., 'technology', 'healthcare')"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)
```
Multi-Step Semantic Tool
Build sophisticated tools that chain semantic operations:
```python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

# Create search tool with similarity matching
similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)
```
Running an MCP Server for LangChain Integration
Deploy your Fenic tools as an MCP server that LangChain agents can connect to:
Synchronous Server Setup
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all tools from catalog
tools = session.catalog.list_tools()

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

# Run synchronously
run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production ASGI Deployment
For production environments, use the ASGI interface:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn:
# uvicorn app:app --host 0.0.0.0 --port 8000
```
CLI-Based Server
Use the fenic-serve command for rapid deployment:
```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport for direct integration
fenic-serve --transport stdio
```
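On the LangChain side, the server above is just another MCP endpoint. A minimal sketch of loading the Fenic-backed tools into an agent, assuming the separate langchain-mcp-adapters package and langgraph's prebuilt create_react_agent (neither ships with Fenic, and the transport key and model string follow that package's conventions):

```python
# Sketch only: package names, transport key, and agent setup are assumptions
# based on langchain-mcp-adapters and langgraph, not part of Fenic itself.
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def main():
    # Point the client at the Fenic MCP server started above
    client = MultiServerMCPClient({
        "fenic_data": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable_http",
        }
    })
    tools = await client.get_tools()  # wraps search_customers, find_similar_tickets, ...

    agent = create_react_agent("openai:gpt-4o-mini", tools)
    result = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "Find technology customers with revenue over 1M"}]}
    )
    print(result["messages"][-1].content)

asyncio.run(main())
```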
Advanced Semantic Pipeline Patterns
Hybrid Search with Embeddings
Combine semantic similarity with structured filters:
```python
from fenic.core.types import StringType, FloatType

products_df = session.read.csv("./data/products.csv")

# Add embeddings
products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

# Hybrid search tool
hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType))
    & (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)
```
Dynamic Data Enrichment
Process and enrich data on-demand using async UDFs:
```python
import aiohttp

from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    # Call an external sentiment API for each row (http_session avoids
    # shadowing the Fenic session defined earlier)
    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

# Apply to DataFrame
enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)
```
Multi-Source Data Aggregation
Combine data from multiple sources with semantic operations:
```python
# Load from different sources
sales_df = session.read.csv("s3://my-bucket/sales/*.csv")
feedback_df = session.read.docs("./feedback/**/*.md", content_type="markdown", recursive=True)
support_df = session.read.parquet("hf://datasets/company/support/data/*.parquet")

# Extract structured feedback
# (CustomerFeedback is a Pydantic schema defined elsewhere, like ProductInfo above)
feedback_structured = feedback_df.select(
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

# Join sales with feedback using semantic join
combined = sales_df.semantic.join(
    feedback_structured,
    predicate="The customer {{ left_on }} mentioned product {{ right_on }}",
    left_on=fc.col("customer_email"),
    right_on=fc.col("feedback_data")["customer_contact"]
)
```
System Tool Configuration
Fenic provides automatic tool generation for common operations:
```python
from fenic.api.mcp.tools import SystemToolConfig

# Save DataFrames as tables
products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

# Add descriptions
session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)
session.catalog.set_table_description(
    "customers",
    "Customer database with contact information and purchase history"
)

# Create server with automatic system tools
server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)
```
This generates tools for schema inspection, data profiling, regex search, and SQL analysis across all tables—agents can explore your data without custom tool definitions.
Performance Optimization
Caching and Materialization
Save intermediate results to avoid recomputation:
```python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

# Materialize to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")
```
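Because the materialized table persists in the catalog, any number of tools can be defined over it without recomputing embeddings. A sketch that mirrors the find_similar_tickets pattern above (the tool name and similarity threshold here are illustrative):

```python
# Reuse the cached embeddings in a similarity-search tool
doc_search_query = cached_embeddings.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(fc.col("doc_id"))

session.catalog.create_tool(
    tool_name="find_similar_documents",
    tool_description="Find documents similar to a query embedding, using cached embeddings",
    tool_query=doc_search_query,
    tool_params=[
        ToolParam(name="query_vector", description="Embedding vector of the search query")
    ],
    result_limit=10
)
```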
Batch Processing
Process data in batches to control memory and API usage:
```python
# Process large datasets in chunks
large_df = session.read.csv("./data/large_dataset.csv")

# Use limit and offset for pagination
batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)

    # Schema is a Pydantic extraction model defined elsewhere (like ProductInfo above)
    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )
    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break
    offset += batch_size
```
Rate Limit Management
Configure per-model rate limits in session config to prevent API throttling:
```python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)
```
Monitoring and Metrics
Track query performance and costs through Fenic's built-in metrics:
```python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("total_rm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by time window
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) AS hour,
        SUM(total_lm_cost) AS total_cost,
        SUM(total_lm_requests) AS total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()
```
Complete Integration Example
Here's an end-to-end implementation:
```python
from typing import List

from pydantic import BaseModel, Field

import fenic.api.functions as fc
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType

# 1. Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)
session = Session.get_or_create(config)

# 2. Load and process data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs("./feedback/**/*.md", content_type="markdown", recursive=True)

# 3. Define extraction schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# 4. Process feedback with semantic extraction
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# 5. Create semantic search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# 6. Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# 7. Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Key Takeaways
Preprocess data with semantic operations: Use extraction, classification, clustering, and semantic joins to structure data before agents access it.
Build type-safe tools: Create parameterized tools using tool_param for typed inputs and runtime validation.
Deploy via MCP: Expose Fenic DataFrames through MCP servers that LangChain agents can call directly.
Optimize for production: Cache embeddings, batch process large datasets, and monitor costs through Fenic's metrics system.
Leverage system tools: Use automatic tool generation for common operations like schema inspection and SQL analysis.
By separating data preparation from agent reasoning, Fenic enables cleaner architectures where agents focus on decisions while semantic pipelines handle the challenges of data transformation. This separation improves reliability, reduces costs, and makes agentic systems easier to maintain and scale.
Explore the Fenic repository and documentation to build semantic data pipelines for your LangChain agents.