# claw-code/src/query_engine.py

from __future__ import annotations
import json
# Blame annotation (commit dated 2026-04-22 18:14:14 +09:00):
#
# fix: #164 Stage A: cooperative cancellation via cancel_event in submit_message
#
# Closes the #161 follow-up gap identified in review: the wall-clock timeout
# bounded the caller-facing wait but did not cancel the underlying provider
# thread, which could silently mutate mutable_messages / transcript_store /
# permission_denials / total_usage after the caller had already observed
# stop_reason='timeout'. A ghost turn committed post-deadline would poison any
# session that got persisted afterwards.
#
# Stage A scope (this commit): runtime + engine layer cooperative cancel.
#
# Engine layer (src/query_engine.py):
# - submit_message now accepts cancel_event: threading.Event | None = None
# - Two safe checkpoints:
#   1. Entry (before max_turns / budget projection): earliest possible return
#   2. Post-budget (after output synthesis, before mutation): catches a cancel
#      that arrives while output was being computed
# - Both checkpoints return stop_reason='cancelled' with state UNCHANGED
#   (mutable_messages, transcript_store, permission_denials, total_usage all
#   preserved exactly as on entry)
# - cancel_event=None preserves legacy behaviour with zero overhead (no
#   checkpoint checks at all)
#
# Runtime layer (src/runtime.py):
# - run_turn_loop creates one cancel_event per invocation when a deadline is
#   in play (and None otherwise, preserving the legacy fast path)
# - Passes the same event to every submit_message call across turns, so a
#   late cancel on turn N-1 affects turn N
# - On timeout (either pre-call or mid-call), the runtime explicitly calls
#   cancel_event.set() before future.cancel() + synthesizing the timeout
#   TurnResult. This upgrades #161's best-effort future.cancel() (which only
#   cancels not-yet-started futures) to cooperative mid-flight cancel.
#
# Stop reason taxonomy after Stage A:
#   'completed'           turn committed, state mutated exactly once
#   'max_budget_reached'  overflow, state unchanged (#162)
#   'max_turns_reached'   capacity exceeded, state unchanged
#   'cancelled'           cancel_event observed, state unchanged (#164 Stage A)
#   'timeout'             synthesised by runtime, not engine (#161)
#
# The 'cancelled' vs 'timeout' split matters:
# - 'timeout' is the runtime's best-effort signal to the caller: deadline hit
# - 'cancelled' is the engine's confirmation: cancel was observed + honoured
#
# If the provider call wedges entirely (never reaches a checkpoint), the
# caller still sees 'timeout' and the thread is leaked, but any NEXT
# submit_message call on the same engine observes the event at entry and
# returns 'cancelled' immediately, preventing ghost-turn accumulation. This is
# the honest cooperative limit in Python threading land; true preemption
# requires async-native provider IO (future work, not Stage A).
#
# Tests (29 new tests, tests/test_submit_message_cancellation.py +
# tests/test_run_turn_loop_cancellation.py):
#
# Engine-layer (12 tests):
# - TestCancellationBeforeCall (5): pre-set event returns 'cancelled'
#   immediately; mutable_messages, transcript_store, usage,
#   permission_denials all preserved
# - TestCancellationAfterBudgetCheck (1): cancel set mid-call (after
#   projection, before commit) still honoured; output synthesised but state
#   untouched
# - TestCancellationAfterCommit (2): post-commit cancel not observable (honest
#   limit) BUT next call on same engine observes it + returns 'cancelled'
# - TestLegacyCallersUnchanged (3): cancel_event=None preserves #162 atomicity
#   + max_turns contract with zero behaviour change
# - TestCancellationVsOtherStopReasons (2): cancel precedes max_turns check;
#   cancel does not retroactively override a completed turn
#
# Runtime-layer (5 tests):
# - TestTimeoutPropagatesCancelEvent (3): submit_message receives a real Event
#   object when deadline is set; None in legacy mode; timeout actually calls
#   event.set() so in-flight threads observe at their next checkpoint
# - TestCancelEventSharedAcrossTurns (1): same event object passed to every
#   turn (object identity check); late cancel on turn N-1 must affect turn N
#
# Regression: 3 existing timeout test mocks updated to accept the cancel_event
# kwarg (mocks that previously had signature (prompt, commands, tools,
# denials) now have (prompt, commands, tools, denials, cancel_event=None)
# since runtime passes cancel_event positionally on the timeout path).
#
# Full suite: 97 → 114 passing, zero regression.
#
# Closes ROADMAP #164 Stage A.
#
# What's explicitly NOT in Stage A:
# - Preemptive cancellation of wedged provider IO (requires asyncio-native
#   provider path; larger refactor)
# - Timeout on the legacy unbounded run_turn_loop path (by design: legacy
#   callers opt out of cancellation entirely)
# - CLI exposure of 'cancelled' as a distinct exit code (currently 'cancelled'
#   maps to the same stop_reason != 'completed' break condition as others;
#   CLI surface for cancel is a separate pinpoint if warranted)
import threading
from dataclasses import dataclass, field
from uuid import uuid4
from .commands import build_command_backlog
from .models import PermissionDenial, UsageSummary
from .port_manifest import PortManifest, build_port_manifest
from .session_store import StoredSession, load_session, save_session
from .tools import build_tool_backlog
from .transcript import TranscriptStore


@dataclass(frozen=True)
class QueryEngineConfig:
    max_turns: int = 8
    max_budget_tokens: int = 2000
    compact_after_turns: int = 12
    structured_output: bool = False
    structured_retry_limit: int = 2


@dataclass(frozen=True)
class TurnResult:
    prompt: str
    output: str
    matched_commands: tuple[str, ...]
    matched_tools: tuple[str, ...]
    permission_denials: tuple[PermissionDenial, ...]
    usage: UsageSummary
    stop_reason: str
    cancel_observed: bool = False


@dataclass
class QueryEnginePort:
    manifest: PortManifest
    config: QueryEngineConfig = field(default_factory=QueryEngineConfig)
    session_id: str = field(default_factory=lambda: uuid4().hex)
    mutable_messages: list[str] = field(default_factory=list)
    permission_denials: list[PermissionDenial] = field(default_factory=list)
    total_usage: UsageSummary = field(default_factory=UsageSummary)
    transcript_store: TranscriptStore = field(default_factory=TranscriptStore)

    @classmethod
    def from_workspace(cls) -> 'QueryEnginePort':
        return cls(manifest=build_port_manifest())

    @classmethod
    def from_saved_session(cls, session_id: str) -> 'QueryEnginePort':
        stored = load_session(session_id)
        transcript = TranscriptStore(entries=list(stored.messages), flushed=True)
        return cls(
            manifest=build_port_manifest(),
            session_id=stored.session_id,
            mutable_messages=list(stored.messages),
            total_usage=UsageSummary(stored.input_tokens, stored.output_tokens),
            transcript_store=transcript,
        )

    def submit_message(
        self,
        prompt: str,
        matched_commands: tuple[str, ...] = (),
        matched_tools: tuple[str, ...] = (),
        denied_tools: tuple[PermissionDenial, ...] = (),
        cancel_event: threading.Event | None = None,
    ) -> TurnResult:
        """Submit a prompt and return a TurnResult.

        #164 Stage A: cooperative cancellation via cancel_event.

        The cancel_event argument (added for #164) lets a caller request early
        termination at a safe point. When set before the pre-mutation commit
        stage, submit_message returns early with ``stop_reason='cancelled'``
        and the engine's state (mutable_messages, transcript_store,
        permission_denials, total_usage) is left **exactly as it was on
        entry**. This closes the #161 follow-up gap: before this change, a
        wedged provider thread could finish executing and silently mutate
        state after the caller had already observed ``stop_reason='timeout'``,
        giving the session a ghost turn the caller never acknowledged.

        Contract:
          - cancel_event is None (default): legacy behaviour, no checks.
          - cancel_event set **before** budget check: returns 'cancelled'
            immediately; no output synthesis, no projection, no mutation.
          - cancel_event set **between** budget check and commit: returns
            'cancelled' with state intact.
          - cancel_event set **after** commit: not observable; the turn is
            already committed and the caller sees 'completed'. Cancellation
            is a *safe point* mechanism, not preemption. This is the honest
            limit of cooperative cancellation in Python threading land.

        Stop reason taxonomy after #164 Stage A:
          - 'completed': turn committed, state mutated exactly once
          - 'max_budget_reached': overflow, state unchanged (#162)
          - 'max_turns_reached': capacity exceeded, state unchanged
          - 'cancelled': cancel_event observed, state unchanged
          - 'timeout': synthesised by runtime, not engine (#161)

        Callers that care about deadline-driven cancellation (run_turn_loop)
        can now request cleanup by setting the event on timeout; the next
        submit_message on the same engine will observe it at the start and
        return 'cancelled' without touching state, even if the previous call
        is still wedged in provider IO.
        """
        # #164 Stage A: earliest safe cancellation point. No output synthesis,
        # no budget projection, no mutation; just an immediate clean return.
        if cancel_event is not None and cancel_event.is_set():
            return TurnResult(
                prompt=prompt,
                output='',
                matched_commands=matched_commands,
                matched_tools=matched_tools,
                permission_denials=denied_tools,
                usage=self.total_usage,  # unchanged
                stop_reason='cancelled',
            )

        if len(self.mutable_messages) >= self.config.max_turns:
            output = f'Max turns reached before processing prompt: {prompt}'
            return TurnResult(
                prompt=prompt,
                output=output,
                matched_commands=matched_commands,
                matched_tools=matched_tools,
                permission_denials=denied_tools,
                usage=self.total_usage,
                stop_reason='max_turns_reached',
            )

        summary_lines = [
            f'Prompt: {prompt}',
            f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
            f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
            f'Permission denials: {len(denied_tools)}',
        ]
        output = self._format_output(summary_lines)
        projected_usage = self.total_usage.add_turn(prompt, output)

        # Blame annotation (commit dated 2026-04-22 17:29:55 +09:00):
        #
        # fix: #162: budget-overflow no longer corrupts session state in
        # submit_message
        #
        # Previously, QueryEnginePort.submit_message() checked the token budget
        # AFTER appending the prompt to mutable_messages, transcript_store, and
        # permission_denials, and AFTER calling compact_messages_if_needed().
        # On overflow it set stop_reason='max_budget_reached' but the overflow
        # turn was already committed. Any caller that persisted the session
        # afterwards wrote the rejected prompt to disk: the session was
        # silently poisoned even though the TurnResult said the turn never
        # completed.
        #
        # Fix:
        # - Restructure submit_message so the budget check early-returns BEFORE
        #   any mutation of mutable_messages, transcript_store,
        #   permission_denials, or total_usage.
        # - The returned TurnResult.usage reflects pre-call state (overflow
        #   never advanced the usage counter).
        # - Normal (in-budget) path unchanged: mutation happens exactly once,
        #   at the end, only on 'completed' results.
        #
        # This closes the atomicity gap: submit_message is now either 'turn
        # committed' (stop_reason='completed') or 'turn rejected, state
        # untouched' (stop_reason in {'max_budget_reached',
        # 'max_turns_reached'}). Callers can safely retry with a fresh budget
        # or a smaller prompt without worrying about phantom committed turns
        # from prior rejections.
        #
        # Tests (tests/test_submit_message_budget.py, 10 tests):
        # - TestBudgetOverflowDoesNotMutate (5): mutable_messages / transcript /
        #   permission_denials / total_usage / TurnResult.usage all
        #   pre-mutation after overflow
        # - TestOverflowPersistence (2): first-turn overflow persists empty
        #   session; successful-turn-then-overflow persists only the
        #   successful turn
        # - TestEngineUsableAfterOverflow (2): subsequent in-budget call still
        #   works with no residue; repeated overflows don't accumulate hidden
        #   state
        # - TestNormalPathStillCommits (1): regression guard; non-overflow path
        #   still commits mutable_messages/transcript/usage as expected
        #
        # Full suite: 59/59 passing, zero regression.
        # Blocker: none.
        # Closes ROADMAP #162.
        #
        # #162: budget check must precede mutation. Previously this block set
        # stop_reason='max_budget_reached' but still appended the overflow turn
        # to mutable_messages / transcript_store / permission_denials, corrupting
        # the session for any caller that persisted it afterwards. The overflow
        # prompt was effectively committed even though the TurnResult signalled
        # rejection. Now we early-return with pre-mutation state intact so
        # callers can safely retry with a smaller prompt or a fresh budget.
        if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
            return TurnResult(
                prompt=prompt,
                output=output,
                matched_commands=matched_commands,
                matched_tools=matched_tools,
                permission_denials=denied_tools,
                usage=self.total_usage,  # unchanged: overflow turn was rejected
                stop_reason='max_budget_reached',
            )

        # #164 Stage A: second safe cancellation point. Projection is done
        # but nothing has been committed yet. If the caller cancelled while
        # we were building output / computing budget, honour it here; still
        # no mutation.
        if cancel_event is not None and cancel_event.is_set():
            return TurnResult(
                prompt=prompt,
                output=output,
                matched_commands=matched_commands,
                matched_tools=matched_tools,
                permission_denials=denied_tools,
                usage=self.total_usage,  # unchanged
                stop_reason='cancelled',
            )

        self.mutable_messages.append(prompt)
        self.transcript_store.append(prompt)
        self.permission_denials.extend(denied_tools)
        self.total_usage = projected_usage
        self.compact_messages_if_needed()
        return TurnResult(
            prompt=prompt,
            output=output,
            matched_commands=matched_commands,
            matched_tools=matched_tools,
            permission_denials=denied_tools,
            usage=self.total_usage,
            stop_reason='completed',
        )

    def stream_submit_message(
        self,
        prompt: str,
        matched_commands: tuple[str, ...] = (),
        matched_tools: tuple[str, ...] = (),
        denied_tools: tuple[PermissionDenial, ...] = (),
    ):
        yield {'type': 'message_start', 'session_id': self.session_id, 'prompt': prompt}
        if matched_commands:
            yield {'type': 'command_match', 'commands': matched_commands}
        if matched_tools:
            yield {'type': 'tool_match', 'tools': matched_tools}
        if denied_tools:
            yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
        result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
        yield {'type': 'message_delta', 'text': result.output}
        yield {
            'type': 'message_stop',
            'usage': {'input_tokens': result.usage.input_tokens, 'output_tokens': result.usage.output_tokens},
            'stop_reason': result.stop_reason,
            'transcript_size': len(self.transcript_store.entries),
        }

    def compact_messages_if_needed(self) -> None:
        if len(self.mutable_messages) > self.config.compact_after_turns:
            self.mutable_messages[:] = self.mutable_messages[-self.config.compact_after_turns :]
        self.transcript_store.compact(self.config.compact_after_turns)

    def replay_user_messages(self) -> tuple[str, ...]:
        return self.transcript_store.replay()

    def flush_transcript(self) -> None:
        self.transcript_store.flush()

    def persist_session(self, directory: 'Path | None' = None) -> str:
        """Flush the transcript and save the session to disk.

        Args:
            directory: Optional override for the storage directory. When None
                (default, for backward compat), uses the default location
                (``.port_sessions`` in CWD). When set, passes through to
                ``save_session`` which already supports directory overrides.

        #166: added directory parameter to match the session-lifecycle CLI
        surface established by #160/#165. Claws running out-of-tree can now
        redirect session creation to a workspace-specific dir without chdir.
        """
        self.flush_transcript()
        path = save_session(
            StoredSession(
                session_id=self.session_id,
                messages=tuple(self.mutable_messages),
                input_tokens=self.total_usage.input_tokens,
                output_tokens=self.total_usage.output_tokens,
            ),
            directory,
        )
        return str(path)

    def _format_output(self, summary_lines: list[str]) -> str:
        if self.config.structured_output:
            payload = {
                'summary': summary_lines,
                'session_id': self.session_id,
            }
            return self._render_structured_output(payload)
        return '\n'.join(summary_lines)

    def _render_structured_output(self, payload: dict[str, object]) -> str:
        last_error: Exception | None = None
        for _ in range(self.config.structured_retry_limit):
            try:
                return json.dumps(payload, indent=2)
            except (TypeError, ValueError) as exc:  # pragma: no cover - defensive branch
                last_error = exc
                payload = {'summary': ['structured output retry'], 'session_id': self.session_id}
        raise RuntimeError('structured output rendering failed') from last_error

    def render_summary(self) -> str:
        command_backlog = build_command_backlog()
        tool_backlog = build_tool_backlog()
        sections = [
            '# Python Porting Workspace Summary',
            '',
            self.manifest.to_markdown(),
            '',
            f'Command surface: {len(command_backlog.modules)} mirrored entries',
            *command_backlog.summary_lines()[:10],
            '',
            f'Tool surface: {len(tool_backlog.modules)} mirrored entries',
            *tool_backlog.summary_lines()[:10],
            '',
            f'Session id: {self.session_id}',
            f'Conversation turns stored: {len(self.mutable_messages)}',
            f'Permission denials tracked: {len(self.permission_denials)}',
            f'Usage totals: in={self.total_usage.input_tokens} out={self.total_usage.output_tokens}',
            f'Max turns: {self.config.max_turns}',
            f'Max budget tokens: {self.config.max_budget_tokens}',
            f'Transcript flushed: {self.transcript_store.flushed}',
        ]
        return '\n'.join(sections)
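
# Illustrative sketch (not part of this module): the two-checkpoint contract of
# submit_message, paired with a runtime that sets the event on timeout, can be
# reproduced in isolation roughly as below. MiniEngine, run_with_deadline, and
# work_seconds are hypothetical names invented for this example; src/runtime.py
# is not shown here, so its real run_turn_loop may be structured differently.
# The sleep stands in for output synthesis or provider IO.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from dataclasses import dataclass, field


@dataclass
class MiniEngine:
    """Hypothetical stand-in for QueryEnginePort: it only tracks committed prompts."""

    messages: list = field(default_factory=list)

    def submit(self, prompt, cancel_event=None, work_seconds=0.0):
        # Checkpoint 1 (entry): return before doing any work.
        if cancel_event is not None and cancel_event.is_set():
            return 'cancelled'
        time.sleep(work_seconds)  # stands in for output synthesis / provider IO
        # Checkpoint 2 (pre-commit): work is done, nothing has been mutated yet.
        if cancel_event is not None and cancel_event.is_set():
            return 'cancelled'
        self.messages.append(prompt)  # the only mutation, reached once per turn
        return 'completed'


def run_with_deadline(engine, prompt, timeout_seconds, work_seconds=0.0):
    """Hypothetical single-turn runtime: set the event, then report 'timeout'."""
    cancel_event = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(engine.submit, prompt, cancel_event, work_seconds)
    try:
        reason = future.result(timeout=timeout_seconds)
    except FutureTimeout:
        cancel_event.set()  # cooperative signal for the in-flight worker
        future.cancel()     # only affects futures that have not started yet
        reason = 'timeout'
    # Waiting here keeps the demo deterministic; a real runtime would not
    # block on a wedged worker.
    executor.shutdown(wait=True)
    return reason, cancel_event


if __name__ == '__main__':
    engine = MiniEngine()
    reason, event = run_with_deadline(engine, 'slow prompt', 0.05, work_seconds=0.3)
    print(reason, engine.messages)       # timeout []
    print(engine.submit('next', event))  # cancelled
```

# In this sketch the caller sees 'timeout' while the worker, on waking, hits the
# pre-commit checkpoint and leaves messages untouched; a later submit that
# reuses the same event returns 'cancelled' at entry.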