]> code.communitydata.science - cdsc_reddit.git/blob - examples/pyarrow_streaming.py
add script for pulling cluster timeseries
[cdsc_reddit.git] / examples / pyarrow_streaming.py
1 import pyarrow.dataset as ds
2 from itertools import groupby
3
4 # A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory. 
5
6 dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')
7
8 # let's get all the comments to two subreddits:
9 subreddits_to_pull = ['seattlewa','seattle']
10
11 # instead of loading the data into a pandas dataframe all at once we can stream it.
12 scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
13
14 # simple function to execute scantasks and generate rows
15 def iterate_rows(scan_tasks):
16     for st in scan_tasks:
17         for rb in st.execute():
18             df = rb.to_pandas()
19             for t in df.itertuples():
20                 yield t
21
22 row_iter = iterate_rows(scan_tasks)
23
24 # now we can use python's groupby function to read one author at a time
25 # note that the same author can appear more than once since the record batches may not be in the correct order.
26 author_submissions = groupby(row_iter, lambda row: row.author)
27
28 count_dict = {}
29
30 for auth, posts in author_submissions:
31     if auth in count_dict:
32         count_dict[auth] = count_dict[auth] + 1
33     else:
34         count_dict[auth] = 1
35
36 # since it's partitioned and sorted by author, we get one group for each author 
37 any([ v != 1 for k,v in count_dict.items()])
38

Community Data Science Collective || Want to submit a patch?