Processing large volumes of unstructured data with LLMs creates a critical challenge: token waste. Every redundant API call, duplicate embedding, or inefficient prompt inflates costs and slows throughput. For teams running batch operations at scale, these inefficiencies compound quickly.
This guide shows practical strategies for minimizing token waste using Fenic, a DataFrame framework built for AI workloads. You'll learn how to structure pipelines that eliminate redundant processing, optimize inference operations, and maintain predictable costs.
Token Waste Patterns in Batch Processing
Token waste occurs when pipelines process identical content multiple times, send redundant data to model APIs, or fail to batch operations efficiently. Common patterns:
- Embedding duplicate text across different records
- Re-summarizing identical content in separate pipeline runs
- Processing entire documents when only specific sections changed
- Making individual API calls instead of batching requests
- Sending verbose context when concise fingerprints suffice
Cost impact scales linearly with data volume, making optimization essential for production workloads.
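To make the scaling concrete, here is a rough back-of-envelope sketch; every figure in it (record count, duplicate rate, token average, price) is a hypothetical placeholder, not a measured value:

```python
# Hypothetical illustration of how duplicate content inflates embedding spend.
# All numbers are placeholder assumptions, not benchmarks or published prices.
records = 1_000_000            # records in one batch run
duplicate_rate = 0.35          # share of records that are exact or near duplicates
avg_tokens = 400               # average tokens per record
price_per_1m_tokens = 0.02     # assumed embedding price in USD per 1M tokens

wasted_tokens = records * duplicate_rate * avg_tokens
wasted_cost = wasted_tokens / 1_000_000 * price_per_1m_tokens
print(f"~{wasted_tokens:,.0f} redundant tokens, ~${wasted_cost:.2f} wasted per run")
# Double the record count and the waste doubles with it -- the scaling is linear.
```

At a 35% duplication rate that is roughly 140 million redundant tokens per run; deduplicating first removes this entire term before the first API call.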
Deduplicate Before Processing
The most effective token reduction happens before any model call: deduplicate input data. When processing logs, support tickets, or user feedback, identical or near-identical content appears multiple times.
Fingerprinting for Stable Deduplication
Create stable fingerprints that capture semantic meaning while ignoring volatile details. From the log clustering guide:
```python
# Extract stable components
df = df.with_column(
    "fingerprint",
    text.concat(
        col("service"), lit("|"),
        col("error_symbol"), lit("|"),
        col("normalized_message")
    )
)

# Deduplicate on fingerprint before embedding
unique_df = df.group_by("fingerprint").agg(
    first("raw_message").alias("representative"),
    count("*").alias("occurrence_count")
)

# Embed only unique fingerprints
embedded = unique_df.with_column(
    "embedding",
    semantic.embed(col("representative"))
)
```
This reduces embedding costs by processing each distinct issue once.
Token-Level Deduplication
Use Fenic's count_tokens function to identify high-token content:
```python
# Count tokens before processing
df = df.with_column(
    "token_count",
    text.count_tokens(col("content"))
)

# Separate high-token items for specialized handling
standard_items = df.filter(col("token_count") < 1000)
long_items = df.filter(col("token_count") >= 1000)

# Process long items with chunking
chunked = long_items.with_column(
    "chunks",
    text.recursive_token_chunk(col("content"), 500, 10)
)
```
Batch Operations Aggressively
Issuing API calls one at a time repeats shared context on every request, adds per-call overhead, and prevents rate-limit-aware scheduling. Fenic automatically batches semantic operations.
Automatic Batching Configuration
Configure models with appropriate rate limits to enable intelligent batching:
```python
config = SessionConfig(
    app_name="batch_processor",
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=1000,
                tpm=1_000_000
            )
        }
    )
)
```
Fenic respects these limits while maximizing throughput through concurrent request batching and self-throttling, as detailed in the semantic operators documentation.
Group Operations by Cost Profile
Batch similar operations together:
```python
# Batch all embeddings first
df_with_embeddings = df.with_column(
    "embedding",
    semantic.embed(col("text"))
)

# Then batch classifications
df_classified = df_with_embeddings.with_column(
    "category",
    semantic.classify(
        col("text"),
        ["technical", "billing", "feature_request"]
    )
)

# Finally, batch expensive summarizations only where needed
df_final = df_classified.with_column(
    "summary",
    when(
        col("category") == "technical",
        semantic.summarize(col("text"))
    ).otherwise(lit(None))
)
```
Cache Intermediate Results
Recomputing identical semantic operations across pipeline iterations burns tokens unnecessarily. Fenic provides explicit caching mechanisms.
DataFrame Persistence
Mark DataFrames for caching after expensive operations:
```python
# Persist after expensive embedding generation
embedded_df = (df
    .with_column("embedding", semantic.embed(col("content")))
    .persist()
)

# Reuse cached embeddings in multiple downstream operations
clusters = embedded_df.semantic.with_cluster_labels(
    by="embedding",
    num_clusters=10
)

similar_pairs = embedded_df.semantic.sim_join(
    embedded_df,
    left_on=col("embedding"),
    right_on=col("embedding"),
    k=5
)
```
Table-Based Caching for Multi-Run Pipelines
For pipelines that run repeatedly on overlapping data:
```python
# First run: compute and save
embeddings = df.with_column(
    "embedding",
    semantic.embed(col("content"))
)
embeddings.write.save_as_table(
    "content_embeddings",
    mode="overwrite"
)

# Subsequent runs: load cached embeddings
cached = session.table("content_embeddings")
new_content = new_df.join(
    cached,
    on="content_id",
    how="left"
)

# Only embed new items
needs_embedding = new_content.filter(col("embedding").is_null())
fresh_embeddings = needs_embedding.with_column(
    "embedding",
    semantic.embed(col("content"))
)
```
Optimize Prompt Length and Context
Long prompts consume tokens on every API call. Minimize prompt verbosity while maintaining semantic clarity.
Schema-Driven Extraction
Replace verbose prompt instructions with Pydantic schemas:
```python
from typing import Literal

from pydantic import BaseModel, Field

class TicketExtraction(BaseModel):
    severity: Literal["low", "medium", "high", "critical"]
    affected_component: str
    user_impact: str

# Concise extraction without lengthy prompts
df = df.with_column(
    "extracted",
    semantic.extract(col("ticket_text"), TicketExtraction)
)
```
The schema serves as both specification and validation.
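One way to extend this without growing the prompt is to attach short, field-level guidance to the schema itself via Pydantic Field descriptions. This is a minimal sketch; the field names mirror the example above, and how much of each description the extraction prompt ultimately carries depends on Fenic's implementation:

```python
from typing import Literal

from pydantic import BaseModel, Field

class TicketExtraction(BaseModel):
    # Terse, field-level guidance instead of long free-form instructions
    severity: Literal["low", "medium", "high", "critical"] = Field(
        description="Urgency implied by the ticket"
    )
    affected_component: str = Field(
        description="Product area or service the ticket refers to"
    )
    user_impact: str = Field(
        description="One sentence on how the user is affected"
    )
```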
Template Variables for Reusable Context
Use template variables instead of repeating static instructions:
```python
# Efficient: template with variables
df.with_column(
    "summary",
    semantic.reduce(
        "Summarize issues from {{department}} in {{region}}",
        col("issue_text"),
        group_context={
            "department": col("department"),
            "region": col("region")
        }
    )
)
```
Use Semantic Operations Efficiently
Fenic's semantic operators minimize token usage while maintaining quality.
Classification Over Extraction
Use semantic.classify for categorical assignments instead of open-ended extraction:
```python
# Efficient: direct classification
df.with_column(
    "priority",
    semantic.classify(
        col("ticket"),
        ["urgent", "normal", "low"]
    )
)
```
Classification operations use optimized prompts and constrained outputs.
Semantic Joins for Bulk Matching
semantic.join handles batching internally:
```python
# Match each candidate against the job requirements in one batched join
matches = candidates.semantic.join(
    other=requirements,
    predicate="Does this candidate ({{ left_on }}) meet the job requirements ({{ right_on }})?",
    left_on=col("resume"),
    right_on=col("job_description")
)
```
This pattern is demonstrated in the Hacker News research agent example.
Predicate Filtering for Selective Processing
Filter records semantically before expensive operations:
```python
# Filter to relevant tickets with a semantic predicate
relevant = df.filter(
    semantic.predicate(
        "Is this about payment processing issues? {{ ticket_text }}",
        ticket_text=col("ticket_text")
    )
)

# Then apply expensive operations only to the filtered subset
summarized = relevant.with_column(
    "detailed_summary",
    semantic.summarize(col("ticket_text"))
)
```
Strategic Chunking for Large Documents
Process portions of documents rather than entire files.
Content-Aware Chunking
Use recursive token chunking to maintain semantic boundaries:
```python
# Chunk with context preservation
df = df.with_column(
    "chunks",
    text.recursive_token_chunk(
        col("document"),
        chunk_size=500,
        chunk_overlap_percentage=10
    )
)

# Process chunks individually
df = df.explode("chunks")
df = df.with_column(
    "chunk_classification",
    semantic.classify(col("chunks"), ["relevant", "irrelevant"])
)

# Filter to relevant chunks before expensive operations
relevant_chunks = df.filter(col("chunk_classification") == "relevant")
```
Structured Document Extraction
For markdown documents, extract specific sections:
```python
# Convert to structured JSON
df = df.with_column(
    "structured",
    markdown.to_json(col("markdown_content"))
)

# Extract only specific sections with jq
df = df.with_column(
    "technical_sections",
    json.jq(
        col("structured"),
        '.. | select(.type == "heading" and .level == 2 and .text | contains("Technical"))'
    )
)
```
Monitor and Optimize Token Usage
Fenic tracks metrics for every operation.
Query Metrics Analysis
Access detailed token consumption:
```python
# Execute query and capture metrics
result = df.with_column(
    "embedding",
    semantic.embed(col("text"))
).collect()

# Access metrics through session
metrics_df = session.table("fenic_system.query_metrics")

# Analyze token usage patterns
token_analysis = session.sql("""
    SELECT
        CAST(SUM(total_lm_cost) AS DOUBLE) as total_cost,
        CAST(SUM(total_lm_requests) AS DOUBLE) as total_requests,
        CAST(AVG(total_lm_cost / total_lm_requests) AS DOUBLE) as avg_cost_per_request
    FROM {df}
    WHERE session_id = '{session_id}'
""", df=metrics_df, session_id=session.session_id)

token_analysis.show()
```
Operator-Level Profiling
Identify which operations consume the most tokens:
```python
# Get execution plan with detailed metrics
metrics = result.get_execution_plan_details()
print(metrics)

# Example output shows token usage per operator:
# ┌─ Project [execution: 245ms, rows: 1000]
# │  └─ LM: requests=10, tokens=15000, cost=$0.15
# └─ SemanticEmbed [execution: 1823ms, rows: 1000]
#    └─ RM: requests=5, tokens=50000, cost=$0.25
```
Rate Limiting as Token Conservation
Proper rate limit configuration prevents wasteful retry storms.
Provider-Specific Configuration
Configure separate limits for input and output tokens:
```python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            )
        }
    )
)
```
This configuration is shown in the RudderStack case study.
Profile-Based Optimization
Use model profiles to control token budgets:
```python
config = SemanticConfig(
    language_models={
        "reasoning": OpenAILanguageModel(
            model_name="o4-mini",
            rpm=500,
            tpm=1_000_000,
            profiles={
                "quick": OpenAILanguageModel.Profile(
                    reasoning_effort="low"
                ),
                "thorough": OpenAILanguageModel.Profile(
                    reasoning_effort="high"
                )
            },
            default_profile="quick"
        )
    }
)

# Use appropriate profile per task
df_simple = df.with_column(
    "category",
    semantic.classify(col("text"), classes, model_alias="reasoning")
)

df_complex = df.with_column(
    "analysis",
    semantic.map(
        "Analyze root cause: {{ error_log }}",
        error_log=col("error_log"),
        model_alias=ModelAlias(name="reasoning", profile="thorough")
    )
)
```
Combine Deterministic and Semantic Operations
Not every operation requires LLM inference.
Rule-Based Preprocessing
Filter obvious cases with rules before applying semantic operations:
```python
# Deterministic severity tagging
df = df.with_column(
    "initial_severity",
    when(
        (col("level") == "ERROR")
        | col("message").contains("timeout")
        | col("message").contains("connection refused"),
        lit("error")
    ).when(
        col("message").contains("retry")
        | col("message").contains("degraded"),
        lit("warn")
    ).otherwise(lit("info"))
)

# Apply LLM only to ambiguous cases
ambiguous = df.filter(col("initial_severity") == "info")
classified = ambiguous.with_column(
    "semantic_severity",
    semantic.classify(
        col("message"),
        ["error", "warn", "info"]
    )
)

# Combine results
final = df.join(
    classified,
    on="id",
    how="left"
).with_column(
    "severity",
    coalesce(col("semantic_severity"), col("initial_severity"))
)
```
Text Functions for Simple Transformations
Use Fenic's built-in text functions for operations that don't require semantic processing:
```python
# Redact card-number-like patterns deterministically
df = df.with_column(
    "cleaned",
    text.trim(
        text.regexp_replace(
            col("raw_text"),
            r'\d{4}-\d{4}-\d{4}-\d{4}',
            '[REDACTED]'
        )
    )
)

# Reserve semantic operations for intent classification
df = df.with_column(
    "intent",
    semantic.classify(col("cleaned"), ["request", "complaint", "question"])
)
```
Production Patterns
Incremental Processing Pipeline
Process only new or changed content:
```python
# Load previous run results
previous = session.table("processed_content")

# Identify new items
new_items = current_batch.join(
    previous,
    on="content_hash",
    how="left_anti"
)

# Process only new items
processed_new = new_items.with_column(
    "embedding",
    semantic.embed(col("content"))
).with_column(
    "category",
    semantic.classify(col("content"), categories)
)

# Combine with previous results
updated = previous.union(processed_new)
updated.write.save_as_table("processed_content", mode="overwrite")
```
Cost-Aware Model Selection
Route workload types to appropriate models:
```python
config = SemanticConfig(
    language_models={
        "cheap": OpenAILanguageModel(
            model_name="gpt-4.1-nano",
            rpm=1000,
            tpm=500_000
        ),
        "quality": OpenAILanguageModel(
            model_name="gpt-4.1-pro",
            rpm=100,
            tpm=50_000
        )
    }
)

# Use cheap model for high-volume classification
df = df.with_column(
    "category",
    semantic.classify(
        col("text"),
        categories,
        model_alias="cheap"
    )
)

# Use quality model for critical analysis
critical = df.filter(col("category") == "high_value")
analyzed = critical.with_column(
    "detailed_analysis",
    semantic.extract(
        col("text"),
        AnalysisSchema,
        model_alias="quality"
    )
)
```
Measuring Success
Track these metrics to quantify token waste reduction:
Tokens per Processed Item
Monitor average token consumption per record:
```python
metrics_df = session.table("fenic_system.query_metrics")

efficiency = session.sql("""
    SELECT
        execution_id,
        num_output_rows,
        total_lm_input_tokens,
        CAST(total_lm_input_tokens AS DOUBLE) / num_output_rows as tokens_per_item
    FROM {df}
""", df=metrics_df)
```
Additional Metrics
- Cache Hit Rate: Track reuse versus recomputation frequency (a rough sketch follows this list)
- Processing Time per Token: Lower ratios indicate better batching
- Cost per Unit of Output: Total spend compared to processed volume
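As one example, the cache hit rate falls out of the incremental-embedding pattern above by comparing how many rows needed fresh embeddings with the batch size. A minimal sketch, using placeholder counts because the exact way you count rows depends on your pipeline:

```python
# Hypothetical bookkeeping around the incremental-embedding pattern.
# The counts are placeholders; in practice they would come from however
# your pipeline counts rows (e.g. a count()-style action, if available).
batch_total = 10_000      # rows in the current batch
fresh_needed = 1_800      # rows whose embeddings had to be recomputed

cache_hit_rate = 1 - fresh_needed / batch_total
print(f"Cache hit rate: {cache_hit_rate:.1%}")  # 82.0%

# Track this per run: a falling hit rate signals growing recomputation
# and, with it, avoidable token spend.
```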
Implementation Checklist
Apply these optimizations systematically:
- Deduplicate input data on content fingerprints before model operations
- Configure appropriate rate limits to enable automatic batching
- Cache intermediate results for reuse across pipeline runs
- Profile models for different task types to optimize token budgets
- Filter data with rules and semantic predicates before expensive operations
- Chunk large documents strategically to process relevant sections
- Monitor token usage through Fenic's metrics system
- Iterate based on measured per-item token consumption
Summary
Minimizing token waste requires treating LLM operations as first-class data operations. Fenic's DataFrame abstraction provides tools to deduplicate, batch, cache, and optimize semantic operations within a familiar programming model.
Optimize at the data layer. Structure pipelines that eliminate redundant processing, cache intelligently, and apply semantic operations selectively for dramatic reductions in token consumption while maintaining output quality.
For teams processing thousands or millions of records through LLMs, these optimizations compound to material cost savings and faster throughput. Start with deduplication and batching, add caching as pipelines mature, and use metrics to drive continuous optimization.
Learn more about building efficient AI pipelines at Typedef and explore Fenic open source for implementation details.

