How to Augment LangChain with Typedef.ai for Insurance Analytics

Typedef Team

Insurance companies process millions of unstructured documents: policies, claims, underwriting reports, and regulatory filings. LangChain handles agent orchestration but lacks the infrastructure to process this data at scale. Typedef.ai's Fenic framework fills that gap with DataFrame-based semantic operations.

This guide shows you how to build production-grade insurance analytics by combining LangChain agents with Fenic's semantic processing.

The Insurance Data Problem

Insurance workflows face three technical challenges:

Document variety: Policies arrive as PDFs, scanned images, and unstructured text across multiple carriers and lines of business.

Processing scale: Document volumes reach into the millions per quarter, while risk assessment and claims triage require sub-second responses.

Regulatory requirements: Audit trails and compliance documentation require deterministic transformations with complete lineage tracking.

Traditional data pipelines break when handling unstructured insurance documents. LLM-based solutions provide semantic understanding but lack production reliability.

Architecture Pattern

Fenic treats inference as a first-class data operation. The pattern:

Insurance Documents → Fenic (batch semantic operations) → Structured Data → LangChain Agents (decisions)

Fenic handles:

  • Batch processing of thousands of policy documents with automatic rate limiting
  • Type-safe extraction from unstructured policy language into validated schemas
  • Deterministic transformations with caching and lineage tracking
  • Model Context Protocol integration for LangChain agent access

LangChain handles:

  • Underwriting decisions
  • Customer interactions
  • Real-time agent workflows

Setup and Configuration

Install Fenic:

bash
pip install fenic

Configure session with model credentials:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.core.types.inference.openai import OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api import functions as fc

config = SessionConfig(
    app_name="insurance_analytics",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=500,
                tpm=500000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
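
Fenic's OpenAI-backed models authenticate with your OpenAI credentials. Assuming the standard OPENAI_API_KEY environment variable is how your deployment supplies them (check your own secret management), export it before creating the session:

bash
export OPENAI_API_KEY="sk-..."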

Processing Policy Documents

Load policy documents:

python
# Enumerate PDF files and capture their paths via metadata
pdf_files = session.read.pdf_metadata(
    "policies/**/*.pdf",
    recursive=True
)

# Parse PDFs to markdown using semantic.parse_pdf
policies_df = pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

# For markdown files, use read.docs separately
md_files = session.read.docs(
    "policies/**/*.md",
    content_type="markdown",
    recursive=True
)

md_policies = md_files.filter(
    fc.col("error").is_null()
).select(
    fc.col("file_path"),
    fc.col("content")
)

# Combine both sources if needed
policies_df = policies_df.union(md_policies)
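
Before spending tokens on extraction, spot-check ingestion using the same DataFrame methods this guide uses elsewhere:

python
# Confirm documents loaded and parsed before running LLM extraction over the corpus
print(f"Loaded {policies_df.count()} policy documents")
policies_df.show(3)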

Structured Data Extraction

Define schema-driven extraction with Pydantic models:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class Coverage(BaseModel):
    type: Literal["liability", "collision", "comprehensive", "medical"]
    limit: float = Field(description="Coverage limit in USD")
    deductible: float = Field(description="Deductible amount in USD")
    premium: float = Field(description="Annual premium in USD")

class PolicyHolder(BaseModel):
    name: str
    age: int
    address: str
    risk_factors: List[str] = Field(description="Identified risk factors")

class PolicyData(BaseModel):
    policy_number: str
    effective_date: str
    expiration_date: str
    policy_type: Literal["auto", "home", "life", "health", "commercial"]
    policyholder: PolicyHolder
    coverages: List[Coverage]
    total_premium: float
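
Since extraction output is validated against these models, it is worth exercising the schema with plain Pydantic (v2's model_validate) before wiring it into a pipeline. A quick test with a hypothetical payload:

python
# Hypothetical sample record; confirms the schema accepts well-formed data
sample = PolicyData.model_validate({
    "policy_number": "POL-12345",
    "effective_date": "2024-01-01",
    "expiration_date": "2025-01-01",
    "policy_type": "auto",
    "policyholder": {
        "name": "Jane Doe",
        "age": 42,
        "address": "123 Main St",
        "risk_factors": ["urban commute"],
    },
    "coverages": [
        {"type": "collision", "limit": 50000.0, "deductible": 500.0, "premium": 800.0}
    ],
    "total_premium": 800.0,
})
assert sample.policy_type == "auto"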

Extract structured data:

python
extracted_policies = policies_df.select(
    fc.col("file_path"),
    fc.col("content"),  # keep the raw text for the compliance and pricing passes below
    fc.semantic.extract(
        fc.col("content"),
        response_format=PolicyData
    ).alias("policy_data")
)

structured_policies = extracted_policies.select(
    fc.col("file_path"),
    fc.col("content"),
    fc.col("policy_data.policy_number").alias("policy_number"),
    fc.col("policy_data.policy_type").alias("policy_type"),
    fc.col("policy_data.total_premium").alias("total_premium"),
    fc.col("policy_data.policyholder").alias("policyholder"),
    fc.col("policy_data.coverages").alias("coverages")
)

Claims Processing and Triage

Load and process claims documents:

python
claims_df = session.read.docs(
    "claims/**/*.txt",
    content_type="markdown",
    recursive=True
)

class ClaimDetails(BaseModel):
    claim_number: str
    policy_number: str = Field(description="Policy number the claim was filed against")
    incident_date: str
    incident_type: Literal["accident", "theft", "fire", "water_damage", "liability", "medical"]
    severity: Literal["minor", "moderate", "severe", "catastrophic"]
    estimated_loss: float = Field(description="Estimated loss amount in USD")
    description: str
    requires_investigation: bool = Field(description="Whether claim needs special investigation")

processed_claims = claims_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ClaimDetails
    ).alias("claim_data")
)

Triage by severity:

python
high_priority_claims = processed_claims.filter(
    (fc.col("claim_data.severity").isin(["severe", "catastrophic"])) |
    (fc.col("claim_data.requires_investigation") == True)
).select(
    fc.col("claim_data.claim_number"),
    fc.col("claim_data.incident_type"),
    fc.col("claim_data.severity"),
    fc.col("claim_data.estimated_loss")
)
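
Persisting the triage output lets downstream agents query a stable table instead of re-running extraction, using the same save_as_table call shown later in this guide:

python
# Materialize triage results for downstream tools
high_priority_claims.write.save_as_table("high_priority_claims", mode="overwrite")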

Semantic Policy Matching

Use semantic joins to match claims with relevant policy coverages:

python
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Auto accident with collision damage to 2018 Honda Civic",
    right="Collision coverage with $500 deductible for vehicle damage",
    output=True
))
examples.create_example(JoinExample(
    left="Water damage from burst pipe in basement",
    right="Comprehensive auto insurance policy",
    output=False
))

matched_coverage = processed_claims.semantic.join(
    other=structured_policies,
    predicate="""
    Claim: {{ left_on }}
    Coverage: {{ right_on }}
    This coverage applies to this type of claim.
    """,
    left_on=fc.col("claim_data.description"),
    right_on=fc.col("coverages"),
    examples=examples
)

Risk Assessment and Classification

Classify applications by risk level:

python
from fenic.core.types.classify import ClassDefinition

risk_categories = [
    ClassDefinition(
        label="Low Risk",
        description="Clean history, stable employment, low-risk occupation, good credit"
    ),
    ClassDefinition(
        label="Moderate Risk",
        description="Minor incidents in history, moderate-risk factors present"
    ),
    ClassDefinition(
        label="High Risk",
        description="Multiple incidents, high-risk occupation or location, poor credit history"
    ),
    ClassDefinition(
        label="Uninsurable",
        description="Extreme risk factors, fraud indicators, regulatory red flags"
    )
]

# Get PDF metadata first
pdf_files = session.read.pdf_metadata("applications/**/*.pdf", recursive=True)

# Parse PDFs to markdown
applications_df = pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

risk_assessed = applications_df.select(
    fc.col("file_path"),
    fc.semantic.classify(
        fc.col("content"),
        classes=risk_categories
    ).alias("risk_classification")
)

high_risk_apps = risk_assessed.filter(
    fc.col("risk_classification").isin(["High Risk", "Uninsurable"])
)

Fraud Detection with Clustering

Group similar claims to detect patterns:

python
claims_with_embeddings = processed_claims.select(
    fc.col("claim_data.claim_number").alias("claim_number"),
    fc.col("claim_data.description").alias("description"),
    fc.col("claim_data.incident_type").alias("incident_type"),
    fc.semantic.embed(
        fc.col("claim_data.description")
    ).alias("embeddings")
)

clustered_claims = claims_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

cluster_analysis = clustered_claims.group_by("cluster_id").agg(
    fc.count("*").alias("claim_count"),
    fc.first(fc.col("description")).alias("representative_claim"),
    fc.collect_list(fc.col("claim_number")).alias("claim_numbers")
)

suspicious_clusters = cluster_analysis.filter(
    fc.col("claim_count") > 5
)
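
To prioritize investigator review, order the flagged clusters by size; many near-identical claim narratives in one cluster is the strongest signal:

python
# Largest clusters first
suspicious_clusters.order_by(fc.col("claim_count").desc()).show()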

Compliance Validation

Extract compliance-relevant information:

python
class ComplianceData(BaseModel):
    has_required_disclosures: bool
    state_specific_requirements_met: bool
    missing_clauses: List[str] = Field(description="List of missing required clauses")
    regulatory_violations: List[str] = Field(description="Potential regulatory issues")
    filing_requirements: List[str] = Field(description="Required regulatory filings")

compliance_check = structured_policies.select(
    fc.col("policy_number"),
    fc.col("policy_type"),
    fc.semantic.extract(
        fc.col("content"),  # run compliance extraction over the policy text, not the file path
        response_format=ComplianceData
    ).alias("compliance")
)

non_compliant = compliance_check.filter(
    (fc.col("compliance.has_required_disclosures") == False) |
    (fc.size(fc.col("compliance.regulatory_violations")) > 0)
)

Creating MCP Tools for LangChain

Model Context Protocol tools expose Fenic DataFrames to LangChain agents.

Save processed data:

python
structured_policies.write.save_as_table("policies", mode="overwrite")
processed_claims.write.save_as_table("claims", mode="overwrite")

Create policy search tool:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType, FloatType

session.catalog.create_tool(
    tool_name="search_policies",
    tool_description="Search insurance policies by type, coverage amount, or policyholder details",
    tool_query=session.table("policies").filter(
        (fc.col("policy_type") == fc.tool_param("policy_type", StringType)) &
        (fc.col("total_premium") <= fc.tool_param("max_premium", FloatType))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(
            name="policy_type",
            description="Type of policy: auto, home, life, health, or commercial"
        ),
        ToolParam(
            name="max_premium",
            description="Maximum annual premium in USD",
            default_value=10000.0,
            has_default=True
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=20,
            has_default=True
        )
    ],
    result_limit=50
)

Create claims analysis tool:

python
session.catalog.create_tool(
    tool_name="analyze_claims_by_severity",
    tool_description="Get claims statistics filtered by severity level",
    tool_query=session.table("claims").filter(
        fc.col("claim_data.severity") == fc.tool_param("severity", StringType)
    ).agg(
        fc.count("*").alias("total_claims"),
        fc.sum(fc.col("claim_data.estimated_loss")).alias("total_estimated_loss"),
        fc.avg(fc.col("claim_data.estimated_loss")).alias("avg_claim_value")
    ),
    tool_params=[
        ToolParam(
            name="severity",
            description="Severity level: minor, moderate, severe, or catastrophic"
        )
    ]
)

Create policy-claim matching tool:

python
match_query = session.table("policies").join(
    session.table("claims"),
    # Join on the policy number extracted from each claim, then filter to the requested policy
    fc.col("policies.policy_number") == fc.col("claims.claim_data.policy_number"),
    "inner"
).filter(
    fc.col("policies.policy_number") == fc.tool_param("policy_number", StringType)
).select(
    fc.col("policies.policy_number"),
    fc.col("policies.policy_type"),
    fc.col("claims.claim_data")
)

session.catalog.create_tool(
    tool_name="get_policy_claims",
    tool_description="Retrieve all claims associated with a specific policy number",
    tool_query=match_query,
    tool_params=[
        ToolParam(
            name="policy_number",
            description="The policy number to search for"
        )
    ],
    result_limit=100
)

Deploying the MCP Server

Launch HTTP server:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="InsuranceAnalyticsServer",
    user_defined_tools=tools,
    concurrency_limit=15
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production ASGI deployment:

python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

LangChain Agent Integration

Connect agents to MCP server:

python
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_openai import ChatOpenAI
from langchain.tools import StructuredTool, Tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
import requests

# Note: this is a simplified HTTP wrapper for illustration. Production MCP clients
# speak the full MCP protocol (JSON-RPC over HTTP); consider an MCP client library
# such as langchain-mcp-adapters instead of hand-rolled requests.
def call_fenic_tool(tool_name: str, **params):
    response = requests.post(
        "http://127.0.0.1:8000/mcp",
        json={
            "tool": tool_name,
            "parameters": params
        }
    )
    response.raise_for_status()
    return response.json()

def search_policies_fn(policy_type: str, max_premium: float = 10000.0, limit: int = 20) -> dict:
    return call_fenic_tool(
        "search_policies",
        policy_type=policy_type,
        max_premium=max_premium,
        limit=limit
    )

langchain_tools = [
    # Multi-parameter tools need StructuredTool; plain Tool accepts only a single string input
    StructuredTool.from_function(
        func=search_policies_fn,
        name="search_policies",
        description="Search insurance policies by type, coverage amount, or policyholder details"
    ),
    Tool(
        name="analyze_claims_by_severity",
        func=lambda severity: call_fenic_tool(
            "analyze_claims_by_severity",
            severity=severity
        ),
        description="Get claims statistics filtered by severity level"
    ),
    Tool(
        name="get_policy_claims",
        func=lambda policy_number: call_fenic_tool(
            "get_policy_claims",
            policy_number=policy_number
        ),
        description="Retrieve all claims associated with a specific policy number"
    )
]

prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an insurance analyst assistant. Use the available tools to:
    - Search and analyze insurance policies
    - Review claims data and identify patterns
    - Match claims to appropriate policy coverages
    - Assess risk levels and provide underwriting recommendations

    Always provide specific data and reasoning for your recommendations."""),
    ("user", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = create_openai_functions_agent(llm, langchain_tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=langchain_tools,
    verbose=True
)

result = agent_executor.invoke({
    "input": "Find all auto policies with premiums under $2000 and check if any have severe claims"
})
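
AgentExecutor.invoke returns a dictionary; the agent's final answer is under the "output" key:

python
print(result["output"])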

Advanced Workflows

Carrier Policy Comparison

Compare coverage terms across carriers:

python
# Parse PDFs from carrier A
carrier_a_pdf_files = session.read.pdf_metadata("carrier_a/**/*.pdf", recursive=True)
carrier_a_policies = carrier_a_pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

# Parse PDFs from carrier B
carrier_b_pdf_files = session.read.pdf_metadata("carrier_b/**/*.pdf", recursive=True)
carrier_b_policies = carrier_b_pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)

def extract_coverage_terms(df, carrier_name):
    return df.select(
        fc.lit(carrier_name).alias("carrier"),
        fc.col("content"),
        fc.semantic.extract(
            fc.col("content"),
            response_format=PolicyData
        ).alias("terms")
    )

carrier_a_terms = extract_coverage_terms(carrier_a_policies, "Carrier A")
carrier_b_terms = extract_coverage_terms(carrier_b_policies, "Carrier B")

equivalent_coverages = carrier_a_terms.semantic.join(
    other=carrier_b_terms,
    predicate="""
    Policy A: {{ left_on }}
    Policy B: {{ right_on }}
    These policies offer equivalent coverage for similar risk profiles.
    """,
    left_on=fc.col("terms.coverages"),
    right_on=fc.col("terms.coverages")
)

Fraud Pattern Detection

Identify suspicious claim patterns:

python
fraud_indicators = processed_claims.select(
    fc.col("claim_data.claim_number").alias("claim_number"),
    fc.col("claim_data.description").alias("description"),
    fc.col("claim_data.estimated_loss").alias("estimated_loss"),
    fc.semantic.embed(fc.col("claim_data.description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=20,
    label_column="pattern_cluster"
)

cluster_scores = fraud_indicators.group_by("pattern_cluster").agg(
    fc.count("*").alias("cluster_size"),
    fc.avg(fc.col("estimated_loss")).alias("avg_loss"),
    fc.stddev(fc.col("estimated_loss")).alias("loss_stddev")
).with_column(
    "fraud_risk_score",
    fc.when(
        (fc.col("cluster_size") > 10) & (fc.col("loss_stddev") < 1000),
        fc.lit("HIGH")
    ).otherwise(fc.lit("LOW"))
)

high_risk_claims = fraud_indicators.join(
    cluster_scores,
    "pattern_cluster"
).filter(
    fc.col("fraud_risk_score") == "HIGH"
)

Pricing Factor Analysis

Extract and analyze pricing patterns:

python
class PricingFactors(BaseModel):
    base_rate: float
    risk_multipliers: dict = Field(description="Risk factor adjustments")
    discounts_applied: List[str]
    final_premium: float

pricing_analysis = structured_policies.select(
    fc.col("policy_type"),
    fc.col("policyholder.age"),
    fc.col("policyholder.risk_factors"),
    fc.col("total_premium"),
    fc.semantic.extract(
        fc.col("content"),  # extract pricing factors from the policy text, not the file path
        response_format=PricingFactors
    ).alias("pricing_breakdown")
)

demographic_pricing = pricing_analysis.group_by(
    fc.col("policy_type"),
    fc.floor(fc.col("policyholder.age") / 10).alias("age_bracket")
).agg(
    fc.avg(fc.col("total_premium")).alias("avg_premium"),
    fc.count("*").alias("policy_count")
)

Performance Optimization

Batch Processing

Process large document sets in batches:

python
# Get PDF metadata first
pdf_files = session.read.pdf_metadata("all_policies/**/*.pdf", recursive=True)

# Parse PDFs to markdown
large_policy_set = pdf_files.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
).filter(
    fc.col("content").is_not_null()
)
batch_size = 1000
total_docs = large_policy_set.count()

for offset in range(0, total_docs, batch_size):
    # Sort on a stable key so limit/offset pagination is deterministic across batches
    batch = large_policy_set.order_by(fc.col("file_path")).limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("file_path"),
        fc.semantic.extract(
            fc.col("content"),
            response_format=PolicyData
        ).alias("policy_data")
    )

    processed_batch.write.save_as_table(
        "all_policies_processed",
        mode="append"
    )

Caching Embeddings

Cache expensive operations:

python
policies_with_embeddings = structured_policies.select(
    fc.col("policy_number"),
    fc.col("file_path"),
    # Embed the policy text itself; embedding the path would cache meaningless vectors
    fc.semantic.embed(fc.col("content")).alias("policy_embeddings")
)

policies_with_embeddings.write.save_as_table(
    "cached_policy_embeddings",
    mode="overwrite"
)

cached_embeddings = session.table("cached_policy_embeddings")

Rate Limit Configuration

Configure model rate limits:

python
config = SessionConfig(
    app_name="insurance_analytics",
    semantic=SemanticConfig(
        language_models={
            "fast_model": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate_model": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast_model"
    )
)

Monitoring and Cost Tracking

Track processing metrics:

python
metrics_df = session.table("fenic_system.query_metrics")

cost_analysis = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(100)

hourly_costs = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests,
        AVG(total_lm_cost) as avg_cost_per_query
    FROM {metrics}
    WHERE end_ts > NOW() - INTERVAL '7 days'
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

hourly_costs.show()

Production Deployment

Error Handling

Add validation layers:

python
validated_policies = extracted_policies.with_column(
    "is_valid",
    fc.when(
        fc.col("policy_data.policy_number").is_not_null() &
        (fc.col("policy_data.total_premium") > 0),
        fc.lit(True)
    ).otherwise(fc.lit(False))
)

valid_policies = validated_policies.filter(fc.col("is_valid"))
invalid_policies = validated_policies.filter(~fc.col("is_valid"))

invalid_policies.select(
    fc.col("file_path"),
    fc.lit("Validation failed: missing required fields").alias("error")
).write.csv("validation_errors.csv")

Incremental Processing

Process only new documents:

python
existing_policies = session.table("policies")
processed_paths = existing_policies.select(
    fc.col("file_path")
).distinct().collect()

processed_set = {row["file_path"] for row in processed_paths}

new_policies = policies_df.filter(
    ~fc.col("file_path").isin(list(processed_set))
)

new_processed = new_policies.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=PolicyData
    ).alias("policy_data")
)

new_processed.write.save_as_table("policies", mode="append")

Implementation Summary

This architecture delivers production-grade insurance analytics by combining:

  • Fenic for batch processing: Run semantic operators over insurance documents at scale with automatic rate limiting
  • Type-safe extraction: Use Pydantic schemas for consistent structured output
  • Semantic matching: Apply semantic joins for claims-to-coverage matching and classification for risk assessment
  • MCP integration: Deploy tools that LangChain agents can access directly
  • Production optimization: Cache embeddings, batch process documents, and monitor costs

The separation of concerns enables reliable insurance analytics: Fenic handles data transformation and enrichment while LangChain agents make underwriting decisions and handle customer interactions.

For more implementation patterns, see Typedef's platform resources, RudderStack case study, and semantic operator documentation.
