How to Enable Content Classification and Tagging at Scale for Media Platforms

Typedef Team

Media platforms managing millions of assets need automated classification systems that process content at a scale manual review cannot match. Transformer-based classifiers achieve 91% accuracy compared to 82% for traditional keyword methods, a 9 percentage point improvement that translates to thousands of additional correctly classified assets daily.

This technical guide shows how to build production classification systems using Typedef's Fenic framework, a PySpark-inspired DataFrame platform where LLM inference operates as a first-class query operation.

Traditional Tagging Approaches Break at Scale

Volume Exceeds Manual Processing Capacity

49% of CMS users spend over an hour publishing content, with 14% experiencing full-day delays. Manual metadata generation creates bottlenecks that compound across thousands of assets. Organizations processing 10,000+ monthly assets cannot sustain manual workflows.

Format Drift Degrades Rule-Based Systems

Content structures evolve—producers change templates, add metadata fields, switch formats. Regex patterns capturing 90% of content six months ago degrade to 60% as schemas drift. These failures remain silent until dashboards lose signal during critical incidents.

Keyword Matching Misses Context

Traditional systems identify "timeout" but miss sarcasm, regional references, and sentiment that humans recognize instantly. BERT-based models capture contextual relationships through bidirectional attention that keyword approaches cannot replicate.

The digital asset management market is projected to grow from $4.22 billion to $11.94 billion by 2030, driven by enterprises treating intelligent classification as infrastructure rather than optional tooling.

Production Classification Architecture

Inference-First Query Engine

Traditional platforms treat LLM calls as external black boxes that optimizers cannot inspect. Fenic's architecture embeds semantic operations directly into the query engine, enabling optimization of inference alongside CPU and memory operations.

The query optimizer sees semantic.extract() and semantic.classify() as first-class operations with specific characteristics: high latency, token costs, batching benefits, caching opportunities. This enables:

  • Operation reordering to minimize expensive inference
  • Request batching across rows
  • Aggressive caching for deterministic operations
  • Intelligent parallelization across providers
  • Accurate pre-execution cost estimation
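
For example, the engine can evaluate cheap metadata filters before an expensive semantic predicate even when the code lists them afterward. A minimal sketch of such a plan (the DataFrame and column names are illustrative):

python
# Cheap metadata filters and an expensive LLM predicate in one plan.
# Because the optimizer sees all three as query operations, it can run
# the inexpensive filters first so fewer rows ever reach the model.
candidates = (
    df
    .filter(fc.semantic.predicate(          # expensive: one LLM call per row
        "Is this asset about {{ topic }}? Content: {{ text }}",
        topic=fc.lit("renewable energy"),
        text=fc.col("transcript")
    ))
    .filter(fc.col("language") == "en")     # cheap: no inference
    .filter(fc.col("word_count") > 200)     # cheap: no inference
)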

Batch vs. Real-Time Processing

Production systems separate heavy inference from user-facing paths:

Batch preprocessing (offline):

  • Extract structured metadata from raw assets
  • Generate embeddings for semantic search
  • Classify content into taxonomy categories
  • Create summaries and descriptions

Real-time agents (online):

  • Query enriched data without inference latency
  • Provide predictable response times
  • Enable user-facing features
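
A minimal sketch of this split, assuming a Parquet-backed lake, illustrative paths, and a Pydantic schema like the ContentMetadata model shown later in this guide:

python
# Offline: run heavy inference once and persist enriched assets
enriched = (
    session.read.parquet("s3://media-lake/raw/*.parquet")
    .with_column("metadata", fc.semantic.extract(fc.col("transcript"), ContentMetadata))
    .with_column("embedding", fc.semantic.embed(fc.col("transcript")))
)
enriched.write.parquet("s3://media-lake/enriched/")

# Online: user-facing queries read precomputed columns with no inference latency
recent = (
    session.read.parquet("s3://media-lake/enriched/*.parquet")
    .filter(fc.col("publish_date") >= "2024-01-01")
    .collect()
)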

RudderStack reduced triage time by 95% using this pattern—batch classification with fast read-only queries.

Build Classification Pipelines with Fenic

Configure Your Environment

Install Fenic (Python 3.10-3.12):

bash
pip install fenic

Set provider keys:

bash
export OPENAI_API_KEY="your-openai-api-key"
export ANTHROPIC_API_KEY="your-anthropic-api-key"
export GOOGLE_API_KEY="your-google-api-key"

Initialize session with model aliases:

python
import fenic as fc

config = fc.SessionConfig(
    app_name="media_classification",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                "gpt-4o-nano",
                rpm=500,
                tpm=200_000
            ),
            "mini": fc.OpenAILanguageModel(
                "gpt-4o-mini",
                rpm=300,
                tpm=150_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                "gemini-2.0-flash-lite",
                rpm=300,
                tpm=150_000
            ),
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)

Model aliases abstract provider details—swap models without changing pipeline code. Rate limits (rpm/tpm) prevent throttling while Fenic batches requests automatically.
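
For instance, repointing an alias is a config-only change; call sites keep referencing the alias. A sketch reusing only the constructs from the config above:

python
# Repoint the "flash" alias at a different provider; pipeline code is untouched
config = fc.SessionConfig(
    app_name="media_classification",
    semantic=fc.SemanticConfig(
        language_models={
            "flash": fc.OpenAILanguageModel("gpt-4o-mini", rpm=300, tpm=150_000),
        },
        default_language_model="flash",
    ),
)

# Call sites reference the alias, not the underlying model
fc.semantic.classify(fc.col("title"), ["news", "entertainment"], model_alias="flash")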

Schema-Driven Content Extraction

Transform unstructured content to structured data using Pydantic schemas:

python
from pydantic import BaseModel, Field
from typing import List, Literal

class ContentMetadata(BaseModel):
    primary_category: Literal[
        "news", "entertainment", "sports",
        "education", "business", "technology"
    ]
    subcategories: List[str] = Field(
        description="Specific topics covered"
    )
    sentiment: Literal["positive", "neutral", "negative"]
    target_audience: Literal[
        "general", "professional", "academic", "youth"
    ]
    content_warnings: List[str] = Field(
        description="Sensitive content requiring warnings"
    )
    key_entities: List[str] = Field(
        description="People, organizations, locations mentioned"
    )

media_assets = (
    df
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("transcript"), ContentMetadata, model_alias="mini")
    )
    .unnest("metadata")
    .filter(fc.col("primary_category") == "technology")
    .filter(fc.col("sentiment") != "negative")
)

Schemas guide extraction with clear structure and constraints. Field descriptions provide context for accurate results. After extraction, filter and aggregate using standard DataFrame operations.

Fenic handles schema validation, error handling, and retries with row-level lineage for debugging.

Natural Language Content Filtering

Filter content with semantic predicates:

python
filtered_content = df.filter(
    (fc.col("duration_seconds") > 300) &
    fc.semantic.predicate(
        """Does this content discuss climate policy
        or environmental regulation?

        Content: {{description}}""",
        description=fc.col("description")
    )
)

Combine boolean logic with semantic evaluation. The query engine optimizes both—cheap boolean filters run first before expensive LLM predicates.

Dynamic prompts with Jinja templates:

python
fc.semantic.predicate(
    """Does this content mention {{ topic }}?
    {% if priority == "high" %}
    Only true if prominently featured.
    {% endif %}

    Content: {{ content_text }}""",
    topic=fc.lit("artificial intelligence"),
    priority=fc.col("priority"),
    content_text=fc.col("transcript")
)

Templates evaluate per row, enabling filtering that adapts to record characteristics.

Multi-Label Classification

Media assets often belong to multiple categories. Multi-label systems require specialized handling because aggregate metrics can mask per-label performance:

python
class MultiLabelClassification(BaseModel):
    topics: List[Literal[
        "technology", "science", "health", "politics",
        "business", "culture", "environment", "education"
    ]] = Field(description="All relevant topic categories")

    content_types: List[Literal[
        "tutorial", "news", "opinion", "interview",
        "review", "analysis", "entertainment"
    ]] = Field(description="Content format classifications")

    maturity_rating: Literal["all_ages", "teen", "mature", "explicit"]

    accessibility_features: List[Literal[
        "closed_captions", "audio_description",
        "sign_language", "transcript_available"
    ]]

classified = (
    media_df
    .with_column(
        "classification",
        fc.semantic.extract(fc.col("content"), MultiLabelClassification, model_alias="mini")
    )
    .unnest("classification")
    .explode("topics")
)

tech_health = classified.filter(
    fc.col("topics").is_in(["technology", "health"])
)

Explode list fields to create rows per category, enabling precise filtering and aggregation across multi-label dimensions.
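
Per-label metrics then reduce to ordinary group-bys over the exploded frame. A sketch, assuming PySpark-style count and sort helpers:

python
# Count assets per topic; each asset contributes one row per assigned topic
topic_counts = (
    classified
    .group_by("topics")
    .agg(fc.count("topics").alias("asset_count"))
    .sort(fc.col("asset_count").desc())
)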

Optimize Performance and Cost

Strategic Model Selection

Model costs vary 10-100x between sizes. Strategic selection cuts expenses by 80% while maintaining quality:

python
semantic_config = fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel("gpt-4o-nano"),      # Simple classification
        "mini": fc.OpenAILanguageModel("gpt-4o-mini"),      # Structured extraction
        "flash": fc.GoogleVertexLanguageModel("gemini-2.0-flash-lite"),  # Bulk processing
        "opus": fc.AnthropicLanguageModel("claude-opus-4")  # Advanced reasoning
    }
)

# Binary classification - use nano
fc.semantic.classify(fc.col("text"), ["safe", "unsafe"], model_alias="nano")

# Structured extraction - use mini
fc.semantic.extract(fc.col("content"), ContentSchema, model_alias="mini")

# Nuanced analysis - use opus only when required
fc.semantic.map(analysis_prompt, model_alias="opus")

DistilBERT maintains 90% accuracy vs. BERT's 91% while cutting compute costs 40-60%. The same principle applies here: use the smallest model adequate for each task.

Automatic Batching and Rate Limiting

Fenic batches requests and self-throttles based on configured limits:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=300,  # Requests per minute
                tpm=150_000  # Tokens per minute
            )
        }
    )
)

The engine tracks real-time token usage and adjusts rates to stay within provider constraints. Async I/O with concurrent batching maximizes throughput while preventing throttling.

Reduce costs through deduplication before embedding or classification:

python
deduplicated = (
    df
    .drop_duplicates(["normalized_content"])
    .with_column("embedding", fc.semantic.embed(fc.col("content")))
)

Process unique content rather than raw volume—often 40-60% cost reduction for media libraries with republished content.
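
Because deduplication drops rows, one way to share each computed embedding across all duplicates is to join the results back on the normalized key (a sketch; column names are illustrative):

python
# Propagate embeddings computed on unique content back to every original row,
# so republished duplicates reuse one embedding instead of new model calls
full_with_embeddings = df.join(
    deduplicated.select("normalized_content", "embedding"),
    on="normalized_content",
    how="left"
)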

Intelligent Caching

Cache expensive operations for reuse:

python
cached_classifications = (
    df
    .filter(fc.col("publish_date") >= "2024-01-01")
    .with_column("extracted", fc.semantic.extract(fc.col("content"), Schema))
    .persist()
)

# Multiple operations reuse cached classifications
tech_content = cached_classifications.filter(fc.col("extracted").category == "tech")
news_content = cached_classifications.filter(fc.col("extracted").category == "news")

Fenic also caches identical inference calls automatically within sessions—no redundant API calls for repeated prompts with identical inputs.

Production Reliability Features

Error Handling and Retry Logic

Fenic provides built-in retry logic for transient failures. For HTTP 429 responses that indicate exhausted quota, the system fails fast with a clear error rather than entering a retry loop.

Token capacity guardrails validate context limits before submission:

python
large_document_extraction = (
    df
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize this document: {{content}}",
            content=fc.col("full_text"),
            model_alias="mini"
        )
    )
)
# Raises clear error if documents exceed model token limits

This prevents silent failures where requests truncate without notification.

Data Lineage for Debugging

Every column and row has traceable origins:

python
pipeline = (
    media_df
    .filter(fc.col("duration") > 600)
    .with_column("extracted", fc.semantic.extract(fc.col("transcript"), ContentSchema))
    .unnest("extracted")
    .filter(fc.col("category") == "education")
)

lineage = pipeline.lineage()
result = pipeline.collect()

Row-level lineage enables debugging when classifications fail or produce unexpected results. Trace backwards to identify error origins.

Monitoring and Observability

Built-in metrics provide first-class observability:

python
result = classification_pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Execution time: {metrics.execution_time}s")

# Operator-level metrics show spending patterns
for op_metric in metrics.operator_metrics:
    if op_metric.cost > 5.0:
        print(f"Expensive operator: {op_metric.name}, Cost: ${op_metric.cost}")

Track model usage distribution:

python
print(f"Nano usage: {metrics.lm_metrics['nano'].total_tokens} tokens")
print(f"Mini usage: {metrics.lm_metrics['mini'].total_tokens} tokens")
print(f"Cost per asset: ${metrics.lm_metrics.total_cost / len(result)}")

Organizations implementing semantic layers report 54% infrastructure time reduction through automated optimization and observability.

Case Study: RudderStack Implementation

RudderStack processes support tickets, sales calls, and OSS threads across Event Stream, Transformations, and 200+ integrations; 90% of inputs are unstructured, creating triage backlogs.

Implementation Details

Ingest and normalize:

  • Warehouse: support ticket threads, sales transcripts, usage data
  • Documentation: product docs, Notion PRDs, strategy documents

Semantic context model:

  • Infer product taxonomy from documentation
  • Map tickets, issues, PRDs to taxonomy with citations
  • Create semantic links between related content

Automated classification:

python
# First perform a semantic join to find related tickets
tickets_with_related = raw_tickets.semantic.join(
    other=historical_tickets,
    predicate="""Are these two tickets related?
    Ticket 1: {{ left_on }}
    Ticket 2: {{ right_on }}""",
    left_on=fc.col("description"),
    right_on=fc.col("description")
)

# Then classify and add rationale
triage_pipeline = (
    tickets_with_related
    .with_column(
        "classification",
        fc.semantic.extract(fc.col("ticket_content"), TicketSchema)
    )
    .with_column(
        "priority_rationale",
        fc.semantic.map(
            """Provide prioritization rationale based on:
            - Customer tier: {{tier}}
            - Related ticket: {{related_description}}
            - Strategy alignment: {{strategy_match}}""",
            tier=fc.col("customer_tier"),
            related_description=fc.col("description"),
            strategy_match=fc.col("classification").strategy_alignment
        )
    )
)

Linear integration:

  • Agent subscribes to new feature requests
  • Classifies to taxonomy, surfaces duplicates
  • Proposes Prioritize/Monitor/Decline with rationale
  • Writes back to Linear, sets "Under Review"

Results

  • 95% reduction in PM time per triage
  • 90%+ first-pass category acceptance
  • Citations enabled: prospect and community signals in Linear
  • Semantic links improved accuracy: Issues linked to PRDs and strategy

"It changed the way I think about feature requests… now we can double or triple the number of things that come through and still process it." — Eric Dodds, Head of Product

Advanced Classification Techniques

Multi-Modal Content Classification

Multi-modal approaches achieve 98.53% accuracy, 8-13 percentage points above text-only methods.

Process video with multiple modalities:

python
from pydantic import BaseModel, Field
from typing import List

class VideoClassification(BaseModel):
    visual_themes: List[str] = Field(description="Themes from visual analysis")
    audio_topics: List[str] = Field(description="Topics from audio/speech")
    text_content: List[str] = Field(description="On-screen text and captions")
    overall_category: str
    confidence_score: float

# VisualSchema and AudioSchema are assumed to be Pydantic models
# analogous to VideoClassification above (definitions not shown)
video_classification = (
    videos
    .with_column(
        "visual",
        fc.semantic.extract(fc.col("frame_descriptions"), VisualSchema, model_alias="mini")
    )
    .with_column(
        "audio",
        fc.semantic.extract(fc.col("transcript"), AudioSchema, model_alias="mini")
    )
    .with_column(
        "combined",
        fc.semantic.map(
            """Combine visual and audio analysis:
            Visual: {{visual}}
            Audio: {{audio}}
            
            Provide a VideoClassification with overall category and confidence.""",
            visual=fc.col("visual"),
            audio=fc.col("audio"),
            model_alias="mini"
        )
    )
)

Semantic Clustering for Auto-Tagging

Generate tags automatically through similarity clustering:

python
# Generate embeddings
embedded_content = (
    media_df
    .with_column("embedding", fc.semantic.embed(fc.col("description")))
)

# Cluster by semantic similarity
clustered = embedded_content.semantic.with_cluster_labels(
    by=fc.col("embedding"),
    num_clusters=20
)

# Generate representative tags per cluster
cluster_tags = (
    clustered
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Generate 3-5 descriptive tags for this cluster of content descriptions",
            fc.col("description"),
            model_alias="mini"
        ).alias("cluster_tags")
    )
)

tagged_content = clustered.join(cluster_tags, on="cluster_label")

Organizations report 4x improvement in metrics consistency through semantic clustering that groups conceptually similar content even with different keywords.

Hybrid Rule-Based and Semantic Classification

Combine deterministic rules with semantic evaluation:

python
def classify_content(df):
    # Fast rule-based classification for clear cases
    df = df.with_column(
        "rule_based_category",
        fc.when(fc.col("duration") < 60, fc.lit("short_form"))
        .when(fc.col("title").contains("tutorial"), fc.lit("educational"))
        .when(fc.col("source") == "news_feed", fc.lit("news"))
        .otherwise(fc.lit(None))
    )

    # Rule-based rows are final as-is
    rule_classified = (
        df.filter(fc.col("rule_based_category").is_not_null())
        .with_column("final_category", fc.col("rule_based_category"))
    )

    # Semantic classification for ambiguous cases only
    semantic_classified = (
        df.filter(fc.col("rule_based_category").is_null())
        .with_column(
            "final_category",
            fc.semantic.classify(
                fc.col("description"),
                classes=["educational", "entertainment", "news", "business"],
                model_alias="nano"
            )
        )
    )

    # Combine both paths into one result
    return rule_classified.union(semantic_classified)

Use cheap rule evaluation for the 60-70% of content with clear signals; reserve expensive semantic operations for ambiguous cases.

Optimized SVM models achieve 87.6% accuracy—traditional approaches remain viable for well-defined tasks with clear boundaries.

Deployment Patterns

Lakehouse-Native Processing

Fenic is pure compute with no proprietary storage—read and write to existing data lakes:

python
# Read from S3/data lake
raw_media = session.read.parquet("s3://media-lake/raw/*.parquet")

# Process with semantic operators
classified = (
    raw_media
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .with_column("embedding", fc.semantic.embed("processed_content"))
    .filter(fc.col("quality_score") > 0.8)
)

# Write back to data lake
classified.write.parquet("s3://media-lake/classified/")

Full compatibility with Parquet, Iceberg, Delta Lake, Lance. Built on Apache Arrow for ecosystem interoperability—works with Spark, Polars, DuckDB, pandas.
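
Because results are Arrow-backed, they can move into those engines without a serialization step. A sketch, assuming fenic's DataFrame conversion helpers:

python
# Hand enriched results to other Arrow-native tools
# (to_arrow/to_pandas/to_polars assumed per fenic's DataFrame API)
arrow_table = classified.to_arrow()   # Arrow table for interoperability
pandas_df = classified.to_pandas()    # ad-hoc analysis in pandas
polars_df = classified.to_polars()    # downstream Polars pipelines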

Incremental Processing

Process only new content since last run:

python
# Track last processed timestamp
last_run = session.read.parquet("s3://checkpoints/last_run.parquet").collect()[0]["timestamp"]

# Filter for new content
new_content = (
    session.read.parquet("s3://media-lake/raw/*.parquet")
    .filter(fc.col("created_at") > last_run)
)

# Classify new content
classified_new = new_content.with_column(
    "classification",
    fc.semantic.extract(fc.col("content"), Schema)
)

# Append to existing classifications
classified_new.write.parquet("s3://media-lake/classified/", mode="append")

# Update checkpoint
from datetime import datetime, timezone

current_timestamp = datetime.now(timezone.utc)
session.create_dataframe([{"timestamp": current_timestamp}]).write.parquet(
    "s3://checkpoints/last_run.parquet", mode="overwrite"
)

PDF and Document Processing

Fenic 0.5.0 introduced native PDF parsing with page chunking and metadata extraction:

python
# Extract PDF metadata
pdfs = session.read.pdf_metadata("s3://documents/**/*.pdf", recursive=True)

# Filter by metadata before expensive parsing
relevant_pdfs = pdfs.filter(
    (fc.col("page_count") > 5) &
    (fc.col("creation_date") >= "2024-01-01")
)

# Parse to markdown with page separators
parsed = relevant_pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("markdown")
)

# Extract structured metadata
classified_docs = parsed.with_column(
    "metadata",
    fc.semantic.extract(fc.col("markdown"), DocumentSchema, model_alias="mini")
)

Efficient document pipelines filter on metadata before expensive parsing and classification.

Implementation Checklist

Infrastructure setup:

  • Install Fenic and configure provider keys
  • Define model aliases for different classification tasks
  • Set rate limits based on provider quotas

Schema design:

  • Create Pydantic models with clear field descriptions
  • Use Literal types to constrain outputs to valid categories
  • Add Field descriptions for extraction context

Pipeline construction:

  • Start with simple classification on small datasets
  • Validate results and estimate costs using metrics
  • Add caching for expensive operations
  • Implement incremental processing for ongoing classification

Cost optimization:

  • Use smaller models for simple tasks
  • Deduplicate before expensive operations
  • Batch requests automatically through Fenic
  • Monitor per-operator costs and adjust

Production readiness:

  • Implement error handling and retry logic
  • Set up data lineage for debugging
  • Configure observability and metrics tracking
  • Test failure scenarios

Scale deployment:

  • Process batch workloads offline
  • Write results to data lake
  • Expose through fast read-only queries
  • Monitor performance and accuracy metrics

Organizations following this pattern report 45% reduction in time-to-insight and 90%+ classification accuracy at scale.

Summary

Production-grade content classification for media platforms requires infrastructure built for inference workloads:

Semantic operators as first-class operations enable automatic optimization impossible with traditional approaches. Declarative pipelines unlock request batching, intelligent caching, and global optimization.

Batch/real-time separation provides predictable performance. Heavy classification runs offline while real-time systems query enriched data without inference latency.

Strategic model selection reduces costs 80% while maintaining quality. Use nano for simple classification and mini for structured extraction; reserve expensive models for advanced reasoning.

Schema-driven extraction eliminates brittle prompts through type-safe Pydantic models with automatic validation, error handling, and retry logic.

Built-in observability through metrics, row-level lineage, and cost tracking enables continuous optimization and debugging.

Results: 95% reduction in triage time, 90%+ first-pass accuracy, 45% reduction in time-to-insight.

Start with simple classification on small datasets, validate results and costs, then scale to production with infrastructure handling batching, optimization, error handling, and observability automatically.
