1 import pyarrow.dataset as ds
2 from itertools import chain, groupby, islice
# A pyarrow dataset abstracts reading, writing, and filtering parquet files.
# Creating one does not read the data into memory.
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
dataset = ds.dataset(
    '/gscratch/comdata/output/reddit_submissions_by_author.parquet',
    format='parquet',
    partitioning='hive',
)

# Pull every submission made to these two subreddits.
subreddits_to_pull = ['seattlewa', 'seattle']

# Rather than loading everything into a pandas dataframe at once, stream it so
# processing can begin while the data is still being read.
# NOTE(review): Dataset.scan() appears to exist only in older pyarrow
# releases -- confirm against the installed version before upgrading.
scan_tasks = dataset.scan(
    filter=ds.field('subreddit').isin(subreddits_to_pull),
    columns=[
        'id', 'subreddit', 'CreatedAt', 'author', 'ups', 'downs', 'score',
        'subreddit_id', 'stickied', 'title', 'url', 'is_self', 'selftext',
    ],
)
# Execute one scan task and stream its rows, one row iterator per record batch.
def execute_scan_task(st):
    """Run a scan task and yield an iterator of rows for each record batch.

    Args:
        st: A scan task; calling ``st.execute()`` must return an iterable of
            record batches, each of which provides ``to_pandas()``.

    Yields:
        For every record batch, the iterator produced by
        ``DataFrame.itertuples()`` -- rows whose columns are readable as
        attributes (e.g. ``row.author``). Callers flatten these per-batch
        iterators to obtain a single stream of rows.
    """
    def unroll_record_batch(rb):
        # itertuples() is a DataFrame method, so the batch has to be
        # converted first; only one batch is materialized at a time.
        df = rb.to_pandas()
        return df.itertuples()

    # An executed scan task yields an iterator of record batches.
    for rb in st.execute():
        yield unroll_record_batch(rb)
# Flatten scan tasks -> record batches -> rows into a single lazy row iterator.
row_iter = chain.from_iterable(
    batch_rows
    for task in scan_tasks
    for batch_rows in execute_scan_task(task)
)

# itertools.groupby lets us read one author at a time. It only merges
# consecutive rows that share a key, so the same author can show up in more
# than one group when the record batches are not ordered by author.
author_submissions = groupby(row_iter, key=lambda row: row.author)
for auth, posts in author_submissions:
    print(f"{auth} has {len(list(posts))} posts")