- # if issparse(mat):
- # norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
- # mat = mat.multiply(1/norm)
- # else:
- # norm = np.matrix(np.power(np.power(mat,2).sum(axis=0),0.5,dtype=np.float32))
- # mat = np.multiply(mat,1/norm)
- # sims = mat.T @ mat
- # return(sims)
-
-
def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
    """Filter a weekly tf-idf dataframe to the included subreddits, assign
    dense per-week subreddit/term ids, recompute tf_idf, and stage the
    result as snappy parquet partitioned by week.

    Parameters mirror prep_tfidf_entries, plus per-week id handling.
    Returns the TemporaryDirectory containing the parquet output; the
    caller must keep the returned object alive while the files are in use,
    since it deletes the directory on garbage collection.
    """
    term_id = term_colname + '_id'
    term_id_new = term_colname + '_id_new'

    # default document-frequency floor: 10% of the included subreddits
    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    df = tfidf.filter(f.col('count') >= min_df)
    if max_df is not None:
        df = df.filter(f.col('count') <= max_df)

    df = df.filter(f.col("subreddit").isin(included_subreddits))

    # the set of terms/subreddits can differ from week to week, so dense
    # ids are assigned independently within each week partition
    subreddit_ids = (df.select(['subreddit_id', 'week'])
                       .distinct()
                       .withColumn("subreddit_id_new",
                                   f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id"))))
    df = df.join(subreddit_ids, ['subreddit_id', 'week'])

    # per-week count of included subreddits containing each term
    # (carried along as 'new_count'; the min_df cut above already applied)
    weekly_term_counts = df.groupBy([term_id, 'week']).agg(f.count(term_id).alias('new_count'))
    df = df.join(weekly_term_counts, [term_id, 'week'], how='inner')

    # dense per-week term ids
    fresh_term_ids = (df.select([term_id, 'week'])
                        .distinct()
                        .withColumn(term_id_new,
                                    f.row_number().over(Window.partitionBy('week').orderBy(term_id))))
    df = df.join(fresh_term_ids, [term_id, 'week'])

    # recompute tf_idf from relative_tf * idf, keeping the old column around
    df = df.withColumnRenamed("tf_idf", "tf_idf_old")
    df = df.withColumn("tf_idf", (df.relative_tf * df.idf).cast('float'))

    tempdir = TemporaryDirectory(suffix='.parquet', prefix='term_tfidf_entries', dir='.')

    df = df.repartition('week')
    df.write.parquet(tempdir.name, mode='overwrite', compression='snappy')
    return tempdir
-
-
def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
    """Filter a tf-idf dataframe to the included subreddits, assign dense
    subreddit/term ids, recompute tf_idf, and stage the result as snappy
    parquet in a temporary directory.

    Returns the TemporaryDirectory containing the parquet output; the
    caller must keep the returned object alive while the files are in use,
    since it deletes the directory on garbage collection.
    """
    term_id = term_colname + '_id'
    term_id_new = term_colname + '_id_new'

    # default document-frequency floor: 10% of the included subreddits
    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    df = tfidf.filter(f.col('count') >= min_df)
    if max_df is not None:
        df = df.filter(f.col('count') <= max_df)

    df = df.filter(f.col("subreddit").isin(included_subreddits))

    # reassign subreddit ids to a dense 1..N range over what survived the filters
    subreddit_ids = (df.select('subreddit_id')
                       .distinct()
                       .withColumn("subreddit_id_new",
                                   f.row_number().over(Window.orderBy("subreddit_id"))))
    df = df.join(subreddit_ids, 'subreddit_id')

    # count of included subreddits containing each term (carried along as
    # 'new_count'; the min_df cut above already applied)
    term_counts = df.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
    df = df.join(term_counts, term_id, how='inner')

    # reassign term ids to a dense 1..N range as well
    fresh_term_ids = (df.select([term_id])
                        .distinct()
                        .withColumn(term_id_new, f.row_number().over(Window.orderBy(term_id))))
    df = df.join(fresh_term_ids, term_id)

    # recompute tf_idf from relative_tf * idf, keeping the old column around
    df = df.withColumnRenamed("tf_idf", "tf_idf_old")
    df = df.withColumn("tf_idf", (df.relative_tf * df.idf).cast('float'))

    tempdir = TemporaryDirectory(suffix='.parquet', prefix='term_tfidf_entries', dir='.')

    df.write.parquet(tempdir.name, mode='overwrite', compression='snappy')
    return tempdir
-
-
# try computing cosine similarities using spark
def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
    """Compute subreddit-subreddit cosine similarities with Spark's
    RowMatrix.columnSimilarities (DIMSUM sampling when a positive
    threshold is given).

    Parameters
    ----------
    tfidf : DataFrame with subreddit / term ids, relative_tf, idf columns.
    term_colname : base name of the term column ('term' or 'word').
    min_df : minimum document frequency; defaults to 10% of the included
        subreddits when None.
    included_subreddits : collection of subreddit names to keep.
    similarity_threshold : passed to columnSimilarities; similarities
        below it may be dropped/approximated.

    Returns (sim_dist, tfidf): the CoordinateMatrix of similarities and
    the transformed (cached) tf-idf dataframe.
    """
    term_id = term_colname + '_id'
    term_id_new = term_colname + '_id_new'

    # NOTE(review): min_df is defaulted here but never applied below — this
    # function has no count filter, unlike prep_tfidf_entries. Confirm
    # whether a `tfidf.filter(f.col('count') >= min_df)` step is missing.
    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
    tfidf = tfidf.cache()

    # reset the subreddit ids to a dense 1..N range
    sub_ids = tfidf.select('subreddit_id').distinct()
    sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids, 'subreddit_id')

    # count of included subreddits containing each term (carried as 'new_count')
    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
    tfidf = tfidf.join(new_count, term_id, how='inner')

    # reset the term ids to a dense 1..N range
    term_ids = tfidf.select([term_id]).distinct()
    term_ids = term_ids.withColumn(term_id_new, f.row_number().over(Window.orderBy(term_id)))
    tfidf = tfidf.join(term_ids, term_id)

    # recompute tf_idf from relative_tf * idf, keeping the old column around
    tfidf = tfidf.withColumnRenamed("tf_idf", "tf_idf_old")
    tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)

    # step 1: make an RDD of (term_index, subreddit_index, tf_idf) entries,
    # shifting the dense 1-based ids to the 0-based indices matrices expect.
    # Aim for roughly 2.5 subreddits' worth of entries per partition; guard
    # with max(1, ...) because repartition(0) is invalid when fewer than 3
    # subreddits are included.
    n_partitions = max(1, int(len(included_subreddits) * 2 / 5))

    entries = tfidf.select(f.col(term_id_new) - 1, f.col("subreddit_id_new") - 1, "tf_idf").rdd.repartition(n_partitions)

    # step 2: build a distributed CoordinateMatrix from the entries
    coordMat = CoordinateMatrix(entries)

    coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))

    # RowMatrix rows are terms, columns are subreddits, so columnSimilarities
    # yields subreddit-subreddit cosine similarities
    mat = coordMat.toRowMatrix()

    sim_dist = mat.columnSimilarities(threshold=similarity_threshold)

    return (sim_dist, tfidf)