AI agents need structured, reliable access to your data infrastructure to function in production. While agent frameworks handle orchestration and reasoning, the real bottleneck lies in connecting them to existing data warehouses, databases, and processing pipelines. Most teams end up building fragile middleware that breaks under production load.
This guide shows you how to integrate agentic pipelines with your existing data stack using Typedef's Fenic framework, which treats inference as a first-class operation within DataFrame semantics. You'll learn how to build type-safe data layers, expose them through Model Context Protocol (MCP) servers, and connect them to agent frameworks like LangGraph, LangChain, Mastra, and PydanticAI.
The Integration Challenge
Production AI agents face three critical integration problems when connecting to existing data infrastructure:
Data Format Mismatches
Your data warehouse stores structured SQL tables, your document store contains unstructured markdown and PDFs, and your vector database holds embeddings. Agents need a unified interface that handles all these formats without custom parsing logic for each source.
Context Window Limitations
Agents can't load entire datasets into their context windows. They need pre-processed, filtered data delivered on-demand through parameterized queries. Traditional data tools weren't designed for this access pattern.
Infrastructure Brittleness
Teams typically build custom microservices to bridge agents and data systems. This creates maintenance overhead, introduces new failure modes, and scatters business logic across multiple codebases. Industry data shows only about 5% of AI pilots deliver measurable business impact, largely due to these infrastructure challenges.
Architecture Patterns for Integration
Production-ready agentic systems require three distinct layers working together:
Data Layer
Transforms raw data into agent-ready formats. This includes extraction from unstructured sources, semantic enrichment, schema validation, and batch preprocessing. Fenic handles this through its DataFrame API with native semantic operators.
Tool Layer
Exposes structured operations agents can invoke. Tools need type safety, parameter validation, runtime error handling, and consistent interfaces across all data sources.
Orchestration Layer
Manages agent state, coordinates tool calls between reasoning steps, implements routing logic, and handles errors gracefully. Frameworks like LangGraph, LangChain, Mastra, and PydanticAI handle this layer.
The key architectural insight: separate data preparation from agent runtime. Process heavy transformations in batch pipelines, then expose clean queryable interfaces through MCP servers.
Setting Up Your Data Layer
Start by configuring a Fenic session with your model providers and data sources:
```python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import SemanticConfig, OpenAILanguageModel
from pathlib import Path

config = SessionConfig(
    app_name="agent_data_layer",
    db_path=Path("./data/catalog.db"),  # Persistent catalog
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=50000
            )
        },
        default_language_model="fast"
    )
)

session = Session.get_or_create(config)
```
Loading Data from Multiple Sources
Fenic provides connectors for common data sources:
```python
import fenic.api.functions as fc

# Data warehouse tables
warehouse_df = session.read.csv("s3://warehouse/customers/*.csv")

# Document repositories
docs_df = session.read.docs(
    "./documentation/**/*.md",
    content_type="markdown",
    recursive=True
)

# HuggingFace datasets
ml_data = session.read.parquet("hf://datasets/company/training/*.parquet")

# Local files with batch loading
support_tickets = session.read.docs(
    "/data/support_tickets/",
    content_type="markdown",
    recursive=True
)
```
Processing Unstructured Data
Apply semantic operations to structure your data before agents access it:
```python
from pydantic import BaseModel, Field
from typing import List, Literal

class TicketInfo(BaseModel):
    category: Literal["billing", "technical", "account"]
    priority: Literal["low", "medium", "high", "critical"]
    customer_id: str = Field(description="Customer identifier")
    entities: List[str] = Field(description="Named entities mentioned")
    sentiment: str = Field(description="Customer sentiment")

# Extract structured data from unstructured tickets
processed_tickets = support_tickets.select(
    fc.col("file_path"),
    fc.col("content"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=TicketInfo
    ).alias("ticket_info")
)

# Filter and classify
filtered_tickets = processed_tickets.filter(
    fc.col("ticket_info.priority").isin(["high", "critical"])
).select(
    fc.col("ticket_info.*"),
    fc.semantic.classify(
        fc.col("ticket_info.category"),
        ["account_access", "payment_issue", "bug_report"],
        model_alias="fast"
    ).alias("subcategory")
)

# Persist for agent queries
filtered_tickets.write.save_as_table("support_tickets", mode="overwrite")
```
The semantic extraction capabilities transform unstructured documents into queryable tables with type-safe schemas.
Creating Parameterized Tools
Once data is processed, create tools agents can call with parameters:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Define parameterized query
tickets_table = session.table("support_tickets")

search_query = tickets_table.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("priority") == fc.tool_param("priority", StringType))
).limit(fc.tool_param("limit", IntegerType))

# Register as catalog tool
session.catalog.create_tool(
    tool_name="search_tickets",
    tool_description="Search support tickets by category and priority",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="category",
            description="Ticket category",
            allowed_values=["billing", "technical", "account"]
        ),
        ToolParam(
            name="priority",
            description="Priority level",
            allowed_values=["low", "medium", "high", "critical"]
        ),
        ToolParam(
            name="limit",
            description="Maximum results",
            default_value=10,
            has_default=True
        )
    ],
    result_limit=50
)
```
The declarative tool creation approach eliminates boilerplate while ensuring type safety and parameter validation.
System Tools for Common Operations
Fenic auto-generates standard tools for any table:
```python
from fenic.api.mcp.tools import SystemToolConfig

# Generate schema, profile, read, search, and analyze tools
system_tools = SystemToolConfig(
    table_names=["support_tickets", "customers"],
    tool_namespace="data",
    max_result_rows=100
)
```
This creates five tools per table:
- data_schema: Returns column names and types
- data_profile: Provides statistical summaries
- data_read: Paginated, filtered access
- data_search_summary: Regex search across columns
- data_analyze: Raw SQL execution
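Once these are registered with an MCP server (next section), agents invoke them like any other tool. As a quick sanity check during development, you can also call one over HTTP yourself. The sketch below mirrors the endpoint convention used in the LangGraph examples later in this guide; the `table_name` and `limit` parameter names are assumptions to verify against your server's tool schema:

```python
import requests

# Hypothetical smoke test for the auto-generated data_read tool;
# parameter names are assumptions -- check the tool schema your server exposes.
response = requests.post(
    "http://localhost:8000/mcp/tools/data_read",
    json={"table_name": "support_tickets", "limit": 5},
    timeout=30
)
response.raise_for_status()
print(response.json())
```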
Deploying MCP Servers
MCP servers bridge your data layer and agent frameworks. Fenic provides multiple deployment options.
Development Server
For local development and testing:
```python
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="AgentDataServer",
    user_defined_tools=tools,
    system_tools=system_tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production ASGI Deployment
For production environments with horizontal scaling:
```python
from fenic.api.mcp.server import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with: uvicorn server:app --workers 4 --port 8000
```
CLI Quick Start
For rapid prototyping:
```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_tickets analyze_metrics

# Use stdio for direct integration
fenic-serve --transport stdio
```
Integration Method: LangGraph
LangGraph excels at managing agent state and orchestrating multi-step workflows. Connect it to your Fenic tools through HTTP requests to the MCP server.
Setting Up Agent State
```python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    search_results: dict
    analysis: dict
```
Wrapping MCP Tools
```python
from langchain_core.tools import tool
import requests

@tool
def search_tickets(category: str, priority: str, limit: int = 10) -> dict:
    """Search support tickets by category and priority."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/search_tickets",
        json={
            "category": category,
            "priority": priority,
            "limit": limit
        },
        timeout=30
    )
    response.raise_for_status()
    return response.json()

@tool
def analyze_metrics(table_name: str, metric: str) -> dict:
    """Calculate metrics from data tables."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/data_analyze",
        json={
            "query": f"SELECT {metric}, COUNT(*) FROM {table_name} GROUP BY {metric}"
        },
        timeout=30
    )
    response.raise_for_status()
    return response.json()
```
Building the Agent Graph
```python
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
tools = [search_tickets, analyze_metrics]
llm_with_tools = llm.bind_tools(tools)

def call_model(state: AgentState):
    """Invoke LLM with current state."""
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def execute_tools(state: AgentState):
    """Execute tool calls from LLM."""
    last_message = state["messages"][-1]
    tool_outputs = []
    for tool_call in last_message.tool_calls:
        if tool_call["name"] == "search_tickets":
            result = search_tickets.invoke(tool_call["args"])
        elif tool_call["name"] == "analyze_metrics":
            result = analyze_metrics.invoke(tool_call["args"])
        tool_outputs.append({
            "tool_call_id": tool_call["id"],
            "output": result
        })
    return {"messages": tool_outputs}

def should_continue(state: AgentState):
    """Determine if more tool calls are needed."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "continue"
    return "end"

# Build graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"continue": "tools", "end": END}
)
workflow.add_edge("tools", "agent")

app = workflow.compile()
```
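With the graph compiled, a run is a standard LangGraph invocation; a minimal sketch with an illustrative prompt:

```python
from langchain_core.messages import HumanMessage

# Loops between the agent and tools nodes until should_continue routes to END
final_state = app.invoke({
    "messages": [HumanMessage(content="Find critical billing tickets and summarize them.")]
})
print(final_state["messages"][-1].content)
```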
Integration Method: LangChain
LangChain agents benefit from structured data preprocessing. The integration pattern mirrors LangGraph but focuses on single-step tool execution; a complete agent wiring sketch appears at the end of this section.
Advanced Semantic Pipeline
Build sophisticated preprocessing pipelines:
```python
# Load customer feedback
feedback_df = session.read.docs(
    "./feedback/**/*.md",
    content_type="markdown",
    recursive=True
)

# Extract with embeddings
class FeedbackAnalysis(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"]
    main_topic: str
    action_items: List[str]
    urgency: Literal["low", "medium", "high"]

enriched_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=FeedbackAnalysis
    ).alias("analysis"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

# Semantic clustering
clustered = enriched_feedback.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="topic_cluster"
)

clustered.write.save_as_table("feedback_analysis", mode="overwrite")
```
Creating Hybrid Search Tools
Combine semantic similarity with structured filters:
```python
feedback_table = session.table("feedback_analysis")

hybrid_search = feedback_table.filter(
    fc.col("analysis.sentiment") == fc.tool_param("sentiment", StringType)
).with_column(
    "similarity",
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_embedding", fc.col("embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity") > 0.7
).order_by(fc.col("similarity").desc())

session.catalog.create_tool(
    tool_name="search_feedback",
    tool_description="Search customer feedback by sentiment and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="sentiment", description="Filter by sentiment"),
        ToolParam(name="query_embedding", description="Query embedding vector")
    ],
    result_limit=20
)
```
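To close the loop on the LangChain side, wrap the MCP-backed tool the same way as in the LangGraph section and hand it to a tool-calling agent. The sketch below makes two assumptions worth flagging: the HTTP endpoint shape follows the convention used earlier in this guide, and the embedding model must match whatever fc.semantic.embed used in the pipeline above:

```python
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
import requests

# Assumption: must match the model used by fc.semantic.embed in the pipeline
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

@tool
def search_feedback(sentiment: str, query: str) -> dict:
    """Search customer feedback by sentiment and semantic similarity to a text query."""
    vector = embeddings.embed_query(query)  # agent passes text; we embed it here
    response = requests.post(
        "http://localhost:8000/mcp/tools/search_feedback",  # assumed endpoint shape
        json={"sentiment": sentiment, "query_embedding": vector},
        timeout=30
    )
    response.raise_for_status()
    return response.json()

prompt = ChatPromptTemplate.from_messages([
    ("system", "You answer questions about customer feedback."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}")
])

agent = create_tool_calling_agent(ChatOpenAI(model="gpt-4o"), [search_feedback], prompt)
executor = AgentExecutor(agent=agent, tools=[search_feedback])

result = executor.invoke({"input": "What are negative reviews saying about onboarding?"})
print(result["output"])
```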
Integration Method: Mastra
Mastra provides TypeScript-based workflow orchestration. The integration uses Fenic's persistent catalog for durable memory.
Setting Up Persistent Memory
```python
from datetime import datetime

from fenic.core.types import StringType, TimestampType
from fenic.core.types.schema import Schema, ColumnField

# Create memory tables
conversation_schema = Schema([
    ColumnField("conversation_id", StringType),
    ColumnField("timestamp", TimestampType),
    ColumnField("role", StringType),
    ColumnField("content", StringType),
    ColumnField("metadata", StringType)
])

session.catalog.create_database("agent_memory")
session.catalog.set_current_database("agent_memory")

session.catalog.create_table(
    "conversations",
    conversation_schema,
    description="Agent conversation history with timestamps"
)
```
Memory Storage Functions
```python
def store_conversation_turn(conversation_id, role, content, metadata=None):
    """Store a conversation turn in the persistent catalog."""
    df = session.create_dataframe({
        "conversation_id": [conversation_id],
        "timestamp": [datetime.now()],
        "role": [role],
        "content": [content],
        "metadata": [metadata or "{}"]
    })
    df.write.save_as_table("conversations", mode="append")
```
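Recording a turn is then a single call per message (values illustrative):

```python
# Append one user/assistant exchange to the conversations table
store_conversation_turn("conv-123", "user", "Where can I find my invoices?")
store_conversation_turn("conv-123", "assistant", "I found three invoices on your account.")
```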
Memory Retrieval Tools
```python
# Tool to retrieve recent conversations
recent_conversations = (
    session.table("conversations")
    .filter(
        fc.col("conversation_id") == fc.tool_param("conversation_id", StringType)
    )
    .sort(fc.col("timestamp").desc())
    .limit(fc.tool_param("limit", IntegerType))
)

session.catalog.create_tool(
    tool_name="get_recent_conversations",
    tool_description="Retrieve recent conversation history",
    tool_query=recent_conversations,
    tool_params=[
        ToolParam(name="conversation_id", description="Conversation ID"),
        ToolParam(name="limit", description="Max messages", default_value=10, has_default=True)
    ],
    result_limit=100
)
```
Mastra Agent Configuration
```tsx
import { Agent } from '@mastra/core/agent';
import { createTool } from '@mastra/core/tools';
import { z } from 'zod';
import { openai } from '@ai-sdk/openai';

const retrieveMemoryTool = createTool({
  id: 'retrieve-memory',
  description: 'Retrieves conversation history from persistent memory',
  inputSchema: z.object({
    conversation_id: z.string(),
    limit: z.number().default(10)
  }),
  execute: async ({ context }) => {
    const response = await fetch('http://127.0.0.1:8000/mcp', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        tool: 'get_recent_conversations',
        params: {
          conversation_id: context.conversation_id,
          limit: context.limit
        }
      })
    });
    const data = await response.json();
    return data.results;
  }
});

const memoryAgent = new Agent({
  name: 'Memory Agent',
  instructions: 'You have access to persistent memory across sessions.',
  model: openai('gpt-4-turbo'),
  tools: { retrieveMemory: retrieveMemoryTool }
});
```
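Invoking the agent then looks like any other Mastra call; a minimal sketch with an illustrative prompt:

```tsx
// Ask a question that requires pulling prior context from the Fenic catalog
const result = await memoryAgent.generate(
  'Summarize what we discussed in conversation conv-123.'
);
console.log(result.text);
```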
Integration Method: PydanticAI
PydanticAI emphasizes type-safe tool calling. The integration leverages Fenic's declarative tool definitions.
Type-Safe Tool Creation
```python
# Define tool with complex parameter types
from fenic.core.types import ArrayType, FloatType

user_analysis = session.table("users").filter(
    fc.col("user_id").is_in(fc.tool_param("user_ids", ArrayType(IntegerType)))
).group_by("segment").agg(
    fc.count("*").alias("user_count"),
    fc.avg("lifetime_value").alias("avg_ltv")
)

session.catalog.create_tool(
    tool_name="analyze_user_segments",
    tool_description="Analyze user segments for given user IDs",
    tool_query=user_analysis,
    tool_params=[
        ToolParam(
            name="user_ids",
            description="List of user IDs to analyze"
        )
    ],
    result_limit=100
)
```
Pydantic Model Extraction
```python
class UserProfile(BaseModel):
    name: str = Field(description="Full name")
    role: str = Field(description="Job title")
    skills: List[str] = Field(description="Technical skills")
    experience_years: int = Field(ge=0, le=50)

users_df = session.read.csv("users.csv")

profiles = users_df.select(
    fc.col("user_id"),
    fc.semantic.extract(
        fc.col("bio_text"),
        response_format=UserProfile
    ).alias("profile")
)

profiles.write.save_as_table("user_profiles", mode="overwrite")
```
PydanticAI Configuration
```python
import asyncio

from pydantic_ai import Agent

agent = Agent(
    "openai:gpt-4",
    system_prompt="You are a user data assistant.",
    mcp_servers={
        "fenic_tools": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "http"
        }
    }
)

async def main():
    # Agent automatically discovers and validates tools
    result = await agent.run(
        "Analyze the user segments for IDs [1001, 1002, 1003]"
    )
    print(result)

asyncio.run(main())
```
Production Considerations
Error Handling
Implement robust error handling at every layer:
```python
def safe_tool_execution(state):
    """Execute tools with comprehensive error handling."""
    tool_outputs = []
    for tool_call in state["messages"][-1].tool_calls:
        try:
            result = requests.post(
                f"http://localhost:8000/mcp/tools/{tool_call['name']}",
                json=tool_call["args"],
                timeout=30
            )
            result.raise_for_status()
            tool_outputs.append({
                "tool_call_id": tool_call["id"],
                "output": result.json()
            })
        except requests.exceptions.Timeout:
            tool_outputs.append({
                "tool_call_id": tool_call["id"],
                "output": {"error": "Tool execution timeout"}
            })
        except requests.exceptions.RequestException as e:
            tool_outputs.append({
                "tool_call_id": tool_call["id"],
                "output": {"error": f"Request failed: {str(e)}"}
            })
    return {"messages": tool_outputs}
```
Async Operations for Throughput
Use async UDFs for concurrent external API calls:
```python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, IntegerType, StringType

@async_udf(
    return_type=StructType([
        StructField("status", IntegerType),
        StructField("data", StringType)
    ]),
    max_concurrency=20,
    timeout_seconds=10,
    num_retries=3
)
async def enrich_from_api(record_id: str) -> dict:
    """Call external API with automatic retries and timeout."""
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example.com/records/{record_id}"
        ) as resp:
            return {
                "status": resp.status,
                "data": await resp.text()
            }

# Apply to DataFrame
enriched_df = df.select(
    fc.col("*"),
    enrich_from_api(fc.col("record_id")).alias("api_data")
)
```
The async UDF capabilities maintain DataFrame semantics while maximizing I/O throughput.
Monitoring and Cost Tracking
Track performance and costs through built-in metrics:
```python
# Query execution metrics
metrics_df = session.table("fenic_system.query_metrics")

# Analyze tool usage
tool_usage = metrics_df.select(
    fc.col("query_text"),
    fc.col("total_lm_cost"),
    fc.col("latency_ms"),
    fc.col("total_lm_requests")
).order_by(fc.col("total_lm_cost").desc())

tool_usage.show(20)

# Aggregate by time window
daily_costs = session.sql("""
    SELECT
        DATE(end_ts) as date,
        SUM(total_lm_cost) as total_cost,
        COUNT(*) as query_count,
        AVG(latency_ms) as avg_latency
    FROM {metrics}
    WHERE query_text LIKE '%search_tickets%'
    GROUP BY DATE(end_ts)
    ORDER BY date DESC
""", metrics=metrics_df)

daily_costs.show()
```
Rate Limit Management
Configure per-model rate limits in session setup:
```python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,     # Requests per minute
                tpm=500000   # Tokens per minute
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            ),
            "reasoning": OpenAILanguageModel(
                model_name="o4-mini",
                rpm=100,
                tpm=100000,
                profiles={
                    "fast": OpenAILanguageModel.Profile(
                        reasoning_effort="low"
                    ),
                    "thorough": OpenAILanguageModel.Profile(
                        reasoning_effort="high"
                    )
                },
                default_profile="fast"
            )
        }
    )
)
```
Best Practices
Data Preparation Strategy
Preprocess once, query many times
Run expensive semantic operations in batch pipelines. Agents should query clean, structured tables, not raw documents:
```python
# Bad: Agent calls semantic operations on each query
def agent_tool(query):
    raw_docs = session.read.docs("./docs/")
    results = raw_docs.select(
        fc.semantic.extract(fc.col("content"), Schema)  # Slow!
    ).filter(fc.col("extracted.topic") == query)
    return results

# Good: Preprocess once, query many times
processed_docs = (
    session.read.docs("./docs/")
    .select(fc.semantic.extract(fc.col("content"), Schema))
    .persist()  # Cache expensive operation
)
processed_docs.write.save_as_table("processed_docs", mode="overwrite")

def agent_tool(query):
    # Fast filtered query on preprocessed data
    return session.table("processed_docs").filter(
        fc.col("topic") == query
    )
```
Use appropriate model tiers
Configure different models for different task complexities:
```python
# Cheap model for simple classification
df.select(
    fc.semantic.classify(
        fc.col("category"),
        ["bug", "feature", "question"],
        model_alias="fast"
    )
)

# Accurate model for complex extraction
df.select(
    fc.semantic.extract(
        fc.col("complex_document"),
        ComplexSchema,
        model_alias="accurate"
    )
)
```
Tool Design Principles
Keep tools focused
Each tool should perform one specific task. Compose complex operations in the orchestration layer:
```python
# Bad: One tool doing too much
session.catalog.create_tool(
    tool_name="everything",
    tool_description="Search, analyze, and summarize all data",
    # ... complex multi-step query
)

# Good: Focused tools composed by the agent
session.catalog.create_tool(
    tool_name="search_data",
    tool_description="Search records by criteria",
    # ... simple filtered query
)

session.catalog.create_tool(
    tool_name="analyze_results",
    tool_description="Calculate statistics on search results",
    # ... aggregation query
)
```
Document thoroughly
AI agents rely on tool descriptions to choose appropriate tools:
```python
session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets from 2023-2025. Includes ticket ID, "
    "category, priority, customer info, resolution status, and timestamps. "
    "Use for analyzing support patterns and customer issues."
)
```
Validate inputs
Use ToolParam with allowed_values to prevent invalid inputs:
```python
ToolParam(
    name="priority",
    description="Priority level for filtering",
    allowed_values=["low", "medium", "high", "critical"]
)
```
Orchestration Patterns
Implement circuit breakers
Track consecutive failures and stop calling broken tools:
```python
def should_retry_tool(state):
    """Circuit breaker for tool failures."""
    consecutive_failures = state.get("consecutive_failures", 0)
    if consecutive_failures >= 3:
        return "fallback"
    return "retry"

workflow.add_conditional_edges(
    "tool_error",
    should_retry_tool,
    {"retry": "tools", "fallback": "human_review"}
)
```
Separate batch and real-time
Use Fenic for heavy batch processing. Let agents query preprocessed results:
```python
# Batch: Run daily to process new data
def daily_preprocessing_job():
    new_data = session.read.csv("s3://bucket/daily/*.csv")
    processed = new_data.select(
        fc.col("*"),
        fc.semantic.extract(fc.col("text"), Schema),
        fc.semantic.embed(fc.col("text"))
    )
    processed.write.save_as_table("daily_data", mode="append")

# Real-time: Agents query preprocessed data instantly
def agent_query(filters):
    return session.table("daily_data").filter(filters).limit(10)
```
Use stateless HTTP for scale
Enable horizontal scaling of MCP servers:
```python
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,  # No session state
    port=8000
)
```
Production Deployment
Local development, cloud execution
The same code runs locally and in production:
```python
# Local development
config = SessionConfig(app_name="dev_pipeline")
session = Session.get_or_create(config)

df = session.read.csv("./local_data.csv")
results = df.select(fc.semantic.extract(fc.col("text"), Schema))
results.write.parquet("./results.parquet")
```
```python
# Production - same code, cloud execution
from fenic.api.session.config import CloudConfig, CloudExecutorSize

config = SessionConfig(
    app_name="prod_pipeline",
    cloud=CloudConfig(
        size=CloudExecutorSize.LARGE  # Scale up
    )
)
session = Session.get_or_create(config)

df = session.read.csv("s3://bucket/data/*.csv")
results = df.select(fc.semantic.extract(fc.col("text"), Schema))
results.write.parquet("s3://bucket/results/")
```
Real-World Impact
Teams using this integration approach report significant improvements:
95% Reduction in Triage Time
RudderStack integrated Typedef with their Linear workflow, achieving one-pass feature request triage with semantic classification and automatic citation. PMs now review five high-value items each morning instead of spending hours on manual categorization.
100x Time Savings
Enterprise analytics teams transformed OLAP warehouses into dynamic product-signal engines. Product managers query diverse datasets with LLM categorizations in minutes instead of weeks of manual processing.
Days Not Months
Insurance companies deploy semantic extraction pipelines across thousands of policies and transcripts in days, eliminating errors from human analysis and significantly reducing operational risk.
Conclusion
Integrating agentic pipelines with existing data stacks requires separating three concerns: data preparation, tool definition, and orchestration. By preprocessing data with semantic operations, exposing type-safe tools through MCP servers, and connecting to agent frameworks, you build production-grade systems that scale.
The key architectural decisions:
- Use Fenic's DataFrame API for batch semantic preprocessing
- Create declarative, catalog-backed tools with parameter validation
- Deploy MCP servers as the bridge between data and agents
- Choose the right integration pattern for your orchestration framework
- Implement error handling, monitoring, and rate limiting for production reliability
Whether you're using LangGraph, LangChain, Mastra, or PydanticAI, the pattern remains consistent: build reliable data pipelines that agents can trust.
Ready to start building? Install Fenic and explore the open-source framework:
```bash
pip install fenic
```
For production-scale deployments, Typedef Cloud provides serverless execution, advanced mixed AI workflows, and enterprise features that eliminate fragile glue code from your infrastructure.

