from itertools import product, chain
from multiprocessing import Pool, cpu_count
from pathlib import Path

import numpy as np
import pandas as pd
-
def sim_to_dist(mat):
    """Convert a similarity matrix into a distance matrix.

    Distance is ``1 - similarity``, floored at zero (similarities above 1
    would otherwise yield negative distances), with the diagonal forced to
    0 since every item is at distance zero from itself.
    """
    out = np.clip(1 - mat, 0, None)
    np.fill_diagonal(out, 0)
    return out
-
class grid_sweep:
    """Run one job per point of a cartesian parameter grid.

    Parameters
    ----------
    jobtype : class
        Instantiated as ``jobtype(inpath, outpath, name, *params)``; its
        instances must expose ``get_info()`` returning one row of results
        (a mapping) per job.
    inpath, outpath :
        Input/output locations; coerced to ``pathlib.Path``.
    namer : callable
        Maps one grid point's parameters to a job name.
    *args :
        One iterable per grid axis; a job is created for every combination.
    """

    def __init__(self, jobtype, inpath, outpath, namer, *args):
        self.jobtype = jobtype
        self.namer = namer
        self.hasrun = False  # flipped to True once run() has collected results
        inpath = Path(inpath)
        outpath = Path(outpath)
        # One tuple per grid point: (inpath, outpath, name, *params).
        self.grid = [(inpath, outpath, namer(*params)) + params
                     for params in product(*args)]
        self.jobs = [jobtype(*g) for g in self.grid]

    def run(self, cores=20):
        """Execute every job and collect the results in ``self.infos``.

        Uses a multiprocessing pool of ``cores`` workers when ``cores > 1``;
        pass ``None`` (or 1) to run sequentially in-process.
        """
        if cores is not None and cores > 1:
            with Pool(cores) as pool:
                infos = pool.map(self.jobtype.get_info, self.jobs)
        else:
            # Materialize eagerly (not a lazy map) so any job failure
            # surfaces here rather than inside the DataFrame constructor.
            infos = [self.jobtype.get_info(job) for job in self.jobs]

        self.infos = pd.DataFrame(infos)
        self.hasrun = True

    def save(self, outcsv):
        """Write collected job info to ``outcsv`` as CSV, running first if needed.

        Parent directories of ``outcsv`` are created when missing.
        """
        if not self.hasrun:
            self.run()
        outcsv = Path(outcsv)
        outcsv.parent.mkdir(parents=True, exist_ok=True)
        self.infos.to_csv(outcsv)
-
-
class lsi_grid_sweep(grid_sweep):
    """A grid sweep fanned out over LSI dimensionalities.

    One sub-sweep is built per LSI dimension (one ``<dim>.feather``
    similarity file under ``inpath``); all sub-sweeps' jobs are flattened
    into ``self.jobs`` so the inherited ``run``/``save`` process the full
    set at once.

    Parameters
    ----------
    jobtype : class
        Job class used by the inherited ``run`` (must expose ``get_info``).
    subsweep : class
        Sweep class instantiated as ``subsweep(lsi_path, outpath, lsi_dim,
        *args, **kwargs)`` for each dimension.
    lsi_dimensions :
        Either the string ``'all'`` (use every file under ``inpath``) or an
        iterable of dimensionalities (ints or strings).
    """

    def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
        self.jobtype = jobtype
        self.subsweep = subsweep
        self.hasrun = False
        inpath = Path(inpath)
        if lsi_dimensions == 'all':
            # NOTE(review): picks up every entry under inpath; assumes the
            # directory holds only per-dimension .feather files — confirm.
            lsi_paths = list(inpath.glob("*"))
        else:
            # str(dim) so callers may pass dimensions as ints or strings;
            # int + str previously raised TypeError.
            lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]

        # File stem (e.g. "300" from "300.feather") identifies the dimension.
        lsi_nums = [p.stem for p in lsi_paths]
        self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs)
                         for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
        # Flatten every sub-sweep's jobs into one list for the inherited run().
        self.jobs = list(chain.from_iterable(gs.jobs for gs in self.subgrids))
-