
How to Automate Batch Inference Across Multiple Model Providers

Typedef Team


Batch inference across multiple LLM providers requires managing different API rate limits, handling provider-specific failures, optimizing costs, and maintaining type safety. This guide shows how to build production-grade batch inference pipelines using Fenic, a DataFrame framework purpose-built for AI workloads.

Multi-Provider Batch Inference Requirements

Technical Challenges

Rate Limit Constraints

  • Each provider enforces distinct RPM (requests per minute) and TPM (tokens per minute) limits
  • OpenAI: 500 RPM typical
  • Anthropic: 50 RPM for Claude models with separate input/output TPM
  • Google Vertex: 200 RPM for Gemini models
  • Requires per-provider throttling logic
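
Hand-rolled orchestration needs one limiter per provider. A minimal sketch of that throttling logic, using a sliding-window RPM limiter (the provider names and limits are illustrative, not official API constraints); this is exactly the bookkeeping the declarative configuration shown below takes over:

python
import asyncio
import time

class ProviderThrottle:
    """Sliding-window RPM limiter; create one instance per provider."""

    def __init__(self, rpm: int):
        self.rpm = rpm
        self.timestamps: list[float] = []
        self.lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self.lock:
            while True:
                now = time.monotonic()
                # Keep only requests inside the 60-second window
                self.timestamps = [t for t in self.timestamps if now - t < 60]
                if len(self.timestamps) < self.rpm:
                    self.timestamps.append(now)
                    return
                # Wait until the oldest request falls out of the window
                await asyncio.sleep(60 - (now - self.timestamps[0]))

# One throttle per provider (illustrative limits)
throttles = {
    "openai": ProviderThrottle(rpm=500),
    "anthropic": ProviderThrottle(rpm=50),
    "vertex": ProviderThrottle(rpm=200),
}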

Provider-Specific Error Handling

  • Different error codes across providers
  • 429 responses require distinct retry strategies
  • Quota exhaustion vs temporary rate limiting
  • Transient network failures need exponential backoff
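
Handled manually, each call needs logic that distinguishes retryable errors from fatal ones. A minimal sketch, assuming a hypothetical async `call` function and simplified error classes (neither belongs to any provider SDK):

python
import asyncio
import random

class QuotaExhausted(Exception):
    """Billing or monthly quota is gone - retrying will not help."""

class RateLimited(Exception):
    """Temporary 429 - retrying after a delay usually succeeds."""

async def call_with_backoff(call, max_retries: int = 5):
    """Retry transient failures with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return await call()
        except QuotaExhausted:
            raise  # fail fast: waiting never fixes an exhausted quota
        except (RateLimited, ConnectionError):
            # 1s, 2s, 4s, 8s, ... plus jitter to avoid synchronized retries
            await asyncio.sleep(2 ** attempt + random.uniform(0, 1))
    raise RuntimeError("retries exhausted")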

Cost Management

  • Model pricing varies by 10x or more between providers
  • Task-appropriate model selection reduces costs significantly
  • Token usage tracking required for budget control
  • Need visibility into per-operation expenses

Context Window Limits

  • Each model supports different maximum token counts
  • Document chunking strategies vary by provider capabilities
  • Request batching must respect context limits
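
One common workaround is to chunk documents to a per-model token budget before building requests. A rough sketch using a character-based approximation of token count (about four characters per token; real tokenizers differ by model):

python
def chunk_for_model(text: str, max_tokens: int, chars_per_token: int = 4) -> list[str]:
    """Split text into pieces that fit an approximate context budget."""
    max_chars = max_tokens * chars_per_token
    return [text[i:i + max_chars] for i in range(0, len(text), max_chars)]

# Illustrative budget - check each provider's documentation for real limits
document_text = "example paragraph " * 50_000
chunks = chunk_for_model(document_text, max_tokens=8_000)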

Provider Configuration

Declarative Setup

Fenic eliminates orchestration code through declarative provider registration. Define all providers with their constraints once, then reference them throughout your pipeline.

python
import fenic as fc

config = fc.SessionConfig(
    app_name="batch_inference_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200000
            ),
            "balanced": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            ),
            "efficient": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=200,
                tpm=200000
            ),
            "powerful": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)

Rate Limit Enforcement

The framework automatically enforces configured rate limits:

  • RPM limiting: Throttles requests to stay within per-minute quotas
  • TPM tracking: Monitors token consumption against limits
  • Adaptive throttling: Adjusts request rates based on 429 responses
  • Provider isolation: Rate limits enforced independently per provider

API Key Validation

Provider keys are validated during session creation, preventing runtime surprises:

python
# Keys validated here - fails immediately if misconfigured
session = fc.Session.get_or_create(config)

# By this point, all provider credentials confirmed valid

Invalid keys produce clear error messages before any processing begins.
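
If you want the failure to be explicit in batch jobs, wrap session creation; the exact exception class raised for a bad key depends on the Fenic version, so this sketch catches broadly:

python
import fenic as fc

try:
    session = fc.Session.get_or_create(config)
except Exception as exc:  # exact exception type depends on the Fenic version
    # Stop before any batch work is scheduled
    raise SystemExit(f"Provider configuration failed: {exc}") from exc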

OpenRouter Integration

OpenRouter support enables dynamic provider routing based on cost, latency, or throughput:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "adaptive": fc.OpenRouterLanguageModel(
                model_name="openai/gpt-4o",
                profiles={
                    "low_latency": fc.OpenRouterLanguageModel.Profile(
                        provider=fc.OpenRouterLanguageModel.Provider(
                            sort="latency"
                        )
                    ),
                    "cost_optimized": fc.OpenRouterLanguageModel.Profile(
                        provider=fc.OpenRouterLanguageModel.Provider(
                            sort="price"
                        )
                    )
                },
                default_profile="low_latency"
            )
        }
    )
)

Batch Processing Patterns

Schema-Driven Extraction

Type-safe extraction through Pydantic schemas ensures consistent outputs across providers:

python
from pydantic import BaseModel, Field
from typing import Literal, List

class FeedbackAnalysis(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    urgency: Literal["low", "medium", "high", "critical"]
    categories: List[str]
    issues: List[str]

df = session.read.csv("feedback.csv")

results = df.select(
    "*",
    fc.semantic.extract(
        fc.col("text"),
        FeedbackAnalysis,
        model_alias="fast"
    ).alias("analysis")
)
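
The extracted struct can then be referenced with dot notation, the same pattern used in the later examples, for instance to pull out only critical negative feedback:

python
critical = results.filter(
    (fc.col("analysis.sentiment") == "negative") &
    (fc.col("analysis.urgency") == "critical")
)

critical.show()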

Automatic Optimizations:

  • Request batching where provider APIs support it
  • Rate limit enforcement per configured constraints
  • Retry logic with exponential backoff
  • Individual row failure handling (failed rows return None instead of failing the whole pipeline)

Multi-Stage Provider Selection

Optimize costs by routing different stages to appropriate models:

python
class QuickClassification(BaseModel):
    is_technical: bool
    needs_escalation: bool

class DetailedAnalysis(BaseModel):
    root_cause: str
    actions: List[str]
    timeline: str

pipeline = (
    df
    # Stage 1: Fast classification on all records
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            QuickClassification,
            model_alias="fast"
        ).alias("triage")
    )
    # Stage 2: Detailed analysis only on escalated items
    .filter(fc.col("triage.needs_escalation") == True)
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            DetailedAnalysis,
            model_alias="powerful"
        ).alias("details")
    )
)

results = pipeline.collect()

Cost Optimization Strategy:

  • Use inexpensive models for classification/filtering
  • Apply expensive models only to filtered subsets
  • Minimize total API spend while maintaining quality
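
A back-of-the-envelope comparison shows the effect; the prices and escalation rate below are illustrative placeholders, not current provider pricing:

python
# Illustrative numbers only - substitute your providers' actual pricing
records = 100_000
tokens_per_record = 1_000
cheap_per_m = 0.15      # $ per 1M tokens, small model (assumed)
premium_per_m = 2.50    # $ per 1M tokens, large model (assumed)
escalation_rate = 0.10  # fraction of records needing the expensive pass

all_premium = records * tokens_per_record / 1e6 * premium_per_m
two_stage = (
    records * tokens_per_record / 1e6 * cheap_per_m
    + records * escalation_rate * tokens_per_record / 1e6 * premium_per_m
)

print(f"Single premium pass: ${all_premium:,.2f}")  # $250.00
print(f"Two-stage pipeline:  ${two_stage:,.2f}")    # $40.00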

Concurrent Processing with Async UDFs

Async UDFs enable parallel external API calls with bounded concurrency:

python
import aiohttp

@fc.async_udf(
    return_type=fc.IntegerType,
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def fetch_score(customer_id: int) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example.com/score/{customer_id}"
        ) as resp:
            data = await resp.json()
            return data["score"]

df = df.select(
    fc.col("customer_id"),
    fetch_score(fc.col("customer_id")).alias("score")
)

Async UDF Features:

  • Configurable concurrency limits prevent resource exhaustion
  • Per-item timeouts avoid hung requests
  • Automatic retry with exponential backoff
  • Ordered results maintain input row sequence
  • Individual failures return None without breaking pipeline

Production Reliability

Automatic Batching

The query optimizer groups operations before execution:

  • Request consolidation: Multiple rows batched into single API calls where supported
  • Lazy evaluation: Complete pipeline analysis enables global optimization
  • Operation reordering: Expensive inference moved after filtering

python
# Optimizer automatically batches and reorders
pipeline = (
    df
    .filter(fc.col("priority") == "high")  # Filter applied first
    .select(
        "*",
        fc.semantic.extract(
            fc.col("text"),
            Schema,
            model_alias="balanced"
        ).alias("extracted")  # Extraction only on filtered rows
    )
)

Self-Throttling

Built-in rate limiting helps prevent avoidable API failures:

  • Provider-specific RPM/TPM enforcement
  • Adaptive rate adjustment based on 429 responses
  • Fast failure on quota exhaustion with actionable errors

python
config = fc.SemanticConfig(
    language_models={
        "throttled": fc.AnthropicLanguageModel(
            model_name="claude-3-5-haiku-latest",
            rpm=50,  # Automatically enforced
            input_tpm=100000,
            output_tpm=50000
        )
    }
)

Error Recovery

Production error handling includes:

  • Exponential backoff retries for transient failures
  • Immediate failure on quota exhaustion (e.g., OpenAI 429 quota errors)
  • Individual row failures return None instead of breaking entire pipeline
  • Detailed error logging for debugging production issues
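
Failed rows can be separated from successful ones for inspection or reprocessing. A minimal sketch, assuming a PySpark-style is_null() column method (verify the exact method name against the Fenic API you are running):

python
analyzed = df.select(
    "*",
    fc.semantic.extract(
        fc.col("text"),
        FeedbackAnalysis,
        model_alias="fast"
    ).alias("analysis")
)

# Assumes an is_null()-style null check; adjust to the installed API
failed = analyzed.filter(fc.col("analysis").is_null())
succeeded = analyzed.filter(~fc.col("analysis").is_null())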

Cost Tracking

Monitor spending and performance across providers:

python
results = pipeline.collect()

# Access detailed metrics
print(f"Total tokens: {results.metrics.total_lm_metrics.num_output_tokens}")
print(f"Cost: ${results.metrics.total_lm_metrics.cost:.4f}")
print(f"Execution time: {results.metrics.execution_time_ms}ms")

# Query local metrics table
metrics = session.table("fenic_system.query_metrics")

cost_analysis = (
    metrics
    .group_by("model")
    .agg(
        fc.sum("cost_usd").alias("total_cost"),
        fc.avg("latency_ms").alias("avg_latency"),
        fc.count("*").alias("calls")
    )
    .order_by("total_cost", ascending=False)
)

cost_analysis.show()
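
The same table supports quick spot checks, for example surfacing unusually expensive individual calls (the cost threshold here is an illustrative placeholder):

python
expensive_calls = (
    metrics
    .filter(fc.col("cost_usd") > 0.05)  # illustrative threshold
    .order_by("cost_usd", ascending=False)
    .limit(20)
)

expensive_calls.show()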

Advanced Batch Scenarios

Document Processing at Scale

Process large document collections with page-level chunking:

python
# Load PDF metadata for filtering
pdfs = session.read.pdf_metadata(
    "documents/**/*.pdf",
    recursive=True
)

# Filter before expensive parsing
filtered = pdfs.filter(
    (fc.col("page_count") > 5) &
    (fc.col("page_count") < 100)
)

# Parse only filtered documents
markdown = filtered.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("content")
)

Details: Fenic 0.5.0 document processing features

Semantic Joins

Match records based on semantic similarity:

python
matched = (
    candidates
    .semantic.join(
        other=jobs,
        predicate="""
        Evaluate candidate fit for job.
        Candidate: {{left_on}}
        Requirements: {{right_on}}
        """,
        left_on=fc.col("resume"),
        right_on=fc.col("description"),
        model_alias="balanced"
    )
    .order_by(fc.col("application_date"))
    .limit(100)
)

HuggingFace Dataset Integration

Process datasets directly from HuggingFace:

python
df = session.read.csv("hf://datasets/squad/default/train.csv")

class AnalysisResult(BaseModel):
    question_type: str
    difficulty: Literal["easy", "medium", "hard"]
    concepts: List[str]

processed = df.select(
    "*",
    fc.semantic.extract(
        fc.col("context"),
        AnalysisResult,
        model_alias="fast"
    ).alias("analysis")
)

Local to Production Scaling

Local Development

Full engine capabilities available locally:

python
session = fc.Session.get_or_create(config)

df = session.read.csv("sample.csv")
processed = df.select(
    fc.semantic.extract(
        fc.col("text"),
        Schema
    ).alias("extracted")
)

# Cache for iterative development
processed.cache()
processed.write.parquet("local_results.parquet")

Production Deployment

The same code scales to the cloud with zero modifications:

python
config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200000
            ),
            "balanced": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            )
        }
    ),
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.LARGE
    )
)

session = fc.Session.get_or_create(config)

# Identical pipeline code
df = session.read.csv("s3://bucket/data/*.csv")
processed = df.select(
    fc.semantic.extract(
        fc.col("text"),
        Schema
    ).alias("extracted")
)
processed.write.parquet("s3://bucket/results/")

Best Practices

Model Selection Strategy

Task-Based Model Assignment:

  • Classification/filtering: Fast, cost-efficient models (gpt-4o-mini, gemini-flash)
  • Structured extraction: Balanced models (claude-3-5-haiku)
  • Deep reasoning: Powerful models (gpt-4o, claude-opus) only when necessary

python
language_models = {
    "nano": fc.OpenAILanguageModel(
        model_name="gpt-4o-mini",
        rpm=500,
        tpm=200000
    ),
    "standard": fc.AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=100,
        input_tpm=100000,
        output_tpm=50000
    ),
    "premium": fc.OpenAILanguageModel(
        model_name="gpt-4o",
        rpm=100,
        tpm=100000
    )
}

# Route operations to appropriate models
df.select(
    fc.semantic.classify(
        fc.col("text"),
        classes=["bug", "feature", "question"],
        model_alias="nano"
    ).alias("category")
).filter(
    fc.col("category") == "bug"
).select(
    fc.semantic.extract(
        fc.col("text"),
        BugReport,
        model_alias="standard"
    ).alias("details")
)

Caching Strategy

Cache expensive operations at strategic points:

python
pipeline = (
    df
    .select(
        fc.semantic.extract(
            fc.col("text"),
            Schema,
            model_alias="premium"
        ).alias("extracted")
    )
    .cache()  # Cache expensive extraction
    .filter(fc.col("extracted.priority") == "high")
    .semantic.join(other=reference_data, ...)
)

When to Cache:

  • After expensive extraction operations
  • Before iterative analysis/filtering
  • When reusing intermediate results across pipelines

Provider Performance Monitoring

Track per-provider metrics for optimization:

python
metrics = session.table("fenic_system.query_metrics")

provider_performance = (
    metrics
    .group_by("model")
    .agg(
        fc.avg("latency_ms").alias("avg_latency"),
        fc.sum("cost_usd").alias("total_cost"),
        fc.count("*").alias("requests")
    )
    .order_by("total_cost", ascending=False)
)

provider_performance.show()

Context Window Management

Account for model-specific limits:

python
# Select model based on content length
df.with_column(
    "length",
    fc.length(fc.col("content"))
).with_column(
    "model_to_use",
    fc.when(fc.col("length") < 8000, fc.lit("fast"))
      .when(fc.col("length") < 100000, fc.lit("standard"))
      .otherwise(fc.lit("efficient"))
).select(
    fc.semantic.extract(
        fc.col("content"),
        Schema,
        model_alias=fc.col("model_to_use")
    ).alias("extracted")
)

Implementation Examples

Support Ticket Triage

python
from pydantic import BaseModel
from typing import Literal, List

class TicketAnalysis(BaseModel):
    severity: Literal["low", "medium", "high", "critical"]
    category: str
    needs_engineering: bool
    resolution_time: str

tickets = session.read.csv("tickets.csv")

processed = (
    tickets
    .select(
        "*",
        fc.semantic.extract(
            fc.col("description"),
            TicketAnalysis,
            model_alias="fast"
        ).alias("analysis")
    )
    .filter(
        (fc.col("analysis.severity") == "high") |
        (fc.col("analysis.severity") == "critical")
    )
    .semantic.join(
        other=knowledge_base,
        predicate="Find solutions for {{left_on}}",
        left_on=fc.col("description"),
        right_on=fc.col("solution"),
        model_alias="balanced"
    )
)

results = processed.collect()

Content Moderation

python
class ModerationResult(BaseModel):
    is_safe: bool
    categories: List[Literal[
        "hate", "violence", "sexual", "harassment", "spam"
    ]]
    confidence: float
    needs_review: bool

content = session.read.docs("posts/*.json", content_type="json")

flagged = (
    content
    .select(
        "*",
        fc.semantic.extract(
            fc.col("post"),
            ModerationResult,
            model_alias="fast"
        ).alias("moderation")
    )
    .filter(
        (~fc.col("moderation.is_safe")) |
        (fc.col("moderation.needs_review"))
    )
)

flagged_content = flagged.collect()

Market Intelligence

python
from pydantic import BaseModel, Field
from typing import Literal, List

class CompetitorInsight(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    features: List[str]
    pricing_mentioned: bool
    pain_points: List[str]

reviews = session.read.csv("reviews.csv")

analysis = (
    reviews
    .select(
        "*",
        fc.semantic.extract(
            fc.col("review"),
            CompetitorInsight,
            model_alias="balanced"
        ).alias("insights")
    )
    .with_column(
        "review_vector",
        fc.semantic.embed(fc.col("review"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("review_vector"),
        num_clusters=10,
        label_column="theme"
    )
    .group_by("theme")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("review"),
            model_alias="premium"
        ).alias("summary")
    )
)

insights = analysis.collect()

Implementation Checklist

Initial Setup:

  • Configure all required model providers with rate limits
  • Validate API keys during session initialization
  • Set up local development environment
  • Implement metrics collection

Pipeline Development:

  • Define Pydantic schemas for type-safe extraction
  • Select appropriate models for each stage
  • Implement filtering before expensive operations
  • Add caching at strategic points
  • Test locally with data subset

Production Deployment:

  • Configure cloud compute resources
  • Set up monitoring for provider performance
  • Implement alerting for quota/rate limit issues
  • Document model selection rationale
  • Plan cost optimization based on metrics

Ongoing Optimization:

  • Monitor per-provider costs and latency
  • Adjust model selection based on performance
  • Refine schemas based on extraction quality
  • Optimize batch sizes for throughput
  • Review rate limit configurations


Batch inference automation across multiple providers requires managing rate limits, costs, and provider-specific behaviors. Fenic provides an inference-first architecture that treats multi-provider batch processing as native DataFrame operations, enabling production-grade pipelines without custom orchestration code.
