file_upload.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. """
  2. 文档上传接口实现
  3. 模拟文件上传功能,返回文件ID和回调任务ID
  4. """
  5. import ast
  6. import traceback
  7. import time
  8. import tempfile
  9. import subprocess
  10. import os
  11. from datetime import datetime
  12. from pathlib import Path
  13. from pydantic import BaseModel
  14. from typing import List
  15. from foundation.utils import md5
  16. from foundation.infrastructure.config import config_handler
  17. from .schemas.error_schemas import FileUploadErrors
  18. from core.base.workflow_manager import WorkflowManager
  19. from foundation.observability.logger.loggering import review_logger as logger
  20. from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
  21. from foundation.infrastructure.tracing import TraceContext, auto_trace
  22. def _find_soffice_path() -> str:
  23. """
  24. 查找 LibreOffice soffice 可执行文件路径
  25. Returns:
  26. str: soffice 可执行文件路径
  27. Raises:
  28. FileNotFoundError: 未找到 LibreOffice
  29. """
  30. import platform
  31. import shutil
  32. # Linux/Docker 环境:直接使用 soffice
  33. if platform.system() != 'Windows':
  34. return 'soffice'
  35. # Windows 环境:首先从 PATH 中查找
  36. soffice_in_path = shutil.which('soffice')
  37. if soffice_in_path:
  38. logger.info(f"从 PATH 找到 LibreOffice: {soffice_in_path}")
  39. return soffice_in_path
  40. # 备选:检测常见安装路径
  41. possible_paths = [
  42. r"C:\Program Files\LibreOffice\program\soffice.exe",
  43. r"C:\Program Files (x86)\LibreOffice\program\soffice.exe",
  44. ]
  45. for path in possible_paths:
  46. if os.path.exists(path):
  47. logger.info(f"找到 LibreOffice: {path}")
  48. return path
  49. raise FileNotFoundError(
  50. "LibreOffice 未安装。请从 https://www.libreoffice.org/download/ 下载安装,"
  51. "或确保 soffice.exe 在 PATH 中"
  52. )
  53. def convert_docx_to_pdf(docx_content: bytes, filename: str) -> tuple[bytes, str]:
  54. """
  55. 将 docx/doc 文件内容转换为 PDF
  56. Windows 开发环境: 优先使用 docx2pdf (Microsoft Word COM),回退到 LibreOffice
  57. Linux/Docker 生产环境: 使用 LibreOffice (soffice)
  58. Args:
  59. docx_content: docx/doc 文件的二进制内容
  60. filename: 原始文件名
  61. Returns:
  62. tuple[bytes, str]: (PDF 文件内容, 原始文件名)
  63. Raises:
  64. Exception: 转换失败时抛出异常
  65. """
  66. import platform
  67. # Windows 环境:优先尝试 docx2pdf (Microsoft Word COM)
  68. if platform.system() == 'Windows':
  69. try:
  70. from docx2pdf import convert
  71. return _convert_via_docx2pdf(docx_content, filename, convert)
  72. except ImportError:
  73. logger.info("docx2pdf 未安装,使用 LibreOffice")
  74. except Exception as e:
  75. logger.warning(f"docx2pdf 转换失败,回退到 LibreOffice: {str(e)}")
  76. # Linux/Docker 或 Windows 回退:使用 LibreOffice
  77. return _convert_via_libreoffice(docx_content, filename)
  78. def _convert_via_docx2pdf(docx_content: bytes, filename: str, convert_func) -> tuple[bytes, str]:
  79. """使用 docx2pdf (Microsoft Word COM) 转换,返回 PDF 内容和原始文件名"""
  80. with tempfile.TemporaryDirectory() as temp_dir:
  81. temp_dir_path = Path(temp_dir)
  82. # 保存原始文件
  83. original_ext = Path(filename).suffix.lower()
  84. temp_input = temp_dir_path / f"input{original_ext}"
  85. temp_output = temp_dir_path / "output.pdf"
  86. temp_input.write_bytes(docx_content)
  87. logger.info(f"使用 Microsoft Word 转换 {filename} 为 PDF...")
  88. convert_func(str(temp_input), str(temp_output))
  89. if not temp_output.exists():
  90. raise Exception("转换后未找到 PDF 文件")
  91. pdf_content = temp_output.read_bytes()
  92. logger.info(f"成功转换 {filename} 为 PDF, 大小: {len(pdf_content) / 1024:.2f} KB")
  93. return pdf_content, filename # 返回原始文件名
  94. def _convert_via_libreoffice(docx_content: bytes, filename: str) -> tuple[bytes, str]:
  95. """使用 LibreOffice (soffice) 转换,返回 PDF 内容和原始文件名"""
  96. # 创建临时目录
  97. with tempfile.TemporaryDirectory() as temp_dir:
  98. temp_dir_path = Path(temp_dir)
  99. # 保存原始文件到临时目录
  100. original_ext = Path(filename).suffix.lower()
  101. base_name = Path(filename).stem
  102. temp_input = temp_dir_path / f"input{original_ext}"
  103. temp_input.write_bytes(docx_content)
  104. logger.info(f"使用 LibreOffice 转换 {filename} 为 PDF...")
  105. # 查找 LibreOffice 路径
  106. try:
  107. soffice_path = _find_soffice_path()
  108. except FileNotFoundError as e:
  109. logger.error(str(e))
  110. raise Exception(str(e))
  111. # 使用 LibreOffice 转换
  112. try:
  113. result = subprocess.run(
  114. [
  115. soffice_path, '--headless', '--convert-to', 'pdf',
  116. '--outdir', str(temp_dir_path),
  117. str(temp_input)
  118. ],
  119. capture_output=True,
  120. text=True,
  121. timeout=120 # 2分钟超时
  122. )
  123. if result.returncode != 0:
  124. logger.error(f"LibreOffice 转换失败: {result.stderr}")
  125. raise Exception(f"LibreOffice 转换失败: {result.stderr}")
  126. # 查找生成的 PDF 文件
  127. pdf_files = list(temp_dir_path.glob("*.pdf"))
  128. if not pdf_files:
  129. raise Exception("转换后未找到 PDF 文件")
  130. pdf_file = pdf_files[0]
  131. pdf_content = pdf_file.read_bytes()
  132. logger.info(f"成功转换 {filename} 为 PDF, 大小: {len(pdf_content) / 1024:.2f} KB")
  133. return pdf_content, filename # 返回原始文件名
  134. except subprocess.TimeoutExpired:
  135. raise Exception("LibreOffice 转换超时")
  136. except FileNotFoundError:
  137. raise Exception("LibreOffice 未安装或 soffice 命令不可用")
  138. file_upload_router = APIRouter(prefix="/sgsc", tags=["前端接口"])
  139. uploaded_files = {}
  140. # 初始化工作流管理器
  141. workflow_manager = WorkflowManager(
  142. max_concurrent_docs=3,
  143. max_concurrent_reviews=5
  144. )
  145. # 使用workflow_manager的duplicatechecker实例,确保一致性
  146. duplicatechecker = workflow_manager.redis_duplicate_checker
  147. class FileUploadResponse(BaseModel):
  148. code: int
  149. data: dict
  150. def validate_upload_parameters(form_data) -> None:
  151. """验证请求参数"""
  152. allowed_params = {'file', 'user'} # 只允许这两个参数
  153. # 检查是否有不允许的参数
  154. extra_params = []
  155. for key in form_data.keys():
  156. if key not in allowed_params:
  157. extra_params.append(key)
  158. if extra_params:
  159. logger.warning(f"检测到不支持的参数: {extra_params}")
  160. raise FileUploadErrors.invalid_parameters(extra_params)
  161. def get_file_size(file: UploadFile) -> int:
  162. """获取文件大小的可靠同步方法(兼容 seek 仅支持单参数的情况)"""
  163. try:
  164. content = file.file.read()
  165. size = len(content)
  166. file.file.seek(0)
  167. size_mb = size / (1024 * 1024)
  168. return size,round(size_mb, 2)
  169. except Exception as e:
  170. logger.warning(f"获取文件大小失败: {str(e)}")
  171. return 0, 0.0
  172. def validate_file(file: UploadFile, file_content: bytes = None) -> None:
  173. """验证文件格式"""
  174. file_extension = '.' + file.filename.split('.')[-1].lower() if '.' in file.filename else ''
  175. # 检测文件类型
  176. actual_file_type = 'unknown'
  177. if file_content.startswith(b'%PDF'):
  178. actual_file_type = 'pdf'
  179. elif file_content.startswith(b'PK\x03\x04'):
  180. if file_extension in ['.docx'] or file_extension in ['.doc'] :
  181. actual_file_type = 'docx/doc'
  182. else:
  183. logger.warning(f"未知文件类型,: {file_content[:20]}")
  184. raise FileUploadErrors.file_format_unsupported()
  185. else:
  186. logger.warning(f"未知文件类型,: {file_content[:20]}")
  187. raise FileUploadErrors.file_format_unsupported()
  188. logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")
  189. @file_upload_router.post("/file_upload",response_model=FileUploadResponse)
  190. @auto_trace(generate_if_missing=True) # 由于使用@auto_trace需要输入callback_task_id,但此时callback_task_id还未产生,所以暂时用初始trace_id替代
  191. async def file_upload(
  192. request: Request,
  193. file: List[UploadFile] = File([]),
  194. user: str = Form(None)
  195. ):
  196. """
  197. 文件上传接口
  198. """
  199. try:
  200. # 验证请求参数
  201. form_data = await request.form()
  202. validate_upload_parameters(form_data)
  203. valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))
  204. # 验证文件上传
  205. if not file or len(file) == 0:
  206. raise FileUploadErrors.file_missing()
  207. elif not file[0].filename:
  208. raise FileUploadErrors.file_missing()
  209. elif len(file) > 1:
  210. logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file) if file else 0}",
  211. log_type="upload", trace_id=f"upload-{int(time.time())}")
  212. raise FileUploadErrors.file_multiple()
  213. # 验证文件数量
  214. if file and len(file) > 0:
  215. try:
  216. content = file[0].file.read()
  217. file[0].file.seek(0) # 重置文件指针
  218. except:
  219. content = b""
  220. validate_file(file[0], content)
  221. # 验证文件格式
  222. file_size, file_size_mb = get_file_size(file[0])
  223. if file_size == 0:
  224. raise FileUploadErrors.file_missing()
  225. # 验证文件大小限制
  226. if file_size_mb > 50: # 文件大小不能超过50MB
  227. raise FileUploadErrors.file_size_exceeded()
  228. # 验证用户标识
  229. if user is None or user not in valid_users:
  230. raise FileUploadErrors.invalid_user()
  231. # 生成文件MD5ID(基于原始文件内容,用于重复检测)
  232. file_id = md5.md5_id(content)
  233. created_at = int(time.time())
  234. # 详细文件信息调试
  235. logger.info(f"=== 文件详细信息 ===")
  236. logger.info(f"文件名: {file[0].filename}")
  237. logger.info(f"文件扩展名: {file[0].filename.split('.')[-1] if '.' in file[0].filename else '无扩展名'}")
  238. logger.info(f"文件头信息: {content[:50] if 'content' in locals() else '未读取'}")
  239. logger.info(f"文件大小: {file_size_mb} MB")
  240. logger.info(f"========================", log_type="upload")
  241. logger.info(f"用户标识: {user}")
  242. # 确定文件类型
  243. file_extension = file[0].filename.split('.')[-1].lower() if '.' in file[0].filename else ''
  244. original_filename = file[0].filename # 保存原始文件名
  245. if content.startswith(b'%PDF'):
  246. file_type = 'pdf'
  247. elif content.startswith(b'PK\x03\x04') and file_extension in ['docx', 'doc']:
  248. # 检测到 docx/doc 文件,转换为 PDF
  249. logger.info(f"检测到 {file_extension} 文件,正在转换为 PDF...")
  250. try:
  251. pdf_content, _ = convert_docx_to_pdf(content, original_filename)
  252. # 更新文件内容(文件名和 MD5 保持不变,用于重复检测)
  253. content = pdf_content
  254. file_type = 'pdf' # 标记为 PDF 类型,后续流程按 PDF 处理
  255. file_size = len(pdf_content)
  256. file_size_mb = round(file_size / (1024 * 1024), 2)
  257. # 注意:file_id 保持不变(基于原始 docx 内容),用于重复文件检测
  258. logger.info(f"文件已转换为 PDF,大小: {file_size_mb} MB")
  259. except Exception as convert_error:
  260. logger.error(f"docx 转 PDF 失败: {str(convert_error)}")
  261. raise FileUploadErrors.internal_error(f"文档转换失败: {str(convert_error)}")
  262. else:
  263. file_type = 'unknown'
  264. # 生成任务ID
  265. callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
  266. TraceContext.set_trace_id(callback_task_id)
  267. logger.info(f"设置任务trace_id: {callback_task_id}")
  268. # 记录文件信息
  269. file_info = {
  270. 'file_id': file_id,
  271. 'file_content': content,
  272. 'user_id': user,
  273. 'file_type': file_type,
  274. 'callback_task_id': callback_task_id,
  275. "file_name": original_filename, # 保持原始文件名(docx 转 PDF 后仍显示原始文件名)
  276. "file_size": file_size_mb,
  277. 'updated_at': created_at
  278. }
  279. # 存储文件信息到Redis缓存,以file_id为键,供启动审查接口使用
  280. try:
  281. from foundation.utils.redis_utils import store_file_info
  282. # 使用file_id作为键存储文件信息(1小时过期)
  283. success = await store_file_info(file_id, file_info, 3600)
  284. if success:
  285. logger.info(f"文件信息已缓存到Redis: file_info:{file_id}")
  286. else:
  287. logger.warning(f"缓存文件信息到Redis失败")
  288. except Exception as e:
  289. logger.warning(f"缓存文件信息到Redis失败: {str(e)}")
  290. # 不影响主流程,继续处理
  291. # 预注册任务到重复检查器,以便启动审查时验证任务ID
  292. try:
  293. await duplicatechecker.register_task(file_info, callback_task_id)
  294. logger.info(f"任务已预注册: {callback_task_id}")
  295. except Exception as e:
  296. logger.error(f"任务预注册失败: {str(e)}")
  297. # 预注册失败不应影响文件上传成功
  298. # try:
  299. # # 提交处理任务到工作流管理器
  300. # await workflow_manager.submit_task_processing(file_info)
  301. # logger.info(f"文档处理任务已提交,任务ID: {callback_task_id}")
  302. return FileUploadResponse(
  303. code=200,
  304. data={
  305. "id": file_info['file_id'],
  306. "name": file_info['file_name'],
  307. "size": file_size_mb,
  308. "created_at": created_at,
  309. "status": "file_upload_success",
  310. "callback_task_id": file_info['callback_task_id']
  311. }
  312. )
  313. # except Exception as workflow_error:
  314. # logger.error(f"工作流提交失败: {str(workflow_error)}")
  315. # raise FileUploadErrors.internal_error(workflow_error)
  316. except HTTPException:
  317. logger.error(f"HTTP异常: {traceback.format_exc()}")
  318. raise
  319. except Exception as e:
  320. logger.error(f"文件上传失败: {str(e)}")
  321. logger.error(f"错误详情: {traceback.format_exc()}")
  322. raise FileUploadErrors.internal_error(e)