From 6baa08889b2f46c14f2baa5e3d2136cf165b1673 Mon Sep 17 00:00:00 2001
From: Nate E TeBlunthuis
Date: Wed, 11 Nov 2020 16:39:44 -0800
Subject: [PATCH] git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit

---
 author_cosine_similarity.py   | 11 ++++----
 checkpoint_parallelsql.sbatch |  6 ++++-
 comments_2_parquet.sh         |  2 +-
 comments_2_parquet_part1.py   | 51 +++++++++++++++++++++++++----------
 comments_2_parquet_part2.py   |  6 ++---
 helper.py                     |  2 +-
 term_cosine_similarity.py     |  5 ++--
 tf_comments.py                | 24 ++++++++++-------
 tfidf_authors.py              | 10 +++----
 tfidf_comments.py             |  1 +
 10 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/author_cosine_similarity.py b/author_cosine_similarity.py
index 7b2a766..7137da4 100644
--- a/author_cosine_similarity.py
+++ b/author_cosine_similarity.py
@@ -13,7 +13,7 @@ spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
 
 # outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
     '''
     Compute similarities between subreddits based on tfi-idf vectors of author comments
 
@@ -32,9 +32,8 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     '''
 
     print(outfile)
-    print(exclude_phrases)
 
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet_test1/part-00000-107cee94-92d8-4265-b804-40f1e7f1aaf2-c000.snappy.parquet')
+    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
 
     if included_subreddits is None:
         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
@@ -55,12 +54,14 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     sim_dist = sim_dist.repartition(1)
     sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
 
-    spark.stop()
+
 
     #instead of toLocalMatrix() why not read as entries and put strait into numpy
     sim_entries = pd.read_parquet(output_parquet)
 
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
@@ -75,4 +76,4 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     return similarities
 
 if __name__ == '__main__':
-    fire.Fire(term_cosine_similarities)
+    fire.Fire(author_cosine_similarities)
diff --git a/checkpoint_parallelsql.sbatch b/checkpoint_parallelsql.sbatch
index 1975802..dd61e65 100644
--- a/checkpoint_parallelsql.sbatch
+++ b/checkpoint_parallelsql.sbatch
@@ -13,8 +13,12 @@
 #SBATCH --mem=32G
 #SBATCH --cpus-per-task=4
 #SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
 module load parallel_sql
-
+echo $(which perl)
+conda list pyarrow
+which python3
 #Put here commands to load other modules (e.g. matlab etc.)
 #Below command means that parallel_sql will get tasks from the database
 #and run them on the node (in parallel). So a 16 core node will have
diff --git a/comments_2_parquet.sh b/comments_2_parquet.sh
index e9818c1..56ecc4d 100755
--- a/comments_2_parquet.sh
+++ b/comments_2_parquet.sh
@@ -2,7 +2,7 @@
 #!/usr/bin/env bash
 
 echo "#!/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
+#echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 
 srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
diff --git a/comments_2_parquet_part1.py b/comments_2_parquet_part1.py
index faea040..d3c7b7c 100755
--- a/comments_2_parquet_part1.py
+++ b/comments_2_parquet_part1.py
@@ -8,8 +8,6 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 
-globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"
-
 def parse_comment(comment, names= None):
     if names is None:
         names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
@@ -48,15 +46,15 @@
 
 # conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
 
-files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))
+files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
 
 pool = Pool(28)
 
 stream = open_fileset(files)
 
-N = 100000
+N = int(1e4)
 
 rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
 
@@ -80,13 +78,38 @@ schema = pa.schema([
     pa.field('error', pa.string(), nullable=True),
 ])
 
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
+from pathlib import Path
+p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+
+if not p.is_dir():
+    if p.exists():
+        p.unlink()
+    p.mkdir()
+
+else:
+    list(map(Path.unlink,p.glob('*')))
+
+part_size = int(1e7)
+part = 1
+n_output = 0
+writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+while True:
+    if n_output > part_size:
+        if part > 1:
+            writer.close()
+
+        part = part + 1
+        n_output = 0
+
+        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+    n_output += N
+    chunk = islice(rows,N)
+    pddf = pd.DataFrame(chunk, columns=schema.names)
+    table = pa.Table.from_pandas(pddf,schema=schema)
+    if table.shape[0] == 0:
+        break
+    writer.write_table(table)
+
 
-    writer.close()
diff --git a/comments_2_parquet_part2.py b/comments_2_parquet_part2.py
index 62580ac..0d5cc9e 100755
--- a/comments_2_parquet_part2.py
+++ b/comments_2_parquet_part2.py
@@ -7,7 +7,7 @@ from pyspark.sql import SparkSession
 
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -21,9 +21,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.repartition('subreddit')
 df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
 df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
 
 df = df.repartition('author')
 df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
 df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
diff --git a/helper.py b/helper.py
index af87f71..8f1dfe2 100644
--- a/helper.py
+++ b/helper.py
@@ -14,7 +14,7 @@ def find_dumps(dumpdir, base_pattern):
         fname, ext = path.splitext(fpath)
         dumpext[fname].append(ext)
 
-    ext_priority = ['.zst','.xz','.bz2']
+    ext_priority = ['.zst','.xz','.bz2','.gz']
 
     for base, exts in dumpext.items():
         ext = [ext for ext in ext_priority if ext in exts][0]
diff --git a/term_cosine_similarity.py b/term_cosine_similarity.py
index 44af4e6..f4f1c6e 100644
--- a/term_cosine_similarity.py
+++ b/term_cosine_similarity.py
@@ -8,7 +8,7 @@ import pandas as pd
 import fire
 from itertools import islice
 from pathlib import Path
-from similarities_helper import build_cosine_similarities
+from similarities_helper import cosine_similarities
 
 spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
@@ -57,12 +57,11 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
 
     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
 
-    spark.stop()
-
     #instead of toLocalMatrix() why not read as entries and put strait into numpy
     sim_entries = pd.read_parquet(output_parquet)
 
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
diff --git a/tf_comments.py b/tf_comments.py
index cb3b628..526bac2 100755
--- a/tf_comments.py
+++ b/tf_comments.py
@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
@@ -22,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
@@ -31,8 +30,9 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     ngram_output = partition.replace("parquet","txt")
 
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
 
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -167,14 +167,20 @@ def weekly_tf(partition, mwe_pass = 'first'):
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
-
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+
+            if do_break:
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
-
+
     writer.close()
     author_writer.close()
 
diff --git a/tfidf_authors.py b/tfidf_authors.py
index f06a8ce..432ec39 100644
--- a/tfidf_authors.py
+++ b/tfidf_authors.py
@@ -1,19 +1,19 @@
 from pyspark.sql import SparkSession
 from similarities_helper import build_tfidf_dataset
 
-## TODO:need to exclude automoderator / bot posts.
-## TODO:need to exclude better handle hyperlinks.
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet")
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
 
 include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
 include_subs = {s.strip('\n') for s in include_subs}
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
 df = df.filter(df.author != '[deleted]')
 df = df.filter(df.author != 'AutoModerator')
 
 df = build_tfidf_dataset(df, include_subs, 'author')
 
-df.cache()
-
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
diff --git a/tfidf_comments.py b/tfidf_comments.py
index 9e1a437..65d2420 100644
--- a/tfidf_comments.py
+++ b/tfidf_comments.py
@@ -15,3 +15,4 @@ include_subs = {s.strip('\n') for s in include_subs}
 df = build_tfidf_dataset(df, include_subs, 'term')
 
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
+spark.stop()
-- 
2.39.2