Previously: observability — every operation in the harness emits a structured span, per-agent cost attribution works, dashboards show drift. Observability says what happened; it doesn't say whether what happened was right.
The difference matters. A zero-error run can produce a wrong answer, and an agent that "succeeds" by the harness's internal definition can still fail the user's actual need. Hamel Husain's working point, widely cited among practitioners, is worth restating: agent complexity is only justified when you can define precise task-success criteria and build evaluations that measure them. Without evals, agent complexity is debt. On the research side, Liu et al.'s 2023 "AgentBench: Evaluating LLMs as Agents" made a parallel point by example: it evaluated agents across eight distinct environments (operating systems, databases, knowledge graphs, web tasks) precisely because no single-task benchmark captured what real agent deployments required. The substantial cross-environment variance in their data is one reason you can't rely on a model's headline number when deciding whether it's right for your workload.
This chapter builds a minimal eval harness. Three pieces by the end: golden trajectory cases (EvalCase), a runner that executes and scores them (EvalRunner), and an LLM judge for answers a deterministic function can't check.
Chapter 22 runs the full harness against three providers using this machinery. For now, we build the machinery.
Agent evals operate at the trajectory level, not the turn level. A single turn can look great in isolation and be wrong in context; a single turn can look ugly and be part of a correct recovery. The unit of evaluation is the full task from prompt to final output.
Four metric classes worth tracking:
Completion. Did the agent finish? This is the coarsest signal: True if it returned an answer; False if it crashed, hit MAX_ITERATIONS, or exceeded a budget.
Correctness. Is the answer right? This needs task-specific logic. For a "read file and return its size" task, we can check directly. For "summarize this article" there is no trivial check; we need either an LLM judge or a human.
Process validity. Did the agent do the right work on the way to the answer? Did it call the right tools in a reasonable order? Did it compact when expected? Did it use the plan structure? These are trajectory-level structural checks.
Cost. How many tokens did it take? How many turns? A correct answer produced with 50K tokens is worse than the same answer produced with 5K.
Different task types weight these differently. Debugging tasks care hugely about process validity. Question-answering tasks care about correctness and cost. Long-horizon tasks care about completion and cost. Your eval suite should reflect your workload.
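To make that weighting concrete, here is a hypothetical scoring sketch. The task types, weight values, and `score` function are illustrative assumptions, not part of the harness's API:

```python
# Hypothetical weighting sketch: collapse the four metric classes into a
# single 0..1 score per task type. Weights are illustrative only.
WEIGHTS = {
    # (completion, correctness, process validity, cost efficiency)
    "debugging":    (0.2, 0.2, 0.5, 0.1),
    "qa":           (0.1, 0.6, 0.0, 0.3),
    "long_horizon": (0.5, 0.2, 0.0, 0.3),
}

def score(task_type: str, completed: bool, correct: bool,
          process_ok: bool, tokens: int, token_budget: int) -> float:
    """Weighted score; cost efficiency is the unused fraction of budget."""
    wc, wa, wp, wt = WEIGHTS[task_type]
    cost_eff = max(0.0, 1.0 - tokens / token_budget)
    return wc * completed + wa * correct + wp * process_ok + wt * cost_eff
```

A debugging run that completes correctly but skips the expected process loses half its score; a QA run is dominated by correctness. The point is not this particular formula but that the weights are an explicit, reviewable statement of what your workload values.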
# src/harness/evals/case.py
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable
@dataclass
class EvalCase:
"""A single golden trajectory test."""
id: str
description: str
user_message: str
system: str | None = None
# Optional: a list of tool names the agent must call (in any order).
# Any tool listed here must appear at least once in the run's spans.
required_tools: list[str] = field(default_factory=list)
# Optional: a list of tool names the agent must NOT call.
forbidden_tools: list[str] = field(default_factory=list)
# Optional: a callable that takes the final answer string and returns
# True/False for correctness. For tasks with deterministic answers.
check_answer: Callable[[str], bool] | None = None
# Optional: ceiling on total tokens. Pass if under.
max_tokens: int | None = None
# Optional: ceiling on iterations.
max_iterations: int | None = None
@dataclass
class EvalResult:
case_id: str
passed: bool
failures: list[str]
final_answer: str
tokens_used: int
iterations_used: int
duration_seconds: float
A deliberately simple shape. Real eval frameworks (Braintrust, LangSmith) have richer structures: scorer functions, dataset versioning, experiment tracking. We don't replicate those; the interface leaves room to integrate with them, and the book's goal is the minimum honest eval harness.
# src/harness/evals/runner.py
from __future__ import annotations
import time
from dataclasses import dataclass
from ..agent import arun
from ..providers.base import Provider
from ..tools.selector import ToolCatalog
from .case import EvalCase, EvalResult
@dataclass
class EvalRunner:
provider: Provider
catalog: ToolCatalog
async def run_one(self, case: EvalCase) -> EvalResult:
start = time.time()
tool_calls_observed: list[str] = []
# Wrap the catalog in a recording proxy that appends each
# dispatched tool name to tool_calls_observed. A ToolCatalog
# with observed-dispatch is the simplest in-harness way to
# see what the model actually called.
recording_catalog = _RecordingCatalog(self.catalog, tool_calls_observed)
try:
result = await arun(
provider=self.provider,
catalog=recording_catalog,
system=case.system,
user_message=case.user_message,
)
except Exception as e:
return EvalResult(
case_id=case.id, passed=False,
failures=[f"crashed: {type(e).__name__}: {e}"],
final_answer="", tokens_used=0, iterations_used=0,
duration_seconds=time.time() - start,
)
failures: list[str] = []
if case.check_answer is not None and not case.check_answer(result.summary):
failures.append("answer check failed")
for required in case.required_tools:
if required not in tool_calls_observed:
failures.append(f"required tool not called: {required}")
for forbidden in case.forbidden_tools:
if forbidden in tool_calls_observed:
failures.append(f"forbidden tool called: {forbidden}")
if case.max_tokens is not None and result.tokens_used > case.max_tokens:
failures.append(f"tokens_used {result.tokens_used} > {case.max_tokens}")
if case.max_iterations is not None and result.iterations_used > case.max_iterations:
failures.append(f"iterations_used {result.iterations_used} > {case.max_iterations}")
return EvalResult(
case_id=case.id,
passed=len(failures) == 0,
failures=failures,
final_answer=result.summary,
tokens_used=result.tokens_used,
iterations_used=result.iterations_used,
duration_seconds=time.time() - start,
)
async def run_all(self, cases: list[EvalCase]) -> list[EvalResult]:
results: list[EvalResult] = []
for case in cases:
result = await self.run_one(case)
print(f"{'✓' if result.passed else '✗'} {case.id}: "
f"{case.description} "
f"[{result.tokens_used} tok, {result.duration_seconds:.1f}s]"
+ (f" — {', '.join(result.failures)}" if result.failures else ""))
results.append(result)
return results
class _RecordingCatalog:
"""A ToolCatalog wrapper that records every tool name dispatched.
The catalog interface is `select(query, k, must_include)` and `get(name)`.
Wrapping `select`'s returned tools is the clean interception point: each
returned Tool gets its `arun`/`run` wrapped to record the name before
delegating.
"""
def __init__(self, inner, observed: list[str]) -> None:
self._inner = inner
self._observed = observed
    def select(self, query, k=7, must_include=None):
        tools = self._inner.select(query, k=k, must_include=must_include)
        return [self._wrap(t) for t in tools]
def _wrap(self, tool):
from ..tools.base import Tool
observed = self._observed
if tool.arun is not None:
original_arun = tool.arun
async def arun(**kwargs):
observed.append(tool.name)
return await original_arun(**kwargs)
return Tool(
name=tool.name, description=tool.description,
input_schema=tool.input_schema,
arun=arun, side_effects=tool.side_effects,
)
else:
original_run = tool.run
def run(**kwargs):
observed.append(tool.name)
return original_run(**kwargs)
return Tool(
name=tool.name, description=tool.description,
input_schema=tool.input_schema,
run=run, side_effects=tool.side_effects,
)
The runner is sequential. For a small suite (20–50 cases), that's fine. For larger suites, parallelize by running independent cases in separate async tasks, rate-limited to avoid overwhelming the provider. The interface doesn't need to change — run_all can asyncio.gather instead of looping.
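The gather version is short. A sketch, assuming only `run_one`'s interface; the `max_parallel` parameter and `run_all_parallel` name are illustrative:

```python
import asyncio

# Sketch: bounded-concurrency run_all. The semaphore caps simultaneous
# provider calls; gather preserves case order in the returned results.
async def run_all_parallel(runner, cases, max_parallel: int = 4):
    sem = asyncio.Semaphore(max_parallel)

    async def guarded(case):
        async with sem:
            return await runner.run_one(case)

    return await asyncio.gather(*(guarded(c) for c in cases))
```

One caveat: with parallel cases the per-case progress printing interleaves, so move reporting out of `run_one` and into a summary pass over the results.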
For brevity the recording wrapper only proxies select(). If you use the discovery tool from §12.5 (which reads catalog.tools directly) or otherwise access catalog.get(name) / catalog.all_names() from anywhere in your harness, proxy those through self._inner too; each is a one-liner, and the companion repo's _RecordingCatalog does exactly that. Without them, the recording catalog works for §19.4's cases but will raise AttributeError the moment you drop in a real catalog with helpers wired in.
Tokens and tool-call observation in the sketch are handwaves. A production eval runner pulls these from OTel spans directly — the tracing we built in Chapter 18 is the right substrate. A small span-reader that listens to a ConsoleSpanProcessor-like collector and reports per-run metrics is ~50 lines, which the companion repo includes but the book omits for focus.
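The shape of such a span-reader, in stand-in form: the sketch below uses plain dicts with hypothetical attribute names ("run.id", "tokens.total", "tool.name") rather than the OTel SDK, but the structure, an on_end hook that aggregates per run, is what a real SpanProcessor-based version would share.

```python
from collections import defaultdict

# Sketch: an in-memory collector shaped like a span processor. A real
# implementation subclasses the OTel SDK's SpanProcessor and reads span
# attributes; here spans are plain dicts with hypothetical keys.
class RunMetricsCollector:
    def __init__(self) -> None:
        self.tokens_by_run: dict[str, int] = defaultdict(int)
        self.tools_by_run: dict[str, list[str]] = defaultdict(list)

    def on_end(self, span: dict) -> None:
        run_id = span.get("run.id", "unknown")
        self.tokens_by_run[run_id] += span.get("tokens.total", 0)
        if "tool.name" in span:
            self.tools_by_run[run_id].append(span["tool.name"])
```

With this in place, the eval runner reads tokens and tool calls from the same spans the dashboards use, instead of maintaining a parallel recording path.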
# tests/evals/cases.py
from harness.evals.case import EvalCase
CASES = [
EvalCase(
id="arithmetic-simple",
description="2+2 with calculator",
user_message="What is 2 + 2?",
required_tools=["calc"],
check_answer=lambda ans: "4" in ans,
max_tokens=5_000,
),
EvalCase(
id="file-viewport",
description="Reads a known file via viewport, not full read",
user_message="What is the first line of /etc/hostname?",
required_tools=["read_file_viewport"],
forbidden_tools=["read_file"], # old unbounded read
check_answer=lambda ans: len(ans) > 0,
max_tokens=8_000,
),
EvalCase(
id="long-session-compaction",
description="Task that triggers compaction; verifies survival",
user_message=(
"Read /proc/cpuinfo, /proc/meminfo, /proc/version, "
"/etc/os-release, and /etc/hostname. Summarize the system in "
"three bullet points."
),
required_tools=["read_file_viewport"],
max_tokens=50_000,
max_iterations=15,
),
EvalCase(
id="premature-finalization-trap",
description="Agent must process all 5 items; shortcut is possible",
user_message=(
"For each number in [1, 2, 3, 4, 5], compute its square "
"using the calculator. Then report all five squares in a list."
),
required_tools=["calc"],
check_answer=lambda ans: all(s in ans for s in ["1", "4", "9", "16", "25"]),
),
EvalCase(
id="plan-required",
description="Task complex enough that a plan should be created",
user_message=(
"Investigate and report: (1) the user running this, (2) the "
"working directory, (3) three most-recent files in it. "
"Structure your answer as a three-point summary."
),
required_tools=["bash", "plan_create", "plan_show"],
),
]
Run them:
# examples/ch19_evals.py
import asyncio
from harness.evals.runner import EvalRunner
from harness.providers.anthropic import AnthropicProvider
from harness.tools.selector import ToolCatalog
from harness.tools.std import STANDARD_TOOLS
from tests.evals.cases import CASES
async def main() -> None:
runner = EvalRunner(
provider=AnthropicProvider(),
catalog=ToolCatalog(tools=STANDARD_TOOLS),
)
results = await runner.run_all(CASES)
passed = sum(1 for r in results if r.passed)
print(f"\n{passed}/{len(results)} passed")
asyncio.run(main())
You now have a regression suite. Run it before any model upgrade, any prompt change, any harness refactor. Its output, pass counts and failure reasons, is exactly the signal the POSIX prompt-sensitivity paper (arXiv 2410.02185, 2024) and Promptfoo's 2025 post "Your model upgrade just broke your agent's safety" call for: before shipping a model version upgrade, run the suite and verify nothing regresses.
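The upgrade check can be made mechanical. A sketch assuming only the EvalResult fields defined earlier (`regressions` is a hypothetical helper name): run the suite once on the incumbent model and once on the candidate, then diff.

```python
# Sketch: diff two eval runs, e.g. the current model vs. a candidate
# upgrade. Inputs are EvalResult-shaped objects (.case_id, .passed).
def regressions(baseline, candidate):
    """Case ids that passed in the baseline but fail in the candidate."""
    base_passed = {r.case_id for r in baseline if r.passed}
    cand_passed = {r.case_id for r in candidate if r.passed}
    return sorted(base_passed - cand_passed)
```

An empty list is the green light; anything else names the cases to root-cause before shipping. Note the asymmetry: new passes in the candidate are nice, but only lost passes block.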
For tasks where check_answer is subjective — "summarize this article" — another LLM can act as the judge, given a well-designed judge prompt and, ideally, a more powerful model than the one being tested:
# src/harness/evals/llm_judge.py
async def judge(
judge_provider: Provider,
question: str,
candidate_answer: str,
reference_answer: str | None = None,
criteria: str = "accuracy, completeness, relevance",
) -> bool:
from ..messages import Message, Transcript
transcript = Transcript(system=(
"You are a strict evaluator. Given a question and a candidate answer, "
"judge whether the answer is correct by the criteria provided. "
"Reply with only 'PASS' or 'FAIL' followed by a one-sentence reason."
))
user = (f"Question: {question}\n\n"
f"Candidate answer: {candidate_answer}\n\n")
if reference_answer:
user += f"Reference answer for comparison: {reference_answer}\n\n"
user += f"Criteria: {criteria}"
transcript.append(Message.user_text(user))
response = await judge_provider.acomplete(transcript, tools=[])
text = response.text or ""
return text.strip().upper().startswith("PASS")
Two caveats worth knowing.
Judge bias. Using Claude to judge Claude's output correlates judge and candidate errors. If they share the same blind spot, the judge misses the failure. Best practice: use a different provider for the judge than for the candidate — Claude judging GPT, or vice versa.
Judge ceiling. An LLM judge can't reliably exceed its own capability ceiling on the underlying task. A judge weaker than the candidate on a hard task will mis-score confidently.
For the book's scenarios, deterministic check_answer functions cover most cases. LLM-as-judge is a tool in the kit; don't reach for it when a function would do.
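One practical wrinkle when you do reach for it: check_answer is a synchronous Callable[[str], bool], while the judge is async, and the runner invokes check_answer from inside its own event loop. A hedged adapter sketch (`as_sync_check` is a hypothetical helper; the worker thread exists because asyncio.run cannot be called from a running loop):

```python
import asyncio
import concurrent.futures

# Sketch: adapt an async judge to EvalCase.check_answer's synchronous
# Callable[[str], bool] shape. The runner calls check_answer inside its
# own event loop, so calling asyncio.run() here directly would raise;
# instead, run the judge coroutine on a worker thread's fresh loop.
def as_sync_check(make_judge_coro):
    """make_judge_coro: candidate_answer -> awaitable returning bool."""
    def check(candidate_answer: str) -> bool:
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(
                asyncio.run,
                make_judge_coro(candidate_answer),
            ).result()
    return check
```

Usage against the judge above would look like `check_answer=as_sync_check(lambda ans: judge(judge_provider, question, ans))`. The blocking hop costs a thread per check; fine for a suite of dozens, worth restructuring if judged cases dominate.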
The observability work from Chapter 18 gives us structured trace data. A production run that fails — crashed, timed out, produced a clearly-wrong output — is a potential eval case. A small script turns a failing trace into an EvalCase:
# src/harness/evals/from_trace.py
from .case import EvalCase
def case_from_trace(trace_summary: dict) -> EvalCase:
"""Convert a production trace into a regression eval case.
trace_summary: a dict extracted from your tracing backend. Typical
fields: user_message, system, final_answer, failure_reason.
"""
return EvalCase(
id=f"prod-regression-{trace_summary['trace_id'][:8]}",
description=f"regression from production: "
f"{trace_summary.get('failure_reason', 'unknown')}",
user_message=trace_summary["user_message"],
system=trace_summary.get("system"),
max_tokens=int(trace_summary.get("tokens_used", 0) * 1.5),
# The check is often just "doesn't repeat the same failure."
# More sophisticated: check the specific known-bad behavior.
)
The workflow: monitoring flags a failed trace, an engineer reviews it, confirms it's a regression to prevent, runs case_from_trace, reviews the generated case, tweaks it, commits it to the suite. Next CI run, the case runs; a future regression of the same issue fails CI before shipping.
This is how eval suites grow organically. Every real failure in production leaves a fossil in the suite. Over time, the suite encodes the specific failure modes your system has seen — the ones most likely to recur.
A parting distinction worth naming. Unit tests verify deterministic code; evals verify probabilistic systems. That changes where they run. Don't run evals on every commit: the cost and flakiness aren't worth it. Do run them as a gate on merges to main, or nightly, and before any model upgrade. Treat a regression in the eval suite the same way you'd treat a regression in tests: a release blocker that requires root-causing.
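Wiring that gate into CI is a few lines. A sketch over EvalResult-shaped objects; the `gate` function name and output format are assumptions:

```python
# Sketch: a merge-gate summary over EvalResult-shaped objects (anything
# with .case_id, .passed, .failures). Returns a process exit code so CI
# blocks the merge when any case fails.
def gate(results) -> int:
    failed = [r for r in results if not r.passed]
    for r in failed:
        print(f"BLOCKING {r.case_id}: {'; '.join(r.failures)}")
    return 1 if failed else 0

# In CI, after run_all: raise SystemExit(gate(results))
```

The nonzero exit is the whole mechanism: any CI system treats it as a failed step, so no further integration is needed.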
git add -A && git commit -m "ch19: minimal eval harness with regression cases"
git tag ch19-evals
Exercises. Write new EvalCases with required_tools and check_answer. Run them. How many pass? For the failures, is the right fix in the harness or in the case? Then take a case with a deterministic check_answer and replace it with an LLM judge. Does the judgment match? Where does it disagree? Judge-vs-function disagreements are informative.