"""Stream Reddit submissions out of a parquet dataset with pyarrow.

Post-patch contents of examples/pyarrow_streaming.py from the cdsc_reddit
repository (https://code.communitydata.science/cdsc_reddit.git, blobdiff
4efd72a916652199c45f6fcf99cdd3de016f4530..1bf206d219a38ffb0290dde83cd061d4a848f1c7).

The script filters the dataset down to two subreddits, walks the matching
rows one at a time, and counts how many consecutive groups each author
forms in the stream.
"""
from collections import Counter
from itertools import groupby
from operator import attrgetter

import pyarrow.dataset as ds

# A pyarrow dataset abstracts reading, writing, or filtering a parquet file.
# It does not read data into memory.
dataset = ds.dataset(
    '/gscratch/comdata/output/reddit_submissions_by_author.parquet',
    format='parquet',
)

# let's get all the submissions to two subreddits:
subreddits_to_pull = ['seattlewa', 'seattle']

# instead of loading the data into a pandas dataframe all at once we can
# stream it.
# NOTE(review): Dataset.scan() and scan tasks appear to belong to older
# pyarrow releases -- confirm the installed version still provides them.
scan_tasks = dataset.scan(
    filter=ds.field('subreddit').isin(subreddits_to_pull),
    columns=[
        'id',
        'subreddit',
        'CreatedAt',
        'author',
        'ups',
        'downs',
        'score',
        'subreddit_id',
        'stickied',
        'title',
        'url',
        'is_self',
        'selftext',
    ],
)


def iterate_rows(scan_tasks):
    """Execute scan tasks and generate their rows one at a time.

    Each task in ``scan_tasks`` is executed, every record batch it produces
    is converted to a pandas DataFrame, and the rows of that frame are
    yielded as the tuples produced by ``DataFrame.itertuples()``.

    Only one record batch is converted at a time, so rows can be consumed
    while later batches are still unread.
    """
    for st in scan_tasks:
        for rb in st.execute():
            yield from rb.to_pandas().itertuples()


row_iter = iterate_rows(scan_tasks)

# now we can use python's groupby function to read one author at a time.
# groupby only merges *consecutive* rows with the same key, so an author
# shows up in more than one group whenever their rows are not contiguous
# in the stream.
author_submissions = groupby(row_iter, attrgetter('author'))

# number of groups (not posts) seen for each author
count_dict = Counter(auth for auth, _posts in author_submissions)

# the dataset is expected to be partitioned and sorted by author, in which
# case we get exactly one group for each author and this check is False.
split_authors = any(n_groups != 1 for n_groups in count_dict.values())
print(f"any author split across more than one group: {split_authors}")