
How to Augment LangChain with Typedef.ai for Transcript Processing

Typedef Team

LangChain provides a framework for building applications with language models, but processing transcripts at scale requires robust data infrastructure. Fenic, the open-source DataFrame framework from Typedef.ai, fills this gap by bringing structured batch processing to unstructured transcript data.

This guide shows how to use Fenic as a preprocessing layer for LangChain applications, leverage the Model Context Protocol for agent integration, and process transcripts efficiently with semantic operations.

Why Augment LangChain with Fenic

LangChain excels at orchestrating language model interactions, but transcript processing introduces specific challenges:

Scale and Efficiency

  • Batch processing hundreds or thousands of transcripts
  • Managing rate limits across multiple model providers
  • Concurrent inference without manual async coordination

Data Structure

  • Parsing multiple transcript formats (SRT, WebVTT, generic)
  • Maintaining temporal relationships in conversation data
  • Joining transcript segments with metadata

Production Reliability

  • Deterministic transformations for testing and debugging
  • Query lineage for audit trails
  • Automatic retry logic and error handling

Fenic addresses these requirements with a DataFrame API that handles inference as a first-class operation.

Architecture Patterns

Pattern 1: Preprocessing Layer

Use Fenic to prepare transcript data before LangChain processing:

Transcripts → Fenic (parse, structure, enrich) → LangChain (orchestration, decisions)

Fenic handles batch operations while LangChain manages interactive flows.

Pattern 2: MCP Integration

Expose Fenic operations as Model Context Protocol tools that LangChain agents can call:

LangChain Agent → MCP Server (Fenic tools) → Structured Data

Agents access transcript analysis without leaving their execution context.

Pattern 3: Hybrid Processing

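Combine both approaches: Fenic enriches transcripts in batch and stores the results, and LangChain agents query them interactively through MCP tools: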

Fenic (batch enrichment) → Data Store → LangChain (interactive) ↔ MCP (Fenic tools)

Setting Up the Environment

Install Fenic with required dependencies:

bash
pip install fenic

Configure a session with your model providers:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.core.types.inference.openai import OpenAILanguageModel

config = SessionConfig(
    app_name="transcript_processor",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=10000
            )
        }
    )
)

session = Session.get_or_create(config)

Parsing Transcript Formats

Fenic's parse_transcript function converts multiple formats into a unified schema.

Supported Formats

SRT (SubRip Subtitle)

1
00:00:01,000 --> 00:00:05,000
First line of dialogue

2
00:00:05,500 --> 00:00:10,000
Second line of dialogue

WebVTT (Web Video Text Tracks)

WEBVTT

Speaker 1
00:00:01.000 --> 00:00:05.000
First line of dialogue

Speaker 2
00:00:05.500 --> 00:00:10.000
Second line of dialogue

Generic Conversation

[00:00:01] Speaker 1: First line of dialogue
[00:00:05] Speaker 2: Second line of dialogue
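To make the line structure concrete, here is a minimal standard-library sketch that parses the generic conversation layout shown above into a start time in seconds, a speaker, and the spoken text. It only illustrates the layout and is not Fenic's parser; the name `parse_generic_line` is hypothetical.

```python
import re
from typing import Optional

# Matches lines such as "[00:00:05] Speaker 2: Second line of dialogue".
GENERIC_LINE = re.compile(r"^\[(\d{2}):(\d{2}):(\d{2})\]\s*([^:]+):\s*(.*)$")


def parse_generic_line(line: str) -> Optional[dict]:
    """Return speaker, start time (seconds), and text, or None if the line does not match."""
    match = GENERIC_LINE.match(line.strip())
    if match is None:
        return None
    hours, minutes, seconds, speaker, content = match.groups()
    start = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
    return {
        "speaker": speaker.strip(),
        "start_time": float(start),
        "content": content,
    }


parsed = parse_generic_line("[00:00:05] Speaker 2: Second line of dialogue")
print(parsed)
```

Lines that do not follow the bracketed-timestamp layout return `None`, so a caller can skip or log them rather than fail.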

Parsing Implementation

python
import fenic.api.functions as fc
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("segment")
).select(
    fc.col("file_path"),
    fc.col("segment.index").alias("index"),
    fc.col("segment.speaker").alias("speaker"),
    fc.col("segment.start_time").alias("start_time"),
    fc.col("segment.end_time").alias("end_time"),
    fc.col("segment.content").alias("text")
)

The unified schema provides:

  • index: Entry number (1-based)
  • speaker: Speaker identifier (when available)
  • start_time: Start timestamp in seconds
  • end_time: End timestamp in seconds
  • duration: Segment duration
  • content: Transcript text
  • format: Original format type
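As an illustration of how an SRT file maps onto these fields, the following standard-library sketch parses the SRT sample from earlier into dictionaries with the same keys. It is not Fenic's implementation. The helper names are hypothetical, the speaker is set to `None` because plain SRT carries no speaker labels, and duration is assumed to be end minus start in seconds.

```python
import re
from typing import List

# Accepts both "," (SRT) and "." (WebVTT) as the millisecond separator.
TIMING = re.compile(
    r"(\d{2}):(\d{2}):(\d{2})[,.](\d{3})\s*-->\s*(\d{2}):(\d{2}):(\d{2})[,.](\d{3})"
)


def to_seconds(hours: str, minutes: str, seconds: str, millis: str) -> float:
    return int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(millis) / 1000


def parse_srt(raw: str) -> List[dict]:
    segments = []
    # SRT entries are separated by blank lines.
    for block in re.split(r"\n\s*\n", raw.strip()):
        lines = block.strip().splitlines()
        if len(lines) < 3 or not lines[0].strip().isdigit():
            continue  # Skip malformed entries
        match = TIMING.search(lines[1])
        if match is None:
            continue
        parts = match.groups()
        start, end = to_seconds(*parts[:4]), to_seconds(*parts[4:])
        segments.append({
            "index": int(lines[0]),
            "speaker": None,
            "start_time": start,
            "end_time": end,
            "duration": end - start,
            "content": " ".join(lines[2:]),
            "format": "srt",
        })
    return segments


SAMPLE = """1
00:00:01,000 --> 00:00:05,000
First line of dialogue

2
00:00:05,500 --> 00:00:10,000
Second line of dialogue
"""

segments = parse_srt(SAMPLE)
for segment in segments:
    print(segment)
```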

Semantic Operations on Transcripts

Fenic provides semantic functions that apply language models declaratively within DataFrame operations.

Classification

Categorize transcript segments by topic or intent:

python
from fenic.api.functions import semantic

# Classify support call segments
classified_df = segments_df.with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical Issue", "Billing Question", "Feature Request", "General Inquiry"]
    )
)

Sentiment Analysis

Analyze emotional tone across conversations:

python
# Add sentiment scores
sentiment_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by speaker
speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)
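The conditional sums in the aggregation above count matching rows per group. A plain-Python sketch of the same logic on a few hypothetical rows may make the result shape easier to picture. It assumes the sentiment column holds labels such as "positive" and "negative", as the comparison in the query implies.

```python
from collections import defaultdict

# Hypothetical rows standing in for sentiment_df.
rows = [
    {"speaker": "Speaker 1", "sentiment": "positive"},
    {"speaker": "Speaker 1", "sentiment": "negative"},
    {"speaker": "Speaker 1", "sentiment": "positive"},
    {"speaker": "Speaker 2", "sentiment": "neutral"},
]


def sentiment_by_speaker(rows):
    stats = defaultdict(
        lambda: {"segment_count": 0, "positive_count": 0, "negative_count": 0}
    )
    for row in rows:
        entry = stats[row["speaker"]]
        entry["segment_count"] += 1
        # Equivalent to summing 1 when the label matches and 0 otherwise.
        entry["positive_count"] += 1 if row["sentiment"] == "positive" else 0
        entry["negative_count"] += 1 if row["sentiment"] == "negative" else 0
    return dict(stats)


result = sentiment_by_speaker(rows)
print(result)
```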

Extraction

Pull structured information from unstructured dialogue:

python
from pydantic import BaseModel, Field

class ActionItem(BaseModel):
    task: str = Field(description="The action to be taken")
    assignee: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date if mentioned")

# Extract action items from meeting segments
actions_df = segments_df.select(
    fc.col("file_path"),
    fc.col("text"),
    semantic.extract(
        fc.col("text"),
        ActionItem
    ).alias("action_item")
).filter(
    fc.col("action_item").is_not_null()
)

Summarization

Generate concise summaries of conversation sections:

python
# Summarize each file's transcript
summary_df = segments_df.group_by("file_path").agg(
    semantic.reduce(
        "Summarize this conversation highlighting key decisions and outcomes",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

Semantic Filtering

Filter transcripts using natural language predicates:

python
# Find segments discussing specific topics
relevant_df = segments_df.filter(
    semantic.predicate(
        "This text discusses product features or roadmap planning: {{ text }}",
        text=fc.col("text")
    )
)

Creating MCP Tools from DataFrames

The Model Context Protocol allows LangChain agents to call Fenic operations as tools.

Declarative Tool Creation

Register DataFrame queries as reusable tools:

python
from fenic.core.mcp.types import ToolParam

# Save processed transcripts as a table
segments_df.write.save_as_table("transcripts", mode="overwrite")

# Create a search tool
from fenic.core.types import StringType, IntegerType

session.catalog.create_tool(
    tool_name="search_transcripts",
    tool_description="Search transcript segments by keyword or phrase",
    tool_query=session.table("transcripts").filter(
        fc.col("text").like(fc.lit("%").concat(
            fc.tool_param("search_term", StringType)
        ).concat(fc.lit("%")))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(
            name="search_term",
            description="Keyword or phrase to search for",
            default_value=""
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=10
        )
    ],
    result_limit=50
)

# Create an analytics tool
session.catalog.create_tool(
    tool_name="analyze_speaker",
    tool_description="Get statistics and sentiment for a specific speaker",
    tool_query=session.table("transcripts").filter(
        fc.col("speaker") == fc.tool_param("speaker_name", StringType)
    ).agg(
        fc.count("*").alias("total_segments"),
        fc.avg(fc.col("end_time") - fc.col("start_time")).alias("avg_segment_duration"),
        semantic.reduce(
            "Summarize this speaker's main points and communication style",
            fc.col("text"),
            order_by=fc.col("start_time")
        ).alias("summary")
    ),
    tool_params=[
        ToolParam(
            name="speaker_name",
            description="Name or identifier of the speaker",
            default_value=""
        )
    ]
)

Running an MCP Server

Launch the server to expose tools to LangChain:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all registered tools
tools = session.catalog.list_tools()

# Create server
server = create_mcp_server(
    session,
    "TranscriptServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

# Run with HTTP transport
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

LangChain Integration via MCP

Connect LangChain agents to the MCP server:

python
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
import requests

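# Create wrapper for Fenic MCP tool.
# Note: the request body below is simplified for illustration. MCP itself
# exchanges JSON-RPC 2.0 messages, so production code should use an MCP client.
def call_fenic_tool(tool_name: str, **params):
    response = requests.post(
        "http://127.0.0.1:8000/mcp",
        json={
            "tool": tool_name,
            "parameters": params
        },
        timeout=30
    )
    response.raise_for_status()  # Surface HTTP errors instead of parsing an error body
    return response.json()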

# Define LangChain tools
langchain_tools = [
    Tool(
        name="search_transcripts",
        func=lambda q, limit=10: call_fenic_tool(
            "search_transcripts",
            search_term=q,
            limit=limit
        ),
        description="Search transcript segments by keyword or phrase"
    ),
    Tool(
        name="analyze_speaker",
        func=lambda speaker: call_fenic_tool(
            "analyze_speaker",
            speaker_name=speaker
        ),
        description="Get statistics and analysis for a specific speaker"
    )
]

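# Create agent; the prompt must include an agent_scratchpad placeholder
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
    ("system", "You answer questions about meeting transcripts using the available tools."),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])
llm = ChatOpenAI(model="gpt-4")
agent = create_openai_functions_agent(llm, langchain_tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=langchain_tools)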

# Agent can now call Fenic tools
result = agent_executor.invoke({
    "input": "What did Sarah discuss in the meeting?"
})
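One caveat on the HTTP wrapper above: MCP messages are JSON-RPC 2.0, and a tool invocation uses the `tools/call` method with `name` and `arguments` fields. The sketch below builds such a request body with the standard library only. The helper name `build_tool_call` is hypothetical, and a real client also has to handle session setup and response parsing, which an MCP client library does for you.

```python
import json
from itertools import count

# JSON-RPC requests carry an id so responses can be matched to requests.
_request_ids = count(1)


def build_tool_call(tool_name: str, **arguments) -> dict:
    """Build a JSON-RPC 2.0 request body for an MCP tools/call invocation."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }


payload = build_tool_call("search_transcripts", search_term="roadmap", limit=5)
print(json.dumps(payload, indent=2))
```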

Advanced Transcript Processing Workflows

Multi-File Analysis

Process directories of transcripts:

python
# Load all transcripts
all_transcripts = session.read.docs(
    ["meetings/**/*.srt", "calls/**/*.vtt"],
    content_type="markdown",
    recursive=True
)

# Parse based on file extension
parsed = all_transcripts.with_column(
    "format",
    fc.when(fc.col("file_path").like("%.srt"), fc.lit("srt"))
    .when(fc.col("file_path").like("%.vtt"), fc.lit("webvtt"))
    .otherwise(fc.lit("generic"))
).with_column(
    "segments",
    text.parse_transcript(fc.col("content"), fc.col("format"))
)

# Enrich with metadata
enriched = parsed.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.regexp_extract(
        fc.col("file_path"),
        r"(\d{4}-\d{2}-\d{2})",
        1
    ).alias("date"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.content").alias("text"),
    semantic.classify(
        fc.col("seg.content"),
        ["Question", "Answer", "Statement", "Action Item"]
    ).alias("utterance_type")
)

Temporal Analysis

Track sentiment changes over time:

python
# Add time windows
windowed_df = segments_df.with_column(
    "time_bucket",
    (fc.col("start_time") / 300).cast("integer") * 300  # 5-minute buckets
).with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by time window
temporal_sentiment = windowed_df.group_by("file_path", "time_bucket").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative")
)
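The bucketing expression above maps every start time to the beginning of its 5-minute window. A short plain-Python check of the arithmetic, using made-up timestamps, shows where the boundaries fall. For non-negative times, floor division gives the same result as dividing and truncating to an integer.

```python
def time_bucket(start_time: float, width: int = 300) -> int:
    """Return the start (in seconds) of the window that contains start_time."""
    return int(start_time // width) * width


# Hypothetical segment start times in seconds.
buckets = {t: time_bucket(t) for t in (0.0, 299.9, 300.0, 754.2)}
for start, bucket in buckets.items():
    print(f"{start:>6} s -> bucket {bucket}")
```

A segment starting at 299.9 s still belongs to the first window, while one starting at exactly 300 s opens the second.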

Speaker Diarization Enhancement

Join speaker metadata with transcript segments:

python
# Load speaker information
speakers_df = session.read.csv("speaker_metadata.csv")

# Fuzzy join on speaker names
from fenic.api.functions import text as text_fn

joined_df = segments_df.join(
    speakers_df,
    text_fn.compute_fuzzy_ratio(
        fc.col("speaker"),
        speakers_df["speaker_name"],
        "jaro_winkler"
    ) > 85,
    "left"
).select(
    fc.col("file_path"),
    fc.coalesce(speakers_df["full_name"], fc.col("speaker")).alias("speaker"),
    speakers_df["department"].alias("department"),
    fc.col("text"),
    fc.col("start_time")
)
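To show what a thresholded fuzzy match does, here is a small standard-library sketch. It deliberately swaps in `difflib.SequenceMatcher` for Jaro-Winkler, so the scores will differ from Fenic's, but the idea is the same: score each candidate on a 0 to 100 scale and keep the best one above the cutoff. The roster names and helper names are hypothetical.

```python
from difflib import SequenceMatcher
from typing import Iterable, Optional


def similarity(a: str, b: str) -> float:
    """Case-insensitive similarity on a 0-100 scale (not Jaro-Winkler)."""
    return 100.0 * SequenceMatcher(None, a.lower(), b.lower()).ratio()


def best_match(name: str, candidates: Iterable[str], threshold: float = 85.0) -> Optional[str]:
    """Return the highest-scoring candidate above the threshold, or None."""
    best, best_score = None, threshold
    for candidate in candidates:
        score = similarity(name, candidate)
        if score > best_score:
            best, best_score = candidate, score
    return best


roster = ["Sarah Johnson", "Samuel Ortiz"]
print(best_match("Sarah Jonson", roster))    # A near-miss spelling
print(best_match("Unknown Caller", roster))  # No candidate is close enough
```

Returning `None` for unmatched names corresponds to the left join above, where segments without a metadata match keep their original speaker label.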

Cross-Transcript Semantic Join

Link related discussion points across different transcripts:

python
# Extract key topics from each transcript
topics_df1 = segments_df.filter(
    fc.col("file_path").like("%meeting_1%")
).select(
    fc.col("text").alias("meeting_1_text")
)

topics_df2 = segments_df.filter(
    fc.col("file_path").like("%meeting_2%")
).select(
    fc.col("text").alias("meeting_2_text")
)

# Semantic join to find related discussions
related = topics_df1.semantic.join(
    other=topics_df2,
    predicate="""
    Meeting 1: {{ left_on }}
    Meeting 2: {{ right_on }}
    These segments discuss the same topic or decision.
    """,
    left_on=fc.col("meeting_1_text"),
    right_on=fc.col("meeting_2_text")
)

Async UDFs for External APIs

Integrate third-party services with concurrent execution:

python
import fenic as fc
from fenic.core.types import StructType, StructField, StringType, IntegerType
import aiohttp

@fc.async_udf(
    return_type=StructType([
        StructField("entity_type", StringType),
        StructField("confidence", IntegerType)
    ]),
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def call_entity_api(text: str) -> dict:
    # Named http_session to avoid confusion with the Fenic session object
    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            "https://api.example.com/entities",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "entity_type": data.get("type"),
                "confidence": data.get("confidence")
            }

# Apply to transcript segments
enriched_df = segments_df.select(
    fc.col("text"),
    call_entity_api(fc.col("text")).alias("entity_info")
)

Performance Optimization

Batch Size Tuning

Fenic automatically batches inference calls. To control throughput, set the rpm (requests per minute) and tpm (tokens per minute) limits in the model configuration:

python
config = SessionConfig(
    app_name="transcript_processor",
    semantic=SemanticConfig(
        language_models={
            "fast_model": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,  # Higher throughput
                tpm=50000
            )
        }
    )
)
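A back-of-the-envelope calculation helps when choosing these limits. The sketch below estimates a lower bound on processing time, assuming the rate limits are the only bottleneck and that every request uses roughly the same number of tokens. The segment count and token figure are hypothetical.

```python
def estimated_minutes(num_requests: int, avg_tokens_per_request: int, rpm: int, tpm: int) -> float:
    """Lower bound on runtime in minutes: the stricter of the two limits wins."""
    minutes_by_requests = num_requests / rpm
    minutes_by_tokens = (num_requests * avg_tokens_per_request) / tpm
    return max(minutes_by_requests, minutes_by_tokens)


# 5,000 segments at about 200 tokens each, with the limits from the config above.
minutes = estimated_minutes(5000, 200, rpm=500, tpm=50000)
print(f"At least {minutes:.0f} minutes")
```

In this example the token limit, not the request limit, determines the runtime, so raising rpm alone would not speed things up.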

Selective Processing

Filter before applying expensive operations:

python
# Only analyze segments longer than 10 words
filtered_df = segments_df.filter(
    fc.length(fc.split(fc.col("text"), " ")) > 10
).with_column(
    "summary",
    semantic.summarize(fc.col("text"))
)

Caching Results

Save intermediate results to avoid recomputation:

python
# Save enriched data
enriched_df.write.save_as_table("enriched_transcripts", mode="overwrite")

# Load for subsequent processing
cached_df = session.table("enriched_transcripts")

Metrics and Monitoring

Track inference costs and latency:

python
# Access built-in metrics
metrics = session.table("fenic_system.query_metrics")

metrics.select(
    fc.col("model"),
    fc.col("latency_ms"),
    fc.col("cost_usd"),
    fc.col("input_tokens"),
    fc.col("output_tokens")
).order_by(fc.col("cost_usd").desc()).show()

# Aggregate by operation
metrics.group_by("operation").agg(
    fc.count("*").alias("call_count"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
).show()

Production Deployment Patterns

ASGI Server Integration

Deploy MCP servers in production environments:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

server = create_mcp_server(
    session,
    "TranscriptServer",
    user_defined_tools=session.catalog.list_tools()
)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    port=8000,
    host="0.0.0.0",
    path="/mcp"
)

# Launch with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

Error Handling

Handle failures gracefully in production pipelines:

python
# Add error columns
safe_df = segments_df.with_column(
    "classification",
    fc.when(
        fc.col("text").is_not_null() & (fc.length(fc.col("text")) > 0),
        semantic.classify(fc.col("text"), ["Type A", "Type B"])
    ).otherwise(fc.lit("unclassified"))
)

# Log failures
failed_df = segments_df.filter(
    fc.col("text").is_null()
).select(
    fc.col("file_path"),
    fc.lit("Null text content").alias("error")
)

failed_df.write.csv("processing_errors.csv")

Example: Complete Transcript Analysis Pipeline

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.core.types.inference.openai import OpenAILanguageModel
import fenic.api.functions as fc
from fenic.api.functions import text, semantic

# 1. Setup
config = SessionConfig(
    app_name="transcript_analysis",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=10000
            )
        }
    )
)
session = Session.get_or_create(config)

# 2. Load and parse transcripts
raw_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

parsed_df = raw_df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.col("file_path"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.start_time").alias("start_time"),
    fc.col("seg.content").alias("text")
)

# 3. Enrich with semantic analysis
enriched_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
).with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical", "Business", "Administrative", "Other"]
    )
)

# 4. Generate summaries per file
summaries_df = enriched_df.group_by("file_path").agg(
    fc.count("*").alias("segment_count"),
    semantic.reduce(
        "Create a concise summary of this conversation",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

# 5. Save results
enriched_df.write.save_as_table("processed_transcripts", mode="overwrite")
summaries_df.write.csv("transcript_summaries.csv")

# 6. Create MCP tools for LangChain
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

session.catalog.create_tool(
    tool_name="query_transcripts",
    tool_description="Search and filter processed transcripts",
    tool_query=session.table("processed_transcripts").filter(
        fc.col("category") == fc.tool_param("category", StringType)
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(name="category", description="Filter by category", default_value="Technical"),
        ToolParam(name="limit", description="Max results", default_value=20)
    ]
)

# 7. Launch MCP server
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

server = create_mcp_server(
    session,
    "TranscriptAnalysis",
    user_defined_tools=session.catalog.list_tools()
)

run_mcp_server_sync(server, transport="http", port=8000)

Conclusion

Fenic brings production-grade data processing to transcript analysis. By handling parsing, semantic enrichment, and batch inference within a DataFrame API, it provides the structured preprocessing layer that LangChain applications need.

The Model Context Protocol integration gives agents direct access to transcript tools, while built-in semantic operations such as classification, extraction, and summarization reduce the need for custom prompt engineering on common tasks. For teams building LangChain applications at scale, Fenic offers a path to reliable, efficient transcript processing.
