code.communitydata.science - mediawiki_dump_tools.git/commitdiff
add Spark program for grouping edits by user
author groceryheist <nathante@uw.edu>
Fri, 31 Aug 2018 20:40:22 +0000 (20:40 +0000)
committer groceryheist <nathante@uw.edu>
Fri, 31 Aug 2018 20:40:22 +0000 (20:40 +0000)
wikiq_users/run_wikiq_users_cluster.sh [new file with mode: 0755]
wikiq_users/run_wikiq_users_standalone.sh [new file with mode: 0755]
wikiq_users/wikiq_users_spark.py [new file with mode: 0755]

diff --git a/wikiq_users/run_wikiq_users_cluster.sh b/wikiq_users/run_wikiq_users_cluster.sh
new file mode 100755 (executable)
index 0000000..beca0f9
--- /dev/null
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
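+# submit to the standalone cluster master (host/port are site-specific); builds the user-level
+# dataset from the enwiki wikiq TSVs and writes parquet output in 500 partitions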
+spark-submit --master  spark://n0649:18899 wikiq_users_spark.py --output-format parquet  -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o  "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500
diff --git a/wikiq_users/run_wikiq_users_standalone.sh b/wikiq_users/run_wikiq_users_standalone.sh
new file mode 100755 (executable)
index 0000000..9d7d033
--- /dev/null
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
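+# run without an explicit --master (Spark falls back to local mode); processes the test TSVs
+# and writes tab-separated output in 2 partitions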
+spark-submit wikiq_users_spark.py --output-format csv -i "../mediawiki_dump_tools/tests/tsvs/*.tsv" -o "./out.tsv" --num-partitions 2
diff --git a/wikiq_users/wikiq_users_spark.py b/wikiq_users/wikiq_users_spark.py
new file mode 100755 (executable)
index 0000000..9f1aa24
--- /dev/null
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+""" 
+Builds a user level dataset. Requires a functional spark installation.
+
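+Example invocation (paths are illustrative; the run_wikiq_users_*.sh scripts show real ones):
+    spark-submit wikiq_users_spark.py -i "path/to/wikiq/*.tsv" -o ./output --output-format parquet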
+"""
+
+import sys
+# add pyspark to your python path e.g.
+#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
+#sys.path.append("/home/nathante/sparkstuff/spark/python/")
+from pyspark import SparkConf
+from pyspark.sql import SparkSession, SQLContext
+from pyspark.sql import Window
+import pyspark.sql.functions as f
+from pyspark.sql import types
+import argparse
+import glob
+from os import mkdir
+from os import path
+
+#read a table
+
+def parse_args():
+
+    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
+    parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
+    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
+#    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
+#    parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
+    parser.add_argument('--output-format', help="Output format: csv or parquet", type=str)
+    parser.add_argument('--num-partitions', help="Number of partitions to output", type=int, default=1)
+#    parser.add_argument('--ignore-input-errors', help = "ignore bad lines in input",action="store_true")
+#    parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
+    args = parser.parse_args()
+    return args
+
+if __name__ == "__main__":
+    args = parse_args()
+    conf = SparkConf().setAppName("Wiki Users Spark")
+    spark = SparkSession.builder.config(conf=conf).getOrCreate()
+    files = glob.glob(args.input_file)
+    files = [path.abspath(p) for p in files]
+    reader = spark.read
+
+
+    # build the schema explicitly (the columns of wikiq's TSV output) rather than inferring types
+    struct = types.StructType().add("anon",types.StringType(),True)
+    struct = struct.add("articleid",types.LongType(),True)
+    struct = struct.add("date_time",types.TimestampType(), True)
+    struct = struct.add("deleted",types.BooleanType(), True)
+    struct = struct.add("editor",types.StringType(),True)
+    struct = struct.add("editor_id",types.LongType(), True)
+    struct = struct.add("minor", types.BooleanType(), True)
+    struct = struct.add("namespace", types.LongType(), True)
+    struct = struct.add("revert", types.BooleanType(), True)
+    struct = struct.add("reverteds", types.StringType(), True)
+    struct = struct.add("revid", types.LongType(), True)
+    struct = struct.add("sha1", types.StringType(), True)
+    struct = struct.add("text_chars", types.LongType(), True)
+    struct = struct.add("title",types.StringType(), True)
+    df = reader.csv(files,
+                    sep='\t',
+                    inferSchema=False,
+                    header=True,
+                    mode="PERMISSIVE",
+                    schema = struct)
+    df = df.repartition(args.num_partitions)
+    # anonymous edits have no editor_id; fall back to the editor field (the IP address) as the identifier
+    df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
+
+    # assign which edit reverted what edit
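+    # 'reverteds' holds a comma-separated list of revids undone by this edit; split and explode it
+    # to one row per reverted revision, then join back to tag each reverted edit with its reverter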
+    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])
+    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
+    reverteds_df = reverteds_df.drop("reverteds")
+    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
+    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
+                                       f.explode(reverteds_df.reverteds).alias('reverted_id'))
+    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
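+    # edits that were never reverted keep a null 'reverted_by' (left outer join)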
+    df = df.drop("reverted_id")
+
+    # sort by datetime 
+    df = df.orderBy(df.date_time.asc())
+    win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
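+    # window over each editor's edits in chronological order; used below to rank nth revert and nth edit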
+
+    # count reverts
+    reverts_df = df.filter(df.revert==True).select(['revid','editor_id_or_ip','date_time','revert'])
+    reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))
+    df = df.join(reverts_df, ["revid",'editor_id_or_ip','date_time','revert'], how='left_outer')
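+    # the left join attaches 'editor_nth_revert' back to the full edit table; non-revert edits get null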
+    del reverts_df
+
+    # count edits
+    df = df.withColumn('year', f.year(df.date_time))
+    df = df.withColumn('month',f.month(df.date_time))
+    df = df.withColumn('editor_nth_edit',f.rank().over(win))
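+    # 'editor_nth_edit' is the chronological rank of each edit within the editor's full history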
+    
+    # output
+    if not path.exists(args.output_dir):
+        mkdir(args.output_dir)
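+    # Spark writes a directory of part files under output_dir rather than a single file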
+    if args.output_format == "csv" or args.output_format == "tsv":
+        df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
+    else:
+        # default to parquet
+        df.write.parquet(args.output_dir, mode='overwrite')
+
+    # TODO: when writing to csv, fields may need to be urlencoded
