from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
import time
from datetime import datetime
from pathlib import Path

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

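# A minimal illustrative sketch (not part of the pipeline) of how the two
# weighting families are applied further down in this file: MaxTF multiplies the
# max-normalized term frequency by idf, while Norm05 first smooths the term
# frequency toward 0.5. The function name and toy numbers are assumptions.
def _example_tf_weighting(relative_tf=0.25, idf=3.0):
    maxtf = relative_tf * idf                  # tf_weight.MaxTF  -> 0.75
    norm05 = (0.5 + 0.5 * relative_tf) * idf   # tf_weight.Norm05 -> 1.875
    return maxtf, norm05
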
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
    print("loading tfidf", flush=True)
    tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(open(included_subreddits))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'relative_tf':ds.field('relative_tf').cast('float32'),
            'tf_idf':ds.field('tf_idf').cast('float32')}

    tfidf_ds = ds.dataset(infile)

    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)

    df = df.to_pandas(split_blocks=True,self_destruct=True)
    print("assigning indexes",flush=True)
    df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
    grouped = df.groupby(term_id)
    df[term_id_new] = grouped.ngroup()

    if rescale_idf:
        print("computing idf", flush=True)
        df['new_count'] = grouped[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_fam = tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names,batches)
        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()

    subreddit_names = subreddit_names.set_index("subreddit_id")
    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop("subreddit_id", axis=1)
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)

def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())

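# A hedged usage sketch for reindex_tfidf (not executed on import): the first
# return value is a pandas frame of tf-idf entries carrying dense
# `subreddit_id_new` / `<term>_id_new` indexes, and the second maps those new
# subreddit ids back to names. The parameter values below are assumptions for
# illustration only.
#
#   entries, subreddit_names = reindex_tfidf(infile, term_colname='term',
#                                            min_df=2000, topN=10000)
#   mat = csr_matrix((entries['tf_idf'],
#                     (entries['term_id_new'], entries.subreddit_id_new)))
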
def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        sims.to_feather(output_feather)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}",flush=True)
    sims = simfunc(mat)
    del mat

    if hasattr(sims,'__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

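# A hedged sketch (illustrative only, not executed on import) of driving
# `similarities` with one of the similarity functions defined below; the output
# path and parameter values are assumptions, not settings taken from this repo.
#
#   similarities("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
#                simfunc=lambda mat: lsi_column_similarities(mat, n_components=[100, 300]),
#                term_colname='term',
#                outfile="/gscratch/comdata/output/reddit_similarity/lsi_comment_terms",
#                min_df=2000,
#                topN=10000)
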
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den

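# A small runnable illustration (an assumption added for clarity, not used by the
# pipeline) of the Jaccard overlap computed by column_overlaps: the two columns
# below share one nonzero row, each has two nonzero rows, so their off-diagonal
# overlap is 1 / (2 + 2 - 1) = 1/3 and the diagonal is 1.
def _example_column_overlaps():
    toy = np.array([[1., 0.],
                    [2., 3.],
                    [0., 4.]])
    return column_overlaps(toy)
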
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better.
# this function is a generator: it yields (similarity matrix, n_dims) pairs, one per requested dimensionality.
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it.
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
    # first compute the lsi of the matrix
    # then take the column similarities
    print("running LSI",flush=True)

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components,reverse=True)

    svd_components = n_components[0]
    svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
    mod = svd.fit(tfidfmat.T)
    lsimat = mod.transform(tfidfmat.T)
    for n_dims in n_components:
        sims = column_similarities(lsimat[:,np.arange(n_dims)])
        # always yield: a bare `return sims` inside a generator would discard the
        # similarity matrix instead of handing it back to the caller
        yield (sims, n_dims)


def column_similarities(mat):
    # pairwise_distances compares the rows of `mat`, so callers pass a matrix whose
    # rows correspond to the original tf-idf columns (subreddits)
    return 1 - pairwise_distances(mat,metric='cosine')


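# A small runnable sketch (an assumption for illustration; the toy matrix and
# dimensionalities are not values used by the pipeline) showing how the LSI
# similarity generator above is consumed: it yields one similarity matrix per
# requested dimensionality, largest first.
def _example_lsi_column_similarities():
    rng = np.random.default_rng(0)
    toy_tfidf = csr_matrix(rng.random((50, 20)))   # 50 terms x 20 subreddits
    return {n_dims: sims for sims, n_dims in lsi_column_similarities(toy_tfidf, n_components=[2, 5])}
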
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term-week is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term,'week']).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit','week']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit','week'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term,'week'])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # agg terms by subreddit to make sparse tf/df vectors

    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter

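# A hedged usage sketch for build_weekly_tfidf_dataset (illustrative only): it
# expects a Spark dataframe of per-week term counts with columns `subreddit`,
# the term column, `week`, and `tf`, and it returns a DataFrameWriter already
# partitioned by week. The input/output paths below are assumptions.
#
#   spark = SparkSession.builder.getOrCreate()
#   counts = spark.read.parquet("path/to/weekly_term_counts.parquet")
#   included = select_topN_subreddits(10000)
#   writer = build_weekly_tfidf_dataset(counts, included, term_colname='term')
#   writer.parquet("path/to/tfidf_weekly.parquet", mode='overwrite')
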
def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term) # subreddit-term-id is unique

    idf = idf.join(terms,on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df


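# Worked example of the smoothed idf used above (the numbers are illustrative
# only): with 1000 distinct subreddits and a term appearing in 99 of them,
# idf = log(1000 / (1 + 99)) + 1 = log(10) + 1 ≈ 3.30 (natural log).
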
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter

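# A hedged usage sketch for build_tfidf_dataset (illustrative only): like the
# weekly variant, it takes a Spark dataframe of term counts (columns `subreddit`,
# the term column, and `tf`) plus the set of subreddits to keep, and returns an
# unpartitioned DataFrameWriter. The paths below are assumptions.
#
#   spark = SparkSession.builder.getOrCreate()
#   counts = spark.read.parquet("path/to/term_counts.parquet")
#   writer = build_tfidf_dataset(counts, select_topN_subreddits(10000), term_colname='term')
#   writer.parquet("path/to/tfidf.parquet", mode='overwrite')
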
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits


def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit')
    df.write.parquet(outpath,mode='overwrite')


def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath,mode='overwrite')
