#!/usr/bin/env python # -*- coding: utf-8 -*- """ RAG监控装饰器使用示例和测试脚本 展示如何使用 rag_monitor 装饰器监控RAG链路 """ import sys import os import time import json import asyncio from pathlib import Path # 添加项目根目录到路径 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, project_root) from foundation.observability.monitoring.rag import rag_monitor from foundation.observability.logger.loggering import review_logger as logger # ========== 示例1: 同步函数监控 ========== @rag_monitor.monitor_step( step_name="example_sync_query_extract", capture_input=True, capture_output=True ) def example_query_extract(content: str): """示例:查询提取函数""" logger.info(f"正在提取查询,内容长度: {len(content)}") time.sleep(0.5) # 模拟处理时间 # 模拟提取结果 return [ {"query": "安全生产条件", "entity": "安全"}, {"query": "施工管理制度", "entity": "施工"} ] @rag_monitor.monitor_step( step_name="example_sync_vector_search", capture_input=True, capture_output=True, output_transform=lambda x: { # 只保留关键信息 "results_count": len(x), "has_results": bool(x) } ) def example_vector_search(query_pairs: list): """示例:向量检索函数""" logger.info(f"正在进行向量检索,查询对数量: {len(query_pairs)}") time.sleep(1.0) # 模拟检索时间 # 模拟检索结果 results = [] for pair in query_pairs: results.append({ "query": pair["query"], "doc_id": f"doc_{hash(pair['query']) % 100}", "score": 0.85, "content": f"这是关于{pair['query']}的内容..." }) return results # ========== 示例2: 异步函数监控 ========== @rag_monitor.monitor_step( step_name="example_async_rerank", capture_input=True, capture_output=True, input_transform=lambda x: { # 只记录输入统计信息 "results_count": len(x["args"][0]) if x["args"] else 0 } ) async def example_async_rerank(results: list): """示例:异步重排序函数""" logger.info(f"正在进行重排序,结果数量: {len(results)}") await asyncio.sleep(0.8) # 模拟异步处理 # 模拟重排序 sorted_results = sorted(results, key=lambda x: x["score"], reverse=True) return sorted_results[:5] # 只返回前5个 @rag_monitor.monitor_step( step_name="example_async_parent_enhance", capture_input=True, capture_output=True ) async def example_async_parent_enhance(results: list): """示例:异步父文档增强函数""" logger.info(f"正在进行父文档增强,结果数量: {len(results)}") await asyncio.sleep(1.2) # 模拟异步处理 # 模拟父文档增强 enhanced = [] for res in results: enhanced.append({ **res, "parent_content": f"父文档内容: {res['content']}的完整上下文...", "enhanced": True }) return enhanced # ========== 示例3: 完整的RAG链路测试 ========== def test_sync_rag_pipeline(): """测试同步RAG链路""" print("\n" + "="*60) print("示例1: 同步RAG链路监控") print("="*60) # 开始追踪会话 trace_id = f"test_sync_{int(time.time() * 1000)}" rag_monitor.start_trace(trace_id, metadata={ "test_type": "sync", "description": "同步RAG链路测试" }) try: # Step 1: 查询提取 query_content = "请检查施工方案中的安全生产条件和施工管理制度是否符合规范要求。" query_pairs = example_query_extract(query_content) print(f"✅ 查询提取完成,提取到 {len(query_pairs)} 个查询对") # Step 2: 向量检索 search_results = example_vector_search(query_pairs) print(f"✅ 向量检索完成,找到 {len(search_results)} 个结果") print(f"\n✅ 同步RAG链路测试完成") finally: # 结束追踪并保存 trace_data = rag_monitor.end_trace(trace_id) print(f"\n📊 追踪数据已保存: temp/rag_monitoring/{trace_id}.json") print(f"⏱️ 总耗时: {trace_data['total_duration']}秒") print(f"📝 步骤数量: {len(trace_data['steps'])}") async def test_async_rag_pipeline(): """测试异步RAG链路""" print("\n" + "="*60) print("示例2: 异步RAG链路监控") print("="*60) # 开始追踪会话 trace_id = f"test_async_{int(time.time() * 1000)}" rag_monitor.start_trace(trace_id, metadata={ "test_type": "async", "description": "异步RAG链路测试" }) try: # 模拟一些初始数据 initial_results = [ {"query": "安全", "doc_id": "doc_1", "score": 0.82, "content": "安全内容..."}, {"query": "施工", "doc_id": "doc_2", "score": 0.91, "content": "施工内容..."}, {"query": "管理", "doc_id": "doc_3", "score": 0.75, "content": "管理内容..."} ] # Step 1: 异步重排序 reranked_results = await example_async_rerank(initial_results) print(f"✅ 重排序完成,保留前 {len(reranked_results)} 个结果") # Step 2: 异步父文档增强 enhanced_results = await example_async_parent_enhance(reranked_results) print(f"✅ 父文档增强完成,增强了 {len(enhanced_results)} 个结果") print(f"\n✅ 异步RAG链路测试完成") finally: # 结束追踪并保存 trace_data = rag_monitor.end_trace(trace_id) print(f"\n📊 追踪数据已保存: temp/rag_monitoring/{trace_id}.json") print(f"⏱️ 总耗时: {trace_data['total_duration']}秒") print(f"📝 步骤数量: {len(trace_data['steps'])}") def test_mixed_rag_pipeline(): """测试混合(同步+异步)RAG链路""" print("\n" + "="*60) print("示例3: 混合RAG链路监控(同步+异步)") print("="*60) # 开始追踪会话 trace_id = f"test_mixed_{int(time.time() * 1000)}" rag_monitor.start_trace(trace_id, metadata={ "test_type": "mixed", "description": "混合RAG链路测试" }) try: # Step 1: 同步查询提取 query_content = "检查项目的环境保护措施和质量管理体系。" query_pairs = example_query_extract(query_content) print(f"✅ [同步] 查询提取完成") # Step 2: 同步向量检索 search_results = example_vector_search(query_pairs) print(f"✅ [同步] 向量检索完成") # Step 3: 异步重排序 async def async_part(): reranked = await example_async_rerank(search_results) print(f"✅ [异步] 重排序完成") # Step 4: 异步父文档增强 enhanced = await example_async_parent_enhance(reranked) print(f"✅ [异步] 父文档增强完成") return enhanced # 运行异步部分 final_results = asyncio.run(async_part()) print(f"\n✅ 混合RAG链路测试完成,最终得到 {len(final_results)} 个结果") finally: # 结束追踪并保存 trace_data = rag_monitor.end_trace(trace_id) print(f"\n📊 追踪数据已保存: temp/rag_monitoring/{trace_id}.json") print(f"⏱️ 总耗时: {trace_data['total_duration']}秒") print(f"📝 步骤数量: {len(trace_data['steps'])}") # ========== 示例4: 自定义输入输出转换 ========== @rag_monitor.monitor_step( step_name="example_sensitive_data", capture_input=True, capture_output=True, input_transform=lambda x: { # 过滤敏感信息,只保留统计数据 "user_id": "***", # 隐藏用户ID "data_length": len(str(x)) }, output_transform=lambda x: { # 只保留关键指标 "success": x.get("success"), "count": x.get("count") } ) def example_process_sensitive_data(user_id: str, data: dict): """示例:处理敏感数据(自定义转换)""" time.sleep(0.3) return { "success": True, "user_id": user_id, "count": len(data), "details": data # 这些详细信息不会被记录 } def test_custom_transform(): """测试自定义输入输出转换""" print("\n" + "="*60) print("示例4: 自定义输入输出转换(敏感数据保护)") print("="*60) trace_id = f"test_transform_{int(time.time() * 1000)}" rag_monitor.start_trace(trace_id, metadata={ "test_type": "custom_transform" }) try: result = example_process_sensitive_data( user_id="user_12345", data={"key1": "value1", "key2": "value2"} ) print(f"✅ 处理完成,成功: {result['success']}") print(f"ℹ️ 敏感信息已被过滤,只记录统计数据") finally: trace_data = rag_monitor.end_trace(trace_id) print(f"\n📊 追踪数据已保存: temp/rag_monitoring/{trace_id}.json") # ========== 查看监控结果 ========== def view_trace_result(trace_id: str): """查看追踪结果""" file_path = Path("temp/rag_monitoring") / f"{trace_id}.json" if file_path.exists(): print(f"\n📄 追踪结果: {trace_id}") print("="*60) with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) print(f"⏱️ 总耗时: {data.get('total_duration')}秒") print(f"📝 步骤数量: {len(data.get('steps', {}))}") print(f"\n步骤详情:") for step_name, step_data in data.get('steps', {}).items(): print(f"\n [{step_data.get('status', 'unknown').upper()}] {step_name}") print(f" 函数: {step_data.get('function_name')}") print(f" 耗时: {step_data.get('duration')}秒") if step_data.get('status') == 'error': print(f" ❌ 错误: {step_data.get('error', {}).get('message')}") else: print(f"❌ 找不到追踪文件: {file_path}") # ========== 主函数 ========== def main(): """运行所有测试示例""" print("\n" + "🚀 RAG监控装饰器测试 🚀".center(60, "=")) try: # 示例1: 同步RAG链路 test_sync_rag_pipeline() # 示例2: 异步RAG链路 asyncio.run(test_async_rag_pipeline()) # 示例3: 混合RAG链路 test_mixed_rag_pipeline() # 示例4: 自定义转换 test_custom_transform() print("\n" + "✅ 所有测试完成!".center(60, "=")) print(f"\n💡 提示: 查看监控数据文件在: temp/rag_monitoring/") print(f"💡 提示: 每个trace_id对应一个JSON文件,包含完整的执行链路信息") except Exception as e: print(f"\n❌ 测试失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()