Chapter 21. Resumability and Durable State

Previously: cost control. A budget-enforced harness can't run away. What it still can't do is survive a crash. The machine reboots, the process is killed, the laptop lid closes — and the session is gone.

Durability for agent harnesses has a specific shape, different from databases or web services. Three problems.

Crash during a turn. The harness dies mid-LLM-call or mid-tool-execution. On restart, we need to know where we were without duplicating work — especially side-effecting work.

Restart across processes. The user comes back tomorrow. The harness starts a new process, reads a checkpoint, and picks up. The full transcript, the plan, the scratchpad, all restored.

Idempotency for side effects. Tool calls that did succeed before the crash must not re-execute on resume. A payment charged once must not be charged twice.

This chapter builds the checkpointer that addresses all three. It's a SQLite-backed system with pre-execution and post-result durability, idempotency keys for mutating tools, and a verify-before-retry protocol for irreversible operations.

The foundational reference here is Pat Helland's 2012 ACM Queue article "Idempotence Is Not a Medical Condition," which remains the canonical treatment of idempotency in distributed systems. Helland's core argument — that any system built to survive restart must treat every side-effecting operation as potentially replayable and design its protocol so replays are safe — is the principle this chapter's Checkpointer implements specifically for agent tool dispatch. LangGraph's AsyncPostgresSaver is the contemporary production reference for this pattern in LLM harness code; we build the SQLite version because it's simpler to read and the interface ports to Postgres without code changes when you need to scale across machines.

[Figure: crash-resume timeline. Turns 1 and 2 each checkpoint (transcript + plan + budget) and persist; a crash interrupts Turn 3; Turn 4 resumes by loading the checkpoint, and the idempotency-key check skips completed tool calls so side-effecting tools don't re-execute.]

21.1 What Must Be Checkpointed

The durable state of a session:

  1. The transcript. Every message, every block, with IDs and timestamps.
  2. The plan. Current steps, postconditions, evidence strings.
  3. The scratchpad. Already on disk from §9.2 — the checkpointer records nothing about scratchpads because there's nothing to save. The scratchpad directory carries across process deaths on its own. If you vary the scratchpad root per session (one directory per session_id), record that root in the session metadata below so the next run knows where to look; otherwise a single shared root is fine and the checkpointer can stay out of it entirely.
  4. The budget. How much has been spent so far this session.
  5. The tool-call log. Which tool calls have been issued, which completed, which have pending results.

The last one is the novel bit. Chapter 6's registry maintained an in-memory _call_history for loop detection. For resumability, we need a persistent record of every side-effecting tool call with its idempotency state.


21.2 The Checkpointer Schema

# src/harness/checkpoint/store.py
from __future__ import annotations

import asyncio
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path


SCHEMA = """
CREATE TABLE IF NOT EXISTS sessions (
    session_id TEXT PRIMARY KEY,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    status TEXT NOT NULL  -- 'active', 'completed', 'failed', 'cancelled'
);

CREATE TABLE IF NOT EXISTS checkpoints (
    session_id TEXT NOT NULL,
    version INTEGER NOT NULL,
    created_at TIMESTAMP NOT NULL,
    transcript_json TEXT NOT NULL,
    plan_json TEXT,
    budget_spent_usd REAL NOT NULL DEFAULT 0,
    PRIMARY KEY (session_id, version)
);

CREATE TABLE IF NOT EXISTS tool_calls (
    session_id TEXT NOT NULL,
    call_id TEXT NOT NULL,
    tool_name TEXT NOT NULL,
    args_json TEXT NOT NULL,
    idempotency_key TEXT NOT NULL,
    status TEXT NOT NULL,  -- 'issued', 'completed', 'failed'
    result_text TEXT,
    started_at TIMESTAMP NOT NULL,
    completed_at TIMESTAMP,
    PRIMARY KEY (session_id, call_id)
);

CREATE INDEX IF NOT EXISTS idx_tool_calls_idempotency
    ON tool_calls(idempotency_key);
"""


class Checkpointer:
    def __init__(self, db_path: str | Path) -> None:
        self._path = Path(db_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(self._path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        self._conn.executescript(SCHEMA)
        self._lock = asyncio.Lock()

    @contextmanager
    def _transaction(self):
        try:
            yield self._conn
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise

    async def start_session(self, session_id: str,
                             user_message: str = "", system_prompt: str | None = None) -> None:
        """Record a new session or touch its updated_at.

        user_message and system_prompt are accepted for API back-compat but
        no longer stored on `sessions` — they live in the first checkpoint's
        transcript_json. A resume with a new user message does not overwrite
        the original.
        """
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                # INSERT OR IGNORE preserves the original created_at
                conn.execute("""
                    INSERT OR IGNORE INTO sessions
                    (session_id, created_at, updated_at, status)
                    VALUES (?, ?, ?, 'active')
                """, (session_id, now, now))
                conn.execute("""
                    UPDATE sessions SET updated_at = ?, status = 'active'
                    WHERE session_id = ?
                """, (now, session_id))

    async def save_checkpoint(
        self, session_id: str, transcript: list[dict],
        plan: dict | None, budget_spent_usd: float,
    ) -> int:
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                row = conn.execute(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM checkpoints "
                    "WHERE session_id = ?", (session_id,)
                ).fetchone()
                version = row[0]
                conn.execute("""
                    INSERT INTO checkpoints
                    (session_id, version, created_at,
                     transcript_json, plan_json, budget_spent_usd)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (session_id, version, now,
                      json.dumps(transcript),
                      json.dumps(plan) if plan is not None else None,
                      budget_spent_usd))
                conn.execute("""
                    UPDATE sessions SET updated_at = ? WHERE session_id = ?
                """, (now, session_id))
                return version

    async def record_tool_call_issued(
        self, session_id: str, call_id: str, tool_name: str,
        args: dict, idempotency_key: str,
    ) -> None:
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                conn.execute("""
                    INSERT OR IGNORE INTO tool_calls
                    (session_id, call_id, tool_name, args_json,
                     idempotency_key, status, started_at)
                    VALUES (?, ?, ?, ?, ?, 'issued', ?)
                """, (session_id, call_id, tool_name, json.dumps(args),
                      idempotency_key, now))

    async def record_tool_call_result(
        self, session_id: str, call_id: str, result_text: str,
        success: bool,
    ) -> None:
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                conn.execute("""
                    UPDATE tool_calls
                    SET status = ?, result_text = ?, completed_at = ?
                    WHERE session_id = ? AND call_id = ?
                """, ('completed' if success else 'failed',
                      result_text, now, session_id, call_id))

    async def find_completed_call(
        self, idempotency_key: str,
    ) -> dict | None:
        async with self._lock:
            row = self._conn.execute("""
                SELECT call_id, tool_name, args_json, result_text, status
                FROM tool_calls
                WHERE idempotency_key = ? AND status = 'completed'
                LIMIT 1
            """, (idempotency_key,)).fetchone()
            if row is None:
                return None
            return dict(row)

    async def load_latest(self, session_id: str) -> dict | None:
        async with self._lock:
            row = self._conn.execute("""
                SELECT transcript_json, plan_json, budget_spent_usd, version
                FROM checkpoints
                WHERE session_id = ?
                ORDER BY version DESC
                LIMIT 1
            """, (session_id,)).fetchone()
            if row is None:
                return None
            return {
                "transcript": json.loads(row["transcript_json"]),
                "plan": json.loads(row["plan_json"]) if row["plan_json"] else None,
                "budget_spent_usd": row["budget_spent_usd"],
                "version": row["version"],
            }

Three decisions worth naming.

Versioned checkpoints, not in-place updates. Every save_checkpoint inserts a new row with an incremented version. Disk-cheap; debugging-priceless. If a session's final answer was wrong, you can load any earlier checkpoint and see what the state was at that moment.

Tool-call log has its own table. Not inside the checkpoint — the checkpoint is the conversation snapshot, the tool-call log is the side-effect record. They have different update frequencies and different query patterns.

sessions is identity, checkpoints is content. The first user message and the system prompt don't live on sessions — they're part of checkpoints[version=1].transcript_json. One session is a multi-turn conversation, so one row in sessions represents N user messages; storing "the original user message" would require choosing which one. Keep identity (session_id, status, timing) on sessions; keep content (transcript, plan, budget) on checkpoints. If a caller wants the original user message accessible without parsing JSON — a dashboard listing sessions by their opening question, say — persist it as a separate column on sessions yourself; nothing in the schema above depends on it being there.


21.3 Write-Before-Execute for Side Effects

The idempotency discipline. Before a mutating tool runs, record that we're about to run it. When it completes, record the result. On resume, check the log: if the call is recorded as issued but not completed, we don't know whether it actually executed. If it's completed, we know the result and return it without re-running.

# src/harness/tools/registry.py (checkpoint-aware addition)

@dataclass
class ToolRegistry:
    # ... existing fields
    checkpointer: "Checkpointer | None" = None
    session_id: str | None = None

    async def adispatch(self, name: str, args: dict, call_id: str) -> ToolResult:
        # ... existing validation, permission, loop checks

        tool = self.tools[name]
        is_mutating = "mutate" in tool.side_effects or "write" in tool.side_effects

        idempotency_key = None
        if is_mutating and self.checkpointer and self.session_id:
            idempotency_key = self._compute_idempotency_key(name, args)

            # Check for a prior completion with the same key.
            prior = await self.checkpointer.find_completed_call(idempotency_key)
            if prior is not None:
                return ToolResult(
                    call_id=call_id,
                    content=(f"[idempotent replay of {prior['call_id']}]: "
                             f"{prior['result_text']}"),
                )

            # Record intent to execute BEFORE executing.
            await self.checkpointer.record_tool_call_issued(
                self.session_id, call_id, name, args, idempotency_key
            )

        try:
            content = tool.run(**args)
            result = ToolResult(call_id=call_id, content=content)
        except Exception as e:
            result = ToolResult(
                call_id=call_id,
                content=f"{name} raised {type(e).__name__}: {e}",
                is_error=True,
            )

        if is_mutating and self.checkpointer and self.session_id:
            await self.checkpointer.record_tool_call_result(
                self.session_id, call_id, result.content, not result.is_error
            )

        return result

    def _compute_idempotency_key(self, name: str, args: dict) -> str:
        import hashlib
        payload = f"{self.session_id}:{name}:{json.dumps(args, sort_keys=True)}"
        return hashlib.sha256(payload.encode()).hexdigest()

The key construction matters. Our default idempotency key is a hash of session_id + tool_name + args. Within a session, calling write_file(/tmp/plan.txt, "v1") twice is treated as one operation (the second call returns the cached result). Different sessions aren't deduped against each other — two sessions writing to the same file legitimately can both run.

This is conservative. A stricter key would not include session — "email with this subject and body, ever, is one operation." For email-like tools where the real idempotency is vendor-side, pass the vendor's idempotency key as part of args so it's in the hash.
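The key's two properties — canonical args, session scoping — are worth checking directly. A standalone sketch of the same construction as _compute_idempotency_key above (the freestanding `idempotency_key` function is illustrative):

```python
import hashlib
import json

def idempotency_key(session_id: str, name: str, args: dict) -> str:
    # Same construction as _compute_idempotency_key above:
    # session-scoped SHA-256 of tool name + canonicalized args.
    payload = f"{session_id}:{name}:{json.dumps(args, sort_keys=True)}"
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = idempotency_key("s1", "write_file", {"path": "/tmp/plan.txt", "content": "v1"})
k2 = idempotency_key("s1", "write_file", {"content": "v1", "path": "/tmp/plan.txt"})
k3 = idempotency_key("s2", "write_file", {"path": "/tmp/plan.txt", "content": "v1"})

assert k1 == k2   # arg order doesn't matter: sort_keys canonicalizes the JSON
assert k1 != k3   # different sessions never dedupe against each other
```

The sort_keys=True is load-bearing: without it, two semantically identical calls whose args dicts serialize in different orders would get different keys and both execute.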

Verify before retry is what the next session does with interrupted calls:

# src/harness/checkpoint/resume.py

async def check_pending_tool_calls(
    checkpointer: Checkpointer, session_id: str
) -> list[dict]:
    """Return tool calls that were issued but not completed — needs verification."""
    async with checkpointer._lock:
        rows = checkpointer._conn.execute("""
            SELECT call_id, tool_name, args_json, started_at
            FROM tool_calls
            WHERE session_id = ? AND status = 'issued'
        """, (session_id,)).fetchall()
    return [dict(r) for r in rows]

On session resume, check_pending_tool_calls returns the calls where we don't know the outcome. The caller — the harness's startup logic — handles them based on the tool's side effects:

  • Read-only tool. Safe to discard (it didn't mutate anything); the agent will re-run if needed.
  • Write or mutate. The harness either: (a) marks the call as failed in the log and lets the agent decide what to do, (b) calls a tool-provided verify hook that checks whether the side effect landed, or (c) surfaces to the user for manual resolution.

Option (b) is best when tools support it. Add an optional verify callable to the Tool dataclass; on resume, the harness calls it and records the result.


21.4 Checkpointing in the Loop

The loop saves a checkpoint after each completed turn:

# src/harness/agent.py (checkpoint-aware sketch)

async def arun(
    # ... existing parameters
    checkpointer: "Checkpointer | None" = None,
    session_id: str | None = None,
) -> str:
    # ... setup

    if checkpointer and session_id:
        await checkpointer.start_session(session_id, user_message, system)

    for iteration in range(MAX_ITERATIONS):
        # ... turn execution

        if checkpointer and session_id:
            await checkpointer.save_checkpoint(
                session_id=session_id,
                transcript=_serialize_transcript(transcript),
                plan=_serialize_plan(plan_holder.plan) if plan_holder else None,
                budget_spent_usd=budget_enforcer.spent_usd if budget_enforcer else 0.0,
            )

The _serialize_* helpers are one side of a round-trip; the resume path later in this section needs the inverse. Both helpers live in a small module so arun (write side) and the example script (read side) share them:

# src/harness/checkpoint/serde.py
from __future__ import annotations

from dataclasses import asdict
from datetime import datetime

from ..messages import (
    Message, Transcript,
    TextBlock, ReasoningBlock, ToolCall, ToolResult,
)
from ..plans.tools import Plan  # §16.2's Plan dataclass


def _serialize_transcript(transcript: Transcript) -> list[dict]:
    out: list[dict] = []
    for msg in transcript.messages:
        out.append({
            "id": msg.id,
            "role": msg.role,
            "created_at": msg.created_at.isoformat(),
            "blocks": [asdict(b) for b in msg.blocks],
        })
    return out


def _deserialize_transcript(data: list[dict]) -> Transcript:
    messages: list[Message] = []
    for m in data:
        blocks = [_deserialize_block(b) for b in m["blocks"]]
        messages.append(Message(
            id=m["id"],
            role=m["role"],
            created_at=datetime.fromisoformat(m["created_at"]),
            blocks=blocks,
        ))
    return Transcript(messages=messages)


def _deserialize_block(d: dict):
    """Dispatch on the `kind` discriminator from §3.2's block dataclasses."""
    kind = d["kind"]
    if kind == "text":
        return TextBlock(text=d["text"])
    if kind == "reasoning":
        return ReasoningBlock(text=d["text"], metadata=d.get("metadata", {}))
    if kind == "tool_call":
        return ToolCall(id=d["id"], name=d["name"], args=d["args"])
    if kind == "tool_result":
        return ToolResult(
            call_id=d["call_id"],
            content=d["content"],
            is_error=d.get("is_error", False),
        )
    raise ValueError(f"unknown block kind: {kind!r}")


def _serialize_plan(plan: Plan | None) -> dict | None:
    return asdict(plan) if plan is not None else None


def _deserialize_plan(data: dict | None) -> Plan | None:
    return Plan(**data) if data is not None else None

Nothing here is clever — it's what the §3.2 block discriminator buys you. The kind field on every block makes deserialization a dispatch table, not a guessing game. dataclasses.asdict walks nested dataclasses for us on the way out; Plan(**data) reconstructs on the way in. The one subtlety is created_at: datetime doesn't round-trip through JSON by default, so we serialize with .isoformat() and deserialize with datetime.fromisoformat(...).
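The round-trip property is cheap to test. A self-contained miniature of the same pattern — local stand-ins for the block dataclasses, since the harness package isn't importable here — shows why the kind discriminator plus dataclass equality makes the check a one-liner:

```python
from dataclasses import asdict, dataclass

# Miniature stand-ins for §3.2's blocks; real ones carry more fields.
@dataclass
class TextBlock:
    text: str
    kind: str = "text"

@dataclass
class ToolCall:
    id: str
    name: str
    args: dict
    kind: str = "tool_call"

def deserialize_block(d: dict):
    # Dispatch on the `kind` discriminator, as in _deserialize_block above.
    if d["kind"] == "text":
        return TextBlock(text=d["text"])
    if d["kind"] == "tool_call":
        return ToolCall(id=d["id"], name=d["name"], args=d["args"])
    raise ValueError(f"unknown block kind: {d['kind']!r}")

blocks = [TextBlock("hello"), ToolCall("c1", "read_file", {"path": "/tmp/x"})]
wire = [asdict(b) for b in blocks]           # serialize: dataclass → plain dict
back = [deserialize_block(d) for d in wire]  # deserialize: dispatch on kind

assert back == blocks   # dataclass field equality verifies the round-trip
```

Worth adding the equivalent assertion over your real block types to the test suite — serialization drift (a field added to a dataclass but not to the deserializer) is otherwise invisible until a resume fails.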

After every turn, we have a checkpoint. If the process dies before the next turn, the last checkpoint is the recoverable state. Startup uses the deserializers to rehydrate the in-memory objects that arun (and §16's plan_holder) want:

# examples/ch21_resume.py
import asyncio

from harness.agent import arun
from harness.checkpoint.store import Checkpointer
from harness.checkpoint.resume import check_pending_tool_calls
from harness.checkpoint.serde import (
    _deserialize_transcript, _deserialize_plan,
)
from harness.messages import Transcript
from harness.plans.tools import PlanHolder
from harness.providers.anthropic import AnthropicProvider
from harness.tools.selector import ToolCatalog
from harness.tools.std import STANDARD_TOOLS


async def main():
    checkpointer = Checkpointer(".harness/sessions.db")
    session_id = "session-alpha"

    # 1. Side-effect verification — surface anything interrupted mid-call.
    pending = await check_pending_tool_calls(checkpointer, session_id)
    if pending:
        print(f"WARNING: {len(pending)} tool calls were interrupted mid-execution.")
        for p in pending:
            print(f"  - {p['call_id']} {p['tool_name']} {p['args_json']}")
        # In production: run verification tools or ask the user before continuing.

    # 2. Rehydrate in-memory state from the latest checkpoint, if any.
    transcript: Transcript | None = None
    plan_holder = PlanHolder()
    latest = await checkpointer.load_latest(session_id)
    if latest is not None:
        print(f"Resuming session-alpha from checkpoint v{latest['version']}")
        transcript = _deserialize_transcript(latest["transcript"])
        if latest["plan"] is not None:
            plan_holder.plan = _deserialize_plan(latest["plan"])
    else:
        print("Starting new session-alpha")

    # 3. Pass the rehydrated transcript into arun via the `transcript=`
    #    parameter from §5.5.1 — it resumes the conversation, it doesn't
    #    replace the user's new message. Passing `transcript=None` is the
    #    fresh-session path (arun builds a new Transcript internally).
    provider = AnthropicProvider()
    catalog = ToolCatalog(tools=STANDARD_TOOLS)
    result = await arun(
        provider=provider,
        catalog=catalog,
        user_message="continue from where we left off",
        transcript=transcript,            # ← rehydrated, or None
        plan_holder=plan_holder,          # ← rehydrated plan (may be empty)
        checkpointer=checkpointer,
        session_id=session_id,
    )
    print(result.summary)


asyncio.run(main())

Three steps, each with a distinct job: verify interrupted side effects (§21.3), rehydrate in-memory objects via the deserializers above, pass them into arun through its existing parameters. Resume needs no new arun parameters — transcript= (from Ch 5) and plan_holder= (from Ch 16) are already the seam, which is the whole point of making both of them optional injection points earlier in the book.

The harness now has memory across process deaths. Combined with the scratchpad from Chapter 9 (which was already durable on disk and needs no rehydration), you have full continuity: conversation state rebuilt from SQLite, plan state rebuilt from SQLite, durable findings already on disk, and idempotent replay for any side effects that were in flight.


21.5 Choosing Your Checkpoint Cadence

We save after every turn. That's defensible for most harnesses — turns are the natural unit of progress, and SQLite writes of a few KB each are microseconds. But on very fast/short turns, checkpointing every turn starts to be a measurable fraction of latency.

Three cadences worth knowing:

  • Per turn (our default). Maximum durability. Suitable for human-facing agents where turn latency is 100ms–several seconds.
  • Per tool call. Even finer. Useful when individual tool calls are long-running and you want to resume mid-turn. Costs more SQLite writes per session.
  • Periodic. Every N turns or every N seconds. Good for high-throughput batch scenarios where losing the last few turns to a crash is acceptable.

The interface (save_checkpoint) is the same; the caller decides when.
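One way to keep that decision out of the loop body is a small cadence policy the caller consults before each save_checkpoint. A sketch (the `PeriodicCadence` class is illustrative, not part of the chapter's code):

```python
import time

class PeriodicCadence:
    """Consulted once per turn; True means 'save a checkpoint now'."""

    def __init__(self, every_n_turns: int = 5, every_n_seconds: float = 30.0):
        self.every_n_turns = every_n_turns
        self.every_n_seconds = every_n_seconds
        self._turns_since_save = 0
        self._last_save = time.monotonic()

    def should_save(self) -> bool:
        self._turns_since_save += 1
        due = (self._turns_since_save >= self.every_n_turns
               or time.monotonic() - self._last_save >= self.every_n_seconds)
        if due:
            # Reset both counters so the two triggers don't double-fire.
            self._turns_since_save = 0
            self._last_save = time.monotonic()
        return due

# Turn trigger only (time window set far in the future for the demo).
cadence = PeriodicCadence(every_n_turns=3, every_n_seconds=3600)
decisions = [cadence.should_save() for _ in range(6)]
assert decisions == [False, False, True, False, False, True]
```

Per-turn and per-tool-call cadences are the degenerate cases (every_n_turns=1 consulted per turn or per call), so one policy object covers all three.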


21.6 Postgres When You're Ready

The checkpointer interface matches LangGraph's. When you outgrow SQLite (multiple machines, high write rates), swap Checkpointer for a Postgres-backed implementation. The schema is essentially the same; the connection handling is different; the interface you pass to arun doesn't change.

What you actually get from Postgres: concurrent access across processes, network access from separate machines, point-in-time restore, replication. None of which you need on day one; all of which you might need on day 400.


21.7 Commit

git add -A && git commit -m "ch21: SQLite checkpointer with idempotency-aware tool dispatch"
git tag ch21-resume

21.8 Try It Yourself

  1. Crash and resume. Run a long session, kill the process after turn 4, restart and resume. Confirm the new session loads the previous transcript. Confirm any side-effecting tool calls that were pending were either verified, re-run safely, or surfaced for resolution.
  2. Duplicate write test. Write a tool that sends a mock "email" (prints a line to a file). Run the agent to send one email. Run it again in the same session with the same args. Confirm the second call returns the cached result, not a second "email."
  3. Corruption audit. After a session completes, examine the checkpoints table. How many versions are there? Can you reconstruct the session's evolution from them? Delete a middle version; does subsequent resume still work? (Answer: yes, because resume uses latest, but this tells you about recovery options.)