code.communitydata.science - cdsc_reddit.git/commitdiff
git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit
author: Nate E TeBlunthuis <nathante@mox2.hyak.local>
Thu, 12 Nov 2020 00:39:44 +0000 (16:39 -0800)
committer: Nate E TeBlunthuis <nathante@mox2.hyak.local>
Thu, 12 Nov 2020 00:39:44 +0000 (16:39 -0800)
author_cosine_similarity.py
checkpoint_parallelsql.sbatch
comments_2_parquet.sh
comments_2_parquet_part1.py
comments_2_parquet_part2.py
helper.py
term_cosine_similarity.py
tf_comments.py
tfidf_authors.py
tfidf_comments.py

index 7b2a766d24461269061a244f0350725ef8a15936..7137da478008e26c2f05fc134f336973e94bb35a 100644 (file)
@@ -13,7 +13,7 @@ spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
 
 # outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
     '''
     Compute similarities between subreddits based on tf-idf vectors of author comments
     
@@ -32,9 +32,8 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
 '''
 
     print(outfile)
-    print(exclude_phrases)
 
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet_test1/part-00000-107cee94-92d8-4265-b804-40f1e7f1aaf2-c000.snappy.parquet')
+    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
 
     if included_subreddits is None:
         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
@@ -55,12 +54,14 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     sim_dist = sim_dist.repartition(1)
     sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
     
-    spark.stop()
+
 
     #instead of toLocalMatrix() why not read as entries and put straight into numpy
     sim_entries = pd.read_parquet(output_parquet)
 
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
@@ -75,4 +76,4 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     return similarities
     
 if __name__ == '__main__':
-    fire.Fire(term_cosine_similarities)
+    fire.Fire(author_cosine_similarities)
index 1975802daa4b26745d5e917fe6acde46a9303de6..dd61e65c3a0d90e9791e55bdbd5b6df9b623b3f8 100644 (file)
 #SBATCH --mem=32G
 #SBATCH --cpus-per-task=4
 #SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
 module load parallel_sql
-
+echo $(which perl)
+conda list pyarrow
+which python3
 #Put here commands to load other modules (e.g. matlab etc.)
 #Below command means that parallel_sql will get tasks from the database
 #and run them on the node (in parallel). So a 16 core node will have
index e9818c19cfcd6c7f29f2c7bcc04b564b5471e3e6..56ecc4d806ebc5e8017d6a2ee237ff4610a53d08 100755 (executable)
@@ -2,7 +2,7 @@
 
 #!/usr/bin/env bash
 echo "#!/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
+#echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 
 srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
index faea040402a30f66c147dd034ed70ed7e4ac1e86..d3c7b7cfef3d2d127fd92b5faef98f616210a11a 100755 (executable)
@@ -8,8 +8,6 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 
-globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"
-
 def parse_comment(comment, names= None):
     if names is None:
         names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
@@ -48,15 +46,15 @@ def parse_comment(comment, names= None):
 
 #    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
 
-files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))
+files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
 
 pool = Pool(28)
 
 stream = open_fileset(files)
 
-N = 100000
+N = int(1e4)
 
 rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
 
@@ -80,13 +78,38 @@ schema = pa.schema([
     pa.field('error', pa.string(), nullable=True),
 ])
 
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
+from pathlib import Path
+p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+
+if not p.is_dir():
+    if p.exists():
+        p.unlink()
+    p.mkdir()
+
+else:
+    list(map(Path.unlink,p.glob('*')))
+
+part_size = int(1e7)
+part = 1
+n_output = 0
+writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+while True:
+    if n_output > part_size:
+        if part > 1:
+            writer.close()
+
+        part = part + 1
+        n_output = 0
+    
+        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+    n_output += N
+    chunk = islice(rows,N)
+    pddf = pd.DataFrame(chunk, columns=schema.names)
+    table = pa.Table.from_pandas(pddf,schema=schema)
+    if table.shape[0] == 0:
+        break
+    writer.write_table(table)
+
 
-    writer.close()
index 62580acf605e92c39a4f93c5f15f272d612b8e41..0d5cc9edf10df5f19104032b421096014143a233 100755 (executable)
@@ -7,7 +7,7 @@ from pyspark.sql import SparkSession
 
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -21,9 +21,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.repartition('subreddit')
 df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
 df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
 
 df = df.repartition('author')
 df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
 df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
index af87f71d265501f1f3ca25ecb64882f4ff9997da..8f1dfe2e79bdd3ad29018e34adde7a30e2cac823 100644 (file)
--- a/helper.py
+++ b/helper.py
@@ -14,7 +14,7 @@ def find_dumps(dumpdir, base_pattern):
         fname, ext = path.splitext(fpath)
         dumpext[fname].append(ext)
 
-    ext_priority = ['.zst','.xz','.bz2']
+    ext_priority = ['.zst','.xz','.bz2','.gz']
 
     for base, exts in dumpext.items():
         ext = [ext for ext in ext_priority if ext in exts][0]
index 44af4e606082185611b077100b7ab24cd7397e1a..f4f1c6edf76e33bbb41fc74a1de207a8390dca9e 100644 (file)
@@ -8,7 +8,7 @@ import pandas as pd
 import fire
 from itertools import islice
 from pathlib import Path
-from similarities_helper import build_cosine_similarities
+from similarities_helper import cosine_similarities
 
 spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
@@ -57,12 +57,11 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
 
     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
     
-    spark.stop()
-
     #instead of toLocalMatrix() why not read as entries and put straight into numpy
     sim_entries = pd.read_parquet(output_parquet)
 
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
index cb3b6288c37079b20f5355cbdf54d28bf1f29495..526bac2bdabe284ec9550bfe290cadb13e438a1b 100755 (executable)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
@@ -22,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
@@ -31,8 +30,9 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     ngram_output = partition.replace("parquet","txt")
 
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -167,14 +167,20 @@ def weekly_tf(partition, mwe_pass = 'first'):
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
-
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+
+            if do_break:
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
-            
+
         writer.close()
         author_writer.close()
 
index f06a8ce72f4ec995d4dcaa8a2d4abf42b0d447df..432ec39b764cf1a4de1edfba82a35dab01dfd393 100644 (file)
@@ -1,19 +1,19 @@
 from pyspark.sql import SparkSession
 from similarities_helper import build_tfidf_dataset
 
-## TODO:need to exclude automoderator / bot posts.
-## TODO:need to exclude better handle hyperlinks. 
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet")
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
 
 include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
 include_subs = {s.strip('\n') for s in include_subs}
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
 df = df.filter(df.author != '[deleted]')
 df = df.filter(df.author != 'AutoModerator')
 
 df = build_tfidf_dataset(df, include_subs, 'author')
 
-df.cache()
-
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
index 9e1a437d417df955b6ea5f84c9f5c9b2b87b2136..65d24204446d09ecf66afa30b23f3e118f51a2de 100644 (file)
@@ -15,3 +15,4 @@ include_subs = {s.strip('\n') for s in include_subs}
 df = build_tfidf_dataset(df, include_subs, 'term')
 
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
+spark.stop()

Community Data Science Collective || Want to submit a patch?