import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
-from choose_clusters import load_clusters, load_densities
+from .choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path
-def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
+def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
                             author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
                             term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
                             author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
                             output="data/subreddit_timeseries.parquet"):
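+    """Count each subreddit's weekly distinct commenters, join cluster labels
+    (and optional density scores), and write the result to `output` as Parquet."""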
-
-    clusters = load_clusters(term_clusters_path, author_clusters_path)
-    densities = load_densities(term_densities_path, author_densities_path)
-
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
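+    # One row per distinct (subreddit, week, author); counting rows per
+    # (subreddit, week) gives weekly unique commenters for each subreddit.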
    ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
    ts = ts.repartition('subreddit')
-    spk_clusters = spark.createDataFrame(clusters)
+
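+    # Density scores are now optional: join them only when both paths are given.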
+    if term_densities_path is not None and author_densities_path is not None:
+        densities = load_densities(term_densities_path, author_densities_path)
+        spk_densities = spark.createDataFrame(densities)
+        ts = ts.join(spk_densities, on='subreddit', how='inner')
+
+    clusters = load_clusters(term_clusters_path, author_clusters_path)
+    spk_clusters = spark.createDataFrame(clusters)
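+    # Inner join drops subreddits that never received a cluster assignment.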
    ts = ts.join(spk_clusters, on='subreddit', how='inner')
-    spk_densities = spark.createDataFrame(densities)
-    ts = ts.join(spk_densities, on='subreddit', how='inner')
    ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":
-    fire.Fire(main)
+    fire.Fire(build_cluster_timeseries)
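+
+# Example CLI usage (script name illustrative; Fire parses a literal None into
+# Python's None, which skips the optional density join):
+#
+#   python cluster_timeseries.py
+#   python cluster_timeseries.py --term_densities_path=None --author_densities_path=None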