code.communitydata.science - cdsc_reddit.git/blobdiff - tf_comments.py
git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit
[cdsc_reddit.git] / tf_comments.py
index cb3b6288c37079b20f5355cbdf54d28bf1f29495..526bac2bdabe284ec9550bfe290cadb13e438a1b 100755 (executable)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
 import os
 import datetime
 import re
@@ -22,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
@@ -31,8 +30,9 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     ngram_output = partition.replace("parquet","txt")
 
 
     ngram_output = partition.replace("parquet","txt")
 
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -167,14 +167,20 @@ def weekly_tf(partition, mwe_pass = 'first'):
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
-
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+
+            if do_break:
                 break
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
-            
+
         writer.close()
         author_writer.close()
 
         writer.close()
         author_writer.close()
 

Community Data Science Collective || Want to submit a patch?