# two stages:
# 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
from datetime import datetime
from pathlib import Path
from itertools import islice
from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fire
import os
import json
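
# parse one line of a dump file into a flat tuple of the columns listed
# in `names`; lines that fail to parse become a row of nulls with the
# message in the trailing 'error' column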
def parse_submission(post, names = None):
    if names is None:
        names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
    try:
        post = json.loads(post)
    except ValueError as e:
        # json.JSONDecodeError is a subclass of ValueError, so this catches
        # malformed lines; return a row of nulls with the message in the
        # trailing 'error' column
        row = [None for _ in names]
        row[-1] = f"Error parsing json|{e}"
        return tuple(row)
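
    # Assumed field semantics, inferred from the column list above:
    # created_utc and retrieved_on are unix timestamps; 'edited' is either
    # False or an edit timestamp and fills both the 'edited' and
    # 'time_edited' columns; 'has_media' is derived from the raw 'media'
    # field. Everything else is copied through, with missing keys as None.
    row = []
    for name in names:
        if name in ('created_utc', 'retrieved_on'):
            val = post.get(name)
            row.append(datetime.fromtimestamp(int(val), tz=None) if val is not None else None)
        elif name == 'edited':
            val = post.get(name)
            if isinstance(val, bool) or val is None:
                row.extend([val, None])
            else:
                row.extend([True, datetime.fromtimestamp(int(val), tz=None)])
        elif name == 'time_edited':
            # written together with 'edited'
            continue
        elif name == 'has_media':
            row.append(post.get('media') is not None)
        else:
            row.append(post.get(name))
    return tuple(row)


def parse_dump(partition):
    # stream one dump file and write it out as a single parquet partition;
    # the raw-dump path prefix is assumed to mirror the default dumpdir in
    # gen_task_list below
    N = 10000
    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
    rows = map(parse_submission, stream)
    schema = pa.schema([
        pa.field('id',pa.string(),nullable=True),
        # ... one pa.field per remaining column in `names`, with matching
        # types (strings, int64s, bools, and timestamps) ...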
        pa.field('quarantine',pa.bool_(),nullable=True),
        pa.field('error',pa.string(),nullable=True)])
    Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)

    with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
        while True:
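            # standard chunked-write pattern assumed here: drain the row
            # iterator N rows at a time so an entire dump never has to sit
            # in memory; an empty chunk means the stream is exhausted
            chunk = islice(rows, N)
            pddf = pd.DataFrame(chunk, columns=schema.names)
            table = pa.Table.from_pandas(pddf, schema=schema)
            if table.shape[0] == 0:
                break
            writer.write_table(table)


# write a shell script with one parse_dump task per dump file found under
# dumpdir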
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
    files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
    with open("submissions_task_list.sh",'w') as of:
        for fpath in files:
            partition = os.path.split(fpath)[1]
            of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
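

# assumed entry point: expose both stages to fire so the generated task
# list can invoke `parse_dump <partition>`
if __name__ == "__main__":
    fire.Fire({'parse_dump': parse_dump,
               'gen_task_list': gen_task_list})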