Integrating multiple LLM providers into data pipelines requires infrastructure that treats model inference as a first-class operation rather than an external API call. Fenic, Typedef's open-source DataFrame framework, enables production-ready multi-provider integration with declarative configuration, automatic batching, and unified semantic operators.
This guide provides implementation patterns for integrating OpenAI and Google Vertex AI models into data transformation pipelines using Fenic's inference-first architecture.
Prerequisites and Installation
Fenic requires Python 3.10, 3.11, or 3.12. Install the framework:
```bash
pip install fenic
```
Set up provider API keys in your environment:
```bash
export OPENAI_API_KEY="your-openai-api-key"
export GOOGLE_API_KEY="your-google-api-key"  # For Google Developer API

# For Google Vertex AI, configure gcloud authentication
gcloud auth application-default login
```
Configuring OpenAI Models
OpenAI models in Fenic use the OpenAILanguageModel configuration class, which requires the model name and rate limiting parameters.
Basic OpenAI Configuration
```python
import fenic as fc

config = fc.SessionConfig(
    app_name="openai_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt-nano": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,       # Requests per minute
                tpm=200_000    # Tokens per minute
            ),
            "gpt-mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="gpt-nano"
    )
)

session = fc.Session.get_or_create(config)
```
The rpm and tpm parameters define rate limits that Fenic enforces automatically through self-throttling and request batching. This avoids provider-side rate-limit errors while maximizing throughput.
OpenAI Reasoning Models with Profiles
For OpenAI's o-series reasoning models, configure profiles to control reasoning effort:
```python
config = fc.SessionConfig(
    app_name="reasoning_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "o4-mini": fc.OpenAILanguageModel(
                model_name="o4-mini",
                rpm=100,
                tpm=100_000,
                profiles={
                    "fast": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="low"
                    ),
                    "thorough": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="high"
                    )
                },
                default_profile="fast"
            )
        },
        default_language_model="o4-mini"
    )
)
```
Profiles enable dynamic model behavior selection within pipelines without changing code:
```python
# Use default "fast" profile
df.with_column(
    "analysis",
    fc.semantic.map(
        "Analyze {{ text }}",
        text=fc.col("content"),
        model_alias="o4-mini"
    )
)

# Override to "thorough" profile for complex cases
df.with_column(
    "analysis",
    fc.semantic.map(
        "Analyze {{ text }}",
        text=fc.col("content"),
        model_alias=fc.ModelAlias(name="o4-mini", profile="thorough")
    )
)
```
Configuring Google Vertex AI Models
Google Vertex AI integration requires two steps: authentication setup and model configuration.
Authentication Setup
Vertex AI uses Google Cloud credentials. Configure authentication:
```bash
# Set up application default credentials
gcloud auth application-default login

# Or set service account key explicitly
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
```
Basic Vertex AI Configuration
```python
config = fc.SessionConfig(
    app_name="vertex_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini-flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            ),
            "gemini-pro": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.5-flash",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="gemini-flash"
    )
)

session = fc.Session.get_or_create(config)
```
Vertex AI Reasoning Models with Thinking Budgets
Gemini 2.5 and later models support thinking token budgets for enhanced reasoning:
```python
config = fc.SessionConfig(
    app_name="gemini_reasoning",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini-reasoning": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.5-flash",
                rpm=100,
                tpm=100_000,
                profiles={
                    "thinking_disabled": fc.GoogleVertexLanguageModel.Profile(),
                    "fast": fc.GoogleVertexLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.GoogleVertexLanguageModel.Profile(
                        thinking_token_budget=8192
                    )
                },
                default_profile="fast"
            )
        },
        default_language_model="gemini-reasoning"
    )
)
```
Multi-Provider Pipeline Configuration
Production pipelines benefit from a deliberate multi-provider strategy. Configure multiple providers with model aliases that abstract implementation details:
```python
config = fc.SessionConfig(
    app_name="multi_provider_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            # OpenAI models
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            ),
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            ),
            # Google Vertex models
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            ),
            "flash-lite": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=400,
                tpm=200_000
            )
        },
        default_language_model="flash-lite"
    )
)

session = fc.Session.get_or_create(config)
```
This configuration enables cost and performance optimization by routing different operations to appropriate models:
```python
# Use cheap model for simple classification
df = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("content"),
        ["technical", "business", "personal"],
        model_alias="nano"
    )
)

# Use fast model for bulk extraction
df = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("content"), DataSchema, model_alias="flash-lite")
)

# Use powerful model for complex reasoning
df = df.with_column(
    "analysis",
    fc.semantic.map(
        "Provide detailed analysis: {{ text }}",
        text=fc.col("content"),
        model_alias="mini"
    )
)
```
Implementing Schema-Driven Extraction
The semantic.extract operator transforms unstructured text into structured data using Pydantic schemas. This works identically across OpenAI and Google Vertex models:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class Issue(BaseModel):
    category: Literal["bug", "feature", "question"]
    severity: Literal["low", "medium", "high"]
    description: str = Field(description="Brief issue description")

class SupportTicket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    issues: List[Issue]
    sentiment: Literal["positive", "neutral", "negative"]

# Works with both OpenAI and Vertex models
tickets_openai = (
    df.with_column(
        "parsed",
        fc.semantic.extract(fc.col("raw_text"), SupportTicket, model_alias="mini")
    )
    .unnest("parsed")
)

tickets_vertex = (
    df.with_column(
        "parsed",
        fc.semantic.extract(fc.col("raw_text"), SupportTicket, model_alias="flash")
    )
    .unnest("parsed")
)
```
The schema drives extraction quality. Clear field descriptions improve results:
```python
class Document(BaseModel):
    title: str = Field(description="Document title or heading")
    summary: str = Field(
        description="2-3 sentence summary covering main points"
    )
    entities: List[str] = Field(
        description="Named entities: people, organizations, locations"
    )
    key_dates: List[str] = Field(
        description="Important dates mentioned in ISO format YYYY-MM-DD"
    )
```
Building Semantic Filters with Predicates
The semantic.predicate operator enables natural language filtering across both providers:
```python
# Filter with OpenAI
filtered_openai = df.filter(
    (fc.col("score") > 50) &
    fc.semantic.predicate(
        "Does {{ resume }} show experience with distributed systems?",
        resume=fc.col("resume_text"),
        model_alias="nano"
    )
)

# Filter with Vertex AI
filtered_vertex = df.filter(
    (fc.col("years") > 5) &
    fc.semantic.predicate(
        "Does {{ feedback }} mention UI/UX problems?",
        feedback=fc.col("customer_feedback"),
        model_alias="flash-lite"
    )
)
```
Combine traditional filters with semantic predicates to optimize costs. Run cheap boolean filters first to reduce data volume before expensive LLM operations.
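For example, a minimal sketch of that ordering, reusing the resume-screening columns from the snippet above:

```python
# Cheap boolean filter first shrinks the DataFrame...
prefiltered = df.filter(fc.col("score") > 50)

# ...so the semantic predicate only runs LLM calls on the rows that survive
qualified = prefiltered.filter(
    fc.semantic.predicate(
        "Does {{ resume }} show experience with distributed systems?",
        resume=fc.col("resume_text"),
        model_alias="nano"
    )
)
```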
Implementing Semantic Transformations
The semantic.map operator applies natural language transformations:
```python
# Summarization with OpenAI
df_summarized = df.with_column(
    "summary",
    fc.semantic.map(
        "Summarize in 2 sentences: {{ article }}",
        article=fc.col("full_text"),
        model_alias="mini"
    )
)

# Translation with Vertex AI
df_translated = df.with_column(
    "translated",
    fc.semantic.map(
        "Translate to Spanish: {{ text }}",
        text=fc.col("english_text"),
        model_alias="flash"
    )
)
```
Use structured outputs with the response_format parameter:
```python
class Summary(BaseModel):
    headline: str = Field(description="Compelling headline")
    bullets: List[str] = Field(description="3-5 key points")
    sentiment: Literal["positive", "neutral", "negative"]

df_structured = df.with_column(
    "structured_summary",
    fc.semantic.map(
        "Analyze {{ content }}",
        content=fc.col("article"),
        response_format=Summary,
        model_alias="mini"
    )
)
```
Advanced: Semantic Joins Across Providers
Semantic joins match DataFrames based on meaning rather than exact values:
```python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Define examples to guide matching
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="5 years Python backend development, FastAPI, PostgreSQL",
    right="Senior Backend Engineer - Python expertise required",
    output=True
))
examples.create_example(JoinExample(
    left="3 years frontend React development",
    right="Senior Backend Engineer",
    output=False
))

# Semantic join with OpenAI
matched_openai = applicants.semantic.join(
    other=jobs,
    predicate="""
    Candidate: {{ left_on }}
    Job: {{ right_on }}
    Is the candidate qualified for this role?
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description"),
    examples=examples,
    model_alias="mini"
)

# Same join logic with Vertex AI
matched_vertex = applicants.semantic.join(
    other=jobs,
    predicate="""
    Candidate: {{ left_on }}
    Job: {{ right_on }}
    Is the candidate qualified for this role?
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description"),
    examples=examples,
    model_alias="flash"
)
```
Processing PDFs with Google Vertex Native Support
Fenic 0.5.0 added native PDF parsing with Google Vertex models:
```python
config = fc.SessionConfig(
    app_name="pdf_processing",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini": fc.GoogleDeveloperLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=100,
                tpm=1000
            )
        },
        default_language_model="gemini"
    )
)

session = fc.Session.get_or_create(config)

# Read PDF metadata
pdfs = session.read.pdf_metadata("data/docs/**/*.pdf", recursive=True)

# Parse PDFs to markdown with page chunking
markdown = pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("markdown")
)
```
Google Vertex models use the native file API for PDFs, providing optimized token accounting and improved batching. OpenAI models work via standard text prompting after PDF text extraction.
Rate Limiting and Cost Management
Fenic automatically handles rate limiting and batching for both providers. The framework:
- Tracks token usage in real-time
- Self-throttles when approaching limits
- Batches requests for optimal throughput
- Implements exponential backoff for retries
- Caches identical inference calls within sessions
Monitor costs and performance:
```python
result = pipeline.collect()
metrics = result.metrics()

# Overall metrics
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Execution time: {metrics.execution_time}s")

# Per-operator metrics
for op in metrics.operator_metrics:
    print(f"{op.name}: ${op.cost}, {op.latency}ms")
```
Production Pipeline Example
Complete pipeline using both OpenAI and Google Vertex models:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class DocumentMetadata(BaseModel):
    title: str
    category: Literal["technical", "business", "legal"]
    entities: List[str]

class ContentAnalysis(BaseModel):
    summary: str = Field(description="2-3 sentence summary")
    sentiment: Literal["positive", "neutral", "negative"]
    key_topics: List[str]

config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "classifier": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            ),
            "extractor": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=400,
                tpm=200_000
            ),
            "analyzer": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="extractor"
    )
)

session = fc.Session.get_or_create(config)

# Load raw documents
df = session.read.docs("data/documents/*.txt", content_type="markdown")

# Multi-stage pipeline with strategic model selection
processed = (
    df
    # Quick classification with cheap model
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("content"),
            ["technical", "business", "legal"],
            model_alias="classifier"
        )
    )
    # Filter early to reduce downstream processing
    .filter(fc.col("category").isin(["technical", "business"]))
    # Extract structured metadata with fast model
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("content"), DocumentMetadata, model_alias="extractor")
    )
    .unnest("metadata")
    # Deep analysis with more capable model
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("content"), ContentAnalysis, model_alias="analyzer")
    )
    .unnest("analysis")
)

# Write results
processed.write.parquet("output/processed_documents.parquet")

# Review metrics
metrics = processed.collect().metrics()
print(f"Pipeline cost: ${metrics.lm_metrics.total_cost}")
print(f"Total documents processed: {processed.count()}")
```
Hybrid Provider Strategies
Route operations strategically based on model strengths:
```python
# OpenAI excels at structured extraction
structured_data = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), ComplexSchema, model_alias="mini")
)

# Vertex AI provides fast bulk processing
bulk_classified = df.with_column(
    "category",
    fc.semantic.classify(fc.col("text"), categories, model_alias="flash-lite")
)

# Use appropriate model for task complexity
simple_summaries = df.with_column(
    "summary",
    fc.semantic.map("Summarize: {{ text }}", text=fc.col("content"), model_alias="nano")
)

detailed_analysis = df.with_column(
    "analysis",
    fc.semantic.map(
        "Provide detailed analysis: {{ text }}",
        text=fc.col("content"),
        model_alias="mini"
    )
)
```
Error Handling and Resilience
Fenic handles provider-specific error patterns automatically:
- OpenAI 429 quota exhausted: fails fast with clear errors
- Vertex AI transient failures: automatic retry with exponential backoff
- Token capacity violations: early validation before requests
- Invalid responses: structured logging with row-level lineage
Track failures in production:
```python
try:
    result = pipeline.collect()
except Exception as e:
    # Fenic surfaces detailed error context; the attributes available
    # depend on the specific error type raised
    print(f"Pipeline failed: {e}")
    print(f"Failed at operator: {getattr(e, 'operator_name', 'unknown')}")
    print(f"Row context: {getattr(e, 'row_data', None)}")
```
Best Practices
Model Selection Strategy
- Use OpenAI nano/mini for cost-sensitive classification tasks
- Use Vertex Flash models for high-throughput bulk processing
- Reserve powerful models (GPT-4o, Gemini Pro) for complex reasoning
- Test with small datasets to validate costs before scaling (see the sketch after this list)
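A sketch of that last point, assuming the DataFrame exposes a limit method for sampling (the sample size and extrapolation are illustrative):

```python
# Run one stage on a small sample and inspect the metrics
sample_metrics = (
    df.limit(100)
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("content"),
            ["technical", "business", "legal"],
            model_alias="nano"
        )
    )
    .collect()
    .metrics()
)

# Rough extrapolation before committing to the full dataset
estimated_full_cost = sample_metrics.lm_metrics.total_cost * (df.count() / 100)
print(f"Estimated full-run cost: ${estimated_full_cost:.2f}")
```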
Configuration Management
- Store configurations in version control
- Use environment variables for API keys (see the sketch after this list)
- Define model aliases that abstract provider details
- Configure rate limits based on your API tier
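One way to combine the environment-variable and rate-limit points, reading throughput settings from the environment so the same configuration adapts to different API tiers (the variable names OPENAI_RPM and OPENAI_TPM are illustrative, not a Fenic convention):

```python
import os

import fenic as fc

config = fc.SessionConfig(
    app_name="configurable_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                # Rate limits come from the environment with sensible defaults
                rpm=int(os.environ.get("OPENAI_RPM", "500")),
                tpm=int(os.environ.get("OPENAI_TPM", "200000")),
            )
        },
        default_language_model="nano",
    ),
)
```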
Performance Optimization
- Filter data early with cheap boolean operations
- Use smaller models for initial classification
- Cache intermediate results for iterative development (see the sketch after this list)
- Monitor metrics to identify expensive operations
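A sketch of the early-filtering and intermediate-caching points, persisting a checkpoint so later iterations skip re-running upstream LLM calls (the word_count column and file path are illustrative):

```python
# Cheap boolean filter plus a small model run first
classified = (
    df.filter(fc.col("word_count") > 100)
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("content"),
            ["technical", "business", "legal"],
            model_alias="nano"
        )
    )
)

# Persist the intermediate result so later experiments start from this file
# instead of re-running the classification step
classified.write.parquet("output/classified_intermediate.parquet")
```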
Production Deployment
- Implement comprehensive error handling
- Set up cost alerts based on token usage (see the sketch after this list)
- Use row-level lineage for debugging
- Test failover strategies between providers
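A simple cost-alert check built on the metrics API shown earlier; the threshold and the alerting hook are placeholders for your own monitoring setup:

```python
COST_ALERT_THRESHOLD = 25.0  # dollars per run; pick a value that fits your budget

result = pipeline.collect()
metrics = result.metrics()

if metrics.lm_metrics.total_cost > COST_ALERT_THRESHOLD:
    # Replace the print with your own alerting integration (Slack, PagerDuty, etc.)
    print(
        f"ALERT: run cost ${metrics.lm_metrics.total_cost:.2f} exceeded "
        f"${COST_ALERT_THRESHOLD:.2f} ({metrics.lm_metrics.total_tokens} tokens)"
    )
```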
Integration with Existing Infrastructure
Fenic integrates seamlessly with existing data infrastructure. Read more about:
- Building reliable AI pipelines with semantic operators
- Creating composable semantic operators for data transformation
- Orchestrating reliable agents with Typedef and LangGraph
- Using Fenic as a data layer for LangChain
Next Steps
Start with simple extraction or classification tasks using a single provider. Test with small datasets to validate results and costs. Scale gradually while monitoring metrics. Add multi-provider strategies as your pipeline grows in complexity.
The declarative DataFrame API abstracts provider-specific implementation details, enabling seamless migration between models as requirements change. This architectural approach provides the flexibility needed for production AI systems.
For complete examples and documentation, visit the Fenic GitHub repository or read the Fenic open source announcement.

