# Reconstructed from the gitweb patch
#   X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/e7942146532ae43d82181d0ed7f60fe60e8dc8b3..82d184d9c608db47f5c37f17a18962f07169cbc7:/similarities_helper.py
#   diff --git a/similarities_helper.py b/similarities_helper.py
#   index c69983f..5933f8e 100644, hunk @@ -2,11 +2,67 @@
# The code below is the post-image of that hunk (it begins at line 2 of
# similarities_helper.py).
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from scipy.sparse import csr_matrix
import pandas as pd
import numpy as np


# Available term-frequency weighting options.
class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2


def read_tfidf_matrix(path, term_colname):
    """Load tf-idf entries from a parquet dataset into a sparse matrix.

    Reads the ``tf_idf``, ``subreddit_id_new`` and ``<term_colname>_id_new``
    columns of the parquet dataset at ``path``.  Both id columns are shifted
    down by one to serve as 0-based indices (``prep_tfidf_entries`` assigns
    them with ``row_number()``, which starts at 1).

    Returns a ``scipy.sparse.csr_matrix`` holding the ``tf_idf`` values, with
    rows indexed by the term id and columns indexed by the subreddit id.  The
    matrix shape is inferred by scipy from the largest index present.
    """
    term_id_new = term_colname + '_id_new'

    dataset = ds.dataset(path, format='parquet')
    entries = dataset.to_table(
        columns=['tf_idf', 'subreddit_id_new', term_id_new]
    ).to_pandas()

    row_idx = entries[term_id_new] - 1
    col_idx = entries.subreddit_id_new - 1
    return csr_matrix((entries.tf_idf, (row_idx, col_idx)))


def column_similarities(mat):
    """Return the pairwise cosine similarities between the columns of ``mat``.

    Each column is scaled to unit L2 norm and the result is ``mat.T @ mat``.
    ``mat`` is expected to be a scipy sparse matrix (it must provide
    ``power``, ``sum`` and ``multiply``).

    Columns whose norm is exactly zero are left unscaled instead of being
    multiplied by ``1/0``; their similarities therefore come out as zero
    without a divide-by-zero warning and without NaNs from explicitly stored
    zero entries.
    """
    # Column L2 norms, computed in float32 as a (1, n_columns) array.
    norms = np.atleast_2d(np.asarray(
        np.power(mat.power(2).sum(axis=0), 0.5, dtype=np.float32)
    ))

    # Reciprocal of every non-zero norm; entries for zero norms stay 0.
    # ``!= 0`` (rather than ``> 0``) keeps NaN norms propagating as before.
    inv_norms = np.zeros_like(norms)
    np.divide(1, norms, out=inv_norms, where=norms != 0)

    mat = mat.multiply(inv_norms)
    sims = mat.T @ mat
    return sims


def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
    """Filter, re-index and write tf-idf entries to a temporary parquet dir.

    Steps performed on the Spark DataFrame ``tfidf``:

    1. keep only rows whose ``subreddit`` is in ``included_subreddits``;
    2. add ``subreddit_id_new``: 1-based consecutive ids ordered by
       ``subreddit_id``;
    3. add ``new_count``: number of remaining rows per ``<term_colname>_id``;
    4. add ``<term_colname>_id_new``: 1-based consecutive ids ordered by
       ``<term_colname>_id``;
    5. rename the incoming ``tf_idf`` column to ``tf_idf_old`` and recompute
       ``tf_idf`` as ``relative_tf * idf`` cast to float;
    6. write the result as snappy-compressed parquet.

    ``min_df`` currently has NO effect: the filter that would drop terms with
    ``new_count < min_df`` is disabled (see the NOTE in the body).  When
    ``min_df`` is None it still defaults to 10% of ``len(included_subreddits)``
    so the filter can be re-enabled without changing the signature.

    Returns the ``tempfile.TemporaryDirectory`` (created under the current
    working directory) that holds the parquet output.  The caller must keep a
    reference to it for as long as the files are needed, because the
    directory is removed when the object is cleaned up.
    """
    term_id = term_colname + '_id'
    term_id_new = term_colname + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))

    # reset the subreddit ids
    sub_ids = tfidf.select('subreddit_id').distinct()
    sub_ids = sub_ids.withColumn(
        "subreddit_id_new",
        f.row_number().over(Window.orderBy("subreddit_id")),
    )
    tfidf = tfidf.join(sub_ids, 'subreddit_id')

    # count, per term, the rows left after the subreddit filter
    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
    # NOTE: the min_df filter is intentionally left disabled here:
    # new_count = new_count.filter(f.col('new_count') >= min_df)
    tfidf = tfidf.join(new_count, term_id, how='inner')

    # reset the term ids
    term_ids = tfidf.select([term_id]).distinct()
    term_ids = term_ids.withColumn(
        term_id_new,
        f.row_number().over(Window.orderBy(term_id)),
    )
    tfidf = tfidf.join(term_ids, term_id)

    tfidf = tfidf.withColumnRenamed("tf_idf", "tf_idf_old")
    # NOTE: disabled alternative that would also recompute idf:
    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
    tfidf = tfidf.withColumn(
        "tf_idf",
        (tfidf.relative_tf * tfidf.idf).cast('float'),
    )

    tempdir = TemporaryDirectory(
        suffix='.parquet', prefix='term_tfidf_entries', dir='.'
    )

    tfidf.write.parquet(tempdir.name, mode='overwrite', compression='snappy')
    return tempdir


def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
    term = term_colname