X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/e22ddf23da40d9aeb64363e017d8cbbc65aeb45c..4447c60265c5c5de3281ca135461d91ab5339f03:/examples/pyarrow_streaming.py diff --git a/examples/pyarrow_streaming.py b/examples/pyarrow_streaming.py index 8eaf1f6..ebe2219 100644 --- a/examples/pyarrow_streaming.py +++ b/examples/pyarrow_streaming.py @@ -1,17 +1,17 @@ -pimport pyarrow.dataset as ds -from itertools import chain, groupby, islice +import pyarrow.dataset as ds +from itertools import groupby # A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory. -#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive') + dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet') # let's get all the comments to two subreddits: subreddits_to_pull = ['seattlewa','seattle'] -# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read. +# instead of loading the data into a pandas dataframe all at once we can stream it. scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext']) -# simple function to execute scantasks and create a stream of rows +# simple function to execute scantasks and generate rows def iterate_rows(scan_tasks): for st in scan_tasks: for rb in st.execute():