LangChain agents show promise for fraud detection, but their effectiveness breaks down when handling the scale and complexity of financial data. Raw transaction logs, unstructured merchant descriptions, and variable user behavior patterns create bottlenecks where agents waste tokens on data parsing instead of fraud analysis.
Fenic addresses this by providing semantic operations within a DataFrame framework, enabling you to preprocess, structure, and enrich fraud signals before they reach your LangChain agents. This separation improves detection accuracy while reducing API costs and latency.
The Fraud Detection Challenge in LangChain Workflows
Traditional LangChain fraud detection implementations face three critical issues:
Unstructured financial data requires extensive preprocessing. Transaction descriptions, merchant names, and user activity logs need extraction and normalization before agents can identify fraud patterns effectively.
Context window constraints limit detection scope. Agents cannot process entire transaction histories within token limits, forcing you to choose between breadth of analysis and depth of pattern recognition.
Inconsistent tool interfaces create maintenance overhead. Custom Python functions for transaction queries lack type safety and require constant updates as fraud patterns evolve.
Fenic's semantic DataFrame API solves these problems through batch preprocessing that produces clean, structured fraud signals ready for agent analysis.
Semantic Operations for Fraud Pattern Recognition
Fenic provides eight core semantic operators accessible through the df.semantic interface. Each operator handles specific fraud detection tasks without requiring manual prompt engineering or validation logic.
Extract Structured Fraud Signals with semantic.extract
The semantic.extract operator transforms unstructured transaction data into typed fraud signals using Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

import fenic.api.functions.core as fc


class TransactionRisk(BaseModel):
    transaction_type: Literal["purchase", "refund", "withdrawal", "transfer"]
    merchant_category: str = Field(description="Standardized merchant category")
    risk_indicators: List[str] = Field(description="Detected risk flags")
    location_mismatch: bool = Field(description="Transaction location differs from user pattern")
    velocity_concern: bool = Field(description="Unusually high transaction frequency")


# Process raw transaction descriptions
fraud_signals = transactions_df.select(
    fc.col("transaction_id"),
    fc.col("amount"),
    fc.semantic.extract(
        fc.col("raw_description"),
        response_format=TransactionRisk
    ).alias("risk_data")
).unnest("risk_data")
```
This approach ensures type-safe results while maintaining familiar DataFrame operations. The schema serves as both documentation and validation for downstream fraud analysis.
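As a minimal illustration (assuming unnest("risk_data") flattens the struct into top-level columns such as location_mismatch and velocity_concern), the typed output can feed ordinary DataFrame logic directly:

```python
# Sketch: downstream filtering on the typed fields extracted above.
# Assumes the flattened column names from the TransactionRisk schema.
flagged = fraud_signals.filter(
    fc.col("location_mismatch") & fc.col("velocity_concern")
).select(
    fc.col("transaction_id"),
    fc.col("amount"),
    fc.col("risk_indicators")
)
```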
Classify Transactions by Fraud Type
The semantic.classify operator categorizes transactions into predefined fraud patterns using few-shot examples:
```python
import fenic.api.functions.core as fc
from fenic.core.types.semantic_examples import ClassifyExample, ClassifyExampleCollection
from fenic.core.types.classify import ClassDefinition

# Define fraud categories with descriptions
fraud_categories = [
    ClassDefinition(
        label="account_takeover",
        description="Unauthorized access to existing account with pattern changes"
    ),
    ClassDefinition(
        label="card_testing",
        description="Multiple small transactions testing stolen card validity"
    ),
    ClassDefinition(
        label="friendly_fraud",
        description="Legitimate cardholder disputes valid charges"
    ),
    ClassDefinition(
        label="synthetic_identity",
        description="Fabricated identity using real and fake information"
    ),
    ClassDefinition(
        label="legitimate",
        description="Normal transaction with no fraud indicators"
    )
]

# Provide classification examples
examples = ClassifyExampleCollection()
examples.create_example(ClassifyExample(
    input="Five transactions under $1 to different merchants within 2 minutes",
    output="card_testing"
))
examples.create_example(ClassifyExample(
    input="Large purchase from new location after password reset from unfamiliar IP",
    output="account_takeover"
))

# Classify transaction patterns
classified_df = fraud_signals.select(
    fc.col("transaction_id"),
    fc.semantic.classify(
        fc.col("risk_data.risk_indicators"),
        classes=fraud_categories,
        examples=examples
    ).alias("fraud_type")
)
```
Filter High-Risk Transactions with semantic.predicate
The semantic.predicate operator enables natural language filtering that goes beyond simple rule matching:
```python
import fenic.api.functions.core as fc
from fenic.core.types.semantic_examples import PredicateExample, PredicateExampleCollection

# Create predicate examples
predicate_examples = PredicateExampleCollection()
predicate_examples.create_example(PredicateExample(
    input={"desc": "Purchase from new merchant in foreign country, user typically shops locally"},
    output=True
))
predicate_examples.create_example(PredicateExample(
    input={"desc": "Regular monthly subscription charge from known merchant"},
    output=False
))

# Apply semantic filtering
high_risk_transactions = fraud_signals.filter(
    (fc.col("amount") > 500) &
    fc.semantic.predicate(
        "Transaction pattern: {{ desc }}. This shows characteristics of fraud.",
        desc=fc.col("risk_data.risk_indicators"),
        examples=predicate_examples
    )
)
```
This combines traditional column filtering with semantic knowledge, allowing advanced content-based detection without writing custom inference code.
Match Fraud Patterns with semantic.join
The semantic.join operator enables joining datasets based on semantic similarity rather than exact matches:
```python
import fenic.api.functions.core as fc
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create join examples
join_examples = JoinExampleCollection()
join_examples.create_example(JoinExample(
    left="Multiple rapid transactions from electronics retailers in different states",
    right="Card testing pattern: sequential small purchases across varied merchants",
    output=True
))
join_examples.create_example(JoinExample(
    left="Single large purchase from luxury goods retailer",
    right="Account takeover: sudden high-value transactions after credential change",
    output=False
))

# Join transactions with known fraud patterns
matched_fraud = transactions_df.semantic.join(
    other=known_fraud_patterns_df,
    predicate="""Current Transaction: {{ left_on }}
Known Fraud Pattern: {{ right_on }}
The transaction matches this fraud pattern.""",
    left_on=fc.col("transaction_summary"),
    right_on=fc.col("pattern_description"),
    examples=join_examples
)
```
Because the join matches on meaning rather than exact keywords, it can catch fraud variations that keyword-based rules would miss.
Embedding-Based Fraud Clustering
Fenic's embedding operations enable clustering of transaction patterns for anomaly detection:
```python
import fenic.api.functions.core as fc

# Generate embeddings for transaction descriptions
embedded_transactions = fraud_signals.select(
    fc.col("transaction_id"),
    fc.col("amount"),
    fc.col("merchant_category"),
    fc.semantic.embed(
        fc.col("risk_data.risk_indicators")
    ).alias("risk_embeddings")
)

# Cluster similar transaction patterns
clustered_fraud = embedded_transactions.semantic.with_cluster_labels(
    by=fc.col("risk_embeddings"),
    num_clusters=15,
    label_column="fraud_cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze cluster characteristics
cluster_analysis = clustered_fraud.group_by("fraud_cluster_id").agg(
    fc.count("*").alias("transaction_count"),
    fc.avg("amount").alias("avg_amount"),
    fc.first(fc.col("merchant_category")).alias("primary_category")
)
```
Agents can query pre-clustered data to identify emerging fraud patterns without processing every transaction individually.
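As one possible pattern (a sketch reusing the create_tool API covered later in this guide), the cluster summary can be registered as a catalog tool so agents query aggregates instead of raw rows:

```python
# Sketch: expose the cluster summary as an agent-facing tool so agents can
# inspect emerging patterns without scanning individual transactions.
# Assumes the cluster_analysis DataFrame above and a configured session;
# the tool name is hypothetical.
session.catalog.create_tool(
    tool_name="summarize_fraud_clusters",
    tool_description="Per-cluster transaction counts, average amounts, and dominant merchant categories",
    tool_query=cluster_analysis,
    tool_params=[],
    result_limit=15
)
```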
Detect Outliers Using Centroid Distance
Filter transactions that deviate significantly from their cluster patterns:
```python
import fenic.api.functions.core as fc
import fenic.api.functions.embedding as embedding

# Calculate similarity to cluster centroid
outlier_transactions = clustered_fraud.with_column(
    "similarity_to_cluster",
    embedding.compute_similarity(
        fc.col("risk_embeddings"),
        fc.col("cluster_centroid"),
        metric="cosine"
    )
).filter(
    fc.col("similarity_to_cluster") < 0.6
)
```
This identifies transactions that don't fit established patterns, surfacing potential new fraud techniques.
Building MCP Tools for LangChain Fraud Agents
The Model Context Protocol enables seamless integration between Fenic's fraud data pipelines and LangChain agents. Create tools that agents can call to access processed fraud signals.
Setting Up Fenic for Fraud Detection
Configure a session with semantic capabilities:
```python
from fenic.api.session.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="fraud_detection_pipeline",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=500,
                tpm=500000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Create Parameterized Fraud Detection Tools
Define DataFrame queries with parameters using tool_param:
```python
import fenic.api.functions.core as fc
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, FloatType, IntegerType

# Load and process transactions
transactions_df = session.read.csv("./data/transactions.csv")

# Process with semantic operations
fraud_analysis = transactions_df.select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("amount"),
    fc.col("merchant"),
    fc.semantic.extract(
        fc.col("description"),
        response_format=TransactionRisk
    ).alias("risk_data")
).unnest("risk_data")

# Create fraud search tool
fraud_search_query = fraud_analysis.filter(
    (fc.col("amount") >= fc.tool_param("min_amount", FloatType)) &
    fc.col("merchant_category").contains(
        fc.tool_param("merchant_category", StringType)
    )
).select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("amount"),
    fc.col("merchant"),
    fc.col("risk_indicators")
)

session.catalog.create_tool(
    tool_name="search_suspicious_transactions",
    tool_description="Search for potentially fraudulent transactions by amount and merchant category",
    tool_query=fraud_search_query,
    tool_params=[
        ToolParam(
            name="min_amount",
            description="Minimum transaction amount to flag",
            default_value=100.0,
            has_default=True
        ),
        ToolParam(
            name="merchant_category",
            description="Merchant category to filter (e.g., 'electronics', 'travel')"
        )
    ],
    result_limit=50
)
```
Build Multi-Step Fraud Analysis Tools
Chain semantic operations for sophisticated fraud detection:
```python
import fenic.api.functions.core as fc
import fenic.api.functions.embedding as embedding

# Load transaction history
transaction_history = session.read.csv("./data/full_transaction_history.csv")

# Generate embeddings and cluster
processed_history = transaction_history.select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=20,
    label_column="pattern_cluster"
)

# Create similarity search tool
similar_fraud_query = processed_history.filter(
    embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_embedding", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.75
).select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("description"),
    fc.col("pattern_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_fraud_patterns",
    tool_description="Find historical transactions with similar fraud patterns",
    tool_query=similar_fraud_query,
    tool_params=[
        ToolParam(
            name="query_embedding",
            description="Embedding vector of the suspicious transaction"
        )
    ],
    result_limit=25
)
```
Deploying MCP Server for LangChain Integration
Deploy your Fenic fraud detection tools as an MCP server that LangChain agents can access:
Production ASGI Deployment
```python
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_asgi

session = Session.get_or_create(config)
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="FraudDetectionServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn: uvicorn app:app --host 0.0.0.0 --port 8000
```
CLI-Based Server for Development
Use the fenic-serve command for rapid deployment during development:
```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific fraud detection tools
fenic-serve --tools search_suspicious_transactions find_similar_fraud_patterns

# Use stdio transport for direct LangChain integration
fenic-serve --transport stdio
```
More details on MCP integration are available in the Fenic 0.4.0 release.
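On the LangChain side, the deployed server can be consumed through an MCP client. The sketch below is illustrative rather than part of Fenic: it assumes the langchain-mcp-adapters, langgraph, and langchain-openai packages, a server running locally at http://localhost:8000/mcp, and client APIs whose details may vary between versions.

```python
# Illustrative sketch: hand Fenic's MCP tools to a LangChain agent.
# Assumes langchain-mcp-adapters, langgraph, and langchain-openai are installed;
# adjust the URL and transport to match your deployment.
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent


async def main():
    client = MultiServerMCPClient(
        {
            "fraud_detection": {
                "url": "http://localhost:8000/mcp",  # assumed local deployment
                "transport": "streamable_http",
            }
        }
    )

    # Load the catalog tools exposed by the Fenic MCP server
    tools = await client.get_tools()

    # Give the tools to a ReAct-style agent for fraud triage
    agent = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools)
    result = await agent.ainvoke(
        {"messages": [{"role": "user",
                       "content": "Flag suspicious electronics purchases over $500."}]}
    )
    print(result["messages"][-1].content)


asyncio.run(main())
```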
Production Fraud Detection Patterns
Hybrid Search with Risk Scores
Combine semantic similarity with structured fraud rules:
```python
import fenic.api.functions.core as fc
import fenic.api.functions.embedding as embedding
from fenic.core.types import StringType, FloatType
from fenic.core.mcp.types import ToolParam

# Add embeddings to transaction data
transactions_with_embeddings = fraud_analysis.select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("amount"),
    fc.col("merchant_category"),
    fc.col("risk_indicators"),
    fc.semantic.embed(fc.col("risk_indicators")).alias("risk_embeddings")
)

# Create hybrid fraud detection tool
hybrid_fraud_search = transactions_with_embeddings.filter(
    (fc.col("merchant_category") == fc.tool_param("category", StringType)) &
    (fc.col("amount") >= fc.tool_param("min_amount", FloatType)) &
    (fc.col("amount") <= fc.tool_param("max_amount", FloatType))
).with_column(
    "pattern_similarity",
    embedding.compute_similarity(
        fc.col("risk_embeddings"),
        fc.tool_param("risk_pattern_embedding", fc.col("risk_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("pattern_similarity") > 0.7
).order_by(
    fc.col("pattern_similarity").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_fraud_detection",
    tool_description="Search for fraudulent transactions using category, amount range, and pattern similarity",
    tool_query=hybrid_fraud_search,
    tool_params=[
        ToolParam(name="category", description="Merchant category to analyze"),
        ToolParam(name="min_amount", description="Minimum transaction amount"),
        ToolParam(name="max_amount", description="Maximum transaction amount"),
        ToolParam(name="risk_pattern_embedding", description="Embedding of known fraud pattern")
    ],
    result_limit=30
)
```
Time-Series Velocity Analysis
Detect fraud through transaction velocity patterns:
```python
import fenic.api.functions.core as fc
import fenic.api.functions.dt as dt
from fenic.core.types import StringType
from fenic.core.mcp.types import ToolParam

# Analyze transaction velocity
velocity_analysis = transaction_history.with_column(
    "hour_of_day",
    dt.hour(fc.col("timestamp"))
).group_by("user_id", "hour_of_day").agg(
    fc.count("*").alias("hourly_transaction_count"),
    fc.sum("amount").alias("hourly_total_amount"),
    fc.collect_list("merchant_category").alias("merchant_categories")
)

# Flag velocity anomalies
velocity_flags = velocity_analysis.filter(
    (fc.col("hourly_transaction_count") > 10) |
    (fc.col("hourly_total_amount") > 5000)
)

# Create velocity monitoring tool
velocity_query = velocity_flags.filter(
    fc.col("user_id") == fc.tool_param("user_id", StringType)
)

session.catalog.create_tool(
    tool_name="check_user_velocity",
    tool_description="Analyze transaction velocity patterns for a specific user",
    tool_query=velocity_query,
    tool_params=[
        ToolParam(
            name="user_id",
            description="User ID to analyze for velocity-based fraud"
        )
    ],
    result_limit=24
)
```
Optimizing for Production Fraud Detection
Cache Embeddings to Reduce Costs
Save expensive semantic operations to avoid recomputation:
```python
# Generate and cache transaction embeddings
transaction_embeddings = transactions_df.select(
    fc.col("transaction_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

# Materialize to catalog for reuse
transaction_embeddings.write.save_as_table(
    "transaction_embeddings",
    mode="overwrite"
)

# Reuse cached embeddings in fraud detection tools
cached_embeddings = session.table("transaction_embeddings")
```
Batch Processing for Large Transaction Volumes
Process transactions in batches to control memory and API usage:
```python
# Process large transaction datasets incrementally
large_transaction_set = session.read.csv("./data/large_transactions.csv")

batch_size = 5000
offset = 0

while True:
    batch = large_transaction_set.limit(batch_size).offset(offset)

    # Process batch with semantic operations
    processed_batch = batch.select(
        fc.col("transaction_id"),
        fc.semantic.extract(
            fc.col("description"),
            response_format=TransactionRisk
        ).alias("risk_data")
    )

    # Append to results table
    processed_batch.write.save_as_table(
        "fraud_analysis_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break

    offset += batch_size
```
Monitor Costs and Performance
Track fraud detection pipeline metrics:
```python
# Query metrics table for cost analysis
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent fraud detection queries
recent_fraud_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("end_ts")
).order_by(
    fc.col("end_ts").desc()
).limit(50)

recent_fraud_queries.show()

# Aggregate costs by time period
cost_by_hour = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests,
        COUNT(*) as query_count
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
    LIMIT 168
""", metrics=metrics_df)

cost_by_hour.show()
```
Complete Fraud Detection Implementation
Here's an end-to-end fraud detection pipeline with LangChain integration:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

import fenic.api.functions.core as fc
from fenic.api.session.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, FloatType

# 1. Configure session
config = SessionConfig(
    app_name="fraud_detection_system",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=500,
                tpm=500000
            )
        }
    )
)

session = Session.get_or_create(config)


# 2. Define fraud signal schema
class FraudSignal(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    fraud_indicators: List[str] = Field(description="Detected fraud flags")
    merchant_risk: str = Field(description="Merchant risk assessment")
    geographic_anomaly: bool = Field(description="Transaction from unusual location")


# 3. Load and process transaction data
transactions_df = session.read.csv("./data/transactions.csv")
user_profiles_df = session.read.csv("./data/user_profiles.csv")

# 4. Extract fraud signals
fraud_analysis = transactions_df.select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("amount"),
    fc.col("merchant"),
    fc.semantic.extract(
        fc.col("description"),
        response_format=FraudSignal
    ).alias("fraud_signal")
).unnest("fraud_signal")

# 5. Generate embeddings for clustering
fraud_with_embeddings = fraud_analysis.select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("amount"),  # keep amount for downstream filtering and aggregation
    fc.col("risk_level"),
    fc.col("fraud_indicators"),
    fc.semantic.embed(fc.col("fraud_indicators")).alias("risk_embeddings")
)

# 6. Cluster fraud patterns
clustered_fraud = fraud_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("risk_embeddings"),
    num_clusters=12,
    label_column="fraud_pattern_cluster"
)

# 7. Create fraud detection tool
high_risk_query = clustered_fraud.filter(
    fc.col("risk_level").is_in(["high", "critical"]) &
    (fc.col("amount") >= fc.tool_param("min_amount", FloatType))
).select(
    fc.col("transaction_id"),
    fc.col("user_id"),
    fc.col("amount"),
    fc.col("risk_level"),
    fc.col("fraud_indicators"),
    fc.col("fraud_pattern_cluster")
)

session.catalog.create_tool(
    tool_name="detect_high_risk_transactions",
    tool_description="Identify high-risk and critical fraud transactions above specified amount",
    tool_query=high_risk_query,
    tool_params=[
        ToolParam(
            name="min_amount",
            description="Minimum transaction amount to analyze",
            default_value=50.0,
            has_default=True
        )
    ],
    result_limit=100
)

# 8. Create pattern analysis tool
pattern_analysis_query = clustered_fraud.group_by("fraud_pattern_cluster").agg(
    fc.count("*").alias("occurrence_count"),
    fc.avg("amount").alias("avg_transaction_amount"),
    fc.collect_list("fraud_indicators").alias("common_indicators")
)

session.catalog.create_tool(
    tool_name="analyze_fraud_patterns",
    tool_description="Analyze clustered fraud patterns and their characteristics",
    tool_query=pattern_analysis_query,
    tool_params=[],
    result_limit=20
)

# 9. Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="FraudDetectionServer",
    user_defined_tools=tools,
    concurrency_limit=15
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Key Implementation Principles
Preprocess data with semantic operations. Use extraction, classification, and clustering to structure fraud signals before agents access them, reducing token usage and improving detection accuracy.
Build type-safe tools. Create parameterized tools using tool_param for compile-time safety and runtime validation, ensuring agents receive consistent fraud data.
Deploy via MCP. Expose Fenic DataFrames through MCP servers that LangChain agents can call directly, maintaining a clean separation between data processing and agent reasoning.
Optimize for production. Cache embeddings, batch process large transaction volumes, and monitor costs through Fenic's metrics system to maintain performance at scale.
Leverage clustering for pattern detection. Use automatic clustering to identify emerging fraud patterns without manual rule creation, enabling adaptive fraud detection.
By separating fraud data preparation from agent reasoning, Fenic enables cleaner architectures where LangChain agents focus on detection decisions while semantic pipelines handle the complexity of transaction analysis. This separation improves reliability, reduces costs, and makes fraud detection systems easier to maintain and scale.
For implementation guidance, see the LangChain integration resource, explore the semantic operators documentation, and review the log clustering example for similar pattern detection workflows.

