Chat logs and conversation data contain valuable insights, but extracting meaningful patterns from unstructured dialogue requires more than simple keyword searches. Traditional text processing falls short when dealing with context, intent, and semantic relationships. This guide shows how to use semantic operations on conversation data to extract actionable insights at scale.
The Challenge with Conventional Chat Log Analysis
Most organizations collect conversation data from support chats, sales calls, team communications, or user feedback. The standard approach involves keyword searches, regex patterns, or custom scripts. These methods break down quickly:
Rigid pattern matching fails to capture meaning. A support chat mentioning "can't log in" and another saying "authentication isn't working" describe the same issue, but string matching treats them as different problems.
Context gets lost in aggregation. Sentiment analysis on individual messages misses the conversation arc. A frustrated opening can resolve into satisfaction, but isolated message scoring misses that progression.
Manual categorization doesn't scale. Human reviewers can classify hundreds of conversations, not thousands. As volume grows, coverage drops and backlogs pile up.
Multi-turn conversations require stateful understanding. Unlike isolated documents, chat logs have temporal structure. Who said what, when, and in response to what question all matter for proper analysis.
Semantic processing addresses these limitations by understanding meaning, not just text patterns. Fenic, Typedef's DataFrame framework for AI applications, brings semantic operations directly into data pipelines.
Why DataFrame Operations for Conversation Analysis
Chat logs are data. They arrive in various formats (JSON exports, CSV files, transcript formats), need parsing and transformation, and produce structured insights. DataFrames provide the natural abstraction for these operations.
Fenic extends the familiar DataFrame API with semantic operators. Instead of writing custom LLM integration code, you call semantic.classify(), semantic.extract(), or semantic.join() as standard column operations. The framework handles model orchestration, rate limits, batching, and error handling.
This approach keeps conversations queryable and transformations auditable. Every semantic operation is a DataFrame transformation with inputs, outputs, and lineage tracking.
Setting Up Fenic for Semantic Analysis
Install Fenic and configure a session with your model providers:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel

config = SessionConfig(
    app_name="conversation_analysis",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)
```
This configuration sets rate limits and model selection. Fenic automatically batches requests and handles throttling, so your pipeline stays reliable at scale.
Parsing Conversation Formats into Structured Data
Chat logs arrive in multiple formats. Fenic's parse_transcript() function normalizes three common formats into a unified schema.
Supported Formats
SRT (SubRip Subtitle)
```
1
00:00:01,000 --> 00:00:05,000
Customer: I can't access my account

2
00:00:05,500 --> 00:00:10,000
Agent: Let me look that up for you
```
WebVTT (Web Video Text Tracks)
```
WEBVTT

Customer
00:00:01.000 --> 00:00:05.000
I can't access my account

Agent
00:00:05.500 --> 00:00:10.000
Let me look that up for you
```
Generic Conversation
```
[00:00:01] Customer: I can't access my account
[00:00:05] Agent: Let me look that up for you
```
Parsing Implementation
```python
import fenic.api.functions as fc
from fenic.api.functions import text

# Load conversation files
df = session.read.docs(
    "conversations/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse transcript format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode into individual messages
messages_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("segment")
).select(
    fc.col("file_path"),
    fc.col("segment.speaker").alias("speaker"),
    fc.col("segment.start_time").alias("timestamp"),
    fc.col("segment.content").alias("text")
)
```
The unified schema provides:
- speaker: Who said it
- timestamp: When (in seconds)
- text: What was said
- format: Original format type
This structure lets you apply semantic operations consistently across different source formats.
Classifying Conversation Intent and Topics
Classification tags conversations with predefined categories. This works better than keyword matching because it understands semantic similarity.
```python
from fenic.api.functions import semantic

# Classify support conversations by issue type
classified_df = messages_df.with_column(
    "issue_category",
    semantic.classify(
        fc.col("text"),
        ["Account Access", "Payment Issue", "Technical Problem", "Product Question"]
    )
)
```
For better accuracy, use ClassDefinition objects with descriptions:
```python
from fenic.core.types.classify import ClassDefinition

categories = [
    ClassDefinition(
        label="Account Access",
        description="Login problems, password resets, account recovery"
    ),
    ClassDefinition(
        label="Payment Issue",
        description="Billing questions, failed transactions, refund requests"
    ),
    ClassDefinition(
        label="Technical Problem",
        description="Application bugs, feature malfunctions, system errors"
    )
]

classified_df = messages_df.with_column(
    "issue_category",
    semantic.classify(fc.col("text"), categories)
)
```
Classification works per message or per conversation. Aggregate classifications to understand conversation-level topics:
```python
# Get primary topic per conversation
conversation_topics = classified_df.group_by("file_path").agg(
    fc.mode(fc.col("issue_category")).alias("primary_topic"),
    fc.count("*").alias("message_count")
)
```
Learn more about building reliable AI pipelines with semantic operators.
Extracting Structured Information from Dialogue
Conversations contain structured information hidden in unstructured text. Extraction pulls specific fields into typed schemas using Pydantic models.
```python
from pydantic import BaseModel, Field

class ActionItem(BaseModel):
    task: str = Field(description="What needs to be done")
    assignee: str = Field(description="Who is responsible")
    deadline: str = Field(description="Due date if mentioned")

# Extract action items from meeting transcripts
actions_df = messages_df.select(
    fc.col("file_path"),
    fc.col("text"),
    semantic.extract(fc.col("text"), ActionItem).alias("action")
).filter(
    fc.col("action").is_not_null()
)
```
Extract multiple entities at once with nested models:
```python
class CustomerInfo(BaseModel):
    name: str = Field(description="Customer's name")
    account_id: str = Field(description="Account identifier")
    issue_description: str = Field(description="Main problem described")
    sentiment: str = Field(description="Customer's emotional state")

# Extract structured data from support conversations
info_df = messages_df.with_column(
    "customer_info",
    semantic.extract(fc.col("text"), CustomerInfo)
).with_column(
    "name", fc.col("customer_info.name")
).with_column(
    "account", fc.col("customer_info.account_id")
).with_column(
    "issue", fc.col("customer_info.issue_description")
)
```
Extraction handles optional fields naturally. Missing information produces null values without breaking the pipeline.
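As a minimal sketch (the EscalationDetails schema below is hypothetical, not part of Fenic), fields declared as Optional simply come back as null when the dialogue never mentions them:

```python
# Hypothetical schema for illustration: optional fields default to None when
# the conversation never mentions them, so downstream filters just see nulls.
from typing import Optional
from pydantic import BaseModel, Field

class EscalationDetails(BaseModel):
    reason: str = Field(description="Why the customer wants the issue escalated")
    ticket_id: Optional[str] = Field(default=None, description="Existing ticket number, if mentioned")
    callback_time: Optional[str] = Field(default=None, description="Requested callback time, if any")

escalations_df = messages_df.with_column(
    "escalation",
    semantic.extract(fc.col("text"), EscalationDetails)
).filter(
    # Keep only messages where extraction produced a result
    fc.col("escalation").is_not_null()
)
```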
Analyzing Sentiment Across Conversations
Sentiment analysis on chat logs reveals emotional progression through conversations. Fenic's analyze_sentiment() function returns positive, negative, or neutral classifications.
```python
# Add sentiment to each message
sentiment_df = messages_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Track sentiment changes over the conversation
sentiment_flow = sentiment_df.select(
    fc.col("file_path"),
    fc.col("speaker"),
    fc.col("timestamp"),
    fc.col("sentiment")
).order_by("file_path", "timestamp")
```
Aggregate sentiment by speaker to find patterns:
```python
# Speaker-level sentiment distribution
speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("total_messages"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)
```
Detect sentiment shifts within conversations:
```python
from fenic.api.window import Window

# Calculate sentiment score trend
sentiment_score = sentiment_df.with_column(
    "score",
    fc.when(fc.col("sentiment") == "positive", 1)
    .when(fc.col("sentiment") == "negative", -1)
    .otherwise(0)
)

# Track cumulative sentiment across each conversation
window_spec = Window.partition_by("file_path").order_by("timestamp")

trend_df = sentiment_score.with_column(
    "cumulative_sentiment",
    fc.sum(fc.col("score")).over(window_spec)
)
```
This approach reveals whether conversations improve or deteriorate emotionally, which matters for support quality and customer satisfaction analysis.
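Because the cumulative score at a conversation's last message is simply the sum of its per-message scores, a per-conversation sum is enough to flag deteriorating conversations (a sketch building on the sentiment_score DataFrame above):

```python
# Sketch: a conversation's final cumulative score equals the sum of its message scores
final_tone = sentiment_score.group_by("file_path").agg(
    fc.sum(fc.col("score")).alias("final_score")
)

# Conversations that ended in negative territory are candidates for quality review
deteriorated = final_tone.filter(fc.col("final_score") < 0)
deteriorated.show()
```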
Filtering Conversations with Semantic Predicates
Semantic predicates enable natural language filtering beyond exact string matches. Use semantic.predicate() to find messages matching complex conditions.
```python
# Find messages indicating customer frustration
frustrated = messages_df.filter(
    semantic.predicate(
        "This message expresses frustration, anger, or dissatisfaction: {{ text }}",
        text=fc.col("text")
    )
)
```
Combine semantic and traditional filters:
```python
# Find recent urgent support issues
urgent_recent = messages_df.filter(
    (fc.col("timestamp") > fc.lit(86400)) &  # Last 24 hours
    semantic.predicate(
        "This support message indicates an urgent or critical issue: {{ text }}",
        text=fc.col("text")
    )
)
```
Use predicates for multi-column reasoning:
```python
# Find conversations where the agent response matched the customer question
# (assumes a previous_message column built upstream, e.g. with a lag window)
relevant_responses = messages_df.filter(
    semantic.predicate(
        """
        Customer question: {{ question }}
        Agent response: {{ response }}
        The agent directly addressed the customer's question.
        """,
        question=fc.col("previous_message"),
        response=fc.col("text")
    )
)
```
Semantic predicates handle variations in phrasing that keyword matching misses. "I'm furious," "this is unacceptable," and "terrible experience" all match a frustration predicate despite different wording.
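A quick way to sanity-check this is to run the predicate over a few hand-written samples (a sketch; it assumes session.create_dataframe accepts a list of row dictionaries):

```python
# Sketch: test the frustration predicate against differently worded messages
samples = session.create_dataframe([
    {"text": "I'm furious about this"},
    {"text": "this is unacceptable"},
    {"text": "terrible experience from start to finish"},
    {"text": "thanks, that fixed it!"},
])

flagged = samples.filter(
    semantic.predicate(
        "This message expresses frustration, anger, or dissatisfaction: {{ text }}",
        text=fc.col("text")
    )
)

flagged.show()  # expect the first three rows to match, not the fourth
```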
Generating Summaries and Insights with Semantic Reduction
Semantic reduction aggregates multiple messages into concise summaries. The semantic.reduce() function applies natural language instructions to grouped text.
```python
# Summarize each conversation
conversation_summaries = messages_df.group_by("file_path").agg(
    semantic.reduce(
        "Summarize this conversation highlighting the main issue and resolution",
        fc.col("text"),
        order_by=fc.col("timestamp")
    ).alias("summary")
)
```
The order_by parameter preserves temporal sequence, ensuring summaries respect conversation flow.
Generate different types of insights:
```python
# Key decisions per conversation
decisions = messages_df.group_by("file_path").agg(
    semantic.reduce(
        "List the key decisions made in this conversation",
        fc.col("text"),
        order_by=fc.col("timestamp")
    ).alias("decisions")
)

# Action items per meeting
actions = messages_df.group_by("file_path").agg(
    semantic.reduce(
        "Extract action items with owners and deadlines",
        fc.col("text"),
        order_by=fc.col("timestamp")
    ).alias("action_items")
)
```
Use group context to customize summaries:
```python
# Summaries with conversation metadata
# (assumes a primary_speaker column derived upstream)
enriched_summaries = messages_df.group_by("file_path", "primary_speaker").agg(
    semantic.reduce(
        "Summarize this {{ primary_speaker }} conversation focusing on customer outcomes",
        fc.col("text"),
        group_context={"primary_speaker": fc.col("primary_speaker")},
        order_by=fc.col("timestamp")
    ).alias("summary")
)
```
Semantic reduction works for any text aggregation where you need coherent synthesis rather than concatenation.
Joining Conversations with Related Data
Semantic joins connect conversations based on meaning rather than exact matches. This enables matching support chats with knowledge base articles, or linking conversations discussing similar issues.
```python
from textwrap import dedent

# Load knowledge base articles
kb_df = session.read.docs("knowledge_base/*.md", content_type="markdown")

# Join conversations with relevant KB articles
matched = messages_df.semantic.join(
    other=kb_df,
    predicate=dedent('''
        Customer Message: {{ left_on }}
        Knowledge Base Article: {{ right_on }}
        This article would help resolve the customer's issue.
    '''),
    left_on=fc.col("text"),
    right_on=fc.col("content")
)
```
Find related conversations across different time periods:
```python
# Split recent vs historical conversations
recent = messages_df.filter(fc.col("timestamp") > fc.lit(604800))      # Last week
historical = messages_df.filter(fc.col("timestamp") <= fc.lit(604800))

# Find similar issues discussed before
similar_issues = recent.semantic.join(
    other=historical,
    predicate="""
    Recent conversation: {{ left_on }}
    Past conversation: {{ right_on }}
    These conversations discuss the same underlying problem.
    """,
    left_on=fc.col("text"),
    right_on=fc.col("text")
)
```
Semantic joins power sophisticated analysis like:
- Matching customer questions to FAQ answers
- Linking conversations about related products
- Finding duplicate support tickets
- Connecting sales conversations to existing case studies
The join returns combined DataFrames with columns from both sides, enabling downstream analysis on matched pairs.
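For instance, a simple downstream step is counting how many knowledge base articles matched each conversation (a sketch using the matched DataFrame above; it assumes the conversation-side file_path column survives the join unambiguously, so rename one side first if both share the name):

```python
# Sketch: knowledge base coverage per conversation
kb_coverage = matched.group_by("file_path").agg(
    fc.count("*").alias("matched_articles")
)

# Conversations with few or no matched articles point at documentation gaps
kb_coverage.order_by("matched_articles").show()
```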
Clustering Conversations by Semantic Similarity
Clustering groups conversations by semantic similarity rather than keyword overlap. This reveals thematic patterns across large conversation sets.
```python
# Generate embeddings for conversation text
embedded = messages_df.with_column(
    "embedding",
    semantic.embed(fc.col("text"))
)

# Cluster using K-means
clustered = embedded.semantic.with_cluster_labels(
    by="embedding",
    num_clusters=10
)
```
Analyze cluster characteristics:
```python
# Get most representative messages per cluster
cluster_exemplars = clustered.group_by("cluster_label").agg(
    fc.count("*").alias("conversation_count"),
    fc.first(fc.col("text")).alias("example_message")
)

# Examine cluster topics
cluster_topics = clustered.group_by("cluster_label").agg(
    semantic.reduce(
        "Describe the common theme across these conversations",
        fc.col("text")
    ).alias("cluster_theme")
)
```
Use clusters to route conversations or prioritize review:
```python
# Find high-urgency clusters
urgent_clusters = clustered.filter(
    semantic.predicate(
        "These conversations indicate urgent customer issues: {{ text }}",
        text=fc.col("text")
    )
).group_by("cluster_label").agg(
    fc.count("*").alias("urgent_count")
)
```
Clustering scales to thousands of conversations and reveals patterns humans would miss in manual review. See the log clustering and triage guide for a detailed implementation.
Temporal Analysis of Conversation Patterns
Chat logs have time-based structure. Analyzing temporal patterns reveals how conversations evolve and when issues arise.
```python
# Add time-based windows
windowed = messages_df.with_column(
    "hour_bucket",
    (fc.col("timestamp") / 3600).cast("integer")
).with_column(
    "day_bucket",
    (fc.col("timestamp") / 86400).cast("integer")
)

# Hourly conversation volume
hourly_volume = windowed.group_by("hour_bucket").agg(
    fc.count("*").alias("message_count"),
    fc.count_distinct("file_path").alias("conversation_count")
)
```
Track topic trends over time:
```python
# Classify messages, then aggregate by time
classified_temporal = messages_df.with_column(
    "category",
    semantic.classify(fc.col("text"), ["Bug Report", "Feature Request", "Question"])
).with_column(
    "week",
    (fc.col("timestamp") / 604800).cast("integer")
)

# Weekly category distribution
weekly_trends = classified_temporal.group_by("week", "category").agg(
    fc.count("*").alias("count")
)
```
Identify sentiment changes throughout conversations:
```python
from fenic.api.window import Window

# Conversation-level sentiment progression
sentiment_progression = messages_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
).with_column(
    "message_index",
    fc.row_number().over(
        Window.partition_by("file_path").order_by("timestamp")
    )
)

# First message vs last message sentiment
sentiment_change = sentiment_progression.group_by("file_path").agg(
    fc.first(fc.col("sentiment")).alias("initial_sentiment"),
    fc.last(fc.col("sentiment")).alias("final_sentiment")
)
```
Temporal analysis answers questions like: When do support volumes spike? Which issues trend upward? Do conversations end more positively than they start?
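The last question can be answered directly from the sentiment_change DataFrame above: compute the share of conversations that open negative and close positive (a sketch; the 0/1 flag follows the same scoring pattern used earlier):

```python
# Sketch: what fraction of conversations recover from a negative start?
recovery_rate = sentiment_change.agg(
    fc.avg(
        fc.when(
            (fc.col("initial_sentiment") == "negative")
            & (fc.col("final_sentiment") == "positive"),
            1
        ).otherwise(0)
    ).alias("recovery_rate")
)

recovery_rate.show()
```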
Creating Queryable Tools with Model Context Protocol
Make conversation insights accessible to agents and applications using the Model Context Protocol (MCP). MCP exposes DataFrame queries as callable tools.
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType
from fenic.api.functions import text

# Save processed conversations
messages_df.write.save_as_table("conversations", mode="overwrite")

# Create search tool
session.catalog.create_tool(
    tool_name="search_conversations",
    tool_description="Search conversation messages by keyword or topic",
    tool_query=session.table("conversations").filter(
        fc.col("text").like(
            text.concat(fc.lit("%"), fc.tool_param("search_term", StringType), fc.lit("%"))
        )
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(
            name="search_term",
            description="Keyword or phrase to find",
            has_default=True,
            default_value=""
        ),
        ToolParam(
            name="limit",
            description="Maximum results to return",
            has_default=True,
            default_value=20
        )
    ]
)
```
Create analytics tools:
```python
# Conversation statistics by category
session.catalog.create_tool(
    tool_name="category_stats",
    tool_description="Get conversation counts and sentiment by category",
    tool_query=session.table("conversations").filter(
        fc.col("issue_category") == fc.tool_param("category", StringType)
    ).agg(
        fc.count("*").alias("total_messages"),
        fc.count_distinct("file_path").alias("conversation_count"),
        fc.avg(
            fc.when(fc.col("sentiment") == "positive", 1)
            .when(fc.col("sentiment") == "negative", -1)
            .otherwise(0)
        ).alias("avg_sentiment_score")
    ),
    tool_params=[
        ToolParam(
            name="category",
            description="Issue category to analyze",
            has_default=True,
            default_value="Technical Problem"
        )
    ]
)
```
Launch an MCP server:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get registered tools
tools = session.catalog.list_tools()

# Create and run server
server = create_mcp_server(
    session,
    "ConversationAnalysisServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Agents can now query conversation data through well-defined tools without direct DataFrame access. See the transcript processing guide for integration with LangChain.
Production Deployment Patterns
Production conversation analysis requires reliability, monitoring, and efficient resource usage.
Batch Processing for Scale
Process conversations in batches to maximize throughput:
```python
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel

# Configure model for higher throughput
config = SessionConfig(
    app_name="conversation_batch",
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,  # Higher rate limit
                tpm=200000
            )
        }
    )
)

# Process in manageable chunks
# (conversation_batches: an iterable of message batches prepared upstream)
for batch in conversation_batches:
    batch_df = session.create_dataframe(batch)

    # Apply transformations
    processed = (batch_df
        .with_column("category", semantic.classify(fc.col("text"), categories))
        .with_column("sentiment", semantic.analyze_sentiment(fc.col("text")))
    )

    # Persist results
    processed.write.save_as_table(
        "processed_conversations",
        mode="append"
    )
```
Cost Management
Control inference costs through strategic processing:
```python
# Deduplicate before expensive operations
unique_messages = messages_df.select(
    fc.col("text")
).distinct()

# Process unique messages only
processed_unique = unique_messages.with_column(
    "classification",
    semantic.classify(fc.col("text"), categories)
)

# Join back to original data
full_df = messages_df.join(
    processed_unique,
    on="text",
    how="left"
)
```
Error Handling
Handle failures gracefully in production:
```python
# Add error recovery columns
safe_df = messages_df.with_column(
    "classification",
    fc.when(
        fc.col("text").is_not_null() & (fc.length(fc.col("text")) > 0),
        semantic.classify(fc.col("text"), categories)
    ).otherwise(fc.lit("unclassified"))
)

# Log failed records
failed = messages_df.filter(
    fc.col("text").is_null() | (fc.length(fc.col("text")) == 0)
).select(
    fc.col("file_path"),
    fc.lit("Empty or null message").alias("error_reason")
)

failed.write.csv("processing_errors.csv")
```
Monitoring and Metrics
Track pipeline performance:
```python
# Access built-in metrics
metrics = session.table("fenic_system.query_metrics")

# Aggregate by operation type
operation_stats = metrics.group_by("operation").agg(
    fc.count("*").alias("call_count"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency_ms"),
    fc.sum("input_tokens").alias("total_input_tokens")
)

operation_stats.show()
```
Production systems benefit from caching intermediate results, monitoring token usage, and implementing circuit breakers for model failures.
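Caching can be as simple as materializing enriched results once and reading them back for every downstream query (a sketch reusing only operations shown above; the table name is illustrative):

```python
# Sketch: persist inference results once, then reuse them without new model calls
sentiment_df.write.save_as_table("messages_with_sentiment", mode="overwrite")

cached = session.table("messages_with_sentiment")

# Downstream aggregations hit the saved table, not the model
negative_by_conversation = cached.filter(
    fc.col("sentiment") == "negative"
).group_by("file_path").agg(
    fc.count("*").alias("negative_messages")
)
```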
Complete Conversation Analysis Pipeline
Here's a full implementation combining the techniques above:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel
import fenic.api.functions as fc
from fenic.api.functions import text, semantic
from pydantic import BaseModel, Field

# Setup
config = SessionConfig(
    app_name="conversation_insights",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)
session = Session.get_or_create(config)

# Load and parse conversations
raw_df = session.read.docs(
    "conversations/**/*.srt",
    content_type="markdown",
    recursive=True
)

parsed_df = raw_df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

messages_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.col("file_path"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.start_time").alias("timestamp"),
    fc.col("seg.content").alias("text")
)

# Enrich with semantic analysis
enriched_df = (messages_df
    .with_column(
        "sentiment",
        semantic.analyze_sentiment(fc.col("text"))
    )
    .with_column(
        "category",
        semantic.classify(
            fc.col("text"),
            ["Technical Issue", "Billing Question", "Feature Request", "General"]
        )
    )
)

# Generate conversation-level summaries
summaries_df = enriched_df.group_by("file_path").agg(
    fc.count("*").alias("message_count"),
    fc.count_distinct("speaker").alias("speaker_count"),
    semantic.reduce(
        "Summarize this conversation focusing on key issues and outcomes",
        fc.col("text"),
        order_by=fc.col("timestamp")
    ).alias("summary")
)

# Save results
enriched_df.write.save_as_table("enriched_conversations", mode="overwrite")
summaries_df.write.csv("conversation_summaries.csv")

# Create MCP tools
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType

session.catalog.create_tool(
    tool_name="search_by_category",
    tool_description="Find conversations by issue category",
    tool_query=session.table("enriched_conversations").filter(
        fc.col("category") == fc.tool_param("category", StringType)
    ),
    tool_params=[
        ToolParam(
            name="category",
            description="Issue category to filter by",
            has_default=True,
            default_value="Technical Issue"
        )
    ]
)

print("Pipeline complete. Results saved and tools registered.")
```
This pipeline processes conversations from raw text through semantic enrichment to queryable insights.
Real-World Applications
Organizations use semantic conversation analysis for multiple use cases:
Support Operations: Automatically categorize incoming support chats, route to appropriate teams, and identify escalation-worthy conversations. Track resolution patterns and agent performance through sentiment progression analysis. RudderStack reduced triage time by 95% using similar semantic classification and context extraction.
Sales Intelligence: Extract buying signals from sales call transcripts, identify objections and concerns, and match conversations to relevant case studies or product information. Semantic joins connect prospect questions with existing solutions.
Product Feedback: Cluster feature requests to find common themes, extract specific improvement suggestions, and track sentiment around product areas over time. Classification separates bugs from feature requests from general questions.
Meeting Analytics: Generate action items from meeting transcripts, identify decisions and owners, and create searchable knowledge bases from conversation history. Reduction operations turn hour-long meetings into structured outputs.
Community Moderation: Detect problematic content beyond keyword lists, identify harassment or abuse through semantic understanding, and flag conversations requiring human review based on context.
The common thread: semantic operations handle variations in language that rule-based systems miss, scale to thousands of conversations, and produce structured outputs for downstream systems.
Getting Started
Start with a small set of conversations and basic semantic operations:
- Load 10-20 representative conversations
- Parse into structured format with parse_transcript()
- Apply one semantic operation (classification or sentiment)
- Examine results and adjust categories or prompts
- Expand to full dataset and add more operations
Fenic's local-first design means you can prototype entirely on your machine before scaling to production. The same code runs locally during development and in cloud deployment.
For more examples and ready-to-run implementations, explore the Fenic documentation, GitHub repository, and example notebooks.
Semantic processing transforms conversation data from text dumps into actionable insights. By treating semantic operations as DataFrame primitives, Fenic makes sophisticated analysis accessible without custom LLM integration code. The result is scalable, auditable, and production-ready conversation analysis that actually understands what people are saying.
