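# Build a weekly time series of unique commenting authors for each subreddit
# and attach each subreddit's cluster assignment (and, optionally, its
# densities) before writing the combined table to parquet.
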
import pandas as pd
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from .choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path


def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
                             author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
                             term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
                             author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
                             output="data/subreddit_timeseries.parquet"):
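    """Count the unique commenting authors in each subreddit each week, join the
    counts with cluster assignments (and, optionally, subreddit densities), and
    write the combined table to `output` as parquet."""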
    spark = SparkSession.builder.getOrCreate()

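    # comment-level data; only the subreddit, author, and CreatedAt columns are used below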
    df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")

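    # truncate each comment's timestamp to the start (Monday) of its week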
    df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))

    # count the unique authors commenting in each subreddit each week
    ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()

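    # repartition by subreddit so each subreddit's rows are colocated for the joins below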
    ts = ts.repartition('subreddit')

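    # the density join is optional: skip it when either densities path is unset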
    if term_densities_path is not None and author_densities_path is not None:
        densities = load_densities(term_densities_path, author_densities_path)
        spk_densities = spark.createDataFrame(densities)
        ts = ts.join(spk_densities, on='subreddit', how='inner')

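    # attach cluster assignments; the inner join drops subreddits without a cluster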
    clusters = load_clusters(term_clusters_path, author_clusters_path)
    spk_clusters = spark.createDataFrame(clusters)
    ts = ts.join(spk_clusters, on='subreddit', how='inner')
    ts.write.parquet(output, mode='overwrite')

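# Example invocation (hypothetical; fire exposes the keyword arguments as command-line flags):
#   python3 -m timeseries.cluster_timeseries --output=data/subreddit_timeseries.parquet
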
if __name__ == "__main__":
    fire.Fire(build_cluster_timeseries)
