# similarities/similarities_helper.py

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
import time # used by test_lsi_sims

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

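# Illustrative sketch (not part of the pipeline): the two enum members correspond to the
# weighting formulas applied in _pull_or_reindex_tfidf and _calc_tfidf below, where
# relative_tf is a term's frequency divided by the largest term frequency in its subreddit
# and idf is log(N_docs / (1 + doc_count)) + 1.
def _example_tf_weight(relative_tf, idf, family=tf_weight.Norm05):
    """Hypothetical helper showing the two tf-idf weighting families."""
    if family == tf_weight.MaxTF:
        return relative_tf * idf
    # Norm05 damps the term-frequency contribution toward 0.5
    return (0.5 + 0.5 * relative_tf) * idf
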
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms with a high enough idf
# try rewriting without merges

# load a tf-idf table, reassign dense subreddit/term ids, and return the entries
# along with a mapping from subreddit_id_new to subreddit name.
def reindex_tfidf(*args, **kwargs):
    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter, columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names, batches)
        subreddit_names = pd.concat(chunks, copy=False).drop_duplicates()
        subreddit_names = subreddit_names.set_index("subreddit_id")

    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids, on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop(columns="subreddit_id")
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return (df, subreddit_names)

# like reindex_tfidf, but without reindexing.
def pull_tfidf(*args, **kwargs):
    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
    return df

def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
    print(f"loading tfidf {infile}", flush=True)
    if week is not None:
        tfidf_ds = ds.dataset(infile, partitioning='hive')
    else:
        tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(map(str.strip, open(included_subreddits)))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'relative_tf':ds.field('relative_tf').cast('float32'),
            'tf_idf':ds.field('tf_idf').cast('float32')}

    df = tfidf_ds.to_table(filter=ds_filter, columns=projection)

    df = df.to_pandas(split_blocks=True, self_destruct=True)
    print("assigning indexes", flush=True)
    if reindex:
        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
    else:
        df['subreddit_id_new'] = df['subreddit_id']

    if reindex:
        df[term_id_new] = df.groupby(term_id).ngroup()
    else:
        df[term_id_new] = df[term_id]

    if rescale_idf:
        print("computing idf", flush=True)
        # count the number of subreddits each term appears in; group on term_id here
        # rather than reusing a groupby object so this also works when reindex=False
        df['new_count'] = df.groupby(term_id)[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count), dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_family == tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    return (df, tfidf_ds, ds_filter)


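# Hedged usage sketch: how pull_tfidf is typically called. The parquet path and the
# parameter values below are hypothetical placeholders, not the production settings.
def _example_pull_tfidf(infile="tfidf/comment_terms.parquet"):
    """Load a filtered tf-idf table without reassigning ids (reindex=False)."""
    df = pull_tfidf(infile,
                    term_colname='term',
                    min_df=10,                # drop very rare terms
                    included_subreddits=None, # fall back to the top-N ranking csv
                    topN=100)
    return df
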
def pull_names(batch):
    return batch.to_pandas().drop_duplicates()

def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}", flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        # candidate output paths; only the feather output is written below
        output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        sims.to_feather(outfile)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date)

    print("loading matrix")
    mat = csr_matrix((entries[tfidf_colname], (entries[term_id_new], entries.subreddit_id_new)))

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}", flush=True)
    # transform this to debug term tfidf
    sims = simfunc(mat)
    del mat

    # simfunc may return either a single matrix or a generator of (matrix, name) pairs
    if hasattr(sims, '__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

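# Hedged usage sketch: wiring `similarities` to a cosine similarity over subreddit columns.
# The lambda transposes the term-by-subreddit matrix so that `column_similarities` compares
# subreddits; the paths and parameter values below are hypothetical placeholders.
def _example_similarities(inpath="tfidf/comment_terms.parquet",
                          outfile="similarities/comment_term_cosine.feather"):
    similarities(inpath,
                 simfunc=lambda m: column_similarities(m.T),
                 term_colname='term',
                 outfile=outfile,
                 min_df=10,
                 topN=100)
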
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True, parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'], value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

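# Illustrative sketch of the reshaping above: a wide similarity table with one column per
# subreddit is melted into a long (pairwise) list with columns '_subreddit', 'week',
# 'variable' (the other subreddit), and 'value' (the similarity). The toy data is hypothetical.
def _example_melt_to_pairwise():
    wide = pd.DataFrame({'_subreddit': ['a', 'b'],
                         'week': ['2020-01-06', '2020-01-06'],
                         'a': [1.0, 0.2],
                         'b': [0.2, 1.0]})
    pairwise = wide.melt(id_vars=['_subreddit', 'week'], value_vars=['a', 'b'])
    # pairwise has four rows: (a,a,1.0), (b,a,0.2), (a,b,0.2), (b,b,1.0)
    return pairwise
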
def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1, card1) - intersection

    return intersection / den

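# A small worked example (illustrative only, assuming a dense ndarray input): the result is
# the Jaccard index of the columns' nonzero patterns. Columns 0 and 2 below share one of
# their three combined nonzero rows, so their overlap is 1/3.
def _example_column_overlaps():
    demo = np.array([[1, 0, 2],
                     [0, 3, 4],
                     [5, 0, 0]])
    overlaps = column_overlaps(demo)
    # overlaps[0, 2] == 1/3 and the diagonal is all ones
    return overlaps
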
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'], (entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat, [10,50], algorithm='randomized', n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better.
# if n_components is a list we'll return a generator of similarities with different latent dimensionalities.
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it.
def lsi_column_similarities(tfidfmat, n_components=300, n_iter=10, random_state=1968, algorithm='randomized', lsi_model_save=None, lsi_model_load=None):
    # first compute the lsi of the matrix
    # then take the column similarities
    print("running LSI", flush=True)

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components, reverse=True)

    svd_components = n_components[0]

    if lsi_model_load is not None:
        with open(lsi_model_load, 'rb') as modfile:
            mod = pickle.load(modfile)
    else:
        svd = TruncatedSVD(n_components=svd_components, random_state=random_state, algorithm=algorithm, n_iter=n_iter)
        mod = svd.fit(tfidfmat.T)

    lsimat = mod.transform(tfidfmat.T)
    if lsi_model_save is not None:
        with open(lsi_model_save, 'wb') as modfile:
            pickle.dump(mod, modfile)

    def _sims():
        # reuse the largest-rank decomposition: truncating its columns gives the lower ranks
        for n_dims in n_components:
            yield (column_similarities(lsimat[:, np.arange(n_dims)]), n_dims)

    # with a single dimensionality return the similarity matrix directly;
    # otherwise return a generator of (similarities, n_dims) pairs
    if len(n_components) == 1:
        sims, _ = next(_sims())
        return sims

    return _sims()

def column_similarities(mat):
    return 1 - pairwise_distances(mat, metric='cosine')

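# Minimal sanity check (illustrative only): despite the name, the rows of the input are the
# items being compared; in the LSI pipeline each row of lsimat corresponds to a subreddit.
def _example_column_similarities():
    demo = np.array([[1.0, 0.0],
                     [1.0, 0.0],
                     [0.0, 1.0]])
    sims = column_similarities(demo)
    # identical rows have similarity ~1.0, orthogonal rows ~0.0
    return sims
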
# need to rewrite this so that subreddit ids and term ids are fixed over the whole thing.
# this affords taking the LSI similarities.
# fill all 0s if we don't have it.
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term-week is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddit-weeks are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term and week. term-week is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf', f.log(idf.subreddits_in_week / (1+f.col('count'))) + 1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term]).distinct() # terms are distinct

    terms = terms.withColumn(term_id, f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id', f.row_number().over(Window.orderBy("subreddit")))

    # df = df.cache()
    df = df.join(subreddits, on=['subreddit'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms, on=[term]) # subreddit-term-id is unique

    idf = idf.join(terms, on=[term])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term, 'week'])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf", df.relative_tf * df.idf)
    else: # tf_family == tf_weight.Norm05
        df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition(400, 'subreddit', 'week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter

def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf', f.log(N_docs/(1+f.col('count'))) + 1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id, f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id', f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits, on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms, on=term) # subreddit-term-id is unique

    idf = idf.join(terms, on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf", df.relative_tf * df.idf)
    else: # tf_family == tf_weight.Norm05
        df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df


def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts over the whole dataset. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter

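# Hedged usage sketch: running the Spark tf-idf builders. The paths and the top-N cutoff are
# hypothetical placeholders, and the input is assumed to be a dataframe of per-subreddit term
# counts with a 'tf' column; build_weekly_tfidf_dataset is driven the same way, except its
# writer already partitions by week.
def _example_tfidf_dataset(inpath="reddit_comment_terms.parquet",
                           outpath="reddit_tfidf.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    include_subs = select_topN_subreddits(topN=100)
    dfwriter = tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.parquet(outpath, mode='overwrite')
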
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN, 'subreddit'].values)
    return included_subreddits


def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400, 'subreddit')
    df.write.parquet(outpath, mode='overwrite')


def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400, 'subreddit', 'week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath, mode='overwrite')
