How to Manage Multimodal Data Types Like Markdown and Embeddings Efficiently

Typedef Team

Production AI systems process PDFs, transcripts, markdown documents, and embeddings. Traditional data platforms treat these as generic text or binary blobs, forcing developers to build custom preprocessing with external services.

The multimodal AI market reached $2.36 billion in 2024 and is projected to reach $93.99 billion by 2035. Most data infrastructure wasn't built for this scale.

Fenic from Typedef provides native multimodal data types with specialized operations. Instead of managing embeddings through external vector databases or treating markdown as generic strings, these types integrate directly into DataFrame operations.

Native Multimodal Data Types in Fenic

Fenic supports five specialized data types for AI workloads:

  • MarkdownType - Parse and extract structure from markdown documents
  • TranscriptType - Process SRT, WebVTT formats with speaker and timestamp awareness
  • JsonType - Manipulate nested JSON using JQ expressions
  • HtmlType - Handle raw HTML markup
  • EmbeddingType - Fixed-length vector operations with similarity functions

These aren't metadata tags. They unlock type-specific operations within the DataFrame API.
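
As a quick illustration (the column names below are hypothetical), casting a string column attaches the type and makes the type-specific functions available downstream:

python
import fenic as fc

# "report" holds markdown text, "payload" holds raw JSON strings
typed = (
    df
    .with_column("report_md", fc.col("report").cast(fc.MarkdownType))
    .with_column("payload_json", fc.col("payload").cast(fc.JsonType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("report_md", header_level=2)
    )
)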

Traditional vs Native Approach

Traditional preprocessing scatters operations across services:

python
# External services and manual orchestration
def process_document(pdf):
    text = ocr_service.extract(pdf)
    chunks = custom_chunk_function(text, max_tokens=1000)
    embeddings = []
    for chunk in chunks:
        time.sleep(0.5)  # Manual rate limiting
        embedding = embedding_api.embed(chunk)
        embeddings.append(embedding)
    return chunks, embeddings

Fenic's declarative approach:

python
import fenic as fc

df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_blog", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").content)
    )
)

The query engine handles batching, rate limiting, and error handling. It optimizes across the entire pipeline.

Working with MarkdownType

Markdown contains semantic structure through headers, lists, and formatting. Generic text splitting destroys this structure.

Parse PDFs to Markdown

Fenic 0.5.0 added PDF-to-markdown parsing:

python
import fenic as fc

session = fc.Session.get_or_create(
    fc.SessionConfig(
        app_name="pdf_processor",
        semantic=fc.SemanticConfig(
            language_models={
                "gemini": fc.GoogleDeveloperLanguageModel(
                    model_name="gemini-2.0-flash",
                    rpm=100,
                    tpm=1000,
                )
            },
            default_language_model="gemini",
        ),
    )
)

# Read PDF metadata
pdfs = session.read.pdf_metadata("data/docs/**/*.pdf", recursive=True)

# Parse to markdown with page separators
markdown = pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True,
    ).alias("markdown"),
)

Key features:

  • Returns MarkdownType column for downstream operations
  • Page separators preserve context boundaries
  • Enables parallel processing of individual pages
  • Image descriptions for multimodal content
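
Because the separator is a predictable string, the parsed document can be split back into per-page rows for independent processing. A rough sketch, assuming a text.split helper that splits a string column on a regular expression (check Fenic's text function reference for the exact name and signature):

python
# Split parsed markdown into one row per page
pages = (
    markdown
    .with_column("page", fc.text.split(fc.col("markdown"), r"--- PAGE \d+ ---"))
    .explode("page")
)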

Structure-Aware Chunking

Header-based chunking respects document organization:

python
processed = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("sections")
)

Benefits:

  • Chunks align with content sections, not arbitrary character counts
  • Maintains semantic coherence for RAG systems
  • Preserves context relationships

Two-Stage Chunking Strategy

For long documents, combine markdown chunking with token-aware splitting:

python
chunked = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("sections")
    .with_column(
        "subsections",
        fc.text.recursive_token_chunk(
            "sections",
            chunk_size=1200,
            chunk_overlap_percentage=0
        )
    )
    .explode("subsections")
)

This maintains structural boundaries while ensuring chunks fit model context windows.

Embeddings as First-Class Data Types

EmbeddingType treats vectors as native DataFrame columns. No external vector database required for basic operations.
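
For example, rows can be scored against a query vector without leaving the DataFrame. A sketch, assuming fc.embedding.compute_similarity is available for EmbeddingType columns and that query_vector is a precomputed NumPy array (check the embedding function reference for the exact names):

python
# Score each row's embedding against a query vector, then keep close matches
scored = (
    df
    .with_column("embedding", fc.semantic.embed(fc.col("content")))
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(fc.col("embedding"), query_vector, metric="cosine")
    )
    .filter(fc.col("similarity") > 0.8)
)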

Generate Embeddings at Scale

Automatic batching with rate limiting:

python
import fenic as fc

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "text": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        },
        default_language_model="text"
    )
)

session = fc.Session.get_or_create(config)

embeddings_df = (
    df
    .with_column("content_embedding", fc.semantic.embed(fc.col("content")))
    .with_column("title_embedding", fc.semantic.embed(fc.col("title")))
)

The engine:

  • Batches embedding requests across rows
  • Respects provider rate limits via rpm and tpm configuration
  • Handles retries automatically
  • Eliminates manual throttling logic

Semantic Clustering

Group related content by similarity:

python
clustered = (
    df
    .with_column("embedding", fc.semantic.embed(fc.col("text")))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=5,
        label_column="cluster_label"
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize the common themes in this cluster",
            fc.col("text")
        ).alias("cluster_summary")
    )
)

This pattern enables content organization without predefined categories.

Similarity-Based Joins

Join DataFrames using embedding similarity:

python
# Generate embeddings
questions_with_embeddings = questions.with_column(
    "q_embedding",
    fc.semantic.embed(fc.col("question_text"))
)

docs_with_embeddings = docs.with_column(
    "d_embedding",
    fc.semantic.embed(fc.col("doc_content"))
)

# Semantic join
matched = questions_with_embeddings.semantic.join(
    other=docs_with_embeddings,
    predicate="""
    Does this document answer the question?
    Question: {{left_on}}
    Document: {{right_on}}
    """,
    left_on=fc.col("question_text"),
    right_on=fc.col("doc_content")
)

The query engine uses embeddings for initial filtering before applying expensive LLM predicates.

TranscriptType for Audio and Video

TranscriptType preserves temporal structure and speaker information through transformations.

Load and Process Transcripts

python
import fenic as fc
from pathlib import Path
from pydantic import BaseModel, Field

class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking in this segment")
    start_time: float = Field(description="Start time (seconds)")
    end_time: float = Field(description="End time (seconds)")
    key_points: list[str] = Field(description="Main points discussed")

transcript_text = Path("data/transcript.json").read_text()
# Assumes the SessionConfig also includes a SemanticConfig defining a model aliased "mini" (omitted for brevity)
session = fc.Session.get_or_create(fc.SessionConfig(app_name="transcript_processor"))
df = session.create_dataframe({"transcript": [transcript_text]})

processed = (
    df
    .with_column("transcript", fc.col("transcript").cast(fc.TranscriptType))
    .select(
        "*",
        fc.text.recursive_token_chunk(
            "transcript",
            chunk_size=1200,
            chunk_overlap_percentage=0
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk",
            SegmentSchema,
            model_alias="mini"
        ).alias("segment"),
    )
)

Supported formats:

  • SRT (SubRip Subtitle)
  • WebVTT
  • Generic transcript formats

Speaker-Specific Analysis

Aggregate by speaker using preserved metadata:

python
speaker_summaries = (
    processed
    .group_by(fc.col("segment.speaker"))
    .agg(
        fc.semantic.reduce(
            "Summarize this speaker's contributions",
            fc.col("chunk")
        ).alias("speaker_summary")
    )
)

TranscriptType maintains speaker identity and timestamps throughout the pipeline.

Nested JSON with JQ Operations

JsonType supports JQ expressions for nested data manipulation:

python
# Extract nested fields
processed = (df
    .with_column("author_name", fc.json.jq(fc.col("metadata"), ".author.name"))
    .with_column("tags", fc.json.jq(fc.col("metadata"), ".tags[]"))
    .with_column("nested_value", fc.json.jq(fc.col("config"), ".settings.advanced.timeout"))
)

Advantages:

  • Eliminates verbose dictionary navigation
  • Handles missing keys gracefully
  • Supports array operations and complex queries
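
For instance, jq's alternative (//) and optional (?) operators let pipelines degrade gracefully on incomplete records (the field names here are illustrative):

python
# Fall back to defaults instead of erroring on missing keys
robust = (
    df
    .with_column("timeout", fc.json.jq(fc.col("config"), ".settings.advanced.timeout // 30"))
    .with_column("first_tag", fc.json.jq(fc.col("metadata"), ".tags[0]?"))
)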

Combine JQ with Semantic Extraction

python
from pydantic import BaseModel
from typing import List

class ExtractedMetadata(BaseModel):
    categories: List[str]
    priority: str
    assigned_to: str

enriched = (
    df
    .with_column("raw_json", fc.col("data").cast(fc.JsonType))
    .with_column("basic_field", fc.json.jq("raw_json", ".title"))
    .with_column(
        "structured",
        fc.semantic.extract("raw_json", ExtractedMetadata)
    )
)

Complete Document Processing Pipeline

Production pipelines combine multiple data types with semantic operations:

python
from pydantic import BaseModel, Field
from typing import List

class DocumentMetadata(BaseModel):
    title: str
    main_topics: List[str]
    key_entities: List[str]
    summary: str = Field(description="Two-sentence summary")

config = fc.SessionConfig(
    app_name="doc_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "cheap": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            )
        },
        default_language_model="cheap"
    )
)

session = fc.Session.get_or_create(config)

pipeline = (
    session.read.pdf_metadata("data/documents/**/*.pdf", recursive=True)
    .filter(fc.col("page_count") < 100)
    .select(
        "*",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---"
        ).alias("markdown")
    )
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("markdown", header_level=2)
    )
    .explode("sections")
    .with_column(
        "metadata",
        fc.semantic.extract(
            "sections",
            DocumentMetadata,
            model_alias="cheap"
        )
    )
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("sections").content)
    )
    .with_column(
        "quality_score",
        fc.semantic.map(
            "Rate content quality 1-10: {{text}}",
            text=fc.col("sections"),
            model_alias="cheap"
        )
    )
    .filter(fc.col("quality_score").cast(fc.IntegerType) >= 7)
)

metrics = pipeline.write.parquet("s3://bucket/enriched/documents.parquet")

Efficiency patterns:

  • Early filtering reduces data volume before expensive operations
  • Model tiering uses cheap models for classification and embeddings
  • Lazy evaluation enables query optimization
  • Native types eliminate external preprocessing

Batch Preprocessing for Agent Systems

Separate heavy inference from real-time agent logic:

python
# Offline batch preprocessing
enriched_knowledge = (
    raw_documents
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            "chunks",
            DocumentMetadata,
            model_alias="cheap"
        )
    )
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize in 100 words: {{content}}",
            content=fc.col("chunks"),
            model_alias="cheap"
        )
    )
)

metrics = enriched_knowledge.write.parquet("agent_knowledge/")

This architecture provides:

  • Predictable latency - no LLM inference at request time
  • Better resource utilization - batch processing amortizes costs
  • Cleaner separation - planning decoupled from execution
  • Easier debugging - preprocessing validated offline
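
A minimal sketch of the request-time side under this split, assuming the batch job wrote chunk, embedding, and metadata columns to agent_knowledge/ as above and that the agent loads the Parquet output with polars; the ranking helper is illustrative and nothing here calls an LLM:

python
import numpy as np
import polars as pl

# Load the offline-enriched knowledge base written by the batch pipeline
knowledge = pl.read_parquet("agent_knowledge/*.parquet")

def top_k_chunks(query_embedding: np.ndarray, k: int = 5) -> pl.DataFrame:
    """Rank precomputed chunks by cosine similarity; no inference at request time."""
    vectors = np.stack(knowledge["embedding"].to_list())
    scores = vectors @ query_embedding / (
        np.linalg.norm(vectors, axis=1) * np.linalg.norm(query_embedding)
    )
    return (
        knowledge
        .with_columns(pl.Series("score", scores))
        .sort("score", descending=True)
        .head(k)
    )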

Performance Optimization

Intelligent Caching

Cache expensive operations explicitly:

python
cached = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("doc", header_level=2)
    )
    .explode("chunks")
    .cache()
)

# Multiple operations reuse cached data
embeddings = cached.with_column(
    "embedding",
    fc.semantic.embed(fc.col("chunks"))
)

classifications = cached.with_column(
    "category",
    fc.semantic.classify(
        fc.col("chunks"),
        classes=["technical", "business", "legal"]
    )
)

The engine caches identical operations automatically within a session.

Model Selection Strategy

Use appropriate model sizes for each task:

python
semantic=fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel(
            model_name="gpt-4o-nano",
            rpm=500,
            tpm=200_000
        ),
        "mini": fc.OpenAILanguageModel(
            model_name="gpt-4o-mini",
            rpm=300,
            tpm=150_000
        ),
        "full": fc.AnthropicLanguageModel(
            model_name="claude-opus-4-0",
            rpm=100,
            input_tpm=100_000,
            output_tpm=50_000
        )
    }
)

pipeline = (
    df
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            classes=["bug", "feature", "question"],
            model_alias="nano"
        )
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("text"),
            Schema,
            model_alias="mini"
        )
    )
    .with_column(
        "analysis",
        fc.semantic.map(
            "Provide detailed technical analysis: {{text}}",
            text=fc.col("text"),
            model_alias="full"
        )
    )
)

Cost differences between model tiers can reach 10-100x. Routing each task to the smallest model that handles it reliably can reduce expenses by roughly 80%.

Hybrid Fuzzy and Semantic Matching

Use fast fuzzy matching before expensive semantic operations:

python
# Fast fuzzy matching for candidates
candidates = (
    left_df.join(right_df)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Semantic verification on the filtered candidate pairs
final = candidates.filter(
    fc.semantic.predicate(
        "Are these the same company? Left: {{left}}, Right: {{right}}",
        left=fc.col("company_description"),
        right=fc.col("business_description")
    )
)

Because the LLM predicate only runs on pairs that survive the cheap fuzzy filter, this approach reduces costs by orders of magnitude.

Production Implementation

Schema-Driven Extraction

Define Pydantic schemas with clear field descriptions:

python
from pydantic import BaseModel, Field
from typing import Literal, List

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"] = Field(
        description="Customer subscription level"
    )
    region: Literal["us", "eu", "apac"] = Field(
        description="Geographic region"
    )
    severity: Literal["low", "medium", "high", "critical"] = Field(
        description="Issue severity based on business impact"
    )
    issues: List[str] = Field(
        description="List of specific issues mentioned"
    )

tickets = df.with_column(
    "extracted",
    fc.semantic.extract("raw_ticket", Ticket)
)

Schema benefits:

  • Clear descriptions guide LLM extraction
  • Literal types constrain outputs to valid categories
  • Reduces hallucinations
  • Type-safe results
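
The extracted fields then behave like ordinary columns; for example, routing tickets with the nested-access pattern shown earlier:

python
# Route critical enterprise tickets using the extracted, type-constrained fields
critical = tickets.filter(
    (fc.col("extracted.severity") == "critical")
    & (fc.col("extracted.customer_tier") == "enterprise")
)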

Incremental Development

Test with small samples before scaling:

python
# Development with 100 rows
df_sample = df.limit(100)
result = df_sample.with_column("extracted", fc.semantic.extract(...)).collect()

# Check cost and validate
print(f"Cost for 100 rows: ${result.metrics.total_lm_metrics.total_cost}")
print(f"Execution time: {result.metrics.execution_time_ms}ms")

df_sample.with_column("extracted", fc.semantic.extract(...)).show()

# Scale after validation
metrics = df_full.with_column("extracted", fc.semantic.extract(...)).write.parquet("output/")

Monitor Pipeline Metrics

Track operations for optimization:

python
result = pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.total_lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.total_lm_metrics.total_cost}")

# Per-operator metrics can help pinpoint expensive steps
# (see the Fenic metrics reference for the exact attribute names)
print(f"Execution time: {metrics.execution_time_ms}ms")
print(f"Output rows: {metrics.num_output_rows}")

Use insights to optimize model selection and caching.

Local Development to Cloud Scale

Same code runs locally and in cloud:

python
# Local development
session = fc.Session.get_or_create(
    fc.SessionConfig(app_name="local_dev")
)

df = session.read.csv("local_data.csv")
processed = df.with_column("extracted", fc.semantic.extract(...)).filter(...)
metrics = processed.write.parquet("results.parquet")

Cloud deployment adds configuration:

python
# Production
config = fc.SessionConfig(
    app_name="production",
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)

session = fc.Session.get_or_create(config)

df = session.read.csv("s3://bucket/data/*.csv")
processed = df.with_column("extracted", fc.semantic.extract(...)).filter(...)
processed.write.parquet("s3://bucket/results/")

Zero code changes from prototype to production.

Infrastructure Integration

Lakehouse-Native Architecture

Works with existing data infrastructure:

python
# Read from Parquet (or other supported format)
df = session.read.parquet("s3://lake/raw_data/*.parquet")

# Process with semantic operators
processed = (
    df
    .with_column("doc", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "metadata",
        fc.semantic.extract("doc", Schema)
    )
    .with_column("embedding", fc.semantic.embed("doc"))
)

# Write to Parquet
metrics = processed.write.parquet("s3://lake/processed/data.parquet")

Compatible with:

  • Parquet
  • Iceberg
  • Delta Lake
  • Lance

Built on Apache Arrow for interoperability with Spark, Polars, DuckDB, and pandas.
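
Because the output is ordinary Parquet over Arrow, downstream engines consume it without an export step; for example (reusing the path above, with the usual S3 credentials configured):

python
import pandas as pd
import polars as pl

# Other tools read the same files Fenic writes
pl_df = pl.read_parquet("s3://lake/processed/data.parquet")
pd_df = pd.read_parquet("s3://lake/processed/data.parquet")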

Hybrid Pipeline Pattern

Mix Fenic with existing ETL:

python
# Spark preprocessing
spark_df.write.parquet("s3://interim/")

# Fenic semantic enrichment
enriched = (
    session.read.parquet("s3://interim/*.parquet")
    .with_column("doc", fc.col("text").cast(fc.MarkdownType))
    .with_column("extracted", fc.semantic.extract("doc", Schema))
    .with_column("category", fc.semantic.classify("doc", classes=["cat1", "cat2"]))
)

metrics = enriched.write.parquet("s3://final/data.parquet")

# Continue with Spark/dbt

Use the right tool for each step.

Implementation Summary

Efficient multimodal data management requires infrastructure built for AI workloads:

Native type support eliminates preprocessing. MarkdownType, TranscriptType, and EmbeddingType provide specialized operations respecting data structure.

Declarative operations enable optimization. The query engine batches requests, caches results, and reorders operations across pipelines.

Type-safe schemas reduce brittleness. Pydantic models define structure once, eliminating prompt engineering fragility.

Strategic model selection controls costs. Use cheap models for classification, standard for extraction, powerful for reasoning only.

Separate batch and real-time workloads. Preprocess offline with Fenic, serve to agents for predictable latency.

Traditional approaches using external services create maintenance overhead. Infrastructure that treats inference as a first-class operation makes production AI systems tractable.

Start with simple operations on small datasets. Validate costs and results. Scale with confidence.
