Merge remote-tracking branch 'refs/remotes/origin/excise_reindex' into excise_reindex
diff --git a/datasets/submissions_2_parquet_part1.py b/datasets/submissions_2_parquet_part1.py
index 16d1988f0e04ebbbf47796d97fd9b03f70a252f4..77ae09f33ca260261ea919cbf23bbe822aa940c4 100755
--- a/datasets/submissions_2_parquet_part1.py
+++ b/datasets/submissions_2_parquet_part1.py
@@ -3,26 +3,23 @@
 # two stages:
 # 1. from gz to arrow parquet (this script) 
 # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
-
 from datetime import datetime
-from multiprocessing import Pool
+from pathlib import Path
 from itertools import islice
 from helper import find_dumps, open_fileset
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
-import simdjson
 import fire
 import os
-
-parser = simdjson.Parser()
+import json
 
 def parse_submission(post, names = None):
     if names is None:
         names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
 
     try:
-        post = parser.parse(post)
+        post = json.loads(post)
     except (ValueError) as e:
         #        print(e)
         #        print(post)
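
Note on the hunk above: json.JSONDecodeError is a subclass of ValueError, so the existing except clause still catches malformed records after the simdjson-to-json swap. A minimal sketch of that behavior, using a hypothetical truncated dump record:

    import json

    try:
        json.loads('{"id": "t3_abc"')   # hypothetical truncated record
    except ValueError as e:             # JSONDecodeError subclasses ValueError
        print(type(e).__name__)         # -> JSONDecodeError
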
@@ -92,8 +89,7 @@ def parse_dump(partition):
         pa.field('quarantine',pa.bool_(),nullable=True),
         pa.field('error',pa.string(),nullable=True)])
 
-    if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
-        os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
+    Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)
 
     with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
         while True:
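
Note on the hunk above: Path.mkdir(exist_ok=True, parents=True) collapses the check-then-create pair into a single call, which also avoids the race where another worker creates the directory between os.path.exists and os.mkdir. A minimal sketch with a hypothetical path:

    from pathlib import Path

    out = Path("/tmp/example.parquet")      # hypothetical output directory
    out.mkdir(exist_ok=True, parents=True)  # creates parents; no error if it exists
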
@@ -108,7 +104,7 @@ def parse_dump(partition):
 
 def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
     files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
-    with open("parse_submissions_task_list",'w') as of:
+    with open("submissions_task_list.sh",'w') as of:
         for fpath in files:
             partition = os.path.split(fpath)[1]
             of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
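
Note on the hunk above: the task list gains a .sh extension, reflecting that each line is a runnable shell command. The diff does not show how the file is consumed; a hypothetical way to run it would be:

    # hypothetical usage; the repo's actual job runner is not shown in this diff
    bash submissions_task_list.sh              # run the commands serially, or e.g.:
    # parallel -j 8 < submissions_task_list.sh # one command per line, in parallel
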
