From: Nathan TeBlunthuis
Date: Wed, 19 Jan 2022 21:57:02 +0000 (-0800)
Subject: commit changes from smap project.
X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/7b130a30af863dfa727d80d9fea23648dcc9d5d8?hp=-c

commit changes from smap project.
---

7b130a30af863dfa727d80d9fea23648dcc9d5d8
diff --git a/density/overlap_density.py b/density/overlap_density.py
index 2036824..ef0eb26 100644
--- a/density/overlap_density.py
+++ b/density/overlap_density.py
@@ -4,9 +4,9 @@ from pathlib import Path
 import fire
 import numpy as np
 import sys
-sys.path.append("..")
-sys.path.append("../similarities")
-from similarities.similarities_helper import reindex_tfidf
+# sys.path.append("..")
+# sys.path.append("../similarities")
+# from similarities.similarities_helper import pull_tfidf
 
 # this is the mean of the ratio of the overlap to the focal size.
 # mean shared membership per focal community member
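The comment at the end of the hunk above defines the density statistic this module computes: the mean, across members of a focal community, of shared membership with another community. A minimal sketch of that quantity, assuming per-community overlap counts and a focal size held in numpy objects (the variable names here are hypothetical, not taken from the module):

    import numpy as np

    # hypothetical inputs: counts of members shared with each other community,
    # and the focal community's own size
    overlap = np.array([120.0, 30.0, 45.0])
    focal_size = 1000.0

    # the mean of the ratio of the overlap to the focal size,
    # i.e. mean shared membership per focal community member
    density = (overlap / focal_size).mean()
    print(density)  # 0.065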
diff --git a/similarities/lsi_similarities.py b/similarities/lsi_similarities.py
index eb89f55..565e53f 100644
--- a/similarities/lsi_similarities.py
+++ b/similarities/lsi_similarities.py
@@ -5,28 +5,28 @@ from similarities_helper import *
 #from similarities_helper import similarities, lsi_column_similarities
 from functools import partial
 
-inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/"
-term_colname='term'
-outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI'
-n_components=[10,50,100]
-included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
-n_iter=5
-random_state=1968
-algorithm='arpack'
-topN = None
-from_date=None
-to_date=None
-min_df=None
-max_df=None
+# inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
+# term_colname='authors'
+# outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_test_compex_LSI'
+# n_components=[10,50,100]
+# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
+# n_iter=5
+# random_state=1968
+# algorithm='randomized'
+# topN = None
+# from_date=None
+# to_date=None
+# min_df=None
+# max_df=None
+
 def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
     print(n_components,flush=True)
-
     if lsi_model is None:
         if type(n_components) == list:
-            lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}_LSIMOD.pkl'
+            lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}s_LSIMOD.pkl'
         else:
-            lsi_model = Path(outfile) / f'{n_components}_{term_colname}_LSIMOD.pkl'
+            lsi_model = Path(outfile) / f'{n_components}_{term_colname}s_LSIMOD.pkl'
 
     simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model)
 
@@ -62,7 +62,7 @@ def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/t
                             n_components=n_components
                             )
 
-def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968):
+def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
     return lsi_similarities(inpath,
                             'author',
                             outfile,
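Besides threading algorithm and lsi_model through, the hunk above pluralizes the term column name in the pickled model's filename and, when n_components is a list, keys the default model path to the largest requested dimensionality (one model is fit at the maximum and smaller similarity sets reuse its leading dimensions). A short illustration of the new path logic, with a hypothetical outfile value:

    from pathlib import Path

    outfile = '/tmp/comment_authors_LSI'   # hypothetical output directory
    term_colname = 'author'
    n_components = [10, 50, 100]

    lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}s_LSIMOD.pkl'
    print(lsi_model)   # /tmp/comment_authors_LSI/100_authors_LSIMOD.pkl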
diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py
index d97e519..202220c 100644
--- a/similarities/similarities_helper.py
+++ b/similarities/similarities_helper.py
@@ -43,7 +43,7 @@ def reindex_tfidf(*args, **kwargs):
     new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
     new_ids = new_ids.set_index('subreddit_id')
     subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",1)
+    subreddit_names = subreddit_names.drop("subreddit_id",axis=1)
     subreddit_names = subreddit_names.sort_values("subreddit_id_new")
     return(df, subreddit_names)
 
@@ -51,8 +51,9 @@ def pull_tfidf(*args, **kwargs):
     df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
     return df
 
-def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
-    print(f"loading tfidf {infile}", flush=True)
+def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=None, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
+    print(f"loading tfidf {infile}, week {week}, min_df {min_df}, max_df {max_df}", flush=True)
+
     if week is not None:
         tfidf_ds = ds.dataset(infile, partitioning='hive')
     else:
@@ -94,23 +95,23 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
         projection = {
             'subreddit_id':ds.field('subreddit_id'),
             term_id:ds.field(term_id),
-            'relative_tf':ds.field('relative_tf').cast('float32'),
             'tf_idf':ds.field('tf_idf').cast('float32')}
 
-    print(projection)
-
+    print(projection, flush=True)
+    print(ds_filter, flush=True)
     df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
 
     df = df.to_pandas(split_blocks=True,self_destruct=True)
-    print("assigning indexes",flush=True)
+
     if reindex:
-        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
+        print("assigning indexes",flush=True)
+        df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + 1
     else:
         df['subreddit_id_new'] = df['subreddit_id']
 
     if reindex:
         grouped = df.groupby(term_id)
-        df[term_id_new] = grouped.ngroup()
+        df[term_id_new] = grouped.ngroup() + 1
     else:
         df[term_id_new] = df[term_id]
 
@@ -126,17 +127,17 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
 
         return (df, tfidf_ds, ds_filter)
 
-    with Pool(cpu_count()) as pool:
-        chunks = pool.imap_unordered(pull_names,batches)
-        subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
+    # with Pool(cpu_count()) as pool:
+    #     chunks = pool.imap_unordered(pull_names,batches)
+    #     subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
 
-    subreddit_names = subreddit_names.set_index("subreddit_id")
-    new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
-    new_ids = new_ids.set_index('subreddit_id')
-    subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
-    subreddit_names = subreddit_names.drop("subreddit_id",1)
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    return(df, subreddit_names)
+    # subreddit_names = subreddit_names.set_index("subreddit_id")
+    # new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
+    # new_ids = new_ids.set_index('subreddit_id')
+    # subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
+    # subreddit_names = subreddit_names.drop("subreddit_id",1)
+    # subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    # return(df, subreddit_names)
 
 def pull_names(batch):
     return(batch.to_pandas().drop_duplicates())
 
@@ -170,7 +171,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
     term_id_new = term + '_id_new'
 
     entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
-    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
+    mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))
 
     print("loading matrix")
 
@@ -238,7 +239,8 @@ def test_lsi_sims():
 # if n_components is a list we'll return a list of similarities with different latent dimensionalities
 # if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
 # this function takes the svd and then the column similarities of it
-def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
+# lsi_model_load = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
+def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model=None):
     # first compute the lsi of the matrix
     # then take the column similarities
 
@@ -249,28 +251,24 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
 
     svd_components = n_components[0]
 
-    if lsi_model_load is not None and Path(lsi_model_load).exists():
-        print("loading LSI")
-        mod = pickle.load(open(lsi_model_load ,'rb'))
-        lsi_model_save = lsi_model_load
-
-    else:
+    if lsi_model is None:
         print("running LSI",flush=True)
-
         svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
         mod = svd.fit(tfidfmat.T)
+    else:
+        mod = lsi_model
 
     lsimat = mod.transform(tfidfmat.T)
    if lsi_model_save is not None:
+        Path(lsi_model_save).parent.mkdir(exist_ok=True,parents=True)
         pickle.dump(mod, open(lsi_model_save,'wb'))
 
-    sims_list = []
+    print(n_components)
     for n_dims in n_components:
+        print("computing similarities")
         sims = column_similarities(lsimat[:,np.arange(n_dims)])
-        if len(n_components) > 1:
-            yield (sims, n_dims)
-        else:
-            return sims
+        yield (sims, n_dims)
+
 
 def column_similarities(mat):
 
@@ -326,11 +324,11 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
     else: # tf_fam = tf_weight.Norm05
         df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf)
 
-    df = df.repartition(400,'subreddit','week')
+    df = df.repartition('week')
     dfwriter = df.write.partitionBy("week")
     return dfwriter
 
-def _calc_tfidf(df, term_colname, tf_family):
+def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
     term = term_colname
     term_id = term + '_id'
 
@@ -348,7 +346,13 @@ def _calc_tfidf(df, term_colname, tf_family):
     idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
 
     # collect the dictionary to make a pydict of terms to indexes
-    terms = idf.select(term).distinct() # terms are distinct
+    terms = idf
+    if min_df is not None:
+        terms = terms.filter(f.col('count')>=min_df)
+    if max_df is not None:
+        terms = terms.filter(f.col('count')<=max_df)
+
+    terms = terms.select(term).distinct() # terms are distinct
     terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
 
     # make subreddit ids
@@ -358,12 +362,12 @@ def _calc_tfidf(df, term_colname, tf_family):
     df = df.join(subreddits,on='subreddit')
 
     # map terms to indexes in the tfs and the idfs
-    df = df.join(terms,on=term) # subreddit-term-id is unique
+    df = df.join(terms,on=term,how='inner') # subreddit-term-id is unique
 
-    idf = idf.join(terms,on=term)
+    idf = idf.join(terms,on=term,how='inner')
 
     # join on subreddit/term to create tf/dfs indexed by term
-    df = df.join(idf, on=[term_id, term])
+    df = df.join(idf, on=[term_id, term],how='inner')
 
     # agg terms by subreddit to make sparse tf/df vectors
     if tf_family == tf_weight.MaxTF:
@@ -374,14 +378,14 @@ def _calc_tfidf(df, term_colname, tf_family):
 
     return df
 
-def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
+def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05, min_df=None, max_df=None):
     term = term_colname
     term_id = term + '_id'
-    # aggregate counts by week. now subreddit-term is distinct
+
     df = df.filter(df.subreddit.isin(include_subs))
     df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
 
-    df = _calc_tfidf(df, term_colname, tf_family)
+    df = _calc_tfidf(df, term_colname, tf_family, min_df, max_df)
     df = df.repartition('subreddit')
     dfwriter = df.write
     return dfwriter
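Two coordinated changes above are easy to miss: _pull_or_reindex_tfidf now assigns 1-based dense ids (ngroup() + 1), and the csr_matrix constructors subtract 1 to get back to 0-based row and column indexes. Keeping the ids 1-based makes it possible to pass an explicit matrix shape even when a given slice of the data is missing some subreddits or terms, and lsi_column_similarities is now always a generator that yields (sims, n_dims) pairs, so callers unpack its output with next(). A small self-contained sketch of the indexing round trip, on toy data rather than the repository's parquet inputs:

    import pandas as pd
    from scipy.sparse import csr_matrix

    entries = pd.DataFrame({'subreddit_id': [10, 10, 42],
                            'term_id': [7, 9, 7],
                            'tf_idf': [0.5, 1.0, 0.25]})

    # 1-based dense ids, as in the commit
    entries['subreddit_id_new'] = entries.groupby('subreddit_id').ngroup() + 1
    entries['term_id_new'] = entries.groupby('term_id').ngroup() + 1

    # subtract 1 when building the sparse matrix; the shape can now be fixed externally
    mat = csr_matrix((entries['tf_idf'],
                      (entries['term_id_new'] - 1, entries['subreddit_id_new'] - 1)),
                     shape=(2, 2))
    print(mat.toarray())   # [[0.5  0.25]
                           #  [1.   0.  ]]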
diff --git a/similarities/tfidf.py b/similarities/tfidf.py
index 01b0b20..3356299 100644
--- a/similarities/tfidf.py
+++ b/similarities/tfidf.py
@@ -2,9 +2,12 @@ import fire
 from pyspark.sql import SparkSession
 from pyspark.sql import functions as f
 from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
+from functools import partial
 
-def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
-    spark = SparkSession.builder.getOrCreate()y
+inpath = '/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet'
+# include_terms is a path to a parquet file that contains a column of term_colname + '_id' to include.
+def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=None, min_df=None, max_df=None):
+    spark = SparkSession.builder.getOrCreate()
 
     df = spark.read.parquet(inpath)
 
@@ -15,50 +18,72 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
     else:
         include_subs = select_topN_subreddits(topN)
 
-    dfwriter = func(df, include_subs, term_colname)
+    include_subs = spark.sparkContext.broadcast(include_subs)
+
+    # term_id = term_colname + "_id"
+
+    if included_terms is not None:
+        terms_df = spark.read.parquet(included_terms)
+        terms_df = terms_df.select(term_colname).distinct()
+        df = df.join(terms_df, on=term_colname, how='left_semi')
+
+    dfwriter = func(df, include_subs.value, term_colname)
 
     dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
     spark.stop()
 
-def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
-    return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
+def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits, min_df, max_df):
+    tfidf_func = partial(tfidf_dataset, max_df=max_df, min_df=min_df)
+    return _tfidf_wrapper(tfidf_func, inpath, outpath, topN, term_colname, exclude, included_subreddits)
+
+def tfidf_weekly(inpath, outpath, static_tfidf_path, topN, term_colname, exclude, included_subreddits):
+    return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=static_tfidf_path)
 
-def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
-    return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
 
 def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                   outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
                   topN=None,
-                  included_subreddits=None):
+                  included_subreddits=None,
+                  min_df=None,
+                  max_df=None):
 
     return tfidf(inpath,
                  outpath,
                  topN,
                  'author',
                  ['[deleted]','AutoModerator'],
-                 included_subreddits=included_subreddits
+                 included_subreddits=included_subreddits,
+                 min_df=min_df,
+                 max_df=max_df
                  )
 
 def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
                 outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
                 topN=None,
-                included_subreddits=None):
+                included_subreddits=None,
+                min_df=None,
+                max_df=None):
 
     return tfidf(inpath,
                  outpath,
                  topN,
                  'term',
                  [],
-                 included_subreddits=included_subreddits
+                 included_subreddits=included_subreddits,
+                 min_df=min_df,
+                 max_df=max_df
                  )
 
 def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+                         static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet",
                          outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
                          topN=None,
-                         included_subreddits=None):
+                         included_subreddits=None
+                         ):
 
     return tfidf_weekly(inpath,
                         outpath,
+                        static_tfidf_path,
                         topN,
                         'author',
                         ['[deleted]','AutoModerator'],
@@ -66,13 +91,16 @@ def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_
                         )
 
 def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                       static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
                        outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
                        topN=None,
-                       included_subreddits=None):
+                       included_subreddits=None
+                       ):
 
     return tfidf_weekly(inpath,
                         outpath,
+                        static_tfidf_path,
                         topN,
                         'term',
                         [],
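The new included_terms argument exists so a weekly tf-idf run can be restricted to the vocabulary of a previously computed static tf-idf dataset; tfidf_weekly now forwards static_tfidf_path as included_terms, and the wrapper applies it with a left_semi join. A minimal sketch of that filter, with toy rows standing in for the real parquet inputs:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([('askreddit', 'cat'), ('askreddit', 'dog'),
                                ('aww', 'axolotl')],
                               ['subreddit', 'term'])
    terms_df = spark.createDataFrame([('cat',), ('dog',)], ['term'])

    # left_semi keeps rows of df whose term appears in terms_df, without adding columns
    df = df.join(terms_df, on='term', how='left_semi')
    df.show()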
diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py
index 6ce30b8..45327c7 100755
--- a/similarities/weekly_cosine_similarities.py
+++ b/similarities/weekly_cosine_similarities.py
@@ -13,18 +13,23 @@ from similarities_helper import pull_tfidf, column_similarities, write_weekly_si
 from scipy.sparse import csr_matrix
 from multiprocessing import Pool, cpu_count
 from functools import partial
-
-infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet"
-tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
-min_df=None
-included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
-max_df = None
-topN=100
-term_colname='author'
-# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
-# included_subreddits=None
-
-def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms):
+import pickle
+
+# tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors_tfidf.parquet"
+# #tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data//comment_authors_compex.parquet"
+# min_df=2
+# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
+# max_df = None
+# topN=100
+# term_colname='author'
+# # outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
+# # included_subreddits=None
+outfile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors.parquet"; infile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_authors_tfidf.parquet"; included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"; lsi_model="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/2000_authors_LSIMOD.pkl"; n_components=1500; algorithm="randomized"; term_colname='author'; tfidf_path=infile; random_state=1968;
+
+# static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
+# dftest = spark.read.parquet(static_tfidf)
+
+def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None):
     term = term_colname
     term_id = term + '_id'
     term_id_new = term + '_id_new'
@@ -32,20 +37,19 @@ def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df,
 
     entries = pull_tfidf(infile = tfidf_path,
                          term_colname=term_colname,
-                         min_df=min_df,
-                         max_df=max_df,
                          included_subreddits=included_subreddits,
                          topN=topN,
-                         week=week,
+                         week=week.isoformat(),
                          rescale_idf=False)
 
     tfidf_colname='tf_idf'
     # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
     mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
-    print('computing similarities')
+    print(simfunc)
     sims = simfunc(mat)
     del mat
+    sims = next(sims)[0]
     sims = pd.DataFrame(sims)
     sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
     sims['_subreddit'] = subreddit_names.subreddit.values
@@ -56,18 +60,20 @@ def pull_weeks(batch):
     return set(batch.to_pandas()['week'])
 
 # This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
-def cosine_similarities_weekly_lsi(n_components=100, lsi_model=None, *args, **kwargs):
+def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kwargs):
+    print(args)
+    print(kwargs)
     term_colname= kwargs.get('term_colname')
-    #lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
-
-    # simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm='randomized',lsi_model_load=lsi_model)
+    # lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/1000_author_LSIMOD.pkl"
 
-    simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=kwargs.get('n_iter'),random_state=kwargs.get('random_state'),algorithm=kwargs.get('algorithm'),lsi_model_load=lsi_model)
+    lsi_model = pickle.load(open(lsi_model,'rb'))
+    #simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=random_state,algorithm='randomized',lsi_model=lsi_model)
+    simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=kwargs.get('random_state'),lsi_model=lsi_model)
 
     return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
 
 #tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
-def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500, simfunc=column_similarities):
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=None,max_df=None):
     print(outfile)
     # do this step in parallel if we have the memory for it.
     # should be doable with pool.map
@@ -84,12 +90,14 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None,
 
     spark.stop()
 
     print(f"computing weekly similarities")
-    week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms)
+    week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms)
 
-    pool = Pool(cpu_count())
-
-    list(pool.imap(week_similarities_helper,weeks))
-    pool.close()
+    for week in weeks:
+        week_similarities_helper(week)
+    # pool = Pool(cpu_count())
+
+    # list(pool.imap(week_similarities_helper, weeks))
+    # pool.close()
 
     # with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
 
@@ -97,10 +105,11 @@ def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/
     return cosine_similarities_weekly(infile,
                                       outfile,
                                       'author',
-                                      min_df,
                                       max_df,
                                       included_subreddits,
-                                      topN)
+                                      topN,
+                                      min_df=2
+)
 
 def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
     return cosine_similarities_weekly(infile,
@@ -112,32 +121,29 @@ def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/re
                                       topN)
 
-def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None,n_components=100,lsi_model=None):
+def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None):
     return cosine_similarities_weekly_lsi(infile,
                                           outfile,
                                           'author',
-                                          min_df,
-                                          max_df,
-                                          included_subreddits,
-                                          topN,
+                                          included_subreddits=included_subreddits,
                                           n_components=n_components,
-                                          lsi_model=lsi_model)
+                                          lsi_model=lsi_model
+                                          )
 
-def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500,n_components=100,lsi_model=None):
+def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None):
     return cosine_similarities_weekly_lsi(infile,
                                           outfile,
                                           'term',
-                                          min_df,
-                                          max_df,
-                                          included_subreddits,
-                                          topN,
+                                          included_subreddits=included_subreddits,
                                           n_components=n_components,
-                                          lsi_model=lsi_model)
+                                          lsi_model=lsi_model,
+                                          )
 
 if __name__ == "__main__":
     fire.Fire({'authors':author_cosine_similarities_weekly,
                'terms':term_cosine_similarities_weekly,
                'authors-lsi':author_cosine_similarities_weekly_lsi,
-               'terms-lsi':term_cosine_similarities_weekly
+               'terms-lsi':term_cosine_similarities_weekly_lsi
                })
+
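cosine_similarities_weekly_lsi now unpickles the prefit model itself and hands the estimator, rather than a path, to lsi_column_similarities, and _week_similarities consumes the resulting generator with next(sims)[0]. A sketch of that flow under stated assumptions: the model path is hypothetical, the matrix is a toy stand-in for a week's term-by-subreddit tf-idf entries, and similarities_helper is importable from the repository's similarities directory:

    import pickle
    from functools import partial

    import numpy as np
    from scipy.sparse import csr_matrix
    from similarities_helper import lsi_column_similarities

    # hypothetical pickle written by an earlier static LSI run
    lsi_model = pickle.load(open('/tmp/2000_authors_LSIMOD.pkl', 'rb'))

    # toy matrix standing in for one week's tf-idf entries
    mat = csr_matrix(np.random.default_rng(0).random((20, 5)))

    simfunc = partial(lsi_column_similarities,
                      n_components=[10],
                      random_state=1968,
                      lsi_model=lsi_model)

    sims, n_dims = next(simfunc(mat))   # the generator yields (sims, n_dims) pairs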
diff --git a/timeseries/cluster_timeseries.py b/timeseries/cluster_timeseries.py
index 91fa705..2286ab0 100644
--- a/timeseries/cluster_timeseries.py
+++ b/timeseries/cluster_timeseries.py
@@ -12,10 +12,6 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
                              author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
                              output="data/subreddit_timeseries.parquet"):
 
-
-    clusters = load_clusters(term_clusters_path, author_clusters_path)
-    densities = load_densities(term_densities_path, author_densities_path)
-
     spark = SparkSession.builder.getOrCreate()
     df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
 
@@ -26,11 +22,15 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
     ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
 
     ts = ts.repartition('subreddit')
-    spk_clusters = spark.createDataFrame(clusters)
+
+    if term_densities_path is not None and author_densities_path is not None:
+        densities = load_densities(term_densities_path, author_densities_path)
+        spk_densities = spark.createDataFrame(densities)
+        ts = ts.join(spk_densities, on='subreddit', how='inner')
 
+    clusters = load_clusters(term_clusters_path, author_clusters_path)
+    spk_clusters = spark.createDataFrame(clusters)
     ts = ts.join(spk_clusters, on='subreddit', how='inner')
-
-    spk_densities = spark.createDataFrame(densities)
-    ts = ts.join(spk_densities, on='subreddit', how='inner')
 
     ts.write.parquet(output, mode='overwrite')
 
 if __name__ == "__main__":
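With this change the density joins in build_cluster_timeseries are optional: densities are loaded and joined only when both density paths are given, and clusters are loaded just before their own join instead of at the top of the function. A hypothetical invocation that skips the density data entirely, assuming the module is importable as cluster_timeseries:

    from cluster_timeseries import build_cluster_timeseries

    # passing None for both density paths skips load_densities and the density join;
    # the cluster paths fall back to their defaults
    build_cluster_timeseries(term_densities_path=None,
                             author_densities_path=None,
                             output='data/subreddit_timeseries.parquet')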