import pandas as pd
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path


def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
         author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
         term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
         author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
         output="data/subreddit_timeseries.parquet",
         comments_path="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"):
    """Build a weekly timeseries of unique commenting authors per subreddit.

    Reads the per-subreddit comments parquet, counts distinct authors per
    (subreddit, week), joins in cluster assignments and density scores, and
    writes the result to parquet.

    Parameters
    ----------
    term_clusters_path : str
        Feather file of term-based cluster assignments (read by load_clusters).
    author_clusters_path : str
        Feather file of author-based cluster assignments (read by load_clusters).
    term_densities_path : str
        Feather file of term-based density scores (read by load_densities).
    author_densities_path : str
        Feather file of author-based density scores (read by load_densities).
    output : str
        Destination parquet path; overwritten if it exists.
    comments_path : str
        Source parquet of comments partitioned by subreddit; must contain
        'subreddit', 'author', and 'CreatedAt' columns.
    """
    clusters = load_clusters(term_clusters_path, author_clusters_path)
    densities = load_densities(term_densities_path, author_densities_path)

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.parquet(comments_path)

    # Truncate each comment timestamp to the start of its week.
    df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))

    # Timeseries of unique authors per subreddit per week.
    ts = df.select(['subreddit', 'week', 'author']).distinct().groupby(['subreddit', 'week']).count()

    # Colocate each subreddit's rows before the joins below.
    ts = ts.repartition('subreddit')

    # Attach cluster assignments and density scores. Inner joins mean
    # subreddits missing from either table are dropped from the output.
    spk_clusters = spark.createDataFrame(clusters)
    ts = ts.join(spk_clusters, on='subreddit', how='inner')
    spk_densities = spark.createDataFrame(densities)
    ts = ts.join(spk_densities, on='subreddit', how='inner')

    ts.write.parquet(output, mode='overwrite')


if __name__ == "__main__":
    fire.Fire(main)