Skip to content

Storage

Session persistence. See the Persistence guide for the on-disk layout.

SessionStore

SessionStore

SessionStore(data_dir: str | Path, session_id: str, resume: bool = False)

Persists session data to a directory on disk.

Layout:

<data_dir>/sessions/<session_id>/
    traj.jsonl          # incremental append-only event log
    trajectory.json     # full trajectory, overwritten after each turn
    turns/
        000_input.txt
        000_raw_output.jsonl
Source code in caw/storage.py
def __init__(self, data_dir: str | Path, session_id: str, resume: bool = False) -> None:
    """Set up the on-disk directories and the event log for one session.

    Args:
        data_dir: Root directory; the session is stored under
            ``<data_dir>/sessions/<session_id>/``.
        session_id: Name of the per-session subdirectory.
        resume: When true, turn numbering starts at the value returned by
            ``_next_turn_index()`` instead of 0.
    """
    session_root = Path(data_dir) / "sessions" / session_id
    turns_root = session_root / "turns"
    self._session_dir = session_root
    self._turns_dir = turns_root
    # Creating the innermost directory also creates every missing parent.
    turns_root.mkdir(parents=True, exist_ok=True)

    if resume:
        # Continue numbering after the turn files already present so a
        # resumed session extends the original record rather than
        # overwriting it.
        self._turn_counter = self._next_turn_index()
    else:
        self._turn_counter = 0

    self._jsonl = JsonlWriter(session_root / "traj.jsonl")

session_dir property

session_dir: Path

Path to this session's directory.

jsonl_path property

jsonl_path: Path

Path to the traj.jsonl file.

write_metadata

write_metadata(trajectory: Trajectory) -> None

Append a metadata entry to traj.jsonl (called once at session start).

Source code in caw/storage.py
def write_metadata(self, trajectory: Trajectory) -> None:
    """Record the session's metadata entry in traj.jsonl.

    Delegates to the store's JSONL writer. Intended to be called once, when
    the session starts.
    """
    event_log = self._jsonl
    event_log.write_metadata(trajectory)

finalize

finalize(trajectory: Trajectory) -> None

Write trajectory.json with the complete session record.

Source code in caw/storage.py
def finalize(self, trajectory: Trajectory) -> None:
    """Persist the complete session record to trajectory.json.

    Delegates to ``_save_trajectory`` with the given trajectory.
    """
    save_snapshot = self._save_trajectory
    save_snapshot(trajectory)

append_turn

append_turn(turn: Turn, trajectory: Trajectory, raw_output: str | None = None) -> None

Record a completed turn: event JSONL lines, trajectory snapshot, and raw files.

Source code in caw/storage.py
def append_turn(self, turn: Turn, trajectory: Trajectory, raw_output: str | None = None) -> None:
    """Persist one finished turn and advance the turn counter.

    Writes, in order: the raw per-turn files under the turns directory, the
    turn's event lines via the JSONL writer, and a full trajectory snapshot.

    Args:
        turn: The completed turn; its ``input`` is saved verbatim.
        trajectory: The trajectory to snapshot after this turn.
        raw_output: Optional raw output text; when ``None`` no
            ``*_raw_output.jsonl`` file is written for this turn.
    """
    index = self._turn_counter
    stem = f"{index:03d}"  # zero-padded so file names sort in turn order
    turns_dir = self._turns_dir

    # Raw per-turn files.
    (turns_dir / f"{stem}_input.txt").write_text(turn.input, encoding="utf-8")
    if raw_output is not None:
        (turns_dir / f"{stem}_raw_output.jsonl").write_text(raw_output, encoding="utf-8")

    # Incremental event log, then the overwritten full snapshot.
    self._jsonl.write_turn_events(turn, index)
    self._save_trajectory(trajectory)

    # Only advance once everything above has succeeded.
    self._turn_counter += 1

JsonlWriter

JsonlWriter

JsonlWriter(path: str | Path, *, subagent: str | None = None)

Append-only JSONL writer that takes an exclusive file lock around each write, so entries stay intact when several writers append concurrently.

When subagent is set, every entry is tagged with "subagent": name so readers can distinguish parent vs. subagent events.

Source code in caw/storage.py
def __init__(self, path: str | Path, *, subagent: str | None = None) -> None:
    """Remember the target file and the optional subagent tag.

    Args:
        path: Location of the JSONL file; stored as a ``Path``.
        subagent: Optional subagent name, stored as given.
    """
    target = Path(path)
    self._subagent = subagent
    self._path = target

append

append(entry: dict[str, Any]) -> None

Append a single JSON object as one line (file-locked).

Source code in caw/storage.py
def append(self, entry: dict[str, Any]) -> None:
    """Write ``entry`` to the file as a single JSON line.

    The caller's dict is never mutated: when a subagent name is configured,
    a copy carrying the ``"subagent"`` key is serialised instead.

    Args:
        entry: JSON-serialisable mapping to append.
    """
    tag = self._subagent
    record = {**entry, "subagent": tag} if tag else entry
    with open(self._path, "a", encoding="utf-8") as handle:
        # Exclusive lock taken before writing. There is no explicit unlock:
        # closing the file at the end of the ``with`` block is relied on to
        # release it.
        fcntl.flock(handle, fcntl.LOCK_EX)
        handle.write(f"{json.dumps(record)}\n")

write_metadata

write_metadata(trajectory: Trajectory) -> None

Write a metadata entry (typically once at session start).

Source code in caw/storage.py
def write_metadata(self, trajectory: Trajectory) -> None:
    """Append one ``"metadata"`` entry describing the session.

    The entry copies the trajectory's identifying and configuration fields
    and flattens each MCP server to a plain dict. Typically called once, at
    session start.
    """
    # Fields copied from every MCP server, in the order they are emitted.
    server_fields = ("name", "command", "args", "env", "url")
    servers = [
        {field: getattr(server, field) for field in server_fields}
        for server in trajectory.mcp_servers
    ]

    record = {
        "type": "metadata",
        "session_id": trajectory.session_id,
        "created_at": trajectory.created_at,
        "agent": trajectory.agent,
        "model": trajectory.model,
        "system_prompt": trajectory.system_prompt,
        "reasoning": trajectory.reasoning,
        "mcp_servers": servers,
        "metadata": trajectory.metadata,
    }
    self.append(record)

write_turn_events

write_turn_events(turn: Turn, turn_index: int) -> None

Write per-event JSONL lines for a completed turn.

Source code in caw/storage.py
def write_turn_events(self, turn: Turn, turn_index: int) -> None:
    """Emit one JSONL record per event of a finished turn.

    Records are appended in this order: the user message, then one or two
    records per output block (in the order the blocks appear), then a
    closing ``"turn_end"`` record with usage and duration. Every record
    carries ``turn_index``.

    Args:
        turn: The completed turn whose input, output blocks, usage and
            duration are recorded.
        turn_index: Index stamped on every record written for this turn.
    """
    emit = self.append

    # The user's message always comes first.
    emit({"type": "user", "message": turn.input, "turn_index": turn_index})

    # Output blocks, in the same order as the Display events.
    for item in turn.output:
        if isinstance(item, ThinkingBlock):
            emit({"type": "thinking", "text": item.text, "turn_index": turn_index})
            continue

        if isinstance(item, TextBlock):
            emit({"type": "text", "text": item.text, "turn_index": turn_index})
            continue

        if not isinstance(item, ToolUse):
            # Any other block kind produces no record.
            continue

        # A tool use yields a call record immediately followed by its result.
        emit(
            {
                "type": "tool_call",
                "id": item.id,
                "name": item.name,
                "arguments": item.arguments,
                "turn_index": turn_index,
            }
        )
        outcome: dict[str, Any] = {
            "type": "tool_result",
            "id": item.id,
            "name": item.name,
            "output": item.output,
            "is_error": item.is_error,
            "turn_index": turn_index,
        }
        nested = item.subagent_trajectory
        if nested:
            # Inline the subagent's trajectory only when there is one.
            outcome["subagent_trajectory"] = nested.to_dict()
        emit(outcome)

    # Closing record with the turn's stats.
    emit(
        {
            "type": "turn_end",
            "turn_index": turn_index,
            "usage": turn.usage.to_dict(),
            "duration_ms": turn.duration_ms,
        }
    )