import argparse
import os
import re

import numpy as np
import dask.dataframe as dd

from urllib.parse import unquote
from hashlib import sha256

from wikiq_util import IPCheck
from wikiq_util import TO_ENCODE
from wikiq_util import try_unquote
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
parser.add_argument('--urlencode', help="whether we need to decode URLs", action="store_true")
parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
args = parser.parse_args()
# This script skips some potentially useful things that Jeremy's script did:
# We don't remove bots.
# We don't exit on Tech Wiki.
# We don't accept an EDITOR_IGNORE_LIST.
# We don't have a username-userid mapping file.
# We don't remove anonymous editors (though we do flag IP edits as anon).
# We don't remove any rows, even for malformed data.
if __name__ == "__main__":

    if not args.no_cluster:
        # set up dask.distributed
        from dask.distributed import Client, LocalCluster
        import multiprocessing as mp
        cluster = LocalCluster(n_workers=mp.cpu_count(), processes=True)
        client = Client(cluster)
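        # one worker process per CPU core; pass --no-cluster to skip starting dask.distributed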
    input_file = args.input_file
    d = dd.read_table(input_file,
                      dtype={"anon": bool},
                      false_values=["FALSE"],
                      parse_dates=["date_time"],
                      infer_datetime_format=True)
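    # guess the wiki name from the input filename (see the --wiki argument)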
    wiki = re.match(r'(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
    d['wiki'] = wiki
    # URL-decode the columns that wikiq encoded
    if args.urlencode:
        for col in TO_ENCODE:
            d[col] = d[col].apply(try_unquote, meta=(col, str))
    # flag edits made from an IP address as anonymous
    d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor', str))
    d['anon'] = (d['anon'] == True) | d['IPAnon']
    d = d.drop('IPAnon', axis=1)

    # timestamp: seconds elapsed since the earliest edit in the dataset
    d['timestamp'] = (d['date_time'] - d['date_time'].min()) / np.timedelta64(1, 's')
    d['timestamp'] = d['timestamp'].astype(int)
    # create a new unique identifier by hashing the editor name or editor ip

    # first set the index to date_time and sort each partition chronologically
    d = d.set_index(d["date_time"])
    d = d.map_partitions(lambda x: x.sort_index())

    d['editor_sha'] = d['editor'].apply(lambda x: sha256(x.encode()).hexdigest(),
                                        meta=("editor_sha", str))
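    # per-editor cumulative edit count and time since the editor's previous edit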
    editor_groups = d.groupby('editor')
    d['editor_nth_edit'] = editor_groups.cumcount()
    d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit=df.date_time.diff(1)))
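    # per-editor, per-wiki cumulative edit count and inter-edit times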
    editor_wiki_groups = d.groupby(['editor_sha', 'wiki'])
    d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
    d = editor_wiki_groups.apply(lambda df:
                                 df.assign(tminus_editor_wiki_edit=df.date_time.diff(1)))
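    # per-editor, per-wiki, per-namespace cumulative edit count and inter-edit times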
    editor_namespace_groups = d.groupby(['editor_sha', 'wiki', 'namespace'])
    d['editor_nth_namespace_edit'] = editor_namespace_groups.cumcount()
    d = editor_namespace_groups.apply(lambda df:
                                      df.assign(tminus_namespace_wiki_edit=df.date_time.diff(1)))
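    # per-editor, per-wiki, per-article cumulative edit count and inter-edit times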
    editor_article_groups = d.groupby(['editor_sha', 'wiki', 'articleid'])
    d['editor_nth_article_edit'] = editor_article_groups.cumcount()
    d = editor_article_groups.apply(lambda df:
                                    df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
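    # make sure the output directory exists before writing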
    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)
    if args.output_format == "csv":
        d_csv = d
        # for csv output, restore the original (URL-encoded) values of the decoded columns
        for col in TO_ENCODE:
            d_csv = d_csv.drop(col, axis=1)
            d_csv[col] = d_csv[col + 'old']
    else:
        for col in TO_ENCODE:
            d = d.drop(col + 'old', axis=1)

        d.to_parquet(args.output_dir,
                     object_encoding={"editor": "utf8", "reverteds": "utf8", "sha1": "utf8",
                                      "title": "utf8", "wiki": "utf8", "namespace": "utf8",
                                      "editor_sha": "utf8", "revert": "bool"})
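
# A hypothetical example invocation (script and input filenames here are assumptions):
#   python3 wikiq_users.py --input-file ./tsvs/enwiki.tsv --output-dir ./output --output-format parquet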