]> code.communitydata.science - mediawiki_dump_tools.git/blob - wikiq
initial work on parquet support
[mediawiki_dump_tools.git] / wikiq
1 #!/usr/bin/env python3
2
3 # original wikiq headers are: title articleid revid date_time anon
4 # editor editor_id minor text_size text_entropy text_md5 reversion
5 # additions_size deletions_size
6
7 import argparse
8 import sys
9 import os, os.path
10 import re
11
12 from subprocess import Popen, PIPE
13 from collections import deque
14 from hashlib import sha1
15
16 from mwxml import Dump
17
18 from deltas.tokenizers import wikitext_split
19 import mwpersistence
20 import mwreverts
21 from urllib.parse import quote
22 TO_ENCODE = ('title', 'editor')
23 PERSISTENCE_RADIUS=7
24 from deltas import SequenceMatcher
25 from deltas import SegmentMatcher
26
27 from dataclasses import dataclass
28 import pandas as pd
29 import pyarrow as pa
30 import pyarrow.parquet as pq
31
32 from typing import List
33
34 class PersistMethod:
35     none = 0
36     sequence = 1
37     segment = 2
38     legacy = 3
39
40 def calculate_persistence(tokens_added):
41     return(sum([(len(x.revisions)-1) for x in tokens_added]),
42            len(tokens_added))
43
44
45 class WikiqIterator():
46     def __init__(self, fh, collapse_user=False):
47         self.fh = fh
48         self.collapse_user = collapse_user
49         self.mwiterator = Dump.from_file(self.fh)
50         self.namespace_map = { ns.id : ns.name for ns in
51                                self.mwiterator.site_info.namespaces }
52         self.__pages = self.load_pages()
53
54     def load_pages(self):
55         for page in self.mwiterator:
56             yield WikiqPage(page,
57                             namespace_map = self.namespace_map,
58                             collapse_user=self.collapse_user)
59
60     def __iter__(self):
61         return self.__pages
62
63     def __next__(self):
64         return next(self._pages)
65
66 class WikiqPage():
67     __slots__ = ('id', 'title', 'namespace', 'redirect',
68                  'restrictions', 'mwpage', '__revisions',
69                  'collapse_user')
70     
71     def __init__(self, page, namespace_map, collapse_user=False):
72         self.id = page.id
73         self.namespace = page.namespace
74         # following mwxml, we assume namespace 0 in cases where
75         # page.namespace is inconsistent with namespace_map
76         if page.namespace not in namespace_map:
77             self.title = page.title
78             page.namespace = 0
79         if page.namespace != 0:
80             self.title = ':'.join([namespace_map[page.namespace], page.title])
81         else:
82             self.title = page.title
83         self.restrictions = page.restrictions
84         self.collapse_user = collapse_user
85         self.mwpage = page
86         self.__revisions = self.rev_list()
87
88     def rev_list(self):
89         # Outline for how we want to handle collapse_user=True
90         # iteration   rev.user   prev_rev.user   add prev_rev?
91         #         0          A            None           Never
92         #         1          A               A           False
93         #         2          B               A            True
94         #         3          A               B            True
95         #         4          A               A           False
96         # Post-loop                          A          Always
97         for i, rev in enumerate(self.mwpage):
98             # never yield the first time
99             if i == 0:
100                 if self.collapse_user: 
101                     collapsed_revs = 1
102                     rev.collapsed_revs = collapsed_revs
103
104             else:
105                 if self.collapse_user:
106                     # yield if this is the last edit in a seq by a user and reset
107                     # also yield if we do know who the user is
108
109                     if rev.deleted.user or prev_rev.deleted.user:
110                         yield prev_rev
111                         collapsed_revs = 1
112                         rev.collapsed_revs = collapsed_revs
113
114                     elif not rev.user.text == prev_rev.user.text:
115                         yield prev_rev
116                         collapsed_revs = 1
117                         rev.collapsed_revs = collapsed_revs
118                     # otherwise, add one to the counter
119                     else:
120                         collapsed_revs += 1
121                         rev.collapsed_revs = collapsed_revs
122                 # if collapse_user is false, we always yield
123                 else:
124                     yield prev_rev
125
126             prev_rev = rev
127
128         # also yield the final time
129         yield prev_rev
130
131     def __iter__(self):
132         return self.__revisions
133
134     def __next__(self):
135         return next(self.__revisions)
136
137
138 class RegexPair(object):
139     def __init__(self, pattern, label):
140         self.pattern = re.compile(pattern)
141         self.label = label
142         self.has_groups = bool(self.pattern.groupindex)
143         if self.has_groups:
144             self.capture_groups = list(self.pattern.groupindex.keys())
145             
146     def _make_key(self, cap_group):
147         return ("{}_{}".format(self.label, cap_group))
148
149     def matchmake(self, content, rev_data):
150         
151         temp_dict = {}
152         # if there are named capture groups in the regex
153         if self.has_groups:
154
155             # if there are matches of some sort in this revision content, fill the lists for each cap_group
156             if self.pattern.search(content) is not None:
157                 m = self.pattern.finditer(content)
158                 matchobjects = list(m)
159
160                 for cap_group in self.capture_groups:
161                     key = self._make_key(cap_group)
162                     temp_list = []
163                     for match in matchobjects:
164                         # we only want to add the match for the capture group if the match is not None
165                         if match.group(cap_group) != None:
166                             temp_list.append(match.group(cap_group))
167
168                     # if temp_list of matches is empty just make that column None
169                     if len(temp_list)==0:
170                         temp_dict[key] = None
171                     # else we put in the list we made in the for-loop above
172                     else:
173                         temp_dict[key] = ', '.join(temp_list)
174
175             # there are no matches at all in this revision content, we default values to None
176             else:
177                 for cap_group in self.capture_groups:
178                     key = self._make_key(cap_group)
179                     temp_dict[key] = None
180
181         # there are no capture groups, we just search for all the matches of the regex
182         else:
183             #given that there are matches to be made
184             if type(content) in(str, bytes):
185                 if self.pattern.search(content) is not None:
186                     m = self.pattern.findall(content)
187                     temp_dict[self.label] = ', '.join(m)
188                 else:
189                     temp_dict[self.label] = None
190
191         # update rev_data with our new columns
192         rev_data.update(temp_dict)
193         return rev_data
194
195 @dataclass
196 class RevData():
197     revid: int
198     date_time: datetime
199     articleid: int
200     editorid: int
201     title: str
202     namespace: int
203     deleted: bool
204     text_chars: int
205     revert: bool
206     reverteds: list[bool]
207     sha1: str
208     text_chars: int
209     revert: bool
210     reverteds: list[int]
211     minor: bool
212     editor: str
213     anon: bool
214     collapsed_revs:int
215     token_revs:int
216     tokens_added:int
217     tokens_removed:int
218     tokens_window:int
219
220 class WikiqParser():
221     def __init__(self, input_file, output_file, regex_match_revision, regex_match_comment, regex_revision_label, regex_comment_label, collapse_user=False, persist=None, urlencode=False, namespaces = None, revert_radius=15, output_parquet=True, parquet_buffer_size=2000):
222         """ 
223         Parameters:
224            persist : what persistence method to use. Takes a PersistMethod value
225         """
226         self.input_file = input_file
227
228         self.collapse_user = collapse_user
229         self.persist = persist
230         self.printed_header = False
231         self.namespaces = []
232         self.urlencode = urlencode
233         self.revert_radius = revert_radius
234         
235         self.parquet_buffer = []
236         self.parquet_buffer_size = parquet_buffer_size
237
238         if namespaces is not None:
239             self.namespace_filter = set(namespaces)
240         else:
241             self.namespace_filter = None
242
243         self.regex_schemas = []
244         self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
245         self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
246
247
248         if output_parquet is True:
249             self.output_parquet = True
250             self.pq_writer = None
251             self.output_file = output_file
252         else:
253             self.output_file = open(output_file,'w')
254
255
256     def make_matchmake_pairs(self, patterns, labels):
257         if (patterns is not None and labels is not None) and \
258            (len(patterns) == len(labels)):
259             result = []
260             for pattern, label in zip(patterns, labels):
261                 result.append(RegexPair(pattern, label))
262                 self.regex_schemas.append(pa.field(label, pa.list_(pa.string())))
263
264             return result
265         elif (patterns is None and labels is None):
266             return []
267         else:
268             sys.exit('Each regular expression *must* come with a corresponding label and vice versa.')
269
270     def matchmake(self, rev, rev_data):
271         rev_data = self.matchmake_revision(rev.text, rev_data)
272         rev_data = self.matchmake_comment(rev.comment, rev_data)
273         return rev_data
274
275     def matchmake_revision(self, text, rev_data):
276         return self.matchmake_pairs(text, rev_data, self.regex_revision_pairs)
277
278     def matchmake_comment(self, comment, rev_data):
279         return self.matchmake_pairs(comment, rev_data, self.regex_comment_pairs)
280
281     def matchmake_pairs(self, text, rev_data, pairs):
282         for pair in pairs:
283             rev_data = pair.matchmake(text, rev_data)
284         return rev_data
285
286     def __get_namespace_from_title(self, title):
287         default_ns = None
288
289         for ns in self.namespaces:
290             # skip if the namespace is not defined
291             if ns == None:
292                 default_ns = self.namespaces[ns]
293                 continue
294
295             if title.startswith(ns + ":"):
296                 return self.namespaces[ns]
297
298         # if we've made it this far with no matches, we return the default namespace
299         return default_ns
300
301
302     def process(self):
303
304         # create a regex that creates the output filename
305         # output_filename = re.sub(r'^.*/(enwiki\-\d+)\-.*p(\d+)p.*$',
306         #                         r'output/wikiq-\1-\2.tsv',
307         #                         input_filename)
308
309         # Construct dump file iterator
310         dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user)
311
312         # extract list of namspaces
313         self.namespaces = {ns.name : ns.id for ns in dump.mwiterator.site_info.namespaces}
314
315         page_count = 0
316         rev_count = 0
317
318
319         # Iterate through pages
320         for page in dump:
321             namespace = page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title)
322
323             # skip namespaces not in the filter
324             if self.namespace_filter is not None:
325                 if namespace not in self.namespace_filter:
326                     continue
327
328             rev_detector = mwreverts.Detector(radius = self.revert_radius)
329
330             if self.persist != PersistMethod.none:
331                 window = deque(maxlen=PERSISTENCE_RADIUS)
332
333                 if self.persist == PersistMethod.sequence:
334                     state = mwpersistence.DiffState(SequenceMatcher(tokenizer = wikitext_split),
335                                                     revert_radius=PERSISTENCE_RADIUS)
336
337                 elif self.persist == PersistMethod.segment:
338                     state = mwpersistence.DiffState(SegmentMatcher(tokenizer = wikitext_split),
339                                                     revert_radius=PERSISTENCE_RADIUS)
340
341                 # self.persist == PersistMethod.legacy
342                 else:
343                     from mw.lib import persistence
344                     state = persistence.State()
345
346             # Iterate through a page's revisions
347             for rev in page:
348                 
349                 # initialize rev_data
350                 rev_data = {
351                     'revid':rev.id,
352                     'date_time' : rev.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
353                     'articleid' : page.id,
354                     'editorid' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
355                     'title' : '"' + page.title + '"',
356                     'namespace' : namespace,
357                     'deleted' : "TRUE" if rev.deleted.text else "FALSE"
358                 }
359
360                 rev_data = self.matchmake(rev, rev_data)
361
362                 # if revisions are deleted, /many/ things will be missing
363                 if rev.deleted.text:
364                     rev_data['text_chars'] = ""
365                     rev_data['sha1'] = ""
366                     rev_data['revert'] = ""
367                     rev_data['reverteds'] = ""
368
369                 else:
370                     # rev.text can be None if the page has no text
371                     if not rev.text:
372                         rev.text = ""
373                     # if text exists, we'll check for a sha1 and generate one otherwise
374
375                     if rev.sha1:
376                         text_sha1 = rev.sha1
377                     else:
378
379                         text_sha1 = sha1(bytes(rev.text, "utf8")).hexdigest()
380                     
381                     rev_data['sha1'] = text_sha1
382
383                     # TODO rev.bytes doesn't work.. looks like a bug
384                     rev_data['text_chars'] = len(rev.text)
385
386                     # generate revert data
387                     revert = rev_detector.process(text_sha1, rev.id)
388                     
389                     if revert:
390                         rev_data['revert'] = "TRUE"
391                         rev_data['reverteds'] = '"' + ",".join([str(x) for x in revert.reverteds]) + '"'
392                     else:
393                         rev_data['revert'] = "FALSE"
394                         rev_data['reverteds'] = ""
395
396                 # if the fact that the edit was minor can be hidden, this might be an issue
397                 rev_data['minor'] = "TRUE" if rev.minor else "FALSE"
398
399                 if not rev.deleted.user:
400                     # wrap user-defined editors in quotes for fread
401                     rev_data['editor'] = '"' + rev.user.text + '"'
402                     rev_data['anon'] = "TRUE" if rev.user.id == None else "FALSE"
403                     
404                 else:
405                     rev_data['anon'] = ""
406                     rev_data['editor'] = ""
407
408                 #if re.match(r'^#redirect \[\[.*\]\]', rev.text, re.I):
409                 #    redirect = True
410                 #else:
411                 #    redirect = False
412                 
413                 #TODO missing: additions_size deletions_size
414                 
415                 # if collapse user was on, lets run that
416                 if self.collapse_user:
417                     rev_data['collapsed_revs'] = rev.collapsed_revs
418
419                 if self.persist != PersistMethod.none:
420                     if rev.deleted.text:
421                         for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]:
422                             old_rev_data[k] = None
423                     else:
424
425                         if self.persist != PersistMethod.legacy:
426                             _, tokens_added, tokens_removed = state.update(rev.text, rev.id)
427
428                         else:
429                             _, tokens_added, tokens_removed = state.process(rev.text, rev.id, text_sha1)
430                             
431                         window.append((rev.id, rev_data, tokens_added, tokens_removed))
432                         
433                         if len(window) == PERSISTENCE_RADIUS:
434                             old_rev_id, old_rev_data, old_tokens_added, old_tokens_removed = window[0]
435                             
436                             num_token_revs, num_tokens = calculate_persistence(old_tokens_added)
437
438                             old_rev_data["token_revs"] = num_token_revs
439                             old_rev_data["tokens_added"] = num_tokens
440                             old_rev_data["tokens_removed"] = len(old_tokens_removed)
441                             old_rev_data["tokens_window"] = PERSISTENCE_RADIUS-1
442
443                             self.print_rev_data(old_rev_data)
444
445                 else:
446                     self.print_rev_data(rev_data)
447
448                 rev_count += 1
449
450             if self.persist != PersistMethod.none:
451                 # print out metadata for the last RADIUS revisions
452                 for i, item in enumerate(window):
453                     # if the window was full, we've already printed item 0
454                     if len(window) == PERSISTENCE_RADIUS and i == 0:
455                         continue
456
457                     rev_id, rev_data, tokens_added, tokens_removed = item
458                     num_token_revs, num_tokens = calculate_persistence(tokens_added)
459
460                     rev_data["token_revs"] = num_token_revs
461                     rev_data["tokens_added"] = num_tokens
462                     rev_data["tokens_removed"] = len(tokens_removed)
463                     rev_data["tokens_window"] = len(window)-(i+1)
464                     self.print_rev_data(rev_data)
465
466             page_count += 1
467
468         print("Done: %s revisions and %s pages." % (rev_count, page_count),
469               file=sys.stderr)
470
471         if self.output_parquet is True:
472             self.flush_parquet_buffer()
473             self.pq_writer.close()
474
475         else:
476             output_file.close()
477
478
479     def write_parquet_row(self, rev_data):
480         if 'deleted' in rev_data.keys():
481             rev_data['deleted'] = True if rev_data['deleted'] == "TRUE" else False
482
483         if 'minor' in rev_data.keys():
484             rev_data['minor'] = True if rev_data['minor'] == "TRUE" else False
485
486
487         if 'anon' in rev_data.keys():
488             rev_data['anon'] = True if rev_data['anon'] == "TRUE" else False
489
490
491         self.parquet_buffer.append(rev_data)
492
493         if len(self.parquet_buffer) >= self.parquet_buffer_size:
494             self.flush_parquet_buffer()
495
496     def flush_parquet_buffer(self):
497         outtable = pd.DataFrame.from_records(self.parquet_buffer) 
498         outtable = pa.Table.from_pandas(outtable)                        
499         if self.pq_writer is None:
500             schema = outtable.schema
501             for regex_schema in self.regex_schemas:
502                 schema.append(regex_schema)
503
504             self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
505
506         self.pq_writer.write_table(outtable)
507
508     def print_rev_data(self, rev_data):
509
510         if self.output_parquet is False:
511             printfunc = lambda rev_data: print("\t".join(rev_data), file=self.output_file)
512         else:
513             printfunc = self.write_parquet_row
514
515         # if it's the first time through, print the header
516         if self.urlencode:
517             for field in TO_ENCODE:
518                 rev_data[field] = quote(str(rev_data[field]))
519
520         if not self.printed_header:
521             printfunc(rev_data)
522             self.printed_header = True
523         
524         printfunc(rev_data)
525
526
527 def open_input_file(input_filename):
528     if re.match(r'.*\.7z$', input_filename):
529         cmd = ["7za", "x", "-so", input_filename, "*.xml"] 
530     elif re.match(r'.*\.gz$', input_filename):
531         cmd = ["zcat", input_filename] 
532     elif re.match(r'.*\.bz2$', input_filename):
533         cmd = ["bzcat", "-dk", input_filename] 
534
535     try:
536         input_file = Popen(cmd, stdout=PIPE).stdout
537     except NameError:
538         input_file = open(input_filename, 'r')
539
540     return input_file
541
542 def get_output_filename(input_filename, parquet = False):
543     output_filename = re.sub(r'\.(7z|gz|bz2)?$', '', input_filename)
544     output_filename = re.sub(r'\.xml', '', output_filename)
545     if parquet is False:
546         output_filename = output_filename + ".tsv"
547     else:
548         output_filename = output_filename + ".parquet"
549     return output_filename
550
551 def open_output_file(input_filename):
552     # create a regex that creates the output filename
553     output_filename = get_output_filename(input_filename, parquet = False)
554     output_file = open(output_filename, "w")
555     return output_file
556
557 parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
558
559 # arguments for the input direction
560 parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str, 
561                     help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
562
563 parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
564                     help="Directory for output files. If it ends with .parquet output will be in parquet format.")
565
566 parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
567                     help="Write output to standard out (do not create dump file)")
568
569 parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
570                     help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.")
571
572 parser.add_argument('-p', '--persistence', dest="persist", default=None, const='', type=str, choices = ['','segment','sequence','legacy'], nargs='?',
573                     help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure. This may by slow.  The defualt is -p=sequence, which uses the same algorithm as in the past, but with improvements to wikitext parsing. Use -p=legacy for old behavior used in older research projects. Use -p=segment for advanced persistence calculation method that is robust to content moves, but prone to bugs, and slower.")
574
575 parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
576                     help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
577
578 parser.add_argument('-n', '--namespace-include', dest="namespace_filter", type=int, action='append',
579                     help="Id number of namspace to include. Can be specified more than once.")
580
581 parser.add_argument('-rr',
582                     '--revert-radius',
583                     dest="revert_radius",
584                     type=int,
585                     action='store',
586                     default=15,
587                     help="Number of edits to check when looking for reverts (default: 15)")
588
589 parser.add_argument('-RP', '--revision-pattern', dest="regex_match_revision", default=None, type=str, action='append',
590                     help="The regular expression to search for in revision text. The regex must be surrounded by quotes.")
591
592 parser.add_argument('-RPl', '--revision-pattern-label', dest="regex_revision_label", default=None, type=str, action='append',
593                     help="The label for the outputted column based on matching the regex in revision text.")
594
595 parser.add_argument('-CP', '--comment-pattern', dest="regex_match_comment", default=None, type=str, action='append',
596                     help="The regular expression to search for in comments of revisions.")
597
598 parser.add_argument('-CPl', '--comment-pattern-label', dest="regex_comment_label", default=None, type=str, action='append',
599                     help="The label for the outputted column based on matching the regex in comments.")
600
601 args = parser.parse_args()
602
603
604
605 # set persistence method
606
607 if args.persist is None:
608     persist = PersistMethod.none
609 elif args.persist == "segment":
610     persist = PersistMethod.segment
611 elif args.persist == "legacy":
612     persist = PersistMethod.legacy
613 else:
614     persist = PersistMethod.sequence
615
616 if args.namespace_filter is not None:
617     namespaces = args.namespace_filter
618 else:
619     namespaces = None
620
621 if len(args.dumpfiles) > 0:
622     output_parquet = False
623     for filename in args.dumpfiles:
624         input_file = open_input_file(filename)
625
626         # open directory for output
627         if args.output_dir:
628             output_dir = args.output_dir[0]
629         else:
630             output_dir = "."
631
632         if output_dir.endswith(".parquet"):
633             output_parquet = True
634
635         print("Processing file: %s" % filename, file=sys.stderr)
636
637         if args.stdout:
638             output_file = sys.stdout
639         else:
640             filename = os.path.join(output_dir, os.path.basename(filename))
641             output_file = get_output_filename(filename, parquet = output_parquet)
642
643         wikiq = WikiqParser(input_file,
644                             output_file,
645                             collapse_user=args.collapse_user,
646                             persist=persist,
647                             urlencode=args.urlencode,
648                             namespaces=namespaces,
649                             revert_radius=args.revert_radius,
650                             regex_match_revision = args.regex_match_revision,
651                             regex_revision_label = args.regex_revision_label,
652                             regex_match_comment = args.regex_match_comment,
653                             regex_comment_label = args.regex_comment_label,
654                             output_parquet=output_parquet)
655
656         print(wikiq.output_parquet)
657         wikiq.process()
658
659         # close things 
660         input_file.close()
661
662 else:
663     wikiq = WikiqParser(sys.stdin,
664                         sys.stdout,
665                         collapse_user=args.collapse_user,
666                         persist=persist,
667                         #persist_legacy=args.persist_legacy,
668                         urlencode=args.urlencode,
669                         namespaces=namespaces,
670                         revert_radius=args.revert_radius,
671                         regex_match_revision = args.regex_match_revision,
672                         regex_revision_label = args.regex_revision_label,
673                         regex_match_comment = args.regex_match_comment,
674                         regex_comment_label = args.regex_comment_label)
675
676     wikiq.process() 
677
678 # stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
679 # stop_words = stop_words.split(",")

Community Data Science Collective || Want to submit a patch?