LangChain handles orchestration and agent logic effectively, but managing typed, structured data at scale requires a different approach. Fenic provides a comprehensive type system designed specifically for AI workloads, enabling you to process unstructured text with the same rigor as structured databases.
This guide covers how to leverage Fenic's data types to build robust data pipelines that feed into LangChain agents and applications.
The Data Type Challenge in LLM Applications
LangChain agents work with raw text and dictionaries, but production applications need stronger guarantees. When processing documents, logs, or user inputs, you need to ensure data conforms to expected structures before it reaches your agents.
Traditional approaches force you to choose between flexibility and safety. String processing is flexible but error-prone. Pydantic models provide validation but aren't designed for batch processing over large datasets. Fenic bridges this gap with a type system that works at DataFrame scale while maintaining type safety.
Fenic's Type System Architecture
Fenic's type system follows a hierarchical structure:
- Primitive types for basic values (strings, integers, floats, booleans, dates)
- Composite types for structured data (arrays, structs)
- Specialized AI types for domain-specific content (embeddings, markdown, transcripts)
All types inherit from the base DataType class and integrate seamlessly with DataFrame operations. This design enables type-safe transformations across millions of rows without sacrificing performance.
Primitive Data Types
StringType: Text Processing Foundation
StringType represents UTF-8 encoded strings and serves as the foundation for text operations.
```python
from fenic.api.session import Session
from fenic.core.types.datatypes import StringType
from fenic.core.types.schema import Schema, ColumnField
import fenic.api.functions as fc

session = Session.get_or_create()

# Define schema with StringType
schema = Schema([
    ColumnField(name="user_id", data_type=StringType),
    ColumnField(name="message", data_type=StringType)
])

# Load CSV with explicit string types
df = session.read.csv(
    "data/messages.csv",
    schema=schema
)

# String operations maintain type safety
cleaned = df.select(
    fc.col("user_id"),
    fc.text.trim(fc.col("message")).alias("clean_message")
)
```
String types work with all text manipulation functions including trim, upper, lower, substring, and pattern matching operations.
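For instance, a quick sketch of pattern matching on a string column, building on the `cleaned` DataFrame above. The `contains` expression mirrors its use in the tool example later in this guide; passing a plain string literal (rather than a column expression) is an assumption, and the keyword itself is illustrative.

```python
# Filter StringType rows by substring match; "refund" is an illustrative keyword
refund_messages = cleaned.filter(
    fc.col("clean_message").contains("refund")
)
```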
Numeric Types: IntegerType, FloatType, DoubleType
Numeric types enable mathematical operations and statistical aggregations within your data pipelines.
```python
from fenic.core.types.datatypes import IntegerType, FloatType, DoubleType

# Schema with numeric types
numeric_schema = Schema([
    ColumnField(name="product_id", data_type=IntegerType),
    ColumnField(name="price", data_type=FloatType),
    ColumnField(name="revenue", data_type=DoubleType)
])

sales_df = session.read.csv("data/sales.csv", schema=numeric_schema)

# Type-safe numeric operations
enriched = sales_df.select(
    fc.col("product_id"),
    fc.col("price"),
    (fc.col("price") * 1.1).alias("price_with_tax"),
    fc.col("revenue")
)
```
IntegerType handles signed integers, FloatType represents 32-bit floating-point numbers, and DoubleType provides 64-bit floating-point precision for calculations that are sensitive to rounding, such as revenue aggregations.
BooleanType: Conditional Logic
BooleanType enables conditional filtering and boolean operations.
```python
from fenic.core.types.datatypes import BooleanType

# Create boolean columns
filtered_df = sales_df.select(
    fc.col("product_id"),
    (fc.col("price") > 100).alias("is_premium"),
    (fc.col("revenue") > 10000).alias("high_performer")
)

# Filter using boolean columns
premium_products = filtered_df.filter(fc.col("is_premium"))
```
Temporal Types: DateType and TimestampType
Temporal types handle time-based data with timezone awareness, critical for log analysis and time-series processing.
```python
from fenic.core.types.datatypes import DateType, TimestampType
import fenic.api.functions.dt as dt

# Parse timestamps from text
logs_df = session.read.docs("data/logs/*.json", content_type="json")

timestamp_df = logs_df.select(
    fc.col("event_id"),
    dt.to_timestamp(
        fc.col("timestamp_str"),
        "yyyy-MM-dd HH:mm:ss"
    ).alias("event_time")
)

# Interpret event_time as America/Los_Angeles wall-clock time and convert to UTC
events_utc = timestamp_df.select(
    fc.col("event_id"),
    dt.to_utc_timestamp(
        fc.col("event_time"),
        "America/Los_Angeles"
    ).alias("event_time_utc")
)
```
Fenic's timestamp operations follow Spark semantics, enabling timezone conversions with to_utc_timestamp and from_utc_timestamp functions.
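For the reverse direction, the `from_utc_timestamp` function mentioned above interprets a UTC timestamp as wall-clock time in a target zone. A minimal sketch, reusing the `dt` import and `timestamp_df` from the previous example:

```python
# Render UTC event times as America/Los_Angeles wall-clock times
local_view = timestamp_df.select(
    fc.col("event_id"),
    dt.from_utc_timestamp(
        fc.col("event_time"),
        "America/Los_Angeles"
    ).alias("pacific_time")
)
```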
Composite Data Types
ArrayType: Homogeneous Collections
ArrayType represents variable-length arrays of elements with a single data type.
```python
from fenic.core.types.datatypes import ArrayType

# Array of strings for tags
array_schema = Schema([
    ColumnField(name="doc_id", data_type=StringType),
    ColumnField(name="tags", data_type=ArrayType(StringType))
])

# docs_df: a DataFrame of loaded documents with "file_path" and "content" columns
docs_df = session.read.docs("data/docs/**/*.md", content_type="markdown", recursive=True)

# Create arrays through transformations
chunked_df = docs_df.select(
    fc.col("file_path"),
    fc.text.recursive_token_chunk(
        fc.col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")  # Returns ArrayType(StringType)
)

# Explode arrays into rows
exploded = chunked_df.select(
    fc.col("file_path"),
    fc.col("chunks")
).explode("chunks")
```
Arrays enable batch operations on collections while maintaining type safety. You can nest arrays for multi-dimensional data structures.
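For multi-dimensional data, arrays nest directly. A small sketch of a schema with a nested array column; the column names are illustrative:

```python
# One list of token chunks per document section: ArrayType(ArrayType(StringType))
nested_array_schema = Schema([
    ColumnField(name="doc_id", data_type=StringType),
    ColumnField(name="section_chunks", data_type=ArrayType(ArrayType(StringType)))
])
```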
StructType: Typed Records
StructType defines structured records with named fields, similar to database rows or JSON objects.
```python
from fenic.core.types.datatypes import StructType, StructField

# Define nested structure
address_type = StructType([
    StructField("street", StringType),
    StructField("city", StringType),
    StructField("zipcode", StringType)
])

customer_schema = Schema([
    ColumnField(name="customer_id", data_type=IntegerType),
    ColumnField(name="name", data_type=StringType),
    ColumnField(name="address", data_type=address_type)
])

# Access nested fields
customers = session.read.csv("data/customers.csv", schema=customer_schema)

city_df = customers.select(
    fc.col("customer_id"),
    fc.col("address.city").alias("city")
)
```
Structs work seamlessly with Pydantic models for extraction operations, enabling schema-driven data processing.
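As a hedged sketch of that pairing, a Pydantic model with the same fields as `address_type` can drive `semantic.extract` (covered in depth later) to populate a struct column. The `raw_customers` DataFrame and its `address_text` column are hypothetical:

```python
from pydantic import BaseModel, Field

# Mirrors the address_type struct defined above
class Address(BaseModel):
    street: str = Field(description="Street address")
    city: str = Field(description="City name")
    zipcode: str = Field(description="Postal code")

# raw_customers: hypothetical DataFrame with a free-text "address_text" column
parsed_addresses = raw_customers.select(
    fc.col("customer_id"),
    fc.semantic.extract(
        fc.col("address_text"),
        response_format=Address
    ).alias("address")  # struct column with street, city, zipcode fields
)
```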
Specialized AI-Native Data Types
MarkdownType: First-Class Document Processing
MarkdownType treats markdown as a structured data type with native parsing and extraction operations.
```python
from fenic.core.types.datatypes import MarkdownType

# Load markdown documents
docs = session.read.docs(
    "data/docs/**/*.md",
    content_type="markdown",
    recursive=True
)

# Cast to MarkdownType for specialized operations
markdown_df = docs.select(
    fc.col("file_path"),
    fc.col("content").cast(MarkdownType).alias("markdown_content")
)

# Extract headers and structure
structured = markdown_df.select(
    fc.col("file_path"),
    fc.markdown.extract_header_chunks(
        fc.col("markdown_content"),
        header_level=2
    ).alias("sections")
)
```
MarkdownType enables header-based chunking, table extraction, and structural analysis without custom parsing logic.
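Because `sections` is an array column, it composes with the array operations shown earlier; for instance, exploding into one row per section for downstream processing:

```python
# One row per markdown section
section_rows = structured.explode("sections").select(
    fc.col("file_path"),
    fc.col("sections").alias("section")
)
```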
EmbeddingType: Vector Representations
EmbeddingType represents fixed-length embedding vectors with dimension and model tracking.
```python
from fenic.core.types.datatypes import EmbeddingType
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAIEmbeddingModel
)

# Configure embedding model
config = SessionConfig(
    app_name="embedding_pipeline",
    semantic=SemanticConfig(
        embedding_models={
            "ada": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="ada"
    )
)

session = Session.get_or_create(config)

# Generate embeddings with automatic type inference
# docs_df: a DataFrame with "doc_id" and "content" columns loaded earlier
embedded = docs_df.select(
    fc.col("doc_id"),
    fc.col("content"),
    fc.semantic.embed(fc.col("content")).alias("vector")
)
# vector now has EmbeddingType(1536, embedding_model="text-embedding-3-small")
```
EmbeddingType integrates with similarity operations:
```python
from fenic.api.functions import embedding

# Compute cosine similarity against a query document's vector
query_vector = embedded.filter(
    fc.col("doc_id") == "query_doc"
).select(fc.col("vector")).to_pylist()[0]["vector"]

similar_docs = embedded.select(
    fc.col("doc_id"),
    fc.embedding.compute_similarity(
        fc.col("vector"),
        query_vector,
        metric="cosine"
    ).alias("similarity")
).order_by(fc.col("similarity").desc()).limit(10)
```
TranscriptType: Structured Speech Data
TranscriptType handles transcripts with speaker attribution and timestamps.
```python
from fenic.core.types.datatypes import TranscriptType

# Load transcript data
transcripts = session.read.docs(
    "data/calls/*.srt",
    content_type="transcript",
    recursive=True
)

# Type as TranscriptType for specialized operations
transcript_df = transcripts.select(
    fc.col("file_path"),
    fc.col("content").cast(TranscriptType).alias("transcript")
)
```
TranscriptType understands SRT, WebVTT, and generic transcript formats, enabling speaker-aware processing.
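A hedged sketch of speaker-aware processing: once the column is typed, downstream operators such as `semantic.extract` (shown later in this guide) can run over the parsed transcript. The `CallSummary` model and its fields are hypothetical:

```python
from pydantic import BaseModel, Field
from typing import List

# Hypothetical extraction schema for call transcripts
class CallSummary(BaseModel):
    speakers: List[str] = Field(description="Distinct speaker names in the call")
    action_items: List[str] = Field(description="Follow-ups agreed during the call")

call_summaries = transcript_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("transcript"),
        response_format=CallSummary
    ).alias("summary")
)
```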
JsonType and HtmlType: Structured Content
JsonType and HtmlType provide specialized handling for JSON and HTML content.
```python
from fenic.core.types.datatypes import JsonType, HtmlType

# JSON processing with JQ expressions
json_df = session.read.docs("data/config/*.json", content_type="json")

extracted = json_df.select(
    fc.col("file_path"),
    fc.json.jq(fc.col("content"), ".metadata.author").alias("author"),
    fc.json.jq(fc.col("content"), ".settings").alias("settings")
)

# HTML processing
html_df = session.read.docs("data/pages/*.html", content_type="html")

cleaned_html = html_df.select(
    fc.col("file_path"),
    fc.col("content").cast(HtmlType).alias("html_content")
)
```
DocumentPathType: Path Management
DocumentPathType handles local file paths and remote URLs uniformly.
```python
from fenic.core.types.datatypes import DocumentPathType

# Path operations work with both local and remote paths
paths_df = session.create_dataframe([
    {"path": "/data/docs/report.pdf"},
    {"path": "https://example.com/data/file.pdf"},
    {"path": "s3://bucket/documents/summary.pdf"}
])

# Cast to DocumentPathType for validation
typed_paths = paths_df.select(
    fc.col("path").cast(DocumentPathType).alias("document_path")
)
```
Schema-Driven Data Processing
Schemas validate data at query-planning time and make pipelines self-documenting.
```python
from fenic.core.types.schema import Schema, ColumnField

# Define comprehensive schema
ticket_schema = Schema([
    ColumnField(name="ticket_id", data_type=IntegerType),
    ColumnField(name="subject", data_type=StringType),
    ColumnField(name="description", data_type=StringType),
    ColumnField(name="created_at", data_type=TimestampType),
    ColumnField(name="priority", data_type=IntegerType),
    ColumnField(name="tags", data_type=ArrayType(StringType))
])

# Load with schema enforcement
tickets = session.read.csv(
    "data/tickets.csv",
    schema=ticket_schema
)

# Schema violations fail fast:
# invalid data types, missing columns, or malformed rows generate clear errors
```
Schemas serve as contracts between data sources and processing logic, catching errors early in development.
Type-Safe Extraction with Pydantic Integration
Fenic's extraction operations convert unstructured text into typed structures using Pydantic models.
```python
from pydantic import BaseModel, Field
from typing import List

# Define extraction schema
class CustomerFeedback(BaseModel):
    sentiment: str = Field(description="Overall sentiment: positive, negative, neutral")
    product_mentioned: str = Field(description="Product name if mentioned")
    issues: List[str] = Field(description="List of issues raised")
    action_items: List[str] = Field(description="Recommended actions")

# Extract structured data
feedback_df = session.read.docs("data/feedback/**/*.txt", content_type="text")

structured_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=CustomerFeedback
    ).alias("feedback_data")
)

# Access nested fields with type safety
issues_df = structured_feedback.select(
    fc.col("file_path"),
    fc.col("feedback_data.sentiment").alias("sentiment"),
    fc.col("feedback_data.issues").alias("issues")
)
```
The extracted fields maintain their types throughout the pipeline, enabling type-safe transformations.
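For example, the extracted columns behave like any other typed columns, so filters and explodes apply directly to the `issues_df` result above:

```python
# Keep only negative feedback and break out one row per reported issue
negative_issues = issues_df.filter(
    fc.col("sentiment") == "negative"
).explode("issues")
```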
Building Type-Safe Pipelines for LangChain Integration
Combine Fenic's types to create robust preprocessing pipelines.
```python
from pydantic import BaseModel, Field
from typing import List

# Complete pipeline with multiple type transformations
class Document(BaseModel):
    summary: str = Field(description="Brief document summary")
    entities: List[str] = Field(description="Named entities")
    topics: List[str] = Field(description="Main topics")

# 1. Load documents
raw_docs = session.read.docs("data/corpus/**/*.md", content_type="markdown")

# 2. Type as markdown
markdown_docs = raw_docs.select(
    fc.col("file_path"),
    fc.col("content").cast(MarkdownType).alias("markdown")
)

# 3. Chunk with array type
chunked = markdown_docs.select(
    fc.col("file_path"),
    fc.text.recursive_token_chunk(
        fc.col("markdown"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")  # ArrayType(StringType)
)

# 4. Explode to rows
chunk_rows = chunked.explode("chunks").select(
    fc.col("file_path"),
    fc.col("chunks").alias("chunk_text")
)

# 5. Generate embeddings
embedded = chunk_rows.select(
    fc.col("file_path"),
    fc.col("chunk_text"),
    fc.semantic.embed(fc.col("chunk_text")).alias("embedding")  # EmbeddingType
)

# 6. Extract structured metadata
metadata_df = raw_docs.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=Document
    ).alias("metadata")  # StructType
)

# 7. Join typed datasets
final = embedded.join(
    metadata_df,
    on="file_path",
    how="left"
)
```
Each transformation maintains type information, enabling IDE autocomplete and surfacing type errors at query-planning time rather than mid-run.
Creating Tool Parameters with Type Safety
Tool parameters leverage Fenic's type system for runtime validation.
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types.datatypes import StringType, IntegerType, FloatType

# Load and prepare data
products = session.read.csv("data/products.csv")

# Create parameterized query with typed parameters
search_query = products.filter(
    fc.col("category").contains(
        fc.tool_param("category", StringType)
    )
    & (fc.col("price") >= fc.tool_param("min_price", FloatType))
    & (fc.col("price") <= fc.tool_param("max_price", FloatType))
).select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("price"),
    fc.col("category")
)

# Register tool with type-checked parameters
session.catalog.create_tool(
    tool_name="search_products",
    tool_description="Search products by category and price range",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="category",
            description="Product category to search"
        ),
        ToolParam(
            name="min_price",
            description="Minimum price in USD",
            default_value=0.0,
            has_default=True
        ),
        ToolParam(
            name="max_price",
            description="Maximum price in USD",
            default_value=10000.0,
            has_default=True
        )
    ],
    result_limit=50
)
```
The type system validates parameter values at runtime, preventing invalid tool calls.
Exporting Typed Data to LangChain
Export processed data in formats LangChain can consume while preserving type information.
```python
# Export to Parquet with full type preservation
final.write.parquet("output/processed_docs.parquet")

# Save as catalog table
final.write.save_as_table("processed_documents", mode="overwrite")

# Convert to pandas with PyArrow types
pandas_df = final.to_pandas()

# Create LangChain documents with typed metadata
from langchain.schema import Document as LangChainDoc

documents = [
    LangChainDoc(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "embedding": row["embedding"],  # Type preserved
            "summary": row["metadata"]["summary"],
            "entities": row["metadata"]["entities"],
            "topics": row["metadata"]["topics"]
        }
    )
    for row in pandas_df.to_dict("records")
]
```
Type information flows through to LangChain, enabling type-aware agent operations.
Handling Null Values and Optional Types
All Fenic types support null values by default, matching SQL semantics.
```python
# Create null values explicitly
null_df = session.create_dataframe([
    {"id": 1, "value": "present"},
    {"id": 2, "value": None}
])

# Null handling operations
filtered = null_df.filter(fc.col("value").is_not_null())

# Coalesce to provide defaults
with_defaults = null_df.select(
    fc.col("id"),
    fc.coalesce(fc.col("value"), fc.lit("default")).alias("value")
)

# Create empty values by type
empty_array = fc.empty(ArrayType(StringType))  # Returns []
empty_struct = fc.empty(StructType([
    StructField("name", StringType)
]))  # Returns {name: None}
```
Explicit null handling prevents runtime errors in production pipelines.
Advanced Type Composition
Combine types to model complex data structures.
```python
# Nested structures
nested_type = StructType([
    StructField("user_id", IntegerType),
    StructField("metadata", StructType([
        StructField("created_at", TimestampType),
        StructField("tags", ArrayType(StringType)),
        StructField("scores", ArrayType(FloatType))
    ])),
    StructField("embedding", EmbeddingType(1536, embedding_model="text-embedding-3-small"))
])

# Arrays of structs
events_type = ArrayType(StructType([
    StructField("timestamp", TimestampType),
    StructField("event_type", StringType),
    StructField("payload", JsonType)
]))
```
Nested types enable modeling rich domain structures while maintaining type safety.
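A short sketch of putting the composed type to work: place it in a schema and reach into it with the same dotted-path access used for the address example earlier. The `profiles` DataFrame and its source are hypothetical.

```python
profile_schema = Schema([
    ColumnField(name="profile", data_type=nested_type)
])

# Dotted paths reach through nested structs
profile_tags = profiles.select(
    fc.col("profile.user_id"),
    fc.col("profile.metadata.tags").alias("tags")
)
```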
Type Casting and Conversion
Cast between types when pipeline requirements change.
```python
# String to numeric
numeric_df = df.select(
    fc.col("id"),
    fc.col("price_str").cast(FloatType).alias("price")
)

# Parse timestamps from strings
timestamp_df = df.select(
    fc.col("id"),
    fc.dt.to_timestamp(
        fc.col("date_string"),
        "yyyy-MM-dd"
    ).alias("parsed_date")
)

# Cast to specialized types
specialized = df.select(
    fc.col("id"),
    fc.col("content").cast(MarkdownType).alias("markdown"),
    fc.col("config").cast(JsonType).alias("json_config")
)
```
Type casts enable seamless integration with external data sources.
Performance Optimization with Types
Type information enables query optimization.
```python
# Explicit types allow pushdown optimizations
typed_schema = Schema([
    ColumnField(name="id", data_type=IntegerType),
    ColumnField(name="timestamp", data_type=TimestampType),
    ColumnField(name="value", data_type=DoubleType)
])

# Filters on typed columns use optimized predicates
optimized = df.filter(
    (fc.col("timestamp") >= fc.lit("2024-01-01"))
    & (fc.col("value") > 100.0)
).select(
    fc.col("id"),
    fc.col("value")
)
```
The query planner uses type information to select efficient execution strategies.
Error Handling with Types
Type mismatches generate clear, actionable errors.
```python
# Type mismatch caught at query planning
try:
    invalid = df.select(
        fc.col("string_column") + fc.col("int_column")  # Type error
    )
except TypeError as e:
    print(f"Type error: {e}")

# Schema validation errors
try:
    df = session.read.csv(
        "data/invalid.csv",
        schema=Schema([
            ColumnField(name="id", data_type=IntegerType)
        ])
    )
except ValueError as e:
    print(f"Schema error: {e}")
```
Early type checking prevents runtime failures in production.
Best Practices
Define schemas explicitly for production pipelines. Explicit schemas serve as documentation and catch errors early.
Use specialized types like MarkdownType and EmbeddingType instead of treating everything as strings. Specialized types unlock domain-specific operations.
Leverage type inference during exploration but add explicit types before deployment. Type inference accelerates prototyping, while explicit types ensure production reliability; a short sketch of this workflow follows the practices below.
Cast types at data boundaries when loading external data. This isolates type conversions and simplifies debugging.
Validate parameter types in tool definitions using ToolParam. Type-checked parameters prevent invalid agent calls.
Export with type preservation when interfacing with LangChain. Use Parquet or Arrow formats to maintain type information across system boundaries.
Handle nulls explicitly in critical paths. Null handling prevents unexpected failures when processing real-world data.
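As a sketch of the inference-then-pin workflow mentioned above: read without a schema while exploring, inspect what was inferred, then lock the schema in for production. The `schema` property used for inspection is an assumption about the DataFrame API; `ticket_schema` is the schema defined earlier in this guide.

```python
# Exploration: let fenic infer column types from the file
draft = session.read.csv("data/tickets.csv")
print(draft.schema)  # assumed inspection property; verify against your fenic version

# Production: pin the explicit schema so upstream drift fails fast
tickets = session.read.csv("data/tickets.csv", schema=ticket_schema)
```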
Integration Patterns
Fenic's type system integrates with LangChain through several patterns:
Preprocessing pattern: Use Fenic for batch data preprocessing with strong typing, then export to LangChain for agent consumption.
Tool pattern: Create MCP tools with typed parameters that LangChain agents can call with runtime validation.
Hybrid pattern: Use Fenic for data-intensive operations and LangChain for orchestration, passing typed data structures between systems.
The type system ensures data quality throughout the pipeline, reducing debugging time and improving reliability.
Next Steps
Start with primitive types for basic data processing, then incorporate composite and specialized types as requirements grow. The type system scales from simple scripts to production pipelines without architectural changes.
Explore Fenic's semantic operators for AI-native data transformations, and review the LangChain integration guide for complete implementation patterns.
The combination of Fenic's type system with LangChain's orchestration capabilities enables building production-grade AI applications with the reliability of traditional data systems.

