X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/8b8c45ee2da18a0c7139e7ff88991aa2a1fff12f..a60747292e91a47d122158659182f82bfd2e922a:/term_cosine_similarity.py?ds=sidebyside diff --git a/term_cosine_similarity.py b/term_cosine_similarity.py index ba6d2c9..dd92b2c 100644 --- a/term_cosine_similarity.py +++ b/term_cosine_similarity.py @@ -8,98 +8,120 @@ import pandas as pd import fire from itertools import islice from pathlib import Path - -min_df = 1000 - -spark = SparkSession.builder.getOrCreate() -conf = spark.sparkContext.getConf() - -# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0; -def spark_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0): - ''' - Compute similarities between subreddits based on tfi-idf vectors of comment texts - - included_subreddits : string - Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits - - similarity_threshold : double (default = 0) - set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm -https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm. - - min_df : int (default = 0.1 * (number of included_subreddits) - exclude terms that appear in fewer than this number of documents. - - outfile: string - where to output csv and feather outputs -''' +from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities +import scipy +# outfile='test_similarities_500.feather'; +# min_df = None; +# included_subreddits=None; topN=100; exclude_phrases=True; + +def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False): + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + print(exclude_phrases) tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet') if included_subreddits is None: - included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),500)) - included_subreddits = [s.strip('\n') for s in included_subreddits] + rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") + included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values) else: included_subreddits = set(open(included_subreddits)) - if min_df is None: - min_df = 0.1 * len(included_subreddits) - - tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits)) + if exclude_phrases == True: + tfidf = tfidf.filter(~f.col(term).contains("_")) + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name,'term') + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) + sims['subreddit'] = subreddit_names.subreddit.values - # reset the subreddit ids - sub_ids = tfidf.select('subreddit_id').distinct() - sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id"))) - tfidf = tfidf.join(sub_ids,'subreddit_id') + p = Path(outfile) - # only use terms in at least min_df included subreddits - new_count = tfidf.groupBy('term_id').agg(f.count('term_id').alias('new_count')) - term_ids = term_ids.join(new_count,'term_id') - term_ids = term_ids.filter(new_count >= min_df) + output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - # reset the term ids - term_ids = tfidf.select('term_id').distinct() - term_ids = term_ids.withColumn("term_id_new",f.row_number().over(Window.orderBy("term_id"))) - tfidf = tfidf.join(term_ids,'term_id') + sims.to_feather(outfile) + tempdir.cleanup() + path = "term_tfidf_entriesaukjy5gv.parquet" + - # step 1 make an rdd of entires - # sorted by (dense) spark subreddit id - entries = tfidf.select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd +# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0; +# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True): +# ''' +# Compute similarities between subreddits based on tfi-idf vectors of comment texts + +# included_subreddits : string +# Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits - # step 2 make it into a distributed.RowMatrix - coordMat = CoordinateMatrix(entries) +# similarity_threshold : double (default = 0) +# set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm +# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm. - # this needs to be an IndexedRowMatrix() - mat = coordMat.toRowMatrix() +# min_df : int (default = 0.1 * (number of included_subreddits) +# exclude terms that appear in fewer than this number of documents. - #goal: build a matrix of subreddit columns and tf-idfs rows - sim_dist = mat.columnSimilarities(threshold=similarity_threshold) +# outfile: string +# where to output csv and feather outputs +# ''' - print(sim_dist.numRows(), sim_dist.numCols()) +# print(outfile) +# print(exclude_phrases) - #instead of toLocalMatrix() why not read as entries and put strait into numpy - sim_entries = sim_dist.entries.collect() +# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet') - sim_entries = pd.DataFrame([{'i':me.i,'j':me.j,'value':me.value} for me in sim_entries]) +# if included_subreddits is None: +# included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) +# included_subreddits = {s.strip('\n') for s in included_subreddits} - df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() +# else: +# included_subreddits = set(open(included_subreddits)) - df = df.sort_values('subreddit_id_new').reset_index(drop=True) +# if exclude_phrases == True: +# tfidf = tfidf.filter(~f.col(term).contains("_")) - df = df.set_index('subreddit_id_new') +# sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold) - similarities = sim_entries.join(df, on='i') - similarities = sim_entries.rename(columns={'subreddit':"subreddit_i"}) - similarities = sim_entries.join(df, on='j') - similarities = sim_entries.rename(columns={'subreddit':"subreddit_j"}) +# p = Path(outfile) - p = Path(outfile) - output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) - output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) +# output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) +# output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) +# output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - pyarrow.write_feather(similarities,output_feather) - pyarrow.write_csv(similarities,output_csv) - return similarities +# sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy') + +# #instead of toLocalMatrix() why not read as entries and put strait into numpy +# sim_entries = pd.read_parquet(output_parquet) + +# df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() +# spark.stop() +# df['subreddit_id_new'] = df['subreddit_id_new'] - 1 +# df = df.sort_values('subreddit_id_new').reset_index(drop=True) +# df = df.set_index('subreddit_id_new') + +# similarities = sim_entries.join(df, on='i') +# similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) +# similarities = similarities.join(df, on='j') +# similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) + +# similarities.to_feather(output_feather) +# similarities.to_csv(output_csv) +# return similarities if __name__ == '__main__': - fire.Fire(spark_similarities) + fire.Fire(term_cosine_similarities)