+++ /dev/null
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
-
-spark = SparkSession.builder.getOrCreate()
-conf = spark.sparkContext.getConf()
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
-    '''
-    Compute similarities between subreddits based on tfi-idf vectors of author comments
-    
-    included_subreddits : string
-        Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-    similarity_threshold : double (default = 0)
-        set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-    min_df : int (default = 0.1 * (number of included_subreddits)
-         exclude terms that appear in fewer than this number of documents.
-
-    outfile: string
-         where to output csv and feather outputs
-'''
-
-    spark = SparkSession.builder.getOrCreate()
-    conf = spark.sparkContext.getConf()
-    print(outfile)
-
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
-
-    if included_subreddits is None:
-        rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-        included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
-
-    else:
-        included_subreddits = set(open(included_subreddits))
-
-    print("creating temporary parquet with matrix indicies")
-    tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
-    tfidf = spark.read.parquet(tempdir.name)
-    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-    spark.stop()
-
-    print("loading matrix")
-    mat = read_tfidf_matrix(tempdir.name,'author')
-    print('computing similarities')
-    sims = column_similarities(mat)
-    del mat
-    
-    sims = pd.DataFrame(sims.todense())
-    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
-    sims['subreddit'] = subreddit_names.subreddit.values
-
-    p = Path(outfile)
-
-    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-    sims.to_feather(outfile)
-    tempdir.cleanup()
-
-    # print(outfile)
-
-    # tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
-
-    # if included_subreddits is None:
-    #     included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-    #     included_subreddits = {s.strip('\n') for s in included_subreddits}
-
-    # else:
-    #     included_subreddits = set(open(included_subreddits))
-
-    # sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold)
-
-    # p = Path(outfile)
-
-    # output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-    # output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-    # output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-    # sim_dist = sim_dist.entries.toDF()
-
-    # sim_dist = sim_dist.repartition(1)
-    # sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-    
-
-
-    # #instead of toLocalMatrix() why not read as entries and put strait into numpy
-    # sim_entries = pd.read_parquet(output_parquet)
-
-    # df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-
-    # spark.stop()
-    # df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-    # df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-    # df = df.set_index('subreddit_id_new')
-
-    # similarities = sim_entries.join(df, on='i')
-    # similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-    # similarities = similarities.join(df, on='j')
-    # similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-    # similarities.to_feather(output_feather)
-    # similarities.to_csv(output_csv)
-    # return similarities
-    
-if __name__ == '__main__':
-    fire.Fire(author_cosine_similarities)
 
 
     preference = np.quantile(mat,preference_quantile)
 
+    print("data loaded")
+
     clustering = AffinityPropagation(damping=damping,
                                      max_iter=max_iter,
                                      convergence_iter=convergence_iter,
 
--- /dev/null
+#!/usr/bin/bash
+# Launcher: bring up Spark, submit the weekly similarity job, shut Spark down.
+# The master URL below expects a Spark master on this host, port 18899
+# (presumably what start_spark_cluster.sh starts -- confirm).
+start_spark_cluster.sh
+# The job script receives "term" as its first positional argument, followed
+# by the output location and the number of top subreddits to include.
+spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
+# No error handling is enabled, so this runs even when spark-submit fails.
+stop-all.sh
 
--- /dev/null
+#!/bin/bash
+## parallel_sql_job.sh
+## Slurm batch script: activates the project environment and then starts a
+## parallel-sql worker on the allocated node.
+#SBATCH --job-name=tf_subreddit_comments
+## Allocation Definition
+#SBATCH --account=comdata-ckpt
+#SBATCH --partition=ckpt
+## Resources
+## Nodes. This should always be 1 for parallel-sql.
+#SBATCH --nodes=1    
+## Walltime (12 hours)
+#SBATCH --time=12:00:00
+## Memory per node
+#SBATCH --mem=32G
+## CPUs for the single task; same number as the --jobs value passed to parallel-sql below.
+#SBATCH --cpus-per-task=4
+#SBATCH --ntasks=1
+## Working directory of the job; the relative path used by "source" below is resolved against it.
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+# Activate the environment found in the working directory.
+source ./bin/activate
+module load parallel_sql
+# Diagnostics: print which perl, pyarrow and python3 the job sees.
+echo $(which perl)
+conda list pyarrow
+which python3
+#Put here commands to load other modules (e.g. matlab etc.)
+#Below command means that parallel_sql will get tasks from the database
+#and run them on the node (in parallel). With --jobs 4 this is presumably
+#at most 4 tasks at one time -- confirm against the parallel-sql documentation.
+parallel-sql --sql -a parallel --exit-on-term --jobs 4
 
 import fire
 from collections import Counter
 import os
-import datetime
 import re
 from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
 from nltk.corpus import stopwords
     ngram_output = partition.replace("parquet","txt")
 
     if mwe_pass == 'first':
-        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+        if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
 
     if mwe_pass != 'first':
-        mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather')
+        mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
         mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
         mwe_phrases = list(mwe_dataset.phrase)
         mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
                 new_sentence.append(new_token)
         return new_sentence
 
-
     stopWords = set(stopwords.words('english'))
 
     # we follow the approach described in datta, phelan, adar 2017
             for sentence in sentences:
                 if random() <= 0.1:
                     grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
-                    with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
+                    with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
                         for ng in grams:
                             gram_file.write(' '.join(ng) + '\n')
                 for token in sentence:
 
     outchunksize = 10000
 
-    with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
     
         while True:
 
 
--- /dev/null
+from pyspark.sql import SparkSession
+from similarities_helper import build_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+# input: the comment_authors parquet dataset
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+# keep only subreddits whose comments_rank is at most 25000
+subreddit_ranks = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+is_included = subreddit_ranks.comments_rank <= 25000
+include_subs = set(subreddit_ranks.loc[is_included, 'subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+for excluded_author in ('[deleted]', 'AutoModerator'):
+    df = df.filter(df.author != excluded_author)
+
+df = build_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',
+                 mode='overwrite',
+                 compression='snappy')
+
+spark.stop()
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+
+## TODO: need to exclude automoderator / bot posts.
+## TODO: need to better handle hyperlinks.
+
+spark = SparkSession.builder.getOrCreate()
+
+# input: the comment_terms parquet dataset
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+# keep only subreddits whose comments_rank is at most 25000
+subreddit_ranks = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+is_included = subreddit_ranks.comments_rank <= 25000
+include_subs = set(subreddit_ranks.loc[is_included, 'subreddit'])
+
+# [deleted] / AutoModerator filtering is currently disabled for this dataset
+# (TODO remove bots).
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'term')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+                 mode='overwrite',
+                 compression='snappy')
+spark.stop()
+
 
--- /dev/null
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
 
--- /dev/null
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import *
+
+#tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/subreddit_terms.parquet')
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
+    '''
+    Compute subreddit-by-subreddit similarities separately for every week.
+
+    tfidf_path : string
+        Parquet dataset with the weekly tf-idf entries.
+
+    outfile : string
+        Output location handed to write_weekly_similarities once per week.
+
+    term_colname : string
+        Name of the term column; forwarded to the helper functions.
+
+    min_df : int
+        Forwarded to prep_tfidf_entries_weekly.
+
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line).
+        If None, select_topN_subreddits(topN) is used instead.
+
+    topN : int
+        Number of subreddits to select when included_subreddits is None.
+    '''
+    spark = SparkSession.builder.getOrCreate()
+    print(outfile)
+    tfidf = spark.read.parquet(tfidf_path)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        # strip the line endings, otherwise no entry equals a subreddit name;
+        # the with-block also closes the file
+        with open(included_subreddits) as subreddit_file:
+            included_subreddits = {line.strip() for line in subreddit_file if line.strip()}
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # the ids can change each week.
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    weeks = list(subreddit_names.week.drop_duplicates())
+    for week in weeks:
+        print("loading matrix")
+        mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
+        print('computing similarities')
+        sims = column_similarities(mat)
+        del mat
+
+        # names of this week's subreddits, ordered by their matrix index
+        names = subreddit_names.loc[subreddit_names.week==week]
+
+        # the similarity matrix has to become a DataFrame before it can be
+        # labelled (the non-weekly similarity function densifies it the same way)
+        sims = pd.DataFrame(sims.todense())
+        sims = sims.rename({i:sr for i, sr in enumerate(names.subreddit.values)},axis=1)
+        sims['subreddit'] = names.subreddit.values
+        # write_weekly_similarities needs the names to melt the matrix
+        write_weekly_similarities(outfile, sims, week, names)
+
+    # remove the temporary parquet once every week has been written
+    tempdir.cleanup()
+
+
+
+def cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500):
+    '''
+    Compute similarities between subreddits based on tf-idf vectors of comment authors
+
+    outfile : string
+        where to write the similarity matrix; it is written in feather format
+
+    min_df : int
+        forwarded to prep_tfidf_entries
+
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line).
+        If None, select_topN_subreddits(topN) is used instead.
+
+    topN : int
+        number of subreddits to select when included_subreddits is None
+    '''
+
+    spark = SparkSession.builder.getOrCreate()
+    print(outfile)
+
+    tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet')
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        # strip the line endings, otherwise no entry equals a subreddit name;
+        # the with-block also closes the file
+        with open(included_subreddits) as subreddit_file:
+            included_subreddits = {line.strip() for line in subreddit_file if line.strip()}
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # subreddit names ordered by matrix index, shifted to start at 0
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name,'author')
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+
+    # label the columns with subreddit names and add a name column for the rows
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+    
+if __name__ == '__main__':
+    # author_cosine_similarities is not defined in this module; expose the two
+    # functions that are, each as a sub-command named after itself.
+    fire.Fire({'cosine_similarities_weekly': cosine_similarities_weekly,
+               'cosine_similarities': cosine_similarities})
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, column_similarities, select_topN
+import scipy
+
+# outfile='test_similarities_500.feather';
+# min_df = None;
+# included_subreddits=None; topN=100; exclude_phrases=True;
+def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
+    '''
+    Compute similarities between subreddits based on tf-idf vectors of comment terms
+
+    outfile : string
+        where to write the similarity matrix; it is written in feather format
+
+    min_df : int
+        forwarded to prep_tfidf_entries
+
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line).
+        If None, select_topN_subreddits(topN) is used instead.
+
+    topN : int
+        number of subreddits to select when included_subreddits is None
+
+    exclude_phrases : bool
+        if True, drop every term that contains "_" before computing similarities
+    '''
+    # imported here because the module-level import list does not provide this name
+    from similarities_helper import select_topN_subreddits
+
+    spark = SparkSession.builder.getOrCreate()
+    print(outfile)
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_terms.parquet')
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        # strip the line endings, otherwise no entry equals a subreddit name;
+        # the with-block also closes the file
+        with open(included_subreddits) as subreddit_file:
+            included_subreddits = {line.strip() for line in subreddit_file if line.strip()}
+
+    # compared with == True on purpose: a non-boolean value coming from the
+    # command line (e.g. the string 'false') must not switch the filter on
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col('term').contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # subreddit names ordered by matrix index, shifted to start at 0
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name,'term')
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+
+    # label the columns with subreddit names and add a name column for the rows
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+
+if __name__ == '__main__':
+    fire.Fire(term_cosine_similarities)
 
--- /dev/null
+from pyspark.sql import SparkSession
+from similarities_helper import build_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+# input: the comment_authors parquet dataset
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+# keep only subreddits whose comments_rank is at most 25000
+subreddit_ranks = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+is_included = subreddit_ranks.comments_rank <= 25000
+include_subs = set(subreddit_ranks.loc[is_included, 'subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+for excluded_author in ('[deleted]', 'AutoModerator'):
+    df = df.filter(df.author != excluded_author)
+
+df = build_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',
+                 mode='overwrite',
+                 compression='snappy')
+
+spark.stop()
 
--- /dev/null
+from pyspark.sql import SparkSession
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+# input: the comment_authors parquet dataset
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+# keep only subreddits whose comments_rank is at most 25000
+subreddit_ranks = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+is_included = subreddit_ranks.comments_rank <= 25000
+include_subs = set(subreddit_ranks.loc[is_included, 'subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+for excluded_author in ('[deleted]', 'AutoModerator'):
+    df = df.filter(df.author != excluded_author)
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
+                 mode='overwrite',
+                 compression='snappy')
+
+spark.stop()
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_tfidf_dataset
+# pandas is needed for the read_csv call below
+import pandas as pd
+
+## TODO: need to exclude automoderator / bot posts.
+## TODO: need to better handle hyperlinks.
+
+# Script: build the subreddit-by-term tf-idf dataset and write it as parquet.
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+# keep only subreddits whose comments_rank is at most 25000
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+df = build_tfidf_dataset(df, include_subs, 'term')
+
+# NOTE(review): the doubled "reddit_similarity/reddit_similarity" directory
+# looks unintended -- confirm that the jobs consuming subreddit_terms.parquet
+# read from this location.
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/reddit_similarity/subreddit_terms.parquet',mode='overwrite',compression='snappy')
+spark.stop()
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+
+## TODO: need to exclude automoderator / bot posts.
+## TODO: need to better handle hyperlinks.
+
+spark = SparkSession.builder.getOrCreate()
+
+# input: the comment_terms parquet dataset
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+# keep only subreddits whose comments_rank is at most 25000
+subreddit_ranks = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+is_included = subreddit_ranks.comments_rank <= 25000
+include_subs = set(subreddit_ranks.loc[is_included, 'subreddit'])
+
+# [deleted] / AutoModerator filtering is currently disabled for this dataset
+# (TODO remove bots).
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'term')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+                 mode='overwrite',
+                 compression='snappy')
+spark.stop()
+
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+import pandas as pd
+import fire
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
+
+
+def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    '''
+    Compute similarities between subreddits from a tf-idf parquet dataset.
+
+    infile : string
+        parquet dataset with the tf-idf entries
+
+    term_colname : string
+        name of the term column; forwarded to the helper functions
+
+    outfile : string
+        where to write the similarity matrix; it is written in feather format
+
+    min_df : int
+        forwarded to prep_tfidf_entries
+
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line).
+        If None, select_topN_subreddits(topN) is used instead.
+
+    topN : int
+        number of subreddits to select when included_subreddits is None
+
+    exclude_phrases : bool
+        if True, drop every term that contains "_" before computing similarities
+    '''
+    # imported here because the module-level import list does not provide this name
+    from similarities_helper import column_similarities
+
+    spark = SparkSession.builder.getOrCreate()
+    print(outfile)
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet(infile)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        # strip the line endings, otherwise no entry equals a subreddit name;
+        # the with-block also closes the file
+        with open(included_subreddits) as subreddit_file:
+            included_subreddits = {line.strip() for line in subreddit_file if line.strip()}
+
+    # compared with == True on purpose: a non-boolean value coming from the
+    # command line (e.g. the string 'false') must not switch the filter on
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # subreddit names ordered by matrix index, shifted to start at 0
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name, term_colname)
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+
+    # label the columns with subreddit names and add a name column for the rows
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+
+def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    '''
+    Run cosine_similarities on the comment-term tf-idf dataset.
+
+    Fixes the input path and the term column ('term') and forwards every
+    other argument unchanged.
+    '''
+    tfidf_path = '/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet'
+    return cosine_similarities(tfidf_path,
+                               'term',
+                               outfile,
+                               min_df=min_df,
+                               included_subreddits=included_subreddits,
+                               topN=topN,
+                               exclude_phrases=exclude_phrases)
+
+def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
+    '''
+    Run cosine_similarities on the comment-author tf-idf dataset.
+
+    Fixes the input path and the term column ('author'), never excludes
+    phrases, and forwards every other argument unchanged.
+    '''
+    tfidf_path = '/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet'
+    return cosine_similarities(tfidf_path,
+                               'author',
+                               outfile,
+                               min_df=min_df,
+                               included_subreddits=included_subreddits,
+                               topN=topN,
+                               exclude_phrases=False)
+
+if __name__ == "__main__":
+    # map each command-line sub-command to the function implementing it
+    commands = {'term': term_cosine_similarities,
+                'author': author_cosine_similarities}
+    fire.Fire(commands)
+
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
+    '''
+    Build the weekly tf-idf dataset for the top subreddits and write it as parquet.
+
+    inpath : string
+        parquet dataset to read
+
+    outpath : string
+        where to write the resulting parquet dataset (existing output is overwritten)
+
+    topN : int
+        keep the subreddits whose comments_rank is at most topN
+
+    term_colname : string
+        name of the term column; forwarded to build_weekly_tfidf_dataset
+
+    exclude : string, iterable of strings, or None
+        values of term_colname to drop before computing tf-idf; nothing is
+        dropped when empty or None.
+        NOTE(review): the original stub did not say what this parameter means;
+        this reading generalizes the [deleted] / AutoModerator author filters
+        -- confirm.
+    '''
+    spark = SparkSession.builder.getOrCreate()
+    df = spark.read.parquet(inpath)
+
+    include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+    include_subs = set(include_subs.loc[include_subs.comments_rank <= topN]['subreddit'])
+
+    if exclude:
+        # a single name must not be split into its characters
+        if isinstance(exclude, str):
+            exclude = [exclude]
+        df = df.filter(~f.col(term_colname).isin(list(exclude)))
+
+    df = build_weekly_tfidf_dataset(df, include_subs, term_colname)
+
+    df.write.parquet(outpath, mode='overwrite', compression='snappy')
+    spark.stop()
+
+
+if __name__ == '__main__':
+    # the inputs that were previously hard-coded at module level
+    tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                 '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+                 topN=25000,
+                 term_colname='term',
+                 exclude=None)
 
--- /dev/null
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
 
--- /dev/null
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
 
--- /dev/null
+/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
+       start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+import pandas as pd
+import fire
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
+
+
+def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    '''
+    Compute similarities between subreddits from a tf-idf parquet dataset.
+
+    infile : string
+        parquet dataset with the tf-idf entries
+
+    term_colname : string
+        name of the term column; forwarded to the helper functions
+
+    outfile : string
+        where to write the similarity matrix; it is written in feather format
+
+    min_df : int
+        forwarded to prep_tfidf_entries
+
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line).
+        If None, select_topN_subreddits(topN) is used instead.
+
+    topN : int
+        number of subreddits to select when included_subreddits is None
+
+    exclude_phrases : bool
+        if True, drop every term that contains "_" before computing similarities
+    '''
+    # imported here because the module-level import list does not provide this name
+    from similarities_helper import column_similarities
+
+    spark = SparkSession.builder.getOrCreate()
+    print(outfile)
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet(infile)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        # strip the line endings, otherwise no entry equals a subreddit name;
+        # the with-block also closes the file
+        with open(included_subreddits) as subreddit_file:
+            included_subreddits = {line.strip() for line in subreddit_file if line.strip()}
+
+    # compared with == True on purpose: a non-boolean value coming from the
+    # command line (e.g. the string 'false') must not switch the filter on
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # subreddit names ordered by matrix index, shifted to start at 0
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name, term_colname)
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+
+    # label the columns with subreddit names and add a name column for the rows
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+
+def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    '''
+    Run cosine_similarities on the comment-term tf-idf dataset.
+
+    Fixes the input path and the term column ('term') and forwards every
+    other argument unchanged.
+    '''
+    tfidf_path = '/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet'
+    return cosine_similarities(tfidf_path,
+                               'term',
+                               outfile,
+                               min_df=min_df,
+                               included_subreddits=included_subreddits,
+                               topN=topN,
+                               exclude_phrases=exclude_phrases)
+
+def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
+    '''
+    Run cosine_similarities on the comment-author tf-idf dataset.
+
+    Fixes the input path and the term column ('author'), never excludes
+    phrases, and forwards every other argument unchanged.
+    '''
+    tfidf_path = '/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet'
+    return cosine_similarities(tfidf_path,
+                               'author',
+                               outfile,
+                               min_df=min_df,
+                               included_subreddits=included_subreddits,
+                               topN=topN,
+                               exclude_phrases=False)
+
+if __name__ == "__main__":
+    # map each command-line sub-command to the function implementing it
+    commands = {'term': term_cosine_similarities,
+                'author': author_cosine_similarities}
+    fire.Fire(commands)
+
 
--- /dev/null
+#!/usr/bin/bash
+# Launcher: bring up Spark, submit the author similarity job, shut Spark down.
+# The master URL below expects a Spark master on this host, port 18899
+# (presumably what start_spark_cluster.sh starts -- confirm).
+start_spark_cluster.sh
+# "author" selects the author_cosine_similarities sub-command of cosine_similarities.py.
+spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
+# No error handling is enabled, so this runs even when spark-submit fails.
+stop-all.sh
 
 from scipy.sparse import csr_matrix
 import pandas as pd
 import numpy as np
+import pathlib
 
 class tf_weight(Enum):
     MaxTF = 1
     Norm05 = 2
 
+def read_tfidf_matrix_weekly(path, term_colname, week):
+    '''
+    Load the tf-idf entries of a single week from the parquet dataset at
+    `path` and return them as a scipy csr_matrix.
+
+    Rows are indexed by `<term_colname>_id_new - 1` and columns by
+    `subreddit_id_new - 1`.
+    '''
+    term_id_new = term_colname + '_id_new'
+    wanted_columns = ['tf_idf', 'subreddit_id_new', term_id_new]
+
+    # only the rows of the requested week are read
+    week_filter = ds.field('week') == week
+    dataset = ds.dataset(path, format='parquet')
+    entries = dataset.to_table(columns=wanted_columns, filter=week_filter).to_pandas()
+
+    row_index = entries[term_id_new] - 1
+    col_index = entries.subreddit_id_new - 1
+    return csr_matrix((entries.tf_idf, (row_index, col_index)))
+
+def write_weekly_similarities(path, sims, week, names):
+    '''
+    Write one week's similarity matrix as a pairwise list in parquet format.
+
+    path : string
+        output directory; created, including missing parents, when absent
+
+    sims : pandas.DataFrame
+        similarity matrix with one column per subreddit plus a 'subreddit'
+        column; a 'week' column is added to it in place
+
+    week : object providing isoformat()
+        the week being written; its isoformat() string is the output file name
+
+    names : pandas.DataFrame
+        its 'subreddit' column lists the columns of sims to melt
+    '''
+    sims['week'] = week
+    p = pathlib.Path(path)
+    # also creates missing parent directories and does not fail when the
+    # directory appears between a check and the creation
+    p.mkdir(parents=True, exist_ok=True)
+
+    # reformat as a pairwise list
+    sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
+    sims.to_parquet(p / week.isoformat())
+
+
+
 def read_tfidf_matrix(path,term_colname):
     term = term_colname
     term_id = term + '_id'
     return(sims)
 
 
+def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
+    '''
+    Restrict a weekly tf-idf Spark DataFrame to the included subreddits, assign
+    per-week matrix ids to subreddits and terms, recompute tf_idf, and write the
+    result to a temporary parquet directory.
+
+    Returns the TemporaryDirectory object; its .name is the parquet path.
+    The directory is created under the current working directory.
+    '''
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+
+    # NOTE(review): min_df is given a default here but is not used anywhere
+    # below in this function -- confirm whether a filter is missing.
+    if min_df is None:
+        min_df = 0.1 * len(included_subreddits)
+
+    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
+
+    # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
+    # row_number() numbers the subreddits from 1 within each week, ordered by subreddit_id
+    sub_ids = tfidf.select(['subreddit_id','week']).distinct()
+    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
+    tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
+
+    # count the rows of each term per week and attach the count as 'new_count';
+    # no rows are filtered on it here
+    new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
+    tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
+
+    # reset the term ids
+    # row_number() numbers the terms from 1 within each week, ordered by the old term id
+    term_ids = tfidf.select([term_id,'week']).distinct()
+    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
+    tfidf = tfidf.join(term_ids,[term_id,'week'])
+
+    # keep the incoming value as tf_idf_old and recompute tf_idf as relative_tf * idf
+    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
+    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
+
+    tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
+
+    tfidf = tfidf.repartition('week')
+
+    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
+    return(tempdir)
+    
+
 def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
     term = term_colname
     term_id = term + '_id'
 
     # only use terms in at least min_df included subreddits
     new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-#    new_count = new_count.filter(f.col('new_count') >= min_df)
     tfidf = tfidf.join(new_count,term_id,how='inner')
     
     # reset the term ids
     tfidf = tfidf.join(term_ids,term_id)
 
     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
-    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
     tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
     
     tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
     tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
     return tempdir
 
-def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
+
+# try computing cosine similarities using spark
+def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
     term = term_colname
     term_id = term + '_id'
     term_id_new = term + '_id_new'
 
     # only use terms in at least min_df included subreddits
     new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-#    new_count = new_count.filter(f.col('new_count') >= min_df)
     tfidf = tfidf.join(new_count,term_id,how='inner')
     
     # reset the term ids
     tfidf = tfidf.join(term_ids,term_id)
 
     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
-    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
     tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
 
     # step 1 make an rdd of entires
     # sorted by (dense) spark subreddit id
-    #    entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd
- 
     n_partitions = int(len(included_subreddits)*2 / 5)
 
     entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
     df = df.join(idf, on=[term_id, term])
 
     # agg terms by subreddit to make sparse tf/df vectors
-    
     if tf_family == tf_weight.MaxTF:
         df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
     else: # tf_fam = tf_weight.Norm05
 
     return df
 
-
+def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv"):
+    '''
+    Return the set of subreddit names whose comments_rank is at most topN.
+
+    topN : int
+        rank cutoff (inclusive).
+
+    path : string
+        csv file with 'subreddit' and 'comments_rank' columns.
+    '''
+    ranks = pd.read_csv(path)
+    in_top = ranks.comments_rank <= topN
+    return set(ranks.loc[in_top, 'subreddit'].values)
 
--- /dev/null
+import fire
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as f
+from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
+
+
+def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude):
+    '''
+    Read counts from inpath, drop excluded terms, build a tf-idf dataset with func and write it to outpath.
+
+    func : callable
+        called as func(df, include_subs, term_colname); its result is written as parquet.
+
+    inpath, outpath : string
+        parquet locations to read from and write to (outpath is overwritten).
+
+    topN : int
+        passed to select_topN_subreddits to choose the subreddits to include.
+
+    term_colname : string
+        column holding the terms.
+
+    exclude : list
+        values of term_colname whose rows are dropped before func is called.
+    '''
+    session = SparkSession.builder.getOrCreate()
+
+    not_excluded = ~ f.col(term_colname).isin(exclude)
+    counts = session.read.parquet(inpath).filter(not_excluded)
+
+    top_subreddits = select_topN_subreddits(topN)
+    result = func(counts, top_subreddits, term_colname)
+
+    result.write.parquet(outpath, mode='overwrite', compression='snappy')
+    session.stop()
+
+def tfidf(inpath, outpath, topN, term_colname, exclude):
+    '''Run the tf-idf pipeline in _tfidf_wrapper using build_tfidf_dataset.'''
+    return _tfidf_wrapper(func=build_tfidf_dataset,
+                          inpath=inpath,
+                          outpath=outpath,
+                          topN=topN,
+                          term_colname=term_colname,
+                          exclude=exclude)
+
+def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
+    '''Run the tf-idf pipeline in _tfidf_wrapper using build_weekly_tfidf_dataset.'''
+    return _tfidf_wrapper(func=build_weekly_tfidf_dataset,
+                          inpath=inpath,
+                          outpath=outpath,
+                          topN=topN,
+                          term_colname=term_colname,
+                          exclude=exclude)
+
+def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+                  topN=25000):
+    '''
+    Build the author tf-idf dataset from the comment_authors parquet and write it to outpath.
+
+    Rows authored by '[deleted]' or 'AutoModerator' are excluded.
+    '''
+    inpath = "/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet"
+    excluded_authors = ['[deleted]','AutoModerator']
+    return tfidf(inpath, outpath, topN, 'author', excluded_authors)
+
+def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+                topN=25000):
+    '''
+    Build the term tf-idf dataset from the comment_terms parquet and write it to outpath.
+
+    No terms are excluded.
+    '''
+    inpath = "/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet"
+    excluded_terms = []
+    return tfidf(inpath, outpath, topN, 'term', excluded_terms)
+
+def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
+                  topN=25000):
+    '''
+    Build the weekly author tf-idf dataset from the comment_authors parquet and write it to outpath.
+
+    outpath : string
+        defaults to the tfidf_weekly directory, which is where the weekly similarity
+        script reads author tf-idf from; the non-weekly default under tfidf/ would be
+        overwritten if both commands shared one default.
+
+    topN : int
+        number of top subreddits (by comment rank) to include.
+
+    Rows authored by '[deleted]' or 'AutoModerator' are excluded.
+    '''
+
+    return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+                 outpath,
+                 topN,
+                 'author',
+                 ['[deleted]','AutoModerator']
+                 )
+
+def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+                topN=25000):
+    '''
+    Build the weekly term tf-idf dataset from the comment_terms parquet and write it to outpath.
+
+    outpath : string
+        defaults to the tfidf_weekly directory, which is where the weekly similarity
+        script reads term tf-idf from; the non-weekly default under tfidf/ would be
+        overwritten if both commands shared one default.
+
+    topN : int
+        number of top subreddits (by comment rank) to include.
+
+    No terms are excluded.
+    '''
+
+    return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                 outpath,
+                 topN,
+                 'term',
+                 []
+                 )
+
+
+if __name__ == "__main__":
+    # each key is a command-line subcommand dispatched by fire to the matching builder
+    fire.Fire(dict(authors=tfidf_authors,
+                   terms=tfidf_terms,
+                   authors_weekly=tfidf_authors_weekly,
+                   terms_weekly=tfidf_terms_weekly))
 
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
 from pyspark.sql import Window
-from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities
 
 spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
 df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
 
 win = Window.orderBy(f.col('n_comments').desc())
-df = df.withColumn('comments_rank',f.rank().over(win))
+df = df.withColumn('comments_rank', f.rank().over(win))
 
 df = df.toPandas()
 
 df = df.sort_values("n_comments")
 
-df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False)
+df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)
 
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import *
+
+
+#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
+    '''
+    Compute subreddit-by-subreddit similarities separately for every week and write them out.
+
+    tfidf_path : string
+        parquet with weekly tf-idf entries, read with spark.
+
+    outfile : string
+        output location handed to write_weekly_similarities for each week.
+
+    term_colname : string
+        name of the term column (e.g. 'author' or 'term').
+
+    min_df : number or None
+        passed through to prep_tfidf_entries_weekly.
+
+    included_subreddits : string or None
+        text file listing the subreddits to include (one per line); if None the
+        top topN subreddits from select_topN_subreddits are used.
+
+    topN : int
+        number of top subreddits to use when included_subreddits is None.
+    '''
+    spark = SparkSession.builder.getOrCreate()
+    print(outfile)
+    tfidf = spark.read.parquet(tfidf_path)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        # strip line endings (and skip blank lines): names carrying a trailing newline
+        # would never match the subreddit column in the isin() filter
+        with open(included_subreddits) as infile:
+            included_subreddits = {line.strip() for line in infile if line.strip()}
+
+    print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+
+    try:
+        tfidf = spark.read.parquet(tempdir.name)
+
+        # the ids can change each week.
+        subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
+        subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+        # shift the ids to start at 0 so they line up with matrix positions
+        subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+        # spark is no longer needed; the per-week matrices are loaded from the parquet files directly
+        spark.stop()
+
+        weeks = list(subreddit_names.week.drop_duplicates())
+        for week in weeks:
+            print(f"loading matrix: {week}")
+            mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
+            print('computing similarities')
+            sims = column_similarities(mat)
+            del mat
+
+            # names is already ordered by subreddit_id_new, matching the matrix columns
+            names = subreddit_names.loc[subreddit_names.week == week]
+            sims = pd.DataFrame(sims.todense())
+
+            sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
+            sims['subreddit'] = names.subreddit.values
+
+            write_weekly_similarities(outfile, sims, week, names)
+    finally:
+        # remove the temporary parquet explicitly rather than waiting for garbage collection
+        tempdir.cleanup()
+
+
+def author_cosine_similarities_weekly(outfile, min_df=None , included_subreddits=None, topN=500):
+    '''Weekly subreddit similarities based on authors, read from the weekly comment_authors tf-idf parquet.'''
+    author_tfidf = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet'
+    return cosine_similarities_weekly(tfidf_path=author_tfidf,
+                                      outfile=outfile,
+                                      term_colname='author',
+                                      min_df=min_df,
+                                      included_subreddits=included_subreddits,
+                                      topN=topN)
+
+def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
+    '''Weekly subreddit similarities based on terms, read from the weekly comment_terms tf-idf parquet.'''
+    term_tfidf = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet'
+    return cosine_similarities_weekly(tfidf_path=term_tfidf,
+                                      outfile=outfile,
+                                      term_colname='term',
+                                      min_df=min_df,
+                                      included_subreddits=included_subreddits,
+                                      topN=topN)
+
+if __name__ == "__main__":
+    # 'author' and 'term' are the command-line subcommands dispatched by fire
+    fire.Fire(dict(author=author_cosine_similarities_weekly,
+                   term=term_cosine_similarities_weekly))
 
+++ /dev/null
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
-import scipy
-# outfile='test_similarities_500.feather';
-# min_df = None;
-# included_subreddits=None; topN=100; exclude_phrases=True;
-
-def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
-    spark = SparkSession.builder.getOrCreate()
-    conf = spark.sparkContext.getConf()
-    print(outfile)
-    print(exclude_phrases)
-
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
-
-    if included_subreddits is None:
-        rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-        included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
-
-    else:
-        included_subreddits = set(open(included_subreddits))
-
-    if exclude_phrases == True:
-        tfidf = tfidf.filter(~f.col(term).contains("_"))
-
-    print("creating temporary parquet with matrix indicies")
-    tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
-    tfidf = spark.read.parquet(tempdir.name)
-    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-    spark.stop()
-
-    print("loading matrix")
-    mat = read_tfidf_matrix(tempdir.name,'term')
-    print('computing similarities')
-    sims = column_similarities(mat)
-    del mat
-    
-    sims = pd.DataFrame(sims.todense())
-    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
-    sims['subreddit'] = subreddit_names.subreddit.values
-
-    p = Path(outfile)
-
-    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-    sims.to_feather(outfile)
-    tempdir.cleanup()
-    path = "term_tfidf_entriesaukjy5gv.parquet"
-    
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
-#     '''
-#     Compute similarities between subreddits based on tfi-idf vectors of comment texts 
-    
-#     included_subreddits : string
-#         Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-#     similarity_threshold : double (default = 0)
-#         set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-#     min_df : int (default = 0.1 * (number of included_subreddits)
-#          exclude terms that appear in fewer than this number of documents.
-
-#     outfile: string
-#          where to output csv and feather outputs
-# '''
-
-#     print(outfile)
-#     print(exclude_phrases)
-
-#     tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
-
-#     if included_subreddits is None:
-#         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-#         included_subreddits = {s.strip('\n') for s in included_subreddits}
-
-#     else:
-#         included_subreddits = set(open(included_subreddits))
-
-#     if exclude_phrases == True:
-#         tfidf = tfidf.filter(~f.col(term).contains("_"))
-
-#     sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
-
-#     p = Path(outfile)
-
-#     output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-#     output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-#     output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-#     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-    
-#     #instead of toLocalMatrix() why not read as entries and put strait into numpy
-#     sim_entries = pd.read_parquet(output_parquet)
-
-#     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-#     spark.stop()
-#     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-#     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-#     df = df.set_index('subreddit_id_new')
-
-#     similarities = sim_entries.join(df, on='i')
-#     similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-#     similarities = similarities.join(df, on='j')
-#     similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-#     similarities.to_feather(output_feather)
-#     similarities.to_csv(output_csv)
-#     return similarities
-    
-if __name__ == '__main__':
-    fire.Fire(term_cosine_similarities)
 
+++ /dev/null
-from pyspark.sql import SparkSession
-from similarities_helper import build_tfidf_dataset
-import pandas as pd
-
-spark = SparkSession.builder.getOrCreate()
-
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
-
-include_subs = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-
-#include_subs = set(include_subs.loc[include_subs.comments_rank < 300]['subreddit'])
-
-# remove [deleted] and AutoModerator (TODO remove other bots)
-df = df.filter(df.author != '[deleted]')
-df = df.filter(df.author != 'AutoModerator')
-
-df = build_tfidf_dataset(df, include_subs, 'author')
-
-df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
-
-spark.stop()
 
+++ /dev/null
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-from similarities_helper import build_tfidf_dataset
-
-## TODO:need to exclude automoderator / bot posts.
-## TODO:need to exclude better handle hyperlinks. 
-
-spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp")
-
-include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
-include_subs = {s.strip('\n') for s in include_subs}
-
-df = build_tfidf_dataset(df, include_subs, 'term')
-
-df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
-spark.stop()