X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/0b95bea30eebe7660013a799bd09f4564d025ddc..refs/heads/synced/excise_reindex:/similarities/similarities_helper.py?ds=inline diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py index e59563e..03c10b2 100644 --- a/similarities/similarities_helper.py +++ b/similarities/similarities_helper.py @@ -15,24 +15,53 @@ import numpy as np import pathlib from datetime import datetime from pathlib import Path +import pickle class tf_weight(Enum): MaxTF = 1 Norm05 = 2 -infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet" -cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet" +# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet" +# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet" # subreddits missing after this step don't have any terms that have a high enough idf # try rewriting without merges -def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF): - print("loading tfidf", flush=True) - tfidf_ds = ds.dataset(infile) + +# does reindex_tfidf, but without reindexing. +def reindex_tfidf(*args, **kwargs): + df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True) + + print("assigning names") + subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id']) + batches = subreddit_names.to_batches() + + with Pool(cpu_count()) as pool: + chunks = pool.imap_unordered(pull_names,batches) + subreddit_names = pd.concat(chunks,copy=False).drop_duplicates() + subreddit_names = subreddit_names.set_index("subreddit_id") + + new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates() + new_ids = new_ids.set_index('subreddit_id') + subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index() + subreddit_names = subreddit_names.drop("subreddit_id",1) + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + return(df, subreddit_names) + +def pull_tfidf(*args, **kwargs): + df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False) + return df + +def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True): + print(f"loading tfidf {infile}", flush=True) + if week is not None: + tfidf_ds = ds.dataset(infile, partitioning='hive') + else: + tfidf_ds = ds.dataset(infile) if included_subreddits is None: included_subreddits = select_topN_subreddits(topN) else: - included_subreddits = set(open(included_subreddits)) + included_subreddits = set(map(str.strip,open(included_subreddits))) ds_filter = ds.field("subreddit").isin(included_subreddits) @@ -68,15 +97,22 @@ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subre 'relative_tf':ds.field('relative_tf').cast('float32'), 'tf_idf':ds.field('tf_idf').cast('float32')} - tfidf_ds = ds.dataset(infile) + print(projection) df = tfidf_ds.to_table(filter=ds_filter,columns=projection) df = df.to_pandas(split_blocks=True,self_destruct=True) print("assigning indexes",flush=True) - df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() - grouped = df.groupby(term_id) - df[term_id_new] = grouped.ngroup() + if reindex: + df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + else: + df['subreddit_id_new'] = df['subreddit_id'] + + if reindex: + grouped = df.groupby(term_id) + df[term_id_new] = grouped.ngroup() + else: + df[term_id_new] = df[term_id] if rescale_idf: print("computing idf", flush=True) @@ -88,9 +124,7 @@ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subre else: # tf_fam = tf_weight.Norm05 df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf - print("assigning names") - subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id']) - batches = subreddit_names.to_batches() + return (df, tfidf_ds, ds_filter) with Pool(cpu_count()) as pool: chunks = pool.imap_unordered(pull_names,batches) @@ -107,7 +141,7 @@ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subre def pull_names(batch): return(batch.to_pandas().drop_duplicates()) -def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'): +def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'): ''' tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities. ''' @@ -127,7 +161,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - outfile.parent.mkdir(exist_ok=True, parents=True) + p.parent.mkdir(exist_ok=True, parents=True) sims.to_feather(outfile) @@ -135,7 +169,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non term_id = term + '_id' term_id_new = term + '_id_new' - entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date) + entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date) mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new))) print("loading matrix") @@ -151,7 +185,7 @@ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=Non for simmat, name in sims: proc_sims(simmat, Path(outfile)/(str(name) + ".feather")) else: - proc_sims(simmat, outfile) + proc_sims(sims, outfile) def write_weekly_similarities(path, sims, week, names): sims['week'] = week @@ -204,10 +238,9 @@ def test_lsi_sims(): # if n_components is a list we'll return a list of similarities with different latent dimensionalities # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations. # this function takes the svd and then the column similarities of it -def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'): +def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None): # first compute the lsi of the matrix # then take the column similarities - print("running LSI",flush=True) if type(n_components) is int: n_components = [n_components] @@ -215,9 +248,24 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196 n_components = sorted(n_components,reverse=True) svd_components = n_components[0] - svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter) - mod = svd.fit(tfidfmat.T) + + if lsi_model_load is not None and Path(lsi_model_load).exists(): + print("loading LSI") + mod = pickle.load(open(lsi_model_load ,'rb')) + lsi_model_save = lsi_model_load + + else: + print("running LSI",flush=True) + + svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter) + mod = svd.fit(tfidfmat.T) + lsimat = mod.transform(tfidfmat.T) + if lsi_model_save is not None: + Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True) + pickle.dump(mod, open(lsi_model_save,'wb')) + + sims_list = [] for n_dims in n_components: sims = column_similarities(lsimat[:,np.arange(n_dims)]) if len(n_components) > 1: @@ -254,20 +302,20 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1) # collect the dictionary to make a pydict of terms to indexes - terms = idf.select([term,'week']).distinct() # terms are distinct + terms = idf.select([term]).distinct() # terms are distinct - terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct + terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct # make subreddit ids - subreddits = df.select(['subreddit','week']).distinct() - subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit"))) + subreddits = df.select(['subreddit']).distinct() + subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit"))) - df = df.join(subreddits,on=['subreddit','week']) + df = df.join(subreddits,on=['subreddit']) # map terms to indexes in the tfs and the idfs - df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique + df = df.join(terms,on=[term]) # subreddit-term-id is unique - idf = idf.join(terms,on=[term,'week']) + idf = idf.join(terms,on=[term]) # join on subreddit/term to create tf/dfs indexed by term df = df.join(idf, on=[term_id, term,'week']) @@ -327,7 +375,7 @@ def _calc_tfidf(df, term_colname, tf_family): return df -def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): +def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): term = term_colname term_id = term + '_id' # aggregate counts by week. now subreddit-term is distinct