Skip to content

Policies

Policy (Base)

Policy

Bases: BaseModel

Base class for all harness policies.

Subclasses implement evaluate() to check generated code against their specific constraints. Policies are pure computation (sync, no I/O) and must be deterministic for a given input.

Example

class MyPolicy(Policy): max_lines: int = 100

def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
    lines = code.count("\n") + 1
    if lines > self.max_lines:
        return PolicyResult(
            allowed=False,
            violations=[f"Code has {lines} lines, max is {self.max_lines}"],
            policy_name="MyPolicy",
        )
    return PolicyResult(allowed=True, policy_name="MyPolicy")
Source code in src/agenticapi/harness/policy/base.py
class Policy(BaseModel):
    """Common parent of every harness policy.

    A policy inspects something the agent produced (generated code, raw
    intent text, or a structured tool call) and answers with a
    :class:`PolicyResult`. Policies are pure computation (sync, no I/O)
    and must be deterministic for a given input. Every hook defined here
    allows everything; subclasses override the hooks they care about.

    Example:
        class MyPolicy(Policy):
            max_lines: int = 100

            def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
                lines = code.count("\\n") + 1
                if lines > self.max_lines:
                    return PolicyResult(
                        allowed=False,
                        violations=[f"Code has {lines} lines, max is {self.max_lines}"],
                        policy_name="MyPolicy",
                    )
                return PolicyResult(allowed=True, policy_name="MyPolicy")
    """

    # Unknown constructor keywords are rejected rather than silently kept.
    model_config = {"extra": "forbid"}

    def evaluate(
        self,
        *,
        code: str,
        intent_action: str = "",
        intent_domain: str = "",
        **kwargs: Any,
    ) -> PolicyResult:
        """Judge a piece of generated code.

        Args:
            code: The generated Python source code to evaluate.
            intent_action: The classified action type (read, write, etc.).
            intent_domain: The domain of the request (order, product, etc.).
            **kwargs: Additional context for evaluation.

        Returns:
            A PolicyResult; this base implementation always allows the code
            and reports the concrete subclass name as ``policy_name``.
        """
        reporter = type(self).__name__
        return PolicyResult(policy_name=reporter, allowed=True)

    def evaluate_intent_text(
        self,
        *,
        intent_text: str,
        intent_action: str = "",
        intent_domain: str = "",
        **kwargs: Any,
    ) -> PolicyResult:
        """Judge the raw user intent text before it reaches the LLM.

        The framework calls this hook **before** the LLM fires, which lets
        a policy stop prompt injection, PII, or other unsafe content at
        the earliest possible point. Policies that only care about
        generated code keep this allow-everything default.

        Args:
            intent_text: The raw natural-language string from the request.
            intent_action: The classified intent action, if available.
            intent_domain: The classified intent domain, if available.
            **kwargs: Additional context for evaluation.

        Returns:
            :class:`PolicyResult` — the default allows every input.
        """
        # The default hook deliberately ignores every argument.
        del kwargs
        del intent_text, intent_action, intent_domain
        reporter = type(self).__name__
        return PolicyResult(policy_name=reporter, allowed=True)

    def evaluate_tool_call(
        self,
        *,
        tool_name: str,
        arguments: dict[str, Any],
        intent_action: str = "",
        intent_domain: str = "",
        **kwargs: Any,
    ) -> PolicyResult:
        """Judge a direct tool call (Phase E4).

        On the harness's **tool-first execution path** the LLM returns a
        structured function call and no code is generated, so there is
        nothing to hand to :meth:`evaluate`. Instead every registered
        policy is asked whether the *call itself* — the tool's name plus
        the keyword arguments the model produced — is allowed.

        Subclasses override this hook to enforce constraints at the
        tool-call boundary. ``CodePolicy`` keeps this allow-everything
        default (its domain is AST analysis of generated code, not tool
        arguments). ``DataPolicy`` overrides it to reject DDL-style tool
        names (``drop_*``, ``truncate_*``) and arguments that reference
        disallowed tables or restricted columns.

        Args:
            tool_name: The name of the tool the model wants to call.
            arguments: The keyword arguments the model produced for
                the tool. Always a dict.
            intent_action: The classified intent action. Available
                for rules that care about read/write/destructive.
            intent_domain: The classified intent domain.
            **kwargs: Additional context for evaluation.

        Returns:
            :class:`PolicyResult` — the default allows every tool call.
            Subclasses narrow as needed.
        """
        # The default hook deliberately ignores every argument.
        del kwargs
        del tool_name, arguments, intent_action, intent_domain
        reporter = type(self).__name__
        return PolicyResult(policy_name=reporter, allowed=True)

evaluate

evaluate(
    *,
    code: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult

Evaluate generated code against this policy.

Parameters:

Name Type Description Default
code str

The generated Python source code to evaluate.

required
intent_action str

The classified action type (read, write, etc.).

''
intent_domain str

The domain of the request (order, product, etc.).

''
**kwargs Any

Additional context for evaluation.

{}

Returns:

Type Description
PolicyResult

PolicyResult indicating whether the code is allowed.

Source code in src/agenticapi/harness/policy/base.py
def evaluate(
    self,
    *,
    code: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult:
    """Judge a piece of generated code.

    Args:
        code: The generated Python source code to evaluate.
        intent_action: The classified action type (read, write, etc.).
        intent_domain: The domain of the request (order, product, etc.).
        **kwargs: Additional context for evaluation.

    Returns:
        A PolicyResult; this base implementation always allows the code
        and reports the concrete class name as ``policy_name``.
    """
    reporter = type(self).__name__
    return PolicyResult(policy_name=reporter, allowed=True)

evaluate_intent_text

evaluate_intent_text(
    *,
    intent_text: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult

Evaluate raw user intent text before it reaches the LLM.

Called by the framework before the LLM fires, so policies can block prompt injection, PII, or other unsafe content at the earliest possible point. Policies whose domain is generated code leave the default allow-everything implementation.

Parameters:

Name Type Description Default
intent_text str

The raw natural-language string from the request.

required
intent_action str

The classified intent action, if available.

''
intent_domain str

The classified intent domain, if available.

''
**kwargs Any

Additional context for evaluation.

{}

Returns:

Type Description
PolicyResult

PolicyResult — default allows every input.

Source code in src/agenticapi/harness/policy/base.py
def evaluate_intent_text(
    self,
    *,
    intent_text: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult:
    """Judge the raw user intent text before it reaches the LLM.

    The framework calls this hook **before** the LLM fires, which lets
    a policy stop prompt injection, PII, or other unsafe content at
    the earliest possible point. Policies that only care about
    generated code keep this allow-everything default.

    Args:
        intent_text: The raw natural-language string from the request.
        intent_action: The classified intent action, if available.
        intent_domain: The classified intent domain, if available.
        **kwargs: Additional context for evaluation.

    Returns:
        :class:`PolicyResult` — the default allows every input.
    """
    # The default hook deliberately ignores every argument.
    del kwargs
    del intent_text, intent_action, intent_domain
    reporter = type(self).__name__
    return PolicyResult(policy_name=reporter, allowed=True)

evaluate_tool_call

evaluate_tool_call(
    *,
    tool_name: str,
    arguments: dict[str, Any],
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult

Evaluate a direct tool call against this policy (Phase E4).

The harness's tool-first execution path skips code generation entirely when the LLM returns a structured function call. In that case there's no generated code to run through evaluate(); instead, every registered policy is asked whether the call itself — identified by the tool's name plus the keyword arguments the model produced — is allowed.

Subclasses override this hook to enforce constraints at the tool-call boundary. CodePolicy uses the default allow-everything behaviour (its domain is AST analysis of generated code, not tool arguments). DataPolicy uses it to block DDL tool names (drop_*, truncate_*) and argument values that match restricted tables.

Parameters:

Name Type Description Default
tool_name str

The name of the tool the model wants to call.

required
arguments dict[str, Any]

The keyword arguments the model produced for the tool. Always a dict.

required
intent_action str

The classified intent action. Available for rules that care about read/write/destructive.

''
intent_domain str

The classified intent domain.

''
**kwargs Any

Additional context for evaluation.

{}

Returns:

Type Description
PolicyResult

PolicyResult — default implementation allows every tool call. Subclasses narrow as needed.

Source code in src/agenticapi/harness/policy/base.py
def evaluate_tool_call(
    self,
    *,
    tool_name: str,
    arguments: dict[str, Any],
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult:
    """Judge a direct tool call (Phase E4).

    On the harness's **tool-first execution path** the LLM returns a
    structured function call and no code is generated, so there is
    nothing to hand to :meth:`evaluate`. Instead every registered
    policy is asked whether the *call itself* — the tool's name plus
    the keyword arguments the model produced — is allowed.

    Subclasses override this hook to enforce constraints at the
    tool-call boundary. ``CodePolicy`` keeps this allow-everything
    default (its domain is AST analysis of generated code, not tool
    arguments). ``DataPolicy`` uses it to block DDL tool names
    (``drop_*``, ``truncate_*``) and argument values that match
    restricted tables.

    Args:
        tool_name: The name of the tool the model wants to call.
        arguments: The keyword arguments the model produced for
            the tool. Always a dict.
        intent_action: The classified intent action. Available
            for rules that care about read/write/destructive.
        intent_domain: The classified intent domain.
        **kwargs: Additional context for evaluation.

    Returns:
        :class:`PolicyResult` — the default allows every tool call.
        Subclasses narrow as needed.
    """
    # The default hook deliberately ignores every argument.
    del kwargs
    del tool_name, arguments, intent_action, intent_domain
    reporter = type(self).__name__
    return PolicyResult(policy_name=reporter, allowed=True)

PolicyResult

PolicyResult

Bases: BaseModel

Result of a policy evaluation.

Attributes:

Name Type Description
allowed bool

Whether the code is allowed under this policy.

violations list[str]

List of violation descriptions if not allowed.

warnings list[str]

List of non-blocking warnings.

policy_name str

Name of the policy that produced this result.

Source code in src/agenticapi/harness/policy/base.py
class PolicyResult(BaseModel):
    """Result of a policy evaluation.

    Only ``allowed`` is required; the remaining fields default to empty
    values, so an "allow" verdict can be built from ``allowed=True`` alone.

    Attributes:
        allowed: Whether the code is allowed under this policy.
        violations: List of violation descriptions if not allowed.
        warnings: List of non-blocking warnings.
        policy_name: Name of the policy that produced this result.
    """

    # The verdict itself; no default, so callers must state it explicitly.
    allowed: bool
    # default_factory builds a fresh list per instance (no shared mutable default).
    violations: list[str] = Field(default_factory=list)
    warnings: list[str] = Field(default_factory=list)
    # Empty string when the producing policy did not identify itself.
    policy_name: str = ""

CodePolicy

CodePolicy

Bases: Policy

Policy that validates generated code against module and pattern restrictions.

Uses AST parsing to detect dangerous patterns such as forbidden imports, eval/exec usage, dynamic imports, and network access.

Attributes:

Name Type Description
allowed_modules list[str]

Whitelist of allowed modules (empty = no whitelist filtering).

denied_modules list[str]

Blacklist of denied modules.

max_code_lines int

Maximum number of lines allowed in generated code.

deny_eval_exec bool

Whether to deny eval() and exec() calls.

deny_dynamic_import bool

Whether to deny __import__() calls.

allow_network bool

Whether to allow network-related modules.

allowed_hosts list[str]

Whitelist of allowed hosts (unused in static analysis).

Source code in src/agenticapi/harness/policy/code_policy.py
class CodePolicy(Policy):
    """Policy that validates generated code against module and pattern restrictions.

    Uses AST parsing to detect dangerous patterns such as forbidden imports,
    eval/exec usage, dynamic imports, and network access. The analysis is
    purely static: it inspects import statements and direct calls, so
    aliasing a builtin (``e = eval; e(...)``) is not detected.

    Attributes:
        allowed_modules: Whitelist of allowed modules (empty = no whitelist filtering).
        denied_modules: Blacklist of denied modules.
        max_code_lines: Maximum number of lines allowed in generated code.
        deny_eval_exec: Whether to deny eval() and exec() calls.
        deny_dynamic_import: Whether to deny __import__() calls.
        allow_network: Whether to allow network-related modules.
        allowed_hosts: Whitelist of allowed hosts (unused in static analysis).
    """

    allowed_modules: list[str] = Field(default_factory=list)
    denied_modules: list[str] = Field(default_factory=lambda: list(_DEFAULT_DENIED_MODULES))
    max_code_lines: int = Field(default=500, ge=1)
    deny_eval_exec: bool = True
    deny_dynamic_import: bool = True
    allow_network: bool = False
    allowed_hosts: list[str] = Field(default_factory=list)

    def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
        """Evaluate generated code against code restrictions.

        Source that cannot be parsed is always denied. Besides
        ``SyntaxError`` this covers the other exceptions ``ast.parse`` is
        known to raise on hostile input (null bytes, pathologically deep
        nesting), so such input yields a denial instead of crashing the
        harness.

        Args:
            code: The generated Python source code.
            **kwargs: Additional context (ignored).

        Returns:
            PolicyResult with any violations found.
        """
        del kwargs
        violations: list[str] = []
        warnings: list[str] = []

        # Check line count (a trailing newline counts as one more line).
        line_count = code.count("\n") + 1
        if line_count > self.max_code_lines:
            violations.append(f"Code has {line_count} lines, exceeds maximum of {self.max_code_lines}")

        # Parse AST
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            violations.append(f"Code has syntax error: {e}")
        except (ValueError, RecursionError, MemoryError) as e:
            # ValueError: null bytes on older interpreters.
            # RecursionError / MemoryError: source too deeply nested to parse.
            violations.append(f"Code could not be parsed: {e}")
        else:
            # Walk AST nodes only when parsing succeeded.
            for node in ast.walk(tree):
                self._check_imports(node, violations, warnings)
                self._check_eval_exec(node, violations)
                self._check_dynamic_import(node, violations)

        return PolicyResult(
            allowed=not violations,
            violations=violations,
            warnings=warnings,
            policy_name="CodePolicy",
        )

    def _check_imports(
        self,
        node: ast.AST,
        violations: list[str],
        warnings: list[str],
    ) -> None:
        """Check import statements against allowed/denied module lists.

        For ``from pkg import name`` the package itself is checked first;
        if it passes, ``pkg.name`` is additionally checked against
        ``denied_modules`` so that a dotted deny entry such as ``pkg.name``
        cannot be side-stepped by using the ``from`` form.

        Args:
            node: Any AST node; non-import nodes are ignored.
            violations: Output list that violation messages are appended to.
            warnings: Output list for warnings (currently never written).
        """
        module_names: list[str] = []
        from_targets: list[str] = []

        if isinstance(node, ast.Import):
            module_names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module is not None:
            module_names = [node.module]
            from_targets = [f"{node.module}.{alias.name}" for alias in node.names]

        flagged = False
        for module_name in module_names:
            # Get the top-level module name
            top_level = module_name.split(".")[0]

            # Check denied modules
            if top_level in self.denied_modules or module_name in self.denied_modules:
                violations.append(f"Import of denied module: {module_name}")
                flagged = True
                continue

            # Check network modules
            if not self.allow_network and top_level in _NETWORK_MODULES:
                violations.append(f"Import of network module not allowed: {module_name}")
                flagged = True
                continue

            # Check allowed modules whitelist
            if (
                self.allowed_modules
                and top_level not in self.allowed_modules
                and module_name not in self.allowed_modules
            ):
                violations.append(f"Import of module not in allowed list: {module_name}")
                flagged = True

        # Skip the per-name check when the statement is already reported,
        # to avoid several violations for one import line.
        if not flagged:
            for target in from_targets:
                if target in self.denied_modules:
                    violations.append(f"Import of denied module: {target}")

    def _builtin_call_target(self, node: ast.AST) -> str:
        """Return the name a call node invokes, or "" if it is not a recognised call form.

        Recognised forms are the bare call ``name(...)`` and the
        module-qualified builtin calls ``builtins.name(...)`` and
        ``__builtins__.name(...)``. Arbitrary attribute calls such as
        ``model.eval()`` are intentionally not matched.
        """
        if not isinstance(node, ast.Call):
            return ""
        func = node.func
        if isinstance(func, ast.Name):
            return func.id
        if (
            isinstance(func, ast.Attribute)
            and isinstance(func.value, ast.Name)
            and func.value.id in ("builtins", "__builtins__")
        ):
            return func.attr
        return ""

    def _check_eval_exec(self, node: ast.AST, violations: list[str]) -> None:
        """Check for eval() and exec() calls, bare or via the builtins module."""
        if not self.deny_eval_exec:
            return

        target = self._builtin_call_target(node)
        if target in ("eval", "exec"):
            violations.append(f"Use of {target}() is denied")

    def _check_dynamic_import(self, node: ast.AST, violations: list[str]) -> None:
        """Check for __import__() calls, bare or via the builtins module."""
        if not self.deny_dynamic_import:
            return

        if self._builtin_call_target(node) == "__import__":
            violations.append("Use of __import__() is denied")

evaluate

evaluate(*, code: str, **kwargs: Any) -> PolicyResult

Evaluate generated code against code restrictions.

Parameters:

Name Type Description Default
code str

The generated Python source code.

required
**kwargs Any

Additional context (ignored).

{}

Returns:

Type Description
PolicyResult

PolicyResult with any violations found.

Source code in src/agenticapi/harness/policy/code_policy.py
def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
    """Evaluate generated code against code restrictions.

    Source that cannot be parsed is always denied. Besides
    ``SyntaxError`` this covers the other exceptions ``ast.parse`` is
    known to raise on hostile input (null bytes, pathologically deep
    nesting), so such input yields a denial instead of crashing the
    harness.

    Args:
        code: The generated Python source code.
        **kwargs: Additional context (ignored).

    Returns:
        PolicyResult with any violations found.
    """
    del kwargs
    violations: list[str] = []
    warnings: list[str] = []

    # Check line count (a trailing newline counts as one more line).
    line_count = code.count("\n") + 1
    if line_count > self.max_code_lines:
        violations.append(f"Code has {line_count} lines, exceeds maximum of {self.max_code_lines}")

    # Parse AST
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        violations.append(f"Code has syntax error: {e}")
    except (ValueError, RecursionError, MemoryError) as e:
        # ValueError: null bytes on older interpreters.
        # RecursionError / MemoryError: source too deeply nested to parse.
        violations.append(f"Code could not be parsed: {e}")
    else:
        # Walk AST nodes only when parsing succeeded.
        for node in ast.walk(tree):
            self._check_imports(node, violations, warnings)
            self._check_eval_exec(node, violations)
            self._check_dynamic_import(node, violations)

    return PolicyResult(
        allowed=not violations,
        violations=violations,
        warnings=warnings,
        policy_name="CodePolicy",
    )

DataPolicy

DataPolicy

Bases: Policy

Policy that validates SQL and data access patterns in generated code.

Enforces table-level access controls, column restrictions, and DDL prevention through regex-based SQL pattern detection.

Attributes:

Name Type Description
readable_tables list[str]

Tables allowed for SELECT queries (empty = all allowed).

writable_tables list[str]

Tables allowed for INSERT/UPDATE/DELETE (empty = all allowed).

restricted_columns list[str]

Column references to deny, e.g. ["users.password_hash"].

max_query_duration_ms int

Maximum allowed query duration hint.

max_result_rows int

Maximum result rows hint.

deny_ddl bool

Whether to deny DDL statements (DROP, ALTER, CREATE, TRUNCATE).

Source code in src/agenticapi/harness/policy/data_policy.py
class DataPolicy(Policy):
    """Policy that validates SQL and data access patterns in generated code.

    Enforces table-level access controls, column restrictions, and
    DDL prevention through regex-based SQL pattern detection.

    Attributes:
        readable_tables: Tables allowed for SELECT queries (empty = all allowed).
        writable_tables: Tables allowed for INSERT/UPDATE/DELETE (empty = all allowed).
        restricted_columns: Column references to deny, e.g. ["users.password_hash"].
        max_query_duration_ms: Maximum allowed query duration hint.
        max_result_rows: Maximum result rows hint.
        deny_ddl: Whether to deny DDL statements (DROP, ALTER, CREATE, TRUNCATE).
    """

    readable_tables: list[str] = Field(default_factory=list)
    writable_tables: list[str] = Field(default_factory=list)
    restricted_columns: list[str] = Field(default_factory=list)
    max_query_duration_ms: int = Field(default=5000, ge=1)
    max_result_rows: int = Field(default=10000, ge=1)
    deny_ddl: bool = True

    def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
        """Evaluate generated code for data access policy violations.

        Args:
            code: The generated Python source code containing SQL.
            **kwargs: Additional context (ignored).

        Returns:
            PolicyResult with any violations found. A missing LIMIT is
            reported as a warning and never blocks on its own.
        """
        del kwargs
        violations: list[str] = []
        warnings: list[str] = []

        self._check_ddl(code, violations)
        self._check_select_tables(code, violations)
        self._check_write_tables(code, violations)
        self._check_restricted_columns(code, violations)
        self._check_result_limits(code, warnings)

        return PolicyResult(
            allowed=not violations,
            violations=violations,
            warnings=warnings,
            policy_name="DataPolicy",
        )

    def evaluate_tool_call(
        self,
        *,
        tool_name: str,
        arguments: dict[str, Any],
        intent_action: str = "",
        intent_domain: str = "",
        **kwargs: Any,
    ) -> PolicyResult:
        """Block destructive tool calls and forbidden tables (Phase E4).

        Enforcement layers:

        1. ``deny_ddl=True`` blocks any tool whose *name* starts with
           ``drop_``, ``truncate_``, ``alter_``, or ``create_table``
           regardless of arguments. This is the "stop someone from
           exposing a ``drop_table`` tool" safety net.
        2. When ``readable_tables`` is set, the argument dict has a
           string ``table`` value, and the call looks like a read
           (``intent_action`` is one of ``read``/``search``/
           ``aggregate``/``analyze``, or the tool name starts with a
           read-style prefix such as ``get_`` or ``list_``), the table
           name is checked against the read whitelist.
        3. When ``writable_tables`` is set, the argument dict has a
           string ``table`` value, and the call looks like a write
           (``intent_action`` is one of ``write``/``create``/
           ``update``/``delete``, or the tool name starts with a
           write-style prefix such as ``insert_`` or ``update_``), the
           table name is checked against the write whitelist.
        4. ``restricted_columns`` matches ``<table>.<column>`` in any
           top-level argument value that's a string (handy for
           free-form SQL passed as a parameter).

        The default shape (no lists configured) is permissive except
        for the DDL name check, so turning on DataPolicy for a
        tool-first endpoint does not silently break unrelated
        tools.

        Args:
            tool_name: The name of the tool the model wants to call.
            arguments: The keyword arguments for the tool. Anything
                that is not a dict is treated as "no arguments".
            intent_action: The classified intent action.
            intent_domain: The classified intent domain (ignored).
            **kwargs: Additional context (ignored).

        Returns:
            PolicyResult listing every violation found; ``warnings`` is
            always empty for tool calls.
        """
        del kwargs, intent_domain
        violations: list[str] = []
        warnings: list[str] = []
        name_lower = tool_name.lower()
        # Normalise once so every layer below tolerates a non-dict payload.
        call_args = arguments if isinstance(arguments, dict) else {}

        # (1) Destructive name patterns.
        if self.deny_ddl and name_lower.startswith(("drop_", "truncate_", "alter_", "create_table")):
            violations.append(f"DDL tool call not allowed: {tool_name}")

        # (2) / (3) Table whitelist enforcement.
        maybe_table = call_args.get("table")
        if isinstance(maybe_table, str):
            lowered = maybe_table.lower()
            is_read = intent_action in {"read", "search", "aggregate", "analyze"} or name_lower.startswith(
                ("get_", "list_", "search_", "query_", "select_", "read_", "find_")
            )
            is_write = intent_action in {"write", "create", "update", "delete"} or name_lower.startswith(
                ("insert_", "update_", "delete_", "upsert_", "write_", "create_")
            )
            if is_read and self.readable_tables and lowered not in {t.lower() for t in self.readable_tables}:
                violations.append(f"Tool call reads table not in readable_tables: {maybe_table}")
            if is_write and self.writable_tables and lowered not in {t.lower() for t in self.writable_tables}:
                violations.append(f"Tool call writes table not in writable_tables: {maybe_table}")

        # (4) Restricted column references inside string arguments.
        if self.restricted_columns:
            restricted_lower = {ref.lower() for ref in self.restricted_columns}
            for arg_value in call_args.values():
                if not isinstance(arg_value, str):
                    continue
                for match in _TABLE_COLUMN_PATTERN.findall(arg_value):
                    # Each match has 6 groups: 3 for table, 3 for column.
                    table = match[0] or match[1] or match[2]
                    column = match[3] or match[4] or match[5]
                    if f"{table}.{column}".lower() in restricted_lower:
                        violations.append(f"Tool call references restricted column: {table}.{column}")

        return PolicyResult(
            allowed=not violations,
            violations=violations,
            warnings=warnings,
            policy_name="DataPolicy",
        )

    def _check_ddl(self, code: str, violations: list[str]) -> None:
        """Append one violation per DDL statement found; no-op when ``deny_ddl`` is off."""
        if not self.deny_ddl:
            return

        for operation, obj_type, name in _DDL_PATTERN.findall(code):
            violations.append(f"DDL statement not allowed: {operation} {obj_type} {name}")

    def _check_select_tables(self, code: str, violations: list[str]) -> None:
        """Check SELECT queries against readable_tables whitelist.

        Checks tables referenced in FROM clauses, JOIN clauses, and
        subqueries so that table whitelist enforcement is not limited
        to the primary FROM table. Comparison is case-insensitive.
        """
        if not self.readable_tables:
            return

        allowed = {t.lower() for t in self.readable_tables}
        # Scan once per pattern; the results also drive the subquery de-duplication.
        select_tables = _SELECT_PATTERN.findall(code)
        join_tables = _JOIN_PATTERN.findall(code)

        # Primary FROM tables
        for table_name in select_tables:
            if table_name.lower() not in allowed:
                violations.append(f"SELECT from table not in readable list: {table_name}")

        # JOIN tables (LEFT JOIN, INNER JOIN, CROSS JOIN, etc.)
        for table_name in join_tables:
            if table_name.lower() not in allowed:
                violations.append(f"JOIN references table not in readable list: {table_name}")

        # Subquery FROM clauses — only report tables that the two passes
        # above have not already looked at.
        already_checked = {t.lower() for t in select_tables} | {t.lower() for t in join_tables}
        for table_name in _SUBQUERY_FROM_PATTERN.findall(code):
            lowered = table_name.lower()
            if lowered not in already_checked and lowered not in allowed:
                violations.append(f"Subquery references table not in readable list: {table_name}")

    def _check_write_tables(self, code: str, violations: list[str]) -> None:
        """Check write operations against writable_tables whitelist.

        An empty ``writable_tables`` means "no restriction", so nothing is
        scanned in that case. Comparison is case-insensitive.
        """
        if not self.writable_tables:
            return

        allowed = {t.lower() for t in self.writable_tables}
        for pattern in _WRITE_PATTERNS:
            for table_name in pattern.findall(code):
                if table_name.lower() not in allowed:
                    violations.append(f"Write to table not in writable list: {table_name}")

    def _check_restricted_columns(self, code: str, violations: list[str]) -> None:
        """Check for ``table.column`` references listed in ``restricted_columns``."""
        if not self.restricted_columns:
            return

        # Build a set of restricted references in lowercase for matching
        restricted_lower = {ref.lower() for ref in self.restricted_columns}

        for groups in _TABLE_COLUMN_PATTERN.findall(code):
            # Each match has 6 groups: 3 for table (backtick, quote, bare), 3 for column
            table = groups[0] or groups[1] or groups[2]
            column = groups[3] or groups[4] or groups[5]
            if f"{table}.{column}".lower() in restricted_lower:
                violations.append(f"Access to restricted column: {table}.{column}")

    def _check_result_limits(self, code: str, warnings: list[str]) -> None:
        """Warn when a SELECT is present but no LIMIT keyword appears anywhere in the code."""
        if not _SELECT_PATTERN.findall(code):
            return

        if not re.search(r"\bLIMIT\b", code, re.IGNORECASE):
            warnings.append(
                f"SELECT query without LIMIT clause detected. "
                f"Consider adding LIMIT {self.max_result_rows} to prevent large result sets."
            )

evaluate

evaluate(*, code: str, **kwargs: Any) -> PolicyResult

Evaluate generated code for data access policy violations.

Parameters:

Name Type Description Default
code str

The generated Python source code containing SQL.

required
**kwargs Any

Additional context (ignored).

{}

Returns:

Type Description
PolicyResult

PolicyResult with any violations found.

Source code in src/agenticapi/harness/policy/data_policy.py
def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
    """Run every data-access check over the generated code.

    Args:
        code: The generated Python source code containing SQL.
        **kwargs: Additional context (ignored).

    Returns:
        PolicyResult carrying the collected violations and warnings; the
        code is allowed only when no violation was recorded.
    """
    blocking: list[str] = []
    advisory: list[str] = []

    # Order matters only for the order of the reported messages.
    self._check_ddl(code, blocking)
    self._check_select_tables(code, blocking)
    self._check_write_tables(code, blocking)
    self._check_restricted_columns(code, blocking)
    # Result-size concerns are advisory and never block on their own.
    self._check_result_limits(code, advisory)

    return PolicyResult(
        allowed=not blocking,
        violations=blocking,
        warnings=advisory,
        policy_name="DataPolicy",
    )

evaluate_tool_call

evaluate_tool_call(
    *,
    tool_name: str,
    arguments: dict[str, Any],
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult

Block destructive tool calls and forbidden tables (Phase E4).

Enforcement layers:

  1. deny_ddl=True blocks any tool whose name starts with drop_, truncate_, alter_, or create_table regardless of arguments. This is the "stop someone from exposing a drop_table tool" safety net.
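  2. When readable_tables is set, the argument dict contains a string table value, and the call looks like a read (intent_action in {"read","search","aggregate","analyze"} or the tool name starts with a read-style prefix such as get_, list_, search_, query_, select_, read_ or find_), the table name is checked against the whitelist.
  3. When writable_tables is set, the argument dict contains a string table value (common for tool shapes like insert_row(table=..., row=...)), and the call looks like a write (intent_action in {"write","create","update","delete"} or the tool name starts with a write-style prefix such as insert_, update_, delete_, upsert_, write_ or create_), the table name is checked against the write whitelist.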
  4. restricted_columns matches <table>.<column> in any argument value that's a string (handy for free-form SQL passed as a parameter).

The default shape (no lists configured) is permissive except for the DDL name check, so turning on DataPolicy for a tool-first endpoint does not silently break unrelated tools.

Source code in src/agenticapi/harness/policy/data_policy.py
def evaluate_tool_call(
    self,
    *,
    tool_name: str,
    arguments: dict[str, Any],
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult:
    """Block destructive tool calls and forbidden tables (Phase E4).

    Enforcement layers:

    1. ``deny_ddl=True`` blocks any tool whose *name* starts with
       ``drop_``, ``truncate_``, ``alter_``, or ``create_table``
       regardless of arguments. This is the "stop someone from
       exposing a ``drop_table`` tool" safety net.
    2. When ``readable_tables`` is set, ``arguments`` carries a string
       ``table`` entry, and the call looks like a read
       (``intent_action in {"read","search","aggregate","analyze"}``
       or the tool name starts with ``get_``, ``list_``, ``search_``,
       ``query_``, ``select_``, ``read_`` or ``find_``), the table
       name is checked case-insensitively against the whitelist.
    3. When ``writable_tables`` is set, ``arguments`` carries a string
       ``table`` entry (common for tool shapes like
       ``insert_row(table=..., row=...)``), and the call looks like a
       write (``intent_action in {"write","create","update","delete"}``
       or the tool name starts with ``insert_``, ``update_``,
       ``delete_``, ``upsert_``, ``write_`` or ``create_``), the table
       name is checked case-insensitively against the write whitelist.
    4. ``restricted_columns`` matches ``<table>.<column>`` in any
       argument value that's a string (handy for free-form SQL
       passed as a parameter).

    The default shape (no lists configured) is permissive except
    for the DDL name check, so turning on DataPolicy for a
    tool-first endpoint does not silently break unrelated
    tools.

    Args:
        tool_name: Name of the tool being invoked; matched
            case-insensitively.
        arguments: Keyword arguments of the tool call.
        intent_action: Classified action type, used to decide whether
            the call is a read and/or a write.
        intent_domain: Accepted for interface compatibility; unused.
        **kwargs: Additional context (ignored).

    Returns:
        PolicyResult that is allowed only when no violation was found.
    """
    del kwargs, intent_domain
    violations: list[str] = []
    warnings: list[str] = []
    name_lower = tool_name.lower()

    # (1) Destructive name patterns. ``create_table`` is matched as a
    # prefix like the other entries, so variants such as
    # ``create_table_if_missing`` are blocked as documented. The exact
    # names ``drop_table`` / ``truncate_table`` / ``create_table`` are
    # all covered by these prefixes.
    if self.deny_ddl and name_lower.startswith(("drop_", "truncate_", "alter_", "create_table")):
        violations.append(f"DDL tool call not allowed: {tool_name}")

    # (2) / (3) Table whitelist enforcement.
    maybe_table = arguments.get("table") if isinstance(arguments, dict) else None
    if isinstance(maybe_table, str):
        lowered = maybe_table.lower()
        is_read = intent_action in {"read", "search", "aggregate", "analyze"} or name_lower.startswith(
            ("get_", "list_", "search_", "query_", "select_", "read_", "find_")
        )
        is_write = intent_action in {"write", "create", "update", "delete"} or name_lower.startswith(
            ("insert_", "update_", "delete_", "upsert_", "write_", "create_")
        )
        # A call can be classified as both a read and a write (for example a
        # read intent with an ``update_`` tool); both whitelists then apply.
        if is_read and self.readable_tables and lowered not in [t.lower() for t in self.readable_tables]:
            violations.append(f"Tool call reads table not in readable_tables: {maybe_table}")
        if is_write and self.writable_tables and lowered not in [t.lower() for t in self.writable_tables]:
            violations.append(f"Tool call writes table not in writable_tables: {maybe_table}")

    # (4) Restricted column references inside string arguments.
    if self.restricted_columns:
        restricted_lower = {ref.lower() for ref in self.restricted_columns}
        for arg_value in (arguments or {}).values():
            if not isinstance(arg_value, str):
                continue
            for match in _TABLE_COLUMN_PATTERN.findall(arg_value):
                # The pattern exposes three alternative captures for the table
                # and three for the column; use whichever one matched.
                table = match[0] or match[1] or match[2]
                column = match[3] or match[4] or match[5]
                if f"{table}.{column}".lower() in restricted_lower:
                    violations.append(f"Tool call references restricted column: {table}.{column}")

    return PolicyResult(
        allowed=not violations,
        violations=violations,
        warnings=warnings,
        policy_name="DataPolicy",
    )

ResourcePolicy

ResourcePolicy

Bases: Policy

Policy that enforces resource limits on generated code.

Primarily stores resource limits for sandbox enforcement, but also performs basic static checks for obviously resource-intensive patterns.

Attributes:

Name Type Description
max_cpu_seconds float

Maximum CPU time in seconds.

max_memory_mb int

Maximum memory usage in megabytes.

max_execution_time_seconds float

Maximum wall-clock execution time.

max_concurrent_operations int

Maximum concurrent operations.

max_cost_per_request_usd float

Maximum estimated cost per request.

Source code in src/agenticapi/harness/policy/resource_policy.py
class ResourcePolicy(Policy):
    """Policy that enforces resource limits on generated code.

    Primarily stores resource limits for sandbox enforcement, but also
    performs basic static checks for obviously resource-intensive patterns.

    Attributes:
        max_cpu_seconds: Maximum CPU time in seconds.
        max_memory_mb: Maximum memory usage in megabytes.
        max_execution_time_seconds: Maximum wall-clock execution time.
        max_concurrent_operations: Maximum concurrent operations.
        max_cost_per_request_usd: Maximum estimated cost per request.
    """

    max_cpu_seconds: float = Field(default=30.0, gt=0)
    max_memory_mb: int = Field(default=512, ge=1)
    max_execution_time_seconds: float = Field(default=60.0, gt=0)
    max_concurrent_operations: int = Field(default=10, ge=1)
    max_cost_per_request_usd: float = Field(default=0.50, ge=0)

    def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
        """Evaluate generated code for resource-intensive patterns.

        Performs basic static analysis to detect obviously expensive
        operations like deeply nested loops or very large ranges.

        Args:
            code: The generated Python source code.
            **kwargs: Additional context (ignored).

        Returns:
            PolicyResult with any violations or warnings found.
        """
        violations: list[str] = []
        warnings: list[str] = []

        self._check_loop_depth(code, violations, warnings)
        self._check_large_ranges(code, violations, warnings)
        self._check_recursive_patterns(code, warnings)

        return PolicyResult(
            allowed=not violations,
            violations=violations,
            warnings=warnings,
            policy_name="ResourcePolicy",
        )

    def _check_loop_depth(self, code: str, violations: list[str], warnings: list[str]) -> None:
        """Check for deeply nested loops that may indicate O(n^k) complexity.

        Nesting at or above ``_NESTED_LOOP_DEPTH_THRESHOLD`` is a violation;
        any shallower nesting of depth 2 or more is a warning. Code that
        cannot be parsed is skipped without recording anything.

        Args:
            code: The generated Python source code.
            violations: List that receives a message for too-deep nesting.
            warnings: List that receives a message for moderate nesting.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # On Python < 3.12 ast.parse reports source containing null
            # bytes as ValueError rather than SyntaxError; either way the
            # code cannot be analysed here.
            return

        max_depth = _find_max_loop_depth(tree)
        if max_depth >= _NESTED_LOOP_DEPTH_THRESHOLD:
            violations.append(
                f"Deeply nested loops detected (depth {max_depth}). "
                f"Maximum allowed nesting depth is {_NESTED_LOOP_DEPTH_THRESHOLD - 1}."
            )
        elif max_depth >= 2:
            warnings.append(f"Nested loops detected (depth {max_depth}). May be resource-intensive.")

    def _check_large_ranges(self, code: str, violations: list[str], warnings: list[str]) -> None:
        """Check for very large range() calls.

        Every match of ``_LARGE_COLLECTION_PATTERN`` is parsed as an
        integer; values at or above ``_LARGE_RANGE_THRESHOLD`` are recorded
        as violations. Matches that are not valid integers are skipped.

        Args:
            code: The generated Python source code.
            violations: List that receives a message per oversized range.
            warnings: Unused by this check.
        """
        for match in _LARGE_COLLECTION_PATTERN.findall(code):
            try:
                value = int(match)
            except ValueError:
                # Not a plain integer literal; nothing to compare.
                continue
            if value >= _LARGE_RANGE_THRESHOLD:
                violations.append(
                    f"Very large range({value}) detected. Maximum allowed range size is {_LARGE_RANGE_THRESHOLD}."
                )

    def _check_recursive_patterns(self, code: str, warnings: list[str]) -> None:
        """Check for potential recursive function calls.

        A function is reported when its body contains a call to a bare
        name equal to the function's own name. At most one warning is
        emitted per function definition. Code that cannot be parsed is
        skipped without recording anything.

        Args:
            code: The generated Python source code.
            warnings: List that receives one message per recursive function.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # ValueError covers null bytes in the source on Python < 3.12.
            return

        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                func_name = node.name
                for child in ast.walk(node):
                    if isinstance(child, ast.Call) and isinstance(child.func, ast.Name) and child.func.id == func_name:
                        warnings.append(
                            f"Recursive function '{func_name}' detected. Ensure it has proper termination conditions."
                        )
                        # One warning per function is enough.
                        break

evaluate

evaluate(*, code: str, **kwargs: Any) -> PolicyResult

Evaluate generated code for resource-intensive patterns.

Performs basic static analysis to detect obviously expensive operations like deeply nested loops or very large ranges.

Parameters:

Name Type Description Default
code str

The generated Python source code.

required
**kwargs Any

Additional context (ignored).

{}

Returns:

Type Description
PolicyResult

PolicyResult with any violations or warnings found.

Source code in src/agenticapi/harness/policy/resource_policy.py
def evaluate(self, *, code: str, **kwargs: Any) -> PolicyResult:
    """Statically scan generated code for resource-hungry constructs.

    Runs the loop-depth, large-range and recursion checks in that
    order. The code is allowed when none of them recorded a violation.

    Args:
        code: The generated Python source code.
        **kwargs: Additional context (ignored).

    Returns:
        PolicyResult with any violations or warnings found.
    """
    problems: list[str] = []
    cautions: list[str] = []

    # The first two checks may record violations and take both lists;
    # the recursion check only ever warns.
    for two_list_check in (self._check_loop_depth, self._check_large_ranges):
        two_list_check(code, problems, cautions)
    self._check_recursive_patterns(code, cautions)

    return PolicyResult(
        allowed=not problems,
        violations=problems,
        warnings=cautions,
        policy_name="ResourcePolicy",
    )

RuntimePolicy

RuntimePolicy

Bases: Policy

Dynamic policy evaluation for runtime constraints.

Checks code complexity via AST node count and enforces configurable limits. Future versions will integrate with middleware for rate limiting and authentication.

Example

policy = RuntimePolicy(max_code_complexity=500)
result = policy.evaluate(code="x = 1")
assert result.allowed is True

Source code in src/agenticapi/harness/policy/runtime_policy.py
class RuntimePolicy(Policy):
    """Dynamic policy evaluation for runtime constraints.

    Checks code complexity via AST node count and enforces configurable
    limits. Future versions will integrate with middleware for rate
    limiting and authentication.

    Example:
        policy = RuntimePolicy(max_code_complexity=500)
        result = policy.evaluate(code="x = 1")
        assert result.allowed is True
    """

    max_code_complexity: int = Field(
        default=500,
        ge=1,
        description="Maximum AST node count (proxy for code complexity).",
    )
    max_code_lines: int = Field(
        default=500,
        ge=1,
        description="Maximum number of lines in generated code.",
    )

    def evaluate(
        self,
        *,
        code: str,
        intent_action: str = "",
        intent_domain: str = "",
        **kwargs: Any,
    ) -> PolicyResult:
        """Evaluate runtime constraints on the generated code.

        Checks:
        - Code length (newline count plus one) against ``max_code_lines``
        - Code complexity (AST node count) against ``max_code_complexity``;
          a warning is added once the count passes 80% of the limit

        Code that cannot be parsed is reported as a violation rather
        than raising.

        Args:
            code: The generated Python source code.
            intent_action: The classified action type (unused here).
            intent_domain: The domain of the request (unused here).
            **kwargs: Additional context (unused here).

        Returns:
            PolicyResult indicating whether the code passes runtime checks.
        """
        violations: list[str] = []
        warnings: list[str] = []

        # Check line count
        line_count = code.count("\n") + 1
        if line_count > self.max_code_lines:
            violations.append(f"Code has {line_count} lines, exceeds maximum of {self.max_code_lines}")

        # Check complexity via AST node count
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # On Python < 3.12 ast.parse raises ValueError (not SyntaxError)
            # for source containing null bytes; treat it the same way so the
            # policy denies the code instead of crashing.
            violations.append("Code contains syntax errors and cannot be analyzed")
        else:
            node_count = sum(1 for _ in ast.walk(tree))
            if node_count > self.max_code_complexity:
                violations.append(
                    f"Code complexity ({node_count} AST nodes) exceeds maximum of {self.max_code_complexity}"
                )
            elif node_count > self.max_code_complexity * 0.8:
                warnings.append(
                    f"Code complexity ({node_count} AST nodes) approaching limit of {self.max_code_complexity}"
                )

        allowed = not violations

        if not allowed:
            logger.warning(
                "runtime_policy_violation",
                violations=violations,
                line_count=line_count,
            )

        return PolicyResult(
            allowed=allowed,
            violations=violations,
            warnings=warnings,
            policy_name="RuntimePolicy",
        )

evaluate

evaluate(
    *,
    code: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult

Evaluate runtime constraints on the generated code.

Checks:

  - Code complexity via AST node count
  - Code length (line count)

Parameters:

Name Type Description Default
code str

The generated Python source code.

required
intent_action str

The classified action type.

''
intent_domain str

The domain of the request.

''
**kwargs Any

Additional context.

{}

Returns:

Type Description
PolicyResult

PolicyResult indicating whether the code passes runtime checks.

Source code in src/agenticapi/harness/policy/runtime_policy.py
def evaluate(
    self,
    *,
    code: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult:
    """Evaluate runtime constraints on the generated code.

    Checks:
    - Code length (newline count plus one) against ``max_code_lines``
    - Code complexity (AST node count) against ``max_code_complexity``;
      a warning is added once the count passes 80% of the limit

    Code that cannot be parsed is reported as a violation rather
    than raising.

    Args:
        code: The generated Python source code.
        intent_action: The classified action type (unused here).
        intent_domain: The domain of the request (unused here).
        **kwargs: Additional context (unused here).

    Returns:
        PolicyResult indicating whether the code passes runtime checks.
    """
    violations: list[str] = []
    warnings: list[str] = []

    # Check line count
    line_count = code.count("\n") + 1
    if line_count > self.max_code_lines:
        violations.append(f"Code has {line_count} lines, exceeds maximum of {self.max_code_lines}")

    # Check complexity via AST node count
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        # On Python < 3.12 ast.parse raises ValueError (not SyntaxError)
        # for source containing null bytes; treat it the same way so the
        # policy denies the code instead of crashing.
        violations.append("Code contains syntax errors and cannot be analyzed")
    else:
        node_count = sum(1 for _ in ast.walk(tree))
        if node_count > self.max_code_complexity:
            violations.append(
                f"Code complexity ({node_count} AST nodes) exceeds maximum of {self.max_code_complexity}"
            )
        elif node_count > self.max_code_complexity * 0.8:
            warnings.append(
                f"Code complexity ({node_count} AST nodes) approaching limit of {self.max_code_complexity}"
            )

    allowed = not violations

    if not allowed:
        logger.warning(
            "runtime_policy_violation",
            violations=violations,
            line_count=line_count,
        )

    return PolicyResult(
        allowed=allowed,
        violations=violations,
        warnings=warnings,
        policy_name="RuntimePolicy",
    )

BudgetPolicy

Cost-governance primitive with per-request, per-session, per-user-per-day, and per-endpoint-per-day scopes.

In the current implementation, the real budget logic lives in estimate_and_enforce(...) and record_actual(...). See the guide for the current explicit integration pattern.

See the Cost Budgeting guide for usage patterns.

BudgetPolicy

Bases: Policy

Cost-budget enforcement policy.

Composes with the rest of the harness via PolicyEvaluator, but its real entry point is estimate_and_enforce(), which the framework calls before the LLM call fires. The standard Policy.evaluate() hook is also implemented (returning a no-op result) so existing harness pipelines that pass code-only policies through PolicyEvaluator continue to compose.

Example

from agenticapi import BudgetPolicy
from agenticapi.harness.policy.pricing import PricingRegistry

pricing = PricingRegistry.default()
budget = BudgetPolicy(
    pricing=pricing,
    max_per_request_usd=0.50,
    max_per_session_usd=5.00,
    max_per_user_per_day_usd=50.00,
)
harness = HarnessEngine(policies=[budget, ...])

Source code in src/agenticapi/harness/policy/budget_policy.py
class BudgetPolicy(Policy):
    """Policy that caps LLM spend per request, session, user-day and endpoint-day.

    It plugs into the harness through :class:`PolicyEvaluator` like any
    other policy, but the code-level :meth:`Policy.evaluate` hook is a
    no-op that exists only so the policy composes with code-only
    pipelines. The budget logic itself lives in
    :meth:`estimate_and_enforce` (pre-call check) and
    :meth:`record_actual` (post-call bookkeeping).

    Example:
        from agenticapi import BudgetPolicy
        from agenticapi.harness.policy.pricing import PricingRegistry

        pricing = PricingRegistry.default()
        budget = BudgetPolicy(
            pricing=pricing,
            max_per_request_usd=0.50,
            max_per_session_usd=5.00,
            max_per_user_per_day_usd=50.00,
        )
        harness = HarnessEngine(policies=[budget, ...])
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    # Cost model used for both the pre-call estimate and the post-call actual.
    pricing: PricingRegistry
    # Ceilings in USD; ``None`` disables the corresponding scope.
    max_per_request_usd: float | None = None
    max_per_session_usd: float | None = None
    max_per_user_per_day_usd: float | None = None
    max_per_endpoint_per_day_usd: float | None = None
    # Backend holding the running totals.
    spend_store: SpendStore = Field(default_factory=InMemorySpendStore)

    # ------------------------------------------------------------------
    # Policy contract
    # ------------------------------------------------------------------

    def evaluate(
        self,
        *,
        code: str,
        intent_action: str = "",
        intent_domain: str = "",
        **kwargs: Any,
    ) -> PolicyResult:
        """Always allow; budgets are not enforced at code level.

        Enforcement happens in :meth:`estimate_and_enforce` and
        bookkeeping in :meth:`record_actual`. All arguments are
        discarded.
        """
        del kwargs, intent_domain, intent_action, code
        verdict = PolicyResult(allowed=True, policy_name="BudgetPolicy")
        return verdict

    # ------------------------------------------------------------------
    # Pre-call estimate + enforcement
    # ------------------------------------------------------------------

    def estimate_and_enforce(self, ctx: BudgetEvaluationContext) -> CostEstimate:
        """Price the upcoming LLM call and reject it if a budget would be breached.

        The estimate uses ``ctx.max_output_tokens`` as the output size.
        Scopes are checked in the order request, session, user-per-day,
        endpoint-per-day; the first breach raises. Nothing is written
        to the spend store here.

        Args:
            ctx: Per-request context with the active session/user/endpoint
                and the model + token estimates.

        Returns:
            The :class:`CostEstimate` that was computed.

        Raises:
            BudgetExceeded: If any configured budget would be breached.
        """
        model = ctx.model
        worst_case_usd = self.pricing.estimate_cost(
            model=model,
            input_tokens=ctx.input_tokens,
            output_tokens=ctx.max_output_tokens,
        )
        result = CostEstimate(
            model=model,
            estimated_input_tokens=ctx.input_tokens,
            estimated_output_tokens=ctx.max_output_tokens,
            estimated_cost_usd=worst_case_usd,
        )

        # Request scope: the single call on its own.
        request_cap = self.max_per_request_usd
        if request_cap is not None and worst_case_usd > request_cap:
            raise BudgetExceeded(
                scope="request",
                limit_usd=request_cap,
                observed_usd=worst_case_usd,
                model=model,
            )

        # Session scope: running total plus this call.
        session_cap = self.max_per_session_usd
        if session_cap is not None and ctx.session_id:
            would_be = self.spend_store.get("session", ctx.session_id) + worst_case_usd
            if would_be > session_cap:
                raise BudgetExceeded(
                    scope="session",
                    limit_usd=session_cap,
                    observed_usd=would_be,
                    model=model,
                )

        # User-per-day scope, keyed on the current UTC date.
        user_cap = self.max_per_user_per_day_usd
        if user_cap is not None and ctx.user_id:
            utc_day = datetime.now(tz=UTC).date()
            would_be = self.spend_store.get("user_per_day", ctx.user_id, day=utc_day) + worst_case_usd
            if would_be > user_cap:
                raise BudgetExceeded(
                    scope="user_per_day",
                    limit_usd=user_cap,
                    observed_usd=would_be,
                    model=model,
                )

        # Endpoint-per-day scope, keyed on the current UTC date.
        endpoint_cap = self.max_per_endpoint_per_day_usd
        if endpoint_cap is not None and ctx.endpoint_name:
            utc_day = datetime.now(tz=UTC).date()
            would_be = self.spend_store.get("endpoint_per_day", ctx.endpoint_name, day=utc_day) + worst_case_usd
            if would_be > endpoint_cap:
                raise BudgetExceeded(
                    scope="endpoint_per_day",
                    limit_usd=endpoint_cap,
                    observed_usd=would_be,
                    model=model,
                )

        return result

    # ------------------------------------------------------------------
    # Post-call reconciliation
    # ------------------------------------------------------------------

    def record_actual(
        self,
        ctx: BudgetEvaluationContext,
        *,
        actual_input_tokens: int,
        actual_output_tokens: int,
    ) -> float:
        """Add the real cost of a finished LLM call to the running totals.

        The cost is recomputed from the reported token counts. A scope
        is only updated when its limit is configured and the matching
        identifier on ``ctx`` is truthy.

        Args:
            ctx: The same context used for the pre-call estimate.
            actual_input_tokens: Real input tokens reported by the LLM.
            actual_output_tokens: Real output tokens reported by the LLM.

        Returns:
            The actual cost in USD that was added to the running totals.
        """
        spent_usd = self.pricing.estimate_cost(
            model=ctx.model,
            input_tokens=actual_input_tokens,
            output_tokens=actual_output_tokens,
        )
        store = self.spend_store

        if ctx.session_id and self.max_per_session_usd is not None:
            store.add("session", ctx.session_id, spent_usd)

        if ctx.user_id and self.max_per_user_per_day_usd is not None:
            utc_day = datetime.now(tz=UTC).date()
            store.add("user_per_day", ctx.user_id, spent_usd, day=utc_day)

        if ctx.endpoint_name and self.max_per_endpoint_per_day_usd is not None:
            utc_day = datetime.now(tz=UTC).date()
            store.add("endpoint_per_day", ctx.endpoint_name, spent_usd, day=utc_day)

        return spent_usd

    # ------------------------------------------------------------------
    # Inspection helpers (handy for /metrics integrations later)
    # ------------------------------------------------------------------

    def current_spend(self, *, scope: str, key: str, day: date | None = None) -> float:
        """Look up the running spend in USD for one scope/key (and optional day)."""
        running_total = self.spend_store.get(scope, key, day=day)
        return running_total

evaluate

evaluate(
    *,
    code: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult

No-op for code-level evaluation.

BudgetPolicy operates on the LLM-call boundary via estimate_and_enforce() and record_actual(), not on the generated code itself. This stub keeps it composable with the existing PolicyEvaluator.

Source code in src/agenticapi/harness/policy/budget_policy.py
def evaluate(
    self,
    *,
    code: str,
    intent_action: str = "",
    intent_domain: str = "",
    **kwargs: Any,
) -> PolicyResult:
    """Always allow; budgets are not enforced at code level.

    Enforcement happens in :meth:`estimate_and_enforce` and
    bookkeeping in :meth:`record_actual`, so this hook exists only
    to keep the policy composable with :class:`PolicyEvaluator`.
    All arguments are discarded.
    """
    del kwargs, intent_domain, intent_action, code
    verdict = PolicyResult(allowed=True, policy_name="BudgetPolicy")
    return verdict

estimate_and_enforce

estimate_and_enforce(
    ctx: BudgetEvaluationContext,
) -> CostEstimate

Estimate cost for the upcoming LLM call and enforce all budgets.

Parameters:

Name Type Description Default
ctx BudgetEvaluationContext

Per-request context with the active session/user/endpoint and the model + token estimates.

required

Returns:

Type Description
CostEstimate

The CostEstimate that was computed.

Raises:

Type Description
BudgetExceeded

If any configured budget would be breached.

Source code in src/agenticapi/harness/policy/budget_policy.py
def estimate_and_enforce(self, ctx: BudgetEvaluationContext) -> CostEstimate:
    """Price the upcoming LLM call and reject it if a budget would be breached.

    The estimate uses ``ctx.max_output_tokens`` as the output size.
    Scopes are checked in the order request, session, user-per-day,
    endpoint-per-day; the first breach raises. Nothing is written to
    the spend store here.

    Args:
        ctx: Per-request context with the active session/user/endpoint
            and the model + token estimates.

    Returns:
        The :class:`CostEstimate` that was computed.

    Raises:
        BudgetExceeded: If any configured budget would be breached.
    """
    model = ctx.model
    worst_case_usd = self.pricing.estimate_cost(
        model=model,
        input_tokens=ctx.input_tokens,
        output_tokens=ctx.max_output_tokens,
    )
    result = CostEstimate(
        model=model,
        estimated_input_tokens=ctx.input_tokens,
        estimated_output_tokens=ctx.max_output_tokens,
        estimated_cost_usd=worst_case_usd,
    )

    # Request scope: the single call on its own.
    request_cap = self.max_per_request_usd
    if request_cap is not None and worst_case_usd > request_cap:
        raise BudgetExceeded(
            scope="request",
            limit_usd=request_cap,
            observed_usd=worst_case_usd,
            model=model,
        )

    # Session scope: running total plus this call.
    session_cap = self.max_per_session_usd
    if session_cap is not None and ctx.session_id:
        would_be = self.spend_store.get("session", ctx.session_id) + worst_case_usd
        if would_be > session_cap:
            raise BudgetExceeded(
                scope="session",
                limit_usd=session_cap,
                observed_usd=would_be,
                model=model,
            )

    # User-per-day scope, keyed on the current UTC date.
    user_cap = self.max_per_user_per_day_usd
    if user_cap is not None and ctx.user_id:
        utc_day = datetime.now(tz=UTC).date()
        would_be = self.spend_store.get("user_per_day", ctx.user_id, day=utc_day) + worst_case_usd
        if would_be > user_cap:
            raise BudgetExceeded(
                scope="user_per_day",
                limit_usd=user_cap,
                observed_usd=would_be,
                model=model,
            )

    # Endpoint-per-day scope, keyed on the current UTC date.
    endpoint_cap = self.max_per_endpoint_per_day_usd
    if endpoint_cap is not None and ctx.endpoint_name:
        utc_day = datetime.now(tz=UTC).date()
        would_be = self.spend_store.get("endpoint_per_day", ctx.endpoint_name, day=utc_day) + worst_case_usd
        if would_be > endpoint_cap:
            raise BudgetExceeded(
                scope="endpoint_per_day",
                limit_usd=endpoint_cap,
                observed_usd=would_be,
                model=model,
            )

    return result

record_actual

record_actual(
    ctx: BudgetEvaluationContext,
    *,
    actual_input_tokens: int,
    actual_output_tokens: int,
) -> float

Record the actual spend after the LLM call returns.

The pre-call estimate uses the worst-case max_output_tokens; the post-call reconciliation replaces that estimate with the actual usage so the running totals reflect what really happened.

Parameters:

Name Type Description Default
ctx BudgetEvaluationContext

The same context used for the pre-call estimate.

required
actual_input_tokens int

Real input tokens reported by the LLM.

required
actual_output_tokens int

Real output tokens reported by the LLM.

required

Returns:

Type Description
float

The actual cost in USD that was added to the running totals.

Source code in src/agenticapi/harness/policy/budget_policy.py
def record_actual(
    self,
    ctx: BudgetEvaluationContext,
    *,
    actual_input_tokens: int,
    actual_output_tokens: int,
) -> float:
    """Record the actual spend after the LLM call returns.

    The pre-call estimate uses the worst-case ``max_output_tokens``;
    the post-call reconciliation replaces that estimate with the
    actual usage so the running totals reflect what really happened.

    Args:
        ctx: The same context used for the pre-call estimate.
        actual_input_tokens: Real input tokens reported by the LLM.
        actual_output_tokens: Real output tokens reported by the LLM.

    Returns:
        The actual cost in USD that was added to the running totals.
    """
    actual_cost = self.pricing.estimate_cost(
        model=ctx.model,
        input_tokens=actual_input_tokens,
        output_tokens=actual_output_tokens,
    )
    if ctx.session_id and self.max_per_session_usd is not None:
        self.spend_store.add("session", ctx.session_id, actual_cost)
    if ctx.user_id and self.max_per_user_per_day_usd is not None:
        today = datetime.now(tz=UTC).date()
        self.spend_store.add("user_per_day", ctx.user_id, actual_cost, day=today)
    if ctx.endpoint_name and self.max_per_endpoint_per_day_usd is not None:
        today = datetime.now(tz=UTC).date()
        self.spend_store.add("endpoint_per_day", ctx.endpoint_name, actual_cost, day=today)
    return actual_cost

current_spend

current_spend(
    *, scope: str, key: str, day: date | None = None
) -> float

Return the running spend in USD for a given scope/key.

Source code in src/agenticapi/harness/policy/budget_policy.py
def current_spend(self, *, scope: str, key: str, day: date | None = None) -> float:
    """Return the running spend in USD for a given scope/key."""
    return self.spend_store.get(scope, key, day=day)

BudgetEvaluationContext dataclass

Per-request context that callers pass into BudgetPolicy.estimate_and_enforce().

Attributes:

Name Type Description
endpoint_name str

Endpoint receiving the call.

session_id str | None

Optional session identifier.

user_id str | None

Optional authenticated user identifier.

model str

LLM model identifier.

input_tokens int

Estimated prompt token count.

max_output_tokens int

Cap on output tokens (used for the estimate).

Source code in src/agenticapi/harness/policy/budget_policy.py
@dataclass(slots=True)
class BudgetEvaluationContext:
    """Per-request context that callers pass into :meth:`BudgetPolicy.estimate_and_enforce`.

    Attributes:
        endpoint_name: Endpoint receiving the call.
        session_id: Optional session identifier.
        user_id: Optional authenticated user identifier.
        model: LLM model identifier.
        input_tokens: Estimated prompt token count.
        max_output_tokens: Cap on output tokens (used for the estimate).
    """

    endpoint_name: str
    session_id: str | None
    user_id: str | None
    model: str
    input_tokens: int
    max_output_tokens: int = 1024

CostEstimate dataclass

Result of a pre-call cost estimate.

Attributes:

Name Type Description
model str

Model the estimate was made for.

estimated_input_tokens int

Token count fed into the estimate.

estimated_output_tokens int

Worst-case output token count.

estimated_cost_usd float

Computed worst-case cost.

Source code in src/agenticapi/harness/policy/budget_policy.py
@dataclass(frozen=True, slots=True)
class CostEstimate:
    """Result of a pre-call cost estimate.

    Attributes:
        model: Model the estimate was made for.
        estimated_input_tokens: Token count fed into the estimate.
        estimated_output_tokens: Worst-case output token count.
        estimated_cost_usd: Computed worst-case cost.
    """

    model: str
    estimated_input_tokens: int
    estimated_output_tokens: int
    estimated_cost_usd: float

SpendStore

Bases: Protocol

Protocol for the running-spend tracker.

Implementations may be in-memory, Redis-backed, database-backed, or anything else. The protocol is intentionally tiny so that high-traffic deployments can swap in a sharded backend without touching the policy.

Source code in src/agenticapi/harness/policy/budget_policy.py
@runtime_checkable
class SpendStore(Protocol):
    """Protocol for the running-spend tracker.

    Implementations may be in-memory, Redis-backed, database-backed,
    or anything else. The protocol is intentionally tiny so that
    high-traffic deployments can swap in a sharded backend without
    touching the policy.
    """

    def get(self, scope: str, key: str, *, day: date | None = None) -> float:
        """Return the running total in USD for the given scope/key/day."""
        ...

    def add(self, scope: str, key: str, amount_usd: float, *, day: date | None = None) -> None:
        """Atomically add ``amount_usd`` to the running total."""
        ...

    def reset(self, scope: str, key: str | None = None) -> None:
        """Forget recorded spend (for testing or manual rollover)."""
        ...

get

get(
    scope: str, key: str, *, day: date | None = None
) -> float

Return the running total in USD for the given scope/key/day.

Source code in src/agenticapi/harness/policy/budget_policy.py
def get(self, scope: str, key: str, *, day: date | None = None) -> float:
    """Return the running total in USD for the given scope/key/day."""
    ...

add

add(
    scope: str,
    key: str,
    amount_usd: float,
    *,
    day: date | None = None,
) -> None

Atomically add amount_usd to the running total.

Source code in src/agenticapi/harness/policy/budget_policy.py
def add(self, scope: str, key: str, amount_usd: float, *, day: date | None = None) -> None:
    """Atomically add ``amount_usd`` to the running total."""
    ...

reset

reset(scope: str, key: str | None = None) -> None

Forget recorded spend (for testing or manual rollover).

Source code in src/agenticapi/harness/policy/budget_policy.py
def reset(self, scope: str, key: str | None = None) -> None:
    """Forget recorded spend (for testing or manual rollover)."""
    ...

InMemorySpendStore

Process-local `SpendStore` implementation keyed by scope/key/day.

Day-scoped totals key off (scope, key, isoformat(day)) so the same store cleanly handles per-day budgets without rollover code. Other scopes ignore the day component.

Source code in src/agenticapi/harness/policy/budget_policy.py
class InMemorySpendStore:
    """Process-local :class:`SpendStore` keyed by scope/key/day.

    Day-scoped totals key off ``(scope, key, isoformat(day))`` so the
    same store cleanly handles per-day budgets without rollover code.
    Other scopes ignore the day component.
    """

    def __init__(self) -> None:
        self._totals: dict[tuple[str, str, str | None], float] = defaultdict(float)

    @staticmethod
    def _key(scope: str, key: str, day: date | None) -> tuple[str, str, str | None]:
        if scope == "user_per_day":
            d = (day or datetime.now(tz=UTC).date()).isoformat()
            return (scope, key, d)
        return (scope, key, None)

    def get(self, scope: str, key: str, *, day: date | None = None) -> float:
        return self._totals.get(self._key(scope, key, day), 0.0)

    def add(self, scope: str, key: str, amount_usd: float, *, day: date | None = None) -> None:
        self._totals[self._key(scope, key, day)] += amount_usd

    def reset(self, scope: str, key: str | None = None) -> None:
        if key is None:
            for k in [k for k in self._totals if k[0] == scope]:
                del self._totals[k]
        else:
            for k in [k for k in self._totals if k[0] == scope and k[1] == key]:
                del self._totals[k]

PricingRegistry

Per-1k-token pricing table with a factory that ships the April 2026 public-price snapshot. Accepts overrides for custom or fine-tuned models.

PricingRegistry

Mutable registry of model → pricing.

Example

pricing = PricingRegistry.default()

Override with negotiated contract pricing:

pricing.set("claude-sonnet-4-6", input_usd_per_1k=2.40, output_usd_per_1k=12.00)
cost = pricing.estimate_cost(model="claude-sonnet-4-6", input_tokens=1500, output_tokens=400)

cost == (1500 * 2.40 + 400 * 12.00) / 1000 == $8.40

Source code in src/agenticapi/harness/policy/pricing.py
class PricingRegistry:
    """Mutable registry of model → pricing.

    Example:
        pricing = PricingRegistry.default()
        # Override with negotiated contract pricing:
        pricing.set("claude-sonnet-4-6", input_usd_per_1k=2.40, output_usd_per_1k=12.00)
        cost = pricing.estimate_cost(
            model="claude-sonnet-4-6",
            input_tokens=1500,
            output_tokens=400,
        )
        # cost == (1500 * 2.40 + 400 * 12.00) / 1000 == $8.40
    """

    def __init__(self, prices: dict[str, ModelPricing] | None = None) -> None:
        """Initialize the registry.

        Args:
            prices: Optional initial pricing map. If omitted, the
                registry starts empty (use :meth:`default` for the
                shipped snapshot). The mapping is copied, so later
                changes to the registry do not touch the caller's dict.
        """
        self._prices: dict[str, ModelPricing] = dict(prices or {})

    @classmethod
    def default(cls) -> PricingRegistry:
        """Return a registry pre-populated with the shipped price snapshot."""
        return cls(prices=dict(_DEFAULT_PRICES))

    def set(
        self,
        model: str,
        *,
        input_usd_per_1k: float,
        output_usd_per_1k: float,
        cache_read_usd_per_1k: float | None = None,
        cache_write_usd_per_1k: float | None = None,
    ) -> None:
        """Register or override pricing for a single model.

        Args:
            model: Model identifier as reported by the LLM backend.
            input_usd_per_1k: USD per 1 000 input tokens.
            output_usd_per_1k: USD per 1 000 output tokens.
            cache_read_usd_per_1k: Optional cache-read price.
            cache_write_usd_per_1k: Optional cache-write price.
        """
        self._prices[model] = ModelPricing(
            input_usd_per_1k=input_usd_per_1k,
            output_usd_per_1k=output_usd_per_1k,
            cache_read_usd_per_1k=cache_read_usd_per_1k,
            cache_write_usd_per_1k=cache_write_usd_per_1k,
        )

    def get(self, model: str) -> ModelPricing | None:
        """Return the pricing entry for ``model`` if known, else ``None``."""
        return self._prices.get(model)

    def estimate_cost(
        self,
        *,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cache_read_tokens: int = 0,
        cache_write_tokens: int = 0,
    ) -> float:
        """Estimate USD cost for a single LLM call.

        Unknown models cost 0.0 (with a warning) so the framework
        degrades gracefully on a fresh model rather than raising — a
        production deployment can opt into strict mode by checking
        ``get(model) is None`` first.

        Cache-read and cache-write tokens are billed at their own rate
        when one is configured. Only a rate of ``None`` falls back to
        the input rate; an explicit ``0.0`` means those tokens are free.

        Args:
            model: Model identifier.
            input_tokens: Number of prompt tokens.
            output_tokens: Number of completion tokens.
            cache_read_tokens: Optional cache-read token count.
            cache_write_tokens: Optional cache-write token count.

        Returns:
            Estimated cost in USD.
        """
        pricing = self._prices.get(model)
        if pricing is None:
            logger.warning("pricing_unknown_model", model=model)
            return 0.0

        cost = (input_tokens * pricing.input_usd_per_1k) / 1000.0
        cost += (output_tokens * pricing.output_usd_per_1k) / 1000.0
        if cache_read_tokens:
            # ``is None`` rather than ``or``: a configured price of 0.0
            # is a real price and must not fall back to the input rate.
            rate = pricing.cache_read_usd_per_1k
            if rate is None:
                rate = pricing.input_usd_per_1k
            cost += (cache_read_tokens * rate) / 1000.0
        if cache_write_tokens:
            rate = pricing.cache_write_usd_per_1k
            if rate is None:
                rate = pricing.input_usd_per_1k
            cost += (cache_write_tokens * rate) / 1000.0
        return cost

    def known_models(self) -> list[str]:
        """Return a sorted list of model identifiers known to this registry."""
        return sorted(self._prices.keys())

    def __contains__(self, model: str) -> bool:
        """Return whether ``model`` has a pricing entry."""
        return model in self._prices

    def __len__(self) -> int:
        """Return the number of models with a pricing entry."""
        return len(self._prices)

__init__

__init__(
    prices: dict[str, ModelPricing] | None = None,
) -> None

Initialize the registry.

Parameters:

Name Type Description Default
prices dict[str, ModelPricing] | None

Optional initial pricing map. If omitted, the registry starts empty (use `default()` for the shipped snapshot).

None
Source code in src/agenticapi/harness/policy/pricing.py
def __init__(self, prices: dict[str, ModelPricing] | None = None) -> None:
    """Set up the registry's internal model → pricing map.

    Args:
        prices: Optional starting map. When omitted (or empty) the
            registry starts with no entries; use :meth:`default` for
            the shipped snapshot. The map is copied, not stored as-is.
    """
    initial = prices or {}
    self._prices: dict[str, ModelPricing] = dict(initial)

default classmethod

default() -> PricingRegistry

Return a registry pre-populated with the shipped price snapshot.

Source code in src/agenticapi/harness/policy/pricing.py
@classmethod
def default(cls) -> PricingRegistry:
    """Build a registry seeded with a copy of the shipped price snapshot."""
    snapshot = dict(_DEFAULT_PRICES)
    return cls(prices=snapshot)

set

set(
    model: str,
    *,
    input_usd_per_1k: float,
    output_usd_per_1k: float,
    cache_read_usd_per_1k: float | None = None,
    cache_write_usd_per_1k: float | None = None,
) -> None

Register or override pricing for a single model.

Parameters:

Name Type Description Default
model str

Model identifier as reported by the LLM backend.

required
input_usd_per_1k float

USD per 1 000 input tokens.

required
output_usd_per_1k float

USD per 1 000 output tokens.

required
cache_read_usd_per_1k float | None

Optional cache-read price.

None
cache_write_usd_per_1k float | None

Optional cache-write price.

None
Source code in src/agenticapi/harness/policy/pricing.py
def set(
    self,
    model: str,
    *,
    input_usd_per_1k: float,
    output_usd_per_1k: float,
    cache_read_usd_per_1k: float | None = None,
    cache_write_usd_per_1k: float | None = None,
) -> None:
    """Add a model's pricing, replacing any entry already stored for it.

    Args:
        model: Model identifier as reported by the LLM backend.
        input_usd_per_1k: Price in USD for 1 000 input tokens.
        output_usd_per_1k: Price in USD for 1 000 output tokens.
        cache_read_usd_per_1k: Cache-read price, if there is one.
        cache_write_usd_per_1k: Cache-write price, if there is one.
    """
    entry = ModelPricing(
        input_usd_per_1k=input_usd_per_1k,
        output_usd_per_1k=output_usd_per_1k,
        cache_read_usd_per_1k=cache_read_usd_per_1k,
        cache_write_usd_per_1k=cache_write_usd_per_1k,
    )
    self._prices[model] = entry

get

get(model: str) -> ModelPricing | None

Return the pricing entry for model if known, else None.

Source code in src/agenticapi/harness/policy/pricing.py
def get(self, model: str) -> ModelPricing | None:
    """Look up ``model``; gives its pricing entry, or ``None`` when unregistered."""
    entry = self._prices.get(model)
    return entry

estimate_cost

estimate_cost(
    *,
    model: str,
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
) -> float

Estimate USD cost for a single LLM call.

Unknown models cost 0.0 (with a warning) so the framework degrades gracefully on a fresh model rather than raising — a production deployment can opt into strict mode by checking get(model) is None first.

Parameters:

Name Type Description Default
model str

Model identifier.

required
input_tokens int

Number of prompt tokens.

required
output_tokens int

Number of completion tokens.

required
cache_read_tokens int

Optional cache-read token count.

0
cache_write_tokens int

Optional cache-write token count.

0

Returns:

Type Description
float

Estimated cost in USD.

Source code in src/agenticapi/harness/policy/pricing.py
def estimate_cost(
    self,
    *,
    model: str,
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
) -> float:
    """Estimate USD cost for a single LLM call.

    Unknown models cost 0.0 (with a warning) so the framework
    degrades gracefully on a fresh model rather than raising — a
    production deployment can opt into strict mode by checking
    ``get(model) is None`` first.

    Cache-read and cache-write tokens are billed at their own rate
    when one is configured. Only a rate of ``None`` falls back to the
    input rate; an explicit ``0.0`` means those tokens are free.

    Args:
        model: Model identifier.
        input_tokens: Number of prompt tokens.
        output_tokens: Number of completion tokens.
        cache_read_tokens: Optional cache-read token count.
        cache_write_tokens: Optional cache-write token count.

    Returns:
        Estimated cost in USD.
    """
    pricing = self._prices.get(model)
    if pricing is None:
        logger.warning("pricing_unknown_model", model=model)
        return 0.0

    cost = (input_tokens * pricing.input_usd_per_1k) / 1000.0
    cost += (output_tokens * pricing.output_usd_per_1k) / 1000.0
    if cache_read_tokens:
        # ``is None`` rather than ``or``: a configured price of 0.0 is
        # a real price and must not fall back to the input rate.
        rate = pricing.cache_read_usd_per_1k
        if rate is None:
            rate = pricing.input_usd_per_1k
        cost += (cache_read_tokens * rate) / 1000.0
    if cache_write_tokens:
        rate = pricing.cache_write_usd_per_1k
        if rate is None:
            rate = pricing.input_usd_per_1k
        cost += (cache_write_tokens * rate) / 1000.0
    return cost

known_models

known_models() -> list[str]

Return a sorted list of model identifiers known to this registry.

Source code in src/agenticapi/harness/policy/pricing.py
def known_models(self) -> list[str]:
    """List every registered model identifier, in sorted order."""
    # Iterating the dict yields its keys, so no explicit ``.keys()`` is needed.
    return sorted(self._prices)

ModelPricing dataclass

Per-1k-token pricing for a single LLM model.

Attributes:

Name Type Description
input_usd_per_1k float

Cost per 1 000 prompt (input) tokens.

output_usd_per_1k float

Cost per 1 000 completion (output) tokens.

cache_read_usd_per_1k float | None

Optional cost per 1 000 cache-read tokens. When None, treated as equal to input_usd_per_1k.

cache_write_usd_per_1k float | None

Optional cost per 1 000 cache-write tokens. When None, treated as equal to input_usd_per_1k.

Source code in src/agenticapi/harness/policy/pricing.py
@dataclass(frozen=True, slots=True)
class ModelPricing:
    """Per-1k-token pricing for a single LLM model.

    Attributes:
        input_usd_per_1k: Cost per 1 000 prompt (input) tokens.
        output_usd_per_1k: Cost per 1 000 completion (output) tokens.
        cache_read_usd_per_1k: Optional cost per 1 000 cache-read
            tokens. When None, treated as equal to input_usd_per_1k.
        cache_write_usd_per_1k: Optional cost per 1 000 cache-write
            tokens. When None, treated as equal to input_usd_per_1k.
    """

    input_usd_per_1k: float
    output_usd_per_1k: float
    cache_read_usd_per_1k: float | None = None
    cache_write_usd_per_1k: float | None = None