X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/e6294b5b90135a5163441c8dc62252dd6a188412..refs/heads/synced/excise_reindex:/datasets/comments_2_parquet_part1.py

diff --git a/datasets/comments_2_parquet_part1.py b/datasets/comments_2_parquet_part1.py
index d3c7b7c..6960986 100755
--- a/datasets/comments_2_parquet_part1.py
+++ b/datasets/comments_2_parquet_part1.py
@@ -1,12 +1,15 @@
 #!/usr/bin/env python3
+import os
 import json
 from datetime import datetime
 from multiprocessing import Pool
 from itertools import islice
-from helper import find_dumps, open_fileset
+from helper import open_input_file, find_dumps
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+from pathlib import Path
+import fire
 
 def parse_comment(comment, names= None):
     if names is None:
@@ -46,70 +49,63 @@ def parse_comment(comment, names= None):
 
 
 # conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
-
-files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
-
-pool = Pool(28)
-
-stream = open_fileset(files)
-
-N = int(1e4)
-
-rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
-
-schema = pa.schema([
-    pa.field('id', pa.string(), nullable=True),
-    pa.field('subreddit', pa.string(), nullable=True),
-    pa.field('link_id', pa.string(), nullable=True),
-    pa.field('parent_id', pa.string(), nullable=True),
-    pa.field('created_utc', pa.timestamp('ms'), nullable=True),
-    pa.field('author', pa.string(), nullable=True),
-    pa.field('ups', pa.int64(), nullable=True),
-    pa.field('downs', pa.int64(), nullable=True),
-    pa.field('score', pa.int64(), nullable=True),
-    pa.field('edited', pa.bool_(), nullable=True),
-    pa.field('time_edited', pa.timestamp('ms'), nullable=True),
-    pa.field('subreddit_type', pa.string(), nullable=True),
-    pa.field('subreddit_id', pa.string(), nullable=True),
-    pa.field('stickied', pa.bool_(), nullable=True),
-    pa.field('is_submitter', pa.bool_(), nullable=True),
-    pa.field('body', pa.string(), nullable=True),
-    pa.field('error', pa.string(), nullable=True),
-])
-
-from pathlib import Path
-p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
-
-if not p.is_dir():
-    if p.exists():
-        p.unlink()
-    p.mkdir()
-
-else:
-    list(map(Path.unlink,p.glob('*')))
-
-part_size = int(1e7)
-part = 1
-n_output = 0
-writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
-
-while True:
-    if n_output > part_size:
-        if part > 1:
-            writer.close()
-
-        part = part + 1
-        n_output = 0
-
-        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
-
-    n_output += N
-    chunk = islice(rows,N)
-    pddf = pd.DataFrame(chunk, columns=schema.names)
-    table = pa.Table.from_pandas(pddf,schema=schema)
-    if table.shape[0] == 0:
-        break
-    writer.write_table(table)
-
+def parse_dump(partition):
+
+    dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
+
+    stream = open_input_file(dumpdir)
+    rows = map(parse_comment, stream)
+
+    schema = pa.schema([
+        pa.field('id', pa.string(), nullable=True),
+        pa.field('subreddit', pa.string(), nullable=True),
+        pa.field('link_id', pa.string(), nullable=True),
+        pa.field('parent_id', pa.string(), nullable=True),
+        pa.field('created_utc', pa.timestamp('ms'), nullable=True),
+        pa.field('author', pa.string(), nullable=True),
+        pa.field('ups', pa.int64(), nullable=True),
+        pa.field('downs', pa.int64(), nullable=True),
+        pa.field('score', pa.int64(), nullable=True),
+        pa.field('edited', pa.bool_(), nullable=True),
+        pa.field('time_edited', pa.timestamp('ms'), nullable=True),
+        pa.field('subreddit_type', pa.string(), nullable=True),
+        pa.field('subreddit_id', pa.string(), nullable=True),
+        pa.field('stickied', pa.bool_(), nullable=True),
+        pa.field('is_submitter', pa.bool_(), nullable=True),
+        pa.field('body', pa.string(), nullable=True),
+        pa.field('error', pa.string(), nullable=True),
+    ])
+
+    p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
+    p.mkdir(exist_ok=True,parents=True)
+
+    N=10000
+    with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
+                          schema=schema,
+                          compression='snappy',
+                          flavor='spark') as writer:
+
+        while True:
+            chunk = islice(rows,N)
+            pddf = pd.DataFrame(chunk, columns=schema.names)
+            table = pa.Table.from_pandas(pddf,schema=schema)
+            if table.shape[0] == 0:
+                break
+            writer.write_table(table)
+
+        writer.close()
+
+
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
+    files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
+    with open("comments_task_list.sh",'w') as of:
+        for fpath in files:
+            partition = os.path.split(fpath)[1]
+            if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
+                of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
+
+
+if __name__ == '__main__':
+    fire.Fire({'parse_dump':parse_dump,
+               'gen_task_list':gen_task_list})
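
The patched parse_dump streams one dump file through parse_comment and writes the parsed rows in fixed-size chunks, so memory use stays bounded no matter how large the dump is. A minimal, self-contained sketch of that islice-plus-ParquetWriter pattern, with a toy row generator and a /tmp output path standing in for the repository's helpers and cluster paths:

#!/usr/bin/env python3
# Sketch of the chunked-write loop from parse_dump (toy data, assumed paths).
from itertools import islice

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Two-column stand-in for the full comment schema in the patch.
schema = pa.schema([pa.field('id', pa.string(), nullable=True),
                    pa.field('score', pa.int64(), nullable=True)])

# Any iterator of row tuples works; the script uses map(parse_comment, stream).
rows = ((str(i), i % 7) for i in range(25000))

N = 10000  # rows per chunk, matching the N in the patch
with pq.ParquetWriter("/tmp/example.parquet", schema=schema,
                      compression='snappy', flavor='spark') as writer:
    while True:
        chunk = islice(rows, N)  # lazily take the next N rows
        pddf = pd.DataFrame(chunk, columns=schema.names)
        table = pa.Table.from_pandas(pddf, schema=schema)
        if table.shape[0] == 0:  # the stream is exhausted
            break
        writer.write_table(table)

Because ParquetWriter is used as a context manager, it is closed on exit from the with block; the explicit writer.close() the patch also makes inside its with block is redundant but harmless.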
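The new __main__ block uses fire.Fire to expose parse_dump and gen_task_list as subcommands, and gen_task_list writes one such invocation per dump file into comments_task_list.sh so partitions can be processed independently, e.g. under a job scheduler. A sketch of the same dispatch pattern; the filename RC_2015-01.bz2 in the comments is illustrative, not taken from the repository:

# dispatch_sketch.py -- same python-fire dispatch pattern as the patch.
# Example invocations (filename illustrative):
#   python3 dispatch_sketch.py gen_task_list
#   python3 dispatch_sketch.py parse_dump RC_2015-01.bz2
import fire

def parse_dump(partition):
    # In the real script this parses one dump into one parquet partition.
    print(f"would parse {partition}")

def gen_task_list(dumpdir="/tmp/dumps", overwrite=True):
    # In the real script this writes one parse_dump command per dump file.
    print(f"would scan {dumpdir} (overwrite={overwrite})")

if __name__ == '__main__':
    # fire maps dict keys to subcommand names and maps function
    # parameters to positional/flag arguments.
    fire.Fire({'parse_dump': parse_dump,
               'gen_task_list': gen_task_list})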