
How to Coordinate CrewAI Agent Teams with a Single Source of Truth on Typedef

Typedef Team

Multi-agent systems face a critical challenge: ensuring all agents work from consistent, synchronized data. When different agents maintain separate data stores, coordination breaks down. Tasks duplicate, context fragments, and teams lose the ability to reason about shared state.

Typedef's catalog system solves this by providing a persistent, queryable data layer that serves as the single source of truth for your entire agent team. Combined with CrewAI's task orchestration, you get coordinated multi-agent workflows where every agent reads from and writes to the same verified data foundation.

This guide demonstrates how to build CrewAI agent teams that stay synchronized through Typedef's catalog architecture.

Why Agent Teams Need a Single Source of Truth

Multi-agent systems typically fail when:

  • Agents maintain independent memory stores with no synchronization
  • Context from one agent's work never reaches teammates
  • Multiple agents process the same data repeatedly without coordination
  • Teams lack visibility into what work has been completed
  • Data inconsistencies accumulate across agent interactions

A single source of truth provides:

Centralized state management - All agents query and update the same persistent catalog tables, eliminating data drift between team members.

Type-safe data contracts - Schema-driven tables ensure every agent works with validated, consistent data structures across the entire workflow.

Audit trails - Every catalog operation is tracked, creating complete lineage for debugging multi-agent interactions.

Efficient data reuse - Agents access preprocessed results from teammates rather than recomputing expensive operations.

Cross-session persistence - Work continues seamlessly across agent restarts, deployments, and workflow interruptions.

Platform Architecture Overview

The coordination architecture consists of three layers working together:

Data Layer (Typedef/Fenic) - Persistent catalog stores all shared state as typed DataFrame tables. Semantic operators enable AI-native transformations on this data.

Tool Layer (MCP Server) - Model Context Protocol server exposes catalog operations as callable tools with automatic type validation and parameter checking.

Orchestration Layer (CrewAI) - Manages agent task delegation, execution order, and result synthesis while agents interact with the shared catalog.

This separation keeps data management deterministic and traceable while allowing probabilistic reasoning in the orchestration layer.
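
The layering can be illustrated without any of the real libraries. The sketch below is a stand-in only: Python's sqlite3 plays the data layer, two plain functions play the tool layer, and a small driver plays the orchestration layer. The function names (store_finding, search_findings, run_pipeline) and the table layout are assumptions made for illustration, not Fenic or CrewAI APIs.

```python
import sqlite3

# Data layer: one shared store (sqlite3 stands in for the Typedef catalog).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE findings (agent TEXT, topic TEXT, content TEXT)")

# Tool layer: the only path agents use to touch data, with parameter checks.
def store_finding(agent: str, topic: str, content: str) -> None:
    if not all(isinstance(v, str) and v for v in (agent, topic, content)):
        raise ValueError("agent, topic, and content must be non-empty strings")
    conn.execute("INSERT INTO findings VALUES (?, ?, ?)", (agent, topic, content))

def search_findings(term: str) -> list[tuple]:
    pattern = f"%{term}%"
    return conn.execute(
        "SELECT agent, topic, content FROM findings "
        "WHERE topic LIKE ? OR content LIKE ?",
        (pattern, pattern),
    ).fetchall()

# Orchestration layer: decides who runs when; agents only call tools.
def run_pipeline() -> list[tuple]:
    store_finding("researcher", "AI safety", "Red-teaming catches failure modes early")
    return search_findings("safety")  # the analyst reads what the researcher wrote

results = run_pipeline()
```

Because every read and write goes through the tool functions, the orchestration code never needs to know how the data is stored.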

Setting Up the Typedef Foundation

Install Fenic and configure your environment:

bash
pip install fenic
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"

Create a persistent session with catalog storage:

python
from fenic.api.session.session import Session
from fenic.api.session.config import (
    SessionConfig,
    SemanticConfig,
    OpenAILanguageModel,
    AnthropicLanguageModel
)
from pathlib import Path

config = SessionConfig(
    app_name="crewai_coordination",
    db_path=Path("./crew_catalog.db"),  # Persistent local storage
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,
                tpm=100000
            ),
            "claude": AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=100,
                input_tpm=100000,
                output_tpm=100000
            )
        },
        default_language_model="gpt4"
    )
)

session = Session.get_or_create(config)

# Create dedicated database for crew coordination
session.catalog.create_database("crew_workspace")
session.catalog.set_current_database("crew_workspace")

The db_path parameter creates a persistent catalog that survives across sessions. Multiple agents can connect to the same catalog file, creating the shared coordination layer.

Creating Shared Data Tables

Define tables that serve as coordination points for your agent team:

python
from fenic.core.types import (
    StringType, IntegerType, TimestampType,
    BooleanType, Schema, ColumnField
)
from datetime import datetime

# Task registry - tracks all work items
task_schema = Schema([
    ColumnField("task_id", StringType),
    ColumnField("task_type", StringType),
    ColumnField("description", StringType),
    ColumnField("assigned_agent", StringType),
    ColumnField("status", StringType),  # pending, in_progress, completed, failed
    ColumnField("priority", IntegerType),
    ColumnField("created_at", TimestampType),
    ColumnField("completed_at", TimestampType),
    ColumnField("result_data", StringType)
])

session.catalog.create_table(
    "task_registry",
    task_schema,
    description="Central registry of all agent tasks with status tracking"
)

# Research findings - shared knowledge base
findings_schema = Schema([
    ColumnField("finding_id", StringType),
    ColumnField("source_agent", StringType),
    ColumnField("topic", StringType),
    ColumnField("content", StringType),
    ColumnField("confidence_score", IntegerType),
    ColumnField("created_at", TimestampType),
    ColumnField("verified", BooleanType)
])

session.catalog.create_table(
    "research_findings",
    findings_schema,
    description="Consolidated research results from all research agents"
)

# Agent communication log - coordination messages
messages_schema = Schema([
    ColumnField("message_id", StringType),
    ColumnField("from_agent", StringType),
    ColumnField("to_agent", StringType),
    ColumnField("message_type", StringType),
    ColumnField("content", StringType),
    ColumnField("timestamp", TimestampType)
])

session.catalog.create_table(
    "agent_messages",
    messages_schema,
    description="Inter-agent communication and coordination messages"
)

These tables form the coordination backbone. Every agent reads task assignments, writes findings, and logs coordination messages to these shared tables.
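
A schema constrains column types, but agents also rely on conventions such as the allowed status values noted in the comment above. A minimal sketch of checking a task row before it is written, assuming a hypothetical TaskRow dataclass that mirrors task_schema (this is plain Python, not part of Fenic):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

VALID_STATUSES = {"pending", "in_progress", "completed", "failed"}

@dataclass
class TaskRow:
    """Mirror of the task_registry columns, with basic consistency checks."""
    task_id: str
    task_type: str
    description: str
    assigned_agent: Optional[str]
    status: str
    priority: int
    created_at: datetime
    completed_at: Optional[datetime] = None
    result_data: Optional[str] = None

    def __post_init__(self) -> None:
        if self.status not in VALID_STATUSES:
            raise ValueError(f"unknown status: {self.status!r}")
        if self.status == "completed" and self.completed_at is None:
            raise ValueError("completed tasks need a completed_at timestamp")
        if self.completed_at is not None and self.completed_at < self.created_at:
            raise ValueError("completed_at cannot precede created_at")

row = TaskRow(
    "t-1", "research", "Research AI safety frameworks",
    None, "pending", 10, datetime(2025, 1, 1),
)
```

Rejecting a malformed row at the edge keeps bad states out of the table that every other agent reads.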

Building Catalog-Backed Tools

Create tools that expose catalog operations to your CrewAI agents:

python
import fenic.api.functions as fc
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Tool: Claim a pending task
claim_task = (
    session.table("task_registry")
    .filter(
        (fc.col("status") == "pending") &
        (fc.col("task_type") == fc.tool_param("task_type", StringType))
    )
    .sort(fc.col("priority").desc())
    .limit(1)
)

session.catalog.create_tool(
    tool_name="claim_next_task",
    tool_description="Claims the highest priority pending task of specified type",
    tool_query=claim_task,
    result_limit=1,
    tool_params=[
        ToolParam(
            name="task_type",
            description="Type of task to claim",
            allowed_values=["research", "analysis", "writing", "review"]
        )
    ]
)

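# Tool: Store research findings
# NOTE: as written, tool_query only selects from research_findings and never
# references the declared parameters, so this definition reads rows rather than inserting one.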
store_finding = session.table("research_findings")

session.catalog.create_tool(
    tool_name="store_research_finding",
    tool_description="Stores a research finding in the shared knowledge base",
    tool_query=store_finding,
    result_limit=10,
    tool_params=[
        ToolParam(
            name="finding_id",
            description="Unique identifier for this finding"
        ),
        ToolParam(
            name="source_agent",
            description="Name of the agent storing this finding"
        ),
        ToolParam(
            name="topic",
            description="Research topic or category"
        ),
        ToolParam(
            name="content",
            description="The research finding content"
        ),
        ToolParam(
            name="confidence_score",
            description="Confidence level 1-10"
        )
    ]
)

# Tool: Search existing findings
search_findings = (
    session.table("research_findings")
    .filter(
        fc.col("topic").contains(fc.tool_param("search_term", StringType)) |
        fc.col("content").contains(fc.tool_param("search_term", StringType))
    )
    .filter(fc.col("verified") == True)
    .sort(fc.col("confidence_score").desc())
)

session.catalog.create_tool(
    tool_name="search_research_findings",
    tool_description="Searches verified research findings by topic or content",
    tool_query=search_findings,
    result_limit=20,
    tool_params=[
        ToolParam(
            name="search_term",
            description="Term to search for in findings"
        )
    ]
)

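# Tool: Update task status
# NOTE: as written, tool_query only filters by task_id; new_status is declared
# but never referenced, so this definition looks a task up rather than modifying it.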
update_status = (
    session.table("task_registry")
    .filter(fc.col("task_id") == fc.tool_param("task_id", StringType))
)

session.catalog.create_tool(
    tool_name="update_task_status",
    tool_description="Updates the status of a specific task",
    tool_query=update_status,
    result_limit=1,
    tool_params=[
        ToolParam(
            name="task_id",
            description="ID of the task to update"
        ),
        ToolParam(
            name="new_status",
            description="New status for the task",
            allowed_values=["pending", "in_progress", "completed", "failed"]
        )
    ]
)

These declarative tools eliminate boilerplate while maintaining type safety. The catalog validates parameters and enforces schemas automatically.
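
To make the parameter checking concrete, here is a rough sketch of what an allowed_values check amounts to. The validate_params helper and the spec format are hypothetical and written in plain Python; Fenic's own validation may differ in detail.

```python
from typing import Any, Optional

def validate_params(
    spec: dict[str, Optional[list[str]]], args: dict[str, Any]
) -> dict[str, Any]:
    """spec maps parameter name -> allowed values (None means any value)."""
    missing = sorted(set(spec) - set(args))
    unknown = sorted(set(args) - set(spec))
    if missing or unknown:
        raise ValueError(f"missing={missing} unknown={unknown}")
    for name, allowed in spec.items():
        if allowed is not None and args[name] not in allowed:
            raise ValueError(f"{name}={args[name]!r} not in {allowed}")
    return args

# Mirrors the task_type parameter of claim_next_task.
CLAIM_SPEC = {"task_type": ["research", "analysis", "writing", "review"]}
ok = validate_params(CLAIM_SPEC, {"task_type": "research"})
```

A call such as validate_params(CLAIM_SPEC, {"task_type": "coding"}) raises ValueError, which is the behavior an agent should see when it passes a value outside the declared set.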

Exposing Tools via MCP Server

Launch an MCP server to make catalog tools accessible to agents:

python
from fenic.api.mcp.server import create_mcp_server, run_mcp_server_sync
from fenic.api.mcp.tools import SystemToolConfig

# Retrieve all catalog tools
catalog_tools = session.catalog.list_tools()

# Create MCP server with both catalog and system tools
server = create_mcp_server(
    session=session,
    server_name="CrewAI Coordination Server",
    user_defined_tools=catalog_tools,
    system_tools=SystemToolConfig(
        table_names=session.catalog.list_tables(),
        tool_namespace="crew",
        max_result_rows=100
    ),
    concurrency_limit=10
)

# Run server
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=False,  # Maintain state for coordination
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)

For production deployments, use ASGI:

python
from fenic.api.mcp.server import run_mcp_server_asgi

app = run_mcp_server_asgi(
    server,
    stateless_http=False,
    port=8000,
    host="0.0.0.0",
    path="/mcp"
)

# Deploy with: uvicorn server:app --host 0.0.0.0 --port 8000 --workers 4

The MCP server handles concurrent tool calls from multiple agents while maintaining catalog consistency.
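
The effect of a concurrency limit can be sketched with a semaphore. This is an illustration of the general technique in plain asyncio, not a description of how the MCP server implements concurrency_limit; run_with_limit and handle_call are hypothetical names.

```python
import asyncio

async def run_with_limit(n_calls: int, limit: int) -> int:
    """Run n_calls simulated tool calls, at most `limit` at a time.

    Returns the highest number of calls observed in flight at once.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def handle_call(i: int) -> None:
        nonlocal in_flight, peak
        async with semaphore:  # callers beyond the limit wait here
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for a catalog query
            in_flight -= 1

    await asyncio.gather(*(handle_call(i) for i in range(n_calls)))
    return peak

peak = asyncio.run(run_with_limit(n_calls=25, limit=10))
```

The observed peak never exceeds the limit, so a burst of agent requests queues up instead of exhausting the server.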

Building CrewAI Agents with Catalog Access

Create CrewAI agents that interact with the shared catalog through custom tools:

python
from crewai import Agent, Task, Crew
from crewai.tools import tool  # older releases exposed this as crewai_tools.tool
import requests
from typing import Dict, Any

# Wrapper function for MCP tool calls
def call_mcp_tool(tool_name: str, params: Dict[str, Any]) -> Any:
    """Call an MCP tool on the Typedef server."""
    response = requests.post(
        "http://127.0.0.1:8000/mcp/tools/call_tool",
        json={
            "tool_name": tool_name,
            "arguments": params
        },
        timeout=30
    )
    response.raise_for_status()
    return response.json()

# CrewAI tool: Claim task
@tool("Claim Next Task")
def claim_task_tool(task_type: str) -> str:
    """
    Claims the next available task of the specified type from the task registry.

    Args:
        task_type: Type of task to claim (research, analysis, writing, review)

    Returns:
        Task details as JSON string
    """
    result = call_mcp_tool("claim_next_task", {"task_type": task_type})
    return str(result)

# CrewAI tool: Store finding
@tool("Store Research Finding")
def store_finding_tool(
    finding_id: str,
    source_agent: str,
    topic: str,
    content: str,
    confidence_score: int
) -> str:
    """
    Stores a research finding in the shared knowledge base.

    Args:
        finding_id: Unique identifier for the finding
        source_agent: Name of the agent storing the finding
        topic: Research topic or category
        content: The finding content
        confidence_score: Confidence level 1-10

    Returns:
        Confirmation message
    """
    call_mcp_tool("store_research_finding", {
        "finding_id": finding_id,
        "source_agent": source_agent,
        "topic": topic,
        "content": content,
        "confidence_score": confidence_score
    })
    return "Finding stored successfully"

# CrewAI tool: Search findings
@tool("Search Research Findings")
def search_findings_tool(search_term: str) -> str:
    """
    Searches the shared knowledge base for relevant research findings.

    Args:
        search_term: Term to search for in findings

    Returns:
        List of matching findings as JSON string
    """
    result = call_mcp_tool("search_research_findings", {
        "search_term": search_term
    })
    return str(result)

# Define specialized agents
research_agent = Agent(
    role="Research Specialist",
    goal="Conduct thorough research and store findings in shared catalog",
    backstory="""You are an expert researcher who systematically gathers
    information and stores findings in the team's shared knowledge base.
    You always check existing findings before starting new research.""",
    tools=[claim_task_tool, store_finding_tool, search_findings_tool],
    verbose=True,
    allow_delegation=False
)

analysis_agent = Agent(
    role="Data Analyst",
    goal="Analyze research findings and identify patterns",
    backstory="""You are a data analyst who reviews research findings
    from the shared catalog and identifies patterns and insights. You
    build on the work of your research teammates.""",
    tools=[search_findings_tool, store_finding_tool],
    verbose=True,
    allow_delegation=False
)

synthesis_agent = Agent(
    role="Content Synthesizer",
    goal="Create comprehensive reports from team findings",
    backstory="""You are a skilled writer who synthesizes research and
    analysis from the shared catalog into clear, actionable reports. You
    ensure all team findings are properly incorporated.""",
    tools=[search_findings_tool],
    verbose=True,
    allow_delegation=False
)

These agents access the same catalog through their tools, so every agent reads and writes the same shared state.

Implementing Coordination Patterns

Pattern 1: Task Queue Coordination

Multiple agents claim tasks from a shared queue:

python
# Initialize task queue
import fenic.api.functions as fc
from datetime import datetime
import uuid

tasks_df = session.create_dataframe({
    "task_id": [str(uuid.uuid4()) for _ in range(5)],
    "task_type": ["research", "research", "analysis", "writing", "review"],
    "description": [
        "Research AI safety frameworks",
        "Research ethical AI guidelines",
        "Analyze research findings on AI safety",
        "Write summary of AI ethics landscape",
        "Review final report"
    ],
    "assigned_agent": [None] * 5,
    "status": ["pending"] * 5,
    "priority": [10, 8, 6, 4, 2],
    "created_at": [datetime.now()] * 5,
    "completed_at": [None] * 5,
    "result_data": [None] * 5
})

tasks_df.write.save_as_table("task_registry", mode="append")

# Define task claiming workflow
research_task = Task(
    description="""
    1. Claim the next research task using claim_task_tool
    2. Conduct the research
    3. Store findings using store_finding_tool
    4. Update task status to completed
    """,
    agent=research_agent,
    expected_output="Research findings stored in shared catalog"
)

analysis_task = Task(
    description="""
    1. Wait for research tasks to complete
    2. Search research findings on relevant topics
    3. Analyze patterns and insights
    4. Store analysis results
    """,
    agent=analysis_agent,
    expected_output="Analysis results stored in shared catalog"
)

# Create crew; tasks run in list order under CrewAI's default sequential process
crew = Crew(
    agents=[research_agent, analysis_agent, synthesis_agent],
    tasks=[research_task, analysis_task],
    verbose=True
)

result = crew.kickoff()
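
The claim_next_task tool defined earlier only selects a pending row, so two agents polling at the same moment could pick the same task. A minimal sketch of an atomic claim follows, using Python's sqlite3 as a stand-in for the shared catalog. The table layout, agent names, and this version of claim_next_task are illustrative assumptions rather than Fenic APIs.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_registry ("
    "task_id TEXT PRIMARY KEY, task_type TEXT, status TEXT, "
    "priority INTEGER, assigned_agent TEXT)"
)
conn.executemany(
    "INSERT INTO task_registry VALUES (?, ?, 'pending', ?, NULL)",
    [("t1", "research", 10), ("t2", "research", 8)],
)

def claim_next_task(agent: str, task_type: str) -> Optional[str]:
    """Mark the highest-priority pending task as in_progress for one agent."""
    with conn:  # single transaction: select, then guarded update
        row = conn.execute(
            "SELECT task_id FROM task_registry "
            "WHERE status = 'pending' AND task_type = ? "
            "ORDER BY priority DESC LIMIT 1",
            (task_type,),
        ).fetchone()
        if row is None:
            return None
        updated = conn.execute(
            "UPDATE task_registry SET status = 'in_progress', assigned_agent = ? "
            "WHERE task_id = ? AND status = 'pending'",
            (agent, row[0]),
        )
        # rowcount is 0 if another agent changed the status first
        return row[0] if updated.rowcount == 1 else None

first = claim_next_task("research_agent_1", "research")
second = claim_next_task("research_agent_2", "research")
third = claim_next_task("research_agent_3", "research")
```

The guard in the UPDATE's WHERE clause is what makes the claim safe: the status flips only if the row is still pending, so each task is handed to exactly one agent.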

Pattern 2: Knowledge Sharing Between Agents

Agents build on each other's work through the shared catalog:

python
# Agent 1 stores initial finding
def research_workflow():
    # Search for existing work first
    existing = call_mcp_tool("search_research_findings", {
        "search_term": "AI safety"
    })

    if not existing:
        # No existing research, conduct new research
        finding_id = str(uuid.uuid4())
        call_mcp_tool("store_research_finding", {
            "finding_id": finding_id,
            "source_agent": "research_agent_1",
            "topic": "AI Safety",
            "content": "Research finding content here...",
            "confidence_score": 8
        })
        return finding_id
    else:
        # Build on existing research
        return existing

# Agent 2 accesses and enhances the finding
def analysis_workflow(research_topic: str):
    # Retrieve findings from catalog
    findings = call_mcp_tool("search_research_findings", {
        "search_term": research_topic
    })

    # Analyze and store enhanced finding
    call_mcp_tool("store_research_finding", {
        "finding_id": str(uuid.uuid4()),
        "source_agent": "analysis_agent",
        "topic": research_topic,
        "content": f"Analysis based on previous findings: {findings}",
        "confidence_score": 9
    })
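
Random UUIDs make every write look new, even when two agents store the same finding. One possible alternative is a content-derived key, sketched below with a plain dict standing in for the research_findings table. The finding_key and store_once helpers are hypothetical.

```python
import hashlib

def finding_key(topic: str, content: str) -> str:
    """Derive a stable ID from the normalized topic and content."""
    normalized = f"{topic.strip().lower()}\n{' '.join(content.split()).lower()}"
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]

store: dict[str, dict] = {}  # stand-in for the shared findings table

def store_once(source_agent: str, topic: str, content: str) -> bool:
    """Store a finding unless an equivalent one already exists."""
    key = finding_key(topic, content)
    if key in store:
        return False  # a teammate already recorded this finding
    store[key] = {"source_agent": source_agent, "topic": topic, "content": content}
    return True

first_write = store_once("research_agent_1", "AI Safety", "Audits reduce risk.")
duplicate = store_once("research_agent_2", "ai safety ", "Audits  reduce risk.")
```

Here the second write differs only in case and whitespace, so it maps to the same key and is skipped.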

Pattern 3: Semantic Processing with Shared Context

Use Fenic's semantic operators to enrich shared data:

python
import fenic.api.functions as fc
from pydantic import BaseModel, Field
from typing import List

class EnrichedFinding(BaseModel):
    summary: str = Field(description="One-sentence summary")
    key_points: List[str] = Field(description="Key points")
    related_topics: List[str] = Field(description="Related research areas")
    action_items: List[str] = Field(description="Suggested next steps")

# Enrich findings with semantic extraction
findings_df = session.table("research_findings")

enriched = findings_df.select(
    fc.col("*"),
    fc.semantic.extract(
        fc.col("content"),
        EnrichedFinding,
        model_alias="gpt4"
    ).alias("enriched_data")
).unnest("enriched_data")

# Store enriched findings back to catalog
enriched.write.save_as_table("enriched_findings", mode="overwrite")

session.catalog.set_table_description(
    "enriched_findings",
    "Research findings with AI-extracted summaries and action items"
)

Now all agents can query structured, enriched findings rather than raw text.
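
Conceptually, the unnest step promotes the fields of the extracted struct to top-level columns. The sketch below shows that flattening on a single row represented as a dict. It illustrates the idea only, with a hypothetical helper, and is not Fenic's implementation.

```python
def unnest(row: dict, column: str) -> dict:
    """Replace a nested dict column with its fields as top-level keys."""
    nested = row[column]
    flat = {k: v for k, v in row.items() if k != column}
    overlap = flat.keys() & nested.keys()
    if overlap:
        raise ValueError(f"column name collision: {sorted(overlap)}")
    flat.update(nested)
    return flat

row = {
    "finding_id": "f1",
    "content": "Long raw research text...",
    "enriched_data": {
        "summary": "Audits reduce deployment risk.",
        "key_points": ["audits", "risk"],
    },
}
flat_row = unnest(row, "enriched_data")
```

After flattening, an agent can filter on summary or key_points directly instead of parsing a nested value.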

Advanced Coordination Strategies

Multi-Stage Agent Pipeline

Chain multiple agent teams with catalog handoffs:

python
# Stage 1: Research team populates initial findings
research_crew = Crew(
    agents=[research_agent],
    tasks=[research_task],
    process="sequential"
)

research_result = research_crew.kickoff()

# Stage 2: Analysis team processes research findings
analysis_task = Task(
    description="""
    Query research_findings table for new entries since last run.
    Analyze patterns and store insights in analysis_results table.
    """,
    agent=analysis_agent,
    expected_output="Analysis complete with insights stored"
)

analysis_crew = Crew(
    agents=[analysis_agent],
    tasks=[analysis_task],
    process="sequential"
)

analysis_result = analysis_crew.kickoff()

# Stage 3: Synthesis team creates final output
synthesis_task = Task(
    description="""
    Retrieve all research findings and analysis results.
    Create comprehensive report synthesizing team's work.
    """,
    agent=synthesis_agent,
    expected_output="Final report incorporating all team findings"
)

synthesis_crew = Crew(
    agents=[synthesis_agent],
    tasks=[synthesis_task],
    process="sequential"
)

final_result = synthesis_crew.kickoff()
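
The analysis stage asks for "new entries since last run", which implies some form of watermark. A minimal sketch of that bookkeeping, with rows as plain dicts and a hypothetical new_since helper:

```python
from datetime import datetime

def new_since(
    rows: list[dict], watermark: datetime
) -> tuple[list[dict], datetime]:
    """Return rows created after the watermark, plus the advanced watermark."""
    fresh = [r for r in rows if r["created_at"] > watermark]
    next_watermark = max((r["created_at"] for r in fresh), default=watermark)
    return fresh, next_watermark

findings = [
    {"finding_id": "f1", "created_at": datetime(2025, 1, 1, 9, 0)},
    {"finding_id": "f2", "created_at": datetime(2025, 1, 1, 12, 0)},
    {"finding_id": "f3", "created_at": datetime(2025, 1, 2, 8, 0)},
]
last_run = datetime(2025, 1, 1, 10, 0)
fresh, last_run = new_since(findings, last_run)
```

Persisting the returned watermark between runs (for example in its own catalog table) lets each stage pick up where it left off without reprocessing earlier findings.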

Real-Time Status Monitoring

Track agent progress through catalog queries:

python
# Monitor task completion rates
task_status = session.sql("""
    SELECT
        status,
        COUNT(*) as task_count,
        AVG(CASE
            WHEN completed_at IS NOT NULL AND created_at IS NOT NULL
            THEN EXTRACT(EPOCH FROM (completed_at - created_at))
            ELSE NULL
        END) as avg_completion_seconds
    FROM {tasks}
    GROUP BY status
""", tasks=session.table("task_registry"))

task_status.show()

# Track agent productivity
agent_metrics = session.sql("""
    SELECT
        source_agent,
        COUNT(*) as findings_contributed,
        AVG(confidence_score) as avg_confidence,
        COUNT(DISTINCT topic) as topics_covered
    FROM {findings}
    GROUP BY source_agent
    ORDER BY findings_contributed DESC
""", findings=session.table("research_findings"))

agent_metrics.show()

Async Tool Execution for External APIs

Integrate external data sources with catalog coordination:

python
import aiohttp
import fenic.api.functions as fc
from fenic.api.functions.builtin import async_udf
from fenic.core.types import StructType, StructField, StringType, IntegerType

@async_udf(
    return_type=StructType([
        StructField("external_data", StringType),
        StructField("status_code", IntegerType)
    ]),
    max_concurrency=20,
    timeout_seconds=10,
    num_retries=3
)
async def fetch_external_context(topic: str) -> dict:
    """Fetch external context for research topics."""
    # Named http_session so it does not shadow the Fenic `session` object
    async with aiohttp.ClientSession() as http_session:
        async with http_session.get(
            "https://api.example.com/research", params={"topic": topic}
        ) as resp:
            return {
                "external_data": await resp.text(),
                "status_code": resp.status
            }

# Enrich catalog data with external sources
enriched_research = (
    session.table("research_findings")
    .select(
        fc.col("*"),
        fetch_external_context(fc.col("topic")).alias("external_context")
    )
    .unnest("external_context")
)

enriched_research.write.save_as_table(
    "externally_enriched_research",
    mode="overwrite"
)
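
The timeout_seconds and num_retries settings above bound how long each external call may take and how often it is retried. The sketch below shows one plausible reading of those semantics in plain asyncio. call_with_retries and flaky_fetch are hypothetical, and Fenic's actual retry behavior may differ.

```python
import asyncio

async def call_with_retries(make_call, *, timeout_seconds: float, num_retries: int):
    """Await make_call() up to num_retries + 1 times, bounding each attempt."""
    last_error = None
    for attempt in range(num_retries + 1):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout_seconds)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            await asyncio.sleep(0.01 * 2 ** attempt)  # short exponential backoff
    raise RuntimeError(f"gave up after {num_retries + 1} attempts") from last_error

calls = {"count": 0}

async def flaky_fetch() -> dict:
    """Simulated external API that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return {"external_data": "ok", "status_code": 200}

result = asyncio.run(
    call_with_retries(flaky_fetch, timeout_seconds=1.0, num_retries=3)
)
```

Bounding both the duration and the number of attempts keeps one slow endpoint from stalling the whole enrichment job.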

Monitoring and Performance Optimization

Track Catalog Operations

Use built-in metrics to monitor agent coordination:

python
import fenic.api.functions as fc

# Query metrics for all catalog operations
metrics_df = session.table("fenic_system.query_metrics")

# Analyze tool call patterns
tool_usage = metrics_df.select(
    fc.col("tool_name"),
    fc.col("total_lm_requests"),
    fc.col("total_lm_cost"),
    fc.col("latency_ms")
).filter(
    fc.col("tool_name").is_not_null()
).group_by("tool_name").agg(
    fc.count("*").alias("call_count"),
    fc.avg("latency_ms").alias("avg_latency"),
    fc.sum("total_lm_cost").alias("total_cost")
).order_by("call_count", ascending=False)

tool_usage.show()

# Monitor coordination bottlenecks
slow_operations = metrics_df.filter(
    fc.col("latency_ms") > 5000
).select(
    fc.col("query_text"),
    fc.col("latency_ms"),
    fc.col("end_ts")
).order_by("latency_ms", ascending=False)

slow_operations.show(10)

Optimize Data Access Patterns

Cache frequently accessed catalog data:

python
# Cache hot data for faster agent access
frequently_accessed = (
    session.table("research_findings")
    .filter(fc.col("verified") == True)
    .filter(fc.col("confidence_score") >= 8)
    .cache()  # Keep in memory
)

# Use cached data in agent workflows
high_confidence_findings = frequently_accessed.filter(
    fc.col("topic").contains("AI safety")
)

Implement Circuit Breakers

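Prevent cascade failures in multi-agent systems by retrying transient errors with exponential backoff and logging persistent failures to the catalog: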

python
import time
from datetime import datetime

def safe_catalog_operation(operation_func, max_retries=3):
    """Execute catalog operations with retry logic."""
    for attempt in range(max_retries):
        try:
            result = operation_func()
            return result
        except Exception as e:
            if attempt == max_retries - 1:
                # Log failure to catalog
                error_df = session.create_dataframe({
                    "operation": [operation_func.__name__],
                    "error": [str(e)],
                    "timestamp": [datetime.now()],
                    "attempt": [attempt + 1]
                })
                error_df.write.save_as_table(
                    "operation_errors",
                    mode="append"
                )
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
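
The helper above retries with backoff. A circuit breaker in the stricter sense also stops calling a failing dependency for a cooling-off period. A minimal sketch of that pattern follows; the CircuitBreaker class, its thresholds, and the injectable clock are illustrative assumptions, not part of Fenic or CrewAI.

```python
import time

class CircuitBreaker:
    """Open after repeated failures; allow a trial call after a cooldown."""

    def __init__(self, failure_threshold=3, reset_after_seconds=30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after_seconds = reset_after_seconds
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after_seconds:
                raise RuntimeError("circuit open: skipping call")
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0  # a success closes the circuit fully
        return result

breaker = CircuitBreaker(failure_threshold=2, reset_after_seconds=10.0)
status = breaker.call(lambda: "catalog reachable")
```

Wrapping catalog or MCP calls in breaker.call means that once a dependency fails repeatedly, agents fail fast instead of piling retries onto it.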

Best Practices for Production Deployment

Schema evolution management - Version catalog tables as your agent system evolves. Create new tables rather than modifying schemas in production.

python
# Version your schemas
session.catalog.create_table(
    "research_findings_v2",
    updated_schema,
    description="Research findings with enhanced metadata (v2)"
)

Concurrent access control - Configure appropriate concurrency limits on your MCP server to prevent resource exhaustion.

python
server = create_mcp_server(
    session=session,
    server_name="Production Coordination Server",
    user_defined_tools=catalog_tools,
    concurrency_limit=50,  # Adjust based on load
)

Catalog backup and recovery - Regularly backup your catalog database:

bash
# Backup catalog
cp crew_catalog.db crew_catalog_backup_$(date +%Y%m%d).db

# Restore if needed
cp crew_catalog_backup_20250101.db crew_catalog.db

Tool result limits - Set appropriate limits to prevent memory issues:

python
session.catalog.create_tool(
    tool_name="search_findings",
    tool_query=query,
    result_limit=100,  # Prevent excessive result sets
    tool_params=params
)

Rate limiting configuration - Configure model rate limits to prevent quota exhaustion:

python
config = SessionConfig(
    app_name="production_crew",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4.1-nano",
                rpm=100,  # Requests per minute
                tpm=100000  # Tokens per minute
            )
        }
    )
)

Catalog maintenance - Implement cleanup routines for old data:

python
# Archive completed tasks older than 30 days
old_tasks = session.sql("""
    SELECT * FROM {tasks}
    WHERE status = 'completed'
    AND completed_at < CURRENT_TIMESTAMP - INTERVAL '30 days'
""", tasks=session.table("task_registry"))

old_tasks.write.save_as_table("archived_tasks", mode="append")

# Remove from active registry by keeping only recent/active tasks
active_tasks = session.sql("""
    SELECT * FROM {tasks}
    WHERE status != 'completed'
    OR completed_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
""", tasks=session.table("task_registry"))

active_tasks.write.save_as_table("task_registry", mode="overwrite")

Conclusion

Typedef's catalog system transforms CrewAI agent teams from loosely coordinated collections into synchronized, data-driven systems. By centralizing state management through a persistent, queryable catalog, you eliminate coordination failures while maintaining full audit trails.

The combination of Fenic's semantic operations, type-safe tool creation, and MCP server integration provides the infrastructure needed for production multi-agent systems. Start with simple task queue coordination, then scale to sophisticated multi-stage pipelines as your agent team grows.

For teams building production agent systems, explore Fenic's open-source framework or learn more about Typedef's platform.
