views.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # -*- coding: utf-8 -*-
  2. """HTTP API for document chat."""
  3. import json
  4. import time
  5. import uuid
  6. from typing import AsyncGenerator
  7. from fastapi import APIRouter, HTTPException, Query
  8. from fastapi.responses import StreamingResponse
  9. from foundation.infrastructure.tracing import TraceContext, auto_trace
  10. from foundation.observability.logger.loggering import write_logger as logger
  11. from core.document_chat.schemas import DocumentChatRequest, DocumentChatResponse, model_to_dict
  12. document_chat_router = APIRouter(prefix="/sgbx", tags=["文档编辑AI对话"])
  13. def format_sse_event(event_type: str, data: dict) -> str:
  14. return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
  15. def get_document_chat_workflow():
  16. from core.document_chat.workflows.document_chat_workflow import document_chat_workflow
  17. return document_chat_workflow
  18. @document_chat_router.post("/document_chat")
  19. @auto_trace(generate_if_missing=True)
  20. async def document_chat(request: DocumentChatRequest, stream: bool = Query(False)):
  21. callback_task_id = f"doc_chat_{uuid.uuid4().hex[:12]}"
  22. TraceContext.set_trace_id(callback_task_id)
  23. if stream or request.response_mode == "sse":
  24. return StreamingResponse(
  25. _generate_document_chat_events(callback_task_id, request),
  26. media_type="text/event-stream",
  27. headers={
  28. "Cache-Control": "no-cache",
  29. "Connection": "keep-alive",
  30. "X-Accel-Buffering": "no",
  31. },
  32. )
  33. try:
  34. workflow = get_document_chat_workflow()
  35. state = await workflow.run(request, callback_task_id)
  36. data = workflow.to_response_data(state)
  37. code = 500 if data.response_type == "error" else 200
  38. message = data.error_message if data.response_type == "error" else "success"
  39. return DocumentChatResponse(code=code, message=message or "success", data=data)
  40. except Exception as exc:
  41. logger.error(f"[DocumentChat] request failed: {exc}", exc_info=True)
  42. raise HTTPException(status_code=500, detail=str(exc))
  43. async def _generate_document_chat_events(
  44. callback_task_id: str,
  45. request: DocumentChatRequest,
  46. ) -> AsyncGenerator[str, None]:
  47. started_at = time.time()
  48. try:
  49. yield format_sse_event(
  50. "connected",
  51. {
  52. "callback_task_id": callback_task_id,
  53. "status": "connected",
  54. "timestamp": int(time.time()),
  55. },
  56. )
  57. yield format_sse_event(
  58. "processing",
  59. {
  60. "callback_task_id": callback_task_id,
  61. "stage_name": "workflow_started",
  62. "status": "processing",
  63. "message": "文档 AI 对话工作流已启动",
  64. },
  65. )
  66. workflow = get_document_chat_workflow()
  67. state = await workflow.run(request, callback_task_id)
  68. data = workflow.to_response_data(state)
  69. data_dict = model_to_dict(data)
  70. if data.intent_result:
  71. yield format_sse_event(
  72. "intent",
  73. {
  74. "callback_task_id": callback_task_id,
  75. "intent_result": data.intent_result,
  76. },
  77. )
  78. if data.response_type in ("answer", "proposal"):
  79. yield format_sse_event(
  80. "skill_started",
  81. {
  82. "callback_task_id": callback_task_id,
  83. "skill_name": data.intent_result.get("skill_name") if data.intent_result else "",
  84. "response_type": data.response_type,
  85. },
  86. )
  87. if data.response_type == "answer" and data.answer:
  88. yield format_sse_event(
  89. "chunk",
  90. {
  91. "callback_task_id": callback_task_id,
  92. "chunk": data.answer,
  93. },
  94. )
  95. yield format_sse_event("answer_completed", data_dict)
  96. elif data.response_type == "proposal":
  97. if data.proposed_content:
  98. yield format_sse_event(
  99. "chunk",
  100. {
  101. "callback_task_id": callback_task_id,
  102. "chunk": data.proposed_content,
  103. },
  104. )
  105. yield format_sse_event("proposal_completed", data_dict)
  106. elif data.response_type in ("clarify", "unsupported"):
  107. yield format_sse_event("answer_completed", data_dict)
  108. else:
  109. yield format_sse_event("error", data_dict)
  110. yield format_sse_event(
  111. "completed",
  112. {
  113. "callback_task_id": callback_task_id,
  114. "status": state.get("overall_task_status", "completed"),
  115. "duration": round(time.time() - started_at, 3),
  116. },
  117. )
  118. except Exception as exc:
  119. logger.error(f"[DocumentChat] SSE request failed: {exc}", exc_info=True)
  120. yield format_sse_event(
  121. "error",
  122. {
  123. "callback_task_id": callback_task_id,
  124. "status": "error",
  125. "message": str(exc),
  126. },
  127. )
  128. @document_chat_router.get("/document_chat/health")
  129. async def document_chat_health():
  130. return {
  131. "status": "healthy",
  132. "module": "document_chat",
  133. "workflow": "langgraph",
  134. "skills": ["document-answer", "document-modify"],
  135. }