# ------------------------------------------------------------------------------
import re, difflib
# ------------------------------------------------------------------------------
innerDiff = re.compile('' \
'(.*?)')
htmlTag = re.compile('<(?P\w+)( .*?)?>(.*)(?P=tag)>')
# ------------------------------------------------------------------------------
class Merger:
'''This class allows to merge 2 lines of text, each containing inserts and
deletions.'''
# Exception that may be raised by this class if the merge fails.
class MergeError(Exception): pass
def __init__(self, lineA, lineB, previousDiffs, differ):
# lineA comes "naked": any diff previously found on it was removed from
# it (ie, deleted text has been completely removed, while inserted text
# has been included, but without its surrounding tag). Info about
# previous diffs is kept in a separate variable "previousDiffs".
self.lineA = lineA
self.previousDiffs = previousDiffs
# Differences between lineA and lineB have just been computed and are
# included (within inner tags) in lineB. We will compute their position
# in self.newDiffs (see below).
self.lineB = lineB
self.newDiffs = self.computeNewDiffs()
# We choose to walk within self.lineB. We will keep in self.i our
# current position within self.lineB.
self.i = 0
# The delta index that must be applied on previous diffs
self.deltaPrevious = 0
# A link to the caller HtmlDiff class.
self.differ = differ
def computeNewDiffs(self):
'''lineB may include inner "insert" and/or tags. This function
detects them.'''
i = 0
res = []
while i < len(self.lineB):
match = innerDiff.search(self.lineB, i)
if not match: break
res.append(match)
i = match.end()
return res
def getNextDiff(self):
'''During the merging process on self.lineB, what next diff to
"consume"? An old one? A new one?'''
# No more diff ?
if not self.previousDiffs and not self.newDiffs:
return None, None, None
# No more new diff ?
if not self.newDiffs:
diff = self.previousDiffs[0]
del self.previousDiffs[0]
return diff, diff.start() + self.deltaPrevious, True
# No more previous diff ?
if not self.previousDiffs:
diff = self.newDiffs[0]
del self.newDiffs[0]
return diff, diff.start(), False
# At least one more new and previous diff. Which one to consume?
previousDiff = self.previousDiffs[0]
newDiff = self.newDiffs[0]
previousDiffIndex = previousDiff.start() + self.deltaPrevious
newDiffIndex = newDiff.start()
if previousDiffIndex <= newDiffIndex:
# Previous wins
del self.previousDiffs[0]
return previousDiff, previousDiffIndex, True
else:
# New wins
del self.newDiffs[0]
return newDiff, newDiffIndex, False
def manageBackOverlap(self, newDiff, oldText):
'''p_newDiff has been removed from self.lineB. Here we check if there
is no overlap with inserts from self.lineA, ie, text that was
inserted in one if the many cumulative updates from self.lineA and
that was deleted in self.lineB.'''
# Before managing the overlap, check if there is one.
oldDiff, oldDiffStart, isPrevious = self.getNextDiff()
newDiffEnd = self.i + len(newDiff.group(3)) - len(oldText)
if not oldDiff or not isPrevious or (oldDiffStart >= newDiffEnd):
# There is no overlapping. Dump p_newDiff and the next diff as is
# (if any).
res = self.dumpNewDiff(newDiff)
if oldDiff:
# WARNING: oldDiffStart is not up-to-date! Indeed, we have
# called getNextDiff (at the start of this method) BEFORE
# calling dumpNewDiff (the line above). But dumpNewDiff updates
# self.deltaPrevious. So we need to recompute oldDiffStart with
# the current self.deltaPrevious
if isPrevious:
oldDiffStart = oldDiff.start() + self.deltaPrevious
res += self.dumpDiff(oldDiff, oldDiffStart, isPrevious)
return res
# If we are here, we must manage a back overlap. We will do it by
# "consuming" p_newDiff.
newText = newDiff.group(3)
res = ''
consumed = 0
while True:
# First, dump the part of p_newDiff that is not impacted by oldDiff.
text = newText[consumed:oldDiffStart-self.i]
if text:
res += self.differ.getModifiedChunk(text, 'delete', '',
msg=newDiff.group(2))
consumed += len(text)
# Then, dump the part that overlaps with oldDiff
text = oldDiff.group(3)
res += self.differ.getModifiedChunk(text, 'delete', '',
msg=newDiff.group(2))
consumed += len(text)
if consumed >= len(newText): break
# Get the next diff
oldDiff, oldDiffStart, isPrevious = self.getNextDiff()
if not oldDiff or not isPrevious or (oldDiffStart > newDiffEnd):
# End of the overlapping. Dump what remains in newText and dump
# this next uncorrelated diff afterwards.
res += self.differ.getModifiedChunk(newText[consumed:],
'delete', '', msg=newDiff.group(2))
self.i += len(newDiff.group(0))
if oldDiff:
res += self.dumpDiff(oldDiff, oldDiffStart, isPrevious)
return res
# We have consumed p_newDiff entirely. Move forward within self.lineB
# w.r.t p_newDiff.
self.i += len(newDiff.group(0))
return res
def manageOverlap(self, oldDiff):
'''p_oldDiff is a previously inserted text from self.lineA. This text
is not found anymore at the start of self.lineB[self.i:]: it means
that an overlapping diff exists among new diffs. We will manage this
by identifying several, cutted, "insert" and/or "edit" zones.'''
# The idea here is to "consume" the old inserted text until we have
# found, within the new diff, all updates that have been performed on
# this old text. Then, we will have found the complete "zone" that was
# impacted by both old and new diffs.
oldText = oldDiff.group(3)
res = ''
while oldText:
# Get the overlapping (new) diff.
newDiff, newDiffStart, isPrevious = self.getNextDiff()
if not newDiff or (newDiffStart >= (self.i + len(oldText))):
# No more new diff, or a new diff but far away, not within
# oldText. So insert new the rest of p_oldText.
# Invariant: at this point, we should find what remains in
# oldText at self.lineB[self.i:].
if not self.lineB[self.i:].startswith(oldText):
raise self.MergeError('An error occurred while computing ' \
'overlapping diffs.')
res += self.differ.getModifiedChunk(oldText, 'insert', '',
msg=oldDiff.group(2))
self.i += len(oldText)
oldText = ''
# If we have "popped" a new diff, dump it anyway.
if newDiff:
res += self.dumpDiff(newDiff, newDiffStart, isPrevious)
break
# Dump the part of the old text that has been untouched by the new
# diff.
if self.i < newDiffStart:
untouched = self.lineB[self.i:newDiffStart]
res += self.differ.getModifiedChunk(untouched, 'insert', '',
msg=oldDiff.group(2))
self.i = newDiffStart
oldText = oldText[len(untouched):]
# Manage the new diff
if (newDiff.group(1) == 'delete') and \
len(newDiff.group(3)) > len(oldText):
# Among deleted text, check if there is no overlap with previous
# diffs (text deleted in self.lineB, might have been added in
# one of the many cumulated updates in self.lineA).
res += self.manageBackOverlap(newDiff, oldText)
oldText = ''
else:
# Dump the new diff and update oldText
res += self.dumpNewDiff(newDiff)
if newDiff.group(1) == 'delete':
# Consume oldText, that was deleted, at least partly, by
# this diff.
oldText = oldText[len(newDiff.group(3)):]
return res
def dumpNewDiff(self, diff):
'''Computes p_newDiff as it must appear in the result and return it.'''
# Dump the new diff (from self.lineB)
res = diff.group(0)
# Move forward within self.lineB
self.i += len(diff.group(0))
# Because of this new diff, all indexes computed on self.lineA are now
# wrong because we express them relative to lineB. So: update
# self.deltaPrevious to take this into account.
self.deltaPrevious += len(diff.group(0))
if diff.group(1) == 'delete':
# The indexes in self.lineA do not take the deleted text into
# account, because it wasn't deleted at this time. So remove
# from self.deltaPrevious the length of removed text.
self.deltaPrevious -= len(diff.group(3))
return res
def dumpDiff(self, diff, diffStart, isPrevious):
'''Computes the next p_diff (starting at p_diffStart) to insert into the
result and return it. If p_isPrevious is True, the diff is an old one
(from self.lineA); else, it is a new one (from self.lineB).'''
# Dump the part of lineB between self.i and diffStart
res = self.lineB[self.i:diffStart]
self.i = diffStart
if isPrevious:
# Dump the old diff (from self.lineA)
if diff.group(1) == 'insert':
# Check if the inserted text is still present in lineB
if self.lineB[self.i:].startswith(diff.group(3)):
# Yes. Dump the diff and go ahead within lineB
res += diff.group(0)
self.i += len(diff.group(3))
else:
# The inserted text can't be found as is in lineB.
# Must have been (partly) re-edited or removed.
overlap = self.manageOverlap(diff)
res += overlap
elif diff.group(1) == 'delete':
res += diff.group(0)
else:
res += self.dumpNewDiff(diff)
return res
def merge(self):
'''Merges self.previousDiffs into self.lineB.'''
res = ''
diff, diffStart, isPrevious = self.getNextDiff()
while diff:
res += self.dumpDiff(diff, diffStart, isPrevious)
# Load the next diff, if any
diff, diffStart, isPrevious = self.getNextDiff()
# Dump the end of self.lineB if not completely consumed
if self.i < len(self.lineB): res += self.lineB[self.i:]
return res
# ------------------------------------------------------------------------------
class HtmlDiff:
'''This class allows to compute differences between two versions of some
HTML chunk.'''
insertStyle = 'color: blue; cursor: help'
deleteStyle = 'color: red; text-decoration: line-through; cursor: help'
def __init__(self, old, new,
insertMsg=u'Inserted text', deleteMsg=u'Deleted text',
insertCss=None, deleteCss=None, insertName='insert',
deleteName='delete', diffRatio=0.7):
# p_old and p_new are strings containing chunks of HTML. If they are not
# unicode strings, we convert them to unicode; this way, every char is
# only one char lenght.
self.old = old.strip()
if isinstance(self.old, str): self.old = self.old.decode('utf-8')
self.new = new.strip()
if isinstance(self.new, str): self.new = self.new.decode('utf-8')
# Every time an "insert" or "delete" difference will be detected from
# p_old to p_new, the impacted chunk will be surrounded by a tag that
# will get, respectively, a 'title' attribute filled p_insertMsg or
# p_deleteMsg. The message will give an explanation about the change
# (who made it and at what time, for example).
self.insertMsg = insertMsg
if isinstance(self.insertMsg, str):
self.insertMsg = self.insertMsg.decode('utf-8')
self.deleteMsg = deleteMsg
if isinstance(self.deleteMsg, str):
self.deleteMsg = self.deleteMsg.decode('utf-8')
# This tag will get a CSS class p_insertCss or p_deleteCss for
# highlighting the change. If no class is provided, default styles will
# be used (see HtmlDiff.insertStyle and HtmlDiff.deleteStyle).
self.insertCss = insertCss
self.deleteCss = deleteCss
# This tag will get a "name" attribute whose content will be
# p_insertName or p_deleteName
self.insertName = insertName
self.deleteName = deleteName
# The diff algorithm of this class will need to identify similarities
# between strings. Similarity ratios will be computed by using method
# difflib.SequenceMatcher.ratio (see m_isSimilar below). Strings whose
# comparison will produce a ratio above p_diffRatio will be considered
# as similar.
self.diffRatio = diffRatio
# Some computed values
for tag in ('div', 'span'):
for type in ('insert', 'delete'):
setattr(self, '%s%sPrefix' % (tag, type.capitalize()),
'<%s name="%s"' % (tag, getattr(self, '%sName' % type)))
def getModifiedChunk(self, seq, type, sep, msg=None):
'''p_sep.join(p_seq) (if p_seq is a list) or p_seq (if p_seq is a
string) is a chunk that was either inserted (p_type='insert') or
deleted (p_type='delete'). This method will surround this part with
a div or span tag that will get some CSS class allowing to highlight
the update.
If p_msg is given, it will be used instead of the default
p_type-related message stored on p_self.'''
# Will the surrouding tag be a div or a span?
if sep == '\n': tag = 'div'
else: tag = 'span'
# What message will it show in its 'title' attribute?
if not msg:
exec 'msg = self.%sMsg' % type
# What CSS class (or, if none, tag-specific style) will be used ?
exec 'cssClass = self.%sCss' % type
if cssClass:
style = 'class="%s"' % cssClass
else:
exec 'style = self.%sStyle' % type
style = 'style="%s"' % style
# The 'name' attribute of the tag indicates the type of the update.
exec 'tagName = self.%sName' % type
# The idea is: if there are several lines, every line must be surrounded
# by a tag. This way, we know that a surrounding tag can't span several
# lines, which is a prerequisite for managing cumulative diffs.
if sep == ' ':
if not isinstance(seq, basestring):
seq = sep.join(seq)
sep = ''
if isinstance(seq, basestring):
return '%s<%s name="%s" %s title="%s">%s%s>%s' % \
(sep, tag, tagName, style, msg, seq, tag, sep)
else:
res = ''
for line in seq:
res += '%s<%s name="%s" %s title="%s">%s%s>%s' % \
(sep, tag, tagName, style, msg, line, tag, sep)
return res
def applyDiff(self, line, diff):
'''p_diff is a regex containing an insert or delete that was found
within line. This function applies the diff, removing or inserting
the diff into p_line.'''
# Keep content only for "insert" tags.
content = ''
if diff.group(1) == 'insert':
content = diff.group(3)
return line[:diff.start()] + content + line[diff.end():]
def isSimilar(self, s1, s2):
'''Returns True if strings p_s1 and p_s2 can be considered as
similar.'''
# Bypass the similarity algorithm for strings of length==1. Else, it can
# lead to infinite loops between methods getHtmlDiff and getReplacement.
if (len(s1) == 1) and (len(s2) == 1) and (s1 != s2): return False
ratio = difflib.SequenceMatcher(a=s1.lower(), b=s2.lower()).ratio()
return ratio > self.diffRatio
def getLineAndType(self, line):
'''p_line is a string that can already have been surrounded by an
"insert" or "delete" tag. This is what we try to determine here.
This method returns a tuple (type, line, innerDiffs, outerTag),
where "type" can be:
* "insert" if it has already been flagged as inserted;
* "delete" if it has already been flagged as deleted;
* None else;
"line" holds the original parameter p_line, excepted:
* if type="insert". In that case, the surrounding insert tag has been
removed and placed into "outerTag" (a re.MatchObject from regex
innerHtml, see above);
* if inner diff tags (insert or delete) are found. In that case,
- if inner "insert" tags are found, they are removed but their
content is kept;
- if inner "delete" tags are found, they are removed, content
included;
- "innerDiffs" holds the list of re.MatchObject instances
representing the found inner tags.
'''
if line.startswith(self.divDeletePrefix):
return ('delete', line, None, None)
if line.startswith(self.divInsertPrefix):
# Return the line without the surrounding tag.
action = 'insert'
outerTag = htmlTag.match(line)
line = outerTag.group(3)
else:
action = None
outerTag = None
# Replace found inner inserts with their content.
innerDiffs = []
while True:
match = innerDiff.search(line)
if not match: break
# I found one.
innerDiffs.append(match)
line = self.applyDiff(line, match)
return (action, line, innerDiffs, outerTag)
def computeTag(self, regexTag, content):
'''p_regexTag is a re.MatchObject from regex htmlTag. p_content is a
new content to put within this tag. This method produces the new
string tag filled with p_content.'''
# Recompute start tag from p_regexTag
startTag = '<%s' % regexTag.group(1)
# Add tag attributes if found
if regexTag.group(2):
startTag += regexTag.group(2)
startTag += '>'
# Recompute end tag
endTag = '%s>' % regexTag.group(1)
# Wrap content info reified tag
return startTag + content + endTag
def getSeqDiff(self, seqA, seqB, sep):
'''p_seqA and p_seqB are lists of strings. Here we will try to identify
similarities between strings from p_seqA and p_seqB, and return a
list of differences between p_seqA and p_seqB, where each element
is a tuple (action, line).
* If p_action is "delete", "line" is a line of p_seqA considered as
not included anymore in p_seqB;
* If p_action is "insert", "line" is a line of p_seqB considered as
not included in p_seqA;
* If p_action is "replace", "line" is a tuple
(lineA, lineB, previousDiffsA) containing one line from p_seqA and
one from p_seqB considered as similar. "previousDiffsA" contains
potential previous inner diffs that were found (but extracted
from, for comparison purposes) lineA.
'''
res = []
i = j = k = 0
# Scan every string from p_seqA and try to find a similar string in
# p_seqB.
while i < len(seqA):
pastAction, lineA, innerDiffs, outerTag=self.getLineAndType(seqA[i])
if pastAction == 'delete':
# We will consider this line as "equal" because it already has
# been noted as deleted in a previous diff.
res.append( ('equal', seqA[i]) )
elif k == len(seqB):
# We have already "consumed" every string from p_seqB. Remaining
# strings from p_seqA must be considered as deleted (or
# sometimes equal, see above)
if not pastAction: res.append( ('delete', seqA[i]) )
else:
# 'insert': should not happen. The inserted line should also
# be found in seqB.
res.append( ('equal', seqA[i]) )
else:
# Try to find a line in seqB which is similar to lineA.
similarFound = False
for j in range(k, len(seqB)):
if self.isSimilar(lineA, seqB[j]):
similarFound = True
# Strings between indices k and j in p_seqB must be
# considered as inserted, because no similar line exists
# in p_seqA.
if k < j:
for line in seqB[k:j]: res.append(('insert', line))
# Similar strings are appended in a 'replace' entry,
# excepted if lineA is already an insert from a
# previous diff: in this case, we keep the "old"
# version: the new one is the same, but for which we
# don't remember who updated it.
if (pastAction == 'insert') and (lineA == seqB[j]):
res.append( ('equal', seqA[i]) )
else:
res.append(('replace', (lineA, seqB[j],
innerDiffs, outerTag)))
k = j+1
break
if not similarFound: res.append( ('delete', seqA[i]) )
i += 1
# Consider any "unconsumed" line from p_seqB as being inserted.
if k < len(seqB):
for line in seqB[k:]: res.append( ('insert', line) )
# Merge similar diffs, excepted if separator is a carriage return
if sep == '\n': return res
newRes = []
lastType = None
for type, data in res:
if lastType and (type != 'replace') and (lastType == type):
newRes[-1] = (type, newRes[-1][1] + sep + data)
else:
newRes.append( (type, data) )
lastType = type
return newRes
def split(self, s, sep):
'''Splits string p_s with p_sep. If p_sep is a space, the split can't
happen for a leading or trailing space, which must be considered as
being part of the first or last word.'''
# Manage sep == \n
if sep == '\n': return s.split(sep)
leadSpace = s.startswith(sep)
trailSpace = s.endswith(sep)
if not leadSpace and not trailSpace: return s.split(sep)
res = s.strip(sep).split(sep)
if leadSpace: res[0] = sep + res[0]
if trailSpace: res[-1] = res[-1] + sep
return res
garbage = ('', '\r')
def removeGarbage(self, l, sep):
'''Removes from list p_l elements that have no interest, like blank
strings or considered as is. Also: strip lines (ie, if sep is a
carriage return.'''
i = len(l)-1
while i >= 0:
if l[i] in self.garbage: del l[i]
elif sep == '\n': l[i] = l[i].strip()
i -= 1
return l
def getStringDiff(self, old, new):
'''Identifies the differences between strings p_old and p_new by
computing:
* i = the end index of the potential common starting part (if no
common part is found, i=0);
* jo = the start index in p_old of the potential common ending part;
* jn = the start index in p_new of the potential common ending part.
'''
# Compute i
i = -1
diffFound = False
while not diffFound:
i += 1
if (i == len(old)) or (i == len(new)): break
if old[i] != new[i]: diffFound = True
# i can't be inside an HTML tag.
if (i > 0) and (old[i-1] == '<'): i -= 1
# Compute jo and jn
jo = len(old)
jn = len(new)
diffFound = False
while not diffFound:
if (jo == i) or (jn == i):
# We have reached the end of substring old[i:] or new[i:]
jo -=1
jn -= 1
break
jo -= 1
jn -= 1
if old[jo] != new[jn]: diffFound=True
return i, jo+1, jn+1
def getDumpPrefix(self, res, add, previousAdd, sep):
'''In most cases, when concatenating the next diff (p_add) to the
global result (p_res), I must prefix it with p_sep (excepted if p_res
is still empty). But when p_sep is a space, no space must be inserted
between 2 adjacent updates (p_add and p_previousAdd), because such a
space was not in the original version. This method computes the
prefix, that can thus be empty if this latter case is met.'''
prefix = ''
if not res: return prefix
if (sep == ' ') and previousAdd and \
previousAdd.endswith('') and add.startswith('