+++ /dev/null
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
-
-spark = SparkSession.builder.getOrCreate()
-conf = spark.sparkContext.getConf()
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
- '''
- Compute similarities between subreddits based on tfi-idf vectors of author comments
-
- included_subreddits : string
- Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
- similarity_threshold : double (default = 0)
- set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
- min_df : int (default = 0.1 * (number of included_subreddits)
- exclude terms that appear in fewer than this number of documents.
-
- outfile: string
- where to output csv and feather outputs
-'''
-
- spark = SparkSession.builder.getOrCreate()
- conf = spark.sparkContext.getConf()
- print(outfile)
-
- tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
-
- if included_subreddits is None:
- rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
- included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
-
- else:
- included_subreddits = set(open(included_subreddits))
-
- print("creating temporary parquet with matrix indicies")
- tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
- tfidf = spark.read.parquet(tempdir.name)
- subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
- subreddit_names = subreddit_names.sort_values("subreddit_id_new")
- subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
- spark.stop()
-
- print("loading matrix")
- mat = read_tfidf_matrix(tempdir.name,'author')
- print('computing similarities')
- sims = column_similarities(mat)
- del mat
-
- sims = pd.DataFrame(sims.todense())
- sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
- sims['subreddit'] = subreddit_names.subreddit.values
-
- p = Path(outfile)
-
- output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
- output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
- output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
- sims.to_feather(outfile)
- tempdir.cleanup()
-
- # print(outfile)
-
- # tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
-
- # if included_subreddits is None:
- # included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
- # included_subreddits = {s.strip('\n') for s in included_subreddits}
-
- # else:
- # included_subreddits = set(open(included_subreddits))
-
- # sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold)
-
- # p = Path(outfile)
-
- # output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
- # output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
- # output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
- # sim_dist = sim_dist.entries.toDF()
-
- # sim_dist = sim_dist.repartition(1)
- # sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-
-
-
- # #instead of toLocalMatrix() why not read as entries and put strait into numpy
- # sim_entries = pd.read_parquet(output_parquet)
-
- # df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-
- # spark.stop()
- # df['subreddit_id_new'] = df['subreddit_id_new'] - 1
- # df = df.sort_values('subreddit_id_new').reset_index(drop=True)
- # df = df.set_index('subreddit_id_new')
-
- # similarities = sim_entries.join(df, on='i')
- # similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
- # similarities = similarities.join(df, on='j')
- # similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
- # similarities.to_feather(output_feather)
- # similarities.to_csv(output_csv)
- # return similarities
-
-if __name__ == '__main__':
- fire.Fire(author_cosine_similarities)
preference = np.quantile(mat,preference_quantile)
+ print("data loaded")
+
clustering = AffinityPropagation(damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
--- /dev/null
+#!/usr/bin/bash
+start_spark_cluster.sh
+spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
+stop-all.sh
--- /dev/null
+#!/bin/bash
+## parallel_sql_job.sh
+#SBATCH --job-name=tf_subreddit_comments
+## Allocation Definition
+#SBATCH --account=comdata-ckpt
+#SBATCH --partition=ckpt
+## Resources
+## Nodes. This should always be 1 for parallel-sql.
+#SBATCH --nodes=1
+## Walltime (12 hours)
+#SBATCH --time=12:00:00
+## Memory per node
+#SBATCH --mem=32G
+#SBATCH --cpus-per-task=4
+#SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
+module load parallel_sql
+echo $(which perl)
+conda list pyarrow
+which python3
+#Put here commands to load other modules (e.g. matlab etc.)
+#The command below has parallel_sql pull tasks from the database and run
+#them on the node in parallel; with --jobs 4, four tasks run at a time.
+parallel-sql --sql -a parallel --exit-on-term --jobs 4
import fire
from collections import Counter
import os
-import datetime
import re
from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
from nltk.corpus import stopwords
ngram_output = partition.replace("parquet","txt")
if mwe_pass == 'first':
- if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
- os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+ if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
+ os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
if mwe_pass != 'first':
- mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather')
+ mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
mwe_phrases = list(mwe_dataset.phrase)
mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
new_sentence.append(new_token)
return new_sentence
-
stopWords = set(stopwords.words('english'))
# we follow the approach described in datta, phelan, adar 2017
for sentence in sentences:
if random() <= 0.1:
grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
- with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
+ with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
for ng in grams:
gram_file.write(' '.join(ng) + '\n')
for token in sentence:
outchunksize = 10000
- with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+ with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
while True:
--- /dev/null
+from pyspark.sql import SparkSession
+from similarities_helper import build_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+df = df.filter(df.author != '[deleted]')
+df = df.filter(df.author != 'AutoModerator')
+
+df = build_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+
+## TODO: need to exclude automoderator / bot posts.
+## TODO: need to better handle hyperlinks.
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+# df = df.filter(df.author != '[deleted]')
+# df = df.filter(df.author != 'AutoModerator')
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'term')
+
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
+spark.stop()
+
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import *
+
+#tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/subreddit_terms.parquet')
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
+ spark = SparkSession.builder.getOrCreate()
+ conf = spark.sparkContext.getConf()
+ print(outfile)
+ tfidf = spark.read.parquet(tfidf_path)
+
+ if included_subreddits is None:
+ included_subreddits = select_topN_subreddits(topN)
+
+ else:
+ included_subreddits = set(open(included_subreddits))
+
+ print("creating temporary parquet with matrix indicies")
+ tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+
+ tfidf = spark.read.parquet(tempdir.name)
+
+ # the ids can change each week.
+ subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+ spark.stop()
+
+ weeks = list(subreddit_names.week.drop_duplicates())
+ for week in weeks:
+ print("loading matrix")
+ mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
+ print('computing similarities')
+ sims = column_similarities(mat)
+ del mat
+
+        names = subreddit_names.loc[subreddit_names.week == week]
+        sims = pd.DataFrame(sims.todense())
+
+        sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
+        sims['subreddit'] = names.subreddit.values
+        write_weekly_similarities(outfile, sims, week, names)
+
+
+
+def cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500):
+ '''
+    Compute similarities between subreddits based on tf-idf vectors of author comments.
+
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line). If None, use the top topN subreddits.
+
+    min_df : int (default = 0.1 * number of included subreddits)
+        Exclude terms that appear in fewer than this number of documents.
+
+    outfile : string
+        Where to output csv and feather files.
+'''
+
+ spark = SparkSession.builder.getOrCreate()
+ conf = spark.sparkContext.getConf()
+ print(outfile)
+
+ tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet')
+
+ if included_subreddits is None:
+ included_subreddits = select_topN_subreddits(topN)
+
+ else:
+ included_subreddits = set(open(included_subreddits))
+
+ print("creating temporary parquet with matrix indicies")
+ tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
+ tfidf = spark.read.parquet(tempdir.name)
+ subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+ spark.stop()
+
+ print("loading matrix")
+ mat = read_tfidf_matrix(tempdir.name,'author')
+ print('computing similarities')
+ sims = column_similarities(mat)
+ del mat
+
+ sims = pd.DataFrame(sims.todense())
+ sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+ sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+
+    sims.to_feather(output_feather)
+    sims.to_csv(output_csv)
+ tempdir.cleanup()
+
+if __name__ == '__main__':
+    fire.Fire(cosine_similarities)
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, column_similarities, select_topN_subreddits
+import scipy
+
+# outfile='test_similarities_500.feather';
+# min_df = None;
+# included_subreddits=None; topN=100; exclude_phrases=True;
+def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
+ spark = SparkSession.builder.getOrCreate()
+ conf = spark.sparkContext.getConf()
+ print(outfile)
+ print(exclude_phrases)
+
+ tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_terms.parquet')
+
+ if included_subreddits is None:
+ included_subreddits = select_topN_subreddits(topN)
+ else:
+ included_subreddits = set(open(included_subreddits))
+
+    if exclude_phrases:
+        tfidf = tfidf.filter(~f.col('term').contains("_"))
+
+ print("creating temporary parquet with matrix indicies")
+ tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
+ tfidf = spark.read.parquet(tempdir.name)
+ subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+ spark.stop()
+
+ print("loading matrix")
+ mat = read_tfidf_matrix(tempdir.name,'term')
+ print('computing similarities')
+ sims = column_similarities(mat)
+ del mat
+
+ sims = pd.DataFrame(sims.todense())
+ sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+ sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+
+    sims.to_feather(output_feather)
+    sims.to_csv(output_csv)
+ tempdir.cleanup()
+
+if __name__ == '__main__':
+ fire.Fire(term_cosine_similarities)
--- /dev/null
+from pyspark.sql import SparkSession
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+df = df.filter(df.author != '[deleted]')
+df = df.filter(df.author != 'AutoModerator')
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', mode='overwrite', compression='snappy')
+
+spark.stop()
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_tfidf_dataset
+import pandas as pd
+
+## TODO: need to exclude automoderator / bot posts.
+## TODO: need to better handle hyperlinks.
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+df = build_tfidf_dataset(df, include_subs, 'term')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_terms.parquet',mode='overwrite',compression='snappy')
+spark.stop()
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+import pandas as pd
+import fire
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, column_similarities, select_topN_subreddits
+
+
+def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+ spark = SparkSession.builder.getOrCreate()
+ conf = spark.sparkContext.getConf()
+ print(outfile)
+ print(exclude_phrases)
+
+ tfidf = spark.read.parquet(infile)
+
+ if included_subreddits is None:
+ included_subreddits = select_topN_subreddits(topN)
+ else:
+ included_subreddits = set(open(included_subreddits))
+
+    if exclude_phrases:
+ tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+
+ print("creating temporary parquet with matrix indicies")
+ tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
+ tfidf = spark.read.parquet(tempdir.name)
+ subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+ spark.stop()
+
+ print("loading matrix")
+ mat = read_tfidf_matrix(tempdir.name, term_colname)
+ print('computing similarities')
+ sims = column_similarities(mat)
+ del mat
+
+ sims = pd.DataFrame(sims.todense())
+ sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+ sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+
+    sims.to_feather(output_feather)
+    sims.to_csv(output_csv)
+ tempdir.cleanup()
+
+def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+ return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+ 'term',
+ outfile,
+ min_df,
+ included_subreddits,
+ topN,
+ exclude_phrases)
+
+def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
+ return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+ 'author',
+ outfile,
+ min_df,
+ included_subreddits,
+ topN,
+ exclude_phrases=False)
+
+if __name__ == "__main__":
+ fire.Fire({'term':term_cosine_similarities,
+ 'author':author_cosine_similarities})
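+
+# example invocation (matches the Makefile rule and run script below):
+#   python3 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet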
+
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
+    spark = SparkSession.builder.getOrCreate()
+    df = spark.read.parquet(inpath)
+
+    # remove excluded terms/authors, e.g. [deleted] and AutoModerator
+    df = df.filter(~ f.col(term_colname).isin(exclude))
+
+    include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+    include_subs = set(include_subs.loc[include_subs.comments_rank <= topN]['subreddit'])
+
+    df = build_weekly_tfidf_dataset(df, include_subs, term_colname)
+
+    df.write.parquet(outpath, mode='overwrite', compression='snappy')
+    spark.stop()
+
+if __name__ == "__main__":
+    tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                 '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+                 25000,
+                 'term',
+                 [])
--- /dev/null
+/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
+ start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
--- /dev/null
+#!/usr/bin/bash
+start_spark_cluster.sh
+spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
+stop-all.sh
from scipy.sparse import csr_matrix
import pandas as pd
import numpy as np
+import pathlib
class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
+def read_tfidf_matrix_weekly(path, term_colname, week):
+ term = term_colname
+ term_id = term + '_id'
+ term_id_new = term + '_id_new'
+
+ dataset = ds.dataset(path,format='parquet')
+ entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new],filter=ds.field('week')==week).to_pandas()
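+    # rows index terms and columns index subreddits; the ids assigned upstream
+    # are 1-based, hence the -1 below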
+ return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
+
+def write_weekly_similarities(path, sims, week, names):
+ sims['week'] = week
+ p = pathlib.Path(path)
+ if not p.is_dir():
+ p.mkdir()
+
+ # reformat as a pairwise list
+ sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
+ sims.to_parquet(p / week.isoformat())
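+
+# a minimal sketch of reading the weekly similarities back, assuming pyarrow
+# treats the per-week files written above as one parquet dataset:
+# all_sims = ds.dataset(path, format='parquet').to_table().to_pandas()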
+
+
+
def read_tfidf_matrix(path,term_colname):
term = term_colname
term_id = term + '_id'
return(sims)
+def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
+ term = term_colname
+ term_id = term + '_id'
+ term_id_new = term + '_id_new'
+
+ if min_df is None:
+ min_df = 0.1 * len(included_subreddits)
+
+ tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
+
+ # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
+ sub_ids = tfidf.select(['subreddit_id','week']).distinct()
+ sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
+ tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
+
+    # only use terms in at least min_df included subreddits in a given week
+    new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
+    new_count = new_count.filter(f.col('new_count') >= min_df)
+    tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
+
+ # reset the term ids
+ term_ids = tfidf.select([term_id,'week']).distinct()
+ term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
+ tfidf = tfidf.join(term_ids,[term_id,'week'])
+
+ tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
+ tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
+
+    tempdir = TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
+
+ tfidf = tfidf.repartition('week')
+
+ tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
+ return(tempdir)
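+
+# NB: callers own the returned TemporaryDirectory; keep a reference while
+# reading tempdir.name and call tempdir.cleanup() when finished, as the
+# similarity scripts do.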
+
+
def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
term = term_colname
term_id = term + '_id'
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-# new_count = new_count.filter(f.col('new_count') >= min_df)
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
- # tfidf = tfidf.withColumnRenamed("idf","idf_old")
- # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return tempdir
-def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
+
+# try computing cosine similarities using spark
+def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-# new_count = new_count.filter(f.col('new_count') >= min_df)
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
- # tfidf = tfidf.withColumnRenamed("idf","idf_old")
- # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
# step 1 make an rdd of entires
# sorted by (dense) spark subreddit id
- # entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd
-
n_partitions = int(len(included_subreddits)*2 / 5)
entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
df = df.join(idf, on=[term_id, term])
# agg terms by subreddit to make sparse tf/df vectors
-
if tf_family == tf_weight.MaxTF:
df = df.withColumn("tf_idf", df.relative_tf * df.idf)
else: # tf_fam = tf_weight.Norm05
return df
-
+def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv"):
+ rankdf = pd.read_csv(path)
+ included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
+ return included_subreddits
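+
+# example: the set of the top 25000 subreddits by comment count, as used by
+# the tfidf scripts:
+# include_subs = select_topN_subreddits(25000)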
--- /dev/null
+import fire
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as f
+from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
+
+
+def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude):
+ spark = SparkSession.builder.getOrCreate()
+
+ df = spark.read.parquet(inpath)
+
+ df = df.filter(~ f.col(term_colname).isin(exclude))
+
+ include_subs = select_topN_subreddits(topN)
+
+ df = func(df, include_subs, term_colname)
+
+ df.write.parquet(outpath,mode='overwrite',compression='snappy')
+
+ spark.stop()
+
+def tfidf(inpath, outpath, topN, term_colname, exclude):
+ return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude)
+
+def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
+ return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude)
+
+def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+ topN=25000):
+
+ return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+ outpath,
+ topN,
+ 'author',
+ ['[deleted]','AutoModerator']
+ )
+
+def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+ topN=25000):
+
+ return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+ outpath,
+ topN,
+ 'term',
+ []
+ )
+
+def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
+ topN=25000):
+
+ return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+ outpath,
+ topN,
+ 'author',
+ ['[deleted]','AutoModerator']
+ )
+
+def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+ topN=25000):
+
+ return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+ outpath,
+ topN,
+ 'term',
+ []
+ )
+
+
+if __name__ == "__main__":
+ fire.Fire({'authors':tfidf_authors,
+ 'terms':tfidf_terms,
+ 'authors_weekly':tfidf_authors_weekly,
+ 'terms_weekly':tfidf_terms_weekly})
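+
+# example invocations via fire (assuming this file is saved as tfidf.py):
+#   python3 tfidf.py terms --topN=25000
+#   python3 tfidf.py authors_weekly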
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
-from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
win = Window.orderBy(f.col('n_comments').desc())
-df = df.withColumn('comments_rank',f.rank().over(win))
+df = df.withColumn('comments_rank', f.rank().over(win))
df = df.toPandas()
df = df.sort_values("n_comments")
-df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False)
+df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)
--- /dev/null
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import *
+
+
+#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
+ spark = SparkSession.builder.getOrCreate()
+ conf = spark.sparkContext.getConf()
+ print(outfile)
+ tfidf = spark.read.parquet(tfidf_path)
+
+ if included_subreddits is None:
+ included_subreddits = select_topN_subreddits(topN)
+ else:
+ included_subreddits = set(open(included_subreddits))
+
+ print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
+
+ print("creating temporary parquet with matrix indicies")
+ tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+
+ tfidf = spark.read.parquet(tempdir.name)
+
+ # the ids can change each week.
+ subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
+ subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+ subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+ spark.stop()
+
+ weeks = list(subreddit_names.week.drop_duplicates())
+ for week in weeks:
+ print(f"loading matrix: {week}")
+ mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
+ print('computing similarities')
+ sims = column_similarities(mat)
+ del mat
+
+ names = subreddit_names.loc[subreddit_names.week == week]
+ sims = pd.DataFrame(sims.todense())
+
+ sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
+ sims['subreddit'] = names.subreddit.values
+
+ write_weekly_similarities(outfile, sims, week, names)
+
+
+def author_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
+ return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
+ outfile,
+ 'author',
+ min_df,
+ included_subreddits,
+ topN)
+
+def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
+ return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+ outfile,
+ 'term',
+ min_df,
+ included_subreddits,
+ topN)
+
+if __name__ == "__main__":
+ fire.Fire({'author':author_cosine_similarities_weekly,
+ 'term':term_cosine_similarities_weekly})
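+
+# example invocation (matches the run script above):
+#   python3 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000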
+++ /dev/null
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
-import scipy
-# outfile='test_similarities_500.feather';
-# min_df = None;
-# included_subreddits=None; topN=100; exclude_phrases=True;
-
-def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
- spark = SparkSession.builder.getOrCreate()
- conf = spark.sparkContext.getConf()
- print(outfile)
- print(exclude_phrases)
-
- tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
-
- if included_subreddits is None:
- rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
- included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
-
- else:
- included_subreddits = set(open(included_subreddits))
-
- if exclude_phrases == True:
- tfidf = tfidf.filter(~f.col(term).contains("_"))
-
- print("creating temporary parquet with matrix indicies")
- tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
- tfidf = spark.read.parquet(tempdir.name)
- subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
- subreddit_names = subreddit_names.sort_values("subreddit_id_new")
- subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
- spark.stop()
-
- print("loading matrix")
- mat = read_tfidf_matrix(tempdir.name,'term')
- print('computing similarities')
- sims = column_similarities(mat)
- del mat
-
- sims = pd.DataFrame(sims.todense())
- sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
- sims['subreddit'] = subreddit_names.subreddit.values
-
- p = Path(outfile)
-
- output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
- output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
- output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
- sims.to_feather(outfile)
- tempdir.cleanup()
- path = "term_tfidf_entriesaukjy5gv.parquet"
-
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
-# '''
-# Compute similarities between subreddits based on tfi-idf vectors of comment texts
-
-# included_subreddits : string
-# Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-# similarity_threshold : double (default = 0)
-# set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-# min_df : int (default = 0.1 * (number of included_subreddits)
-# exclude terms that appear in fewer than this number of documents.
-
-# outfile: string
-# where to output csv and feather outputs
-# '''
-
-# print(outfile)
-# print(exclude_phrases)
-
-# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
-
-# if included_subreddits is None:
-# included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-# included_subreddits = {s.strip('\n') for s in included_subreddits}
-
-# else:
-# included_subreddits = set(open(included_subreddits))
-
-# if exclude_phrases == True:
-# tfidf = tfidf.filter(~f.col(term).contains("_"))
-
-# sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
-
-# p = Path(outfile)
-
-# output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
-# output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
-# output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-# sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-
-# #instead of toLocalMatrix() why not read as entries and put strait into numpy
-# sim_entries = pd.read_parquet(output_parquet)
-
-# df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-# spark.stop()
-# df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-# df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-# df = df.set_index('subreddit_id_new')
-
-# similarities = sim_entries.join(df, on='i')
-# similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-# similarities = similarities.join(df, on='j')
-# similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-# similarities.to_feather(output_feather)
-# similarities.to_csv(output_csv)
-# return similarities
-
-if __name__ == '__main__':
- fire.Fire(term_cosine_similarities)
+++ /dev/null
-from pyspark.sql import SparkSession
-from similarities_helper import build_tfidf_dataset
-import pandas as pd
-
-spark = SparkSession.builder.getOrCreate()
-
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
-
-include_subs = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-
-#include_subs = set(include_subs.loc[include_subs.comments_rank < 300]['subreddit'])
-
-# remove [deleted] and AutoModerator (TODO remove other bots)
-df = df.filter(df.author != '[deleted]')
-df = df.filter(df.author != 'AutoModerator')
-
-df = build_tfidf_dataset(df, include_subs, 'author')
-
-df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
-
-spark.stop()
+++ /dev/null
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-from similarities_helper import build_tfidf_dataset
-
-## TODO:need to exclude automoderator / bot posts.
-## TODO:need to exclude better handle hyperlinks.
-
-spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp")
-
-include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
-include_subs = {s.strip('\n') for s in include_subs}
-
-df = build_tfidf_dataset(df, include_subs, 'term')
-
-df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
-spark.stop()