LangChain agents require structured, clean data to function effectively. Raw unstructured inputs—PDFs, transcripts, CSVs, markdown files—cause agents to waste tokens on parsing, exceed context limits, and produce inconsistent outputs.
Typedef.ai's Fenic provides a DataFrame-based preprocessing layer that transforms unstructured data into structured, queryable formats before reaching your agents.
Why Unstructured Inputs Break Agent Workflows
Unstructured data introduces three critical problems:
Token inefficiency: Agents spend computational resources parsing formats rather than reasoning over content.
Context window violations: Raw documents exceed token limits, forcing manual chunking logic scattered across codebases.
Inconsistent results: Lack of schema validation produces unpredictable outputs that downstream systems cannot reliably consume.
Fenic addresses these issues with a PySpark-inspired DataFrame API where semantic operations are first-class citizens, not external bolt-ons.
Initial Setup
Install Fenic:
```bash
pip install fenic
```
Configure your session with model credentials:
```python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel

config = SessionConfig(
    app_name="langchain_preprocessing",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
The configuration handles rate limiting and request batching automatically.
Loading Documents
Fenic provides format-specific readers:
Markdown Files
```python
from fenic.api.functions import col

docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

# Drop files that failed to load
docs_df = docs_df.filter(col("error").is_null())
```
CSV Files
```python
csv_df = session.read.csv("data/*.csv")
```
With explicit schema:
```python
from fenic.core.types.schema import Schema, ColumnField
from fenic.core.types.datatypes import StringType, IntegerType

csv_df = session.read.csv(
    "data/products.csv",
    schema=Schema([
        ColumnField(name="product_id", data_type=IntegerType),
        ColumnField(name="description", data_type=StringType)
    ])
)
```
PDF Metadata
```python
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

relevant_pdfs = pdf_metadata.filter(col("page_count") < 100)
```
Text Chunking for RAG
Chunking strategies determine how effectively agents can retrieve relevant context.
Recursive Token Chunking
Splits at paragraph breaks and sentence boundaries:
```python
from fenic.api.functions import text, col

chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
Character-Based Chunking
Fixed-size chunks:
```python
chunked_df = docs_df.select(
    col("file_path"),
    text.character_chunk(
        col("content"),
        chunk_size=1000,
        chunk_overlap_percentage=15
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
Word-Based Chunking
Custom delimiters for domain-specific splitting:
```python
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_word_chunk(
        col("content"),
        chunk_size=200,
        chunk_overlap_percentage=10,
        chunking_character_set_custom_characters=['\n\n', '\n', '.', ' ']
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
Embedding Generation
Generate embeddings with automatic batching:
```python
from fenic.api.functions import semantic

embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

embedded_df = embedded_df.with_column(
    "token_count",
    text.count_tokens(col("chunk_text"))
)
```
Fenic handles API rate limits and concurrent request batching without manual async code.
Schema-Driven Extraction
Extract structured data using Pydantic models:
```python
from pydantic import BaseModel, Field
from typing import List

class Entity(BaseModel):
    name: str = Field(description="Entity name")
    type: str = Field(description="Entity type: person, organization, or location")

class ExtractedInfo(BaseModel):
    entities: List[Entity] = Field(description="Named entities found in text")
    summary: str = Field(description="Brief summary of the content")
    topics: List[str] = Field(description="Main topics discussed")

extracted_df = docs_df.select(
    col("file_path"),
    semantic.extract(
        col("content"),
        response_format=ExtractedInfo
    ).alias("extracted_data")
)

extracted_df = extracted_df.select(
    col("file_path"),
    col("extracted_data.summary").alias("summary"),
    col("extracted_data.entities").alias("entities"),
    col("extracted_data.topics").alias("topics")
)
```
This eliminates manual prompt engineering and validation logic.
Document Classification
Categorize documents into predefined classes:
```python
from fenic.api.functions.semantic import ClassDefinition

categories = [
    ClassDefinition(
        label="Technical Documentation",
        description="API docs, architecture guides, technical specifications"
    ),
    ClassDefinition(
        label="Business Content",
        description="Marketing materials, business plans, proposals"
    ),
    ClassDefinition(
        label="Support Material",
        description="FAQs, troubleshooting guides, user support content"
    )
]

classified_df = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.classify(
        col("content"),
        classes=categories
    ).alias("category")
)
```
Semantic Filtering
Filter using natural language predicates:
```python
relevant_df = docs_df.filter(
    semantic.predicate(
        "This text discusses product features or roadmap planning: {{ text }}",
        text=col("content")
    )
)
```
Transcript Processing
Parsing Multiple Formats
Fenic supports SRT, WebVTT, and generic formats:
```python
from fenic.api.functions import text

transcripts_df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

parsed_df = transcripts_df.select(
    col("file_path"),
    text.parse_transcript(col("content"), "srt").alias("segments")
)

segments_df = parsed_df.select(
    col("file_path"),
    col("segments")
).explode("segments").select(
    col("file_path"),
    col("segments.index").alias("index"),
    col("segments.speaker").alias("speaker"),
    col("segments.start_time").alias("start_time"),
    col("segments.end_time").alias("end_time"),
    col("segments.content").alias("text")
)
```
Sentiment Analysis on Transcripts
```python
import fenic.api.functions as fc

sentiment_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(col("text"))
)

speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)
```
Extracting Action Items
```python
from pydantic import BaseModel, Field

class ActionItem(BaseModel):
    task: str = Field(description="The action to be taken")
    assignee: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date if mentioned")

actions_df = segments_df.select(
    col("file_path"),
    col("text"),
    semantic.extract(
        col("text"),
        ActionItem
    ).alias("action_item")
).filter(
    col("action_item").is_not_null()
)
```
Building Complete RAG Pipelines
Combine operations into a production pipeline:
```python
from fenic.api.functions import dt
import fenic.api.functions as fc

# Load documents
raw_docs = session.read.docs("data/docs/**/*.md", content_type="markdown", recursive=True)

# Clean text
cleaned_docs = raw_docs.select(
    col("file_path"),
    text.trim(col("content")).alias("content")
).filter(col("content") != "")

# Chunk documents
chunked = cleaned_docs.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=400,
        chunk_overlap_percentage=10
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

# Generate embeddings
embedded = chunked.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding"),
    text.count_tokens(col("chunk_text")).alias("tokens")
)

# Add metadata
final_df = embedded.with_column("processed_at", dt.current_timestamp())

# Save results
final_df.write.save_as_table("processed_documents", mode="overwrite")
```
Semantic Search Operations
Perform similarity search in DataFrames:
```python
from fenic.api.functions import embedding

query_text = "machine learning best practices"
query_df = session.create_dataframe([{"query": query_text}])

query_embedding = query_df.select(
    semantic.embed(col("query")).alias("query_emb")
).collect()[0]["query_emb"]

results_df = embedded_df.select(
    col("file_path"),
    col("chunk_text"),
    embedding.compute_similarity(
        col("embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
)

top_results = results_df.order_by(
    col("similarity_score").desc()
).limit(10)
```
MCP Tool Creation
Create tools using Model Context Protocol for LangChain agent integration:
```python
import fenic.api.functions as fc
from fenic.api.functions import col
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

search_query = final_df.filter(
    col("file_path").contains(
        fc.tool_param("path_filter", StringType)
    )
    & (col("tokens") >= fc.tool_param("min_tokens", IntegerType))
).select(
    col("file_path"),
    col("chunk_text"),
    col("tokens")
)

session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search processed documents by path and minimum token count",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="path_filter",
            description="Filter documents by path substring"
        ),
        ToolParam(
            name="min_tokens",
            description="Minimum number of tokens per chunk",
            default_value=100,
            has_default=True
        )
    ],
    result_limit=50
)
```
MCP Server Deployment
Synchronous HTTP Server
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="DocumentDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production ASGI Deployment
```python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with: uvicorn app:app --host 0.0.0.0 --port 8000
```
CLI Deployment
Use the fenic-serve command:
```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_documents

# Use stdio transport
fenic-serve --transport stdio
```
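Once a server is running over HTTP, a LangChain agent can attach to its tools. A minimal sketch using the langchain-mcp-adapters package and LangGraph's prebuilt ReAct agent (both are assumptions here; any MCP-capable client works):

```python
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def main():
    # Point the client at the Fenic MCP server started above
    client = MultiServerMCPClient({
        "documents": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable_http",
        }
    })
    tools = await client.get_tools()  # Fenic catalog tools, e.g. search_documents

    agent = create_react_agent("openai:gpt-4o-mini", tools)
    result = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "Find chunks about roadmap planning"}]}
    )
    print(result["messages"][-1].content)

asyncio.run(main())
```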
LangChain Integration
Export processed data for LangChain consumption:
```python
# Export to CSV
final_df.write.csv("output/chunks_with_embeddings.csv")

# Convert to pandas
pandas_df = final_df.to_pandas()

# Create LangChain documents
from langchain.schema import Document

documents = [
    Document(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "tokens": row["tokens"]
        }
    )
    for row in pandas_df.to_dict('records')
]
```
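Because Fenic has already computed embeddings, you can load them straight into a LangChain vector store instead of re-embedding. A sketch assuming langchain-community, langchain-openai, and faiss-cpu are installed; the embedding model is only used to embed queries at search time:

```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

records = pandas_df.to_dict("records")

# Pair each chunk with its precomputed Fenic embedding
text_embeddings = [(row["chunk_text"], list(row["embedding"])) for row in records]
metadatas = [{"file_path": row["file_path"], "tokens": row["tokens"]} for row in records]

vector_store = FAISS.from_embeddings(
    text_embeddings=text_embeddings,
    embedding=OpenAIEmbeddings(model="text-embedding-3-small"),  # query-time only
    metadatas=metadatas,
)
retriever = vector_store.as_retriever(search_kwargs={"k": 5})
```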
Semantic Joins
Join datasets based on meaning:
```python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python experience",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# jobs_df and candidates_df are assumed to exist with the referenced columns
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=col("job_requirements"),
    right_on=col("candidate_skills"),
    examples=examples
)
```
Semantic Clustering
Group similar items using embeddings:
```python
import fenic.api.functions as fc

df_with_embeddings = df.select(
    col("ticket_id"),
    col("description"),
    semantic.embed(col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(col("description")).alias("sample_description")
)
```
Performance Optimization
Batch Processing
Process large datasets in chunks:
```python
large_docs = session.read.docs("data/large_corpus/**/*.md", recursive=True)

batch_size = 1000
total_rows = large_docs.count()

for offset in range(0, total_rows, batch_size):
    # Skip rows handled in earlier iterations, then take the next batch
    batch = large_docs.offset(offset).limit(batch_size)
    processed_batch = batch.select(
        col("file_path"),
        semantic.embed(col("content")).alias("embedding")
    )
    processed_batch.write.parquet(
        f"output/batch_{offset}.parquet",
        mode="overwrite"
    )
```
Token Monitoring
Track usage and costs:
```python
import fenic.api.functions as fc

metrics_df = embedded_df.select(
    fc.sum(col("token_count")).alias("total_tokens"),
    fc.count("*").alias("total_chunks"),
    fc.avg(col("token_count")).alias("avg_tokens_per_chunk")
)

metrics_df.show()
```
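To turn token counts into a rough spend estimate, multiply by your provider's per-token rate. A sketch; the rate below is a placeholder for illustration, not current pricing:

```python
# Rough cost estimate; the per-token rate is a placeholder, not current pricing.
EMBEDDING_USD_PER_1K_TOKENS = 0.00002  # hypothetical rate

totals = metrics_df.to_pandas()
total_tokens = int(totals["total_tokens"].iloc[0])
print(f"{total_tokens} tokens ~ ${total_tokens / 1000 * EMBEDDING_USD_PER_1K_TOKENS:.4f}")
```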
Caching Operations
```python
embeddings_df = documents_df.select(
    col("doc_id"),
    semantic.embed(col("content")).alias("embeddings")
)

embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")
cached_embeddings = session.table("document_embeddings")
```
Error Handling
Separate successful and failed operations:
```python
import fenic.api.functions as fc

processed_df = docs_df.with_column(
    "is_valid",
    fc.when(col("error").is_null(), True).otherwise(False)
)

success_df = processed_df.filter(col("is_valid"))
failed_df = processed_df.filter(~col("is_valid"))

failed_df.select(col("file_path"), col("error")).show()
```
Incremental Processing
Track processed documents:
```python
processed_df.write.save_as_table("processed_documents", mode="append")

existing = session.table("processed_documents")
existing_paths = existing.select(col("file_path")).distinct().collect()
processed_paths = {row["file_path"] for row in existing_paths}

new_docs = docs_df.filter(
    ~col("file_path").isin(list(processed_paths))
)
```
Vector Database Export
Prepare data for vector database ingestion:
```python
import fenic.api.functions as fc

export_df = embedded_df.select(
    col("chunk_id"),
    col("chunk_text"),
    col("embedding"),
    fc.struct(
        col("file_path"),
        col("tokens")
    ).alias("metadata")
)

export_df.write.parquet("output/vector_db_ready.parquet")
```
SQL Interface
Run SQL queries on DataFrames:
```python
results = session.sql("""
    SELECT
        file_path,
        COUNT(*) AS chunk_count,
        AVG(tokens) AS avg_tokens,
        MAX(similarity_score) AS max_similarity
    FROM {chunks}
    WHERE tokens BETWEEN 100 AND 500
    GROUP BY file_path
    HAVING chunk_count > 5
    ORDER BY max_similarity DESC
""", chunks=results_df)
```
Template-Based Extraction
Extract data from formatted text:
```python
log_df = session.create_dataframe([
    {"log": "2024-01-15 ERROR Connection failed to server"},
    {"log": "2024-01-15 INFO Request processed successfully"}
])

parsed_logs = log_df.select(
    text.extract(
        col("log"),
        template="${date} ${level} ${message}"
    ).alias("parsed")
)

parsed_logs = parsed_logs.select(
    col("parsed.date").alias("date"),
    col("parsed.level").alias("level"),
    col("parsed.message").alias("message")
)
```
Jinja Template Rendering
Generate prompts dynamically:
```python
import fenic.api.functions as fc

prompt_df = chunked_df.select(
    text.jinja(
        """Answer the following question based on the context:

Context: {{ context }}

Question: {{ question }}

Provide a detailed answer based solely on the context provided.""",
        context=col("chunk_text"),
        question=fc.lit("What are the main concepts discussed?")
    ).alias("prompt")
)
```
Production Example
Complete implementation:
```python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="document_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)
session = Session.get_or_create(config)

# Load data
docs_df = session.read.docs("./docs/**/*.md", content_type="markdown", recursive=True)

# Define schema
class DocumentInsight(BaseModel):
    summary: str = Field(description="Brief summary of the document")
    key_topics: List[str] = Field(description="Main topics discussed")
    sentiment: str = Field(description="Overall sentiment")

# Process
processed = docs_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=DocumentInsight).alias("insights")
)

# Save
processed.write.save_as_table("processed_docs", mode="overwrite")

# Create tool
search_query = session.table("processed_docs").filter(
    fc.col("insights")["key_topics"].contains(
        fc.tool_param("topic", StringType)
    )
)

session.catalog.create_tool(
    tool_name="search_by_topic",
    tool_description="Find documents discussing specific topics",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="topic",
            description="Topic to search for in documents"
        )
    ],
    result_limit=20
)

# Deploy
tools = session.catalog.list_tools()
server = create_mcp_server(
    session=session,
    server_name="DocumentIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)
run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Configuration Best Practices
Chunk Size Selection
- 8K context models: 400 tokens, 10% overlap
- 128K context models: 2000 tokens, 5% overlap
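A minimal sketch applying these presets with the chunking API used earlier; the preset names and dictionary are illustrative, not part of Fenic:

```python
# Map context-window budgets onto chunking parameters (values from the guidance above)
CHUNK_PRESETS = {
    "small_context": {"chunk_size": 400, "chunk_overlap_percentage": 10},   # ~8K-token models
    "large_context": {"chunk_size": 2000, "chunk_overlap_percentage": 5},   # ~128K-token models
}

preset = CHUNK_PRESETS["small_context"]
chunked = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(col("content"), **preset).alias("chunks")
).explode("chunks")
```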
Model Selection
Configure tiers for cost optimization:
```python
from fenic.api.session.config import OpenAILanguageModel, AnthropicLanguageModel

language_models = {
    "fast": OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
    "balanced": AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=100,
        input_tpm=100000,
        output_tpm=50000
    ),
    "accurate": OpenAILanguageModel(model_name="gpt-4o", rpm=100, tpm=100000)
}
```
Use appropriate models per task:
```python
# Cheap model for high-volume classification
df.select(
    "*",
    fc.semantic.map(
        "Classify {{ text }}",
        text=fc.col("text"),
        model_alias="fast"
    ).alias("category")
)

# Stronger model for complex extraction (ExtractionSchema is a Pydantic model)
df.select(
    "*",
    fc.semantic.extract(
        fc.col("complex_doc"),
        ExtractionSchema,
        model_alias="accurate"
    ).alias("extracted")
)
```
Key Implementation Points
Fenic provides LangChain agents with structured preprocessing through:
- DataFrame abstraction for unstructured data handling
- Native text operations for chunking and parsing
- Automatic batching and rate limiting
- Type-safe extraction via Pydantic schemas
- MCP integration for seamless tool creation
- Built-in monitoring and lineage tracking
The separation of data preparation from agent reasoning reduces costs, improves reliability, and simplifies maintenance at scale.
Additional Resources
- Fenic GitHub Repository
- Build Reliable AI Pipelines with Fenic's Semantic Operators
- Eliminate Fragile Glue Code in AI Data Processing
- Augment LangChain with Typedef.ai for Transcript Processing
- Typedef.ai Platform

