# similarities/similarities_helper.py
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
import time

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

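# Illustrative sketch (not used elsewhere in this module): how the two
# tf_weight schemes combine a relative term frequency with an idf value.
# The function name and arguments are hypothetical; the live computation
# happens in _pull_or_reindex_tfidf and _calc_tfidf below.
def _example_tf_weight(relative_tf, idf, family=tf_weight.Norm05):
    if family == tf_weight.MaxTF:
        return relative_tf * idf
    else:  # tf_weight.Norm05
        return (0.5 + 0.5 * relative_tf) * idf
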
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges

# loads tf-idf entries, reassigns dense subreddit and term ids, and returns the entries together with the subreddit names
def reindex_tfidf(*args, **kwargs):
    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names,batches)
        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
        subreddit_names = subreddit_names.set_index("subreddit_id")

    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop("subreddit_id",axis=1)
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)

# does reindex_tfidf, but without reindexing.
def pull_tfidf(*args, **kwargs):
    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
    return df

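# Example usage (hedged; the parquet path is hypothetical and must point at a
# tf-idf dataset produced by tfidf_dataset below):
# entries = pull_tfidf("/path/to/comment_terms_tfidf.parquet",
#                      term_colname='term',
#                      min_df=2000,
#                      topN=10000,
#                      rescale_idf=False)
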
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=None, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
    print(f"loading tfidf {infile}, week {week}, min_df {min_df}, max_df {max_df}", flush=True)

    if week is not None:
        tfidf_ds = ds.dataset(infile, partitioning='hive')
    else:
        tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(map(str.strip,open(included_subreddits)))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'tf_idf':ds.field('tf_idf').cast('float32')}

    print(projection, flush=True)
    print(ds_filter, flush=True)
    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)

    df = df.to_pandas(split_blocks=True,self_destruct=True)

    if reindex:
        print("assigning indexes",flush=True)
        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + 1
    else:
        df['subreddit_id_new'] = df['subreddit_id']

    # group by term up front so the idf recomputation below works whether or not we reindex
    grouped = df.groupby(term_id)
    if reindex:
        df[term_id_new] = grouped.ngroup() + 1
    else:
        df[term_id_new] = df[term_id]

    if rescale_idf:
        print("computing idf", flush=True)
        df['new_count'] = grouped[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_fam = tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    return (df, tfidf_ds, ds_filter)
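# Minimal sketch of the pyarrow.dataset filtering pattern used in
# _pull_or_reindex_tfidf, assuming a hypothetical parquet dataset with
# 'subreddit', 'subreddit_id', and 'count' columns. Not called anywhere in
# this module.
def _example_ds_filter(infile, subreddits, min_df=10):
    dataset = ds.dataset(infile)
    filt = ds.field("subreddit").isin(subreddits)
    filt &= ds.field("count") >= min_df
    return dataset.to_table(filter=filt, columns=['subreddit', 'subreddit_id'])
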
def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())

def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        # note: only the feather output is written at present
        sims.to_feather(outfile)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
    # build a sparse term-by-subreddit matrix; the reindexed ids are 1-based, hence the -1
    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}",flush=True)
    sims = simfunc(mat)
    del mat

    # simfunc may either return a single matrix or yield (matrix, name) pairs
    # (e.g. lsi_column_similarities)
    if hasattr(sims,'__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

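# Example invocation (hedged; paths are hypothetical). simfunc receives the
# sparse term-by-subreddit tf-idf matrix; a generator such as
# lsi_column_similarities yields one similarity matrix per dimensionality:
# from functools import partial
# similarities("/path/to/comment_terms_tfidf.parquet",
#              simfunc=partial(lsi_column_similarities, n_components=[10, 50]),
#              term_colname='term',
#              outfile="/path/to/output/lsi_similarities",
#              min_df=2000,
#              topN=10000)
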
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

def column_overlaps(mat):
    # Jaccard overlap between the sets of nonzero rows of each pair of columns
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den

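# Small self-contained check of column_overlaps on a dense toy matrix; the
# result is the Jaccard overlap of the nonzero patterns of each pair of
# columns. Illustrative only; not called anywhere in this module.
def _example_column_overlaps():
    toy = np.array([[1, 0, 2],
                    [0, 3, 4],
                    [5, 0, 6]])
    return column_overlaps(toy)
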
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better
# if n_components is a list we'll yield a (similarities, n_dims) pair for each latent dimensionality
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
# lsi_model_load = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model=None):
    # first compute the lsi of the matrix
    # then take the column similarities

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components,reverse=True)

    svd_components = n_components[0]

    if lsi_model is None:
        print("running LSI",flush=True)
        svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
        mod = svd.fit(tfidfmat.T)
    else:
        mod = lsi_model

    lsimat = mod.transform(tfidfmat.T)
    if lsi_model_save is not None:
        Path(lsi_model_save).parent.mkdir(exist_ok=True,parents=True)
        with open(lsi_model_save,'wb') as lsi_file:
            pickle.dump(mod, lsi_file)

    print(n_components)
    for n_dims in n_components:
        print("computing similarities")
        sims = column_similarities(lsimat[:,np.arange(n_dims)])
        yield (sims, n_dims)

def column_similarities(mat):
    # rows of mat are the objects being compared (the original tf-idf columns,
    # after transposition or LSI); returns their pairwise cosine similarities
    return 1 - pairwise_distances(mat,metric='cosine')

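# Toy check (illustrative only): the rows of the input to column_similarities
# are the objects being compared, so identical rows get cosine similarity 1.
def _example_column_similarities():
    toy = np.array([[1.0, 0.0],
                    [1.0, 0.0],
                    [0.0, 1.0]])
    return column_similarities(toy)  # 3x3; entry (0, 1) is 1.0, entry (0, 2) is 0.0
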
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddit-week pairs are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term and week. term-week is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term]).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term]) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition('week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter

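# Sketch of driving the weekly builder (hedged; the Spark dataframe and paths
# are hypothetical, and the input needs 'subreddit', term, 'week', and 'tf'
# columns):
# spark = SparkSession.builder.getOrCreate()
# df = spark.read.parquet("/path/to/comment_terms_weekly.parquet")
# include_subs = select_topN_subreddits(10000)
# dfwriter = build_weekly_tfidf_dataset(df, include_subs, term_colname='term')
# dfwriter.parquet("/path/to/output/tfidf_weekly.parquet", mode='overwrite')
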
def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf
    if min_df is not None:
        terms = terms.filter(f.col('count')>=min_df)
    if max_df is not None:
        terms = terms.filter(f.col('count')<=max_df)

    terms = terms.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term,how='inner') # subreddit-term-id is unique

    idf = idf.join(terms,on=term,how='inner')

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term],how='inner')

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05, min_df=None, max_df=None):
    term = term_colname
    term_id = term + '_id'

    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family, min_df, max_df)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter

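# Example (hedged; paths are hypothetical), mirroring the weekly sketch above:
# spark = SparkSession.builder.getOrCreate()
# df = spark.read.parquet("/path/to/comment_terms.parquet")
# include_subs = select_topN_subreddits(10000)
# dfwriter = tfidf_dataset(df, include_subs, term_colname='term', min_df=2000)
# dfwriter.parquet("/path/to/output/comment_terms_tfidf.parquet", mode='overwrite')
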
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits


def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit')
    df.write.parquet(outpath,mode='overwrite')


def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath,mode='overwrite')
