X-Git-Url: https://code.communitydata.science/mediawiki_dump_tools.git/blobdiff_plain/418fa020e53619503fc85848c82f383d8739eda9..daf1851cbb6fa902f5f1665073243513f3c5207e:/bin/wikiq_users

diff --git a/bin/wikiq_users b/bin/wikiq_users
new file mode 100755
index 0000000..c02b922
--- /dev/null
+++ b/bin/wikiq_users
@@ -0,0 +1,144 @@
+#!/usr/bin/env python3
+import dask.dataframe as dd
+import pandas as pd
+import csv
+import re
+import os
+import argparse
+import fcntl
+import sys
+import errno
+import time
+import numpy as np
+import struct
+from urllib.parse import unquote
+sys.path.append("..")
+from hashlib import sha256
+
+from wikiq_util import IPCheck
+from wikiq_util import TO_ENCODE
+from wikiq_util import try_unquote
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
+    parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
+    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
+    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
+    parser.add_argument('--urlencode', help="Whether we need to decode URLs.", action="store_true")
+    parser.add_argument('--no-cluster', help="Disable dask.distributed.", action="store_true")
+    parser.add_argument('--output-format', help="Output format: csv or parquet.", type=str)
+    args = parser.parse_args()
+    return args
+
+# This script does not do some of the things that might be useful that Jeremy's script did.
+# We don't remove bots.
+# We don't exit on Tech Wiki.
+# We don't accept an EDITOR_IGNORE_LIST.
+# We don't have a username-userid mapping file.
+# We don't remove anonymous editors (though we do flag IP edits as anon).
+# We don't remove any rows, including for malformed data.
+if __name__ == "__main__":
+
+    args = parse_args()
+    id_dict = {}
+
+    if not args.no_cluster:
+        # set up dask.distributed with one worker process per CPU
+        from dask.distributed import Client, LocalCluster
+        import multiprocessing as mp
+        cluster = LocalCluster(n_workers=mp.cpu_count(), processes=True)
+        client = Client(cluster)
+
+    input_file = args.input_file
+    d = dd.read_table(input_file, dtype={"anon": bool,
+                                         "articleid": int,
+                                         "deleted": bool,
+                                         "editor": str,
+                                         "minor": bool,
+                                         "namespace": np.int32,
+                                         "revert": bool,
+                                         "reverteds": str,
+                                         "revid": int,
+                                         "sha1": str,
+                                         "title": str},
+                      true_values=["TRUE"],
+                      false_values=["FALSE"],
+                      parse_dates=["date_time"],
+                      infer_datetime_format=True
+                      )
+
+    if args.wiki is None:
+        wiki = re.match(r'(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
+    else:
+        wiki = args.wiki
+
+    d['wiki'] = wiki
+
+    # keep the raw (urlencoded) values in "<col>old" and decode in place
+    for col in TO_ENCODE:
+        d[col + "old"] = d[col]
+        d[col] = d[col].apply(try_unquote, meta=(col, str))
+
+    # flag edits made from IP addresses as anonymous
+    d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor', str))
+    d['anon'] = (d['anon'] == True) | d['IPAnon']
+    d = d.drop('IPAnon', axis=1)
+
+    # seconds since the earliest edit in the dataset
+    d['timestamp'] = (d['date_time'] - d['date_time'].min()) / np.timedelta64(1, 's')
+    d['timestamp'] = d['timestamp'].astype(int)
+
+    # create a new unique identifier by hashing the editor name or editor IP
+
+    # first sort by time so that cumcount() and diff() run in edit order
+    d = d.set_index(d["date_time"])
+    d = d.map_partitions(lambda x: x.sort_index())
+
+    d['editor_sha'] = d['editor'].apply(lambda x:
+                                        sha256(x.encode()).hexdigest()
+                                        if x is not None
+                                        else None,
+                                        meta=("editor_sha", str)
+                                        )
+
+    # per-editor edit counts and time since the editor's previous edit
+    editor_groups = d.groupby('editor')
+    d['editor_nth_edit'] = editor_groups.cumcount()
+    d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit=df.date_time.diff(1)))
+
+    editor_wiki_groups = d.groupby(['editor_sha', 'wiki'])
+    d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
+    d = editor_wiki_groups.apply(lambda df:
+                                 df.assign(
+                                     tminus_editor_wiki_edit=df.date_time.diff(1)
+                                 ))
+
+    editor_namespace_groups = d.groupby(['editor_sha', 'wiki', 'namespace'])
+    d['editor_nth_namespace_edit'] = editor_namespace_groups.cumcount()
+    d = editor_namespace_groups.apply(lambda df:
+                                      df.assign(
+                                          tminus_namespace_wiki_edit=df.date_time.diff(1)
+                                      ))
+
+    editor_article_groups = d.groupby(['editor_sha', 'wiki', 'articleid'])
+    d['editor_nth_article_edit'] = editor_article_groups.cumcount()
+    d = editor_article_groups.apply(lambda df:
+                                    df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
+
+    d = d.persist()
+
+    if not os.path.exists(args.output_dir):
+        os.mkdir(args.output_dir)
+
+    if args.output_format == "csv":
+        # for csv output we keep the original (urlencoded) values
+        d_csv = d
+        for col in TO_ENCODE:
+            d_csv = d_csv.drop(col, axis=1)
+            d_csv[col] = d_csv[col + 'old']
+        d_csv.to_csv(os.path.join(args.output_dir, "wikiq_users_*.csv"), index=False)
+    else:
+        for col in TO_ENCODE:
+            d = d.drop(col + 'old', axis=1)
+
+        # write parquet under the requested output directory
+        d.to_parquet(os.path.join(args.output_dir, "wikiq_users.parquet"),
+                     object_encoding={"editor": "utf8", "reverteds": "utf8", "sha1": "utf8",
+                                      "title": "utf8", "wiki": "utf8", "namespace": "utf8",
+                                      "editor_sha": "utf8", "revert": "bool"})
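
For context, here is a minimal, hypothetical sketch of reading the script's parquet output back with dask. The output path, dataset name, and the use of editor_sha/revid as the grouping and count columns are assumptions based on the code above, not part of this commit.

# Hypothetical downstream usage (not part of the diff above).
# Assumes --output-dir was left at its default of ./output and that
# the parquet dataset was written as wikiq_users.parquet.
import dask.dataframe as dd

users = dd.read_parquet("./output/wikiq_users.parquet")
# number of edits per (hashed) editor
edits_per_editor = users.groupby("editor_sha")["revid"].count()
print(edits_per_editor.compute().sort_values(ascending=False).head())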