# -*- coding: utf-8 -*-
"""Deterministic diff service for document chat proposals."""
import difflib
import hashlib
import re
from typing import List
from core.document_chat.schemas import DiffItem, DiffResult
class DiffService:
    """Build paragraph/line diffs, falling back to full-content comparison.

    Content that looks structurally complex (HTML tables, Markdown images,
    explicit table markers, or pipe-delimited rows) is never diffed unit by
    unit; it is reported as one ``full_content`` item instead.
    """

    # Any match marks the content as too structured for a unit-level diff.
    #
    # NOTE(review): the first two literals were corrupted in the reviewed
    # source (the string was split across a line break with its tag-like text
    # missing, which is a syntax error). They are reconstructed here as HTML
    # table detectors -- confirm the exact patterns against version control.
    _COMPLEX_PATTERNS = (
        re.compile(r"<table[\s>]", re.IGNORECASE),
        re.compile(r"</table>", re.IGNORECASE),
        re.compile(r"!\[[^\]]*\]\("),  # Markdown image: ![alt](
        re.compile(r"<表格开始>|<表格结束>"),  # explicit table begin/end markers
    )

    def build_diff(self, old_content: str, new_content: str) -> DiffResult:
        """Compare two versions of a document and describe the changes.

        Args:
            old_content: Previous text; a falsy value is treated as ``""``.
            new_content: Proposed text; a falsy value is treated as ``""``.

        Returns:
            A ``DiffResult`` carrying the hashes of both texts and either a
            single ``full_content`` item (when either side is complex) or one
            item per ``difflib`` opcode over the split units.
        """
        old_text = old_content or ""
        new_text = new_content or ""
        old_hash = self.hash_content(old_text)
        new_hash = self.hash_content(new_text)

        if self._is_complex(old_text) or self._is_complex(new_text):
            return DiffResult(
                old_content_hash=old_hash,
                new_content_hash=new_hash,
                diff=[DiffItem(type="full_content", old_text=old_text, new_text=new_text)],
                diff_granularity="full_content",
            )

        # NOTE(review): each side picks its own unit size (paragraphs vs.
        # lines) independently, so the two sequences may use different
        # granularities -- confirm this is intended.
        old_units = self._split_units(old_text)
        new_units = self._split_units(new_text)
        matcher = difflib.SequenceMatcher(a=old_units, b=new_units, autojunk=False)

        # Opcode tags are exactly "equal", "insert", "delete" and "replace",
        # which are also the item types emitted here. For "insert" the old
        # slice is empty and for "delete" the new slice is empty, so the
        # joined text is already "" on that side.
        diff_items: List[DiffItem] = [
            DiffItem(
                type=tag,
                old_text="\n".join(old_units[i1:i2]),
                new_text="\n".join(new_units[j1:j2]),
            )
            for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        ]

        # NOTE(review): reported as "line" even when paragraph units were
        # compared; left unchanged because the accepted schema values are not
        # visible from this file.
        return DiffResult(
            old_content_hash=old_hash,
            new_content_hash=new_hash,
            diff=diff_items,
            diff_granularity="line",
        )

    @staticmethod
    def hash_content(content: str) -> str:
        """Return ``"sha256:<hex>"`` for the UTF-8 encoding of *content*.

        A falsy *content* (``None`` or ``""``) hashes as the empty string.
        """
        digest = hashlib.sha256((content or "").encode("utf-8")).hexdigest()
        return f"sha256:{digest}"

    def _is_complex(self, content: str) -> bool:
        """Return True when *content* should not be diffed unit by unit.

        That is the case when any ``_COMPLEX_PATTERNS`` entry matches, or when
        at least two non-blank lines each contain two or more ``|`` characters
        (a pipe-delimited table heuristic).
        """
        if not content:
            return False
        if any(pattern.search(content) for pattern in self._COMPLEX_PATTERNS):
            return True
        table_like_lines = sum(
            1
            for line in content.splitlines()
            if line.strip() and line.count("|") >= 2
        )
        return table_like_lines >= 2

    @staticmethod
    def _split_units(content: str) -> List[str]:
        """Split *content* into the units that get compared.

        Blank-line-separated paragraphs (stripped) are returned when there is
        more than one; otherwise the non-blank lines are returned with
        trailing whitespace removed. Empty content yields ``[]``.
        """
        if not content:
            return []
        paragraphs = [
            part.strip()
            for part in re.split(r"\n\s*\n", content.strip())
            if part.strip()
        ]
        if len(paragraphs) > 1:
            return paragraphs
        return [line.rstrip() for line in content.splitlines() if line.strip()]