From: Nate E TeBlunthuis Date: Sun, 13 Dec 2020 04:42:47 +0000 (-0800) Subject: Some improvements to run affinity clustering on larger dataset and X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/56269deee3d33620550d67bdd3c1a7b64eb3f7e4?ds=sidebyside;hp=e6294b5b90135a5163441c8dc62252dd6a188412 Some improvements to run affinity clustering on larger dataset and compute density. --- diff --git a/clustering/Makefile b/clustering/Makefile new file mode 100644 index 0000000..c97cb0d --- /dev/null +++ b/clustering/Makefile @@ -0,0 +1,4 @@ +srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28' +affinity/subreddit_comment_authors_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet +# $srun_cdsc python3 + clustering.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.feather affinity/subreddit_comment_authors_10000.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.85 --damping=0.85 diff --git a/clustering/clustering.py b/clustering/clustering.py index 38af31c..e652304 100644 --- a/clustering/clustering.py +++ b/clustering/clustering.py @@ -1,12 +1,15 @@ +#!/usr/bin/env python3 + import pandas as pd import numpy as np from sklearn.cluster import AffinityPropagation import fire -def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968): +def affinity_clustering(similarities, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True): ''' similarities: feather file with a dataframe of similarity scores preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits. + damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author. ''' df = pd.read_feather(similarities) @@ -16,6 +19,8 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv preference = np.quantile(mat,preference_quantile) + print(f"preference is {preference}") + print("data loaded") clustering = AffinityPropagation(damping=damping, @@ -24,6 +29,7 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv copy=False, preference=preference, affinity='precomputed', + verbose=verbose, random_state=random_state).fit(mat) diff --git a/density/Makefile b/density/Makefile new file mode 100644 index 0000000..43075a4 --- /dev/null +++ b/density/Makefile @@ -0,0 +1,7 @@ +all: /gscratch/comdata/output/reddit_density/comment_terms_10000.feather /gscratch/comdata/output/reddit_density/comment_authors_10000.feather + +/gscratch/comdata/output/reddit_density/comment_terms_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather + python3 overlap_density.py terms --inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather" --agg=pd.DataFrame.sum + +/gscratch/comdata/output/reddit_density/comment_authors_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather + python3 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather" --agg=pd.DataFrame.sum diff --git a/density/overlap_density.py b/density/overlap_density.py new file mode 100644 index 0000000..2bddb8b --- /dev/null +++ b/density/overlap_density.py @@ -0,0 +1,57 @@ +import pandas as pd +from pandas.core.groupby import DataFrameGroupBy as GroupBy +import fire +import numpy as np + +def overlap_density(inpath, outpath, agg = pd.DataFrame.sum): + df = pd.read_feather(inpath) + df = df.drop('subreddit',1) + np.fill_diagonal(df.values,0) + df = agg(df, 0).reset_index() + df = df.rename({0:'overlap_density'},axis='columns') + df.to_feather(outpath) + return df + +def overlap_density_weekly(inpath, outpath, agg = GroupBy.sum): + df = pd.read_parquet(inpath) + # exclude the diagonal + df = df.loc[df.subreddit != df.variable] + res = agg(df.groupby(['subreddit','week'])).reset_index() + res.to_feather(outpath) + return res + +def author_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather", + outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather", agg=pd.DataFrame.sum): + if type(agg) == str: + agg = eval(agg) + + overlap_density(inpath, outpath, agg) + +def term_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather", + outpath="/gscratch/comdata/output/reddit_density/comment_term_similarity_10000.feather", agg=pd.DataFrame.sum): + + if type(agg) == str: + agg = eval(agg) + + overlap_density(inpath, outpath, agg) + +def author_overlap_density_weekly(inpath="/gscratch/comdata/output/reddit_similarity/subreddit_authors_10000_weekly.parquet", + outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000_weekly.feather", agg=GroupBy.sum): + if type(agg) == str: + agg = eval(agg) + + overlap_density_weekly(inpath, outpath, agg) + +def term_overlap_density_weekly(inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet", + outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000_weekly.parquet", agg=GroupBy.sum): + if type(agg) == str: + agg = eval(agg) + + overlap_density_weekly(inpath, outpath, agg) + + +if __name__ == "__main__": + fire.Fire({'authors':author_overlap_density, + 'terms':term_overlap_density, + 'author_weekly':author_overlap_density_weekly, + 'term_weekly':term_overlap_density_weekly}) diff --git a/old/.#tfidf_authors.py b/old/.#tfidf_authors.py deleted file mode 120000 index 8972575..0000000 --- a/old/.#tfidf_authors.py +++ /dev/null @@ -1 +0,0 @@ -nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/old/.#tfidf_comments_weekly.py b/old/.#tfidf_comments_weekly.py deleted file mode 120000 index 8972575..0000000 --- a/old/.#tfidf_comments_weekly.py +++ /dev/null @@ -1 +0,0 @@ -nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/similarities/#cosine_similarities.py# b/similarities/#cosine_similarities.py# deleted file mode 100644 index ae080d5..0000000 --- a/similarities/#cosine_similarities.py# +++ /dev/null @@ -1,73 +0,0 @@ -from pyspark.sql import functions as f -from pyspark.sql import SparkSession -import pandas as pd -import fire -from pathlib import Path -from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits - - -def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): - spark = SparkSession.builder.getOrCreate() - conf = spark.sparkContext.getConf() - print(outfile) - print(exclude_phrases) - - tfidf = spark.read.parquet(infile) - - if included_subreddits is None: - included_subreddits = select_topN_subreddits(topN) - else: - included_subreddits = set(open(included_subreddits)) - - if exclude_phrases == True: - tfidf = tfidf.filter(~f.col(term_colname).contains("_")) - - print("creating temporary parquet with matrix indicies") - tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits) - tfidf = spark.read.parquet(tempdir.name) - subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() - subreddit_names = subreddit_names.sort_values("subreddit_id_new") - subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 - spark.stop() - - print("loading matrix") - mat = read_tfidf_matrix(tempdir.name, term_colname) - print('computing similarities') - sims = column_similarities(mat) - del mat - - sims = pd.DataFrame(sims.todense()) - sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1) - sims['subreddit'] = subreddit_names.subreddit.values - - p = Path(outfile) - - output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) - output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) - output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - - sims.to_feather(outfile) - tempdir.cleanup() - -def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): - return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', - 'term', - outfile, - min_df, - included_subreddits, - topN, - exclude_phrases) - -def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000): - return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', - 'author', - outfile, - min_df, - included_subreddits, - topN, - exclude_phrases=False) - -if __name__ == "__main__": - fire.Fire({'term':term_cosine_similarities, - 'author':author_cosine_similarities}) - diff --git a/similarities/.#cosine_similarities.py b/similarities/.#cosine_similarities.py deleted file mode 120000 index 8972575..0000000 --- a/similarities/.#cosine_similarities.py +++ /dev/null @@ -1 +0,0 @@ -nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/similarities/.#tfidf_weekly.py b/similarities/.#tfidf_weekly.py deleted file mode 120000 index 8972575..0000000 --- a/similarities/.#tfidf_weekly.py +++ /dev/null @@ -1 +0,0 @@ -nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/similarities/Makefile b/similarities/Makefile index 89a908f..d5187c9 100644 --- a/similarities/Makefile +++ b/similarities/Makefile @@ -1,2 +1,5 @@ /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet - start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet + start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.feather + +/gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet + start_spark_and_run.sh 1 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet diff --git a/similarities/cosine_similarities.py b/similarities/cosine_similarities.py index ae080d5..54b9599 100644 --- a/similarities/cosine_similarities.py +++ b/similarities/cosine_similarities.py @@ -3,7 +3,7 @@ from pyspark.sql import SparkSession import pandas as pd import fire from pathlib import Path -from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits +from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits, column_similarities def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): diff --git a/similarities/job_script.sh b/similarities/job_script.sh index 1c2cacf..e1031ce 100755 --- a/similarities/job_script.sh +++ b/similarities/job_script.sh @@ -1,4 +1,4 @@ #!/usr/bin/bash start_spark_cluster.sh -spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet +spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet stop-all.sh diff --git a/similarities/tfidf.py b/similarities/tfidf.py index 5b1c0c9..b7b4e63 100644 --- a/similarities/tfidf.py +++ b/similarities/tfidf.py @@ -45,7 +45,7 @@ def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/commen [] ) -def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', +def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', topN=25000): return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", @@ -55,7 +55,7 @@ def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfi ['[deleted]','AutoModerator'] ) -def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', +def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', topN=25000): return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py index 2b3c90b..54856b0 100644 --- a/similarities/weekly_cosine_similarities.py +++ b/similarities/weekly_cosine_similarities.py @@ -35,7 +35,7 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 spark.stop() - weeks = list(subreddit_names.week.drop_duplicates()) +d weeks = sorted(list(subreddit_names.week.drop_duplicates())) for week in weeks: print(f"loading matrix: {week}") mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week) diff --git a/visualization/Makefile b/visualization/Makefile new file mode 100644 index 0000000..e69de29