]> code.communitydata.science - cdsc_reddit.git/commitdiff
refactor clustering in object oriented style
authorNate E TeBlunthuis <nathante@klone-login01.hyak.local>
Sat, 8 May 2021 05:33:26 +0000 (22:33 -0700)
committerNate E TeBlunthuis <nathante@klone-login01.hyak.local>
Sat, 8 May 2021 05:33:26 +0000 (22:33 -0700)
clustering/affinity_clustering.py
clustering/clustering_base.py
clustering/hdbscan_clustering.py
clustering/kmeans_clustering.py

index 287f7e21e84b7ddf6ad87fda6b96431d9ceec922..b4f8461157fc83f78d4211ff2321d28a5a36cf7d 100644 (file)
@@ -2,7 +2,8 @@ from sklearn.metrics import silhouette_score
 from sklearn.cluster import AffinityPropagation
 from functools import partial
 from dataclasses import dataclass
-from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
+from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
 from multiprocessing  import Pool, cpu_count, Array, Process
 from pathlib import Path
 from itertools import product, starmap
@@ -17,116 +18,158 @@ class affinity_clustering_result(clustering_result):
     damping:float
     convergence_iter:int
     preference_quantile:float
+    preference:float
+    max_iter:int
 
-def affinity_clustering(similarities, output, *args, **kwargs):
-    subreddits, mat = read_similarity_mat(similarities)
-    clustering = _affinity_clustering(mat, *args, **kwargs)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    cluster_data['algorithm'] = 'affinity'
-    return(cluster_data)
-
-def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
-    '''
-    similarities: matrix of similarity scores
-    preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
-    damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author. 
-    '''
-    print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
-
-    preference = np.quantile(mat,preference_quantile)
-
-    print(f"preference is {preference}")
-    print("data loaded")
-    sys.stdout.flush()
-    clustering = AffinityPropagation(damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     copy=False,
-                                     preference=preference,
-                                     affinity='precomputed',
-                                     verbose=verbose,
-                                     random_state=random_state).fit(mat)
-
-    cluster_data = process_clustering_result(clustering, subreddits)
-    output = Path(output)
-    output.parent.mkdir(parents=True,exist_ok=True)
-    cluster_data.to_feather(output)
-    print(f"saved {output}")
-    return clustering
-
-
-def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits,  max_iter,  outdir:Path, random_state, verbose, alt_mat, overwrite=False):
-    if name is None:
-        name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
-    print(name)
-    sys.stdout.flush()
-    outpath = outdir / (str(name) + ".feather")
-    outpath.parent.mkdir(parents=True,exist_ok=True)
-    print(outpath)
-    clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    mat = sim_to_dist(clustering.affinity_matrix_)
-
-    try: 
-        score = silhouette_score(mat, clustering.labels_, metric='precomputed')
-    except ValueError:
-        score = None
-
-    if alt_mat is not None:
-        alt_distances = sim_to_dist(alt_mat)
-        try:
-            alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
-        except ValueError:
-            alt_score = None
+@dataclass
+class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin):
+    """Affinity-propagation result record that also carries the lsi_result_mixin fields."""
+
+class affinity_job(clustering_job):
+    """One affinity-propagation clustering run over a single input file.
+
+    The hyperparameters are handed to ``clustering_job`` as keyword arguments
+    for ``_affinity_clustering`` and are also kept on the instance so that
+    ``get_info`` can report them.
+    """
+
+    def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=self._affinity_clustering,
+                         preference_quantile=preference_quantile,
+                         damping=damping,
+                         max_iter=max_iter,
+                         convergence_iter=convergence_iter,
+                         # forward the caller's seed instead of a fixed literal
+                         random_state=random_state,
+                         verbose=verbose)
+        self.damping=damping
+        self.max_iter=max_iter
+        self.convergence_iter=convergence_iter
+        self.preference_quantile=preference_quantile
+
+    def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
+        """Fit AffinityPropagation on ``mat`` and return the fitted estimator.
+
+        ``preference`` is the ``preference_quantile`` quantile of the matrix
+        values; it is stored on the instance for ``get_info``.
+        """
+        # clustering_job.read_distance_mat returns 1 - similarity, so this
+        # flips the matrix back before it is used as a precomputed affinity.
+        mat = 1-mat
+        preference = np.quantile(mat, preference_quantile)
+        self.preference = preference
+        print(f"preference is {preference}")
+        print("data loaded")
+        sys.stdout.flush()
+        clustering = AffinityPropagation(*args,
+                                         preference=preference,
+                                         affinity='precomputed',
+                                         copy=False,
+                                         **kwargs).fit(mat)
+        return clustering
+
+    def get_info(self):
+        """Return the base result extended with the affinity hyperparameters."""
+        # super().get_info() runs the job if needed, which sets self.preference
+        result = super().get_info()
+        self.result=affinity_clustering_result(**result.__dict__,
+                                               damping=self.damping,
+                                               max_iter=self.max_iter,
+                                               convergence_iter=self.convergence_iter,
+                                               preference_quantile=self.preference_quantile,
+                                               preference=self.preference)
+
+        return self.result
+
+class affinity_lsi_job(affinity_job, lsi_mixin):
+    """Affinity-propagation job that also records the LSI dimensionality of its input."""
+
+    def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+        super().__init__(infile, outpath, name, *args, **kwargs)
+        super().set_lsi_dims(lsi_dims)
+
+    def get_info(self):
+        """Return the affinity result with ``lsi_dimensions`` added."""
+        base_fields = super().get_info().__dict__
+        self.result = affinity_clustering_result_lsi(lsi_dimensions=self.lsi_dims,
+                                                     **base_fields)
+        return self.result
+
+class affinity_grid_sweep(grid_sweep):
+    """Grid sweep over affinity-propagation hyperparameters for one input file."""
+
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,
+                 **kwargs):
+
+        # grid_sweep.__init__ expects (jobtype, inpath, outpath, namer, *axes);
+        # nothing may sit between the job type and the input path.
+        super().__init__(affinity_job,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         *args,
+                         **kwargs)
+
+    def namer(self,
+              damping,
+              max_iter,
+              convergence_iter,
+              preference_quantile):
+        """Build the output name for one hyperparameter combination."""
+
+        return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
+
+class _affinity_lsi_grid_sweep(grid_sweep):
+    """Affinity hyperparameter sweep for the input file of one LSI dimensionality."""
+
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 lsi_dim,
+                 *args,
+                 **kwargs):
+        self.lsi_dim = lsi_dim
+        self.jobtype = affinity_lsi_job
+        # Every positional axis is expanded with itertools.product, so the
+        # single lsi_dim must be wrapped in a list; a bare string such as
+        # "500" would otherwise be iterated character by character.
+        super().__init__(self.jobtype,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         [self.lsi_dim],
+                         *args,
+                         **kwargs)
+
+    def namer(self, *args, **kwargs):
+        """Name a grid point like affinity_grid_sweep does, plus an ``_lsi-<dim>`` suffix."""
+        # args[0] is the lsi dimension axis; the rest are the affinity hyperparameters
+        s = affinity_grid_sweep.namer(self, *args[1:], **kwargs)
+        s += f"_lsi-{self.lsi_dim}"
+        return s
+
+class affinity_lsi_grid_sweep(lsi_grid_sweep):
+    """Sweep affinity-propagation hyperparameters across the requested LSI inputs."""
+
+    def __init__(self,
+                 inpath,
+                 lsi_dims,
+                 outpath,
+                 dampings=[0.9],
+                 max_iters=[10000],
+                 convergence_iters=[30],
+                 preference_quantiles=[0.5]):
+
+        # one axis per hyperparameter, in the order affinity_job takes them
+        hyper_axes = (dampings, max_iters, convergence_iters, preference_quantiles)
+        super().__init__(affinity_lsi_job,
+                         _affinity_lsi_grid_sweep,
+                         inpath,
+                         lsi_dims,
+                         outpath,
+                         *hyper_axes)
     
-    res = affinity_clustering_result(outpath=outpath,
-                                     damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     preference_quantile=preference_quantile,
-                                     silhouette_score=score,
-                                     alt_silhouette_score=score,
-                                     name=str(name))
-
-    return res
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-
-def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
-
-    damping = list(map(float,damping))
-    convergence_iter = convergence_iter = list(map(int,convergence_iter))
-    preference_quantile = list(map(float,preference_quantile))
-
-    if type(outdir) is str:
-        outdir = Path(outdir)
-
-    outdir.mkdir(parents=True,exist_ok=True)
-
-    subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
-    if alt_similarities is not None:
-        alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
-    else:
-        alt_mat = None
-
-    if J is None:
-        J = cpu_count()
-    pool = Pool(J)
-
-    # get list of tuples: the combinations of hyperparameters
-    hyper_grid = product(damping, convergence_iter, preference_quantile)
-    hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
-    _do_clustering = partial(do_clustering,  mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
+                         
+    
+def test_select_affinity_clustering():
+    """Run a small LSI affinity sweep on 20 cores and save its summary CSV."""
+    sweep = affinity_lsi_grid_sweep(
+        "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/",
+        'all',
+        "test_affinity",
+        [0.8,0.9],     # dampings
+        [100000],      # max_iters
+        [15],          # convergence_iters
+        [0.5,0.7])     # preference_quantiles
+    sweep.run(20)
+    sweep.save("test_affinity/lsi_sweep.csv")
 
-    #    similarities = Array('d', mat)
-    # call pool.starmap
-    print("running clustering selection")
-    clustering_data = pool.starmap(_do_clustering, hyper_grid)
-    clustering_data = pd.DataFrame(list(clustering_data))
-    clustering_data.to_csv(outinfo)
-    return clustering_data
 
 if __name__ == "__main__":
-    x = fire.Fire(select_affinity_clustering)
+    fire.Fire{'grid_sweep':affinity_grid_sweep,
+              'grid_sweep_lsi':affinity_lsi_grid_sweep
+              'cluster':affinity_job,
+              'cluster_lsi':affinity_lsi_job}
index 1d864383cb791c30d803de69aa281be8a966cafe..5492415671846392affed1261f9ede79919b9f27 100644 (file)
@@ -2,6 +2,9 @@ from pathlib import Path
 import numpy as np
 import pandas as pd
 from dataclasses import dataclass
+from sklearn.metrics import silhouette_score, silhouette_samples
+from itertools import product, chain
+from multiprocessing import Pool, cpu_count
 
 def sim_to_dist(mat):
     dist = 1-mat
@@ -9,41 +12,147 @@ def sim_to_dist(mat):
     np.fill_diagonal(dist,0)
     return dist
 
-def process_clustering_result(clustering, subreddits):
+class grid_sweep:
+    def __init__(self, jobtype, inpath, outpath, namer, *args):
+        """Build one job per combination of the hyperparameter axes in ``args``."""
+        self.jobtype = jobtype
+        self.namer = namer
+        combos = list(product(*args))
+        in_path, out_path = Path(inpath), Path(outpath)
+        self.hasrun = False
+        # each grid row is (inpath, outpath, name, *hyperparameters)
+        self.grid = [(in_path, out_path, namer(*combo)) + combo for combo in combos]
+        self.jobs = [jobtype(*row) for row in self.grid]
 
-    if hasattr(clustering,'n_iter_'):
-        print(f"clustering took {clustering.n_iter_} iterations")
+    def run(self, cores=20):
+        """Call get_info on every job and collect the results in self.infos.
+
+        A multiprocessing Pool of ``cores`` workers is used when ``cores`` is
+        greater than 1; otherwise the jobs are evaluated in this process.
+        """
+        if cores is not None and cores > 1:
+            with Pool(cores) as pool:
+                infos = pool.map(self.jobtype.get_info, self.jobs)
+        else:
+            # lazy map object; it is consumed by the DataFrame constructor below
+            infos = map(self.jobtype.get_info, self.jobs)
 
-    clusters = clustering.labels_
+        self.infos = pd.DataFrame(infos)
+        self.hasrun = True
 
-    print(f"found {len(set(clusters))} clusters")
+    def save(self, outcsv):
+        """Write the result table to ``outcsv``, running the sweep first if it has not run."""
+        if not self.hasrun:
+            self.run()
+        target = Path(outcsv)
+        # create the destination directory on demand
+        target.parent.mkdir(parents=True, exist_ok=True)
+        self.infos.to_csv(target)
 
-    cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
 
-    cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
-    print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+class lsi_grid_sweep(grid_sweep):
+    def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
+        """Create one ``subsweep`` per LSI input file and pool their jobs.
+
+        ``lsi_dimensions`` is either the string 'all' (every entry of the
+        ``inpath`` directory is used) or an iterable of dimension names, each
+        resolved to ``inpath/<dim>.feather``.
+        """
+        self.jobtype = jobtype
+        self.subsweep = subsweep
+        inpath = Path(inpath)
+        if lsi_dimensions == 'all':
+            lsi_paths = list(inpath.glob("*"))
+        else:
+            # str() so that integer dimensions work as well as strings
+            lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
 
-    print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+        lsi_nums = [p.stem for p in lsi_paths]
+        self.hasrun = False
+        self.subgrids = [self.subsweep(lsi_path, outpath,  lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
+        self.jobs = list(chain.from_iterable(gs.jobs for gs in self.subgrids))
 
-    print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
 
-    print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
+# this is meant to be an interface, not created directly
+class clustering_job:
+    def __init__(self, infile, outpath, name, call, *args, **kwargs):
+        """Record what to cluster, where to write it, and the callable that does the clustering."""
+        self.name = name
+        self.infile, self.outpath = Path(infile), Path(outpath)
+        # run() later invokes call(mat, *args, **kwargs)
+        self.call = call
+        self.args, self.kwargs = args, kwargs
+        self.hasrun = False
 
-    return cluster_data
+    def run(self):
+        """Read the input, cluster it, score it, and write the assignments as feather."""
+        self.subreddits, self.mat = self.read_distance_mat(self.infile)
+        self.clustering = self.call(self.mat, *self.args, **self.kwargs)
+        # process_clustering also sets self.n_clusters and self.n_isolates
+        self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
+        # silhouette() also writes the per-subreddit samples and sets self.silsampout
+        self.score = self.silhouette()
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+        self.hasrun = True
+        
+    def get_info(self):
+        """Return a clustering_result for this job, running the job first if needed."""
+        if not self.hasrun:
+            self.run()
 
+        # both paths are resolved and stored as strings
+        self.result = clustering_result(outpath=str(self.outpath.resolve()),
+                                        silhouette_score=self.score,
+                                        name=self.name,
+                                        n_clusters=self.n_clusters,
+                                        n_isolates=self.n_isolates,
+                                        silhouette_samples = str(self.silsampout.resolve())
+                                        )
+        return self.result
+
+    def silhouette(self):
+        """Score the clustering and write per-subreddit silhouette samples.
+
+        The overall score is computed with points labelled -1 left out; the
+        per-sample scores use every point. Returns the overall score, or None
+        when scikit-learn rejects the labelling with a ValueError, so that one
+        degenerate clustering does not abort a whole sweep. In that case the
+        sample scores are written as NaN.
+        """
+        labels = self.clustering.labels_
+        isolates = labels == -1
+        scoremat = self.mat[~isolates][:,~isolates]
+        try:
+            score = silhouette_score(scoremat, labels[~isolates], metric='precomputed')
+        except ValueError:
+            score = None
+        try:
+            sample_scores = silhouette_samples(self.mat, labels, metric='precomputed')
+        except ValueError:
+            sample_scores = np.full(len(labels), np.nan)
+        silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':sample_scores})
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        # get_info reports this path, so it is set even when scoring failed
+        self.silsampout = self.outpath / ("silhouette_samples-" + self.name +  ".feather")
+        silhouette_samp.to_feather(self.silsampout)
+        return score
+
+    def read_distance_mat(self, similarities, use_threads=True):
+        """Read a similarity feather file and return ``(subreddit names, 1 - similarity)``.
+
+        The ``_subreddit`` column holds the names; every other column is taken
+        as the matrix. The diagonal is set to 1 before the subtraction, so the
+        returned matrix has a zero diagonal.
+        """
+        df = pd.read_feather(similarities, use_threads=use_threads)
+        # keyword form: the positional axis argument of drop() is gone in pandas 2
+        mat = np.array(df.drop(columns='_subreddit'))
+        np.fill_diagonal(mat, 1)
+        return (df._subreddit,1-mat)
+
+    def process_clustering(self, clustering, subreddits):
+        """Tabulate cluster assignments and record summary counts on the job.
+
+        Sets ``self.n_clusters`` (number of distinct labels, -1 included) and
+        ``self.n_isolates`` (number of single-member clusters, or, when there
+        are none, the number of points labelled -1). Returns a DataFrame with
+        columns ``subreddit`` and ``cluster``.
+        """
+
+        if hasattr(clustering,'n_iter_'):
+            print(f"clustering took {clustering.n_iter_} iterations")
+
+        clusters = clustering.labels_
+        self.n_clusters = len(set(clusters))
+
+        print(f"found {self.n_clusters} clusters")
+
+        cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clusters})
+
+        cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
+        print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+
+        print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+        n_isolates1 = int((cluster_sizes.subreddit==1).sum())
+
+        print(f"{n_isolates1} clusters have 1 member")
+
+        # sum() turns the (at most one-row) selection into a plain count,
+        # 0 when no point is labelled -1
+        n_isolates2 = int(cluster_sizes.loc[cluster_sizes.cluster==-1,'subreddit'].sum())
+
+        print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
+
+        if n_isolates1 == 0:
+            self.n_isolates = n_isolates2
+        else:
+            self.n_isolates = n_isolates1
+
+        return cluster_data
+
+
+class lsi_mixin:
+    """Mixin that lets a job remember the LSI dimensionality of its input."""
+
+    def set_lsi_dims(self, lsi_dims):
+        self.lsi_dims = lsi_dims
 
 @dataclass
 class clustering_result:
+    # Summary record for one clustering run; built by clustering_job.get_info.
+    # NOTE(review): get_info passes outpath as a str although it is annotated Path.
     outpath:Path
-    max_iter:int
     silhouette_score:float
-    alt_silhouette_score:float
     name:str
     n_clusters:int
+    n_isolates:int
+    silhouette_samples:str
 
-def read_similarity_mat(similarities, use_threads=True):
-    df = pd.read_feather(similarities, use_threads=use_threads)
-    mat = np.array(df.drop('_subreddit',1))
-    n = mat.shape[0]
-    mat[range(n),range(n)] = 1
-    return (df._subreddit,mat)
+@dataclass
+class lsi_result_mixin:
+    # Extra result field mixed into the *_result_lsi dataclasses; the lsi jobs
+    # pass their lsi_dims value here.
+    lsi_dimensions:int
index 4f4e0d6f2c4f18b47d3d96ac0991fbc72fdb6aef..f0ee7038c75bab4df7960f5d49b68518aac85345 100644 (file)
@@ -1,10 +1,11 @@
 from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
 from dataclasses import dataclass
 import hdbscan
 from sklearn.neighbors import NearestNeighbors
 import plotnine as pn
 import numpy as np
-from itertools import product, starmap
+from itertools import product, starmap, chain
 import pandas as pd
 from sklearn.metrics import silhouette_score, silhouette_samples
 from pathlib import Path
@@ -13,27 +14,88 @@ import fire
 from pyarrow.feather import write_feather
 
 def test_select_hdbscan_clustering():
-    select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
-                              "test_hdbscan_author30k",
-                              min_cluster_sizes=[2],
-                              min_samples=[1,2],
-                              cluster_selection_epsilons=[0,0.05,0.1,0.15],
-                              cluster_selection_methods=['eom','leaf'],
-                              lsi_dimensions='all')
-    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI"
+    select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
+                              "test_hdbscan_author30k",
+                              min_cluster_sizes=[2],
+                              min_samples=[1,2],
+                              cluster_selection_epsilons=[0,0.05,0.1,0.15],
+                              cluster_selection_methods=['eom','leaf'],
+                              lsi_dimensions='all')
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
     outpath = "test_hdbscan";
     min_cluster_sizes=[2,3,4];
     min_samples=[1,2,3];
     cluster_selection_epsilons=[0,0.1,0.3,0.5];
     cluster_selection_methods=['eom'];
     lsi_dimensions='all'
+    gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
+    gs.run(20)
+    gs.save("test_hdbscan/lsi_sweep.csv")
+    # job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
+    # job1.run()
+    # print(job1.get_info())
 
-    df = pd.read_csv("test_hdbscan/selection_data.csv")
-    test_select_hdbscan_clustering()
-    check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
-    silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
-    c = check_clusters.merge(silscores,on='subreddit')#    fire.Fire(select_hdbscan_clustering)
+    df = pd.read_csv("test_hdbscan/selection_data.csv")
+    test_select_hdbscan_clustering()
+    check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
+    silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
+    c = check_clusters.merge(silscores,on='subreddit')#    fire.Fire(select_hdbscan_clustering)
 
+class hdbscan_lsi_grid_sweep(lsi_grid_sweep):
+    """Sweep HDBSCAN hyperparameters across the requested LSI inputs."""
+
+    def __init__(self,
+                 inpath,
+                 lsi_dims,
+                 outpath,
+                 min_cluster_sizes,
+                 min_samples,
+                 cluster_selection_epsilons,
+                 cluster_selection_methods
+                 ):
+
+        # one axis per hyperparameter, in the order hdbscan_job takes them
+        hyper_axes = (min_cluster_sizes,
+                      min_samples,
+                      cluster_selection_epsilons,
+                      cluster_selection_methods)
+        super().__init__(hdbscan_lsi_job,
+                         _hdbscan_lsi_grid_sweep,
+                         inpath,
+                         lsi_dims,
+                         outpath,
+                         *hyper_axes)
+        
+class hdbscan_grid_sweep(grid_sweep):
+    """Grid sweep over HDBSCAN hyperparameters for a single input file."""
+
+    def __init__(self, inpath, outpath, *args, **kwargs):
+        super().__init__(hdbscan_job,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         *args,
+                         **kwargs)
+
+    def namer(self, min_cluster_size, min_samples, cluster_selection_epsilon, cluster_selection_method):
+        """Build the output name for one hyperparameter combination."""
+        return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}"
+
+
+class _hdbscan_lsi_grid_sweep(grid_sweep):
+    """HDBSCAN hyperparameter sweep for the input file of one LSI dimensionality."""
+
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 lsi_dim,
+                 *args,
+                 **kwargs):
+
+        self.lsi_dim = lsi_dim
+        self.jobtype = hdbscan_lsi_job
+        # Every positional axis is expanded with itertools.product, so the
+        # single lsi_dim must be wrapped in a list; a bare string such as
+        # "500" would otherwise be iterated character by character.
+        super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
+
+
+    def namer(self, *args, **kwargs):
+        """Name a grid point like hdbscan_grid_sweep does, plus an ``_lsi-<dim>`` suffix."""
+        # args[0] is the lsi dimension axis; the rest are the HDBSCAN hyperparameters
+        s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs)
+        s += f"_lsi-{self.lsi_dim}"
+        return s
 
 @dataclass
 class hdbscan_clustering_result(clustering_result):
@@ -41,107 +103,166 @@ class hdbscan_clustering_result(clustering_result):
     min_samples:int
     cluster_selection_epsilon:float
     cluster_selection_method:str
-    lsi_dimensions:int
-    n_isolates:int
-    silhouette_samples:str
-
-def select_hdbscan_clustering(inpath,
-                              outpath,
-                              outfile=None,
-                              min_cluster_sizes=[2],
-                              min_samples=[1],
-                              cluster_selection_epsilons=[0],
-                              cluster_selection_methods=['eom'],
-                              lsi_dimensions='all'
-                              ):
-
-    inpath = Path(inpath)
-    outpath = Path(outpath)
-    outpath.mkdir(exist_ok=True, parents=True)
+
+@dataclass
+class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin):
+    """HDBSCAN result record that also carries the lsi_result_mixin fields."""
+
+class hdbscan_job(clustering_job):
+    """One HDBSCAN clustering run over a single input file."""
+
+    def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
+        # The hyperparameters are passed on as keyword arguments for
+        # _hdbscan_clustering and kept on the instance for get_info.
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=hdbscan_job._hdbscan_clustering,
+                         min_cluster_size=min_cluster_size,
+                         min_samples=min_samples,
+                         cluster_selection_epsilon=cluster_selection_epsilon,
+                         cluster_selection_method=cluster_selection_method
+                         )
+
+        self.min_cluster_size = min_cluster_size
+        self.min_samples = min_samples
+        self.cluster_selection_epsilon = cluster_selection_epsilon
+        self.cluster_selection_method = cluster_selection_method
+#        self.mat = 1 - self.mat
+
+    # No self parameter: __init__ looks this up on the class
+    # (hdbscan_job._hdbscan_clustering), so it is called as a plain function
+    # with the matrix as its first argument.
+    def _hdbscan_clustering(mat, *args, **kwargs):
+        print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
+        print(mat)
+        clusterer = hdbscan.HDBSCAN(metric='precomputed',
+                                    core_dist_n_jobs=cpu_count(),
+                                    *args,
+                                    **kwargs,
+                                    )
     
-    if lsi_dimensions == 'all':
-        lsi_paths = list(inpath.glob("*"))
+        clustering = clusterer.fit(mat.astype('double'))
+    
+        return(clustering)
+
+    def get_info(self):
+        """Return the base result extended with the HDBSCAN hyperparameters."""
+        result = super().get_info()
+        self.result = hdbscan_clustering_result(**result.__dict__,
+                                                min_cluster_size=self.min_cluster_size,
+                                                min_samples=self.min_samples,
+                                                cluster_selection_epsilon=self.cluster_selection_epsilon,
+                                                cluster_selection_method=self.cluster_selection_method)
+        return self.result
+
+class hdbscan_lsi_job(hdbscan_job, lsi_mixin):
+    """HDBSCAN job that also records the LSI dimensionality of its input."""
+
+    def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+        super().__init__(infile, outpath, name, *args, **kwargs)
+        super().set_lsi_dims(lsi_dims)
+
+    def get_info(self):
+        """Return the HDBSCAN result with ``lsi_dimensions`` added."""
+        base_fields = super().get_info().__dict__
+        self.result = hdbscan_clustering_result_lsi(lsi_dimensions=self.lsi_dims,
+                                                    **base_fields)
+        return self.result
+
+# def select_hdbscan_clustering(inpath,
+#                               outpath,
+#                               outfile=None,
+#                               min_cluster_sizes=[2],
+#                               min_samples=[1],
+#                               cluster_selection_epsilons=[0],
+#                               cluster_selection_methods=['eom'],
+#                               lsi_dimensions='all'
+#                               ):
+
+#     inpath = Path(inpath)
+#     outpath = Path(outpath)
+#     outpath.mkdir(exist_ok=True, parents=True)
+    
+#     if lsi_dimensions is None:
+#         lsi_paths = [inpath]
+#     elif lsi_dimensions == 'all':
+#         lsi_paths = list(inpath.glob("*"))
 
-    else:
-        lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
+    else:
+        lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
 
-    lsi_nums = [p.stem for p in lsi_paths]
-    grid = list(product(lsi_nums,
-                        min_cluster_sizes,
-                        min_samples,
-                        cluster_selection_epsilons,
-                        cluster_selection_methods))
+#     if lsi_dimensions is not None:
+#         lsi_nums = [p.stem for p in lsi_paths]
+#     else:
+#         lsi_nums = [None]
+#     grid = list(product(lsi_nums,
+#                         min_cluster_sizes,
+#                         min_samples,
+#                         cluster_selection_epsilons,
+#                         cluster_selection_methods))
 
-    # fix the output file names
-    names = list(map(lambda t:'_'.join(map(str,t)),grid))
+    # fix the output file names
+    names = list(map(lambda t:'_'.join(map(str,t)),grid))
 
-    grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
+    grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
         
-    with Pool(int(cpu_count()/4)) as pool:
-        mods = starmap(hdbscan_clustering, grid)
+    with Pool(int(cpu_count()/4)) as pool:
+        mods = starmap(hdbscan_clustering, grid)
 
-    res = pd.DataFrame(mods)
-    if outfile is None:
-        outfile = outpath / "selection_data.csv"
+    res = pd.DataFrame(mods)
+    if outfile is None:
+        outfile = outpath / "selection_data.csv"
 
-    res.to_csv(outfile)
+    res.to_csv(outfile)
 
-def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
-    subreddits, mat = read_similarity_mat(similarities)
-    mat = sim_to_dist(mat)
-    clustering = _hdbscan_clustering(mat,
-                                     min_cluster_size=min_cluster_size,
-                                     min_samples=min_samples,
-                                     cluster_selection_epsilon=cluster_selection_epsilon,
-                                     cluster_selection_method=cluster_selection_method,
-                                     metric='precomputed',
-                                     core_dist_n_jobs=cpu_count()
-                                     )
-
-    cluster_data = process_clustering_result(clustering, subreddits)
-    isolates = clustering.labels_ == -1
-    scoremat = mat[~isolates][:,~isolates]
-    score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
-    cluster_data.to_feather(output)
-
-    silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
-    silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
-    silsampout = output.parent / ("silhouette_samples" + output.name)
-    silhouette_samp.to_feather(silsampout)
-
-    result = hdbscan_clustering_result(outpath=output,
-                                       max_iter=None,
-                                       silhouette_samples=silsampout,
-                                       silhouette_score=score,
-                                       alt_silhouette_score=score,
-                                       name=name,
-                                       min_cluster_size=min_cluster_size,
-                                       min_samples=min_samples,
-                                       cluster_selection_epsilon=cluster_selection_epsilon,
-                                       cluster_selection_method=cluster_selection_method,
-                                       lsi_dimensions=lsi_dim,
-                                       n_isolates=isolates.sum(),
-                                       n_clusters=len(set(clustering.labels_))
-                                   )
+# def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
+#     subreddits, mat = read_similarity_mat(similarities)
+#     mat = sim_to_dist(mat)
+#     clustering = _hdbscan_clustering(mat,
+#                                      min_cluster_size=min_cluster_size,
+#                                      min_samples=min_samples,
+#                                      cluster_selection_epsilon=cluster_selection_epsilon,
+#                                      cluster_selection_method=cluster_selection_method,
+#                                      metric='precomputed',
+#                                      core_dist_n_jobs=cpu_count()
+#                                      )
+
+#     cluster_data = process_clustering_result(clustering, subreddits)
+#     isolates = clustering.labels_ == -1
+#     scoremat = mat[~isolates][:,~isolates]
+#     score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
+#     cluster_data.to_feather(output)
+#     silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
+#     silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
+#     silsampout = output.parent / ("silhouette_samples" + output.name)
+#     silhouette_samp.to_feather(silsampout)
+
+#     result = hdbscan_clustering_result(outpath=output,
+#                                        silhouette_samples=silsampout,
+#                                        silhouette_score=score,
+#                                        name=name,
+#                                        min_cluster_size=min_cluster_size,
+#                                        min_samples=min_samples,
+#                                        cluster_selection_epsilon=cluster_selection_epsilon,
+#                                        cluster_selection_method=cluster_selection_method,
+#                                        lsi_dimensions=lsi_dim,
+#                                        n_isolates=isolates.sum(),
+#                                        n_clusters=len(set(clustering.labels_))
+#                                    )
 
 
                                        
-    return(result)
-
-# for all runs we should try cluster_selection_epsilon = None
-# for terms we should try cluster_selection_epsilon around 0.56-0.66
-# for authors we should try cluster_selection_epsilon around 0.98-0.99
-def _hdbscan_clustering(mat, *args, **kwargs):
-    print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
-
-    print(mat)
-    clusterer = hdbscan.HDBSCAN(*args,
-                                **kwargs,
-                                )
+    return(result)
+
+# for all runs we should try cluster_selection_epsilon = None
+# for terms we should try cluster_selection_epsilon around 0.56-0.66
+# for authors we should try cluster_selection_epsilon around 0.98-0.99
+def _hdbscan_clustering(mat, *args, **kwargs):
+    print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
+
+    print(mat)
+    clusterer = hdbscan.HDBSCAN(*args,
+                                **kwargs,
+                                )
     
-    clustering = clusterer.fit(mat.astype('double'))
+    clustering = clusterer.fit(mat.astype('double'))
     
-    return(clustering)
+    return(clustering)
 
 def KNN_distances_plot(mat,outname,k=2):
     nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
@@ -172,4 +293,10 @@ def make_KNN_plots():
     KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
 
 if __name__ == "__main__":
-    fire.Fire(select_hdbscan_clustering)
+    # Expose the grid sweeps and the single-run jobs as named fire subcommands.
+    fire.Fire({'grid_sweep':hdbscan_grid_sweep,
+               'grid_sweep_lsi':hdbscan_lsi_grid_sweep,
+               'cluster':hdbscan_job,
+               'cluster_lsi':hdbscan_lsi_job})
+    
+#    test_select_hdbscan_clustering()
+    #fire.Fire(select_hdbscan_clustering)  
index 8822e9f0cd67c2ece76d9552d7c2c77883bac76f..e41b88bff810bcde4937b0b4dff83e5f827f8761 100644 (file)
@@ -4,98 +4,145 @@ from pathlib import Path
 from multiprocessing import cpu_count
 from dataclasses import dataclass
 from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
+
 
 @dataclass
 class kmeans_clustering_result(clustering_result):
     n_clusters:int
     n_init:int
+    max_iter:int
 
-def kmeans_clustering(similarities, *args, **kwargs):
-    subreddits, mat = read_similarity_mat(similarities)
-    mat = sim_to_dist(mat)
-    clustering = _kmeans_clustering(mat, *args, **kwargs)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    return(cluster_data)
-
-def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
-
-    clustering = KMeans(n_clusters=n_clusters,
-                        n_init=n_init,
-                        max_iter=max_iter,
-                        random_state=random_state,
-                        verbose=verbose
-                        ).fit(mat)
-
-    return clustering
-
-def do_clustering(n_clusters, n_init, name, mat, subreddits,  max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
-    if name is None:
-        name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
-    print(name)
-    sys.stdout.flush()
-    outpath = outdir / (str(name) + ".feather")
-    print(outpath)
-    mat = sim_to_dist(mat)
-    clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose)
-
-    outpath.parent.mkdir(parents=True,exist_ok=True)
-    cluster_data.to_feather(outpath)
-    cluster_data = process_clustering_result(clustering, subreddits)
-
-    try: 
-        score = silhouette_score(mat, clustering.labels_, metric='precomputed')
-    except ValueError:
-        score = None
-
-    if alt_mat is not None:
-        alt_distances = sim_to_dist(alt_mat)
-        try:
-            alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
-        except ValueError:
-            alt_score = None
+@dataclass
+class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin):
+    """kmeans_clustering_result combined with the fields of lsi_result_mixin."""
+
+class kmeans_job(clustering_job):
+    """Clustering job that fits KMeans with one fixed set of hyperparameters.
+
+    The hyperparameters are forwarded to clustering_job.__init__ as keyword
+    arguments together with the fitting callable. n_clusters, n_init and
+    max_iter are also kept on the instance so get_info can report them.
+    """
+    def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=kmeans_job._kmeans_clustering,
+                         n_clusters=n_clusters,
+                         n_init=n_init,
+                         max_iter=max_iter,
+                         random_state=random_state,
+                         verbose=verbose)
+
+        self.n_clusters=n_clusters
+        self.n_init=n_init
+        self.max_iter=max_iter
+
+    # Takes no self: it is handed to clustering_job as a plain callable.
+    @staticmethod
+    def _kmeans_clustering(mat, *args, **kwargs):
+        """Fit KMeans on mat; args and kwargs go to the KMeans constructor.
+
+        Returns the fitted estimator.
+        """
+        clustering = KMeans(*args,
+                            **kwargs,
+                            ).fit(mat)
+
+        return clustering
+
+
+    def get_info(self):
+        """Return the base job info extended with this job's n_init and max_iter.
+
+        The result is also stored on self.result.
+        """
+        result = super().get_info()
+        # n_init and max_iter are read from the instance; they are not local
+        # names in this method.
+        # NOTE(review): n_clusters is assumed to already be among the fields
+        # returned by clustering_job.get_info -- confirm in clustering_base.
+        self.result = kmeans_clustering_result(**result.__dict__,
+                                               n_init=self.n_init,
+                                               max_iter=self.max_iter)
+        return self.result
+
+
+class kmeans_lsi_job(kmeans_job, lsi_mixin):
+    """A kmeans_job that also registers its LSI dimensionality via lsi_mixin."""
+    def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+        super().__init__(infile, outpath, name, *args, **kwargs)
+        super().set_lsi_dims(lsi_dims)
+
+    def get_info(self):
+        """Return the k-means job info with lsi_dimensions added.
+
+        The result is also stored on self.result.
+        """
+        base_info = super().get_info()
+        self.result = kmeans_clustering_result_lsi(lsi_dimensions=self.lsi_dims,
+                                                   **vars(base_info))
+        return self.result
     
-    res = kmeans_clustering_result(outpath=outpath,
-                                   max_iter=max_iter,
-                                   n_clusters=n_clusters,
-                                   n_init = n_init,
-                                   silhouette_score=score,
-                                   alt_silhouette_score=score,
-                                   name=str(name))
-
-    return res
-
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None):
-
-    n_clusters = list(map(int,n_clusters))
-    n_init  = list(map(int,n_init))
-
-    if type(outdir) is str:
-        outdir = Path(outdir)
 
-    outdir.mkdir(parents=True,exist_ok=True)
+class kmeans_grid_sweep(grid_sweep):
+    """grid_sweep specialised with kmeans_job as the job class and namer as the run namer."""
+    def __init__(self, inpath, outpath, *args, **kwargs):
+        super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs)
+
+    def namer(self, n_clusters, n_init, max_iter):
+        """Build the run name for one (n_clusters, n_init, max_iter) combination."""
+        return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}"
+
+class _kmeans_lsi_grid_sweep(grid_sweep):
+    """grid_sweep of kmeans_lsi_job runs for a single LSI dimension."""
+    def __init__(self, inpath, outpath, lsi_dim, *args, **kwargs):
+        self.jobtype = kmeans_lsi_job
+        self.lsi_dim = lsi_dim
+        super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs)
+
+    def namer(self, *args, **kwargs):
+        """Return the kmeans_grid_sweep run name with an "_lsi-<dim>" suffix."""
+        # The first positional argument is dropped before delegating;
+        # presumably it is the LSI dimension handed to the sweep -- confirm
+        # against grid_sweep.
+        base_name = kmeans_grid_sweep.namer(self, *args[1:], **kwargs)
+        return base_name + f"_lsi-{self.lsi_dim}"
+
+class kmeans_lsi_grid_sweep(lsi_grid_sweep):
+    """lsi_grid_sweep wired to kmeans_lsi_job and _kmeans_lsi_grid_sweep."""
+    def __init__(self, inpath, lsi_dims, outpath, n_clusters, n_inits, max_iters):
+        # The two classes go first; the remaining arguments are forwarded unchanged.
+        sweep_args = (inpath, lsi_dims, outpath, n_clusters, n_inits, max_iters)
+        super().__init__(kmeans_lsi_job, _kmeans_lsi_grid_sweep, *sweep_args)
+
+def test_select_kmeans_clustering():
+    """Smoke-run a small k-means LSI grid sweep.
+
+    Not a unit test: it uses a hard-coded input directory and the relative
+    output directory "test_kmeans".
+    """
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+    outpath = "test_kmeans"
+    n_clusters = [200,300,400]
+    n_init = [1,2,3]
+    max_iter = [100000]
+
+    gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter)
+    gs.run(1)
+    # NOTE(review): no sweep summary is written here; add a gs.save(...) call
+    # if lsi_grid_sweep provides one.
 
-    subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
-    if alt_similarities is not None:
-        alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
-    else:
-        alt_mat = None
-
-    # get list of tuples: the combinations of hyperparameters
-    hyper_grid = product(n_clusters, n_init)
-    hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
-    _do_clustering = partial(do_clustering,  mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
-
-    # call starmap
-    print("running clustering selection")
-    clustering_data = starmap(_do_clustering, hyper_grid)
-    clustering_data = pd.DataFrame(list(clustering_data))
-    clustering_data.to_csv(outinfo)
-    
-    return clustering_data
 
 if __name__ == "__main__":
-    x = fire.Fire(select_kmeans_clustering)
+
+    # Expose the grid sweeps and the single-run jobs as named fire subcommands.
+    fire.Fire({'grid_sweep':kmeans_grid_sweep,
+               'grid_sweep_lsi':kmeans_lsi_grid_sweep,
+               'cluster':kmeans_job,
+               'cluster_lsi':kmeans_lsi_job})

Community Data Science Collective || Want to submit a patch?