From: Nate E TeBlunthuis Date: Wed, 2 Dec 2020 06:54:48 +0000 (-0800) Subject: Add code for running tf-idf at the weekly level. X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/a60747292e91a47d122158659182f82bfd2e922a?ds=sidebyside Add code for running tf-idf at the weekly level. --- diff --git a/author_cosine_similarity.py b/author_cosine_similarity.py index 08001c2..7ae708b 100644 --- a/author_cosine_similarity.py +++ b/author_cosine_similarity.py @@ -7,7 +7,7 @@ import pandas as pd import fire from itertools import islice from pathlib import Path -from similarities_helper import cosine_similarities +from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities spark = SparkSession.builder.getOrCreate() conf = spark.sparkContext.getConf() @@ -31,49 +31,89 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get where to output csv and feather outputs ''' + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() print(outfile) tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet') if included_subreddits is None: - included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) - included_subreddits = {s.strip('\n') for s in included_subreddits} + rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") + included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values) else: included_subreddits = set(open(included_subreddits)) - sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold) + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name,'author') + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) + sims['subreddit'] = subreddit_names.subreddit.values p = Path(outfile) output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - sim_dist = sim_dist.entries.toDF() - sim_dist = sim_dist.repartition(1) - sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy') + sims.to_feather(outfile) + tempdir.cleanup() + + # print(outfile) + + # tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet') + + # if included_subreddits is None: + # included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) + # included_subreddits = {s.strip('\n') for s in included_subreddits} + + # else: + # included_subreddits = set(open(included_subreddits)) + + # sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold) + + # p = Path(outfile) + + # output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + # output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + # output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) + # sim_dist = sim_dist.entries.toDF() + + # sim_dist = sim_dist.repartition(1) + # sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy') - #instead of toLocalMatrix() why not read as entries and put strait into numpy - sim_entries = pd.read_parquet(output_parquet) + # #instead of toLocalMatrix() why not read as entries and put strait into numpy + # sim_entries = pd.read_parquet(output_parquet) - df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() + # df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() - spark.stop() - df['subreddit_id_new'] = df['subreddit_id_new'] - 1 - df = df.sort_values('subreddit_id_new').reset_index(drop=True) - df = df.set_index('subreddit_id_new') - - similarities = sim_entries.join(df, on='i') - similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) - similarities = similarities.join(df, on='j') - similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) - - similarities.to_feather(output_feather) - similarities.to_csv(output_csv) - return similarities + # spark.stop() + # df['subreddit_id_new'] = df['subreddit_id_new'] - 1 + # df = df.sort_values('subreddit_id_new').reset_index(drop=True) + # df = df.set_index('subreddit_id_new') + + # similarities = sim_entries.join(df, on='i') + # similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) + # similarities = similarities.join(df, on='j') + # similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) + + # similarities.to_feather(output_feather) + # similarities.to_csv(output_csv) + # return similarities if __name__ == '__main__': fire.Fire(author_cosine_similarities) diff --git a/similarities_helper.py b/similarities_helper.py index 5933f8e..ef434ac 100644 --- a/similarities_helper.py +++ b/similarities_helper.py @@ -119,6 +119,59 @@ def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, simila return (sim_dist, tfidf) +def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): + term = term_colname + term_id = term + '_id' + + # aggregate counts by week. now subreddit-term is distinct + df = df.filter(df.subreddit.isin(include_subs)) + df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf')) + + max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddits are unique + max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf') + df = df.join(max_subreddit_terms, on=['subreddit','week']) + df = df.withColumn("relative_tf", df.tf / df.sr_max_tf) + + # group by term. term is unique + idf = df.groupby([term,'week']).count() + + N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week")) + + idf = idf.join(N_docs, on=['week']) + + # add a little smoothing to the idf + idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1) + + # collect the dictionary to make a pydict of terms to indexes + terms = idf.select([term,'week']).distinct() # terms are distinct + + terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct + + # make subreddit ids + subreddits = df.select(['subreddit','week']).distinct() + subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit"))) + + df = df.join(subreddits,on=['subreddit','week']) + + # map terms to indexes in the tfs and the idfs + df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique + + idf = idf.join(terms,on=[term,'week']) + + # join on subreddit/term to create tf/dfs indexed by term + df = df.join(idf, on=[term_id, term,'week']) + + # agg terms by subreddit to make sparse tf/df vectors + + if tf_family == tf_weight.MaxTF: + df = df.withColumn("tf_idf", df.relative_tf * df.idf) + else: # tf_fam = tf_weight.Norm05 + df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf) + + return df + + + def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): term = term_colname diff --git a/tfidf_authors.py b/tfidf_authors.py index 432ec39..6852fe8 100644 --- a/tfidf_authors.py +++ b/tfidf_authors.py @@ -1,12 +1,14 @@ from pyspark.sql import SparkSession from similarities_helper import build_tfidf_dataset +import pandas as pd spark = SparkSession.builder.getOrCreate() df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp") -include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt")) -include_subs = {s.strip('\n') for s in include_subs} +include_subs = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") + +#include_subs = set(include_subs.loc[include_subs.comments_rank < 300]['subreddit']) # remove [deleted] and AutoModerator (TODO remove other bots) df = df.filter(df.author != '[deleted]')