LangChain agents excel at reasoning and orchestration, but their effectiveness depends entirely on the quality and structure of data they can access. Raw unstructured data creates friction—agents waste tokens parsing formats, struggle with context window limits, and produce inconsistent results.
Typedef.ai solves this with Fenic, a PySpark-inspired DataFrame framework that provides semantic preprocessing infrastructure for AI applications. By handling data preparation, enrichment, and structuring before agents interact with it, Fenic transforms brittle LLM pipelines into production-grade systems.
The Data Quality Problem in Agent Workflows
LangChain agents face three core challenges when working with real-world data:
Unstructured inputs require preprocessing: Documents, transcripts, CSVs, and logs need extraction, cleaning, and normalization before agents can reason effectively. Without preprocessing, agents spend valuable context window space on format parsing instead of decision-making.
Context window limitations: Agents need pre-filtered, relevant data rather than entire datasets. Even with extended context windows, sending raw data to agents is inefficient and expensive.
Inconsistent tool interfaces: Custom Python functions for data access create maintenance overhead and lack type safety. Each function becomes a potential failure point in production.
What is Fenic
Fenic is an open-source DataFrame framework built specifically for AI workflows. It provides:
- DataFrame operations with AI-native types for standard transformations that work alongside semantic functions
- Semantic operators for extraction, classification, clustering, and semantic joins with automatic embedding and LLM call management
- Model Context Protocol integration for declarative tool creation that exposes DataFrames as callable functions
- Multi-provider inference with unified interfaces for OpenAI, Anthropic, Google, and other providers
The framework handles rate limiting, retries, and concurrent request batching automatically, allowing developers to focus on pipeline logic rather than infrastructure.
Setting Up Fenic for LangChain Integration
Install Fenic and configure a session with your preferred language models:
```bash
pip install fenic
```
Create a session with semantic capabilities:
```python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Core Semantic Operations for Data Preparation
Semantic Extraction
Transform unstructured text into typed data structures using Pydantic models. This operation converts messy documents into queryable structured data:
```python
from pydantic import BaseModel, Field
from typing import List

import fenic.api.functions as fc


class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")


# Load documents and extract structured data
df = session.read.docs(
    "./product_descriptions/**/*.md",
    content_type="markdown",
    recursive=True
)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)
```
The extracted data becomes queryable through standard DataFrame operations, enabling precise filtering and transformation before agents access it.
Semantic Joins
Join datasets based on meaning rather than exact string matches. Semantic joins evaluate natural language predicates between row pairs:
```python
from textwrap import dedent

from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create examples to guide the join
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Perform semantic join
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)
```
This creates high-quality matches that agents can use for recommendations without needing to evaluate every combination themselves.
Semantic Clustering
Group similar items automatically using embeddings and K-means clustering:
```python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(fc.col("description")).alias("sample_description")
)
```
Agents can query pre-clustered data to identify patterns and trends without processing raw records.
Semantic Mapping
Transform text using LLM-powered generation with few-shot examples:
```python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))
examples.create_example(MapExample(
    input={"title": "Slow page load", "body": "Dashboard takes 10s to load"},
    output="Performance"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)
```
This pre-classifies data, allowing agents to work with clean categorical variables instead of raw text.
Creating MCP Tools from Fenic DataFrames
The Model Context Protocol enables seamless integration between Fenic's data pipelines and LangChain agents. Create tools that agents can call to access processed data.
Building a Parameterized Tool
Define a DataFrame query with parameters using tool_param:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, FloatType

# Load and prepare customer data
customers_df = session.read.csv("./data/customers.csv")

# Create parameterized query
search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    )
    & (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

# Register as a tool
session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by (e.g., 'technology', 'healthcare')"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)
```
Multi-Step Semantic Tool
Build sophisticated tools that chain semantic operations:
```python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

# Create search tool with similarity matching
from fenic.core.types import EmbeddingType

similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param(
            "query_vector",
            EmbeddingType(dimensions=1536, embedding_model="text-embedding-3-small")
        ),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)
```
Running an MCP Server for LangChain Integration
Deploy your Fenic tools as an MCP server that LangChain agents can connect to.
Synchronous Server Setup
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all tools from catalog
tools = session.catalog.list_tools()

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

# Run synchronously
run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production ASGI Deployment
For production environments, use the ASGI interface:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn:
# uvicorn app:app --host 0.0.0.0 --port 8000
```
CLI-Based Server
Use the fenic-serve command for rapid deployment:
```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport for direct integration
fenic-serve --transport stdio
```
Integration Architecture Patterns
Pattern 1: Preprocessing Layer
Use Fenic to prepare data before LangChain processing:
Raw Data → Fenic (parse, structure, enrich) → LangChain (orchestration, decisions)
Fenic handles batch operations while LangChain manages interactive flows. This pattern works well for applications that need significant data preparation before agent interaction.
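The sketch below illustrates this handoff under stated assumptions: it reuses the extracted `df` from the semantic-extraction example above, assumes Fenic exposes a `to_pylist()` collection method (check your version for the exact collection API), and uses `langchain-openai` and `langchain-core` on the agent side. Treat it as a shape of the pattern, not a definitive implementation.

```python
# Minimal sketch of the preprocessing-layer pattern.
# Assumptions: `df` is the extracted DataFrame from the earlier example,
# to_pylist() is available for collecting rows, and langchain-openai /
# langchain-core are installed.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

import fenic.api.functions as fc

# Batch step: Fenic has already parsed and structured the raw documents
prepared = df.select(
    fc.col("file_path"),
    fc.col("product_data")
).to_pylist()  # assumption: collect results as plain Python dicts

# Interactive step: a LangChain chain reasons over the prepared rows
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You recommend products based on structured catalog data."),
    ("human", "Catalog entries:\n{entries}\n\nQuestion: {question}")
])
chain = prompt | llm

answer = chain.invoke({
    "entries": "\n".join(str(row) for row in prepared[:20]),
    "question": "Which products best fit a small data team on a budget?"
})
print(answer.content)
```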
Pattern 2: MCP Integration
Expose Fenic operations as tools that LangChain agents call:
LangChain Agent → MCP Server (Fenic tools) → Structured Data
Agents access data analysis without leaving their execution context.
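A hedged sketch of the LangChain side follows. It assumes the `langchain-mcp-adapters` and `langgraph` packages; the client constructor, transport key, and `get_tools()` signature may differ between versions, so verify against your installed releases. The server URL matches the HTTP server configured earlier in this article.

```python
# Minimal sketch: a LangChain (LangGraph) agent calling Fenic MCP tools.
# Assumptions: langchain-mcp-adapters and langgraph are installed; exact
# client/transport names may vary across package versions.
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent


async def main():
    client = MultiServerMCPClient({
        "fenic_data": {
            "url": "http://127.0.0.1:8000/mcp",  # Fenic MCP server from the previous section
            "transport": "streamable_http",
        }
    })
    tools = await client.get_tools()  # Fenic tools exposed as LangChain tools

    agent = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools)
    result = await agent.ainvoke({
        "messages": [("user", "Find technology customers with at least $5M revenue.")]
    })
    print(result["messages"][-1].content)


asyncio.run(main())
```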
Pattern 3: Hybrid Processing
Combine both approaches for advanced workflows:
Fenic (batch enrichment) → Data Store → LangChain (interactive) ↔ MCP (Fenic tools)
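A compact sketch of the hybrid pattern, built only from the Fenic APIs shown elsewhere in this article; the table and tool names (`enriched_tickets`, `tickets_by_category`) are illustrative, and `tickets_df` is the ticket DataFrame from the earlier examples.

```python
# Hybrid pattern sketch: batch-enrich once, persist, then expose the enriched
# table as an MCP tool that interactive agents query on demand.
# Names below are illustrative, not part of the Fenic API.
import fenic.api.functions as fc
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType

# Batch enrichment: classify offline with a semantic operator
enriched = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.map(
        "Classify this support ticket: {{ body }}",
        body=fc.col("description")
    ).alias("category")
)
enriched.write.save_as_table("enriched_tickets", mode="overwrite")

# Interactive access: a parameterized tool over the persisted table
tickets_by_category = session.table("enriched_tickets").filter(
    fc.col("category").contains(fc.tool_param("category", StringType))
)
session.catalog.create_tool(
    tool_name="tickets_by_category",
    tool_description="Retrieve pre-classified support tickets by category",
    tool_query=tickets_by_category,
    tool_params=[ToolParam(name="category", description="Ticket category to filter by")],
    result_limit=25
)
```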
Advanced Semantic Pipeline Patterns
Hybrid Search with Embeddings
Combine semantic similarity with structured filters:
```python
products_df = session.read.csv("./data/products.csv")

# Add embeddings
products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

from fenic.core.types import StringType, FloatType, EmbeddingType

hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType))
    & (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param(
            "query_embedding",
            EmbeddingType(dimensions=1536, embedding_model="text-embedding-3-small")
        ),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)
```
Dynamic Data Enrichment
Process and enrich data on-demand using async UDFs:
```python
import aiohttp

from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType


@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }


# Apply to DataFrame
enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)
```
Multi-Source Data Aggregation
Combine data from multiple sources with semantic operations:
```python
# Load from different sources
sales_df = session.read.csv("s3://my-bucket/sales/*.csv")
feedback_df = session.read.docs("./feedback/**/*.md", content_type="markdown", recursive=True)
support_df = session.read.parquet("hf://datasets/company/support/data/*.parquet")

# Extract structured feedback
feedback_structured = feedback_df.select(
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

# Join sales with feedback using semantic join
combined = sales_df.semantic.join(
    feedback_structured,
    predicate="The customer {{ left_on }} mentioned product {{ right_on }}",
    left_on=fc.col("customer_email"),
    right_on=fc.col("feedback_data")["customer_contact"]
)
```
System Tool Configuration
Fenic provides automatic tool generation for common operations:
```python
from fenic.api.mcp.tools import SystemToolConfig

# Save DataFrames as tables
products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

# Add descriptions
session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)
session.catalog.set_table_description(
    "customers",
    "Customer database with contact information and purchase history"
)

# Create server with automatic system tools
server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)
```
This generates tools for schema inspection, data profiling, regex search, and SQL analysis across all tables. Agents can explore data without custom tool definitions.
Text Processing for Unstructured Data
Text Chunking Strategies
Proper chunking is critical for RAG applications. Fenic provides multiple strategies:
```python
from fenic.api.functions import text

# Recursive token chunking - preserves structure at natural boundaries
chunked_df = docs_df.select(
    fc.col("file_path"),
    text.recursive_token_chunk(
        fc.col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Explode chunks into individual rows
chunked_df = chunked_df.select(
    fc.col("file_path"),
    fc.col("chunks")
).explode("chunks").select(
    fc.col("file_path"),
    fc.col("chunks").alias("chunk_text")
)
```
Additional chunking methods include character-based and word-based chunking.
Transcript Processing
Parse multiple transcript formats into unified schema:
```python
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.col("segments")
).explode("segments").select(
    fc.col("file_path"),
    fc.col("segments").index.alias("index"),
    fc.col("segments").speaker.alias("speaker"),
    fc.col("segments").start_time.alias("start_time"),
    fc.col("segments").end_time.alias("end_time"),
    fc.col("segments").content.alias("text")
)
```
Supported formats include SRT, WebVTT, and generic conversation formats. See transcript processing guide for complete examples.
Performance Optimization
Caching and Materialization
Save intermediate results to avoid recomputation:
```python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

# Materialize to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")
```
Batch Processing
Process data in batches to control memory and API usage:
```python
# Process large datasets in chunks
large_df = session.read.csv("./data/large_dataset.csv")

batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )

    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break

    offset += batch_size
```
Rate Limit Management
Configure per-model rate limits in session config:
```python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)
```
Monitoring and Metrics
Track query performance and costs through Fenic's built-in metrics:
```python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("total_rm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by time window
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) AS hour,
        SUM(total_lm_cost) AS total_cost,
        SUM(total_lm_requests) AS total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()
```
Complete Integration Example
End-to-end implementation combining all components:
```python
from typing import List

from pydantic import BaseModel, Field

import fenic.api.functions as fc
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# 1. Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)
session = Session.get_or_create(config)

# 2. Load and process data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs("./feedback/**/*.md", content_type="markdown", recursive=True)


# 3. Define extraction schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")


# 4. Process feedback with semantic extraction
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# 5. Create semantic search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# 6. Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# 7. Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Key Implementation Principles
Preprocess data with semantic operations: Use extraction, classification, clustering, and semantic joins to structure data before agents access it. This reduces agent complexity and improves response quality.
Build type-safe tools: Create parameterized tools using tool_param so each parameter carries an explicit type and is validated at runtime. Typed interfaces prevent malformed inputs and improve agent reliability.
Deploy via MCP: Expose Fenic DataFrames through MCP servers that LangChain agents can call directly. This separation of concerns keeps agent logic clean and data operations maintainable.
Optimize for production: Cache embeddings, batch process large datasets, and monitor costs through Fenic's metrics system. Production systems require observability and cost management.
Use system tools: Automatic tool generation for common operations like schema inspection and SQL analysis reduces development time and provides agents with flexible data exploration capabilities.
Additional Resources
For more implementation patterns and use cases, see:
- Orchestrate Reliable Agents with Typedef and LangGraph
- Build Reliable AI Pipelines with Fenic Semantic Operators
- Create Composable Semantic Operators for Data Transformation
- Eliminate Fragile Glue Code in AI Data Processing
Conclusion
Separating data preparation from agent reasoning creates cleaner architectures where agents focus on decisions while semantic pipelines handle data transformation challenges. This separation improves reliability, reduces costs, and makes agentic systems easier to maintain and scale.
Fenic provides the production-grade preprocessing layer that LangChain applications need, handling parsing, semantic enrichment, and batch inference within a familiar DataFrame API. The Model Context Protocol integration enables seamless agent access to processed data, while semantic operations eliminate custom prompt engineering for common tasks.
Explore the Fenic repository and Typedef platform to build semantic data pipelines for your LangChain agents.

