# similarities/similarities_helper.py (cdsc_reddit)
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
import time

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

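# Illustrative sketch (not part of the pipeline): how the two tf_weight schemes
# turn a relative term frequency and an idf value into a tf-idf score, matching
# the formulas used in _pull_or_reindex_tfidf and _calc_tfidf below. The input
# numbers are hypothetical.
def _example_tf_weighting(relative_tf=0.25, idf=2.0):
    max_tf = relative_tf * idf                  # tf_weight.MaxTF
    norm05 = (0.5 + 0.5 * relative_tf) * idf    # tf_weight.Norm05
    return max_tf, norm05
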
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms with a high enough idf
# TODO: try rewriting without merges

# reindex_tfidf reindexes the subreddit and term ids; pull_tfidf (below) returns the same entries without reindexing.
def reindex_tfidf(*args, **kwargs):
    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names,batches)
        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
        subreddit_names = subreddit_names.set_index("subreddit_id")

    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop("subreddit_id",axis=1)
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)

def pull_tfidf(*args, **kwargs):
    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
    return df

def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=None, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
    print(f"loading tfidf {infile}, week {week}, min_df {min_df}, max_df {max_df}", flush=True)

    if week is not None:
        tfidf_ds = ds.dataset(infile, partitioning='hive')
    else:
        tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(map(str.strip,open(included_subreddits)))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'relative_tf':ds.field('relative_tf').cast('float32'),
            'tf_idf':ds.field('tf_idf').cast('float32')}

    print(projection, flush=True)
    print(ds_filter, flush=True)
    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)

    df = df.to_pandas(split_blocks=True,self_destruct=True)

    print("assigning indexes",flush=True)
    if reindex:
        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + 1
        df[term_id_new] = df.groupby(term_id).ngroup() + 1
    else:
        df['subreddit_id_new'] = df['subreddit_id']
        df[term_id_new] = df[term_id]

    if rescale_idf:
        print("computing idf", flush=True)
        # number of subreddits (documents) each term appears in
        df['new_count'] = df.groupby(term_id)[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_family == tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    return (df, tfidf_ds, ds_filter)

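# Illustrative sketch (hypothetical path and parameters): pull_tfidf returns just
# the tf-idf entries as a pandas DataFrame, while reindex_tfidf additionally
# returns subreddit names mapped to dense 1-based ids suitable for building a
# sparse matrix.
def _example_pull_tfidf():
    entries = pull_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
                         term_colname='term',
                         min_df=2000,
                         topN=10000,
                         rescale_idf=False)
    return entries
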
def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())

def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        sims.to_feather(outfile)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date)
    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}",flush=True)
    sims = simfunc(mat)
    del mat

    if hasattr(sims,'__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

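# Illustrative sketch (hypothetical paths): similarities() accepts any simfunc
# that maps the term x subreddit csr_matrix to a subreddit x subreddit
# similarity matrix, or to a generator of (matrix, name) pairs as
# lsi_column_similarities below does.
def _example_cosine_similarities():
    similarities(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
                 simfunc=lambda mat: column_similarities(mat.T),
                 term_colname='term',
                 outfile="/gscratch/comdata/output/reddit_similarity/comment_terms_cosine.feather",
                 min_df=2000,
                 topN=10000)
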
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den

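# Illustrative sketch: column_overlaps computes the Jaccard overlap of the
# nonzero patterns of each pair of columns. On this tiny hypothetical matrix,
# columns 0 and 2 share one nonzero row out of three distinct nonzero rows,
# giving an overlap of 1/3.
def _example_column_overlaps():
    mat = np.array([[1, 0, 1],
                    [0, 1, 1],
                    [1, 1, 0]])
    return column_overlaps(mat)
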
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better.
# if n_components is a list we'll yield one similarity matrix per latent dimensionality.
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
    # first compute the lsi of the matrix
    # then take the column similarities

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components,reverse=True)

    svd_components = n_components[0]

    if lsi_model_load is not None and Path(lsi_model_load).exists():
        print("loading LSI")
        mod = pickle.load(open(lsi_model_load,'rb'))
        lsi_model_save = lsi_model_load

    else:
        print("running LSI",flush=True)
        svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
        mod = svd.fit(tfidfmat.T)

    if lsi_model_save is not None:
        Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True)
        pickle.dump(mod, open(lsi_model_save,'wb'))

    print(n_components, flush=True)
    lsimat = mod.transform(tfidfmat.T)
    for n_dims in n_components:
        print("computing similarities", flush=True)
        sims = column_similarities(lsimat[:,np.arange(n_dims)])
        yield (sims, n_dims)

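# Illustrative sketch (hypothetical matrix): lsi_column_similarities is a
# generator that fits the SVD once at the largest requested dimensionality and
# then yields one (similarity_matrix, n_dims) pair per requested dimensionality,
# largest first.
def _example_lsi_sims(tfidfmat):
    for sims, n_dims in lsi_column_similarities(tfidfmat, n_components=[10, 50]):
        print(f"{n_dims} dimensions -> similarity matrix of shape {sims.shape}")
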
def column_similarities(mat):
    return 1 - pairwise_distances(mat,metric='cosine')

def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term-week is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddit-week pairs are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term-week is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term]).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term]) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_family == tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition('week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter

def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf
    if min_df is not None:
        terms = terms.filter(f.col('count')>=min_df)
    if max_df is not None:
        terms = terms.filter(f.col('count')<=max_df)

    terms = terms.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term,how='inner') # subreddit-term-id is unique

    idf = idf.join(terms,on=term,how='inner')

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term],how='inner')

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_family == tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05, min_df=None, max_df=None):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family, min_df, max_df)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter

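# Illustrative sketch (hypothetical paths and topN): a typical driver reads
# per-(subreddit, term) counts with Spark, builds the tf-idf dataset, and
# writes it out as parquet via the returned DataFrameWriter.
def _example_build_tfidf(inpath="comment_terms_counts.parquet",
                         outpath="comment_terms_tfidf.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    include_subs = select_topN_subreddits(topN=10000)
    dfwriter = tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.parquet(outpath, mode='overwrite')
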
def select_topN_subreddits(topN, path="../../data/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits


def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit')
    df.write.parquet(outpath,mode='overwrite')


def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath,mode='overwrite')
