Document processing pipelines require extracting structured data from PDFs, Word files, HTML, and other formats. Most teams waste months building brittle glue code to parse documents, validate outputs, and link related information across files.
Typedef provides an inference-first data engine where semantic operations work as native DataFrame primitives. This guide shows how to build production document pipelines using typed schemas, vector embeddings, and semantic joins.
Why Traditional Document Pipelines Fail
Current document processing approaches create technical debt:
- Custom parsing scripts for each file type
- Manual validation of extracted data
- Scattered LLM API calls across microservices
- No semantic relationship detection
- Rate limiting failures at scale
Typedef's architecture treats inference as a first-class operation within the query engine, enabling automatic optimization, batching, and cost management.
Installing Fenic for Document Processing
Fenic requires Python 3.10, 3.11, or 3.12:
```bash
pip install fenic
```
Configure API credentials:
```bash
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"
export GOOGLE_API_KEY="your-key"
```
Initialize a session:
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import List, Literal

config = fc.SessionConfig(
    app_name="document_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "accurate": fc.AnthropicLanguageModel(
                "claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            ),
        },
        default_language_model="fast",
    ),
)

session = fc.Session.get_or_create(config)
```
Native Document Types in Fenic
Fenic provides specialized data types optimized for document processing:
- MarkdownType - Parse and extract structure from markdown files
- TranscriptType - Process SRT, WebVTT formats with speaker awareness
- JsonType - Manipulate nested JSON with JQ expressions
- HtmlType - Handle raw HTML markup
- DocumentPathType - Load PDFs and text files
These types enable native operations without preprocessing steps.
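For example, a raw string column can be treated as a typed document column and then processed with the markdown helpers shown later in this guide. The sketch below is illustrative only; it assumes string-to-`MarkdownType` casting is supported in your Fenic version:

```python
# Hypothetical sketch: cast raw strings to MarkdownType, then chunk by headers.
# Assumes .cast(MarkdownType) is supported for string columns in your Fenic version.
from fenic.core.types import MarkdownType

raw_df = session.create_dataframe({
    "content": ["# Title\n\n## Section A\nBody text\n\n## Section B\nMore text"]
})

chunks = (
    raw_df
    .with_column("md", fc.col("content").cast(MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("md"), header_level=2)
    )
    .explode("chunks")
)

chunks.show()
```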
Parsing PDFs with Typed Output
Fenic 0.5.0 introduced native PDF parsing with page chunking:
```python
# Load PDF metadata
pdfs = session.read.pdf_metadata("contracts/**/*.pdf", recursive=True)

# Parse to markdown with page separators
parsed = pdfs.select(
    fc.col("file_path"),
    fc.col("page_count"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True,
    ).alias("markdown_content")
)

parsed.show()
```
The read.pdf_metadata function extracts size, page count, author, creation dates, and other metadata before parsing. This enables filtering documents before expensive parsing operations.
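Because the metadata arrives as ordinary columns, you can prune the document set before invoking the parser. A minimal sketch using only the `page_count` column shown above:

```python
# Skip very long documents before running the expensive parse step.
# page_count comes from read.pdf_metadata (see above).
small_pdfs = pdfs.filter(fc.col("page_count") <= 50)

parsed_small = small_pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
    ).alias("markdown_content")
)
```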
Schema-Driven Extraction from Documents
Define Pydantic schemas for type-safe extraction:
```python
class ContractClause(BaseModel):
    clause_type: Literal["payment", "termination", "liability", "confidentiality"]
    parties: List[str] = Field(description="Legal entities bound by this clause")
    obligations: List[str] = Field(description="Specific requirements or duties")
    effective_date: str = Field(description="Date format YYYY-MM-DD")
    amount: float | None = Field(description="Dollar amounts if mentioned")

class InvoiceData(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float = Field(ge=0)
    line_items: List[str]
    due_date: str
```
Apply extraction to parsed documents:
```python
# Extract structured data from markdown
extracted = (
    parsed
    .select(
        "*",
        fc.semantic.extract(
            fc.col("markdown_content"),
            ContractClause,
            model_alias="accurate"
        ).alias("clause_data")
    )
    .unnest("clause_data")
)

# Filter high-value clauses
payment_terms = extracted.filter(
    fc.col("clause_type").isin(["payment", "liability"])
)

payment_terms.show()
```
The semantic.extract operator validates outputs against the schema automatically, preventing invalid data from entering downstream systems.
Processing Multiple Document Types
Load entire directories into DataFrames:
```python
from fenic.core.types import MarkdownType, JsonType

# Load markdown documents
md_docs = session.read.docs(
    "/data/documentation/",
    content_type="markdown",
    recursive=True
)

# Extract structured sections
sections = (
    md_docs
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(
            fc.col("content"),
            header_level=2
        )
    )
    .explode("chunks")
)

sections.show()
```
The extract_header_chunks function leverages document structure for semantically meaningful chunks instead of naive character splitting.
Vector Embeddings for Semantic Search
Generate embeddings as a native column operation:
```python
# Create embeddings for document chunks
embedded = (
    sections
    .with_column(
        "chunk_embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
)

# Cluster similar content
clustered = (
    embedded
    .semantic.with_cluster_labels(
        by=fc.col("chunk_embedding"),
        num_clusters=8,
        label_column="cluster_id"
    )
)

# Summarize each cluster
cluster_themes = (
    clustered
    .group_by("cluster_id")
    .agg(
        fc.semantic.reduce(
            "Identify the common theme across these sections",
            fc.col("chunks").content,
            model_alias="fast"
        ).alias("theme")
    )
)

cluster_themes.show()
```
EmbeddingType is a first-class data type enabling vector operations without external database dependencies during processing.
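For instance, embedded chunks can be compared against an embedded query without leaving the DataFrame. The sketch below is a rough illustration, not a verified recipe: the similarity helper name (`fc.embedding.compute_similarity`) and `fc.lit` are assumptions, so confirm them against the Fenic API reference for your version.

```python
# Illustrative only: rank chunks by similarity to an embedded query.
# fc.embedding.compute_similarity and fc.lit are assumptions here --
# confirm the exact helper names in your Fenic version before relying on them.
query = "data retention requirements"

relevant_chunks = (
    embedded
    .with_column("query_embedding", fc.semantic.embed(fc.lit(query)))
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(
            fc.col("chunk_embedding"),
            fc.col("query_embedding"),
            metric="cosine",
        ),
    )
    .filter(fc.col("similarity") > 0.7)
)

relevant_chunks.show()
```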
Semantic Joins Across Documents
Link related content across documents using meaning-based matching:
```python
# Load two document sets
contracts_df = session.read.pdf_metadata("contracts/**/*.pdf")
policies_df = session.read.pdf_metadata("policies/**/*.pdf")

# Parse both
contracts_parsed = contracts_df.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("contract_text")
)

policies_parsed = policies_df.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("policy_text")
)

# Define semantic join predicate
join_predicate = """
Does the contract clause address this policy requirement?
Contract: {{left_on}}
Policy: {{right_on}}

Consider:
- Does the clause satisfy the policy intent?
- Are required controls present?
- Is language sufficiently specific?
"""

# Perform semantic join
matched = (
    contracts_parsed
    .semantic.join(
        other=policies_parsed,
        predicate=join_predicate,
        left_on=fc.col("contract_text"),
        right_on=fc.col("policy_text"),
        model_alias="accurate"
    )
)

matched.select(
    "file_path",
    "policy_text",
    "contract_text"
).show()
```
Semantic joins enable sophisticated matching logic beyond simple text similarity.
Complete Document Processing Pipeline
Combine operations into production workflows:
```python
class DocumentMetadata(BaseModel):
    doc_type: Literal["contract", "invoice", "report", "memo"]
    primary_entities: List[str]
    key_dates: List[str]
    financial_terms: List[str]
    action_items: List[str]

class ContractTerm(BaseModel):
    term_type: Literal["payment", "delivery", "warranty", "indemnity"]
    description: str
    parties_affected: List[str]
    start_date: str
    end_date: str

def process_document_batch(input_path: str, output_path: str):
    """End-to-end document processing pipeline."""
    # Load PDF metadata
    docs = session.read.pdf_metadata(input_path, recursive=True)

    # Parse PDFs to markdown
    parsed = docs.select(
        "*",
        fc.semantic.parse_pdf(
            fc.col("file_path"),
            page_separator="--- PAGE {page} ---"
        ).alias("content")
    )

    # Extract document-level metadata
    with_metadata = (
        parsed
        .select(
            "*",
            fc.semantic.extract(
                fc.col("content"),
                DocumentMetadata,
                model_alias="fast"
            ).alias("metadata")
        )
        .unnest("metadata")
    )

    # Extract contract terms
    with_terms = (
        with_metadata
        .filter(fc.col("doc_type") == "contract")
        .select(
            "*",
            fc.semantic.extract(
                fc.col("content"),
                ContractTerm,
                model_alias="accurate"
            ).alias("terms")
        )
        .unnest("terms")
    )

    # Generate embeddings
    with_embeddings = (
        with_terms
        .with_column(
            "term_embedding",
            fc.semantic.embed(fc.col("description"))
        )
    )

    # Classify priority
    classified = (
        with_embeddings
        .with_column(
            "priority",
            fc.semantic.classify(
                fc.col("description"),
                classes=["critical", "high", "medium", "low"],
                model_alias="fast"
            )
        )
    )

    # Write results and capture metrics
    write_metrics = classified.write.parquet(output_path)

    return {
        "documents_processed": docs.count(),
        "terms_extracted": classified.count(),
        "total_cost_usd": write_metrics.total_lm_metrics.cost,
        "execution_time_sec": write_metrics.execution_time_ms / 1000
    }

# Execute pipeline
stats = process_document_batch(
    input_path="data/contracts/**/*.pdf",
    output_path="s3://bucket/processed/"
)
print(f"Pipeline metrics: {stats}")
```
This pattern demonstrates inference-first architecture where semantic operations compose with traditional DataFrame transformations.
Processing Transcripts with Speaker Awareness
Handle meeting transcripts and call recordings:
```python
from pathlib import Path

class MeetingSegment(BaseModel):
    speaker: str
    start_time: float
    end_time: float
    key_points: List[str]
    action_items: List[str]

# Load transcript
transcript_data = Path("meetings/standup.json").read_text()
df = session.create_dataframe({"transcript": [transcript_data]})

# Chunk with speaker awareness
processed = (
    df
    .select(
        "*",
        fc.text.recursive_token_chunk(
            "transcript",
            chunk_size=1200,
            chunk_overlap_percentage=0
        ).alias("chunks")
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk",
            MeetingSegment,
            model_alias="fast"
        ).alias("segment")
    )
    .unnest("segment")
)

# Aggregate by speaker
speaker_summary = (
    processed
    .group_by(fc.col("speaker"))
    .agg(
        fc.semantic.reduce(
            "Summarize this speaker's contributions",
            fc.col("key_points").cast(fc.StringType),
            model_alias="fast"
        ).alias("summary")
    )
)

speaker_summary.show()
```
TranscriptType preserves speaker identity and timestamps through transformations.
Optimizing Pipeline Performance
Model Selection Strategy
Use cost-efficient models for high-volume tasks:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel("gpt-4o-nano", rpm=500),
            "mini": fc.OpenAILanguageModel("gpt-4o-mini", rpm=500),
            "full": fc.AnthropicLanguageModel("claude-opus-4", rpm=100),
        }
    )
)

# Classification with the nano model
df = df.with_column(
    "label",
    fc.semantic.classify(
        fc.col("text"),
        classes=["urgent", "routine"],
        model_alias="nano"
    )
)

# Extraction with the mini model (StandardSchema is a placeholder Pydantic model)
df = df.with_column(
    "standard_fields",
    fc.semantic.extract(
        fc.col("text"),
        StandardSchema,
        model_alias="mini"
    )
)

# Critical reasoning with the full model (ComplexSchema is a placeholder Pydantic model)
df = df.with_column(
    "complex_fields",
    fc.semantic.extract(
        fc.col("text"),
        ComplexSchema,
        model_alias="full"
    )
)
```
Cost differences between models can span 10x to 100x, making strategic selection essential.
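A back-of-the-envelope calculation makes the point concrete. The per-token prices below are placeholders, not current list prices; substitute your providers' actual rates:

```python
# Hypothetical prices (USD per 1M input tokens) -- replace with your providers' actual rates.
PRICE_PER_M_TOKENS = {"nano": 0.10, "mini": 0.60, "full": 15.00}

docs = 100_000
tokens_per_doc = 2_000  # rough input size per document
total_m_tokens = docs * tokens_per_doc / 1_000_000  # 200M tokens

for alias, price in PRICE_PER_M_TOKENS.items():
    print(f"{alias}: ~${total_m_tokens * price:,.0f} for {docs:,} documents")
# nano: ~$20, mini: ~$120, full: ~$3,000 -- a >100x spread for the same volume
```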
Explicit Caching
Cache expensive operations to prevent redundant processing:
```python
# Execute once and materialize to avoid re-execution
result = df.select(
    "*",
    fc.semantic.extract(
        fc.col("text"),
        Schema,
        model_alias="accurate"
    ).alias("extracted")
).unnest("extracted").collect()

# Convert back to DataFrame for further operations
# (work with result.data in your preferred format)
extracted = session.create_dataframe(result.data)

# Multiple filters use the materialized results
critical = extracted.filter(fc.col("priority") == "critical").collect()
high = extracted.filter(fc.col("priority") == "high").collect()
```
Batch Processing at Scale
Process directories efficiently:
```python
# Load all documents
all_docs = session.read.pdf_metadata(
    "archive/**/*.pdf",
    recursive=True
)

# Filter before parsing
recent_docs = all_docs.filter(
    fc.col("modified_date") > "2024-01-01"
)

# Parse, extract, and write results
processed = (
    recent_docs
    .select(
        "*",
        fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
    )
    .select(
        "*",
        fc.semantic.extract(fc.col("content"), Schema).alias("extracted")
    )
)

processed.write.parquet("s3://output/")
```
The framework handles rate limiting and retries automatically.
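Throughput is governed by the rpm/tpm limits declared in the session config (see the setup section above), so large batch runs are tuned by raising those limits to match your provider quota rather than by writing retry loops. A sketch reusing the configuration pattern shown earlier:

```python
# Raise rate limits for an overnight batch run -- the engine paces requests
# and retries transient failures within these bounds.
batch_config = fc.SessionConfig(
    app_name="batch_archive_run",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=2_000,      # requests per minute allowed by your quota
                tpm=1_000_000,  # tokens per minute allowed by your quota
            ),
        },
        default_language_model="fast",
    ),
)

batch_session = fc.Session.get_or_create(batch_config)
```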
Building Knowledge Graphs from Documents
Extract entities and relationships:
```python
class Entity(BaseModel):
    entity_type: Literal["person", "organization", "location", "date"]
    entity_name: str
    context: str

class Relationship(BaseModel):
    source_entity: str
    target_entity: str
    relationship_type: Literal["employed_by", "located_in", "reports_to"]
    confidence: Literal["high", "medium", "low"]

# Extract entities
entities = (
    parsed_docs
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            Entity,
            model_alias="fast"
        ).alias("entity")
    )
    .unnest("entity")
    .with_column(
        "entity_embedding",
        fc.semantic.embed(
            fc.col("entity_name") + " " + fc.col("context")
        )
    )
)

# Extract relationships
relationships = (
    parsed_docs
    .select(
        "*",
        fc.semantic.extract(
            fc.col("content"),
            Relationship,
            model_alias="accurate"
        ).alias("rel")
    )
    .unnest("rel")
)

# Link entities across documents
entity_links = (
    entities
    .alias("e1")
    .semantic.join(
        other=entities.alias("e2"),
        predicate="""
        Do these refer to the same real-world entity?
        Entity 1: {{left_on}}
        Entity 2: {{right_on}}
        """,
        left_on=fc.col("entity_name"),
        right_on=fc.col("entity_name"),
        model_alias="accurate"
    )
    .filter(fc.col("e1.file_path") != fc.col("e2.file_path"))
)

# Write knowledge graph
entities.write.parquet("kg/entities/")
relationships.write.parquet("kg/relationships/")
entity_links.write.parquet("kg/links/")
```
Monitoring Pipeline Metrics
Track costs and performance:
```python
result = pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.total_lm_metrics.num_output_tokens}")
print(f"Total cost: ${metrics.total_lm_metrics.cost:.2f}")
print(f"Execution time: {metrics.execution_time_ms / 1000:.2f}s")

# Per-model breakdown
for model_name, model_metrics in metrics.lm_metrics.items():
    print(f"\n{model_name}:")
    print(f"  Requests: {model_metrics.num_requests}")
    print(f"  Input tokens: {model_metrics.num_input_tokens}")
    print(f"  Output tokens: {model_metrics.num_output_tokens}")
    print(f"  Cost: ${model_metrics.cost:.2f}")
```
Built-in metrics and observability provide visibility into resource usage.
Advanced Filtering with Semantic Predicates
Apply natural language filters to documents:
```python
# Load and parse documents
docs = session.read.pdf_metadata("legal/**/*.pdf")

parsed = docs.select(
    "*",
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)

# Filter with semantic predicate
relevant = parsed.filter(
    fc.semantic.predicate(
        """
        Does this document contain information about:
        - Intellectual property rights
        - Licensing terms
        - Technology transfer

        Content: {{content}}
        """,
        content=fc.col("content")
    )
)

relevant.show()
```
Semantic predicates enable content-based filtering without regex patterns.
Integrating with Data Infrastructure
Write to Lakehouses
Export to standard formats:
```python
# Write to Parquet
processed.write.parquet("s3://bucket/output/")

# Write to a catalog table (for persistent storage)
processed.write.save_as_table("processed_documents", mode="append")

# Write to CSV (alternative format)
processed.write.csv("s3://bucket/output.csv")
```
Lakehouse-native architecture enables seamless integration without data movement.
Vector Database Export
Prepare data for vector search:
```python
# Generate embeddings with metadata
vector_data = (
    processed
    .select(
        fc.col("document_id").alias("id"),
        fc.col("content").alias("text"),
        fc.col("doc_type").alias("category"),
        fc.col("file_path").alias("source"),
        fc.semantic.embed(fc.col("content")).alias("embedding")
    )
)

# Write for vector DB ingestion
vector_data.write.parquet("vector_store_input/")
```
Production Best Practices
Schema Design Guidelines
Create clear, specific schemas:
```python
class PurchaseOrder(BaseModel):
    po_number: str = Field(description="Unique PO identifier")
    vendor_name: str = Field(description="Legal vendor name")
    order_date: str = Field(description="Date format YYYY-MM-DD")
    line_items: List[str] = Field(description="Items being purchased")
    total_amount: float = Field(ge=0, description="Total in USD")
    approval_status: Literal["pending", "approved", "rejected"]
```
Field descriptions and constraints improve extraction accuracy.
Error Handling
Implement validation checks:
```python
# Check for extraction failures BEFORE collecting
failed = pipeline.filter(fc.col("extracted_data").is_null())

if failed.count() > 0:
    print(f"Warning: {failed.count()} documents failed extraction")
    failed.select("file_path").show()
```
Cost Estimation
Test on samples before full runs:
```python
# Process a sample
sample = df.limit(50).semantic.extract(...).collect()
cost_per_doc = sample.metrics.total_lm_metrics.cost / 50

total_docs = df.count()
estimated_cost = cost_per_doc * total_docs
print(f"Estimated cost for {total_docs} docs: ${estimated_cost:.2f}")
```
Scaling to Cloud Execution
Deploy with Typedef Cloud for automatic scaling:
```python
config = fc.SessionConfig(
    app_name="production_docs",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel("gpt-4o-mini", rpm=500)
        }
    ),
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.LARGE
    )
)

session = fc.Session.get_or_create(config)

# Same code scales automatically
df = session.read.pdf_metadata("s3://inputs/**/*.pdf")
processed = df.semantic.parse_pdf(...).write.parquet("s3://outputs/")
```
The same pipeline moves from local to cloud execution with zero code changes.
Key Takeaways
Typedef provides native capabilities for document ingestion pipelines:
- Native PDF parsing with page chunking and metadata extraction
- Typed schemas eliminate brittle prompt engineering
- Semantic joins link related content across documents
- Vector operations as first-class DataFrame operations
- Automatic optimization handles batching, caching, rate limiting
- Built-in observability tracks costs and performance
- Lakehouse integration works with existing infrastructure
Teams building document processing systems report 100x time savings and dramatic cost reductions compared to custom microservices.
Start building document pipelines with Fenic on GitHub, read the open source announcement, or explore semantic operators for data transformation.

