# original wikiq headers are: title articleid revid date_time anon
# editor editor_id minor text_size text_entropy text_md5 reversion
# additions_size deletions_size
import argparse
import sys
import os.path
import re

from datetime import datetime, timezone

from subprocess import Popen, PIPE
from collections import deque
from hashlib import sha1

from mwxml import Dump

from deltas.tokenizers import wikitext_split

import mwpersistence
import mwreverts

from urllib.parse import quote
TO_ENCODE = ('title', 'editor')

from deltas import SequenceMatcher
from deltas import SegmentMatcher

import dataclasses as dc
from dataclasses import dataclass

import pyarrow as pa
import pyarrow.parquet as pq


class PersistMethod:
    none = 0
    sequence = 1
    segment = 2
    legacy = 3

# size of the sliding window of revisions used for persistence measures
PERSISTENCE_RADIUS = 7
def calculate_persistence(tokens_added):
    return(sum([(len(x.revisions)-1) for x in tokens_added]),
           len(tokens_added))
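# Worked example (hypothetical values): calculate_persistence counts
# len(x.revisions) - 1 persisting revisions per added token. For two tokens
# whose .revisions lists have lengths 4 and 1, it returns (3 + 0, 2): three
# persistent token revisions across two tokens added.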
class WikiqIterator():
    def __init__(self, fh, collapse_user=False):
        self.fh = fh
        self.collapse_user = collapse_user
        self.mwiterator = Dump.from_file(self.fh)
        self.namespace_map = {ns.id: ns.name for ns in
                              self.mwiterator.site_info.namespaces}
        self.__pages = self.load_pages()

    def load_pages(self):
        for page in self.mwiterator:
            yield WikiqPage(page,
                            namespace_map=self.namespace_map,
                            collapse_user=self.collapse_user)

    def __iter__(self):
        return self.__pages

    def __next__(self):
        return next(self.__pages)
class WikiqPage():
    __slots__ = ('id', 'title', 'namespace', 'redirect',
                 'restrictions', 'mwpage', '__revisions',
                 'collapse_user')

    def __init__(self, page, namespace_map, collapse_user=False):
        self.id = page.id
        self.namespace = page.namespace
        # following mwxml, we assume namespace 0 in cases where
        # page.namespace is inconsistent with namespace_map
        if page.namespace not in namespace_map:
            self.title = page.title
        else:
            if page.namespace != 0:
                self.title = ':'.join([namespace_map[page.namespace], page.title])
            else:
                self.title = page.title
        self.restrictions = page.restrictions
        self.collapse_user = collapse_user
        self.mwpage = page
        self.__revisions = self.rev_list()
    def rev_list(self):
        # Outline for how we want to handle collapse_user=True
        # iteration    rev.user    prev_rev.user    add prev_rev?
        for i, rev in enumerate(self.mwpage):
            # never yield the first time
            if i == 0:
                if self.collapse_user:
                    collapsed_revs = 1
                    rev.collapsed_revs = collapsed_revs
            else:
                if self.collapse_user:
                    # yield if this is the last edit in a sequence by a user, and reset;
                    # also yield if we don't know who the user is
                    if rev.deleted.user or prev_rev.deleted.user:
                        yield prev_rev
                        collapsed_revs = 1
                        rev.collapsed_revs = collapsed_revs
                    elif not rev.user.text == prev_rev.user.text:
                        yield prev_rev
                        collapsed_revs = 1
                        rev.collapsed_revs = collapsed_revs
                    # otherwise, add one to the counter
                    else:
                        collapsed_revs += 1
                        rev.collapsed_revs = collapsed_revs
                # if collapse_user is false, we always yield
                else:
                    yield prev_rev
            prev_rev = rev

        # also yield the final time
        yield prev_rev

    def __iter__(self):
        return self.__revisions

    def __next__(self):
        return next(self.__revisions)
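# Collapsing sketch (hypothetical editor sequence): with collapse_user=True, a
# page whose revisions were made by editors A, A, B, A yields three revisions
# from rev_list: the second A-edit (collapsed_revs=2), the B-edit
# (collapsed_revs=1), and the final A-edit (collapsed_revs=1).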
"""
A RegexPair is defined by a regular expression (pattern) and a label.
The pattern can include capture groups. If it does, then each capture group
will have a resulting column in the output. If the pattern does not include
a capture group, then only one output column will result. See the usage
sketch after this class.
"""
class RegexPair(object):
    def __init__(self, pattern, label):
        self.pattern = re.compile(pattern)
        self.label = label
        self.has_groups = bool(self.pattern.groupindex)
        if self.has_groups:
            self.capture_groups = list(self.pattern.groupindex.keys())

    def get_pyarrow_fields(self):
        if self.has_groups:
            fields = [pa.field(self._make_key(cap_group), pa.list_(pa.string()))
                      for cap_group in self.capture_groups]
        else:
            fields = [pa.field(self.label, pa.list_(pa.string()))]

        return fields

    def _make_key(self, cap_group):
        return "{}_{}".format(self.label, cap_group)
    def matchmake(self, content, rev_data):
        temp_dict = {}
        # if there are named capture groups in the regex
        if self.has_groups:
            # if there are matches of some sort in this revision content, fill the lists for each cap_group
            if self.pattern.search(content) is not None:
                m = self.pattern.finditer(content)
                matchobjects = list(m)

                for cap_group in self.capture_groups:
                    key = self._make_key(cap_group)
                    temp_list = []
                    for match in matchobjects:
                        # we only want to add the match for the capture group if the match is not None
                        if match.group(cap_group) is not None:
                            temp_list.append(match.group(cap_group))

                    # if temp_list of matches is empty just make that column None
                    if len(temp_list) == 0:
                        temp_dict[key] = None
                    # else we put in the list we made in the for-loop above
                    else:
                        temp_dict[key] = ', '.join(temp_list)

            # there are no matches at all in this revision content, so we default values to None
            else:
                for cap_group in self.capture_groups:
                    key = self._make_key(cap_group)
                    temp_dict[key] = None

        else:
            # there are no capture groups, so we just search for all the matches of the regex,
            # given that there are matches to be made
            if isinstance(content, (str, bytes)):
                if self.pattern.search(content) is not None:
                    m = self.pattern.findall(content)
                    temp_dict[self.label] = ', '.join(m)
                else:
                    temp_dict[self.label] = None

        # update rev_data with our new columns
        for k, v in temp_dict.items():
            setattr(rev_data, k, v)

        return rev_data
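# Usage sketch (hypothetical pattern and label, shown for illustration only):
#
#   rp = RegexPair(r"\[\[(?P<link>[^\]|]+)", "wikilink")
#   rp.get_pyarrow_fields()   # one list-of-string field named "wikilink_link"
#   rp._make_key("link")      # -> "wikilink_link"
#
# rp.matchmake("see [[Foo]] and [[Bar]]", rev_data) would then set
# rev_data.wikilink_link to "Foo, Bar".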
"""
We used to use a dictionary to collect fields for the output.
Now we use dataclasses. Compared to a dictionary, this should help:
- make it easier to output parquet data.
- use class attribute '.' syntax instead of dictionary syntax.
- improve support for tooling (autocomplete, type hints)
- use type information to define formatting rules

Depending on the parameters passed into Wikiq, the output schema can be different.
Therefore, we need to end up constructing a dataclass with the correct output schema.
It also needs to have the correct pyarrow schema so we can write parquet files.

The RevDataBase type has all the fields that will be output no matter how wikiq is invoked.
"""
@dataclass()
class RevDataBase():
    revid: int
    date_time: datetime
    articleid: int
    editorid: int
    title: str
    namespace: int
    deleted: bool
    text_chars: int = None
    revert: bool = None
    reverteds: list[int] = None
    sha1: str = None
    minor: bool = None
    editor: str = None
    anon: bool = None

    # toggles url encoding. this isn't a dataclass field since it doesn't have a type annotation
    urlencode = False

    # defines the pyarrow schema.
    # each field in the dataclass needs an entry in this array.
    # the names should match and be in the same order.
    # this isn't a dataclass field since it doesn't have a type annotation
    pa_schema_fields = [
        pa.field("revid", pa.int64()),
        pa.field("date_time", pa.timestamp('ms')),
        pa.field("articleid", pa.int64()),
        pa.field("editorid", pa.int64()),
        pa.field("title", pa.string()),
        pa.field("namespace", pa.int32()),
        pa.field("deleted", pa.bool_()),
        pa.field("text_chars", pa.int32()),
        pa.field("revert", pa.bool_()),
        pa.field("reverteds", pa.list_(pa.int64())),
        pa.field("sha1", pa.string()),
        pa.field("minor", pa.bool_()),
        pa.field("editor", pa.string()),
        pa.field("anon", pa.bool_())
    ]
    # pyarrow is a columnar format, so most of the work happens in the flush_parquet_buffer function
    def to_pyarrow(self):
        return dc.astuple(self)
    # logic to convert each field into the wikiq tsv format goes here.
    def to_tsv_row(self):
        row = []
        for f in dc.fields(self):
            val = getattr(self, f.name)
            if getattr(self, f.name) is None:
                row.append("")
            elif f.type == bool:
                row.append("TRUE" if val else "FALSE")
            elif f.type == datetime:
                row.append(val.strftime('%Y-%m-%d %H:%M:%S'))
            elif f.name in {'editor', 'title'}:
                s = '"' + val + '"'
                if self.urlencode and f.name in TO_ENCODE:
                    row.append(quote(str(s)))
                else:
                    row.append(s)
            elif f.type == list[int]:
                row.append('"' + ",".join([str(x) for x in val]) + '"')
            elif f.type == str:
                if self.urlencode and f.name in TO_ENCODE:
                    row.append(quote(str(val)))
                else:
                    row.append(val)
            else:
                row.append(val)

        return '\t'.join(map(str, row))
    def header_row(self):
        return '\t'.join(map(lambda f: f.name, dc.fields(self)))
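# Formatting sketch (hypothetical values): a revision with revert=True and
# reverteds=[1, 2] is rendered by to_tsv_row as the cells TRUE and "1,2"
# (the list quoted and comma-joined), while a None field becomes an empty
# cell. header_row() yields the tab-separated field names in dataclass
# order, i.e. "revid", "date_time", "articleid", and so on.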
"""
If collapse=True we'll use a RevDataCollapse dataclass.
This class inherits from RevDataBase. This means that it has all the same fields and functions.

It just adds a new field and updates the pyarrow schema.
"""
@dataclass()
class RevDataCollapse(RevDataBase):
    collapsed_revs: int = None

    pa_collapsed_revs_schema = pa.field('collapsed_revs', pa.int64())
    pa_schema_fields = RevDataBase.pa_schema_fields + [pa_collapsed_revs_schema]
"""
If persistence data is to be computed we'll need the fields added by RevDataPersistence.
"""
@dataclass()
class RevDataPersistence(RevDataBase):
    token_revs: int = None
    tokens_added: int = None
    tokens_removed: int = None
    tokens_window: int = None

    pa_persistence_schema_fields = [
        pa.field("token_revs", pa.int64()),
        pa.field("tokens_added", pa.int64()),
        pa.field("tokens_removed", pa.int64()),
        pa.field("tokens_window", pa.int64())]

    pa_schema_fields = RevDataBase.pa_schema_fields + pa_persistence_schema_fields
"""
RevDataCollapsePersistence uses multiple inheritance to make a class that has both persistence and collapse fields.
"""
@dataclass()
class RevDataCollapsePersistence(RevDataCollapse, RevDataPersistence):
    pa_schema_fields = RevDataCollapse.pa_schema_fields + RevDataPersistence.pa_persistence_schema_fields
class WikiqParser():
    def __init__(self, input_file, output_file,
                 regex_match_revision, regex_match_comment,
                 regex_revision_label, regex_comment_label,
                 collapse_user=False, persist=None, urlencode=False,
                 namespaces=None, revert_radius=15,
                 output_parquet=True, parquet_buffer_size=2000):
        """
        persist : what persistence method to use. Takes a PersistMethod value.
        """
        self.input_file = input_file

        self.collapse_user = collapse_user
        self.persist = persist
        self.urlencode = urlencode
        self.revert_radius = revert_radius

        if namespaces is not None:
            self.namespace_filter = set(namespaces)
        else:
            self.namespace_filter = None

        self.regex_schemas = []
        self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
        self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
        # This is where we set the type for revdata.
        if self.collapse_user is True:
            if self.persist == PersistMethod.none:
                revdata_type = RevDataCollapse
            else:
                revdata_type = RevDataCollapsePersistence
        elif self.persist != PersistMethod.none:
            revdata_type = RevDataPersistence
        else:
            revdata_type = RevDataBase

        # if there are regex fields, we need to add them to the revdata type.
        regex_fields = [(field.name, list[str], dc.field(default=None)) for field in self.regex_schemas]

        # make_dataclass is a function that defines a new dataclass type.
        # here we extend the type we have already chosen and add the regular expression types
        self.revdata_type = dc.make_dataclass('RevData_Parser',
                                              fields=regex_fields,
                                              bases=(revdata_type,))

        # we also need to make sure that we have the right pyarrow schema
        self.revdata_type.pa_schema_fields = revdata_type.pa_schema_fields + self.regex_schemas

        self.revdata_type.urlencode = self.urlencode

        self.schema = pa.schema(self.revdata_type.pa_schema_fields)
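        # Illustrative sketch (hypothetical invocation): with collapse_user=True,
        # persist=PersistMethod.none, and one revision regex labeled "wikilink"
        # with capture group "link", revdata_type is RevDataCollapse and the
        # generated RevData_Parser gains a list[str] field "wikilink_link", so
        # the final schema is RevDataBase's fields + collapsed_revs + wikilink_link.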
        # here we initialize the variables we need for output.
        if output_parquet is True:
            self.output_parquet = True
            self.pq_writer = None
            self.output_file = output_file
            self.parquet_buffer = []
            self.parquet_buffer_size = parquet_buffer_size
        else:
            self.print_header = True
            if output_file == sys.stdout:
                self.output_file = output_file
            else:
                self.output_file = open(output_file, 'w')
            self.output_parquet = False
    def make_matchmake_pairs(self, patterns, labels):
        if (patterns is not None and labels is not None) and \
           (len(patterns) == len(labels)):
            result = []
            for pattern, label in zip(patterns, labels):
                rp = RegexPair(pattern, label)
                result.append(rp)
                self.regex_schemas = self.regex_schemas + rp.get_pyarrow_fields()
            return result
        elif (patterns is None and labels is None):
            return []
        else:
            sys.exit('Each regular expression *must* come with a corresponding label and vice versa.')
    def matchmake_revision(self, rev, rev_data):
        rev_data = self.matchmake_text(rev.text, rev_data)
        rev_data = self.matchmake_comment(rev.comment, rev_data)
        return rev_data

    def matchmake_text(self, text, rev_data):
        return self.matchmake_pairs(text, rev_data, self.regex_revision_pairs)

    def matchmake_comment(self, comment, rev_data):
        return self.matchmake_pairs(comment, rev_data, self.regex_comment_pairs)

    def matchmake_pairs(self, text, rev_data, pairs):
        for pair in pairs:
            rev_data = pair.matchmake(text, rev_data)
        return rev_data
    def __get_namespace_from_title(self, title):
        default_ns = None

        for ns in self.namespaces:
            # skip if the namespace is not defined
            if ns is None:
                default_ns = self.namespaces[ns]
                continue

            if title.startswith(ns + ":"):
                return self.namespaces[ns]

        # if we've made it this far with no matches, we return the default namespace
        return default_ns
    def process(self):
        # create a regex that creates the output filename
        # output_filename = re.sub(r'^.*/(enwiki\-\d+)\-.*p(\d+)p.*$',
        #                          r'output/wikiq-\1-\2.tsv',
        #                          input_filename)

        # Construct dump file iterator
        dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user)

        # extract list of namespaces
        self.namespaces = {ns.name: ns.id for ns in dump.mwiterator.site_info.namespaces}

        page_count = 0
        rev_count = 0
        # Iterate through pages
        for page in dump:
            namespace = page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title)

            # skip namespaces not in the filter
            if self.namespace_filter is not None:
                if namespace not in self.namespace_filter:
                    continue
            rev_detector = mwreverts.Detector(radius=self.revert_radius)

            if self.persist != PersistMethod.none:
                window = deque(maxlen=PERSISTENCE_RADIUS)

                if self.persist == PersistMethod.sequence:
                    state = mwpersistence.DiffState(SequenceMatcher(tokenizer=wikitext_split),
                                                    revert_radius=PERSISTENCE_RADIUS)

                elif self.persist == PersistMethod.segment:
                    state = mwpersistence.DiffState(SegmentMatcher(tokenizer=wikitext_split),
                                                    revert_radius=PERSISTENCE_RADIUS)

                else:
                    # self.persist == PersistMethod.legacy
                    from mw.lib import persistence
                    state = persistence.State()
            # Iterate through a page's revisions
            for rev in page:
                # create a new data object instead of a dictionary.
                # editorid is an int64 column, so use None (not "") when the user is deleted or anonymous.
                rev_data = self.revdata_type(revid=rev.id,
                                             date_time=datetime.fromtimestamp(rev.timestamp.unix(), tz=timezone.utc),
                                             articleid=page.id,
                                             editorid=None if rev.deleted.user or rev.user.id is None else rev.user.id,
                                             title=page.title,
                                             deleted=rev.deleted.text,
                                             namespace=namespace
                                             )
                rev_data = self.matchmake_revision(rev, rev_data)

                if not rev.deleted.text:
                    # rev.text can be None if the page has no text
                    if not rev.text:
                        rev.text = ""
                    # if text exists, we'll check for a sha1 and generate one otherwise
                    if rev.sha1:
                        text_sha1 = rev.sha1
                    else:
                        text_sha1 = sha1(bytes(rev.text, "utf8")).hexdigest()

                    rev_data.sha1 = text_sha1

                    # TODO rev.bytes doesn't work.. looks like a bug
                    rev_data.text_chars = len(rev.text)

                    # generate revert data
                    revert = rev_detector.process(text_sha1, rev.id)

                    if revert:
                        rev_data.revert = True
                        rev_data.reverteds = revert.reverteds
                    else:
                        rev_data.revert = False

                # if the fact that the edit was minor can be hidden, this might be an issue
                rev_data.minor = rev.minor

                if not rev.deleted.user:
                    # wrap user-defined editors in quotes for fread
                    rev_data.editor = rev.user.text
                    rev_data.anon = rev.user.id is None
                # if re.match(r'^#redirect \[\[.*\]\]', rev.text, re.I):

                # TODO missing: additions_size deletions_size
                # if collapse user was on, lets run that
                if self.collapse_user:
                    rev_data.collapsed_revs = rev.collapsed_revs

                # if persistence is enabled, accumulate the window and print the oldest revision once it fills
                if self.persist != PersistMethod.none:
                    if not rev.deleted.text:

                        if self.persist != PersistMethod.legacy:
                            _, tokens_added, tokens_removed = state.update(rev.text, rev.id)
                        else:
                            _, tokens_added, tokens_removed = state.process(rev.text, rev.id, text_sha1)

                        window.append((rev.id, rev_data, tokens_added, tokens_removed))

                        if len(window) == PERSISTENCE_RADIUS:
                            old_rev_id, old_rev_data, old_tokens_added, old_tokens_removed = window[0]

                            num_token_revs, num_tokens = calculate_persistence(old_tokens_added)

                            old_rev_data.token_revs = num_token_revs
                            old_rev_data.tokens_added = num_tokens
                            old_rev_data.tokens_removed = len(old_tokens_removed)
                            old_rev_data.tokens_window = PERSISTENCE_RADIUS - 1

                            self.print_rev_data(old_rev_data)

                else:
                    self.print_rev_data(rev_data)

                rev_count += 1
            if self.persist != PersistMethod.none:
                # print out metadata for the last RADIUS revisions
                for i, item in enumerate(window):
                    # if the window was full, we've already printed item 0
                    if len(window) == PERSISTENCE_RADIUS and i == 0:
                        continue

                    rev_id, rev_data, tokens_added, tokens_removed = item
                    num_token_revs, num_tokens = calculate_persistence(tokens_added)

                    rev_data.token_revs = num_token_revs
                    rev_data.tokens_added = num_tokens
                    rev_data.tokens_removed = len(tokens_removed)
                    rev_data.tokens_window = len(window) - (i + 1)
                    self.print_rev_data(rev_data)

            page_count += 1
614 print("Done: %s revisions and %s pages." % (rev_count, page_count),
617 # remember to flush the parquet_buffer if we're done
618 if self.output_parquet is True:
619 self.flush_parquet_buffer()
620 self.pq_writer.close()
623 self.output_file.close()
    """
    For performance reasons it's better to write parquet in batches instead of one row at a time.
    So this function just puts the data on a buffer. If the buffer is full, then it gets flushed (written).
    """
    def write_parquet_row(self, rev_data):
        padata = rev_data.to_pyarrow()
        self.parquet_buffer.append(padata)

        if len(self.parquet_buffer) >= self.parquet_buffer_size:
            self.flush_parquet_buffer()
    """
    Function that actually writes data to the parquet file.
    It needs to transpose the data from row-by-row to column-by-column.
    """
    def flush_parquet_buffer(self):
        """
        Returns the pyarrow table that we'll write.
        """
        def rows_to_table(rg, schema):
            cols = [[] for _ in range(len(schema.names))]
            for row in rg:
                for j in range(len(cols)):
                    cols[j].append(row[j])

            arrays = []
            for col, typ in zip(cols, schema.types):
                arrays.append(pa.array(col, typ))
            return pa.Table.from_arrays(arrays, schema=schema)
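        # Worked example (hypothetical rows): given rows [(1, 'a'), (2, 'b')]
        # and a two-field schema (int64, string), rows_to_table first builds
        # the columns [1, 2] and ['a', 'b'], then wraps each as a pyarrow
        # array before assembling the Table.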
        outtable = rows_to_table(self.parquet_buffer, self.schema)
        if self.pq_writer is None:
            self.pq_writer = pq.ParquetWriter(self.output_file, self.schema, flavor='spark')

        self.pq_writer.write_table(outtable)
        self.parquet_buffer = []
    # depending on whether we are configured to write tsv or parquet, we'll call a different function.
    def print_rev_data(self, rev_data):
        if self.output_parquet is False:
            printfunc = self.write_tsv_row
        else:
            printfunc = self.write_parquet_row

        printfunc(rev_data)
    def write_tsv_row(self, rev_data):
        if self.print_header:
            print(rev_data.header_row(), file=self.output_file)
            self.print_header = False

        line = rev_data.to_tsv_row()
        print(line, file=self.output_file)
def open_input_file(input_filename):
    if re.match(r'.*\.7z$', input_filename):
        cmd = ["7za", "x", "-so", input_filename, "*.xml"]
    elif re.match(r'.*\.gz$', input_filename):
        cmd = ["zcat", input_filename]
    elif re.match(r'.*\.bz2$', input_filename):
        cmd = ["bzcat", "-dk", input_filename]

    try:
        input_file = Popen(cmd, stdout=PIPE).stdout
    except NameError:
        # no decompression command matched, so treat the input as plain XML
        input_file = open(input_filename, 'r')

    return input_file
def get_output_filename(input_filename, parquet=False):
    output_filename = re.sub(r'\.(7z|gz|bz2)?$', '', input_filename)
    output_filename = re.sub(r'\.xml$', '', output_filename)
    if parquet is False:
        output_filename = output_filename + ".tsv"
    else:
        output_filename = output_filename + ".parquet"
    return output_filename
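# Worked example (hypothetical filename): "dumps/enwiki-20200101.xml.bz2"
# becomes "dumps/enwiki-20200101.tsv" with parquet=False, or
# "dumps/enwiki-20200101.parquet" with parquet=True.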
def open_output_file(input_filename):
    # derive the output filename from the input filename
    output_filename = get_output_filename(input_filename, parquet=False)
    output_file = open(output_filename, "w")

    return output_file
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab-delimited data.')

# arguments for the input direction
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
                    help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")

parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
                    help="Directory for output files. If it ends with .parquet, output will be in parquet format.")

parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
                    help="Write output to standard out (do not create dump file)")

parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
                    help="Operate only on the final revision made by a user within each sequence of consecutive edits by that user. This can be useful for addressing issues with text persistence measures.")

parser.add_argument('-p', '--persistence', dest="persist", default=None, const='', type=str,
                    choices=['', 'segment', 'sequence', 'legacy'], nargs='?',
                    help="Compute and report measures of content persistence: (1) persistent token revisions, (2) tokens added, and (3) number of revisions used in computing the first measure. This may be slow. The default is -p=sequence, which uses the same algorithm as in the past, but with improvements to wikitext parsing. Use -p=legacy for the old behavior used in older research projects. Use -p=segment for an advanced persistence calculation method that is robust to content moves, but prone to bugs, and slower.")

parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
                    help="Output url-encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")

parser.add_argument('-n', '--namespace-include', dest="namespace_filter", type=int, action='append',
                    help="Id number of namespace to include. Can be specified more than once.")
parser.add_argument('-rr',
                    '--revert-radius',
                    dest="revert_radius",
                    type=int,
                    action='store',
                    default=15,
                    help="Number of edits to check when looking for reverts (default: 15)")
parser.add_argument('-RP', '--revision-pattern', dest="regex_match_revision", default=None, type=str, action='append',
                    help="The regular expression to search for in revision text. The regex must be surrounded by quotes.")

parser.add_argument('-RPl', '--revision-pattern-label', dest="regex_revision_label", default=None, type=str, action='append',
                    help="The label for the output column based on matching the regex in revision text.")

parser.add_argument('-CP', '--comment-pattern', dest="regex_match_comment", default=None, type=str, action='append',
                    help="The regular expression to search for in comments of revisions.")

parser.add_argument('-CPl', '--comment-pattern-label', dest="regex_comment_label", default=None, type=str, action='append',
                    help="The label for the output column based on matching the regex in comments.")

args = parser.parse_args()
# set persistence method
if args.persist is None:
    persist = PersistMethod.none
elif args.persist == "segment":
    persist = PersistMethod.segment
elif args.persist == "legacy":
    persist = PersistMethod.legacy
else:
    persist = PersistMethod.sequence

if args.namespace_filter is not None:
    namespaces = args.namespace_filter
else:
    namespaces = None
if len(args.dumpfiles) > 0:
    output_parquet = False
    for filename in args.dumpfiles:
        input_file = open_input_file(filename)

        # open directory for output
        if args.output_dir:
            output_dir = args.output_dir[0]
        else:
            output_dir = "."

        if output_dir.endswith(".parquet"):
            output_parquet = True

        print("Processing file: %s" % filename, file=sys.stderr)

        if args.stdout:
            output_file = sys.stdout
        else:
            filename = os.path.join(output_dir, os.path.basename(filename))
            output_file = get_output_filename(filename, parquet=output_parquet)
        wikiq = WikiqParser(input_file,
                            output_file,
                            collapse_user=args.collapse_user,
                            persist=persist,
                            urlencode=args.urlencode,
                            namespaces=namespaces,
                            revert_radius=args.revert_radius,
                            regex_match_revision=args.regex_match_revision,
                            regex_revision_label=args.regex_revision_label,
                            regex_match_comment=args.regex_match_comment,
                            regex_comment_label=args.regex_comment_label,
                            output_parquet=output_parquet)

        wikiq.process()

        # close things
        input_file.close()
else:
    wikiq = WikiqParser(sys.stdin,
                        sys.stdout,
                        collapse_user=args.collapse_user,
                        persist=persist,
                        # persist_legacy=args.persist_legacy,
                        urlencode=args.urlencode,
                        namespaces=namespaces,
                        revert_radius=args.revert_radius,
                        regex_match_revision=args.regex_match_revision,
                        regex_revision_label=args.regex_revision_label,
                        regex_match_comment=args.regex_match_comment,
                        regex_comment_label=args.regex_comment_label)

    wikiq.process()
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")