]> code.communitydata.science - cdsc_reddit.git/blobdiff - examples/pyarrow_streaming.py
Some improvements to run affinity clustering on larger dataset and
[cdsc_reddit.git] / examples / pyarrow_streaming.py
index 8eaf1f667fff972c38cd9439bb0d8123286d9ed3..ebe22191d2d6a89565fec582567d2ee4a4857c41 100644 (file)
@@ -1,17 +1,17 @@
-pimport pyarrow.dataset as ds
-from itertools import chain, groupby, islice
+import pyarrow.dataset as ds
+from itertools import groupby
 
 # A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory. 
 
 # A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory. 
-#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
+
 dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')
 
 # let's get all the comments to two subreddits:
 subreddits_to_pull = ['seattlewa','seattle']
 
 dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')
 
 # let's get all the comments to two subreddits:
 subreddits_to_pull = ['seattlewa','seattle']
 
-# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read.
+# instead of loading the data into a pandas dataframe all at once we can stream it.
 scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
 
 scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
 
-# simple function to execute scantasks and create a stream of rows 
+# simple function to execute scantasks and generate rows
 def iterate_rows(scan_tasks):
     for st in scan_tasks:
         for rb in st.execute():
 def iterate_rows(scan_tasks):
     for st in scan_tasks:
         for rb in st.execute():

Community Data Science Collective || Want to submit a patch?