2 import pyarrow.dataset as ds
 
   3 import pyarrow.parquet as pq
 
   4 from itertools import groupby, islice, chain
 
   6 from collections import Counter
 
  10 from nltk import wordpunct_tokenize, MWETokenizer
 
  12 # compute term frequencies for comments in each subreddit by week
 
  13 def weekly_tf(partition):
 
  14     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
 
  15     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body'])
 
  17     schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
 
  18                         pa.field('term', pa.string(), nullable=False),
 
  19                         pa.field('week', pa.date32(), nullable=False),
 
  20                         pa.field('tf', pa.int64(), nullable=False)]
 
  23     dfs = (b.to_pandas() for b in batches)
 
  26         df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
 
  29     dfs = (add_week(df) for df in dfs)
 
  31     def iterate_rows(dfs):
 
  33             for row in df.itertuples():
 
  36     rows = iterate_rows(dfs)
 
  38     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
 
  40     tokenizer = MWETokenizer()
 
  42     def tf_comments(subreddit_weeks):
 
  43         for key, posts in subreddit_weeks:
 
  48                 tfs.update(tokenizer.tokenize(wordpunct_tokenize(post.body.lower())))
 
  50             for term, tf in tfs.items():
 
  51                 yield [subreddit, term, week, tf]
 
  53     outrows = tf_comments(subreddit_weeks)
 
  57     with pq.ParquetWriter("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
 
  59             chunk = islice(outrows,outchunksize)
 
  60             pddf = pd.DataFrame(chunk, columns=schema.names)
 
  62             table = pa.Table.from_pandas(pddf,schema=schema)
 
  63             if table.shape[0] == 0:
 
  65             writer.write_table(table)
 
  71     files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
 
         # NOTE(review): the `def` line owning these statements is not visible
         # in this listing; presumably this is the body of gen_task_list, the
         # name registered as a command in the __main__ guard.
         #
         # `files` holds the entry names found directly under the comments
         # dataset directory.  The lines below write a task list to the file
         # "tf_task_list" (relative to the current working directory), which is
         # truncated on open.
  72     with open("tf_task_list",'w') as outfile:
 
                 # Only entries whose name ends in ".parquet" produce a task.
                 # NOTE(review): `f` is presumably bound by a loop over `files`
                 # on a line that is not visible here.
  74             if f.endswith(".parquet"):
 
                     # One shell command per line, running the weekly_tf
                     # command for that entry.
  75                 outfile.write(f"python3 tf_reddit_comments.py weekly_tf {f}\n")
 
  77 if __name__ == "__main__":
 
         # Command-line entry point: python-fire exposes each dictionary key as
         # a sub-command that calls the mapped function, passing the remaining
         # command-line arguments as its parameters (the task list above uses
         # the form `... weekly_tf <file>`).
  78     fire.Fire({"gen_task_list":gen_task_list,
 
  79                "weekly_tf":weekly_tf})