similarities/similarities_helper.py
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
import time
from datetime import datetime
from pathlib import Path

# Two term-frequency weighting schemes for tf-idf:
# MaxTF multiplies the relative term frequency by idf directly;
# Norm05 smooths it as (0.5 + 0.5 * relative_tf) * idf.
class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

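# A minimal sketch (not part of the pipeline) of how the two weightings behave;
# the relative_tf and idf values below are made up.
def _tf_weight_example():
    relative_tf, idf = 0.2, 3.0
    max_tf_idf = relative_tf * idf                   # 0.6  (tf_weight.MaxTF)
    norm05_tf_idf = (0.5 + 0.5 * relative_tf) * idf  # 1.8  (tf_weight.Norm05 damps the raw frequency)
    return max_tf_idf, norm05_tf_idf
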
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

def termauthor_tfidf(term_tfidf_callable, author_tfidf_callable):
    # unimplemented stub; 'pass' keeps the module importable, since a def with an empty body is a syntax error
    pass

# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
    print("loading tfidf", flush=True)
    tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(open(included_subreddits))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    if week is not None:
        ds_filter &= ds.field("week") == week

    if from_date is not None:
        ds_filter &= ds.field("week") >= from_date

    if to_date is not None:
        ds_filter &= ds.field("week") <= to_date

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    projection = {
        'subreddit_id':ds.field('subreddit_id'),
        term_id:ds.field(term_id),
        'relative_tf':ds.field("relative_tf").cast('float32')
        }

    if not rescale_idf:
        projection = {
            'subreddit_id':ds.field('subreddit_id'),
            term_id:ds.field(term_id),
            'relative_tf':ds.field('relative_tf').cast('float32'),
            'tf_idf':ds.field('tf_idf').cast('float32')}

    tfidf_ds = ds.dataset(infile)

    df = tfidf_ds.to_table(filter=ds_filter,columns=projection)

    df = df.to_pandas(split_blocks=True,self_destruct=True)
    print("assigning indexes",flush=True)
    df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
    grouped = df.groupby(term_id)
    df[term_id_new] = grouped.ngroup()

    if rescale_idf:
        print("computing idf", flush=True)
        df['new_count'] = grouped[term_id].transform('count')
        N_docs = df.subreddit_id_new.max() + 1
        df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
        if tf_family == tf_weight.MaxTF:
            df["tf_idf"] = df.relative_tf * df.idf
        else: # tf_fam = tf_weight.Norm05
            df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    print("assigning names")
    subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
    batches = subreddit_names.to_batches()

    with Pool(cpu_count()) as pool:
        chunks = pool.imap_unordered(pull_names,batches)
        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()

    subreddit_names = subreddit_names.set_index("subreddit_id")
    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
    new_ids = new_ids.set_index('subreddit_id')
    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
    subreddit_names = subreddit_names.drop(columns="subreddit_id")
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)

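# A minimal sketch (not part of the pipeline) of the idf rescaling used above:
# idf = log(N_docs / (1 + doc_freq)) + 1, where doc_freq is the number of
# subreddits a term appears in. The counts below are made up.
def _idf_example():
    N_docs = 1000
    doc_freq = np.array([1, 10, 100, 999])
    idf = np.log(N_docs / (1 + doc_freq)) + 1
    # rarer terms get larger idf; a term in nearly every subreddit gets an idf near 1
    return idf
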
def pull_names(batch):
    return(batch.to_pandas().drop_duplicates())

def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''

    def proc_sims(sims, outfile):
        if issparse(sims):
            sims = sims.todense()

        print(f"shape of sims:{sims.shape}")
        print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
        sims = pd.DataFrame(sims)
        sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
        sims['_subreddit'] = subreddit_names.subreddit.values

        p = Path(outfile)

        output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
        output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
        output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))
        p.parent.mkdir(exist_ok=True, parents=True)

        sims.to_feather(outfile)

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}",flush=True)
    sims = simfunc(mat)
    del mat

    if hasattr(sims,'__next__'):
        for simmat, name in sims:
            proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
    else:
        proc_sims(sims, outfile)

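# A minimal usage sketch of similarities() (paths and parameters here are hypothetical).
# simfunc maps the sparse tf-idf matrix to either a single similarity matrix or, like
# lsi_column_similarities with several dimensionalities, a generator of (matrix, name)
# pairs, in which case outfile is treated as a directory of feather files.
def _similarities_example():
    similarities("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                 simfunc=lambda mat: lsi_column_similarities(mat, n_components=[10, 50]),
                 term_colname='term',
                 outfile="/tmp/comment_terms_lsi",
                 min_df=2000,
                 topN=10000)
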
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

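# A minimal sketch (made-up subreddits) of what the melt above produces: the wide
# similarity matrix becomes one row per subreddit pair with columns
# (_subreddit, week, variable, value), where 'variable' is the other subreddit.
def _melt_example():
    names = pd.DataFrame({'subreddit': ['AskReddit', 'science']})
    sims = pd.DataFrame({'AskReddit': [1.0, 0.2],
                         'science': [0.2, 1.0],
                         '_subreddit': ['AskReddit', 'science'],
                         'week': ['2020-01-06', '2020-01-06']})
    return sims.melt(id_vars=['_subreddit', 'week'], value_vars=names.subreddit.values)
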
def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den

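# A minimal sketch (toy dense matrix) of what column_overlaps computes: the Jaccard
# overlap between columns, i.e. the number of rows where both columns are nonzero
# divided by the number of rows where at least one of them is nonzero.
def _column_overlaps_example():
    toy = np.array([[1.0, 1.0, 0.0],
                    [1.0, 0.0, 0.0],
                    [0.0, 1.0, 1.0]])
    # columns 0 and 1 share one nonzero row and cover three rows between them -> 1/3
    return column_overlaps(toy)
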
def test_lsi_sims():
    term = "term"
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    t1 = time.perf_counter()
    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t2 = time.perf_counter()
    print(f"first load took:{t2 - t1}s")

    entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                                             term_colname='term',
                                             min_df=2000,
                                             topN=10000
                                             )
    t3 = time.perf_counter()

    print(f"second load took:{t3 - t2}s")

    mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
    sims = list(lsi_column_similarities(mat, [10,50]))
    sims_og = sims
    sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))

# n_components is the latent dimensionality. sklearn recommends 100. More might be better
# if n_components is a list we'll return a generator of (similarities, n_dims) pairs for the different latent dimensionalities
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
    # first compute the lsi of the matrix
    # then take the column similarities
    print("running LSI",flush=True)

    if type(n_components) is int:
        n_components = [n_components]

    n_components = sorted(n_components,reverse=True)

    svd_components = n_components[0]
    svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
    mod = svd.fit(tfidfmat.T)
    lsimat = mod.transform(tfidfmat.T)

    # fit the largest SVD once and truncate it for each requested dimensionality.
    # mixing `yield` with `return sims` made this a generator that never yielded in the
    # single-dimensionality case, so return the matrix directly instead.
    if len(n_components) == 1:
        return column_similarities(lsimat[:,np.arange(n_components[0])])

    return ((column_similarities(lsimat[:,np.arange(n_dims)]), n_dims) for n_dims in n_components)

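# A minimal usage sketch of lsi_column_similarities: with a list of dimensionalities it
# returns a generator of (similarity_matrix, n_dims) pairs; with a single int it returns
# one matrix. The random tf-idf matrix below is purely illustrative.
def _lsi_example():
    rng = np.random.default_rng(0)
    toy_tfidf = csr_matrix(rng.random((200, 20)))  # 200 terms x 20 subreddits
    for simmat, n_dims in lsi_column_similarities(toy_tfidf, n_components=[5, 10]):
        print(n_dims, simmat.shape)  # each simmat is 20 x 20
    return lsi_column_similarities(toy_tfidf, n_components=5)
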
def column_similarities(mat):
    # cosine similarity between the rows of mat; after the LSI projection above each row is a
    # subreddit (i.e. a column of the original tf-idf matrix), hence the name
    return 1 - pairwise_distances(mat,metric='cosine')

def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term,'week']).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit','week']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit','week'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term,'week'])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # agg terms by subreddit to make sparse tf/df vectors

    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week").sortBy("subreddit")
    return dfwriter

def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term) # subreddit-term-id is unique

    idf = idf.join(terms,on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # agg terms by subreddit to make sparse tf/df vectors
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_fam = tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts; now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)
    df = df.repartition('subreddit')
    dfwriter = df.write.sortBy("subreddit","tf")
    return dfwriter

def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits


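# A minimal sketch of the ranking CSV that select_topN_subreddits expects: at least a
# 'subreddit' column and a 'comments_rank' column with rank 1 = most comments.
# The rows and the temporary path below are made up.
def _topN_example(tmp_path="/tmp/subreddits_by_num_comments_example.csv"):
    pd.DataFrame({'subreddit': ['AskReddit', 'science', 'aww'],
                  'comments_rank': [1, 2, 3]}).to_csv(tmp_path, index=False)
    return select_topN_subreddits(2, path=tmp_path)  # {'AskReddit', 'science'}
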
def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
                      outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit')
    df.write.parquet(outpath,mode='overwrite')


def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
                             outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inpath)
    df = df.repartition(400,'subreddit','week')
    dfwriter = df.write.partitionBy("week")
    dfwriter.parquet(outpath,mode='overwrite')
