from itertools import islice
from pathlib import Path
from similarities_helper import *
-from multiprocessing import pool
+from multiprocessing import Pool, cpu_count
def _week_similarities(tempdir, term_colname, week):
print(f"loading matrix: {week}")
print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
print("creating temporary parquet with matrix indicies")
- tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
+ tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df=None, included_subreddits=included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
def week_similarities_helper(week):
_week_similarities(tempdir, term_colname, week)
- with Pool(40) as pool: # maybe it can be done with 40 cores on the huge machine?
- list(pool.map(weeks,week_similarities_helper))
+ with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
+ list(pool.map(week_similarities_helper,weeks))
def author_cosine_similarities_weekly(outfile, min_df=2 , included_subreddits=None, topN=500):
- return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_30k.parquet',
+ return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
outfile,
'author',
min_df,
topN)
def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
- return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_30k.parquet',
+ return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
outfile,
'term',
min_df,