| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872 |
- # !/usr/bin/ python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :cattle_farm_views.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/10 17:32
- '''
- import json
- from typing import Optional, List, Dict, Any
- from pydantic import BaseModel, Field
- from fastapi import Depends, Response, Header
- from sse_starlette import EventSourceResponse
- from starlette.responses import JSONResponse
- from fastapi import Depends, Request, Response, Header
- from foundation.ai.agent.test_agent import test_agent_client
- from foundation.ai.agent.generate.model_generate import generate_model_client
- from foundation.observability.logger.loggering import server_logger
- from foundation.schemas.test_schemas import TestForm
- from foundation.utils.common import return_json, handler_err
- from views import test_router, get_operation_id
- from foundation.ai.agent.workflow.test_workflow_graph import test_workflow_graph
- from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
- from database.repositories.bus_data_query import BasisOfPreparationDAO
- from foundation.utils.tool_utils import DateTimeEncoder
- from langchain_core.prompts import ChatPromptTemplate
- from foundation.utils.yaml_utils import system_prompt_config
- from foundation.ai.models.silicon_flow import SiliconFlowAPI
- from foundation.database.base.vector.pg_vector import PGVectorDB
- from foundation.database.base.vector.milvus_vector import MilvusVectorManager
- # 响应模型用于Swagger文档
- class StandardResponse(BaseModel):
- """标准响应模型"""
- code: int = Field(description="响应状态码 (0:成功, 其他:错误)")
- msg: str = Field(description="响应消息")
- data: Optional[Dict[str, Any]] = Field(default=None, description="响应数据")
- data_type: Optional[str] = Field(default=None, description="数据类型")
- trace_id: Optional[str] = Field(default=None, description="追踪ID")
- class UserRecord(BaseModel):
- """用户记录模型"""
- user_id: Optional[int] = Field(description="用户ID")
- name: Optional[str] = Field(description="用户名")
- email: Optional[str] = Field(description="邮箱")
- age: Optional[int] = Field(description="年龄")
- created_at: Optional[str] = Field(description="创建时间")
- class EmbeddingResponse(BaseModel):
- """向量嵌入响应模型"""
- embed_dim: int = Field(description="嵌入向量维度")
- embedding: List[float] = Field(description="嵌入向量数据")
- class SearchResult(BaseModel):
- """搜索结果模型"""
- text_content: Optional[str] = Field(description="文本内容")
- score: Optional[float] = Field(description="相关性得分")
- metadata: Optional[Dict[str, Any]] = Field(default=None, description="元数据")
- class SSEData(BaseModel):
- """SSE数据模型"""
- code: int = Field(description="状态码")
- output: Optional[str] = Field(default=None, description="输出内容")
- completed: bool = Field(description="是否完成")
- trace_id: Optional[str] = Field(default=None, description="追踪ID")
- message: Optional[str] = Field(default=None, description="消息")
- dataType: Optional[str] = Field(default="text", description="数据类型")
- @test_router.post("/generate/chat", tags=["模型生成"])
- async def generate_chat_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 生成类模型
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- print(trace_id)
- # 从字典中获取input
- input_query = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
-
- # 创建ChatPromptTemplate
- template = ChatPromptTemplate.from_messages([
- ("system", system_prompt_config['system_prompt']),
- ("user", input_query)
- ])
- task_prompt_info = {"task_prompt": template}
- output = await generate_model_client.get_model_generate_invoke(trace_id=trace_id , task_prompt_info=task_prompt_info)
- # 直接执行
- server_logger.info(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- @test_router.post("/generate/stream", tags=["模型生成"])
- async def generate_stream_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 生成类模型
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 从字典中获取input
- input_query = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 创建ChatPromptTemplate
- template = ChatPromptTemplate.from_messages([
- ("system", system_prompt_config['system_prompt']),
- ("user", input_query)
- ])
- task_prompt_info = {"task_prompt": template}
- # 创建 SSE 流式响应
- async def event_generator():
- try:
- # 流式处理查询 trace_id, task_prompt_info: dict, input_query, context=None
- for chunk in generate_model_client.get_model_generate_stream(trace_id=trace_id , task_prompt_info=task_prompt_info):
- # 发送数据块
- yield {
- "event": "message",
- "data": json.dumps({
- "output": chunk,
- "completed": False,
- }, ensure_ascii=False)
- }
- # 获取缓存数据
- result_data = {}
- # 发送结束事件
- yield {
- "event": "message_end",
- "data": json.dumps({
- "completed": True,
- "message": json.dumps(result_data, ensure_ascii=False),
- "code": 0,
- "trace_id": trace_id,
- }, ensure_ascii=False),
- }
- except Exception as e:
- # 错误处理
- yield {
- "event": "error",
- "data": json.dumps({
- "trace_id": trace_id,
- "message": str(e),
- "code": 1
- }, ensure_ascii=False)
- }
- finally:
- # 不需要关闭客户端,因为它是单例
- pass
- # 返回 SSE 响应
- return EventSourceResponse(
- event_generator(),
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive"
- }
- )
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- # 路由
- @test_router.post("/agent/chat", tags=["智能体"])
- async def chat_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据场景获取智能体反馈
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- task_prompt_info = {"task_prompt": ""}
-
- # stream 流式执行
- output = await test_agent_client.handle_query(trace_id , task_prompt_info, input_data, context, param.config)
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- @test_router.post("/agent/stream", response_class=Response, tags=["智能体"])
- async def chat_agent_stream(param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据场景获取智能体反馈 (SSE流式响应)
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
-
- # 提取参数
- input_data = param.input
- context = param.context
- header_info = {
-
- }
- task_prompt_info = {"task_prompt": ""}
- # 如果business_scene为None,则使用大模型进行意图识别
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 创建 SSE 流式响应
- async def event_generator():
- try:
- # 流式处理查询
- async for chunk in test_agent_client.handle_query_stream(
- trace_id=trace_id,
- config_param=param.config,
- task_prompt_info=task_prompt_info,
- input_query=input_data,
- context=context,
- header_info=header_info
- ):
- server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
- # 发送数据块
- yield {
- "event": "message",
- "data": json.dumps({
- "code": 0,
- "output": chunk,
- "completed": False,
- "trace_id": trace_id,
- }, ensure_ascii=False)
- }
- # 获取缓存数据
- result_data = {}
- # 发送结束事件
- yield {
- "event": "message_end",
- "data": json.dumps({
- "completed": True,
- "message": json.dumps(result_data, ensure_ascii=False),
- "code": 0,
- "trace_id": trace_id,
- }, ensure_ascii=False),
- }
- except Exception as e:
- # 错误处理
- yield {
- "event": "error",
- "data": json.dumps({
- "trace_id": trace_id,
- "message": str(e),
- "code": 1
- }, ensure_ascii=False)
- }
- finally:
- # 不需要关闭客户端,因为它是单例
- pass
- # 返回 SSE 响应
- return EventSourceResponse(
- event_generator(),
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive"
- }
- )
- except Exception as err:
- # 初始错误处理
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
- return JSONResponse(
- return_json(code=1, msg=f"{err}", trace_id=trace_id),
- status_code=500
- )
- @test_router.post("/graph/stream", response_class=Response, tags=["智能体"])
- async def chat_graph_stream(param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据场景获取智能体反馈 (SSE流式响应)
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # request_param = {
- # "input": param.input,
- # "config": param.config,
- # "context": param.context
- # }
- # 创建 SSE 流式响应
- async def event_generator():
- try:
- # 流式处理查询
- async for chunk in test_workflow_graph.handle_query_stream(
- param=param,
- trace_id=trace_id,
- ):
- server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
- # 发送数据块
- yield {
- "event": "message",
- "data": json.dumps({
- "code": 0,
- "output": chunk,
- "completed": False,
- "trace_id": trace_id,
- "dataType": "text"
- }, ensure_ascii=False)
- }
- # 发送结束事件
- yield {
- "event": "message_end",
- "data": json.dumps({
- "completed": True,
- "message": "Stream completed",
- "code": 0,
- "trace_id": trace_id,
- "dataType": "text"
- }, ensure_ascii=False),
- }
- except Exception as e:
- # 错误处理
- yield {
- "event": "error",
- "data": json.dumps({
- "trace_id": trace_id,
- "msg": str(e),
- "code": 1,
- "dataType": "text"
- }, ensure_ascii=False)
- }
- finally:
- # 不需要关闭客户端,因为它是单例
- pass
- # 返回 SSE 响应
- return EventSourceResponse(
- event_generator(),
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive"
- }
- )
-
- except Exception as err:
- # 初始错误处理
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
- return JSONResponse(
- return_json(code=1, msg=f"{err}", trace_id=trace_id),
- status_code=500
- )
-
- @test_router.post("/redis", tags=["redis"])
- async def test_redis(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- from foundation.utils.redis_utils import set_redis_result_cache_data , get_redis_result_cache_data
- output = "success"
- data_type = "output"
- await set_redis_result_cache_data(data_type=data_type , trace_id=trace_id , value=input_data)
- server_logger.info(trace_id=trace_id, msg=f"key:{trace_id}:{data_type},value:{input_data} redis 设置成功")
- output = await get_redis_result_cache_data(data_type=data_type , trace_id=trace_id)
- # 直接执行
- server_logger.info(trace_id=trace_id, msg=f"【result】: {output}", log_type="/redis")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/redis")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/redis")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- @test_router.post("/mysql/add", tags=["mysql"])
- async def test_mysql_add(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 从app.state中获取数据库连接池
- async_db_pool = request.app.state.async_db_pool
- from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
- test_tab_dao = TestTabDAO(async_db_pool)
- # name: str, email: str, age: int
- name = input_data
- email = session_id
- age = 18
- test_id = await test_tab_dao.insert_user(name=name, email=email, age=age)
- output = f"【test_id】: {test_id}"
-
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/add")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- @test_router.post("/mysql/get", tags=["mysql"])
- async def test_mysql_add(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 从app.state中获取数据库连接池
- async_db_pool = request.app.state.async_db_pool
- test_tab_dao = TestTabDAO(async_db_pool)
- test_id = input_data;
- data = await test_tab_dao.get_user_by_id(user_id=test_id)
- server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/get")
- json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
- output = json_str
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/get")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- @test_router.post("/mysql/list", tags=["mysql"])
- async def test_mysql_add(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 从app.state中获取数据库连接池
- async_db_pool = request.app.state.async_db_pool
- from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
- test_tab_dao = TestTabDAO(async_db_pool)
- test_id = input_data;
- data = await test_tab_dao.get_all_users()
- server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/list")
- json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
- output = json_str
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/list")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- @test_router.post("/mysql/update", tags=["mysql"])
- async def test_mysql_add(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 从app.state中获取数据库连接池
- async_db_pool = request.app.state.async_db_pool
- test_tab_dao = TestTabDAO(async_db_pool)
- test_id = session_id;
- updates = {
- "name": input_data,
- "email": "test_email——upt",
- "age": 22
- }
- success = await test_tab_dao.update_user(user_id=test_id , **updates)
- server_logger.info(trace_id=trace_id, msg=f"【result】: {success}", log_type="/mysql/update")
- output = success
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/update")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- @test_router.post("/bop/get", tags=["mysql"])
- async def test_bop_get(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 从app.state中获取数据库连接池
- async_db_pool = request.app.state.async_db_pool
- bop_dao = BasisOfPreparationDAO(async_db_pool)
- test_id = input_data;
- data = await bop_dao.get_info_by_id(id=test_id)
- server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/bop/get")
- json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
- output = json_str
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/bop/get")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/get")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/get")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- @test_router.post("/bop/list", tags=["mysql"])
- async def test_mysql_add(
- request: Request,
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 根据MySQL应用
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- # 验证参数
- # 从字典中获取input
- input_data = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- # 从app.state中获取数据库连接池
- async_db_pool = request.app.state.async_db_pool
- from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
- bop_dao = BasisOfPreparationDAO(async_db_pool)
- test_id = input_data;
- data = await bop_dao.get_list()
- server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/bop/list")
- json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
- output = json_str
- # 直接执行
- server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/bop/list")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/list")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/list")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- # RAG-嵌入接口
- @test_router.post("/embedding", tags=["RAG服务"])
- async def embedding_test_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- embedding模型测试
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- print(trace_id)
- # 从字典中获取input
- input_query = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- task_prompt_info = {"task_prompt": ""}
- text = input_query
- # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
- from foundation.ai.models.silicon_flow import SiliconFlowAPI
- base_api_platform = SiliconFlowAPI()
- embedding = base_api_platform.get_embeddings([text])[0]
- embed_dim = len(embedding)
- server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
- output = f"embed_dim={embed_dim},embedding:{embedding}"
- #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
- # 直接执行
- #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="embedding")
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- #PG向量检索
- @test_router.post("/bfp/search", tags=["RAG服务"])
- async def bfp_search_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 编制依据向量检索
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- print(trace_id)
- # 从字典中获取input
- input_query = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- task_prompt_info = {"task_prompt": ""}
- top_k = int(session_id)
-
- output = None
- # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
- client = SiliconFlowAPI()
- # 抽象测试
- pg_vector_db = PGVectorDB(base_api_platform=client)
- output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- #PG向量检索/重排序
- @test_router.post("/bfp/search/rerank", tags=["RAG服务"])
- async def bfp_search_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 编制依据文档检索和重排序
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- print(trace_id)
- # 从字典中获取input
- input_query = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- task_prompt_info = {"task_prompt": ""}
- top_k = int(session_id)
-
- output = None
- # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
- client = SiliconFlowAPI()
- # 抽象测试
- pg_vector_db = PGVectorDB(base_api_platform=client)
- output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
- # 重排序处理
- content_list = [doc["text_content"] for doc in output]
- output = client.rerank(input_query=input_query, documents=content_list , top_n=top_k)
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
- # Milvus向量检索
- @test_router.post("/data/bfp/milvus/search", tags=["milvus"])
- async def bfp_search_endpoint(
- param: TestForm,
- trace_id: str = Depends(get_operation_id)):
- """
- 编制依据文档切分处理 和 入库处理
- """
- try:
- server_logger.info(trace_id=trace_id, msg=f"{param}")
- print(trace_id)
- # 从字典中获取input
- input_query = param.input
- session_id = param.config.session_id
- context = param.context
- header_info = {
- }
- task_prompt_info = {"task_prompt": ""}
- top_k = int(session_id)
-
- output = None
- # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
- client = SiliconFlowAPI()
- # 抽象测试
- vector_db = MilvusVectorManager(base_api_platform=client)
- output = vector_db.retriever(param={"collection_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
- # 返回字典格式的响应
- return JSONResponse(
- return_json(data={"output": output}, data_type="text", trace_id=trace_id))
- except ValueError as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- except Exception as err:
- handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
- return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
|