#!/usr/bin/env python3
import dask.dataframe as dd
import pandas as pd
import csv
import re
import os
import argparse
import fcntl
import sys
import errno
import time
import numpy as np
import struct
from urllib.parse import unquote
sys.path.append("..")
from hashlib import sha256

from wikiq_util import IPCheck
from wikiq_util import TO_ENCODE
from wikiq_util import try_unquote

def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
    parser.add_argument('--urlencode', help="whether we need to decode urls", action="store_true")
    parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
    parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
    args = parser.parse_args()
    return args

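# Example invocations (paths are hypothetical, for illustration only):
#   bin/wikiq_users -i ./tsvs/enwiki.tsv -o ./output --output-format parquet
#   bin/wikiq_users -i './tsvs/*.tsv' --urlencode --output-format csv
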
# This script does not do some of the potentially useful things that Jeremy's script did.
# We don't remove bots
# We don't exit on Tech Wiki
# We don't accept an EDITOR_IGNORE_LIST
# We don't have a username-userid mapping file
# We don't remove anonymous editors (though we do flag IP edits as anon).
# We don't remove any rows, even for malformed data
if __name__ == "__main__":

    args = parse_args()
    id_dict = {}

    if not args.no_cluster:
        # set up dask distributed
        from dask.distributed import Client, LocalCluster
        import multiprocessing as mp
        cluster = LocalCluster(n_workers=mp.cpu_count(), processes=True)
        client = Client(cluster)

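    # Read the edit-level TSV(s) lazily with dask; --input-file may be a glob
    # pattern, and explicit dtypes keep dask from mis-inferring column types
    # from a sample of the data.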
    input_file = args.input_file
    d = dd.read_table(input_file, dtype={"anon":bool,
                                         "articleid":int,
                                         "deleted":bool,
                                         "editor":str,
                                         "minor":bool,
                                         "namespace":np.int32,
                                         "revert":bool,
                                         "reverteds":str,
                                         "revid":int,
                                         "sha1":str,
                                         "title":str},
                      true_values=["TRUE"],
                      false_values=["FALSE"],
                      parse_dates=["date_time"],
                      infer_datetime_format=True
    )

    if args.wiki is None:
        wiki = re.match(r'(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
    else:
        wiki = args.wiki

    d['wiki'] = wiki

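    # URL-decode the columns listed in TO_ENCODE, keeping the raw
    # (still-encoded) values around in "<col>old" columns so the CSV
    # output path below can write them back out encoded.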
    for col in TO_ENCODE:
        d[col+"old"] = d[col]
        d[col] = d[col].apply(try_unquote, meta=(col,str))

    d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor',str))
    d['anon'] = (d['anon'] == True) | d['IPAnon']
    d = d.drop('IPAnon',axis=1)
    d['timestamp'] = (d['date_time'] - d['date_time'].min())/np.timedelta64(1,'s')
    d['timestamp'] = d['timestamp'].astype(int)
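    # timestamp is the whole number of seconds elapsed since the earliest
    # edit in the dataset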
    # create a new unique identifier by hashing the editor name or editor ip

    # first sort by date_time so that the within-editor diffs below are chronological
    d = d.set_index(d["date_time"])
    d = d.map_partitions(lambda x: x.sort_index())

    d['editor_sha'] = d['editor'].apply(lambda x:
                                        sha256(x.encode()).hexdigest()
                                        if x is not None
                                        else None,
                                        meta=("editor_sha",str)
                                        )
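    # sha256 yields a stable 64-character hex digest, so the same editor name
    # or IP always maps to the same pseudonymous identifier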

    editor_groups = d.groupby('editor')
    d['editor_nth_edit'] = editor_groups.cumcount()
    d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit = df.date_time.diff(1)))

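    # the same pattern repeats for the finer groupings below: cumcount()
    # numbers each editor's edits within the group, and date_time.diff(1)
    # gives the time elapsed since that editor's previous edit in the group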
    editor_wiki_groups = d.groupby(['editor_sha','wiki'])
    d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
    d = editor_wiki_groups.apply(lambda df:
                                 df.assign(
                                     tminus_editor_wiki_edit=df.date_time.diff(1)
                                 ))

    editor_namespace_groups = d.groupby(['editor_sha','wiki','namespace'])
    d['editor_nth_namespace_edit'] = editor_namespace_groups.cumcount()
    d = editor_namespace_groups.apply(lambda df:
                                      df.assign(
                                          tminus_editor_namespace_edit=df.date_time.diff(1)
                                      ))

    editor_article_groups = d.groupby(['editor_sha','wiki','articleid'])
    d['editor_nth_article_edit'] = editor_article_groups.cumcount()
    d = editor_article_groups.apply(lambda df:
                                    df.assign(tminus_editor_article_edit=df.date_time.diff(1)))

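    # persist() materializes the computation so far and pins the result in
    # worker memory, so the output step below does not recompute the whole graph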
    d = d.persist()

    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)

    if args.output_format == "csv":
        # for writing to csv we need the urlencoded values, so swap the
        # decoded columns back out for their saved "<col>old" copies
        d_csv = d
        for col in TO_ENCODE:
            d_csv = d_csv.drop(col,axis=1)
            d_csv[col] = d_csv[col+'old']
            d_csv = d_csv.drop(col+'old',axis=1)
        # one csv file per partition under the output directory
        d_csv.to_csv(os.path.join(args.output_dir, "*.csv"))
    else:
        for col in TO_ENCODE:
            d = d.drop(col + 'old', axis=1)

        d.to_parquet(os.path.join(args.output_dir, "parquet"),
                     object_encoding={"editor":"utf8",
                                      "reverteds":"utf8",
                                      "sha1":"utf8",
                                      "title":"utf8",
                                      "wiki":"utf8",
                                      "namespace":"utf8",
                                      "editor_sha":"utf8",
                                      "revert":"bool"})
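    # note: object_encoding is a fastparquet-specific option; the pyarrow
    # engine instead infers utf8 encoding for object columns automatically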
