]> code.communitydata.science - cdsc_reddit.git/blobdiff - examples/pyarrow_streaming.py
refactor visualization code.
[cdsc_reddit.git] / examples / pyarrow_streaming.py
index 512e63f92eee75b1a79c4b4ee8861a9455f8f924..ebe22191d2d6a89565fec582567d2ee4a4857c41 100644 (file)
@@ -1,32 +1,38 @@
 import pyarrow.dataset as ds
 import pyarrow.dataset as ds
-from itertools import chain, groupby, islice
+from itertools import groupby
 
 # A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory. 
 
 # A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory. 
-#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
-dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet', partitioning='hive')
+
+dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')
 
 # let's get all the comments to two subreddits:
 subreddits_to_pull = ['seattlewa','seattle']
 
 
 # let's get all the comments to two subreddits:
 subreddits_to_pull = ['seattlewa','seattle']
 
-# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read.
+# instead of loading the data into a pandas dataframe all at once we can stream it.
 scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
 
 scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
 
-# simple function to execute scantasks and create a stream of pydict rows 
-def execute_scan_task(st):
-    # an executed scan task yields an iterator of record_batches
-    def unroll_record_batch(rb):
-        df = rb.to_pandas()
-        return df.itertuples()
-
-    for rb in st.execute():
-        yield unroll_record_batch(rb)
+# simple function to execute scantasks and generate rows
+def iterate_rows(scan_tasks):
+    for st in scan_tasks:
+        for rb in st.execute():
+            df = rb.to_pandas()
+            for t in df.itertuples():
+                yield t
 
 
-
-# now we just need to flatten and we have our iterator
-row_iter = chain.from_iterable(chain.from_iterable(map(lambda st: execute_scan_task(st), scan_tasks)))
+row_iter = iterate_rows(scan_tasks)
 
 # now we can use python's groupby function to read one author at a time
 # note that the same author can appear more than once since the record batches may not be in the correct order.
 author_submissions = groupby(row_iter, lambda row: row.author)
 
 # now we can use python's groupby function to read one author at a time
 # note that the same author can appear more than once since the record batches may not be in the correct order.
 author_submissions = groupby(row_iter, lambda row: row.author)
+
+count_dict = {}
+
 for auth, posts in author_submissions:
 for auth, posts in author_submissions:
-    print(f"{auth} has {len(list(posts))} posts")
+    if auth in count_dict:
+        count_dict[auth] = count_dict[auth] + 1
+    else:
+        count_dict[auth] = 1
+
+# since it's partitioned and sorted by author, we get one group for each author 
+any([ v != 1 for k,v in count_dict.items()])
+

Community Data Science Collective || Want to submit a patch?