X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/36cb0a5546d220bb19c0029eb7d4365059822f84..07b0dff9bc0dae2ab6f7fb7334007a5269a512ad:/timeseries/cluster_timeseries.py

diff --git a/timeseries/cluster_timeseries.py b/timeseries/cluster_timeseries.py
deleted file mode 100644
index 07507d7..0000000
--- a/timeseries/cluster_timeseries.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import pandas as pd
-import numpy as np
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from choose_clusters import load_clusters, load_densities
-import fire
-from pathlib import Path
-
-def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
-         author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
-         term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
-         author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
-         output="data/subreddit_timeseries.parquet"):
-
-
-    clusters = load_clusters(term_clusters_path, author_clusters_path)
-    densities = load_densities(term_densities_path, author_densities_path)
-
-    spark = SparkSession.builder.getOrCreate()
-
-    df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
-
-    df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))
-
-    # count of unique authors per subreddit per week
-    ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
-
-    ts = ts.repartition('subreddit')
-    spk_clusters = spark.createDataFrame(clusters)
-
-    ts = ts.join(spk_clusters, on='subreddit', how='inner')
-    spk_densities = spark.createDataFrame(densities)
-    ts = ts.join(spk_densities, on='subreddit', how='inner')
-    ts.write.parquet(output, mode='overwrite')
-
-if __name__ == "__main__":
-    fire.Fire(main)
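
The deleted script built a per-subreddit weekly time series of unique commenting authors, then attached cluster assignments and density scores via inner joins before writing the result to parquet. Below is a minimal pandas-only sketch of that aggregation, using toy data in place of the /gscratch parquet and feather inputs (the subreddit names, authors, and cluster labels are hypothetical, for illustration only):

import pandas as pd

# Hypothetical stand-in for the reddit_comments_by_subreddit.parquet input.
comments = pd.DataFrame({
    'subreddit': ['askreddit', 'askreddit', 'askreddit', 'science'],
    'author':    ['alice', 'alice', 'bob', 'carol'],
    'CreatedAt': pd.to_datetime(['2020-01-06', '2020-01-07', '2020-01-08', '2020-01-06']),
})

# Truncate timestamps to the Monday starting each week, mirroring
# f.date_trunc('week', ...) in the Spark version.
comments['week'] = comments['CreatedAt'].dt.to_period('W').dt.start_time

# Count unique authors per subreddit per week, mirroring the
# select(...).distinct().groupby(...).count() chain.
ts = (comments.drop_duplicates(['subreddit', 'week', 'author'])
              .groupby(['subreddit', 'week'])
              .size()
              .reset_index(name='count'))

# Attach cluster labels with an inner join, as the script did with
# spk_clusters (and likewise for the density table).
clusters = pd.DataFrame({'subreddit': ['askreddit', 'science'], 'cluster': [3, 7]})
ts = ts.merge(clusters, on='subreddit', how='inner')
print(ts)

The original routed this work through Spark, presumably because the full comment corpus is too large for a single machine; the cluster and density tables, being small, were loaded with pandas and promoted to Spark DataFrames with createDataFrame so the joins could run distributed.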