Преглед изворни кода

v0.0.2-旧方案-文档上传-审查结果推送-审查结果获取流程归档点,文件上传接口自动触发审查任务

WangXuMing пре 3 месеци
родитељ
комит
ed3257eabb

+ 0 - 0
core/construction_review/doc_worker/__init__.py → core/base/doc_worker/__init__.py


+ 0 - 0
core/construction_review/doc_worker/config.yaml → core/base/doc_worker/config.yaml


+ 0 - 0
core/construction_review/doc_worker/config_loader.py → core/base/doc_worker/config_loader.py


+ 0 - 0
core/construction_review/doc_worker/core.py → core/base/doc_worker/core.py


+ 0 - 0
core/construction_review/doc_worker/llm_classifier.py → core/base/doc_worker/llm_classifier.py


+ 0 - 0
core/construction_review/doc_worker/result_saver.py → core/base/doc_worker/result_saver.py


+ 0 - 0
core/construction_review/doc_worker/text_splitter.py → core/base/doc_worker/text_splitter.py


+ 0 - 0
core/construction_review/doc_worker/toc_extractor.py → core/base/doc_worker/toc_extractor.py


+ 30 - 16
core/construction_review/component/ai_review_engine.py

@@ -62,7 +62,8 @@ class AIReviewEngine(BaseReviewer):
 
 
     
-    async def basic_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any]) -> Dict[str, Any]:
+    async def basic_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
+                                   stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """基础合规性检查"""
         review_content = unit_content['content']
         review_references = unit_content.get('review_references')
@@ -71,7 +72,7 @@ class AIReviewEngine(BaseReviewer):
 
         async def check_with_semaphore(check_func, *args):
             async with self.semaphore:
-                return await check_func(*args)
+                return await check_func(*args, stage_name=stage_name, state=state, current_progress=current_progress)
 
         basic_tasks = [
             check_with_semaphore(self.check_grammar, trace_id_idx, review_content, review_references),
@@ -98,14 +99,15 @@ class AIReviewEngine(BaseReviewer):
             'overall_score': self._calculate_basic_score(grammar_result, semantic_result, completeness_result)
         }
 
-    async def technical_compliance_check(self, trace_id_idx: str, unit_content: Dict[str, Any]) -> Dict[str, Any]:
+    async def technical_compliance_check(self, trace_id_idx: str, unit_content: Dict[str, Any],
+                                      stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """技术性合规检查"""
         review_content = unit_content['content']
         review_references = unit_content.get('review_references')
         logger.info(f"开始技术性合规检查,内容长度: {len(review_content)}")
         async def check_with_semaphore(check_func, *args):
             async with self.semaphore:
-                return await check_func(*args)
+                return await check_func(*args, stage_name=stage_name, state=state, current_progress=current_progress)
 
         technical_tasks = [
             check_with_semaphore(self.check_mandatory_standards, trace_id_idx, review_content,review_references),
@@ -149,47 +151,59 @@ class AIReviewEngine(BaseReviewer):
         }
 
 
-    async def check_grammar(self, trace_id_idx: str, review_content: str = None, review_references: str = None) -> Dict[str, Any]:
+    async def check_grammar(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
+                          stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """语法检查"""
         reviewer_type = Stage.BASIC.value['reviewer_type']
         prompt_name = Stage.BASIC.value['sensitive']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("语法检查", trace_id, reviewer_type, prompt_name, review_content,review_references)
+        return await self.review("语法检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
+                               stage_name, state, current_progress)
 
-    async def check_semantic_logic(self, trace_id_idx: str, review_content: str = None, review_references: str = None) -> Dict[str, Any]:
+    async def check_semantic_logic(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
+                                 stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """语义逻辑检查"""
         reviewer_type = Stage.BASIC.value['reviewer_type']
         prompt_name = Stage.BASIC.value['semantic']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("语义逻辑检查", trace_id, reviewer_type, prompt_name, review_content,review_references)
+        return await self.review("语义逻辑检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
+                               stage_name, state, current_progress)
 
-    async def check_completeness(self, trace_id_idx: str, review_content: str = None, review_references: str = None) -> Dict[str, Any]:
+    async def check_completeness(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
+                               stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """完整性检查"""
         reviewer_type = Stage.BASIC.value['reviewer_type']
         prompt_name = Stage.BASIC.value['completeness']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("完整性检查", trace_id, reviewer_type, prompt_name, review_content,review_references)
+        return await self.review("完整性检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
+                               stage_name, state, current_progress)
 
-    async def check_mandatory_standards(self, trace_id_idx: str, review_content: str = None, review_references: str = None) -> Dict[str, Any]:
+    async def check_mandatory_standards(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
+                                        stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """强制性标准检查"""
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['mandatory']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("强制性标准检查", trace_id, reviewer_type, prompt_name, review_content,review_references)
+        return await self.review("强制性标准检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
+                               stage_name, state, current_progress)
 
-    async def check_design_values(self, trace_id_idx: str, review_content: str = None, review_references: str = None) -> Dict[str, Any]:
+    async def check_design_values(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
+                                  stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """设计值检查"""
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['design']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("设计值检查", trace_id, reviewer_type, prompt_name, review_content,review_references)
+        return await self.review("设计值检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
+                               stage_name, state, current_progress)
 
-    async def check_technical_parameters(self, trace_id_idx: str, review_content: str = None, review_references: str = None) -> Dict[str, Any]:
+    async def check_technical_parameters(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
+                                         stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
         """技术参数检查"""
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['technical']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("技术参数检查", trace_id, reviewer_type, prompt_name, review_content,review_references)
+        return await self.review("技术参数检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
+                               stage_name, state, current_progress)
 
     # RAG检索增强
     async def vector_search(self, content: str) -> List[Dict[str, Any]]:

+ 4 - 4
core/construction_review/component/document_processor.py

@@ -15,11 +15,11 @@ from foundation.logger.loggering import server_logger as logger
 
 # 引入doc_worker核心组件
 try:
-    from ..doc_worker import TOCExtractor, TextSplitter, LLMClassifier
-    from ..doc_worker.config_loader import get_config
+    from base.doc_worker import TOCExtractor, TextSplitter, LLMClassifier
+    from base.doc_worker.config_loader import get_config
 except ImportError:
-    from core.construction_review.doc_worker import TOCExtractor, TextSplitter, LLMClassifier
-    from core.construction_review.doc_worker.config_loader import get_config
+    from core.base.doc_worker import TOCExtractor, TextSplitter, LLMClassifier
+    from core.base.doc_worker.config_loader import get_config
 
 class DocumentProcessor:
     """文档处理器"""

+ 19 - 2
core/construction_review/component/reviewers/base_reviewer.py

@@ -4,6 +4,7 @@
 """
 
 
+import asyncio
 import uuid
 import time
 from abc import ABC
@@ -33,7 +34,8 @@ class BaseReviewer(ABC):
         self.prompt_loader = prompt_loader
     
     #@obverse
-    async def review(self, name: str, trace_id: str, reviewer_type: str, prompt_name: str, review_content: str,review_references: str = None) -> ReviewResult:
+    async def review(self, name: str, trace_id: str, reviewer_type: str, prompt_name: str, review_content: str, review_references: str = None,
+                    stage_name: str = None, state: dict = None, current_progress: int = None) -> ReviewResult:
         """
         执行审查
 
@@ -47,8 +49,10 @@ class BaseReviewer(ABC):
                 - rag: rag_enhanced_review, vector_search_review, hybrid_search_review
                 - ai: professional_suggestion, standardization_suggestion, completeness_suggestion, readability_suggestion
             review_content: 待审查内容 (必填)
-
             review_references: 审查参考内容 (可选)
+            stage_name: 阶段名称 (可选,用于进度更新)
+            state: 状态字典 (可选,用于进度更新)
+            current_progress: 当前进度 (可选,用于进度更新)
 
         Returns:
             ReviewResult: 审查结果
@@ -56,6 +60,19 @@ class BaseReviewer(ABC):
         start_time = time.time()
         name = prompt_name
         try:
+            # 添加进度更新
+            progress_message = f"{name}_{prompt_name}"
+            # 安全检查:确保所有必要参数都存在才执行进度更新
+            if state and state.get("progress_manager") and stage_name and current_progress is not None:
+                asyncio.create_task(
+                    state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        stage_name=stage_name,
+                        current=current_progress,
+                        status="processing",
+                        message=progress_message
+                    )
+                )
             logger.info(f"开始执行 {name} 审查,trace_id: {trace_id},内容长度: {len(review_content)}")
             prompt_kwargs = {}
             prompt_kwargs["content"] = review_content

+ 23 - 3
core/construction_review/workflows/ai_review_workflow.py

@@ -249,13 +249,33 @@ class AIReviewWorkflow:
             # 基本审查单元
             async def review_single_unit(unit_content: Dict[str, Any], unit_index: int,callback_task_id) -> ReviewResult:
                 """使用LangGraph编排的原子化组件方法审查单个单元"""
-                try:    
+                try:
                         # 构建Trace ID
                         trace_id_idx = "("+str(callback_task_id)+'-'+str(unit_index)+")"
+
+                        # 获取section_label用于stage_name
+                        section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
+                        stage_name = f"AI审查:{section_label}"
+
+                        # 方法内部进度计算(基于当前处理的单元)
+                        current_progress = int((unit_index / total_units) * 100)
+                        progress_message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
+
+                        if state["progress_manager"]:
+                            asyncio.create_task(
+                                state["progress_manager"].update_stage_progress(
+                                    callback_task_id=state["callback_task_id"],
+                                    stage_name=stage_name,
+                                    current=current_progress,
+                                    status="processing",
+                                    message=progress_message
+                                )
+                            )
+                        
                         # 并发执行各种原子化审查方法
                         review_tasks = [
-                            self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content),
-                            self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content),
+                            self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
+                            self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
                             # self.ai_review_engine.rag_enhanced_check(unit_content, trace_id_idx)
                         ]
 

Разлика између датотеке није приказан због своје велике величине
+ 1 - 1
temp/AI审查结果.json


+ 179 - 0
test/test_sse_integration.py

@@ -0,0 +1,179 @@
+#!/usr/bin/env python3
+"""
+SSE进度推送集成测试脚本
+模拟文件上传后建立SSE连接的完整流程
+"""
+
+import requests
+import json
+import time
+import threading
+import os
+import signal
+import sys
+from typing import Dict, Any
+
+# 全局标志用于控制程序退出
+should_exit = False
+
+def signal_handler(signum, frame):
+    """信号处理器,处理Ctrl+C"""
+    global should_exit
+    print(f"\n\n⚡ 接收到信号 {signum},正在退出...")
+    should_exit = True
+    sys.exit(0)
+
+# 注册信号处理器
+signal.signal(signal.SIGINT, signal_handler)
+signal.signal(signal.SIGTERM, signal_handler)
+
+class ProgressTracker:
+    def __init__(self):
+        self.progress_events = []
+        self.completed = False
+        self.error = None
+
+    def handle_sse_event(self, event_data: Dict[str, Any]):
+        """处理SSE事件"""
+        self.progress_events.append(event_data)
+
+        event_type = event_data.get("type", "unknown")
+        data = event_data.get("data", {})
+
+        print(f"\n📡 SSE事件 [{event_type}]:")
+        print(f"   时间: {data.get('timestamp', 'N/A')}")
+        print(f"   进度: {data.get('current', 0)}%")
+        print(f"   阶段: {data.get('stage_name', 'N/A')}")
+        print(f"   状态: {data.get('status', 'N/A')}")
+        print(f"   消息: {data.get('message', 'N/A')}")
+
+        if event_type == "completed":
+            self.completed = True
+            print(f"\n✅ 任务完成: {data.get('task_status', 'N/A')}")
+        elif event_type == "error":
+            self.error = data.get("message", "未知错误")
+            print(f"\n❌ 任务错误: {self.error}")
+
+def test_file_upload():
+    """测试文件上传"""
+    upload_url = "http://127.0.0.1:8035/sgsc/file_upload"
+    file_path = "D:/wx_work/sichuan_luqiao/项目背景资料/路桥桥梁工程施工方案 7 份/罗成依达大桥拱座竖桩专项施工方案.pdf"
+
+    try:
+        with open(file_path, 'rb') as f:
+            files = {
+                'file': f
+            }
+            data = {
+                'callback_url': 'https://client.example.com/callback?task_id=ocr-12345',
+                'project_plan_type': 'bridge_up_part',
+                'user': 'user-001'
+            }
+
+            print("📤 开始上传文件...")
+            response = requests.post(upload_url, files=files, data=data)
+            response.raise_for_status()
+
+            result = response.json()
+            print(f"✅ 文件上传成功")
+            print(f"   文件ID: {result['data']['id']}")
+            print(f"   文件名: {result['data']['name']}")
+            print(f"   回调任务ID: {result['data']['callback_task_id']}")
+
+            return result['data']['callback_task_id']
+
+    except Exception as e:
+        print(f"❌ 文件上传失败: {e}")
+        return None
+
+def test_sse_connection(callback_task_id: str, tracker: ProgressTracker):
+    """测试SSE连接"""
+    sse_url = f"http://127.0.0.1:8035/sgsc/sse/progress/{callback_task_id}?user=user-001"
+
+    try:
+        print(f"🔗 建立SSE连接: {sse_url}")
+        print("=" * 60)
+        print("📡 SSE原始响应:")
+        print("-" * 60)
+
+        # 建立SSE连接
+        response = requests.get(sse_url, stream=True)
+        response.raise_for_status()
+
+        for line in response.iter_lines():
+            if should_exit:
+                print("\n👋 用户请求退出,停止监听SSE")
+                break
+
+            if line:
+                # 直接打印原始响应
+                line_str = line.decode('utf-8')
+                print(f"原始响应: {line_str}")
+
+                # 解析SSE事件格式(可选)
+                if line_str.startswith('data: '):
+                    try:
+                        event_data = json.loads(line_str[6:])
+                        tracker.handle_sse_event({
+                            "type": "data",
+                            "data": event_data
+                        })
+
+                        # 如果任务完成,退出监听
+                        if tracker.completed:
+                            print("-" * 60)
+                            print("📡 任务完成,结束监听")
+                            break
+
+                    except json.JSONDecodeError:
+                        # JSON解析失败也继续监听
+                        pass
+
+    except Exception as e:
+        print(f"❌ SSE连接失败: {e}")
+
+def main():
+    """主测试流程"""
+    print("🚀 开始SSE进度推送集成测试")
+    print("💡 提示: 按 Ctrl+C 可以随时退出")
+    print("=" * 60)
+
+    # 第一步:上传文件
+    callback_task_id = test_file_upload()
+    if not callback_task_id or should_exit:
+        print("❌ 文件上传失败,测试终止")
+        return
+
+    print(f"\n⏳ 等待2秒后建立SSE连接...")
+    for i in range(2, 0, -1):
+        if should_exit:
+            print("\n👋 用户请求退出,测试终止")
+            return
+        print(f"   {i}秒...")
+        time.sleep(1)
+
+    # 第二步:建立SSE连接监听进度
+    tracker = ProgressTracker()
+
+    # 在主线程中运行SSE监听
+    test_sse_connection(callback_task_id, tracker)
+
+    # 输出测试结果
+    if not should_exit:
+        print("\n" + "=" * 60)
+        print("📊 测试结果汇总:")
+        print(f"   收到事件数量: {len(tracker.progress_events)}")
+        print(f"   任务是否完成: {'✅' if tracker.completed else '❌'}")
+
+        if tracker.error:
+            print(f"   错误信息: {tracker.error}")
+
+        if tracker.completed:
+            print("🎉 SSE实时推送测试成功!")
+        else:
+            print("⚠️ 任务可能未在预期时间内完成")
+
+
+
+if __name__ == "__main__":
+    main()

+ 2 - 2
views/construction_review/file_upload.py

@@ -160,8 +160,8 @@ async def file_upload(
 
 
         # 生成回调任务ID
-        callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
-        #callback_task_id = "d0856b13c5328e732e9c590209554b76-1763369817"            
+        #callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
+        callback_task_id = "d0856b13c5328e732e9c590209554b76-1763369845"            
 
         # 更新trace_id为正式的callback_task_id
         TraceContext.set_trace_id(callback_task_id)

+ 1 - 1
views/construction_review/task_progress.py

@@ -81,7 +81,7 @@ def format_sse_event(event_type: str, data: str) -> str:
     return "\n".join(lines) + "\n" 
 
 
-@task_progress_router.get("/sse/current/{callback_task_id}")
+@task_progress_router.get("/sse/progress/{callback_task_id}")
 @auto_trace("callback_task_id")
 async def sse_progress_stream(
     callback_task_id: str,

Неке датотеке нису приказане због велике количине промена