code.communitydata.science - cdsc_reddit.git/commitdiff
Merge branch 'excise_reindex' of code:cdsc_reddit into charliepatch
author Nate E TeBlunthuis <nathante@klone-login01.hyak.local>
Mon, 3 May 2021 06:52:52 +0000 (23:52 -0700)
committer Nate E TeBlunthuis <nathante@klone-login01.hyak.local>
Mon, 3 May 2021 06:52:52 +0000 (23:52 -0700)
similarities/cosine_similarities.py
similarities/similarities_helper.py
similarities/tfidf.py

index cc681b130c9a47aad89aef628defcbac5258842b,0c9c986c3e60ff28224f1eaed0072453af5d2948..8b9ee8e43a31ad5fa2baf14e83c8367d46bcad59
@@@ -7,10 -8,10 +8,10 @@@ def cosine_similarities(infile, term_co
  
      return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
  
+ # change so that these take in an input as an optional argument (for speed, but also for idf).
 -def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
 +def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
  
 -    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
 +    return cosine_similarities(infile,
                                 'term',
                                 outfile,
                                 min_df,
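The hunk above makes the tfidf input path an explicit parameter instead of a hard-coded string. A minimal sketch of how a caller might now use it, assuming the repository root is on PYTHONPATH; the output path and the alternate input path below are made-up examples, only the default parquet path comes from the diff:

    from similarities.cosine_similarities import term_cosine_similarities

    # Default input: the comment_terms_100k.parquet path baked into the new signature.
    term_cosine_similarities("/gscratch/comdata/output/reddit_similarity/comment_terms_default.feather")

    # Hypothetical reuse of a different, precomputed tfidf parquet -- the point of the
    # comment above (skip recomputation, keep the idf consistent with that input).
    term_cosine_similarities("/gscratch/comdata/output/reddit_similarity/comment_terms_subset.feather",
                             infile="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_subset.parquet",
                             topN=1000)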
index 3ace8f29f3922838009adfb4dccf77e5f03b1e34,7f8a639aeecf255ed3db0e47f4ad14769cb5ceb4..0d49a56579649194175e313191bdd174370bc0dd
@@@ -60,23 -35,82 +35,82 @@@ def reindex_tfidf(infile, term_colname
      if included_subreddits is None:
          included_subreddits = select_topN_subreddits(topN)
      else:
 -        included_subreddits = set(open(included_subreddits))
 +        included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits))))
  
-     if exclude_phrases == True:
-         tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+     ds_filter = ds.field("subreddit").isin(included_subreddits)
+     if min_df is not None:
+         ds_filter &= ds.field("count") >= min_df
+     if max_df is not None:
+         ds_filter &= ds.field("count") <= max_df
+     if week is not None:
+         ds_filter &= ds.field("week") == week
+     if from_date is not None:
+         ds_filter &= ds.field("week") >= from_date
  
-     print("creating temporary parquet with matrix indicies")
-     tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
+     if to_date is not None:
+         ds_filter &= ds.field("week") <= to_date
  
-     tfidf = spark.read.parquet(tempdir.name)
-     subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+     term = term_colname
+     term_id = term + '_id'
+     term_id_new = term + '_id_new'
+     
+     projection = {
+         'subreddit_id':ds.field('subreddit_id'),
+         term_id:ds.field(term_id),
+         'relative_tf':ds.field("relative_tf").cast('float32')
+         }
+     if not rescale_idf:
+         projection = {
+             'subreddit_id':ds.field('subreddit_id'),
+             term_id:ds.field(term_id),
+             'relative_tf':ds.field('relative_tf').cast('float32'),
+             'tf_idf':ds.field('tf_idf').cast('float32')}
+     tfidf_ds = ds.dataset(infile)
+     df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
+     df = df.to_pandas(split_blocks=True,self_destruct=True)
+     print("assigning indexes",flush=True)
+     df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
+     grouped = df.groupby(term_id)
+     df[term_id_new] = grouped.ngroup()
+     if rescale_idf:
+         print("computing idf", flush=True)
+         df['new_count'] = grouped[term_id].transform('count')
+         N_docs = df.subreddit_id_new.max() + 1
+         df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
+         if tf_family == tf_weight.MaxTF:
+             df["tf_idf"] = df.relative_tf * df.idf
+         else: # tf_fam = tf_weight.Norm05
+             df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
+     print("assigning names")
+     subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
+     batches = subreddit_names.to_batches()
+     with Pool(cpu_count()) as pool:
+         chunks = pool.imap_unordered(pull_names,batches) 
+         subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+     subreddit_names = subreddit_names.set_index("subreddit_id")
+     new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+     new_ids = new_ids.set_index('subreddit_id')
+     subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+     subreddit_names = subreddit_names.drop("subreddit_id",1)
      subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-     subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-     spark.stop()
-     return (tempdir, subreddit_names)
+     return(df, subreddit_names)
  
+ def pull_names(batch):
+     return(batch.to_pandas().drop_duplicates())
  
- def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
+ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
      '''
      tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
      '''
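For readers skimming the similarities_helper.py hunk, here is a self-contained sketch of the new pyarrow-dataset read path and the idf rescaling, run over a toy table. The column names, the filter/projection construction, and the Norm05 weighting follow the diff; the toy data and the temporary parquet file are illustrative only.

    import os, tempfile
    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # Toy long-format tfidf table: one row per (subreddit, term) pair.
    table = pa.table({
        "subreddit":    ["a", "a", "b", "b", "c"],
        "subreddit_id": [1, 1, 2, 2, 3],
        "term_id":      [10, 11, 10, 12, 10],
        "count":        [5, 2, 7, 3, 9],
        "relative_tf":  [1.0, 0.4, 1.0, 0.6, 1.0],
    })
    path = os.path.join(tempfile.mkdtemp(), "tfidf.parquet")
    pq.write_table(table, path)

    # Filter and projection expressions, built the same way as in reindex_tfidf.
    ds_filter = ds.field("subreddit").isin(["a", "b", "c"]) & (ds.field("count") >= 2)
    projection = {
        "subreddit_id": ds.field("subreddit_id"),
        "term_id":      ds.field("term_id"),
        "relative_tf":  ds.field("relative_tf").cast("float32"),
    }
    tfidf_ds = ds.dataset(path)
    df = tfidf_ds.to_table(filter=ds_filter, columns=projection).to_pandas()

    # Dense 0-based ids via groupby(...).ngroup(), then the smoothed idf and Norm05 tf weighting.
    df["subreddit_id_new"] = df.groupby("subreddit_id").ngroup()
    grouped = df.groupby("term_id")
    df["term_id_new"] = grouped.ngroup()
    df["new_count"] = grouped["term_id"].transform("count")
    N_docs = df.subreddit_id_new.max() + 1
    df["idf"] = np.log(N_docs / (1 + df.new_count), dtype="float32") + 1
    df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
    print(df[["subreddit_id", "term_id", "tf_idf"]])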
similarities/tfidf.py: Simple merge
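Back in the similarities_helper.py hunk, the subreddit-name step fans Arrow record batches out to a process pool. A rough standalone illustration of that pattern over a toy table; only pull_names and the Pool/imap_unordered structure come from the diff, everything else is illustrative.

    from multiprocessing import Pool, cpu_count
    import pandas as pd
    import pyarrow as pa

    def pull_names(batch):
        # One Arrow record batch -> pandas, dropping duplicate rows, as in the diff.
        return batch.to_pandas().drop_duplicates()

    if __name__ == "__main__":
        names = pa.table({
            "subreddit":    ["a", "a", "b", "c", "c"],
            "subreddit_id": [1, 1, 2, 3, 3],
        })
        batches = names.to_batches(max_chunksize=2)
        with Pool(cpu_count()) as pool:
            chunks = pool.imap_unordered(pull_names, batches)
            subreddit_names = pd.concat(chunks, copy=False).drop_duplicates()
        print(subreddit_names.sort_values("subreddit_id"))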
