Browse Source

Merge branch 'dev-planWrite' of CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

tangle 1 month ago
parent
commit
72da16d45a

+ 55 - 1
core/base/progress_manager.py

@@ -1,7 +1,7 @@
 import json
 import time
 import asyncio
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
 from datetime import datetime
 
 from foundation.observability.logger.loggering import review_logger as logger
@@ -46,6 +46,9 @@ class ProgressManager:
     async def _get_redis_key(self, callback_task_id: str) -> str:
         return f"current:{callback_task_id}"
 
+    async def _get_stream_event_key(self, callback_task_id: str) -> str:
+        return f"stream_events:{callback_task_id}"
+
     async def initialize_progress(self, callback_task_id: str, user_id: str, stages: list):
         try:
             current_data = {
@@ -66,23 +69,74 @@ class ProgressManager:
                         3600,
                         json.dumps(current_data)
                     )
+                    stream_event_key = await self._get_stream_event_key(callback_task_id)
+                    self.redis_client.delete(stream_event_key)
                     logger.info(f"初始化任务进度列表")
                 except Exception as redis_e:
                     logger.warning(f"初始化进度到Redis失败: {callback_task_id}, {redis_e}")
                     if not hasattr(self, 'current_data'):
                         self.current_data = {}
+                    if not hasattr(self, 'stream_events'):
+                        self.stream_events = {}
                     self.current_data[callback_task_id] = current_data
+                    self.stream_events[callback_task_id] = []
                     logger.info(f"降级使用内存存储: {callback_task_id}")
             else:
                 if not hasattr(self, 'current_data'):
                     self.current_data = {}
+                if not hasattr(self, 'stream_events'):
+                    self.stream_events = {}
                 self.current_data[callback_task_id] = current_data
+                self.stream_events[callback_task_id] = []
                 logger.info(f"初始化任务进度到内存: {callback_task_id}")
 
         except Exception as e:
             logger.error(f"初始化进度失败: {str(e)}")
             raise
 
+    async def append_stream_event(self, callback_task_id: str, event_data: Dict[str, Any]):
+        """追加流式事件,供 SSE 轮询端按事件顺序吐给前端。"""
+        try:
+            if self.redis_connected:
+                stream_event_key = await self._get_stream_event_key(callback_task_id)
+                self.redis_client.rpush(
+                    stream_event_key,
+                    json.dumps(event_data, ensure_ascii=False)
+                )
+                self.redis_client.expire(stream_event_key, 3600)
+            else:
+                if not hasattr(self, 'stream_events'):
+                    self.stream_events = {}
+                self.stream_events.setdefault(callback_task_id, []).append(event_data)
+        except Exception as e:
+            logger.error(f"追加流式事件失败: {callback_task_id}, {str(e)}")
+            raise
+
+    async def pop_stream_events(self, callback_task_id: str, max_events: int = 100) -> List[Dict[str, Any]]:
+        """按 FIFO 顺序取出待发送的流式事件。"""
+        events: List[Dict[str, Any]] = []
+        try:
+            if self.redis_connected:
+                stream_event_key = await self._get_stream_event_key(callback_task_id)
+                for _ in range(max_events):
+                    event_json = self.redis_client.lpop(stream_event_key)
+                    if not event_json:
+                        break
+                    try:
+                        events.append(json.loads(event_json))
+                    except Exception as parse_e:
+                        logger.warning(f"解析流式事件失败: {callback_task_id}, {parse_e}")
+            else:
+                if not hasattr(self, 'stream_events'):
+                    self.stream_events = {}
+                queued_events = self.stream_events.get(callback_task_id, [])
+                events = queued_events[:max_events]
+                self.stream_events[callback_task_id] = queued_events[max_events:]
+        except Exception as e:
+            logger.error(f"获取流式事件失败: {callback_task_id}, {str(e)}")
+
+        return events
+
 
 
 

+ 157 - 26
core/construction_write/component/outline_generator.py

@@ -14,6 +14,7 @@
 import asyncio
 import copy
 import json
+import time
 from typing import Dict, Any, List, Optional
 from langchain_core.messages import AIMessage, HumanMessage
 from langchain_core.prompts import ChatPromptTemplate
@@ -132,7 +133,7 @@ class OutlineGenerator:
             # 更新进度
             await self._update_progress(
                 state,
-                current=30,
+                current=5,
                 stage_name="大纲生成",
                 message="正在生成大纲结构..."
             )
@@ -690,8 +691,8 @@ class OutlineGenerator:
                     reference_text = f"\n\n=== 参考内容 ===\n{reference_text}\n\n"
 
             # 构建完整的提示词模板
-            system_prompt = "你是一个专业的施工方案编写助手,请根据要求生成高质量的内容。"
-            user_prompt = f"{prompt}{reference_text}\n\n请生成内容:"
+            system_prompt = "你是一个严谨的施工方案文本校订助手,只能在用户提供的模板文本范围内做受限修改,禁止新增标题、段落和事实内容。"
+            user_prompt = f"{prompt}{reference_text}"
 
             # 创建 ChatPromptTemplate
             chat_template = ChatPromptTemplate.from_messages([
@@ -763,7 +764,7 @@ class OutlineGenerator:
         title = chapter_node.get("title", "")
         code = chapter_node.get("code", "")
         children = chapter_node.get("children", [])
-
+        template_content = chapter_node.get("template_content", "无")
         logger.info(f"[章节生成] trace_id: {trace_id}, 开始生成 {current_index} {title}")
 
         # 检查终止信号 - 在章节生成开始前
@@ -771,14 +772,33 @@ class OutlineGenerator:
             logger.warning(f"[章节生成] trace_id: {trace_id}, 检测到终止信号,停止生成 {current_index} {title}")
             raise asyncio.CancelledError(f"任务被终止: {current_index} {title}")
 
-        # 1. 生成当前章节的 content
-        chapter_content = await self._generate_chapter_content(
+        current_completed = 0
+        if completed_counter:
+            completed_count_list, completed_lock = completed_counter
+            async with completed_lock:
+                current_completed = completed_count_list[0]
+        await self._send_chapter_processing_progress(
             trace_id=trace_id,
-            project_info=project_info,
+            chapter_index=current_index,
             chapter_title=title,
-            chapter_code=code,
-            level=level
+            user_id=project_info.get("user_id"),
+            completed_count=current_completed,
+            total_chapters=total_leaf_chapters
         )
+        
+        # 1. 生成当前章节的 content,有模版内容ai生成,没有模版内容则直接返回空字符串
+        chapter_content = ''
+        if template_content == '' or template_content == '无':
+            chapter_content = ''
+        else:
+            chapter_content = await self._generate_chapter_content(
+                trace_id=trace_id,
+                project_info=project_info,
+                chapter_title=title,
+                chapter_code=code,
+                level=level,
+                template_content=template_content
+            )
 
         # 检查终止信号 - 在 LLM 调用后
         if await self._check_terminate_by_trace_id(trace_id):
@@ -855,6 +875,8 @@ class OutlineGenerator:
                     trace_id=trace_id,
                     chapter_index=current_index,
                     chapter_title=title,
+                    chapter_code=code,
+                    chapter_content=chapter_content,
                     user_id=project_info.get("user_id"),
                     completed_count=current_completed,
                     total_chapters=total_leaf_chapters
@@ -864,7 +886,68 @@ class OutlineGenerator:
 
         return result_node
 
-    async def _send_chapter_progress(self, trace_id: str, chapter_index: str, chapter_title: str, user_id: str = None, completed_count: int = 0, total_chapters: int = 1):
+    async def _send_chapter_processing_progress(
+        self,
+        trace_id: str,
+        chapter_index: str,
+        chapter_title: str,
+        user_id: str = None,
+        completed_count: int = 0,
+        total_chapters: int = 1
+    ):
+        """
+        发送章节正在生成的进度提示,用于让前端展示当前执行中的章节。
+        """
+        try:
+            callback_task_id = self._extract_callback_task_id(trace_id)
+            progress_manager = ProgressManagerRegistry.get_progress_manager(callback_task_id)
+
+            if progress_manager:
+                current_progress = int((completed_count / max(total_chapters, 1)) * 100)
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    user_id=user_id,
+                    stage_name="大纲生成",
+                    status="processing",
+                    message=f"正在生成章节 [{chapter_index}] {chapter_title}...",
+                    current=current_progress
+                )
+                await progress_manager.append_stream_event(
+                    callback_task_id=callback_task_id,
+                    event_data={
+                        "callback_task_id": callback_task_id,
+                        "user_id": user_id,
+                        "current": current_progress,
+                        "stage_name": "大纲生成",
+                        "status": "processing",
+                        "message": f"正在生成章节 [{chapter_index}] {chapter_title}...",
+                        "overall_task_status": "processing",
+                        "updated_at": int(time.time()),
+                        "event_type": "processing",
+                        "chapter_event": "processing",
+                        "current_chapter": {
+                            "index": chapter_index,
+                            "title": chapter_title
+                        }
+                    }
+                )
+                logger.info(f"[章节进度] task_id={trace_id}, 正在生成章节 [{chapter_index}] {chapter_title}, 进度={current_progress}%")
+            else:
+                logger.warning(f"[章节进度] 未找到 ProgressManager: callback_task_id={callback_task_id}")
+        except Exception as e:
+            logger.warning(f"[章节进度] 推送章节生成中进度失败: {str(e)}")
+
+    async def _send_chapter_progress(
+        self,
+        trace_id: str,
+        chapter_index: str,
+        chapter_title: str,
+        chapter_code: str = "",
+        chapter_content: str = "",
+        user_id: str = None,
+        completed_count: int = 0,
+        total_chapters: int = 1
+    ):
         """
         发送章节生成进度
 
@@ -872,6 +955,8 @@ class OutlineGenerator:
             trace_id: 追踪ID(格式: callback_task_id_索引)
             chapter_index: 章节编号
             chapter_title: 章节标题
+            chapter_code: 章节代码
+            chapter_content: 章节生成内容
             user_id: 用户ID
             completed_count: 已完成章节数
             total_chapters: 总章节数
@@ -892,9 +977,30 @@ class OutlineGenerator:
                     user_id=user_id,
                     stage_name="大纲生成",
                     status="processing",
-                    message=f"正在生成章节 [{chapter_index}] {chapter_title}...",
+                    message=f"章节生成成功 [{chapter_index}] {chapter_title}",
                     current=current_progress
                 )
+                await progress_manager.append_stream_event(
+                    callback_task_id=callback_task_id,
+                    event_data={
+                        "callback_task_id": callback_task_id,
+                        "user_id": user_id,
+                        "current": current_progress,
+                        "stage_name": "大纲生成",
+                        "status": "processing",
+                        "message": f"章节生成成功 [{chapter_index}] {chapter_title}",
+                        "overall_task_status": "processing",
+                        "updated_at": int(time.time()),
+                        "event_type": "processing",
+                        "chapter_event": "completed",
+                        "chapter": {
+                            "index": chapter_index,
+                            "title": chapter_title,
+                            "code": chapter_code,
+                            "generated_content": chapter_content
+                        }
+                    }
+                )
                 logger.info(f"[章节进度] task_id={trace_id}, 章节 [{chapter_index}] {chapter_title} 生成完成, 进度={current_progress}%")
             else:
                 logger.warning(f"[章节进度] 未找到 ProgressManager: callback_task_id={callback_task_id}")
@@ -956,7 +1062,8 @@ class OutlineGenerator:
         project_info: Dict[str, Any],
         chapter_title: str,
         chapter_code: str,
-        level: int
+        level: int,
+        template_content: str = ""
     ) -> str:
         """
         生成章节内容
@@ -987,20 +1094,33 @@ class OutlineGenerator:
             writing_requirements = ""
             logger.warning(f"[章节生成] trace_id: {trace_id}, code={chapter_code}, 未找到 keyword_rule")
 
-        # 构建提示词
-        prompt = f"""请为施工方案生成【{chapter_title}】章节的内容。
-
+        prompt = f"""请对【{chapter_title}】章节的模板文本进行受限校订。
+【项目信息】
 项目名称:{project_name}
-章节编号:{chapter_code}
 章节层级:第{level}级
-
-{writing_requirements}通用要求:
-1. 内容专业、准确,符合公路桥梁施工规范
-2. 语言简洁明了,条理清晰
-3. 根据实际情况生成合理的施工方案内容
-4. 字数控制在200-500字之间
-
-请直接输出章节内容,不要包含标题。"""
+施工单位: {project_info.get("construction_unit", "xx施工单位")}
+监理单位: {project_info.get("supervision_unit", "xx监理单位")}
+
+【任务边界】
+你不是在创作新章节,而是在校订下面的【参考模板】。
+只能输出参考模板中已经存在的内容,并在原句范围内做轻微润色。
+
+【硬性禁止】
+1. 禁止新增任何标题、编号、段落、条目、表格、图片说明或结尾语。
+2. 禁止扩写施工方法、技术措施、工程概况、注意事项等模板中没有的事实内容。
+3. 禁止为了“完整”“专业”“丰富”而补充示例、解释、背景或过渡句。
+4. 禁止输出“以下是”“根据模板”“已润色”等说明性文字。
+5. 禁止修改表格列,行结构和表格内容
+【允许修改】
+1. 仅可对已有句子做轻微语病修正、标点修正、措辞顺滑和专业化表达。
+2. 可将模板中的“xx”等占位符替换为项目信息中明确给出的内容;没有明确值时保留原文。
+3. 表格、编号、标题层级、段落数量、段落顺序必须与参考模板一致。
+4. 无法判断是否应该修改时,必须保留参考模板原文。
+
+【参考模板】
+{template_content}
+
+请只输出校订后的参考模板原文,不要新增任何内容。"""
 
         try:
             content = await self._call_llm(
@@ -1008,12 +1128,23 @@ class OutlineGenerator:
                 prompt=prompt,
                 timeout=60
             )
-            return content.strip()
+            return self._remove_table_markers(content).strip()
         except Exception as e:
             logger.error(f"[章节生成] trace_id: {trace_id}, 生成内容失败: {str(e)}")
             # 返回默认内容
             return f"{project_name}的{chapter_title}内容(自动生成)..."
 
+    def _remove_table_markers(self, content: str) -> str:
+        """移除模板中仅用于提示模型识别表格边界的标记。"""
+        if not content:
+            return content
+
+        return (
+            content
+            .replace("<表格开始>", "")
+            .replace("<表格结束>", "")
+        )
+
     async def _get_similar_fragments_for_node(
         self,
         trace_id: str,
@@ -1224,4 +1355,4 @@ class OutlineGenerator:
             if is_self_match or has_valid_children:
                 filtered_nodes.append(current_node)
 
-        return filtered_nodes
+        return filtered_nodes

+ 43 - 2
views/construction_write/outline_views.py

@@ -63,6 +63,8 @@ class BaseInfo(BaseModel):
     project_name: str = Field(..., description="方案名称", example="罗成依达大桥上部结构专项施工方案")
     construct_location: str = Field(..., description="建设地点", example="四川省凉山州")
     engineering_type: str = Field(..., description="方案模版类型", example="T型梁")
+    construction_unit: str = Field(..., description="施工单位")
+    supervision_unit: str = Field(..., description="监理单位")
 
 
 class ProjectInfo(BaseModel):
@@ -77,6 +79,8 @@ class TemplateStructureItem(BaseModel):
     level: int = Field(..., description="层级", ge=1, le=5)
     title: str = Field(..., description="章节标题", example="工程概况")
     code: str = Field(..., description="章节代码", example="overview")
+    template_content:str = Field("", description="模板内容")
+    is_enabled: bool = Field(True, description="是否启用该章节")
     # 使用 Union 支持递归类型
     children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
 
@@ -723,7 +727,10 @@ async def generating_outline(request: OutlineGenerationRequest):
                 "project_name": base_info.project_name,
                 "construct_location": base_info.construct_location,
                 "engineering_type": base_info.engineering_type,
-                "selectable": request.project_info.selectable or ""
+                "selectable": request.project_info.selectable or "",
+                "construction_unit": base_info.construction_unit or "",
+                "supervision_unit": base_info.supervision_unit or ""
+
             }
 
             sgbx_task_info = {
@@ -820,6 +827,16 @@ async def generating_outline(request: OutlineGenerationRequest):
                         yield format_sse_event("cancelled", cancelled_data)
                         return
 
+                    # 先推送章节内容流式事件,避免并发章节覆盖进度快照
+                    stream_events = await progress_manager.pop_stream_events(callback_task_id)
+                    for stream_data in stream_events:
+                        last_progress = stream_data.get("current", last_progress)
+                        last_event_type = stream_data.get("event_type", "processing")
+                        last_message = stream_data.get("message", "")
+                        last_progress_data = stream_data
+                        yield format_sse_event("processing", json.dumps(stream_data, ensure_ascii=False))
+                        no_change_count = 0
+
                     # 从 Redis 获取最新进度
                     progress_data = await progress_manager.get_progress(callback_task_id)
 
@@ -868,6 +885,13 @@ async def generating_outline(request: OutlineGenerationRequest):
                         
                         # 检查任务是否完成
                         if status in ["completed", "failed", "terminated"]:
+                            stream_events = await progress_manager.pop_stream_events(callback_task_id)
+                            for stream_data in stream_events:
+                                last_progress = stream_data.get("current", last_progress)
+                                last_event_type = stream_data.get("event_type", "processing")
+                                last_message = stream_data.get("message", "")
+                                last_progress_data = stream_data
+                                yield format_sse_event("processing", json.dumps(stream_data, ensure_ascii=False))
                             break
 
                     await asyncio.sleep(0.5)
@@ -1308,6 +1332,16 @@ async def regenerate_outline(request: RegenerateOutlineRequest):
                         yield format_sse_event("cancelled", cancelled_data)
                         return
 
+                    # 先推送章节内容流式事件,避免并发章节覆盖进度快照
+                    stream_events = await progress_manager.pop_stream_events(new_callback_task_id)
+                    for stream_data in stream_events:
+                        last_progress = stream_data.get("current", last_progress)
+                        last_event_type = stream_data.get("event_type", "processing")
+                        last_message = stream_data.get("message", "")
+                        last_progress_data = stream_data
+                        yield format_sse_event("processing", json.dumps(stream_data, ensure_ascii=False))
+                        no_change_count = 0
+
                     # 从 Redis 获取最新进度
                     progress_data = await progress_manager.get_progress(new_callback_task_id)
 
@@ -1356,6 +1390,13 @@ async def regenerate_outline(request: RegenerateOutlineRequest):
 
                         # 检查任务是否完成
                         if status in ["completed", "failed", "terminated"]:
+                            stream_events = await progress_manager.pop_stream_events(new_callback_task_id)
+                            for stream_data in stream_events:
+                                last_progress = stream_data.get("current", last_progress)
+                                last_event_type = stream_data.get("event_type", "processing")
+                                last_message = stream_data.get("message", "")
+                                last_progress_data = stream_data
+                                yield format_sse_event("processing", json.dumps(stream_data, ensure_ascii=False))
                             break
 
                     await asyncio.sleep(0.5)
@@ -1752,4 +1793,4 @@ async def active_tasks(
 
 
 # 将上下文生成路由注册到应用
-outline_router.include_router(context_generate_router)
+outline_router.include_router(context_generate_router)

+ 17 - 0
views/construction_write/regenerate_views.py

@@ -434,6 +434,16 @@ async def regenerate_outline(request: RegenerateOutlineRequest):
                         yield format_sse_event("cancelled", cancelled_data)
                         return
 
+                    # 先推送章节内容流式事件,避免并发章节覆盖进度快照
+                    stream_events = await progress_manager.pop_stream_events(new_callback_task_id)
+                    for stream_data in stream_events:
+                        last_progress = stream_data.get("current", last_progress)
+                        last_event_type = stream_data.get("event_type", "processing")
+                        last_message = stream_data.get("message", "")
+                        last_progress_data = stream_data
+                        yield format_sse_event("processing", json.dumps(stream_data, ensure_ascii=False))
+                        no_change_count = 0
+
                     # 从 Redis 获取最新进度
                     progress_data = await progress_manager.get_progress(new_callback_task_id)
 
@@ -482,6 +492,13 @@ async def regenerate_outline(request: RegenerateOutlineRequest):
 
                         # 检查任务是否完成
                         if status in ["completed", "failed", "terminated"]:
+                            stream_events = await progress_manager.pop_stream_events(new_callback_task_id)
+                            for stream_data in stream_events:
+                                last_progress = stream_data.get("current", last_progress)
+                                last_event_type = stream_data.get("event_type", "processing")
+                                last_message = stream_data.get("message", "")
+                                last_progress_data = stream_data
+                                yield format_sse_event("processing", json.dumps(stream_data, ensure_ascii=False))
                             break
 
                     await asyncio.sleep(0.5)