Production AI pipelines require treating embeddings and JSON as first-class data types, not strings or serialized blobs. Fenic provides native EmbeddingType and JsonType support for type-safe operations without serialization overhead.
Native Data Types in Fenic
Most DataFrame frameworks treat embeddings as float arrays and JSON as strings. This creates problems:
- Type information disappears during transformations
- Operations require manual parsing and validation
- Semantic operations need custom glue code
- Schema inference fails for nested structures
Fenic solves this with specialized types that preserve semantics. An EmbeddingType column tracks dimensions and source model. A JsonType column supports structural queries without parsing.
EmbeddingType Fundamentals
EmbeddingType represents fixed-length embedding vectors with metadata.
```python
from fenic.core.types.datatypes import EmbeddingType

# Define embedding type
embedding_type = EmbeddingType(
    dimensions=384,
    embedding_model="text-embedding-3-small"
)
```
The type tracks the vector dimensions and the generating model, making similarity computation and normalization type-safe.
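As a quick illustration, a plain float-array column can be tagged with this type via a cast so downstream operations know the dimensions and source model. A minimal sketch; `array_df` and its `vector` column are hypothetical, and the same cast pattern appears again in the casting section later:

```python
import fenic as fc
from fenic.core.types.datatypes import EmbeddingType

# Hypothetical DataFrame with a plain list-of-floats column named "vector"
typed = array_df.select(
    fc.col("vector").cast(
        EmbeddingType(dimensions=384, embedding_model="text-embedding-3-small")
    ).alias("embedding")
)
```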
JsonType Fundamentals
JsonType represents structured JSON data stored as strings but queryable with native operations.
```python
from fenic.core.types.datatypes import JsonType
```
JsonType columns support:
- JQ queries for extraction
- Runtime type detection
- Deep structural search
- Type-preserving operations
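The sections below walk through each of these. As a quick preview, they compose as ordinary column expressions; a minimal sketch, assuming `df` has a JsonType column named `content` and that `fc.json.contains` can also be used as a boolean expression inside a select:

```python
import fenic as fc

preview = df.select(
    fc.json.get_type(fc.col("content")).alias("kind"),                          # runtime type detection
    fc.json.jq(fc.col("content"), ".user.id").alias("user_id"),                 # JQ extraction
    fc.json.contains(fc.col("content"), '{"role": "admin"}').alias("is_admin")  # deep structural search
)
preview.show()
```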
Session Configuration
Configure your session with embedding and language models.
```python
import fenic as fc

config = fc.SessionConfig(
    app_name="pipeline_demo",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt4": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "openai_embed": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100
            )
        },
        default_embedding_model="openai_embed"
    )
)

session = fc.Session.get_or_create(config)
```
Set your provider API key:
```bash
export OPENAI_API_KEY=your_key_here
```
JSON Data Type Operations
Load JSON Files
Read JSON files directly as JsonType columns:
```python
# Read JSON documents
df = session.read.docs(
    "data/**/*.json",
    content_type="json",
    recursive=True
)

# Schema: file_path (string), error (string), content (JsonType)
df.show()
```
Cast String Columns to JsonType
Convert string columns to JsonType for native operations:
```python
from fenic.core.types.datatypes import JsonType

# Start with CSV containing JSON strings
raw_df = session.read.csv("logs.csv")

# Cast to JsonType
typed_df = raw_df.select(
    fc.col("id"),
    fc.col("payload").cast(JsonType).alias("json_data")
)
```
Query Nested Fields with JQ
Use JQ syntax to query JSON structures:
```python
# Extract specific fields
result = df.select(
    fc.col("file_path"),
    fc.json.jq(fc.col("content"), ".event.type").alias("event_type"),
    fc.json.jq(fc.col("content"), ".user.id").alias("user_id"),
    fc.json.jq(fc.col("content"), ".metadata.timestamp").alias("timestamp")
)
result.show()
```
Query nested arrays:
```python
# Extract array elements
tags = df.select(
    fc.json.jq(fc.col("content"), ".tags | map(.name)").alias("tag_names")
)
```
Detect and Filter JSON Types
Detect JSON types at runtime:
```python
# Get JSON type for each value
types = df.select(
    fc.col("content"),
    fc.json.get_type(fc.col("content")).alias("json_type")
)

# Filter by type
arrays_only = df.filter(
    fc.json.get_type(fc.col("content")) == "array"
)
objects_only = df.filter(
    fc.json.get_type(fc.col("content")) == "object"
)
```
Search JSON Structures
Search recursively through JSON:
```python
# Find objects containing specific structure
has_admin = df.filter(
    fc.json.contains(fc.col("content"), '{"role": "admin"}')
)

# Find exact array matches
has_permissions = df.filter(
    fc.json.contains(fc.col("content"), '["read", "write"]')
)

# Find primitive values
has_error_code = df.filter(
    fc.json.contains(fc.col("content"), '500')
)
```
The contains function uses type-aware matching:
- Objects use partial matching
- Arrays require exact matches
- Primitives distinguish between `42` (number) and `"42"` (string)
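For example, a numeric search term and its quoted string form match different rows. A minimal sketch; the `status_code` field shown in the comments is illustrative:

```python
# Matches documents containing the JSON number 500, e.g. {"status_code": 500}
numeric_matches = df.filter(fc.json.contains(fc.col("content"), '500'))

# Matches documents containing the JSON string "500", e.g. {"status_code": "500"}
string_matches = df.filter(fc.json.contains(fc.col("content"), '"500"'))
```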
Extract Template-Based Patterns
Extract structured data from text using templates:
```python
# Parse log lines with mixed formats
logs = session.create_dataframe({
    "log": [
        "2024-01-15 ERROR Connection failed",
        'Name: "Smith, John", Price: 99.99, Tags: ["a", "b"]',
        'Title: "To Kill a Mockingbird", Author: Harper Lee'
    ]
})

# Extract with format rules
extracted = logs.select(
    fc.text.extract(
        fc.col("log"),
        "${date} ${level} ${message}"
    ).alias("parsed")
)

# Extract CSV and JSON fields
mixed = logs.select(
    fc.text.extract(
        fc.col("log"),
        'Name: ${name:csv}, Price: ${price}, Tags: ${tags:json}'
    ).alias("data")
)
```
Template syntax:
- `${field}` - Plain text extraction
- `${field:csv}` - CSV-aware parsing
- `${field:json}` - JSON with type preservation
- `${field:quoted}` - Quoted string extraction
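The `${field:quoted}` rule is the only one not exercised above. A minimal sketch against the third log line; the exact unquoting behavior is an assumption based on the rule name:

```python
# Extract a quoted title and a plain author from:
#   Title: "To Kill a Mockingbird", Author: Harper Lee
books = logs.select(
    fc.text.extract(
        fc.col("log"),
        'Title: ${title:quoted}, Author: ${author}'
    ).alias("book")
)
```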
Embedding Data Type Operations
Generate Embeddings
Create embeddings from text columns:
```python
# Load text data
docs = session.read.csv("documents.csv")

# Generate embeddings
embedded = docs.select(
    fc.col("doc_id"),
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("embeddings")
)

# Result has EmbeddingType column with dimensions=384
embedded.show()
```
The embed function returns an EmbeddingType column tracking:
- Vector dimensions
- Source embedding model
- Null handling
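You can confirm this by inspecting the schema of the result. A minimal sketch; the exact rendering of the type is an assumption:

```python
# The "embeddings" column should report an EmbeddingType carrying the
# dimensions and model configured for the session's embedding model
print(embedded.schema)
```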
Compute Similarity Scores
Calculate similarity between embeddings and query vectors:
```python
# Define query vector
query = [0.1, 0.2, 0.3, ...]  # 384 dimensions

# Compute cosine similarity
similar = embedded.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        query,
        metric="cosine"
    ).alias("similarity")
)

# Sort by relevance
ranked = similar.order_by(fc.col("similarity").desc())
ranked.show(10)
```
Available metrics:
- `cosine` - Cosine similarity (range: -1 to 1)
- `dot` - Dot product (raw inner product)
- `l2` - Euclidean distance (lower is more similar)
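All three metrics use the same call; only the `metric` argument changes. When ranking by `l2`, sort ascending, since lower distance means more similar. A minimal sketch reusing the `embedded` DataFrame and `query` vector from above:

```python
scores = embedded.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(fc.col("embeddings"), query, metric="cosine").alias("cosine"),
    fc.embedding.compute_similarity(fc.col("embeddings"), query, metric="dot").alias("dot"),
    fc.embedding.compute_similarity(fc.col("embeddings"), query, metric="l2").alias("l2")
)
```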
Compare Pairwise Embeddings
Compare embeddings between columns:
```python
# Compare document pairs
pairs = session.create_dataframe({
    "doc1_id": [1, 2, 3],
    "doc1_text": ["first document", "second document", "third document"],
    "doc2_id": [4, 5, 6],
    "doc2_text": ["fourth document", "fifth document", "sixth document"]
})

# Embed both columns
embedded_pairs = pairs.select(
    fc.col("doc1_id"),
    fc.col("doc2_id"),
    fc.semantic.embed(fc.col("doc1_text")).alias("emb1"),
    fc.semantic.embed(fc.col("doc2_text")).alias("emb2")
)

# Compute pairwise similarity
similarity = embedded_pairs.select(
    fc.col("doc1_id"),
    fc.col("doc2_id"),
    fc.embedding.compute_similarity(
        fc.col("emb1"),
        fc.col("emb2"),
        metric="cosine"
    ).alias("similarity")
)
```
Normalize Embedding Vectors
Normalize vectors to unit length:
```python
# Normalize embeddings
normalized = embedded.select(
    fc.col("doc_id"),
    fc.embedding.normalize(fc.col("embeddings")).alias("unit_embeddings")
)

# Dot product on normalized vectors equals cosine similarity
query_normalized = [0.6, 0.8, ...]  # Already normalized

results = normalized.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(
        fc.col("unit_embeddings"),
        query_normalized,
        metric="dot"
    ).alias("cosine_sim")
)
```
Cluster Similar Items
Group similar items using K-means:
```python
# Cluster documents
clustered = embedded.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=5,
    label_column="cluster_id",
    centroid_column="cluster_center"
)

# Analyze cluster distribution (compute each row's similarity to its cluster center first)
with_sim = clustered.select(
    fc.col("cluster_id"),
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.col("cluster_center"),
        metric="cosine"
    ).alias("similarity")
)

cluster_stats = with_sim.group_by("cluster_id").agg(
    fc.count("*").alias("count"),
    fc.avg("similarity").alias("avg_similarity")
)
cluster_stats.show()
```
Filter outliers using centroid distance:
```python
# Remove rows far from cluster center
clean = clustered.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.col("cluster_center"),
        metric="cosine"
    ) > 0.7
)
```
Aggregate Embeddings
Compute average embeddings across groups:
```python
# Group by category and average embeddings
category_embeddings = embedded.group_by("category").agg(
    fc.avg(fc.col("embeddings")).alias("avg_embedding")
)

# Result has EmbeddingType with preserved dimensions
category_embeddings.show()
```
Combined JSON and Embedding Pipelines
Process Structured Documents
Handle documents with metadata and content:
```python
# Read JSON documents
docs = session.read.docs(
    "articles/**/*.json",
    content_type="json",
    recursive=True
)

# Extract metadata and content (keep the raw metadata object for later filtering)
parsed = docs.select(
    fc.json.jq(fc.col("content"), ".metadata").alias("metadata"),
    fc.json.jq(fc.col("content"), ".metadata.title").alias("title"),
    fc.json.jq(fc.col("content"), ".metadata.author").alias("author"),
    fc.json.jq(fc.col("content"), ".body").alias("text")
)

# Generate embeddings for text content
enriched = parsed.select(
    fc.col("metadata"),
    fc.col("title"),
    fc.col("author"),
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("text_embeddings")
)
```
Semantic Search with Metadata
Combine structural queries with semantic similarity:
```python
# Filter by JSON structure, then rank by similarity
query_vector = [0.1, 0.2, ...]

results = enriched.filter(
    fc.json.contains(fc.col("metadata"), '{"status": "published"}')
).select(
    fc.col("title"),
    fc.col("author"),
    fc.embedding.compute_similarity(
        fc.col("text_embeddings"),
        query_vector,
        metric="cosine"
    ).alias("relevance")
).order_by(
    fc.col("relevance").desc()
).limit(10)
```
Cluster JSON Documents
Group documents by semantic similarity:
```python
# Cluster with metadata
clustered_docs = enriched.semantic.with_cluster_labels(
    by=fc.col("text_embeddings"),
    num_clusters=10,
    label_column="topic_id"
)

# Extract cluster representatives
representatives = clustered_docs.group_by("topic_id").agg(
    fc.first(fc.col("title")).alias("example_title"),
    fc.count("*").alias("doc_count")
)
representatives.show()
```
Build Semantic Indexes
Create queryable indexes over structured content:
```python
# Process PDFs and create searchable index
pdf_metadata = session.read.pdf_metadata(
    "reports/**/*.pdf",
    recursive=True
)

# Parse to markdown, then embed
indexed = pdf_metadata.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---"
    ).alias("markdown")
).select(
    fc.col("file_path"),
    fc.col("markdown"),
    fc.semantic.embed(fc.col("markdown")).alias("doc_embeddings")
)

# Save for later queries
indexed.write.parquet("indexed_documents.parquet")
```
Advanced Type Operations
Work with Nested Types
Combine embeddings with struct and array types:
```python
from fenic.core.types.datatypes import (
    StructType, StructField, ArrayType, StringType, FloatType, EmbeddingType
)

# Define nested schema
schema = StructType([
    StructField("metadata", StructType([
        StructField("tags", ArrayType(element_type=StringType)),
        StructField("score", FloatType)
    ])),
    StructField("embedding", EmbeddingType(384, "text-embedding-3-small"))
])

# Create DataFrame with nested types
complex_df = session.create_dataframe({
    "doc": [
        {
            "metadata": {"tags": ["python", "ai"], "score": 0.95},
            "embedding": [0.1] * 384
        }
    ]
})

# Access nested fields
result = complex_df.select(
    fc.col("doc.metadata.tags").alias("tags"),
    fc.col("doc.embedding").alias("embedding")
)
```
Cast Between Compatible Types
Convert types safely:
```python
from fenic.core.types.datatypes import JsonType, EmbeddingType

# Cast string to JsonType
json_df = raw_df.select(
    fc.col("json_string").cast(JsonType).alias("json_data")
)

# Cast array of floats to embedding
embedding_df = array_df.select(
    fc.col("vector").cast(
        EmbeddingType(384, "custom-model")
    ).alias("embedding")
)
```
Maintain Type Information
Preserve type through transformations:
```python
# Embeddings preserve type through operations
processed = embedded.select(
    fc.col("doc_id"),
    fc.embedding.normalize(fc.col("embeddings")).alias("norm_emb")
)

# Result still has EmbeddingType with same dimensions and model
processed.schema
```
Production Best Practices
Deduplicate Before Embedding
Reduce costs by embedding unique content:
```python
# Deduplicate text before embedding
unique_texts = docs.select("text").distinct()

# Generate embeddings once
embedded_unique = unique_texts.select(
    fc.col("text"),
    fc.semantic.embed(fc.col("text")).alias("embeddings")
)

# Join back to original data
result = docs.join(
    embedded_unique,
    on="text",
    how="left"
)
```
Batch JSON Operations
Process JSON in batches:
```python
# Extract multiple fields in one pass
multi_extract = df.select(
    fc.json.jq(fc.col("data"), ".field1").alias("f1"),
    fc.json.jq(fc.col("data"), ".field2").alias("f2"),
    fc.json.jq(fc.col("data"), ".field3").alias("f3")
)
```
Validate JSON Structure
Validate at pipeline entry:
```python
# Filter valid JSON objects
valid_docs = df.filter(
    fc.json.get_type(fc.col("content")) == "object"
)

# Log invalid entries
invalid = df.filter(
    fc.json.get_type(fc.col("content")).isin(["null", "string"])
)
invalid.select("file_path").show()
```
Store Embeddings Efficiently
Save embeddings for reuse:
```python
# Write with native embedding type
embedded.write.parquet("embeddings.parquet")

# Read preserves type information
loaded = session.read.parquet("embeddings.parquet")

# Embeddings are immediately usable
results = loaded.select(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        query_vector,
        metric="cosine"
    )
)
```
Test Multiple Embedding Models
Compare different models:
```python
# Configure multiple models
config = fc.SessionConfig(
    app_name="model_comparison",
    semantic=fc.SemanticConfig(
        embedding_models={
            "small": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100
            ),
            "large": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-large",
                rpm=100,
                tpm=100
            )
        },
        default_embedding_model="small"
    )
)

# Use different models
small_emb = docs.select(
    fc.semantic.embed(fc.col("text"), model_alias="small").alias("emb_small")
)
large_emb = docs.select(
    fc.semantic.embed(fc.col("text"), model_alias="large").alias("emb_large")
)
```
Complete Production Pipeline
Full pipeline combining JSON and embeddings:
```python
import fenic as fc

# Configure session
session = fc.Session.get_or_create(
    fc.SessionConfig(
        app_name="document_pipeline",
        semantic=fc.SemanticConfig(
            language_models={
                "gpt4": fc.OpenAILanguageModel(
                    model_name="gpt-4o-mini",
                    rpm=100,
                    tpm=100
                )
            },
            default_language_model="gpt4",
            embedding_models={
                "openai": fc.OpenAIEmbeddingModel(
                    model_name="text-embedding-3-small",
                    rpm=100,
                    tpm=100
                )
            },
            default_embedding_model="openai"
        )
    )
)

# Load JSON documents
docs = session.read.docs(
    "data/**/*.json",
    content_type="json",
    recursive=True
)

# Extract structure
parsed = docs.select(
    fc.col("file_path"),
    fc.json.jq(fc.col("content"), ".title").alias("title"),
    fc.json.jq(fc.col("content"), ".body").alias("text"),
    fc.json.jq(fc.col("content"), ".metadata.tags").alias("tags")
).filter(
    fc.col("text").isNotNull()
)

# Generate embeddings
embedded = parsed.select(
    fc.col("file_path"),
    fc.col("title"),
    fc.col("text"),
    fc.col("tags"),
    fc.semantic.embed(fc.col("text")).alias("embeddings")
)

# Cluster documents
clustered = embedded.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=8,
    label_column="cluster_id"
)

# Find similar documents
query = [0.1] * 384

results = clustered.select(
    fc.col("file_path"),
    fc.col("title"),
    fc.col("cluster_id"),
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        query,
        metric="cosine"
    ).alias("similarity")
).order_by(
    fc.col("similarity").desc()
).limit(20)

# Save results
results.write.parquet("search_results.parquet")
results.show()
```
Key Implementation Points
Type-safe operations prevent serialization bugs and enable optimized execution. JSON operations query structure without parsing overhead, and embedding columns carry their dimensions and source model through every transformation.
The advantage: explicit types that carry semantic information through your pipeline. Operations stay efficient because the framework optimizes based on type information.
For building reliable AI pipelines with semantic operators, native data types eliminate brittle glue code. Fenic's semantic DataFrame operations gain performance from this type-aware processing.
See the Fenic 0.5.0 release for date types and OpenRouter support. Check Fenic open source for architecture details.
For log processing examples, see building an LLM agent for log clustering. The RudderStack case study demonstrates production results.
Learn more at typedef.ai or explore additional resources.
