"""
Document upload endpoint.

Accepts a single PDF/Word upload, validates it, caches its metadata in Redis,
pre-registers a review task, and returns the file ID and callback task ID.
"""
import ast
import time
import traceback
import uuid
from datetime import datetime
from typing import List, Optional, Tuple

from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
from pydantic import BaseModel, Field

from foundation.utils import md5
from foundation.base.config import config_handler
from .schemas.error_schemas import FileUploadErrors
from core.base.workflow_manager import WorkflowManager
from foundation.logger.loggering import server_logger as logger
from core.base.redis_duplicate_checker import RedisDuplicateChecker
from foundation.trace.trace_context import TraceContext, auto_trace

file_upload_router = APIRouter(prefix="/sgsc", tags=["文档上传"])

# NOTE(review): not referenced in this module; kept in case other modules import it.
uploaded_files = {}

# Initialise the workflow manager.
workflow_manager = WorkflowManager(
    max_concurrent_docs=3,
    max_concurrent_reviews=5,
)

# Reuse the workflow manager's duplicate checker so both share a single instance.
duplicatechecker = workflow_manager.redis_duplicate_checker

_BYTES_PER_MB = 1024 * 1024
# Uploads larger than this are rejected with FileUploadErrors.file_size_exceeded().
_MAX_FILE_SIZE_MB = 50
# Magic-byte prefixes used to sniff the real file type independently of the name.
_PDF_MAGIC = b'%PDF'
_ZIP_MAGIC = b'PK\x03\x04'  # .docx files are ZIP containers


class FileUploadResponse(BaseModel):
    """Response envelope for the upload endpoint: a status code plus a payload dict."""

    code: int
    data: dict


def validate_upload_parameters(form_data) -> None:
    """Reject any form field other than ``file`` and ``user``.

    Args:
        form_data: The parsed request form (anything exposing ``keys()``).

    Raises:
        The error built by ``FileUploadErrors.invalid_parameters`` listing the
        unexpected keys, in the order they appear in the form.
    """
    allowed_params = {'file', 'user'}  # only these two fields are accepted
    extra_params = [key for key in form_data.keys() if key not in allowed_params]
    if extra_params:
        logger.warning(f"检测到不支持的参数: {extra_params}")
        raise FileUploadErrors.invalid_parameters(extra_params)


def get_file_size(file: UploadFile) -> Tuple[int, float]:
    """Return ``(size_in_bytes, size_in_mb)`` for *file*, rewinding it afterwards.

    The stream is read in full rather than sized with ``seek(0, 2)`` because some
    file objects only accept a single ``seek`` argument. The MB figure is
    rounded to two decimals. Returns ``(0, 0.0)`` if the stream cannot be read.
    """
    try:
        file.file.seek(0)  # measure from the start even if the stream was consumed
        size = len(file.file.read())
        file.file.seek(0)
        return size, round(size / _BYTES_PER_MB, 2)
    except Exception as e:
        logger.warning(f"获取文件大小失败: {str(e)}")
        return 0, 0.0


def _file_extension(filename: Optional[str]) -> str:
    """Return the lower-cased extension of *filename* including the dot, or '' if none."""
    if not filename or '.' not in filename:
        return ''
    return '.' + filename.rsplit('.', 1)[-1].lower()


def _detect_file_type(filename: Optional[str], content: bytes) -> Optional[str]:
    """Classify an upload by its leading bytes.

    Returns 'pdf' for a PDF header, 'docx' for a ZIP header whose file name ends
    in .docx or .doc, and None for anything else.
    """
    if content.startswith(_PDF_MAGIC):
        return 'pdf'
    if content.startswith(_ZIP_MAGIC) and _file_extension(filename) in ('.docx', '.doc'):
        return 'docx'
    return None


def validate_file(file: UploadFile, file_content: Optional[bytes] = None) -> None:
    """Verify that *file* is a supported document type.

    Args:
        file: The uploaded file. Its name and MIME type are used for checking and
            logging.
        file_content: The file's bytes. If omitted, the stream is read and then
            rewound.

    Raises:
        The error built by ``FileUploadErrors.file_format_unsupported()`` when the
        content is neither a PDF nor a ZIP-based .docx/.doc file.
    """
    if file_content is None:
        file.file.seek(0)
        file_content = file.file.read()
        file.file.seek(0)

    file_extension = _file_extension(file.filename)
    file_type = _detect_file_type(file.filename, file_content)
    if file_type is None:
        logger.warning(f"未知文件类型,: {file_content[:20]}")
        raise FileUploadErrors.file_format_unsupported()

    actual_file_type = 'docx/doc' if file_type == 'docx' else file_type
    logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")


@file_upload_router.post("/file_upload", response_model=FileUploadResponse)
# auto_trace would normally use callback_task_id, but that ID does not exist yet at
# this point, so the initially generated trace_id is used in its place for now.
@auto_trace(generate_if_missing=True)
async def file_upload(
    request: Request,
    file: List[UploadFile] = File([]),
    user: str = Form(None),
):
    """Accept one document upload and register it for later review.

    Validation order:
    1. Only known form fields are allowed.
    2. Exactly one named file is required.
    3. The file must be non-empty.
    4. The file must be a supported type.
    5. The file must be within the size limit.
    6. The user must appear in the configured user list.

    On success, the file metadata (including raw bytes) is cached in Redis
    under its MD5-based ID. A review task is then pre-registered with the
    duplicate checker. Both steps are best-effort: failures are logged, not
    raised.

    Returns:
        FileUploadResponse with code 200 and the file ID, name, size (MB),
        timestamp, status and callback task ID.

    Raises:
        HTTPException: Validation failures raised via FileUploadErrors are
            re-raised unchanged. Any other error is wrapped with
            ``FileUploadErrors.internal_error``.
    """
    try:
        # Reject any form field other than 'file' and 'user'.
        form_data = await request.form()
        validate_upload_parameters(form_data)

        # NOTE(review): assumes the config value is a Python literal (e.g. a list of
        # user IDs); literal_eval will not execute code but will raise on bad syntax.
        valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))

        # Exactly one named file must be present.
        if not file or not file[0].filename:
            raise FileUploadErrors.file_missing()
        if len(file) > 1:
            logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file)}", log_type="upload", trace_id=f"upload-{int(time.time())}")
            raise FileUploadErrors.file_multiple()

        upload = file[0]
        try:
            content = upload.file.read()
            upload.file.seek(0)  # rewind so later consumers see the full stream
        except Exception as e:
            # Treat an unreadable stream as empty; it is rejected just below.
            logger.warning(f"读取上传文件内容失败: {str(e)}")
            content = b""

        # Size is taken from the bytes already read instead of re-reading the stream.
        # NOTE(review): the whole upload is held in memory before the limit is checked.
        file_size = len(content)
        file_size_mb = round(file_size / _BYTES_PER_MB, 2)

        # Check emptiness before format so an empty upload reports "file missing".
        if file_size == 0:
            raise FileUploadErrors.file_missing()

        validate_file(upload, content)

        # Compare in bytes so the rounded MB figure cannot let an oversize file through.
        if file_size > _MAX_FILE_SIZE_MB * _BYTES_PER_MB:
            raise FileUploadErrors.file_size_exceeded()

        if user is None or user not in valid_users:
            raise FileUploadErrors.invalid_user()

        # The file ID is derived from the content's MD5.
        file_id = md5.md5_id(content)
        created_at = int(time.time())

        logger.info("=== 文件详细信息 ===")
        logger.info(f"文件名: {upload.filename}")
        logger.info(f"文件扩展名: {upload.filename.split('.')[-1] if '.' in upload.filename else '无扩展名'}")
        logger.info(f"文件头信息: {content[:50]}")
        logger.info(f"文件大小: {file_size_mb} MB")
        logger.info("========================", log_type="upload")
        logger.info(f"用户标识: {user}")

        # Already validated above, so 'unknown' is only a defensive fallback.
        file_type = _detect_file_type(upload.filename, content) or 'unknown'

        # The callback task ID combines the file ID with the current timestamp and
        # becomes the trace ID for the rest of this request.
        callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
        TraceContext.set_trace_id(callback_task_id)
        logger.info(f"设置任务trace_id: {callback_task_id}")

        file_info = {
            'file_id': file_id,
            'file_content': content,
            'user_id': user,
            'file_type': file_type,
            'callback_task_id': callback_task_id,
            "file_name": upload.filename,
            "file_size": file_size_mb,
            'updated_at': created_at,
        }

        # Cache the file info in Redis keyed by file_id (1 hour TTL) for the
        # start-review endpoint. Best-effort: failures do not abort the upload.
        try:
            from foundation.utils.redis_utils import store_file_info

            success = await store_file_info(file_id, file_info, 3600)
            if success:
                logger.info(f"文件信息已缓存到Redis: file_info:{file_id}")
            else:
                logger.warning("缓存文件信息到Redis失败")
        except Exception as e:
            logger.warning(f"缓存文件信息到Redis失败: {str(e)}")

        # Pre-register the task so the start-review step can verify the task ID.
        # Best-effort: a registration failure does not fail the upload.
        try:
            await duplicatechecker.register_task(file_info, callback_task_id)
            logger.info(f"任务已预注册: {callback_task_id}")
        except Exception as e:
            logger.error(f"任务预注册失败: {str(e)}")

        # NOTE: submitting the document to workflow_manager.submit_task_processing
        # is intentionally disabled here; presumably processing is started by the
        # start-review endpoint instead — confirm.
        return FileUploadResponse(
            code=200,
            data={
                "id": file_info['file_id'],
                "name": file_info['file_name'],
                "size": file_size_mb,
                "created_at": created_at,
                "status": "file_upload_success",
                "callback_task_id": file_info['callback_task_id'],
            },
        )

    except HTTPException:
        logger.error(f"HTTP异常: {traceback.format_exc()}")
        raise
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise FileUploadErrors.internal_error(e) from e