code.communitydata.science - mediawiki_dump_tools.git/commitdiff
add more variables and support for persistence
author groceryheist <nathante@uw.edu>
Fri, 31 Aug 2018 22:57:48 +0000 (15:57 -0700)
committer groceryheist <nathante@uw.edu>
Fri, 31 Aug 2018 23:00:47 +0000 (16:00 -0700)
wikiq_users/run_wikiq_users_cluster.sh
wikiq_users/wikiq_users_spark.py

index beca0f9a8770aa8baa8b55a519232a8f71c7d7cb..84e23f05152b6fef478d3b4c6acf72cdec508007 100755 (executable)
@@ -1,2 +1,2 @@
 #!/usr/bin/env bash
-spark-submit --master  spark://n0649:18899 wikiq_users_spark.py --output-format parquet  -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o  "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500
+spark-submit --master  spark://n0649:18899 wikiq_users_spark.py --output-format parquet  -i "/com/output/wikiq-enwiki-persist-sequence-20180301/enwiki/enwiki-20180301-pages-meta-history*.tsv" -o  "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500 --schema-opt persistence+collapse
index 9f1aa2457e1cbf8a7a5312cbbfef548b230d863b..31d078a24a0e3d8fc9c2952d5cabca904e134d3b 100755 (executable)
@@ -17,7 +17,7 @@ import argparse
 import glob
 from os import mkdir
 from os import path
-
+from wikiq_util import PERSISTENCE_RADIUS
 #read a table
 
 def parse_args():
@@ -26,10 +26,10 @@ def parse_args():
     parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
     parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
 #    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
-#    parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
+    parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
     parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
     parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
-#    parser.add_argument('--ignore-input-errors', help = "ignore bad lines in input",action="store_true")
+    parser.add_argument('--schema-opt', help = 'Options for the input schema.', choices = ["basic","persistence","collapse","persistence+collapse"])
 #    parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
     args = parser.parse_args()
     return(args)
@@ -38,16 +38,23 @@ if __name__ == "__main__":
     args = parse_args()
     conf = SparkConf().setAppName("Wiki Users Spark")
     spark = SparkSession.builder.getOrCreate()
+
+    # test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
     files = glob.glob(args.input_file)
     files = [path.abspath(p) for p in files]
-    reader = spark.read
 
+    read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
+    read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]
 
     # going to have to do some coercing of the schema
 
     # build a schema
     struct = types.StructType().add("anon",types.StringType(),True)
     struct = struct.add("articleid",types.LongType(),True)
+    
+    if read_collapse is True:  # this column is added to the schema only for the collapse options
+        struct = struct.add("collapsed_revs", types.IntegerType(), True)
+
     struct = struct.add("date_time",types.TimestampType(), True)
     struct = struct.add("deleted",types.BooleanType(), True)
     struct = struct.add("editor",types.StringType(),True)
@@ -60,6 +67,16 @@ if __name__ == "__main__":
     struct = struct.add("sha1", types.StringType(), True)
     struct = struct.add("text_chars", types.LongType(), True)
     struct = struct.add("title",types.StringType(), True)
+
+    if read_persistence is True:
+        struct = struct.add("token_revs", types.IntegerType(),True)
+        struct = struct.add("tokens_added", types.IntegerType(),True)
+        struct = struct.add("tokens_removed", types.IntegerType(),True)
+        struct = struct.add("tokens_window", types.IntegerType(),True)
+
+
+    reader = spark.read
+
     df = reader.csv(files,
                     sep='\t',
                     inferSchema=False,
@@ -67,33 +84,73 @@ if __name__ == "__main__":
                     mode="PERMISSIVE",
                     schema = struct)
     df = df.repartition(args.num_partitions)
+
     # replace na editor ids
     df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
 
+    # sort by datetime 
+    df = df.orderBy(df.date_time.asc())
+
+    # create our window_specs
+    ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
+    art_win = Window.orderBy("date_time").partitionBy("articleid")
+
     # assign which edit reverted what edit
-    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])
+    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds','editor_id_or_ip','date_time'])
     reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
     reverteds_df = reverteds_df.drop("reverteds")
     reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
-    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
-                                       f.explode(reverteds_df.reverteds).alias('reverted_id'))
-    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
-    df.drop("reverted_id")    
+    reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
 
-    # sort by datetime 
-    df = df.orderBy(df.date_time.asc())
-    win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
+    reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))
+
+    
+    df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
+    df = df.drop("reverted_id")    
+    del(reverteds_df_explode)
+
+    reverteds_df = reverteds_df.select("revid","editor_nth_revert_action")
+    df = df.join(reverteds_df, on= ["revid"], how='left_outer')
+
+    del(reverteds_df)
 
     # count reverts
-    reverts_df = df.filter(df.revert==True).select(['revid','editor_id_or_ip','date_time','revert'])
-    reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))
-    df = df.join(reverts_df, ["revid",'editor_id_or_ip','date_time','revert'], how='left_outer')
+    reverts_df = df.filter(df.revert==True).select('revid','articleid','editor_id_or_ip','date_time','revert')
+    reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(ed_win))
+
+    # articles total reverts
+    reverts_df = reverts_df.withColumn('article_nth_revert',f.rank().over(art_win))
+
+    # some kind of bad work around a bug
+    # see https://issues.apache.org/jira/browse/SPARK-14948
+    reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"),'editor_nth_revert','article_nth_revert')
+    df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
+    df = df.drop("r_revid")
     del(reverts_df)
 
     # count edits
     df = df.withColumn('year', f.year(df.date_time))
     df = df.withColumn('month',f.month(df.date_time))
-    df = df.withColumn('editor_nth_edit',f.rank().over(win))
+
+    if not read_collapse:
+        df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
+        df = df.withColumn('article_nth_edit', f.rank().over(art_win))
+    else:
+        df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
+        df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
+        df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
+        df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))
+
+    # total editor's token_revs
+    if read_persistence:
+        df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
+        df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
+        df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
+        df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
+        df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
+        df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
+        df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))
     
     # output
     if not path.exists(args.output_dir):

Community Data Science Collective || Want to submit a patch?