Processing audio transcripts at scale requires robust data infrastructure that handles both structured operations and semantic understanding. Conversational AI systems demand pipelines that parse multiple transcript formats, extract structured information, and generate insights without brittle glue code or manual prompt engineering.
This guide demonstrates how to build production-ready transcript processing pipelines using Fenic, the open-source DataFrame framework from Typedef. The framework treats LLM inference as a first-class operation, enabling deterministic workflows on non-deterministic models.
Understanding Transcript Processing Requirements
Conversational AI applications require specialized capabilities beyond traditional text processing:
Format Handling
- Parse SRT (SubRip Subtitle), WebVTT (Web Video Text Tracks), and generic conversation formats
- Maintain temporal relationships between conversation segments
- Preserve speaker identity through transformations
- Handle timestamp synchronization across different formats
Semantic Analysis
- Extract structured entities and relationships from unstructured dialogue
- Classify conversation segments by topic, intent, or sentiment
- Aggregate insights across multiple speakers and time windows
- Generate summaries that preserve conversation context
Production Reliability
- Batch process hundreds or thousands of transcripts efficiently
- Manage rate limits across multiple model providers
- Implement automatic retry logic for transient failures
- Track lineage for audit trails and debugging
Fenic addresses these requirements through a DataFrame API where inference operations receive the same optimization treatment as traditional data operations.
Setting Up the Processing Environment
Install Fenic with Python 3.10, 3.11, or 3.12:
```bash
pip install fenic
```
Configure model providers through environment variables:
```bash
export OPENAI_API_KEY="your-openai-api-key"
export ANTHROPIC_API_KEY="your-anthropic-api-key"
export GOOGLE_API_KEY="your-google-api-key"
```
Initialize a session with semantic configuration:
```python
import fenic as fc
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, AnthropicLanguageModel

config = SessionConfig(
    app_name="transcript_processor",
    semantic=SemanticConfig(
        language_models={
            "mini": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            ),
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=100_000
            )
        },
        default_language_model="mini"
    )
)

session = Session.get_or_create(config)
```
This configuration establishes model aliases that abstract provider-specific details. The rate limiting parameters (rpm for requests per minute, tpm for tokens per minute) keep requests within provider quotas, while Fenic batches calls and self-throttles to maximize throughput.
Parsing Transcript Formats
Fenic provides native support for three transcript formats through the parse_transcript function.
Supported Format Specifications
SRT Format (SubRip Subtitle)
```
1
00:00:01,000 --> 00:00:05,000
First line of dialogue

2
00:00:05,500 --> 00:00:10,000
Second line of dialogue
```
WebVTT Format (Web Video Text Tracks)
```
WEBVTT

Speaker 1
00:00:01.000 --> 00:00:05.000
First line of dialogue

Speaker 2
00:00:05.500 --> 00:00:10.000
Second line of dialogue
```
Generic Conversation Format
```
[00:00:01] Speaker 1: First line of dialogue
[00:00:05] Speaker 2: Second line of dialogue
```
Unified Parsing Implementation
```python
import fenic.api.functions as fc
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("segment")
).select(
    fc.col("file_path"),
    fc.col("segment.index").alias("index"),
    fc.col("segment.speaker").alias("speaker"),
    fc.col("segment.start_time").alias("start_time"),
    fc.col("segment.end_time").alias("end_time"),
    fc.col("segment.content").alias("text")
)
```
The unified schema provides consistent access to:
- index: Entry number (1-based indexing)
- speaker: Speaker identifier when available
- start_time: Start timestamp in floating-point seconds
- end_time: End timestamp in seconds
- duration: Calculated segment duration
- content: Transcript text content
- format: Original format type for lineage tracking
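These fields combine naturally with standard DataFrame aggregations. As a minimal usage sketch (building on the segments_df constructed above and using only operations already shown), total speaking time per speaker can be derived from the start and end timestamps:

```python
# Total speaking time and segment count per speaker,
# computed from the unified schema fields above
speaking_time = segments_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(fc.col("end_time") - fc.col("start_time")).alias("speaking_seconds")
)

speaking_time.show()
```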
Applying Semantic Operations to Transcripts
Fenic's semantic operators enable content understanding without custom inference code.
Classification of Conversation Segments
Categorize transcript segments by topic or intent using semantic.classify:
```python
from fenic.api.functions import semantic

# Classify support call segments
classified_df = segments_df.with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical Issue", "Billing Question", "Feature Request", "General Inquiry"]
    )
)
```
For more precise categorization, use ClassDefinition objects with descriptions:
```python
from fenic.core.types.classify import ClassDefinition

categories = [
    ClassDefinition(
        label="Technical Issue",
        description="Problems with product functionality, bugs, or technical difficulties"
    ),
    ClassDefinition(
        label="Billing Question",
        description="Questions about charges, payments, or subscriptions"
    ),
    ClassDefinition(
        label="Feature Request",
        description="Requests for new capabilities or improvements"
    ),
    ClassDefinition(
        label="General Inquiry",
        description="General questions or non-technical assistance"
    )
]

classified_df = segments_df.with_column(
    "category",
    semantic.classify(fc.col("text"), categories)
)
```
Sentiment Analysis Across Conversations
Analyze emotional tone with semantic.analyze_sentiment:
```python
# Add sentiment scores
sentiment_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by speaker
speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)
```
The function returns one of three values: 'positive', 'negative', or 'neutral', enabling straightforward aggregation and filtering.
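One straightforward application is routing negative segments to a review queue. The sketch below builds on the sentiment_df defined above and uses only operations already introduced:

```python
# Pull out negative segments for manual quality review
negative_segments = sentiment_df.filter(
    fc.col("sentiment") == "negative"
).select(
    fc.col("file_path"),
    fc.col("speaker"),
    fc.col("text")
)

negative_segments.show()
```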
Structured Information Extraction
Extract structured entities using semantic.extract with Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List

class ActionItem(BaseModel):
    task: str = Field(description="The action to be taken")
    assignee: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date if mentioned")

# Extract action items from meeting segments
actions_df = segments_df.select(
    fc.col("file_path"),
    fc.col("text"),
    semantic.extract(
        fc.col("text"),
        ActionItem
    ).alias("action_item")
).filter(
    fc.col("action_item").is_not_null()
)
```
The Pydantic schema provides type-safe extraction, with field descriptions guiding the model. Fenic handles schema validation, error handling, and retries automatically, eliminating manual JSON parsing.
Conversation Summarization
Generate summaries using semantic.reduce for aggregation:
```python
# Summarize each file's transcript
summary_df = segments_df.group_by("file_path").agg(
    semantic.reduce(
        "Summarize this conversation highlighting key decisions and outcomes",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)
```
The order_by parameter preserves temporal sequence, ensuring summaries respect conversation flow rather than processing segments in arbitrary order.
Semantic Filtering with Natural Language
Filter transcripts using natural language predicates:
```python
# Find segments discussing specific topics
relevant_df = segments_df.filter(
    semantic.predicate(
        "This text discusses product features or roadmap planning: {{ text }}",
        text=fc.col("text")
    )
)
```
The predicate evaluates per row, allowing complex content-based filtering without regex patterns or keyword matching.
Building Production Transcript Analysis Pipelines
Multi-File Processing Pattern
Process directories of transcripts with format detection:
```python
# Load all transcripts
all_transcripts = session.read.docs(
    ["meetings/**/*.srt", "calls/**/*.vtt"],
    content_type="markdown",
    recursive=True
)

# Parse based on file extension
parsed = all_transcripts.with_column(
    "format",
    fc.when(fc.col("file_path").like("%.srt"), fc.lit("srt"))
    .when(fc.col("file_path").like("%.vtt"), fc.lit("webvtt"))
    .otherwise(fc.lit("generic"))
).with_column(
    "segments",
    text.parse_transcript(fc.col("content"), fc.col("format"))
)

# Enrich with metadata
enriched = parsed.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.regexp_extract(
        fc.col("file_path"),
        r"(\d{4}-\d{2}-\d{2})",
        1
    ).alias("date"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.content").alias("text"),
    semantic.classify(
        fc.col("seg.content"),
        ["Question", "Answer", "Statement", "Action Item"]
    ).alias("utterance_type")
)
```
Temporal Analysis Implementation
Track sentiment changes over time with windowed aggregation:
```python
# Add time windows (5-minute buckets)
windowed_df = segments_df.with_column(
    "time_bucket",
    (fc.col("start_time") / 300).cast("integer") * 300
).with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by time window
temporal_sentiment = windowed_df.group_by("file_path", "time_bucket").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative")
)
```
This pattern enables identification of conversation tone shifts, useful for quality monitoring in customer support or sales calls.
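One way to surface those shifts is to flag windows where negative segments outnumber positive ones. This is a sketch built on the temporal_sentiment DataFrame above; the simple count comparison is illustrative and can be replaced with whatever threshold fits your monitoring needs:

```python
# Flag 5-minute windows where the tone turns predominantly negative
tone_shifts = temporal_sentiment.filter(
    fc.col("negative") > fc.col("positive")
).select(
    fc.col("file_path"),
    fc.col("time_bucket"),
    fc.col("positive"),
    fc.col("negative")
).order_by(fc.col("time_bucket"))

tone_shifts.show()
```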
Speaker Diarization Enhancement
Enrich transcripts with speaker metadata through fuzzy joins:
```python
from fenic.api.functions import text as text_fn

# speakers_df is assumed to exist with speaker_name, full_name, and department columns

# Fuzzy join on speaker names
joined_df = segments_df.join(
    speakers_df,
    text_fn.compute_fuzzy_ratio(
        fc.col("speaker"),
        speakers_df["speaker_name"],
        method="jaro_winkler"
    ) > 85,
    "left"
).select(
    fc.col("file_path"),
    fc.coalesce(speakers_df["full_name"], fc.col("speaker")).alias("speaker"),
    speakers_df["department"].alias("department"),
    fc.col("text"),
    fc.col("start_time")
)
```
Fuzzy string matching with six algorithms (Levenshtein, Jaro-Winkler, and others) handles variations in speaker name formatting without manual mapping.
Cross-Transcript Semantic Relationships
Link related discussion points across different transcripts:
```python
# Extract key topics from each transcript
topics_df1 = segments_df.filter(
    fc.col("file_path").like("%meeting_1%")
).select(
    fc.col("text").alias("meeting_1_text")
)

topics_df2 = segments_df.filter(
    fc.col("file_path").like("%meeting_2%")
).select(
    fc.col("text").alias("meeting_2_text")
)

# Semantic join to find related discussions
related = topics_df1.semantic.join(
    other=topics_df2,
    predicate="""
    Meeting 1: {{ left_on }}
    Meeting 2: {{ right_on }}
    These segments discuss the same topic or decision.
    """,
    left_on=fc.col("meeting_1_text"),
    right_on=fc.col("meeting_2_text")
)
```
Semantic joins determine matches based on meaning rather than exact values, ideal for tracking decisions across multiple meetings or identifying recurring themes.
Creating MCP Tools for Agent Integration
The Model Context Protocol enables AI agents to access transcript analysis without leaving their execution context. Fenic's declarative tool support transforms DataFrame queries into reusable tools.
Declarative Tool Registration
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Save processed transcripts as a table
segments_df.write.save_as_table("transcripts", mode="overwrite")

# Create a search tool
session.catalog.create_tool(
    tool_name="search_transcripts",
    tool_description="Search transcript segments by keyword or phrase",
    tool_query=session.table("transcripts").filter(
        fc.col("text").like(fc.lit("%").concat(
            fc.tool_param("search_term", StringType)
        ).concat(fc.lit("%")))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(
            name="search_term",
            description="Keyword or phrase to search for",
            default_value=""
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=10
        )
    ],
    result_limit=50
)

# Create an analytics tool
session.catalog.create_tool(
    tool_name="analyze_speaker",
    tool_description="Get statistics and sentiment for a specific speaker",
    tool_query=session.table("transcripts").filter(
        fc.col("speaker") == fc.tool_param("speaker_name", StringType)
    ).agg(
        fc.count("*").alias("total_segments"),
        fc.avg(fc.col("end_time") - fc.col("start_time")).alias("avg_segment_duration"),
        semantic.reduce(
            "Summarize this speaker's main points and communication style",
            fc.col("text"),
            order_by=fc.col("start_time")
        ).alias("summary")
    ),
    tool_params=[
        ToolParam(
            name="speaker_name",
            description="Name or identifier of the speaker",
            default_value=""
        )
    ]
)
```
MCP Server Deployment
Launch the server to expose tools:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all registered tools
tools = session.catalog.list_tools()

# Create server
server = create_mcp_server(
    session,
    "TranscriptServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

# Run with HTTP transport
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Agents can now query transcript data without expensive inference in user-facing paths, providing predictable response times while leveraging Fenic's batch processing capabilities.
Optimizing Performance and Costs
Batch Size Configuration
Control concurrency through model configuration:
```python
config = SessionConfig(
    app_name="transcript_processor",
    semantic=SemanticConfig(
        language_models={
            "fast_model": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,  # Higher throughput
                tpm=200_000
            )
        }
    )
)
```
Fenic automatically batches inference calls, but rate limit parameters ensure operations stay within provider constraints while maximizing throughput.
Selective Processing Strategy
Filter before applying expensive operations:
```python
# Only analyze segments longer than 10 words
filtered_df = segments_df.filter(
    fc.length(fc.split(fc.col("text"), " ")) > 10
).with_column(
    "summary",
    semantic.summarize(fc.col("text"))
)
```
The query optimizer pushes filters down to reduce data volume before inference, minimizing costs.
Caching Intermediate Results
Save enriched data to avoid recomputation:
```python
# Save enriched data
enriched_df.write.save_as_table("enriched_transcripts", mode="overwrite")

# Load for subsequent processing
cached_df = session.table("enriched_transcripts")
```
Explicit caching eliminates redundant API calls during iterative development and testing.
Metrics Tracking and Monitoring
Access built-in metrics for optimization:
```python
# Access metrics
metrics = session.table("fenic_system.query_metrics")

metrics.select(
    fc.col("model"),
    fc.col("latency_ms"),
    fc.col("cost_usd"),
    fc.col("input_tokens"),
    fc.col("output_tokens")
).order_by(fc.col("cost_usd").desc()).show()

# Aggregate by operation
metrics.group_by("operation").agg(
    fc.count("*").alias("call_count"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
).show()
```
Built-in token counting and cost tracking provide visibility into resource usage for targeted optimization.
Complete Pipeline Example
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.api.session.config import OpenAILanguageModel, AnthropicLanguageModel
import fenic.api.functions as fc
from fenic.api.functions import text, semantic
from pydantic import BaseModel, Field
from typing import List

# 1. Setup
config = SessionConfig(
    app_name="transcript_analysis",
    semantic=SemanticConfig(
        language_models={
            "mini": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=300,
                tpm=150_000
            )
        }
    )
)
session = Session.get_or_create(config)

# 2. Load and parse transcripts
raw_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

parsed_df = raw_df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.col("file_path"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.start_time").alias("start_time"),
    fc.col("seg.content").alias("text")
)

# 3. Enrich with semantic analysis
enriched_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
).with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical", "Business", "Administrative", "Other"]
    )
)

# 4. Generate summaries per file
summaries_df = enriched_df.group_by("file_path").agg(
    fc.count("*").alias("segment_count"),
    semantic.reduce(
        "Create a concise summary of this conversation",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

# 5. Save results
enriched_df.write.save_as_table("processed_transcripts", mode="overwrite")
summaries_df.write.csv("transcript_summaries.csv")

# 6. Create MCP tools
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

session.catalog.create_tool(
    tool_name="query_transcripts",
    tool_description="Search and filter processed transcripts",
    tool_query=session.table("processed_transcripts").filter(
        fc.col("category") == fc.tool_param("category", StringType)
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(name="category", description="Filter by category", default_value="Technical"),
        ToolParam(name="limit", description="Max results", default_value=20)
    ]
)

# 7. Launch MCP server
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

server = create_mcp_server(
    session,
    "TranscriptAnalysis",
    user_defined_tools=session.catalog.list_tools()
)
run_mcp_server_sync(server, transport="http", port=8000)
```
This complete pipeline demonstrates the key patterns for production transcript processing: parsing, semantic enrichment, aggregation, persistence, and tool creation.
Best Practices for Transcript Processing
Strategic Model Selection
Use smaller models for simple tasks and reserve expensive models for complex reasoning:
```python
# Classification with the cheaper model alias
df = df.with_column(
    "category",
    semantic.classify(fc.col("text"), categories, model_alias="mini")
)

# Complex extraction with the more capable model alias
# (ComplexSchema is a Pydantic model defined elsewhere)
df = df.with_column(
    "detailed_analysis",
    semantic.extract(fc.col("text"), ComplexSchema, model_alias="claude")
)
```
The cost difference between models can reach 100x, so routing simple tasks to smaller models can cut expenses by as much as 80% while maintaining quality on tasks that do not need heavyweight reasoning.
Schema Design with Clear Descriptions
Field descriptions guide extraction accuracy:
```python
class MeetingInsights(BaseModel):
    key_decisions: List[str] = Field(
        description="Major decisions made during the meeting"
    )
    action_items: List[str] = Field(
        description="Specific tasks assigned to team members with deadlines"
    )
    blockers: List[str] = Field(
        description="Obstacles or issues preventing progress"
    )
    participants: List[str] = Field(
        description="Full names of all meeting participants"
    )
```
Clear descriptions with examples and constraints improve extraction accuracy, while Literal types constrain outputs to a fixed set of valid categories, as shown in the sketch below.
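The following sketch illustrates the Literal pattern; the schema and field names are hypothetical additions, not part of the examples above:

```python
from typing import List, Literal
from pydantic import BaseModel, Field

class MeetingInsightsWithPriority(BaseModel):
    key_decisions: List[str] = Field(
        description="Major decisions made during the meeting"
    )
    # Literal constrains the model's output to one of these exact values
    priority: Literal["low", "medium", "high"] = Field(
        description="Overall urgency of the follow-up work discussed"
    )

# Usage mirrors the earlier extraction examples
insights_df = segments_df.with_column(
    "insights",
    semantic.extract(fc.col("text"), MeetingInsightsWithPriority)
)
```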
Incremental Testing with Small Datasets
Validate logic and estimate costs before scaling:
```python
# Development: 100 rows
df_sample = df.limit(100)

result = df_sample.with_column(
    "extracted",
    semantic.extract(fc.col("text"), schema)
).collect()

print(f"Cost for 100 rows: ${result.metrics().lm_metrics.total_cost}")

# Validate results, then scale
df_full.with_column(
    "extracted",
    semantic.extract(fc.col("text"), schema)
).write.parquet("output/")
```
Lazy evaluation and metrics enable cost estimation before processing millions of rows.
Hybrid Fuzzy and Semantic Matching
Use fuzzy matching for initial candidate selection before expensive semantic operations:
```python
# Fast fuzzy scoring for blocking
candidates = (
    left_df.join(right_df)  # Cross join
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Expensive semantic matching on the surviving candidates only.
# Both sides' columns are already present in `candidates` after the cross join,
# so a semantic predicate filter performs the LLM comparison per candidate pair.
final = candidates.filter(
    semantic.predicate(
        "Are these the same company? Left: {{ left }}, Right: {{ right }}",
        left=fc.col("company_description"),
        right=fc.col("business_description")
    )
)
```
This hybrid approach reduces costs by orders of magnitude compared to semantic joins on full cross-products.
Production Deployment Patterns
ASGI Server Integration
Deploy MCP servers in production environments:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

server = create_mcp_server(
    session,
    "TranscriptServer",
    user_defined_tools=session.catalog.list_tools()
)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    path="/mcp"
)

# Launch with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
```
Error Handling Implementation
Handle failures gracefully in production pipelines:
```python
# Add error columns
safe_df = segments_df.with_column(
    "classification",
    fc.when(
        fc.col("text").is_not_null() & (fc.length(fc.col("text")) > 0),
        semantic.classify(fc.col("text"), ["Type A", "Type B"])
    ).otherwise(fc.lit("unclassified"))
)

# Log failures
failed_df = segments_df.filter(
    fc.col("text").is_null()
).select(
    fc.col("file_path"),
    fc.lit("Null text content").alias("error")
)

failed_df.write.csv("processing_errors.csv")
```
Built-in retry logic and self-throttling handle transient failures automatically, but explicit error handling improves debugging and monitoring.
Integration with Existing Systems
Fenic's lakehouse-native architecture reads and writes standard formats without data movement:
```python
# Read from existing data lake
df = session.read.parquet("s3://data-lake/raw/*.parquet")

# Process with semantic operators
processed = df.with_column(
    "extracted",
    semantic.extract(fc.col("content"), schema)
).with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("content"))
)

# Write back to lake
processed.write.parquet("s3://data-lake/processed/")
```
Full compatibility with Parquet, Iceberg, Delta Lake, and Lance enables seamless integration with existing infrastructure. Because Fenic is built on Apache Arrow, processed data interoperates directly with Spark, Polars, DuckDB, and pandas.
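Because the output is plain Parquet, downstream tools can query it in place. The sketch below uses DuckDB with an illustrative local path (s3:// paths also work once DuckDB's httpfs extension is installed and loaded) and assumes the processed dataset written in the previous example:

```python
import duckdb

# Query Fenic's Parquet output directly, without moving data back through Fenic
con = duckdb.connect()
sentiment_counts = con.execute(
    """
    SELECT sentiment, COUNT(*) AS segments
    FROM read_parquet('data-lake/processed/*.parquet')
    GROUP BY sentiment
    ORDER BY segments DESC
    """
).fetchdf()

print(sentiment_counts)
```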
Conclusion
Processing audio transcripts for conversational AI insights requires infrastructure that handles format parsing, semantic understanding, and production reliability without brittle glue code. Fenic provides this foundation by treating inference as a first-class operation within a familiar DataFrame API.
The framework's semantic operators eliminate custom prompt engineering for common tasks while automatic batching, retry logic, and rate limiting ensure production stability. Combined with declarative tool creation through MCP, teams can build sophisticated transcript analysis pipelines that serve real-time agents with predictable performance.
How Typedef Cut RudderStack's Triage Time by 95% demonstrates these capabilities in production, processing thousands of support tickets and logs with semantic extraction and classification. Building agentic applications with declarative DataFrame APIs provides additional patterns for combining Fenic with agent frameworks.
Start with simple operations like semantic.classify or semantic.extract on small datasets, validate results and costs, then scale to production with confidence that the infrastructure handles optimization, error handling, and observability automatically.