#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Full-chunk wording and grammar review scan.

Feeds every chunk of a stored review result through ``GrammarCheckReviewer``
and saves all raw responses so they can be analysed by hand.
"""
import asyncio
import json
import os
import sys
import time

_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# The part of the script directory that precedes the first occurrence of
# "utils_test"; presumably the project root, since ``core.*`` is imported
# from it below.
_PROJECT_ROOT = _SCRIPT_DIR.split('utils_test')[0]
sys.path.insert(0, _PROJECT_ROOT)

RESULT_JSON = os.path.join(
    _PROJECT_ROOT,
    "temp", "construction_review", "final_result",
    "67d45692fb97aeef8f896e78475ce539-1779781589.json",
)
OUTPUT_DIR = os.path.join(_SCRIPT_DIR, "full_scan_results")

# A response counts as "no issue" when it contains this marker and is short.
_NO_ISSUE_MARKER = '无明显问题'
_NO_ISSUE_MAX_LEN = 50

# Characters that must not end up in a file name (path separators and the
# characters reserved on Windows).
_UNSAFE_FILENAME_CHARS = frozenset('<>:"/\\|?*')


def _is_no_issue(response_text):
    """Return True when *response_text* is a short "no obvious problem" reply."""
    return (
        _NO_ISSUE_MARKER in response_text
        and len(response_text) < _NO_ISSUE_MAX_LEN
    )


def _classify(record):
    """Return ``"ERROR"``, ``"NO_ISSUE"`` or ``"ISSUES"`` for one record.

    Failed records are classified first, so an empty response produced by a
    failure is never reported as "has issues".
    """
    if not record["success"]:
        return "ERROR"
    if _is_no_issue(record["raw_response"]):
        return "NO_ISSUE"
    return "ISSUES"


def _count_statuses(all_results):
    """Return ``(no_issue, issues, errors)`` counts; they sum to the total."""
    statuses = [_classify(record) for record in all_results]
    return (
        statuses.count("NO_ISSUE"),
        statuses.count("ISSUES"),
        statuses.count("ERROR"),
    )


def _safe_filename_part(value):
    """Return ``str(value)`` with path-unsafe characters replaced by ``_``."""
    return "".join(
        "_" if ch in _UNSAFE_FILENAME_CHARS or ord(ch) < 32 else ch
        for ch in str(value)
    )


async def _review_chunk(reviewer, index, total, chunk):
    """Run the grammar check on one chunk and return its result record.

    Args:
        reviewer: Object exposing an awaitable ``check_grammar(...)``.
        index: Position of the chunk in the chunk list.
        total: Number of chunks, used only for the progress line.
        chunk: Mapping with ``content`` and, optionally, ``section_label``
            and ``chapter_classification``.

    Returns:
        A JSON-serialisable dict describing the outcome for this chunk.
    """
    content = chunk['content']
    section = chunk.get('section_label', f'chunk_{index}')
    chapter = chunk.get('chapter_classification', 'unknown')
    trace_id = f"full_scan_{index}_{int(time.time())}"

    print(f"[{index:02d}/{total}] {chapter}/{section[:40]}... (len={len(content)})")

    started = time.perf_counter()
    try:
        result = await reviewer.check_grammar(
            trace_id=trace_id,
            review_content=content,
            state=None,
            stage_name=None,
            enable_thinking=False,
        )
        # Normalise missing details / a None response to an empty string so
        # the length and substring checks below cannot raise.
        response_text = (result.details or {}).get('response') or ''
        success = result.success
        error = result.error_message
    except Exception as exc:
        # Scan-level boundary: a failing chunk is recorded and the scan
        # continues with the next one.
        response_text = ""
        success = False
        error = str(exc)
        print(f" ERROR: {exc}")
    wall_time = time.perf_counter() - started

    record = {
        "chunk_index": index,
        "chapter": chapter,
        "section": section,
        "content_length": len(content),
        "content_preview": content[:200],
        "success": success,
        "error": error,
        "wall_time": round(wall_time, 2),
        "response_length": len(response_text),
        "raw_response": response_text,
    }

    status = _classify(record)
    if status == "ISSUES":
        status = f"ISSUES(response_len={len(response_text)})"
    print(f" {wall_time:.2f}s | {status}")
    return record


def _save_results(all_results):
    """Write the combined summary file and one JSON file per chunk."""
    summary_path = os.path.join(OUTPUT_DIR, "all_results.json")
    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    print(f"\nSaved {len(all_results)} results to {summary_path}")

    # One file per chunk makes it easy to read the responses one by one.
    for record in all_results:
        idx = record["chunk_index"]
        chapter = _safe_filename_part(record['chapter'])
        chunk_path = os.path.join(OUTPUT_DIR, f"chunk_{idx:02d}_{chapter}.json")
        with open(chunk_path, 'w', encoding='utf-8') as f:
            json.dump(record, f, ensure_ascii=False, indent=2)
    print(f"Saved individual files to {OUTPUT_DIR}/")


async def main():
    """Scan every chunk in ``RESULT_JSON`` and store the raw responses.

    Chunks are reviewed sequentially. Results are written to ``OUTPUT_DIR``
    after the last chunk has been processed, followed by a one-line summary.
    """
    # Imported here rather than at module top, i.e. after sys.path has been
    # extended above.
    from core.construction_review.component.reviewers.grammar_check_reviewer import GrammarCheckReviewer

    with open(RESULT_JSON, 'r', encoding='utf-8') as f:
        data = json.load(f)
    chunks = data['document_result']['structured_content']['chunks']

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    reviewer = GrammarCheckReviewer()

    all_results = []
    for i, chunk in enumerate(chunks):
        all_results.append(await _review_chunk(reviewer, i, len(chunks), chunk))

    _save_results(all_results)

    no_issue_count, issue_count, error_count = _count_statuses(all_results)
    print(f"\nStats: {no_issue_count} no-issue, {issue_count} has-issues, {error_count} errors")


if __name__ == "__main__":
    asyncio.run(main())