| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- """
- 文档上传接口实现
- 模拟文件上传功能,返回文件ID和回调任务ID
- """
- import ast
- import traceback
- import time
- import tempfile
- import subprocess
- import os
- from datetime import datetime
- from pathlib import Path
- from pydantic import BaseModel
- from typing import List
- from foundation.utils import md5
- from foundation.infrastructure.config import config_handler
- from .schemas.error_schemas import FileUploadErrors
- from core.base.workflow_manager import WorkflowManager
- from foundation.observability.logger.loggering import review_logger as logger
- from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
- from foundation.infrastructure.tracing import TraceContext, auto_trace
def _find_soffice_path() -> str:
    """
    Locate the LibreOffice ``soffice`` executable.

    On any non-Windows platform the bare command name ``'soffice'`` is
    returned without checking that it exists. On Windows the executable is
    looked up on PATH first and then in the default install directories.

    Returns:
        str: Command name or path of the soffice executable.

    Raises:
        FileNotFoundError: On Windows, when soffice is neither on PATH nor in
            one of the probed install directories.
    """
    import platform
    import shutil

    # Linux/Docker: hand back the command name and let the caller's
    # subprocess invocation resolve it.
    if platform.system() != 'Windows':
        return 'soffice'

    # Windows: a PATH hit wins over the well-known install locations.
    from_path = shutil.which('soffice')
    if from_path:
        logger.info(f"从 PATH 找到 LibreOffice: {from_path}")
        return from_path

    # Fallback: probe the common install directories in order.
    default_locations = (
        r"C:\Program Files\LibreOffice\program\soffice.exe",
        r"C:\Program Files (x86)\LibreOffice\program\soffice.exe",
    )
    installed = next(
        (candidate for candidate in default_locations if os.path.exists(candidate)),
        None,
    )
    if installed is not None:
        logger.info(f"找到 LibreOffice: {installed}")
        return installed

    raise FileNotFoundError(
        "LibreOffice 未安装。请从 https://www.libreoffice.org/download/ 下载安装,"
        "或确保 soffice.exe 在 PATH 中"
    )
def convert_docx_to_pdf(docx_content: bytes, filename: str) -> tuple[bytes, str]:
    """
    Convert the bytes of a docx/doc document to PDF.

    On Windows the docx2pdf package (Microsoft Word COM) is tried first; when
    it is not installed or the conversion raises, LibreOffice is used instead.
    On every other platform LibreOffice (soffice) is used directly.

    Args:
        docx_content: Raw bytes of the docx/doc file.
        filename: Original file name.

    Returns:
        tuple[bytes, str]: (PDF bytes, original file name).

    Raises:
        Exception: Propagated from the LibreOffice converter when it fails.
    """
    import platform

    on_windows = platform.system() == 'Windows'
    if on_windows:
        # Both failure modes below are logged and fall through to LibreOffice.
        try:
            from docx2pdf import convert as word_convert
            return _convert_via_docx2pdf(docx_content, filename, word_convert)
        except ImportError:
            logger.info("docx2pdf 未安装,使用 LibreOffice")
        except Exception as word_error:
            logger.warning(f"docx2pdf 转换失败,回退到 LibreOffice: {str(word_error)}")

    # Linux/Docker, or the Windows fallback path.
    return _convert_via_libreoffice(docx_content, filename)
def _convert_via_docx2pdf(docx_content: bytes, filename: str, convert_func) -> tuple[bytes, str]:
    """Convert with docx2pdf (Microsoft Word COM); return PDF bytes and the original file name.

    Args:
        docx_content: Raw bytes of the source document.
        filename: Original file name; only its lower-cased suffix is used to
            name the temporary input file.
        convert_func: Callable invoked as ``convert_func(input_path, output_path)``
            with both paths passed as strings.

    Returns:
        tuple[bytes, str]: (PDF bytes, ``filename`` unchanged).

    Raises:
        Exception: If ``output.pdf`` does not exist after ``convert_func`` returns.
    """
    with tempfile.TemporaryDirectory() as workdir:
        work_path = Path(workdir)
        suffix = Path(filename).suffix.lower()
        source_path = work_path / f"input{suffix}"
        target_path = work_path / "output.pdf"

        # Stage the upload on disk because the converter works on file paths.
        source_path.write_bytes(docx_content)
        logger.info(f"使用 Microsoft Word 转换 {filename} 为 PDF...")
        convert_func(str(source_path), str(target_path))

        if target_path.exists():
            pdf_bytes = target_path.read_bytes()
            logger.info(f"成功转换 {filename} 为 PDF, 大小: {len(pdf_bytes) / 1024:.2f} KB")
            # The original file name is returned, not the temporary one.
            return pdf_bytes, filename
        raise Exception("转换后未找到 PDF 文件")
def _convert_via_libreoffice(docx_content: bytes, filename: str) -> tuple[bytes, str]:
    """Convert with LibreOffice (soffice); return PDF bytes and the original file name.

    The document is written into a temporary directory, ``soffice --headless
    --convert-to pdf`` is run on it with a 120 second timeout, and the first
    ``*.pdf`` found in that directory is read back.

    Args:
        docx_content: Raw bytes of the source document.
        filename: Original file name; only its lower-cased suffix is used to
            name the temporary input file.

    Returns:
        tuple[bytes, str]: (PDF bytes, ``filename`` unchanged).

    Raises:
        Exception: If soffice cannot be located or started, the conversion
            times out, soffice exits with a non-zero status, or no PDF is
            produced. The underlying error, when there is one, is chained as
            ``__cause__``.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)

        # Stage the upload on disk because soffice works on file paths.
        original_ext = Path(filename).suffix.lower()
        temp_input = temp_dir_path / f"input{original_ext}"
        temp_input.write_bytes(docx_content)
        logger.info(f"使用 LibreOffice 转换 {filename} 为 PDF...")

        # Resolve the executable; callers only expect a plain Exception.
        try:
            soffice_path = _find_soffice_path()
        except FileNotFoundError as e:
            logger.error(str(e))
            raise Exception(str(e)) from e

        # Only the process launch can time out or fail to find the binary,
        # so only that call is guarded by these handlers.
        try:
            result = subprocess.run(
                [
                    soffice_path, '--headless', '--convert-to', 'pdf',
                    '--outdir', str(temp_dir_path),
                    str(temp_input)
                ],
                capture_output=True,
                text=True,
                timeout=120  # 2 minute timeout
            )
        except subprocess.TimeoutExpired as e:
            raise Exception("LibreOffice 转换超时") from e
        except FileNotFoundError as e:
            raise Exception("LibreOffice 未安装或 soffice 命令不可用") from e

        if result.returncode != 0:
            logger.error(f"LibreOffice 转换失败: {result.stderr}")
            raise Exception(f"LibreOffice 转换失败: {result.stderr}")

        # soffice names its output itself, so look for whatever PDF appeared.
        pdf_files = list(temp_dir_path.glob("*.pdf"))
        if not pdf_files:
            raise Exception("转换后未找到 PDF 文件")

        pdf_content = pdf_files[0].read_bytes()
        logger.info(f"成功转换 {filename} 为 PDF, 大小: {len(pdf_content) / 1024:.2f} KB")
        # The original file name is returned, not the temporary one.
        return pdf_content, filename
# Router for this module's endpoints, mounted under the /sgsc prefix.
file_upload_router = APIRouter(prefix="/sgsc", tags=["前端接口"])

# Module-level dict; not referenced anywhere in the visible part of this module.
uploaded_files = {}

# Workflow manager instance created at import time.
workflow_manager = WorkflowManager(max_concurrent_docs=3, max_concurrent_reviews=5)

# Reuse the workflow manager's own duplicate checker so that both refer to
# the same instance.
duplicatechecker = workflow_manager.redis_duplicate_checker
class FileUploadResponse(BaseModel):
    """Response body of the file upload endpoint."""

    # Status code carried in the body; the upload handler sets 200 on success.
    code: int
    # Result payload; the upload handler fills in id, name, size, created_at,
    # status and callback_task_id.
    data: dict
def validate_upload_parameters(form_data) -> None:
    """Reject a request whose form carries fields other than ``file`` and ``user``.

    Args:
        form_data: Form object of the request; only its ``keys()`` are inspected.

    Raises:
        The error built by ``FileUploadErrors.invalid_parameters`` when at
        least one unexpected field name is present.
    """
    permitted = {'file', 'user'}  # the only two accepted form fields
    unexpected = [name for name in form_data.keys() if name not in permitted]
    if not unexpected:
        return
    logger.warning(f"检测到不支持的参数: {unexpected}")
    raise FileUploadErrors.invalid_parameters(unexpected)
def get_file_size(file: UploadFile) -> tuple[int, float]:
    """Return the size of an uploaded file in bytes and in megabytes.

    The size is measured by reading the underlying stream to its end instead
    of using a two-argument ``seek``, because some file objects only accept
    the single-argument form. The stream is rewound to offset 0 before the
    read, so a partially consumed stream is still measured in full, and again
    afterwards, so later readers start from the beginning.

    Args:
        file: Upload whose ``file`` attribute is a readable, seekable stream.

    Returns:
        tuple[int, float]: (size in bytes, size in MB rounded to 2 decimals).
        ``(0, 0.0)`` is returned when the stream cannot be read or rewound;
        the failure is logged as a warning and not raised.
    """
    try:
        stream = file.file
        stream.seek(0)
        size = len(stream.read())
        stream.seek(0)
        return size, round(size / (1024 * 1024), 2)
    except Exception as e:
        # Best effort by design: callers treat a zero size as "no file".
        logger.warning(f"获取文件大小失败: {str(e)}")
        return 0, 0.0
def validate_file(file: UploadFile, file_content: bytes = None) -> None:
    """Check that an upload is a PDF or a ZIP-based Word document.

    The decision is made from the leading bytes of the content: ``%PDF`` is
    accepted as PDF regardless of the file name, and the ZIP signature
    ``PK\\x03\\x04`` is accepted only when the file name ends in ``.docx`` or
    ``.doc`` (case-insensitive). Everything else is rejected.

    Args:
        file: Upload; ``filename`` and ``content_type`` are read, and ``file``
            (the stream) is read only when ``file_content`` is omitted.
        file_content: Bytes of the upload. When omitted, the first 20 bytes
            are read from ``file.file`` and the stream is rewound to offset 0.

    Raises:
        The error built by ``FileUploadErrors.file_format_unsupported`` when
        the content is not an accepted format.
    """
    # A missing file name is treated as "no extension" instead of crashing.
    filename = file.filename or ''
    file_extension = '.' + filename.split('.')[-1].lower() if '.' in filename else ''

    if file_content is None:
        # The parameter defaults to None, so sniff the header from the stream.
        # 20 bytes covers both signatures and the diagnostic log line below.
        file.file.seek(0)
        file_content = file.file.read(20)
        file.file.seek(0)

    # NOTE(review): legacy binary .doc files are OLE2 compound documents
    # (signature D0 CF 11 E0 ...), not ZIP archives, so they are rejected here
    # even though the '.doc' extension is listed. Confirm whether that is intended.
    if file_content.startswith(b'%PDF'):
        actual_file_type = 'pdf'
    elif file_content.startswith(b'PK\x03\x04') and file_extension in ('.docx', '.doc'):
        actual_file_type = 'docx/doc'
    else:
        logger.warning(f"未知文件类型,: {file_content[:20]}")
        raise FileUploadErrors.file_format_unsupported()

    logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")
@file_upload_router.post("/file_upload",response_model=FileUploadResponse)
@auto_trace(generate_if_missing=True)  # @auto_trace wants a callback_task_id, but none exists yet at this point, so the initial trace_id stands in until it is replaced below
async def file_upload(
    request: Request,
    file: List[UploadFile] = File([]),
    user: str = Form(None)
):
    """
    Handle a single-file upload.

    Steps, in order: reject unknown form fields, require exactly one named
    file, check its format from the leading bytes, enforce the 50 MB limit,
    check ``user`` against the configured user list, convert docx/doc uploads
    to PDF, then cache the file info in Redis and pre-register the task with
    the duplicate checker. The last two steps are best effort: a failure is
    logged and does not fail the upload.

    Args:
        request: Incoming request; its form is read to validate field names.
        file: Uploaded files; exactly one is accepted.
        user: User identifier; must be contained in the list read via
            ``config_handler.get("user_lists", "USERS")``.

    Returns:
        FileUploadResponse: ``code`` 200 and ``data`` holding id, name, size
        (MB, of the converted PDF when a conversion happened), created_at,
        status and callback_task_id.

    Raises:
        HTTPException: Re-raised unchanged when raised inside the handler.
        Any other exception is logged and replaced by the error built by
        ``FileUploadErrors.internal_error``.
    """
    try:
        # Reject requests that carry unexpected form fields.
        form_data = await request.form()
        validate_upload_parameters(form_data)
        valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))

        # Exactly one named file must be present.
        if not file or not file[0].filename:
            raise FileUploadErrors.file_missing()
        if len(file) > 1:
            logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file)}",
                        log_type="upload", trace_id=f"upload-{int(time.time())}")
            raise FileUploadErrors.file_multiple()
        upload = file[0]

        # Read the whole upload once; the same bytes drive format detection,
        # the size check and the MD5 id.
        try:
            content = upload.file.read()
            upload.file.seek(0)  # rewind for any later reader
        except Exception as read_error:
            # Empty content is rejected by the format check right below.
            logger.warning(f"读取上传文件失败: {str(read_error)}")
            content = b""

        # Validate the file format.
        validate_file(upload, content)

        # Validate the file size from the bytes already in memory, which
        # avoids reading the stream a second time.
        file_size = len(content)
        file_size_mb = round(file_size / (1024 * 1024), 2)
        if file_size == 0:
            raise FileUploadErrors.file_missing()
        if file_size_mb > 50:  # uploads larger than 50 MB are refused
            raise FileUploadErrors.file_size_exceeded()

        # Validate the user identifier.
        if user is None or user not in valid_users:
            raise FileUploadErrors.invalid_user()

        # The id is derived from the original upload bytes, before any
        # conversion, so the same source document always maps to the same id.
        file_id = md5.md5_id(content)
        created_at = int(time.time())

        # Diagnostic dump of the upload.
        logger.info("=== 文件详细信息 ===")
        logger.info(f"文件名: {upload.filename}")
        logger.info(f"文件扩展名: {upload.filename.split('.')[-1] if '.' in upload.filename else '无扩展名'}")
        logger.info(f"文件头信息: {content[:50]}")
        logger.info(f"文件大小: {file_size_mb} MB")
        logger.info("========================", log_type="upload")
        logger.info(f"用户标识: {user}")

        # Determine the file type, converting Word documents to PDF.
        file_extension = upload.filename.split('.')[-1].lower() if '.' in upload.filename else ''
        original_filename = upload.filename  # reported back even after conversion
        if content.startswith(b'%PDF'):
            file_type = 'pdf'
        elif content.startswith(b'PK\x03\x04') and file_extension in ('docx', 'doc'):
            logger.info(f"检测到 {file_extension} 文件,正在转换为 PDF...")
            try:
                # NOTE(review): convert_docx_to_pdf is synchronous and may run an
                # external converter process, so it blocks the event loop while it
                # runs. Moving it to a worker thread would fix that, but the Word
                # COM path on Windows may not work off the main thread — confirm
                # before changing.
                pdf_content, _ = convert_docx_to_pdf(content, original_filename)
            except Exception as convert_error:
                logger.error(f"docx 转 PDF 失败: {str(convert_error)}")
                raise FileUploadErrors.internal_error(f"文档转换失败: {str(convert_error)}") from convert_error
            # From here on the upload is handled as a PDF; file name and
            # file_id intentionally keep referring to the original document.
            content = pdf_content
            file_type = 'pdf'
            file_size = len(pdf_content)
            file_size_mb = round(file_size / (1024 * 1024), 2)
            logger.info(f"文件已转换为 PDF,大小: {file_size_mb} MB")
        else:
            file_type = 'unknown'

        # Generate the task id and make it the trace id from now on.
        callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
        TraceContext.set_trace_id(callback_task_id)
        logger.info(f"设置任务trace_id: {callback_task_id}")

        # Record describing this upload.
        file_info = {
            'file_id': file_id,
            'file_content': content,
            'user_id': user,
            'file_type': file_type,
            'callback_task_id': callback_task_id,
            "file_name": original_filename,  # original name, also for converted files
            "file_size": file_size_mb,
            'updated_at': created_at
        }

        # Cache the record in Redis under file_id with a one hour expiry.
        # A failure here is logged and does not fail the upload.
        try:
            from foundation.utils.redis_utils import store_file_info
            success = await store_file_info(file_id, file_info, 3600)
            if success:
                logger.info(f"文件信息已缓存到Redis: file_info:{file_id}")
            else:
                logger.warning("缓存文件信息到Redis失败")
        except Exception as e:
            logger.warning(f"缓存文件信息到Redis失败: {str(e)}")

        # Pre-register the task with the duplicate checker. A failure here is
        # logged and does not fail the upload.
        try:
            await duplicatechecker.register_task(file_info, callback_task_id)
            logger.info(f"任务已预注册: {callback_task_id}")
        except Exception as e:
            logger.error(f"任务预注册失败: {str(e)}")

        # NOTE: this handler does not submit the task to workflow_manager; a
        # commented-out submit_task_processing call was removed from here.
        return FileUploadResponse(
            code=200,
            data={
                "id": file_info['file_id'],
                "name": file_info['file_name'],
                "size": file_size_mb,
                "created_at": created_at,
                "status": "file_upload_success",
                "callback_task_id": file_info['callback_task_id']
            }
        )
    except HTTPException:
        # Errors that are already HTTP responses pass through unchanged.
        logger.error(f"HTTP异常: {traceback.format_exc()}")
        raise
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise FileUploadErrors.internal_error(e) from e
|