| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- # -*- coding: utf-8 -*-
- """Deterministic diff service for document chat proposals."""
- import difflib
- import hashlib
- import re
- from typing import List
- from core.document_chat.schemas import DiffItem, DiffResult
class DiffService:
    """Build deterministic paragraph/line diffs between two document versions.

    Content that looks like it contains tables or images is not diffed unit
    by unit; it is reported as a single ``full_content`` item instead.
    """

    # Markers of content that is not diffed unit by unit: HTML table tags,
    # Markdown image links, and the literal table start/end tags (written in
    # Chinese) matched by the last pattern.
    _COMPLEX_PATTERNS = (
        re.compile(r"<table[\s>]", re.IGNORECASE),
        re.compile(r"</table>", re.IGNORECASE),
        re.compile(r"!\[[^\]]*\]\("),
        re.compile(r"<表格开始>|<表格结束>"),
    )

    def build_diff(self, old_content: str, new_content: str) -> DiffResult:
        """Compare two versions of a document and describe the changes.

        Args:
            old_content: Previous text; a falsy value is treated as "".
            new_content: Proposed text; a falsy value is treated as "".

        Returns:
            A ``DiffResult`` carrying the content hashes of both versions and
            either one ``full_content`` item (when either side looks complex)
            or a sequence of equal/insert/delete/replace items.
        """
        old_text = old_content or ""
        new_text = new_content or ""
        old_hash = self.hash_content(old_text)
        new_hash = self.hash_content(new_text)

        if self._is_complex(old_text) or self._is_complex(new_text):
            return DiffResult(
                old_content_hash=old_hash,
                new_content_hash=new_hash,
                diff=[DiffItem(type="full_content", old_text=old_text, new_text=new_text)],
                diff_granularity="full_content",
            )

        # Both texts must be cut at the same granularity; otherwise paragraph
        # units on one side can never match line units on the other and an
        # untouched paragraph would be reported as replaced.
        old_units, new_units = self._split_pair(old_text, new_text)
        matcher = difflib.SequenceMatcher(a=old_units, b=new_units, autojunk=False)

        # Opcode tags are exactly "equal", "insert", "delete" and "replace".
        # For insert/delete the slice on the missing side is empty, so the
        # join already yields "" there.
        diff_items: List[DiffItem] = [
            DiffItem(
                type=tag,
                old_text="\n".join(old_units[i1:i2]),
                new_text="\n".join(new_units[j1:j2]),
            )
            for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        ]

        # NOTE(review): the granularity is reported as "line" even when the
        # units are paragraphs; kept as-is because the values accepted by
        # DiffResult are not visible here - confirm against the schema.
        return DiffResult(
            old_content_hash=old_hash,
            new_content_hash=new_hash,
            diff=diff_items,
            diff_granularity="line",
        )

    @staticmethod
    def hash_content(content: str) -> str:
        """Return ``"sha256:<hex digest>"`` of the UTF-8 encoded content.

        A falsy ``content`` is hashed as the empty string.
        """
        digest = hashlib.sha256((content or "").encode("utf-8")).hexdigest()
        return f"sha256:{digest}"

    def _is_complex(self, content: str) -> bool:
        """Tell whether the content should skip unit-by-unit diffing.

        True when any ``_COMPLEX_PATTERNS`` entry matches, or when at least
        two non-blank lines each contain two or more ``|`` characters
        (a pipe-table heuristic). Empty content is never complex.
        """
        if not content:
            return False
        if any(pattern.search(content) for pattern in self._COMPLEX_PATTERNS):
            return True
        table_like_lines = sum(
            1
            for line in content.splitlines()
            if line.strip() and line.count("|") >= 2
        )
        return table_like_lines >= 2

    @staticmethod
    def _split_paragraphs(content: str) -> List[str]:
        """Split on blank lines; return stripped, non-empty paragraphs."""
        if not content:
            return []
        return [
            part.strip()
            for part in re.split(r"\n\s*\n", content.strip())
            if part.strip()
        ]

    @staticmethod
    def _split_lines(content: str) -> List[str]:
        """Return the non-blank lines with trailing whitespace removed."""
        if not content:
            return []
        return [line.rstrip() for line in content.splitlines() if line.strip()]

    @staticmethod
    def _split_units(content: str) -> List[str]:
        """Split one text into paragraphs, or into lines if it has only one.

        Returns an empty list for empty content.
        """
        paragraphs = DiffService._split_paragraphs(content)
        if len(paragraphs) > 1:
            return paragraphs
        return DiffService._split_lines(content)

    @staticmethod
    def _split_pair(old_text: str, new_text: str) -> tuple:
        """Split both texts into units of one shared granularity.

        Paragraph units are used for both texts as soon as either of them has
        more than one paragraph; otherwise both are split into lines.

        Returns:
            A ``(old_units, new_units)`` tuple of string lists.
        """
        old_paragraphs = DiffService._split_paragraphs(old_text)
        new_paragraphs = DiffService._split_paragraphs(new_text)
        if len(old_paragraphs) > 1 or len(new_paragraphs) > 1:
            return old_paragraphs, new_paragraphs
        return DiffService._split_lines(old_text), DiffService._split_lines(new_text)
|