Data fragmentation blocks production deployment of multimodal AI systems. When processing text, images, audio, video, and structured data together, traditional architectures create isolated pipelines where each data type requires separate handling. The result is scattered preprocessing scripts, disconnected inference services, and brittle integration code that fails under production load.
The Data Fragmentation Problem
Root Causes
Multimodal AI systems handle diverse input types: PDFs requiring OCR, audio files needing transcription, images requiring vision analysis, and structured databases. Legacy data infrastructure treats each modality as a separate problem.
This separation creates three failure modes:
Architectural silos: OCR pipelines run independently from transcription services. Vector databases operate separately from data warehouses. Each component has distinct APIs, authentication, rate limits, and error handling.
Data movement overhead: Moving data between specialized systems consumes latency and compute. Serialization and deserialization at each boundary adds processing time and creates opportunities for data loss.
Context loss: Processing fragments across disconnected systems breaks semantic relationships between modalities. A product mentioned in meeting transcripts and support tickets appears in separate systems with no connection.
Impact on Development and Operations
Teams report spending 80% of development time managing infrastructure rather than building features. Simple changes require updates across multiple disconnected systems. Testing becomes difficult when numerous external dependencies create unpredictable failures.
Production deployments face cascading failures when rate limits trigger. Model API changes break workflows. Debugging requires tracing through dozens of custom scripts with limited visibility.
Cost optimization suffers from a lack of unified visibility: duplicate API calls waste resources through poor caching, expensive models run where cheaper alternatives would suffice, and teams lack metrics showing which operations drive spending.
Inference-First Architecture
Treating Inference as Native Data Operations
Fenic rebuilds the query engine with inference awareness built in. Rather than bolting LLM calls onto a system designed for deterministic operations via user-defined functions, it makes semantic operations native DataFrame primitives.
This shift lets the query engine know exactly when inference happens (a short sketch follows the list below):
- Intelligent batching groups API calls for maximum throughput while respecting provider limits
- Smart caching reuses inference results across pipeline stages without manual management
- Cost optimization routes simple tasks to smaller, cheaper models while preserving accuracy
- Operation reordering minimizes expensive inference calls
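As a minimal sketch of how these behaviors surface in user code, the snippet below assumes an existing DataFrame `df` with a text column and the "fast" model alias from the session configuration shown later in this article. Nothing executes until collect(), which is what gives the engine room to batch, cache, and reorder the inference calls:

```python
import fenic as fc

# Nothing below triggers an API call yet; fenic only builds a logical plan.
classified = (
    df
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            classes=["support", "sales", "technical"],
            model_alias="fast",  # route this simple task to the cheaper model
        ),
    )
    .cache()  # inference results are reused by later stages instead of re-calling the API
)

# Execution happens here, after the engine has planned and batched the inference work.
results = classified.filter(fc.col("category") == "support").collect()
```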
DataFrames for Multimodal Processing
The DataFrame abstraction brings structure to probabilistic systems. Even with stochastic inference across OCR, transcription, and LLM operations, DataFrames provide:
Lineage tracking: Every column and row maintains traceable origins, even from model outputs. This enables debugging by tracing individual records through transformations.
Columnar consistency: Summaries, embeddings, and scores maintain structured representations throughout the pipeline. This simplifies downstream processing.
Deterministic transformations: Wrapping inference in the pattern model + prompt + input → output enables caching, versioning, and debugging. The framework treats probabilistic operations as predictable components.
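As a rough illustration of the model + prompt + input → output pattern, the sketch below pins a named model alias, a prompt template, and an input column for a single transformation. The column name and the "fast" alias are hypothetical, and the semantic.map call follows the template-plus-keyword-column style used by semantic.predicate elsewhere in this article:

```python
# model + prompt + input -> output: the alias fixes the model, the template fixes the
# prompt, and the column fixes the input, so the same call is cacheable and versionable.
summarized = df.with_column(
    "summary",
    fc.semantic.map(
        "Summarize this support ticket in one sentence: {{ticket_text}}",
        ticket_text=fc.col("ticket_text"),  # hypothetical input column
        model_alias="fast",                 # assumes a "fast" alias in SemanticConfig
    ),
)
```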
Unified Multimodal Processing Implementation
AI-Native Data Types
Fenic's type system eliminates preprocessing steps with first-class column types for multimodal content:
- MarkdownType: Native markdown parsing with structure extraction as a DataFrame operation
- TranscriptType: Handles SRT, WebVTT, and generic formats with speaker awareness and timestamps
- JsonType: Manipulates nested JSON using JQ expressions within DataFrame operations
- EmbeddingType: Fixed-length vectors with similarity operations as native functions
- DocumentPathType: Loads PDFs, documents, and text files from local or remote sources
These types integrate with semantic operators, enabling sophisticated pipelines through a unified interface.
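A minimal sketch of how this looks in practice, assuming `raw_markdown` and `raw_transcript` are existing string columns and that MarkdownType can be cast the same way TranscriptType is cast in the pipeline below:

```python
typed = (
    df
    # Cast plain strings into AI-native types so type-aware functions can operate on them.
    .with_column("doc_md", fc.col("raw_markdown").cast(fc.MarkdownType))
    .with_column("call_transcript", fc.col("raw_transcript").cast(fc.TranscriptType))
    # Structure extraction becomes an ordinary DataFrame operation.
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks(fc.col("doc_md"), header_level=2),
    )
)
```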
Processing Multiple Modalities in a Single Pipeline
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import List, Literal

class SupportAnalysis(BaseModel):
    issue_category: Literal["bug", "feature_request", "question"]
    sentiment: Literal["frustrated", "neutral", "satisfied"]
    priority_indicators: List[str]
    technical_context: str

# Configure session with multiple providers
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=200,
                input_tpm=100_000,
                output_tpm=50_000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)

# Process multiple modalities in a unified pipeline
results = (
    session.read.docs(
        "/data/support_tickets/",
        content_type="markdown",
        recursive=True
    )
    .with_column("transcript", fc.col("file_path").cast(fc.TranscriptType))
    .with_column("markdown_content", fc.col("content"))
    .select(
        "*",
        fc.semantic.extract(
            fc.col("transcript"),
            SupportAnalysis,
            model_alias="accurate"
        ).alias("analysis")
    )
    .filter(fc.col("analysis.priority_indicators").size() > 0)
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("analysis.technical_context"))
    )
)

results.show()
```
This pipeline processes transcripts, markdown documents, and structured data in a single set of DataFrame operations. The framework handles rate limiting, batching, and error recovery automatically.
Semantic Operations for Content Processing
Schema-Driven Extraction
The semantic.extract operator transforms unstructured text into structured data using Pydantic schemas:
```python
from pydantic import BaseModel
from typing import Literal, List

class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: List[str]
    recommendations: List[str]

# Extract structure from unstructured documents
policies = (
    df.select(
        "*",
        fc.semantic.extract(
            fc.col("policy_text"),
            PolicyInsight
        ).alias("insights")
    )
    .filter(fc.col("insights.risk_level") == "critical")
)
```
The schema serves as both documentation and validation.
Semantic Joins Across Modalities
Traditional joins require exact key matches. Semantic joins enable matching based on meaning:
```python
# Join candidates with jobs based on semantic fit
prompt = """
Is this candidate a good fit for the job?

Candidate Background: {{left_on}}
Job Requirements: {{right_on}}

Consider:
- Technical skill alignment
- Experience level match
- Domain expertise
"""

matched = (
    applicants
    .semantic.join(
        other=jobs,
        predicate=prompt,
        left_on=fc.col("resume"),
        right_on=fc.col("job_description")
    )
    .order_by(fc.col("application_date"))
    .limit(10)
)
```
This captures nuanced matches that keyword-based systems miss, while preserving familiar DataFrame join semantics.
Multi-Provider Model Management
Configure multiple model providers declaratively, routing operations based on accuracy requirements and cost:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "standard": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=200,
                input_tpm=100_000,
                output_tpm=50_000
            ),
            "power": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="standard"
    )
)

# Route operations to appropriate models
df.select(
    "*",
    # Fast, cheap model for simple classification
    fc.semantic.classify(
        fc.col("text"),
        classes=["support", "sales", "technical"],
        model_alias="nano"
    ).alias("category"),
    # Powerful model for extraction
    fc.semantic.extract(
        fc.col("complex_doc"),
        DetailedSchema,
        model_alias="power"
    ).alias("extracted")
)
```
The framework handles provider-specific rate limits, retry logic, and error patterns while exposing consistent interfaces.
Batch Processing and Parallel Operations
Async UDFs for Concurrent I/O
Fenic 0.4.0's async UDF support enables parallel API calls and database lookups across rows while maintaining type safety:
```python
import fenic as fc
from fenic.core.types import IntegerType
import aiohttp

@fc.async_udf(
    return_type=IntegerType,
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def enrich_customer_data(customer_id: int) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example.com/customer/{customer_id}"
        ) as resp:
            data = await resp.json()
            return data["lifetime_value"]

# Apply parallel enrichment
enriched = df.select(
    fc.col("customer_id"),
    enrich_customer_data(fc.col("customer_id")).alias("ltv")
)
```
This maximizes throughput on I/O-bound workloads while preserving DataFrame semantics. Configurable concurrency, automatic retries, and timeouts ensure production-safe execution.
Directory Content Loading
Process entire directories of multimodal content in single operations:
```python
# Load all documents from directory
docs = session.read.docs(
    "/data/product_docs/",
    content_type="markdown",
    recursive=True
)

# Process all documents in batch
processed = (
    docs
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks(
            fc.col("content"),
            header_level=2
        )
    )
    .explode("sections")
    .select(
        "*",
        fc.semantic.extract(
            fc.col("sections.content"),
            DocumentationSchema
        ).alias("structured")
    )
)
```
The framework handles file loading and parallelization automatically.
Production Reliability
Automatic Optimization
Fenic's lazy evaluation enables query optimization across entire pipelines:
```python
# Define pipeline
pipeline = (
    df
    .filter(fc.col("priority") == "high")
    .select(
        "*",
        fc.semantic.extract(fc.col("content"), TicketSchema).alias("ticket")
    )
    .filter(fc.semantic.predicate(
        "The sentiment {{sentiment}} indicates frustration",
        sentiment=fc.col("ticket.sentiment")
    ))
    .semantic.join(
        other=knowledge_base,
        predicate="Issue {{left_on}} matches solution {{right_on}}",
        left_on=fc.col("ticket.issue"),
        right_on=fc.col("solution_text")
    )
)

# Trigger optimized execution
results = pipeline.collect()
```
The query planner reorders operations to minimize expensive inference calls, batches similar operations, and applies caching automatically.
Cost and Performance Tracking
Built-in metrics provide visibility into inference costs:
```python
# Query local metrics
metrics = session.table("fenic_system.query_metrics")

cost_analysis = (
    metrics
    .select(
        "query_id",
        "model",
        "latency_ms",
        "cost_usd",
        "num_input_tokens",
        "num_output_tokens"
    )
    .order_by(fc.col("cost_usd").desc())
)

cost_analysis.show()
```
This enables teams to identify optimization opportunities and track spending.
Error Handling
Production pipelines require robust error handling. The framework provides built-in retry logic, rate limiting, and error reporting:
```python
# Access execution metrics
result = pipeline.collect()

# Access metrics from the QueryResult
print(f"Execution time: {result.metrics.execution_time}ms")
print(f"Rows processed: {result.metrics.rows_processed}")

# Note: detailed token-level metrics are available in the session's query_metrics table
metrics_df = session.table("fenic_system.query_metrics")
```
The framework handles rate limits and transient API failures with comprehensive logging.
Production Results
RudderStack Case Study
RudderStack's implementation achieved production-scale results. The warehouse-native customer data platform (CDP) processes unstructured inputs from sales calls, support tickets, and documentation.
Results:
- 95% reduction in PM time per triage task
- 90% first-pass category acceptance rate
- Automatic citation of prospect signals and feedback
- Semantic linking between issues and PRDs for explainable decisions
The system ingests support ticket threads, sales call transcripts, product documentation, and Notion PRDs into unified warehouse context. Semantic indexing creates connections across data sources, enabling the triage agent to classify requests, surface related work, and propose decisions with rationale.
Enterprise Analytics Results
Product managers previously spent weeks manually processing data for basic queries. With unified multimodal processing, they query and analyze diverse datasets using LLM categorizations, achieving 100x time savings.
Insurance Deployment
Insurance companies deploying semantic extraction pipelines across thousands of policies and transcripts report completion in days rather than months. The unified approach reduces errors from human analysis, cuts costs, and lowers operational risk.
Implementation Best Practices
Define Schemas Once
Use Pydantic models as single sources of truth:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class SupportTicket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    issue_type: Literal["bug", "feature", "question"]
    priority: int = Field(ge=1, le=5)
    technical_details: List[str]

# Reuse schema across pipeline
extracted = df.select(
    "*",
    fc.semantic.extract(fc.col("raw_text"), SupportTicket).alias("ticket")
)

validated = extracted.filter(fc.col("ticket.priority") >= 3)
```
This eliminates prompt brittleness and manual validation.
Use Lazy Evaluation
Build complete pipelines before execution:
```python
# Define multi-stage pipeline
pipeline = (
    df
    .filter(initial_conditions)
    .select(
        "*",
        fc.semantic.extract(fc.col("content"), Schema).alias("extracted")
    )
    .join(reference_data, on="key_column")
    .with_column(
        "category",
        fc.semantic.classify(fc.col("extracted.field"), classes=["A", "B", "C"])
    )
    .cache()
)

# Execute when ready
results = pipeline.collect()
```
The optimizer works across the entire pipeline.
Route to Appropriate Models
Configure model tiers and route based on task requirements:
```python
language_models = {
    "nano": fc.OpenAILanguageModel(
        model_name="gpt-4o-mini",
        rpm=500,
        tpm=200_000
    ),
    "standard": fc.AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=200,
        input_tpm=100_000,
        output_tpm=50_000
    ),
    "power": fc.OpenAILanguageModel(
        model_name="gpt-4o",
        rpm=100,
        tpm=100_000
    )
}

# Route based on complexity
df.select(
    "*",
    fc.semantic.classify(
        fc.col("text"),
        classes=["A", "B", "C"],
        model_alias="nano"
    ).alias("category"),
    fc.semantic.extract(
        fc.col("document"),
        ComplexSchema,
        model_alias="power"
    ).alias("extracted")
)
```
This balances accuracy with cost.
Scaling to Production
Local Development
Develop and test pipelines locally with full engine capability:
```python
# Local development
session = fc.Session.get_or_create()

df = session.read.csv("sample_data.csv")

processed = df.select(
    "*",
    fc.semantic.extract(fc.col("text"), Schema).alias("extracted")
)

processed.write.parquet("results.parquet")
```
Cloud Deployment
Same code scales to cloud execution:
```python
# Production configuration
config = fc.SessionConfig(
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)
session = fc.Session.get_or_create(config)

# Same code, cloud-scale execution
df = session.read.csv("s3://bucket/production_data/*.csv")

processed = df.select(
    "*",
    fc.semantic.extract(fc.col("text"), Schema).alias("extracted")
)

processed.write.parquet("s3://bucket/results/output.parquet")
```
Zero code changes between local testing and production.
Getting Started
Installation
```bash
pip install fenic
```
Basic Configuration
```python
import fenic as fc
from pydantic import BaseModel

# Configure providers
config = fc.SessionConfig(
    app_name="multimodal_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        }
    )
)

session = fc.Session.get_or_create(config)
```
First Unified Pipeline
```python
from pydantic import BaseModel
from typing import List

from fenic.core.types import StringType

class ContentInsights(BaseModel):
    summary: str
    key_topics: List[str]
    sentiment: str

# Load data
df = session.read.csv("feedback.csv")

# Process through unified pipeline
insights = (
    df
    .select(
        "*",
        fc.semantic.extract(
            fc.col("feedback"),
            ContentInsights
        ).alias("insights")
    )
    .with_column(
        "topic_embedding",
        fc.semantic.embed(
            fc.col("insights.key_topics").cast(StringType)
        )
    )
    .semantic.with_cluster_labels(
        by=fc.col("topic_embedding"),
        num_clusters=5,
        label_column="cluster"
    )
    .group_by("cluster")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("feedback")
        )
    )
)

insights.show()
```
Related Resources
- Fenic GitHub Repository
- Fenic Open Source Announcement
- Fenic 0.4.0 Release Notes
- Building Reliable AI Pipelines Guide
- Eliminating Fragile Glue Code
- Typedef Platform
Summary
Data fragmentation in multimodal AI systems stems from architectural mismatches between traditional data infrastructure and inference-first workloads. Treating semantic operations as first-class DataFrame primitives eliminates brittle glue code while maintaining familiar development patterns.
The inference-first architecture provides automatic optimization, intelligent batching, and unified processing across text, images, audio, and structured data. Specialized data types handle multimodal content natively. Production features including automatic error handling, cost tracking, and lineage support ensure reliable deployment.
Organizations report 95% time reductions in workflows, 100x productivity gains on data analysis, and the ability to ship AI pipelines in days instead of months. Unifying fragmented preprocessing across a single DataFrame abstraction enables teams to build deterministic workflows on probabilistic models at production scale.

