Production AI agents need three layers: data transformation, tool execution, and orchestration. Fenic handles data and tools through its DataFrame API and MCP server. LangChain manages orchestration with LangGraph state graphs.
Agent Architecture Requirements
Build agents with these three components:
- Data Layer: Extract structure from raw data, enrich context, apply semantic operations
- Tool Layer: Create type-safe operations with parameter validation
- Orchestration Layer: Manage state, coordinate tool calls, handle errors
Install and Configure Fenic
Install Fenic:
```bash
pip install fenic
```
Configure a session with language models and rate limits:
```python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

config = SessionConfig(
    app_name="langchain_agent_tools",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)
```
Rate limits prevent API quota exhaustion. Fenic manages request pacing automatically.
Prepare Data with Semantic Operations
Extract Structured Data
Transform unstructured text into typed structures using Pydantic schemas:
```python
from pydantic import BaseModel, Field
from typing import List
import fenic.api.functions as fc

class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    features: List[str] = Field(description="Key product features")

df = session.read.docs(
    "./product_descriptions/**/*.md",
    content_type="markdown",
    recursive=True
)

df = df.select(
    fc.col("file_path"),
    fc.semantic.extract(
        fc.col("content"),
        response_format=ProductInfo
    ).alias("product_data")
)
```
Join Data by Meaning
Use semantic joins to match records based on meaning, not exact strings:
```python
from textwrap import dedent
from fenic.core.types.semantic_examples import JoinExample, JoinExampleCollection

examples = JoinExampleCollection()
examples.create_example(JoinExample(
    left="Senior Backend Engineer with Python, FastAPI, PostgreSQL experience",
    right="Backend Developer - Python/Go",
    output=True
))

matched_df = jobs_df.semantic.join(
    candidates_df,
    predicate=dedent('''
        Job Requirements: {{ left_on }}
        Candidate Skills: {{ right_on }}
        The candidate meets the core requirements for this role.
    '''),
    left_on=fc.col("job_requirements"),
    right_on=fc.col("candidate_skills"),
    examples=examples
)
```
Cluster Similar Items
Group items using embeddings and K-means clustering:
```python
df_with_embeddings = df.select(
    fc.col("ticket_id"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
)

clustered_df = df_with_embeddings.semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=10,
    label_column="cluster_id",
    centroid_column="cluster_centroid"
)
```
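To sanity-check `num_clusters`, inspect the size of each cluster with an aggregation. This sketch assumes an `fc.count` aggregate, by analogy with the `fc.sum` aggregate used later in this guide:

```python
# Sketch: inspect cluster sizes to sanity-check num_clusters.
# fc.count is assumed here by analogy with fc.sum used later on.
clustered_df.group_by("cluster_id").agg(
    fc.count("ticket_id").alias("ticket_count")
).order_by(fc.col("ticket_count").desc()).show()
```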
Classify Text
Apply LLM-powered classification with few-shot examples:
```python
from fenic.core.types.semantic_examples import MapExample, MapExampleCollection

examples = MapExampleCollection()
examples.create_example(MapExample(
    input={"title": "User can't login", "body": "Getting 401 errors"},
    output="Authentication"
))

df = df.select(
    fc.col("ticket_id"),
    fc.semantic.map(
        "Classify this support ticket: {{ title }} - {{ body }}",
        title=fc.col("title"),
        body=fc.col("body"),
        examples=examples
    ).alias("category")
)
```
Create Declarative Tools
Fenic's catalog system converts DataFrame queries into type-safe tools.
Build a Basic Tool
Define a tool with parameters:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

customers_df = session.read.csv("./data/customers.csv")

search_query = customers_df.filter(
    fc.col("industry").contains(
        fc.tool_param("industry", StringType)
    ) &
    (fc.col("annual_revenue") >= fc.tool_param("min_revenue", IntegerType))
).select(
    fc.col("company_name"),
    fc.col("contact_email"),
    fc.col("annual_revenue"),
    fc.col("industry")
)

session.catalog.create_tool(
    tool_name="search_customers",
    tool_description="Search for customers by industry and minimum revenue threshold",
    tool_query=search_query,
    tool_params=[
        ToolParam(
            name="industry",
            description="Industry sector to filter by"
        ),
        ToolParam(
            name="min_revenue",
            description="Minimum annual revenue in USD",
            default_value=0,
            has_default=True
        )
    ],
    result_limit=50
)
```
Build Multi-Step Tools
Chain semantic operations in a single tool:
```python
tickets_df = session.read.csv("./data/support_tickets.csv")

processed_tickets = tickets_df.select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("description"),
    fc.semantic.embed(fc.col("description")).alias("embeddings")
).semantic.with_cluster_labels(
    by=fc.col("embeddings"),
    num_clusters=15,
    label_column="category_cluster"
)

similar_tickets_query = processed_tickets.filter(
    fc.embedding.compute_similarity(
        fc.col("embeddings"),
        fc.tool_param("query_vector", fc.col("embeddings").data_type),
        metric="cosine"
    ) > 0.7
).select(
    fc.col("ticket_id"),
    fc.col("subject"),
    fc.col("category_cluster")
)

session.catalog.create_tool(
    tool_name="find_similar_tickets",
    tool_description="Find support tickets similar to a given query embedding",
    tool_query=similar_tickets_query,
    tool_params=[
        ToolParam(
            name="query_vector",
            description="Embedding vector of the search query"
        )
    ],
    result_limit=10
)
```
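Callers need an embedding to pass as `query_vector`. One option, sketched below, is to reuse the session's embedding model on a one-row DataFrame; `create_dataframe` and `to_pydict` are assumptions about Fenic's DataFrame API and may differ in your version:

```python
# Sketch: build a query embedding with the session's embedding model.
# create_dataframe / to_pydict are assumed helper names; adjust to the
# construction and collection methods your Fenic version exposes.
query_df = session.create_dataframe([{"query": "cannot reset password"}])
query_vec_df = query_df.select(
    fc.semantic.embed(fc.col("query")).alias("query_vector")
)
query_vector = query_vec_df.to_pydict()["query_vector"][0]
# query_vector can now be passed to the find_similar_tickets tool.
```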
Deploy MCP Server
Run Fenic tools as an MCP server that LangChain agents can access.
Development Server
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerDataServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Production Server
Deploy with ASGI for production:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="ProductionDataServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

app = run_mcp_server_asgi(
    server=server,
    stateless_http=True,
    path="/mcp"
)

# Deploy: uvicorn app:app --host 0.0.0.0 --port 8000
```
CLI Server
Use the command-line interface:
```bash
# Serve all catalog tools
fenic-serve --transport http --port 8000

# Serve specific tools
fenic-serve --tools search_customers find_similar_tickets

# Use stdio transport
fenic-serve --transport stdio
```
Connect Fenic Tools to LangChain
Define Agent State
```python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    ticket_data: dict
    analysis_results: dict
    next_action: str
```
Wrap MCP Tools
Create LangChain tools that call your MCP server:
```python
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
import requests

@tool
def search_tickets(category: str, limit: int = 10) -> dict:
    """Search support tickets by category."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/search_tickets",
        json={"category": category, "limit": limit}
    )
    return response.json()

@tool
def analyze_ticket_metrics(category: str) -> dict:
    """Get statistics for tickets in a category."""
    response = requests.post(
        "http://localhost:8000/mcp/tools/analyze_ticket_metrics",
        json={"category": category}
    )
    return response.json()

llm = ChatOpenAI(model="gpt-4")
tools = [search_tickets, analyze_ticket_metrics]
llm_with_tools = llm.bind_tools(tools)
```
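Alternatively, the `langchain-mcp-adapters` package can discover and wrap MCP tools automatically, avoiding hand-written HTTP wrappers. This is a sketch assuming that package's `MultiServerMCPClient` API and the development server URL from above; the rest of this guide continues with the explicit wrappers:

```python
# Sketch: load the Fenic MCP tools via langchain-mcp-adapters instead of
# hand-rolled requests wrappers. Assumes `pip install langchain-mcp-adapters`
# and the MultiServerMCPClient API of recent releases.
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient

async def load_fenic_tools():
    client = MultiServerMCPClient({
        "fenic": {
            "url": "http://localhost:8000/mcp",  # dev server from above
            "transport": "streamable_http",
        }
    })
    return await client.get_tools()

mcp_tools = asyncio.run(load_fenic_tools())
llm_with_tools = llm.bind_tools(mcp_tools)
```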
Build LangChain Graph
Create a state graph for agent orchestration:
```python
import json
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, ToolMessage

def call_model(state: AgentState):
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

def execute_tools(state: AgentState):
    last_message = state["messages"][-1]
    tool_outputs = []
    for tool_call in last_message.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]
        if tool_name == "search_tickets":
            result = search_tickets.invoke(tool_args)
        elif tool_name == "analyze_ticket_metrics":
            result = analyze_ticket_metrics.invoke(tool_args)
        # Wrap each result as a ToolMessage so the model can read it next turn
        tool_outputs.append(ToolMessage(
            content=json.dumps(result),
            tool_call_id=tool_call["id"]
        ))
    return {"messages": tool_outputs}

def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "continue"
    return "end"

workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "tools",
        "end": END
    }
)
workflow.add_edge("tools", "agent")

app = workflow.compile()
```
Run the Agent
```python
initial_state = {
    "messages": [
        HumanMessage(content="Show me the top 5 billing issues and analyze their patterns")
    ],
    "ticket_data": {},
    "analysis_results": {},
    "next_action": ""
}

result = app.invoke(initial_state)

final_message = result["messages"][-1]
print(final_message.content)
```
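For long-running tool chains, compiled LangGraph graphs also expose a `stream()` method that yields per-node state updates, which helps when debugging multi-step runs. A minimal sketch using the graph and initial state from above:

```python
# Sketch: stream intermediate state updates instead of waiting for invoke().
# Each yielded item maps a node name to the state delta it produced.
for update in app.stream(initial_state):
    print(update)
```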
Advanced Tool Patterns
Hybrid Search
Combine embeddings with structured filters:
```python
from fenic.core.types import FloatType

products_df = session.read.csv("./data/products.csv")

products_with_embeddings = products_df.select(
    fc.col("product_id"),
    fc.col("name"),
    fc.col("description"),
    fc.col("price"),
    fc.col("category"),
    fc.semantic.embed(fc.col("description")).alias("desc_embeddings")
)

hybrid_search = products_with_embeddings.filter(
    (fc.col("category") == fc.tool_param("category", StringType)) &
    (fc.col("price").between(
        fc.tool_param("min_price", FloatType),
        fc.tool_param("max_price", FloatType)
    ))
).with_column(
    "similarity_score",
    fc.embedding.compute_similarity(
        fc.col("desc_embeddings"),
        fc.tool_param("query_embedding", fc.col("desc_embeddings").data_type),
        metric="cosine"
    )
).filter(
    fc.col("similarity_score") > 0.6
).order_by(
    fc.col("similarity_score").desc()
)

session.catalog.create_tool(
    tool_name="hybrid_product_search",
    tool_description="Search products using category, price range, and semantic similarity",
    tool_query=hybrid_search,
    tool_params=[
        ToolParam(name="category", description="Product category"),
        ToolParam(name="min_price", description="Minimum price in USD"),
        ToolParam(name="max_price", description="Maximum price in USD"),
        ToolParam(name="query_embedding", description="Search query embedding vector")
    ],
    result_limit=20
)
```
Async Data Enrichment
Process data with async UDFs for parallel API calls:
```python
import aiohttp
from fenic.api.functions import async_udf
from fenic.core.types import StructType, StructField, StringType, FloatType

@async_udf(
    return_type=StructType([
        StructField("sentiment", StringType),
        StructField("confidence", FloatType)
    ]),
    max_concurrency=15,
    timeout_seconds=5,
    num_retries=2
)
async def analyze_sentiment(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/sentiment",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "sentiment": data["label"],
                "confidence": data["score"]
            }

enriched_df = reviews_df.select(
    fc.col("review_id"),
    fc.col("review_text"),
    analyze_sentiment(fc.col("review_text")).alias("sentiment_analysis")
)
```
Async UDFs provide:
- Bounded concurrency with `max_concurrency`
- Automatic retries with `num_retries`
- Timeout protection with `timeout_seconds`
- Ordered results matching input row order
System Tools
Auto-generate tools for data exploration:
```python
from fenic.api.mcp.tools import SystemToolConfig

products_df.write.save_as_table("products", mode="overwrite")
customers_df.write.save_as_table("customers", mode="overwrite")

session.catalog.set_table_description(
    "products",
    "Product catalog with descriptions, pricing, and availability"
)

server = create_mcp_server(
    session=session,
    server_name="AutomatedToolServer",
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="data",
        max_result_rows=100
    )
)
```
This generates tools for schema inspection, data profiling, regex search, and SQL analysis.
Optimize Performance
Cache Intermediate Results
Save expensive computations:
```python
embeddings_df = documents_df.select(
    fc.col("doc_id"),
    fc.semantic.embed(fc.col("content")).alias("embeddings")
)

embeddings_df.write.save_as_table("document_embeddings", mode="overwrite")

cached_embeddings = session.table("document_embeddings")
```
Process in Batches
Control memory and API usage:
```python
large_df = session.read.csv("./data/large_dataset.csv")

batch_size = 1000
offset = 0

while True:
    batch = large_df.limit(batch_size).offset(offset)
    processed_batch = batch.select(
        fc.col("id"),
        fc.semantic.extract(fc.col("text"), response_format=Schema)
    )
    processed_batch.write.save_as_table(
        "processed_results",
        mode="append"
    )
    if batch.count() < batch_size:
        break
    offset += batch_size
```
Configure Rate Limits
Prevent API throttling:
```python
config = SessionConfig(
    semantic=SemanticConfig(
        language_models={
            "fast": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=500000
            ),
            "accurate": OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=50,
                tpm=100000
            )
        },
        default_language_model="fast"
    )
)
```
Monitor Costs
Track query performance and costs:
```python
from fenic.core.types import TimestampType

metrics_df = session.table("fenic_system.query_metrics")

recent_queries = metrics_df.select(
    fc.col("query_id"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests"),
    fc.col("end_ts")
).order_by(fc.col("end_ts").desc()).limit(10)

recent_queries.show()

cost_analysis = metrics_df.select(
    fc.dt.date_trunc(fc.col("end_ts").cast(TimestampType), "hour").alias("hour"),
    fc.col("total_lm_cost"),
    fc.col("total_lm_requests")
).group_by("hour").agg(
    fc.sum("total_lm_cost").alias("total_cost"),
    fc.sum("total_lm_requests").alias("total_requests")
).order_by(fc.col("hour").desc())

cost_analysis.show()
```
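On top of this table, a simple budget guardrail reuses the same DataFrame operations; the threshold below is an illustrative choice, not a Fenic feature:

```python
# Sketch: flag hours where LM spend exceeded an illustrative budget.
BUDGET_PER_HOUR_USD = 5.0
cost_analysis.filter(
    fc.col("total_cost") > BUDGET_PER_HOUR_USD
).show()
```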
Production Best Practices
Data Preparation
Preprocess data once, query many times. Cache intermediate results that multiple tools use:
```python
processed_df = (
    raw_df
    .select(fc.semantic.extract(fc.col("content"), TicketInfo))
    .persist()
)
```
Tool Design
Keep each tool focused on one operation. Document tools thoroughly:
```python
session.catalog.set_table_description(
    "support_tickets",
    "Customer support tickets from 2023-2025. Includes ticket ID, category, "
    "priority, customer info, and resolution status."
)
```
Validate inputs with `allowed_values`:
```python
ToolParam(
    name="category",
    description="Category to search",
    allowed_values=["Account Access", "Billing Issue", "Technical Problem"]
)
```
Error Handling
Handle errors at every layer:
```python
from fenic.core.error import ExecutionError, ValidationError

try:
    session.catalog.create_tool(
        tool_name="search_tickets",
        tool_description="Search support tickets",
        tool_query=search_df,
        tool_params=[ToolParam(name="category", description="Category to search")]
    )
except ValidationError as e:
    print(f"Tool configuration error: {e}")
except ExecutionError as e:
    print(f"Tool execution failed: {e}")
```
Orchestration
Implement circuit breakers for failed tools:
```python
def should_retry_tool(state: AgentState):
    consecutive_failures = state.get("consecutive_failures", 0)
    if consecutive_failures >= 3:
        return "fallback"
    return "retry"
```
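A minimal sketch of wiring this check into the earlier LangGraph workflow, replacing the direct tools-to-agent edge. The `fallback` node and the `consecutive_failures` counter in `AgentState` are assumptions, not part of the earlier example:

```python
# Sketch: route tool results through the circuit breaker. Assumes AgentState
# gains a `consecutive_failures: int` field that execute_tools increments on
# error and resets on success.
from langchain_core.messages import AIMessage

def fallback_node(state: AgentState):
    return {"messages": [AIMessage(
        content="Tool repeatedly failed; responding from available context."
    )]}

workflow.add_node("fallback", fallback_node)
workflow.add_conditional_edges(
    "tools",
    should_retry_tool,
    {"retry": "agent", "fallback": "fallback"}
)
workflow.add_edge("fallback", END)
```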
Use stateless HTTP for horizontal scaling:
```python
server = create_mcp_server(
    session=session,
    server_name="ScalableServer",
    user_defined_tools=tools,
    concurrency_limit=20
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000
)
```
Separate batch preprocessing from real-time agent execution. Use Fenic for heavy batch processing, then let agents query the results.
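A sketch of that split, reusing only operations shown earlier in this guide; the job, table, and tool names here are hypothetical, and the `Feedback` schema is the one defined in the complete example below:

```python
# Sketch: heavy semantic work runs in a scheduled batch job (names are
# illustrative), writing results to a table once per night.
def nightly_feedback_job():
    raw = session.read.docs("./feedback/**/*.md", content_type="markdown", recursive=True)
    enriched = raw.select(
        fc.col("file_path"),
        fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
    )
    enriched.write.save_as_table("feedback_enriched", mode="overwrite")

# Agent-facing tools only read the precomputed table -- no LLM calls at
# request time.
topic_query = session.table("feedback_enriched").filter(
    fc.col("analysis")["main_topic"] == fc.tool_param("topic", StringType)
)
session.catalog.create_tool(
    tool_name="feedback_by_topic",
    tool_description="Retrieve precomputed feedback analysis for a topic",
    tool_query=topic_query,
    tool_params=[ToolParam(name="topic", description="Topic to match exactly")],
    result_limit=25
)
```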
Complete Integration Example
```python
from fenic.api.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

# Configure session
config = SessionConfig(
    app_name="customer_intelligence",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        }
    )
)

session = Session.get_or_create(config)

# Load data
customers_df = session.read.csv("./data/customers.csv")
feedback_df = session.read.docs(
    "./feedback/**/*.md",
    content_type="markdown",
    recursive=True
)

# Define schema
class Feedback(BaseModel):
    sentiment: str = Field(description="Sentiment: positive, negative, or neutral")
    main_topic: str = Field(description="Primary topic discussed")
    action_items: List[str] = Field(description="Suggested action items")

# Process feedback
processed_feedback = feedback_df.select(
    fc.col("file_path"),
    fc.semantic.extract(fc.col("content"), response_format=Feedback).alias("analysis")
)

# Create customer search tool
search_customers_query = customers_df.filter(
    fc.col("segment").contains(fc.tool_param("segment", StringType))
).select(
    fc.col("customer_id"),
    fc.col("company_name"),
    fc.col("segment"),
    fc.col("annual_revenue")
)

session.catalog.create_tool(
    tool_name="search_customers_by_segment",
    tool_description="Find customers in a specific business segment",
    tool_query=search_customers_query,
    tool_params=[
        ToolParam(
            name="segment",
            description="Business segment (e.g., 'enterprise', 'mid-market', 'smb')"
        )
    ],
    result_limit=50
)

# Create feedback analysis tool
feedback_analysis_query = processed_feedback.filter(
    fc.col("analysis")["sentiment"] == fc.tool_param("sentiment_filter", StringType)
)

session.catalog.create_tool(
    tool_name="analyze_feedback_by_sentiment",
    tool_description="Retrieve customer feedback filtered by sentiment",
    tool_query=feedback_analysis_query,
    tool_params=[
        ToolParam(
            name="sentiment_filter",
            description="Filter by sentiment: positive, negative, or neutral"
        )
    ],
    result_limit=25
)

# Deploy MCP server
tools = session.catalog.list_tools()

server = create_mcp_server(
    session=session,
    server_name="CustomerIntelligenceServer",
    user_defined_tools=tools,
    concurrency_limit=10
)

run_mcp_server_sync(
    server=server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
Key Takeaways
Separate data preparation from agent reasoning:
- Use semantic operations to extract, classify, cluster, and join data before agents access it
- Create type-safe tools with `tool_param` for validation
- Deploy tools via MCP servers that LangChain agents call directly
- Cache embeddings and batch process large datasets
- Monitor costs through Fenic's metrics system
- Use system tools for automatic schema inspection and SQL analysis
This separation improves reliability, reduces costs, and makes agent systems maintainable at scale.
Resources
- Install Fenic: `pip install fenic`
- GitHub Repository: Sample implementations and examples
- Discord Community: Get support and share feedback
- Orchestrating Reliable Agents: Production patterns with LangGraph
- Building Agentic Applications: Declarative DataFrame patterns
- Fenic 0.4.0 Release: Declarative tools and MCP features
- Enhance LangChain Agents: Semantic data pipeline integration