From 808a6d00523922499ec5e67883828e2a1e0d1796 Mon Sep 17 00:00:00 2001 From: Imran McGrath <31265677+patrickimran@users.noreply.github.com> Date: Fri, 14 Aug 2020 11:58:19 -0700 Subject: [PATCH] Orchestrate (#14) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * added orchestrator and template * class for parameter grids * Added entry points support for custom models with Pipeline Co-authored-by: gwarmstrong * Adds ability to set_params of custom models (Pipelines) * Adds a _parameters.py containing defined parameter grids * parameter grids * parameter get * file locating bug fixed * added templates * template issues * orchestrator json fixes in template * Final Random Forest grids * more updates to parameter grid for RF * grid updates * randomized parameters, minor template tweak * Added template to setup * remove extra template * reduced param grids for rf * update to template and preprocess bugfix * Option for reduced parameter grid, where defined * option flags * (untested) adds force option, info txt * tested previous additions * Fixed result-skipping behavior, removed lgbm * checking for existing results fix * specify intel in resource list * Update q2_mlab/orchestrator.py Co-authored-by: Yoshiki Vázquez Baeza Co-authored-by: gwarmstrong Co-authored-by: Patrick McGrath Co-authored-by: Patrick McGrath Co-authored-by: Patrick McGrath Co-authored-by: Yoshiki Vázquez Baeza --- q2_mlab/__init__.py | 2 + q2_mlab/_parameters.py | 103 ++++++++++++ q2_mlab/_preprocess.py | 5 +- q2_mlab/learningtask.py | 38 ++++- q2_mlab/orchestrator.py | 215 ++++++++++++++++++++++++ q2_mlab/templates/array_job_template.sh | 82 +++++++++ q2_mlab/templates/info.txt | 2 + setup.py | 10 +- 8 files changed, 442 insertions(+), 15 deletions(-) create mode 100644 q2_mlab/_parameters.py create mode 100644 q2_mlab/orchestrator.py create mode 100644 q2_mlab/templates/array_job_template.sh create mode 100644 q2_mlab/templates/info.txt diff --git a/q2_mlab/__init__.py b/q2_mlab/__init__.py index db24c20..c22fb4b 100644 --- a/q2_mlab/__init__.py +++ b/q2_mlab/__init__.py @@ -12,6 +12,7 @@ from .learningtask import LearningTask, ClassificationTask, RegressionTask from ._type import Target, Results from ._format import ResultsDirectoryFormat, ResultsFormat +from ._parameters import ParameterGrids __version__ = get_versions()["version"] @@ -28,4 +29,5 @@ "LearningTask", "ClassificationTask", "RegressionTask", + "ParameterGrids", ] diff --git a/q2_mlab/_parameters.py b/q2_mlab/_parameters.py new file mode 100644 index 0000000..d4435f7 --- /dev/null +++ b/q2_mlab/_parameters.py @@ -0,0 +1,103 @@ +import numpy as np +from sklearn.model_selection import ParameterGrid + + +class ParameterGrids: + def get(algorithm): + grids = { + "LinearSVC": { + 'penalty': ['l2'], + 'tol': [1e-4, 1e-3, 1e-2, 1e-1], + 'loss': ['hinge', 'squared_hinge'], + 'random_state': [2018] + }, + "LinearSVR": { + "C": [1e-4, 1e-2, 1e-1, 1e0, 1e1, 1e2, 1e4], + "epsilon": [1e-2, 1e-1, 0, 1], + "loss": ["squared_epsilon_insensitive", "epsilon_insensitive"], + "random_state": [2018], + }, + "RidgeClassifier": { + "alpha": [1e-15, 1e-10, 1e-8, 1e-4], + "fit_intercept": [True], + "normalize": [True, False], + "tol": [1e-1, 1e-2, 1e-3], + "solver": [ + "svd", + "cholesky", + "lsqr", + "sparse_cg", + "sag", + "saga", + ], + "random_state": [2018], + }, + "RidgeRegressor": { + "alpha": [1e-15, 1e-10, 1e-8, 1e-4], + "fit_intercept": [True], + "normalize": [True, False], + "tol": [1e-1, 1e-2, 1e-3], + "solver": [ + "svd", + "cholesky", + "lsqr", + "sparse_cg", + "sag", + "saga", + ], + "random_state": [2018], + }, + "RandomForestClassifier": { + "n_estimators": [1000, 5000], + "criterion": ["gini", "entropy"], + "max_features": ["sqrt", "log2", None] + list(np.arange(0.2, 1, 0.2)), + "max_samples": [0.25, 0.5, 0.75, None], + "max_depth": [None], + "n_jobs": [-1], + "random_state": [2020], + "bootstrap": [True], + "min_samples_split": list(np.arange(0.2, 1, 0.2)) + [2], + "min_samples_leaf": list(np.arange(0.01, 0.5, 0.2)) + [1], + }, + "RandomForestRegressor": { + 'n_estimators': [1000, 5000], + 'criterion': ['mse', 'mae'], + "max_features": ["sqrt", "log2", None] + list(np.arange(0.2, 1, 0.2)), + "max_samples": [0.25, 0.5, 0.75, None], + 'max_depth': [None], + 'n_jobs': [-1], + 'random_state': [2020], + 'bootstrap': [True], + 'min_samples_split': list(np.arange(0.2, 1, 0.2)) + [2], + 'min_samples_leaf': list(np.arange(0.01, .5, 0.2)) + [1], + }, + } + return grids[algorithm] + + def get_reduced(algorithm): + grids = { + "RandomForestClassifier": { + "n_estimators": [5000], + "criterion": ["gini"], + "max_features": ["sqrt", "log2", None] + list(np.arange(0.2, 1, 0.2)), + "max_samples": [0.25, 0.5, 0.75, None], + "max_depth": [None], + "n_jobs": [-1], + "random_state": [2020], + "bootstrap": [True], + }, + "RandomForestRegressor": { + 'n_estimators': [5000], + 'criterion': ['mse'], + "max_features": ["sqrt", "log2", None] + list(np.arange(0.2, 1, 0.2)), + "max_samples": [0.25, 0.5, 0.75, None], + 'max_depth': [None], + 'n_jobs': [-1], + 'random_state': [2020], + 'bootstrap': [True], + }, + } + return grids[algorithm] + + def get_size(algorithm): + return len(list(ParameterGrid(ParameterGrids.get(algorithm)))) diff --git a/q2_mlab/_preprocess.py b/q2_mlab/_preprocess.py index 8cfd39b..9ee5b6b 100644 --- a/q2_mlab/_preprocess.py +++ b/q2_mlab/_preprocess.py @@ -59,7 +59,8 @@ def preprocess( initial_ids_to_keep = table.view(biom.Table).ids() table_id_set = set(initial_ids_to_keep) metadata_id_set = set(metadata.ids) - num_shared_ids = len(table_id_set.intersection(metadata_id_set)) + shared_ids = table_id_set.intersection(metadata_id_set) + num_shared_ids = len(shared_ids) if num_shared_ids == 0: raise ValueError("No sample IDs are shared between Table and Metadata") print( @@ -69,7 +70,7 @@ def preprocess( # Filter metadata by samples in table print("Filtering Metadata by samples in table") - filteredmetadata = metadata.filter_ids(ids_to_keep=initial_ids_to_keep) + filteredmetadata = metadata.filter_ids(ids_to_keep=shared_ids) print_datasize(table, filteredmetadata) # Filter samples from metadata where NaN in target_variable column diff --git a/q2_mlab/learningtask.py b/q2_mlab/learningtask.py index f23bd8d..fcf6928 100644 --- a/q2_mlab/learningtask.py +++ b/q2_mlab/learningtask.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd import time +import pkg_resources from abc import ABC # CV Methods @@ -21,6 +22,7 @@ ) # Algorithms +from sklearn.pipeline import Pipeline from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier from sklearn.linear_model import RidgeClassifier, Ridge from xgboost import XGBRegressor, XGBClassifier @@ -39,7 +41,6 @@ ) from sklearn.mixture import BayesianGaussianMixture from sklearn.naive_bayes import ComplementNB -from lightgbm import LGBMClassifier, LGBMRegressor from sklearn.neural_network import MLPClassifier, MLPRegressor from sklearn.linear_model import ElasticNet, Lasso @@ -47,6 +48,11 @@ class LearningTask(ABC): algorithms = {} + def iter_entry_points(cls): + for entry_point in pkg_resources.iter_entry_points( + group='q2_mlab.models'): + yield entry_point + def __init__( self, table, @@ -56,12 +62,26 @@ def __init__( n_repeats, distance_matrix=None, ): - self.distance_matrix = distance_matrix + # Add any custom algorithms from entry points + for entry_point in self.iter_entry_points(): + name = entry_point.name + method = entry_point.load() + self.algorithms.update({name: method}) + + self.learner = self.algorithms[algorithm] + print(params) self.params = json.loads(params) + if isinstance(self.learner, Pipeline): + # Assumes that the last step in the pipeline is the model: + prefix = list(self.learner.named_steps)[-1] + "__" + # And adds the prefix of that last step to our param dict's keys + # so we can access that step's parameters. + newparams = {prefix + key: val for key, val in self.params.items()} + self.params = newparams self.X = table.transpose().matrix_data self.metadata = metadata self.y = self.metadata.to_numpy() - self.learner = self.algorithms[algorithm] + self.distance_matrix = distance_matrix self.cv_idx = 0 self.idx = 0 self.n_repeats = n_repeats @@ -109,7 +129,6 @@ class ClassificationTask(LearningTask): "BaggingClassifier": BaggingClassifier, "ExtraTreesClassifier": ExtraTreesClassifier, "HistGradientBoostingClassifier": HistGradientBoostingClassifier, - "LGBMClassifier": LGBMClassifier, "BayesianGaussianMixture": BayesianGaussianMixture, "ComplementNB": ComplementNB, "BayesianGaussianMixture": BayesianGaussianMixture, @@ -150,7 +169,8 @@ def cv_fold(self, train_index, test_index): # Start timing start = time.process_time() - model = self.learner(**self.params) + model = self.learner() + model.set_params(**self.params) model.fit(X_train, y_train) y_pred = model.predict(X_test) # End timimg @@ -225,7 +245,6 @@ class RegressionTask(LearningTask): "BaggingRegressor": BaggingRegressor, "ExtraTreesRegressor": ExtraTreesRegressor, "HistGradientBoostingRegressor": HistGradientBoostingRegressor, - "LGBMRegressor": LGBMRegressor, "LinearSVR": LinearSVR, "RidgeRegressor": Ridge, "MLPRegressor": MLPRegressor, @@ -263,9 +282,10 @@ def cv_fold(self, train_index, test_index): # Start timing start = time.process_time() - m = self.learner(**self.params) - m.fit(X_train, y_train) - y_pred = m.predict(X_test) + model = self.learner() + model.set_params(**self.params) + model.fit(X_train, y_train) + y_pred = model.predict(X_test) # End timimg end = time.process_time() diff --git a/q2_mlab/orchestrator.py b/q2_mlab/orchestrator.py new file mode 100644 index 0000000..13e8140 --- /dev/null +++ b/q2_mlab/orchestrator.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python +import json +import math +import click +import random +from os import path, makedirs +from sklearn.model_selection import ParameterGrid +from jinja2 import Environment, FileSystemLoader +from q2_mlab import RegressionTask, ClassificationTask, ParameterGrids + + +@click.command() +@click.argument('dataset') +@click.argument('preparation') +@click.argument('target') +@click.argument('algorithm',) +@click.option( + '--base_dir', '-b', + help="Directory to search for datasets in", +) +@click.option( + '--repeats', '-r', + default=3, + help="Number of CV repeats", +) +@click.option( + '--ppn', + default=1, + help="Processors per node for job script", +) +@click.option( + '--memory', + default=32, + help="GB of memory for job script", +) +@click.option( + '--wall', + default=50, + help="Walltime in hours for job script", +) +@click.option( + '--chunk_size', + default=100, + help="Number of params to run in one job for job script", +) +@click.option( + '--randomize/--no-randomize', + default=True, + help="Randomly shuffle the order of the hyperparameter list", +) +@click.option( + '--reduced/--no-reduced', + default=False, + help="If a reduced parameter grid is available, run the reduced grid.", +) +@click.option( + '--force/--no-force', + default=False, + help="Overwrite existing results.", +) +def cli( + dataset, + preparation, + target, + algorithm, + repeats, + base_dir, + ppn, + memory, + wall, + chunk_size, + randomize, + reduced, + force +): + classifiers = set(RegressionTask.algorithms.keys()) + regressors = set(ClassificationTask.algorithms.keys()) + valid_algorithms = classifiers.union(regressors) + ALGORITHM = algorithm + if ALGORITHM not in valid_algorithms: + raise ValueError( + "Unrecognized algorithm passed. Algorithms must be one of the " + "following: \n" + str(valid_algorithms) + ) + + if reduced: + try: + algorithm_parameters = ParameterGrids.get_reduced(ALGORITHM) + except KeyError: + print( + f'{ALGORITHM} does not have a reduced grid implemented grid ' + 'in mlab.ParameterGrids' + ) + raise + + else: + try: + algorithm_parameters = ParameterGrids.get(ALGORITHM) + except KeyError: + print( + f'{ALGORITHM} does not have a grid implemented in ' + 'mlab.ParameterGrids' + ) + raise + + PPN = ppn + N_REPEATS = repeats + GB_MEM = memory + WALLTIME_HRS = wall + CHUNK_SIZE = chunk_size + JOB_NAME = "_".join([dataset, preparation, target, ALGORITHM]) + FORCE = str(force).lower() + + TABLE_FP = path.join( + base_dir, dataset, preparation, target, "filtered_rarefied_table.qza" + ) + if not path.exists(TABLE_FP): + raise FileNotFoundError( + "Table was not found at the expected path: " + + TABLE_FP + ) + + METADATA_FP = path.join( + base_dir, dataset, preparation, target, "filtered_metadata.qza" + ) + if not path.exists(METADATA_FP): + raise FileNotFoundError( + "Metadata was not found at the expected path: " + + TABLE_FP + ) + + RESULTS_DIR = path.join( + base_dir, dataset, preparation, target, ALGORITHM + ) + if not path.isdir(RESULTS_DIR): + makedirs(RESULTS_DIR) + + BARNACLE_OUT_DIR = path.join(base_dir, dataset, "barnacle_output/") + if not path.isdir(BARNACLE_OUT_DIR): + makedirs(BARNACLE_OUT_DIR) + + params = list(ParameterGrid(algorithm_parameters)) + params_list = [json.dumps(param_dict) for param_dict in params] + PARAMS_FP = path.join(RESULTS_DIR, ALGORITHM + "_parameters.txt") + N_PARAMS = len(params_list) + N_CHUNKS = math.ceil(N_PARAMS/CHUNK_SIZE) + REMAINDER = N_PARAMS % CHUNK_SIZE + + random.seed(2021) + if randomize: + random.shuffle(params_list) + + mlab_dir = path.dirname(path.abspath(__file__)) + env = Environment( + loader=FileSystemLoader(path.join(mlab_dir, 'templates')) + ) + job_template = env.get_template('array_job_template.sh') + info_template = env.get_template('info.txt') + + output_from_job_template = job_template.render( + JOB_NAME=JOB_NAME, + STD_ERR_OUT=BARNACLE_OUT_DIR, + PPN=PPN, + GB_MEM=GB_MEM, + WALLTIME_HRS=WALLTIME_HRS, + PARAMS_FP=PARAMS_FP, + CHUNK_SIZE=CHUNK_SIZE, + TABLE_FP=TABLE_FP, + METADATA_FP=METADATA_FP, + ALGORITHM=ALGORITHM, + N_REPEATS=N_REPEATS, + RESULTS_DIR=RESULTS_DIR, + FORCE_OVERWRITE=FORCE, + ) + + output_from_info_template = info_template.render( + PARAMS_FP=PARAMS_FP, + CHUNK_SIZE=CHUNK_SIZE, + N_PARAMS=N_PARAMS, + REMAINDER=REMAINDER, + N_CHUNKS=N_CHUNKS + ) + + output_script = path.join( + base_dir, + dataset, + "_".join([preparation, target, ALGORITHM]) + ".sh" + ) + + info_doc = path.join( + base_dir, + dataset, + "_".join([preparation, target, ALGORITHM]) + "_info.txt" + ) + + print(output_script) + # print(output_from_job_template) + print("##########################") + print("Number of parameters: " + str(len(params_list))) + print(f"Max number of jobs with chunk size {CHUNK_SIZE}: " + str(N_CHUNKS)) + with open(info_doc, "w") as fh: + fh.write(output_from_info_template) + print("Saved info to: " + info_doc) + with open(PARAMS_FP, 'w') as fh: + i = 1 + for p in params_list: + fh.write(str(i).zfill(4)+"\t"+p+"\n") + i += 1 + print("Saved params to: " + PARAMS_FP) + with open(output_script, "w") as fh: + fh.write(output_from_job_template) + print("Saved to: " + output_script) + +if __name__ == "__main__": + cli() diff --git a/q2_mlab/templates/array_job_template.sh b/q2_mlab/templates/array_job_template.sh new file mode 100644 index 0000000..07f780f --- /dev/null +++ b/q2_mlab/templates/array_job_template.sh @@ -0,0 +1,82 @@ +#PBS -V + +### Set the job name +#PBS -N {{ JOB_NAME }} + +### Join the output and error streams as standard output +#PBS -j oe + +### Specify output directory +#PBS -o {{ STD_ERR_OUT }} + +### Declares the shell that interprets the job script +#PBS -S /bin/sh + +### Specify the number of cpus for your job. 1 processor on 1 node for benchmarking +### Also specifies to pick any node brncl-01 through brncl-32 +#PBS -l nodes=1:ppn={{ PPN }}:intel + +### Tell PBS how much memory you expct to use. Use units of 'b','kb', 'mb' or 'gb'. +#PBS -l mem={{ GB_MEM }}gb + +### Tell PBS the anticipated run-time for your job, where walltime=HH:MM:SS +#PBS -l walltime={{ WALLTIME_HRS }}:00:00 + +### Switch to the working directory; by default TORQUE launches processes +### from your home directory. + +cd $PBS_O_WORKDIR +source activate qiime2-2020.6 +echo Working directory is $PBS_O_WORKDIR + +# Calculate the number of processors allocated to this run. +NPROCS=`wc -l < $PBS_NODEFILE` + +# Calculate the number of nodes allocated. +NNODES=`uniq $PBS_NODEFILE | wc -l` + +### Display the job context +echo Running on host `hostname` +echo Time is `date` +echo Directory is `pwd` +echo Using ${NPROCS} processors across ${NNODES} nodes + +input={{ PARAMS_FP }} +chunk_size={{ CHUNK_SIZE }} + +# Calculate chunk sizes and the index of the last chunk +max_size="$(wc -l <${input})" +max_n_chunks=$((($max_size/$chunk_size)+1)) +offset=$(($chunk_size * $PBS_ARRAYID)) + +# If we are on the last possible chunk, set its size +# to the number of remaining params +if [ ${PBS_ARRAYID} = ${max_n_chunks} ] +then + final_chunk_size=$(($max_size%$chunk_size)) + previous_offset=$(($chunk_size * ($PBS_ARRAYID-1))) + offset=$(($previous_offset+$final_chunk_size)) + chunk_size=$final_chunk_size +fi + +echo Chunk number ${PBS_ARRAYID} size is ${chunk_size} + +head -n $offset $input | tail -n $chunk_size > subset${PBS_ARRAYID}.list + +FORCE={{ FORCE_OVERWRITE }} +while IFS=$'\t' read -r idx params +do + RESULTS={{ RESULTS_DIR }}/${idx}_chunk_${PBS_ARRAYID} + if [[ -f $RESULTS.qza && ${FORCE} = false ]] + then + echo $RESULTS already exists, execution skipped + else + qiime mlab unit-benchmark --i-table {{ TABLE_FP }} \ + --i-metadata {{ METADATA_FP }} \ + --p-algorithm {{ ALGORITHM }} \ + --p-params "${params}" \ + --p-n-repeats {{ N_REPEATS }} \ + --o-result-table ${RESULTS} \ + --verbose + fi +done < subset${PBS_ARRAYID}.list diff --git a/q2_mlab/templates/info.txt b/q2_mlab/templates/info.txt new file mode 100644 index 0000000..a9ec017 --- /dev/null +++ b/q2_mlab/templates/info.txt @@ -0,0 +1,2 @@ +parameters_fp, parameter_space_size, chunk_size, remainder, n_chunks +{{ PARAMS_FP }}, {{ N_PARAMS }}, {{ CHUNK_SIZE }}, {{ REMAINDER }}, {{ N_CHUNKS }} \ No newline at end of file diff --git a/setup.py b/setup.py index ba105cb..9e04678 100644 --- a/setup.py +++ b/setup.py @@ -20,10 +20,12 @@ license='BSD-3-Clause', url="https://qiime2.org", entry_points={ - 'qiime2.plugins': - ['q2-mlab=q2_mlab.plugin_setup:plugin'] + 'qiime2.plugins': ['q2-mlab=q2_mlab.plugin_setup:plugin'], + 'console_scripts': ['orchestrator=q2_mlab.orchestrator:cli'], }, - package_data={'q2_mlab': ['assets/index.html', 'citations.bib']}, + package_data={'q2_mlab': ['assets/index.html', + 'citations.bib', + 'templates/array_job_template.sh']}, zip_safe=False, - install_requires=['xgboost', 'lightgbm', 'calour'] + install_requires=['xgboost', 'calour', 'click'] )