Previously: cost control. A budget-enforced harness can't run away. What it still can't do is survive a crash. The machine reboots, the process is killed, the laptop lid closes — and the session is gone.
Durability for agent harnesses has a specific shape, different from databases or web services. Three problems.
Crash during a turn. The harness dies mid-LLM-call or mid-tool-execution. On restart, we need to know where we were without duplicating work — especially side-effecting work.
Restart across processes. The user comes back tomorrow. The harness starts a new process, reads a checkpoint, and picks up. The full transcript, the plan, the scratchpad, all restored.
Idempotency for side effects. Tool calls that did succeed before the crash must not re-execute on resume. A payment charged once must not be charged twice.
This chapter builds the checkpointer that addresses all three. It's a SQLite-backed system with pre-execution and post-result durability, idempotency keys for mutating tools, and a verify-before-retry protocol for irreversible operations.
The foundational reference here is Pat Helland's 2012 ACM Queue article "Idempotence Is Not a Medical Condition," which remains the canonical treatment of idempotency in distributed systems. Helland's core argument — that any system built to survive restart must treat every side-effecting operation as potentially replayable and design its protocol so replays are safe — is the principle this chapter's Checkpointer implements specifically for agent tool dispatch. LangGraph's AsyncPostgresSaver is the contemporary production reference for this pattern in LLM harness code; we build the SQLite version because it's simpler to read and the interface ports to Postgres without code changes when you need to scale across machines.
The durable state of a session: the transcript (every message, every block), the plan if one exists, the budget spent so far, and the tool-call log.
The last one is the novel bit. Chapter 6's registry maintained an in-memory _call_history for loop detection. For resumability, we need a persistent record of every side-effecting tool call with its idempotency state.
# src/harness/checkpoint/store.py
from __future__ import annotations
import asyncio
import json
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
SCHEMA = """
CREATE TABLE IF NOT EXISTS sessions (
    session_id TEXT PRIMARY KEY,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    status TEXT NOT NULL  -- 'active', 'completed', 'failed', 'cancelled'
);

CREATE TABLE IF NOT EXISTS checkpoints (
    session_id TEXT NOT NULL,
    version INTEGER NOT NULL,
    created_at TIMESTAMP NOT NULL,
    transcript_json TEXT NOT NULL,
    plan_json TEXT,
    budget_spent_usd REAL NOT NULL DEFAULT 0,
    PRIMARY KEY (session_id, version)
);

CREATE TABLE IF NOT EXISTS tool_calls (
    session_id TEXT NOT NULL,
    call_id TEXT NOT NULL,
    tool_name TEXT NOT NULL,
    args_json TEXT NOT NULL,
    idempotency_key TEXT NOT NULL,
    status TEXT NOT NULL,  -- 'issued', 'completed', 'failed'
    result_text TEXT,
    started_at TIMESTAMP NOT NULL,
    completed_at TIMESTAMP,
    PRIMARY KEY (session_id, call_id)
);

CREATE INDEX IF NOT EXISTS idx_tool_calls_idempotency
    ON tool_calls(idempotency_key);
"""
class Checkpointer:
    def __init__(self, db_path: str | Path) -> None:
        self._path = Path(db_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(self._path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        self._conn.executescript(SCHEMA)
        self._lock = asyncio.Lock()

    @contextmanager
    def _transaction(self):
        try:
            yield self._conn
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise

    async def start_session(
        self, session_id: str,
        user_message: str = "", system_prompt: str | None = None,
    ) -> None:
        """Record a new session or touch its updated_at.

        user_message and system_prompt are accepted for API back-compat but
        no longer stored on `sessions` — they live in the first checkpoint's
        transcript_json. A resume with a new user message does not overwrite
        the original.
        """
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                # INSERT OR IGNORE preserves the original created_at
                conn.execute("""
                    INSERT OR IGNORE INTO sessions
                        (session_id, created_at, updated_at, status)
                    VALUES (?, ?, ?, 'active')
                """, (session_id, now, now))
                conn.execute("""
                    UPDATE sessions SET updated_at = ?, status = 'active'
                    WHERE session_id = ?
                """, (now, session_id))

    async def save_checkpoint(
        self, session_id: str, transcript: list[dict],
        plan: dict | None, budget_spent_usd: float,
    ) -> int:
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                row = conn.execute(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM checkpoints "
                    "WHERE session_id = ?", (session_id,)
                ).fetchone()
                version = row[0]
                conn.execute("""
                    INSERT INTO checkpoints
                        (session_id, version, created_at,
                         transcript_json, plan_json, budget_spent_usd)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (session_id, version, now,
                      json.dumps(transcript),
                      json.dumps(plan) if plan else None,
                      budget_spent_usd))
                conn.execute("""
                    UPDATE sessions SET updated_at = ? WHERE session_id = ?
                """, (now, session_id))
            return version

    async def record_tool_call_issued(
        self, session_id: str, call_id: str, tool_name: str,
        args: dict, idempotency_key: str,
    ) -> None:
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                conn.execute("""
                    INSERT OR IGNORE INTO tool_calls
                        (session_id, call_id, tool_name, args_json,
                         idempotency_key, status, started_at)
                    VALUES (?, ?, ?, ?, ?, 'issued', ?)
                """, (session_id, call_id, tool_name, json.dumps(args),
                      idempotency_key, now))

    async def record_tool_call_result(
        self, session_id: str, call_id: str, result_text: str,
        success: bool,
    ) -> None:
        async with self._lock:
            now = datetime.now(timezone.utc).isoformat()
            with self._transaction() as conn:
                conn.execute("""
                    UPDATE tool_calls
                    SET status = ?, result_text = ?, completed_at = ?
                    WHERE session_id = ? AND call_id = ?
                """, ('completed' if success else 'failed',
                      result_text, now, session_id, call_id))

    async def find_completed_call(
        self, idempotency_key: str,
    ) -> dict | None:
        async with self._lock:
            row = self._conn.execute("""
                SELECT call_id, tool_name, args_json, result_text, status
                FROM tool_calls
                WHERE idempotency_key = ? AND status = 'completed'
                LIMIT 1
            """, (idempotency_key,)).fetchone()
        if row is None:
            return None
        return dict(row)

    async def load_latest(self, session_id: str) -> dict | None:
        async with self._lock:
            row = self._conn.execute("""
                SELECT transcript_json, plan_json, budget_spent_usd, version
                FROM checkpoints
                WHERE session_id = ?
                ORDER BY version DESC
                LIMIT 1
            """, (session_id,)).fetchone()
        if row is None:
            return None
        return {
            "transcript": json.loads(row["transcript_json"]),
            "plan": json.loads(row["plan_json"]) if row["plan_json"] else None,
            "budget_spent_usd": row["budget_spent_usd"],
            "version": row["version"],
        }
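One tuning knob worth knowing before the design discussion: SQLite's journal mode. The Checkpointer above uses the defaults; the sketch below shows a common production setting, not part of the chapter's implementation. WAL mode lets a second process (a dashboard, an inspector) read checkpoints while the harness writes, and synchronous=NORMAL lowers fsync cost; it survives process crashes, though a power loss can drop the last few commits (without corrupting the database).

```python
import os
import sqlite3
import tempfile

# Hypothetical tuning sketch — apply right after connect(), before any writes.
path = os.path.join(tempfile.mkdtemp(), "sessions.db")
conn = sqlite3.connect(path)
conn.execute("PRAGMA journal_mode=WAL")   # readers no longer block the writer
conn.execute("PRAGMA synchronous=NORMAL") # fewer fsyncs; safe with WAL for app crashes
print(conn.execute("PRAGMA journal_mode").fetchone()[0])  # wal
```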
Three decisions worth naming.
Versioned checkpoints, not in-place updates. Every save_checkpoint inserts a new row with an incremented version. Disk-cheap; debugging-priceless. If a session's final answer was wrong, you can load any earlier checkpoint and see what the state was at that moment.
Tool-call log has its own table. Not inside the checkpoint — the checkpoint is the conversation snapshot, the tool-call log is the side-effect record. They have different update frequencies and different query patterns.
sessions is identity, checkpoints is content. The first user message and the system prompt don't live on sessions — they're part of checkpoints[version=1].transcript_json. One session is a multi-turn conversation, so one row in sessions represents N user messages; storing "the original user message" would require choosing which one. Keep identity (session_id, status, timing) on sessions; keep content (transcript, plan, budget) on checkpoints. If a caller wants the original user message accessible without parsing JSON — a dashboard listing sessions by their opening question, say — persist it as a separate column on sessions yourself; nothing in the schema above depends on it being there.
The idempotency discipline. Before a mutating tool runs, record that we're about to run it. When it completes, record the result. On resume, check the log: if the call is recorded as issued but not completed, we don't know whether it actually executed. If it's completed, we know the result and return it without re-running.
# src/harness/tools/registry.py (checkpoint-aware addition)
@dataclass
class ToolRegistry:
    # ... existing fields
    checkpointer: "Checkpointer | None" = None
    session_id: str | None = None

    async def adispatch(self, name: str, args: dict, call_id: str) -> ToolResult:
        # ... existing validation, permission, loop checks
        tool = self.tools[name]
        is_mutating = "mutate" in tool.side_effects or "write" in tool.side_effects

        idempotency_key = None
        if is_mutating and self.checkpointer and self.session_id:
            idempotency_key = self._compute_idempotency_key(name, args)
            # Check for a prior completion with the same key.
            prior = await self.checkpointer.find_completed_call(idempotency_key)
            if prior is not None:
                return ToolResult(
                    call_id=call_id,
                    content=(f"[idempotent replay of {prior['call_id']}]: "
                             f"{prior['result_text']}"),
                )
            # Record intent to execute BEFORE executing.
            await self.checkpointer.record_tool_call_issued(
                self.session_id, call_id, name, args, idempotency_key
            )

        try:
            content = tool.run(**args)
            result = ToolResult(call_id=call_id, content=content)
        except Exception as e:
            result = ToolResult(
                call_id=call_id,
                content=f"{name} raised {type(e).__name__}: {e}",
                is_error=True,
            )

        if is_mutating and self.checkpointer and self.session_id:
            await self.checkpointer.record_tool_call_result(
                self.session_id, call_id, result.content, not result.is_error
            )
        return result

    def _compute_idempotency_key(self, name: str, args: dict) -> str:
        import hashlib
        payload = f"{self.session_id}:{name}:{json.dumps(args, sort_keys=True)}"
        return hashlib.sha256(payload.encode()).hexdigest()
The key construction matters. Our default idempotency key is a hash of session_id + tool_name + args. Within a session, calling write_file(/tmp/plan.txt, "v1") twice is treated as one operation (the second call returns the cached result). Different sessions aren't deduped against each other — two sessions writing to the same file legitimately can both run.
This is conservative. A stricter key would not include session — "email with this subject and body, ever, is one operation." For email-like tools where the real idempotency is vendor-side, pass the vendor's idempotency key as part of args so it's in the hash.
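A sketch of that stricter variant, under the assumption that the vendor key simply rides along in args (global_idempotency_key is a hypothetical name, not part of the registry above):

```python
import hashlib
import json

def global_idempotency_key(tool_name: str, args: dict) -> str:
    """Stricter key: no session_id, so the same email is one operation
    across ALL sessions. A vendor-side key (Stripe-style 'idempotency_key')
    participates in the hash automatically because it's part of args."""
    payload = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
    return hashlib.sha256(payload.encode()).hexdigest()

a = global_idempotency_key("send_email", {"subject": "hi", "body": "x"})
b = global_idempotency_key("send_email", {"body": "x", "subject": "hi"})
c = global_idempotency_key("send_email", {"subject": "hi", "body": "y"})
print(a == b)  # True — sort_keys makes arg order irrelevant
print(a == c)  # False — different body, different operation
```

Note the sort_keys=True: without it, two semantically identical calls with different dict ordering would hash differently and defeat the dedup.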
Verify before retry is the protocol the next session applies to interrupted calls:
# src/harness/checkpoint/resume.py
async def check_pending_tool_calls(
    checkpointer: Checkpointer, session_id: str
) -> list[dict]:
    """Return tool calls that were issued but never completed — they need verification."""
    async with checkpointer._lock:
        rows = checkpointer._conn.execute("""
            SELECT call_id, tool_name, args_json, started_at
            FROM tool_calls
            WHERE session_id = ? AND status = 'issued'
        """, (session_id,)).fetchall()
    return [dict(r) for r in rows]
On session resume, check_pending_tool_calls returns the calls where we don't know the outcome. The caller — the harness's startup logic — handles them based on the tool's side effects: it (a) re-executes the call when the tool is read-only or otherwise safe to repeat, (b) runs a verify hook that checks whether the side effect landed, or (c) surfaces it to the user for manual resolution. Option (b) is the best when tools support it. Add an optional verify callable to the Tool dataclass; on resume, the harness calls it and records the result.
The loop saves a checkpoint after each completed turn:
# src/harness/agent.py (checkpoint-aware sketch)
async def arun(
    # ... existing parameters
    checkpointer: "Checkpointer | None" = None,
    session_id: str | None = None,
) -> str:
    # ... setup
    if checkpointer and session_id:
        await checkpointer.start_session(session_id, user_message, system)

    for iteration in range(MAX_ITERATIONS):
        # ... turn execution
        if checkpointer and session_id:
            await checkpointer.save_checkpoint(
                session_id=session_id,
                transcript=_serialize_transcript(transcript),
                plan=_serialize_plan(plan_holder.plan) if plan_holder else None,
                budget_spent_usd=budget_enforcer.spent_usd if budget_enforcer else 0.0,
            )
The _serialize_* helpers are one side of a round-trip; the resume path in §21.4 needs the inverse. Both helpers live in a small module so arun (write side) and the example script (read side) share them:
# src/harness/checkpoint/serde.py
from __future__ import annotations

from dataclasses import asdict
from datetime import datetime

from ..messages import (
    Message, Transcript,
    TextBlock, ReasoningBlock, ToolCall, ToolResult,
)
from ..plans.tools import Plan  # §16.2's Plan dataclass


def _serialize_transcript(transcript: Transcript) -> list[dict]:
    out: list[dict] = []
    for msg in transcript.messages:
        out.append({
            "id": msg.id,
            "role": msg.role,
            "created_at": msg.created_at.isoformat(),
            "blocks": [asdict(b) for b in msg.blocks],
        })
    return out


def _deserialize_transcript(data: list[dict]) -> Transcript:
    messages: list[Message] = []
    for m in data:
        blocks = [_deserialize_block(b) for b in m["blocks"]]
        messages.append(Message(
            id=m["id"],
            role=m["role"],
            created_at=datetime.fromisoformat(m["created_at"]),
            blocks=blocks,
        ))
    return Transcript(messages=messages)


def _deserialize_block(d: dict):
    """Dispatch on the `kind` discriminator from §3.2's block dataclasses."""
    kind = d["kind"]
    if kind == "text":
        return TextBlock(text=d["text"])
    if kind == "reasoning":
        return ReasoningBlock(text=d["text"], metadata=d.get("metadata", {}))
    if kind == "tool_call":
        return ToolCall(id=d["id"], name=d["name"], args=d["args"])
    if kind == "tool_result":
        return ToolResult(
            call_id=d["call_id"],
            content=d["content"],
            is_error=d.get("is_error", False),
        )
    raise ValueError(f"unknown block kind: {kind!r}")


def _serialize_plan(plan: Plan | None) -> dict | None:
    return asdict(plan) if plan is not None else None


def _deserialize_plan(data: dict | None) -> Plan | None:
    return Plan(**data) if data is not None else None
Nothing here is clever — it's what the §3.2 block discriminator buys you. The kind field on every block makes deserialization a dispatch table, not a guessing game. dataclasses.asdict walks nested dataclasses for us on the way out; Plan(**data) reconstructs on the way in. The one subtlety is created_at: datetime doesn't round-trip through JSON by default, so we serialize with .isoformat() and deserialize with datetime.fromisoformat(...).
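The round-trip can be seen end to end in miniature. The sketch below uses stand-in dataclasses, not the book's real Message and TextBlock, just to show that the kind discriminator plus isoformat/fromisoformat make the trip lossless:

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone

# Stand-ins for §3.2's types — just enough structure to demonstrate.
@dataclass
class TextBlock:
    text: str
    kind: str = "text"

@dataclass
class Message:
    role: str
    created_at: datetime
    blocks: list

def dump(msg: Message) -> str:
    return json.dumps({
        "role": msg.role,
        "created_at": msg.created_at.isoformat(),  # datetime → string
        "blocks": [asdict(b) for b in msg.blocks],
    })

def load(raw: str) -> Message:
    d = json.loads(raw)
    blocks = [TextBlock(text=b["text"]) for b in d["blocks"] if b["kind"] == "text"]
    return Message(d["role"], datetime.fromisoformat(d["created_at"]), blocks)

m = Message("user", datetime.now(timezone.utc), [TextBlock("hello")])
assert load(dump(m)) == m  # lossless, timezone included
print("round-trip ok")
```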
After every turn, we have a checkpoint. If the process dies before the next turn, the last checkpoint is the recoverable state. Startup uses the deserializers to rehydrate the in-memory objects that arun (and §16's plan_holder) want:
# examples/ch21_resume.py
import asyncio

from harness.agent import arun
from harness.checkpoint.store import Checkpointer
from harness.checkpoint.resume import check_pending_tool_calls
from harness.checkpoint.serde import (
    _deserialize_transcript, _deserialize_plan,
)
from harness.messages import Transcript
from harness.plans.tools import PlanHolder
from harness.providers.anthropic import AnthropicProvider
from harness.tools.selector import ToolCatalog
from harness.tools.std import STANDARD_TOOLS


async def main():
    checkpointer = Checkpointer(".harness/sessions.db")
    session_id = "session-alpha"

    # 1. Side-effect verification — surface anything interrupted mid-call.
    pending = await check_pending_tool_calls(checkpointer, session_id)
    if pending:
        print(f"WARNING: {len(pending)} tool calls were interrupted mid-execution.")
        for p in pending:
            print(f"  - {p['call_id']} {p['tool_name']} {p['args_json']}")
        # In production: run verification tools or ask the user before continuing.

    # 2. Rehydrate in-memory state from the latest checkpoint, if any.
    transcript: Transcript | None = None
    plan_holder = PlanHolder()
    latest = await checkpointer.load_latest(session_id)
    if latest is not None:
        print(f"Resuming session-alpha from checkpoint v{latest['version']}")
        transcript = _deserialize_transcript(latest["transcript"])
        if latest["plan"] is not None:
            plan_holder.plan = _deserialize_plan(latest["plan"])
    else:
        print("Starting new session-alpha")

    # 3. Pass the rehydrated transcript into arun via the `transcript=`
    #    parameter from §5.5.1 — it resumes the conversation, it doesn't
    #    replace the user's new message. Passing `transcript=None` is the
    #    fresh-session path (arun builds a new Transcript internally).
    provider = AnthropicProvider()
    catalog = ToolCatalog(tools=STANDARD_TOOLS)
    result = await arun(
        provider=provider,
        catalog=catalog,
        user_message="continue from where we left off",
        transcript=transcript,    # ← rehydrated, or None
        plan_holder=plan_holder,  # ← rehydrated plan (may be empty)
        checkpointer=checkpointer,
        session_id=session_id,
    )
    print(result)


asyncio.run(main())
Three steps, each with a distinct job: verify interrupted side effects (§21.3), rehydrate in-memory objects via the deserializers above, pass them into arun through its existing parameters. Resumption itself needs no new arun parameters: transcript= (from Ch 5) and plan_holder= (from Ch 16) are already the resume seam, which is the whole point of making both of them optional injection points earlier in the book.
The harness now has memory across process deaths. Combined with the scratchpad from Chapter 9 (which was already durable on disk and needs no rehydration), you have full continuity: conversation state rebuilt from SQLite, plan state rebuilt from SQLite, durable findings already on disk, and idempotent replay for any side effects that were in flight.
We save after every turn. That's defensible for most harnesses — turns are the natural unit of progress, and SQLite writes of a few KB each are microseconds. But on very fast/short turns, checkpointing every turn starts to be a measurable fraction of latency.
Three cadences worth knowing:
Every turn. What the sketch above does. Maximum safety; at most one turn of work is lost.
Every N turns or every T seconds. Amortizes the write cost when turns are fast and cheap; a crash can lose up to N turns.
At milestones only. Save when a plan step completes or just before an expensive or irreversible call; cheapest, but the recovery window is widest.
The interface (save_checkpoint) is the same; the caller decides when.
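The "caller decides when" logic can live in a tiny policy object. A sketch, assuming a hypothetical CheckpointPolicy that isn't part of the chapter's code — the harness calls should_save() once per turn and only hits save_checkpoint when it returns True:

```python
import time

class CheckpointPolicy:
    """Hypothetical cadence helper: save every n_turns or every
    max_interval_s seconds, whichever comes first. save_checkpoint
    itself is unchanged — this only decides WHEN to call it."""
    def __init__(self, n_turns: int = 5, max_interval_s: float = 30.0):
        self.n_turns = n_turns
        self.max_interval_s = max_interval_s
        self._turns_since = 0
        self._last_save = time.monotonic()

    def should_save(self) -> bool:
        self._turns_since += 1
        due = (self._turns_since >= self.n_turns
               or time.monotonic() - self._last_save >= self.max_interval_s)
        if due:
            self._turns_since = 0
            self._last_save = time.monotonic()
        return due

policy = CheckpointPolicy(n_turns=3, max_interval_s=9999)
print([policy.should_save() for _ in range(7)])
# [False, False, True, False, False, True, False]
```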
The checkpointer interface matches LangGraph's. When you outgrow SQLite (multiple machines, high write rates), swap Checkpointer for a Postgres-backed implementation. The schema is essentially the same; the connection handling is different; the interface you pass to arun doesn't change.
What you actually get from Postgres: concurrent access across processes, network access from separate machines, point-in-time restore, replication. None of which you need on day one; all of which you might need on day 400.
git add -A && git commit -m "ch21: SQLite checkpointer with idempotency-aware tool dispatch"
git tag ch21-resume
checkpoints table. How many versions are there? Can you reconstruct the session's evolution from them? Delete a middle version; does subsequent resume still work? (Answer: yes, because resume uses latest, but this tells you about recovery options.)

Chapter 20. Cost Control
Previously: evals measure correctness. Nothing in the harness caps spend. The $47K agent-loop incident (DEV Community, Nov 2025) was two agents ping-ponging requests for eleven days; alerts fired, no one stopped them. Alerts are not enforcement.
Chapter 22. What Transfers, Where to Go
Previously: twenty-one chapters of cumulative engineering. The harness has a loop, a transcript, adapters for three providers, a tool registry with validation and loop detection, streaming, async, permissions, MCP integration, a scratchpad, retrieval, compaction, sub-agents, structured plans, parallel coordination, observability, evals, cost control, and durable checkpointing.