Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly upgrades the knowledge base system by introducing advanced capabilities for managing and interacting with indexed data. It enables dynamic switching between different embedding models without requiring a full re-index, and provides a novel approach to indexing structured documents, allowing LLMs to retrieve information more efficiently and contextually. These changes aim to make the knowledge base more flexible, scalable, and intelligent in its retrieval mechanisms.
Hey - I've found 4 issues, and left some high level feedback:
- In `IndexRebuilder.sync` you build a new `EmbeddingStorage` and mutate it (delete/insert) but never call `save_index`/`close` on that instance before `FaissVecDB.switch_index` creates a second `EmbeddingStorage` on the same path, so the rebuilt index is never persisted to disk; consider either reusing the same storage in `switch_index` or explicitly saving/closing the temporary storage before switching.
- The structured upload path in `KBHelper._upload_document_structured` calls `upsert_doc_section` in a loop, each opening its own transaction; this could be significantly faster and reduce DB overhead if you batch-insert or upsert all sections in a single `session.begin()` block.
- In `IndexRebuilder.sync` you call `provider.get_embeddings` in a Python loop per batch instead of using the existing `get_embeddings_batch` API, which will limit throughput on large rebuilds; switching to the batched interface would align with the rest of the vector code and improve rebuild performance.
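The single-transaction point in the second bullet can be sketched with plain sqlite3 (the project itself uses async SQLAlchemy; the table and column names below are assumed for illustration only):

```python
import sqlite3

def upsert_sections_batched(
    conn: sqlite3.Connection,
    doc_id: str,
    sections: list[tuple[str, int]],  # (section_path, section_level)
) -> None:
    """Write every section row inside one transaction instead of one per call."""
    with conn:  # a single transaction: commit on success, rollback on error
        conn.executemany(
            "INSERT OR REPLACE INTO doc_section "
            "(doc_id, section_path, section_level, sort_order) "
            "VALUES (?, ?, ?, ?)",
            [
                (doc_id, path, level, i)
                for i, (path, level) in enumerate(sections)
            ],
        )
```

With `with conn:` the whole batch commits or rolls back together, so a failure cannot leave a partially written section list, and the per-row transaction overhead disappears.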
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `IndexRebuilder.sync` you build a new `EmbeddingStorage` and mutate it (delete/insert) but never call `save_index`/`close` on that instance before `FaissVecDB.switch_index` creates a *second* `EmbeddingStorage` on the same path, so the rebuilt index is never persisted to disk; consider either reusing the same storage in `switch_index` or explicitly saving/closing the temporary storage before switching.
- The structured upload path in `KBHelper._upload_document_structured` calls `upsert_doc_section` in a loop, each opening its own transaction; this could be significantly faster and reduce DB overhead if you batch-insert or upsert all sections in a single `session.begin()` block.
- In `IndexRebuilder.sync` you call `provider.get_embeddings` in a Python loop per batch instead of using the existing `get_embeddings_batch` API, which will limit throughput on large rebuilds; switching to the batched interface would align with the rest of the vector code and improve rebuild performance.
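As a rough illustration of the third point, a batched embedding helper gains throughput by issuing provider requests concurrently under a semaphore rather than awaiting each batch in sequence. This is a generic sketch, not the project's actual `get_embeddings_batch` implementation; `embed` stands in for the provider call:

```python
import asyncio

async def get_embeddings_batch(texts, embed, batch_size=32, tasks_limit=4):
    # Split the inputs into provider-sized batches.
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    sem = asyncio.Semaphore(tasks_limit)

    async def run(batch):
        async with sem:  # cap concurrent provider requests
            return await embed(batch)

    # gather() preserves batch order, so vectors line up with inputs.
    results = await asyncio.gather(*(run(b) for b in batches))
    return [vec for chunk in results for vec in chunk]
```

Because up to `tasks_limit` batches are in flight at once, total wall time approaches the slowest batch rather than the sum of all batches.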
## Individual Comments
### Comment 1
<location path="astrbot/core/knowledge_base/kb_db_sqlite.py" line_range="177-186" />
<code_context>
) -> None:
"""导出 FAISS 索引文件"""
try:
- index_path = kb_helper.kb_dir / "index.faiss"
- if index_path.exists():
</code_context>
<issue_to_address>
**suggestion (bug_risk):** v3 migration swallows all exceptions, which can hide real migration failures.
`migrate_to_v3` wraps each `ALTER`/`UPDATE` in `except Exception: pass`, which suppresses all failures (including syntax, permissions, or data issues) and can leave migrations partially applied with no signal.
Instead, only handle the expected idempotency cases explicitly (e.g. specific DB errors for duplicate/missing columns, like in `migrate_to_v2`), and log + re-raise any other exceptions so failed migrations are visible to operators.
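A minimal sketch of that error-handling shape, using sqlite3 with assumed statement text (the real migration statements live in `kb_db_sqlite.py`): only the expected duplicate-column error is treated as idempotent, while everything else is logged and re-raised.

```python
import logging
import sqlite3

logger = logging.getLogger(__name__)

def migrate_to_v3(conn: sqlite3.Connection) -> None:
    # Hypothetical statements; the actual v3 migration differs.
    statements = [
        "ALTER TABLE kb_document ADD COLUMN index_mode TEXT DEFAULT 'flat'",
        "UPDATE kb_document SET index_mode = 'flat' WHERE index_mode IS NULL",
    ]
    for stmt in statements:
        try:
            conn.execute(stmt)
        except sqlite3.OperationalError as e:
            # Expected idempotency case: re-running the ALTER on a column
            # that already exists is fine, so skip it.
            if "duplicate column name" in str(e).lower():
                continue
            # Anything else is a real failure: make it visible and abort.
            logger.error("v3 migration failed on %r: %s", stmt, e)
            raise
    conn.commit()
```

Re-running the migration is then safe, but a genuine syntax, permission, or data error still surfaces to operators instead of silently leaving the schema half-migrated.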
</issue_to_address>
### Comment 2
<location path="astrbot/core/knowledge_base/kb_mgr.py" line_range="38" />
<code_context>
self._session_deleted_callback_registered = False
self.kb_insts: dict[str, KBHelper] = {}
+ self.rebuild_tasks: dict[str, dict] = {}
+ self.kb_rebuild_task_map: dict[str, str] = {}
+ self._running_rebuild_tasks: dict[str, asyncio.Task] = {}
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing a typed RebuildTask dataclass and central registries to replace ad‑hoc dicts and closures in the index rebuild flow while keeping the external behavior unchanged.
You can significantly reduce complexity in the rebuild flow by introducing a small typed abstraction instead of ad‑hoc dicts and nested closures, while keeping all behavior intact.
### 1. Replace loosely‑typed dicts with a `@dataclass`
Right now:
- `rebuild_tasks: dict[str, dict]`
- `kb_rebuild_task_map: dict[str, str]`
- `_running_rebuild_tasks: dict[str, asyncio.Task]`
all track aspects of the same concept using dicts with magic keys (`"status"`, `"stage"`, etc.). You can centralize this into a dataclass with explicit fields:
```python
# somewhere near top-level
from dataclasses import dataclass, field
from typing import Optional, Literal
RebuildStatus = Literal["processing", "completed", "failed"]
RebuildStage = Literal["prepare", "finished"]
@dataclass
class RebuildTask:
task_id: str
kb_id: str
provider_id: str
status: RebuildStatus = "processing"
stage: RebuildStage = "prepare"
current: int = 0
total: int = 0
error: Optional[str] = None
task: Optional[asyncio.Task] = field(default=None, repr=False)
```
Then, in `__init__`, use a single registry keyed by `task_id` plus a thin kb→task mapping:
```python
self._rebuild_tasks: dict[str, RebuildTask] = {}
self._kb_to_task_id: dict[str, str] = {}
```
### 2. Simplify `start_rebuild_index` and progress updates
Refactor `start_rebuild_index` to work with `RebuildTask` instead of free‑form dicts and to avoid nested closures capturing outer state:
```python
async def start_rebuild_index(
self,
kb_id: str,
new_provider_id: str,
batch_size: int = 32,
) -> str:
kb_helper = await self.get_kb(kb_id)
if not kb_helper:
raise ValueError("知识库不存在")
existing_task_id = self._kb_to_task_id.get(kb_id)
if existing_task_id:
existing = self._rebuild_tasks.get(existing_task_id)
if existing and existing.status == "processing":
return existing_task_id
task_id = str(uuid.uuid4())
kb_helper.last_rebuild_task_id = task_id
rebuild_task = RebuildTask(
task_id=task_id,
kb_id=kb_id,
provider_id=new_provider_id,
)
self._kb_to_task_id[kb_id] = task_id
self._rebuild_tasks[task_id] = rebuild_task
async def progress(stage: str, current: int, total: int) -> None:
t = self._rebuild_tasks.get(task_id)
if not t:
return
t.stage = stage # type: ignore[assignment]
t.current = current
t.total = total
async def runner() -> None:
try:
await self.index_rebuilder.sync(
kb_helper=kb_helper,
new_provider_id=new_provider_id,
batch_size=batch_size,
progress_callback=progress,
)
t = self._rebuild_tasks.get(task_id)
if t:
t.status = "completed"
t.stage = "finished"
except Exception as e:
logger.error(
"Index rebuild failed for kb %s provider %s: %s",
kb_id,
new_provider_id,
e,
)
logger.error(traceback.format_exc())
t = self._rebuild_tasks.get(task_id)
if t:
t.status = "failed"
t.error = str(e)
finally:
task = rebuild_task.task
if task and not task.done():
task.cancel()
self._running_rebuild_tasks.pop(task_id, None)
rebuild_task.task = asyncio.create_task(runner())
self._running_rebuild_tasks[task_id] = rebuild_task.task
return task_id
```
This keeps the same behavior but:
- Removes unstructured dicts and magic strings.
- Groups all state about a rebuild into one object.
- Makes the progress callback logic clearer and safer.
### 3. Simplify `get_rebuild_progress`
With the dataclass, `get_rebuild_progress` no longer needs to manipulate dicts directly:
```python
def get_rebuild_progress(
self,
kb_id: str | None = None,
task_id: str | None = None,
) -> dict | None:
target_task_id = task_id or (kb_id and self._kb_to_task_id.get(kb_id))
if not target_task_id:
return None
t = self._rebuild_tasks.get(target_task_id)
if not t:
return None
# Keep external API unchanged: return a dict
return {
"task_id": t.task_id,
"kb_id": t.kb_id,
"provider_id": t.provider_id,
"status": t.status,
"stage": t.stage,
"current": t.current,
"total": t.total,
"error": t.error,
}
```
This way:
- The internal model is strongly typed and centralized.
- The external API remains exactly as it was (still returns a dict).
### 4. Clean up `terminate` for rebuild tasks
`terminate` can cancel using the typed tasks:
```python
async def terminate(self) -> None:
for t in list(self._rebuild_tasks.values()):
if t.task and not t.task.done():
t.task.cancel()
self._running_rebuild_tasks.clear()
# existing kb cleanup remains...
```
These focused changes keep all functionality (including the public dict-based progress interface and auto‑rebuild behavior) while consolidating job state and reducing the cognitive load around rebuild index management.
</issue_to_address>
### Comment 3
<location path="astrbot/core/db/vec_db/faiss_impl/vec_db.py" line_range="222" />
<code_context>
+ metadata_filters=metadata_filters,
+ )
+
+ async def switch_index(
+ self,
+ index_store_path: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider tightening the `_index_switch_lock` critical section in `switch_index` so it only guards the atomic swap of index-related fields instead of the whole storage creation and teardown process.
You can reduce both complexity and contention without changing behavior by tightening the critical section in `switch_index`.
Currently, `switch_index` does *everything* under `_index_switch_lock`: creating the new storage (which may be slow) and closing the old one. That’s exactly what the reviewer is pointing at: the lock is held across long-running I/O, serializing unrelated operations and making the lock’s purpose harder to reason about.
You can keep the same semantics but only hold the lock around the *field swap*; new storage is created before acquiring the lock, and old storage is closed after releasing it:
```python
async def switch_index(
self,
index_store_path: str,
embedding_provider: EmbeddingProvider,
rerank_provider: RerankProvider | None = None,
) -> None:
"""Hot-switch to a new FAISS index and embedding provider."""
# 1. Create new storage *before* taking the lock.
try:
new_storage = EmbeddingStorage(
embedding_provider.get_dim(),
index_store_path,
)
except Exception:
# If creation fails, keep old storage intact.
raise
# 2. Only keep the lock for the atomic swap of references.
async with self._index_switch_lock:
old_storage = self.embedding_storage
self.index_store_path = index_store_path
self.embedding_provider = embedding_provider
self.rerank_provider = rerank_provider
self.embedding_storage = new_storage
# 3. Close the old storage after releasing the lock.
if old_storage:
await old_storage.close()
```
This keeps:
- All other methods still serialized vs. index switches (no functional change).
- The swap of `embedding_storage`/`embedding_provider` atomic and easy to reason about.
But it removes unnecessary lock contention and reduces conceptual complexity: `_index_switch_lock` now clearly protects just the “which index are we using?” pointers, not the entire lifecycle of storage creation and teardown.
</issue_to_address>
### Comment 4
<location path="astrbot/core/knowledge_base/kb_helper.py" line_range="387" />
<code_context>
raise e
+ async def _upload_document_structured(
+ self,
+ file_name: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared embedding and indexing workflow into a dedicated helper so both flat and structured upload paths reuse the same logic instead of duplicating it.
You can reduce the branching/duplication by extracting the common “embed + persist + stats/refresh” pipeline into a small helper that both flat and structured paths call.
Right now, both `upload_document` and `_upload_document_structured`:
- Generate `doc_id`
- Build `chunk_ids`, `contents`, `metadatas`
- Call `get_ep()` and do batched embeddings with progress
- Write to `vec_db.document_storage` and `vec_db.embedding_storage`
- Create and persist a `KBDocument`
- Update KB stats and refresh KB/doc
Only the “how to build `contents` + `metadatas` + sections” differs. You can encapsulate the common tail in a helper and keep the mode-specific logic small.
For example:
```python
async def _index_chunks(
self,
*,
doc_id: str,
contents: list[str],
metadatas: list[dict],
index_mode: str,
file_name: str,
file_type: str,
file_size: int,
batch_size: int,
tasks_limit: int,
max_retries: int,
progress_callback=None,
) -> KBDocument:
await self._ensure_vec_db()
ep = await self.get_ep()
async def embedding_progress(current: int, total: int) -> None:
if progress_callback:
await progress_callback("embedding", current, total)
vectors = await ep.get_embeddings_batch(
contents,
batch_size=batch_size,
tasks_limit=tasks_limit,
max_retries=max_retries,
progress_callback=embedding_progress if progress_callback else None,
)
chunk_ids = [str(uuid.uuid4()) for _ in contents]
int_ids = await self.vec_db.document_storage.insert_documents_batch(
chunk_ids, contents, metadatas,
)
await self.vec_db.embedding_storage.insert_batch(
vectors=np.array(vectors, dtype=np.float32),
ids=int_ids,
)
kb_doc = KBDocument(
doc_id=doc_id,
kb_id=self.kb.kb_id,
doc_name=file_name,
file_type=file_type,
file_size=file_size,
file_path="",
index_mode=index_mode,
chunk_count=len(contents),
media_count=0,
)
async with self.kb_db.get_db() as session:
async with session.begin():
session.add(kb_doc)
await session.refresh(kb_doc)
await self.kb_db.update_kb_stats(kb_id=self.kb.kb_id, vec_db=self.vec_db) # type: ignore[arg-type]
await self.refresh_kb()
await self.refresh_document(doc_id)
return kb_doc
```
Then the two modes become mostly about preparing inputs:
**Structured:**
```python
async def _upload_document_structured(...):
if file_content is None:
raise ValueError("结构化上传需要 file_content")
doc_id = str(uuid.uuid4())
parser = await select_parser(f".{file_type}")
if progress_callback:
await progress_callback("parsing", 0, 100)
parse_result = await parser.parse(file_content, file_name)
if progress_callback:
await progress_callback("parsing", 100, 100)
structure_parser = StructureParser()
if progress_callback:
await progress_callback("chunking", 0, 100)
nodes = await structure_parser.parse_structure(parse_result.text, file_type)
sections = structure_parser.flatten(nodes)
if not sections:
logger.warning("Structured parse failed, fallback to flat mode for %s", file_name)
return await self.upload_document(
file_name=file_name,
file_content=file_content,
file_type=file_type,
index_mode="flat",
batch_size=batch_size,
tasks_limit=tasks_limit,
max_retries=max_retries,
progress_callback=progress_callback,
)
contents = [section.path for section in sections]
metadatas = [
{
"kb_id": self.kb.kb_id,
"kb_doc_id": doc_id,
"chunk_index": i,
"index_mode": "structure",
"section_path": section.path,
"section_level": section.level,
}
for i, section in enumerate(sections)
]
if progress_callback:
await progress_callback("chunking", 100, 100)
kb_doc = await self._index_chunks(
doc_id=doc_id,
contents=contents,
metadatas=metadatas,
index_mode="structure",
file_name=file_name,
file_type=file_type,
file_size=len(file_content),
batch_size=batch_size,
tasks_limit=tasks_limit,
max_retries=max_retries,
progress_callback=progress_callback,
)
for i, section in enumerate(sections):
await self.kb_db.upsert_doc_section(
DocSection(
doc_id=doc_id,
kb_id=self.kb.kb_id,
section_path=section.path,
section_level=section.level,
section_title=section.title,
section_body=section.body,
parent_section_id=None,
sort_order=i,
),
)
return kb_doc
```
**Flat:**
Inside `upload_document`, after you’ve built `chunks_text` and flat metadatas:
```python
contents = chunks_text
metadatas = [
{
"kb_id": self.kb.kb_id,
"kb_doc_id": doc_id,
"chunk_index": i,
"index_mode": "flat",
}
for i, _ in enumerate(chunks_text)
]
kb_doc = await self._index_chunks(
doc_id=doc_id,
contents=contents,
metadatas=metadatas,
index_mode="flat",
file_name=file_name,
file_type=file_type,
file_size=file_size,
batch_size=batch_size,
tasks_limit=tasks_limit,
max_retries=max_retries,
progress_callback=progress_callback,
)
```
This keeps all current behavior (`index_mode`, structured sections, progress reporting) but concentrates the critical “indexing pipeline” into one place. Future changes (e.g., switching vector DB, changing metadata shape, altering stats/refresh behavior) will only need to be done in `_index_chunks`, reducing complexity and the chance of subtle drift between flat and structured paths.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| self._session_deleted_callback_registered = False | ||
|
|
||
| self.kb_insts: dict[str, KBHelper] = {} | ||
| self.rebuild_tasks: dict[str, dict] = {} |
There was a problem hiding this comment.
issue (complexity): Consider introducing a typed `RebuildTask` dataclass and central registries to replace ad-hoc dicts and closures in the index rebuild flow while keeping the external behavior unchanged.

You can significantly reduce complexity in the rebuild flow by introducing a small typed abstraction instead of ad-hoc dicts and nested closures, while keeping all behavior intact.

1. Replace loosely-typed dicts with a `@dataclass`

Right now:

- `rebuild_tasks: dict[str, dict]`
- `kb_rebuild_task_map: dict[str, str]`
- `_running_rebuild_tasks: dict[str, asyncio.Task]`

all track aspects of the same concept using dicts with magic keys (`"status"`, `"stage"`, etc.). You can centralize this into a dataclass with explicit fields:

```python
# somewhere near top-level
from dataclasses import dataclass, field
from typing import Optional, Literal

RebuildStatus = Literal["processing", "completed", "failed"]
RebuildStage = Literal["prepare", "finished"]

@dataclass
class RebuildTask:
    task_id: str
    kb_id: str
    provider_id: str
    status: RebuildStatus = "processing"
    stage: RebuildStage = "prepare"
    current: int = 0
    total: int = 0
    error: Optional[str] = None
    task: Optional[asyncio.Task] = field(default=None, repr=False)
```

Then, in `__init__`, use a single registry keyed by `task_id` plus a thin kb→task mapping:

```python
self._rebuild_tasks: dict[str, RebuildTask] = {}
self._kb_to_task_id: dict[str, str] = {}
```

2. Simplify `start_rebuild_index` and progress updates

Refactor `start_rebuild_index` to work with `RebuildTask` instead of free-form dicts and to avoid nested closures capturing outer state:

```python
async def start_rebuild_index(
    self,
    kb_id: str,
    new_provider_id: str,
    batch_size: int = 32,
) -> str:
    kb_helper = await self.get_kb(kb_id)
    if not kb_helper:
        raise ValueError("Knowledge base does not exist")

    existing_task_id = self._kb_to_task_id.get(kb_id)
    if existing_task_id:
        existing = self._rebuild_tasks.get(existing_task_id)
        if existing and existing.status == "processing":
            return existing_task_id

    task_id = str(uuid.uuid4())
    kb_helper.last_rebuild_task_id = task_id
    rebuild_task = RebuildTask(
        task_id=task_id,
        kb_id=kb_id,
        provider_id=new_provider_id,
    )
    self._kb_to_task_id[kb_id] = task_id
    self._rebuild_tasks[task_id] = rebuild_task

    async def progress(stage: str, current: int, total: int) -> None:
        t = self._rebuild_tasks.get(task_id)
        if not t:
            return
        t.stage = stage  # type: ignore[assignment]
        t.current = current
        t.total = total

    async def runner() -> None:
        try:
            await self.index_rebuilder.sync(
                kb_helper=kb_helper,
                new_provider_id=new_provider_id,
                batch_size=batch_size,
                progress_callback=progress,
            )
            t = self._rebuild_tasks.get(task_id)
            if t:
                t.status = "completed"
                t.stage = "finished"
        except Exception as e:
            logger.error(
                "Index rebuild failed for kb %s provider %s: %s",
                kb_id,
                new_provider_id,
                e,
            )
            logger.error(traceback.format_exc())
            t = self._rebuild_tasks.get(task_id)
            if t:
                t.status = "failed"
                t.error = str(e)
        finally:
            # No self-cancel here: the task is finishing anyway, so just
            # deregister it from the running-task registry.
            self._running_rebuild_tasks.pop(task_id, None)

    rebuild_task.task = asyncio.create_task(runner())
    self._running_rebuild_tasks[task_id] = rebuild_task.task
    return task_id
```

This keeps the same behavior but:

- Removes unstructured dicts and magic strings.
- Groups all state about a rebuild into one object.
- Makes the progress callback logic clearer and safer.

3. Simplify `get_rebuild_progress`

With the dataclass, `get_rebuild_progress` no longer needs to manipulate dicts directly:

```python
def get_rebuild_progress(
    self,
    kb_id: str | None = None,
    task_id: str | None = None,
) -> dict | None:
    target_task_id = task_id or (kb_id and self._kb_to_task_id.get(kb_id))
    if not target_task_id:
        return None
    t = self._rebuild_tasks.get(target_task_id)
    if not t:
        return None
    # Keep external API unchanged: return a dict
    return {
        "task_id": t.task_id,
        "kb_id": t.kb_id,
        "provider_id": t.provider_id,
        "status": t.status,
        "stage": t.stage,
        "current": t.current,
        "total": t.total,
        "error": t.error,
    }
```

This way:

- The internal model is strongly typed and centralized.
- The external API remains exactly as it was (still returns a dict).

4. Clean up `terminate` for rebuild tasks

`terminate` can cancel using the typed tasks:

```python
async def terminate(self) -> None:
    for t in list(self._rebuild_tasks.values()):
        if t.task and not t.task.done():
            t.task.cancel()
    self._running_rebuild_tasks.clear()
    # existing kb cleanup remains...
```

These focused changes keep all functionality (including the public dict-based progress interface and auto-rebuild behavior) while consolidating job state and reducing the cognitive load around rebuild index management.
```python
raise e

async def _upload_document_structured(
```
issue (complexity): Consider extracting the shared embedding and indexing workflow into a dedicated helper so both flat and structured upload paths reuse the same logic instead of duplicating it.

You can reduce the branching/duplication by extracting the common "embed + persist + stats/refresh" pipeline into a small helper that both flat and structured paths call.

Right now, both `upload_document` and `_upload_document_structured`:

- Generate `doc_id`
- Build `chunk_ids`, `contents`, `metadatas`
- Call `get_ep()` and do batched embeddings with progress
- Write to `vec_db.document_storage` and `vec_db.embedding_storage`
- Create and persist a `KBDocument`
- Update KB stats and refresh KB/doc

Only the "how to build `contents` + `metadatas` + sections" differs. You can encapsulate the common tail in a helper and keep the mode-specific logic small.

For example:

```python
async def _index_chunks(
    self,
    *,
    doc_id: str,
    contents: list[str],
    metadatas: list[dict],
    index_mode: str,
    file_name: str,
    file_type: str,
    file_size: int,
    batch_size: int,
    tasks_limit: int,
    max_retries: int,
    progress_callback=None,
) -> KBDocument:
    await self._ensure_vec_db()
    ep = await self.get_ep()

    async def embedding_progress(current: int, total: int) -> None:
        if progress_callback:
            await progress_callback("embedding", current, total)

    vectors = await ep.get_embeddings_batch(
        contents,
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=embedding_progress if progress_callback else None,
    )
    chunk_ids = [str(uuid.uuid4()) for _ in contents]
    int_ids = await self.vec_db.document_storage.insert_documents_batch(
        chunk_ids, contents, metadatas,
    )
    await self.vec_db.embedding_storage.insert_batch(
        vectors=np.array(vectors, dtype=np.float32),
        ids=int_ids,
    )
    kb_doc = KBDocument(
        doc_id=doc_id,
        kb_id=self.kb.kb_id,
        doc_name=file_name,
        file_type=file_type,
        file_size=file_size,
        file_path="",
        index_mode=index_mode,
        chunk_count=len(contents),
        media_count=0,
    )
    async with self.kb_db.get_db() as session:
        async with session.begin():
            session.add(kb_doc)
        await session.refresh(kb_doc)
    await self.kb_db.update_kb_stats(kb_id=self.kb.kb_id, vec_db=self.vec_db)  # type: ignore[arg-type]
    await self.refresh_kb()
    await self.refresh_document(doc_id)
    return kb_doc
```

Then the two modes become mostly about preparing inputs.

Structured:

```python
async def _upload_document_structured(...):
    if file_content is None:
        raise ValueError("file_content is required for structured upload")
    doc_id = str(uuid.uuid4())
    parser = await select_parser(f".{file_type}")
    if progress_callback:
        await progress_callback("parsing", 0, 100)
    parse_result = await parser.parse(file_content, file_name)
    if progress_callback:
        await progress_callback("parsing", 100, 100)
    structure_parser = StructureParser()
    if progress_callback:
        await progress_callback("chunking", 0, 100)
    nodes = await structure_parser.parse_structure(parse_result.text, file_type)
    sections = structure_parser.flatten(nodes)
    if not sections:
        logger.warning("Structured parse failed, fallback to flat mode for %s", file_name)
        return await self.upload_document(
            file_name=file_name,
            file_content=file_content,
            file_type=file_type,
            index_mode="flat",
            batch_size=batch_size,
            tasks_limit=tasks_limit,
            max_retries=max_retries,
            progress_callback=progress_callback,
        )
    contents = [section.path for section in sections]
    metadatas = [
        {
            "kb_id": self.kb.kb_id,
            "kb_doc_id": doc_id,
            "chunk_index": i,
            "index_mode": "structure",
            "section_path": section.path,
            "section_level": section.level,
        }
        for i, section in enumerate(sections)
    ]
    if progress_callback:
        await progress_callback("chunking", 100, 100)
    kb_doc = await self._index_chunks(
        doc_id=doc_id,
        contents=contents,
        metadatas=metadatas,
        index_mode="structure",
        file_name=file_name,
        file_type=file_type,
        file_size=len(file_content),
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=progress_callback,
    )
    for i, section in enumerate(sections):
        await self.kb_db.upsert_doc_section(
            DocSection(
                doc_id=doc_id,
                kb_id=self.kb.kb_id,
                section_path=section.path,
                section_level=section.level,
                section_title=section.title,
                section_body=section.body,
                parent_section_id=None,
                sort_order=i,
            ),
        )
    return kb_doc
```

Flat:

Inside `upload_document`, after you've built `chunks_text` and flat `metadatas`:

```python
contents = chunks_text
metadatas = [
    {
        "kb_id": self.kb.kb_id,
        "kb_doc_id": doc_id,
        "chunk_index": i,
        "index_mode": "flat",
    }
    for i, _ in enumerate(chunks_text)
]
kb_doc = await self._index_chunks(
    doc_id=doc_id,
    contents=contents,
    metadatas=metadatas,
    index_mode="flat",
    file_name=file_name,
    file_type=file_type,
    file_size=file_size,
    batch_size=batch_size,
    tasks_limit=tasks_limit,
    max_retries=max_retries,
    progress_callback=progress_callback,
)
```

This keeps all current behavior (`index_mode`, structured sections, progress reporting) but concentrates the critical "indexing pipeline" into one place. Future changes (e.g., switching vector DB, changing metadata shape, altering stats/refresh behavior) will only need to be done in `_index_chunks`, reducing complexity and the chance of subtle drift between flat and structured paths.
Code Review
This pull request introduces a significant and well-designed enhancement to the knowledge base (RAG) architecture. The key features, including provider-isolated FAISS indexes for hot-switching embedding models, structured document indexing, and safe online index rebuilding, are impressive and add a lot of flexibility. The addition of detailed architecture documentation and comprehensive tests is also highly commendable.
My review has identified a few minor issues:
- A potential for silent failures in the database migration logic due to overly broad exception handling.
- An inconsistency in the path separator used for structured documents between the implementation and the documentation.
- A small typo in the new architecture document.
Overall, this is an excellent contribution that greatly improves the knowledge base system's capabilities and robustness. The provided feedback aims to polish these final details.
Note: Security Review did not run due to the size of the PR.
```python
async def migrate_to_v3(self) -> None:
    """Run the knowledge base database v3 migration.

    Changes:
    - knowledge_bases.default_index_mode
    - kb_documents.index_mode
    """
    async with self.get_db() as session:
        session: AsyncSession
        async with session.begin():
            try:
                await session.execute(
                    text(
                        "ALTER TABLE knowledge_bases "
                        "ADD COLUMN default_index_mode VARCHAR(20) DEFAULT 'flat'",
                    ),
                )
            except Exception:
                pass
            try:
                await session.execute(
                    text(
                        "ALTER TABLE kb_documents "
                        "ADD COLUMN index_mode VARCHAR(20) DEFAULT 'flat'",
                    ),
                )
            except Exception:
                pass
            try:
                await session.execute(
                    text(
                        "UPDATE knowledge_bases SET default_index_mode = 'flat' "
                        "WHERE default_index_mode IS NULL OR default_index_mode = ''",
                    ),
                )
            except Exception:
                pass
            try:
                await session.execute(
                    text(
                        "UPDATE kb_documents SET index_mode = 'flat' "
                        "WHERE index_mode IS NULL OR index_mode = ''",
                    ),
                )
            except Exception:
                pass
            await session.commit()
```
The try...except Exception: pass blocks in this migration function are too broad and can silently swallow unexpected errors, making debugging difficult. For robustness, please consider adopting the pattern from migrate_to_v2, where specific exceptions like (ProgrammingError, OperationalError) are caught and inspected. This ensures that only expected errors (like a column already existing) are ignored, while other potential issues are properly logged or raised.
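One way to narrow the handling without pulling SQLAlchemy into the decision is to classify the error message first and only swallow the "column already exists" case. This is a minimal sketch: `is_duplicate_column_error` is a hypothetical helper, and the matched substrings assume SQLite's "duplicate column name" / MySQL's "Duplicate column name" wording.

```python
def is_duplicate_column_error(exc: Exception) -> bool:
    """Return True only for 'column already exists' style errors.

    Hypothetical helper: the substrings below assume SQLite's
    "duplicate column name" and MySQL's "Duplicate column name" wording.
    """
    msg = str(exc).lower()
    return "duplicate column" in msg or "already exists" in msg
```

Each `except Exception: pass` in `migrate_to_v3` could then become `except (ProgrammingError, OperationalError) as e:` followed by `if not is_duplicate_column_error(e): raise`, so locked databases, SQL syntax errors, and connection failures still surface instead of being silently ignored.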
```python
stack.pop()

parent_path = stack[-1].path if stack else ""
path = f"{parent_path} > {title}" if parent_path else title
```
There appears to be an inconsistency in the section path separator. This implementation uses > as the separator. However, the architecture documentation (docs/knowledge-base-architecture.md) and the ReadDocumentSectionTool's description in astrbot/core/astr_main_agent_resources.py both use / in their examples (e.g., 'Chapter 1/API'). This could cause confusion and issues for the astr_kb_read_section tool. To ensure consistency, please align the separator across the implementation, documentation, and tool descriptions. Using > is a good choice to avoid ambiguity with file path separators, but all related code and documentation should reflect this.
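One way to prevent this kind of drift is to define the separator exactly once and route every path construction through it. A small sketch under that assumption (`SECTION_PATH_SEP` and `build_section_path` are hypothetical names, not part of the PR):

```python
# Hypothetical single source of truth for the section path separator,
# which the structure parser, the docs, and the tool description
# could all reference instead of hard-coding "/" or " > " separately.
SECTION_PATH_SEP = " > "

def build_section_path(parent_path: str, title: str) -> str:
    """Join a parent path and a section title with the shared separator."""
    return f"{parent_path}{SECTION_PATH_SEP}{title}" if parent_path else title
```

With this, `build_section_path("Chapter 1", "API")` yields `"Chapter 1 > API"`, and the `astr_kb_read_section` tool description can interpolate `SECTION_PATH_SEP` into its examples rather than repeating a literal separator.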
> Related issue: #5262
> Branch: `rag-enhancement`
> Last updated: 2026-03-08
Force-pushed from 9ec2b61 to 1b1da3a
Implements #5262

Modifications

Refactor the underlying knowledge base architecture to improve stability and robustness:

- Add `kb_db_sqlite.py`, a knowledge base database abstraction layer
- Add `index_rebuilder.py`, an index rebuilder
- Add `structure_parser.py`, a structure parser
- Optimize the FAISS vector database implementation (`vec_db.py`, `document_storage.py`, `embedding_storage.py`)
- Improve the knowledge base manager and helper classes (`kb_mgr.py`, `kb_helper.py`)
- Add knowledge base regression tests in `tests/core/test_kb_architecture_refactor.py`
- Add an architecture document, `docs/knowledge-base-architecture.md`

This is NOT a breaking change.

Screenshots or Test Results

Unit tests and short-term real-device usage tests have both been carried out.

However, the frontend has not been modified yet, so rebuilding the vector index when the embedding model is actively switched from the frontend has not been tested.

Checklist
Summary by Sourcery
Refine the knowledge base architecture to support provider-specific FAISS indexes, structured document indexing, and safe online index rebuilding while keeping backups and retrieval flows compatible.
New Features:
Enhancements:
Documentation:
Tests: