# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import numbers

import hjson
import numpy as np

from ..constants import AUTOTUNING, AUTOTUNING_METRIC_LATENCY, AUTOTUNING_METRIC_PATH
from .base_tuner import BaseTuner
from .cost_model import XGBoostCostModel
from .utils import *
from ..utils import *
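
# Number of warm-up trials sampled (the first deterministically, the rest at
# random) before the cost model is first fit.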
INIT_NUM = 2


class ModelBasedTuner(BaseTuner):
    """Explores the search space by ranking candidate configurations with a cost model."""

    def __init__(self, exps: list, resource_manager, metric, tuning_space):
        super().__init__(exps, resource_manager, metric)
        self.tuning_space = tuning_space
        self.best_iter = 0

        self.all_configs = [e['ds_config'] for e in exps]
        self.num_all_configs = len(self.all_configs)

        self.dims = dict_to_dims(self.tuning_space)

        logger.info(f"Create config dim: {self.dims}, all configs: {self.num_all_configs}")

        self.visited = set()

        self.trials = []
        self.trial_pt = 0

        # Seed the search with a few warm-up trials before the cost model is fit:
        # the first is config 0, the rest are drawn at random without repetition.
        init_num = min(INIT_NUM, self.num_all_configs)
        exp_feature = 0
        for _ in range(init_num):
            while exp_feature in self.visited:
                exp_feature = np.random.randint(self.num_all_configs)
            self.trials.append(exp_feature)
            self.visited.add(exp_feature)

        self.cost_model = XGBoostCostModel("rank")

        self.evaluated_configs = []
        self.evaluated_perf = []

        self.train_ct = 0

        # Fraction of batch picks made at random, to avoid over-exploiting the model.
        self.random_exploration_ratio = 0.2

    def find_estimated_top_configs(self):
        """Use the cost model to predict the performance of all configurations and
        rank them for the next round of evaluation."""
        configs = []
        for c in self.all_configs:
            flattened_ds_config = flatten(c)
            # Keep only numeric values as model features.
            feature_val = [v for v in flattened_ds_config.values() if isinstance(v, numbers.Number)]
            configs.append(feature_val)
        # TODO: the current implementation requires that all configs have the same shape.
        configs = np.array(configs, dtype=np.float32)

        estimates = self.cost_model.predict(configs)
        top_idx = np.argsort(estimates)
        # For latency, lower is better, so the ascending sort already ranks best
        # first; for throughput-style metrics, reverse to get descending order.
        top_idx_ret = top_idx if self.metric == AUTOTUNING_METRIC_LATENCY else top_idx[::-1]

        return top_idx_ret
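
    # Illustrative example of the feature extraction above (assuming `flatten`
    # joins nested keys with a separator): a config such as
    #   {"train_micro_batch_size_per_gpu": 4, "zero_optimization": {"stage": 1}}
    # flattens to a single-level dict, and after keeping only numeric values it
    # becomes the feature vector [4.0, 1.0].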

    def next_batch(self, sample_size):
        sampled_batch = []

        counter = 0
        while counter < sample_size:
            if len(self.visited) >= self.num_all_configs:
                break

            # Select the most promising unvisited trial from the model's ranking.
            index = None
            while self.trial_pt < len(self.trials):
                logger.debug(f"trials: {self.trials}")
                candidate = self.trials[self.trial_pt]
                if candidate not in self.visited:
                    index = candidate
                    break
                self.trial_pt += 1

            # To avoid over-exploitation, occasionally (or whenever the ranked
            # trials are exhausted) fall back to a random unexplored config.
            if index is None or np.random.rand() < self.random_exploration_ratio:
                index = np.random.randint(self.num_all_configs)
                while index in self.visited:
                    index = np.random.randint(self.num_all_configs)

            # Track both the sampled experiment and its index.
            sampled_batch.append(self.all_exps[index])
            self.visited.add(index)
            counter += 1

        return sampled_batch
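
    # Note: with random_exploration_ratio = 0.2, roughly one pick in five ignores
    # the model's ranking, so configurations the cost model currently considers
    # unpromising still get sampled occasionally.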

    def has_next(self):
        return len(self.visited) < self.num_all_configs

    def update(self):
        for exp_id, (exp, err) in self.rm.finished_experiments.items():
            feature_val = []
            if err:
                logger.info(
                    f"Skipping exp_id = {exp_id}, exp_name = {exp['name']}: the experiment did not run successfully (error = {err}), so no metrics file exists for it. Please check the stderr.log in {exp['result_dir']}."
                )
                ds_config = exp["ds_config"]
                flattened_ds_config = flatten(ds_config)
                for k, v in flattened_ds_config.items():
                    if isinstance(v, numbers.Number):
                        feature_val.append(v)
                self.evaluated_configs.append(feature_val)
                # Record failed runs with a performance of 0.0 so their feature
                # vectors still contribute to the cost model.
                self.evaluated_perf.append(0.0)
                continue

            p = exp["ds_config"][AUTOTUNING][AUTOTUNING_METRIC_PATH]
            with open(p, 'r') as f:
                results = hjson.load(f)
            curr_iter = results[self.metric]
            logger.debug(f"parsing the results for {exp_id}, Result is {curr_iter}")

            ds_config = exp["ds_config"]
            flattened_ds_config = flatten(ds_config)
            for k, v in flattened_ds_config.items():
                if isinstance(v, numbers.Number):
                    feature_val.append(v)
            self.evaluated_configs.append(feature_val)
            self.evaluated_perf.append(curr_iter)

        logger.debug(f"**Evaluated configs: {len(self.evaluated_configs)}, evaluated perf: {self.evaluated_perf}")

        # Refit the cost model on everything evaluated so far and refresh the
        # ranked list of trials for the next batch.
        self.cost_model.fit(self.evaluated_configs, self.evaluated_perf)

        estimated_top_configs = self.find_estimated_top_configs()
        self.trials = estimated_top_configs
        self.trial_pt = 0
        self.train_ct += 1
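

# Minimal usage sketch (illustrative only; in practice BaseTuner.tune() drives
# this loop and the autotuner constructs the tuner itself). Here `exps` is a
# list of experiment dicts, each carrying a 'ds_config' entry, and `rm` is the
# resource manager that runs scheduled experiments:
#
#   tuner = ModelBasedTuner(exps, rm, AUTOTUNING_METRIC_LATENCY, tuning_space)
#   while tuner.has_next():
#       batch = tuner.next_batch(sample_size=2)
#       # ... run the sampled experiments through `rm` ...
#       tuner.update()  # refit the cost model and re-rank the remaining configs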