|
|
@@ -15,13 +15,53 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
|
|
|
from dotenv import load_dotenv
|
|
|
load_dotenv()
|
|
|
|
|
|
-from fastapi import FastAPI, HTTPException, Depends, Request, Response
|
|
|
+from fastapi import FastAPI, HTTPException, Depends, Request, Response, BackgroundTasks
|
|
|
+from fastapi.responses import HTMLResponse, JSONResponse
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
from pydantic import BaseModel
|
|
|
from typing import Optional, Any, Union
|
|
|
import hashlib
|
|
|
import secrets
|
|
|
+import requests
|
|
|
+from urllib.parse import urlparse
|
|
|
+
|
|
|
+# MIME 类型到后缀的映射
|
|
|
+MIME_MAP = {
|
|
|
+ 'application/pdf': '.pdf',
|
|
|
+ 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
|
|
|
+ 'application/msword': '.doc',
|
|
|
+ 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
|
|
|
+ 'application/vnd.ms-excel': '.xls',
|
|
|
+ 'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
|
|
|
+ 'application/vnd.ms-powerpoint': '.ppt',
|
|
|
+ 'text/markdown': '.md',
|
|
|
+ 'text/plain': '.txt',
|
|
|
+ 'text/html': '.html',
|
|
|
+ 'image/jpeg': '.jpg',
|
|
|
+ 'image/png': '.png',
|
|
|
+ 'application/zip': '.zip',
|
|
|
+}
|
|
|
+
|
|
|
+def detect_file_extension(url: str) -> str:
|
|
|
+ """通过 URL 路径或 HEAD 请求检测文件后缀"""
|
|
|
+ if not url:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 1. 尝试从路径解析
|
|
|
+ path = urlparse(url).path
|
|
|
+ ext = os.path.splitext(path)[1].lower()
|
|
|
+ if ext and len(ext) <= 6:
|
|
|
+ return ext
|
|
|
+
|
|
|
+ # 2. 尝试 HEAD 请求检测 Content-Type
|
|
|
+ try:
|
|
|
+ response = requests.head(url, allow_redirects=True, timeout=5)
|
|
|
+ content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
|
|
|
+ return MIME_MAP.get(content_type, "")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"检测文件后缀失败: {e}")
|
|
|
+ return ""
|
|
|
# 修复JWT导入 - 确保使用正确的JWT库
|
|
|
try:
|
|
|
# 首先尝试使用PyJWT
|
|
|
@@ -95,6 +135,75 @@ TABLE_MAP = {
|
|
|
"job": "t_job_of_preparation" # 办公制度
|
|
|
}
|
|
|
|
|
|
+def get_db_connection():
|
|
|
+ """获取数据库连接"""
|
|
|
+ try:
|
|
|
+ database_url = os.getenv('DATABASE_URL', '')
|
|
|
+ if not database_url:
|
|
|
+ return None
|
|
|
+
|
|
|
+ parsed = urlparse(database_url)
|
|
|
+ config = {
|
|
|
+ 'host': parsed.hostname or 'localhost',
|
|
|
+ 'port': parsed.port or 3306,
|
|
|
+ 'user': parsed.username or 'root',
|
|
|
+ 'password': parsed.password or '',
|
|
|
+ 'database': parsed.path[1:] if parsed.path else 'sso_db',
|
|
|
+ 'charset': 'utf8mb4'
|
|
|
+ }
|
|
|
+
|
|
|
+ return pymysql.connect(**config)
|
|
|
+ except Exception as e:
|
|
|
+ print(f"数据库连接失败: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+# --- 初始化主表 ---
|
|
|
+def init_master_table():
|
|
|
+ """初始化主表结构,并确保所有必要字段都存在"""
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ cursor = conn.cursor()
|
|
|
+ # 1. 创建主表 (如果不存在)
|
|
|
+ cursor.execute("""
|
|
|
+ CREATE TABLE IF NOT EXISTS t_document_main (
|
|
|
+ id CHAR(36) PRIMARY KEY,
|
|
|
+ title VARCHAR(255) NOT NULL,
|
|
|
+ standard_no VARCHAR(100),
|
|
|
+ issuing_authority VARCHAR(255),
|
|
|
+ release_date DATE,
|
|
|
+ document_type VARCHAR(100),
|
|
|
+ professional_field VARCHAR(100),
|
|
|
+ validity VARCHAR(50) DEFAULT '现行',
|
|
|
+ created_by VARCHAR(100),
|
|
|
+ created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
+ updated_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
|
+ conversion_status TINYINT DEFAULT 0, -- 0:待转化, 1:转化中, 2:已完成, 3:失败
|
|
|
+ conversion_progress INT DEFAULT 0,
|
|
|
+ converted_file_name VARCHAR(255),
|
|
|
+ conversion_error TEXT,
|
|
|
+ whether_to_enter TINYINT DEFAULT 0, -- 0:未入库, 1:已入库
|
|
|
+ source_type ENUM('basis', 'work', 'job') NOT NULL,
|
|
|
+ source_id CHAR(36) NOT NULL,
|
|
|
+ file_url TEXT,
|
|
|
+ file_extension VARCHAR(10),
|
|
|
+ content TEXT,
|
|
|
+ primary_category_id INT,
|
|
|
+ secondary_category_id INT,
|
|
|
+ year INT
|
|
|
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
|
|
+ """)
|
|
|
+ conn.commit()
|
|
|
+ print("✅ 主表 t_document_main 初始化成功")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"❌ 初始化主表失败: {e}")
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+# 执行初始化
|
|
|
+init_master_table()
|
|
|
+
|
|
|
def get_table_name(table_type: Optional[str]) -> str:
|
|
|
"""根据类型获取对应的数据库表名,默认为编制依据"""
|
|
|
return TABLE_MAP.get(table_type, "t_basis_of_preparation")
|
|
|
@@ -105,14 +214,29 @@ class DocumentAdd(BaseModel):
|
|
|
primary_category_id: Optional[Any] = None
|
|
|
secondary_category_id: Optional[Any] = None
|
|
|
year: Optional[int] = None
|
|
|
- table_type: Optional[str] = "basis" # 增加表类型参数
|
|
|
+ table_type: Optional[str] = "basis"
|
|
|
+ # 新增编辑需要的字段
|
|
|
+ id: Optional[str] = None
|
|
|
+ source_id: Optional[str] = None
|
|
|
+ # 扩展字段 (子表特有属性)
|
|
|
+ standard_no: Optional[str] = None
|
|
|
+ issuing_authority: Optional[str] = None
|
|
|
+ release_date: Optional[str] = None
|
|
|
+ document_type: Optional[str] = None
|
|
|
+ professional_field: Optional[str] = None
|
|
|
+ validity: Optional[str] = None
|
|
|
+ project_name: Optional[str] = None
|
|
|
+ project_section: Optional[str] = None
|
|
|
+ # 文件相关字段
|
|
|
+ file_url: Optional[str] = None
|
|
|
+ file_extension: Optional[str] = None
|
|
|
|
|
|
class DocumentListRequest(BaseModel):
|
|
|
- primaryCategoryId: Optional[int] = None
|
|
|
- secondaryCategoryId: Optional[int] = None
|
|
|
page: int = 1
|
|
|
size: int = 50
|
|
|
- sort_by: str = "created_at" # created_at or updated_at
|
|
|
+ keyword: Optional[str] = None
|
|
|
+ table_type: Optional[str] = None
|
|
|
+ whether_to_enter: Optional[int] = None
|
|
|
|
|
|
# 配置
|
|
|
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-jwt-secret-key-12345")
|
|
|
@@ -134,28 +258,6 @@ def find_available_port(start_port=8000, max_port=8010):
|
|
|
return port
|
|
|
return None
|
|
|
|
|
|
-def get_db_connection():
|
|
|
- """获取数据库连接"""
|
|
|
- try:
|
|
|
- database_url = os.getenv('DATABASE_URL', '')
|
|
|
- if not database_url:
|
|
|
- return None
|
|
|
-
|
|
|
- parsed = urlparse(database_url)
|
|
|
- config = {
|
|
|
- 'host': parsed.hostname or 'localhost',
|
|
|
- 'port': parsed.port or 3306,
|
|
|
- 'user': parsed.username or 'root',
|
|
|
- 'password': parsed.password or '',
|
|
|
- 'database': parsed.path[1:] if parsed.path else 'sso_db',
|
|
|
- 'charset': 'utf8mb4'
|
|
|
- }
|
|
|
-
|
|
|
- return pymysql.connect(**config)
|
|
|
- except Exception as e:
|
|
|
- print(f"数据库连接失败: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
def verify_password_simple(password: str, stored_hash: str) -> bool:
|
|
|
"""验证密码(简化版)"""
|
|
|
if stored_hash.startswith("sha256$"):
|
|
|
@@ -3553,16 +3655,16 @@ import httpx
|
|
|
from fastapi.responses import HTMLResponse
|
|
|
|
|
|
class BatchEnterRequest(BaseModel):
|
|
|
- ids: list[int]
|
|
|
- table_type: Optional[str] = "basis"
|
|
|
+ ids: list[Union[int, str]]
|
|
|
+ table_type: Optional[str] = None
|
|
|
|
|
|
class BatchDeleteRequest(BaseModel):
|
|
|
ids: list[Union[int, str]]
|
|
|
- table_type: Optional[str] = "basis"
|
|
|
+ table_type: Optional[str] = None
|
|
|
|
|
|
class ConvertRequest(BaseModel):
|
|
|
id: Union[int, str]
|
|
|
- table_type: Optional[str] = "basis"
|
|
|
+ table_type: Optional[str] = None
|
|
|
|
|
|
# --- 文档管理中心 API ---
|
|
|
|
|
|
@@ -3640,24 +3742,53 @@ async def batch_enter_knowledge_base(req: BatchEnterRequest, credentials: HTTPAu
|
|
|
return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
cursor = conn.cursor()
|
|
|
- table_name = get_table_name(req.table_type)
|
|
|
- # 批量更新 whether_to_enter 为 1
|
|
|
- # 只更新尚未入库的数据 (whether_to_enter = 0)
|
|
|
+
|
|
|
+ # 1. 批量更新主表 t_document_main
|
|
|
+ # 只更新尚未入库的数据 (whether_to_enter = 0),同时更新修改时间和修改人(如果需要)
|
|
|
placeholders = ', '.join(['%s'] * len(req.ids))
|
|
|
- sql = f"UPDATE {table_name} SET whether_to_enter = 1, updated_at = NOW() WHERE id IN ({placeholders}) AND whether_to_enter = 0"
|
|
|
- cursor.execute(sql, req.ids)
|
|
|
- conn.commit()
|
|
|
+ username = payload.get("username", "admin")
|
|
|
|
|
|
+ # 首先更新主表
|
|
|
+ sql_main = f"""
|
|
|
+ UPDATE t_document_main
|
|
|
+ SET whether_to_enter = 1, updated_time = NOW()
|
|
|
+ WHERE id IN ({placeholders}) AND whether_to_enter = 0
|
|
|
+ """
|
|
|
+ cursor.execute(sql_main, req.ids)
|
|
|
affected_rows = cursor.rowcount
|
|
|
+
|
|
|
+ # 2. 尝试更新对应的子表以保持同步
|
|
|
+ try:
|
|
|
+ # 查询这些 ID 对应的 source_type 和 source_id
|
|
|
+ cursor.execute(f"SELECT id, source_type, source_id FROM t_document_main WHERE id IN ({placeholders})", req.ids)
|
|
|
+ docs = cursor.fetchall()
|
|
|
+
|
|
|
+ for doc_row in docs:
|
|
|
+ d_id, s_type, s_id = doc_row
|
|
|
+ if s_type and s_id:
|
|
|
+ sub_table = get_table_name(s_type)
|
|
|
+ if sub_table:
|
|
|
+ # 更新子表中的 whether_to_enter 字段(如果存在)
|
|
|
+ # 注意:子表中的主键可能是 id 且值为 s_id
|
|
|
+ sub_sql = f"UPDATE {sub_table} SET whether_to_enter = 1, updated_at = NOW(), updated_by = %s WHERE id = %s"
|
|
|
+ try:
|
|
|
+ cursor.execute(sub_sql, (username, s_id))
|
|
|
+ except Exception as sub_e:
|
|
|
+ print(f"更新子表 {sub_table} 失败 (可能字段不存在): {sub_e}")
|
|
|
+ except Exception as sync_e:
|
|
|
+ print(f"同步更新子表失败: {sync_e}")
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
cursor.close()
|
|
|
conn.close()
|
|
|
|
|
|
message = f"成功将 {affected_rows} 条数据加入知识库"
|
|
|
if affected_rows < len(req.ids):
|
|
|
- message += f"(跳过了 {len(req.ids) - affected_rows} 条已入库数据)"
|
|
|
+ message += f"(跳过了 {len(req.ids) - affected_rows} 条已入库数据或未找到数据)"
|
|
|
|
|
|
return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
except Exception as e:
|
|
|
+ print(f"批量操作失败: {e}")
|
|
|
return ApiResponse(code=500, message=f"批量操作失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
@app.post("/api/v1/documents/batch-delete")
|
|
|
@@ -3675,18 +3806,39 @@ async def batch_delete_documents(req: BatchDeleteRequest, credentials: HTTPAutho
|
|
|
return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
cursor = conn.cursor()
|
|
|
- table_name = get_table_name(req.table_type)
|
|
|
|
|
|
if not req.ids:
|
|
|
return ApiResponse(code=400, message="未指定要删除的文档 ID", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
placeholders = ', '.join(['%s'] * len(req.ids))
|
|
|
- sql = f"DELETE FROM {table_name} WHERE id IN ({placeholders})"
|
|
|
- cursor.execute(sql, req.ids)
|
|
|
- conn.commit()
|
|
|
|
|
|
+ # 1. 尝试同步删除子表中的数据
|
|
|
+ try:
|
|
|
+ # 查询这些 ID 对应的 source_type 和 source_id
|
|
|
+ cursor.execute(f"SELECT source_type, source_id FROM t_document_main WHERE id IN ({placeholders})", req.ids)
|
|
|
+ docs = cursor.fetchall()
|
|
|
+
|
|
|
+ for doc_row in docs:
|
|
|
+ s_type, s_id = doc_row
|
|
|
+ if s_type and s_id:
|
|
|
+ sub_table = get_table_name(s_type)
|
|
|
+ if sub_table:
|
|
|
+ # 删除子表数据
|
|
|
+ sub_sql = f"DELETE FROM {sub_table} WHERE id = %s"
|
|
|
+ try:
|
|
|
+ cursor.execute(sub_sql, (s_id,))
|
|
|
+ except Exception as sub_e:
|
|
|
+ print(f"删除子表 {sub_table} 数据失败: {sub_e}")
|
|
|
+ except Exception as sync_e:
|
|
|
+ print(f"同步删除子表数据失败: {sync_e}")
|
|
|
+
|
|
|
+ # 2. 删除主表 t_document_main 中的数据
|
|
|
+ sql_main = f"DELETE FROM t_document_main WHERE id IN ({placeholders})"
|
|
|
+ cursor.execute(sql_main, req.ids)
|
|
|
affected_rows = cursor.rowcount
|
|
|
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
return ApiResponse(
|
|
|
code=0,
|
|
|
message=f"成功删除 {affected_rows} 条文档数据",
|
|
|
@@ -3701,30 +3853,90 @@ async def batch_delete_documents(req: BatchDeleteRequest, credentials: HTTPAutho
|
|
|
if conn:
|
|
|
conn.close()
|
|
|
|
|
|
+async def simulate_conversion(doc_id: str):
|
|
|
+ """模拟文档转换过程"""
|
|
|
+ import time
|
|
|
+ conn = None
|
|
|
+ try:
|
|
|
+ conn = get_db_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+
|
|
|
+ # 1. 模拟开始 (10%)
|
|
|
+ cursor.execute("UPDATE t_document_main SET conversion_status = 1, conversion_progress = 10 WHERE id = %s", (doc_id,))
|
|
|
+ conn.commit()
|
|
|
+ time.sleep(2)
|
|
|
+
|
|
|
+ # 2. 模拟进行中 (40%)
|
|
|
+ cursor.execute("UPDATE t_document_main SET conversion_progress = 40 WHERE id = %s", (doc_id,))
|
|
|
+ conn.commit()
|
|
|
+ time.sleep(3)
|
|
|
+
|
|
|
+ # 3. 模拟进行中 (75%)
|
|
|
+ cursor.execute("UPDATE t_document_main SET conversion_progress = 75 WHERE id = %s", (doc_id,))
|
|
|
+ conn.commit()
|
|
|
+ time.sleep(2)
|
|
|
+
|
|
|
+ # 4. 模拟完成 (100%)
|
|
|
+ cursor.execute("""
|
|
|
+ UPDATE t_document_main
|
|
|
+ SET conversion_status = 2, conversion_progress = 100,
|
|
|
+ converted_file_name = CONCAT(title, '_已转换.pdf')
|
|
|
+ WHERE id = %s
|
|
|
+ """, (doc_id,))
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"模拟转换出错: {e}")
|
|
|
+ if conn:
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("UPDATE t_document_main SET conversion_status = 3, conversion_error = %s WHERE id = %s", (str(e), doc_id))
|
|
|
+ conn.commit()
|
|
|
+ finally:
|
|
|
+ if conn:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
@app.post("/api/v1/documents/convert")
|
|
|
-async def convert_document(req: ConvertRequest, credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
- """异步启动文档转换"""
|
|
|
- import subprocess
|
|
|
+async def convert_document(req: ConvertRequest, background_tasks: BackgroundTasks, credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
+ """启动文档转换 (支持真实脚本与模拟逻辑)"""
|
|
|
try:
|
|
|
payload = verify_token(credentials.credentials)
|
|
|
if not payload or not payload.get("is_superuser"):
|
|
|
return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
- # 启动后台进程执行转换
|
|
|
- # 脚本位于 d:\UGit\LQAdminPlatform\scripts\miner_u.py
|
|
|
+ table_type = req.table_type
|
|
|
+ # 如果没有提供 table_type,从主表查询
|
|
|
+ if not table_type:
|
|
|
+ try:
|
|
|
+ conn = get_db_connection()
|
|
|
+ if conn:
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT source_type FROM t_document_main WHERE id = %s", (req.id,))
|
|
|
+ res = cursor.fetchone()
|
|
|
+ if res:
|
|
|
+ table_type = res[0]
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"从主表获取 source_type 失败: {e}")
|
|
|
+
|
|
|
+ # 1. 优先尝试启动真实转换脚本
|
|
|
script_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "scripts", "miner_u.py"))
|
|
|
- # 使用当前 python 解释器
|
|
|
- python_exe = sys.executable
|
|
|
-
|
|
|
- # 异步启动,不等待结束
|
|
|
- subprocess.Popen([python_exe, script_path, str(req.table_type), str(req.id)],
|
|
|
- stdout=subprocess.DEVNULL,
|
|
|
- stderr=subprocess.DEVNULL,
|
|
|
- creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0)
|
|
|
+ if os.path.exists(script_path):
|
|
|
+ import subprocess
|
|
|
+ python_exe = sys.executable
|
|
|
+ # 传递 table_type 和 id 给脚本
|
|
|
+ subprocess.Popen([python_exe, script_path, str(table_type or "basis"), str(req.id)],
|
|
|
+ stdout=subprocess.DEVNULL,
|
|
|
+ stderr=subprocess.DEVNULL,
|
|
|
+ creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0)
|
|
|
+ return ApiResponse(code=0, message="转换任务已在后台启动", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ # 2. 如果脚本不存在,则启动模拟转换逻辑
|
|
|
+ background_tasks.add_task(simulate_conversion, str(req.id))
|
|
|
|
|
|
return ApiResponse(
|
|
|
code=0,
|
|
|
- message="转换任务已启动",
|
|
|
+ message="转换任务已启动 (模拟模式)",
|
|
|
timestamp=datetime.now(timezone.utc).isoformat()
|
|
|
).model_dump()
|
|
|
except Exception as e:
|
|
|
@@ -3733,155 +3945,469 @@ async def convert_document(req: ConvertRequest, credentials: HTTPAuthorizationCr
|
|
|
|
|
|
@app.post("/api/v1/documents/add")
|
|
|
async def add_document(doc: DocumentAdd, credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
- """添加新文档"""
|
|
|
+ """添加新文档 (同步主表和子表)"""
|
|
|
try:
|
|
|
payload = verify_token(credentials.credentials)
|
|
|
- if not payload or not payload.get("is_superuser"):
|
|
|
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ if not payload:
|
|
|
+ return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
+ user_id = payload.get("username", "admin")
|
|
|
conn = get_db_connection()
|
|
|
if not conn:
|
|
|
return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
-
|
|
|
+
|
|
|
cursor = conn.cursor()
|
|
|
- table_name = get_table_name(doc.table_type)
|
|
|
- # 修正列名:reference_basis -> reference_basis_list
|
|
|
- sql = f"""
|
|
|
- INSERT INTO {table_name}
|
|
|
- (chinese_name, reference_basis_list, document_type, professional_field, release_date, created_at, updated_at)
|
|
|
- VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
|
|
|
- """
|
|
|
- # 构造日期:如果是年份,转为 YYYY-01-01
|
|
|
- release_date = f"{doc.year}-01-01" if doc.year else None
|
|
|
-
|
|
|
- cursor.execute(sql, (doc.title, doc.content, str(doc.primary_category_id) if doc.primary_category_id else None,
|
|
|
- str(doc.secondary_category_id) if doc.secondary_category_id else None, release_date))
|
|
|
- conn.commit()
|
|
|
- cursor.close()
|
|
|
- conn.close()
|
|
|
+ doc_id = str(uuid.uuid4())
|
|
|
+ source_id = str(uuid.uuid4())
|
|
|
+ table_name = TABLE_MAP.get(doc.table_type, "t_basis_of_preparation")
|
|
|
|
|
|
- return ApiResponse(code=0, message="文档添加成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ try:
|
|
|
+ # 1. 插入子表
|
|
|
+ if doc.table_type == 'basis':
|
|
|
+ cursor.execute(
|
|
|
+ f"INSERT INTO {table_name} (id, chinese_name, created_by) VALUES (%s, %s, %s)",
|
|
|
+ (source_id, doc.title, user_id)
|
|
|
+ )
|
|
|
+ elif doc.table_type == 'work':
|
|
|
+ cursor.execute(
|
|
|
+ f"INSERT INTO {table_name} (id, plan_name, created_by) VALUES (%s, %s, %s)",
|
|
|
+ (source_id, doc.title, user_id)
|
|
|
+ )
|
|
|
+ elif doc.table_type == 'job':
|
|
|
+ cursor.execute(
|
|
|
+ f"INSERT INTO {table_name} (id, file_name, created_by) VALUES (%s, %s, %s)",
|
|
|
+ (source_id, doc.title, user_id)
|
|
|
+ )
|
|
|
+
|
|
|
+ # 2. 插入主表
|
|
|
+ cursor.execute("""
|
|
|
+ INSERT INTO t_document_main
|
|
|
+ (id, title, content, created_by, source_type, source_id, whether_to_enter, primary_category_id, secondary_category_id, year, file_url, file_extension)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
+ """, (doc_id, doc.title, doc.content, user_id, doc.table_type, source_id, 0, doc.primary_category_id, doc.secondary_category_id, doc.year, doc.file_url, doc.file_extension))
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ return ApiResponse(code=0, message="文档添加成功", data={"id": doc_id}, timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ except Exception as e:
|
|
|
+ conn.rollback()
|
|
|
+ raise e
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
except Exception as e:
|
|
|
- print(f"添加文档错误: {e}")
|
|
|
- return ApiResponse(code=500, message=f"服务器内部错误: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ print(f"添加文档失败: {e}")
|
|
|
+ return ApiResponse(code=500, message=str(e), timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+@app.get("/api/v1/documents/detail/{doc_id}")
|
|
|
+async def get_document_detail(doc_id: str, credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
+ """获取文档详情 (关联查询子表)"""
|
|
|
+ print(f"🔍 正在获取文档详情: {doc_id}")
|
|
|
+ try:
|
|
|
+ payload = verify_token(credentials.credentials)
|
|
|
+ if not payload:
|
|
|
+ return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # 1. 查询主表
|
|
|
+ cursor.execute("SELECT * FROM t_document_main WHERE id = %s", (doc_id,))
|
|
|
+ main_row = cursor.fetchone()
|
|
|
+ if not main_row:
|
|
|
+ print(f"❌ 文档不存在: {doc_id}")
|
|
|
+ return ApiResponse(code=404, message="文档不存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ columns = [desc[0] for desc in cursor.description]
|
|
|
+ doc = dict(zip(columns, main_row))
|
|
|
+ print(f"✅ 找到主表数据: {doc.get('title')}")
|
|
|
+
|
|
|
+ # 2. 查询子表
|
|
|
+ source_type = doc.get('source_type')
|
|
|
+ source_id = doc.get('source_id')
|
|
|
+ table_name = TABLE_MAP.get(source_type)
|
|
|
+
|
|
|
+ if table_name and source_id:
|
|
|
+ cursor.execute(f"SELECT * FROM {table_name} WHERE id = %s", (source_id,))
|
|
|
+ sub_row = cursor.fetchone()
|
|
|
+ if sub_row:
|
|
|
+ sub_columns = [desc[0] for desc in cursor.description]
|
|
|
+ sub_data = dict(zip(sub_columns, sub_row))
|
|
|
+
|
|
|
+ # 将子表字段映射到通用字段名,方便前端处理
|
|
|
+ if source_type == 'basis':
|
|
|
+ doc['standard_no'] = sub_data.get('standard_number')
|
|
|
+ doc['issuing_authority'] = sub_data.get('issuing_authority')
|
|
|
+ doc['release_date'] = str(sub_data.get('release_date')) if sub_data.get('release_date') else None
|
|
|
+ doc['document_type'] = sub_data.get('document_type')
|
|
|
+ doc['professional_field'] = sub_data.get('professional_field')
|
|
|
+ doc['validity'] = sub_data.get('validity')
|
|
|
+ elif source_type == 'work':
|
|
|
+ doc['project_name'] = sub_data.get('project_name')
|
|
|
+ doc['project_section'] = sub_data.get('project_section')
|
|
|
+ doc['issuing_authority'] = sub_data.get('compiling_unit')
|
|
|
+ doc['release_date'] = str(sub_data.get('compiling_date')) if sub_data.get('compiling_date') else None
|
|
|
+ elif source_type == 'job':
|
|
|
+ doc['issuing_authority'] = sub_data.get('issuing_department')
|
|
|
+ doc['document_type'] = sub_data.get('document_type')
|
|
|
+ doc['release_date'] = str(sub_data.get('publish_date')) if sub_data.get('publish_date') else None
|
|
|
+
|
|
|
+ # 格式化主表时间
|
|
|
+ if doc.get('created_time'):
|
|
|
+ doc['created_time'] = doc['created_time'].isoformat()
|
|
|
+ if doc.get('updated_time'):
|
|
|
+ doc['updated_time'] = doc['updated_time'].isoformat()
|
|
|
+ if doc.get('release_date') and not isinstance(doc['release_date'], str):
|
|
|
+ doc['release_date'] = doc['release_date'].isoformat()
|
|
|
+
|
|
|
+ return ApiResponse(code=0, message="获取详情成功", data=doc, timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"获取文档详情失败: {e}")
|
|
|
+ return ApiResponse(code=500, message=str(e), timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
@app.get("/api/v1/documents/list")
|
|
|
async def get_document_list(
|
|
|
- primaryCategoryId: Optional[str] = None,
|
|
|
- secondaryCategoryId: Optional[str] = None,
|
|
|
- year: Optional[int] = None,
|
|
|
whether_to_enter: Optional[int] = None,
|
|
|
keyword: Optional[str] = None,
|
|
|
- table_type: Optional[str] = "basis",
|
|
|
+ table_type: Optional[str] = None,
|
|
|
page: int = 1,
|
|
|
size: int = 50,
|
|
|
- sort_by: str = "created_at",
|
|
|
credentials: HTTPAuthorizationCredentials = Depends(security)
|
|
|
):
|
|
|
- """获取文档列表(支持过滤与搜索)"""
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
+ """获取文档列表 (从主表查询)"""
|
|
|
try:
|
|
|
payload = verify_token(credentials.credentials)
|
|
|
- if not payload or not payload.get("is_superuser"):
|
|
|
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ if not payload:
|
|
|
+ return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
conn = get_db_connection()
|
|
|
if not conn:
|
|
|
return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
-
|
|
|
+
|
|
|
cursor = conn.cursor()
|
|
|
- table_name = get_table_name(table_type)
|
|
|
-
|
|
|
where_clauses = []
|
|
|
params = []
|
|
|
|
|
|
- if primaryCategoryId:
|
|
|
- where_clauses.append("document_type = %s")
|
|
|
- params.append(primaryCategoryId)
|
|
|
- if secondaryCategoryId:
|
|
|
- where_clauses.append("professional_field = %s")
|
|
|
- params.append(secondaryCategoryId)
|
|
|
- if year:
|
|
|
- where_clauses.append("YEAR(release_date) = %s")
|
|
|
- params.append(year)
|
|
|
+ if table_type:
|
|
|
+ where_clauses.append("source_type = %s")
|
|
|
+ params.append(table_type)
|
|
|
if whether_to_enter is not None:
|
|
|
- where_clauses.append("CAST(whether_to_enter AS UNSIGNED) = %s")
|
|
|
+ where_clauses.append("whether_to_enter = %s")
|
|
|
params.append(whether_to_enter)
|
|
|
if keyword:
|
|
|
- where_clauses.append("(chinese_name LIKE %s OR reference_basis_list LIKE %s OR standard_no LIKE %s)")
|
|
|
- like_keyword = f"%{keyword}%"
|
|
|
- params.extend([like_keyword, like_keyword, like_keyword])
|
|
|
+ where_clauses.append("(title LIKE %s OR content LIKE %s)")
|
|
|
+ params.extend([f"%{keyword}%", f"%{keyword}%"])
|
|
|
|
|
|
- where_stmt = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
|
|
|
-
|
|
|
- # 排序逻辑:按创建时间倒序
|
|
|
- sort_field = "created_at" if sort_by == "created_at" else "updated_at"
|
|
|
- order_by = f"ORDER BY {sort_field} DESC"
|
|
|
-
|
|
|
- # 分页
|
|
|
+ where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
|
|
|
offset = (page - 1) * size
|
|
|
|
|
|
- # 返回更多字段
|
|
|
- sql = f"""
|
|
|
- SELECT id, chinese_name as title, reference_basis_list as content,
|
|
|
- document_type, professional_field,
|
|
|
- YEAR(release_date) as year, release_date, standard_no, status,
|
|
|
- CAST(whether_to_enter AS UNSIGNED) as whether_to_enter, file_url,
|
|
|
- conversion_status, conversion_progress, conversion_error,
|
|
|
- created_at, updated_at
|
|
|
- FROM {table_name} {where_stmt}
|
|
|
- {order_by} LIMIT %s OFFSET %s
|
|
|
- """
|
|
|
+ sql = f"SELECT * FROM t_document_main {where_sql} ORDER BY created_time DESC LIMIT %s OFFSET %s"
|
|
|
params.extend([size, offset])
|
|
|
|
|
|
- cursor.execute(sql, params)
|
|
|
- columns = [col[0] for col in cursor.description]
|
|
|
- items = [dict(zip(columns, row)) for row in cursor.fetchall()]
|
|
|
-
|
|
|
- # 格式化时间
|
|
|
- for item in items:
|
|
|
- for key, value in item.items():
|
|
|
- if isinstance(value, (datetime, date)):
|
|
|
- item[key] = value.isoformat()
|
|
|
-
|
|
|
- # 获取总数
|
|
|
- count_sql = f"SELECT COUNT(*) FROM {table_name} {where_stmt}"
|
|
|
- cursor.execute(count_sql, params[:-2])
|
|
|
+ cursor.execute(sql, tuple(params))
|
|
|
+ columns = [desc[0] for desc in cursor.description]
|
|
|
+ items = []
|
|
|
+ for row in cursor.fetchall():
|
|
|
+ item = dict(zip(columns, row))
|
|
|
+ # 格式化时间
|
|
|
+ for key in ['created_time', 'updated_time', 'release_date']:
|
|
|
+ if item.get(key) and hasattr(item[key], 'isoformat'):
|
|
|
+ item[key] = item[key].isoformat()
|
|
|
+ items.append(item)
|
|
|
+
|
|
|
+ # 总数
|
|
|
+ count_sql = f"SELECT COUNT(*) FROM t_document_main {where_sql}"
|
|
|
+ cursor.execute(count_sql, tuple(params[:-2]))
|
|
|
total = cursor.fetchone()[0]
|
|
|
|
|
|
- # 优化统计查询:合并全局总数和已入库总数的查询,减少数据库交互
|
|
|
- stats_sql = f"SELECT COUNT(*), SUM(CASE WHEN CAST(whether_to_enter AS UNSIGNED) = 1 THEN 1 ELSE 0 END) FROM {table_name}"
|
|
|
- cursor.execute(stats_sql)
|
|
|
- stats_result = cursor.fetchone()
|
|
|
+ # 统计数据
|
|
|
+ cursor.execute("SELECT COUNT(*) FROM t_document_main")
|
|
|
+ all_total = cursor.fetchone()[0]
|
|
|
+ cursor.execute("SELECT COUNT(*) FROM t_document_main WHERE whether_to_enter = 1")
|
|
|
+ total_entered = cursor.fetchone()[0]
|
|
|
|
|
|
- all_total = 0
|
|
|
- total_entered = 0
|
|
|
- if stats_result:
|
|
|
- all_total = stats_result[0] or 0
|
|
|
- total_entered = int(stats_result[1] or 0)
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
|
|
|
return ApiResponse(
|
|
|
- code=0,
|
|
|
- message="获取成功",
|
|
|
+ code=0,
|
|
|
+ message="查询成功",
|
|
|
data={
|
|
|
"items": items,
|
|
|
"total": total,
|
|
|
- "all_total": all_total,
|
|
|
- "total_entered": total_entered,
|
|
|
"page": page,
|
|
|
- "size": size
|
|
|
+ "size": size,
|
|
|
+ "all_total": all_total,
|
|
|
+ "total_entered": total_entered
|
|
|
},
|
|
|
timestamp=datetime.now(timezone.utc).isoformat()
|
|
|
).model_dump()
|
|
|
except Exception as e:
|
|
|
- print(f"获取文档列表错误: {e}")
|
|
|
- return ApiResponse(code=500, message=f"服务器内部错误: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
+ print(f"获取文档列表失败: {e}")
|
|
|
+ return ApiResponse(code=500, message=str(e), timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+@app.post("/api/v1/documents/edit")
|
|
|
+async def edit_document(doc: DocumentAdd, credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
+ """编辑文档 (同步主表和子表)"""
|
|
|
+ try:
|
|
|
+ payload = verify_token(credentials.credentials)
|
|
|
+ if not payload:
|
|
|
+ return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ if not doc.id or not doc.source_id:
|
|
|
+ return ApiResponse(code=400, message="缺少ID参数", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+ table_name = TABLE_MAP.get(doc.table_type, "t_basis_of_preparation")
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 1. 更新子表内容
|
|
|
+ if doc.table_type == 'basis':
|
|
|
+ cursor.execute(f"""
|
|
|
+ UPDATE {table_name}
|
|
|
+ SET chinese_name = %s, standard_number = %s, issuing_authority = %s,
|
|
|
+ release_date = %s, document_type = %s, professional_field = %s, validity = %s
|
|
|
+ WHERE id = %s
|
|
|
+ """, (doc.title, doc.standard_no, doc.issuing_authority, doc.release_date,
|
|
|
+ doc.document_type, doc.professional_field, doc.validity, doc.source_id))
|
|
|
+ elif doc.table_type == 'work':
|
|
|
+ cursor.execute(f"""
|
|
|
+ UPDATE {table_name}
|
|
|
+ SET plan_name = %s, project_name = %s, project_section = %s,
|
|
|
+ compiling_unit = %s, compiling_date = %s
|
|
|
+ WHERE id = %s
|
|
|
+ """, (doc.title, doc.project_name, doc.project_section, doc.issuing_authority,
|
|
|
+ doc.release_date, doc.source_id))
|
|
|
+ elif doc.table_type == 'job':
|
|
|
+ cursor.execute(f"""
|
|
|
+ UPDATE {table_name}
|
|
|
+ SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s
|
|
|
+ WHERE id = %s
|
|
|
+ """, (doc.title, doc.issuing_authority, doc.document_type, doc.release_date, doc.source_id))
|
|
|
+
|
|
|
+ # 2. 更新主表内容
|
|
|
+ cursor.execute("""
|
|
|
+ UPDATE t_document_main
|
|
|
+ SET title = %s, content = %s, updated_time = NOW(),
|
|
|
+ primary_category_id = %s, secondary_category_id = %s, year = %s,
|
|
|
+ file_url = %s, file_extension = %s
|
|
|
+ WHERE id = %s
|
|
|
+ """, (doc.title, doc.content, doc.primary_category_id, doc.secondary_category_id, doc.year,
|
|
|
+ doc.file_url, doc.file_extension, doc.id))
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ return ApiResponse(code=0, message="文档更新成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ except Exception as e:
|
|
|
+ conn.rollback()
|
|
|
+ raise e
|
|
|
+ finally:
|
|
|
cursor.close()
|
|
|
- if conn:
|
|
|
conn.close()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"编辑文档失败: {e}")
|
|
|
+ return ApiResponse(code=500, message=str(e), timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+@app.post("/api/v1/documents/enter")
|
|
|
+async def enter_document(data: dict, credentials: HTTPAuthorizationCredentials = Depends(security)):
|
|
|
+ """文档入库"""
|
|
|
+ try:
|
|
|
+ doc_id = data.get("id")
|
|
|
+ if not doc_id:
|
|
|
+ return ApiResponse(code=400, message="缺少ID", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ payload = verify_token(credentials.credentials)
|
|
|
+ username = payload.get("username", "admin") if payload else "admin"
|
|
|
+
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+
|
|
|
+ # 1. 更新主表
|
|
|
+ cursor.execute("UPDATE t_document_main SET whether_to_enter = 1, updated_time = NOW() WHERE id = %s", (doc_id,))
|
|
|
+
|
|
|
+ # 2. 尝试同步更新子表
|
|
|
+ try:
|
|
|
+ cursor.execute("SELECT source_type, source_id FROM t_document_main WHERE id = %s", (doc_id,))
|
|
|
+ res = cursor.fetchone()
|
|
|
+ if res and res[0] and res[1]:
|
|
|
+ s_type, s_id = res
|
|
|
+ sub_table = get_table_name(s_type)
|
|
|
+ if sub_table:
|
|
|
+ sub_sql = f"UPDATE {sub_table} SET whether_to_enter = 1, updated_at = NOW(), updated_by = %s WHERE id = %s"
|
|
|
+ try:
|
|
|
+ cursor.execute(sub_sql, (username, s_id))
|
|
|
+ except Exception as sub_e:
|
|
|
+ print(f"入库同步子表 {sub_table} 失败: {sub_e}")
|
|
|
+ except Exception as sync_e:
|
|
|
+ print(f"入库同步子表异常: {sync_e}")
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
+ return ApiResponse(code=0, message="入库成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"入库失败: {e}")
|
|
|
+ return ApiResponse(code=500, message=str(e), timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+@app.get("/api/v1/basic-info/list")
|
|
|
+async def get_basic_info_list(
|
|
|
+ type: str,
|
|
|
+ page: int = 1,
|
|
|
+ size: int = 50,
|
|
|
+ keyword: Optional[str] = None,
|
|
|
+ title: Optional[str] = None,
|
|
|
+ standard_no: Optional[str] = None,
|
|
|
+ document_type: Optional[str] = None,
|
|
|
+ professional_field: Optional[str] = None,
|
|
|
+ validity: Optional[str] = None,
|
|
|
+ issuing_authority: Optional[str] = None,
|
|
|
+ release_date_start: Optional[str] = None,
|
|
|
+ release_date_end: Optional[str] = None,
|
|
|
+ credentials: HTTPAuthorizationCredentials = Depends(security)
|
|
|
+):
|
|
|
+ """获取基本信息列表 (支持多条件检索)"""
|
|
|
+ try:
|
|
|
+ payload = verify_token(credentials.credentials)
|
|
|
+ if not payload:
|
|
|
+ return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+
|
|
|
+ # 根据类型选择表名和字段映射
|
|
|
+ if type == 'basis':
|
|
|
+ table_name = "t_basis_of_preparation"
|
|
|
+ fields = "id, chinese_name as title, standard_number as standard_no, issuing_authority, release_date, document_type, professional_field, validity, created_by, created_time as created_at"
|
|
|
+ # 字段名映射供过滤使用
|
|
|
+ field_map = {
|
|
|
+ 'title': 'chinese_name',
|
|
|
+ 'standard_no': 'standard_number',
|
|
|
+ 'issuing_authority': 'issuing_authority',
|
|
|
+ 'release_date': 'release_date',
|
|
|
+ 'document_type': 'document_type',
|
|
|
+ 'professional_field': 'professional_field',
|
|
|
+ 'validity': 'validity'
|
|
|
+ }
|
|
|
+ elif type == 'work':
|
|
|
+ table_name = "t_work_of_preparation"
|
|
|
+ fields = "id, plan_name as title, NULL as standard_no, compiling_unit as issuing_authority, compiling_date as release_date, NULL as document_type, NULL as professional_field, NULL as validity, created_by, created_time as created_at"
|
|
|
+ field_map = {
|
|
|
+ 'title': 'plan_name',
|
|
|
+ 'issuing_authority': 'compiling_unit',
|
|
|
+ 'release_date': 'compiling_date'
|
|
|
+ }
|
|
|
+ elif type == 'job':
|
|
|
+ table_name = "t_job_of_preparation"
|
|
|
+ fields = "id, file_name as title, NULL as standard_no, issuing_department as issuing_authority, publish_date as release_date, document_type, NULL as professional_field, NULL as validity, created_by, created_time as created_at"
|
|
|
+ field_map = {
|
|
|
+ 'title': 'file_name',
|
|
|
+ 'issuing_authority': 'issuing_department',
|
|
|
+ 'release_date': 'publish_date',
|
|
|
+ 'document_type': 'document_type'
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ return ApiResponse(code=400, message="无效的类型", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
+
|
|
|
+ where_clauses = []
|
|
|
+ params = []
|
|
|
+
|
|
|
+ # 1. 统一关键字搜索 (保持兼容)
|
|
|
+ if keyword:
|
|
|
+ if type == 'basis':
|
|
|
+ where_clauses.append("(chinese_name LIKE %s OR standard_number LIKE %s)")
|
|
|
+ params.extend([f"%{keyword}%", f"%{keyword}%"])
|
|
|
+ elif type == 'work':
|
|
|
+ where_clauses.append("plan_name LIKE %s")
|
|
|
+ params.append(f"%{keyword}%")
|
|
|
+ elif type == 'job':
|
|
|
+ where_clauses.append("file_name LIKE %s")
|
|
|
+ params.append(f"%{keyword}%")
|
|
|
+
|
|
|
+ # 2. 精细化检索
|
|
|
+ if title and 'title' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['title']} LIKE %s")
|
|
|
+ params.append(f"%{title}%")
|
|
|
+
|
|
|
+ if standard_no and 'standard_no' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['standard_no']} LIKE %s")
|
|
|
+ params.append(f"%{standard_no}%")
|
|
|
+
|
|
|
+ if document_type and 'document_type' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['document_type']} = %s")
|
|
|
+ params.append(document_type)
|
|
|
+
|
|
|
+ if professional_field and 'professional_field' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['professional_field']} = %s")
|
|
|
+ params.append(professional_field)
|
|
|
+
|
|
|
+ if validity and 'validity' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['validity']} = %s")
|
|
|
+ params.append(validity)
|
|
|
+
|
|
|
+ if issuing_authority and 'issuing_authority' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['issuing_authority']} LIKE %s")
|
|
|
+ params.append(f"%{issuing_authority}%")
|
|
|
+
|
|
|
+ if release_date_start and 'release_date' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['release_date']} >= %s")
|
|
|
+ params.append(release_date_start)
|
|
|
+
|
|
|
+ if release_date_end and 'release_date' in field_map:
|
|
|
+ where_clauses.append(f"{field_map['release_date']} <= %s")
|
|
|
+ params.append(release_date_end)
|
|
|
+
|
|
|
+ where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
|
|
|
+
|
|
|
+ # 分页查询
|
|
|
+ offset = (page - 1) * size
|
|
|
+ sql = f"SELECT {fields} FROM {table_name}{where_sql} ORDER BY created_at DESC LIMIT %s OFFSET %s"
|
|
|
+ params.extend([size, offset])
|
|
|
+
|
|
|
+ cursor.execute(sql, tuple(params))
|
|
|
+ columns = [desc[0] for desc in cursor.description]
|
|
|
+ items = []
|
|
|
+ for row in cursor.fetchall():
|
|
|
+ item = dict(zip(columns, row))
|
|
|
+ # 格式化日期
|
|
|
+ for key in ['release_date', 'created_at']:
|
|
|
+ if item.get(key) and hasattr(item[key], 'isoformat'):
|
|
|
+ item[key] = item[key].isoformat()
|
|
|
+ elif item.get(key):
|
|
|
+ item[key] = str(item[key])
|
|
|
+ items.append(item)
|
|
|
+
|
|
|
+ # 总数查询
|
|
|
+ count_sql = f"SELECT COUNT(*) FROM {table_name}{where_sql}"
|
|
|
+ cursor.execute(count_sql, tuple(params[:-2]))
|
|
|
+ total = cursor.fetchone()[0]
|
|
|
+
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ return ApiResponse(
|
|
|
+ code=0,
|
|
|
+ message="查询成功",
|
|
|
+ data={"items": items, "total": total, "page": page, "size": size},
|
|
|
+ timestamp=datetime.now(timezone.utc).isoformat()
|
|
|
+ ).model_dump()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"查询基本信息失败: {e}")
|
|
|
+ return ApiResponse(code=500, message=f"服务器内部错误: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
|
|
|
|
|
|
@app.get("/api/v1/documents/categories/primary")
|
|
|
async def get_primary_categories(credentials: HTTPAuthorizationCredentials = Depends(security)):
|