#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Model stress-test script.

Measures how the configured model endpoints behave under load. Supports:
- configurable concurrency (--concurrency)
- configurable total number of requests (--count)
- configurable context length (--context-size): 1k/2k/4k/8k/16k tokens
- choosing between different LLM / embedding models
- reporting latency statistics (avg/p50/p95/p99/min/max), throughput and
  error rate

Usage:
    # Default configuration (10 concurrent, 50 requests)
    python utils_test/Model_Test/test_model_stress.py

    python utils_test/Model_Test/test_model_stress.py --concurrency 150 --count 150 --model shutian_qwen3_6_27b --context-size 8k

    # Avoid server-side KV cache hits (inject a random value)
    python utils_test/Model_Test/test_model_stress.py --concurrency 10 --count 50 --bust-cache

    # Custom parameters
    python utils_test/Model_Test/test_model_stress.py --concurrency 20 --count 100 --model shutian_qwen3_5_122b

    # Different context lengths
    python utils_test/Model_Test/test_model_stress.py --context-size 4k
    python utils_test/Model_Test/test_model_stress.py --context-size 8k -c 5 -n 20

    # Sweep every preset context length (1k/2k/4k/8k/16k) and print a comparison
    python utils_test/Model_Test/test_model_stress.py --context-size all

    # Embedding model
    python utils_test/Model_Test/test_model_stress.py --type embedding --count 100 --concurrency 10

    # Every LLM model, one after another
    python utils_test/Model_Test/test_model_stress.py --all-models

    # Resolve the model from a function_name (read from model_setting.yaml)
    python utils_test/Model_Test/test_model_stress.py --function completeness_review_generate
"""

import sys
import asyncio
import argparse
import time
import statistics
import uuid
from pathlib import Path
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# ============================================================
# Available models (kept in sync with model_setting.yaml available_models)
# ============================================================

LLM_MODELS = [
    "qwen3_5_35b_a3b",
    "qwen3_5_27b",
    "qwen3_5_122b_a10b",
    "doubao",
    "doubao-1.5-pro-256k",
    "doubao-1.5-lite-32k",
    "deepseek",
    "deepseek-v3",
    "lq_qwen3_8b",
    "lq_qwen3_8b_lq_lora",
    "lq_qwen3_4b",
    "qwen_local_14b",
    "shutian_qwen3_5_122b",
    "shutian_qwen3_8b",
    "shutian_qwen3_5_35b",
    "shutian_qwen3_6_27b",
]

EMBEDDING_MODELS = [
    "siliconflow_embed",
    "shutian_qwen3_embed",
]

# Prompts used by the test requests
TEST_SYSTEM_PROMPT = "你是一个测试助手,请简洁回答问题。"
TEST_USER_PROMPT = "请用一句话回答:1+1等于几?"
TEST_EMBED_TEXT = "这是一个模型Embedding压力测试文本,用于验证向量化服务的并发能力和响应延迟。"

# Padding sentences (Chinese engineering prose) cycled to simulate a context
# of a requested token count.
_PADDING_SENTENCES = [
    "施工方案编制应结合工程实际,充分考虑施工环境、地质条件、气候因素等影响。",
    "桥梁工程的施工质量直接关系到结构安全和使用寿命,必须严格按照设计图纸和规范要求执行。",
    "混凝土浇筑前应检查模板支撑体系的稳定性,确保钢筋绑扎间距和保护层厚度满足设计要求。",
    "预应力张拉施工应按照设计张拉顺序进行,控制张拉力和伸长量在允许偏差范围内。",
    "基坑开挖过程中应加强监测,及时掌握围护结构变形和周边建筑物沉降情况。",
    "路基填筑应分层压实,每层压实厚度不宜超过30cm,压实度应满足设计和规范要求。",
    "隧道施工应遵循短开挖、强支护、早封闭、勤量测的原则,确保施工安全。",
    "钢结构焊接应由持证焊工操作,焊缝质量应符合设计要求和相关标准规定。",
    "施工测量放线应采用全站仪或GPS定位,确保平面位置和高程精度满足规范要求。",
    "安全生产管理应建立健全责任制,定期开展安全教育培训和隐患排查治理工作。",
]

# Padding texts already generated, keyed by requested token count, so that
# concurrent requests do not rebuild the same string.
_CONTEXT_CACHE: Dict[int, str] = {}

# function_name values whose config lookup failure has already been reported,
# so the warning is printed once instead of once per request.
_CONFIG_WARNED: set = set()


def _generate_context_text(target_tokens: int) -> str:
    """Build padding text sized for roughly ``target_tokens`` tokens.

    The text is budgeted at 2.2 characters per requested token (a deliberate
    overshoot so the real token count is not below the target) and is made by
    cycling through ``_PADDING_SENTENCES`` rather than random characters.
    Results are memoised in ``_CONTEXT_CACHE``.

    Args:
        target_tokens: Requested context size in tokens.

    Returns:
        The padding text, truncated to exactly ``int(target_tokens * 2.2)``
        characters (empty when that budget is not positive).
    """
    cached = _CONTEXT_CACHE.get(target_tokens)
    if cached is not None:
        return cached

    target_chars = int(target_tokens * 2.2)
    parts = []
    collected_chars = 0
    idx = 0
    # Track the running length instead of re-joining on every iteration.
    while collected_chars < target_chars:
        sentence = _PADDING_SENTENCES[idx % len(_PADDING_SENTENCES)]
        parts.append(sentence)
        collected_chars += len(sentence)
        idx += 1

    # The newline separators only make the text longer, so truncation is
    # always possible.
    text = "\n".join(parts)[:target_chars]
    _CONTEXT_CACHE[target_tokens] = text
    return text


CONTEXT_SIZE_PRESETS = {
    "1k": 1024,
    "2k": 2048,
    "4k": 4096,
    "8k": 8192,
    "16k": 16384,
}


def _format_context_size(tokens: int) -> str:
    """Render a token count as ``"<N>k"`` when it is a multiple of 1024.

    Other values are rendered as the plain number so that e.g. 512 is not
    shown as ``"0k"``.
    """
    if tokens % 1024 == 0:
        return f"{tokens//1024}k"
    return str(tokens)


# ============================================================
# Result data structures
# ============================================================

@dataclass
class RequestResult:
    """Outcome of a single model request."""

    success: bool
    latency_ms: float
    error: Optional[str] = None
    completion_tokens: int = 0  # tokens produced by the model
    prompt_tokens: int = 0      # tokens sent to the model


@dataclass
class StressTestResult:
    """Aggregated outcome of one stress-test run."""

    model_name: str
    model_type: str  # "llm" or "embedding"
    concurrency: int
    total_requests: int
    context_size_tokens: int = 0  # 0 means no padding was requested
    success_count: int = 0
    fail_count: int = 0
    latencies_ms: List[float] = field(default_factory=list)
    completion_tokens_list: List[int] = field(default_factory=list)
    prompt_tokens_list: List[int] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    total_time_s: float = 0.0


# ============================================================
# Request executors
# ============================================================

def _read_count(container, key: str) -> int:
    """Read an integer counter from a dict or an attribute-style object.

    Missing, ``None`` or non-numeric values yield 0.
    """
    if container is None:
        return 0
    if isinstance(container, dict):
        value = container.get(key)
    else:
        value = getattr(container, key, None)
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _extract_token_usage(response) -> Tuple[int, int]:
    """Extract token usage from a model response.

    Two locations are tried in order:
    1. ``response.usage_metadata`` with ``input_tokens`` / ``output_tokens``;
    2. ``response.response_metadata["token_usage"]`` with ``prompt_tokens`` /
       ``completion_tokens`` (used when the first yields nothing).

    Both dict-style and attribute-style containers are accepted.

    Returns:
        ``(prompt_tokens, completion_tokens)``; ``(0, 0)`` when nothing could
        be extracted.
    """
    meta = getattr(response, "usage_metadata", None)
    prompt_tokens = _read_count(meta, "input_tokens")
    completion_tokens = _read_count(meta, "output_tokens")

    if prompt_tokens == 0 and completion_tokens == 0:
        response_meta = getattr(response, "response_metadata", None) or {}
        token_usage = _read_count.__self__ if False else None  # placeholder never used
        if isinstance(response_meta, dict):
            token_usage = response_meta.get("token_usage")
        else:
            token_usage = getattr(response_meta, "token_usage", None)
        prompt_tokens = _read_count(token_usage, "prompt_tokens")
        completion_tokens = _read_count(token_usage, "completion_tokens")

    return prompt_tokens, completion_tokens


async def _run_llm_request(trace_id: str, model_name: Optional[str] = None,
                           function_name: Optional[str] = None,
                           context_size: int = 0,
                           bust_cache: bool = False) -> RequestResult:
    """Run one LLM call and record its latency and token usage.

    Args:
        trace_id: Identifier of this request (currently not forwarded).
        model_name: Model to call; overridden by the model configured for
            ``function_name`` when that lookup succeeds.
        function_name: Optional function whose model / thinking-mode
            configuration should be used.
        context_size: When > 0, padding text for that many tokens is placed
            before the user question.
        bust_cache: When true, a random ``[noise:...]`` marker is put at the
            very start of the user prompt.

    Returns:
        A ``RequestResult``; failures of the model call are captured in the
        result instead of being raised.
    """
    from foundation.ai.models.model_handler import model_handler
    from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
    from langchain_core.messages import SystemMessage, HumanMessage

    # Resolve the model name and thinking-mode setting actually used.
    resolved_model = model_name
    enable_thinking = False
    if function_name:
        try:
            cfg_model = get_model_for_function(function_name)
            cfg_thinking = get_thinking_mode_for_function(function_name)
            if cfg_model:
                resolved_model = cfg_model
            if cfg_thinking is not None:
                enable_thinking = cfg_thinking
        except Exception as exc:
            # Best-effort lookup: keep going with the fallback model, but say
            # so once, otherwise the report names a function while a
            # different model is being measured.
            if function_name not in _CONFIG_WARNED:
                _CONFIG_WARNED.add(function_name)
                print(f"\n [警告] 读取 function '{function_name}' 的模型配置失败,使用回退模型: {exc}")

    if not resolved_model:
        resolved_model = "shutian_qwen3_5_35b"

    user_prompt = TEST_USER_PROMPT
    if context_size > 0:
        padding = _generate_context_text(context_size)
        user_prompt = f"{padding}\n\n---\n\n{TEST_USER_PROMPT}"
    if bust_cache:
        rand = uuid.uuid4().hex[:12]
        user_prompt = f"[noise:{rand}]\n{user_prompt}"

    messages = [SystemMessage(content=TEST_SYSTEM_PROMPT), HumanMessage(content=user_prompt)]

    start = time.perf_counter()
    try:
        model = model_handler.get_model_by_name(resolved_model)

        # Bind the thinking-mode switch for models whose name marks them as
        # Qwen3.5.
        lowered = resolved_model.lower()
        is_qwen35 = "qwen3.5" in lowered or "qwen3_5" in lowered
        if is_qwen35:
            model = model.bind(extra_body={"chat_template_kwargs": {"enable_thinking": enable_thinking}})

        response = await model.ainvoke(messages)
        latency = (time.perf_counter() - start) * 1000

        if not response or not response.content:
            return RequestResult(success=False, latency_ms=latency, error="空响应")

        prompt_tokens, completion_tokens = _extract_token_usage(response)
        return RequestResult(
            success=True,
            latency_ms=latency,
            completion_tokens=completion_tokens,
            prompt_tokens=prompt_tokens,
        )
    except Exception as e:
        latency = (time.perf_counter() - start) * 1000
        return RequestResult(success=False, latency_ms=latency, error=str(e)[:200])


async def _run_embedding_request(trace_id: str, model_name: str) -> RequestResult:
    """Run one embedding call and record its latency.

    NOTE(review): ``model_name`` is not used to pick the embedding model; the
    call always goes through ``model_handler.get_embedding_model()``, so
    ``--all-embeddings`` presumably measures the same model for every name.
    Confirm whether the handler offers a by-name lookup.
    """
    from foundation.ai.models.model_handler import model_handler

    start = time.perf_counter()
    try:
        embed_model = model_handler.get_embedding_model()
        result = await embed_model.aembed_query(TEST_EMBED_TEXT)
        latency = (time.perf_counter() - start) * 1000

        if not result:
            return RequestResult(success=False, latency_ms=latency, error="空响应")
        return RequestResult(success=True, latency_ms=latency)
    except Exception as e:
        latency = (time.perf_counter() - start) * 1000
        return RequestResult(success=False, latency_ms=latency, error=str(e)[:200])


async def run_stress_test(
    model_name: str,
    model_type: str,
    concurrency: int,
    total_count: int,
    function_name: Optional[str] = None,
    context_size: int = 0,
    bust_cache: bool = False,
) -> StressTestResult:
    """Run a stress test against one model.

    All requests are scheduled up front and throttled by a semaphore, so the
    number of in-flight requests stays at ``concurrency`` until the work runs
    out; results are recorded in completion order.

    Args:
        model_name: Model name.
        model_type: ``"embedding"`` selects the embedding request; any other
            value selects the LLM request.
        concurrency: Maximum number of in-flight requests (>= 1).
        total_count: Total number of requests (>= 1).
        function_name: Optional function name (only used for LLM requests).
        context_size: Context size in tokens (0 = no padding).
        bust_cache: Forwarded to the LLM request to defeat prompt caching.

    Returns:
        The aggregated ``StressTestResult``.

    Raises:
        ValueError: If ``concurrency`` or ``total_count`` is below 1.
    """
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")
    if total_count < 1:
        raise ValueError(f"total_count must be >= 1, got {total_count}")

    display_name = function_name or model_name
    result = StressTestResult(
        model_name=display_name,
        model_type=model_type,
        concurrency=concurrency,
        total_requests=total_count,
        context_size_tokens=context_size,
    )

    semaphore = asyncio.Semaphore(concurrency)

    async def _task(idx: int):
        async with semaphore:
            trace_id = f"stress_{model_name}_{idx}"
            if model_type == "embedding":
                return await _run_embedding_request(trace_id, model_name)
            return await _run_llm_request(trace_id, model_name, function_name, context_size, bust_cache)

    ctx_label = f" | 上下文: {_format_context_size(context_size)} tokens" if context_size > 0 else ""
    print(f"\n{'='*60}")
    print(f" 模型压力测试: {display_name} ({model_type.upper()})")
    print(f" 并发数: {concurrency} | 总请求: {total_count}{ctx_label}")
    print(f"{'='*60}")

    wall_start = time.perf_counter()

    pending = [asyncio.ensure_future(_task(i)) for i in range(total_count)]
    finished = 0
    for next_done in asyncio.as_completed(pending):
        try:
            outcome = await next_done
        except Exception as exc:
            # The request helpers catch model errors themselves; what lands
            # here is e.g. a failing import. Latency is unknown, so 0 is
            # stored and later excluded from the latency statistics.
            result.fail_count += 1
            result.errors.append(str(exc)[:200])
            result.latencies_ms.append(0)
        else:
            if outcome.success:
                result.success_count += 1
                result.completion_tokens_list.append(outcome.completion_tokens)
                result.prompt_tokens_list.append(outcome.prompt_tokens)
            else:
                result.fail_count += 1
                if outcome.error:
                    result.errors.append(outcome.error)
            result.latencies_ms.append(outcome.latency_ms)

        finished += 1
        pct = finished / total_count * 100
        print(f" 进度: {finished}/{total_count} ({pct:.0f}%)", end="\r")

    result.total_time_s = time.perf_counter() - wall_start
    print()
    return result


# ============================================================
# Reporting
# ============================================================

def print_report(result: StressTestResult):
    """Print the report for one run and return it as a summary dict.

    Latency statistics only consider positive latencies (entries stored as 0
    mark requests whose latency is unknown). Throughput and tokens/s are
    computed against the wall-clock duration of the whole run.

    Returns:
        A dict with the keys ``model``, ``type``, ``concurrency``,
        ``context_tokens``, ``total``, ``success``, ``fail``, ``error_rate``,
        ``total_time_s``, ``throughput_rps``, ``latency_avg_ms``,
        ``latency_p95_ms``, ``completion_tokens``, ``prompt_tokens`` and
        ``tokens_per_sec``.
    """
    all_latencies = [lat for lat in result.latencies_ms if lat > 0]
    sorted_lat = sorted(all_latencies)
    last_idx = len(sorted_lat) - 1

    # A run with zero requests has no failures to report a rate for.
    if result.total_requests:
        error_rate = result.fail_count / result.total_requests * 100
    else:
        error_rate = 0.0

    ctx_label = f"{_format_context_size(result.context_size_tokens)} tokens" if result.context_size_tokens > 0 else "默认"

    print(f"\n{'─'*60}")
    print(f" 测试报告: {result.model_name} ({result.model_type.upper()})")
    print(f"{'─'*60}")
    print(f" 并发数: {result.concurrency}")
    print(f" 上下文长度: {ctx_label}")
    print(f" 总请求: {result.total_requests}")
    print(f" 成功: {result.success_count}")
    print(f" 失败: {result.fail_count}")
    print(f" 错误率: {error_rate:.1f}%")
    print(f" 总耗时: {result.total_time_s:.2f}s")

    if result.total_time_s > 0:
        throughput = result.success_count / result.total_time_s
        print(f" 吞吐量: {throughput:.2f} req/s")

    # LLM token statistics
    total_completion_tokens = sum(result.completion_tokens_list)
    total_prompt_tokens = sum(result.prompt_tokens_list)
    has_token_data = total_completion_tokens > 0 or total_prompt_tokens > 0

    if has_token_data and result.total_time_s > 0:
        tokens_per_sec = total_completion_tokens / result.total_time_s
        avg_completion = total_completion_tokens / len(result.completion_tokens_list) if result.completion_tokens_list else 0
        avg_prompt = total_prompt_tokens / len(result.prompt_tokens_list) if result.prompt_tokens_list else 0
        print(f"\n Token 统计:")
        print(f" 总输入 token: {total_prompt_tokens}")
        print(f" 总输出 token: {total_completion_tokens}")
        print(f" 平均输入/请求: {avg_prompt:.0f}")
        print(f" 平均输出/请求: {avg_completion:.0f}")
        print(f" 输出 tokens/s: {tokens_per_sec:.1f}")
    elif has_token_data:
        print(f"\n Token 统计: (总耗时为0,无法计算吞吐)")

    if sorted_lat:
        p50 = sorted_lat[int(len(sorted_lat) * 0.5)]
        p95 = sorted_lat[min(int(len(sorted_lat) * 0.95), last_idx)]
        p99 = sorted_lat[min(int(len(sorted_lat) * 0.99), last_idx)]
        print(f"\n 延迟统计 (ms):")
        print(f" 最小值: {sorted_lat[0]:.0f}")
        print(f" 最大值: {sorted_lat[-1]:.0f}")
        print(f" 平均值: {statistics.mean(all_latencies):.0f}")
        print(f" P50: {p50:.0f}")
        print(f" P95: {p95:.0f}")
        print(f" P99: {p99:.0f}")
        if len(all_latencies) > 1:
            print(f" 标准差: {statistics.stdev(all_latencies):.0f}")

    if result.errors:
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # sample is stable between runs.
        unique_errors = list(dict.fromkeys(result.errors))[:5]
        print(f"\n 错误样本 (最多5条):")
        for err in unique_errors:
            print(f" - {err}")

    print(f"{'─'*60}")

    if result.total_time_s > 0 and total_completion_tokens > 0:
        summary_tokens_per_sec = round(total_completion_tokens / result.total_time_s, 1)
    else:
        summary_tokens_per_sec = 0

    return {
        "model": result.model_name,
        "type": result.model_type,
        "concurrency": result.concurrency,
        "context_tokens": result.context_size_tokens,
        "total": result.total_requests,
        "success": result.success_count,
        "fail": result.fail_count,
        "error_rate": f"{error_rate:.1f}%",
        "total_time_s": round(result.total_time_s, 2),
        "throughput_rps": round(result.success_count / result.total_time_s, 2) if result.total_time_s > 0 else 0,
        "latency_avg_ms": round(statistics.mean(all_latencies), 0) if all_latencies else 0,
        "latency_p95_ms": round(sorted_lat[min(int(len(sorted_lat) * 0.95), last_idx)], 0) if sorted_lat else 0,
        "completion_tokens": total_completion_tokens,
        "prompt_tokens": total_prompt_tokens,
        "tokens_per_sec": summary_tokens_per_sec,
    }


def _print_summary_table(title: str, label_header: str, label_key: str,
                         label_width: int, rows: List[dict]) -> None:
    """Print a comparison table with one row per summary dict.

    Args:
        title: Heading printed above the table.
        label_header: Header text of the first column.
        label_key: Key of each row dict shown in the first column.
        label_width: Width of the first column.
        rows: Summary dicts from ``print_report``, or error placeholders that
            carry an ``"error"`` key and are shown as SKIP rows.
    """
    print(f"\n\n{'='*90}")
    print(f" {title}")
    print(f"{'='*90}")
    print(f" {label_header:<{label_width}} {'成功':>6} {'失败':>6} {'错误率':>8} {'吞吐量':>10} {'延迟avg':>10} {'P95':>10} {'tok/s':>8}")
    print(f" {'─'*label_width} {'─'*6} {'─'*6} {'─'*8} {'─'*10} {'─'*10} {'─'*10} {'─'*8}")
    for s in rows:
        label = s[label_key]
        if "error" in s:
            print(f" {label:<{label_width}} {'SKIP':>6} - {s['error'][:40]}")
            continue
        tps = s.get('tokens_per_sec', 0)
        tps_str = f"{tps:.1f}" if tps > 0 else "n/a"
        print(f" {label:<{label_width}} {s['success']:>6} {s['fail']:>6} {s['error_rate']:>8}"
              f" {s['throughput_rps']:>8.1f}/s {s['latency_avg_ms']:>8.0f}ms {s['latency_p95_ms']:>8.0f}ms {tps_str:>8}")
    print(f"{'='*90}")


# ============================================================
# Entry point
# ============================================================

def _positive_int(raw: str) -> int:
    """argparse ``type=`` callable accepting only integers >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected a positive integer, got {raw!r}")
    if value < 1:
        raise argparse.ArgumentTypeError(f"expected a positive integer, got {raw!r}")
    return value


def parse_args():
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="模型压力测试 - 测试 LLM / Embedding 模型的并发能力和延迟",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 默认测试(蜀天35B, 10并发, 50次)
  python utils_test/Model_Test/test_model_stress.py

  # 测试蜀天122B, 20并发, 100次
  python utils_test/Model_Test/test_model_stress.py --model shutian_qwen3_5_122b -c 20 -n 100

  # 测试 4k 上下文长度
  python utils_test/Model_Test/test_model_stress.py --context-size 4k

  # 自动遍历 1k/2k/4k/8k 上下文长度,输出对比
  python utils_test/Model_Test/test_model_stress.py --context-size all

  # 测试 Embedding
  python utils_test/Model_Test/test_model_stress.py --type embedding -c 20 -n 200

  # 使用 function_name
  python utils_test/Model_Test/test_model_stress.py --function completeness_review_generate

  # 测试所有 LLM 模型
  python utils_test/Model_Test/test_model_stress.py --all-models -c 5 -n 10
""",
    )
    parser.add_argument(
        "--type", choices=["llm", "embedding"], default="llm",
        help="模型类型: llm 或 embedding (默认: llm)",
    )
    parser.add_argument(
        "--model", "-m", type=str, default=None,
        help="模型名称,如 shutian_qwen3_5_35b (默认: shutian_qwen3_5_35b)",
    )
    parser.add_argument(
        "--function", "-f", type=str, default=None,
        help="功能名称(从 model_setting.yaml 加载模型配置),如 completeness_review_generate",
    )
    parser.add_argument(
        "--concurrency", "-c", type=_positive_int, default=10,
        help="并发数 (默认: 10)",
    )
    parser.add_argument(
        "--count", "-n", type=_positive_int, default=50,
        help="总请求次数 (默认: 50)",
    )
    parser.add_argument(
        "--context-size", "-ctx", type=str, default=None,
        help="上下文长度: 1k / 2k / 4k / 8k / 16k / all(逐个测试)/ 数字如 2048 (默认: 不填充)",
    )
    parser.add_argument(
        "--all-models", action="store_true",
        help="逐个测试所有可用模型(使用 -c 和 -n 指定并发和次数)",
    )
    parser.add_argument(
        "--all-embeddings", action="store_true",
        help="逐个测试所有 Embedding 模型",
    )
    parser.add_argument(
        "--bust-cache", action="store_true",
        help="在每次请求的 prompt 末尾注入随机值,避免服务端 KV 缓存命中",
    )
    return parser.parse_args()


def _parse_context_size(raw: Optional[str]) -> List[int]:
    """Parse the ``--context-size`` value into a list of token counts.

    Accepted forms: ``None`` (no padding, returns ``[0]``), ``"all"`` (every
    preset), a preset name such as ``"4k"``, any ``"<N>k"`` (N * 1024) or a
    plain non-negative integer. Anything else prints an error and exits with
    status 1.
    """
    if raw is None:
        return [0]  # 0 = no padding

    raw = raw.strip().lower()
    if raw == "all":
        return list(CONTEXT_SIZE_PRESETS.values())
    if raw in CONTEXT_SIZE_PRESETS:
        return [CONTEXT_SIZE_PRESETS[raw]]

    # "<N>k" beyond the presets, or a plain number.
    digits, multiplier = (raw[:-1], 1024) if raw.endswith("k") else (raw, 1)
    try:
        tokens = int(digits) * multiplier
    except ValueError:
        tokens = -1
    if tokens < 0:
        print(f" [错误] 不支持的 --context-size 值: {raw},可选: 1k/2k/4k/8k/all/数字")
        sys.exit(1)
    return [tokens]


async def _run_single_model_test(args, model_name: str, function_name: Optional[str],
                                 context_sizes: List[int]) -> List[dict]:
    """Stress-test one model once per requested context size.

    A run that raises is recorded as an error placeholder (with an
    ``"error"`` key) instead of aborting the remaining sizes.

    Returns:
        One summary dict per context size, each carrying a
        ``"context_display"`` label.
    """
    results_summary = []
    for ctx_size in context_sizes:
        ctx_display = _format_context_size(ctx_size) if ctx_size > 0 else "默认"
        try:
            result = await run_stress_test(
                model_name=model_name or "via_function",
                model_type=args.type,
                concurrency=args.concurrency,
                total_count=args.count,
                function_name=function_name,
                context_size=ctx_size,
                bust_cache=args.bust_cache,
            )
            summary = print_report(result)
            summary["context_display"] = ctx_display
            results_summary.append(summary)
        except Exception as e:
            print(f"\n [跳过] {ctx_display}: {e}")
            results_summary.append({
                "model": function_name or model_name,
                "type": args.type,
                "context_display": ctx_display,
                "error": str(e)[:200],
            })
    return results_summary


async def main():
    """Dispatch to the all-models sweep or the single-model test."""
    args = parse_args()
    context_sizes = _parse_context_size(args.context_size)

    if args.all_models or args.all_embeddings:
        # Test every model in turn. --context-size is honoured for LLMs; the
        # embedding request does not use padding, so it runs once per model.
        if args.all_models:
            models, model_type, sweep = LLM_MODELS, "llm", context_sizes
        else:
            models, model_type, sweep = EMBEDDING_MODELS, "embedding", [0]

        results_summary = []
        for model_name in models:
            for ctx_size in sweep:
                # Distinguish rows only when a context size is in play, so
                # the default table still shows the bare model name.
                label = f"{model_name}@{_format_context_size(ctx_size)}" if ctx_size > 0 else model_name
                try:
                    result = await run_stress_test(
                        model_name=model_name,
                        model_type=model_type,
                        concurrency=args.concurrency,
                        total_count=args.count,
                        context_size=ctx_size,
                        bust_cache=args.bust_cache,
                    )
                    summary = print_report(result)
                    summary["model"] = label
                    results_summary.append(summary)
                except Exception as e:
                    print(f"\n [跳过] {label}: {e}")
                    results_summary.append({
                        "model": label,
                        "type": model_type,
                        "error": str(e)[:200],
                    })

        _print_summary_table("汇总对比", "模型", "model", 30, results_summary)
    else:
        # Single model (possibly across several context sizes).
        model_name = args.model
        if not model_name and not args.function:
            if args.type == "embedding":
                model_name = "shutian_qwen3_embed"
            else:
                model_name = "shutian_qwen3_5_35b"

        results_summary = await _run_single_model_test(
            args, model_name, args.function, context_sizes,
        )

        # Print the comparison only when more than one size was tested.
        if len(context_sizes) > 1:
            _print_summary_table(
                f"上下文长度对比: {args.function or model_name}",
                "上下文", "context_display", 10, results_summary,
            )


if __name__ == "__main__":
    asyncio.run(main())