X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/197518a222a321a8027c3dc5a4121350c47d0779..refs/heads/icwsm_dataverse:/ngrams/tf_comments.py

diff --git a/ngrams/tf_comments.py b/ngrams/tf_comments.py
index f472eeb..604421c 100755
--- a/ngrams/tf_comments.py
+++ b/ngrams/tf_comments.py
@@ -3,6 +3,7 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
+import pyarrow.compute as pc
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
@@ -15,11 +16,12 @@ import string
 from random import random
 from redditcleaner import clean
 from pathlib import Path
+from datetime import datetime
 
 
 # compute term frequencies for comments in each subreddit by week
-def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
+def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
 
-    dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
+    dataset = ds.dataset(Path(inputdir)/partition, format='parquet')
     outputdir = Path(outputdir)
     samppath = outputdir / "reddit_comment_ngrams_10p_sample"
@@ -37,7 +39,8 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
 
     if mwe_pass == 'first':
         if ngram_path.exists():
             ngram_path.unlink()
-    
+
+    dataset = dataset.filter(pc.field("CreatedAt") <= pa.scalar(datetime(2020,4,13)))
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
@@ -160,9 +163,9 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
 
     outchunksize = 10000
 
-    termtf_outputdir = (outputdir / "comment_terms")
+    termtf_outputdir = (outputdir / "comment_terms.parquet")
     termtf_outputdir.mkdir(parents=True, exist_ok=True)
-    authortf_outputdir = (outputdir / "comment_authors")
+    authortf_outputdir = (outputdir / "comment_authors.parquet")
     authortf_outputdir.mkdir(parents=True, exist_ok=True)
     termtf_path = termtf_outputdir / partition
     authortf_path = authortf_outputdir / partition
@@ -196,12 +199,12 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
 
         author_writer.close()
 
-def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
-    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
+def gen_task_list(mwe_pass='first', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
+    files = os.listdir(inputdir)
     with open(tf_task_list,'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --inputdir {inputdir} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
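
The substantive change in this diff is the new timestamp cutoff: the pyarrow dataset is filtered to comments with CreatedAt on or before 2020-04-13 before it is read in batches. The sketch below exercises that same filter expression in isolation on a small hypothetical in-memory table (the table and its values are illustrative, not from the repository), and assumes a pyarrow release recent enough to provide Dataset.filter, which the patched script itself relies on.

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
from datetime import datetime

# Hypothetical stand-in for one subreddit partition of the comments dataset.
table = pa.table({
    "CreatedAt": [datetime(2020, 3, 1), datetime(2020, 4, 13), datetime(2020, 5, 1)],
    "body": ["kept", "kept (cutoff is inclusive)", "dropped"],
})

dataset = ds.dataset(table)
# Same expression the patch adds: keep only comments created on or before the cutoff.
filtered = dataset.filter(pc.field("CreatedAt") <= pa.scalar(datetime(2020, 4, 13)))
print(filtered.to_table().num_rows)  # prints 2

Applying the filter on the dataset before to_batches() keeps the cutoff out of the per-batch processing loop and allows pyarrow to push the predicate down into the parquet scan.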