From: Nate E TeBlunthuis Date: Sat, 8 May 2021 05:33:26 +0000 (-0700) Subject: refactor clustring in object oriented style X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/f05cb962e0388feaf38aaf84f222696ab8f5f398?ds=inline refactor clustring in object oriented style --- diff --git a/clustering/affinity_clustering.py b/clustering/affinity_clustering.py index 287f7e2..b4f8461 100644 --- a/clustering/affinity_clustering.py +++ b/clustering/affinity_clustering.py @@ -2,7 +2,8 @@ from sklearn.metrics import silhouette_score from sklearn.cluster import AffinityPropagation from functools import partial from dataclasses import dataclass -from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result +from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat +from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep from multiprocessing import Pool, cpu_count, Array, Process from pathlib import Path from itertools import product, starmap @@ -17,116 +18,158 @@ class affinity_clustering_result(clustering_result): damping:float convergence_iter:int preference_quantile:float + preference:float + max_iter:int -def affinity_clustering(similarities, output, *args, **kwargs): - subreddits, mat = read_similarity_mat(similarities) - clustering = _affinity_clustering(mat, *args, **kwargs) - cluster_data = process_clustering_result(clustering, subreddits) - cluster_data['algorithm'] = 'affinity' - return(cluster_data) - -def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True): - ''' - similarities: matrix of similarity scores - preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits. - damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author. - ''' - print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}") - - preference = np.quantile(mat,preference_quantile) - - print(f"preference is {preference}") - print("data loaded") - sys.stdout.flush() - clustering = AffinityPropagation(damping=damping, - max_iter=max_iter, - convergence_iter=convergence_iter, - copy=False, - preference=preference, - affinity='precomputed', - verbose=verbose, - random_state=random_state).fit(mat) - - cluster_data = process_clustering_result(clustering, subreddits) - output = Path(output) - output.parent.mkdir(parents=True,exist_ok=True) - cluster_data.to_feather(output) - print(f"saved {output}") - return clustering - - -def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False): - if name is None: - name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}" - print(name) - sys.stdout.flush() - outpath = outdir / (str(name) + ".feather") - outpath.parent.mkdir(parents=True,exist_ok=True) - print(outpath) - clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose) - cluster_data = process_clustering_result(clustering, subreddits) - mat = sim_to_dist(clustering.affinity_matrix_) - - try: - score = silhouette_score(mat, clustering.labels_, metric='precomputed') - except ValueError: - score = None - - if alt_mat is not None: - alt_distances = sim_to_dist(alt_mat) - try: - alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed') - except ValueError: - alt_score = None +@dataclass +class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin): + pass + +class affinity_job(clustering_job): + def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True): + super().__init__(infile, + outpath, + name, + call=self._affinity_clustering, + preference_quantile=preference_quantile, + damping=damping, + max_iter=max_iter, + convergence_iter=convergence_iter, + random_state=1968, + verbose=verbose) + self.damping=damping + self.max_iter=max_iter + self.convergence_iter=convergence_iter + self.preference_quantile=preference_quantile + + def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs): + mat = 1-mat + preference = np.quantile(mat, preference_quantile) + self.preference = preference + print(f"preference is {preference}") + print("data loaded") + sys.stdout.flush() + clustering = AffinityPropagation(*args, + preference=preference, + affinity='precomputed', + copy=False, + **kwargs).fit(mat) + return clustering + + def get_info(self): + result = super().get_info() + self.result=affinity_clustering_result(**result.__dict__, + damping=self.damping, + max_iter=self.max_iter, + convergence_iter=self.convergence_iter, + preference_quantile=self.preference_quantile, + preference=self.preference) + + return self.result + +class affinity_lsi_job(affinity_job, lsi_mixin): + def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs): + super().__init__(infile, + outpath, + name, + *args, + **kwargs) + super().set_lsi_dims(lsi_dims) + + def get_info(self): + result = super().get_info() + self.result = affinity_clustering_result_lsi(**result.__dict__, + lsi_dimensions=self.lsi_dims) + return self.result + +class affinity_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + *args, + **kwargs): + + super().__init__(affinity_job, + _afffinity_grid_sweep, + inpath, + outpath, + self.namer, + *args, + **kwargs) + def namer(self, + damping, + max_iter, + convergence_iter, + preference_quantile): + + return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}" + +class _affinity_lsi_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + lsi_dim, + *args, + **kwargs): + self.lsi_dim = lsi_dim + self.jobtype = affinity_lsi_job + super().__init__(self.jobtype, + inpath, + outpath, + self.namer, + self.lsi_dim, + *args, + **kwargs) + + def namer(self, *args, **kwargs): + s = affinity_grid_sweep.namer(self, *args[1:], **kwargs) + s += f"_lsi-{self.lsi_dim}" + return s + +class affinity_lsi_grid_sweep(lsi_grid_sweep): + def __init__(self, + inpath, + lsi_dims, + outpath, + dampings=[0.9], + max_iters=[10000], + convergence_iters=[30], + preference_quantiles=[0.5]): + + super().__init__(affinity_lsi_job, + _affinity_lsi_grid_sweep, + inpath, + lsi_dims, + outpath, + dampings, + max_iters, + convergence_iters, + preference_quantiles) - res = affinity_clustering_result(outpath=outpath, - damping=damping, - max_iter=max_iter, - convergence_iter=convergence_iter, - preference_quantile=preference_quantile, - silhouette_score=score, - alt_silhouette_score=score, - name=str(name)) - - return res - -# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering). - -def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None): - - damping = list(map(float,damping)) - convergence_iter = convergence_iter = list(map(int,convergence_iter)) - preference_quantile = list(map(float,preference_quantile)) - - if type(outdir) is str: - outdir = Path(outdir) - - outdir.mkdir(parents=True,exist_ok=True) - - subreddits, mat = read_similarity_mat(similarities,use_threads=True) - - if alt_similarities is not None: - alt_mat = read_similarity_mat(alt_similarities,use_threads=True) - else: - alt_mat = None - - if J is None: - J = cpu_count() - pool = Pool(J) - - # get list of tuples: the combinations of hyperparameters - hyper_grid = product(damping, convergence_iter, preference_quantile) - hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid)) - - _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat) + + +def test_select_affinity_clustering(): + # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI", + # "test_hdbscan_author30k", + # min_cluster_sizes=[2], + # min_samples=[1,2], + # cluster_selection_epsilons=[0,0.05,0.1,0.15], + # cluster_selection_methods=['eom','leaf'], + # lsi_dimensions='all') + inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/" + outpath = "test_affinity"; + dampings=[0.8,0.9] + max_iters=[100000] + convergence_iters=[15] + preference_quantiles=[0.5,0.7] + + gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles) + gs.run(20) + gs.save("test_affinity/lsi_sweep.csv") - # similarities = Array('d', mat) - # call pool.starmap - print("running clustering selection") - clustering_data = pool.starmap(_do_clustering, hyper_grid) - clustering_data = pd.DataFrame(list(clustering_data)) - clustering_data.to_csv(outinfo) - return clustering_data if __name__ == "__main__": - x = fire.Fire(select_affinity_clustering) + fire.Fire{'grid_sweep':affinity_grid_sweep, + 'grid_sweep_lsi':affinity_lsi_grid_sweep + 'cluster':affinity_job, + 'cluster_lsi':affinity_lsi_job} diff --git a/clustering/clustering_base.py b/clustering/clustering_base.py index 1d86438..5492415 100644 --- a/clustering/clustering_base.py +++ b/clustering/clustering_base.py @@ -2,6 +2,9 @@ from pathlib import Path import numpy as np import pandas as pd from dataclasses import dataclass +from sklearn.metrics import silhouette_score, silhouette_samples +from itertools import product, chain +from multiprocessing import Pool, cpu_count def sim_to_dist(mat): dist = 1-mat @@ -9,41 +12,147 @@ def sim_to_dist(mat): np.fill_diagonal(dist,0) return dist -def process_clustering_result(clustering, subreddits): +class grid_sweep: + def __init__(self, jobtype, inpath, outpath, namer, *args): + self.jobtype = jobtype + self.namer = namer + grid = list(product(*args)) + inpath = Path(inpath) + outpath = Path(outpath) + self.hasrun = False + self.grid = [(inpath,outpath,namer(*g)) + g for g in grid] + self.jobs = [jobtype(*g) for g in self.grid] - if hasattr(clustering,'n_iter_'): - print(f"clustering took {clustering.n_iter_} iterations") + def run(self, cores=20): + if cores is not None and cores > 1: + with Pool(cores) as pool: + infos = pool.map(self.jobtype.get_info, self.jobs) + else: + infos = map(self.jobtype.get_info, self.jobs) - clusters = clustering.labels_ + self.infos = pd.DataFrame(infos) + self.hasrun = True - print(f"found {len(set(clusters))} clusters") + def save(self, outcsv): + if not self.hasrun: + self.run() + outcsv = Path(outcsv) + outcsv.parent.mkdir(parents=True, exist_ok=True) + self.infos.to_csv(outcsv) - cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_}) - cluster_sizes = cluster_data.groupby("cluster").count().reset_index() - print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members") +class lsi_grid_sweep(grid_sweep): + def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs): + self.jobtype = jobtype + self.subsweep = subsweep + inpath = Path(inpath) + if lsi_dimensions == 'all': + lsi_paths = list(inpath.glob("*")) + else: + lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions] - print(f"the median cluster has {cluster_sizes.subreddit.median()} members") + lsi_nums = [p.stem for p in lsi_paths] + self.hasrun = False + self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)] + self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids))) - print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member") - print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True) +# this is meant to be an interface, not created directly +class clustering_job: + def __init__(self, infile, outpath, name, call, *args, **kwargs): + self.outpath = Path(outpath) + self.call = call + self.args = args + self.kwargs = kwargs + self.infile = Path(infile) + self.name = name + self.hasrun = False - return cluster_data + def run(self): + self.subreddits, self.mat = self.read_distance_mat(self.infile) + self.clustering = self.call(self.mat, *self.args, **self.kwargs) + self.cluster_data = self.process_clustering(self.clustering, self.subreddits) + self.score = self.silhouette() + self.outpath.mkdir(parents=True, exist_ok=True) + self.cluster_data.to_feather(self.outpath/(self.name + ".feather")) + self.hasrun = True + + def get_info(self): + if not self.hasrun: + self.run() + self.result = clustering_result(outpath=str(self.outpath.resolve()), + silhouette_score=self.score, + name=self.name, + n_clusters=self.n_clusters, + n_isolates=self.n_isolates, + silhouette_samples = str(self.silsampout.resolve()) + ) + return self.result + + def silhouette(self): + isolates = self.clustering.labels_ == -1 + scoremat = self.mat[~isolates][:,~isolates] + score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed') + silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed') + silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp}) + self.outpath.mkdir(parents=True, exist_ok=True) + self.silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather") + silhouette_samp.to_feather(self.silsampout) + return score + + def read_distance_mat(self, similarities, use_threads=True): + df = pd.read_feather(similarities, use_threads=use_threads) + mat = np.array(df.drop('_subreddit',1)) + n = mat.shape[0] + mat[range(n),range(n)] = 1 + return (df._subreddit,1-mat) + + def process_clustering(self, clustering, subreddits): + + if hasattr(clustering,'n_iter_'): + print(f"clustering took {clustering.n_iter_} iterations") + + clusters = clustering.labels_ + self.n_clusters = len(set(clusters)) + + print(f"found {self.n_clusters} clusters") + + cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_}) + + cluster_sizes = cluster_data.groupby("cluster").count().reset_index() + print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members") + + print(f"the median cluster has {cluster_sizes.subreddit.median()} members") + n_isolates1 = (cluster_sizes.subreddit==1).sum() + + print(f"{n_isolates1} clusters have 1 member") + + n_isolates2 = (cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']]) + + print(f"{n_isolates2} subreddits are in cluster -1",flush=True) + + if n_isolates1 == 0: + self.n_isolates = n_isolates2 + else: + self.n_isolates = n_isolates1 + + return cluster_data + + +class lsi_mixin(): + def set_lsi_dims(self, lsi_dims): + self.lsi_dims = lsi_dims @dataclass class clustering_result: outpath:Path - max_iter:int silhouette_score:float - alt_silhouette_score:float name:str n_clusters:int + n_isolates:int + silhouette_samples:str -def read_similarity_mat(similarities, use_threads=True): - df = pd.read_feather(similarities, use_threads=use_threads) - mat = np.array(df.drop('_subreddit',1)) - n = mat.shape[0] - mat[range(n),range(n)] = 1 - return (df._subreddit,mat) +@dataclass +class lsi_result_mixin: + lsi_dimensions:int diff --git a/clustering/hdbscan_clustering.py b/clustering/hdbscan_clustering.py index 4f4e0d6..f0ee703 100644 --- a/clustering/hdbscan_clustering.py +++ b/clustering/hdbscan_clustering.py @@ -1,10 +1,11 @@ from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat +from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep from dataclasses import dataclass import hdbscan from sklearn.neighbors import NearestNeighbors import plotnine as pn import numpy as np -from itertools import product, starmap +from itertools import product, starmap, chain import pandas as pd from sklearn.metrics import silhouette_score, silhouette_samples from pathlib import Path @@ -13,27 +14,88 @@ import fire from pyarrow.feather import write_feather def test_select_hdbscan_clustering(): - select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI", - "test_hdbscan_author30k", - min_cluster_sizes=[2], - min_samples=[1,2], - cluster_selection_epsilons=[0,0.05,0.1,0.15], - cluster_selection_methods=['eom','leaf'], - lsi_dimensions='all') - inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI" + # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI", + # "test_hdbscan_author30k", + # min_cluster_sizes=[2], + # min_samples=[1,2], + # cluster_selection_epsilons=[0,0.05,0.1,0.15], + # cluster_selection_methods=['eom','leaf'], + # lsi_dimensions='all') + inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/" outpath = "test_hdbscan"; min_cluster_sizes=[2,3,4]; min_samples=[1,2,3]; cluster_selection_epsilons=[0,0.1,0.3,0.5]; cluster_selection_methods=['eom']; lsi_dimensions='all' + gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods) + gs.run(20) + gs.save("test_hdbscan/lsi_sweep.csv") + # job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom') + # job1.run() + # print(job1.get_info()) - df = pd.read_csv("test_hdbscan/selection_data.csv") - test_select_hdbscan_clustering() - check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather") - silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather") - c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering) + # df = pd.read_csv("test_hdbscan/selection_data.csv") + # test_select_hdbscan_clustering() + # check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather") + # silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather") + # c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering) +class hdbscan_lsi_grid_sweep(lsi_grid_sweep): + def __init__(self, + inpath, + lsi_dims, + outpath, + min_cluster_sizes, + min_samples, + cluster_selection_epsilons, + cluster_selection_methods + ): + + super().__init__(hdbscan_lsi_job, + _hdbscan_lsi_grid_sweep, + inpath, + lsi_dims, + outpath, + min_cluster_sizes, + min_samples, + cluster_selection_epsilons, + cluster_selection_methods) + +class hdbscan_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + *args, + **kwargs): + + super().__init__(hdbscan_job, inpath, outpath, self.namer, *args, **kwargs) + + def namer(self, + min_cluster_size, + min_samples, + cluster_selection_epsilon, + cluster_selection_method): + return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}" + + +class _hdbscan_lsi_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + lsi_dim, + *args, + **kwargs): + + self.lsi_dim = lsi_dim + self.jobtype = hdbscan_lsi_job + super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs) + + + def namer(self, *args, **kwargs): + s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs) + s += f"_lsi-{self.lsi_dim}" + return s @dataclass class hdbscan_clustering_result(clustering_result): @@ -41,107 +103,166 @@ class hdbscan_clustering_result(clustering_result): min_samples:int cluster_selection_epsilon:float cluster_selection_method:str - lsi_dimensions:int - n_isolates:int - silhouette_samples:str - -def select_hdbscan_clustering(inpath, - outpath, - outfile=None, - min_cluster_sizes=[2], - min_samples=[1], - cluster_selection_epsilons=[0], - cluster_selection_methods=['eom'], - lsi_dimensions='all' - ): - - inpath = Path(inpath) - outpath = Path(outpath) - outpath.mkdir(exist_ok=True, parents=True) + +@dataclass +class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin): + pass + +class hdbscan_job(clustering_job): + def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'): + super().__init__(infile, + outpath, + name, + call=hdbscan_job._hdbscan_clustering, + min_cluster_size=min_cluster_size, + min_samples=min_samples, + cluster_selection_epsilon=cluster_selection_epsilon, + cluster_selection_method=cluster_selection_method + ) + + self.min_cluster_size = min_cluster_size + self.min_samples = min_samples + self.cluster_selection_epsilon = cluster_selection_epsilon + self.cluster_selection_method = cluster_selection_method +# self.mat = 1 - self.mat + + def _hdbscan_clustering(mat, *args, **kwargs): + print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}") + print(mat) + clusterer = hdbscan.HDBSCAN(metric='precomputed', + core_dist_n_jobs=cpu_count(), + *args, + **kwargs, + ) - if lsi_dimensions == 'all': - lsi_paths = list(inpath.glob("*")) + clustering = clusterer.fit(mat.astype('double')) + + return(clustering) + + def get_info(self): + result = super().get_info() + self.result = hdbscan_clustering_result(**result.__dict__, + min_cluster_size=self.min_cluster_size, + min_samples=self.min_samples, + cluster_selection_epsilon=self.cluster_selection_epsilon, + cluster_selection_method=self.cluster_selection_method) + return self.result + +class hdbscan_lsi_job(hdbscan_job, lsi_mixin): + def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs): + super().__init__( + infile, + outpath, + name, + *args, + **kwargs) + super().set_lsi_dims(lsi_dims) + + def get_info(self): + partial_result = super().get_info() + self.result = hdbscan_clustering_result_lsi(**partial_result.__dict__, + lsi_dimensions=self.lsi_dims) + return self.result + +# def select_hdbscan_clustering(inpath, +# outpath, +# outfile=None, +# min_cluster_sizes=[2], +# min_samples=[1], +# cluster_selection_epsilons=[0], +# cluster_selection_methods=['eom'], +# lsi_dimensions='all' +# ): + +# inpath = Path(inpath) +# outpath = Path(outpath) +# outpath.mkdir(exist_ok=True, parents=True) + +# if lsi_dimensions is None: +# lsi_paths = [inpath] +# elif lsi_dimensions == 'all': +# lsi_paths = list(inpath.glob("*")) - else: - lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions] +# else: +# lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions] - lsi_nums = [p.stem for p in lsi_paths] - grid = list(product(lsi_nums, - min_cluster_sizes, - min_samples, - cluster_selection_epsilons, - cluster_selection_methods)) +# if lsi_dimensions is not None: +# lsi_nums = [p.stem for p in lsi_paths] +# else: +# lsi_nums = [None] +# grid = list(product(lsi_nums, +# min_cluster_sizes, +# min_samples, +# cluster_selection_epsilons, +# cluster_selection_methods)) - # fix the output file names - names = list(map(lambda t:'_'.join(map(str,t)),grid)) +# # fix the output file names +# names = list(map(lambda t:'_'.join(map(str,t)),grid)) - grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)] +# grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)] - with Pool(int(cpu_count()/4)) as pool: - mods = starmap(hdbscan_clustering, grid) +# with Pool(int(cpu_count()/4)) as pool: +# mods = starmap(hdbscan_clustering, grid) - res = pd.DataFrame(mods) - if outfile is None: - outfile = outpath / "selection_data.csv" +# res = pd.DataFrame(mods) +# if outfile is None: +# outfile = outpath / "selection_data.csv" - res.to_csv(outfile) +# res.to_csv(outfile) -def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'): - subreddits, mat = read_similarity_mat(similarities) - mat = sim_to_dist(mat) - clustering = _hdbscan_clustering(mat, - min_cluster_size=min_cluster_size, - min_samples=min_samples, - cluster_selection_epsilon=cluster_selection_epsilon, - cluster_selection_method=cluster_selection_method, - metric='precomputed', - core_dist_n_jobs=cpu_count() - ) - - cluster_data = process_clustering_result(clustering, subreddits) - isolates = clustering.labels_ == -1 - scoremat = mat[~isolates][:,~isolates] - score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed') - cluster_data.to_feather(output) - - silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed') - silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp}) - silsampout = output.parent / ("silhouette_samples" + output.name) - silhouette_samp.to_feather(silsampout) - - result = hdbscan_clustering_result(outpath=output, - max_iter=None, - silhouette_samples=silsampout, - silhouette_score=score, - alt_silhouette_score=score, - name=name, - min_cluster_size=min_cluster_size, - min_samples=min_samples, - cluster_selection_epsilon=cluster_selection_epsilon, - cluster_selection_method=cluster_selection_method, - lsi_dimensions=lsi_dim, - n_isolates=isolates.sum(), - n_clusters=len(set(clustering.labels_)) - ) +# def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'): +# subreddits, mat = read_similarity_mat(similarities) +# mat = sim_to_dist(mat) +# clustering = _hdbscan_clustering(mat, +# min_cluster_size=min_cluster_size, +# min_samples=min_samples, +# cluster_selection_epsilon=cluster_selection_epsilon, +# cluster_selection_method=cluster_selection_method, +# metric='precomputed', +# core_dist_n_jobs=cpu_count() +# ) + +# cluster_data = process_clustering_result(clustering, subreddits) +# isolates = clustering.labels_ == -1 +# scoremat = mat[~isolates][:,~isolates] +# score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed') +# cluster_data.to_feather(output) +# silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed') +# silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp}) +# silsampout = output.parent / ("silhouette_samples" + output.name) +# silhouette_samp.to_feather(silsampout) + +# result = hdbscan_clustering_result(outpath=output, +# silhouette_samples=silsampout, +# silhouette_score=score, +# name=name, +# min_cluster_size=min_cluster_size, +# min_samples=min_samples, +# cluster_selection_epsilon=cluster_selection_epsilon, +# cluster_selection_method=cluster_selection_method, +# lsi_dimensions=lsi_dim, +# n_isolates=isolates.sum(), +# n_clusters=len(set(clustering.labels_)) +# ) - return(result) - -# for all runs we should try cluster_selection_epsilon = None -# for terms we should try cluster_selection_epsilon around 0.56-0.66 -# for authors we should try cluster_selection_epsilon around 0.98-0.99 -def _hdbscan_clustering(mat, *args, **kwargs): - print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}") - - print(mat) - clusterer = hdbscan.HDBSCAN(*args, - **kwargs, - ) +# return(result) + +# # for all runs we should try cluster_selection_epsilon = None +# # for terms we should try cluster_selection_epsilon around 0.56-0.66 +# # for authors we should try cluster_selection_epsilon around 0.98-0.99 +# def _hdbscan_clustering(mat, *args, **kwargs): +# print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}") + +# print(mat) +# clusterer = hdbscan.HDBSCAN(*args, +# **kwargs, +# ) - clustering = clusterer.fit(mat.astype('double')) +# clustering = clusterer.fit(mat.astype('double')) - return(clustering) +# return(clustering) def KNN_distances_plot(mat,outname,k=2): nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat) @@ -172,4 +293,10 @@ def make_KNN_plots(): KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png') if __name__ == "__main__": - fire.Fire(select_hdbscan_clustering) + fire.Fire{'grid_sweep':hdbscan_grid_sweep, + 'grid_sweep_lsi':hdbscan_lsi_grid_sweep + 'cluster':hdbscan_job, + 'cluster_lsi':hdbscan_lsi_job} + +# test_select_hdbscan_clustering() + #fire.Fire(select_hdbscan_clustering) diff --git a/clustering/kmeans_clustering.py b/clustering/kmeans_clustering.py index 8822e9f..e41b88b 100644 --- a/clustering/kmeans_clustering.py +++ b/clustering/kmeans_clustering.py @@ -4,98 +4,145 @@ from pathlib import Path from multiprocessing import cpu_count from dataclasses import dataclass from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat +from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep + @dataclass class kmeans_clustering_result(clustering_result): n_clusters:int n_init:int + max_iter:int -def kmeans_clustering(similarities, *args, **kwargs): - subreddits, mat = read_similarity_mat(similarities) - mat = sim_to_dist(mat) - clustering = _kmeans_clustering(mat, *args, **kwargs) - cluster_data = process_clustering_result(clustering, subreddits) - return(cluster_data) - -def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True): - - clustering = KMeans(n_clusters=n_clusters, - n_init=n_init, - max_iter=max_iter, - random_state=random_state, - verbose=verbose - ).fit(mat) - - return clustering - -def do_clustering(n_clusters, n_init, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False): - if name is None: - name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}" - print(name) - sys.stdout.flush() - outpath = outdir / (str(name) + ".feather") - print(outpath) - mat = sim_to_dist(mat) - clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose) - - outpath.parent.mkdir(parents=True,exist_ok=True) - cluster_data.to_feather(outpath) - cluster_data = process_clustering_result(clustering, subreddits) - - try: - score = silhouette_score(mat, clustering.labels_, metric='precomputed') - except ValueError: - score = None - - if alt_mat is not None: - alt_distances = sim_to_dist(alt_mat) - try: - alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed') - except ValueError: - alt_score = None +@dataclass +class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin): + pass + +class kmeans_job(clustering_job): + def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True): + super().__init__(infile, + outpath, + name, + call=kmeans_job._kmeans_clustering, + n_clusters=n_clusters, + n_init=n_init, + max_iter=max_iter, + random_state=random_state, + verbose=verbose) + + self.n_clusters=n_clusters + self.n_init=n_init + self.max_iter=max_iter + + def _kmeans_clustering(mat, *args, **kwargs): + + clustering = KMeans(*args, + **kwargs, + ).fit(mat) + + return clustering + + + def get_info(self): + result = super().get_info() + self.result = kmeans_clustering_result(**result.__dict__, + n_init=n_init, + max_iter=max_iter) + return self.result + + +class kmeans_lsi_job(kmeans_job, lsi_mixin): + def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs): + super().__init__(infile, + outpath, + name, + *args, + **kwargs) + super().set_lsi_dims(lsi_dims) + + def get_info(self): + result = super().get_info() + self.result = kmeans_clustering_result_lsi(**result.__dict__, + lsi_dimensions=self.lsi_dims) + return self.result - res = kmeans_clustering_result(outpath=outpath, - max_iter=max_iter, - n_clusters=n_clusters, - n_init = n_init, - silhouette_score=score, - alt_silhouette_score=score, - name=str(name)) - - return res - - -# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering). -def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None): - - n_clusters = list(map(int,n_clusters)) - n_init = list(map(int,n_init)) - - if type(outdir) is str: - outdir = Path(outdir) - outdir.mkdir(parents=True,exist_ok=True) +class kmeans_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + *args, + **kwargs): + super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs) + + def namer(self, + n_clusters, + n_init, + max_iter): + return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}" + +class _kmeans_lsi_grid_sweep(grid_sweep): + def __init__(self, + inpath, + outpath, + lsi_dim, + *args, + **kwargs): + self.lsi_dim = lsi_dim + self.jobtype = kmeans_lsi_job + super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs) + + def namer(self, *args, **kwargs): + s = kmeans_grid_sweep.namer(self, *args[1:], **kwargs) + s += f"_lsi-{self.lsi_dim}" + return s + +class kmeans_lsi_grid_sweep(lsi_grid_sweep): + def __init__(self, + inpath, + lsi_dims, + outpath, + n_clusters, + n_inits, + max_iters + ): + + super().__init__(kmeans_lsi_job, + _kmeans_lsi_grid_sweep, + inpath, + lsi_dims, + outpath, + n_clusters, + n_inits, + max_iters) + +def test_select_kmeans_clustering(): + # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI", + # "test_hdbscan_author30k", + # min_cluster_sizes=[2], + # min_samples=[1,2], + # cluster_selection_epsilons=[0,0.05,0.1,0.15], + # cluster_selection_methods=['eom','leaf'], + # lsi_dimensions='all') + inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/" + outpath = "test_kmeans"; + n_clusters=[200,300,400]; + n_init=[1,2,3]; + max_iter=[100000] + + gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter) + gs.run(1) + + cluster_selection_epsilons=[0,0.1,0.3,0.5]; + cluster_selection_methods=['eom']; + lsi_dimensions='all' + gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods) + gs.run(20) + gs.save("test_hdbscan/lsi_sweep.csv") - subreddits, mat = read_similarity_mat(similarities,use_threads=True) - - if alt_similarities is not None: - alt_mat = read_similarity_mat(alt_similarities,use_threads=True) - else: - alt_mat = None - - # get list of tuples: the combinations of hyperparameters - hyper_grid = product(n_clusters, n_init) - hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid)) - - _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat) - - # call starmap - print("running clustering selection") - clustering_data = starmap(_do_clustering, hyper_grid) - clustering_data = pd.DataFrame(list(clustering_data)) - clustering_data.to_csv(outinfo) - - return clustering_data if __name__ == "__main__": - x = fire.Fire(select_kmeans_clustering) + + fire.Fire{'grid_sweep':kmeans_grid_sweep, + 'grid_sweep_lsi':kmeans_lsi_grid_sweep + 'cluster':kmeans_job, + 'cluster_lsi':kmeans_lsi_job}