Parcourir la source

chore: 清理冗余文件及小幅修复

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing il y a 2 semaines
Parent
commit
adc0593910

+ 3 - 3
.gitignore

@@ -47,7 +47,7 @@ htmlcov/
 nosetests.xml
 coverage.xml
 *,cover
-
+审查规则提取/
 # Translations
 *.mo
 *.pot
@@ -64,8 +64,8 @@ docs/_build/
 target/
 
 todo.md
-.design
-.claude
+.design/
+.claude/
 .R&D
 .RaD
 temp/

+ 0 - 0
__init__.py


+ 4 - 2
core/construction_review/component/reviewers/grammar_check_reviewer.py

@@ -24,7 +24,8 @@ class GrammarCheckReviewer:
         trace_id: str,
         review_content: str,
         state: Dict[str, Any] = None,
-        stage_name: str = None
+        stage_name: str = None,
+        enable_thinking: bool = False,
     ) -> ReviewResult:
         """
         执行词句语法检查
@@ -63,7 +64,8 @@ class GrammarCheckReviewer:
             model_response = await self.model_client.get_model_generate_invoke(
                 trace_id=trace_id,
                 messages=messages,
-                function_name="grammar_check"
+                function_name="grammar_check",
+                enable_thinking=enable_thinking,
             )
 
             logger.info(f"词句语法检查模型响应成功,响应长度: {len(model_response)}")

+ 49 - 17
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/embedding_client.py

@@ -6,6 +6,7 @@ Embedding 客户端
 统一通过 model_handler 获取 Embedding 模型,配置从 config.ini 读取
 """
 
+import asyncio
 import math
 import re
 from typing import List, Optional, Tuple
@@ -18,10 +19,15 @@ from foundation.observability.logger.loggering import review_logger as logger
 class EmbeddingClient:
     """Embedding模型客户端,用于计算文本相似度"""
 
+    # 连续失败次数阈值,超过后清除缓存触发降级
+    _FAILURE_THRESHOLD = 3
+    # 重试次数
+    _MAX_RETRIES = 2
+
     def __init__(self):
         """初始化 Embedding 客户端,通过 model_handler 获取模型"""
-        # 统一通过 model_handler 获取 Embedding 模型
         self._embedding_model = None
+        self._consecutive_failures = 0
 
     @property
     def embedding_model(self):
@@ -30,25 +36,51 @@ class EmbeddingClient:
             self._embedding_model = model_handler.get_embedding_model()
         return self._embedding_model
 
+    def _invalidate_cache(self):
+        """清除本地和 model_handler 的 embedding 缓存,触发降级重新初始化"""
+        self._embedding_model = None
+        self._consecutive_failures = 0
+        # 清除 model_handler 中的 embedding 缓存,使下次 get_embedding_model 重新走初始化+降级逻辑
+        for key in list(model_handler._model_cache.keys()):
+            if "embed" in key.lower():
+                del model_handler._model_cache[key]
+                logger.info(f"已清除 model_handler embedding 缓存: {key}")
+
     async def get_embedding(self, text: str) -> Optional[List[float]]:
-        """获取文本的embedding向量"""
-        try:
-            # 使用 model_handler 提供的 embedding 模型
-            embedding = self.embedding_model.embed_query(text)
-            return embedding
-        except Exception as e:
-            logger.error(f"Embedding API调用失败: {e}")
-            return None
+        """获取文本的embedding向量,带重试和缓存失效机制"""
+        for attempt in range(self._MAX_RETRIES + 1):
+            try:
+                embedding = self.embedding_model.embed_query(text)
+                self._consecutive_failures = 0
+                return embedding
+            except Exception as e:
+                if attempt < self._MAX_RETRIES:
+                    await asyncio.sleep(1 * (attempt + 1))
+                    continue
+                self._consecutive_failures += 1
+                logger.error(f"Embedding API调用失败 (连续第{self._consecutive_failures}次): {e}")
+                if self._consecutive_failures >= self._FAILURE_THRESHOLD:
+                    logger.warning("Embedding连续失败超过阈值,清除缓存触发降级")
+                    self._invalidate_cache()
+                return None
 
     async def get_embeddings_batch(self, texts: List[str]) -> List[Optional[List[float]]]:
-        """批量获取文本的embedding向量"""
-        try:
-            # 使用 model_handler 提供的 embedding 模型
-            embeddings = self.embedding_model.embed_documents(texts)
-            return embeddings
-        except Exception as e:
-            logger.error(f"Embedding API批量调用失败: {e}")
-            return [None] * len(texts)
+        """批量获取文本的embedding向量,带重试和缓存失效机制"""
+        for attempt in range(self._MAX_RETRIES + 1):
+            try:
+                embeddings = self.embedding_model.embed_documents(texts)
+                self._consecutive_failures = 0
+                return embeddings
+            except Exception as e:
+                if attempt < self._MAX_RETRIES:
+                    await asyncio.sleep(1 * (attempt + 1))
+                    continue
+                self._consecutive_failures += 1
+                logger.error(f"Embedding API批量调用失败 (连续第{self._consecutive_failures}次): {e}")
+                if self._consecutive_failures >= self._FAILURE_THRESHOLD:
+                    logger.warning("Embedding连续失败超过阈值,清除缓存触发降级")
+                    self._invalidate_cache()
+                return [None] * len(texts)
 
     def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
         """计算两个向量的余弦相似度"""

+ 2 - 2
core/construction_review/component/reviewers/utils/prompt_loader.py

@@ -186,7 +186,7 @@ class PromptLoader:
             # 创建ChatPromptTemplate
             template = ChatPromptTemplate.from_messages([
                 ("system", prompt_config['system_prompt']),
-                ("user", prompt_config['user_prompt_template']+" /no_think")
+                ("user", prompt_config['user_prompt_template'])
             ])
 
             # 如果有变量,使用partial方法预先填充
@@ -205,7 +205,7 @@ class PromptLoader:
                         user_text = user_text.replace(placeholder, str(value))
                     template = ChatPromptTemplate.from_messages([
                         ("system", system_text),
-                        ("user", user_text + " /no_think")
+                        ("user", user_text)
                     ])
 
             return template

+ 0 - 59
demo.py

@@ -1,59 +0,0 @@
-import requests
-
-# 目标请求地址
-url = 'http://183.220.37.46:23424/file_parse'
-
-# 请求头
-headers = {
-    'accept': 'application/json'
-}
-
-# 构建表单数据和文件
-files = {
-    # 各种表单参数
-    'return_middle_json': (None, 'false'),
-    'return_model_output': (None, 'false'),
-    'return_md': (None, 'true'),
-    'return_images': (None, 'false'),
-    'end_page_id': (None, '99999'),
-    'parse_method': (None, 'auto'),
-    'start_page_id': (None, '0'),
-    'lang_list': (None, 'ch'),
-    'output_dir': (None, './output'),
-    'server_url': (None, 'string'),
-    'return_content_list': (None, 'false'),
-    'backend': (None, 'hybrid-auto-engine'),
-    'table_enable': (None, 'true'),
-    'response_format_zip': (None, 'false'),
-    'formula_enable': (None, 'true'),
-    # 文件上传字段(核心)
-    'files': open(r'/D:/wx_work/sichuan_luqiao/lu_sgsc_testfile/施工方案/301_四川川交路桥有限责任公司秦唐高速公路唐山段ZT1合同项目部.pdf', 'rb')
-}
-
-try:
-    # 发送 POST 请求
-    response = requests.post(
-        url=url,
-        headers=headers,
-        files=files,
-        # 设置超时时间,避免请求一直挂起
-        timeout=600
-    )
-    
-    # 打印响应结果
-    print(f"响应状态码: {response.status_code}")
-    print(f"响应内容: {response.json()}")
-
-except FileNotFoundError as e:
-    print(f"错误:文件未找到 - {e}")
-except requests.exceptions.Timeout as e:
-    print(f"错误:请求超时 - {e}")
-except requests.exceptions.RequestException as e:
-    print(f"错误:请求失败 - {e}")
-except ValueError as e:
-    print(f"错误:响应不是有效的JSON - {e}")
-    print(f"原始响应内容: {response.text}")
-finally:
-    # 确保文件句柄关闭
-    if 'files' in locals() and hasattr(files['files'], 'close'):
-        files['files'].close()

+ 3 - 0
foundation/ai/models/model_handler.py

@@ -935,6 +935,7 @@ class ModelHandler:
                 timeout=self.REQUEST_TIMEOUT,
                 tiktoken_enabled=False,
                 check_embedding_ctx_length=False,
+                max_retries=0,  # 禁用SDK内置重试,由EmbeddingClient统一管理
             )
 
             logger.info(f"本地Qwen3-Embedding-8B模型初始化成功: {model_id}")
@@ -983,6 +984,7 @@ class ModelHandler:
                 timeout=self.REQUEST_TIMEOUT,
                 tiktoken_enabled=False,
                 check_embedding_ctx_length=False,
+                max_retries=0,  # 禁用SDK内置重试,由EmbeddingClient统一管理
             )
 
             logger.info(f"硅基流动Embedding模型初始化成功: {model_id} (dimensions: {dimensions})")
@@ -1132,6 +1134,7 @@ class ModelHandler:
                 timeout=self.REQUEST_TIMEOUT,
                 tiktoken_enabled=False,
                 check_embedding_ctx_length=False,
+                max_retries=0,  # 禁用SDK内置重试,由EmbeddingClient统一管理
             )
 
             logger.info(f"蜀天Qwen3-Embedding-8B模型初始化成功: {model_id}")

+ 29 - 6
foundation/infrastructure/mysql/async_mysql_conn_pool.py

@@ -21,7 +21,18 @@ class AsyncMySQLPool:
             self._initialized = False
 
     async def initialize(self, max_retries=3, retry_delay=2):
-        """初始化连接池,支持重试"""
+        """初始化连接池,支持重试。已初始化且连接池健康时直接返回。"""
+        if self._initialized and self._pool and not self._pool._closed:
+            return
+
+        # 关闭旧池(如果有)
+        if self._pool and not self._pool._closed:
+            server_logger.info("关闭旧连接池...")
+            self._pool.close()
+            await self._pool.wait_closed()
+            self._pool = None
+            self._initialized = False
+
         last_error = None
 
         for attempt in range(1, max_retries + 1):
@@ -37,7 +48,8 @@ class AsyncMySQLPool:
                     minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
                     maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
                     autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"),
-                    connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30"))
+                    connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30")),
+                    pool_recycle=1800
                 )
                 self._initialized = True
                 server_logger.info("异步MySQL连接池初始化成功")
@@ -61,12 +73,23 @@ class AsyncMySQLPool:
     
     @asynccontextmanager
     async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
-        """获取数据库连接的上下文管理器"""
-        if not self._initialized:
-            # 如果没有初始化,使用默认配置初始化
+        """获取数据库连接的上下文管理器,带连接健康检查"""
+        if not self._initialized or not self._pool or self._pool._closed:
             await self.initialize()
-        
+
         async with self._pool.acquire() as conn:
+            try:
+                await conn.ping()
+            except Exception:
+                server_logger.warning("连接已失效,尝试重新初始化连接池")
+                await self.initialize()
+                async with self._pool.acquire() as new_conn:
+                    try:
+                        yield new_conn
+                    except Exception as e:
+                        server_logger.error(f"数据库连接操作失败: {e}")
+                        raise
+                return
             try:
                 yield conn
             except Exception as e:

+ 0 - 108
tmp_new_method.py

@@ -1,108 +0,0 @@
-async def _call_llm_for_secondary_classification(
-        self,
-        first_category: str,
-        first_category_code: str,
-        level2_titles: List[str]
-    ) -> Optional[Dict[str, Any]]:
-        """
-        调用LLM进行二级分类(并发版)
-
-        使用 function_name 从 model_setting.yaml 加载模型配置
-        """
-        # 获取该一级分类的二级分类标准和映射
-        secondary_standards = self.prompt_loader.get_secondary_standards(first_category)
-        secondary_mapping = self.prompt_loader.get_secondary_mapping(first_category)
-
-        # 构建层级路径和内容预览(简化处理)
-        hierarchy_path = f"{first_category}"
-        content_preview = "\n".join(f"- {title}" for title in level2_titles)
-
-        # 并发控制
-        semaphore = asyncio.Semaphore(self._concurrency)
-
-        async def classify_single_title(chunk_title: str) -> Dict[str, Any]:
-            """对单个二级标题进行分类(带重试)"""
-            prompt = self.prompt_loader.render(
-                "chunk_secondary_classification",
-                first_category=first_category,
-                chunk_title=chunk_title,
-                hierarchy_path=hierarchy_path,
-                content_preview=content_preview,
-                secondary_standards=secondary_standards,
-            )
-
-            # 带重试的LLM调用
-            max_retries = 3
-            async with semaphore:
-                for attempt in range(max_retries):
-                    try:
-                        content = await generate_model_client.get_model_generate_invoke(
-                            trace_id="hierarchy_classifier_secondary",
-                            system_prompt=prompt["system"],
-                            user_prompt=prompt["user"],
-                            function_name=self.FUNCTION_NAME_SECONDARY,
-                        )
-                        result = _extract_json(content)
-                        if result and isinstance(result, dict) and "category_index" in result:
-                            category_index = result.get("category_index", 0)
-                            # 映射编号到代码和名称
-                            if category_index > 0 and category_index in secondary_mapping:
-                                mapped = secondary_mapping[category_index]
-                                return {
-                                    "title": chunk_title,
-                                    "category_index": category_index,
-                                    "category_code": mapped.get("code", ""),
-                                    "category_name": mapped.get("name", ""),
-                                    "raw_response": content,
-                                }
-                            else:
-                                # 编号为0或未找到映射,标记为非标准项
-                                return {
-                                    "title": chunk_title,
-                                    "category_index": category_index,
-                                    "category_code": "non_standard",
-                                    "category_name": "非标准项",
-                                    "raw_response": content,
-                                }
-                        else:
-                            logger.warning(f"[二级分类] JSON解析失败或缺少category_index: {chunk_title}, 尝试: {attempt + 1}/{max_retries}")
-                            if attempt == max_retries - 1:
-                                # 最后一次尝试失败,使用默认值
-                                return {
-                                    "title": chunk_title,
-                                    "category_index": 0,
-                                    "category_code": "non_standard",
-                                    "category_name": "非标准项",
-                                    "raw_response": content,
-                                    "error": "JSON解析失败",
-                                }
-                    except Exception as e:
-                        logger.error(f"[二级分类] LLM调用失败: {chunk_title}, 错误: {e}, 尝试: {attempt + 1}/{max_retries}")
-                        if attempt == max_retries - 1:
-                            return {
-                                "title": chunk_title,
-                                "category_index": 0,
-                                "category_code": "non_standard",
-                                "category_name": "非标准项",
-                                "error": str(e),
-                            }
-
-            # 不会到达这里,但保留以防万一
-            return {
-                "title": chunk_title,
-                "category_index": 0,
-                "category_code": "non_standard",
-                "category_name": "非标准项",
-                "error": "未知错误",
-            }
-
-        # 并发执行所有二级标题的分类
-        tasks = [classify_single_title(title) for title in level2_titles]
-        results = await asyncio.gather(*tasks)
-
-        return {
-            "first_category": first_category,
-            "first_category_code": first_category_code,
-            "level2_count": len(level2_titles),
-            "classifications": results,
-        }

+ 6 - 4
utils_test/Grammar_Check_Test/grammar_check_server.py

@@ -34,7 +34,7 @@ def run_async(coro):
         return asyncio.run(coro)
 
 
-async def do_grammar_check(review_content: str) -> dict:
+async def do_grammar_check(review_content: str, enable_thinking: bool = False) -> dict:
     """
     执行词句语法审查
     直接调用 GrammarCheckReviewer.check_grammar
@@ -42,7 +42,7 @@ async def do_grammar_check(review_content: str) -> dict:
     trace_id = f"grammar_check_web_{int(time.time() * 1000)}"
     reviewer = GrammarCheckReviewer()
 
-    logger.info(f"[词句语法Web测试] trace_id={trace_id}, content_length={len(review_content)}")
+    logger.info(f"[词句语法Web测试] trace_id={trace_id}, content_length={len(review_content)}, enable_thinking={enable_thinking}")
 
     start = time.time()
     result = await reviewer.check_grammar(
@@ -50,6 +50,7 @@ async def do_grammar_check(review_content: str) -> dict:
         review_content=review_content,
         state=None,
         stage_name=None,
+        enable_thinking=enable_thinking,
     )
     wall_time = time.time() - start
 
@@ -88,13 +89,14 @@ class GrammarCheckHandler(SimpleHTTPRequestHandler):
             try:
                 body = json.loads(post_data.decode('utf-8'))
                 review_content = body.get('content', '')
+                enable_thinking = body.get('enable_thinking', False)
 
                 if not review_content:
                     self.send_json_response({"error": "请提供 content 参数"}, 400)
                     return
 
-                print(f"\n[词句语法Web测试] 收到请求, content_length={len(review_content)}")
-                result = run_async(do_grammar_check(review_content))
+                print(f"\n[词句语法Web测试] 收到请求, content_length={len(review_content)}, enable_thinking={enable_thinking}")
+                result = run_async(do_grammar_check(review_content, enable_thinking))
                 print(f"[词句语法Web测试] 完成, success={result['success']}, wall_time={result['wall_time']}s")
 
                 self.send_json_response(result)

+ 7 - 2
utils_test/Grammar_Check_Test/grammar_check_test.html

@@ -133,10 +133,14 @@
         <div class="panel">
             <h2>输入文本</h2>
             <textarea id="content" placeholder="输入要审查的施工方案文本..."></textarea>
-            <div style="margin-top:12px;">
+            <div style="margin-top:12px; display:flex; align-items:center; gap:16px;">
                 <button class="btn" id="submitBtn" onclick="runCheck()">
                     执行审查
                 </button>
+                <label style="display:flex; align-items:center; gap:6px; cursor:pointer; font-size:14px;">
+                    <input type="checkbox" id="enableThinking" style="width:16px; height:16px; cursor:pointer;">
+                    启用思考模式 (enable_thinking)
+                </label>
             </div>
         </div>
 
@@ -167,10 +171,11 @@
             const start = Date.now();
             let result;
             try {
+                const enableThinking = document.getElementById('enableThinking').checked;
                 const res = await fetch(`${API_BASE}/api/grammar_check`, {
                     method: 'POST',
                     headers: { 'Content-Type': 'application/json' },
-                    body: JSON.stringify({ content })
+                    body: JSON.stringify({ content, enable_thinking: enableThinking })
                 });
                 result = await res.json();
             } catch (e) {