From 5632a971c633834720b1beb7f65f8a5d0924c0e7 Mon Sep 17 00:00:00 2001 From: Nate E TeBlunthuis Date: Tue, 10 Nov 2020 13:18:19 -0800 Subject: [PATCH] Refactor tfidf code to for code resuse. --- similarities_helper.py | 116 +++++++++++++++++++++++++++++++++++++++++ tfidf_authors.py | 46 ++++------------ tfidf_comments.py | 40 +------------- 3 files changed, 129 insertions(+), 73 deletions(-) create mode 100644 similarities_helper.py diff --git a/similarities_helper.py b/similarities_helper.py new file mode 100644 index 0000000..c69983f --- /dev/null +++ b/similarities_helper.py @@ -0,0 +1,116 @@ +from pyspark.sql import Window +from pyspark.sql import functions as f +from enum import Enum +from pyspark.mllib.linalg.distributed import CoordinateMatrix + +class tf_weight(Enum): + MaxTF = 1 + Norm05 = 2 + + +def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold): + term = term_colname + term_id = term + '_id' + term_id_new = term + '_id_new' + + if min_df is None: + min_df = 0.1 * len(included_subreddits) + + tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits)) + tfidf = tfidf.cache() + + # reset the subreddit ids + sub_ids = tfidf.select('subreddit_id').distinct() + sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id"))) + tfidf = tfidf.join(sub_ids,'subreddit_id') + + # only use terms in at least min_df included subreddits + new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count')) +# new_count = new_count.filter(f.col('new_count') >= min_df) + tfidf = tfidf.join(new_count,term_id,how='inner') + + # reset the term ids + term_ids = tfidf.select([term_id]).distinct() + term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id))) + tfidf = tfidf.join(term_ids,term_id) + + tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old") + # tfidf = tfidf.withColumnRenamed("idf","idf_old") + # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count"))) + tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf) + + # step 1 make an rdd of entires + # sorted by (dense) spark subreddit id + # entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd + + n_partitions = int(len(included_subreddits)*2 / 5) + + entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions) + + # put like 10 subredis in each partition + + # step 2 make it into a distributed.RowMatrix + coordMat = CoordinateMatrix(entries) + + coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions)) + + # this needs to be an IndexedRowMatrix() + mat = coordMat.toRowMatrix() + + #goal: build a matrix of subreddit columns and tf-idfs rows + sim_dist = mat.columnSimilarities(threshold=similarity_threshold) + + return (sim_dist, tfidf) + + +def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05): + + term = term_colname + term_id = term + '_id' + # aggregate counts by week. now subreddit-term is distinct + df = df.filter(df.subreddit.isin(include_subs)) + df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf')) + + max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique + max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf') + + df = df.join(max_subreddit_terms, on='subreddit') + + df = df.withColumn("relative_tf", df.tf / df.sr_max_tf) + + # group by term. term is unique + idf = df.groupby([term]).count() + + N_docs = df.select('subreddit').distinct().count() + + # add a little smoothing to the idf + idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1) + + # collect the dictionary to make a pydict of terms to indexes + terms = idf.select(term).distinct() # terms are distinct + terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct + + # make subreddit ids + subreddits = df.select(['subreddit']).distinct() + subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit"))) + + df = df.join(subreddits,on='subreddit') + + # map terms to indexes in the tfs and the idfs + df = df.join(terms,on=term) # subreddit-term-id is unique + + idf = idf.join(terms,on=term) + + # join on subreddit/term to create tf/dfs indexed by term + df = df.join(idf, on=[term_id, term]) + + # agg terms by subreddit to make sparse tf/df vectors + + if tf_family == tf_weight.MaxTF: + df = df.withColumn("tf_idf", df.relative_tf * df.idf) + else: # tf_fam = tf_weight.Norm05 + df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf) + + return df + + diff --git a/tfidf_authors.py b/tfidf_authors.py index 92a4965..f06a8ce 100644 --- a/tfidf_authors.py +++ b/tfidf_authors.py @@ -1,43 +1,19 @@ -from pyspark.sql import functions as f from pyspark.sql import SparkSession +from similarities_helper import build_tfidf_dataset +## TODO:need to exclude automoderator / bot posts. +## TODO:need to exclude better handle hyperlinks. spark = SparkSession.builder.getOrCreate() -df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/") -max_subreddit_week_authors = df.groupby(['subreddit','week']).max('tf') -max_subreddit_week_authors = max_subreddit_week_authors.withColumnRenamed('max(tf)','sr_week_max_tf') +df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet") -df = df.join(max_subreddit_week_authors, ['subreddit','week']) +include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt")) +include_subs = {s.strip('\n') for s in include_subs} +df = df.filter(df.author != '[deleted]') +df = df.filter(df.author != 'AutoModerator') -df = df.withColumn("relative_tf", df.tf / df.sr_week_max_tf) +df = build_tfidf_dataset(df, include_subs, 'author') -# group by term / week -idf = df.groupby(['author','week']).count() +df.cache() -idf = idf.withColumnRenamed('count','idf') - -# output: term | week | df -#idf.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy') - -# collect the dictionary to make a pydict of terms to indexes -authors = idf.select('author').distinct() -authors = authors.withColumn('author_id',f.monotonically_increasing_id()) - - -# map terms to indexes in the tfs and the idfs -df = df.join(authors,on='author') - -idf = idf.join(authors,on='author') - -# join on subreddit/term/week to create tf/dfs indexed by term -df = df.join(idf, on=['author_id','week','author']) - -# agg terms by subreddit to make sparse tf/df vectors -df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf) - -df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('author_id','tf_idf')).alias('tfidf_maps')) - -df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps')) - -# output: subreddit | week | tf/df -df.write.json('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy') +df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy') diff --git a/tfidf_comments.py b/tfidf_comments.py index b3e5624..9e1a437 100644 --- a/tfidf_comments.py +++ b/tfidf_comments.py @@ -1,6 +1,7 @@ from pyspark.sql import functions as f from pyspark.sql import SparkSession from pyspark.sql import Window +from similarities_helper import build_tfidf_dataset ## TODO:need to exclude automoderator / bot posts. ## TODO:need to exclude better handle hyperlinks. @@ -11,43 +12,6 @@ df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parq include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt")) include_subs = {s.strip('\n') for s in include_subs} -# aggregate counts by week. now subreddit-term is distinct -df = df.filter(df.subreddit.isin(include_subs)) -df = df.groupBy(['subreddit','term']).agg(f.sum('tf').alias('tf')) - -max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique -max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf') - -df = df.join(max_subreddit_terms, on='subreddit') - -df = df.withColumn("relative_tf", df.tf / df.sr_max_tf) - -# group by term. term is unique -idf = df.groupby(['term']).count() - -N_docs = df.select('subreddit').distinct().count() - -idf = idf.withColumn('idf',f.log(N_docs/f.col('count'))) - -# collect the dictionary to make a pydict of terms to indexes -terms = idf.select('term').distinct() # terms are distinct -terms = terms.withColumn('term_id',f.row_number().over(Window.orderBy("term"))) # term ids are distinct - -# make subreddit ids -subreddits = df.select(['subreddit']).distinct() -subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit"))) - -df = df.join(subreddits,on='subreddit') - -# map terms to indexes in the tfs and the idfs -df = df.join(terms,on='term') # subreddit-term-id is unique - -idf = idf.join(terms,on='term') - -# join on subreddit/term to create tf/dfs indexed by term -df = df.join(idf, on=['term_id','term']) - -# agg terms by subreddit to make sparse tf/df vectors -df = df.withColumn("tf_idf", (0.5 + (0.5 * df.relative_tf) * df.idf)) +df = build_tfidf_dataset(df, include_subs, 'term') df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy') -- 2.39.2