Media platforms managing millions of assets need automated classification systems that maintain accuracy while processing content at scale. Transformer-based classifiers achieve 91% accuracy compared to 82% for traditional keyword methods—a 9 percentage point improvement that translates to thousands of additional correctly classified assets daily.
This technical guide shows how to build production classification systems using Typedef's Fenic framework, a PySpark-inspired DataFrame platform where LLM inference operates as a first-class query operation.
Traditional Tagging Approaches Break at Scale
Volume Exceeds Manual Processing Capacity
49% of CMS users spend over an hour publishing content, with 14% experiencing full-day delays. Manual metadata generation creates bottlenecks that compound across thousands of assets. Organizations processing 10,000+ monthly assets cannot sustain manual workflows.
Format Drift Degrades Rule-Based Systems
Content structures evolve—producers change templates, add metadata fields, switch formats. Regex patterns capturing 90% of content six months ago degrade to 60% as schemas drift. These failures remain silent until dashboards lose signal during critical incidents.
Keyword Matching Misses Context
Traditional systems identify "timeout" but miss sarcasm, regional references, and sentiment that humans recognize instantly. BERT-based models capture contextual relationships through bidirectional attention that keyword approaches cannot replicate.
The digital asset management market is projected to grow from $4.22 billion to $11.94 billion by 2030, driven by enterprises treating intelligent classification as infrastructure rather than optional tooling.
Production Classification Architecture
Inference-First Query Engine
Traditional platforms treat LLM calls as external black boxes that optimizers cannot inspect. Fenic's architecture embeds semantic operations directly into the query engine, enabling optimization of inference alongside CPU and memory operations.
The query optimizer sees semantic.extract() and semantic.classify() as first-class operations with specific characteristics: high latency, token costs, batching benefits, caching opportunities. This enables (see the sketch after this list):
- Operation reordering to minimize expensive inference
- Request batching across rows
- Aggressive caching for deterministic operations
- Intelligent parallelization across providers
- Accurate pre-execution cost estimation
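To make the reordering benefit concrete, here is a minimal sketch, assuming df is a Fenic DataFrame and fc is imported as in the setup section below. Because the pipeline is declarative, the engine can apply the cheap boolean filter before any row reaches the classifier, shrinking the inference workload:

```python
import fenic as fc

# Minimal sketch: the deterministic filter prunes rows before the LLM
# ever sees them, so inference runs over a much smaller set.
candidates = (
    df
    .filter(fc.col("word_count") > 100)  # cheap, row-pruning filter first
    .with_column(
        "category",
        fc.semantic.classify(fc.col("text"), ["news", "opinion"], model_alias="nano"),
    )
)
```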
Batch vs. Real-Time Processing
Production systems separate heavy inference from user-facing paths:
Batch preprocessing (offline):
- Extract structured metadata from raw assets
- Generate embeddings for semantic search
- Classify content into taxonomy categories
- Create summaries and descriptions
Real-time agents (online):
- Query enriched data without inference latency
- Provide predictable response times
- Enable user-facing features
RudderStack reduced triage time by 95% using this pattern—batch classification with fast read-only queries.
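A minimal sketch of this split, assuming illustrative S3 paths, a `raw_assets` DataFrame, and the ContentMetadata schema defined in the next section: the offline job pays the inference cost once, and the online path only reads pre-enriched rows:

```python
# Offline: enrich once, persist results to the lake
enriched = (
    raw_assets
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("transcript"), ContentMetadata, model_alias="mini"),
    )
    .unnest("metadata")
)
enriched.write.parquet("s3://media-lake/enriched/")

# Online: fast, read-only queries with no inference latency
serving = session.read.parquet("s3://media-lake/enriched/*.parquet")
education = serving.filter(fc.col("primary_category") == "education").collect()
```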
Build Classification Pipelines with Fenic
Configure Your Environment
Install Fenic (Python 3.10-3.12):
```bash
pip install fenic
```
Set provider keys:
```bash
export OPENAI_API_KEY="your-openai-api-key"
export ANTHROPIC_API_KEY="your-anthropic-api-key"
export GOOGLE_API_KEY="your-google-api-key"
```
Initialize session with model aliases:
```python
import fenic as fc

config = fc.SessionConfig(
    app_name="media_classification",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                "gpt-4o-nano", rpm=500, tpm=200_000
            ),
            "mini": fc.OpenAILanguageModel(
                "gpt-4o-mini", rpm=300, tpm=150_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                "gemini-2.0-flash-lite", rpm=300, tpm=150_000
            ),
        },
        default_language_model="flash",
    ),
)

session = fc.Session.get_or_create(config)
```
Model aliases abstract provider details—swap models without changing pipeline code. Rate limits (rpm/tpm) prevent throttling while Fenic batches requests automatically.
Schema-Driven Content Extraction
Transform unstructured content to structured data using Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class ContentMetadata(BaseModel):
    primary_category: Literal[
        "news", "entertainment", "sports",
        "education", "business", "technology"
    ]
    subcategories: List[str] = Field(
        description="Specific topics covered"
    )
    sentiment: Literal["positive", "neutral", "negative"]
    target_audience: Literal[
        "general", "professional", "academic", "youth"
    ]
    content_warnings: List[str] = Field(
        description="Sensitive content requiring warnings"
    )
    key_entities: List[str] = Field(
        description="People, organizations, locations mentioned"
    )

media_assets = (
    df
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("transcript"), ContentMetadata, model_alias="mini")
    )
    .unnest("metadata")
    .filter(fc.col("primary_category") == "technology")
    .filter(fc.col("sentiment") != "negative")
)
```
Schemas guide extraction with clear structure and constraints. Field descriptions provide context for accurate results. After extraction, filter and aggregate using standard DataFrame operations.
Fenic handles schema validation, error handling, and retries with row-level lineage for debugging.
Natural Language Content Filtering
Filter content with semantic predicates:
```python
filtered_content = df.filter(
    (fc.col("duration_seconds") > 300)
    & fc.semantic.predicate(
        """Does this content discuss climate policy
        or environmental regulation?
        Content: {{description}}""",
        description=fc.col("description")
    )
)
```
Combine boolean logic with semantic evaluation. The query engine optimizes both—cheap boolean filters run before expensive LLM predicates.
Dynamic prompts with Jinja templates:
```python
fc.semantic.predicate(
    """Does this content mention {{ topic }}?
    {% if priority == "high" %}
    Only true if prominently featured.
    {% endif %}
    Content: {{ content_text }}""",
    topic=fc.lit("artificial intelligence"),
    priority=fc.col("priority"),
    content_text=fc.col("transcript")
)
```
Templates evaluate per row, enabling filtering that adapts to record characteristics.
Multi-Label Classification
Media assets often belong to multiple categories. Multi-label systems require specialized handling as aggregate metrics mask per-label performance:
```python
class MultiLabelClassification(BaseModel):
    topics: List[Literal[
        "technology", "science", "health", "politics",
        "business", "culture", "environment", "education"
    ]] = Field(description="All relevant topic categories")
    content_types: List[Literal[
        "tutorial", "news", "opinion", "interview",
        "review", "analysis", "entertainment"
    ]] = Field(description="Content format classifications")
    maturity_rating: Literal["all_ages", "teen", "mature", "explicit"]
    accessibility_features: List[Literal[
        "closed_captions", "audio_description",
        "sign_language", "transcript_available"
    ]]

classified = (
    media_df
    .with_column(
        "classification",
        fc.semantic.extract(fc.col("content"), MultiLabelClassification, model_alias="mini")
    )
    .unnest("classification")
    .explode("topics")
)

tech_health = classified.filter(
    fc.col("topics").is_in(["technology", "health"])
)
```
Explode list fields to create rows per category, enabling precise filtering and aggregation across multi-label dimensions.
Optimize Performance and Cost
Strategic Model Selection
Model costs vary 10-100x between sizes. Strategic selection cuts expenses by 80% while maintaining quality:
```python
semantic_config = fc.SemanticConfig(
    language_models={
        "nano": fc.OpenAILanguageModel("gpt-4o-nano"),    # Simple classification
        "mini": fc.OpenAILanguageModel("gpt-4o-mini"),    # Structured extraction
        "flash": fc.GoogleVertexLanguageModel("gemini-2.0-flash-lite"),  # Bulk processing
        "opus": fc.AnthropicLanguageModel("claude-opus-4"),  # Advanced reasoning
    }
)

# Binary classification - use nano
fc.semantic.classify(fc.col("text"), ["safe", "unsafe"], model_alias="nano")

# Structured extraction - use mini
fc.semantic.extract(fc.col("content"), ContentSchema, model_alias="mini")

# Nuanced analysis - use opus only when required
fc.semantic.map(analysis_prompt, model_alias="opus")
```
DistilBERT maintains 90% accuracy vs. BERT's 91% while cutting compute costs 40-60%. Apply this principle: smaller models for appropriate tasks.
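As an illustrative aside outside the Fenic pipeline, a distilled encoder can be run locally for simple labeling; the checkpoint below is a public Hugging Face model:

```python
from transformers import pipeline

# Distilled model: near-BERT accuracy on simple labels at far less compute
sentiment = pipeline(
    "text-classification",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)
print(sentiment("The new codec halves our encoding time."))
# [{'label': 'POSITIVE', 'score': ...}]
```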
Automatic Batching and Rate Limiting
Fenic batches requests and self-throttles based on configured limits:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite",
                rpm=300,     # Requests per minute
                tpm=150_000  # Tokens per minute
            )
        }
    )
)
```
The engine tracks real-time token usage and adjusts rates to stay within provider constraints. Async I/O with concurrent batching maximizes throughput while preventing throttling.
Reduce costs through deduplication before embedding or classification:
```python
deduplicated = (
    df
    .drop_duplicates([fc.col("normalized_content")])
    .with_column("embedding", fc.semantic.embed(fc.col("content")))
)
```
Process unique content rather than raw volume—often 40-60% cost reduction for media libraries with republished content.
Intelligent Caching
Cache expensive operations for reuse:
```python
cached_classifications = (
    df
    .filter(fc.col("publish_date") >= "2024-01-01")
    .with_column("extracted", fc.semantic.extract(fc.col("content"), Schema))
    .persist()
)

# Multiple operations reuse cached classifications
tech_content = cached_classifications.filter(fc.col("extracted").category == "tech")
news_content = cached_classifications.filter(fc.col("extracted").category == "news")
```
Fenic also caches identical inference calls automatically within sessions—no redundant API calls for repeated prompts with identical inputs.
Production Reliability Features
Error Handling and Retry Logic
Fenic provides built-in retry logic for transient failures. For HTTP 429 (quota exhausted) responses, the system fails fast with a clear error rather than entering retry loops.
Token capacity guardrails validate context limits before submission:
```python
large_document_extraction = (
    df
    .with_column(
        "summary",
        fc.semantic.map(
            "Summarize this document: {{content}}",
            content=fc.col("full_text"),
            model_alias="mini"
        )
    )
)
# Raises clear error if documents exceed model token limits
```
This prevents silent failures where requests are truncated without notification.
Data Lineage for Debugging
Every column and row has traceable origins:
```python
pipeline = (
    media_df
    .filter(fc.col("duration") > 600)
    .with_column("extracted", fc.semantic.extract(fc.col("transcript"), ContentSchema))
    .unnest("extracted")
    .filter(fc.col("category") == "education")
)

lineage = pipeline.lineage()
result = pipeline.collect()
```
Row-level lineage enables debugging when classifications fail or produce unexpected results. Trace backwards to identify error origins.
Monitoring and Observability
Built-in metrics provide first-class observability:
```python
result = classification_pipeline.collect()
metrics = result.metrics

print(f"Total tokens: {metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
print(f"Execution time: {metrics.execution_time}s")

# Operator-level metrics show spending patterns
for op_metric in metrics.operator_metrics:
    if op_metric.cost > 5.0:
        print(f"Expensive operator: {op_metric.name}, Cost: ${op_metric.cost}")
```
Track model usage distribution:
pythonprint(f"Nano usage: {metrics.lm_metrics['nano'].total_tokens} tokens") print(f"Mini usage: {metrics.lm_metrics['mini'].total_tokens} tokens") print(f"Cost per asset: ${metrics.lm_metrics.total_cost / len(result)}")
Organizations implementing semantic layers report 54% infrastructure time reduction through automated optimization and observability.
Case Study: RudderStack Implementation
RudderStack processes support tickets, sales calls, and OSS threads across Event Stream, Transformations, and 200+ integrations. 90% of inputs are unstructured, creating triage backlogs.
Implementation Details
Ingest and normalize:
- Warehouse: support ticket threads, sales transcripts, usage data
- Documentation: product docs, Notion PRDs, strategy documents
Semantic context model:
- Infer product taxonomy from documentation
- Map tickets, issues, PRDs to taxonomy with citations
- Create semantic links between related content
Automated classification:
```python
# First perform semantic join to find related tickets
tickets_with_related = raw_tickets.semantic.join(
    other=historical_tickets,
    predicate="Are these tickets related?",
    left_on=fc.col("description"),
    right_on=fc.col("description")
)

# Then classify and add rationale
triage_pipeline = (
    tickets_with_related
    .with_column(
        "classification",
        fc.semantic.extract(fc.col("ticket_content"), TicketSchema)
    )
    .with_column(
        "priority_rationale",
        fc.semantic.map(
            """Provide prioritization rationale based on:
            - Customer tier: {{tier}}
            - Related ticket: {{related_description}}
            - Strategy alignment: {{strategy_match}}""",
            tier=fc.col("customer_tier"),
            related_description=fc.col("description"),
            strategy_match=fc.col("classification").strategy_alignment
        )
    )
)
```
Linear integration (the write-back step is sketched after this list):
- Agent subscribes to new feature requests
- Classifies to taxonomy, surfaces duplicates
- Proposes Prioritize/Monitor/Decline with rationale
- Writes back to Linear, sets "Under Review"
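A hypothetical sketch of that write-back step—`linear_client` is a stand-in wrapper (not a Fenic or Linear SDK API), and the row-access helper and field names are assumptions:

```python
results = triage_pipeline.collect()

for row in results.to_pylist():  # assumption: Arrow-style row access
    # Hypothetical client call: apply the proposed category and rationale,
    # then move the issue to "Under Review"
    linear_client.update_issue(
        issue_id=row["issue_id"],
        labels=[row["classification"]["category"]],
        comment=row["priority_rationale"],
        state="Under Review",
    )
```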
Results
- 95% reduction in PM time per triage
- 90%+ first-pass category acceptance
- Citations surfaced prospect and community signals directly in Linear
- Semantic links between issues, PRDs, and strategy documents improved accuracy
"It changed the way I think about feature requests… now we can double or triple the number of things that come through and still process it." — Eric Dodds, Head of Product
Advanced Classification Techniques
Multi-Modal Content Classification
Multi-modal approaches achieve 98.53% accuracy, 8-13 percentage points above text-only methods.
Process video with multiple modalities:
```python
from pydantic import BaseModel, Field
from typing import List

class VideoClassification(BaseModel):
    visual_themes: List[str] = Field(description="Themes from visual analysis")
    audio_topics: List[str] = Field(description="Topics from audio/speech")
    text_content: List[str] = Field(description="On-screen text and captions")
    overall_category: str
    confidence_score: float

video_classification = (
    videos
    .with_column(
        "visual",
        fc.semantic.extract(fc.col("frame_descriptions"), VisualSchema, model_alias="mini")
    )
    .with_column(
        "audio",
        fc.semantic.extract(fc.col("transcript"), AudioSchema, model_alias="mini")
    )
    .with_column(
        "combined",
        fc.semantic.map(
            """Combine visual and audio analysis:
            Visual: {{visual}}
            Audio: {{audio}}
            Provide a VideoClassification with overall category and confidence.""",
            visual=fc.col("visual"),
            audio=fc.col("audio"),
            model_alias="mini"
        )
    )
)
```
Semantic Clustering for Auto-Tagging
Generate tags automatically through similarity clustering:
```python
# Generate embeddings
embedded_content = (
    media_df
    .with_column("embedding", fc.semantic.embed(fc.col("description")))
)

# Cluster by semantic similarity
clustered = embedded_content.semantic.with_cluster_labels(
    by=fc.col("embedding"),
    num_clusters=20
)

# Generate representative tags per cluster
cluster_tags = (
    clustered
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Generate 3-5 descriptive tags for this cluster: {{descriptions}}",
            fc.col("description"),
            model_alias="mini"
        ).alias("cluster_tags")
    )
)

tagged_content = clustered.join(cluster_tags, on="cluster_label")
```
Organizations report 4x improvement in metrics consistency through semantic clustering that groups conceptually similar content even with different keywords.
Hybrid Rule-Based and Semantic Classification
Combine deterministic rules with semantic evaluation:
```python
def classify_content(df):
    # Fast rule-based classification for clear cases
    df = df.with_column(
        "rule_based_category",
        fc.when(fc.col("duration") < 60, "short_form")
        .when(fc.col("title").contains("tutorial"), "educational")
        .when(fc.col("source") == "news_feed", "news")
        .otherwise(None)
    )

    # Semantic classification for ambiguous cases only
    needs_semantic = df.filter(fc.col("rule_based_category").is_null())
    semantic_classified = needs_semantic.with_column(
        "semantic_category",
        fc.semantic.classify(
            fc.col("description"),
            classes=["educational", "entertainment", "news", "business"],
            model_alias="nano"
        )
    )

    # Rejoin semantic labels (assumes a unique asset_id key column)
    combined = df.join(
        semantic_classified.select(fc.col("asset_id"), fc.col("semantic_category")),
        on="asset_id",
        how="left"
    )

    # Combine results
    return combined.with_column(
        "final_category",
        fc.coalesce(fc.col("rule_based_category"), fc.col("semantic_category"))
    )
```
Cheap rule evaluation handles 60-70% of content, reserving expensive semantic operations for the ambiguous remainder.
Optimized SVM models achieve 87.6% accuracy—traditional approaches remain viable for well-defined tasks with clear boundaries.
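For comparison, a minimal scikit-learn baseline (toy data, illustrative labels) shows how little machinery a well-bounded task needs:

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.svm import LinearSVC

# Toy corpus: in practice, train on labeled assets from your taxonomy
texts = ["GPU prices drop after new chip launch", "Local team wins the championship"]
labels = ["technology", "sports"]

baseline = make_pipeline(TfidfVectorizer(ngram_range=(1, 2)), LinearSVC())
baseline.fit(texts, labels)
print(baseline.predict(["New graphics cards announced"]))
```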
Deployment Patterns
Lakehouse-Native Processing
Fenic is pure compute with no proprietary storage—read and write to existing data lakes:
```python
# Read from S3/data lake
raw_media = session.read.parquet("s3://media-lake/raw/*.parquet")

# Process with semantic operators
classified = (
    raw_media
    .with_column("metadata", fc.semantic.extract("content", Schema))
    .with_column("embedding", fc.semantic.embed("processed_content"))
    .filter(fc.col("quality_score") > 0.8)
)

# Write back to data lake
classified.write.parquet("s3://media-lake/classified/")
```
Full compatibility with Parquet, Iceberg, Delta Lake, Lance. Built on Apache Arrow for ecosystem interoperability—works with Spark, Polars, DuckDB, pandas.
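A short sketch of the hand-off to the wider ecosystem; the exact conversion helper is an assumption—check the Fenic API docs for the current name:

```python
# Assumption: a to_pandas()-style converter exists on Fenic DataFrames
pdf = classified.to_pandas()
print(pdf["quality_score"].describe())  # continue analysis in pandas
```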
Incremental Processing
Process only new content since last run:
```python
# Track last processed timestamp
last_run = (
    session.read.parquet("s3://checkpoints/last_run.parquet")
    .collect()[0]["timestamp"]
)

# Filter for new content
new_content = (
    session.read.parquet("s3://media-lake/raw/*.parquet")
    .filter(fc.col("created_at") > last_run)
)

# Classify new content
classified_new = new_content.with_column(
    "classification",
    fc.semantic.extract(fc.col("content"), Schema)
)

# Append to existing classifications
classified_new.write.parquet("s3://media-lake/classified/", mode="append")

# Update checkpoint
current_timestamp = fc.dt.current_timestamp()
session.create_dataframe([{"timestamp": current_timestamp}]).write.parquet(
    "s3://checkpoints/last_run.parquet", mode="overwrite"
)
```
PDF and Document Processing
Fenic 0.5.0 introduced native PDF parsing with page chunking and metadata extraction:
```python
# Extract PDF metadata
pdfs = session.read.pdf_metadata("s3://documents/**/*.pdf", recursive=True)

# Filter by metadata before expensive parsing
relevant_pdfs = pdfs.filter(
    (fc.col("page_count") > 5)
    & (fc.col("creation_date") >= "2024-01-01")
)

# Parse to markdown with page separators
parsed = relevant_pdfs.select(
    fc.col("file_path"),
    fc.semantic.parse_pdf(
        fc.col("file_path"),
        page_separator="--- PAGE {page} ---",
        describe_images=True
    ).alias("markdown")
)

# Extract structured metadata
classified_docs = parsed.with_column(
    "metadata",
    fc.semantic.extract(fc.col("markdown"), DocumentSchema, model_alias="mini")
)
```
Efficient document pipelines filter on metadata before expensive parsing and classification.
Implementation Checklist
Infrastructure setup:
- Install Fenic and configure provider keys
- Define model aliases for different classification tasks
- Set rate limits based on provider quotas
Schema design:
- Create Pydantic models with clear field descriptions
- Use Literal types to constrain outputs to valid categories
- Add Field descriptions for extraction context
Pipeline construction:
- Start with simple classification on small datasets
- Validate results and estimate costs using metrics
- Add caching for expensive operations
- Implement incremental processing for ongoing classification
Cost optimization:
- Use smaller models for simple tasks
- Deduplicate before expensive operations
- Batch requests automatically through Fenic
- Monitor per-operator costs and adjust
Production readiness:
- Implement error handling and retry logic
- Set up data lineage for debugging
- Configure observability and metrics tracking
- Test failure scenarios
Scale deployment:
- Process batch workloads offline
- Write results to data lake
- Expose through fast read-only queries
- Monitor performance and accuracy metrics
Organizations following this pattern report 45% reduction in time-to-insight and 90%+ classification accuracy at scale.
Summary
Production-grade content classification for media platforms requires infrastructure built for inference workloads:
Semantic operators as first-class operations enable automatic optimization impossible with traditional approaches. Declarative pipelines allow batch requests, intelligent caching, and global optimization.
Batch/real-time separation provides predictable performance. Heavy classification runs offline while real-time systems query enriched data without inference latency.
Strategic model selection reduces costs 80% while maintaining quality. Nano for simple classification, mini for structured extraction, reserve expensive models for advanced reasoning.
Schema-driven extraction eliminates brittle prompts through type-safe Pydantic models with automatic validation, error handling, and retry logic.
Built-in observability through metrics, row-level lineage, and cost tracking enables continuous optimization and debugging.
Results: 95% reduction in triage time, 90%+ first-pass accuracy, 45% reduction in time-to-insight.
Start with simple classification on small datasets, validate results and costs, then scale to production with infrastructure handling batching, optimization, error handling, and observability automatically.
Resources
- Fenic GitHub Repository
- Fenic Documentation
- Content Classification Statistics
- Building Reliable AI Pipelines with Fenic
- Intelligent Tagging System Trends
- RudderStack Case Study
- Semantic Processing Statistics
- Fenic 0.4.0 Release
- Fenic 0.5.0 Release

