How to Orchestrate Reliable Agents with Typedef and LangGraph

Typedef Team

Building production-ready AI agents requires more than just connecting an LLM to some tools. You need robust data pipelines, reliable tool execution, proper error handling, and stateful orchestration. This guide shows you how to combine Typedef's Fenic framework with LangGraph to build agents that actually work in production.

Agent Architecture Overview

Modern AI agents consist of three critical layers:

Data Layer: Transforms raw data into agent-ready formats. This includes extraction, enrichment, semantic operations, and preparing context for decision-making.

Tool Layer: Exposes structured operations that agents can invoke. Tools need type safety, parameter validation, and consistent execution patterns.

Orchestration Layer: Manages agent state, coordinates tool calls, handles errors, and implements routing logic between different processing steps.

Fenic handles the first two layers through its DataFrame API and MCP server capabilities. LangGraph manages the third layer with its state graph architecture. Together, they create a reliable foundation for agent systems.

Why Fenic for Agent Data and Tools

The DataFrame Advantage

Fenic brings structure to probabilistic systems. While LLM outputs are stochastic, DataFrame operations remain deterministic and traceable. Every transformation has clear lineage, making agent behavior auditable and debuggable.

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel
import fenic.api.functions as fc

# Configure session with rate limits
config = SessionConfig(
    app_name="agent_pipeline",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load and process documents
df = session.read.docs("/data/support_tickets/", content_type="markdown", recursive=True)

# Extract structured information
from pydantic import BaseModel, Field
from typing import List

class TicketInfo(BaseModel):
    category: str = Field(description="Support ticket category")
    priority: str = Field(description="Priority level: low, medium, high, critical")
    entities: List[str] = Field(description="Named entities mentioned")

df = df.select(
    fc.col("file_name"),
    fc.col("content"),  # keep the raw text; later steps summarize and measure it
    fc.semantic.extract(fc.col("content"), TicketInfo).alias("ticket_info")
)

This preprocessing happens once, producing clean structured data that agents can query repeatedly without reprocessing.

Semantic Operations at Scale

Fenic's semantic functions integrate LLM operations directly into data pipelines with built-in rate limiting and retry logic.

python
# Classify tickets with automatic retries
df = df.select(
    fc.col("*"),  # keep all existing columns for later steps
    fc.semantic.classify(
        fc.col("ticket_info.category"),
        ["Account Access", "Billing Issue", "Technical Problem"],
        model_alias="gpt4"
    ).alias("classification")
)

# Generate summaries with structured output
df = df.select(
    fc.col("*"),
    fc.semantic.map(
        "Summarize this support ticket in one sentence: {{ content }}",
        content=fc.col("content"),
        model_alias="gpt4"
    ).alias("summary")
)

Rate limits defined in the session config prevent API quota exhaustion. Fenic automatically manages request pacing across your entire pipeline.
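
To make the pacing idea concrete, here is a minimal sketch of a requests-per-minute limiter in plain Python. It is not Fenic's implementation; the `RpmLimiter` class and its injectable clock are hypothetical names used only to illustrate what an `rpm` budget means for request timing.

```python
import time
from collections import deque


class RpmLimiter:
    """Sliding-window limiter: at most `rpm` calls in any 60-second window.

    Hypothetical helper for illustration; not part of Fenic.
    """

    def __init__(self, rpm, clock=time.monotonic):
        self.rpm = rpm
        self.clock = clock
        self.calls = deque()  # timestamps of recent calls

    def wait_time(self):
        """Return how many seconds to wait before the next call is allowed."""
        now = self.clock()
        # Drop timestamps that have left the 60-second window.
        while self.calls and now - self.calls[0] >= 60:
            self.calls.popleft()
        if len(self.calls) < self.rpm:
            return 0.0
        return 60 - (now - self.calls[0])

    def record(self):
        """Register that a call was just made."""
        self.calls.append(self.clock())


# Usage with a fake clock so the example runs instantly.
fake_now = [0.0]
limiter = RpmLimiter(rpm=2, clock=lambda: fake_now[0])
limiter.record()
limiter.record()
blocked_for = limiter.wait_time()  # window is full, so the caller must wait
fake_now[0] = 60.0
free_again = limiter.wait_time()   # the earlier calls have left the window
```

A token budget (`tpm`) can be modeled the same way by recording token counts instead of call counts.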

Building Declarative Agent Tools

Fenic's catalog-backed tool system eliminates boilerplate while ensuring type safety. Define tools as DataFrame queries, not manual function wrappers.

Creating Catalog Tools

python
from fenic.core.mcp.types import ToolParam

# Save processed tickets as a table
df.write.save_as_table("support_tickets", mode="overwrite")
session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets with extracted metadata and classifications"
)

from fenic.core.types import StringType, IntegerType

# Define a search tool
search_df = session.table("support_tickets").filter(
    fc.col("classification") == fc.tool_param("category", StringType)
).limit(fc.tool_param("limit", IntegerType))

session.catalog.create_tool(
    tool_name="search_tickets",
    tool_description="Search support tickets by category",
    tool_query=search_df,
    tool_params=[
        ToolParam(
            name="category",
            description="Ticket category to filter by",
            allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=10
        )
    ],
    result_limit=50
)

from fenic.core.types import StringType

# Define an analysis tool
support_tickets_table = session.table("support_tickets")
analysis_df = session.sql("""
    SELECT
        classification,
        COUNT(*) as ticket_count,
        AVG(LENGTH(content)) as avg_length
    FROM {support_tickets}
    WHERE classification = tool_param('category')
    GROUP BY classification
""", support_tickets=support_tickets_table)

session.catalog.create_tool(
    tool_name="analyze_ticket_metrics",
    tool_description="Get statistics for tickets in a specific category",
    tool_query=analysis_df,
    tool_params=[
        ToolParam(
            name="category",
            description="Category to analyze"
        )
    ]
)

Tools defined this way are versionable metadata. Schema changes flow through the catalog automatically.
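
The value of declaring `allowed_values` and `default_value` is that invalid calls can be rejected before any query runs. The following plain-Python sketch shows that kind of check under simple assumptions; `ParamSpec` and `bind_params` are hypothetical stand-ins, not Fenic classes, and Fenic's own validation may differ in detail.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence

_MISSING = object()  # sentinel meaning "no default was declared"


@dataclass
class ParamSpec:
    """Hypothetical stand-in for a tool parameter declaration."""
    name: str
    allowed_values: Optional[Sequence[Any]] = None
    default_value: Any = _MISSING


def bind_params(specs, args):
    """Validate caller arguments against the specs and fill in defaults."""
    unknown = set(args) - {s.name for s in specs}
    if unknown:
        raise ValueError(f"Unknown parameters: {sorted(unknown)}")
    bound = {}
    for spec in specs:
        if spec.name in args:
            value = args[spec.name]
        elif spec.default_value is not _MISSING:
            value = spec.default_value
        else:
            raise ValueError(f"Missing required parameter: {spec.name}")
        if spec.allowed_values is not None and value not in spec.allowed_values:
            raise ValueError(f"{spec.name}={value!r} is not an allowed value")
        bound[spec.name] = value
    return bound


specs = [
    ParamSpec("category", allowed_values=["Account Access", "Billing Issue", "Technical Problem"]),
    ParamSpec("limit", default_value=10),
]
bound = bind_params(specs, {"category": "Billing Issue"})
```

Here `bound` contains the caller's category plus the default limit, while a call with an unlisted category raises `ValueError` instead of reaching the data.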

System Tools for Data Access

Fenic can auto-generate common tools for any table in your catalog:

python
from fenic.api.mcp.tools import SystemToolConfig

# Generate read, profile, schema, search, and analyze tools
system_tools = SystemToolConfig(
    table_names=["support_tickets"],
    tool_namespace="support",
    max_result_rows=100
)

Passing this configuration to the MCP server generates five tools:

  • support_schema: Returns column names and types
  • support_profile: Provides column statistics
  • support_read: Pages through filtered data
  • support_search_summary: Regex search across text columns
  • support_analyze: Execute SQL queries

Running the MCP Server

Expose your tools through Fenic's MCP server implementation. Multiple deployment options support different architectures.

Development Mode

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session,
    "SupportTicketServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1"
)

Production ASGI Deployment

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

tools = session.catalog.list_tools()
server = create_mcp_server(session, "SupportTicketServer", user_defined_tools=tools)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    path="/mcp"
)

# Deploy with: uvicorn server:app --workers 4 --port 8000

CLI Quick Start

For rapid prototyping, use the fenic-serve command:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_tickets analyze_ticket_metrics

# Use stdio for direct integration
fenic-serve --transport stdio

Integrating with LangGraph

LangGraph excels at managing agent state and orchestrating multi-step workflows. Fenic tools become function calls within LangGraph's state machine.

Setting Up the Agent State

python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    ticket_data: dict
    analysis_results: dict
    next_action: str
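
The `Annotated[..., operator.add]` annotation tells LangGraph to combine the messages a node returns with the ones already in the state, rather than overwrite them. The sketch below models that merge rule in plain Python; `apply_update` is a hypothetical helper and a simplification, not LangGraph's internal code.

```python
import operator


def apply_update(state, update, reducers):
    """Merge a node's partial update into the state.

    Keys with a reducer are combined with the old value; other keys are
    overwritten. A simplified model, not LangGraph's internal code.
    """
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged


reducers = {"messages": operator.add}
state = {"messages": ["user: hello"], "next_action": ""}
state = apply_update(state, {"messages": ["ai: hi"]}, reducers)   # appended
state = apply_update(state, {"next_action": "search"}, reducers)  # overwritten
```

This is why the node functions in this guide return only the new messages: the reducer takes care of appending them to the history.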

Connecting Fenic Tools to LangGraph

python
from langchain_core.tools import tool
import requests

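# Wrap MCP server endpoints as LangGraph tools.
# NOTE: the REST-style URLs below are illustrative. MCP's HTTP transport
# exchanges JSON-RPC messages on a single endpoint, so a real deployment
# would normally call the server through an MCP client library.
MCP_BASE_URL = "http://localhost:8000/mcp/tools"

@tool
def search_tickets(category: str, limit: int = 10) -> dict:
    """Search support tickets by category."""
    response = requests.post(
        f"{MCP_BASE_URL}/search_tickets",
        json={"category": category, "limit": limit},
        timeout=30
    )
    response.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return response.json()

@tool
def analyze_ticket_metrics(category: str) -> dict:
    """Get statistics for tickets in a category."""
    response = requests.post(
        f"{MCP_BASE_URL}/analyze_ticket_metrics",
        json={"category": category},
        timeout=30
    )
    response.raise_for_status()
    return response.json()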

# Bind tools to your LLM
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
tools = [search_tickets, analyze_ticket_metrics]
llm_with_tools = llm.bind_tools(tools)
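
On the wire, MCP clients invoke tools with a JSON-RPC 2.0 request whose method is `tools/call`. The sketch below only builds that request body with the standard library; `build_tool_call` is a hypothetical helper, and sending the request, initializing the session, and parsing the response are left to an MCP client library.

```python
import itertools
import json

_request_ids = itertools.count(1)  # JSON-RPC requests carry a unique id


def build_tool_call(name, arguments):
    """Build a JSON-RPC 2.0 request for MCP's `tools/call` method."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


payload = build_tool_call("search_tickets", {"category": "Billing Issue", "limit": 5})
body = json.dumps(payload)  # the string a client would send to the server
```

The tool name and arguments travel inside `params`, which is why a single server endpoint can serve every tool in the catalog.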

Building the Agent Graph

python
import json

from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

# Look up tools by name so an unknown name is reported instead of crashing
TOOLS_BY_NAME = {t.name: t for t in tools}

def call_model(state: AgentState):
    """Invoke the LLM with current state."""
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

def execute_tools(state: AgentState):
    """Execute tool calls from the LLM."""
    last_message = state["messages"][-1]

    tool_outputs = []
    for tool_call in last_message.tool_calls:
        selected = TOOLS_BY_NAME.get(tool_call["name"])
        if selected is None:
            result = {"error": f"Unknown tool: {tool_call['name']}"}
        else:
            result = selected.invoke(tool_call["args"])

        # Results go back as ToolMessage objects so the LLM can match
        # each result to the call that requested it
        tool_outputs.append(
            ToolMessage(
                content=json.dumps(result, default=str),
                tool_call_id=tool_call["id"]
            )
        )

    return {"messages": tool_outputs}

def should_continue(state: AgentState):
    """Determine if more tool calls are needed."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "continue"
    return "end"

# Build the graph
workflow = StateGraph(AgentState)

workflow.add_node("agent", call_model)
workflow.add_node("tools", execute_tools)

workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "tools",
        "end": END
    }
)
workflow.add_edge("tools", "agent")

app = workflow.compile()
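
The control flow this graph encodes is a loop: call the model, run any requested tools, and repeat until the model answers without tool calls. The plain-Python sketch below mirrors that cycle with stub functions and a step cap; `run_loop`, `fake_model`, and `fake_tools` are hypothetical names, and the message dictionaries are simplified stand-ins for LangChain message objects.

```python
def run_loop(call_model, execute_tools, messages, max_steps=10):
    """Alternate between the model and the tools until no tool calls remain.

    Mirrors the agent -> tools -> agent cycle; a simplified model, not
    LangGraph's executor.
    """
    for _ in range(max_steps):
        reply = call_model(messages)
        messages = messages + [reply]
        if not reply.get("tool_calls"):
            return messages  # the "end" branch
        messages = messages + execute_tools(reply["tool_calls"])  # the "continue" branch
    raise RuntimeError(f"No final answer after {max_steps} steps")


def fake_model(messages):
    """Stub model: requests one tool call, then answers."""
    if any(m["role"] == "tool" for m in messages):
        return {"role": "ai", "content": "Found 2 billing tickets."}
    return {"role": "ai", "content": "", "tool_calls": [{"id": "1", "name": "search_tickets"}]}


def fake_tools(tool_calls):
    """Stub tool executor: returns one result per requested call."""
    return [{"role": "tool", "tool_call_id": c["id"], "content": "2 rows"} for c in tool_calls]


history = run_loop(fake_model, fake_tools, [{"role": "user", "content": "billing issues?"}])
```

The step cap matters because a model that keeps requesting tools would otherwise loop forever. LangGraph has a guard of this kind as well: the `recursion_limit` key in the config passed to `invoke` bounds the number of steps a run may take.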

Running the Agent

python
# Initialize with a user query
initial_state = {
    "messages": [
        HumanMessage(content="Show me the top 5 billing issues and analyze their patterns")
    ],
    "ticket_data": {},
    "analysis_results": {},
    "next_action": ""
}

# Execute the agent
result = app.invoke(initial_state)

# Access the final response
final_message = result["messages"][-1]
print(final_message.content)

For this query, a typical run proceeds as follows (the LLM chooses the calls, so the exact sequence can vary):

  1. Calls search_tickets(category="Billing Issue", limit=5)
  2. Receives structured data from Fenic
  3. Calls analyze_ticket_metrics(category="Billing Issue")
  4. Synthesizes findings into a natural language response

Building Reliable Async Operations

For agents that need to make multiple parallel API calls or database queries, Fenic's async UDFs maintain DataFrame semantics while maximizing throughput.

Async Tool Execution

python
import fenic.api.functions as fc
from fenic.api.functions.builtin import async_udf
from fenic.core.types import StringType, StructType, StructField, IntegerType
import aiohttp

@async_udf(
    return_type=StructType([
        StructField("status", IntegerType),
        StructField("response", StringType)
    ]),
    max_concurrency=20,
    timeout_seconds=10,
    num_retries=3
)
async def call_external_api(ticket_id: str) -> dict:
    """Call external API for enrichment data."""
    # Named http_session to avoid shadowing the Fenic session object
    async with aiohttp.ClientSession() as http_session:
        async with http_session.get(
            f"https://api.example.com/tickets/{ticket_id}"
        ) as resp:
            return {
                "status": resp.status,
                "response": await resp.text()
            }

# Apply to DataFrame
df = df.select(
    fc.col("*"),
    call_external_api(fc.col("ticket_id")).alias("external_data")
)

Key reliability features:

  • Bounded concurrency: max_concurrency=20 limits parallel requests
  • Automatic retries: num_retries=3 handles transient failures
  • Timeout protection: timeout_seconds=10 prevents hanging requests
  • Ordered results: Output matches input row order
  • Graceful degradation: Individual failures return None rather than crashing the pipeline
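
The features listed above can be sketched with the standard library alone. The example below is a hypothetical helper, not Fenic's implementation: `run_bounded` and `flaky_lookup` are illustrative names, and the retry policy (immediate retry, no backoff) is an assumption chosen to keep the sketch short.

```python
import asyncio


async def run_bounded(func, inputs, max_concurrency=20, timeout_seconds=10, num_retries=3):
    """Apply an async function to each input with bounded, retried, timed calls.

    Hypothetical helper; a sketch of the pattern, not Fenic's implementation.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:                    # bounded concurrency
            for _ in range(num_retries + 1):     # first attempt plus retries
                try:
                    return await asyncio.wait_for(func(item), timeout_seconds)
                except Exception:
                    continue                     # retry on timeouts and errors
            return None                          # graceful degradation

    # gather() returns results in the order of its arguments.
    return await asyncio.gather(*(guarded(item) for item in inputs))


attempts = {}


async def flaky_lookup(ticket_id):
    """Stub: 'b' fails once and then succeeds, 'c' always fails."""
    attempts[ticket_id] = attempts.get(ticket_id, 0) + 1
    await asyncio.sleep(0)
    if ticket_id == "c" or (ticket_id == "b" and attempts[ticket_id] == 1):
        raise ConnectionError("transient failure")
    return ticket_id.upper()


results = asyncio.run(run_bounded(flaky_lookup, ["a", "b", "c"], num_retries=2))
```

The result list keeps the input order, the transient failure on `"b"` is absorbed by a retry, and the permanent failure on `"c"` yields `None` instead of aborting the batch. A production version would usually add backoff between retries.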

Implementing Error Handling

Production agents need robust error handling at every layer.

Session-Level Configuration

python
from fenic.api.session.config import SessionConfig, SemanticConfig, AnthropicLanguageModel

config = SessionConfig(
    app_name="production_agent",
    semantic=SemanticConfig(
        language_models={
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=50000,
                output_tpm=50000,
                profiles={
                    "fast": AnthropicLanguageModel.Profile(),
                    "thorough": AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    )
                },
                default_profile="fast"
            )
        }
    )
)

session = Session.get_or_create(config)

Rate limits prevent quota exhaustion. Model profiles let you adjust reasoning effort based on task complexity.

Tool-Level Error Handling

python
from fenic.core.error import ExecutionError, ValidationError

try:
    # Create tool with validation
    session.catalog.create_tool(
        tool_name="search_tickets",
        tool_description="Search support tickets",
        tool_query=search_df,
        tool_params=[
            ToolParam(
                name="category",
                description="Category to search",
                allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
            )
        ]
    )
except ValidationError as e:
    print(f"Tool configuration error: {e}")
except ExecutionError as e:
    print(f"Tool execution failed: {e}")

LangGraph Error Handling

python
import json

import requests
from langchain_core.messages import ToolMessage

def safe_tool_execution(state: AgentState):
    """Execute tools with error handling."""
    last_message = state["messages"][-1]
    tool_outputs = []

    for tool_call in last_message.tool_calls:
        try:
            # Execute with timeout
            response = requests.post(
                f"http://localhost:8000/mcp/tools/{tool_call['name']}",
                json=tool_call["args"],
                timeout=30
            )
            response.raise_for_status()
            output = response.json()
        except requests.exceptions.Timeout:
            output = {"error": "Tool execution timed out"}
        except requests.exceptions.RequestException as e:
            output = {"error": f"Tool execution failed: {e}"}

        # Report errors back to the LLM as tool results so it can recover
        tool_outputs.append(
            ToolMessage(
                content=json.dumps(output, default=str),
                tool_call_id=tool_call["id"]
            )
        )

    return {"messages": tool_outputs}

Monitoring and Metrics

Track performance and costs with Fenic's built-in metrics system.

python
# Query execution metrics
metrics_df = session.table("fenic_system.query_metrics")

# Analyze model usage
model_costs = metrics_df.select(
    fc.col("model"),
    fc.col("latency_ms"),
    fc.col("cost_usd"),
    fc.col("input_tokens"),
    fc.col("output_tokens")
).order_by("cost_usd", ascending=False)

model_costs.show(20)

# Aggregate statistics
summary = metrics_df.group_by("model").agg(
    fc.count("*").alias("total_calls"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
)

summary.show()

This telemetry helps identify bottlenecks and optimize both Fenic pipelines and LangGraph orchestration logic.

Best Practices

Data Preparation

Preprocess once, query many times: Use Fenic to extract and structure data before agent runtime. Agents should query clean DataFrames, not raw documents.

Use persist() for reused data: Cache intermediate results that multiple tools access.

python
processed_df = (
    raw_df
    .select(fc.semantic.extract(fc.col("content"), TicketInfo))
    .persist()  # Cache after first computation
)

Tool Design

Keep tools focused: Each tool should do one thing well. Compose advanced operations in LangGraph, not in individual tools.

Use catalog descriptions: Document tables and tools thoroughly. This context helps LLMs choose appropriate tools.

python
session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets from 2023-2025. Includes ticket ID, category, priority, customer info, and resolution status."
)

Validate inputs with ToolParam: Define allowed_values to prevent invalid inputs.

Orchestration

Implement circuit breakers: Track consecutive failures and stop calling broken tools.

python
def should_retry_tool(state: AgentState):
    """Check if tool should be retried."""
    consecutive_failures = state.get("consecutive_failures", 0)
    if consecutive_failures >= 3:
        return "fallback"
    return "retry"
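
The routing function above reads `consecutive_failures`, but something has to maintain that counter. The sketch below shows one way to do it in plain Python; `record_tool_result` and `route` are hypothetical helpers, and it assumes a `consecutive_failures` field is added to the agent state, which the `AgentState` defined earlier does not declare.

```python
def record_tool_result(state, succeeded):
    """Return a state update that tracks consecutive tool failures."""
    if succeeded:
        return {"consecutive_failures": 0}  # any success resets the breaker
    return {"consecutive_failures": state.get("consecutive_failures", 0) + 1}


def route(state, threshold=3):
    """Same decision as should_retry_tool, with a configurable threshold."""
    return "fallback" if state.get("consecutive_failures", 0) >= threshold else "retry"


state = {}
decisions = []
for succeeded in [False, False, True, False, False, False]:
    state.update(record_tool_result(state, succeeded))
    decisions.append(route(state))
```

In this run the single success resets the counter, so the breaker only trips after the three failures in a row at the end.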

Use stateless HTTP: For horizontally scalable deployments, enable stateless_http=True in your MCP server.

Separate batch and real-time: Use Fenic for heavy batch processing. Let agents query the processed results in real-time.

Common Patterns

Multi-Stage Agent Pipeline

python
# Stage 1: Data preparation (runs periodically)
categories = ["Account Access", "Billing Issue", "Technical Problem"]

def prepare_data():
    df = session.read.docs("/data/new_tickets/", content_type="markdown", recursive=True)
    # When appending, column names should match the existing table's schema
    df = df.select(
        fc.col("*"),
        fc.semantic.extract(fc.col("content"), TicketInfo).alias("ticket_info"),
        fc.semantic.classify(fc.col("content"), categories).alias("classification")
    )
    df.write.save_as_table("support_tickets", mode="append")

# Stage 2: Agent runtime (responds to queries)
def run_agent(user_query: str):
    state = {"messages": [HumanMessage(content=user_query)]}
    result = app.invoke(state)
    return result["messages"][-1].content

Conditional Tool Routing

python
def route_based_on_intent(state: AgentState):
    """Route to different tools based on user intent."""
    last_message = state["messages"][-1].content

    # Use semantic classification to determine intent
    intent_df = session.create_dataframe({"query": [last_message]})
    intent_df = intent_df.select(
        fc.semantic.classify(
            fc.col("query"),
            ["search", "analyze", "summarize"]
        ).alias("intent")
    )
    intent = intent_df.to_pydict()["intent"][0]

    return intent  # Returns "search", "analyze", or "summarize"

# Add to graph with routing
workflow.add_conditional_edges(
    "route_intent",
    route_based_on_intent,
    {
        "search": "search_node",
        "analyze": "analyze_node",
        "summarize": "summarize_node"
    }
)
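
A conditional edge fails if the routing function returns a key that is missing from the mapping, so it can help to normalize the classifier output first. The sketch below is one hedged way to do that; `normalize_intent` and the choice of `"search"` as the default route are assumptions for illustration.

```python
VALID_INTENTS = ("search", "analyze", "summarize")


def normalize_intent(raw, default="search"):
    """Map a classifier output onto a known route, falling back to a default."""
    if isinstance(raw, str):
        cleaned = raw.strip().lower()
        if cleaned in VALID_INTENTS:
            return cleaned
    return default  # covers None, empty strings, and unexpected labels


routes = [normalize_intent(v) for v in ["Analyze", " summarize ", None, "delete"]]
```

Wrapping the return value of the routing function this way keeps the graph on a defined path even when the classification is missing or unexpected.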

Feedback Loops

python
# Collect agent interactions for continuous improvement
interactions_df = session.create_dataframe({
    "query": user_queries,
    "response": agent_responses,
    "tools_used": tools_called,
    "user_feedback": feedback_scores
})

interactions_df.write.save_as_table("agent_interactions", mode="append")

# Analyze which tools work best
performance = session.table("agent_interactions").group_by("tools_used").agg(
    fc.avg("user_feedback").alias("avg_feedback"),
    fc.count("*").alias("usage_count")
)

Next Steps

Start building with Typedef and LangGraph:

  1. Install Fenic: pip install fenic
  2. Clone examples: Check the GitHub repository for sample implementations
  3. Join the community: Get help in the Discord server
  4. Read the docs: Full API documentation at docs.fenic.ai

The combination of Fenic's data-centric approach and LangGraph's orchestration capabilities provides a solid foundation for production AI agents. Start with small tools, validate reliability, then scale to advanced multi-agent systems.
