Introduction
Building production-grade LLM applications requires robust data infrastructure. While frameworks like LangChain handle orchestration and agent logic, managing unstructured text data at scale presents unique challenges. Fenic addresses this gap with a PySpark-inspired DataFrame API designed specifically for AI workloads.
This guide demonstrates how to leverage Fenic as a data layer alongside LangChain, handling text preprocessing, embedding generation, and semantic operations at scale.
Why Fenic for Text Processing
Traditional data processing frameworks weren't built for LLM workflows. Fenic provides:
- Native text operations: Chunking, tokenization, and extraction built into the DataFrame API
- Semantic capabilities: Generate embeddings and perform LLM-based transformations directly on DataFrames
- Scalable processing: Handle large document collections with efficient batch operations
- Unified interface: Consistent API from data ingestion to semantic operations, as previewed in the sketch below
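To make that concrete, here is a minimal preview of the flow this guide builds up, assuming a `session` configured as in the next section: read markdown, chunk it, and embed it in a single DataFrame expression.
```python
# A minimal preview of Fenic's unified interface, assuming `session`
# is configured as shown in the "Setting Up Fenic" section below
from fenic.api.functions import col, text, semantic

preview = (
    session.read.docs("data/documents/**/*.md", content_type="markdown", recursive=True)
    .select(
        col("file_path"),
        text.recursive_token_chunk(
            col("content"), chunk_size=500, chunk_overlap_percentage=10
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(col("file_path"), semantic.embed(col("chunks")).alias("embedding"))
)
```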
Setting Up Fenic
Install Fenic and configure your session with the necessary model credentials:
```python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import (
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

# Configure session with LLM and embedding models
config = SessionConfig(
    app_name="langchain_data_layer",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Loading Unstructured Documents
Fenic provides specialized readers for various document formats:
Loading Markdown Documents
```python
from fenic.api.functions import col

# Load all markdown files from a directory
docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

# Filter out any documents that failed to load
docs_df = docs_df.filter(col("error").is_null())
```
Loading CSV Data
```python
# Load CSV files with automatic schema inference
csv_df = session.read.csv("data/*.csv")

# Or with an explicit schema for better control
from fenic.core.types.schema import Schema, ColumnField
from fenic.core.types.datatypes import StringType, IntegerType

csv_df = session.read.csv(
    "data/products.csv",
    schema=Schema([
        ColumnField(name="product_id", data_type=IntegerType),
        ColumnField(name="description", data_type=StringType)
    ])
)
```
Loading PDF Metadata
```python
# Extract metadata from PDFs for filtering and organization
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

# Filter by page count or other metadata
relevant_pdfs = pdf_metadata.filter(col("page_count") < 100)
```
Text Chunking Strategies
Proper chunking is critical for RAG applications. Fenic provides multiple chunking strategies:
Recursive Token Chunking
```python
from fenic.api.functions import text, col

# Chunk documents, preserving structure at natural boundaries
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Explode chunks into individual rows
chunked_df = chunked_df.select(
    col("file_path"),
    col("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
The recursive approach attempts to split at paragraph breaks, sentence boundaries, and other natural divisions, maintaining context better than simple character splits.
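To see the difference, consider a short document with an explicit paragraph break: the recursive splitter cuts at the break rather than mid-sentence. A small illustration using the API above (the sample text is hypothetical):
```python
# Illustration: recursive chunking prefers the paragraph break,
# so each chunk remains a coherent unit (sample text is hypothetical)
sample_df = session.create_dataframe([{
    "content": (
        "Gradient descent updates parameters iteratively.\n\n"
        "Regularization penalizes model complexity to reduce overfitting."
    )
}])

sample_chunks = sample_df.select(
    text.recursive_token_chunk(
        col("content"),
        chunk_size=20,
        chunk_overlap_percentage=0
    ).alias("chunks")
).explode("chunks")

# Each row now holds one chunk, split at the paragraph boundary
sample_chunks.show()
```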
Character-Based Chunking
```python
# For simpler, fixed-size chunks
chunked_df = docs_df.select(
    col("file_path"),
    text.character_chunk(
        col("content"),
        chunk_size=1000,
        chunk_overlap_percentage=15
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
Word-Based Chunking
```python
# Chunk by word count with custom delimiters
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_word_chunk(
        col("content"),
        chunk_size=200,
        chunk_overlap_percentage=10,
        chunking_character_set_custom_characters=['\n\n', '\n', '.', ' ']
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
```
Generating Embeddings
Once text is chunked, generate embeddings for vector search:
```python
from fenic.api.functions import semantic

# Generate embeddings for all chunks
embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

# Count tokens for monitoring
embedded_df = embedded_df.with_column(
    text.count_tokens(col("chunk_text")).alias("token_count")
)
```
Semantic Similarity Search
Fenic enables semantic search operations directly in the DataFrame API:
```python
from fenic.api.functions import embedding

# Generate the query embedding (this would typically come from your application)
query_text = "machine learning best practices"
query_df = session.create_dataframe([{"query": query_text}])
query_embedding = query_df.select(
    semantic.embed(col("query")).alias("query_emb")
).collect()[0]["query_emb"]

# Compute similarity scores
results_df = embedded_df.select(
    col("file_path"),
    col("chunk_text"),
    embedding.compute_similarity(
        col("embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
)

# Get the top matches
top_results = results_df.order_by(
    col("similarity_score").desc()
).limit(10)
```
Text Extraction and Structured Data
Extract structured information from unstructured text:
```python
from typing import List

from pydantic import BaseModel, Field

# Define the extraction schema
class Entity(BaseModel):
    name: str = Field(description="Entity name")
    type: str = Field(description="Entity type: person, organization, or location")

class ExtractedInfo(BaseModel):
    entities: List[Entity] = Field(description="Named entities found in text")
    summary: str = Field(description="Brief summary of the content")
    topics: List[str] = Field(description="Main topics discussed")

# Extract structured data
extracted_df = docs_df.select(
    col("file_path"),
    semantic.extract(
        col("content"),
        response_format=ExtractedInfo
    ).alias("extracted_data")
)

# Access nested fields
extracted_df = extracted_df.select(
    col("file_path"),
    col("extracted_data.summary").alias("summary"),
    col("extracted_data.entities").alias("entities"),
    col("extracted_data.topics").alias("topics")
)
```
Text Classification
Classify documents into categories:
```python
from fenic.api.functions.semantic import ClassDefinition

# Define classification categories
categories = [
    ClassDefinition(
        label="Technical Documentation",
        description="API docs, architecture guides, technical specifications"
    ),
    ClassDefinition(
        label="Business Content",
        description="Marketing materials, business plans, proposals"
    ),
    ClassDefinition(
        label="Support Material",
        description="FAQs, troubleshooting guides, user support content"
    )
]

# Classify documents
classified_df = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.classify(
        col("content"),
        classes=categories
    ).alias("category")
)
```
Building a RAG Data Pipeline
Combine these operations into a complete pipeline:
```python
from fenic.api.functions import text, col, semantic, dt
from fenic.api.functions.builtin import md5

# 1. Load documents
raw_docs = session.read.docs(
    "data/docs/**/*.md",
    content_type="markdown",
    recursive=True
)

# 2. Clean and prepare text
cleaned_docs = raw_docs.select(
    col("file_path"),
    text.trim(col("content")).alias("content")
).filter(col("content") != "")

# 3. Chunk documents
chunked = cleaned_docs.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=400,
        chunk_overlap_percentage=10
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

# 4. Generate embeddings
embedded = chunked.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding"),
    text.count_tokens(col("chunk_text")).alias("tokens")
)

# 5. Add metadata
final_df = embedded.with_columns(
    md5(col("chunk_text")).alias("chunk_id"),
    dt.current_timestamp().alias("processed_at")
)

# 6. Export for use in LangChain or other frameworks
final_df.write.parquet("output/processed_chunks.parquet")
```
Integration with LangChain
Export Fenic-processed data for LangChain consumption:
```python
# Export to formats LangChain can consume
final_df.write.csv("output/chunks_with_embeddings.csv")

# Or save as a table in Fenic's catalog for querying
final_df.write.save_as_table("processed_documents", mode="overwrite")

# Later, retrieve processed data
processed_data = session.table("processed_documents")

# Convert to pandas for LangChain integration
pandas_df = processed_data.to_pandas()

# Use in LangChain
from langchain.schema import Document

documents = [
    Document(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "chunk_id": row["chunk_id"],
            "tokens": row["tokens"]
        }
    )
    for row in pandas_df.to_dict("records")
]
```
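If you want to query these chunks from LangChain without re-embedding them, you can load the precomputed vectors into a vector store. A minimal sketch, assuming the langchain-community, langchain-openai, and faiss-cpu packages are installed:
```python
# Sketch: build a FAISS store from Fenic's precomputed embeddings.
# FAISS.from_embeddings takes (text, vector) pairs, so no chunk is re-embedded.
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

records = pandas_df.to_dict("records")
text_embeddings = [(row["chunk_text"], list(row["embedding"])) for row in records]
metadatas = [
    {"file_path": row["file_path"], "chunk_id": row["chunk_id"]}
    for row in records
]

# The embedding model here only embeds future queries; it should match
# the model Fenic used (text-embedding-3-small)
vector_store = FAISS.from_embeddings(
    text_embeddings,
    OpenAIEmbeddings(model="text-embedding-3-small"),
    metadatas=metadatas,
)

retriever = vector_store.as_retriever(search_kwargs={"k": 5})
```
This keeps Fenic as the system of record for preprocessing while LangChain handles retrieval and orchestration.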
Advanced Text Operations
Template-Based Extraction
```python
# Extract structured data from formatted text
log_df = session.create_dataframe([
    {"log": "2024-01-15 ERROR Connection failed to server"},
    {"log": "2024-01-15 INFO Request processed successfully"}
])

parsed_logs = log_df.select(
    text.extract(
        col("log"),
        template="${date} ${level} ${message}"
    ).alias("parsed")
)

# Access the parsed fields
parsed_logs = parsed_logs.select(
    col("parsed.date").alias("date"),
    col("parsed.level").alias("level"),
    col("parsed.message").alias("message")
)
```
Jinja Template Rendering
```python
from fenic.api.functions.builtin import lit

# Generate prompts dynamically
prompt_df = chunked_df.select(
    text.jinja(
        """Answer the following question based on the context:

Context: {{ context }}

Question: {{ question }}

Provide a detailed answer based solely on the context provided.""",
        context=col("chunk_text"),
        question=lit("What are the main concepts discussed?")
    ).alias("prompt")
)
```
Semantic Join Operations
Join datasets based on semantic similarity:
```python
from fenic.api.functions import col

# Product descriptions from two sources
products_a = session.create_dataframe([
    {"id": 1, "desc": "wireless bluetooth headphones"},
    {"id": 2, "desc": "laptop computer with touchscreen"}
])

products_b = session.create_dataframe([
    {"id": 101, "name": "Premium Audio Headset"},
    {"id": 102, "name": "Portable Touchscreen Notebook"}
])

# Semantic join to match similar products
matched = products_a.semantic.join(
    other=products_b,
    predicate=(
        "Does this product description match this product name? "
        "Description: {{left_on}} Name: {{right_on}}"
    ),
    left_on=col("desc"),
    right_on=col("name")
)
```
Performance Optimization
Batch Processing
```python
# Process large datasets in batches
large_docs = session.read.docs("data/large_corpus/**/*.md", recursive=True)

# Process in chunks to manage memory
batch_size = 1000
total_rows = large_docs.count()

for offset in range(0, total_rows, batch_size):
    batch = large_docs.limit(batch_size).offset(offset)

    # Process the batch
    processed_batch = batch.select(
        col("file_path"),
        semantic.embed(col("content")).alias("embedding")
    )

    # Save batch results
    processed_batch.write.parquet(
        f"output/batch_{offset}.parquet",
        mode="overwrite"
    )
```
Monitoring Token Usage
```python
from fenic.api.functions.builtin import sum, count, avg

# Track token consumption
metrics_df = embedded_df.select(
    sum(col("token_count")).alias("total_tokens"),
    count("*").alias("total_chunks"),
    avg(col("token_count")).alias("avg_tokens_per_chunk")
)

# View metrics
metrics_df.show()

# Access session metrics for API calls
session.stop()  # Prints comprehensive session metrics
```
Exporting Data for Vector Databases
Prepare data for vector database ingestion:
```python
from fenic.api.functions.builtin import struct

# Format for Pinecone, Weaviate, or other vector stores,
# starting from the pipeline output (final_df) built above
export_df = final_df.select(
    col("chunk_id"),
    col("chunk_text"),
    col("embedding"),
    struct(
        col("file_path"),
        col("tokens")
    ).alias("metadata")
)

# Export to Parquet for vector DB ingestion
export_df.write.parquet("output/vector_db_ready.parquet")

# Or convert to dictionary format and upsert with your vector
# database client (`vector_db` is a placeholder for that client)
vector_records = export_df.collect()
for record in vector_records:
    vector_db.upsert(
        id=record["chunk_id"],
        values=record["embedding"],
        metadata={
            "text": record["chunk_text"],
            "file_path": record["metadata"]["file_path"],
            "tokens": record["metadata"]["tokens"]
        }
    )
```
Working with SQL
For advanced queries, use Fenic's SQL interface:
```python
# Complex filtering and aggregation with SQL
results = session.sql(
    """
    SELECT
        file_path,
        COUNT(*) AS chunk_count,
        AVG(tokens) AS avg_tokens,
        MAX(similarity_score) AS max_similarity
    FROM {chunks}
    WHERE tokens BETWEEN 100 AND 500
    GROUP BY file_path
    HAVING chunk_count > 5
    ORDER BY max_similarity DESC
    """,
    chunks=results_df
)
```
Best Practices
Chunk Size Selection
Choose chunk sizes based on your model's context window:
```python
# For models with an 8K context window
chunk_size = 400  # tokens
overlap = 10      # percent

# For models with a 128K context window
chunk_size = 2000  # tokens
overlap = 5        # percent
```
Error Handling
```python
from fenic.api.functions.builtin import when

# Flag documents that failed to process
processed_df = docs_df.with_column(
    when(col("error").is_null(), True)
    .otherwise(False)
    .alias("is_valid")
)

# Separate successful and failed processing
success_df = processed_df.filter(col("is_valid"))
failed_df = processed_df.filter(~col("is_valid"))

# Log failures
failed_df.select(col("file_path"), col("error")).show()
```
Incremental Processing
```python
# Save processing state
processed_df.write.save_as_table("processed_documents", mode="append")

# Load the already-processed files
existing = session.table("processed_documents")
existing_paths = existing.select(col("file_path")).distinct().collect()
processed_paths = {row["file_path"] for row in existing_paths}

# Filter to only the new documents
new_docs = docs_df.filter(
    ~col("file_path").isin(list(processed_paths))
)
```
Conclusion
Fenic provides a production-ready data layer for LLM applications, handling the challenges of unstructured text processing at scale. By leveraging Fenic's DataFrame API alongside LangChain's orchestration capabilities, you can build robust RAG pipelines with:
- Efficient text chunking and tokenization
- Native embedding generation and semantic search
- Structured data extraction from unstructured text
- Scalable batch processing
- Comprehensive monitoring and metrics
The combination of Fenic's data processing capabilities with LangChain's agent framework enables you to build sophisticated AI applications without compromising on performance or maintainability.
For more information, visit the Fenic documentation and explore the Typedef platform.

