code.communitydata.science - cdsc_reddit.git/commitdiff
Refactor and reorganize.
author Nate E TeBlunthuis <nathante@mox2.hyak.local>
Wed, 9 Dec 2020 01:32:20 +0000 (17:32 -0800)
committer Nate E TeBlunthuis <nathante@mox2.hyak.local>
Wed, 9 Dec 2020 01:32:20 +0000 (17:32 -0800)
47 files changed:
author_cosine_similarity.py [deleted file]
clustering/clustering.py [moved from clustering.py with 98% similarity]
clustering/fit_tsne.py [moved from fit_tsne.py with 100% similarity]
datasets/checkpoint_parallelsql.sbatch [moved from checkpoint_parallelsql.sbatch with 100% similarity]
datasets/comments_2_parquet.sh [moved from comments_2_parquet.sh with 100% similarity]
datasets/comments_2_parquet_part1.py [moved from comments_2_parquet_part1.py with 100% similarity]
datasets/comments_2_parquet_part2.py [moved from comments_2_parquet_part2.py with 100% similarity]
datasets/helper.py [moved from helper.py with 100% similarity]
datasets/job_script.sh [new file with mode: 0755]
datasets/submissions_2_parquet.sh [moved from submissions_2_parquet.sh with 100% similarity]
datasets/submissions_2_parquet_part1.py [moved from submissions_2_parquet_part1.py with 100% similarity]
datasets/submissions_2_parquet_part2.py [moved from submissions_2_parquet_part2.py with 100% similarity]
dumps/check_comments_shas.py [moved from check_comments_shas.py with 100% similarity]
dumps/check_submission_shas.py [moved from check_submission_shas.py with 100% similarity]
dumps/pull_pushshift_comments.sh [moved from pull_pushshift_comments.sh with 100% similarity]
dumps/pull_pushshift_submissions.sh [moved from pull_pushshift_submissions.sh with 100% similarity]
ngrams/#ngrams_helper.py# [new file with mode: 0644]
ngrams/checkpoint_parallelsql.sbatch [new file with mode: 0644]
ngrams/run_tf_jobs.sh [moved from run_tf_jobs.sh with 100% similarity]
ngrams/sort_tf_comments.py [moved from sort_tf_comments.py with 100% similarity]
ngrams/tf_comments.py [moved from tf_comments.py with 89% similarity]
ngrams/top_comment_phrases.py [moved from top_comment_phrases.py with 100% similarity]
old/#tfidf_authors.py# [new file with mode: 0644]
old/#tfidf_comments_weekly.py# [new file with mode: 0644]
old/.#tfidf_authors.py [new symlink]
old/.#tfidf_comments_weekly.py [new symlink]
old/author_cosine_similarity.py [new file with mode: 0644]
old/term_cosine_similarity.py [new file with mode: 0644]
old/tfidf_authors.py [new file with mode: 0644]
old/tfidf_authors_weekly.py [new file with mode: 0644]
old/tfidf_comments.py [new file with mode: 0644]
old/tfidf_comments_weekly.py [new file with mode: 0644]
similarities/#cosine_similarities.py# [new file with mode: 0644]
similarities/#tfidf_weekly.py# [new file with mode: 0644]
similarities/.#cosine_similarities.py [new symlink]
similarities/.#tfidf_weekly.py [new symlink]
similarities/Makefile [new file with mode: 0644]
similarities/__pycache__/similarities_helper.cpython-37.pyc [new file with mode: 0644]
similarities/cosine_similarities.py [new file with mode: 0644]
similarities/job_script.sh [new file with mode: 0755]
similarities/similarities_helper.py [moved from similarities_helper.py with 74% similarity]
similarities/tfidf.py [new file with mode: 0644]
similarities/top_subreddits_by_comments.py [moved from top_subreddits_by_comments.py with 56% similarity]
similarities/weekly_cosine_similarities.py [new file with mode: 0644]
term_cosine_similarity.py [deleted file]
tfidf_authors.py [deleted file]
tfidf_comments.py [deleted file]

diff --git a/author_cosine_similarity.py b/author_cosine_similarity.py
deleted file mode 100644 (file)
index 7ae708b..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
-
-spark = SparkSession.builder.getOrCreate()
-conf = spark.sparkContext.getConf()
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
-    '''
-    Compute similarities between subreddits based on tfi-idf vectors of author comments
-    
-    included_subreddits : string
-        Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-    similarity_threshold : double (default = 0)
-        set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-    min_df : int (default = 0.1 * (number of included_subreddits)
-         exclude terms that appear in fewer than this number of documents.
-
-    outfile: string
-         where to output csv and feather outputs
-'''
-
-    spark = SparkSession.builder.getOrCreate()
-    conf = spark.sparkContext.getConf()
-    print(outfile)
-
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
-
-    if included_subreddits is None:
-        rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-        included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
-
-    else:
-        included_subreddits = set(open(included_subreddits))
-
-    print("creating temporary parquet with matrix indicies")
-    tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
-    tfidf = spark.read.parquet(tempdir.name)
-    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-    spark.stop()
-
-    print("loading matrix")
-    mat = read_tfidf_matrix(tempdir.name,'author')
-    print('computing similarities')
-    sims = column_similarities(mat)
-    del mat
-    
-    sims = pd.DataFrame(sims.todense())
-    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
-    sims['subreddit'] = subreddit_names.subreddit.values
-
-    p = Path(outfile)
-
-    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-    sims.to_feather(outfile)
-    tempdir.cleanup()
-
-    # print(outfile)
-
-    # tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
-
-    # if included_subreddits is None:
-    #     included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-    #     included_subreddits = {s.strip('\n') for s in included_subreddits}
-
-    # else:
-    #     included_subreddits = set(open(included_subreddits))
-
-    # sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold)
-
-    # p = Path(outfile)
-
-    # output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-    # output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-    # output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-    # sim_dist = sim_dist.entries.toDF()
-
-    # sim_dist = sim_dist.repartition(1)
-    # sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-    
-
-
-    # #instead of toLocalMatrix() why not read as entries and put strait into numpy
-    # sim_entries = pd.read_parquet(output_parquet)
-
-    # df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-
-    # spark.stop()
-    # df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-    # df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-    # df = df.set_index('subreddit_id_new')
-
-    # similarities = sim_entries.join(df, on='i')
-    # similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-    # similarities = similarities.join(df, on='j')
-    # similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-    # similarities.to_feather(output_feather)
-    # similarities.to_csv(output_csv)
-    # return similarities
-    
-if __name__ == '__main__':
-    fire.Fire(author_cosine_similarities)
similarity index 98%
rename from clustering.py
rename to clustering/clustering.py
index 552d8ae62e47d4521c478f61f8676f20a1a75881..38af31c243cdfe1e3a648e8885047e8cee81ec6b 100644 (file)
@@ -16,6 +16,8 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv
 
     preference = np.quantile(mat,preference_quantile)
 
+    print("data loaded")
+
     clustering = AffinityPropagation(damping=damping,
                                      max_iter=max_iter,
                                      convergence_iter=convergence_iter,
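
[Editor's note: a minimal sketch, not part of this commit, of the affinity-propagation step that clustering/clustering.py wraps, using a toy precomputed similarity matrix. The damping, max_iter, and convergence_iter values, the 0.5 preference quantile, and affinity='precomputed' are illustrative assumptions here, not necessarily the script's defaults.]

import numpy as np
from sklearn.cluster import AffinityPropagation

# toy 3x3 subreddit-by-subreddit similarity matrix (illustrative only)
mat = np.array([[1.0, 0.9, 0.1],
                [0.9, 1.0, 0.2],
                [0.1, 0.2, 1.0]])

# clustering.py sets the preference from a quantile of the similarities
preference = np.quantile(mat, 0.5)

clustering = AffinityPropagation(damping=0.5,
                                 max_iter=100000,
                                 convergence_iter=15,
                                 preference=preference,
                                 affinity='precomputed').fit(mat)
print(clustering.labels_)
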
similarity index 100%
rename from fit_tsne.py
rename to clustering/fit_tsne.py
similarity index 100%
rename from helper.py
rename to datasets/helper.py
diff --git a/datasets/job_script.sh b/datasets/job_script.sh
new file mode 100755 (executable)
index 0000000..d90b618
--- /dev/null
@@ -0,0 +1,4 @@
+#!/usr/bin/bash
+start_spark_cluster.sh
+spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
+stop-all.sh
diff --git a/ngrams/#ngrams_helper.py# b/ngrams/#ngrams_helper.py#
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/ngrams/checkpoint_parallelsql.sbatch b/ngrams/checkpoint_parallelsql.sbatch
new file mode 100644 (file)
index 0000000..dd61e65
--- /dev/null
@@ -0,0 +1,26 @@
+#!/bin/bash
+## parallel_sql_job.sh
+#SBATCH --job-name=tf_subreddit_comments
+## Allocation Definition
+#SBATCH --account=comdata-ckpt
+#SBATCH --partition=ckpt
+## Resources
+## Nodes. This should always be 1 for parallel-sql.
+#SBATCH --nodes=1    
+## Walltime (12 hours)
+#SBATCH --time=12:00:00
+## Memory per node
+#SBATCH --mem=32G
+#SBATCH --cpus-per-task=4
+#SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
+module load parallel_sql
+echo $(which perl)
+conda list pyarrow
+which python3
+#Put here commands to load other modules (e.g. matlab etc.)
+#Below command means that parallel_sql will get tasks from the database
+#and run them on the node (in parallel). So a 16 core node will have
+#16 tasks running at one time.
+parallel-sql --sql -a parallel --exit-on-term --jobs 4
similarity index 100%
rename from run_tf_jobs.sh
rename to ngrams/run_tf_jobs.sh
similarity index 89%
rename from tf_comments.py
rename to ngrams/tf_comments.py
index 526bac2bdabe284ec9550bfe290cadb13e438a1b..f86548a957a866b56d4dec6e9b4f813b2a4b5fa2 100755 (executable)
@@ -7,7 +7,6 @@ from itertools import groupby, islice, chain
 import fire
 from collections import Counter
 import os
-import datetime
 import re
 from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
 from nltk.corpus import stopwords
@@ -31,8 +30,8 @@ def weekly_tf(partition, mwe_pass = 'first'):
     ngram_output = partition.replace("parquet","txt")
 
     if mwe_pass == 'first':
-        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+        if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -67,7 +66,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
 
     if mwe_pass != 'first':
-        mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather')
+        mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
         mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
         mwe_phrases = list(mwe_dataset.phrase)
         mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
@@ -88,7 +87,6 @@ def weekly_tf(partition, mwe_pass = 'first'):
                 new_sentence.append(new_token)
         return new_sentence
 
-
     stopWords = set(stopwords.words('english'))
 
     # we follow the approach described in datta, phelan, adar 2017
@@ -121,7 +119,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
             for sentence in sentences:
                 if random() <= 0.1:
                     grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
-                    with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
+                    with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
                         for ng in grams:
                             gram_file.write(' '.join(ng) + '\n')
                 for token in sentence:
@@ -156,7 +154,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     outchunksize = 10000
 
-    with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
     
         while True:
 
diff --git a/old/#tfidf_authors.py# b/old/#tfidf_authors.py#
new file mode 100644 (file)
index 0000000..e92d8df
--- /dev/null
@@ -0,0 +1,21 @@
+from pyspark.sql import SparkSession
+from similarities_helper import build_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+df = df.filter(df.author != '[deleted]')
+df = df.filter(df.author != 'AutoModerator')
+
+df = build_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
diff --git a/old/#tfidf_comments_weekly.py# b/old/#tfidf_comments_weekly.py#
new file mode 100644 (file)
index 0000000..1f30184
--- /dev/null
@@ -0,0 +1,27 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+
+## TODO:need to exclude automoderator / bot posts.
+## TODO:need to exclude better handle hyperlinks. 
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+# df = df.filter(df.author != '[deleted]')
+# df = df.filter(df.author != 'AutoModerator')
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'term')
+
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
+spark.stop()
+
diff --git a/old/.#tfidf_authors.py b/old/.#tfidf_authors.py
new file mode 120000 (symlink)
index 0000000..8972575
--- /dev/null
@@ -0,0 +1 @@
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
diff --git a/old/.#tfidf_comments_weekly.py b/old/.#tfidf_comments_weekly.py
new file mode 120000 (symlink)
index 0000000..8972575
--- /dev/null
@@ -0,0 +1 @@
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
diff --git a/old/author_cosine_similarity.py b/old/author_cosine_similarity.py
new file mode 100644 (file)
index 0000000..5bd5405
--- /dev/null
@@ -0,0 +1,106 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import *
+
+#tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/subreddit_terms.parquet')
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(outfile)
+    tfidf = spark.read.parquet(tfidf_path)
+    
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # the ids can change each week.
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    weeks = list(subreddit_names.week.drop_duplicates())
+    for week in weeks:
+        print("loading matrix")
+        mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
+        print('computing similarities')
+        sims = column_similarities(mat)
+        del mat
+
+        names = subreddit_names.loc[subreddit_names.week==week]
+
+        sims = sims.rename({i:sr for i, sr in enumerate(names.subreddit.values)},axis=1)
+        sims['subreddit'] = names.subreddit.values
+        write_weekly_similarities(outfile, sims, week)
+
+
+
+def cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500):
+    '''
+    Compute similarities between subreddits based on tfi-idf vectors of author comments
+    
+    included_subreddits : string
+        Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
+
+    min_df : int (default = 0.1 * (number of included_subreddits)
+         exclude terms that appear in fewer than this number of documents.
+
+    outfile: string
+         where to output csv and feather outputs
+'''
+
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(outfile)
+
+    tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet')
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name,'author')
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+    
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
+    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+    
+if __name__ == '__main__':
+    fire.Fire(author_cosine_similarities)
diff --git a/old/term_cosine_similarity.py b/old/term_cosine_similarity.py
new file mode 100644 (file)
index 0000000..88ba71e
--- /dev/null
@@ -0,0 +1,61 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, column_similarities, select_topN
+import scipy
+
+# outfile='test_similarities_500.feather';
+# min_df = None;
+# included_subreddits=None; topN=100; exclude_phrases=True;
+def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(outfile)
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_terms.parquet')
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col(term).contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name,'term')
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+    
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
+    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+    
+if __name__ == '__main__':
+    fire.Fire(term_cosine_similarities)
diff --git a/old/tfidf_authors.py b/old/tfidf_authors.py
new file mode 100644 (file)
index 0000000..e92d8df
--- /dev/null
@@ -0,0 +1,21 @@
+from pyspark.sql import SparkSession
+from similarities_helper import build_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+df = df.filter(df.author != '[deleted]')
+df = df.filter(df.author != 'AutoModerator')
+
+df = build_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
diff --git a/old/tfidf_authors_weekly.py b/old/tfidf_authors_weekly.py
new file mode 100644 (file)
index 0000000..0073262
--- /dev/null
@@ -0,0 +1,21 @@
+from pyspark.sql import SparkSession
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+spark = SparkSession.builder.getOrCreate()
+
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+df = df.filter(df.author != '[deleted]')
+df = df.filter(df.author != 'AutoModerator')
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'author')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', mode='overwrite', compression='snappy')
+
+spark.stop()
diff --git a/old/tfidf_comments.py b/old/tfidf_comments.py
new file mode 100644 (file)
index 0000000..ec3f6bf
--- /dev/null
@@ -0,0 +1,18 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_tfidf_dataset
+
+## TODO:need to exclude automoderator / bot posts.
+## TODO:need to exclude better handle hyperlinks. 
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+df = build_tfidf_dataset(df, include_subs, 'term')
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/reddit_similarity/subreddit_terms.parquet',mode='overwrite',compression='snappy')
+spark.stop()
diff --git a/old/tfidf_comments_weekly.py b/old/tfidf_comments_weekly.py
new file mode 100644 (file)
index 0000000..1f30184
--- /dev/null
@@ -0,0 +1,27 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+
+## TODO:need to exclude automoderator / bot posts.
+## TODO:need to exclude better handle hyperlinks. 
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+# df = df.filter(df.author != '[deleted]')
+# df = df.filter(df.author != 'AutoModerator')
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'term')
+
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
+spark.stop()
+
diff --git a/similarities/#cosine_similarities.py# b/similarities/#cosine_similarities.py#
new file mode 100644 (file)
index 0000000..ae080d5
--- /dev/null
@@ -0,0 +1,73 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+import pandas as pd
+import fire
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
+
+
+def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(outfile)
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet(infile)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name, term_colname)
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+    
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
+    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+
+def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+                               'term',
+                               outfile,
+                               min_df,
+                               included_subreddits,
+                               topN,
+                               exclude_phrases)
+
+def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
+    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+                               'author',
+                               outfile,
+                               min_df,
+                               included_subreddits,
+                               topN,
+                               exclude_phrases=False)
+
+if __name__ == "__main__":
+    fire.Fire({'term':term_cosine_similarities,
+               'author':author_cosine_similarities})
+
diff --git a/similarities/#tfidf_weekly.py# b/similarities/#tfidf_weekly.py#
new file mode 100644 (file)
index 0000000..8b0e8ff
--- /dev/null
@@ -0,0 +1,24 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from similarities_helper import build_weekly_tfidf_dataset
+import pandas as pd
+
+def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
+    
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
+
+include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
+
+include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
+# df = df.filter(df.author != '[deleted]')
+# df = df.filter(df.author != 'AutoModerator')
+
+df = build_weekly_tfidf_dataset(df, include_subs, 'term')
+
+
+df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
+spark.stop()
diff --git a/similarities/.#cosine_similarities.py b/similarities/.#cosine_similarities.py
new file mode 120000 (symlink)
index 0000000..8972575
--- /dev/null
@@ -0,0 +1 @@
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
diff --git a/similarities/.#tfidf_weekly.py b/similarities/.#tfidf_weekly.py
new file mode 120000 (symlink)
index 0000000..8972575
--- /dev/null
@@ -0,0 +1 @@
+nathante@n2347.hyak.local.31061:1602221800
\ No newline at end of file
diff --git a/similarities/Makefile b/similarities/Makefile
new file mode 100644 (file)
index 0000000..89a908f
--- /dev/null
@@ -0,0 +1,2 @@
+/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
+       start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
diff --git a/similarities/__pycache__/similarities_helper.cpython-37.pyc b/similarities/__pycache__/similarities_helper.cpython-37.pyc
new file mode 100644 (file)
index 0000000..e5e4965
Binary files /dev/null and b/similarities/__pycache__/similarities_helper.cpython-37.pyc differ
diff --git a/similarities/cosine_similarities.py b/similarities/cosine_similarities.py
new file mode 100644 (file)
index 0000000..ae080d5
--- /dev/null
@@ -0,0 +1,73 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+import pandas as pd
+import fire
+from pathlib import Path
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
+
+
+def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(outfile)
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet(infile)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name, term_colname)
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+    
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+
+    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
+    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+
+def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+                               'term',
+                               outfile,
+                               min_df,
+                               included_subreddits,
+                               topN,
+                               exclude_phrases)
+
+def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
+    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+                               'author',
+                               outfile,
+                               min_df,
+                               included_subreddits,
+                               topN,
+                               exclude_phrases=False)
+
+if __name__ == "__main__":
+    fire.Fire({'term':term_cosine_similarities,
+               'author':author_cosine_similarities})
+
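
[Editor's note: a hedged usage sketch, not part of this commit. The fire.Fire mapping above exposes `author` and `term` subcommands, so the Makefile and job_script.sh invocations below correspond to a direct call like the following, assuming the script is run from the similarities/ directory so the local import resolves.]

from cosine_similarities import author_cosine_similarities

# direct-call equivalent of: cosine_similarities.py author --outfile=... --topN=10000
author_cosine_similarities(
    outfile="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet",
    topN=10000,
)
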
diff --git a/similarities/job_script.sh b/similarities/job_script.sh
new file mode 100755 (executable)
index 0000000..1c2cacf
--- /dev/null
@@ -0,0 +1,4 @@
+#!/usr/bin/bash
+start_spark_cluster.sh
+spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
+stop-all.sh
similarity index 74%
rename from similarities_helper.py
rename to similarities/similarities_helper.py
index ef434ac4ac52d297533d1282a13e3ad1714f3b2e..88c830cacf7d5971e6e885882b1771d0c864183b 100644 (file)
@@ -8,11 +8,33 @@ import pyarrow.dataset as ds
 from scipy.sparse import csr_matrix
 import pandas as pd
 import numpy as np
+import pathlib
 
 class tf_weight(Enum):
     MaxTF = 1
     Norm05 = 2
 
+def read_tfidf_matrix_weekly(path, term_colname, week):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+
+    dataset = ds.dataset(path,format='parquet')
+    entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new],filter=ds.field('week')==week).to_pandas()
+    return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
+
+def write_weekly_similarities(path, sims, week, names):
+    sims['week'] = week
+    p = pathlib.Path(path)
+    if not p.is_dir():
+        p.mkdir()
+        
+    # reformat as a pairwise list
+    sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
+    sims.to_parquet(p / week.isoformat())
+
+
+
 def read_tfidf_matrix(path,term_colname):
     term = term_colname
     term_id = term + '_id'
@@ -29,6 +51,41 @@ def column_similarities(mat):
     return(sims)
 
 
+def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+
+    if min_df is None:
+        min_df = 0.1 * len(included_subreddits)
+
+    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
+
+    # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
+    sub_ids = tfidf.select(['subreddit_id','week']).distinct()
+    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
+    tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
+
+    # only use terms in at least min_df included subreddits in a given week
+    new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
+    tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
+
+    # reset the term ids
+    term_ids = tfidf.select([term_id,'week']).distinct()
+    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
+    tfidf = tfidf.join(term_ids,[term_id,'week'])
+
+    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
+    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
+
+    tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
+
+    tfidf = tfidf.repartition('week')
+
+    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
+    return(tempdir)
+    
+
 def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
     term = term_colname
     term_id = term + '_id'
@@ -46,7 +103,6 @@ def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
 
     # only use terms in at least min_df included subreddits
     new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-#    new_count = new_count.filter(f.col('new_count') >= min_df)
     tfidf = tfidf.join(new_count,term_id,how='inner')
     
     # reset the term ids
@@ -55,8 +111,6 @@ def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
     tfidf = tfidf.join(term_ids,term_id)
 
     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
-    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
     tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
     
     tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
     tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
     
     tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
@@ -64,7 +118,9 @@ def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
     tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
     return tempdir
 
-def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
+
+# try computing cosine similarities using spark
+def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
     term = term_colname
     term_id = term + '_id'
     term_id_new = term + '_id_new'
@@ -82,7 +138,6 @@ def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, simila
 
     # only use terms in at least min_df included subreddits
     new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
-#    new_count = new_count.filter(f.col('new_count') >= min_df)
     tfidf = tfidf.join(new_count,term_id,how='inner')
     
     # reset the term ids
@@ -91,14 +146,10 @@ def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, simila
     tfidf = tfidf.join(term_ids,term_id)
 
     tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
-    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
-    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
     tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
 
     # step 1 make an rdd of entires
     # sorted by (dense) spark subreddit id
     tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
 
     # step 1 make an rdd of entires
     # sorted by (dense) spark subreddit id
-    #    entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd
     n_partitions = int(len(included_subreddits)*2 / 5)
 
     entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
@@ -214,7 +265,6 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm
     df = df.join(idf, on=[term_id, term])
 
     # agg terms by subreddit to make sparse tf/df vectors
-    
     if tf_family == tf_weight.MaxTF:
         df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
     else: # tf_fam = tf_weight.Norm05
@@ -222,4 +272,7 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm
 
     return df
 
-
+def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv"):
+    rankdf = pd.read_csv(path)
+    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
+    return included_subreddits
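
[Editor's note: a hedged sketch, not part of this commit, of how the per-week output of write_weekly_similarities() above could be read back. Each week is written as a separate parquet file named by week.isoformat() and holds the melted pairwise list, so the pandas default melt column names 'variable' and 'value' are assumed when pivoting back to a square matrix; the path below is only a placeholder.]

import pandas as pd

# one week's file under the directory passed to write_weekly_similarities (placeholder path)
sims_long = pd.read_parquet("weekly_similarities.parquet/2020-01-06")

# rows and columns are subreddits; cells are that week's cosine similarities
sims_wide = sims_long.pivot(index='subreddit', columns='variable', values='value')
print(sims_wide.shape)
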
diff --git a/similarities/tfidf.py b/similarities/tfidf.py
new file mode 100644 (file)
index 0000000..5b1c0c9
--- /dev/null
@@ -0,0 +1,73 @@
+import fire
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as f
+from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
+
+
+def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude):
+    spark = SparkSession.builder.getOrCreate()
+
+    df = spark.read.parquet(inpath)
+
+    df = df.filter(~ f.col(term_colname).isin(exclude))
+
+    include_subs = select_topN_subreddits(topN)
+
+    df = func(df, include_subs, term_colname)
+
+    df.write.parquet(outpath,mode='overwrite',compression='snappy')
+
+    spark.stop()
+
+def tfidf(inpath, outpath, topN, term_colname, exclude):
+    return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude)
+
+def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
+    return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude)
+
+def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+                  topN=25000):
+
+    return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+                 outpath,
+                 topN,
+                 'author',
+                 ['[deleted]','AutoModerator']
+                 )
+
+def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+                topN=25000):
+
+    return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                 outpath,
+                 topN,
+                 'term',
+                 []
+                 )
+
+def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+                  topN=25000):
+
+    return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+                 outpath,
+                 topN,
+                 'author',
+                 ['[deleted]','AutoModerator']
+                 )
+
+def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+                topN=25000):
+
+    return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                 outpath,
+                 topN,
+                 'term',
+                 []
+                 )
+
+
+if __name__ == "__main__":
+    fire.Fire({'authors':tfidf_authors,
+               'terms':tfidf_terms,
+               'authors_weekly':tfidf_authors_weekly,
+               'terms_weekly':tfidf_terms_weekly})
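
[Editor's note: a hedged usage sketch, not part of this commit. fire.Fire exposes these functions as the `authors`, `terms`, `authors_weekly`, and `terms_weekly` subcommands, and they can also be called directly, assuming the script is importable (e.g. run from the similarities/ directory). The defaults shown are the ones defined above.]

from tfidf import tfidf_authors, tfidf_terms

# author-level TF-IDF for the top 25,000 subreddits, excluding [deleted] and AutoModerator
tfidf_authors(topN=25000)

# term-level TF-IDF with the same subreddit cutoff
tfidf_terms(topN=25000)
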
similarity index 56%
rename from top_subreddits_by_comments.py
rename to similarities/top_subreddits_by_comments.py
index 9e172c54f0439dbc546c9302bc23e5aa5592569f..214c7e0b91d7f720a43a685d05b05186769ee7f2 100644 (file)
@@ -1,14 +1,6 @@
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
 from pyspark.sql import Window
-from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities
 
 spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
@@ -21,10 +13,10 @@ df = df.filter(~df.subreddit.like("u_%"))
 df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
 
 win = Window.orderBy(f.col('n_comments').desc())
-df = df.withColumn('comments_rank',f.rank().over(win))
+df = df.withColumn('comments_rank', f.rank().over(win))
 
 df = df.toPandas()
 
 df = df.sort_values("n_comments")
 
-df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False)
+df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)
diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py
new file mode 100644 (file)
index 0000000..2b3c90b
--- /dev/null
@@ -0,0 +1,73 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import *
+
+
+#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(outfile)
+    tfidf = spark.read.parquet(tfidf_path)
+    
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+
+    tfidf = spark.read.parquet(tempdir.name)
+
+    # the ids can change each week.
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    weeks = list(subreddit_names.week.drop_duplicates())
+    for week in weeks:
+        print(f"loading matrix: {week}")
+        mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
+        print('computing similarities')
+        sims = column_similarities(mat)
+        del mat
+
+        names = subreddit_names.loc[subreddit_names.week == week]
+        sims = pd.DataFrame(sims.todense())
+
+        sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
+        sims['subreddit'] = names.subreddit.values
+
+        write_weekly_similarities(outfile, sims, week, names)
+
+
+def author_cosine_similarities_weekly(outfile, min_df=None , included_subreddits=None, topN=500):
+    return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
+                                      outfile,
+                                      'author',
+                                      min_df,
+                                      included_subreddits,
+                                      topN)
+
+def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
+    return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
+                                      outfile,
+                                      'term',
+                                      min_df,
+                                      included_subreddits,
+                                      topN)
+
+if __name__ == "__main__":
+    fire.Fire({'author':author_cosine_similarities_weekly,
+               'term':term_cosine_similarities_weekly})
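
[Editor's note: a hedged usage sketch, not part of this commit. datasets/job_script.sh drives this file through spark-submit with the `term` subcommand; the direct-call equivalent, assuming the script is run from the similarities/ directory, looks like the following. The outfile and topN mirror the values in that job script.]

from weekly_cosine_similarities import term_cosine_similarities_weekly

# equivalent of: weekly_cosine_similarities.py term --outfile=... --topN=5000
term_cosine_similarities_weekly(
    outfile="/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet",
    topN=5000,
)
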
diff --git a/term_cosine_similarity.py b/term_cosine_similarity.py
deleted file mode 100644 (file)
index dd92b2c..0000000
+++ /dev/null
@@ -1,127 +0,0 @@
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
-import numpy as np
-import pyarrow
-import pandas as pd
-import fire
-from itertools import islice
-from pathlib import Path
-from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
-import scipy
-# outfile='test_similarities_500.feather';
-# min_df = None;
-# included_subreddits=None; topN=100; exclude_phrases=True;
-
-def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
-    spark = SparkSession.builder.getOrCreate()
-    conf = spark.sparkContext.getConf()
-    print(outfile)
-    print(exclude_phrases)
-
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
-
-    if included_subreddits is None:
-        rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-        included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
-
-    else:
-        included_subreddits = set(open(included_subreddits))
-
-    if exclude_phrases == True:
-        tfidf = tfidf.filter(~f.col(term).contains("_"))
-
-    print("creating temporary parquet with matrix indicies")
-    tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
-    tfidf = spark.read.parquet(tempdir.name)
-    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-    spark.stop()
-
-    print("loading matrix")
-    mat = read_tfidf_matrix(tempdir.name,'term')
-    print('computing similarities')
-    sims = column_similarities(mat)
-    del mat
-    
-    sims = pd.DataFrame(sims.todense())
-    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
-    sims['subreddit'] = subreddit_names.subreddit.values
-
-    p = Path(outfile)
-
-    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-    sims.to_feather(outfile)
-    tempdir.cleanup()
-    path = "term_tfidf_entriesaukjy5gv.parquet"
-    
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
-#     '''
-#     Compute similarities between subreddits based on tfi-idf vectors of comment texts 
-    
-#     included_subreddits : string
-#         Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-#     similarity_threshold : double (default = 0)
-#         set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-#     min_df : int (default = 0.1 * (number of included_subreddits)
-#          exclude terms that appear in fewer than this number of documents.
-
-#     outfile: string
-#          where to output csv and feather outputs
-# '''
-
-#     print(outfile)
-#     print(exclude_phrases)
-
-#     tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
-
-#     if included_subreddits is None:
-#         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-#         included_subreddits = {s.strip('\n') for s in included_subreddits}
-
-#     else:
-#         included_subreddits = set(open(included_subreddits))
-
-#     if exclude_phrases == True:
-#         tfidf = tfidf.filter(~f.col(term).contains("_"))
-
-#     sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
-
-#     p = Path(outfile)
-
-#     output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
-#     output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
-#     output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
-
-#     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-    
-#     #instead of toLocalMatrix() why not read as entries and put strait into numpy
-#     sim_entries = pd.read_parquet(output_parquet)
-
-#     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-#     spark.stop()
-#     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-#     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-#     df = df.set_index('subreddit_id_new')
-
-#     similarities = sim_entries.join(df, on='i')
-#     similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-#     similarities = similarities.join(df, on='j')
-#     similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-#     similarities.to_feather(output_feather)
-#     similarities.to_csv(output_csv)
-#     return similarities
-    
-if __name__ == '__main__':
-    fire.Fire(term_cosine_similarities)
diff --git a/tfidf_authors.py b/tfidf_authors.py
deleted file mode 100644 (file)
index 6852fe8..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-from pyspark.sql import SparkSession
-from similarities_helper import build_tfidf_dataset
-import pandas as pd
-
-spark = SparkSession.builder.getOrCreate()
-
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
-
-include_subs = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
-
-#include_subs = set(include_subs.loc[include_subs.comments_rank < 300]['subreddit'])
-
-# remove [deleted] and AutoModerator (TODO remove other bots)
-df = df.filter(df.author != '[deleted]')
-df = df.filter(df.author != 'AutoModerator')
-
-df = build_tfidf_dataset(df, include_subs, 'author')
-
-df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
-
-spark.stop()
diff --git a/tfidf_comments.py b/tfidf_comments.py
deleted file mode 100644 (file)
index 65d2420..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
-from pyspark.sql import Window
-from similarities_helper import build_tfidf_dataset
-
-## TODO:need to exclude automoderator / bot posts.
-## TODO:need to exclude better handle hyperlinks. 
-
-spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp")
-
-include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
-include_subs = {s.strip('\n') for s in include_subs}
-
-df = build_tfidf_dataset(df, include_subs, 'term')
-
-df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
-spark.stop()
