]> code.communitydata.science - cdsc_reddit.git/blobdiff - clustering/affinity_clustering.py
lsi support for weekly similarities
[cdsc_reddit.git] / clustering / affinity_clustering.py
index 287f7e21e84b7ddf6ad87fda6b96431d9ceec922..737967e964b2cad824837be7a6da74bf74c4437b 100644 (file)
@@ -1,15 +1,12 @@
-from sklearn.metrics import silhouette_score
 from sklearn.cluster import AffinityPropagation
-from functools import partial
 from dataclasses import dataclass
-from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
-from multiprocessing  import Pool, cpu_count, Array, Process
+from clustering_base import clustering_result, clustering_job
+from grid_sweep import grid_sweep
 from pathlib import Path
 from itertools import product, starmap
-import numpy as np
-import pandas as pd
 import fire
 import sys
+import numpy as np
 
 # silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying. 
 @dataclass
@@ -17,116 +14,116 @@ class affinity_clustering_result(clustering_result):
     damping:float
     convergence_iter:int
     preference_quantile:float
-
-def affinity_clustering(similarities, output, *args, **kwargs):
-    subreddits, mat = read_similarity_mat(similarities)
-    clustering = _affinity_clustering(mat, *args, **kwargs)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    cluster_data['algorithm'] = 'affinity'
-    return(cluster_data)
-
-def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
-    '''
-    similarities: matrix of similarity scores
-    preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
-    damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author. 
-    '''
-    print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
-
-    preference = np.quantile(mat,preference_quantile)
-
-    print(f"preference is {preference}")
-    print("data loaded")
-    sys.stdout.flush()
-    clustering = AffinityPropagation(damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     copy=False,
-                                     preference=preference,
-                                     affinity='precomputed',
-                                     verbose=verbose,
-                                     random_state=random_state).fit(mat)
-
-    cluster_data = process_clustering_result(clustering, subreddits)
-    output = Path(output)
-    output.parent.mkdir(parents=True,exist_ok=True)
-    cluster_data.to_feather(output)
-    print(f"saved {output}")
-    return clustering
-
-
-def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits,  max_iter,  outdir:Path, random_state, verbose, alt_mat, overwrite=False):
-    if name is None:
-        name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
-    print(name)
-    sys.stdout.flush()
-    outpath = outdir / (str(name) + ".feather")
-    outpath.parent.mkdir(parents=True,exist_ok=True)
-    print(outpath)
-    clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
-    cluster_data = process_clustering_result(clustering, subreddits)
-    mat = sim_to_dist(clustering.affinity_matrix_)
-
-    try: 
-        score = silhouette_score(mat, clustering.labels_, metric='precomputed')
-    except ValueError:
-        score = None
-
-    if alt_mat is not None:
-        alt_distances = sim_to_dist(alt_mat)
-        try:
-            alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
-        except ValueError:
-            alt_score = None
+    preference:float
+    max_iter:int
+
+class affinity_job(clustering_job):  # one AffinityPropagation fit for a single hyperparameter setting
+    def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=self._affinity_clustering,  # fitting callable handed to clustering_job
+                         preference_quantile=preference_quantile,
+                         damping=damping,
+                         max_iter=max_iter,
+                         convergence_iter=convergence_iter,
+                         random_state=random_state,  # was hard-coded to 1968, which silently ignored the caller's seed
+                         verbose=verbose)
+        self.damping=damping  # hyperparameters are kept on the job so get_info() can report them
+        self.max_iter=max_iter
+        self.convergence_iter=convergence_iter
+        self.preference_quantile=preference_quantile
+
+    def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):  # returns the fitted AffinityPropagation estimator
+        mat = 1-mat  # NOTE(review): affinity='precomputed' expects similarities; presumably mat arrives as distances (1 - similarity) -- confirm against clustering_job
+        preference = np.quantile(mat, preference_quantile)  # preference = the requested quantile of all matrix entries
+        self.preference = preference  # only set once a fit has run; get_info() reads it
+        print(f"preference is {preference}")
+        print("data loaded")
+        sys.stdout.flush()
+        clustering = AffinityPropagation(*args,  # remaining args/kwargs are forwarded to the estimator unchanged
+                                         preference=preference,
+                                         affinity='precomputed',
+                                         copy=False,
+                                         **kwargs).fit(mat)
+        return clustering
+
+    def get_info(self):  # returns (and stores on self.result) an affinity_clustering_result
+        result = super().get_info()
+        self.result=affinity_clustering_result(**result.__dict__,  # base-class result fields plus the affinity-specific ones below
+                                               damping=self.damping,
+                                               max_iter=self.max_iter,
+                                               convergence_iter=self.convergence_iter,
+                                               preference_quantile=self.preference_quantile,
+                                               preference=self.preference)  # raises AttributeError if _affinity_clustering has not run yet
+
+        return self.result
+
+class affinity_grid_sweep(grid_sweep):  # grid_sweep specialised to build affinity_job instances
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,  # extra positional/keyword arguments are forwarded to grid_sweep unchanged
+                 **kwargs):
+
+        # The undefined name `_afffinity_grid_sweep` used to follow affinity_job here and raised NameError; it is dropped. NOTE(review): confirm grid_sweep's positional order.
+        super().__init__(affinity_job,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         *args,
+                         **kwargs)
+    def namer(self,  # builds the name string for one combination of hyperparameters
+              damping,
+              max_iter,
+              convergence_iter,
+              preference_quantile):
+
+        return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
+
+def run_affinity_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5],n_cores=10):
+    """Run affinity clustering once or more with different parameters.
     
-    res = affinity_clustering_result(outpath=outpath,
-                                     damping=damping,
-                                     max_iter=max_iter,
-                                     convergence_iter=convergence_iter,
-                                     preference_quantile=preference_quantile,
-                                     silhouette_score=score,
-                                     alt_silhouette_score=score,
-                                     name=str(name))
-
-    return res
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-
-def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
-
-    damping = list(map(float,damping))
-    convergence_iter = convergence_iter = list(map(int,convergence_iter))
-    preference_quantile = list(map(float,preference_quantile))
-
-    if type(outdir) is str:
-        outdir = Path(outdir)
-
-    outdir.mkdir(parents=True,exist_ok=True)
-
-    subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
-    if alt_similarities is not None:
-        alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
-    else:
-        alt_mat = None
-
-    if J is None:
-        J = cpu_count()
-    pool = Pool(J)
-
-    # get list of tuples: the combinations of hyperparameters
-    hyper_grid = product(damping, convergence_iter, preference_quantile)
-    hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
-    _do_clustering = partial(do_clustering,  mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
+    Usage:
+    affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --max_iters=<csv> --dampings=<csv> --preference_quantiles=<csv>
+
+    Keyword arguments:
+    savefile: path to save the metadata and diagnostics 
+    inpath: path to feather data containing a labeled matrix of subreddit similarities.
+    outpath: path to output fit affinity propagation clusterings.
+    dampings:one or more numbers in [0.5, 1). damping parameter in affinity propagation clustering.
+    preference_quantiles:one or more numbers in (0,1) for selecting the 'preference' parameter.
+    convergence_iters:one or more integers of number of iterations without improvement before stopping.
+    max_iters: one or more integers giving the maximum number of iterations.
+    """
+    obj = affinity_grid_sweep(inpath,
+                         outpath,
+                         map(float,dampings),
+                         map(int,max_iters),
+                         map(int,convergence_iters),
+                         map(float,preference_quantiles))
+    obj.run(n_cores)
+    obj.save(savefile)
+    
+def test_select_affinity_clustering():  # manual smoke test with hard-coded paths; the __main__ entry point does not call it
+    # select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
+    #                           "test_hdbscan_author30k",
+    #                           min_cluster_sizes=[2],
+    #                           min_samples=[1,2],
+    #                           cluster_selection_epsilons=[0,0.05,0.1,0.15],
+    #                           cluster_selection_methods=['eom','leaf'],
+    #                           lsi_dimensions='all')
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+    outpath = "test_affinity";
+    dampings=[0.8,0.9]
+    max_iters=[100000]
+    convergence_iters=[15]
+    preference_quantiles=[0.5,0.7]
+    
+    gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles)  # NOTE(review): affinity_lsi_grid_sweep is not defined or imported in the lines visible in this diff -- NameError as written; confirm the intended class
+    gs.run(20)  # same call shape as obj.run(n_cores) in run_affinity_grid_sweep
+    gs.save("test_affinity/lsi_sweep.csv")
 
-    #    similarities = Array('d', mat)
-    # call pool.starmap
-    print("running clustering selection")
-    clustering_data = pool.starmap(_do_clustering, hyper_grid)
-    clustering_data = pd.DataFrame(list(clustering_data))
-    clustering_data.to_csv(outinfo)
-    return clustering_data
 
 if __name__ == "__main__":
-    x = fire.Fire(select_affinity_clustering)
+    fire.Fire(run_affinity_grid_sweep)

Community Data Science Collective || Want to submit a patch?