
How to Build Scalable Semantic Search in AI Data Platforms

Typedef Team

Most organizations fail to scale semantic search beyond prototypes. Traditional data platforms lack the architecture needed for inference workloads, resulting in brittle implementations that can't handle production traffic. This guide shows how to build production-grade semantic search using inference-first architecture.

Inference-First Architecture for Semantic Search

Traditional approaches retrofit LLM capabilities onto SQL databases or DataFrame libraries built for structured data. Inference ends up wrapped in opaque UDFs or external API calls, so the query optimizer can't see those operations and can't optimize around them.

An inference-first architecture embeds semantic operations directly into the query engine as first-class primitives. When the optimizer recognizes semantic.extract() or semantic.join() as inference operations with specific characteristics—high latency, token costs, batching requirements—it applies optimizations impossible with external API calls.

Core architectural requirements:

  • Declarative operations for global optimization
  • Semantic operators as DataFrame primitives
  • Multi-provider abstraction without vendor lock-in
  • Native AI data types (Markdown, JSON, transcripts, embeddings)

Fenic implements this architecture by treating inference as first-class operations within a PySpark-inspired DataFrame framework.
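
For contrast, a minimal sketch of the two approaches, assuming a DataFrame df with a text column (call_llm is illustrative pseudocode): the first hides inference inside a per-row function the planner cannot reason about; the second expresses it as a declarative operator the planner can batch, reorder, and cost.

python
import fenic as fc

# Opaque approach (illustrative pseudocode): per-row API calls hidden in a
# UDF look like a black box to the query planner.
# df["category"] = df["text"].apply(lambda t: call_llm(f"Classify: {t}"))

# Declarative approach: the operator is visible to the planner, which can
# batch requests and push cheap filters ahead of it.
categorized = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("text"),
        classes=["technical", "business", "legal"]
    )
)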

Semantic Operators for Search Infrastructure

Build semantic search with operators that transform unstructured data into searchable formats.

Schema-Driven Extraction

Extract structured data using Pydantic schemas with semantic.extract:

python
from pydantic import BaseModel, Field
from typing import List, Literal
import fenic as fc

class DocumentMetadata(BaseModel):
    title: str = Field(description="Document title or heading")
    category: Literal["technical", "business", "legal", "support"]
    key_topics: List[str] = Field(description="Main topics covered")
    entities: List[str] = Field(description="Named entities mentioned")

structured = (
    documents
    .with_column(
        "metadata",
        fc.semantic.extract("raw_content", DocumentMetadata, model_alias="mini")
    )
    .unnest("metadata")
)

Define output structure once and get validated results consistently. After extraction, use standard DataFrame operations on structured fields.
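
For example, a quick sketch of ordinary DataFrame operations over the extracted fields, assuming the structured DataFrame from above and a PySpark-style fc.count aggregate:

python
# Filter and aggregate on validated fields like any other column
topic_counts = (
    structured
    .filter(fc.col("category") == "technical")
    .explode("key_topics")
    .group_by("key_topics")
    .agg(fc.count("*").alias("doc_count"))
    .order_by(fc.col("doc_count").desc())
)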

Embedding Generation

Generate vector representations with semantic.embed:

python
searchable = (
    documents
    .with_column("raw_md", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_md", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract("chunks", DocumentMetadata, model_alias="cheap")
    )
)

searchable.write.parquet("s3://data-lake/searchable/")

The framework batches requests efficiently across OpenAI, Cohere, and Google providers.
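
Provider choice is a configuration concern rather than pipeline code. A sketch of wiring an embedding model into the session, assuming fenic exposes an OpenAIEmbeddingModel and a default_embedding_model setting analogous to the language-model configuration shown later:

python
config = fc.SessionConfig(
    app_name="embedding_pipeline",
    semantic=fc.SemanticConfig(
        embedding_models={
            "small": fc.OpenAIEmbeddingModel(
                "text-embedding-3-small",
                rpm=500,
                tpm=500_000
            )
        },
        default_embedding_model="small",
    ),
)

session = fc.Session.get_or_create(config)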

Semantic Joins

Match based on meaning rather than exact values with semantic.join:

python
prompt = """
Does this document answer the user's question?
Question: {{left_on}}
Document: {{right_on}}
Consider topic relevance, completeness, and accuracy.
"""

relevant_docs = (
    queries.semantic.join(
        other=document_index,
        predicate=prompt,
        left_on=fc.col("user_query"),
        right_on=fc.col("doc_summary"),
        model_alias="fast"
    )
    .limit(10)
)

The framework optimizes joins through batching and embedding-based candidate filtering before expensive LLM verification.

Natural Language Filtering

Filter by content with semantic.predicate:

python
technical_issues = documents.filter(
    (fc.col("category") == "support") &
    fc.semantic.predicate(
        "Does this describe a technical problem? Content: {{content}}",
        content=fc.col("raw_text"),
        model_alias="nano"
    )
)

The query optimizer applies cheap conditions before expensive semantic predicates.

Document Processing Pipeline

Process large document corpora efficiently through batch pipelines rather than per-document LLM calls.

Structure-Aware Chunking

Chunk documents based on structure:

python
processed = (
    raw_docs
    .with_column("parsed_md", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks(
            "parsed_md",
            header_level=2
        )
    )
    .explode("sections")
)

Structure-aware chunking preserves section context and avoids mid-sentence splits, improving search quality significantly.
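
Not every corpus has clean headers. As a fallback, a sketch of token-based chunking, assuming fenic's text chunking helpers such as recursive_token_chunk:

python
# Fixed-size chunks with overlap for documents without useful structure
fallback_chunks = (
    raw_docs
    .with_column(
        "chunks",
        fc.text.recursive_token_chunk(
            "content",
            chunk_size=400,
            chunk_overlap_percentage=10
        )
    )
    .explode("chunks")
)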

Multi-Provider Configuration

Configure models for specific tasks:

python
config = fc.SessionConfig(
    app_name="semantic_search",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                "gpt-4o-nano",
                rpm=500,
                tpm=200_000
            ),
            "mini": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=300,
                tpm=150_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                "gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            ),
            "powerful": fc.AnthropicLanguageModel(
                "claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=100_000
            )
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)

Use cheap models for classification, reserve expensive models for tasks requiring accuracy. Switch providers without code changes using model aliases.
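
For example, remapping the "mini" alias to a different provider reroutes every model_alias="mini" call without touching pipeline code:

python
# Same pipeline code, different provider behind the alias
config_v2 = fc.SessionConfig(
    app_name="semantic_search",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.GoogleVertexLanguageModel(
                "gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="mini",
    ),
)

session = fc.Session.get_or_create(config_v2)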

Building Production Search Indexes

Combine extraction, embedding, and metadata enrichment in optimized pipelines:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class SearchMetadata(BaseModel):
    title: str
    summary: str = Field(description="2-sentence summary")
    topics: List[str] = Field(description="Key topics covered")
    difficulty: Literal["beginner", "intermediate", "advanced"]
    estimated_read_time: int = Field(description="Minutes to read")

search_index = (
    session.read.docs("s3://docs/**/*.md", content_type="markdown")
    .with_column("parsed", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("parsed", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "chunk_metadata",
        fc.semantic.extract("chunks", SearchMetadata, model_alias="mini")
    )
    .unnest("chunk_metadata")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "fingerprint",
        fc.concat_ws("|", fc.col("title"), fc.col("summary"))
    )
    .cache()
)

search_index.write.parquet("s3://indexes/docs_search/")

The declarative approach enables automatic batching of LLM calls across documents, reducing latency from sequential to batch processing.

Hybrid Search for Cost Optimization

Pure semantic search becomes expensive at scale. Hybrid approaches combine fast filtering with semantic ranking.

Pre-Filtering Strategy

Apply cheap filters before semantic operations:

python
candidates = (
    search_index
    .filter(fc.col("category") == "technical")
    .filter(fc.col("estimated_read_time") < 15)
    .filter(fc.col("title").contains("kubernetes"))
)

ranked = (
    candidates
    .with_column(
        "relevance",
        fc.semantic.map(
            "Rate relevance to '{{query}}' on a 1-10 scale. Reply with only the number: {{summary}}",
            query=fc.lit("container orchestration patterns"),
            summary=fc.col("summary"),
            model_alias="mini"
        ).cast(fc.IntegerType)  # map returns text; cast before the numeric filter
    )
    .filter(fc.col("relevance") > 7)
    .order_by(fc.col("relevance").desc())
    .limit(10)
)

This reduces semantic processing to a fraction of the corpus, cutting costs by orders of magnitude.

Fuzzy Matching for Candidate Generation

Use fuzzy string matching before semantic operations:

python
candidates = (
    user_queries
    .join(document_index)  # pair every query with every document for fuzzy scoring
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("query_text"),
            fc.col("doc_title"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

# Verify the surviving pairs with an LLM predicate; the fuzzy filter has
# already reduced the candidate set, so only those pairs reach the model
final_matches = candidates.filter(
    fc.semantic.predicate(
        "Does this document answer the query? Query: {{query}}, Doc: {{doc}}",
        query=fc.col("query_text"),
        doc=fc.col("doc_content"),
        model_alias="flash"
    )
)

This pattern reduces semantic join operations from millions to thousands, making large-scale search economically viable.

Performance Optimization

Leverage automatic optimizations while applying strategic manual tuning.

Automatic Batching

The query engine batches operations automatically:

python
results = (
    large_corpus
    .with_column("summary", fc.semantic.map(
        "Summarize in 2 sentences: {{content}}",
        content=fc.col("raw_text"),
        model_alias="mini"
    ))
    .with_column("category", fc.semantic.classify(
        fc.col("summary"),
        classes=["technical", "business", "legal"],
        model_alias="nano"
    ))
    .collect()
)

# Query metrics are attached to the collected result
metrics = results.metrics
lm = metrics.total_lm_metrics

# Calculate total tokens
total_input_tokens = lm.num_uncached_input_tokens + lm.num_cached_input_tokens
total_tokens = total_input_tokens + lm.num_output_tokens

print(f"Total tokens: {total_tokens}")
print(f"Total cost: ${lm.cost}")

The framework handles:

  • Request grouping for GPU utilization
  • Rate limit self-throttling
  • Automatic retry logic
  • Cost tracking through metrics

Caching for Development

Cache expensive operations during iteration:

python
# `Schema`, `condition_1/2`, and `compute_score()` are placeholders for your
# own extraction schema, filter expressions, and scoring logic
base_index = (
    documents
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .cache()
)

# Iterate on cheap variants without re-running the expensive operations above
approach_1 = base_index.filter(condition_1).collect()
approach_2 = base_index.filter(condition_2).collect()
approach_3 = base_index.with_column("score", compute_score()).collect()

Model Profiles

Configure behavior dynamically:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "claude": fc.AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                profiles={
                    "fast": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    )
                },
                default_profile="fast"
            )
        }
    )
)

# prompt_simple and prompt_complex are placeholder templates with a {{text}} variable
quick_scan = df.with_column(
    "result",
    fc.semantic.map(prompt_simple, text=fc.col("content"), model_alias="claude")
)
deep_analysis = df.with_column(
    "result",
    fc.semantic.map(
        prompt_complex,
        text=fc.col("content"),
        model_alias=fc.ModelAlias(name="claude", profile="thorough")
    )
)

Search APIs with MCP Tools

Expose search through Model Context Protocol tools:

python
from fenic import ToolParam, StringType, IntegerType
from fenic.api.mcp.server import create_mcp_server

# Create the base query with tool parameters
index = session.read.parquet("s3://indexes/docs_search/")

search_query = (
    index
    .filter(
        fc.coalesce(
            fc.col("category") == fc.tool_param("category", StringType),
            fc.lit(True)
        )
    )
    .limit(fc.tool_param("max_results", IntegerType))
    .select("title", "summary", "url")
)

# Register the tool with the catalog
session.catalog.create_tool(
    tool_name="semantic_search",
    tool_description="Search documents using semantic similarity",
    tool_query=search_query,
    tool_params=[
        ToolParam(name="category", description="Filter by category", has_default=True, default_value=None),
        ToolParam(name="max_results", description="Maximum number of results", has_default=True, default_value=10)
    ],
    result_limit=100
)

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="Search Server",
    user_defined_tools=[session.catalog.describe_tool("semantic_search")]
)
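
To serve the registered tools, run the server over HTTP. A sketch, assuming fenic ships a synchronous runner such as run_mcp_server_sync (check the fenic MCP docs for the exact entry point and signature):

python
from fenic.api.mcp.server import run_mcp_server_sync

# Blocks the process and serves the registered tools over HTTP
run_mcp_server_sync(
    server,
    transport="http",
    port=8000
)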

Production Implementation Pattern

A complete search system following the HN research agent pattern:

python
# Step 1: Denormalize base data
base_data = (
    session.read.parquet("s3://raw/documents/")
    .with_column("parsed", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("parsed", header_level=2)
    )
)

# Step 2: Build lookup tables
section_to_doc = (
    base_data
    .select("doc_id", "sections")
    .explode("sections")
    .select(
        fc.col("sections").section_id.alias("section_id"),
        fc.col("doc_id")
    )
)

# Step 3: Create searchable index
search_index = (
    base_data
    .explode("sections")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("sections").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("sections").content,
            SearchMetadata,
            model_alias="mini"
        )
    )
)

# Step 4: Register tools
from fenic import ToolParam, StringType, IntegerType, FloatType
from fenic.api.mcp.server import create_mcp_server

# Create search_sections tool
# (assumes a `relevance_score` column computed upstream when building search_index)
search_sections_query = (
    search_index
    .filter(fc.col("sections").content.rlike(fc.tool_param("pattern", StringType)))
    .join(section_to_doc, on="section_id")
    .order_by(fc.col("relevance_score").desc())
    .limit(fc.tool_param("max_results", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_sections",
    tool_description="Search for sections matching a pattern",
    tool_query=search_sections_query,
    tool_params=[
        ToolParam(name="pattern", description="Regex pattern to search for"),
        ToolParam(name="max_results", description="Maximum results", has_default=True, default_value=20)
    ],
    result_limit=100
)

# Create get_similar_sections tool
# (assumes a `similarity_score` against the reference section, computed upstream)
similar_sections_query = (
    search_index
    .filter(fc.col("section_id") != fc.tool_param("section_id", StringType))
    .order_by(fc.col("similarity_score").desc())
    .limit(20)
)

session.catalog.create_tool(
    tool_name="get_similar_sections",
    tool_description="Get sections similar to a given section",
    tool_query=similar_sections_query,
    tool_params=[
        ToolParam(name="section_id", description="Reference section ID")
    ],
    result_limit=100
)

# Create MCP server with both tools
server = create_mcp_server(
    session=session,
    server_name="Section Search Server",
    user_defined_tools=[
        session.catalog.describe_tool("search_sections"),
        session.catalog.describe_tool("get_similar_sections")
    ]
)

This provides:

  • Curated context through denormalized lookups
  • Efficient retrieval via precomputed embeddings
  • Tool-based agent interface
  • Cost control through strategic model selection

Monitoring and Observability

Track performance, costs, and quality:

python
import logging

logger = logging.getLogger(__name__)

result = search_pipeline.collect()
metrics = result.metrics

# Execution time is reported in milliseconds
logger.info(f"Execution time: {metrics.execution_time_ms}ms")

# Aggregate language-model metrics for the whole query
lm = metrics.total_lm_metrics
total_input_tokens = lm.num_uncached_input_tokens + lm.num_cached_input_tokens
total_tokens = total_input_tokens + lm.num_output_tokens

logger.info(f"Total tokens: {total_tokens}")
logger.info(f"Total cost: ${lm.cost}")

# Iterate through operator metrics
for op_metric in metrics.operator_metrics:
    if op_metric.lm_metrics and op_metric.lm_metrics.cost > 5.0:
        logger.warning(
            f"Expensive operator: {op_metric.operator_id}, "
            f"Cost: ${op_metric.lm_metrics.cost}, "
            f"Time: {op_metric.execution_time_ms}ms"
        )

Metrics enable cost attribution, performance profiling, quality monitoring, and capacity planning.

Production Deployment Best Practices

Separate Batch from Real-Time

Precompute expensive operations in batch pipelines:

python
# Offline batch pipeline
enriched_index = (
    raw_documents
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize in 100 words: {{content}}",
            content=fc.col("content"),
            model_alias="mini"
        )
    )
)

enriched_index.write.parquet("s3://indexes/enriched/")

# Online serving: fast retrieval
def search_handler(query: str):
    index = session.read.parquet("s3://indexes/enriched/")
    query_emb = fc.semantic.embed(fc.lit(query))

    return (
        index
        .with_column("score", fc.cosine_similarity("embedding", query_emb))
        .order_by(fc.col("score").desc())
        .limit(10)
    )

Benefits: predictable latency, better resource utilization, easier debugging.

Implement Row-Level Lineage

Track data flow for debugging:

python
# lineage() is called on the DataFrame, not on the collected QueryResult
lineage = search_pipeline.lineage()

result = search_pipeline.collect()

# Trace specific rows back to their sources; the IDs below are placeholders
# for real row identifiers from your results
row_ids = ["uuid1", "uuid2"]
source_rows = lineage.backward(row_ids)

logger.info(f"Traced rows back to source: {source_rows}")

Design Clear Schemas

Schema quality impacts extraction accuracy:

python
class DocumentMetadata(BaseModel):
    """Extracted metadata for search indexing."""

    title: str = Field(
        description="Clear, descriptive title summarizing document content"
    )

    category: Literal["tutorial", "reference", "guide", "api_docs"] = Field(
        description=(
            "Document type: "
            "tutorial=step-by-step instructions, "
            "reference=API/technical specs, "
            "guide=conceptual explanation, "
            "api_docs=endpoint documentation"
        )
    )

    key_concepts: List[str] = Field(
        description="3-5 main technical concepts covered",
        min_length=3,
        max_length=5
    )

    difficulty: Literal["beginner", "intermediate", "advanced"] = Field(
        description=(
            "beginner=no prior knowledge, "
            "intermediate=basic familiarity, "
            "advanced=expert-level content"
        )
    )

Detailed descriptions with constraints reduce hallucination.

Test on Samples First

Validate logic and costs before scaling:

python
sample = full_corpus.limit(100)

test_result = (
    sample
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .collect()
)

# Metrics are attached to the collected result
metrics = test_result.metrics
lm = metrics.total_lm_metrics

print(f"Cost for 100 docs: ${lm.cost}")
estimated_full_cost = lm.cost * (full_corpus.count() / 100)
print(f"Estimated full corpus cost: ${estimated_full_cost}")

# test_result.data holds the result rows (a polars DataFrame by default)
for row in test_result.data.head(10).iter_rows(named=True):
    print(f"Title: {row['metadata']['title']}")
    print(f"Category: {row['metadata']['category']}")

# `quality_acceptable` and `cost_acceptable` are placeholders for your own checks
if quality_acceptable and cost_acceptable:
    full_result = full_corpus.with_column(...).collect()

Production Case Study

RudderStack's implementation shows these principles at scale:

Problem: Support tickets, documentation, and knowledge scattered across systems, with manual triage consuming 95% of a product manager's time.

Architecture:

  1. Ingest from warehouse, Notion, support systems
  2. Build semantic context - enrich product taxonomy, create semantic links
  3. Expose MCP search tools
  4. Automated triage agent

Results:

  • 95% triage time reduction
  • 90%+ first-pass accuracy
  • Direct citation surfacing
  • Warehouse-native context stays current

Success factors: inference-first architecture, batch preprocessing, semantic linking, tool-based interfaces.

Patterns for Specific Use Cases

Semantic Clustering

Group documents without predefined categories:

python
clustered = (
    documents
    .with_column("embedding", fc.semantic.embed("content"))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=20
    )
    .group_by("cluster_label")  # semantic.reduce is an aggregation, so group first
    .agg(
        fc.semantic.reduce(
            "Summarize common themes: {{contents}}",
            fc.col("content"),
            model_alias="mini"
        ).alias("cluster_summary")
    )
)

Multi-Modal Search

Extend to images and PDFs:

python
docs = (
    session.read.pdf_metadata("data/reports/**/*.pdf", recursive=True)
    .with_column(
        "markdown",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---",
            describe_images=True
        )
    )
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("markdown", header_level=2)
    )
    .explode("sections")
    .with_column("embedding", fc.semantic.embed(fc.col("sections").content))
)

Time-Aware Search

Incorporate temporal dimensions:

python
time_aware_index = (
    documents
    .with_column("timestamp", fc.dt.to_timestamp("created_at", "yyyy-MM-dd"))
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column(
        "recency_boost",
        fc.when(
            fc.col("timestamp") > fc.current_timestamp() - fc.expr("INTERVAL 30 DAYS"),
            1.5
        ).otherwise(1.0)
    )
)

results = (
    time_aware_index
    .with_column(
        "base_similarity",
        fc.cosine_similarity(fc.col("embedding"), query_embedding)
    )
    .with_column(
        "final_score",
        fc.col("base_similarity") * fc.col("recency_boost")
    )
    .order_by(fc.col("final_score").desc())
)

Migration from Legacy Systems

Adopt semantic capabilities incrementally.

Phase 1: Semantic Enrichment

Enhance existing indexes:

python
existing_index = session.read.parquet("s3://legacy/search_index/")

enriched = (
    existing_index
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column(
        "semantic_metadata",
        fc.semantic.extract("content", EnhancedSchema, model_alias="mini")
    )
)

enriched.write.parquet("s3://semantic/search_index/")

Phase 2: Hybrid Retrieval

Combine keyword and semantic:

python
def hybrid_search(query: str, k: int = 10):
    # legacy_search is a placeholder that returns a parquet path of keyword
    # matches, each row carrying an `embedding` and a `keyword_score`
    keyword_results = legacy_search(query, k=50)
    query_embedding = fc.semantic.embed(fc.lit(query))

    reranked = (
        session.read.parquet(keyword_results)
        .with_column(
            "semantic_score",
            fc.cosine_similarity(fc.col("embedding"), query_embedding)
        )
        .with_column(
            "combined_score",
            0.7 * fc.col("semantic_score") + 0.3 * fc.col("keyword_score")
        )
        .order_by(fc.col("combined_score").desc())
        .limit(k)
    )

    return reranked.collect()

Phase 3: Full Migration

Move to semantic-first architecture after validation.

Performance Benchmarks

Based on production deployments:

Performance gains:

  • 100-300x speedups vs naive implementations
  • Order-of-magnitude improvements for semantic joins
  • 45% reduction in time-to-insight

Cost optimization:

  • 10x year-over-year inference cost decline
  • Model cascades reduce invocations by orders of magnitude (see the sketch below)
  • Strategic model selection cuts costs 80%

Accuracy metrics:

  • 97% accuracy on classification tasks with proper architecture
  • Schema-driven extraction eliminates validation overhead
  • Statistical guarantees enable production deployment
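
Model cascades, referenced above, follow a simple pattern using the operators from earlier sections. A minimal sketch (column names and prompts are illustrative): a cheap model screens rows, and an expensive model runs only on the survivors.

python
# Stage 1: cheap screening with a small model
screened = documents.filter(
    fc.semantic.predicate(
        "Does this describe a billing problem? Content: {{content}}",
        content=fc.col("raw_text"),
        model_alias="nano"
    )
)

# Stage 2: expensive analysis only on rows that passed the screen
analyzed = screened.with_column(
    "analysis",
    fc.semantic.map(
        "Explain the billing issue and suggest a fix: {{content}}",
        content=fc.col("raw_text"),
        model_alias="powerful"
    )
)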

Implementation Checklist

Architecture:

  • Implement inference-first platform with semantic operators
  • Configure multi-provider model access
  • Set up native AI data types

Indexing:

  • Build batch processing pipelines
  • Generate embeddings for corpus
  • Extract structured metadata
  • Create lookup tables for fast retrieval

Search:

  • Implement hybrid filtering strategies
  • Configure semantic joins for ranking
  • Set up MCP tools for agent access
  • Add temporal awareness if needed

Operations:

  • Deploy monitoring and metrics tracking
  • Implement row-level lineage
  • Set up cost tracking
  • Configure caching strategies

Testing:

  • Validate on small samples
  • Estimate full corpus costs
  • Test quality thresholds
  • Measure latency targets

Next Steps

Start with a focused implementation:

  1. Install Fenic and configure provider access (see the sketch below)
  2. Build a simple extraction pipeline on sample documents
  3. Generate embeddings and test similarity search
  4. Add semantic operators incrementally
  5. Measure performance and costs
  6. Scale to production after validation
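
For step 1, a minimal bootstrap sketch, assuming Fenic installs from PyPI as fenic and reads provider keys such as OPENAI_API_KEY from the environment:

python
# pip install fenic
# export OPENAI_API_KEY=...

import fenic as fc

config = fc.SessionConfig(
    app_name="search_prototype",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel("gpt-4o-mini", rpm=300, tpm=150_000)
        },
        default_language_model="mini",
    ),
)

session = fc.Session.get_or_create(config)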

The semantic search patterns in this guide enable production systems that scale reliably while maintaining cost control. Organizations implementing inference-first architectures report significant improvements in search quality, development speed, and operational efficiency.

