]> code.communitydata.science - cdsc_reddit.git/blob - reddit_bz2_2parquet.py
update .gitignore
[cdsc_reddit.git] / reddit_bz2_2parquet.py
1
2 #!/usr/bin/env python3
3 import pyspark
4 from pyspark.sql import functions as f
5 from pyspark.sql.types import *
6 from pyspark import SparkConf, SparkContext
7 from pyspark.sql import SparkSession, SQLContext
8
9 conf = SparkConf().setAppName("Reddit to bz2")
10 conf = conf.set('spark.sql.crossJoin.enabled',"true")
11
12 spark = SparkSession.builder.getOrCreate()
13 sc = spark.sparkContext
14
15 globstr = "/gscratch/comdata/raw_data/reddit_dumps/comments/RC_20*.bz2"
16
17 import re
18 import glob
19 import json
20 from subprocess import Popen, PIPE
21 from datetime import datetime
22 import pandas as pd
23 from multiprocessing import Pool
24
25 def open_fileset(globstr):
26     files = glob.glob(globstr)
27     for fh in files:
28         print(fh)
29         lines = open_input_file(fh)
30         for line in lines:
31             yield json.loads(line)
32                 
33 def open_input_file(input_filename):
34     if re.match(r'.*\.7z$', input_filename):
35         cmd = ["7za", "x", "-so", input_filename, '*'] 
36     elif re.match(r'.*\.gz$', input_filename):
37         cmd = ["zcat", input_filename] 
38     elif re.match(r'.*\.bz2$', input_filename):
39         cmd = ["bzcat", "-dk", input_filename] 
40
41     elif re.match(r'.*\.bz', input_filename):
42         cmd = ["bzcat", "-dk", input_filename] 
43     elif re.match(r'.*\.xz', input_filename):
44         cmd = ["xzcat",'-dk',input_filename]
45     try:
46         input_file = Popen(cmd, stdout=PIPE).stdout
47     except NameError:
48         input_file = open(input_filename, 'r')
49     return input_file
50
51 def include_row(comment, subreddits_to_track = []):
52     
53     subreddit = comment['subreddit'].lower()
54
55     return subreddit in subreddits_to_track
56
57 def parse_comment(comment, names= None):
58     if names is None:
59         names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
60
61     try:
62         comment = json.loads(comment)
63     except json.decoder.JSONDecodeError as e:
64         print(e)
65         print(comment)
66         row = [None for _ in names]
67         row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,comment)
68         return tuple(row)
69
70     row = []
71     for name in names:
72         if name == 'created_utc':
73             row.append(datetime.fromtimestamp(int(comment['created_utc']),tz=None))
74         elif name == 'edited':
75             val = comment[name]
76             if type(val) == bool:
77                 row.append(val)
78                 row.append(None)
79             else:
80                 row.append(True)
81                 row.append(datetime.fromtimestamp(int(val),tz=None))
82         elif name == "time_edited":
83             continue
84         elif name not in comment:
85             row.append(None)
86
87         else:
88             row.append(comment[name])
89
90     return tuple(row)
91
92
93 #    conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
94     
95 sqlContext = pyspark.SQLContext(sc)
96
97 comments = sc.textFile(globstr)
98
99 schema = StructType().add("id", StringType(), True)
100 schema = schema.add("subreddit", StringType(), True)
101 schema = schema.add("link_id", StringType(), True)
102 schema = schema.add("parent_id", StringType(), True)
103 schema = schema.add("created_utc", TimestampType(), True)
104 schema = schema.add("author", StringType(), True)
105 schema = schema.add("ups", LongType(), True)
106 schema = schema.add("downs", LongType(), True)
107 schema = schema.add("score", LongType(), True)
108 schema = schema.add("edited", BooleanType(), True)
109 schema = schema.add("time_edited", TimestampType(), True)
110 schema = schema.add("subreddit_type", StringType(), True)
111 schema = schema.add("subreddit_id", StringType(), True)
112 schema = schema.add("stickied", BooleanType(), True)
113 schema = schema.add("is_submitter", BooleanType(), True)
114 schema = schema.add("body", StringType(), True)
115 schema = schema.add("error", StringType(), True)
116
117 rows = comments.map(lambda c: parse_comment(c, schema.fieldNames()))
118 #!/usr/bin/env python3
119
120 df =  sqlContext.createDataFrame(rows, schema)
121
122 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
123 df = df.drop('subreddit')
124 df = df.withColumnRenamed('subreddit_2','subreddit')
125
126 df = df.withColumnRenamed("created_utc","CreatedAt")
127 df = df.withColumn("Month",f.month(f.col("CreatedAt")))
128 df = df.withColumn("Year",f.year(f.col("CreatedAt")))
129 df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
130 df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
131 df = df.sort(["subreddit","author","link_id","parent_id","Year","Month","Day"],ascending=True)
132 df.write.parquet("/gscratch/comdata/output/reddit_comments.parquet", partitionBy=["Year",'Month'],mode='overwrite')

Community Data Science Collective || Want to submit a patch?