]> code.communitydata.science - cdsc_reddit.git/commitdiff
add code for pulling activity time series from parquet.
authorNate E TeBlunthuis <nathante@n2347.hyak.local>
Wed, 24 Mar 2021 23:08:57 +0000 (16:08 -0700)
committerNate E TeBlunthuis <nathante@n2347.hyak.local>
Wed, 24 Mar 2021 23:08:57 +0000 (16:08 -0700)
timeseries/choose_clusters.py [new file with mode: 0644]
timeseries/cluster_timeseries.py [new file with mode: 0644]

diff --git a/timeseries/choose_clusters.py b/timeseries/choose_clusters.py
new file mode 100644 (file)
index 0000000..c801379
--- /dev/null
@@ -0,0 +1,96 @@
+from pyarrow import dataset as ds
+import numpy as np
+import pandas as pd
+import plotnine as pn
+random = np.random.RandomState(1968)
+
+def load_densities(term_density_file="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
+                   author_density_file="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather"):
+
+    term_density = pd.read_feather(term_density_file)
+    author_density = pd.read_feather(author_density_file)
+
+    term_density.rename({'overlap_density':'term_density','index':'subreddit'},axis='columns',inplace=True)
+    author_density.rename({'overlap_density':'author_density','index':'subreddit'},axis='columns',inplace=True)
+
+    density = term_density.merge(author_density,on='subreddit',how='inner')
+
+    return density
+
+def load_clusters(term_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
+                  author_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather"):
+    term_clusters = pd.read_feather(term_clusters_file)
+    author_clusters = pd.read_feather(author_clusters_file)
+
+    # rename, join and return
+    term_clusters.rename({'cluster':'term_cluster'},axis='columns',inplace=True)
+    author_clusters.rename({'cluster':'author_cluster'},axis='columns',inplace=True)
+
+    clusters = term_clusters.merge(author_clusters,on='subreddit',how='inner')
+
+    return clusters
+
+if __name__ == '__main__':
+
+    df = load_densities()
+    cl = load_clusters()
+
+    df['td_rank'] = df.term_density.rank()
+    df['ad_rank'] = df.author_density.rank()
+
+    df['td_percentile'] = df.td_rank / df.shape[0]
+    df['ad_percentile'] = df.ad_rank / df.shape[0]
+
+    df = df.merge(cl, on='subreddit',how='inner')
+
+    term_cluster_density = df.groupby('term_cluster').agg({'td_rank':['mean','min','max'],
+                                                         'ad_rank':['mean','min','max'],
+                                                         'td_percentile':['mean','min','max'],
+                                                           'ad_percentile':['mean','min','max'],
+                                                           'subreddit':['count']})
+                                                         
+
+    author_cluster_density = df.groupby('author_cluster').agg({'td_rank':['mean','min','max'],
+                                                         'ad_rank':['mean','min','max'],
+                                                         'td_percentile':['mean','min','max'],
+                                                           'ad_percentile':['mean','min','max'],
+                                                           'subreddit':['count']})
+                                                         
+    # which clusters have the most term_density?
+    term_cluster_density.iloc[term_cluster_density.td_rank['mean'].sort_values().index]
+
+    # which clusters have the most author_density?
+    term_cluster_density.iloc[term_cluster_density.ad_rank['mean'].sort_values(ascending=False).index].loc[term_cluster_density.subreddit['count'] >= 5][0:20]
+
+    high_density_term_clusters = term_cluster_density.loc[(term_cluster_density.td_percentile['mean'] > 0.75) & (term_cluster_density.subreddit['count'] > 5)]
+
+    # let's just use term density instead of author density for now. We can do a second batch with author density next.
+    chosen_clusters = high_density_term_clusters.sample(3,random_state=random)
+
+    cluster_info = df.loc[df.term_cluster.isin(chosen_clusters.index.values)]
+
+    chosen_subreddits = cluster_info.subreddit.values
+
+    dataset = ds.dataset("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",format='parquet')
+    comments = dataset.to_table(filter=ds.field("subreddit").isin(chosen_subreddits),columns=['id','subreddit','author','CreatedAt'])
+
+    comments = comments.to_pandas()
+
+    comments['week'] = comments.CreatedAt.dt.date - pd.to_timedelta(comments['CreatedAt'].dt.dayofweek, unit='d')
+
+    author_timeseries = comments.loc[:,['subreddit','author','week']].drop_duplicates().groupby(['subreddit','week']).count().reset_index()
+
+    for clid in chosen_clusters.index.values:
+
+        ts = pd.read_feather(f"data/ts_term_cluster_{clid}.feather")
+
+        pn.options.figure_size = (11.7,8.27)
+        p = pn.ggplot(ts)
+        p = p + pn.geom_line(pn.aes('week','value',group='subreddit'))
+        p = p + pn.facet_wrap('~ subreddit')
+        p.save(f"plots/ts_term_cluster_{clid}.png")
+        
+
+        fig, ax = pyplot.subplots(figsize=(11.7,8.27))
+        g = sns.FacetGrid(ts,row='subreddit')
+        g.map_dataframe(sns.scatterplot,'week','value',data=ts,ax=ax)
diff --git a/timeseries/cluster_timeseries.py b/timeseries/cluster_timeseries.py
new file mode 100644 (file)
index 0000000..07507d7
--- /dev/null
@@ -0,0 +1,37 @@
+import pandas as pd
+import numpy as np
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from choose_clusters import load_clusters, load_densities
+import fire
+from pathlib import Path
+
+def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
+         author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
+         term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
+         author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
+         output="data/subreddit_timeseries.parquet"):
+
+
+    clusters = load_clusters(term_clusters_path, author_clusters_path)
+    densities = load_densities(term_densities_path, author_densities_path)
+    
+    spark = SparkSession.builder.getOrCreate()
+    
+    df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
+    
+    df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))
+    
+    # time of unique authors by series by week
+    ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
+    
+    ts = ts.repartition('subreddit')
+    spk_clusters = spark.createDataFrame(clusters)
+    
+    ts = ts.join(spk_clusters, on='subreddit', how='inner')
+    spk_densities = spark.createDataFrame(densities)
+    ts = ts.join(spk_densities, on='subreddit', how='inner')
+    ts.write.parquet(output, mode='overwrite')
+
+if __name__ == "__main__":
+    fire.Fire(main)

Community Data Science Collective || Want to submit a patch?