How to Pair Reducto OCR with Schema Validation and LLM Fix-Ups on Typedef

Typedef Team

OCR technology extracts text from documents, but raw OCR output is rarely production-ready. Text formatting breaks, confidence scores vary, and critical data points hide in unstructured content. The gap between extraction and usable structured data has traditionally required brittle glue code, manual validation, and constant maintenance.

Typedef's Fenic framework eliminates this fragility by treating schema validation and LLM-based corrections as native DataFrame operations. This guide demonstrates how to build reliable pipelines that transform OCR output into validated, structured data using declarative operations rather than custom scripts.

The OCR-to-Structured-Data Challenge

OCR services like Reducto extract text from PDFs, images, and scanned documents. The output arrives as unstructured strings containing everything from invoice line items to contract clauses. Three problems consistently emerge:

Structural ambiguity: OCR text lacks semantic boundaries. Which lines belong to the header versus the body? Where does one section end and another begin?

Quality variance: Character recognition errors create subtle data corruption. "2024-01-15" becomes "Z024-O1-15", dollar amounts lose decimal points, and table structures collapse into linear text.

Schema mismatch: Business logic expects typed, validated data structures. OCR delivers raw strings requiring extensive parsing and validation before downstream systems can consume them.

Traditional approaches scatter validation logic across multiple services, creating maintenance bottlenecks that slow iteration and hide errors until production failures surface.

Fenic's Inference-First Architecture for Document Processing

Fenic rebuilds data processing from first principles with semantic operations as native capabilities. Rather than treating LLM calls as external functions requiring custom orchestration, Fenic's query engine understands inference operations and optimizes them alongside traditional DataFrame transformations.

This architectural choice delivers three critical advantages for OCR workflows:

Automatic batching: The engine groups API calls to minimize latency while respecting provider rate limits

Intelligent caching: Repeated extractions reuse prior results without manual cache management

Deterministic lineage: Every row maintains traceable provenance through transformations, even when those transformations involve probabilistic models

Schema-Driven Extraction with Pydantic Models

The foundation of reliable OCR processing is schema-driven extraction using semantic.extract. This operator transforms unstructured text into type-safe structures defined by Pydantic models.

Define the target structure with field-level descriptions:

python
from pydantic import BaseModel, Field
from typing import List, Literal, Optional
import fenic as fc

class InvoiceLineItem(BaseModel):
    description: str = Field(description="Product or service description")
    quantity: float = Field(description="Number of units")
    unit_price: float = Field(description="Price per unit in USD")
    total: float = Field(description="Line item total (quantity × unit_price)")

class Invoice(BaseModel):
    invoice_number: str = Field(description="Unique invoice identifier")
    invoice_date: str = Field(description="Invoice issue date in YYYY-MM-DD format")
    customer_name: str = Field(description="Name of the customer or business")
    line_items: List[InvoiceLineItem] = Field(description="All products or services billed")
    subtotal: float = Field(description="Sum of all line items before tax")
    tax_amount: float = Field(description="Total tax amount")
    total_amount: float = Field(description="Final amount due")
    payment_status: Literal["paid", "pending", "overdue"] = Field(
        description="Current payment status"
    )

The schema serves three purposes simultaneously: documentation for humans, validation rules enforced at runtime, and LLM guidance for extraction accuracy.

Apply extraction to OCR text using DataFrame operations:

python
# Configure session with appropriate language model
config = fc.SessionConfig(
    app_name="invoice_processor",
    semantic=fc.SemanticConfig(
        language_models={
            "primary": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="primary"
    )
)

session = fc.Session.get_or_create(config)

# Load OCR output (assuming OCR text is in a column named 'ocr_text')
df = session.read.csv("ocr_output.csv")

# Extract structured data with type-safe validation
invoices = df.select(
    fc.col("document_id"),
    fc.semantic.extract(
        fc.col("ocr_text"),
        Invoice,
        model_alias="primary"
    ).alias("invoice_data")
)

Schema-driven extraction achieves 74.2-96.1% F1 scores in evaluated domains without requiring thousands of labeled examples. The approach eliminates annotation costs while maintaining production-grade accuracy.

Validating Extraction Quality with Business Rules

Pydantic handles type validation automatically, but business logic often requires additional checks beyond type constraints. Implement validation using semantic predicates:

python
# Check for logical consistency in extracted data
validated_invoices = invoices.filter(
    # Type validation happens automatically via Pydantic
    # Add business rule validation
    fc.semantic.predicate(
        """The invoice {{ invoice_data }} has internally consistent math:
        - Each line_item total equals quantity × unit_price
        - Subtotal equals sum of line_item totals
        - Total amount equals subtotal + tax amount
        Return true only if all calculations are correct.""",
        invoice_data=fc.col("invoice_data")
    )
)

This approach surfaces extraction quality issues early in the pipeline rather than allowing invalid data to propagate downstream.
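
The filter above keeps only invoices that already pass the checks. To feed failures into the correction step shown next rather than discarding them, keep them in a separate DataFrame; a minimal sketch using the same predicate negated:

python
# Hedged sketch: capture the failures so they can be corrected downstream
flagged_invoices = invoices.filter(
    ~fc.semantic.predicate(
        """The invoice {{ invoice_data }} has internally consistent math:
        - Each line_item total equals quantity × unit_price
        - Subtotal equals sum of line_item totals
        - Total amount equals subtotal + tax amount
        Return true only if all calculations are correct.""",
        invoice_data=fc.col("invoice_data")
    )
)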

LLM-Based Correction with semantic.map

Even with strong schemas, OCR errors require correction. The semantic.map operator applies targeted fixes using natural language instructions:

python
from pydantic import BaseModel, Field

class CorrectedInvoice(BaseModel):
    invoice_data: Invoice = Field(description="The corrected invoice data")
    corrections_made: List[str] = Field(
        description="List of corrections applied (empty if no corrections needed)"
    )
    confidence: Literal["high", "medium", "low"] = Field(
        description="Confidence level in the corrected data"
    )

# Apply corrections for common OCR errors
corrected = validated_invoices.select(
    fc.col("document_id"),
    fc.semantic.map(
        """Review this invoice data for common OCR errors:
        - Zero/letter confusion: O vs 0, l vs 1, S vs 5
        - Decimal point errors in amounts
        - Date format inconsistencies
        - Transposed digits in numbers

        Invoice data: {{ invoice_data }}

        Return corrected data with a list of specific corrections made.
        Only make corrections when confident about OCR errors, not business logic errors.""",
        invoice_data=fc.col("invoice_data"),
        response_format=CorrectedInvoice
    ).alias("corrected_invoice")
)

The response_format parameter enforces structured output, ensuring corrections return predictable data structures rather than unstructured text.

Building Production-Ready Validation Pipelines

Real-world document processing requires multi-stage validation with escalation workflows. Combine schema extraction, validation, and correction into cohesive pipelines:

python
class ValidationStatus(BaseModel):
    passed_schema_validation: bool
    passed_business_rules: bool
    corrections_applied: int
    requires_human_review: bool
    review_reason: Optional[str] = None

def build_invoice_pipeline(df):
    """Complete invoice processing with validation and correction."""

    # Stage 1: Schema-driven extraction
    extracted = df.select(
        fc.col("document_id"),
        fc.col("ocr_text"),
        fc.semantic.extract(
            fc.col("ocr_text"),
            Invoice
        ).alias("raw_invoice")
    )

    # Stage 2: Business rule validation
    validated = extracted.with_column(
        "validation_passed",
        fc.semantic.predicate(
            """Invoice {{ invoice }} passes business rules:
            - Line item calculations are correct
            - Subtotal equals sum of line items
            - Total equals subtotal plus tax
            - Date is in valid format
            - All required fields are present and reasonable""",
            invoice=fc.col("raw_invoice")
        )
    )

    # Stage 3: Apply corrections to failed validations
    corrected = validated.with_column(
        "final_invoice",
        fc.when(
            fc.col("validation_passed"),
            fc.col("raw_invoice")  # Already valid, no correction needed
        ).otherwise(
            fc.semantic.map(
                """Fix OCR errors in invoice: {{ invoice }}

                Common issues to check:
                - Character confusion (O/0, l/1, S/5)
                - Missing or misplaced decimal points
                - Transposed digits
                - Incorrect date formats

                Return corrected invoice data.""",
                invoice=fc.col("raw_invoice"),
                response_format=Invoice
            )
        )
    )

    # Stage 4: Re-validate corrected data
    final_validated = corrected.with_column(
        "final_validation",
        fc.semantic.predicate(
            """Invoice {{ invoice }} now passes all business rules""",
            invoice=fc.col("final_invoice")
        )
    )

    # Stage 5: Flag items requiring human review
    return final_validated.with_column(
        "needs_review",
        ~fc.col("final_validation")
    )

This pipeline structure maintains lineage throughout the process, enabling debugging when issues arise and providing audit trails for compliance requirements.
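
A minimal usage sketch, assuming an ocr_output.csv with document_id and ocr_text columns as in the earlier examples:

python
# Hypothetical end-to-end run: extract, validate, correct, then split results
df = session.read.csv("ocr_output.csv")
results = build_invoice_pipeline(df)

# Clean invoices flow downstream; flagged ones go to a review queue
results.filter(~fc.col("needs_review")).write.parquet("validated_invoices.parquet")
results.filter(fc.col("needs_review")).write.parquet("review_queue.parquet")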

Handling Multi-Page Documents with Semantic Joins

Complex documents span multiple pages with relationships between sections. Use semantic joins to connect related content:

python
# Process multi-page purchase orders where items span pages
page_df = session.read.csv("multi_page_ocr.csv")

class PurchaseOrderHeader(BaseModel):
    po_number: str
    vendor: str
    order_date: str
    delivery_address: str

class PurchaseOrderItem(BaseModel):
    item_number: str
    description: str
    quantity: int
    unit_price: float

# Extract header and items separately
headers = page_df.filter(
    fc.col("page_type") == "header"
).select(
    fc.col("document_id"),
    fc.semantic.extract(
        fc.col("ocr_text"),
        PurchaseOrderHeader
    ).alias("header")
)

items = page_df.filter(
    fc.col("page_type") == "items"
).select(
    fc.col("document_id"),
    fc.semantic.extract(
        fc.col("ocr_text"),
        PurchaseOrderItem
    ).alias("item")
)

# Join semantically related content
complete_orders = headers.semantic.join(
    other=items,
    predicate="""The purchase order header {{ left_on }}
    corresponds to the line items {{ right_on }} based on:
    - Document ID matching
    - Vendor name consistency
    - Logical item descriptions for the vendor type""",
    left_on=fc.col("header"),
    right_on=fc.col("item")
)

Semantic joins enable content-based relationships that traditional key-based joins miss, particularly valuable when OCR quality varies across pages.
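
When an exact key such as document_id is available, a standard join on that key followed by a semantic consistency check can cut LLM calls considerably compared to a full semantic join. A hedged sketch under that assumption:

python
# Hypothetical cost-saving variant: exact join first, semantic check second
matched = headers.join(items, on="document_id").with_column(
    "content_consistent",
    fc.semantic.predicate(
        """The purchase order header {{ header }} is consistent with
        the line item {{ item }}: vendor and item descriptions align.""",
        header=fc.col("header"),
        item=fc.col("item")
    )
)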

Cost Optimization Through Model Selection

Different extraction tasks require different model capabilities. Configure model tiers for cost-effective processing:

python
config = fc.SessionConfig(
    app_name="cost_optimized_ocr",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200000
            ),
            "standard": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100000,
                output_tpm=50000
            ),
            "power": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="nano"
    )
)

session = fc.Session.get_or_create(config)

# Use appropriate model tier for each task
df = session.read.csv("documents.csv")

processed = (
    df
    # Simple classification uses fast, cheap model
    .with_column(
        "doc_type",
        fc.semantic.classify(
            fc.col("ocr_text"),
            ["invoice", "receipt", "purchase_order", "contract"],
            model_alias="nano"
        )
    )
    # Complex extraction uses more capable model
    .with_column(
        "structured_data",
        fc.semantic.extract(
            fc.col("ocr_text"),
            Invoice,
            model_alias="standard"
        )
    )
    # Validate the extraction first so only failures proceed to correction
    .with_column(
        "validation_passed",
        fc.semantic.predicate(
            "The extracted invoice {{ data }} has internally consistent totals",
            data=fc.col("structured_data"),
            model_alias="nano"
        )
    )
    # Critical corrections use most capable model
    .filter(~fc.col("validation_passed"))
    .with_column(
        "corrected_data",
        fc.semantic.map(
            "Fix critical errors in {{ data }}",
            data=fc.col("structured_data"),
            response_format=Invoice,
            model_alias="power"
        )
    )
)

Track costs through built-in metrics:

python
result = processed.collect()

# Access comprehensive cost metrics
print(f"Total tokens: {result.metrics.total_lm_metrics.num_output_tokens}")
print(f"Total cost: ${result.metrics.total_lm_metrics.cost:.4f}")
print(f"Execution time: {result.metrics.execution_time_ms}ms")

Error Handling and Retry Strategies

Production systems require resilient error handling. Fenic provides built-in retry logic and rate limiting, but additional application-level validation ensures data quality:

python
def process_with_fallback(df):
    """Process with fallback strategies for extraction failures."""

    extracted = df.select(
        fc.col("document_id"),
        fc.col("ocr_text"),
        fc.semantic.extract(
            fc.col("ocr_text"),
            Invoice,
            temperature=0.0  # Deterministic extraction
        ).alias("invoice")
    )

    # Identify low-quality extractions
    quality_checked = extracted.with_column(
        "extraction_quality",
        fc.semantic.classify(
            fc.col("invoice"),
            [
                "complete_and_valid",
                "partial_data",
                "failed_extraction"
            ]
        )
    )

    # Apply different strategies based on quality
    final = (
        quality_checked
        .with_column(
            "processed_invoice",
            fc.when(
                fc.col("extraction_quality") == "complete_and_valid",
                fc.col("invoice")
            ).when(
                fc.col("extraction_quality") == "partial_data",
                # Retry extraction with a slightly higher temperature
                fc.semantic.extract(
                    fc.col("ocr_text"),
                    Invoice,
                    temperature=0.3  # Slightly more creative
                )
            ).otherwise(
                # Flag for manual review
                fc.lit(None)
            )
        )
        .with_column(
            "requires_manual_review",
            fc.col("processed_invoice").is_null()
        )
    )

    return final

Integration with PDF Processing

Fenic includes native PDF parsing that pairs naturally with schema extraction workflows:

python
# Configure for PDF processing
config = fc.SessionConfig(
    app_name="pdf_processor",
    semantic=fc.SemanticConfig(
        language_models={
            "gemini": fc.GoogleDeveloperLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=100,
                tpm=1000
            )
        },
        default_language_model="gemini"
    )
)

session = fc.Session.get_or_create(config)

# Discover and parse PDFs
pdfs = session.read.pdf_metadata("data/invoices/**/*.pdf", recursive=True)

# Parse to markdown with page awareness
parsed = pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("markdown_content")
)

# Extract structured data from parsed content
invoices = parsed.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("markdown_content"),
        Invoice
    ).alias("invoice_data")
)

PDF processing in Fenic 0.5.0 introduced page chunking and Google model support, enabling efficient processing of large document corpora.

Production Deployment Patterns

Move from local development to production scale without code changes:

python
# Local development (same semantic config as the earlier examples)
local_config = fc.SessionConfig(
    app_name="invoice_processor",
    semantic=fc.SemanticConfig(
        language_models={
            "primary": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="primary"
    )
)
local_session = fc.Session.get_or_create(local_config)

df = local_session.read.csv("sample_invoices.csv")
processed = build_invoice_pipeline(df)
processed.write.parquet("local_results.parquet")

# Production deployment - identical pipeline code
prod_config = fc.SessionConfig(
    app_name="invoice_processor",
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    ),
    semantic=fc.SemanticConfig(
        language_models={
            "primary": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200000
            )
        }
    )
)

prod_session = fc.Session.get_or_create(prod_config)

# Same pipeline, cloud execution with automatic scaling
df = prod_session.read.csv("s3://bucket/invoices/*.csv")
processed = build_invoice_pipeline(df)
processed.write.parquet("s3://bucket/processed/invoices.parquet")

Monitoring and Observability

Track pipeline health through comprehensive metrics:

python
def monitor_extraction_quality(df, query_result):
    """Extract quality metrics from pipeline results."""

    metrics = {
        "total_documents": df.count(),
        "successful_extractions": df.filter(
            fc.col("validation_passed")
        ).count(),
        "corrections_applied": df.filter(
            fc.col("corrections_made") > 0
        ).count(),
        "manual_review_required": df.filter(
            fc.col("needs_review")
        ).count(),
        "total_cost": query_result.metrics.total_lm_metrics.cost,
        "avg_tokens_per_doc": (
            query_result.metrics.total_lm_metrics.num_output_tokens /
            df.count()
        ),
        "execution_time_seconds": query_result.metrics.execution_time_ms / 1000
    }

    return metrics
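
A hedged usage sketch, assuming the DataFrame carries the validation, correction, and review columns referenced in the function:

python
# Hypothetical usage: collect once, then report quality and cost together
pipeline_df = build_invoice_pipeline(session.read.csv("ocr_output.csv"))
query_result = pipeline_df.collect()

metrics = monitor_extraction_quality(pipeline_df, query_result)
for name, value in metrics.items():
    print(f"{name}: {value}")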

These metrics inform capacity planning and help identify optimization opportunities as document volumes scale.

Best Practices for OCR Integration

Define schemas incrementally: Start with required fields, add optional fields as patterns emerge

Use field descriptions liberally: Detailed descriptions improve extraction accuracy and serve as documentation

Validate early and often: Apply business rule checks immediately after extraction rather than at pipeline end

Tier model usage: Reserve powerful models for complex extractions and corrections; use efficient models for classification and simple tasks

Cache aggressively: Enable caching at pipeline stages where repeated extraction occurs

Monitor extraction patterns: Track which document types require corrections most frequently to identify systematic OCR issues

Maintain audit trails: Use DataFrame lineage to trace every transformation for compliance and debugging

Test with representative failures: Include documents with known OCR quality issues in test suites, as sketched below
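
A minimal regression-style sketch, assuming a small fixtures file of documents with known OCR defects (for example, swapped O/0 characters):

python
# Hypothetical test fixtures: documents whose OCR output is known to be flawed
fixtures = session.read.csv("tests/fixtures/known_bad_ocr.csv")

# Expectation under this assumption: the pipeline flags at least one fixture
flagged = build_invoice_pipeline(fixtures).filter(fc.col("needs_review"))
assert flagged.count() > 0, "known-bad documents should be flagged for review"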

Advanced: Multi-Model Validation

Increase reliability by validating extractions across multiple models:

python
def multi_model_extraction(df):
    """Extract with multiple models and validate consistency."""

    # Extract with two different models
    extracted_a = df.select(
        fc.col("document_id"),
        fc.semantic.extract(
            fc.col("ocr_text"),
            Invoice,
            model_alias="model_a"
        ).alias("invoice_a")
    )

    extracted_b = df.select(
        fc.col("document_id"),
        fc.semantic.extract(
            fc.col("ocr_text"),
            Invoice,
            model_alias="model_b"
        ).alias("invoice_b")
    )

    # Join results and validate consistency
    combined = extracted_a.join(
        extracted_b,
        on="document_id"
    ).with_column(
        "models_agree",
        fc.semantic.predicate(
            """The two invoice extractions {{ invoice_a }} and {{ invoice_b }}
            contain the same core information:
            - Same invoice number
            - Same total amount
            - Same customer name
            - Same line items (allowing for minor formatting differences)""",
            invoice_a=fc.col("invoice_a"),
            invoice_b=fc.col("invoice_b")
        )
    )

    # Use model A for agreements, flag disagreements for review
    return combined.with_column(
        "final_invoice",
        fc.when(
            fc.col("models_agree"),
            fc.col("invoice_a")
        ).otherwise(
            fc.lit(None)
        )
    ).with_column(
        "needs_review",
        ~fc.col("models_agree")
    )

This approach provides additional confidence for mission-critical extractions where errors carry significant business impact.
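
The model_a and model_b aliases above are assumed to come from a session configured with two providers, for example:

python
# Hypothetical session config backing the model_a / model_b aliases
config = fc.SessionConfig(
    app_name="multi_model_validation",
    semantic=fc.SemanticConfig(
        language_models={
            "model_a": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            ),
            "model_b": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100000,
                output_tpm=50000
            )
        },
        default_language_model="model_a"
    )
)

session = fc.Session.get_or_create(config)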

Conclusion

Processing OCR output with schema validation and LLM corrections traditionally required extensive glue code, manual error handling, and fragile orchestration. Fenic's semantic operators treat these workflows as native DataFrame operations, eliminating brittleness while maintaining the reliability and predictability production systems require.

The declarative approach—defining schemas once, applying corrections through natural language instructions, and validating through semantic predicates—produces maintainable pipelines that evolve with document format changes and business requirements. Organizations report dramatic time savings and cost reductions when moving from custom scripts to unified DataFrame architectures.

Start building reliable OCR processing pipelines today with Fenic.
