analyze_grammar_quality.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. """
  2. 分析最新审查结果中词句语法审查的质量
  3. 检查项:
  4. 1. "将A改为A" 模式(修正前后相同)
  5. 2. suggestion/reason 中的自我辩论(犹豫措辞)
  6. 3. risk_level 为空
  7. 4. 技术操作规程越界审查
  8. 5. 重复问题
  9. 6. JSON 解析失败
  10. 7. suggestion 过长(>200字,可能包含推理过程)
  11. """
  12. import json
  13. import re
  14. import sys
  15. import os
  16. project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  17. sys.path.insert(0, project_root)
  18. def extract_correction_pairs(suggestion: str):
  19. """从 suggestion 中提取所有 '将X改为Y' 的 (X, Y) 对"""
  20. quote_chars = r"""['""''「」]"""
  21. pattern = rf"将{quote_chars}(.*?){quote_chars}\s*改为\s*{quote_chars}(.*?){quote_chars}"
  22. return re.findall(pattern, suggestion)
  23. def check_hesitation_words(text: str):
  24. """检查文本中是否包含犹豫措辞"""
  25. hesitation_words = [
  26. '可能', '暂定', '不确定', '重新审视', '然而', '不过', '似乎',
  27. '但是', '其实', '实际上', '再细看', '再想想', '仔细想想',
  28. '反过来', '另一方面', '换个角度'
  29. ]
  30. found = [w for w in hesitation_words if w in text]
  31. return found
  32. def is_technical_procedure(issue_point: str, reason: str):
  33. """检查是否为技术操作规程越界审查"""
  34. technical_keywords = [
  35. '操作步骤', '工艺参数', '施工顺序', '操作规程',
  36. '技术规范', '施工方案', '工艺流程'
  37. ]
  38. combined = issue_point + reason
  39. return [kw for kw in technical_keywords if kw in combined]
  40. def analyze_grammar_check_results(result_file: str):
  41. """分析词句语法审查结果质量"""
  42. with open(result_file, encoding='utf-8') as f:
  43. data = json.load(f)
  44. issues = data.get('issues', [])
  45. grammar_items = []
  46. for issue_wrapper in issues:
  47. for issue_id, issue_detail in issue_wrapper.items():
  48. review_lists = issue_detail.get('review_lists', [])
  49. metadata = issue_detail.get('metadata', {})
  50. for item in review_lists:
  51. check_item = item.get('check_item', '')
  52. if check_item in ['sensitive_word_check', 'grammar_check']:
  53. grammar_items.append({
  54. 'item': item,
  55. 'issue_id': issue_id,
  56. 'location_label': metadata.get('review_location_label', '')
  57. })
  58. print(f"Total grammar_check items: {len(grammar_items)}")
  59. print()
  60. # Quality checks
  61. a_to_a_issues = []
  62. hesitation_issues = []
  63. empty_risk_issues = []
  64. technical_issues = []
  65. duplicate_issues = []
  66. parse_failures = []
  67. long_suggestion_issues = []
  68. seen_corrections = {}
  69. for i, entry in enumerate(grammar_items):
  70. item = entry['item']
  71. check_result = item.get('check_result', {})
  72. # STRING format = parse failure
  73. if isinstance(check_result, str):
  74. parse_failures.append({
  75. 'index': i + 1,
  76. 'raw': check_result[:200],
  77. 'location_label': entry['location_label']
  78. })
  79. continue
  80. issue_point = check_result.get('issue_point', '')
  81. location = check_result.get('location', '')
  82. suggestion = check_result.get('suggestion', '')
  83. reason = check_result.get('reason', '')
  84. risk_level = check_result.get('risk_level', '')
  85. # Check 1: Empty risk_level
  86. if not risk_level or risk_level.strip() == '':
  87. empty_risk_issues.append({
  88. 'index': i + 1,
  89. 'issue_point': issue_point,
  90. 'suggestion': suggestion[:80]
  91. })
  92. # Check 2: A→A pattern
  93. pairs = extract_correction_pairs(suggestion)
  94. for before, after in pairs:
  95. if before.strip() == after.strip():
  96. a_to_a_issues.append({
  97. 'index': i + 1,
  98. 'issue_point': issue_point,
  99. 'before': before,
  100. 'after': after
  101. })
  102. # Check 3: Hesitation words in suggestion
  103. sug_hesitation = check_hesitation_words(suggestion)
  104. if sug_hesitation:
  105. hesitation_issues.append({
  106. 'index': i + 1,
  107. 'field': 'suggestion',
  108. 'words': sug_hesitation,
  109. 'text': suggestion[:100]
  110. })
  111. # Check 4: Hesitation words in reason
  112. reason_hesitation = check_hesitation_words(reason)
  113. if reason_hesitation:
  114. hesitation_issues.append({
  115. 'index': i + 1,
  116. 'field': 'reason',
  117. 'words': reason_hesitation,
  118. 'text': reason[:100]
  119. })
  120. # Check 5: Technical procedure
  121. tech_kws = is_technical_procedure(issue_point, reason)
  122. if tech_kws:
  123. technical_issues.append({
  124. 'index': i + 1,
  125. 'issue_point': issue_point,
  126. 'keywords': tech_kws
  127. })
  128. # Check 6: Long suggestion (>200 chars)
  129. if len(suggestion) > 200:
  130. long_suggestion_issues.append({
  131. 'index': i + 1,
  132. 'issue_point': issue_point,
  133. 'length': len(suggestion),
  134. 'text': suggestion[:100]
  135. })
  136. # Check 7: Duplicates (same correction key)
  137. if pairs:
  138. sorted_pairs = sorted(pairs)
  139. correction_key = ",".join(f"{a}→{b}" for a, b in sorted_pairs)
  140. else:
  141. correction_key = suggestion.strip()
  142. if correction_key in seen_corrections:
  143. duplicate_issues.append({
  144. 'index': i + 1,
  145. 'first_index': seen_corrections[correction_key],
  146. 'correction_key': correction_key,
  147. 'issue_point': issue_point
  148. })
  149. else:
  150. seen_corrections[correction_key] = i + 1
  151. # Print results
  152. print("=" * 60)
  153. print("QUALITY ANALYSIS RESULTS")
  154. print("=" * 60)
  155. sections = [
  156. ("A->A Pattern (will A change to A)", a_to_a_issues),
  157. ("Self-debate / Hesitation words", hesitation_issues),
  158. ("Empty risk_level", empty_risk_issues),
  159. ("Technical procedure (out of scope)", technical_issues),
  160. ("Duplicate corrections", duplicate_issues),
  161. ("JSON parse failures", parse_failures),
  162. ("Long suggestions (>200 chars)", long_suggestion_issues),
  163. ]
  164. total_problems = 0
  165. for title, items in sections:
  166. count = len(items)
  167. total_problems += count
  168. status = "PASS" if count == 0 else "FAIL"
  169. print(f"\n[{status}] {title}: {count}")
  170. if items:
  171. for item in items:
  172. print(f" - #{item.get('index', '?')}: {json.dumps(item, ensure_ascii=False)[:150]}")
  173. print(f"\n{'=' * 60}")
  174. print(f"TOTAL: {len(grammar_items)} items, {total_problems} quality issues")
  175. print(f"Quality rate: {(len(grammar_items) - total_problems) / len(grammar_items) * 100:.1f}%")
  176. # Print valid items summary
  177. print(f"\n{'=' * 60}")
  178. print("VALID ITEMS SUMMARY")
  179. print("=" * 60)
  180. for i, entry in enumerate(grammar_items):
  181. item = entry['item']
  182. check_result = item.get('check_result', {})
  183. if isinstance(check_result, str):
  184. print(f" [{i+1}] [PARSE_FAIL] {check_result[:60]}...")
  185. continue
  186. issue_point = check_result.get('issue_point', '')
  187. suggestion = check_result.get('suggestion', '')
  188. risk_level = check_result.get('risk_level', '')
  189. print(f" [{i+1}] [{risk_level}] {issue_point}: {suggestion[:60]}...")
  190. if __name__ == "__main__":
  191. result_file = os.path.join(
  192. project_root,
  193. "temp", "construction_review", "final_result",
  194. "67d45692fb97aeef8f896e78475ce539-1779785718.json"
  195. )
  196. analyze_grammar_check_results(result_file)