-from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
-from functools import partial
from dataclasses import dataclass
-from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
-from multiprocessing import Pool, cpu_count, Array, Process
+from clustering_base import clustering_result, clustering_job
+from grid_sweep import grid_sweep
from pathlib import Path
from itertools import product, starmap
-import numpy as np
-import pandas as pd
import fire
import sys
+import numpy as np
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
@dataclass
class affinity_clustering_result(clustering_result):
    damping:float
    convergence_iter:int
    preference_quantile:float
-
-def affinity_clustering(similarities, output, *args, **kwargs):
- subreddits, mat = read_similarity_mat(similarities)
- clustering = _affinity_clustering(mat, *args, **kwargs)
- cluster_data = process_clustering_result(clustering, subreddits)
- cluster_data['algorithm'] = 'affinity'
- return(cluster_data)
-
-def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
- '''
- similarities: matrix of similarity scores
- preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
- damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
- '''
- print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
-
- preference = np.quantile(mat,preference_quantile)
-
- print(f"preference is {preference}")
- print("data loaded")
- sys.stdout.flush()
- clustering = AffinityPropagation(damping=damping,
- max_iter=max_iter,
- convergence_iter=convergence_iter,
- copy=False,
- preference=preference,
- affinity='precomputed',
- verbose=verbose,
- random_state=random_state).fit(mat)
-
- cluster_data = process_clustering_result(clustering, subreddits)
- output = Path(output)
- output.parent.mkdir(parents=True,exist_ok=True)
- cluster_data.to_feather(output)
- print(f"saved {output}")
- return clustering
-
-
-def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
- if name is None:
- name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
- print(name)
- sys.stdout.flush()
- outpath = outdir / (str(name) + ".feather")
- outpath.parent.mkdir(parents=True,exist_ok=True)
- print(outpath)
- clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
- cluster_data = process_clustering_result(clustering, subreddits)
- mat = sim_to_dist(clustering.affinity_matrix_)
-
- try:
- score = silhouette_score(mat, clustering.labels_, metric='precomputed')
- except ValueError:
- score = None
-
- if alt_mat is not None:
- alt_distances = sim_to_dist(alt_mat)
- try:
- alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
- except ValueError:
- alt_score = None
+    preference:float
+    max_iter:int
+
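+# affinity_job: one AffinityPropagation fit for a single hyperparameter setting; loading the
+# similarity matrix and collecting results are delegated to the clustering_job base class (clustering_base).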
+class affinity_job(clustering_job):
+    def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
+        super().__init__(infile,
+                         outpath,
+                         name,
+                         call=self._affinity_clustering,
+                         preference_quantile=preference_quantile,
+                         damping=damping,
+                         max_iter=max_iter,
+                         convergence_iter=convergence_iter,
+                         random_state=random_state,
+                         verbose=verbose)
+        self.damping=damping
+        self.max_iter=max_iter
+        self.convergence_iter=convergence_iter
+        self.preference_quantile=preference_quantile
+
+    def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
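+        # the matrix passed in by clustering_job is assumed to be a distance matrix, so flip it back to
+        # similarities here; AffinityPropagation with affinity='precomputed' expects similarities.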
+        mat = 1-mat
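+        # preference_quantile controls how many clusters are made: higher quantiles give a higher
+        # preference value, and affinity propagation then selects more exemplars (clusters).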
+        preference = np.quantile(mat, preference_quantile)
+        self.preference = preference
+        print(f"preference is {preference}")
+        print("data loaded")
+        sys.stdout.flush()
+        clustering = AffinityPropagation(*args,
+                                         preference=preference,
+                                         affinity='precomputed',
+                                         copy=False,
+                                         **kwargs).fit(mat)
+        return clustering
+
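+    # get_info extends the generic clustering_result from the base class with the affinity-specific hyperparameters.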
+    def get_info(self):
+        result = super().get_info()
+        self.result = affinity_clustering_result(**result.__dict__,
+                                                 damping=self.damping,
+                                                 max_iter=self.max_iter,
+                                                 convergence_iter=self.convergence_iter,
+                                                 preference_quantile=self.preference_quantile,
+                                                 preference=self.preference)
+
+        return self.result
+
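+# affinity_grid_sweep runs one affinity_job for every combination of the hyperparameter lists it is given.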
+class affinity_grid_sweep(grid_sweep):
+    def __init__(self,
+                 inpath,
+                 outpath,
+                 *args,
+                 **kwargs):
+
+        super().__init__(affinity_job,
+                         inpath,
+                         outpath,
+                         self.namer,
+                         *args,
+                         **kwargs)
+
+    def namer(self,
+              damping,
+              max_iter,
+              convergence_iter,
+              preference_quantile):
+
+        return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
+
+def run_affinity_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5], n_cores=10):
+    """Run affinity clustering once or more with different parameters.
- res = affinity_clustering_result(outpath=outpath,
- damping=damping,
- max_iter=max_iter,
- convergence_iter=convergence_iter,
- preference_quantile=preference_quantile,
- silhouette_score=score,
- alt_silhouette_score=score,
- name=str(name))
-
- return res
-
-# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
-
-def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
-
- damping = list(map(float,damping))
- convergence_iter = convergence_iter = list(map(int,convergence_iter))
- preference_quantile = list(map(float,preference_quantile))
-
- if type(outdir) is str:
- outdir = Path(outdir)
-
- outdir.mkdir(parents=True,exist_ok=True)
-
- subreddits, mat = read_similarity_mat(similarities,use_threads=True)
-
- if alt_similarities is not None:
- alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
- else:
- alt_mat = None
-
- if J is None:
- J = cpu_count()
- pool = Pool(J)
-
- # get list of tuples: the combinations of hyperparameters
- hyper_grid = product(damping, convergence_iter, preference_quantile)
- hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
-
- _do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
+    Usage:
+        affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --max_iters=<csv> --dampings=<csv> --preference_quantiles=<csv>
+
+    Keyword arguments:
+    savefile: path to save the metadata and diagnostics
+    inpath: path to feather data containing a labeled matrix of subreddit similarities.
+    outpath: path to output fit affinity clusterings.
+    dampings: one or more numbers in [0.5, 1). damping parameter in affinity propagation clustering.
+    preference_quantiles: one or more numbers in (0,1) for selecting the 'preference' parameter.
+    convergence_iters: one or more integers giving the number of iterations without improvement before stopping.
+    max_iters: one or more integers giving the maximum number of iterations.
+    """
+    obj = affinity_grid_sweep(inpath,
+                              outpath,
+                              list(map(float,dampings)),
+                              list(map(int,max_iters)),
+                              list(map(int,convergence_iters)),
+                              list(map(float,preference_quantiles)))
+    obj.run(n_cores)
+    obj.save(savefile)
+
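+# smoke test for the sweep; the input path assumes the shared similarity data under /gscratch/comdata is available.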
+def test_affinity_grid_sweep():
+    inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
+    outpath = "test_affinity"
+    dampings=[0.8,0.9]
+    max_iters=[100000]
+    convergence_iters=[15]
+    preference_quantiles=[0.5,0.7]
+
+    gs = affinity_grid_sweep(inpath, outpath, dampings, max_iters, convergence_iters, preference_quantiles)
+    gs.run(20)
+    gs.save("test_affinity/sweep.csv")
- # similarities = Array('d', mat)
- # call pool.starmap
- print("running clustering selection")
- clustering_data = pool.starmap(_do_clustering, hyper_grid)
- clustering_data = pd.DataFrame(list(clustering_data))
- clustering_data.to_csv(outinfo)
- return clustering_data
if __name__ == "__main__":
- x = fire.Fire(select_affinity_clustering)
+    fire.Fire(run_affinity_grid_sweep)