X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/541e125b28dbca5c06d2160a5cd59ce112657b2a..65deba5e4e4ad9e3f23e82573491f7d6b190e644:/similarities/similarities_helper.py

diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py
index d97e519..202220c 100644
--- a/similarities/similarities_helper.py
+++ b/similarities/similarities_helper.py
@@ -43,7 +43,7 @@ def reindex_tfidf(*args, **kwargs):
     new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
     new_ids = new_ids.set_index('subreddit_id')
     subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",1)
+    subreddit_names = subreddit_names.drop("subreddit_id",axis=1)
     subreddit_names = subreddit_names.sort_values("subreddit_id_new")
     return(df, subreddit_names)
 
@@ -51,8 +51,9 @@ def pull_tfidf(*args, **kwargs):
     df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
     return df
 
-def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
-    print(f"loading tfidf {infile}", flush=True)
+def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=None, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
+    print(f"loading tfidf {infile}, week {week}, min_df {min_df}, max_df {max_df}", flush=True)
+
     if week is not None:
         tfidf_ds = ds.dataset(infile, partitioning='hive')
     else:
@@ -94,23 +95,23 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
         projection = {
             'subreddit_id':ds.field('subreddit_id'),
             term_id:ds.field(term_id),
-            'relative_tf':ds.field('relative_tf').cast('float32'),
             'tf_idf':ds.field('tf_idf').cast('float32')}
 
-    print(projection)
-
+    print(projection, flush=True)
+    print(ds_filter, flush=True)
     df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
 
     df = df.to_pandas(split_blocks=True,self_destruct=True)
-    print("assigning indexes",flush=True)
+
     if reindex:
-        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
+        print("assigning indexes",flush=True)
+        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + 1
     else:
         df['subreddit_id_new'] = df['subreddit_id']
 
     if reindex:
         grouped = df.groupby(term_id)
-        df[term_id_new] = grouped.ngroup()
+        df[term_id_new] = grouped.ngroup() + 1
     else:
         df[term_id_new] = df[term_id]
 
@@ -126,17 +127,17 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
 
     return (df, tfidf_ds, ds_filter)
 
-    with Pool(cpu_count()) as pool:
-        chunks = pool.imap_unordered(pull_names,batches)
-        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+    # with Pool(cpu_count()) as pool:
+    #     chunks = pool.imap_unordered(pull_names,batches)
+    #     subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
 
-    subreddit_names = subreddit_names.set_index("subreddit_id")
-    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
-    new_ids = new_ids.set_index('subreddit_id')
-    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",1)
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    return(df, subreddit_names)
+    # subreddit_names = subreddit_names.set_index("subreddit_id")
+    # new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+    # new_ids = new_ids.set_index('subreddit_id')
+    # subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+    # subreddit_names = subreddit_names.drop("subreddit_id",1)
+    # subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    # return(df, subreddit_names)
 
 def pull_names(batch):
     return(batch.to_pandas().drop_duplicates())
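The reindexing hunks above switch the ngroup() ids from 0-based to 1-based; the matching hunk in similarities() below subtracts 1 again when the sparse matrix is built. A minimal sketch of the behavior, using made-up data (the toy frame is purely illustrative):

import pandas as pd

# toy stand-in for the tf-idf entries frame
df = pd.DataFrame({'subreddit_id': [42, 42, 7, 99, 7]})

# ngroup() numbers groups 0, 1, 2, ... in sorted-key order;
# "+ 1" shifts the ids so the smallest one assigned is 1
df['subreddit_id_new'] = df.groupby('subreddit_id').ngroup() + 1
print(df['subreddit_id_new'].tolist())  # [2, 2, 1, 3, 1]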
@@ -170,7 +171,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
     term_id_new = term + '_id_new'
 
     entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
-    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
+    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))
 
     print("loading matrix")
 
@@ -238,7 +239,8 @@ def test_lsi_sims():
 # if n_components is a list we'll return a list of similarities with different latent dimensionalities
 # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
 # this function takes the svd and then the column similarities of it
-def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
+# lsi_model_load = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
+def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model=None):
     # first compute the lsi of the matrix
     # then take the column similarities
 
@@ -249,28 +251,24 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
 
     svd_components = n_components[0]
 
-    if lsi_model_load is not None and Path(lsi_model_load).exists():
-        print("loading LSI")
-        mod = pickle.load(open(lsi_model_load ,'rb'))
-        lsi_model_save = lsi_model_load
-
-    else:
+    if lsi_model is None:
         print("running LSI",flush=True)
-
         svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
         mod = svd.fit(tfidfmat.T)
+    else:
+        mod = lsi_model
 
     lsimat = mod.transform(tfidfmat.T)
     if lsi_model_save is not None:
+        Path(lsi_model_save).parent.mkdir(exist_ok=True,parents=True)
         pickle.dump(mod, open(lsi_model_save,'wb'))
 
-    sims_list = []
+    print(n_components)
     for n_dims in n_components:
+        print("computing similarities")
         sims = column_similarities(lsimat[:,np.arange(n_dims)])
-        if len(n_components) > 1:
-            yield (sims, n_dims)
-        else:
-            return sims
+        yield (sims, n_dims)
+
 
 def column_similarities(mat):
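After this hunk, lsi_column_similarities always yields (similarities, n_dims) pairs instead of sometimes returning a bare matrix, so callers must iterate even for a single dimensionality. A hypothetical usage sketch, assuming the module is importable as similarities_helper and passing dimensionalities in descending order, since the SVD is fit once with n_components[0] components:

from scipy.sparse import random as sparse_random
from similarities_helper import lsi_column_similarities  # this module

# toy 1000-term x 200-subreddit tf-idf matrix, purely illustrative
mat = sparse_random(1000, 200, density=0.05, format='csc', random_state=1968)
for sims, n_dims in lsi_column_similarities(mat, n_components=[100, 50]):
    print(n_dims, sims.shape)  # one similarity matrix per dimensionality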
@@ -326,11 +324,11 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
     else: # tf_fam = tf_weight.Norm05
         df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)
 
-    df = df.repartition(400,'subreddit','week')
+    df = df.repartition('week')
     dfwriter = df.write.partitionBy("week")
     return dfwriter
 
-def _calc_tfidf(df, term_colname, tf_family):
+def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
     term = term_colname
     term_id = term + '_id'
 
@@ -348,7 +346,13 @@ def _calc_tfidf(df, term_colname, tf_family):
     idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
 
     # collect the dictionary to make a pydict of terms to indexes
-    terms = idf.select(term).distinct() # terms are distinct
+    terms = idf
+    if min_df is not None:
+        terms = terms.filter(f.col('count')>=min_df)
+    if max_df is not None:
+        terms = terms.filter(f.col('count')<=max_df)
+
+    terms = terms.select(term).distinct() # terms are distinct
     terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
 
     # make subreddit ids
@@ -358,12 +362,12 @@ def _calc_tfidf(df, term_colname, tf_family):
     df = df.join(subreddits,on='subreddit')
 
     # map terms to indexes in the tfs and the idfs
-    df = df.join(terms,on=term) # subreddit-term-id is unique
+    df = df.join(terms,on=term,how='inner') # subreddit-term-id is unique
 
-    idf = idf.join(terms,on=term)
+    idf = idf.join(terms,on=term,how='inner')
 
     # join on subreddit/term to create tf/dfs indexed by term
-    df = df.join(idf, on=[term_id, term])
+    df = df.join(idf, on=[term_id, term],how='inner')
 
     # agg terms by subreddit to make sparse tf/df vectors
     if tf_family == tf_weight.MaxTF:
@@ -374,14 +378,14 @@ def _calc_tfidf(df, term_colname, tf_family):
 
     return df
 
-def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
+def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05, min_df=None, max_df=None):
     term = term_colname
     term_id = term + '_id'
-    # aggregate counts by week. now subreddit-term is distinct
+
     df = df.filter(df.subreddit.isin(include_subs))
     df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
 
-    df = _calc_tfidf(df, term_colname, tf_family)
+    df = _calc_tfidf(df, term_colname, tf_family, min_df, max_df)
     df = df.repartition('subreddit')
     dfwriter = df.write
     return dfwriter
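The new min_df/max_df parameters band-pass the vocabulary by document frequency before term ids are assigned, and the now-explicit inner joins mean filtered-out terms drop cleanly from both the tf and idf tables. A rough standalone sketch of the filtering step, with an invented document-frequency table and thresholds and a local SparkSession:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()

# toy document-frequency table: count = number of subreddits using the term
idf = spark.createDataFrame(
    [("cats", 900), ("rare_term", 2), ("the", 500000)],
    ["term", "count"])

min_df, max_df = 10, 100000
terms = idf
if min_df is not None:
    terms = terms.filter(f.col('count') >= min_df)   # drop too-rare terms
if max_df is not None:
    terms = terms.filter(f.col('count') <= max_df)   # drop too-common terms

terms.select("term").show()  # only "cats" survives the band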