# 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
-from collections import defaultdict
-from os import path
-import glob
import json
-import re
from datetime import datetime
-from subprocess import Popen, PIPE
-from multiprocessing import Pool, SimpleQueue
-
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
-
-def find_json_files(dumpdir):
-    base_pattern = "RS_20*.*"
-
-    files = glob.glob(path.join(dumpdir,base_pattern))
-
-    # build a dictionary of possible extensions for each dump
-    dumpext = defaultdict(list)
-    for fpath in files:
-        fname, ext = path.splitext(fpath)
-        dumpext[fname].append(ext)
-
-    ext_priority = ['.zst','.xz','.bz2']
-
-    for base, exts in dumpext.items():
-        found = False
-        if len(exts) == 1:
-            yield base + exts[0]
-            found = True
-        else:
-            for ext in ext_priority:
-                if ext in exts:
-                    yield base + ext
-                    found = True
-        assert(found == True)
-
-files = list(find_json_files(dumpdir))
-
-def read_file(fh):
-    lines = open_input_file(fh)
-    for line in lines:
-        yield line
-
-def open_fileset(files):
-    for fh in files:
-        print(fh)
-        lines = open_input_file(fh)
-        for line in lines:
-            yield line
-
-def open_input_file(input_filename):
-    if re.match(r'.*\.7z$', input_filename):
-        cmd = ["7za", "x", "-so", input_filename, '*']
-    elif re.match(r'.*\.gz$', input_filename):
-        cmd = ["zcat", input_filename]
-    elif re.match(r'.*\.bz2$', input_filename):
-        cmd = ["bzcat", "-dk", input_filename]
-    elif re.match(r'.*\.bz', input_filename):
-        cmd = ["bzcat", "-dk", input_filename]
-    elif re.match(r'.*\.xz', input_filename):
-        cmd = ["xzcat",'-dk', '-T 20',input_filename]
-    elif re.match(r'.*\.zst',input_filename):
-        cmd = ['zstd','-dck', input_filename]
-    try:
-        input_file = Popen(cmd, stdout=PIPE).stdout
-    except NameError as e:
-        print(e)
-        input_file = open(input_filename, 'r')
-    return input_file
+from multiprocessing import Pool
+from itertools import islice
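+# find_dumps and open_fileset replace the local find_json_files, open_fileset,
+# and open_input_file definitions removed below; they now live in helper.py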
+from helper import find_dumps, open_fileset
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
def parse_submission(post, names = None):
        row.append(post[name])
    return tuple(row)
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
+
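+# one path per monthly dump; like the removed find_json_files, find_dumps
+# presumably prefers .zst over .xz over .bz2 when a dump exists in several
+# compression formats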
+files = list(find_dumps(dumpdir))
+
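+# stream decompressed JSON lines from all dumps and parse them in parallel
+# across 28 worker processes; N (assumed to be defined elsewhere in this
+# script) sets the imap_unordered chunk size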
pool = Pool(28)
stream = open_fileset(files)
rows = pool.imap_unordered(parse_submission, stream, chunksize=int(N/28))
-from itertools import islice
-import pandas as pd
-import pyarrow as pa
-import pyarrow.parquet as pq
-
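+# explicit Arrow schema for the parsed submission rows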
schema = pa.schema([
    pa.field('id', pa.string(),nullable=True),
    pa.field('author', pa.string(),nullable=True),