| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- """
- 文档上传接口实现
- 模拟文件上传功能,返回文件ID和回调任务ID
- """
- import ast
- import traceback
- import uuid
- import time
- from datetime import datetime
- from pydantic import BaseModel, Field
- from typing import Optional,List
- from foundation.utils import md5
- from foundation.infrastructure.config import config_handler
- from .schemas.error_schemas import FileUploadErrors
- from core.base.workflow_manager import WorkflowManager
- from foundation.observability.logger.loggering import server_logger as logger
- from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
- from core.base.redis_duplicate_checker import RedisDuplicateChecker
- from foundation.infrastructure.tracing import TraceContext, auto_trace
# Router for the front-end facing endpoints, mounted under the /sgsc prefix.
file_upload_router = APIRouter(prefix="/sgsc", tags=["前端接口"])

# Module-level dict; it is not referenced anywhere else in this file.
uploaded_files = {}

# Shared workflow manager, configured with max_concurrent_docs=3 and
# max_concurrent_reviews=5.
workflow_manager = WorkflowManager(max_concurrent_docs=3, max_concurrent_reviews=5)

# Reuse the workflow manager's own Redis duplicate checker rather than creating a
# second one, so this module and the workflow manager work with the same instance.
duplicatechecker = workflow_manager.redis_duplicate_checker
class FileUploadResponse(BaseModel):
    """Response body returned by the ``/sgsc/file_upload`` endpoint."""

    # Status code carried in the body (the upload handler sets 200 on success).
    code: int
    # Payload describing the upload: id, name, size, created_at, status,
    # callback_task_id (as filled in by the upload handler).
    data: dict
def validate_upload_parameters(form_data) -> None:
    """Reject form submissions that carry fields other than ``file`` and ``user``.

    Args:
        form_data: Mapping-like form payload exposing ``keys()``, e.g. the
            result of ``await request.form()``.

    Raises:
        The exception produced by ``FileUploadErrors.invalid_parameters``,
        given the unexpected field names in the order they were seen.
    """
    permitted = {'file', 'user'}
    unexpected = [name for name in form_data.keys() if name not in permitted]
    if not unexpected:
        return
    logger.warning(f"检测到不支持的参数: {unexpected}")
    raise FileUploadErrors.invalid_parameters(unexpected)
def get_file_size(file: "UploadFile") -> "tuple[int, float]":
    """Return the size of an uploaded file as ``(bytes, megabytes)``.

    The underlying file object is read from its current position to EOF in
    fixed-size chunks, so the content is never held in memory a second time.
    It is then rewound to offset 0. ``seek`` is only called with a single
    argument, so file objects whose ``seek`` does not accept ``whence`` still
    work.

    Args:
        file: Upload whose ``.file`` attribute is a readable, seekable binary
            file object.

    Returns:
        ``(size_in_bytes, size_in_mb)``, where megabytes are ``1024 * 1024``
        bytes, rounded to 2 decimals. Returns ``(0, 0.0)`` if reading or
        rewinding fails; the failure is logged as a warning.
    """
    chunk_size = 1024 * 1024
    try:
        stream = file.file
        size = 0
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                break
            size += len(chunk)
        stream.seek(0)  # rewind so later readers see the whole file
        return size, round(size / (1024 * 1024), 2)
    except Exception as e:
        logger.warning(f"获取文件大小失败: {str(e)}")
        return 0, 0.0
def validate_file(file: UploadFile, file_content: Optional[bytes] = None) -> None:
    """Check that an upload looks like a PDF or a ZIP-based Word file.

    Detection uses the file's leading bytes:

    * ``b'%PDF'`` is accepted as PDF, regardless of extension;
    * ``b'PK\\x03\\x04'`` (ZIP signature) is accepted only when the filename
      extension is ``.docx`` or ``.doc``, case-insensitive;
    * anything else is rejected.

    Args:
        file: The upload. Its ``filename`` is used for the extension check
            (``None`` is treated as no extension), and its ``content_type``
            is only logged.
        file_content: The file's bytes, or at least its leading bytes.
            ``None`` is treated as empty content and is therefore rejected.

    Raises:
        The exception produced by ``FileUploadErrors.file_format_unsupported()``
        when the content is not recognised.
    """
    filename = file.filename or ''
    content = file_content if file_content is not None else b''
    file_extension = '.' + filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''

    if content.startswith(b'%PDF'):
        actual_file_type = 'pdf'
    elif content.startswith(b'PK\x03\x04') and file_extension in ('.docx', '.doc'):
        # NOTE(review): legacy binary .doc files normally use the OLE2/CFB
        # container (leading bytes D0 CF 11 E0), not ZIP, so they end up in the
        # rejection branch below; confirm that is intended.
        actual_file_type = 'docx/doc'
    else:
        logger.warning(f"未知文件类型,: {content[:20]}")
        raise FileUploadErrors.file_format_unsupported()

    logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")
def _detect_file_type(content: bytes, filename: str) -> str:
    """Classify upload bytes as ``'pdf'``, ``'docx'`` or ``'unknown'``.

    Returns ``'pdf'`` when the content starts with ``b'%PDF'``. Returns
    ``'docx'`` when it starts with the ZIP signature ``b'PK\\x03\\x04'`` and the
    filename extension (case-insensitive, without the dot) is ``docx`` or
    ``doc``. Returns ``'unknown'`` otherwise.
    """
    extension = filename.split('.')[-1].lower() if '.' in filename else ''
    if content.startswith(b'%PDF'):
        return 'pdf'
    if content.startswith(b'PK\x03\x04') and extension in ('docx', 'doc'):
        return 'docx'
    return 'unknown'


# POST /sgsc/file_upload: validate one uploaded document, cache its metadata and
# pre-register a review task for it.
#   1. Only the "file" and "user" form fields are accepted.
#   2. Exactly one file with a non-empty filename is required.
#   3. The content is read once. Empty content is reported as a missing file.
#      The format must pass validate_file, and the size must not exceed 50 MB.
#   4. "user" must appear in the [user_lists] USERS config entry.
#   5. file_id comes from md5.md5_id(content). callback_task_id is
#      "<file_id>-<unix seconds>" and also becomes the current trace id.
#   6. Redis caching (1 hour) and duplicate-checker pre-registration are
#      best-effort: their failures are logged and do not fail the upload.
# HTTPException is re-raised unchanged; any other error becomes
# FileUploadErrors.internal_error.
# NOTE: the endpoint docstring is rendered by FastAPI as the OpenAPI description,
# so its text is left unchanged.
@file_upload_router.post("/file_upload", response_model=FileUploadResponse)
# auto_trace normally expects a callback_task_id, which does not exist yet at this
# point, so a freshly generated trace id is used until it is replaced below.
@auto_trace(generate_if_missing=True)
async def file_upload(
    request: Request,
    file: List[UploadFile] = File([]),
    user: str = Form(None)
):
    """
    文件上传接口
    """
    try:
        # Reject unexpected form fields.
        form_data = await request.form()
        validate_upload_parameters(form_data)
        # The user whitelist comes from config (trusted); literal_eval only parses literals.
        valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))

        # Exactly one file with a filename must be present.
        if not file or not file[0].filename:
            raise FileUploadErrors.file_missing()
        if len(file) > 1:
            logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file) if file else 0}",
                        log_type="upload", trace_id=f"upload-{int(time.time())}")
            raise FileUploadErrors.file_multiple()

        upload = file[0]
        # Read the upload once through the async UploadFile API so the event loop
        # is not blocked, then rewind it for any later consumer.
        try:
            content = await upload.read()
            await upload.seek(0)
        except Exception:
            content = b""

        # Empty content is a missing file. This check runs before format
        # validation, which would otherwise reject it as an unsupported format.
        file_size = len(content)
        if file_size == 0:
            raise FileUploadErrors.file_missing()
        validate_file(upload, content)
        file_size_mb = round(file_size / (1024 * 1024), 2)
        # Files larger than 50 MB are rejected.
        if file_size_mb > 50:
            raise FileUploadErrors.file_size_exceeded()
        if user is None or user not in valid_users:
            raise FileUploadErrors.invalid_user()

        file_id = md5.md5_id(content)
        created_at = int(time.time())
        # Detailed file information for debugging.
        logger.info(f"=== 文件详细信息 ===")
        logger.info(f"文件名: {upload.filename}")
        logger.info(f"文件扩展名: {upload.filename.split('.')[-1] if '.' in upload.filename else '无扩展名'}")
        logger.info(f"文件头信息: {content[:50]}")
        logger.info(f"文件大小: {file_size_mb} MB")
        logger.info(f"========================", log_type="upload")
        logger.info(f"用户标识: {user}")

        file_type = _detect_file_type(content, upload.filename)

        # The task id doubles as the trace id for the rest of this request.
        callback_task_id = f"{file_id}-{int(datetime.now().timestamp())}"
        TraceContext.set_trace_id(callback_task_id)
        logger.info(f"设置任务trace_id: {callback_task_id}")

        file_info = {
            'file_id': file_id,
            'file_content': content,
            'user_id': user,
            'file_type': file_type,
            'callback_task_id': callback_task_id,
            "file_name": upload.filename,
            "file_size": file_size_mb,
            'updated_at': created_at
        }

        # Best-effort: cache the file info in Redis under file_id (expires after
        # 3600 s) for the start-review endpoint; failures do not fail the upload.
        try:
            from foundation.utils.redis_utils import store_file_info
            success = await store_file_info(file_id, file_info, 3600)
            if success:
                logger.info(f"文件信息已缓存到Redis: file_info:{file_id}")
            else:
                logger.warning(f"缓存文件信息到Redis失败")
        except Exception as e:
            logger.warning(f"缓存文件信息到Redis失败: {str(e)}")

        # Best-effort: pre-register the task so starting a review can validate the
        # task id; a failure here does not fail the upload.
        try:
            await duplicatechecker.register_task(file_info, callback_task_id)
            logger.info(f"任务已预注册: {callback_task_id}")
        except Exception as e:
            logger.error(f"任务预注册失败: {str(e)}")

        # NOTE: submitting the task via workflow_manager.submit_task_processing(file_info)
        # is currently disabled; processing is not started by this endpoint.
        return FileUploadResponse(
            code=200,
            data={
                "id": file_info['file_id'],
                "name": file_info['file_name'],
                "size": file_size_mb,
                "created_at": created_at,
                "status": "file_upload_success",
                "callback_task_id": file_info['callback_task_id']
            }
        )
    except HTTPException:
        logger.error(f"HTTP异常: {traceback.format_exc()}")
        raise
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise FileUploadErrors.internal_error(e) from e
|