]> code.communitydata.science - cdsc_reddit.git/blobdiff - tf_comments.py
git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit
[cdsc_reddit.git] / tf_comments.py
old mode 100644 (file)
new mode 100755 (executable)
index 85eebec..526bac2
@@ -1,10 +1,11 @@
+#!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
 import os
 import datetime
 import re
@@ -21,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
 
@@ -30,11 +30,13 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     ngram_output = partition.replace("parquet","txt")
 
 
     ngram_output = partition.replace("parquet","txt")
 
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
     
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
+
     schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                         pa.field('term', pa.string(), nullable=False),
                         pa.field('week', pa.date32(), nullable=False),
     schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                         pa.field('term', pa.string(), nullable=False),
                         pa.field('week', pa.date32(), nullable=False),
@@ -64,7 +66,16 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
 
 
     subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
 
-    mwe_tokenize = MWETokenizer().tokenize
+    if mwe_pass != 'first':
+        mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather')
+        mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
+        mwe_phrases = list(mwe_dataset.phrase)
+        mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
+        mwe_tokenizer = MWETokenizer(mwe_phrases)
+        mwe_tokenize = mwe_tokenizer.tokenize
+    
+    else:
+        mwe_tokenize = MWETokenizer().tokenize
 
     def remove_punct(sentence):
         new_sentence = []
 
     def remove_punct(sentence):
         new_sentence = []
@@ -119,8 +130,11 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
         else:
             # remove stopWords
 
         else:
             # remove stopWords
+            sentences = map(mwe_tokenize, sentences)
             sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
             sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
-            return chain(* sentences)
+            for sentence in sentences:
+                for token in sentence:
+                    yield token
 
     def tf_comments(subreddit_weeks):
         for key, posts in subreddit_weeks:
 
     def tf_comments(subreddit_weeks):
         for key, posts in subreddit_weeks:
@@ -142,36 +156,41 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
     outchunksize = 10000
 
 
     outchunksize = 10000
 
-    with pq.ParquetWriter("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    
         while True:
         while True:
+
             chunk = islice(outrows,outchunksize)
             chunk = islice(outrows,outchunksize)
+            chunk = (c for c in chunk if c[1] is not None)
             pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
             pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
-            print(pddf)
-            author_pddf = pddf.loc[pddf.is_token == False]
+            author_pddf = pddf.loc[pddf.is_token == False, schema.names]
+            pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
-            
-            pddf = pddf.loc[pddf.is_token == True, schema.names]
-
-            print(pddf)
-            print(author_pddf)
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+
+            if do_break:
                 break
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
-            
+
         writer.close()
         author_writer.close()
 
 
         writer.close()
         author_writer.close()
 
 
-def gen_task_list():
+def gen_task_list(mwe_pass='first'):
     files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
     with open("tf_task_list",'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
     files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
     with open("tf_task_list",'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"source python3 tf_comments.py weekly_tf {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,

Community Data Science Collective || Want to submit a patch?