Merge remote-tracking branch 'refs/remotes/origin/excise_reindex' into excise_reindex
[cdsc_reddit.git] / timeseries / cluster_timeseries.py
index 91fa705af34f4242d05f8e7d28e4d212e0fc1419..2286ab0cad083307fbe977344f96a35f8b6a1c41 100644 (file)
--- a/timeseries/cluster_timeseries.py
+++ b/timeseries/cluster_timeseries.py
@@ -12,10 +12,6 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
          author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
          output="data/subreddit_timeseries.parquet"):
 
-
-    clusters = load_clusters(term_clusters_path, author_clusters_path)
-    densities = load_densities(term_densities_path, author_densities_path)
-    
     spark = SparkSession.builder.getOrCreate()
     
     df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
@@ -26,11 +22,15 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
     ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
     
     ts = ts.repartition('subreddit')
-    spk_clusters = spark.createDataFrame(clusters)
+
+    if term_densities_path is not None and author_densities_path is not None:
+        densities = load_densities(term_densities_path, author_densities_path)
+        spk_densities = spark.createDataFrame(densities)
+        ts = ts.join(spk_densities, on='subreddit', how='inner')
     
+    clusters = load_clusters(term_clusters_path, author_clusters_path)
+    spk_clusters = spark.createDataFrame(clusters)
     ts = ts.join(spk_clusters, on='subreddit', how='inner')
-    spk_densities = spark.createDataFrame(densities)
-    ts = ts.join(spk_densities, on='subreddit', how='inner')
     ts.write.parquet(output, mode='overwrite')
 
 if __name__ == "__main__":
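Taken together, the two hunks defer both loads and make the density join optional. The following is a sketch of the function as it reads after this patch, assembled only from the hunks above: the default argument values truncated in the hunk headers are elided, the lines between the parquet read and the groupby (which derive the 'week' column) fall outside this diff and are marked with a placeholder comment, and load_clusters / load_densities are assumed to be helpers defined elsewhere in the module that return pandas DataFrames keyed by 'subreddit'.

from pyspark.sql import SparkSession

def build_cluster_timeseries(term_clusters_path, author_clusters_path,
                             term_densities_path,
                             author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
                             output="data/subreddit_timeseries.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")

    # ... lines outside this diff derive the 'week' column ...

    # one row per (subreddit, week): count of distinct commenting authors
    ts = df.select(['subreddit', 'week', 'author']).distinct().groupby(['subreddit', 'week']).count()
    ts = ts.repartition('subreddit')

    # densities are now loaded lazily and joined only when both paths are supplied
    if term_densities_path is not None and author_densities_path is not None:
        densities = load_densities(term_densities_path, author_densities_path)
        spk_densities = spark.createDataFrame(densities)
        ts = ts.join(spk_densities, on='subreddit', how='inner')

    # clusters are loaded just before their join instead of eagerly at the top
    clusters = load_clusters(term_clusters_path, author_clusters_path)
    spk_clusters = spark.createDataFrame(clusters)
    ts = ts.join(spk_clusters, on='subreddit', how='inner')

    ts.write.parquet(output, mode='overwrite')

The practical effect: before the patch both feather files were read unconditionally at the top of the function, so the density join could not be skipped; after it, calling build_cluster_timeseries with term_densities_path=None and author_densities_path=None produces a timeseries joined against the clusters alone.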
