# Builds a per-author table describing comments that received "good bot" /
# "bad bot" replies: vote counts, overall comment counts, how often the author
# said "bot", comments written since the first such reply, and a zlib-based
# compressibility score of a random sample of the author's comments.

import zlib

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql.types import FloatType


def zlib_entropy_rate(s):
    """Return the zlib compression ratio of the encoded string *s*.

    The ratio is ``len(compressed) / len(raw)`` where ``raw`` is ``s.encode()``
    and compression uses zlib level 6.

    Args:
        s: Text to score. May be ``None``.

    Returns:
        The ratio as a float, or ``None`` when *s* is ``None`` or encodes to
        zero bytes (the ratio would be undefined).
    """
    if s is None:
        return None
    raw = s.encode()  # encode once and reuse for both sizes
    if not raw:
        return None
    return len(zlib.compress(raw, level=6)) / len(raw)


zlib_entropy_rate_udf = f.udf(zlib_entropy_rate, FloatType())

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet(
    "/gscratch/comdata/output/reddit_comments_by_author.parquet",
    compression='snappy',
)

# Flag every comment whose lower-cased body contains "bot".
df = df.withColumn("saidbot", f.lower(f.col("body")).like("%bot%"))

# (disabled) restrict to a single subreddit and cache the result:
# df = df.filter(df.subreddit=='seattle')
# df = df.cache()

# Replies that contain "good bot" or "bad bot". rlike() matches anywhere in
# the string, so no leading/trailing ".*" is needed. An alternation group is
# required here: a character class such as "[good|bad]" would match a single
# character and let through text like "a bot".
botreplies = df.filter(f.lower(df.body).rlike(r"(good|bad) bot"))

# Key each reply by the comment it answers. substr(4, 100) drops the first
# three characters of parent_id (presumably the "t1_" type prefix -- confirm).
botreplies = botreplies.select([
    f.col("parent_id").substr(4, 100).alias("bot_comment_id"),
    f.lower(f.col("body")).alias("good_bad_bot"),
    f.col("link_id").alias("gbbb_link_id"),
])

# Vote tallies per replied-to comment.
botreplies = botreplies.groupby(['bot_comment_id']).agg(
    f.count('good_bad_bot').alias("N_goodbad_votes"),
    f.sum(
        f.lower(f.col('good_bad_bot')).like('%good bot%').astype("double")
    ).alias("n_good_votes"),
    f.sum(
        f.lower(f.col('good_bad_bot')).like('%bad bot%').astype("double")
    ).alias("n_bad_votes"),
)

# Per-author totals over all comments.
comments_by_author = (
    df.select(['author', 'id', 'saidbot'])
    .groupBy('author')
    .agg(
        f.count('id').alias("N_comments"),
        f.mean(f.col('saidbot').astype("double")).alias("prop_saidbot"),
        f.sum(f.col('saidbot').astype("double")).alias("n_saidbot"),
    )
)

# (disabled) alternative pandas-based sampling using sampleBy with per-author
# fractions:
# pd_comments_by_author = comments_by_author.toPandas()
# pd_comments_by_author['frac'] = 500 / pd_comments_by_author['N_comments']
# pd_comments_by_author.loc[pd_comments_by_author.frac > 1, 'frac'] = 1
# fractions = pd_comments_by_author.loc[:,['author','frac']]
# fractions = fractions.set_index('author').to_dict()['frac']
# sampled_author_comments = df.sampleBy("author",fractions).groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))

# Random sample of each author's comments: rank rows by a seeded random value
# within the author and keep ranks up to 1000.
df = df.withColumn("randn", f.randn(seed=1968))
win = Window.partitionBy("author").orderBy("randn")
df = df.withColumn("randRank", f.rank().over(win))
sampled_author_comments = df.filter(f.col("randRank") <= 1000)

# Concatenate the sampled bodies per author and score their compressibility.
sampled_author_comments = sampled_author_comments.groupBy('author').agg(
    f.concat_ws(" ", f.collect_list('body')).alias('comments')
)
author_entropy_rates = sampled_author_comments.select([
    'author',
    zlib_entropy_rate_udf(f.col('comments')).alias("entropy_rate"),
])

# Attach the replied-to ("parent") comment to each vote tally. The right outer
# join keeps every tally row even when the parent comment is absent from df.
parents = df.join(
    botreplies,
    on=df.id == botreplies.bot_comment_id,
    how='right_outer',
)

# Earliest CreatedAt, per author, among comments that received such replies.
win1 = Window.partitionBy("author")
parents = parents.withColumn(
    "first_bot_reply", f.min(f.col("CreatedAt")).over(win1)
)

# Keep only the row(s) at that earliest timestamp and rename the columns that
# would otherwise collide with df's CreatedAt / id in the join below.
first_bot_reply = parents.filter(
    f.col("first_bot_reply") == f.col("CreatedAt")
)
first_bot_reply = first_bot_reply.withColumnRenamed("CreatedAt", "FB_CreatedAt")
first_bot_reply = first_bot_reply.withColumnRenamed("id", "FB_id")

# Count each author's comments made at or after their first_bot_reply time.
comments_since_first_bot_reply = df.join(
    first_bot_reply, on='author', how='right_outer'
).filter(f.col("CreatedAt") >= f.col("first_bot_reply"))
comments_since_first_bot_reply = comments_since_first_bot_reply.groupBy(
    "author"
).agg(f.count("id").alias("N_comments_since_firstbot"))

# Roll the vote tallies up to the author level.
bots = parents.groupby(['author']).agg(
    f.sum('N_goodbad_votes').alias("N_goodbad_votes"),
    f.sum(f.col('n_good_votes')).alias("n_good_votes"),
    f.sum(f.col('n_bad_votes')).alias("n_bad_votes"),
    f.count(f.col('author')).alias("N_bot_posts"),
)

bots = bots.join(comments_by_author, on="author", how='left_outer')
bots = bots.join(comments_since_first_bot_reply, on="author", how='left_outer')
bots = bots.join(author_entropy_rates, on='author', how='left_outer')

# Collapse to one partition first and sort inside it afterwards:
# repartition() shuffles rows, so sorting before it would not guarantee the
# order of the single output file.
bots = bots.repartition(1)
bots = bots.sortWithinPartitions("N_goodbad_votes", ascending=False)

bots.write.parquet(
    "/gscratch/comdata/output/reddit_good_bad_bot.parquet",
    mode='overwrite',
)