1 from pyspark.sql import functions as f
 
   2 from pyspark.sql import SparkSession
 
   3 from pyspark.sql import Window
 
   4 from pyspark.sql.types import FloatType
 
   7 def zlib_entropy_rate(s):
 
  12         return len(zlib.compress(s.encode(),level=6))/len(s.encode())
 
  14 zlib_entropy_rate_udf = f.udf(zlib_entropy_rate,FloatType())
 
  16 spark = SparkSession.builder.getOrCreate()
 
  18 df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet",compression='snappy')
 
  20 df = df.withColumn("saidbot",f.lower(f.col("body")).like("%bot%"))
 
  22 # df = df.filter(df.subreddit=='seattle')
 
  24 botreplies = df.filter(f.lower(df.body).rlike(".*[good|bad] bot.*"))
 
  25 botreplies = botreplies.select([f.col("parent_id").substr(4,100).alias("bot_comment_id"),f.lower(f.col("body")).alias("good_bad_bot"),f.col("link_id").alias("gbbb_link_id")])
 
  26 botreplies = botreplies.groupby(['bot_comment_id']).agg(f.count('good_bad_bot').alias("N_goodbad_votes"),
 
  27                                                         f.sum((f.lower(f.col('good_bad_bot')).like('%good bot%').astype("double"))).alias("n_good_votes"),
 
  28                                                         f.sum((f.lower(f.col('good_bad_bot')).like('%bad bot%').astype("double"))).alias("n_bad_votes"))
 
  30 comments_by_author = df.select(['author','id','saidbot']).groupBy('author').agg(f.count('id').alias("N_comments"),
 
  31                                                                                 f.mean(f.col('saidbot').astype("double")).alias("prop_saidbot"),
 
  32                                                                                 f.sum(f.col('saidbot').astype("double")).alias("n_saidbot"))
 
  34 # pd_comments_by_author = comments_by_author.toPandas()
 
  35 # pd_comments_by_author['frac'] = 500 / pd_comments_by_author['N_comments']
 
  36 # pd_comments_by_author.loc[pd_comments_by_author.frac > 1, 'frac'] = 1
 
  37 # fractions = pd_comments_by_author.loc[:,['author','frac']]
 
  38 # fractions = fractions.set_index('author').to_dict()['frac']
 
  40 # sampled_author_comments = df.sampleBy("author",fractions).groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
 
  41 df = df.withColumn("randn",f.randn(seed=1968))
 
  43 win = Window.partitionBy("author").orderBy("randn")
 
  45 df = df.withColumn("randRank",f.rank().over(win))
 
  46 sampled_author_comments = df.filter(f.col("randRank") <= 1000)
 
  47 sampled_author_comments = sampled_author_comments.groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
 
  49 author_entropy_rates = sampled_author_comments.select(['author',zlib_entropy_rate_udf(f.col('comments')).alias("entropy_rate")])
 
  51 parents = df.join(botreplies, on=df.id==botreplies.bot_comment_id,how='right_outer')
 
  53 win1 = Window.partitionBy("author")
 
  54 parents = parents.withColumn("first_bot_reply",f.min(f.col("CreatedAt")).over(win1))
 
  56 first_bot_reply = parents.filter(f.col("first_bot_reply")==f.col("CreatedAt"))
 
  57 first_bot_reply = first_bot_reply.withColumnRenamed("CreatedAt","FB_CreatedAt")
 
  58 first_bot_reply = first_bot_reply.withColumnRenamed("id","FB_id")
 
  60 comments_since_first_bot_reply = df.join(first_bot_reply,on = 'author',how='right_outer').filter(f.col("CreatedAt")>=f.col("first_bot_reply"))
 
  61 comments_since_first_bot_reply = comments_since_first_bot_reply.groupBy("author").agg(f.count("id").alias("N_comments_since_firstbot"))
 
  63 bots = parents.groupby(['author']).agg(f.sum('N_goodbad_votes').alias("N_goodbad_votes"),
 
  64                                           f.sum(f.col('n_good_votes')).alias("n_good_votes"),
 
  65                                           f.sum(f.col('n_bad_votes')).alias("n_bad_votes"),
 
  66                                           f.count(f.col('author')).alias("N_bot_posts"))
 
  68 bots = bots.join(comments_by_author,on="author",how='left_outer')
 
  69 bots = bots.join(comments_since_first_bot_reply,on="author",how='left_outer')
 
  70 bots = bots.join(author_entropy_rates,on='author',how='left_outer')
 
  72 bots = bots.orderBy("N_goodbad_votes",ascending=False)
 
  73 bots = bots.repartition(1)
 
  74 bots.write.parquet("/gscratch/comdata/output/reddit_good_bad_bot.parquet",mode='overwrite')