X Tutup
Skip to content

fix(core): interrupt subagent tool waits on stop#5850

Open
idiotsj wants to merge 7 commits intoAstrBotDevs:masterfrom
idiotsj:fix/subagent-stop-interrupt
Open

fix(core): interrupt subagent tool waits on stop#5850
idiotsj wants to merge 7 commits intoAstrBotDevs:masterfrom
idiotsj:fix/subagent-stop-interrupt

Conversation

@idiotsj
Copy link
Contributor

@idiotsj idiotsj commented Mar 7, 2026

Fixes #5668

Summary

  • make pending tool execution waits stop-aware so /stop can interrupt subagent handoff/tool waits
  • preserve partial assistant output when interruption happens during tool execution
  • centralize runner stop state on the shared abort_signal and pass it through consistently
  • add regression tests for subagent handoff interruption and regular long-running tool interruption

Root Cause

The existing stop flow only took effect at agent loop checkpoints. When a subagent or regular tool call was awaiting a long-running async operation, the runner could not interrupt that wait until the tool returned, so /stop appeared ineffective.

Testing

  • python -m pytest d:\code\astrbot\tests\test_tool_loop_agent_runner.py -q
  • ruff format .
  • ruff check .

Sourcery 总结

使工具执行和子代理(subagent)接力对停止请求更敏感,并将中断处理集中到一个共享的中止信号上。

Bug 修复:

  • 允许 /stop 请求中断长时间运行的工具执行和子代理接力,而不是等待工具执行完成。
  • 确保在运行在工具执行期间被中止时,已生成的部分助手输出能够被保留并正确完成。

增强项:

  • 将运行器的停止状态管理集中到一个基于 asyncio 的共享中止信号上,并将其在 LLM 和工具调用链路中进行传递。
  • 将中止运行(aborted-run)的最终处理重构为一个专门的辅助方法,以统一中止响应格式和清理逻辑。
  • 为工具执行器的结果引入一个支持中断感知的迭代器包装器,以便安全地取消并关闭正在运行的工具生成器。

测试:

  • 添加回归测试,验证 stop 能中断挂起的子代理接力运行,并将子代理上下文标记为已取消。
  • 添加回归测试,验证 stop 能中断挂起的常规长时间运行工具,并将工具处理器标记为已取消。
Original summary in English

Summary by Sourcery

使工具和子代理的执行对停止请求具有感知能力,以便用户的停止请求可以中断长时间运行的操作,并干净地终止运行。

Bug Fixes:

  • 允许停止请求中断挂起的子代理交接和长时间运行的工具执行,而不是等待它们完成。
  • 确保在运行在工具执行期间被停止时,能够正确保留并最终定稿助理的部分输出。

Enhancements:

  • 将运行器的停止状态集中到一个共享的、基于 asyncio 的中止信号上,并将其传递给 LLM 和工具调用。
  • 为工具执行器的结果引入一个支持中断感知的迭代器包装器,以安全地取消并关闭正在运行的工具生成器。
  • 将中止运行的收尾逻辑重构到一个专用的 helper 中,以统一中止响应的构建和清理逻辑。

Tests:

  • 添加回归测试,以验证停止请求会中断挂起的子代理交接,并将子代理上下文标记为已取消。
  • 添加回归测试,以验证停止请求会中断挂起的常规工具,并将工具处理程序标记为已取消。
Original summary in English

Summary by Sourcery

Make tool and subagent execution stop-aware so user stop requests can interrupt long-running operations and terminate runs cleanly.

Bug Fixes:

  • Allow stop requests to interrupt pending subagent handoffs and long-running tool executions instead of waiting for them to complete.
  • Ensure partial assistant output is preserved and finalized correctly when a run is stopped during tool execution.

Enhancements:

  • Centralize runner stop state on a shared asyncio-based abort signal and pass it through to LLM and tool calls.
  • Introduce an interruption-aware iterator wrapper around tool executor results to safely cancel and close running tool generators.
  • Refactor aborted-run finalization into a dedicated helper to unify aborted response construction and cleanup logic.

Tests:

  • Add regression tests verifying stop interrupts pending subagent handoffs and marks subagent contexts as cancelled.
  • Add regression tests verifying stop interrupts pending regular tools and marks tool handlers as cancelled.
Original summary in English

Sourcery 总结

使工具执行和子代理(subagent)接力对停止请求更敏感,并将中断处理集中到一个共享的中止信号上。

Bug 修复:

  • 允许 /stop 请求中断长时间运行的工具执行和子代理接力,而不是等待工具执行完成。
  • 确保在运行在工具执行期间被中止时,已生成的部分助手输出能够被保留并正确完成。

增强项:

  • 将运行器的停止状态管理集中到一个基于 asyncio 的共享中止信号上,并将其在 LLM 和工具调用链路中进行传递。
  • 将中止运行(aborted-run)的最终处理重构为一个专门的辅助方法,以统一中止响应格式和清理逻辑。
  • 为工具执行器的结果引入一个支持中断感知的迭代器包装器,以便安全地取消并关闭正在运行的工具生成器。

测试:

  • 添加回归测试,验证 stop 能中断挂起的子代理接力运行,并将子代理上下文标记为已取消。
  • 添加回归测试,验证 stop 能中断挂起的常规长时间运行工具,并将工具处理器标记为已取消。
Original summary in English

Summary by Sourcery

使工具和子代理的执行对停止请求具有感知能力,以便用户的停止请求可以中断长时间运行的操作,并干净地终止运行。

Bug Fixes:

  • 允许停止请求中断挂起的子代理交接和长时间运行的工具执行,而不是等待它们完成。
  • 确保在运行在工具执行期间被停止时,能够正确保留并最终定稿助理的部分输出。

Enhancements:

  • 将运行器的停止状态集中到一个共享的、基于 asyncio 的中止信号上,并将其传递给 LLM 和工具调用。
  • 为工具执行器的结果引入一个支持中断感知的迭代器包装器,以安全地取消并关闭正在运行的工具生成器。
  • 将中止运行的收尾逻辑重构到一个专用的 helper 中,以统一中止响应的构建和清理逻辑。

Tests:

  • 添加回归测试,以验证停止请求会中断挂起的子代理交接,并将子代理上下文标记为已取消。
  • 添加回归测试,以验证停止请求会中断挂起的常规工具,并将工具处理程序标记为已取消。
Original summary in English

Summary by Sourcery

Make tool and subagent execution stop-aware so user stop requests can interrupt long-running operations and terminate runs cleanly.

Bug Fixes:

  • Allow stop requests to interrupt pending subagent handoffs and long-running tool executions instead of waiting for them to complete.
  • Ensure partial assistant output is preserved and finalized correctly when a run is stopped during tool execution.

Enhancements:

  • Centralize runner stop state on a shared asyncio-based abort signal and pass it through to LLM and tool calls.
  • Introduce an interruption-aware iterator wrapper around tool executor results to safely cancel and close running tool generators.
  • Refactor aborted-run finalization into a dedicated helper to unify aborted response construction and cleanup logic.

Tests:

  • Add regression tests verifying stop interrupts pending subagent handoffs and marks subagent contexts as cancelled.
  • Add regression tests verifying stop interrupts pending regular tools and marks tool handlers as cancelled.

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Mar 7, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where the agent runner could not be interrupted while a subagent or a long-running tool call was in progress. Previously, the /stop command was only effective at specific agent loop checkpoints, leading to an unresponsive experience during tool execution. The changes introduce a centralized asynchronous event for signaling abortion, allowing for immediate interruption of pending tool operations and subagent handoffs, thereby improving the responsiveness and control of the agent.

Highlights

  • Interruptible Tool Execution: Pending tool executions are now stop-aware, allowing the /stop command to interrupt subagent handoffs and long-running tool waits.
  • Partial Output Preservation: Partial assistant output is preserved when an interruption occurs during tool execution, providing better user experience.
  • Centralized Stop State: The runner's stop state is centralized using a shared abort_signal (an asyncio.Event) and consistently passed through relevant functions.
  • Regression Tests: New regression tests have been added to verify the interruption of subagent handoffs and regular long-running tools.
Changelog
  • astrbot/core/agent/runners/tool_loop_agent_runner.py
    • Imported contextlib.suppress for cleaner error handling.
    • Defined _ToolExecutionInterrupted exception for specific interruption scenarios.
    • Introduced DEFAULT_TOOL_EXECUTOR_POLL_INTERVAL for controlling tool execution polling frequency.
    • Replaced the boolean _stop_requested flag with an asyncio.Event named _abort_signal to manage stop requests.
    • Added _tool_executor_poll_interval to ToolLoopAgentRunner initialization, configurable via kwargs.
    • Passed the _abort_signal to _iter_llm_responses and _resolve_tool_exec to propagate the stop signal.
    • Updated conditional checks from _stop_requested to a new _is_stop_requested() method.
    • Refactored the logic for finalizing an aborted step into a new asynchronous method _finalize_aborted_step.
    • Wrapped the _handle_function_tools iteration in a try...except _ToolExecutionInterrupted block to catch interruptions.
    • Added _is_stop_requested() checks within _handle_function_tools to raise _ToolExecutionInterrupted before processing new tool calls.
    • Modified tool execution loop to use _iter_tool_executor_results for interruptible iteration.
    • Added _ToolExecutionInterrupted to exception handling in _handle_function_tools to re-raise the interruption.
    • Implemented request_stop to set the _abort_signal event.
    • Added _is_stop_requested method to check the state of the _abort_signal.
    • Introduced _cancel_tool_iteration to gracefully cancel pending tool tasks and close executors upon interruption.
    • Created _iter_tool_executor_results as an asynchronous generator to poll for stop requests during tool execution and manage task cancellation.
  • tests/test_tool_loop_agent_runner.py
    • Added imports for asyncio, SimpleNamespace, Agent, HandoffTool, and FunctionToolExecutor for new test cases.
    • Introduced MockToolCallProvider to simulate LLM responses with tool calls and capture the abort_signal.
    • Added MockHandoffProvider extending MockToolCallProvider for specific handoff tool testing.
    • Created BlockingSubagentContext to simulate a subagent that blocks indefinitely until cancelled.
    • Developed BlockingToolState to simulate a long-running tool handler that blocks indefinitely until cancelled.
    • Added test_stop_interrupts_pending_subagent_handoff to verify that subagent handoffs can be interrupted by a stop request.
    • Included test_stop_interrupts_pending_regular_tool to confirm that regular long-running tools can be interrupted by a stop request.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@dosubot dosubot bot added the area:core The bug / feature is about astrbot's core, backend label Mar 7, 2026
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 1 个问题

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="1008" />
<code_context>
+            data=AgentResponseData(chain=MessageChain(type="aborted")),
+        )
+
+    async def _cancel_tool_iteration(
+        self,
+        executor: T.Any,
</code_context>
<issue_to_address>
**issue (complexity):** 请考虑简化新的 tool-executor 中止逻辑,用一个将 `anext(executor)` 与中止信号直接竞争(race)的单一辅助函数,替换当前基于轮询的 `_iter_tool_executor_results`/`_cancel_tool_iteration` 组合。

主要的额外复杂度来自 `_iter_tool_executor_results` + `_cancel_tool_iteration` 以及基于时间的轮询。你可以在保留所有新的中止行为的同时,通过以下方式简化控制流:

1. 移除轮询间隔以及围绕 `asyncio.wait` 的循环。
2. 让单个 `anext(executor)` 任务直接与 `self._abort_signal.wait()` 竞争。
3. 将取消逻辑内联进 `_iter_tool_executor_results`,这样就不再需要 `_cancel_tool_iteration`。

这样既能保留对长时间运行的 `anext(executor)` 的显式取消,又能避免嵌套的轮询循环和 `tool_executor_poll_interval` 这个调节参数。

### 建议的重构

首先,去掉轮询间隔配置以及 `_cancel_tool_iteration````python
# __init__ / __post_init__:
self._abort_signal = asyncio.Event()
# Remove:
# self._tool_executor_poll_interval = ...
```

完全移除 `_cancel_tool_iteration`。

然后重写 `_iter_tool_executor_results`,使用“下一条结果”与“中止信号”之间的单次竞争:

```python
async def _iter_tool_executor_results(
    self,
    executor: T.Any,
) -> T.AsyncGenerator[T.Any, None]:
    while True:
        if self._is_stop_requested():
            # Ensure executor is closed if we haven't started a next_result_task yet
            close_executor = getattr(executor, "aclose", None)
            if close_executor is not None:
                with suppress(asyncio.CancelledError, RuntimeError, StopAsyncIteration):
                    await close_executor()
            raise _ToolExecutionInterrupted(
                "Tool execution interrupted before reading the next tool result."
            )

        next_result_task = asyncio.create_task(anext(executor))
        abort_task = asyncio.create_task(self._abort_signal.wait())
        try:
            done, _ = await asyncio.wait(
                {next_result_task, abort_task},
                return_when=asyncio.FIRST_COMPLETED,
            )

            if abort_task in done:
                # Stop requested: cancel the in-flight anext and close executor
                if not next_result_task.done():
                    next_result_task.cancel()
                    with suppress(asyncio.CancelledError, StopAsyncIteration):
                        await next_result_task

                close_executor = getattr(executor, "aclose", None)
                if close_executor is not None:
                    with suppress(asyncio.CancelledError, RuntimeError, StopAsyncIteration):
                        await close_executor()

                raise _ToolExecutionInterrupted("Tool execution interrupted by a stop request.")

            # Normal path: next_result_task finished first
            try:
                yield next_result_task.result()
            except StopAsyncIteration:
                return
        finally:
            # Best-effort cleanup of the abort task
            if not abort_task.done():
                abort_task.cancel()
                with suppress(asyncio.CancelledError):
                    await abort_task
```

这同时保留了:

- 工具启动前的中断(依然在 `for func_tool_name, ...` 循环中通过 `_ToolExecutionInterrupted` 处理)。
- 工具调用过程中的中断(显式取消 `anext(executor)` 并调用 `aclose()`)。
- 通过 `_finalize_aborted_step``_ToolExecutionInterrupted` 进行的集中式停止处理。

但它:

- 移除了额外的 `tool_executor_poll_interval` 配置及相关逻辑。
- 避免了嵌套的轮询循环以及对 `_cancel_tool_iteration` 的多次调用。
- 将取消逻辑局部化在一个单一、线性的辅助函数中,更易于理解和维护。
</issue_to_address>

Sourcery 对开源项目免费——如果你觉得我们的评审有帮助,请考虑分享给他人 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的代码评审。
Original comment in English

Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="1008" />
<code_context>
+            data=AgentResponseData(chain=MessageChain(type="aborted")),
+        )
+
+    async def _cancel_tool_iteration(
+        self,
+        executor: T.Any,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new tool-executor abort logic by replacing the polling-based `_iter_tool_executor_results`/`_cancel_tool_iteration` pair with a single helper that races `anext(executor)` directly against the abort signal.

The main extra complexity comes from `_iter_tool_executor_results` + `_cancel_tool_iteration` and the time-based polling. You can keep all the new abort behavior while simplifying the control flow by:

1. Removing the polling interval and loop around `asyncio.wait`.
2. Racing a single `anext(executor)` task against `self._abort_signal.wait()` directly.
3. Inlining the cancellation into `_iter_tool_executor_results` so `_cancel_tool_iteration` is no longer needed.

That keeps explicit cancellation of a long-running `anext(executor)` without the nested polling loops or the `tool_executor_poll_interval` knob.

### Suggested refactor

First, drop the poll-interval configuration and `_cancel_tool_iteration`:

```python
# __init__ / __post_init__:
self._abort_signal = asyncio.Event()
# Remove:
# self._tool_executor_poll_interval = ...
```

Remove `_cancel_tool_iteration` entirely.

Then rewrite `_iter_tool_executor_results` to use a single race between the next result and the abort signal:

```python
async def _iter_tool_executor_results(
    self,
    executor: T.Any,
) -> T.AsyncGenerator[T.Any, None]:
    while True:
        if self._is_stop_requested():
            # Ensure executor is closed if we haven't started a next_result_task yet
            close_executor = getattr(executor, "aclose", None)
            if close_executor is not None:
                with suppress(asyncio.CancelledError, RuntimeError, StopAsyncIteration):
                    await close_executor()
            raise _ToolExecutionInterrupted(
                "Tool execution interrupted before reading the next tool result."
            )

        next_result_task = asyncio.create_task(anext(executor))
        abort_task = asyncio.create_task(self._abort_signal.wait())
        try:
            done, _ = await asyncio.wait(
                {next_result_task, abort_task},
                return_when=asyncio.FIRST_COMPLETED,
            )

            if abort_task in done:
                # Stop requested: cancel the in-flight anext and close executor
                if not next_result_task.done():
                    next_result_task.cancel()
                    with suppress(asyncio.CancelledError, StopAsyncIteration):
                        await next_result_task

                close_executor = getattr(executor, "aclose", None)
                if close_executor is not None:
                    with suppress(asyncio.CancelledError, RuntimeError, StopAsyncIteration):
                        await close_executor()

                raise _ToolExecutionInterrupted("Tool execution interrupted by a stop request.")

            # Normal path: next_result_task finished first
            try:
                yield next_result_task.result()
            except StopAsyncIteration:
                return
        finally:
            # Best-effort cleanup of the abort task
            if not abort_task.done():
                abort_task.cancel()
                with suppress(asyncio.CancelledError):
                    await abort_task
```

This preserves:

- Pre-tool-start interruption (still handled in the `for func_tool_name, ...` loop via `_ToolExecutionInterrupted`).
- Mid-tool-call interruption (explicitly cancels `anext(executor)` and calls `aclose()`).
- Centralized stop handling via `_finalize_aborted_step` and `_ToolExecutionInterrupted`.

But it:

- Removes the extra `tool_executor_poll_interval` configuration and associated logic.
- Avoids nested polling loops and multiple calls to `_cancel_tool_iteration`.
- Keeps cancellation logic local to a single, linear helper that’s easier to reason about.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@dosubot
Copy link

dosubot bot commented Mar 7, 2026

Related Documentation

1 document(s) may need updating based on files changed in this PR:

AstrBotTeam's Space

pr4697的改动
View Suggested Changes
@@ -558,12 +558,40 @@
 - 该方法设置 `agent_stop_requested` 标志,返回被请求停止的任务数量
 - 系统消息提示:"[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]"
 - Agent Runner 通过后台监控任务(`_watch_agent_stop_signal`)周期性检查停止信号
-- 检测到停止信号后,调用 `request_stop()` 方法设置内部 `_stop_requested` 标志位
-- Agent 在流式生成或工具调用过程中检查 `_stop_requested` 标志,并在适当时机平滑退出
+- 检测到停止信号后,调用 `request_stop()` 方法设置内部的 `abort_signal`(一个 asyncio.Event 对象)
+- Agent 在流式生成、工具调用过程以及工具执行等待期间检查 `abort_signal` 状态,并在适当时机平滑退出
 - Agent 转换为 DONE 状态,并触发 `on_agent_done` 钩子
 - 对话历史和会话状态得以保留,响应类型标记为 "aborted"
 - `was_aborted()` 方法返回 `true`,表明任务被用户主动中止
 
+**工具执行中断机制(PR #5850)**:
+
+[PR #5850](https://github.com/AstrBotDevs/AstrBot/pull/5850) 增强了停止机制,使其能够在工具执行等待期间中断长时间运行的异步操作(包括子代理 handoff)。修复前,`/stop` 命令仅在 Agent 循环检查点生效,无法中断正在等待工具返回的操作,导致长时间工具调用或子代理任务期间停止功能失效([Issue #5668](https://github.com/AstrBotDevs/AstrBot/issues/5668))。
+
+**核心改进**:
+
+- **工具执行轮询机制**:`_iter_tool_executor_results()` 方法使用可配置的轮询间隔(默认 0.1 秒,通过 `tool_executor_poll_interval` 参数配置)周期性检查工具执行状态和停止信号
+- **中断异常**:检测到停止信号时抛出 `_ToolExecutionInterrupted` 异常,优雅地中断工具执行操作
+- **工具迭代取消**:通过 `aclose()` 方法正确取消工具执行器的异步迭代,确保资源释放
+- **中心化 abort_signal**:停止机制现在使用统一的 `abort_signal`(asyncio.Event)替代之前的 `_stop_requested` 布尔标志,该信号会一致地传递给 LLM 提供商和工具执行器
+- **部分输出保留**:当工具执行期间发生中断时(而非仅在 LLM 响应生成期间),已生成的部分助手输出会被保留
+- **中止终结逻辑**:`_finalize_aborted_step()` 方法集中处理中止场景的终结逻辑,确保正在执行的工具调用被妥善处理
+
+**停止机制多层级保障**:
+
+停止机制现在在以下多个层级生效,确保全面覆盖:
+
+1. **LLM 流式生成期间**:检查 `abort_signal` 并中断流式响应
+2. **Agent 循环迭代之间**:在步进之间检查停止信号
+3. **工具执行等待期间**:通过轮询机制检查停止信号,能够中断长时间工具调用(包括子代理 handoff、Computer Use 工具等)
+
+修复后,`/stop` 命令可在工具执行的任何阶段生效,而不仅限于检查点,显著提升了用户对 Agent 执行的控制能力。
+
+**回归测试**:PR #5850 新增了以下测试场景:
+
+- 子代理 handoff 中断测试
+- 常规长时间工具执行中断测试
+
 ##### 第三方 Agent Runner(如 Dify、Coze、DeerFlow)
 对于第三方 Agent Runner,停止请求会完全中断事件传播:
 
@@ -575,9 +603,12 @@
 ##### 后端实现
 - `request_agent_stop_all()` 方法(`active_event_registry`):用于平滑停止,设置 `agent_stop_requested` 标志但不中断事件传播,返回被请求停止的任务数量
 - `stop_all()` 方法(`active_event_registry`):用于硬停止,完全中断事件传播
-- `request_stop()` 方法(`tool_loop_agent_runner`):设置内部 `_stop_requested` 和 `_aborted` 标志位
+- `request_stop()` 方法(`tool_loop_agent_runner`):设置内部的 `abort_signal`(asyncio.Event)
+- `_is_stop_requested()` 方法(`tool_loop_agent_runner`):检查 `abort_signal` 的状态
 - `was_aborted()` 方法(`tool_loop_agent_runner`):检查任务是否被用户主动中止
 - Agent 执行过程中通过后台监控任务(`_watch_agent_stop_signal`)周期性检查停止信号
+- `_iter_tool_executor_results()` 方法(`tool_loop_agent_runner`):轮询工具执行状态并检查停止信号,支持在工具执行等待期间中断
+- `_finalize_aborted_step()` 方法(`tool_loop_agent_runner`):集中处理中止场景的终结逻辑,保留部分输出并清理未消费的跟进消息
 - `/api/chat/stop` POST 端点(`chat.py`):接收前端停止请求,验证会话所有权后调用 `request_agent_stop_all()`
 
 ##### 历史记录持久化改进

[Accept] [Decline]

Note: You must be authenticated to accept/decline updates.

How did I do? Any feedback?  Join Discord

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to interrupt long-running tool executions, including subagent handoffs, when a stop is requested. The implementation centralizes the stop state using an asyncio.Event and introduces a polling wrapper, _iter_tool_executor_results, around tool executors to make them stop-aware. The logic for handling interruptions is encapsulated in a new _finalize_aborted_step method. The changes also include passing the abort signal to LLM provider calls to allow for interruption during streaming. New regression tests have been added to verify that both subagent handoffs and regular tool executions can be successfully interrupted.

@idiotsj
Copy link
Contributor Author

idiotsj commented Mar 7, 2026

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 1 个问题,并留下了一些整体性的反馈:

  • _handle_function_tools 中,except _ToolExecutionInterrupted: raise 这段代码是多余的,因为通用代码路径本身就会让这个异常向外传播;你可以通过移除这个显式的捕获并重新抛出逻辑来简化实现。
  • _iter_tool_executor_results 中的执行器关闭逻辑在循环开始处的提前停止分支和 abort_task 完成的分支中被重复实现;可以考虑提取一个小的辅助函数(例如 _close_executor(executor)),以便让这部分行为保持一致并更易维护。
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `_handle_function_tools`, the `except _ToolExecutionInterrupted: raise` block is redundant since the generic code path already lets this exception propagate; you can simplify by removing that explicit catch-and-rethrow.
- The executor shutdown logic in `_iter_tool_executor_results` is duplicated in both the early-stop branch at the top of the loop and the `abort_task`-completed branch; consider extracting a small helper (e.g. `_close_executor(executor)`) to keep this behavior consistent and easier to maintain.

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="996" />
<code_context>
+            data=AgentResponseData(chain=MessageChain(type="aborted")),
+        )
+
+    async def _iter_tool_executor_results(
+        self,
+        executor: T.Any,
</code_context>
<issue_to_address>
**issue (complexity):** 考虑重构新的停止处理逻辑,将执行器清理集中到一起,并只依赖单一的中断机制,从而让控制流更易于理解和维护。

你可以在不改变现有行为的前提下简化新的停止处理逻辑:

1. **用一个小的辅助函数集中执行器清理逻辑**

当前 `_iter_tool_executor_results` 在两个地方重复了 `aclose` + `suppress(...)` 的逻辑。通过一个小的辅助函数可以让控制流更加清晰,也更易于维护:

```python
async def _close_executor(self, executor: T.Any) -> None:
    close_executor = getattr(executor, "aclose", None)
    if close_executor is None:
        return
    with suppress(asyncio.CancelledError, RuntimeError, StopAsyncIteration):
        await close_executor()
```

然后 `_iter_tool_executor_results` 可以复用它:

```python
async def _iter_tool_executor_results(
    self,
    executor: T.Any,
) -> T.AsyncGenerator[T.Any, None]:
    while True:
        if self._is_stop_requested():
            await self._close_executor(executor)
            raise _ToolExecutionInterrupted(
                "Tool execution interrupted before reading the next tool result."
            )

        next_result_task = asyncio.create_task(anext(executor))
        abort_task = asyncio.create_task(self._abort_signal.wait())
        try:
            done, _ = await asyncio.wait(
                {next_result_task, abort_task},
                return_when=asyncio.FIRST_COMPLETED,
            )

            if abort_task in done:
                if not next_result_task.done():
                    next_result_task.cancel()
                    with suppress(asyncio.CancelledError, StopAsyncIteration):
                        await next_result_task

                await self._close_executor(executor)
                raise _ToolExecutionInterrupted(
                    "Tool execution interrupted by a stop request."
                )

            try:
                yield next_result_task.result()
            except StopAsyncIteration:
                return
        finally:
            if not abort_task.done():
                abort_task.cancel()
                with suppress(asyncio.CancelledError):
                    await abort_task
```

这可以保持相同的语义,但移除了重复的清理代码,并将异常抑制集中到一个位置。

2. **使用单一的控制流路径来处理工具中断**

你目前混合使用了:

- 工具循环前显式调用 `_is_stop_requested()` 的检查;
-`_iter_tool_executor_results` 内部抛出的 `_ToolExecutionInterrupted`- 以及在 `step``async for` 之后的另一个 `_is_stop_requested()` 检查。

如果你希望将 `_ToolExecutionInterrupted` 作为工具中断的基础原语,那么可以移除前后置的检查,只依赖异常本身。例如:

```python
async def _handle_function_tools(...):
    for func_tool_name, func_tool_args, func_tool_id in zip(...):
        # Remove this outer check:
        # if self._is_stop_requested():
        #     raise _ToolExecutionInterrupted(...)

        ...
        try:
            async for resp in self._iter_tool_executor_results(executor):
                ...
        except _ToolExecutionInterrupted:
            raise
        except Exception as e:
            ...
````step` 中,你只需要处理这个异常,而不需要额外的 `_is_stop_requested()` 保护:

```python
if llm_resp.tools_call_name:
    if self.tool_schema_mode == "skills_like":
        llm_resp, _ = await self._resolve_tool_exec(llm_resp)

    tool_call_result_blocks = []
    cached_images = []
    try:
        async for result in self._handle_function_tools(self.req, llm_resp):
            ...
    except _ToolExecutionInterrupted:
        yield await self._finalize_aborted_step(llm_resp)
        return

    # Remove this:
    # if self._is_stop_requested():
    #     yield await self._finalize_aborted_step(llm_resp)
    #     return
```

这样可以保持停止功能不变,同时减少中断影响控制流的方式数量,让你更容易推理工具执行在何时/何处退出。
</issue_to_address>

Sourcery 对开源项目免费——如果你喜欢我们的评审,请考虑分享一下 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据你的反馈改进以后的评审。
Original comment in English

Hey - I've found 1 issue, and left some high level feedback:

  • In _handle_function_tools, the except _ToolExecutionInterrupted: raise block is redundant since the generic code path already lets this exception propagate; you can simplify by removing that explicit catch-and-rethrow.
  • The executor shutdown logic in _iter_tool_executor_results is duplicated in both the early-stop branch at the top of the loop and the abort_task-completed branch; consider extracting a small helper (e.g. _close_executor(executor)) to keep this behavior consistent and easier to maintain.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `_handle_function_tools`, the `except _ToolExecutionInterrupted: raise` block is redundant since the generic code path already lets this exception propagate; you can simplify by removing that explicit catch-and-rethrow.
- The executor shutdown logic in `_iter_tool_executor_results` is duplicated in both the early-stop branch at the top of the loop and the `abort_task`-completed branch; consider extracting a small helper (e.g. `_close_executor(executor)`) to keep this behavior consistent and easier to maintain.

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="996" />
<code_context>
+            data=AgentResponseData(chain=MessageChain(type="aborted")),
+        )
+
+    async def _iter_tool_executor_results(
+        self,
+        executor: T.Any,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the new stop-handling logic to centralize executor cleanup and rely on a single interruption mechanism to make the control flow easier to follow and maintain.

You can simplify the new stop-handling without changing behavior by:

1. **Centralizing executor cleanup in a small helper**

Right now `_iter_tool_executor_results` duplicates the `aclose` + `suppress(...)` logic in two places. A small helper makes the control flow clearer and easier to maintain:

```python
async def _close_executor(self, executor: T.Any) -> None:
    close_executor = getattr(executor, "aclose", None)
    if close_executor is None:
        return
    with suppress(asyncio.CancelledError, RuntimeError, StopAsyncIteration):
        await close_executor()
```

Then `_iter_tool_executor_results` can reuse this:

```python
async def _iter_tool_executor_results(
    self,
    executor: T.Any,
) -> T.AsyncGenerator[T.Any, None]:
    while True:
        if self._is_stop_requested():
            await self._close_executor(executor)
            raise _ToolExecutionInterrupted(
                "Tool execution interrupted before reading the next tool result."
            )

        next_result_task = asyncio.create_task(anext(executor))
        abort_task = asyncio.create_task(self._abort_signal.wait())
        try:
            done, _ = await asyncio.wait(
                {next_result_task, abort_task},
                return_when=asyncio.FIRST_COMPLETED,
            )

            if abort_task in done:
                if not next_result_task.done():
                    next_result_task.cancel()
                    with suppress(asyncio.CancelledError, StopAsyncIteration):
                        await next_result_task

                await self._close_executor(executor)
                raise _ToolExecutionInterrupted(
                    "Tool execution interrupted by a stop request."
                )

            try:
                yield next_result_task.result()
            except StopAsyncIteration:
                return
        finally:
            if not abort_task.done():
                abort_task.cancel()
                with suppress(asyncio.CancelledError):
                    await abort_task
```

This keeps the same semantics but removes duplicated cleanup code and narrows the suppression to one place.

2. **Use a single control-flow path for tool interruption**

You currently mix:

- explicit `_is_stop_requested()` check before the tool loop, and
- `_ToolExecutionInterrupted` raised inside `_iter_tool_executor_results`, and
- a separate `_is_stop_requested()` check *after* the `async for` in `step`.

If you want `_ToolExecutionInterrupted` to be the primitive for tool interruption, you can remove the pre/post checks and rely on the exception only. For example:

```python
async def _handle_function_tools(...):
    for func_tool_name, func_tool_args, func_tool_id in zip(...):
        # Remove this outer check:
        # if self._is_stop_requested():
        #     raise _ToolExecutionInterrupted(...)

        ...
        try:
            async for resp in self._iter_tool_executor_results(executor):
                ...
        except _ToolExecutionInterrupted:
            raise
        except Exception as e:
            ...
```

And in `step` you only handle the exception, without an extra `_is_stop_requested()` guard:

```python
if llm_resp.tools_call_name:
    if self.tool_schema_mode == "skills_like":
        llm_resp, _ = await self._resolve_tool_exec(llm_resp)

    tool_call_result_blocks = []
    cached_images = []
    try:
        async for result in self._handle_function_tools(self.req, llm_resp):
            ...
    except _ToolExecutionInterrupted:
        yield await self._finalize_aborted_step(llm_resp)
        return

    # Remove this:
    # if self._is_stop_requested():
    #     yield await self._finalize_aborted_step(llm_resp)
    #     return
```

This keeps the stop feature intact but reduces the number of ways interruption can affect control flow, making it easier to reason about when/where tool execution exits.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@idiotsj
Copy link
Contributor Author

idiotsj commented Mar 7, 2026

@sourcery-ai review

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我在这里给出了一些整体性的反馈:

  • 可以考虑将重复使用的中断消息字符串("[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]")提取为模块级常量,这样如果以后需要更新,行为和措辞都能保持一致。
  • _iter_tool_executor_results 辅助函数目前接受 executor: Any;你可以将其收紧为 AsyncIterator / AsyncGenerator 类型(并且可以考虑让该辅助函数在返回值类型上做成泛型),以提高类型安全性,并让预期的 executor 接口更加清晰。
面向 AI Agent 的提示
Please address the comments from this code review:

## Overall Comments
- Consider extracting the repeated interruption message string ("[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]") into a module-level constant so the behavior and wording remain consistent if it needs to be updated later.
- The `_iter_tool_executor_results` helper currently takes `executor: Any`; you could tighten this to an `AsyncIterator`/`AsyncGenerator` type (and possibly make the helper generic over the yielded type) to improve type safety and clarity around the expected executor interface.

Sourcery 对开源项目免费 —— 如果你喜欢我们的评审,欢迎分享 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English

Hey - I've left some high level feedback:

  • Consider extracting the repeated interruption message string ("[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]") into a module-level constant so the behavior and wording remain consistent if it needs to be updated later.
  • The _iter_tool_executor_results helper currently takes executor: Any; you could tighten this to an AsyncIterator/AsyncGenerator type (and possibly make the helper generic over the yielded type) to improve type safety and clarity around the expected executor interface.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider extracting the repeated interruption message string ("[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]") into a module-level constant so the behavior and wording remain consistent if it needs to be updated later.
- The `_iter_tool_executor_results` helper currently takes `executor: Any`; you could tighten this to an `AsyncIterator`/`AsyncGenerator` type (and possibly make the helper generic over the yielded type) to improve type safety and clarity around the expected executor interface.

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@idiotsj
Copy link
Contributor Author

idiotsj commented Mar 7, 2026

I checked this warning and it appears to be a base-branch CodeQL configuration mismatch rather than a change introduced by this PR.

What I found:

  • The warning says the missing configuration is Actions workflow (codeql.yml) -> /language:javascript-typescript.
  • The current .github/workflows/codeql.yml on master only defines Python analysis.
  • The latest master check runs also show only Analyze (python) and no active JavaScript/TypeScript CodeQL run.

So the warning seems to come from stale code-scanning configuration metadata still associated with refs/heads/master, not from this branch's code changes.

I did not add an unrelated CodeQL workflow change to this PR. The clean fix should be to refresh CodeQL on the base branch (for example by rerunning CodeQL on master), so GitHub recomputes the branch configurations.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core The bug / feature is about astrbot's core, backend size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] stop指令对subagent任务不生效

1 participant

X Tutup