Batch inference across multiple LLM providers requires managing different API rate limits, handling provider-specific failures, optimizing costs, and maintaining type safety. This guide shows how to build production-grade batch inference pipelines using Fenic, a DataFrame framework purpose-built for AI workloads.
Multi-Provider Batch Inference Requirements
Technical Challenges
Rate Limit Constraints
- Each provider enforces distinct RPM (requests per minute) and TPM (tokens per minute) limits
  - OpenAI: typically 500 RPM
  - Anthropic: 50 RPM for Claude models, with separate input and output TPM limits
  - Google Vertex: 200 RPM for Gemini models
- Requires per-provider throttling logic (a hand-rolled sketch follows below)
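Without a framework, these constraints translate into hand-written throttling for every provider. As a rough, non-Fenic illustration (the provider names and limits below mirror the figures above and are placeholders), a minimal sliding-window RPM gate might look like this:

```python
import asyncio
import time
from collections import deque


class RpmLimiter:
    """Sliding-window limiter: allows at most `rpm` acquisitions per rolling 60 seconds."""

    def __init__(self, rpm: int):
        self.rpm = rpm
        self.calls = deque()  # monotonic timestamps of recent requests
        self.lock = asyncio.Lock()

    async def acquire(self) -> None:
        while True:
            async with self.lock:
                now = time.monotonic()
                # Evict timestamps that have aged out of the 60-second window
                while self.calls and now - self.calls[0] >= 60:
                    self.calls.popleft()
                if len(self.calls) < self.rpm:
                    self.calls.append(now)
                    return
                wait = 60 - (now - self.calls[0])
            await asyncio.sleep(wait)


# One limiter per provider, mirroring the illustrative limits listed above
limiters = {
    "openai": RpmLimiter(rpm=500),
    "anthropic": RpmLimiter(rpm=50),
    "google_vertex": RpmLimiter(rpm=200),
}
```

Fenic's declarative configuration, shown later in this guide, absorbs this bookkeeping along with TPM tracking, which the sketch omits.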
Provider-Specific Error Handling
- Different error codes across providers
- 429 responses require distinct retry strategies
- Quota exhaustion vs temporary rate limiting
- Transient network failures need exponential backoff
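A hand-rolled version of this policy has to classify failures before deciding whether to retry. The sketch below is illustrative only: real provider SDKs raise typed exceptions, and the string matching here is a stand-in for inspecting those types.

```python
import random
import time


def call_with_backoff(call, max_retries: int = 5):
    """Retry transient failures with exponential backoff; fail fast on quota exhaustion."""
    for attempt in range(max_retries):
        try:
            return call()
        except Exception as exc:  # real SDKs raise typed errors; strings used here for brevity
            message = str(exc).lower()
            if "insufficient_quota" in message or "billing" in message:
                raise  # quota exhaustion: retrying will not help
            if "429" in message or "rate limit" in message or "timeout" in message:
                time.sleep(2 ** attempt + random.random())  # transient: back off with jitter
                continue
            raise  # unknown error: surface immediately
    raise RuntimeError(f"Giving up after {max_retries} retries")
```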
Cost Management
- Model pricing varies 10x or more between providers
- Task-appropriate model selection reduces costs significantly
- Token usage tracking required for budget control
- Need visibility into per-operation expenses
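For budget control, even a crude estimator over token counts helps. The per-million-token prices below are placeholders to illustrate the arithmetic, not current list prices:

```python
# Placeholder per-million-token prices; check each provider's pricing page for real figures
PRICE_PER_M_TOKENS = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Rough USD cost of one request given its token counts."""
    price = PRICE_PER_M_TOKENS[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


# Same request (2,000 input / 500 output tokens) priced on two tiers
print(estimate_cost("gpt-4o-mini", 2000, 500))  # ~$0.0006
print(estimate_cost("gpt-4o", 2000, 500))       # ~$0.01, roughly 17x more
```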
Context Window Limits
- Each model supports different maximum token counts
- Document chunking strategies vary by provider capabilities
- Request batching must respect context limits
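Before batching, oversized documents have to be split to fit the target model's window. A minimal sketch, assuming the common rough heuristic of ~4 characters per token and splitting on paragraph boundaries (the file name is a placeholder):

```python
def chunk_text(text: str, max_tokens: int, chars_per_token: int = 4) -> list:
    """Split text into chunks under an approximate token budget, on paragraph boundaries."""
    max_chars = max_tokens * chars_per_token
    chunks, current = [], ""
    for paragraph in text.split("\n\n"):
        # Note: a single paragraph longer than the budget is kept intact in this sketch
        if current and len(current) + len(paragraph) + 2 > max_chars:
            chunks.append(current)
            current = paragraph
        else:
            current = f"{current}\n\n{paragraph}" if current else paragraph
    if current:
        chunks.append(current)
    return chunks


# Example: leave headroom below a 128k-token window for the prompt and output
chunks = chunk_text(open("report.txt").read(), max_tokens=100_000)
```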
Provider Configuration
Declarative Setup
Fenic eliminates orchestration code through declarative provider registration. Define all providers with their constraints once, then reference them throughout your pipeline.
```python
import fenic as fc

config = fc.SessionConfig(
    app_name="batch_inference_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200000
            ),
            "balanced": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            ),
            "efficient": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=200,
                tpm=200000
            ),
            "powerful": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)
```
Rate Limit Enforcement
The framework automatically enforces configured rate limits:
- RPM limiting: Throttles requests to stay within per-minute quotas
- TPM tracking: Monitors token consumption against limits
- Adaptive throttling: Adjusts request rates based on 429 responses
- Provider isolation: Rate limits enforced independently per provider
API Key Validation
Provider keys are validated during session creation, preventing runtime surprises:
```python
# Keys validated here - fails immediately if misconfigured
session = fc.Session.get_or_create(config)

# By this point, all provider credentials confirmed valid
```
Invalid keys produce clear error messages before any processing begins.
OpenRouter Integration
OpenRouter support enables dynamic provider routing based on cost, latency, or throughput:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "adaptive": fc.OpenRouterLanguageModel(
                model_name="openai/gpt-4o",
                profiles={
                    "low_latency": fc.OpenRouterLanguageModel.Profile(
                        provider=fc.OpenRouterLanguageModel.Provider(
                            sort="latency"
                        )
                    ),
                    "cost_optimized": fc.OpenRouterLanguageModel.Profile(
                        provider=fc.OpenRouterLanguageModel.Provider(
                            sort="price"
                        )
                    )
                },
                default_profile="low_latency"
            )
        }
    )
)
```
Batch Processing Patterns
Schema-Driven Extraction
Type-safe extraction through Pydantic schemas ensures consistent outputs across providers:
```python
from pydantic import BaseModel, Field
from typing import Literal, List

class FeedbackAnalysis(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    urgency: Literal["low", "medium", "high", "critical"]
    categories: List[str]
    issues: List[str]

df = session.read.csv("feedback.csv")

results = df.select(
    "*",
    fc.semantic.extract(
        fc.col("text"),
        FeedbackAnalysis,
        model_alias="fast"
    ).alias("analysis")
)
```
Automatic Optimizations:
- Request batching where provider APIs support it
- Rate limit enforcement per configured constraints
- Retry logic with exponential backoff
- Individual row failures return None rather than failing the entire pipeline (see the follow-up example below)
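The extracted struct behaves like any other column downstream. A minimal follow-up, using only the dot-notation field access and boolean filters that appear elsewhere in this guide:

```python
# Filter and project on fields of the extracted struct
urgent = results.filter(
    (fc.col("analysis.urgency") == "high") |
    (fc.col("analysis.urgency") == "critical")
)

urgent.select(
    fc.col("text"),
    fc.col("analysis.sentiment"),
    fc.col("analysis.issues")
).show()
```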
Multi-Stage Provider Selection
Optimize costs by routing different stages to appropriate models:
```python
class QuickClassification(BaseModel):
    is_technical: bool
    needs_escalation: bool

class DetailedAnalysis(BaseModel):
    root_cause: str
    actions: List[str]
    timeline: str

pipeline = (
    df
    # Stage 1: Fast classification on all records
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            QuickClassification,
            model_alias="fast"
        ).alias("triage")
    )
    # Stage 2: Detailed analysis only on escalated items
    .filter(fc.col("triage.needs_escalation") == True)
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            DetailedAnalysis,
            model_alias="powerful"
        ).alias("details")
    )
)

results = pipeline.collect()
```
Cost Optimization Strategy:
- Use inexpensive models for classification/filtering
- Apply expensive models only to filtered subsets
- Minimize total API spend while maintaining quality
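To make the savings concrete, here is a back-of-the-envelope comparison; the per-record costs and escalation rate are illustrative placeholders, not measured figures:

```python
# Illustrative assumptions: 100k records, 10% escalated to the expensive model
records = 100_000
escalation_rate = 0.10
cheap_cost_per_record = 0.0002    # placeholder cost for the "fast" model
premium_cost_per_record = 0.01    # placeholder cost for the "powerful" model

single_stage = records * premium_cost_per_record
two_stage = records * cheap_cost_per_record + records * escalation_rate * premium_cost_per_record

print(f"single stage: ${single_stage:,.0f}, two stage: ${two_stage:,.0f}")
# single stage: $1,000, two stage: $120
```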
Concurrent Processing with Async UDFs
Async UDFs enable parallel external API calls with bounded concurrency:
```python
import aiohttp

@fc.async_udf(
    return_type=fc.IntegerType,
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def fetch_score(customer_id: int) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example.com/score/{customer_id}"
        ) as resp:
            data = await resp.json()
            return data["score"]

df = df.select(
    fc.col("customer_id"),
    fetch_score(fc.col("customer_id")).alias("score")
)
```
Async UDF Features:
- Configurable concurrency limits prevent resource exhaustion
- Per-item timeouts avoid hung requests
- Automatic retry with exponential backoff
- Ordered results maintain input row sequence
- Individual failures return None without breaking pipeline
Production Reliability
Automatic Batching
The query optimizer groups operations before execution:
- Request consolidation: Multiple rows batched into single API calls where supported
- Lazy evaluation: Complete pipeline analysis enables global optimization
- Operation reordering: Expensive inference moved after filtering
```python
# Optimizer automatically batches and reorders
pipeline = (
    df
    .filter(fc.col("priority") == "high")  # Filter applied first
    .select(
        "*",
        fc.semantic.extract(
            fc.col("text"),
            Schema,
            model_alias="balanced"
        ).alias("extracted")  # Extraction only on filtered rows
    )
)
```
Self-Throttling
Built-in rate limiting prevents API failures:
- Provider-specific RPM/TPM enforcement
- Adaptive rate adjustment based on 429 responses
- Fast failure on quota exhaustion with actionable errors
```python
config = fc.SemanticConfig(
    language_models={
        "throttled": fc.AnthropicLanguageModel(
            model_name="claude-3-5-haiku-latest",
            rpm=50,  # Automatically enforced
            input_tpm=100000,
            output_tpm=50000
        )
    }
)
```
Error Recovery
Production error handling includes:
- Exponential backoff retries for transient failures
- Immediate failure on quota exhaustion (e.g., OpenAI 429 quota errors)
- Individual row failures return None instead of breaking entire pipeline
- Detailed error logging for debugging production issues
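Conceptually, the per-row behavior matches this plain-Python pattern; it illustrates the failure-isolation idea described above and is not Fenic's actual implementation:

```python
from typing import Callable, List, Optional


def process_rows(rows: List[str], infer: Callable[[str], dict]) -> List[Optional[dict]]:
    """Run inference per row; a failing row yields None instead of aborting the batch."""
    results = []
    for row in rows:
        try:
            results.append(infer(row))
        except Exception as exc:
            # In production this would go to structured logs for debugging
            print(f"row failed, continuing: {exc}")
            results.append(None)
    return results
```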
Cost Tracking
Monitor spending and performance across providers:
```python
results = pipeline.collect()

# Access detailed metrics
print(f"Output tokens: {results.metrics.total_lm_metrics.num_output_tokens}")
print(f"Cost: ${results.metrics.total_lm_metrics.cost:.4f}")
print(f"Execution time: {results.metrics.execution_time_ms}ms")

# Query local metrics table
metrics = session.table("fenic_system.query_metrics")

cost_analysis = (
    metrics
    .group_by("model")
    .agg(
        fc.sum("cost_usd").alias("total_cost"),
        fc.avg("latency_ms").alias("avg_latency"),
        fc.count("*").alias("calls")
    )
    .order_by("total_cost", ascending=False)
)

cost_analysis.show()
```
Advanced Batch Scenarios
Document Processing at Scale
Process large document collections with page-level chunking:
```python
# Load PDF metadata for filtering
pdfs = session.read.pdf_metadata(
    "documents/**/*.pdf",
    recursive=True
)

# Filter before expensive parsing
filtered = pdfs.filter(
    (fc.col("page_count") > 5) &
    (fc.col("page_count") < 100)
)

# Parse only filtered documents
markdown = filtered.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("content")
)
```
Details: Fenic 0.5.0 document processing features
Semantic Joins
Match records based on semantic similarity:
```python
matched = (
    candidates
    .semantic.join(
        other=jobs,
        predicate="""
        Evaluate candidate fit for job.
        Candidate: {{left_on}}
        Requirements: {{right_on}}
        """,
        left_on=fc.col("resume"),
        right_on=fc.col("description"),
        model_alias="balanced"
    )
    .order_by(fc.col("application_date"))
    .limit(100)
)
```
HuggingFace Dataset Integration
Process datasets directly from HuggingFace:
```python
df = session.read.csv("hf://datasets/squad/default/train.csv")

class AnalysisResult(BaseModel):
    question_type: str
    difficulty: Literal["easy", "medium", "hard"]
    concepts: List[str]

processed = df.select(
    "*",
    fc.semantic.extract(
        fc.col("context"),
        AnalysisResult,
        model_alias="fast"
    ).alias("analysis")
)
```
Local to Production Scaling
Local Development
Full engine capabilities available locally:
```python
session = fc.Session.get_or_create(config)

df = session.read.csv("sample.csv")

processed = df.select(
    fc.semantic.extract(
        fc.col("text"),
        Schema
    ).alias("extracted")
)

# Cache for iterative development
processed.cache()

processed.write.parquet("local_results.parquet")
```
Production Deployment
Same code scales to cloud with zero modifications:
```python
config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200000
            ),
            "balanced": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            )
        }
    ),
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.LARGE
    )
)

session = fc.Session.get_or_create(config)

# Identical pipeline code
df = session.read.csv("s3://bucket/data/*.csv")

processed = df.select(
    fc.semantic.extract(
        fc.col("text"),
        Schema
    ).alias("extracted")
)

processed.write.parquet("s3://bucket/results/")
```
Best Practices
Model Selection Strategy
Task-Based Model Assignment:
- Classification/filtering: Fast, cost-efficient models (gpt-4o-mini, gemini-flash)
- Structured extraction: Balanced models (claude-3-5-haiku)
- Deep reasoning: Powerful models (gpt-4o, claude-opus) only when necessary
```python
language_models = {
    "nano": fc.OpenAILanguageModel(
        model_name="gpt-4o-mini",
        rpm=500,
        tpm=200000
    ),
    "standard": fc.AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=100,
        input_tpm=100000,
        output_tpm=50000
    ),
    "premium": fc.OpenAILanguageModel(
        model_name="gpt-4o",
        rpm=100,
        tpm=100000
    )
}

# Route operations to appropriate models
df.select(
    fc.semantic.classify(
        fc.col("text"),
        classes=["bug", "feature", "question"],
        model_alias="nano"
    ).alias("category")
).filter(
    fc.col("category") == "bug"
).select(
    fc.semantic.extract(
        fc.col("text"),
        BugReport,
        model_alias="standard"
    ).alias("details")
)
```
Caching Strategy
Cache expensive operations at strategic points:
```python
pipeline = (
    df
    .select(
        fc.semantic.extract(
            fc.col("text"),
            Schema,
            model_alias="premium"
        ).alias("extracted")
    )
    .cache()  # Cache expensive extraction
    .filter(fc.col("extracted.priority") == "high")
    .semantic.join(other=reference_data, ...)
)
```
When to Cache:
- After expensive extraction operations
- Before iterative analysis/filtering
- When reusing intermediate results across pipelines
Provider Performance Monitoring
Track per-provider metrics for optimization:
```python
metrics = session.table("fenic_system.query_metrics")

provider_performance = (
    metrics
    .group_by("model")
    .agg(
        fc.avg("latency_ms").alias("avg_latency"),
        fc.sum("cost_usd").alias("total_cost"),
        fc.count("*").alias("requests")
    )
    .order_by("total_cost", ascending=False)
)

provider_performance.show()
```
Context Window Management
Account for model-specific limits:
```python
# Select model based on content length
df.with_column(
    "length",
    fc.length(fc.col("content"))
).with_column(
    "model_to_use",
    fc.when(fc.col("length") < 8000, fc.lit("fast"))
    .when(fc.col("length") < 100000, fc.lit("standard"))
    .otherwise(fc.lit("efficient"))
).select(
    fc.semantic.extract(
        fc.col("content"),
        Schema,
        model_alias=fc.col("model_to_use")
    ).alias("extracted")
)
```
Implementation Examples
Support Ticket Triage
```python
from pydantic import BaseModel
from typing import Literal, List

class TicketAnalysis(BaseModel):
    severity: Literal["low", "medium", "high", "critical"]
    category: str
    needs_engineering: bool
    resolution_time: str

tickets = session.read.csv("tickets.csv")

processed = (
    tickets
    .select(
        "*",
        fc.semantic.extract(
            fc.col("description"),
            TicketAnalysis,
            model_alias="fast"
        ).alias("analysis")
    )
    .filter(
        (fc.col("analysis.severity") == "high") |
        (fc.col("analysis.severity") == "critical")
    )
    .semantic.join(
        other=knowledge_base,
        predicate="Find solutions for {{left_on}}",
        left_on=fc.col("description"),
        right_on=fc.col("solution"),
        model_alias="balanced"
    )
)

results = processed.collect()
```
Content Moderation
```python
class ModerationResult(BaseModel):
    is_safe: bool
    categories: List[Literal[
        "hate", "violence", "sexual", "harassment", "spam"
    ]]
    confidence: float
    needs_review: bool

content = session.read.docs("posts/*.json", content_type="json")

flagged = (
    content
    .select(
        "*",
        fc.semantic.extract(
            fc.col("post"),
            ModerationResult,
            model_alias="fast"
        ).alias("moderation")
    )
    .filter(
        (~fc.col("moderation.is_safe")) |
        (fc.col("moderation.needs_review"))
    )
)

flagged_content = flagged.collect()
```
Market Intelligence
```python
from pydantic import BaseModel, Field
from typing import Literal, List

class CompetitorInsight(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    features: List[str]
    pricing_mentioned: bool
    pain_points: List[str]

reviews = session.read.csv("reviews.csv")

analysis = (
    reviews
    .select(
        "*",
        fc.semantic.extract(
            fc.col("review"),
            CompetitorInsight,
            model_alias="balanced"
        ).alias("insights")
    )
    .with_column(
        "review_vector",
        fc.semantic.embed(fc.col("review"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("review_vector"),
        num_clusters=10,
        label_column="theme"
    )
    .group_by("theme")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("review"),
            model_alias="premium"
        ).alias("summary")
    )
)

insights = analysis.collect()
```
Implementation Checklist
Initial Setup:
- Configure all required model providers with rate limits
- Validate API keys during session initialization
- Set up local development environment
- Implement metrics collection
Pipeline Development:
- Define Pydantic schemas for type-safe extraction
- Select appropriate models for each stage
- Implement filtering before expensive operations
- Add caching at strategic points
- Test locally with data subset
Production Deployment:
- Configure cloud compute resources
- Set up monitoring for provider performance
- Implement alerting for quota/rate limit issues
- Document model selection rationale
- Plan cost optimization based on metrics
Ongoing Optimization:
- Monitor per-provider costs and latency
- Adjust model selection based on performance
- Refine schemas based on extraction quality
- Optimize batch sizes for throughput
- Review rate limit configurations
Resources
Getting Started:
- Fenic GitHub Repository
- Fenic 0.4.0 Release Notes
- Building Reliable AI Pipelines
- Open Source Announcement
Batch inference automation across multiple providers requires managing rate limits, costs, and provider-specific behaviors. Fenic provides an inference-first architecture that treats multi-provider batch processing as native DataFrame operations, enabling production-grade pipelines without custom orchestration code.

