How to Process Embeddings and JSON as Native Data Types in Pipelines

Typedef Team

Production AI pipelines require treating embeddings and JSON as first-class data types, not strings or serialized blobs. Fenic provides native EmbeddingType and JsonType support for type-safe operations without serialization overhead.

Native Data Types in Fenic

Most DataFrame frameworks treat embeddings as float arrays and JSON as strings. This creates problems:

  • Type information disappears during transformations
  • Operations require manual parsing and validation
  • Semantic operations need custom glue code
  • Schema inference fails for nested structures

Fenic solves this with specialized types that preserve semantics. An EmbeddingType column tracks dimensions and source model. A JsonType column supports structural queries without parsing.

EmbeddingType Fundamentals

EmbeddingType represents fixed-length embedding vectors with metadata.

python
from fenic.core.types.datatypes import EmbeddingType

# Define embedding type
embedding_type = EmbeddingType(
    dimensions=384,
    embedding_model="text-embedding-3-small"
)

The type tracks vector dimensions and generating model, making similarity computation and normalization type-safe.

JsonType Fundamentals

JsonType represents structured JSON data stored as strings but queryable with native operations.

python
from fenic.core.types.datatypes import JsonType

JsonType columns support the following operations, sketched briefly after this list:

  • JQ queries for extraction
  • Runtime type detection
  • Deep structural search
  • Type-preserving operations
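
These operations compose on any JsonType column. A minimal sketch, assuming a session configured as in the next section; the column name and sample payloads are illustrative, and each operation is covered in detail later in this guide:

python
import fenic as fc
from fenic.core.types.datatypes import JsonType

# Illustrative payloads arriving as plain strings
events = session.create_dataframe({
    "payload": [
        '{"user": {"id": 7}, "tags": ["a"]}',
        '{"user": {"id": 9}, "tags": ["b", "c"]}'
    ]
})

# Cast once, then query natively
typed = events.select(fc.col("payload").cast(JsonType).alias("data"))

typed.select(
    fc.json.jq(fc.col("data"), ".user.id").alias("user_id"),  # JQ extraction
    fc.json.get_type(fc.col("data")).alias("kind")            # runtime type detection
).show()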

Session Configuration

Configure your session with embedding and language models.

python
import fenic as fc

config = fc.SessionConfig(
    app_name="pipeline_demo",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "openai_embed": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100
            )
        },
        default_embedding_model="openai_embed"
    )
)

session = fc.Session.get_or_create(config)

Set your provider API key:

bash
export OPENAI_API_KEY=your_key_here

JSON Data Type Operations

Load JSON Files

Read JSON files directly as JsonType columns:

python
# Read JSON documents
df = session.read.docs(
    "data/**/*.json",
    content_type="json",
    recursive=True
)

# Schema: file_path (string), error (string), content (JsonType)
df.show()

Cast String Columns to JsonType

Convert string columns to JsonType for native operations:

python
from fenic.core.types.datatypes import JsonType

# Start with CSV containing JSON strings
raw_df = session.read.csv("logs.csv")

# Cast to JsonType
typed_df = raw_df.select(
    fc.col("id"),
    fc.col("payload").cast(JsonType).alias("json_data")
)

Query Nested Fields with JQ

Use JQ syntax to query JSON structures:

python
# Extract specific fields
result = df.select(
    fc.col("file_path"),
    fc.json.jq(fc.col("content"), ".event.type").alias("event_type"),
    fc.json.jq(fc.col("content"), ".user.id").alias("user_id"),
    fc.json.jq(fc.col("content"), ".metadata.timestamp").alias("timestamp")
)

result.show()

Query nested arrays:

python
# Extract array elements
tags = df.select(
    fc.json.jq(fc.col("content"), ".tags | map(.name)").alias("tag_names")
)

Detect and Filter JSON Types

Detect JSON types at runtime:

python
# Get JSON type for each value
types = df.select(
    fc.col("content"),
    fc.json.get_type(fc.col("content")).alias("json_type")
)

# Filter by type
arrays_only = df.filter(
    fc.json.get_type(fc.col("content")) == "array"
)

objects_only = df.filter(
    fc.json.get_type(fc.col("content")) == "object"
)

Search JSON Structures

Search recursively through JSON:

python
# Find objects containing specific structure
has_admin = df.filter(
    fc.json.contains(fc.col("content"), '{"role": "admin"}')
)

# Find exact array matches
has_permissions = df.filter(
    fc.json.contains(fc.col("content"), '["read", "write"]')
)

# Find primitive values
has_error_code = df.filter(
    fc.json.contains(fc.col("content"), '500')
)

The contains function uses type-aware matching (a short example follows this list):

  • Objects use partial matching
  • Arrays require exact matches
  • Primitives distinguish between 42 (number) and "42" (string)
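
Because matching is type-aware, quoting the needle controls primitive matching. A short sketch using the same contains function on the df above:

python
# Matches rows containing the JSON number 42 anywhere in the structure
number_hits = df.filter(fc.json.contains(fc.col("content"), '42'))

# Quoting the needle matches only the JSON string "42"
string_hits = df.filter(fc.json.contains(fc.col("content"), '"42"'))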

Extract Template-Based Patterns

Extract structured data from text using templates:

python
# Parse log lines with mixed formats
logs = session.create_dataframe({
    "log": [
        "2024-01-15 ERROR Connection failed",
        'Name: "Smith, John", Price: 99.99, Tags: ["a", "b"]',
        'Title: "To Kill a Mockingbird", Author: Harper Lee'
    ]
})

# Extract with format rules
extracted = logs.select(
    fc.text.extract(
        fc.col("log"),
        "${date} ${level} ${message}"
    ).alias("parsed")
)

# Extract CSV and JSON fields
mixed = logs.select(
    fc.text.extract(
        fc.col("log"),
        'Name: ${name:csv}, Price: ${price}, Tags: ${tags:json}'
    ).alias("data")
)

Template syntax, with the quoted mode demonstrated after this list:

  • ${field} - Plain text extraction
  • ${field:csv} - CSV-aware parsing
  • ${field:json} - JSON with type preservation
  • ${field:quoted} - Quoted string extraction
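
The quoted mode is not exercised above. A sketch against the third log line, with hypothetical field names:

python
# Extract a quoted title and a plain-text author
books = logs.select(
    fc.text.extract(
        fc.col("log"),
        'Title: ${title:quoted}, Author: ${author}'
    ).alias("book")
)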

Embedding Data Type Operations

Generate Embeddings

Create embeddings from text columns:

python
# Load text data
docs = session.read.csv("documents.csv")

# Generate embeddings
embedded = docs.select(
    fc.col("doc_id"),
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("embeddings")
)

# Result is an EmbeddingType column tracking dimensions and source model
embedded.show()

The embed function returns an EmbeddingType column tracking the following, which you can confirm in the schema as shown after this list:

  • Vector dimensions
  • Source embedding model
  • Null handling
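
To confirm the type metadata survived, inspect the schema. A minimal check:

python
# The embeddings column should report EmbeddingType, not a plain float array
print(embedded.schema)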

Compute Similarity Scores

Calculate similarity between embeddings and query vectors:

python
# Define query vector
query = [0.1, 0.2, 0.3, ...]  # 384 dimensions

# Compute cosine similarity
similar = embedded.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        query,
        metric="cosine"
    ).alias("similarity")
)

# Sort by relevance
ranked = similar.order_by(fc.col("similarity").desc())
ranked.show(10)

Available metrics, compared side by side after this list:

  • cosine - Cosine similarity (range: -1 to 1)
  • dot - Dot product (raw inner product)
  • l2 - Euclidean distance (lower is more similar)
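
A sketch scoring the same embeddings against the same query under each metric, to compare their scales directly; this reuses the embedded DataFrame and query vector from above:

python
metric_compare = embedded.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(fc.col("embeddings"), query, metric="cosine").alias("cos"),
    fc.embedding.compute_similarity(fc.col("embeddings"), query, metric="dot").alias("dot"),
    fc.embedding.compute_similarity(fc.col("embeddings"), query, metric="l2").alias("l2")
)
metric_compare.show()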

Compare Pairwise Embeddings

Compare embeddings between columns:

python
# Compare document pairs
pairs = session.create_dataframe({
    "doc1_id": [1, 2, 3],
    "doc1_text": ["first document", "second document", "third document"],
    "doc2_id": [4, 5, 6],
    "doc2_text": ["fourth document", "fifth document", "sixth document"]
})

# Embed both columns
embedded_pairs = pairs.select(
    fc.col("doc1_id"),
    fc.col("doc2_id"),
    fc.semantic.embed(fc.col("doc1_text")).alias("emb1"),
    fc.semantic.embed(fc.col("doc2_text")).alias("emb2")
)

# Compute pairwise similarity
similarity = embedded_pairs.select(
    fc.col("doc1_id"),
    fc.col("doc2_id"),
    fc.embedding.compute_similarity(
        fc.col("emb1"),
        fc.col("emb2"),
        metric="cosine"
    ).alias("similarity")
)

Normalize Embedding Vectors

Normalize vectors to unit length:

python
# Normalize embeddings
normalized = embedded.select(
    fc.col("doc_id"),
    fc.embedding.normalize(fc.col("embeddings")).alias("unit_embeddings")
)

# Dot product on normalized vectors equals cosine similarity
query_normalized = [0.6, 0.8, ...]  # Already normalized

results = normalized.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(
        fc.col("unit_embeddings"),
        query_normalized,
        metric="dot"
    ).alias("cosine_sim")
)

Cluster Similar Items

Group similar items using K-means:

python
# Cluster documents
clustered = embedded.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=5,
    label_column="cluster_id",
    centroid_column="cluster_center"
)

# Score each row against its cluster centroid, then analyze the distribution
with_sim = clustered.select(
    fc.col("cluster_id"),
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.col("cluster_center"),
        metric="cosine"
    ).alias("similarity")
)

cluster_stats = with_sim.group_by("cluster_id").agg(
    fc.count("*").alias("count"),
    fc.avg("similarity").alias("avg_similarity")
)

cluster_stats.show()

Filter outliers using centroid distance:

python
# Remove rows far from cluster center
clean = clustered.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.col("cluster_center"),
        metric="cosine"
    ) > 0.7
)

Aggregate Embeddings

Compute average embeddings across groups:

python
# Group by category and average embeddings
# (assumes the source rows carry a "category" column alongside the embeddings)
category_embeddings = embedded.group_by("category").agg(
    fc.avg(fc.col("embeddings")).alias("avg_embedding")
)

# Result has EmbeddingType with preserved dimensions
category_embeddings.show()

Combined JSON and Embedding Pipelines

Process Structured Documents

Handle documents with metadata and content:

python
# Read JSON documents
docs = session.read.docs(
    "articles/**/*.json",
    content_type="json",
    recursive=True
)

# Extract metadata and content, keeping the raw JSON for structural filters later
parsed = docs.select(
    fc.col("content"),
    fc.json.jq(fc.col("content"), ".metadata.title").alias("title"),
    fc.json.jq(fc.col("content"), ".metadata.author").alias("author"),
    fc.json.jq(fc.col("content"), ".body").alias("text")
)

# Generate embeddings for text content
enriched = parsed.select(
    fc.col("content"),
    fc.col("title"),
    fc.col("author"),
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("text_embeddings")
)

Semantic Search with Metadata

Combine structural queries with semantic similarity:

python
# Filter by JSON structure, then rank by similarity
query_vector = [0.1, 0.2, ...]

results = enriched.filter(
    fc.json.contains(fc.col("content"), '{"status": "published"}')
).select(
    fc.col("title"),
    fc.col("author"),
    fc.embedding.compute_similarity(
        fc.col("text_embeddings"),
        query_vector,
        metric="cosine"
    ).alias("relevance")
).order_by(
    fc.col("relevance").desc()
).limit(10)

Cluster JSON Documents

Group documents by semantic similarity:

python
# Cluster with metadata
clustered_docs = enriched.semantic.with_cluster_labels(
    by=fc.col("text_embeddings"),
    num_clusters=10,
    label_column="topic_id"
)

# Extract cluster representatives
representatives = clustered_docs.group_by("topic_id").agg(
    fc.first(fc.col("title")).alias("example_title"),
    fc.count("*").alias("doc_count")
)

representatives.show()

Build Semantic Indexes

Create queryable indexes over structured content:

python
# Process PDFs and create searchable index
pdf_metadata = session.read.pdf_metadata(
    "reports/**/*.pdf",
    recursive=True
)

# Parse to markdown, then embed
indexed = pdf_metadata.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---"
    ).alias("markdown")
).select(
    fc.col("file_path"),
    fc.col("markdown"),
    fc.semantic.embed(fc.col("markdown")).alias("doc_embeddings")
)

# Save for later queries
indexed.write.parquet("indexed_documents.parquet")

Advanced Type Operations

Work with Nested Types

Combine embeddings with struct and array types:

python
from fenic.core.types.datatypes import StructType, StructField, ArrayType, StringType, FloatType, EmbeddingType

# Define nested schema
schema = StructType([
    StructField("metadata", StructType([
        StructField("tags", ArrayType(element_type=StringType)),
        StructField("score", FloatType)
    ])),
    StructField("embedding", EmbeddingType(384, "text-embedding-3-small"))
])

# Create DataFrame with nested types
complex_df = session.create_dataframe({
    "doc": [
        {
            "metadata": {"tags": ["python", "ai"], "score": 0.95},
            "embedding": [0.1] * 384
        }
    ]
})

# Access nested fields
result = complex_df.select(
    fc.col("doc.metadata.tags").alias("tags"),
    fc.col("doc.embedding").alias("embedding")
)

Cast Between Compatible Types

Convert types safely:

python
from fenic.core.types.datatypes import JsonType, EmbeddingType

# Cast string to JsonType
json_df = raw_df.select(
    fc.col("json_string").cast(JsonType).alias("json_data")
)

# Cast array of floats to embedding
embedding_df = array_df.select(
    fc.col("vector").cast(
        EmbeddingType(384, "custom-model")
    ).alias("embedding")
)

Maintain Type Information

Preserve type through transformations:

python
# Embeddings preserve type through operations
processed = embedded.select(
    fc.col("doc_id"),
    fc.embedding.normalize(fc.col("embeddings")).alias("norm_emb")
)

# Result still has EmbeddingType with same dimensions and model
print(processed.schema)

Production Best Practices

Deduplicate Before Embedding

Reduce costs by embedding unique content:

python
# Deduplicate text before embedding
unique_texts = docs.select("text").distinct()

# Generate embeddings once
embedded_unique = unique_texts.select(
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("embeddings")
)

# Join back to original data
result = docs.join(
    embedded_unique,
    on="text",
    how="left"
)

Batch JSON Operations

Process JSON in batches:

python
# Extract multiple fields in one pass
multi_extract = df.select(
    fc.json.jq(fc.col("data"), ".field1").alias("f1"),
    fc.json.jq(fc.col("data"), ".field2").alias("f2"),
    fc.json.jq(fc.col("data"), ".field3").alias("f3")
)

Validate JSON Structure

Validate at pipeline entry:

python
# Filter valid JSON objects
valid_docs = df.filter(
    fc.json.get_type(fc.col("content")) == "object"
)

# Log invalid entries
invalid = df.filter(
    fc.json.get_type(fc.col("content")).isin(["null", "string"])
)

invalid.select("file_path").show()

Store Embeddings Efficiently

Save embeddings for reuse:

python
# Write with native embedding type
embedded.write.parquet("embeddings.parquet")

# Read preserves type information
loaded = session.read.parquet("embeddings.parquet")

# Embeddings are immediately usable
results = loaded.select(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        query_vector,
        metric="cosine"
    )
)

Test Multiple Embedding Models

Compare different models:

python
# Configure multiple models
config = fc.SessionConfig(
    app_name="model_comparison",
    semantic=fc.SemanticConfig(
        embedding_models={
            "small": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100
            ),
            "large": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-large",
                rpm=100,
                tpm=100
            )
        },
        default_embedding_model="small"
    )
)

# Use different models
small_emb = docs.select(
    fc.semantic.embed(fc.col("text"), model_alias="small").alias("emb_small")
)

large_emb = docs.select(
    fc.semantic.embed(fc.col("text"), model_alias="large").alias("emb_large")
)

Complete Production Pipeline

Full pipeline combining JSON and embeddings:

python
import fenic as fc

# Configure session
session = fc.Session.get_or_create(
    fc.SessionConfig(
        app_name="document_pipeline",
        semantic=fc.SemanticConfig(
            language_models={
                "gpt4": fc.OpenAILanguageModel(
                    model_name="gpt-4o-mini",
                    rpm=100,
                    tpm=100
                )
            },
            default_language_model="gpt4",
            embedding_models={
                "openai": fc.OpenAIEmbeddingModel(
                    model_name="text-embedding-3-small",
                    rpm=100,
                    tpm=100
                )
            },
            default_embedding_model="openai"
        )
    )
)

# Load JSON documents
docs = session.read.docs(
    "data/**/*.json",
    content_type="json",
    recursive=True
)

# Extract structure
parsed = docs.select(
    fc.col("file_path"),
    fc.json.jq(fc.col("content"), ".title").alias("title"),
    fc.json.jq(fc.col("content"), ".body").alias("text"),
    fc.json.jq(fc.col("content"), ".metadata.tags").alias("tags")
).filter(
    fc.col("text").isNotNull()
)

# Generate embeddings
embedded = parsed.select(
    fc.col("file_path"),
    fc.col("title"),
    fc.col("text"),
    fc.col("tags"),
    fc.semantic.embed(fc.col("text")).alias("embeddings")
)

# Cluster documents
clustered = embedded.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=8,
    label_column="cluster_id"
)

# Find similar documents
query = [0.1] * 384
results = clustered.select(
    fc.col("file_path"),
    fc.col("title"),
    fc.col("cluster_id"),
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        query,
        metric="cosine"
    ).alias("similarity")
).order_by(
    fc.col("similarity").desc()
).limit(20)

# Save results
results.write.parquet("search_results.parquet")

results.show()

Key Implementation Points

Type-safe operations prevent serialization bugs and enable optimized execution. JSON operations query structure without parsing overhead. Embeddings maintain semantic meaning throughout transformations.

The advantage: explicit types that carry semantic information through your pipeline. Operations stay efficient because the framework optimizes based on type information.

For building reliable AI pipelines with semantic operators, native data types eliminate brittle glue code, and type-aware processing lets the engine optimize semantic operations on DataFrames.

See the Fenic 0.5.0 release for date types and OpenRouter support. Check Fenic open source for architecture details.

For log processing examples, see building an LLM agent for log clustering. The RudderStack case study demonstrates production results.

Learn more at typedef.ai or explore additional resources.
