X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/db5879d6c92a826c65b86a68675c503b20914cf8..a60747292e91a47d122158659182f82bfd2e922a:/author_cosine_similarity.py diff --git a/author_cosine_similarity.py b/author_cosine_similarity.py index 08001c2..7ae708b 100644 --- a/author_cosine_similarity.py +++ b/author_cosine_similarity.py @@ -7,7 +7,7 @@ import pandas as pd import fire from itertools import islice from pathlib import Path -from similarities_helper import cosine_similarities +from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities spark = SparkSession.builder.getOrCreate() conf = spark.sparkContext.getConf() @@ -31,49 +31,89 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get where to output csv and feather outputs ''' + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() print(outfile) tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet') if included_subreddits is None: - included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) - included_subreddits = {s.strip('\n') for s in included_subreddits} + rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") + included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values) else: included_subreddits = set(open(included_subreddits)) - sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold) + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name,'author') + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) + sims['subreddit'] = subreddit_names.subreddit.values p = Path(outfile) output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - sim_dist = sim_dist.entries.toDF() - sim_dist = sim_dist.repartition(1) - sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy') + sims.to_feather(outfile) + tempdir.cleanup() + + # print(outfile) + + # tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet') + + # if included_subreddits is None: + # included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) + # included_subreddits = {s.strip('\n') for s in included_subreddits} + + # else: + # included_subreddits = set(open(included_subreddits)) + + # sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold) + + # p = Path(outfile) + + # output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + # output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + # output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) + # sim_dist = sim_dist.entries.toDF() + + # sim_dist = sim_dist.repartition(1) + # sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy') - #instead of toLocalMatrix() why not read as entries and put strait into numpy - sim_entries = pd.read_parquet(output_parquet) + # #instead of toLocalMatrix() why not read as entries and put strait into numpy + # sim_entries = pd.read_parquet(output_parquet) - df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() + # df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() - spark.stop() - df['subreddit_id_new'] = df['subreddit_id_new'] - 1 - df = df.sort_values('subreddit_id_new').reset_index(drop=True) - df = df.set_index('subreddit_id_new') - - similarities = sim_entries.join(df, on='i') - similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) - similarities = similarities.join(df, on='j') - similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) - - similarities.to_feather(output_feather) - similarities.to_csv(output_csv) - return similarities + # spark.stop() + # df['subreddit_id_new'] = df['subreddit_id_new'] - 1 + # df = df.sort_values('subreddit_id_new').reset_index(drop=True) + # df = df.set_index('subreddit_id_new') + + # similarities = sim_entries.join(df, on='i') + # similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) + # similarities = similarities.join(df, on='j') + # similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) + + # similarities.to_feather(output_feather) + # similarities.to_csv(output_csv) + # return similarities if __name__ == '__main__': fire.Fire(author_cosine_similarities)