code.communitydata.science - cdsc_reddit.git/blobdiff - ngrams/sort_tf_comments.py
git-annex in
[cdsc_reddit.git] / ngrams / sort_tf_comments.py
index abb097ef927c19f1f6bb2469dfa65da9f2f022ca..d9c3e2c496ed262e794a2f287eb5c6af12425ef2 100644 (file)
@@ -2,12 +2,17 @@
 
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
 
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
+import fire
 
 
-spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/")
+def main(inparquet, outparquet, colname):
+    spark = SparkSession.builder.getOrCreate()
+    df = spark.read.parquet(inparquet)
 
 
-df = df.repartition(2000,'term')
-df = df.sort(['term','week','subreddit'])
-df = df.sortWithinPartitions(['term','week','subreddit'])
+    df = df.repartition(2000,colname)
+    df = df.sort([colname,'week','subreddit'])
+    df = df.sortWithinPartitions([colname,'week','subreddit'])
 
 
-df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
+    df.write.parquet(outparquet,mode='overwrite',compression='snappy')
+
+if __name__ == '__main__':
+    fire.Fire(main)

Community Data Science Collective || Want to submit a patch?