Reuse code for term and author cosine similarity.
[cdsc_reddit.git] / top_comment_phrases.py
index 707ceaa865aeee5954ea5740d03fc7014a00b1b5..031cba543c951f3a1c7fcf9e407cf7d0e875dd73 100644
@@ -8,18 +8,19 @@ df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p
 
 df = df.withColumnRenamed("value","phrase")
 
+# count phrase occurrences
+phrases = df.groupby('phrase').count()
+phrases = phrases.withColumnRenamed('count','phraseCount')
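+# keep only phrases seen more than 10 times in the sample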
+phrases = phrases.filter(phrases.phraseCount > 10)
+
 # count overall
-N = df.count()
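+# N: total phrase occurrences remaining after the rare-phrase filter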
+N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
+
 print(f'analyzing PMI on a sample of {N} phrases')
 logN = np.log(N)
-
-# count phrase occurrences
-phrases = df.groupby('phrase').count()
-phrases = phrases.withColumnRenamed('count','phraseCount')
 phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
 
-
 # count term occurrences
 phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
 terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])
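
The next hunk's header shows phrasePWMI computed as phraseLogProb minus termsLogProbSum, i.e. the phrase's log probability minus the summed log probabilities of its constituent terms; positive values mean the words co-occur more often than independence would predict. A minimal numpy sketch of that arithmetic, using made-up counts (the variable names only echo the script's columns; none of the numbers come from the real data):

    import numpy as np

    N = 1000                   # total phrase occurrences in a toy corpus
    phrase_count = 50          # occurrences of the phrase "pale ale"
    term_counts = [100, 200]   # occurrences of "pale" and of "ale"

    # log empirical probabilities, mirroring phraseLogProb above
    phrase_log_prob = np.log(phrase_count) - np.log(N)
    terms_log_prob_sum = sum(np.log(c) - np.log(N) for c in term_counts)

    pwmi = phrase_log_prob - terms_log_prob_sum
    print(round(pwmi, 2))  # 0.92, i.e. the phrase is ~2.5x more common than chance
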
@@ -38,8 +39,20 @@ terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogP
 
 df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
 
-df = df.repartition('phrasePWMI')
 df = df.sort(['phrasePWMI'],ascending=False)
 df = df.sortWithinPartitions(['phrasePWMI'],ascending=False)
 df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
+
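+# re-read the saved parquet so the csv write below reads from disk instead of recomputing the whole lineage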
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/")
+
 df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
+
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet")
+df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
+
+# choosing phrases occurring at least 3500 times in the 10% sample (35000 times
+# overall) and then requiring a PWMI of at least 3 yields about 65000 expressions
+df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
+df = df.toPandas()
+df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather")
+df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv")
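
Downstream consumers can read the feather table written above back with pandas. A minimal read-back sketch, assuming pandas with pyarrow installed (the same assumption to_feather itself makes):

    import pandas as pd

    # load the multiword-expression table and peek at the strongest collocations
    mwes = pd.read_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather")
    print(mwes.sort_values('phrasePWMI', ascending=False).head(10))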
