import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pandas as pd
from itertools import groupby, islice, chain
from collections import Counter
import fire
import os
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition):
    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
    batches = dataset.to_batches(columns=['CreatedAt','subreddit','body'])
    schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                        pa.field('term', pa.string(), nullable=False),
                        pa.field('week', pa.date32(), nullable=False),
                        pa.field('tf', pa.int64(), nullable=False)])
    dfs = (b.to_pandas() for b in batches)

    def add_week(df):
        # label each comment with the Monday that starts its week
        df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
        return df

    dfs = (add_week(df) for df in dfs)
    def iterate_rows(dfs):
        for df in dfs:
            for row in df.itertuples():
                yield row

    rows = iterate_rows(dfs)
    # groupby only merges consecutive rows, so this relies on rows already being ordered by (subreddit, week)
    subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
    def tf_comments(subreddit_weeks):
        for key, posts in subreddit_weeks:
            subreddit, week = key
            tfs = Counter()

            for post in posts:
                tfs.update(post.body.split())

            for term, tf in tfs.items():
                yield [subreddit, term, week, tf]
    outrows = tf_comments(subreddit_weeks)

    # number of rows to buffer per parquet write; the value here is a guess, tune as needed
    outchunksize = 10000
    with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
        # drain the generator in fixed-size chunks until it is exhausted
        while True:
            chunk = islice(outrows, outchunksize)
            pddf = pd.DataFrame(chunk, columns=schema.names)
            table = pa.Table.from_pandas(pddf, schema=schema)
            if table.shape[0] == 0:
                break
            writer.write_table(table)
# write one weekly_tf command per parquet partition so the partitions can be processed in parallel
def gen_task_list():
    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
    with open("tf_task_list", 'w') as outfile:
        for f in files:
            if f.endswith(".parquet"):
                outfile.write(f"python3 tf_reddit_comments.py weekly_tf {f}\n")
if __name__ == "__main__":
    fire.Fire({"gen_task_list": gen_task_list,
               "weekly_tf": weekly_tf})