| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # -*- coding: utf-8 -*-
- """HTTP API for document chat."""
- import json
- import time
- import uuid
- from typing import AsyncGenerator
- from fastapi import APIRouter, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from foundation.infrastructure.tracing import TraceContext, auto_trace
- from foundation.observability.logger.loggering import write_logger as logger
- from core.document_chat.schemas import DocumentChatRequest, DocumentChatResponse, model_to_dict
- document_chat_router = APIRouter(prefix="/sgbx", tags=["文档编辑AI对话"])
- def format_sse_event(event_type: str, data: dict) -> str:
- return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
- def get_document_chat_workflow():
- from core.document_chat.workflows.document_chat_workflow import document_chat_workflow
- return document_chat_workflow
- @document_chat_router.post("/document_chat")
- @auto_trace(generate_if_missing=True)
- async def document_chat(request: DocumentChatRequest, stream: bool = Query(False)):
- callback_task_id = f"doc_chat_{uuid.uuid4().hex[:12]}"
- TraceContext.set_trace_id(callback_task_id)
- if stream or request.response_mode == "sse":
- return StreamingResponse(
- _generate_document_chat_events(callback_task_id, request),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "X-Accel-Buffering": "no",
- },
- )
- try:
- workflow = get_document_chat_workflow()
- state = await workflow.run(request, callback_task_id)
- data = workflow.to_response_data(state)
- code = 500 if data.response_type == "error" else 200
- message = data.error_message if data.response_type == "error" else "success"
- return DocumentChatResponse(code=code, message=message or "success", data=data)
- except Exception as exc:
- logger.error(f"[DocumentChat] request failed: {exc}", exc_info=True)
- raise HTTPException(status_code=500, detail=str(exc))
- async def _generate_document_chat_events(
- callback_task_id: str,
- request: DocumentChatRequest,
- ) -> AsyncGenerator[str, None]:
- started_at = time.time()
- try:
- yield format_sse_event(
- "connected",
- {
- "callback_task_id": callback_task_id,
- "status": "connected",
- "timestamp": int(time.time()),
- },
- )
- yield format_sse_event(
- "processing",
- {
- "callback_task_id": callback_task_id,
- "stage_name": "workflow_started",
- "status": "processing",
- "message": "文档 AI 对话工作流已启动",
- },
- )
- workflow = get_document_chat_workflow()
- state = await workflow.run(request, callback_task_id)
- data = workflow.to_response_data(state)
- data_dict = model_to_dict(data)
- if data.intent_result:
- yield format_sse_event(
- "intent",
- {
- "callback_task_id": callback_task_id,
- "intent_result": data.intent_result,
- },
- )
- if data.response_type in ("answer", "proposal"):
- yield format_sse_event(
- "skill_started",
- {
- "callback_task_id": callback_task_id,
- "skill_name": data.intent_result.get("skill_name") if data.intent_result else "",
- "response_type": data.response_type,
- },
- )
- if data.response_type == "answer" and data.answer:
- yield format_sse_event(
- "chunk",
- {
- "callback_task_id": callback_task_id,
- "chunk": data.answer,
- },
- )
- yield format_sse_event("answer_completed", data_dict)
- elif data.response_type == "proposal":
- if data.proposed_content:
- yield format_sse_event(
- "chunk",
- {
- "callback_task_id": callback_task_id,
- "chunk": data.proposed_content,
- },
- )
- yield format_sse_event("proposal_completed", data_dict)
- elif data.response_type in ("clarify", "unsupported"):
- yield format_sse_event("answer_completed", data_dict)
- else:
- yield format_sse_event("error", data_dict)
- yield format_sse_event(
- "completed",
- {
- "callback_task_id": callback_task_id,
- "status": state.get("overall_task_status", "completed"),
- "duration": round(time.time() - started_at, 3),
- },
- )
- except Exception as exc:
- logger.error(f"[DocumentChat] SSE request failed: {exc}", exc_info=True)
- yield format_sse_event(
- "error",
- {
- "callback_task_id": callback_task_id,
- "status": "error",
- "message": str(exc),
- },
- )
- @document_chat_router.get("/document_chat/health")
- async def document_chat_health():
- return {
- "status": "healthy",
- "module": "document_chat",
- "workflow": "langgraph",
- "skills": ["document-answer", "document-modify"],
- }
|