top_comment_phrases.py
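"""Compute pointwise mutual information (PMI) scores for a sample of Reddit
comment n-gram phrases, then write the ranked phrases to parquet and csv."""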
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")

df = df.withColumnRenamed("value","phrase")

# count overall
N = df.count()
print(f'analyzing PMI on a sample of {N} phrases')
logN = np.log(N)

# count phrase occurrences
phrases = df.groupby('phrase').count()
phrases = phrases.withColumnRenamed('count','phraseCount')
phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)

# count term occurrences
phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])

# each term's total count is the sum of the counts of every phrase containing it
win = Window.partitionBy('term')
terms = terms.withColumn('termCount',f.sum('phraseCount').over(win))
terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)

# collapse back to one row per phrase, summing the term log probabilities
terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb')
terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))
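# phrasePWMI = log P(phrase) - sum_i log P(term_i): how much more likely the
# phrase is than it would be if its terms occurred independently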

# keep the final columns for output
df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])

# order phrases from highest to lowest PMI before writing
df = df.repartition('phrasePWMI')
df = df.sort(['phrasePWMI'], ascending=False)
df = df.sortWithinPartitions(['phrasePWMI'], ascending=False)
df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
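
# To inspect the results (a sketch, not part of the pipeline): read the parquet
# output back and show the highest-PMI phrases.
#
#   top = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/")
#   top.orderBy(f.desc('phrasePWMI')).show(20, truncate=False)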
