# X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/47ba04aa9715325a67fe17cee205230b042022fe..8a2248fae1ee5818576b9a8f2849d1ad0efd8187:/clustering/clustering_base.py?ds=sidebyside
# Reconstructed from: diff --git a/clustering/clustering_base.py b/clustering/clustering_base.py
# (new file mode 100644, index 0000000..5492415, hunk @@ -0,0 +1,158 @@)
"""Shared building blocks for clustering jobs and parameter grid sweeps.

A ``clustering_job`` reads a feather file holding a similarity matrix, runs a
clustering callable on the derived distance matrix, writes the cluster
assignments and per-sample silhouette scores, and summarises the run as a
``clustering_result``.  ``grid_sweep`` / ``lsi_grid_sweep`` build many such
jobs from a parameter grid and collect their summaries into one table.
"""
from pathlib import Path
from dataclasses import dataclass
from itertools import product, chain
from multiprocessing import Pool, cpu_count

import numpy as np
import pandas as pd
from sklearn.metrics import silhouette_score, silhouette_samples


def sim_to_dist(mat):
    """Convert a similarity matrix into a distance matrix.

    Computes ``1 - mat`` into a new array, clips negative entries to 0 and
    zeroes the diagonal.  ``mat`` itself is not modified.
    """
    dist = 1 - mat
    dist[dist < 0] = 0
    np.fill_diagonal(dist, 0)
    return dist


class grid_sweep:
    """Run one job per point of a parameter grid and tabulate the results."""

    def __init__(self, jobtype, inpath, outpath, namer, *args):
        """Build the job list.

        Parameters
        ----------
        jobtype : class
            Called as ``jobtype(inpath, outpath, name, *params)`` for every
            grid point; must provide a ``get_info`` method.
        inpath, outpath : path-like
            Passed to every job (converted to ``Path`` first).
        namer : callable
            ``namer(*params)`` returns the name for one grid point.
        *args : iterables
            One iterable of candidate values per parameter; the grid is
            their Cartesian product.
        """
        self.jobtype = jobtype
        self.namer = namer
        grid = list(product(*args))
        inpath = Path(inpath)
        outpath = Path(outpath)
        self.hasrun = False
        self.grid = [(inpath, outpath, namer(*g)) + g for g in grid]
        self.jobs = [jobtype(*g) for g in self.grid]

    def run(self, cores=20):
        """Execute every job and store the results in ``self.infos``.

        With ``cores`` greater than 1 the jobs are mapped over a
        ``multiprocessing.Pool`` of that size; otherwise (including
        ``cores=None``) they run serially in this process.
        """
        if cores is not None and cores > 1:
            with Pool(cores) as pool:
                infos = pool.map(self.jobtype.get_info, self.jobs)
        else:
            # Materialise the results so the DataFrame constructor receives
            # a concrete list, as it does in the pool branch.
            infos = [self.jobtype.get_info(job) for job in self.jobs]

        self.infos = pd.DataFrame(infos)
        self.hasrun = True

    def save(self, outcsv):
        """Write ``self.infos`` to ``outcsv``, running the sweep first if needed.

        The parent directory of ``outcsv`` is created when missing.
        """
        if not self.hasrun:
            self.run()
        outcsv = Path(outcsv)
        outcsv.parent.mkdir(parents=True, exist_ok=True)
        self.infos.to_csv(outcsv)


class lsi_grid_sweep(grid_sweep):
    """A grid sweep repeated over several LSI input files."""

    def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
        """Create one sub-sweep per LSI file and pool their jobs.

        Parameters
        ----------
        jobtype : class
            Job class whose ``get_info`` is used by the inherited ``run``.
        subsweep : callable
            Called as ``subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs)``
            for each input file; the returned object must expose ``jobs``.
        inpath : path-like
            Directory containing the per-dimension input files.
        lsi_dimensions : 'all' or iterable
            ``'all'`` uses every entry found in ``inpath``; otherwise each
            element ``dim`` selects ``inpath / "<dim>.feather"``.
        outpath : path-like
            Forwarded unchanged to every sub-sweep.
        """
        self.jobtype = jobtype
        self.subsweep = subsweep
        inpath = Path(inpath)
        if lsi_dimensions == 'all':
            lsi_paths = list(inpath.glob("*"))
        else:
            # str() lets callers pass dimensions as ints as well as strings;
            # plain `dim + '.feather'` raised TypeError for ints.
            lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]

        # The file stem (e.g. "600" for "600.feather") identifies the dimension.
        lsi_nums = [p.stem for p in lsi_paths]
        self.hasrun = False
        self.subgrids = [
            self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs)
            for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)
        ]
        self.jobs = list(chain.from_iterable(gs.jobs for gs in self.subgrids))


# this is meant to be an interface, not created directly
class clustering_job:
    """One clustering run: read input, cluster, score, and write outputs."""

    def __init__(self, infile, outpath, name, call, *args, **kwargs):
        """Store the job configuration; nothing is read or computed yet.

        Parameters
        ----------
        infile : path-like
            Feather file read by ``read_distance_mat``.
        outpath : path-like
            Directory that receives the output feather files.
        name : str
            Used to build the output file names.
        call : callable
            Invoked as ``call(distance_matrix, *args, **kwargs)``; the object
            it returns must expose a ``labels_`` attribute.
        """
        self.outpath = Path(outpath)
        self.call = call
        self.args = args
        self.kwargs = kwargs
        self.infile = Path(infile)
        self.name = name
        self.hasrun = False

    def run(self):
        """Execute the job and write ``<outpath>/<name>.feather``."""
        self.subreddits, self.mat = self.read_distance_mat(self.infile)
        self.clustering = self.call(self.mat, *self.args, **self.kwargs)
        self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
        self.score = self.silhouette()
        self.outpath.mkdir(parents=True, exist_ok=True)
        self.cluster_data.to_feather(self.outpath / (self.name + ".feather"))
        self.hasrun = True

    def get_info(self):
        """Return a ``clustering_result`` for this job, running it if needed."""
        if not self.hasrun:
            self.run()

        self.result = clustering_result(
            outpath=str(self.outpath.resolve()),
            silhouette_score=self.score,
            name=self.name,
            n_clusters=self.n_clusters,
            n_isolates=self.n_isolates,
            silhouette_samples=str(self.silsampout.resolve()),
        )
        return self.result

    def silhouette(self):
        """Compute silhouette scores and write the per-sample scores to disk.

        The returned overall score excludes samples labelled ``-1``.  The
        per-sample scores are computed over the full matrix and all labels
        and written to ``<outpath>/silhouette_samples-<name>.feather``; the
        path is kept in ``self.silsampout``.
        """
        labels = self.clustering.labels_
        isolates = labels == -1
        scoremat = self.mat[~isolates][:, ~isolates]
        score = silhouette_score(scoremat, labels[~isolates], metric='precomputed')
        # NOTE(review): samples labelled -1 take part here as if they formed
        # one cluster, unlike the overall score above - confirm this is intended.
        silhouette_samp = silhouette_samples(self.mat, labels, metric='precomputed')
        silhouette_samp = pd.DataFrame({'subreddit': self.subreddits, 'score': silhouette_samp})
        self.outpath.mkdir(parents=True, exist_ok=True)
        self.silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
        silhouette_samp.to_feather(self.silsampout)
        return score

    def read_distance_mat(self, similarities, use_threads=True):
        """Load a similarity table and return ``(names, distance_matrix)``.

        The feather file must have a ``_subreddit`` column; every other
        column is taken as part of the similarity matrix.  The diagonal is
        forced to similarity 1 so that the returned ``1 - similarity`` matrix
        has a zero diagonal.
        """
        df = pd.read_feather(similarities, use_threads=use_threads)
        # `columns=` replaces the positional axis argument (`drop(label, 1)`),
        # which pandas 2.0 no longer accepts.
        mat = np.array(df.drop(columns='_subreddit'))
        np.fill_diagonal(mat, 1)
        return (df._subreddit, 1 - mat)

    def process_clustering(self, clustering, subreddits):
        """Summarise a fitted clustering and return the assignment table.

        Sets ``self.n_clusters`` (number of distinct labels, ``-1`` included
        when present) and ``self.n_isolates``, prints a short report, and
        returns a DataFrame with columns ``subreddit`` and ``cluster``.
        """
        if hasattr(clustering, 'n_iter_'):
            print(f"clustering took {clustering.n_iter_} iterations")

        clusters = clustering.labels_
        self.n_clusters = len(set(clusters))

        print(f"found {self.n_clusters} clusters")

        cluster_data = pd.DataFrame({'subreddit': subreddits, 'cluster': clusters})

        # After the groupby/count, the `subreddit` column holds cluster sizes.
        cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
        print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")

        print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
        n_isolates1 = int((cluster_sizes.subreddit == 1).sum())

        print(f"{n_isolates1} clusters have 1 member")

        # Size of the -1 cluster as a plain int (0 when there is none).  This
        # used to be a one-column DataFrame slice, which leaked into
        # self.n_isolates and into the message below.
        n_isolates2 = int(cluster_sizes.loc[cluster_sizes.cluster == -1, 'subreddit'].sum())

        print(f"{n_isolates2} subreddits are in cluster -1", flush=True)

        # Singleton clusters take precedence; fall back to the -1 count
        # only when there are none.
        if n_isolates1 == 0:
            self.n_isolates = n_isolates2
        else:
            self.n_isolates = n_isolates1

        return cluster_data


class lsi_mixin():
    """Mixin that records the LSI dimensionality on an object."""

    def set_lsi_dims(self, lsi_dims):
        """Store ``lsi_dims`` on ``self.lsi_dims``."""
        self.lsi_dims = lsi_dims


@dataclass
class clustering_result:
    """Summary record for one clustering job (one row of a sweep table)."""
    # NOTE: clustering_job.get_info passes str(...) here, not a Path object.
    outpath: Path
    silhouette_score: float
    name: str
    n_clusters: int
    n_isolates: int
    # Path of the per-sample silhouette feather file, as a string.
    silhouette_samples: str


@dataclass
class lsi_result_mixin:
    """Dataclass mixin adding the LSI dimensionality field."""
    lsi_dimensions: int