+#!/usr/bin/env python3
+import dask.dataframe as dd
+import pandas as pd
+import csv
+import re
+import os
+import argparse
+import fcntl
+import sys
+import errno
+import time
+import numpy as np
+import struct
+from urllib.parse import unquote
+sys.path.append("..")
+from hashlib import sha256
+
+from wikiq_util import IPCheck
+from wikiq_util import TO_ENCODE
+from wikiq_util import try_unquote
+
+def parse_args():
+ parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
+    parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
+    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
+    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
+    parser.add_argument('--urlencode', help="Whether we need to decode urls", action="store_true")
+    parser.add_argument('--no-cluster', help="Disable dask.distributed", action="store_true")
+    parser.add_argument('--output-format', help="Output format: [csv, parquet]. Defaults to parquet.", type=str)
+ args = parser.parse_args()
+    return args
+
+# This script does not do some potentially useful things that Jeremy's script did:
+# We don't remove bots
+# We don't exit on Tech Wiki
+# We don't accept an EDITOR_IGNORE_LIST
+# We don't have a username-userid mapping file
+# We don't remove anonymous editors (though we do flag IP edits as anon)
+# We don't remove any rows, including for malformed data
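+#
+# A typical invocation might look like this (the script and file names here are hypothetical):
+#   python3 editors_by_user.py --input-file ./enwiki.tsv --output-dir ./output --output-format parquet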
+if __name__ == "__main__":
+
+ args = parse_args()
+ id_dict = {}
+
+ if not args.no_cluster:
+ # set up dask distributed
+ from dask.distributed import Client, LocalCluster
+ import multiprocessing as mp
+ cluster = LocalCluster(n_workers = mp.cpu_count(), processes=True)
+ client = Client(cluster)
+
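+    # lazily read the TSV(s) into a dask dataframe, giving explicit column dtypes and
+    # treating the literal strings TRUE/FALSE as booleans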
+ input_file = args.input_file
+    d = dd.read_table(input_file, dtype={"anon": bool,
+ "articleid":int,
+ "deleted":bool,
+ "editor":str,
+ "minor":bool,
+ "namespace":np.int32,
+ "revert":bool,
+ "reverteds":str,
+ "revid":int,
+ "sha1":str,
+ "title":str},
+ true_values=["TRUE"],
+ false_values=["FALSE"],
+ parse_dates=["date_time"],
+ infer_datetime_format=True
+ )
+
+ if args.wiki is None:
+        wiki = re.match(r'(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
+ else:
+ wiki = args.wiki
+
+ d['wiki'] = wiki
+
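+    # keep the raw url-encoded text in "<col>old" and store the decoded text in "<col>"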
+ for col in TO_ENCODE:
+ d[col+"old"] = d[col]
+ d[col] = d[col].apply(try_unquote, meta=(col,str))
+
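+    # edits made from a bare IP address count as anonymous, whether or not the dump flagged them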
+ d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor',str))
+ d['anon'] = (d['anon'] == True) | d['IPAnon']
+ d = d.drop('IPAnon',axis=1)
+ d['timestamp'] = (d['date_time'] - d['date_time'].min())/np.timedelta64(1,'s')
+ d['timestamp'] = d['timestamp'].astype(int)
+ # create a new unique identifier by hashing the editor name or editor ip
+
+    # first index and sort by date_time so the per-editor diffs below run in chronological order
+ d = d.set_index(d["date_time"])
+ d = d.map_partitions(lambda x: x.sort_index())
+
+ d['editor_sha'] = d['editor'].apply(lambda x:
+ sha256(x.encode()).hexdigest()
+ if x is not None
+ else None,
+ meta=("editor_sha",str)
+ )
+
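+    # per-editor edit sequence number and time since that editor's previous edit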
+ editor_groups = d.groupby('editor')
+ d['editor_nth_edit'] = editor_groups.cumcount()
+ d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit = df.date_time.diff(1)))
+
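+    # the same measures, but per (hashed) editor within each wiki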
+ editor_wiki_groups = d.groupby(['editor_sha','wiki'])
+ d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
+ d = editor_wiki_groups.apply(lambda df:
+ df.assign(
+ tminus_editor_wiki_edit=df.date_time.diff(1)
+ ))
+
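+    # and per editor within each namespace of each wiki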
+ editor_namespace_groups = d.groupby(['editor_sha','wiki','namespace'])
+    d['editor_nth_namespace_edit'] = editor_namespace_groups.cumcount()
+ d = editor_namespace_groups.apply(lambda df:
+ df.assign(
+                                          tminus_editor_namespace_edit=df.date_time.diff(1)
+ ))
+
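+    # and per editor within each article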
+ editor_article_groups = d.groupby(['editor_sha','wiki','articleid'])
+ d['editor_nth_article_edit'] = editor_article_groups.cumcount()
+ d = editor_article_groups.apply(lambda df:
+ df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
+
+
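+    # run the computation now and keep the result in memory before writing it out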
+ d = d.persist()
+
+    if not os.path.exists(args.output_dir):
+        os.mkdir(args.output_dir)
+
+    if args.output_format == "csv":
+        # for csv output, write the original url-encoded text rather than the decoded text
+        d_csv = d
+        for col in TO_ENCODE:
+            d_csv = d_csv.drop(col, axis=1)
+            d_csv[col] = d_csv[col + 'old']
+        d_csv.to_csv(args.output_dir)
+ else:
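+        # parquet keeps the decoded text, so the url-encoded copies are redundant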
+ for col in TO_ENCODE:
+ d = d.drop(col + 'old', axis=1)
+
+        d.to_parquet(args.output_dir,
+                     object_encoding={"editor": "utf8", "reverteds": "utf8", "sha1": "utf8",
+                                      "title": "utf8", "wiki": "utf8", "namespace": "utf8",
+                                      "editor_sha": "utf8", "revert": "bool"})
+