
How to Supercharge Unstructured.io Ingestion with Typed Schemas and Vector Joins on Typedef

Typedef Team


Document processing pipelines require extracting structured data from PDFs, Word files, HTML, and other formats. Most teams waste months building brittle glue code to parse documents, validate outputs, and link related information across files.

Typedef provides an inference-first data engine where semantic operations work as native DataFrame primitives. This guide shows how to build production document pipelines using typed schemas, vector embeddings, and semantic joins.

Why Traditional Document Pipelines Fail

Current document processing approaches create technical debt:

  • Custom parsing scripts for each file type
  • Manual validation of extracted data
  • Scattered LLM API calls across microservices
  • No semantic relationship detection
  • Rate limiting failures at scale

Typedef's architecture treats inference as a first-class operation within the query engine, enabling automatic optimization, batching, and cost management.

Installing Fenic for Document Processing

Fenic requires Python 3.10, 3.11, or 3.12:

bash
pip install fenic

Configure API credentials:

bash
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"
export GOOGLE_API_KEY="your-key"

Initialize a session:

python
import fenic as fc
from pydantic import BaseModel, Field
from typing import List, Literal

config = fc.SessionConfig(
    app_name="document_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                "claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            ),
        },
        default_language_model="fast",
    ),
)

session = fc.Session.get_or_create(config)

Native Document Types in Fenic

Fenic provides specialized data types optimized for document processing:

  • MarkdownType - Parse and extract structure from markdown files
  • TranscriptType - Process SRT, WebVTT formats with speaker awareness
  • JsonType - Manipulate nested JSON with JQ expressions
  • HtmlType - Handle raw HTML markup
  • DocumentPathType - Load PDFs and text files

These types enable native operations without preprocessing steps.

Parsing PDFs with Typed Output

Fenic 0.5.0 introduced native PDF parsing with page chunking:

python
# Load PDF metadata
pdfs = session.read.pdf_metadata("contracts/**/*.pdf", recursive=True)

# Parse to markdown with page separators
parsed = pdfs.select(
    fc.col("file_path"),
    fc.col("page_count"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True,
    ).alias("markdown_content")
)

parsed.show()

The read.pdf_metadata function extracts size, page count, author, creation dates, and other metadata before parsing. This enables filtering documents before expensive parsing operations.
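
For example, you can skip long documents before paying for parsing. Here is a minimal pre-filter reusing the pdfs DataFrame above (the 50-page cutoff is purely illustrative):

python
# Keep only short documents to limit parsing cost (50-page cutoff is illustrative)
short_docs = pdfs.filter(fc.col("page_count") <= 50)

parsed_short = short_docs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---"
    ).alias("markdown_content")
)

parsed_short.show()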

Schema-Driven Extraction from Documents

Define Pydantic schemas for type-safe extraction:

python
class ContractClause(BaseModel):
    clause_type: Literal["payment", "termination", "liability", "confidentiality"]
    parties: List[str] = Field(description="Legal entities bound by this clause")
    obligations: List[str] = Field(description="Specific requirements or duties")
    effective_date: str = Field(description="Date format YYYY-MM-DD")
    amount: float | None = Field(description="Dollar amounts if mentioned")

class InvoiceData(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float = Field(ge=0)
    line_items: List[str]
    due_date: str

Apply extraction to parsed documents:

python
# Extract structured data from markdown
extracted = (
    parsed
    .select(
        "*",
        fc.semantic.extract(
            fc.col("markdown_content"),
            ContractClause,
            model_alias="accurate"
        ).alias("clause_data")
    )
    .unnest("clause_data")
)

# Filter high-value clauses
payment_terms = extracted.filter(
    fc.col("clause_type").isin(["payment", "liability"])
)

payment_terms.show()

The semantic.extract operator validates outputs against the schema automatically, preventing invalid data from entering downstream systems.
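
When a document cannot be mapped onto the schema, the extracted fields typically come back null. A quick null check, mirroring the error-handling pattern later in this guide, surfaces those rows before they reach downstream systems:

python
# Rows with no clause_type after unnesting likely failed extraction
missing = extracted.filter(fc.col("clause_type").is_null())

if missing.count() > 0:
    print(f"{missing.count()} documents produced no valid clause data")
    missing.select("file_path").show()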

Processing Multiple Document Types

Load entire directories into DataFrames:

python
from fenic.core.types import MarkdownType, JsonType

# Load markdown documents
md_docs = session.read.docs(
    "/data/documentation/",
    content_type="markdown",
    recursive=True
)

# Extract structured sections
sections = (
    md_docs
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(
            fc.col("content"),
            header_level=2
        )
    )
    .explode("chunks")
)

sections.show()

The extract_header_chunks function leverages document structure for semantically meaningful chunks instead of naive character splitting.

Vector Embeddings for Semantic Search

Generate embeddings as a native column operation:

python
# Create embeddings for document chunks
embedded = (
    sections
    .with_column(
        "chunk_embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
)

# Cluster similar content
clustered = (
    embedded
    .semantic.with_cluster_labels(
        by=fc.col("chunk_embedding"),
        num_clusters=8,
        label_column="cluster_id"
    )
)

# Summarize each cluster
cluster_themes = (
    clustered
    .group_by("cluster_id")
    .agg(
        fc.semantic.reduce(
            "Identify the common theme across these sections",
            fc.col("chunks").content,
            model_alias="fast"
        ).alias("theme")
    )
)

cluster_themes.show()

EmbeddingType is a first-class data type enabling vector operations without external database dependencies during processing.
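
For nearest-neighbor matching over those embeddings, Fenic also offers a similarity join. The sketch below assumes a semantic.sim_join DataFrame method with left_on, right_on, and k parameters; check the current API reference for the exact signature:

python
# Match a query against document chunks by embedding similarity
# (sim_join parameter names here are assumptions; verify against the Fenic docs)
queries = session.create_dataframe({"question": ["How are refunds handled?"]})
queries = queries.with_column(
    "question_embedding",
    fc.semantic.embed(fc.col("question"))
)

nearest = queries.semantic.sim_join(
    embedded,
    left_on=fc.col("question_embedding"),
    right_on=fc.col("chunk_embedding"),
    k=3
)

nearest.show()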

Semantic Joins Across Documents

Link related content across documents using meaning-based matching:

python
# Load two document sets
contracts_df = session.read.pdf_metadata("contracts/**/*.pdf")
policies_df = session.read.pdf_metadata("policies/**/*.pdf")

# Parse both
contracts_parsed = contracts_df.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("contract_text")
)

policies_parsed = policies_df.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("policy_text")
)

# Define semantic join predicate
join_predicate = """
Does the contract clause address this policy requirement?

Contract: {{left_on}}
Policy: {{right_on}}

Consider:
- Does the clause satisfy the policy intent?
- Are required controls present?
- Is language sufficiently specific?
"""

# Perform semantic join
matched = (
    contracts_parsed
    .semantic.join(
        other=policies_parsed,
        predicate=join_predicate,
        left_on=fc.col("contract_text"),
        right_on=fc.col("policy_text"),
        model_alias="accurate"
    )
)

matched.select(
    "file_path",
    "policy_text",
    "contract_text"
).show()

Semantic joins enable sophisticated matching logic beyond simple text similarity.

Complete Document Processing Pipeline

Combine operations into production workflows:

python
class DocumentMetadata(BaseModel):
    doc_type: Literal["contract", "invoice", "report", "memo"]
    primary_entities: List[str]
    key_dates: List[str]
    financial_terms: List[str]
    action_items: List[str]

class ContractTerm(BaseModel):
    term_type: Literal["payment", "delivery", "warranty", "indemnity"]
    description: str
    parties_affected: List[str]
    start_date: str
    end_date: str

def process_document_batch(input_path: str, output_path: str):
    """
    End-to-end document processing pipeline
    """

    # Load PDF metadata
    docs = session.read.pdf_metadata(input_path, recursive=True)

    # Parse PDFs to markdown
    parsed = docs.select(
        "*",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---"
        ).alias("content")
    )

    # Extract document-level metadata
    with_metadata = (
        parsed
        .select(
            "*",
            fc.semantic.extract(
                fc.col("content"),
                DocumentMetadata,
                model_alias="fast"
            ).alias("metadata")
        )
        .unnest("metadata")
    )

    # Extract contract terms
    with_terms = (
        with_metadata
        .filter(fc.col("doc_type") == "contract")
        .select(
            "*",
            fc.semantic.extract(
                fc.col("content"),
                ContractTerm,
                model_alias="accurate"
            ).alias("terms")
        )
        .unnest("terms")
    )

    # Generate embeddings
    with_embeddings = (
        with_terms
        .with_column(
            "term_embedding",
            fc.semantic.embed(fc.col("description"))
        )
    )

    # Classify priority
    classified = (
        with_embeddings
        .with_column(
            "priority",
            fc.semantic.classify(
                fc.col("description"),
                classes=["critical", "high", "medium", "low"],
                model_alias="fast"
            )
        )
    )

    # Write results and capture metrics
    write_metrics = classified.write.parquet(output_path)

    return {
        "documents_processed": docs.count(),
        "terms_extracted": classified.count(),
        "total_cost_usd": write_metrics.total_lm_metrics.cost,
        "execution_time_sec": write_metrics.execution_time_ms / 1000,
    }

# Execute pipeline
stats = process_document_batch(
    input_path="data/contracts/**/*.pdf",
    output_path="s3://bucket/processed/"
)

print(f"Pipeline metrics: {stats}")

This pattern demonstrates inference-first architecture where semantic operations compose with traditional DataFrame transformations.

Processing Transcripts with Speaker Awareness

Handle meeting transcripts and call recordings:

python
from pathlib import Path

class MeetingSegment(BaseModel):
    speaker: str
    start_time: float
    end_time: float
    key_points: List[str]
    action_items: List[str]

# Load transcript
transcript_data = Path("meetings/standup.json").read_text()
df = session.create_dataframe({"transcript": [transcript_data]})

# Chunk with speaker awareness
processed = (
    df
    .select(
        "*",
        fc.text.recursive_token_chunk(
            "transcript",
            chunk_size=1200,
            chunk_overlap_percentage=0
        ).alias("chunks")
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk",
            MeetingSegment,
            model_alias="fast"
        ).alias("segment")
    )
    .unnest("segment")
)

# Aggregate by speaker
speaker_summary = (
    processed
    .group_by(fc.col("speaker"))
    .agg(
        fc.semantic.reduce(
            "Summarize this speaker's contributions",
            fc.col("key_points").cast(fc.StringType),
            model_alias="fast"
        ).alias("summary")
    )
)

speaker_summary.show()

TranscriptType preserves speaker identity and timestamps through transformations.

Optimizing Pipeline Performance

Model Selection Strategy

Use cost-efficient models for high-volume tasks:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel("gpt-4o-nano", rpm=500),
            "mini": fc.OpenAILanguageModel("gpt-4o-mini", rpm=500),
            "full": fc.AnthropicLanguageModel("claude-opus-4", rpm=100),
        }
    )
)

# Classification with nano model
.semantic.classify(
    fc.col("text"),
    classes=["urgent", "routine"],
    model_alias="nano"
)

# Extraction with mini model
.semantic.extract(
    fc.col("text"),
    StandardSchema,
    model_alias="mini"
)

# Critical reasoning with full model
.semantic.extract(
    fc.col("text"),
    ComplexSchema,
    model_alias="full"
)

Cost differences between models range from 10-100x, making strategic selection essential.

Explicit Caching

Cache expensive operations to prevent redundant processing:

python
# Execute once and materialize to avoid re-execution
result = df.select(
    "*",
    fc.semantic.extract(
        fc.col("text"), 
        Schema, 
        model_alias="accurate"
    ).alias("extracted")
).unnest("extracted").collect()

# Convert back to DataFrame for further operations
# (work with result.data in your preferred format)
extracted = session.create_dataframe(result.data)

# Multiple filters use the materialized results
critical = extracted.filter(fc.col("priority") == "critical").collect()
high = extracted.filter(fc.col("priority") == "high").collect()

Batch Processing at Scale

Process directories efficiently:

python
# Load all documents
all_docs = session.read.pdf_metadata(
    "archive/**/*.pdf",
    recursive=True
)

# Filter before parsing
recent_docs = all_docs.filter(
    fc.col("modified_date") > "2024-01-01"
)

# Parse and process
processed = (
    recent_docs
    .select(
        "*",
        fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
    )
    .select(
        "*",
        fc.semantic.extract(fc.col("content"), Schema).alias("extracted")
    )
)

processed.write.parquet("s3://output/")

The framework handles rate limiting and retries automatically.

Building Knowledge Graphs from Documents

Extract entities and relationships:

python
class Entity(BaseModel):
    entity_type: Literal["person", "organization", "location", "date"]
    entity_name: str
    context: str

class Relationship(BaseModel):
    source_entity: str
    target_entity: str
    relationship_type: Literal["employed_by", "located_in", "reports_to"]
    confidence: Literal["high", "medium", "low"]

# Extract entities
entities = (
    parsed_docs
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            Entity,
            model_alias="fast"
        ).alias("entity")
    )
    .unnest("entity")
    .with_column(
        "entity_embedding",
        fc.semantic.embed(
            fc.col("entity_name") + " " + fc.col("context")
        )
    )
)

# Extract relationships
relationships = (
    parsed_docs
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            Relationship,
            model_alias="accurate"
        ).alias("rel")
    )
    .unnest("rel")
)

# Link entities across documents
entity_links = (
    entities
    .alias("e1")
    .semantic.join(
        other=entities.alias("e2"),
        predicate="""
        Do these refer to the same real-world entity?
        Entity 1: {{left_on}}
        Entity 2: {{right_on}}
        """,
        left_on=fc.col("entity_name"),
        right_on=fc.col("entity_name"),
        model_alias="accurate"
    )
    .filter(fc.col("e1.file_path") != fc.col("e2.file_path"))
)

# Write knowledge graph
entities.write.parquet("kg/entities/")
relationships.write.parquet("kg/relationships/")
entity_links.write.parquet("kg/links/")

Monitoring Pipeline Metrics

Track costs and performance:

python
result = pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.total_lm_metrics.num_output_tokens}")
print(f"Total cost: ${metrics.total_lm_metrics.cost:.2f}")
print(f"Execution time: {metrics.execution_time_ms/1000:.2f}s")

# Per-model breakdown
for model_name, model_metrics in metrics.lm_metrics.items():
    print(f"\n{model_name}:")
    print(f"  Requests: {model_metrics.num_requests}")
    print(f"  Input tokens: {model_metrics.num_input_tokens}")
    print(f"  Output tokens: {model_metrics.num_output_tokens}")
    print(f"  Cost: ${model_metrics.cost:.2f}")

Built-in metrics and observability provide visibility into resource usage.
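
To push these numbers into whatever monitoring you already run, a small helper can flatten them into a plain dict; the attribute names mirror those used above, and the logging destination is up to you:

python
def metrics_to_record(metrics) -> dict:
    """Flatten Fenic query metrics into a dict for logging or dashboards."""
    record = {
        "execution_time_sec": metrics.execution_time_ms / 1000,
        "total_cost_usd": metrics.total_lm_metrics.cost,
        "total_output_tokens": metrics.total_lm_metrics.num_output_tokens,
    }
    for model_name, model_metrics in metrics.lm_metrics.items():
        record[f"{model_name}_requests"] = model_metrics.num_requests
        record[f"{model_name}_cost_usd"] = model_metrics.cost
    return record

print(metrics_to_record(metrics))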

Advanced Filtering with Semantic Predicates

Apply natural language filters to documents:

python
# Load and parse documents
docs = session.read.pdf_metadata("legal/**/*.pdf")
parsed = docs.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)

# Filter with semantic predicate
relevant = parsed.filter(
    fc.semantic.predicate(
        """
        Does this document contain information about:
        - Intellectual property rights
        - Licensing terms
        - Technology transfer

        Content: {{content}}
        """,
        content=fc.col("content")
    )
)

relevant.show()

Semantic predicates enable content-based filtering without regex patterns.
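
Since each semantic predicate evaluation is an LLM call per row, apply cheap metadata filters first and reserve the predicate for the rows that survive. A minimal ordering, reusing the docs DataFrame above (the page-count cutoff is illustrative):

python
# Narrow by cheap metadata first, then apply the LLM-backed predicate
candidates = docs.filter(fc.col("page_count") <= 100)

relevant = (
    candidates
    .select(
        "*",
        fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
    )
    .filter(
        fc.semantic.predicate(
            "Does this document discuss licensing terms? Content: {{content}}",
            content=fc.col("content")
        )
    )
)

relevant.show()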

Integrating with Data Infrastructure

Write to Lakehouses

Export to standard formats:

python
# Write to Parquet
processed.write.parquet("s3://bucket/output/")

# Write to table in catalog (for persistent storage)
processed.write.save_as_table("processed_documents", mode="append")

# Write to CSV (alternative format)
processed.write.csv("s3://bucket/output.csv")

Lakehouse-native architecture enables seamless integration without data movement.

Vector Database Export

Prepare data for vector search:

python
# Generate embeddings with metadata
vector_data = (
    processed
    .select(
        fc.col("document_id").alias("id"),
        fc.col("content").alias("text"),
        fc.col("doc_type").alias("category"),
        fc.col("file_path").alias("source"),
        fc.semantic.embed(fc.col("content")).alias("embedding")
    )
)

# Write for vector DB ingestion
vector_data.write.parquet("vector_store_input/")

Production Best Practices

Schema Design Guidelines

Create clear, specific schemas:

python
class PurchaseOrder(BaseModel):
    po_number: str = Field(description="Unique PO identifier")
    vendor_name: str = Field(description="Legal vendor name")
    order_date: str = Field(description="Date format YYYY-MM-DD")
    line_items: List[str] = Field(description="Items being purchased")
    total_amount: float = Field(ge=0, description="Total in USD")
    approval_status: Literal["pending", "approved", "rejected"]

Field descriptions and constraints improve extraction accuracy.
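
When documents contain repeating sub-records, nesting one Pydantic model inside another keeps that structure explicit. The sketch below assumes semantic.extract accepts nested models; the LineItem type is illustrative:

python
class LineItem(BaseModel):
    description: str = Field(description="What was purchased")
    quantity: int = Field(ge=1, description="Number of units")
    unit_price: float = Field(ge=0, description="Price per unit in USD")

class DetailedPurchaseOrder(BaseModel):
    po_number: str = Field(description="Unique PO identifier")
    vendor_name: str = Field(description="Legal vendor name")
    line_items: List[LineItem] = Field(description="Itemized purchases")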

Error Handling

Implement validation checks:

python
# Check for extraction failures BEFORE collecting
failed = pipeline.filter(fc.col("extracted_data").is_null())

if failed.count() > 0:
    print(f"Warning: {failed.count()} documents failed extraction")
    failed.select("file_path").show()

Cost Estimation

Test on samples before full runs:

python
# Process sample
sample = df.limit(50).select(
    "*",
    fc.semantic.extract(...).alias("extracted")
).collect()
cost_per_doc = sample.metrics.total_lm_metrics.cost / 50

total_docs = df.count()
estimated_cost = cost_per_doc * total_docs

print(f"Estimated cost for {total_docs} docs: ${estimated_cost:.2f}")

Scaling to Cloud Execution

Deploy with Typedef Cloud for automatic scaling:

python
config = fc.SessionConfig(
    app_name="production_docs",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel("gpt-4o-mini", rpm=500)
        }
    ),
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.LARGE
    )
)

session = fc.Session.get_or_create(config)

# Same code scales automatically
df = session.read.pdf_metadata("s3://inputs/**/*.pdf")
processed = df.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)
processed.write.parquet("s3://outputs/")

Zero code changes enable local-to-cloud deployment.
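
One convenient pattern is to gate the CloudConfig behind an environment variable so the same script runs locally during development; the FENIC_CLOUD variable name is just an example, and this assumes omitting the cloud setting falls back to local execution:

python
import os

# Attach a CloudConfig only when FENIC_CLOUD=1; otherwise run locally
# (assumes a missing cloud setting means local execution)
use_cloud = os.environ.get("FENIC_CLOUD") == "1"

config = fc.SessionConfig(
    app_name="production_docs",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel("gpt-4o-mini", rpm=500)
        }
    ),
    cloud=fc.CloudConfig(size=fc.CloudExecutorSize.LARGE) if use_cloud else None,
)

session = fc.Session.get_or_create(config)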

Key Takeaways

Typedef provides native capabilities for document ingestion pipelines:

  • Native PDF parsing with page chunking and metadata extraction
  • Typed schemas eliminate brittle prompt engineering
  • Semantic joins link related content across documents
  • Vector operations as first-class DataFrame operations
  • Automatic optimization handles batching, caching, rate limiting
  • Built-in observability tracks costs and performance
  • Lakehouse integration works with existing infrastructure

Teams building document processing systems report 100x time savings and dramatic cost reductions compared to custom microservices.

Start building document pipelines with Fenic on GitHub, read the open source announcement, or explore semantic operators for data transformation.
