def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
    """Prepare per-week tf-idf entries and write them to a temporary parquet dataset.

    The input is restricted to ``included_subreddits``, subreddits and terms
    are given fresh ids that restart at 1 within each week, terms that occur
    in fewer than ``min_df`` rows of a week are dropped, and ``tf_idf`` is
    recomputed as ``relative_tf * idf``.

    Args:
        tfidf: DataFrame (presumably a Spark DataFrame -- confirm with caller)
            that has the columns ``subreddit``, ``subreddit_id``, ``week``,
            ``<term_colname>_id``, ``tf_idf``, ``relative_tf`` and ``idf``.
        term_colname: Base name of the term column; ``'_id'`` and
            ``'_id_new'`` are appended to form the id column names.
        min_df: Minimum number of rows a term must have within a week to be
            kept. ``None`` means ``0.1 * len(included_subreddits)``.
        included_subreddits: Collection of subreddit names to keep.

    Returns:
        The ``TemporaryDirectory`` holding the parquet output (path in its
        ``.name``). The caller must keep a reference to it for as long as the
        data is needed, because the directory is removed when the object is
        cleaned up.
    """
    term_id = term_colname + '_id'
    term_id_new = term_colname + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))

    # We might not have the same terms or subreddits each week, so we need to
    # make unique ids for each week.
    sub_ids = tfidf.select(['subreddit_id', 'week']).distinct()
    sub_ids = sub_ids.withColumn(
        "subreddit_id_new",
        f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")),
    )
    tfidf = tfidf.join(sub_ids, ['subreddit_id', 'week'])

    # Only use terms in at least min_df included subreddits in a given week.
    # NOTE(review): the row count equals the subreddit count only if the input
    # has one row per (subreddit, term, week) -- confirm against the producer.
    new_count = tfidf.groupBy([term_id, 'week']).agg(f.count(term_id).alias('new_count'))
    # Previously min_df was computed but never applied; the inner join below
    # now actually drops the terms that fall under the threshold.
    new_count = new_count.filter(f.col('new_count') >= min_df)
    tfidf = tfidf.join(new_count, [term_id, 'week'], how='inner')

    # Reset the term ids so they are dense within each week after filtering.
    term_ids = tfidf.select([term_id, 'week']).distinct()
    term_ids = term_ids.withColumn(
        term_id_new,
        f.row_number().over(Window.partitionBy('week').orderBy(term_id)),
    )
    tfidf = tfidf.join(term_ids, [term_id, 'week'])

    # Keep the incoming score as tf_idf_old and recompute tf_idf from its parts.
    tfidf = tfidf.withColumnRenamed("tf_idf", "tf_idf_old")
    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))

    tempdir = TemporaryDirectory(suffix='.parquet', prefix='term_tfidf_entries', dir='.')

    tfidf = tfidf.repartition('week')

    tfidf.write.parquet(tempdir.name, mode='overwrite', compression='snappy')
    return tempdir
+
+