code.communitydata.science - cdsc_reddit.git/commitdiff
git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit
author Nate E TeBlunthuis <nathante@mox2.hyak.local>
Thu, 12 Nov 2020 00:39:44 +0000 (16:39 -0800)
committer Nate E TeBlunthuis <nathante@mox2.hyak.local>
Thu, 12 Nov 2020 00:39:44 +0000 (16:39 -0800)
author_cosine_similarity.py
checkpoint_parallelsql.sbatch
comments_2_parquet.sh
comments_2_parquet_part1.py
comments_2_parquet_part2.py
helper.py
term_cosine_similarity.py
tf_comments.py
tfidf_authors.py
tfidf_comments.py

index 7b2a766d24461269061a244f0350725ef8a15936..7137da478008e26c2f05fc134f336973e94bb35a 100644 (file)
--- a/author_cosine_similarity.py
+++ b/author_cosine_similarity.py
@@ -13,7 +13,7 @@ spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
 
 # outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
     '''
     Compute similarities between subreddits based on tfi-idf vectors of author comments
     
@@ -32,9 +32,8 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
 '''
 
     print(outfile)
-    print(exclude_phrases)
 
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet_test1/part-00000-107cee94-92d8-4265-b804-40f1e7f1aaf2-c000.snappy.parquet')
+    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
 
     if included_subreddits is None:
         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
@@ -55,12 +54,14 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     sim_dist = sim_dist.repartition(1)
     sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
     
-    spark.stop()
+
 
     #instead of toLocalMatrix() why not read as entries and put strait into numpy
     sim_entries = pd.read_parquet(output_parquet)
 
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
@@ -75,4 +76,4 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     return similarities
     
 if __name__ == '__main__':
-    fire.Fire(term_cosine_similarities)
+    fire.Fire(author_cosine_similarities)
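
The in-line comment above ("instead of toLocalMatrix() why not read as entries and put straight into numpy") can be illustrated with a small sketch. This is not part of the commit; the column names i, j, value are an assumption about the entries parquet that columnSimilarities() writes, and the function name is hypothetical.

# Hypothetical sketch: load the similarity entries written above and densify them in numpy.
import numpy as np
import pandas as pd

def entries_to_numpy(entries_parquet, n_subreddits):
    entries = pd.read_parquet(entries_parquet)   # assumed columns: i, j, value
    sims = np.zeros((n_subreddits, n_subreddits))
    sims[entries.i, entries.j] = entries.value   # columnSimilarities() emits the upper triangle
    sims = sims + sims.T                         # mirror to the lower triangle
    np.fill_diagonal(sims, 1.0)                  # each subreddit is identical to itself
    return sims
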
index 1975802daa4b26745d5e917fe6acde46a9303de6..dd61e65c3a0d90e9791e55bdbd5b6df9b623b3f8 100644 (file)
--- a/checkpoint_parallelsql.sbatch
+++ b/checkpoint_parallelsql.sbatch
 #SBATCH --mem=32G
 #SBATCH --cpus-per-task=4
 #SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
 module load parallel_sql
-
+echo $(which perl)
+conda list pyarrow
+which python3
 #Put here commands to load other modules (e.g. matlab etc.)
 #Below command means that parallel_sql will get tasks from the database
 #and run them on the node (in parallel). So a 16 core node will have
index e9818c19cfcd6c7f29f2c7bcc04b564b5471e3e6..56ecc4d806ebc5e8017d6a2ee237ff4610a53d08 100755 (executable)
--- a/comments_2_parquet.sh
+++ b/comments_2_parquet.sh
@@ -2,7 +2,7 @@
 
 #!/usr/bin/env bash
 echo "#!/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
+#echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 
 srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 
 srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
index faea040402a30f66c147dd034ed70ed7e4ac1e86..d3c7b7cfef3d2d127fd92b5faef98f616210a11a 100755 (executable)
--- a/comments_2_parquet_part1.py
+++ b/comments_2_parquet_part1.py
@@ -8,8 +8,6 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 
-globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"
-
 def parse_comment(comment, names= None):
     if names is None:
         names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
@@ -48,15 +46,15 @@ def parse_comment(comment, names= None):
 
 #    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
 
-files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))
+files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
 
 pool = Pool(28)
 
 stream = open_fileset(files)
 
-N = 100000
+N = int(1e4)
 
 rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
 
@@ -80,13 +78,38 @@ schema = pa.schema([
     pa.field('error', pa.string(), nullable=True),
 ])
 
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
+from pathlib import Path
+p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+
+if not p.is_dir():
+    if p.exists():
+        p.unlink()
+    p.mkdir()
+
+else:
+    list(map(Path.unlink,p.glob('*')))
+
+part_size = int(1e7)
+part = 1
+n_output = 0
+writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+while True:
+    if n_output > part_size:
+        if part > 1:
+            writer.close()
+
+        part = part + 1
+        n_output = 0
+    
+        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+    n_output += N
+    chunk = islice(rows,N)
+    pddf = pd.DataFrame(chunk, columns=schema.names)
+    table = pa.Table.from_pandas(pddf,schema=schema)
+    if table.shape[0] == 0:
+        break
+    writer.write_table(table)
+
 
-    writer.close()
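
The new writer loop above rotates to a fresh part file once roughly part_size rows have been written. A condensed sketch of the same rotation idea follows; the helper name and parameters are illustrative rather than part of this commit, and it closes whichever writer is still open when the input runs out.

# Hypothetical sketch: stream row tuples into numbered parquet part files.
from itertools import islice
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def write_in_parts(rows, outdir, schema, chunk_rows=10000, part_rows=int(1e7)):
    part, written, writer = 0, 0, None
    while True:
        chunk = list(islice(rows, chunk_rows))
        if not chunk:
            break
        if writer is None or written >= part_rows:
            if writer is not None:
                writer.close()                     # finish the previous part file
            part += 1
            written = 0
            writer = pq.ParquetWriter(f"{outdir}/part_{part}.parquet",
                                      schema=schema, compression='snappy',
                                      flavor='spark')
        table = pa.Table.from_pandas(pd.DataFrame(chunk, columns=schema.names),
                                     schema=schema)
        writer.write_table(table)
        written += len(chunk)
    if writer is not None:
        writer.close()                             # close the last open part file
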
index 62580acf605e92c39a4f93c5f15f272d612b8e41..0d5cc9edf10df5f19104032b421096014143a233 100755 (executable)
--- a/comments_2_parquet_part2.py
+++ b/comments_2_parquet_part2.py
@@ -7,7 +7,7 @@ from pyspark.sql import SparkSession
 
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -21,9 +21,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.repartition('subreddit')
 df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
 df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
 
 df = df.repartition('author')
 df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
 df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
index af87f71d265501f1f3ca25ecb64882f4ff9997da..8f1dfe2e79bdd3ad29018e34adde7a30e2cac823 100644 (file)
--- a/helper.py
+++ b/helper.py
@@ -14,7 +14,7 @@ def find_dumps(dumpdir, base_pattern):
         fname, ext = path.splitext(fpath)
         dumpext[fname].append(ext)
 
         fname, ext = path.splitext(fpath)
         dumpext[fname].append(ext)
 
-    ext_priority = ['.zst','.xz','.bz2']
+    ext_priority = ['.zst','.xz','.bz2','.gz']
 
     for base, exts in dumpext.items():
         ext = [ext for ext in ext_priority if ext in exts][0]
 
     for base, exts in dumpext.items():
         ext = [ext for ext in ext_priority if ext in exts][0]
index 44af4e606082185611b077100b7ab24cd7397e1a..f4f1c6edf76e33bbb41fc74a1de207a8390dca9e 100644 (file)
--- a/term_cosine_similarity.py
+++ b/term_cosine_similarity.py
@@ -8,7 +8,7 @@ import pandas as pd
 import fire
 from itertools import islice
 from pathlib import Path
-from similarities_helper import build_cosine_similarities
+from similarities_helper import cosine_similarities
 
 spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
@@ -57,12 +57,11 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
 
     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
     
-    spark.stop()
-
     #instead of toLocalMatrix() why not read as entries and put strait into numpy
     sim_entries = pd.read_parquet(output_parquet)
 
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
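
For context on the DIMSUM reference in the hunk headers above: columnSimilarities(threshold) on a pyspark.mllib RowMatrix implements that paper, and threshold=0 gives exact cosine similarities. The sketch below shows the shape of that call; the entry column order and the function name are assumptions, not the repo's actual similarities_helper code.

# Hypothetical sketch of the DIMSUM step: (row_id, column_id, tf_idf) entries -> cosine similarities.
from pyspark.mllib.linalg.distributed import CoordinateMatrix

def column_similarities(entries_df, threshold=0.0):
    # entries_df is assumed to have columns ordered as (i, j, value)
    mat = CoordinateMatrix(entries_df.rdd.map(tuple)).toRowMatrix()
    # threshold > 0 trades accuracy for speed per https://stanford.edu/~rezab/papers/dimsum.pdf
    return mat.columnSimilarities(threshold)  # CoordinateMatrix of upper-triangular entries
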
index cb3b6288c37079b20f5355cbdf54d28bf1f29495..526bac2bdabe284ec9550bfe290cadb13e438a1b 100755 (executable)
--- a/tf_comments.py
+++ b/tf_comments.py
@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
@@ -22,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
@@ -31,8 +30,9 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     ngram_output = partition.replace("parquet","txt")
 
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -167,14 +167,20 @@ def weekly_tf(partition, mwe_pass = 'first'):
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
-
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+
+            if do_break:
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
-            
+
         writer.close()
         author_writer.close()
 
index f06a8ce72f4ec995d4dcaa8a2d4abf42b0d447df..432ec39b764cf1a4de1edfba82a35dab01dfd393 100644 (file)
--- a/tfidf_authors.py
+++ b/tfidf_authors.py
@@ -1,19 +1,19 @@
 from pyspark.sql import SparkSession
 from similarities_helper import build_tfidf_dataset
 
-## TODO:need to exclude automoderator / bot posts.
-## TODO:need to exclude better handle hyperlinks. 
 spark = SparkSession.builder.getOrCreate()
 
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet")
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
 
 include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
 include_subs = {s.strip('\n') for s in include_subs}
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
 df = df.filter(df.author != '[deleted]')
 df = df.filter(df.author != 'AutoModerator')
 
 df = build_tfidf_dataset(df, include_subs, 'author')
 
-df.cache()
-
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
index 9e1a437d417df955b6ea5f84c9f5c9b2b87b2136..65d24204446d09ecf66afa30b23f3e118f51a2de 100644 (file)
--- a/tfidf_comments.py
+++ b/tfidf_comments.py
@@ -15,3 +15,4 @@ include_subs = {s.strip('\n') for s in include_subs}
 df = build_tfidf_dataset(df, include_subs, 'term')
 
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
+spark.stop()
