| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Full-chunk wording/grammar review scan; keeps every raw response for manual analysis."""
import sys
import os
import json
import asyncio
import time

# Directory that contains this script.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Part of the script directory that precedes the first 'utils_test' occurrence
# (the whole directory path when 'utils_test' does not appear in it).
_PROJECT_ROOT = _SCRIPT_DIR.split('utils_test')[0]

# Put that root first on the import path so the project-local import done
# later by the script is looked up there.
sys.path.insert(0, _PROJECT_ROOT)

# Input: a previously produced review result whose chunks are re-scanned.
RESULT_JSON = os.path.join(
    _PROJECT_ROOT,
    "temp",
    "construction_review",
    "final_result",
    "67d45692fb97aeef8f896e78475ce539-1779781589.json",
)

# Output: directory (next to this script) that receives the scan results.
OUTPUT_DIR = os.path.join(_SCRIPT_DIR, "full_scan_results")
def classify_response(success, response_text):
    """Return "ERROR", "NO_ISSUE" or "ISSUES" for one reviewer outcome.

    Args:
        success: Truthy when the reviewer call completed successfully.
        response_text: Raw response text returned by the reviewer.

    Returns:
        "ERROR" when the call did not succeed; "NO_ISSUE" when the response
        is short (fewer than 50 characters) and contains the "no obvious
        problem" marker; "ISSUES" otherwise.
    """
    if not success:
        return "ERROR"
    if '无明显问题' in response_text and len(response_text) < 50:
        return "NO_ISSUE"
    return "ISSUES"


def safe_filename_part(value):
    """Return *value* as text with path separators and other unsafe characters replaced by '_'."""
    unsafe = '\\/:*?"<>|'
    return "".join(
        "_" if ch in unsafe or ord(ch) < 32 else ch
        for ch in str(value)
    )


async def main():
    """Run the grammar reviewer over every chunk and persist all raw responses.

    Reads the chunk list from ``RESULT_JSON``, calls
    ``GrammarCheckReviewer.check_grammar`` once per chunk (sequentially), and
    writes into ``OUTPUT_DIR``:

    * one ``chunk_<index>_<chapter>.json`` file per chunk, written as soon as
      that chunk has been processed so an interrupted run keeps its results;
    * ``all_results.json`` with the full list of records, written at the end.

    A reviewer exception is recorded on the chunk's record (``success`` is
    False, ``error`` holds the message) and the scan continues. The final
    statistics split the chunks into three disjoint groups: no-issue,
    has-issues and errors.

    Raises:
        OSError: If the input file cannot be read or an output file cannot
            be written.
        KeyError: If the input JSON lacks the expected structure or a chunk
            has no ``content`` key.
    """
    # Project import is kept local to main(), as in the original script.
    from core.construction_review.component.reviewers.grammar_check_reviewer import GrammarCheckReviewer

    with open(RESULT_JSON, 'r', encoding='utf-8') as f:
        data = json.load(f)
    chunks = data['document_result']['structured_content']['chunks']

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    reviewer = GrammarCheckReviewer()
    all_results = []
    counts = {"NO_ISSUE": 0, "ISSUES": 0, "ERROR": 0}

    for i, chunk in enumerate(chunks):
        content = chunk['content']
        section = chunk.get('section_label', f'chunk_{i}')
        chapter = chunk.get('chapter_classification', 'unknown')
        trace_id = f"full_scan_{i}_{int(time.time())}"
        # str() guards the slice against a non-string section label.
        print(f"[{i:02d}/{len(chunks)}] {chapter}/{str(section)[:40]}... (len={len(content)})")

        start = time.time()
        try:
            result = await reviewer.check_grammar(
                trace_id=trace_id,
                review_content=content,
                state=None, stage_name=None,
                enable_thinking=False,
            )
            wall_time = time.time() - start
            raw = (result.details or {}).get('response')
            # Normalise to text so len() and the substring test below cannot
            # fail on a missing or non-string response.
            if raw is None:
                response_text = ""
            elif isinstance(raw, str):
                response_text = raw
            else:
                response_text = str(raw)
            success = result.success
            error = result.error_message
        except Exception as e:
            # Best-effort scan: record the failure and move on to the next chunk.
            wall_time = time.time() - start
            response_text = ""
            success = False
            error = str(e)
            print(f" ERROR: {e}")

        record = {
            "chunk_index": i,
            "chapter": chapter,
            "section": section,
            "content_length": len(content),
            "content_preview": content[:200],
            "success": success,
            "error": error,
            "wall_time": round(wall_time, 2),
            "response_length": len(response_text),
            "raw_response": response_text,
        }
        all_results.append(record)

        # Save this chunk's own file right away (convenient for reading one
        # record at a time, and it survives an interrupted run).
        chunk_path = os.path.join(
            OUTPUT_DIR, f"chunk_{i:02d}_{safe_filename_part(chapter)}.json"
        )
        with open(chunk_path, 'w', encoding='utf-8') as f:
            json.dump(record, f, ensure_ascii=False, indent=2)

        outcome = classify_response(success, response_text)
        counts[outcome] += 1
        if outcome == "ISSUES":
            status = f"ISSUES(response_len={len(response_text)})"
        else:
            status = outcome
        print(f" {wall_time:.2f}s | {status}")

    # Save the summary of all records.
    summary_path = os.path.join(OUTPUT_DIR, "all_results.json")
    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    print(f"\nSaved {len(all_results)} results to {summary_path}")
    print(f"Saved individual files to {OUTPUT_DIR}/")

    # Print statistics; the three groups are disjoint and sum to the total.
    print(
        f"\nStats: {counts['NO_ISSUE']} no-issue, "
        f"{counts['ISSUES']} has-issues, {counts['ERROR']} errors"
    )
# Run the scan only when this file is executed as a script, so importing the
# module does not start it as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
|