Enterprise AI systems stall when forced to process unstructured data through infrastructure built for structured tables. An estimated 80% of organizational data exists in unstructured formats, yet most platforms treat LLM calls as external black boxes outside the query engine. This architectural mismatch prevents scale and keeps AI pilots from reaching production.
Identify the Core Bottleneck
Start by measuring where your pipeline fails. Most enterprises face three critical breakpoints:
Processing Volume: Unstructured data grows at 55-65% annually. Traditional query engines cannot batch LLM calls efficiently, so each record triggers its own sequential call and processing time grows linearly with volume.
Infrastructure Cost: Poor data quality costs organizations an average of $12.9 million annually. Without visibility into token usage and model selection, costs spiral as teams unknowingly use expensive models for simple tasks.
Integration Failures: 54% of organizations cite data movement without disruption as their top technical challenge. Custom scripts connecting OCR pipelines, transcription services, and LLM providers create brittle integration points.
Eliminate Fragile Glue Code
Replace Manual Orchestration
Traditional pipelines require custom scripts for every component. Each connection introduces failure modes, serialization latency, and manual rate limit management.
Before (manual approach):
```python
import time
from openai import OpenAI
import pandas as pd

client = OpenAI()

def extract_sentiment(text):
    time.sleep(0.1)  # Manual rate limiting
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Analyze sentiment: {text}"}]
        )
        return response.choices[0].message.content
    except Exception as e:
        # retry_with_backoff is assumed to be defined elsewhere in the pipeline
        return retry_with_backoff(extract_sentiment, text)

df = pd.DataFrame({"text": ["I love this product!", "This is terrible."]})
df["sentiment"] = df["text"].apply(extract_sentiment)
```
The query engine has zero visibility into the UDF. It cannot batch calls, cache results, or optimize operation order.
After (with Fenic):
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import Literal

class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: list[str]
    recommendations: list[str]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)
session = fc.Session.get_or_create(config)

# df holds policy documents and claims_df holds claims records, loaded elsewhere
results = (
    df
    .select("*", fc.semantic.extract(fc.col("policy_text"), PolicyInsight).alias("policy_insight"))
    .filter(fc.semantic.predicate(
        "{{ policy_insight }} has non-empty coverage gaps",
        policy_insight=fc.col("policy_insight")
    ))
    .semantic.join(
        other=claims_df,
        predicate="The policy {{ left_on }} is related to claim {{ right_on }}",
        left_on=fc.col("policy_id"),
        right_on=fc.col("claim_policy_ref")
    )
)
results.show()
```
The query engine sees semantic operations as first-class primitives. It batches API calls automatically, caches repeated operations, reorders for efficiency, and self-throttles based on provider limits.
Configure Multi-Provider Management
Stop writing provider-specific retry logic. Configure models declaratively:
```python
import fenic as fc

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50, input_tpm=100000, output_tpm=50000
            ),
            "cheap": fc.GoogleVertexLanguageModel(model_name="gemini-2.0-flash", rpm=200, tpm=200000)
        },
        default_language_model="fast"
    )
)
session = fc.Session.get_or_create(config)

df = session.read.csv("feedback.csv")

# Summary is a Pydantic schema defined elsewhere
results = df.select(
    "*",
    fc.semantic.extract(fc.col("text"), Summary, model_alias="accurate").alias("summary_data")
)
results.show()
```
Rate limits apply automatically. Retry logic is built-in. Model switching requires changing one parameter.
Implement Schema-Driven Extraction
Replace prompt engineering with type-safe schemas. Define output structure once, get validated results consistently.
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import Literal

class CustomerData(BaseModel):
    name: str
    age: int = Field(ge=0, le=150)
    status: Literal["active", "inactive", "pending"]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)
session = fc.Session.get_or_create(config)

df = session.create_dataframe({
    "text": [
        "Alice is 30 years old and active.",
        "Bob is 200 years old and inactive.",
        "Charlie is 40 and pending approval."
    ]
})

df_processed = df.select(
    "*",
    fc.semantic.extract(fc.col("text"), CustomerData).alias("customer_data")
)
df_processed.show()
```
Pydantic validation catches extraction errors. Field constraints ensure data quality. The schema documents expected structure without separate documentation.
Use Specialized Data Types
Stop writing preprocessing code for each format. Native support for unstructured types eliminates manual parsing:
Process Markdown Documents
```python
import fenic as fc

df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_blog"), header_level=2)
    )
    .with_column("title", fc.json.jq(fc.col("raw_blog"), ".title"))
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").get_item("content"))
    )
)
```
MarkdownType preserves document structure. Header-based chunking respects semantic boundaries instead of naive character splitting.
Handle Transcripts with Speaker Awareness
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import Optional

class MeetingActionItems(BaseModel):
    description: str
    owner: str
    due_date: Optional[str] = None

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)
session = fc.Session.get_or_create(config)

df = session.create_dataframe({
    "file": ["meeting1.srt", "meeting2.vtt"]
})

meetings = (
    df
    .with_column("transcript", fc.col("file").cast(fc.TranscriptType))
    .select("*", fc.semantic.extract(fc.col("transcript"), MeetingActionItems).alias("action_items"))
    .filter(fc.col("action_items").get_item("owner") == "Engineering")
)
result = meetings.collect()
```
TranscriptType handles SRT and WebVTT formats. Speaker identity persists through transformations without manual parsing.
Manipulate Nested JSON
```python
.with_column("author", fc.json.jq(fc.col("metadata"), ".author.name"))
.with_column("tags", fc.json.jq(fc.col("metadata"), ".tags[]"))
```
JQ expressions handle nested structures without verbose dictionary navigation code.
Build Production Pipelines
Enable Automatic Optimization
The query engine optimizes entire pipelines before execution. Lazy evaluation allows operation reordering, filter pushdown, and intelligent batching.
```python
import fenic as fc
from pydantic import BaseModel

class TicketSchema(BaseModel):
    customer_id: str
    issue: str
    sentiment: str

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)
session = fc.Session.get_or_create(config)

df = session.create_dataframe({
    "priority": ["high", "low", "high"],
    "content": [
        "The app keeps crashing, I'm really annoyed!",
        "General feedback, nothing urgent.",
        "Payment failed again, this is frustrating!"
    ]
})

knowledge_base = session.create_dataframe({
    "solution_id": [1, 2],
    "solution_text": ["Restart the app", "Check payment settings"]
})

pipeline = (
    df
    .filter(fc.col("priority") == "high")
    .select("*", fc.semantic.extract(fc.col("content"), TicketSchema).alias("ticket_info"))
    .filter(fc.semantic.predicate(
        "The sentiment {{ sentiment }} is frustrated",
        sentiment=fc.col("ticket_info").get_item("sentiment")
    ))
    .semantic.join(
        other=knowledge_base,
        predicate="The issue {{ left_on }} can be resolved by {{ right_on }}",
        left_on=fc.col("ticket_info").get_item("issue"),
        right_on=fc.col("solution_text")
    )
)
result = pipeline.collect()
```
The pipeline filters before extraction, reducing expensive LLM calls. Semantic joins only process filtered rows.
Track Lineage and Costs
Every operation is traceable. Row-level lineage tracks individual processing history through non-deterministic transformations.
```python
result = df.select(
    fc.semantic.map(
        "Analyze sentiment: {{ text }}",
        text=fc.col("text")
    )
).collect()

print(result.metrics.total_lm_metrics.num_output_tokens)
print(result.metrics.total_lm_metrics.cost)
print(result.metrics.execution_time_ms)
```
Metrics show token counts, costs, and execution time per operator. Use this data to identify expensive operations.
Apply RudderStack's Pattern
RudderStack cut triage time by 95% using this approach:
Step 1: Ingest and Normalize
Load support tickets, sales transcripts, product docs, and Notion PRDs into the warehouse.
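A minimal ingestion sketch, assuming the sources land as CSV and Parquet exports; the paths and column names below are illustrative, and the readers and casts are the same ones used elsewhere in this guide:

```python
import fenic as fc

session = fc.Session.get_or_create()

# Illustrative paths and column names; adjust to your warehouse layout.
tickets_df = session.read.csv("s3://warehouse/support_tickets/*.csv")
docs_df = session.read.parquet("s3://warehouse/product_docs/*.parquet")

# Sales call transcripts keep speaker structure via TranscriptType.
transcripts_df = (
    session.read.csv("s3://warehouse/call_transcripts.csv")
    .with_column("transcript", fc.col("raw_srt").cast(fc.TranscriptType))
)

# Notion PRD exports (Markdown) keep document structure via MarkdownType.
prds_df = (
    session.read.csv("s3://warehouse/notion_prd_exports.csv")
    .with_column("prd", fc.col("markdown_body").cast(fc.MarkdownType))
)

# Normalize into the warehouse for the taxonomy step that follows.
tickets_df.write.parquet("s3://warehouse/normalized/tickets/")
transcripts_df.write.parquet("s3://warehouse/normalized/transcripts/")
```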
Step 2: Build Semantic Context
```python
# Infer product taxonomy from documentation
taxonomy = (
    docs_df
    .select("*", fc.semantic.extract(fc.col("content"), TaxonomySchema).alias("taxonomy_data"))
    .select("*", fc.semantic.classify(fc.col("category"), predefined_categories).alias("category"))
)

# Map tickets to taxonomy with citations
mapped_tickets = (
    tickets_df
    .semantic.join(
        other=taxonomy,
        predicate="The ticket {{ left_on }} relates to product area {{ right_on }}",
        left_on=fc.col("description"),
        right_on=fc.col("category_description")
    )
)
```
Step 3: Create Agent Tools
Expose retrieval operations via MCP tools for real-time agent access.
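One way to sketch this, using the MCP Python SDK's FastMCP server rather than any Fenic-specific tooling; the tool name, table path, and result serialization below are assumptions to adapt:

```python
import fenic as fc
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("triage-tools")
session = fc.Session.get_or_create()

@mcp.tool()
def tickets_for_category(category: str) -> str:
    """Return pre-mapped tickets for a product area from the enriched warehouse table."""
    mapped = session.read.parquet("s3://warehouse/mapped_tickets/*.parquet")
    rows = mapped.filter(fc.col("category") == category).collect()
    # The shape of the collected result depends on the Fenic version;
    # adapt this to return structured rows instead of a string if needed.
    return str(rows)

if __name__ == "__main__":
    mcp.run()
```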
Step 4: Measure Results
Store mappings and rationales in the warehouse. Track acceptance rates and time savings.
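A hedged sketch of the persistence step, building on the mapped_tickets DataFrame from Step 2; the rationale column, output path, and model alias are illustrative:

```python
# Persist mappings with a short rationale so reviewers can audit each decision.
triage_output = mapped_tickets.with_column(
    "rationale",
    fc.semantic.map(
        "In one sentence, explain why ticket {{ description }} maps to {{ category }}",
        description=fc.col("description"),
        category=fc.col("category"),
        model_alias="mini"  # assumes a "mini" alias exists in your session config
    )
)
triage_output.write.parquet("s3://warehouse/triage_mappings/")
# Acceptance rate = accepted mappings / total mappings, tracked from reviewer actions downstream.
```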
Results:
- 95% reduction in PM time per triage
- 90% first-pass category acceptance
- Citations surface prospect signals directly in Linear
- Bottleneck shifted from inference to input breadth
"I wake up every morning and get a list of five things that are worth my time to look at; that's a game-changer." — David Daly, Product, RudderStack
Optimize Costs
Select Models Strategically
Use cheaper models for simple tasks. Reserve expensive models for reasoning.
```python
semantic=fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel(model_name="gpt-4o-nano", rpm=100, tpm=100000),
        "mini": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
        "full": fc.AnthropicLanguageModel(
            model_name="claude-opus-4",
            rpm=50, input_tpm=100000, output_tpm=50000
        )
    }
)

# Simple classification uses nano
.select("*", fc.semantic.classify(fc.col("col"), classes, model_alias="nano").alias("category"))

# Structured extraction uses mini
.select("*", fc.semantic.extract(fc.col("col"), schema, model_alias="mini").alias("extracted"))

# Multi-step reasoning uses full
.select("*", fc.semantic.map(instruction, model_alias="full", **columns).alias("result"))
```
The cost difference between models is 10-100x. Selecting models per task can cut costs by 80% while maintaining quality.
Cache Expensive Operations
```python
df_cached = df.filter(...).semantic.extract(...).cache()

result1 = df_cached.filter(condition1).collect()
result2 = df_cached.filter(condition2).collect()
```
Subsequent operations reuse cached results. The engine also caches identical inference calls automatically.
Combine Fuzzy and Semantic Matching
Use fuzzy string matching for initial filtering. Apply semantic operations only to candidates:
```python
candidates = (
    left_df.join(right_df, how="cross")
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

final = candidates.semantic.join(
    other=right_df,
    predicate="Are these the same company? Left: {{left_name}}, Right: {{right_name}}",
    left_on=fc.col("company_description"),
    right_on=fc.col("business_description")
)
```
This reduces semantic join costs by orders of magnitude versus full cross-products.
Separate Batch and Real-Time Processing
Preprocess data offline. Agents query enriched results without inference at request time.
```python
# Batch preprocessing (runs offline)
agent_context = (
    documents
    .with_column("extracted", fc.semantic.extract(fc.col("content_col"), StructuredMetadata))
    .with_column("embedding", fc.semantic.embed(fc.col("processed_content")))
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize in 100 words: {{content}}",
            content=fc.col("content"),
            model_alias="mini"
        )
    )
)

agent_context.write.parquet("agent_knowledge_base/")
```
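At request time the agent side becomes a plain read of the enriched table, with no inference in the hot path. A small sketch, assuming the Parquet output written above:

```python
import fenic as fc

# Request time: the agent reads the pre-enriched table; no LLM calls happen here.
session = fc.Session.get_or_create()

knowledge = session.read.parquet("agent_knowledge_base/*.parquet")
knowledge.select(fc.col("summary"), fc.col("extracted")).show()
```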
Benefits:
- Agents respond predictably without LLM latency
- Batch processing amortizes fixed costs
- Planning decouples from execution
- Preprocessing validates offline before production
Scale to Production
Deploy Without Code Changes
Develop locally, deploy to cloud with identical code:
```python
# Local development
session = fc.Session.get_or_create()

df = session.read.csv("local_data.csv")
processed = df.select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
processed.write.parquet("results.parquet")
```
```python
# Production (same code, different config)
config = fc.SessionConfig(
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)
session = fc.Session.get_or_create(config)

df = session.read.csv("s3://bucket/data/*.csv")
processed = df.select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
processed.write.parquet("s3://bucket/results/output.parquet")
```
Automatic scaling handles increased load. No infrastructure management required.
Integrate with Existing Lakehouses
Read from and write to standard formats without data movement:
```python
session = fc.Session.get_or_create()

df = session.read.parquet("s3://data-lake/raw/*.parquet")
processed = (
    df
    .select("*", fc.semantic.extract(fc.col("col"), Schema).alias("extracted"))
    .filter(fc.col("extracted").isnotnull())
)
processed.write.parquet("s3://data-lake/processed/")
```
Works with Parquet, Iceberg, Delta Lake, and Lance. Built on Apache Arrow for compatibility with Spark, Polars, DuckDB, and pandas.
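Because the output is plain Parquet, downstream tools can read it without an export step. A small sketch using Polars and DuckDB against the processed path from the example above; it assumes S3 access is configured for each tool (for DuckDB, the httpfs extension):

```python
import polars as pl
import duckdb

# Read the Fenic output directly with Polars.
df_pl = pl.read_parquet("s3://data-lake/processed/*.parquet")

# Or query the same files in place with DuckDB.
con = duckdb.connect()
top_rows = con.sql(
    "SELECT * FROM read_parquet('s3://data-lake/processed/*.parquet') LIMIT 10"
).df()
```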
Implementation Checklist
Phase 1: Assess Current State
- Measure processing time per record
- Count distinct LLM API calls in your pipeline
- Calculate total monthly inference costs
- Identify manual rate limiting code
- List preprocessing steps for each data format
Phase 2: Eliminate Glue Code
- Replace UDF-based LLM calls with semantic operators
- Configure multi-provider model management
- Define Pydantic schemas for extraction targets
- Remove manual batching and retry logic
- Test with small dataset to validate results
Phase 3: Optimize Performance
- Add explicit caching to expensive operations
- Use fuzzy matching before semantic joins
- Select appropriate model sizes per task
- Enable query optimization with lazy evaluation
- Track metrics to identify bottlenecks
Phase 4: Scale Production
- Separate batch preprocessing from real-time agents
- Write enriched data to lakehouse
- Deploy to cloud with zero code changes
- Monitor token usage and costs
- Iterate based on production metrics
Monitor and Iterate
Track these metrics weekly:
Cost Metrics:
- Total token usage by model
- Cost per processed record
- Cache hit rate for identical operations
Performance Metrics:
- Records processed per hour
- Average latency per operation
- Ratio of fuzzy to semantic matches
Quality Metrics:
- Schema validation pass rate
- Manual correction frequency
- Downstream system error rate
Use metric trends to refine model selection, adjust caching strategy, and optimize operation order.
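A hedged sketch of a weekly cost check, reusing the metrics object shown earlier; the record count and the pipeline variable are placeholders to wire into your own job:

```python
result = pipeline.collect()

lm = result.metrics.total_lm_metrics
record_count = 10_000  # placeholder: take this from your source system or pipeline output

print(f"total cost: ${lm.cost:.2f}")
print(f"cost per record: ${lm.cost / record_count:.4f}")
print(f"output tokens: {lm.num_output_tokens}")
print(f"execution time (ms): {result.metrics.execution_time_ms}")
```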
Next Steps
Install Fenic:
```bash
pip install fenic
```
Review these resources:
- Typedef Platform
- Fenic Open Source Announcement
- Fenic 0.4.0 Release
- Fenic 0.3.0 Release
- Building Reliable AI Pipelines
- Eliminate Fragile Glue Code
- Composable Semantic Operators
- Orchestrate Reliable Agents
- Unstructured Data Statistics
Start with a single document processing pipeline. Validate extraction quality with 100 records. Measure cost and performance before scaling to full production.
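A starting-point sketch under the same session setup as the earlier examples; the sample file and schema below are placeholders:

```python
import fenic as fc
from pydantic import BaseModel

class DocSchema(BaseModel):  # placeholder schema for your first pipeline
    title: str
    summary: str

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)
session = fc.Session.get_or_create(config)

# Validate on a pre-sampled file of ~100 records before scaling out.
sample = session.read.csv("sample_100_records.csv")
result = sample.select(
    "*", fc.semantic.extract(fc.col("text"), DocSchema).alias("extracted")
).collect()

print(result.metrics.total_lm_metrics.cost)
print(result.metrics.execution_time_ms)
```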

