Skip to content

Agent & Session

The main user-facing API. See Concepts for the mental model and Sessions for usage.

Agent

Agent

Agent(provider: str | list[str] | None = None, data_dir: str | None = None, system_prompt: str | None = None, model: str | ModelTier | None = None, reasoning: str | None = None, tools: ToolGroup | None = None, tool_servers: list[Any] | None = None, stateless_tools: list[Any] | None = None, name: str = '', description: str = '', logger: AgentLogger | None = None, **kwargs: Any)

Coding agent wrapper — unified interface across providers.

The provider argument may be:

  • a single name (e.g. "claude") — pinned to that provider;
  • a list (e.g. ["claude", "codex", "opencode"]) — a fallback order;
  • "auto" (or omitted) — use the global order from caw.set_provider_order, else CAW_PROVIDER (which may be a comma-separated list), else the default provider.

With a fallback order, the agent selects the first installed provider and, on the first send, transparently moves to the next provider if that one fails or is rate-limited — no exception handling needed by the caller.

Source code in caw/agent.py
def __init__(
    self,
    provider: str | list[str] | None = None,
    data_dir: str | None = None,
    system_prompt: str | None = None,
    model: str | ModelTier | None = None,
    reasoning: str | None = None,
    tools: ToolGroup | None = None,
    tool_servers: list[Any] | None = None,
    stateless_tools: list[Any] | None = None,
    name: str = "",
    description: str = "",
    logger: AgentLogger | None = None,
    **kwargs: Any,
) -> None:
    self._provider_name = provider
    self._provider: Provider | None = None
    self._mcp_servers: list[MCPServer] = []
    self._subagents: list[AgentSpec] = []
    self._logger = logger
    self._tool_servers: list[Any] = []  # list[MCPServerHandle], lazy import
    if tool_servers:
        for ts in tool_servers:
            self.add_tool_server(ts)
    if stateless_tools:
        self._tool_servers.append(create_stateless_tool_server(stateless_tools))
    self._data_dir = data_dir
    self._name = name
    self._description = description
    self._metadata: dict[str, Any] = {}
    if tools is not None:
        kwargs["tools"] = tools
    if system_prompt is not None:
        kwargs["system_prompt"] = system_prompt
    if model is not None:
        kwargs["model"] = model
    elif os.environ.get(CAW_MODEL):
        kwargs["model"] = os.environ[CAW_MODEL]
    if reasoning is not None:
        kwargs["reasoning"] = reasoning
    elif os.environ.get(CAW_EFFORT):
        kwargs["reasoning"] = os.environ[CAW_EFFORT]
    if "auto_wait" not in kwargs:
        env_val = os.environ.get(CAW_AUTOWAIT, "").strip().lower()
        if env_val in ("0", "false", "no", "off"):
            kwargs["auto_wait"] = False
        # Otherwise leave unset so provider default (True) applies
    self._kwargs = kwargs

provider property

provider: Provider

The resolved provider instance (lazily created).

For an auto/fallback order, this is the first provider in the order whose CLI is installed (a fast, no-network check).

mcp_servers property

mcp_servers: list[MCPServer]

Currently configured MCP servers.

metadata property

metadata: dict[str, Any]

Mutable metadata dict carried onto every session's trajectory.

set_provider

set_provider(provider: str) -> None

Set or change the provider before starting a session.

Source code in caw/agent.py
def set_provider(self, provider: str) -> None:
    """Set or change the provider before starting a session."""
    self._provider_name = provider
    self._provider = None

add_mcp_server

add_mcp_server(server: MCPServer) -> None

Register an MCP server for tool access.

Source code in caw/agent.py
def add_mcp_server(self, server: MCPServer) -> None:
    """Register an MCP server for tool access."""
    self._mcp_servers.append(server)

add_tool_server

add_tool_server(handle: Any) -> None

Register a custom HTTP tool server (MCPServerHandle or ToolKit).

If handle is a ToolKit instance, as_server() is called automatically. The handle's lifecycle (start/stop) is managed by the session.

Source code in caw/agent.py
def add_tool_server(self, handle: Any) -> None:
    """Register a custom HTTP tool server (MCPServerHandle or ToolKit).

    If *handle* is a `ToolKit` instance, ``as_server()``
    is called automatically.  The handle's lifecycle (start/stop) is managed
    by the session.
    """
    if isinstance(handle, ToolKit):
        handle = handle.as_server()
    self._tool_servers.append(handle)

set_model

set_model(model: str | ModelTier) -> None

Set the model to use for sessions.

Source code in caw/agent.py
def set_model(self, model: str | ModelTier) -> None:
    """Set the model to use for sessions."""
    self._kwargs["model"] = model

set_reasoning

set_reasoning(reasoning: str) -> None

Set the reasoning budget token (e.g. 'medium').

Source code in caw/agent.py
def set_reasoning(self, reasoning: str) -> None:
    """Set the reasoning budget token (e.g. ``'medium'``)."""
    self._kwargs["reasoning"] = reasoning

set_system_prompt

set_system_prompt(system_prompt: str) -> None

Set a system prompt that guides the agent's behavior for the session.

Source code in caw/agent.py
def set_system_prompt(self, system_prompt: str) -> None:
    """Set a system prompt that guides the agent's behavior for the session."""
    self._kwargs["system_prompt"] = system_prompt

set_tools

set_tools(tools: ToolGroup) -> None

Set the tool permission groups for sessions.

Source code in caw/agent.py
def set_tools(self, tools: ToolGroup) -> None:
    """Set the tool permission groups for sessions."""
    self._kwargs["tools"] = tools

add_subagent

add_subagent(spec: AgentSpec) -> None

Register a subagent that will be exposed as a tool.

Source code in caw/agent.py
def add_subagent(self, spec: AgentSpec) -> None:
    """Register a subagent that will be exposed as a tool."""
    self._subagents.append(spec)

get_spec

get_spec() -> AgentSpec

Return an AgentSpec snapshot of this agent's current configuration.

Source code in caw/agent.py
def get_spec(self) -> AgentSpec:
    """Return an AgentSpec snapshot of this agent's current configuration."""
    return AgentSpec(
        name=self._name,
        description=self._description,
        system_prompt=self._kwargs.get("system_prompt", ""),
        model=self._kwargs.get("model", ""),
        reasoning=self._kwargs.get("reasoning", ""),
        tools=self._kwargs.get("tools"),
        tool_servers=list(self._tool_servers),
        mcp_servers=list(self._mcp_servers),
        subagents=list(self._subagents),
        metadata=dict(self._metadata),
    )

check_limit

check_limit() -> int | None

Check if the provider's usage limit is currently active.

Sends a minimal test prompt to detect whether the configured provider and model are currently rate-limited. Returns the estimated number of minutes until the limit resets, or None if no limit is detected.

This incurs a small token cost for the probe request.

Source code in caw/agent.py
def check_limit(self) -> int | None:
    """Check if the provider's usage limit is currently active.

    Sends a minimal test prompt to detect whether the configured
    provider and model are currently rate-limited.  Returns the
    estimated number of minutes until the limit resets, or ``None``
    if no limit is detected.

    This incurs a small token cost for the probe request.
    """
    model = self._kwargs.get("model")
    if isinstance(model, ModelTier):
        model = self.provider.resolve_model(model)
    return self.provider.check_limit(model=model)

check_health

check_health(live: bool = False) -> 'ProviderHealth'

Report raw health signals for this agent's provider.

Fast by default (CLI installed + credential introspection, no token cost). Pass live=True to additionally probe whether the provider responds and is currently rate-limited. See caw.health.ProviderHealth.

Source code in caw/agent.py
def check_health(self, live: bool = False) -> "ProviderHealth":
    """Report raw health signals for this agent's provider.

    Fast by default (CLI installed + credential introspection, no token
    cost).  Pass ``live=True`` to additionally probe whether the provider
    responds and is currently rate-limited.  See
    `caw.health.ProviderHealth`.
    """
    model = self._kwargs.get("model")
    if isinstance(model, ModelTier):
        model = self.provider.resolve_model(model)
    return self.provider.check_health(live=live, model=model)

interactive

interactive(initial_prompt: str, capture_bytes: int = 0, *, select_provider: bool = False, **kwargs: Any) -> InteractiveResult

Launch the provider binary interactively with an initial prompt.

The user interacts with the agent directly in their terminal. A copy of stdout is captured via a pty. MCP tool servers are started before launch and stopped after the process exits.

Parameters:

Name Type Description Default
initial_prompt str

The first message sent to the agent.

required
capture_bytes int

Maximum bytes of terminal output to keep (tail). 0 (default) means capture everything.

0
select_provider bool

When True, show an arrow-key menu of the installed providers first and launch the one the user picks (instead of this agent's configured provider). If the user cancels the menu, no agent is launched and an InteractiveResult with exit code 130 is returned.

False

Returns:

Type Description
InteractiveResult

An InteractiveResult with the exit code and captured terminal

InteractiveResult

output.

Source code in caw/agent.py
def interactive(
    self,
    initial_prompt: str,
    capture_bytes: int = 0,
    *,
    select_provider: bool = False,
    **kwargs: Any,
) -> InteractiveResult:
    """Launch the provider binary interactively with an initial prompt.

    The user interacts with the agent directly in their terminal.
    A copy of stdout is captured via a pty.  MCP tool servers are
    started before launch and stopped after the process exits.

    Args:
        initial_prompt: The first message sent to the agent.
        capture_bytes: Maximum bytes of terminal output to keep (tail).
            ``0`` (default) means capture everything.
        select_provider: When ``True``, show an arrow-key menu of the
            installed providers first and launch the one the user picks
            (instead of this agent's configured provider).  If the user
            cancels the menu, no agent is launched and an
            ``InteractiveResult`` with exit code ``130`` is returned.

    Returns:
        An ``InteractiveResult`` with the exit code and captured terminal
        output.
    """
    if select_provider:
        provider = self._pick_interactive_provider()
        if provider is None:
            return InteractiveResult(exit_code=130, output="")
    else:
        provider = self.provider

    merged = {**self._kwargs, **kwargs}

    # Remove session-only concerns
    merged.pop("auto_wait", None)
    merged.pop("metadata", None)

    # Resolve model tier
    model = merged.get("model")
    if isinstance(model, ModelTier):
        merged["model"] = provider.resolve_model(model)

    # Resolve tool restrictions — default to ALL (user is present)
    tools = merged.pop("tools", None)
    if tools is not None:
        restrictions = provider.resolve_tool_restrictions(tools)
        merged.update(restrictions)

    # Start MCP tool server handles
    all_handles: list[Any] = list(self._tool_servers)
    for handle in all_handles:
        handle.start_sync()

    all_mcp = list(self._mcp_servers)
    for handle in all_handles:
        all_mcp.append(MCPServer(name=handle.server_id, url=handle.url))

    try:
        return provider.start_interactive(
            initial_prompt, mcp_servers=all_mcp, capture_bytes=capture_bytes, **merged
        )
    finally:
        for handle in all_handles:
            try:
                handle.stop_sync()
            except Exception:
                pass

completion

completion(message: str, **kwargs: Any) -> Trajectory

Send a single message and return the complete trajectory.

Convenience wrapper for simple use cases where you don't need to maintain a multi-turn session:

traj = agent.completion("Explain this code")
print(traj.result)
Source code in caw/agent.py
def completion(self, message: str, **kwargs: Any) -> Trajectory:
    """Send a single message and return the complete trajectory.

    Convenience wrapper for simple use cases where you don't need
    to maintain a multi-turn session:

        traj = agent.completion("Explain this code")
        print(traj.result)
    """
    session = self.start_session(**kwargs)
    session.send(message)
    return session.end()

start_session

start_session(traj_path: str | Path | None = None, **kwargs: Any) -> Session

Start a new interactive session with the agent.

Parameters:

Name Type Description Default
traj_path str | Path | None

If set, the trajectory is saved to this path after each step and when Session.end is called.

None

Additional keyword arguments are forwarded to the session. Passing a logger (any object with info/warn/error string methods) makes every major event — user message, tool call, tool result, assistant text, thinking, turn-end stats — also emit a one-line summary through it, in addition to the console Display. See caw.logger.

Source code in caw/agent.py
def start_session(self, traj_path: str | Path | None = None, **kwargs: Any) -> Session:
    """Start a new interactive session with the agent.

    Args:
        traj_path: If set, the trajectory is saved to this path after each
            step and when ``Session.end`` is called.

    Additional keyword arguments are forwarded to the session. Passing a
    ``logger`` (any object with ``info``/``warn``/``error`` string methods)
    makes every major event — user message, tool call, tool result,
    assistant text, thinking, turn-end stats — also emit a one-line summary
    through it, in addition to the console ``Display``. See ``caw.logger``.
    """
    base = {**self._kwargs, **kwargs}
    auto_wait, session_metadata, logger = self._pop_session_opts(base)

    # Generate session_id early so the JSONL path is known before MCP configs
    session_id: str | None = None
    store: SessionStore | None = None
    if self._data_dir:
        session_id = str(uuid_mod.uuid4())
        store = SessionStore(self._data_dir, session_id)

    subagent_traj_dir, all_handles, all_mcp = self._start_tool_servers(store)

    # Resolve the fallback order and select the first installed provider.
    order = self._resolved_order()
    sel_index, _ = _select_installed(order)
    chosen = order[sel_index:]
    first_choice = order[0]

    def _build(provider_name: str) -> ProviderSession:
        prov = _resolve_provider(provider_name)
        merged = self._provider_session_kwargs(
            prov, base, is_fallback=(provider_name != first_choice), provider_name=provider_name
        )
        if session_id:
            merged["session_id"] = session_id
        return prov.start_session(mcp_servers=all_mcp, **merged)

    remaining = list(chosen[1:])

    def _fallback_build() -> ProviderSession | None:
        if not remaining:
            return None
        return _build(remaining.pop(0))

    provider_session = _build(chosen[0])

    session = Session(
        provider_session,
        store=store,
        subagent_traj_dir=subagent_traj_dir,
        tool_handles=all_handles,
        auto_wait=auto_wait,
        metadata=session_metadata,
        logger=logger,
        fallback_build=_fallback_build if remaining else None,
    )

    if traj_path is not None:
        session._traj_path = traj_path

    if store:
        store.write_metadata(session.trajectory)

    return session

resume_session

resume_session(resume_handle: str, **kwargs: Any) -> Session

Resume a session from a handle produced by Session.resume_handle.

Returns a live Session whose next Session.send continues the original conversation.

  • Without data_dir (or if the session isn't on disk): the backend conversation is resumed using the key embedded in the handle, but caw's trajectory starts empty (no prior turns restored).
  • With the original data_dir: the full trajectory is restored and new turns are appended to the original session directory.

A bare session id is also accepted in place of a full handle, but only when data_dir is set (the resume key is then read from disk).

Source code in caw/agent.py
def resume_session(self, resume_handle: str, **kwargs: Any) -> Session:
    """Resume a session from a handle produced by `Session.resume_handle`.

    Returns a live `Session` whose next `Session.send`
    continues the original conversation.

    - **Without ``data_dir`` (or if the session isn't on disk):** the backend
      conversation is resumed using the key embedded in the handle, but
      caw's trajectory starts empty (no prior turns restored).
    - **With the original ``data_dir``:** the full trajectory is restored
      and new turns are appended to the original session directory.

    A bare session id is also accepted in place of a full handle, but only
    when ``data_dir`` is set (the resume key is then read from disk).
    """
    decoded = _decode_resume_handle(resume_handle)
    resume_provider: Provider | None
    if decoded is not None:
        resume_provider = _resolve_provider(decoded["provider"])
        session_id = decoded["session_id"]
        resume_key: str | None = decoded["resume_key"]
        # A pinned single-provider agent must match the handle; an auto /
        # multi-provider agent simply resumes with the handle's provider.
        if self._is_pinned_single() and resume_provider.name != self.provider.name:
            raise ValueError(
                f"Handle is for provider {resume_provider.name!r} but this Agent is pinned to {self.provider.name!r}."
            )
    else:
        # Treat the string as a bare caw session id; the resume key (and
        # provider) must then come from the on-disk trajectory.
        session_id = resume_handle
        resume_key = None
        resume_provider = None

    # Load the persisted trajectory if a data_dir is available.
    trajectory: Trajectory | None = None
    store: SessionStore | None = None
    had_existing = False
    if self._data_dir:
        traj_path = Path(self._data_dir) / "sessions" / session_id / "trajectory.json"
        if traj_path.exists():
            with open(traj_path) as f:
                trajectory = Trajectory.from_dict(json.load(f))
            had_existing = True

    if resume_key is None:
        if trajectory is None:
            raise FileNotFoundError(
                f"Cannot resume {resume_handle!r}: it is not a self-contained "
                f"handle and no persisted session was found"
                + (f" under {self._data_dir}" if self._data_dir else " (no data_dir set)")
                + "."
            )
        if resume_provider is None:
            resume_provider = _resolve_provider(trajectory.agent)
        resume_key = resume_provider.resume_key_from_trajectory(trajectory)
        if not resume_key:
            raise ValueError(f"Cannot resume {resume_handle!r}: no resume key was persisted for this session.")

    assert resume_provider is not None  # set on both branches by this point

    base = {**self._kwargs, **kwargs}
    auto_wait, session_metadata, logger = self._pop_session_opts(base)

    if self._data_dir:
        store = SessionStore(self._data_dir, session_id, resume=True)

    subagent_traj_dir, all_handles, all_mcp = self._start_tool_servers(store)

    merged = self._provider_session_kwargs(resume_provider, base)
    # session_id is fixed by the handle/trajectory, not generated.
    merged.pop("session_id", None)
    provider_session = resume_provider.resume_session(
        mcp_servers=all_mcp,
        session_id=session_id,
        resume_key=resume_key,
        trajectory=trajectory,
        **merged,
    )

    session = Session(
        provider_session,
        store=store,
        subagent_traj_dir=subagent_traj_dir,
        tool_handles=all_handles,
        auto_wait=auto_wait,
        metadata=session_metadata,
        logger=logger,
    )
    # Fresh on-disk session (data_dir set but nothing persisted yet): seed
    # the metadata line like start_session does.
    if store is not None and not had_existing:
        store.write_metadata(session.trajectory)
    return session

Session

Session

Session(provider_session: ProviderSession, store: SessionStore | None = None, subagent_traj_dir: str | None = None, tool_handles: list[Any] | None = None, auto_wait: bool = True, metadata: dict[str, Any] | None = None, logger: AgentLogger | None = None, fallback_build: Callable[[], ProviderSession | None] | None = None)

A live interaction session with a coding agent.

Source code in caw/agent.py
def __init__(
    self,
    provider_session: ProviderSession,
    store: SessionStore | None = None,
    subagent_traj_dir: str | None = None,
    tool_handles: list[Any] | None = None,
    auto_wait: bool = True,
    metadata: dict[str, Any] | None = None,
    logger: AgentLogger | None = None,
    fallback_build: Callable[[], ProviderSession | None] | None = None,
) -> None:
    self._session = provider_session
    self._store = store
    self._subagent_traj_dir = subagent_traj_dir
    self._tool_handles = tool_handles or []
    self._auto_wait = auto_wait
    self._metadata: dict[str, Any] = dict(metadata) if metadata else {}
    self._readonly = False
    self._send_lock = threading.Lock()
    self._async_send_lock: asyncio.Lock | None = None
    self._traj_path: str | Path | None = None
    self._logger = logger
    # Auto-provider fallback: builds the next provider's session, or returns
    # None when the order is exhausted.  Consulted only on the first send.
    self._fallback_build = fallback_build
    self._committed = False
    if logger is not None:
        self._session.set_logger(logger)

trajectory property

trajectory: Trajectory

Accumulated trajectory (available during and after the session).

resume_handle property

resume_handle: str

Opaque string for resuming this session later, possibly in another process.

Store it anywhere (a database, a file, …) and pass it to Agent.resume_session. The handle is self-contained: it carries the backend resume key, so resuming works even without the original data_dir (you just won't get the prior trajectory restored). If the resuming Agent does share the same data_dir, the full history is restored and new turns are appended.

Raises if the backend has not yet assigned a resume key — send at least one message first.

session_dir property

session_dir: Path | None

Path to the session's data directory, or None if persistence is disabled.

send_async async

send_async(message: str) -> Turn

Async version of send — runs in a thread.

Messages are processed in FIFO order: if multiple send_async calls overlap, each waits for the previous one to finish before starting. This lets you fire-and-forget multiple messages:

tasks = [asyncio.create_task(session.send_async(m)) for m in msgs]
turns = await asyncio.gather(*tasks)  # executed in order

You can also do async work while a send is in progress:

task = asyncio.create_task(session.send_async(prompt))
while not task.done():
    source = await asyncio.wait_for(queue.get(), timeout=0.5)
    yield source
turn = await task
Source code in caw/agent.py
async def send_async(self, message: str) -> Turn:
    """Async version of `send` — runs in a thread.

    Messages are processed in FIFO order: if multiple ``send_async``
    calls overlap, each waits for the previous one to finish before
    starting.  This lets you fire-and-forget multiple messages:

        tasks = [asyncio.create_task(session.send_async(m)) for m in msgs]
        turns = await asyncio.gather(*tasks)  # executed in order

    You can also do async work while a send is in progress:

        task = asyncio.create_task(session.send_async(prompt))
        while not task.done():
            source = await asyncio.wait_for(queue.get(), timeout=0.5)
            yield source
        turn = await task
    """
    if self._async_send_lock is None:
        self._async_send_lock = asyncio.Lock()
    async with self._async_send_lock:
        return await asyncio.to_thread(self.send, message)

send

send(message: str) -> Turn

Send a message and get the agent's response turn.

When auto-wait is enabled and the provider reports a usage limit, this method sleeps until the limit resets and then automatically resumes the conversation — transparently to the caller.

When the session was created with an auto-provider fallback order, the first send transparently moves to the next provider if the current one fails (CLI missing, auth error) or is rate-limited — the caller never sees the exception. Once a provider has produced the first turn the session is committed to it (conversation context cannot be moved across CLIs), so later failures propagate normally.

Source code in caw/agent.py
def send(self, message: str) -> Turn:
    """Send a message and get the agent's response turn.

    When auto-wait is enabled and the provider reports a usage limit,
    this method sleeps until the limit resets and then automatically
    resumes the conversation — transparently to the caller.

    When the session was created with an auto-provider fallback order, the
    *first* send transparently moves to the next provider if the current
    one fails (CLI missing, auth error) or is rate-limited — the caller
    never sees the exception.  Once a provider has produced the first turn
    the session is committed to it (conversation context cannot be moved
    across CLIs), so later failures propagate normally.
    """
    if self._readonly:
        raise RuntimeError("Cannot send messages on a loaded session")
    with self._send_lock:
        if not self._committed and self._fallback_build is not None:
            return self._first_send_with_fallback(message)
        return self._send_with_autowait(message)

end

end() -> Trajectory

End the session and return the complete trajectory.

Source code in caw/agent.py
def end(self) -> Trajectory:
    """End the session and return the complete trajectory."""
    if self._readonly:
        raise RuntimeError("Cannot send messages on a loaded session")
    self._session.end()
    traj = self.trajectory
    traj.completed_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    if traj.turns and self._session.detect_usage_limit(traj.turns[-1]) is not None:
        traj.usage_limited = True
    if self._store is not None:
        self._store.finalize(traj)
    # Stop all tool server handles
    for handle in self._tool_handles:
        try:
            handle.stop_sync()
        except Exception:
            pass
    # Auto-save trajectory if configured
    if self._traj_path is not None:
        try:
            p = Path(self._traj_path)
            p.parent.mkdir(parents=True, exist_ok=True)
            with open(p, "w") as f:
                json.dump(traj.to_dict(), f, indent=2)
        except Exception:
            logger.warning("Failed to save trajectory to %s", self._traj_path, exc_info=True)
    return traj

save_trajectory

save_trajectory(path: str | Path) -> None

Save the trajectory to a JSON file at the given path.

Source code in caw/agent.py
def save_trajectory(self, path: str | Path) -> None:
    """Save the trajectory to a JSON file at the given path."""
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    with open(p, "w") as f:
        json.dump(self.trajectory.to_dict(), f, indent=2)

load_trajectory classmethod

load_trajectory(path: str | Path) -> Session

Load a trajectory from a JSON file. The returned session is read-only.

Source code in caw/agent.py
@classmethod
def load_trajectory(cls, path: str | Path) -> Session:
    """Load a trajectory from a JSON file. The returned session is read-only."""
    with open(path) as f:
        data = json.load(f)
    traj = Trajectory.from_dict(data)
    return cls._from_trajectory(traj)