from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
import time

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

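# The two tf weighting families used when computing tf_idf below:
#   MaxTF:  tf_idf = relative_tf * idf
#   Norm05: tf_idf = (0.5 + 0.5 * relative_tf) * idf
# where relative_tf is a term's tf divided by the maximum tf in its subreddit.
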
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges

def reindex_tfidf(*args, **kwargs):
    df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter, columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names, batches)
        subreddit_names = pd.concat(chunks, copy=False).drop_duplicates()
        subreddit_names = subreddit_names.set_index("subreddit_id")

    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids, on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop(columns="subreddit_id")
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return (df, subreddit_names)

# like reindex_tfidf, but without reindexing.
def pull_tfidf(*args, **kwargs):
    df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
    return df

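# Illustrative sketch (a hypothetical helper, never called in this module): loading tf-idf
# entries for a hand-picked set of subreddits with pull_tfidf. The parquet path and the
# subreddit list file below are hypothetical placeholders; included_subreddits, when given,
# is a text file with one subreddit name per line.
def _example_pull_tfidf():
    entries = pull_tfidf("/path/to/comment_terms_tfidf.parquet",
                         term_colname='term',
                         included_subreddits="/path/to/subreddit_list.txt",
                         min_df=10)
    return entries
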
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
    print(f"loading tfidf {infile}", flush=True)
    if week is not None:
        tfidf_ds = ds.dataset(infile, partitioning='hive')
    else:
        tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(map(str.strip, open(included_subreddits)))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id': ds.field('subreddit_id'),
        term_id: ds.field(term_id),
        'relative_tf': ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id': ds.field('subreddit_id'),
            term_id: ds.field(term_id),
            'relative_tf': ds.field('relative_tf').cast('float32'),
            'tf_idf': ds.field('tf_idf').cast('float32')}

        print(projection)

    df = tfidf_ds.to_table(filter=ds_filter, columns=projection)

    df = df.to_pandas(split_blocks=True, self_destruct=True)
    print("assigning indexes", flush=True)
    if reindex:
        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
    else:
        df['subreddit_id_new'] = df['subreddit_id']

    grouped = df.groupby(term_id)
    if reindex:
        df[term_id_new] = grouped.ngroup()
    else:
        df[term_id_new] = df[term_id]

    if rescale_idf:
        print("computing idf", flush=True)
        # grouped is defined above both branches so the idf rescale also works when reindex=False
        df['new_count'] = grouped[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count), dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_fam = tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    return (df, tfidf_ds, ds_filter)

def pull_names(batch):
    return batch.to_pandas().drop_duplicates()

def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}", flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        # only the feather output is written; the csv and parquet paths above are currently unused
        sims.to_feather(outfile)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date)
    mat = csr_matrix((entries[tfidf_colname], (entries[term_id_new], entries.subreddit_id_new)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}", flush=True)
    sims = simfunc(mat)
    del mat

    if hasattr(sims, '__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

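# Illustrative sketch (a hypothetical helper, never called in this module): computing cosine
# similarities among the top 500 subreddits from a term-level tf-idf dataset. The input and
# output paths are hypothetical placeholders; column_similarities (defined further below) is
# used as the simfunc, so a single dense similarity matrix is written as a feather file.
def _example_term_similarities():
    similarities(inpath="/path/to/comment_terms_tfidf.parquet",
                 simfunc=column_similarities,
                 term_colname='term',
                 outfile="/path/to/comment_term_similarities.feather",
                 min_df=10,
                 topN=500)
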
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True, parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'], value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1, card1) - intersection

    return intersection / den

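# Illustrative sketch (a hypothetical helper, never called in this module): column_overlaps
# computes the Jaccard overlap of the nonzero patterns of each pair of columns
# (intersection over union). On a small dense matrix:
def _example_column_overlaps():
    toy = np.array([[1., 0.],
                    [2., 3.],
                    [0., 4.]])
    # the two columns share one nonzero row out of three nonzero rows total -> overlap 1/3
    return column_overlaps(toy)
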
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'], (entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat, [10,50], algorithm='randomized', n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better
# if n_components is a list we'll yield a (similarities, n_dims) pair for each latent dimensionality
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat, n_components=300, n_iter=10, random_state=1968, algorithm='randomized', lsi_model_save=None, lsi_model_load=None):
    # first compute the lsi of the matrix
    # then take the column similarities

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components, reverse=True)

    svd_components = n_components[0]

    if lsi_model_load is not None and Path(lsi_model_load).exists():
        print("loading LSI")
        mod = pickle.load(open(lsi_model_load, 'rb'))
        lsi_model_save = lsi_model_load

    else:
        print("running LSI", flush=True)

        svd = TruncatedSVD(n_components=svd_components, random_state=random_state, algorithm=algorithm, n_iter=n_iter)
        mod = svd.fit(tfidfmat.T)

    lsimat = mod.transform(tfidfmat.T)
    if lsi_model_save is not None:
        pickle.dump(mod, open(lsi_model_save, 'wb'))

    for n_dims in n_components:
        sims = column_similarities(lsimat[:, np.arange(n_dims)])
        # because this function contains a yield it is always a generator; a bare `return sims`
        # here would end iteration without producing anything, so yield in every case
        yield (sims, n_dims)

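# Illustrative sketch (a hypothetical helper, never called in this module): running LSI
# similarities on a small synthetic sparse matrix and iterating over the (similarities, n_dims)
# pairs the generator yields. Real inputs come from the tf-idf datasets built below.
def _example_lsi_similarities():
    rng = np.random.default_rng(0)
    toy = csr_matrix(rng.random((200, 50)))  # 200 "terms" x 50 "subreddits"
    results = {}
    for sims, n_dims in lsi_column_similarities(toy, n_components=[5, 10], algorithm='randomized', n_iter=5):
        results[n_dims] = sims  # each sims is a 50 x 50 cosine similarity matrix
    return results
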
def column_similarities(mat):
    return 1 - pairwise_distances(mat, metric='cosine')


def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit', term, 'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # one row per subreddit-week
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term and week. term-week is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf (same form as in _calc_tfidf: log(N/(1+count)) + 1)
    idf = idf.withColumn('idf', f.log(idf.subreddits_in_week / (1+f.col('count'))) + 1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term]).distinct() # terms are distinct

    terms = terms.withColumn(term_id, f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id', f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits, on=['subreddit'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms, on=[term]) # subreddit-term-id is unique

    idf = idf.join(terms, on=[term])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term, 'week'])

    # agg terms by subreddit to make sparse tf/df vectors

    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf", df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition(400, 'subreddit', 'week')
    dfwriter = df.write.partitionBy("week")
    return dfwriter

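# Illustrative sketch (a hypothetical helper, never called in this module): building the weekly
# tf-idf dataset with Spark. The input/output parquet paths are hypothetical placeholders; the
# input is assumed to have subreddit, term, week, and tf columns.
def _example_build_weekly_tfidf():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/comment_terms_weekly.parquet")
    include_subs = select_topN_subreddits(10000)
    dfwriter = build_weekly_tfidf_dataset(df, include_subs, term_colname='term', tf_family=tf_weight.Norm05)
    dfwriter.parquet("/path/to/comment_terms_tfidf_weekly.parquet", mode='overwrite')
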
def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf', f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id, f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id', f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits, on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms, on=term) # subreddit-term-id is unique

    idf = idf.join(terms, on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf", df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts over the full dataset. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit', term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)
    df = df.repartition('subreddit')
    dfwriter = df.write
    return dfwriter

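# Illustrative sketch (a hypothetical helper, never called in this module): building the
# non-weekly tf-idf dataset. Paths are hypothetical placeholders; the input is assumed to have
# subreddit, term, and tf columns like the weekly variant above, just without a week column.
def _example_build_tfidf():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/comment_terms.parquet")
    include_subs = select_topN_subreddits(10000)
    dfwriter = tfidf_dataset(df, include_subs, term_colname='term')
    dfwriter.parquet("/path/to/comment_terms_tfidf.parquet", mode='overwrite')
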
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN, 'subreddit'].values)
    return included_subreddits

def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400, 'subreddit')
    df.write.parquet(outpath, mode='overwrite')

def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400, 'subreddit', 'week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath, mode='overwrite')
