#!/usr/bin/env python3
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
import os
import re
from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
from nltk.corpus import stopwords
from nltk.util import ngrams
import string
from random import random
from redditcleaner import clean
from pathlib import Path

# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition,
              outputdir='/gscratch/comdata/output/reddit_ngrams/',
              input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
              mwe_pass='first',
              excluded_users=None):
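    """Compute weekly term frequencies for one parquet partition of Reddit comments.

    Writes per-(subreddit, term, week) term counts and per-(subreddit, author, week)
    comment counts. On the first pass (mwe_pass='first') a 10% sample of sentence
    n-grams is also written out so multiword expressions can be detected; on later
    passes those expressions are read back from multiword_expressions.feather and
    treated as single tokens.
    """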

    dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
    outputdir = Path(outputdir)
    samppath = outputdir / "reddit_comment_ngrams_10p_sample"
    samppath.mkdir(parents=True, exist_ok=True)

    ngram_output = partition.replace("parquet", "txt")

    # optionally drop comments from excluded authors (one username per line);
    # the filter is applied when scanning the parquet dataset below
    author_filter = None
    if excluded_users is not None:
        excluded_users = set(map(str.strip, open(excluded_users)))
        author_filter = ~ds.field("author").isin(list(excluded_users))

    ngram_path = samppath / ngram_output
    if mwe_pass == 'first':
        if ngram_path.exists():
            ngram_path.unlink()

    batches = dataset.to_batches(columns=['CreatedAt', 'subreddit', 'body', 'author'],
                                 filter=author_filter)

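    # output schemas: one row per (subreddit, term, week) with the term count,
    # and one row per (subreddit, author, week) with that author's comment count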
    schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                        pa.field('term', pa.string(), nullable=False),
                        pa.field('week', pa.date32(), nullable=False),
                        pa.field('tf', pa.int64(), nullable=False)])

    author_schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                               pa.field('author', pa.string(), nullable=False),
                               pa.field('week', pa.date32(), nullable=False),
                               pa.field('tf', pa.int64(), nullable=False)])

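    # the rest of the pipeline is built from generators, so record batches are
    # converted and counted one at a time rather than loading the whole partition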
    dfs = (b.to_pandas() for b in batches)

    def add_week(df):
        # week = the Monday that begins the week the comment was posted
        df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
        return df

    dfs = (add_week(df) for df in dfs)

    def iterate_rows(dfs):
        for df in dfs:
            for row in df.itertuples():
                yield row

    rows = iterate_rows(dfs)

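    # itertools.groupby only groups consecutive rows with equal keys, so this
    # assumes each partition is already ordered by subreddit and date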
    subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))

    mwe_path = outputdir / "multiword_expressions.feather"

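    # nltk's MWETokenizer merges known multiword expressions into single tokens
    # (joined with '_' by default); on the first pass no expressions have been
    # extracted yet, so tokenization is unchanged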
    if mwe_pass != 'first':
        mwe_dataset = pd.read_feather(mwe_path)
        mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'], ascending=False)
        mwe_phrases = list(mwe_dataset.phrase)
        mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
        mwe_tokenizer = MWETokenizer(mwe_phrases)
        mwe_tokenize = mwe_tokenizer.tokenize
    else:
        mwe_tokenize = MWETokenizer().tokenize

    def remove_punct(sentence):
        new_sentence = []
        for token in sentence:
            new_token = ''
            for c in token:
                if c not in string.punctuation:
                    new_token += c
            if len(new_token) > 0:
                new_sentence.append(new_token)
        return new_sentence

    stopWords = set(stopwords.words('english'))

    # we follow the approach described in Datta, Phelan, and Adar (2017)
    def my_tokenizer(text):
        # lowercase, remove reddit markdown, tokenize, strip punctuation, drop stopwords
        text = text.lower()

        # redditcleaner removes reddit markdown (newlines, quotes, bullet points,
        # links, strikethrough, spoilers, code, superscript, tables, headings)
        text = clean(text)

        # sentence tokenize
        sentences = sent_tokenize(text)

        # wordpunct_tokenize
        sentences = map(wordpunct_tokenize, sentences)

        # remove punctuation
        sentences = map(remove_punct, sentences)

        # Datta et al. select relatively common phrases from the reddit corpus,
        # but they don't really explain how. We'll try that in a second phase.
        # They say that they extract 1-4 grams from 10% of the sentences and then
        # find phrases that appear often relative to the original terms.
        # Here we take a 10 percent sample of sentences.
        if mwe_pass == 'first':

            # drop sentences with fewer than 3 tokens
            sentences = filter(lambda sentence: len(sentence) > 2, sentences)
            sentences = list(sentences)
            for sentence in sentences:
                if random() <= 0.1:
                    # 1-grams through 4-grams, as described above
                    grams = list(chain(*map(lambda i: ngrams(sentence, i), range(1, 5))))
                    with open(ngram_path, 'a') as gram_file:
                        for ng in grams:
                            gram_file.write(' '.join(ng) + '\n')
                for token in sentence:
                    if token not in stopWords:
                        yield token

        else:
            # join multiword expressions into single tokens, then remove stopwords
            sentences = map(mwe_tokenize, sentences)
            sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
            for sentence in sentences:
                for token in sentence:
                    yield token

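    # each emitted row is tagged with a leading boolean: True marks a
    # (subreddit, term, week, tf) row, False a (subreddit, author, week, tf) row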
    def tf_comments(subreddit_weeks):
        for key, posts in subreddit_weeks:
            subreddit, week = key
            tfs = Counter()
            authors = Counter()
            for post in posts:
                tokens = my_tokenizer(post.body)
                tfs.update(tokens)
                authors.update([post.author])

            for term, tf in tfs.items():
                yield [True, subreddit, term, week, tf]

            for author, tf in authors.items():
                yield [False, subreddit, author, week, tf]

    outrows = tf_comments(subreddit_weeks)

    outchunksize = 10000

    termtf_outputdir = (outputdir / "comment_terms")
    termtf_outputdir.mkdir(parents=True, exist_ok=True)
    authortf_outputdir = (outputdir / "comment_authors")
    authortf_outputdir.mkdir(parents=True, exist_ok=True)
    termtf_path = termtf_outputdir / partition
    authortf_path = authortf_outputdir / partition
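    # write results in chunks of outchunksize rows so the full term-frequency
    # table for a partition never has to be held in memory at once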
    with pq.ParquetWriter(termtf_path, schema=schema, compression='snappy', flavor='spark') as writer, \
         pq.ParquetWriter(authortf_path, schema=author_schema, compression='snappy', flavor='spark') as author_writer:

        while True:

            chunk = islice(outrows, outchunksize)
            chunk = (c for c in chunk if c[1] is not None)
            pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
            author_pddf = pddf.loc[pddf.is_token == False, schema.names]
            pddf = pddf.loc[pddf.is_token == True, schema.names]
            author_pddf = author_pddf.rename({'term': 'author'}, axis='columns')
            author_pddf = author_pddf.loc[:, author_schema.names]
            table = pa.Table.from_pandas(pddf, schema=schema)
            author_table = pa.Table.from_pandas(author_pddf, schema=author_schema)
            do_break = True

            if table.shape[0] != 0:
                writer.write_table(table)
                do_break = False
            if author_table.shape[0] != 0:
                author_writer.write_table(author_table)
                do_break = False

            if do_break:
                break

        writer.close()
        author_writer.close()


def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
    with open(tf_task_list, 'w') as outfile:
        for f in files:
            if f.endswith(".parquet"):
                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")

if __name__ == "__main__":
    fire.Fire({"gen_task_list": gen_task_list,
               "weekly_tf": weekly_tf})
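# Example invocation (via python-fire, using the defaults above), e.g. on one
# partition file:
#   ./tf_comments.py weekly_tf --mwe-pass first <partition>.parquet
# gen_task_list writes one such command per partition into the task list file.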
