From e6294b5b90135a5163441c8dc62252dd6a188412 Mon Sep 17 00:00:00 2001 From: Nate E TeBlunthuis Date: Tue, 8 Dec 2020 17:32:20 -0800 Subject: [PATCH] Refactor and reorganze. --- author_cosine_similarity.py | 119 ---------------- clustering.py => clustering/clustering.py | 2 + fit_tsne.py => clustering/fit_tsne.py | 0 .../checkpoint_parallelsql.sbatch | 0 .../comments_2_parquet.sh | 0 .../comments_2_parquet_part1.py | 0 .../comments_2_parquet_part2.py | 0 helper.py => datasets/helper.py | 0 datasets/job_script.sh | 4 + .../submissions_2_parquet.sh | 0 .../submissions_2_parquet_part1.py | 0 .../submissions_2_parquet_part2.py | 0 .../check_comments_shas.py | 0 .../check_submission_shas.py | 0 .../pull_pushshift_comments.sh | 0 .../pull_pushshift_submissions.sh | 0 ngrams/#ngrams_helper.py# | 0 ngrams/checkpoint_parallelsql.sbatch | 26 ++++ run_tf_jobs.sh => ngrams/run_tf_jobs.sh | 0 .../sort_tf_comments.py | 0 tf_comments.py => ngrams/tf_comments.py | 12 +- .../top_comment_phrases.py | 0 old/#tfidf_authors.py# | 21 +++ old/#tfidf_comments_weekly.py# | 27 ++++ old/.#tfidf_authors.py | 1 + old/.#tfidf_comments_weekly.py | 1 + old/author_cosine_similarity.py | 106 +++++++++++++++ old/term_cosine_similarity.py | 61 +++++++++ old/tfidf_authors.py | 21 +++ old/tfidf_authors_weekly.py | 21 +++ old/tfidf_comments.py | 18 +++ old/tfidf_comments_weekly.py | 27 ++++ similarities/#cosine_similarities.py# | 73 ++++++++++ similarities/#tfidf_weekly.py# | 24 ++++ similarities/.#cosine_similarities.py | 1 + similarities/.#tfidf_weekly.py | 1 + similarities/Makefile | 2 + .../similarities_helper.cpython-37.pyc | Bin 0 -> 6845 bytes similarities/cosine_similarities.py | 73 ++++++++++ similarities/job_script.sh | 4 + .../similarities_helper.py | 75 +++++++++-- similarities/tfidf.py | 73 ++++++++++ .../top_subreddits_by_comments.py | 12 +- similarities/weekly_cosine_similarities.py | 73 ++++++++++ term_cosine_similarity.py | 127 ------------------ tfidf_authors.py | 21 --- tfidf_comments.py | 18 --- 47 files changed, 731 insertions(+), 313 deletions(-) delete mode 100644 author_cosine_similarity.py rename clustering.py => clustering/clustering.py (98%) rename fit_tsne.py => clustering/fit_tsne.py (100%) rename checkpoint_parallelsql.sbatch => datasets/checkpoint_parallelsql.sbatch (100%) rename comments_2_parquet.sh => datasets/comments_2_parquet.sh (100%) rename comments_2_parquet_part1.py => datasets/comments_2_parquet_part1.py (100%) rename comments_2_parquet_part2.py => datasets/comments_2_parquet_part2.py (100%) rename helper.py => datasets/helper.py (100%) create mode 100755 datasets/job_script.sh rename submissions_2_parquet.sh => datasets/submissions_2_parquet.sh (100%) rename submissions_2_parquet_part1.py => datasets/submissions_2_parquet_part1.py (100%) rename submissions_2_parquet_part2.py => datasets/submissions_2_parquet_part2.py (100%) rename check_comments_shas.py => dumps/check_comments_shas.py (100%) rename check_submission_shas.py => dumps/check_submission_shas.py (100%) rename pull_pushshift_comments.sh => dumps/pull_pushshift_comments.sh (100%) rename pull_pushshift_submissions.sh => dumps/pull_pushshift_submissions.sh (100%) create mode 100644 ngrams/#ngrams_helper.py# create mode 100644 ngrams/checkpoint_parallelsql.sbatch rename run_tf_jobs.sh => ngrams/run_tf_jobs.sh (100%) rename sort_tf_comments.py => ngrams/sort_tf_comments.py (100%) rename tf_comments.py => ngrams/tf_comments.py (89%) rename top_comment_phrases.py => ngrams/top_comment_phrases.py (100%) create mode 100644 old/#tfidf_authors.py# create mode 100644 old/#tfidf_comments_weekly.py# create mode 120000 old/.#tfidf_authors.py create mode 120000 old/.#tfidf_comments_weekly.py create mode 100644 old/author_cosine_similarity.py create mode 100644 old/term_cosine_similarity.py create mode 100644 old/tfidf_authors.py create mode 100644 old/tfidf_authors_weekly.py create mode 100644 old/tfidf_comments.py create mode 100644 old/tfidf_comments_weekly.py create mode 100644 similarities/#cosine_similarities.py# create mode 100644 similarities/#tfidf_weekly.py# create mode 120000 similarities/.#cosine_similarities.py create mode 120000 similarities/.#tfidf_weekly.py create mode 100644 similarities/Makefile create mode 100644 similarities/__pycache__/similarities_helper.cpython-37.pyc create mode 100644 similarities/cosine_similarities.py create mode 100755 similarities/job_script.sh rename similarities_helper.py => similarities/similarities_helper.py (74%) create mode 100644 similarities/tfidf.py rename top_subreddits_by_comments.py => similarities/top_subreddits_by_comments.py (56%) create mode 100644 similarities/weekly_cosine_similarities.py delete mode 100644 term_cosine_similarity.py delete mode 100644 tfidf_authors.py delete mode 100644 tfidf_comments.py diff --git a/author_cosine_similarity.py b/author_cosine_similarity.py deleted file mode 100644 index 7ae708b..0000000 --- a/author_cosine_similarity.py +++ /dev/null @@ -1,119 +0,0 @@ -from pyspark.sql import functions as f -from pyspark.sql import SparkSession -from pyspark.sql import Window -import numpy as np -import pyarrow -import pandas as pd -import fire -from itertools import islice -from pathlib import Path -from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities - -spark = SparkSession.builder.getOrCreate() -conf = spark.sparkContext.getConf() - -# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0; -def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500): - ''' - Compute similarities between subreddits based on tfi-idf vectors of author comments - - included_subreddits : string - Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits - - similarity_threshold : double (default = 0) - set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm -https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm. - - min_df : int (default = 0.1 * (number of included_subreddits) - exclude terms that appear in fewer than this number of documents. - - outfile: string - where to output csv and feather outputs -''' - - spark = SparkSession.builder.getOrCreate() - conf = spark.sparkContext.getConf() - print(outfile) - - tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet') - - if included_subreddits is None: - rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") - included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values) - - else: - included_subreddits = set(open(included_subreddits)) - - print("creating temporary parquet with matrix indicies") - tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits) - tfidf = spark.read.parquet(tempdir.name) - subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() - subreddit_names = subreddit_names.sort_values("subreddit_id_new") - subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 - spark.stop() - - print("loading matrix") - mat = read_tfidf_matrix(tempdir.name,'author') - print('computing similarities') - sims = column_similarities(mat) - del mat - - sims = pd.DataFrame(sims.todense()) - sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) - sims['subreddit'] = subreddit_names.subreddit.values - - p = Path(outfile) - - output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) - output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) - output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - - sims.to_feather(outfile) - tempdir.cleanup() - - # print(outfile) - - # tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet') - - # if included_subreddits is None: - # included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) - # included_subreddits = {s.strip('\n') for s in included_subreddits} - - # else: - # included_subreddits = set(open(included_subreddits)) - - # sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold) - - # p = Path(outfile) - - # output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) - # output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) - # output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - # sim_dist = sim_dist.entries.toDF() - - # sim_dist = sim_dist.repartition(1) - # sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy') - - - - # #instead of toLocalMatrix() why not read as entries and put strait into numpy - # sim_entries = pd.read_parquet(output_parquet) - - # df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() - - # spark.stop() - # df['subreddit_id_new'] = df['subreddit_id_new'] - 1 - # df = df.sort_values('subreddit_id_new').reset_index(drop=True) - # df = df.set_index('subreddit_id_new') - - # similarities = sim_entries.join(df, on='i') - # similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) - # similarities = similarities.join(df, on='j') - # similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) - - # similarities.to_feather(output_feather) - # similarities.to_csv(output_csv) - # return similarities - -if __name__ == '__main__': - fire.Fire(author_cosine_similarities) diff --git a/clustering.py b/clustering/clustering.py similarity index 98% rename from clustering.py rename to clustering/clustering.py index 552d8ae..38af31c 100644 --- a/clustering.py +++ b/clustering/clustering.py @@ -16,6 +16,8 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv preference = np.quantile(mat,preference_quantile) + print("data loaded") + clustering = AffinityPropagation(damping=damping, max_iter=max_iter, convergence_iter=convergence_iter, diff --git a/fit_tsne.py b/clustering/fit_tsne.py similarity index 100% rename from fit_tsne.py rename to clustering/fit_tsne.py diff --git a/checkpoint_parallelsql.sbatch b/datasets/checkpoint_parallelsql.sbatch similarity index 100% rename from checkpoint_parallelsql.sbatch rename to datasets/checkpoint_parallelsql.sbatch diff --git a/comments_2_parquet.sh b/datasets/comments_2_parquet.sh similarity index 100% rename from comments_2_parquet.sh rename to datasets/comments_2_parquet.sh diff --git a/comments_2_parquet_part1.py b/datasets/comments_2_parquet_part1.py similarity index 100% rename from comments_2_parquet_part1.py rename to datasets/comments_2_parquet_part1.py diff --git a/comments_2_parquet_part2.py b/datasets/comments_2_parquet_part2.py similarity index 100% rename from comments_2_parquet_part2.py rename to datasets/comments_2_parquet_part2.py diff --git a/helper.py b/datasets/helper.py similarity index 100% rename from helper.py rename to datasets/helper.py diff --git a/datasets/job_script.sh b/datasets/job_script.sh new file mode 100755 index 0000000..d90b618 --- /dev/null +++ b/datasets/job_script.sh @@ -0,0 +1,4 @@ +#!/usr/bin/bash +start_spark_cluster.sh +spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000 +stop-all.sh diff --git a/submissions_2_parquet.sh b/datasets/submissions_2_parquet.sh similarity index 100% rename from submissions_2_parquet.sh rename to datasets/submissions_2_parquet.sh diff --git a/submissions_2_parquet_part1.py b/datasets/submissions_2_parquet_part1.py similarity index 100% rename from submissions_2_parquet_part1.py rename to datasets/submissions_2_parquet_part1.py diff --git a/submissions_2_parquet_part2.py b/datasets/submissions_2_parquet_part2.py similarity index 100% rename from submissions_2_parquet_part2.py rename to datasets/submissions_2_parquet_part2.py diff --git a/check_comments_shas.py b/dumps/check_comments_shas.py similarity index 100% rename from check_comments_shas.py rename to dumps/check_comments_shas.py diff --git a/check_submission_shas.py b/dumps/check_submission_shas.py similarity index 100% rename from check_submission_shas.py rename to dumps/check_submission_shas.py diff --git a/pull_pushshift_comments.sh b/dumps/pull_pushshift_comments.sh similarity index 100% rename from pull_pushshift_comments.sh rename to dumps/pull_pushshift_comments.sh diff --git a/pull_pushshift_submissions.sh b/dumps/pull_pushshift_submissions.sh similarity index 100% rename from pull_pushshift_submissions.sh rename to dumps/pull_pushshift_submissions.sh diff --git a/ngrams/#ngrams_helper.py# b/ngrams/#ngrams_helper.py# new file mode 100644 index 0000000..e69de29 diff --git a/ngrams/checkpoint_parallelsql.sbatch b/ngrams/checkpoint_parallelsql.sbatch new file mode 100644 index 0000000..dd61e65 --- /dev/null +++ b/ngrams/checkpoint_parallelsql.sbatch @@ -0,0 +1,26 @@ +#!/bin/bash +## parallel_sql_job.sh +#SBATCH --job-name=tf_subreddit_comments +## Allocation Definition +#SBATCH --account=comdata-ckpt +#SBATCH --partition=ckpt +## Resources +## Nodes. This should always be 1 for parallel-sql. +#SBATCH --nodes=1 +## Walltime (12 hours) +#SBATCH --time=12:00:00 +## Memory per node +#SBATCH --mem=32G +#SBATCH --cpus-per-task=4 +#SBATCH --ntasks=1 +#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit +source ./bin/activate +module load parallel_sql +echo $(which perl) +conda list pyarrow +which python3 +#Put here commands to load other modules (e.g. matlab etc.) +#Below command means that parallel_sql will get tasks from the database +#and run them on the node (in parallel). So a 16 core node will have +#16 tasks running at one time. +parallel-sql --sql -a parallel --exit-on-term --jobs 4 diff --git a/run_tf_jobs.sh b/ngrams/run_tf_jobs.sh similarity index 100% rename from run_tf_jobs.sh rename to ngrams/run_tf_jobs.sh diff --git a/sort_tf_comments.py b/ngrams/sort_tf_comments.py similarity index 100% rename from sort_tf_comments.py rename to ngrams/sort_tf_comments.py diff --git a/tf_comments.py b/ngrams/tf_comments.py similarity index 89% rename from tf_comments.py rename to ngrams/tf_comments.py index 526bac2..f86548a 100755 --- a/tf_comments.py +++ b/ngrams/tf_comments.py @@ -7,7 +7,6 @@ from itertools import groupby, islice, chain import fire from collections import Counter import os -import datetime import re from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize from nltk.corpus import stopwords @@ -31,8 +30,8 @@ def weekly_tf(partition, mwe_pass = 'first'): ngram_output = partition.replace("parquet","txt") if mwe_pass == 'first': - if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"): - os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}") + if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"): + os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}") batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author']) @@ -67,7 +66,7 @@ def weekly_tf(partition, mwe_pass = 'first'): subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week)) if mwe_pass != 'first': - mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather') + mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather') mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False) mwe_phrases = list(mwe_dataset.phrase) mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases] @@ -88,7 +87,6 @@ def weekly_tf(partition, mwe_pass = 'first'): new_sentence.append(new_token) return new_sentence - stopWords = set(stopwords.words('english')) # we follow the approach described in datta, phelan, adar 2017 @@ -121,7 +119,7 @@ def weekly_tf(partition, mwe_pass = 'first'): for sentence in sentences: if random() <= 0.1: grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4)))) - with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file: + with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file: for ng in grams: gram_file.write(' '.join(ng) + '\n') for token in sentence: @@ -156,7 +154,7 @@ def weekly_tf(partition, mwe_pass = 'first'): outchunksize = 10000 - with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer: + with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer: while True: diff --git a/top_comment_phrases.py b/ngrams/top_comment_phrases.py similarity index 100% rename from top_comment_phrases.py rename to ngrams/top_comment_phrases.py diff --git a/old/#tfidf_authors.py# b/old/#tfidf_authors.py# new file mode 100644 index 0000000..e92d8df --- /dev/null +++ b/old/#tfidf_authors.py# @@ -0,0 +1,21 @@ +from pyspark.sql import SparkSession +from similarities_helper import build_tfidf_dataset +import pandas as pd + +spark = SparkSession.builder.getOrCreate() + +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") + +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +# remove [deleted] and AutoModerator (TODO remove other bots) +df = df.filter(df.author != '[deleted]') +df = df.filter(df.author != 'AutoModerator') + +df = build_tfidf_dataset(df, include_subs, 'author') + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy') + +spark.stop() diff --git a/old/#tfidf_comments_weekly.py# b/old/#tfidf_comments_weekly.py# new file mode 100644 index 0000000..1f30184 --- /dev/null +++ b/old/#tfidf_comments_weekly.py# @@ -0,0 +1,27 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +from similarities_helper import build_weekly_tfidf_dataset +import pandas as pd + + +## TODO:need to exclude automoderator / bot posts. +## TODO:need to exclude better handle hyperlinks. + +spark = SparkSession.builder.getOrCreate() +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") + +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +# remove [deleted] and AutoModerator (TODO remove other bots) +# df = df.filter(df.author != '[deleted]') +# df = df.filter(df.author != 'AutoModerator') + +df = build_weekly_tfidf_dataset(df, include_subs, 'term') + + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy') +spark.stop() + diff --git a/old/.#tfidf_authors.py b/old/.#tfidf_authors.py new file mode 120000 index 0000000..8972575 --- /dev/null +++ b/old/.#tfidf_authors.py @@ -0,0 +1 @@ +nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/old/.#tfidf_comments_weekly.py b/old/.#tfidf_comments_weekly.py new file mode 120000 index 0000000..8972575 --- /dev/null +++ b/old/.#tfidf_comments_weekly.py @@ -0,0 +1 @@ +nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/old/author_cosine_similarity.py b/old/author_cosine_similarity.py new file mode 100644 index 0000000..5bd5405 --- /dev/null +++ b/old/author_cosine_similarity.py @@ -0,0 +1,106 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +import numpy as np +import pyarrow +import pandas as pd +import fire +from itertools import islice +from pathlib import Path +from similarities_helper import * + +#tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/subreddit_terms.parquet') +def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500): + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + tfidf = spark.read.parquet(tfidf_path) + + if included_subreddits is None: + included_subreddits = select_topN_subreddits(topN) + + else: + included_subreddits = set(open(included_subreddits)) + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits) + + tfidf = spark.read.parquet(tempdir.name) + + # the ids can change each week. + subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + weeks = list(subreddit_names.week.drop_duplicates()) + for week in weeks: + print("loading matrix") + mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week) + print('computing similarities') + sims = column_similarities(mat) + del mat + + names = subreddit_names.loc[subreddit_names.week==week] + + sims = sims.rename({i:sr for i, sr in enumerate(names.subreddit.values)},axis=1) + sims['subreddit'] = names.subreddit.values + write_weekly_similarities(outfile, sims, week) + + + +def cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500): + ''' + Compute similarities between subreddits based on tfi-idf vectors of author comments + + included_subreddits : string + Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits + + min_df : int (default = 0.1 * (number of included_subreddits) + exclude terms that appear in fewer than this number of documents. + + outfile: string + where to output csv and feather outputs +''' + + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + + tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet') + + if included_subreddits is None: + included_subreddits = select_topN_subreddits(topN) + + else: + included_subreddits = set(open(included_subreddits)) + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name,'author') + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) + sims['subreddit'] = subreddit_names.subreddit.values + + p = Path(outfile) + + output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) + + sims.to_feather(outfile) + tempdir.cleanup() + +if __name__ == '__main__': + fire.Fire(author_cosine_similarities) diff --git a/old/term_cosine_similarity.py b/old/term_cosine_similarity.py new file mode 100644 index 0000000..88ba71e --- /dev/null +++ b/old/term_cosine_similarity.py @@ -0,0 +1,61 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix +import numpy as np +import pyarrow +import pandas as pd +import fire +from itertools import islice +from pathlib import Path +from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, column_similarities, select_topN +import scipy + +# outfile='test_similarities_500.feather'; +# min_df = None; +# included_subreddits=None; topN=100; exclude_phrases=True; +def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False): + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + print(exclude_phrases) + + tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_terms.parquet') + + if included_subreddits is None: + included_subreddits = select_topN_subreddits(topN) + else: + included_subreddits = set(open(included_subreddits)) + + if exclude_phrases == True: + tfidf = tfidf.filter(~f.col(term).contains("_")) + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name,'term') + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) + sims['subreddit'] = subreddit_names.subreddit.values + + p = Path(outfile) + + output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) + + sims.to_feather(outfile) + tempdir.cleanup() + +if __name__ == '__main__': + fire.Fire(term_cosine_similarities) diff --git a/old/tfidf_authors.py b/old/tfidf_authors.py new file mode 100644 index 0000000..e92d8df --- /dev/null +++ b/old/tfidf_authors.py @@ -0,0 +1,21 @@ +from pyspark.sql import SparkSession +from similarities_helper import build_tfidf_dataset +import pandas as pd + +spark = SparkSession.builder.getOrCreate() + +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") + +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +# remove [deleted] and AutoModerator (TODO remove other bots) +df = df.filter(df.author != '[deleted]') +df = df.filter(df.author != 'AutoModerator') + +df = build_tfidf_dataset(df, include_subs, 'author') + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy') + +spark.stop() diff --git a/old/tfidf_authors_weekly.py b/old/tfidf_authors_weekly.py new file mode 100644 index 0000000..0073262 --- /dev/null +++ b/old/tfidf_authors_weekly.py @@ -0,0 +1,21 @@ +from pyspark.sql import SparkSession +from similarities_helper import build_weekly_tfidf_dataset +import pandas as pd + +spark = SparkSession.builder.getOrCreate() + +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") + +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +# remove [deleted] and AutoModerator (TODO remove other bots) +df = df.filter(df.author != '[deleted]') +df = df.filter(df.author != 'AutoModerator') + +df = build_weekly_tfidf_dataset(df, include_subs, 'author') + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', mode='overwrite', compression='snappy') + +spark.stop() diff --git a/old/tfidf_comments.py b/old/tfidf_comments.py new file mode 100644 index 0000000..ec3f6bf --- /dev/null +++ b/old/tfidf_comments.py @@ -0,0 +1,18 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +from similarities_helper import build_tfidf_dataset + +## TODO:need to exclude automoderator / bot posts. +## TODO:need to exclude better handle hyperlinks. + +spark = SparkSession.builder.getOrCreate() +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +df = build_tfidf_dataset(df, include_subs, 'term') + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/reddit_similarity/subreddit_terms.parquet',mode='overwrite',compression='snappy') +spark.stop() diff --git a/old/tfidf_comments_weekly.py b/old/tfidf_comments_weekly.py new file mode 100644 index 0000000..1f30184 --- /dev/null +++ b/old/tfidf_comments_weekly.py @@ -0,0 +1,27 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +from similarities_helper import build_weekly_tfidf_dataset +import pandas as pd + + +## TODO:need to exclude automoderator / bot posts. +## TODO:need to exclude better handle hyperlinks. + +spark = SparkSession.builder.getOrCreate() +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") + +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +# remove [deleted] and AutoModerator (TODO remove other bots) +# df = df.filter(df.author != '[deleted]') +# df = df.filter(df.author != 'AutoModerator') + +df = build_weekly_tfidf_dataset(df, include_subs, 'term') + + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy') +spark.stop() + diff --git a/similarities/#cosine_similarities.py# b/similarities/#cosine_similarities.py# new file mode 100644 index 0000000..ae080d5 --- /dev/null +++ b/similarities/#cosine_similarities.py# @@ -0,0 +1,73 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +import pandas as pd +import fire +from pathlib import Path +from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits + + +def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + print(exclude_phrases) + + tfidf = spark.read.parquet(infile) + + if included_subreddits is None: + included_subreddits = select_topN_subreddits(topN) + else: + included_subreddits = set(open(included_subreddits)) + + if exclude_phrases == True: + tfidf = tfidf.filter(~f.col(term_colname).contains("_")) + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name, term_colname) + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1) + sims['subreddit'] = subreddit_names.subreddit.values + + p = Path(outfile) + + output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) + + sims.to_feather(outfile) + tempdir.cleanup() + +def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): + return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', + 'term', + outfile, + min_df, + included_subreddits, + topN, + exclude_phrases) + +def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000): + return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', + 'author', + outfile, + min_df, + included_subreddits, + topN, + exclude_phrases=False) + +if __name__ == "__main__": + fire.Fire({'term':term_cosine_similarities, + 'author':author_cosine_similarities}) + diff --git a/similarities/#tfidf_weekly.py# b/similarities/#tfidf_weekly.py# new file mode 100644 index 0000000..8b0e8ff --- /dev/null +++ b/similarities/#tfidf_weekly.py# @@ -0,0 +1,24 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +from similarities_helper import build_weekly_tfidf_dataset +import pandas as pd + +def tfidf_weekly(inpath, outpath, topN, term_colname, exclude): + +spark = SparkSession.builder.getOrCreate() +df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet") + +include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv") + +include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit']) + +# remove [deleted] and AutoModerator (TODO remove other bots) +# df = df.filter(df.author != '[deleted]') +# df = df.filter(df.author != 'AutoModerator') + +df = build_weekly_tfidf_dataset(df, include_subs, 'term') + + +df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy') +spark.stop() diff --git a/similarities/.#cosine_similarities.py b/similarities/.#cosine_similarities.py new file mode 120000 index 0000000..8972575 --- /dev/null +++ b/similarities/.#cosine_similarities.py @@ -0,0 +1 @@ +nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/similarities/.#tfidf_weekly.py b/similarities/.#tfidf_weekly.py new file mode 120000 index 0000000..8972575 --- /dev/null +++ b/similarities/.#tfidf_weekly.py @@ -0,0 +1 @@ +nathante@n2347.hyak.local.31061:1602221800 \ No newline at end of file diff --git a/similarities/Makefile b/similarities/Makefile new file mode 100644 index 0000000..89a908f --- /dev/null +++ b/similarities/Makefile @@ -0,0 +1,2 @@ +/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet + start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet diff --git a/similarities/__pycache__/similarities_helper.cpython-37.pyc b/similarities/__pycache__/similarities_helper.cpython-37.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e5e4965095c4996e907817c2cdc72dede4e3e7b0 GIT binary patch literal 6845 zcmb_hOLHSv74E0HB}=j_%WqFInIw}@D%eS=P%Ki(kS77g49P%1Je0O>%Wk>#$nDnH z4sBkO1)Hg239RHL3w{7=_7v-0QBXzIZ1@3L@qM@3lI@v+P(-di_x64Fx#v6Ixz}%0 zDkTlSKiB`-9yB%WAAC9cS;*YP6a5oHXhILQzTVPx<_+YHmZ|b)X!Y%ut@2h_=sPV( z>-FV(nAC z^+f)*E;jJ?q0kUwq{cnyjI-5})1P-itcYi#z+iv9~>rd>L)yd-lCS z>~D8O)cKP1g$Uy9DCh^FCxbZfqYE$XeLo!ea%*%fU5s796VYf~-O^P_$HW_dthVvI zi6;^eiGE@xrqEZk6AP(<)Q+{Do@l}p)_vookQ91`utg!U_sm$En8G>L6XU=T?tNoo zooXjeY$r~ykT{9G1`}D;@a8b>8eQX_4*ivOQnMY1)PZOZ{KM24dGh12A7fVvU8u#2 z(*iUch;CYo#=F_~o;9`(egB|oq)ulTj{AcMU4k(7r7U9oO*b_}lseR1k%U7lEiFVw@m*l1fv+)nYW7bAx2n~DU>aJ1Muj%qS-aFhW{?8m8&9kM3IsTN&BYMyF3cCTj%*y^5W!0@6!m|BbUG4L> zzK$Vs-MM;++B27R`4UPy#36s&+Ntioj~Cdrkw7!BXA>#y+PYxZc4FstZSBFP@9Wg) zNg=ji?{-p1EZDLOTP{GWNT;@3+>v#RFBd83F`BwuVt!^=x?E#{GEYZtFi)eAS16vO zxMYcr(vp0NRo5tN|_@f)(+zS40AVbrF zT8u=SKv}1H4=w;kqzfv`683!)W(VV&&;jT)rV}mE0CugWQNb`PXGfB{BKm77>v?F=7dvwG(7ZWYDQyVts4%#_{%S+Zhsuc!6-aV^*c>7 zHR&6y0epnq!ibl(u$cLT3+A$ob`c$&reD{qFc|q7@>7`2;f1wI2e+|mmq5{+saTrakp{`N$bivXD&DQ%Bj8u( zHR-{M!@Iszo(A;;Z!|gvvshqmKb->wqiGZ&#At4$X6O$jT~DfaQxkTUT0s;H(9I7K zRHUv5qBuZAl9mpGc>hgBPH9OF58DV$c462S=aO_$=^R!YgSdx-H;z*W5sNR8v3f%c z=6tgjnYqhvI6d4RT$hVL_9 z5PUB{_acPOz(dfM3ECVi6$Q~M;5M&vHO57N1;JWYU`5D01L_{?lzVstOfLYmoB*Kt z919Gzr{DzidUZUD0A~ZBr7O{5z%+wr2F~7cZv~KC6$=FA6q1d{LQ=jC?J5wet(9NH zD=3ZRkWm`_qbeDuIih5+1!Ll*O<%A8O9u3dDA&Kk}^>;MkOiX{fu#DM(qk3 zbuO)tdW+4vFsqsEoWFKVJ+_#i0OlKXUS&mQSfLrrZM<& zd-_TL88of(0Eexd=_09ScRM_xbM{xc7#E}BECIs0gsrs*Jr58BfMop(0o_m0FM1K; z0>p3%co<|2t6@BEyGG4)&9WgS`aGIB{zb4E=K?=%!hlAf;LN4`I6;J980KQ`A=H~# zu?=d@lG;dEB!s#zF7 z&cjT}G_~ot3jnQ)IWVJI!@OWSoUhdy06NT}>eUPkKFgwnwboIy2$0Mnuh?P*XF)Ny z!a3QhHk}hQsNk`j*l4?i`Ih6A$*SO2uO;eR(TBm~g;gog;{xd6K3pOG3UFo6* zP*Tk1&UL~sIA4PAM+t5dGTrB`%y_-GabWyLmw&-Z%A$%^n|Q7?lokibvlM|UfZf|{ zXe->$yf)$a_YCOI!6>q`s1bKRar)lTjkw!Pi|Tgh2r&y{Ph&Vxypq;uAyFIA5)bnk z*YG@(F_9urp8jU1o+mPD>>JOg>Uh-J;f z!Zzc+NGSmKS#2@wM2IL^mX>CUx{U1g-Ek0#vy*)mspt1Kf5h5m&}Qg|fUm6A83r0J z0LIsmUQ^|DL-p-)Nw{tMuGelNz#yIgQ5FRPGmGRB%cs4G8 z5Kci*8|yg;^I8SMYR_fWBLd}G^YZOmYoKSp3@iU5_Q~ZMB5)&?Fc)0+SzoT8m8xYOF{h?9U zui`1=gw!x=fWkE=g&v>1_wY6Ia2$`u@pgXGI%E3n8I0Sz$2`e` zk^6ld;i9chba#5;#!E{+3UN1(QJ0iNj}hZusE13po|R|^xWMchYg*4n9EOW#xM5>Q zT?I4?sWB2QS6yRaWLyK``WB9@)2f`!*p}YlK)!&{aRqP1!_khqC~Fm1F1lG1e@S`E z+3~#1)-%TyEy5{8JQ`@Y5L(yeuQ{kL?=g7_$Ma(qeQZS^huOKqM1?x3q?QjQNPcVU z>RGdXh`Z6P5dU@v_qJ3p6YP#-U#Osqr{n$zchr7%CU-{1p2W34Yd+6x<<2j=+~fUV zs~mNL(eV}+f%|_4MYy)eZmmD%i;XqlcK;RRCAFb{!&_u@6^NsFF`D8mnDe>kAy57(|Ryqe_}-7+4>ecHX| LzFd8!`qIAvgi>*; literal 0 HcmV?d00001 diff --git a/similarities/cosine_similarities.py b/similarities/cosine_similarities.py new file mode 100644 index 0000000..ae080d5 --- /dev/null +++ b/similarities/cosine_similarities.py @@ -0,0 +1,73 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +import pandas as pd +import fire +from pathlib import Path +from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits + + +def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + print(exclude_phrases) + + tfidf = spark.read.parquet(infile) + + if included_subreddits is None: + included_subreddits = select_topN_subreddits(topN) + else: + included_subreddits = set(open(included_subreddits)) + + if exclude_phrases == True: + tfidf = tfidf.filter(~f.col(term_colname).contains("_")) + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits) + tfidf = spark.read.parquet(tempdir.name) + subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + print("loading matrix") + mat = read_tfidf_matrix(tempdir.name, term_colname) + print('computing similarities') + sims = column_similarities(mat) + del mat + + sims = pd.DataFrame(sims.todense()) + sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1) + sims['subreddit'] = subreddit_names.subreddit.values + + p = Path(outfile) + + output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) + output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) + output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) + + sims.to_feather(outfile) + tempdir.cleanup() + +def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False): + return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', + 'term', + outfile, + min_df, + included_subreddits, + topN, + exclude_phrases) + +def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000): + return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', + 'author', + outfile, + min_df, + included_subreddits, + topN, + exclude_phrases=False) + +if __name__ == "__main__": + fire.Fire({'term':term_cosine_similarities, + 'author':author_cosine_similarities}) + diff --git a/similarities/job_script.sh b/similarities/job_script.sh new file mode 100755 index 0000000..1c2cacf --- /dev/null +++ b/similarities/job_script.sh @@ -0,0 +1,4 @@ +#!/usr/bin/bash +start_spark_cluster.sh +spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet +stop-all.sh diff --git a/similarities_helper.py b/similarities/similarities_helper.py similarity index 74% rename from similarities_helper.py rename to similarities/similarities_helper.py index ef434ac..88c830c 100644 --- a/similarities_helper.py +++ b/similarities/similarities_helper.py @@ -8,11 +8,33 @@ import pyarrow.dataset as ds from scipy.sparse import csr_matrix import pandas as pd import numpy as np +import pathlib class tf_weight(Enum): MaxTF = 1 Norm05 = 2 +def read_tfidf_matrix_weekly(path, term_colname, week): + term = term_colname + term_id = term + '_id' + term_id_new = term + '_id_new' + + dataset = ds.dataset(path,format='parquet') + entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new],filter=ds.field('week')==week).to_pandas() + return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1)))) + +def write_weekly_similarities(path, sims, week, names): + sims['week'] = week + p = pathlib.Path(path) + if not p.is_dir(): + p.mkdir() + + # reformat as a pairwise list + sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values) + sims.to_parquet(p / week.isoformat()) + + + def read_tfidf_matrix(path,term_colname): term = term_colname term_id = term + '_id' @@ -29,6 +51,41 @@ def column_similarities(mat): return(sims) +def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits): + term = term_colname + term_id = term + '_id' + term_id_new = term + '_id_new' + + if min_df is None: + min_df = 0.1 * len(included_subreddits) + + tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits)) + + # we might not have the same terms or subreddits each week, so we need to make unique ids for each week. + sub_ids = tfidf.select(['subreddit_id','week']).distinct() + sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id"))) + tfidf = tfidf.join(sub_ids,['subreddit_id','week']) + + # only use terms in at least min_df included subreddits in a given week + new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count')) + tfidf = tfidf.join(new_count,[term_id,'week'],how='inner') + + # reset the term ids + term_ids = tfidf.select([term_id,'week']).distinct() + term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id))) + tfidf = tfidf.join(term_ids,[term_id,'week']) + + tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old") + tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float')) + + tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.') + + tfidf = tfidf.repartition('week') + + tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy') + return(tempdir) + + def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits): term = term_colname term_id = term + '_id' @@ -46,7 +103,6 @@ def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits): # only use terms in at least min_df included subreddits new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count')) -# new_count = new_count.filter(f.col('new_count') >= min_df) tfidf = tfidf.join(new_count,term_id,how='inner') # reset the term ids @@ -55,8 +111,6 @@ def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits): tfidf = tfidf.join(term_ids,term_id) tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old") - # tfidf = tfidf.withColumnRenamed("idf","idf_old") - # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count"))) tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float')) tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.') @@ -64,7 +118,9 @@ def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits): tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy') return tempdir -def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold): + +# try computing cosine similarities using spark +def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold): term = term_colname term_id = term + '_id' term_id_new = term + '_id_new' @@ -82,7 +138,6 @@ def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, simila # only use terms in at least min_df included subreddits new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count')) -# new_count = new_count.filter(f.col('new_count') >= min_df) tfidf = tfidf.join(new_count,term_id,how='inner') # reset the term ids @@ -91,14 +146,10 @@ def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, simila tfidf = tfidf.join(term_ids,term_id) tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old") - # tfidf = tfidf.withColumnRenamed("idf","idf_old") - # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count"))) tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf) # step 1 make an rdd of entires # sorted by (dense) spark subreddit id - # entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd - n_partitions = int(len(included_subreddits)*2 / 5) entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions) @@ -214,7 +265,6 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm df = df.join(idf, on=[term_id, term]) # agg terms by subreddit to make sparse tf/df vectors - if tf_family == tf_weight.MaxTF: df = df.withColumn("tf_idf", df.relative_tf * df.idf) else: # tf_fam = tf_weight.Norm05 @@ -222,4 +272,7 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm return df - +def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv"): + rankdf = pd.read_csv(path) + included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values) + return included_subreddits diff --git a/similarities/tfidf.py b/similarities/tfidf.py new file mode 100644 index 0000000..5b1c0c9 --- /dev/null +++ b/similarities/tfidf.py @@ -0,0 +1,73 @@ +import fire +from pyspark.sql import SparkSession +from pyspark.sql import functions as f +from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits + + +def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude): + spark = SparkSession.builder.getOrCreate() + + df = spark.read.parquet(inpath) + + df = df.filter(~ f.col(term_colname).isin(exclude)) + + include_subs = select_topN_subreddits(topN) + + df = func(df, include_subs, term_colname) + + df.write.parquet(outpath,mode='overwrite',compression='snappy') + + spark.stop() + +def tfidf(inpath, outpath, topN, term_colname, exclude): + return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude) + +def tfidf_weekly(inpath, outpath, topN, term_colname, exclude): + return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude) + +def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', + topN=25000): + + return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", + outpath, + topN, + 'author', + ['[deleted]','AutoModerator'] + ) + +def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', + topN=25000): + + return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", + outpath, + topN, + 'term', + [] + ) + +def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', + topN=25000): + + return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", + outpath, + topN, + 'author', + ['[deleted]','AutoModerator'] + ) + +def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', + topN=25000): + + return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", + outpath, + topN, + 'term', + [] + ) + + +if __name__ == "__main__": + fire.Fire({'authors':tfidf_authors, + 'terms':tfidf_terms, + 'authors_weekly':tfidf_authors_weekly, + 'terms_weekly':tfidf_terms_weekly}) diff --git a/top_subreddits_by_comments.py b/similarities/top_subreddits_by_comments.py similarity index 56% rename from top_subreddits_by_comments.py rename to similarities/top_subreddits_by_comments.py index 9e172c5..214c7e0 100644 --- a/top_subreddits_by_comments.py +++ b/similarities/top_subreddits_by_comments.py @@ -1,14 +1,6 @@ from pyspark.sql import functions as f from pyspark.sql import SparkSession from pyspark.sql import Window -from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix -import numpy as np -import pyarrow -import pandas as pd -import fire -from itertools import islice -from pathlib import Path -from similarities_helper import cosine_similarities spark = SparkSession.builder.getOrCreate() conf = spark.sparkContext.getConf() @@ -21,10 +13,10 @@ df = df.filter(~df.subreddit.like("u_%")) df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments")) win = Window.orderBy(f.col('n_comments').desc()) -df = df.withColumn('comments_rank',f.rank().over(win)) +df = df.withColumn('comments_rank', f.rank().over(win)) df = df.toPandas() df = df.sort_values("n_comments") -df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False) +df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False) diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py new file mode 100644 index 0000000..2b3c90b --- /dev/null +++ b/similarities/weekly_cosine_similarities.py @@ -0,0 +1,73 @@ +from pyspark.sql import functions as f +from pyspark.sql import SparkSession +from pyspark.sql import Window +import numpy as np +import pyarrow +import pandas as pd +import fire +from itertools import islice +from pathlib import Path +from similarities_helper import * + + +#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet') +def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500): + spark = SparkSession.builder.getOrCreate() + conf = spark.sparkContext.getConf() + print(outfile) + tfidf = spark.read.parquet(tfidf_path) + + if included_subreddits is None: + included_subreddits = select_topN_subreddits(topN) + else: + included_subreddits = set(open(included_subreddits)) + + print(f"computing weekly similarities for {len(included_subreddits)} subreddits") + + print("creating temporary parquet with matrix indicies") + tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits) + + tfidf = spark.read.parquet(tempdir.name) + + # the ids can change each week. + subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas() + subreddit_names = subreddit_names.sort_values("subreddit_id_new") + subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 + spark.stop() + + weeks = list(subreddit_names.week.drop_duplicates()) + for week in weeks: + print(f"loading matrix: {week}") + mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week) + print('computing similarities') + sims = column_similarities(mat) + del mat + + names = subreddit_names.loc[subreddit_names.week == week] + sims = pd.DataFrame(sims.todense()) + + sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1) + sims['subreddit'] = names.subreddit.values + + write_weekly_similarities(outfile, sims, week, names) + + +def author_cosine_similarities_weekly(outfile, min_df=None , included_subreddits=None, topN=500): + return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', + outfile, + 'author', + min_df, + included_subreddits, + topN) + +def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500): + return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', + outfile, + 'term', + min_df, + included_subreddits, + topN) + +if __name__ == "__main__": + fire.Fire({'author':author_cosine_similarities_weekly, + 'term':term_cosine_similarities_weekly}) diff --git a/term_cosine_similarity.py b/term_cosine_similarity.py deleted file mode 100644 index dd92b2c..0000000 --- a/term_cosine_similarity.py +++ /dev/null @@ -1,127 +0,0 @@ -from pyspark.sql import functions as f -from pyspark.sql import SparkSession -from pyspark.sql import Window -from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix -import numpy as np -import pyarrow -import pandas as pd -import fire -from itertools import islice -from pathlib import Path -from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities -import scipy -# outfile='test_similarities_500.feather'; -# min_df = None; -# included_subreddits=None; topN=100; exclude_phrases=True; - -def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False): - spark = SparkSession.builder.getOrCreate() - conf = spark.sparkContext.getConf() - print(outfile) - print(exclude_phrases) - - tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet') - - if included_subreddits is None: - rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") - included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values) - - else: - included_subreddits = set(open(included_subreddits)) - - if exclude_phrases == True: - tfidf = tfidf.filter(~f.col(term).contains("_")) - - print("creating temporary parquet with matrix indicies") - tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits) - tfidf = spark.read.parquet(tempdir.name) - subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas() - subreddit_names = subreddit_names.sort_values("subreddit_id_new") - subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1 - spark.stop() - - print("loading matrix") - mat = read_tfidf_matrix(tempdir.name,'term') - print('computing similarities') - sims = column_similarities(mat) - del mat - - sims = pd.DataFrame(sims.todense()) - sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1) - sims['subreddit'] = subreddit_names.subreddit.values - - p = Path(outfile) - - output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) - output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) - output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - - sims.to_feather(outfile) - tempdir.cleanup() - path = "term_tfidf_entriesaukjy5gv.parquet" - - -# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0; -# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True): -# ''' -# Compute similarities between subreddits based on tfi-idf vectors of comment texts - -# included_subreddits : string -# Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits - -# similarity_threshold : double (default = 0) -# set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm -# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm. - -# min_df : int (default = 0.1 * (number of included_subreddits) -# exclude terms that appear in fewer than this number of documents. - -# outfile: string -# where to output csv and feather outputs -# ''' - -# print(outfile) -# print(exclude_phrases) - -# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet') - -# if included_subreddits is None: -# included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN)) -# included_subreddits = {s.strip('\n') for s in included_subreddits} - -# else: -# included_subreddits = set(open(included_subreddits)) - -# if exclude_phrases == True: -# tfidf = tfidf.filter(~f.col(term).contains("_")) - -# sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold) - -# p = Path(outfile) - -# output_feather = Path(str(p).replace("".join(p.suffixes), ".feather")) -# output_csv = Path(str(p).replace("".join(p.suffixes), ".csv")) -# output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet")) - -# sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy') - -# #instead of toLocalMatrix() why not read as entries and put strait into numpy -# sim_entries = pd.read_parquet(output_parquet) - -# df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas() -# spark.stop() -# df['subreddit_id_new'] = df['subreddit_id_new'] - 1 -# df = df.sort_values('subreddit_id_new').reset_index(drop=True) -# df = df.set_index('subreddit_id_new') - -# similarities = sim_entries.join(df, on='i') -# similarities = similarities.rename(columns={'subreddit':"subreddit_i"}) -# similarities = similarities.join(df, on='j') -# similarities = similarities.rename(columns={'subreddit':"subreddit_j"}) - -# similarities.to_feather(output_feather) -# similarities.to_csv(output_csv) -# return similarities - -if __name__ == '__main__': - fire.Fire(term_cosine_similarities) diff --git a/tfidf_authors.py b/tfidf_authors.py deleted file mode 100644 index 6852fe8..0000000 --- a/tfidf_authors.py +++ /dev/null @@ -1,21 +0,0 @@ -from pyspark.sql import SparkSession -from similarities_helper import build_tfidf_dataset -import pandas as pd - -spark = SparkSession.builder.getOrCreate() - -df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp") - -include_subs = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv") - -#include_subs = set(include_subs.loc[include_subs.comments_rank < 300]['subreddit']) - -# remove [deleted] and AutoModerator (TODO remove other bots) -df = df.filter(df.author != '[deleted]') -df = df.filter(df.author != 'AutoModerator') - -df = build_tfidf_dataset(df, include_subs, 'author') - -df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy') - -spark.stop() diff --git a/tfidf_comments.py b/tfidf_comments.py deleted file mode 100644 index 65d2420..0000000 --- a/tfidf_comments.py +++ /dev/null @@ -1,18 +0,0 @@ -from pyspark.sql import functions as f -from pyspark.sql import SparkSession -from pyspark.sql import Window -from similarities_helper import build_tfidf_dataset - -## TODO:need to exclude automoderator / bot posts. -## TODO:need to exclude better handle hyperlinks. - -spark = SparkSession.builder.getOrCreate() -df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp") - -include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt")) -include_subs = {s.strip('\n') for s in include_subs} - -df = build_tfidf_dataset(df, include_subs, 'term') - -df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy') -spark.stop() -- 2.39.5