Building production LLM applications means handling diverse data types simultaneously—text documents, audio transcripts, images, structured databases, and video content. LangChain provides orchestration for agent logic, but managing multimodal data at scale requires specialized infrastructure. Fenic offers a DataFrame-based data layer that preprocesses, enriches, and structures multimodal content before agents consume it.
The Multimodal Data Infrastructure Problem
Most valuable enterprise data exists across multiple modalities. Text documents come in markdown, PDFs, HTML, and plain text. Audio files need transcription with speaker identification. Images require vision model analysis. Structured data lives in CSVs, JSON, and databases. Video combines temporal, visual, and audio components.
Traditional data stacks fail at multimodal AI workloads. Query engines treat LLM calls as external black boxes, preventing optimization. Rate limits across model providers require manual coordination. Context window constraints force custom chunking logic. Different modalities need separate processing pipelines. The result: fragile glue code connecting OCR services, transcription APIs, vision models, LLM providers, vector databases, and warehouses.
Fenic solves this through an inference-first architecture where semantic operations are native DataFrame primitives rather than external functions.
Why Fenic for Multimodal LangChain Applications
Fenic provides a PySpark-inspired DataFrame API built specifically for AI workloads. The framework handles three critical capabilities:
Native multimodal types extend beyond standard data formats. MarkdownType parses document structure with operations like header-based chunking. TranscriptType handles SRT, WebVTT, and generic formats with speaker and timestamp awareness. JsonType enables JQ expressions for nested data manipulation. EmbeddingType represents vectors with semantic similarity operations. DocumentPathType loads PDFs and text files directly.
Semantic operations as DataFrame primitives include extraction with Pydantic schemas, classification with few-shot learning, clustering using embeddings, semantic joins based on meaning, and filtering with natural language predicates. The query engine understands when inference happens, enabling automatic batching, intelligent caching, and cost optimization.
Multi-provider orchestration offers unified interfaces for OpenAI, Anthropic, Google, and other providers with declarative rate limiting, automatic retry logic, and provider-specific fallbacks.
The recommended pattern separates concerns. Fenic handles batch preprocessing—loading multimodal data, extracting structure, generating embeddings, and preparing clean context. LangChain agents receive structured data and focus on planning and decision-making.
Session Configuration for Multimodal Workloads
Install Fenic and configure providers:
```bash
pip install fenic
```
Set up a session with multiple model tiers:
```python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    AnthropicLanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="multimodal_integration",
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            ),
            "reasoning": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="fast",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100_000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
This configuration establishes model tiers for cost optimization—use fast models for classification and extraction, reserve accurate models for complex reasoning.
Loading Multimodal Data Sources
Fenic provides specialized readers for different data types:
Text Documents
```python
import fenic.api.functions as fc
from fenic.api.functions import col

# Markdown files
docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

# Filter successful loads
docs_df = docs_df.filter(col("error").is_null())
```
Structured Data
```python
from fenic.core.types.schema import Schema, ColumnField
from fenic.core.types.datatypes import StringType, IntegerType

# CSV with explicit schema
csv_df = session.read.csv(
    "data/products.csv",
    schema=Schema([
        ColumnField(name="product_id", data_type=IntegerType),
        ColumnField(name="description", data_type=StringType),
        ColumnField(name="category", data_type=StringType)
    ])
)

# JSON files
json_df = session.read.json("data/logs/*.json")
```
Audio Transcripts
Fenic's TranscriptType handles multiple formats with unified parsing:
```python
from fenic.api.functions import text

# Load transcript files
transcript_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = transcript_df.select(
    col("file_path"),
    text.parse_transcript(col("content"), "srt").alias("segments")
)

# Explode into individual segments
segments_df = parsed_df.select(
    col("file_path"),
    fc.explode(col("segments")).alias("segment")
).select(
    col("file_path"),
    col("segment.speaker").alias("speaker"),
    col("segment.start_time").alias("start_time"),
    col("segment.end_time").alias("end_time"),
    col("segment.content").alias("text")
)
```
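To make the segment fields concrete, here is a minimal pure-Python sketch of what parsing SRT text into segments involves. It is not Fenic's implementation: the `parse_srt` function, the `Segment` dataclass, and the "Speaker: text" convention for speaker labels are assumptions made for illustration.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

# Matches an SRT timing line such as "00:00:01,000 --> 00:00:04,500".
TIMING = re.compile(
    r"(\d{2}):(\d{2}):(\d{2})[,.](\d{3})\s*-->\s*(\d{2}):(\d{2}):(\d{2})[,.](\d{3})"
)


@dataclass
class Segment:
    index: int
    start_time: float  # seconds
    end_time: float    # seconds
    speaker: Optional[str]
    content: str


def _seconds(h: str, m: str, s: str, ms: str) -> float:
    return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000


def parse_srt(raw: str) -> List[Segment]:
    """Parse SRT text into segments (hypothetical helper, not a Fenic API)."""
    segments = []
    # Cues are separated by one or more blank lines.
    for block in re.split(r"\n\s*\n", raw.strip()):
        lines = [line.strip() for line in block.splitlines() if line.strip()]
        if len(lines) < 3 or not lines[0].isdigit():
            continue  # skip malformed cues
        match = TIMING.match(lines[1])
        if match is None:
            continue
        content = " ".join(lines[2:])
        # Assumed convention: an optional short "Speaker: text" prefix.
        speaker = None
        if ": " in content:
            candidate, rest = content.split(": ", 1)
            if len(candidate.split()) <= 3:
                speaker, content = candidate, rest
        groups = match.groups()
        segments.append(
            Segment(
                index=int(lines[0]),
                start_time=_seconds(*groups[:4]),
                end_time=_seconds(*groups[4:]),
                speaker=speaker,
                content=content,
            )
        )
    return segments


sample = """1
00:00:01,000 --> 00:00:04,500
Alice: Let's review the deployment plan.

2
00:00:05,000 --> 00:00:07,250
Sounds good.
"""

segments = parse_srt(sample)
```

Each resulting segment carries the same kind of fields the DataFrame pipeline selects (speaker, start time, end time, content), which is what makes per-segment extraction and classification possible later on.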
HuggingFace Datasets
Fenic 0.4.0 added direct HuggingFace integration:
```python
# Load from HuggingFace
hf_df = session.read.csv("hf://datasets/squad/default/train.csv")

# Parquet files
hf_parquet = session.read.parquet(
    "hf://datasets/cais/mmlu/astronomy/*.parquet"
)
```
PDF Metadata
```python
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

# Filter by characteristics
relevant_pdfs = pdf_metadata.filter(
    (col("page_count") < 100) & (col("page_count") > 10)
)
```
Text Chunking for Multimodal Context
Proper chunking maintains relationships across modalities. Fenic provides three strategies:
Recursive Token Chunking
Split at natural boundaries while respecting token limits:
```python
from fenic.api.functions import text

chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Expand chunks into rows
chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
Character-Based Chunking
Fixed-size chunks for consistent processing:
```python
chunked_df = docs_df.select(
    col("file_path"),
    text.character_chunk(
        col("content"),
        chunk_size=1000,
        chunk_overlap_percentage=15
    ).alias("chunks")
)

# Expand chunks into rows
chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
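The interaction between chunk size and overlap percentage is easier to see outside the DataFrame API. The following is a minimal pure-Python sketch of fixed-size character chunking; `chunk_with_overlap` is a hypothetical helper, and it assumes the overlap is computed as a percentage of the chunk size, which may differ from how Fenic resolves it internally.

```python
from typing import List


def chunk_with_overlap(text: str, chunk_size: int, overlap_percentage: int) -> List[str]:
    """Split text into fixed-size character chunks that share an overlap.

    Illustrative only: assumes overlap = chunk_size * overlap_percentage / 100.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")

    overlap = chunk_size * overlap_percentage // 100
    stride = chunk_size - overlap  # how far the window advances each step
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
        start += stride
    return chunks


chunks = chunk_with_overlap(
    "abcdefghijklmnopqrstuvwxyz", chunk_size=10, overlap_percentage=20
)
# chunks == ["abcdefghij", "ijklmnopqr", "qrstuvwxyz"]
```

With a 10-character window and 20% overlap, each chunk repeats the last two characters of the previous one, so text that straddles a boundary appears intact in at least one chunk.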
Word-Based Chunking
Control chunk boundaries with custom delimiters:
```python
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_word_chunk(
        col("content"),
        chunk_size=200,
        chunk_overlap_percentage=10,
        chunking_character_set_custom_characters=['\n\n', '\n', '.', ' ']
    ).alias("chunks")
).explode("chunks")
```
Semantic Extraction Across Modalities
Extract structured data from unstructured multimodal content using Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List, Literal
from fenic.api.functions import semantic

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key features")
    category: Literal["electronics", "furniture", "clothing", "other"]

# Extract from text descriptions
product_df = session.read.csv("product_descriptions.csv")

extracted_df = product_df.select(
    col("product_id"),
    semantic.extract(
        col("description"),
        response_format=ProductInfo
    ).alias("product_data")
)

# Flatten nested structure
structured_df = extracted_df.select(
    col("product_id"),
    col("product_data.name").alias("name"),
    col("product_data.price").alias("price"),
    col("product_data.features").alias("features"),
    col("product_data.category").alias("category")
)
```
Meeting Transcript Analysis
Extract action items and decisions from audio transcripts:
```python
from fenic.api.functions.builtin import collect_list

class MeetingInsight(BaseModel):
    speaker: str
    key_points: List[str]
    action_items: List[str]
    sentiment: Literal["positive", "neutral", "negative"]

meeting_analysis = segments_df.with_column(
    "analysis",
    semantic.extract(col("text"), MeetingInsight)
).unnest("analysis")

# Filter segments with action items
actionable = meeting_analysis.filter(
    fc.length(col("action_items")) > 0
)

# Aggregate by speaker
speaker_summary = actionable.group_by("speaker").agg(
    collect_list("action_items").alias("all_actions"),
    collect_list("key_points").alias("all_points")
)
```
Embedding Generation for Multimodal Search
Generate embeddings for semantic similarity across data types:
```python
from fenic.api.functions import semantic, text

# Generate embeddings for text chunks
embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

# Add token counts (with_column takes the new column name, then the expression)
embedded_df = embedded_df.with_column(
    "token_count",
    text.count_tokens(col("chunk_text"))
)
```
Semantic Similarity Search
Query embeddings to find relevant content:
```python
from fenic.api.functions import embedding

# Generate query embedding
query_text = "machine learning deployment strategies"
query_df = session.create_dataframe([{"query": query_text}])
query_embedding = query_df.select(
    semantic.embed(col("query")).alias("query_emb")
).collect()[0]["query_emb"]

# Compute similarity scores
results_df = embedded_df.select(
    col("file_path"),
    col("chunk_text"),
    embedding.compute_similarity(
        col("embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
)

# Get top matches
top_results = results_df.order_by(
    col("similarity_score").desc()
).limit(10)
```
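For readers who want to see what the cosine metric and the top-match step compute, here is a small pure-Python sketch. The helper names `cosine_similarity` and `top_k` and the two-dimensional toy vectors are illustrative assumptions; real embeddings have hundreds or thousands of dimensions, and Fenic performs this computation inside the query engine.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float],
    rows: List[Tuple[str, Sequence[float]]],
    k: int,
) -> List[Tuple[str, float]]:
    """Score every (text, vector) row against the query and keep the best k."""
    scored = [(text, cosine_similarity(query, vec)) for text, vec in rows]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


rows = [
    ("deploying models", [1.0, 0.0]),
    ("cooking pasta", [0.0, 1.0]),
    ("serving ML in production", [3.0, 4.0]),
]
results = top_k([1.0, 0.0], rows, k=2)
# "deploying models" scores 1.0 and "serving ML in production" scores 0.6
```

Because cosine similarity ignores vector length, the third row scores 0.6 even though its raw magnitude is five times larger than the query's.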
Classification for Multimodal Content
Classify content across different modalities:
```python
from fenic.api.functions.semantic import ClassDefinition

categories = [
    ClassDefinition(
        label="Technical Documentation",
        description="API docs, architecture guides, technical specifications"
    ),
    ClassDefinition(
        label="Business Content",
        description="Marketing materials, business plans, proposals"
    ),
    ClassDefinition(
        label="Support Material",
        description="FAQs, troubleshooting guides, user support content"
    ),
    ClassDefinition(
        label="Training Content",
        description="Tutorials, educational materials, onboarding guides"
    )
]

classified_df = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.classify(
        col("content"),
        classes=categories
    ).alias("category")
)
```
Transcript Classification
Categorize conversation segments:
```python
from fenic.api.functions.builtin import count

# Classify support call segments
call_types = ["Technical Issue", "Billing Question", "Feature Request", "General Inquiry"]

classified_segments = segments_df.with_column(
    "call_type",
    semantic.classify(col("text"), call_types)
)

# Analyze distribution
type_distribution = classified_segments.group_by("call_type").agg(
    count("*").alias("segment_count")
).order_by(col("segment_count").desc())
```
Semantic Joins for Cross-Modal Relationships
Join datasets based on meaning rather than exact matches:
```python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create few-shot examples that show what counts as a match
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Semantic join between job postings and candidate profiles.
# jobs_df and candidates_df are assumed to be loaded already, with
# "job_requirements" and "candidate_skills" columns respectively.
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=col("job_requirements"),
    right_on=col("candidate_skills"),
    examples=examples
)
```
Cross-Transcript Analysis
Link related discussions across different meeting transcripts:
```python
# Extract topics from meeting 1
meeting1_df = segments_df.filter(
    col("file_path").like("%meeting_1%")
).select(
    col("text").alias("meeting1_text")
)

# Extract topics from meeting 2
meeting2_df = segments_df.filter(
    col("file_path").like("%meeting_2%")
).select(
    col("text").alias("meeting2_text")
)

# Find related discussions
related_discussions = meeting1_df.semantic.join(
    other=meeting2_df,
    predicate="""
    Meeting 1: {{ left_on }}
    Meeting 2: {{ right_on }}
    These segments discuss the same topic or decision.
    """,
    left_on=col("meeting1_text"),
    right_on=col("meeting2_text")
)
```
Semantic Clustering for Pattern Detection
Group similar content across modalities:
```python
from fenic.api.functions.builtin import count, first

# Generate embeddings
df_with_embeddings = docs_df.select(
    col("document_id"),
    col("content"),
    semantic.embed(col("content")).alias("embeddings")
)

# Cluster documents
clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_summary = clustered_df.group_by("cluster_id").agg(
    count("*").alias("document_count"),
    first(col("content")).alias("sample_content")
)
```
Building Complete Multimodal Pipelines
Combine operations into end-to-end workflows:
```python
from fenic.api.functions import text, col, semantic, dt
from fenic.api.functions.builtin import md5

# Load multimodal data
raw_docs = session.read.docs(
    "data/docs/**/*.md",
    content_type="markdown",
    recursive=True
)

# Complete pipeline
final_df = (
    raw_docs
    # Clean text
    .select(
        col("file_path"),
        text.trim(col("content")).alias("content")
    )
    .filter(col("content") != "")
    # Chunk documents
    .select(
        col("file_path"),
        text.recursive_token_chunk(
            col("content"),
            chunk_size=400,
            chunk_overlap_percentage=10
        ).alias("chunks")
    )
    .explode("chunks")
    .select(
        col("file_path"),
        col("chunks").alias("chunk_text")
    )
    # Generate embeddings
    .with_column(
        "embedding",
        semantic.embed(col("chunk_text"))
    )
    # Add metadata
    .with_column(
        "chunk_id",
        # Placeholder: a token count is not unique across chunks.
        # Use a proper hash or ID generation before relying on this column.
        text.count_tokens(col("chunk_text"))
    )
    .with_column(
        "tokens",
        text.count_tokens(col("chunk_text"))
    )
    .with_column(
        "processed_at",
        dt.current_timestamp()
    )
)

# Persist results
final_df.write.save_as_table("multimodal_index", mode="overwrite")
```
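The `chunk_id` column in the pipeline is only a placeholder, since two chunks can easily have the same token count. One common way to get stable, unique IDs is to hash the source path, the chunk's position, and its text. The sketch below uses Python's standard `hashlib`; `make_chunk_id` is a hypothetical helper, and how you wire it into the DataFrame (for example through a UDF or after collecting rows) is left open.

```python
import hashlib


def make_chunk_id(file_path: str, chunk_index: int, chunk_text: str) -> str:
    """Derive a stable ID from the source path, chunk position, and content.

    The unit separator (\\x1f) keeps the three fields from running together,
    so ("a", 12, "x") and ("a1", 2, "x") cannot produce the same payload.
    """
    payload = f"{file_path}\x1f{chunk_index}\x1f{chunk_text}".encode("utf-8")
    # 32 hex characters (128 bits) is an arbitrary, assumed ID length.
    return hashlib.sha256(payload).hexdigest()[:32]


first_id = make_chunk_id("docs/guide.md", 0, "Install the package.")
second_id = make_chunk_id("docs/guide.md", 1, "Configure the session.")
```

Because the ID depends only on its inputs, re-running the pipeline on unchanged files yields the same IDs, which makes vector store upserts idempotent.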
Exporting to LangChain
Convert Fenic-processed data for LangChain consumption:
```python
# Recent LangChain releases expose Document from langchain_core;
# older releases used `from langchain.schema import Document`.
from langchain_core.documents import Document

# Save as table
final_df.write.save_as_table("processed_documents", mode="overwrite")

# Retrieve for LangChain
processed_data = session.table("processed_documents")

# Convert to pandas
pandas_df = processed_data.to_pandas()

# Create LangChain documents
documents = [
    Document(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "chunk_id": row["chunk_id"],
            "tokens": row["tokens"],
            "embedding": row["embedding"]
        }
    )
    for row in pandas_df.to_dict('records')
]
```
Export to Vector Databases
Prepare data for Pinecone, Weaviate, or similar stores:
```python
from fenic.api.functions.builtin import struct

export_df = final_df.select(
    col("chunk_id"),
    col("chunk_text"),
    col("embedding"),
    struct(
        col("file_path"),
        col("tokens"),
        col("processed_at")
    ).alias("metadata")
)

# Export to parquet
export_df.write.parquet("output/vector_db_ready.parquet")

# Or collect for direct insertion.
# vector_db stands in for your vector store client; the upsert signature
# below is illustrative and varies by provider.
vector_records = export_df.collect()
for record in vector_records:
    vector_db.upsert(
        id=record["chunk_id"],
        values=record["embedding"],
        metadata={
            "text": record["chunk_text"],
            "file_path": record["metadata"]["file_path"],
            "tokens": record["metadata"]["tokens"]
        }
    )
```
Model Context Protocol Integration
MCP enables declarative tool creation for LangChain agents:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Create parameterized search tool
search_query = session.table("multimodal_index").filter(
    col("chunk_text").contains(
        fc.tool_param("search_term", StringType)
    )
).limit(fc.tool_param("limit", IntegerType))

session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search processed documents by keyword",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="search_term",
            description="Keyword or phrase to search for"
        ),
        ToolParam(
            name="limit",
            description="Maximum results to return",
            default_value=10,
            has_default=True
        )
    ],
    result_limit=50
)
```
Multi-Step Semantic Tools
Build tools that chain semantic operations:
```python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = (
    tickets_df
    .select(
        col("ticket_id"),
        col("subject"),
        col("description"),
        semantic.embed(col("description")).alias("embeddings")
    )
    .semantic.with_cluster_labels(
        by=col("embeddings"),
        num_clusters=15,
        label_column="category_cluster"
    )
)

# Create similarity search tool
similar_tickets_query = processed_tickets.filter(
    embedding.compute_similarity(
        col("embeddings"),
        fc.tool_param("query_vector", col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    col("ticket_id"),
    col("subject"),
    col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a query",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of search query"
        )
    ],
    result_limit=10
)
```
Deploying MCP Servers
Launch MCP servers for LangChain agent access:
Synchronous Server
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="MultimodalDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production ASGI Deployment
```python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000
```
Async UDFs for External Services
Async UDFs enable concurrent API calls while maintaining DataFrame semantics:
```python
import aiohttp
import fenic.api.functions as fc
from fenic.api.functions import col
from fenic.api.functions.builtin import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    # Named http_session to avoid shadowing the Fenic `session` object
    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            # Surface HTTP errors so the retry policy can react to them
            resp.raise_for_status()
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

# Apply to DataFrame (reviews_df is assumed to be loaded already)
enriched_df = reviews_df.select(
    col("review_id"),
    col("review_text"),
    analyze_sentiment(col("review_text")).alias("sentiment_analysis")
)
```
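The three decorator parameters (`max_concurrency`, `timeout_seconds`, `num_retries`) describe a familiar asyncio pattern. The sketch below shows that pattern with the standard library only, as a rough mental model rather than Fenic's actual implementation. `run_bounded` and `fake_sentiment` are hypothetical names, and returning `None` for inputs that exhaust their retries is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


async def run_bounded(
    inputs: List[str],
    call: Callable[[str], Awaitable[str]],
    max_concurrency: int = 15,
    timeout_seconds: float = 5.0,
    num_retries: int = 2,
) -> List[Optional[str]]:
    """Apply `call` to each input with a concurrency cap, timeout, and retries.

    Results keep input order; an input that fails every attempt yields None.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def one(item: str) -> Optional[str]:
        async with semaphore:  # at most max_concurrency calls in flight
            for _attempt in range(num_retries + 1):
                try:
                    return await asyncio.wait_for(call(item), timeout_seconds)
                except Exception:
                    continue  # timeout or error: retry if attempts remain
            return None

    return list(await asyncio.gather(*(one(item) for item in inputs)))


# Hypothetical stand-in for a remote API: "flaky" inputs fail exactly once.
_failed_once = set()


async def fake_sentiment(text: str) -> str:
    await asyncio.sleep(0)
    if text.startswith("flaky") and text not in _failed_once:
        _failed_once.add(text)
        raise RuntimeError("transient failure")
    if text == "broken":
        raise RuntimeError("permanent failure")
    return "positive" if "good" in text else "negative"


results = asyncio.run(
    run_bounded(["good product", "flaky good", "broken", "bad"], fake_sentiment)
)
# results == ["positive", "positive", None, "negative"]
```

The transient failure is absorbed by a retry, while the permanently failing input becomes `None` instead of aborting the whole batch, which is the property that lets one bad row coexist with a successful DataFrame operation.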
Performance Optimization
Batch Processing
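The query engine batches inference requests automatically, so express the operation over the whole DataFrame and write the results to disk rather than holding them in memory: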
```python
large_df = session.read.docs("data/large_corpus/**/*.md", recursive=True)

# Process the entire DataFrame
processed_df = large_df.select(
    col("file_path"),
    semantic.embed(col("content")).alias("embedding")
)

# Write results
processed_df.write.parquet(
    "output/embeddings.parquet",
    mode="overwrite"
)
```
Caching Expensive Operations
Materialize intermediate results:
```python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    col("doc_id"),
    semantic.embed(col("content")).alias("embeddings")
)

# Persist to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")
```
Rate Limit Management
Configure per-model limits in session config:
```python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500_000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100_000
            )
        },
        default_language_model="fast"
    )
)
```
Monitoring and Metrics
Track query performance and costs:
```python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    col("query_id"),
    col("total_lm_cost"),
    col("total_lm_requests"),
    col("end_ts")
).order_by(col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by hour; {metrics} is bound to metrics_df below
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()
```
Production Best Practices
Chunk Size Selection
Choose sizes based on model context windows:
```python
# For 8K context models
chunk_size = 400  # tokens
overlap = 10      # percent

# For 128K context models
chunk_size = 2000  # tokens
overlap = 5        # percent
```
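A quick way to sanity-check these settings is to work out how many retrieved chunks fit in the context window once the prompt and the model's answer are accounted for. The arithmetic below is a sketch: the helper names are hypothetical, and the prompt and output budgets (500, 1,000, and 4,000 tokens) as well as the exact window sizes (8,192 and 128,000 tokens) are assumed values, not figures from Fenic or any provider.

```python
def chunks_that_fit(
    context_window: int,
    chunk_size: int,
    prompt_tokens: int,
    max_output_tokens: int,
) -> int:
    """Whole retrieved chunks that fit beside the prompt and the answer."""
    available = context_window - prompt_tokens - max_output_tokens
    return max(available // chunk_size, 0)


def effective_stride(chunk_size: int, overlap_percentage: int) -> int:
    """New tokens contributed by each additional chunk after the first."""
    return chunk_size - chunk_size * overlap_percentage // 100


# 8K-context model: (8192 - 500 - 1000) // 400 = 6692 // 400 = 16 chunks
small = chunks_that_fit(8_192, 400, prompt_tokens=500, max_output_tokens=1_000)

# 128K-context model: (128000 - 500 - 4000) // 2000 = 123500 // 2000 = 61 chunks
large = chunks_that_fit(128_000, 2_000, prompt_tokens=500, max_output_tokens=4_000)

# 10% overlap on 400-token chunks advances 360 tokens per chunk
stride_small = effective_stride(400, 10)

# 5% overlap on 2000-token chunks advances 1900 tokens per chunk
stride_large = effective_stride(2_000, 5)
```

Under these assumptions the smaller model can see about 16 chunks per request, so retrieval that returns the top 10 matches leaves headroom, while returning 20 would not.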
Error Handling
Handle processing failures gracefully:
```python
from fenic.api.functions.builtin import when, lit

# Flag rows that loaded without an error; lit() wraps the literal values
processed_df = docs_df.with_column(
    "is_valid",
    when(col("error").is_null(), lit(True)).otherwise(lit(False))
)

# Separate successful and failed
success_df = processed_df.filter(col("is_valid"))
failed_df = processed_df.filter(~col("is_valid"))

# Log failures
failed_df.select(
    col("file_path"),
    col("error")
).write.csv("processing_errors.csv")
```
Incremental Processing
Avoid reprocessing existing data:
```python
# At the end of each run: save processing state
processed_df.write.save_as_table("processed_documents", mode="append")

# At the start of the next run: load the paths already processed
existing = session.table("processed_documents")
existing_paths = existing.select(col("file_path")).distinct().collect()
processed_paths = {row["file_path"] for row in existing_paths}

# Filter new documents only
new_docs = docs_df.filter(
    ~col("file_path").isin(list(processed_paths))
)
```
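Filtering by path alone skips a file even when its content has changed since the last run. One possible refinement is to store a content hash next to each path and reprocess whenever the hash differs. The sketch below shows the idea in plain Python; `content_hash` and `select_new_or_changed` are hypothetical helpers, and storing the hash as an extra column in the processed table is an assumption, not something Fenic does for you.

```python
import hashlib
from typing import Dict, List, Tuple


def content_hash(content: str) -> str:
    """Fingerprint of a document's text."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def select_new_or_changed(
    docs: List[Tuple[str, str]],
    processed: Dict[str, str],
) -> List[Tuple[str, str]]:
    """Return (file_path, content) pairs that are new or whose content changed.

    `processed` maps file_path -> content hash recorded on the previous run.
    """
    return [
        (path, content)
        for path, content in docs
        if processed.get(path) != content_hash(content)
    ]


# State recorded by a previous run
processed = {"a.md": content_hash("alpha"), "b.md": content_hash("beta")}

# Current contents: a.md unchanged, b.md edited, c.md new
docs = [("a.md", "alpha"), ("b.md", "beta v2"), ("c.md", "gamma")]

todo = select_new_or_changed(docs, processed)
# todo == [("b.md", "beta v2"), ("c.md", "gamma")]
```

Only the edited and the new file are selected, so unchanged documents never incur embedding or extraction costs again.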
Complete Integration Example
End-to-end multimodal pipeline with LangChain integration:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from fenic.api.functions import semantic, text
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="multimodal_integration",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100_000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100_000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load multimodal data
docs_df = session.read.docs("data/**/*.md", content_type="markdown", recursive=True)
transcripts_df = session.read.docs("transcripts/**/*.srt", content_type="markdown", recursive=True)

# Define extraction schema
class ContentInsight(BaseModel):
    summary: str = Field(description="Brief content summary")
    topics: List[str] = Field(description="Main topics")
    sentiment: str = Field(description="Overall sentiment")

# Process documents
processed_docs = (
    docs_df
    .select(
        fc.col("file_path"),
        text.recursive_token_chunk(
            fc.col("content"),
            chunk_size=400,
            chunk_overlap_percentage=10
        ).alias("chunks")
    )
    .explode("chunks")
    .select(
        fc.col("file_path"),
        fc.col("chunks").alias("chunk_text")
    )
    .with_column(
        "embedding",
        semantic.embed(fc.col("chunk_text"))
    )
    .with_column(
        "insights",
        semantic.extract(fc.col("chunk_text"), ContentInsight)
    )
)

# Process transcripts
parsed_transcripts = (
    transcripts_df
    .select(
        fc.col("file_path"),
        text.parse_transcript(fc.col("content"), "srt").alias("segments")
    )
    .explode("segments")
    .select(
        fc.col("file_path"),
        fc.col("segments.speaker").alias("speaker"),
        fc.col("segments.content").alias("text")
    )
    .with_column(
        "category",
        semantic.classify(
            fc.col("text"),
            ["Question", "Answer", "Action Item", "Discussion"]
        )
    )
)

# Save to catalog
processed_docs.write.save_as_table("document_index", mode="overwrite")
parsed_transcripts.write.save_as_table("transcript_index", mode="overwrite")

# Create MCP tools
session.catalog.create_tool(
    tool_name="search_content",
    tool_description="Search across documents and transcripts",
    tool_query=session.table("document_index").filter(
        fc.col("chunk_text").contains(
            fc.tool_param("query", StringType)
        )
    ),
    tool_params=[
        ToolParam(name="query", description="Search query")
    ],
    result_limit=20
)

# Deploy MCP server
tools = session.catalog.list_tools()
server = create_mcp_server(
    session=session,
    server_name="MultimodalServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Key Takeaways
Fenic provides production-ready infrastructure for multimodal data integration in LangChain applications. The framework handles text, audio, structured data, and embeddings through a unified DataFrame API with native semantic operations.
By separating batch preprocessing from real-time agent execution, teams achieve faster response times, lower costs, and better debuggability. The declarative approach enables automatic optimization across model providers while maintaining familiar DataFrame semantics.
Integration with LangChain occurs through standard data formats, the Model Context Protocol, or direct programmatic access. This flexibility lets teams structure agentic workflows based on specific requirements rather than infrastructure constraints.
For teams building multimodal AI applications, Fenic eliminates fragile glue code connecting different services and APIs. The result is more maintainable, scalable infrastructure that brings structure to probabilistic AI systems.
Explore Fenic on GitHub, review integration guides, and learn about building reliable AI pipelines.

