From bc1f5428f0a501d92eb89b840f245a6e8bf9e89d Mon Sep 17 00:00:00 2001
From: groceryheist
Date: Fri, 31 Aug 2018 20:40:22 +0000
Subject: [PATCH] add spark program for running group by users

---
 wikiq_users/run_wikiq_users_cluster.sh    |   2 +
 wikiq_users/run_wikiq_users_standalone.sh |   2 +
 wikiq_users/wikiq_users_spark.py          | 107 ++++++++++++++++++++++
 3 files changed, 111 insertions(+)
 create mode 100755 wikiq_users/run_wikiq_users_cluster.sh
 create mode 100755 wikiq_users/run_wikiq_users_standalone.sh
 create mode 100755 wikiq_users/wikiq_users_spark.py

diff --git a/wikiq_users/run_wikiq_users_cluster.sh b/wikiq_users/run_wikiq_users_cluster.sh
new file mode 100755
index 0000000..beca0f9
--- /dev/null
+++ b/wikiq_users/run_wikiq_users_cluster.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+spark-submit --master spark://n0649:18899 wikiq_users_spark.py --output-format parquet -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500
diff --git a/wikiq_users/run_wikiq_users_standalone.sh b/wikiq_users/run_wikiq_users_standalone.sh
new file mode 100755
index 0000000..9d7d033
--- /dev/null
+++ b/wikiq_users/run_wikiq_users_standalone.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+spark-submit wikiq_users_spark.py --output-format csv -i "../mediawiki_dump_tools/tests/tsvs/*.tsv" -o "./out.tsv" --num-partitions 2
diff --git a/wikiq_users/wikiq_users_spark.py b/wikiq_users/wikiq_users_spark.py
new file mode 100755
index 0000000..9f1aa24
--- /dev/null
+++ b/wikiq_users/wikiq_users_spark.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""
+Builds a user level dataset. Requires a functional Spark installation.
+
+"""
+
+import sys
+# add pyspark to your python path e.g.
+#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
+#sys.path.append("/home/nathante/sparkstuff/spark/python/")
+from pyspark import SparkConf
+from pyspark.sql import SparkSession, SQLContext
+from pyspark.sql import Window
+import pyspark.sql.functions as f
+from pyspark.sql import types
+import argparse
+import glob
+from os import mkdir
+from os import path
+
+#read a table
+
+def parse_args():
+
+    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
+    parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards.', required=True, type=str)
+    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
+#    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
+#    parser.add_argument('--urlencode', help="whether we need to decode urls", action="store_true")
+    parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
+    parser.add_argument('--num-partitions', help="number of partitions to output", type=int, default=1)
+#    parser.add_argument('--ignore-input-errors', help="ignore bad lines in input", action="store_true")
+#    parser.add_argument('--nodes', help="how many hyak nodes to use", default=0, type=int)
+    args = parser.parse_args()
+    return(args)
+
+if __name__ == "__main__":
+    args = parse_args()
+    conf = SparkConf().setAppName("Wiki Users Spark")
+    spark = SparkSession.builder.config(conf=conf).getOrCreate()
+    files = glob.glob(args.input_file)
+    files = [path.abspath(p) for p in files]
+    reader = spark.read
+
+
+    # going to have to do some coercing of the schema
+
+    # build a schema
+    struct = types.StructType().add("anon", types.StringType(), True)
+    struct = struct.add("articleid", types.LongType(), True)
+    struct = struct.add("date_time", types.TimestampType(), True)
+    struct = struct.add("deleted", types.BooleanType(), True)
+    struct = struct.add("editor", types.StringType(), True)
+    struct = struct.add("editor_id", types.LongType(), True)
+    struct = struct.add("minor", types.BooleanType(), True)
+    struct = struct.add("namespace", types.LongType(), True)
+    struct = struct.add("revert", types.BooleanType(), True)
+    struct = struct.add("reverteds", types.StringType(), True)
+    struct = struct.add("revid", types.LongType(), True)
+    struct = struct.add("sha1", types.StringType(), True)
+    struct = struct.add("text_chars", types.LongType(), True)
+    struct = struct.add("title", types.StringType(), True)
+    df = reader.csv(files,
+                    sep='\t',
+                    inferSchema=False,
+                    header=True,
+                    mode="PERMISSIVE",
+                    schema=struct)
+    df = df.repartition(args.num_partitions)
+    # replace na editor ids
+    df = df.select('*', f.coalesce(df['editor_id'], df['editor']).alias('editor_id_or_ip'))
+
+    # assign which edit reverted what edit
+    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid', 'reverteds'])
+    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds, ',').alias("reverteds_new"))
+    reverteds_df = reverteds_df.drop("reverteds")
+    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
+    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
+                                       f.explode(reverteds_df.reverteds).alias('reverted_id'))
+    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
+    df = df.drop("reverted_id")
+
+    # sort by datetime
+    df = df.orderBy(df.date_time.asc())
+    win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
+
+    # count reverts
+    reverts_df = df.filter(df.revert == True).select(['revid', 'editor_id_or_ip', 'date_time', 'revert'])
+    reverts_df = reverts_df.withColumn('editor_nth_revert', f.rank().over(win))
+    df = df.join(reverts_df, ['revid', 'editor_id_or_ip', 'date_time', 'revert'], how='left_outer')
+    del(reverts_df)
+
+    # count edits
+    df = df.withColumn('year', f.year(df.date_time))
+    df = df.withColumn('month', f.month(df.date_time))
+    df = df.withColumn('editor_nth_edit', f.rank().over(win))
+
+    # output
+    if not path.exists(args.output_dir):
+        mkdir(args.output_dir)
+    if args.output_format == "csv" or args.output_format == "tsv":
+        df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
+    # format == "parquet"
+    else:
+        df.write.parquet(args.output_dir, mode='overwrite')
+
+    # for writing to csv we need to urlencode
-- 
2.39.5
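
Not part of the patch itself: a minimal sketch of how the user-level dataset produced by this job might be read back for a quick sanity check. It assumes the parquet output path used in run_wikiq_users_cluster.sh and the columns created above (editor_id_or_ip, editor_nth_edit); the aggregation is only illustrative.

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as f

    spark = SparkSession.builder.getOrCreate()
    # path written by run_wikiq_users_cluster.sh (assumed)
    users = spark.read.parquet("/com/output/wikiq-users-enwiki-20180301-parquet/")
    # one row per editor with their total edit count
    edit_counts = (users
                   .groupBy('editor_id_or_ip')
                   .agg(f.max('editor_nth_edit').alias('n_edits')))
    edit_counts.orderBy(f.desc('n_edits')).show(10)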