conf = conf.set("spark.sql.shuffle.partitions",2000)
 conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sqlContext = pyspark.SQLContext(sc)
 
 conf = conf.set("spark.sql.shuffle.partitions",2000)
 conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sqlContext = pyspark.SQLContext(sc)
 
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
 df = df.repartition("subreddit")
 df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
 df = df.repartition("subreddit")
 df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet", partitionBy=["Year",'Month'], mode='overwrite')
+df2.write.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
 
 
 # # we also want to have parquet files sorted by author then reddit. 
 df = df.repartition("author")
 df3 = df.sort(["author","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
 
 
 # # we also want to have parquet files sorted by author then reddit. 
 df = df.repartition("author")
 df3 = df.sort(["author","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet", partitionBy=["Year",'Month'], mode='overwrite')
+df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')