Skip to content

Tools & MCP

Declarative tools and the lower-level MCP HTTP server infrastructure. See the ToolKit & tools and MCP servers guides for usage.

ToolKit

ToolKit

Base class for declarative MCP tool servers.

Subclass, decorate methods with @tool, call as_server() to get an MCPServerHandle.

as_server

as_server(server_id: str | None = None) -> MCPServerHandle

Build and return an MCPServerHandle with all @tool methods registered.

Source code in caw/toolkit.py
def as_server(self, server_id: str | None = None) -> MCPServerHandle:
    """Build and return an `MCPServerHandle` with all ``@tool`` methods registered."""
    cls = type(self)
    if cls._thread_safe and not hasattr(self, "_toolkit_lock"):
        self._toolkit_lock = threading.Lock()
    sid = server_id or f"{cls._server_name or cls.__name__}_{uuid_mod.uuid4().hex[:6]}"

    handle = create_mcp_http_server_bundle(
        sid,
        display_name=cls._display_name or cls._server_name or cls.__name__,
        state_instance=self,
    )

    for attr_name in dir(cls):
        attr = getattr(cls, attr_name, None)
        if attr is None:
            continue
        info = getattr(attr, "_toolkit_tool_info", None)
        if info is None:
            continue
        wrapper = _make_tool_func(attr, info)
        register_tool(handle.server, wrapper)

    return handle

@tool

tool

tool(name: str | None = None, *, description: str | None = None, title: str | None = None, annotations: Any | None = None, structured_output: bool | None = None)

Mark a method as an MCP tool. Does NOT modify the function itself.

Source code in caw/toolkit.py
def tool(
    name: str | None = None,
    *,
    description: str | None = None,
    title: str | None = None,
    annotations: Any | None = None,
    structured_output: bool | None = None,
):
    """Mark a method as an MCP tool.  Does NOT modify the function itself."""

    def decorator(method):
        method._toolkit_tool_info = {
            "name": name,
            "description": description,
            "title": title,
            "annotations": annotations,
            "structured_output": structured_output,
        }
        return method

    return decorator

MCP tool servers

MCPServerHandle dataclass

MCPServerHandle(server_id: str, server: FastMCP, host: str = '127.0.0.1', port: int | None = None, path: str | None = None, _state_instance: Any = None, _server_task: Task | None = None, _uvicorn_server: Server | None = None, uvicorn_log_level: str | None = 'error', _bound_socket: socket | None = None, _daemon_thread: Thread | None = None, _daemon_loop: AbstractEventLoop | None = None, _ready_event: Event | None = None, _startup_error: BaseException | None = None)

Convenience wrapper exposing runner + agent config for a FastMCP server.

runner

runner() -> Callable[[], None]

Return a sync function that blocks while serving the MCP HTTP endpoint.

Source code in caw/mcp.py
def runner(self) -> Callable[[], None]:
    """Return a sync function that blocks while serving the MCP HTTP endpoint."""
    bound_socket = self._bound_socket

    async def _serve() -> None:
        uvicorn_server = self._build_uvicorn_server()
        if bound_socket is not None:
            await uvicorn_server.serve(sockets=[bound_socket])
        else:
            await uvicorn_server.serve()

    def _run() -> None:
        asyncio.run(_serve())

    return _run

run_in_background async

run_in_background() -> AsyncIterator[None]

Async context manager that runs the HTTP server on a background task.

Source code in caw/mcp.py
@asynccontextmanager
async def run_in_background(self) -> AsyncIterator[None]:
    """Async context manager that runs the HTTP server on a background task."""
    await self.start()
    try:
        yield
    finally:
        await self.stop()

get_state

get_state() -> Any

Return the cached state instance (if provided).

Source code in caw/mcp.py
def get_state(self) -> Any:
    """Return the cached state instance (if provided)."""
    return self._state_instance

start async

start(max_retries: int = 5) -> None

Start the HTTP server in the background with retry on port conflict.

Source code in caw/mcp.py
async def start(self, max_retries: int = 5) -> None:
    """Start the HTTP server in the background with retry on port conflict."""
    if self._server_task is not None:
        raise RuntimeError("Server already running")

    last_error = None
    for attempt in range(max_retries):
        try:
            if self._bound_socket is None:
                self._bound_socket = _create_bound_socket(self.host)
                self.port = self._bound_socket.getsockname()[1]
                self.server.settings.port = self.port

            uvicorn_server = self._build_uvicorn_server()
            self._uvicorn_server = uvicorn_server
            self._server_task = asyncio.create_task(uvicorn_server.serve(sockets=[self._bound_socket]))
            host, port = self._ensure_address()
            await _wait_for_server_ready(host, port)
            return
        except (OSError, RuntimeError) as e:
            last_error = e
            if self._uvicorn_server is not None:
                self._uvicorn_server.should_exit = True
            if self._server_task is not None:
                self._server_task.cancel()
                try:
                    await self._server_task
                except (asyncio.CancelledError, Exception):
                    pass
            self._server_task = None
            self._uvicorn_server = None
            if self._bound_socket is not None:
                try:
                    self._bound_socket.close()
                except OSError:
                    pass
                self._bound_socket = None
            if attempt < max_retries - 1:
                await asyncio.sleep(0.1 * (attempt + 1))

    raise RuntimeError(f"Failed to start MCP server after {max_retries} attempts: {last_error}")

stop async

stop() -> None

Stop the background HTTP server.

Source code in caw/mcp.py
async def stop(self) -> None:
    """Stop the background HTTP server."""
    if self._server_task is None:
        return
    if self._uvicorn_server is not None:
        self._uvicorn_server.should_exit = True
    try:
        await self._server_task
    except asyncio.CancelledError:
        pass
    finally:
        self._server_task = None
        self._uvicorn_server = None

start_sync

start_sync(timeout: float = 30.0) -> None

Start the server from a synchronous context.

Spawns a daemon thread with its own event loop, starts the server inside it, and blocks the calling thread until the server is ready.

Source code in caw/mcp.py
def start_sync(self, timeout: float = 30.0) -> None:
    """Start the server from a synchronous context.

    Spawns a daemon thread with its own event loop, starts the server
    inside it, and blocks the calling thread until the server is ready.
    """
    if self._daemon_thread is not None:
        raise RuntimeError("Server already running (sync)")

    ready = threading.Event()
    self._ready_event = ready
    self._startup_error = None

    def _daemon_main() -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._daemon_loop = loop

        async def _run() -> None:
            try:
                await self.start()
            except BaseException as exc:
                self._startup_error = exc
                return
            finally:
                ready.set()

            # Keep the loop alive while the server task runs
            if self._server_task is not None:
                try:
                    await self._server_task
                except asyncio.CancelledError:
                    pass

        try:
            loop.run_until_complete(_run())
        finally:
            # Cancel lingering tasks (e.g. SSE shutdown watchers) to avoid
            # "Task was destroyed but it is pending!" warnings on exit.
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            loop.close()

    thread = threading.Thread(target=_daemon_main, daemon=True)
    self._daemon_thread = thread
    thread.start()

    ready.wait(timeout=timeout)
    if self._startup_error is not None:
        # Join the thread since startup failed
        thread.join(timeout=5)
        self._daemon_thread = None
        raise RuntimeError(f"MCP server failed to start: {self._startup_error}") from self._startup_error

stop_sync

stop_sync(timeout: float = 10.0) -> None

Stop the server from a synchronous context.

Source code in caw/mcp.py
def stop_sync(self, timeout: float = 10.0) -> None:
    """Stop the server from a synchronous context."""
    if self._daemon_thread is None or self._daemon_loop is None:
        return

    loop = self._daemon_loop
    future = asyncio.run_coroutine_threadsafe(self.stop(), loop)
    try:
        future.result(timeout=timeout)
    except Exception:
        pass

    self._daemon_thread.join(timeout=timeout)
    self._daemon_thread = None
    self._daemon_loop = None
    self._ready_event = None

mcp_tool

mcp_tool(name: str | None = None, *, title: str | None = None, description: str | None = None, annotations: Any | None = None, icons: list[Any] | None = None, meta: dict[str, Any] | None = None, structured_output: bool | None = None)

Decorator to attach MCP metadata to a tool function.

Source code in caw/mcp.py
def mcp_tool(
    name: str | None = None,
    *,
    title: str | None = None,
    description: str | None = None,
    annotations: Any | None = None,
    icons: list[Any] | None = None,
    meta: dict[str, Any] | None = None,
    structured_output: bool | None = None,
):
    """Decorator to attach MCP metadata to a tool function."""

    def decorator(func: Callable[..., Any]):
        info = {
            "name": name,
            "title": title,
            "description": description or func.__doc__ or "",
            "annotations": annotations,
            "icons": icons,
            "meta": meta,
            "structured_output": structured_output,
        }
        setattr(func, "_mcp_tool_info", info)
        return func

    return decorator

register_tool

register_tool(server: FastMCP, func: Callable[..., Any]) -> None

Register a decorated tool function with a FastMCP server.

Source code in caw/mcp.py
def register_tool(server: FastMCP, func: Callable[..., Any]) -> None:
    """Register a decorated tool function with a FastMCP server."""
    info = getattr(func, "_mcp_tool_info", None) or getattr(func, "_toolkit_tool_info", {})
    server.tool(
        name=info.get("name"),
        title=info.get("title"),
        description=info.get("description", func.__doc__ or ""),
        annotations=info.get("annotations"),
        icons=info.get("icons"),
        structured_output=info.get("structured_output"),
    )(func)

create_mcp_http_server_bundle

create_mcp_http_server_bundle(server_id: str, *, display_name: str, state_factory: Callable[[], Any] | None = None, state_instance: Any = None, state_display_name: str | None = None, state_shutdown: Callable[[Any], None | Awaitable[None]] | None = None, uvicorn_log_level: str | None = 'error', state_logger: Callable[[str], None] | None = None, **fastmcp_kwargs: Any) -> MCPServerHandle

Create a FastMCP server plus helpers for running it over HTTP.

Source code in caw/mcp.py
def create_mcp_http_server_bundle(
    server_id: str,
    *,
    display_name: str,
    state_factory: Callable[[], Any] | None = None,
    state_instance: Any = None,
    state_display_name: str | None = None,
    state_shutdown: Callable[[Any], None | Awaitable[None]] | None = None,
    uvicorn_log_level: str | None = "error",
    state_logger: Callable[[str], None] | None = None,
    **fastmcp_kwargs: Any,
) -> MCPServerHandle:
    """Create a FastMCP server plus helpers for running it over HTTP."""
    lifespan = fastmcp_kwargs.pop("lifespan", None)
    if lifespan is not None and (state_factory is not None or state_instance is not None):
        raise ValueError("Provide either lifespan or state_factory/state_instance, not both.")
    if state_factory is not None and state_instance is not None:
        raise ValueError("Provide either state_factory or state_instance, not both.")

    if state_factory is not None:
        state_instance = state_factory()

    state = state_instance
    if state is not None:
        active = 0

        @asynccontextmanager
        async def managed_lifespan(_server: FastMCP):
            nonlocal active
            first_start = active == 0
            if first_start and state_display_name:
                message = f"Initializing {state_display_name}"
                if state_logger:
                    state_logger(message)
            active += 1
            try:
                yield state
            finally:
                active -= 1
                if active == 0:
                    if state_display_name:
                        message = f"Shutting down {state_display_name}"
                        if state_logger:
                            state_logger(message)
                    if state_shutdown is not None:
                        result = state_shutdown(state)
                        if isinstance(result, Awaitable):
                            await result

        lifespan = managed_lifespan

    streamable_path = f"/mcp/{server_id}/{uuid_mod.uuid4().hex[:6]}"

    bound_socket = _create_bound_socket("127.0.0.1")
    port = bound_socket.getsockname()[1]

    server = FastMCP(
        name=display_name,
        host="127.0.0.1",
        port=port,
        streamable_http_path=streamable_path,
        lifespan=lifespan,
        **fastmcp_kwargs,
    )
    return MCPServerHandle(
        server_id=server_id,
        server=server,
        host="127.0.0.1",
        port=port,
        path=streamable_path,
        _state_instance=state_instance,
        uvicorn_log_level=uvicorn_log_level,
        _bound_socket=bound_socket,
    )

create_stateless_tool_server

create_stateless_tool_server(funcs: list[Callable[..., Any]], *, server_id: str | None = None, display_name: str = 'stateless_tools') -> MCPServerHandle

Bundle plain functions into a single MCPServerHandle.

Each function is registered as an MCP tool. Functions may optionally be decorated with mcp_tool or tool to supply metadata; bare functions are registered using their name and docstring.

Source code in caw/mcp.py
def create_stateless_tool_server(
    funcs: list[Callable[..., Any]],
    *,
    server_id: str | None = None,
    display_name: str = "stateless_tools",
) -> MCPServerHandle:
    """Bundle plain functions into a single `MCPServerHandle`.

    Each function is registered as an MCP tool.  Functions may optionally be
    decorated with `mcp_tool` or `tool` to supply
    metadata; bare functions are registered using their name and docstring.
    """
    sid = server_id or f"general_{uuid_mod.uuid4().hex[:6]}"
    handle = create_mcp_http_server_bundle(sid, display_name=display_name)
    for func in funcs:
        register_tool(handle.server, func)
    return handle

create_subagent_tool_server

create_subagent_tool_server(spec: Any, traj_dir: str, jsonl_path: str | None = None) -> MCPServerHandle

Create an HTTP tool server that exposes a subagent as a callable tool.

Parameters

spec An AgentSpec with name, description, system_prompt, model. traj_dir Directory where subagent trajectory JSON files are written. jsonl_path Path to the parent session's JSONL log (for interleaved subagent events).

Source code in caw/mcp.py
def create_subagent_tool_server(
    spec: Any,
    traj_dir: str,
    jsonl_path: str | None = None,
) -> MCPServerHandle:
    """Create an HTTP tool server that exposes a subagent as a callable tool.

    Parameters
    ----------
    spec
        An ``AgentSpec`` with ``name``, ``description``, ``system_prompt``, ``model``.
    traj_dir
        Directory where subagent trajectory JSON files are written.
    jsonl_path
        Path to the parent session's JSONL log (for interleaved subagent events).
    """
    tool_name = _sanitize_tool_name(spec.name)

    state = SubagentState(
        name=spec.name,
        description=spec.description,
        system_prompt=spec.system_prompt,
        model=spec.model or "",
        traj_dir=traj_dir,
        jsonl_path=jsonl_path or "",
        tools=getattr(spec, "tools", None),
        tool_servers=list(getattr(spec, "tool_servers", None) or []),
        mcp_servers=list(getattr(spec, "mcp_servers", None) or []),
        subagents=list(getattr(spec, "subagents", None) or []),
    )

    handle = create_mcp_http_server_bundle(
        "subagent",
        display_name=f"caw-subagent-{spec.name}",
        state_instance=state,
    )

    @mcp_tool(name=tool_name, description=spec.description)
    async def subagent_tool(prompt: str, ctx: Context) -> str:
        s: SubagentState = get_state_from_context(ctx)
        return await asyncio.to_thread(
            _run_subagent_blocking,
            prompt,
            s.system_prompt,
            s.model,
            s.traj_dir,
            s.jsonl_path,
            s.name,
            s.tools,
            s.tool_servers,
            s.mcp_servers,
            s.subagents,
        )

    register_tool(handle.server, subagent_tool)
    return handle

get_state_from_context

get_state_from_context(ctx: Context) -> Any

Return the lifespan state object from a tool Context.

Source code in caw/mcp.py
def get_state_from_context(ctx: Context) -> Any:
    """Return the lifespan state object from a tool Context."""
    return ctx.request_context.lifespan_context