return tuple(row)
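# parse_comment appears to flatten each comment into a tuple of field values,
# presumably in the order of the pyarrow schema declared below.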
-# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
+# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','../../data/spark_tmp')])
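# (The commented-out Spark configuration above is not used below; the parquet
# output is written directly with pyarrow rather than through a SparkSession.)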
def parse_dump(partition):
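# Parse one monthly comment dump (matched by the RC_20*.* pattern below) and
# write it as a single snappy-compressed parquet partition named after the dump.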
- dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
+ dumpdir = f"../../data/reddit_dumps/comments/{partition}"
stream = open_input_file(dumpdir)
rows = map(parse_comment, stream)
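# map() builds a lazy iterator, so comments are parsed on demand as they are
# written out rather than materialized in memory all at once.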
pa.field('error', pa.string(), nullable=True),
])
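# Last field of the pyarrow schema: parse failures are recorded per row in the
# 'error' column, presumably instead of aborting the whole dump.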
- p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
+ p = Path("../../data/temp/reddit_comments.parquet")
p.mkdir(exist_ok=True,parents=True)
N=10000
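# N is presumably the batch size: rows are written to the parquet file in
# chunks of N rather than all at once.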
- with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
+ with pq.ParquetWriter(f"../../data/temp/reddit_comments.parquet/{partition}.parquet",
schema=schema,
compression='snappy',
flavor='spark') as writer:
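# The body of this with-block is omitted from the diff; a minimal sketch of the
# batched write it presumably performs (assuming itertools.islice and pandas
# are available as islice and pd):
#     while True:
#         chunk = list(islice(rows, N))
#         if len(chunk) == 0:
#             break
#         pddf = pd.DataFrame(chunk, columns=schema.names)
#         writer.write_table(pa.Table.from_pandas(pddf, schema=schema))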
writer.close()
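# Note: the explicit close() is redundant inside the with-block (the context
# manager closes the writer on exit) but harmless.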
-def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
+def gen_task_list(dumpdir="../../data/reddit_dumps/comments", overwrite=True):
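# Write comments_task_list.sh with one parse_dump invocation per monthly dump
# found under dumpdir; with overwrite=False, dumps that already have a parquet
# partition are skipped.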
files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
with open("comments_task_list.sh",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
- if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
+ if (not Path(f"../../data/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
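# Each generated line is an independent shell command, so the task list can be
# run serially or fed to a job scheduler or GNU parallel, as the user prefers.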