| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- """
- 分析最新审查结果中词句语法审查的质量
- 检查项:
- 1. "将A改为A" 模式(修正前后相同)
- 2. suggestion/reason 中的自我辩论(犹豫措辞)
- 3. risk_level 为空
- 4. 技术操作规程越界审查
- 5. 重复问题
- 6. JSON 解析失败
- 7. suggestion 过长(>200字,可能包含推理过程)
- """
- import json
- import re
- import sys
- import os
# Resolve the directory three levels above this file and put it at the front
# of sys.path, so modules located there take import precedence.
project_root = os.path.abspath(
    os.path.join(os.path.abspath(__file__), os.pardir, os.pardir, os.pardir)
)
sys.path.insert(0, project_root)
# Quote characters that may wrap X and Y in a "将X改为Y" suggestion: ASCII
# ' and ", curly double/single quotes (U+201C/U+201D/U+2018/U+2019) and the
# CJK corner brackets (U+300C/U+300D/U+300E/U+300F). Written as escapes so
# the set survives editors that normalise typographic quotes.
_CORRECTION_QUOTES = "['\"\u201c\u201d\u2018\u2019\u300c\u300d\u300e\u300f]"

# Compiled once at import time; the function is called for every review item.
_CORRECTION_PATTERN = re.compile(
    rf"将{_CORRECTION_QUOTES}(.*?){_CORRECTION_QUOTES}\s*改为\s*"
    rf"{_CORRECTION_QUOTES}(.*?){_CORRECTION_QUOTES}"
)


def extract_correction_pairs(suggestion: str):
    """Extract every ``(X, Y)`` pair from "将X改为Y" phrases in *suggestion*.

    X and Y must each be wrapped in one of the supported quote characters
    (ASCII, curly, or CJK corner quotes); whitespace around "改为" is allowed.

    Args:
        suggestion: Suggestion text of a review item. ``None`` or any other
            non-string value is treated as containing no corrections.

    Returns:
        A list of ``(before, after)`` string tuples in order of appearance;
        empty when nothing matches.
    """
    if not isinstance(suggestion, str):
        return []
    return _CORRECTION_PATTERN.findall(suggestion)
def check_hesitation_words(text: str):
    """Return the hesitation phrases that occur in *text*.

    Matching is a plain substring test. The result lists each matching phrase
    once, in the order of the phrase table below (not in order of appearance
    in *text*); it is empty when no phrase occurs.
    """
    hedging_phrases = (
        '可能', '暂定', '不确定', '重新审视', '然而', '不过', '似乎',
        '但是', '其实', '实际上', '再细看', '再想想', '仔细想想',
        '反过来', '另一方面', '换个角度',
    )
    return [phrase for phrase in hedging_phrases if phrase in text]
def is_technical_procedure(issue_point: str, reason: str):
    """Return the technical-procedure keywords found in a review item.

    The two fields are concatenated (``issue_point`` first, no separator) and
    searched by substring, so a keyword straddling the junction of the two
    fields also counts as a hit. A non-empty result marks the item as touching
    technical operating procedures; the list follows the keyword table order.
    """
    procedure_terms = (
        '操作步骤', '工艺参数', '施工顺序', '操作规程',
        '技术规范', '施工方案', '工艺流程',
    )
    haystack = f"{issue_point}{reason}"
    hits = []
    for term in procedure_terms:
        if term in haystack:
            hits.append(term)
    return hits
def _as_text(value):
    """Return *value* as a string; ``None`` (JSON null) becomes ``''``."""
    if value is None:
        return ''
    return value if isinstance(value, str) else str(value)


def _get_check_result(item):
    """Return an item's ``check_result``, mapping a missing/null value to ``{}``."""
    check_result = item.get('check_result')
    return {} if check_result is None else check_result


def _collect_grammar_items(data):
    """Flatten ``data['issues']`` into the review entries that are in scope.

    Only entries whose ``check_item`` is ``sensitive_word_check`` or
    ``grammar_check`` are kept. Wrappers, details, or entries that are not
    dicts are skipped instead of raising.
    """
    target_checks = ('sensitive_word_check', 'grammar_check')
    collected = []
    for issue_wrapper in data.get('issues') or []:
        if not isinstance(issue_wrapper, dict):
            continue
        for issue_id, issue_detail in issue_wrapper.items():
            if not isinstance(issue_detail, dict):
                continue
            metadata = issue_detail.get('metadata') or {}
            for item in issue_detail.get('review_lists') or []:
                if not isinstance(item, dict):
                    continue
                if item.get('check_item', '') in target_checks:
                    collected.append({
                        'item': item,
                        'issue_id': issue_id,
                        'location_label': metadata.get('review_location_label', ''),
                    })
    return collected


def _run_quality_checks(grammar_items):
    """Run every per-item quality check and return the findings by category."""
    findings = {
        'a_to_a': [],
        'hesitation': [],
        'empty_risk': [],
        'technical': [],
        'duplicate': [],
        'parse_failure': [],
        'long_suggestion': [],
    }
    seen_corrections = {}  # correction key -> 1-based index of first occurrence

    for index, entry in enumerate(grammar_items, start=1):
        check_result = _get_check_result(entry['item'])

        # Anything that is not a structured object (typically the raw model
        # output kept as a string) is reported as a parse failure.
        if not isinstance(check_result, dict):
            findings['parse_failure'].append({
                'index': index,
                'raw': _as_text(check_result)[:200],
                'location_label': entry['location_label'],
            })
            continue

        # Coerce to text so a JSON null or number cannot break the slicing
        # and substring checks below.
        issue_point = _as_text(check_result.get('issue_point'))
        suggestion = _as_text(check_result.get('suggestion'))
        reason = _as_text(check_result.get('reason'))
        risk_level = _as_text(check_result.get('risk_level'))

        # Empty risk_level
        if not risk_level.strip():
            findings['empty_risk'].append({
                'index': index,
                'issue_point': issue_point,
                'suggestion': suggestion[:80],
            })

        # "change A to A": the correction is identical before and after
        pairs = extract_correction_pairs(suggestion)
        for before, after in pairs:
            if before.strip() == after.strip():
                findings['a_to_a'].append({
                    'index': index,
                    'issue_point': issue_point,
                    'before': before,
                    'after': after,
                })

        # Hesitation wording in suggestion, then in reason
        for field, text in (('suggestion', suggestion), ('reason', reason)):
            words = check_hesitation_words(text)
            if words:
                findings['hesitation'].append({
                    'index': index,
                    'field': field,
                    'words': words,
                    'text': text[:100],
                })

        # Technical operating procedure (outside the scope of this review)
        tech_kws = is_technical_procedure(issue_point, reason)
        if tech_kws:
            findings['technical'].append({
                'index': index,
                'issue_point': issue_point,
                'keywords': tech_kws,
            })

        # Over-long suggestion (>200 chars)
        if len(suggestion) > 200:
            findings['long_suggestion'].append({
                'index': index,
                'issue_point': issue_point,
                'length': len(suggestion),
                'text': suggestion[:100],
            })

        # Duplicates: same set of corrections, or same suggestion text when
        # no "将X改为Y" pair could be extracted.
        if pairs:
            correction_key = ",".join(f"{a}→{b}" for a, b in sorted(pairs))
        else:
            correction_key = suggestion.strip()
        # An empty key means "no suggestion at all"; such items are not
        # duplicates of one another, so they are left out of this check.
        if correction_key:
            if correction_key in seen_corrections:
                findings['duplicate'].append({
                    'index': index,
                    'first_index': seen_corrections[correction_key],
                    'correction_key': correction_key,
                    'issue_point': issue_point,
                })
            else:
                seen_corrections[correction_key] = index

    return findings


def _print_quality_report(grammar_items, findings):
    """Print the per-category findings followed by the overall totals."""
    print("=" * 60)
    print("QUALITY ANALYSIS RESULTS")
    print("=" * 60)
    sections = [
        ("A->A Pattern (will A change to A)", findings['a_to_a']),
        ("Self-debate / Hesitation words", findings['hesitation']),
        ("Empty risk_level", findings['empty_risk']),
        ("Technical procedure (out of scope)", findings['technical']),
        ("Duplicate corrections", findings['duplicate']),
        ("JSON parse failures", findings['parse_failure']),
        ("Long suggestions (>200 chars)", findings['long_suggestion']),
    ]
    total_problems = 0
    flagged_indices = set()
    for title, items in sections:
        count = len(items)
        total_problems += count
        status = "PASS" if count == 0 else "FAIL"
        print(f"\n[{status}] {title}: {count}")
        for finding in items:
            flagged_indices.add(finding['index'])
            print(f" - #{finding.get('index', '?')}: {json.dumps(finding, ensure_ascii=False)[:150]}")

    total_items = len(grammar_items)
    print(f"\n{'=' * 60}")
    print(f"TOTAL: {total_items} items, {total_problems} quality issues")
    if total_items:
        # One item can trigger several findings, so the rate is based on the
        # number of distinct flagged items; this keeps it within 0-100%.
        clean_items = total_items - len(flagged_indices)
        print(f"Quality rate: {clean_items / total_items * 100:.1f}%")
    else:
        print("Quality rate: N/A (no items)")


def _print_item_summary(grammar_items):
    """Print one summary line per in-scope item."""
    print(f"\n{'=' * 60}")
    print("VALID ITEMS SUMMARY")
    print("=" * 60)
    for index, entry in enumerate(grammar_items, start=1):
        check_result = _get_check_result(entry['item'])
        if not isinstance(check_result, dict):
            print(f" [{index}] [PARSE_FAIL] {_as_text(check_result)[:60]}...")
            continue
        issue_point = _as_text(check_result.get('issue_point'))
        suggestion = _as_text(check_result.get('suggestion'))
        risk_level = _as_text(check_result.get('risk_level'))
        print(f" [{index}] [{risk_level}] {issue_point}: {suggestion[:60]}...")


def analyze_grammar_check_results(result_file: str):
    """Analyse the quality of the wording/grammar review results in a file.

    Loads the JSON review result, keeps the ``sensitive_word_check`` and
    ``grammar_check`` entries, runs the quality checks on each of them
    (identical before/after corrections, hesitation wording, empty
    ``risk_level``, technical-procedure keywords, duplicate corrections,
    unparsed ``check_result`` values, over-long suggestions) and prints a
    report to stdout.

    Args:
        result_file: Path to a UTF-8 encoded JSON review result file.

    Returns:
        None. All output is printed.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    with open(result_file, encoding='utf-8') as f:
        data = json.load(f)

    grammar_items = _collect_grammar_items(data)
    print(f"Total grammar_check items: {len(grammar_items)}")
    print()

    findings = _run_quality_checks(grammar_items)
    _print_quality_report(grammar_items, findings)
    _print_item_summary(grammar_items)
if __name__ == "__main__":
    # Fallback result file under <project_root>/temp/construction_review/
    # final_result, used when no path is given on the command line.
    default_result_file = os.path.join(
        project_root,
        "temp", "construction_review", "final_result",
        "67d45692fb97aeef8f896e78475ce539-1779785718.json"
    )
    # Optional first argument: path of the review result JSON to analyse.
    result_file = sys.argv[1] if len(sys.argv) > 1 else default_result_file
    if not os.path.isfile(result_file):
        # Exit with a readable message rather than a traceback from open().
        sys.exit(f"Result file not found: {result_file}")
    analyze_grammar_check_results(result_file)
|