How to Leverage Fenic DataFrames in LangChain for Multimodal Data Integration

Typedef Team

Building production LLM applications means handling diverse data types simultaneously—text documents, audio transcripts, images, structured databases, and video content. LangChain provides orchestration for agent logic, but managing multimodal data at scale requires specialized infrastructure. Fenic offers a DataFrame-based data layer that preprocesses, enriches, and structures multimodal content before agents consume it.

The Multimodal Data Infrastructure Problem

Most valuable enterprise data exists across multiple modalities. Text documents come in markdown, PDFs, HTML, and plain text. Audio files need transcription with speaker identification. Images require vision model analysis. Structured data lives in CSVs, JSON, and databases. Video combines temporal, visual, and audio components.

Traditional data stacks fail at multimodal AI workloads. Query engines treat LLM calls as external black boxes, preventing optimization. Rate limits across model providers require manual coordination. Context window constraints force custom chunking logic. Different modalities need separate processing pipelines. The result: fragile glue code connecting OCR services, transcription APIs, vision models, LLM providers, vector databases, and warehouses.

Fenic solves this through an inference-first architecture where semantic operations are native DataFrame primitives rather than external functions.

Why Fenic for Multimodal LangChain Applications

Fenic provides a PySpark-inspired DataFrame API built specifically for AI workloads. The framework handles three critical capabilities:

Native multimodal types extend beyond standard data formats. MarkdownType parses document structure with operations like header-based chunking. TranscriptType handles SRT, WebVTT, and generic formats with speaker and timestamp awareness. JsonType enables JQ expressions for nested data manipulation. EmbeddingType represents vectors with semantic similarity operations. DocumentPathType loads PDFs and text files directly.

Semantic operations as DataFrame primitives include extraction with Pydantic schemas, classification with few-shot learning, clustering using embeddings, semantic joins based on meaning, and filtering with natural language predicates. The query engine understands when inference happens, enabling automatic batching, intelligent caching, and cost optimization.

Multi-provider orchestration offers unified interfaces for OpenAI, Anthropic, Google, and other providers with declarative rate limiting, automatic retry logic, and provider-specific fallbacks.

The recommended pattern separates concerns. Fenic handles batch preprocessing—loading multimodal data, extracting structure, generating embeddings, and preparing clean context. LangChain agents receive structured data and focus on planning and decision-making.

Session Configuration for Multimodal Workloads

Install Fenic and configure providers:

bash
pip install fenic

Set up a session with multiple model tiers:

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    AnthropicLanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="multimodal_integration",
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            ),
            "reasoning": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            )
        },
        default_language_model="fast",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100_000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

This configuration establishes model tiers for cost optimization—use fast models for classification and extraction, reserve accurate models for complex reasoning.
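
As a rough sketch of what these limits mean in practice, the plain-Python helper below estimates the minimum wall-clock time for a batch under `rpm` and `tpm` caps. It is not part of Fenic: the function name `min_minutes_for_batch` and the 600-token request size are assumptions for illustration, and the estimate ignores retries and network latency.

```python
import math


def min_minutes_for_batch(num_requests: int, tokens_per_request: int,
                          rpm: int, tpm: int) -> int:
    """Lower bound on wall-clock minutes under request and token rate limits."""
    by_requests = math.ceil(num_requests / rpm)
    by_tokens = math.ceil(num_requests * tokens_per_request / tpm)
    # Whichever limit is tighter determines the minimum duration.
    return max(by_requests, by_tokens)


# 10,000 chunks of roughly 600 tokens each on the "fast" tier (rpm=500, tpm=200_000)
fast_minutes = min_minutes_for_batch(10_000, 600, rpm=500, tpm=200_000)

# The same workload on the "reasoning" tier (rpm=100, tpm=100_000)
reasoning_minutes = min_minutes_for_batch(10_000, 600, rpm=100, tpm=100_000)

print(fast_minutes, reasoning_minutes)
```

With these numbers the fast tier is bound by tokens (30 minutes) and the reasoning tier by requests (100 minutes), which hints at which limit to raise first.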

Loading Multimodal Data Sources

Fenic provides specialized readers for different data types:

Text Documents

python
import fenic.api.functions as fc
from fenic.api.functions import col

# Markdown files
docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

# Filter successful loads
docs_df = docs_df.filter(col("error").is_null())

Structured Data

python
from fenic.core.types.schema import Schema, ColumnField
from fenic.core.types.datatypes import StringType, IntegerType

# CSV with explicit schema
csv_df = session.read.csv(
    "data/products.csv",
    schema=Schema([
        ColumnField(name="product_id", data_type=IntegerType),
        ColumnField(name="description", data_type=StringType),
        ColumnField(name="category", data_type=StringType)
    ])
)

# JSON files
json_df = session.read.json("data/logs/*.json")

Audio Transcripts

Fenic's TranscriptType handles multiple formats with unified parsing:

python
from fenic.api.functions import text

# Load transcript files
transcript_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = transcript_df.select(
    col("file_path"),
    text.parse_transcript(col("content"), "srt").alias("segments")
)

# Explode into individual segments
segments_df = parsed_df.select(
    col("file_path"),
    fc.explode(col("segments")).alias("segment")
).select(
    col("file_path"),
    col("segment.speaker").alias("speaker"),
    col("segment.start_time").alias("start_time"),
    col("segment.end_time").alias("end_time"),
    col("segment.content").alias("text")
)
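
To make the segment structure concrete, here is a minimal standard-library sketch of SRT parsing. It is not Fenic's parser: the `parse_srt` helper, the sample cues, and the convention of reading a leading `Name:` as the speaker are all assumptions for illustration.

```python
import re

SRT_SAMPLE = """1
00:00:01,000 --> 00:00:04,500
Alice: Let's review the launch plan.

2
00:00:05,000 --> 00:00:07,250
Bob: The API freeze is on Friday.
"""

TIME_RE = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")


def to_seconds(stamp: str) -> float:
    """Convert an SRT timestamp such as 00:00:04,500 to seconds."""
    h, m, s, ms = (int(g) for g in TIME_RE.fullmatch(stamp.strip()).groups())
    return h * 3600 + m * 60 + s + ms / 1000


def parse_srt(raw: str) -> list[dict]:
    segments = []
    # Cues are separated by blank lines: index, time range, then text lines.
    for block in re.split(r"\n\s*\n", raw.strip()):
        lines = block.splitlines()
        if len(lines) < 3:
            continue  # skip malformed cues
        start, end = lines[1].split("-->")
        body = " ".join(lines[2:])
        # Assumption: a leading "Name:" marks the speaker; plain SRT has no speaker field.
        speaker, sep, content = body.partition(":")
        if not sep:
            speaker, content = None, body
        segments.append({
            "index": int(lines[0]),
            "speaker": speaker.strip() if speaker else None,
            "start_time": to_seconds(start),
            "end_time": to_seconds(end),
            "content": content.strip(),
        })
    return segments


segments = parse_srt(SRT_SAMPLE)
for segment in segments:
    print(segment)
```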

HuggingFace Datasets

Fenic 0.4.0 added direct HuggingFace integration:

python
# Load from HuggingFace
hf_df = session.read.csv("hf://datasets/squad/default/train.csv")

# Parquet files
hf_parquet = session.read.parquet(
    "hf://datasets/cais/mmlu/astronomy/*.parquet"
)

PDF Metadata

python
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

# Filter by characteristics
relevant_pdfs = pdf_metadata.filter(
    (col("page_count") < 100) &
    (col("page_count") > 10)
)

Text Chunking for Multimodal Context

Proper chunking maintains relationships across modalities. Fenic provides three strategies:

Recursive Token Chunking

Split at natural boundaries while respecting token limits:

python
from fenic.api.functions import text

chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Expand chunks into rows
chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Character-Based Chunking

Fixed-size chunks for consistent processing:

python
chunked_df = docs_df.select(
    col("file_path"),
    text.character_chunk(
        col("content"),
        chunk_size=1000,
        chunk_overlap_percentage=15
    ).alias("chunks")
)

# Expand chunks into rows
chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Word-Based Chunking

Control chunk boundaries with custom delimiters:

python
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_word_chunk(
        col("content"),
        chunk_size=200,
        chunk_overlap_percentage=10,
        chunking_character_set_custom_characters=['\n\n', '\n', '.', ' ']
    ).alias("chunks")
).explode("chunks")
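
The interaction between `chunk_size` and `chunk_overlap_percentage` is easier to see on a toy input. The sketch below splits on whitespace only, which is a simplification of the token, character, and delimiter-based strategies above; `word_chunks` is a hypothetical helper, not a Fenic function.

```python
def word_chunks(text: str, chunk_size: int, overlap_pct: int) -> list[str]:
    """Split text into windows of chunk_size words that share overlap_pct percent."""
    words = text.split()
    overlap = chunk_size * overlap_pct // 100
    step = max(chunk_size - overlap, 1)  # always advance by at least one word
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reaches the end of the text
    return chunks


sample = " ".join(f"w{i}" for i in range(25))
chunks = word_chunks(sample, chunk_size=10, overlap_pct=20)

for chunk in chunks:
    print(chunk)
```

With a 20 percent overlap on 10-word chunks, each chunk repeats the last two words of the previous one, so a sentence that straddles a boundary still appears intact in at least one chunk.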

Semantic Extraction Across Modalities

Extract structured data from unstructured multimodal content using Pydantic schemas:

python
from pydantic import BaseModel, Field
from typing import List, Literal
from fenic.api.functions import semantic

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key features")
    category: Literal["electronics", "furniture", "clothing", "other"]

# Extract from text descriptions
product_df = session.read.csv("product_descriptions.csv")

extracted_df = product_df.select(
    col("product_id"),
    semantic.extract(
        col("description"),
        response_format=ProductInfo
    ).alias("product_data")
)

# Flatten nested structure
structured_df = extracted_df.select(
    col("product_id"),
    col("product_data.name").alias("name"),
    col("product_data.price").alias("price"),
    col("product_data.features").alias("features"),
    col("product_data.category").alias("category")
)

Meeting Transcript Analysis

Extract action items and decisions from audio transcripts:

python
class MeetingInsight(BaseModel):
    speaker: str
    key_points: List[str]
    action_items: List[str]
    sentiment: Literal["positive", "neutral", "negative"]

meeting_analysis = segments_df.with_column(
    "analysis",
    semantic.extract(col("text"), MeetingInsight)
).unnest("analysis")

# Filter segments with action items
actionable = meeting_analysis.filter(
    fc.length(col("action_items")) > 0
)

# Aggregate by speaker
from fenic.api.functions.builtin import collect_list

speaker_summary = actionable.group_by("speaker").agg(
    collect_list("action_items").alias("all_actions"),
    collect_list("key_points").alias("all_points")
)

Embedding Generation for Multimodal Search

Generate embeddings for semantic similarity across data types:

python
from fenic.api.functions import semantic, text

# Generate embeddings for text chunks
embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

# Add token counting
embedded_df = embedded_df.with_column(
    "token_count",
    text.count_tokens(col("chunk_text"))
)

Semantic Similarity Search

Query embeddings to find relevant content:

python
from fenic.api.functions import embedding

# Generate query embedding
query_text = "machine learning deployment strategies"
query_df = session.create_dataframe([{"query": query_text}])

query_embedding = query_df.select(
    semantic.embed(col("query")).alias("query_emb")
).collect()[0]["query_emb"]

# Compute similarity scores
results_df = embedded_df.select(
    col("file_path"),
    col("chunk_text"),
    embedding.compute_similarity(
        col("embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
)

# Get top matches
top_results = results_df.order_by(
    col("similarity_score").desc()
).limit(10)
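
For intuition about what the similarity score measures, the following plain-Python sketch computes cosine similarity and a top-k ranking over toy 2-D vectors. The helper names and the sample rows are assumptions for illustration; real embeddings have hundreds or thousands of dimensions.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: list[float], rows: list[dict], k: int) -> list[tuple]:
    """Score every row against the query and keep the k best matches."""
    scored = [(cosine_similarity(query, r["embedding"]), r["chunk_text"]) for r in rows]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:k]


rows = [
    {"chunk_text": "deploying models", "embedding": [1.0, 0.0]},
    {"chunk_text": "office seating", "embedding": [0.0, 1.0]},
    {"chunk_text": "serving at scale", "embedding": [1.0, 1.0]},
]

best = top_k([1.0, 0.0], rows, k=2)
for score, chunk_text in best:
    print(round(score, 3), chunk_text)
```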

Classification for Multimodal Content

Classify content across different modalities:

python
from fenic.api.functions.semantic import ClassDefinition

categories = [
    ClassDefinition(
        label="Technical Documentation",
        description="API docs, architecture guides, technical specifications"
    ),
    ClassDefinition(
        label="Business Content",
        description="Marketing materials, business plans, proposals"
    ),
    ClassDefinition(
        label="Support Material",
        description="FAQs, troubleshooting guides, user support content"
    ),
    ClassDefinition(
        label="Training Content",
        description="Tutorials, educational materials, onboarding guides"
    )
]

classified_df = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.classify(
        col("content"),
        classes=categories
    ).alias("category")
)

Transcript Classification

Categorize conversation segments:

python
# Classify support call segments
call_types = ["Technical Issue", "Billing Question", "Feature Request", "General Inquiry"]

classified_segments = segments_df.with_column(
    "call_type",
    semantic.classify(col("text"), call_types)
)

# Analyze distribution
from fenic.api.functions.builtin import count

type_distribution = classified_segments.group_by("call_type").agg(
    count("*").alias("segment_count")
).order_by(col("segment_count").desc())

Semantic Joins for Cross-Modal Relationships

Join datasets based on meaning rather than exact matches:

python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

# Create examples
examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

# Semantic join between job postings and candidate profiles
# (jobs_df and candidates_df are assumed to be loaded earlier)
matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=col("job_requirements"),
    right_on=col("candidate_skills"),
    examples=examples
)

Cross-Transcript Analysis

Link related discussions across different meeting transcripts:

python
# Extract topics from meeting 1
meeting1_df = segments_df.filter(
    col("file_path").like("%meeting_1%")
).select(
    col("text").alias("meeting1_text")
)

# Extract topics from meeting 2
meeting2_df = segments_df.filter(
    col("file_path").like("%meeting_2%")
).select(
    col("text").alias("meeting2_text")
)

# Find related discussions
related_discussions = meeting1_df.semantic.join(
    other=meeting2_df,
    predicate="""
        Meeting 1: {{ left_on }}
        Meeting 2: {{ right_on }}
        These segments discuss the same topic or decision.
    """,
    left_on=col("meeting1_text"),
    right_on=col("meeting2_text")
)

Semantic Clustering for Pattern Detection

Group similar content across modalities:

python
# Generate embeddings (docs_df is keyed by file_path)
df_with_embeddings = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.embed(col("content")).alias("embeddings")
)

# Cluster documents
clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

# Analyze clusters
from fenic.api.functions.builtin import first

cluster_summary = clustered_df.group_by("cluster_id").agg(
    count("*").alias("document_count"),
    first(col("content")).alias("sample_content")
)
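
The article does not say which clustering algorithm Fenic uses. As an intuition aid only, the sketch below runs a small textbook k-means over toy 2-D vectors to show how embeddings end up with cluster labels and centroids. The naive initialization, the helper names, and the sample points are assumptions for illustration.

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def mean(points):
    """Component-wise mean of a non-empty list of vectors."""
    return [sum(dim) / len(points) for dim in zip(*points)]


def kmeans(points, k, iterations=10):
    centroids = [list(p) for p in points[:k]]  # naive init: the first k points
    labels = [0] * len(points)
    for _ in range(iterations):
        # Assignment step: each point takes the label of its nearest centroid.
        labels = [
            min(range(k), key=lambda c: squared_distance(p, centroids[c]))
            for p in points
        ]
        # Update step: move each centroid to the mean of its members.
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:  # keep the old centroid if a cluster is empty
                centroids[c] = mean(members)
    return labels, centroids


points = [[0.0, 0.1], [5.0, 5.1], [0.2, 0.0], [5.2, 4.9], [0.1, 0.2]]
labels, centroids = kmeans(points, k=2)
print(labels)
print(centroids)
```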

Building Complete Multimodal Pipelines

Combine operations into end-to-end workflows:

python
from fenic.api.functions import text, col, semantic, dt
from fenic.api.functions.builtin import md5

# Load multimodal data
raw_docs = session.read.docs(
    "data/docs/**/*.md",
    content_type="markdown",
    recursive=True
)

# Complete pipeline
final_df = (
    raw_docs
    # Clean text
    .select(
        col("file_path"),
        text.trim(col("content")).alias("content")
    )
    .filter(col("content") != "")
    # Chunk documents
    .select(
        col("file_path"),
        text.recursive_token_chunk(
            col("content"),
            chunk_size=400,
            chunk_overlap_percentage=10
        ).alias("chunks")
    )
    .explode("chunks")
    .select(
        col("file_path"),
        col("chunks").alias("chunk_text")
    )
    # Generate embeddings
    .with_column(
        "embedding",
        semantic.embed(col("chunk_text"))
    )
    # Add metadata
    .with_column(
        "chunk_id",
        # Placeholder only: a token count is not unique; substitute a real hash or ID
        text.count_tokens(col("chunk_text"))
    )
    .with_column(
        "tokens",
        text.count_tokens(col("chunk_text"))
    )
    .with_column(
        "processed_at",
        dt.current_timestamp()
    )
)

# Persist results
final_df.write.save_as_table("multimodal_index", mode="overwrite")

Exporting to LangChain

Convert Fenic-processed data for LangChain consumption:

python
# Save as table
final_df.write.save_as_table("processed_documents", mode="overwrite")

# Retrieve for LangChain
processed_data = session.table("processed_documents")

# Convert to pandas
pandas_df = processed_data.to_pandas()

# Create LangChain documents (langchain.schema is the legacy import path)
from langchain_core.documents import Document

documents = [
    Document(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "chunk_id": row["chunk_id"],
            "tokens": row["tokens"],
            "embedding": row["embedding"]
        }
    )
    for row in pandas_df.to_dict('records')
]

Export to Vector Databases

Prepare data for Pinecone, Weaviate, or similar stores:

python
from fenic.api.functions.builtin import struct

export_df = final_df.select(
    col("chunk_id"),
    col("chunk_text"),
    col("embedding"),
    struct(
        col("file_path"),
        col("tokens"),
        col("processed_at")
    ).alias("metadata")
)

# Export to parquet
export_df.write.parquet("output/vector_db_ready.parquet")

# Or collect for direct insertion
# (vector_db is a placeholder for your vector store client)
vector_records = export_df.collect()

for record in vector_records:
    vector_db.upsert(
        id=record["chunk_id"],
        values=record["embedding"],
        metadata={
            "text": record["chunk_text"],
            "file_path": record["metadata"]["file_path"],
            "tokens": record["metadata"]["tokens"]
        }
    )
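
Upserting one record per call can be slow for large indexes. A common alternative is to group records into batches before sending them. The sketch below shows the batching logic in plain Python; `InMemoryVectorStore` and its `upsert_many` method are hypothetical stand-ins, so check your vector database client for its actual bulk API and batch size limits.

```python
from typing import Iterable, Iterator


def batched(records: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    """Yield lists of at most batch_size records, preserving order."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch


class InMemoryVectorStore:
    """Hypothetical stand-in for a real vector database client."""

    def __init__(self):
        self.rows = {}
        self.calls = 0

    def upsert_many(self, items: list[dict]) -> None:
        self.calls += 1
        for item in items:
            self.rows[item["id"]] = item


records = [
    {"id": f"chunk-{i}", "values": [float(i)], "metadata": {}}
    for i in range(250)
]

store = InMemoryVectorStore()
for batch in batched(records, batch_size=100):
    store.upsert_many(batch)

print(store.calls, len(store.rows))
```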

Model Context Protocol Integration

MCP enables declarative tool creation for LangChain agents:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Create parameterized search tool
search_query = session.table("multimodal_index").filter(
    col("chunk_text").contains(
        fc.tool_param("search_term", StringType)
    )
).limit(fc.tool_param("limit", IntegerType))

session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search processed documents by keyword",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="search_term",
            description="Keyword or phrase to search for"
        ),
        ToolParam(
            name="limit",
            description="Maximum results to return",
            default_value=10,
            has_default=True
        )
    ],
    result_limit=50
)

Multi-Step Semantic Tools

Build tools that chain semantic operations:

python
# Load support tickets
tickets_df = session.read.csv("./data/support_tickets.csv")

# Process with semantic operations
processed_tickets = (
    tickets_df
    .select(
        col("ticket_id"),
        col("subject"),
        col("description"),
        semantic.embed(col("description")).alias("embeddings")
    )
    .semantic.with_cluster_labels(
        by=col("embeddings"),
        num_clusters=15,
        label_column="category_cluster"
    )
)

# Create similarity search tool
similar_tickets_query = processed_tickets.filter(
    embedding.compute_similarity(
        col("embeddings"),
        fc.tool_param("query_vector", col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    col("ticket_id"),
    col("subject"),
    col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a query",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of search query"
        )
    ],
    result_limit=10
)

Deploying MCP Servers

Launch MCP servers for LangChain agent access:

Synchronous Server

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="MultimodalDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production ASGI Deployment

python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000

Async UDFs for External Services

Async UDFs enable concurrent API calls while maintaining DataFrame semantics:

python
import aiohttp
import fenic.api.functions as fc
from fenic.api.functions.builtin import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    # Name the HTTP client http_session so it is not confused with the Fenic session
    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

# Apply to DataFrame (reviews_df is assumed to have review_id and review_text columns)
enriched_df = reviews_df.select(
    col("review_id"),
    col("review_text"),
    analyze_sentiment(col("review_text")).alias("sentiment_analysis")
)
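
The three decorator arguments map onto familiar asyncio patterns. The sketch below shows one way such limits could be enforced with a semaphore, `asyncio.wait_for`, and a retry loop. It illustrates the general technique under stated assumptions and is not Fenic's implementation; `call_with_limits` and `flaky_upper` are hypothetical names.

```python
import asyncio


async def call_with_limits(func, items, max_concurrency, timeout_seconds, num_retries):
    """Run func over items with bounded concurrency, a per-call timeout, and retries."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:  # at most max_concurrency calls in flight
            for attempt in range(num_retries + 1):
                try:
                    return await asyncio.wait_for(func(item), timeout_seconds)
                except Exception:
                    if attempt == num_retries:
                        return None  # this sketch gives up and returns None

    # gather preserves input order, so results line up with items
    return await asyncio.gather(*(run_one(item) for item in items))


attempts = {}


async def flaky_upper(text: str) -> str:
    # Hypothetical service that fails on the first attempt for each input.
    attempts[text] = attempts.get(text, 0) + 1
    await asyncio.sleep(0.01)
    if attempts[text] == 1:
        raise RuntimeError("transient failure")
    return text.upper()


results = asyncio.run(
    call_with_limits(
        flaky_upper,
        ["good", "bad"],
        max_concurrency=2,
        timeout_seconds=1,
        num_retries=2,
    )
)
print(results)
```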

Performance Optimization

Batch Processing

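Fenic batches inference requests automatically, so a large corpus can be embedded with a single declarative query:

python
large_df = session.read.docs(
    "data/large_corpus/**/*.md",
    content_type="markdown",
    recursive=True
)

# Embed every document; requests are scheduled within the configured rate limits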
processed_df = large_df.select(
    col("file_path"),
    semantic.embed(col("content")).alias("embedding")
)

# Write results
processed_df.write.parquet(
    "output/embeddings.parquet",
    mode="overwrite"
)

Caching Expensive Operations

Materialize intermediate results:

python
# Cache expensive semantic operations
embeddings_df = documents_df.select(
    col("doc_id"),
    semantic.embed(col("content")).alias("embeddings")
)

# Persist to catalog
embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

# Reuse in multiple tools
cached_embeddings = session.table("document_embeddings")

Rate Limit Management

Configure per-model limits in session config:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500_000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100_000
            )
        },
        default_language_model="fast"
    )
)

Monitoring and Metrics

Track query performance and costs:

python
# Query metrics table
metrics_df = session.table("fenic_system.query_metrics")

# Analyze recent queries
recent_queries = metrics_df.select(
    col("query_id"),
    col("total_lm_cost"),
    col("total_lm_requests"),
    col("end_ts")
).order_by(col("end_ts").desc()).limit(10)

recent_queries.show()

# Aggregate costs by time window
cost_analysis = session.sql("""
    SELECT
        DATE_TRUNC('hour', CAST(end_ts AS TIMESTAMP)) as hour,
        SUM(total_lm_cost) as total_cost,
        SUM(total_lm_requests) as total_requests
    FROM {metrics}
    GROUP BY hour
    ORDER BY hour DESC
""", metrics=metrics_df)

cost_analysis.show()

Production Best Practices

Chunk Size Selection

Choose sizes based on model context windows:

python
# For 8K context models
chunk_size = 400  # tokens
overlap = 10  # percent

# For 128K context models
chunk_size = 2000  # tokens
overlap = 5  # percent
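
A quick way to sanity-check these sizes is to compute how many retrieved chunks fit in the context window once the prompt and the expected answer are accounted for. The helper below is a plain-Python sketch; the prompt and answer budgets are assumed values, not recommendations from the Fenic documentation.

```python
def chunks_that_fit(context_window: int, chunk_size: int,
                    prompt_tokens: int, answer_tokens: int) -> int:
    """Number of whole chunks that fit after reserving prompt and answer tokens."""
    available = context_window - prompt_tokens - answer_tokens
    return max(available // chunk_size, 0)


# 8K context model with 400-token chunks
small = chunks_that_fit(8_000, chunk_size=400, prompt_tokens=500, answer_tokens=1_000)

# 128K context model with 2,000-token chunks
large = chunks_that_fit(128_000, chunk_size=2_000, prompt_tokens=500, answer_tokens=4_000)

print(small, large)
```

Under these assumptions the 8K model has room for 16 chunks and the 128K model for 61, so larger chunks on the bigger model still leave more retrieval slots.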

Error Handling

Handle processing failures gracefully:

python
from fenic.api.functions.builtin import when, lit

processed_df = docs_df.with_column(
    "is_valid",
    when(col("error").is_null(), lit(True)).otherwise(lit(False))
)

# Separate successful and failed
success_df = processed_df.filter(col("is_valid"))
failed_df = processed_df.filter(~col("is_valid"))

# Log failures
failed_df.select(
    col("file_path"),
    col("error")
).write.csv("processing_errors.csv")

Incremental Processing

Avoid reprocessing existing data:

python
# At the end of each run, append the newly processed rows to the saved state
processed_df.write.save_as_table("processed_documents", mode="append")

# On the next run, load the paths that were already processed
existing = session.table("processed_documents")
existing_paths = existing.select(col("file_path")).distinct().collect()
processed_paths = {row["file_path"] for row in existing_paths}

# Keep only documents that have not been processed yet
new_docs = docs_df.filter(
    ~col("file_path").is_in(list(processed_paths))
)
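
Filtering by path alone skips files that were edited after their first run. One possible refinement, sketched here in plain Python, is to store a content hash per path and reprocess a file when the hash changes. The manifest dictionary and the helper names are assumptions for illustration, not a Fenic feature.

```python
import hashlib


def content_hash(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def select_new_or_changed(docs: list[dict], manifest: dict) -> list[dict]:
    """Return docs whose path is unseen or whose content hash differs."""
    return [
        doc for doc in docs
        if manifest.get(doc["file_path"]) != content_hash(doc["content"])
    ]


# Manifest saved by a previous run: file path -> content hash
manifest = {"a.md": content_hash("alpha"), "b.md": content_hash("beta")}

docs = [
    {"file_path": "a.md", "content": "alpha"},          # unchanged
    {"file_path": "b.md", "content": "beta, revised"},  # edited since last run
    {"file_path": "c.md", "content": "gamma"},          # new file
]

todo = select_new_or_changed(docs, manifest)
print([doc["file_path"] for doc in todo])
```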

Complete Integration Example

End-to-end multimodal pipeline with LangChain integration:

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from fenic.api.functions import semantic, text
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="multimodal_integration",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100_000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100_000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load multimodal data
docs_df = session.read.docs("data/**/*.md", content_type="markdown", recursive=True)
transcripts_df = session.read.docs("transcripts/**/*.srt", content_type="markdown", recursive=True)

# Define extraction schema
class ContentInsight(BaseModel):
    summary: str = Field(description="Brief content summary")
    topics: List[str] = Field(description="Main topics")
    sentiment: str = Field(description="Overall sentiment")

# Process documents
processed_docs = (
    docs_df
    .select(
        fc.col("file_path"),
        text.recursive_token_chunk(
            fc.col("content"),
            chunk_size=400,
            chunk_overlap_percentage=10
        ).alias("chunks")
    )
    .explode("chunks")
    .select(
        fc.col("file_path"),
        fc.col("chunks").alias("chunk_text")
    )
    .with_column(
        "embedding",
        semantic.embed(fc.col("chunk_text"))
    )
    .with_column(
        "insights",
        semantic.extract(fc.col("chunk_text"), ContentInsight)
    )
)

# Process transcripts
parsed_transcripts = (
    transcripts_df
    .select(
        fc.col("file_path"),
        text.parse_transcript(fc.col("content"), "srt").alias("segments")
    )
    .explode("segments")
    .select(
        fc.col("file_path"),
        fc.col("segments.speaker").alias("speaker"),
        fc.col("segments.content").alias("text")
    )
    .with_column(
        "category",
        semantic.classify(
            fc.col("text"),
            ["Question", "Answer", "Action Item", "Discussion"]
        )
    )
)

# Save to catalog
processed_docs.write.save_as_table("document_index", mode="overwrite")
parsed_transcripts.write.save_as_table("transcript_index", mode="overwrite")

# Create MCP tools
session.catalog.create_tool(
    tool_name="search_content",
    tool_description="Search across documents and transcripts",
    tool_query=session.table("document_index").filter(
        fc.col("chunk_text").contains(
            fc.tool_param("query", StringType)
        )
    ),
    tool_params=[
        ToolParam(name="query", description="Search query")
    ],
    result_limit=20
)

# Deploy MCP server
tools = session.catalog.list_tools()
server = create_mcp_server(
    session=session,
    server_name="MultimodalServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Key Takeaways

Fenic provides production-ready infrastructure for multimodal data integration in LangChain applications. The framework handles text, audio, structured data, and embeddings through a unified DataFrame API with native semantic operations.

By separating batch preprocessing from real-time agent execution, teams achieve faster response times, lower costs, and better debuggability. The declarative approach enables automatic optimization across model providers while maintaining familiar DataFrame semantics.

Integration with LangChain occurs through standard data formats, the Model Context Protocol, or direct programmatic access. This flexibility lets teams structure agentic workflows based on specific requirements rather than infrastructure constraints.

For teams building multimodal AI applications, Fenic eliminates fragile glue code connecting different services and APIs. The result is more maintainable, scalable infrastructure that brings structure to probabilistic AI systems.

Explore Fenic on GitHub, review integration guides, and learn about building reliable AI pipelines.