Production AI systems process PDFs, transcripts, markdown documents, and embeddings. Traditional data platforms treat these as generic text or binary blobs, forcing developers to build custom preprocessing with external services.
The multimodal AI market reached $2.36 billion in 2024 and is projected to reach $93.99 billion by 2035. Most data infrastructure wasn't built for this scale.
Fenic from Typedef provides native multimodal data types with specialized operations. Instead of managing embeddings through an external vector database or treating markdown as generic strings, developers work with types that integrate directly into DataFrame operations.
Native Multimodal Data Types in Fenic
Fenic supports five specialized data types for AI workloads:
- MarkdownType - Parse and extract structure from markdown documents
- TranscriptType - Process SRT, WebVTT formats with speaker and timestamp awareness
- JsonType - Manipulate nested JSON using JQ expressions
- HtmlType - Handle raw HTML markup
- EmbeddingType - Fixed-length vector operations with similarity functions
These aren't metadata tags. They unlock type-specific operations within the DataFrame API.
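As a minimal sketch using operations covered later in this article (the column names "body" and "meta" are hypothetical), casting a column to MarkdownType or JsonType exposes that type's function namespace directly in the pipeline:

```python
import fenic as fc

typed = (
    df
    # Cast generic string columns to specialized types
    .with_column("body_md", fc.col("body").cast(fc.MarkdownType))
    .with_column("meta_json", fc.col("meta").cast(fc.JsonType))
    # Markdown-aware chunking on the MarkdownType column
    .with_column("sections", fc.markdown.extract_header_chunks("body_md", header_level=2))
    # JQ extraction on the JsonType column
    .with_column("author", fc.json.jq(fc.col("meta_json"), ".author.name"))
)
```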
Traditional vs Native Approach
Traditional preprocessing scatters operations across services:
```python
import time

# External services and manual orchestration
# (ocr_service, custom_chunk_function, and embedding_api are placeholder external clients)
def process_document(pdf):
    text = ocr_service.extract(pdf)
    chunks = custom_chunk_function(text, max_tokens=1000)
    embeddings = []
    for chunk in chunks:
        time.sleep(0.5)  # Manual rate limiting
        embedding = embedding_api.embed(chunk)
        embeddings.append(embedding)
    return chunks, embeddings
```
Fenic's declarative approach:
```python
import fenic as fc

df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_blog", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").content)
    )
)
```
The query engine handles batching, rate limiting, and error handling. It optimizes across the entire pipeline.
Working with MarkdownType
Markdown contains semantic structure through headers, lists, and formatting. Generic text splitting destroys this structure.
Parse PDFs to Markdown
Fenic 0.5.0 added PDF-to-markdown parsing:
```python
import fenic as fc

session = fc.Session.get_or_create(
    fc.SessionConfig(
        app_name="pdf_processor",
        semantic=fc.SemanticConfig(
            language_models={
                "gemini": fc.GoogleDeveloperLanguageModel(
                    model_name="gemini-2.0-flash",
                    rpm=100,
                    tpm=1000,
                )
            },
            default_language_model="gemini",
        ),
    )
)

# Read PDF metadata
pdfs = session.read.pdf_metadata("data/docs/**/*.pdf", recursive=True)

# Parse to markdown with page separators
markdown = pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True,
    ).alias("markdown"),
)
```
Key features:
- Returns a MarkdownType column for downstream operations
- Page separators preserve context boundaries
- Enables parallel processing of individual pages
- Image descriptions for multimodal content
Structure-Aware Chunking
Header-based chunking respects document organization:
```python
processed = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("sections")
)
```
Benefits:
- Chunks align with content sections, not arbitrary character counts
- Maintains semantic coherence for RAG systems
- Preserves context relationships
Two-Stage Chunking Strategy
For long documents, combine markdown chunking with token-aware splitting:
```python
chunked = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("sections")
    .with_column(
        "subsections",
        fc.text.recursive_token_chunk(
            "sections",
            chunk_size=1200,
            chunk_overlap_percentage=0
        )
    )
    .explode("subsections")
)
```
This maintains structural boundaries while ensuring chunks fit model context windows.
Embeddings as First-Class Data Types
EmbeddingType treats vectors as native DataFrame columns. No external vector database required for basic operations.
Generate Embeddings at Scale
Automatic batching with rate limiting:
```python
import fenic as fc

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "text": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini", rpm=500, tpm=200_000
            )
        },
        default_language_model="text"
    )
)

session = fc.Session.get_or_create(config)

embeddings_df = (
    df
    .with_column("content_embedding", fc.semantic.embed(fc.col("content")))
    .with_column("title_embedding", fc.semantic.embed(fc.col("title")))
)
```
The engine:
- Batches embedding requests across rows
- Respects provider rate limits via rpm and tpm configuration
- Handles retries automatically
- Eliminates manual throttling logic
Semantic Clustering
Group related content by similarity:
```python
clustered = (
    df
    .with_column("embedding", fc.semantic.embed(fc.col("text")))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=5,
        label_column="cluster_label"
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize the common themes in this cluster",
            fc.col("text")
        ).alias("cluster_summary")
    )
)
```
This pattern enables content organization without predefined categories.
Similarity-Based Joins
Join DataFrames using embedding similarity:
```python
# Generate embeddings
questions_with_embeddings = questions.with_column(
    "q_embedding", fc.semantic.embed(fc.col("question_text"))
)

docs_with_embeddings = docs.with_column(
    "d_embedding", fc.semantic.embed(fc.col("doc_content"))
)

# Semantic join
matched = questions_with_embeddings.semantic.join(
    other=docs_with_embeddings,
    predicate="""
        Does this document answer the question?
        Question: {{left_on}}
        Document: {{right_on}}
    """,
    left_on=fc.col("question_text"),
    right_on=fc.col("doc_content")
)
```
The query engine uses embeddings for initial filtering before applying expensive LLM predicates.
TranscriptType for Audio and Video
TranscriptType preserves temporal structure and speaker information through transformations.
Load and Process Transcripts
```python
from pathlib import Path

from pydantic import BaseModel, Field

import fenic as fc


class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking in this segment")
    start_time: float = Field(description="Start time (seconds)")
    end_time: float = Field(description="End time (seconds)")
    key_points: list[str] = Field(description="Main points discussed")


transcript_text = Path("data/transcript.json").read_text()

session = fc.Session.get_or_create(fc.SessionConfig(app_name="transcript_processor"))
df = session.create_dataframe({"transcript": [transcript_text]})

processed = (
    df
    .with_column("transcript", fc.col("transcript").cast(fc.TranscriptType))
    .select(
        "*",
        fc.text.recursive_token_chunk(
            "transcript", chunk_size=1200, chunk_overlap_percentage=0
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk", SegmentSchema, model_alias="mini"
        ).alias("segment"),
    )
)
```
Supported formats:
- SRT (SubRip Subtitle)
- WebVTT
- Generic transcript formats
Speaker-Specific Analysis
Aggregate by speaker using preserved metadata:
```python
speaker_summaries = (
    processed
    .group_by(fc.col("segment.speaker"))
    .agg(
        fc.semantic.reduce(
            "Summarize this speaker's contributions",
            fc.col("chunk")
        ).alias("speaker_summary")
    )
)
```
TranscriptType maintains speaker identity and timestamps throughout the pipeline.
Nested JSON with JQ Operations
JsonType supports JQ expressions for nested data manipulation:
```python
# Extract nested fields
processed = (
    df
    .with_column("author_name", fc.json.jq(fc.col("metadata"), ".author.name"))
    .with_column("tags", fc.json.jq(fc.col("metadata"), ".tags[]"))
    .with_column("nested_value", fc.json.jq(fc.col("config"), ".settings.advanced.timeout"))
)
```
Advantages:
- Eliminates verbose dictionary navigation
- Handles missing keys gracefully (see the sketch after this list)
- Supports array operations and complex queries
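A brief sketch of that missing-key behavior (the priority field and default values here are hypothetical): jq evaluates absent paths to null rather than raising, and its `//` operator substitutes a fallback.

```python
# jq returns null for paths that don't exist; the alternative operator (//)
# supplies a default value instead
with_defaults = (
    df
    .with_column("priority", fc.json.jq(fc.col("metadata"), '.priority // "normal"'))
    .with_column("timeout", fc.json.jq(fc.col("config"), '.settings.advanced.timeout // 30'))
)
```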
Combine JQ with Semantic Extraction
```python
from typing import List

from pydantic import BaseModel

import fenic as fc


class ExtractedMetadata(BaseModel):
    categories: List[str]
    priority: str
    assigned_to: str


enriched = (
    df
    .with_column("raw_json", fc.col("data").cast(fc.JsonType))
    .with_column("basic_field", fc.json.jq("raw_json", ".title"))
    .with_column(
        "structured",
        fc.semantic.extract("raw_json", ExtractedMetadata)
    )
)
```
Complete Document Processing Pipeline
Production pipelines combine multiple data types with semantic operations:
```python
from typing import List

from pydantic import BaseModel, Field

import fenic as fc


class DocumentMetadata(BaseModel):
    title: str
    main_topics: List[str]
    key_entities: List[str]
    summary: str = Field(description="Two-sentence summary")


config = fc.SessionConfig(
    app_name="doc_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "cheap": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini", rpm=500, tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            )
        },
        default_language_model="cheap"
    )
)

session = fc.Session.get_or_create(config)

pipeline = (
    session.read.pdf_metadata("data/documents/**/*.pdf", recursive=True)
    .filter(fc.col("page_count") < 100)
    .select(
        "*",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---"
        ).alias("markdown")
    )
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("markdown", header_level=2)
    )
    .explode("sections")
    .with_column(
        "metadata",
        fc.semantic.extract(
            "sections", DocumentMetadata, model_alias="cheap"
        )
    )
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("sections").content)
    )
    .with_column(
        "quality_score",
        fc.semantic.map(
            "Rate content quality 1-10: {{text}}",
            text=fc.col("sections"),
            model_alias="cheap"
        )
    )
    .filter(fc.col("quality_score").cast(fc.IntegerType) >= 7)
)

metrics = pipeline.write.parquet("s3://bucket/enriched/documents.parquet")
```
Efficiency patterns:
- Early filtering reduces data volume before expensive operations
- Model tiering uses cheap models for classification and embeddings
- Lazy evaluation enables query optimization
- Native types eliminate external preprocessing
Batch Preprocessing for Agent Systems
Separate heavy inference from real-time agent logic:
```python
# Offline batch preprocessing
enriched_knowledge = (
    raw_documents
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            "chunks", DocumentMetadata, model_alias="cheap"
        )
    )
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize in 100 words: {{content}}",
            content=fc.col("chunks"),
            model_alias="cheap"
        )
    )
)

metrics = enriched_knowledge.write.parquet("agent_knowledge/")
```
This architecture provides:
- Predictable latency - no LLM inference at request time (see the serving-side sketch after this list)
- Better resource utilization - batch processing amortizes costs
- Cleaner separation - planning decoupled from execution
- Easier debugging - preprocessing validated offline
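To make the serving side concrete, here is a minimal sketch (using Polars and NumPy, which are illustrative choices, not part of the source pipeline) that loads the precomputed table written above and retrieves context with a cosine-similarity lookup, so no LLM generation runs at request time:

```python
import numpy as np
import polars as pl

# Load the knowledge table produced by the offline Fenic job
knowledge = pl.read_parquet("agent_knowledge/*.parquet")

# Stack the precomputed chunk embeddings into a matrix
matrix = np.vstack(knowledge["embedding"].to_list())

def retrieve(query_embedding: np.ndarray, k: int = 5) -> pl.DataFrame:
    """Return the k chunks most similar to the query embedding."""
    scores = matrix @ query_embedding / (
        np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_embedding)
    )
    top_rows = np.argsort(scores)[::-1][:k]
    return knowledge[top_rows.tolist()]

# query_embedding comes from a lightweight embedding call at request time
```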
Performance Optimization
Intelligent Caching
Cache expensive operations explicitly:
```python
cached = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("chunks")
    .cache()
)

# Multiple operations reuse cached data
embeddings = cached.with_column(
    "embedding", fc.semantic.embed(fc.col("chunks"))
)

classifications = cached.with_column(
    "category",
    fc.semantic.classify(
        fc.col("chunks"),
        classes=["technical", "business", "legal"]
    )
)
```
The engine caches identical operations automatically within a session.
Model Selection Strategy
Use appropriate model sizes for each task:
```python
# Inside fc.SessionConfig(...)
semantic=fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel(
            model_name="gpt-4o-nano", rpm=500, tpm=200_000
        ),
        "mini": fc.OpenAILanguageModel(
            model_name="gpt-4o-mini", rpm=300, tpm=150_000
        ),
        "full": fc.AnthropicLanguageModel(
            model_name="claude-opus-4-0",
            rpm=100,
            input_tpm=100_000,
            output_tpm=50_000
        )
    }
)

pipeline = (
    df
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            classes=["bug", "feature", "question"],
            model_alias="nano"
        )
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("text"), Schema, model_alias="mini"
        )
    )
    .with_column(
        "analysis",
        fc.semantic.map(
            "Provide detailed technical analysis: {{text}}",
            text=fc.col("text"),
            model_alias="full"
        )
    )
)
```
Cost differences between model tiers run 10-100x. Routing each task to the cheapest adequate model can reduce expenses by around 80%.
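As an illustrative back-of-the-envelope calculation (the relative prices and routing split below are hypothetical, not quoted rates):

```python
# Hypothetical relative costs: the premium model is 10x the cheap one
cheap_cost, premium_cost = 1.0, 10.0
calls = 1_000_000

all_premium = calls * premium_cost                        # every call on the premium model
tiered = calls * (0.9 * cheap_cost + 0.1 * premium_cost)  # 90% routed to the cheap model

print(f"Savings: {1 - tiered / all_premium:.0%}")  # ~81% with these assumptions
```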
Hybrid Fuzzy and Semantic Matching
Use fast fuzzy matching before expensive semantic operations:
```python
# Fast fuzzy matching for candidates
candidates = (
    left_df.join(right_df)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Semantic matching on filtered candidates
final = candidates.semantic.join(
    predicate="Are these the same company? Left: {{left}}, Right: {{right}}",
    left_on=fc.col("company_description"),
    right_on=fc.col("business_description")
)
```
This approach can reduce costs by orders of magnitude.
Production Implementation
Schema-Driven Extraction
Define Pydantic schemas with clear field descriptions:
```python
from typing import List, Literal

from pydantic import BaseModel, Field

import fenic as fc


class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"] = Field(
        description="Customer subscription level"
    )
    region: Literal["us", "eu", "apac"] = Field(
        description="Geographic region"
    )
    severity: Literal["low", "medium", "high", "critical"] = Field(
        description="Issue severity based on business impact"
    )
    issues: List[str] = Field(
        description="List of specific issues mentioned"
    )


tickets = df.with_column(
    "extracted",
    fc.semantic.extract("raw_ticket", Ticket)
)
```
Schema benefits:
- Clear descriptions guide LLM extraction
- Literal types constrain outputs to valid categories
- Reduces hallucinations
- Type-safe results
Incremental Development
Test with small samples before scaling:
```python
# Development with 100 rows
df_sample = df.limit(100)
result = df_sample.semantic.extract(...).collect()

# Check cost and validate
print(f"Cost for 100 rows: ${result.metrics.total_lm_metrics.total_cost}")
print(f"Execution time: {result.metrics.execution_time_ms}ms")
df_sample.semantic.extract(...).show()

# Scale after validation
metrics = df_full.semantic.extract(...).write.parquet("output/")
```
Monitor Pipeline Metrics
Track operations for optimization:
```python
result = pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.total_lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.total_lm_metrics.total_cost}")

# Identify expensive operations
# Note: check the docs for the operator_metrics attribute structure;
# basic metrics access shown here
print(f"Execution time: {metrics.execution_time_ms}ms")
print(f"Output rows: {metrics.num_output_rows}")
```
Use insights to optimize model selection and caching.
Local Development to Cloud Scale
Same code runs locally and in cloud:
```python
# Local development
session = fc.Session.get_or_create(
    fc.SessionConfig(app_name="local_dev")
)

df = session.read.csv("local_data.csv")
processed = df.semantic.extract(...).filter(...)
metrics = processed.write.parquet("results.parquet")
```
Cloud deployment adds configuration:
```python
# Production
config = fc.SessionConfig(
    app_name="production",
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)

session = fc.Session.get_or_create(config)

df = session.read.csv("s3://bucket/data/*.csv")
processed = df.semantic.extract(...).filter(...)
processed.write.parquet("s3://bucket/results/")
```
Zero code changes from prototype to production.
Infrastructure Integration
Lakehouse-Native Architecture
Works with existing data infrastructure:
```python
# Read from Parquet (or other supported format)
df = session.read.parquet("s3://lake/raw_data/*.parquet")

# Process with semantic operators
processed = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "metadata",
        fc.semantic.extract("doc", Schema)
    )
    .with_column("embedding", fc.semantic.embed("doc"))
)

# Write to Parquet
metrics = processed.write.parquet("s3://lake/processed/data.parquet")
```
Compatible with:
- Parquet
- Iceberg
- Delta Lake
- Lance
Built on Apache Arrow for interoperability with Spark, Polars, DuckDB, and pandas.
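For example, here is a minimal sketch of handing results to other tools. It assumes Arrow-backed conversion helpers along the lines of to_polars() and to_pandas() on the DataFrame; check the Fenic API reference for the exact method names in your version.

```python
# Assumed conversion helpers; verify exact names against the Fenic API reference
processed = df.with_column("embedding", fc.semantic.embed("doc"))

pl_df = processed.to_polars()   # continue analysis in Polars via Arrow
pd_df = processed.to_pandas()   # or hand off to pandas-based tooling
```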
Hybrid Pipeline Pattern
Mix Fenic with existing ETL:
```python
# Spark preprocessing
spark_df.write.parquet("s3://interim/")

# Fenic semantic enrichment
enriched = (
    session.read.parquet("s3://interim/*.parquet")
    .with_column("doc", fc.col("text").cast(fc.MarkdownType))
    .with_column("extracted", fc.semantic.extract("doc", Schema))
    .with_column("category", fc.semantic.classify("doc", classes=["cat1", "cat2"]))
)

metrics = enriched.write.parquet("s3://final/data.parquet")

# Continue with Spark/dbt
```
Use the right tool for each step.
Implementation Summary
Efficient multimodal data management requires infrastructure built for AI workloads:
Native type support eliminates preprocessing. MarkdownType, TranscriptType, and EmbeddingType provide specialized operations respecting data structure.
Declarative operations enable optimization. The query engine batches requests, caches results, and reorders operations across pipelines.
Type-safe schemas reduce brittleness. Pydantic models define structure once, eliminating prompt engineering fragility.
Strategic model selection controls costs. Use cheap models for classification, standard for extraction, powerful for reasoning only.
Separate batch and real-time workloads. Preprocess offline with Fenic, serve to agents for predictable latency.
Traditional approaches built on scattered external services create maintenance overhead. Infrastructure that treats inference as a first-class operation makes production AI systems tractable.
Start with simple operations on small datasets. Validate costs and results. Scale with confidence.
Resources:
- Typedef Platform
- Fenic GitHub Repository
- Fenic Open Source Announcement
- Latest Release Notes

