# Builds a user-level dataset. Requires a functional Spark installation.
# add pyspark to your python path, e.g.
# sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
# sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types

import argparse
import glob
from os import path, mkdir
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
# parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
# parser.add_argument('--urlencode', help="whether we need to decode urls", action="store_true")
parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
parser.add_argument('--num-partitions', help="number of partitions to output", type=int, default=1)
# parser.add_argument('--ignore-input-errors', help="ignore bad lines in input", action="store_true")
# parser.add_argument('--nodes', help="how many hyak nodes to use", default=0, type=int)
args = parser.parse_args()
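
# Example invocation (the script and input file names below are illustrative):
#   spark-submit wiki_users.py -i "./tsvs/enwiki-*.tsv" -o ./output --output-format parquet --num-partitions 4
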
if __name__ == "__main__":

    conf = SparkConf().setAppName("Wiki Users Spark")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    files = glob.glob(args.input_file)
    files = [path.abspath(p) for p in files]

    # going to have to do some coercing of the schema
    struct = types.StructType().add("anon", types.StringType(), True)
    struct = struct.add("articleid", types.LongType(), True)
    struct = struct.add("date_time", types.TimestampType(), True)
    struct = struct.add("deleted", types.BooleanType(), True)
    struct = struct.add("editor", types.StringType(), True)
    struct = struct.add("editor_id", types.LongType(), True)
    struct = struct.add("minor", types.BooleanType(), True)
    struct = struct.add("namespace", types.LongType(), True)
    struct = struct.add("revert", types.BooleanType(), True)
    struct = struct.add("reverteds", types.StringType(), True)
    struct = struct.add("revid", types.LongType(), True)
    struct = struct.add("sha1", types.StringType(), True)
    struct = struct.add("text_chars", types.LongType(), True)
    struct = struct.add("title", types.StringType(), True)
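    # an explicit schema avoids a second pass over the data for type inference
    # and keeps column types consistent across all input files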
    reader = spark.read
    df = reader.csv(files,
                    sep='\t',
                    header=True,
                    schema=struct)
    df = df.repartition(args.num_partitions)

    # replace na editor ids
    df = df.select('*', f.coalesce(df['editor_id'], df['editor']).alias('editor_id_or_ip'))
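    # editor_id_or_ip is the registered editor id when present; otherwise it falls
    # back to the editor field (an IP address for anonymous edits)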

    # assign which edit reverted what edit
    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid', 'reverteds'])
    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds, ',').alias("reverteds_new"))
    reverteds_df = reverteds_df.drop("reverteds")
    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                       f.explode(reverteds_df.reverteds).alias('reverted_id'))
    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
    df = df.drop("reverted_id")
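    # each edit now carries 'reverted_by': the revid of the edit that reverted it,
    # or null if it was never reverted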

    df = df.orderBy(df.date_time.asc())
    win = Window.partitionBy('editor_id_or_ip').orderBy('date_time')

    reverts_df = df.filter(df.revert == True).select(['revid', 'editor_id_or_ip', 'date_time', 'revert'])
    reverts_df = reverts_df.withColumn('editor_nth_revert', f.rank().over(win))
    df = df.join(reverts_df, ["revid", 'editor_id_or_ip', 'date_time', 'revert'], how='left_outer')
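    # after the left join, 'editor_nth_revert' is null for edits that are not reverts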

    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month', f.month(df.date_time))
    df = df.withColumn('editor_nth_edit', f.rank().over(win))
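    # 'editor_nth_edit' ranks each editor's edits chronologically using the same per-editor window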

    if not path.exists(args.output_dir):
        mkdir(args.output_dir)

    if args.output_format == "csv" or args.output_format == "tsv":
        df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
    else:
        # format == "parquet"
        df.write.parquet(args.output_dir, mode='overwrite')

    # for writing to csv we need to urlencode