How to Handle Unstructured Inputs in LangChain Agents via Typedef.ai

Typedef Team

LangChain agents require structured, clean data to function effectively. Raw unstructured inputs—PDFs, transcripts, CSVs, markdown files—cause agents to waste tokens on parsing, exceed context limits, and produce inconsistent outputs.

Typedef.ai's Fenic provides a DataFrame-based preprocessing layer that transforms unstructured data into structured, queryable formats before reaching your agents.

Why Unstructured Inputs Break Agent Workflows

Unstructured data introduces three critical problems:

Token inefficiency: Agents spend computational resources parsing formats rather than reasoning over content.

Context window violations: Raw documents exceed token limits, forcing manual chunking logic scattered across codebases.

Inconsistent results: Lack of schema validation produces unpredictable outputs that downstream systems cannot reliably consume.

Fenic addresses these issues with a PySpark-inspired DataFrame API where semantic operations are first-class citizens, not external bolt-ons.

Initial Setup

Install Fenic:

bash
pip install fenic

Configure your session with model credentials:

python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel

config = SessionConfig(
    app_name="langchain_preprocessing",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

The configuration handles rate limiting and request batching automatically.

Loading Documents

Fenic provides format-specific readers:

Markdown Files

python
from fenic.api.functions import col

docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

docs_df = docs_df.filter(col("error").is_null())

CSV Files

python
csv_df = session.read.csv("data/*.csv")

With explicit schema:

python
from fenic.core.types.schema import Schema, ColumnField
from fenic.core.types.datatypes import StringType, IntegerType

csv_df = session.read.csv(
    "data/products.csv",
    schema=Schema([
        ColumnField(name="product_id", data_type=IntegerType),
        ColumnField(name="description", data_type=StringType)
    ])
)

PDF Metadata

python
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

relevant_pdfs = pdf_metadata.filter(col("page_count") < 100)

Text Chunking for RAG

Chunking strategies determine how effectively agents can retrieve relevant context.

Recursive Token Chunking

Splits at paragraph breaks and sentence boundaries:

python
from fenic.api.functions import text, col

chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

chunked_df = chunked_df.explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Character-Based Chunking

Fixed-size chunks:

python
chunked_df = docs_df.select(
    col("file_path"),
    text.character_chunk(
        col("content"),
        chunk_size=1000,
        chunk_overlap_percentage=15
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Word-Based Chunking

Custom delimiters for domain-specific splitting:

python
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_word_chunk(
        col("content"),
        chunk_size=200,
        chunk_overlap_percentage=10,
        chunking_character_set_custom_characters=['\n\n', '\n', '.', ' ']
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Embedding Generation

Generate embeddings with automatic batching:

python
from fenic.api.functions import semantic

embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

embedded_df = embedded_df.with_column(
    "token_count",
    text.count_tokens(col("chunk_text"))
)

Fenic handles API rate limits and concurrent request batching without manual async code.

Schema-Driven Extraction

Extract structured data using Pydantic models:

python
from pydantic import BaseModel, Field
from typing import List

class Entity(BaseModel):
    name: str = Field(description="Entity name")
    type: str = Field(description="Entity type: person, organization, or location")

class ExtractedInfo(BaseModel):
    entities: List[Entity] = Field(description="Named entities found in text")
    summary: str = Field(description="Brief summary of the content")
    topics: List[str] = Field(description="Main topics discussed")

extracted_df = docs_df.select(
    col("file_path"),
    semantic.extract(
        col("content"),
        response_format=ExtractedInfo
    ).alias("extracted_data")
)

extracted_df = extracted_df.select(
    col("file_path"),
    col("extracted_data.summary").alias("summary"),
    col("extracted_data.entities").alias("entities"),
    col("extracted_data.topics").alias("topics")
)

This eliminates manual prompt engineering and validation logic.

Document Classification

Categorize documents into predefined classes:

python
from fenic.api.functions.semantic import ClassDefinition

categories = [
    ClassDefinition(
        label="Technical Documentation",
        description="API docs, architecture guides, technical specifications"
    ),
    ClassDefinition(
        label="Business Content",
        description="Marketing materials, business plans, proposals"
    ),
    ClassDefinition(
        label="Support Material",
        description="FAQs, troubleshooting guides, user support content"
    )
]

classified_df = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.classify(
        col("content"),
        classes=categories
    ).alias("category")
)

Semantic Filtering

Filter using natural language predicates:

python
relevant_df = docs_df.filter(
    semantic.predicate(
        "This text discusses product features or roadmap planning: {{ text }}",
        text=col("content")
    )
)

Transcript Processing

Parsing Multiple Formats

Fenic supports SRT, WebVTT, and generic formats:

python
from fenic.api.functions import text

transcripts_df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

parsed_df = transcripts_df.select(
    col("file_path"),
    text.parse_transcript(col("content"), "srt").alias("segments")
)

segments_df = parsed_df.explode("segments").select(
    col("file_path"),
    col("segments.index").alias("index"),
    col("segments.speaker").alias("speaker"),
    col("segments.start_time").alias("start_time"),
    col("segments.end_time").alias("end_time"),
    col("segments.content").alias("text")
)

Sentiment Analysis on Transcripts

python
import fenic.api.functions as fc

sentiment_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(col("text"))
)

speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)

Extracting Action Items

python
from pydantic import BaseModel, Field

class ActionItem(BaseModel):
    task: str = Field(description="The action to be taken")
    assignee: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date if mentioned")

actions_df = segments_df.select(
    col("file_path"),
    col("text"),
    semantic.extract(
        col("text"),
        response_format=ActionItem
    ).alias("action_item")
).filter(
    col("action_item").is_not_null()
)

Building Complete RAG Pipelines

Combine operations into a production pipeline:

python
from fenic.api.functions import dt
import fenic.api.functions as fc

# Load documents
raw_docs = session.read.docs("data/docs/**/*.md", content_type="markdown", recursive=True)

# Clean text
cleaned_docs = raw_docs.select(
    col("file_path"),
    text.trim(col("content")).alias("content")
).filter(col("content") != "")

# Chunk documents
chunked = cleaned_docs.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=400,
        chunk_overlap_percentage=10
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

# Generate embeddings
embedded = chunked.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding"),
    text.count_tokens(col("chunk_text")).alias("tokens")
)

# Add metadata
final_df = embedded.with_column("processed_at", dt.current_timestamp())

# Save results
final_df.write.save_as_table("processed_documents", mode="overwrite")

Semantic Search Operations

Perform similarity search in DataFrames:

python
from fenic.api.functions import embedding

query_text = "machine learning best practices"
query_df = session.create_dataframe([{"query": query_text}])
query_embedding = query_df.select(
    semantic.embed(col("query")).alias("query_emb")
).collect()[0]["query_emb"]

results_df = embedded_df.select(
    col("file_path"),
    col("chunk_text"),
    embedding.compute_similarity(
        col("embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
)

top_results = results_df.order_by(
    col("similarity_score").desc()
).limit(10)

MCP Tool Creation

Create tools using Model Context Protocol for LangChain agent integration:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

search_query = final_df.filter(
    col("file_path").contains(
        fc.tool_param("path_filter", StringType)
    ) &
    (col("tokens") >= fc.tool_param("min_tokens", IntegerType))
).select(
    col("file_path"),
    col("chunk_text"),
    col("tokens")
)

session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search processed documents by path and minimum token count",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="path_filter",
            description="Filter documents by path substring"
        ),
        ToolParam(
            name="min_tokens",
            description="Minimum number of tokens per chunk",
            default_value=100,
            has_default=True
        )
    ],
    result_limit=50
)

MCP Server Deployment

Synchronous HTTP Server

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="DocumentDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production ASGI Deployment

python
from fenic.api.mcp import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with: uvicorn app:app --host 0.0.0.0 --port 8000

CLI Deployment

Use the fenic-serve command:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_documents

# Use stdio transport
fenic-serve --transport stdio
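
Once a server is running, a LangChain agent can consume the catalog tools over MCP. A minimal client sketch, assuming the langchain-mcp-adapters and langgraph packages are installed and the server is reachable at the HTTP address used above:

python
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def main():
    # Connect to the Fenic MCP server started above
    client = MultiServerMCPClient(
        {
            "documents": {
                "url": "http://127.0.0.1:8000/mcp",
                "transport": "streamable_http",
            }
        }
    )
    tools = await client.get_tools()

    # Hand the Fenic-backed tools to a LangChain agent
    agent = create_react_agent("openai:gpt-4o-mini", tools)
    result = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "Find chunks about authentication"}]}
    )
    print(result["messages"][-1].content)

asyncio.run(main())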

LangChain Integration

Export processed data for LangChain consumption:

python
# Export to CSV
final_df.write.csv("output/chunks_with_embeddings.csv")

# Convert to pandas
pandas_df = final_df.to_pandas()

# Create LangChain documents
from langchain.schema import Document

documents = [
    Document(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "tokens": row["tokens"]
        }
    )
    for row in pandas_df.to_dict('records')
]
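
Because the embeddings are already computed, they can be loaded straight into a vector store without re-embedding. A sketch using FAISS, assuming faiss-cpu and langchain-openai are installed; the query-time embedder should match the embedding model configured in the Fenic session:

python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

rows = pandas_df.to_dict("records")

# Reuse Fenic's precomputed vectors instead of re-embedding each chunk
vector_store = FAISS.from_embeddings(
    text_embeddings=[(r["chunk_text"], list(r["embedding"])) for r in rows],
    embedding=OpenAIEmbeddings(model="text-embedding-3-small"),  # embeds queries at search time
    metadatas=[{"file_path": r["file_path"], "tokens": r["tokens"]} for r in rows],
)

retriever = vector_store.as_retriever(search_kwargs={"k": 5})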

Semantic Joins

Join datasets based on meaning:

python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python experience",
    right="Backend Developer - Python/Go",
    output=True
))
examples.create_example(JoinExample(
    left="Marketing Manager with social media expertise",
    right="Senior Software Engineer",
    output=False
))

matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
    Job Requirements: {{ left_on }}
    Candidate Skills: {{ right_on }}
    The candidate meets the core requirements for this role.
    '''),
    left_on=col("job_requirements"),
    right_on=col("candidate_skills"),
    examples=examples
)

Semantic Clustering

Group similar items using embeddings:

python
df_with_embeddings = df.select(
    col("ticket_id"),
    col("description"),
    semantic.embed(col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)

cluster_summary = clustered_df.group_by("cluster_id").agg(
    fc.count("*").alias("ticket_count"),
    fc.first(col("description")).alias("sample_description")
)

Performance Optimization

Batch Processing

Process large datasets in chunks:

python
large_docs = session.read.docs("data/large_corpus/**/*.md", content_type="markdown", recursive=True)

batch_size = 1000
total_rows = large_docs.count()

for offset in range(0, total_rows, batch_size):
    # Apply offset before limit so each batch advances through the dataset
    batch = large_docs.offset(offset).limit(batch_size)

    processed_batch = batch.select(
        col("file_path"),
        semantic.embed(col("content")).alias("embedding")
    )

    processed_batch.write.parquet(
        f"output/batch_{offset}.parquet",
        mode="overwrite"
    )

Token Monitoring

Track usage and costs:

python
import fenic.api.functions as fc

metrics_df = embedded_df.select(
    fc.sum(col("token_count")).alias("total_tokens"),
    fc.count("*").alias("total_chunks"),
    fc.avg(col("token_count")).alias("avg_tokens_per_chunk")
)

metrics_df.show()

Caching Operations

python
embeddings_df = documents_df.select(
    col("doc_id"),
    semantic.embed(col("content")).alias("embeddings")
)

embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

cached_embeddings = session.table("document_embeddings")

Error Handling

Separate successful and failed operations:

python
import fenic.api.functions as fc

processed_df = docs_df.with_column(
    "is_valid",
    fc.when(col("error").is_null(), fc.lit(True)).otherwise(fc.lit(False))
)

success_df = processed_df.filter(col("is_valid"))
failed_df = processed_df.filter(~col("is_valid"))

failed_df.select(col("file_path"), col("error")).show()

Incremental Processing

Track processed documents:

python
processed_df.write.save_as_table("processed_documents", mode="append")

existing = session.table("processed_documents")
existing_paths = existing.select(col("file_path")).distinct().collect()
processed_paths = {row["file_path"] for row in existing_paths}

new_docs = docs_df.filter(
    ~col("file_path").isin(list(processed_paths))
)

Vector Database Export

Prepare data for vector database ingestion:

python
import fenic.api.functions as fc

# Assumes a chunk_id column was added upstream (e.g., during chunking)
export_df = embedded_df.select(
    col("chunk_id"),
    col("chunk_text"),
    col("embedding"),
    fc.struct(
        col("file_path"),
        col("tokens")
    ).alias("metadata")
)

export_df.write.parquet("output/vector_db_ready.parquet")

SQL Interface

Run SQL queries on DataFrames:

python
results = session.sql("""
    SELECT file_path,
           COUNT(*) as chunk_count,
           AVG(tokens) as avg_tokens,
           MAX(similarity_score) as max_similarity
    FROM {chunks}
    WHERE tokens BETWEEN 100 AND 500
    GROUP BY file_path
    HAVING chunk_count > 5
    ORDER BY max_similarity DESC
""", chunks=results_df)

Template-Based Extraction

Extract data from formatted text:

python
log_df = session.create_dataframe([
    {"log": "2024-01-15 ERROR Connection failed to server"},
    {"log": "2024-01-15 INFO Request processed successfully"}
])

parsed_logs = log_df.select(
    text.extract(
        col("log"),
        template="${date} ${level} ${message}"
    ).alias("parsed")
)

parsed_logs = parsed_logs.select(
    col("parsed.date").alias("date"),
    col("parsed.level").alias("level"),
    col("parsed.message").alias("message")
)

Jinja Template Rendering

Generate prompts dynamically:

python
import fenic.api.functions as fc

prompt_df = chunked_df.select(
    text.jinja(
        """Answer the following question based on the context:

Context: {{ context }}

Question: {{ question }}

Provide a detailed answer based solely on the context provided.""",
        context=col("chunk_text"),
        question=fc.lit("What are the main concepts discussed?")
    ).alias("prompt")
)

Production Example

Complete implementation:

python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="document_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load data
docs_df = session.read.docs("./docs/**/*.md", content_type="markdown", recursive=True)

# Define schema
class DocumentInsight(BaseModel):
    summary: str = Field(description="Brief summary of the document")
    key_topics: List[str] = Field(description="Main topics discussed")
    sentiment: str = Field(description="Overall sentiment")

# Process
processed = docs_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=DocumentInsight).alias("insights")
)

# Save
processed.write.save_as_table("processed_docs", mode="overwrite")

# Create tool
search_query = session.table("processed_docs").filter(
    fc.col("insights")["key_topics"].contains(
        fc.tool_param("topic", StringType)
    )
)

session.catalog.create_tool(
    tool_name="search_by_topic",
    tool_description="Find documents discussing specific topics",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="topic",
            description="Topic to search for in documents"
        )
    ],
    result_limit=20
)

# Deploy
tools = session.catalog.list_tools()
server = create_mcp_server(
    session=session,
    server_name="DocumentIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Configuration Best Practices

Chunk Size Selection

  • 8K context models: 400 tokens, 10% overlap
  • 128K context models: 2000 tokens, 5% overlap
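
These rules of thumb can be wired into a small helper (hypothetical, not part of Fenic) that picks chunking parameters from the target model's context window:

python
# Hypothetical helper: map a model's context window to chunking parameters
def chunk_params(context_window_tokens: int) -> dict:
    if context_window_tokens <= 8_192:
        return {"chunk_size": 400, "chunk_overlap_percentage": 10}
    return {"chunk_size": 2000, "chunk_overlap_percentage": 5}

params = chunk_params(128_000)
chunked = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(col("content"), **params).alias("chunks")
)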

Model Selection

Configure tiers for cost optimization:

python
from fenic.api.session.config import OpenAILanguageModel, AnthropicLanguageModel

language_models = {
    "fast": OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),
    "balanced": AnthropicLanguageModel(model_name="claude-3-5-haiku-latest", rpm=100, input_tpm=100000, output_tpm=50000),
    "accurate": OpenAILanguageModel(model_name="gpt-4o", rpm=100, tpm=100000)
}

Use appropriate models per task:

python
df.select("*", fc.semantic.map(
    "Classify {{ text }}",
    text=fc.col("text"),
    model_alias="fast"
).alias("category"))

df.select("*", fc.semantic.extract(
    fc.col("complex_doc"),
    response_format=ComplexDocSchema,  # a Pydantic model you define
    model_alias="accurate"
).alias("extracted"))

Key Implementation Points

Fenic provides LangChain agents with structured preprocessing through:

  • DataFrame abstraction for unstructured data handling
  • Native text operations for chunking and parsing
  • Automatic batching and rate limiting
  • Type-safe extraction via Pydantic schemas
  • MCP integration for seamless tool creation
  • Built-in monitoring and lineage tracking

The separation of data preparation from agent reasoning reduces costs, improves reliability, and simplifies maintenance at scale.
