X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/40d45637702fb51feb9f99ff7f6d71787af765ed..6baa08889b2f46c14f2baa5e3d2136cf165b1673:/comments_2_parquet_part1.py

diff --git a/comments_2_parquet_part1.py b/comments_2_parquet_part1.py
index faea040..d3c7b7c 100755
--- a/comments_2_parquet_part1.py
+++ b/comments_2_parquet_part1.py
@@ -8,8 +8,6 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 
-globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"
-
 def parse_comment(comment, names= None):
     if names is None:
         names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
@@ -48,15 +46,15 @@ def parse_comment(comment, names= None):
 
 #    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
 
-files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))
+files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
 
 pool = Pool(28)
 
 stream = open_fileset(files)
 
-N = 100000
+N = int(1e4)
 
 rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
 
@@ -80,13 +78,38 @@ schema = pa.schema([
     pa.field('error', pa.string(), nullable=True),
 ])
 
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
+from pathlib import Path
+p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+
+if not p.is_dir():
+    if p.exists():
+        p.unlink()
+    p.mkdir()
+
+else:
+    list(map(Path.unlink,p.glob('*')))
+
+part_size = int(1e7)
+part = 1
+n_output = 0
+writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+while True:
+    if n_output > part_size:
+        if part > 1:
+            writer.close()
+
+        part = part + 1
+        n_output = 0
+
+        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+    n_output += N
+    chunk = islice(rows,N)
+    pddf = pd.DataFrame(chunk, columns=schema.names)
+    table = pa.Table.from_pandas(pddf,schema=schema)
+    if table.shape[0] == 0:
+        break
+    writer.write_table(table)
+
 
-    writer.close()
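
A note on the second hunk: the patch only shrinks the batch size N from 100000 to int(1e4), but the unchanged context around it shows the parallel-parsing pattern that N feeds into. With Pool(28) and chunksize=int(N/28), each batch of N raw dump lines is handed to the 28 workers in roughly equal slices, and imap_unordered yields parsed rows lazily, in completion order. A minimal runnable sketch of that pattern follows; parse_line, this open_fileset, and the file names are hypothetical stand-ins for the script's parse_comment, open_fileset, and find_dumps results:

    from itertools import islice
    from multiprocessing import Pool

    def parse_line(line):
        # Hypothetical stand-in for parse_comment: one raw JSON line -> one row tuple.
        return (line.strip(),)

    def open_fileset(paths):
        # Stand-in for the script's open_fileset: lazily chain lines from many files.
        for path in paths:
            with open(path) as f:
                yield from f

    if __name__ == '__main__':
        files = ["dump_a.txt", "dump_b.txt"]  # hypothetical inputs
        N = int(1e4)
        with Pool(28) as pool:
            stream = open_fileset(files)
            # chunksize=N/28 hands each of the 28 workers one slice of every N-line batch.
            rows = pool.imap_unordered(parse_line, stream, chunksize=int(N / 28))
            first_batch = list(islice(rows, N))
            print(len(first_batch))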
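
The third hunk is the substance of the change: the single monolithic ParquetWriter is replaced by a rotation scheme that starts a new part_{part}.parquet once roughly part_size rows have gone into the current file. Two details of the patch are worth flagging: the `if part > 1` guard means the very first part's writer is never explicitly closed before rotation, and because the old trailing writer.close() line is deleted, the last writer is never closed after the loop breaks. Below is a minimal self-contained sketch of the same rotation pattern with unconditional closes; write_partitioned and its parameter names are hypothetical, not the script's API:

    from itertools import islice
    from pathlib import Path

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    def write_partitioned(row_iter, schema, out_dir, chunk_rows=int(1e4), part_rows=int(1e7)):
        # Start fresh: create the output directory and drop stale part files.
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        for stale in out_dir.glob('*'):
            stale.unlink()

        def new_writer(part):
            return pq.ParquetWriter(str(out_dir / f"part_{part}.parquet"),
                                    schema=schema, compression='snappy', flavor='spark')

        part, n_output = 1, 0
        writer = new_writer(part)
        try:
            while True:
                # Rotate to a fresh part file once the current one is big enough.
                if n_output > part_rows:
                    writer.close()
                    part += 1
                    n_output = 0
                    writer = new_writer(part)

                chunk = list(islice(row_iter, chunk_rows))
                if not chunk:
                    break
                pddf = pd.DataFrame(chunk, columns=schema.names)
                writer.write_table(pa.Table.from_pandas(pddf, schema=schema))
                n_output += len(chunk)
        finally:
            # Unlike the patch, always close the last (possibly only) writer.
            writer.close()

Materializing each chunk with list() also makes the exhaustion test (`if not chunk`) cheaper than the patch's approach of building an empty DataFrame and Table just to check table.shape[0] == 0, and counting len(chunk) tracks rows actually written rather than the requested N.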