1 from pyspark.sql import functions as f
2 from pyspark.sql import SparkSession
3 from pyspark.sql import Window
4 from pyspark.sql.types import FloatType
7 def zlib_entropy_rate(s):
12 return len(zlib.compress(s.encode(),level=6))/len(s.encode())
14 zlib_entropy_rate_udf = f.udf(zlib_entropy_rate,FloatType())
16 spark = SparkSession.builder.getOrCreate()
18 df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet",compression='snappy')
20 df = df.withColumn("saidbot",f.lower(f.col("body")).like("%bot%"))
22 # df = df.filter(df.subreddit=='seattle')
24 botreplies = df.filter(f.lower(df.body).rlike(".*[good|bad] bot.*"))
25 botreplies = botreplies.select([f.col("parent_id").substr(4,100).alias("bot_comment_id"),f.lower(f.col("body")).alias("good_bad_bot"),f.col("link_id").alias("gbbb_link_id")])
26 botreplies = botreplies.groupby(['bot_comment_id']).agg(f.count('good_bad_bot').alias("N_goodbad_votes"),
27 f.sum((f.lower(f.col('good_bad_bot')).like('%good bot%').astype("double"))).alias("n_good_votes"),
28 f.sum((f.lower(f.col('good_bad_bot')).like('%bad bot%').astype("double"))).alias("n_bad_votes"))
30 comments_by_author = df.select(['author','id','saidbot']).groupBy('author').agg(f.count('id').alias("N_comments"),
31 f.mean(f.col('saidbot').astype("double")).alias("prop_saidbot"),
32 f.sum(f.col('saidbot').astype("double")).alias("n_saidbot"))
34 # pd_comments_by_author = comments_by_author.toPandas()
35 # pd_comments_by_author['frac'] = 500 / pd_comments_by_author['N_comments']
36 # pd_comments_by_author.loc[pd_comments_by_author.frac > 1, 'frac'] = 1
37 # fractions = pd_comments_by_author.loc[:,['author','frac']]
38 # fractions = fractions.set_index('author').to_dict()['frac']
40 # sampled_author_comments = df.sampleBy("author",fractions).groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
41 df = df.withColumn("randn",f.randn(seed=1968))
43 win = Window.partitionBy("author").orderBy("randn")
45 df = df.withColumn("randRank",f.rank().over(win))
46 sampled_author_comments = df.filter(f.col("randRank") <= 1000)
47 sampled_author_comments = sampled_author_comments.groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
49 author_entropy_rates = sampled_author_comments.select(['author',zlib_entropy_rate_udf(f.col('comments')).alias("entropy_rate")])
51 parents = df.join(botreplies, on=df.id==botreplies.bot_comment_id,how='right_outer')
53 win1 = Window.partitionBy("author")
54 parents = parents.withColumn("first_bot_reply",f.min(f.col("CreatedAt")).over(win1))
56 first_bot_reply = parents.filter(f.col("first_bot_reply")==f.col("CreatedAt"))
57 first_bot_reply = first_bot_reply.withColumnRenamed("CreatedAt","FB_CreatedAt")
58 first_bot_reply = first_bot_reply.withColumnRenamed("id","FB_id")
60 comments_since_first_bot_reply = df.join(first_bot_reply,on = 'author',how='right_outer').filter(f.col("CreatedAt")>=f.col("first_bot_reply"))
61 comments_since_first_bot_reply = comments_since_first_bot_reply.groupBy("author").agg(f.count("id").alias("N_comments_since_firstbot"))
63 bots = parents.groupby(['author']).agg(f.sum('N_goodbad_votes').alias("N_goodbad_votes"),
64 f.sum(f.col('n_good_votes')).alias("n_good_votes"),
65 f.sum(f.col('n_bad_votes')).alias("n_bad_votes"),
66 f.count(f.col('author')).alias("N_bot_posts"))
68 bots = bots.join(comments_by_author,on="author",how='left_outer')
69 bots = bots.join(comments_since_first_bot_reply,on="author",how='left_outer')
70 bots = bots.join(author_entropy_rates,on='author',how='left_outer')
72 bots = bots.orderBy("N_goodbad_votes",ascending=False)
73 bots = bots.repartition(1)
74 bots.write.parquet("/gscratch/comdata/output/reddit_good_bad_bot.parquet",mode='overwrite')