From 36cb0a5546d220bb19c0029eb7d4365059822f84 Mon Sep 17 00:00:00 2001 From: Nate E TeBlunthuis Date: Wed, 24 Mar 2021 16:08:57 -0700 Subject: [PATCH 1/1] add code for pulling activity time series from parquet. --- timeseries/choose_clusters.py | 96 ++++++++++++++++++++++++++++++++ timeseries/cluster_timeseries.py | 37 ++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 timeseries/choose_clusters.py create mode 100644 timeseries/cluster_timeseries.py diff --git a/timeseries/choose_clusters.py b/timeseries/choose_clusters.py new file mode 100644 index 0000000..c801379 --- /dev/null +++ b/timeseries/choose_clusters.py @@ -0,0 +1,96 @@ +from pyarrow import dataset as ds +import numpy as np +import pandas as pd +import plotnine as pn +random = np.random.RandomState(1968) + +def load_densities(term_density_file="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather", + author_density_file="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather"): + + term_density = pd.read_feather(term_density_file) + author_density = pd.read_feather(author_density_file) + + term_density.rename({'overlap_density':'term_density','index':'subreddit'},axis='columns',inplace=True) + author_density.rename({'overlap_density':'author_density','index':'subreddit'},axis='columns',inplace=True) + + density = term_density.merge(author_density,on='subreddit',how='inner') + + return density + +def load_clusters(term_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather", + author_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather"): + term_clusters = pd.read_feather(term_clusters_file) + author_clusters = pd.read_feather(author_clusters_file) + + # rename, join and return + term_clusters.rename({'cluster':'term_cluster'},axis='columns',inplace=True) + author_clusters.rename({'cluster':'author_cluster'},axis='columns',inplace=True) + + clusters = term_clusters.merge(author_clusters,on='subreddit',how='inner') + + return clusters + +if __name__ == '__main__': + + df = load_densities() + cl = load_clusters() + + df['td_rank'] = df.term_density.rank() + df['ad_rank'] = df.author_density.rank() + + df['td_percentile'] = df.td_rank / df.shape[0] + df['ad_percentile'] = df.ad_rank / df.shape[0] + + df = df.merge(cl, on='subreddit',how='inner') + + term_cluster_density = df.groupby('term_cluster').agg({'td_rank':['mean','min','max'], + 'ad_rank':['mean','min','max'], + 'td_percentile':['mean','min','max'], + 'ad_percentile':['mean','min','max'], + 'subreddit':['count']}) + + + author_cluster_density = df.groupby('author_cluster').agg({'td_rank':['mean','min','max'], + 'ad_rank':['mean','min','max'], + 'td_percentile':['mean','min','max'], + 'ad_percentile':['mean','min','max'], + 'subreddit':['count']}) + + # which clusters have the most term_density? + term_cluster_density.iloc[term_cluster_density.td_rank['mean'].sort_values().index] + + # which clusters have the most author_density? + term_cluster_density.iloc[term_cluster_density.ad_rank['mean'].sort_values(ascending=False).index].loc[term_cluster_density.subreddit['count'] >= 5][0:20] + + high_density_term_clusters = term_cluster_density.loc[(term_cluster_density.td_percentile['mean'] > 0.75) & (term_cluster_density.subreddit['count'] > 5)] + + # let's just use term density instead of author density for now. We can do a second batch with author density next. + chosen_clusters = high_density_term_clusters.sample(3,random_state=random) + + cluster_info = df.loc[df.term_cluster.isin(chosen_clusters.index.values)] + + chosen_subreddits = cluster_info.subreddit.values + + dataset = ds.dataset("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",format='parquet') + comments = dataset.to_table(filter=ds.field("subreddit").isin(chosen_subreddits),columns=['id','subreddit','author','CreatedAt']) + + comments = comments.to_pandas() + + comments['week'] = comments.CreatedAt.dt.date - pd.to_timedelta(comments['CreatedAt'].dt.dayofweek, unit='d') + + author_timeseries = comments.loc[:,['subreddit','author','week']].drop_duplicates().groupby(['subreddit','week']).count().reset_index() + + for clid in chosen_clusters.index.values: + + ts = pd.read_feather(f"data/ts_term_cluster_{clid}.feather") + + pn.options.figure_size = (11.7,8.27) + p = pn.ggplot(ts) + p = p + pn.geom_line(pn.aes('week','value',group='subreddit')) + p = p + pn.facet_wrap('~ subreddit') + p.save(f"plots/ts_term_cluster_{clid}.png") + + + fig, ax = pyplot.subplots(figsize=(11.7,8.27)) + g = sns.FacetGrid(ts,row='subreddit') + g.map_dataframe(sns.scatterplot,'week','value',data=ts,ax=ax) diff --git a/timeseries/cluster_timeseries.py b/timeseries/cluster_timeseries.py new file mode 100644 index 0000000..07507d7 --- /dev/null +++ b/timeseries/cluster_timeseries.py @@ -0,0 +1,37 @@ +import pandas as pd +import numpy as np +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from choose_clusters import load_clusters, load_densities +import fire +from pathlib import Path + +def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather", + author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather", + term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather", + author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather", + output="data/subreddit_timeseries.parquet"): + + + clusters = load_clusters(term_clusters_path, author_clusters_path) + densities = load_densities(term_densities_path, author_densities_path) + + spark = SparkSession.builder.getOrCreate() + + df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet") + + df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt"))) + + # time of unique authors by series by week + ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count() + + ts = ts.repartition('subreddit') + spk_clusters = spark.createDataFrame(clusters) + + ts = ts.join(spk_clusters, on='subreddit', how='inner') + spk_densities = spark.createDataFrame(densities) + ts = ts.join(spk_densities, on='subreddit', how='inner') + ts.write.parquet(output, mode='overwrite') + +if __name__ == "__main__": + fire.Fire(main) -- 2.39.5