
How to Process Unstructured Data Alongside Structured Data in One Platform

Typedef Team

Modern data pipelines face a critical challenge: an estimated 80% of enterprise data exists in unstructured formats such as PDFs, transcripts, images, and markdown documents, yet traditional data platforms can only efficiently process structured rows and columns. The result is fragmented infrastructure: teams maintain separate systems for structured ETL and unstructured content processing, adding maintenance overhead and operational complexity.

This guide demonstrates how to build unified pipelines that process both structured and unstructured data using a single DataFrame abstraction, eliminating the need for disparate tools and brittle integration code.

The Dual-Data Problem in Production Systems

Traditional data platforms treat unstructured content as an afterthought. When you need to analyze customer feedback alongside transaction data, or process support tickets with user metrics, the standard approach requires:

  • Separate preprocessing steps for text extraction
  • Custom scripts to call LLM APIs
  • Manual data movement between systems
  • Complex orchestration to combine results

This architectural split creates several problems:

Operational fragility: Each integration point introduces failure modes. Rate limits, API changes, and serialization issues cascade through pipelines.

Development velocity collapse: Engineers spend the majority of their time managing infrastructure rather than building features. Simple changes require updates across multiple disconnected systems.

Cost inefficiency: Duplicate API calls, suboptimal model selection, and poor caching strategies inflate processing costs. Without unified visibility, optimization becomes impossible.

The root cause is architectural: data platforms designed for deterministic operations on structured data cannot natively handle probabilistic inference on unstructured content.

The DataFrame Abstraction for Unified Processing

The solution lies in treating both structured and unstructured data as first-class citizens within a unified DataFrame API. Typedef's Fenic framework extends the familiar PySpark-inspired DataFrame abstraction with semantic operators that understand meaning, not just values.

This approach provides several advantages:

Single pipeline for mixed data: Combine traditional operations like filter and join with semantic operations like extract and classify in one declarative workflow.

Type-safe processing: Define schemas once using Pydantic models. The framework validates results automatically, eliminating manual parsing and error handling.

Automatic optimization: The query engine sees the entire pipeline—both traditional and semantic operations—enabling global optimization of batching, caching, and operation ordering.

Row-level lineage: Track every transformation, even through non-deterministic LLM operations. This makes debugging production issues tractable.

The key insight is that AI workloads are fundamentally pipelines: they take inputs, reason over context, generate outputs, and log results. DataFrames already handle these patterns for structured data. Extending them to unstructured content creates unified infrastructure.

Specialized Data Types for Unstructured Content

Traditional DataFrames support basic types: integers, strings, floats, booleans. Processing unstructured content requires richer type systems that understand document structure.

Fenic provides AI-native data types with specialized operations:

MarkdownType

Native support for markdown documents with structure-aware operations:

python
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_blog"), header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks"))
    )
)

The markdown.extract_header_chunks function leverages document structure to produce semantically meaningful chunks rather than splitting on naive character counts. This preserves context boundaries and avoids mid-sentence splits.

TranscriptType

Handle SRT, WebVTT, and generic transcript formats with speaker and timestamp awareness:

python
processed = (
    df
    .with_column("transcript", fc.col("file").cast(fc.TranscriptType))
    .select(
        "*",
        fc.text.recursive_token_chunk(
            fc.col("transcript"),
            chunk_size=1200,
            chunk_overlap_percentage=0
        ).alias("chunks")
    )
    .explode("chunks")
)

The framework preserves speaker identity and timestamps through transformations, enabling speaker-aware analysis without manual parsing.

JSONType

Manipulate nested JSON with JQ expressions:

python
.with_column("author", fc.json.jq(fc.col("metadata"), ".author.name"))
.with_column("tags", fc.json.jq(fc.col("metadata"), ".tags[]"))

This eliminates verbose Python dictionary navigation and handles missing keys gracefully.

EmbeddingType

First-class support for vector operations with fixed-length embedding vectors and similarity operations, enabling semantic search and clustering without external vector databases.
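
A minimal sketch, reusing the embedding and clustering operators demonstrated later in this guide (the review_text column and cluster count here are illustrative):

python
# Embed free-text reviews, then group rows by semantic similarity
# without leaving the DataFrame or standing up a vector database.
clustered = (
    df
    .with_column("embedding", fc.semantic.embed(fc.col("review_text")))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=10,
        label_column="cluster"
    )
)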

These specialized types integrate seamlessly with semantic operators, creating pipelines that process diverse content formats efficiently within a single framework.

Semantic Operators: The Bridge Between Data Types

Semantic operators enable natural language operations on data, functioning as first-class DataFrame primitives alongside traditional operations like filter and aggregate. The query engine understands these operations and optimizes them accordingly.

Schema-Driven Extraction

Transform unstructured text into structured data using Pydantic schemas:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class Issue(BaseModel):
    category: Literal["bug", "feature_request", "question"]
    severity: Literal["low", "medium", "high", "critical"]
    description: str

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: List[Issue]

tickets = (
    df
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("raw_ticket"), Ticket)
    )
    .unnest("extracted")
    .filter(fc.col("region") == "apac")
    .explode("issues")
)

bugs = tickets.filter(fc.col("issues").category == "bug")

This pattern combines semantic extraction with traditional filtering. The semantic.extract operation converts unstructured ticket text into structured fields. Then standard DataFrame operations filter by region and explode nested lists. Finally, traditional column filtering isolates bugs.

The schema provides both documentation and validation. Field descriptions guide LLM extraction:

python
class Transaction(BaseModel):
    merchant: str = Field(description="The business name where transaction occurred")
    category: Literal["grocery", "dining", "transport"] = Field(
        description="Transaction category based on merchant type"
    )
    amount: float = Field(description="Transaction amount in USD")

Natural Language Predicates

Filter data using natural language conditions instead of complex regex:

python
applicants = df.filter(
    (fc.col("yoe") > 5) &
    fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {resume}"
    )
)

This combines traditional boolean logic (years of experience) with semantic understanding (protocol experience). The query engine optimizes both together, potentially filtering on the numeric condition first before invoking the expensive LLM predicate.

Semantic Joins

Join DataFrames based on meaning rather than exact matches:

python
joined = (
    applicants.semantic.join(
        jobs,
        """Is this candidate a good fit for the job?
Candidate Background: {resume:left}
Job Requirements: {job_description:right}
Use the following criteria:
- Technical skills alignment
- Experience level appropriateness
- Domain knowledge overlap"""
    )
    .order_by("application_date")
    .limit(5)
)

Unlike fuzzy string matching that measures character similarity, semantic joins understand domain-specific criteria and make nuanced decisions about row relationships.

Classification and Transformation

Additional operators handle common unstructured data tasks:

  • semantic.classify: Categorize text with few-shot examples, providing consistent category assignments without training custom models
  • semantic.map: Apply natural language transformations for text generation, rewriting, translation, and summarization
  • semantic.with_cluster_labels: Cluster rows by semantic similarity using embeddings, automatically grouping related content
  • semantic.reduce: Aggregate grouped data with LLM operations for semantic summarization
  • semantic.analyze_sentiment: Built-in sentiment analysis without external services

These operators compose naturally. Chain them with traditional operations to build sophisticated pipelines that handle both data types.
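
As a brief illustration of how they compose, the sketch below chains classify, a standard filter, and map; the message column and category names are illustrative:

python
routed = (
    df
    # Cheap categorization first
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("message"),
            categories=["support", "sales", "technical"]
        )
    )
    # A standard filter narrows the rows before the next LLM call
    .filter(fc.col("category") == "support")
    # Natural language transformation runs only on the surviving rows
    .with_column(
        "reply_draft",
        fc.semantic.map(fc.col("message"), "Draft a short first response")
    )
)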

Step-by-Step: Building a Unified Pipeline

This section demonstrates building a complete pipeline that processes both structured metrics and unstructured content together.

Environment Setup

Install Fenic and configure LLM providers:

bash
pip install fenic

export OPENAI_API_KEY="your-openai-api-key"
export ANTHROPIC_API_KEY="your-anthropic-api-key"

Initialize a session with semantic configuration:

python
import fenic as fc

config = fc.SessionConfig(
    app_name="unified_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                "gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                "gemini-2.0-flash-lite",
                rpm=300,
                tpm=150_000
            ),
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)

Model aliases abstract provider-specific details. Using names like "nano" and "flash" makes it easy to swap models without changing pipeline code.
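
For example, an individual operator can reference an alias directly; this sketch assumes the configuration above and an illustrative ticket_text column:

python
# Route a simple classification to the cheaper "nano" alias.
# Swapping providers later only requires editing the SessionConfig.
df = df.with_column(
    "urgency",
    fc.semantic.classify(
        fc.col("ticket_text"),
        categories=["low", "high"],
        model_alias="nano"
    )
)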

Loading Mixed Data

Load structured and unstructured data together:

python
# Structured data: user metrics
users_df = session.read.parquet("s3://bucket/users/*.parquet")

# Unstructured data: support tickets
tickets_df = session.read.json("s3://bucket/tickets/*.json")

Both DataFrames exist in the same execution context, enabling unified operations.

Extracting Structure from Unstructured Content

Define schemas for extraction:

python
from pydantic import BaseModel
from typing import List, Literal

class SupportIssue(BaseModel):
    issue_type: Literal["technical", "billing", "feature_request"]
    priority: Literal["low", "medium", "high", "urgent"]
    affected_features: List[str]
    sentiment: Literal["frustrated", "neutral", "satisfied"]
    resolution_needed: bool

Extract structured data from ticket text:

python
structured_tickets = (
    tickets_df
    .with_column(
        "issue_data",
        fc.semantic.extract(fc.col("ticket_text"), SupportIssue)
    )
    .unnest("issue_data")
)

The unnest operation flattens the nested Pydantic model into columns. Now structured_tickets has columns like issue_type, priority, and sentiment extracted from unstructured text.

Combining Structured and Unstructured in One Query

Join extracted data with structured metrics:

python
analysis = (
    structured_tickets
    .filter(fc.col("priority").isin(["high", "urgent"]))
    .join(
        users_df,
        structured_tickets.user_id == users_df.id,
        "inner"
    )
    .filter(fc.col("account_value") > 10000)
    .select(
        "ticket_id",
        "issue_type",
        "user_id",
        "account_value",
        "sentiment",
        fc.semantic.map(
            fc.col("ticket_text"),
            "Generate executive summary"
        ).alias("summary")
    )
)

This pipeline:

  1. Filters unstructured tickets by extracted priority (semantic operation result)
  2. Joins with structured user data (traditional operation)
  3. Filters by structured account value (traditional operation)
  4. Generates summaries from unstructured text (semantic operation)

The query engine optimizes the entire pipeline, potentially reordering operations to minimize expensive LLM calls.

Semantic Aggregation

Group and aggregate using semantic operations:

python
issue_themes = (
    analysis
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("ticket_text"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=5,
        label_column="cluster"
    )
    .group_by("cluster", "issue_type")
    .agg(
        fc.count("*").alias("ticket_count"),
        fc.avg("account_value").alias("avg_value"),
        fc.semantic.reduce(
            fc.col("ticket_text"),
            "Summarize common themes across these tickets"
        ).alias("theme_summary")
    )
)

This combines:

  • Semantic embedding generation
  • Clustering by semantic similarity
  • Traditional grouping by extracted categories
  • Standard aggregations (count, average)
  • Semantic aggregation (LLM summarization)

Writing Results

Write processed data back to the data lake:

python
issue_themes.write.parquet("s3://bucket/analysis/themes/")

The framework supports standard formats (Parquet, CSV, JSON) and integrations (Iceberg, Delta Lake), ensuring compatibility with existing infrastructure.

Production Patterns for Mixed Data Pipelines

Production systems require reliability, cost efficiency, and observability beyond basic functionality.

Batch Preprocessing for Real-Time Systems

Separate heavy inference from request paths by preprocessing data in batch pipelines:

python
# Offline batch processing
enriched_docs = (
    raw_documents
    .with_column("markdown", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("markdown"), header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks"))
    )
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("chunks"), DocumentMetadata)
    )
)

enriched_docs.write.parquet("s3://processed/documents/")

Real-time agents then query preprocessed data without expensive inference at request time. This provides predictable latency and better resource utilization.
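
The request-time side then reduces to ordinary reads and filters; a sketch, assuming the batch job above has already written its output (the source column is illustrative):

python
# Serve preprocessed chunks with standard operations only,
# keeping LLM inference off the hot path.
docs = session.read.parquet("s3://processed/documents/")
results = (
    docs
    .filter(fc.col("source") == "runbooks")
    .select("chunks", "embedding", "metadata")
    .limit(20)
    .collect()
)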

Multi-Model Strategies

Configure different models for different tasks:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "cheap": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                "claude-opus-4-0",
                rpm=100,
                tpm=100_000
            ),
        },
        default_language_model="cheap",
    ),
)

pipeline = (
    df
    # Simple classification with cheap model
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            categories=["support", "sales", "technical"],
            model_alias="cheap"
        )
    )
    # Complex extraction with accurate model
    .with_column(
        "detailed_analysis",
        fc.semantic.extract(
            fc.col("text"),
            ComplexSchema,
            model_alias="accurate"
        )
    )
)

Strategic model selection can reduce costs significantly, in some cases by as much as 80%, while maintaining quality. Use inexpensive models for straightforward tasks and reserve powerful models for complex reasoning.

Intelligent Caching

Cache expensive operations explicitly:

python
base_data = (
    df
    .filter(fc.col("date") >= "2024-01-01")
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("content"), Schema)
    )
    .cache()
)

# Multiple downstream operations use cached results
analysis_a = base_data.filter(fc.col("region") == "us").collect()
analysis_b = base_data.group_by("category").count().collect()

The framework also automatically caches identical inference calls within a session, preventing redundant API calls.

Monitoring and Cost Tracking

Access comprehensive metrics:

python
result = pipeline.collect()
metrics = result.metrics()

print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Execution time: {metrics.execution_time}s")

# Operator-level metrics
for op_metric in metrics.operator_metrics:
    if op_metric.cost > 10.0:
        print(f"Expensive operator: {op_metric.name}, Cost: ${op_metric.cost}")

Token counting and cost tracking provide visibility into resource usage, helping teams optimize spending and stay within budget constraints.

Real-World Application: Product Triage Pipeline

RudderStack reduced triage time by 95% using unified pipelines that process both structured data and unstructured content together.

Their system ingests:

Structured data: Support ticket metadata, user segments, product usage metrics from the data warehouse

Unstructured data: Ticket descriptions, sales call transcripts, product documentation from Notion

The pipeline:

  1. Normalize inputs: Load warehouse data and scrape documentation into DataFrames
  2. Build semantic context: Infer product taxonomy from docs, map tickets to categories, create semantic links between issues and PRDs
  3. Classify and enrich: Use semantic operators to classify new tickets, surface related work, and propose decisions
  4. Persist results: Write mappings and rationales back to the warehouse for analytics

Key technical patterns:

python
triage_pipeline = (
    new_tickets_df
    # Extract structured data from unstructured tickets
    .with_column(
        "classification",
        fc.semantic.extract(fc.col("ticket_text"), TicketClassification)
    )
    # Join with structured usage data
    .join(
        usage_metrics_df,
        new_tickets_df.customer_id == usage_metrics_df.customer_id
    )
    # Semantic join to find related PRDs
    .semantic.join(
        prds_df,
        "Ticket {ticket_text:left} relates to PRD {prd_description:right}"
    )
    # Traditional aggregation by category
    .group_by("classification.category")
    .agg(
        fc.count("*").alias("ticket_count"),
        fc.semantic.reduce(
            fc.col("ticket_text"),
            "Summarize priority rationale"
        ).alias("decision_rationale")
    )
)

Results:

  • 95% reduction in PM time per triage
  • 90%+ first-pass category acceptance
  • Semantic links provide explainability and accuracy
  • Citations surface prospect and community signals directly

The key insight: treating both structured metrics and unstructured content as first-class citizens in the same pipeline eliminated the need for separate systems and integration code.

Best Practices for Unified Pipelines

Design Clear Schemas

Pydantic schemas guide extraction quality. Provide detailed field descriptions:

python
class CustomerFeedback(BaseModel):
    primary_issue: str = Field(
        description="The main problem or request in 1-2 sentences"
    )
    affected_features: List[str] = Field(
        description="Specific product features mentioned or implied"
    )
    urgency: Literal["low", "medium", "high"] = Field(
        description="Urgency based on language, tone, and stated timelines"
    )

Use Literal types to constrain outputs and reduce hallucination.

Optimize Operation Order

Place cheap operations before expensive ones:

python
# Good: Filter first, then extract
pipeline = (
    df
    .filter(fc.col("priority") == "high")  # Cheap
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("text"), Schema)  # Expensive
    )
)

# Bad: Extract everything, then filter
pipeline = (
    df
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("text"), Schema)  # Expensive on all rows
    )
    .filter(fc.col("priority") == "high")  # Cheap
)

The query optimizer may reorder some operations automatically, but explicit ordering helps.

Test with Small Samples

Validate logic and estimate costs before scaling:

python
# Development with 100 rows
sample = df.limit(100)
result = sample.semantic.extract(...).collect()
print(f"Cost for 100 rows: ${result.metrics().lm_metrics.total_cost}")
print(f"Estimated cost for full dataset: ${result.metrics().lm_metrics.total_cost * (df.count() / 100)}")

# Validate results, then scale
df.semantic.extract(...).write.parquet("output/")

Lazy evaluation makes it trivial to test pipelines on representative samples.

Use Specialized Types Appropriately

Cast content to appropriate types early:

python
df = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column("meta", fc.col("metadata").cast(fc.JsonType))
)

This unlocks specialized operations and improves performance by enabling type-specific optimizations.

Implement Hybrid Approaches

Combine traditional and semantic operations strategically:

python
# Use fuzzy matching for initial filtering
candidates = (
    left_df.join(right_df)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("name_left"),
            fc.col("name_right"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Then semantic matching on filtered candidates
final = candidates.semantic.join(
    right_df,
    "Are these the same entity? {description_left:left} vs {description_right:right}"
)

This reduces costs by orders of magnitude compared to semantic operations on full cross-products.

Infrastructure Considerations

Lakehouse-Native Architecture

The framework operates as pure compute without proprietary storage:

python
# Read from existing data lake
df = session.read.parquet("s3://lake/raw/*.parquet")

# Process
processed = df.semantic.extract(...).filter(...)

# Write back to lake
processed.write.format("iceberg").save("s3://lake/processed/")

Full compatibility with Parquet, Iceberg, Delta Lake enables integration with existing infrastructure. No data movement or duplication required.

Local Development to Cloud Deployment

Develop locally and deploy to cloud without code changes:

python
# Local development
config = fc.SessionConfig(app_name="dev_pipeline")
session = fc.Session.get_or_create(config)

# Production deployment - same code, just update config
config = fc.SessionConfig(
    app_name="prod_pipeline",
    cloud=fc.CloudConfig(size=fc.CloudExecutorSize.LARGE)
)
session = fc.Session.get_or_create(config)

The framework handles scaling automatically. Same pipeline code works locally and in production.

Rate Limiting and Reliability

The framework automatically handles rate limits:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=500,  # Requests per minute
                tpm=200_000  # Tokens per minute
            )
        }
    )
)

The engine tracks usage in real-time and self-throttles when approaching limits. Built-in retry logic handles transient failures automatically.

Moving from Fragmented to Unified

Traditional approaches to mixed data processing create architectural complexity:

Before: Structured ETL in Spark, unstructured processing in custom Python scripts, manual orchestration in Airflow, data movement between systems

After: Single DataFrame pipeline processing both data types, unified optimization, automatic batching and caching, integrated lineage tracking

The benefits compound in production:

Development velocity: Build features instead of managing infrastructure. Changes require updates to one pipeline instead of multiple systems.

Operational stability: Fewer integration points mean fewer failure modes. Built-in retry logic and rate limiting provide reliability by default.

Cost efficiency: Global optimization reduces redundant operations. Strategic model selection and caching minimize inference costs.

Debugging tractability: Row-level lineage traces data through transformations. Comprehensive metrics identify bottlenecks.

The technical foundation is an inference-first query engine where LLM operations are first-class citizens alongside traditional operations. This enables optimization impossible when inference happens in external black-box services.

Getting Started

Installation requires Python 3.10+:

bash
pip install fenic

Configure at least one LLM provider and build your first unified pipeline:

python
import fenic as fc
from pydantic import BaseModel
from typing import Literal

class FeedbackAnalysis(BaseModel):
    topic: Literal["product", "support", "billing", "other"]
    sentiment: Literal["positive", "negative", "neutral"]
    key_points: list[str]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

# Load structured user data
users = session.read.csv("users.csv")

# Load unstructured feedback
feedback = session.read.json("feedback.json")

# Unified pipeline
analysis = (
    feedback
    .with_column(
        "analysis",
        fc.semantic.extract(fc.col("feedback_text"), FeedbackAnalysis)
    )
    .unnest("analysis")
    .join(users, feedback.user_id == users.id)
    .filter(
        (fc.col("sentiment") == "negative") &
        (fc.col("account_tier") == "enterprise")
    )
    .select(
        "user_id",
        "account_tier",
        "topic",
        "key_points",
        fc.semantic.map(
            fc.col("feedback_text"),
            "Generate action items"
        ).alias("action_items")
    )
)

analysis.show()

This pipeline extracts structure from unstructured feedback, joins with structured user data, applies mixed filtering, and generates summaries—all in one declarative workflow.

For additional resources, implementation guides, and access to the cloud platform, visit the Typedef documentation.

Conclusion

Processing unstructured and structured data in separate systems creates fragmentation, operational overhead, and missed optimization opportunities. Unified pipelines that treat both data types as first-class citizens through a DataFrame abstraction eliminate these problems.

The technical foundation requires:

  • Specialized data types that understand content structure (Markdown, Transcript, JSON)
  • Semantic operators as native DataFrame primitives
  • An inference-first query engine that optimizes across operation types
  • Row-level lineage for debugging non-deterministic transformations

The result: single pipelines that combine traditional operations like filtering and joining with semantic operations like extraction and classification, all optimized together by a query engine that understands both.

Organizations report order-of-magnitude improvements in development velocity, cost efficiency, and operational reliability when moving from fragmented infrastructure to unified processing. The architectural simplification alone justifies adoption, but the ability to optimize globally across operation types provides additional leverage.

For teams building AI-powered data products, unified DataFrame APIs for mixed data represent the infrastructure foundation that makes production systems tractable.
