Skip to content

Agent Mesh

Multi-agent orchestration: AgentMesh, MeshContext, and MeshCycleError.

mesh

Multi-agent mesh orchestration (Element 2).

Provides AgentMesh for composing multiple agent roles into orchestrated pipelines with budget propagation, trace linkage, and cycle detection.

AgentMesh

Multi-agent orchestration container.

Wraps an AgenticApp and provides decorators for declaring agent roles and orchestrators. Roles are in-process handlers that can be called by orchestrators via MeshContext.call().

Example::

app = AgenticApp(title="Research")
mesh = AgentMesh(app=app, name="research")

@mesh.role(name="researcher")
async def researcher(payload, ctx):
    return {"topic": str(payload), "points": ["a", "b"]}

@mesh.orchestrator(name="pipeline", roles=["researcher"])
async def pipeline(intent, mesh_ctx):
    return await mesh_ctx.call("researcher", intent.raw)

Parameters:

Name Type Description Default
app AgenticApp

The AgenticApp to register endpoints on.

required
name str

A human-readable name for this mesh.

'mesh'
Source code in src/agenticapi/mesh/mesh.py
class AgentMesh:
    """Multi-agent orchestration container.

    Wraps an ``AgenticApp`` and provides decorators for declaring
    agent roles and orchestrators. Roles are in-process handlers
    that can be called by orchestrators via ``MeshContext.call()``.

    Example::

        app = AgenticApp(title="Research")
        mesh = AgentMesh(app=app, name="research")

        @mesh.role(name="researcher")
        async def researcher(payload, ctx):
            return {"topic": str(payload), "points": ["a", "b"]}

        @mesh.orchestrator(name="pipeline", roles=["researcher"])
        async def pipeline(intent, mesh_ctx):
            return await mesh_ctx.call("researcher", intent.raw)

    Args:
        app: The ``AgenticApp`` to register endpoints on.
        name: A human-readable name for this mesh.
    """

    def __init__(self, *, app: AgenticApp, name: str = "mesh") -> None:
        self._app = app
        self._name = name
        self._roles: dict[str, Callable[..., Awaitable[Any]]] = {}
        self._orchestrators: dict[str, Callable[..., Awaitable[Any]]] = {}

    @property
    def name(self) -> str:
        """The mesh name."""
        return self._name

    @property
    def roles(self) -> list[str]:
        """Names of all registered roles."""
        return sorted(self._roles)

    def role(
        self,
        name: str,
        *,
        response_model: type[BaseModel] | None = None,
        description: str = "",
    ) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
        """Register a mesh role.

        A role is a lightweight agent handler invokable via
        ``MeshContext.call(role_name, payload)``. It is also exposed
        as a standalone ``/agent/{name}`` endpoint on the parent app.

        Args:
            name: Unique role name within this mesh.
            response_model: Optional Pydantic model for response
                validation.
            description: Human-readable description.

        Returns:
            Decorator that registers the handler.
        """

        def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
            if name in self._roles:
                msg = f"Mesh role '{name}' is already registered."
                raise ValueError(msg)

            self._roles[name] = fn

            # Also register as a regular agent endpoint.
            @self._app.agent_endpoint(
                name=name,
                description=description or f"Mesh role: {name}",
                response_model=response_model,
            )
            async def _endpoint_wrapper(intent: Intent[Any], context: AgentContext) -> AgentResponse:
                mesh_ctx = MeshContext(
                    mesh=self,
                    trace_id=getattr(context, "trace_id", uuid.uuid4().hex),
                )
                result = await fn(intent.raw, mesh_ctx)
                return AgentResponse(result=result, reasoning=f"Mesh role '{name}'")

            logger.info("mesh_role_registered", mesh=self._name, role=name)
            return fn

        return decorator

    def orchestrator(
        self,
        name: str,
        *,
        roles: list[str] | None = None,
        description: str = "",
        budget_usd: float | None = None,
    ) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
        """Register a mesh orchestrator.

        An orchestrator is an agent handler that receives a
        ``MeshContext`` and can call other roles via
        ``mesh_ctx.call()``.

        Args:
            name: Unique orchestrator name.
            roles: Optional list of role names this orchestrator
                is expected to call (documentation only — not enforced).
            description: Human-readable description.
            budget_usd: Optional total budget for sub-agent calls.

        Returns:
            Decorator that registers the handler.
        """

        def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
            if name in self._orchestrators:
                msg = f"Mesh orchestrator '{name}' is already registered."
                raise ValueError(msg)

            self._orchestrators[name] = fn

            @self._app.agent_endpoint(
                name=name,
                description=description or f"Mesh orchestrator: {name}",
            )
            async def _orch_wrapper(intent: Intent[Any], context: AgentContext) -> AgentResponse:
                mesh_ctx = MeshContext(
                    mesh=self,
                    trace_id=getattr(context, "trace_id", uuid.uuid4().hex),
                    parent_budget_remaining_usd=budget_usd,
                    call_stack=[],
                )
                result = await fn(intent, mesh_ctx)
                if isinstance(result, AgentResponse):
                    return result
                return AgentResponse(result=result, reasoning=f"Mesh orchestrator '{name}'")

            logger.info(
                "mesh_orchestrator_registered",
                mesh=self._name,
                orchestrator=name,
                declared_roles=roles or [],
            )
            return fn

        return decorator

name property

name: str

The mesh name.

roles property

roles: list[str]

Names of all registered roles.

role

role(
    name: str,
    *,
    response_model: type[BaseModel] | None = None,
    description: str = "",
) -> Callable[
    [Callable[..., Awaitable[Any]]],
    Callable[..., Awaitable[Any]],
]

Register a mesh role.

A role is a lightweight agent handler invokable via MeshContext.call(role_name, payload). It is also exposed as a standalone /agent/{name} endpoint on the parent app.

Parameters:

Name Type Description Default
name str

Unique role name within this mesh.

required
response_model type[BaseModel] | None

Optional Pydantic model for response validation.

None
description str

Human-readable description.

''

Returns:

Type Description
Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]

Decorator that registers the handler.

Source code in src/agenticapi/mesh/mesh.py
def role(
    self,
    name: str,
    *,
    response_model: type[BaseModel] | None = None,
    description: str = "",
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    """Register a mesh role.

    A role is a lightweight agent handler invokable via
    ``MeshContext.call(role_name, payload)``. It is also exposed
    as a standalone ``/agent/{name}`` endpoint on the parent app.

    Args:
        name: Unique role name within this mesh.
        response_model: Optional Pydantic model for response
            validation.
        description: Human-readable description.

    Returns:
        Decorator that registers the handler.
    """

    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        if name in self._roles:
            msg = f"Mesh role '{name}' is already registered."
            raise ValueError(msg)

        self._roles[name] = fn

        # Also register as a regular agent endpoint.
        @self._app.agent_endpoint(
            name=name,
            description=description or f"Mesh role: {name}",
            response_model=response_model,
        )
        async def _endpoint_wrapper(intent: Intent[Any], context: AgentContext) -> AgentResponse:
            mesh_ctx = MeshContext(
                mesh=self,
                trace_id=getattr(context, "trace_id", uuid.uuid4().hex),
            )
            result = await fn(intent.raw, mesh_ctx)
            return AgentResponse(result=result, reasoning=f"Mesh role '{name}'")

        logger.info("mesh_role_registered", mesh=self._name, role=name)
        return fn

    return decorator

orchestrator

orchestrator(
    name: str,
    *,
    roles: list[str] | None = None,
    description: str = "",
    budget_usd: float | None = None,
) -> Callable[
    [Callable[..., Awaitable[Any]]],
    Callable[..., Awaitable[Any]],
]

Register a mesh orchestrator.

An orchestrator is an agent handler that receives a MeshContext and can call other roles via mesh_ctx.call().

Parameters:

Name Type Description Default
name str

Unique orchestrator name.

required
roles list[str] | None

Optional list of role names this orchestrator is expected to call (documentation only — not enforced).

None
description str

Human-readable description.

''
budget_usd float | None

Optional total budget for sub-agent calls.

None

Returns:

Type Description
Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]

Decorator that registers the handler.

Source code in src/agenticapi/mesh/mesh.py
def orchestrator(
    self,
    name: str,
    *,
    roles: list[str] | None = None,
    description: str = "",
    budget_usd: float | None = None,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    """Register a mesh orchestrator.

    An orchestrator is an agent handler that receives a
    ``MeshContext`` and can call other roles via
    ``mesh_ctx.call()``.

    Args:
        name: Unique orchestrator name.
        roles: Optional list of role names this orchestrator
            is expected to call (documentation only — not enforced).
        description: Human-readable description.
        budget_usd: Optional total budget for sub-agent calls.

    Returns:
        Decorator that registers the handler.
    """

    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        if name in self._orchestrators:
            msg = f"Mesh orchestrator '{name}' is already registered."
            raise ValueError(msg)

        self._orchestrators[name] = fn

        @self._app.agent_endpoint(
            name=name,
            description=description or f"Mesh orchestrator: {name}",
        )
        async def _orch_wrapper(intent: Intent[Any], context: AgentContext) -> AgentResponse:
            mesh_ctx = MeshContext(
                mesh=self,
                trace_id=getattr(context, "trace_id", uuid.uuid4().hex),
                parent_budget_remaining_usd=budget_usd,
                call_stack=[],
            )
            result = await fn(intent, mesh_ctx)
            if isinstance(result, AgentResponse):
                return result
            return AgentResponse(result=result, reasoning=f"Mesh orchestrator '{name}'")

        logger.info(
            "mesh_orchestrator_registered",
            mesh=self._name,
            orchestrator=name,
            declared_roles=roles or [],
        )
        return fn

    return decorator

MeshContext dataclass

Request-scoped context for inter-agent calls.

Created per-request by the mesh and injected into orchestrator handlers. Provides call() for invoking other roles.

Attributes:

Name Type Description
mesh Any

The parent AgentMesh instance.

trace_id str

The parent request's trace id.

parent_budget_remaining_usd float | None

Remaining budget from the parent request, if any. None means unbounded.

call_stack list[str]

Ordered list of role names in the current call chain, used for cycle detection.

spent_usd float

Accumulated cost across all sub-calls.

Source code in src/agenticapi/mesh/context.py
@dataclass(slots=True)
class MeshContext:
    """Request-scoped context for inter-agent calls.

    Created per-request by the mesh and injected into orchestrator
    handlers. Provides ``call()`` for invoking other roles.

    Attributes:
        mesh: The parent ``AgentMesh`` instance.
        trace_id: The parent request's trace id.
        parent_budget_remaining_usd: Remaining budget from the parent
            request, if any. ``None`` means unbounded.
        call_stack: Ordered list of role names in the current call
            chain, used for cycle detection.
        spent_usd: Accumulated cost across all sub-calls.
    """

    mesh: Any  # AgentMesh — forward reference to avoid circular import
    trace_id: str = ""
    parent_budget_remaining_usd: float | None = None
    call_stack: list[str] = field(default_factory=list)
    spent_usd: float = 0.0

    async def call(self, role: str, payload: Any) -> Any:
        """Invoke a named role within the mesh.

        Performs cycle detection, budget enforcement, and trace
        propagation before delegating to the role's handler.

        Args:
            role: The name of the role to invoke.
            payload: The intent payload (string or dict) to pass.

        Returns:
            The role handler's return value.

        Raises:
            ValueError: If the role is not registered.
            MeshCycleError: If a call cycle is detected.
            agenticapi.exceptions.BudgetExceeded: If the budget is
                exhausted.
        """
        from agenticapi.mesh.mesh import AgentMesh  # noqa: TC001

        mesh: AgentMesh = self.mesh

        if role not in mesh._roles:
            msg = f"Unknown mesh role: '{role}'. Available: {sorted(mesh._roles)}"
            raise ValueError(msg)

        # Cycle detection.
        if role in self.call_stack:
            raise MeshCycleError(role, list(self.call_stack))

        # Budget check.
        if self.parent_budget_remaining_usd is not None and self.parent_budget_remaining_usd <= 0:
            from agenticapi.exceptions import BudgetExceeded

            raise BudgetExceeded(
                scope="mesh",
                limit_usd=0.0,
                observed_usd=self.spent_usd,
                violation="Mesh budget exhausted across sub-agent calls",
            )

        child_stack = [*self.call_stack, role]
        child_trace = f"{self.trace_id}:{role}:{uuid.uuid4().hex[:8]}"

        logger.info(
            "mesh_call",
            role=role,
            depth=len(child_stack),
            trace_id=child_trace,
        )

        handler = mesh._roles[role]

        # Build a child MeshContext for nested calls.
        child_ctx = MeshContext(
            mesh=mesh,
            trace_id=child_trace,
            parent_budget_remaining_usd=self.parent_budget_remaining_usd,
            call_stack=child_stack,
            spent_usd=self.spent_usd,
        )

        result = await handler(payload, child_ctx)

        logger.info(
            "mesh_call_complete",
            role=role,
            depth=len(child_stack),
            trace_id=child_trace,
        )

        return result

call async

call(role: str, payload: Any) -> Any

Invoke a named role within the mesh.

Performs cycle detection, budget enforcement, and trace propagation before delegating to the role's handler.

Parameters:

Name Type Description Default
role str

The name of the role to invoke.

required
payload Any

The intent payload (string or dict) to pass.

required

Returns:

Type Description
Any

The role handler's return value.

Raises:

Type Description
ValueError

If the role is not registered.

MeshCycleError

If a call cycle is detected.

BudgetExceeded

If the budget is exhausted.

Source code in src/agenticapi/mesh/context.py
async def call(self, role: str, payload: Any) -> Any:
    """Invoke a named role within the mesh.

    Performs cycle detection, budget enforcement, and trace
    propagation before delegating to the role's handler.

    Args:
        role: The name of the role to invoke.
        payload: The intent payload (string or dict) to pass.

    Returns:
        The role handler's return value.

    Raises:
        ValueError: If the role is not registered.
        MeshCycleError: If a call cycle is detected.
        agenticapi.exceptions.BudgetExceeded: If the budget is
            exhausted.
    """
    from agenticapi.mesh.mesh import AgentMesh  # noqa: TC001

    mesh: AgentMesh = self.mesh

    if role not in mesh._roles:
        msg = f"Unknown mesh role: '{role}'. Available: {sorted(mesh._roles)}"
        raise ValueError(msg)

    # Cycle detection.
    if role in self.call_stack:
        raise MeshCycleError(role, list(self.call_stack))

    # Budget check.
    if self.parent_budget_remaining_usd is not None and self.parent_budget_remaining_usd <= 0:
        from agenticapi.exceptions import BudgetExceeded

        raise BudgetExceeded(
            scope="mesh",
            limit_usd=0.0,
            observed_usd=self.spent_usd,
            violation="Mesh budget exhausted across sub-agent calls",
        )

    child_stack = [*self.call_stack, role]
    child_trace = f"{self.trace_id}:{role}:{uuid.uuid4().hex[:8]}"

    logger.info(
        "mesh_call",
        role=role,
        depth=len(child_stack),
        trace_id=child_trace,
    )

    handler = mesh._roles[role]

    # Build a child MeshContext for nested calls.
    child_ctx = MeshContext(
        mesh=mesh,
        trace_id=child_trace,
        parent_budget_remaining_usd=self.parent_budget_remaining_usd,
        call_stack=child_stack,
        spent_usd=self.spent_usd,
    )

    result = await handler(payload, child_ctx)

    logger.info(
        "mesh_call_complete",
        role=role,
        depth=len(child_stack),
        trace_id=child_trace,
    )

    return result