# tmp_new_method.py (5.2 KB)
  async def _call_llm_for_secondary_classification(
      self,
      first_category: str,
      first_category_code: str,
      level2_titles: List[str]
  ) -> Optional[Dict[str, Any]]:
      """
      Classify every level-2 title of one first-level category with the LLM (concurrent).

      One LLM request is issued per title. At most ``self._concurrency`` requests
      are in flight at a time, and each title gets up to three attempts. The model
      is selected by passing ``self.FUNCTION_NAME_SECONDARY`` as ``function_name``
      (per the original note, the model configuration is loaded from
      model_setting.yaml under that name).

      Args:
          first_category: First-level category; used to look up the secondary
              standards and the index -> {code, name} mapping, and rendered
              into the prompt.
          first_category_code: Code of the first-level category; only echoed
              back in the result.
          level2_titles: Level-2 titles to classify.

      Returns:
          A dict with ``first_category``, ``first_category_code``,
          ``level2_count`` and ``classifications``. ``classifications`` holds
          one dict per title, in input order, with ``title``,
          ``category_index``, ``category_code`` and ``category_name``.
          ``raw_response`` is present whenever an LLM reply was received, and
          ``error`` is present when every attempt failed. Titles that cannot
          be mapped are reported with code ``"non_standard"``. The visible
          code never returns ``None`` even though the annotation allows it.

      Raises:
          Whatever the prompt loader raises: the lookups and ``render`` run
          outside the retry loop, so their errors propagate to the caller.
      """
      # Secondary standards and index mapping for this first-level category.
      secondary_standards = self.prompt_loader.get_secondary_standards(first_category)
      secondary_mapping = self.prompt_loader.get_secondary_mapping(first_category)

      # Simplified context: the path is just the first-level category and the
      # preview is a bullet list of all sibling titles.
      hierarchy_path = f"{first_category}"
      content_preview = "\n".join(f"- {title}" for title in level2_titles)

      # Caps the number of concurrent LLM requests issued by this call.
      semaphore = asyncio.Semaphore(self._concurrency)
      max_retries = 3

      def non_standard(chunk_title: str, category_index: Any = 0, **extra: Any) -> Dict[str, Any]:
          """Build the result for a title that could not be mapped."""
          return {
              "title": chunk_title,
              "category_index": category_index,
              "category_code": "non_standard",
              "category_name": "非标准项",
              **extra,
          }

      def coerce_index(raw_index: Any) -> Any:
          """Return a numeric index, or None when the value is unusable.

          LLMs frequently quote numbers ("3"). Comparing such a string with
          ``> 0`` raises TypeError, which used to be misreported as a failed
          LLM call and discarded the reply.
          """
          if isinstance(raw_index, str):
              try:
                  return int(raw_index.strip())
              except ValueError:
                  return None
          if isinstance(raw_index, (int, float)):
              return raw_index
          return None

      async def classify_single_title(chunk_title: str) -> Dict[str, Any]:
          """Classify a single level-2 title, retrying on bad replies or errors."""
          prompt = self.prompt_loader.render(
              "chunk_secondary_classification",
              first_category=first_category,
              chunk_title=chunk_title,
              hierarchy_path=hierarchy_path,
              content_preview=content_preview,
              secondary_standards=secondary_standards,
          )

          # Details of the most recent failed attempt; returned if all attempts fail.
          failure: Dict[str, Any] = {"error": "未知错误"}

          async with semaphore:
              for attempt in range(1, max_retries + 1):
                  try:
                      content = await generate_model_client.get_model_generate_invoke(
                          trace_id="hierarchy_classifier_secondary",
                          system_prompt=prompt["system"],
                          user_prompt=prompt["user"],
                          function_name=self.FUNCTION_NAME_SECONDARY,
                      )
                      result = _extract_json(content)

                      category_index = None
                      if isinstance(result, dict) and "category_index" in result:
                          category_index = coerce_index(result["category_index"])

                      if category_index is None:
                          # Unparseable reply or missing/unusable index: retry.
                          logger.warning(f"[二级分类] JSON解析失败或缺少category_index: {chunk_title}, 尝试: {attempt}/{max_retries}")
                          failure = {"raw_response": content, "error": "JSON解析失败"}
                          continue

                      # Map the index to its code and name.
                      if category_index > 0 and category_index in secondary_mapping:
                          mapped = secondary_mapping[category_index]
                          return {
                              "title": chunk_title,
                              "category_index": category_index,
                              "category_code": mapped.get("code", ""),
                              "category_name": mapped.get("name", ""),
                              "raw_response": content,
                          }

                      # Index 0 or not in the mapping: a valid "non-standard" answer.
                      return non_standard(chunk_title, category_index, raw_response=content)
                  except Exception as e:
                      # Best effort: one failing title must not abort the whole batch.
                      logger.error(f"[二级分类] LLM调用失败: {chunk_title}, 错误: {e}, 尝试: {attempt}/{max_retries}")
                      failure = {"error": str(e)}

          # Every attempt failed; report the last failure with default values.
          return non_standard(chunk_title, 0, **failure)

      # Classify all level-2 titles concurrently; gather keeps the input order.
      tasks = [classify_single_title(title) for title in level2_titles]
      results = await asyncio.gather(*tasks)

      return {
          "first_category": first_category,
          "first_category_code": first_category_code,
          "level2_count": len(level2_titles),
          "classifications": results,
      }