X-Git-Url: https://code.communitydata.science/mediawiki_dump_tools.git/blobdiff_plain/c2854026831c54085ed254c97bbdff0779c9e3e0..bb83d62b745d61e761a1be011e814bbb6aa241aa:/wikiq

diff --git a/wikiq b/wikiq
index 1a951e8..bffbbf4 100755
--- a/wikiq
+++ b/wikiq
@@ -26,7 +26,7 @@ from deltas import SequenceMatcher
 from deltas import SegmentMatcher
 
 import dataclasses as dc
-from dataclasses import dataclass, make_dataclass
+from dataclasses import dataclass
 
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -133,6 +133,11 @@ class WikiqPage():
         return next(self.__revisions)
 
 
+"""
+A RegexPair is defined by a regular expression (pattern) and a label.
+The pattern can include capture groups. If it does then each capture group will have a resulting column in the output.
+If the pattern does not include a capture group, then only one output column will result.
+"""
 class RegexPair(object):
     def __init__(self, pattern, label):
         self.pattern = re.compile(pattern)
@@ -201,6 +206,22 @@ class RegexPair(object):
         return rev_data
 
 
+"""
+
+We used to use a dictionary to collect fields for the output.
+Now we use dataclasses. Compared to a dictionary, this should help:
+- prevent some bugs
+- make it easier to output parquet data.
+- use class attribute '.' syntax instead of dictionary syntax.
+- improve support for tooling (autocomplete, type hints)
+- use type information to define formatting rules
+
+Depending on the parameters passed into Wikiq, the output schema can be different.
+Therefore, we need to end up constructing a dataclass with the correct output schema.
+It also needs to have the correct pyarrow schema so we can write parquet files.
+
+The RevDataBase type has all the fields that will be output no matter how wikiq is invoked.
+"""
 @dataclass()
 class RevDataBase():
     revid: int
@@ -218,10 +239,16 @@ class RevDataBase():
     editor: str = None
     anon: bool = None
 
+    # toggles url encoding. this isn't a dataclass field since it doesn't have a type annotation
     urlencode = False
+
+    # defines pyarrow schema.
+    # each field in the data class needs an entry in this array.
+    # the names should match and be in the same order.
+    # this isn't a dataclass field since it doesn't have a type annotation
     pa_schema_fields = [
         pa.field("revid", pa.int64()),
-        pa.field("date_time",pa.timestamp('ms')),
+        pa.field("date_time", pa.timestamp('ms')),
         pa.field("articleid",pa.int64()),
         pa.field("editorid",pa.int64()),
         pa.field("title",pa.string()),
@@ -236,9 +263,11 @@ class RevDataBase():
         pa.field("anon",pa.bool_())
     ]
 
+    # pyarrow is a columnar format, so most of the work happens in the flush_parquet_buffer function
     def to_pyarrow(self):
         return dc.astuple(self)
 
+    # logic to convert each field into the wikiq tsv format goes here.
     def to_tsv_row(self):
 
         row = []
@@ -275,12 +304,26 @@ class RevDataBase():
     def header_row(self):
         return '\t'.join(map(lambda f: f.name, dc.fields(self)))
 
+"""
+
+If collapse=True we'll use a RevDataCollapse dataclass.
+This class inherits from RevDataBase. This means that it has all the same fields and functions.
+
+It just adds a new field and updates the pyarrow schema.
+
+"""
 @dataclass()
 class RevDataCollapse(RevDataBase):
     collapsed_revs:int = None
+
     pa_collapsed_revs_schema = pa.field('collapsed_revs',pa.int64())
     pa_schema_fields = RevDataBase.pa_schema_fields + [pa_collapsed_revs_schema]
 
+"""
+
+If persistence data is to be computed we'll need the fields added by RevDataPersistence.
+
+"""
 @dataclass()
 class RevDataPersistence(RevDataBase):
     token_revs:int = None
@@ -296,6 +339,10 @@ class RevDataPersistence(RevDataBase):
 
     pa_schema_fields = RevDataBase.pa_schema_fields + pa_persistence_schema_fields
 
+"""
+class RevDataCollapsePersistence uses multiple inheritance to make a class that has both persistence and collapse fields.
+
+"""
 @dataclass()
 class RevDataCollapsePersistence(RevDataCollapse, RevDataPersistence):
     pa_schema_fields = RevDataCollapse.pa_schema_fields + RevDataPersistence.pa_persistence_schema_fields
@@ -323,6 +370,9 @@ class WikiqParser():
         self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
         self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
 
+
+        # This is where we set the type for revdata.
+
         if self.collapse_user is True:
             if self.persist == PersistMethod.none:
                 revdata_type = RevDataCollapse
@@ -333,16 +383,23 @@ class WikiqParser():
         else:
             revdata_type = RevDataBase
 
+        # if there are regex fields, we need to add them to the revdata type.
        regex_fields = [(field.name, list[str], dc.field(default=None)) for field in self.regex_schemas]
 
-        self.revdata_type = make_dataclass('RevData_Parser',
-                                           fields=regex_fields,
-                                           bases=(revdata_type,))
+        # make_dataclass is a function that defines a new dataclass type.
+        # here we extend the type we have already chosen and add the regular expression types
+        self.revdata_type = dc.make_dataclass('RevData_Parser',
+                                              fields=regex_fields,
+                                              bases=(revdata_type,))
 
+        # we also need to make sure that we have the right pyarrow schema
         self.revdata_type.pa_schema_fields = revdata_type.pa_schema_fields + self.regex_schemas
 
         self.revdata_type.urlencode = self.urlencode
 
+        self.schema = pa.schema(self.revdata_type.pa_schema_fields)
+
+        # here we initialize the variables we need for output.
         if output_parquet is True:
             self.output_parquet = True
             self.pq_writer = None
@@ -451,6 +508,7 @@ class WikiqParser():
             # Iterate through a page's revisions
             for rev in page:
 
+                # create a new data object instead of a dictionary.
                 rev_data = self.revdata_type(revid = rev.id,
                                              date_time = datetime.fromtimestamp(rev.timestamp.unix(), tz=timezone.utc),
                                              articleid = page.id,
@@ -556,6 +614,7 @@ class WikiqParser():
 
         print("Done: %s revisions and %s pages." % (rev_count, page_count), file=sys.stderr)
 
+        # remember to flush the parquet_buffer if we're done
        if self.output_parquet is True:
             self.flush_parquet_buffer()
             self.pq_writer.close()
@@ -564,6 +623,10 @@ class WikiqParser():
             self.output_file.close()
 
 
+    """
+    For performance reasons it's better to write parquet in batches instead of one row at a time.
+    So this function just puts the data on a buffer. If the buffer is full, then it gets flushed (written).
+    """
     def write_parquet_row(self, rev_data):
         padata = rev_data.to_pyarrow()
         self.parquet_buffer.append(padata)
@@ -572,10 +635,16 @@ class WikiqParser():
             self.flush_parquet_buffer()
 
 
+    """
+    Function that actually writes data to the parquet file.
+    It needs to transpose the data from row-by-row to column-by-column
+    """
     def flush_parquet_buffer(self):
-        schema = pa.schema(self.revdata_type.pa_schema_fields)
 
-        def row_to_col(rg, types):
+        """
+        Returns the pyarrow table that we'll write
+        """
+        def rows_to_table(rg, schema):
             cols = []
             first = rg[0]
             for col in first:
@@ -586,17 +655,18 @@ class WikiqParser():
                     cols[j].append(row[j])
 
             arrays = []
-            for col, typ in zip(cols, types):
+            for col, typ in zip(cols, schema.types):
                 arrays.append(pa.array(col, typ))
-            return arrays
+            return pa.Table.from_arrays(arrays, schema=schema)
 
-        outtable = pa.Table.from_arrays(row_to_col(self.parquet_buffer, schema.types), schema=schema)
+        outtable = rows_to_table(self.parquet_buffer, self.schema)
 
         if self.pq_writer is None:
             self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
         self.pq_writer.write_table(outtable)
         self.parquet_buffer = []
 
+    # depending on if we are configured to write tsv or parquet, we'll call a different function.
     def print_rev_data(self, rev_data):
         if self.output_parquet is False:
             printfunc = self.write_tsv_row
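The RegexPair docstring added above says that a labeled pattern with capture groups yields one output column per group, and a single column otherwise. A minimal, self-contained sketch of that idea; the pattern, label, and column-naming scheme here are made up for illustration and are not taken from wikiq:

import re

label = "wikilink"
pattern = re.compile(r"\[\[(?P<target>.+?)\|(?P<anchor>.+?)\]\]")

comment = "added [[Main Page|the main page]] to the lead"
match = pattern.search(comment)

if pattern.groups > 0:
    # one output column per capture group, e.g. wikilink_1, wikilink_2 (illustrative naming)
    columns = {"%s_%s" % (label, i): match.group(i) for i in range(1, pattern.groups + 1)}
else:
    # no capture groups: a single column named after the label
    columns = {label: match.group(0)}

print(columns)  # {'wikilink_1': 'Main Page', 'wikilink_2': 'the main page'}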
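The comments added in WikiqParser.__init__ note that dc.make_dataclass extends whichever RevData* base was selected with one field per regular expression label. A small sketch of that pattern under simplified, assumed field names (RevDataBase is cut down to two fields here):

import dataclasses as dc
from dataclasses import dataclass

@dataclass()
class RevDataBase():
    revid: int
    title: str = None

# extend the chosen base type at runtime with regex-derived fields
regex_fields = [("wikilink", list[str], dc.field(default=None))]
RevData_Parser = dc.make_dataclass('RevData_Parser',
                                   fields=regex_fields,
                                   bases=(RevDataBase,))

row = RevData_Parser(revid=42, title="Example", wikilink=["Main Page"])
print(dc.astuple(row))  # (42, 'Example', ['Main Page'])

Because to_pyarrow() is just dc.astuple(self), the entries in pa_schema_fields have to stay in the same order as the dataclass fields, which is why the comment above insists that the names match and appear in the same order.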
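The write_parquet_row and flush_parquet_buffer docstrings describe the buffering strategy: rows accumulate in a plain Python list and are transposed into columns only when a batch is flushed. A standalone sketch of that transpose-and-write step, using a made-up two-column schema and output path:

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([pa.field("revid", pa.int64()),
                    pa.field("title", pa.string())])

# row-oriented buffer, as produced by to_pyarrow() / dc.astuple
parquet_buffer = [(1, "Foo"), (2, "Bar"), (3, "Baz")]

# transpose rows into columns and build one pyarrow array per column
columns = list(zip(*parquet_buffer))
arrays = [pa.array(col, typ) for col, typ in zip(columns, schema.types)]
table = pa.Table.from_arrays(arrays, schema=schema)

# the writer is created once and reused, so each flushed batch becomes a row group
writer = pq.ParquetWriter("revisions.parquet", schema, flavor='spark')
writer.write_table(table)
writer.close()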