Multi-agent systems face a critical challenge: ensuring all agents work from consistent, synchronized data. When different agents maintain separate data stores, coordination breaks down. Tasks duplicate, context fragments, and teams lose the ability to reason about shared state.
Typedef's catalog system solves this by providing a persistent, queryable data layer that serves as the single source of truth for your entire agent team. Combined with CrewAI's task orchestration, you get coordinated multi-agent workflows where every agent reads from and writes to the same verified data foundation.
This guide demonstrates how to build CrewAI agent teams that stay synchronized through Typedef's catalog architecture.
Why Agent Teams Need a Single Source of Truth
Multi-agent systems typically fail when:
- Agents maintain independent memory stores with no synchronization
- Context from one agent's work never reaches teammates
- Multiple agents process the same data repeatedly without coordination
- Teams lack visibility into what work has been completed
- Data inconsistencies accumulate across agent interactions
A single source of truth provides:
Centralized state management - All agents query and update the same persistent catalog tables, eliminating data drift between team members.
Type-safe data contracts - Schema-driven tables ensure every agent works with validated, consistent data structures across the entire workflow.
Audit trails - Every catalog operation is tracked, creating complete lineage for debugging multi-agent interactions.
Efficient data reuse - Agents access preprocessed results from teammates rather than recomputing expensive operations.
Cross-session persistence - Work continues seamlessly across agent restarts, deployments, and workflow interruptions.
Platform Architecture Overview
The coordination architecture consists of three layers working together:
Data Layer (Typedef/Fenic) - Persistent catalog stores all shared state as typed DataFrame tables. Semantic operators enable AI-native transformations on this data.
Tool Layer (MCP Server) - Model Context Protocol server exposes catalog operations as callable tools with automatic type validation and parameter checking.
Orchestration Layer (CrewAI) - Manages agent task delegation, execution order, and result synthesis while agents interact with the shared catalog.
This separation keeps data management deterministic and traceable while allowing probabilistic reasoning in the orchestration layer.
Setting Up the Typedef Foundation
Install Fenic and configure your environment:
```bash
pip install fenic
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"
```
Create a persistent session with catalog storage:
```python
from fenic.api.session.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    AnthropicLanguageModel
)
from pathlib import Path

config = SessionConfig(
    app_name="crewai_coordination",
    db_path=Path("./crew_catalog.db"),  # Persistent local storage
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,
                tpm=100000
            ),
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100000,
                output_tpm=100000
            )
        },
        default_language_model="gpt4"
    )
)

session = Session.get_or_create(config)

# Create dedicated database for crew coordination
session.catalog.create_database("crew_workspace")
session.catalog.set_current_database("crew_workspace")
```
The db_path parameter creates a persistent catalog that survives across sessions. Multiple agents can connect to the same catalog file, creating the shared coordination layer.
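If a second agent process needs to attach to the same catalog, it can call Session.get_or_create with an identical configuration. A minimal sketch, assuming get_or_create attaches to the existing catalog file rather than recreating it:

```python
# In a separate agent process: rebuild the same SessionConfig
# (same app_name and db_path as above), then attach to the shared catalog.
worker_session = Session.get_or_create(config)
worker_session.catalog.set_current_database("crew_workspace")

# The worker sees the same tables the coordinator created
print(worker_session.catalog.list_tables())
```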
Creating Shared Data Tables
Define tables that serve as coordination points for your agent team:
```python
from fenic.core.types import (
    StringType,
    IntegerType,
    TimestampType,
    BooleanType,
    Schema,
    ColumnField
)
from datetime import datetime

# Task registry - tracks all work items
task_schema = Schema([
    ColumnField("task_id", StringType),
    ColumnField("task_type", StringType),
    ColumnField("description", StringType),
    ColumnField("assigned_agent", StringType),
    ColumnField("status", StringType),  # pending, in_progress, completed, failed
    ColumnField("priority", IntegerType),
    ColumnField("created_at", TimestampType),
    ColumnField("completed_at", TimestampType),
    ColumnField("result_data", StringType)
])

session.catalog.create_table(
    "task_registry",
    task_schema,
    description="Central registry of all agent tasks with status tracking"
)

# Research findings - shared knowledge base
findings_schema = Schema([
    ColumnField("finding_id", StringType),
    ColumnField("source_agent", StringType),
    ColumnField("topic", StringType),
    ColumnField("content", StringType),
    ColumnField("confidence_score", IntegerType),
    ColumnField("created_at", TimestampType),
    ColumnField("verified", BooleanType)
])

session.catalog.create_table(
    "research_findings",
    findings_schema,
    description="Consolidated research results from all research agents"
)

# Agent communication log - coordination messages
messages_schema = Schema([
    ColumnField("message_id", StringType),
    ColumnField("from_agent", StringType),
    ColumnField("to_agent", StringType),
    ColumnField("message_type", StringType),
    ColumnField("content", StringType),
    ColumnField("timestamp", TimestampType)
])

session.catalog.create_table(
    "agent_messages",
    messages_schema,
    description="Inter-agent communication and coordination messages"
)
```
These tables form the coordination backbone. Every agent reads task assignments, writes findings, and logs coordination messages to these shared tables.
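As a concrete example for the third table, an agent can log a handoff message using the same DataFrame-append pattern used for tasks later in this guide (a sketch; the helper name and message values are illustrative):

```python
import uuid
from datetime import datetime

def log_agent_message(from_agent: str, to_agent: str, message_type: str, content: str):
    """Append a coordination message to the shared agent_messages table."""
    message_df = session.create_dataframe({
        "message_id": [str(uuid.uuid4())],
        "from_agent": [from_agent],
        "to_agent": [to_agent],
        "message_type": [message_type],
        "content": [content],
        "timestamp": [datetime.now()]
    })
    message_df.write.save_as_table("agent_messages", mode="append")

log_agent_message(
    "research_agent_1",
    "analysis_agent",
    "handoff",
    "AI safety research complete; findings stored under topic 'AI Safety'"
)
```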
Building Catalog-Backed Tools
Create tools that expose catalog operations to your CrewAI agents:
```python
import fenic.api.functions as fc
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Tool: Claim a pending task
claim_task = (
    session.table("task_registry")
    .filter(
        (fc.col("status") == "pending") &
        (fc.col("task_type") == fc.tool_param("task_type", StringType))
    )
    .sort(fc.col("priority").desc())
    .limit(1)
)

session.catalog.create_tool(
    tool_name="claim_next_task",
    tool_description="Claims the highest priority pending task of specified type",
    tool_query=claim_task,
    result_limit=1,
    tool_params=[
        ToolParam(
            name="task_type",
            description="Type of task to claim",
            allowed_values=["research", "analysis", "writing", "review"]
        )
    ]
)

# Tool: Store research findings
store_finding = session.table("research_findings")

session.catalog.create_tool(
    tool_name="store_research_finding",
    tool_description="Stores a research finding in the shared knowledge base",
    tool_query=store_finding,
    result_limit=10,
    tool_params=[
        ToolParam(name="finding_id", description="Unique identifier for this finding"),
        ToolParam(name="source_agent", description="Name of the agent storing this finding"),
        ToolParam(name="topic", description="Research topic or category"),
        ToolParam(name="content", description="The research finding content"),
        ToolParam(name="confidence_score", description="Confidence level 1-10")
    ]
)

# Tool: Search existing findings
search_findings = (
    session.table("research_findings")
    .filter(
        fc.col("topic").contains(fc.tool_param("search_term", StringType)) |
        fc.col("content").contains(fc.tool_param("search_term", StringType))
    )
    .filter(fc.col("verified") == True)
    .sort(fc.col("confidence_score").desc())
)

session.catalog.create_tool(
    tool_name="search_research_findings",
    tool_description="Searches verified research findings by topic or content",
    tool_query=search_findings,
    result_limit=20,
    tool_params=[
        ToolParam(name="search_term", description="Term to search for in findings")
    ]
)

# Tool: Update task status
update_status = (
    session.table("task_registry")
    .filter(fc.col("task_id") == fc.tool_param("task_id", StringType))
)

session.catalog.create_tool(
    tool_name="update_task_status",
    tool_description="Updates the status of a specific task",
    tool_query=update_status,
    result_limit=1,
    tool_params=[
        ToolParam(name="task_id", description="ID of the task to update"),
        ToolParam(
            name="new_status",
            description="New status for the task",
            allowed_values=["pending", "in_progress", "completed", "failed"]
        )
    ]
)
```
These declarative tools eliminate boilerplate while maintaining type safety. The catalog validates parameters and enforces schemas automatically.
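To confirm registration succeeded, you can list what the catalog now holds (the same list_tools call is used to wire up the MCP server in the next section):

```python
# Sanity check: the four tools defined above should now appear in the catalog.
# The printed representation depends on the tool object's metadata.
for registered_tool in session.catalog.list_tools():
    print(registered_tool)
```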
Exposing Tools via MCP Server
Launch an MCP server to make catalog tools accessible to agents:
```python
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig

# Retrieve all catalog tools
catalog_tools = session.catalog.list_tools()

# Create MCP server with both catalog and system tools
server = create_mcp_server(
    session=session,
    server_name="CrewAI Coordination Server",
    user_defined_tools=catalog_tools,
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="crew",
        max_result_rows=100
    ),
    concurrency_limit=10
)

# Run server
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,  # Maintain state for coordination
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
For production deployments, use ASGI:
```python
from fenic.api.mcp.server import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server,
    stateless_http=False,
    port=8000,
    host="0.0.0.0",
    path="/mcp"
)

# Deploy with: uvicorn server:app --host 0.0.0.0 --port 8000 --workers 4
```
The MCP server handles concurrent tool calls from multiple agents while maintaining catalog consistency.
Building CrewAI Agents with Catalog Access
Create CrewAI agents that interact with the shared catalog through custom tools:
```python
from typing import Any, Dict

import requests
from crewai import Agent, Task, Crew
from crewai_tools import tool

# Wrapper function for MCP tool calls
def call_mcp_tool(tool_name: str, params: Dict[str, Any]) -> Any:
    """Call an MCP tool on the Typedef server."""
    response = requests.post(
        "http://127.0.0.1:8000/mcp/tools/call_tool",
        json={
            "tool_name": tool_name,
            "arguments": params
        },
        timeout=30
    )
    response.raise_for_status()
    return response.json()

# CrewAI tool: Claim task
@tool("Claim Next Task")
def claim_task_tool(task_type: str) -> str:
    """
    Claims the next available task of the specified type from the task registry.

    Args:
        task_type: Type of task to claim (research, analysis, writing, review)

    Returns:
        Task details as JSON string
    """
    result = call_mcp_tool("claim_next_task", {"task_type": task_type})
    return str(result)

# CrewAI tool: Store finding
@tool("Store Research Finding")
def store_finding_tool(
    finding_id: str,
    source_agent: str,
    topic: str,
    content: str,
    confidence_score: int
) -> str:
    """
    Stores a research finding in the shared knowledge base.

    Args:
        finding_id: Unique identifier for the finding
        source_agent: Name of the agent storing the finding
        topic: Research topic or category
        content: The finding content
        confidence_score: Confidence level 1-10

    Returns:
        Confirmation message
    """
    call_mcp_tool("store_research_finding", {
        "finding_id": finding_id,
        "source_agent": source_agent,
        "topic": topic,
        "content": content,
        "confidence_score": confidence_score
    })
    return "Finding stored successfully"

# CrewAI tool: Search findings
@tool("Search Research Findings")
def search_findings_tool(search_term: str) -> str:
    """
    Searches the shared knowledge base for relevant research findings.

    Args:
        search_term: Term to search for in findings

    Returns:
        List of matching findings as JSON string
    """
    result = call_mcp_tool("search_research_findings", {
        "search_term": search_term
    })
    return str(result)

# Define specialized agents
research_agent = Agent(
    role="Research Specialist",
    goal="Conduct thorough research and store findings in shared catalog",
    backstory="""You are an expert researcher who systematically gathers
    information and stores findings in the team's shared knowledge base.
    You always check existing findings before starting new research.""",
    tools=[claim_task_tool, store_finding_tool, search_findings_tool],
    verbose=True,
    allow_delegation=False
)

analysis_agent = Agent(
    role="Data Analyst",
    goal="Analyze research findings and identify patterns",
    backstory="""You are a data analyst who reviews research findings from
    the shared catalog and identifies patterns and insights. You build on
    the work of your research teammates.""",
    tools=[search_findings_tool, store_finding_tool],
    verbose=True,
    allow_delegation=False
)

synthesis_agent = Agent(
    role="Content Synthesizer",
    goal="Create comprehensive reports from team findings",
    backstory="""You are a skilled writer who synthesizes research and
    analysis from the shared catalog into clear, actionable reports.
    You ensure all team findings are properly incorporated.""",
    tools=[search_findings_tool],
    verbose=True,
    allow_delegation=False
)
```
Because all three agents read and write through the same catalog tools, they always operate on the same shared state.
Implementing Coordination Patterns
Pattern 1: Task Queue Coordination
Multiple agents claim tasks from a shared queue:
```python
import uuid
from datetime import datetime

# Initialize task queue
tasks_df = session.create_dataframe({
    "task_id": [str(uuid.uuid4()) for _ in range(5)],
    "task_type": ["research", "research", "analysis", "writing", "review"],
    "description": [
        "Research AI safety frameworks",
        "Research ethical AI guidelines",
        "Analyze research findings on AI safety",
        "Write summary of AI ethics landscape",
        "Review final report"
    ],
    "assigned_agent": [None] * 5,
    "status": ["pending"] * 5,
    "priority": [10, 8, 6, 4, 2],
    "created_at": [datetime.now()] * 5,
    "completed_at": [None] * 5,
    "result_data": [None] * 5
})

tasks_df.write.save_as_table("task_registry", mode="append")

# Define task claiming workflow
research_task = Task(
    description="""
    1. Claim the next research task using claim_task_tool
    2. Conduct the research
    3. Store findings using store_finding_tool
    4. Update task status to completed
    """,
    agent=research_agent,
    expected_output="Research findings stored in shared catalog"
)

analysis_task = Task(
    description="""
    1. Wait for research tasks to complete
    2. Search research findings on relevant topics
    3. Analyze patterns and insights
    4. Store analysis results
    """,
    agent=analysis_agent,
    expected_output="Analysis results stored in shared catalog"
)

# Create crew with task dependencies
crew = Crew(
    agents=[research_agent, analysis_agent, synthesis_agent],
    tasks=[research_task, analysis_task],
    verbose=True
)

result = crew.kickoff()
```
Pattern 2: Knowledge Sharing Between Agents
Agents build on each other's work through the shared catalog:
```python
# Agent 1 stores initial finding
def research_workflow():
    # Search for existing work first
    existing = call_mcp_tool("search_research_findings", {
        "search_term": "AI safety"
    })

    if not existing:
        # No existing research, conduct new research
        finding_id = str(uuid.uuid4())
        call_mcp_tool("store_research_finding", {
            "finding_id": finding_id,
            "source_agent": "research_agent_1",
            "topic": "AI Safety",
            "content": "Research finding content here...",
            "confidence_score": 8
        })
        return finding_id
    else:
        # Build on existing research
        return existing

# Agent 2 accesses and enhances the finding
def analysis_workflow(research_topic: str):
    # Retrieve findings from catalog
    findings = call_mcp_tool("search_research_findings", {
        "search_term": research_topic
    })

    # Analyze and store enhanced finding
    call_mcp_tool("store_research_finding", {
        "finding_id": str(uuid.uuid4()),
        "source_agent": "analysis_agent",
        "topic": research_topic,
        "content": f"Analysis based on previous findings: {findings}",
        "confidence_score": 9
    })
```
Pattern 3: Semantic Processing with Shared Context
Use Fenic's semantic operators to enrich shared data:
```python
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

class EnrichedFinding(BaseModel):
    summary: str = Field(description="One-sentence summary")
    key_points: List[str] = Field(description="Key points")
    related_topics: List[str] = Field(description="Related research areas")
    action_items: List[str] = Field(description="Suggested next steps")

# Enrich findings with semantic extraction
findings_df = session.table("research_findings")

enriched = findings_df.select(
    fc.col("*"),
    fc.semantic.extract(
        fc.col("content"),
        EnrichedFinding,
        model_alias="gpt4"
    ).alias("enriched_data")
).unnest("enriched_data")

# Store enriched findings back to catalog
enriched.write.save_as_table("enriched_findings", mode="overwrite")

session.catalog.set_table_description(
    "enriched_findings",
    "Research findings with AI-extracted summaries and action items"
)
```
Now all agents can query structured, enriched findings rather than raw text.
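For example, a downstream agent can select just the extracted fields instead of re-parsing raw content (a sketch, assuming unnest exposes the EnrichedFinding fields as top-level columns, as in the snippet above):

```python
# Downstream agents read structured fields instead of raw text
ai_safety_summaries = (
    session.table("enriched_findings")
    .filter(fc.col("topic").contains("AI Safety"))
    .select(
        fc.col("topic"),
        fc.col("summary"),
        fc.col("action_items")
    )
)

ai_safety_summaries.show()
```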
Advanced Coordination Strategies
Multi-Stage Agent Pipeline
Chain multiple agent teams with catalog handoffs:
```python
# Stage 1: Research team populates initial findings
research_crew = Crew(
    agents=[research_agent],
    tasks=[research_task],
    process="sequential"
)
research_result = research_crew.kickoff()

# Stage 2: Analysis team processes research findings
analysis_task = Task(
    description="""
    Query research_findings table for new entries since last run.
    Analyze patterns and store insights in analysis_results table.
    """,
    agent=analysis_agent,
    expected_output="Analysis complete with insights stored"
)

analysis_crew = Crew(
    agents=[analysis_agent],
    tasks=[analysis_task],
    process="sequential"
)
analysis_result = analysis_crew.kickoff()

# Stage 3: Synthesis team creates final output
synthesis_task = Task(
    description="""
    Retrieve all research findings and analysis results.
    Create comprehensive report synthesizing team's work.
    """,
    agent=synthesis_agent,
    expected_output="Final report incorporating all team findings"
)

synthesis_crew = Crew(
    agents=[synthesis_agent],
    tasks=[synthesis_task],
    process="sequential"
)
final_result = synthesis_crew.kickoff()
```
Real-Time Status Monitoring
Track agent progress through catalog queries:
```python
# Monitor task completion rates
task_status = session.sql("""
    SELECT
        status,
        COUNT(*) AS task_count,
        AVG(
            CASE
                WHEN completed_at IS NOT NULL AND created_at IS NOT NULL
                THEN EXTRACT(EPOCH FROM (completed_at - created_at))
                ELSE NULL
            END
        ) AS avg_completion_seconds
    FROM {tasks}
    GROUP BY status
""", tasks=session.table("task_registry"))

task_status.show()

# Track agent productivity
agent_metrics = session.sql("""
    SELECT
        source_agent,
        COUNT(*) AS findings_contributed,
        AVG(confidence_score) AS avg_confidence,
        COUNT(DISTINCT topic) AS topics_covered
    FROM {findings}
    GROUP BY source_agent
    ORDER BY findings_contributed DESC
""", findings=session.table("research_findings"))

agent_metrics.show()
```
Async Tool Execution for External APIs
Integrate external data sources with catalog coordination:
```python
import aiohttp
import fenic.api.functions as fc
from fenic.api.functions.builtin import async_udf
from fenic.core.types import StructType, StructField, StringType, IntegerType

@async_udf(
    return_type=StructType([
        StructField("external_data", StringType),
        StructField("status_code", IntegerType)
    ]),
    max_concurrency=20,
    timeout_seconds=10,
    num_retries=3
)
async def fetch_external_context(topic: str) -> dict:
    """Fetch external context for research topics."""
    # Local HTTP session; distinct from the fenic catalog session
    async with aiohttp.ClientSession() as http_session:
        async with http_session.get(
            f"https://api.example.com/research?topic={topic}"
        ) as resp:
            return {
                "external_data": await resp.text(),
                "status_code": resp.status
            }

# Enrich catalog data with external sources
enriched_research = (
    session.table("research_findings")
    .select(
        fc.col("*"),
        fetch_external_context(fc.col("topic")).alias("external_context")
    )
    .unnest("external_context")
)

enriched_research.write.save_as_table(
    "externally_enriched_research",
    mode="overwrite"
)
```
Monitoring and Performance Optimization
Track Catalog Operations
Use built-in metrics to monitor agent coordination:
```python
import fenic.api.functions as fc

# Query metrics for all catalog operations
metrics_df = session.table("fenic_system.query_metrics")

# Analyze tool call patterns
tool_usage = metrics_df.select(
    fc.col("tool_name"),
    fc.col("total_lm_requests"),
    fc.col("total_lm_cost"),
    fc.col("latency_ms")
).filter(
    fc.col("tool_name").is_not_null()
).group_by("tool_name").agg(
    fc.count("*").alias("call_count"),
    fc.avg("latency_ms").alias("avg_latency"),
    fc.sum("total_lm_cost").alias("total_cost")
).order_by("call_count", ascending=False)

tool_usage.show()

# Monitor coordination bottlenecks
slow_operations = metrics_df.filter(
    fc.col("latency_ms") > 5000
).select(
    fc.col("query_text"),
    fc.col("latency_ms"),
    fc.col("end_ts")
).order_by("latency_ms", ascending=False)

slow_operations.show(10)
```
Optimize Data Access Patterns
Cache frequently accessed catalog data:
```python
# Cache hot data for faster agent access
frequently_accessed = (
    session.table("research_findings")
    .filter(fc.col("verified") == True)
    .filter(fc.col("confidence_score") >= 8)
    .cache()  # Keep in memory
)

# Use cached data in agent workflows
high_confidence_findings = frequently_accessed.filter(
    fc.col("topic").contains("AI safety")
)
```
Implement Circuit Breakers
Prevent cascade failures in multi-agent systems:
```python
import time
from datetime import datetime

def safe_catalog_operation(operation_func, max_retries=3):
    """Execute catalog operations with retry logic."""
    for attempt in range(max_retries):
        try:
            return operation_func()
        except Exception as e:
            if attempt == max_retries - 1:
                # Log the final failure to the catalog before re-raising
                error_df = session.create_dataframe({
                    "operation": [operation_func.__name__],
                    "error": [str(e)],
                    "timestamp": [datetime.now()],
                    "attempt": [attempt + 1]
                })
                error_df.write.save_as_table(
                    "operation_errors",
                    mode="append"
                )
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
```
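Usage is a one-line wrap around any catalog or tool call, for example:

```python
# Retry a flaky MCP tool call with exponential backoff
result = safe_catalog_operation(
    lambda: call_mcp_tool("claim_next_task", {"task_type": "research"})
)
```

Note that a lambda is logged as `<lambda>` in the operation_errors table; pass a named function or functools.partial if you want a meaningful operation name in the log.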
Best Practices for Production Deployment
Schema evolution management - Version catalog tables as your agent system evolves. Create new tables rather than modifying schemas in production.
```python
# Version your schemas
session.catalog.create_table(
    "research_findings_v2",
    updated_schema,
    description="Research findings with enhanced metadata (v2)"
)
```
Concurrent access control - Configure appropriate concurrency limits on your MCP server to prevent resource exhaustion.
```python
server = create_mcp_server(
    session=session,
    server_name="Production Coordination Server",
    user_defined_tools=catalog_tools,
    concurrency_limit=50  # Adjust based on load
)
```
Catalog backup and recovery - Regularly backup your catalog database:
```bash
# Backup catalog
cp crew_catalog.db crew_catalog_backup_$(date +%Y%m%d).db

# Restore if needed
cp crew_catalog_backup_20250101.db crew_catalog.db
```
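To keep old backups from piling up, add a simple rotation step (a sketch; tune retention to your recovery requirements):

```bash
# Prune backups older than 7 days
find . -maxdepth 1 -name "crew_catalog_backup_*.db" -mtime +7 -delete
```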
Tool result limits - Set appropriate limits to prevent memory issues:
```python
session.catalog.create_tool(
    tool_name="search_findings",
    tool_query=query,
    result_limit=100,  # Prevent excessive result sets
    tool_params=params
)
```
Rate limiting configuration - Configure model rate limits to prevent quota exhaustion:
```python
config = SessionConfig(
    app_name="production_crew",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,     # Requests per minute
                tpm=100000   # Tokens per minute
            )
        }
    )
)
```
Catalog maintenance - Implement cleanup routines for old data:
```python
# Archive completed tasks older than 30 days
old_tasks = session.sql("""
    SELECT * FROM {tasks}
    WHERE status = 'completed'
      AND completed_at < CURRENT_TIMESTAMP - INTERVAL '30 days'
""", tasks=session.table("task_registry"))

old_tasks.write.save_as_table("archived_tasks", mode="append")

# Remove from active registry by keeping only recent/active tasks
active_tasks = session.sql("""
    SELECT * FROM {tasks}
    WHERE status != 'completed'
       OR completed_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
""", tasks=session.table("task_registry"))

active_tasks.write.save_as_table("task_registry", mode="overwrite")
```
Conclusion
Typedef's catalog system transforms CrewAI agent teams from loosely coordinated collections into synchronized, data-driven systems. By centralizing state management through a persistent, queryable catalog, you eliminate coordination failures while maintaining full audit trails.
The combination of Fenic's semantic operations, type-safe tool creation, and MCP server integration provides the infrastructure needed for production multi-agent systems. Start with simple task queue coordination, then scale to sophisticated multi-stage pipelines as your agent team grows.
For teams building production agent systems, explore Fenic's open-source framework or learn more about Typedef's platform.

