# wikiq_to_parquet.py
# Convert wikiq TSV output for enwiki-20200301 to parquet with pandas,
# then re-type, clean, and repartition the result with Spark.
1 from pathlib import Path
2 import pandas as pd
3 from multiprocessing import Pool
4 from pyspark.sql import functions as f
5 from pyspark.sql import SparkSession, Window
6 from pyspark.sql.functions import udf
7 from pyspark.sql.types import StringType
8 import csv
9
# Input directory of wikiq TSV output and destination for the per-file
# parquet conversions.
path = Path("/gscratch/comdata/users/nathante/wikiqRunning/wikiq_output/")
outpath = Path("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_parquet/")

# Path.glob already yields Path objects; the original wrapped it in
# list(map(Path, ...)) redundantly.
files = list(path.glob("*.tsv"))

# Debug leftover: first dump file, unused by the pipeline below.  Guarded
# so an empty input directory no longer raises IndexError at import time.
dumpfile = files[0] if files else None
14
def wikiq_tsv_to_parquet(dumpfile, outpath = Path("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet/")):
    """Convert one wikiq TSV dump file to parquet under *outpath*.

    The output keeps the input file name with a ``.parquet`` suffix
    appended; *outpath* is created on demand.
    """
    outpath.mkdir(parents=True, exist_ok=True)
    target = outpath / (dumpfile.name + ".parquet")
    _wikiq_tsv_to_parquet(dumpfile, target)
19
def _wikiq_tsv_to_parquet(dumpfile, outfile):
    """Read one wikiq TSV file with a fixed schema and write it as parquet.

    Parameters
    ----------
    dumpfile : Path
        Tab-separated wikiq output file.
    outfile : Path
        Destination parquet file.
    """
    # Fixed dtypes so pandas does not mis-infer types on sparse columns
    # (editor_id and text_chars are float64 to accommodate NaN).  The
    # original called `dtype('O')` etc. without ever importing
    # numpy.dtype, which raised NameError; string specs are equivalent
    # and need no import.
    dtypes = {'anon': 'object', 'articleid': 'int64', 'deleted': 'bool',
              'editor': 'object', 'editor_id': 'float64', 'minor': 'bool',
              'namespace': 'int64', 'revert': 'object',
              'reverteds': 'object', 'revid': 'int64', 'sha1': 'object',
              'text_chars': 'float64', 'title': 'object'}

    print(dumpfile)
    # QUOTE_NONE: wikiq does not quote fields, so quote characters are data.
    # Malformed rows are skipped.  pandas 2.0 removed error_bad_lines /
    # warn_bad_lines in favor of on_bad_lines; support both API versions.
    try:
        df = pd.read_csv(dumpfile, sep='\t', quoting=csv.QUOTE_NONE,
                         on_bad_lines='skip',
                         parse_dates=['date_time'], dtype=dtypes)
    except TypeError:
        # Older pandas (< 1.3) without the on_bad_lines keyword.
        df = pd.read_csv(dumpfile, sep='\t', quoting=csv.QUOTE_NONE,
                         error_bad_lines=False, warn_bad_lines=True,
                         parse_dates=['date_time'], dtype=dtypes)

    df.to_parquet(outfile)
28
# Fan the per-file conversions out across 28 worker processes.  The
# workers only produce side effects (parquet files on disk), so we just
# drain the iterator to force every job to completion.
with Pool(28) as pool:
    for _ in pool.imap_unordered(wikiq_tsv_to_parquet, files):
        pass
32
spark = SparkSession.builder.getOrCreate()

@udf(StringType())
def decode_strip_udf(val):
    """URL-decode a wikiq string field and strip surrounding quote marks.

    Returns "" for NULL input so downstream string operations never see
    null.
    """
    # Imported locally so the function is self-contained when Spark
    # serializes it to executors; the original referenced `unquote`
    # without importing it anywhere in the file, raising NameError at
    # run time.
    from urllib.parse import unquote
    if val is None:
        return ""
    else:
        return unquote(val).strip('\"')
df = spark.read.parquet('/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet')

# Normalize the raw wikiq string columns into typed columns.  Each step
# renames the raw column aside, derives the typed/cleaned column under
# the original name, then drops the raw copy.
df = (
    df.withColumnRenamed("anon", "anonRaw")
      .withColumn("anon", f.when(f.col("anonRaw") == "TRUE", True).otherwise(False))
      .drop("anonRaw")
      .withColumnRenamed("text_chars", "text_chars_raw")
      .withColumn("text_chars", f.col("text_chars_raw").cast('int'))
      .drop("text_chars_raw")
      .withColumnRenamed("editor_id", 'editor_id_raw')
      .withColumn("editor_id", f.col("editor_id_raw").cast("int"))
      .drop("editor_id_raw")
      .withColumnRenamed("revert", "revert_raw")
      .withColumn("revert", f.when(f.col("revert_raw") == "TRUE", True).otherwise(False))
      .drop("revert_raw")
      .withColumnRenamed("title", "title_raw")
      .withColumn("title", decode_strip_udf(f.col("title_raw")))
      .drop("title_raw")
      .withColumnRenamed("editor", "editor_raw")
      .withColumn("editor", decode_strip_udf(f.col("editor_raw")))
      .drop("editor_raw")
)

# Partition by article so per-article history scans touch few files.
df = df.repartition(400, 'articleid')
df.write.parquet("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_partitioned.parquet", mode='overwrite')

# Community Data Science Collective