code.communitydata.science - cdsc_reddit.git/blobdiff - tf_comments.py
git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit
index 211647e11521052390b4c0c754797cc2c55e651a..526bac2bdabe284ec9550bfe290cadb13e438a1b 100755 (executable)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
@@ -22,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
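A side note on the exists/mkdir pair kept as context above: when many partitions of this job launch at once on the cluster, checking for the directory and then creating it can race, and the losing task raises FileExistsError. A minimal sketch of the race-free standard-library idiom, using the same path as the diff (an observation about the context lines, not part of this patch):

    import os

    # Creates the directory if needed; a no-op if another task
    # created it first, so concurrent partitions cannot collide.
    os.makedirs("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
                exist_ok=True)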
@@ -31,8 +30,9 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     ngram_output = partition.replace("parquet","txt")
 
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
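The new mwe_pass guard matters because weekly_tf is run twice over each partition: a first pass writes out a sample of ngrams (10%, judging by the directory name) used to detect multi-word expressions, and a second pass reads that sample back. The old code deleted the sample file unconditionally, so the second pass would have clobbered the first pass's output. A minimal sketch of the patched logic as a standalone helper (the helper name is hypothetical):

    import os

    def reset_ngram_sample(mwe_pass, sample_path):
        # Only the first pass truncates the ngram sample file; later
        # passes leave it in place so it can be read back.
        if mwe_pass == 'first' and os.path.exists(sample_path):
            os.remove(sample_path)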
@@ -161,21 +161,26 @@ def weekly_tf(partition, mwe_pass = 'first'):
         while True:
 
             chunk = islice(outrows,outchunksize)
-            chunk = (c for c in chunk if c.subreddit is not None)
+            chunk = (c for c in chunk if c[1] is not None)
             pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
-
             author_pddf = pddf.loc[pddf.is_token == False, schema.names]
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
-
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+
+            if do_break:
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
-            
+
         writer.close()
         author_writer.close()
 
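Two changes land in the hunk above. First, c.subreddit becomes c[1], which suggests outrows now yields plain tuples rather than named tuples, so the subreddit field is addressed by position. Second, the old loop broke as soon as the token table was empty, which silently discarded any author rows remaining in that final chunk; the new version writes whichever of the two tables is non-empty and stops only when both come back empty, i.e. once the iterator is drained. A sketch of the patched drain loop, with to_tables standing in (hypothetically) for the DataFrame/Table conversion above:

    from itertools import islice

    def drain(outrows, writer, author_writer, to_tables, outchunksize):
        # Pull fixed-size chunks off the row iterator; stop once both
        # tables come back empty, meaning the iterator is exhausted.
        while True:
            chunk = islice(outrows, outchunksize)
            chunk = (c for c in chunk if c[1] is not None)
            table, author_table = to_tables(chunk)
            do_break = True
            if table.shape[0] != 0:
                writer.write_table(table)                # token term frequencies
                do_break = False
            if author_table.shape[0] != 0:
                author_writer.write_table(author_table)  # per-author counts
                do_break = False
            if do_break:
                break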
@@ -185,7 +190,7 @@ def gen_task_list(mwe_pass='first'):
     with open("tf_task_list",'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
     with open("tf_task_list",'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"python3 tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
 