code.communitydata.science - cdsc_reddit.git/commitdiff
refactor clustering in object-oriented style
author    Nate E TeBlunthuis <nathante@klone-login01.hyak.local>
Sat, 8 May 2021 05:33:26 +0000 (22:33 -0700)
committer Nate E TeBlunthuis <nathante@klone-login01.hyak.local>
Sat, 8 May 2021 05:33:26 +0000 (22:33 -0700)
clustering/affinity_clustering.py
clustering/clustering_base.py
clustering/hdbscan_clustering.py
clustering/kmeans_clustering.py
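This commit replaces the per-algorithm driver functions (select_affinity_clustering, select_hdbscan_clustering, select_kmeans_clustering) with a small class hierarchy: clustering_job wraps a single fit and its result record, grid_sweep expands a hyperparameter grid into jobs, and the *_lsi_* variants add a sweep over LSI dimensionalities. A rough sketch of driving the new interface from Python, modeled on the test functions added below; the paths and hyperparameter values here are placeholders, not part of the commit:

    # Illustrative only: the input/output paths and the parameter grid are assumptions.
    from affinity_clustering import affinity_lsi_grid_sweep

    gs = affinity_lsi_grid_sweep(
        "path/to/subreddit_similarities_LSI",   # directory of <lsi_dim>.feather similarity matrices
        'all',                                  # sweep every LSI dimensionality found there
        "output/affinity_sweep",                # per-job cluster assignments are written here
        dampings=[0.85, 0.95],
        max_iters=[100000],
        convergence_iters=[30],
        preference_quantiles=[0.5, 0.7])
    gs.run(20)                                  # fit every grid point in a multiprocessing Pool
    gs.save("output/affinity_sweep/sweep.csv")  # one clustering_result row per job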

clustering/affinity_clustering.py
index 287f7e21e84b7ddf6ad87fda6b96431d9ceec922..b4f8461157fc83f78d4211ff2321d28a5a36cf7d 100644 (file)
@@ -2,7 +2,8 @@ from sklearn.metrics import silhouette_score
 from sklearn.cluster import AffinityPropagation
 from functools import partial
 from dataclasses import dataclass
-from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
+from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
 from multiprocessing  import Pool, cpu_count, Array, Process
 from pathlib import Path
 from itertools import product, starmap
@@ -17,116 +18,158 @@ class affinity_clustering_result(clustering_result):
     damping:float
     convergence_iter:int
     preference_quantile:float
+    preference:float
+    max_iter:int
 
-def affinity_clustering(similarities, output, *args, **kwargs):
-    subreddits, mat = read_similarity_mat(similarities)
-    clustering = _affinity_clustering(mat, *args, **kwargs)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    cluster_data['algorithm'] = 'affinity'
-    return(cluster_data)
-
-def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
-    '''
-    similarities: matrix of similarity scores
-    preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
-    damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author. 
-    '''
-    print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
-
-    preference = np.quantile(mat,preference_quantile)
-
-    print(f"preference is {preference}")
-    print("data loaded")
-    sys.stdout.flush()
-    clustering = AffinityPropagation(damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     copy=False,
-                                     preference=preference,
-                                     affinity='precomputed',
-                                     verbose=verbose,
-                                     random_state=random_state).fit(mat)
-
-    cluster_data = process_clustering_result(clustering, subreddits)
-    output = Path(output)
-    output.parent.mkdir(parents=True,exist_ok=True)
-    cluster_data.to_feather(output)
-    print(f"saved {output}")
-    return clustering
-
-
-def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits,  max_iter,  outdir:Path, random_state, verbose, alt_mat, overwrite=False):
-    if name is None:
-        name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
-    print(name)
-    sys.stdout.flush()
-    outpath = outdir / (str(name) + ".feather")
-    outpath.parent.mkdir(parents=True,exist_ok=True)
-    print(outpath)
-    clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    mat = sim_to_dist(clustering.affinity_matrix_)
-
-    try: 
-        score = silhouette_score(mat, clustering.labels_, metric='precomputed')
-    except ValueError:
-        score = None
-
-    if alt_mat is not None:
-        alt_distances = sim_to_dist(alt_mat)
-        try:
-            alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
-        except ValueError:
-            alt_score = None
+@dataclass
+class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin):
+    pass
+
+class affinity_job(clustering_job):
+    def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=self._affinity_clustering,
+                         preference_quantile=preference_quantile,
+                         damping=damping,
+                         max_iter=max_iter,
+                         convergence_iter=convergence_iter,
+                         random_state=random_state,
+                         verbose=verbose)
+        self.damping=damping
+        self.max_iter=max_iter
+        self.convergence_iter=convergence_iter
+        self.preference_quantile=preference_quantile
+
+    def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
+        mat = 1-mat
+        preference = np.quantile(mat, preference_quantile)
+        self.preference = preference
+        print(f"preference is {preference}")
+        print("data loaded")
+        sys.stdout.flush()
+        clustering = AffinityPropagation(*args,
+                                         preference=preference,
+                                         affinity='precomputed',
+                                         copy=False,
+                                         **kwargs).fit(mat)
+        return clustering
+
+    def get_info(self):
+        result = super().get_info()
+        self.result=affinity_clustering_result(**result.__dict__,
+                                               damping=self.damping,
+                                               max_iter=self.max_iter,
+                                               convergence_iter=self.convergence_iter,
+                                               preference_quantile=self.preference_quantile,
+                                               preference=self.preference)
+
+        return self.result
+
+class affinity_lsi_job(affinity_job, lsi_mixin):
+    def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         *args,
+                         **kwargs)
+        super().set_lsi_dims(lsi_dims)
+
+    def get_info(self):
+        result = super().get_info()
+        self.result = affinity_clustering_result_lsi(**result.__dict__,
+                                                     lsi_dimensions=self.lsi_dims)
+        return self.result
+
+class affinity_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,
+                 **kwargs):
+
+        super().__init__(affinity_job,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         *args,
+                         **kwargs)
+    def namer(self,
+              damping,
+              max_iter,
+              convergence_iter,
+              preference_quantile):
+
+        return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
+
+class _affinity_lsi_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 lsi_dim,
+                 *args,
+                 **kwargs):
+        self.lsi_dim = lsi_dim
+        self.jobtype = affinity_lsi_job
+        super().__init__(self.jobtype,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         self.lsi_dim,
+                         *args,
+                         **kwargs)
+
+    def namer(self, *args, **kwargs):
+        s = affinity_grid_sweep.namer(self, *args[1:], **kwargs)
+        s += f"_lsi-{self.lsi_dim}"
+        return s
+
+class affinity_lsi_grid_sweep(lsi_grid_sweep):
+    def __init__(self,
+                 inpath,
+                 lsi_dims,
+                 outpath,
+                 dampings=[0.9],
+                 max_iters=[10000],
+                 convergence_iters=[30],
+                 preference_quantiles=[0.5]):
+
+        super().__init__(affinity_lsi_job,
+                         _affinity_lsi_grid_sweep,
+                         inpath,
+                         lsi_dims,
+                         outpath,
+                         dampings,
+                         max_iters,
+                         convergence_iters,
+                         preference_quantiles)
     
     
-    res = affinity_clustering_result(outpath=outpath,
-                                     damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     preference_quantile=preference_quantile,
-                                     silhouette_score=score,
-                                     alt_silhouette_score=score,
-                                     name=str(name))
-
-    return res
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-
-def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
-
-    damping = list(map(float,damping))
-    convergence_iter = convergence_iter = list(map(int,convergence_iter))
-    preference_quantile = list(map(float,preference_quantile))
-
-    if type(outdir) is str:
-        outdir = Path(outdir)
-
-    outdir.mkdir(parents=True,exist_ok=True)
-
-    subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
-    if alt_similarities is not None:
-        alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
-    else:
-        alt_mat = None
-
-    if J is None:
-        J = cpu_count()
-    pool = Pool(J)
-
-    # get list of tuples: the combinations of hyperparameters
-    hyper_grid = product(damping, convergence_iter, preference_quantile)
-    hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
-    _do_clustering = partial(do_clustering,  mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
+                         
+    
+def test_select_affinity_clustering():
+    # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
+    #                           "test_hdbscan_author30k",
+    #                           min_cluster_sizes=[2],
+    #                           min_samples=[1,2],
+    #                           cluster_selection_epsilons=[0,0.05,0.1,0.15],
+    #                           cluster_selection_methods=['eom','leaf'],
+    #                           lsi_dimensions='all')
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+    outpath = "test_affinity";
+    dampings=[0.8,0.9]
+    max_iters=[100000]
+    convergence_iters=[15]
+    preference_quantiles=[0.5,0.7]
+    
+    gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles)
+    gs.run(20)
+    gs.save("test_affinity/lsi_sweep.csv")
 
 
-    #    similarities = Array('d', mat)
-    # call pool.starmap
-    print("running clustering selection")
-    clustering_data = pool.starmap(_do_clustering, hyper_grid)
-    clustering_data = pd.DataFrame(list(clustering_data))
-    clustering_data.to_csv(outinfo)
-    return clustering_data
 
 if __name__ == "__main__":
 
 if __name__ == "__main__":
-    x = fire.Fire(select_affinity_clustering)
+    fire.Fire({'grid_sweep':affinity_grid_sweep,
+               'grid_sweep_lsi':affinity_lsi_grid_sweep,
+               'cluster':affinity_job,
+               'cluster_lsi':affinity_lsi_job})
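The same classes can also be used one job at a time; a minimal sketch (the file names and parameter values below are placeholders, not from the commit):

    # Illustrative only: infile, outpath and name are assumptions.
    from affinity_clustering import affinity_job

    job = affinity_job(infile="path/to/similarities/500.feather",
                       outpath="output/affinity",
                       name="damp-0.85",
                       damping=0.85,
                       preference_quantile=0.5)
    info = job.get_info()   # runs the fit, writes <name>.feather plus silhouette samples
    print(info.silhouette_score, info.n_clusters)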
clustering/clustering_base.py
index 1d864383cb791c30d803de69aa281be8a966cafe..5492415671846392affed1261f9ede79919b9f27 100644 (file)
@@ -2,6 +2,9 @@ from pathlib import Path
 import numpy as np
 import pandas as pd
 from dataclasses import dataclass
+from sklearn.metrics import silhouette_score, silhouette_samples
+from itertools import product, chain
+from multiprocessing import Pool, cpu_count
 
 def sim_to_dist(mat):
     dist = 1-mat
@@ -9,41 +12,147 @@ def sim_to_dist(mat):
     np.fill_diagonal(dist,0)
     return dist
 
-def process_clustering_result(clustering, subreddits):
+class grid_sweep:
+    def __init__(self, jobtype, inpath, outpath, namer, *args):
+        self.jobtype = jobtype
+        self.namer = namer
+        grid = list(product(*args))
+        inpath = Path(inpath)
+        outpath = Path(outpath)
+        self.hasrun = False
+        self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
+        self.jobs = [jobtype(*g) for g in self.grid]
 
 
-    if hasattr(clustering,'n_iter_'):
-        print(f"clustering took {clustering.n_iter_} iterations")
+    def run(self, cores=20):
+        if cores is not None and cores > 1:
+            with Pool(cores) as pool:
+                infos = pool.map(self.jobtype.get_info, self.jobs)
+        else:
+            infos = map(self.jobtype.get_info, self.jobs)
 
 
-    clusters = clustering.labels_
+        self.infos = pd.DataFrame(infos)
+        self.hasrun = True
 
 
-    print(f"found {len(set(clusters))} clusters")
+    def save(self, outcsv):
+        if not self.hasrun:
+            self.run()
+        outcsv = Path(outcsv)
+        outcsv.parent.mkdir(parents=True, exist_ok=True)
+        self.infos.to_csv(outcsv)
 
 
-    cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
 
 
-    cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
-    print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+class lsi_grid_sweep(grid_sweep):
+    def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
+        self.jobtype = jobtype
+        self.subsweep = subsweep
+        inpath = Path(inpath)
+        if lsi_dimensions == 'all':
+            lsi_paths = list(inpath.glob("*"))
+        else:
+            lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
 
 
-    print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+        lsi_nums = [p.stem for p in lsi_paths]
+        self.hasrun = False
+        self.subgrids = [self.subsweep(lsi_path, outpath,  lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
+        self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
 
 
-    print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
 
 
-    print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
+# this is meant to be an interface, not created directly
+class clustering_job:
+    def __init__(self, infile, outpath, name, call, *args, **kwargs):
+        self.outpath = Path(outpath)
+        self.call = call
+        self.args = args
+        self.kwargs = kwargs
+        self.infile = Path(infile)
+        self.name = name
+        self.hasrun = False
 
 
-    return cluster_data
+    def run(self):
+        self.subreddits, self.mat = self.read_distance_mat(self.infile)
+        self.clustering = self.call(self.mat, *self.args, **self.kwargs)
+        self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
+        self.score = self.silhouette()
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+        self.hasrun = True
+        
+    def get_info(self):
+        if not self.hasrun:
+            self.run()
 
 
+        self.result = clustering_result(outpath=str(self.outpath.resolve()),
+                                        silhouette_score=self.score,
+                                        name=self.name,
+                                        n_clusters=self.n_clusters,
+                                        n_isolates=self.n_isolates,
+                                        silhouette_samples = str(self.silsampout.resolve())
+                                        )
+        return self.result
+
+    def silhouette(self):
+        isolates = self.clustering.labels_ == -1
+        scoremat = self.mat[~isolates][:,~isolates]
+        score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
+        silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
+        silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        self.silsampout = self.outpath / ("silhouette_samples-" + self.name +  ".feather")
+        silhouette_samp.to_feather(self.silsampout)
+        return score
+
+    def read_distance_mat(self, similarities, use_threads=True):
+        df = pd.read_feather(similarities, use_threads=use_threads)
+        mat = np.array(df.drop('_subreddit',1))
+        n = mat.shape[0]
+        mat[range(n),range(n)] = 1
+        return (df._subreddit,1-mat)
+
+    def process_clustering(self, clustering, subreddits):
+
+        if hasattr(clustering,'n_iter_'):
+            print(f"clustering took {clustering.n_iter_} iterations")
+
+        clusters = clustering.labels_
+        self.n_clusters = len(set(clusters))
+
+        print(f"found {self.n_clusters} clusters")
+
+        cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
+
+        cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
+        print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+
+        print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+        n_isolates1 = (cluster_sizes.subreddit==1).sum()
+
+        print(f"{n_isolates1} clusters have 1 member")
+
+        n_isolates2 = (cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])
+
+        print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
+
+        if n_isolates1 == 0:
+            self.n_isolates = n_isolates2
+        else:
+            self.n_isolates = n_isolates1
+
+        return cluster_data
+
+
+class lsi_mixin():
+    def set_lsi_dims(self, lsi_dims):
+        self.lsi_dims = lsi_dims
 
 @dataclass
 class clustering_result:
     outpath:Path
-    max_iter:int
     silhouette_score:float
-    alt_silhouette_score:float
     name:str
     n_clusters:int
+    n_isolates:int
+    silhouette_samples:str
 
 
-def read_similarity_mat(similarities, use_threads=True):
-    df = pd.read_feather(similarities, use_threads=use_threads)
-    mat = np.array(df.drop('_subreddit',1))
-    n = mat.shape[0]
-    mat[range(n),range(n)] = 1
-    return (df._subreddit,mat)
+@dataclass
+class lsi_result_mixin:
+    lsi_dimensions:int
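As the comment in clustering_base.py says, clustering_job is meant as an interface rather than something instantiated directly. A hypothetical subclass for another scikit-learn estimator would follow the same pattern as affinity_job above and hdbscan_job below; DBSCAN, the eps parameter, and these class names are illustrative, not part of the commit:

    # Hypothetical plug-in algorithm, shown only to illustrate the clustering_job interface.
    from dataclasses import dataclass
    from sklearn.cluster import DBSCAN
    from clustering_base import clustering_job, clustering_result

    @dataclass
    class dbscan_clustering_result(clustering_result):
        eps:float

    class dbscan_job(clustering_job):
        def __init__(self, infile, outpath, name, eps=0.5):
            super().__init__(infile,
                             outpath,
                             name,
                             call=dbscan_job._dbscan_clustering,
                             eps=eps)
            self.eps = eps

        def _dbscan_clustering(mat, eps):
            # run() passes in the precomputed distance matrix loaded by read_distance_mat
            return DBSCAN(metric='precomputed', eps=eps).fit(mat)

        def get_info(self):
            result = super().get_info()
            self.result = dbscan_clustering_result(**result.__dict__, eps=self.eps)
            return self.result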
clustering/hdbscan_clustering.py
index 4f4e0d6f2c4f18b47d3d96ac0991fbc72fdb6aef..f0ee7038c75bab4df7960f5d49b68518aac85345 100644 (file)
@@ -1,10 +1,11 @@
 from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
 from dataclasses import dataclass
 import hdbscan
 from sklearn.neighbors import NearestNeighbors
 import plotnine as pn
 import numpy as np
-from itertools import product, starmap
+from itertools import product, starmap, chain
 import pandas as pd
 from sklearn.metrics import silhouette_score, silhouette_samples
 from pathlib import Path
@@ -13,27 +14,88 @@ import fire
 from pyarrow.feather import write_feather
 
 def test_select_hdbscan_clustering():
-    select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
-                              "test_hdbscan_author30k",
-                              min_cluster_sizes=[2],
-                              min_samples=[1,2],
-                              cluster_selection_epsilons=[0,0.05,0.1,0.15],
-                              cluster_selection_methods=['eom','leaf'],
-                              lsi_dimensions='all')
-    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI"
+    select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
+                              "test_hdbscan_author30k",
+                              min_cluster_sizes=[2],
+                              min_samples=[1,2],
+                              cluster_selection_epsilons=[0,0.05,0.1,0.15],
+                              cluster_selection_methods=['eom','leaf'],
+                              lsi_dimensions='all')
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
     outpath = "test_hdbscan";
     min_cluster_sizes=[2,3,4];
     min_samples=[1,2,3];
     cluster_selection_epsilons=[0,0.1,0.3,0.5];
     cluster_selection_methods=['eom'];
     lsi_dimensions='all'
     outpath = "test_hdbscan";
     min_cluster_sizes=[2,3,4];
     min_samples=[1,2,3];
     cluster_selection_epsilons=[0,0.1,0.3,0.5];
     cluster_selection_methods=['eom'];
     lsi_dimensions='all'
+    gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
+    gs.run(20)
+    gs.save("test_hdbscan/lsi_sweep.csv")
+    # job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
+    # job1.run()
+    # print(job1.get_info())
 
 
-    df = pd.read_csv("test_hdbscan/selection_data.csv")
-    test_select_hdbscan_clustering()
-    check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
-    silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
-    c = check_clusters.merge(silscores,on='subreddit')#    fire.Fire(select_hdbscan_clustering)
+    df = pd.read_csv("test_hdbscan/selection_data.csv")
+    test_select_hdbscan_clustering()
+    check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
+    silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
+    c = check_clusters.merge(silscores,on='subreddit')#    fire.Fire(select_hdbscan_clustering)
 
 
+class hdbscan_lsi_grid_sweep(lsi_grid_sweep):
+    def __init__(self,
+                 inpath,
+                 lsi_dims,
+                 outpath,
+                 min_cluster_sizes,
+                 min_samples,
+                 cluster_selection_epsilons,
+                 cluster_selection_methods
+                 ):
+
+        super().__init__(hdbscan_lsi_job,
+                         _hdbscan_lsi_grid_sweep,
+                         inpath,
+                         lsi_dims,
+                         outpath,
+                         min_cluster_sizes,
+                         min_samples,
+                         cluster_selection_epsilons,
+                         cluster_selection_methods)
+        
+class hdbscan_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,
+                 **kwargs):
+
+        super().__init__(hdbscan_job, inpath, outpath, self.namer, *args, **kwargs)
+
+    def namer(self,
+              min_cluster_size,
+              min_samples,
+              cluster_selection_epsilon,
+              cluster_selection_method):
+        return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}"
+
+
+class _hdbscan_lsi_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 lsi_dim,
+                 *args,
+                 **kwargs):
+
+        self.lsi_dim = lsi_dim
+        self.jobtype = hdbscan_lsi_job
+        super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs)
+
+
+    def namer(self, *args, **kwargs):
+        s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs)
+        s += f"_lsi-{self.lsi_dim}"
+        return s
 
 @dataclass
 class hdbscan_clustering_result(clustering_result):
@@ -41,107 +103,166 @@ class hdbscan_clustering_result(clustering_result):
     min_samples:int
     cluster_selection_epsilon:float
     cluster_selection_method:str
-    lsi_dimensions:int
-    n_isolates:int
-    silhouette_samples:str
-
-def select_hdbscan_clustering(inpath,
-                              outpath,
-                              outfile=None,
-                              min_cluster_sizes=[2],
-                              min_samples=[1],
-                              cluster_selection_epsilons=[0],
-                              cluster_selection_methods=['eom'],
-                              lsi_dimensions='all'
-                              ):
-
-    inpath = Path(inpath)
-    outpath = Path(outpath)
-    outpath.mkdir(exist_ok=True, parents=True)
+
+@dataclass
+class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin):
+    pass 
+
+class hdbscan_job(clustering_job):
+    def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=hdbscan_job._hdbscan_clustering,
+                         min_cluster_size=min_cluster_size,
+                         min_samples=min_samples,
+                         cluster_selection_epsilon=cluster_selection_epsilon,
+                         cluster_selection_method=cluster_selection_method
+                         )
+
+        self.min_cluster_size = min_cluster_size
+        self.min_samples = min_samples
+        self.cluster_selection_epsilon = cluster_selection_epsilon
+        self.cluster_selection_method = cluster_selection_method
+#        self.mat = 1 - self.mat
+
+    def _hdbscan_clustering(mat, *args, **kwargs):
+        print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
+        print(mat)
+        clusterer = hdbscan.HDBSCAN(metric='precomputed',
+                                    core_dist_n_jobs=cpu_count(),
+                                    *args,
+                                    **kwargs,
+                                    )
     
     
-    if lsi_dimensions == 'all':
-        lsi_paths = list(inpath.glob("*"))
+        clustering = clusterer.fit(mat.astype('double'))
+    
+        return(clustering)
+
+    def get_info(self):
+        result = super().get_info()
+        self.result = hdbscan_clustering_result(**result.__dict__,
+                                                min_cluster_size=self.min_cluster_size,
+                                                min_samples=self.min_samples,
+                                                cluster_selection_epsilon=self.cluster_selection_epsilon,
+                                                cluster_selection_method=self.cluster_selection_method)
+        return self.result
+
+class hdbscan_lsi_job(hdbscan_job, lsi_mixin):
+    def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+        super().__init__(
+                         infile,
+                         outpath,
+                         name,
+                         *args,
+                         **kwargs)
+        super().set_lsi_dims(lsi_dims)
+
+    def get_info(self):
+        partial_result = super().get_info()
+        self.result = hdbscan_clustering_result_lsi(**partial_result.__dict__,
+                                                    lsi_dimensions=self.lsi_dims)
+        return self.result
+
+# def select_hdbscan_clustering(inpath,
+#                               outpath,
+#                               outfile=None,
+#                               min_cluster_sizes=[2],
+#                               min_samples=[1],
+#                               cluster_selection_epsilons=[0],
+#                               cluster_selection_methods=['eom'],
+#                               lsi_dimensions='all'
+#                               ):
+
+#     inpath = Path(inpath)
+#     outpath = Path(outpath)
+#     outpath.mkdir(exist_ok=True, parents=True)
+    
+#     if lsi_dimensions is None:
+#         lsi_paths = [inpath]
+#     elif lsi_dimensions == 'all':
+#         lsi_paths = list(inpath.glob("*"))
 
 
-    else:
-        lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
+    else:
+        lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
 
 
-    lsi_nums = [p.stem for p in lsi_paths]
-    grid = list(product(lsi_nums,
-                        min_cluster_sizes,
-                        min_samples,
-                        cluster_selection_epsilons,
-                        cluster_selection_methods))
+#     if lsi_dimensions is not None:
+#         lsi_nums = [p.stem for p in lsi_paths]
+#     else:
+#         lsi_nums = [None]
+#     grid = list(product(lsi_nums,
+#                         min_cluster_sizes,
+#                         min_samples,
+#                         cluster_selection_epsilons,
+#                         cluster_selection_methods))
 
 
-    # fix the output file names
-    names = list(map(lambda t:'_'.join(map(str,t)),grid))
+    # fix the output file names
+    names = list(map(lambda t:'_'.join(map(str,t)),grid))
 
 
-    grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
+    grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
         
         
-    with Pool(int(cpu_count()/4)) as pool:
-        mods = starmap(hdbscan_clustering, grid)
+    with Pool(int(cpu_count()/4)) as pool:
+        mods = starmap(hdbscan_clustering, grid)
 
 
-    res = pd.DataFrame(mods)
-    if outfile is None:
-        outfile = outpath / "selection_data.csv"
+    res = pd.DataFrame(mods)
+    if outfile is None:
+        outfile = outpath / "selection_data.csv"
 
 
-    res.to_csv(outfile)
+    res.to_csv(outfile)
 
 
-def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
-    subreddits, mat = read_similarity_mat(similarities)
-    mat = sim_to_dist(mat)
-    clustering = _hdbscan_clustering(mat,
-                                     min_cluster_size=min_cluster_size,
-                                     min_samples=min_samples,
-                                     cluster_selection_epsilon=cluster_selection_epsilon,
-                                     cluster_selection_method=cluster_selection_method,
-                                     metric='precomputed',
-                                     core_dist_n_jobs=cpu_count()
-                                     )
-
-    cluster_data = process_clustering_result(clustering, subreddits)
-    isolates = clustering.labels_ == -1
-    scoremat = mat[~isolates][:,~isolates]
-    score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
-    cluster_data.to_feather(output)
-
-    silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
-    silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
-    silsampout = output.parent / ("silhouette_samples" + output.name)
-    silhouette_samp.to_feather(silsampout)
-
-    result = hdbscan_clustering_result(outpath=output,
-                                       max_iter=None,
-                                       silhouette_samples=silsampout,
-                                       silhouette_score=score,
-                                       alt_silhouette_score=score,
-                                       name=name,
-                                       min_cluster_size=min_cluster_size,
-                                       min_samples=min_samples,
-                                       cluster_selection_epsilon=cluster_selection_epsilon,
-                                       cluster_selection_method=cluster_selection_method,
-                                       lsi_dimensions=lsi_dim,
-                                       n_isolates=isolates.sum(),
-                                       n_clusters=len(set(clustering.labels_))
-                                   )
+# def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
+#     subreddits, mat = read_similarity_mat(similarities)
+#     mat = sim_to_dist(mat)
+#     clustering = _hdbscan_clustering(mat,
+#                                      min_cluster_size=min_cluster_size,
+#                                      min_samples=min_samples,
+#                                      cluster_selection_epsilon=cluster_selection_epsilon,
+#                                      cluster_selection_method=cluster_selection_method,
+#                                      metric='precomputed',
+#                                      core_dist_n_jobs=cpu_count()
+#                                      )
+
+#     cluster_data = process_clustering_result(clustering, subreddits)
+#     isolates = clustering.labels_ == -1
+#     scoremat = mat[~isolates][:,~isolates]
+#     score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
+#     cluster_data.to_feather(output)
+#     silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
+#     silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
+#     silsampout = output.parent / ("silhouette_samples" + output.name)
+#     silhouette_samp.to_feather(silsampout)
+
+#     result = hdbscan_clustering_result(outpath=output,
+#                                        silhouette_samples=silsampout,
+#                                        silhouette_score=score,
+#                                        name=name,
+#                                        min_cluster_size=min_cluster_size,
+#                                        min_samples=min_samples,
+#                                        cluster_selection_epsilon=cluster_selection_epsilon,
+#                                        cluster_selection_method=cluster_selection_method,
+#                                        lsi_dimensions=lsi_dim,
+#                                        n_isolates=isolates.sum(),
+#                                        n_clusters=len(set(clustering.labels_))
+#                                    )
 
 
                                        
 
 
                                        
-    return(result)
-
-# for all runs we should try cluster_selection_epsilon = None
-# for terms we should try cluster_selection_epsilon around 0.56-0.66
-# for authors we should try cluster_selection_epsilon around 0.98-0.99
-def _hdbscan_clustering(mat, *args, **kwargs):
-    print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
-
-    print(mat)
-    clusterer = hdbscan.HDBSCAN(*args,
-                                **kwargs,
-                                )
+    return(result)
+
+# for all runs we should try cluster_selection_epsilon = None
+# for terms we should try cluster_selection_epsilon around 0.56-0.66
+# for authors we should try cluster_selection_epsilon around 0.98-0.99
+def _hdbscan_clustering(mat, *args, **kwargs):
+    print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
+
+    print(mat)
+    clusterer = hdbscan.HDBSCAN(*args,
+                                **kwargs,
+                                )
     
     
-    clustering = clusterer.fit(mat.astype('double'))
+    clustering = clusterer.fit(mat.astype('double'))
     
     
-    return(clustering)
+    return(clustering)
 
 def KNN_distances_plot(mat,outname,k=2):
     nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
@@ -172,4 +293,10 @@ def make_KNN_plots():
     KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')

 if __name__ == "__main__":
-    fire.Fire(select_hdbscan_clustering)
+    fire.Fire({'grid_sweep':hdbscan_grid_sweep,
+               'grid_sweep_lsi':hdbscan_lsi_grid_sweep,
+               'cluster':hdbscan_job,
+               'cluster_lsi':hdbscan_lsi_job})
+    
+#    test_select_hdbscan_clustering()
+    #fire.Fire(select_hdbscan_clustering)  
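Following the cluster_selection_epsilon suggestions in the comments above (roughly 0.56-0.66 for term similarities), a non-LSI sweep over a single similarity matrix could be configured like this; the paths and the exact grid are placeholders, not from the commit:

    # Illustrative only: paths and grid values are assumptions.
    from hdbscan_clustering import hdbscan_grid_sweep

    gs = hdbscan_grid_sweep("path/to/term_similarities.feather",
                            "output/hdbscan_terms",
                            [2],                  # min_cluster_sizes
                            [1, 2],               # min_samples
                            [0.56, 0.6, 0.66],    # cluster_selection_epsilons
                            ['eom'])              # cluster_selection_methods
    gs.run(4)
    gs.save("output/hdbscan_terms/sweep.csv")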
clustering/kmeans_clustering.py
index 8822e9f0cd67c2ece76d9552d7c2c77883bac76f..e41b88bff810bcde4937b0b4dff83e5f827f8761 100644 (file)
@@ -4,98 +4,145 @@ from pathlib import Path
 from multiprocessing import cpu_count
 from dataclasses import dataclass
 from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
+from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
+
 
 @dataclass
 class kmeans_clustering_result(clustering_result):
     n_clusters:int
     n_init:int
+    max_iter:int
 
 
-def kmeans_clustering(similarities, *args, **kwargs):
-    subreddits, mat = read_similarity_mat(similarities)
-    mat = sim_to_dist(mat)
-    clustering = _kmeans_clustering(mat, *args, **kwargs)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    return(cluster_data)
-
-def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
-
-    clustering = KMeans(n_clusters=n_clusters,
-                        n_init=n_init,
-                        max_iter=max_iter,
-                        random_state=random_state,
-                        verbose=verbose
-                        ).fit(mat)
-
-    return clustering
-
-def do_clustering(n_clusters, n_init, name, mat, subreddits,  max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
-    if name is None:
-        name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
-    print(name)
-    sys.stdout.flush()
-    outpath = outdir / (str(name) + ".feather")
-    print(outpath)
-    mat = sim_to_dist(mat)
-    clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose)
-
-    outpath.parent.mkdir(parents=True,exist_ok=True)
-    cluster_data.to_feather(outpath)
-    cluster_data = process_clustering_result(clustering, subreddits)
-
-    try: 
-        score = silhouette_score(mat, clustering.labels_, metric='precomputed')
-    except ValueError:
-        score = None
-
-    if alt_mat is not None:
-        alt_distances = sim_to_dist(alt_mat)
-        try:
-            alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
-        except ValueError:
-            alt_score = None
+@dataclass
+class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin):
+    pass
+
+class kmeans_job(clustering_job):
+    def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=kmeans_job._kmeans_clustering,
+                         n_clusters=n_clusters,
+                         n_init=n_init,
+                         max_iter=max_iter,
+                         random_state=random_state,
+                         verbose=verbose)
+
+        self.n_clusters=n_clusters
+        self.n_init=n_init
+        self.max_iter=max_iter
+
+    def _kmeans_clustering(mat, *args, **kwargs):
+
+        clustering = KMeans(*args,
+                            **kwargs,
+                            ).fit(mat)
+
+        return clustering
+
+
+    def get_info(self):
+        result = super().get_info()
+        self.result = kmeans_clustering_result(**result.__dict__,
+                                               n_init=self.n_init,
+                                               max_iter=self.max_iter)
+        return self.result
+
+
+class kmeans_lsi_job(kmeans_job, lsi_mixin):
+    def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         *args,
+                         **kwargs)
+        super().set_lsi_dims(lsi_dims)
+
+    def get_info(self):
+        result = super().get_info()
+        self.result = kmeans_clustering_result_lsi(**result.__dict__,
+                                                   lsi_dimensions=self.lsi_dims)
+        return self.result
     
     
-    res = kmeans_clustering_result(outpath=outpath,
-                                   max_iter=max_iter,
-                                   n_clusters=n_clusters,
-                                   n_init = n_init,
-                                   silhouette_score=score,
-                                   alt_silhouette_score=score,
-                                   name=str(name))
-
-    return res
-
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None):
-
-    n_clusters = list(map(int,n_clusters))
-    n_init  = list(map(int,n_init))
-
-    if type(outdir) is str:
-        outdir = Path(outdir)
 
 
-    outdir.mkdir(parents=True,exist_ok=True)
+class kmeans_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,
+                 **kwargs):
+        super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs)
+
+    def namer(self,
+             n_clusters,
+             n_init,
+             max_iter):
+        return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}"
+
+class _kmeans_lsi_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 lsi_dim,
+                 *args,
+                 **kwargs):
+        self.lsi_dim = lsi_dim
+        self.jobtype = kmeans_lsi_job
+        super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs)
+
+    def namer(self, *args, **kwargs):
+        s = kmeans_grid_sweep.namer(self, *args[1:], **kwargs)
+        s += f"_lsi-{self.lsi_dim}"
+        return s
+
+class kmeans_lsi_grid_sweep(lsi_grid_sweep):
+    def __init__(self,
+                 inpath,
+                 lsi_dims,
+                 outpath,
+                 n_clusters,
+                 n_inits,
+                 max_iters
+                 ):
+
+        super().__init__(kmeans_lsi_job,
+                         _kmeans_lsi_grid_sweep,
+                         inpath,
+                         lsi_dims,
+                         outpath,
+                         n_clusters,
+                         n_inits,
+                         max_iters)
+
+def test_select_kmeans_clustering():
+    # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
+    #                           "test_hdbscan_author30k",
+    #                           min_cluster_sizes=[2],
+    #                           min_samples=[1,2],
+    #                           cluster_selection_epsilons=[0,0.05,0.1,0.15],
+    #                           cluster_selection_methods=['eom','leaf'],
+    #                           lsi_dimensions='all')
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+    outpath = "test_kmeans";
+    n_clusters=[200,300,400];
+    n_init=[1,2,3];
+    max_iter=[100000]
+
+    gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter)
+    gs.run(1)
+
+    gs.save("test_kmeans/lsi_sweep.csv")
 
 
-    subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
-    if alt_similarities is not None:
-        alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
-    else:
-        alt_mat = None
-
-    # get list of tuples: the combinations of hyperparameters
-    hyper_grid = product(n_clusters, n_init)
-    hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
-    _do_clustering = partial(do_clustering,  mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
-
-    # call starmap
-    print("running clustering selection")
-    clustering_data = starmap(_do_clustering, hyper_grid)
-    clustering_data = pd.DataFrame(list(clustering_data))
-    clustering_data.to_csv(outinfo)
-    
-    return clustering_data
 
 if __name__ == "__main__":
 
 if __name__ == "__main__":
-    x = fire.Fire(select_kmeans_clustering)
+
+    fire.Fire({'grid_sweep':kmeans_grid_sweep,
+               'grid_sweep_lsi':kmeans_lsi_grid_sweep,
+               'cluster':kmeans_job,
+               'cluster_lsi':kmeans_lsi_job})
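The k-means entry points are driven the same way; a sketch of a non-LSI sweep (paths and grid values are placeholders, not from the commit):

    # Illustrative only: paths and grid values are assumptions.
    from kmeans_clustering import kmeans_grid_sweep

    gs = kmeans_grid_sweep("path/to/similarities.feather",
                           "output/kmeans",
                           [100, 200, 300],   # n_clusters
                           [3],               # n_inits
                           [10000])           # max_iters
    gs.run(4)
    gs.save("output/kmeans/sweep.csv")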
