How to Build Agentic AI with LangChain and Fenic's Declarative Tools

Typedef Team

Production AI agents need three layers: data transformation, tool execution, and orchestration. Fenic handles data and tools through its DataFrame API and MCP server. LangChain, through LangGraph, manages orchestration with state graphs.

Agent Architecture Requirements

Build agents with these three components:

  • Data Layer: Extract structure from raw data, enrich context, apply semantic operations
  • Tool Layer: Create type-safe operations with parameter validation
  • Orchestration Layer: Manage state, coordinate tool calls, handle errors

Install and Configure Fenic

Install Fenic:

bash
pip install fenic

Configure a session with language models and rate limits:

python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

Rate limits prevent API quota exhaustion. Fenic manages request pacing automatically.

Prepare Data with Semantic Operations

Extract Structured Data

Transform unstructured text into typed structures using Pydantic schemas:

python
from pydantic import BaseModel, Field
from typing import List
import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

df = session.read.docs("./product_descriptions/**/*.md",
                       content_type="markdown",
                       recursive=True)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)
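
semantic.extract returns a struct column. To filter or sort on individual fields, flatten them with bracket-style struct access (a short sketch using the same syntax that appears in the integration example later in this guide):

python
flat_df = df.select(
    fc.col("file_path"),
    fc.col("product_data")["name"].alias("name"),
    fc.col("product_data")["price"].alias("price"),
    fc.col("product_data")["features"].alias("features")
)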

Join Data by Meaning

Use semantic joins to match records based on meaning, not exact strings:

python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))

matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)
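
Negative examples sharpen the match boundary. Add them to the collection before calling semantic.join; the same create_example call accepts output=False to teach the model which pairs to reject:

python
# A non-match teaches the model to reject superficially similar pairs
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="iOS Developer - Swift/SwiftUI",
    output=False
))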

Cluster Similar Items

Group items using embeddings and K-means clustering:

python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)
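
To sanity-check the clustering, aggregate per cluster. A short sketch: the group_by/agg pattern matches the cost-monitoring example later in this guide, though fc.count is an assumption here:

python
cluster_sizes = clustered_df.group_by("cluster_id").agg(
    fc.count("ticket_id").alias("ticket_count")  # fc.count assumed by analogy with fc.sum below
).order_by(fc.col("ticket_count").desc())

cluster_sizes.show()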

Classify Text

Apply LLM-powered classification with few-shot examples:

python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)

Create Declarative Tools

Fenic's catalog system converts DataFrame queries into type-safe tools.

Build a Basic Tool

Define a tool with parameters:

python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

customers_df = session.read.csv("./data/customers.csv")

search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    ) &
    (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)

Build Multi-Step Tools

Chain semantic operations in a single tool:

python
tickets_df = session.read.csv("./data/support_tickets.csv")

processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)

Deploy MCP Server

Run Fenic tools as an MCP server that LangChain agents can access.

Development Server

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Production Server

Deploy with ASGI for production:

python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy: uvicorn app:app --host 0.0.0.0 --port 8000

CLI Server

Use the command-line interface:

bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport
fenic-serve --transport stdio

Connect Fenic Tools to LangChain

Define Agent State

python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    ticket_data: dict
    analysis_results: dict
    next_action: str

Wrap MCP Tools

Create LangChain tools that call your MCP server. The endpoint paths below are illustrative; adjust them to match how your deployment exposes tool routes:

python
from langchain_core.tools import tool
import requests

@tool
def search_tickets(category: str, limit: int = 10) -> dict:
    """Search support tickets by category."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/search_tickets",
        json={"category": category, "limit": limit}
    )
    return response.json()

@tool
def analyze_ticket_metrics(category: str) -> dict:
    """Get statistics for tickets in a category."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/analyze_ticket_metrics",
        json={"category": category}
    )
    return response.json()

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
tools = [search_tickets, analyze_ticket_metrics]
llm_with_tools = llm.bind_tools(tools)
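
Alternatively, rather than hand-rolling HTTP wrappers, the langchain-mcp-adapters package can load MCP tools directly. A minimal sketch, assuming that package's MultiServerMCPClient and the server from the previous section running on port 8000:

python
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient

async def load_fenic_tools():
    # Connect to the Fenic MCP server over streamable HTTP
    client = MultiServerMCPClient({
        "fenic": {
            "url": "http://localhost:8000/mcp",
            "transport": "streamable_http",
        }
    })
    return await client.get_tools()

mcp_tools = asyncio.run(load_fenic_tools())
llm_with_mcp_tools = llm.bind_tools(mcp_tools)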

Build LangChain Graph

Create a state graph for agent orchestration:

python
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, ToolMessage

def call_model(state: AgentState):
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

def execute_tools(state: AgentState):
    last_message = state["messages"][-1]
    tool_outputs = []

    for tool_call in last_message.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        if tool_name == "search_tickets":
            result = search_tickets.invoke(tool_args)
        elif tool_name == "analyze_ticket_metrics":
            result = analyze_ticket_metrics.invoke(tool_args)
        else:
            result = {"error": f"Unknown tool: {tool_name}"}

        # Results must be ToolMessage objects so the next
        # llm_with_tools.invoke call can consume them
        tool_outputs.append(
            ToolMessage(content=str(result), tool_call_id=tool_call["id"])
        )

    return {"messages": tool_outputs}

def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "continue"
    return "end"

workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", execute_tools)
workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "tools",
        "end": END
    }
)

workflow.add_edge("tools", "agent")
app = workflow.compile()

Run the Agent

python
initial_state = {
    "messages": [
        HumanMessage(content="Show me the top 5 billing issues and analyze their patterns")
    ],
    "ticket_data": {},
    "analysis_results": {},
    "next_action": ""
}

result = app.invoke(initial_state)
final_message = result["messages"][-1]
print(final_message.content)
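
For long-running requests, LangGraph can stream node-by-node updates instead of blocking on invoke. A sketch using the compiled graph's stream method:

python
for step in app.stream(initial_state):
    # Each step maps a node name to the state update it produced
    for node_name, update in step.items():
        print(f"{node_name}: {len(update.get('messages', []))} new message(s)")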

Advanced Tool Patterns

Hybrid Search

Combine embeddings with structured filters:

python
from fenic.core.types import FloatType

products_df = session.read.csv("./data/products.csv")

products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)
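
The query_embedding parameter expects a vector, so embed the caller's query text before invoking the tool. A sketch, assuming the session exposes a create_dataframe constructor for in-memory rows:

python
# Hypothetical: build a one-row DataFrame and embed it with the same model
query_df = session.create_dataframe([{"query_text": "wireless noise-cancelling headphones"}])

query_embedding_df = query_df.select(
    fc.semantic.embed(fc.col("query_text")).alias("query_embedding")
)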

Async Data Enrichment

Process data with async UDFs for parallel API calls:

python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)

Async UDFs provide:

  • Bounded concurrency with max_concurrency
  • Automatic retries with num_retries
  • Timeout protection with timeout_seconds
  • Ordered results matching input row order

System Tools

Auto-generate tools for data exploration:

python
from fenic.api.mcp.tools import SystemToolConfig

products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)

server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)

This generates tools for schema inspection, data profiling, regex search, and SQL analysis.

Optimize Performance

Cache Intermediate Results

Save expensive computations:

python
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

cached_embeddings = session.table("document_embeddings")

Process in Batches

Control memory and API usage:

python
large_df = session.read.csv("./data/large_dataset.csv")

batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)

    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )

    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )

    if batch.count() < batch_size:
        break

    offset += batch_size

Configure Rate Limits

Prevent API throttling:

python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)

Monitor Costs

Track query performance and costs:

python
from fenic.core.types import TimestampType

metrics_df = session.table("fenic_system.query_metrics")

recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

cost_analysis = metrics_df.select(
    fc.dt.date_trunc(fc.col("end_ts").cast(TimestampType), "hour").alias("hour"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests")
).group_by("hour").agg(
    fc.sum("total_lm_cost").alias("total_cost"),
    fc.sum("total_lm_requests").alias("total_requests")
).order_by(fc.col("hour").desc())

cost_analysis.show()

Production Best Practices

Data Preparation

Preprocess data once, query many times. Cache intermediate results that multiple tools use:

python
processed_df = (
    raw_df
    .select(
        fc.semantic.extract(fc.col("content"), response_format=TicketInfo).alias("ticket_info")
    )
    .persist()
)

Tool Design

Keep each tool focused on one operation. Document tools thoroughly:

python
session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets from 2023-2025. Includes ticket ID, category, priority, customer info, and resolution status."
)

Validate inputs with allowed_values:

python
ToolParam(
    name="category",
    description="Category to search",
    allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
)

Error Handling

Handle errors at every layer:

python
from fenic.core.error import ExecutionError, ValidationError

try:
    session.catalog.create_tool(
        tool_name="search_tickets",
        tool_description="Search support tickets",
        tool_query=search_df,
        tool_params=[ToolParam(name="category", description="Category to search")]
    )
except ValidationError as e:
    print(f"Tool configuration error: {e}")
except ExecutionError as e:
    print(f"Tool execution failed: {e}")

Orchestration

Implement circuit breakers for failed tools:

python
def should_retry_tool(state: AgentState):
    consecutive_failures = state.get("consecutive_failures", 0)
    if consecutive_failures >= 3:
        return "fallback"
    return "retry"

Use stateless HTTP for horizontal scaling:

python
server = create_mcp_server(
    session=session,
    server_name="ScalableServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000
)

Separate batch preprocessing from real-time agent execution. Use Fenic for heavy batch processing, then let agents query the results.

Complete Integration Example

python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig, OpenAILanguageModel, OpenAIEmbeddingModel
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs("./feedback/**/*.md", content_type="markdown", recursive=True)

# Define schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# Process feedback
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# Create customer search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

Key Takeaways

Separate data preparation from agent reasoning:

  • Use semantic operations to extract, classify, cluster, and join data before agents access it
  • Create type-safe tools with tool_param for validation
  • Deploy tools via MCP servers that LangChain agents call directly
  • Cache embeddings and batch process large datasets
  • Monitor costs through Fenic's metrics system
  • Use system tools for automatic schema inspection and SQL analysis

This separation improves reliability, reduces costs, and makes agent systems maintainable at scale.
