# similarities/similarities_helper.py
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

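# The two weighting schemes above are applied by _calc_tfidf, build_weekly_tfidf_dataset,
# and reindex_tfidf below:
#   MaxTF:  tf_idf = relative_tf * idf
#   Norm05: tf_idf = (0.5 + 0.5 * relative_tf) * idf
# where relative_tf is a term's tf divided by the largest tf in its subreddit and
# idf is ln(N_docs / (1 + document_frequency)) + 1.
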
infile = "/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet"
cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"

def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    spark = SparkSession.builder.getOrCreate()
    conf = spark.sparkContext.getConf()
    print(exclude_phrases)
    tfidf_weekly = spark.read.parquet(infile)

    # restrict the weekly tf-idf data to the requested time interval
    if from_date is not None:
        if type(from_date) is str:
            from_date = datetime.fromisoformat(from_date)

        tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)

    if to_date is not None:
        if type(to_date) is str:
            to_date = datetime.fromisoformat(to_date)
        tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)

    tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
    tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
    tfidf = spark.read.parquet(tempdir.name)
    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
    return(tempdir, subreddit_names)

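# reindex_tfidf_time_interval returns a TemporaryDirectory holding the filtered, reindexed
# tf-idf entries as parquet, plus a dataframe mapping subreddit names to 0-indexed matrix
# columns. A hypothetical call, for illustration only:
#   tempdir, names = reindex_tfidf_time_interval(infile, 'term', topN=500,
#                                                from_date='2020-01-01', to_date='2020-04-01')
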
# subreddits missing after this step don't have any terms that have a high enough idf
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, tf_family=tf_weight.MaxTF):
    # exclude_phrases is accepted for parity with reindex_tfidf_time_interval but is not otherwise used here
    spark = SparkSession.builder.getOrCreate()
    conf = spark.sparkContext.getConf()
    print(exclude_phrases)

    tfidf_ds = ds.dataset(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(open(included_subreddits))

    ds_filter = ds.field("subreddit").isin(included_subreddits)

    if min_df is not None:
        ds_filter &= ds.field("count") >= min_df

    if max_df is not None:
        ds_filter &= ds.field("count") <= max_df

    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    df = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id',term_id,'relative_tf']).to_pandas()

    # assign dense, sorted (0-based) subreddit ids
    sub_ids = df.subreddit_id.drop_duplicates()
    new_sub_ids = pd.DataFrame({'subreddit_id':old,'subreddit_id_new':new} for new, old in enumerate(sorted(sub_ids)))
    df = df.merge(new_sub_ids,on='subreddit_id',how='inner',validate='many_to_one')

    # document frequency of each term among the included subreddits
    new_count = df.groupby(term_id)[term_id].aggregate(new_count='count').reset_index()
    df = df.merge(new_count,on=term_id,how='inner',validate='many_to_one')

    # assign dense, sorted (0-based) term ids
    term_ids = df[term_id].drop_duplicates()
    new_term_ids = pd.DataFrame({term_id:old,term_id_new:new} for new, old in enumerate(sorted(term_ids)))

    df = df.merge(new_term_ids, on=term_id, validate='many_to_one')
    N_docs = sub_ids.shape[0]

    # smoothed idf
    df['idf'] = np.log(N_docs/(1+df.new_count)) + 1

    # weight the term frequencies to compute tf-idf
    if tf_family == tf_weight.MaxTF:
        df["tf_idf"] = df.relative_tf * df.idf
    else: # tf_family == tf_weight.Norm05
        df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf

    subreddit_names = df.loc[:,['subreddit','subreddit_id_new']].drop_duplicates()
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    return(df, subreddit_names)

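# Illustrative only (the parquet path and term column are assumptions): reindex_tfidf returns
# the reindexed tf-idf entries plus the subreddit-name / column-id mapping used to label the
# similarity matrix:
#   entries, names = reindex_tfidf("comment_terms_100k.parquet", term_colname='term', topN=500)
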

def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
    '''
    tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
    '''
    term_id_new = term_colname + '_id_new'

    if from_date is not None or to_date is not None:
        tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
        mat = read_tfidf_matrix(tempdir.name, term_colname, tfidf_colname)
    else:
        entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
        # ids from reindex_tfidf are already 0-based, so they are used as matrix indices directly
        mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))

    print("loading matrix")

    #    mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)

    print(f'computing similarities on mat. mat.shape:{mat.shape}')
    print(f"size of mat is:{mat.data.nbytes}")
    sims = simfunc(mat)
    del mat

    if issparse(sims):
        sims = sims.todense()

    print(f"shape of sims:{sims.shape}")
    print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
    sims = pd.DataFrame(sims)
    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
    sims['_subreddit'] = subreddit_names.subreddit.values

    p = Path(outfile)

    output_feather =  Path(str(p).replace("".join(p.suffixes), ".feather"))
    output_csv =  Path(str(p).replace("".join(p.suffixes), ".csv"))
    output_parquet =  Path(str(p).replace("".join(p.suffixes), ".parquet"))

    # only the feather output is written here; the csv and parquet paths are unused for now
    sims.to_feather(outfile)
#    tempdir.cleanup()

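# A hypothetical invocation, for illustration only (the path, term column, and output name are
# assumptions):
#   similarities("/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet",
#                column_overlaps, 'author', "comment_authors_overlap.feather", topN=500)
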
def read_tfidf_matrix_weekly(path, term_colname, week, tfidf_colname='tf_idf'):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    # ids written by the spark prep functions are 1-based row_numbers, hence the -1 below
    dataset = ds.dataset(path,format='parquet')
    entries = dataset.to_table(columns=[tfidf_colname,'subreddit_id_new', term_id_new],filter=ds.field('week')==week).to_pandas()
    return(csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1))))

def read_tfidf_matrix(path, term_colname, tfidf_colname='tf_idf'):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'
    # ids written by the spark prep functions are 1-based row_numbers, hence the -1 below
    dataset = ds.dataset(path,format='parquet')
    print(f"tfidf_colname:{tfidf_colname}")
    entries = dataset.to_table(columns=[tfidf_colname, 'subreddit_id_new',term_id_new]).to_pandas()
    return(csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1))))

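# csr_matrix((data, (rows, cols))) builds a terms-by-subreddits sparse matrix from coordinate
# triples: an entry with tf_idf=0.3, term_id_new=5, subreddit_id_new=2 becomes mat[4, 1] = 0.3
# after the -1 shift.
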
def write_weekly_similarities(path, sims, week, names):
    sims['week'] = week
    p = pathlib.Path(path)
    if not p.is_dir():
        p.mkdir(exist_ok=True,parents=True)

    # reformat as a pairwise list
    sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

def column_overlaps(mat):
    non_zeros = (mat != 0).astype('double')

    intersection = non_zeros.T @ non_zeros
    card1 = non_zeros.sum(axis=0)
    den = np.add.outer(card1,card1) - intersection

    return intersection / den

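# column_overlaps is the Jaccard overlap between columns of mat: the number of rows where both
# columns are nonzero, divided by the number of rows where either is nonzero,
# |A ∩ B| / (|A| + |B| - |A ∩ B|).
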
# n_components is the latent dimensionality. sklearn recommends 100; more might be better.
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
    # first compute the lsi of the matrix
    # then take the column similarities
    svd = TruncatedSVD(n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm)
    mod = svd.fit(tfidfmat.T)
    lsimat = mod.transform(tfidfmat.T)
    sims = column_similarities(lsimat)
    return sims

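# For example (illustrative only, output name assumed), lsi_column_similarities can be passed
# to similarities() as simfunc to get LSI-based subreddit similarities:
#   similarities(infile, lambda m: lsi_column_similarities(m, n_components=100), 'term', "comment_terms_lsi.feather")
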
def column_similarities(mat):
    # note: pairwise_distances computes distances between the *rows* of mat, so callers that
    # want subreddit similarities should pass a matrix with one row per subreddit (as
    # lsi_column_similarities does with the transposed, reduced matrix).
    return 1 - pairwise_distances(mat,metric='cosine')
    # if issparse(mat):
    #     norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
    #     mat = mat.multiply(1/norm)
    # else:
    #     norm = np.matrix(np.power(np.power(mat,2).sum(axis=0),0.5,dtype=np.float32))
    #     mat = np.multiply(mat,1/norm)
    # sims = mat.T @ mat
    # return(sims)


def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col('count') >= min_df)
    if max_df is not None:
        tfidf = tfidf.filter(f.col('count') <= max_df)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))

    # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
    sub_ids = tfidf.select(['subreddit_id','week']).distinct()
    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids,['subreddit_id','week'])

    # only use terms in at least min_df included subreddits in a given week
    new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
    tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')

    # reset the term ids
    term_ids = tfidf.select([term_id,'week']).distinct()
    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
    tfidf = tfidf.join(term_ids,[term_id,'week'])

    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))

    tempdir = TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')

    tfidf = tfidf.repartition('week')

    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
    return(tempdir)


def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col('count') >= min_df)
    if max_df is not None:
        tfidf = tfidf.filter(f.col('count') <= max_df)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))

    # reset the subreddit ids
    sub_ids = tfidf.select('subreddit_id').distinct()
    sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids,'subreddit_id')

    # only use terms in at least min_df included subreddits
    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
    tfidf = tfidf.join(new_count,term_id,how='inner')

    # reset the term ids
    term_ids = tfidf.select([term_id]).distinct()
    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
    tfidf = tfidf.join(term_ids,term_id)

    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))

    tempdir = TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')

    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
    return tempdir


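# Both prep functions write the filtered, reindexed entries to a local TemporaryDirectory as
# parquet; read_tfidf_matrix and read_tfidf_matrix_weekly rebuild sparse matrices from them.
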
# try computing cosine similarities using spark
def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
    tfidf = tfidf.cache()

    # reset the subreddit ids
    sub_ids = tfidf.select('subreddit_id').distinct()
    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids,'subreddit_id')

    # only use terms in at least min_df included subreddits
    # (note: min_df is computed above but not applied as a filter here)
    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
    tfidf = tfidf.join(new_count,term_id,how='inner')

    # reset the term ids
    term_ids = tfidf.select([term_id]).distinct()
    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
    tfidf = tfidf.join(term_ids,term_id)

    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
    tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)

    # step 1: make an rdd of entries sorted by (dense) spark subreddit id
    n_partitions = int(len(included_subreddits)*2 / 5)

    entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)

    # put roughly 10 subreddits in each partition

    # step 2: make it into a distributed.RowMatrix
    coordMat = CoordinateMatrix(entries)

    coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))

    # this needs to be an IndexedRowMatrix()
    mat = coordMat.toRowMatrix()

    # goal: build a matrix of subreddit columns and tf-idf rows
    sim_dist = mat.columnSimilarities(threshold=similarity_threshold)

    return (sim_dist, tfidf)

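# Note on spark_cosine_similarities: RowMatrix.columnSimilarities(threshold) estimates cosine
# similarities between columns; a threshold above 0 uses sampling (DIMSUM) to trade exactness
# for speed, so the returned CoordinateMatrix may omit pairs whose similarity falls below it.
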
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'

    # aggregate counts by week. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
    df = df.join(max_subreddit_terms, on=['subreddit','week'])
    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term is unique
    idf = df.groupby([term,'week']).count()

    N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))

    idf = idf.join(N_docs, on=['week'])

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(idf.subreddits_in_week / (1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select([term,'week']).distinct() # terms are distinct

    terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit','week']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))

    df = df.join(subreddits,on=['subreddit','week'])

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique

    idf = idf.join(terms,on=[term,'week'])

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term,'week'])

    # weight the term frequencies to compute tf-idf
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_family == tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()
    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)

    # collect the dictionary to make a pydict of terms to indexes
    terms = idf.select(term).distinct() # terms are distinct
    terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct

    # make subreddit ids
    subreddits = df.select(['subreddit']).distinct()
    subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))

    df = df.join(subreddits,on='subreddit')

    # map terms to indexes in the tfs and the idfs
    df = df.join(terms,on=term) # subreddit-term-id is unique

    idf = idf.join(terms,on=term)

    # join on subreddit/term to create tf/dfs indexed by term
    df = df.join(idf, on=[term_id, term])

    # weight the term frequencies to compute tf-idf
    if tf_family == tf_weight.MaxTF:
        df = df.withColumn("tf_idf",  df.relative_tf * df.idf)
    else: # tf_family == tf_weight.Norm05
        df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)

    return df

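# Worked example of the weighting in _calc_tfidf: a term appearing 10 times in a subreddit whose
# most frequent term appears 40 times has relative_tf = 0.25; if 5 of 100 subreddits contain the
# term, idf = ln(100/6) + 1, and under Norm05 its weight is (0.5 + 0.5*0.25) * idf.
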
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
    term = term_colname
    term_id = term + '_id'
    # aggregate counts over the whole dataset. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    df = _calc_tfidf(df, term_colname, tf_family)

    return df

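# A hypothetical use with a spark dataframe containing 'subreddit', term, and 'tf' columns
# (the parquet path is an assumption for illustration):
#   terms_df = spark.read.parquet("comment_terms.parquet")
#   tfidf = build_tfidf_dataset(terms_df, include_subs=select_topN_subreddits(500), term_colname='term')
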
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
    return included_subreddits
