The shift from traditional ETL pipelines to AI-native data processing introduces operational overhead that most teams struggle with. Rate limiting, prompt management, result validation, and cost tracking fragment workflows across custom scripts, inference services, and data warehouses. This guide demonstrates how to integrate LLMs into data pipelines using Fenic, an open-source DataFrame framework designed for this purpose.
The Core Challenge
Traditional data workflows follow predictable patterns: extract data from sources, transform it through SQL or DataFrame operations, and load it into storage systems. This approach breaks down when working with unstructured text, where the most valuable insights require model inference.
Teams typically build custom solutions that juggle:
- Provider-specific API clients with different rate limits
- Manual batch processing to stay within quotas
- Separate validation logic for each inference step
- Custom retry mechanisms for transient failures
- Ad-hoc cost and latency tracking
Fenic solves this by treating semantic operations as first-class DataFrame primitives, letting you build AI pipelines with the same declarative patterns used in traditional data engineering.
Setting Up Your Environment
Install Fenic and configure a session with your model providers:
```bash
pip install fenic
```
Create a session with model configuration:
```python
from fenic.api.session import Session, SessionConfig, SemanticConfig
from fenic.api.session.config import (
    OpenAILanguageModel,
    OpenAIEmbeddingModel,
    AnthropicLanguageModel,
    GoogleDeveloperLanguageModel
)

config = SessionConfig(
    app_name="data_pipeline",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=100_000
            ),
            "gemini": GoogleDeveloperLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=100,
                tpm=1_000_000
            )
        },
        default_language_model="gemini",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=1_000_000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Fenic handles rate limiting, request batching, and retry logic automatically based on your configured limits.
Loading and Transforming Traditional Data
Start with standard DataFrame operations for structured data:
```python
import fenic.api.functions as F
from fenic.api.functions import col

# Load from CSV
df = session.read.csv("s3://my-bucket/customer_feedback.csv")

# Traditional DataFrame operations
df = (df
    .filter(col("rating") < 3)
    .with_column("feedback_length", F.length(col("feedback_text")))
    .with_column("submission_month", F.dt.month(col("submission_date")))
)
```
Load data from multiple sources:
```python
# Parquet files
orders_df = session.read.parquet("s3://warehouse/orders/*.parquet")

# HuggingFace datasets
dataset_df = session.read.csv("hf://datasets/squad/default/train.csv")

# Local directories with automatic metadata
docs_df = session.read.docs(
    "/data/support_tickets/",
    content_type="markdown",
    recursive=True
)
```
The session provides familiar DataFrame methods: filter, select, group_by, join, with_column, and standard aggregations.
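For example, aggregations and joins compose just as they would in any DataFrame library. A minimal sketch (the `orders_df`/`customers_df` tables and their columns are hypothetical, and `F.sum` is assumed to mirror the `F.count`/`F.collect_list` aggregates used later in this guide):

```python
# Hypothetical inputs: orders_df (customer_id, amount), customers_df (customer_id, region)
order_totals = (
    orders_df
    .group_by("customer_id")
    .agg(F.sum("amount").alias("total_spent"))
)

# Join the aggregates back to customer attributes and keep high spenders
top_customers = (
    customers_df
    .join(order_totals, on="customer_id")
    .filter(col("total_spent") > 1_000)
    .select("customer_id", "region", "total_spent")
)
```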
Semantic Text Classification
Use semantic.predicate to filter data based on natural language conditions:
```python
from fenic.api.functions import semantic

# Filter feedback mentioning UI problems
ui_issues = df.filter(
    semantic.predicate(
        "Does this feedback mention user interface or navigation problems? Feedback: {{text}}",
        text=col("feedback_text")
    )
)

# Multi-column semantic filtering
urgent_tickets = tickets_df.filter(
    semantic.predicate(
        """
        Subject: {{subject}}
        Description: {{description}}
        This ticket indicates an urgent issue requiring immediate attention.
        """,
        subject=col("ticket_subject"),
        description=col("ticket_description")
    )
)
```
The predicate function evaluates to boolean values, integrating with standard filter operations:
```python
# Combine with traditional filters
critical_feedback = (df
    .filter(col("rating") == 1)
    .filter(
        semantic.predicate(
            "This feedback indicates a critical bug or system failure: {{text}}",
            text=col("feedback_text")
        )
    )
)
```
Structured Extraction from Unstructured Text
Extract structured data from text using Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class SupportIssue(BaseModel):
    category: Literal["bug", "feature_request", "question", "complaint"]
    severity: Literal["low", "medium", "high", "critical"]
    affected_features: List[str] = Field(description="Product features mentioned")
    sentiment: Literal["positive", "neutral", "negative"]

# Extract structured data
extracted_df = df.with_column(
    "parsed_issue",
    semantic.extract(col("feedback_text"), SupportIssue)
)

# Access nested fields
result = (extracted_df
    .with_column("category", col("parsed_issue")["category"])
    .with_column("severity", col("parsed_issue")["severity"])
    .with_column("sentiment", col("parsed_issue")["sentiment"])
)
```
For advanced nested structures:
```python
class Entity(BaseModel):
    name: str = Field(description="Entity name")
    entity_type: str = Field(description="Type of entity")

class KnowledgeGraph(BaseModel):
    entities: List[Entity] = Field(description="Entities found in text")
    key_topics: List[str] = Field(description="Main topics discussed")
    actionable_items: List[str] = Field(description="Actions mentioned")

# Extract knowledge graph
knowledge_df = df.with_column(
    "kg",
    semantic.extract(col("document_text"), KnowledgeGraph)
)

# Explode entities into separate rows
entities_df = (knowledge_df
    .select(col("document_id"), col("kg")["entities"].alias("entity"))
    .explode("entity")
    .select(
        col("document_id"),
        col("entity")["name"].alias("entity_name"),
        col("entity")["entity_type"].alias("entity_type")
    )
)
```
Text Generation and Summarization
Generate new text based on existing columns:
```python
# Generate product descriptions
products_df = products_df.with_column(
    "marketing_copy",
    semantic.map(
        """
        Product: {{name}}
        Features: {{features}}
        Target Audience: {{audience}}

        Write compelling marketing copy that highlights key benefits.
        """,
        name=col("product_name"),
        features=col("feature_list"),
        audience=col("target_demographic")
    )
)

# Multi-column summarization
summary_df = tickets_df.with_column(
    "executive_summary",
    semantic.map(
        """
        Ticket ID: {{id}}
        Customer Tier: {{tier}}
        Issue: {{description}}
        History: {{previous_interactions}}

        Provide a 2-sentence executive summary for management review.
        """,
        id=col("ticket_id"),
        tier=col("customer_tier"),
        description=col("full_description"),
        previous_interactions=col("interaction_history")
    )
)
```
Control model behavior with temperature and output tokens:
```python
creative_content = df.with_column(
    "creative_description",
    semantic.map(
        "Transform this technical spec into creative marketing content: {{spec}}",
        spec=col("technical_specification"),
        temperature=0.7,
        max_output_tokens=200
    )
)
```
Semantic Joins Across Datasets
Join datasets based on semantic similarity rather than exact matches:
```python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Join job postings with candidate resumes
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="5 years Python, FastAPI, PostgreSQL, building scalable backend services",
    right="Senior Backend Engineer - Python expertise required",
    output=True
))
examples.create_example(JoinExample(
    left="Frontend developer with React and TypeScript",
    right="Senior Backend Engineer - Python expertise required",
    output=False
))

matched_candidates = jobs_df.semantic.join(
    other=resumes_df,
    predicate="""
    Job Requirements: {{left_on}}
    Candidate Background: {{right_on}}
    The candidate's experience closely matches the job requirements.
    """,
    left_on=col("job_description"),
    right_on=col("resume_text"),
    examples=examples
)
```
For similarity-based matching using embeddings:
```python
# Find similar support tickets
similar_tickets = new_tickets_df.semantic.sim_join(
    historical_tickets_df,
    left_on=semantic.embed(col("ticket_description")),
    right_on=semantic.embed(col("historical_description")),
    k=3,
    similarity_metric="cosine",
    similarity_score_column="match_score"
)
```
Building Production Pipelines
Combine operations into complete pipelines:
```python
# End-to-end customer feedback pipeline
pipeline_df = (
    session.read.csv("s3://data/feedback.csv")
    # Traditional filtering
    .filter(col("submission_date") >= "2024-01-01")
    .filter(col("feedback_text").isNotNull())
    # Extract structured information
    .with_column(
        "analysis",
        semantic.extract(col("feedback_text"), SupportIssue)
    )
    .with_column("category", col("analysis")["category"])
    .with_column("severity", col("analysis")["severity"])
    # Generate responses for high-priority items
    .with_column(
        "draft_response",
        F.when(
            col("severity").isin(["high", "critical"]),
            semantic.map(
                "Draft a professional response to this {{severity}} {{category}}: {{text}}",
                severity=col("severity"),
                category=col("category"),
                text=col("feedback_text")
            )
        ).otherwise(F.lit(None))
    )
    # Calculate metrics
    .with_column("processing_date", F.current_timestamp())
)

# Write results
pipeline_df.write.save_as_table("processed_feedback", mode="overwrite")
```
Track inference costs and performance:
```python
# Query local metrics
metrics_df = session.table("fenic_system.query_metrics")

# Analyze costs by model
cost_analysis = session.sql("""
    SELECT
        model,
        COUNT(*) as request_count,
        SUM(total_lm_cost) as total_cost,
        AVG(total_lm_cost) as avg_cost_per_request,
        SUM(total_lm_requests) as total_model_calls
    FROM {metrics}
    WHERE CAST(end_ts AS TIMESTAMP) > CAST('2024-01-01' AS TIMESTAMP)
    GROUP BY model
    ORDER BY total_cost DESC
""", metrics=metrics_df)

cost_analysis.show()
```
Embedding-Based Operations
Generate embeddings for semantic search:
```python
# Add embeddings to documents
docs_with_embeddings = docs_df.with_column(
    "doc_embedding",
    semantic.embed(col("document_text"))
)

# Cluster documents
clustered_docs = docs_with_embeddings.semantic.with_cluster_labels(
    by=col("doc_embedding"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
cluster_stats = clustered_docs.group_by("cluster_id").agg(
    F.count("*").alias("doc_count"),
    F.collect_list("document_title").alias("sample_titles")
)
```
Calculate similarity between embeddings:
```python
from fenic.api.functions import embedding

# Compare documents to a query
query_vector = [0.1, 0.2, ...]  # Your query embedding

results = docs_with_embeddings.with_column(
    "relevance_score",
    embedding.compute_similarity(
        col("doc_embedding"),
        query_vector,
        metric="cosine"
    )
).order_by(F.desc("relevance_score"))
```
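The query embedding itself can come from the same model used for the documents. One way to obtain it (a sketch; `create_dataframe` and `to_pydict` are assumptions about your Fenic version's construction and collection APIs) is to embed a one-row DataFrame and pull out the vector:

```python
# Sketch: derive query_vector with the session's embedding model.
# create_dataframe/to_pydict are assumed APIs; adjust to your Fenic version.
query_df = session.create_dataframe({"query": ["billing page crashes on login"]})
query_vector = (
    query_df
    .with_column("query_embedding", semantic.embed(col("query")))
    .to_pydict()["query_embedding"][0]
)
```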
Model Context Protocol Integration
Build agentic applications by exposing DataFrames as MCP tools:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam

# Register a dataset as a tool
df.write.save_as_table("customer_data", mode="overwrite")
session.catalog.set_table_description(
    "customer_data",
    "Customer records including purchase history and feedback"
)

# Create a custom tool
from fenic.api.functions import tool_param
from fenic.core.types import StringType, IntegerType

search_results_df = (
    session.table("customer_data")
    .filter(col("customer_tier") == tool_param("tier", StringType))
    .limit(tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search customers by tier and return top results",
    tool_query=search_results_df,
    tool_params=[
        ToolParam(
            name="tier",
            description="Customer tier to filter by",
            allowed_values=["free", "pro", "enterprise"]
        ),
        ToolParam(
            name="limit",
            description="Maximum results to return",
            has_default=True,
            default_value=10
        )
    ],
    result_limit=50
)

# Launch MCP server
tools = session.catalog.list_tools()
server = create_mcp_server(
    session,
    "DataPipeline Server",
    user_defined_tools=tools,
    concurrency_limit=8
)
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Agents can now query your data pipeline through the MCP protocol.
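To sanity-check the server, call a registered tool from any MCP-capable client. A minimal sketch using the official `mcp` Python SDK (the SDK and its streamable-HTTP transport are assumptions here, not part of Fenic):

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    # Connect to the Fenic MCP server started above
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read, write, _):
        async with ClientSession(read, write) as mcp_session:
            await mcp_session.initialize()
            # Invoke the tool registered via session.catalog.create_tool
            result = await mcp_session.call_tool(
                "search_customers",
                {"tier": "pro", "limit": 5}
            )
            print(result.content)

asyncio.run(main())
```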
Asynchronous UDFs for External APIs
Integrate external services with async operations:
```python
import fenic as fc
from fenic.core.types import IntegerType
import aiohttp

@fc.async_udf(
    return_type=IntegerType,
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def enrich_with_external_api(customer_id: int) -> int:
    # Use a distinct name so we don't shadow the Fenic session
    async with aiohttp.ClientSession() as http_session:
        async with http_session.get(
            f"https://api.example.com/credit-score/{customer_id}"
        ) as response:
            data = await response.json()
            return data["score"]

# Apply to DataFrame
enriched_df = customers_df.select(
    col("customer_id"),
    col("name"),
    enrich_with_external_api(col("customer_id")).alias("credit_score")
)
```
The decorator preserves input row order and applies the configured timeout and retry policy to each call, so transient failures are handled without sacrificing throughput.
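If you would rather have failed lookups degrade to a sentinel value than surface errors, catch exceptions inside the UDF. A sketch (the fallback value and `raise_for_status` check are choices of this example, not Fenic behavior):

```python
@fc.async_udf(
    return_type=IntegerType,
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def enrich_with_fallback(customer_id: int) -> int:
    try:
        async with aiohttp.ClientSession() as http_session:
            async with http_session.get(
                f"https://api.example.com/credit-score/{customer_id}"
            ) as response:
                response.raise_for_status()  # Treat HTTP errors as failures
                data = await response.json()
                return data["score"]
    except aiohttp.ClientError:
        return -1  # Sentinel score when the external API is unavailable
```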
Practical Patterns for Production
Pattern 1: Batch Processing with Cost Control
```python
# Process in batches with cost tracking
def process_batch(batch_df, model_alias):
    return batch_df.with_column(
        "summary",
        semantic.map(
            "Summarize: {{text}}",
            text=col("content"),
            model_alias=model_alias,
            max_output_tokens=100
        )
    )

# Snapshot existing query ids BEFORE running the batch
# (materialize this snapshot first; a lazy reference would reflect post-batch state)
metrics_before = session.table("fenic_system.query_metrics")

result_df = process_batch(df, "gemini")  # Use the cheaper model
result_df.show()  # Trigger execution so the batch's metrics are recorded

metrics_after = session.table("fenic_system.query_metrics")

# Calculate batch cost
batch_cost = session.sql("""
    SELECT SUM(total_lm_cost) as batch_cost
    FROM {after}
    WHERE query_id NOT IN (SELECT query_id FROM {before})
""", after=metrics_after, before=metrics_before)
```
Pattern 2: Incremental Processing
```python
from fenic.api.functions import current_timestamp

# Track processed records
def process_incremental(table_name, timestamp_col):
    # Get last processed timestamp
    last_processed = session.sql(f"""
        SELECT MAX(processed_at) as max_ts
        FROM {table_name}_processed
    """)
    # Materialize the watermark as a scalar before filtering
    # (to_pydict-style collection assumed; adjust to your Fenic version)
    max_ts = last_processed.to_pydict()["max_ts"][0]

    # Process only new records
    new_records = (
        session.table(table_name)
        .filter(col(timestamp_col) > max_ts)
        .with_column("processed_at", current_timestamp())
    )

    # Apply semantic operations
    processed = new_records.with_column(
        "classification",
        semantic.predicate("Is this urgent? {{text}}", text=col("content"))
    )

    # Append to processed table
    processed.write.save_as_table(
        f"{table_name}_processed",
        mode="append"
    )
```
Pattern 3: Multi-Model Routing
```python
# Route to appropriate model based on complexity
def smart_routing(df):
    # Use fast model for simple cases
    simple_cases = (
        df.filter(F.length(col("text")) < 500)
        .with_column(
            "result",
            semantic.map("Classify: {{text}}", text=col("text"), model_alias="gemini")
        )
    )

    # Use powerful model for complex cases
    complex_cases = (
        df.filter(F.length(col("text")) >= 500)
        .with_column(
            "result",
            semantic.map("Classify: {{text}}", text=col("text"), model_alias="gpt4")
        )
    )

    return simple_cases.union(complex_cases)
```
Performance Considerations
Fenic optimizes inference operations automatically:
Batch Processing: Semantic operations batch rows together for efficient API usage.
Automatic Retry: Transient failures retry with exponential backoff without manual intervention.
Rate Limiting: Configured rate limits prevent quota exhaustion across all operations.
Concurrent Execution: Async operations maximize throughput while respecting concurrency limits.
Query Optimization: The query planner optimizes DataFrame operations before execution.
Monitor performance using the metrics table:
```python
# Identify slow operations
slow_queries = session.sql("""
    SELECT
        query_id,
        CAST((julianday(end_ts) - julianday(start_ts)) * 86400 AS INTEGER) as duration_seconds,
        total_lm_requests,
        total_lm_cost
    FROM {metrics}
    WHERE duration_seconds > 30
    ORDER BY duration_seconds DESC
    LIMIT 10
""", metrics=session.table("fenic_system.query_metrics"))
```
Best Practices
1. Configure rate limits accurately: Set RPM and TPM based on your provider tier to prevent throttling.
2. Use appropriate models: Route simple tasks to fast, cheap models and complex tasks to more capable models.
3. Validate extraction schemas: Include clear field descriptions in Pydantic models for better extraction quality.
4. Monitor costs continuously: Query the metrics table regularly to track spending patterns.
5. Handle null values explicitly: Set strict=True in semantic operations to skip rows with null inputs.
6. Batch operations when possible: Combine multiple semantic steps in a single pipeline for better performance.
7. Save intermediate results: Write DataFrames to tables at key pipeline stages for debugging and recovery.
```python
# Save checkpoints
(df
    .filter(col("status") == "pending")
    .write.save_as_table("checkpoint_filtered", mode="overwrite")
)

processed = (
    session.table("checkpoint_filtered")
    .with_column("result", semantic.extract(col("text"), Schema))
)
```
Development to Production
Develop locally and deploy to cloud without code changes:
```python
# CloudConfig/CloudExecutorSize import path assumed to match the other config imports
from fenic.api.session.config import CloudConfig, CloudExecutorSize

# Local development
local_config = SessionConfig(
    app_name="my_pipeline",
    semantic=SemanticConfig(
        language_models={"gpt4": OpenAILanguageModel(...)}
    )
)

# Production with cloud execution
production_config = SessionConfig(
    app_name="my_pipeline",
    semantic=SemanticConfig(
        language_models={"gpt4": OpenAILanguageModel(...)}
    ),
    cloud=CloudConfig(size=CloudExecutorSize.MEDIUM)
)
```
The same pipeline code runs in both environments. Learn more about Typedef cloud for production deployment.
Additional Resources
- Fenic GitHub Repository - Source code and examples
- Latest Release Notes - Feature updates and improvements
- Fenic Introduction - Project philosophy and vision
- Typedef Platform - Cloud infrastructure for production workloads
Fenic provides the infrastructure layer for integrating LLMs into data workflows without operational overhead. By treating semantic operations as DataFrame primitives, teams can build maintainable AI pipelines using familiar data engineering patterns.

