code.communitydata.science - cdsc_reddit.git/commitdiff
Merge branch 'master' of code:cdsc_reddit into excise_reindex
author    Nathan TeBlunthuis <nathante@uw.edu>
          Tue, 3 Aug 2021 22:02:08 +0000 (15:02 -0700)
committer Nathan TeBlunthuis <nathante@uw.edu>
          Tue, 3 Aug 2021 22:02:08 +0000 (15:02 -0700)
similarities/similarities_helper.py
similarities/tfidf.py

index e59563e396bc0988cf645dc80a6cba27997a512e,3ace8f29f3922838009adfb4dccf77e5f03b1e34..1492983f88695111af812c600c7ece03e7abe802
@@@ -2,14 -2,11 +2,14 @@@ from pyspark.sql import SparkSessio
  from pyspark.sql import Window
  from pyspark.sql import functions as f
  from enum import Enum
 +from multiprocessing import cpu_count, Pool
  from pyspark.mllib.linalg.distributed import CoordinateMatrix
  from tempfile import TemporaryDirectory
  import pyarrow
  import pyarrow.dataset as ds
 +from sklearn.metrics import pairwise_distances
  from scipy.sparse import csr_matrix, issparse
 +from sklearn.decomposition import TruncatedSVD
  import pandas as pd
  import numpy as np
  import pathlib
@@@ -20,147 -17,128 +20,147 @@@ class tf_weight(Enum)
      MaxTF = 1
      Norm05 = 2
  
 -infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet"
 +infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
 +cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
  
 -def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
 -    term = term_colname
 -    term_id = term + '_id'
 -    term_id_new = term + '_id_new'
 -
 -    spark = SparkSession.builder.getOrCreate()
 -    conf = spark.sparkContext.getConf()
 -    print(exclude_phrases)
 -    tfidf_weekly = spark.read.parquet(infile)
 -
 -    # create the time interval
 -    if from_date is not None:
 -        if type(from_date) is str:
 -            from_date = datetime.fromisoformat(from_date)
 -
 -        tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)
 -        
 -    if to_date is not None:
 -        if type(to_date) is str:
 -            to_date = datetime.fromisoformat(to_date)
 -        tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)
 -
 -    tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
 -    tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
 -    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
 -    tfidf = spark.read_parquet(tempdir.name)
 -    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
 -    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
 -    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
 -    return(tempdir, subreddit_names)
 -
 -def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
 -    spark = SparkSession.builder.getOrCreate()
 -    conf = spark.sparkContext.getConf()
 -    print(exclude_phrases)
 -
 -    tfidf = spark.read.parquet(infile)
 +# subreddits missing after this step don't have any terms that have a high enough idf
 +# try rewriting without merges
 +def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
 +    print("loading tfidf", flush=True)
 +    tfidf_ds = ds.dataset(infile)
  
      if included_subreddits is None:
          included_subreddits = select_topN_subreddits(topN)
      else:
-         included_subreddits = set(open(included_subreddits))
+         included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits))))
  
 -    if exclude_phrases == True:
 -        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
 +    ds_filter = ds.field("subreddit").isin(included_subreddits)
 +
 +    if min_df is not None:
 +        ds_filter &= ds.field("count") >= min_df
 +
 +    if max_df is not None:
 +        ds_filter &= ds.field("count") <= max_df
 +
 +    if week is not None:
 +        ds_filter &= ds.field("week") == week
 +
 +    if from_date is not None:
 +        ds_filter &= ds.field("week") >= from_date
  
 -    print("creating temporary parquet with matrix indicies")
 -    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
 +    if to_date is not None:
 +        ds_filter &= ds.field("week") <= to_date
  
 -    tfidf = spark.read.parquet(tempdir.name)
 -    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
 +    term = term_colname
 +    term_id = term + '_id'
 +    term_id_new = term + '_id_new'
 +    
 +    projection = {
 +        'subreddit_id':ds.field('subreddit_id'),
 +        term_id:ds.field(term_id),
 +        'relative_tf':ds.field("relative_tf").cast('float32')
 +        }
 +
 +    if not rescale_idf:
 +        projection = {
 +            'subreddit_id':ds.field('subreddit_id'),
 +            term_id:ds.field(term_id),
 +            'relative_tf':ds.field('relative_tf').cast('float32'),
 +            'tf_idf':ds.field('tf_idf').cast('float32')}
 +
 +    tfidf_ds = ds.dataset(infile)
 +
 +    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
 +
 +    df = df.to_pandas(split_blocks=True,self_destruct=True)
 +    print("assigning indexes",flush=True)
 +    df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
 +    grouped = df.groupby(term_id)
 +    df[term_id_new] = grouped.ngroup()
 +
 +    if rescale_idf:
 +        print("computing idf", flush=True)
 +        df['new_count'] = grouped[term_id].transform('count')
 +        N_docs = df.subreddit_id_new.max() + 1
 +        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
 +        if tf_family == tf_weight.MaxTF:
 +            df["tf_idf"] = df.relative_tf * df.idf
 +        else: # tf_fam = tf_weight.Norm05
 +            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
 +
 +    print("assigning names")
 +    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
 +    batches = subreddit_names.to_batches()
 +
 +    with Pool(cpu_count()) as pool:
 +        chunks = pool.imap_unordered(pull_names,batches) 
 +        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
 +
 +    subreddit_names = subreddit_names.set_index("subreddit_id")
 +    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
 +    new_ids = new_ids.set_index('subreddit_id')
 +    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
 +    subreddit_names = subreddit_names.drop("subreddit_id", axis=1)
      subreddit_names = subreddit_names.sort_values("subreddit_id_new")
 -    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
 -    spark.stop()
 -    return (tempdir, subreddit_names)
 +    return(df, subreddit_names)
  
 +def pull_names(batch):
 +    return(batch.to_pandas().drop_duplicates())
  
 -def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
 +def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
      '''
      tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
      '''
 -    if from_date is not None or to_date is not None:
 -        tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
 -        
 -    else:
 -        tempdir, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
 -
 -    print("loading matrix")
 -    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
 -    mat = read_tfidf_matrix(tempdir.name, term_colname, tfidf_colname)
 -    print(f'computing similarities on mat. mat.shape:{mat.shape}')
 -    print(f"size of mat is:{mat.data.nbytes}")
 -    sims = simfunc(mat)
 -    del mat
  
 -    if issparse(sims):
 -        sims = sims.todense()
 +    def proc_sims(sims, outfile):
 +        if issparse(sims):
 +            sims = sims.todense()
  
 -    print(f"shape of sims:{sims.shape}")
 -    print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
 -    sims = pd.DataFrame(sims)
 -    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
 -    sims['subreddit'] = subreddit_names.subreddit.values
 +        print(f"shape of sims:{sims.shape}")
 +        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
 +        sims = pd.DataFrame(sims)
 +        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
 +        sims['_subreddit'] = subreddit_names.subreddit.values
  
 -    p = Path(outfile)
 +        p = Path(outfile)
  
 -    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
 -    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
 -    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
 +        output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
 +        output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
 +        output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
 +        p.parent.mkdir(exist_ok=True, parents=True)
  
 -    sims.to_feather(outfile)
 -    tempdir.cleanup()
 +        sims.to_feather(outfile)
  
 -def read_tfidf_matrix_weekly(path, term_colname, week, tfidf_colname='tf_idf'):
      term = term_colname
      term_id = term + '_id'
      term_id_new = term + '_id_new'
  
 -    dataset = ds.dataset(path,format='parquet')
 -    entries = dataset.to_table(columns=[tfidf_colname,'subreddit_id_new', term_id_new],filter=ds.field('week')==week).to_pandas()
 -    return(csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1))))
 +    entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
 +    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
  
 -def read_tfidf_matrix(path, term_colname, tfidf_colname='tf_idf'):
 -    term = term_colname
 -    term_id = term + '_id'
 -    term_id_new = term + '_id_new'
 -    dataset = ds.dataset(path,format='parquet')
 -    print(f"tfidf_colname:{tfidf_colname}")
 -    entries = dataset.to_table(columns=[tfidf_colname, 'subreddit_id_new',term_id_new]).to_pandas()
 -    return(csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1))))
 -    
 +    print("loading matrix")        
 +
 +    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
 +
 +    print(f'computing similarities on mat. mat.shape:{mat.shape}')
 +    print(f"size of mat is:{mat.data.nbytes}",flush=True)
 +    sims = simfunc(mat)
 +    del mat
 +
 +    if hasattr(sims,'__next__'):
 +        for simmat, name in sims:
 +            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
 +    else:
 +        proc_sims(sims, outfile)
  
  def write_weekly_similarities(path, sims, week, names):
      sims['week'] = week
      p = pathlib.Path(path)
      if not p.is_dir():
 -        p.mkdir()
 +        p.mkdir(exist_ok=True,parents=True)
          
      # reformat as a pairwise list
 -    sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
 +    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
      sims.to_parquet(p / week.isoformat())
  
  def column_overlaps(mat):
  
      return intersection / den
      
 -def column_similarities(mat):
 -    norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
 -    mat = mat.multiply(1/norm)
 -    sims = mat.T @ mat
 -    return(sims)
 -
 -
 -def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
 -    term = term_colname
 -    term_id = term + '_id'
 -    term_id_new = term + '_id_new'
 -
 -    if min_df is None:
 -        min_df = 0.1 * len(included_subreddits)
 -        tfidf = tfidf.filter(f.col('count') >= min_df)
 -    if max_df is not None:
 -        tfidf = tfidf.filter(f.col('count') <= max_df)
 -
 -    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
 -
 -    # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
 -    sub_ids = tfidf.select(['subreddit_id','week']).distinct()
 -    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
 -    tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
 -
 -    # only use terms in at least min_df included subreddits in a given week
 -    new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
 -    tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
 -
 -    # reset the term ids
 -    term_ids = tfidf.select([term_id,'week']).distinct()
 -    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
 -    tfidf = tfidf.join(term_ids,[term_id,'week'])
 -
 -    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
 -    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
 -
 -    tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
 -
 -    tfidf = tfidf.repartition('week')
 -
 -    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
 -    return(tempdir)
 -    
 -
 -def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
 -    term = term_colname
 +def test_lsi_sims():
 +    term = "term"
      term_id = term + '_id'
      term_id_new = term + '_id_new'
  
 -    if min_df is None:
 -        min_df = 0.1 * len(included_subreddits)
 -        tfidf = tfidf.filter(f.col('count') >= min_df)
 -    if max_df is not None:
 -        tfidf = tfidf.filter(f.col('count') <= max_df)
 -
 -    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
 -
 -    # reset the subreddit ids
 -    sub_ids = tfidf.select('subreddit_id').distinct()
 -    sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
 -    tfidf = tfidf.join(sub_ids,'subreddit_id')
 -
 -    # only use terms in at least min_df included subreddits
 -    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
 -    tfidf = tfidf.join(new_count,term_id,how='inner')
 -    
 -    # reset the term ids
 -    term_ids = tfidf.select([term_id]).distinct()
 -    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
 -    tfidf = tfidf.join(term_ids,term_id)
 -
 -    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
 -    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
 +    t1 = time.perf_counter()
 +    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
 +                                             term_colname='term',
 +                                             min_df=2000,
 +                                             topN=10000
 +                                             )
 +    t2 = time.perf_counter()
 +    print(f"first load took:{t2 - t1}s")
 +
 +    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
 +                                             term_colname='term',
 +                                             min_df=2000,
 +                                             topN=10000
 +                                             )
 +    t3=time.perf_counter()
 +
 +    print(f"second load took:{t3 - t2}s")
 +
 +    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
 +    sims = list(lsi_column_similarities(mat, [10,50]))
 +    sims_og = sims
 +    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))
 +
 +# n_components is the latent dimensionality. sklearn recommends 100; more might be better.
 +# n_components may be an int or a list; one (similarities, n_dims) pair is yielded per latent dimensionality.
 +# If algorithm is 'randomized' instead of 'arpack', then n_iter gives the number of iterations.
 +# This function computes the truncated SVD (LSI) of the tf-idf matrix and then the column similarities of the result.
 +def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
 +    # first compute the lsi of the matrix
 +    # then take the column similarities
 +    print("running LSI",flush=True)
 +
 +    if type(n_components) is int:
 +        n_components = [n_components]
 +
 +    n_components = sorted(n_components,reverse=True)
      
 -    tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
 +    svd_components = n_components[0]
 +    svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
 +    mod = svd.fit(tfidfmat.T)
 +    lsimat = mod.transform(tfidfmat.T)
 +    for n_dims in n_components:
 +        sims = column_similarities(lsimat[:,np.arange(n_dims)])
 +        # yield one (similarity matrix, n_dims) pair per requested dimensionality
 +        yield (sims, n_dims)
      
 -    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
 -    return tempdir
 -
 -
 -# try computing cosine similarities using spark
 -def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
 -    term = term_colname
 -    term_id = term + '_id'
 -    term_id_new = term + '_id_new'
  
 -    if min_df is None:
 -        min_df = 0.1 * len(included_subreddits)
 -
 -    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
 -    tfidf = tfidf.cache()
 -
 -    # reset the subreddit ids
 -    sub_ids = tfidf.select('subreddit_id').distinct()
 -    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
 -    tfidf = tfidf.join(sub_ids,'subreddit_id')
 -
 -    # only use terms in at least min_df included subreddits
 -    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
 -    tfidf = tfidf.join(new_count,term_id,how='inner')
 -    
 -    # reset the term ids
 -    term_ids = tfidf.select([term_id]).distinct()
 -    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
 -    tfidf = tfidf.join(term_ids,term_id)
 -
 -    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
 -    tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
 -
 -    # step 1 make an rdd of entires
 -    # sorted by (dense) spark subreddit id
 -    n_partitions = int(len(included_subreddits)*2 / 5)
 -
 -    entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
 -
 -    # put like 10 subredis in each partition
 -
 -    # step 2 make it into a distributed.RowMatrix
 -    coordMat = CoordinateMatrix(entries)
 -
 -    coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
 -
 -    # this needs to be an IndexedRowMatrix()
 -    mat = coordMat.toRowMatrix()
 -
 -    #goal: build a matrix of subreddit columns and tf-idfs rows
 -    sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
 -
 -    return (sim_dist, tfidf)
 +def column_similarities(mat):
 +    return 1 - pairwise_distances(mat,metric='cosine')
  
  
  def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
      else: # tf_fam = tf_weight.Norm05
          df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)
  
 -    return df
 +    df = df.repartition(400,'subreddit','week')
 +    dfwriter = df.write.partitionBy("week")
 +    return dfwriter
  
  def _calc_tfidf(df, term_colname, tf_family):
      term = term_colname
  
      df = df.join(max_subreddit_terms, on='subreddit')
  
 -    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
 +    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))
  
      # group by term. term is unique
      idf = df.groupby([term]).count()
@@@ -335,28 -385,10 +335,28 @@@ def build_tfidf_dataset(df, include_sub
      df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
  
      df = _calc_tfidf(df, term_colname, tf_family)
 -
 -    return df
 +    df = df.repartition('subreddit')
 +    dfwriter = df.write
 +    return dfwriter
  
  def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
      rankdf = pd.read_csv(path)
      included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
      return included_subreddits
 +
 +
 +def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
 +                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
 +    spark = SparkSession.builder.getOrCreate()
 +    df = spark.read.parquet(inpath)
 +    df = df.repartition(400,'subreddit')
 +    df.write.parquet(outpath,mode='overwrite')
 +
 +    
 +def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
 +                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
 +    spark = SparkSession.builder.getOrCreate()
 +    df = spark.read.parquet(inpath)
 +    df = df.repartition(400,'subreddit','week')
 +    dfwriter = df.write.partitionBy("week")
 +    dfwriter.parquet(outpath,mode='overwrite')
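
For orientation, a minimal usage sketch (not part of the commit) of how the reworked helpers in similarities/similarities_helper.py compose; the output paths are hypothetical and the module is assumed to be importable as similarities_helper:

from similarities_helper import (similarities, column_similarities,
                                 lsi_column_similarities)

tfidf_path = "/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"

# Plain cosine similarities between subreddits. The new column_similarities()
# computes similarities between the rows of its argument, while similarities()
# builds a term-by-subreddit matrix, so transpose before calling it.
similarities(infile=tfidf_path,
             simfunc=lambda mat: column_similarities(mat.T),
             term_colname='term',
             outfile="/gscratch/comdata/users/example/comment_terms_cosine.feather",  # hypothetical
             min_df=2000,
             topN=10000)

# LSI similarities at two latent dimensionalities. lsi_column_similarities()
# transposes internally and yields (matrix, n_dims) pairs, so similarities()
# writes one feather file per dimensionality under the outfile directory.
similarities(infile=tfidf_path,
             simfunc=lambda mat: lsi_column_similarities(mat, n_components=[10, 50]),
             term_colname='term',
             outfile="/gscratch/comdata/users/example/comment_terms_lsi",  # hypothetical directory
             min_df=2000,
             topN=10000)

Per the docstring above, passing tfidf_colname='relative_tf' switches the matrix entries from tf-idf to normalized term frequency, which can be useful for author-based similarities.
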
diff --combined similarities/tfidf.py
index 002e89f785b37fd9df3c903775ab6f71846909d4,7f579faabb7092dacfb210f913f7b243d4c79337..110536eeb22b5c13132ff17b33d882fc47da63b7
@@@ -11,13 -11,14 +11,13 @@@ def _tfidf_wrapper(func, inpath, outpat
      df = df.filter(~ f.col(term_colname).isin(exclude))
  
      if included_subreddits is not None:
-         include_subs = list(open(included_subreddits))
+         include_subs = set(map(str.strip,map(str.lower, open(included_subreddits))))
      else:
          include_subs = select_topN_subreddits(topN)
  
 -    df = func(df, include_subs, term_colname)
 -
 -    df.write.parquet(outpath,mode='overwrite',compression='snappy')
 +    dfwriter = func(df, include_subs, term_colname)
  
 +    dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
      spark.stop()
  
  def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
@@@ -27,40 -28,44 +27,44 @@@ def tfidf_weekly(inpath, outpath, topN
      return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
  
  def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
-                   topN=25000):
+                   topN=25000,
+                   included_subreddits=None):
  
      return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                   outpath,
                   topN,
                   'author',
                   ['[deleted]','AutoModerator'],
-                  included_subreddits=None
+                  included_subreddits=included_subreddits
                   )
  
  def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
-                 topN=25000):
+                 topN=25000,
+                 included_subreddits=None):
  
      return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
                   outpath,
                   topN,
                   'term',
                   [],
-                  included_subreddits=None
+                  included_subreddits=included_subreddits
                   )
  
  def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
-                          topN=25000):
+                          topN=25000,
+                          included_subreddits=None):
  
      return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                          outpath,
                          topN,
                          'author',
                          ['[deleted]','AutoModerator'],
-                         included_subreddits=None
+                         included_subreddits=included_subreddits
                          )
  
  def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
-                        topN=25000):
+                        topN=25000,
+                        included_subreddits=None):
  
  
      return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
@@@ -68,7 -73,7 +72,7 @@@
                          topN,
                          'term',
                          [],
-                         included_subreddits=None
+                         included_subreddits=included_subreddits
                          )
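
A corresponding sketch (also not part of the commit) of the new included_subreddits pass-through in similarities/tfidf.py; the subreddit-list path is hypothetical, one subreddit name per line, and the module is assumed to be importable as tfidf:

from tfidf import tfidf_terms, tfidf_authors_weekly

# Build term tf-idf only for an explicit list of subreddits; _tfidf_wrapper
# lower-cases and strips each line of the file before filtering.
tfidf_terms(included_subreddits="/gscratch/comdata/users/example/my_subreddits.txt",  # hypothetical
            topN=25000)

# With included_subreddits=None (the default) the entry points still fall back
# to select_topN_subreddits(topN).
tfidf_authors_weekly(topN=25000)
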
  
  
