Most AI projects fail to reach production because traditional data infrastructure wasn't built for LLM operations. Teams spend months stitching together OCR services, transcription APIs, vector databases, and data warehouses with brittle glue code that breaks at scale. The solution requires rethinking data engines from first principles with inference as a native operation.
What Makes an Engine Inference-First
Legacy data platforms optimize for rows and columns. They treat LLM calls as external black boxes invoked through user-defined functions (UDFs). The query engine cannot see inside these functions, which prevents automatic batching, intelligent caching, operation reordering, and cost optimization.
An inference-first engine rebuilds the query optimizer with model awareness at its core. Fenic, the open-source framework from Typedef, demonstrates this architecture by embedding semantic operations directly into the DataFrame API. When the query engine sees semantic.extract() or semantic.join(), it knows this is inference with specific characteristics: high latency, token costs, batching opportunities, and caching potential.
This visibility enables optimizations impossible with traditional approaches:
- Batch API calls across rows to minimize network overhead
- Cache repeated inference patterns automatically
- Reorder operations to reduce data volume before expensive inference
- Estimate costs accurately before execution
- Self-throttle to respect provider rate limits while maximizing throughput
The result: teams build deterministic workflows on top of non-deterministic models with production-grade reliability.
Core Architecture Components
DataFrames as the Unifying Layer
AI applications are data pipelines. They take inputs, reason over context, generate outputs, and log results. The DataFrame abstraction handles these patterns naturally while adding structure to probabilistic systems.
Lineage tracking: Every column and row has traceable origins, even from non-deterministic model outputs. When debugging unexpected results, trace backwards through transformations to identify where issues originated.
Columnar consistency: Whether dealing with summaries, embeddings, or classification scores, columns maintain structured types throughout the pipeline. This consistency eliminates manual type checking and validation code.
Deterministic transformations: Wrapping inference in declarative logic creates a pattern: model + prompt + input → output. This enables versioning, caching, and reproducibility with stochastic models.
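A minimal sketch of the pattern, assuming a configured session and a DataFrame `df` with a hypothetical `ticket_text` column: the same model, prompt, and input always produce the same output column, so the step can be cached and versioned like any other transformation.

```python
import fenic as fc

# model + prompt + input -> output, expressed as a declarative column transform
summarized = df.with_column(
    "summary",
    fc.semantic.map(
        "Summarize this ticket in one sentence: {{text}}",
        text=fc.col("ticket_text")
    )
).cache()  # the cached result can be reused by downstream branches
```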
AI-Native Data Types
Generic text columns force preprocessing logic throughout your codebase. Fenic provides specialized types that handle AI formats natively:
MarkdownType: Parse and extract structure from markdown documents without external preprocessing. The engine treats headers, lists, and code blocks as first-class structures.
TranscriptType: Process SRT, WebVTT, and transcript formats with speaker and timestamp awareness. Query by speaker, extract time ranges, or segment conversations without manual parsing.
JsonType: Manipulate nested JSON using JQ expressions within DataFrame operations. Extract deeply nested fields and transform structures without custom parsing functions.
DocumentPathType: Load PDFs, Word documents, and text files with automatic content extraction. The engine handles file I/O, format detection, and text extraction as native operations.
EmbeddingType: Store and operate on vector embeddings with similarity operations built in. Calculate cosine similarity, find nearest neighbors, or cluster embeddings using standard DataFrame syntax.
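A brief sketch of these types in use, assuming a DataFrame `docs` whose `content` column holds raw markdown text (column names are illustrative):

```python
chunks = (
    docs
    .with_column("md", fc.col("content").cast(fc.MarkdownType))              # promote plain text to MarkdownType
    .with_column("chunks", fc.markdown.extract_header_chunks(fc.col("md")))  # header-aware structural chunking
    .explode("chunks")
    .with_column("embedding", fc.semantic.embed(fc.col("chunks.content")))   # EmbeddingType column with built-in similarity ops
)
```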
The Fenic 0.5.0 release added Date and Timestamp types with timezone-aware conversions, plus enhanced PDF parsing with page chunking and metadata extraction.
Multi-Provider Model Integration
Production systems need flexibility to route tasks to appropriate models. Fast, cheap models handle simple classification while capable models tackle extraction. Managing this across OpenAI, Anthropic, Google, and other providers manually creates operational overhead.
An inference-first engine handles provider differences transparently:
```python
import fenic as fc

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            ),
            "efficient": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=200,
                tpm=200000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)
```
Rate limiting parameters (rpm and tpm) prevent throttling. The engine automatically batches requests, implements retry logic, and self-throttles to stay within limits while maximizing throughput through async I/O.
Implementing Semantic Operations
Schema-Driven Extraction
Type-safe extraction from unstructured text eliminates prompt engineering brittleness. Define a Pydantic schema once and get validated results:
```python
from pydantic import BaseModel, Field
from typing import Literal, List

class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: List[str]
    recommendations: List[str]
    confidence: float = Field(ge=0, le=1)

results = df.select(
    "*",
    fc.semantic.extract(
        fc.col("policy_text"),
        PolicyInsight
    ).alias("insights")
)
```
The schema serves as both documentation and validation. The engine automatically constructs prompts that produce the specified structure and validates responses against the schema's constraints. When a model returns invalid data, the framework retries with clarifying instructions.
Semantic Filtering
Traditional filtering relies on exact matches or regular expressions. Semantic predicates enable content-based filtering:
```python
applicants = df.filter(
    (fc.col("years_of_experience") > 5)
    & fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume")
    )
)
```
This combines traditional boolean logic with semantic evaluation. The query engine optimizes both together—filtering on cheap boolean conditions before invoking expensive LLM predicates.
Predicates support Jinja templating with loops, conditionals, and dynamic row-specific logic:
```python
fc.semantic.predicate(
    """
    Does this feedback mention {{ search_term }}?
    {% if priority == "high" %}
    Only return true if it's a critical issue.
    {% endif %}
    Feedback: {{ feedback_text }}
    """,
    search_term=fc.lit("UI problems"),
    priority=fc.col("priority"),
    feedback_text=fc.col("raw_feedback")
)
```
Meaning-Based Joins
Traditional joins match on exact values. Semantic joins determine matches based on meaning:
```python
prompt = """
Is this candidate a good fit for the job?

Candidate Background: {{left_on}}
Job Requirements: {{right_on}}

Consider technical skills, experience level, and domain knowledge.
"""

matched = applicants.semantic.join(
    other=jobs_df,
    predicate=prompt,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description")
).limit(5)
```
The predicate receives both left and right row data, enabling sophisticated matching logic. The engine optimizes semantic joins by batching LLM calls across candidate pairs and caching decisions for repeated comparisons.
This pattern works for:
- Matching documents to queries in RAG systems
- Linking related records across databases
- Finding similar content
- Deduplication based on semantic similarity (sketched below)
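As a sketch of the deduplication case, the same join operator can compare a table against itself with a same-entity predicate (the `records` DataFrame and its `description` column are assumptions for illustration):

```python
duplicates = records.semantic.join(
    other=records,
    predicate="Do these two descriptions refer to the same real-world entity? A: {{left_on}} B: {{right_on}}",
    left_on=fc.col("description"),
    right_on=fc.col("description")
)
```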
Classification and Transformation
The semantic.classify operator categorizes text with few-shot examples:
```python
from fenic.core.types.semantic_examples import ClassifyExample, ClassifyExampleCollection

examples = ClassifyExampleCollection()
examples.create_example(ClassifyExample(input="App crashed on startup", output="bug"))
examples.create_example(ClassifyExample(input="Need ability to export data", output="feature"))
examples.create_example(ClassifyExample(input="How do I reset my password?", output="question"))

classified = df.select(
    "*",
    fc.semantic.classify(
        fc.col("feedback"),
        classes=["bug", "feature", "question"],
        examples=examples
    ).alias("category")
)
```
The semantic.map operator applies natural language transformations:
```python
expanded = df.select(
    "*",
    fc.semantic.map(
        "Expand abbreviations in: {{text}}",
        text=fc.col("message")
    ).alias("expanded_text")
)
```
Semantic Aggregation
Group data by semantic similarity rather than exact matches:
```python
# Use with_cluster_labels for semantic grouping instead
grouped = (
    df
    .with_column(
        "issue_embedding",
        fc.semantic.embed(fc.col("issue_description"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("issue_embedding"),
        num_clusters=5,
        label_column="cluster_label"
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("issue_description")
        ).alias("theme")
    )
)
```
This enables clustering semantically related content without manual category definitions.
Production Reliability Features
Automatic Optimization and Batching
The query optimizer groups API calls efficiently. Instead of individual requests per row, the engine batches operations:
```python
pipeline = (
    df
    .filter(fc.col("priority") == "high")
    .select(
        "*",
        fc.semantic.extract(fc.col("content"), TicketSchema).alias("ticket")
    )
    .filter(
        fc.semantic.predicate(
            "Is this urgent? {{description}}",
            description=fc.col("ticket.description")
        )
    )
)
```
Behind the scenes:
- Batches all extraction calls for high-priority tickets
- Respects provider rate limits through self-throttling
- Executes predicate filtering in parallel where possible
- Caches repeated inference patterns automatically
Async I/O and concurrent request batching maximize throughput while staying within rate limits.
Error Handling and Resilience
Built-in retry logic with exponential backoff handles transient failures. Rate limit errors trigger intelligent back-off rather than cascade failures.
Token counting and cost tracking provide visibility:
```python
result = pipeline.collect()

print(f"Tokens used: {result.metrics.total_lm_metrics.num_output_tokens}")
print(f"Cost: ${result.metrics.total_lm_metrics.cost}")
print(f"Execution time: {result.metrics.execution_time_ms}ms")
```
The framework gracefully handles quota exhaustion by failing fast with clear error messages. Token capacity guardrails raise errors early if requests cannot fit within model limits.
Data Lineage and Debugging
Row-level lineage tracks individual record processing history. When debugging unexpected outputs, trace backwards through the computation graph:
```python
result = pipeline.collect()

# Access the data in a supported format (e.g., Polars DataFrame)
df_result = result.data

# Iterate through the data
for row in df_result.iter_rows(named=True):
    # Process row data as needed
    print(f"Row data: {row}")
```
Explicit caching speeds iterative development:
```python
expensive_pipeline = (
    df
    .select(fc.semantic.extract(fc.col("document"), Schema))
    .cache()
)

analysis_a = expensive_pipeline.filter(condition_a)
analysis_b = expensive_pipeline.filter(condition_b)
```
The explain() method visualizes query plans before execution:
```python
pipeline.explain()
# Shows operation ordering, batching strategy, and estimated costs
```
Real-World Implementation
Document Triage at Scale
RudderStack reduced triage time by 95% using an inference-first pipeline processing support tickets, sales calls, and product documentation:
```python
tickets = session.read.csv("s3://data/support_tickets/*.csv")
transcripts = session.read.docs("s3://data/calls/**/*.srt", content_type="transcript")
docs = session.read.docs("data/notion/**/*.md", content_type="markdown")

taxonomy = (
    docs
    .with_column("chunks", fc.markdown.extract_header_chunks(fc.col("content")))
    .explode("chunks")
    .select(
        "*",
        fc.semantic.extract(
            fc.col("chunks.content"),
            ProductFeatureSchema
        ).alias("features")
    )
)

processed_tickets = (
    tickets
    .semantic.join(
        other=taxonomy,
        predicate="Ticket relates to feature",
        left_on=fc.col("description"),
        right_on=fc.col("features.name")
    )
    .select(
        "*",
        fc.semantic.classify(
            fc.col("description"),
            examples=triage_examples
        ).alias("category")
    )
)
```
The pipeline achieves 90%+ first-pass category acceptance while surfacing relevant context from thousands of documents. Product managers review triage decisions in one pass instead of spending hours researching each request.
Content Classification Pipeline
Media companies classify and enrich articles at scale:
```python
articles = (
    session
    .read.docs("content/**/*.md", content_type="markdown")
    .with_column("chunks", fc.markdown.extract_header_chunks(fc.col("content")))
    .explode("chunks")
    .select(
        "*",
        fc.semantic.classify(
            fc.col("chunks.content"),
            examples=content_examples
        ).alias("category"),
        fc.semantic.extract(
            fc.col("chunks.content"),
            EntitySchema
        ).alias("entities"),
        fc.semantic.embed(fc.col("chunks.content")).alias("embedding")
    )
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=50,
        label_column="topic_cluster"
    )
)
```
One pipeline handles classification, entity extraction, embedding generation, and semantic clustering without separate inference infrastructure.
Insurance Policy Analysis
Matic Insurance built semantic extraction pipelines across thousands of policies:
```python
policies = (
    session
    .read.pdf_metadata("policies/**/*.pdf", recursive=True)
    .filter(fc.col("page_count") < 100)
    .select(
        "*",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---",
            describe_images=True
        ).alias("policy_text")
    )
    .select(
        "*",
        fc.semantic.extract(
            fc.col("policy_text"),
            PolicyCoverageSchema
        ).alias("coverage")
    )
    .unnest("coverage")
)

gaps = policies.semantic.join(
    other=standard_coverages,
    predicate="Policy lacks this coverage",
    left_on=fc.col("coverage.type"),
    right_on=fc.col("required_coverage")
)
```
Teams deploy pipelines in days instead of months, dramatically reducing errors from manual analysis while cutting costs.
Building Your First Pipeline
Installation and Setup
```bash
pip install fenic
```
Configure model providers:
```python
import fenic as fc
from pydantic import BaseModel

config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = fc.Session.get_or_create(config)
```
Basic Pipeline Pattern
```python
from typing import List, Literal

class FeedbackInsight(BaseModel):
    summary: str
    sentiment: Literal["positive", "negative", "neutral"]
    key_issues: List[str]
    priority: Literal["low", "medium", "high"]

df = session.read.csv("feedback.csv")

insights = (
    df
    .select(
        "*",
        fc.semantic.extract(
            fc.col("customer_feedback"),
            FeedbackInsight
        ).alias("insights")
    )
    .filter(fc.col("insights.priority") == "high")
    .with_column(
        "issues_embedding",
        fc.semantic.embed(fc.col("insights.key_issues").cast(fc.StringType))
    )
    .semantic.with_cluster_labels(
        by=fc.col("issues_embedding"),
        num_clusters=5,
        label_column="issue_cluster"
    )
    .group_by("issue_cluster")
    .agg(
        fc.count("*").alias("issue_count"),
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("customer_feedback")
        ).alias("cluster_theme")
    )
)

results = insights.collect()
```
This pipeline extracts structured insights, filters high-priority items, clusters similar issues semantically, and generates summaries per cluster—all in declarative DataFrame operations.
Multi-Stage Processing
```python
documents = (
    session
    .read.pdf_metadata("documents/**/*.pdf", recursive=True)
    .filter(fc.col("page_count").between(5, 100))
    .select(
        "*",
        fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
    )
    .cache()
)

extracted = (
    documents
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            DocumentMetadataSchema
        ).alias("metadata")
    )
    .unnest("metadata")
)

enriched = (
    extracted
    .select(
        "*",
        fc.semantic.classify(
            fc.col("content"),
            examples=doc_type_examples
        ).alias("doc_type"),
        fc.semantic.embed(fc.col("content")).alias("content_embedding")
    )
)

relationships = enriched.semantic.join(
    other=enriched,
    predicate="Documents discuss related topics",
    left_on=fc.col("content"),
    right_on=fc.col("content")
)
```
Each stage builds on previous results. Caching expensive operations prevents redundant processing.
Best Practices for Scale
Define Schemas Once
Create reusable Pydantic models as single sources of truth:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class CustomerIntent(BaseModel):
    """Schema for customer intent extraction"""
    primary_intent: Literal["purchase", "support", "inquiry", "complaint"]
    urgency: Literal["immediate", "soon", "flexible"]
    entities: List[str]
    confidence: float = Field(ge=0, le=1)

pipeline_a = df.select(fc.semantic.extract(fc.col("message"), CustomerIntent))
pipeline_b = df.select(fc.semantic.extract(fc.col("email"), CustomerIntent))
```
This eliminates prompt brittleness and ensures consistent extraction.
Leverage Lazy Evaluation
Build complete pipelines before execution to enable global optimization:
```python
pipeline = (
    df
    .filter(base_condition)
    .select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
    .join(reference_data, on="key")
    .with_column("category", fc.semantic.classify(fc.col("text"), classes=["A", "B"]))
    .filter(result_condition)
    .cache()
)

results = pipeline.collect()
```
The optimizer reorders operations, identifies batching opportunities, and eliminates redundant operations.
Configure Models Appropriately
Use the right model for each task:
```python
language_models = {
    "nano": fc.OpenAILanguageModel(
        model_name="gpt-4o-mini",
        rpm=500,
        tpm=200000
    ),
    "standard": fc.AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=100,
        input_tpm=100000,
        output_tpm=50000
    ),
    "power": fc.OpenAILanguageModel(
        model_name="gpt-4o",
        rpm=100,
        tpm=100000
    )
}

pipeline = (
    df
    .select(
        "*",
        fc.semantic.classify(
            fc.col("text"),
            examples=examples,
            model_alias="nano"
        ).alias("category"),
        fc.semantic.extract(
            fc.col("complex_doc"),
            DetailedSchema,
            model_alias="power"
        ).alias("extracted")
    )
)
```
The cost difference between models is often 10-100x. Routing each task to the cheapest model that handles it well can reduce costs by roughly 80% while maintaining quality.
Implement Incremental Processing
For large datasets, process incrementally:
```python
batch_size = 1000

for offset in range(0, total_rows, batch_size):
    batch = (
        df
        .limit(batch_size, offset)
        .select(fc.semantic.extract(...))
        .cache()
    )
    batch.write.parquet(f"output/batch_{offset}.parquet")
```
Write intermediate results to recover from failures without reprocessing.
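One way to make the loop above restartable, sketched under the same assumptions (the `output/` layout, the column name, and `Schema` are illustrative): skip any batch whose output file already exists.

```python
from pathlib import Path

for offset in range(0, total_rows, batch_size):
    out_path = Path(f"output/batch_{offset}.parquet")
    if out_path.exists():
        continue  # already written by a previous run; skip reprocessing

    batch = (
        df
        .limit(batch_size, offset)
        .select(fc.semantic.extract(fc.col("text"), Schema))
        .cache()
    )
    batch.write.parquet(str(out_path))
```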
Scaling from Local to Production
Local Development
Develop and test complete pipelines locally:
```python
session = fc.Session.get_or_create(
    fc.SessionConfig(app_name="local_dev")
)

df = session.read.csv("sample_data.csv")

processed = (
    df
    .select(fc.semantic.extract(fc.col("text"), Schema))
    .cache()
)

processed.write.parquet("local_results.parquet")
```
The full engine runs locally with no external dependencies beyond model API access.
Production Deployment
Deploy the same code to cloud execution with zero changes:
```python
session = fc.Session.get_or_create(
    fc.SessionConfig(
        app_name="production",
        cloud=fc.CloudConfig(
            size=fc.CloudExecutorSize.LARGE
        )
    )
)

df = session.read.csv("s3://production-data/**/*.csv")

processed = (
    df
    .select(fc.semantic.extract(fc.col("text"), Schema))
    .cache()
)

processed.write.parquet("s3://results/output.parquet")
```
Typedef Cloud handles serverless execution, automatic scaling, and infrastructure management.
Measuring Pipeline Performance
Cost and Performance Metrics
Track token usage, costs, and execution time:
```python
result = pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.total_lm_metrics.num_input_tokens + metrics.total_lm_metrics.num_output_tokens}")
print(f"Total cost: ${metrics.total_lm_metrics.cost:.4f}")
print(f"Execution time: {metrics.execution_time_ms / 1000:.2f}s")

for op_metrics in metrics.operator_metrics:
    print(f"{op_metrics.operator_name}: {op_metrics.execution_time_ms}ms")
```
Use these metrics to identify optimization opportunities and track costs.
Quality Metrics
Monitor extraction accuracy:
```python
sample = pipeline.limit(100).collect()

validation_errors = sum(1 for row in sample.rows if not row.is_valid)
accuracy = (len(sample.rows) - validation_errors) / len(sample.rows)

print(f"Schema validation accuracy: {accuracy:.2%}")
```
Implement automated quality checks in CI/CD pipelines.
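For example, such a check could run as a pytest-style test in CI, reusing the sampling logic above (the 95% threshold is an arbitrary illustration):

```python
def test_extraction_quality():
    """Fail the build if schema validation accuracy drops below the threshold."""
    sample = pipeline.limit(100).collect()
    validation_errors = sum(1 for row in sample.rows if not row.is_valid)
    accuracy = (len(sample.rows) - validation_errors) / len(sample.rows)
    assert accuracy >= 0.95, f"Extraction accuracy degraded: {accuracy:.2%}"
```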
Advanced Patterns
Batch Preprocessing for Agents
Separate heavy lifting from real-time agents:
```python
enriched_data = (
    raw_documents
    .with_column("raw_md", fc.col("content").cast(fc.MarkdownType))
    .with_column("chunks", fc.markdown.extract_header_chunks("raw_md", header_level=2))
    .explode("chunks")
    .with_column("embedding", fc.semantic.embed(fc.col("chunks").content))
    .with_column(
        "metadata",
        fc.semantic.extract("chunks", DocumentMetadata, model_alias="cheap")
    )
)

enriched_data.write.parquet("s3://my-bucket/enriched/")
```
Agents query enriched data without expensive inference at request time. This provides predictable responsiveness, better resource utilization, and cleaner separation between planning and execution.
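For illustration, the agent's request path might read the precomputed table with Polars instead of calling a model (the S3 glob matches the write path above and assumes cloud credentials are configured; the keyword filter is a stand-in for whatever retrieval logic the agent uses):

```python
import polars as pl

# Load precomputed chunks, embeddings, and metadata; no LLM calls on the hot path
enriched = pl.scan_parquet("s3://my-bucket/enriched/*.parquet")

# Hand the agent ready-made context, e.g. chunks that mention refunds
context = (
    enriched
    .filter(pl.col("raw_md").str.contains("refund"))
    .head(20)
    .collect()
)
```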
Hybrid Fuzzy and Semantic Matching
The Fenic 0.4.0 release added built-in fuzzy string matching. Use fuzzy matching for initial candidate selection before expensive semantic joins:
```python
candidates = (
    left_df.join(right_df)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

final = candidates.semantic.join(
    predicate="Are these the same company? Left: {{left_name}}, Right: {{right_name}}",
    left_on=fc.col("company_description"),
    right_on=fc.col("business_description")
)
```
This hybrid approach reduces costs by orders of magnitude, because the expensive semantic join only evaluates the small candidate set that survives the cheap fuzzy filter.
Declarative Tool Creation
Version 0.4.0 introduced declarative tool creation for agent function calling:
```python
from fenic.core.mcp.types import ToolParam

session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search through company documents",
    tool_query=df,
    tool_params=[
        ToolParam(
            name="search_term",
            description="The term to search for",
            default_value="default"
        ),
        ToolParam(
            name="limit",
            description="Max results to return",
            default_value=10
        )
    ],
    result_limit=50
)
```
Tools are type-safe, discoverable, and automatically consumable by MCP servers.
Integration with Existing Infrastructure
Lakehouse-Native Architecture
Fenic reads and writes standard formats without data movement:
```python
df = session.read.parquet("s3://data-lake/raw/*.parquet")

processed = df.semantic.extract(...).filter(...)

processed.write.parquet("s3://data-lake/processed/")
```
Full compatibility with Parquet, Iceberg, Delta Lake, and Lance enables seamless integration with existing infrastructure.
Hybrid Pipelines
Mix Fenic with existing workflows:
```python
# Spark preprocessing
spark_df.write.parquet("s3://interim/")

# Fenic semantic enrichment
enriched = (
    session.read.parquet("s3://interim/")
    .semantic.extract(...)
    .semantic.classify(...)
)
enriched.write.parquet("s3://final/")

# Continue with downstream processing
```
Use the right tool for each step.
Why Inference-First Architecture Matters
Traditional data platforms treat LLM calls as external black-box UDFs that query optimizers cannot inspect. Fenic's inference-first approach embeds LLM operations directly into the query engine.
When the query optimizer sees semantic.extract() or semantic.join(), it knows this is inference with high latency, token costs, batching benefits, and caching opportunities. The optimizer can:
- Reorder operations to minimize data processed by expensive inference
- Batch requests across rows to amortize fixed costs
- Cache aggressively since deterministic operations produce consistent outputs
- Parallelize intelligently across providers
- Estimate costs accurately before execution
This is impossible when LLM calls are hidden in UDFs or microservices. Semantic operators make inference visible to the optimizer, enabling optimizations that improve performance and reduce costs.
The declarative API provides auditability and reproducibility. Every operation is explicitly defined with inputs, prompts, and model configurations tracked automatically. Row-level lineage traces data flow through transformations.
Combined with native AI data types, automatic batch optimization, multi-provider support, and production-grade error handling, Fenic represents infrastructure purpose-built for AI workloads.
Getting Started
Install Fenic:
```bash
pip install fenic
```
Set up your first pipeline:
```python
import fenic as fc
from pydantic import BaseModel

class InsightSchema(BaseModel):
    summary: str
    key_points: list[str]
    sentiment: str

config = fc.SessionConfig(
    app_name="first_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

df = session.read.csv("feedback.csv")

insights = (
    df
    .select("*", fc.semantic.extract(fc.col("feedback"), InsightSchema).alias("insights"))
    .with_column(
        "key_points_embedding",
        fc.semantic.embed(fc.col("insights.key_points").cast(fc.StringType))
    )
    .semantic.with_cluster_labels(
        by=fc.col("key_points_embedding"),
        num_clusters=5,
        label_column="cluster_label"
    )
    .group_by("cluster_label")
    .agg(fc.semantic.reduce("Summarize cluster themes", fc.col("feedback")))
)

insights.show()
```
Additional Resources
- Typedef platform overview - Learn about the complete AI data engine
- Fenic GitHub repository - Explore the open-source framework
- Fenic 0.5.0 release notes - Latest features including PDF parsing and date types
- Fenic 0.4.0 release notes - Declarative tools and MCP integration
- LangChain integration guide - Use Fenic with LangChain for agentic workflows
- Semantic operator patterns - Advanced patterns for data transformation
- RudderStack case study - How one company achieved 95% triage time reduction
- Log clustering guide - Build agents for log analysis
- Typedef with LangGraph - Orchestrate reliable agents
An inference-first architecture eliminates the brittle glue code that prevents AI systems from reaching production. By treating model inference as a native operation within the data engine, teams build deterministic workflows on top of non-deterministic models with production-grade reliability and scalability.

