async def _call_llm_for_secondary_classification(
    self,
    first_category: str,
    first_category_code: str,
    level2_titles: List[str]
) -> Optional[Dict[str, Any]]:
    """Classify every level-2 title of one first-level category via the LLM.

    One LLM request is issued per title. Requests run concurrently, bounded
    by ``self._concurrency``, and each title is attempted up to three times.
    The model is selected through ``function_name=self.FUNCTION_NAME_SECONDARY``
    (per the original note, the model configuration is loaded from
    ``model_setting.yaml`` -- not visible from this block).

    Args:
        first_category: Name of the first-level category; used to look up the
            secondary standards/mapping and as the hierarchy path.
        first_category_code: Code of the first-level category; echoed back in
            the result unchanged.
        level2_titles: Level-2 titles to classify.

    Returns:
        A dict with ``first_category``, ``first_category_code``,
        ``level2_count`` and ``classifications`` (one dict per title, in the
        same order as ``level2_titles``). Titles that cannot be classified are
        reported with ``category_code == "non_standard"``.
    """
    # Secondary classification standards and index -> {code, name} mapping
    # for this first-level category.
    secondary_standards = self.prompt_loader.get_secondary_standards(first_category)
    secondary_mapping = self.prompt_loader.get_secondary_mapping(first_category)

    # Hierarchy path and content preview (simplified): the preview lists all
    # sibling titles and is identical for every request.
    hierarchy_path = str(first_category)
    content_preview = "\n".join(f"- {title}" for title in level2_titles)

    # Bounds the number of in-flight LLM requests.
    semaphore = asyncio.Semaphore(self._concurrency)
    max_retries = 3

    def coerce_index(value: Any) -> Optional[int]:
        """Return *value* as an int, or None when it is not a usable index."""
        # bool is a subclass of int; a JSON `true` must not silently become 1.
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value) if value.is_integer() else None
        if isinstance(value, str):
            # LLMs frequently quote numbers ("3"); accept those.
            try:
                return int(value.strip())
            except ValueError:
                return None
        return None

    def non_standard(title: str, index: int = 0, **extra: Any) -> Dict[str, Any]:
        """Build the result used for non-standard / failed classifications."""
        return {
            "title": title,
            "category_index": index,
            "category_code": "non_standard",
            "category_name": "非标准项",
            **extra,
        }

    async def classify_single_title(chunk_title: str) -> Dict[str, Any]:
        """Classify a single level-2 title, retrying on failure."""
        prompt = self.prompt_loader.render(
            "chunk_secondary_classification",
            first_category=first_category,
            chunk_title=chunk_title,
            hierarchy_path=hierarchy_path,
            content_preview=content_preview,
            secondary_standards=secondary_standards,
        )

        # The semaphore slot is held across retries, as in the original.
        async with semaphore:
            for attempt in range(max_retries):
                is_last_attempt = attempt == max_retries - 1
                try:
                    content = await generate_model_client.get_model_generate_invoke(
                        trace_id="hierarchy_classifier_secondary",
                        system_prompt=prompt["system"],
                        user_prompt=prompt["user"],
                        function_name=self.FUNCTION_NAME_SECONDARY,
                    )
                    result = _extract_json(content)

                    category_index: Optional[int] = None
                    if isinstance(result, dict) and "category_index" in result:
                        # Normalise before comparing: a string or null index
                        # would otherwise raise TypeError on `> 0` and be
                        # misreported as an LLM call failure.
                        category_index = coerce_index(result["category_index"])

                    if category_index is None:
                        logger.warning(f"[二级分类] JSON解析失败或缺少category_index: {chunk_title}, 尝试: {attempt + 1}/{max_retries}")
                        if is_last_attempt:
                            # Final attempt failed: fall back to the default.
                            return non_standard(
                                chunk_title,
                                raw_response=content,
                                error="JSON解析失败",
                            )
                        continue

                    # Map the index to its code and name.
                    if category_index > 0 and category_index in secondary_mapping:
                        mapped = secondary_mapping[category_index]
                        return {
                            "title": chunk_title,
                            "category_index": category_index,
                            "category_code": mapped.get("code", ""),
                            "category_name": mapped.get("name", ""),
                            "raw_response": content,
                        }

                    # Index is 0 (or negative) or has no mapping entry:
                    # report as a non-standard item, keeping the index.
                    return non_standard(
                        chunk_title,
                        category_index,
                        raw_response=content,
                    )
                except Exception as e:
                    logger.error(f"[二级分类] LLM调用失败: {chunk_title}, 错误: {e}, 尝试: {attempt + 1}/{max_retries}")
                    if is_last_attempt:
                        return non_standard(chunk_title, error=str(e))

        # Not reachable with max_retries >= 1; kept as a defensive fallback.
        return non_standard(chunk_title, error="未知错误")

    # Classify all level-2 titles concurrently; gather preserves input order.
    tasks = [classify_single_title(title) for title in level2_titles]
    results = await asyncio.gather(*tasks)

    return {
        "first_category": first_category,
        "first_category_code": first_category_code,
        "level2_count": len(level2_titles),
        "classifications": results,
    }