"""Convert wikiq TSV dumps to parquet, then clean and repartition them with Spark.

The script runs in two phases:

1. Every ``*.tsv`` file under ``path`` is read with pandas and written as one
   parquet file under ``outpath`` (one worker process per file).
2. Spark reads the parquet directory back, normalizes a handful of columns
   (boolean flags, integer casts, URL-decoding of ``title`` and ``editor``),
   repartitions on ``articleid`` and writes the final dataset.
"""
import csv
import inspect
from multiprocessing import Pool
from pathlib import Path
from urllib.parse import unquote

import pandas as pd
from pyspark.sql import functions as f
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Directory holding the TSV files to convert.
path = Path("/gscratch/comdata/users/nathante/wikiqRunning/wikiq_output/")

# Directory of per-dump parquet files: written in phase 1, read by Spark in
# phase 2.  It is the single source of truth for both phases.
outpath = Path("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet/")

# Destination of the cleaned, repartitioned dataset.
partitioned_outpath = (
    "/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_partitioned.parquet"
)

# Column dtypes handed to pandas.read_csv.  Dtype names are given as strings so
# that no extra import is needed to build the mapping.
# NOTE(review): text_chars and editor_id are read as float64 and only cast to
# int later in Spark -- presumably because they can be missing; confirm.
TSV_DTYPES = {
    'anon': 'object',
    'articleid': 'int64',
    'deleted': 'bool',
    'editor': 'object',
    'editor_id': 'float64',
    'minor': 'bool',
    'namespace': 'int64',
    'revert': 'object',
    'reverteds': 'object',
    'revid': 'int64',
    'sha1': 'object',
    'text_chars': 'float64',
    'title': 'object',
}


def _bad_line_kwargs():
    """Return the read_csv keywords meaning "warn about malformed rows and skip them".

    Newer pandas spells this ``on_bad_lines="warn"``; older releases only know
    the ``error_bad_lines`` / ``warn_bad_lines`` pair.  The installed
    ``read_csv`` signature decides which spelling is used.
    """
    if "on_bad_lines" in inspect.signature(pd.read_csv).parameters:
        return {"on_bad_lines": "warn"}
    return {"error_bad_lines": False, "warn_bad_lines": True}


def wikiq_tsv_to_parquet(dumpfile, outpath=outpath):
    """Convert one TSV file to ``<outpath>/<dumpfile.name>.parquet``.

    Args:
        dumpfile: ``Path`` of the TSV file to convert.
        outpath: Directory that receives the parquet file; created (including
            parents) if it does not exist yet.
    """
    outfile = outpath / (dumpfile.name + ".parquet")
    outpath.mkdir(parents=True, exist_ok=True)
    _wikiq_tsv_to_parquet(dumpfile, outfile)


def _wikiq_tsv_to_parquet(dumpfile, outfile):
    """Read ``dumpfile`` as a tab-separated table and write it to ``outfile``.

    Quoting is disabled (``csv.QUOTE_NONE``), malformed rows are reported and
    skipped, and ``date_time`` is parsed as a datetime column.
    """
    print(dumpfile)
    table = pd.read_csv(
        dumpfile,
        sep='\t',
        quoting=csv.QUOTE_NONE,
        parse_dates=['date_time'],
        dtype=TSV_DTYPES,
        **_bad_line_kwargs(),
    )
    table.to_parquet(outfile)


@udf(StringType())
def decode_strip_udf(val):
    """Percent-decode ``val`` and strip surrounding double quotes; None becomes ""."""
    if val is None:
        return ""
    return unquote(val).strip('"')


def _is_true(column):
    """Map the literal string "TRUE" to True and everything else (incl. null) to False."""
    return f.when(column == "TRUE", True).otherwise(False)


def _to_int(column):
    """Cast ``column`` to Spark's ``int`` type."""
    return column.cast("int")


def _replace_column(df, name, transform):
    """Replace column ``name`` by ``transform`` applied to its current values.

    The column is renamed to a temporary ``<name>_raw``, the transformed column
    is added under the original name, and the temporary column is dropped.
    """
    raw_name = name + "_raw"
    renamed = df.withColumnRenamed(name, raw_name)
    converted = renamed.withColumn(name, transform(f.col(raw_name)))
    return converted.drop(raw_name)


def clean_and_repartition(spark):
    """Normalize the converted parquet data and write it partitioned by article.

    Args:
        spark: Active ``SparkSession`` used to read ``outpath``.
    """
    df = spark.read.parquet(str(outpath))

    # Conversions are applied one column at a time, in this fixed order.
    conversions = [
        ("anon", _is_true),
        ("text_chars", _to_int),
        ("editor_id", _to_int),
        ("revert", _is_true),
        ("title", decode_strip_udf),
        ("editor", decode_strip_udf),
    ]
    for name, transform in conversions:
        df = _replace_column(df, name, transform)

    df = df.repartition(400, 'articleid')
    df.write.parquet(partitioned_outpath, mode='overwrite')


def main(processes=28):
    """Run both phases: TSV -> parquet conversion, then the Spark clean-up.

    Args:
        processes: Number of worker processes used for the conversion.

    Raises:
        FileNotFoundError: If ``path`` contains no ``*.tsv`` files.
    """
    files = list(path.glob("*.tsv"))
    if not files:
        raise FileNotFoundError(f"no .tsv files found in {path}")

    with Pool(processes) as pool:
        # imap_unordered is lazy: iterate to completion so that every file is
        # converted and any worker exception is raised here.
        for _ in pool.imap_unordered(wikiq_tsv_to_parquet, files):
            pass

    spark = SparkSession.builder.getOrCreate()
    clean_and_repartition(spark)


# The guard keeps worker processes that re-import this module (spawn and
# forkserver start methods) from launching the whole pipeline again.
if __name__ == "__main__":
    main()