How to Decouple Batch Inference from Real-Time AI Reasoning

Typedef Team

AI agents fail in production when they handle both heavy data processing and real-time decision-making simultaneously. This coupling creates unpredictable latency, resource contention, and debugging problems. The solution: separate batch inference operations from real-time reasoning through architectural decoupling.

The Operational Problem

Most AI systems force a single runtime to manage multiple workloads:

  • Document parsing and OCR operations
  • Multi-stage LLM extraction pipelines
  • Transcription and audio processing
  • Real-time user query handling
  • Decision-making and response generation

This architecture breaks under production load. When agents produce incorrect results, teams cannot determine whether the issue stems from reasoning logic or data preprocessing. Resource contention occurs when batch operations compete with real-time queries for API quota and compute capacity.

The impact is measurable. Typedef's work with RudderStack demonstrated that triage systems handling both preprocessing and decision-making create backlogs that slow product teams. Matic Insurance faced similar challenges processing thousands of documents before implementing batch separation.

Batch vs Real-Time Operations

Identify which operations belong in each processing layer.

Batch Inference Layer

These operations are expensive and time-intensive:

  • Document parsing across large corpora using PDF, Markdown, and HTML processing
  • Structured extraction from unstructured text with schema validation
  • Content classification and semantic categorization
  • Embedding generation for similarity operations
  • Clustering and aggregation tasks
  • Multi-stage enrichment workflows

Batch operations benefit from:

  • Efficient API rate limit management
  • Request grouping and optimization
  • Result caching and reuse
  • Retry logic without blocking users
  • Cost reduction through model selection

Real-Time Reasoning Layer

These operations require immediate execution:

  • User query responses
  • Pre-processed data retrieval
  • Routing and decision logic
  • Business rule application
  • Natural language generation
  • Tool execution against prepared datasets

Real-time operations must minimize latency. The pattern: agents query clean, preprocessed data rather than raw documents.

Architectural Separation Pattern

The Fenic framework enables architectural decoupling through three distinct layers.

Batch Preprocessing Layer

Process heavy inference operations offline or on schedules. Fenic provides DataFrame abstractions with semantic operators that treat LLM calls as native operations.

Batch responsibilities:

  • Load raw documents from storage
  • Parse multiple formats (PDF, Markdown, JSON, HTML)
  • Extract structured information using Pydantic schemas
  • Classify content into defined taxonomies
  • Generate embeddings for semantic operations
  • Cluster related content
  • Persist results to queryable tables

Tool Exposure Layer

Transform processed DataFrames into callable functions. Agents invoke typed operations rather than parsing text.

Tool layer functions:

  • Read operations over processed tables
  • Filtered views with parameters
  • Aggregated statistics
  • Indexed content search
  • Schema and metadata access

Agent Runtime Layer

Agents orchestrate tool calls and synthesize results without accessing raw data.

Agent responsibilities:

  • Query interpretation
  • Tool selection based on requirements
  • Structured result processing
  • Decision-making with preprocessed context
  • Response generation

Building Batch Pipelines with Fenic

Fenic's semantic operators function as DataFrame primitives, enabling declarative pipelines with automatic optimization.

Schema-Driven Extraction

Extract validated, type-safe structures from unstructured text:

python
import fenic as fc
from pydantic import BaseModel, Field
from typing import List

class TicketInfo(BaseModel):
    category: str = Field(description="Support ticket category")
    priority: str = Field(description="Priority level")
    entities: List[str] = Field(description="Named entities")

config = fc.SessionConfig(
    app_name="batch_preprocessing",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)

preprocessed = (
    session.read.docs("/data/tickets/", content_type="markdown", recursive=True)
    .with_column(
        "extracted",
        fc.semantic.extract(
            fc.col("content"),
            TicketInfo,
            model_alias="accurate"
        )
    )
    .unnest("extracted")
    .persist()
)

preprocessed.write.save_as_table("support_tickets", mode="overwrite")

Schema-driven extraction provides validation at preprocessing time, type safety for agent consumption, and lineage tracking from outputs to source documents.

Classification Pipelines

Apply consistent taxonomies using semantic operations:

python
classified = (
    preprocessed
    .with_column(
        "classification",
        fc.semantic.classify(
            fc.col("category"),
            ["Account Access", "Billing Issue", "Technical Problem", "Feature Request"],
            model_alias="fast"
        )
    )
    .persist()
)

Faster models reduce costs while maintaining classification accuracy. The batch context enables automatic request batching and optimization.

Semantic Join Operations

Match DataFrames by meaning instead of exact strings:

python
candidates = session.table("candidate_profiles")
jobs = session.table("open_positions")

matches = candidates.semantic.join(
    jobs,
    predicate="""
    Evaluate candidate fit for this role.
    Candidate: {{left_on}}
    Requirements: {{right_on}}
    Consider skills and experience.
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description")
).persist()

This preprocessing transforms agent tasks from "evaluate all candidates" to "work with pre-matched candidates." Heavy computation happens in batch; agents handle final decisions.

Cost Optimization Pattern

Start with cheap models for classification, then apply expensive models to high-value subsets:

python
result = (
    df
    .with_column(
        "mentions_pricing",
        fc.semantic.classify(
            fc.col("text"),
            ["yes", "no"],
            model_alias="fast"
        )
    )
    .filter(fc.col("mentions_pricing") == "yes")
    .with_column(
        "pricing_info",
        fc.semantic.extract(
            fc.col("text"),
            PricingInquiry,
            model_alias="accurate"
        )
    )
)

This pattern reduces costs by 60-80% because the expensive model runs only on the rows where accuracy matters.

Tool Exposure with MCP

Transform DataFrames into agent-callable tools using Fenic's catalog system.

Declarative Tool Creation

Define tools as DataFrame queries:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

search_df = (
    session.table("support_tickets")
    .filter(fc.col("classification") == fc.tool_param("category", StringType))
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_tickets",
    tool_description="Search support tickets by category",
    tool_query=search_df,
    tool_params=[
        ToolParam(
            name="category",
            description="Ticket category filter",
            allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
        ),
        ToolParam(
            name="limit",
            description="Maximum results",
            default_value=10
        )
    ],
    result_limit=50
)

Tools defined declaratively become versionable metadata. Schema changes propagate automatically.

System Tool Generation

Auto-generate standard operations for any table:

python
from fenic.api.mcp.tools import SystemToolConfig

system_tools = SystemToolConfig(
    table_names=["support_tickets"],
    tool_namespace="support",
    max_result_rows=100
)

This creates five tools; a sketch of attaching them to a server follows the list:

  • support_schema: Column names and types
  • support_profile: Column statistics
  • support_read: Filtered data pages
  • support_search_summary: Regex text search
  • support_analyze: SQL query execution
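
These tools are materialized when the configuration is attached to an MCP server at creation time. A minimal sketch, assuming the create_mcp_server call shown in the next section also accepts a system_tools argument (verify the parameter against your Fenic version):

python
from fenic.api.mcp import create_mcp_server

# Sketch only: passing SystemToolConfig at server creation is an assumption
# here; confirm the system_tools parameter against your Fenic API version.
support_server = create_mcp_server(
    session,
    "SupportServer",
    system_tools=system_tools
)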

MCP Server Deployment

Expose tools for agent consumption:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "TicketServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1"
)

Production deployment with ASGI:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

server = create_mcp_server(session, "TicketServer", user_defined_tools=tools)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    path="/mcp"
)

Deploy with: uvicorn server:app --workers 4 --port 8000

Agent Runtime Implementation

With preprocessing complete and tools exposed, agents become orchestration layers querying structured data.

Separation Pattern

python
# Batch preprocessing (offline)
preprocessed = (
    raw_documents
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("content"),
            DocumentMetadata
        )
    )
    .with_column(
        "doc_type",
        fc.semantic.classify(
            fc.col("content"),
            ["policy", "claim", "correspondence", "legal"]
        )
    )
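    # assumes a "content_embedding" column was generated upstream (e.g., via fc.semantic.embed)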
    .semantic.with_cluster_labels(
        by=fc.col("content_embedding"),
        num_clusters=20
    )
    .persist()
)

preprocessed.write.save_as_table("preprocessed_docs")

# Agent runtime (real-time)
relevant_docs = (
    session.table("preprocessed_docs")
    .filter(
        (fc.col("doc_type") == "claim") &
        (fc.col("cluster_label") == target_cluster)
    )
)

Agents query preprocessed data instantly rather than parsing documents on-demand.

Tool Integration

Agents invoke MCP tools:

python
import requests

def handle_query(user_query: str):
    category = determine_category(user_query)

    response = requests.post(
        "http://localhost:8000/mcp/tools/search_tickets",
        json={"category": category, "limit": 5}
    )

    tickets = response.json()
    return synthesize_response(tickets, user_query)

No parsing, extraction, or heavy inference. Only fast queries over prepared results.

Production Operations

Incremental Preprocessing

Process new documents as they arrive:

python
def incremental_batch():
    new_docs = session.read.docs(
        "/data/new_tickets/",
        content_type="markdown",
        recursive=True
    )

    processed = (
        new_docs
        .with_column("extracted", fc.semantic.extract(fc.col("content"), TicketInfo))
        .unnest("extracted")
        .with_column("classification", fc.semantic.classify(fc.col("category"), classes))
    )

    processed.write.save_as_table("support_tickets", mode="append")

This pattern keeps structured data current without blocking real-time operations.
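
How the batch is triggered is a scheduling choice; cron, a workflow orchestrator, or a simple polling loop all work. A minimal sketch of a polling loop, assuming the incremental_batch function above and an arbitrary 15-minute interval:

python
import time

# Hypothetical scheduler: rerun the incremental batch every 15 minutes.
# In production, a cron job or workflow orchestrator would typically own this.
if __name__ == "__main__":
    while True:
        incremental_batch()
        time.sleep(15 * 60)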

Lineage Tracking

Trace outputs through transformations when debugging:

python
lineage = df.lineage()

# Trace from problematic result
source_rows = lineage.backwards(["result_uuid1", "result_uuid2"])

# Trace from source
result_rows = lineage.forwards(["source_uuid1"])

Lineage reveals source documents, applied transformations, prompt templates, model outputs, and token costs per operation.

Performance Monitoring

Track metrics at operation level:

python
result = df.collect()

print(f"Duration: {result.metrics.query_duration_ms}ms")
print(f"Tokens: {result.metrics.lm_metrics.total_tokens}")
print(f"Cost: ${result.metrics.lm_metrics.total_cost}")

for op in result.metrics.operator_metrics:
    print(f"Operator: {op.operator_name}, Duration: {op.duration_ms}ms")

Metrics transform development from trial-and-error to measurement-driven optimization.

Caching Strategy

Cache expensive operations for faster iteration:

python
cached_df = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), Schema)
).persist()

# Iterate without rerunning extraction
result1 = cached_df.filter(...).with_column(
    "classified",
    fc.semantic.classify(fc.col("text"), classes=[...])
)

result2 = cached_df.group_by(...).agg(
    fc.semantic.reduce("Summarize items", fc.col("text"))
)

This workflow reduces iteration cycles from hours to minutes.

Case Study: Log Clustering

The log clustering implementation demonstrates production separation:

Batch Processing

  1. Parse logs using templates for multiple formats
  2. Generate fingerprints for stable grouping keys
  3. Tag severity deterministically (ERROR, WARN, INFO)
  4. Cluster semantically within severity buckets (see the sketch after this list)
  5. Persist triage views, clusters, and assignments
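
A minimal sketch of steps 4 and 5 under stated assumptions: the tagged_logs DataFrame and its severity and message_embedding columns are hypothetical outputs of steps 1-3, and the operators are the same ones shown earlier:

python
# Sketch only: "tagged_logs", "severity", and "message_embedding" are
# assumed outputs of the parsing, fingerprinting, and tagging steps.
error_clusters = (
    tagged_logs
    .filter(fc.col("severity") == "ERROR")
    .semantic.with_cluster_labels(
        by=fc.col("message_embedding"),
        num_clusters=10
    )
    .persist()
)

error_clusters.write.save_as_table("error_log_clusters", mode="overwrite")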

Tool Layer

Expose read-only MCP tools (one is sketched after the list):

  • list_clusters(severity_floor): Severity-weighted ranking
  • clusters_by_severity(severity): Single severity filter
  • assignments_for_cluster(cluster_id): Raw log lines
  • coverage_metrics(): Processing statistics
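
One of these could be defined with the same declarative catalog API used earlier; the log_clusters table and its severity column are assumed outputs of the batch stage:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType

# Sketch only: table and column names are assumptions about the batch output.
clusters_df = (
    session.table("log_clusters")
    .filter(fc.col("severity") == fc.tool_param("severity", StringType))
)

session.catalog.create_tool(
    tool_name="clusters_by_severity",
    tool_description="Return log clusters for a single severity level",
    tool_query=clusters_df,
    tool_params=[
        ToolParam(
            name="severity",
            description="Severity level to filter on",
            allowed_values=["ERROR", "WARN", "INFO"]
        )
    ],
    result_limit=100
)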

Agent Layer

Natural language queries invoke tools:

  • "Top clusters above WARN in the last hour"
  • "Only ERRORs for payment-api"
  • "Assignments for cluster 7"

Agents answer instantly by querying preprocessed views rather than parsing raw logs.

Case Study: RudderStack Triage

RudderStack achieved a 95% reduction in triage time through this separation:

Batch Context Layer

  1. Ingest warehouse data (tickets, transcripts, usage)
  2. Ingest documentation (product docs, PRDs, strategy)
  3. Build semantic context model
  4. Create semantic links between issues and PRDs

Tool Layer

Expose retrieval via MCP tools for classification and research.

Agent Runtime

Subscribe to Linear events. For each feature request:

  1. Classify to product taxonomy
  2. Surface related issues and duplicates
  3. Pull prospect and support signals
  4. Propose prioritization with citations
  5. Write decision to Linear

The agent operates on preprocessed context. Results: 90%+ first-pass accuracy and cited evidence for decisions.

Benefits of Separation

Performance Predictability

Agents respond consistently without data processing overhead. Real-time query latency becomes deterministic.

Resource Efficiency

Batch operations run during off-peak hours or on dedicated infrastructure. Real-time agents use minimal resources. API rate limits become manageable.

Debugging Clarity

Clear boundaries between preprocessing and reasoning isolate issues quickly. Test pipelines independently. Lineage connects outputs to sources.

Cost Reduction

Batched LLM calls are more efficient than individual requests. Model selection applies expensive models only where needed. Caching eliminates redundant calls.

Independent Scaling

Preprocessing pipelines scale separately from agent runtime. Add preprocessing capacity without affecting real-time responsiveness. Horizontal scaling with stateless HTTP servers.

Implementation Steps

Follow this sequence when implementing decoupled architecture:

Classify Operations

  • Separate batch from real-time operations
  • Document data flow between layers
  • Define interfaces clearly

Build Batch Pipelines

  • Use semantic operators for processing
  • Implement schema-driven extraction
  • Add monitoring and lineage
  • Configure rate limits

Expose Tools

  • Create declarative tools from DataFrames
  • Generate system tools
  • Deploy MCP servers
  • Document parameters

Implement Agents

  • Query tools, not raw data
  • Handle errors at tool layer
  • Monitor agent performance separately

Optimize

  • Cache expensive operations
  • Use hierarchical processing for costs
  • Adjust batch frequencies
  • Refine model selection

Implementation Outcome

Decoupling batch inference from real-time AI reasoning transforms AI systems from prototypes into production applications. Separating heavy preprocessing from decision logic creates predictable performance, clearer debugging, and lower costs.

The pattern: preprocess in batch with semantic operators, expose through typed tools, and let agents handle orchestration. This architectural separation enables reliable AI systems that scale.

Start with one preprocessing pipeline, expose two tools, and connect an agent. Measure latency, costs, and debugging time. Expand coverage as patterns emerge.

The preprocessing layer determines whether AI agents deliver value or remain in pilot phase.
