# diff_service.py
# -*- coding: utf-8 -*-
"""Deterministic diff service for document chat proposals."""
import difflib
import hashlib
import re
from typing import List, Tuple

from core.document_chat.schemas import DiffItem, DiffResult
  8. class DiffService:
  9. """Build paragraph/line diffs, falling back to full-content comparison."""
  10. _COMPLEX_PATTERNS = (
  11. re.compile(r"<table[\s>]", re.IGNORECASE),
  12. re.compile(r"</table>", re.IGNORECASE),
  13. re.compile(r"!\[[^\]]*\]\("),
  14. re.compile(r"<表格开始>|<表格结束>"),
  15. )
  16. def build_diff(self, old_content: str, new_content: str) -> DiffResult:
  17. old_text = old_content or ""
  18. new_text = new_content or ""
  19. old_hash = self.hash_content(old_text)
  20. new_hash = self.hash_content(new_text)
  21. if self._is_complex(old_text) or self._is_complex(new_text):
  22. return DiffResult(
  23. old_content_hash=old_hash,
  24. new_content_hash=new_hash,
  25. diff=[DiffItem(type="full_content", old_text=old_text, new_text=new_text)],
  26. diff_granularity="full_content",
  27. )
  28. old_units = self._split_units(old_text)
  29. new_units = self._split_units(new_text)
  30. matcher = difflib.SequenceMatcher(a=old_units, b=new_units, autojunk=False)
  31. diff_items: List[DiffItem] = []
  32. for tag, i1, i2, j1, j2 in matcher.get_opcodes():
  33. old_part = "\n".join(old_units[i1:i2])
  34. new_part = "\n".join(new_units[j1:j2])
  35. if tag == "equal":
  36. diff_items.append(DiffItem(type="equal", old_text=old_part, new_text=new_part))
  37. elif tag == "insert":
  38. diff_items.append(DiffItem(type="insert", old_text="", new_text=new_part))
  39. elif tag == "delete":
  40. diff_items.append(DiffItem(type="delete", old_text=old_part, new_text=""))
  41. elif tag == "replace":
  42. diff_items.append(DiffItem(type="replace", old_text=old_part, new_text=new_part))
  43. return DiffResult(
  44. old_content_hash=old_hash,
  45. new_content_hash=new_hash,
  46. diff=diff_items,
  47. diff_granularity="line",
  48. )
  49. @staticmethod
  50. def hash_content(content: str) -> str:
  51. digest = hashlib.sha256((content or "").encode("utf-8")).hexdigest()
  52. return f"sha256:{digest}"
  53. def _is_complex(self, content: str) -> bool:
  54. if not content:
  55. return False
  56. if any(pattern.search(content) for pattern in self._COMPLEX_PATTERNS):
  57. return True
  58. lines = [line for line in content.splitlines() if line.strip()]
  59. table_like_lines = sum(1 for line in lines if line.count("|") >= 2)
  60. return table_like_lines >= 2
  61. @staticmethod
  62. def _split_units(content: str) -> List[str]:
  63. if not content:
  64. return []
  65. paragraphs = [part.strip() for part in re.split(r"\n\s*\n", content.strip()) if part.strip()]
  66. if len(paragraphs) > 1:
  67. return paragraphs
  68. return [line.rstrip() for line in content.splitlines() if line.strip()]