2 from itertools import chain
5 from ...types import Timestamp
6 from ...util import none_or
7 from .dummy_checksum import DummyChecksum
8 from .functions import detect
10 HEX = "1234567890abcdef"
13 return ''.join(random.choice(HEX) for i in range(40))
16 Simple constant used in order to not do weird things with a dummy revision.
20 def check_row(db, rev_row, **kwargs):
22 Checks whether a revision (database row) was reverted (identity) and returns
23 a named tuple of Revert(reverting, reverteds, reverted_to).
26 db : :class:`mw.database.DB`
27 A database connection to make use of.
29 a revision row containing 'rev_id' and 'rev_page' or 'page_id'
31 a positive integer indicating the the maximum number of revisions that can be reverted
33 should the archive table be checked for reverting revisions?
35 if set, limits the search for *reverting* revisions to those which were saved before this timestamp
38 # extract rev_id, sha1, page_id
39 if 'rev_id' in rev_row:
40 rev_id = rev_row['rev_id']
42 raise TypeError("rev_row must have 'rev_id'")
43 if 'page_id' in rev_row:
44 page_id = rev_row['page_id']
45 elif 'rev_page' in rev_row:
46 page_id = rev_row['rev_page']
48 raise TypeError("rev_row must have 'page_id' or 'rev_page'")
50 # run the regular check
51 return check(db, rev_id, page_id=page_id, **kwargs)
54 def check(db, rev_id, page_id=None, radius=defaults.RADIUS, check_archive=False,
55 before=None, window=None):
58 Checks whether a revision was reverted (identity) and returns a named tuple
59 of Revert(reverting, reverteds, reverted_to).
63 A database connection to make use of.
65 the ID of the revision to check
67 the ID of the page the revision occupies (slower if not provided)
69 a positive integer indicating the maximum number of revisions that can be reverted
71 should the archive table be checked for reverting revisions?
73 if set, limits the search for *reverting* revisions to those which were saved before this timestamp
75 if set, limits the search for *reverting* revisions to those which
76 were saved within `window` seconds after the reverted edit
79 if not hasattr(db, "revisions") and hasattr(db, "all_revisions"):
80 raise TypeError("db wrong type. Expected a mw.database.DB.")
85 raise TypeError("invalid radius. Expected a positive integer.")
86 page_id = none_or(page_id, int)
87 check_archive = bool(check_archive)
88 before = none_or(before, Timestamp)
90 # If we are searching the archive, we'll need to use `all_revisions`.
92 dbrevs = db.all_revisions
96 # If we don't have the sha1 or page_id, we're going to need to look them up
98 row = dbrevs.get(rev_id=rev_id)
99 page_id = row['rev_page']
101 # Load history and current rev
102 current_and_past_revs = list(dbrevs.query(
105 before_id=rev_id + 1, # Ensures that we capture the current revision
110 # Extract current rev and reorder history
111 current_rev, past_revs = (
112 current_and_past_revs[0], # Current rev is the first one returned
113 reversed(current_and_past_revs[1:]) # The rest are past revs, but they are in the wrong order
116 # Only way to get here is if there isn't enough history. Couldn't be
117 # reverted. Just return None.
120 if window is not None and before is None:
121 before = Timestamp(current_rev['rev_timestamp']) + window
123 # Load future revisions
124 future_revs = dbrevs.query(
132 # Convert to an iterable of (checksum, rev) pairs for detect() to consume
133 checksum_revisions = chain(
134 ((rev['rev_sha1'] if rev['rev_sha1'] is not None \
135 else DummyChecksum(), rev)
136 for rev in past_revs),
137 [(current_rev['rev_sha1'] or DummyChecksum(), current_rev)],
138 ((rev['rev_sha1'] if rev['rev_sha1'] is not None \
139 else DummyChecksum(), rev)
140 for rev in future_revs)
143 for revert in detect(checksum_revisions, radius=radius):
144 # Check that this is a relevant revert
145 if rev_id in [rev['rev_id'] for rev in revert.reverteds]: