import fire
from pathlib import Path
from similarities_helper import similarities, column_similarities
+from functools import partial
- def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
+ def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
- return similarities(inpath=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
+ return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
+# change so that these take in an input as an optional argument (for speed, but also for idf).
-def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
- return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
+ def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
+
+ return cosine_similarities(infile,
'term',
outfile,
min_df,
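# A minimal sketch, not part of the diff above, of how these functions might be exposed
# on the command line with `fire` (imported at the top of the file); the command names
# below are assumptions for illustration only.
if __name__ == "__main__":
    fire.Fire({"cosine_similarities": cosine_similarities,
               "term_cosine_similarities": term_cosine_similarities})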
'relative_tf':ds.field('relative_tf').cast('float32'),
'tf_idf':ds.field('tf_idf').cast('float32')}
- tfidf_ds = ds.dataset(infile)
+
df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
df = df.to_pandas(split_blocks=True,self_destruct=True)
else: # tf_fam = tf_weight.Norm05
df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
- print("assigning names")
- subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
- batches = subreddit_names.to_batches()
+ return (df, tfidf_ds, ds_filter)
+ with Pool(cpu_count()) as pool:
+ chunks = pool.imap_unordered(pull_names,batches)
+ subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+
+ subreddit_names = subreddit_names.set_index("subreddit_id")
+ new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+ new_ids = new_ids.set_index('subreddit_id')
+ subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+ subreddit_names = subreddit_names.drop("subreddit_id", axis=1)
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ return(df, subreddit_names)
def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())
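# Self-contained sketch of the parallel name lookup shown above, assuming the usual
# imports (`from multiprocessing import Pool, cpu_count`, `import pandas as pd`) and a
# pyarrow dataset `tfidf_ds` plus filter `ds_filter` as returned by the helper; the
# function name is hypothetical.
from multiprocessing import Pool, cpu_count
import pandas as pd

def pull_subreddit_names(tfidf_ds, ds_filter):
    # one (subreddit, subreddit_id) table, split into record batches that worker
    # processes deduplicate independently before a final global deduplication
    names = tfidf_ds.to_table(filter=ds_filter, columns=['subreddit', 'subreddit_id'])
    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names, names.to_batches())
        return pd.concat(chunks, copy=False).drop_duplicates()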
idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)
# collect the dictionary to make a pydict of terms to indexes
- terms = idf.select([term,'week']).distinct() # terms are distinct
+ terms = idf.select([term]).distinct() # terms are distinct
- terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct
+ terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
# make subreddit ids
- subreddits = df.select(['subreddit','week']).distinct()
- subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))
+ subreddits = df.select(['subreddit']).distinct()
+ subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
- # df = df.cache()
- df = df.join(subreddits,on=['subreddit','week'])
+ df = df.join(subreddits,on=['subreddit'])
# map terms to indexes in the tfs and the idfs
- df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique
+ df = df.join(terms,on=[term]) # subreddit-term-id is unique
- idf = idf.join(terms,on=[term,'week'])
+ idf = idf.join(terms,on=[term])
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term,'week'])
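# Minimal, self-contained sketch (illustrative toy data, not from the repository) of the
# id-assignment pattern in the diff above: row_number() over a global, un-partitioned
# window gives every distinct term and subreddit a dense integer id that stays the same
# across weeks, so the tf and idf tables can be joined on those ids.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [("askreddit", "cat", "2020-01-06"),
     ("askreddit", "dog", "2020-01-13"),
     ("aww", "cat", "2020-01-13")],
    ["subreddit", "term", "week"])
terms = toy.select("term").distinct()
terms = terms.withColumn("term_id", f.row_number().over(Window.orderBy("term")))
subreddits = toy.select("subreddit").distinct()
subreddits = subreddits.withColumn("subreddit_id", f.row_number().over(Window.orderBy("subreddit")))
toy = toy.join(subreddits, on="subreddit").join(terms, on="term")
toy.show()  # each term/subreddit keeps one id regardless of week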