"""
Document upload endpoint.

Validates a single uploaded PDF/Word file and returns its file ID and a
callback task ID. The file metadata is also cached in Redis and the task is
pre-registered with the duplicate checker so a later review request can
verify the callback task ID.
"""
import ast
import traceback
import time
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional, Tuple
from foundation.utils import md5
from foundation.infrastructure.config import config_handler
from .schemas.error_schemas import FileUploadErrors
from core.base.workflow_manager import WorkflowManager
from foundation.observability.logger.loggering import review_logger as logger
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
from foundation.infrastructure.tracing import TraceContext, auto_trace

file_upload_router = APIRouter(prefix="/sgsc", tags=["前端接口"])

# NOTE(review): not referenced in this module; kept in case other modules import it.
uploaded_files = {}

# Maximum accepted upload size in MB (compared against the size rounded to 2 decimals).
MAX_UPLOAD_SIZE_MB = 50

# Shared workflow manager for this router.
workflow_manager = WorkflowManager(
    max_concurrent_docs=3,
    max_concurrent_reviews=5,
)

# Reuse the workflow manager's Redis duplicate checker so that the task
# pre-registered at upload time is the one the review step later checks.
duplicatechecker = workflow_manager.redis_duplicate_checker


class FileUploadResponse(BaseModel):
    """Response envelope returned by the upload endpoint."""

    code: int  # 200 on success
    data: dict  # file id/name/size/created_at/status/callback_task_id


def validate_upload_parameters(form_data) -> None:
    """Reject any form field other than ``file`` and ``user``.

    Args:
        form_data: The parsed multipart form (anything exposing ``keys()``).

    Raises:
        The error built by ``FileUploadErrors.invalid_parameters`` (presumably
        an ``HTTPException``), listing the unexpected field names.
    """
    allowed_params = {'file', 'user'}
    extra_params = [key for key in form_data.keys() if key not in allowed_params]
    if extra_params:
        logger.warning(f"检测到不支持的参数: {extra_params}")
        raise FileUploadErrors.invalid_parameters(extra_params)


def get_file_size(file: UploadFile) -> Tuple[int, float]:
    """Return ``(size_in_bytes, size_in_mb)`` for an uploaded file.

    The whole stream is read and then rewound, instead of using
    ``seek(0, 2)``, because some file wrappers only accept a single ``seek``
    argument. The MB value is rounded to 2 decimal places.

    Returns:
        ``(0, 0.0)`` if the stream cannot be read or rewound.
    """
    try:
        content = file.file.read()
        file.file.seek(0)
    except Exception as e:
        logger.warning(f"获取文件大小失败: {str(e)}")
        return 0, 0.0
    size = len(content)
    return size, round(size / (1024 * 1024), 2)


def validate_file(file: UploadFile, file_content: Optional[bytes] = None) -> None:
    """Check the upload's leading magic bytes and, for ZIP containers, its extension.

    Accepted inputs:
    - content starting with ``%PDF``;
    - content starting with the ZIP signature when the extension is
      ``.docx`` or ``.doc``.

    Everything else is rejected.

    Args:
        file: The upload. Only ``filename`` and ``content_type`` are read.
        file_content: The file's bytes. ``None`` is treated as empty content,
            which is rejected instead of raising ``AttributeError``.

    Raises:
        The error built by ``FileUploadErrors.file_format_unsupported``.
    """
    content = file_content if file_content is not None else b""
    filename = file.filename or ""
    file_extension = '.' + filename.split('.')[-1].lower() if '.' in filename else ''

    if content.startswith(b'%PDF'):
        actual_file_type = 'pdf'
    elif content.startswith(b'PK\x03\x04') and file_extension in ('.docx', '.doc'):
        # NOTE(review): legacy binary .doc files are OLE2 containers, not ZIP,
        # so a genuine .doc is rejected here; only ZIP-based files named .doc pass.
        actual_file_type = 'docx/doc'
    else:
        logger.warning(f"未知文件类型,: {content[:20]}")
        raise FileUploadErrors.file_format_unsupported()

    logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")


def _read_upload_content(upload: UploadFile) -> bytes:
    """Read the whole upload into memory and rewind it; return b"" if that fails."""
    try:
        content = upload.file.read()
        upload.file.seek(0)  # rewind so any later reader sees the full stream
    except Exception as e:
        # Best effort: an unreadable upload is then reported as a missing file.
        logger.warning(f"读取上传文件失败: {str(e)}")
        return b""
    return content


def _detect_stored_file_type(filename: str, content: bytes) -> str:
    """Classify the upload as 'pdf', 'docx' or 'unknown' for the cached file_info record."""
    file_extension = filename.split('.')[-1].lower() if '.' in filename else ''
    if content.startswith(b'%PDF'):
        return 'pdf'
    if content.startswith(b'PK\x03\x04') and file_extension in ('docx', 'doc'):
        return 'docx'
    return 'unknown'


@file_upload_router.post("/file_upload", response_model=FileUploadResponse)
# @auto_trace would normally trace by callback_task_id, which does not exist yet
# when the request arrives. A provisional trace_id is generated here and replaced
# via TraceContext.set_trace_id() once callback_task_id is known.
@auto_trace(generate_if_missing=True)
async def file_upload(
    request: Request,
    file: List[UploadFile] = File([]),
    user: str = Form(None),
):
    """Upload a single PDF/Word document.

    Validation runs in this order:
    1. form fields;
    2. file count and filename;
    3. emptiness;
    4. format;
    5. size limit (``MAX_UPLOAD_SIZE_MB``);
    6. user whitelist (``USERS`` entry of ``user_lists`` config).

    On success, the file info (including the raw bytes) is cached in Redis
    with a 3600 s TTL, and the task is pre-registered with the duplicate
    checker. If either step fails, the failure is logged and the upload still
    succeeds.

    Returns:
        ``FileUploadResponse`` with ``code=200`` and the file id, name,
        size (MB), creation timestamp, status and ``callback_task_id``.

    Raises:
        ``HTTPException`` from validation is re-raised unchanged. Any other
        exception is wrapped with ``FileUploadErrors.internal_error`` and
        chained to the original.
    """
    try:
        form_data = await request.form()
        validate_upload_parameters(form_data)
        # literal_eval only parses Python literals, so the config value cannot execute code.
        valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))

        # Exactly one file with a non-empty filename is required.
        if not file or not file[0].filename:
            raise FileUploadErrors.file_missing()
        if len(file) > 1:
            logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file)}", log_type="upload", trace_id=f"upload-{int(time.time())}")
            raise FileUploadErrors.file_multiple()

        upload = file[0]
        filename = upload.filename
        # Read once and derive the size from these bytes instead of re-reading
        # the stream through get_file_size().
        content = _read_upload_content(upload)
        file_size = len(content)
        file_size_mb = round(file_size / (1024 * 1024), 2)

        # Checked before format sniffing. Otherwise an empty or unreadable
        # file is reported as "unsupported format" and this branch never runs.
        if file_size == 0:
            raise FileUploadErrors.file_missing()

        validate_file(upload, content)

        if file_size_mb > MAX_UPLOAD_SIZE_MB:
            raise FileUploadErrors.file_size_exceeded()

        if user is None or user not in valid_users:
            raise FileUploadErrors.invalid_user()

        # File id derived from the content (MD5-based, per md5.md5_id).
        file_id = md5.md5_id(content)
        created_at = int(time.time())

        logger.info("=== 文件详细信息 ===")
        logger.info(f"文件名: {filename}")
        logger.info(f"文件扩展名: {filename.split('.')[-1] if '.' in filename else '无扩展名'}")
        logger.info(f"文件头信息: {content[:50]}")
        logger.info(f"文件大小: {file_size_mb} MB")
        logger.info("========================", log_type="upload")
        logger.info(f"用户标识: {user}")

        file_type = _detect_stored_file_type(filename, content)

        # The callback task id becomes the trace id for the rest of this request.
        callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
        TraceContext.set_trace_id(callback_task_id)
        logger.info(f"设置任务trace_id: {callback_task_id}")

        file_info = {
            'file_id': file_id,
            'file_content': content,
            'user_id': user,
            'file_type': file_type,
            'callback_task_id': callback_task_id,
            "file_name": filename,
            "file_size": file_size_mb,
            'updated_at': created_at,
        }

        # Best effort: cache file info in Redis, keyed by file_id with a 1-hour
        # TTL, for the start-review endpoint.
        try:
            # Imported lazily (presumably to avoid an import cycle). An
            # ImportError is swallowed like any other caching failure.
            from foundation.utils.redis_utils import store_file_info
            success = await store_file_info(file_id, file_info, 3600)
            if success:
                logger.info(f"文件信息已缓存到Redis: file_info:{file_id}")
            else:
                logger.warning("缓存文件信息到Redis失败")
        except Exception as e:
            logger.warning(f"缓存文件信息到Redis失败: {str(e)}")

        # Best effort: pre-register the task so starting the review can verify
        # callback_task_id.
        try:
            await duplicatechecker.register_task(file_info, callback_task_id)
            logger.info(f"任务已预注册: {callback_task_id}")
        except Exception as e:
            logger.error(f"任务预注册失败: {str(e)}")

        # NOTE: submitting document processing via
        # workflow_manager.submit_task_processing is intentionally disabled in
        # this endpoint.

        return FileUploadResponse(
            code=200,
            data={
                "id": file_info['file_id'],
                "name": file_info['file_name'],
                "size": file_size_mb,
                "created_at": created_at,
                "status": "file_upload_success",
                "callback_task_id": file_info['callback_task_id'],
            },
        )
    except HTTPException:
        # Validation errors are already client-facing; log and pass them through.
        logger.error(f"HTTP异常: {traceback.format_exc()}")
        raise
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise FileUploadErrors.internal_error(e) from e