]> code.communitydata.science - cdsc_reddit.git/blobdiff - submissions_2_parquet_part1.py
Build comments dataset similarly to submissions and improve partitioning scheme
[cdsc_reddit.git] / submissions_2_parquet_part1.py
index 10bb5f044f8dd4c196be201a45beea515075e820..131391bad82f927ec7b0578696d77d148db04fa0 100755 (executable)
@@ -4,75 +4,14 @@
 # 1. from gz to arrow parquet (this script) 
 # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
 
-from collections import defaultdict
-from os import path
-import glob
 import json
-import re
 from datetime import datetime
-from subprocess import Popen, PIPE
-from multiprocessing import Pool, SimpleQueue
-
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
-
-def find_json_files(dumpdir):
-    base_pattern = "RS_20*.*"
-
-    files = glob.glob(path.join(dumpdir,base_pattern))
-
-    # build a dictionary of possible extensions for each dump
-    dumpext = defaultdict(list)
-    for fpath in files:
-        fname, ext = path.splitext(fpath)
-        dumpext[fname].append(ext)
-
-    ext_priority = ['.zst','.xz','.bz2']
-
-    for base, exts in dumpext.items():
-        found = False
-        if len(exts) == 1:
-            yield base + exts[0]
-            found = True
-        else:
-            for ext in ext_priority:
-                if ext in exts:
-                    yield base + ext
-                    found = True
-        assert(found == True)
-
-files = list(find_json_files(dumpdir))
-
-def read_file(fh):
-    lines = open_input_file(fh)
-    for line in lines:
-        yield line
-
-def open_fileset(files):
-    for fh in files:
-        print(fh)
-        lines = open_input_file(fh)
-        for line in lines:
-            yield line
-
-def open_input_file(input_filename):
-    if re.match(r'.*\.7z$', input_filename):
-        cmd = ["7za", "x", "-so", input_filename, '*'] 
-    elif re.match(r'.*\.gz$', input_filename):
-        cmd = ["zcat", input_filename] 
-    elif re.match(r'.*\.bz2$', input_filename):
-        cmd = ["bzcat", "-dk", input_filename] 
-    elif re.match(r'.*\.bz', input_filename):
-        cmd = ["bzcat", "-dk", input_filename] 
-    elif re.match(r'.*\.xz', input_filename):
-        cmd = ["xzcat",'-dk', '-T 20',input_filename]
-    elif re.match(r'.*\.zst',input_filename):
-        cmd = ['zstd','-dck', input_filename]
-    try:
-        input_file = Popen(cmd, stdout=PIPE).stdout
-    except NameError as e:
-        print(e)
-        input_file = open(input_filename, 'r')
-    return input_file
+from multiprocessing import Pool
+from itertools import islice
+from helper import find_dumps, open_fileset
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 
 
 def parse_submission(post, names = None):
@@ -116,6 +55,10 @@ def parse_submission(post, names = None):
             row.append(post[name])
     return tuple(row)
 
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
+
+files = list(find_dumps(dumpdir))
+
 pool = Pool(28)
 
 stream = open_fileset(files)
@@ -124,11 +67,6 @@ N = 100000
 
 rows = pool.imap_unordered(parse_submission, stream, chunksize=int(N/28))
 
-from itertools import islice
-import pandas as pd
-import pyarrow as pa
-import pyarrow.parquet as pq
-
 schema = pa.schema([
     pa.field('id', pa.string(),nullable=True),
     pa.field('author', pa.string(),nullable=True),

Community Data Science Collective || Want to submit a patch?