Audit

The audit subsystem records every harness execution as an ExecutionTrace — intent, generated code, policy results, tool calls, cost, and errors. Recorders are pluggable via the AuditRecorderProtocol; use the in-memory recorder for development and the SQLite recorder for production.

AuditRecorderProtocol

AuditRecorderProtocol

Bases: Protocol

Structural interface satisfied by every audit recorder.

The protocol intentionally captures only the core methods every recorder must provide. Optional extensions (get_by_id, iter_since, vacuum_older_than) are provided by both shipped implementations but are not part of the protocol — callers that need them should depend on the concrete class instead.
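
Because the protocol is decorated with runtime_checkable and is purely structural, any class with matching record and get_records signatures satisfies it; no inheritance is needed. A minimal sketch (the import path for ExecutionTrace is an assumption):

from agenticapi.harness.audit import ExecutionTrace  # assumed export location

class ListRecorder:
    """Toy recorder that satisfies AuditRecorderProtocol structurally."""

    def __init__(self) -> None:
        self._traces: list[ExecutionTrace] = []

    async def record(self, trace: ExecutionTrace) -> None:
        self._traces.append(trace)

    def get_records(
        self,
        *,
        endpoint_name: str | None = None,
        limit: int = 100,
    ) -> list[ExecutionTrace]:
        matching = [
            t
            for t in reversed(self._traces)
            if endpoint_name is None or t.endpoint_name == endpoint_name
        ]
        return matching[:limit]

Because the protocol is runtime-checkable, isinstance(ListRecorder(), AuditRecorderProtocol) returns True.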

Source code in src/agenticapi/harness/audit/recorder.py
@runtime_checkable
class AuditRecorderProtocol(Protocol):
    """Structural interface satisfied by every audit recorder.

    The protocol intentionally captures only the **core** methods every
    recorder must provide. Optional extensions (``get_by_id``,
    ``iter_since``, ``vacuum_older_than``) are provided by both shipped
    implementations but are not part of the protocol — callers that
    need them should depend on the concrete class instead.
    """

    async def record(self, trace: ExecutionTrace) -> None:
        """Persist an execution trace."""

    def get_records(
        self,
        *,
        endpoint_name: str | None = None,
        limit: int = 100,
    ) -> list[ExecutionTrace]:
        """Return the most recent traces, optionally filtered by endpoint."""

record async

record(trace: ExecutionTrace) -> None

Persist an execution trace.

Source code in src/agenticapi/harness/audit/recorder.py
async def record(self, trace: ExecutionTrace) -> None:
    """Persist an execution trace."""

get_records

get_records(
    *, endpoint_name: str | None = None, limit: int = 100
) -> list[ExecutionTrace]

Return the most recent traces, optionally filtered by endpoint.

Source code in src/agenticapi/harness/audit/recorder.py
def get_records(
    self,
    *,
    endpoint_name: str | None = None,
    limit: int = 100,
) -> list[ExecutionTrace]:
    """Return the most recent traces, optionally filtered by endpoint."""

InMemoryAuditRecorder

Bounded in-memory storage — fast, but wiped on process restart. Use for development, tests, and single-process demos.

InMemoryAuditRecorder module-attribute

InMemoryAuditRecorder = AuditRecorder
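
A typical development wiring, mirroring the SQLite example below (this reference does not show the in-memory recorder's constructor arguments, so the no-argument form is an assumption):

from agenticapi.harness import HarnessEngine
from agenticapi.harness.audit import InMemoryAuditRecorder

recorder = InMemoryAuditRecorder()  # assumed default constructor
harness = HarnessEngine(audit_recorder=recorder, policies=[...])

# After handling some requests, inspect the most recent traces:
for trace in recorder.get_records(limit=10):
    print(trace.trace_id, trace.endpoint_name, trace.error)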

SqliteAuditRecorder

Persistent audit storage backed by the Python standard library sqlite3 module — zero new dependencies, survives process restarts, and exposes query helpers suitable for admin dashboards.

from agenticapi.harness import HarnessEngine
from agenticapi.harness.audit import SqliteAuditRecorder

recorder = SqliteAuditRecorder(path="./audit.sqlite", max_traces=100_000)
harness = HarnessEngine(audit_recorder=recorder, policies=[...])

Writes are serialized through an asyncio.Lock and dispatched via asyncio.to_thread, so the recorder is safe to share across concurrent requests without starving the event loop. Two indices are created on first use: (timestamp DESC) for recency queries and (endpoint_name) for per-endpoint dashboards.
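
The two indices map directly onto the query helpers. A dashboard-style sketch (the endpoint name "orders" is hypothetical):

# Recency query across all endpoints (served by the timestamp index).
recent = recorder.get_records(limit=20)

# Per-endpoint view (served by the endpoint_name index).
failed = [
    t
    for t in recorder.get_records(endpoint_name="orders", limit=100)
    if t.error is not None
]

# Drill into one trace. get_by_id is an optional extension, so depend
# on SqliteAuditRecorder directly rather than the protocol to use it.
if recent:
    trace = recorder.get_by_id(recent[0].trace_id)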

SqliteAuditRecorder

Persistent AuditRecorder backed by a single SQLite database file.

Satisfies the AuditRecorderProtocol (structurally — no inheritance). Drop-in for the in-memory recorder:

from agenticapi.harness import HarnessEngine
from agenticapi.harness.audit import SqliteAuditRecorder

recorder = SqliteAuditRecorder(path="./audit.sqlite")
harness = HarnessEngine(audit_recorder=recorder, policies=[...])

All methods are async even though SQLite is blocking — the blocking calls are off-loaded to a worker thread via asyncio.to_thread so the event loop is never starved.

Concurrency. SQLite supports many concurrent readers and one concurrent writer. We open the database with check_same_thread=False and isolation_level=None (autocommit), and serialise writes through an asyncio.Lock. This is correct for the agent-audit workload: writes happen at request rate, queries are dashboards, and we never need long transactions.
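
In practice that means one recorder instance can be shared by every request handler. A minimal sketch of concurrent writes:

import asyncio

async def record_all(recorder: SqliteAuditRecorder, traces: list[ExecutionTrace]) -> None:
    # Each record() call takes the internal asyncio.Lock and runs the
    # blocking INSERT in a worker thread, so concurrent callers queue
    # briefly instead of stalling the event loop.
    await asyncio.gather(*(recorder.record(t) for t in traces))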

Source code in src/agenticapi/harness/audit/sqlite_store.py
class SqliteAuditRecorder:
    """Persistent :class:`AuditRecorder` backed by a single SQLite database file.

    Satisfies the :class:`AuditRecorderProtocol` (structurally — no
    inheritance). Drop-in for the in-memory recorder:

    .. code-block:: python

        from agenticapi.harness import HarnessEngine
        from agenticapi.harness.audit import SqliteAuditRecorder

        recorder = SqliteAuditRecorder(path="./audit.sqlite")
        harness = HarnessEngine(audit_recorder=recorder, policies=[...])

    All methods are async even though SQLite is blocking — the
    blocking calls are off-loaded to a worker thread via
    :func:`asyncio.to_thread` so the event loop is never starved.

    Concurrency.
        SQLite supports many concurrent readers and one concurrent
        writer. We open the database with ``check_same_thread=False``
        and ``isolation_level=None`` (autocommit), and serialise writes
        through an :class:`asyncio.Lock`. This is correct for the
        agent-audit workload: writes happen at request rate, queries
        are dashboards, and we never need long transactions.
    """

    def __init__(
        self,
        *,
        path: str | Path,
        max_traces: int | None = None,
    ) -> None:
        """Initialize the recorder.

        Args:
            path: Filesystem path to the SQLite database file. Created
                if missing. Use ``":memory:"`` for an in-process database
                that disappears on process exit (handy for tests).
            max_traces: Optional hard cap on the number of stored
                traces. When set, every ``record()`` call also evicts
                the oldest rows so the table size stays bounded.
                ``None`` (default) keeps everything until the user
                runs ``vacuum_older_than()``.
        """
        self._path = str(path)
        self._max_traces = max_traces
        self._write_lock = asyncio.Lock()
        # Hold a single long-lived connection so:
        #   1. ``:memory:`` databases survive across calls (each new
        #      connection to ``:memory:`` would otherwise be a fresh
        #      empty DB).
        #   2. File-based stores avoid the open/close churn at request
        #      rate. SQLite is happy with one writer + many readers
        #      from a single connection (we serialise writes via
        #      ``_write_lock``).
        self._conn = sqlite3.connect(
            self._path,
            check_same_thread=False,
            isolation_level=None,  # autocommit
        )
        self._conn.row_factory = sqlite3.Row
        # Schema creation is idempotent.
        self._conn.executescript(_SCHEMA_SQL)
        # Apply idempotent migrations for databases created before
        # newer columns landed. ``ALTER TABLE ADD COLUMN`` raises
        # ``sqlite3.OperationalError`` when the column already exists,
        # which we silently swallow — the migration is a no-op in
        # that case.
        for migration in _MIGRATIONS_SQL:
            with contextlib.suppress(sqlite3.OperationalError):
                self._conn.execute(migration)

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield the long-lived connection (no open/close)."""
        yield self._conn

    def close(self) -> None:
        """Close the underlying SQLite connection. Idempotent."""
        with contextlib.suppress(sqlite3.ProgrammingError):
            self._conn.close()

    def __del__(self) -> None:
        # Best-effort cleanup. Users should call ``close()`` explicitly
        # in production code paths.
        with contextlib.suppress(Exception):
            self.close()

    # ------------------------------------------------------------------
    # AuditRecorderProtocol — record + get_records
    # ------------------------------------------------------------------

    async def record(self, trace: ExecutionTrace) -> None:
        """Persist an execution trace to SQLite."""
        row = _trace_to_row(trace)
        async with self._write_lock:
            await asyncio.to_thread(self._insert_row, row)
            if self._max_traces is not None:
                await asyncio.to_thread(self._vacuum_to_max, self._max_traces)
        logger.info(
            "audit_trace_recorded",
            backend="sqlite",
            trace_id=trace.trace_id,
            endpoint_name=trace.endpoint_name,
            intent_action=trace.intent_action,
            duration_ms=trace.execution_duration_ms,
            has_error=trace.error is not None,
        )

    def get_records(
        self,
        *,
        endpoint_name: str | None = None,
        limit: int = 100,
    ) -> list[ExecutionTrace]:
        """Return the ``limit`` most recent traces, optionally filtered.

        This call is **synchronous** to keep the protocol identical to
        the in-memory recorder. Internally it reads from the shared
        long-lived connection — the typical query pattern (a dashboard ticking
        every few seconds) does not warrant async overhead. Callers
        on a hot path should use :meth:`iter_since` instead, which is
        async-stream-shaped.
        """
        return self._select_recent(endpoint_name=endpoint_name, limit=limit)

    # ------------------------------------------------------------------
    # Optional extensions: get_by_id, iter_since, vacuum
    # ------------------------------------------------------------------

    def get_by_id(self, trace_id: str) -> ExecutionTrace | None:
        """Look up a single trace by its identifier."""
        with self._connect() as conn:
            cur = conn.execute(
                "SELECT * FROM audit_traces WHERE trace_id = ?",
                (trace_id,),
            )
            row = cur.fetchone()
        if row is None:
            return None
        return _row_to_trace(row)

    async def iter_since(self, since: datetime) -> AsyncIterator[ExecutionTrace]:
        """Yield every trace recorded at or after ``since``.

        Streams rows so very large audit stores don't materialise the
        whole result set in memory.
        """
        cutoff = since.astimezone(UTC).isoformat()

        def _fetch_chunk(
            after: tuple[str, str] | None, batch_size: int = 200
        ) -> list[sqlite3.Row]:
            with self._connect() as conn:
                if after is None:
                    cur = conn.execute(
                        "SELECT * FROM audit_traces WHERE timestamp >= ? "
                        "ORDER BY timestamp ASC, trace_id ASC LIMIT ?",
                        (cutoff, batch_size),
                    )
                else:
                    # Paginate on the full sort key (timestamp, trace_id);
                    # comparing trace_id alone would skip rows whose id
                    # sorts before the previous one but whose timestamp
                    # is later.
                    cur = conn.execute(
                        "SELECT * FROM audit_traces "
                        "WHERE timestamp > ? OR (timestamp = ? AND trace_id > ?) "
                        "ORDER BY timestamp ASC, trace_id ASC LIMIT ?",
                        (after[0], after[0], after[1], batch_size),
                    )
                return list(cur.fetchall())

        last_key: tuple[str, str] | None = None
        while True:
            chunk = await asyncio.to_thread(_fetch_chunk, last_key)
            if not chunk:
                return
            for row in chunk:
                last_key = (row["timestamp"], row["trace_id"])
                yield _row_to_trace(row)

    async def vacuum_older_than(self, cutoff: datetime) -> int:
        """Drop every trace recorded before ``cutoff``.

        Returns:
            The number of rows removed.
        """
        cutoff_iso = cutoff.astimezone(UTC).isoformat()

        def _delete() -> int:
            with self._connect() as conn:
                cur = conn.execute(
                    "DELETE FROM audit_traces WHERE timestamp < ?",
                    (cutoff_iso,),
                )
                return cur.rowcount

        async with self._write_lock:
            removed = await asyncio.to_thread(_delete)
        if removed:
            logger.info("audit_traces_vacuumed", backend="sqlite", removed=removed, cutoff=cutoff_iso)
        return int(removed)

    async def count(self) -> int:
        """Return the total number of stored traces."""

        def _do() -> int:
            with self._connect() as conn:
                cur = conn.execute("SELECT COUNT(*) FROM audit_traces")
                return int(cur.fetchone()[0])

        return await asyncio.to_thread(_do)

    async def clear(self) -> None:
        """Remove every trace. Test-only / dev-only operation."""

        def _do() -> None:
            with self._connect() as conn:
                conn.execute("DELETE FROM audit_traces")

        async with self._write_lock:
            await asyncio.to_thread(_do)
        logger.info("audit_traces_cleared", backend="sqlite")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _insert_row(self, row: tuple[Any, ...]) -> None:
        with self._connect() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO audit_traces ("
                "trace_id, timestamp, endpoint_name, intent_raw, intent_action, "
                "generated_code, reasoning, execution_duration_ms, execution_result, "
                "error, llm_usage, policy_evaluations, approval_request_id, stream_events"
                ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                row,
            )

    def _vacuum_to_max(self, max_traces: int) -> None:
        with self._connect() as conn:
            cur = conn.execute("SELECT COUNT(*) FROM audit_traces")
            count = int(cur.fetchone()[0])
            if count <= max_traces:
                return
            excess = count - max_traces
            conn.execute(
                "DELETE FROM audit_traces WHERE trace_id IN ("
                "SELECT trace_id FROM audit_traces ORDER BY timestamp ASC LIMIT ?"
                ")",
                (excess,),
            )

    def _select_recent(
        self,
        *,
        endpoint_name: str | None,
        limit: int,
    ) -> list[ExecutionTrace]:
        with self._connect() as conn:
            if endpoint_name is None:
                cur = conn.execute(
                    "SELECT * FROM audit_traces ORDER BY timestamp DESC LIMIT ?",
                    (limit,),
                )
            else:
                cur = conn.execute(
                    "SELECT * FROM audit_traces WHERE endpoint_name = ? ORDER BY timestamp DESC LIMIT ?",
                    (endpoint_name, limit),
                )
            rows = list(cur.fetchall())
        return [_row_to_trace(row) for row in rows]

__init__

__init__(
    *, path: str | Path, max_traces: int | None = None
) -> None

Initialize the recorder.

Parameters:

Name Type Description Default
path str | Path

Filesystem path to the SQLite database file. Created if missing. Use ":memory:" for an in-process database that disappears on process exit (handy for tests).

required
max_traces int | None

Optional hard cap on the number of stored traces. When set, every record() call also evicts the oldest rows so the table size stays bounded. None (default) keeps everything until the user runs vacuum_older_than().

None
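
Two common configurations, as a sketch (the production file path is illustrative):

# Ephemeral store for tests: lives only as long as the process.
test_recorder = SqliteAuditRecorder(path=":memory:")

# Bounded production store: each record() also evicts the oldest rows
# so the table never exceeds 100,000 traces.
prod_recorder = SqliteAuditRecorder(
    path="/var/lib/myapp/audit.sqlite",  # illustrative path
    max_traces=100_000,
)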
Source code in src/agenticapi/harness/audit/sqlite_store.py
def __init__(
    self,
    *,
    path: str | Path,
    max_traces: int | None = None,
) -> None:
    """Initialize the recorder.

    Args:
        path: Filesystem path to the SQLite database file. Created
            if missing. Use ``":memory:"`` for an in-process database
            that disappears on process exit (handy for tests).
        max_traces: Optional hard cap on the number of stored
            traces. When set, every ``record()`` call also evicts
            the oldest rows so the table size stays bounded.
            ``None`` (default) keeps everything until the user
            runs ``vacuum_older_than()``.
    """
    self._path = str(path)
    self._max_traces = max_traces
    self._write_lock = asyncio.Lock()
    # Hold a single long-lived connection so:
    #   1. ``:memory:`` databases survive across calls (each new
    #      connection to ``:memory:`` would otherwise be a fresh
    #      empty DB).
    #   2. File-based stores avoid the open/close churn at request
    #      rate. SQLite is happy with one writer + many readers
    #      from a single connection (we serialise writes via
    #      ``_write_lock``).
    self._conn = sqlite3.connect(
        self._path,
        check_same_thread=False,
        isolation_level=None,  # autocommit
    )
    self._conn.row_factory = sqlite3.Row
    # Schema creation is idempotent.
    self._conn.executescript(_SCHEMA_SQL)
    # Apply idempotent migrations for databases created before
    # newer columns landed. ``ALTER TABLE ADD COLUMN`` raises
    # ``sqlite3.OperationalError`` when the column already exists,
    # which we silently swallow — the migration is a no-op in
    # that case.
    for migration in _MIGRATIONS_SQL:
        with contextlib.suppress(sqlite3.OperationalError):
            self._conn.execute(migration)

close

close() -> None

Close the underlying SQLite connection. Idempotent.

Source code in src/agenticapi/harness/audit/sqlite_store.py
def close(self) -> None:
    """Close the underlying SQLite connection. Idempotent."""
    with contextlib.suppress(sqlite3.ProgrammingError):
        self._conn.close()

record async

record(trace: ExecutionTrace) -> None

Persist an execution trace to SQLite.

Source code in src/agenticapi/harness/audit/sqlite_store.py
async def record(self, trace: ExecutionTrace) -> None:
    """Persist an execution trace to SQLite."""
    row = _trace_to_row(trace)
    async with self._write_lock:
        await asyncio.to_thread(self._insert_row, row)
        if self._max_traces is not None:
            await asyncio.to_thread(self._vacuum_to_max, self._max_traces)
    logger.info(
        "audit_trace_recorded",
        backend="sqlite",
        trace_id=trace.trace_id,
        endpoint_name=trace.endpoint_name,
        intent_action=trace.intent_action,
        duration_ms=trace.execution_duration_ms,
        has_error=trace.error is not None,
    )

get_records

get_records(
    *, endpoint_name: str | None = None, limit: int = 100
) -> list[ExecutionTrace]

Return the limit most recent traces, optionally filtered.

This call is synchronous to keep the protocol identical to the in-memory recorder. Internally it reads from the shared long-lived connection — the typical query pattern (a dashboard ticking every few seconds) does not warrant async overhead. Callers on a hot path should use iter_since instead, which streams results asynchronously.

Source code in src/agenticapi/harness/audit/sqlite_store.py
def get_records(
    self,
    *,
    endpoint_name: str | None = None,
    limit: int = 100,
) -> list[ExecutionTrace]:
    """Return the ``limit`` most recent traces, optionally filtered.

    This call is **synchronous** to keep the protocol identical to
    the in-memory recorder. Internally it reads from the shared
    long-lived connection — the typical query pattern (a dashboard ticking
    every few seconds) does not warrant async overhead. Callers
    on a hot path should use :meth:`iter_since` instead, which is
    async-stream-shaped.
    """
    return self._select_recent(endpoint_name=endpoint_name, limit=limit)

get_by_id

get_by_id(trace_id: str) -> ExecutionTrace | None

Look up a single trace by its identifier.

Source code in src/agenticapi/harness/audit/sqlite_store.py
def get_by_id(self, trace_id: str) -> ExecutionTrace | None:
    """Look up a single trace by its identifier."""
    with self._connect() as conn:
        cur = conn.execute(
            "SELECT * FROM audit_traces WHERE trace_id = ?",
            (trace_id,),
        )
        row = cur.fetchone()
    if row is None:
        return None
    return _row_to_trace(row)

iter_since async

iter_since(
    since: datetime,
) -> AsyncIterator[ExecutionTrace]

Yield every trace recorded at or after since.

Streams rows so very large audit stores don't materialise the whole result set in memory.
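
A usage sketch that consumes the stream without materialising it:

from datetime import UTC, datetime, timedelta

async def count_recent_errors(recorder: SqliteAuditRecorder) -> int:
    cutoff = datetime.now(UTC) - timedelta(hours=1)
    errors = 0
    async for trace in recorder.iter_since(cutoff):
        if trace.error is not None:
            errors += 1
    return errors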

Source code in src/agenticapi/harness/audit/sqlite_store.py
async def iter_since(self, since: datetime) -> AsyncIterator[ExecutionTrace]:
    """Yield every trace recorded at or after ``since``.

    Streams rows so very large audit stores don't materialise the
    whole result set in memory.
    """
    cutoff = since.astimezone(UTC).isoformat()

    def _fetch_chunk(
        after: tuple[str, str] | None, batch_size: int = 200
    ) -> list[sqlite3.Row]:
        with self._connect() as conn:
            if after is None:
                cur = conn.execute(
                    "SELECT * FROM audit_traces WHERE timestamp >= ? "
                    "ORDER BY timestamp ASC, trace_id ASC LIMIT ?",
                    (cutoff, batch_size),
                )
            else:
                # Paginate on the full sort key (timestamp, trace_id);
                # comparing trace_id alone would skip rows whose id
                # sorts before the previous one but whose timestamp
                # is later.
                cur = conn.execute(
                    "SELECT * FROM audit_traces "
                    "WHERE timestamp > ? OR (timestamp = ? AND trace_id > ?) "
                    "ORDER BY timestamp ASC, trace_id ASC LIMIT ?",
                    (after[0], after[0], after[1], batch_size),
                )
            return list(cur.fetchall())

    last_key: tuple[str, str] | None = None
    while True:
        chunk = await asyncio.to_thread(_fetch_chunk, last_key)
        if not chunk:
            return
        for row in chunk:
            last_key = (row["timestamp"], row["trace_id"])
            yield _row_to_trace(row)

vacuum_older_than async

vacuum_older_than(cutoff: datetime) -> int

Drop every trace recorded before cutoff.

Returns:

Type Description
int

The number of rows removed.
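
A retention-policy sketch, e.g. invoked from a daily scheduled task:

from datetime import UTC, datetime, timedelta

async def enforce_retention(recorder: SqliteAuditRecorder, days: int = 30) -> None:
    cutoff = datetime.now(UTC) - timedelta(days=days)
    removed = await recorder.vacuum_older_than(cutoff)
    print(f"retention sweep removed {removed} traces")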

Source code in src/agenticapi/harness/audit/sqlite_store.py
async def vacuum_older_than(self, cutoff: datetime) -> int:
    """Drop every trace recorded before ``cutoff``.

    Returns:
        The number of rows removed.
    """
    cutoff_iso = cutoff.astimezone(UTC).isoformat()

    def _delete() -> int:
        with self._connect() as conn:
            cur = conn.execute(
                "DELETE FROM audit_traces WHERE timestamp < ?",
                (cutoff_iso,),
            )
            return cur.rowcount

    async with self._write_lock:
        removed = await asyncio.to_thread(_delete)
    if removed:
        logger.info("audit_traces_vacuumed", backend="sqlite", removed=removed, cutoff=cutoff_iso)
    return int(removed)

count async

count() -> int

Return the total number of stored traces.

Source code in src/agenticapi/harness/audit/sqlite_store.py
async def count(self) -> int:
    """Return the total number of stored traces."""

    def _do() -> int:
        with self._connect() as conn:
            cur = conn.execute("SELECT COUNT(*) FROM audit_traces")
            return int(cur.fetchone()[0])

    return await asyncio.to_thread(_do)

clear async

clear() -> None

Remove every trace. Test-only / dev-only operation.
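
For test suites, a fixture sketch that pairs clear() with close() (assumes pytest with async-fixture support, e.g. pytest-asyncio):

import pytest

@pytest.fixture
async def audit_recorder():
    recorder = SqliteAuditRecorder(path=":memory:")
    yield recorder
    await recorder.clear()
    recorder.close()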

Source code in src/agenticapi/harness/audit/sqlite_store.py
async def clear(self) -> None:
    """Remove every trace. Test-only / dev-only operation."""

    def _do() -> None:
        with self._connect() as conn:
            conn.execute("DELETE FROM audit_traces")

    async with self._write_lock:
        await asyncio.to_thread(_do)
    logger.info("audit_traces_cleared", backend="sqlite")

ExecutionTrace

ExecutionTrace dataclass

Complete trace of an agent execution for auditing.

Captures all stages of the execution pipeline: intent parsing, code generation, policy evaluation, sandbox execution, and the final result.

Attributes:

Name Type Description
trace_id str

Unique identifier for this execution trace.

endpoint_name str

Name of the agent endpoint that handled the request.

timestamp datetime

When the execution started.

intent_raw str

The original natural language request.

intent_action str

The classified action type (read, write, etc.).

generated_code str

The Python code generated by the LLM.

reasoning str | None

Optional chain-of-thought reasoning from the LLM.

policy_evaluations list[dict[str, Any]]

Results from each policy evaluation.

execution_result Any

The output of the sandbox execution.

execution_duration_ms float

Total execution duration in milliseconds.

error str | None

Error message if the execution failed.

llm_usage dict[str, int] | None

Token usage statistics from LLM calls.

approval_request_id str | None

ID of an associated approval request, if any.

stream_events list[dict[str, Any]]

Phase F8 — list of typed agent lifecycle events emitted on this request's AgentStream. Empty for non-streaming endpoints. Each entry is the model_dump() of an agenticapi.interface.stream.AgentEvent so it JSON-serialises cleanly into the audit store.
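
Constructing a trace by hand is useful when backfilling or testing a recorder. Only trace_id, endpoint_name, timestamp, and intent_raw lack defaults; the values below are illustrative:

from datetime import UTC, datetime
from uuid import uuid4

trace = ExecutionTrace(
    trace_id=str(uuid4()),
    endpoint_name="orders",          # illustrative
    timestamp=datetime.now(UTC),
    intent_raw="refund order 1234",  # illustrative
    intent_action="write",
    execution_duration_ms=412.5,
)
await recorder.record(trace)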

Source code in src/agenticapi/harness/audit/trace.py
@dataclass(slots=True)
class ExecutionTrace:
    """Complete trace of an agent execution for auditing.

    Captures all stages of the execution pipeline: intent parsing,
    code generation, policy evaluation, sandbox execution, and the
    final result.

    Attributes:
        trace_id: Unique identifier for this execution trace.
        endpoint_name: Name of the agent endpoint that handled the request.
        timestamp: When the execution started.
        intent_raw: The original natural language request.
        intent_action: The classified action type (read, write, etc.).
        generated_code: The Python code generated by the LLM.
        reasoning: Optional chain-of-thought reasoning from the LLM.
        policy_evaluations: Results from each policy evaluation.
        execution_result: The output of the sandbox execution.
        execution_duration_ms: Total execution duration in milliseconds.
        error: Error message if the execution failed.
        llm_usage: Token usage statistics from LLM calls.
        approval_request_id: ID of an associated approval request, if any.
        stream_events: Phase F8 — list of typed agent lifecycle events
            emitted on this request's :class:`AgentStream`. Empty for
            non-streaming endpoints. Each entry is the
            ``model_dump()`` of an
            :class:`agenticapi.interface.stream.AgentEvent` so it
            JSON-serialises cleanly into the audit store.
    """

    trace_id: str
    endpoint_name: str
    timestamp: datetime
    intent_raw: str
    intent_action: str = ""
    generated_code: str = ""
    reasoning: str | None = None
    policy_evaluations: list[dict[str, Any]] = field(default_factory=list)
    execution_result: Any = None
    execution_duration_ms: float = 0.0
    error: str | None = None
    llm_usage: dict[str, int] | None = None
    approval_request_id: str | None = None
    stream_events: list[dict[str, Any]] = field(default_factory=list)

Exporters

ConsoleExporter

Exports execution traces to stdout as JSON.

Useful for development and debugging. Requires no external dependencies.

Example

exporter = ConsoleExporter()
await exporter.export(trace)

Source code in src/agenticapi/harness/audit/exporters.py
class ConsoleExporter:
    """Exports execution traces to stdout as JSON.

    Useful for development and debugging. Requires no external
    dependencies.

    Example:
        exporter = ConsoleExporter()
        await exporter.export(trace)
    """

    def __init__(self, *, pretty: bool = True) -> None:
        """Initialize the console exporter.

        Args:
            pretty: If True, format JSON with indentation.
        """
        self._pretty = pretty

    async def export(self, trace: ExecutionTrace) -> None:
        """Print the trace as JSON to stdout.

        Args:
            trace: The execution trace to export.
        """
        data = {
            "trace_id": trace.trace_id,
            "endpoint_name": trace.endpoint_name,
            "timestamp": trace.timestamp.isoformat() if trace.timestamp else None,
            "intent_raw": trace.intent_raw,
            "intent_action": trace.intent_action,
            "generated_code": trace.generated_code,
            "reasoning": trace.reasoning,
            "policy_evaluations": trace.policy_evaluations,
            "execution_result": str(trace.execution_result) if trace.execution_result is not None else None,
            "execution_duration_ms": trace.execution_duration_ms,
            "error": trace.error,
            "approval_request_id": trace.approval_request_id,
        }

        indent = 2 if self._pretty else None
        output = json.dumps(data, indent=indent, default=str)
        print(output)

        logger.debug("console_export_complete", trace_id=trace.trace_id)

__init__

__init__(*, pretty: bool = True) -> None

Initialize the console exporter.

Parameters:

Name Type Description Default
pretty bool

If True, format JSON with indentation.

True
Source code in src/agenticapi/harness/audit/exporters.py
def __init__(self, *, pretty: bool = True) -> None:
    """Initialize the console exporter.

    Args:
        pretty: If True, format JSON with indentation.
    """
    self._pretty = pretty

export async

export(trace: ExecutionTrace) -> None

Print the trace as JSON to stdout.

Parameters:

Name Type Description Default
trace ExecutionTrace

The execution trace to export.

required
Source code in src/agenticapi/harness/audit/exporters.py
async def export(self, trace: ExecutionTrace) -> None:
    """Print the trace as JSON to stdout.

    Args:
        trace: The execution trace to export.
    """
    data = {
        "trace_id": trace.trace_id,
        "endpoint_name": trace.endpoint_name,
        "timestamp": trace.timestamp.isoformat() if trace.timestamp else None,
        "intent_raw": trace.intent_raw,
        "intent_action": trace.intent_action,
        "generated_code": trace.generated_code,
        "reasoning": trace.reasoning,
        "policy_evaluations": trace.policy_evaluations,
        "execution_result": str(trace.execution_result) if trace.execution_result is not None else None,
        "execution_duration_ms": trace.execution_duration_ms,
        "error": trace.error,
        "approval_request_id": trace.approval_request_id,
    }

    indent = 2 if self._pretty else None
    output = json.dumps(data, indent=indent, default=str)
    print(output)

    logger.debug("console_export_complete", trace_id=trace.trace_id)

CompositeExporter

Fans out trace exports to multiple exporters.

Example

exporter = CompositeExporter([ConsoleExporter(), OpenTelemetryExporter()])
await exporter.export(trace)

Source code in src/agenticapi/harness/audit/exporters.py
class CompositeExporter:
    """Fans out trace exports to multiple exporters.

    Example:
        exporter = CompositeExporter([ConsoleExporter(), OpenTelemetryExporter()])
        await exporter.export(trace)
    """

    def __init__(self, exporters: list[AuditExporter]) -> None:
        """Initialize with a list of exporters.

        Args:
            exporters: List of exporters to fan out to.
        """
        self._exporters = exporters

    async def export(self, trace: ExecutionTrace) -> None:
        """Export the trace to all registered exporters in parallel.

        Uses asyncio.gather for concurrent exports. Individual exporter
        failures are logged but do not prevent other exporters from running.

        Args:
            trace: The execution trace to export.
        """
        if not self._exporters:
            return

        results = await asyncio.gather(
            *(exporter.export(trace) for exporter in self._exporters),
            return_exceptions=True,
        )
        for exporter, result in zip(self._exporters, results, strict=True):
            if isinstance(result, Exception):
                logger.error(
                    "exporter_failed",
                    exporter=type(exporter).__name__,
                    trace_id=trace.trace_id,
                    error=str(result),
                )

__init__

__init__(exporters: list[AuditExporter]) -> None

Initialize with a list of exporters.

Parameters:

Name Type Description Default
exporters list[AuditExporter]

List of exporters to fan out to.

required
Source code in src/agenticapi/harness/audit/exporters.py
def __init__(self, exporters: list[AuditExporter]) -> None:
    """Initialize with a list of exporters.

    Args:
        exporters: List of exporters to fan out to.
    """
    self._exporters = exporters

export async

export(trace: ExecutionTrace) -> None

Export the trace to all registered exporters in parallel.

Uses asyncio.gather for concurrent exports. Individual exporter failures are logged but do not prevent other exporters from running.

Parameters:

Name Type Description Default
trace ExecutionTrace

The execution trace to export.

required
Source code in src/agenticapi/harness/audit/exporters.py
async def export(self, trace: ExecutionTrace) -> None:
    """Export the trace to all registered exporters in parallel.

    Uses asyncio.gather for concurrent exports. Individual exporter
    failures are logged but do not prevent other exporters from running.

    Args:
        trace: The execution trace to export.
    """
    if not self._exporters:
        return

    results = await asyncio.gather(
        *(exporter.export(trace) for exporter in self._exporters),
        return_exceptions=True,
    )
    for exporter, result in zip(self._exporters, results, strict=True):
        if isinstance(result, Exception):
            logger.error(
                "exporter_failed",
                exporter=type(exporter).__name__,
                trace_id=trace.trace_id,
                error=str(result),
            )
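
Any object with a matching async export(trace) method can participate in the fan-out. A sketch of a custom exporter that appends one JSON line per trace (the class and file path are hypothetical; the blocking write is off-loaded to a worker thread, following the same pattern the SQLite recorder uses):

import asyncio
import json

class JsonlFileExporter:
    """Hypothetical exporter: one JSON object per line, append-only."""

    def __init__(self, path: str) -> None:
        self._path = path

    async def export(self, trace: ExecutionTrace) -> None:
        line = json.dumps(
            {
                "trace_id": trace.trace_id,
                "endpoint_name": trace.endpoint_name,
                "error": trace.error,
            },
            default=str,
        )
        await asyncio.to_thread(self._append, line)

    def _append(self, line: str) -> None:
        with open(self._path, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")

exporter = CompositeExporter([ConsoleExporter(), JsonlFileExporter("./audit.jsonl")])

If JsonlFileExporter raises, CompositeExporter logs exporter_failed and the remaining exporters still run.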