+ projection = {
+ 'subreddit_id':ds.field('subreddit_id'),
+ term_id:ds.field(term_id),
+ 'relative_tf':ds.field("relative_tf").cast('float32')
+ }
+
+ if not rescale_idf:
+ projection = {
+ 'subreddit_id':ds.field('subreddit_id'),
+ term_id:ds.field(term_id),
+ 'relative_tf':ds.field('relative_tf').cast('float32'),
+ 'tf_idf':ds.field('tf_idf').cast('float32')}
+
+
+ df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
+
+ df = df.to_pandas(split_blocks=True,self_destruct=True)
+ print("assigning indexes",flush=True)
+ if reindex:
+ df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
+ else:
+ df['subreddit_id_new'] = df['subreddit_id']
+
+ if reindex:
+ grouped = df.groupby(term_id)
+ df[term_id_new] = grouped.ngroup()
+ else:
+ df[term_id_new] = df[term_id]
+
+ if rescale_idf:
+ print("computing idf", flush=True)
+ df['new_count'] = grouped[term_id].transform('count')
+ N_docs = df.subreddit_id_new.max() + 1
+ df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
+ if tf_family == tf_weight.MaxTF:
+ df["tf_idf"] = df.relative_tf * df.idf
+ else: # tf_fam = tf_weight.Norm05
+ df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
+
+ return (df, tfidf_ds, ds_filter)
+
+ with Pool(cpu_count()) as pool:
+ chunks = pool.imap_unordered(pull_names,batches)
+ subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+
+ subreddit_names = subreddit_names.set_index("subreddit_id")
+ new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+ new_ids = new_ids.set_index('subreddit_id')
+ subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+ subreddit_names = subreddit_names.drop("subreddit_id",1)
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ return(df, subreddit_names)
+
+def pull_names(batch):
+ return(batch.to_pandas().drop_duplicates())
+
+def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
+ '''
+ tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
+ '''
+
+ def proc_sims(sims, outfile):
+ if issparse(sims):
+ sims = sims.todense()
+
+ print(f"shape of sims:{sims.shape}")
+ print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
+ sims = pd.DataFrame(sims)
+ sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+ sims['_subreddit'] = subreddit_names.subreddit.values
+
+ p = Path(outfile)
+
+ output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+ output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+ output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
+ p.parent.mkdir(exist_ok=True, parents=True)
+
+ sims.to_feather(outfile)