"""
Document upload endpoint implementation.

Simulates the file-upload feature and returns the file ID together with the
callback task ID.
"""
import ast
import traceback
import uuid
import time
from datetime import datetime
from pydantic import BaseModel
from typing import Optional, List, Tuple
from foundation.utils import md5
from foundation.base.config import config_handler
from .schemas.error_schemas import FileUploadErrors
from core.base.workflow_manager import WorkflowManager
from foundation.logger.loggering import server_logger as logger
from fastapi import APIRouter, UploadFile, File, Form, HTTPException
from core.base.redis_duplicate_checker import RedisDuplicateChecker
from foundation.trace.trace_context import TraceContext, auto_trace

file_upload_router = APIRouter(prefix="/sgsc", tags=["文档上传"])

uploaded_files = {}

# Initialise the workflow manager.
workflow_manager = WorkflowManager(
    max_concurrent_docs=3,
    max_concurrent_reviews=5
)

# Reuse the workflow manager's duplicate-checker instance so both sides stay
# consistent.
duplicatechecker = workflow_manager.redis_duplicate_checker

# Magic bytes used to sniff the uploaded content.
_PDF_MAGIC = b'%PDF'
_ZIP_MAGIC = b'PK\x03\x04'

# Accepted values for the ``project_plan_type`` form field.
_VALID_PROJECT_TYPES = frozenset({
    'bridge_up_part',       # bridge superstructure
    'tunnel_construction',  # tunnel construction
    'road_repair',          # road repair
})

# Upload size ceiling, in megabytes.
_MAX_FILE_SIZE_MB = 30


class FileUploadResponse(BaseModel):
    """Response envelope returned by the upload endpoint."""

    code: int
    data: dict


def _get_extension(filename: Optional[str]) -> str:
    """Return the lower-cased extension of *filename* without the dot ('' if none)."""
    _, dot, extension = (filename or "").rpartition(".")
    return extension.lower() if dot else ""


def _detect_file_type(filename: Optional[str], content: bytes) -> str:
    """Classify *content* as 'pdf', 'docx' or 'unknown' from magic bytes and extension."""
    if content.startswith(_PDF_MAGIC):
        return 'pdf'
    # A ZIP container is only accepted when the name claims to be a Word file.
    # NOTE(review): legacy binary .doc files normally do not start with the
    # ZIP signature, so a genuine .doc is likely rejected here - confirm intent.
    if content.startswith(_ZIP_MAGIC) and _get_extension(filename) in ('docx', 'doc'):
        return 'docx'
    return 'unknown'


def _read_upload_content(upload: UploadFile) -> bytes:
    """Read the whole upload and rewind it; return b"" when reading fails."""
    try:
        content = upload.file.read()
        upload.file.seek(0)  # rewind so later consumers can read from the start
        return content
    except Exception as read_error:
        # Best effort, as before: an unreadable upload is treated as empty and
        # is then rejected by validate_file. The failure is now logged instead
        # of being swallowed by a bare ``except:``.
        logger.warning(f"读取上传文件失败: {str(read_error)}")
        return b""


def get_file_size(file: UploadFile) -> Tuple[int, float]:
    """
    Return the size of an uploaded file by reading it synchronously.

    The stream is read to the end and then rewound with a single-argument
    ``seek(0)``, so it also works with file objects whose ``seek`` accepts only
    one argument.

    Args:
        file: The uploaded file whose underlying ``file`` stream is measured.

    Returns:
        A ``(size_in_bytes, size_in_megabytes)`` tuple, the latter rounded to
        two decimals. ``(0, 0.0)`` is returned when the size cannot be read.
    """
    try:
        content = file.file.read()
        size = len(content)
        file.file.seek(0)
        size_mb = size / (1024 * 1024)
        return size, round(size_mb, 2)
    except Exception as e:
        logger.warning(f"获取文件大小失败: {str(e)}")
        return 0, 0.0


def validate_file(file: UploadFile, file_content: bytes = None) -> None:
    """
    Validate the format of an uploaded file.

    A file is accepted when its content starts with the PDF signature, or when
    it starts with the ZIP signature and its name ends in ``.docx`` / ``.doc``.

    Args:
        file: The uploaded file; its ``filename`` and ``content_type`` are read.
        file_content: Raw bytes of the upload. ``None`` is treated as empty
            content and therefore rejected.

    Raises:
        The exception built by ``FileUploadErrors.file_format_unsupported()``
        when the content is not a recognised format.
    """
    # Guard against the declared ``None`` default, which previously crashed
    # with AttributeError on ``.startswith``.
    file_content = file_content or b""
    filename = file.filename or ""
    file_extension = f".{_get_extension(filename)}" if "." in filename else ""

    detected_type = _detect_file_type(filename, file_content)
    if detected_type == 'unknown':
        logger.warning(f"未知文件类型,: {file_content[:20]}")
        raise FileUploadErrors.file_format_unsupported()

    # Keep the label that has always been used in this log line.
    actual_file_type = 'docx/doc' if detected_type == 'docx' else detected_type
    logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")


@file_upload_router.post("/file_upload", response_model=FileUploadResponse)
@auto_trace(generate_if_missing=True)  # do not look up parameters; generate an initial trace_id directly
async def file_upload(
    file: List[UploadFile] = File([]),
    callback_url: str = Form(None),
    project_plan_type: str = Form(None),
    user: str = Form(None)
):
    """
    File upload endpoint.

    Validates the request (exactly one file, supported format, size limit,
    callback URL, user, project plan type), rejects duplicates, then submits
    the document to the workflow manager.

    Args:
        file: Uploaded files; exactly one is accepted.
        callback_url: Callback address supplied by the caller.
        project_plan_type: One of the supported project plan types.
        user: Caller identifier; must be listed in the ``USERS`` entry of the
            ``user_lists`` config section.

    Returns:
        A ``FileUploadResponse`` with ``code=200`` and the file id, name, size
        in MB, creation timestamp, status and callback task id.

    Raises:
        The exceptions built by ``FileUploadErrors`` for each failed
        validation. ``HTTPException`` is logged and re-raised unchanged; any
        other exception is wrapped by ``FileUploadErrors.internal_error``.
    """
    try:
        # literal_eval only parses Python literals, so the configured user
        # list cannot execute code.
        valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))

        # Exactly one named file must be uploaded.
        if not file or not file[0].filename:
            raise FileUploadErrors.file_missing()
        if len(file) > 1:
            logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file)}", log_type="upload", trace_id=f"upload-{int(time.time())}")
            raise FileUploadErrors.file_multiple()

        upload = file[0]

        # Read once and reuse the bytes for validation, sizing and hashing.
        content = _read_upload_content(upload)

        # Validate the file format.
        validate_file(upload, content)

        # Derive the size from the bytes already in memory rather than reading
        # the stream a second time.
        file_size = len(content)
        file_size_mb = round(file_size / (1024 * 1024), 2)
        if file_size == 0:
            raise FileUploadErrors.file_missing()

        # Enforce the upload size limit.
        if file_size_mb > _MAX_FILE_SIZE_MB:
            raise FileUploadErrors.file_size_exceeded()

        # Validate the callback address. Compare by value: ``is ''`` tested
        # object identity, which is implementation-dependent.
        # NOTE(review): an omitted callback_url (None) is still accepted, as
        # before - confirm whether it should be rejected as well.
        if callback_url == '':
            raise FileUploadErrors.callback_url_missing()

        # Validate the user identifier.
        if user is None or user not in valid_users:
            raise FileUploadErrors.invalid_user()

        # Validate the project plan type.
        if project_plan_type not in _VALID_PROJECT_TYPES:
            raise FileUploadErrors.project_plan_type_invalid()

        # The MD5-based id doubles as the duplicate-detection key.
        file_id = md5.md5_id(content)
        if await duplicatechecker.is_duplicate_task(file_id):
            raise FileUploadErrors.task_already_exists()

        created_at = int(time.time())

        # Detailed file information for debugging.
        logger.info("=== 文件详细信息 ===")
        logger.info(f"文件名: {upload.filename}")
        logger.info(f"文件扩展名: {upload.filename.split('.')[-1] if '.' in upload.filename else '无扩展名'}")
        logger.info(f"文件头信息: {content[:50]}")
        logger.info(f"文件大小: {file_size_mb} MB")
        logger.info("========================", log_type="upload")
        logger.info(f"请求参数 - 回调URL: {callback_url}\n, 工程类型: {project_plan_type}", log_type="upload")
        logger.info(f"用户标识: {user}")

        # Determine the file type recorded with the task.
        file_type = _detect_file_type(upload.filename, content)

        # Build the callback task id from the file id and the current time.
        callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"

        # Replace the initial trace id with the final callback task id.
        TraceContext.set_trace_id(callback_task_id)
        logger.info(f"更新trace_id为正式callback_task_id: {callback_task_id}")

        # Record the file information handed to the workflow.
        file_info = {
            'file_id': file_id,
            'file_content': content,
            'user_id': user,
            'file_type': file_type,
            'callback_task_id': callback_task_id,
            "file_name": upload.filename,
            "file_size": file_size_mb,
            "project_plan_type": project_plan_type,
            'updated_at': created_at
        }

        try:
            # Submit the processing task to the workflow manager.
            await workflow_manager.submit_task_processing(file_info)
            logger.info(f"文档处理任务已提交,任务ID: {callback_task_id}")

            return FileUploadResponse(
                code=200,
                data={
                    "id": file_info['file_id'],
                    "name": file_info['file_name'],
                    "size": file_size_mb,
                    "created_at": created_at,
                    "status": "processing",
                    "callback_task_id": file_info['callback_task_id']
                }
            )
        except Exception as workflow_error:
            logger.error(f"工作流提交失败: {str(workflow_error)}")
            # Chain the cause so the original failure stays in the traceback.
            raise FileUploadErrors.internal_error(workflow_error) from workflow_error

    except HTTPException:
        logger.error(f"HTTP异常: {traceback.format_exc()}")
        raise
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise FileUploadErrors.internal_error(e) from e