changes for archiving.

diff --git a/datasets/comments_2_parquet_part1.py b/datasets/comments_2_parquet_part1.py
index 69609860478b00e39ee23b624422a0dfb0a5ad89..7e06833cb0a44f1f189d1363254155bf985caf5b 100755
@@ -47,11 +47,11 @@ def parse_comment(comment, names= None):
     return tuple(row)
 
 
-#    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
+#    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','../../data/spark_tmp')])
 
 def parse_dump(partition):
 
-    dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
+    dumpdir = f"../../data/reddit_dumps/comments/{partition}"
 
     stream = open_input_file(dumpdir)
     rows = map(parse_comment, stream)
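
Note on the hunk above: only the dump directory moves to a relative path; open_input_file is a repo helper defined elsewhere and not shown in this diff. As a rough sketch of what such a helper has to do for these dumps (the compression handling below is an assumption about the dump formats, not the repo's actual code):

    import bz2
    import io
    import lzma
    import zstandard as zstd

    def open_input_file_sketch(path):
        # Reddit dumps have shipped as .bz2, .xz, and .zst archives;
        # the large max_window_size is needed for newer .zst files.
        if path.endswith(".zst"):
            fh = open(path, "rb")
            reader = zstd.ZstdDecompressor(max_window_size=2**31).stream_reader(fh)
            return io.TextIOWrapper(reader, encoding="utf-8", errors="replace")
        if path.endswith(".bz2"):
            return bz2.open(path, "rt", encoding="utf-8", errors="replace")
        if path.endswith(".xz"):
            return lzma.open(path, "rt", encoding="utf-8", errors="replace")
        return open(path, "rt", encoding="utf-8", errors="replace")
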
@@ -76,11 +76,11 @@ def parse_dump(partition):
         pa.field('error', pa.string(), nullable=True),
     ])
 
-    p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
+    p = Path("../../data/temp/reddit_comments.parquet")
     p.mkdir(exist_ok=True,parents=True)
 
     N=10000
-    with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
+    with pq.ParquetWriter(f"../../data/temp/reddit_comments.parquet/{partition}.parquet",
                           schema=schema,
                           compression='snappy',
                           flavor='spark') as writer:
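
The writer above streams one Parquet file per dump partition, accumulating rows in batches of N=10000. The batching loop itself falls outside this hunk; a hedged sketch of one way it could look, assuming each row is a tuple in schema order as parse_comment() returns them:

    import itertools
    import pyarrow as pa

    def write_batches(writer, rows, schema, n=10000):
        # Consume the row iterator in fixed-size chunks so the whole
        # dump never has to fit in memory at once.
        rows = iter(rows)
        while True:
            chunk = list(itertools.islice(rows, n))
            if not chunk:
                break
            # Transpose row tuples into columns matching the schema.
            columns = list(zip(*chunk))
            table = pa.Table.from_arrays(
                [pa.array(col, type=field.type) for col, field in zip(columns, schema)],
                schema=schema,
            )
            writer.write_table(table)
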
@@ -96,12 +96,12 @@ def parse_dump(partition):
         writer.close()
 
 
-def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
+def gen_task_list(dumpdir="../../data/raw_data/reddit_dumps/comments", overwrite=True):
     files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
     with open("comments_task_list.sh",'w') as of:
         for fpath in files:
             partition = os.path.split(fpath)[1]
-            if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
+            if (not Path(f"../../data/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
                 of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
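
gen_task_list() emits one command per dump partition into comments_task_list.sh, suitable for GNU parallel or a job array. The resulting file looks like this (partition names are illustrative):

    python3 comments_2_parquet_part1.py parse_dump RC_2015-01.zst
    python3 comments_2_parquet_part1.py parse_dump RC_2015-02.zst

Note that with the default overwrite=True the Path.exists() check is always short-circuited; only overwrite=False skips partitions whose Parquet output already exists.
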
 
 
