From: Nathan TeBlunthuis
Date: Wed, 6 Apr 2022 18:11:11 +0000 (-0700)
Subject: git-annex in
X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/197518a222a321a8027c3dc5a4121350c47d0779?ds=inline;hp=-c

git-annex in
---
 197518a222a321a8027c3dc5a4121350c47d0779

diff --git a/datasets/checkpoint_parallelsql.sbatch b/datasets/checkpoint_parallelsql.sbatch
deleted file mode 100644
index dd61e65..0000000
--- a/datasets/checkpoint_parallelsql.sbatch
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-## parallel_sql_job.sh
-#SBATCH --job-name=tf_subreddit_comments
-## Allocation Definition
-#SBATCH --account=comdata-ckpt
-#SBATCH --partition=ckpt
-## Resources
-## Nodes. This should always be 1 for parallel-sql.
-#SBATCH --nodes=1
-## Walltime (12 hours)
-#SBATCH --time=12:00:00
-## Memory per node
-#SBATCH --mem=32G
-#SBATCH --cpus-per-task=4
-#SBATCH --ntasks=1
-#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
-source ./bin/activate
-module load parallel_sql
-echo $(which perl)
-conda list pyarrow
-which python3
-#Put here commands to load other modules (e.g. matlab etc.)
-#Below command means that parallel_sql will get tasks from the database
-#and run them on the node (in parallel). So a 16 core node will have
-#16 tasks running at one time.
-parallel-sql --sql -a parallel --exit-on-term --jobs 4
diff --git a/datasets/comments_2_parquet.sh b/datasets/comments_2_parquet.sh
index 56ecc4d..d61eb65 100755
--- a/datasets/comments_2_parquet.sh
+++ b/datasets/comments_2_parquet.sh
@@ -1,10 +1,10 @@
+#!/usr/bin/env bash
 ## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
-#!/usr/bin/env bash
 
 echo "#!/usr/bin/bash" > job_script.sh
 #echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 
-srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
+srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 --pty job_script.sh
 
 start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py
diff --git a/datasets/comments_2_parquet_part1.py b/datasets/comments_2_parquet_part1.py
index d3c7b7c..6960986 100755
--- a/datasets/comments_2_parquet_part1.py
+++ b/datasets/comments_2_parquet_part1.py
@@ -1,12 +1,15 @@
 #!/usr/bin/env python3
+import os
 import json
 from datetime import datetime
 from multiprocessing import Pool
 from itertools import islice
-from helper import find_dumps, open_fileset
+from helper import open_input_file, find_dumps
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+from pathlib import Path
+import fire
 
 def parse_comment(comment, names= None):
     if names is None:
@@ -46,70 +49,63 @@ def parse_comment(comment, names= None):
 # conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
-
-files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
-
-pool = Pool(28)
-
-stream = open_fileset(files)
-
-N = int(1e4)
-
-rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
-
-schema = pa.schema([
-    pa.field('id', pa.string(), nullable=True),
-    pa.field('subreddit', pa.string(), nullable=True),
-    pa.field('link_id', pa.string(), nullable=True),
-    pa.field('parent_id', pa.string(), nullable=True),
-
pa.field('created_utc', pa.timestamp('ms'), nullable=True), - pa.field('author', pa.string(), nullable=True), - pa.field('ups', pa.int64(), nullable=True), - pa.field('downs', pa.int64(), nullable=True), - pa.field('score', pa.int64(), nullable=True), - pa.field('edited', pa.bool_(), nullable=True), - pa.field('time_edited', pa.timestamp('ms'), nullable=True), - pa.field('subreddit_type', pa.string(), nullable=True), - pa.field('subreddit_id', pa.string(), nullable=True), - pa.field('stickied', pa.bool_(), nullable=True), - pa.field('is_submitter', pa.bool_(), nullable=True), - pa.field('body', pa.string(), nullable=True), - pa.field('error', pa.string(), nullable=True), -]) - -from pathlib import Path -p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2") - -if not p.is_dir(): - if p.exists(): - p.unlink() - p.mkdir() - -else: - list(map(Path.unlink,p.glob('*'))) - -part_size = int(1e7) -part = 1 -n_output = 0 -writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark') - -while True: - if n_output > part_size: - if part > 1: - writer.close() - - part = part + 1 - n_output = 0 - - writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark') - - n_output += N - chunk = islice(rows,N) - pddf = pd.DataFrame(chunk, columns=schema.names) - table = pa.Table.from_pandas(pddf,schema=schema) - if table.shape[0] == 0: - break - writer.write_table(table) - +def parse_dump(partition): + + dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}" + + stream = open_input_file(dumpdir) + rows = map(parse_comment, stream) + + schema = pa.schema([ + pa.field('id', pa.string(), nullable=True), + pa.field('subreddit', pa.string(), nullable=True), + pa.field('link_id', pa.string(), nullable=True), + pa.field('parent_id', pa.string(), nullable=True), + pa.field('created_utc', pa.timestamp('ms'), nullable=True), + pa.field('author', pa.string(), nullable=True), + pa.field('ups', pa.int64(), nullable=True), + pa.field('downs', pa.int64(), nullable=True), + pa.field('score', pa.int64(), nullable=True), + pa.field('edited', pa.bool_(), nullable=True), + pa.field('time_edited', pa.timestamp('ms'), nullable=True), + pa.field('subreddit_type', pa.string(), nullable=True), + pa.field('subreddit_id', pa.string(), nullable=True), + pa.field('stickied', pa.bool_(), nullable=True), + pa.field('is_submitter', pa.bool_(), nullable=True), + pa.field('body', pa.string(), nullable=True), + pa.field('error', pa.string(), nullable=True), + ]) + + p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet") + p.mkdir(exist_ok=True,parents=True) + + N=10000 + with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet", + schema=schema, + compression='snappy', + flavor='spark') as writer: + + while True: + chunk = islice(rows,N) + pddf = pd.DataFrame(chunk, columns=schema.names) + table = pa.Table.from_pandas(pddf,schema=schema) + if table.shape[0] == 0: + break + writer.write_table(table) + + writer.close() + + +def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True): + files = list(find_dumps(dumpdir,base_pattern="RC_20*.*")) + with open("comments_task_list.sh",'w') as of: + for fpath in files: + partition = os.path.split(fpath)[1] + if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) 
or (overwrite is True): + of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n') + + +if __name__ == '__main__': + fire.Fire({'parse_dump':parse_dump, + 'gen_task_list':gen_task_list}) diff --git a/datasets/comments_2_parquet_part2.py b/datasets/comments_2_parquet_part2.py index 0d5cc9e..1031c68 100755 --- a/datasets/comments_2_parquet_part2.py +++ b/datasets/comments_2_parquet_part2.py @@ -2,12 +2,19 @@ # spark script to make sorted, and partitioned parquet files +import pyspark from pyspark.sql import functions as f from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() -df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy') +conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet") +conf = conf.set("spark.sql.shuffle.partitions",2000) +conf = conf.set('spark.sql.crossJoin.enabled',"true") +conf = conf.set('spark.debug.maxToStringFields',200) +sc = spark.sparkContext + +df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_comments.parquet",compression='snappy') df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) df = df.drop('subreddit') @@ -21,9 +28,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt"))) df = df.repartition('subreddit') df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True) df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True) -df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy') +df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy') df = df.repartition('author') df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True) df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True) -df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy') +df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy') diff --git a/datasets/helper.py b/datasets/helper.py index 8f1dfe2..db28628 100644 --- a/datasets/helper.py +++ b/datasets/helper.py @@ -24,8 +24,7 @@ def open_fileset(files): for fh in files: print(fh) lines = open_input_file(fh) - for line in lines: - yield line + yield from lines def open_input_file(input_filename): if re.match(r'.*\.7z$', input_filename): @@ -39,7 +38,7 @@ def open_input_file(input_filename): elif re.match(r'.*\.xz', input_filename): cmd = ["xzcat",'-dk', '-T 20',input_filename] elif re.match(r'.*\.zst',input_filename): - cmd = ['zstd','-dck', input_filename] + cmd = ['/kloneusr/bin/zstd','-dck', input_filename, '--memory=2048MB --stdout'] elif re.match(r'.*\.gz',input_filename): cmd = ['gzip','-dc', input_filename] try: diff --git a/datasets/job_script.sh b/datasets/job_script.sh index d90b618..5b5a7d3 100755 --- a/datasets/job_script.sh +++ b/datasets/job_script.sh @@ -1,4 +1,4 @@ #!/usr/bin/bash start_spark_cluster.sh -spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000 -stop-all.sh +singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master 
spark://$(hostname):7077 comments_2_parquet_part2.py +singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh diff --git a/datasets/submissions_2_parquet.sh b/datasets/submissions_2_parquet.sh old mode 100644 new mode 100755 index f133069..81a5753 --- a/datasets/submissions_2_parquet.sh +++ b/datasets/submissions_2_parquet.sh @@ -1,8 +1,8 @@ +#!/usr/bin/env bash ## this should be run manually since we don't have a nice way to wait on parallel_sql jobs -#!/usr/bin/env bash -./parse_submissions.sh +srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 $(pwd)/submissions_2_parquet_part1.py gen_task_list start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py diff --git a/datasets/submissions_2_parquet_part1.py b/datasets/submissions_2_parquet_part1.py index 16d1988..77ae09f 100755 --- a/datasets/submissions_2_parquet_part1.py +++ b/datasets/submissions_2_parquet_part1.py @@ -3,26 +3,23 @@ # two stages: # 1. from gz to arrow parquet (this script) # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py) - from datetime import datetime -from multiprocessing import Pool +from pathlib import Path from itertools import islice from helper import find_dumps, open_fileset import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -import simdjson import fire import os - -parser = simdjson.Parser() +import json def parse_submission(post, names = None): if names is None: names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error'] try: - post = parser.parse(post) + post = json.loads(post) except (ValueError) as e: # print(e) # print(post) @@ -92,8 +89,7 @@ def parse_dump(partition): pa.field('quarantine',pa.bool_(),nullable=True), pa.field('error',pa.string(),nullable=True)]) - if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"): - os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/") + Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True) with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer: while True: @@ -108,7 +104,7 @@ def parse_dump(partition): def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"): files = list(find_dumps(dumpdir,base_pattern="RS_20*.*")) - with open("parse_submissions_task_list",'w') as of: + with open("submissions_task_list.sh",'w') as of: for fpath in files: partition = os.path.split(fpath)[1] of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n') diff --git a/dumps/check_comments_shas.py b/dumps/check_comments_shas.py index dd428be..e59a7b8 100755 --- a/dumps/check_comments_shas.py +++ b/dumps/check_comments_shas.py @@ -8,7 +8,7 @@ import hashlib shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text #shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text -shasums = shasums1 + shasums2 +shasums = shasums1 dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments" for l in shasums.strip().split('\n'): diff --git a/ngrams/run_tf_jobs.sh b/ngrams/run_tf_jobs.sh index 0e7d5dd..9ff590f 100755 --- a/ngrams/run_tf_jobs.sh +++ 
b/ngrams/run_tf_jobs.sh @@ -1,8 +1,6 @@ #!/usr/bin/env bash -module load parallel_sql + source ./bin/activate python3 tf_comments.py gen_task_list -psu --del --Y -cat tf_task_list | psu --load for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done; diff --git a/ngrams/sort_tf_comments.py b/ngrams/sort_tf_comments.py index abb097e..d9c3e2c 100644 --- a/ngrams/sort_tf_comments.py +++ b/ngrams/sort_tf_comments.py @@ -2,12 +2,17 @@ from pyspark.sql import functions as f from pyspark.sql import SparkSession +import fire -spark = SparkSession.builder.getOrCreate() -df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/") +def main(inparquet, outparquet, colname): + spark = SparkSession.builder.getOrCreate() + df = spark.read.parquet(inparquet) -df = df.repartition(2000,'term') -df = df.sort(['term','week','subreddit']) -df = df.sortWithinPartitions(['term','week','subreddit']) + df = df.repartition(2000,colname) + df = df.sort([colname,'week','subreddit']) + df = df.sortWithinPartitions([colname,'week','subreddit']) -df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy') + df.write.parquet(outparquet,mode='overwrite',compression='snappy') + +if __name__ == '__main__': + fire.Fire(main) diff --git a/ngrams/tf_comments.py b/ngrams/tf_comments.py index a40e5d9..f472eeb 100755 --- a/ngrams/tf_comments.py +++ b/ngrams/tf_comments.py @@ -14,21 +14,29 @@ from nltk.util import ngrams import string from random import random from redditcleaner import clean +from pathlib import Path # compute term frequencies for comments in each subreddit by week -def weekly_tf(partition, mwe_pass = 'first'): - dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet') - if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"): - os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/") +def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None): - if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"): - os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/") + dataset = ds.dataset(Path(input_dir)/partition, format='parquet') + outputdir = Path(outputdir) + samppath = outputdir / "reddit_comment_ngrams_10p_sample" + + if not samppath.exists(): + samppath.mkdir(parents=True, exist_ok=True) ngram_output = partition.replace("parquet","txt") + if excluded_users is not None: + excluded_users = set(map(str.strip,open(excluded_users))) + df = df.filter(~ (f.col("author").isin(excluded_users))) + + + ngram_path = samppath / ngram_output if mwe_pass == 'first': - if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"): - os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}") + if ngram_path.exists(): + ngram_path.unlink() batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author']) @@ -62,8 +70,10 @@ def weekly_tf(partition, mwe_pass = 'first'): subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week)) + mwe_path = outputdir / "multiword_expressions.feather" + if mwe_pass != 'first': - mwe_dataset = 
pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather') + mwe_dataset = pd.read_feather(mwe_path) mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False) mwe_phrases = list(mwe_dataset.phrase) mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases] @@ -115,7 +125,7 @@ def weekly_tf(partition, mwe_pass = 'first'): for sentence in sentences: if random() <= 0.1: grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4)))) - with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file: + with open(ngram_path,'a') as gram_file: for ng in grams: gram_file.write(' '.join(ng) + '\n') for token in sentence: @@ -149,8 +159,15 @@ def weekly_tf(partition, mwe_pass = 'first'): outrows = tf_comments(subreddit_weeks) outchunksize = 10000 - - with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer: + + termtf_outputdir = (outputdir / "comment_terms") + termtf_outputdir.mkdir(parents=True, exist_ok=True) + authortf_outputdir = (outputdir / "comment_authors") + authortf_outputdir.mkdir(parents=True, exist_ok=True) + termtf_path = termtf_outputdir / partition + authortf_path = authortf_outputdir / partition + with pq.ParquetWriter(termtf_path, schema=schema, compression='snappy', flavor='spark') as writer, \ + pq.ParquetWriter(authortf_path, schema=author_schema, compression='snappy', flavor='spark') as author_writer: while True: @@ -179,12 +196,12 @@ def weekly_tf(partition, mwe_pass = 'first'): author_writer.close() -def gen_task_list(mwe_pass='first'): +def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None): files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/") - with open("tf_task_list",'w') as outfile: + with open(tf_task_list,'w') as outfile: for f in files: if f.endswith(".parquet"): - outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n") + outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n") if __name__ == "__main__": fire.Fire({"gen_task_list":gen_task_list, diff --git a/ngrams/top_comment_phrases.py b/ngrams/top_comment_phrases.py old mode 100644 new mode 100755 index 031cba5..ff1c4f0 --- a/ngrams/top_comment_phrases.py +++ b/ngrams/top_comment_phrases.py @@ -1,58 +1,69 @@ +#!/usr/bin/env python3 from pyspark.sql import functions as f from pyspark.sql import Window from pyspark.sql import SparkSession import numpy as np +import fire +from pathlib import Path -spark = SparkSession.builder.getOrCreate() -df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/") -df = df.withColumnRenamed("value","phrase") +def main(ngram_dir="/gscratch/comdata/output/reddit_ngrams"): + spark = SparkSession.builder.getOrCreate() + ngram_dir = Path(ngram_dir) + ngram_sample = ngram_dir / "reddit_comment_ngrams_10p_sample" + df = spark.read.text(str(ngram_sample)) -# count phrase occurrances -phrases = df.groupby('phrase').count() -phrases = phrases.withColumnRenamed('count','phraseCount') -phrases = phrases.filter(phrases.phraseCount > 10) + df = df.withColumnRenamed("value","phrase") + # count 
phrase occurrances + phrases = df.groupby('phrase').count() + phrases = phrases.withColumnRenamed('count','phraseCount') + phrases = phrases.filter(phrases.phraseCount > 10) -# count overall -N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount + # count overall + N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount -print(f'analyzing PMI on a sample of {N} phrases') -logN = np.log(N) -phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN) + print(f'analyzing PMI on a sample of {N} phrases') + logN = np.log(N) + phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN) -# count term occurrances -phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' ')) -terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')]) + # count term occurrances + phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' ')) + terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')]) -win = Window.partitionBy('term') -terms = terms.withColumn('termCount',f.sum('phraseCount').over(win)) -terms = terms.withColumnRenamed('count','termCount') -terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN) + win = Window.partitionBy('term') + terms = terms.withColumn('termCount',f.sum('phraseCount').over(win)) + terms = terms.withColumnRenamed('count','termCount') + terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN) -terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb') -terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb') -terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb')) + terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb') + terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb') + terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb')) -# join phrases to term counts + # join phrases to term counts -df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI']) + df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI']) -df = df.sort(['phrasePWMI'],descending=True) -df = df.sortWithinPartitions(['phrasePWMI'],descending=True) -df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy') + df = df.sort(['phrasePWMI'],descending=True) + df = df.sortWithinPartitions(['phrasePWMI'],descending=True) -df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/") + pwmi_dir = ngram_dir / "reddit_comment_ngrams_pwmi.parquet/" + df.write.parquet(str(pwmi_dir), mode='overwrite', compression='snappy') -df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none') + df = spark.read.parquet(str(pwmi_dir)) -df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet") -df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI') + df.write.csv(str(ngram_dir / "reddit_comment_ngrams_pwmi.csv/"),mode='overwrite',compression='none') -# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions. 
-# -df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3) -df = df.toPandas() -df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather") -df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv") + df = spark.read.parquet(str(pwmi_dir)) + df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI') + + # choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions. + # + df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3) + df = df.toPandas() + df.to_feather(ngram_dir / "multiword_expressions.feather") + df.to_csv(ngram_dir / "multiword_expressions.csv") + +if __name__ == '__main__': + fire.Fire(main) diff --git a/similarities/Makefile b/similarities/Makefile index f578fd5..963192d 100644 --- a/similarities/Makefile +++ b/similarities/Makefile @@ -1,8 +1,10 @@ + #all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet -srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh -srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh -base_data=/gscratch/comdata/output -similarity_data=${base_data}/reddit_similarity +# srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh +# srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh +srun=srun -p compute-bigmem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40 +srun_huge=srun -p compute-hugemem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40 +similarity_data=/gscratch/scrubbed/comdata/reddit_similarity tfidf_data=${similarity_data}/tfidf tfidf_weekly_data=${similarity_data}/tfidf_weekly similarity_weekly_data=${similarity_data}/weekly @@ -10,7 +12,10 @@ lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500] lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI -all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet + +all: ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_30k.parquet 
${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather + +#all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet #${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet @@ -18,103 +23,106 @@ all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.pa # all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet -${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet - ${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet +${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms.parquet + ${srun} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet ${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000 + ${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000 ${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py - ${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200 + ${srun_huge} python3 lsi_similarities.py 
term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200 ${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py - ${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200 + ${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200 --inpath=$< ${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000 + ${srun_huge} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000 --inpath=$< ${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000 + ${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000 --inpath=$< ${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000 + ${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000 --inpath=$< ${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py - ${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2 + ${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$< ${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py - ${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2 + ${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$< -${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000 +${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py + ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000 --inpath=$< -${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py author-tf 
--outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000 +${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py + ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000 ${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py - ${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2 + ${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$< ${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py - ${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2 + ${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$< ${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000 + ${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000 ${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000 + ${srun} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000 ${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py - ${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000 + ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000 + +${similarity_data}/subreddits_by_num_comments_nonsfw.csv: + start_spark_and_run.sh 3 top_subreddits_by_comments.py -${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv - mkdir -p ${tfidf_data}/ - start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather +${tfidf_data}/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv +# mkdir -p ${tfidf_data}/ + start_spark_and_run.sh 3 tfidf.py terms --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_terms_100k.parquet -${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv - mkdir -p ${tfidf_data}/ - start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather 
+${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv +# mkdir -p ${tfidf_data}/ + start_spark_and_run.sh 3 tfidf.py terms --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_terms_30k.feather -${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv - mkdir -p ${tfidf_data}/ - start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather +${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv +# mkdir -p ${tfidf_data}/ + start_spark_and_run.sh 3 tfidf.py terms --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_terms_10k.feather -${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv - mkdir -p ${tfidf_data}/ - start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather +${tfidf_data}/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv +# mkdir -p ${tfidf_data}/ + start_spark_and_run.sh 3 tfidf.py authors --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_authors_100k.parquet -${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv - mkdir -p ${tfidf_data}/ - start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet +${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv +# mkdir -p ${tfidf_data}/ + start_spark_and_run.sh 3 tfidf.py authors --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_authors_10k.parquet -${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv - mkdir -p ${tfidf_data}/ - start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet +${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv +# mkdir -p ${tfidf_data}/ + start_spark_and_run.sh 3 tfidf.py authors --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_authors_30k.parquet -${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv - start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet +${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv + start_spark_and_run.sh 3 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet ${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv - start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 
--outpath=${tfidf_weekly_data}/comment_authors_100k.parquet + start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=100000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet -${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv - start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet +${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv + start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet -${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv - start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet +${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv + start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet - ${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet + ${srun} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_terms_100k.parquet -${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet - ${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet +${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_100k.parquet + ${srun} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet -${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet - ${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet +${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms_30k.parquet + ${srun} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet -${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py 
/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet - ${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet +,${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_30k.parquet + ${srun} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet -# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv +# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv # start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000 # /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet diff --git a/similarities/job_script.sh b/similarities/job_script.sh index 0c37103..1158ff0 100755 --- a/similarities/job_script.sh +++ b/similarities/job_script.sh @@ -1,4 +1,4 @@ #!/usr/bin/bash start_spark_cluster.sh -singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py -singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh +singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 tfidf.py authors --topN=100000 --inpath=/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet --outpath=/gscratch/scrubbed/comdata/reddit_similarity/tfidf/comment_authors_100k.parquet +singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh diff --git a/similarities/lsi_similarities.py b/similarities/lsi_similarities.py index eb89f55..493755f 100644 --- a/similarities/lsi_similarities.py +++ b/similarities/lsi_similarities.py @@ -5,19 +5,20 @@ from similarities_helper import * #from similarities_helper import similarities, lsi_column_similarities from functools import partial -inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/" -term_colname='term' -outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI' -n_components=[10,50,100] -included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt" -n_iter=5 -random_state=1968 -algorithm='arpack' -topN = None -from_date=None -to_date=None -min_df=None -max_df=None +# inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/" +# term_colname='term' +# outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI' +# n_components=[10,50,100] +# 
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt" +# n_iter=5 +# random_state=1968 +# algorithm='arpack' +# topN = None +# from_date=None +# to_date=None +# min_df=None +# max_df=None + def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None): print(n_components,flush=True) @@ -62,7 +63,7 @@ def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/t n_components=n_components ) -def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968): +def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968): return lsi_similarities(inpath, 'author', outfile, diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py index d97e519..03c10b2 100644 --- a/similarities/similarities_helper.py +++ b/similarities/similarities_helper.py @@ -262,6 +262,7 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196 lsimat = mod.transform(tfidfmat.T) if lsi_model_save is not None: + Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True) pickle.dump(mod, open(lsi_model_save,'wb')) sims_list = [] diff --git a/similarities/tfidf.py b/similarities/tfidf.py index 01b0b20..bbae528 100644 --- a/similarities/tfidf.py +++ b/similarities/tfidf.py @@ -4,7 +4,7 @@ from pyspark.sql import functions as f from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits): - spark = SparkSession.builder.getOrCreate()y + spark = SparkSession.builder.getOrCreate() df = spark.read.parquet(inpath) diff --git a/similarities/top_subreddits_by_comments.py b/similarities/top_subreddits_by_comments.py index ff9293c..9a4d7d3 100644 --- a/similarities/top_subreddits_by_comments.py +++ b/similarities/top_subreddits_by_comments.py @@ -17,7 +17,7 @@ df = df.filter(~df.subreddit.like("u_%")) df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments")) df = df.join(prop_nsfw,on='subreddit') -#df = df.filter(df.prop_nsfw < 0.5) +df = df.filter(df.prop_nsfw < 0.5) win = Window.orderBy(f.col('n_comments').desc()) df = df.withColumn('comments_rank', f.rank().over(win)) @@ -26,4 +26,4 @@ df = df.toPandas() df = df.sort_values("n_comments") -df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False) +df.to_csv('/gscratch/scrubbed/comdata/reddit_similarity/subreddits_by_num_comments_nonsfw.csv', index=False)
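
The parse_dump routines in comments_2_parquet_part1.py and submissions_2_parquet_part1.py keep memory bounded by slicing the parsed-row iterator into fixed-size chunks and appending each chunk to a single ParquetWriter. A minimal sketch of that pattern, assuming a generic iterator of row dicts and a toy two-column schema (the column names and output path here are illustrative, not the repository's):

from itertools import islice
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Toy stand-in for the parsed-comment generator used in the real scripts.
rows = ({"id": str(i), "body": "some text"} for i in range(25_000))

schema = pa.schema([
    pa.field("id", pa.string(), nullable=True),
    pa.field("body", pa.string(), nullable=True),
])

CHUNK = 10_000  # rows flushed per write; keeps memory roughly constant

with pq.ParquetWriter("example_chunked.parquet", schema=schema,
                      compression="snappy", flavor="spark") as writer:
    while True:
        chunk = list(islice(rows, CHUNK))
        if not chunk:  # iterator exhausted
            break
        pddf = pd.DataFrame(chunk, columns=schema.names)
        writer.write_table(pa.Table.from_pandas(pddf, schema=schema))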
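
For reference, the phrase scoring implemented in ngrams/top_comment_phrases.py is pointwise mutual information: log P(phrase) minus the sum of log P(term) over the phrase's terms, with probabilities estimated from the 10% ngram sample. A minimal plain-Python sketch of the same calculation, using made-up counts rather than the Spark pipeline above:

import math

# Hypothetical phrase counts standing in for the 10% comment-ngram sample.
phrase_counts = {
    ("los", "angeles"): 5000,
    ("the", "angeles"): 12,
    ("the", "game"): 9000,
}

N = sum(phrase_counts.values())
log_N = math.log(N)

# Term counts mirror the window-sum over 'term' in top_comment_phrases.py:
# each term's count is the summed count of the phrases containing it.
term_counts = {}
for phrase, count in phrase_counts.items():
    for term in phrase:
        term_counts[term] = term_counts.get(term, 0) + count

def phrase_pwmi(phrase):
    # log P(phrase) - sum_i log P(term_i); large values mean the terms
    # co-occur far more often than independence would predict.
    phrase_log_prob = math.log(phrase_counts[phrase]) - log_N
    terms_log_prob = sum(math.log(term_counts[t]) - log_N for t in phrase)
    return phrase_log_prob - terms_log_prob

for phrase in phrase_counts:
    print(" ".join(phrase), round(phrase_pwmi(phrase), 2))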