# similarities/similarities_helper.py
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
import time

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

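# How the two tf_weight families are applied further down (see _pull_or_reindex_tfidf,
# build_weekly_tfidf_dataset and _calc_tfidf):
#   MaxTF:  tf_idf = relative_tf * idf
#   Norm05: tf_idf = (0.5 + 0.5 * relative_tf) * idf
# e.g. with relative_tf = 0.5 and idf = 2.0, MaxTF gives 1.0 and Norm05 gives 1.5.
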
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms with a high enough idf
# try rewriting without merges

# loads the tfidf entries, reindexes subreddit and term ids, and returns the entries along with subreddit names.
def reindex_tfidf(*args, **kwargs):
    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names,batches)
        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
        subreddit_names = subreddit_names.set_index("subreddit_id")

    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop(columns="subreddit_id")
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)

# like reindex_tfidf, but without reindexing.
def pull_tfidf(*args, **kwargs):
    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
    return df

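# A minimal usage sketch for pull_tfidf / reindex_tfidf. The path and thresholds
# below are hypothetical placeholders, not values taken from the pipeline configs.
def _example_pull_tfidf():
    entries = pull_tfidf("/path/to/comment_terms_tfidf.parquet",  # hypothetical path
                         term_colname='term',
                         min_df=100,
                         topN=1000)
    # entries is a pandas DataFrame with subreddit_id, term_id, relative_tf,
    # subreddit_id_new, term_id_new and (with the default rescale_idf) a tf_idf column.
    return entries
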
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
    print(f"loading tfidf {infile}", flush=True)
    if week is not None:
        tfidf_ds = ds.dataset(infile, partitioning='hive')
    else:
        tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(map(str.strip,open(included_subreddits)))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'relative_tf':ds.field('relative_tf').cast('float32'),
            'tf_idf':ds.field('tf_idf').cast('float32')}

    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)

    df = df.to_pandas(split_blocks=True,self_destruct=True)
    print("assigning indexes",flush=True)
    if reindex:
        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
        df[term_id_new] = df.groupby(term_id).ngroup()
    else:
        df['subreddit_id_new'] = df['subreddit_id']
        df[term_id_new] = df[term_id]

    if rescale_idf:
        print("computing idf", flush=True)
        df['new_count'] = df.groupby(term_id)[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_fam = tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    return (df, tfidf_ds, ds_filter)

def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())

def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        sims.to_feather(output_feather)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}",flush=True)
    sims = simfunc(mat)
    del mat

    if hasattr(sims,'__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

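# A hypothetical driver for similarities(); the paths are placeholders. Cosine
# similarity between subreddit columns stands in for the simfunc here; note the
# transpose, because column_similarities compares rows while the matrix built in
# similarities() has one column per subreddit.
def _example_cosine_similarities():
    similarities(inpath="/path/to/comment_terms_tfidf.parquet",  # hypothetical path
                 simfunc=lambda m: column_similarities(m.T),
                 term_colname='term',
                 outfile="/path/to/comment_term_similarities.feather",  # hypothetical path
                 min_df=100,
                 topN=1000)
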
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

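# In write_weekly_similarities above, the melt turns the square similarity matrix into
# one row per subreddit pair per week, with columns _subreddit, week, variable (the
# paired subreddit's name) and value (the similarity); each week's parquet file is
# named by that week's ISO date.
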
def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den

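# A toy check of column_overlaps (Jaccard overlap of the non-zero patterns of two
# columns): columns [1,1,0] and [0,1,1] share one non-zero row out of three distinct
# non-zero rows, so their overlap is 1/3.
def _example_column_overlaps():
    toy = np.array([[1, 0],
                    [1, 1],
                    [0, 1]])
    overlaps = column_overlaps(toy)
    # overlaps[0, 1] == 1/3; the diagonal entries are 1.
    return overlaps
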
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better.
# if n_components is a list we'll return a generator of (similarities, n_dims) pairs for the different latent dimensionalities
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
    # first compute the lsi of the matrix
    # then take the column similarities
    print("running LSI",flush=True)

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components,reverse=True)

    svd_components = n_components[0]

    if lsi_model_load is not None:
        with open(lsi_model_load,'rb') as f_model:
            mod = pickle.load(f_model)
    else:
        svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
        mod = svd.fit(tfidfmat.T)

    lsimat = mod.transform(tfidfmat.T)
    if lsi_model_save is not None:
        with open(lsi_model_save,'wb') as f_model:
            pickle.dump(mod, f_model)

    # yield (similarities, n_dims) pairs when several dimensionalities were requested;
    # return the similarity matrix directly when only one was.
    def _sims():
        for n_dims in n_components:
            yield (column_similarities(lsimat[:,np.arange(n_dims)]), n_dims)

    if len(n_components) > 1:
        return _sims()
    else:
        return column_similarities(lsimat[:,np.arange(n_components[0])])

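# A small self-contained sketch of lsi_column_similarities on a random sparse matrix
# (purely illustrative; real inputs come from the tfidf datasets built below).
def _example_lsi_column_similarities():
    rng = np.random.default_rng(0)
    # 500 "terms" x 40 "subreddits", mostly zeros
    dense = rng.random((500, 40)) * (rng.random((500, 40)) < 0.1)
    mat = csr_matrix(dense)
    for sims, n_dims in lsi_column_similarities(mat, n_components=[5, 10], algorithm='randomized', n_iter=5):
        print(n_dims, sims.shape)  # each sims is a 40 x 40 subreddit-by-subreddit matrix
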
def column_similarities(mat):
    return 1 - pairwise_distances(mat,metric='cosine')

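# Note that pairwise_distances compares *rows*, so callers pass matrices with one row
# per subreddit (e.g. the LSI-projected matrix above). A quick check: two proportional
# rows have cosine similarity 1.
def _example_column_similarities():
    rows = np.array([[1.0, 2.0, 0.0],
                     [2.0, 4.0, 0.0],
                     [0.0, 0.0, 1.0]])
    sims = column_similarities(rows)
    # sims[0, 1] is (approximately) 1.0 and sims[0, 2] is 0.0
    return sims
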
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term-week is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # one row per subreddit-week
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term and week. term-week is distinct
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term]).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term]) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter

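# A hypothetical driver for build_weekly_tfidf_dataset. It assumes a Spark DataFrame
# with columns subreddit, term, week and tf (per subreddit-week term counts); the
# input and output paths are placeholders.
def _example_build_weekly_tfidf():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/weekly_comment_terms.parquet")  # hypothetical path
    include_subs = list(select_topN_subreddits(1000))
    dfwriter = build_weekly_tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.parquet("/path/to/comment_terms_tfidf_weekly.parquet", mode='overwrite')  # hypothetical path
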
def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term) # subreddit-term-id is unique

    idf = idf.join(terms,on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter

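# The non-weekly variant follows the same pattern; a hypothetical driver, assuming a
# Spark DataFrame with subreddit, term and tf columns and placeholder paths.
def _example_build_tfidf():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/comment_terms.parquet")  # hypothetical path
    include_subs = list(select_topN_subreddits(1000))
    dfwriter = tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.parquet("/path/to/comment_terms_tfidf.parquet", mode='overwrite')  # hypothetical path
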
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits

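# For example, select_topN_subreddits(500) returns the set of subreddit names whose
# comments_rank in the csv is at most 500; _pull_or_reindex_tfidf uses this as the
# default subreddit filter when included_subreddits is not given.
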

def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit')
    df.write.parquet(outpath,mode='overwrite')

def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath,mode='overwrite')
