X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/40be7bedb668e26f2ad09d3cc15e6f4bb605d143..ddf2adb8a67e82c97de8a1228654c81d72a0e163:/tf_reddit_comments.py?ds=sidebyside diff --git a/tf_reddit_comments.py b/tf_reddit_comments.py new file mode 100644 index 0000000..010b759 --- /dev/null +++ b/tf_reddit_comments.py @@ -0,0 +1,75 @@ +import pyarrow as pa +import pyarrow.dataset as ds +import pyarrow.parquet as pq +from itertools import groupby, islice, chain +import fire +from collections import Counter +import pandas as pd +import os +import datetime + +# compute term frequencies for comments in each subreddit by week +def weekly_tf(partition): + dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet') + batches = dataset.to_batches(columns=['CreatedAt','subreddit','body']) + + schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False), + pa.field('term', pa.string(), nullable=False), + pa.field('week', pa.date32(), nullable=False), + pa.field('tf', pa.int64(), nullable=False)] + ) + + dfs = (b.to_pandas() for b in batches) + + def add_week(df): + df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date + return(df) + + dfs = (add_week(df) for df in dfs) + + def iterate_rows(dfs): + for df in dfs: + for row in df.itertuples(): + yield row + + rows = iterate_rows(dfs) + + subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week)) + + def tf_comments(subreddit_weeks): + for key, posts in subreddit_weeks: + subreddit, week = key + tfs = Counter([]) + + for post in posts: + tfs.update(post.body.split()) + + for term, tf in tfs.items(): + yield [subreddit, term, week, tf] + + outrows = tf_comments(subreddit_weeks) + + outchunksize = 10000 + + with pq.ParquetWriter("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer: + while True: + chunk = islice(outrows,outchunksize) + pddf = pd.DataFrame(chunk, columns=schema.names) + table = pa.Table.from_pandas(pddf,schema=schema) + if table.shape[0] == 0: + break + writer.write_table(table) + + writer.close() + + +def gen_task_list(): + files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/") + with open("tf_task_list",'w') as outfile: + for f in files: + if f.endswith(".parquet"): + outfile.write(f"python3 tf_reddit_comments.py weekly_tf {f}\n") + +if __name__ == "__main__": + fire.Fire({"gen_task_list":gen_task_list, + "weekly_tf":weekly_tf})