X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/blobdiff_plain/f05cb962e0388feaf38aaf84f222696ab8f5f398..refs/heads/icwsm_dataverse:/clustering/clustering_base.py

diff --git a/clustering/clustering_base.py b/clustering/clustering_base.py
index 5492415..2f37b68 100644
--- a/clustering/clustering_base.py
+++ b/clustering/clustering_base.py
@@ -1,61 +1,10 @@
+import pickle
 from pathlib import Path
 import numpy as np
 import pandas as pd
 from dataclasses import dataclass
 from sklearn.metrics import silhouette_score, silhouette_samples
-from itertools import product, chain
-from multiprocessing import Pool, cpu_count
-
-def sim_to_dist(mat):
-    dist = 1-mat
-    dist[dist < 0] = 0
-    np.fill_diagonal(dist,0)
-    return dist
-
-class grid_sweep:
-    def __init__(self, jobtype, inpath, outpath, namer, *args):
-        self.jobtype = jobtype
-        self.namer = namer
-        grid = list(product(*args))
-        inpath = Path(inpath)
-        outpath = Path(outpath)
-        self.hasrun = False
-        self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
-        self.jobs = [jobtype(*g) for g in self.grid]
-
-    def run(self, cores=20):
-        if cores is not None and cores > 1:
-            with Pool(cores) as pool:
-                infos = pool.map(self.jobtype.get_info, self.jobs)
-        else:
-            infos = map(self.jobtype.get_info, self.jobs)
-
-        self.infos = pd.DataFrame(infos)
-        self.hasrun = True
-
-    def save(self, outcsv):
-        if not self.hasrun:
-            self.run()
-        outcsv = Path(outcsv)
-        outcsv.parent.mkdir(parents=True, exist_ok=True)
-        self.infos.to_csv(outcsv)
-
-
-class lsi_grid_sweep(grid_sweep):
-    def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
-        self.jobtype = jobtype
-        self.subsweep = subsweep
-        inpath = Path(inpath)
-        if lsi_dimensions == 'all':
-            lsi_paths = list(inpath.glob("*"))
-        else:
-            lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
-
-        lsi_nums = [p.stem for p in lsi_paths]
-        self.hasrun = False
-        self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
-        self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
-
+from collections import Counter
 
 # this is meant to be an interface, not created directly
 class clustering_job:
@@ -72,10 +21,17 @@ class clustering_job:
     def run(self):
         self.subreddits, self.mat = self.read_distance_mat(self.infile)
         self.clustering = self.call(self.mat, *self.args, **self.kwargs)
         self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
-        self.score = self.silhouette()
         self.outpath.mkdir(parents=True, exist_ok=True)
         self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+        self.hasrun = True
+        self.cleanup()
+
+    def cleanup(self):
+        self.cluster_data = None
+        self.mat = None
+        self.clustering = None
+        self.subreddits = None
 
     def get_info(self):
         if not self.hasrun:
@@ -86,24 +42,33 @@ class clustering_job:
                                         name=self.name,
                                         n_clusters=self.n_clusters,
                                         n_isolates=self.n_isolates,
-                                        silhouette_samples = str(self.silsampout.resolve())
+                                        silhouette_samples = self.silsampout
                                         )
         return self.result
 
     def silhouette(self):
-        isolates = self.clustering.labels_ == -1
+        counts = Counter(self.clustering.labels_)
+        singletons = [key for key, value in counts.items() if value == 1]
+        isolates = (self.clustering.labels_ == -1) | (np.isin(self.clustering.labels_,np.array(singletons)))
         scoremat = self.mat[~isolates][:,~isolates]
-        score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
-        silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
-        silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
-        self.outpath.mkdir(parents=True, exist_ok=True)
-        self.silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
-        silhouette_samp.to_feather(self.silsampout)
+        if self.n_clusters > 1:
+            score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
+            silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
+            silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
+            self.outpath.mkdir(parents=True, exist_ok=True)
+            silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
+            self.silsampout = silsampout.resolve()
+            silhouette_samp.to_feather(self.silsampout)
+        else:
+            score = None
+            self.silsampout = None
+
         return score
 
     def read_distance_mat(self, similarities, use_threads=True):
+        print(similarities)
         df = pd.read_feather(similarities, use_threads=use_threads)
-        mat = np.array(df.drop('_subreddit',1))
+        mat = np.array(df.drop('_subreddit',axis=1))
         n = mat.shape[0]
         mat[range(n),range(n)] = 1
         return (df._subreddit,1-mat)
@@ -117,9 +82,13 @@ class clustering_job:
         self.n_clusters = len(set(clusters))
 
         print(f"found {self.n_clusters} clusters")
-        cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
+
+        self.score = self.silhouette()
+        print(f"silhouette_score:{self.score}")
+
+        cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
         cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
         print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
@@ -128,8 +97,9 @@ class clustering_job:
         print(f"{n_isolates1} clusters have 1 member")
 
-        n_isolates2 = (cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])
-
+        n_isolates2 = cluster_sizes.loc[cluster_sizes.cluster==-1,:]['subreddit'].to_list()
+        if len(n_isolates2) > 0:
+            n_isolates2 = n_isolates2[0]
         print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
 
         if n_isolates1 == 0:
@@ -139,10 +109,37 @@ class clustering_job:
 
         return cluster_data
 
+class twoway_clustering_job(clustering_job):
+    def __init__(self, infile, outpath, name, call1, call2, args1, args2):
+        self.outpath = Path(outpath)
+        self.call1 = call1
+        self.args1 = args1
+        self.call2 = call2
+        self.args2 = args2
+        self.infile = Path(infile)
+        self.name = name
+        self.hasrun = False
+        self.args = args1|args2
+
+    def run(self):
+        self.subreddits, self.mat = self.read_distance_mat(self.infile)
+        self.step1 = self.call1(self.mat, **self.args1)
+        self.clustering = self.call2(self.mat, self.step1, **self.args2)
+        self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
+        self.hasrun = True
+        self.after_run()
+        self.cleanup()
+
+    def after_run(self):
+        self.score = self.silhouette()
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        print(self.outpath/(self.name + ".feather"))
+        self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+
-class lsi_mixin():
-    def set_lsi_dims(self, lsi_dims):
-        self.lsi_dims = lsi_dims
+    def cleanup(self):
+        super().cleanup()
+        self.step1 = None
 
 @dataclass
 class clustering_result:
@@ -152,7 +149,3 @@
     n_clusters:int
     n_isolates:int
     silhouette_samples:str
-
-@dataclass
-class lsi_result_mixin:
-    lsi_dimensions:int
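
read_distance_mat expects a feather file with a `_subreddit` column naming each row and the remaining columns holding a square pairwise similarity matrix; it sets the diagonal to 1 and returns distances as 1 - similarity. A minimal sketch of a conforming input file follows; the subreddit names and output path are hypothetical, not from the repo.

    import numpy as np
    import pandas as pd

    # Two blocks of three subreddits with high within-block similarity.
    names = ['AskReddit', 'funny', 'pics', 'politics', 'news', 'worldnews']
    block, cross = np.full((3, 3), 0.8), np.full((3, 3), 0.1)
    sims = np.block([[block, cross], [cross, block]])
    np.fill_diagonal(sims, 1.0)

    df = pd.DataFrame(sims, columns=names)
    df.insert(0, '_subreddit', names)          # name column that read_distance_mat drops
    df.to_feather('toy_similarities.feather')  # hypothetical path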
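The reworked silhouette method treats noise points (label -1) and singleton clusters as isolates and masks them out of the score matrix, since a cluster of size 1 carries no meaningful silhouette. Here is that masking logic in isolation, with hypothetical labels:

    import numpy as np
    from collections import Counter

    labels = np.array([0, 0, 1, -1, 2, 2, 3])              # hypothetical cluster labels
    counts = Counter(labels)
    singletons = [k for k, v in counts.items() if v == 1]  # here: clusters 1, -1, and 3
    isolates = (labels == -1) | np.isin(labels, singletons)
    print(isolates)  # [False False  True  True False False  True]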
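The new twoway_clustering_job fixes the two-step contract visible in run(): call1(mat, **args1) produces an intermediate result (step1), and call2(mat, step1, **args2) must return an object exposing .labels_. A minimal usage sketch, with stand-in step functions (an MDS embedding followed by agglomerative clustering) and paths that are illustrative rather than the repo's actual pipeline:

    from sklearn.manifold import MDS
    from sklearn.cluster import AgglomerativeClustering
    from clustering_base import twoway_clustering_job

    def embed_step(mat, n_components):
        # step 1: embed the precomputed distance matrix in low dimensions
        return MDS(n_components=n_components, dissimilarity='precomputed').fit_transform(mat)

    def cluster_step(mat, embedding, n_clusters):
        # step 2: cluster the embedding; the fitted object exposes .labels_
        return AgglomerativeClustering(n_clusters=n_clusters).fit(embedding)

    job = twoway_clustering_job(infile='toy_similarities.feather',  # hypothetical path
                                outpath='clusters/twoway',
                                name='mds_agglom',
                                call1=embed_step,
                                call2=cluster_step,
                                args1={'n_components': 2},
                                args2={'n_clusters': 2})
    info = job.get_info()  # runs the job, writes feather outputs, then frees the matrices

Note that self.args = args1|args2 relies on the dict-union operator, so this class requires Python 3.9 or later.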