X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/7df8436067dba9a9e6867424002d01593e4bcd25..8a2248fae1ee5818576b9a8f2849d1ad0efd8187:/clustering/clustering_base.py?ds=inline diff --git a/clustering/clustering_base.py b/clustering/clustering_base.py index 1d86438..5492415 100644 --- a/clustering/clustering_base.py +++ b/clustering/clustering_base.py @@ -2,6 +2,9 @@ from pathlib import Path import numpy as np import pandas as pd from dataclasses import dataclass +from sklearn.metrics import silhouette_score, silhouette_samples +from itertools import product, chain +from multiprocessing import Pool, cpu_count def sim_to_dist(mat): dist = 1-mat @@ -9,41 +12,147 @@ def sim_to_dist(mat): np.fill_diagonal(dist,0) return dist -def process_clustering_result(clustering, subreddits): +class grid_sweep: + def __init__(self, jobtype, inpath, outpath, namer, *args): + self.jobtype = jobtype + self.namer = namer + grid = list(product(*args)) + inpath = Path(inpath) + outpath = Path(outpath) + self.hasrun = False + self.grid = [(inpath,outpath,namer(*g)) + g for g in grid] + self.jobs = [jobtype(*g) for g in self.grid] - if hasattr(clustering,'n_iter_'): - print(f"clustering took {clustering.n_iter_} iterations") + def run(self, cores=20): + if cores is not None and cores > 1: + with Pool(cores) as pool: + infos = pool.map(self.jobtype.get_info, self.jobs) + else: + infos = map(self.jobtype.get_info, self.jobs) - clusters = clustering.labels_ + self.infos = pd.DataFrame(infos) + self.hasrun = True - print(f"found {len(set(clusters))} clusters") + def save(self, outcsv): + if not self.hasrun: + self.run() + outcsv = Path(outcsv) + outcsv.parent.mkdir(parents=True, exist_ok=True) + self.infos.to_csv(outcsv) - cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_}) - cluster_sizes = cluster_data.groupby("cluster").count().reset_index() - print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members") +class lsi_grid_sweep(grid_sweep): + def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs): + self.jobtype = jobtype + self.subsweep = subsweep + inpath = Path(inpath) + if lsi_dimensions == 'all': + lsi_paths = list(inpath.glob("*")) + else: + lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions] - print(f"the median cluster has {cluster_sizes.subreddit.median()} members") + lsi_nums = [p.stem for p in lsi_paths] + self.hasrun = False + self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)] + self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids))) - print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member") - print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True) +# this is meant to be an interface, not created directly +class clustering_job: + def __init__(self, infile, outpath, name, call, *args, **kwargs): + self.outpath = Path(outpath) + self.call = call + self.args = args + self.kwargs = kwargs + self.infile = Path(infile) + self.name = name + self.hasrun = False - return cluster_data + def run(self): + self.subreddits, self.mat = self.read_distance_mat(self.infile) + self.clustering = self.call(self.mat, *self.args, **self.kwargs) + self.cluster_data = self.process_clustering(self.clustering, self.subreddits) + self.score = self.silhouette() + self.outpath.mkdir(parents=True, exist_ok=True) + self.cluster_data.to_feather(self.outpath/(self.name + ".feather")) + self.hasrun = True + + def get_info(self): + if not self.hasrun: + self.run() + self.result = clustering_result(outpath=str(self.outpath.resolve()), + silhouette_score=self.score, + name=self.name, + n_clusters=self.n_clusters, + n_isolates=self.n_isolates, + silhouette_samples = str(self.silsampout.resolve()) + ) + return self.result + + def silhouette(self): + isolates = self.clustering.labels_ == -1 + scoremat = self.mat[~isolates][:,~isolates] + score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed') + silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed') + silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp}) + self.outpath.mkdir(parents=True, exist_ok=True) + self.silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather") + silhouette_samp.to_feather(self.silsampout) + return score + + def read_distance_mat(self, similarities, use_threads=True): + df = pd.read_feather(similarities, use_threads=use_threads) + mat = np.array(df.drop('_subreddit',1)) + n = mat.shape[0] + mat[range(n),range(n)] = 1 + return (df._subreddit,1-mat) + + def process_clustering(self, clustering, subreddits): + + if hasattr(clustering,'n_iter_'): + print(f"clustering took {clustering.n_iter_} iterations") + + clusters = clustering.labels_ + self.n_clusters = len(set(clusters)) + + print(f"found {self.n_clusters} clusters") + + cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_}) + + cluster_sizes = cluster_data.groupby("cluster").count().reset_index() + print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members") + + print(f"the median cluster has {cluster_sizes.subreddit.median()} members") + n_isolates1 = (cluster_sizes.subreddit==1).sum() + + print(f"{n_isolates1} clusters have 1 member") + + n_isolates2 = (cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']]) + + print(f"{n_isolates2} subreddits are in cluster -1",flush=True) + + if n_isolates1 == 0: + self.n_isolates = n_isolates2 + else: + self.n_isolates = n_isolates1 + + return cluster_data + + +class lsi_mixin(): + def set_lsi_dims(self, lsi_dims): + self.lsi_dims = lsi_dims @dataclass class clustering_result: outpath:Path - max_iter:int silhouette_score:float - alt_silhouette_score:float name:str n_clusters:int + n_isolates:int + silhouette_samples:str -def read_similarity_mat(similarities, use_threads=True): - df = pd.read_feather(similarities, use_threads=use_threads) - mat = np.array(df.drop('_subreddit',1)) - n = mat.shape[0] - mat[range(n),range(n)] = 1 - return (df._subreddit,mat) +@dataclass +class lsi_result_mixin: + lsi_dimensions:int