def column_similarities(mat):
    """Return the pairwise cosine-similarity matrix between the columns of *mat*.

    Parameters
    ----------
    mat : scipy.sparse matrix
        Matrix with one column per item (e.g. a term x subreddit tf-idf matrix).

    Returns
    -------
    scipy.sparse matrix of shape (n_cols, n_cols) holding the cosine
    similarities between every pair of columns.
    """
    # L2 norm of each column, as a (1, n_cols) row vector. np.asarray replaces
    # the deprecated np.matrix wrapper; float32 keeps the pipeline's dtype.
    norms = np.sqrt(np.asarray(mat.power(2).sum(axis=0), dtype=np.float32))
    # Guard against all-zero columns: dividing by a zero norm would inject
    # inf/nan into the result. A zero column stays zero either way, so its
    # similarities remain 0 as expected.
    norms[norms == 0] = 1.0
    # Scale each column to unit length, then the Gram matrix is the
    # cosine-similarity matrix.
    mat = mat.multiply(1 / norms)
    return mat.T @ mat
-
-
def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
    """Filter a weekly tf-idf Spark DataFrame to the included subreddits,
    assign dense per-week ids for subreddits and terms, recompute tf_idf,
    and write the result to a local temporary parquet directory.

    Parameters
    ----------
    tfidf : pyspark DataFrame
        Expected columns (from usage below): 'subreddit', 'subreddit_id',
        '<term>_id', 'count', 'week', 'relative_tf', 'idf', 'tf_idf'.
    term_colname : str
        Base name of the term column; '<term_colname>_id' is the id column.
    min_df : number or None
        Minimum document frequency; if None, defaults to 10% of the number
        of included subreddits.
    max_df : number or None
        Maximum document frequency; None means no upper bound.
    included_subreddits : collection of subreddit names to keep.

    Returns
    -------
    tempfile.TemporaryDirectory whose .name contains the parquet output.
    The caller must hold on to the returned object: the directory is removed
    when it is garbage-collected.
    """
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    # Default min_df to 10% of the included subreddits when not supplied.
    if min_df is None:
        min_df = 0.1 * len(included_subreddits)
    tfidf = tfidf.filter(f.col('count') >= min_df)
    if max_df is not None:
        tfidf = tfidf.filter(f.col('count') <= max_df)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))

    # we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
    # row_number over a per-week window yields dense 1..N ids within each week.
    sub_ids = tfidf.select(['subreddit_id','week']).distinct()
    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids,['subreddit_id','week'])

    # only use terms in at least min_df included subreddits in a given week
    # NOTE(review): 'new_count' is computed and joined but never compared
    # against min_df, and the inner join on its own keys drops no rows — so
    # this step does not actually filter anything. If the comment above states
    # the intent, a `.filter(f.col('new_count') >= min_df)` appears to be
    # missing; confirm against downstream consumers of 'new_count'.
    new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
    tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')

    # reset the term ids to dense per-week values, mirroring the subreddit ids above
    term_ids = tfidf.select([term_id,'week']).distinct()
    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
    tfidf = tfidf.join(term_ids,[term_id,'week'])

    # Recompute tf_idf as relative_tf * idf; keep the previous value under
    # 'tf_idf_old' rather than discarding it.
    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))

    tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')

    # Partition by week so each week's data lands in its own partition files.
    tfidf = tfidf.repartition('week')

    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
    return(tempdir)
-
-
-def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
- term = term_colname