From 6e43294a41e030e557d7e612f1e6ddb063482689 Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Tue, 3 Aug 2021 15:06:48 -0700
Subject: [PATCH] Updates to similarities code for smap project.

---
 similarities/cosine_similarities.py        |   2 +-
 similarities/lsi_similarities.py           |  48 ++++++---
 similarities/similarities_helper.py        | 118 +++++++++++++--------
 similarities/tfidf.py                      |  21 ++--
 similarities/weekly_cosine_similarities.py |  71 ++++++++-----
 5 files changed, 170 insertions(+), 90 deletions(-)

diff --git a/similarities/cosine_similarities.py b/similarities/cosine_similarities.py
index 8b85692..98f1454 100644
--- a/similarities/cosine_similarities.py
+++ b/similarities/cosine_similarities.py
@@ -6,7 +6,7 @@ from functools import partial
 
 def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
 
-    return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
+    return similarities(inpath=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
 
 # change so that these take in an input as an optional argument (for speed, but also for idf).
 def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
diff --git a/similarities/lsi_similarities.py b/similarities/lsi_similarities.py
index 7ab7e8c..eb89f55 100644
--- a/similarities/lsi_similarities.py
+++ b/similarities/lsi_similarities.py
@@ -1,20 +1,41 @@
 import pandas as pd
 import fire
 from pathlib import Path
-from similarities_helper import similarities, lsi_column_similarities
+from similarities_helper import *
+#from similarities_helper import similarities, lsi_column_similarities
 from functools import partial
 
-def lsi_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack'):
+inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/"
+term_colname='term'
+outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI'
+n_components=[10,50,100]
+included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
+n_iter=5
+random_state=1968
+algorithm='arpack'
+topN = None
+from_date=None
+to_date=None
+min_df=None
+max_df=None
+
+def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
     print(n_components,flush=True)
-    simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm)
+
+    if lsi_model is None:
+        if type(n_components) == list:
+            lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}_LSIMOD.pkl'
+        else:
+            lsi_model = Path(outfile) / f'{n_components}_{term_colname}_LSIMOD.pkl'
 
-    return similarities(infile=infile, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
+    simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model)
+
+    return similarities(inpath=inpath, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
 
 # change so that these take in an input as an optional argument (for speed, but also for idf).
-def term_lsi_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
+def term_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',outfile=None, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, algorithm='arpack', n_components=300,n_iter=5,random_state=1968):
 
-    return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
+    res = lsi_similarities(inpath,
                            'term',
                            outfile,
                            min_df,
@@ -23,11 +44,13 @@ def term_lsi_similarities(outfile, min_df=None, max_df=None, included_subreddits
                            topN,
                            from_date,
                            to_date,
-                           n_components=n_components
+                           n_components=n_components,
+                           algorithm = algorithm
                            )
+    return res
 
-def author_lsi_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
-    return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
+def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
+    return lsi_similarities(inpath,
                            'author',
                            outfile,
                            min_df,
@@ -39,8 +62,8 @@ def author_lsi_similarities(outfile, min_df=2, max_df=None, included_subreddits=
                            n_components=n_components
                            )
 
-def author_tf_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
-    return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
+def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968):
+    return lsi_similarities(inpath,
                            'author',
                            outfile,
                            min_df,
@@ -50,7 +73,8 @@ def author_tf_similarities(outfile, min_df=2, max_df=None, included_subreddits=N
                            from_date=from_date,
                            to_date=to_date,
                            tfidf_colname='relative_tf',
-                           n_components=n_components
+                           n_components=n_components,
+                           algorithm=algorithm
                            )
 
diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py
index e59563e..a4983b3 100644
--- a/similarities/similarities_helper.py
+++ b/similarities/similarities_helper.py
@@ -15,24 +15,53 @@
 import numpy as np
 import pathlib
 from datetime import datetime
 from pathlib import Path
+import pickle
 
 class tf_weight(Enum):
     MaxTF = 1
     Norm05 = 2
 
-infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
-cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
+# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
+# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
 
 # subreddits missing after this step don't have any terms that have a high enough idf
 # try rewriting without merges
-def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
-    print("loading tfidf", flush=True)
-    tfidf_ds = ds.dataset(infile)
+
+# pulls the tfidf entries, reindexes subreddit and term ids, and attaches subreddit names.
+def reindex_tfidf(*args, **kwargs):
+    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)
+
+    print("assigning names")
+    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
+    batches = subreddit_names.to_batches()
+
+    with Pool(cpu_count()) as pool:
+        chunks = pool.imap_unordered(pull_names,batches)
+        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+        subreddit_names = subreddit_names.set_index("subreddit_id")
+
+    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+    new_ids = new_ids.set_index('subreddit_id')
+    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+    subreddit_names = subreddit_names.drop("subreddit_id",1)
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    return(df, subreddit_names)
+
+# does reindex_tfidf, but without reindexing.
+def pull_tfidf(*args, **kwargs):
+    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
+    return df
+
+def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
+    print(f"loading tfidf {infile}", flush=True)
+    if week is not None:
+        tfidf_ds = ds.dataset(infile, partitioning='hive')
+    else:
+        tfidf_ds = ds.dataset(infile)
 
     if included_subreddits is None:
         included_subreddits = select_topN_subreddits(topN)
     else:
-        included_subreddits = set(open(included_subreddits))
+        included_subreddits = set(map(str.strip,open(included_subreddits)))
 
     ds_filter = ds.field("subreddit").isin(included_subreddits)
 
@@ -68,15 +97,20 @@ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subre
                   'relative_tf':ds.field('relative_tf').cast('float32'),
                   'tf_idf':ds.field('tf_idf').cast('float32')}
 
-    tfidf_ds = ds.dataset(infile)
-
     df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
 
     df = df.to_pandas(split_blocks=True,self_destruct=True)
     print("assigning indexes",flush=True)
-    df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
-    grouped = df.groupby(term_id)
-    df[term_id_new] = grouped.ngroup()
+    if reindex:
+        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
+    else:
+        df['subreddit_id_new'] = df['subreddit_id']
+
+    if reindex:
+        grouped = df.groupby(term_id)
+        df[term_id_new] = grouped.ngroup()
+    else:
+        df[term_id_new] = df[term_id]
 
     if rescale_idf:
         print("computing idf", flush=True)
@@ -88,26 +122,13 @@ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subre
         else: # tf_fam = tf_weight.Norm05
             df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
 
-    print("assigning names")
-    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
-    batches = subreddit_names.to_batches()
+    return (df, tfidf_ds, ds_filter)
 
-    with Pool(cpu_count()) as pool:
-        chunks = pool.imap_unordered(pull_names,batches)
-        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
-
-    subreddit_names = subreddit_names.set_index("subreddit_id")
-    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
-    new_ids = new_ids.set_index('subreddit_id')
-    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",1)
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    return(df, subreddit_names)
 
 def pull_names(batch):
     return(batch.to_pandas().drop_duplicates())
 
-def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
+def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
     '''
     tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
     '''
@@ -127,7 +148,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non
         output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
         output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
         output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
-        outfile.parent.mkdir(exist_ok=True, parents=True)
+        p.parent.mkdir(exist_ok=True, parents=True)
 
         sims.to_feather(outfile)
 
@@ -135,7 +156,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non
     term_id = term + '_id'
     term_id_new = term + '_id_new'
 
-    entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
+    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
     mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
 
     print("loading matrix")
@@ -144,6 +165,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non
 
     print(f'computing similarities on mat. mat.shape:{mat.shape}')
     print(f"size of mat is:{mat.data.nbytes}",flush=True)
+    # transform this to debug term tfidf
     sims = simfunc(mat)
     del mat
 
@@ -151,7 +173,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non
         for simmat, name in sims:
             proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
     else:
-        proc_sims(simmat, outfile)
+        proc_sims(sims, outfile)
 
 def write_weekly_similarities(path, sims, week, names):
     sims['week'] = week
@@ -204,7 +226,7 @@ def test_lsi_sims():
 # if n_components is a list we'll return a list of similarities with different latent dimensionalities
 # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
 # this function takes the svd and then the column similarities of it
-def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
+def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
     # first compute the lsi of the matrix
     # then take the column similarities
     print("running LSI",flush=True)
@@ -215,21 +237,32 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
     n_components = sorted(n_components,reverse=True)
 
     svd_components = n_components[0]
-    svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
-    mod = svd.fit(tfidfmat.T)
+
+    if lsi_model_load is not None:
+        mod = pickle.load(open(lsi_model_load ,'rb'))
+
+    else:
+        svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
+        mod = svd.fit(tfidfmat.T)
+
     lsimat = mod.transform(tfidfmat.T)
+    if lsi_model_save is not None:
+        pickle.dump(mod, open(lsi_model_save,'wb'))
+
+    sims_list = []
     for n_dims in n_components:
         sims = column_similarities(lsimat[:,np.arange(n_dims)])
         if len(n_components) > 1:
             yield (sims, n_dims)
         else:
             return sims
 
-
 def column_similarities(mat):
     return 1 - pairwise_distances(mat,metric='cosine')
 
+# need to rewrite this so that subreddit ids and term ids are fixed over the whole thing.
+# this affords taking the LSI similarities.
+# fill all 0s if we don't have it.
 def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
     term = term_colname
     term_id = term + '_id'
@@ -254,20 +287,21 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
     idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1)
 
     # collect the dictionary to make a pydict of terms to indexes
-    terms = idf.select([term,'week']).distinct() # terms are distinct
+    terms = idf.select([term]).distinct() # terms are distinct
 
-    terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct
+    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
 
     # make subreddit ids
-    subreddits = df.select(['subreddit','week']).distinct()
-    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))
+    subreddits = df.select(['subreddit']).distinct()
+    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
 
-    df = df.join(subreddits,on=['subreddit','week'])
+    # df = df.cache()
+    df = df.join(subreddits,on=['subreddit'])
 
     # map terms to indexes in the tfs and the idfs
-    df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique
+    df = df.join(terms,on=[term]) # subreddit-term-id is unique
 
-    idf = idf.join(terms,on=[term,'week'])
+    idf = idf.join(terms,on=[term])
 
     # join on subreddit/term to create tf/dfs indexed by term
     df = df.join(idf, on=[term_id, term,'week'])
@@ -327,7 +361,7 @@ def _calc_tfidf(df, term_colname, tf_family):
 
     return df
 
-def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
+def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
     term = term_colname
     term_id = term + '_id'
     # aggregate counts by week. now subreddit-term is distinct
diff --git a/similarities/tfidf.py b/similarities/tfidf.py
index 002e89f..94dcbf5 100644
--- a/similarities/tfidf.py
+++ b/similarities/tfidf.py
@@ -1,7 +1,7 @@
 import fire
 from pyspark.sql import SparkSession
 from pyspark.sql import functions as f
-from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
+from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
 
 def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
     spark = SparkSession.builder.getOrCreate()
@@ -11,7 +11,7 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
     df = df.filter(~ f.col(term_colname).isin(exclude))
 
     if included_subreddits is not None:
-        include_subs = list(open(included_subreddits))
+        include_subs = set(map(str.strip,open(included_subreddits)))
     else:
         include_subs = select_topN_subreddits(topN)
 
@@ -21,42 +21,45 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
     spark.stop()
 
 def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
-    return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
+    return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
 
 def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
     return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
 
 def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
-                  topN=25000):
+                  topN=None,
+                  included_subreddits=None):
 
     return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                  outpath,
                  topN,
                  'author',
                  ['[deleted]','AutoModerator'],
-                 included_subreddits=None
+                 included_subreddits=included_subreddits
                  )
 
 def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
-                topN=25000):
+                topN=None,
+                included_subreddits=None):
 
     return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
                  outpath,
                  topN,
                  'term',
                  [],
-                 included_subreddits=None
+                 included_subreddits=included_subreddits
                  )
 
 def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
-                         topN=25000):
+                         topN=None,
+                         included_subreddits=None):
 
     return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                         outpath,
                         topN,
                         'author',
                         ['[deleted]','AutoModerator'],
-                        included_subreddits=None
+                        included_subreddits=included_subreddits
                         )
 
 def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py
index e24ceee..7cafcb9 100644
--- a/similarities/weekly_cosine_similarities.py
+++ b/similarities/weekly_cosine_similarities.py
@@ -8,32 +8,47 @@
 import pandas as pd
 import fire
 from itertools import islice, chain
 from pathlib import Path
-from similarities_helper import *
+from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities
+from scipy.sparse import csr_matrix
 from multiprocessing import Pool, cpu_count
 from functools import partial
 
+# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet"
+# tfidf_path = infile
+# min_df=None
+# max_df = None
+# topN=100
+# term_colname='author'
+# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
+# included_subreddits=None
 
-def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path):
+def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms):
     term = term_colname
     term_id = term + '_id'
     term_id_new = term + '_id_new'
     print(f"loading matrix: {week}")
-    entries, subreddit_names = reindex_tfidf(infile = tfidf_path,
-                                             term_colname=term_colname,
-                                             min_df=min_df,
-                                             max_df=max_df,
-                                             included_subreddits=included_subreddits,
-                                             topN=topN,
-                                             week=week)
-    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
+
+    entries = pull_tfidf(infile = tfidf_path,
+                         term_colname=term_colname,
+                         min_df=min_df,
+                         max_df=max_df,
+                         included_subreddits=included_subreddits,
+                         topN=topN,
+                         week=week.isoformat(),
+                         rescale_idf=False)
+
+    tfidf_colname='tf_idf'
+    # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
+    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
+
     print('computing similarities')
-    sims = column_similarities(mat)
+    sims = simfunc(mat.T)
     del mat
-    sims = pd.DataFrame(sims.todense())
+    sims = pd.DataFrame(sims)
     sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
-    sims['_subreddit'] = names.subreddit.values
+    sims['_subreddit'] = subreddit_names.subreddit.values
     outfile = str(Path(outdir) / str(week))
-    write_weekly_similarities(outfile, sims, week, names)
+    write_weekly_similarities(outfile, sims, week, subreddit_names)
 
 def pull_weeks(batch):
     return set(batch.to_pandas()['week'])
@@ -41,25 +56,29 @@ def pull_weeks(batch):
 #tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
 def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500):
     print(outfile)
-    tfidf_ds = ds.dataset(tfidf_path)
-    tfidf_ds = tfidf_ds.to_table(columns=["week"])
-    batches = tfidf_ds.to_batches()
-
-    with Pool(cpu_count()) as pool:
-        weeks = set(chain( * pool.imap_unordered(pull_weeks,batches)))
-
-    weeks = sorted(weeks)
-    # do this step in parallel if we have the memory for it.
-    # should be doable with pool.map
+    spark = SparkSession.builder.getOrCreate()
+    df = spark.read.parquet(tfidf_path)
+    subreddit_names = df.select(['subreddit','subreddit_id']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id")
+    nterms = df.select(f.max(f.col(term_colname + "_id")).alias('max')).collect()[0].max
+    weeks = df.select(f.col("week")).distinct().toPandas().week.values
+    spark.stop()
 
     print(f"computing weekly similarities")
-    week_similarities_helper = partial(_week_similarities,simfunc=column_similarities, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN)
+    week_similarities_helper = partial(_week_similarities,simfunc=column_similarities, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms)
+
+    pool = Pool(cpu_count())
+
+    list(pool.imap(week_similarities_helper,weeks))
+    pool.close()
+    # with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
 
-    with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
-        list(pool.map(week_similarities_helper,weeks))
 
 def author_cosine_similarities_weekly(outfile, min_df=2, max_df=None, included_subreddits=None, topN=500):
-    return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
+    return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet',
                                       outfile,
                                       'author',
                                       min_df,
-- 
2.39.5
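
Usage note (editor's addition, not part of the commit): a minimal sketch of how the new LSI entry point introduced above might be called from Python, assuming the patched modules are importable and that the tf-idf parquet named below exists. The output path is illustrative only, and its directory is assumed to already exist, since the fitted model is pickled before any directories are created.

    # Sketch: compute LSI similarities at several dimensionalities.
    # With a list for n_components, lsi_column_similarities yields one similarity
    # matrix per dimensionality; similarities() writes each to <outfile>/<n_dims>.feather,
    # and the fitted TruncatedSVD model is pickled to
    # <outfile>/<max(n_components)>_term_LSIMOD.pkl via lsi_model_save.
    from lsi_similarities import term_lsi_similarities

    term_lsi_similarities(
        inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
        outfile='/gscratch/comdata/output/reddit_similarity/lsi_terms_example',  # hypothetical output directory
        included_subreddits=None,  # falls back to select_topN_subreddits(topN)
        topN=500,
        n_components=[10, 50, 100],
        algorithm='arpack',
    )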