Building production-ready AI agents requires more than just connecting an LLM to some tools. You need robust data pipelines, reliable tool execution, proper error handling, and stateful orchestration. This guide shows you how to combine Typedef's Fenic framework with LangGraph to build agents that actually work in production.
Agent Architecture Overview
Modern AI agents consist of three critical layers:
Data Layer: Transforms raw data into agent-ready formats. This includes extraction, enrichment, semantic operations, and preparing context for decision-making.
Tool Layer: Exposes structured operations that agents can invoke. Tools need type safety, parameter validation, and consistent execution patterns.
Orchestration Layer: Manages agent state, coordinates tool calls, handles errors, and implements routing logic between different processing steps.
Fenic handles the first two layers through its DataFrame API and MCP server capabilities. LangGraph manages the third layer with its state graph architecture. Together, they create a reliable foundation for agent systems.
Why Fenic for Agent Data and Tools
The DataFrame Advantage
Fenic brings structure to probabilistic systems. While LLM outputs are stochastic, DataFrame operations remain deterministic and traceable. Every transformation has clear lineage, making agent behavior auditable and debuggable.
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel
import fenic.api.functions as fc

# Configure session with rate limits
config = SessionConfig(
    app_name="agent_pipeline",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load and process documents
df = session.read.docs("/data/support_tickets/", content_type="markdown", recursive=True)

# Extract structured information
from pydantic import BaseModel, Field
from typing import List

class TicketInfo(BaseModel):
    category: str = Field(description="Support ticket category")
    priority: str = Field(description="Priority level: low, medium, high, critical")
    entities: List[str] = Field(description="Named entities mentioned")

df = df.select(
    fc.col("file_name"),
    fc.semantic.extract(fc.col("content"), TicketInfo).alias("ticket_info")
)
```
This preprocessing happens once, producing clean structured data that agents can query repeatedly without reprocessing.
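To make that concrete, here is a minimal sketch of the pattern: persist the extracted structure once, then have agents hit the saved table at runtime. The table name is illustrative; `save_as_table` is covered in more detail in the tool section below.

```python
# Persist the extraction results once...
df.write.save_as_table("ticket_info_extracted", mode="overwrite")

# ...then agents read the clean table at runtime, with no re-extraction
tickets = session.table("ticket_info_extracted")
critical = tickets.filter(fc.col("ticket_info.priority") == "critical")
critical.show()
```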
Semantic Operations at Scale
Fenic's semantic functions integrate LLM operations directly into data pipelines with built-in rate limiting and retry logic.
```python
# Classify tickets with automatic retries
df = df.select(
    fc.col("ticket_info"),
    fc.semantic.classify(
        fc.col("ticket_info.category"),
        ["Account Access", "Billing Issue", "Technical Problem"],
        model_alias="gpt4"
    ).alias("classification")
)

# Generate summaries with structured output
df = df.select(
    fc.col("*"),
    fc.semantic.map(
        "Summarize this support ticket in one sentence: {{ content }}",
        content=fc.col("content"),
        model_alias="gpt4"
    ).alias("summary")
)
```
Rate limits defined in the session config prevent API quota exhaustion. Fenic automatically manages request pacing across your entire pipeline.
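One practical consequence: you can register several model aliases with independent limits and route work by cost. A sketch reusing the config style above; the alias names, models, and limits are illustrative.

```python
# Two aliases, each paced against its own rpm/tpm budget
config = SessionConfig(
    app_name="agent_pipeline",
    semantic=SemanticConfig(
        language_models={
            "cheap": OpenAILanguageModel(model_name="gpt-4.1-nano", rpm=500, tpm=500000),
            "gpt4": OpenAILanguageModel(model_name="gpt-4.1", rpm=100, tpm=100000),
        }
    )
)

# Route high-volume classification to the cheap alias
df = df.select(
    fc.semantic.classify(
        fc.col("content"),
        ["Account Access", "Billing Issue", "Technical Problem"],
        model_alias="cheap"
    ).alias("coarse_category")
)
```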
Building Declarative Agent Tools
Fenic's catalog-backed tool system eliminates boilerplate while ensuring type safety. Define tools as DataFrame queries, not manual function wrappers.
Creating Catalog Tools
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Save processed tickets as a table
df.write.save_as_table("support_tickets", mode="overwrite")

session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets with extracted metadata and classifications"
)

# Define a search tool
search_df = session.table("support_tickets").filter(
    fc.col("classification") == fc.tool_param("category", StringType)
).limit(fc.tool_param("limit", IntegerType))

session.catalog.create_tool(
    tool_name="search_tickets",
    tool_description="Search support tickets by category",
    tool_query=search_df,
    tool_params=[
        ToolParam(
            name="category",
            description="Ticket category to filter by",
            allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=10
        )
    ],
    result_limit=50
)

# Define an analysis tool
support_tickets_table = session.table("support_tickets")

analysis_df = session.sql("""
    SELECT
        classification,
        COUNT(*) as ticket_count,
        AVG(LENGTH(content)) as avg_length
    FROM {support_tickets}
    WHERE classification = tool_param('category')
    GROUP BY classification
""", support_tickets=support_tickets_table)

session.catalog.create_tool(
    tool_name="analyze_ticket_metrics",
    tool_description="Get statistics for tickets in a specific category",
    tool_query=analysis_df,
    tool_params=[
        ToolParam(
            name="category",
            description="Category to analyze"
        )
    ]
)
```
Tools defined this way are versionable metadata. Schema changes flow through the catalog automatically.
System Tools for Data Access
Fenic can auto-generate common tools for any table in your catalog:
```python
from fenic.api.mcp.tools import SystemToolConfig

# Generate read, profile, schema, search, and analyze tools
system_tools = SystemToolConfig(
    table_names=["support_tickets"],
    tool_namespace="support",
    max_result_rows=100
)
```
This creates five tools instantly (a sketch of passing them to the MCP server follows the list):

- `support_schema`: Returns column names and types
- `support_profile`: Provides column statistics
- `support_read`: Pages through filtered data
- `support_search_summary`: Regex search across text columns
- `support_analyze`: Execute SQL queries
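The config on its own does nothing until the server knows about it. A minimal sketch of handing it to `create_mcp_server` (introduced fully in the next section); note that the `system_tools` parameter name is an assumption here, so verify it against the Fenic API reference.

```python
from fenic.api.mcp import create_mcp_server

# Assumption: create_mcp_server accepts the SystemToolConfig via a
# `system_tools` argument alongside catalog-defined tools.
server = create_mcp_server(
    session,
    "SupportTicketServer",
    user_defined_tools=session.catalog.list_tools(),
    system_tools=system_tools
)
```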
Running the MCP Server
Expose your tools through Fenic's MCP server implementation. Multiple deployment options support different architectures.
Development Mode
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "SupportTicketServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1"
)
```
Production ASGI Deployment
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

tools = session.catalog.list_tools()

server = create_mcp_server(session, "SupportTicketServer", user_defined_tools=tools)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with: uvicorn server:app --workers 4 --port 8000
```
CLI Quick Start
For rapid prototyping, use the `fenic-serve` command:

```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_tickets analyze_ticket_metrics

# Use stdio for direct integration
fenic-serve --transport stdio
```
Integrating with LangGraph
LangGraph excels at managing agent state and orchestrating multi-step workflows. Fenic tools become function calls within LangGraph's state machine.
Setting Up the Agent State
```python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    ticket_data: dict
    analysis_results: dict
    next_action: str
```
Connecting Fenic Tools to LangGraph
```python
from langchain_core.tools import tool
import requests

# Wrap MCP server endpoints as LangGraph tools
@tool
def search_tickets(category: str, limit: int = 10) -> dict:
    """Search support tickets by category."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/search_tickets",
        json={"category": category, "limit": limit}
    )
    return response.json()

@tool
def analyze_ticket_metrics(category: str) -> dict:
    """Get statistics for tickets in a category."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/analyze_ticket_metrics",
        json={"category": category}
    )
    return response.json()

# Bind tools to your LLM
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
tools = [search_tickets, analyze_ticket_metrics]
llm_with_tools = llm.bind_tools(tools)
```
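Hand-rolled HTTP wrappers like these assume the server exposes a plain endpoint per tool. If you prefer to speak the MCP protocol itself, the `langchain-mcp-adapters` package can discover and wrap the tools automatically. A sketch, assuming its documented `MultiServerMCPClient` interface:

```python
# Sketch: auto-discover Fenic's MCP tools instead of wrapping endpoints by hand.
# Requires: pip install langchain-mcp-adapters
from langchain_mcp_adapters.client import MultiServerMCPClient

async def load_fenic_tools():
    client = MultiServerMCPClient({
        "support": {
            "url": "http://localhost:8000/mcp",
            "transport": "streamable_http",
        }
    })
    # Returns LangChain-compatible tools with schemas pulled from the server
    return await client.get_tools()
```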
Building the Agent Graph
```python
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, ToolMessage
import json

def call_model(state: AgentState):
    """Invoke the LLM with current state."""
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

def execute_tools(state: AgentState):
    """Execute tool calls from the LLM."""
    last_message = state["messages"][-1]
    tool_outputs = []
    for tool_call in last_message.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        # Execute the appropriate tool
        if tool_name == "search_tickets":
            result = search_tickets.invoke(tool_args)
        elif tool_name == "analyze_ticket_metrics":
            result = analyze_ticket_metrics.invoke(tool_args)
        else:
            result = {"error": f"Unknown tool: {tool_name}"}

        # Results must flow back as ToolMessage objects so the LLM can read them
        tool_outputs.append(
            ToolMessage(content=json.dumps(result), tool_call_id=tool_call["id"])
        )
    return {"messages": tool_outputs}

def should_continue(state: AgentState):
    """Determine if more tool calls are needed."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "continue"
    return "end"

# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "tools",
        "end": END
    }
)
workflow.add_edge("tools", "agent")

app = workflow.compile()
```
Running the Agent
```python
# Initialize with a user query
initial_state = {
    "messages": [
        HumanMessage(content="Show me the top 5 billing issues and analyze their patterns")
    ],
    "ticket_data": {},
    "analysis_results": {},
    "next_action": ""
}

# Execute the agent
result = app.invoke(initial_state)

# Access the final response
final_message = result["messages"][-1]
print(final_message.content)
```
The agent automatically:
- Calls `search_tickets(category="Billing Issue", limit=5)`
- Receives structured data from Fenic
- Calls `analyze_ticket_metrics(category="Billing Issue")`
- Synthesizes findings into a natural language response
Building Reliable Async Operations
For agents that need to make multiple parallel API calls or database queries, Fenic's async UDFs maintain DataFrame semantics while maximizing throughput.
Async Tool Execution
```python
import aiohttp
import fenic.api.functions as fc
from fenic.api.functions.builtin import async_udf
from fenic.core.types import StringType, StructType, StructField, IntegerType

@async_udf(
    return_type=StructType([
        StructField("status", IntegerType),
        StructField("response", StringType)
    ]),
    max_concurrency=20,
    timeout_seconds=10,
    num_retries=3
)
async def call_external_api(ticket_id: str) -> dict:
    """Call external API for enrichment data."""
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example.com/tickets/{ticket_id}"
        ) as resp:
            return {
                "status": resp.status,
                "response": await resp.text()
            }

# Apply to DataFrame
df = df.select(
    fc.col("*"),
    call_external_api(fc.col("ticket_id")).alias("external_data")
)
```
Key reliability features:

- Bounded concurrency: `max_concurrency=20` limits parallel requests
- Automatic retries: `num_retries=3` handles transient failures
- Timeout protection: `timeout_seconds=10` prevents hanging requests
- Ordered results: Output matches input row order
- Graceful degradation: Individual failures return `None` rather than crashing the pipeline (see the sketch after this list)
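Because failures surface as `None` rather than exceptions, downstream steps should separate enriched rows from failed ones. A minimal sketch; the `is_null()` method and `~` negation are assumed PySpark-style column operators, so verify them against Fenic's column API.

```python
# Failed enrichment calls leave None in "external_data" instead of raising.
# NOTE: is_null() and ~ are assumed operator names -- check Fenic's column API.
failed = df.filter(fc.col("external_data").is_null())
enriched = df.filter(~fc.col("external_data").is_null())

# Keep failures around for a later retry pass
failed.write.save_as_table("enrichment_failures", mode="overwrite")
```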
Implementing Error Handling
Production agents need robust error handling at every layer.
Session-Level Configuration
```python
from fenic.api.session.config import SessionConfig, SemanticConfig, AnthropicLanguageModel

config = SessionConfig(
    app_name="production_agent",
    semantic=SemanticConfig(
        language_models={
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=50000,
                output_tpm=50000,
                profiles={
                    "fast": AnthropicLanguageModel.Profile(),
                    "thorough": AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    )
                },
                default_profile="fast"
            )
        }
    )
)

session = Session.get_or_create(config)
```
Rate limits prevent quota exhaustion. Model profiles let you adjust reasoning effort based on task complexity.
Tool-Level Error Handling
```python
from fenic.core.error import ExecutionError, ValidationError

try:
    # Create tool with validation
    session.catalog.create_tool(
        tool_name="search_tickets",
        tool_description="Search support tickets",
        tool_query=search_df,
        tool_params=[
            ToolParam(
                name="category",
                description="Category to search",
                allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
            )
        ]
    )
except ValidationError as e:
    print(f"Tool configuration error: {e}")
except ExecutionError as e:
    print(f"Tool execution failed: {e}")
```
LangGraph Error Handling
```python
import json
import requests
from langchain_core.messages import ToolMessage

def safe_tool_execution(state: AgentState):
    """Execute tools with error handling."""
    last_message = state["messages"][-1]
    tool_outputs = []

    for tool_call in last_message.tool_calls:
        try:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]

            # Execute with timeout
            result = requests.post(
                f"http://localhost:8000/mcp/tools/{tool_name}",
                json=tool_args,
                timeout=30
            )
            result.raise_for_status()
            output = result.json()
        except requests.exceptions.Timeout:
            output = {"error": "Tool execution timed out"}
        except requests.exceptions.RequestException as e:
            output = {"error": f"Tool execution failed: {str(e)}"}

        # Wrap results as ToolMessage objects so the conversation stays valid
        tool_outputs.append(
            ToolMessage(content=json.dumps(output), tool_call_id=tool_call["id"])
        )

    return {"messages": tool_outputs}
```
Monitoring and Metrics
Track performance and costs with Fenic's built-in metrics system.
```python
# Query execution metrics
metrics_df = session.table("fenic_system.query_metrics")

# Analyze model usage
model_costs = metrics_df.select(
    fc.col("model"),
    fc.col("latency_ms"),
    fc.col("cost_usd"),
    fc.col("input_tokens"),
    fc.col("output_tokens")
).order_by("cost_usd", ascending=False)

model_costs.show(20)

# Aggregate statistics
summary = metrics_df.group_by("model").agg(
    fc.count("*").alias("total_calls"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
)

summary.show()
```
This telemetry helps identify bottlenecks and optimize both Fenic pipelines and LangGraph orchestration logic.
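The same table supports simple threshold checks. A sketch that flags unusually expensive calls for review; the 0.05 USD cutoff is illustrative.

```python
# Surface individual calls whose cost exceeds a review threshold
expensive = metrics_df.filter(fc.col("cost_usd") > 0.05)
expensive.select(
    fc.col("model"),
    fc.col("cost_usd"),
    fc.col("latency_ms")
).show()
```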
Best Practices
Data Preparation
Preprocess once, query many times: Use Fenic to extract and structure data before agent runtime. Agents should query clean DataFrames, not raw documents.
Use persist() for reused data: Cache intermediate results that multiple tools access.
```python
processed_df = (
    raw_df
    .select(fc.semantic.extract(fc.col("content"), TicketInfo))
    .persist()  # Cache after first computation
)
```
Tool Design
Keep tools focused: Each tool should do one thing well. Compose advanced operations in LangGraph, not in individual tools.
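As an example, rather than one "search and analyze everything" mega-tool, keep the two catalog tools defined earlier and compose them in a LangGraph node. A sketch using the wrappers from the integration section; the hard-coded category stands in for whatever the conversation supplies.

```python
def search_then_analyze(state: AgentState):
    """Compose two focused tools instead of building one mega-tool."""
    category = "Billing Issue"  # illustrative; derive this from state in practice
    hits = search_tickets.invoke({"category": category, "limit": 5})
    stats = analyze_ticket_metrics.invoke({"category": category})
    return {"ticket_data": hits, "analysis_results": stats}
```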
Use catalog descriptions: Document tables and tools thoroughly. This context helps LLMs choose appropriate tools.
```python
session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets from 2023-2025. Includes ticket ID, category, "
    "priority, customer info, and resolution status."
)
```
Validate inputs with ToolParam: Define `allowed_values` to prevent invalid inputs.
Orchestration
Implement circuit breakers: Track consecutive failures and stop calling broken tools.
```python
def should_retry_tool(state: AgentState):
    """Check if tool should be retried."""
    consecutive_failures = state.get("consecutive_failures", 0)
    if consecutive_failures >= 3:
        return "fallback"
    return "retry"
```
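The router above assumes something increments `consecutive_failures`, which the `AgentState` defined earlier does not carry. A sketch of one way to maintain it, assuming the state gains a `consecutive_failures: int` field and reusing `safe_tool_execution` from the error-handling section; the `fallback_node` name is illustrative.

```python
# Assumes AgentState is extended with `consecutive_failures: int`.
def execute_tools_with_breaker(state: AgentState):
    """Tool node that maintains the circuit-breaker counter."""
    result = safe_tool_execution(state)
    # safe_tool_execution encodes failures as {"error": ...} payloads
    failed = any('"error"' in msg.content for msg in result["messages"])
    failures = state.get("consecutive_failures", 0) + 1 if failed else 0
    return {**result, "consecutive_failures": failures}

# Route through the breaker after every tool step
workflow.add_conditional_edges(
    "tools",
    should_retry_tool,
    {"retry": "agent", "fallback": "fallback_node"}  # fallback_node is illustrative
)
```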
Use stateless HTTP: For horizontally scalable deployments, enable `stateless_http=True` in your MCP server.
Separate batch and real-time: Use Fenic for heavy batch processing. Let agents query the processed results in real-time.
Common Patterns
Multi-Stage Agent Pipeline
```python
# Stage 1: Data preparation (runs periodically)
def prepare_data():
    df = session.read.docs("/data/new_tickets/", content_type="markdown", recursive=True)
    df = df.select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), TicketInfo).alias("info"),
        fc.semantic.classify(fc.col("content"), categories).alias("category")
    )
    df.write.save_as_table("support_tickets", mode="append")

# Stage 2: Agent runtime (responds to queries)
def run_agent(user_query: str):
    state = {"messages": [HumanMessage(content=user_query)]}
    result = app.invoke(state)
    return result["messages"][-1].content
```
Conditional Tool Routing
```python
def route_based_on_intent(state: AgentState):
    """Route to different tools based on user intent."""
    last_message = state["messages"][-1].content

    # Use semantic classification to determine intent
    intent_df = session.create_dataframe({"query": [last_message]})
    intent_df = intent_df.select(
        fc.semantic.classify(
            fc.col("query"),
            ["search", "analyze", "summarize"]
        ).alias("intent")
    )
    intent = intent_df.to_pydict()["intent"][0]
    return intent  # Returns "search", "analyze", or "summarize"

# Add to graph with routing
workflow.add_conditional_edges(
    "route_intent",
    route_based_on_intent,
    {
        "search": "search_node",
        "analyze": "analyze_node",
        "summarize": "summarize_node"
    }
)
```
Feedback Loops
```python
# Collect agent interactions for continuous improvement
interactions_df = session.create_dataframe({
    "query": user_queries,
    "response": agent_responses,
    "tools_used": tools_called,
    "user_feedback": feedback_scores
})

interactions_df.write.save_as_table("agent_interactions", mode="append")

# Analyze which tools work best
performance = session.table("agent_interactions").group_by("tools_used").agg(
    fc.avg("user_feedback").alias("avg_feedback"),
    fc.count("*").alias("usage_count")
)
```
Next Steps
Start building with Typedef and LangGraph:
- Install Fenic: `pip install fenic`
- Clone examples: Check the GitHub repository for sample implementations
- Join the community: Get help in the Discord server
- Read the docs: Full API documentation at docs.fenic.ai
The combination of Fenic's data-centric approach and LangGraph's orchestration capabilities provides a solid foundation for production AI agents. Start with small tools, validate reliability, then scale to advanced multi-agent systems.