code.communitydata.science - cdsc_reddit.git/commitdiff
Merge remote-tracking branch 'refs/remotes/origin/excise_reindex' into excise_reindex synced/excise_reindex
author Nathan TeBlunthuis <nathante@uw.edu>
Wed, 6 Apr 2022 18:14:13 +0000 (11:14 -0700)
committer Nathan TeBlunthuis <nathante@uw.edu>
Wed, 6 Apr 2022 18:14:13 +0000 (11:14 -0700)
19 files changed:
datasets/checkpoint_parallelsql.sbatch [deleted file]
datasets/comments_2_parquet.sh
datasets/comments_2_parquet_part1.py
datasets/comments_2_parquet_part2.py
datasets/helper.py
datasets/job_script.sh
datasets/submissions_2_parquet.sh [changed mode: 0644->0755]
datasets/submissions_2_parquet_part1.py
dumps/check_comments_shas.py
ngrams/run_tf_jobs.sh
ngrams/sort_tf_comments.py
ngrams/tf_comments.py
ngrams/top_comment_phrases.py [changed mode: 0644->0755]
similarities/Makefile
similarities/job_script.sh
similarities/lsi_similarities.py
similarities/similarities_helper.py
similarities/tfidf.py
similarities/top_subreddits_by_comments.py

diff --git a/datasets/checkpoint_parallelsql.sbatch b/datasets/checkpoint_parallelsql.sbatch
deleted file mode 100644 (file)
index dd61e65..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-## parallel_sql_job.sh
-#SBATCH --job-name=tf_subreddit_comments
-## Allocation Definition
-#SBATCH --account=comdata-ckpt
-#SBATCH --partition=ckpt
-## Resources
-## Nodes. This should always be 1 for parallel-sql.
-#SBATCH --nodes=1    
-## Walltime (12 hours)
-#SBATCH --time=12:00:00
-## Memory per node
-#SBATCH --mem=32G
-#SBATCH --cpus-per-task=4
-#SBATCH --ntasks=1
-#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
-source ./bin/activate
-module load parallel_sql
-echo $(which perl)
-conda list pyarrow
-which python3
-#Put here commands to load other modules (e.g. matlab etc.)
-#Below command means that parallel_sql will get tasks from the database
-#and run them on the node (in parallel). So a 16 core node will have
-#16 tasks running at one time.
-parallel-sql --sql -a parallel --exit-on-term --jobs 4
diff --git a/datasets/comments_2_parquet.sh b/datasets/comments_2_parquet.sh
index 56ecc4d806ebc5e8017d6a2ee237ff4610a53d08..d61eb652f8d36a8b58f9612946cfb6c3a208cf9c 100755 (executable)
@@ -1,10 +1,10 @@
+#!/usr/bin/env bash
 ## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete 
 
-#!/usr/bin/env bash
 echo "#!/usr/bin/bash" > job_script.sh
 #echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 
-srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
+srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 --pty job_script.sh
 
 start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py
diff --git a/datasets/comments_2_parquet_part1.py b/datasets/comments_2_parquet_part1.py
index d3c7b7cfef3d2d127fd92b5faef98f616210a11a..69609860478b00e39ee23b624422a0dfb0a5ad89 100755 (executable)
@@ -1,12 +1,15 @@
 #!/usr/bin/env python3
+import os
 import json
 from datetime import datetime
 from multiprocessing import Pool
 from itertools import islice
-from helper import find_dumps, open_fileset
+from helper import open_input_file, find_dumps
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+from pathlib import Path
+import fire
 
 def parse_comment(comment, names= None):
     if names is None:
@@ -46,70 +49,63 @@ def parse_comment(comment, names= None):
 
 #    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
-
-files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
-
-pool = Pool(28)
-
-stream = open_fileset(files)
-
-N = int(1e4)
-
-rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
-
-schema = pa.schema([
-    pa.field('id', pa.string(), nullable=True),
-    pa.field('subreddit', pa.string(), nullable=True),
-    pa.field('link_id', pa.string(), nullable=True),
-    pa.field('parent_id', pa.string(), nullable=True),
-    pa.field('created_utc', pa.timestamp('ms'), nullable=True),
-    pa.field('author', pa.string(), nullable=True),
-    pa.field('ups', pa.int64(), nullable=True),
-    pa.field('downs', pa.int64(), nullable=True),
-    pa.field('score', pa.int64(), nullable=True),
-    pa.field('edited', pa.bool_(), nullable=True),
-    pa.field('time_edited', pa.timestamp('ms'), nullable=True),
-    pa.field('subreddit_type', pa.string(), nullable=True),
-    pa.field('subreddit_id', pa.string(), nullable=True),
-    pa.field('stickied', pa.bool_(), nullable=True),
-    pa.field('is_submitter', pa.bool_(), nullable=True),
-    pa.field('body', pa.string(), nullable=True),
-    pa.field('error', pa.string(), nullable=True),
-])
-
-from pathlib import Path
-p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
-
-if not p.is_dir():
-    if p.exists():
-        p.unlink()
-    p.mkdir()
-
-else:
-    list(map(Path.unlink,p.glob('*')))
-
-part_size = int(1e7)
-part = 1
-n_output = 0
-writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
-
-while True:
-    if n_output > part_size:
-        if part > 1:
-            writer.close()
-
-        part = part + 1
-        n_output = 0
-    
-        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
-
-    n_output += N
-    chunk = islice(rows,N)
-    pddf = pd.DataFrame(chunk, columns=schema.names)
-    table = pa.Table.from_pandas(pddf,schema=schema)
-    if table.shape[0] == 0:
-        break
-    writer.write_table(table)
-
+def parse_dump(partition):
+
+    dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
+
+    stream = open_input_file(dumpdir)
+    rows = map(parse_comment, stream)
+
+    schema = pa.schema([
+        pa.field('id', pa.string(), nullable=True),
+        pa.field('subreddit', pa.string(), nullable=True),
+        pa.field('link_id', pa.string(), nullable=True),
+        pa.field('parent_id', pa.string(), nullable=True),
+        pa.field('created_utc', pa.timestamp('ms'), nullable=True),
+        pa.field('author', pa.string(), nullable=True),
+        pa.field('ups', pa.int64(), nullable=True),
+        pa.field('downs', pa.int64(), nullable=True),
+        pa.field('score', pa.int64(), nullable=True),
+        pa.field('edited', pa.bool_(), nullable=True),
+        pa.field('time_edited', pa.timestamp('ms'), nullable=True),
+        pa.field('subreddit_type', pa.string(), nullable=True),
+        pa.field('subreddit_id', pa.string(), nullable=True),
+        pa.field('stickied', pa.bool_(), nullable=True),
+        pa.field('is_submitter', pa.bool_(), nullable=True),
+        pa.field('body', pa.string(), nullable=True),
+        pa.field('error', pa.string(), nullable=True),
+    ])
+
+    p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
+    p.mkdir(exist_ok=True,parents=True)
+
+    N=10000
+    with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
+                          schema=schema,
+                          compression='snappy',
+                          flavor='spark') as writer:
+
+        while True:
+            chunk = islice(rows,N)
+            pddf = pd.DataFrame(chunk, columns=schema.names)
+            table = pa.Table.from_pandas(pddf,schema=schema)
+            if table.shape[0] == 0:
+                break
+            writer.write_table(table)
+
+        writer.close()
+
+
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
+    files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
+    with open("comments_task_list.sh",'w') as of:
+        for fpath in files:
+            partition = os.path.split(fpath)[1]
+            if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
+                of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
+
+
+if __name__ == '__main__':
+    fire.Fire({'parse_dump':parse_dump,
+              'gen_task_list':gen_task_list})
 
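The rewritten parse_dump streams one dump partition at a time: it pulls parsed rows from a generator, converts them to an Arrow table, and appends them to a single ParquetWriter until the stream runs dry. A minimal sketch of that chunked-write pattern follows; the schema, output path, and row generator here are toy stand-ins, not the script's real ones.

    # Sketch: stream rows into one Parquet file, a bounded chunk per write.
    from itertools import islice
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    schema = pa.schema([pa.field('id', pa.string()), pa.field('score', pa.int64())])
    rows = ({'id': str(i), 'score': i} for i in range(25000))  # stand-in for parsed comments

    N = 10000
    with pq.ParquetWriter("example.parquet", schema=schema,
                          compression='snappy', flavor='spark') as writer:
        while True:
            chunk = list(islice(rows, N))   # take at most N rows from the stream
            if not chunk:                   # generator exhausted
                break
            pddf = pd.DataFrame(chunk, columns=schema.names)
            writer.write_table(pa.Table.from_pandas(pddf, schema=schema))

Each partition therefore becomes one self-contained Parquet part, and gen_task_list just emits one "python3 comments_2_parquet_part1.py parse_dump <partition>" line per dump file that still needs processing.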
diff --git a/datasets/comments_2_parquet_part2.py b/datasets/comments_2_parquet_part2.py
index 0d5cc9edf10df5f19104032b421096014143a233..1031c683f9ae6d94ca157f34832e2398b0af2e85 100755 (executable)
@@ -2,12 +2,19 @@
 
 # spark script to make sorted, and partitioned parquet files 
 
+import pyspark
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
 
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
+conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
+conf = conf.set("spark.sql.shuffle.partitions",2000)
+conf = conf.set('spark.sql.crossJoin.enabled',"true")
+conf = conf.set('spark.debug.maxToStringFields',200)
+sc = spark.sparkContext
+
+df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_comments.parquet",compression='snappy')
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -21,9 +28,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.repartition('subreddit')
 df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
 df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
+df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
 
 df = df.repartition('author')
 df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
 df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
diff --git a/datasets/helper.py b/datasets/helper.py
index 8f1dfe2e79bdd3ad29018e34adde7a30e2cac823..db2862811e5ceef61b008b19157fe1fcee9de966 100644 (file)
@@ -24,8 +24,7 @@ def open_fileset(files):
     for fh in files:
         print(fh)
         lines = open_input_file(fh)
-        for line in lines:
-            yield line
+        yield from lines
 
 def open_input_file(input_filename):
     if re.match(r'.*\.7z$', input_filename):
@@ -39,7 +38,7 @@ def open_input_file(input_filename):
     elif re.match(r'.*\.xz', input_filename):
         cmd = ["xzcat",'-dk', '-T 20',input_filename]
     elif re.match(r'.*\.zst',input_filename):
-        cmd = ['zstd','-dck', input_filename]
+        cmd = ['/kloneusr/bin/zstd','-dck', input_filename,  '--memory=2048MB --stdout']
     elif re.match(r'.*\.gz',input_filename):
         cmd = ['gzip','-dc', input_filename]
     try:
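open_input_file builds a decompressor command line for each archive format; the code below this hunk (not shown) presumably hands that command to a subprocess and yields its output line by line. A rough, hypothetical sketch of that idea, with stream_decompressed as an invented name:

    # Hypothetical helper: stream decompressed lines from an external tool.
    import subprocess

    def stream_decompressed(cmd):
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                text=True, errors='backslashreplace')
        yield from proc.stdout

    # e.g. lines = stream_decompressed(['gzip', '-dc', 'RC_2015-01.gz'])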
diff --git a/datasets/job_script.sh b/datasets/job_script.sh
index d90b618d3751af0fbe81ba7068ca150b05faf738..5b5a7d30ef47bfdd99e3a22ef50ed1de059b7e01 100755 (executable)
@@ -1,4 +1,4 @@
 #!/usr/bin/bash
 start_spark_cluster.sh
-spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
-stop-all.sh
+singularity exec  /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py 
+singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh
diff --git a/datasets/submissions_2_parquet.sh b/datasets/submissions_2_parquet.sh
old mode 100644 (file)
new mode 100755 (executable)
index f133069..81a5753
@@ -1,8 +1,8 @@
+#!/usr/bin/env bash
 ## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
 
-#!/usr/bin/env bash
 
-./parse_submissions.sh
+srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 $(pwd)/submissions_2_parquet_part1.py gen_task_list
 
 start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py
 
diff --git a/datasets/submissions_2_parquet_part1.py b/datasets/submissions_2_parquet_part1.py
index 16d1988f0e04ebbbf47796d97fd9b03f70a252f4..77ae09f33ca260261ea919cbf23bbe822aa940c4 100755 (executable)
@@ -3,26 +3,23 @@
 # two stages:
 # 1. from gz to arrow parquet (this script) 
 # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
-
 from datetime import datetime
-from multiprocessing import Pool
+from pathlib import Path
 from itertools import islice
 from helper import find_dumps, open_fileset
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
-import simdjson
 import fire
 import os
-
-parser = simdjson.Parser()
+import json
 
 def parse_submission(post, names = None):
     if names is None:
         names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
 
     try:
-        post = parser.parse(post)
+        post = json.loads(post)
     except (ValueError) as e:
         #        print(e)
         #        print(post)
@@ -92,8 +89,7 @@ def parse_dump(partition):
         pa.field('quarantine',pa.bool_(),nullable=True),
         pa.field('error',pa.string(),nullable=True)])
 
-    if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
-        os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
+    Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)
 
     with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
         while True:
@@ -108,7 +104,7 @@ def parse_dump(partition):
 
 def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
     files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
-    with open("parse_submissions_task_list",'w') as of:
+    with open("submissions_task_list.sh",'w') as of:
         for fpath in files:
             partition = os.path.split(fpath)[1]
             of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
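Both *_2_parquet_part1.py scripts now expose their functions as subcommands through fire, so the generated task list (and SLURM) can call, for example, "python3 submissions_2_parquet_part1.py parse_dump <partition>" or "python3 submissions_2_parquet_part1.py gen_task_list". A minimal sketch of that dispatch pattern with placeholder bodies:

    import fire

    def parse_dump(partition):
        print(f"would parse {partition}")        # placeholder body

    def gen_task_list(dumpdir="/path/to/dumps"):
        print(f"would scan {dumpdir}")           # placeholder body

    if __name__ == '__main__':
        fire.Fire({'parse_dump': parse_dump,
                   'gen_task_list': gen_task_list})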
diff --git a/dumps/check_comments_shas.py b/dumps/check_comments_shas.py
index dd428be658dff1cccc68f6ee4b674127444ac459..e59a7b83e2ec78e08eb2fb8ab3af36816ac205f8 100755 (executable)
@@ -8,7 +8,7 @@ import hashlib
 shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
 #shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
 
-shasums = shasums1 + shasums2
+shasums = shasums1 
 dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
 
 for l in shasums.strip().split('\n'):
diff --git a/ngrams/run_tf_jobs.sh b/ngrams/run_tf_jobs.sh
index 0e7d5dd229f0a8c1a3d58255c6f9dd82e2856b18..9ff590f4535dd88a72f0af7bfd33b9959c0997eb 100755 (executable)
@@ -1,8 +1,6 @@
 #!/usr/bin/env bash
-module load parallel_sql
+
 source ./bin/activate
 python3 tf_comments.py gen_task_list
-psu --del --Y
-cat tf_task_list | psu --load
 
 for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;
diff --git a/ngrams/sort_tf_comments.py b/ngrams/sort_tf_comments.py
index abb097ef927c19f1f6bb2469dfa65da9f2f022ca..d9c3e2c496ed262e794a2f287eb5c6af12425ef2 100644 (file)
@@ -2,12 +2,17 @@
 
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
+import fire
 
-spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/")
+def main(inparquet, outparquet, colname):
+    spark = SparkSession.builder.getOrCreate()
+    df = spark.read.parquet(inparquet)
 
-df = df.repartition(2000,'term')
-df = df.sort(['term','week','subreddit'])
-df = df.sortWithinPartitions(['term','week','subreddit'])
+    df = df.repartition(2000,colname)
+    df = df.sort([colname,'week','subreddit'])
+    df = df.sortWithinPartitions([colname,'week','subreddit'])
 
-df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
+    df.write.parquet(outparquet,mode='overwrite',compression='snappy')
+
+if __name__ == '__main__':
+    fire.Fire(main)
diff --git a/ngrams/tf_comments.py b/ngrams/tf_comments.py
index a40e5d93914a9dbda0f58853a549d5ffd5e98a4e..f472eebbb2538bb4af353fd7fed9a7c5ff3825d2 100755 (executable)
@@ -14,21 +14,29 @@ from nltk.util import ngrams
 import string
 from random import random
 from redditcleaner import clean
+from pathlib import Path
 
 # compute term frequencies for comments in each subreddit by week
-def weekly_tf(partition, mwe_pass = 'first'):
-    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
-        os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
+def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
 
-    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"):
-        os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
+    dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
+    outputdir = Path(outputdir)
+    samppath = outputdir / "reddit_comment_ngrams_10p_sample"
+
+    if not samppath.exists():
+        samppath.mkdir(parents=True, exist_ok=True)
 
     ngram_output = partition.replace("parquet","txt")
 
+    if excluded_users is not None:
+        excluded_users = set(map(str.strip,open(excluded_users)))
+        df = df.filter(~ (f.col("author").isin(excluded_users)))
+
+
+    ngram_path = samppath / ngram_output
     if mwe_pass == 'first':
-        if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
-            os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
+        if ngram_path.exists():
+            ngram_path.unlink()
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -62,8 +70,10 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
 
+    mwe_path = outputdir / "multiword_expressions.feather"
+
     if mwe_pass != 'first':
-        mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
+        mwe_dataset = pd.read_feather(mwe_path)
         mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
         mwe_phrases = list(mwe_dataset.phrase)
         mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
@@ -115,7 +125,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
             for sentence in sentences:
                 if random() <= 0.1:
                     grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
-                    with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
+                    with open(ngram_path,'a') as gram_file:
                         for ng in grams:
                             gram_file.write(' '.join(ng) + '\n')
                 for token in sentence:
@@ -149,8 +159,15 @@ def weekly_tf(partition, mwe_pass = 'first'):
     outrows = tf_comments(subreddit_weeks)
 
     outchunksize = 10000
-
-    with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    
+    termtf_outputdir = (outputdir / "comment_terms")
+    termtf_outputdir.mkdir(parents=True, exist_ok=True)
+    authortf_outputdir = (outputdir / "comment_authors")
+    authortf_outputdir.mkdir(parents=True, exist_ok=True)    
+    termtf_path = termtf_outputdir / partition
+    authortf_path = authortf_outputdir / partition
+    with pq.ParquetWriter(termtf_path, schema=schema, compression='snappy', flavor='spark') as writer, \
+         pq.ParquetWriter(authortf_path, schema=author_schema, compression='snappy', flavor='spark') as author_writer:
     
         while True:
 
@@ -179,12 +196,12 @@ def weekly_tf(partition, mwe_pass = 'first'):
         author_writer.close()
 
 
-def gen_task_list(mwe_pass='first'):
+def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
     files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
-    with open("tf_task_list",'w') as outfile:
+    with open(tf_task_list,'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
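weekly_tf now derives its term and author outputs from a configurable outputdir and writes the two tables side by side through paired ParquetWriter context managers. A toy sketch of that paired-writer pattern; the paths, schemas, and rows here are made up:

    import pyarrow as pa
    import pyarrow.parquet as pq

    term_schema = pa.schema([pa.field('term', pa.string()), pa.field('tf', pa.int64())])
    author_schema = pa.schema([pa.field('author', pa.string()), pa.field('tf', pa.int64())])

    with pq.ParquetWriter("terms_part.parquet", schema=term_schema,
                          compression='snappy', flavor='spark') as writer, \
         pq.ParquetWriter("authors_part.parquet", schema=author_schema,
                          compression='snappy', flavor='spark') as author_writer:
        writer.write_table(pa.table({'term': ['reddit'], 'tf': [3]}, schema=term_schema))
        author_writer.write_table(pa.table({'author': ['someone'], 'tf': [3]}, schema=author_schema))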
diff --git a/ngrams/top_comment_phrases.py b/ngrams/top_comment_phrases.py
old mode 100644 (file)
new mode 100755 (executable)
index 031cba5..ff1c4f0
@@ -1,58 +1,69 @@
+#!/usr/bin/env python3
 from pyspark.sql import functions as f
 from pyspark.sql import Window
 from pyspark.sql import SparkSession
 import numpy as np
+import fire
+from pathlib import Path
 
-spark = SparkSession.builder.getOrCreate()
-df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
-df = df.withColumnRenamed("value","phrase")
+def main(ngram_dir="/gscratch/comdata/output/reddit_ngrams"):
+    spark = SparkSession.builder.getOrCreate()
+    ngram_dir = Path(ngram_dir)
+    ngram_sample = ngram_dir / "reddit_comment_ngrams_10p_sample"
+    df = spark.read.text(str(ngram_sample))
 
-# count phrase occurrances
-phrases = df.groupby('phrase').count()
-phrases = phrases.withColumnRenamed('count','phraseCount')
-phrases = phrases.filter(phrases.phraseCount > 10)
+    df = df.withColumnRenamed("value","phrase")
 
+    # count phrase occurrances
+    phrases = df.groupby('phrase').count()
+    phrases = phrases.withColumnRenamed('count','phraseCount')
+    phrases = phrases.filter(phrases.phraseCount > 10)
 
-# count overall
-N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
+    # count overall
+    N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
 
-print(f'analyzing PMI on a sample of {N} phrases') 
-logN = np.log(N)
-phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
+    print(f'analyzing PMI on a sample of {N} phrases') 
+    logN = np.log(N)
+    phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
 
-# count term occurrances
-phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
-terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])
+    # count term occurrances
+    phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
+    terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])
 
-win = Window.partitionBy('term')
-terms = terms.withColumn('termCount',f.sum('phraseCount').over(win))
-terms = terms.withColumnRenamed('count','termCount')
-terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)
+    win = Window.partitionBy('term')
+    terms = terms.withColumn('termCount',f.sum('phraseCount').over(win))
+    terms = terms.withColumnRenamed('count','termCount')
+    terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)
 
-terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
-terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb')
-terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))
+    terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
+    terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb')
+    terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))
 
-# join phrases to term counts
+    # join phrases to term counts
 
 
-df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
+    df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
 
-df = df.sort(['phrasePWMI'],descending=True)
-df = df.sortWithinPartitions(['phrasePWMI'],descending=True)
-df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
+    df = df.sort(['phrasePWMI'],descending=True)
+    df = df.sortWithinPartitions(['phrasePWMI'],descending=True)
 
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/")
+    pwmi_dir = ngram_dir / "reddit_comment_ngrams_pwmi.parquet/"
+    df.write.parquet(str(pwmi_dir), mode='overwrite', compression='snappy')
 
-df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
+    df = spark.read.parquet(str(pwmi_dir))
 
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet")
-df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
+    df.write.csv(str(ngram_dir / "reddit_comment_ngrams_pwmi.csv/"),mode='overwrite',compression='none')
 
-# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions.
-#
-df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
-df = df.toPandas()
-df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather")
-df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv")
+    df = spark.read.parquet(str(pwmi_dir))
+    df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
+
+    # choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions.
+    #
+    df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
+    df = df.toPandas()
+    df.to_feather(ngram_dir / "multiword_expressions.feather")
+    df.to_csv(ngram_dir / "multiword_expressions.csv")
+
+if __name__ == '__main__':
+    fire.Fire(main)
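The phrasePWMI column computed above is a pointwise-mutual-information-style score: the log probability of the phrase minus the summed log probabilities of its constituent terms, all estimated from counts in the 10% ngram sample. A toy calculation with made-up counts:

    import numpy as np

    N = 1_000_000                    # total phrase occurrences in the sample (made up)
    phrase_count = 3500              # count of the candidate phrase (made up)
    term_counts = [40_000, 60_000]   # counts of its two constituent terms (made up)

    phrase_log_prob = np.log(phrase_count) - np.log(N)
    terms_log_prob = sum(np.log(c) - np.log(N) for c in term_counts)
    pwmi = phrase_log_prob - terms_log_prob   # > 0 means the terms co-occur more than chance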
diff --git a/similarities/Makefile b/similarities/Makefile
index f578fd5105a86af777fc4ee9c5868bb94f4405cc..963192de392a7e1ec3bd2a17d52832fae8f5d2b0 100644 (file)
@@ -1,8 +1,10 @@
+
 #all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
-srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
-srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
-base_data=/gscratch/comdata/output
-similarity_data=${base_data}/reddit_similarity
+# srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
+# srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
+srun=srun -p compute-bigmem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40 
+srun_huge=srun -p compute-hugemem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40 
+similarity_data=/gscratch/scrubbed/comdata/reddit_similarity
 tfidf_data=${similarity_data}/tfidf
 tfidf_weekly_data=${similarity_data}/tfidf_weekly
 similarity_weekly_data=${similarity_data}/weekly
@@ -10,7 +12,10 @@ lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500]
 
 lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI
 
-all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather  ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
+
+all: ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather  ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather
+
+#all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather  ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
 
 #${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet  ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet
 
@@ -18,103 +23,106 @@ all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.pa
 
 # all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
 
-${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet
-       ${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
+${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms.parquet
+        ${srun} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
 
 ${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
+        ${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
 
 ${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
+        ${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
 
 ${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200
+        ${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200 --inpath=$<
 
 ${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000
+        ${srun_huge} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000 --inpath=$<
 
 ${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000
+        ${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000 --inpath=$<
 
 ${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000
+        ${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000 --inpath=$<
 
 ${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
+        ${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<
 
 ${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
+        ${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$<
 
-${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000
+${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
+        ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000 --inpath=$<
 
-${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
+${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
+        ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
 
 ${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
+        ${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<
 
 ${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
+        ${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$<
 
 ${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
+        ${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
 
 ${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
+        ${srun} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
 
 ${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
-       ${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
+        ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
+
+${similarity_data}/subreddits_by_num_comments_nonsfw.csv:
+       start_spark_and_run.sh 3 top_subreddits_by_comments.py
 
-${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       mkdir -p ${tfidf_data}/
-       start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather 
+${tfidf_data}/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+#      mkdir -p ${tfidf_data}/
+       start_spark_and_run.sh 3 tfidf.py terms --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_terms_100k.parquet
 
-${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       mkdir -p ${tfidf_data}/
-       start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather
+${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+#      mkdir -p ${tfidf_data}/
+       start_spark_and_run.sh 3 tfidf.py terms --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_terms_30k.feather
 
-${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       mkdir -p ${tfidf_data}/
-       start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather
+${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+#      mkdir -p ${tfidf_data}/
+       start_spark_and_run.sh 3 tfidf.py terms --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_terms_10k.feather
 
-${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       mkdir -p ${tfidf_data}/
-       start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather
+${tfidf_data}/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+#      mkdir -p ${tfidf_data}/
+       start_spark_and_run.sh 3 tfidf.py authors --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_authors_100k.parquet
 
-${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       mkdir -p ${tfidf_data}/
-       start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet
+${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+#      mkdir -p ${tfidf_data}/
+       start_spark_and_run.sh 3 tfidf.py authors --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_authors_10k.parquet
 
-${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       mkdir -p ${tfidf_data}/
-       start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet
+${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+#      mkdir -p ${tfidf_data}/
+       start_spark_and_run.sh 3 tfidf.py authors --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_authors_30k.parquet
 
-${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
+${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+       start_spark_and_run.sh 3 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
 
 ${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv
-       start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
+       start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=100000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
 
-${tfidf_weekly_data}/comment_terms_30k.parquet:  /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
+${tfidf_weekly_data}/comment_terms_30k.parquet:  /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+       start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
 
-${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
-       start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
+${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
+       start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
 
 ${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet
-       ${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
+        ${srun} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_terms_100k.parquet
 
-${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
-       ${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
+${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
+        ${srun} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
 
-${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
-       ${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
+${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
+        ${srun} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
 
-${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
-       ${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
+,${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
+        ${srun} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
 
-# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
+# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv
 #      start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000
 
 # /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet 
diff --git a/similarities/job_script.sh b/similarities/job_script.sh
index 0c37103e2735c8af4a169c75a234ec4f2ea1ed96..1158ff052f4cbef2ca23efb79b98e1778d4ee18f 100755 (executable)
@@ -1,4 +1,4 @@
 #!/usr/bin/bash
 start_spark_cluster.sh
-singularity exec  /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py 
-singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh
+singularity exec  /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 tfidf.py authors --topN=100000 --inpath=/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet --outpath=/gscratch/scrubbed/comdata/reddit_similarity/tfidf/comment_authors_100k.parquet
+singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh
diff --git a/similarities/lsi_similarities.py b/similarities/lsi_similarities.py
index 565e53fa33a9ae8c5af8062ceb892cbca31226c6..57a2d0d6e25fb27d9a083df63b282ac01ecff9e5 100644 (file)
@@ -21,12 +21,13 @@ from functools import partial
 
 def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
     print(n_components,flush=True)
+
         
     if lsi_model is None:
         if type(n_components) == list:
-            lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}s_LSIMOD.pkl'
+            lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}_LSIMOD.pkl'
         else:
-            lsi_model = Path(outfile) / f'{n_components}_{term_colname}s_LSIMOD.pkl'
+            lsi_model = Path(outfile) / f'{n_components}_{term_colname}_LSIMOD.pkl'
 
     simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model)
 
diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py
index 202220c389653de068bc52320c50c249bd18d280..03c10b2310d3984e120eefcc23a6b3d4878bf113 100644 (file)
@@ -43,7 +43,7 @@ def reindex_tfidf(*args, **kwargs):
     new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
     new_ids = new_ids.set_index('subreddit_id')
     subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",axis=1)
+    subreddit_names = subreddit_names.drop("subreddit_id",1)
     subreddit_names = subreddit_names.sort_values("subreddit_id_new")
     return(df, subreddit_names)
 
@@ -51,9 +51,8 @@ def pull_tfidf(*args, **kwargs):
     df, _, _ =  _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
     return df
 
-def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=None, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
-    print(f"loading tfidf {infile}, week {week}, min_df {min_df}, max_df {max_df}", flush=True)
-
+def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
+    print(f"loading tfidf {infile}", flush=True)
     if week is not None:
         tfidf_ds = ds.dataset(infile, partitioning='hive')
     else: 
@@ -95,23 +94,23 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
         projection = {
             'subreddit_id':ds.field('subreddit_id'),
             term_id:ds.field(term_id),
+            'relative_tf':ds.field('relative_tf').cast('float32'),
             'tf_idf':ds.field('tf_idf').cast('float32')}
 
-    print(projection, flush=True)
-    print(ds_filter, flush=True)
+        print(projection)
+
     df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
 
     df = df.to_pandas(split_blocks=True,self_destruct=True)
-
+    print("assigning indexes",flush=True)
     if reindex:
-        print("assigning indexes",flush=True)
-        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + 1
+        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
     else:
         df['subreddit_id_new'] = df['subreddit_id']
 
     if reindex:
         grouped = df.groupby(term_id)
-        df[term_id_new] = grouped.ngroup() + 1 
+        df[term_id_new] = grouped.ngroup()
     else:
         df[term_id_new] = df[term_id]
 
@@ -127,17 +126,17 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
 
     return (df, tfidf_ds, ds_filter)
 
-    with Pool(cpu_count()) as pool:
-        chunks = pool.imap_unordered(pull_names,batches) 
-        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+    with Pool(cpu_count()) as pool:
+        chunks = pool.imap_unordered(pull_names,batches) 
+        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
 
-    subreddit_names = subreddit_names.set_index("subreddit_id")
-    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
-    new_ids = new_ids.set_index('subreddit_id')
-    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",1)
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    return(df, subreddit_names)
+    subreddit_names = subreddit_names.set_index("subreddit_id")
+    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+    new_ids = new_ids.set_index('subreddit_id')
+    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+    subreddit_names = subreddit_names.drop("subreddit_id",1)
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    return(df, subreddit_names)
 
 def pull_names(batch):
     return(batch.to_pandas().drop_duplicates())
@@ -171,7 +170,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
     term_id_new = term + '_id_new'
 
     entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
-    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))
+    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
 
     print("loading matrix")        
 
@@ -239,8 +238,7 @@ def test_lsi_sims():
 # if n_components is a list we'll return a list of similarities with different latent dimensionalities
 # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
 # this function takes the svd and then the column similarities of it
-# lsi_model_load = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
-def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model=None):
+def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
     # first compute the lsi of the matrix
     # then take the column similarities
 
@@ -251,24 +249,29 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
     
     svd_components = n_components[0]
     
-    if lsi_model is None:
+    if lsi_model_load is not None and Path(lsi_model_load).exists():
+        print("loading LSI")
+        mod = pickle.load(open(lsi_model_load ,'rb'))
+        lsi_model_save = lsi_model_load
+
+    else:
         print("running LSI",flush=True)
+
         svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
         mod = svd.fit(tfidfmat.T)
-    else:
-        mod = lsi_model
 
     lsimat = mod.transform(tfidfmat.T)
     if lsi_model_save is not None:
-        Path(lsi_model_save).parent.mkdir(exist_ok=True,parents=True)
+        Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True)
         pickle.dump(mod, open(lsi_model_save,'wb'))
 
-    print(n_components)
+    sims_list = []
     for n_dims in n_components:
-        print("computing similarities")
         sims = column_similarities(lsimat[:,np.arange(n_dims)])
-        yield (sims, n_dims)
-
+        if len(n_components) > 1:
+            yield (sims, n_dims)
+        else:
+            return sims
     
 
 def column_similarities(mat):
@@ -324,11 +327,11 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
     else: # tf_fam = tf_weight.Norm05
         df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)
 
-    df = df.repartition('week')
+    df = df.repartition(400,'subreddit','week')
     dfwriter = df.write.partitionBy("week")
     return dfwriter
 
-def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
+def _calc_tfidf(df, term_colname, tf_family):
     term = term_colname
     term_id = term + '_id'
 
@@ -346,13 +349,7 @@ def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
     idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
 
     # collect the dictionary to make a pydict of terms to indexes
-    terms = idf
-    if min_df is not None:
-        terms = terms.filter(f.col('count')>=min_df)
-    if max_df is not None:
-        terms = terms.filter(f.col('count')<=max_df)
-    
-    terms = terms.select(term).distinct() # terms are distinct
+    terms = idf.select(term).distinct() # terms are distinct
     terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
 
     # make subreddit ids
@@ -362,12 +359,12 @@ def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
     df = df.join(subreddits,on='subreddit')
 
     # map terms to indexes in the tfs and the idfs
-    df = df.join(terms,on=term,how='inner') # subreddit-term-id is unique
+    df = df.join(terms,on=term) # subreddit-term-id is unique
 
-    idf = idf.join(terms,on=term,how='inner')
+    idf = idf.join(terms,on=term)
 
     # join on subreddit/term to create tf/dfs indexed by term
-    df = df.join(idf, on=[term_id, term],how='inner')
+    df = df.join(idf, on=[term_id, term])
 
     # agg terms by subreddit to make sparse tf/df vectors
     if tf_family == tf_weight.MaxTF:
@@ -378,14 +375,14 @@ def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
     return df
     
 
-def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05, min_df=None, max_df=None):
+def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
     term = term_colname
     term_id = term + '_id'
-
+    # aggregate counts by week. now subreddit-term is distinct
     df = df.filter(df.subreddit.isin(include_subs))
     df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
 
-    df = _calc_tfidf(df, term_colname, tf_family, min_df, max_df)
+    df = _calc_tfidf(df, term_colname, tf_family)
     df = df.repartition('subreddit')
     dfwriter = df.write
     return dfwriter
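For context on the similarity step these helpers feed: lsi_column_similarities reduces the sparse tf-idf matrix with a truncated SVD and then compares the reduced subreddit vectors. A rough sketch with random data, substituting scikit-learn's cosine_similarity for the repository's own column_similarities helper:

    import numpy as np
    from scipy.sparse import csr_matrix
    from sklearn.decomposition import TruncatedSVD
    from sklearn.metrics.pairwise import cosine_similarity

    rng = np.random.default_rng(1968)
    tfidf = csr_matrix(rng.random((500, 40)))   # 500 terms x 40 subreddits (made up)

    svd = TruncatedSVD(n_components=10, n_iter=10,
                       algorithm='randomized', random_state=1968)
    lsimat = svd.fit_transform(tfidf.T)         # one 10-dimensional vector per subreddit
    sims = cosine_similarity(lsimat)            # 40 x 40 subreddit similarity matrix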
diff --git a/similarities/tfidf.py b/similarities/tfidf.py
index 33562994afc9d50163c41d46361cc9e092900346..c44fd0ddbf14d49f7c96e9f4be92c03bcd5b4c96 100644 (file)
@@ -78,8 +78,7 @@ def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_
                          static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet",
                          outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
                          topN=None,
-                         included_subreddits=None
-                         ):
+                         included_subreddits=None):
 
     return tfidf_weekly(inpath,
                         outpath,
@@ -94,8 +93,7 @@ def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_te
                        static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
                        outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
                        topN=None,
-                       included_subreddits=None
-                       ):
+                       included_subreddits=None):
 
 
     return tfidf_weekly(inpath,
diff --git a/similarities/top_subreddits_by_comments.py b/similarities/top_subreddits_by_comments.py
index ff9293c209f1f86ecdd0c34a4f282c2cae8eb08c..9a4d7d302e5f4dfef120f61a6a59699b1f1ed480 100644 (file)
@@ -17,7 +17,7 @@ df = df.filter(~df.subreddit.like("u_%"))
 df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
 
 df = df.join(prop_nsfw,on='subreddit')
-#df = df.filter(df.prop_nsfw < 0.5)
+df = df.filter(df.prop_nsfw < 0.5)
 
 win = Window.orderBy(f.col('n_comments').desc())
 df = df.withColumn('comments_rank', f.rank().over(win))
@@ -26,4 +26,4 @@ df = df.toPandas()
 
 df = df.sort_values("n_comments")
 
-df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False)
+df.to_csv('/gscratch/scrubbed/comdata/reddit_similarity/subreddits_by_num_comments_nonsfw.csv', index=False)
