#!/usr/bin/env python3
"""
Builds a user level dataset. Requires a functional Spark installation.
"""
import sys
# add pyspark to your python path e.g.
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
import argparse
import glob
from os import mkdir
from os import path
from wikiq_util import PERSISTENCE_RADIUS


def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards.', required=True, type=str)
    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
    # parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
    parser.add_argument('--urlencode', help="whether we need to decode urls", action="store_true")
    parser.add_argument('--output-format', help="[csv, tsv, parquet] format to output", type=str)
    parser.add_argument('--num-partitions', help="number of partitions to output", type=int, default=1)
    parser.add_argument('--schema-opt', help='Options for the input schema.',
                        choices=["basic", "persistence", "collapse", "persistence+collapse"])
    # parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = parse_args()
    conf = SparkConf().setAppName("Wiki Users Spark")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
    files = glob.glob(args.input_file)
    files = [path.abspath(p) for p in files]

    read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
    read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]

    # going to have to do some coercing of the schema

    # build a schema
    struct = types.StructType().add("anon", types.StringType(), True)
    struct = struct.add("articleid", types.LongType(), True)

    if read_collapse:
        struct = struct.add("collapsed_revs", types.IntegerType(), True)

    struct = struct.add("date_time", types.TimestampType(), True)
    struct = struct.add("deleted", types.BooleanType(), True)
    struct = struct.add("editor", types.StringType(), True)
    struct = struct.add("editor_id", types.LongType(), True)
    struct = struct.add("minor", types.BooleanType(), True)
    struct = struct.add("namespace", types.LongType(), True)
    struct = struct.add("revert", types.BooleanType(), True)
    struct = struct.add("reverteds", types.StringType(), True)
    struct = struct.add("revid", types.LongType(), True)
    struct = struct.add("sha1", types.StringType(), True)
    struct = struct.add("text_chars", types.LongType(), True)
    struct = struct.add("title", types.StringType(), True)

    if read_persistence:
        struct = struct.add("token_revs", types.IntegerType(), True)
        struct = struct.add("tokens_added", types.IntegerType(), True)
        struct = struct.add("tokens_removed", types.IntegerType(), True)
        struct = struct.add("tokens_window", types.IntegerType(), True)

    reader = spark.read

    df = reader.csv(files,
                    sep='\t',
                    inferSchema=False,
                    header=True,
                    mode="PERMISSIVE",
                    schema=struct)

    df = df.repartition(args.num_partitions)

    # replace na editor ids by falling back to the editor string
    df = df.select('*', f.coalesce(df['editor_id'], df['editor']).alias('editor_id_or_ip'))

    # sort by datetime
    df = df.orderBy(df.date_time.asc())
    # create our window specs
    ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
    art_win = Window.orderBy("date_time").partitionBy("articleid")

    # assign which edit reverted what edit
    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid', 'reverteds', 'editor_id_or_ip', 'date_time'])
    # reverteds is a comma-separated list of reverted revision ids; split it into an array
    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds, ',').alias("reverteds_new"))
    reverteds_df = reverteds_df.drop("reverteds")
    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
    reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
    # explode the array so each reverted revision id gets its own row, then join back
    # so every edit is annotated with the revision that reverted it (if any)
    reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                               f.explode(reverteds_df.reverteds).alias('reverted_id'))
    df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
    df = df.drop("reverted_id")
    del(reverteds_df_explode)

    reverteds_df = reverteds_df.select("revid", "editor_nth_revert_action")
    df = df.join(reverteds_df, on=["revid"], how='left_outer')
    del(reverteds_df)

    # count reverts
    reverts_df = df.filter(df.revert == True).select('revid', 'articleid', 'editor_id_or_ip', 'date_time', 'revert')
    reverts_df = reverts_df.withColumn('editor_nth_revert', f.rank().over(ed_win))

    # article's total reverts
    reverts_df = reverts_df.withColumn('article_nth_revert', f.rank().over(art_win))

    # rename revid before the self-join to work around a Spark bug
    # see https://issues.apache.org/jira/browse/SPARK-14948
    reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"), 'editor_nth_revert', 'article_nth_revert')
    df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
    df = df.drop("r_revid")
    del(reverts_df)

    # count edits
    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month', f.month(df.date_time))

    if not read_collapse:
        df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
        df = df.withColumn('article_nth_edit', f.rank().over(art_win))
    else:
        # when revisions are collapsed, count the underlying revisions via collapsed_revs
        df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
        df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
        df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
        df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))

    # total editor's token_revs
    if read_persistence:
        # token_revs_upper extrapolates token_revs as if tokens_added persisted through
        # the remaining, not-yet-observed revisions of the persistence window
        df = df.withColumn("token_revs_upper",
                           df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
        df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
        df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
        df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
        df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
        df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
        df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))

    # output
    if not path.exists(args.output_dir):
        mkdir(args.output_dir)

    if args.output_format == "csv" or args.output_format == "tsv":
        df.write.csv(args.output_dir,
                     sep='\t',
                     mode='overwrite',
                     header=True,
                     timestampFormat="yyyy-MM-dd HH:mm:ss")
    else:
        # format == "parquet"
        df.write.parquet(args.output_dir, mode='overwrite')

    # for writing to csv we need to urlencode
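
# Example invocation (a sketch; the script filename and input path below are
# illustrative assumptions, not taken from this repository). All flags shown
# correspond to the argparse options defined above:
#
#   spark-submit wiki_users_spark.py \
#       --input-file "tsvs/enwiki-*.tsv" \
#       --output-dir ./output \
#       --schema-opt persistence+collapse \
#       --output-format parquet \
#       --num-partitions 4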