From: Nathan TeBlunthuis Date: Tue, 3 Aug 2021 22:13:21 +0000 (-0700) Subject: Merge branch 'excise_reindex' of code:cdsc_reddit into excise_reindex X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/ce549c6c97058325ac6f1b9dab20406af1dbb2af?ds=sidebyside;hp=-c Merge branch 'excise_reindex' of code:cdsc_reddit into excise_reindex --- ce549c6c97058325ac6f1b9dab20406af1dbb2af diff --combined similarities/cosine_similarities.py index 98f1454,cc681b1..b9bab17 --- a/similarities/cosine_similarities.py +++ b/similarities/cosine_similarities.py @@@ -2,46 -2,48 +2,51 @@@ import pandas as p import fire from pathlib import Path from similarities_helper import similarities, column_similarities +from functools import partial - def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'): + def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'): - return similarities(inpath=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname) + return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname) +# change so that these take in an input as an optional argument (for speed, but also for idf). +def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None): - return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', + def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None): + + return cosine_similarities(infile, 'term', outfile, min_df, max_df, included_subreddits, topN, + exclude_phrases, from_date, to_date ) - def author_cosine_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None): - return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', + def author_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None): + return cosine_similarities(infile, 'author', outfile, min_df, max_df, included_subreddits, topN, + exclude_phrases=False, from_date=from_date, to_date=to_date ) - def author_tf_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None): - return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', + def author_tf_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None): + return cosine_similarities(infile, 'author', outfile, min_df, max_df, included_subreddits, topN, + exclude_phrases=False, from_date=from_date, to_date=to_date, tfidf_colname='relative_tf' diff --combined similarities/similarities_helper.py index a4983b3,1492983..13845d1 --- a/similarities/similarities_helper.py +++ b/similarities/similarities_helper.py @@@ -15,53 -15,24 +15,53 @@@ import numpy as n import pathlib from datetime import datetime from pathlib import Path +import pickle class tf_weight(Enum): MaxTF = 1 Norm05 = 2 -infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet" -cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet" +# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet" +# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet" # subreddits missing after this step don't have any terms that have a high enough idf # try rewriting without merges -def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF): - print("loading tfidf", flush=True) - tfidf_ds = ds.dataset(infile) + +# does reindex_tfidf, but without reindexing. +def reindex_tfidf(*args, **kwargs): + df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True) + + print("assigning names") + subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id']) + batches = subreddit_names.to_batches() + + with Pool(cpu_count()) as pool: + chunks = pool.imap_unordered(pull_names,batches) + subreddit_names = pd.concat(chunks,copy=False).drop_duplicates() + subreddit_names = subreddit_names.set_index("subreddit_id") + + new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates() + new_ids = new_ids.set_index('subreddit_id') + subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index() + subreddit_names = subreddit_names.drop("subreddit_id",1) + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + return(df, subreddit_names) + +def pull_tfidf(*args, **kwargs): + df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False) + return df + +def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True): + print(f"loading tfidf {infile}", flush=True) + if week is not None: + tfidf_ds = ds.dataset(infile, partitioning='hive') + else: + tfidf_ds = ds.dataset(infile) if included_subreddits is None: included_subreddits = select_topN_subreddits(topN) else: - included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits)))) + included_subreddits = set(map(str.strip,open(included_subreddits))) ds_filter = ds.field("subreddit").isin(included_subreddits) @@@ -97,20 -68,15 +97,21 @@@ 'relative_tf':ds.field('relative_tf').cast('float32'), 'tf_idf':ds.field('tf_idf').cast('float32')} - tfidf_ds = ds.dataset(infile) + df = tfidf_ds.to_table(filter=ds_filter,columns=projection) df = df.to_pandas(split_blocks=True,self_destruct=True) print("assigning indexes",flush=True) - df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() - grouped = df.groupby(term_id) - df[term_id_new] = grouped.ngroup() + if reindex: + df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + else: + df['subreddit_id_new'] = df['subreddit_id'] + + if reindex: + grouped = df.groupby(term_id) + df[term_id_new] = grouped.ngroup() + else: + df[term_id_new] = df[term_id] if rescale_idf: print("computing idf", flush=True) @@@ -122,13 -88,26 +123,24 @@@ else: # tf_fam = tf_weight.Norm05 df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf - print("assigning names") - subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id']) - batches = subreddit_names.to_batches() + return (df, tfidf_ds, ds_filter) + with Pool(cpu_count()) as pool: + chunks = pool.imap_unordered(pull_names,batches) + subreddit_names = pd.concat(chunks,copy=False).drop_duplicates() + + subreddit_names = subreddit_names.set_index("subreddit_id") + new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates() + new_ids = new_ids.set_index('subreddit_id') + subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index() + subreddit_names = subreddit_names.drop("subreddit_id",1) + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + return(df, subreddit_names) def pull_names(batch): return(batch.to_pandas().drop_duplicates()) -def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'): +def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'): ''' tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities. ''' @@@ -148,7 -127,7 +160,7 @@@ output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - outfile.parent.mkdir(exist_ok=True, parents=True) + p.parent.mkdir(exist_ok=True, parents=True) sims.to_feather(outfile) @@@ -156,7 -135,7 +168,7 @@@ term_id = term + '_id' term_id_new = term + '_id_new' - entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date) + entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date) mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new))) print("loading matrix") @@@ -165,7 -144,6 +177,6 @@@ print(f'computing similarities on mat. mat.shape:{mat.shape}') print(f"size of mat is:{mat.data.nbytes}",flush=True) - # transform this to debug term tfidf sims = simfunc(mat) del mat @@@ -173,7 -151,7 +184,7 @@@ for simmat, name in sims: proc_sims(simmat, Path(outfile)/(str(name) + ".feather")) else: - proc_sims(simmat, outfile) + proc_sims(sims, outfile) def write_weekly_similarities(path, sims, week, names): sims['week'] = week @@@ -226,7 -204,7 +237,7 @@@ def test_lsi_sims() # if n_components is a list we'll return a list of similarities with different latent dimensionalities # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations. # this function takes the svd and then the column similarities of it -def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'): +def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None): # first compute the lsi of the matrix # then take the column similarities print("running LSI",flush=True) @@@ -237,32 -215,21 +248,31 @@@ n_components = sorted(n_components,reverse=True) svd_components = n_components[0] - svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter) - mod = svd.fit(tfidfmat.T) + + if lsi_model_load is not None: + mod = pickle.load(open(lsi_model_load ,'rb')) + + else: + svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter) + mod = svd.fit(tfidfmat.T) + lsimat = mod.transform(tfidfmat.T) + if lsi_model_save is not None: + pickle.dump(mod, open(lsi_model_save,'wb')) + + sims_list = [] for n_dims in n_components: sims = column_similarities(lsimat[:,np.arange(n_dims)]) if len(n_components) > 1: yield (sims, n_dims) else: return sims + def column_similarities(mat): return 1 - pairwise_distances(mat,metric='cosine') - # need to rewrite this so that subreddit ids and term ids are fixed over the whole thing. - # this affords taking the LSI similarities. - # fill all 0s if we don't have it. + def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): term = term_colname term_id = term + '_id' @@@ -287,21 -254,20 +297,20 @@@ idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1) # collect the dictionary to make a pydict of terms to indexes - terms = idf.select([term,'week']).distinct() # terms are distinct + terms = idf.select([term]).distinct() # terms are distinct - terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct + terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct # make subreddit ids - subreddits = df.select(['subreddit','week']).distinct() - subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit"))) + subreddits = df.select(['subreddit']).distinct() + subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit"))) - # df = df.cache() - df = df.join(subreddits,on=['subreddit','week']) + df = df.join(subreddits,on=['subreddit']) # map terms to indexes in the tfs and the idfs - df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique + df = df.join(terms,on=[term]) # subreddit-term-id is unique - idf = idf.join(terms,on=[term,'week']) + idf = idf.join(terms,on=[term]) # join on subreddit/term to create tf/dfs indexed by term df = df.join(idf, on=[term_id, term,'week']) @@@ -361,7 -327,7 +370,7 @@@ def _calc_tfidf(df, term_colname, tf_fa return df -def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): +def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): term = term_colname term_id = term + '_id' # aggregate counts by week. now subreddit-term is distinct diff --combined similarities/tfidf.py index 94dcbf5,110536e..19d3013 --- a/similarities/tfidf.py +++ b/similarities/tfidf.py @@@ -1,7 -1,7 +1,7 @@@ import fire from pyspark.sql import SparkSession from pyspark.sql import functions as f -from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits +from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits): spark = SparkSession.builder.getOrCreate() @@@ -11,7 -11,7 +11,7 @@@ df = df.filter(~ f.col(term_colname).isin(exclude)) if included_subreddits is not None: - include_subs = set(map(str.strip,map(str.lower, open(included_subreddits)))) + include_subs = set(map(str.strip,open(included_subreddits))) else: include_subs = select_topN_subreddits(topN) @@@ -21,13 -21,13 +21,13 @@@ spark.stop() def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits): - return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits) + return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits) def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits): return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits) def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', - topN=25000, + topN=None, included_subreddits=None): return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", @@@ -39,7 -39,7 +39,7 @@@ ) def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', - topN=25000, + topN=None, included_subreddits=None): return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", @@@ -51,8 -51,8 +51,8 @@@ ) def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', - topN=25000, + topN=None, - include_subreddits=None): + included_subreddits=None): return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", outpath, @@@ -63,7 -63,8 +63,8 @@@ ) def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', - topN=25000): - topN=25000, ++ topN=None, + included_subreddits=None): return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", @@@ -71,7 -72,7 +72,7 @@@ topN, 'term', [], - included_subreddits=None + included_subreddits=included_subreddits )