# cdsc_reddit: similarities/similarities_helper.py
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
import time

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2
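# A minimal numpy sketch (illustration only, not used by the pipeline) of how the two
# tf_weight families are applied elsewhere in this module: relative_tf is a term's count
# divided by the largest term count in its subreddit, and idf is smoothed as
# log(N_docs / (1 + doc_freq)) + 1. The parameter values below are hypothetical.
def _example_tf_weighting(relative_tf, doc_freq, N_docs, tf_family=tf_weight.Norm05):
    idf = np.log(N_docs / (1 + doc_freq)) + 1
    if tf_family == tf_weight.MaxTF:
        return relative_tf * idf
    else:  # tf_weight.Norm05 damps the influence of very frequent terms
        return (0.5 + 0.5 * relative_tf) * idf
    # e.g. _example_tf_weighting(0.5, doc_freq=10, N_docs=100) is about 2.41 under Norm05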
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges
# load the tf-idf entries, reindex the subreddit and term ids, and pull subreddit names
def reindex_tfidf(*args, **kwargs):
    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names,batches)
        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
        subreddit_names = subreddit_names.set_index("subreddit_id")

    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop(columns="subreddit_id")
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)
# does what reindex_tfidf does, but without reindexing
def pull_tfidf(*args, **kwargs):
    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
    return df
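# A minimal sketch of calling the two loaders above; the parquet path is hypothetical.
# pull_tfidf returns just the entries, while reindex_tfidf also returns the subreddit
# names aligned to the new dense ids.
def _example_load_tfidf():
    entries = pull_tfidf("/path/to/comment_terms_tfidf.parquet",  # hypothetical path
                         term_colname='term', min_df=2000, topN=10000)
    entries, subreddit_names = reindex_tfidf("/path/to/comment_terms_tfidf.parquet",
                                             term_colname='term', min_df=2000, topN=10000)
    return entries, subreddit_names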
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
    print(f"loading tfidf {infile}", flush=True)
    if week is not None:
        tfidf_ds = ds.dataset(infile, partitioning='hive')
    else:
        tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(map(str.strip,open(included_subreddits)))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'relative_tf':ds.field('relative_tf').cast('float32'),
            'tf_idf':ds.field('tf_idf').cast('float32')}

        print(projection)

    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)

    df = df.to_pandas(split_blocks=True,self_destruct=True)
    print("assigning indexes",flush=True)
    if reindex:
        # assign dense, zero-based ids so the entries can index a sparse matrix directly
        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
        df[term_id_new] = df.groupby(term_id).ngroup()
    else:
        df['subreddit_id_new'] = df['subreddit_id']
        df[term_id_new] = df[term_id]

    if rescale_idf:
        print("computing idf", flush=True)
        # document frequency of each term within the filtered set of subreddits
        df['new_count'] = df.groupby(term_id)[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_fam = tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    return (df, tfidf_ds, ds_filter)
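# A minimal sketch (mirroring what similarities() and test_lsi_sims() do below) of why the
# reindexed ids matter: the dense, zero-based *_id_new columns index the rows and columns
# of a scipy sparse matrix directly.
def _example_entries_to_matrix(entries, term_colname='term'):
    term_id_new = term_colname + '_id_new'
    return csr_matrix((entries['tf_idf'], (entries[term_id_new], entries.subreddit_id_new)))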
def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())
def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        sims.to_feather(output_feather)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}",flush=True)
    sims = simfunc(mat)
    del mat

    if hasattr(sims,'__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)
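# A minimal sketch of running a similarity job end to end; both paths are hypothetical.
# simfunc maps the term-by-subreddit tf-idf matrix to either a similarity matrix or a
# generator of (matrix, name) pairs; here lsi_column_similarities yields one similarity
# matrix per latent dimensionality, and each is written under the outfile directory.
def _example_similarities_run():
    similarities(inpath="/path/to/comment_terms_tfidf.parquet",   # hypothetical input
                 simfunc=lambda mat: lsi_column_similarities(mat, n_components=[10, 50]),
                 term_colname='term',
                 outfile="/path/to/comment_lsi_similarities",     # hypothetical output
                 min_df=2000,
                 topN=10000)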
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())
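# A toy illustration (made-up values, not pipeline output) of the reshape above: the square
# similarity frame, with one column per subreddit plus '_subreddit' and 'week', is melted
# into a long pairwise list with columns _subreddit, week, variable, and value.
def _example_pairwise_melt():
    sims = pd.DataFrame({'askreddit': [1.0, 0.2], 'science': [0.2, 1.0]})
    sims['_subreddit'] = ['askreddit', 'science']
    sims['week'] = '2020-01-06'  # hypothetical week label
    return sims.melt(id_vars=['_subreddit', 'week'], value_vars=['askreddit', 'science'])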
def column_overlaps(mat):
    # Jaccard overlap between columns: shared nonzero rows divided by the union of nonzero rows
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den
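# A small worked example (illustration only) of column_overlaps on a dense toy matrix. In the
# pipeline the columns are subreddits, so this is the Jaccard overlap of their term sets.
def _example_column_overlaps():
    toy = np.array([[1, 0, 2],
                    [0, 3, 4],
                    [5, 0, 6]])
    # columns 0 and 2 share two of three nonzero rows (overlap 2/3); columns 0 and 1 share none
    # expected result:
    # [[1.,  0.,  2/3],
    #  [0.,  1.,  1/3],
    #  [2/3, 1/3, 1. ]]
    return column_overlaps(toy)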
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))
# n_components is the latent dimensionality. sklearn recommends 100. More might be better.
# If n_components is a list we'll return a generator of (similarities, n_dims) pairs for the different latent dimensionalities.
# If algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# This function takes the SVD and then the column similarities of it.
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
    # first compute the lsi of the matrix
    # then take the column similarities

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components,reverse=True)

    svd_components = n_components[0]

    if lsi_model_load is not None and Path(lsi_model_load).exists():
        print("loading LSI")
        mod = pickle.load(open(lsi_model_load ,'rb'))
        lsi_model_save = lsi_model_load

    else:
        print("running LSI",flush=True)
        svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
        mod = svd.fit(tfidfmat.T)

    lsimat = mod.transform(tfidfmat.T)
    if lsi_model_save is not None:
        Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True)
        pickle.dump(mod, open(lsi_model_save,'wb'))

    # fit once at the largest dimensionality, then truncate the projection for the smaller ones
    if len(n_components) == 1:
        return column_similarities(lsimat[:,np.arange(n_components[0])])

    return ((column_similarities(lsimat[:,np.arange(n_dims)]), n_dims) for n_dims in n_components)
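# A minimal sketch of the two return conventions of lsi_column_similarities: a single
# n_components value returns the similarity matrix itself, while a list returns a generator
# of (similarities, n_dims) pairs, largest dimensionality first.
def _example_lsi_usage(mat):
    sims_300 = lsi_column_similarities(mat, n_components=300)
    for sims, n_dims in lsi_column_similarities(mat, n_components=[10, 50]):
        print(n_dims, sims.shape)
    return sims_300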
def column_similarities(mat):
    # cosine similarity between the rows of mat
    return 1 - pairwise_distances(mat,metric='cosine')
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term-week is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddit-weeks are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term-week is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term]).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term]) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter
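# A minimal sketch of writing the weekly tf-idf dataset. The input and output paths are
# hypothetical; the input is assumed to have 'subreddit', 'week', the term column, and a raw
# count column 'tf' (per the groupBy/agg above).
def _example_build_weekly_tfidf():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/comment_terms_weekly.parquet")  # hypothetical input
    include_subs = select_topN_subreddits(10000)
    dfwriter = build_weekly_tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.mode('overwrite').parquet("/path/to/comment_terms_tfidf_weekly.parquet")  # hypothetical output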
def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term) # subreddit-term-id is unique

    idf = idf.join(terms,on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df
def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts over the whole period. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter
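# A minimal sketch of building the non-weekly tf-idf dataset. The paths are hypothetical; the
# input is assumed to have 'subreddit', the term column, and a raw count column 'tf'. Passing
# term_colname='author' would build the author-based variant mentioned in similarities().
def _example_tfidf_dataset():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/comment_terms.parquet")  # hypothetical input
    include_subs = select_topN_subreddits(10000)
    dfwriter = tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.mode('overwrite').parquet("/path/to/comment_terms_tfidf.parquet")  # hypothetical output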
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits
def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit')
    df.write.parquet(outpath,mode='overwrite')


def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath,mode='overwrite')
