Insurance operations generate massive volumes of unstructured data daily: policy documents, claims forms, medical records, underwriting reports, and customer correspondence. Processing this data manually creates bottlenecks, while traditional extraction tools struggle with format variations across carriers. This guide shows how to build production-grade semantic extraction pipelines that scale efficiently across multiple insurance carriers.
Why Traditional Approaches Fail at Scale
Insurance data presents unique extraction challenges that break conventional tools:
Format heterogeneity across carriers. Each carrier uses different document templates, field names, and data structures. A claims form from Carrier A looks nothing like one from Carrier B. Traditional regex-based extraction requires maintaining separate rule sets per carrier, creating unmaintainable technical debt.
Unstructured data dominates the landscape. Text narratives, scanned documents, and semi-structured forms make up the majority of insurance data. Rule-based systems handle structured data well but fail on text that requires interpretation and contextual understanding.
Volume compounds complexity. Processing thousands of documents daily while maintaining accuracy requires infrastructure that most extraction tools weren't designed to handle. Batch processing, rate limiting, and error recovery become critical at scale.
Compliance requirements add constraints. HIPAA, state insurance regulations, and carrier-specific data handling policies demand audit trails, data lineage, and controlled access patterns that traditional point solutions don't provide.
Schema-Driven Extraction: The Foundation for Scale
Schema-driven extraction replaces brittle prompt engineering with type-safe data contracts. Research shows this approach achieves 74.2-96.1% F1 scores on extraction tasks without requiring task-specific labeled data for each document type.
The economics shift dramatically. Traditional supervised learning requires thousands of labeled examples per document type. One study found schema-guided extraction costs approximately $3 per document versus $133 for human-labeled training data in dialogue annotation contexts.
Defining Extraction Schemas
Schemas serve as both documentation and validation. Here's how to structure extraction for insurance documents:
```python
from pydantic import BaseModel, Field
from typing import List, Literal, Optional

class ClaimItem(BaseModel):
    service_code: str = Field(description="CPT or procedure code")
    service_date: str = Field(description="Date of service in YYYY-MM-DD format")
    provider_name: str = Field(description="Name of healthcare provider")
    billed_amount: float = Field(description="Amount billed in USD")
    allowed_amount: Optional[float] = Field(description="Carrier-allowed amount")

class MedicalClaim(BaseModel):
    claim_id: str = Field(description="Unique claim identifier")
    patient_id: str = Field(description="Patient account number")
    diagnosis_codes: List[str] = Field(description="ICD-10 diagnosis codes")
    claim_status: Literal["pending", "approved", "denied", "under_review"]
    claim_items: List[ClaimItem] = Field(description="Line items on the claim")
    total_amount: float = Field(description="Total claim amount")
```
This schema works across carriers because it captures semantic intent rather than specific field positions or formats. The LLM maps carrier-specific terminology to your canonical schema automatically.
Implementing Extraction with Fenic
Fenic provides semantic extraction as a DataFrame operation. The same code processes documents from any carrier:
```python
import fenic as fc

# Configure session with appropriate model
config = fc.SessionConfig(
    app_name="insurance_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            )
        },
        default_language_model="gpt4"
    )
)

session = fc.Session.get_or_create(config)

# Load claims documents
claims_df = session.read.pdf_metadata("claims/*.pdf")

# Extract structured data with schema
extracted = claims_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        MedicalClaim
    ).alias("claim_data")
)

# Unnest and validate
structured_claims = (
    extracted
    .unnest("claim_data")
    .filter(fc.col("total_amount") > 0)
)
```
The extraction handles format variations automatically. The model interprets document content and maps it to your schema regardless of how the source carrier structures their data.
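To see the carrier-agnostic behavior concretely, here is a minimal sketch with two made-up snippets that word the same facts differently. It assumes the session exposes a `create_dataframe` constructor for in-memory rows (adjust to your Fenic version); the snippet contents are hypothetical.

```python
# Hypothetical snippets: two carriers describing claims with different terminology
samples = [
    {"carrier": "carrier_a",
     "content": "Claim #A-1002 | Member ID 554-21 | Status: APPROVED | Total Charges: $1,240.00"},
    {"carrier": "carrier_b",
     "content": "Reference 99-B-7 / Patient Acct 8812 / Adjudication: Denied / Billed Amount 310.50 USD"},
]

# Assumption: create_dataframe builds a DataFrame from in-memory rows
samples_df = session.create_dataframe(samples)

# The identical extract call maps "Member ID" and "Patient Acct" to patient_id,
# "Total Charges" and "Billed Amount" to total_amount, and so on
mapped = samples_df.select(
    fc.col("carrier"),
    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
).unnest("claim")
```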
Building Production-Grade Extraction Pipelines
Production extraction requires more than basic schema application. You need preprocessing, validation, error recovery, and monitoring capabilities.
Multi-Stage Pipeline Architecture
Effective pipelines separate concerns across distinct processing stages:
```python
# Stage 1: Document ingestion and preprocessing
raw_docs = (
    session.read.pdf_metadata("carriers/**/claims/*.pdf", recursive=True)
    .filter(fc.col("page_count") < 50)        # Filter unreasonably large files
    .filter(fc.col("file_size") < 10_000_000) # 10MB limit
)

# Stage 2: Parse PDFs with page chunking
parsed = raw_docs.select(
    fc.col("file_path"),
    fc.col("carrier_id"),  # Derived from file path
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=False  # Optimize for text-heavy documents
    ).alias("content")
)

# Stage 3: Extract with carrier-specific context
extracted = parsed.with_column(
    "extracted_data",
    fc.semantic.extract(
        fc.col("content"),
        MedicalClaim,
        model_alias="gpt4",
        temperature=0.0  # Deterministic extraction
    )
)

# Stage 4: Validation and enrichment
validated = (
    extracted
    .unnest("extracted_data")
    .filter(fc.col("claim_id").is_not_null())
    .with_column(
        "validation_status",
        fc.when(fc.col("total_amount") == fc.col("claim_items").sum("billed_amount"))
        .then("valid")
        .otherwise("amount_mismatch")
    )
)
```
This staged approach enables independent testing and optimization of each phase. Parsing failures don't block extraction of successfully parsed documents.
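One way to enforce that isolation is to gate the extraction stage on parse success. A minimal sketch building on the `parsed` DataFrame above, assuming an `is_null` complement to the `is_not_null` filter already used in Stage 4:

```python
# Keep only documents that produced usable parsed text; route the rest for inspection
parse_ok = parsed.filter(fc.col("content").is_not_null())
parse_failed = parsed.filter(fc.col("content").is_null())

# Extraction (Stage 3) then runs only on successfully parsed documents
extracted = parse_ok.with_column(
    "extracted_data",
    fc.semantic.extract(fc.col("content"), MedicalClaim, model_alias="gpt4", temperature=0.0)
)
```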
Handling Multi-Carrier Variations
Different carriers require different preprocessing strategies. Use carrier identification to route documents appropriately:
```python
# Identify carrier from file path or document metadata
carrier_tagged = parsed.with_column(
    "carrier",
    fc.when(fc.col("file_path").contains("aetna"))
    .then("aetna")
    .when(fc.col("file_path").contains("united"))
    .then("united_healthcare")
    .otherwise("unknown")
)

# Apply carrier-specific extraction parameters
def extract_by_carrier(df):
    return df.select(
        fc.col("*"),
        fc.when(fc.col("carrier") == "aetna")
        .then(fc.semantic.extract(fc.col("content"), MedicalClaim, temperature=0.0))
        .when(fc.col("carrier") == "united_healthcare")
        .then(fc.semantic.extract(fc.col("content"), MedicalClaim, temperature=0.1))
        .otherwise(fc.semantic.extract(fc.col("content"), MedicalClaim))
        .alias("claim_data")
    )

carrier_extracted = extract_by_carrier(carrier_tagged)
```
This pattern maintains a single extraction schema while accommodating carrier-specific quirks through parameter adjustments.
Multi-Carrier Deployment Strategies
Scaling across carriers requires deployment patterns that handle varying volumes, formats, and processing requirements.
Batch Processing Architecture
Process documents in batches to optimize throughput and manage costs. Fenic automatically batches API calls based on provider limits:
```python
# Configure batching behavior
config = fc.SessionConfig(
    app_name="multi_carrier_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "batch_model": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=1000,    # Requests per minute limit
                tpm=500_000  # Tokens per minute limit
            )
        },
        default_language_model="batch_model"
    )
)

# Process carrier batches independently
def process_carrier_batch(carrier_name: str, batch_date: str):
    batch_path = f"incoming/{carrier_name}/{batch_date}/*.pdf"

    results = (
        session.read.pdf_metadata(batch_path)
        .select(
            fc.col("file_path"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("file_path"),
            fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
        )
        .unnest("claim")
    )

    # Write results to carrier-specific output
    results.write.parquet(f"processed/{carrier_name}/{batch_date}/")
    return results

# Schedule batch processing per carrier
carriers = ["aetna", "united", "cigna", "anthem"]
for carrier in carriers:
    process_carrier_batch(carrier, "2025-01-15")
```
Batch isolation ensures one carrier's processing issues don't block others.
Parallel Processing Patterns
Scale horizontally by processing multiple carriers or document batches simultaneously:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_batch(carrier: str, batch_id: str):
    try:
        result = process_carrier_batch(carrier, batch_id)
        return {"carrier": carrier, "batch": batch_id, "status": "success", "count": result.count()}
    except Exception as e:
        return {"carrier": carrier, "batch": batch_id, "status": "error", "error": str(e)}

# Process multiple batches in parallel
batches = [
    ("aetna", "2025-01-15"),
    ("united", "2025-01-15"),
    ("cigna", "2025-01-15"),
    ("anthem", "2025-01-15")
]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_batch, carrier, batch) for carrier, batch in batches]

    for future in as_completed(futures):
        result = future.result()
        print(f"Processed {result['carrier']}/{result['batch']}: {result['status']}")
```
This pattern maximizes throughput when processing independent document sets.
Performance Optimization Techniques
Production extraction must balance accuracy, speed, and cost. Optimization focuses on reducing redundant processing and maximizing API efficiency.
Deduplication Before Extraction
Insurance documents often contain duplicates or near-duplicates. Extract unique content only:
```python
# Deduplicate based on content fingerprints
from hashlib import sha256
from fenic.api.functions import udf
from fenic.core.types import StringType

@udf(return_type=StringType)
def compute_fingerprint(content: str) -> str:
    return sha256(content.encode()).hexdigest()

deduplicated = (
    parsed
    .with_column("fingerprint", compute_fingerprint(fc.col("content")))
    .drop_duplicates(["fingerprint"])
)

# Extract only unique documents
unique_extracted = deduplicated.select(
    fc.col("*"),
    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
)
```
This approach reduces API calls in proportion to the duplication rate. One carrier reported a 40% duplicate rate across daily submissions.
Caching Extraction Results
Cache extracted data to avoid reprocessing:
```python
# Check cache before extraction
cached_path = "cache/extractions.parquet"
try:
    cached = session.read.parquet(cached_path)
    cached_fingerprints = set(cached.select("fingerprint").collect())
except Exception:
    # No cache yet (first run)
    cached = None
    cached_fingerprints = set()

# Process only new documents
new_docs = deduplicated.filter(
    ~fc.col("fingerprint").is_in(list(cached_fingerprints))
)

new_extracted = new_docs.select(
    fc.col("*"),
    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
)

# Append new results to the cache
if new_extracted.count() > 0:
    combined = cached.union(new_extracted) if cached is not None else new_extracted
    combined.write.parquet(cached_path, mode="overwrite")
```
Caching proves critical for reprocessing pipelines and development iteration.
Selective Extraction Depth
Not all documents require full extraction. Use classification to route documents:
```python
# First pass: classify document type
classified = parsed.select(
    fc.col("*"),
    fc.semantic.classify(
        fc.col("content"),
        ["standard_claim", "complex_claim", "appeal", "correspondence"]
    ).alias("doc_type")
)

# Deep extraction only for relevant documents
standard_claims = (
    classified
    .filter(fc.col("doc_type") == "standard_claim")
    .select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
    )
)

# Lighter extraction for other types
# CorrespondenceSummary: a smaller Pydantic schema defined elsewhere, analogous to MedicalClaim
correspondence = (
    classified
    .filter(fc.col("doc_type") == "correspondence")
    .select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), CorrespondenceSummary).alias("summary")
    )
)
```
This tiered approach allocates compute budget to high-value documents.
Cost Management at Scale
Extraction costs scale with document volume and complexity. Effective cost management balances accuracy requirements against budget constraints.
Model Selection Strategy
Different models offer different cost-accuracy tradeoffs. Use OpenRouter for dynamic provider routing:
```python
config = fc.SessionConfig(
    app_name="cost_optimized",
    semantic=fc.SemanticConfig(
        language_models={
            "cost_optimized": fc.OpenRouterLanguageModel(
                model_name="openai/gpt-4o-mini",
                profiles={
                    "default": fc.OpenRouterLanguageModel.Profile(
                        provider=fc.OpenRouterLanguageModel.Provider(
                            sort="latency"
                        )
                    )
                },
                default_profile="default"
            ),
            "accuracy_focused": fc.OpenAILanguageModel(
                model_name="gpt-4.1",
                rpm=100,
                tpm=50_000
            )
        },
        default_language_model="cost_optimized"
    )
)

# Route based on document complexity
def smart_extract(df):
    return df.select(
        fc.col("*"),
        fc.when(fc.col("page_count") > 20)
        .then(fc.semantic.extract(fc.col("content"), MedicalClaim, model_alias="accuracy_focused"))
        .otherwise(fc.semantic.extract(fc.col("content"), MedicalClaim, model_alias="cost_optimized"))
        .alias("claim")
    )
```
This pattern uses expensive models only when justified by complexity.
Token Usage Optimization
Minimize token consumption through preprocessing:
```python
# Extract only relevant sections before semantic processing
# Note: Fenic uses text.extract() with templates, not regex_extract
# For regex-based extraction, use regexp_replace or text processing functions
relevant_sections = parsed.with_column(
    "relevant_content",
    fc.text.regexp_replace(
        fc.col("content"),
        r"^.*?(CLAIM DETAILS.*?)(?:NOTES:|$).*$",
        "$1"
    )
)

# Use extracted section for processing
optimized_extraction = relevant_sections.select(
    fc.col("*"),
    fc.semantic.extract(
        fc.col("relevant_content"),  # Smaller input = lower cost
        MedicalClaim
    ).alias("claim")
)
```
Reducing input size can cut token costs by 50-70% on verbose documents.
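To verify the savings on your own documents, compare token counts before and after trimming. A minimal sketch using the tiktoken tokenizer; the encoding name and the two sample strings are assumptions, so substitute the encoding that matches your model and real document text.

```python
import tiktoken

# cl100k_base is an assumption; use the encoding that matches your target model
encoding = tiktoken.get_encoding("cl100k_base")

def token_count(text: str) -> int:
    return len(encoding.encode(text))

# Hypothetical examples of a document before and after trimming to the claim section
full_text = "POLICYHOLDER NOTES ... CLAIM DETAILS: CPT 99213, billed $180.00 ... NOTES: call member"
trimmed_text = "CLAIM DETAILS: CPT 99213, billed $180.00"

savings = 1 - token_count(trimmed_text) / token_count(full_text)
print(f"Token reduction: {savings:.0%}")
```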
Implementing Cost Guardrails
Track and limit extraction costs in production:
```python
from dataclasses import dataclass
from typing import Dict, Tuple

@dataclass
class CostTracker:
    model_costs: Dict[str, Tuple[float, float]] = None

    def __post_init__(self):
        # Cost per 1M tokens (input, output)
        self.model_costs = {
            "gpt-4.1-nano": (0.30, 1.20),
            "gpt-4o-mini": (0.15, 0.60)
        }

    def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        input_cost, output_cost = self.model_costs.get(model, (0, 0))
        return (input_tokens * input_cost + output_tokens * output_cost) / 1_000_000

# Monitor costs during extraction
tracker = CostTracker()

def extract_with_budget(df, max_budget: float):
    # estimate_extraction_cost: pre-flight estimate for the batch (one possible implementation below)
    estimated_cost = estimate_extraction_cost(df)
    if estimated_cost > max_budget:
        raise ValueError(
            f"Estimated cost ${estimated_cost:.2f} exceeds budget ${max_budget:.2f}"
        )

    return df.select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
    )
```
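The guardrail above relies on an `estimate_extraction_cost` helper that isn't defined in this guide. One minimal sketch, under the assumption that you estimate from the raw document texts (for example, the collected `content` values) using a rough four-characters-per-token heuristic and a fixed output-token allowance; adapt the signature if you prefer to pass the DataFrame directly.

```python
def estimate_extraction_cost(
    texts,
    model: str = "gpt-4.1-nano",
    expected_output_tokens: int = 500,
) -> float:
    """Rough pre-flight cost estimate for a batch of document texts."""
    tracker = CostTracker()
    total = 0.0
    for text in texts:
        input_tokens = len(text) // 4  # crude chars-to-tokens approximation
        total += tracker.estimate_cost(model, input_tokens, expected_output_tokens)
    return total

# Example usage on a single short document (output tokens dominate the estimate)
print(estimate_extraction_cost(["CLAIM DETAILS: CPT 99213, billed $180.00"]))
```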
Budget enforcement prevents cost overruns from unexpected document volumes.
Ensuring Compliance and Data Governance
Insurance data carries strict regulatory requirements. Production pipelines must implement comprehensive compliance controls.
PHI Redaction
Remove protected health information before processing:
```python
import re

from fenic.api.functions import udf
from fenic.core.types import StringType

@udf(return_type=StringType)
def redact_phi(text: str) -> str:
    # Redact SSN patterns
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN_REDACTED]', text)
    # Redact phone numbers
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_REDACTED]', text)
    # Redact email addresses
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL_REDACTED]', text)
    return text

# Apply redaction before extraction
redacted = parsed.with_column(
    "safe_content",
    redact_phi(fc.col("content"))
)

extracted = redacted.select(
    fc.col("*"),
    fc.semantic.extract(fc.col("safe_content"), MedicalClaim).alias("claim")
)
```
Redaction protects sensitive data while preserving document structure for extraction.
Audit Trail Implementation
Maintain complete processing lineage for compliance verification:
```python
import datetime

def add_audit_metadata(df):
    return df.select(
        fc.col("*"),
        fc.lit(datetime.datetime.utcnow().isoformat()).alias("processed_at"),
        fc.lit(datetime.date.today().isoformat()).alias("processed_date"),  # Partition key
        fc.lit("v1.2.3").alias("pipeline_version"),
        fc.col("file_path").alias("source_document")
    )

# Apply audit metadata at each stage
audited_parsed = add_audit_metadata(parsed)
audited_extracted = add_audit_metadata(extracted)

# Persist audit trail
audited_extracted.write.parquet(
    "audit/extractions/",
    mode="append",
    partition_by=["processed_date", "carrier"]
)
```
Audit trails enable compliance verification and issue investigation.
Access Control Patterns
Implement role-based access to extracted data:
```python
# Separate outputs by access level
public_fields = extracted.select(
    fc.col("claim_id"),
    fc.col("claim_status"),
    fc.col("total_amount")
)

restricted_fields = extracted.select(
    fc.col("claim_id"),
    fc.col("patient_id"),
    fc.col("diagnosis_codes"),
    fc.col("claim_items")
)

# Write to separate locations with different permissions
public_fields.write.parquet("output/public/")
restricted_fields.write.parquet("output/restricted/")
```
Data separation enables granular access control at the storage layer.
Production Deployment and Monitoring
Production extraction requires robust deployment infrastructure and comprehensive monitoring.
Cloud Deployment with Typedef
Develop locally with Fenic, deploy to Typedef cloud with zero code changes:
```python
# Local development configuration
dev_config = fc.SessionConfig(
    app_name="insurance_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,
                tpm=50_000
            )
        },
        default_language_model="gpt4"
    )
)

# Production configuration with cloud execution
prod_config = fc.SessionConfig(
    app_name="insurance_extraction",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=1000,
                tpm=500_000
            )
        },
        default_language_model="gpt4"
    ),
    cloud=fc.CloudConfig(size=fc.CloudExecutorSize.MEDIUM)
)

# Same extraction code works in both environments
session = fc.Session.get_or_create(prod_config)
```
Cloud deployment provides automatic scaling without pipeline modifications.
Coverage and Quality Metrics
Track extraction quality across carriers:
```python
def compute_coverage_metrics(extracted_df):
    result = extracted_df.select(
        fc.count("*").alias("total_documents"),
        fc.count(fc.when(fc.col("claim_id").is_not_null()).then(1)).alias("successful_extractions"),
        fc.count(fc.when(fc.col("validation_status") == "valid").then(1)).alias("valid_extractions")
    ).collect()

    # Access the data from the QueryResult
    metrics_data = result.data

    # Convert to a row dict (handles both list-of-rows and column-oriented formats)
    if isinstance(metrics_data, list) and len(metrics_data) > 0:
        metrics = metrics_data[0]
    else:
        metrics = {k: v[0] for k, v in metrics_data.items()}

    coverage_rate = metrics["successful_extractions"] / metrics["total_documents"]
    validation_rate = metrics["valid_extractions"] / metrics["successful_extractions"]

    return {
        "coverage": coverage_rate,
        "validation": validation_rate,
        "total": metrics["total_documents"]
    }

# Monitor per-carrier metrics
carrier_metrics = {}
for carrier in carriers:
    carrier_df = extracted.filter(fc.col("carrier") == carrier)
    carrier_metrics[carrier] = compute_coverage_metrics(carrier_df)

print(f"Coverage rates: {carrier_metrics}")
```
Coverage tracking identifies carriers requiring template adjustments.
Error Classification and Recovery
Categorize extraction failures for targeted fixes:
```python
class ExtractionError(Exception):
    def __init__(self, document_id: str, error_type: str, message: str):
        self.document_id = document_id
        self.error_type = error_type
        self.message = message
        super().__init__(f"{error_type}: {message}")

def classify_extraction_error(error: Exception) -> str:
    error_msg = str(error).lower()

    if "timeout" in error_msg:
        return "timeout"
    elif "rate limit" in error_msg:
        return "rate_limit"
    elif "invalid schema" in error_msg:
        return "validation_error"
    elif "parse" in error_msg:
        return "parse_error"
    else:
        return "unknown_error"

# Track error distribution
# failed_extractions: exceptions collected from per-document try/except handling (not shown)
error_tracker = {}
for error in failed_extractions:
    error_type = classify_extraction_error(error)
    error_tracker[error_type] = error_tracker.get(error_type, 0) + 1

print(f"Error distribution: {error_tracker}")
```
Error classification guides optimization priorities.
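Once errors are classified, recovery can be keyed off the error type. A minimal sketch, assuming a `process_document` callable you supply, where transient failures (timeouts, rate limits) are retried with backoff and everything else is recorded for review:

```python
import time

# Hypothetical recovery policy keyed by the error classes above
RETRYABLE = {"timeout", "rate_limit"}
MAX_RETRIES = 3

def extract_with_recovery(process_document, file_path: str):
    """Retry transient failures with exponential backoff; record the rest for review."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return process_document(file_path)
        except Exception as e:
            error_type = classify_extraction_error(e)
            if error_type in RETRYABLE and attempt < MAX_RETRIES:
                time.sleep(2 ** attempt)  # 2s, then 4s backoff
                continue
            # Non-retryable, or retries exhausted: surface for targeted follow-up
            return {"file_path": file_path, "status": "failed", "error_type": error_type}
```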
Alerting and Anomaly Detection
Implement automated alerts for extraction issues:
```python
def check_extraction_health(metrics: dict, thresholds: dict):
    alerts = []

    if metrics["coverage"] < thresholds["min_coverage"]:
        alerts.append(
            f"Coverage rate {metrics['coverage']:.2%} below threshold {thresholds['min_coverage']:.2%}"
        )

    if metrics["validation"] < thresholds["min_validation"]:
        alerts.append(
            f"Validation rate {metrics['validation']:.2%} below threshold {thresholds['min_validation']:.2%}"
        )

    return alerts

thresholds = {
    "min_coverage": 0.90,
    "min_validation": 0.95
}

for carrier, metrics in carrier_metrics.items():
    alerts = check_extraction_health(metrics, thresholds)
    if alerts:
        print(f"ALERT - {carrier}: {', '.join(alerts)}")
```
Proactive alerting prevents quality degradation.
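Printing alerts works for development; in production you would typically push them to an on-call channel. A minimal sketch, assuming a Slack-compatible incoming-webhook URL supplied via an environment variable (placeholder) and the `requests` library:

```python
import os
import requests

# Placeholder: set to your incoming-webhook URL (Slack, Teams, etc.)
ALERT_WEBHOOK_URL = os.environ.get("ALERT_WEBHOOK_URL", "")

def send_alerts(carrier: str, alerts) -> None:
    """Forward extraction-health alerts to an on-call channel."""
    if not alerts or not ALERT_WEBHOOK_URL:
        return
    message = f"Extraction alert for {carrier}:\n" + "\n".join(f"- {a}" for a in alerts)
    response = requests.post(ALERT_WEBHOOK_URL, json={"text": message}, timeout=10)
    response.raise_for_status()
```

The monitoring loop above would call `send_alerts(carrier, alerts)` instead of, or alongside, printing.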
Real-World Implementation Patterns
Production extraction deployments follow common patterns based on operational requirements.
Daily Batch Processing
Most insurance operations run extraction on daily schedules:
```python
import datetime
import schedule

def daily_extraction_pipeline(process_date: str):
    # Ingest documents from all carriers
    daily_docs = session.read.pdf_metadata(
        f"incoming/{process_date}/**/claims/*.pdf",
        recursive=True
    )

    # Parse and extract
    extracted = (
        daily_docs
        .select(
            fc.col("file_path"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("*"),
            fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
        )
        .unnest("claim")
    )

    # Validate and write
    validated = (
        extracted
        .filter(fc.col("claim_id").is_not_null())
        .with_column("process_date", fc.lit(process_date))
    )

    validated.write.parquet(
        f"output/{process_date}/claims.parquet",
        mode="overwrite"
    )

    # Generate metrics
    metrics = compute_coverage_metrics(validated)
    return metrics

# Run daily; compute the processing date at run time, not at schedule-definition time
schedule.every().day.at("02:00").do(
    lambda: daily_extraction_pipeline(datetime.date.today().isoformat())
)
```
Scheduled processing handles predictable document volumes efficiently.
Real-Time Extraction
High-priority documents require immediate processing:
```python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ClaimHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.src_path.endswith('.pdf'):
            self.process_claim(event.src_path)

    def process_claim(self, file_path: str):
        try:
            # Single-document extraction
            result_query = (
                session.read.pdf_metadata(file_path)
                .select(
                    fc.col("file_path"),
                    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
                )
                .select(
                    fc.col("*"),
                    fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
                )
                .unnest("claim")
                .collect()
            )

            # Access the first row from the result data
            result_data = result_query.data
            if isinstance(result_data, list) and len(result_data) > 0:
                result = result_data[0]
            else:
                # Handle column-oriented (dict) format - convert to a row dict
                result = {k: v[0] for k, v in result_data.items()}

            print(f"Processed: {file_path}")
            return result

        except Exception as e:
            print(f"Error processing {file_path}: {e}")

# Monitor directory for new claims
observer = Observer()
observer.schedule(ClaimHandler(), "incoming/priority/", recursive=True)
observer.start()
```
Real-time processing ensures urgent claims receive immediate attention.
Incremental Processing
Update only changed documents in large datasets:
```python
def incremental_extraction(checkpoint_path: str):
    # Load previous checkpoint
    try:
        processed = session.read.parquet(checkpoint_path)
        processed_paths = set(processed.select("file_path").collect())
    except Exception:
        # No checkpoint yet (first run)
        processed = None
        processed_paths = set()

    # Find new documents
    all_docs = session.read.pdf_metadata("carriers/**/*.pdf", recursive=True)
    new_docs = all_docs.filter(
        ~fc.col("file_path").is_in(list(processed_paths))
    )

    if new_docs.count() == 0:
        print("No new documents to process")
        return

    # Extract new documents
    new_extracted = (
        new_docs
        .select(
            fc.col("file_path"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("*"),
            fc.semantic.extract(fc.col("content"), MedicalClaim).alias("claim")
        )
        .unnest("claim")
    )

    # Update checkpoint
    combined = processed.union(new_extracted) if processed is not None else new_extracted
    combined.write.parquet(checkpoint_path, mode="overwrite")

    return new_extracted

# Run incremental updates
new_results = incremental_extraction("checkpoints/claims.parquet")
```
Incremental processing minimizes reprocessing costs.
Scaling Success Metrics
Successful extraction at scale delivers measurable operational improvements:
Processing time reduction. Organizations report dramatic cuts in manual triage time after implementing semantic extraction; in one case study, RudderStack reduced triage time by 95% using automated classification and extraction.
Cost efficiency. Research indicates schema-driven approaches can reduce labeling costs by 44× compared to supervised learning methods. Automated extraction eliminates annotation overhead while maintaining production accuracy.
Accuracy improvement. Schema-driven extraction achieves 74.2-96.1% F1 scores without task-specific training data. This matches or exceeds traditional supervised approaches that require thousands of labeled examples.
Operational scalability. Automated pipelines handle 2-3× document volume increases without proportional staff increases. Batch processing and intelligent routing enable efficient resource utilization.
Compliance adherence. Automated audit trails and data lineage provide complete processing transparency. Organizations maintain regulatory compliance while scaling throughput.
Getting Started
Begin with a focused pilot deployment:
- Select a single carrier and document type
- Define extraction schemas for target documents
- Build a basic pipeline with Fenic
- Validate accuracy against manual extraction (see the sketch below)
- Measure processing time and costs
- Expand to additional carriers incrementally
Start small, validate thoroughly, and scale systematically. Production-grade extraction builds on proven foundations rather than attempting complete automation immediately.
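For the validation step in particular, a small comparison against a manually extracted sample goes a long way. A minimal sketch, assuming a hypothetical `gold.json` of hand-keyed claims and a `predicted.json` exported from the pipeline, both lists of records keyed by `claim_id`:

```python
import json

# Hypothetical files: hand-keyed claims vs. pipeline output, both keyed by claim_id
with open("gold.json") as f:
    gold = {row["claim_id"]: row for row in json.load(f)}
with open("predicted.json") as f:
    predicted = {row["claim_id"]: row for row in json.load(f)}

fields = ["patient_id", "claim_status", "total_amount"]
matches = {field: 0 for field in fields}
shared = [cid for cid in gold if cid in predicted]

for cid in shared:
    for field in fields:
        if gold[cid].get(field) == predicted[cid].get(field):
            matches[field] += 1

for field in fields:
    accuracy = matches[field] / len(shared) if shared else 0.0
    print(f"{field}: {accuracy:.1%} agreement across {len(shared)} claims")
```

Field-level agreement rates like these identify which schema fields need prompt, schema, or preprocessing adjustments before expanding to more carriers.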
For implementation guidance, review semantic DataFrame operations and schema-driven extraction patterns. Additional resources cover LangGraph orchestration and production deployment strategies.
