
How to Tackle Unstructured Data Bottlenecks in Enterprise AI

Typedef Team


Enterprise AI systems stall when forced to process unstructured data through infrastructure built for structured tables. An estimated 80% of organizational data exists in unstructured formats, yet most data platforms treat LLM calls as external black boxes. This architectural mismatch blocks scale and keeps AI pilots from reaching production.

Identify the Core Bottleneck

Start by measuring where your pipeline fails. Most enterprises face three critical breakpoints:

Processing Volume: Unstructured data grows at 55-65% annually. Traditional query engines cannot batch LLM calls efficiently, so processing time grows linearly with data volume and quickly becomes the bottleneck.

Infrastructure Cost: Poor data quality costs organizations an average of $12.9 million annually. Without visibility into token usage and model selection, costs spiral as teams unknowingly use expensive models for simple tasks.

Integration Failures: 54% of organizations cite data movement without disruption as their top technical challenge. Custom scripts connecting OCR pipelines, transcription services, and LLM providers create brittle integration points.
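To ground these numbers in your own pipeline, a minimal measurement sketch such as the one below can help quantify the first two breakpoints. The process_record function is a hypothetical stand-in for your current per-row LLM call, and the volume figure is illustrative.

python
import time

def process_record(text: str) -> str:
    """Hypothetical stand-in for your current per-row LLM call."""
    ...

sample = ["sample record 1", "sample record 2"]  # substitute a real sample batch

start = time.perf_counter()
for record in sample:
    process_record(record)
elapsed = time.perf_counter() - start

per_record_s = elapsed / len(sample)
monthly_volume = 1_000_000  # illustrative monthly record count
print(f"~{per_record_s:.3f}s per record, "
      f"~{per_record_s * monthly_volume / 3600:.0f} compute-hours per month at current volume")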

Eliminate Fragile Glue Code

Replace Manual Orchestration

Traditional pipelines require custom scripts for every component. Each connection introduces failure modes, serialization latency, and manual rate limit management.

Before (manual approach):

python
import time
from openai import OpenAI
import pandas as pd

client = OpenAI()

def extract_sentiment(text):
    time.sleep(0.1)  # Manual rate limiting
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Analyze sentiment: {text}"}]
        )
        return response.choices[0].message.content
    except Exception:
        # retry_with_backoff: hand-rolled helper (not shown) that retries with exponential backoff
        return retry_with_backoff(extract_sentiment, text)

df = pd.DataFrame({"text": ["I love this product!", "This is terrible."]})
df["sentiment"] = df["text"].apply(extract_sentiment)

The query engine has zero visibility into the UDF. It cannot batch calls, cache results, or optimize operation order.

After (with Fenic):

python
import fenic as fc
from pydantic import BaseModel
from typing import Literal

class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: list[str]
    recommendations: list[str]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)

session = fc.Session.get_or_create(config)

# df (with policy_text and policy_id columns) and claims_df are assumed to exist
results = (
    df
    .select("*", fc.semantic.extract(fc.col("policy_text"), PolicyInsight).alias("policy_insight"))
    .filter(fc.semantic.predicate(
        "{{ policy_insight }} has non-empty coverage gaps",
        policy_insight=fc.col("policy_insight")
    ))
    .semantic.join(
        other=claims_df,
        predicate="The policy {{ left_on }} is related to claim {{ right_on }}",
        left_on=fc.col("policy_id"),
        right_on=fc.col("claim_policy_ref")
    )
)

results.show()

The query engine sees semantic operations as first-class primitives. It batches API calls automatically, caches repeated operations, reorders for efficiency, and self-throttles based on provider limits.

Configure Multi-Provider Management

Stop writing provider-specific retry logic. Configure models declaratively:

python
import fenic as fc
from pydantic import BaseModel

class Summary(BaseModel):
    # Minimal extraction target for this example; adjust fields to your data
    summary: str
    key_points: list[str]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
            "accurate": fc.AnthropicLanguageModel(model_name="claude-3-5-haiku-latest", rpm=50, input_tpm=100000, output_tpm=50000),
            "cheap": fc.GoogleVertexLanguageModel(model_name="gemini-2.0-flash", rpm=200, tpm=200000)
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)
df = session.read.csv("feedback.csv")

results = df.select(
    "*",
    fc.semantic.extract(fc.col("text"), Summary, model_alias="accurate").alias("summary_data")
)

results.show()

Rate limits apply automatically. Retry logic is built-in. Model switching requires changing one parameter.

Implement Schema-Driven Extraction

Replace prompt engineering with type-safe schemas. Define output structure once, get validated results consistently.

python
import fenic as fc
from pydantic import BaseModel, Field
from typing import Literal

class CustomerData(BaseModel):
    name: str
    age: int = Field(ge=0, le=150)
    status: Literal["active", "inactive", "pending"]

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)

session = fc.Session.get_or_create(config)

df = session.create_dataframe({
    "text": [
        "Alice is 30 years old and active.",
        "Bob is 200 years old and inactive.",
        "Charlie is 40 and pending approval."
    ]
})

df_processed = df.select(
    "*",
    fc.semantic.extract(fc.col("text"), CustomerData).alias("customer_data")
)

df_processed.show()

Pydantic validation catches extraction errors. Field constraints ensure data quality. The schema documents expected structure without separate documentation.
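The constraints can also be exercised directly with Pydantic, outside the pipeline, to see what the schema rejects. For example, the second record above reports an age of 200, which violates the le=150 bound:

python
from pydantic import ValidationError

try:
    CustomerData(name="Bob", age=200, status="inactive")
except ValidationError as e:
    # age=200 violates the le=150 constraint, so validation fails
    print(e)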

Use Specialized Data Types

Stop writing preprocessing code for each format. Native support for unstructured types eliminates manual parsing:

Process Markdown Documents

python
import fenic as fc

# df is assumed to have a "blog" column containing raw markdown text
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_blog"), header_level=2)
    )
    .with_column("title", fc.json.jq(fc.col("raw_blog"), ".title"))
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").get_item("content"))
    )
)

MarkdownType preserves document structure. Header-based chunking respects semantic boundaries instead of naive character splitting.

Handle Transcripts with Speaker Awareness

python
import fenic as fc
from pydantic import BaseModel
from typing import Optional

class MeetingActionItems(BaseModel):
    description: str
    owner: str
    due_date: Optional[str] = None

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)

session = fc.Session.get_or_create(config)

df = session.create_dataframe({
    "file": ["meeting1.srt", "meeting2.vtt"]
})

meetings = (
    df
    .with_column("transcript", fc.col("file").cast(fc.TranscriptType))
    .select("*", fc.semantic.extract(fc.col("transcript"), MeetingActionItems).alias("action_items"))
    .filter(fc.col("action_items").get_item("owner") == "Engineering")
)

result = meetings.collect()

TranscriptType handles SRT and WebVTT formats. Speaker identity persists through transformations without manual parsing.

Manipulate Nested JSON

python
.with_column("author", fc.json.jq(fc.col("metadata"), ".author.name"))
.with_column("tags", fc.json.jq(fc.col("metadata"), ".tags[]"))

JQ expressions handle nested structures without verbose dictionary navigation code.

Build Production Pipelines

Enable Automatic Optimization

The query engine optimizes entire pipelines before execution. Lazy evaluation allows operation reordering, filter pushdown, and intelligent batching.

python
import fenic as fc
from pydantic import BaseModel

class TicketSchema(BaseModel):
    customer_id: str
    issue: str
    sentiment: str

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000)
        }
    )
)

session = fc.Session.get_or_create(config)

df = session.create_dataframe({
    "priority": ["high", "low", "high"],
    "content": [
        "The app keeps crashing, I'm really annoyed!",
        "General feedback, nothing urgent.",
        "Payment failed again, this is frustrating!"
    ]
})

knowledge_base = session.create_dataframe({
    "solution_id": [1, 2],
    "solution_text": ["Restart the app", "Check payment settings"]
})

pipeline = (
    df
    .filter(fc.col("priority") == "high")
    .select("*", fc.semantic.extract(fc.col("content"), TicketSchema).alias("ticket_info"))
    .filter(fc.semantic.predicate(
        "The sentiment {{ sentiment }} is frustrated",
        sentiment=fc.col("ticket_info").get_item("sentiment")
    ))
    .semantic.join(
        other=knowledge_base,
        predicate="The issue {{ left_on }} can be resolved by {{ right_on }}",
        left_on=fc.col("ticket_info").get_item("issue"),
        right_on=fc.col("solution_text")
    )
)

result = pipeline.collect()

The pipeline filters before extraction, reducing expensive LLM calls. Semantic joins only process filtered rows.

Track Lineage and Costs

Every operation is traceable. Row-level lineage records each row's processing history through non-deterministic transformations.

python
result = df.select(
    fc.semantic.map(
        "Analyze sentiment: {{ text }}",
        text=fc.col("text")
    )
).collect()

print(result.metrics.total_lm_metrics.num_output_tokens)
print(result.metrics.total_lm_metrics.cost)
print(result.metrics.execution_time_ms)

Metrics show token counts, costs, and execution time per operator. Use this data to identify expensive operations.

Apply RudderStack's Pattern

RudderStack cut triage time by 95% using this approach:

Step 1: Ingest and Normalize

Load support tickets, sales transcripts, product docs, and Notion PRDs into the warehouse.
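As a hedged sketch of this ingestion step using the session API shown earlier (paths and formats are placeholders for your actual exports):

python
session = fc.Session.get_or_create(config)

# Placeholder locations; substitute your warehouse exports
tickets_df = session.read.csv("warehouse/support_tickets/*.csv")
transcripts_df = session.read.csv("warehouse/sales_transcripts/*.csv")
docs_df = session.read.parquet("warehouse/product_docs/*.parquet")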

Step 2: Build Semantic Context

python
# docs_df, tickets_df, TaxonomySchema, and predefined_categories are defined elsewhere
# Infer product taxonomy from documentation
taxonomy = (
    docs_df
    .select("*", fc.semantic.extract(fc.col("content"), TaxonomySchema).alias("taxonomy_data"))
    .select("*", fc.semantic.classify(fc.col("category"), predefined_categories).alias("category"))
)

# Map tickets to taxonomy with citations
mapped_tickets = (
    tickets_df
    .semantic.join(
        other=taxonomy,
        predicate="The ticket {{ left_on }} relates to product area {{ right_on }}",
        left_on=fc.col("description"),
        right_on=fc.col("category_description")
    )
)

Step 3: Create Agent Tools

Expose retrieval operations via MCP tools for real-time agent access.
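Fenic's own MCP integration is not shown here; as one illustration, the MCP Python SDK's FastMCP server could expose the Step 2 mappings as a retrieval tool, assuming they were written out as Parquet (the path, tool name, and category column below are placeholders):

python
import duckdb
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("triage-context")

@mcp.tool()
def ticket_mappings_for_area(product_area: str) -> list[dict]:
    """Return precomputed ticket-to-taxonomy mappings for one product area."""
    cursor = duckdb.execute(
        "SELECT * FROM 'mapped_tickets.parquet' WHERE category = ?",
        [product_area],
    )
    columns = [desc[0] for desc in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

if __name__ == "__main__":
    mcp.run()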

Step 4: Measure Results

Store mappings and rationales in the warehouse. Track acceptance rates and time savings.
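One hedged way to compute the measurement: read the stored mappings with pandas and derive an acceptance rate, assuming reviewers record a boolean accepted flag and a reviewed_at timestamp (both column names are placeholders):

python
import pandas as pd

mappings = pd.read_parquet("warehouse/ticket_mappings.parquet")  # placeholder path

acceptance_rate = mappings["accepted"].mean()
weekly = mappings.groupby(pd.Grouper(key="reviewed_at", freq="W"))["accepted"].mean()

print(f"First-pass acceptance: {acceptance_rate:.0%}")
print(weekly.tail())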

Results:

  • 95% reduction in PM time per triage
  • 90% first-pass category acceptance
  • Citations surface prospect signals directly in Linear
  • Bottleneck shifted from inference to input breadth

"I wake up every morning and get a list of five things that are worth my time to look at; that's a game-changer." — David Daly, Product, RudderStack

Optimize Costs

Select Models Strategically

Use cheaper models for simple tasks. Reserve expensive models for reasoning.

python
semantic=fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel(model_name="gpt-4o-nano", rpm=100, tpm=100000),
        "mini": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
        "full": fc.AnthropicLanguageModel(model_name="claude-opus-4", rpm=50, input_tpm=100000, output_tpm=50000)
    }
)

# Simple classification uses nano
.select("*", fc.semantic.classify(fc.col("col"), classes, model_alias="nano").alias("category"))

# Structured extraction uses mini
.select("*", fc.semantic.extract(fc.col("col"), schema, model_alias="mini").alias("extracted"))

# Multi-step reasoning uses full
.select("*", fc.semantic.map(instruction, model_alias="full", **columns).alias("result"))

The cost difference between models is 10-100x. Strategic selection can cut costs by roughly 80% while maintaining quality.
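As a back-of-envelope illustration of that arithmetic (the per-million-token prices below are placeholders, not current list prices), routing the bulk of simple calls to the small model dominates the total:

python
# Illustrative prices per 1M input tokens; substitute your providers' current rates
PRICE_PER_M = {"nano": 0.10, "mini": 0.60, "full": 15.00}

records = 1_000_000
tokens_per_record = 500

def monthly_cost(model: str) -> float:
    return records * tokens_per_record / 1_000_000 * PRICE_PER_M[model]

all_frontier = monthly_cost("full")
# 80% simple classification on nano, 15% extraction on mini, 5% reasoning on full
tiered = 0.80 * monthly_cost("nano") + 0.15 * monthly_cost("mini") + 0.05 * monthly_cost("full")

print(f"All-frontier: ${all_frontier:,.0f}/mo  vs  tiered: ${tiered:,.0f}/mo")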

Cache Expensive Operations

python
# Schema is a Pydantic extraction model, as in earlier examples
df_cached = (
    df
    .filter(...)
    .select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
    .cache()
)

result1 = df_cached.filter(condition1).collect()
result2 = df_cached.filter(condition2).collect()

Subsequent operations reuse cached results. The engine also caches identical inference calls automatically.

Combine Fuzzy and Semantic Matching

Use fuzzy string matching for initial filtering. Apply semantic operations only to candidates:

python
candidates = (
    left_df.join(right_df, how="cross")
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Apply the semantic check only to the fuzzy-matched candidate pairs
final = candidates.filter(
    fc.semantic.predicate(
        "Are these the same company? Left: {{ left_desc }}, Right: {{ right_desc }}",
        left_desc=fc.col("company_description"),
        right_desc=fc.col("business_description")
    )
)

This reduces semantic inference costs by orders of magnitude compared with running a semantic join over the full cross-product.

Separate Batch and Real-Time Processing

Preprocess data offline. Agents query enriched results without inference at request time.

python
# Batch preprocessing (runs offline)
# "documents" is an existing DataFrame with a "content" column;
# StructuredMetadata is a Pydantic schema defined elsewhere
agent_context = (
    documents
    .with_column("extracted", fc.semantic.extract(fc.col("content"), StructuredMetadata))
    .with_column("embedding", fc.semantic.embed(fc.col("content")))
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize in 100 words: {{content}}",
            content=fc.col("content"),
            model_alias="mini"
        )
    )
)

agent_context.write.parquet("agent_knowledge_base/")

Benefits:

  • Agents respond predictably without LLM latency
  • Batch processing amortizes fixed costs
  • Planning decouples from execution
  • Preprocessing validates offline before production

Scale to Production

Deploy Without Code Changes

Develop locally, deploy to cloud with identical code:

python
# Local development
session = fc.Session.get_or_create()
df = session.read.csv("local_data.csv")
# Schema is a Pydantic extraction model, as in earlier examples
processed = df.select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
processed.write.parquet("results.parquet")
python
# Production (same code, different config)
config = fc.SessionConfig(
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)

session = fc.Session.get_or_create(config)

df = session.read.csv("s3://bucket/data/*.csv")
processed = df.select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
processed.write.parquet("s3://bucket/results/output.parquet")

Automatic scaling handles increased load. No infrastructure management required.

Integrate with Existing Lakehouses

Read from and write to standard formats without data movement:

python
session = fc.Session.get_or_create()
df = session.read.parquet("s3://data-lake/raw/*.parquet")
processed = df.select("*", fc.semantic.extract(fc.col("col"), Schema).alias("extracted")).filter(fc.col("extracted").isnotnull())
processed.write.parquet("s3://data-lake/processed/")

Works with Parquet, Iceberg, Delta Lake, and Lance. Built on Apache Arrow for compatibility with Spark, Polars, DuckDB, and pandas.
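Because the outputs are plain Parquet on Arrow-compatible storage, downstream engines can read them directly. A quick interoperability check with Polars and DuckDB (local paths used here for illustration) might look like:

python
import duckdb
import polars as pl

# Read the enriched output with other Arrow-native engines
enriched = pl.read_parquet("processed/part-0.parquet")
summary = duckdb.sql(
    "SELECT count(*) AS rows FROM 'processed/*.parquet'"
).df()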

Implementation Checklist

Phase 1: Assess Current State

  1. Measure processing time per record
  2. Count distinct LLM API calls in your pipeline
  3. Calculate total monthly inference costs
  4. Identify manual rate limiting code
  5. List preprocessing steps for each data format

Phase 2: Eliminate Glue Code

  1. Replace UDF-based LLM calls with semantic operators
  2. Configure multi-provider model management
  3. Define Pydantic schemas for extraction targets
  4. Remove manual batching and retry logic
  5. Test with small dataset to validate results

Phase 3: Optimize Performance

  1. Add explicit caching to expensive operations
  2. Use fuzzy matching before semantic joins
  3. Select appropriate model sizes per task
  4. Enable query optimization with lazy evaluation
  5. Track metrics to identify bottlenecks

Phase 4: Scale Production

  1. Separate batch preprocessing from real-time agents
  2. Write enriched data to lakehouse
  3. Deploy to cloud with zero code changes
  4. Monitor token usage and costs
  5. Iterate based on production metrics

Monitor and Iterate

Track these metrics weekly:

Cost Metrics:

  • Total token usage by model
  • Cost per processed record
  • Cache hit rate for identical operations

Performance Metrics:

  • Records processed per hour
  • Average latency per operation
  • Ratio of fuzzy to semantic matches

Quality Metrics:

  • Schema validation pass rate
  • Manual correction frequency
  • Downstream system error rate

Use metric trends to refine model selection, adjust caching strategy, and optimize operation order.
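One lightweight way to capture these trends is to append the metrics fields shown earlier to a run log after each batch; a minimal sketch (the log path and run label are placeholders):

python
import csv
from datetime import datetime, timezone

def log_run_metrics(result, run_label: str, path: str = "run_metrics.csv") -> None:
    """Append token, cost, and latency figures from a collected result to a CSV log."""
    m = result.metrics
    with open(path, "a", newline="") as f:
        csv.writer(f).writerow([
            datetime.now(timezone.utc).isoformat(),
            run_label,
            m.total_lm_metrics.num_output_tokens,
            m.total_lm_metrics.cost,
            m.execution_time_ms,
        ])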

Next Steps

Install Fenic:

bash
pip install fenic

Start with a single document processing pipeline. Validate extraction quality with 100 records. Measure cost and performance before scaling to full production.
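A first validation run might read only the sample and inspect the metrics before scaling; a minimal sketch, assuming Schema is the extraction model from earlier examples and the CSV is a hand-picked export of roughly 100 records:

python
session = fc.Session.get_or_create(config)

# Placeholder path to a ~100-record sample exported for validation
sample = session.read.csv("validation_sample_100.csv")

validated = sample.select(
    "*",
    fc.semantic.extract(fc.col("text"), Schema).alias("extracted")
)

result = validated.collect()
print(result.metrics.total_lm_metrics.cost)
print(result.metrics.execution_time_ms)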
