Most organizations fail to scale semantic search beyond prototypes. Traditional data platforms lack the architecture needed for inference workloads, resulting in brittle implementations that can't handle production traffic. This guide shows how to build production-grade semantic search using an inference-first architecture.
Inference-First Architecture for Semantic Search
Traditional approaches retrofit LLM capabilities onto SQL databases or DataFrame libraries built for structured data. Because the query optimizer can't see inference operations, it can't batch, reorder, or cost them, and end-to-end optimization becomes impossible.
An inference-first architecture embeds semantic operations directly into the query engine as first-class primitives. When the optimizer recognizes semantic.extract() or semantic.join() as inference operations with specific characteristics—high latency, token costs, batching requirements—it applies optimizations impossible with external API calls.
Core architectural requirements:
- Declarative operations for global optimization
- Semantic operators as DataFrame primitives
- Multi-provider abstraction without vendor lock-in
- Native AI data types (Markdown, JSON, transcripts, embeddings)
Fenic implements this architecture by treating inference as first-class operations within a PySpark-inspired DataFrame framework.
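To make "first-class primitive" concrete, the sketch below builds a one-step pipeline in which the inference call sits inside the logical plan rather than behind an external API wrapper. The session setup is deliberately minimal (the full multi-provider configuration appears later in this guide), and the create_dataframe call is illustrative; adjust it to however you actually load data.

```python
import fenic as fc

# Minimal session; see the multi-provider configuration later in this guide.
config = fc.SessionConfig(
    app_name="semantic_search",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel("gpt-4o-mini", rpm=300, tpm=150_000)
        },
        default_language_model="mini",
    ),
)
session = fc.Session.get_or_create(config)

# Because semantic.map is a DataFrame primitive, the optimizer sees the
# inference step in the logical plan and can batch, reorder, and cost it
# like any other operator.
df = session.create_dataframe(
    {"raw_text": ["Kubernetes restarts pods when liveness probes fail."]}
)
summarized = df.with_column(
    "summary",
    fc.semantic.map("Summarize in one sentence: {{content}}", content=fc.col("raw_text")),
)
```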
Semantic Operators for Search Infrastructure
Build semantic search with operators that transform unstructured data into searchable formats.
Schema-Driven Extraction
Extract structured data using Pydantic schemas with semantic.extract:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

import fenic as fc


class DocumentMetadata(BaseModel):
    title: str = Field(description="Document title or heading")
    category: Literal["technical", "business", "legal", "support"]
    key_topics: List[str] = Field(description="Main topics covered")
    entities: List[str] = Field(description="Named entities mentioned")


structured = (
    documents
    .with_column(
        "metadata",
        fc.semantic.extract("raw_content", DocumentMetadata, model_alias="mini")
    )
    .unnest("metadata")
)
```
Define output structure once and get validated results consistently. After extraction, use standard DataFrame operations on structured fields.
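For example, once the metadata columns exist, downstream steps are ordinary DataFrame calls that trigger no further LLM invocations:

```python
# Plain DataFrame operations on the extracted fields -- no additional inference cost.
technical_topics = (
    structured
    .filter(fc.col("category") == "technical")
    .explode("key_topics")
    .select("title", "key_topics")
)
```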
Embedding Generation
Generate vector representations with semantic.embed:
```python
searchable = (
    documents
    .with_column("raw_md", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_md", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract("chunks", DocumentMetadata, model_alias="cheap")
    )
)

searchable.write.parquet("s3://data-lake/searchable/")
```
The framework batches requests efficiently across OpenAI, Cohere, and Google providers.
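Provider choice is a configuration concern rather than a code change. The sketch below assumes an embedding_models block in SemanticConfig analogous to the language_models block shown later; the class and parameter names are assumptions, so verify them against the current fenic API before relying on them.

```python
# Assumed configuration shape -- embedding model class names are illustrative.
config = fc.SessionConfig(
    app_name="semantic_search",
    semantic=fc.SemanticConfig(
        embedding_models={
            "oai_small": fc.OpenAIEmbeddingModel(
                "text-embedding-3-small", rpm=300, tpm=1_000_000
            ),
        },
        default_embedding_model="oai_small",
    ),
)
```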
Semantic Joins
Match based on meaning rather than exact values with semantic.join:
```python
prompt = """
Does this document answer the user's question?
Question: {{left_on}}
Document: {{right_on}}
Consider topic relevance, completeness, and accuracy.
"""

relevant_docs = (
    queries.semantic.join(
        other=document_index,
        predicate=prompt,
        left_on=fc.col("user_query"),
        right_on=fc.col("doc_summary"),
        model_alias="fast"
    )
    .limit(10)
)
```
The framework optimizes joins through batching and embedding-based candidate filtering before expensive LLM verification.
Natural Language Filtering
Filter by content with semantic.predicate:
```python
technical_issues = documents.filter(
    (fc.col("category") == "support")
    & fc.semantic.predicate(
        "Does this describe a technical problem? Content: {{content}}",
        content=fc.col("raw_text"),
        model_alias="nano"
    )
)
```
The query optimizer applies cheap conditions before expensive semantic predicates.
Document Processing Pipeline
Process large document corpora efficiently through batch pipelines rather than per-document LLM calls.
Structure-Aware Chunking
Chunk documents based on structure:
```python
processed = (
    raw_docs
    .with_column("parsed_md", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("parsed_md", header_level=2)
    )
    .explode("sections")
)
```
Structure-aware chunking preserves section context and avoids mid-sentence splits, which improves retrieval quality compared with fixed-size chunking.
Multi-Provider Configuration
Configure models for specific tasks:
```python
config = fc.SessionConfig(
    app_name="semantic_search",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel("gpt-4o-nano", rpm=500, tpm=200_000),
            "mini": fc.OpenAILanguageModel("gpt-4o-mini", rpm=300, tpm=150_000),
            "flash": fc.GoogleVertexLanguageModel("gemini-2.0-flash", rpm=300, tpm=150_000),
            "powerful": fc.AnthropicLanguageModel(
                "claude-opus-4-0", rpm=100, input_tpm=100, output_tpm=100
            )
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)
```
Use cheap models for classification, reserve expensive models for tasks requiring accuracy. Switch providers without code changes using model aliases.
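In practice, the same pipeline code runs against different models simply by changing the alias; for example, routine classification can go to the cheapest model while nuanced analysis goes to the most capable one:

```python
# Cheap model for high-volume classification
labeled = documents.with_column(
    "category",
    fc.semantic.classify(
        fc.col("content"),
        classes=["technical", "business", "legal"],
        model_alias="nano"
    )
)

# Capable (and expensive) model reserved for nuanced analysis
analyzed = documents.with_column(
    "analysis",
    fc.semantic.map(
        "Summarize the key obligations and risks: {{content}}",
        content=fc.col("content"),
        model_alias="powerful"
    )
)
```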
Building Production Search Indexes
Combine extraction, embedding, and metadata enrichment in optimized pipelines:
```python
from pydantic import BaseModel, Field
from typing import List, Literal


class SearchMetadata(BaseModel):
    title: str
    summary: str = Field(description="2-sentence summary")
    topics: List[str] = Field(description="Key topics covered")
    difficulty: Literal["beginner", "intermediate", "advanced"]
    estimated_read_time: int = Field(description="Minutes to read")


search_index = (
    session.read.docs("s3://docs/**/*.md", content_type="markdown")
    .with_column("parsed", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("parsed", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "chunk_metadata",
        fc.semantic.extract("chunks", SearchMetadata, model_alias="mini")
    )
    .unnest("chunk_metadata")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "fingerprint",
        fc.concat_ws("|", fc.col("title"), fc.col("summary"))
    )
    .cache()
)

search_index.write.parquet("s3://indexes/docs_search/")
```
The declarative approach enables automatic batching of LLM calls across documents, reducing latency from sequential to batch processing.
Hybrid Search for Cost Optimization
Pure semantic search becomes expensive at scale. Hybrid approaches combine fast filtering with semantic ranking.
Pre-Filtering Strategy
Apply cheap filters before semantic operations:
```python
candidates = (
    search_index
    .filter(fc.col("category") == "technical")
    .filter(fc.col("estimated_read_time") < 15)
    .filter(fc.col("title").contains("kubernetes"))
)

ranked = (
    candidates
    .with_column(
        "relevance",
        fc.semantic.map(
            "Rate relevance to '{{query}}' on scale 1-10: {{summary}}",
            query=fc.lit("container orchestration patterns"),
            summary=fc.col("summary"),
            model_alias="mini"
        )
    )
    .filter(fc.col("relevance") > 7)
    .order_by(fc.col("relevance").desc())
    .limit(10)
)
```
This reduces semantic processing to a fraction of the corpus, cutting costs by orders of magnitude.
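A rough, purely illustrative calculation shows why; every number below is hypothetical:

```python
# Hypothetical numbers -- adjust to your corpus and model pricing.
corpus_size = 1_000_000          # chunks in the index
prefilter_selectivity = 0.002    # fraction surviving the cheap metadata filters
cost_per_llm_call = 0.0004       # assumed dollars per relevance-scoring call

naive_cost = corpus_size * cost_per_llm_call
hybrid_cost = corpus_size * prefilter_selectivity * cost_per_llm_call
print(f"naive: ${naive_cost:,.2f}, hybrid: ${hybrid_cost:,.2f}, "
      f"savings: {naive_cost / hybrid_cost:.0f}x")
```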
Fuzzy Matching for Candidate Generation
Use fuzzy string matching before semantic operations:
```python
candidates = (
    user_queries
    .join(document_index)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("query_text"),
            fc.col("doc_title"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

final_matches = candidates.semantic.join(
    other=document_index,
    predicate="Does this document answer the query? Query: {{left_on}}, Doc: {{right_on}}",
    left_on=fc.col("query_text"),
    right_on=fc.col("doc_content"),
    model_alias="flash"
)
```
This pattern reduces semantic join operations from millions to thousands, making large-scale search economically viable.
Performance Optimization
Leverage automatic optimizations while applying strategic manual tuning.
Automatic Batching
The query engine batches operations automatically:
```python
results = (
    large_corpus
    .with_column("summary", fc.semantic.map(
        "Summarize in 2 sentences: {{content}}",
        content=fc.col("raw_text"),
        model_alias="mini"
    ))
    .with_column("category", fc.semantic.classify(
        fc.col("summary"),
        classes=["technical", "business", "legal"],
        model_alias="nano"
    ))
    .collect()
)

# Access execution metrics from the collected result
metrics = results.metrics
lm = metrics.total_lm_metrics

# Calculate total tokens
total_input_tokens = lm.num_uncached_input_tokens + lm.num_cached_input_tokens
total_tokens = total_input_tokens + lm.num_output_tokens

print(f"Total tokens: {total_tokens}")
print(f"Total cost: ${lm.cost}")
```
The framework handles:
- Request grouping for GPU utilization
- Rate limit self-throttling
- Automatic retry logic
- Cost tracking through metrics
Caching for Development
Cache expensive operations during iteration:
```python
base_index = (
    documents
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .cache()
)

approach_1 = base_index.filter(condition_1).collect()
approach_2 = base_index.filter(condition_2).collect()
approach_3 = base_index.with_column("score", compute_score()).collect()
```
Model Profiles
Configure behavior dynamically:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "claude": fc.AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                profiles={
                    "fast": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    )
                },
                default_profile="fast"
            )
        }
    )
)

quick_scan = df.semantic.map(prompt_simple, model_alias="claude")
deep_analysis = df.semantic.map(
    prompt_complex,
    model_alias=fc.ModelAlias(name="claude", profile="thorough")
)
```
Search APIs with MCP Tools
Expose search through Model Context Protocol tools:
```python
from fenic import ToolParam, StringType, IntegerType
from fenic.api.mcp.server import create_mcp_server

# Create the base query with tool parameters
index = session.read.parquet("s3://indexes/docs_search/")

search_query = (
    index
    .filter(
        fc.coalesce(
            fc.col("category") == fc.tool_param("category", StringType),
            fc.lit(True)
        )
    )
    .limit(fc.tool_param("max_results", IntegerType))
    .select("title", "summary", "url")
)

# Register the tool with the catalog
session.catalog.create_tool(
    tool_name="semantic_search",
    tool_description="Search documents using semantic similarity",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="category",
            description="Filter by category",
            has_default=True,
            default_value=None
        ),
        ToolParam(
            name="max_results",
            description="Maximum number of results",
            has_default=True,
            default_value=10
        )
    ],
    result_limit=100
)

# Create MCP server
server = create_mcp_server(
    session=session,
    server_name="Search Server",
    user_defined_tools=[session.catalog.describe_tool("semantic_search")]
)
```
Production Implementation Pattern
A complete search system following the HN research agent pattern:
```python
# Step 1: Denormalize base data
base_data = (
    session.read.parquet("s3://raw/documents/")
    .with_column("parsed", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("parsed", header_level=2)
    )
)

# Step 2: Build lookup tables
section_to_doc = (
    base_data
    .select("doc_id", "sections")
    .explode("sections")
    .select(
        fc.col("sections").section_id.alias("section_id"),
        fc.col("doc_id")
    )
)

# Step 3: Create searchable index
search_index = (
    base_data
    .explode("sections")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("sections").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("sections").content,
            SearchMetadata,
            model_alias="mini"
        )
    )
)

# Step 4: Register tools
from fenic import ToolParam, StringType, IntegerType, FloatType
from fenic.api.mcp.server import create_mcp_server

# Create search_sections tool
search_sections_query = (
    search_index
    .filter(fc.col("sections").content.rlike(fc.tool_param("pattern", StringType)))
    .join(section_to_doc, on="section_id")
    .order_by("relevance_score")
    .limit(fc.tool_param("max_results", IntegerType))
)

session.catalog.create_tool(
    tool_name="search_sections",
    tool_description="Search for sections matching a pattern",
    tool_query=search_sections_query,
    tool_params=[
        ToolParam(name="pattern", description="Regex pattern to search for"),
        ToolParam(name="max_results", description="Maximum results", has_default=True, default_value=20)
    ],
    result_limit=100
)

# Create get_similar_sections tool
similar_sections_query = (
    search_index
    .filter(fc.col("section_id") != fc.tool_param("section_id", StringType))
    .order_by("similarity_score")
    .limit(20)
)

session.catalog.create_tool(
    tool_name="get_similar_sections",
    tool_description="Get sections similar to a given section",
    tool_query=similar_sections_query,
    tool_params=[
        ToolParam(name="section_id", description="Reference section ID")
    ],
    result_limit=100
)

# Create MCP server with both tools
server = create_mcp_server(
    session=session,
    server_name="Section Search Server",
    user_defined_tools=[
        session.catalog.describe_tool("search_sections"),
        session.catalog.describe_tool("get_similar_sections")
    ]
)
```
This provides:
- Curated context through denormalized lookups
- Efficient retrieval via precomputed embeddings
- Tool-based agent interface
- Cost control through strategic model selection
Monitoring and Observability
Track performance, costs, and quality:
```python
result = search_pipeline.collect()
metrics = result.metrics

# Execution time in milliseconds
logger.info(f"Execution time: {metrics.execution_time_ms}ms")

# Language-model metrics
lm = metrics.total_lm_metrics
total_input_tokens = lm.num_uncached_input_tokens + lm.num_cached_input_tokens
total_tokens = total_input_tokens + lm.num_output_tokens
logger.info(f"Total tokens: {total_tokens}")
logger.info(f"Total cost: ${lm.cost}")

# Iterate through per-operator metrics
for op_metric in metrics.operator_metrics:
    if op_metric.lm_metrics and op_metric.lm_metrics.cost > 5.0:
        logger.warning(
            f"Expensive operator: {op_metric.operator_id}, "
            f"Cost: ${op_metric.lm_metrics.cost}, "
            f"Time: {op_metric.execution_time_ms}ms"
        )
```
Metrics enable cost attribution, performance profiling, quality monitoring, and capacity planning.
Production Deployment Best Practices
Separate Batch from Real-Time
Precompute expensive operations in batch pipelines:
```python
# Offline batch pipeline
enriched_index = (
    raw_documents
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize in 100 words: {{content}}",
            content=fc.col("content"),
            model_alias="mini"
        )
    )
)

enriched_index.write.parquet("s3://indexes/enriched/")


# Online serving: fast retrieval
def search_handler(query: str):
    index = session.read.parquet("s3://indexes/enriched/")
    query_emb = fc.semantic.embed(fc.lit(query))
    return (
        index
        .with_column("score", fc.cosine_similarity("embedding", query_emb))
        .order_by(fc.col("score").desc())
        .limit(10)
    )
```
Benefits: predictable latency, better resource utilization, easier debugging.
Implement Row-Level Lineage
Track data flow for debugging:
```python
# Lineage is called on the DataFrame, not the QueryResult
lineage = search_pipeline.lineage()

# After collecting results, identify specific row IDs
# and trace them through the lineage
result = search_pipeline.collect()

# Example: trace specific rows backwards
# (assuming you have row identifiers to trace)
row_ids = ["uuid1", "uuid2"]  # These would be actual row identifiers
source_rows = lineage.backward(row_ids)
logger.info(f"Traced rows back to source: {source_rows}")
```
Design Clear Schemas
Schema quality impacts extraction accuracy:
```python
class DocumentMetadata(BaseModel):
    """Extracted metadata for search indexing."""

    title: str = Field(
        description="Clear, descriptive title summarizing document content"
    )
    category: Literal["tutorial", "reference", "guide", "api_docs"] = Field(
        description=(
            "Document type: "
            "tutorial=step-by-step instructions, "
            "reference=API/technical specs, "
            "guide=conceptual explanation, "
            "api_docs=endpoint documentation"
        )
    )
    key_concepts: List[str] = Field(
        description="3-5 main technical concepts covered",
        min_items=3,
        max_items=5
    )
    difficulty: Literal["beginner", "intermediate", "advanced"] = Field(
        description=(
            "beginner=no prior knowledge, "
            "intermediate=basic familiarity, "
            "advanced=expert-level content"
        )
    )
```
Detailed descriptions with constraints reduce hallucination.
Test on Samples First
Validate logic and costs before scaling:
```python
sample = full_corpus.limit(100)

test_result = (
    sample
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .collect()
)

# Inspect cost metrics for the sample run
metrics = test_result.metrics
lm = metrics.total_lm_metrics
print(f"Cost for 100 docs: ${lm.cost}")

estimated_full_cost = lm.cost * (full_corpus.count() / 100)
print(f"Estimated full corpus cost: ${estimated_full_cost}")

# test_result.data contains the materialized dataframe
# (assuming the default polars format)
for row in test_result.data.head(10).iter_rows(named=True):
    print(f"Title: {row['metadata']['title']}")
    print(f"Category: {row['metadata']['category']}")

if quality_acceptable and cost_acceptable:
    full_result = full_corpus.with_column(...).collect()
```
Production Case Study
RudderStack's implementation shows these principles at scale:
Problem: Support tickets, documentation, and product knowledge were scattered across systems, and manual triage consumed roughly 95% of a product manager's time.
Architecture:
- Ingest from warehouse, Notion, support systems
- Build semantic context - enrich product taxonomy, create semantic links
- Expose MCP search tools
- Automated triage agent
Results:
- 95% triage time reduction
- 90%+ first-pass accuracy
- Direct citation surfacing
- Warehouse-native context stays current
Success factors: inference-first architecture, batch preprocessing, semantic linking, tool-based interfaces.
Patterns for Specific Use Cases
Semantic Clustering
Group documents without predefined categories:
```python
clustered = (
    documents
    .with_column("embedding", fc.semantic.embed("content"))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=20
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes: {{contents}}",
            fc.col("content"),
            model_alias="mini"
        ).alias("cluster_summary")
    )
)
```
Multi-Modal Search
Extend to images and PDFs:
```python
docs = (
    session.read.pdf_metadata("data/reports/**/*.pdf", recursive=True)
    .with_column(
        "markdown",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---",
            describe_images=True
        )
    )
    .with_column(
        "sections",
        fc.markdown.extract_header_chunks("markdown", header_level=2)
    )
    .explode("sections")
    .with_column("embedding", fc.semantic.embed(fc.col("sections").content))
)
```
Time-Aware Search
Incorporate temporal dimensions:
```python
time_aware_index = (
    documents
    .with_column("timestamp", fc.dt.to_timestamp("created_at", "yyyy-MM-dd"))
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column(
        "recency_boost",
        fc.when(
            fc.col("timestamp") > fc.current_timestamp() - fc.expr("INTERVAL 30 DAYS"),
            1.5
        ).otherwise(1.0)
    )
)

results = (
    time_aware_index
    .with_column(
        "base_similarity",
        fc.cosine_similarity(fc.col("embedding"), query_embedding)
    )
    .with_column(
        "final_score",
        fc.col("base_similarity") * fc.col("recency_boost")
    )
    .order_by(fc.col("final_score").desc())
)
```
Migration from Legacy Systems
Adopt semantic capabilities incrementally.
Phase 1: Semantic Enrichment
Enhance existing indexes:
```python
existing_index = session.read.parquet("s3://legacy/search_index/")

enriched = (
    existing_index
    .with_column("embedding", fc.semantic.embed("content"))
    .with_column(
        "semantic_metadata",
        fc.semantic.extract("content", EnhancedSchema, model_alias="mini")
    )
)

enriched.write.parquet("s3://semantic/search_index/")
```
Phase 2: Hybrid Retrieval
Combine keyword and semantic:
```python
def hybrid_search(query: str, k: int = 10):
    keyword_results = legacy_search(query, k=50)
    query_embedding = fc.semantic.embed(fc.lit(query))

    reranked = (
        session.read.parquet(keyword_results)
        .with_column(
            "semantic_score",
            fc.cosine_similarity(fc.col("embedding"), query_embedding)
        )
        .with_column(
            "combined_score",
            0.7 * fc.col("semantic_score") + 0.3 * fc.col("keyword_score")
        )
        .order_by(fc.col("combined_score").desc())
        .limit(k)
    )
    return reranked.collect()
```
Phase 3: Full Migration
Move to semantic-first architecture after validation.
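A minimal sketch of what the semantic-first retrieval path can look like once the legacy keyword stage is retired, reusing the enriched index from Phase 1 and the operators shown earlier; treat it as a starting point rather than a drop-in replacement:

```python
def semantic_search(query: str, k: int = 10):
    # Reads the index produced in Phase 1; no keyword stage remains.
    index = session.read.parquet("s3://semantic/search_index/")
    query_embedding = fc.semantic.embed(fc.lit(query))
    return (
        index
        .with_column("score", fc.cosine_similarity(fc.col("embedding"), query_embedding))
        .order_by(fc.col("score").desc())
        .limit(k)
        .collect()
    )
```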
Performance Benchmarks
Based on production deployments:
Performance gains:
- 100-300x speedups vs naive implementations
- Order-of-magnitude improvements for semantic joins
- 45% reduction in time-to-insight
Cost optimization:
- 10x year-over-year inference cost decline
- Model cascades reduce invocations by orders of magnitude
- Strategic model selection cuts costs 80%
Accuracy metrics:
- 97% accuracy on classification tasks with proper architecture
- Schema-driven extraction eliminates validation overhead
- Statistical guarantees enable production deployment
Implementation Checklist
Architecture:
- Implement inference-first platform with semantic operators
- Configure multi-provider model access
- Set up native AI data types
Indexing:
- Build batch processing pipelines
- Generate embeddings for corpus
- Extract structured metadata
- Create lookup tables for fast retrieval
Search:
- Implement hybrid filtering strategies
- Configure semantic joins for ranking
- Set up MCP tools for agent access
- Add temporal awareness if needed
Operations:
- Deploy monitoring and metrics tracking
- Implement row-level lineage
- Set up cost tracking
- Configure caching strategies
Testing:
- Validate on small samples
- Estimate full corpus costs
- Test quality thresholds
- Measure latency targets
Next Steps
Start with a focused implementation:
- Install Fenic and configure provider access
- Build a simple extraction pipeline on sample documents
- Generate embeddings and test similarity search
- Add semantic operators incrementally
- Measure performance and costs
- Scale to production after validation
The semantic search patterns in this guide enable production systems that scale reliably while maintaining cost control. Organizations implementing inference-first architectures report significant improvements in search quality, development speed, and operational efficiency.
Resources:
- Fenic Open Source
- Semantic Operators Guide
- Composable Semantic Operators
- RudderStack Case Study
- HN Research Agent
- Latest Release Notes

