]> code.communitydata.science - mediawiki_dump_tools.git/blobdiff - bin/wikiq_users
Use dask to parallelize and scale user level datasets
[mediawiki_dump_tools.git] / bin / wikiq_users
diff --git a/bin/wikiq_users b/bin/wikiq_users
new file mode 100755 (executable)
index 0000000..c02b922
--- /dev/null
@@ -0,0 +1,144 @@
+#!/usr/bin/env python3
+import dask.dataframe as dd
+import pandas as pd
+import csv
+import re
+import os
+import argparse
+import fcntl
+import sys
+import errno
+import time
+import numpy as np
+import struct
+from urllib.parse import unquote
+sys.path.append("..")
+from hashlib import sha256
+
+from wikiq_util import IPCheck
+from wikiq_util import TO_ENCODE
+from wikiq_util import try_unquote
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
+    parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
+    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
+    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
+    parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
+    parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
+    parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
+    args = parser.parse_args()
+    return(args)
+
+# This script does not do some of the things that might be useful that Jeremy's script did.
+# We don't remove bots
+# We don't exit on Tech Wiki
+# We don't accept an EDITOR_IGNORE_LIST
+# We don't have a username-userid mapping file
+# We don't remove anonymous editors (though we do indicate IP edits as anon.
+# We don't remove any rows, including for malformed data
+if __name__ == "__main__":
+
+    args = parse_args()
+    id_dict = {}
+
+    if not args.no_cluster:
+    # set up dask distributed
+        from dask.distributed import Client, LocalCluster
+        import multiprocessing as mp
+        cluster = LocalCluster(n_workers = mp.cpu_count(), processes=True)
+        client = Client(cluster)
+
+    input_file = args.input_file
+    d = dd.read_table(input_file, dtype={"anon":np.bool,
+                                         "articleid":int,
+                                        "deleted":bool,
+                                         "editor":str,
+                                         "minor":bool,
+                                         "namespace":np.int32,
+                                         "revert":bool,
+                                         "reverteds":str,
+                                         "revid":int,
+                                         "sha1":str,
+                                         "title":str},
+                      true_values=["TRUE"],
+                      false_values=["FALSE"],
+                      parse_dates=["date_time"], 
+                      infer_datetime_format=True
+    )
+
+    if args.wiki is None:
+        wiki = re.match('(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
+    else:
+        wiki = args.wiki
+
+    d['wiki'] = wiki
+
+    for col in TO_ENCODE:
+        d[col+"old"] = d[col]
+        d[col] = d[col].apply(try_unquote, meta=(col,str))
+
+    d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor',str))
+    d['anon'] = (d['anon'] == True) | d['IPAnon']
+    d = d.drop('IPAnon',axis=1)
+    d['timestamp'] = (d['date_time'] - d['date_time'].min())/np.timedelta64(1,'s')
+    d['timestamp'] = d['timestamp'].astype(int)
+    # create a new unique identifier by hashing the editor name or editor ip
+
+    # first sort by editor
+    d = d.set_index(d["date_time"])
+    d = d.map_partitions(lambda x: x.sort_index())
+
+    d['editor_sha'] = d['editor'].apply(lambda x:
+                                        sha256(x.encode()).hexdigest()
+                                        if x is not None
+                                        else None,
+                                        meta=("editor_sha",str)
+                                        )
+
+    editor_groups = d.groupby('editor')
+    d['editor_nth_edit'] = editor_groups.cumcount()
+    d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit = df.date_time.diff(1)))
+    
+    editor_wiki_groups = d.groupby(['editor_sha','wiki'])
+    d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
+    d = editor_wiki_groups.apply(lambda df:
+                                 df.assign(
+                                     tminus_editor_wiki_edit=df.date_time.diff(1)
+                                 ))
+
+    editor_namespace_groups = d.groupby(['editor_sha','wiki','namespace'])
+    d['editor_nth_namespace_edit'] = editor_wiki_groups.cumcount()
+    d = editor_namespace_groups.apply(lambda df:
+                                 df.assign(
+                                     tminus_namespace_wiki_edit=df.date_time.diff(1)
+                                 ))
+    
+    editor_article_groups = d.groupby(['editor_sha','wiki','articleid'])
+    d['editor_nth_article_edit'] = editor_article_groups.cumcount()
+    d = editor_article_groups.apply(lambda df:
+                                    df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
+
+
+    d = d.persist()
+
+    if not os.path.exists(args.output_dr):
+        os.mkdir(args.output_dir
+)
+
+    if args.output_format == "csv":
+        d_csv = d
+        for col in TO_ENCODE:
+            d_csv = d_csv.drop(col,axis=1)
+            d_csv[col] = d_csv[col+'old']
+        d.to_csv()
+    else:
+        for col in TO_ENCODE:
+            d = d.drop(col + 'old', axis=1)
+            
+        d.to_parquet("test_parquet/",object_encoding={"editor":"utf8","reverteds":"utf8","sha1":"utf8","title":"utf8","wiki":"utf8","namespace":"utf8","editor_sha":"utf8","revert":"bool"})
+
+    # for writing to csv we need to urlencode
+
+if __name__ == '__main__':
+    main()

Community Data Science Collective || Want to submit a patch?