# code.communitydata.science — cdsc_reddit.git
# similarities/top_subreddits_by_comments.py
# (last commit message: "make pass keyword arg to dataframe.drop")
1 from pyspark.sql import functions as f
2 from pyspark.sql import SparkSession
3 from pyspark.sql import Window
4 from datetime import datetime
5 from pathlib import Path
6
# Compute the top non-NSFW subreddits ranked by comment volume.
#
# Reads submission and comment parquet dumps, restricts both to activity
# on or before a fixed snapshot date, drops /u/ profile pseudo-subreddits,
# excludes subreddits whose submissions are majority over_18, and writes a
# CSV of subreddits with their comment counts and a global rank.

spark = SparkSession.builder.getOrCreate()

# Only count activity up to this snapshot date. Defined once so the NSFW
# proportion (from submissions) and the comment counts cover the same window.
CUTOFF = datetime(2020, 4, 13)

# A subreddit counts as NSFW when at least this share of its submissions
# is flagged over_18.
NSFW_THRESHOLD = 0.5

submissions = spark.read.parquet("../../data/reddit_submissions_by_subreddit.parquet")
submissions = submissions.filter(f.col("CreatedAt") <= CUTOFF)

# Proportion of each subreddit's submissions marked over_18
# (mean of the boolean cast to double).
prop_nsfw = submissions.select(['subreddit', 'over_18']) \
                       .groupby('subreddit') \
                       .agg(f.mean(f.col('over_18').astype('double')).alias('prop_nsfw'))

df = spark.read.parquet("../../data/reddit_comments_by_subreddit.parquet")
df = df.filter(f.col("CreatedAt") <= CUTOFF)

# Remove /u/ pages, which appear as pseudo-subreddits named "u_<username>".
df = df.filter(~df.subreddit.like("u_%"))

df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))

# Inner join: subreddits that have comments but no submissions in the window
# are dropped, since their NSFW proportion is unknown.
# NOTE(review): confirm that excluding them (rather than keeping them) is intended.
df = df.join(prop_nsfw, on='subreddit')
df = df.filter(df.prop_nsfw < NSFW_THRESHOLD)

# Global rank by comment count (rank 1 = most comments). An unpartitioned
# window collapses everything onto one partition, which is acceptable at
# subreddit-count scale.
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))

df = df.toPandas()

# Sorted ascending by n_comments, i.e. descending by comments_rank.
df = df.sort_values("n_comments")

outpath = Path("../../data/reddit_similarity/subreddits_by_num_comments_nonsfw.csv")
outpath.parent.mkdir(exist_ok=True, parents=True)
df.to_csv(str(outpath), index=False)

# Community Data Science Collective || Want to submit a patch?