The biggest obstacle preventing AI agents from scaling in production isn't model capabilities—it's the preprocessing maze. Most teams spend months wrestling with:
- OCR models
- Transcription services
- Document chunking
- Rate limits
- Brittle glue code
All just to feed clean data to their agents.
The impact: Independent reports show ~95% of enterprise GenAI pilots don't reach measurable production impact.
Typedef.ai addresses this through Fenic, an inference-first DataFrame framework that treats preprocessing as a first-class operation rather than an afterthought. By separating heavy batch inference from real-time agent decisions, Fenic enables more predictable, responsive, and cost-effective AI applications. This approach has already helped companies like Matic Insurance deploy semantic extraction pipelines across thousands of documents in days rather than months, dramatically reducing errors and costs.
The key insight: AI agent workflows are fundamentally pipelines—they take inputs, reason over context, generate outputs, and log results. By applying decades of proven data engineering patterns to AI workloads, teams can build deterministic workflows on top of non-deterministic models.
Preprocessing challenges that bottleneck agent performance
Traditional data infrastructure was built for rows and columns, not the messy, multi-modal inputs that modern AI agents require. The most valuable data lives in:
- PDFs
- Audio recordings
- Images
- Video
Each of these requires specialized extraction models that introduce new failure modes, latency spikes, and operational complexity.
But extraction is just the beginning. Converting raw unstructured text into something agents can actually use requires multi-stage LLM pipelines. Teams end up managing:
- Rate limits across providers
- Chunking documents to fit context windows
- Balancing expensive accurate models against cheaper unreliable ones
- Constantly shuttling data between custom scripts, warehouses, and inference infrastructure
This impedance mismatch creates overhead, duplication, and chaos.
The fundamental problem: Agents traditionally handle both real-time decision-making AND heavy data preprocessing. This coupling creates:
- Unpredictable latency
- Resource contention
- Nearly impossible debugging
When an agent produces incorrect results, was it bad reasoning logic or corrupted preprocessing? Without proper lineage tracking, you're left guessing.
Fenic solves this by providing a preprocessing layer that extracts structure, enriches context, and prepares clean data in batch—then hands off to agents for real-time decisions. This architectural separation delivers:
- More predictable agents
- Better resource utilization
- Cleaner separation between planning and execution
Treating semantic operations as native DataFrame primitives
The breakthrough in Fenic's approach comes from treating semantic operations—classification, extraction, joins—as first-class DataFrame operations, not external API calls. Just as you'd use filter, map, and aggregate in traditional data pipelines, Fenic provides semantic operators that leverage LLMs while maintaining the structure and composability of DataFrames.
This inference-first architecture means the query engine is fully aware when LLM calls happen. Unlike traditional User-Defined Functions (UDFs) that appear as black boxes, Fenic's semantic operators enable intelligent optimizations:
- Automatic request batching
- Caching of repeated operations
- Choosing appropriate model sizes based on context
- Reordering operations to minimize expensive API calls
- Providing accurate cost estimates before execution
Core semantic operators include:
- semantic.extract – Transforms unstructured text into structured data using Pydantic schemas, enabling type-safe extraction with validation
- semantic.classify – Categorizes text with few-shot examples, handling complex taxonomies that evolve over time
- semantic.map – Applies natural language transformations to data, like summarization or paraphrasing
- semantic.join – Matches DataFrames based on meaning rather than exact values—think matching research papers to queries semantically rather than keyword matching
- semantic.predicate – Creates boolean filters using natural language, such as "Does this feedback mention UI/UX issues?"
- semantic.with_cluster_labels – Groups data by semantic similarity using embeddings
- semantic.reduce – Aggregates grouped data with LLM operations, useful for synthesizing themes across customer feedback
These operators compose naturally with traditional DataFrame operations. You can chain df.filter(…).semantic.extract(…).semantic.classify(…) into declarative pipelines that the query engine optimizes end-to-end. This composability matters because preprocessing isn't a single operation—it's a multi-stage pipeline where each stage builds on the previous one.
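As a rough sketch of what such a chained pipeline looks like (the DataFrame, column names, TicketFields schema, and class labels here are illustrative assumptions, not taken from the examples below):

```python
import fenic as fc
from pydantic import BaseModel

# Illustrative schema; any Pydantic model can serve as a response_format
class TicketFields(BaseModel):
    product: str
    summary: str

# Traditional and semantic operators compose into one declarative plan
pipeline = (
    df
    .filter(fc.col("source") == "email")  # traditional relational filter
    .with_column("fields", fc.semantic.extract(fc.col("body"), response_format=TicketFields))
    .with_column("category", fc.semantic.classify(fc.col("body"), classes=["bug", "billing", "question"]))
)

# Nothing executes until collect(); the engine sees the full plan first,
# so it can batch, cache, and reorder the LLM calls before running them.
results = pipeline.collect()
```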
Building structured extraction pipelines with schema validation
The most powerful preprocessing technique for agent decision-making is structured extraction: converting unstructured text into validated, type-safe data structures. This eliminates the “prompt engineering hell” where agents consume raw text and hallucinate fields, formats, or entire entities.
Fenic integrates Pydantic schemas directly into the extraction process, ensuring every piece of extracted data conforms to expected types and constraints. Here's how to extract action items from a meeting transcript:
```python
import fenic as fc
from pydantic import BaseModel, Field

# Define schema for extraction
class ActionItem(BaseModel):
    task: str = Field(description="The action item")
    owner: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date")

# Initialize session
session = fc.Session.get_or_create(fc.SessionConfig(app_name="meeting_action_items"))

# Read transcript file as CSV (treating SRT as text)
# Note: For production use, you'd want to properly parse SRT format
df = session.read.csv("meeting.srt")

# Parse the transcript and extract structured action items
action_items = df.with_column(
    "parsed_transcript",
    fc.text.parse_transcript(fc.col("content"), format="srt")
).with_column(
    "action_items",
    fc.semantic.extract(
        column=fc.col("content"),  # or parsed content
        response_format=ActionItem,
        model_alias="gpt-4"
    )
)

# Collect results
results = action_items.collect()
print(results)
```
This pattern extends to more complex nested schemas. For customer support tickets, you might define issue types with severity levels, affected components, and resolution steps, all validated against your schema. The agent receives clean, structured data rather than parsing free-form text.
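As an illustrative sketch of that kind of nested schema (the field names and category values are assumptions, not taken from a real ticket system), the model plugs into semantic.extract exactly like the flat ActionItem schema above:

```python
from typing import List, Literal
from pydantic import BaseModel, Field

# Illustrative nested schema for support-ticket extraction
class ResolutionStep(BaseModel):
    description: str = Field(description="A single step taken or recommended")

class SupportTicket(BaseModel):
    issue_type: Literal["bug", "billing", "how_to", "outage"] = Field(description="Kind of issue reported")
    severity: Literal["low", "medium", "high", "critical"] = Field(description="Impact level")
    affected_components: List[str] = Field(description="Product areas mentioned in the ticket")
    resolution_steps: List[ResolutionStep] = Field(description="Steps proposed or taken to resolve the issue")

# Used the same way as the flat schema, e.g.:
# tickets.with_column("parsed", fc.semantic.extract(fc.col("body"), response_format=SupportTicket))
```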
Schema-driven extraction provides three critical benefits for agent decision-making: validation happens at preprocessing time, not during agent runtime when failures are costly; type safety means agents can rely on data structures without defensive checks; and lineage tracking connects extracted entities back to source documents, enabling debugging when agents make incorrect decisions.
Production deployments at companies like Matic Insurance show the impact: semantic extraction pipelines across thousands of policies and transcripts, built in days rather than months, dramatically reducing human analysis errors and lowering Errors & Omissions risk.
Native support for multimodal preprocessing workflows
Agents consume diverse data types—transcripts, documents, JSON APIs, embeddings—but traditional data tools treat these as generic text blobs. Fenic provides specialized types for text-heavy workloads that preserve semantic boundaries and enable structure-aware processing.
MarkdownType treats Markdown as a first-class data type with native parsing and extraction capabilities. Rather than chunking documents at arbitrary character counts, Fenic respects document structure—sections, paragraphs, headings—enabling intelligent chunking that maintains context. This matters for RAG systems where naive splitting loses critical information.
TranscriptType handles SRT, WebVTT, and generic formats with speaker and timestamp awareness. You can filter by speaker, aggregate speaking time, or extract insights from specific conversation segments. Example: “Summarize action items mentioned by the project manager in the last 10 minutes.”
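A rough sketch of that kind of speaker-aware filtering, assuming the transcript has already been parsed into one row per segment with speaker and text columns (the upstream unnesting step and column names are assumptions, and the semantic.map call mirrors the template style of semantic.predicate shown later):

```python
# Assumes `segments` holds one row per transcript segment with
# `speaker` and `text` columns (produced upstream, e.g. via
# fc.text.parse_transcript plus an unnesting step not shown here).
pm_action_items = (
    segments
    .filter(fc.col("speaker") == "Project Manager")
    .with_column(
        "action_items",
        fc.semantic.map(
            "List any action items mentioned in this segment: {{text}}",
            text=fc.col("text")
        )
    )
)
```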
JsonType supports JQ expressions for nested data manipulation, letting you extract specific fields from complex API responses without writing brittle parsing code. Combined with semantic operators, you can classify JSON payloads by intent, extract entities, or validate against schemas.
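A hedged sketch of that combination (the jq helper name fc.json.jq, the column names, and the payload shape are all assumptions for illustration):

```python
# Pull a nested field out of each JSON payload with a JQ expression,
# then classify the extracted text by intent. fc.json.jq and the
# column/field names here are assumed for illustration.
intents = (
    api_events
    .with_column("message", fc.json.jq(fc.col("payload"), ".request.body.message"))
    .with_column(
        "intent",
        fc.semantic.classify(fc.col("message"), classes=["support", "sales", "feedback"])
    )
)
```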
The framework includes automatic text chunking with configurable overlap for long documents, respecting structure rather than blindly splitting at token boundaries. For documents exceeding context windows, Fenic chunks intelligently while maintaining semantic coherence between chunks.
Token management and context handling are treated as first-class concerns. The engine tracks token limits, manages context windows, handles chunking automatically, and implements retries and rate limits transparently. Agents never need to worry about these operational details: they receive appropriately sized, preprocessed contexts ready for decision-making.
Semantic joins for context-aware decision-making
Traditional joins match on exact values: JOIN customers ON customer_id. But agent decision-making often requires fuzzy matching based on meaning. Should this candidate be interviewed for this role? Does this research paper answer my question? Is this customer inquiry similar to previous resolved issues?
Semantic joins in Fenic enable matching based on meaning rather than exact string comparison:
```python
import fenic as fc

# Initialize a session
session = fc.Session.get_or_create(fc.SessionConfig(app_name="semantic_join_demo"))

# Example candidate data
candidates = session.create_dataframe([
    {"id": 1, "resume": "Software engineer with 5 years of Python and ML experience"},
    {"id": 2, "resume": "Frontend developer skilled in React and TypeScript"},
    {"id": 3, "resume": "Data scientist with background in NLP and deep learning"}
])

# Example job data
jobs = session.create_dataframe([
    {"id": "A", "job_description": "Looking for a machine learning engineer with NLP expertise"},
    {"id": "B", "job_description": "Hiring frontend engineer skilled in React and UI design"}
])

# Perform semantic join
matches = candidates.semantic.join(
    jobs,
    predicate="""
    Evaluate if this candidate is a good fit for the role.
    Candidate: {{left_on}}
    Job Requirements: {{right_on}}
    Consider skills, experience level, and domain expertise.
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description")
)

# Collect results
results = matches.collect()
print(results)
```
This preprocessing step transforms agent decision-making from “search through all candidates and evaluate each” to “work with pre-matched, ranked candidates.” The heavy lifting happens in batch; the agent focuses on final decisions and edge cases.
Semantic predicates extend this concept to filtering. Rather than keyword matching, create natural language filters:
```python
high_priority = df.filter(
    fc.semantic.predicate(
        "Is this feedback about critical bugs or security issues? Feedback: {{customer_feedback}}",
        customer_feedback=fc.col("customer_feedback")
    )
)
```
These predicates preprocess your data to surface the most relevant subset for agent evaluation. Instead of an agent parsing thousands of feedback items, it works with the 50 that actually matter—dramatically reducing latency and costs.
Configuring multi-provider inference for production reliability
Production agent systems require reliability across multiple LLM providers with different rate limits, pricing models, and capabilities. Fenic’s session configuration handles multi-provider orchestration with built-in throttling and cost optimization:
```python
import fenic as fc

# Define session configuration with multiple model providers
config = fc.SessionConfig(
    app_name="agent_preprocessing",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            ),
            "cheap": fc.AnthropicLanguageModel(
                model_name="claude-haiku-4-0",
                rpm=300,
                input_tpm=150_000,
                output_tpm=75_000
            ),
        },
        default_language_model="fast",
    ),
)

# Create or get session
session = fc.Session.get_or_create(config)
print("Session initialized:", session)
```
This configuration defines model profiles for different use cases: use “fast” for bulk classification tasks, “accurate” for critical extraction where errors are costly, and “cheap” for simple transformations. Fenic handles rate limiting, token counting, and automatic throttling across all providers.
Model profiles enable even finer control. Configure the same model with different parameters for specific workloads:
python"claude": fc.AnthropicLanguageModel( model_name="claude-opus-4-0", rpm=100, input_tpm=100_000, output_tpm=100_000, profiles={ "fast": fc.AnthropicLanguageModel.Profile(thinking_token_budget=1024), "thorough": fc.AnthropicLanguageModel.Profile(thinking_token_budget=4096) }, default_profile="fast" )
During preprocessing, Fenic automatically batches requests to maximize throughput while respecting rate limits. Async I/O handles concurrent requests, exponential backoff manages transient failures, and comprehensive error handling ensures pipelines don’t crash on individual model timeouts.
Production-ready features include cost tracking at the operation level, persistent caching to avoid recomputing expensive operations, query metrics showing token usage and latency per operation, and row-level lineage connecting every output back to source documents and prompts.
Decoupling batch preprocessing from real-time agent execution
The most impactful architectural pattern for agent performance is separating batch preprocessing from real-time decision-making. Traditional agents handle everything in the critical path: loading documents, parsing formats, extracting entities, classifying content, AND making decisions. This creates unpredictable latency and resource contention.
Fenic enables a preprocessing-first architecture where heavy batch operations run offline:
```python
import fenic as fc

# Preprocessing stage: run batch, offline
preprocessed = (
    raw_documents
    .with_column(
        "metadata",
        fc.semantic.extract(
            column=fc.col("content"),
            response_format=DocumentMetadata
        )
    )
    .with_column(
        "document_type",
        fc.semantic.classify(
            fc.col("content"),
            classes=["policy", "claim", "correspondence", "legal"]
        )
    )
    .semantic.with_cluster_labels(
        by=fc.col("content_embedding"),
        num_clusters=20
    )
    .persist()
)

# Write preprocessed data for agent runtime
preprocessed.write.save_as_table("preprocessed_documents")

# -- Agent runtime: fast, predictable --
relevant_docs = (
    session.table("preprocessed_documents")
    .filter(
        (fc.col("document_type") == "claim")
        & (fc.col("cluster_label") == target_cluster)
    )
)
print("Relevant docs ready for agent runtime:", relevant_docs)
```
This separation delivers multiple benefits. Agents become more predictable because they’re no longer bogged down by heavy data processing. Resource utilization improves through batched LLM calls that are far more efficient than one-off requests. The architecture becomes cleaner with explicit boundaries between planning, preprocessing, and execution. Debugging becomes tractable because you can test preprocessing independently from agent runtime logic.
The pattern extends to continuous preprocessing pipelines. As new documents arrive, background jobs preprocess them incrementally, keeping the structured data fresh for agent consumption without blocking real-time operations.
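A minimal sketch of such an incremental job (the ingested_at column, the watermark handling, and the append write mode are assumptions; in practice the last-run timestamp would come from persisted job state and the job would run on a scheduler):

```python
import fenic as fc

# Watermark from the previous run; loaded from job state in practice
last_run = "2025-01-01T00:00:00Z"

# Preprocess only documents that arrived since the last run
new_docs = raw_documents.filter(fc.col("ingested_at") > fc.lit(last_run))

incremental = (
    new_docs
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("content"), response_format=DocumentMetadata)
    )
    .with_column(
        "document_type",
        fc.semantic.classify(
            fc.col("content"),
            classes=["policy", "claim", "correspondence", "legal"]
        )
    )
)

# Append the fresh rows to the table agents read from (write mode name assumed)
incremental.write.save_as_table("preprocessed_documents", mode="append")
```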
Implementing row-level lineage for debugging agent decisions
When agents make incorrect decisions, the critical question is: why? Was the reasoning logic flawed, or was the input data corrupted during preprocessing? Without lineage tracking, debugging becomes guesswork.
Fenic provides row-level lineage that traces every output back through transformations, prompts, and source documents:
```python
# Access lineage information from DataFrame, not QueryResult
lineage = df.lineage()

# Trace backwards from problematic result rows
source_rows = lineage.backward(["result_uuid1", "result_uuid2"])

# Trace forwards from source rows
result_rows = lineage.forward(["source_uuid1"])

# Execute the query separately
result = df.collect()
```
This granular tracking enables precise debugging. When an agent classifies a customer inquiry incorrectly, lineage shows: the source document that provided context, every transformation applied during preprocessing, which prompt template was used, what model generated the classification, and token costs and timing for each operation.
Query metrics provide operation-level visibility:
```python
result = df.collect()

print(f"Query duration: {result.metrics.query_duration_ms}ms")
print(f"Total tokens: {result.metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${result.metrics.lm_metrics.total_cost}")

for op_metric in result.metrics.operator_metrics:
    print(f"Operator: {op_metric.operator_name}")
    print(f"Duration: {op_metric.duration_ms}ms")
```
This observability transforms agent development from “tweak prompts and hope” to “measure, analyze, optimize.” You can identify which preprocessing steps are bottlenecks, which models provide the best accuracy-to-cost ratio, and where to focus optimization efforts.
Explicit caching complements lineage for faster iteration:
```python
# Cache expensive preprocessing
cached_df = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), response_format=YourSchema)
).persist()

# Iterate on downstream logic without rerunning extraction
result1 = cached_df.filter(...).with_column(
    "classified",
    fc.semantic.classify(fc.col("text"), classes=[...])
)

result2 = cached_df.group_by(...).agg(
    fc.semantic.reduce("Summarize these items", fc.col("text"))
)
```
This development workflow—preprocess once, experiment many times—accelerates the iteration cycle from hours to minutes.
Practical patterns for agent preprocessing workflows
Real-world agent systems benefit from specific preprocessing patterns that improve decision quality while reducing costs and latency.
Pattern 1: Hierarchical extraction starts with fast, cheap models for initial classification, then applies expensive accurate models only to high-value subsets. Example: use a fast model to identify customer inquiries that mention pricing, then apply an accurate extraction model only to those rows:
```python
result = (
    df
    .with_column("mentions_pricing", fc.semantic.classify(
        fc.col("text"),
        classes=["yes", "no"],
        model_alias="fast"
    ))
    .filter(fc.col("mentions_pricing") == "yes")
    .with_column("pricing_info", fc.semantic.extract(
        fc.col("text"),
        response_format=PricingInquiry,
        model_alias="accurate"
    ))
)
```
Pattern 2: Semantic clustering for similar items groups related entities before agent processing. For customer feedback analysis, cluster similar comments then summarize each cluster:
```python
clustered = (
    feedback
    .semantic.with_cluster_labels(
        by=fc.col("text_embedding"),
        num_clusters=15
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize the common themes in this feedback: {{text}}",
            fc.col("text"),
            group_context={"text": fc.col("text")}
        )
    )
)
```
Agents work with 15 summaries instead of thousands of raw comments—dramatically faster and cheaper.
Pattern 3: Dynamic context assembly uses semantic joins to gather relevant context. Rather than passing entire document collections to agents, preprocess by matching documents to queries:
```python
relevant_context = documents.semantic.join(
    user_queries,
    predicate=(
        "Is this document relevant to answering the query? "
        "Document: {{left_on}} Query: {{right_on}}"
    ),
    left_on=fc.col("document_content"),
    right_on=fc.col("query_text")
)
```
Pattern 4: Progressive enrichment builds structured metadata incrementally. Start with basic extraction, then add classifications, then compute embeddings, then cluster:
```python
enriched = (
    raw_data
    .with_column("metadata", fc.semantic.extract(fc.col("text"), BaseMetadata))
    .with_column("priority", fc.semantic.classify(
        fc.col("text"),
        classes=["urgent", "normal", "low"]
    ))
    .with_column("embedding", fc.semantic.embed(fc.col("text")))
    .semantic.with_cluster_labels(by=fc.col("embedding"), num_clusters=10)
)
```
Each stage adds information that subsequent stages or agents can leverage.
Conclusion: from preprocessing chaos to structured pipelines
The shift from brittle preprocessing scripts to structured, optimizable pipelines fundamentally changes what’s possible with AI agents. By treating inference as a first-class operation, providing semantic operators as DataFrame primitives, and separating batch preprocessing from real-time decisions, teams can build production-grade agent systems that scale reliably.
The Fenic framework provides the foundation: inference-first query engine, native unstructured data types, multi-provider orchestration, row-level lineage, and cost optimization. These capabilities transform preprocessing from operational overhead to strategic advantage. Clean, structured, validated data enables agents to focus on what they do best—reasoning and decision-making—rather than wrestling with data quality issues.
Production deployments demonstrate the impact: pipelines built in days rather than months, dramatic error reduction through schema validation, lower costs via batch optimization, and faster iteration through caching and lineage. As one team put it: “Typedef lets us build and deploy semantic extraction pipelines across thousands of documents in days not months.”
For teams serious about moving AI agents from prototypes to production, the path forward is clear: preprocess deliberately with structured pipelines, decouple batch inference from real-time decisions, validate data with schemas before agent consumption, track lineage for debugging and optimization, and optimize costs through intelligent model selection and batching.
The preprocessing layer isn’t just infrastructure—it’s the foundation that determines whether agents deliver consistent value or remain trapped in pilot paralysis.

