ソースを参照

解决config.ini 冲突

WangXuMing 3 ヶ月 前
コミット
e945f23065

+ 41 - 0
README.md

@@ -17,6 +17,9 @@
 
 
 
+### 增加组件依赖
+  pip install aiomysql -i https://mirrors.aliyun.com/pypi/simple/
+
 
 ### 测试接口
 
@@ -78,3 +81,41 @@ curl -X POST "http://localhost:8001/test/agent/stream" \
           },
           "input": "查询信息"
         }
+
+
+### MySQL 数据库操作测试
+  - 新增
+        http://localhost:8001/test/mysql/add
+        {
+          "config": {
+              "session_id":"10002"
+          },
+          "input": "张三"
+        }
+    - 查询列表
+      http://localhost:8001/test/mysql/list
+       {
+        "config": {
+            "session_id":"10002"
+        },
+        "input": "张三"
+      }
+
+     - 查询单个
+      http://localhost:8001/test/mysql/get
+       {
+        "config": {
+            "session_id":"10002"
+        },
+        "input": "4"
+      }
+
+      - 修改
+        http://localhost:8001/test/mysql/update
+        {
+          "config": {
+              "session_id":"1"
+          },
+          "input": "李四"
+        }
+      

BIN
build_graph_app.png


+ 11 - 0
config/config.ini

@@ -57,6 +57,7 @@ CONSOLE_OUTPUT=True
 [user_lists]
 USERS=['user-001']
 
+
 [siliconflow]
 SLCF_MODEL_SERVER_URL=https://api.siliconflow.cn/v1
 SLCF_API_KEY=sk-npqfinszhdvnwvensnjmlqtihgevehqiyfwunedxnefkmrud
@@ -75,3 +76,13 @@ QWEN_LOCAL_14B_SERVER_URL=http://172.16.35.50:8003/v1
 QWEN_LOCAL_14B_MODEL_ID=Qwen3-14B
 QWEN_LOCAL_14B_API_KEY=sk-dummy
 
+
+[mysql]
+MYSQL_HOST=localhost
+MYSQL_PORT=3306
+MYSQL_USER=root
+MYSQL_PASSWORD=admin
+MYSQL_DB=lq_db
+MYSQL_MIN_SIZE=1
+MYSQL_MAX_SIZE=2
+MYSQL_AUTO_COMMIT=True

+ 59 - 0
config/sql/test.sql

@@ -0,0 +1,59 @@
+
+
+
+ -- 测试信息表
+
+CREATE TABLE IF NOT EXISTS test_tab (
+    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '用户唯一标识符',
+    name VARCHAR(100) NOT NULL COMMENT '用户姓名',
+    email VARCHAR(100) UNIQUE NOT NULL COMMENT '用户邮箱,唯一',
+    age INT COMMENT '用户年龄',
+    status ENUM('active', 'inactive') DEFAULT 'active' COMMENT '用户状态:active-活跃, inactive-非活跃',
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
+    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',
+    INDEX idx_email (email) COMMENT '邮箱索引,用于快速查找',
+    INDEX idx_status (status) COMMENT '状态索引,用于按状态筛选'
+) COMMENT='用户信息表';
+
+
+
+
+
+
+
+### MySQL 数据库操作测试
+  - 新增
+        http://localhost:8001/test/mysql/add
+        {
+          "config": {
+              "session_id":"10002"
+          },
+          "input": "张三"
+        }
+    - 查询列表
+      http://localhost:8001/test/mysql/list
+       {
+        "config": {
+            "session_id":"10002"
+        },
+        "input": "张三"
+      }
+
+     - 查询单个
+      http://localhost:8001/test/mysql/get
+       {
+        "config": {
+            "session_id":"10002"
+        },
+        "input": "4"
+      }
+
+      - 修改
+        http://localhost:8001/test/mysql/update
+        {
+          "config": {
+              "session_id":"1"
+          },
+          "input": "李四"
+        }
+      

+ 376 - 0
foundation/base/mysql/async_mysql_base_dao.py

@@ -0,0 +1,376 @@
+from typing import List, Tuple, Any, Optional, Dict
+from mysql.connector import Error
+from foundation.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from foundation.base.mysql.async_mysql_conn_pool import AsyncMySQLPool
+import aiomysql
+
+class AsyncBaseDAO:
+    """异步数据库访问基类"""
+    
+    def __init__(self, db_pool: AsyncMySQLPool):
+        self.db_pool = db_pool
+        
+    
+    async def execute_query(self, query: str, params: Tuple = None) -> bool:
+        """执行写操作"""
+        try:
+            async with self.db_pool.get_cursor() as cursor:
+                await cursor.execute(query, params or ())
+                return True
+        except Exception as err:
+            handler_err(logger=server_logger, err=err ,err_name="执行查询失败")
+            raise
+    
+    async def fetch_all(self, query: str, params: Tuple = None) -> List[Dict]:
+        """查询多条记录"""
+        try:
+            async with self.db_pool.get_cursor() as cursor:
+                await cursor.execute(query, params or ())
+                return await cursor.fetchall()
+        except Exception as err:
+            handler_err(logger=server_logger, err=err ,err_name="查询数据失败")
+            raise
+    
+    async def fetch_one(self, query: str, params: Tuple = None) -> Optional[Dict]:
+        """查询单条记录"""
+        try:
+            async with self.db_pool.get_cursor() as cursor:
+                await cursor.execute(query, params or ())
+                return await cursor.fetchone()
+        except Exception as err:
+            handler_err(logger=server_logger, err=err ,err_name="查询单条数据失败")
+            raise
+    
+    async def fetch_scalar(self, query: str, params: Tuple = None) -> Any:
+        """查询单个值"""
+        result = await self.fetch_one(query, params)
+        return list(result.values())[0] if result else None
+    
+    async def execute_many(self, query: str, params_list: List[Tuple]) -> bool:
+        """批量执行"""
+        try:
+            async with self.db_pool.get_cursor() as cursor:
+                await cursor.executemany(query, params_list)
+                return True
+        except Exception as err:
+            handler_err(logger=server_logger, err=err ,err_name="批量执行失败")
+            raise
+
+    async def update_record(self, table: str, updates: Dict, conditions: Dict) -> bool:
+        """
+        通用更新记录方法
+        
+        Args:
+            table: 表名
+            updates: 要更新的字段和值,如 {'name': '新名字', 'age': 25}
+            conditions: 更新条件,如 {'id': 1, 'status': 'active'}
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        if not updates:
+            raise ValueError("更新字段不能为空")
+        
+        if not conditions:
+            raise ValueError("更新条件不能为空")
+        
+        try:
+            # 构建 SET 子句
+            set_clause = ", ".join([f"{field} = %s" for field in updates.keys()])
+            set_values = list(updates.values())
+            
+            # 构建 WHERE 子句
+            where_clause = " AND ".join([f"{field} = %s" for field in conditions.keys()])
+            where_values = list(conditions.values())
+            
+            # 构建完整 SQL
+            sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
+            params = set_values + where_values
+            
+            return await self.execute_query(sql, tuple(params))
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="更新记录失败")
+            raise
+    
+    async def update_by_id(self, table: str, record_id: int, updates: Dict) -> bool:
+        """
+        根据ID更新记录
+        
+        Args:
+            table: 表名
+            record_id: 记录ID
+            updates: 要更新的字段和值
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        return await self.update_record(table, updates, {'id': record_id})
+    
+    async def update_with_condition(self, table: str, updates: Dict, where_sql: str, params: Tuple = None) -> bool:
+        """
+        使用自定义WHERE条件更新记录
+        
+        Args:
+            table: 表名
+            updates: 要更新的字段和值
+            where_sql: WHERE条件SQL
+            params: WHERE条件参数
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        if not updates:
+            raise ValueError("更新字段不能为空")
+        
+        try:
+            # 构建 SET 子句
+            set_clause = ", ".join([f"{field} = %s" for field in updates.keys()])
+            set_values = list(updates.values())
+            
+            # 构建完整 SQL
+            sql = f"UPDATE {table} SET {set_clause} WHERE {where_sql}"
+            
+            # 合并参数
+            all_params = tuple(set_values) + (params if params else ())
+            
+            return await self.execute_query(sql, all_params)
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="条件更新失败")
+            raise
+    
+    async def batch_update(self, table: str, updates_list: List[Dict], id_field: str = 'id') -> bool:
+        """
+        批量更新记录(根据ID)
+        
+        Args:
+            table: 表名
+            updates_list: 更新数据列表,每个元素包含id和要更新的字段
+            id_field: ID字段名,默认为'id'
+        
+        Returns:
+            bool: 批量更新是否成功
+        """
+        if not updates_list:
+            raise ValueError("更新数据列表不能为空")
+        
+        try:
+            # 使用事务确保批量操作的原子性
+            async with self.db_pool.get_connection() as conn:
+                async with conn.cursor(aiomysql.DictCursor) as cursor:
+                    for update_data in updates_list:
+                        if id_field not in update_data:
+                            raise ValueError(f"更新数据中缺少{id_field}字段")
+                        
+                        record_id = update_data[id_field]
+                        # 从更新数据中移除ID字段
+                        update_fields = {k: v for k, v in update_data.items() if k != id_field}
+                        
+                        if not update_fields:
+                            continue
+                        
+                        # 构建SET子句
+                        set_clause = ", ".join([f"{field} = %s" for field in update_fields.keys()])
+                        set_values = list(update_fields.values())
+                        
+                        # 执行更新
+                        sql = f"UPDATE {table} SET {set_clause} WHERE {id_field} = %s"
+                        params = set_values + [record_id]
+                        
+                        await cursor.execute(sql, params)
+                    
+                    # 提交事务
+                    await conn.commit()
+                    return True
+                    
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="批量更新失败")
+            raise
+
+
+class TestTabDAO(AsyncBaseDAO):
+    """异步用户数据访问对象"""
+    
+
+    async def insert_user(self, name: str, email: str, age: int) -> int:
+        """插入用户"""
+        insert_sql = "INSERT INTO test_tab (name, email, age) VALUES (%s, %s, %s)"
+        try:
+            async with self.db_pool.get_cursor() as cursor:
+                await cursor.execute(insert_sql, (name, email, age))
+                return cursor.lastrowid
+        except Exception as err:
+            handler_err(logger=server_logger, err=err ,err_name="插入用户失败")
+            raise
+    
+    async def get_user_by_id(self, user_id: int) -> Optional[Dict]:
+        """根据ID获取用户"""
+        query = "SELECT * FROM test_tab WHERE id = %s AND status = 'active'"
+        return await self.fetch_one(query, (user_id,))
+    
+    async def get_all_users(self) -> List[Dict]:
+        """获取所有用户"""
+        query = "SELECT * FROM test_tab WHERE status = 'active' ORDER BY created_at DESC"
+        return await self.fetch_all(query)
+    
+
+    async def get_users_by_condition(self, conditions: Dict) -> List[Dict]:
+        """根据条件查询用户"""
+        if not conditions:
+            return await self.get_all_users()
+        
+        try:
+            where_clause = " AND ".join([f"{field} = %s" for field in conditions.keys()])
+            where_values = list(conditions.values())
+            
+            query = f"SELECT * FROM test_tab WHERE {where_clause} AND status = 'active' ORDER BY created_at DESC"
+            return await self.fetch_all(query, tuple(where_values))
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="条件查询用户失败")
+            raise
+    
+    # ========== 修改方法 ==========
+    
+    async def update_user(self, user_id: int, **updates) -> bool:
+        """
+        更新用户信息
+        
+        Args:
+            user_id: 用户ID
+            **updates: 要更新的字段,如 name='新名字', age=25, email='new@email.com'
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        if not updates:
+            server_logger.warning("没有提供更新字段")
+            return False
+        
+        # 过滤允许更新的字段
+        allowed_fields = {'name', 'email', 'age', 'status'}
+        valid_updates = {k: v for k, v in updates.items() if k in allowed_fields}
+        
+        if not valid_updates:
+            server_logger.warning("没有有效的更新字段")
+            return False
+        
+        try:
+            return await self.update_by_id('test_tab', user_id, valid_updates)
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="更新用户失败")
+            raise
+    
+    async def update_user_by_email(self, email: str, **updates) -> bool:
+        """
+        根据邮箱更新用户信息
+        
+        Args:
+            email: 用户邮箱
+            **updates: 要更新的字段
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        if not updates:
+            server_logger.warning("没有提供更新字段")
+            return False
+        
+        # 过滤允许更新的字段
+        allowed_fields = {'name', 'age', 'status'}
+        valid_updates = {k: v for k, v in updates.items() if k in allowed_fields}
+        
+        if not valid_updates:
+            server_logger.warning("没有有效的更新字段")
+            return False
+        
+        try:
+            return await self.update_record('test_tab', valid_updates, {'email': email})
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="根据邮箱更新用户失败")
+            raise
+    
+    async def update_user_status(self, user_id: int, status: str) -> bool:
+        """
+        更新用户状态
+        
+        Args:
+            user_id: 用户ID
+            status: 状态值 ('active' 或 'inactive')
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        if status not in ('active', 'inactive'):
+            raise ValueError("状态值必须是 'active' 或 'inactive'")
+        
+        try:
+            return await self.update_user(user_id, status=status)
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="更新用户状态失败")
+            raise
+    
+    async def batch_update_users(self, updates_list: List[Dict]) -> bool:
+        """
+        批量更新用户信息
+        
+        Args:
+            updates_list: 更新数据列表,每个元素必须包含id字段
+        
+        Returns:
+            bool: 批量更新是否成功
+        """
+        try:
+            return await self.batch_update('test_tab', updates_list, 'id')
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="批量更新用户失败")
+            raise
+    
+    async def update_users_age_range(self, min_age: int, max_age: int, updates: Dict) -> bool:
+        """
+        更新年龄范围内的用户
+        
+        Args:
+            min_age: 最小年龄
+            max_age: 最大年龄
+            updates: 要更新的字段
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        try:
+            where_sql = "age BETWEEN %s AND %s AND status = 'active'"
+            params = (min_age, max_age)
+            
+            return await self.update_with_condition('test_tab', updates, where_sql, params)
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="更新年龄范围用户失败")
+            raise
+    
+    async def increment_user_age(self, user_id: int, increment: int = 1) -> bool:
+        """
+        增加用户年龄
+        
+        Args:
+            user_id: 用户ID
+            increment: 增加的值,默认为1
+        
+        Returns:
+            bool: 更新是否成功
+        """
+        try:
+            sql = "UPDATE test_tab SET age = age + %s WHERE id = %s AND status = 'active'"
+            return await self.execute_query(sql, (increment, user_id))
+            
+        except Exception as err:
+            handler_err(logger=server_logger, err=err, err_name="增加用户年龄失败")
+            raise
+
+

+ 86 - 0
foundation/base/mysql/async_mysql_conn_pool.py

@@ -0,0 +1,86 @@
+import aiomysql
+from contextlib import asynccontextmanager
+from typing import  Dict,Optional, AsyncGenerator
+from foundation.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from foundation.base.config import config_handler
+
+# 异步数据库连接池
+class AsyncMySQLPool:
+    _instance = None
+    
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+    
+    def __init__(self):
+        if not hasattr(self, '_pool'):
+            self._pool = None
+            self._initialized = False
+    
+    async def initialize(self):
+        """初始化连接池"""
+        try:
+            
+            self._pool = await aiomysql.create_pool(
+                host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
+                port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
+                user=config_handler.get("mysql", "MYSQL_USER"),
+                password=config_handler.get("mysql", "MYSQL_PASSWORD"),
+                db=config_handler.get("mysql", "MYSQL_DB"),
+                minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
+                maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
+                autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT")
+            )
+            self._initialized = True
+            server_logger.info("异步MySQL连接池初始化成功")
+        except Exception as e:
+            server_logger.error(f"连接池初始化失败: {e}")
+            raise
+    
+    async def close(self):
+        """关闭连接池"""
+        if self._pool:
+            self._pool.close()
+            await self._pool.wait_closed()
+            server_logger.info("异步MySQL连接池已关闭")
+    
+    @asynccontextmanager
+    async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
+        """获取数据库连接的上下文管理器"""
+        if not self._initialized:
+            # 如果没有初始化,使用默认配置初始化
+            await self.initialize()
+        
+        async with self._pool.acquire() as conn:
+            try:
+                yield conn
+            except Exception as e:
+                server_logger.error(f"数据库连接操作失败: {e}")
+                raise
+    
+    @asynccontextmanager
+    async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
+        """获取游标的上下文管理器"""
+        if connection:
+            # 使用提供的连接
+            async with connection.cursor(aiomysql.DictCursor) as cursor:
+                try:
+                    yield cursor
+                except Exception as e:
+                    server_logger.error(f"游标操作失败: {e}")
+                    raise
+        else:
+            # 创建新连接
+            async with self.get_connection() as conn:
+                async with conn.cursor(aiomysql.DictCursor) as cursor:
+                    try:
+                        yield cursor
+                    except Exception as e:
+                        server_logger.error(f"游标操作失败: {e}")
+                        raise
+
+
+# 全局数据库连接池实例
+#async_db_pool = AsyncMySQLPool()

+ 11 - 1
foundation/utils/tool_utils.py

@@ -8,7 +8,8 @@ from functools import wraps
 from foundation.logger.loggering import server_logger
 from foundation.utils.common import handler_err
 from foundation.base.config import config_handler
-
+import json
+from datetime import datetime
 # 获取当前文件的目录
 current_dir = os.path.dirname(__file__)
 # 构建到 .env 的相对路径
@@ -39,3 +40,12 @@ def get_system_prompt() -> str:
     server_logger.info(f"获取系统提示语: {system_prompt}")
     return str(system_prompt)
 
+
+
+
+
+class DateTimeEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, datetime):
+            return obj.isoformat()  # 转换为 ISO 8601 格式字符串
+        return super().default(obj)

+ 2 - 0
requirements.txt

@@ -181,3 +181,5 @@ redis==6.2.0
 langgraph-checkpoint-postgres==2.0.23
 langgraph-checkpoint-redis==0.0.8
 langchain-redis==0.2.3
+aiomysql==0.3.2
+

+ 10 - 5
views/__init__.py

@@ -13,19 +13,24 @@ from contextlib import asynccontextmanager
 from contextvars import ContextVar
 
 from fastapi import FastAPI, APIRouter
-
-
-mcp_server = None
+from foundation.base.mysql.async_mysql_conn_pool import AsyncMySQLPool
+from foundation.logger.loggering import server_logger
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     # 启动时加载工具
     #await mcp_server.get_mcp_tools()
+    # 全局数据库连接池实例
+    async_db_pool = AsyncMySQLPool()
+    await async_db_pool.initialize()
+    app.state.async_db_pool = async_db_pool
+    server_logger.info(f"✅ MySQL数据库连接池:{app.state.async_db_pool}")
 
     yield
     # 关闭时清理
-    if mcp_server and mcp_server.cleanup:
-        await mcp_server.close()
+    if async_db_pool and async_db_pool.close:
+        await async_db_pool.close()
+
 
 test_router = APIRouter(prefix="/test", tags=["agent"])
 current_operation_id: ContextVar[str] = ContextVar("operation_id", default=str(uuid.uuid4()))

+ 180 - 2
views/test_views.py

@@ -13,6 +13,7 @@ from typing import Optional
 from fastapi import Depends, Response, Header
 from sse_starlette import EventSourceResponse
 from starlette.responses import JSONResponse
+from fastapi import Depends, Request, Response, Header
 
 from foundation.agent.test_agent import test_agent_client
 from foundation.agent.generate.model_generate import generate_model_client
@@ -22,7 +23,8 @@ from foundation.utils.common import return_json, handler_err
 from views import test_router, get_operation_id
 from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
 
-
+from foundation.base.mysql.async_mysql_base_dao import TestTabDAO
+from foundation.utils.tool_utils import DateTimeEncoder
 
 
 @test_router.post("/generate/chat", response_model=TestForm)
@@ -332,4 +334,180 @@ async def chat_graph_stream(param: TestForm,
         return JSONResponse(
             return_json(code=1, msg=f"{err}", trace_id=trace_id),
             status_code=500
-        )
+        )
+    
+
+
+
+
+
+
+@test_router.post("/mysql/add", response_model=TestForm)
+async def test_mysql_add(
+        request: Request,
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+    根据MySQL应用
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # 验证参数
+
+        # 从字典中获取input
+        input_data = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        # 从app.state中获取数据库连接池
+        async_db_pool = request.app.state.async_db_pool
+        from foundation.base.mysql.async_mysql_base_dao import TestTabDAO
+        test_tab_dao = TestTabDAO(async_db_pool)
+        #  name: str, email: str, age: int
+        name = input_data
+        email = session_id
+        age = 18
+        test_id = await test_tab_dao.insert_user(name=name, email=email, age=age)
+        output = f"【test_id】: {test_id}"
+       
+        # 直接执行
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/add")
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+@test_router.post("/mysql/get", response_model=TestForm)
+async def test_mysql_add(
+        request: Request,
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+    根据MySQL应用
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # 验证参数
+
+        # 从字典中获取input
+        input_data = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        # 从app.state中获取数据库连接池
+        async_db_pool = request.app.state.async_db_pool
+        test_tab_dao = TestTabDAO(async_db_pool)
+        test_id = input_data;
+        data = await test_tab_dao.get_user_by_id(user_id=test_id)
+        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/get")
+        json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
+        output = json_str
+        # 直接执行
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/get")
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+@test_router.post("/mysql/list", response_model=TestForm)
+async def test_mysql_add(
+        request: Request,
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+    根据MySQL应用
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # 验证参数
+
+        # 从字典中获取input
+        input_data = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        # 从app.state中获取数据库连接池
+        async_db_pool = request.app.state.async_db_pool
+        from foundation.base.mysql.async_mysql_base_dao import TestTabDAO
+        test_tab_dao = TestTabDAO(async_db_pool)
+        test_id = input_data;
+        data = await test_tab_dao.get_all_users()
+        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/list")
+        json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
+        output = json_str
+        # 直接执行
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/list")
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+
+
+
+@test_router.post("/mysql/update", response_model=TestForm)
+async def test_mysql_add(
+        request: Request,
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+    根据MySQL应用
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # 验证参数
+
+        # 从字典中获取input
+        input_data = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        # 从app.state中获取数据库连接池
+        async_db_pool = request.app.state.async_db_pool
+        test_tab_dao = TestTabDAO(async_db_pool)
+        test_id = session_id;
+        updates = {
+            "name": input_data,
+            "email": "test_email——upt",
+            "age": 22
+        }
+        success = await test_tab_dao.update_user(user_id=test_id , **updates)
+        server_logger.info(trace_id=trace_id, msg=f"【result】: {success}", log_type="/mysql/update")
+        output = success
+        # 直接执行
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/update")
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+