code.communitydata.science - cdsc_reddit.git/commitdiff
Merge branch 'excise_reindex' of code:cdsc_reddit into charliepatch
author    Nate E TeBlunthuis <nathante@klone-login01.hyak.local>
          Mon, 3 May 2021 06:52:52 +0000 (23:52 -0700)
committer Nate E TeBlunthuis <nathante@klone-login01.hyak.local>
          Mon, 3 May 2021 06:52:52 +0000 (23:52 -0700)
similarities/cosine_similarities.py
similarities/similarities_helper.py
similarities/tfidf.py

diff --combined similarities/cosine_similarities.py
index cc681b130c9a47aad89aef628defcbac5258842b,0c9c986c3e60ff28224f1eaed0072453af5d2948..8b9ee8e43a31ad5fa2baf14e83c8367d46bcad59
@@@ -2,15 -2,16 +2,16 @@@ import pandas as p
  import fire
  from pathlib import Path
  from similarities_helper import similarities, column_similarities
+ from functools import partial
  
  def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
  
      return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
  
+ # change so that these take in an input as an optional argument (for speed, but also for idf).
 -def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
 +def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
  
 -    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
 +    return cosine_similarities(infile,
                                 'term',
                                 outfile,
                                 min_df,
@@@ -22,8 -23,8 +23,8 @@@
                                 to_date
                                 )
  
 -def author_cosine_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None):
 -    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
 +def author_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None):
 +    return cosine_similarities(infile,
                                 'author',
                                 outfile,
                                 min_df,
@@@ -35,8 -36,8 +36,8 @@@
                                 to_date=to_date
                                 )
  
 -def author_tf_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None):
 -    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
 +def author_tf_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None):
 +    return cosine_similarities(infile,
                                 'author',
                                 outfile,
                                 min_df,
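
Making infile an explicit keyword argument means each entry point can be re-run
against any TF-IDF parquet rather than the hard-coded default. A minimal sketch
of a direct call (the outfile path and the min_df/topN values are illustrative;
the infile default is the one shown in the diff):

    from cosine_similarities import term_cosine_similarities

    term_cosine_similarities(outfile='comment_terms_10k.feather',  # hypothetical path
                             infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
                             min_df=2000,
                             topN=10000)
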
diff --combined similarities/similarities_helper.py
index 3ace8f29f3922838009adfb4dccf77e5f03b1e34,7f8a639aeecf255ed3db0e47f4ad14769cb5ceb4..0d49a56579649194175e313191bdd174370bc0dd
@@@ -2,11 -2,14 +2,14 @@@ from pyspark.sql import SparkSessio
  from pyspark.sql import Window
  from pyspark.sql import functions as f
  from enum import Enum
+ from multiprocessing import cpu_count, Pool
  from pyspark.mllib.linalg.distributed import CoordinateMatrix
  from tempfile import TemporaryDirectory
  import pyarrow
  import pyarrow.dataset as ds
+ from sklearn.metrics import pairwise_distances
  from scipy.sparse import csr_matrix, issparse
+ from sklearn.decomposition import TruncatedSVD
  import pandas as pd
  import numpy as np
  import pathlib
+ import time
@@@ -17,128 -20,150 +20,150 @@@ class tf_weight(Enum)
      MaxTF = 1
      Norm05 = 2
  
- infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet"
+ infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
+ cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
  
- def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
-     term = term_colname
-     term_id = term + '_id'
-     term_id_new = term + '_id_new'
-     spark = SparkSession.builder.getOrCreate()
-     conf = spark.sparkContext.getConf()
-     print(exclude_phrases)
-     tfidf_weekly = spark.read.parquet(infile)
-     # create the time interval
-     if from_date is not None:
-         if type(from_date) is str:
-             from_date = datetime.fromisoformat(from_date)
-         tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)
-         
-     if to_date is not None:
-         if type(to_date) is str:
-             to_date = datetime.fromisoformat(to_date)
-         tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)
-     tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
-     tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
-     tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
-     tfidf = spark.read_parquet(tempdir.name)
-     subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
-     subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-     subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-     return(tempdir, subreddit_names)
- def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
-     spark = SparkSession.builder.getOrCreate()
-     conf = spark.sparkContext.getConf()
-     print(exclude_phrases)
+ def termauthor_tfidf(term_tfidf_callable, author_tfidf_callable):
+     # stub: body not yet written; pass keeps the module importable
+     pass
  
-     tfidf = spark.read.parquet(infile)
+ # subreddits missing after this step don't have any terms that have a high enough idf
+ # try rewriting without merges
+ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
+     print("loading tfidf", flush=True)
+     tfidf_ds = ds.dataset(infile)
  
      if included_subreddits is None:
          included_subreddits = select_topN_subreddits(topN)
      else:
 -        included_subreddits = set(open(included_subreddits))
 +        included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits))))
  
-     if exclude_phrases == True:
-         tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+     ds_filter = ds.field("subreddit").isin(included_subreddits)
+     if min_df is not None:
+         ds_filter &= ds.field("count") >= min_df
+     if max_df is not None:
+         ds_filter &= ds.field("count") <= max_df
+     if week is not None:
+         ds_filter &= ds.field("week") == week
+     if from_date is not None:
+         ds_filter &= ds.field("week") >= from_date
  
-     print("creating temporary parquet with matrix indicies")
-     tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
+     if to_date is not None:
+         ds_filter &= ds.field("week") <= to_date
  
-     tfidf = spark.read.parquet(tempdir.name)
-     subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+     term = term_colname
+     term_id = term + '_id'
+     term_id_new = term + '_id_new'
+     
+     projection = {
+         'subreddit_id':ds.field('subreddit_id'),
+         term_id:ds.field(term_id),
+         'relative_tf':ds.field("relative_tf").cast('float32')
+         }
+     if not rescale_idf:
+         projection = {
+             'subreddit_id':ds.field('subreddit_id'),
+             term_id:ds.field(term_id),
+             'relative_tf':ds.field('relative_tf').cast('float32'),
+             'tf_idf':ds.field('tf_idf').cast('float32')}
+     df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
+     df = df.to_pandas(split_blocks=True,self_destruct=True)
+     print("assigning indexes",flush=True)
+     df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
+     grouped = df.groupby(term_id)
+     df[term_id_new] = grouped.ngroup()
+     if rescale_idf:
+         print("computing idf", flush=True)
+         df['new_count'] = grouped[term_id].transform('count')
+         N_docs = df.subreddit_id_new.max() + 1
+         df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
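+         # smoothed idf: log(N_docs / (1 + doc_freq)) + 1 stays positive even
+         # for terms that appear in every included subreddit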
+         if tf_family == tf_weight.MaxTF:
+             df["tf_idf"] = df.relative_tf * df.idf
+         else: # tf_fam = tf_weight.Norm05
+             df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
+     print("assigning names")
+     subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
+     batches = subreddit_names.to_batches()
+     with Pool(cpu_count()) as pool:
+         chunks = pool.imap_unordered(pull_names,batches) 
+         subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+     subreddit_names = subreddit_names.set_index("subreddit_id")
+     new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+     new_ids = new_ids.set_index('subreddit_id')
+     subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+     subreddit_names = subreddit_names.drop("subreddit_id", axis=1)
      subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-     subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-     spark.stop()
-     return (tempdir, subreddit_names)
+     return(df, subreddit_names)
  
+ def pull_names(batch):
+     return(batch.to_pandas().drop_duplicates())
  
- def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
+ def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
      '''
      tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
      '''
-     if from_date is not None or to_date is not None:
-         tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
-         
-     else:
-         tempdir, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
-     print("loading matrix")
-     #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
-     mat = read_tfidf_matrix(tempdir.name, term_colname, tfidf_colname)
-     print(f'computing similarities on mat. mat.shape:{mat.shape}')
-     print(f"size of mat is:{mat.data.nbytes}")
-     sims = simfunc(mat)
-     del mat
  
-     if issparse(sims):
-         sims = sims.todense()
+     def proc_sims(sims, outfile):
+         if issparse(sims):
+             sims = sims.todense()
  
-     print(f"shape of sims:{sims.shape}")
-     print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
-     sims = pd.DataFrame(sims)
-     sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
-     sims['subreddit'] = subreddit_names.subreddit.values
+         print(f"shape of sims:{sims.shape}")
+         print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
+         sims = pd.DataFrame(sims)
+         sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+         sims['_subreddit'] = subreddit_names.subreddit.values
  
-     p = Path(outfile)
+         p = Path(outfile)
  
-     output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-     output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-     output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
+         output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
+         output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
+         output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
+         p.parent.mkdir(exist_ok=True, parents=True)
  
-     sims.to_feather(outfile)
-     tempdir.cleanup()
+         sims.to_feather(outfile)
  
- def read_tfidf_matrix_weekly(path, term_colname, week, tfidf_colname='tf_idf'):
      term = term_colname
      term_id = term + '_id'
      term_id_new = term + '_id_new'
  
-     dataset = ds.dataset(path,format='parquet')
-     entries = dataset.to_table(columns=[tfidf_colname,'subreddit_id_new', term_id_new],filter=ds.field('week')==week).to_pandas()
-     return(csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1))))
+     entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
+     mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
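+     # csr_matrix((data, (rows, cols))): rows are the reindexed term ids,
+     # columns the reindexed subreddit ids; duplicate entries are summed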
  
- def read_tfidf_matrix(path, term_colname, tfidf_colname='tf_idf'):
-     term = term_colname
-     term_id = term + '_id'
-     term_id_new = term + '_id_new'
-     dataset = ds.dataset(path,format='parquet')
-     print(f"tfidf_colname:{tfidf_colname}")
-     entries = dataset.to_table(columns=[tfidf_colname, 'subreddit_id_new',term_id_new]).to_pandas()
-     return(csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1))))
-     
+     print("loading matrix")        
+     #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
+     print(f'computing similarities on mat. mat.shape:{mat.shape}')
+     print(f"size of mat is:{mat.data.nbytes}",flush=True)
+     sims = simfunc(mat)
+     del mat
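+     # simfunc may return a single matrix (column_similarities) or a generator
+     # of (matrix, n_dims) pairs (lsi_column_similarities)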
+     if hasattr(sims,'__next__'):
+         for simmat, name in sims:
+             proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
+     else:
+         proc_sims(sims, outfile)
  
  def write_weekly_similarities(path, sims, week, names):
      sims['week'] = week
      p = pathlib.Path(path)
      if not p.is_dir():
-         p.mkdir()
+         p.mkdir(exist_ok=True,parents=True)
          
      # reformat as a pairwise list
-     sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
+     sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
      sims.to_parquet(p / week.isoformat())
  
  def column_overlaps(mat):
  
      return intersection / den
      
- def column_similarities(mat):
-     norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
-     mat = mat.multiply(1/norm)
-     sims = mat.T @ mat
-     return(sims)
- def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
-     term = term_colname
-     term_id = term + '_id'
-     term_id_new = term + '_id_new'
-     if min_df is None:
-         min_df = 0.1 * len(included_subreddits)
-         tfidf = tfidf.filter(f.col('count') >= min_df)
-     if max_df is not None:
-         tfidf = tfidf.filter(f.col('count') <= max_df)
-     tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
-     # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
-     sub_ids = tfidf.select(['subreddit_id','week']).distinct()
-     sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
-     tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
-     # only use terms in at least min_df included subreddits in a given week
-     new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
-     tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
-     # reset the term ids
-     term_ids = tfidf.select([term_id,'week']).distinct()
-     term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
-     tfidf = tfidf.join(term_ids,[term_id,'week'])
-     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-     tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
-     tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
-     tfidf = tfidf.repartition('week')
-     tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
-     return(tempdir)
-     
- def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
-     term = term_colname
+ def test_lsi_sims():
+     term = "term"
      term_id = term + '_id'
      term_id_new = term + '_id_new'
  
-     if min_df is None:
-         min_df = 0.1 * len(included_subreddits)
-         tfidf = tfidf.filter(f.col('count') >= min_df)
-     if max_df is not None:
-         tfidf = tfidf.filter(f.col('count') <= max_df)
-     tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
-     # reset the subreddit ids
-     sub_ids = tfidf.select('subreddit_id').distinct()
-     sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
-     tfidf = tfidf.join(sub_ids,'subreddit_id')
-     # only use terms in at least min_df included subreddits
-     new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-     tfidf = tfidf.join(new_count,term_id,how='inner')
-     
-     # reset the term ids
-     term_ids = tfidf.select([term_id]).distinct()
-     term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
-     tfidf = tfidf.join(term_ids,term_id)
-     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-     tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
+     t1 = time.perf_counter()
+     entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
+                                              term_colname='term',
+                                              min_df=2000,
+                                              topN=10000
+                                              )
+     t2 = time.perf_counter()
+     print(f"first load took:{t2 - t1}s")
+     entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
+                                              term_colname='term',
+                                              min_df=2000,
+                                              topN=10000
+                                              )
+     t3=time.perf_counter()
+     print(f"second load took:{t3 - t2}s")
+     mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
+     sims = list(lsi_column_similarities(mat, [10,50]))
+     sims_og = sims
+     sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))
+ # n_components is the latent dimensionality. sklearn recommends 100. More might be better
+ # if n_components is a list we'll return a list of similarities with different latent dimensionalities
+ # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
+ # this function takes the svd and then the column similarities of it
+ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
+     # first compute the lsi of the matrix
+     # then take the column similarities
+     print("running LSI",flush=True)
+     if type(n_components) is int:
+         n_components = [n_components]
+     n_components = sorted(n_components,reverse=True)
      
-     tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
+     svd_components = n_components[0]
+     svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
+     mod = svd.fit(tfidfmat.T)
+     lsimat = mod.transform(tfidfmat.T)
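+     # TruncatedSVD expects samples as rows, so fit on the transpose:
+     # tfidfmat is terms x subreddits, lsimat is subreddits x svd_components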
+     for n_dims in n_components:
+         sims = column_similarities(lsimat[:,np.arange(n_dims)])
+         # always yield (sims, n_dims): a bare `return sims` inside a generator
+         # discards the value, so a single-element n_components would yield nothing
+         yield (sims, n_dims)
      
-     tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
-     return tempdir
- # try computing cosine similarities using spark
- def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
-     term = term_colname
-     term_id = term + '_id'
-     term_id_new = term + '_id_new'
  
-     if min_df is None:
-         min_df = 0.1 * len(included_subreddits)
-     tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
-     tfidf = tfidf.cache()
-     # reset the subreddit ids
-     sub_ids = tfidf.select('subreddit_id').distinct()
-     sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
-     tfidf = tfidf.join(sub_ids,'subreddit_id')
-     # only use terms in at least min_df included subreddits
-     new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-     tfidf = tfidf.join(new_count,term_id,how='inner')
-     
-     # reset the term ids
-     term_ids = tfidf.select([term_id]).distinct()
-     term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
-     tfidf = tfidf.join(term_ids,term_id)
-     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-     tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
-     # step 1 make an rdd of entires
-     # sorted by (dense) spark subreddit id
-     n_partitions = int(len(included_subreddits)*2 / 5)
-     entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
-     # put like 10 subredis in each partition
-     # step 2 make it into a distributed.RowMatrix
-     coordMat = CoordinateMatrix(entries)
-     coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
-     # this needs to be an IndexedRowMatrix()
-     mat = coordMat.toRowMatrix()
-     #goal: build a matrix of subreddit columns and tf-idfs rows
-     sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
-     return (sim_dist, tfidf)
+ def column_similarities(mat):
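+     # cosine similarity between the *rows* of mat; callers must pass a matrix
+     # with subreddits as rows (true of lsimat; note the tf-idf matrix built in
+     # similarities() has terms as rows)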
+     return 1 - pairwise_distances(mat,metric='cosine')
  
  
  def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
      else: # tf_fam = tf_weight.Norm05
          df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)
  
-     return df
+     df = df.repartition(400,'subreddit','week')
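+     # note: DataFrameWriter.sortBy generally requires bucketBy, and some Spark
+     # versions reject bucketBy/sortBy for path-based parquet() saves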
+     dfwriter = df.write.partitionBy("week").sortBy("subreddit")
+     return dfwriter
  
  def _calc_tfidf(df, term_colname, tf_family):
      term = term_colname
  
      df = df.join(max_subreddit_terms, on='subreddit')
  
-     df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
+     df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))
  
      # group by term. term is unique
      idf = df.groupby([term]).count()
@@@ -385,10 -338,28 +338,28 @@@ def build_tfidf_dataset(df, include_sub
      df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
  
      df = _calc_tfidf(df, term_colname, tf_family)
-     return df
+     df = df.repartition('subreddit')
+     dfwriter = df.write.sortBy("subreddit","tf")
+     return dfwriter
  
  def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
      rankdf = pd.read_csv(path)
      included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
      return included_subreddits
+ def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
+                       outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
+     spark = SparkSession.builder.getOrCreate()
+     df = spark.read.parquet(inpath)
+     df = df.repartition(400,'subreddit')
+     df.write.parquet(outpath,mode='overwrite')
+     
+ def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
+                       outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
+     spark = SparkSession.builder.getOrCreate()
+     df = spark.read.parquet(inpath)
+     df = df.repartition(400,'subreddit','week')
+     dfwriter = df.write.partitionBy("week")
+     dfwriter.parquet(outpath,mode='overwrite')
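
Partitioning the weekly TF-IDF output by week lets a reader that filters on
week scan only that partition's files. A sketch, assuming the repartitioned
output path from repartition_tfidf_weekly above (the week value is
illustrative):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f

    spark = SparkSession.builder.getOrCreate()
    weekly = spark.read.parquet("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet")
    # partition pruning: only files under week=2020-01-06 are scanned
    one_week = weekly.filter(f.col("week") == "2020-01-06")
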
diff --combined similarities/tfidf.py
index 7f579faabb7092dacfb210f913f7b243d4c79337,002e89f785b37fd9df3c903775ab6f71846909d4..110536eeb22b5c13132ff17b33d882fc47da63b7
@@@ -11,14 -11,13 +11,13 @@@ def _tfidf_wrapper(func, inpath, outpat
      df = df.filter(~ f.col(term_colname).isin(exclude))
  
      if included_subreddits is not None:
 -        include_subs = list(open(included_subreddits))
 +        include_subs = set(map(str.strip,map(str.lower, open(included_subreddits))))
      else:
          include_subs = select_topN_subreddits(topN)
  
-     df = func(df, include_subs, term_colname)
-     df.write.parquet(outpath,mode='overwrite',compression='snappy')
+     dfwriter = func(df, include_subs, term_colname)
  
+     dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
      spark.stop()
  
  def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
@@@ -28,44 -27,40 +27,44 @@@ def tfidf_weekly(inpath, outpath, topN
      return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
  
  def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
 -                  topN=25000):
 +                  topN=25000,
 +                  included_subreddits=None):
  
      return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                   outpath,
                   topN,
                   'author',
                   ['[deleted]','AutoModerator'],
 -                 included_subreddits=None
 +                 included_subreddits=included_subreddits
                   )
  
  def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
 -                topN=25000):
 +                topN=25000,
 +                included_subreddits=None):
  
      return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
                   outpath,
                   topN,
                   'term',
                   [],
 -                 included_subreddits=None
 +                 included_subreddits=included_subreddits
                   )
  
  def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
 -                         topN=25000):
 +                         topN=25000,
 +                         included_subreddits=None):
  
      return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                          outpath,
                          topN,
                          'author',
                          ['[deleted]','AutoModerator'],
 -                        included_subreddits=None
 +                        included_subreddits=included_subreddits
                          )
  
  def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
 -                       topN=25000):
 +                       topN=25000,
 +                       included_subreddits=None):
  
  
      return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
@@@ -73,7 -68,7 +72,7 @@@
                          topN,
                          'term',
                          [],
 -                        included_subreddits=None
 +                        included_subreddits=included_subreddits
                          )
  
  
