]> code.communitydata.science - cdsc_reddit.git/blob - tf_reddit_comments.py
ec2dd2cddbb0d1cfbdce215b111820758f6a78d6
[cdsc_reddit.git] / tf_reddit_comments.py
1 import pyarrow as pa
2 import pyarrow.dataset as ds
3 import pyarrow.parquet as pq
4 from itertools import groupby, islice, chain
5 import fire
6 from collections import Counter
7 import pandas as pd
8 import os
9 import datetime
10 from nltk import wordpunct_tokenize, MWETokenizer
11
12 # compute term frequencies for comments in each subreddit by week
13 def weekly_tf(partition):
14     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
15     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body'])
16
17     schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
18                         pa.field('term', pa.string(), nullable=False),
19                         pa.field('week', pa.date32(), nullable=False),
20                         pa.field('tf', pa.int64(), nullable=False)]
21     )
22
23     dfs = (b.to_pandas() for b in batches)
24
25     def add_week(df):
26         df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
27         return(df)
28
29     dfs = (add_week(df) for df in dfs)
30
31     def iterate_rows(dfs):
32         for df in dfs:
33             for row in df.itertuples():
34                 yield row
35
36     rows = iterate_rows(dfs)
37
38     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
39
40     tokenizer = MWETokenizer()
41
42     def tf_comments(subreddit_weeks):
43         for key, posts in subreddit_weeks:
44             subreddit, week = key
45             tfs = Counter([])
46
47             for post in posts:
48                 tfs.update(tokenizer.tokenize(wordpunct_tokenize(post.body.lower())))
49
50             for term, tf in tfs.items():
51                 yield [subreddit, term, week, tf]
52             
53     outrows = tf_comments(subreddit_weeks)
54
55     outchunksize = 10000
56
57     with pq.ParquetWriter("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
58         while True:
59             chunk = islice(outrows,outchunksize)
60             pddf = pd.DataFrame(chunk, columns=schema.names)
61             print(pddf)
62             table = pa.Table.from_pandas(pddf,schema=schema)
63             if table.shape[0] == 0:
64                 break
65             writer.write_table(table)
66
67         writer.close()
68
69
70 def gen_task_list():
71     files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
72     with open("tf_task_list",'w') as outfile:
73         for f in files:
74             if f.endswith(".parquet"):
75                 outfile.write(f"python3 tf_reddit_comments.py weekly_tf {f}\n")
76
77 if __name__ == "__main__":
78     fire.Fire({"gen_task_list":gen_task_list,
79                "weekly_tf":weekly_tf})

Community Data Science Collective || Want to submit a patch?