Customer support teams generate thousands of hours of conversation data every month. Call transcripts, chat logs, and ticket threads contain critical insights about product issues, customer pain points, and support process bottlenecks. Yet most organizations struggle to extract actionable intelligence from this unstructured data.
The root problem isn't lack of data—it's the absence of infrastructure built for semantic processing at scale. Traditional analytics tools expect structured data in tidy tables. Support transcripts arrive as messy, unstructured text where the signal is buried in conversational noise.
This guide shows how to build production-grade transcript processing pipelines using Fenic, the open-source DataFrame framework from Typedef.ai. You'll learn to parse transcripts, extract structured insights, classify issues, and expose analytics tools that answer questions like "what are the top technical issues in APAC?" or "which support categories show rising sentiment trends?"
Why Traditional Analytics Fails for Support Transcripts
Support conversations differ fundamentally from structured operational data:
Format chaos: Transcripts arrive in multiple formats—SRT from video calls, WebVTT from web meetings, generic conversation logs, and proprietary formats from support platforms. Each requires different parsing logic.
Temporal relationships: Support conversations unfold over time. Context from earlier messages affects later responses. Traditional row-based processing destroys these relationships.
Semantic ambiguity: A customer saying "the payment page is broken" could mean a server error, a UX issue, or a browser compatibility problem. String matching misses these distinctions.
Scale requirements: Processing thousands of daily support interactions requires batch operations, rate limit management, and concurrent inference without manual async coordination.
Organizations typically attempt one of two approaches. The first involves building regex-heavy ETL pipelines that break with every format change. The second throws conversations directly at LLMs with ad-hoc prompts, producing unreliable results and unpredictable costs.
The Typedef Approach: Semantic Operations as DataFrame Primitives
Typedef's infrastructure treats semantic understanding as a native data operation. Rather than bolting AI onto traditional data tools, the platform provides a PySpark-inspired DataFrame API where LLM operations are first-class primitives alongside filter, join, and aggregate.
This inference-first architecture solves the fundamental mismatch between unstructured support data and structured analytics. Developers manipulate transcripts using familiar DataFrame operations while accessing semantic functions like classification, extraction, and sentiment analysis as native capabilities.
The framework handles parsing multiple transcript formats, maintains temporal relationships, batches inference calls efficiently, and provides automatic retry logic and rate limiting. Teams focus on defining what insights to extract rather than managing infrastructure complexity.
Setting Up the Environment
Install Fenic with required dependencies:
```bash
pip install fenic
```
Configure a session with your model provider. The framework supports OpenAI, Anthropic, Google, Cohere, and OpenRouter:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel

config = SessionConfig(
    app_name="support_analytics",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=50000
            )
        },
        default_language_model="gpt4"
    )
)

session = Session.get_or_create(config)
```
This configuration defines throughput limits (requests per minute, tokens per minute) that the framework uses for automatic rate limiting and batching optimization.
Parsing Transcript Formats
Fenic's transcript processing capabilities unify multiple formats into a single schema. The parse_transcript function handles SRT, WebVTT, and generic conversation formats:
```python
import fenic.api.functions as fc
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "support_transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse based on format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into individual rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("segment")
).select(
    fc.col("file_path"),
    fc.col("segment.index").alias("index"),
    fc.col("segment.speaker").alias("speaker"),
    fc.col("segment.start_time").alias("start_time"),
    fc.col("segment.end_time").alias("end_time"),
    fc.col("segment.content").alias("text")
)
```
The unified schema provides:
- index: Entry number for ordering
- speaker: Speaker identifier when available
- start_time / end_time: Temporal boundaries in seconds
- content: Transcript text
- format: Original format type
This standardization eliminates format-specific processing logic downstream. Whether analyzing call recordings, chat sessions, or ticket threads, the same analytics code applies.
Extracting Structured Support Data
Raw transcripts contain implicit structure—issue categories, severity levels, product areas, customer tiers. Schema-driven extraction transforms this unstructured text into typed data:
```python
from pydantic import BaseModel, Field
from typing import Literal

class SupportIssue(BaseModel):
    category: Literal["Technical", "Billing", "Feature Request", "Product Question"]
    severity: Literal["Low", "Medium", "High", "Critical"]
    product_area: str = Field(description="Specific product component or feature")
    resolution_provided: bool

# Extract structured data from conversations
extracted_df = segments_df.group_by("file_path").agg(
    fc.semantic.extract(
        fc.col("text"),
        SupportIssue
    ).alias("issue_data")
).unnest("issue_data")

# Filter and analyze
critical_technical = extracted_df.filter(
    (fc.col("category") == "Technical") & (fc.col("severity") == "Critical")
)
```
The Pydantic schema acts as both documentation and validation. The framework guarantees type-safe results while eliminating brittle prompt engineering. When business requirements change, update the schema rather than tuning prompts.
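For example, if a new requirement calls for tracking the customer's plan tier, the change is confined to the schema. A minimal sketch reusing the extraction pattern above; the SupportIssueV2 name and customer_tier field are illustrative additions, not part of the original pipeline:

```python
from pydantic import BaseModel, Field
from typing import Literal

# Hypothetical revision: the business now wants plan tier captured per conversation.
# Only the schema changes; the semantic.extract call stays identical.
class SupportIssueV2(BaseModel):
    category: Literal["Technical", "Billing", "Feature Request", "Product Question"]
    severity: Literal["Low", "Medium", "High", "Critical"]
    product_area: str = Field(description="Specific product component or feature")
    resolution_provided: bool
    customer_tier: Literal["Free", "Pro", "Enterprise"] = Field(
        description="Customer's plan tier, if mentioned in the conversation"
    )

extracted_v2 = segments_df.group_by("file_path").agg(
    fc.semantic.extract(fc.col("text"), SupportIssueV2).alias("issue_data")
).unnest("issue_data")
```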
Classification for Support Analytics
Support teams need to categorize conversations by topic, urgency, and customer segment. Semantic classification applies these taxonomies at scale:
```python
# Classify support interactions
classified_df = segments_df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("text"),
        ["Technical Issue", "Billing Question", "Feature Request",
         "Product Question", "Account Management", "Bug Report"]
    )
).with_column(
    "urgency",
    fc.semantic.classify(
        fc.col("text"),
        ["Urgent", "Normal", "Low Priority"]
    )
)

# Aggregate by category
category_distribution = classified_df.group_by("category").agg(
    fc.count("*").alias("interaction_count"),
    fc.count(fc.col("file_path")).alias("conversation_count")
)
```
Classification happens in batch with automatic request optimization. The framework deduplicates similar text before inference, batches requests to respect rate limits, and handles retries transparently.
Sentiment Analysis Across Support Interactions
Sentiment trends reveal customer satisfaction patterns and product friction points:
```python
from fenic.core.types import IntegerType

# Add sentiment analysis
sentiment_df = segments_df.with_column(
    "sentiment",
    fc.semantic.analyze_sentiment(fc.col("text"))
)

# Track sentiment by speaker
speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)

# Temporal sentiment analysis (5-minute buckets)
windowed_df = sentiment_df.with_column(
    "time_bucket",
    (fc.col("start_time") / 300).cast(IntegerType) * 300
)

temporal_sentiment = windowed_df.group_by("file_path", "time_bucket").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)).alias("positive"),
    fc.sum(fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)).alias("negative")
)
```
Temporal bucketing reveals sentiment evolution during conversations. A support call that starts negative but ends positive indicates successful issue resolution. Conversely, deteriorating sentiment flags escalation risks.
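As a rough illustration of how to inspect that trajectory, the sketch below reuses the temporal_sentiment DataFrame from above to compute the share of negative segments per bucket for a single conversation; the file path and negative_ratio column name are illustrative.

```python
# A minimal sketch: per-bucket negative share for one conversation, ordered by time
# so a reviewer can see whether the call trends toward resolution or escalation.
call_trajectory = temporal_sentiment.filter(
    fc.col("file_path") == "support_transcripts/2025-01-08/call_042.srt"  # illustrative path
).with_column(
    "negative_ratio",
    fc.col("negative") / fc.col("segment_count")
).order_by(fc.col("time_bucket"))

call_trajectory.show()
```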
Semantic Filtering for Focused Analysis
Traditional filtering requires exact string matches. Semantic predicates enable content-based filtering using natural language:
```python
# Find segments discussing specific issues
payment_issues = segments_df.filter(
    fc.semantic.predicate(
        "This text discusses payment processing, checkout, or billing problems: {{ text }}",
        text=fc.col("text")
    )
)

# Identify escalation language
escalation_segments = segments_df.filter(
    fc.semantic.predicate(
        "This customer is expressing frustration or requesting manager escalation: {{ text }}",
        text=fc.col("text")
    )
)

# Find resolution confirmations
resolved_segments = segments_df.filter(
    fc.semantic.predicate(
        "The customer confirms their issue is resolved or they are satisfied: {{ text }}",
        text=fc.col("text")
    )
)
```
Semantic filtering captures intent that keyword matching misses. "I still can't complete my order" and "the checkout keeps failing" both point to payment issues despite sharing no keywords.
Building Analytics Dashboards with Summarization
Support managers need executive summaries, not raw transcripts. The semantic.reduce operator aggregates conversation data into concise summaries:
```python
# Generate conversation summaries
summaries_df = segments_df.group_by("file_path").agg(
    fc.count("*").alias("segment_count"),
    fc.semantic.reduce(
        "Summarize this support conversation, highlighting the customer's issue, "
        "resolution steps taken, and outcome",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

# Create daily executive summaries
daily_summary = classified_df.filter(
    fc.col("category") == "Technical Issue"
).group_by(
    fc.regexp_extract(fc.col("file_path"), r"(\d{4}-\d{2}-\d{2})", 1).alias("date")
).agg(
    fc.count("*").alias("issue_count"),
    fc.semantic.reduce(
        "Summarize the common technical issues from today's support conversations",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("daily_summary")
)
```
Summaries maintain temporal ordering through the order_by parameter. This ensures logical flow in conversation recaps where context from earlier messages informs later understanding.
Cross-Conversation Analysis with Semantic Joins
Finding patterns across multiple conversations requires semantic similarity matching. Semantic joins connect related issues without exact keyword overlap:
```python
# Extract key issues from different time periods
week1_issues = segments_df.filter(
    fc.col("file_path").like("%2025-01-01%")
).select(
    fc.col("text").alias("week1_text"),
    fc.col("file_path").alias("week1_file")
)

week2_issues = segments_df.filter(
    fc.col("file_path").like("%2025-01-08%")
).select(
    fc.col("text").alias("week2_text"),
    fc.col("file_path").alias("week2_file")
)

# Find recurring issues across weeks
recurring_issues = week1_issues.semantic.join(
    other=week2_issues,
    predicate="""
    Week 1: {{ left_on }}
    Week 2: {{ right_on }}
    These support conversations discuss the same underlying product issue or user problem.
    """,
    left_on=fc.col("week1_text"),
    right_on=fc.col("week2_text")
)
```
This pattern identifies recurring product issues that span multiple customer interactions. Unlike exact duplicate detection, semantic joins catch variations in how different customers describe the same underlying problem.
Real-World Results: RudderStack Case Study
RudderStack reduced triage time by 95% using Typedef's transcript and ticket processing capabilities. Their product surface spans event streaming, transformations, activation, and hundreds of integrations. Support signals arrived from sales calls, tickets, and documentation—mostly unstructured data creating triage backlogs.
The implementation ingested support ticket threads and sales call transcripts into a warehouse-native context layer. A triage agent running in Typedef classifies new requests, links them to prior work, cites strategy documentation, and proposes decisions. Product managers approve in one pass instead of triaging each request manually.
Key results:
- 95% reduction in PM time per triage
- 90%+ first-pass category acceptance
- Prospect and community signals surfaced directly in Linear, with citations for traceability
- Coverage tracking showed 80-90% parsing reliability, providing clear signals when templates needed updates
The warehouse-native architecture kept taxonomy current with documentation changes. Semantic links between issues and PRDs provided explainability and higher accuracy. Linear write-back kept product managers in their existing workflow.
Creating MCP Tools for Agent Integration
The Model Context Protocol integration exposes DataFrame analytics as tools that agents can call:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Save processed data, adding sentiment so the analytics tool below can aggregate it
analytics_df = classified_df.with_column(
    "sentiment",
    fc.semantic.analyze_sentiment(fc.col("text"))
)
analytics_df.write.save_as_table("support_transcripts", mode="overwrite")

# Create search tool
session.catalog.create_tool(
    tool_name="search_support_issues",
    tool_description="Search support conversations by category, severity, or keyword",
    tool_query=session.table("support_transcripts").filter(
        (fc.col("category") == fc.tool_param("category", StringType))
        & (fc.col("text").like(
            fc.lit("%").concat(fc.tool_param("keyword", StringType)).concat(fc.lit("%"))
        ))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(name="category", description="Support category filter", default_value="Technical Issue"),
        ToolParam(name="keyword", description="Search term", default_value=""),
        ToolParam(name="limit", description="Max results", default_value=20)
    ]
)

# Create analytics tool
session.catalog.create_tool(
    tool_name="analyze_category_trends",
    tool_description="Get statistics and trends for a specific support category",
    tool_query=session.table("support_transcripts").filter(
        fc.col("category") == fc.tool_param("category", StringType)
    ).agg(
        fc.count("*").alias("total_issues"),
        fc.sum(fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)).alias("positive_sentiment"),
        fc.sum(fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)).alias("negative_sentiment"),
        fc.semantic.reduce(
            "Summarize common patterns in these support issues",
            fc.col("text"),
            order_by=fc.col("start_time")
        ).alias("trend_summary")
    ),
    tool_params=[
        ToolParam(name="category", description="Support category", default_value="Technical Issue")
    ]
)
```
Launch the MCP server to expose these tools:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get registered tools
tools = session.catalog.list_tools()

# Create and run server
server = create_mcp_server(
    session,
    "SupportAnalytics",
    user_defined_tools=tools,
    concurrency_limit=8
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Agents can now query support analytics using natural language. Questions like "what are the top technical issues this week?" or "show me high-severity billing problems" trigger the appropriate tools and return structured results.
Production Deployment Patterns
Monitoring and Cost Tracking
Track inference costs and performance through built-in metrics:
```python
# Access query metrics
metrics = session.table("fenic_system.query_metrics")

metrics.select(
    fc.col("model"),
    fc.col("latency_ms"),
    fc.col("cost_usd"),
    fc.col("input_tokens"),
    fc.col("output_tokens")
).order_by(fc.col("cost_usd").desc()).show()

# Aggregate by operation
metrics.group_by("operation").agg(
    fc.count("*").alias("call_count"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
).show()
```
Cost visibility prevents budget surprises. Organizations report 54% infrastructure time reduction through semantic automation, with 40% data engineering savings and 44% modeling time reduction.
Error Handling for Production
Handle processing failures gracefully:
```python
# Add error handling: only classify non-empty, substantive text
safe_df = segments_df.with_column(
    "issue_category",
    fc.when(
        fc.col("text").is_not_null() & (fc.length(fc.col("text")) > 10),
        fc.semantic.classify(fc.col("text"), ["Technical", "Billing", "Other"])
    ).otherwise(fc.lit("unclassified"))
)

# Log failures
failed_df = segments_df.filter(
    fc.col("text").is_null() | (fc.length(fc.col("text")) <= 10)
).select(
    fc.col("file_path"),
    fc.lit("Invalid transcript content").alias("error")
)

failed_df.write.csv("processing_errors.csv")
```
Batch Optimization
Control concurrency and batching through model configuration:
```python
config = SessionConfig(
    app_name="support_analytics",
    semantic=SemanticConfig(
        language_models={
            "high_throughput": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,      # higher throughput
                tpm=100000
            )
        }
    )
)
```
The framework automatically batches inference calls based on these limits. For transcript processing at scale, organizations report a 45% reduction in time-to-insight through efficient batching and caching.
Complete Support Analytics Pipeline
Here's a production-ready pipeline combining parsing, classification, sentiment analysis, and tool creation:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel
import fenic.api.functions as fc
from fenic.api.functions import text, semantic

# Setup
config = SessionConfig(
    app_name="support_analytics",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=100000
            )
        }
    )
)
session = Session.get_or_create(config)

# Load and parse transcripts
raw_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

parsed_df = raw_df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.col("file_path"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.start_time").alias("start_time"),
    fc.col("seg.content").alias("text")
)

# Enrich with analytics
enriched_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
).with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical", "Billing", "Feature Request", "Product Question"]
    )
).with_column(
    "urgency",
    semantic.classify(fc.col("text"), ["Critical", "High", "Medium", "Low"])
)

# Generate summaries
summaries_df = enriched_df.group_by("file_path").agg(
    fc.count("*").alias("segment_count"),
    semantic.reduce(
        "Summarize this support conversation",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

# Save for analytics
enriched_df.write.save_as_table("support_analytics", mode="overwrite")
summaries_df.write.csv("support_summaries.csv")

# Create MCP tools
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

session.catalog.create_tool(
    tool_name="query_support_data",
    tool_description="Query support conversations by category and urgency",
    tool_query=session.table("support_analytics").filter(
        (fc.col("category") == fc.tool_param("category", StringType))
        & (fc.col("urgency") == fc.tool_param("urgency", StringType))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(name="category", description="Issue category", default_value="Technical"),
        ToolParam(name="urgency", description="Urgency level", default_value="High"),
        ToolParam(name="limit", description="Max results", default_value=50)
    ]
)

# Launch MCP server
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

server = create_mcp_server(
    session,
    "SupportAnalytics",
    user_defined_tools=session.catalog.list_tools()
)
run_mcp_server_sync(server, transport="http", port=8000)
```
Best Practices for Production
Deduplicate before inference: Remove duplicate text segments before applying semantic operations. This reduces API costs significantly for conversations with repeated phrases or boilerplate language.
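One way to approximate this with only the operations used earlier in this guide is to classify each distinct text once and carry an occurrence count, rather than classifying every repeated segment. This is a sketch, not the framework's built-in deduplication:

```python
# A minimal sketch: classify each distinct text once so repeated boilerplate
# ("Thanks for contacting support!") costs a single inference call.
distinct_texts = segments_df.group_by("text").agg(
    fc.count("*").alias("occurrences")
).with_column(
    "category",
    fc.semantic.classify(fc.col("text"), ["Technical", "Billing", "Other"])
)
```

Joining the resulting labels back to the full segment set is then an ordinary equality join on the text column.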
Cache intermediate results: Save enriched DataFrames to avoid reprocessing. Fenic's explicit caching speeds up iterative development and reduces unnecessary API calls.
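A minimal sketch of that pattern, using only the table APIs shown earlier (the support_analytics_enriched table name is illustrative):

```python
# Run the expensive semantic enrichment once and persist the result.
enriched_df.write.save_as_table("support_analytics_enriched", mode="overwrite")

# Later analyses read the saved table; no additional LLM calls are made.
cached = session.table("support_analytics_enriched")
critical_only = cached.filter(fc.col("urgency") == "Critical")
```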
Filter before expensive operations: Apply cheap filters (text length, speaker filters, date ranges) before semantic operations to minimize inference volume:
```python
# Only analyze substantial customer messages
customer_messages = segments_df.filter(
    (fc.col("speaker") == "customer")
    & (fc.length(fc.split(fc.col("text"), " ")) > 5)
).with_column(
    "category",
    semantic.classify(fc.col("text"), ["Technical", "Billing", "Feature"])
)
```
Leverage row-level lineage: When debugging unexpected results, trace individual records through the pipeline to identify where transformations produced incorrect outputs.
Monitor coverage metrics: Track what percentage of transcripts parse successfully. Declining coverage indicates format changes requiring template updates.
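A minimal sketch of such a coverage check, built on the parsed_df DataFrame from the parsing step (the parsed_ok column name is illustrative):

```python
# Count how many loaded files produced parsed segments; a drop in the ratio of
# parsed_files to total_files signals a transcript format change.
coverage = parsed_df.with_column(
    "parsed_ok",
    fc.col("segments").is_not_null()
).agg(
    fc.count("*").alias("total_files"),
    fc.sum(fc.when(fc.col("parsed_ok"), 1).otherwise(0)).alias("parsed_files")
)

coverage.show()
```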
Next Steps
Transcript processing unlocks customer support analytics at scale. By treating semantic operations as DataFrame primitives, teams build reliable pipelines that extract structured insights from conversational data.
The patterns outlined here—parsing multiple formats, schema-driven extraction, semantic classification, and MCP tool exposure—apply equally to sales call analysis, product feedback processing, and compliance monitoring.
For teams building support analytics infrastructure:
- Explore Fenic's GitHub repository for additional examples and documentation
- Review the semantic operators guide for advanced DataFrame operations
- Check Typedef's latest release for new features and optimizations
Start locally with the open-source framework, validate your pipeline on a subset of support data, then deploy to Typedef's cloud for zero-code scaling to production volumes.
