
How to Create Unified Workflows for Structured Tables and Messy Text

Typedef Team


Most data pipelines fail at the boundary between structured and unstructured data. You have clean SQL tables on one side and messy PDFs, transcripts, and raw text on the other. The standard approach involves stitching together separate systems with brittle glue code that breaks under production load.

This article shows how to build unified workflows that process structured tables and unstructured text within a single framework using Typedef's Fenic, an open-source DataFrame library designed for AI workloads.

The Problem with Fragmented Data Workflows

Traditional data infrastructure separates structured and unstructured processing into distinct systems. You extract data from databases, preprocess text with custom scripts, call LLM APIs through microservices, then somehow merge everything back together.

This fragmentation creates three critical issues:

Infrastructure complexity - Each connection point between systems introduces failure modes. OCR models extract text from PDFs, transcription services handle audio, LLM providers process requests at different rate limits, and data warehouses store results. Managing dependencies, retries, and error handling across these boundaries consumes significant engineering resources.

Performance bottlenecks - Data moves between systems through serialization and deserialization cycles. What should be a single pipeline becomes a series of disconnected steps, each adding latency. Context-window chunking logic ends up scattered throughout codebases. Cost optimization requires balancing expensive, accurate models against cheaper, less reliable ones, which is difficult without unified visibility into the full pipeline.

Maintenance burden - Simple changes ripple across multiple systems. A schema update requires modifying extraction scripts, validation logic, database schemas, and API contracts. Testing becomes impractical with so many external dependencies. Debugging requires tracing through dozens of disconnected services.

The root cause is architectural: legacy data platforms were built for deterministic SQL operations, not probabilistic inference. LLM calls exist outside the query engine as external black boxes, preventing optimization and creating impedance mismatches.

Unified Workflows Through DataFrames

Fenic treats both structured tables and unstructured text as first-class DataFrame operations. The same API handles SQL-style filtering and LLM-powered extraction. Semantic operators like semantic.extract and semantic.join work alongside traditional operations like filter and group_by.

This unified approach means the query engine sees your entire pipeline. It can batch LLM calls efficiently, cache repeated operations, reorder execution for performance, and provide accurate cost estimates before running expensive inference.
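
As a minimal sketch of what this looks like in practice (assuming a configured session as shown below and a hypothetical feedback DataFrame), nothing executes while the pipeline is being built; the engine sees the whole plan at .collect() and reports inference costs through the result metrics:

python
# Build the plan lazily: a structured filter plus an LLM-powered column (hypothetical columns)
pipeline = (
    feedback
    .filter(fc.col("tier") == "enterprise")
    .with_column(
        "summary",
        fc.semantic.map("Summarize: {{text}}", text=fc.col("feedback_text"))
    )
)

# Optimization and execution happen here, not above
result = pipeline.collect()
print(f"Cost: ${result.metrics.total_lm_metrics.cost}")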

The DataFrame abstraction provides three key benefits for mixed data workflows:

Lineage tracking - Every column and row has traceable origins, even when transformations involve non-deterministic model outputs. Debug issues by tracing individual records through the pipeline.

Columnar consistency - Whether working with structured IDs or unstructured summaries, data maintains structure throughout transformations. No more parsing JSON strings or validating ad-hoc formats.

Declarative optimization - Express what you want, let the engine determine how to execute efficiently. The optimizer batches requests, manages rate limits, and maximizes throughput automatically.

Setting Up Your Environment

Install Fenic with Python 3.10, 3.11, or 3.12:

bash
pip install fenic

Configure at least one LLM provider:

bash
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"

Initialize a session with semantic capabilities:

python
import fenic as fc

config = fc.SessionConfig(
    app_name="unified_workflow",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                "claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            ),
        },
        default_language_model="fast",
    ),
)

session = fc.Session.get_or_create(config)

This configuration defines model aliases that abstract provider details. You can switch between models or providers without changing pipeline code, which is critical for production systems that tune cost and performance dynamically.
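
For example, here is a quick sketch (using the semantic.map operator introduced later and a hypothetical df): the pipeline references only the alias, so repointing "accurate" at a different model or provider in the config changes nothing below:

python
# Pipeline code depends on the alias, not on the provider or model name
summaries = df.with_column(
    "summary",
    fc.semantic.map(
        "Summarize this document: {{text}}",
        text=fc.col("text"),
        model_alias="accurate"  # or "fast"; the alias is resolved from SessionConfig
    )
)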

Working with Structured Table Operations

Start with familiar DataFrame operations for structured data. Load data from any source:

python
# From CSV
customers = session.read.csv("customers.csv")

# From Parquet on S3
orders = session.read.parquet("s3://data-lake/orders/*.parquet")

# From existing tables
products = session.table("products")

Apply standard transformations:

python
high_value_customers = (
    customers
    .filter(fc.col("lifetime_value") > 10000)
    .filter(fc.col("region") == "enterprise")
    .select("customer_id", "company_name", "industry")
)

Join tables using traditional predicates:

python
customer_orders = (
    high_value_customers
    .join(
        orders,
        on="customer_id"
    )
    .group_by("customer_id", "company_name")
    .agg(
        fc.count("order_id").alias("order_count"),
        fc.sum("order_total").alias("total_revenue")
    )
)

These operations work exactly as expected in any DataFrame library. The difference appears when combining structured and unstructured data.

Processing Unstructured Text with Native Types

Fenic introduces specialized data types for unstructured content. These aren't just metadata tags; they unlock type-specific operations.

Working with MarkdownType

Process markdown documents with structure-aware operations:

python
docs = session.read.csv("documents.csv")

processed_docs = (
    docs
    .with_column("raw_content", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks(fc.col("raw_content"), header_level=2)
    )
    .with_column(
        "title",
        fc.json.jq(fc.markdown.to_json(fc.col("raw_content")), ".title")
    )
    .explode("sections")
)

The markdown.extract_header_chunks function preserves document structure rather than using naive character-count splitting. This dramatically improves downstream processing quality by maintaining context boundaries.
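
As a follow-up sketch, each exploded section can then feed schema-driven extraction (the SectionSummary schema here is illustrative), so every LLM call works on a coherent, heading-bounded chunk:

python
from pydantic import BaseModel, Field
from typing import List

class SectionSummary(BaseModel):
    heading: str = Field(description="Section heading")
    key_points: List[str] = Field(description="Main points made in this section")

section_insights = processed_docs.with_column(
    "insights",
    fc.semantic.extract("sections", SectionSummary, model_alias="fast")
)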

Handling TranscriptType

Process transcripts with speaker awareness:

python
from pydantic import BaseModel, Field
from typing import List

class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking")
    start_time: float = Field(description="Start time in seconds")
    key_points: List[str] = Field(description="Main points discussed")

meetings = (
    session.read.csv("transcripts.csv")
    .with_column("transcript", fc.col("file_content").cast(fc.TranscriptType))
    .with_column(
        "chunks",
        fc.text.recursive_token_chunk(fc.col("transcript"), chunk_size=1200)
    )
    .explode("chunks")
    .with_column(
        "segment",
        fc.semantic.extract("chunks", SegmentSchema, model_alias="fast")
    )
)

Fenic preserves speaker identity and timestamps through transformations, enabling speaker-aware analysis without manual parsing.
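
For instance, a sketch that builds on the meetings DataFrame above: unnest the extracted segments and aggregate key points per speaker with ordinary DataFrame operations:

python
speaker_summary = (
    meetings
    .unnest("segment")            # exposes speaker, start_time, key_points as columns
    .group_by("speaker")
    .agg(
        fc.count("*").alias("segment_count"),
        fc.collect_list("key_points").alias("all_key_points")
    )
)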

Manipulating JSON with JQ

Handle nested JSON with elegant expressions:

python
api_responses = (
    session.read.json("api_logs.json")
    .with_column("user_id", fc.json.jq(fc.col("payload"), ".user.id"))
    .with_column("error_codes", fc.json.jq(fc.col("payload"), ".errors[].code"))
    .filter(fc.col("error_codes").isNotNull())
)

This eliminates verbose dictionary navigation and handles missing keys gracefully.

Combining Structured and Unstructured Data

The real power emerges when mixing traditional and semantic operations in unified pipelines.

Schema-Driven Extraction

Transform unstructured text into structured columns using Pydantic schemas:

python
from pydantic import BaseModel
from typing import Literal, List

class SupportTicket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    severity: Literal["low", "medium", "high", "critical"]
    category: Literal["bug", "feature", "question"]
    summary: str

tickets = (
    session.read.csv("raw_tickets.csv")
    .with_column(
        "parsed",
        fc.semantic.extract("raw_content", SupportTicket, model_alias="fast")
    )
    .unnest("parsed")
    .filter(fc.col("severity").isin(["high", "critical"]))
    .filter(fc.col("region") == "apac")
)

The schema guides LLM extraction with clear constraints. After extraction, data becomes structured and you can apply standard filtering and aggregation.
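
For example, a short sketch over the tickets DataFrame above: once the extracted fields are ordinary columns, standard group-by aggregation applies directly:

python
ticket_counts = (
    tickets
    .group_by("category", "severity")
    .agg(fc.count("*").alias("ticket_count"))
    .order_by(fc.col("ticket_count").desc())
)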

Semantic Predicates for Intelligent Filtering

Combine boolean logic with natural language conditions:

python
qualified_applicants = (
    candidates
    .filter(fc.col("years_experience") > 5)
    .filter(fc.col("education_level") == "masters")
    .filter(
        fc.semantic.predicate(
            "Does this resume show experience with distributed systems? Resume: {{resume}}",
            resume=fc.col("resume_text")
        )
    )
)

The query engine optimizes by filtering on cheap boolean conditions before invoking expensive LLM predicates.

Semantic Joins for Meaning-Based Matching

Join tables based on semantic similarity rather than exact matches:

python
prompt = """
Does this candidate match this job requirement?

Candidate: {{candidate_background}}
Job: {{job_requirements}}

Consider:
- Technical skills alignment
- Experience level
- Domain expertise
"""

matches = (
    candidates
    .semantic.join(
        other=job_postings,
        predicate=prompt,
        left_on=fc.col("resume_text"),
        right_on=fc.col("job_description")
    )
    .filter(fc.col("match_score") > 0.8)
    .order_by("application_date")
)

This pattern works for matching documents to queries, linking related records across databases, or deduplication based on semantic similarity.
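
As an illustrative sketch of the deduplication case (table and column names are hypothetical), the same operator can join a table against itself to surface records that describe the same entity:

python
# Self-join to flag likely duplicates; exclude trivial self-matches downstream
duplicates = companies.semantic.join(
    other=companies,
    predicate="Do these two descriptions refer to the same company? A: {{left}}, B: {{right}}",
    left_on=fc.col("description"),
    right_on=fc.col("description")
)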

Building a Complete Mixed-Data Pipeline

Here's a practical example processing customer feedback that combines structured metadata with unstructured text analysis:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class FeedbackAnalysis(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    topics: List[str] = Field(description="Main topics mentioned")
    feature_requests: List[str] = Field(description="Specific features requested")
    pain_points: List[str] = Field(description="Problems reported")
    urgency: Literal["low", "medium", "high"]

# Load structured customer data
customers = session.table("customers")

# Load unstructured feedback
feedback = session.read.csv("customer_feedback.csv")

# Unified pipeline mixing both
analysis = (
    feedback
    # Extract structure from text
    .with_column(
        "analysis",
        fc.semantic.extract("feedback_text", FeedbackAnalysis, model_alias="fast")
    )
    .unnest("analysis")

    # Join with structured customer data
    .join(customers, on="customer_id")

    # Filter using both structured and semantic conditions
    .filter(fc.col("customer_tier") == "enterprise")
    .filter(fc.col("sentiment") == "negative")
    .filter(fc.col("urgency") == "high")

    # Group by structured dimension
    .group_by("industry", "region")

    # Semantic aggregation
    .agg(
        fc.count("*").alias("feedback_count"),
        fc.semantic.reduce(
            "Summarize the main pain points across this feedback",
            fc.col("pain_points"),
            model_alias="fast"
        ).alias("consolidated_issues")
    )
)

analysis.write.parquet("output/feedback_analysis.parquet")

This single pipeline:

  • Loads structured customer data from tables
  • Processes unstructured feedback text
  • Extracts structured information using LLMs
  • Joins structured and extracted data
  • Filters using traditional and semantic predicates
  • Aggregates with semantic summarization
  • Writes results back to structured storage

The query engine automatically batches LLM calls, manages rate limits, caches repeated operations, and optimizes execution order.

Advanced Pattern: Document Processing with Multiple Types

Real applications often process multiple unstructured formats simultaneously:

python
from pydantic import BaseModel
from typing import List

class DocumentInsight(BaseModel):
    key_points: List[str]
    entities: List[str]
    topics: List[str]

# Process different document types in unified pipeline
documents = (
    session.read.csv("documents.csv")

    # Handle markdown documents
    .with_column(
        "content_md",
        fc.when(fc.col("type") == "markdown")
        .then(fc.col("raw_content").cast(fc.MarkdownType))
        .otherwise(None)
    )

    # Handle HTML documents
    .with_column(
        "content_html",
        fc.when(fc.col("type") == "html")
        .then(fc.col("raw_content").cast(fc.HtmlType))
        .otherwise(None)
    )

    # Extract text from appropriate type
    .with_column(
        "text",
        fc.coalesce(
            fc.markdown.extract_text(fc.col("content_md")),
            fc.html.extract_text(fc.col("content_html")),
            fc.col("raw_content")
        )
    )

    # Chunk for processing
    .with_column(
        "chunks",
        fc.text.recursive_token_chunk(fc.col("text"), chunk_size=1000)
    )
    .explode("chunks")

    # Extract insights from each chunk
    .with_column(
        "insights",
        fc.semantic.extract("chunks", DocumentInsight, model_alias="fast")
    )

    # Aggregate back by document
    .group_by("document_id")
    .agg(
        fc.collect_list("insights").alias("all_insights")
    )
)

Optimizing Mixed Workflows for Production

Production systems require reliability and cost management. Fenic provides infrastructure designed for production workloads.

Multi-Tier Model Configuration

Use appropriate models for different task complexities:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                "gpt-4o-nano",
                rpm=500,
                tpm=200_000
            ),
            "mini": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=300,
                tpm=150_000
            ),
            "full": fc.AnthropicLanguageModel(
                "claude-opus-4",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            ),
        },
        default_language_model="mini",
    ),
)

# Use appropriate model per operation
pipeline = (
    df
    # Simple classification with cheap model
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            classes=["sales", "support", "billing"],
            model_alias="nano"
        )
    )

    # Structured extraction with balanced model
    .with_column(
        "details",
        fc.semantic.extract(fc.col("text"), DetailSchema, model_alias="mini")
    )

    # Complex reasoning with powerful model
    .with_column(
        "recommendation",
        fc.semantic.map(
            "Analyze this case and provide detailed recommendations: {{text}}",
            text=fc.col("text"),
            model_alias="full"
        )
    )
)

Strategic model selection can reduce costs by as much as 80% while maintaining quality on the tasks that need it.

Explicit Caching for Expensive Operations

Cache results at critical points:

python
# Cache after expensive extraction
enriched = (
    raw_data
    .with_column("extracted", fc.semantic.extract(...))
    .cache()
)

# Multiple downstream operations use cached results
analysis_1 = enriched.filter(condition1).collect()
analysis_2 = enriched.filter(condition2).collect()

The engine also caches identical inference calls automatically within sessions.

Incremental Development and Testing

Develop with small samples before scaling:

python
# Development phase
sample = df.limit(100)
result = sample.with_column("extracted", fc.semantic.extract(...)).collect()

# Check costs and validate
print(f"Cost for 100 rows: ${result.metrics.total_lm_metrics.cost}")
print(f"Token usage: {result.metrics.total_lm_metrics.num_total_tokens}")

# Scale after validation
df.with_column("extracted", fc.semantic.extract(...)).write.parquet("output/")

Hybrid Preprocessing Strategy

Use fuzzy matching for candidate selection before expensive semantic operations:

python
# Fast fuzzy filtering
candidates = (
    left_table
    .join(right_table)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("name_left"),
            fc.col("name_right"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Expensive semantic matching on filtered candidates
final = (
    candidates
    .semantic.join(
        predicate="Are these the same entity? Left: {{left}}, Right: {{right}}",
        left_on=fc.col("description_left"),
        right_on=fc.col("description_right")
    )
)

This reduces costs by orders of magnitude compared to semantic joins on full cross-products.

Monitoring and Observability

Track pipeline performance and costs:

python
result = pipeline.collect()
metrics = result.metrics

# Overall execution
print(f"Execution time: {metrics.execution_time_ms}ms")
print(f"Total cost: ${metrics.total_lm_metrics.cost}")
print(f"Total tokens: {metrics.total_lm_metrics.num_total_tokens}")

# Per-operator breakdown
for op_metric in metrics.operator_metrics:
    if op_metric.cost > 5.0:
        print(f"Expensive operation: {op_metric.name}")
        print(f"  Cost: ${op_metric.cost}")
        print(f"  Time: {op_metric.execution_time_ms}ms")

Use these insights to shift operations to cheaper models, add caching, or restructure pipelines.

Integrating with Existing Infrastructure

Fenic works with existing data infrastructure without requiring data movement.

Lakehouse-Native Architecture

Read and write standard formats:

python
# Read from Parquet on S3
df = session.read.parquet("s3://lake/raw/*.parquet")

# Process with mixed operations
processed = (
    df
    .with_column("extracted", fc.semantic.extract(...))
    .filter(fc.col("score") > threshold)
)

# Write back to Parquet
processed.write.parquet("s3://lake/processed/output.parquet")

Full compatibility with Parquet, Iceberg, Delta Lake, and Lance enables seamless integration.

Agent Preprocessing Pipelines

Prepare structured context for agents offline:

python
# Batch preprocessing
agent_context = (
    documents
    .with_column("summary", fc.semantic.map("Summarize: {{text}}", ...))
    .with_column("embedding", fc.semantic.embed("processed_content"))
    .with_column("metadata", fc.semantic.extract(...))
)

agent_context.write.parquet("agent_knowledge_base/")

# Agents query preprocessed data at runtime
# No expensive inference in user-facing paths

This separation enables predictable, responsive agents while leveraging Fenic's strength in batch processing.
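
A minimal sketch of the agent-side read (paths and columns are hypothetical): at request time the agent only performs cheap structured reads against the preprocessed data, with no LLM calls in the loop:

python
# Agent runtime: structured lookups only, no inference
kb = session.read.parquet("agent_knowledge_base/*.parquet")
relevant = (
    kb
    .filter(fc.col("topic") == "billing")
    .select("summary", "metadata")
    .limit(20)
    .collect()
)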

Building MCP Tools from Unified Workflows

Expose unified workflows as Model Context Protocol tools for agent integration. The Hacker News research agent example demonstrates this pattern:

python
import fenic.api.functions as fc
from fenic.core.types.datatypes import StringType
from fenic.core.mcp.types import ToolParam

def register_search_tool(session):
    """Register unified search tool combining structured and semantic operations."""

    # Get tables
    items = session.table("items")
    metadata = session.table("metadata")

    # Tool parameter
    query = fc.tool_param("query", StringType)

    # Unified search combining structured filtering and semantic matching
    results = (
        items
        # Structured filter on metadata
        .join(metadata, on="item_id")
        .filter(fc.col("published_date") > "2024-01-01")
        .filter(fc.col("category").isin(["technical", "analysis"]))

        # Semantic matching on content
        .filter(
            fc.semantic.predicate(
                "Does this content relate to: {{search_query}}",
                search_query=query
            )
        )

        # Structured sorting and limiting
        .order_by(fc.col("relevance_score").desc())
        .limit(50)
    )

    # Register as MCP tool
    session.catalog.create_tool(
        tool_name="search_items",
        tool_description="Search items using semantic matching with structured filters",
        tool_query=results,
        tool_params=[
            ToolParam(
                name="query",
                description="Natural language search query"
            )
        ],
        result_limit=50,
    )

Create and run the MCP server:

python
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_sync

# Register all tools
register_search_tool(session)

# Get tools from catalog
tools = session.catalog.list_tools()

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="unified_search",
    user_defined_tools=tools
)

# Run server
run_mcp_server_sync(
    server=server,
    transport="http",
    port=8080
)

Agents can now call these tools to execute unified workflows that seamlessly combine structured and unstructured data processing.

Best Practices for Unified Workflows

Design clear Pydantic schemas - Field descriptions guide LLM extraction. Include constraints and examples:

python
from pydantic import BaseModel, Field
from typing import Literal

class TransactionData(BaseModel):
    merchant: str = Field(description="Business name where purchase occurred")
    category: Literal["grocery", "dining", "transport"] = Field(
        description="Category based on merchant type"
    )
    amount: float = Field(description="Amount in USD", ge=0)

Separate batch and real-time operations - Use Fenic for heavy preprocessing in batch pipelines. Let agents query prepared data without expensive inference at request time.

Leverage row-level lineage - When issues arise, trace individual records through the pipeline to identify where transformations produced unexpected results.

Monitor token usage patterns - Review which operations consume tokens and adjust model selection or prompts accordingly.

Test with representative samples - Validate logic and estimate costs on small datasets before processing millions of rows.

When to Use Unified Workflows

Unified workflows excel when:

  • Processing documents with structured metadata (support tickets, contracts, research papers)
  • Building semantic search over structured databases
  • Enriching tables with insights from unstructured fields
  • Creating agent preprocessing pipelines
  • Analyzing conversations with metadata (chat logs, meeting transcripts)
  • Matching entities across structured and unstructured sources

The approach provides less value when:

  • Working purely with structured data (use standard DataFrames)
  • Processing unstructured data without any structured dimensions
  • Building real-time interactive systems (use batch preprocessing instead)

Conclusion

Unified workflows eliminate the fragmentation between structured tables and unstructured text. By treating both as first-class DataFrame operations, you build pipelines that are easier to develop, test, and maintain.

The key insight from Fenic's architecture is making inference visible to the query engine. When semantic operators like semantic.extract and semantic.join work alongside traditional operations, the optimizer can batch requests, cache results, and execute efficiently.

This approach scales from prototype to production with zero code changes. Develop locally with small datasets, validate costs and correctness, then deploy to cloud infrastructure with automatic scaling.

Stop building separate systems for structured and unstructured data. Build unified workflows that handle both within a single, optimizable framework.
