Insurance companies process millions of unstructured documents—policies, claims, underwriting reports, and regulatory filings. LangChain handles agent orchestration, but lacks the infrastructure to process this data at scale. Typedef.ai's Fenic framework fills this gap with DataFrame-based semantic operations.
This guide shows you how to build production-grade insurance analytics by combining LangChain agents with Fenic's semantic processing.
The Insurance Data Problem
Insurance workflows face three technical challenges:
Document variety: Policies arrive as PDFs, scanned images, and unstructured text across multiple carriers and lines of business.
Processing scale: Quarterly document volumes reach into the millions, while risk assessment and claims triage must return results in under a second.
Regulatory requirements: Audit trails and compliance documentation require deterministic transformations with complete lineage tracking.
Traditional data pipelines break when handling unstructured insurance documents. LLM-based solutions provide semantic understanding but lack production reliability.
Architecture Pattern
Fenic treats inference as a first-class data operation. The pattern:
Insurance Documents → Fenic (batch semantic operations) → Structured Data → LangChain Agents (decisions)
Fenic handles:
- Batch processing of thousands of policy documents with automatic rate limiting
- Type-safe extraction from unstructured policy language into validated schemas
- Deterministic transformations with caching and lineage tracking
- Model Context Protocol integration for LangChain agent access
LangChain handles:
- Underwriting decisions
- Customer interactions
- Real-time agent workflows
Setup and Configuration
Install Fenic:
```bash
pip install fenic
```
Configure session with model credentials:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.core.types.inference.openai import OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api import functions as fc

config = SessionConfig(
    app_name="insurance_analytics",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=500,
                tpm=500000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Processing Policy Documents
Load policy documents:
```python
# First, get PDF metadata to obtain file paths
pdf_files = session.read.pdf_metadata(
    "policies/**/*.pdf",
    recursive=True
)

# Parse PDFs to markdown using semantic.parse_pdf
policies_df = pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

# For markdown files, use read.docs separately
md_files = session.read.docs(
    "policies/**/*.md",
    content_type="markdown",
    recursive=True
)

md_policies = md_files.filter(
    fc.col("error").is_null()
).select(
    fc.col("file_path"),
    fc.col("content")
)

# Combine both sources if needed
policies_df = policies_df.union(md_policies)
```
Structured Data Extraction
Define schema-driven extraction with Pydantic models:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class Coverage(BaseModel):
    type: Literal["liability", "collision", "comprehensive", "medical"]
    limit: float = Field(description="Coverage limit in USD")
    deductible: float = Field(description="Deductible amount in USD")
    premium: float = Field(description="Annual premium in USD")

class PolicyHolder(BaseModel):
    name: str
    age: int
    address: str
    risk_factors: List[str] = Field(description="Identified risk factors")

class PolicyData(BaseModel):
    policy_number: str
    effective_date: str
    expiration_date: str
    policy_type: Literal["auto", "home", "life", "health", "commercial"]
    policyholder: PolicyHolder
    coverages: List[Coverage]
    total_premium: float
```
Extract structured data:
```python
extracted_policies = policies_df.select(
    fc.col("file_path"),
    fc.col("content"),  # keep the raw policy text for later compliance and pricing extraction
    fc.semantic.extract(
        fc.col("content"),
        response_format=PolicyData
    ).alias("policy_data")
)

structured_policies = extracted_policies.select(
    fc.col("file_path"),
    fc.col("content"),
    fc.col("policy_data.policy_number").alias("policy_number"),
    fc.col("policy_data.policy_type").alias("policy_type"),
    fc.col("policy_data.total_premium").alias("total_premium"),
    fc.col("policy_data.policyholder").alias("policyholder"),
    fc.col("policy_data.coverages").alias("coverages")
)
```
Claims Processing and Triage
Load and process claims documents:
```python
claims_df = session.read.docs(
    "claims/**/*.txt",
    content_type="markdown",
    recursive=True
)

class ClaimDetails(BaseModel):
    claim_number: str
    incident_date: str
    incident_type: Literal["accident", "theft", "fire", "water_damage", "liability", "medical"]
    severity: Literal["minor", "moderate", "severe", "catastrophic"]
    estimated_loss: float = Field(description="Estimated loss amount in USD")
    description: str
    requires_investigation: bool = Field(description="Whether claim needs special investigation")

processed_claims = claims_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ClaimDetails
    ).alias("claim_data")
)
```
Triage by severity:
```python
high_priority_claims = processed_claims.filter(
    (fc.col("claim_data.severity").isin(["severe", "catastrophic"]))
    | (fc.col("claim_data.requires_investigation") == True)
).select(
    fc.col("claim_data.claim_number"),
    fc.col("claim_data.incident_type"),
    fc.col("claim_data.severity"),
    fc.col("claim_data.estimated_loss")
)
```
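If downstream agents or adjusters need to act on the triage output, the filtered DataFrame can be persisted and inspected like any other table in this guide. A minimal sketch; the `high_priority_claims` table name is illustrative:

```python
# Persist triaged claims so MCP tools and agents can query them later
# (the table name "high_priority_claims" is illustrative)
high_priority_claims.write.save_as_table(
    "high_priority_claims",
    mode="overwrite"
)

# Quick sanity check on the triage output
session.table("high_priority_claims").show(10)
```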
Semantic Policy Matching
Use semantic joins to match claims with relevant policy coverages:
```python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Auto accident with collision damage to 2018 Honda Civic",
    right="Collision coverage with $500 deductible for vehicle damage",
    output=True
))
examples.create_example(JoinExample(
    left="Water damage from burst pipe in basement",
    right="Comprehensive auto insurance policy",
    output=False
))

matched_coverage = processed_claims.semantic.join(
    other=structured_policies,
    predicate="""
    Claim: {{ left_on }}
    Coverage: {{ right_on }}
    This coverage applies to this type of claim.
    """,
    left_on=fc.col("claim_data.description"),
    right_on=fc.col("coverages"),
    examples=examples
)
```
Risk Assessment and Classification
Classify applications by risk level:
```python
from fenic.core.types.classify import ClassDefinition

risk_categories = [
    ClassDefinition(
        label="Low Risk",
        description="Clean history, stable employment, low-risk occupation, good credit"
    ),
    ClassDefinition(
        label="Moderate Risk",
        description="Minor incidents in history, moderate-risk factors present"
    ),
    ClassDefinition(
        label="High Risk",
        description="Multiple incidents, high-risk occupation or location, poor credit history"
    ),
    ClassDefinition(
        label="Uninsurable",
        description="Extreme risk factors, fraud indicators, regulatory red flags"
    )
]

# Get PDF metadata first
pdf_files = session.read.pdf_metadata("applications/**/*.pdf", recursive=True)

# Parse PDFs to markdown
applications_df = pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

risk_assessed = applications_df.select(
    fc.col("file_path"),
    fc.semantic.classify(
        fc.col("content"),
        classes=risk_categories
    ).alias("risk_classification")
)

high_risk_apps = risk_assessed.filter(
    fc.col("risk_classification").isin(["High Risk", "Uninsurable"])
)
```
Fraud Detection with Clustering
Group similar claims to detect patterns:
```python
claims_with_embeddings = processed_claims.select(
    fc.col("claim_data.claim_number"),
    fc.col("claim_data.description"),
    fc.col("claim_data.incident_type"),
    fc.semantic.embed(
        fc.col("claim_data.description")
    ).alias("embeddings")
)

clustered_claims = claims_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

cluster_analysis = clustered_claims.group_by("cluster_id").agg(
    fc.count("*").alias("claim_count"),
    fc.first(fc.col("description")).alias("representative_claim"),
    fc.collect_list(fc.col("claim_number")).alias("claim_numbers")
)

suspicious_clusters = cluster_analysis.filter(
    fc.col("claim_count") > 5
)
```
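To decide which clusters warrant investigation, the cluster summary can be ranked and persisted like any other Fenic DataFrame. A minimal sketch; the `suspicious_claim_clusters` table name is illustrative:

```python
# Rank clusters by size so the largest groups of near-identical claims surface first
suspicious_ranked = suspicious_clusters.order_by(
    fc.col("claim_count").desc()
)
suspicious_ranked.show(10)

# Persist for investigator review (table name "suspicious_claim_clusters" is illustrative)
suspicious_ranked.write.save_as_table(
    "suspicious_claim_clusters",
    mode="overwrite"
)
```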
Compliance Validation
Extract compliance-relevant information:
```python
class ComplianceData(BaseModel):
    has_required_disclosures: bool
    state_specific_requirements_met: bool
    missing_clauses: List[str] = Field(description="List of missing required clauses")
    regulatory_violations: List[str] = Field(description="Potential regulatory issues")
    filing_requirements: List[str] = Field(description="Required regulatory filings")

compliance_check = structured_policies.select(
    fc.col("policy_number"),
    fc.col("policy_type"),
    fc.semantic.extract(
        fc.col("content"),  # extract from the policy text, not the file path
        response_format=ComplianceData
    ).alias("compliance")
)

non_compliant = compliance_check.filter(
    (fc.col("compliance.has_required_disclosures") == False)
    | (fc.size(fc.col("compliance.regulatory_violations")) > 0)
)
```
Creating MCP Tools for LangChain
Model Context Protocol tools expose Fenic DataFrames to LangChain agents.
Save processed data:
```python
structured_policies.write.save_as_table("policies", mode="overwrite")
processed_claims.write.save_as_table("claims", mode="overwrite")
```
Create policy search tool:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, FloatType

session.catalog.create_tool(
    tool_name="search_policies",
    tool_description="Search insurance policies by type, coverage amount, or policyholder details",
    tool_query=session.table("policies").filter(
        (fc.col("policy_type") == fc.tool_param("policy_type", StringType))
        & (fc.col("total_premium") <= fc.tool_param("max_premium", FloatType))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(
            name="policy_type",
            description="Type of policy: auto, home, life, health, or commercial"
        ),
        ToolParam(
            name="max_premium",
            description="Maximum annual premium in USD",
            default_value=10000.0,
            has_default=True
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=20,
            has_default=True
        )
    ],
    result_limit=50
)
```
Create claims analysis tool:
```python
session.catalog.create_tool(
    tool_name="analyze_claims_by_severity",
    tool_description="Get claims statistics filtered by severity level",
    tool_query=session.table("claims").filter(
        fc.col("claim_data.severity") == fc.tool_param("severity", StringType)
    ).agg(
        fc.count("*").alias("total_claims"),
        fc.sum(fc.col("claim_data.estimated_loss")).alias("total_estimated_loss"),
        fc.avg(fc.col("claim_data.estimated_loss")).alias("avg_claim_value")
    ),
    tool_params=[
        ToolParam(
            name="severity",
            description="Severity level: minor, moderate, severe, or catastrophic"
        )
    ]
)
```
Create policy-claim matching tool:
```python
match_query = session.table("policies").join(
    session.table("claims"),
    fc.col("policies.policy_number") == fc.tool_param("policy_number", StringType),
    "inner"
).select(
    fc.col("policies.policy_number"),
    fc.col("policies.policy_type"),
    fc.col("claims.claim_data")
)

session.catalog.create_tool(
    tool_name="get_policy_claims",
    tool_description="Retrieve all claims associated with a specific policy number",
    tool_query=match_query,
    tool_params=[
        ToolParam(
            name="policy_number",
            description="The policy number to search for"
        )
    ],
    result_limit=100
)
```
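Before exposing these tools over MCP, it can help to confirm all three registered in the catalog. A short sketch using the same catalog call the server setup relies on; it prints the tool objects' reprs rather than assuming specific fields on them:

```python
# Confirm the tools registered in the catalog before exposing them over MCP
registered_tools = session.catalog.list_tools()
print(f"{len(registered_tools)} tools registered")
for tool in registered_tools:
    # Rely on repr only, since the exact tool-definition fields are not shown in this guide
    print(tool)
```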
Deploying the MCP Server
Launch HTTP server:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="InsuranceAnalyticsServer",
    user_defined_tools=tools,
    concurrency_limit=15
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production ASGI deployment:
```python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
```
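For the uvicorn command above to work, the session, tools, and server need to live in an importable module. A minimal `app.py` sketch assembled from the pieces shown earlier; the module layout and the `config` import are illustrative assumptions:

```python
# app.py -- illustrative module layout for `uvicorn app:app`
from fenic.api.session import Session
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

# Hypothetical module holding the SessionConfig defined in the setup section
from config import config

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="InsuranceAnalyticsServer",
    user_defined_tools=tools,
    concurrency_limit=15
)

# `app` is the ASGI application object that uvicorn loads
app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)
```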
LangChain Agent Integration
Connect agents to MCP server:
```python
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
import requests

def call_fenic_tool(tool_name: str, **params):
    response = requests.post(
        "http://127.0.0.1:8000/mcp",
        json={
            "tool": tool_name,
            "parameters": params
        }
    )
    return response.json()

langchain_tools = [
    Tool(
        name="search_policies",
        func=lambda policy_type, max_premium=10000, limit=20: call_fenic_tool(
            "search_policies",
            policy_type=policy_type,
            max_premium=max_premium,
            limit=limit
        ),
        description="Search insurance policies by type, coverage amount, or policyholder details"
    ),
    Tool(
        name="analyze_claims_by_severity",
        func=lambda severity: call_fenic_tool(
            "analyze_claims_by_severity",
            severity=severity
        ),
        description="Get claims statistics filtered by severity level"
    ),
    Tool(
        name="get_policy_claims",
        func=lambda policy_number: call_fenic_tool(
            "get_policy_claims",
            policy_number=policy_number
        ),
        description="Retrieve all claims associated with a specific policy number"
    )
]

prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an insurance analyst assistant. Use the available tools to:
- Search and analyze insurance policies
- Review claims data and identify patterns
- Match claims to appropriate policy coverages
- Assess risk levels and provide underwriting recommendations

Always provide specific data and reasoning for your recommendations."""),
    ("user", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = create_openai_functions_agent(llm, langchain_tools, prompt)

agent_executor = AgentExecutor(
    agent=agent,
    tools=langchain_tools,
    verbose=True
)

result = agent_executor.invoke({
    "input": "Find all auto policies with premiums under $2000 and check if any have severe claims"
})
```
Advanced Workflows
Carrier Policy Comparison
Compare coverage terms across carriers:
```python
# Parse PDFs from carrier A
carrier_a_pdf_files = session.read.pdf_metadata("carrier_a/**/*.pdf", recursive=True)

carrier_a_policies = carrier_a_pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

# Parse PDFs from carrier B
carrier_b_pdf_files = session.read.pdf_metadata("carrier_b/**/*.pdf", recursive=True)

carrier_b_policies = carrier_b_pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

def extract_coverage_terms(df, carrier_name):
    return df.select(
        fc.lit(carrier_name).alias("carrier"),
        fc.col("content"),
        fc.semantic.extract(
            fc.col("content"),
            response_format=PolicyData
        ).alias("terms")
    )

carrier_a_terms = extract_coverage_terms(carrier_a_policies, "Carrier A")
carrier_b_terms = extract_coverage_terms(carrier_b_policies, "Carrier B")

equivalent_coverages = carrier_a_terms.semantic.join(
    other=carrier_b_terms,
    predicate="""
    Policy A: {{ left_on }}
    Policy B: {{ right_on }}
    These policies offer equivalent coverage for similar risk profiles.
    """,
    left_on=fc.col("terms.coverages"),
    right_on=fc.col("terms.coverages")
)
```
Fraud Pattern Detection
Identify suspicious claim patterns:
```python
fraud_indicators = processed_claims.select(
    fc.col("claim_data.claim_number"),
    fc.col("claim_data.description"),
    fc.col("claim_data.estimated_loss"),
    fc.semantic.embed(fc.col("claim_data.description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=20,
    label_column="pattern_cluster"
)

cluster_scores = fraud_indicators.group_by("pattern_cluster").agg(
    fc.count("*").alias("cluster_size"),
    fc.avg(fc.col("estimated_loss")).alias("avg_loss"),
    fc.stddev(fc.col("estimated_loss")).alias("loss_stddev")
).with_column(
    "fraud_risk_score",
    fc.when(
        (fc.col("cluster_size") > 10) & (fc.col("loss_stddev") < 1000),
        fc.lit("HIGH")
    ).otherwise(fc.lit("LOW"))
)

high_risk_claims = fraud_indicators.join(
    cluster_scores,
    "pattern_cluster"
).filter(
    fc.col("fraud_risk_score") == "HIGH"
)
```
Pricing Factor Analysis
Extract and analyze pricing patterns:
```python
class PricingFactors(BaseModel):
    base_rate: float
    risk_multipliers: dict = Field(description="Risk factor adjustments")
    discounts_applied: List[str]
    final_premium: float

pricing_analysis = structured_policies.select(
    fc.col("policy_type"),
    fc.col("policyholder.age").alias("age"),
    fc.col("policyholder.risk_factors").alias("risk_factors"),
    fc.col("total_premium"),
    fc.semantic.extract(
        fc.col("content"),  # extract pricing factors from the policy text
        response_format=PricingFactors
    ).alias("pricing_breakdown")
)

demographic_pricing = pricing_analysis.group_by(
    fc.col("policy_type"),
    fc.floor(fc.col("age") / 10).alias("age_bracket")
).agg(
    fc.avg(fc.col("total_premium")).alias("avg_premium"),
    fc.count("*").alias("policy_count")
)
```
Performance Optimization
Batch Processing
Process large document sets in batches:
```python
# Get PDF metadata first
pdf_files = session.read.pdf_metadata("all_policies/**/*.pdf", recursive=True)

# Parse PDFs to markdown
large_policy_set = pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

batch_size = 1000
total_docs = large_policy_set.count()

for offset in range(0, total_docs, batch_size):
    batch = large_policy_set.limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("file_path"),
        fc.semantic.extract(
            fc.col("content"),
            response_format=PolicyData
        ).alias("policy_data")
    )

    processed_batch.write.save_as_table(
        "all_policies_processed",
        mode="append"
    )
```
Caching Embeddings
Cache expensive operations:
```python
policies_with_embeddings = structured_policies.select(
    fc.col("policy_number"),
    fc.col("file_path"),
    fc.semantic.embed(fc.col("content")).alias("policy_embeddings")  # embed the policy text
)

policies_with_embeddings.write.save_as_table(
    "cached_policy_embeddings",
    mode="overwrite"
)

cached_embeddings = session.table("cached_policy_embeddings")
```
Rate Limit Configuration
Configure model rate limits:
```python
config = SessionConfig(
    app_name="insurance_analytics",
    semantic=SemanticConfig(
        language_models={
            "fast_model": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate_model": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast_model"
    )
)
```
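With two models configured, routine extraction runs on the fast default while sensitive work can be routed to the stronger model. A minimal sketch, assuming `semantic.extract` accepts a `model_alias` override to select a configured model; verify the exact parameter name against the Fenic documentation:

```python
# Route compliance-sensitive extraction to the stronger model
# (assumes semantic.extract supports a model_alias override -- verify in the Fenic docs)
compliance_strict = structured_policies.select(
    fc.col("policy_number"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ComplianceData,
        model_alias="accurate_model"
    ).alias("compliance")
)
```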
Monitoring and Cost Tracking
Track processing metrics:
```python
metrics_df = session.table("fenic_system.query_metrics")

cost_analysis = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(100)

hourly_costs = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests,
        AVG(total_lm_cost) as avg_cost_per_query
    FROM {metrics}
    WHERE end_ts > NOW() - INTERVAL '7 days'
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

hourly_costs.show()
```
Production Deployment
Error Handling
Add validation layers:
```python
validated_policies = extracted_policies.with_column(
    "is_valid",
    fc.when(
        fc.col("policy_data.policy_number").is_not_null()
        & (fc.col("policy_data.total_premium") > 0),
        True
    ).otherwise(False)
)

valid_policies = validated_policies.filter(fc.col("is_valid"))
invalid_policies = validated_policies.filter(~fc.col("is_valid"))

invalid_policies.select(
    fc.col("file_path"),
    fc.lit("Validation failed: missing required fields").alias("error")
).write.csv("validation_errors.csv")
```
Incremental Processing
Process only new documents:
```python
existing_policies = session.table("policies")

processed_paths = existing_policies.select(
    fc.col("file_path")
).distinct().collect()

processed_set = {row["file_path"] for row in processed_paths}

new_policies = policies_df.filter(
    ~fc.col("file_path").isin(list(processed_set))
)

new_processed = new_policies.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=PolicyData
    ).alias("policy_data")
)

new_processed.write.save_as_table("policies", mode="append")
```
Implementation Summary
This architecture delivers production-grade insurance analytics by combining:
- Fenic for batch processing: Run semantic operators over insurance documents at scale, with batching and rate limiting handled by the framework
- Type-safe extraction: Use Pydantic schemas for consistent structured output
- Semantic matching: Apply semantic joins for claims-to-coverage matching and classification for risk assessment
- MCP integration: Deploy tools that LangChain agents can access directly
- Production optimization: Cache embeddings, batch process documents, and monitor costs
The separation of concerns enables reliable insurance analytics: Fenic handles data transformation and enrichment while LangChain agents make underwriting decisions and handle customer interactions.
For more implementation patterns, see Typedef's platform resources, RudderStack case study, and semantic operator documentation.

