code.communitydata.science - articlequality_ordinal.git/blobdiff - wikiq_to_parquet.py
add the rest of the code.
[articlequality_ordinal.git] / wikiq_to_parquet.py
diff --git a/wikiq_to_parquet.py b/wikiq_to_parquet.py
new file mode 100644 (file)
index 0000000..f200870
--- /dev/null
+++ b/wikiq_to_parquet.py
@@ -0,0 +1,61 @@
+from pathlib import Path
+import csv
+from multiprocessing import Pool
+from urllib.parse import unquote
+
+import pandas as pd
+from numpy import dtype
+
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType
+
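+# wikiq TSV dumps to convert; each file is written out as parquet by the
+# per-file conversion below.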
+path = Path("/gscratch/comdata/users/nathante/wikiqRunning/wikiq_output/")
+outpath = Path("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_parquet/")
+files = list(path.glob("*.tsv"))
+
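+# Convert a single wikiq TSV dump to a parquet file under outpath.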
+def wikiq_tsv_to_parquet(dumpfile, outpath = Path("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet/")):
+    outfile = outpath / (dumpfile.name + ".parquet")
+    outpath.mkdir(parents=True, exist_ok=True)
+    _wikiq_tsv_to_parquet(dumpfile,outfile)
+
+def _wikiq_tsv_to_parquet(dumpfile, outfile):
+
+    dtypes = {'anon': dtype('O'), 'articleid': dtype('int64'), 'deleted': dtype('bool'),
+              'editor': dtype('O'), 'editor_id': dtype('float64'), 'minor': dtype('bool'),
+              'namespace': dtype('int64'), 'revert': dtype('O'), 'reverteds': dtype('O'),
+              'revid': dtype('int64'), 'sha1': dtype('O'), 'text_chars': dtype('float64'),
+              'title': dtype('O')}
+
+    print(dumpfile)
+    # Note: newer pandas versions replace error_bad_lines/warn_bad_lines with
+    # on_bad_lines='warn'.
+    df = pd.read_csv(dumpfile, sep='\t', quoting=csv.QUOTE_NONE,
+                     error_bad_lines=False, warn_bad_lines=True,
+                     parse_dates=['date_time'], dtype=dtypes)
+
+    df.to_parquet(outfile)
+
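+# Convert every TSV file in parallel across 28 worker processes.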
+with Pool(28) as pool:
+    jobs = pool.imap_unordered(wikiq_tsv_to_parquet, files)
+    list(jobs)
+
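+# Second pass: load the converted files into Spark to normalize column types
+# and string encodings, then write a single partitioned parquet dataset.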
+spark = SparkSession.builder.getOrCreate()
+
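+# Decode percent-encoded values and strip surrounding double quotes
+# (used for the title and editor columns).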
+@udf(StringType())
+def decode_strip_udf(val):
+    if val is None:
+        return ""
+    else:
+        return unquote(val).strip('\"')
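+
+# Load all of the per-file parquet output written above.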
+df = spark.read.parquet('/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet')
+df = df.withColumnRenamed("anon","anonRaw")
+df = df.withColumn("anon",f.when(f.col("anonRaw")=="TRUE",True).otherwise(False))
+df = df.drop("anonRaw")
+df = df.withColumnRenamed("text_chars","text_chars_raw")
+df = df.withColumn("text_chars",f.col("text_chars_raw").cast('int'))
+df = df.drop("text_chars_raw")
+df = df.withColumnRenamed("editor_id",'editor_id_raw')
+df = df.withColumn("editor_id",f.col("editor_id_raw").cast("int"))
+df = df.drop("editor_id_raw")
+df = df.withColumnRenamed("revert","revert_raw")
+df = df.withColumn("revert",f.when(f.col("revert_raw")=="TRUE",True).otherwise(False))
+df = df.drop("revert_raw")
+df = df.withColumnRenamed("title","title_raw")
+df = df.withColumn("title", decode_strip_udf(f.col("title_raw")))
+df = df.drop("title_raw")
+df = df.withColumnRenamed("editor","editor_raw")
+df = df.withColumn("editor", decode_strip_udf(f.col("editor_raw")))
+df = df.drop("editor_raw")
+df = df.repartition(400,'articleid')
+df.write.parquet("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_partitioned.parquet",mode='overwrite')
