
How to Build Scalable AI Data Pipelines with an Inference-First Engine

Typedef Team

Most AI projects fail to reach production because traditional data infrastructure wasn't built for LLM operations. Teams spend months stitching together OCR services, transcription APIs, vector databases, and data warehouses with brittle glue code that breaks at scale. The solution requires rethinking data engines from first principles with inference as a native operation.

What Makes an Engine Inference-First

Legacy data platforms optimize for rows and columns. They treat LLM calls as external black boxes through User Defined Functions (UDFs). The query engine cannot see inside these functions, preventing automatic batching, intelligent caching, operation reordering, and cost optimization.
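
For illustration, here is the opaque pattern an inference-first engine replaces. A minimal sketch, where call_llm stands in for any provider SDK call the optimizer cannot inspect:

python
import pandas as pd

# `call_llm` is a placeholder for any provider SDK call (e.g., a chat completion)
def classify_udf(text: str) -> str:
    return call_llm(f"Classify this support ticket: {text}")  # one blocking call per row

df = pd.read_csv("tickets.csv")
# The engine sees only an opaque Python function, so it cannot batch,
# cache, or reorder the inference hidden inside
df["category"] = df["ticket_text"].apply(classify_udf)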

An inference-first engine rebuilds the query optimizer with model awareness at its core. Fenic, the open-source framework from Typedef, demonstrates this architecture by embedding semantic operations directly into the DataFrame API. When the query engine sees semantic.extract() or semantic.join(), it knows this is inference with specific characteristics: high latency, token costs, batching opportunities, and caching potential.

This visibility enables optimizations impossible with traditional approaches:

  • Batch API calls across rows to minimize network overhead
  • Cache repeated inference patterns automatically
  • Reorder operations to reduce data volume before expensive inference
  • Estimate costs accurately before execution
  • Self-throttle to respect provider rate limits while maximizing throughput

The result: teams build deterministic workflows on top of non-deterministic models with production-grade reliability.

Core Architecture Components

DataFrames as the Unifying Layer

AI applications are data pipelines. They take inputs, reason over context, generate outputs, and log results. The DataFrame abstraction handles these patterns naturally while adding structure to probabilistic systems.

Lineage tracking: Every column and row has traceable origins, even from non-deterministic model outputs. When debugging unexpected results, trace backwards through transformations to identify where issues originated.

Columnar consistency: Whether dealing with summaries, embeddings, or classification scores, columns maintain structured types throughout the pipeline. This consistency eliminates manual type checking and validation code.

Deterministic transformations: Wrapping inference in declarative logic creates a pattern: model + prompt + input → output. This enables versioning, caching, and reproducibility with stochastic models.
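
For example, a single declarative expression captures all three components. A minimal sketch, assuming semantic.map accepts the same model_alias parameter shown later for classify and extract, with an illustrative column name:

python
summaries = df.select(
    "*",
    fc.semantic.map(
        "Summarize in one sentence: {{text}}",  # prompt, versioned with the pipeline code
        text=fc.col("raw_text"),                # input column
        model_alias="fast",                     # model
    ).alias("summary")                          # output column
)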

AI-Native Data Types

Generic text columns force preprocessing logic throughout your codebase. Fenic provides specialized types that handle AI formats natively:

MarkdownType: Parse and extract structure from markdown documents without external preprocessing. The engine treats headers, lists, and code blocks as first-class structures.

TranscriptType: Process SRT, WebVTT, and transcript formats with speaker and timestamp awareness. Query by speaker, extract time ranges, or segment conversations without manual parsing.

JsonType: Manipulate nested JSON using JQ expressions within DataFrame operations. Extract deeply nested fields and transform structures without custom parsing functions.

DocumentPathType: Load PDFs, Word documents, and text files with automatic content extraction. The engine handles file I/O, format detection, and text extraction as native operations.

EmbeddingType: Store and operate on vector embeddings with similarity operations built in. Calculate cosine similarity, find nearest neighbors, or cluster embeddings using standard DataFrame syntax.
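
A short sketch combining a few of these types, using the casting and markdown helpers that appear later in this article (column names are illustrative):

python
docs = (
    df
    .with_column("md", fc.col("raw_text").cast(fc.MarkdownType))             # MarkdownType
    .with_column("chunks", fc.markdown.extract_header_chunks(fc.col("md")))  # header-aware chunking
    .explode("chunks")
    .with_column("embedding", fc.semantic.embed(fc.col("chunks.content")))   # EmbeddingType
)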

The Fenic 0.5.0 release added Date and Timestamp types with timezone-aware conversions, plus enhanced PDF parsing with page chunking and metadata extraction.

Multi-Provider Model Integration

Production systems need flexibility to route tasks to appropriate models. Fast, cheap models handle simple classification while capable models tackle extraction. Managing this across OpenAI, Anthropic, Google, and other providers manually creates operational overhead.

An inference-first engine handles provider differences transparently:

python
import fenic as fc

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            ),
            "efficient": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=200,
                tpm=200000
            )
        },
        default_language_model="fast"
    )
)

session = fc.Session.get_or_create(config)

Rate limiting parameters (rpm and tpm) prevent throttling. The engine automatically batches requests, implements retry logic, and self-throttles to stay within limits while maximizing throughput through async I/O.

Implementing Semantic Operations

Schema-Driven Extraction

Type-safe extraction from unstructured text eliminates prompt engineering brittleness. Define a Pydantic schema once and get validated results:

python
from pydantic import BaseModel, Field
from typing import Literal, List

class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: List[str]
    recommendations: List[str]
    confidence: float = Field(ge=0, le=1)

results = df.select(
    "*",
    fc.semantic.extract(
        fc.col("policy_text"),
        PolicyInsight
    ).alias("insights")
)

The schema serves as documentation and validation. The engine constructs prompts automatically that produce the specified structure and validates responses against constraints. When models return invalid data, the framework retries with clarifying instructions.

Semantic Filtering

Traditional filtering relies on exact matches or regular expressions. Semantic predicates enable content-based filtering:

python
applicants = df.filter(
    (fc.col("years_of_experience") > 5) &
    fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume")
    )
)

This combines traditional boolean logic with semantic evaluation. The query engine optimizes both together—filtering on cheap boolean conditions before invoking expensive LLM predicates.

Predicates support Jinja templating with loops, conditionals, and dynamic row-specific logic:

python
fc.semantic.predicate(
    """
    Does this feedback mention {{ search_term }}?
    {% if priority == "high" %}
    Only return true if it's a critical issue.
    {% endif %}
    Feedback: {{ feedback_text }}
    """,
    search_term=fc.lit("UI problems"),
    priority=fc.col("priority"),
    feedback_text=fc.col("raw_feedback")
)

Meaning-Based Joins

Traditional joins match on exact values. Semantic joins determine matches based on meaning:

python
prompt = """
Is this candidate a good fit for the job?

Candidate Background: {{left_on}}
Job Requirements: {{right_on}}

Consider technical skills, experience level, and domain knowledge.
"""

matched = applicants.semantic.join(
    other=jobs_df,
    predicate=prompt,
    left_on=fc.col("resume"),
    right_on=fc.col("job_description")
).limit(5)

The predicate receives both left and right row data, enabling sophisticated matching logic. The engine optimizes semantic joins by batching LLM calls across candidate pairs and caching decisions for repeated comparisons.

This pattern works for:

  • Matching documents to queries in RAG systems
  • Linking related records across databases
  • Finding similar content
  • Deduplication based on semantic similarity

Classification and Transformation

The semantic.classify operator categorizes text with few-shot examples:

python
from fenic.core.types.semantic_examples import ClassifyExample, ClassifyExampleCollection

examples = ClassifyExampleCollection()
examples.create_example(ClassifyExample(input="App crashed on startup", output="bug"))
examples.create_example(ClassifyExample(input="Need ability to export data", output="feature"))
examples.create_example(ClassifyExample(input="How do I reset my password?", output="question"))

classified = df.select(
    "*",
    fc.semantic.classify(
        fc.col("feedback"),
        classes=["bug", "feature", "question"],
        examples=examples
    ).alias("category")
)

The semantic.map operator applies natural language transformations:

python
expanded = df.select(
    "*",
    fc.semantic.map(
        "Expand abbreviations in: {{text}}",
        text=fc.col("message")
    ).alias("expanded_text")
)

Semantic Aggregation

Group data by semantic similarity rather than exact matches:

python
# Use with_cluster_labels to group rows by embedding similarity instead of exact keys
grouped = (
    df
    .with_column(
        "issue_embedding",
        fc.semantic.embed(fc.col("issue_description"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("issue_embedding"),
        num_clusters=5,
        label_column="cluster_label"
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("issue_description")
        ).alias("theme")
    )
)

This enables clustering semantically related content without manual category definitions.

Production Reliability Features

Automatic Optimization and Batching

The query optimizer groups API calls efficiently. Instead of individual requests per row, the engine batches operations:

python
pipeline = (
    df
    .filter(fc.col("priority") == "high")
    .select(
        "*",
        fc.semantic.extract(fc.col("content"), TicketSchema).alias("ticket")
    )
    .filter(
        fc.semantic.predicate(
            "Is this urgent? {{description}}",
            description=fc.col("ticket.description")
        )
    )
)

Behind the scenes:

  1. Batches all extraction calls for high-priority tickets
  2. Respects provider rate limits through self-throttling
  3. Executes predicate filtering in parallel where possible
  4. Caches repeated inference patterns automatically

Async I/O and concurrent request batching maximize throughput while staying within rate limits.

Error Handling and Resilience

Built-in retry logic with exponential backoff handles transient failures. Rate limit errors trigger intelligent backoff rather than cascading failures.

Token counting and cost tracking provide visibility:

python
result = pipeline.collect()

print(f"Tokens used: {result.metrics.total_lm_metrics.num_output_tokens}")
print(f"Cost: ${result.metrics.total_lm_metrics.cost}")
print(f"Execution time: {result.metrics.execution_time_ms}ms")

The framework gracefully handles quota exhaustion by failing fast with clear error messages. Token capacity guardrails raise errors early if requests cannot fit within model limits.

Data Lineage and Debugging

Row-level lineage tracks individual record processing history, so unexpected outputs can be traced backwards through the computation graph. Collected results can also be inspected row by row:

python
result = pipeline.collect()

# Access the data in a supported format (e.g., Polars DataFrame)
df_result = result.data

# Iterate through the data
for row in df_result.iter_rows(named=True):
    # Process row data as needed
    print(f"Row data: {row}")

Explicit caching speeds iterative development:

python
expensive_pipeline = (
    df
    .select(fc.semantic.extract(fc.col("document"), Schema))
    .cache()
)

analysis_a = expensive_pipeline.filter(condition_a)
analysis_b = expensive_pipeline.filter(condition_b)

The explain() method visualizes query plans before execution:

python
pipeline.explain()
# Shows operation ordering, batching strategy, and estimated costs

Real-World Implementation

Document Triage at Scale

RudderStack reduced triage time by 95% using an inference-first pipeline processing support tickets, sales calls, and product documentation:

python
tickets = session.read.csv("s3://data/support_tickets/*.csv")
transcripts = session.read.docs("s3://data/calls/**/*.srt", content_type="transcript")
docs = session.read.docs("data/notion/**/*.md", content_type="markdown")

taxonomy = (
    docs
    .with_column("chunks", fc.markdown.extract_header_chunks(fc.col("content")))
    .explode("chunks")
    .select(
        "*",
        fc.semantic.extract(
            fc.col("chunks.content"),
            ProductFeatureSchema
        ).alias("features")
    )
)

processed_tickets = (
    tickets
    .semantic.join(
        other=taxonomy,
        predicate="Ticket relates to feature",
        left_on=fc.col("description"),
        right_on=fc.col("features.name")
    )
    .select(
        "*",
        fc.semantic.classify(
            fc.col("description"),
            examples=triage_examples
        ).alias("category")
    )
)

The pipeline achieves 90%+ first-pass category acceptance while surfacing relevant context from thousands of documents. Product managers review triage decisions in one pass instead of spending hours researching each request.

Content Classification Pipeline

Media companies classify and enrich articles at scale:

python
articles = (
    session
    .read.docs("content/**/*.md", content_type="markdown")
    .with_column("chunks", fc.markdown.extract_header_chunks(fc.col("content")))
    .explode("chunks")
    .select(
        "*",
        fc.semantic.classify(
            fc.col("chunks.content"),
            examples=content_examples
        ).alias("category"),
        fc.semantic.extract(
            fc.col("chunks.content"),
            EntitySchema
        ).alias("entities"),
        fc.semantic.embed(fc.col("chunks.content")).alias("embedding")
    )
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=50,
        label_column="topic_cluster"
    )
)

One pipeline handles classification, entity extraction, embedding generation, and semantic clustering without separate inference infrastructure.

Insurance Policy Analysis

Matic Insurance built semantic extraction pipelines across thousands of policies:

python
policies = (
    session
    .read.pdf_metadata("policies/**/*.pdf", recursive=True)
    .filter(fc.col("page_count") < 100)
    .select(
        "*",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---",
            describe_images=True
        ).alias("policy_text")
    )
    .select(
        "*",
        fc.semantic.extract(
            fc.col("policy_text"),
            PolicyCoverageSchema
        ).alias("coverage")
    )
    .unnest("coverage")
)

gaps = policies.semantic.join(
    other=standard_coverages,
    predicate="Policy lacks this coverage",
    left_on=fc.col("coverage.type"),
    right_on=fc.col("required_coverage")
)

Teams deploy pipelines in days instead of months, dramatically reducing errors from manual analysis while cutting costs.

Building Your First Pipeline

Installation and Setup

bash
pip install fenic

Configure model providers:

python
import fenic as fc
from pydantic import BaseModel

config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

Basic Pipeline Pattern

python
from typing import List, Literal

class FeedbackInsight(BaseModel):
    summary: str
    sentiment: Literal["positive", "negative", "neutral"]
    key_issues: List[str]
    priority: Literal["low", "medium", "high"]

df = session.read.csv("feedback.csv")

insights = (
    df
    .select(
        "*",
        fc.semantic.extract(
            fc.col("customer_feedback"),
            FeedbackInsight
        ).alias("insights")
    )
    .filter(fc.col("insights.priority") == "high")
    .with_column(
        "issues_embedding",
        fc.semantic.embed(fc.col("insights.key_issues").cast(fc.StringType))
    )
    .semantic.with_cluster_labels(
        by=fc.col("issues_embedding"),
        num_clusters=5,
        label_column="issue_cluster"
    )
    .group_by("issue_cluster")
    .agg(
        fc.count("*").alias("issue_count"),
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("customer_feedback")
        ).alias("cluster_theme")
    )
)

results = insights.collect()

This pipeline extracts structured insights, filters high-priority items, clusters similar issues semantically, and generates summaries per cluster—all in declarative DataFrame operations.

Multi-Stage Processing

python
documents = (
    session
    .read.pdf_metadata("documents/**/*.pdf", recursive=True)
    .filter(fc.col("page_count").between(5, 100))
    .select(
        "*",
        fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
    )
    .cache()
)

extracted = (
    documents
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            DocumentMetadataSchema
        ).alias("metadata")
    )
    .unnest("metadata")
)

enriched = (
    extracted
    .select(
        "*",
        fc.semantic.classify(
            fc.col("content"),
            examples=doc_type_examples
        ).alias("doc_type"),
        fc.semantic.embed(fc.col("content")).alias("content_embedding")
    )
)

relationships = enriched.semantic.join(
    other=enriched,
    predicate="Documents discuss related topics",
    left_on=fc.col("content"),
    right_on=fc.col("content")
)

Each stage builds on previous results. Caching expensive operations prevents redundant processing.

Best Practices for Scale

Define Schemas Once

Create reusable Pydantic models as single sources of truth:

python
class CustomerIntent(BaseModel):
    """Schema for customer intent extraction"""
    primary_intent: Literal["purchase", "support", "inquiry", "complaint"]
    urgency: Literal["immediate", "soon", "flexible"]
    entities: List[str]
    confidence: float = Field(ge=0, le=1)

pipeline_a = df.select(fc.semantic.extract(fc.col("message"), CustomerIntent))
pipeline_b = df.select(fc.semantic.extract(fc.col("email"), CustomerIntent))

This eliminates prompt brittleness and ensures consistent extraction.

Leverage Lazy Evaluation

Build complete pipelines before execution to enable global optimization:

python
pipeline = (
    df
    .filter(base_condition)
    .select("*", fc.semantic.extract(fc.col("text"), Schema).alias("extracted"))
    .join(reference_data, on="key")
    .with_column("category", fc.semantic.classify(fc.col("text"), classes=["A", "B"]))
    .filter(result_condition)
    .cache()
)

results = pipeline.collect()

The optimizer reorders operations, identifies batching opportunities, and eliminates redundant work.

Configure Models Appropriately

Use the right model for each task:

python
language_models = {
    "nano": fc.OpenAILanguageModel(
        model_name="gpt-4o-mini",
        rpm=500,
        tpm=200000
    ),
    "standard": fc.AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=100,
        input_tpm=100000,
        output_tpm=50000
    ),
    "power": fc.OpenAILanguageModel(
        model_name="gpt-4o",
        rpm=100,
        tpm=100000
    )
}

pipeline = (
    df
    .select(
        "*",
        fc.semantic.classify(
            fc.col("text"),
            examples=examples,
            model_alias="nano"
        ).alias("category"),
        fc.semantic.extract(
            fc.col("complex_doc"),
            DetailedSchema,
            model_alias="power"
        ).alias("extracted")
    )
)

The cost difference between models is often 10-100x. Routing each task to the cheapest capable model can reduce costs by as much as 80% while maintaining quality.

Implement Incremental Processing

For large datasets, process incrementally:

python
batch_size = 1000

for offset in range(0, total_rows, batch_size):
    batch = (
        df
        .limit(batch_size, offset)
        .select(fc.semantic.extract(...))
        .cache()
    )

    batch.write.parquet(f"output/batch_{offset}.parquet")

Write intermediate results to recover from failures without reprocessing.

Scaling from Local to Production

Local Development

Develop and test complete pipelines locally:

python
session = fc.Session.get_or_create(
    fc.SessionConfig(app_name="local_dev")
)

df = session.read.csv("sample_data.csv")

processed = (
    df
    .select(fc.semantic.extract(fc.col("text"), Schema))
    .cache()
)

processed.write.parquet("local_results.parquet")

The full engine runs locally with no external dependencies beyond model API access.

Production Deployment

Deploy the same code to cloud execution with zero changes:

python
session = fc.Session.get_or_create(
    fc.SessionConfig(
        app_name="production",
        cloud=fc.CloudConfig(
            size=fc.CloudExecutorSize.LARGE
        )
    )
)

df = session.read.csv("s3://production-data/**/*.csv")

processed = (
    df
    .select(fc.semantic.extract(fc.col("text"), Schema))
    .cache()
)

processed.write.parquet("s3://results/output.parquet")

Typedef Cloud handles serverless execution, automatic scaling, and infrastructure management.

Measuring Pipeline Performance

Cost and Performance Metrics

Track token usage, costs, and execution time:

python
result = pipeline.collect()

metrics = result.metrics
print(f"Total tokens: {metrics.total_lm_metrics.num_input_tokens + metrics.total_lm_metrics.num_output_tokens}")
print(f"Total cost: ${metrics.total_lm_metrics.cost:.4f}")
print(f"Execution time: {metrics.execution_time_ms / 1000:.2f}s")

for op_metrics in metrics.operator_metrics:
    print(f"{op_metrics.operator_name}: {op_metrics.execution_time_ms}ms")

Use these metrics to identify optimization opportunities and track costs.

Quality Metrics

Monitor extraction accuracy:

python
sample = pipeline.limit(100).collect()

# Count rows where extraction failed to produce a valid result
rows = list(sample.data.iter_rows(named=True))
validation_errors = sum(1 for row in rows if row["extracted"] is None)  # assumes the extracted column is aliased "extracted"
accuracy = (len(rows) - validation_errors) / len(rows)

print(f"Schema validation accuracy: {accuracy:.2%}")

Implement automated quality checks in CI/CD pipelines.
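
For example, a CI job can run the sampling code above and fail the build when accuracy regresses. A minimal sketch with an illustrative threshold:

python
ACCURACY_THRESHOLD = 0.95  # illustrative quality bar

# Fail the job when schema validation accuracy regresses on the sample
assert accuracy >= ACCURACY_THRESHOLD, (
    f"Extraction accuracy regressed: {accuracy:.2%} < {ACCURACY_THRESHOLD:.0%}"
)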

Advanced Patterns

Batch Preprocessing for Agents

Separate heavy lifting from real-time agents:

python
enriched_data = (
    raw_documents
    .with_column("raw_md", fc.col("content").cast(fc.MarkdownType))
    .with_column("chunks", fc.markdown.extract_header_chunks("raw_md", header_level=2))
    .explode("chunks")
    .with_column("embedding", fc.semantic.embed(fc.col("chunks").content))
    .with_column(
        "metadata",
        fc.semantic.extract("chunks", DocumentMetadata, model_alias="cheap")
    )
)

enriched_data.write.parquet("s3://my-bucket/enriched/")

Agents query enriched data without expensive inference at request time. This provides predictable responsiveness, better resource utilization, and cleaner separation between planning and execution.

Hybrid Fuzzy and Semantic Matching

The Fenic 0.4.0 release added built-in fuzzy string matching. Use fuzzy matching for initial candidate selection before expensive semantic joins:

python
candidates = (
    left_df.join(right_df)
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler"
        )
    )
    .filter(fc.col("fuzzy_score") > 80)
)

final = candidates.semantic.join(
    predicate="Are these the same company? Left: {{left_name}}, Right: {{right_name}}",
    left_on=fc.col("company_description"),
    right_on=fc.col("business_description")
)

This hybrid approach reduces costs by orders of magnitude.

Declarative Tool Creation

Version 0.4.0 introduced declarative tool creation for agent function calling:

python
from fenic.core.mcp.types import ToolParam

session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search through company documents",
    tool_query=df,
    tool_params=[
        ToolParam(
            name="search_term",
            description="The term to search for",
            default_value="default"
        ),
        ToolParam(
            name="limit",
            description="Max results to return",
            default_value=10
        )
    ],
    result_limit=50
)

Tools are type-safe, discoverable, and automatically consumable by MCP servers.

Integration with Existing Infrastructure

Lakehouse-Native Architecture

Fenic reads and writes standard formats without data movement:

python
df = session.read.parquet("s3://data-lake/raw/*.parquet")
processed = df.semantic.extract(...).filter(...)
processed.write.parquet("s3://data-lake/processed/")

Full compatibility with Parquet, Iceberg, Delta Lake, and Lance enables seamless integration with existing infrastructure.

Hybrid Pipelines

Mix Fenic with existing workflows:

python
# Spark preprocessing
spark_df.write.parquet("s3://interim/")

# Fenic semantic enrichment
enriched = (
    session.read.parquet("s3://interim/")
    .semantic.extract(...)
    .semantic.classify(...)
)

enriched.write.parquet("s3://final/")

# Continue with downstream processing

Use the right tool for each step.

Why Inference-First Architecture Matters

Traditional data platforms treat LLM calls as external black-box UDFs that query optimizers cannot inspect. Fenic's inference-first approach embeds LLM operations directly into the query engine.

When the query optimizer sees semantic.extract() or semantic.join(), it knows this is inference with high latency, token costs, batching benefits, and caching opportunities. The optimizer can:

  • Reorder operations to minimize data processed by expensive inference
  • Batch requests across rows to amortize fixed costs
  • Cache aggressively since deterministic operations produce consistent outputs
  • Parallelize intelligently across providers
  • Estimate costs accurately before execution

This is impossible when LLM calls are hidden in UDFs or microservices. Semantic operators make inference visible to the optimizer, enabling optimizations that improve performance and reduce costs.

The declarative API provides auditability and reproducibility. Every operation is explicitly defined with inputs, prompts, and model configurations tracked automatically. Row-level lineage traces data flow through transformations.

Combined with native AI data types, automatic batch optimization, multi-provider support, and production-grade error handling, Fenic represents infrastructure purpose-built for AI workloads.

Getting Started

Install Fenic:

bash
pip install fenic

Set up your first pipeline:

python
import fenic as fc
from pydantic import BaseModel

class InsightSchema(BaseModel):
    summary: str
    key_points: list[str]
    sentiment: str

config = fc.SessionConfig(
    app_name="first_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "default": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = fc.Session.get_or_create(config)

df = session.read.csv("feedback.csv")

insights = (
    df
    .select("*", fc.semantic.extract(fc.col("feedback"), InsightSchema).alias("insights"))
    .with_column("key_points_embedding", fc.semantic.embed(fc.col("insights.key_points").cast(fc.StringType)))
    .semantic.with_cluster_labels(
        by=fc.col("key_points_embedding"),
        num_clusters=5,
        label_column="cluster_label"
    )
    .group_by("cluster_label")
    .agg(fc.semantic.reduce("Summarize cluster themes", fc.col("feedback")))
)

insights.show()

Additional Resources

Typedef platform overview - Learn about the complete AI data engine

Fenic GitHub repository - Explore the open-source framework

Fenic 0.5.0 release notes - Latest features including PDF parsing and date types

Fenic 0.4.0 release notes - Declarative tools and MCP integration

LangChain integration guide - Use Fenic with LangChain for agentic workflows

Semantic operator patterns - Advanced patterns for data transformation

RudderStack case study - How one company achieved 95% triage time reduction

Log clustering guide - Build agents for log analysis

Typedef with LangGraph - Orchestrate reliable agents

An inference-first architecture eliminates the brittle glue code that prevents AI systems from reaching production. By treating model inference as a native operation within the data engine, teams build deterministic workflows on top of non-deterministic models with production-grade reliability and scalability.
