Previously: tools are first-class and dispatch through a registry. The loop is tight, typed, and provider-agnostic. What it doesn't yet do is stream output to the user, stop cleanly when the user changes their mind, or survive a transient network failure.
Three things happen in every production run that our loop from Chapter 4 doesn't handle gracefully, and they share enough infrastructure that this chapter fixes them together rather than in three separate passes. The model takes fifteen seconds to generate a long response, and the user sits staring at a silent terminal. The user decides they asked the wrong question and hits Ctrl-C, and the partial state vanishes without a trace. The provider returns a 503, and the whole run dies. Streaming, cooperative interruption, and retries on transient failures — all three are load-bearing in any system you'd want to put behind a CLI.
By the end of this chapter, your harness streams responses token by token, captures a clean checkpoint on interrupt, and recovers from transient provider errors with exponential backoff under a bounded per-session budget. The common thread is async: we refactor to asyncio here and keep it for the rest of the book.
Up to this chapter, everything has been synchronous. The loop calls provider.complete(); the provider calls the vendor SDK; the SDK blocks the thread until the response arrives; the loop dispatches the tool; the tool runs; rinse, repeat.
Sync was fine for what we'd built. It doesn't scale to what we're about to build for three reasons:
Streaming is natively async. Both Anthropic's and OpenAI's streaming APIs yield events as server-sent events (SSE). You can read them synchronously with a generator, but async is the idiomatic Python fit — async for event in stream: just works.
Interruption needs cooperative cancellation. A Ctrl-C in a synchronous loop either gets caught mid-network-read (messy, leaves connection state dangling) or between operations (you can't actually interrupt a blocking call). Async gives us asyncio.CancelledError — a typed, structured way to unwind a partially-complete turn without corrupting state. The conceptual framing here comes from Nathaniel J. Smith's 2018 essay "Notes on structured concurrency, happy nurseries, and related stuff," which argued that cancellation deserves to be a first-class primitive in any concurrent system and whose design influenced Trio, anyio, and the more recent cancellation-scope improvements in CPython's own asyncio. Cooperative cancellation is the whole reason we can catch Ctrl-C cleanly in §5.6 without blowing up the partial transcript.
Parallel sub-agents want concurrency. Chapter 17 runs multiple sub-agents in parallel. That's a coroutine-per-agent story, not a thread-per-agent one — threads would work but are heavier than needed and subtly hostile to the vendor SDKs.
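The coroutine-per-agent shape is worth a quick preview. A minimal sketch with stub agents — pure asyncio, and the agent names are made up for illustration:

```python
import asyncio

async def sub_agent(name: str, delay: float) -> str:
    # Stand-in for a sub-agent's turn; each await is a scheduling
    # point, so every agent makes progress on a single thread.
    await asyncio.sleep(delay)
    return f"{name}: done"

async def main() -> list[str]:
    # One coroutine per agent. gather returns results in argument
    # order regardless of which coroutine finishes first.
    return await asyncio.gather(
        sub_agent("researcher", 0.02),
        sub_agent("coder", 0.01),
    )

results = asyncio.run(main())
print(results)  # ['researcher: done', 'coder: done']
```

No locks, no thread pools: the event loop interleaves the agents at their await points, which is all Chapter 17's parallel dispatch needs underneath.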
One concession: we keep a sync entry point for scripts and tests that don't need the async machinery. run() becomes arun() (async); a thin run() wrapper calls asyncio.run(arun(...)) when we're invoked synchronously.
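The shape of that wrapper, sketched with a placeholder arun body (the real one comes in §5.5):

```python
import asyncio

async def arun(prompt: str) -> str:
    # Placeholder for the real async entry point.
    await asyncio.sleep(0)
    return f"echo: {prompt}"

def run(prompt: str) -> str:
    # asyncio.run() raises RuntimeError if called from inside an
    # already-running event loop, so this wrapper is for plain scripts
    # and tests only; async callers await arun() directly.
    return asyncio.run(arun(prompt))

print(run("hello"))  # echo: hello
```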
Different providers stream in different shapes. Anthropic emits message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop. OpenAI emits chat.completion.chunk with a delta field. Our job is to normalize both into a single internal event stream so the loop doesn't care which provider fed it.
# src/harness/providers/events.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Literal
@dataclass(frozen=True)
class TextDelta:
text: str
kind: Literal["text_delta"] = "text_delta"
@dataclass(frozen=True)
class ReasoningDelta:
"""A fragment of model-internal reasoning (Chapter 3's `ReasoningBlock`).
Emitted by reasoning-enabled providers alongside TextDelta; the loop
accumulates them into `ProviderResponse.reasoning_text`.
"""
text: str
kind: Literal["reasoning_delta"] = "reasoning_delta"
@dataclass(frozen=True)
class ToolCallStart:
id: str
name: str
kind: Literal["tool_call_start"] = "tool_call_start"
@dataclass(frozen=True)
class ToolCallDelta:
id: str
args_fragment: str # partial JSON, accumulated by the loop
kind: Literal["tool_call_delta"] = "tool_call_delta"
@dataclass(frozen=True)
class Completed:
input_tokens: int
output_tokens: int
reasoning_tokens: int = 0
reasoning_metadata: dict = field(default_factory=dict)
kind: Literal["completed"] = "completed"
StreamEvent = TextDelta | ReasoningDelta | ToolCallStart | ToolCallDelta | Completed
Five event types cover everything we need: text and reasoning arriving a chunk at a time, a tool call beginning, a tool call's arguments arriving piecewise (they can stream across multiple events), and a terminal event with final token counts plus any provider-specific reasoning metadata (OpenAI's encrypted reasoning items, Anthropic's signature — the vendor-opaque data needed for reasoning replay, see Chapter 3). Everything else providers emit — keepalives, internal state markers — the adapters discard.
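A hand-written stream for one tool-calling turn makes the vocabulary concrete. Trimmed-down stand-ins for the dataclasses above (no kind discriminators), with invented ids and token counts:

```python
import json
from dataclasses import dataclass

# Trimmed stand-ins for the event types defined above.
@dataclass(frozen=True)
class TextDelta:
    text: str

@dataclass(frozen=True)
class ToolCallStart:
    id: str
    name: str

@dataclass(frozen=True)
class ToolCallDelta:
    id: str
    args_fragment: str

@dataclass(frozen=True)
class Completed:
    input_tokens: int
    output_tokens: int

# What a normalized turn looks like, whichever provider produced it.
turn = [
    TextDelta(text="Let me compute that."),
    ToolCallStart(id="call_1", name="calc"),
    ToolCallDelta(id="call_1", args_fragment='{"expres'),
    ToolCallDelta(id="call_1", args_fragment='sion": "6 * 7"}'),
    Completed(input_tokens=48, output_tokens=21),
]

# Argument fragments only parse once reassembled.
args = json.loads("".join(e.args_fragment for e in turn
                          if isinstance(e, ToolCallDelta)))
print(args)  # {'expression': '6 * 7'}
```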
We extend Provider with an async streaming method. Non-streaming complete() stays for batch tests and scripts; astream() is the production path.
# src/harness/providers/base.py (updated)
from __future__ import annotations
from dataclasses import dataclass, field
from typing import AsyncIterator, Protocol
from ..messages import Transcript
from .events import StreamEvent
@dataclass(frozen=True)
class ToolCallRef:
"""One tool invocation carried in a ProviderResponse.
Separate from `messages.ToolCall` because `ProviderResponse` is the
pre-transcript handoff shape; `ToolCall` is the in-transcript block
(with a `kind` discriminator). The loop constructs one in-transcript
ToolCall from each ToolCallRef when it commits the assistant message.
"""
id: str
name: str
args: dict
@dataclass(frozen=True)
class ProviderResponse:
text: str | None = None
tool_calls: tuple[ToolCallRef, ...] = ()
reasoning_text: str | None = None
reasoning_metadata: dict = field(default_factory=dict)
input_tokens: int = 0
output_tokens: int = 0
reasoning_tokens: int = 0
@property
def is_tool_call(self) -> bool:
return len(self.tool_calls) > 0
@property
def is_final(self) -> bool:
return self.text is not None and not self.tool_calls
# Back-compat shortcuts into tool_calls[0]. The book's earlier chapters
# talked about a single tool call per turn; those shortcuts keep that
# prose honest for the common single-call case. Migrate to iterating
# `tool_calls` for the batched case.
@property
def tool_call_id(self) -> str | None:
return self.tool_calls[0].id if self.tool_calls else None
@property
def tool_name(self) -> str | None:
return self.tool_calls[0].name if self.tool_calls else None
@property
def tool_args(self) -> dict | None:
return self.tool_calls[0].args if self.tool_calls else None
class Provider(Protocol):
name: str
def astream(
self, transcript: Transcript, tools: list[dict]
) -> AsyncIterator[StreamEvent]:
...
async def acomplete(
self, transcript: Transcript, tools: list[dict]
) -> ProviderResponse:
...
Two changes since Chapter 3's shape. tool_calls is now a tuple of ToolCallRef, not four singular fields — because a single provider response can carry multiple tool calls. OpenAI Responses and Anthropic Messages both batch them by default; many OpenAI-compatible local servers (notably Ollama serving small models like Gemma) batch them regardless of what you set parallel_tool_calls to. The singular shape silently dropped every call after the first, and the loop hit a dead end: either the next turn's provider invariant failed ("tool_use ids must have matching tool_result ids"), or the agent re-planned the dropped calls and the loop detector eventually tripped on the repeats. Either way, calls the model asked for disappeared without trace. Carrying the full list is correct; the shortcut properties (tool_name, tool_args, tool_call_id) give existing single-call code a migration path without forcing a giant rewrite.
What's otherwise new is Provider gaining astream() + acomplete() — the streaming half of the protocol.
One subtle point on the Protocol: astream is declared def, not async def, even though every implementation is written async def astream(...): yield .... The reason is how the type system classifies the two shapes. async def f() -> AsyncIterator[T] declares a coroutine that returns an iterator. def f() -> AsyncIterator[T] declares a callable whose return type is an iterator — which is exactly what an async generator function produces when called. Pyright rejects the first shape when you try to override it with an async generator; the second shape accepts it. If your editor complains Return type mismatch: base method returns CoroutineType[…, AsyncIterator[…]], this is the fix.
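The two shapes side by side, runnable. The `def`-declared protocol method is satisfied by an async generator implementation because calling an async generator function returns its iterator directly, no await needed:

```python
import asyncio
from typing import AsyncIterator, Protocol

class Streamer(Protocol):
    # Declared `def`, returning AsyncIterator — the shape an async
    # generator override satisfies.
    def astream(self) -> AsyncIterator[str]: ...

class Impl:
    async def astream(self) -> AsyncIterator[str]:
        # An async generator function: calling astream() returns the
        # iterator itself, matching the protocol's `def` declaration.
        yield "a"
        yield "b"

async def collect(s: Streamer) -> list[str]:
    return [chunk async for chunk in s.astream()]

chunks = asyncio.run(collect(Impl()))
print(chunks)  # ['a', 'b']
```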
acomplete() can be implemented on top of astream() by accumulating events into a single ProviderResponse. That's what the base adapter class does — each concrete provider only has to implement astream().
# src/harness/providers/base.py (continued)
import json
from ..providers.events import (
Completed, ReasoningDelta, TextDelta, ToolCallDelta, ToolCallStart,
)
async def accumulate(stream: AsyncIterator[StreamEvent]) -> ProviderResponse:
"""Collect a stream into one ProviderResponse — handles batched tool calls."""
text_parts: list[str] = []
reasoning_parts: list[str] = []
# Keyed by tool id, values {"name": str, "args_buffer": str}. We also
# remember arrival order so batched calls come out in the order the
# provider emitted them, not dict-iteration order.
tool_entries: dict[str, dict] = {}
tool_ids_in_order: list[str] = []
last_opened_id: str | None = None
orphan_counter = 0
input_tokens = 0
output_tokens = 0
reasoning_tokens = 0
reasoning_metadata: dict = {}
async for event in stream:
match event:
case TextDelta(text=t):
text_parts.append(t)
case ReasoningDelta(text=t):
reasoning_parts.append(t)
case ToolCallStart(id=i, name=n):
entry_id = i or f"_orphan_{orphan_counter}"
if not i:
orphan_counter += 1
if entry_id not in tool_entries:
tool_entries[entry_id] = {"name": n, "args_buffer": ""}
tool_ids_in_order.append(entry_id)
else:
tool_entries[entry_id]["name"] = n
last_opened_id = entry_id
case ToolCallDelta(id=i, args_fragment=frag):
# Fragments reference their parent tool_id. Fall back to
# the last-opened id if omitted; synthesize an orphan if
# a fragment arrives before any Start — never drop data.
target_id = i or last_opened_id
if target_id is None:
target_id = f"_orphan_{orphan_counter}"
orphan_counter += 1
if target_id not in tool_entries:
tool_entries[target_id] = {"name": "", "args_buffer": ""}
tool_ids_in_order.append(target_id)
last_opened_id = target_id
tool_entries[target_id]["args_buffer"] += frag
case Completed(input_tokens=it, output_tokens=ot,
reasoning_tokens=rt, reasoning_metadata=rmeta):
input_tokens, output_tokens = it, ot
reasoning_tokens = rt
reasoning_metadata = dict(rmeta)
reasoning_text = "".join(reasoning_parts) if reasoning_parts else None
tool_calls: list[ToolCallRef] = []
for tid in tool_ids_in_order:
entry = tool_entries[tid]
try:
args = json.loads(entry["args_buffer"]) if entry["args_buffer"] else {}
except json.JSONDecodeError:
# Surface the raw buffer; the registry's validator will return a
# structured error to the model on the next turn.
args = {"_raw": entry["args_buffer"]}
tool_calls.append(ToolCallRef(id=tid, name=entry["name"], args=args))
if tool_calls:
return ProviderResponse(
tool_calls=tuple(tool_calls),
reasoning_text=reasoning_text,
reasoning_metadata=reasoning_metadata,
input_tokens=input_tokens, output_tokens=output_tokens,
reasoning_tokens=reasoning_tokens,
)
return ProviderResponse(
text="".join(text_parts),
reasoning_text=reasoning_text,
reasoning_metadata=reasoning_metadata,
input_tokens=input_tokens, output_tokens=output_tokens,
reasoning_tokens=reasoning_tokens,
)
Nothing gets dropped. Every ToolCallStart opens a new entry keyed by id; every ToolCallDelta appends to the entry with the matching id; the arrival-order list gives us a stable ordering to replay back to the loop. The orphan fallback is defensive — _orphan_0, _orphan_1, etc. — so a malformed stream where a fragment arrives before a start still shows up in tool_calls instead of vanishing. Whether the provider is the first-party Anthropic Messages API, OpenAI Responses, or a small local model on Ollama that batches aggressively, they all go through the same path; they just happen to populate one or more entries.
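The same bookkeeping — id-keyed entries, arrival order, orphan synthesis — is easy to verify in isolation. A compressed re-run of the fold on plain tuples, independent of the harness types:

```python
import json

def fold(events):
    """Miniature of accumulate's tool bookkeeping. Events are
    ("start", id, name) or ("delta", id, fragment) tuples."""
    entries: dict[str, dict] = {}
    order: list[str] = []
    last_id = None
    orphans = 0
    for ev in events:
        if ev[0] == "start":
            _, i, name = ev
            eid = i or f"_orphan_{orphans}"
            if not i:
                orphans += 1
            if eid not in entries:
                entries[eid] = {"name": name, "buf": ""}
                order.append(eid)
            last_id = eid
        else:
            _, i, frag = ev
            tid = i or last_id
            if tid is None:
                tid = f"_orphan_{orphans}"
                orphans += 1
            if tid not in entries:
                entries[tid] = {"name": "", "buf": ""}
                order.append(tid)
                last_id = tid
            entries[tid]["buf"] += frag
    return [(tid, entries[tid]["name"], json.loads(entries[tid]["buf"]))
            for tid in order]

# Two batched calls with interleaved fragments come out fully
# assembled, in arrival order.
calls = fold([
    ("start", "a", "read_file"),
    ("start", "b", "search"),
    ("delta", "a", '{"path": "x.py"}'),
    ("delta", "b", '{"q": "TODO"}'),
])
print(calls)
```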
Each of Chapter 3's adapters gets an astream implementation. All three yield the same normalized StreamEvent flow the loop consumes; only the translation from the vendor's raw SDK events changes.
Batched tool calls are handled from the start. §5.3's accumulate collects every call the model emits in a turn as a ToolCallRef, and the loop dispatches them sequentially in arrival order (§5.5). Both providers default to parallel tool use — Anthropic's Messages API and OpenAI's Responses API both emit multiple tool_use / function_call items in a single assistant response when the model thinks several tools can run at once. We leave that default on. Letting the model batch is usually the right thing: reading three files or hitting three search endpoints in one turn instead of three consecutive turns saves real latency and real cost. Chapter 17 upgrades sequential dispatch to concurrent via LeaseManager for cases where the tools themselves can run in parallel; this chapter's sequential-over-a-batch design is already correct end-to-end.
Anthropic's streaming works through a context manager that yields raw events. We translate each to our internal shape.
# src/harness/providers/anthropic.py (updated)
from __future__ import annotations
from typing import Any, AsyncIterator
from ..messages import Transcript
from .events import (
Completed, ReasoningDelta, StreamEvent,
TextDelta, ToolCallDelta, ToolCallStart,
)
from .base import Provider, ProviderResponse, accumulate
class AnthropicProvider(Provider):
name = "anthropic"
def __init__(self, model: str = "claude-sonnet-4-6",
client: Any | None = None,
enable_thinking: bool = False,
thinking_budget_tokens: int = 2000,
max_tokens: int = 4096) -> None:
self.model = model
self.enable_thinking = enable_thinking
self.thinking_budget_tokens = thinking_budget_tokens
self.max_tokens = max_tokens
if client is None:
from anthropic import AsyncAnthropic # external SDK
client = AsyncAnthropic()
self._client = client
async def astream(
self, transcript: Transcript, tools: list[dict]
) -> AsyncIterator[StreamEvent]:
kwargs: dict = {
"model": self.model,
"max_tokens": self.max_tokens,
"messages": [_to_anthropic(m, self.enable_thinking)
for m in transcript.messages],
"tools": tools,
}
if transcript.system:
kwargs["system"] = transcript.system
if self.enable_thinking:
kwargs["thinking"] = {
"type": "enabled",
"budget_tokens": self.thinking_budget_tokens,
}
# Parallel tool use stays on (Anthropic's default). `accumulate`
# handles the batch; the loop dispatches each call sequentially.
current_tool_id: str | None = None
async with self._client.messages.stream(**kwargs) as stream:
async for raw in stream:
event = _translate(raw, current_tool_id)
if isinstance(event, ToolCallStart):
current_tool_id = event.id
if event is not None:
yield event
final = await stream.get_final_message()
yield Completed(
input_tokens=final.usage.input_tokens,
output_tokens=final.usage.output_tokens,
)
async def acomplete(self, transcript, tools):
return await accumulate(self.astream(transcript, tools))
def _translate(raw: Any, current_tool_id: str | None) -> StreamEvent | None:
t = raw.type
if t == "content_block_start" and raw.content_block.type == "tool_use":
return ToolCallStart(id=raw.content_block.id, name=raw.content_block.name)
if t == "content_block_delta":
d = raw.delta
if d.type == "text_delta":
return TextDelta(text=d.text)
if d.type == "thinking_delta":
return ReasoningDelta(text=d.thinking)
if d.type == "signature_delta":
return None # the signature lands on the final message, not a stream event
if d.type == "input_json_delta":
return ToolCallDelta(id=current_tool_id or "",
args_fragment=d.partial_json)
return None
# _to_anthropic unchanged from Chapter 3 (it already handles ReasoningBlock
# and the keep_reasoning flag for round-tripping thinking on/off).
A few additions since Chapter 3's version: the enable_thinking / thinking_budget_tokens constructor arguments, the conditional thinking={...} kwarg, and the thinking_delta → ReasoningDelta translation. Threading current_tool_id through _translate is how we emit ToolCallDelta events with the right call_id — input-json deltas don't carry the id themselves, so the adapter remembers the most recent tool_use block's id.
What gets deleted. Chapter 3's _from_anthropic(raw) -> ProviderResponse and the sync complete() method are both gone. Their work is now split: _translate emits one StreamEvent per raw SDK event, accumulate (§5.3) folds the event stream into a single ProviderResponse at the end, and acomplete is a one-liner that chains the two. Keep _to_anthropic and _block_to_anthropic — those serialize outgoing messages to the wire, and Chapter 3 already taught them to handle ReasoningBlock with the keep_reasoning flag.
OpenAI's Responses API streams typed events (response.output_text.delta, response.function_call_arguments.delta, response.output_item.added, response.completed) rather than Anthropic's content_block_delta-with-nested-type shape. The mapping is mechanical but the event names are different.
# src/harness/providers/openai.py (updated)
from __future__ import annotations
import json
from typing import Any, AsyncIterator, Literal
from ..messages import Transcript
from .events import (
Completed, ReasoningDelta, StreamEvent,
TextDelta, ToolCallDelta, ToolCallStart,
)
from .base import Provider, ProviderResponse, accumulate
ReasoningEffort = Literal["minimal", "low", "medium", "high"]
class OpenAIProvider(Provider):
name = "openai"
def __init__(self, model: str = "gpt-5",
client: Any | None = None,
max_output_tokens: int = 4096,
reasoning_effort: ReasoningEffort | None = None) -> None:
self.model = model
self.max_output_tokens = max_output_tokens
self.reasoning_effort = reasoning_effort
if client is None:
from openai import AsyncOpenAI # external SDK
client = AsyncOpenAI()
self._client = client
async def astream(
self, transcript: Transcript, tools: list[dict]
) -> AsyncIterator[StreamEvent]:
kwargs: dict[str, Any] = {
"model": self.model,
"input": [i for m in transcript.messages for i in _to_responses_input(m)],
"max_output_tokens": self.max_output_tokens,
"stream": True,
}
if transcript.system:
kwargs["instructions"] = transcript.system
if tools:
kwargs["tools"] = [_tool_to_responses(t) for t in tools]
# Parallel tool calls stay on (matches Anthropic §5.4 above).
if self.reasoning_effort is not None:
kwargs["reasoning"] = {"effort": self.reasoning_effort}
kwargs["include"] = ["reasoning.encrypted_content"]
kwargs["store"] = False # we manage state locally; see §3.4
stream = await self._client.responses.create(**kwargs)
# item_id → call_id for function_call items (argument deltas reference
# the item_id but dispatch uses call_id).
call_ids_by_item: dict[str, str] = {}
# Reasoning items we capture for round-trip replay (Chapter 3 §3.4).
reasoning_item_meta: dict[str, dict] = {}
input_tokens = 0
output_tokens = 0
reasoning_tokens = 0
async for event in stream:
et = getattr(event, "type", None)
if et == "response.output_text.delta":
delta = getattr(event, "delta", "") or ""
if delta:
yield TextDelta(text=delta)
elif et == "response.reasoning_summary_text.delta":
delta = getattr(event, "delta", "") or ""
if delta:
yield ReasoningDelta(text=delta)
elif et == "response.output_item.added":
item = getattr(event, "item", None)
if item is None:
continue
item_type = getattr(item, "type", None)
if item_type == "function_call":
call_id = getattr(item, "call_id", None) or getattr(item, "id", "")
item_id = getattr(item, "id", "") or call_id
name = getattr(item, "name", "") or ""
if item_id:
call_ids_by_item[item_id] = call_id
yield ToolCallStart(id=call_id, name=name)
elif item_type == "reasoning":
rid = getattr(item, "id", "") or ""
if rid:
reasoning_item_meta.setdefault(rid, {"id": rid})
elif et == "response.function_call_arguments.delta":
delta = getattr(event, "delta", "") or ""
if delta:
item_id = getattr(event, "item_id", "") or ""
call_id = call_ids_by_item.get(item_id, item_id)
yield ToolCallDelta(id=call_id, args_fragment=delta)
elif et == "response.output_item.done":
# Reasoning items carry their `encrypted_content` here — we
# stash it so the next turn can replay the reasoning item.
item = getattr(event, "item", None)
if item is None or getattr(item, "type", None) != "reasoning":
continue
rid = getattr(item, "id", "") or ""
enc = getattr(item, "encrypted_content", None)
if rid:
entry = reasoning_item_meta.setdefault(rid, {"id": rid})
if enc:
entry["encrypted_content"] = enc
elif et == "response.completed":
response = getattr(event, "response", None)
usage = getattr(response, "usage", None) if response else None
if usage is not None:
input_tokens = getattr(usage, "input_tokens", 0) or 0
output_tokens = getattr(usage, "output_tokens", 0) or 0
details = getattr(usage, "output_tokens_details", None)
if details is not None:
reasoning_tokens = (
getattr(details, "reasoning_tokens", 0) or 0
)
yield Completed(
input_tokens=input_tokens,
output_tokens=output_tokens,
reasoning_tokens=reasoning_tokens,
reasoning_metadata=(
{"openai_items": list(reasoning_item_meta.values())}
if reasoning_item_meta else {}
),
)
async def acomplete(self, transcript, tools):
return await accumulate(self.astream(transcript, tools))
# _tool_to_responses and _to_responses_input unchanged from Chapter 3 §3.4.
Two shape differences from Anthropic worth naming. OpenAI's argument-delta events carry item_id, not the tool's call_id — we keep a small lookup table populated when the output_item.added event fires. And reasoning items emit their encrypted_content only on response.output_item.done; we buffer it there so the final Completed event can carry it out via reasoning_metadata. That's the same round-trip blob Chapter 3's _to_responses_input replays on the next turn, so the model sees its own chain-of-thought.
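The lookup-table pattern in isolation, with invented ids rather than real SDK values:

```python
# item_id → call_id mapping: populated when a function_call output
# item first appears, consulted when its argument deltas arrive.
call_ids_by_item: dict[str, str] = {}

def on_item_added(item_id: str, call_id: str) -> None:
    call_ids_by_item[item_id] = call_id

def resolve(item_id: str) -> str:
    # Deltas carry item_id; fall back to it if the added event was
    # never seen, so no data is silently dropped.
    return call_ids_by_item.get(item_id, item_id)

on_item_added("fc_1", "call_abc")
print(resolve("fc_1"))  # call_abc
print(resolve("fc_9"))  # fc_9 (fallback)
```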
What gets deleted. Same story as Anthropic: Chapter 3's _from_responses(raw) -> ProviderResponse and the sync complete() method are gone, replaced by the event-emitting loop above plus accumulate. Keep _to_responses_input and _tool_to_responses — they serialize outgoing messages, and _to_responses_input is where reasoning items get replayed.
LocalProvider from Chapter 3 inherits all of OpenAIProvider — including the new astream. All it overrides is the client construction, pointing the OpenAI SDK at a local endpoint. No new code for streaming:
# src/harness/providers/local.py — unchanged from Chapter 3
class LocalProvider(OpenAIProvider):
name = "local"
def __init__(self, model: str = "llama-3.1-8b-instruct",
base_url: str = "http://localhost:8000/v1",
api_key: str = "not-needed",
client: Any | None = None,
max_output_tokens: int = 4096) -> None:
if client is None:
from openai import AsyncOpenAI # external SDK
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
super().__init__(model=model, client=client,
max_output_tokens=max_output_tokens)
This is the payoff of the adapter seam. vLLM, Ollama, LM Studio, llama.cpp's server — every OSS endpoint that speaks OpenAI-compatible Responses streams through this class unchanged, emitting the same StreamEvent flow our loop already knows how to consume. The three public providers in the book — Anthropic, OpenAI, local OSS — share one protocol, one event vocabulary, one loop. Chapter 22's multi-provider demo runs the same code against all three.
The loop from Chapter 4 becomes async. The logic is identical; the plumbing changes.
# src/harness/agent.py (updated)
from __future__ import annotations
import asyncio
from typing import Callable
from .messages import Message, Transcript, ToolCall, ToolResult
from .providers.base import Provider, ProviderResponse, accumulate
from .providers.events import StreamEvent, TextDelta
from .tools.registry import ToolRegistry
MAX_ITERATIONS = 20
async def arun(
provider: Provider,
registry: ToolRegistry,
user_message: str,
transcript: Transcript | None = None,
system: str | None = None,
on_event: Callable[[StreamEvent], None] | None = None,
on_tool_call: Callable[[ToolCall], None] | None = None,
on_tool_result: Callable[[ToolResult], None] | None = None,
) -> str:
# Two usage modes:
# 1. Task (no transcript passed) → we build a fresh one and it's
# garbage-collected when arun returns. Good for one-shot scripts.
# 2. Chat (transcript passed) → the caller owns it and keeps it
# around across multiple arun calls. User messages and assistant
# replies accumulate; the next call continues the conversation.
if transcript is None:
transcript = Transcript(system=system)
transcript.append(Message.user_text(user_message))
for _ in range(MAX_ITERATIONS):
        async def consume() -> ProviderResponse:
            events = provider.astream(transcript, registry.schemas())

            async def forward():
                async for e in events:
                    if on_event is not None:
                        on_event(e)
                    yield e

            return await accumulate(forward())

        response = await consume()
if response.is_final:
# `from_assistant_response` preserves any ReasoningBlock the
# provider emitted alongside the final text (Chapter 3 §3.2).
transcript.append(Message.from_assistant_response(response))
return response.text or ""
# Commit ONE assistant message carrying all N ToolCall blocks.
# Both Anthropic and OpenAI happily accept multi-tool_use / multi-
# function_call assistant messages on round-trip; Chapter 3's
# `from_assistant_response` just emits one block per ref.
transcript.append(Message.from_assistant_response(response))
# Dispatch each call sequentially, in arrival order.
for ref in response.tool_calls:
call = ToolCall(id=ref.id, name=ref.name, args=dict(ref.args))
if on_tool_call is not None:
on_tool_call(call)
result = registry.dispatch(call.name, call.args, call.id)
transcript.append(Message.tool_result(result))
if on_tool_result is not None:
on_tool_result(result)
raise RuntimeError(f"agent did not finish in {MAX_ITERATIONS} iterations")
def run(*args, **kwargs) -> str:
"""Sync wrapper for scripts and tests."""
return asyncio.run(arun(*args, **kwargs))
One assistant message, N tool-result messages — one per call. That matches Chapter 3's convention (tool results live in user-role messages, one block per message) and it matches both providers' round-trip shapes. The loop still runs sequentially; if you want parallel dispatch with shared-state safety, Chapter 17's LeaseManager is the primitive.
The on_event callback is how CLIs plug in to render the stream. A three-line handler gives you a streaming terminal:
def print_deltas(event):
if isinstance(event, TextDelta):
print(event.text, end="", flush=True)
The user sees tokens as they arrive. If you want a spinner, a progress bar, a WebSocket push — same callback, different implementation.
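The same contract works just as well with a non-printing sink. A capture handler for tests, using a trimmed stand-in for the harness's TextDelta:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TextDelta:
    # Trimmed stand-in for the harness event type.
    text: str

class CaptureDeltas:
    """Same on_event contract as print_deltas, different sink —
    in tests you assert on the final text instead of watching it."""
    def __init__(self) -> None:
        self.parts: list[str] = []

    def __call__(self, event: object) -> None:
        if isinstance(event, TextDelta):
            self.parts.append(event.text)

    @property
    def text(self) -> str:
        return "".join(self.parts)

cap = CaptureDeltas()
for ev in [TextDelta("Hel"), TextDelta("lo")]:
    cap(ev)
print(cap.text)  # Hello
```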
With transcript in the signature, chat continuity is three lines. One Transcript outside the loop, one arun(...) call per user prompt with transcript=transcript passed through, and every subsequent prompt sees every prior user message, assistant reply, and tool result. A minimal demo:
# examples/ch05_multi_turn.py
"""Three prompts in sequence, one shared Transcript, visible tool calls.
The model on prompt 2 knows what you said in prompt 1. The model on prompt 3
can still see both. No REPL, no signal handling — just continuity.
Every dispatched tool call and its result is printed inline, so you can
watch the agent actually work instead of seeing silence during tool turns.
"""
import asyncio
import json
from harness.agent import arun
from harness.messages import Transcript, ToolCall, ToolResult
from harness.providers.anthropic import AnthropicProvider
from harness.providers.events import TextDelta
from harness.tools.registry import ToolRegistry
from harness.tools.std import calc, bash
async def main() -> None:
provider = AnthropicProvider()
registry = ToolRegistry(tools=[calc, bash])
def on_event(event):
if isinstance(event, TextDelta):
print(event.text, end="", flush=True)
def on_tool_call(call: ToolCall) -> None:
args = json.dumps(call.args, ensure_ascii=False)
# 2-space indent so the call nests visually under the assistant text.
print(f"\n ⚙ {call.name}({args})", flush=True)
def on_tool_result(result: ToolResult) -> None:
marker = "✗" if result.is_error else "→"
preview = result.content.strip().replace("\n", " ⏎ ")
if len(preview) > 120:
preview = preview[:117] + "..."
print(f" {marker} {preview}\n", flush=True)
# One transcript, reused across every turn. This is the whole feature.
transcript = Transcript(system="You are a helpful, concise assistant.")
prompts = [
"My favourite number is 42. Remember it.",
"What's my favourite number times seven? Use the calculator.",
"Now divide the number I first mentioned by two.",
]
for prompt in prompts:
print(f"\n\nUser: {prompt}\nAssistant: ", end="", flush=True)
await arun(
provider, registry, prompt,
transcript=transcript,
on_event=on_event,
on_tool_call=on_tool_call,
on_tool_result=on_tool_result,
)
print(f"\n\n[session ended — {len(transcript.messages)} messages in transcript]")
asyncio.run(main())
Expected output for prompt 2 (42 × 7) looks roughly like:
Assistant: I'll calculate that for you.
⚙ calc({"expression": "42 * 7"})
→ 294
Assistant: 42 × 7 is **294**.
Three callbacks cover the whole visible surface of a turn. on_event renders the streaming text character-by-character. on_tool_call announces the dispatch, showing the tool name and parsed arguments — no JSON-delta fragments, no partial state. on_tool_result prints a short preview of whatever the tool returned, with ✗ for errors and → for success. Together they give you the "agent is working, here's what it's doing" feel Claude Code and aider ship with; production harnesses build richer versions of the same three hooks (colours, spinners, collapsible sections) without changing the contract.
Run it. The model answers 294 on prompt 2 because it saw prompt 1, and 21 on prompt 3 because both prior prompts are still in the transcript. If you comment out transcript=transcript and let arun build a fresh transcript each call, the model fails both follow-ups — it has no idea what "my favourite number" refers to.
This is the whole primitive. The §5.6.1 REPL is the same idea with signal handling and Ctrl-C semantics bolted on. Chapter 21 swaps the in-memory transcript for a SQLite-backed one so the session survives process restarts.
When a user hits Ctrl-C, two things have to happen. The partial assistant message must be captured in the transcript — so when they resume (or so we can debug), we know what the model had started saying. And any in-flight provider connection must close cleanly, so we don't leak sockets.
The canonical async pattern:
# examples/ch05_interruptible.py
import asyncio
import signal
from harness.agent import arun
from harness.providers.anthropic import AnthropicProvider
from harness.providers.events import TextDelta
from harness.tools.registry import ToolRegistry
from harness.tools.std import calc, bash
async def main() -> None:
provider = AnthropicProvider()
registry = ToolRegistry(tools=[calc, bash])
def on_event(event):
if isinstance(event, TextDelta):
print(event.text, end="", flush=True)
task = asyncio.create_task(
arun(provider, registry,
"Tell me a long story, with three chapters.",
on_event=on_event)
)
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, task.cancel)
try:
answer = await task
print("\n---\n", answer)
except asyncio.CancelledError:
print("\n[interrupted]")
asyncio.run(main())
The task.cancel() call raises asyncio.CancelledError inside the coroutine. The streaming context manager in the Anthropic adapter closes the HTTP connection cleanly on exit. The transcript's system prompt and any completed tool results remain intact; only the currently-streaming assistant message is lost.
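The unwind in miniature: a fake token stream, a deterministic cancel in place of Ctrl-C, pure asyncio. The point is that the partial text survives the CancelledError:

```python
import asyncio

async def fake_stream():
    # Stand-in for a provider stream; every await is a point where
    # cancellation can be delivered.
    for token in ["Once ", "upon ", "a ", "time."]:
        await asyncio.sleep(0)
        yield token

async def main() -> str:
    partial: list[str] = []

    async def consume() -> None:
        async for tok in fake_stream():
            partial.append(tok)
            if len(partial) == 2:
                task.cancel()  # deterministic stand-in for Ctrl-C

    task = asyncio.create_task(consume())
    try:
        await task
    except asyncio.CancelledError:
        pass  # the coroutine unwound; `partial` is intact
    return "".join(partial)

text = asyncio.run(main())
print(repr(text))  # 'Once upon '
```

The CancelledError lands at the next await inside fake_stream, unwinds consume, and leaves everything appended so far untouched — exactly the property §5.6's _one_turn relies on.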
Note what this example is. It's a one-shot script — one arun call, then main() returns. Ctrl-C cancels the arun, main() catches the CancelledError, prints [interrupted], and exits. The program "closes completely" because there's nothing else for it to do. Interactive CLIs (Claude Code, aider, Cursor's terminal agents) use a different shape: they wrap arun in a REPL, and Ctrl-C interrupts the current turn but lands you back at a prompt. We come to that in §5.6.1. The single-shot cancellation mechanism below is the primitive; the REPL is a layer on top.
For a real harness we'd want to capture the partial text before cancellation unwinds it. We do that by extracting one turn of the loop into a helper that pushes into a caller-provided partial_text list as tokens arrive:
```python
# src/harness/agent.py (continued)
async def _one_turn(
    provider: Provider,
    registry: ToolRegistry,
    transcript: Transcript,
    partial_text: list[str],
    on_event: Callable[[StreamEvent], None] | None,
) -> ProviderResponse:
    """Run one provider turn; push text deltas into `partial_text` as we go.

    On CancelledError, whatever was accumulated so far is still in
    `partial_text` — the caller can flush it into the transcript.
    """
    stream = provider.astream(transcript, registry.schemas())

    async def forward():
        async for event in stream:
            if on_event is not None:
                on_event(event)
            if isinstance(event, TextDelta):
                partial_text.append(event.text)
            yield event

    return await accumulate(forward())
```
Now the loop can wrap each turn in a try/except and save the partial text if the user hits Ctrl-C mid-stream:
```python
# src/harness/agent.py (interrupt-safe loop)
async def arun(
    provider: Provider,
    registry: ToolRegistry,
    user_message: str,
    transcript: Transcript | None = None,
    system: str | None = None,
    on_event: Callable[[StreamEvent], None] | None = None,
    on_tool_call: Callable[[ToolCall], None] | None = None,
    on_tool_result: Callable[[ToolResult], None] | None = None,
) -> str:
    if transcript is None:
        transcript = Transcript(system=system)
    transcript.append(Message.user_text(user_message))

    for _ in range(MAX_ITERATIONS):
        partial_text: list[str] = []
        try:
            response = await _one_turn(
                provider, registry, transcript, partial_text, on_event,
            )
        except asyncio.CancelledError:
            if partial_text:
                transcript.append(Message.assistant_text(
                    "".join(partial_text) + " [interrupted]"
                ))
            raise

        if response.is_final:
            transcript.append(Message.from_assistant_response(response))
            return response.text or ""

        # Tool calls: commit the assistant turn (one message, N ToolCall
        # blocks), then dispatch each call in arrival order. One tool_result
        # message per call, matching Chapter 3's convention.
        transcript.append(Message.from_assistant_response(response))
        for ref in response.tool_calls:
            call = ToolCall(id=ref.id, name=ref.name, args=dict(ref.args))
            if on_tool_call is not None:
                on_tool_call(call)
            result = registry.dispatch(call.name, call.args, call.id)
            transcript.append(Message.tool_result(result))
            if on_tool_result is not None:
                on_tool_result(result)

    raise RuntimeError(f"agent did not finish in {MAX_ITERATIONS} iterations")
```
_one_turn is where the streaming-to-ProviderResponse work happens from §5.5; pulling it out of the inner loop lets us wrap one turn in a try/except asyncio.CancelledError without nesting the whole event machinery inside the try. On cancellation, partial_text still holds every delta that arrived before the cancel — we flush it into the transcript with an [interrupted] marker and re-raise, so the caller knows we stopped deliberately. Every chapter from here on assumes arun is the interrupt-safe version with this _one_turn helper.
Chapter 21 makes this durable — the interrupted transcript goes to a checkpointer so the next process can resume. For now, the interrupt is clean in memory.
The one-shot example above cancels and exits. In an interactive CLI you want something different: Ctrl-C stops the model mid-stream and hands control back to the prompt so the user can refine or retry; Ctrl-C a second time within a short window actually exits. Claude Code, aider, and Cursor's terminal agents all do this. They also do one more thing the one-shot pattern can't: each prompt continues the conversation. The model sees what was said three prompts ago.
The REPL gets chat continuity for free now that arun accepts an optional transcript. The trick is to build one Transcript up front, outside the loop, and hand it to every arun call:
```python
# examples/ch05_repl.py
import asyncio
import signal
import time

from harness.agent import arun
from harness.messages import Transcript
from harness.providers.anthropic import AnthropicProvider
from harness.providers.events import TextDelta
from harness.tools.registry import ToolRegistry
from harness.tools.std import calc, bash


async def main() -> None:
    provider = AnthropicProvider()
    registry = ToolRegistry(tools=[calc, bash])
    # One transcript for the whole session — this is what gives the REPL
    # chat continuity. Every arun call appends to it; the next call starts
    # from the grown transcript, so the model sees prior turns.
    transcript = Transcript(system="You are a helpful assistant.")

    def on_event(event):
        if isinstance(event, TextDelta):
            print(event.text, end="", flush=True)

    loop = asyncio.get_running_loop()
    last_sigint = 0.0  # timestamp of the previous Ctrl-C, if any

    while True:
        # Block on stdin in a worker thread so Ctrl-C isn't swallowed by input().
        try:
            user_input = await asyncio.to_thread(input, "\n> ")
        except EOFError:
            print()
            return
        if not user_input.strip():
            continue

        task = asyncio.create_task(
            arun(provider, registry, user_input,
                 transcript=transcript, on_event=on_event)
        )

        def on_sigint() -> None:
            nonlocal last_sigint
            now = time.monotonic()
            if now - last_sigint < 1.5 and task.done():
                # Second Ctrl-C within 1.5s while no task is running → quit.
                loop.stop()
            else:
                last_sigint = now
                if not task.done():
                    task.cancel()

        loop.add_signal_handler(signal.SIGINT, on_sigint)
        try:
            answer = await task
            # `arun` returns a bare str at this point in the book — Chapter 15
            # promotes it to AgentRunResult so you can also read token and
            # iteration counts off the return value. For now, just the text.
            print()  # newline after the streamed output
        except asyncio.CancelledError:
            print("\n[interrupted — Ctrl-C again within 1.5s to quit]")


asyncio.run(main())
```
Four moments to notice. The transcript = Transcript(system=…) outside the REPL loop is what gives chat continuity — say "remember the number 42" on turn 1 and ask "what number did I give you?" on turn 2; the model will answer correctly because both prompts are in the same transcript. The await asyncio.to_thread(input, ...) keeps the SIGINT handler responsive — plain input() blocks the event loop and Ctrl-C produces KeyboardInterrupt on the main thread, which bypasses the signal handler entirely. The two-strike Ctrl-C logic distinguishes "stop the current turn" (task is running) from "quit" (idle at the prompt, second Ctrl-C within 1.5 seconds). The except asyncio.CancelledError catches cancellation at the REPL level and keeps the outer while True: loop alive — the partial assistant text from _one_turn is already saved inside the shared transcript with the [interrupted] marker, and the next user prompt continues the session from that mid-sentence state.
A consequence: the transcript grows monotonically across prompts. Ten chatty turns, each with a two-tool investigation, can easily push past 30K tokens. That's not a problem yet, but by turn 40 you'll feel it — response latency climbs, input-token cost per turn balloons, and the model starts "losing" earlier context (the lost-in-the-middle effect Chapter 10 covers). This is exactly what Chapter 7's accountant measures and Chapter 8's compactor fixes. If you'd asked me five chapters ago what makes context engineering actually matter, this is the answer: a chat-continuity REPL that you use for ten minutes, and suddenly the problem is real.
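To make "30K tokens" concrete, here is the back-of-envelope arithmetic. All four per-turn sizes are assumptions chosen for illustration, not measurements:

```python
# Hypothetical per-turn token costs for a chatty turn that includes a
# two-tool investigation. Illustrative numbers only.
user_prompt = 250
assistant_text = 800
tool_exchange = 150 + 850   # one tool_use block + one tool_result message
per_turn = user_prompt + assistant_text + 2 * tool_exchange  # 3050 tokens

turns = 10
transcript_size = per_turn * turns
print(transcript_size)  # 30500 — past 30K after ten turns

# Worse: every turn re-sends the whole transcript as input, so billed
# input tokens grow quadratically with turn count.
billed_input = sum(per_turn * k for k in range(1, turns + 1))
print(billed_input)  # 167750
```

The quadratic term in billed input is the part people underestimate: the transcript grows linearly, but what you pay to send it grows with the sum of all prefixes.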
This is the pattern any agent CLI you build will grow into. The primitive (cancellable arun with an optional shared transcript) is what §5.5 and §5.6 established; the REPL layer is what ships it to users. Chapter 21 turns the in-memory transcript into a SQLite-backed one you can resume across processes.
Providers fail. 429 because you hit a rate limit. 502 because the edge load balancer hiccuped. 503 because an autoscaling event is mid-flight. 500 because someone deployed. Connection reset because of anything. A harness that dies on the first 503 is not a harness — and the patterns for surviving that kind of failure well are well-established engineering wisdom, not LLM-specific novelty. Michael Nygard's "Release It!" (2018, 2nd edition) is the canonical book-length treatment of what's built in this section and the next — transient-error classification, bounded retry, circuit breakers, bulkheads, and cross-service fallback are all there, framed as "stability patterns" earned from a decade of production post-mortems.
The retry policy we want has four properties.
Retryable vs non-retryable is a decision, not a guess. 429, 500, 502, 503, 504, connection timeout: retry. 400, 401, 403, 404: do not retry. A malformed request is not going to fix itself.
Exponential backoff with jitter. wait = min(max_delay, base * 2**attempt + uniform(0, base)). The jitter term matters more than it looks. Marc Brooker's 2015 AWS Architecture Blog post "Exponential Backoff And Jitter" is the canonical reference here: without jitter, a provider outage produces correlated failures across every client, every client retries at the same moment after their identical backoff window elapses, and the provider takes a second synchronized hit right as it's coming back up — a classic thundering-herd amplification. Jitter desynchronizes the retries, spreading load across the recovery window.
Bounded total retries and wall time. Per-call retries, per-session retry budget, and a wall-clock cap. "Retry forever" is a cost-runaway pattern; the $47K agent loop (DEV Community, 2025) is one form of it, and Chapter 20's budget enforcer builds the higher-level cost cap that sits above even this retry budget.
Respect Retry-After. When a provider tells you when to come back, come back then. Anthropic and OpenAI both send retry-after headers on 429s; ignoring them in favor of your own exponential backoff is how you get throttled harder, not less.
```python
# src/harness/providers/retry.py
from __future__ import annotations

import asyncio
import random
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


class RetryBudgetExceeded(Exception):
    pass


@dataclass
class RetryPolicy:
    max_attempts: int = 5
    base_delay: float = 1.0
    max_delay: float = 30.0
    max_total_seconds: float = 120.0
    retryable_statuses: frozenset[int] = frozenset({429, 500, 502, 503, 504})

    async def run(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        start = asyncio.get_running_loop().time()
        last_exception: Exception | None = None
        for attempt in range(self.max_attempts):
            try:
                return await fn()
            except Exception as e:
                last_exception = e
                if not self._retryable(e):
                    raise
                elapsed = asyncio.get_running_loop().time() - start
                if elapsed >= self.max_total_seconds:
                    raise RetryBudgetExceeded(
                        f"retry budget ({self.max_total_seconds}s) exceeded"
                    ) from e
                delay = self._delay(attempt, e)
                await asyncio.sleep(delay)
        raise RetryBudgetExceeded(
            f"exhausted {self.max_attempts} attempts"
        ) from last_exception

    def _retryable(self, e: Exception) -> bool:
        status = getattr(e, "status_code", None)
        if status is None:
            # Treat connection-level failures as retryable.
            return isinstance(e, (ConnectionError, TimeoutError,
                                  asyncio.TimeoutError))
        return status in self.retryable_statuses

    def _delay(self, attempt: int, error: Exception) -> float:
        retry_after = getattr(error, "retry_after", None)
        if retry_after is not None:
            return float(retry_after)
        jitter = random.uniform(0, self.base_delay)
        return min(self.base_delay * (2 ** attempt) + jitter, self.max_delay)
```
Wire it into the provider call site:
```python
# in AnthropicProvider.astream (sketch)
async def astream(self, transcript, tools):
    retry = RetryPolicy()

    # We can retry the stream *start*, but once streaming has begun and
    # tokens have reached the caller, retry is semantically wrong — we'd
    # produce duplicate output. So retry wraps the initial open.
    async def open_stream():
        return self._client.messages.stream(
            model=self.model,
            max_tokens=4096,
            messages=[_to_anthropic(m) for m in transcript.messages],
            tools=tools,
            system=transcript.system,
        )

    stream_cm = await retry.run(open_stream)
    async with stream_cm as stream:
        ...  # yield events as before
```
The subtlety to notice: retrying mid-stream is the wrong semantics. If we've already yielded three tokens to the caller, retrying from scratch would produce three duplicated tokens. Retry wraps the opening of the stream; once the stream is open, errors mid-stream bubble up. Chapter 21 addresses mid-stream failure with checkpointing — the agent resumes from the last complete turn, not from the middle of a partial one.
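To see the duplication concretely, here is a toy (all names hypothetical): a stream that dies after three tokens, wrapped by a naive "retry the whole stream" consumer. Because the first three tokens already reached the caller before the failure, the restart shows them a second time.

```python
import asyncio

TOKENS = ["The", " quick", " brown", " fox"]

async def flaky_stream(fail_after: int):
    # Yields tokens, then raises mid-stream at index `fail_after`.
    for i, tok in enumerate(TOKENS):
        if i == fail_after:
            raise ConnectionError("mid-stream hiccup")
        yield tok

async def naive_retry_consumer() -> list[str]:
    seen: list[str] = []
    for attempt in range(2):
        try:
            # Second attempt succeeds (fail_after past the end of TOKENS).
            async for tok in flaky_stream(fail_after=3 if attempt == 0 else 99):
                seen.append(tok)  # tokens already shown to the user
            break
        except ConnectionError:
            continue  # restart the stream from scratch — this is the bug

    return seen

out = asyncio.run(naive_retry_consumer())
print(out)  # the first three tokens appear twice
```

The user sees "The quick brown The quick brown fox". This is why retry in the adapter wraps only the stream open, never the iteration.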
When the primary provider is down for long enough, we want to fall back to a second one. The pattern is a provider that wraps two providers:
```python
# src/harness/providers/fallback.py
from __future__ import annotations

from typing import AsyncIterator

from ..messages import Transcript
from .base import Provider, ProviderResponse
from .events import StreamEvent
from .retry import RetryBudgetExceeded


class FallbackProvider:
    name = "fallback"

    def __init__(self, primary: Provider, secondary: Provider) -> None:
        self.primary = primary
        self.secondary = secondary

    async def astream(
        self, transcript: Transcript, tools: list[dict]
    ) -> AsyncIterator[StreamEvent]:
        try:
            async for event in self.primary.astream(transcript, tools):
                yield event
            return
        except RetryBudgetExceeded:
            pass  # fall through to secondary

        async for event in self.secondary.astream(transcript, tools):
            yield event

    async def acomplete(self, transcript, tools):
        from .base import accumulate
        return await accumulate(self.astream(transcript, tools))
```
Composition over inheritance. The FallbackProvider is itself a Provider; the loop doesn't know it's compound. If you want three-way fallback, wrap two FallbackProviders.
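A runnable sketch of the three-way wrap. The stub providers and the trimmed local restatement of FallbackProvider below are standalone stand-ins so the snippet executes without the harness package; DeadProvider and EchoProvider are hypothetical names.

```python
import asyncio

class RetryBudgetExceeded(Exception):  # local stand-in for harness.providers.retry
    pass

class DeadProvider:
    """Stub whose retry budget is always exhausted."""
    async def astream(self, transcript, tools):
        raise RetryBudgetExceeded("provider down")
        yield  # unreachable, but makes this an async generator

class EchoProvider:
    """Stub that streams a single event."""
    def __init__(self, name: str) -> None:
        self.name = name

    async def astream(self, transcript, tools):
        yield f"hello from {self.name}"

class FallbackProvider:
    """Trimmed standalone restatement of the class above."""
    def __init__(self, primary, secondary) -> None:
        self.primary, self.secondary = primary, secondary

    async def astream(self, transcript, tools):
        try:
            async for event in self.primary.astream(transcript, tools):
                yield event
            return
        except RetryBudgetExceeded:
            pass
        async for event in self.secondary.astream(transcript, tools):
            yield event

# Three-way fallback is just nesting: the compound provider is a Provider.
chain = FallbackProvider(
    DeadProvider(),
    FallbackProvider(DeadProvider(), EchoProvider("tertiary")),
)

async def collect():
    return [event async for event in chain.astream(None, [])]

print(asyncio.run(collect()))  # ['hello from tertiary']
```

Both dead providers exhaust their budgets, the exception is swallowed at each layer, and the tertiary provider serves the stream; the loop on the outside never learns how deep the chain goes.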
A caveat worth naming: tool-call argument shapes differ subtly across providers. A tool call that works against Anthropic may produce slightly different JSON when routed through OpenAI. Chapter 22 tests this explicitly; for now, the fallback is correct for text responses and shaped-similarly enough for tool calls to survive most real-world failover events.
```bash
git add -A && git commit -m "ch05: streaming, interruption, retry, fallback"
git tag ch05-streaming
```
Three exercises before moving on.

1. Interrupt a streaming turn with Ctrl-C and confirm the partial assistant text landed in the transcript (with the [interrupted] marker). If it doesn't, find the missing wiring.
2. Write a ChaosProvider that wraps another provider and fails randomly with 500s on 20% of requests. Run the calculator example through it. Does the retry policy recover? What happens when you set the failure rate to 100%?
3. Remove the jitter term from RetryPolicy._delay. Run fifty parallel agents against a mock provider that returns a 429 on the first request. Measure the wall-clock time until all fifty complete. Re-add jitter and re-run. Observe the difference. Write down what you saw; Chapter 20 will use it.