diff --git a/clustering/affinity_clustering.py b/clustering/affinity_clustering.py
index 287f7e2..d10628a 100644
--- a/clustering/affinity_clustering.py
+++ b/clustering/affinity_clustering.py
@@ -1,15 +1,12 @@
-from sklearn.metrics import silhouette_score
 from sklearn.cluster import AffinityPropagation
-from functools import partial
 from dataclasses import dataclass
-from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
-from multiprocessing import Pool, cpu_count, Array, Process
+from clustering_base import clustering_result, clustering_job
+from grid_sweep import grid_sweep
 from pathlib import Path
 from itertools import product, starmap
-import numpy as np
-import pandas as pd
 import fire
 import sys
+import numpy as np
 
 # silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
 
 @dataclass
@@ -17,116 +14,110 @@ class affinity_clustering_result(clustering_result):
     damping:float
     convergence_iter:int
     preference_quantile:float
-
-def affinity_clustering(similarities, output, *args, **kwargs):
-    subreddits, mat = read_similarity_mat(similarities)
-    clustering = _affinity_clustering(mat, *args, **kwargs)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    cluster_data['algorithm'] = 'affinity'
-    return(cluster_data)
-
-def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
-    '''
-    similarities: matrix of similarity scores
-    preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
-    damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
-    '''
-    print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
-
-    preference = np.quantile(mat,preference_quantile)
-
-    print(f"preference is {preference}")
-    print("data loaded")
-    sys.stdout.flush()
-    clustering = AffinityPropagation(damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     copy=False,
-                                     preference=preference,
-                                     affinity='precomputed',
-                                     verbose=verbose,
-                                     random_state=random_state).fit(mat)
-
-    cluster_data = process_clustering_result(clustering, subreddits)
-    output = Path(output)
-    output.parent.mkdir(parents=True,exist_ok=True)
-    cluster_data.to_feather(output)
-    print(f"saved {output}")
-    return clustering
-
-
-def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
-    if name is None:
-        name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
-    print(name)
-    sys.stdout.flush()
-    outpath = outdir / (str(name) + ".feather")
-    outpath.parent.mkdir(parents=True,exist_ok=True)
-    print(outpath)
-    clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    mat = sim_to_dist(clustering.affinity_matrix_)
-
-    try:
-        score = silhouette_score(mat, clustering.labels_, metric='precomputed')
-    except ValueError:
-        score = None
-
-    if alt_mat is not None:
-        alt_distances = sim_to_dist(alt_mat)
-        try:
-            alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
-        except ValueError:
-            alt_score = None
+    preference:float
+    max_iter:int
+
+class affinity_job(clustering_job):
+    def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=self._affinity_clustering,
+                         preference_quantile=preference_quantile,
+                         damping=damping,
+                         max_iter=max_iter,
+                         convergence_iter=convergence_iter,
+                         random_state=random_state,
+                         verbose=verbose)
+        self.damping=damping
+        self.max_iter=max_iter
+        self.convergence_iter=convergence_iter
+        self.preference_quantile=preference_quantile
+
+    def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
+        mat = 1-mat # clustering_base loads a distance matrix; convert it back to similarities
+        preference = np.quantile(mat, preference_quantile)
+        self.preference = preference
+        print(f"preference is {preference}")
+        print("data loaded")
+        sys.stdout.flush()
+        clustering = AffinityPropagation(*args,
+                                         preference=preference,
+                                         affinity='precomputed',
+                                         copy=False,
+                                         **kwargs).fit(mat)
+        return clustering
+
+    def get_info(self):
+        result = super().get_info()
+        self.result=affinity_clustering_result(**result.__dict__,
+                                               damping=self.damping,
+                                               max_iter=self.max_iter,
+                                               convergence_iter=self.convergence_iter,
+                                               preference_quantile=self.preference_quantile,
+                                               preference=self.preference)
+
+        return self.result
+
+class affinity_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,
+                 **kwargs):
+
+        # grid_sweep expects (jobtype, inpath, outpath, namer, *args)
+        super().__init__(affinity_job,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         *args,
+                         **kwargs)
+    def namer(self,
+              damping,
+              max_iter,
+              convergence_iter,
+              preference_quantile):
+
+        return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
+
+def run_affinity_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5]):
+    """Run affinity propagation clustering once or more with different hyperparameters.
 
-    res = affinity_clustering_result(outpath=outpath,
-                                     damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     preference_quantile=preference_quantile,
-                                     silhouette_score=score,
-                                     alt_silhouette_score=score,
-                                     name=str(name))
-
-    return res
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-
-def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
-
-    damping = list(map(float,damping))
-    convergence_iter = convergence_iter = list(map(int,convergence_iter))
-    preference_quantile = list(map(float,preference_quantile))
-
-    if type(outdir) is str:
-        outdir = Path(outdir)
-
-    outdir.mkdir(parents=True,exist_ok=True)
-
-    subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
-    if alt_similarities is not None:
-        alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
-    else:
-        alt_mat = None
-
-    if J is None:
-        J = cpu_count()
-    pool = Pool(J)
-
-    # get list of tuples: the combinations of hyperparameters
-    hyper_grid = product(damping, convergence_iter, preference_quantile)
-    hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
-    _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
+    Usage:
+        affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --dampings=<list> --max_iters=<list> --convergence_iters=<list> --preference_quantiles=<list>
+
+    Keyword arguments:
+    savefile: path for saving the sweep metadata and diagnostics.
+    inpath: path to feather data containing a labeled matrix of subreddit similarities.
+    outpath: path for writing the fit affinity propagation clusterings.
+    dampings: one or more numbers in [0.5, 1). Damping parameter for affinity propagation clustering.
+    preference_quantiles: one or more numbers in (0, 1) for selecting the 'preference' parameter.
+    convergence_iters: one or more integers; the number of iterations without improvement before stopping.
+    max_iters: one or more integers; the maximum number of iterations.
+    """
+    obj = affinity_grid_sweep(inpath,
+                              outpath,
+                              map(float,dampings),
+                              map(int,max_iters),
+                              map(int,convergence_iters),
+                              map(float,preference_quantiles))
+    obj.run(1)
+    obj.save(savefile)
+
+def test_select_affinity_clustering():
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+    outpath = "test_affinity"
+    dampings=[0.8,0.9]
+    max_iters=[100000]
+    convergence_iters=[15]
+    preference_quantiles=[0.5,0.7]
+
+    # affinity_lsi_grid_sweep is not defined in this module; use affinity_grid_sweep
+    gs = affinity_grid_sweep(inpath, outpath, dampings, max_iters, convergence_iters, preference_quantiles)
+    gs.run(20)
+    gs.save("test_affinity/affinity_sweep.csv")
 
-    # similarities = Array('d', mat)
-    # call pool.starmap
-    print("running clustering selection")
-    clustering_data = pool.starmap(_do_clustering, hyper_grid)
-    clustering_data = pd.DataFrame(list(clustering_data))
-    clustering_data.to_csv(outinfo)
-    return clustering_data
 
 
 if __name__ == "__main__":
-    x = fire.Fire(select_affinity_clustering)
+    fire.Fire(run_affinity_grid_sweep)
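
The heart of _affinity_clustering above is scikit-learn's AffinityPropagation with affinity='precomputed' and a preference taken from a quantile of the similarity scores (per the removed docstring, higher quantiles yield more clusters). Below is a minimal, self-contained sketch of that pattern; the synthetic data, feature count, and 0.5 quantile are illustrative assumptions, not values from this repository.

    # Minimal sketch of affinity propagation on a precomputed similarity matrix.
    # The random data here stands in for the subreddit similarity matrix.
    import numpy as np
    from sklearn.cluster import AffinityPropagation
    from sklearn.metrics.pairwise import cosine_similarity

    rng = np.random.default_rng(1968)
    X = rng.normal(size=(40, 8))      # 40 fake "subreddits", 8 features each
    sim = cosine_similarity(X)        # square similarity matrix

    # Higher preference quantiles produce more clusters.
    preference = np.quantile(sim, 0.5)

    clustering = AffinityPropagation(damping=0.9,
                                     max_iter=1000,
                                     convergence_iter=30,
                                     preference=preference,
                                     affinity='precomputed',
                                     random_state=1968).fit(sim)
    print(clustering.labels_)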
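grid_sweep and clustering_job are imported from modules outside this diff, so the fix above (dropping the undefined _afffinity_grid_sweep argument) rests on the base class taking (jobtype, inpath, outpath, namer, *args). The sketch below is a hypothetical reconstruction of that contract inferred only from how this file calls it (affinity_grid_sweep(...), obj.run(1), obj.save(savefile)); the real implementation in grid_sweep.py may differ.

    # Hypothetical sketch of the grid_sweep contract assumed by this file.
    # Only the call signatures come from the diff; the bodies are illustrative.
    from dataclasses import asdict
    from itertools import product
    import pandas as pd

    class grid_sweep:
        def __init__(self, jobtype, inpath, outpath, namer, *args):
            # one job per point in the hyperparameter grid
            self.jobs = [jobtype(inpath, outpath, namer(*point), *point)
                         for point in product(*args)]

        def run(self, cores=1):
            # fit each job and collect one diagnostics dataclass per job;
            # a real implementation would parallelize across `cores`
            self.infos = pd.DataFrame([asdict(job.get_info()) for job in self.jobs])

        def save(self, outcsv):
            self.infos.to_csv(outcsv)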
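Because the entry point is fire.Fire(run_affinity_grid_sweep), the sweep can be driven from the shell as shown in the docstring's Usage line, or called directly from Python. A sketch of the latter, with placeholder paths:

    # Driving the sweep from Python; all paths here are placeholders.
    from affinity_clustering import run_affinity_grid_sweep

    run_affinity_grid_sweep(savefile="affinity/sweep_diagnostics.csv",
                            inpath="similarities/subreddit_similarities.feather",
                            outpath="affinity/",
                            dampings=[0.85, 0.95],
                            max_iters=[3000],
                            convergence_iters=[30],
                            preference_quantiles=[0.5, 0.7])

With two dampings and two preference quantiles this fits the four combinations in the grid and writes one diagnostics row per fit to the savefile.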