X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/8b8c45ee2da18a0c7139e7ff88991aa2a1fff12f..6edd1557491a0d08302ba7506dbccc36f620b5e1:/idf_authors.py

diff --git a/idf_authors.py b/idf_authors.py
index 379de5a..92a4965 100644
--- a/idf_authors.py
+++ b/idf_authors.py
@@ -25,19 +25,19 @@ authors = authors.withColumn('author_id',f.monotonically_increasing_id())
 
 
 # map terms to indexes in the tfs and the idfs
-df = df.join(terms,on='author')
+df = df.join(authors,on='author')
 
-idf = idf.join(terms,on='author')
+idf = idf.join(authors,on='author')
 
 # join on subreddit/term/week to create tf/dfs indexed by term
 df = df.join(idf, on=['author_id','week','author'])
 
 # agg terms by subreddit to make sparse tf/df vectors
 df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
-
-df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('term_id','tf_idf')).alias('tfidf_maps'))
+ 
+df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('author_id','tf_idf')).alias('tfidf_maps'))
 
 df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
 
 # output: subreddit | week | tf/df
-df.write.parquet('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+df.write.json('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')