X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/8d1df5b26ee80fee639e5b3ecd057fe8e72f166c..8a2248fae1ee5818576b9a8f2849d1ad0efd8187:/clustering/affinity_clustering.py?ds=sidebyside diff --git a/clustering/affinity_clustering.py b/clustering/affinity_clustering.py index 287f7e2..b4f8461 100644 --- a/clustering/affinity_clustering.py +++ b/clustering/affinity_clustering.py @@ -2,7 +2,8 @@ from sklearn.metrics import silhouette_score from sklearn.cluster import AffinityPropagation from functools import partial from dataclasses import dataclass -from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result +from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat +from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep from multiprocessing import Pool, cpu_count, Array, Process from pathlib import Path from itertools import product, starmap @@ -17,116 +18,158 @@ class affinity_clustering_result(clustering_result): damping:float convergence_iter:int preference_quantile:float + preference:float + max_iter:int -def affinity_clustering(similarities, output, *args, **kwargs): - subreddits, mat = read_similarity_mat(similarities) - clustering = _affinity_clustering(mat, *args, **kwargs) - cluster_data = process_clustering_result(clustering, subreddits) - cluster_data['algorithm'] = 'affinity' - return(cluster_data) - -def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True): - ''' - similarities: matrix of similarity scores - preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits. - damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author. - ''' - print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}") - - preference = np.quantile(mat,preference_quantile) - - print(f"preference is {preference}") - print("data loaded") - sys.stdout.flush() - clustering = AffinityPropagation(damping=damping, - max_iter=max_iter, - convergence_iter=convergence_iter, - copy=False, - preference=preference, - affinity='precomputed', - verbose=verbose, - random_state=random_state).fit(mat) - - cluster_data = process_clustering_result(clustering, subreddits) - output = Path(output) - output.parent.mkdir(parents=True,exist_ok=True) - cluster_data.to_feather(output) - print(f"saved {output}") - return clustering - - -def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False): - if name is None: - name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}" - print(name) - sys.stdout.flush() - outpath = outdir / (str(name) + ".feather") - outpath.parent.mkdir(parents=True,exist_ok=True) - print(outpath) - clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose) - cluster_data = process_clustering_result(clustering, subreddits) - mat = sim_to_dist(clustering.affinity_matrix_) - - try: - score = silhouette_score(mat, clustering.labels_, metric='precomputed') - except ValueError: - score = None - - if alt_mat is not None: - alt_distances = sim_to_dist(alt_mat) - try: - alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed') - except ValueError: - alt_score = None +@dataclass +class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin): + pass + +class affinity_job(clustering_job): + def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True): + super().__init__(infile, + outpath, + name, + call=self._affinity_clustering, + preference_quantile=preference_quantile, + damping=damping, + max_iter=max_iter, + convergence_iter=convergence_iter, + random_state=1968, + verbose=verbose) + self.damping=damping + self.max_iter=max_iter + self.convergence_iter=convergence_iter + self.preference_quantile=preference_quantile + + def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs): + mat = 1-mat + preference = np.quantile(mat, preference_quantile) + self.preference = preference + print(f"preference is {preference}") + print("data loaded") + sys.stdout.flush() + clustering = AffinityPropagation(*args, + preference=preference, + affinity='precomputed', + copy=False, + **kwargs).fit(mat) + return clustering + + def get_info(self): + result = super().get_info() + self.result=affinity_clustering_result(**result.__dict__, + damping=self.damping, + max_iter=self.max_iter, + convergence_iter=self.convergence_iter, + preference_quantile=self.preference_quantile, + preference=self.preference) + + return self.result + +class affinity_lsi_job(affinity_job, lsi_mixin): + def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs): + super().__init__(infile, + outpath, + name, + *args, + **kwargs) + super().set_lsi_dims(lsi_dims) + + def get_info(self): + result = super().get_info() + self.result = affinity_clustering_result_lsi(**result.__dict__, + lsi_dimensions=self.lsi_dims) + return self.result + +class affinity_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + *args, + **kwargs): + + super().__init__(affinity_job, + _afffinity_grid_sweep, + inpath, + outpath, + self.namer, + *args, + **kwargs) + def namer(self, + damping, + max_iter, + convergence_iter, + preference_quantile): + + return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}" + +class _affinity_lsi_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + lsi_dim, + *args, + **kwargs): + self.lsi_dim = lsi_dim + self.jobtype = affinity_lsi_job + super().__init__(self.jobtype, + inpath, + outpath, + self.namer, + self.lsi_dim, + *args, + **kwargs) + + def namer(self, *args, **kwargs): + s = affinity_grid_sweep.namer(self, *args[1:], **kwargs) + s += f"_lsi-{self.lsi_dim}" + return s + +class affinity_lsi_grid_sweep(lsi_grid_sweep): + def __init__(self, + inpath, + lsi_dims, + outpath, + dampings=[0.9], + max_iters=[10000], + convergence_iters=[30], + preference_quantiles=[0.5]): + + super().__init__(affinity_lsi_job, + _affinity_lsi_grid_sweep, + inpath, + lsi_dims, + outpath, + dampings, + max_iters, + convergence_iters, + preference_quantiles) - res = affinity_clustering_result(outpath=outpath, - damping=damping, - max_iter=max_iter, - convergence_iter=convergence_iter, - preference_quantile=preference_quantile, - silhouette_score=score, - alt_silhouette_score=score, - name=str(name)) - - return res - -# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering). - -def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None): - - damping = list(map(float,damping)) - convergence_iter = convergence_iter = list(map(int,convergence_iter)) - preference_quantile = list(map(float,preference_quantile)) - - if type(outdir) is str: - outdir = Path(outdir) - - outdir.mkdir(parents=True,exist_ok=True) - - subreddits, mat = read_similarity_mat(similarities,use_threads=True) - - if alt_similarities is not None: - alt_mat = read_similarity_mat(alt_similarities,use_threads=True) - else: - alt_mat = None - - if J is None: - J = cpu_count() - pool = Pool(J) - - # get list of tuples: the combinations of hyperparameters - hyper_grid = product(damping, convergence_iter, preference_quantile) - hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid)) - - _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat) + + +def test_select_affinity_clustering(): + # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI", + # "test_hdbscan_author30k", + # min_cluster_sizes=[2], + # min_samples=[1,2], + # cluster_selection_epsilons=[0,0.05,0.1,0.15], + # cluster_selection_methods=['eom','leaf'], + # lsi_dimensions='all') + inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/" + outpath = "test_affinity"; + dampings=[0.8,0.9] + max_iters=[100000] + convergence_iters=[15] + preference_quantiles=[0.5,0.7] + + gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles) + gs.run(20) + gs.save("test_affinity/lsi_sweep.csv") - # similarities = Array('d', mat) - # call pool.starmap - print("running clustering selection") - clustering_data = pool.starmap(_do_clustering, hyper_grid) - clustering_data = pd.DataFrame(list(clustering_data)) - clustering_data.to_csv(outinfo) - return clustering_data if __name__ == "__main__": - x = fire.Fire(select_affinity_clustering) + fire.Fire{'grid_sweep':affinity_grid_sweep, + 'grid_sweep_lsi':affinity_lsi_grid_sweep + 'cluster':affinity_job, + 'cluster_lsi':affinity_lsi_job}