
How to Scale Semantic Extraction Across Insurance Carriers Efficiently

Typedef Team

Insurance operations generate massive volumes of unstructured data daily: policy documents, claims forms, medical records, underwriting reports, and customer correspondence. Processing this data manually creates bottlenecks, while traditional extraction tools struggle with format variations across carriers. This guide shows how to build production-grade semantic extraction pipelines that scale efficiently across multiple insurance carriers.

Why Traditional Approaches Fail at Scale

Insurance data presents unique extraction challenges that break conventional tools:

Format heterogeneity across carriers. Each carrier uses different document templates, field names, and data structures. A claims form from Carrier A looks nothing like one from Carrier B. Traditional regex-based extraction requires maintaining separate rule sets per carrier, creating unmaintainable technical debt.

Unstructured data dominates. Text narratives, scanned documents, and semi-structured forms make up the majority of insurance data. Rule-based systems handle structured data well but fail on text that requires interpretation and contextual understanding.

Volume compounds complexity. Processing thousands of documents daily while maintaining accuracy requires infrastructure that most extraction tools weren't designed to handle. Batch processing, rate limiting, and error recovery become critical at scale.

Compliance requirements add constraints. HIPAA, state insurance regulations, and carrier-specific data handling policies demand audit trails, data lineage, and controlled access patterns that traditional point solutions don't provide.

Schema-Driven Extraction: The Foundation for Scale

Schema-driven extraction replaces brittle prompt engineering with type-safe data contracts. Research shows this approach achieves 74.2-96.1% F1 scores on extraction tasks without requiring task-specific labeled data for each document type.

The economics shift dramatically. Traditional supervised learning requires thousands of labeled examples per document type. One study found schema-guided extraction costs approximately $3 per document versus $133 for human-labeled training data in dialogue annotation contexts.

Defining Extraction Schemas

Schemas serve as both documentation and validation. Here's how to structure extraction for insurance documents:

python
from pydantic import BaseModel, Field
from typing import List, Literal, Optional

class ClaimItem(BaseModel):
    service_code: str = Field(description="CPT or procedure code")
    service_date: str = Field(description="Date of service in YYYY-MM-DD format")
    provider_name: str = Field(description="Name of healthcare provider")
    billed_amount: float = Field(description="Amount billed in USD")
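    # default=None makes the field truly optional; without it Pydantic still requires the key
    allowed_amount: Optional[float] = Field(default=None, description="Carrier-allowed amount")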

class MedicalClaim(BaseModel):
    claim_id: str = Field(description="Unique claim identifier")
    patient_id: str = Field(description="Patient account number")
    diagnosis_codes: List[str] = Field(description="ICD-10 diagnosis codes")
    claim_status: Literal["pending", "approved", "denied", "under_review"]
    claim_items: List[ClaimItem] = Field(description="Line items on the claim")
    total_amount: float = Field(description="Total claim amount")

This schema works across carriers because it captures semantic intent rather than specific field positions or formats. The LLM maps carrier-specific terminology to your canonical schema automatically.
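A schema constrains the shape of the output, but the values a model fills in are still worth checking. The following is a minimal sketch of format checks that could run on an extracted claim represented as a plain dict. The helper names (`is_iso_date`, `looks_like_icd10`, `field_problems`) and the simplified ICD-10 pattern are illustrative assumptions, not part of Fenic or Pydantic.

```python
import re
from datetime import date

ISO_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Simplified ICD-10 shape: letter, digit, alphanumeric, optional ".xxxx" suffix.
# This checks format only; it does not confirm the code exists in the code set.
ICD10_SHAPE = re.compile(r"^[A-Z]\d[0-9A-Z](\.[0-9A-Z]{1,4})?$")

def is_iso_date(value: str) -> bool:
    """Return True if value is a real calendar date written as YYYY-MM-DD."""
    if not ISO_DATE.match(value):
        return False
    try:
        date.fromisoformat(value)
    except ValueError:
        return False
    return True

def looks_like_icd10(code: str) -> bool:
    """Return True if code has the general shape of an ICD-10 code."""
    return bool(ICD10_SHAPE.match(code.strip().upper()))

def field_problems(claim: dict) -> list[str]:
    """Collect human-readable problems found in an extracted claim dict."""
    problems = []
    for item in claim.get("claim_items", []):
        service_date = item.get("service_date", "")
        if not is_iso_date(service_date):
            problems.append(f"bad service_date: {service_date!r}")
    for code in claim.get("diagnosis_codes", []):
        if not looks_like_icd10(code):
            problems.append(f"bad diagnosis code: {code!r}")
    return problems
```

Checks like these can route suspicious records to manual review instead of silently passing them downstream.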

Implementing Extraction with Fenic

Fenic provides semantic extraction as a DataFrame operation. The same code processes documents from any carrier:

python
import fenic as fc

# Configure session with appropriate model
config = fc.SessionConfig(
    app_name="insurance_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            )
        },
        default_language_model="gpt4"
    )
)

session = fc.Session.get_or_create(config)

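# Load claims documents and parse each PDF to text.
# pdf_metadata() lists files and their metadata; parse_pdf() produces the "content" column.
claims_df = session.read.pdf_metadata("claims/*.pdf").select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)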

# Extract structured data with schema
extracted = claims_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        MedicalClaim
    ).alias("claim_data")
)

# Unnest and validate
structured_claims = (
    extracted
    .unnest("claim_data")
    .filter(fc.col("total_amount") > 0)
)

The extraction handles format variations automatically. The model interprets document content and maps it to your schema regardless of how the source carrier structures their data.

Building Production-Grade Extraction Pipelines

Production extraction requires more than basic schema application. You need preprocessing, validation, error recovery, and monitoring capabilities.

Multi-Stage Pipeline Architecture

Effective pipelines separate concerns across distinct processing stages:

python
# Stage 1: Document ingestion and preprocessing
raw_docs = (
    session.read.pdf_metadata("carriers/**/claims/*.pdf", recursive=True)
    .filter(fc.col("page_count") < 50)  # Filter unreasonably large files
    .filter(fc.col("file_size") < 10_000_000)  # 10MB limit
)

# Stage 2: Parse PDFs with page chunking
parsed = raw_docs.select(
    fc.col("file_path"),
    # The carrier is tagged later from file_path (see the carrier-routing step)
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=False  # Optimize for text-heavy documents
    ).alias("content")
)

# Stage 3: Extract with carrier-specific context
extracted = parsed.with_column(
    "extracted_data",
    fc.semantic.extract(
        fc.col("content"),
        MedicalClaim,
        model_alias="gpt4",
        temperature=0.0  # Deterministic extraction
    )
)

# Stage 4: Validation and enrichment
validated = (
    extracted
    .unnest("extracted_data")
    .filter(fc.col("claim_id").is_not_null())
    .with_column(
        "validation_status",
        fc.when(fc.col("total_amount") == fc.col("claim_items").sum("billed_amount"))
        .then("valid")
        .otherwise("amount_mismatch")
    )
)

This staged approach enables independent testing and optimization of each phase. Parsing failures don't block extraction of successfully parsed documents.
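One detail in the validation stage deserves care: comparing a float total to a float sum with exact equality can report a mismatch that is only rounding noise. As a hedged sketch, the same reconciliation can be written in plain Python with `Decimal` and a small tolerance. The function name `reconcile_total` and the one-cent default tolerance are assumptions chosen for illustration.

```python
from decimal import Decimal

def reconcile_total(total_amount: float, billed_amounts: list[float],
                    tolerance: str = "0.01") -> str:
    """Compare a claim total with the sum of its line items using Decimal."""
    # Convert via str() so 0.1 becomes Decimal("0.1"), not its binary expansion.
    line_sum = sum((Decimal(str(a)) for a in billed_amounts), Decimal("0"))
    diff = abs(Decimal(str(total_amount)) - line_sum)
    return "valid" if diff <= Decimal(tolerance) else "amount_mismatch"
```

For example, `reconcile_total(0.3, [0.1, 0.2])` is treated as valid even though `0.1 + 0.2 == 0.3` is false in binary floating point.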

Handling Multi-Carrier Variations

Different carriers require different preprocessing strategies. Use carrier identification to route documents appropriately:

python
# Identify carrier from file path or document metadata
carrier_tagged = parsed.with_column(
    "carrier",
    fc.when(fc.col("file_path").contains("aetna"))
    .then("aetna")
    .when(fc.col("file_path").contains("united"))
    .then("united_healthcare")
    .otherwise("unknown")
)

# Apply carrier-specific extraction parameters
def extract_by_carrier(df):
    return df.select(
        fc.col("*"),
        fc.when(fc.col("carrier") == "aetna")
        .then(fc.semantic.extract(fc.col("content"), MedicalClaim, temperature=0.0))
        .when(fc.col("carrier") == "united_healthcare")
        .then(fc.semantic.extract(fc.col("content"), MedicalClaim, temperature=0.1))
        .otherwise(fc.semantic.extract(fc.col("content"), MedicalClaim))
        .alias("claim_data")
    )

carrier_extracted = extract_by_carrier(carrier_tagged)

This pattern maintains a single extraction schema while accommodating carrier-specific quirks through parameter adjustments.
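Substring matching on a path can mis-tag documents when a carrier token appears inside an unrelated directory name. A possible alternative, sketched here in plain Python, matches whole path components against a lookup table. The `CARRIER_TOKENS` mapping and the directory layout are hypothetical.

```python
from pathlib import PurePosixPath

# Hypothetical mapping from a directory-name token to a canonical carrier label.
CARRIER_TOKENS = {
    "aetna": "aetna",
    "united": "united_healthcare",
    "uhc": "united_healthcare",
    "cigna": "cigna",
}

def carrier_from_path(file_path: str) -> str:
    """Match whole path components so 'reunited_scans/' is not read as 'united'."""
    for part in PurePosixPath(file_path.lower()).parts:
        if part in CARRIER_TOKENS:
            return CARRIER_TOKENS[part]
    return "unknown"
```

Keeping the mapping in one table also makes onboarding a new carrier a one-line change.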

Multi-Carrier Deployment Strategies

Scaling across carriers requires deployment patterns that handle varying volumes, formats, and processing requirements.

Batch Processing Architecture

Process documents in batches to optimize throughput and manage costs. Fenic automatically batches API calls based on provider limits:

python
# Configure batching behavior
config = fc.SessionConfig(
    app_name="multi_carrier_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "batch_model": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=1000,  # Requests per minute limit
                tpm=500_000  # Tokens per minute limit
            )
        },
        default_language_model="batch_model"
    )
)

# Process carrier batches independently
def process_carrier_batch(carrier_name: str, batch_date: str):
    batch_path = f"incoming/{carrier_name}/{batch_date}/*.pdf"

    results = (
        session.read.pdf_metadata(batch_path)
        .select(
            fc.col("file_path"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("file_path"),
            fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
        )
        .unnest("claim")
    )

    # Write results to carrier-specific output
    results.write.parquet(f"processed/{carrier_name}/{batch_date}/")

    return results

# Schedule batch processing per carrier
carriers = ["aetna", "united", "cigna", "anthem"]
for carrier in carriers:
    process_carrier_batch(carrier, "2025-01-15")

Batch isolation ensures one carrier's processing issues don't block others.

Parallel Processing Patterns

Scale horizontally by processing multiple carriers or document batches simultaneously:

python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_batch(carrier: str, batch_id: str):
    try:
        result = process_carrier_batch(carrier, batch_id)
        return {"carrier": carrier, "batch": batch_id, "status": "success", "count": result.count()}
    except Exception as e:
        return {"carrier": carrier, "batch": batch_id, "status": "error", "error": str(e)}

# Process multiple batches in parallel
batches = [
    ("aetna", "2025-01-15"),
    ("united", "2025-01-15"),
    ("cigna", "2025-01-15"),
    ("anthem", "2025-01-15")
]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_batch, carrier, batch) for carrier, batch in batches]

    for future in as_completed(futures):
        result = future.result()
        print(f"Processed {result['carrier']}/{result['batch']}: {result['status']}")

This pattern maximizes throughput when processing independent document sets.

Performance Optimization Techniques

Production extraction must balance accuracy, speed, and cost. Optimization focuses on reducing redundant processing and maximizing API efficiency.

Deduplication Before Extraction

Insurance documents often contain duplicates or near-duplicates. Extract unique content only:

python
# Deduplicate based on content fingerprints
from hashlib import sha256
from fenic.api.functions import udf
from fenic.core.types import StringType

@udf(return_type=StringType)
def compute_fingerprint(content: str) -> str:
    return sha256(content.encode()).hexdigest()

deduplicated = (
    parsed
    .with_column("fingerprint", compute_fingerprint(fc.col("content")))
    .drop_duplicates(["fingerprint"])
)

# Extract only unique documents
unique_extracted = deduplicated.select(
    fc.col("*"),
    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
)

This approach reduces API calls in proportion to the duplication rate. One carrier reported a 40% duplicate rate across daily submissions.
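A raw SHA-256 only catches byte-identical text, so two copies of a document that differ in whitespace or letter case hash differently. The sketch below normalizes text before hashing; `normalized_fingerprint` is an illustrative name, and the normalization rules are an assumption that should be tuned to the documents at hand. It does not detect true near-duplicates such as re-scanned pages with OCR differences.

```python
import re
import unicodedata
from hashlib import sha256

def normalized_fingerprint(content: str) -> str:
    """Hash text after removing differences that do not change its meaning."""
    text = unicodedata.normalize("NFKC", content)     # unify compatibility characters
    text = re.sub(r"\s+", " ", text).strip().lower()  # collapse whitespace, fold case
    return sha256(text.encode("utf-8")).hexdigest()
```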

Caching Extraction Results

Cache extracted data to avoid reprocessing:

python
# Check cache before extraction
cached_path = "cache/extractions.parquet"

try:
    cached = session.read.parquet(cached_path)
    # to_pydict() returns {column_name: [values]}
    cached_fingerprints = set(cached.select("fingerprint").to_pydict()["fingerprint"])
except Exception:
    # No readable cache yet (for example, on the first run)
    cached = None
    cached_fingerprints = set()

# Process only new documents
new_docs = deduplicated.filter(
    ~fc.col("fingerprint").is_in(list(cached_fingerprints))
)

new_extracted = new_docs.select(
    fc.col("*"),
    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
)

# Append to cache. Count new_docs rather than new_extracted so the check
# itself does not trigger a round of model calls.
if new_docs.count() > 0:
    combined = new_extracted if cached is None else cached.union(new_extracted)
    combined.write.parquet(cached_path, mode="overwrite")

Caching proves critical for reprocessing pipelines and development iteration.
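A cache keyed only on document content will keep serving old results after the schema or model changes. One way to avoid that, shown as a sketch under assumptions, is to fold the schema and model name into the key. The function `cache_key` is hypothetical; with Pydantic v2, `MedicalClaim.model_json_schema()` would be one source for the schema dict.

```python
import json
from hashlib import sha256

def cache_key(content: str, schema: dict, model_name: str) -> str:
    """Key a cached extraction on the document, the schema, and the model."""
    # sort_keys makes the serialization stable regardless of dict ordering.
    schema_blob = json.dumps(schema, sort_keys=True)
    # Join with a separator that will not appear in normal text.
    payload = "\x1f".join([model_name, schema_blob, content])
    return sha256(payload.encode("utf-8")).hexdigest()
```

Adding a field to the schema then produces new keys, so affected documents are re-extracted rather than read from stale entries.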

Selective Extraction Depth

Not all documents require full extraction. Use classification to route documents:

python
# First pass: classify document type
classified = parsed.select(
    fc.col("*"),
    fc.semantic.classify(
        fc.col("content"),
        ["standard_claim", "complex_claim", "appeal", "correspondence"]
    ).alias("doc_type")
)

# Deep extraction only for relevant documents
standard_claims = (
    classified
    .filter(fc.col("doc_type") == "standard_claim")
    .select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
    )
)

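# Lighter extraction for other types
# (CorrespondenceSummary is a smaller Pydantic schema assumed to be defined alongside MedicalClaim)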
correspondence = (
    classified
    .filter(fc.col("doc_type") == "correspondence")
    .select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), CorrespondenceSummary).alias("summary")
    )
)

This tiered approach allocates compute budget to high-value documents.

Cost Management at Scale

Extraction costs scale with document volume and complexity. Effective cost management balances accuracy requirements against budget constraints.

Model Selection Strategy

Different models offer different cost-accuracy tradeoffs. Use OpenRouter for dynamic provider routing:

python
config = fc.SessionConfig(
    app_name="cost_optimized",
    semantic=fc.SemanticConfig(
        language_models={
            "cost_optimized": fc.OpenRouterLanguageModel(
                model_name="openai/gpt-4o-mini",
                profiles={
                    "default": fc.OpenRouterLanguageModel.Profile(
                        provider=fc.OpenRouterLanguageModel.Provider(
                            sort="latency"
                        )
                    )
                },
                default_profile="default"
            ),
            "accuracy_focused": fc.OpenAILanguageModel(
                model_name="gpt-4.1",
                rpm=100,
                tpm=50_000
            )
        },
        default_language_model="cost_optimized"
    )
)

# Route based on document complexity
def smart_extract(df):
    return df.select(
        fc.col("*"),
        fc.when(fc.col("page_count") > 20)
        .then(fc.semantic.extract(fc.col("content"), MedicalClaim, model_alias="accuracy_focused"))
        .otherwise(fc.semantic.extract(fc.col("content"), MedicalClaim, model_alias="cost_optimized"))
        .alias("claim")
    )

This pattern uses expensive models only when justified by complexity.

Token Usage Optimization

Minimize token consumption through preprocessing:

python
# Keep only the relevant section before semantic processing.
# The (?s) flag lets "." match newlines so the pattern spans the whole document;
# "CLAIM DETAILS" and "NOTES:" are example section markers.
relevant_sections = parsed.with_column(
    "relevant_content",
    fc.text.regexp_replace(
        fc.col("content"),
        r"(?s)^.*?(CLAIM DETAILS.*?)(?:NOTES:|$).*$",
        "$1"
    )
)

# Use extracted section for processing
optimized_extraction = relevant_sections.select(
    fc.col("*"),
    fc.semantic.extract(
        fc.col("relevant_content"),  # Smaller input = lower cost
        MedicalClaim
    ).alias("claim")
)

Reducing input size can cut token costs by 50-70% on verbose documents.
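To see how much a trimming rule saves before committing to it, the same idea can be prototyped with the standard `re` module on a sample document. This is a sketch only: the section markers and the sample text are made up, and character count is used as a rough stand-in for token count.

```python
import re

# (?s) lets "." match newlines so the pattern can span a multi-line document.
SECTION = re.compile(r"(?s)(CLAIM DETAILS.*?)(?:NOTES:|\Z)")

def trim_to_claim_details(content: str) -> str:
    """Return the CLAIM DETAILS section, or the full text if the marker is absent."""
    match = SECTION.search(content)
    return match.group(1).strip() if match else content

sample = (
    "Carrier cover page and legal boilerplate\n"
    "CLAIM DETAILS\nClaim ID: C-1001\nTotal: 250.00\n"
    "NOTES: internal routing comments"
)
trimmed = trim_to_claim_details(sample)
# Fraction of characters removed, as a rough proxy for token savings.
reduction = 1 - len(trimmed) / len(sample)
```

Falling back to the full text when the marker is missing avoids sending an empty input to the model for carriers that use different section headings.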

Implementing Cost Guardrails

Track and limit extraction costs in production:

python
from dataclasses import dataclass, field
from typing import Dict, Tuple

@dataclass
class CostTracker:
    # USD per 1M tokens as (input, output). Illustrative values; check current provider pricing.
    model_costs: Dict[str, Tuple[float, float]] = field(
        default_factory=lambda: {
            "gpt-4.1-nano": (0.30, 1.20),
            "gpt-4o-mini": (0.15, 0.60),
        }
    )

    def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        if model not in self.model_costs:
            # Fail loudly: a silent $0 estimate would defeat the guardrail
            raise KeyError(f"No pricing configured for model {model!r}")
        input_cost, output_cost = self.model_costs[model]
        return (input_tokens * input_cost + output_tokens * output_cost) / 1_000_000

# Monitor costs during extraction
tracker = CostTracker()

def extract_with_budget(df, max_budget: float):
    # estimate_extraction_cost is a helper you supply; it is not defined in this guide
    estimated_cost = estimate_extraction_cost(df)

    if estimated_cost > max_budget:
        raise ValueError(f"Estimated cost ${estimated_cost:.2f} exceeds budget ${max_budget:.2f}")

    return df.select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
    )

Budget enforcement prevents cost overruns from unexpected document volumes.
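The budget check depends on a cost estimate that the snippet above leaves undefined. One possible shape for such an estimator, working on a list of document texts rather than a DataFrame, is sketched below. The four-characters-per-token heuristic, the default of 500 output tokens per document, and the name `estimate_cost_for_texts` are all assumptions; a real tokenizer would give tighter numbers.

```python
# Assumed heuristic: roughly 4 characters per token for English text.
CHARS_PER_TOKEN = 4

def estimate_tokens(text: str) -> int:
    """Approximate the token count of a text from its length."""
    return max(1, len(text) // CHARS_PER_TOKEN)

def estimate_cost_for_texts(documents: list[str],
                            input_price_per_million: float,
                            output_price_per_million: float,
                            expected_output_tokens: int = 500) -> float:
    """Rough pre-flight cost in USD for extracting from a list of document texts."""
    input_tokens = sum(estimate_tokens(doc) for doc in documents)
    output_tokens = expected_output_tokens * len(documents)
    return (input_tokens * input_price_per_million
            + output_tokens * output_price_per_million) / 1_000_000
```

As a worked case, 1,000 documents of 4,000 characters each come to about 1M input tokens and 0.5M output tokens, or roughly $0.90 at prices of $0.30 and $1.20 per million.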

Ensuring Compliance and Data Governance

Insurance data carries strict regulatory requirements. Production pipelines must implement comprehensive compliance controls.

PHI Redaction

Remove protected health information before processing:

python
import re
from fenic.api.functions import udf
from fenic.core.types import StringType

@udf(return_type=StringType)
def redact_phi(text: str) -> str:
    # Redact SSN patterns
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN_REDACTED]', text)

    # Redact phone numbers
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_REDACTED]', text)

    # Redact email addresses
    # Inside a character class "|" is a literal pipe, so use [A-Za-z] for the TLD
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL_REDACTED]', text)

    return text

# Apply redaction before extraction
redacted = parsed.with_column(
    "safe_content",
    redact_phi(fc.col("content"))
)

extracted = redacted.select(
    fc.col("*"),
    fc.semantic.extract(fc.col("safe_content"), MedicalClaim).alias("claim")
)

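Redaction protects sensitive data while preserving document structure for extraction. These three patterns cover only a few identifier types; HIPAA's Safe Harbor method lists 18, including names, dates, and addresses, so treat this as a first pass rather than full de-identification.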
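Regex redaction can also remove too much. The phone pattern used above accepts any run of ten digits, so a ten-digit claim number is redacted along with real phone numbers, which would then leave `claim_id` unrecoverable. The short sketch below reproduces the two patterns on a made-up sample to make the effect visible.

```python
import re

SSN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
PHONE = re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b")

def redact(text: str) -> str:
    """Apply SSN redaction first, then phone redaction."""
    text = SSN.sub("[SSN_REDACTED]", text)
    return PHONE.sub("[PHONE_REDACTED]", text)

# Made-up sample: one SSN, one phone number, one ten-digit claim number.
sample = "SSN 123-45-6789, call 555-867-5309, claim 4400123456"
redacted_sample = redact(sample)
# redacted_sample:
# "SSN [SSN_REDACTED], call [PHONE_REDACTED], claim [PHONE_REDACTED]"
```

Running redaction rules against a sample of real identifiers from each carrier before deployment helps catch collisions like this one.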

Audit Trail Implementation

Maintain complete processing lineage for compliance verification:

python
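import datetime

def add_audit_metadata(df):
    # Capture one timezone-aware UTC timestamp per call (utcnow() is deprecated)
    now = datetime.datetime.now(datetime.timezone.utc)
    return df.select(
        fc.col("*"),
        fc.lit(now.isoformat()).alias("processed_at"),
        fc.lit(now.date().isoformat()).alias("processed_date"),  # partition key used below
        fc.lit("v1.2.3").alias("pipeline_version"),
        fc.col("file_path").alias("source_document")
    )

# Apply audit metadata at each stage
audited_parsed = add_audit_metadata(parsed)
audited_extracted = add_audit_metadata(extracted)

# Persist audit trail.
# Assumes the frame carries a "carrier" column (see the carrier-tagging step) and that
# the writer in your Fenic version supports append mode and partition_by.
audited_extracted.write.parquet(
    "audit/extractions/",
    mode="append",
    partition_by=["processed_date", "carrier"]
)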

Audit trails enable compliance verification and issue investigation.
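A file path alone does not prove which version of a document was processed, since a file can be replaced in place. A minimal sketch of a lineage record that also captures a hash of the source bytes follows. The record layout and the `audit_record` helper are illustrative assumptions, not a Fenic feature.

```python
import json
from datetime import datetime, timezone
from hashlib import sha256

def audit_record(source_path: str, content: bytes, pipeline_version: str) -> str:
    """Build one JSON line tying an output to the exact bytes it came from."""
    record = {
        "source_document": source_path,
        "source_sha256": sha256(content).hexdigest(),
        "pipeline_version": pipeline_version,
        "processed_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(record, sort_keys=True)
```

Appending one such line per document to a write-once log gives investigators a way to confirm that a stored file is the same one the pipeline read.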

Access Control Patterns

Implement role-based access to extracted data:

python
# Flatten the extracted struct so its fields can be selected by name
flat = extracted.unnest("claim")

# Separate outputs by access level
public_fields = flat.select(
    fc.col("claim_id"),
    fc.col("claim_status"),
    fc.col("total_amount")
)

restricted_fields = flat.select(
    fc.col("claim_id"),
    fc.col("patient_id"),
    fc.col("diagnosis_codes"),
    fc.col("claim_items")
)

# Write to separate locations with different permissions
public_fields.write.parquet("output/public/")
restricted_fields.write.parquet("output/restricted/")

Data separation enables granular access control at the storage layer.

Production Deployment and Monitoring

Production extraction requires robust deployment infrastructure and comprehensive monitoring.

Cloud Deployment with Typedef

Develop locally with Fenic, deploy to Typedef cloud with zero code changes:

python
# Local development configuration
dev_config = fc.SessionConfig(
    app_name="insurance_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,
                tpm=50_000
            )
        },
        default_language_model="gpt4"
    )
)

# Production configuration with cloud execution
prod_config = fc.SessionConfig(
    app_name="insurance_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=1000,
                tpm=500_000
            )
        },
        default_language_model="gpt4"
    ),
    cloud=fc.CloudConfig(size=fc.CloudExecutorSize.MEDIUM)
)

# Same extraction code works in both environments
session = fc.Session.get_or_create(prod_config)

Cloud deployment provides automatic scaling without pipeline modifications.

Coverage and Quality Metrics

Track extraction quality across carriers:

python
def compute_coverage_metrics(extracted_df):
    result = extracted_df.select(
        fc.count("*").alias("total_documents"),
        fc.count(fc.when(fc.col("claim_id").is_not_null()).then(1)).alias("successful_extractions"),
        fc.count(fc.when(fc.col("validation_status") == "valid").then(1)).alias("valid_extractions")
    ).collect()
    
    # Access the data from QueryResult
    metrics_data = result.data
    # Convert to dict if needed (assuming pydict format)
    if isinstance(metrics_data, list) and len(metrics_data) > 0:
        metrics = metrics_data[0]
    else:
        # Handle dict format
        metrics = {k: v[0] for k, v in metrics_data.items()}

    total = metrics["total_documents"]
    successful = metrics["successful_extractions"]

    # Guard against empty inputs so a carrier with no documents does not raise
    coverage_rate = successful / total if total else 0.0
    validation_rate = metrics["valid_extractions"] / successful if successful else 0.0

    return {
        "coverage": coverage_rate,
        "validation": validation_rate,
        "total": total
    }

# Monitor per-carrier metrics
carrier_metrics = {}
for carrier in carriers:
    carrier_df = extracted.filter(fc.col("carrier") == carrier)
    carrier_metrics[carrier] = compute_coverage_metrics(carrier_df)

print(f"Coverage rates: {carrier_metrics}")

Coverage tracking identifies carriers requiring template adjustments.

Error Classification and Recovery

Categorize extraction failures for targeted fixes:

python
class ExtractionError(Exception):
    def __init__(self, document_id: str, error_type: str, message: str):
        self.document_id = document_id
        self.error_type = error_type
        self.message = message
        super().__init__(f"{error_type}: {message}")

def classify_extraction_error(error: Exception) -> str:
    error_msg = str(error).lower()

    if "timeout" in error_msg:
        return "timeout"
    elif "rate limit" in error_msg:
        return "rate_limit"
    elif "invalid schema" in error_msg:
        return "validation_error"
    elif "parse" in error_msg:
        return "parse_error"
    else:
        return "unknown_error"

# Track error distribution
# failed_extractions: exceptions collected from earlier pipeline runs (assumed to exist)
error_tracker = {}
for error in failed_extractions:
    error_type = classify_extraction_error(error)
    error_tracker[error_type] = error_tracker.get(error_type, 0) + 1

print(f"Error distribution: {error_tracker}")

Error classification guides optimization priorities.
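Classification also determines recovery: timeouts and rate limits are usually worth retrying, while schema or parse errors will fail the same way again. The sketch below shows one way to act on that distinction with exponential backoff. `run_with_retry` and the `RETRYABLE` set are hypothetical names; `classify` stands for a function like the classifier above, and `sleep` is injectable so the behavior can be tested without waiting.

```python
import time

# Error classes assumed to be transient and therefore worth retrying.
RETRYABLE = {"timeout", "rate_limit"}

def run_with_retry(task, classify, max_attempts: int = 3,
                   base_delay: float = 1.0, sleep=time.sleep):
    """Call task(); retry transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception as error:
            error_type = classify(error)
            # Re-raise permanent errors at once, and transient ones on the last attempt.
            if error_type not in RETRYABLE or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```

Wrapping a per-batch function in this helper keeps transient provider errors from surfacing as failed batches.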

Alerting and Anomaly Detection

Implement automated alerts for extraction issues:

python
def check_extraction_health(metrics: dict, thresholds: dict):
    alerts = []

    if metrics["coverage"] < thresholds["min_coverage"]:
        alerts.append(f"Coverage rate {metrics['coverage']:.2%} below threshold {thresholds['min_coverage']:.2%}")

    if metrics["validation"] < thresholds["min_validation"]:
        alerts.append(f"Validation rate {metrics['validation']:.2%} below threshold {thresholds['min_validation']:.2%}")

    return alerts

thresholds = {
    "min_coverage": 0.90,
    "min_validation": 0.95
}

for carrier, metrics in carrier_metrics.items():
    alerts = check_extraction_health(metrics, thresholds)
    if alerts:
        print(f"ALERT - {carrier}: {', '.join(alerts)}")

Proactive alerting prevents quality degradation.
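Fixed thresholds miss a carrier whose coverage is normally 99% and suddenly drops to 92%. A simple complement, sketched here with the standard library, compares today's value to that carrier's own recent history using a z-score. The function name, the threshold of 3, and the choice to flag only drops are assumptions.

```python
from statistics import mean, stdev

def is_anomalous(history: list[float], today: float, z_threshold: float = 3.0) -> bool:
    """Flag today's value if it falls far below the recent baseline."""
    if len(history) < 2:
        return False  # not enough data to estimate spread
    baseline, spread = mean(history), stdev(history)
    if spread == 0:
        return today < baseline  # any drop from a perfectly flat baseline
    return (baseline - today) / spread > z_threshold
```

Feeding it the last few weeks of per-carrier coverage rates gives an alert that adapts to each carrier's normal level.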

Real-World Implementation Patterns

Production extraction deployments follow common patterns based on operational requirements.

Daily Batch Processing

Most insurance operations run extraction on daily schedules:

python
def daily_extraction_pipeline(process_date: str):
    # Ingest documents from all carriers
    daily_docs = session.read.pdf_metadata(
        f"incoming/{process_date}/**/claims/*.pdf",
        recursive=True
    )

    # Parse and extract
    extracted = (
        daily_docs
        .select(
            fc.col("file_path"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("*"),
            fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
        )
        .unnest("claim")
    )

    # Validate and write
    validated = (
        extracted
        .filter(fc.col("claim_id").is_not_null())
        .with_column("process_date", fc.lit(process_date))
    )

    validated.write.parquet(
        f"output/{process_date}/claims.parquet",
        mode="overwrite"
    )

    # Generate metrics
    metrics = compute_coverage_metrics(validated)

    return metrics

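# Run daily
import datetime
import time

import schedule

def run_daily():
    # Compute the date when the job fires, not when it is registered
    return daily_extraction_pipeline(datetime.date.today().isoformat())

schedule.every().day.at("02:00").do(run_daily)

# schedule only runs jobs when polled
while True:
    schedule.run_pending()
    time.sleep(60)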

Scheduled processing handles predictable document volumes efficiently.

Real-Time Extraction

High-priority documents require immediate processing:

python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ClaimHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory and event.src_path.endswith('.pdf'):
            self.process_claim(event.src_path)

    def process_claim(self, file_path: str):
        try:
            # Single-document extraction
            result_query = (
                session.read.pdf_metadata(file_path)
                .select(
                    fc.col("file_path"),
                    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
                )
                .select(
                    fc.col("*"),
                    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
                )
                .unnest("claim")
                .collect()
            )

            # Access the first row from the result data
            result_data = result_query.data
            if isinstance(result_data, list) and len(result_data) > 0:
                result = result_data[0]
            else:
                # Handle dict format - convert to row dict
                result = {k: v[0] for k, v in result_data.items()}

            print(f"Processed: {file_path}")
            return result
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

# Monitor directory for new claims
observer = Observer()
observer.schedule(ClaimHandler(), "incoming/priority/", recursive=True)
observer.start()

# Keep the main thread alive so the watcher keeps running; stop cleanly on exit
try:
    while observer.is_alive():
        observer.join(1)
finally:
    observer.stop()
    observer.join()

Real-time processing ensures urgent claims receive immediate attention.

Incremental Processing

Update only changed documents in large datasets:

python
def incremental_extraction(checkpoint_path: str):
    # Load previous checkpoint
    try:
        processed = session.read.parquet(checkpoint_path)
        # to_pydict() returns {column_name: [values]}
        processed_paths = set(processed.select("file_path").to_pydict()["file_path"])
    except Exception:
        # No checkpoint yet (for example, on the first run)
        processed = None
        processed_paths = set()

    # Find new documents
    all_docs = session.read.pdf_metadata("carriers/**/*.pdf", recursive=True)
    new_docs = all_docs.filter(
        ~fc.col("file_path").is_in(list(processed_paths))
    )

    if new_docs.count() == 0:
        print("No new documents to process")
        return

    # Extract new documents
    new_extracted = (
        new_docs
        .select(
            fc.col("file_path"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("*"),
            fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
        )
        .unnest("claim")
    )

    # Update checkpoint (start a new one if none existed)
    combined = new_extracted if processed is None else processed.union(new_extracted)
    combined.write.parquet(checkpoint_path, mode="overwrite")

    return new_extracted

# Run incremental updates
new_results = incremental_extraction("checkpoints/claims.parquet")

Incremental processing minimizes reprocessing costs.
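A checkpoint keyed on file path treats a document as done once its path is recorded, so a file that is re-uploaded with different contents under the same name is skipped. If that matters, one option is a manifest that stores a content hash per path. This is a sketch under assumptions: the manifest is a plain dict here, and `file_digest` and `new_or_changed` are hypothetical helper names.

```python
from hashlib import sha256
from pathlib import Path

def file_digest(path: Path) -> str:
    """Return the SHA-256 hex digest of a file's bytes."""
    return sha256(path.read_bytes()).hexdigest()

def new_or_changed(paths: list[Path], manifest: dict[str, str]) -> list[Path]:
    """Return paths that are absent from the manifest or whose bytes differ."""
    return [p for p in paths if manifest.get(str(p)) != file_digest(p)]
```

After a successful run, the manifest would be updated with the digests of the files just processed and persisted alongside the checkpoint.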

Scaling Success Metrics

Successful extraction at scale delivers measurable operational improvements:

Processing time reduction. In one case study, RudderStack cut manual triage time by 95% using automated classification and extraction.

Cost efficiency. Research indicates schema-driven approaches can reduce labeling costs by 44× compared to supervised learning methods. Automated extraction eliminates annotation overhead while maintaining production accuracy.

Accuracy improvement. Schema-driven extraction achieves 74.2-96.1% F1 scores without task-specific training data. Depending on the task, this can match traditional supervised approaches that require thousands of labeled examples.

Operational scalability. Automated pipelines handle 2-3× document volume increases without proportional staff increases. Batch processing and intelligent routing enable efficient resource utilization.

Compliance adherence. Automated audit trails and data lineage provide complete processing transparency. Organizations maintain regulatory compliance while scaling throughput.

Getting Started

Begin with a focused pilot deployment:

  1. Select a single carrier and document type
  2. Define extraction schemas for target documents
  3. Build a basic pipeline with Fenic
  4. Validate accuracy against manual extraction
  5. Measure processing time and costs
  6. Expand to additional carriers incrementally

Start small, validate thoroughly, and scale systematically. Production-grade extraction builds on proven foundations rather than attempting complete automation immediately.

For implementation guidance, review semantic DataFrame operations and schema-driven extraction patterns. Additional resources cover LangGraph orchestration and production deployment strategies.
