# join on author/week to create tf/idfs indexed by author
df = df.join(idf, on=['author_id','week','author'])
# normalize each author's relative_tf by the subreddit/week maximum tf
df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
-
-df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('term_id','tf_idf')).alias('tfidf_maps'))
+
+df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('author_id','tf_idf')).alias('tfidf_maps'))
df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
# output: subreddit | week | tfidf_vec (sparse author_id -> tf_idf map)
-df.write.parquet('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+df.write.json('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')
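# Illustrative sketch (not part of the pipeline above): how collect_list(f.struct(...))
# followed by map_from_entries builds a sparse author_id -> tf_idf map per
# subreddit/week. Assumes Spark >= 2.4 (for map_from_entries); the toy rows and
# values below are made up for demonstration only.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

toy = spark.createDataFrame(
    [("AskReddit", 1, 10, 0.8),
     ("AskReddit", 1, 42, 0.3),
     ("politics",  1, 10, 0.5)],
    ["subreddit", "week", "author_id", "tf_idf"])

# one row per subreddit/week, carrying a sparse author_id -> tf_idf map
vecs = (toy.groupby("subreddit", "week")
           .agg(f.collect_list(f.struct("author_id", "tf_idf")).alias("tfidf_maps"))
           .withColumn("tfidf_vec", f.map_from_entries("tfidf_maps")))

vecs.show(truncate=False)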