# -*- coding: utf-8 -*-
"""Deterministic diff service for document chat proposals."""

import difflib
import hashlib
import re
from typing import List

from core.document_chat.schemas import DiffItem, DiffResult


class DiffService:
    """Build paragraph/line diffs, falling back to full-content comparison.

    Content that looks structurally complex (HTML tables, Markdown images,
    explicit table markers, or pipe-delimited table rows) is not diffed
    unit-by-unit; it is reported as a single ``full_content`` item instead.
    """

    # Patterns whose presence marks content as too structured for a
    # unit-level diff. Every pattern must be non-empty: an empty regex
    # matches any string and would force the full-content fallback for
    # all non-empty input.
    _COMPLEX_PATTERNS = (
        re.compile(r"<table[\s>]", re.IGNORECASE),  # opening HTML table tag
        re.compile(r"</table>", re.IGNORECASE),  # closing HTML table tag
        re.compile(r"!\[[^\]]*\]\("),  # Markdown image: ![alt](
        re.compile(r"<表格开始>|<表格结束>"),  # explicit table start/end markers
    )

    def build_diff(self, old_content: str, new_content: str) -> DiffResult:
        """Compare two document versions and return a structured diff.

        Args:
            old_content: Previous document text; a falsy value is treated
                as the empty string.
            new_content: Proposed document text; a falsy value is treated
                as the empty string.

        Returns:
            A ``DiffResult`` carrying the SHA-256 hashes of both versions and
            either a single ``full_content`` item (when either side is
            complex) or one item per ``difflib`` opcode.
        """
        old_text = old_content or ""
        new_text = new_content or ""
        old_hash = self.hash_content(old_text)
        new_hash = self.hash_content(new_text)

        if self._is_complex(old_text) or self._is_complex(new_text):
            return DiffResult(
                old_content_hash=old_hash,
                new_content_hash=new_hash,
                diff=[
                    DiffItem(
                        type="full_content",
                        old_text=old_text,
                        new_text=new_text,
                    )
                ],
                diff_granularity="full_content",
            )

        # NOTE(review): each side picks paragraph vs. line units on its own,
        # so the two sides can end up with different granularities; the diff
        # is still valid but may be coarser than necessary.
        old_units = self._split_units(old_text)
        new_units = self._split_units(new_text)
        matcher = difflib.SequenceMatcher(a=old_units, b=new_units, autojunk=False)

        # get_opcodes() yields only "equal", "insert", "delete" and "replace".
        # For "insert" the old slice is empty and for "delete" the new slice
        # is empty, so the joined text is already "" on the absent side.
        diff_items: List[DiffItem] = [
            DiffItem(
                type=tag,
                old_text="\n".join(old_units[i1:i2]),
                new_text="\n".join(new_units[j1:j2]),
            )
            for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        ]

        # NOTE(review): granularity is reported as "line" even when the units
        # are paragraphs; kept as-is because consumers may depend on the value.
        return DiffResult(
            old_content_hash=old_hash,
            new_content_hash=new_hash,
            diff=diff_items,
            diff_granularity="line",
        )

    @staticmethod
    def hash_content(content: str) -> str:
        """Return ``"sha256:<hex>"`` for the UTF-8 encoding of *content*.

        A falsy *content* is hashed as the empty string.
        """
        digest = hashlib.sha256((content or "").encode("utf-8")).hexdigest()
        return f"sha256:{digest}"

    def _is_complex(self, content: str) -> bool:
        """Return True when *content* should not be diffed unit-by-unit.

        Content is complex if it matches any of ``_COMPLEX_PATTERNS`` or if at
        least two of its non-blank lines contain two or more ``|`` characters
        (a pipe-delimited table heuristic).
        """
        if not content:
            return False
        if any(pattern.search(content) for pattern in self._COMPLEX_PATTERNS):
            return True
        table_like_lines = sum(
            1
            for line in content.splitlines()
            if line.strip() and line.count("|") >= 2
        )
        return table_like_lines >= 2

    @staticmethod
    def _split_units(content: str) -> List[str]:
        """Split *content* into the units compared by the diff.

        If the text contains more than one blank-line-separated paragraph,
        the stripped paragraphs are returned. Otherwise the non-blank lines
        are returned with trailing whitespace removed. Empty input yields an
        empty list.
        """
        if not content:
            return []
        paragraphs = [
            part.strip()
            for part in re.split(r"\n\s*\n", content.strip())
            if part.strip()
        ]
        if len(paragraphs) > 1:
            return paragraphs
        return [line.rstrip() for line in content.splitlines() if line.strip()]