]> code.communitydata.science - mediawiki_dump_tools.git/blob - wikiq_users/wikiq_users_spark.py
Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into...
[mediawiki_dump_tools.git] / wikiq_users / wikiq_users_spark.py
1 #!/usr/bin/env python3
2 """ 
3 Builds a user level dataset. Requires a functional spark installation.
4
5 """
6
7 import sys
8 # add pyspark to your python path e.g.
9 #sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
10 #sys.path.append("/home/nathante/sparkstuff/spark/python/")
11 from pyspark import SparkConf
12 from pyspark.sql import SparkSession, SQLContext
13 from pyspark.sql import Window
14 import pyspark.sql.functions as f
15 from pyspark.sql import types
16 import argparse
17 import glob
18 from os import mkdir
19 from os import path
20 from wikiq_util import PERSISTENCE_RADIUS
21 #read a table
22
23 def parse_args():
24
25     parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
26     parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
27     parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
28 #    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
29     parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
30     parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
31     parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
32     parser.add_argument('--schema-opt', help = 'Options for the input schema.', choices = ["basic","persistence","collapse","persistence+collapse"])
33 #    parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
34     args = parser.parse_args()
35     return(args)
36
37 if __name__ == "__main__":
38     args = parse_args()
39     conf = SparkConf().setAppName("Wiki Users Spark")
40     spark = SparkSession.builder.getOrCreate()
41
42     # test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
43     files = glob.glob(args.input_file)
44     files = [path.abspath(p) for p in files]
45
46     read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
47     read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]
48
49     # going to have to do some coercing of the schema
50
51     # build a schema
52     struct = types.StructType().add("anon",types.StringType(),True)
53     struct = struct.add("articleid",types.LongType(),True)
54     
55     if read_collapse is True:
56         struct = struct.add("collapsed_revs", types.IntegerType(), True)
57
58     struct = struct.add("date_time",types.TimestampType(), True)
59     struct = struct.add("deleted",types.BooleanType(), True)
60     struct = struct.add("editor",types.StringType(),True)
61     struct = struct.add("editor_id",types.LongType(), True)
62     struct = struct.add("minor", types.BooleanType(), True)
63     struct = struct.add("namespace", types.LongType(), True)
64     struct = struct.add("revert", types.BooleanType(), True)
65     struct = struct.add("reverteds", types.StringType(), True)
66     struct = struct.add("revid", types.LongType(), True)
67     struct = struct.add("sha1", types.StringType(), True)
68     struct = struct.add("text_chars", types.LongType(), True)
69     struct = struct.add("title",types.StringType(), True)
70
71     if read_persistence is True:
72         struct = struct.add("token_revs", types.IntegerType(),True)
73         struct = struct.add("tokens_added", types.IntegerType(),True)
74         struct = struct.add("tokens_removed", types.IntegerType(),True)
75         struct = struct.add("tokens_window", types.IntegerType(),True)
76
77
78     reader = spark.read
79
80     df = reader.csv(files,
81                     sep='\t',
82                     inferSchema=False,
83                     header=True,
84                     mode="PERMISSIVE",
85                     schema = struct)
86     df = df.repartition(args.num_partitions)
87
88     # replace na editor ids
89     df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
90
91     # sort by datetime 
92     df = df.orderBy(df.date_time.asc())
93
94     # create our window_specs
95     ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
96     art_win = Window.orderBy("date_time").partitionBy("articleid")
97
98     # assign which edit reverted what edit
99     reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds','editor_id_or_ip','date_time'])
100     reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
101     reverteds_df = reverteds_df.drop("reverteds")
102     reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
103     reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
104
105     reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))
106
107     
108     df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
109     df = df.drop("reverted_id")    
110     del(reverteds_df_explode)
111
112     reverteds_df = reverteds_df.select("revid","editor_nth_revert_action")
113     df = df.join(reverteds_df, on= ["revid"], how='left_outer')
114
115     del(reverteds_df)
116
117     # count reverts
118     reverts_df = df.filter(df.revert==True).select('revid','articleid','editor_id_or_ip','date_time','revert')
119     reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(ed_win))
120
121     # articles total reverts
122     reverts_df = reverts_df.withColumn('article_nth_revert',f.rank().over(art_win))
123
124     # some kind of bad work around a bug
125     # see https://issues.apache.org/jira/browse/SPARK-14948
126     reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"),'editor_nth_revert','article_nth_revert')
127     df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
128     df = df.drop("r_revid")
129     del(reverts_df)
130
131     # count edits
132     df = df.withColumn('year', f.year(df.date_time))
133     df = df.withColumn('month',f.month(df.date_time))
134
135     if not read_collapse:
136         df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
137         df = df.withColumn('article_nth_edit', f.rank().over(art_win))
138     else:
139         df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
140         df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
141         df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
142         df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))
143
144     # total editor's token_revs
145     if read_persistence:
146         df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
147         df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
148         df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
149         df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
150         df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
151         df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
152         df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))
153     
154     # output
155     if not path.exists(args.output_dir):
156         mkdir(args.output_dir)
157     if args.output_format == "csv" or args.output_format == "tsv":
158         df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")
159             # format == "parquet"
160     else:
161         df.write.parquet(args.output_dir, mode='overwrite')
162
163         # for writing to csv we need to urlencode

Community Data Science Collective || Want to submit a patch?