Unstructured data holds the most valuable insights for modern applications—PDFs, audio recordings, support tickets, logs, and documents contain information that drives business decisions. Yet extracting meaningful structure from this data has traditionally required brittle regular expressions, custom preprocessing scripts, and fragile LLM integrations that break when formats shift or prompts drift.
Fenic solves this by treating semantic operations as first-class DataFrame primitives. If you know PySpark or Pandas, you already know how to structure unstructured text with Fenic. The framework provides familiar DataFrame operations enhanced with semantic intelligence, eliminating the glue code and microservices that make AI pipelines operational nightmares.
## The Unstructured Data Challenge
Traditional ETL pipelines handle structured data well—extract from sources, load into warehouses, transform with SQL, expose through dashboards. This playbook fails with unstructured text because:
Processing requires multiple specialized models. OCR for PDFs, transcription for audio, computer vision for images. Each introduces failure modes, latency, and cost.
Turning text into structured output is operationally complex. LLM pipelines quickly become fragile when managing rate limits across providers, chunking documents to fit context windows, balancing expensive, accurate models against cheaper, less reliable ones, and constantly moving data between custom scripts, warehouses, and inference infrastructure.
The impedance mismatch creates chaos. Legacy data platforms treat LLM calls as external black-box functions that query optimizers cannot inspect or optimize. This makes pipelines brittle and expensive at scale.
Typedef's Fenic framework addresses these issues with an inference-first architecture where semantic operations are embedded directly into the query engine as first-class citizens.
## DataFrames as the Abstraction for AI Pipelines
The insight behind Fenic is straightforward: agentic workflows and AI applications are pipelines. They take inputs, reason over context, generate outputs, and log results. This isn't fundamentally different from traditional data workflows.
DataFrames bring structure and determinism to probabilistic systems. Even with stochastic inference—LLMs, OCR, transcription—DataFrames provide:
Lineage. Every column and row has traceable origins, even from model output. When extractions fail or produce unexpected results, you can trace back through the pipeline to identify where issues originated.
Columnar consistency. Whether dealing with summaries, embeddings, or classification scores, columns stay structured and meaningful throughout transformations.
Deterministic transformations. Inference calls wrapped in declarative logic follow the pattern: model + prompt + input → output. This enables caching, versioning, and debugging that's impossible with imperative LLM code scattered across microservices.
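To make that concrete, here is a minimal sketch of why the model + prompt + input triple makes caching tractable. This is an illustration, not Fenic's internal implementation:

```python
import hashlib
import json

def inference_cache_key(model: str, prompt: str, row_input: str) -> str:
    """Identical (model, prompt, input) triples always map to the same key,
    which is what makes caching and versioning of LLM calls possible."""
    payload = json.dumps(
        {"model": model, "prompt": prompt, "input": row_input}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Re-running the same transformation can hit a cache instead of the API
key = inference_cache_key("gpt-4o-mini", "Summarize: {{text}}", "Q3 revenue grew 12%")
```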
Fenic provides a declarative DataFrame API for automatic optimization and auditability. Since it's Python, you can write any imperative code needed for dynamic behavior, custom logic, or tight integration with your existing stack.
## Setting Up Fenic for Semantic Operations
Installation requires Python 3.10, 3.11, or 3.12:
```bash
pip install fenic
```
Configure at least one LLM provider with environment variables:
```bash
export OPENAI_API_KEY="your-openai-api-key"
export ANTHROPIC_API_KEY="your-anthropic-api-key"
export GOOGLE_API_KEY="your-google-api-key"
```
Initialize a session with semantic configuration:
```python
import fenic as fc

config = fc.SessionConfig(
    app_name="my_app",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano", rpm=500, tpm=200_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite", rpm=300, tpm=150_000
            ),
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)
```
This configuration defines model aliases that abstract provider-specific details. Using aliases like "nano" and "flash" makes it trivial to swap models or providers without changing pipeline code—critical for production systems that need to optimize cost and performance dynamically.
Rate limiting parameters (rpm and tpm) prevent throttling by your provider. Fenic automatically batches requests, implements retry logic, and self-throttles to stay within limits while maximizing throughput with async I/O and concurrent request batching.
## Core Semantic Operators for Structuring Text
Fenic provides nine semantic operators as first-class DataFrame primitives. Each operates on unstructured text and returns structured output that integrates seamlessly with traditional DataFrame operations.
### Schema-Driven Extraction with semantic.extract
The semantic.extract operator transforms unstructured text into structured data using Pydantic schemas. This provides type-safe extraction where you define your desired output structure once and get validated results consistently.
```python
from typing import List, Literal

from pydantic import BaseModel, Field

class Issue(BaseModel):
    category: Literal["bug", "feature_request", "question"]
    severity: Literal["low", "medium", "high", "critical"]
    description: str

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: List[Issue]

tickets = (
    df
    .with_column("extracted", fc.semantic.extract("raw_ticket", Ticket))
    .unnest("extracted")
    .filter(fc.col("region") == "apac")
    .explode("issues")
)

bugs = tickets.filter(fc.col("issues").category == "bug")
```
The schema guides LLM extraction with clear structure and constraints. Pydantic's Field descriptions provide additional context for better extraction accuracy:
```python
class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking in this segment")
    start_time: float = Field(description="Start time (seconds)")
    end_time: float = Field(description="End time (seconds)")
    key_points: list[str] = Field(description="Bullet points for this segment")
```
After extraction, the data becomes structured and you can filter, aggregate, and transform using standard DataFrame operations. The unnest operation flattens nested Pydantic models into columns, while explode expands lists into separate rows.
This pattern eliminates the extract-then-parse-JSON-then-validate dance common in traditional LLM pipelines. Fenic handles schema validation, error handling, and retries automatically, with row-level lineage for debugging when extractions fail.
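For contrast, here is roughly what that dance looks like when hand-rolled. This is a sketch of the pattern being replaced, not Fenic code; the llm_client and its complete method are hypothetical:

```python
import json

from pydantic import ValidationError

def manual_extract(llm_client, raw_ticket: str, max_retries: int = 3) -> Ticket | None:
    """The extract/parse/validate/retry loop that semantic.extract absorbs."""
    prompt = f"Return a JSON object matching the Ticket schema for:\n{raw_ticket}"
    for _ in range(max_retries):
        response = llm_client.complete(prompt)  # hypothetical client call
        try:
            return Ticket(**json.loads(response))  # parse, then validate
        except (json.JSONDecodeError, ValidationError):
            continue  # malformed output: retry and hope
    return None
```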
### Natural Language Filtering with semantic.predicate
Semantic predicates enable filtering with natural language conditions instead of complex boolean logic:
```python
applicants = df.filter(
    (fc.col("yoe") > 5)
    & fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume"),
    )
)
```
This combines traditional boolean logic with semantic understanding. The first condition uses standard column comparison, while the semantic predicate evaluates natural language criteria against unstructured text. The query engine optimizes both together—potentially filtering on the cheap boolean condition first before invoking the expensive LLM predicate.
Predicates accept template variables using Jinja syntax with double curly braces. The templating is Rust-powered as of version 0.3.0, providing dynamic, data-aware prompts with loops, conditionals, and arrays:
```python
fc.semantic.predicate(
    """
    Does this feedback mention {{ search_term }}?
    {% if priority == "high" %}
    Only return true if it's a critical issue.
    {% endif %}
    Feedback: {{ feedback_text }}
    """,
    search_term=fc.lit("UI problems"),
    priority=fc.col("priority"),
    feedback_text=fc.col("raw_feedback"),
)
```
The template is evaluated per row, allowing row-specific logic in your prompts. This enables sophisticated filtering that adapts to each record's characteristics while maintaining the declarative DataFrame abstraction.
### Meaning-Based Joins with semantic.join
Traditional joins match on exact values or use fuzzy string similarity. Semantic joins determine matches based on meaning:
```python
prompt = """
Is this candidate a good fit for the job?

Candidate Background: {{left_on}}
Job Requirements: {{right_on}}

Use the following criteria to make your decision:
- Technical skills alignment
- Experience level appropriateness
- Domain knowledge overlap
"""

joined = (
    applicants.semantic.join(
        other=jobs,
        predicate=prompt,
        left_on=fc.col("resume"),
        right_on=fc.col("job_description"),
    )
    .order_by("application_date")
    .limit(5)
)
```
The predicate receives both left and right row data as context, enabling sophisticated matching logic that considers multiple factors. Unlike fuzzy string matching that measures character similarity, semantic joins understand domain-specific criteria and make nuanced decisions.
Fenic optimizes semantic joins by batching LLM calls across candidate pairs, caching decisions for repeated comparisons, and potentially using embeddings for initial filtering before applying the expensive LLM predicate to top candidates.
This pattern works exceptionally well for:
- Matching documents to queries in RAG systems
- Linking related records across databases
- Finding similar but not identical content
- Deduplication based on semantic similarity rather than string distance
### Content Transformation with semantic.map
The semantic.map operator applies natural language transformations to data:
```python
final = (
    processed
    .select(
        "*",
        fc.semantic.map(
            "Summarize this segment in 2 sentences:\n{{chunk}}",
            chunk=fc.col("chunk"),
            model_alias="mini",
        ).alias("segment_summary"),
    )
)
```
This enables text generation, rewriting, translation, and summarization with simple natural language instructions. The operator handles batching and error handling automatically while maintaining the declarative DataFrame API.
### Classification with semantic.classify
Categorize text with few-shot examples without training custom models:
```python
df = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("feedback_text"),
        classes=["bug_report", "feature_request", "praise", "complaint"],
        model_alias="nano",
    ),
)
```
The operator provides consistent category assignments across datasets using LLM understanding rather than keyword matching. Few-shot examples can be provided to guide classification for domain-specific categories.
### Semantic Aggregation with semantic.reduce
Aggregate grouped data with LLM operations instead of just counting or averaging:
```python
final = (
    processed
    .group_by(fc.col("segment.speaker"))
    .agg(
        fc.semantic.reduce(
            "Combine these summaries into one clear paragraph",
            fc.col("segment_summary"),
            model_alias="mini",
        ).alias("speaker_summary")
    )
)
```
This enables semantic summarization across groups, creating coherent aggregations of unstructured text that maintain meaning and context.
## Working with AI-Native Data Types
Fenic goes beyond standard data types with first-class support for AI-native formats: MarkdownType, TranscriptType, JSONType, HTMLType, and EmbeddingType. These aren't just metadata tags—they unlock specialized operations.
### Processing Markdown with MarkdownType
```python
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_blog", header_level=2),
    )
    .with_column("title", fc.json.jq("raw_blog", ".title"))
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").content),
    )
)
```
The markdown.extract_header_chunks function leverages document structure—sections, paragraphs, headings—for semantically meaningful chunks instead of naive character-count splitting. This dramatically improves RAG quality by preserving context boundaries and avoiding splits mid-sentence.
### Handling Transcripts with TranscriptType
TranscriptType handles SRT, WebVTT, and generic transcript formats with native understanding of speakers and timestamps:
```python
from pathlib import Path

transcript_text = Path("data/transcript.json").read_text()
df = session.create_dataframe({"transcript": [transcript_text]})

processed = (
    df.select(
        "*",
        fc.text.recursive_token_chunk(
            "transcript", chunk_size=1200, chunk_overlap_percentage=0
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk", SegmentSchema, model_alias="mini"
        ).alias("segment"),
    )
)
```
Fenic preserves speaker identity and timestamps through transformations, enabling speaker-aware analysis without manual parsing. You can aggregate by speaker, analyze conversation flows, or extract speaker-specific insights.
### Nested JSON Manipulation
JSONType supports JQ expressions for elegant nested data manipulation:
```python
.with_column("author", fc.json.jq("metadata", ".author.name"))
.with_column("tags", fc.json.jq("metadata", ".tags[]"))
```
This eliminates verbose Python dictionary navigation code and handles missing keys gracefully.
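For comparison, a sketch of the plain-Python navigation those two expressions replace, assuming the metadata has already been parsed into a dict:

```python
metadata = {"author": {"name": "Ada Lovelace"}, "tags": ["etl", "llm"]}

# .author.name — every level needs its own None-safe lookup
author = (metadata.get("author") or {}).get("name")

# .tags[] — manual iteration plus an existence check
tags = list(metadata.get("tags", []))
```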
## Building Complete Transformation Pipelines
Real production pipelines combine multiple semantic operators with traditional DataFrame operations. Here's a complete podcast processing pipeline:
```python
from pathlib import Path

from pydantic import BaseModel, Field

import fenic as fc

class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking in this segment")
    start_time: float = Field(description="Start time (seconds)")
    end_time: float = Field(description="End time (seconds)")
    key_points: list[str] = Field(description="Bullet points for this segment")

class EpisodeSummary(BaseModel):
    title: str
    guests: list[str]
    main_topics: list[str]
    actionable_insights: list[str]

# Initialize session with a model alias
config = fc.SessionConfig(
    app_name="podcast_analysis",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini", rpm=300, tpm=150_000
            )
        },
        default_language_model="mini",
    ),
)
session = fc.Session.get_or_create(config)

# Load raw data
data_dir = Path("data")
transcript_text = (data_dir / "transcript.json").read_text()
meta_text = (data_dir / "meta.json").read_text()

df = session.create_dataframe({"meta": [meta_text], "transcript": [transcript_text]})

# Extract metadata and segment the transcript
processed = (
    df.select(
        "*",
        fc.semantic.extract("meta", EpisodeSummary, model_alias="mini").alias("episode"),
        fc.text.recursive_token_chunk(
            "transcript", chunk_size=1200, chunk_overlap_percentage=0
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract("chunk", SegmentSchema, model_alias="mini").alias("segment"),
    )
)

# Create abstracts per segment and aggregate by speaker
final = (
    processed
    .select(
        "*",
        fc.semantic.map(
            "Summarize this segment in 2 sentences:\n{{chunk}}",
            chunk=fc.col("chunk"),
            model_alias="mini",
        ).alias("segment_summary"),
    )
    .group_by(fc.col("segment.speaker"))
    .agg(
        fc.semantic.reduce(
            "Combine these summaries into one clear paragraph",
            fc.col("segment_summary"),
            model_alias="mini",
        ).alias("speaker_summary")
    )
)

final.show(truncate=120)
final.write.parquet("podcast_summaries.parquet")
session.stop()
```
This pipeline demonstrates six key composability patterns:
- Schema-driven extraction - Pydantic models define output structure for consistent parsing
- Intelligent chunking - Semantic-aware text splitting respects structure and context
- Explode for row multiplication - Transform single transcript into multiple segment rows
- Nested structure access - Reference nested fields like segment.speaker naturally
- Semantic aggregation - Group data and apply LLM operations across groups
- Mixed operations - Combine semantic and traditional DataFrame operations in one pipeline
The pipeline reads raw text, extracts structure, transforms content, aggregates semantically, and writes results—all declaratively expressed with automatic optimization, batching, and error handling.
## Production Considerations for Reliable Pipelines
Production AI systems require reliability, observability, and cost management. Fenic provides infrastructure built for production from day one.
### Automatic Batching and Optimization
Fenic automatically optimizes inference operations through batch grouping that minimizes latency and costs. The framework intelligently batches requests based on provider limits and throughput requirements, maximizing throughput with async I/O and concurrent request batching.
The declarative API enables automatic optimization that imperative code cannot achieve. When operations are expressed declaratively, the query engine sees the entire pipeline and optimizes globally:
- Reorder operations to minimize expensive LLM calls
- Push filters down to reduce data volume before inference (see the sketch after this list)
- Batch LLM calls across rows efficiently
- Cache repeated operations automatically
- Estimate costs before execution
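For instance, even if a pipeline is written with the semantic step first, the engine can evaluate the cheap boolean filter before the LLM call so only surviving rows are sent to the model. A sketch using hypothetical review and stars columns:

```python
# Written order: semantic map first, cheap filter second
negative_summaries = (
    df
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize this review:\n{{review}}",
            review=fc.col("review"),  # hypothetical column
        ),
    )
    .filter(fc.col("stars") <= 2)  # hypothetical column, cheap predicate
)

# Logically equivalent plan the optimizer can choose instead:
# filter on stars first, then run semantic.map only over the rows that remain.
```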
### Cost Tracking and Observability
Built-in token counting and cost tracking provide first-class observability into LLM operations:
```python
result = query.collect()
metrics = result.metrics()

print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Execution time: {metrics.execution_time}s")
```
Operator-level metrics show where time and money are spent, enabling targeted optimization. The declarative model plus rich metrics creates a feedback loop for optimization that's impossible with imperative LLM code scattered across microservices.
### Intelligent Caching
Cache expensive operations explicitly to reduce redundant API calls:
```python
df_cached = df.filter(...).semantic.extract(...).cache()

# Subsequent operations use cached results
result1 = df_cached.filter(condition1).collect()
result2 = df_cached.filter(condition2).collect()
```
The engine also caches identical inference calls automatically within a session, preventing redundant API calls when the same prompt with the same input appears multiple times.
### Rate Limiting and Self-Throttling
Fenic automatically respects provider rate limits with configured rpm and tpm. The engine tracks token usage in real-time and self-throttles when approaching limits. Built-in retry logic handles transient failures automatically.
### Separation of Batch Preprocessing and Real-Time Agents
Fenic excels at heavy lifting in batch pipelines that prepare clean, structured data for real-time agents:
```python
# Batch preprocessing pipeline (runs offline)
enriched_data = (
    raw_documents
    .with_column("raw_md", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_md", header_level=2),
    )
    .explode("chunks")
    .with_column("embedding", fc.semantic.embed(fc.col("chunks").content))
    .with_column(
        "metadata",
        fc.semantic.extract("chunks", DocumentMetadata, model_alias="cheap"),
    )
)

enriched_data.write.parquet("s3://my-bucket/enriched/")
```
Agents then query the enriched data without expensive inference at request time. This architecture provides more predictable and responsive agents with better resource utilization and cleaner separation between planning and execution.
## Real-World Use Cases and Implementation Patterns
### Log Clustering and Triage
Building a log triage agent with Fenic turns raw logs into severity-aware clusters. The pipeline:
- Parses without brittleness - Define templates for common log formats, unnest into candidate columns, coalesce to canonical schema
- Fingerprints for stability - Create grouping keys that ignore volatile tokens (IDs, ports) but preserve the cause (see the sketch after this list)
- Tags severity deterministically - Separate signal from noise with cheap rules before clustering
- Clusters semantically per severity - Embed enriched text and cluster within severity buckets
- Publishes read-only tools - Expose MCP tools that agents can query for triage decisions
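To illustrate the fingerprinting step, masking volatile tokens before grouping can be this simple. A minimal plain-Python sketch with illustrative patterns (inside a Fenic pipeline the same idea applies to an enriched text column):

```python
import re

def fingerprint(log_line: str) -> str:
    """Stable grouping key: masks ports and numeric IDs but keeps the cause."""
    line = re.sub(r"0x[0-9a-fA-F]+", "<HEX>", log_line)  # hex addresses
    line = re.sub(r":\d{2,5}\b", ":<PORT>", line)        # ports
    line = re.sub(r"\b\d+\b", "<NUM>", line)             # request IDs, counters
    return line

# Both lines collapse to the same cluster key
fingerprint("ERROR payment-api conn refused host=10.0.0.12:5432 req=8841")
fingerprint("ERROR payment-api conn refused host=10.0.0.99:5433 req=17")
```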
The result: engineers ask natural language questions like "top clusters above WARN in the last hour" or "only ERRORs for payment-api" and get immediate, actionable answers.
### Customer Support Ticket Processing
Typedef cut RudderStack's triage time by 95% by structuring unstructured support tickets. The pipeline:
- Ingests support ticket threads and sales call transcripts from the warehouse
- Builds a semantic context model that infers and enriches product taxonomy
- Extracts structured data from tickets using schema-driven extraction
- Maps tickets to taxonomy and creates semantic links with citations
- Exposes tools for agents to classify, surface related tickets, and propose decisions
PMs now approve triage decisions in one pass with 90% first-pass category acceptance, supported by citations to prospect signals and community feedback.
### Document Processing and Information Extraction
Fenic's specialized data types streamline document processing pipelines. Whether handling markdown documentation, HTML content, or transcripts, the framework provides native operations for common tasks:
```python
documents = (
    df
    .with_column("content", fc.col("raw").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("content", header_level=2),
    )
    .explode("chunks")
    .with_column("metadata", fc.semantic.extract("chunks", DocumentMetadata))
    .with_column("embedding", fc.semantic.embed(fc.col("chunks").content))
)
```
The structured output integrates directly with search systems, RAG pipelines, and analytics dashboards.
## Best Practices for DataFrame-Based Text Processing
### Strategic Model Selection
Use smaller, cheaper models for simple tasks and reserve expensive models for complex reasoning:
```python
semantic=fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel("gpt-4o-nano"),      # Classification
        "mini": fc.OpenAILanguageModel("gpt-4o-mini"),      # Extraction
        "full": fc.AnthropicLanguageModel("claude-opus-4"), # Complex reasoning
    }
)

# Simple classification
.semantic.classify(col, classes, model_alias="nano")

# Structured extraction
.semantic.extract(col, schema, model_alias="mini")

# Multi-step reasoning
.semantic.map(complex_instruction, model_alias="full")
```
The cost difference between models is often 10-100x. Strategic model selection can reduce costs by 80% while maintaining quality for appropriate tasks.
### Design Clear Pydantic Schemas
Schema field descriptions guide LLM extraction and improve accuracy:
```python
class Transaction(BaseModel):
    merchant: str = Field(description="The business name where transaction occurred")
    category: Literal["grocery", "dining", "transport", "entertainment", "other"] = Field(
        description="Transaction category based on merchant type and purchase details"
    )
    amount: float = Field(description="Transaction amount in USD")
    is_recurring: bool = Field(
        description="True if this appears to be a recurring/subscription charge"
    )
```
Clear descriptions with examples and constraints improve extraction accuracy significantly. Literal types constrain outputs to valid categories, reducing hallucination.
### Test Pipelines Incrementally
Develop and test with small representative samples before scaling:
```python
# Development: 100 rows
df_sample = df.limit(100)
result = df_sample.semantic.extract(...).collect()
print(f"Cost for 100 rows: ${result.metrics().lm_metrics.total_cost}")

# Validate results, then scale
df_full.semantic.extract(...).write.parquet("output/")
```
Fenic's lazy evaluation and metrics make it trivial to estimate costs and validate logic before processing millions of rows.
### Leverage Hybrid Approaches
Version 0.3.0 added built-in fuzzy string matching with six algorithms. Use fuzzy matching for initial candidate selection before expensive semantic joins:
```python
# Fast fuzzy scoring for blocking
candidates = (
    left_df.join(right_df)  # Cross join
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler",
        ),
    )
    .filter(fc.col("fuzzy_score") > 80)  # Score is 0-100
)

# Then expensive semantic matching on candidates
final = candidates.semantic.join(
    predicate="Are these the same company? Left: {{left_name}}, Right: {{right_name}}",
    left_on=fc.col("company_description"),
    right_on=fc.col("business_description"),
)
```
This hybrid approach reduces costs by orders of magnitude compared to semantic joins on full cross-products.
## Integration with Existing Infrastructure
### Lakehouse-Native Architecture
Fenic is pure compute with no proprietary storage layer. Read from and write to existing lakehouses without data movement:
```python
df = session.read.parquet("s3://data-lake/raw/*.parquet")
processed = df.semantic.extract(...).filter(...)
processed.write.parquet("s3://data-lake/processed/")
```
Full compatibility with Parquet, Iceberg, Delta Lake, and Lance enables seamless integration with existing infrastructure. Built on Apache Arrow for ecosystem interoperability—processed data works with Spark, Polars, DuckDB, and pandas.
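For example, output written by Fenic is plain Parquet over Arrow-compatible types, so other engines can query it directly. A quick sketch with Polars and DuckDB, reusing the processed path from above (assumes S3 credentials are configured for both tools, and a hypothetical category column):

```python
import duckdb
import polars as pl

# Read Fenic's output with Polars — no conversion step
df_pl = pl.read_parquet("s3://data-lake/processed/*.parquet")

# Or aggregate it in DuckDB straight from object storage
con = duckdb.connect()
counts = con.sql(
    "SELECT category, count(*) AS n "
    "FROM 's3://data-lake/processed/*.parquet' "
    "GROUP BY category ORDER BY n DESC"
).df()
```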
### Agent Preprocessing Pipelines
Use Fenic to create structured context for agents that eliminates heavy inference from request paths. This separation enables predictable, responsive agents while leveraging Fenic's strength in batch processing.
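A sketch of the request-time side, reusing the enriched Parquet written by the batch pipeline above (the topic field is a hypothetical attribute of the extracted metadata):

```python
# Request time: no LLM calls, just filters over pre-enriched data
enriched = session.read.parquet("s3://my-bucket/enriched/*.parquet")

context = (
    enriched
    .filter(fc.col("metadata").topic == "billing")  # hypothetical extracted field
    .limit(20)
    .collect()
)
# Hand `context` to the agent as grounded, already-structured context
```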
### Hybrid Workflows with Traditional ETL
Mix Fenic with existing Spark and Airflow workflows:
```python
# Spark preprocessing
spark_df.write.parquet("s3://interim/")

# Fenic semantic enrichment
enriched = (
    session.read.parquet("s3://interim/")
    .semantic.extract(...)
    .semantic.classify(...)
)
enriched.write.parquet("s3://final/")

# Continue with downstream Spark/dbt processing
```
Fenic excels at semantic operations while standard tools handle traditional ETL. Use the right tool for each step.
## The Inference-First Architecture Advantage
Traditional data platforms treat LLM calls as external black-box UDFs that query optimizers cannot inspect or optimize. Fenic's inference-first approach embeds LLM operations directly into the query engine as first-class citizens.
When the query optimizer sees semantic.extract() or semantic.join(), it understands this is an inference operation with specific characteristics: high latency, token costs, batching benefits, and caching opportunities. The optimizer can:
- Reorder operations to minimize data processed by expensive inference
- Batch requests across rows to amortize fixed costs
- Cache aggressively since deterministic operations with same inputs produce same outputs
- Parallelize intelligently across multiple providers or models
- Estimate costs accurately before execution
This is impossible when LLM calls are hidden in UDFs or microservices. Fenic's semantic operators make inference visible to the optimizer, enabling optimizations that dramatically improve performance and reduce costs.
The declarative API also provides auditability and reproducibility. Every operation is explicitly defined with inputs, prompts, and model configurations tracked automatically. Row-level lineage traces data flow through transformations, critical for debugging and compliance.
## Start Building with DataFrame Primitives
Composable semantic operators transform LLM pipelines from brittle glue code into robust, optimizable data transformations. By treating inference as a first-class operation within a familiar DataFrame API, Fenic enables developers to build production AI systems with the same rigor and reliability that data engineers have applied to traditional pipelines.
Key principles for success:
- Declarative operations enable optimization
- Type-safe schemas eliminate brittle prompts
- Intelligent batching reduces costs
- Row-level lineage makes debugging tractable
- Semantic operators compose naturally with traditional DataFrame operations
Start with simple operations like semantic.extract or semantic.classify on small datasets, validate results and costs, then scale to production with confidence that the infrastructure handles batching, optimization, error handling, and observability automatically.
The DataFrame abstraction proves as powerful for AI workloads as it was for traditional data engineering, now extended with semantic intelligence. When semantic operators compose naturally with traditional DataFrame operations, you stop choosing between structured and unstructured data—you build unified pipelines that handle both.
Explore Fenic on GitHub | Read the Fenic announcement | Learn more about Typedef

