]> code.communitydata.science - cdsc_reddit.git/blobdiff - clustering/clustering_base.py
Merge remote-tracking branch 'origin/excise_reindex' into temp
[cdsc_reddit.git] / clustering / clustering_base.py
index 1d864383cb791c30d803de69aa281be8a966cafe..5492415671846392affed1261f9ede79919b9f27 100644 (file)
@@ -2,6 +2,9 @@ from pathlib import Path
 import numpy as np
 import pandas as pd
 from dataclasses import dataclass
+from sklearn.metrics import silhouette_score, silhouette_samples
+from itertools import product, chain
+from multiprocessing import Pool, cpu_count
 
 def sim_to_dist(mat):
     dist = 1-mat
@@ -9,41 +12,147 @@ def sim_to_dist(mat):
     np.fill_diagonal(dist,0)
     return dist
 
-def process_clustering_result(clustering, subreddits):
+class grid_sweep:
+    def __init__(self, jobtype, inpath, outpath, namer, *args):
+        self.jobtype = jobtype
+        self.namer = namer
+        grid = list(product(*args))
+        inpath = Path(inpath)
+        outpath = Path(outpath)
+        self.hasrun = False
+        self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
+        self.jobs = [jobtype(*g) for g in self.grid]
 
-    if hasattr(clustering,'n_iter_'):
-        print(f"clustering took {clustering.n_iter_} iterations")
+    def run(self, cores=20):
+        if cores is not None and cores > 1:
+            with Pool(cores) as pool:
+                infos = pool.map(self.jobtype.get_info, self.jobs)
+        else:
+            infos = map(self.jobtype.get_info, self.jobs)
 
-    clusters = clustering.labels_
+        self.infos = pd.DataFrame(infos)
+        self.hasrun = True
 
-    print(f"found {len(set(clusters))} clusters")
+    def save(self, outcsv):
+        if not self.hasrun:
+            self.run()
+        outcsv = Path(outcsv)
+        outcsv.parent.mkdir(parents=True, exist_ok=True)
+        self.infos.to_csv(outcsv)
 
-    cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
 
-    cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
-    print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+class lsi_grid_sweep(grid_sweep):
+    def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
+        self.jobtype = jobtype
+        self.subsweep = subsweep
+        inpath = Path(inpath)
+        if lsi_dimensions == 'all':
+            lsi_paths = list(inpath.glob("*"))
+        else:
+            lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
 
-    print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+        lsi_nums = [p.stem for p in lsi_paths]
+        self.hasrun = False
+        self.subgrids = [self.subsweep(lsi_path, outpath,  lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
+        self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
 
-    print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
 
-    print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
+# this is meant to be an interface, not created directly
+class clustering_job:
+    def __init__(self, infile, outpath, name, call, *args, **kwargs):
+        self.outpath = Path(outpath)
+        self.call = call
+        self.args = args
+        self.kwargs = kwargs
+        self.infile = Path(infile)
+        self.name = name
+        self.hasrun = False
 
-    return cluster_data
+    def run(self):
+        self.subreddits, self.mat = self.read_distance_mat(self.infile)
+        self.clustering = self.call(self.mat, *self.args, **self.kwargs)
+        self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
+        self.score = self.silhouette()
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+        self.hasrun = True
+        
+    def get_info(self):
+        if not self.hasrun:
+            self.run()
 
+        self.result = clustering_result(outpath=str(self.outpath.resolve()),
+                                        silhouette_score=self.score,
+                                        name=self.name,
+                                        n_clusters=self.n_clusters,
+                                        n_isolates=self.n_isolates,
+                                        silhouette_samples = str(self.silsampout.resolve())
+                                        )
+        return self.result
+
+    def silhouette(self):
+        isolates = self.clustering.labels_ == -1
+        scoremat = self.mat[~isolates][:,~isolates]
+        score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
+        silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
+        silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
+        self.outpath.mkdir(parents=True, exist_ok=True)
+        self.silsampout = self.outpath / ("silhouette_samples-" + self.name +  ".feather")
+        silhouette_samp.to_feather(self.silsampout)
+        return score
+
+    def read_distance_mat(self, similarities, use_threads=True):
+        df = pd.read_feather(similarities, use_threads=use_threads)
+        mat = np.array(df.drop('_subreddit',1))
+        n = mat.shape[0]
+        mat[range(n),range(n)] = 1
+        return (df._subreddit,1-mat)
+
+    def process_clustering(self, clustering, subreddits):
+
+        if hasattr(clustering,'n_iter_'):
+            print(f"clustering took {clustering.n_iter_} iterations")
+
+        clusters = clustering.labels_
+        self.n_clusters = len(set(clusters))
+
+        print(f"found {self.n_clusters} clusters")
+
+        cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
+
+        cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
+        print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
+
+        print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
+        n_isolates1 = (cluster_sizes.subreddit==1).sum()
+
+        print(f"{n_isolates1} clusters have 1 member")
+
+        n_isolates2 = (cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])
+
+        print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
+
+        if n_isolates1 == 0:
+            self.n_isolates = n_isolates2
+        else:
+            self.n_isolates = n_isolates1
+
+        return cluster_data
+
+
+class lsi_mixin():
+    def set_lsi_dims(self, lsi_dims):
+        self.lsi_dims = lsi_dims
 
 @dataclass
 class clustering_result:
     outpath:Path
-    max_iter:int
     silhouette_score:float
-    alt_silhouette_score:float
     name:str
     n_clusters:int
+    n_isolates:int
+    silhouette_samples:str
 
-def read_similarity_mat(similarities, use_threads=True):
-    df = pd.read_feather(similarities, use_threads=use_threads)
-    mat = np.array(df.drop('_subreddit',1))
-    n = mat.shape[0]
-    mat[range(n),range(n)] = 1
-    return (df._subreddit,mat)
+@dataclass
+class lsi_result_mixin:
+    lsi_dimensions:int

Community Data Science Collective || Want to submit a patch?