How to Minimize Token Waste in Large-Scale LLM Batch Processing

Typedef Team

Processing large volumes of unstructured data with LLMs creates a critical challenge: token waste. Every redundant API call, duplicate embedding, or inefficient prompt inflates costs and slows throughput. For teams running batch operations at scale, these inefficiencies compound quickly.

This guide shows practical strategies for minimizing token waste using Fenic, a DataFrame framework built for AI workloads. You'll learn how to structure pipelines that eliminate redundant processing, optimize inference operations, and maintain predictable costs.

Token Waste Patterns in Batch Processing

Token waste occurs when pipelines process identical content multiple times, send redundant data to model APIs, or fail to batch operations efficiently. Common patterns:

  • Embedding duplicate text across different records
  • Re-summarizing identical content in separate pipeline runs
  • Processing entire documents when only specific sections changed
  • Making individual API calls instead of batching requests
  • Sending verbose context when concise fingerprints suffice

Cost impact scales linearly with data volume, making optimization essential for production workloads.
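
A quick back-of-envelope shows how fast waste compounds. The figures below are hypothetical; substitute your own volumes and provider pricing:

python
# Rough illustration of redundant-token cost (hypothetical numbers).
records = 5_000_000
duplicate_rate = 0.35            # share of records whose content duplicates another
avg_tokens = 800                 # tokens sent per record
price_per_million = 0.40         # dollars per 1M input tokens (example rate)

wasted_tokens = records * duplicate_rate * avg_tokens
wasted_dollars = wasted_tokens / 1_000_000 * price_per_million
print(f"{wasted_tokens:,.0f} redundant tokens ~ ${wasted_dollars:,.2f} per run")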

Deduplicate Before Processing

The most effective token reduction happens before any model call: deduplicate input data. When processing logs, support tickets, or user feedback, identical or near-identical content appears multiple times.

Fingerprinting for Stable Deduplication

Create stable fingerprints that capture semantic meaning while ignoring volatile details. From the log clustering guide:

python
# Extract stable components
df = df.with_column(
    "fingerprint",
    text.concat(
        col("service"),
        lit("|"),
        col("error_symbol"),
        lit("|"),
        col("normalized_message")
    )
)

# Deduplicate on fingerprint before embedding
unique_df = df.group_by("fingerprint").agg(
    first("raw_message").alias("representative"),
    count("*").alias("occurrence_count")
)

# Embed only unique fingerprints
embedded = unique_df.with_column(
    "embedding",
    semantic.embed(col("representative"))
)

This reduces embedding costs by processing each distinct issue once.
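
If downstream steps still need every original record, the shared embedding can be joined back by fingerprint so each duplicate reuses the single computed vector. A minimal sketch using the DataFrames above:

python
# Reattach the one embedding per fingerprint to every original record.
# The representative and occurrence_count columns come along and can be
# dropped downstream if they are not needed.
rehydrated = df.join(
    embedded,
    on="fingerprint",
    how="left"
)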

Token-Level Deduplication

Use Fenic's count_tokens function to identify high-token content:

python
# Count tokens before processing
df = df.with_column(
    "token_count",
    text.count_tokens(col("content"))
)

# Separate high-token items for specialized handling
standard_items = df.filter(col("token_count") < 1000)
long_items = df.filter(col("token_count") >= 1000)

# Process long items with chunking
chunked = long_items.with_column(
    "chunks",
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    )
)

Batch Operations Aggressively

Individual API calls waste tokens through repeated connection overhead and prevent rate limit optimization. Fenic automatically batches semantic operations.

Automatic Batching Configuration

Configure models with appropriate rate limits to enable intelligent batching:

python
config = SessionConfig(
    app_name="batch_processor",
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=500,
                tpm=200_000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=1000,
                tpm=1_000_000
            )
        }
    )
)

Fenic respects these limits while maximizing throughput through concurrent request batching and self-throttling, as detailed in the semantic operators documentation.
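
Later examples reference a live session object for cached tables and metrics, so the config above has to be turned into a session once. A minimal sketch, assuming Fenic's Session.get_or_create entry point (import paths can vary between Fenic versions):

python
# Create (or reuse) a session from the config above; subsequent examples
# use this `session` for table access and metrics queries.
from fenic import Session  # import path assumed; adjust for your Fenic version

session = Session.get_or_create(config)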

Group Operations by Cost Profile

Batch similar operations together:

python
# Batch all embeddings first
df_with_embeddings = df.with_column(
    "embedding",
    semantic.embed(col("text"))
)

# Then batch classifications
df_classified = df_with_embeddings.with_column(
    "category",
    semantic.classify(
        col("text"),
        ["technical", "billing", "feature_request"]
    )
)

# Finally, batch expensive summarizations only where needed
df_final = df_classified.with_column(
    "summary",
    when(
        col("category") == "technical",
        semantic.summarize(col("text"))
    ).otherwise(lit(None))
)

Cache Intermediate Results

Recomputing identical semantic operations across pipeline iterations burns tokens unnecessarily. Fenic provides explicit caching mechanisms.

DataFrame Persistence

Mark DataFrames for caching after expensive operations:

python
# Persist after expensive embedding generation
embedded_df = (df
    .with_column("embedding", semantic.embed(col("content")))
    .persist()
)

# Reuse cached embeddings in multiple downstream operations
clusters = embedded_df.semantic.with_cluster_labels(
    by="embedding",
    num_clusters=10
)

similar_pairs = embedded_df.semantic.sim_join(
    embedded_df,
    left_on=col("embedding"),
    right_on=col("embedding"),
    k=5
)

Table-Based Caching for Multi-Run Pipelines

For pipelines that run repeatedly on overlapping data:

python
# First run: compute and save
embeddings = df.with_column(
    "embedding",
    semantic.embed(col("content"))
)
embeddings.write.save_as_table(
    "content_embeddings",
    mode="overwrite"
)

# Subsequent runs: load cached embeddings
cached = session.table("content_embeddings")
new_content = new_df.join(
    cached,
    on="content_id",
    how="left"
)

# Only embed new items
needs_embedding = new_content.filter(col("embedding").is_null())
fresh_embeddings = needs_embedding.with_column(
    "embedding",
    semantic.embed(col("content"))
)
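
The cache table can then be refreshed so the next run benefits from this batch's work. A sketch, assuming the fresh rows share the cache table's schema (select matching columns first if they do not):

python
# Fold the newly computed embeddings into the cache table for the next run.
refreshed_cache = cached.union(fresh_embeddings)
refreshed_cache.write.save_as_table("content_embeddings", mode="overwrite")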

Optimize Prompt Length and Context

Long prompts consume tokens on every API call. Minimize prompt verbosity while maintaining semantic clarity.

Schema-Driven Extraction

Replace verbose prompt instructions with Pydantic schemas:

python
from typing import Literal
from pydantic import BaseModel, Field

class TicketExtraction(BaseModel):
    severity: Literal["low", "medium", "high", "critical"]
    affected_component: str
    user_impact: str

# Concise extraction without lengthy prompts
df = df.with_column(
    "extracted",
    semantic.extract(col("ticket_text"), TicketExtraction)
)

The schema serves as both specification and validation.
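
Field descriptions can carry the per-field guidance that would otherwise live in a long prompt preamble. A sketch of a more explicit variant of the same schema, assuming (as is typical for schema-driven extraction) that descriptions are passed through to the model:

python
from typing import Literal
from pydantic import BaseModel, Field

class TicketExtraction(BaseModel):
    # Per-field guidance lives in the schema instead of the prompt.
    severity: Literal["low", "medium", "high", "critical"] = Field(
        description="Operational severity implied by the ticket, not the customer's tone"
    )
    affected_component: str = Field(
        description="Single product component, e.g. 'billing-api' or 'mobile login'"
    )
    user_impact: str = Field(
        description="One sentence describing what the user cannot do"
    )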

Template Variables for Reusable Context

Use template variables instead of repeating static instructions:

python
# Efficient: template with variables
df.with_column(
    "summary",
    semantic.reduce(
        "Summarize issues from {{department}} in {{region}}",
        col("issue_text"),
        group_context={
            "department": col("department"),
            "region": col("region")
        }
    )
)

Use Semantic Operations Efficiently

Fenic's semantic operators minimize token usage while maintaining quality.

Classification Over Extraction

Use semantic.classify for categorical assignments:

python
# Efficient: direct classification
df.with_column(
    "priority",
    semantic.classify(
        col("ticket"),
        ["urgent", "normal", "low"]
    )
)

Classification operations use optimized prompts and constrained outputs.

Semantic Joins for Bulk Matching

semantic.join handles batching internally:

python
# Batch-match each candidate resume against job requirements in one semantic join
matches = candidates.semantic.join(
    other=requirements,
    predicate="Does this candidate ({{ left_on }}) meet the job requirements ({{ right_on }})?",
    left_on=col("resume"),
    right_on=col("job_description")
)

This pattern is demonstrated in the Hacker News research agent example.

Predicate Filtering for Selective Processing

Filter records semantically before expensive operations:

python
# Keep only the tickets that match the natural-language predicate
relevant = df.filter(
    semantic.predicate(
        "Is this about payment processing issues? {{ ticket_text }}",
        ticket_text=col("ticket_text")
    )
)

# Then apply expensive operations only to filtered subset
summarized = relevant.with_column(
    "detailed_summary",
    semantic.summarize(col("ticket_text"))
)

Strategic Chunking for Large Documents

Process portions of documents rather than entire files.

Content-Aware Chunking

Use recursive token chunking to maintain semantic boundaries:

python
# Chunk with context preservation
df = df.with_column(
    "chunks",
    text.recursive_token_chunk(
        col("document"),
        chunk_size=500,
        chunk_overlap_percentage=10
    )
)

# Process chunks individually
df = df.explode("chunks")
df = df.with_column(
    "chunk_classification",
    semantic.classify(col("chunks"), ["relevant", "irrelevant"])
)

# Filter to relevant chunks before expensive operations
relevant_chunks = df.filter(col("chunk_classification") == "relevant")

Structured Document Extraction

For markdown documents, extract specific sections:

python
# Convert to structured JSON
df = df.with_column(
    "structured",
    markdown.to_json(col("markdown_content"))
)

# Extract only specific sections with jq
df = df.with_column(
    "technical_sections",
    json.jq(
        col("structured"),
        '.. | objects | select(.type == "heading" and .level == 2 and ((.text // "") | contains("Technical")))'
    )
)

Monitor and Optimize Token Usage

Fenic tracks metrics for every operation.

Query Metrics Analysis

Access detailed token consumption:

python
# Execute query and capture metrics
result = df.with_column(
    "embedding",
    semantic.embed(col("text"))
).collect()

# Access metrics through session
metrics_df = session.table("fenic_system.query_metrics")

# Analyze token usage patterns
token_analysis = session.sql("""
    SELECT
        CAST(SUM(total_lm_cost) AS DOUBLE) as total_cost,
        CAST(SUM(total_lm_requests) AS DOUBLE) as total_requests,
        CAST(AVG(total_lm_cost / total_lm_requests) AS DOUBLE) as avg_cost_per_request
    FROM {df}
    WHERE session_id = '{session_id}'
""", df=metrics_df, session_id=session.session_id)

token_analysis.show()

Operator-Level Profiling

Identify which operations consume the most tokens:

python
# Get execution plan with detailed metrics
metrics = result.get_execution_plan_details()
print(metrics)

# Example output shows token usage per operator:
# ┌─ Project [execution: 245ms, rows: 1000]
# │  └─ LM: requests=10, tokens=15000, cost=$0.15
# └─ SemanticEmbed [execution: 1823ms, rows: 1000]
#    └─ RM: requests=5, tokens=50000, cost=$0.25

Rate Limiting as Token Conservation

Proper rate limit configuration prevents wasteful retry storms.

Provider-Specific Configuration

Configure separate limits for input and output tokens:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000
            )
        }
    )
)

This configuration is shown in the RudderStack case study.

Profile-Based Optimization

Use model profiles to control token budgets:

python
config = SemanticConfig(
    language_models={
        "reasoning": OpenAILanguageModel(
            model_name="o4-mini",
            rpm=500,
            tpm=1_000_000,
            profiles={
                "quick": OpenAILanguageModel.Profile(
                    reasoning_effort="low"
                ),
                "thorough": OpenAILanguageModel.Profile(
                    reasoning_effort="high"
                )
            },
            default_profile="quick"
        )
    }
)

# Use appropriate profile per task
df_simple = df.with_column(
    "category",
    semantic.classify(col("text"), classes, model_alias="reasoning")
)

df_complex = df.with_column(
    "analysis",
    semantic.map(
        "Analyze root cause: {{ error_log }}",
        error_log=col("error_log"),
        model_alias=ModelAlias(name="reasoning", profile="thorough")
    )
)

Combine Deterministic and Semantic Operations

Not every operation requires LLM inference.

Rule-Based Preprocessing

Filter obvious cases with rules before applying semantic operations:

python
# Deterministic severity tagging
df = df.with_column(
    "initial_severity",
    when(
        (col("level") == "ERROR") |
        col("message").contains("timeout") |
        col("message").contains("connection refused"),
        lit("error")
    ).when(
        col("message").contains("retry") |
        col("message").contains("degraded"),
        lit("warn")
    ).otherwise(lit("info"))
)

# Apply LLM only to ambiguous cases
ambiguous = df.filter(col("initial_severity") == "info")
classified = ambiguous.with_column(
    "semantic_severity",
    semantic.classify(
        col("message"),
        ["error", "warn", "info"]
    )
)

# Combine results
final = df.join(
    classified,
    on="id",
    how="left"
).with_column(
    "severity",
    coalesce(col("semantic_severity"), col("initial_severity"))
)

Text Functions for Simple Transformations

Use Fenic's built-in text functions for operations that don't require semantic processing:

python
# Deterministic cleanup: redact card-number patterns, then trim whitespace
df = df.with_column(
    "cleaned",
    text.trim(
        text.regexp_replace(col("raw_text"), r'\d{4}-\d{4}-\d{4}-\d{4}', '[REDACTED]')
    )
)

# Reserve semantic operations for intent classification
df = df.with_column(
    "intent",
    semantic.classify(col("cleaned"), ["request", "complaint", "question"])
)

Production Patterns

Incremental Processing Pipeline

Process only new or changed content:

python
# Load previous run results
previous = session.table("processed_content")

# Identify new items
new_items = current_batch.join(
    previous,
    on="content_hash",
    how="left_anti"
)

# Process only new items
processed_new = new_items.with_column(
    "embedding",
    semantic.embed(col("content"))
).with_column(
    "category",
    semantic.classify(col("content"), categories)
)

# Combine with previous results
updated = previous.union(processed_new)
updated.write.save_as_table("processed_content", mode="overwrite")
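
The join key matters here: content_hash should be a deterministic hash of the content itself so reruns recognize records they have already processed. A sketch that computes it before loading the batch (hypothetical rows; session.create_dataframe usage assumed):

python
import hashlib

# Hypothetical batch; in practice this comes from your ingestion source.
rows = [
    {"content_id": "t-1001", "content": "Payment failed with error 402"},
    {"content_id": "t-1002", "content": "Cannot reset password from mobile app"},
]

# Identical content always hashes to the same key, so the left_anti join
# above skips anything processed in a previous run.
for row in rows:
    row["content_hash"] = hashlib.sha256(row["content"].encode("utf-8")).hexdigest()

current_batch = session.create_dataframe(rows)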

Cost-Aware Model Selection

Route workload types to appropriate models:

python
config = SemanticConfig(
    language_models={
        "cheap": OpenAILanguageModel(
            model_name="gpt-4.1-nano",
            rpm=1000,
            tpm=500_000
        ),
        "quality": OpenAILanguageModel(
            model_name="gpt-4.1-pro",
            rpm=100,
            tpm=50_000
        )
    }
)

# Use cheap model for high-volume classification
df = df.with_column(
    "category",
    semantic.classify(
        col("text"),
        categories,
        model_alias="cheap"
    )
)

# Use quality model for critical analysis
critical = df.filter(col("category") == "high_value")
analyzed = critical.with_column(
    "detailed_analysis",
    semantic.extract(
        col("text"),
        AnalysisSchema,
        model_alias="quality"
    )
)
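
Routing pays off because only a small slice of traffic ever reaches the expensive model. A back-of-envelope with hypothetical volumes and per-item costs:

python
# Estimate blended cost for the two-tier routing above (hypothetical numbers).
items = 200_000
high_value_share = 0.08           # fraction routed to the quality model

cheap_cost_per_item = 0.00015     # classification on the small model
quality_cost_per_item = 0.0042    # structured extraction on the larger model

blended = (
    items * cheap_cost_per_item
    + items * high_value_share * quality_cost_per_item
)
print(f"estimated batch cost: ${blended:,.2f}")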

Measuring Success

Track these metrics to quantify token waste reduction:

Tokens per Processed Item

Monitor average token consumption per record:

python
metrics_df = session.table("fenic_system.query_metrics")
efficiency = session.sql("""
    SELECT
        execution_id,
        num_output_rows,
        total_lm_input_tokens,
        CAST(total_lm_input_tokens AS DOUBLE) / num_output_rows as tokens_per_item
    FROM {df}
""", df=metrics_df)

Additional Metrics

  • Cache Hit Rate: Track reuse versus recomputation frequency (see the sketch after this list)
  • Processing Time per Token: Lower ratios indicate better batching
  • Cost per Unit of Output: Total spend compared to processed volume
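
A quick sketch of how these ratios fall out of the raw counts (hypothetical numbers; pull real values from fenic_system.query_metrics):

python
# Hypothetical counts for one pipeline run.
total_items = 1_000_000          # records that entered the pipeline
fresh_model_calls = 120_000      # records that needed new LM or embedding work
total_lm_cost = 184.0            # dollars, from query metrics

cache_hit_rate = 1 - fresh_model_calls / total_items
cost_per_item = total_lm_cost / total_items

print(f"cache hit rate: {cache_hit_rate:.1%}")
print(f"cost per item:  ${cost_per_item:.6f}")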

Implementation Checklist

Apply these optimizations systematically:

  1. Deduplicate input data on content fingerprints before model operations
  2. Configure appropriate rate limits to enable automatic batching
  3. Cache intermediate results for reuse across pipeline runs
  4. Profile models for different task types to optimize token budgets
  5. Filter data with rules and semantic predicates before expensive operations
  6. Chunk large documents strategically to process relevant sections
  7. Monitor token usage through Fenic's metrics system
  8. Iterate based on measured per-item token consumption

Summary

Minimizing token waste requires treating LLM operations as first-class data operations. Fenic's DataFrame abstraction provides tools to deduplicate, batch, cache, and optimize semantic operations within a familiar programming model.

Optimize at the data layer. Structure pipelines that eliminate redundant processing, cache intelligently, and apply semantic operations selectively for dramatic reductions in token consumption while maintaining output quality.

For teams processing thousands or millions of records through LLMs, these optimizations compound to material cost savings and faster throughput. Start with deduplication and batching, add caching as pipelines mature, and use metrics to drive continuous optimization.

Learn more about building efficient AI pipelines at Typedef and explore Fenic open source for implementation details.
