""" 分析最新审查结果中词句语法审查的质量 检查项: 1. "将A改为A" 模式(修正前后相同) 2. suggestion/reason 中的自我辩论(犹豫措辞) 3. risk_level 为空 4. 技术操作规程越界审查 5. 重复问题 6. JSON 解析失败 7. suggestion 过长(>200字,可能包含推理过程) """ import json import re import sys import os project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, project_root) def extract_correction_pairs(suggestion: str): """从 suggestion 中提取所有 '将X改为Y' 的 (X, Y) 对""" quote_chars = r"""['""''「」]""" pattern = rf"将{quote_chars}(.*?){quote_chars}\s*改为\s*{quote_chars}(.*?){quote_chars}" return re.findall(pattern, suggestion) def check_hesitation_words(text: str): """检查文本中是否包含犹豫措辞""" hesitation_words = [ '可能', '暂定', '不确定', '重新审视', '然而', '不过', '似乎', '但是', '其实', '实际上', '再细看', '再想想', '仔细想想', '反过来', '另一方面', '换个角度' ] found = [w for w in hesitation_words if w in text] return found def is_technical_procedure(issue_point: str, reason: str): """检查是否为技术操作规程越界审查""" technical_keywords = [ '操作步骤', '工艺参数', '施工顺序', '操作规程', '技术规范', '施工方案', '工艺流程' ] combined = issue_point + reason return [kw for kw in technical_keywords if kw in combined] def analyze_grammar_check_results(result_file: str): """分析词句语法审查结果质量""" with open(result_file, encoding='utf-8') as f: data = json.load(f) issues = data.get('issues', []) grammar_items = [] for issue_wrapper in issues: for issue_id, issue_detail in issue_wrapper.items(): review_lists = issue_detail.get('review_lists', []) metadata = issue_detail.get('metadata', {}) for item in review_lists: check_item = item.get('check_item', '') if check_item in ['sensitive_word_check', 'grammar_check']: grammar_items.append({ 'item': item, 'issue_id': issue_id, 'location_label': metadata.get('review_location_label', '') }) print(f"Total grammar_check items: {len(grammar_items)}") print() # Quality checks a_to_a_issues = [] hesitation_issues = [] empty_risk_issues = [] technical_issues = [] duplicate_issues = [] parse_failures = [] long_suggestion_issues = [] seen_corrections = {} for i, entry in enumerate(grammar_items): item = entry['item'] check_result = item.get('check_result', {}) # STRING format = parse failure if isinstance(check_result, str): parse_failures.append({ 'index': i + 1, 'raw': check_result[:200], 'location_label': entry['location_label'] }) continue issue_point = check_result.get('issue_point', '') location = check_result.get('location', '') suggestion = check_result.get('suggestion', '') reason = check_result.get('reason', '') risk_level = check_result.get('risk_level', '') # Check 1: Empty risk_level if not risk_level or risk_level.strip() == '': empty_risk_issues.append({ 'index': i + 1, 'issue_point': issue_point, 'suggestion': suggestion[:80] }) # Check 2: A→A pattern pairs = extract_correction_pairs(suggestion) for before, after in pairs: if before.strip() == after.strip(): a_to_a_issues.append({ 'index': i + 1, 'issue_point': issue_point, 'before': before, 'after': after }) # Check 3: Hesitation words in suggestion sug_hesitation = check_hesitation_words(suggestion) if sug_hesitation: hesitation_issues.append({ 'index': i + 1, 'field': 'suggestion', 'words': sug_hesitation, 'text': suggestion[:100] }) # Check 4: Hesitation words in reason reason_hesitation = check_hesitation_words(reason) if reason_hesitation: hesitation_issues.append({ 'index': i + 1, 'field': 'reason', 'words': reason_hesitation, 'text': reason[:100] }) # Check 5: Technical procedure tech_kws = is_technical_procedure(issue_point, reason) if tech_kws: technical_issues.append({ 'index': i + 1, 'issue_point': issue_point, 'keywords': tech_kws }) # Check 6: Long suggestion (>200 chars) if len(suggestion) > 200: long_suggestion_issues.append({ 'index': i + 1, 'issue_point': issue_point, 'length': len(suggestion), 'text': suggestion[:100] }) # Check 7: Duplicates (same correction key) if pairs: sorted_pairs = sorted(pairs) correction_key = ",".join(f"{a}→{b}" for a, b in sorted_pairs) else: correction_key = suggestion.strip() if correction_key in seen_corrections: duplicate_issues.append({ 'index': i + 1, 'first_index': seen_corrections[correction_key], 'correction_key': correction_key, 'issue_point': issue_point }) else: seen_corrections[correction_key] = i + 1 # Print results print("=" * 60) print("QUALITY ANALYSIS RESULTS") print("=" * 60) sections = [ ("A->A Pattern (will A change to A)", a_to_a_issues), ("Self-debate / Hesitation words", hesitation_issues), ("Empty risk_level", empty_risk_issues), ("Technical procedure (out of scope)", technical_issues), ("Duplicate corrections", duplicate_issues), ("JSON parse failures", parse_failures), ("Long suggestions (>200 chars)", long_suggestion_issues), ] total_problems = 0 for title, items in sections: count = len(items) total_problems += count status = "PASS" if count == 0 else "FAIL" print(f"\n[{status}] {title}: {count}") if items: for item in items: print(f" - #{item.get('index', '?')}: {json.dumps(item, ensure_ascii=False)[:150]}") print(f"\n{'=' * 60}") print(f"TOTAL: {len(grammar_items)} items, {total_problems} quality issues") print(f"Quality rate: {(len(grammar_items) - total_problems) / len(grammar_items) * 100:.1f}%") # Print valid items summary print(f"\n{'=' * 60}") print("VALID ITEMS SUMMARY") print("=" * 60) for i, entry in enumerate(grammar_items): item = entry['item'] check_result = item.get('check_result', {}) if isinstance(check_result, str): print(f" [{i+1}] [PARSE_FAIL] {check_result[:60]}...") continue issue_point = check_result.get('issue_point', '') suggestion = check_result.get('suggestion', '') risk_level = check_result.get('risk_level', '') print(f" [{i+1}] [{risk_level}] {issue_point}: {suggestion[:60]}...") if __name__ == "__main__": result_file = os.path.join( project_root, "temp", "construction_review", "final_result", "67d45692fb97aeef8f896e78475ce539-1779785718.json" ) analyze_grammar_check_results(result_file)