from sklearn.cluster import AffinityPropagation
from functools import partial
from dataclasses import dataclass
-from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
+from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
damping:float
convergence_iter:int
preference_quantile:float
+ preference:float
+ max_iter:int
-def affinity_clustering(similarities, output, *args, **kwargs):
- subreddits, mat = read_similarity_mat(similarities)
- clustering = _affinity_clustering(mat, *args, **kwargs)
- cluster_data = process_clustering_result(clustering, subreddits)
- cluster_data['algorithm'] = 'affinity'
- return(cluster_data)
-
-def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
- '''
- similarities: matrix of similarity scores
- preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
- damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
- '''
- print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
-
- preference = np.quantile(mat,preference_quantile)
-
- print(f"preference is {preference}")
- print("data loaded")
- sys.stdout.flush()
- clustering = AffinityPropagation(damping=damping,
- max_iter=max_iter,
- convergence_iter=convergence_iter,
- copy=False,
- preference=preference,
- affinity='precomputed',
- verbose=verbose,
- random_state=random_state).fit(mat)
-
- cluster_data = process_clustering_result(clustering, subreddits)
- output = Path(output)
- output.parent.mkdir(parents=True,exist_ok=True)
- cluster_data.to_feather(output)
- print(f"saved {output}")
- return clustering
-
-
-def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
- if name is None:
- name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
- print(name)
- sys.stdout.flush()
- outpath = outdir / (str(name) + ".feather")
- outpath.parent.mkdir(parents=True,exist_ok=True)
- print(outpath)
- clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
- cluster_data = process_clustering_result(clustering, subreddits)
- mat = sim_to_dist(clustering.affinity_matrix_)
-
- try:
- score = silhouette_score(mat, clustering.labels_, metric='precomputed')
- except ValueError:
- score = None
-
- if alt_mat is not None:
- alt_distances = sim_to_dist(alt_mat)
- try:
- alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
- except ValueError:
- alt_score = None
+@dataclass
+class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin):
+ pass
+
+class affinity_job(clustering_job):
+ def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
+ super().__init__(infile,
+ outpath,
+ name,
+ call=self._affinity_clustering,
+ preference_quantile=preference_quantile,
+ damping=damping,
+ max_iter=max_iter,
+ convergence_iter=convergence_iter,
+                         random_state=random_state,
+ verbose=verbose)
+ self.damping=damping
+ self.max_iter=max_iter
+ self.convergence_iter=convergence_iter
+ self.preference_quantile=preference_quantile
+
+    def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
+        mat = 1-mat # clustering_job hands us distances; affinity propagation expects similarities
+        preference = np.quantile(mat, preference_quantile)
+ self.preference = preference
+ print(f"preference is {preference}")
+ print("data loaded")
+ sys.stdout.flush()
+ clustering = AffinityPropagation(*args,
+ preference=preference,
+ affinity='precomputed',
+ copy=False,
+ **kwargs).fit(mat)
+ return clustering
+
+ def get_info(self):
+ result = super().get_info()
+ self.result=affinity_clustering_result(**result.__dict__,
+ damping=self.damping,
+ max_iter=self.max_iter,
+ convergence_iter=self.convergence_iter,
+ preference_quantile=self.preference_quantile,
+ preference=self.preference)
+
+ return self.result
+
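+# A minimal usage sketch (the input path is hypothetical): run a single affinity job
+# directly instead of through a grid sweep.
+#   job = affinity_job("similarities/comment_authors.feather", "output/affinity",
+#                      name="damp-0.9_prefq-0.5", damping=0.9, preference_quantile=0.5)
+#   info = job.get_info()  # fits the clustering, writes the .feather outputs, returns a result dataclass
+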
+class affinity_lsi_job(affinity_job, lsi_mixin):
+ def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+ super().__init__(infile,
+ outpath,
+ name,
+ *args,
+ **kwargs)
+ super().set_lsi_dims(lsi_dims)
+
+ def get_info(self):
+ result = super().get_info()
+ self.result = affinity_clustering_result_lsi(**result.__dict__,
+ lsi_dimensions=self.lsi_dims)
+ return self.result
+
+class affinity_grid_sweep(grid_sweep):
+ def __init__(self,
+ inpath,
+ outpath,
+ *args,
+ **kwargs):
+
+ super().__init__(affinity_job,
+ inpath,
+ outpath,
+ self.namer,
+ *args,
+ **kwargs)
+ def namer(self,
+ damping,
+ max_iter,
+ convergence_iter,
+ preference_quantile):
+
+ return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
+
+class _affinity_lsi_grid_sweep(grid_sweep):
+ def __init__(self,
+ inpath,
+ outpath,
+ lsi_dim,
+ *args,
+ **kwargs):
+ self.lsi_dim = lsi_dim
+ self.jobtype = affinity_lsi_job
+ super().__init__(self.jobtype,
+ inpath,
+ outpath,
+ self.namer,
+                         [self.lsi_dim],
+ *args,
+ **kwargs)
+
+ def namer(self, *args, **kwargs):
+ s = affinity_grid_sweep.namer(self, *args[1:], **kwargs)
+ s += f"_lsi-{self.lsi_dim}"
+ return s
+
+class affinity_lsi_grid_sweep(lsi_grid_sweep):
+ def __init__(self,
+ inpath,
+ lsi_dims,
+ outpath,
+ dampings=[0.9],
+ max_iters=[10000],
+ convergence_iters=[30],
+ preference_quantiles=[0.5]):
+
+ super().__init__(affinity_lsi_job,
+ _affinity_lsi_grid_sweep,
+ inpath,
+ lsi_dims,
+ outpath,
+ dampings,
+ max_iters,
+ convergence_iters,
+ preference_quantiles)
- res = affinity_clustering_result(outpath=outpath,
- damping=damping,
- max_iter=max_iter,
- convergence_iter=convergence_iter,
- preference_quantile=preference_quantile,
- silhouette_score=score,
- alt_silhouette_score=score,
- name=str(name))
-
- return res
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-
-def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
-
- damping = list(map(float,damping))
- convergence_iter = convergence_iter = list(map(int,convergence_iter))
- preference_quantile = list(map(float,preference_quantile))
-
- if type(outdir) is str:
- outdir = Path(outdir)
-
- outdir.mkdir(parents=True,exist_ok=True)
-
- subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
- if alt_similarities is not None:
- alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
- else:
- alt_mat = None
-
- if J is None:
- J = cpu_count()
- pool = Pool(J)
-
- # get list of tuples: the combinations of hyperparameters
- hyper_grid = product(damping, convergence_iter, preference_quantile)
- hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
- _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
+
+
+def test_select_affinity_clustering():
+ inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+ outpath = "test_affinity";
+ dampings=[0.8,0.9]
+ max_iters=[100000]
+ convergence_iters=[15]
+ preference_quantiles=[0.5,0.7]
+
+ gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles)
+ gs.run(20)
+ gs.save("test_affinity/lsi_sweep.csv")
- # similarities = Array('d', mat)
- # call pool.starmap
- print("running clustering selection")
- clustering_data = pool.starmap(_do_clustering, hyper_grid)
- clustering_data = pd.DataFrame(list(clustering_data))
- clustering_data.to_csv(outinfo)
- return clustering_data
if __name__ == "__main__":
- x = fire.Fire(select_affinity_clustering)
+    fire.Fire({'grid_sweep':affinity_grid_sweep,
+               'grid_sweep_lsi':affinity_lsi_grid_sweep,
+               'cluster':affinity_job,
+               'cluster_lsi':affinity_lsi_job})
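+
+# Hypothetical command-line sketch (script name and data paths are placeholders; python-fire
+# chains the constructor call with run via its '-' separator):
+#   python3 affinity_clustering.py grid_sweep_lsi --inpath=<lsi_sims_dir> --lsi_dims=all \
+#       --outpath=<outdir> --dampings='[0.5,0.9]' - run --cores=20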
import numpy as np
import pandas as pd
from dataclasses import dataclass
+from sklearn.metrics import silhouette_score, silhouette_samples
+from itertools import product, chain
+from multiprocessing import Pool, cpu_count
def sim_to_dist(mat):
dist = 1-mat
np.fill_diagonal(dist,0)
return dist
-def process_clustering_result(clustering, subreddits):
+class grid_sweep:
+ def __init__(self, jobtype, inpath, outpath, namer, *args):
+ self.jobtype = jobtype
+ self.namer = namer
+ grid = list(product(*args))
+ inpath = Path(inpath)
+ outpath = Path(outpath)
+ self.hasrun = False
+ self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
+ self.jobs = [jobtype(*g) for g in self.grid]
- if hasattr(clustering,'n_iter_'):
- print(f"clustering took {clustering.n_iter_} iterations")
+ def run(self, cores=20):
+ if cores is not None and cores > 1:
+ with Pool(cores) as pool:
+ infos = pool.map(self.jobtype.get_info, self.jobs)
+ else:
+ infos = map(self.jobtype.get_info, self.jobs)
- clusters = clustering.labels_
+ self.infos = pd.DataFrame(infos)
+ self.hasrun = True
- print(f"found {len(set(clusters))} clusters")
+ def save(self, outcsv):
+ if not self.hasrun:
+ self.run()
+ outcsv = Path(outcsv)
+ outcsv.parent.mkdir(parents=True, exist_ok=True)
+ self.infos.to_csv(outcsv)
- cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
- cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
- print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
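+
+# grid_sweep builds one job per point in the Cartesian product of the hyperparameter lists
+# passed through *args. Illustrative sketch (the subclass and paths are examples only):
+#   sweep = affinity_grid_sweep("similarities.feather", "output/affinity",
+#                               [0.8, 0.9],   # dampings
+#                               [100000],     # max_iters
+#                               [15, 30],     # convergence_iters
+#                               [0.5])        # preference_quantiles
+#   sweep.run(cores=4)
+#   sweep.save("output/affinity/sweep.csv")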
+class lsi_grid_sweep(grid_sweep):
+ def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
+ self.jobtype = jobtype
+ self.subsweep = subsweep
+ inpath = Path(inpath)
+ if lsi_dimensions == 'all':
+ lsi_paths = list(inpath.glob("*"))
+ else:
+            lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
- print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+ lsi_nums = [p.stem for p in lsi_paths]
+ self.hasrun = False
+ self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
+ self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
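+
+# lsi_grid_sweep builds one sub-sweep per LSI-dimension file under inpath and pools their
+# jobs, so a single run()/save() covers every LSI dimensionality.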
- print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
- print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
+# this is meant to be an interface, not created directly
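+# Subclasses supply `call`, a callable taking the distance matrix and returning a fitted
+# estimator with a labels_ attribute; run() reads the matrix, fits, scores, and writes the
+# cluster assignments, and get_info() summarizes the run as a clustering_result.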
+class clustering_job:
+ def __init__(self, infile, outpath, name, call, *args, **kwargs):
+ self.outpath = Path(outpath)
+ self.call = call
+ self.args = args
+ self.kwargs = kwargs
+ self.infile = Path(infile)
+ self.name = name
+ self.hasrun = False
- return cluster_data
+ def run(self):
+ self.subreddits, self.mat = self.read_distance_mat(self.infile)
+ self.clustering = self.call(self.mat, *self.args, **self.kwargs)
+ self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
+ self.score = self.silhouette()
+ self.outpath.mkdir(parents=True, exist_ok=True)
+ self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+ self.hasrun = True
+
+ def get_info(self):
+ if not self.hasrun:
+ self.run()
+ self.result = clustering_result(outpath=str(self.outpath.resolve()),
+ silhouette_score=self.score,
+ name=self.name,
+ n_clusters=self.n_clusters,
+ n_isolates=self.n_isolates,
+ silhouette_samples = str(self.silsampout.resolve())
+ )
+ return self.result
+
+    def silhouette(self):
+        isolates = self.clustering.labels_ == -1
+        scoremat = self.mat[~isolates][:,~isolates]
+        # silhouette_score raises ValueError if fewer than two clusters remain after dropping isolates
+        try:
+            score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
+        except ValueError:
+            score = None
+        silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
+        silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        self.silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
+        silhouette_samp.to_feather(self.silsampout)
+        return score
+
+    def read_distance_mat(self, similarities, use_threads=True):
+        # the feather file holds a similarity matrix with a '_subreddit' name column;
+        # set the diagonal to 1 and return the names along with distances (1 - similarity)
+        df = pd.read_feather(similarities, use_threads=use_threads)
+        mat = np.array(df.drop('_subreddit', axis=1))
+        n = mat.shape[0]
+        mat[range(n),range(n)] = 1
+        return (df._subreddit, 1-mat)
+
+ def process_clustering(self, clustering, subreddits):
+
+ if hasattr(clustering,'n_iter_'):
+ print(f"clustering took {clustering.n_iter_} iterations")
+
+ clusters = clustering.labels_
+ self.n_clusters = len(set(clusters))
+
+ print(f"found {self.n_clusters} clusters")
+
+ cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
+
+ cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
+ print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+
+ print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+ n_isolates1 = (cluster_sizes.subreddit==1).sum()
+
+ print(f"{n_isolates1} clusters have 1 member")
+
+        # count the subreddits assigned to the noise cluster (-1), if any
+        n_isolates2 = cluster_sizes.loc[cluster_sizes.cluster==-1,'subreddit'].sum()
+
+        print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
+
+ if n_isolates1 == 0:
+ self.n_isolates = n_isolates2
+ else:
+ self.n_isolates = n_isolates1
+
+ return cluster_data
+
+
+class lsi_mixin():
+ def set_lsi_dims(self, lsi_dims):
+ self.lsi_dims = lsi_dims
@dataclass
class clustering_result:
outpath:Path
- max_iter:int
silhouette_score:float
- alt_silhouette_score:float
name:str
n_clusters:int
+ n_isolates:int
+ silhouette_samples:str
-def read_similarity_mat(similarities, use_threads=True):
- df = pd.read_feather(similarities, use_threads=use_threads)
- mat = np.array(df.drop('_subreddit',1))
- n = mat.shape[0]
- mat[range(n),range(n)] = 1
- return (df._subreddit,mat)
+@dataclass
+class lsi_result_mixin:
+ lsi_dimensions:int
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
-from itertools import product, starmap
+from itertools import product, starmap, chain
import pandas as pd
from sklearn.metrics import silhouette_score, silhouette_samples
from pathlib import Path
from pyarrow.feather import write_feather
def test_select_hdbscan_clustering():
- select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
- "test_hdbscan_author30k",
- min_cluster_sizes=[2],
- min_samples=[1,2],
- cluster_selection_epsilons=[0,0.05,0.1,0.15],
- cluster_selection_methods=['eom','leaf'],
- lsi_dimensions='all')
- inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI"
+ # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
+ # "test_hdbscan_author30k",
+ # min_cluster_sizes=[2],
+ # min_samples=[1,2],
+ # cluster_selection_epsilons=[0,0.05,0.1,0.15],
+ # cluster_selection_methods=['eom','leaf'],
+ # lsi_dimensions='all')
+ inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_hdbscan";
min_cluster_sizes=[2,3,4];
min_samples=[1,2,3];
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
+ gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
+ gs.run(20)
+ gs.save("test_hdbscan/lsi_sweep.csv")
+ # job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
+ # job1.run()
+ # print(job1.get_info())
- df = pd.read_csv("test_hdbscan/selection_data.csv")
- test_select_hdbscan_clustering()
- check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
- silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
- c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
+ # df = pd.read_csv("test_hdbscan/selection_data.csv")
+ # test_select_hdbscan_clustering()
+ # check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
+ # silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
+ # c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
+class hdbscan_lsi_grid_sweep(lsi_grid_sweep):
+ def __init__(self,
+ inpath,
+ lsi_dims,
+ outpath,
+ min_cluster_sizes,
+ min_samples,
+ cluster_selection_epsilons,
+ cluster_selection_methods
+ ):
+
+ super().__init__(hdbscan_lsi_job,
+ _hdbscan_lsi_grid_sweep,
+ inpath,
+ lsi_dims,
+ outpath,
+ min_cluster_sizes,
+ min_samples,
+ cluster_selection_epsilons,
+ cluster_selection_methods)
+
+class hdbscan_grid_sweep(grid_sweep):
+ def __init__(self,
+ inpath,
+ outpath,
+ *args,
+ **kwargs):
+
+ super().__init__(hdbscan_job, inpath, outpath, self.namer, *args, **kwargs)
+
+ def namer(self,
+ min_cluster_size,
+ min_samples,
+ cluster_selection_epsilon,
+ cluster_selection_method):
+ return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}"
+
+
+class _hdbscan_lsi_grid_sweep(grid_sweep):
+ def __init__(self,
+ inpath,
+ outpath,
+ lsi_dim,
+ *args,
+ **kwargs):
+
+ self.lsi_dim = lsi_dim
+ self.jobtype = hdbscan_lsi_job
+        super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
+
+
+ def namer(self, *args, **kwargs):
+ s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs)
+ s += f"_lsi-{self.lsi_dim}"
+ return s
@dataclass
class hdbscan_clustering_result(clustering_result):
min_samples:int
cluster_selection_epsilon:float
cluster_selection_method:str
- lsi_dimensions:int
- n_isolates:int
- silhouette_samples:str
-
-def select_hdbscan_clustering(inpath,
- outpath,
- outfile=None,
- min_cluster_sizes=[2],
- min_samples=[1],
- cluster_selection_epsilons=[0],
- cluster_selection_methods=['eom'],
- lsi_dimensions='all'
- ):
-
- inpath = Path(inpath)
- outpath = Path(outpath)
- outpath.mkdir(exist_ok=True, parents=True)
+
+@dataclass
+class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin):
+ pass
+
+class hdbscan_job(clustering_job):
+ def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
+ super().__init__(infile,
+ outpath,
+ name,
+ call=hdbscan_job._hdbscan_clustering,
+ min_cluster_size=min_cluster_size,
+ min_samples=min_samples,
+ cluster_selection_epsilon=cluster_selection_epsilon,
+ cluster_selection_method=cluster_selection_method
+ )
+
+ self.min_cluster_size = min_cluster_size
+ self.min_samples = min_samples
+ self.cluster_selection_epsilon = cluster_selection_epsilon
+ self.cluster_selection_method = cluster_selection_method
+# self.mat = 1 - self.mat
+
+    @staticmethod
+    def _hdbscan_clustering(mat, *args, **kwargs):
+ print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
+ print(mat)
+ clusterer = hdbscan.HDBSCAN(metric='precomputed',
+ core_dist_n_jobs=cpu_count(),
+ *args,
+ **kwargs,
+ )
- if lsi_dimensions == 'all':
- lsi_paths = list(inpath.glob("*"))
+ clustering = clusterer.fit(mat.astype('double'))
+
+ return(clustering)
+
+ def get_info(self):
+ result = super().get_info()
+ self.result = hdbscan_clustering_result(**result.__dict__,
+ min_cluster_size=self.min_cluster_size,
+ min_samples=self.min_samples,
+ cluster_selection_epsilon=self.cluster_selection_epsilon,
+ cluster_selection_method=self.cluster_selection_method)
+ return self.result
+
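+# A usage sketch for a single hdbscan job (the input path is hypothetical):
+#   job = hdbscan_job("similarities/comment_authors.feather", "output/hdbscan",
+#                     name="mcs-2_ms-1", min_cluster_size=2, min_samples=1)
+#   print(job.get_info())
+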
+class hdbscan_lsi_job(hdbscan_job, lsi_mixin):
+ def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+ super().__init__(
+ infile,
+ outpath,
+ name,
+ *args,
+ **kwargs)
+ super().set_lsi_dims(lsi_dims)
+
+ def get_info(self):
+ partial_result = super().get_info()
+ self.result = hdbscan_clustering_result_lsi(**partial_result.__dict__,
+ lsi_dimensions=self.lsi_dims)
+ return self.result
+
+# def select_hdbscan_clustering(inpath,
+# outpath,
+# outfile=None,
+# min_cluster_sizes=[2],
+# min_samples=[1],
+# cluster_selection_epsilons=[0],
+# cluster_selection_methods=['eom'],
+# lsi_dimensions='all'
+# ):
+
+# inpath = Path(inpath)
+# outpath = Path(outpath)
+# outpath.mkdir(exist_ok=True, parents=True)
+
+# if lsi_dimensions is None:
+# lsi_paths = [inpath]
+# elif lsi_dimensions == 'all':
+# lsi_paths = list(inpath.glob("*"))
- else:
- lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
+# else:
+# lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
- lsi_nums = [p.stem for p in lsi_paths]
- grid = list(product(lsi_nums,
- min_cluster_sizes,
- min_samples,
- cluster_selection_epsilons,
- cluster_selection_methods))
+# if lsi_dimensions is not None:
+# lsi_nums = [p.stem for p in lsi_paths]
+# else:
+# lsi_nums = [None]
+# grid = list(product(lsi_nums,
+# min_cluster_sizes,
+# min_samples,
+# cluster_selection_epsilons,
+# cluster_selection_methods))
- # fix the output file names
- names = list(map(lambda t:'_'.join(map(str,t)),grid))
+# # fix the output file names
+# names = list(map(lambda t:'_'.join(map(str,t)),grid))
- grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
+# grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
- with Pool(int(cpu_count()/4)) as pool:
- mods = starmap(hdbscan_clustering, grid)
+# with Pool(int(cpu_count()/4)) as pool:
+# mods = starmap(hdbscan_clustering, grid)
- res = pd.DataFrame(mods)
- if outfile is None:
- outfile = outpath / "selection_data.csv"
+# res = pd.DataFrame(mods)
+# if outfile is None:
+# outfile = outpath / "selection_data.csv"
- res.to_csv(outfile)
+# res.to_csv(outfile)
-def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
- subreddits, mat = read_similarity_mat(similarities)
- mat = sim_to_dist(mat)
- clustering = _hdbscan_clustering(mat,
- min_cluster_size=min_cluster_size,
- min_samples=min_samples,
- cluster_selection_epsilon=cluster_selection_epsilon,
- cluster_selection_method=cluster_selection_method,
- metric='precomputed',
- core_dist_n_jobs=cpu_count()
- )
-
- cluster_data = process_clustering_result(clustering, subreddits)
- isolates = clustering.labels_ == -1
- scoremat = mat[~isolates][:,~isolates]
- score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
- cluster_data.to_feather(output)
-
- silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
- silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
- silsampout = output.parent / ("silhouette_samples" + output.name)
- silhouette_samp.to_feather(silsampout)
-
- result = hdbscan_clustering_result(outpath=output,
- max_iter=None,
- silhouette_samples=silsampout,
- silhouette_score=score,
- alt_silhouette_score=score,
- name=name,
- min_cluster_size=min_cluster_size,
- min_samples=min_samples,
- cluster_selection_epsilon=cluster_selection_epsilon,
- cluster_selection_method=cluster_selection_method,
- lsi_dimensions=lsi_dim,
- n_isolates=isolates.sum(),
- n_clusters=len(set(clustering.labels_))
- )
+# def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
+# subreddits, mat = read_similarity_mat(similarities)
+# mat = sim_to_dist(mat)
+# clustering = _hdbscan_clustering(mat,
+# min_cluster_size=min_cluster_size,
+# min_samples=min_samples,
+# cluster_selection_epsilon=cluster_selection_epsilon,
+# cluster_selection_method=cluster_selection_method,
+# metric='precomputed',
+# core_dist_n_jobs=cpu_count()
+# )
+
+# cluster_data = process_clustering_result(clustering, subreddits)
+# isolates = clustering.labels_ == -1
+# scoremat = mat[~isolates][:,~isolates]
+# score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
+# cluster_data.to_feather(output)
+# silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
+# silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
+# silsampout = output.parent / ("silhouette_samples" + output.name)
+# silhouette_samp.to_feather(silsampout)
+
+# result = hdbscan_clustering_result(outpath=output,
+# silhouette_samples=silsampout,
+# silhouette_score=score,
+# name=name,
+# min_cluster_size=min_cluster_size,
+# min_samples=min_samples,
+# cluster_selection_epsilon=cluster_selection_epsilon,
+# cluster_selection_method=cluster_selection_method,
+# lsi_dimensions=lsi_dim,
+# n_isolates=isolates.sum(),
+# n_clusters=len(set(clustering.labels_))
+# )
- return(result)
-
-# for all runs we should try cluster_selection_epsilon = None
-# for terms we should try cluster_selection_epsilon around 0.56-0.66
-# for authors we should try cluster_selection_epsilon around 0.98-0.99
-def _hdbscan_clustering(mat, *args, **kwargs):
- print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
-
- print(mat)
- clusterer = hdbscan.HDBSCAN(*args,
- **kwargs,
- )
+# return(result)
+
+# # for all runs we should try cluster_selection_epsilon = None
+# # for terms we should try cluster_selection_epsilon around 0.56-0.66
+# # for authors we should try cluster_selection_epsilon around 0.98-0.99
+# def _hdbscan_clustering(mat, *args, **kwargs):
+# print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
+
+# print(mat)
+# clusterer = hdbscan.HDBSCAN(*args,
+# **kwargs,
+# )
- clustering = clusterer.fit(mat.astype('double'))
+# clustering = clusterer.fit(mat.astype('double'))
- return(clustering)
+# return(clustering)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
- fire.Fire(select_hdbscan_clustering)
+    fire.Fire({'grid_sweep':hdbscan_grid_sweep,
+               'grid_sweep_lsi':hdbscan_lsi_grid_sweep,
+               'cluster':hdbscan_job,
+               'cluster_lsi':hdbscan_lsi_job})
+
+# test_select_hdbscan_clustering()
+ #fire.Fire(select_hdbscan_clustering)
from multiprocessing import cpu_count
from dataclasses import dataclass
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
+
@dataclass
class kmeans_clustering_result(clustering_result):
n_clusters:int
n_init:int
+ max_iter:int
-def kmeans_clustering(similarities, *args, **kwargs):
- subreddits, mat = read_similarity_mat(similarities)
- mat = sim_to_dist(mat)
- clustering = _kmeans_clustering(mat, *args, **kwargs)
- cluster_data = process_clustering_result(clustering, subreddits)
- return(cluster_data)
-
-def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
-
- clustering = KMeans(n_clusters=n_clusters,
- n_init=n_init,
- max_iter=max_iter,
- random_state=random_state,
- verbose=verbose
- ).fit(mat)
-
- return clustering
-
-def do_clustering(n_clusters, n_init, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
- if name is None:
- name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
- print(name)
- sys.stdout.flush()
- outpath = outdir / (str(name) + ".feather")
- print(outpath)
- mat = sim_to_dist(mat)
- clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose)
-
- outpath.parent.mkdir(parents=True,exist_ok=True)
- cluster_data.to_feather(outpath)
- cluster_data = process_clustering_result(clustering, subreddits)
-
- try:
- score = silhouette_score(mat, clustering.labels_, metric='precomputed')
- except ValueError:
- score = None
-
- if alt_mat is not None:
- alt_distances = sim_to_dist(alt_mat)
- try:
- alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
- except ValueError:
- alt_score = None
+@dataclass
+class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin):
+ pass
+
+class kmeans_job(clustering_job):
+ def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
+ super().__init__(infile,
+ outpath,
+ name,
+ call=kmeans_job._kmeans_clustering,
+ n_clusters=n_clusters,
+ n_init=n_init,
+ max_iter=max_iter,
+ random_state=random_state,
+ verbose=verbose)
+
+ self.n_clusters=n_clusters
+ self.n_init=n_init
+ self.max_iter=max_iter
+
+    @staticmethod
+    def _kmeans_clustering(mat, *args, **kwargs):
+
+ clustering = KMeans(*args,
+ **kwargs,
+ ).fit(mat)
+
+ return clustering
+
+
+ def get_info(self):
+ result = super().get_info()
+        self.result = kmeans_clustering_result(**result.__dict__,
+                                               n_init=self.n_init,
+                                               max_iter=self.max_iter)
+ return self.result
+
+
+class kmeans_lsi_job(kmeans_job, lsi_mixin):
+ def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+ super().__init__(infile,
+ outpath,
+ name,
+ *args,
+ **kwargs)
+ super().set_lsi_dims(lsi_dims)
+
+ def get_info(self):
+ result = super().get_info()
+ self.result = kmeans_clustering_result_lsi(**result.__dict__,
+ lsi_dimensions=self.lsi_dims)
+ return self.result
- res = kmeans_clustering_result(outpath=outpath,
- max_iter=max_iter,
- n_clusters=n_clusters,
- n_init = n_init,
- silhouette_score=score,
- alt_silhouette_score=score,
- name=str(name))
-
- return res
-
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None):
-
- n_clusters = list(map(int,n_clusters))
- n_init = list(map(int,n_init))
-
- if type(outdir) is str:
- outdir = Path(outdir)
- outdir.mkdir(parents=True,exist_ok=True)
+class kmeans_grid_sweep(grid_sweep):
+ def __init__(self,
+ inpath,
+ outpath,
+ *args,
+ **kwargs):
+ super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs)
+
+ def namer(self,
+ n_clusters,
+ n_init,
+ max_iter):
+ return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}"
+
+class _kmeans_lsi_grid_sweep(grid_sweep):
+ def __init__(self,
+ inpath,
+ outpath,
+ lsi_dim,
+ *args,
+ **kwargs):
+ self.lsi_dim = lsi_dim
+ self.jobtype = kmeans_lsi_job
+        super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
+
+ def namer(self, *args, **kwargs):
+ s = kmeans_grid_sweep.namer(self, *args[1:], **kwargs)
+ s += f"_lsi-{self.lsi_dim}"
+ return s
+
+class kmeans_lsi_grid_sweep(lsi_grid_sweep):
+ def __init__(self,
+ inpath,
+ lsi_dims,
+ outpath,
+ n_clusters,
+ n_inits,
+ max_iters
+ ):
+
+ super().__init__(kmeans_lsi_job,
+ _kmeans_lsi_grid_sweep,
+ inpath,
+ lsi_dims,
+ outpath,
+ n_clusters,
+ n_inits,
+ max_iters)
+
+def test_select_kmeans_clustering():
+ inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+ outpath = "test_kmeans";
+ n_clusters=[200,300,400];
+ n_init=[1,2,3];
+ max_iter=[100000]
+
+ gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter)
+ gs.run(1)
+
+    gs.save("test_kmeans/lsi_sweep.csv")
- subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
- if alt_similarities is not None:
- alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
- else:
- alt_mat = None
-
- # get list of tuples: the combinations of hyperparameters
- hyper_grid = product(n_clusters, n_init)
- hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
- _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
-
- # call starmap
- print("running clustering selection")
- clustering_data = starmap(_do_clustering, hyper_grid)
- clustering_data = pd.DataFrame(list(clustering_data))
- clustering_data.to_csv(outinfo)
-
- return clustering_data
if __name__ == "__main__":
- x = fire.Fire(select_kmeans_clustering)
+
+    fire.Fire({'grid_sweep':kmeans_grid_sweep,
+               'grid_sweep_lsi':kmeans_lsi_grid_sweep,
+               'cluster':kmeans_job,
+               'cluster_lsi':kmeans_lsi_job})