|
|
@@ -1,7 +1,7 @@
|
|
|
"""
|
|
|
-内容块分类模块(二级和三级分类)
|
|
|
+内容块分类模块(二级分类)
|
|
|
|
|
|
-对已经完成一级分类的内容块进行二级和三级分类
|
|
|
+对已经完成一级分类的内容块进行二级分类
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
@@ -22,22 +22,6 @@ from ..config.provider import default_config_provider
|
|
|
from ..utils.prompt_loader import PromptLoader
|
|
|
|
|
|
|
|
|
-# 延迟导入新的三级分类器(避免循环导入)
|
|
|
-_LLM_CONTENT_CLASSIFIER = None
|
|
|
-
|
|
|
-
|
|
|
-def _get_llm_content_classifier():
|
|
|
- """延迟导入 LLMContentClassifier"""
|
|
|
- global _LLM_CONTENT_CLASSIFIER
|
|
|
- if _LLM_CONTENT_CLASSIFIER is None:
|
|
|
- from ...reviewers.utils.llm_content_classifier_v2 import (
|
|
|
- LLMContentClassifier,
|
|
|
- ClassifierConfig
|
|
|
- )
|
|
|
- _LLM_CONTENT_CLASSIFIER = (LLMContentClassifier, ClassifierConfig)
|
|
|
- return _LLM_CONTENT_CLASSIFIER
|
|
|
-
|
|
|
-
|
|
|
def _extract_json(text: str) -> Optional[Dict[str, Any]]:
|
|
|
"""从字符串中提取第一个有效 JSON 对象"""
|
|
|
for pattern in [r"```json\s*(\{.*?})\s*```", r"```\s*(\{.*?})\s*```"]:
|
|
|
@@ -144,353 +128,6 @@ class ChunkClassifier:
|
|
|
|
|
|
return "\n".join(standards_lines) if standards_lines else "(无二级分类标准)", index_mapping
|
|
|
|
|
|
- def _build_tertiary_standards(self, first_category_code: str, second_category_code: str) -> tuple[str, dict]:
|
|
|
- """
|
|
|
- 构建三级分类标准文本
|
|
|
-
|
|
|
- 返回:
|
|
|
- (标准文本, 索引映射字典)
|
|
|
- """
|
|
|
- if first_category_code not in self.classification_tree:
|
|
|
- return "(无三级分类标准)", {}
|
|
|
-
|
|
|
- if second_category_code not in self.classification_tree[first_category_code]:
|
|
|
- return "(无三级分类标准)", {}
|
|
|
-
|
|
|
- third_items = self.classification_tree[first_category_code][second_category_code]["third_items"]
|
|
|
-
|
|
|
- if not third_items:
|
|
|
- return "(无三级分类标准)", {}
|
|
|
-
|
|
|
- standards_lines = [" 0. 非标准项 - 不符合以下任何类别"]
|
|
|
- index_mapping = {0: ("非标准项", "non_standard")}
|
|
|
-
|
|
|
- for idx, third_item in enumerate(third_items, 1):
|
|
|
- third_cn = third_item["third_cn"]
|
|
|
- third_code = third_item["third_code"]
|
|
|
- third_focus = third_item["third_focus"]
|
|
|
-
|
|
|
- # 保存索引映射
|
|
|
- index_mapping[idx] = (third_cn, third_code)
|
|
|
-
|
|
|
- if third_focus and third_focus != "NULL":
|
|
|
- standards_lines.append(f" {idx}. {third_cn} - 关注点:{third_focus}")
|
|
|
- else:
|
|
|
- standards_lines.append(f" {idx}. {third_cn}")
|
|
|
-
|
|
|
- return "\n".join(standards_lines), index_mapping
|
|
|
-
|
|
|
- # 默认模型(三级分类会从 model_setting.yaml 动态加载)
|
|
|
- DEFAULT_MODEL = "qwen3_5_122b_a10b"
|
|
|
-
|
|
|
- # 二级分类模型(从 model_setting.yaml 动态加载,配置 key: doc_classification_secondary)
|
|
|
- @property
|
|
|
- def SECONDARY_MODEL(self) -> str:
|
|
|
- """二级分类模型,从 model_setting.yaml 读取配置"""
|
|
|
- try:
|
|
|
- from foundation.ai.models.model_config_loader import get_model_for_function
|
|
|
- model = get_model_for_function("doc_classification_secondary")
|
|
|
- if model:
|
|
|
- return model
|
|
|
- except Exception as e:
|
|
|
- logger.debug(f"加载二级分类模型配置失败: {e}")
|
|
|
- return "qwen3_5_35b_a3b" # 兜底默认值
|
|
|
-
|
|
|
- async def _call_llm_once(
|
|
|
- self,
|
|
|
- system_prompt: str,
|
|
|
- user_prompt: str,
|
|
|
- model_name: Optional[str] = None
|
|
|
- ) -> Optional[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 单次异步 LLM 调用(使用统一的 GenerateModelClient)
|
|
|
-
|
|
|
- 参数:
|
|
|
- system_prompt: 系统提示词
|
|
|
- user_prompt: 用户提示词
|
|
|
- model_name: 模型名称,默认使用 DEFAULT_MODEL
|
|
|
-
|
|
|
- 失败返回 None,由调用方决定处理逻辑
|
|
|
- """
|
|
|
- if model_name is None:
|
|
|
- model_name = self.DEFAULT_MODEL
|
|
|
-
|
|
|
- try:
|
|
|
- content = await generate_model_client.get_model_generate_invoke(
|
|
|
- trace_id="chunk_classifier",
|
|
|
- system_prompt=system_prompt,
|
|
|
- user_prompt=user_prompt,
|
|
|
- model_name=model_name,
|
|
|
- )
|
|
|
- result = _extract_json(content)
|
|
|
- return result if result is not None else {"raw_content": content}
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[ChunkClassifier] LLM 调用失败: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
- async def _batch_call_llm(
|
|
|
- self,
|
|
|
- requests: List[tuple], # [(system_prompt, user_prompt), ...]
|
|
|
- model_name: Optional[str] = None,
|
|
|
- ) -> List[Optional[Dict[str, Any]]]:
|
|
|
- """
|
|
|
- 并发批量调用 LLM(带信号量控制)
|
|
|
-
|
|
|
- 参数:
|
|
|
- requests: 请求列表,每个元素是 (system_prompt, user_prompt) 元组
|
|
|
- model_name: 指定模型名称,None则使用默认模型
|
|
|
-
|
|
|
- 返回:
|
|
|
- 结果列表,与输入请求一一对应
|
|
|
- """
|
|
|
- semaphore = asyncio.Semaphore(self._concurrency)
|
|
|
-
|
|
|
- async def bounded_call(system_prompt: str, user_prompt: str):
|
|
|
- async with semaphore:
|
|
|
- return await self._call_llm_once(system_prompt, user_prompt, model_name)
|
|
|
-
|
|
|
- tasks = [bounded_call(sp, up) for sp, up in requests]
|
|
|
- return list(await asyncio.gather(*tasks))
|
|
|
-
|
|
|
- async def classify_chunks_secondary_async(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 异步对chunks进行二级分类(全部走LLM,移除本地规则)
|
|
|
-
|
|
|
- 参数:
|
|
|
- chunks: 已完成一级分类的chunk列表
|
|
|
-
|
|
|
- 返回:
|
|
|
- 添加了二级分类字段的chunk列表
|
|
|
- """
|
|
|
- logger.info(f"正在对 {len(chunks)} 个内容块进行二级分类(LLM全量)...")
|
|
|
-
|
|
|
- # 准备LLM请求
|
|
|
- llm_requests = []
|
|
|
- valid_chunks = []
|
|
|
- index_mappings = [] # 保存每个请求对应的索引映射
|
|
|
-
|
|
|
- for chunk in chunks:
|
|
|
- first_category_code = chunk.get("chapter_classification", "")
|
|
|
- chunk_title = chunk.get("section_label", "")
|
|
|
- hierarchy_path = " -> ".join(chunk.get("hierarchy_path", []))
|
|
|
- content = chunk.get("review_chunk_content", "")
|
|
|
- content_preview = content[:300] if content else ""
|
|
|
-
|
|
|
- # 获取一级分类的中文名称
|
|
|
- first_category_cn = self._get_first_category_cn(first_category_code)
|
|
|
-
|
|
|
- # 构建二级分类标准(返回标准文本和索引映射)
|
|
|
- secondary_standards, index_mapping = self._build_secondary_standards(first_category_code)
|
|
|
-
|
|
|
- if secondary_standards == "(无二级分类标准)":
|
|
|
- # 如果没有二级分类标准,跳过
|
|
|
- chunk["secondary_category_cn"] = "无"
|
|
|
- chunk["secondary_category_code"] = "none"
|
|
|
- continue
|
|
|
-
|
|
|
- # 渲染提示词
|
|
|
- prompt = self.prompt_loader.render(
|
|
|
- "chunk_secondary_classification",
|
|
|
- first_category=first_category_cn,
|
|
|
- chunk_title=chunk_title,
|
|
|
- hierarchy_path=hierarchy_path,
|
|
|
- content_preview=content_preview,
|
|
|
- secondary_standards=secondary_standards
|
|
|
- )
|
|
|
-
|
|
|
- llm_requests.append((prompt["system"], prompt["user"]))
|
|
|
- valid_chunks.append(chunk)
|
|
|
- index_mappings.append(index_mapping)
|
|
|
-
|
|
|
- if not llm_requests:
|
|
|
- logger.info("所有内容块都没有二级分类标准,跳过二级分类")
|
|
|
- return chunks
|
|
|
-
|
|
|
- # 全部走LLM分类
|
|
|
- logger.info(f"[二级分类] 全部 {len(valid_chunks)} 个内容块走LLM分类")
|
|
|
-
|
|
|
- llm_results = await self._batch_call_llm(llm_requests, model_name=self.SECONDARY_MODEL)
|
|
|
-
|
|
|
- # 处理LLM结果
|
|
|
- for chunk, llm_result, index_mapping in zip(valid_chunks, llm_results, index_mappings):
|
|
|
- if llm_result and isinstance(llm_result, dict):
|
|
|
- category_index = llm_result.get("category_index")
|
|
|
-
|
|
|
- if isinstance(category_index, int) and category_index in index_mapping:
|
|
|
- secondary_cn, secondary_code = index_mapping[category_index]
|
|
|
- chunk["secondary_category_code"] = secondary_code
|
|
|
- chunk["secondary_category_cn"] = secondary_cn
|
|
|
- else:
|
|
|
- # LLM返回无效,使用非标准项
|
|
|
- chunk["secondary_category_code"] = "non_standard"
|
|
|
- chunk["secondary_category_cn"] = "非标准项"
|
|
|
- else:
|
|
|
- # LLM调用失败
|
|
|
- chunk["secondary_category_code"] = "non_standard"
|
|
|
- chunk["secondary_category_cn"] = "非标准项"
|
|
|
-
|
|
|
- logger.info("二级分类完成!")
|
|
|
- return chunks
|
|
|
-
|
|
|
- async def classify_chunks_tertiary_async(
|
|
|
- self,
|
|
|
- chunks: List[Dict[str, Any]],
|
|
|
- use_enhanced_classifier: bool = True,
|
|
|
- classifier_config: Optional[Any] = None,
|
|
|
- progress_callback: Optional[Any] = None
|
|
|
- ) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 异步对chunks进行三级分类
|
|
|
-
|
|
|
- 参数:
|
|
|
- chunks: 已完成二级分类的chunk列表
|
|
|
- use_enhanced_classifier: 是否使用增强型分类器(行级细粒度、多分类、Embedding优化)
|
|
|
- - True: 使用新的 llm_content_classifier_v2(推荐)
|
|
|
- - False: 使用原有逐chunk分类方式
|
|
|
- classifier_config: 增强型分类器的配置对象(ClassifierConfig),为None时使用默认配置
|
|
|
- progress_callback: 进度回调函数 (completed, total, section_name, success) -> None,支持 async
|
|
|
-
|
|
|
- 返回:
|
|
|
- 添加了三级分类字段的chunk列表
|
|
|
-
|
|
|
- 新增字段(use_enhanced_classifier=True时):
|
|
|
- - tertiary_category_code: 三级分类代码
|
|
|
- - tertiary_category_cn: 三级分类名称
|
|
|
- - tertiary_classification_details: 行级分类详情列表,每个条目包含:
|
|
|
- - third_category_code: 三级分类代码
|
|
|
- - third_category_name: 三级分类名称
|
|
|
- - start_line: 起始行号
|
|
|
- - end_line: 结束行号
|
|
|
- - content: 原文内容
|
|
|
- """
|
|
|
- if use_enhanced_classifier:
|
|
|
- return await self._classify_chunks_tertiary_enhanced(chunks, classifier_config, progress_callback)
|
|
|
- else:
|
|
|
- return await self._classify_chunks_tertiary_legacy(chunks)
|
|
|
-
|
|
|
- async def _classify_chunks_tertiary_enhanced(
|
|
|
- self,
|
|
|
- chunks: List[Dict[str, Any]],
|
|
|
- config: Optional[Any] = None,
|
|
|
- progress_callback: Optional[Any] = None
|
|
|
- ) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 使用增强型分类器进行三级分类
|
|
|
-
|
|
|
- 特点:
|
|
|
- - 行级细粒度分类
|
|
|
- - 支持一个段落包含多个三级分类
|
|
|
- - Embedding 相似度优化(跳过明显对应的段落)
|
|
|
- - 全局行号支持
|
|
|
- """
|
|
|
- logger.info(f"正在使用增强型分类器对 {len(chunks)} 个内容块进行三级分类... 特点: 行级细粒度 | 多分类支持 | Embedding优化")
|
|
|
-
|
|
|
- try:
|
|
|
- LLMContentClassifier, ClassifierConfig = _get_llm_content_classifier()
|
|
|
- except ImportError as e:
|
|
|
- logger.warning(f"无法导入增强型分类器,回退到传统方式: {e}")
|
|
|
- return await self._classify_chunks_tertiary_legacy(chunks)
|
|
|
-
|
|
|
- # 创建分类器实例
|
|
|
- if config is None:
|
|
|
- config = ClassifierConfig()
|
|
|
- # 使用与二级分类相同的并发度
|
|
|
- config.max_concurrent_requests = self._concurrency
|
|
|
-
|
|
|
- # 从全局配置加载模型和thinking模式
|
|
|
- try:
|
|
|
- from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
|
|
|
- config.model = get_model_for_function("doc_classification_tertiary")
|
|
|
- config.enable_thinking = get_thinking_mode_for_function("doc_classification_tertiary") or False
|
|
|
- logger.info(f"三级分类配置 - 并发度: {config.max_concurrent_requests}, 模型: {config.model}, thinking: {config.enable_thinking}")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"加载模型配置失败,使用默认配置: {e}")
|
|
|
- config.model = "qwen3_5_35b_a3b"
|
|
|
- config.enable_thinking = False
|
|
|
-
|
|
|
- classifier = LLMContentClassifier(config)
|
|
|
-
|
|
|
- # 调用增强型分类器
|
|
|
- updated_chunks = await classifier.classify_chunks(chunks, progress_callback=progress_callback)
|
|
|
-
|
|
|
- return updated_chunks
|
|
|
-
|
|
|
- async def _classify_chunks_tertiary_legacy(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 传统三级分类方式(逐chunk分类)
|
|
|
-
|
|
|
- 每个chunk只能属于一个三级分类
|
|
|
- """
|
|
|
- logger.info(f"正在对 {len(chunks)} 个内容块进行三级分类...")
|
|
|
-
|
|
|
- # 准备LLM请求
|
|
|
- llm_requests = []
|
|
|
- valid_chunks = []
|
|
|
- index_mappings = [] # 保存每个请求对应的索引映射
|
|
|
-
|
|
|
- for chunk in chunks:
|
|
|
- first_category_code = chunk.get("chapter_classification", "")
|
|
|
- second_category_code = chunk.get("secondary_category_code", "")
|
|
|
- second_category_cn = chunk.get("secondary_category_cn", "")
|
|
|
- chunk_title = chunk.get("section_label", "")
|
|
|
- content = chunk.get("review_chunk_content", "")
|
|
|
- content_preview = content[:300] if content else ""
|
|
|
-
|
|
|
- # 获取一级分类的中文名称
|
|
|
- first_category_cn = self._get_first_category_cn(first_category_code)
|
|
|
-
|
|
|
- # 构建三级分类标准(返回标准文本和索引映射)
|
|
|
- tertiary_standards, index_mapping = self._build_tertiary_standards(first_category_code, second_category_code)
|
|
|
-
|
|
|
- if tertiary_standards == "(无三级分类标准)":
|
|
|
- # 如果没有三级分类标准,跳过
|
|
|
- chunk["tertiary_category_cn"] = "无"
|
|
|
- chunk["tertiary_category_code"] = "none"
|
|
|
- continue
|
|
|
-
|
|
|
- # 渲染提示词
|
|
|
- prompt = self.prompt_loader.render(
|
|
|
- "chunk_tertiary_classification",
|
|
|
- first_category=first_category_cn,
|
|
|
- secondary_category=second_category_cn,
|
|
|
- chunk_title=chunk_title,
|
|
|
- content_preview=content_preview,
|
|
|
- tertiary_standards=tertiary_standards
|
|
|
- )
|
|
|
-
|
|
|
- llm_requests.append((prompt["system"], prompt["user"]))
|
|
|
- valid_chunks.append(chunk)
|
|
|
- index_mappings.append(index_mapping)
|
|
|
-
|
|
|
- if not llm_requests:
|
|
|
- logger.info("所有内容块都没有三级分类标准,跳过三级分类")
|
|
|
- return chunks
|
|
|
-
|
|
|
- # 批量异步调用LLM API
|
|
|
- llm_results = await self._batch_call_llm(llm_requests)
|
|
|
-
|
|
|
- # 处理分类结果
|
|
|
- for chunk, llm_result, index_mapping in zip(valid_chunks, llm_results, index_mappings):
|
|
|
- if llm_result and isinstance(llm_result, dict):
|
|
|
- category_index = llm_result.get("category_index")
|
|
|
-
|
|
|
- # 验证索引并映射到类别
|
|
|
- if isinstance(category_index, int) and category_index in index_mapping:
|
|
|
- tertiary_cn, tertiary_code = index_mapping[category_index]
|
|
|
- chunk["tertiary_category_cn"] = tertiary_cn
|
|
|
- chunk["tertiary_category_code"] = tertiary_code
|
|
|
- else:
|
|
|
- # 索引无效,归类为非标准项
|
|
|
- logger.warning(f"LLM返回的索引 {category_index} 无效,归类为'非标准项'")
|
|
|
- chunk["tertiary_category_cn"] = "非标准项"
|
|
|
- chunk["tertiary_category_code"] = "non_standard"
|
|
|
- else:
|
|
|
- chunk["tertiary_category_cn"] = "非标准项"
|
|
|
- chunk["tertiary_category_code"] = "non_standard"
|
|
|
-
|
|
|
- logger.info("三级分类完成!")
|
|
|
- return chunks
|
|
|
-
|
|
|
def _get_first_category_cn(self, first_category_code: str) -> str:
|
|
|
"""获取一级分类的中文名称"""
|
|
|
category_mapping = {
|
|
|
@@ -515,24 +152,3 @@ class ChunkClassifier:
|
|
|
except RuntimeError:
|
|
|
raise RuntimeError("请使用 await classify_chunks_secondary_async")
|
|
|
|
|
|
- def classify_chunks_tertiary(
|
|
|
- self,
|
|
|
- chunks: List[Dict[str, Any]],
|
|
|
- use_enhanced_classifier: bool = True,
|
|
|
- classifier_config: Optional[Any] = None
|
|
|
- ) -> List[Dict[str, Any]]:
|
|
|
- """同步包装:三级分类
|
|
|
-
|
|
|
- Args:
|
|
|
- chunks: 已完成二级分类的chunk列表
|
|
|
- use_enhanced_classifier: 是否使用增强型分类器(默认True)
|
|
|
- classifier_config: 增强型分类器配置(可选)
|
|
|
- """
|
|
|
- try:
|
|
|
- return asyncio.run(self.classify_chunks_tertiary_async(
|
|
|
- chunks,
|
|
|
- use_enhanced_classifier=use_enhanced_classifier,
|
|
|
- classifier_config=classifier_config
|
|
|
- ))
|
|
|
- except RuntimeError:
|
|
|
- raise RuntimeError("请使用 await classify_chunks_tertiary_async")
|