file_upload.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. """
  2. 文档上传接口实现
  3. 模拟文件上传功能,返回文件ID和回调任务ID
  4. """
  5. import ast
  6. import traceback
  7. import uuid
  8. import time
  9. from datetime import datetime
  10. from pydantic import BaseModel, Field
  11. from typing import Optional,List
  12. from foundation.utils import md5
  13. from foundation.infrastructure.config import config_handler
  14. from .schemas.error_schemas import FileUploadErrors
  15. from core.base.workflow_manager import WorkflowManager
  16. from foundation.observability.logger.loggering import server_logger as logger
  17. from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
  18. from core.base.redis_duplicate_checker import RedisDuplicateChecker
  19. from foundation.infrastructure.tracing import TraceContext, auto_trace
  20. file_upload_router = APIRouter(prefix="/sgsc", tags=["前端接口"])
  21. uploaded_files = {}
  22. # 初始化工作流管理器
  23. workflow_manager = WorkflowManager(
  24. max_concurrent_docs=3,
  25. max_concurrent_reviews=5
  26. )
  27. # 使用workflow_manager的duplicatechecker实例,确保一致性
  28. duplicatechecker = workflow_manager.redis_duplicate_checker
  29. class FileUploadResponse(BaseModel):
  30. code: int
  31. data: dict
  32. def validate_upload_parameters(form_data) -> None:
  33. """验证请求参数"""
  34. allowed_params = {'file', 'user'} # 只允许这两个参数
  35. # 检查是否有不允许的参数
  36. extra_params = []
  37. for key in form_data.keys():
  38. if key not in allowed_params:
  39. extra_params.append(key)
  40. if extra_params:
  41. logger.warning(f"检测到不支持的参数: {extra_params}")
  42. raise FileUploadErrors.invalid_parameters(extra_params)
  43. def get_file_size(file: UploadFile) -> int:
  44. """获取文件大小的可靠同步方法(兼容 seek 仅支持单参数的情况)"""
  45. try:
  46. content = file.file.read()
  47. size = len(content)
  48. file.file.seek(0)
  49. size_mb = size / (1024 * 1024)
  50. return size,round(size_mb, 2)
  51. except Exception as e:
  52. logger.warning(f"获取文件大小失败: {str(e)}")
  53. return 0, 0.0
  54. def validate_file(file: UploadFile, file_content: bytes = None) -> None:
  55. """验证文件格式"""
  56. file_extension = '.' + file.filename.split('.')[-1].lower() if '.' in file.filename else ''
  57. # 检测文件类型
  58. actual_file_type = 'unknown'
  59. if file_content.startswith(b'%PDF'):
  60. actual_file_type = 'pdf'
  61. elif file_content.startswith(b'PK\x03\x04'):
  62. if file_extension in ['.docx'] or file_extension in ['.doc'] :
  63. actual_file_type = 'docx/doc'
  64. else:
  65. logger.warning(f"未知文件类型,: {file_content[:20]}")
  66. raise FileUploadErrors.file_format_unsupported()
  67. else:
  68. logger.warning(f"未知文件类型,: {file_content[:20]}")
  69. raise FileUploadErrors.file_format_unsupported()
  70. logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")
  71. @file_upload_router.post("/file_upload",response_model=FileUploadResponse)
  72. @auto_trace(generate_if_missing=True) # 由于使用@auto_trace需要输入callback_task_id,但此时callback_task_id还未产生,所以暂时用初始trace_id替代
  73. async def file_upload(
  74. request: Request,
  75. file: List[UploadFile] = File([]),
  76. user: str = Form(None)
  77. ):
  78. """
  79. 文件上传接口
  80. """
  81. try:
  82. # 验证请求参数
  83. form_data = await request.form()
  84. validate_upload_parameters(form_data)
  85. valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))
  86. # 验证文件上传
  87. if not file or len(file) == 0:
  88. raise FileUploadErrors.file_missing()
  89. elif not file[0].filename:
  90. raise FileUploadErrors.file_missing()
  91. elif len(file) > 1:
  92. logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file) if file else 0}",
  93. log_type="upload", trace_id=f"upload-{int(time.time())}")
  94. raise FileUploadErrors.file_multiple()
  95. # 验证文件数量
  96. if file and len(file) > 0:
  97. try:
  98. content = file[0].file.read()
  99. file[0].file.seek(0) # 重置文件指针
  100. except:
  101. content = b""
  102. validate_file(file[0], content)
  103. # 验证文件格式
  104. file_size, file_size_mb = get_file_size(file[0])
  105. if file_size == 0:
  106. raise FileUploadErrors.file_missing()
  107. # 验证文件大小限制
  108. if file_size_mb > 50: # 文件大小不能超过50MB
  109. raise FileUploadErrors.file_size_exceeded()
  110. # 验证用户标识
  111. if user is None or user not in valid_users:
  112. raise FileUploadErrors.invalid_user()
  113. # 生成文件MD5ID
  114. file_id = md5.md5_id(content)
  115. created_at = int(time.time())
  116. # 详细文件信息调试
  117. logger.info(f"=== 文件详细信息 ===")
  118. logger.info(f"文件名: {file[0].filename}")
  119. logger.info(f"文件扩展名: {file[0].filename.split('.')[-1] if '.' in file[0].filename else '无扩展名'}")
  120. logger.info(f"文件头信息: {content[:50] if 'content' in locals() else '未读取'}")
  121. logger.info(f"文件大小: {file_size_mb} MB")
  122. logger.info(f"========================", log_type="upload")
  123. logger.info(f"用户标识: {user}")
  124. # 确定文件类型
  125. file_extension = file[0].filename.split('.')[-1].lower() if '.' in file[0].filename else ''
  126. if content.startswith(b'%PDF'):
  127. file_type = 'pdf'
  128. elif content.startswith(b'PK\x03\x04') and file_extension in ['docx', 'doc']:
  129. file_type = 'docx'
  130. else:
  131. file_type = 'unknown'
  132. # 生成任务ID
  133. callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
  134. TraceContext.set_trace_id(callback_task_id)
  135. logger.info(f"设置任务trace_id: {callback_task_id}")
  136. # 记录文件信息
  137. file_info = {
  138. 'file_id': file_id,
  139. 'file_content': content,
  140. 'user_id': user,
  141. 'file_type': file_type,
  142. 'callback_task_id': callback_task_id,
  143. "file_name": file[0].filename,
  144. "file_size": file_size_mb,
  145. 'updated_at': created_at
  146. }
  147. # 存储文件信息到Redis缓存,以file_id为键,供启动审查接口使用
  148. try:
  149. from foundation.utils.redis_utils import store_file_info
  150. # 使用file_id作为键存储文件信息(1小时过期)
  151. success = await store_file_info(file_id, file_info, 3600)
  152. if success:
  153. logger.info(f"文件信息已缓存到Redis: file_info:{file_id}")
  154. else:
  155. logger.warning(f"缓存文件信息到Redis失败")
  156. except Exception as e:
  157. logger.warning(f"缓存文件信息到Redis失败: {str(e)}")
  158. # 不影响主流程,继续处理
  159. # 预注册任务到重复检查器,以便启动审查时验证任务ID
  160. try:
  161. await duplicatechecker.register_task(file_info, callback_task_id)
  162. logger.info(f"任务已预注册: {callback_task_id}")
  163. except Exception as e:
  164. logger.error(f"任务预注册失败: {str(e)}")
  165. # 预注册失败不应影响文件上传成功
  166. # try:
  167. # # 提交处理任务到工作流管理器
  168. # await workflow_manager.submit_task_processing(file_info)
  169. # logger.info(f"文档处理任务已提交,任务ID: {callback_task_id}")
  170. return FileUploadResponse(
  171. code=200,
  172. data={
  173. "id": file_info['file_id'],
  174. "name": file_info['file_name'],
  175. "size": file_size_mb,
  176. "created_at": created_at,
  177. "status": "file_upload_success",
  178. "callback_task_id": file_info['callback_task_id']
  179. }
  180. )
  181. # except Exception as workflow_error:
  182. # logger.error(f"工作流提交失败: {str(workflow_error)}")
  183. # raise FileUploadErrors.internal_error(workflow_error)
  184. except HTTPException:
  185. logger.error(f"HTTP异常: {traceback.format_exc()}")
  186. raise
  187. except Exception as e:
  188. logger.error(f"文件上传失败: {str(e)}")
  189. logger.error(f"错误详情: {traceback.format_exc()}")
  190. raise FileUploadErrors.internal_error(e)