changes for archiving.
diff --git a/ngrams/tf_comments.py b/ngrams/tf_comments.py
index f472eebbb2538bb4af353fd7fed9a7c5ff3825d2..604421c0f43796799bba68511d79747c50c3bcc0 100755
@@ -3,6 +3,7 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
+import pyarrow.compute as pc
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
@@ -15,11 +16,12 @@ import string
 from random import random
 from redditcleaner import clean
 from pathlib import Path
+from datetime import datetime
 
 # compute term frequencies for comments in each subreddit by week
-def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
+def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
 
-    dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
+    dataset = ds.dataset(Path(inputdir)/partition, format='parquet')
     outputdir = Path(outputdir)
     samppath = outputdir / "reddit_comment_ngrams_10p_sample"
 
@@ -37,7 +39,8 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
     if mwe_pass == 'first':
         if ngram_path.exists():
             ngram_path.unlink()
-    
+
+    dataset = dataset.filter(pc.field("CreatedAt") <= pa.scalar(datetime(2020,4,13)))
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
 
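The added filter caps the data at 2020-04-13, so comments created after that date never reach the batching step. A minimal sketch of the same expression-based filtering against a toy in-memory table (the table contents and the to_table(filter=...) form are illustrative, not part of this commit):

    from datetime import datetime
    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.dataset as ds

    # Two toy comments, one on each side of the cutoff date.
    table = pa.table({
        "CreatedAt": [datetime(2020, 4, 1), datetime(2020, 5, 1)],
        "body": ["kept", "dropped"],
    })
    dataset = ds.dataset(table)

    # Same expression as the commit: keep rows at or before the cutoff.
    cutoff = pc.field("CreatedAt") <= pa.scalar(datetime(2020, 4, 13))
    print(dataset.to_table(filter=cutoff).to_pydict())
    # {'CreatedAt': [datetime.datetime(2020, 4, 1, 0, 0)], 'body': ['kept']}
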
@@ -160,9 +163,9 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
 
     outchunksize = 10000
     
-    termtf_outputdir = (outputdir / "comment_terms")
+    termtf_outputdir = (outputdir / "comment_terms.parquet")
     termtf_outputdir.mkdir(parents=True, exist_ok=True)
-    authortf_outputdir = (outputdir / "comment_authors")
+    authortf_outputdir = (outputdir / "comment_authors.parquet")
     authortf_outputdir.mkdir(parents=True, exist_ok=True)    
     termtf_path = termtf_outputdir / partition
     authortf_path = authortf_outputdir / partition
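A note on the rename: with the .parquet suffix, each output directory reads back as one partitioned Parquet dataset, mirroring how the input is opened at the top of weekly_tf. A hypothetical read-back (path illustrative):

    import pyarrow.dataset as ds

    # Every per-partition file written under comment_terms.parquet/
    # becomes one fragment of a single logical dataset.
    terms = ds.dataset("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
                       format="parquet")
    print(terms.schema)
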
@@ -196,12 +199,12 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
         author_writer.close()
 
 
-def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
-    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
+def gen_task_list(mwe_pass='first', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
+    files = os.listdir(inputdir)
     with open(tf_task_list,'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --inputdir {inputdir} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
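
Since fire.Fire exposes these functions as subcommands, every line gen_task_list writes is itself a runnable shell command. With the defaults above, a task-list line would look like the following (partition filename illustrative; note that leaving excluded_users_file unset writes a literal "None"):

    ./tf_comments.py weekly_tf --mwe-pass first --inputdir /gscratch/comdata/output/reddit_comments_by_subreddit.parquet/ --outputdir /gscratch/comdata/output/reddit_ngrams/ --excluded_users None part-00000.parquet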
