
fix: resolve qq_official duplicate message consumption and false interception #5899

Open
001VIsir wants to merge 10 commits into AstrBotDevs:master from 001VIsir:fix/5848

Conversation


@001VIsir 001VIsir commented Mar 8, 2026

Fix #5848

Background

In the qq_official scenario there are two classes of problems:

  1. The same message can be consumed more than once, causing duplicate processing and duplicate replies;

  2. Long-lived content-based deduplication can falsely intercept legitimate repeated input from users.

Main changes

1) Event bus: short-window dedup as a safety net

  • File: astrbot/core/event_bus.py
  • Details:
    • Add event-fingerprint deduplication over a short time window;
    • Intercept duplicates before a message enters dispatch, reducing the number of duplicate events that reach the downstream processing chain.
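A minimal sketch of this short-window fingerprint dedup (the class and method names, and the 0.5 s TTL, are illustrative assumptions rather than the actual event_bus.py code):

```python
import time
from collections import deque


class ShortWindowDedup:
    """Drop events whose fingerprint was already seen within `ttl` seconds."""

    def __init__(self, ttl: float = 0.5) -> None:
        self.ttl = ttl
        self._seen: set[tuple] = set()
        self._queue: deque[tuple[float, tuple]] = deque()  # (timestamp, fingerprint)

    def is_duplicate(self, fingerprint: tuple) -> bool:
        now = time.time()
        # Evict fingerprints that have fallen out of the window.
        while self._queue and self._queue[0][0] < now - self.ttl:
            _, old = self._queue.popleft()
            self._seen.discard(old)
        if fingerprint in self._seen:
            return True
        self._seen.add(fingerprint)
        self._queue.append((now, fingerprint))
        return False


dedup = ShortWindowDedup(ttl=0.5)
fp = ("qq_official", "sender123", "hello")
print(dedup.is_duplicate(fp))  # False: first occurrence is let through
print(dedup.is_duplicate(fp))  # True: repeat inside the 0.5 s window is dropped
```

Because the window is short, two genuinely distinct messages only collide if the same sender sends identical content within half a second, which is exactly the duplicate-enqueue case this guard targets.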

2) Platform loading: guard against duplicate instances with the same ID

  • File: astrbot/core/platform/manager.py
  • Details:
    • When loading a platform, if an instance with the same platform ID already exists, terminate the old instance before loading the new one;
    • This avoids duplicate consumption caused by multiple concurrent instances of the same platform.
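The guard above can be sketched as follows; `PlatformManagerSketch` is a stripped-down stand-in, and only the method names mentioned in this PR (`load_platform`, `terminate_platform`, `_inst_map`) are mirrored:

```python
import asyncio


class PlatformManagerSketch:
    """Illustrative stand-in for manager.py; only the same-ID guard is shown."""

    def __init__(self) -> None:
        self._inst_map: dict[str, object] = {}  # platform_id -> live instance

    async def terminate_platform(self, platform_id: str) -> None:
        # In the real manager this would also stop the adapter's running tasks.
        self._inst_map.pop(platform_id, None)

    async def load_platform(self, platform_id: str, instance: object) -> None:
        # Guard: never let two instances with the same ID run concurrently,
        # otherwise both would consume (and reply to) the same messages.
        if platform_id in self._inst_map:
            await self.terminate_platform(platform_id)
        self._inst_map[platform_id] = instance


async def main() -> None:
    mgr = PlatformManagerSketch()
    await mgr.load_platform("qq_official", "instance-A")
    await mgr.load_platform("qq_official", "instance-B")  # A is terminated first
    print(mgr._inst_map)  # only instance-B remains


asyncio.run(main())
```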

3) qq_official dedup strategy fix (avoid false interception)

  • File: astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
  • Details:
    • Split the dedup cache into two tiers:
      • message_id dedup: long TTL (guards against duplicate callbacks/redelivery);
      • sender+content dedup: short TTL (only a safety net against near-instant repeats).
    • Fix the false positives caused by the old long-lived content dedup, so users can legitimately repeat input outside the short window.
    • Unify the private-chat sender ID precedence (prefer user_openid, fall back to id), reducing false positives/negatives caused by inconsistent fields across callback paths.
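A hedged sketch of the two-tier cache described above (the TTL values are illustrative, and hashing the full content instead of truncating it follows the reviewers' suggestion; this is not the adapter's literal implementation):

```python
import hashlib
import time


class TwoTierDedup:
    """Long-TTL message_id cache plus short-TTL sender+content cache."""

    def __init__(self, id_ttl: float = 300.0, content_ttl: float = 2.0) -> None:
        self.id_ttl = id_ttl
        self.content_ttl = content_ttl
        self._by_id: dict[str, float] = {}       # message_id -> timestamp
        self._by_content: dict[str, float] = {}  # sender:hash(content) -> timestamp

    @staticmethod
    def _evict(cache: dict[str, float], ttl: float, now: float) -> None:
        for key, ts in list(cache.items()):
            if now - ts > ttl:
                del cache[key]

    def is_duplicate(self, message_id: str, sender_id: str, content: str) -> bool:
        now = time.time()
        self._evict(self._by_id, self.id_ttl, now)
        self._evict(self._by_content, self.content_ttl, now)
        # Tier 1, long TTL: the platform redelivered the same message_id.
        if message_id in self._by_id:
            return True
        # Tier 2, short TTL: same sender, identical content, near-instant repeat.
        # Hashing the full content avoids false positives from shared prefixes.
        content_key = f"{sender_id}:{hashlib.sha256(content.encode()).hexdigest()}"
        if content_key in self._by_content:
            return True
        self._by_id[message_id] = now
        self._by_content[content_key] = now
        return False
```

Outside the short content window, a user repeating the same text is no longer intercepted; only a redelivered message_id still is.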

Affected files

  • astrbot/core/event_bus.py
  • astrbot/core/platform/manager.py
  • astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py


Summary by Sourcery

Improve QQ Official message handling to prevent both duplicate consumption and over-aggressive deduplication, and add a short-window duplicate guard in the event bus plus defensive platform reloading.

Bug Fixes:

  • Avoid repeated processing of the same QQ Official message by introducing per-adapter message ID and short-window content+sender deduplication.
  • Prevent legitimate repeated QQ Official inputs from being blocked long-term by scoping content-based deduplication to a short time window and standardizing sender ID extraction.
  • Stop duplicate message consumption caused by multiple active instances of the same platform ID by terminating existing instances before reloading.

Enhancements:

  • Add a short time-window event fingerprint deduplication layer in the event bus to drop rapidly repeated identical events across platform instances.


Summary by Sourcery

Improve QQ Official message handling by adding robust short-window deduplication at both the adapter and event bus levels and preventing duplicate platform instances from consuming the same messages.

Bug Fixes:

  • Prevent repeated processing of QQ Official messages by combining per-message ID tracking with short-window sender+content deduplication in the platform adapter.
  • Avoid duplicate message consumption caused by multiple live instances of the same platform ID by terminating any existing instance before loading a new one.

Enhancements:

  • Introduce a short time-window event fingerprint deduplication layer in the event bus to drop rapidly repeated identical events.
  • Standardize QQ Official sender ID extraction across message types to support consistent deduplication and event fingerprinting.

Documentation:

  • Add a problems log describing local development environment tooling issues and their resolution for formatters, linters, and the type checker.



@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Mar 8, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves duplicate message consumption and false interception in the qq_official scenario. It introduces short-window event deduplication at the event bus layer and defensive handling of same-ID instances in the platform manager to ensure each message is processed exactly once. In the qq_official adapter it refines the dedup strategy, giving message-ID and content dedup separate lifetimes and unifying sender-ID extraction, improving stability and user experience.

Highlights

  • Event bus short-window dedup safety net: deduplicate events by fingerprint over a short time window before messages enter dispatch, reducing duplicate events in the downstream processing chain.
  • Same-ID guard when loading platform instances: if an instance with the same platform ID already exists, terminate the old one before loading the new one, avoiding duplicate consumption from concurrent instances of the same platform.
  • qq_official dedup strategy fix: split the dedup cache into message_id (long TTL) and sender+content (short TTL), fixing false positives from long-lived content dedup, and unify the private-chat sender ID precedence.

Changelog

  • astrbot/core/event_bus.py
    • Added a short-window event-fingerprint deduplication mechanism.
    • Intercept duplicate events before message dispatch.
  • astrbot/core/platform/manager.py
    • Terminate and reload any existing instance with the same ID when loading a platform.
  • astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
    • Split the dedup cache into message_id (long TTL) and sender+content (short TTL).
    • Fixed false positives caused by long-lived content dedup.
    • Unified the private-chat sender ID precedence.

@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Mar 8, 2026
Contributor

@sourcery-ai sourcery-ai bot left a comment



Hey - I've found 2 issues, and left some high level feedback:

  • In _get_sender_id the priority is `id` → `user_openid` → `member_openid`, which contradicts the description that user_openid should be preferred and may lead to inconsistent sender IDs across callbacks; consider reordering the checks to user_openid first, then member_openid, then id.
  • The short content key for deduplication is currently f"{sender_id}:{content[:50]}"; if different messages share the same 50‑char prefix this may incorrectly treat them as duplicates, so consider either using the full content or a hash of the full content instead of truncation.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `_get_sender_id` the priority is `id` → `user_openid` → `member_openid`, which contradicts the description that `user_openid` should be preferred and may lead to inconsistent sender IDs across callbacks; consider reordering the checks to `user_openid` first, then `member_openid`, then `id`.
- The short content key for deduplication is currently `f"{sender_id}:{content[:50]}"`; if different messages share the same 50‑char prefix this may incorrectly treat them as duplicates, so consider either using the full content or a hash of the full content instead of truncation.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="39-47" />
<code_context>
     def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
         self.platform = platform

+    def _get_sender_id(self, message) -> str:
+        """Extract sender ID from different message types."""
+        if hasattr(message, 'author') and hasattr(message.author, 'id'):
+            return str(message.author.id)
+        if hasattr(message, 'author') and hasattr(message.author, 'user_openid'):
+            return str(message.author.user_openid)
+        if hasattr(message, 'author') and hasattr(message.author, 'member_openid'):
+            return str(message.author.member_openid)
+        return ""
+
     # 收到群消息
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Align sender ID selection strategy with `_parse_from_qqofficial` to avoid inconsistent dedup keys.

Here `_get_sender_id` prefers `author.id` over `user_openid`, but `_parse_from_qqofficial` prefers `user_openid` and only falls back to `id`. This can cause the same user to appear under different IDs, weakening deduplication (`_is_duplicate_message`) and event_bus fingerprinting. Please align the precedence order with `_parse_from_qqofficial` or extract a shared helper so both paths use the same logic.

```suggestion
    def _get_sender_id(self, message) -> str:
        """Extract sender ID from different message types.

        The precedence order is aligned with `_parse_from_qqofficial` to ensure
        consistent deduplication and event fingerprinting:
        1. author.user_openid
        2. author.member_openid
        3. author.id
        """
        if hasattr(message, "author") and hasattr(message.author, "user_openid"):
            return str(message.author.user_openid)
        if hasattr(message, "author") and hasattr(message.author, "member_openid"):
            return str(message.author.member_openid)
        if hasattr(message, "author") and hasattr(message.author, "id"):
            return str(message.author.id)
        return ""
```
</issue_to_address>

### Comment 2
<location path="astrbot/core/event_bus.py" line_range="39" />
<code_context>
         self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
         self.astrbot_config_mgr = astrbot_config_mgr
+        # 跨平台实例短窗去重(兜底):处理同一消息在极短时间内重复入队。
+        self._recent_event_fingerprints: dict[str, float] = {}
+        self._dedup_ttl_seconds = 0.5
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new deduplication logic by replacing the hash-based dict approach with a tuple-key + deque/set TTL structure and documenting the single-consumer assumption for clarity and safety.

You can simplify the dedup logic without changing behavior by:

1. Using a tuple as the fingerprint key (no hashing / string joins).
2. Using a deque + set for TTL-based dedup, with straightforward eviction.
3. Explicitly documenting the single-consumer assumption.

### 1. Replace SHA-1 fingerprint with a tuple

```python
from collections import deque
import time

class EventBus:
    def __init__(...):
        ...
        # 最近事件指纹,短窗去重(0.5s),单消费者线程内使用
        self._dedup_ttl_seconds = 0.5
        self._dedup_seen = set()              # Set[Fingerprint]
        self._dedup_queue = deque()           # deque[(timestamp, Fingerprint)]
```

```python
    def _build_event_fingerprint(self, event: AstrMessageEvent) -> tuple:
        # 简单元组键即可,避免拼接和哈希
        return (
            event.get_platform_id() or "",
            event.unified_msg_origin or "",
            event.get_sender_id() or "",
            (event.get_message_str() or "").strip(),
        )
```

### 2. Simplify TTL cleanup with deque + set

```python
    def _clean_expired_event_fingerprints(self) -> None:
        now = time.time()
        expire_before = now - self._dedup_ttl_seconds
        while self._dedup_queue and self._dedup_queue[0][0] < expire_before:
            _, fingerprint = self._dedup_queue.popleft()
            self._dedup_seen.discard(fingerprint)

    def _is_duplicate_event(self, event: AstrMessageEvent) -> bool:
        # dispatch 是单消费者循环,未加锁是有意为之
        self._clean_expired_event_fingerprints()
        fingerprint = self._build_event_fingerprint(event)
        if fingerprint in self._dedup_seen:
            return True
        ts = time.time()
        self._dedup_seen.add(fingerprint)
        self._dedup_queue.append((ts, fingerprint))
        return False
```

This:

- Removes the need to iterate and mutate a dict copy.
- Makes eviction logic linear and ordered (pop from left while expired).
- Removes hashing / encoding complexity, while collision risk remains negligible in a 0.5s window.

### 3. Clarify concurrency assumption

Add a short comment where dedup is used to prevent future accidental multi-consumer usage:

```python
    async def dispatch(self) -> None:
        # event_queue 由单一消费者处理;去重结构不是线程安全的,按设计仅在此循环中使用。
        while True:
            event: AstrMessageEvent = await self.event_queue.get()
            if self._is_duplicate_event(event):
                ...
```
</issue_to_address>


Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This PR aims to resolve duplicate message consumption and incorrect interception issues on the qq_official platform by implementing deduplication logic across the event bus, platform manager, and adapter layers. While the approach of adding global short-window deduplication in the event bus and fine-tuning deduplication strategies in the qq_official adapter is sound, a critical logic bug exists in the platform manager's loading logic. Sanitized platform IDs are not correctly handled when checking for existing instances, which could lead to multiple instances of the same platform running concurrently and reintroducing the very duplicate processing this PR intends to prevent. Furthermore, the expired event cleanup logic in event_bus.py may face performance issues under high concurrency, and the newly added _get_sender_id method in qqofficial_platform_adapter.py has inconsistent logic that could cause deduplication to fail.

Comment on lines +39 to +47

```python
    def _get_sender_id(self, message) -> str:
        """Extract sender ID from different message types."""
        if hasattr(message, 'author') and hasattr(message.author, 'id'):
            return str(message.author.id)
        if hasattr(message, 'author') and hasattr(message.author, 'user_openid'):
            return str(message.author.user_openid)
        if hasattr(message, 'author') and hasattr(message.author, 'member_openid'):
            return str(message.author.member_openid)
        return ""
```
Contributor


high

The precedence order used in _get_sender_id to obtain the sender ID is inconsistent with this PR's change to _parse_from_qqofficial (which prefers user_openid). Inconsistent ID extraction can make deduplication miss duplicates or produce false positives.

The current implementation checks id first, but in private-chat and group-chat scenarios, user_openid and member_openid are usually the more accurate unique identifiers. To keep the dedup logic correct and the code consistent, consider refactoring this method to unify the ID precedence and make it more robust.

The suggested precedence is user_openid -> member_openid -> id:

```python
    def _get_sender_id(self, message) -> str:
        """Extract sender ID from different message types."""
        if not hasattr(message, "author"):
            return ""
        author = message.author
        # Unify the ID precedence: prefer the openid fields
        sender_id = (
            getattr(author, "user_openid", None)
            or getattr(author, "member_openid", None)
            or getattr(author, "id", None)
        )
        return str(sender_id) if sender_id is not None else ""
```

Comment on lines +126 to +132

```python
        if platform_id in self._inst_map:
            logger.warning(
                "平台 %s(%s) 已存在实例,先终止旧实例再重载。",
                platform_config["type"],
                platform_id,
            )
            await self.terminate_platform(platform_id)
```
Contributor


security-medium medium

The load_platform function fails to correctly identify and terminate existing platform instances if the platform ID requires sanitization. When a platform ID contains illegal characters (like : or !), it is sanitized and the updated ID is stored in platform_config["id"]. However, the check for an existing instance on line 126 and the subsequent call to terminate_platform on line 132 use the original, unsanitized platform_id variable.

Since the platform instance was likely stored in self._inst_map using the sanitized ID during a previous load (see line 210), the check on line 126 will fail to find the existing instance. This results in multiple instances of the same platform running simultaneously, leading to duplicate message processing and resource leaks, which directly undermines the purpose of this PR.

```python
            platform_id = platform_config.get("id")
            if not self._is_valid_platform_id(platform_id):
                sanitized_id, changed = self._sanitize_platform_id(platform_id)
                if sanitized_id and changed:
                    logger.warning(
                        "平台 ID %r 包含非法字符 ':' 或 '!',已替换为 %r。",
                        platform_id,
                        sanitized_id,
                    )
                    platform_config["id"] = sanitized_id
                    platform_id = sanitized_id  # Update the local variable to the sanitized version
                    self.astrbot_config.save_config()
                else:
                    logger.error(
                        f"平台 ID {platform_id!r} 不能为空,跳过加载该平台适配器。",
                    )
                    return
```

Comment on lines +42 to +47

```python
    def _clean_expired_event_fingerprints(self) -> None:
        now = time.time()
        expire_before = now - self._dedup_ttl_seconds
        for key, ts in list(self._recent_event_fingerprints.items()):
            if ts < expire_before:
                self._recent_event_fingerprints.pop(key, None)
```
Contributor


medium

The expired-fingerprint cleanup logic here is a potential performance concern. _clean_expired_event_fingerprints runs on every call to _is_duplicate_event (i.e., for every incoming event) and iterates over the entire _recent_event_fingerprints dict. Under high concurrency this wastes CPU.

Consider optimizing, for example:

  1. Periodic cleanup: run a background task, or clean expired fingerprints at a fixed interval inside the dispatch loop instead of on every event.
  2. A more efficient data structure: a collections.deque paired with a set. The deque pops expired entries from the front efficiently while the set provides fast membership checks, reducing cleanup from O(N) to O(M) (M = number of expired entries) while keeping lookups O(1).

…dapter.py

Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
@001VIsir
Author

001VIsir commented Mar 9, 2026

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - 我发现了两个问题,并给出了一些总体反馈:

  • EventBusQQOfficialPlatformAdapter 中用于短时间窗口去重的映射,在每条消息上都会通过扫描整个字典来清理,这在高负载下可能会成为热点路径;建议考虑限制最大大小、采用概率性清理,或者降低清理频率(按更粗粒度的时间间隔清理),以减少每条消息的开销。
  • QQOfficialPlatformAdapter._is_duplicate_message 中新增的去重逻辑,会对每条新注册的消息以 info 级别打日志,在高并发部署中可能会导致日志被刷屏;建议降级到 debug,或仅在检测到重复消息时记录日志。
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The short-window dedup maps in both `EventBus` and `QQOfficialPlatformAdapter` are cleaned on every message by scanning the entire dict, which may become a hot path under load; consider either bounding the size, cleaning probabilistically, or only cleaning at a coarser interval to reduce per-message overhead.
- The new dedup logic in `QQOfficialPlatformAdapter._is_duplicate_message` logs every newly registered message at `info` level, which could spam logs in busy deployments; consider downgrading to `debug` or logging only on duplicate detection.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="39-48" />
<code_context>
     def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
         self.platform = platform

+    def _get_sender_id(self, message) -> str:
+        """Extract sender ID from different message types.
+
+        The precedence order is aligned with `_parse_from_qqofficial` to ensure
+        consistent deduplication and event fingerprinting:
+        1. author.user_openid
+        2. author.member_openid
+        3. author.id
+        """
+        if hasattr(message, "author") and hasattr(message.author, "user_openid"):
+            return str(message.author.user_openid)
+        if hasattr(message, "author") and hasattr(message.author, "member_openid"):
+            return str(message.author.member_openid)
+        if hasattr(message, "author") and hasattr(message.author, "id"):
+            return str(message.author.id)
+        return ""
</code_context>
<issue_to_address>
**issue (bug_risk):** `_get_sender_id` 中的发送者 ID 优先级与 `_parse_from_qqofficial` 不一致。

文档字符串声称 `_get_sender_id``_parse_from_qqofficial` 的行为一致,但两者的优先级不同:`_get_sender_id` 使用 `user_openid -> member_openid -> id`,而 `_parse_from_qqofficial` 只回退到 `user_openid or id`。这会导致事件总线去重键使用 `member_openid`,而 `abm.sender.id` 使用 `user_openid/id`,从而产生不一致的指纹。请考虑要么在 `_parse_from_qqofficial`(以及其他 `get_sender_id()` 调用方)中加入对 `member_openid` 的处理,要么从 `_get_sender_id` 中移除 `member_openid`,以确保优先级真正保持一致。
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="62" />
<code_context>
     ) -> None:
+        sender_id = self._get_sender_id(message)
+        content = getattr(message, 'content', '') or ""
+        if await self.platform._is_duplicate_message(message.id, content, sender_id):
+            return
         abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
</code_context>
<issue_to_address>
**suggestion (bug_risk):** 去重逻辑假定 `message.id``str` 类型,而这不一定总是成立。

`_is_duplicate_message` 期望接收一个 `str`,但当前是直接传入 `message.id`。如果 `id` 的类型发生变化(或者在不同消息类中不一致),生成的去重键就可能不一致。建议在调用处或 `_is_duplicate_message` 内部显式使用 `str(message.id)` 做归一化,以提高对类型变化的鲁棒性。

Suggested implementation:

```python
        sender_id = self._get_sender_id(message)
        content = getattr(message, 'content', '') or ""
        if await self.platform._is_duplicate_message(str(message.id), content, sender_id):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(

```

```python
    async def on_at_message_create(self, message: botpy.message.Message) -> None:
        sender_id = self._get_sender_id(message)
        content = getattr(message, 'content', '') or ""
        if await self.platform._is_duplicate_message(str(message.id), content, sender_id):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(

```
</issue_to_address>
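The failure mode behind this suggestion is easy to reproduce with a plain set: an `int` key registered by one callback path never matches the `str` form seen by another (the id value here is made up for illustration):

```python
seen = set()

# One hypothetical callback path registers the raw id as an int
seen.add(10001)

# Another path checks membership using the stringified id
print("10001" in seen)   # False: int and str keys never compare equal

# Normalizing at the boundary makes both paths agree
seen.add(str(10001))
print("10001" in seen)   # True
```

Calling `str(...)` once at the dedup entry point removes this entire class of mismatch.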

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +39 to +48
def _get_sender_id(self, message) -> str:
"""Extract sender ID from different message types.

The precedence order is aligned with `_parse_from_qqofficial` to ensure
consistent deduplication and event fingerprinting:
1. author.user_openid
2. author.member_openid
3. author.id
"""
if hasattr(message, "author") and hasattr(message.author, "user_openid"):
issue (bug_risk): Sender ID precedence in `_get_sender_id` is inconsistent with `_parse_from_qqofficial`.

The docstring claims `_get_sender_id` matches `_parse_from_qqofficial`, but the precedence differs: `_get_sender_id` uses `user_openid -> member_openid -> id`, while `_parse_from_qqofficial` only falls back to `user_openid or id`. This can cause event-bus dedup keys to use `member_openid` while `abm.sender.id` uses `user_openid/id`, leading to inconsistent fingerprints. Please either add `member_openid` to `_parse_from_qqofficial` (and other `get_sender_id()` callers) or remove it from `_get_sender_id` so the precedence is truly consistent.


) -> None:
sender_id = self._get_sender_id(message)
content = getattr(message, 'content', '') or ""
if await self.platform._is_duplicate_message(message.id, content, sender_id):
suggestion (bug_risk): Deduplication assumes `message.id` is a `str`, which may not always be true.

`_is_duplicate_message` expects a `str` but receives `message.id` directly. If the `id` type changes (or differs across message classes), the deduplication keys could become inconsistent. Consider explicitly normalizing with `str(message.id)` either at the call site or within `_is_duplicate_message` to make the behavior robust to type changes.

Suggested implementation:

```python
        sender_id = self._get_sender_id(message)
        content = getattr(message, 'content', '') or ""
        if await self.platform._is_duplicate_message(str(message.id), content, sender_id):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
```

```python
    async def on_at_message_create(self, message: botpy.message.Message) -> None:
        sender_id = self._get_sender_id(message)
        content = getattr(message, 'content', '') or ""
        if await self.platform._is_duplicate_message(str(message.id), content, sender_id):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
```

@001VIsir

001VIsir commented Mar 9, 2026

@sourcery-ai review


@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue, and left some high level feedback:

  • The QQOfficial deduplication maps (message_id_timestamps and content_key_timestamps) are unbounded; consider adding a max size or periodic background cleanup to avoid unbounded memory growth under long-running high-traffic workloads.
  • Using a single _dedup_lock around the entire _is_duplicate_message check and registration can become a bottleneck under high concurrency; consider using a more granular structure (e.g., sharded locks or lock-free cleanup with atomic updates) to reduce contention.
  • The event bus and QQOfficial dedup logic both log every duplicate/new message at info level, which may be noisy in production; consider downgrading to debug or adding sampling to keep logs manageable.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The QQOfficial deduplication maps (`message_id_timestamps` and `content_key_timestamps`) are unbounded; consider adding a max size or periodic background cleanup to avoid unbounded memory growth under long-running high-traffic workloads.
- Using a single `_dedup_lock` around the entire `_is_duplicate_message` check and registration can become a bottleneck under high concurrency; consider using a more granular structure (e.g., sharded locks or lock-free cleanup with atomic updates) to reduce contention.
- The event bus and QQOfficial dedup logic both log every duplicate/new message at `info` level, which may be noisy in production; consider downgrading to `debug` or adding sampling to keep logs manageable.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="175" />
<code_context>
+
+    def _clean_expired_messages(self) -> None:
+        """Clean up expired dedup caches."""
+        current_time = time.time()
+        expired_message_ids = [
+            msg_id
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider using a monotonic clock for TTL-based deduplication instead of `time.time()`.

Both this cleanup and the event_bus dedup logic use `time.time()` for TTL expiry. If the system clock jumps (NTP sync, manual change, DST quirks), entries can expire too early or too late. For relative TTLs, `time.monotonic()` is safer because it isn't affected by wall-clock adjustments and keeps dedup timing consistent.

Suggested implementation:

```python
    def _clean_expired_messages(self) -> None:
        """Clean up expired dedup caches."""
        # Use a monotonic clock for TTL-based expiry to avoid issues with system clock changes.
        current_time = time.monotonic()
        expired_message_ids = [

```

```python
        expired_content_keys = [
            content_key
            for content_key, timestamp in self.content_key_timestamps.items()
            if current_time - timestamp > self._content_key_ttl_seconds

```

To fully implement the switch to monotonic time for TTL-based deduplication, you should also:

1. Change any code that *writes* to `self.message_id_timestamps` and `self.content_key_timestamps` to use `time.monotonic()` instead of `time.time()`. For example:
   - `self.message_id_timestamps[msg_id] = time.monotonic()`
   - `self.content_key_timestamps[content_key] = time.monotonic()`
2. In the "event_bus dedup logic" that currently uses `time.time()` for TTL checks, update both:
   - The timestamp storage to use `time.monotonic()`.
   - The TTL comparison to use `time.monotonic()` as the current time source.
3. Verify that no code mixes wall-clock timestamps (e.g. from `time.time()` or datetimes) with these dedup timestamp values; all TTL comparisons must be monotonic-to-monotonic.

These additional changes keep all dedup-related timing consistent and robust against wall-clock adjustments.
</issue_to_address>
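The monotonic-clock point above can be demonstrated with a standalone toy (not code from this PR): a seen-set whose TTL expiry is driven entirely by `time.monotonic()`, so only elapsed time — never a wall-clock adjustment — can age an entry out.

```python
import time

class TTLSet:
    """Toy seen-set with TTL expiry based on the monotonic clock."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self._stamps: dict[str, float] = {}

    def seen(self, key: str) -> bool:
        now = time.monotonic()  # immune to NTP/wall-clock jumps
        # prune expired entries before the membership check
        self._stamps = {k: t for k, t in self._stamps.items() if now - t <= self.ttl}
        if key in self._stamps:
            return True
        self._stamps[key] = now
        return False

s = TTLSet(ttl=0.05)
print(s.seen("msg-1"))  # False: first sighting registers the key
print(s.seen("msg-1"))  # True: duplicate inside the 50 ms window
time.sleep(0.1)
print(s.seen("msg-1"))  # False: the entry expired with elapsed time
```

With `time.time()` in place of `time.monotonic()`, a backwards clock step during the window would keep entries alive indefinitely, and a forwards step would expire them instantly.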

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
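One way to act on the "unbounded maps" feedback in this review is to put a hard size cap on the dedup map, with LRU-style eviction layered on top of TTL pruning. A standalone sketch under that assumption (class and parameter names are illustrative, not from the PR):

```python
import time
from collections import OrderedDict

class BoundedDedup:
    """TTL dedup map with a hard size cap (oldest entry evicted first)."""

    def __init__(self, ttl: float, max_entries: int) -> None:
        self.ttl = ttl
        self.max_entries = max_entries
        # keys are only ever appended, so insertion order == timestamp order
        self._stamps: "OrderedDict[str, float]" = OrderedDict()

    def is_duplicate(self, key: str) -> bool:
        now = time.monotonic()
        # drop expired entries from the front (the oldest side)
        while self._stamps:
            oldest_key, oldest_ts = next(iter(self._stamps.items()))
            if now - oldest_ts > self.ttl:
                del self._stamps[oldest_key]
            else:
                break
        if key in self._stamps:
            return True
        self._stamps[key] = now
        if len(self._stamps) > self.max_entries:
            self._stamps.popitem(last=False)  # enforce the size cap
        return False

d = BoundedDedup(ttl=60.0, max_entries=2)
print(d.is_duplicate("a"), d.is_duplicate("b"), d.is_duplicate("c"))  # False False False
print(d.is_duplicate("a"))  # False: "a" was already evicted by the size cap
print(d.is_duplicate("c"))  # True: "c" is still inside the window
```

Because expired entries are pruned from the front of the `OrderedDict`, each call does O(expired) work instead of scanning the whole map, which also addresses the hot-path cleanup concern raised in the first review.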

001VIsir and others added 4 commits March 10, 2026 00:49
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
@001VIsir

001VIsir commented Mar 9, 2026

@sourcery-ai review


@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues, and left some high level feedback:

  • Consider removing problems.md from the repo (or moving it to an appropriate developer docs location) since it describes a one-off local environment issue rather than project-level behavior and may quickly become stale.
  • The new deduplication logging (e.g., in _is_duplicate_message and _is_duplicate_event) uses info level for every hit/miss; consider downgrading some of these to debug to avoid log noise under high message volume.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider removing `problems.md` from the repo (or moving it to an appropriate developer docs location) since it describes a one-off local environment issue rather than project-level behavior and may quickly become stale.
- The new deduplication logging (e.g., in `_is_duplicate_message` and `_is_duplicate_event`) uses `info` level for every hit/miss; consider downgrading some of these to `debug` to avoid log noise under high message volume.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="144" />
<code_context>
         qq_group = platform_config["enable_group_c2c"]
         guild_dm = platform_config["enable_guild_direct_message"]

+        # Lock for thread-safe deduplication
+        self._dedup_lock = asyncio.Lock()
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the deduplication logic and key extraction into reusable helpers to keep the adapter and handlers simpler and less repetitive.

You can keep the dedup behavior but significantly reduce complexity and duplication by:

1. Encapsulating the dedup dictionaries/lock/cleanup into a small helper.
2. Centralizing `(sender_id, content)` extraction so each handler doesn't repeat it.

### 1. Extract a `MessageDeduplicator` helper

Move the cache management, TTLs, and hashing out of `QQOfficialPlatformAdapter` into a small class:

```python
class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
    ) -> None:
        self._message_id_timestamps: dict[str, float] = {}
        self._content_key_timestamps: dict[str, float] = {}
        self._message_id_ttl_seconds = message_id_ttl_seconds
        self._content_key_ttl_seconds = content_key_ttl_seconds
        self._lock = asyncio.Lock()

    def _clean_expired(self, now: float) -> None:
        self._message_id_timestamps = {
            k: ts
            for k, ts in self._message_id_timestamps.items()
            if now - ts <= self._message_id_ttl_seconds
        }
        self._content_key_timestamps = {
            k: ts
            for k, ts in self._content_key_timestamps.items()
            if now - ts <= self._content_key_ttl_seconds
        }

    async def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        async with self._lock:
            now = time.monotonic()
            self._clean_expired(now)

            if message_id in self._message_id_timestamps:
                logger.info(
                    f"[QQOfficial] Duplicate message detected (by ID): {message_id[:50]}..."
                )
                return True

            content_key = None
            if content and sender_id:
                content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
                content_key = f"{sender_id}:{content_hash}"
                if content_key in self._content_key_timestamps:
                    logger.info(
                        f"[QQOfficial] Duplicate message detected (by content): {content_key}"
                    )
                    return True

            # register
            self._message_id_timestamps[message_id] = now
            if content_key is not None:
                self._content_key_timestamps[content_key] = now

            logger.info(f"[QQOfficial] New message registered: {message_id[:50]}...")
            return False
```

Then in `QQOfficialPlatformAdapter.__init__`:

```python
class QQOfficialPlatformAdapter(PlatformAdapter):
    def __init__(...):
        ...
        self._deduplicator = MessageDeduplicator(
            message_id_ttl_seconds=30 * 60,
            content_key_ttl_seconds=3.0,
        )
```

And replace `_is_duplicate_message` with a thin wrapper (or just call the helper directly):

```python
    async def _is_duplicate_message(
        self, message_id: str, content: str = "", sender_id: str = ""
    ) -> bool:
        return await self._deduplicator.is_duplicate(message_id, content, sender_id)
```

This keeps behavior identical but removes the low-level cache/hashing mechanics from the adapter itself.

### 2. Centralize sender/content extraction for handlers

You already added `_get_sender_id`. You can also centralize the content extraction and reuse it in all handlers:

```python
class botClient(Client):
    ...

    def _extract_dedup_key(self, message) -> tuple[str, str]:
        sender_id = self._get_sender_id(message)
        content = getattr(message, "content", "") or ""
        return sender_id, content

    async def _should_drop_message(self, message) -> bool:
        sender_id, content = self._extract_dedup_key(message)
        return await self.platform._is_duplicate_message(message.id, content, sender_id)

    async def on_group_at_message_create(self, message: botpy.message.GroupMessage) -> None:
        if await self._should_drop_message(message):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
            message,
            MessageType.GROUP_MESSAGE,
        )
        ...
```

Then apply the same `_should_drop_message` call to `on_at_message_create`, `on_direct_message_create`, and `on_c2c_message_create` to remove repeated sender/content retrieval and dedup calls while preserving the dedup checks.
</issue_to_address>

### Comment 2
<location path="astrbot/core/event_bus.py" line_range="38" />
<code_context>
         # abconf uuid -> scheduler
         self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
         self.astrbot_config_mgr = astrbot_config_mgr
+        # 跨平台实例短窗去重(兜底):处理同一消息在极短时间内重复入队。
+        # 最近事件指纹,短窗去重(0.5s),单消费者线程内使用
+        self._dedup_ttl_seconds = 0.5
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the deduplication TTL and fingerprint bookkeeping into a small `EventDeduplicator` helper so `EventBus` focuses only on dispatch intent.

You can keep the new deduplication behavior while reducing the cognitive load on `EventBus` by extracting a tiny helper that encapsulates the TTL + fingerprint logic.

### 1. Extract a reusable `EventDeduplicator`

Move the state and low-level bookkeeping into a small class (could live in the same module or a separate one):

```python
import time
from collections import deque
from typing import Deque, Set, Tuple

class EventDeduplicator:
    def __init__(self, ttl_seconds: float = 0.5) -> None:
        self._ttl_seconds = ttl_seconds
        self._seen: Set[tuple] = set()
        self._queue: Deque[Tuple[float, tuple]] = deque()

    def _clean_expired(self) -> None:
        now = time.monotonic()
        expire_before = now - self._ttl_seconds
        while self._queue and self._queue[0][0] < expire_before:
            _, fingerprint = self._queue.popleft()
            self._seen.discard(fingerprint)

    def _build_fingerprint(self, event: AstrMessageEvent) -> tuple:
        return (
            event.get_platform_id() or "",
            event.unified_msg_origin or "",
            event.get_sender_id() or "",
            (event.get_message_str() or "").strip(),
        )

    def is_duplicate(self, event: AstrMessageEvent) -> bool:
        self._clean_expired()
        fingerprint = self._build_fingerprint(event)
        if fingerprint in self._seen:
            return True
        ts = time.monotonic()
        self._seen.add(fingerprint)
        self._queue.append((ts, fingerprint))
        return False
```

### 2. Simplify `EventBus` to delegate dedup

Then `EventBus` only expresses the *intent* ("deduplicate") and no longer manages cache internals:

```python
class EventBus:
    def __init__(self, event_queue, pipeline_scheduler_mapping, astrbot_config_mgr) -> None:
        self.event_queue = event_queue
        self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
        self.astrbot_config_mgr = astrbot_config_mgr
        self._deduplicator = EventDeduplicator(ttl_seconds=0.5)

    async def dispatch(self) -> None:
        while True:
            event: AstrMessageEvent = await self.event_queue.get()
            if self._deduplicator.is_duplicate(event):
                logger.info(
                    "Skip duplicate event in event_bus, umo=%s, sender=%s",
                    event.unified_msg_origin,
                    event.get_sender_id(),
                )
                continue
            # ... existing logic ...
```

This keeps all behavior (monotonic TTL, fingerprint definition, short-window dedup) but:

- `EventBus` no longer mixes dispatch logic with low-level cache maintenance.
- The dedup logic can be reused and tested on its own, and is easier to share/extend if you later want to unify the dedup strategy between the platform layer and the bus layer.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
- The dedup logic is reusable and testable in isolation, and can be shared/extended if you decide to harmonize platform-level and bus-level dedup later.
</issue_to_address>
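Outside the bus, the short-window semantics of this helper can be exercised directly. The class body below reproduces the proposal above; `FakeEvent` is a duck-typed stand-in for `AstrMessageEvent`, and the 0.05 s TTL is shrunk only for the demo:

```python
import time
from collections import deque


class EventDeduplicator:
    def __init__(self, ttl_seconds: float = 0.5) -> None:
        self._ttl_seconds = ttl_seconds
        self._seen: set[tuple] = set()
        self._queue: deque[tuple[float, tuple]] = deque()

    def _clean_expired(self) -> None:
        expire_before = time.monotonic() - self._ttl_seconds
        while self._queue and self._queue[0][0] < expire_before:
            _, fingerprint = self._queue.popleft()
            self._seen.discard(fingerprint)

    def _build_fingerprint(self, event) -> tuple:
        return (
            event.get_platform_id() or "",
            event.unified_msg_origin or "",
            event.get_sender_id() or "",
            (event.get_message_str() or "").strip(),
        )

    def is_duplicate(self, event) -> bool:
        self._clean_expired()
        fingerprint = self._build_fingerprint(event)
        if fingerprint in self._seen:
            return True
        self._seen.add(fingerprint)
        self._queue.append((time.monotonic(), fingerprint))
        return False


class FakeEvent:
    """Duck-typed stand-in for AstrMessageEvent, for illustration only."""

    def __init__(self, platform_id: str, umo: str, sender: str, text: str) -> None:
        self._p = platform_id
        self.unified_msg_origin = umo
        self._s = sender
        self._t = text

    def get_platform_id(self) -> str:
        return self._p

    def get_sender_id(self) -> str:
        return self._s

    def get_message_str(self) -> str:
        return self._t


dedup = EventDeduplicator(ttl_seconds=0.05)
e = FakeEvent("qq_official", "qq_official:c2c:u1", "u1", "hi")
first = dedup.is_duplicate(e)   # False: first sighting is registered
second = dedup.is_duplicate(e)  # True: identical fingerprint inside the window
time.sleep(0.2)                 # let the TTL window lapse
third = dedup.is_duplicate(e)   # False: the expired entry was evicted
```

Because the fingerprint includes the platform ID and unified message origin, two genuinely different conversations never collide even if the text is identical.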

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

```python
qq_group = platform_config["enable_group_c2c"]
guild_dm = platform_config["enable_guild_direct_message"]

# Lock for thread-safe deduplication
```

**issue (complexity):** Consider extracting the deduplication logic and key extraction into reusable helpers to keep the adapter and handlers simpler and less repetitive.

You can keep the dedup behavior but significantly reduce complexity and duplication by:

1. Encapsulating the dedup dictionaries/lock/cleanup into a small helper.
2. Centralizing `(sender_id, content)` extraction so each handler doesn't repeat it.

### 1. Extract a `MessageDeduplicator` helper

Move the cache management, TTLs, and hashing out of `QQOfficialPlatformAdapter` into a small class:

```python
import asyncio
import hashlib
import time

# `logger` below refers to the adapter module's existing logger.


class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
    ) -> None:
        self._message_id_timestamps: dict[str, float] = {}
        self._content_key_timestamps: dict[str, float] = {}
        self._message_id_ttl_seconds = message_id_ttl_seconds
        self._content_key_ttl_seconds = content_key_ttl_seconds
        self._lock = asyncio.Lock()

    def _clean_expired(self, now: float) -> None:
        self._message_id_timestamps = {
            k: ts
            for k, ts in self._message_id_timestamps.items()
            if now - ts <= self._message_id_ttl_seconds
        }
        self._content_key_timestamps = {
            k: ts
            for k, ts in self._content_key_timestamps.items()
            if now - ts <= self._content_key_ttl_seconds
        }

    async def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        async with self._lock:
            now = time.monotonic()
            self._clean_expired(now)

            if message_id in self._message_id_timestamps:
                logger.info(
                    f"[QQOfficial] Duplicate message detected (by ID): {message_id[:50]}..."
                )
                return True

            content_key = None
            if content and sender_id:
                content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
                content_key = f"{sender_id}:{content_hash}"
                if content_key in self._content_key_timestamps:
                    logger.info(
                        f"[QQOfficial] Duplicate message detected (by content): {content_key}"
                    )
                    return True

            # register
            self._message_id_timestamps[message_id] = now
            if content_key is not None:
                self._content_key_timestamps[content_key] = now

            logger.info(f"[QQOfficial] New message registered: {message_id[:50]}...")
            return False
```

Then in `QQOfficialPlatformAdapter.__init__`:

```python
class QQOfficialPlatformAdapter(PlatformAdapter):
    def __init__(...):
        ...
        self._deduplicator = MessageDeduplicator(
            message_id_ttl_seconds=30 * 60,
            content_key_ttl_seconds=3.0,
        )
```

And replace `_is_duplicate_message` with a thin wrapper (or just call the helper directly):

```python
    async def _is_duplicate_message(
        self, message_id: str, content: str = "", sender_id: str = ""
    ) -> bool:
        return await self._deduplicator.is_duplicate(message_id, content, sender_id)
```

This keeps behavior identical but removes the low-level cache/hashing mechanics from the adapter itself.

### 2. Centralize sender/content extraction for handlers

You already added `_get_sender_id`. You can also centralize the content extraction and reuse in all handlers:

```python
class botClient(Client):
    ...

    def _extract_dedup_key(self, message) -> tuple[str, str]:
        sender_id = self._get_sender_id(message)
        content = getattr(message, "content", "") or ""
        return sender_id, content

    async def _should_drop_message(self, message) -> bool:
        sender_id, content = self._extract_dedup_key(message)
        return await self.platform._is_duplicate_message(message.id, content, sender_id)

    async def on_group_at_message_create(self, message: botpy.message.GroupMessage) -> None:
        if await self._should_drop_message(message):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
            message,
            MessageType.GROUP_MESSAGE,
        )
        ...
```

Then apply the same `_should_drop_message` call to `on_at_message_create`, `on_direct_message_create`, and `on_c2c_message_create` to remove repeated sender/content retrieval and dedup calls while preserving the dedup checks.
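A quick check of the two-TTL behavior is also possible in isolation. The class below reproduces the suggested `MessageDeduplicator` with stdlib imports added and the log calls trimmed; the content-key TTL is shrunk to 0.05 s only for the demo:

```python
import asyncio
import hashlib
import time


class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
    ) -> None:
        self._message_id_timestamps: dict[str, float] = {}
        self._content_key_timestamps: dict[str, float] = {}
        self._message_id_ttl_seconds = message_id_ttl_seconds
        self._content_key_ttl_seconds = content_key_ttl_seconds
        self._lock = asyncio.Lock()

    def _clean_expired(self, now: float) -> None:
        self._message_id_timestamps = {
            k: ts
            for k, ts in self._message_id_timestamps.items()
            if now - ts <= self._message_id_ttl_seconds
        }
        self._content_key_timestamps = {
            k: ts
            for k, ts in self._content_key_timestamps.items()
            if now - ts <= self._content_key_ttl_seconds
        }

    async def is_duplicate(
        self, message_id: str, content: str = "", sender_id: str = ""
    ) -> bool:
        async with self._lock:
            now = time.monotonic()
            self._clean_expired(now)
            if message_id in self._message_id_timestamps:
                return True  # long-TTL guard against re-delivered callbacks
            content_key = None
            if content and sender_id:
                content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
                content_key = f"{sender_id}:{content_hash}"
                if content_key in self._content_key_timestamps:
                    return True  # short-TTL guard against near-instant repeats
            self._message_id_timestamps[message_id] = now
            if content_key is not None:
                self._content_key_timestamps[content_key] = now
            return False


async def demo() -> tuple[bool, bool, bool]:
    dedup = MessageDeduplicator(content_key_ttl_seconds=0.05)
    a = await dedup.is_duplicate("id-1", "hello", "u1")  # False: first delivery
    b = await dedup.is_duplicate("id-2", "hello", "u1")  # True: same sender+content in window
    await asyncio.sleep(0.2)                             # content-key TTL lapses
    c = await dedup.is_duplicate("id-3", "hello", "u1")  # False: repeat input allowed again
    return a, b, c


results = asyncio.run(demo())
```

This is exactly the misjudgment the PR fixes: with the old single long-lived content cache, the third call would still have been blocked.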



Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]消息重复读取 (duplicate message reads)

1 participant
