AI agents fail in production when they handle both heavy data processing and real-time decision-making simultaneously. This coupling creates unpredictable latency, resource contention, and debugging problems. The solution: separate batch inference operations from real-time reasoning through architectural decoupling.
The Operational Problem
Most AI systems force a single runtime to manage multiple workloads:
- Document parsing and OCR operations
- Multi-stage LLM extraction pipelines
- Transcription and audio processing
- Real-time user query handling
- Decision-making and response generation
This architecture breaks under production load. When agents produce incorrect results, teams cannot determine whether the issue stems from reasoning logic or data preprocessing. Resource contention occurs when batch operations compete with real-time queries for API quota and compute capacity.
The impact is measurable. Typedef's work with RudderStack demonstrated that triage systems handling both preprocessing and decision-making create backlogs that slow product teams. Matic Insurance faced similar challenges processing thousands of documents before implementing batch separation.
Batch vs Real-Time Operations
Identify which operations belong in each processing layer.
Batch Inference Layer
These operations are expensive and time-intensive:
- Document parsing across large corpora using PDF, Markdown, and HTML processing
- Structured extraction from unstructured text with schema validation
- Content classification and semantic categorization
- Embedding generation for similarity operations
- Clustering and aggregation tasks
- Multi-stage enrichment workflows
Batch operations benefit from:
- Efficient API rate limit management
- Request grouping and optimization
- Result caching and reuse
- Retry logic without blocking users
- Cost reduction through model selection
Real-Time Reasoning Layer
These operations require immediate execution:
- User query responses
- Pre-processed data retrieval
- Routing and decision logic
- Business rule application
- Natural language generation
- Tool execution against prepared datasets
Real-time operations must minimize latency. The pattern: agents query clean, preprocessed data rather than raw documents.
Architectural Separation Pattern
The Fenic framework enables architectural decoupling through three distinct layers.
Batch Preprocessing Layer
Process heavy inference operations offline or on schedules. Fenic provides DataFrame abstractions with semantic operators that treat LLM calls as native operations.
Batch responsibilities:
- Load raw documents from storage
- Parse multiple formats (PDF, Markdown, JSON, HTML)
- Extract structured information using Pydantic schemas
- Classify content into defined taxonomies
- Generate embeddings for semantic operations
- Cluster related content
- Persist results to queryable tables
Tool Exposure Layer
Transform processed DataFrames into callable functions. Agents invoke typed operations rather than parsing text.
Tool layer functions:
- Read operations over processed tables
- Filtered views with parameters
- Aggregated statistics
- Indexed content search
- Schema and metadata access
Agent Runtime Layer
Agents orchestrate tool calls and synthesize results without accessing raw data.
Agent responsibilities:
- Query interpretation
- Tool selection based on requirements
- Structured result processing
- Decision-making with preprocessed context
- Response generation
Building Batch Pipelines with Fenic
Fenic's semantic operators function as DataFrame primitives, enabling declarative pipelines with automatic optimization.
Schema-Driven Extraction
Extract validated, type-safe structures from unstructured text:
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import List


class TicketInfo(BaseModel):
    category: str = Field(description="Support ticket category")
    priority: str = Field(description="Priority level")
    entities: List[str] = Field(description="Named entities")


config = fc.SessionConfig(
    app_name="batch_preprocessing",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)

preprocessed = (
    session.read.docs("/data/tickets/", content_type="markdown", recursive=True)
    .with_column(
        "extracted",
        fc.semantic.extract(
            fc.col("content"),
            TicketInfo,
            model_alias="accurate"
        )
    )
    .unnest("extracted")
    .persist()
)

preprocessed.write.save_as_table("support_tickets", mode="overwrite")
```
Schema-driven extraction provides validation at preprocessing time, type safety for agent consumption, and lineage tracking from outputs to source documents.
Classification Pipelines
Apply consistent taxonomies using semantic operations:
```python
classified = (
    preprocessed
    .with_column(
        "classification",
        fc.semantic.classify(
            fc.col("category"),
            ["Account Access", "Billing Issue", "Technical Problem", "Feature Request"],
            model_alias="fast"
        )
    )
    .persist()
)
```
Faster models reduce costs while maintaining classification accuracy. The batch context enables automatic request batching and optimization.
Semantic Join Operations
Match DataFrames by meaning instead of exact strings:
```python
candidates = session.table("candidate_profiles")
jobs = session.table("open_positions")

matches = candidates.semantic.join(
    jobs,
    predicate="""
    Evaluate candidate fit for this role.
    Candidate: {{left_on}}
    Requirements: {{right_on}}
    Consider skills and experience.
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description")
).persist()
```
This preprocessing transforms agent tasks from "evaluate all candidates" to "work with pre-matched candidates." Heavy computation happens in batch; agents handle final decisions.
Cost Optimization Pattern
Start with cheap models for classification, then apply expensive models to high-value subsets:
```python
result = (
    df
    # Cheap model screens every row for pricing mentions.
    .with_column(
        "mentions_pricing",
        fc.semantic.classify(
            fc.col("text"),
            ["yes", "no"],
            model_alias="fast"
        )
    )
    .filter(fc.col("mentions_pricing") == "yes")
    # Expensive model extracts structure only from the filtered subset.
    .with_column(
        "pricing_info",
        fc.semantic.extract(
            fc.col("text"),
            PricingInquiry,
            model_alias="accurate"
        )
    )
)
```
This pattern typically reduces costs by 60-80% because the expensive model runs only on the subset where accuracy matters.
Tool Exposure with MCP
Transform DataFrames into agent-callable tools using Fenic's catalog system.
Declarative Tool Creation
Define tools as DataFrame queries:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# tool_param placeholders are bound with concrete values when the agent calls the tool.
search_df = (
    session.table("support_tickets")
    .filter(fc.col("classification") == fc.tool_param("category", StringType))
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_tickets",
    tool_description="Search support tickets by category",
    tool_query=search_df,
    tool_params=[
        ToolParam(
            name="category",
            description="Ticket category filter",
            allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
        ),
        ToolParam(
            name="limit",
            description="Maximum results",
            default_value=10
        )
    ],
    result_limit=50
)
```
Tools defined declaratively become versionable metadata. Schema changes propagate automatically.
System Tool Generation
Auto-generate standard operations for any table:
```python
from fenic.api.mcp.tools import SystemToolConfig

system_tools = SystemToolConfig(
    table_names=["support_tickets"],
    tool_namespace="support",
    max_result_rows=100
)
```
This creates five tools:
- support_schema: Column names and types
- support_profile: Column statistics
- support_read: Filtered data pages
- support_search_summary: Regex text search
- support_analyze: SQL query execution
MCP Server Deployment
Expose tools for agent consumption:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "TicketServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1"
)
```
Production deployment with ASGI:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

server = create_mcp_server(session, "TicketServer", user_defined_tools=tools)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    path="/mcp"
)
```
Deploy with: `uvicorn server:app --workers 4 --port 8000`
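The `server:app` target assumes a module named `server.py` that builds the session and exports the ASGI app at import time, so each uvicorn worker constructs its own server instance. A minimal sketch of that layout, reusing only the calls shown above; the SessionConfig details should match whatever your batch pipeline actually uses:

```python
# server.py -- module layout assumed by `uvicorn server:app --workers 4 --port 8000`
import fenic as fc
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

# Reuse the same SessionConfig as the batch pipeline (see the extraction example
# above) so the server sees the same catalog tables and registered tools.
config = fc.SessionConfig(
    app_name="batch_preprocessing",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(model_name="gpt-4o", rpm=100, tpm=100_000)
        },
        default_language_model="fast"
    )
)
session = fc.Session.get_or_create(config)

tools = session.catalog.list_tools()
server = create_mcp_server(session, "TicketServer", user_defined_tools=tools)

# Exported at module level so every uvicorn worker can resolve `server:app`.
app = run_mcp_server_asgi(server, stateless_http=True, path="/mcp")
```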
Agent Runtime Implementation
With preprocessing complete and tools exposed, agents become orchestration layers querying structured data.
Separation Pattern
```python
# Batch preprocessing (offline)
preprocessed = (
    raw_documents
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("content"),
            DocumentMetadata
        )
    )
    .with_column(
        "doc_type",
        fc.semantic.classify(
            fc.col("content"),
            ["policy", "claim", "correspondence", "legal"]
        )
    )
    .semantic.with_cluster_labels(
        by=fc.col("content_embedding"),
        num_clusters=20
    )
    .persist()
)

preprocessed.write.save_as_table("preprocessed_docs")

# Agent runtime (real-time)
relevant_docs = (
    session.table("preprocessed_docs")
    .filter(
        (fc.col("doc_type") == "claim")
        & (fc.col("cluster_label") == target_cluster)
    )
)
```
Agents query preprocessed data instantly rather than parsing documents on-demand.
Tool Integration
Agents invoke MCP tools:
```python
import requests


def handle_query(user_query: str):
    category = determine_category(user_query)
    response = requests.post(
        "http://localhost:8000/mcp/tools/search_tickets",
        json={"category": category, "limit": 5}
    )
    tickets = response.json()
    return synthesize_response(tickets, user_query)
```
No parsing, extraction, or heavy inference. Only fast queries over prepared results.
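For completeness, a hedged sketch of the two helpers referenced above. Neither is part of Fenic or MCP: `determine_category` is shown as cheap keyword routing, and `synthesize_response` as a single generation call over the already-structured rows (the OpenAI client is used here purely as an example):

```python
from openai import OpenAI

client = OpenAI()


def determine_category(user_query: str) -> str:
    """Hypothetical helper: deterministic routing; swap in a classifier if needed."""
    q = user_query.lower()
    if "invoice" in q or "charge" in q:
        return "Billing Issue"
    if "password" in q or "login" in q:
        return "Account Access"
    return "Technical Problem"


def synthesize_response(tickets: list[dict], user_query: str) -> str:
    """Hypothetical helper: one fast generation call over prepared rows."""
    context = "\n".join(str(t) for t in tickets)
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Answer using only the provided tickets."},
            {"role": "user", "content": f"Tickets:\n{context}\n\nQuestion: {user_query}"},
        ],
    )
    return completion.choices[0].message.content
```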
Production Operations
Incremental Preprocessing
Process new documents as they arrive:
```python
def incremental_batch():
    new_docs = session.read.docs(
        "/data/new_tickets/",
        content_type="markdown",
        recursive=True
    )

    processed = (
        new_docs
        .with_column("extracted", fc.semantic.extract(fc.col("content"), TicketInfo))
        .with_column("classification", fc.semantic.classify(fc.col("category"), classes))
    )

    processed.write.save_as_table("support_tickets", mode="append")
```
This pattern keeps structured data current without blocking real-time operations.
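How often `incremental_batch` runs is a deployment choice. A minimal sketch of a standalone scheduler loop; in practice this is more likely a cron job or workflow-orchestrator task, and the interval below is an arbitrary example:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)

BATCH_INTERVAL_SECONDS = 15 * 60  # example interval; tune to document arrival rate


def run_forever():
    """Run the incremental batch on a fixed interval, isolated from the agent runtime."""
    while True:
        started = time.monotonic()
        try:
            incremental_batch()  # defined above
        except Exception:
            # Failures stay inside the batch layer; real-time agents keep
            # serving from the last successfully appended rows.
            logging.exception("incremental batch failed; retrying next cycle")
        elapsed = time.monotonic() - started
        time.sleep(max(0.0, BATCH_INTERVAL_SECONDS - elapsed))


if __name__ == "__main__":
    run_forever()
```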
Lineage Tracking
Trace outputs through transformations when debugging:
```python
lineage = df.lineage()

# Trace from problematic result
source_rows = lineage.backwards(["result_uuid1", "result_uuid2"])

# Trace from source
result_rows = lineage.forwards(["source_uuid1"])

result = df.collect()
```
Lineage reveals source documents, applied transformations, prompt templates, model outputs, and token costs per operation.
Performance Monitoring
Track metrics at operation level:
```python
result = df.collect()

print(f"Duration: {result.metrics.query_duration_ms}ms")
print(f"Tokens: {result.metrics.lm_metrics.total_tokens}")
print(f"Cost: ${result.metrics.lm_metrics.total_cost}")

for op in result.metrics.operator_metrics:
    print(f"Operator: {op.operator_name}, Duration: {op.duration_ms}ms")
```
Metrics transform development from trial-and-error to measurement-driven optimization.
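One way to make that loop concrete is to append each run's metrics to a log file and track cost and latency across batches. A minimal sketch using only the metric fields shown above; the file path and record shape are arbitrary choices:

```python
import json
import time
from pathlib import Path

METRICS_LOG = Path("batch_metrics.jsonl")  # arbitrary location


def record_metrics(result) -> None:
    """Append one JSON line per batch run for later trend analysis."""
    record = {
        "timestamp": time.time(),
        "duration_ms": result.metrics.query_duration_ms,
        "total_tokens": result.metrics.lm_metrics.total_tokens,
        "total_cost": result.metrics.lm_metrics.total_cost,
    }
    with METRICS_LOG.open("a") as f:
        f.write(json.dumps(record) + "\n")


# Usage after each batch run:
# result = df.collect()
# record_metrics(result)
```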
Caching Strategy
Cache expensive operations for faster iteration:
```python
cached_df = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), Schema)
).persist()

# Iterate without rerunning extraction
result1 = cached_df.filter(...).with_column(
    "classified",
    fc.semantic.classify(fc.col("text"), classes=[...])
)

result2 = cached_df.group_by(...).agg(
    fc.semantic.reduce("Summarize items", fc.col("text"))
)
```
This workflow reduces iteration cycles from hours to minutes.
Case Study: Log Clustering
The log clustering implementation demonstrates production separation:
Batch Processing
- Parse logs using templates for multiple formats
- Generate fingerprints for stable grouping keys
- Tag severity deterministically (ERROR, WARN, INFO); a sketch of the fingerprint and severity steps follows this list
- Cluster semantically within severity buckets
- Persist triage views, clusters, and assignments
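The fingerprint and severity steps are plain deterministic code, independent of any LLM. A minimal sketch in standard-library Python; the log format and regexes are illustrative assumptions, not the implementation used in the case study:

```python
import hashlib
import re

# Illustrative patterns -- real log formats vary.
SEVERITY_RE = re.compile(r"\b(ERROR|WARN|INFO)\b")
NUMBER_RE = re.compile(r"\b\d+\b")
HEX_RE = re.compile(r"\b0x[0-9a-fA-F]+\b")


def tag_severity(line: str) -> str:
    """Deterministic severity tag; defaults to INFO when nothing matches."""
    match = SEVERITY_RE.search(line)
    return match.group(1) if match else "INFO"


def fingerprint(line: str) -> str:
    """Stable grouping key: strip volatile tokens, then hash the remaining template."""
    template = HEX_RE.sub("<hex>", NUMBER_RE.sub("<num>", line))
    return hashlib.sha1(template.encode("utf-8")).hexdigest()[:12]


line = "2024-05-01 12:00:03 ERROR payment-api request 4821 failed with code 502"
print(tag_severity(line), fingerprint(line))
```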
Tool Layer
Expose read-only MCP tools:
- list_clusters(severity_floor): Severity-weighted ranking
- clusters_by_severity(severity): Single severity filter
- assignments_for_cluster(cluster_id): Raw log lines
- coverage_metrics(): Processing statistics
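These tools follow the same declarative pattern shown earlier. A hedged sketch of how list_clusters might be defined, assuming the batch job persisted an already severity-ranked view named cluster_triage with a numeric severity_rank column (table and column names are illustrative):

```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import IntegerType

# Assumed table/column names for illustration: cluster_triage(severity_rank, ...)
list_clusters_df = (
    session.table("cluster_triage")
    .filter(fc.col("severity_rank") >= fc.tool_param("severity_floor", IntegerType))
)

session.catalog.create_tool(
    tool_name="list_clusters",
    tool_description="Clusters at or above a severity floor, ranked by severity weight",
    tool_query=list_clusters_df,
    tool_params=[
        ToolParam(
            name="severity_floor",
            description="Minimum numeric severity rank to include"
        )
    ],
    result_limit=100
)
```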
Agent Layer
Natural language queries invoke tools:
- "Top clusters above WARN in the last hour"
- "Only ERRORs for payment-api"
- "Assignments for cluster 7"
Agents answer instantly by querying preprocessed views rather than parsing raw logs.
Case Study: RudderStack Triage
RudderStack achieved a 95% reduction in triage time through this separation:
Batch Context Layer
- Ingest warehouse data (tickets, transcripts, usage)
- Ingest documentation (product docs, PRDs, strategy)
- Build semantic context model
- Create semantic links between issues and PRDs
Tool Layer
Expose retrieval over this context via MCP tools for classification and research.
Agent Runtime
Subscribe to Linear events. For each feature request:
- Classify to product taxonomy
- Surface related issues and duplicates
- Pull prospect and support signals
- Propose prioritization with citations
- Write decision to Linear
The agent operates on preprocessed context. Results: 90%+ first-pass accuracy and cited evidence for decisions.
Benefits of Separation
Performance Predictability
Agents respond consistently without data processing overhead. Real-time query latency becomes deterministic.
Resource Efficiency
Batch operations run during off-peak hours or on dedicated infrastructure. Real-time agents use minimal resources. API rate limits become manageable.
Debugging Clarity
Clear boundaries between preprocessing and reasoning isolate issues quickly. Test pipelines independently. Lineage connects outputs to sources.
Cost Reduction
Batched LLM calls are more efficient than individual requests. Model selection applies expensive models only where needed. Caching eliminates redundant calls.
Independent Scaling
Preprocessing pipelines scale separately from agent runtime. Add preprocessing capacity without affecting real-time responsiveness. Horizontal scaling with stateless HTTP servers.
Implementation Steps
Follow this sequence when implementing decoupled architecture:
Classify Operations
- Separate batch from real-time operations
- Document data flow between layers
- Define interfaces clearly
Build Batch Pipelines
- Use semantic operators for processing
- Implement schema-driven extraction
- Add monitoring and lineage
- Configure rate limits
Expose Tools
- Create declarative tools from DataFrames
- Generate system tools
- Deploy MCP servers
- Document parameters
Implement Agents
- Query tools, not raw data
- Handle errors at tool layer
- Monitor agent performance separately
Optimize
- Cache expensive operations
- Use hierarchical processing for costs
- Adjust batch frequencies
- Refine model selection
Additional Resources
- How to Enhance AI Agent Preprocessing for Better Decision-Making
- How to Build Reliable AI Pipelines with Fenic's Semantic Operators
- Introducing Fenic: A PySpark-Inspired DataFrame Framework
- Fenic 0.5.0 Release: Date Types and OpenRouter
- Fenic 0.4.0 Release: Declarative Tools and MCP
Implementation Outcome
Decoupling batch inference from real-time AI reasoning transforms AI systems from prototypes into production applications. Separating heavy preprocessing from decision logic creates predictable performance, clearer debugging, and lower costs.
The pattern: preprocess in batch with semantic operators, expose through typed tools, and let agents handle orchestration. This architectural separation enables reliable AI systems that scale.
Start with one preprocessing pipeline, expose two tools, and connect an agent. Measure latency, costs, and debugging time. Expand coverage as patterns emerge.
The preprocessing layer determines whether AI agents deliver value or remain in pilot phase.

