Much of the most valuable data for AI systems lives in PDFs, audio recordings, and video files. Yet the path from raw documents and recordings to production AI applications remains a technical nightmare. Independent reports suggest that roughly 95% of enterprise GenAI pilots fail to reach measurable production impact, and preprocessing infrastructure is one of the leading culprits.
Working with OCR models, transcription services, and document parsers means managing brittle microservices, handling multiple failure modes, coordinating rate limits, and writing extensive glue code just to feed clean data to AI systems. This guide shows how to build production-grade preprocessing pipelines using Fenic, the inference-first DataFrame framework from Typedef.ai.
The Preprocessing Bottleneck in AI Systems
Why Traditional Approaches Fail
Traditional data infrastructure was built for rows and columns, not the messy, multi-modal inputs that modern AI systems require. Teams building AI applications face several critical challenges:
Extraction complexity: Converting PDFs to text requires OCR models, each introducing latency, error rates, and operational overhead. Audio files need transcription services with speaker diarization. The preprocessing maze grows with each data type.
Format fragmentation: Transcripts arrive in SRT, WebVTT, and generic formats. Each requires custom parsing logic. PDFs contain tables, images, and complex layouts that generic text extraction misses.
Pipeline brittleness: Custom scripts connect OCR services, transcription APIs, LLM providers, and data warehouses. Each connection point adds failure modes, serialization overhead, and maintenance burden.
Rate limit management: Multiple model providers have different rate limits. Manual throttling code scatters throughout pipelines. One provider timeout cascades through the entire system.
Context handling: Documents exceed context windows. Teams write chunking logic that splits at arbitrary character counts, losing semantic coherence. Overlapping chunks create redundant processing costs.
The Cost of Fragile Glue Code
The typical preprocessing pipeline looks like this:
```python
# Scattered custom code across multiple files
def process_documents(pdfs):
    texts = []
    for pdf in pdfs:
        text = ocr_service.extract(pdf)  # Manual OCR
        chunks = custom_chunk_function(text, max_tokens=1000)  # Manual chunking
        for chunk in chunks:
            time.sleep(0.5)  # Manual rate limiting
            summary = llm_api.summarize(chunk)  # Manual API calls
            texts.append(summary)
    return combine_summaries(texts)  # Manual aggregation
```
This approach creates:
- Development velocity collapse: 80% of time spent on infrastructure, 20% on features
- Production failures at scale: Rate limit errors cascade through pipelines
- Debugging nightmares: Tracing failures through dozens of custom scripts
- Cost explosion: Duplicate API calls from poor caching strategies
When RudderStack piloted Typedef's preprocessing approach, they cut triage time by 95% by replacing fragile glue code with structured pipelines.
Structured Approaches to Document Preprocessing
PDF Processing with Native Support
Fenic treats PDF processing as a first-class operation rather than an external service. The framework provides semantic.parse_pdf() for converting PDF files to structured markdown with intelligent handling of images, tables, and page boundaries.
```python
import fenic as fc

# Configure session with model provider
config = fc.SessionConfig(
    app_name="document_processor",
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

# Load PDF metadata
pdf_metadata = session.read.pdf_metadata("data/docs/**/*.pdf", recursive=True)

# Parse PDFs into markdown with page separators
pdf_markdown = pdf_metadata.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("markdown_content")
)

# Access structured content
pdf_markdown.show()
```
Key capabilities:
- Automatic page separation with customizable separators
- Image description for tables and charts
- Native integration with Google File API for Gemini models
- Batch processing of entire directories with glob patterns
The pdf_metadata() reader extracts comprehensive metadata before processing:
- File path, size, and per-file error information
- Title, author, creation and modification dates
- Page count, image count
- Form fields and signature detection
- Encryption status
This metadata enables intelligent preprocessing decisions before expensive model calls.
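For example, metadata can gate which files are sent to the parser at all. A minimal sketch, assuming metadata columns along the lines of page_count and is_encrypted (the exact column names depend on the reader's schema):

```python
# Hypothetical gating step: skip encrypted files and very long documents
# before calling the expensive PDF parser. The "page_count" and
# "is_encrypted" column names are assumptions about the metadata schema.
parseable = pdf_metadata.filter(
    (fc.col("page_count") <= 50) & (fc.col("is_encrypted") == False)
)

parsed = parseable.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---"
    ).alias("markdown_content")
)
```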
Transcript Processing with Format Awareness
Transcription output varies widely across services. Fenic's parse_transcript() function normalizes SRT, WebVTT, and generic conversation formats into a unified schema.
```python
import fenic as fc
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("segment")
).select(
    fc.col("file_path"),
    fc.col("segment.index").alias("index"),
    fc.col("segment.speaker").alias("speaker"),
    fc.col("segment.start_time").alias("start_time"),
    fc.col("segment.end_time").alias("end_time"),
    fc.col("segment.content").alias("text")
)
```
Unified schema output:
- index: Entry number (1-based)
- speaker: Speaker identifier when available
- start_time: Start timestamp in seconds
- end_time: End timestamp in seconds
- duration: Segment duration
- content: Transcript text
- format: Original format type
This normalization eliminates format-specific parsing code throughout your pipeline. All downstream operations work with consistent structured data regardless of input format.
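As an illustration, switching input formats only changes the format argument; assuming the WebVTT identifier is "webvtt", everything downstream stays the same:

```python
# Same pipeline, different source format. The "webvtt" identifier is an
# assumption; check the parse_transcript documentation for the exact value.
vtt_parsed = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "webvtt").alias("segments")
)

# The explode/select logic is identical to the SRT example above,
# because both formats normalize to the same segment schema.
```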
Semantic Extraction for Structured Data
Schema-Driven Extraction with Type Safety
The most powerful preprocessing technique is structured extraction: converting unstructured text into validated, type-safe data structures. This eliminates prompt engineering brittleness where AI systems hallucinate fields or formats.
Fenic integrates Pydantic schemas directly into the extraction process:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class ActionItem(BaseModel):
    task: str = Field(description="The action to be taken")
    owner: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date if mentioned")
    priority: Literal["urgent", "high", "medium", "low"]

# Extract structured action items from meeting transcripts
action_items_df = segments_df.select(
    fc.col("file_path"),
    fc.col("text"),
    fc.semantic.extract(
        fc.col("text"),
        ActionItem
    ).alias("action_item")
).filter(
    fc.col("action_item").is_not_null()
)
```
Benefits of schema-driven extraction:
- Validation happens at preprocessing time, not during runtime when failures are costly
- Type safety means downstream systems can rely on data structures without defensive checks
- Lineage tracking connects extracted entities back to source documents
- Debugging becomes tractable when extraction errors occur
The Matic Insurance case study demonstrates this impact: semantic extraction pipelines covering thousands of policies and transcripts were built in days rather than months, dramatically reducing human analysis errors.
Handling Nested and Complex Schemas
Real-world documents contain nested structures. Fenic handles hierarchical extraction:
```python
class Issue(BaseModel):
    description: str
    severity: Literal["critical", "high", "medium", "low"]
    affected_systems: List[str]

class TicketData(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: List[Issue]

tickets = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("raw_ticket"), TicketData)
).unnest("extracted").explode("issues")

# Filter for critical bugs in APAC region
critical_bugs = tickets.filter(
    (fc.col("region") == "apac") &
    (fc.col("issues.severity") == "critical")
)
```
The unnest() operation flattens nested structures while the explode() operation converts arrays into rows, enabling standard DataFrame operations on hierarchical data.
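Because the flattened output is an ordinary DataFrame, standard aggregations apply directly. A short sketch building on the tickets example above:

```python
# Promote the nested severity field to a top-level column, then aggregate
# issue counts per region using ordinary group_by/agg operations.
issue_counts = (
    tickets
    .with_column("severity", fc.col("issues.severity"))
    .group_by("region", "severity")
    .agg(fc.count("*").alias("issue_count"))
)

issue_counts.show()
```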
Managing Rate Limits and Multi-Provider Orchestration
Declarative Model Configuration
Production systems require reliability across multiple LLM providers with different rate limits, pricing models, and capabilities. Fenic's session configuration handles multi-provider orchestration with built-in throttling:
```python
config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=500,
                tpm=200000
            ),
            "accurate": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100000
            ),
            "cheap": fc.AnthropicLanguageModel(
                model_name="claude-haiku-4-0",
                rpm=300,
                input_tpm=150000,
                output_tpm=75000
            )
        },
        default_language_model="fast"
    )
)
```
Model selection strategy:
- Use "fast" for bulk classification tasks
- Use "accurate" for critical extraction where errors are costly
- Use "cheap" for simple transformations
Fenic handles rate limiting, token counting, and automatic throttling across all providers. The framework self-throttles based on provider limits and adjusts request rates dynamically.
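In practice, each operation picks its model through the model_alias argument, so a single pipeline can mix tiers. A short sketch using the aliases configured above (DocumentSchema and the class labels are illustrative placeholders):

```python
# Route each semantic operation to the model tier that fits its cost profile.
routed = (
    df
    # Bulk classification runs on the high-throughput "fast" model
    .with_column(
        "doc_type",
        fc.semantic.classify(
            fc.col("content"),
            ["contract", "invoice", "correspondence"],
            model_alias="fast"
        )
    )
    # High-stakes extraction runs on the "accurate" model
    .with_column(
        "extracted",
        fc.semantic.extract(
            fc.col("content"),
            DocumentSchema,  # placeholder Pydantic schema
            model_alias="accurate"
        )
    )
)
```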
Automatic Batching and Optimization
The query engine optimizes entire pipelines before execution:
```python
pipeline = (
    df
    .filter(fc.col("priority") == "high")
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            TicketSchema,
            model_alias="fast"
        ).alias("ticket_info")
    )
    .filter(
        fc.semantic.predicate(
            "The sentiment {{ sentiment }} is frustrated",
            sentiment=fc.col("ticket_info.sentiment")
        )
    )
)

# Trigger optimized execution
result = pipeline.collect()
```
Optimization capabilities:
- Automatic request batching for maximum throughput
- Intelligent caching of repeated inference patterns
- Cost optimization by identifying opportunities for smaller models
- Operation reordering to minimize expensive operations
- Accurate cost estimates before execution
Handling Context Windows and Document Chunking
Intelligent Chunking with Structure Awareness
Generic text splitting at arbitrary character counts loses semantic coherence. Fenic respects document structure through specialized data types.
MarkdownType treats Markdown as a first-class data type with native parsing:
```python
# Parse markdown and extract chunks based on headers
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(
            fc.col("raw_blog"),
            header_level=2
        )
    )
    .explode("chunks")
)
```
This approach chunks documents at natural boundaries (sections, paragraphs, headings) while maintaining configurable token limits and overlap.
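Where header-level chunks are still too large, they can be split further by token count. A hedged sketch; the recursive_token_chunk helper and its parameters are assumptions about the chunking API rather than a confirmed signature:

```python
# Hypothetical refinement step: split oversized header chunks by token count.
# The helper name and parameter names below are assumptions, not confirmed API.
from fenic.api.functions import text

df = df.with_column(
    "sub_chunks",
    text.recursive_token_chunk(
        fc.col("chunks"),
        chunk_size=800,               # target tokens per chunk (assumed parameter)
        chunk_overlap_percentage=10   # overlap between chunks (assumed parameter)
    )
).explode("sub_chunks")
```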
Transcript Temporal Chunking
For transcripts, chunking by time windows preserves conversation flow:
```python
# Add 5-minute time buckets
windowed_df = segments_df.with_column(
    "time_bucket",
    (fc.col("start_time") / 300).cast("integer") * 300
)

# Aggregate segments within each window
temporal_chunks = windowed_df.group_by("file_path", "time_bucket").agg(
    fc.semantic.reduce(
        "Summarize this conversation segment",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)
```
This creates time-based segments that maintain conversational context while keeping content within model context windows.
Production Patterns for Preprocessing Pipelines
Batch Preprocessing with Persistent Results
Separate heavy preprocessing from real-time operations:
```python
# Preprocessing stage: run offline
preprocessed = (
    raw_documents
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("content"),
            DocumentMetadata
        )
    )
    .with_column(
        "document_type",
        fc.semantic.classify(
            fc.col("content"),
            classes=["policy", "claim", "correspondence", "legal"],
            model_alias="fast"
        )
    )
    .persist()
)

# Write preprocessed data
preprocessed.write.save_as_table("preprocessed_documents")

# Runtime: fast queries on preprocessed data
relevant_docs = (
    session.table("preprocessed_documents")
    .filter(fc.col("document_type") == "claim")
)
```
This architectural pattern delivers:
- Predictable runtime performance
- Better resource utilization through batched API calls
- Clear separation between preprocessing and application logic
- Preprocessing that can be tested independently from application code (a minimal test sketch follows this list)
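Because the preprocessing stage is just a DataFrame program, it can be exercised against a small fixture before touching production data. A minimal sketch; the fixture path and cost threshold are illustrative:

```python
# Illustrative smoke test for the preprocessing stage, run against a small
# fixture file rather than production data.
def test_preprocessing_smoke(session):
    sample = session.read.csv("tests/fixtures/sample_documents.csv")  # hypothetical fixture

    result = (
        sample
        .with_column(
            "document_type",
            fc.semantic.classify(
                fc.col("content"),
                ["policy", "claim", "correspondence", "legal"],
                model_alias="fast"
            )
        )
        .collect()
    )

    # Metrics come back with the collected result (see the metrics section below),
    # so the test can also guard against runaway cost on the fixture.
    assert result.metrics.lm_metrics.total_cost < 0.05
```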
Error Handling and Resilience
Production pipelines require robust error handling:
```python
# Add error columns for graceful degradation
safe_df = segments_df.with_column(
    "classification",
    fc.when(
        fc.col("text").is_not_null() & (fc.length(fc.col("text")) > 0),
        fc.semantic.classify(
            fc.col("text"),
            ["Type A", "Type B"],
            model_alias="fast"
        )
    ).otherwise(fc.lit("unclassified"))
)

# Track failures separately
failed_df = segments_df.filter(
    fc.col("text").is_null()
).select(
    fc.col("file_path"),
    fc.lit("Null text content").alias("error")
)

failed_df.write.csv("processing_errors.csv")
```
Fenic includes built-in retry logic and exponential backoff for transient failures. The framework handles rate limits and API errors gracefully without crashing entire pipelines.
Semantic Operations for Enhanced Preprocessing
Classification with Few-Shot Examples
Categorize preprocessed content using semantic classification:
```python
from fenic.api.functions import semantic

# Classify support call segments
classified_df = segments_df.with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical Issue", "Billing Question", "Feature Request", "General Inquiry"]
    )
)

# Aggregate by classification
category_summary = classified_df.group_by("category").agg(
    fc.count("*").alias("count"),
    fc.avg(fc.col("end_time") - fc.col("start_time")).alias("avg_duration")
)
```
Sentiment Analysis for Conversational Data
Track sentiment across transcripts:
```python
# Add sentiment scores
sentiment_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by speaker
speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)
```
Semantic Filtering with Natural Language Predicates
Filter preprocessed data using natural language:
```python
# Find segments discussing specific topics
relevant_df = segments_df.filter(
    semantic.predicate(
        "This text discusses product features or roadmap planning: {{ text }}",
        text=fc.col("text")
    )
)
```
This eliminates brittle keyword matching and handles variations in language naturally.
Lineage Tracking and Debugging
Row-Level Lineage
When preprocessing produces unexpected results, lineage tracking enables precise debugging:
```python
# Build preprocessing pipeline
processed_df = (
    df
    .with_column("parsed", fc.text.parse_transcript(fc.col("content"), "srt"))
    .with_column("extracted", fc.semantic.extract(fc.col("parsed"), Schema))
    .filter(fc.semantic.predicate("Filter condition", text=fc.col("extracted")))
)

# Access lineage information
lineage = processed_df.lineage()

# Trace backwards from problematic result rows
source_rows = lineage.backward(["result_uuid1", "result_uuid2"])

# Trace forwards from source rows
result_rows = lineage.forward(["source_uuid1"])
```
Lineage capabilities:
- Trace every output back through transformations and prompts
- Identify which source document provided context
- See which model generated each classification
- Access token costs and timing for each operation
Query Metrics and Cost Tracking
Monitor preprocessing pipeline performance:
```python
result = pipeline.collect()

# Access comprehensive metrics
print(f"Query duration: {result.metrics.query_duration_ms}ms")
print(f"Total tokens: {result.metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${result.metrics.lm_metrics.total_cost}")

# Operator-level metrics
for op_metric in result.metrics.operator_metrics:
    print(f"Operator: {op_metric.operator_name}")
    print(f"Duration: {op_metric.duration_ms}ms")
```
This observability transforms preprocessing development from trial-and-error to data-driven optimization.
Complete Preprocessing Pipeline Example
Here's a production-ready pipeline combining all techniques:
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import List, Literal

# Define extraction schema
class MeetingInsight(BaseModel):
    action_items: List[str]
    decisions: List[str]
    discussion_topics: List[str]
    sentiment: Literal["positive", "neutral", "negative"]

# Configure session
config = fc.SessionConfig(
    app_name="meeting_processor",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

# Load and parse transcripts
raw_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

# Parse transcripts
parsed_df = raw_df.select(
    fc.col("file_path"),
    fc.text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.col("file_path"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.start_time").alias("start_time"),
    fc.col("seg.content").alias("text")
)

# Enrich with semantic analysis
enriched_df = segments_df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("text"),
        ["Technical", "Business", "Administrative", "Other"]
    )
)

# Generate summaries per file
summaries_df = enriched_df.group_by("file_path").agg(
    fc.count("*").alias("segment_count"),
    fc.semantic.reduce(
        "Create a concise summary of this conversation",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

# Extract structured insights
insights_df = summaries_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("summary"),
        MeetingInsight
    ).alias("insights")
)

# Save results
insights_df.write.save_as_table("processed_meetings", mode="overwrite")

# Access metrics
result = insights_df.collect()
print(f"Processed {len(result)} meetings")
print(f"Total cost: ${result.metrics.lm_metrics.total_cost}")
```
Moving from Prototype to Production
Local Development to Cloud Deployment
Fenic enables seamless scaling from laptop to production:
```python
# Local development
local_session = fc.Session.get_or_create(fc.SessionConfig(
    app_name="dev_pipeline"
))

df = local_session.read.csv("local_data.csv")
processed = df.select(
    fc.semantic.extract(fc.col("text"), Schema).alias("extracted")
)

# Same code, cloud execution
cloud_config = fc.SessionConfig(
    app_name="production_pipeline",
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)
cloud_session = fc.Session.get_or_create(cloud_config)

df = cloud_session.read.csv("s3://bucket/data/*.csv")
processed = df.select(
    fc.semantic.extract(fc.col("text"), Schema).alias("extracted")
)
```
Zero code changes required for production deployment. The framework handles infrastructure scaling automatically.
Monitoring and Observability
Track preprocessing pipeline health in production:
```python
# Access system metrics table
metrics = session.table("fenic_system.query_metrics")

# Analyze costs by operation
metrics.group_by("operation").agg(
    fc.count("*").alias("call_count"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
).order_by(fc.col("total_cost").desc()).show()
```
This visibility enables teams to identify bottlenecks, optimize costs, and ensure preprocessing pipelines remain healthy under load.
Key Takeaways
Preprocessing challenges in OCR and transcription stem from fragile glue code, format fragmentation, and operational complexity. The shift to structured, inference-first pipelines changes what's possible with AI systems.
Critical patterns for production preprocessing:
- Use schema-driven extraction with Pydantic models for type-safe, validated outputs
- Treat semantic operations as first-class DataFrame operations, not external API calls
- Separate batch preprocessing from real-time operations for predictable performance
- Leverage specialized data types (MarkdownType, TranscriptType) for structure-aware processing
- Configure multi-provider orchestration declaratively with automatic rate limiting
- Implement row-level lineage for debugging and optimization
- Cache expensive operations and measure costs at every pipeline stage
The Fenic framework provides the foundation: inference-first architecture, native unstructured data types, automatic optimization, and production-ready reliability features. Companies using this approach report building pipelines in days rather than months, with time reductions as high as 95% on tasks like support triage.
For teams building AI applications at scale, the preprocessing layer isn't just infrastructure—it determines whether systems deliver consistent value or remain trapped in pilot paralysis. Typedef transforms preprocessing from operational overhead to strategic advantage.
Additional Resources
- Fenic GitHub Repository
- Typedef Open Source Announcement
- Building Reliable AI Pipelines
- Eliminating Fragile Glue Code
- Transcript Processing Guide
- Latest Release Notes

