3 from pyspark.sql import functions as f
 
   4 from pyspark.sql import SparkSession
 
   5 from choose_clusters import load_clusters, load_densities
 
   7 from pathlib import Path
 
   9 def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
 
  10          author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
 
  11          term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
 
  12          author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
 
  13          output="data/subreddit_timeseries.parquet"):
 
  16     clusters = load_clusters(term_clusters_path, author_clusters_path)
 
  17     densities = load_densities(term_densities_path, author_densities_path)
 
  19     spark = SparkSession.builder.getOrCreate()
 
  21     df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
 
  23     df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))
 
  25     # time of unique authors by series by week
 
  26     ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
 
  28     ts = ts.repartition('subreddit')
 
  29     spk_clusters = spark.createDataFrame(clusters)
 
  31     ts = ts.join(spk_clusters, on='subreddit', how='inner')
 
  32     spk_densities = spark.createDataFrame(densities)
 
  33     ts = ts.join(spk_densities, on='subreddit', how='inner')
 
  34     ts.write.parquet(output, mode='overwrite')
 
  36 if __name__ == "__main__":