Modern data pipelines face a critical challenge: an estimated 80% of enterprise data exists in unstructured formats like PDFs, transcripts, images, and markdown documents, yet traditional data platforms can only efficiently process structured rows and columns. The result is fragmented infrastructure where teams maintain separate systems for structured ETL and unstructured content processing, creating maintenance overhead and operational complexity.
This guide demonstrates how to build unified pipelines that process both structured and unstructured data using a single DataFrame abstraction, eliminating the need for disparate tools and brittle integration code.
The Dual-Data Problem in Production Systems
Traditional data platforms treat unstructured content as an afterthought. When you need to analyze customer feedback alongside transaction data, or process support tickets with user metrics, the standard approach requires:
- Separate preprocessing steps for text extraction
- Custom scripts to call LLM APIs
- Manual data movement between systems
- Complex orchestration to combine results
This architectural split creates several problems:
Operational fragility: Each integration point introduces failure modes. Rate limits, API changes, and serialization issues cascade through pipelines.
Development velocity collapse: Engineers spend the majority of their time managing infrastructure rather than building features. Simple changes require updates across multiple disconnected systems.
Cost inefficiency: Duplicate API calls, suboptimal model selection, and poor caching strategies inflate processing costs. Without unified visibility, optimization becomes impossible.
The root cause is architectural: data platforms designed for deterministic operations on structured data cannot natively handle probabilistic inference on unstructured content.
The DataFrame Abstraction for Unified Processing
The solution lies in treating both structured and unstructured data as first-class citizens within a unified DataFrame API. Typedef's Fenic framework extends the familiar PySpark-inspired DataFrame abstraction with semantic operators that understand meaning, not just values.
This approach provides several advantages:
Single pipeline for mixed data: Combine traditional operations like filter and join with semantic operations like extract and classify in one declarative workflow.
Type-safe processing: Define schemas once using Pydantic models. The framework validates results automatically, eliminating manual parsing and error handling.
Automatic optimization: The query engine sees the entire pipeline—both traditional and semantic operations—enabling global optimization of batching, caching, and operation ordering.
Row-level lineage: Track every transformation, even through non-deterministic LLM operations. This makes debugging production issues tractable.
The key insight is that AI workloads are fundamentally pipelines: they take inputs, reason over context, generate outputs, and log results. DataFrames already handle these patterns for structured data. Extending them to unstructured content creates unified infrastructure.
Specialized Data Types for Unstructured Content
Traditional DataFrames support basic types: integers, strings, floats, booleans. Processing unstructured content requires richer type systems that understand document structure.
Fenic provides AI-native data types with specialized operations:
MarkdownType
Native support for markdown documents with structure-aware operations:
```python
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_blog"), header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks"))
    )
)
```
The markdown.extract_header_chunks function leverages document structure for semantically meaningful chunks rather than naive character-count splitting. This preserves context boundaries and avoids splits mid-sentence.
TranscriptType
Handle SRT, WebVTT, and generic transcript formats with speaker and timestamp awareness:
```python
processed = (
    df
    .with_column("transcript", fc.col("file").cast(fc.TranscriptType))
    .select(
        "*",
        fc.text.recursive_token_chunk(
            fc.col("transcript"),
            chunk_size=1200,
            chunk_overlap_percentage=0
        ).alias("chunks")
    )
    .explode("chunks")
)
```
The framework preserves speaker identity and timestamps through transformations, enabling speaker-aware analysis without manual parsing.
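As a hedged illustration of speaker-aware analysis, the chunks produced above can feed the same semantic.extract pattern used throughout this guide; the schema and column names below are assumptions for this sketch, not part of the framework's API:

```python
# Illustrative sketch: pull speaker-level structure out of transcript chunks.
# SpeakerTurnSummary is a made-up schema for this example.
from pydantic import BaseModel
from typing import List

class SpeakerTurnSummary(BaseModel):
    speaker: str
    key_points: List[str]

speaker_view = processed.with_column(
    "turn_summary",
    fc.semantic.extract(fc.col("chunks"), SpeakerTurnSummary)
)
```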
JSONType
Manipulate nested JSON with JQ expressions:
```python
.with_column("author", fc.json.jq(fc.col("metadata"), ".author.name"))
.with_column("tags", fc.json.jq(fc.col("metadata"), ".tags[]"))
```
This eliminates verbose Python dictionary navigation and handles missing keys gracefully.
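For contrast, this is the kind of hand-rolled dictionary navigation the jq expressions replace (plain Python, shown only for comparison):

```python
# Manual navigation: every level needs its own missing-key guard.
metadata = {"author": {"name": "Ada"}, "tags": ["etl", "llm"]}

author = (metadata.get("author") or {}).get("name")  # None if absent
tags = metadata.get("tags") or []                    # [] if absent
```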
EmbeddingType
First-class support for vector operations with fixed-length embedding vectors and similarity operations, enabling semantic search and clustering without external vector databases.
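A minimal sketch of how an embedding column typically feeds downstream similarity work, using only operators that appear elsewhere in this guide (the column names are illustrative):

```python
# Embed text chunks, then group rows by semantic similarity.
clustered = (
    df
    .with_column("embedding", fc.semantic.embed(fc.col("chunks")))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=10,
        label_column="cluster"
    )
)
```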
These specialized types integrate seamlessly with semantic operators, creating pipelines that process diverse content formats efficiently within a single framework.
Semantic Operators: The Bridge Between Data Types
Semantic operators enable natural language operations on data, functioning as first-class DataFrame primitives alongside traditional operations like filter and aggregate. The query engine understands these operations and optimizes them accordingly.
Schema-Driven Extraction
Transform unstructured text into structured data using Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class Issue(BaseModel):
    category: Literal["bug", "feature_request", "question"]
    severity: Literal["low", "medium", "high", "critical"]
    description: str

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: List[Issue]

tickets = (
    df
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("raw_ticket"), Ticket)
    )
    .unnest("extracted")
    .filter(fc.col("region") == "apac")
    .explode("issues")
)

bugs = tickets.filter(fc.col("issues").category == "bug")
```
This pattern combines semantic extraction with traditional filtering. The semantic.extract operation converts unstructured ticket text into structured fields. Then standard DataFrame operations filter by region and explode nested lists. Finally, traditional column filtering isolates bugs.
The schema provides both documentation and validation. Field descriptions guide LLM extraction:
```python
class Transaction(BaseModel):
    merchant: str = Field(description="The business name where transaction occurred")
    category: Literal["grocery", "dining", "transport"] = Field(
        description="Transaction category based on merchant type"
    )
    amount: float = Field(description="Transaction amount in USD")
```
Natural Language Predicates
Filter data using natural language conditions instead of complex regex:
```python
applicants = df.filter(
    (fc.col("yoe") > 5)
    & fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {resume}"
    )
)
```
This combines traditional boolean logic (years of experience) with semantic understanding (protocol experience). The query engine optimizes both together, potentially filtering on the numeric condition first before invoking the expensive LLM predicate.
Semantic Joins
Join DataFrames based on meaning rather than exact matches:
```python
joined = (
    applicants.semantic.join(
        jobs,
        """Is this candidate a good fit for the job?

        Candidate Background: {resume:left}
        Job Requirements: {job_description:right}

        Use the following criteria:
        - Technical skills alignment
        - Experience level appropriateness
        - Domain knowledge overlap"""
    )
    .order_by("application_date")
    .limit(5)
)
```
Unlike fuzzy string matching that measures character similarity, semantic joins understand domain-specific criteria and make nuanced decisions about row relationships.
Classification and Transformation
Additional operators handle common unstructured data tasks:
- semantic.classify: Categorize text with few-shot examples, providing consistent category assignments without training custom models
- semantic.map: Apply natural language transformations for text generation, rewriting, translation, and summarization
- semantic.with_cluster_labels: Cluster rows by semantic similarity using embeddings, automatically grouping related content
- semantic.reduce: Aggregate grouped data with LLM operations for semantic summarization
- semantic.analyze_sentiment: Built-in sentiment analysis without external services
These operators compose naturally. Chain them with traditional operations to build sophisticated pipelines that handle both data types.
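For instance, here is a hedged sketch of how two of these operators chain with standard column operations (the categories and prompt are placeholders, not prescribed values):

```python
# Classify each message, then generate a one-line summary for routing.
routed = (
    df
    .with_column(
        "channel",
        fc.semantic.classify(
            fc.col("text"),
            categories=["support", "sales", "technical"]
        )
    )
    .with_column(
        "summary",
        fc.semantic.map(fc.col("text"), "Summarize this message in one sentence")
    )
)
```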
Step-by-Step: Building a Unified Pipeline
This section demonstrates building a complete pipeline that processes both structured metrics and unstructured content together.
Environment Setup
Install Fenic and configure LLM providers:
```bash
pip install fenic

export OPENAI_API_KEY="your-openai-api-key"
export ANTHROPIC_API_KEY="your-anthropic-api-key"
```
Initialize a session with semantic configuration:
```python
import fenic as fc

config = fc.SessionConfig(
    app_name="unified_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                "gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                "gemini-2.0-flash-lite",
                rpm=300,
                tpm=150_000
            ),
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)
```
Model aliases abstract provider-specific details. Using names like "nano" and "flash" makes it easy to swap models without changing pipeline code.
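For example, a single step can be routed to a specific alias via model_alias (used later in this guide with classify and extract); the column and category names here are illustrative:

```python
# Route this classification to the lightweight "nano" alias defined above;
# changing the alias swaps providers without touching the rest of the pipeline.
labeled = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("ticket_text"),
        categories=["bug", "billing", "question"],
        model_alias="nano"
    )
)
```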
Loading Mixed Data
Load structured and unstructured data together:
```python
# Structured data: user metrics
users_df = session.read.parquet("s3://bucket/users/*.parquet")

# Unstructured data: support tickets
tickets_df = session.read.json("s3://bucket/tickets/*.json")
```
Both DataFrames exist in the same execution context, enabling unified operations.
Extracting Structure from Unstructured Content
Define schemas for extraction:
```python
from pydantic import BaseModel
from typing import List, Literal

class SupportIssue(BaseModel):
    issue_type: Literal["technical", "billing", "feature_request"]
    priority: Literal["low", "medium", "high", "urgent"]
    affected_features: List[str]
    sentiment: Literal["frustrated", "neutral", "satisfied"]
    resolution_needed: bool
```
Extract structured data from ticket text:
```python
structured_tickets = (
    tickets_df
    .with_column(
        "issue_data",
        fc.semantic.extract(fc.col("ticket_text"), SupportIssue)
    )
    .unnest("issue_data")
)
```
The unnest operation flattens the nested Pydantic model into columns. Now structured_tickets has columns like issue_type, priority, and sentiment extracted from unstructured text.
Combining Structured and Unstructured in One Query
Join extracted data with structured metrics:
```python
analysis = (
    structured_tickets
    .filter(fc.col("priority").isin(["high", "urgent"]))
    .join(
        users_df,
        structured_tickets.user_id == users_df.id,
        "inner"
    )
    .filter(fc.col("account_value") > 10000)
    .select(
        "ticket_id",
        "issue_type",
        "user_id",
        "account_value",
        "sentiment",
        fc.semantic.map(
            fc.col("ticket_text"),
            "Generate executive summary"
        ).alias("summary")
    )
)
```
This pipeline:
- Filters unstructured tickets by extracted priority (semantic operation result)
- Joins with structured user data (traditional operation)
- Filters by structured account value (traditional operation)
- Generates summaries from unstructured text (semantic operation)
The query engine optimizes the entire pipeline, potentially reordering operations to minimize expensive LLM calls.
Semantic Aggregation
Group and aggregate using semantic operations:
```python
issue_themes = (
    analysis
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("ticket_text"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=5,
        label_column="cluster"
    )
    .group_by("cluster", "issue_type")
    .agg(
        fc.count("*").alias("ticket_count"),
        fc.avg("account_value").alias("avg_value"),
        fc.semantic.reduce(
            fc.col("ticket_text"),
            "Summarize common themes across these tickets"
        ).alias("theme_summary")
    )
)
```
This combines:
- Semantic embedding generation
- Clustering by semantic similarity
- Traditional grouping by extracted categories
- Standard aggregations (count, average)
- Semantic aggregation (LLM summarization)
Writing Results
Write processed data back to the data lake:
```python
issue_themes.write.parquet("s3://bucket/analysis/themes/")
```
The framework supports standard formats (Parquet, CSV, JSON) and integrations (Iceberg, Delta Lake), ensuring compatibility with existing infrastructure.
Production Patterns for Mixed Data Pipelines
Production systems require reliability, cost efficiency, and observability beyond basic functionality.
Batch Preprocessing for Real-Time Systems
Separate heavy inference from request paths by preprocessing data in batch pipelines:
```python
# Offline batch processing
enriched_docs = (
    raw_documents
    .with_column("markdown", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("markdown"), header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks"))
    )
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("chunks"), DocumentMetadata)
    )
)

enriched_docs.write.parquet("s3://processed/documents/")
```
Real-time agents then query preprocessed data without expensive inference at request time. This provides predictable latency and better resource utilization.
Multi-Model Strategies
Configure different models for different tasks:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "cheap": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                "claude-opus-4-0",
                rpm=100,
                tpm=100_000
            ),
        },
        default_language_model="cheap",
    ),
)

pipeline = (
    df
    # Simple classification with cheap model
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            categories=["support", "sales", "technical"],
            model_alias="cheap"
        )
    )
    # Complex extraction with accurate model
    .with_column(
        "detailed_analysis",
        fc.semantic.extract(
            fc.col("text"),
            ComplexSchema,
            model_alias="accurate"
        )
    )
)
```
Strategic model selection can reduce costs by 80% while maintaining quality. Use inexpensive models for straightforward tasks and reserve powerful models for complex reasoning.
Intelligent Caching
Cache expensive operations explicitly:
```python
base_data = (
    df
    .filter(fc.col("date") >= "2024-01-01")
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("content"), Schema)
    )
    .cache()
)

# Multiple downstream operations use cached results
analysis_a = base_data.filter(fc.col("region") == "us").collect()
analysis_b = base_data.group_by("category").count().collect()
```
The framework also automatically caches identical inference calls within a session, preventing redundant API calls.
Monitoring and Cost Tracking
Access comprehensive metrics:
```python
result = pipeline.collect()
metrics = result.metrics()

print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Execution time: {metrics.execution_time}s")

# Operator-level metrics
for op_metric in metrics.operator_metrics:
    if op_metric.cost > 10.0:
        print(f"Expensive operator: {op_metric.name}, Cost: ${op_metric.cost}")
```
Token counting and cost tracking provide visibility into resource usage, helping teams optimize spending and stay within budget constraints.
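These metrics also lend themselves to simple programmatic guards. A sketch, assuming the metrics fields shown above and an illustrative budget threshold:

```python
# Estimate full-run cost from a small sample and abort if over budget.
sample_metrics = pipeline.limit(50).collect().metrics()
projected_cost = sample_metrics.lm_metrics.total_cost * (df.count() / 50)

if projected_cost > 100.0:  # illustrative budget in USD
    raise RuntimeError(f"Projected cost ${projected_cost:.2f} exceeds budget")
```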
Real-World Application: Product Triage Pipeline
RudderStack reduced triage time by 95% using unified pipelines that process both structured data and unstructured content together.
Their system ingests:
Structured data: Support ticket metadata, user segments, product usage metrics from the data warehouse
Unstructured data: Ticket descriptions, sales call transcripts, product documentation from Notion
The pipeline:
- Normalize inputs: Load warehouse data and scrape documentation into DataFrames
- Build semantic context: Infer product taxonomy from docs, map tickets to categories, create semantic links between issues and PRDs
- Classify and enrich: Use semantic operators to classify new tickets, surface related work, and propose decisions
- Persist results: Write mappings and rationales back to the warehouse for analytics
Key technical patterns:
```python
triage_pipeline = (
    new_tickets_df
    # Extract structured data from unstructured tickets
    .with_column(
        "classification",
        fc.semantic.extract(fc.col("ticket_text"), TicketClassification)
    )
    # Join with structured usage data
    .join(
        usage_metrics_df,
        new_tickets_df.customer_id == usage_metrics_df.customer_id
    )
    # Semantic join to find related PRDs
    .semantic.join(
        prds_df,
        "Ticket {ticket_text:left} relates to PRD {prd_description:right}"
    )
    # Traditional aggregation by category
    .group_by("classification.category")
    .agg(
        fc.count("*").alias("ticket_count"),
        fc.semantic.reduce(
            fc.col("ticket_text"),
            "Summarize priority rationale"
        ).alias("decision_rationale")
    )
)
```
Results:
- 95% reduction in PM time per triage
- 90%+ first-pass category acceptance
- Semantic links provide explainability and accuracy
- Citations surface prospect and community signals directly
The key insight: treating both structured metrics and unstructured content as first-class citizens in the same pipeline eliminated the need for separate systems and integration code.
Best Practices for Unified Pipelines
Design Clear Schemas
Pydantic schemas guide extraction quality. Provide detailed field descriptions:
```python
class CustomerFeedback(BaseModel):
    primary_issue: str = Field(
        description="The main problem or request in 1-2 sentences"
    )
    affected_features: List[str] = Field(
        description="Specific product features mentioned or implied"
    )
    urgency: Literal["low", "medium", "high"] = Field(
        description="Urgency based on language, tone, and stated timelines"
    )
```
Use Literal types to constrain outputs and reduce hallucination.
Optimize Operation Order
Place cheap operations before expensive ones:
```python
# Good: Filter first, then extract
pipeline = (
    df
    .filter(fc.col("priority") == "high")  # Cheap
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("text"), Schema)  # Expensive
    )
)

# Bad: Extract everything, then filter
pipeline = (
    df
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("text"), Schema)  # Expensive on all rows
    )
    .filter(fc.col("priority") == "high")  # Cheap
)
```
The query optimizer may reorder some operations automatically, but explicit ordering helps.
Test with Small Samples
Validate logic and estimate costs before scaling:
```python
# Development with 100 rows
sample = df.limit(100)
result = sample.semantic.extract(...).collect()

print(f"Cost for 100 rows: ${result.metrics().lm_metrics.total_cost}")
print(f"Estimated cost for full dataset: ${result.metrics().lm_metrics.total_cost * (df.count() / 100)}")

# Validate results, then scale
df.semantic.extract(...).write.parquet("output/")
```
Lazy evaluation makes it trivial to test pipelines on representative samples.
Use Specialized Types Appropriately
Cast content to appropriate types early:
```python
df = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column("meta", fc.col("metadata").cast(fc.JsonType))
)
```
This unlocks specialized operations and improves performance by enabling type-specific optimizations.
Implement Hybrid Approaches
Combine traditional and semantic operations strategically:
```python
# Use fuzzy matching for initial filtering
candidates = (
    left_df.join(right_df)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("name_left"),
            fc.col("name_right"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Then semantic matching on filtered candidates
final = candidates.semantic.join(
    right_df,
    "Are these the same entity? {description_left:left} vs {description_right:right}"
)
```
This reduces costs by orders of magnitude compared to semantic operations on full cross-products.
Infrastructure Considerations
Lakehouse-Native Architecture
The framework operates as pure compute without proprietary storage:
```python
# Read from existing data lake
df = session.read.parquet("s3://lake/raw/*.parquet")

# Process
processed = df.semantic.extract(...).filter(...)

# Write back to lake
processed.write.format("iceberg").save("s3://lake/processed/")
```
Full compatibility with Parquet, Iceberg, and Delta Lake enables integration with existing infrastructure. No data movement or duplication is required.
Local Development to Cloud Deployment
Develop locally and deploy to cloud without code changes:
```python
# Local development
config = fc.SessionConfig(app_name="dev_pipeline")
session = fc.Session.get_or_create(config)

# Production deployment - same code, just update config
config = fc.SessionConfig(
    app_name="prod_pipeline",
    cloud=fc.CloudConfig(size=fc.CloudExecutorSize.LARGE)
)
session = fc.Session.get_or_create(config)
```
The framework handles scaling automatically; the same pipeline code runs locally and in production.
Rate Limiting and Reliability
The framework automatically handles rate limits:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=500,      # Requests per minute
                tpm=200_000   # Tokens per minute
            )
        }
    )
)
```
The engine tracks usage in real-time and self-throttles when approaching limits. Built-in retry logic handles transient failures automatically.
Moving from Fragmented to Unified
Traditional approaches to mixed data processing create architectural complexity:
Before: Structured ETL in Spark, unstructured processing in custom Python scripts, manual orchestration in Airflow, data movement between systems
After: Single DataFrame pipeline processing both data types, unified optimization, automatic batching and caching, integrated lineage tracking
The benefits compound in production:
Development velocity: Build features instead of managing infrastructure. Changes require updates to one pipeline instead of multiple systems.
Operational stability: Fewer integration points mean fewer failure modes. Built-in retry logic and rate limiting provide reliability by default.
Cost efficiency: Global optimization reduces redundant operations. Strategic model selection and caching minimize inference costs.
Debugging tractability: Row-level lineage traces data through transformations. Comprehensive metrics identify bottlenecks.
The technical foundation is an inference-first query engine where LLM operations are first-class citizens alongside traditional operations. This enables optimization impossible when inference happens in external black-box services.
Getting Started
Installation requires Python 3.10+:
```bash
pip install fenic
```
Configure at least one LLM provider and build your first unified pipeline:
```python
import fenic as fc
from pydantic import BaseModel
from typing import Literal

class FeedbackAnalysis(BaseModel):
    topic: Literal["product", "support", "billing", "other"]
    sentiment: Literal["positive", "negative", "neutral"]
    key_points: list[str]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

# Load structured user data
users = session.read.csv("users.csv")

# Load unstructured feedback
feedback = session.read.json("feedback.json")

# Unified pipeline
analysis = (
    feedback
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("feedback_text"), FeedbackAnalysis)
    )
    .unnest("analysis")
    .join(users, feedback.user_id == users.id)
    .filter(
        (fc.col("sentiment") == "negative")
        & (fc.col("account_tier") == "enterprise")
    )
    .select(
        "user_id",
        "account_tier",
        "topic",
        "key_points",
        fc.semantic.map(
            fc.col("feedback_text"),
            "Generate action items"
        ).alias("action_items")
    )
)

analysis.show()
```
This pipeline extracts structure from unstructured feedback, joins with structured user data, applies mixed filtering, and generates summaries—all in one declarative workflow.
For additional resources, implementation guides, and access to the cloud platform, visit the Typedef documentation.
Conclusion
Processing unstructured and structured data in separate systems creates fragmentation, operational overhead, and missed optimization opportunities. Unified pipelines that treat both data types as first-class citizens through a DataFrame abstraction eliminate these problems.
The technical foundation requires:
- Specialized data types that understand content structure (Markdown, Transcript, JSON)
- Semantic operators as native DataFrame primitives
- An inference-first query engine that optimizes across operation types
- Row-level lineage for debugging non-deterministic transformations
The result: single pipelines that combine traditional operations like filtering and joining with semantic operations like extraction and classification, all optimized together by a query engine that understands both.
Organizations report order-of-magnitude improvements in development velocity, cost efficiency, and operational reliability when moving from fragmented infrastructure to unified processing. The architectural simplification alone justifies adoption, but the ability to optimize globally across operation types provides additional leverage.
For teams building AI-powered data products, unified DataFrame APIs for mixed data represent the infrastructure foundation that makes production systems tractable.

