import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
+import pyarrow.compute as pc
from itertools import groupby, islice, chain
import fire
from collections import Counter
from random import random
from redditcleaner import clean
from pathlib import Path
+from datetime import datetime
# compute term frequencies for comments in each subreddit by week
-def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
+def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
- dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
+ dataset = ds.dataset(Path(inputdir)/partition, format='parquet')
outputdir = Path(outputdir)
samppath = outputdir / "reddit_comment_ngrams_10p_sample"
if mwe_pass == 'first':
if ngram_path.exists():
ngram_path.unlink()
-
+
+ dataset = dataset.filter(pc.field("CreatedAt") <= pa.scalar(datetime(2020,4,13)))
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
outchunksize = 10000
- termtf_outputdir = (outputdir / "comment_terms")
+ termtf_outputdir = (outputdir / "comment_terms.parquet")
termtf_outputdir.mkdir(parents=True, exist_ok=True)
- authortf_outputdir = (outputdir / "comment_authors")
+ authortf_outputdir = (outputdir / "comment_authors.parquet")
authortf_outputdir.mkdir(parents=True, exist_ok=True)
termtf_path = termtf_outputdir / partition
authortf_path = authortf_outputdir / partition
author_writer.close()
def gen_task_list(mwe_pass='first', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
    """Write one ``weekly_tf`` command line per parquet partition in ``inputdir``.

    Every directory entry of ``inputdir`` whose name ends with ``.parquet``
    produces one line in the file ``tf_task_list``; each line invokes
    ``./tf_comments.py weekly_tf`` with the given options and the entry
    name as the trailing positional argument.

    Args:
        mwe_pass: value forwarded as ``--mwe-pass``.
        inputdir: directory that is listed for partitions; also forwarded
            as ``--inputdir``.
        outputdir: value forwarded as ``--outputdir``.
        tf_task_list: path of the task file to (over)write.
        excluded_users_file: value forwarded as ``--excluded_users``; when it
            is ``None`` the literal text ``None`` is written, as before.

    Raises:
        OSError: if ``inputdir`` cannot be listed or ``tf_task_list`` cannot
            be opened for writing.
    """
    # Local imports keep this function self-contained: it needs `os`, which is
    # not among the imports visible at the top of the file.
    import os
    import shlex

    def shell_arg(value):
        # Quote only when needed, so ordinary paths are emitted unchanged while
        # names containing spaces or shell metacharacters stay a single argument.
        return shlex.quote(str(value))

    # os.listdir returns entries in arbitrary order; sort so the generated task
    # list is identical from run to run.
    partitions = sorted(
        entry for entry in os.listdir(inputdir) if entry.endswith(".parquet")
    )

    # The options are the same for every task, so build that prefix once.
    command_prefix = (
        f"./tf_comments.py weekly_tf --mwe-pass {shell_arg(mwe_pass)}"
        f" --inputdir {shell_arg(inputdir)}"
        f" --outputdir {shell_arg(outputdir)}"
        f" --excluded_users {shell_arg(excluded_users_file)}"
    )

    with open(tf_task_list, 'w') as outfile:
        outfile.writelines(
            f"{command_prefix} {shell_arg(partition)}\n" for partition in partitions
        )
if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list,