
How to Integrate LLMs into Traditional Data Workflows Seamlessly

Typedef Team

The shift from traditional ETL pipelines to AI-native data processing introduces operational overhead that most teams struggle with. Rate limiting, prompt management, result validation, and cost tracking fragment workflows across custom scripts, inference services, and data warehouses. This guide demonstrates how to integrate LLMs into data pipelines using Fenic, an open-source DataFrame framework designed for this purpose.

The Core Challenge

Traditional data workflows follow predictable patterns: extract data from sources, transform it through SQL or DataFrame operations, and load it into storage systems. This approach breaks down when working with unstructured text, where the most valuable insights require model inference.

Teams typically build custom solutions that juggle:

  • Provider-specific API clients with different rate limits
  • Manual batch processing to stay within quotas
  • Separate validation logic for each inference step
  • Custom retry mechanisms for transient failures
  • Ad-hoc cost and latency tracking

Fenic solves this by treating semantic operations as first-class DataFrame primitives, letting you build AI pipelines with the same declarative patterns used in traditional data engineering.
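
For example, a sentiment label that would otherwise require a custom inference client, batching logic, and retry handling reduces to a single column expression (a minimal sketch; the DataFrame and column names here are illustrative):

python
from fenic.api.functions import col, semantic

# One declarative expression replaces a hand-rolled inference service
labeled_df = feedback_df.with_column(
    "sentiment",
    semantic.map(
        "Classify the sentiment of this feedback as positive, negative, or neutral: {{text}}",
        text=col("feedback_text")
    )
)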

Setting Up Your Environment

Install Fenic and configure a session with your model providers:

bash
pip install fenic

Create a session with model configuration:

python
from fenic.api.session import Session, SessionConfig, SemanticConfig
from fenic.api.session.config import (
    OpenAILanguageModel,
    OpenAIEmbeddingModel,
    AnthropicLanguageModel,
    GoogleDeveloperLanguageModel
)

config = SessionConfig(
    app_name="data_pipeline",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=100_000
            ),
            "gemini": GoogleDeveloperLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=100,
                tpm=1_000_000
            )
        },
        default_language_model="gemini",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=1_000_000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

Fenic handles rate limiting, request batching, and retry logic automatically based on your configured limits.

Loading and Transforming Traditional Data

Start with standard DataFrame operations for structured data:

python
import fenic.api.functions as F
from fenic.api.functions import col

# Load from CSV
df = session.read.csv("s3://my-bucket/customer_feedback.csv")

# Traditional DataFrame operations
df = (df
    .filter(col("rating") < 3)
    .with_column("feedback_length", F.length(col("feedback_text")))
    .with_column("submission_month", F.dt.month(col("submission_date")))
)

Load data from multiple sources:

python
# Parquet files
orders_df = session.read.parquet("s3://warehouse/orders/*.parquet")

# HuggingFace datasets
dataset_df = session.read.csv("hf://datasets/squad/default/train.csv")

# Local directories with automatic metadata
docs_df = session.read.docs(
    "/data/support_tickets/",
    content_type="markdown",
    recursive=True
)

Fenic DataFrames provide the familiar methods: filter, select, group_by, join, with_column, and standard aggregations.
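
Aggregations compose the same way they do in other DataFrame libraries (a brief sketch; the column names are illustrative):

python
# Average rating and feedback volume per month
monthly_stats = df.group_by("submission_month").agg(
    F.avg(col("rating")).alias("avg_rating"),
    F.count("*").alias("feedback_count")
)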

Semantic Text Classification

Use semantic.predicate to filter data based on natural language conditions:

python
from fenic.api.functions import semantic

# Filter feedback mentioning UI problems
ui_issues = df.filter(
    semantic.predicate(
        "Does this feedback mention user interface or navigation problems? Feedback: {{text}}",
        text=col("feedback_text")
    )
)

# Multi-column semantic filtering
urgent_tickets = tickets_df.filter(
    semantic.predicate(
        """
        Subject: {{subject}}
        Description: {{description}}

        This ticket indicates an urgent issue requiring immediate attention.
        """,
        subject=col("ticket_subject"),
        description=col("ticket_description")
    )
)

The predicate function evaluates to boolean values, integrating with standard filter operations:

python
# Combine with traditional filters
critical_feedback = (df
    .filter(col("rating") == 1)
    .filter(
        semantic.predicate(
            "This feedback indicates a critical bug or system failure: {{text}}",
            text=col("feedback_text")
        )
    )
)

Structured Extraction from Unstructured Text

Extract structured data from text using Pydantic schemas:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class SupportIssue(BaseModel):
    category: Literal["bug", "feature_request", "question", "complaint"]
    severity: Literal["low", "medium", "high", "critical"]
    affected_features: List[str] = Field(description="Product features mentioned")
    sentiment: Literal["positive", "neutral", "negative"]

# Extract structured data
extracted_df = df.with_column(
    "parsed_issue",
    semantic.extract(col("feedback_text"), SupportIssue)
)

# Access nested fields
result = (extracted_df
    .with_column("category", col("parsed_issue")["category"])
    .with_column("severity", col("parsed_issue")["severity"])
    .with_column("sentiment", col("parsed_issue")["sentiment"])
)

For advanced nested structures:

python
class Entity(BaseModel):
    name: str = Field(description="Entity name")
    entity_type: str = Field(description="Type of entity")

class KnowledgeGraph(BaseModel):
    entities: List[Entity] = Field(description="Entities found in text")
    key_topics: List[str] = Field(description="Main topics discussed")
    actionable_items: List[str] = Field(description="Actions mentioned")

# Extract knowledge graph
knowledge_df = df.with_column(
    "kg",
    semantic.extract(col("document_text"), KnowledgeGraph)
)

# Explode entities into separate rows
entities_df = (knowledge_df
    .select(col("document_id"), col("kg")["entities"].alias("entity"))
    .explode("entity")
    .select(
        col("document_id"),
        col("entity")["name"].alias("entity_name"),
        col("entity")["entity_type"].alias("entity_type")
    )
)

Text Generation and Summarization

Generate new text based on existing columns:

python
# Generate product descriptions
products_df = products_df.with_column(
    "marketing_copy",
    semantic.map(
        """
        Product: {{name}}
        Features: {{features}}
        Target Audience: {{audience}}

        Write compelling marketing copy that highlights key benefits.
        """,
        name=col("product_name"),
        features=col("feature_list"),
        audience=col("target_demographic")
    )
)

# Multi-column summarization
summary_df = tickets_df.with_column(
    "executive_summary",
    semantic.map(
        """
        Ticket ID: {{id}}
        Customer Tier: {{tier}}
        Issue: {{description}}
        History: {{previous_interactions}}

        Provide a 2-sentence executive summary for management review.
        """,
        id=col("ticket_id"),
        tier=col("customer_tier"),
        description=col("full_description"),
        previous_interactions=col("interaction_history")
    )
)

Control model behavior with temperature and output tokens:

python
creative_content = df.with_column(
    "creative_description",
    semantic.map(
        "Transform this technical spec into creative marketing content: {{spec}}",
        spec=col("technical_specification"),
        temperature=0.7,
        max_output_tokens=200
    )
)

Semantic Joins Across Datasets

Join datasets based on semantic similarity rather than exact matches:

python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Join job postings with candidate resumes
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="5 years Python, FastAPI, PostgreSQL, building scalable backend services",
    right="Senior Backend Engineer - Python expertise required",
    output=True
))
examples.create_example(JoinExample(
    left="Frontend developer with React and TypeScript",
    right="Senior Backend Engineer - Python expertise required",
    output=False
))

matched_candidates = jobs_df.semantic.join(
    other=resumes_df,
    predicate="""
    Job Requirements: {{left_on}}
    Candidate Background: {{right_on}}

    The candidate's experience closely matches the job requirements.
    """,
    left_on=col("job_description"),
    right_on=col("resume_text"),
    examples=examples
)

For similarity-based matching using embeddings:

python
# Find similar support tickets
similar_tickets = new_tickets_df.semantic.sim_join(
    historical_tickets_df,
    left_on=semantic.embed(col("ticket_description")),
    right_on=semantic.embed(col("historical_description")),
    k=3,
    similarity_metric="cosine",
    similarity_score_column="match_score"
)

Building Production Pipelines

Combine operations into complete pipelines:

python
# End-to-end customer feedback pipeline
pipeline_df = (
    session.read.csv("s3://data/feedback.csv")

    # Traditional filtering
    .filter(col("submission_date") >= "2024-01-01")
    .filter(col("feedback_text").isNotNull())

    # Extract structured information
    .with_column(
        "analysis",
        semantic.extract(col("feedback_text"), SupportIssue)
    )
    .with_column("category", col("analysis")["category"])
    .with_column("severity", col("analysis")["severity"])

    # Generate responses for high-priority items
    .with_column(
        "draft_response",
        F.when(
            col("severity").isin(["high", "critical"]),
            semantic.map(
                "Draft a professional response to this {{severity}} {{category}}: {{text}}",
                severity=col("severity"),
                category=col("category"),
                text=col("feedback_text")
            )
        ).otherwise(F.lit(None))
    )

    # Calculate metrics
    .with_column("processing_date", F.current_timestamp())
)

# Write results
pipeline_df.write.save_as_table("processed_feedback", mode="overwrite")

Track inference costs and performance:

python
# Query local metrics
metrics_df = session.table("fenic_system.query_metrics")

# Analyze costs by model
cost_analysis = session.sql("""
    SELECT
        model,
        COUNT(*) as request_count,
        SUM(total_lm_cost) as total_cost,
        AVG(total_lm_cost) as avg_cost_per_request,
        SUM(total_lm_requests) as total_model_calls
    FROM {metrics}
    WHERE CAST(end_ts AS TIMESTAMP) > CAST('2024-01-01' AS TIMESTAMP)
    GROUP BY model
    ORDER BY total_cost DESC
""", metrics=metrics_df)

cost_analysis.show()

Embedding-Based Operations

Generate embeddings for semantic search:

python
# Add embeddings to documents
docs_with_embeddings = docs_df.with_column(
    "doc_embedding",
    semantic.embed(col("document_text"))
)

# Cluster documents
clustered_docs = docs_with_embeddings.semantic.with_cluster_labels(
    by=col("doc_embedding"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_stats = clustered_docs.group_by("cluster_id").agg(
    F.count("*").alias("doc_count"),
    F.collect_list("document_title").alias("sample_titles")
)

Calculate similarity between embeddings:

python
from fenic.api.functions import embedding

# Compare documents to a query
query_vector = [0.1, 0.2, ...]  # Your query embedding

results = docs_with_embeddings.with_column(
    "relevance_score",
    embedding.compute_similarity(
        col("doc_embedding"),
        query_vector,
        metric="cosine"
    )
).order_by(F.desc("relevance_score"))

Model Context Protocol Integration

Build agentic applications by exposing DataFrames as MCP tools:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam

# Register a dataset as a tool
df.write.save_as_table("customer_data", mode="overwrite")
session.catalog.set_table_description(
    "customer_data",
    "Customer records including purchase history and feedback"
)

# Create a custom tool
from fenic.api.functions import tool_param
from fenic.core.types import StringType, IntegerType

search_results_df = (
    session.table("customer_data")
    .filter(col("customer_tier") == tool_param("tier", StringType))
    .limit(tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search customers by tier and return top results",
    tool_query=search_results_df,
    tool_params=[
        ToolParam(
            name="tier",
            description="Customer tier to filter by",
            allowed_values=["free", "pro", "enterprise"]
        ),
        ToolParam(
            name="limit",
            description="Maximum results to return",
            has_default=True,
            default_value=10
        )
    ],
    result_limit=50
)

# Launch MCP server
tools = session.catalog.list_tools()
server = create_mcp_server(
    session,
    "DataPipeline Server",
    user_defined_tools=tools,
    concurrency_limit=8
)
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Agents can now query your data pipeline through the MCP protocol.
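
For a quick smoke test, you can post a JSON-RPC tools/call request directly to the HTTP endpoint (a minimal sketch; with stateless_http=True no session handshake should be needed, but exact headers and response framing depend on the MCP transport version):

python
import requests

# Invoke the search_customers tool over the streamable HTTP transport
response = requests.post(
    "http://127.0.0.1:8000/mcp",
    json={
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "search_customers",
            "arguments": {"tier": "enterprise", "limit": 5}
        }
    },
    headers={"Accept": "application/json, text/event-stream"}
)
print(response.text)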

Asynchronous UDFs for External APIs

Integrate external services with async operations:

python
import fenic as fc
from fenic.core.types import IntegerType
import aiohttp

@fc.async_udf(
    return_type=IntegerType,
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def enrich_with_external_api(customer_id: int) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example.com/credit-score/{customer_id}"
        ) as response:
            data = await response.json()
            return data["score"]

# Apply to DataFrame
enriched_df = customers_df.select(
    col("customer_id"),
    col("name"),
    enrich_with_external_api(col("customer_id")).alias("credit_score")
)

This maintains row order and handles failures gracefully while maximizing throughput.

Practical Patterns for Production

Pattern 1: Batch Processing with Cost Control

python
# Process in batches with cost tracking
def process_batch(batch_df, model_alias):
    return (batch_df
        .with_column(
            "summary",
            semantic.map(
                "Summarize: {{text}}",
                text=col("content"),
                model_alias=model_alias,
                max_output_tokens=100
            )
        )
    )

# Monitor costs per batch. Snapshot already-recorded query IDs first,
# because session.table() is lazy and both reads would otherwise see the new rows
(session.table("fenic_system.query_metrics")
    .select(col("query_id"))
    .write.save_as_table("metrics_snapshot", mode="overwrite"))

# collect() triggers execution; gemini is the cheaper model
result = process_batch(df, "gemini").collect()

# Calculate batch cost from queries not in the snapshot
batch_cost = session.sql("""
    SELECT SUM(total_lm_cost) as batch_cost
    FROM {after}
    WHERE query_id NOT IN (SELECT query_id FROM {before})
""",
    after=session.table("fenic_system.query_metrics"),
    before=session.table("metrics_snapshot")
)

Pattern 2: Incremental Processing

python
from fenic.api.functions import current_timestamp

# Track processed records
def process_incremental(table_name, timestamp_col):
    # Get the last processed timestamp as a Python scalar
    # (materialized via to_pydict() here; adapt if you prefer another collection method)
    processed_table = session.table(f"{table_name}_processed")
    last_processed = session.sql(
        "SELECT MAX(processed_at) AS max_ts FROM {t}", t=processed_table
    ).to_pydict()["max_ts"][0]

    # Process only records newer than the last run
    new_records = (
        session.table(table_name)
        .filter(col(timestamp_col) > last_processed)
        .with_column("processed_at", current_timestamp())
    )

    # Apply semantic operations
    processed = new_records.with_column(
        "classification",
        semantic.predicate("Is this urgent? {{text}}", text=col("content"))
    )

    # Append to processed table
    processed.write.save_as_table(
        f"{table_name}_processed",
        mode="append"
    )
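
# Example invocation (table and column names are illustrative)
process_incremental("support_tickets", "created_at")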

Pattern 3: Multi-Model Routing

python
# Route to appropriate model based on complexity
def smart_routing(df):
    # Use fast model for simple cases
    simple_cases = (
        df.filter(F.length(col("text")) < 500)
        .with_column(
            "result",
            semantic.map("Classify: {{text}}", text=col("text"), model_alias="gemini")
        )
    )

    # Use powerful model for complex cases
    complex_cases = (
        df.filter(F.length(col("text")) >= 500)
        .with_column(
            "result",
            semantic.map("Classify: {{text}}", text=col("text"), model_alias="gpt4")
        )
    )

    return simple_cases.union(complex_cases)
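
# Example invocation (assumes a feedback_df with a "text" column)
routed_df = smart_routing(feedback_df)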

Performance Considerations

Fenic optimizes inference operations automatically:

Batch Processing: Semantic operations batch rows together for efficient API usage.

Automatic Retry: Transient failures retry with exponential backoff without manual intervention.

Rate Limiting: Configured rate limits prevent quota exhaustion across all operations.

Concurrent Execution: Async operations maximize throughput while respecting concurrency limits.

Query Optimization: The query planner optimizes DataFrame operations before execution.
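
To see what the planner will do before any model calls are made, you can print the logical plan (a quick sketch, assuming fenic's PySpark-style explain() method):

python
# Inspect the optimized plan without triggering inference
pipeline_df.explain()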

Monitor performance using the metrics table:

python
# Identify slow operations
slow_queries = session.sql("""
    SELECT
        query_id,
        CAST((julianday(end_ts) - julianday(start_ts)) * 86400 AS INTEGER) as duration_seconds,
        total_lm_requests,
        total_lm_cost
    FROM {metrics}
    WHERE duration_seconds > 30
    ORDER BY duration_seconds DESC
    LIMIT 10
""", metrics=session.table("fenic_system.query_metrics"))

Best Practices

1. Configure rate limits accurately: Set RPM and TPM based on your provider tier to prevent throttling.

2. Use appropriate models: Route simple tasks to fast, inexpensive models and complex tasks to more capable models.

3. Validate extraction schemas: Include clear field descriptions in Pydantic models for better extraction quality.

4. Monitor costs continuously: Query the metrics table regularly to track spending patterns.

5. Handle null values explicitly: Set strict=True in semantic operations to skip rows with null inputs.

6. Batch operations when possible: Combine multiple semantic steps in a single pipeline for better performance.

7. Save intermediate results: Write DataFrames to tables at key pipeline stages for debugging and recovery.

python
# Save checkpoints
(df
    .filter(col("status") == "pending")
    .write.save_as_table("checkpoint_filtered", mode="overwrite")
)

processed = (
    session.table("checkpoint_filtered")
    .with_column("result", semantic.extract(col("text"), Schema))
)

Development to Production

Develop locally and deploy to cloud without code changes:

python
from fenic.api.session.config import CloudConfig, CloudExecutorSize

# Local development
local_config = SessionConfig(
    app_name="my_pipeline",
    semantic=SemanticConfig(
        language_models={"gpt4": OpenAILanguageModel(...)}
    )
)

# Production with cloud execution
production_config = SessionConfig(
    app_name="my_pipeline",
    semantic=SemanticConfig(
        language_models={"gpt4": OpenAILanguageModel(...)}
    ),
    cloud=CloudConfig(size=CloudExecutorSize.MEDIUM)
)

The same pipeline code runs in both environments. Learn more about Typedef cloud for production deployment.

Additional Resources

Fenic provides the infrastructure layer for integrating LLMs into data workflows without operational overhead. By treating semantic operations as DataFrame primitives, teams can build maintainable AI pipelines using familiar data engineering patterns.
