How to Integrate OpenAI and Google Vertex Models into Data Pipelines

Typedef Team

Integrating multiple LLM providers into data pipelines requires infrastructure that treats model inference as a first-class operation rather than an external API call. Fenic, Typedef's open-source DataFrame framework, enables production-ready multi-provider integration with declarative configuration, automatic batching, and unified semantic operators.

This guide provides implementation patterns for integrating OpenAI and Google Vertex AI models into data transformation pipelines using Fenic's inference-first architecture.

Prerequisites and Installation

Fenic requires Python 3.10, 3.11, or 3.12. Install the framework:

bash
pip install fenic

Set up provider API keys in your environment:

bash
export OPENAI_API_KEY="your-openai-api-key"
export GOOGLE_API_KEY="your-google-api-key"  # For Google Developer API
# For Google Vertex AI, configure gcloud authentication
gcloud auth application-default login

Configuring OpenAI Models

OpenAI models in Fenic use the OpenAILanguageModel configuration class, which requires the model name and rate limiting parameters.

Basic OpenAI Configuration

python
import fenic as fc

config = fc.SessionConfig(
    app_name="openai_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt-nano": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,  # Requests per minute
                tpm=200_000  # Tokens per minute
            ),
            "gpt-mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="gpt-nano"
    )
)

session = fc.Session.get_or_create(config)

The rpm and tpm parameters define rate limits that Fenic enforces automatically through self-throttling and request batching, keeping requests under provider quotas while maximizing throughput.

OpenAI Reasoning Models with Profiles

For OpenAI's o-series reasoning models, configure profiles to control reasoning effort:

python
config = fc.SessionConfig(
    app_name="reasoning_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "o4-mini": fc.OpenAILanguageModel(
                model_name="o4-mini",
                rpm=100,
                tpm=100_000,
                profiles={
                    "fast": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="low"
                    ),
                    "thorough": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="high"
                    )
                },
                default_profile="fast"
            )
        },
        default_language_model="o4-mini"
    )
)

Profiles enable dynamic model behavior selection within pipelines without changing code:

python
# Use default "fast" profile
df.with_column(
    "analysis",
    fc.semantic.map(
        "Analyze {{ text }}",
        text=fc.col("content"),
        model_alias="o4-mini"
    )
)

# Override to "thorough" profile for complex cases
df.with_column(
    "analysis",
    fc.semantic.map(
        "Analyze {{ text }}",
        text=fc.col("content"),
        model_alias=fc.ModelAlias(name="o4-mini", profile="thorough")
    )
)

Configuring Google Vertex AI Models

Google Vertex AI integration requires two steps: authentication setup and model configuration.

Authentication Setup

Vertex AI uses Google Cloud credentials. Configure authentication:

bash
# Set up application default credentials
gcloud auth application-default login

# Or set service account key explicitly
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"

Basic Vertex AI Configuration

python
config = fc.SessionConfig(
    app_name="vertex_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini-flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            ),
            "gemini-pro": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.5-flash",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="gemini-flash"
    )
)

session = fc.Session.get_or_create(config)

Vertex AI Reasoning Models with Thinking Budgets

Gemini 2.5 and later models support thinking token budgets for enhanced reasoning:

python
config = fc.SessionConfig(
    app_name="gemini_reasoning",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini-reasoning": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.5-flash",
                rpm=100,
                tpm=100_000,
                profiles={
                    "thinking_disabled": fc.GoogleVertexLanguageModel.Profile(),
                    "fast": fc.GoogleVertexLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.GoogleVertexLanguageModel.Profile(
                        thinking_token_budget=8192
                    )
                },
                default_profile="fast"
            )
        },
        default_language_model="gemini-reasoning"
    )
)
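
Profile selection works the same way as with the OpenAI reasoning models above. A minimal sketch, assuming the df and content column from the earlier examples:

python
# Default "fast" profile (1,024-token thinking budget)
df = df.with_column(
    "analysis",
    fc.semantic.map(
        "Analyze {{ text }}",
        text=fc.col("content"),
        model_alias="gemini-reasoning"
    )
)

# Escalate to the "thorough" profile (8,192-token budget) for harder rows
df = df.with_column(
    "deep_analysis",
    fc.semantic.map(
        "Analyze {{ text }}",
        text=fc.col("content"),
        model_alias=fc.ModelAlias(name="gemini-reasoning", profile="thorough")
    )
)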

Multi-Provider Pipeline Configuration

Production pipelines benefit from a deliberate multi-provider strategy. Configure multiple providers with model aliases that abstract implementation details:

python
config = fc.SessionConfig(
    app_name="multi_provider_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            # OpenAI models
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            ),
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            ),

            # Google Vertex models
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            ),
            "flash-lite": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=400,
                tpm=200_000
            )
        },
        default_language_model="flash-lite"
    )
)

session = fc.Session.get_or_create(config)

This configuration enables cost and performance optimization by routing different operations to appropriate models:

python
# Use cheap model for simple classification
df = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("content"),
        ["technical", "business", "personal"],
        model_alias="nano"
    )
)

# Use fast model for bulk extraction
df = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("content"), DataSchema, model_alias="flash-lite")
)

# Use powerful model for complex reasoning
df = df.with_column(
    "analysis",
    fc.semantic.map(
        "Provide detailed analysis: {{ text }}",
        text=fc.col("content"),
        model_alias="mini"
    )
)

Implementing Schema-Driven Extraction

The semantic.extract operator transforms unstructured text into structured data using Pydantic schemas. This works identically across OpenAI and Google Vertex models:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class Issue(BaseModel):
    category: Literal["bug", "feature", "question"]
    severity: Literal["low", "medium", "high"]
    description: str = Field(description="Brief issue description")

class SupportTicket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    issues: List[Issue]
    sentiment: Literal["positive", "neutral", "negative"]

# Works with both OpenAI and Vertex models
tickets_openai = (
    df.with_column(
        "parsed",
        fc.semantic.extract(fc.col("raw_text"), SupportTicket, model_alias="mini")
    )
    .unnest("parsed")
)

tickets_vertex = (
    df.with_column(
        "parsed",
        fc.semantic.extract(fc.col("raw_text"), SupportTicket, model_alias="flash")
    )
    .unnest("parsed")
)

The schema drives extraction quality. Clear field descriptions improve results:

python
class Document(BaseModel):
    title: str = Field(description="Document title or heading")
    summary: str = Field(
        description="2-3 sentence summary covering main points"
    )
    entities: List[str] = Field(
        description="Named entities: people, organizations, locations"
    )
    key_dates: List[str] = Field(
        description="Important dates mentioned in ISO format YYYY-MM-DD"
    )
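
A sketch of extraction with this schema, assuming a docs DataFrame with a content column and the flash alias from the multi-provider configuration above:

python
documents = (
    docs.with_column(
        "parsed",
        fc.semantic.extract(fc.col("content"), Document, model_alias="flash")
    )
    .unnest("parsed")
    # Flatten the extracted fields into regular columns for downstream use
    .select(
        fc.col("title"),
        fc.col("summary"),
        fc.col("entities"),
        fc.col("key_dates")
    )
)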

Building Semantic Filters with Predicates

The semantic.predicate operator enables natural language filtering across both providers:

python
# Filter with OpenAI
filtered_openai = df.filter(
    (fc.col("score") > 50) &
    fc.semantic.predicate(
        "Does {{ resume }} show experience with distributed systems?",
        resume=fc.col("resume_text"),
        model_alias="nano"
    )
)

# Filter with Vertex AI
filtered_vertex = df.filter(
    (fc.col("years") > 5) &
    fc.semantic.predicate(
        "Does {{ feedback }} mention UI/UX problems?",
        feedback=fc.col("customer_feedback"),
        model_alias="flash-lite"
    )
)

Combine traditional filters with semantic predicates to optimize costs. Run cheap boolean filters first to reduce data volume before expensive LLM operations.
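
A sketch of that ordering, reusing the score and resume columns from the OpenAI filter example above:

python
# Cheap boolean filter first: shrinks the candidate set before any LLM calls
candidates = df.filter(fc.col("score") > 50)

# The semantic predicate then runs only on the surviving rows
qualified = candidates.filter(
    fc.semantic.predicate(
        "Does {{ resume }} show experience with distributed systems?",
        resume=fc.col("resume_text"),
        model_alias="nano"
    )
)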

Implementing Semantic Transformations

The semantic.map operator applies natural language transformations:

python
# Summarization with OpenAI
df_summarized = df.with_column(
    "summary",
    fc.semantic.map(
        "Summarize in 2 sentences: {{ article }}",
        article=fc.col("full_text"),
        model_alias="mini"
    )
)

# Translation with Vertex AI
df_translated = df.with_column(
    "translated",
    fc.semantic.map(
        "Translate to Spanish: {{ text }}",
        text=fc.col("english_text"),
        model_alias="flash"
    )
)

Use structured outputs with response_format parameter:

python
class Summary(BaseModel):
    headline: str = Field(description="Compelling headline")
    bullets: List[str] = Field(description="3-5 key points")
    sentiment: Literal["positive", "neutral", "negative"]

df_structured = df.with_column(
    "structured_summary",
    fc.semantic.map(
        "Analyze {{ content }}",
        content=fc.col("article"),
        response_format=Summary,
        model_alias="mini"
    )
)

Advanced: Semantic Joins Across Providers

Semantic joins match DataFrames based on meaning rather than exact values:

python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Define examples to guide matching
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="5 years Python backend development, FastAPI, PostgreSQL",
    right="Senior Backend Engineer - Python expertise required",
    output=True
))
examples.create_example(JoinExample(
    left="3 years frontend React development",
    right="Senior Backend Engineer",
    output=False
))

# Semantic join with OpenAI
matched_openai = applicants.semantic.join(
    other=jobs,
    predicate="""
    Candidate: {{ left_on }}
    Job: {{ right_on }}
    Is the candidate qualified for this role?
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description"),
    examples=examples,
    model_alias="mini"
)

# Same join logic with Vertex AI
matched_vertex = applicants.semantic.join(
    other=jobs,
    predicate="""
    Candidate: {{ left_on }}
    Job: {{ right_on }}
    Is the candidate qualified for this role?
    """,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description"),
    examples=examples,
    model_alias="flash"
)

Processing PDFs with Google Vertex Native Support

Fenic 0.5.0 added native PDF parsing for Google's Gemini models; the example below uses the Google Developer API configuration:

python
config = fc.SessionConfig(
    app_name="pdf_processing",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini": fc.GoogleDeveloperLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=100,
                tpm=1000
            )
        },
        default_language_model="gemini"
    )
)

session = fc.Session.get_or_create(config)

# Read PDF metadata
pdfs = session.read.pdf_metadata("data/docs/**/*.pdf", recursive=True)

# Parse PDFs to markdown with page chunking
markdown = pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("markdown")
)

Google Vertex models use the native file API for PDFs, providing optimized token accounting and improved batching. OpenAI models work via standard text prompting after PDF text extraction.
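
For the OpenAI path, one option is to extract the PDF text outside Fenic and pass it in as an ordinary text column. A minimal sketch, assuming pypdf for text extraction and a create_dataframe constructor on the session; the file paths are illustrative:

python
from pypdf import PdfReader

def pdf_to_text(path: str) -> str:
    # Concatenate the text layer of each page; no OCR or image handling
    reader = PdfReader(path)
    return "\n".join(page.extract_text() or "" for page in reader.pages)

paths = ["data/docs/report_q1.pdf", "data/docs/report_q2.pdf"]
df = session.create_dataframe(
    {"file_path": paths, "content": [pdf_to_text(p) for p in paths]}
)

# Standard text prompting against an OpenAI alias from the earlier
# multi-provider configuration
summaries = df.with_column(
    "summary",
    fc.semantic.map(
        "Summarize this document: {{ text }}",
        text=fc.col("content"),
        model_alias="mini"
    )
)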

Rate Limiting and Cost Management

Fenic automatically handles rate limiting and batching for both providers. The framework:

  • Tracks token usage in real-time
  • Self-throttles when approaching limits
  • Batches requests for optimal throughput
  • Implements exponential backoff for retries
  • Caches identical inference calls within sessions

Monitor costs and performance:

python
result = pipeline.collect()
metrics = result.metrics()

# Overall metrics
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Execution time: {metrics.execution_time}s")

# Per-operator metrics
for op in metrics.operator_metrics:
    print(f"{op.name}: ${op.cost}, {op.latency}ms")

Production Pipeline Example

Complete pipeline using both OpenAI and Google Vertex models:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class DocumentMetadata(BaseModel):
    title: str
    category: Literal["technical", "business", "legal"]
    entities: List[str]

class ContentAnalysis(BaseModel):
    summary: str = Field(description="2-3 sentence summary")
    sentiment: Literal["positive", "neutral", "negative"]
    key_topics: List[str]

config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "classifier": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            ),
            "extractor": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=400,
                tpm=200_000
            ),
            "analyzer": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="extractor"
    )
)

session = fc.Session.get_or_create(config)

# Load raw documents
df = session.read.docs("data/documents/*.txt", content_type="markdown")

# Multi-stage pipeline with strategic model selection
processed = (
    df
    # Quick classification with cheap model
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("content"),
            ["technical", "business", "legal"],
            model_alias="classifier"
        )
    )
    # Filter early to reduce downstream processing
    .filter(fc.col("category").isin(["technical", "business"]))
    # Extract structured metadata with fast model
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("content"), DocumentMetadata, model_alias="extractor")
    )
    .unnest("metadata")
    # Deep analysis with more capable model
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("content"), ContentAnalysis, model_alias="analyzer")
    )
    .unnest("analysis")
)

# Write results
processed.write.parquet("output/processed_documents.parquet")

# Review metrics
metrics = processed.collect().metrics()
print(f"Pipeline cost: ${metrics.lm_metrics.total_cost}")
print(f"Total documents processed: {processed.count()}")

Hybrid Provider Strategies

Route operations strategically based on model strengths:

python
# OpenAI excels at structured extraction
structured_data = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), ComplexSchema, model_alias="mini")
)

# Vertex AI provides fast bulk processing
bulk_classified = df.with_column(
    "category",
    fc.semantic.classify(fc.col("text"), categories, model_alias="flash-lite")
)

# Use appropriate model for task complexity
simple_summaries = df.with_column(
    "summary",
    fc.semantic.map("Summarize: {{ text }}", text=fc.col("content"), model_alias="nano")
)

detailed_analysis = df.with_column(
    "analysis",
    fc.semantic.map(
        "Provide detailed analysis: {{ text }}",
        text=fc.col("content"),
        model_alias="mini"
    )
)

Error Handling and Resilience

Fenic handles provider-specific error patterns automatically:

  • OpenAI 429 quota exhausted: fails fast with clear errors
  • Vertex AI transient failures: automatic retry with exponential backoff
  • Token capacity violations: early validation before requests
  • Invalid responses: structured logging with row-level lineage

Track failures in production:

python
try:
    result = pipeline.collect()
except Exception as e:
    # Fenic surfaces detailed error context; which attributes are present
    # depends on the exception type raised
    print(f"Pipeline failed: {e}")
    print(f"Failed at operator: {getattr(e, 'operator_name', 'unknown')}")
    print(f"Row context: {getattr(e, 'row_data', 'n/a')}")

Best Practices

Model Selection Strategy

  • Use OpenAI nano/mini for cost-sensitive classification tasks
  • Use Vertex Flash models for high-throughput bulk processing
  • Reserve powerful models (GPT-4o, Gemini Pro) for complex reasoning
  • Test with small datasets to validate costs before scaling (see the sketch after this list)
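
A minimal sketch of that small-scale validation, assuming the DataFrame API exposes a limit operation and reusing the metrics interface shown earlier:

python
# Run the pipeline on a small sample and inspect cost before scaling up
sample_result = (
    df.limit(100)
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("content"),
            ["technical", "business", "legal"],
            model_alias="classifier"
        )
    )
    .collect()
)

sample_cost = sample_result.metrics().lm_metrics.total_cost
print(f"Cost for 100-row sample: ${sample_cost}")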

Configuration Management

  • Store configurations in version control
  • Use environment variables for API keys
  • Define model aliases that abstract provider details
  • Configure rate limits based on your API tier (see the sketch after this list)
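
One way to keep rate limits tied to the deployment rather than the code is to read them from the environment. A sketch using only the configuration classes shown above; the OPENAI_RPM and OPENAI_TPM variable names are illustrative:

python
import os

import fenic as fc

config = fc.SessionConfig(
    app_name="configurable_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                # Limits come from the environment so each deployment can
                # match its own API tier without code changes
                rpm=int(os.environ.get("OPENAI_RPM", "300")),
                tpm=int(os.environ.get("OPENAI_TPM", "150000"))
            )
        },
        default_language_model="mini"
    )
)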

Performance Optimization

  • Filter data early with cheap boolean operations
  • Use smaller models for initial classification
  • Cache intermediate results for iterative development
  • Monitor metrics to identify expensive operations

Production Deployment

  • Implement comprehensive error handling
  • Set up cost alerts based on token usage (see the sketch after this list)
  • Use row-level lineage for debugging
  • Test failover strategies between providers
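
A sketch of a simple cost guard built on the metrics interface shown earlier; the budget value and the alert action are placeholders for your own monitoring integration:

python
COST_BUDGET_USD = 5.00  # placeholder per-run budget

result = processed.collect()
metrics = result.metrics()

if metrics.lm_metrics.total_cost > COST_BUDGET_USD:
    # Replace the print with your alerting hook (Slack webhook, PagerDuty, etc.)
    print(
        f"ALERT: run cost ${metrics.lm_metrics.total_cost:.2f} exceeded "
        f"budget ${COST_BUDGET_USD:.2f} "
        f"({metrics.lm_metrics.total_tokens} tokens)"
    )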

Integration with Existing Infrastructure

Fenic integrates with existing data infrastructure: pipelines read plain text, markdown, and PDF inputs and write results to Parquet, as shown in the examples above, so outputs flow directly into downstream tools that already consume columnar data.

Next Steps

Start with simple extraction or classification tasks using a single provider. Test with small datasets to validate results and costs. Scale gradually while monitoring metrics. Add multi-provider strategies as your pipeline grows in complexity.

The declarative DataFrame API abstracts provider-specific implementation details, enabling seamless migration between models as requirements change. This architectural approach provides the flexibility needed for production AI systems.

For complete examples and documentation, visit the Fenic GitHub repository or read the Fenic open source announcement.
