123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- '''Copyright The Microsoft DeepSpeed Team'''
- import copy
- from numpy import BUFSIZE
- import json
- import subprocess
- import sys
- import threading
- import time
- import base64
- import os
- import hjson
- from tqdm import tqdm
- from ..utils import logger
- from .constants import AUTOTUNING, AUTOTUNING_METRIC_PATH
- from .utils import get_val_by_key, search_error, was_interruptted
- """
thread-0: loops over the experiment queue, dispatching each experiment once resources for it become available
thread-N: runs one experiment; every dispatched experiment gets its own thread
- """
- from deepspeed import comm as dist
# Seconds used both as the per-thread join timeout and as the pause between
# polling rounds when checking on running experiments.
TIMEOUT = 5
class ResourceManager:
    """Dispatch autotuning experiments onto a pool of nodes with GPU slots.

    Experiments are queued by ``schedule_experiments``, dispatched by ``run``
    (each one in its own thread, started by ``run_job``) and, once their thread
    has ended, recorded in ``finished_experiments`` as ``(exp, err)`` pairs
    keyed by experiment id.
    """

    def __init__(self, args, hosts, num_gpus_per_node, results_dir, exps_dir, arg_mappings):
        """Create the node pool and the (initially empty) experiment queues.

        Args:
            args: object providing ``master_port``, ``user_script`` and
                ``user_args`` (read by ``run_job``).
            hosts: iterable of host names; one ``Node`` is created per host.
            num_gpus_per_node (int): number of slots given to every node.
            results_dir (str): directory under which each experiment gets a
                sub-directory named after the experiment.
            exps_dir (str): stored on the instance; not read by this class.
            arg_mappings (dict or None): maps an experiment key to the user
                command-line flag that should carry its value.
        """
        self.results_dir = results_dir
        self.exps_dir = exps_dir

        self.num_gpus_per_node = num_gpus_per_node
        self.nodes = [Node(host, num_gpus_per_node) for host in hosts]

        self.experiment_queue = []
        self.running_experiments = {}
        self.finished_experiments = {}
        self.experiment_count = 0
        self.exp_paths = set()
        self.args = args

        # Keys and flags are stripped; if two keys collide after stripping,
        # the first one seen wins.
        self.arg_mappings = {}
        if arg_mappings is not None:
            for key, flag in arg_mappings.items():
                self.arg_mappings.setdefault(key.strip(), flag.strip())

    def schedule_experiments(self, exp_paths):
        """Load experiment descriptions and queue those that still need to run.

        Paths already seen by this manager are ignored. An experiment whose
        result directory holds a ``stderr.log`` that was not interrupted is
        recorded as finished; it is skipped when that log contains an error or
        when a ``metrics.json`` / ``model_info.json`` file already exists.

        Args:
            exp_paths: iterable of paths to hjson experiment files.
        """
        for exp_path in exp_paths:
            if exp_path in self.exp_paths:
                continue
            self.exp_paths.add(exp_path)

            with open(exp_path, "r") as fd:
                exp = hjson.load(fd)

            exp["exp_id"] = self.experiment_count
            self.experiment_count += 1

            result_dir = exp["result_dir"] = os.path.join(self.results_dir, exp['name'])
            stderr_file = os.path.join(result_dir, "stderr.log")
            model_info_file = os.path.join(result_dir, "model_info.json")
            metric_file = os.path.join(result_dir, "metrics.json")
            if AUTOTUNING in exp["ds_config"]:
                exp["ds_config"][AUTOTUNING][AUTOTUNING_METRIC_PATH] = metric_file

            # skip existing experiments (except for the ones that were interrupted)
            already_ran = (os.path.exists(result_dir) and os.path.exists(stderr_file)
                           and not was_interruptted(stderr_file))
            if already_ran:
                err = search_error(stderr_file)
                self.finished_experiments[exp["exp_id"]] = (exp, err)
                if err or os.path.exists(metric_file) or os.path.exists(model_info_file):
                    logger.info(f"Skipping exp {exp['name']} whose result already exists")
                    continue

            self.experiment_queue.append(exp)

    def run_job(self, exp: dict, reservations):
        """Start ``exp`` in a new thread on the reserved slots.

        The experiment's master port is ``args.master_port + exp_id``. Values
        found in ``exp`` for the configured ``arg_mappings`` (other than falsy
        values and ``"auto"``) overwrite or extend the user arguments.

        Args:
            exp (dict): experiment description; must contain ``exp_id`` and ``name``.
            reservations: reservations handed to ``run_experiment`` and kept
                so their slots can be restored later.
        """
        exp_id = exp["exp_id"]
        exp["master_port"] = self.args.master_port + exp_id
        exp["result_dir"] = os.path.join(self.results_dir, exp['name'])
        user_script = self.args.user_script
        # Work on a private copy: the experiment thread rewrites this list, and
        # sharing one list object would let concurrently running experiments
        # (and later ones) see each other's overrides.
        user_args = list(self.args.user_args or [])

        # overwrite the user arg in the arg_mappings
        for key, flag in self.arg_mappings.items():
            nval = get_val_by_key(exp, key)
            if not nval or str(nval) == "auto":
                continue
            if flag in user_args:
                value_idx = user_args.index(flag) + 1
                if value_idx < len(user_args):
                    user_args[value_idx] = str(nval)
                else:
                    # The flag is the last token, so there is no value to overwrite.
                    user_args.append(str(nval))
            else:
                user_args.extend((flag, str(nval)))

        t = threading.Thread(target=run_experiment, args=(exp, reservations, user_script, user_args))
        t.start()
        self.running_experiments[exp_id] = (t, exp, reservations, time.time())

    def experiment_check(self, pbar):
        """Poll running experiments and retire the ones whose thread has ended.

        Every running thread is joined for up to ``TIMEOUT`` seconds. Finished
        experiments are stored in ``finished_experiments`` together with the
        error found in their ``stderr.log``, their slots are restored, and the
        method then sleeps for ``TIMEOUT`` seconds.

        Args:
            pbar: progress bar; advanced by the number of experiments that
                finished during this call.
        """
        finished_exps = []
        for exp_id, exp_data in self.running_experiments.items():
            thread, exp_json, reservations, start_time = exp_data
            logger.debug(f"Checking exp_id = {exp_id}, alive = {thread.is_alive()}")
            thread.join(timeout=TIMEOUT)
            if thread.is_alive():
                continue
            stderr_file = os.path.join(exp_json["result_dir"], "stderr.log")
            err = search_error(stderr_file)
            finished_exps.append((exp_id, reservations))
            self.finished_experiments[exp_id] = (exp_json, err)
            duration = time.time() - start_time
            logger.debug(f"Finished exp_id = {exp_id}, duration={duration:.2f} sec")

        # Advance the bar once per call; updating with the running total after
        # every finished experiment would count earlier ones again.
        pbar.update(len(finished_exps))

        # Retire outside the scan so the dict is not modified while iterating.
        for exp_id, reservations in finished_exps:
            for reservation in reservations:
                reservation.restore_slots()
            self.running_experiments.pop(exp_id)
        time.sleep(TIMEOUT)

    def resource_request(self, exp):
        """Try to reserve ``exp['num_gpus']`` slots on ``exp['num_nodes']`` nodes.

        Args:
            exp (dict): experiment description with ``num_gpus`` and ``num_nodes``.

        Returns:
            The list of reservations when enough nodes could serve the request;
            otherwise ``None``, after giving back any slots reserved on the way.
        """
        slot_request = exp['num_gpus']
        nodes_needed = exp['num_nodes']
        reservations = []
        for node in self.nodes:
            if nodes_needed == 0:
                break
            slots = node.reserve_slots(slot_request=slot_request)
            if slots:
                reservations.append(Reservation(node=node, slots=slots))
                nodes_needed -= 1

        if nodes_needed == 0:
            # request satisfied
            return reservations

        # request not satisfied
        for reservation in reservations:
            reservation.restore_slots()
        return None

    def status(self):
        """Return a comma-separated summary of idle GPU slots per node."""
        return ", ".join(f"{node.host} ({len(node.idle_slots)} idle gpus)" for node in self.nodes)

    def run(self):
        """Dispatch every queued experiment, then wait for all of them to end.

        An experiment that cannot get its resources is put back at the head of
        the queue and retried after ``experiment_check`` has had a chance to
        free slots.
        """
        pbar = tqdm(total=len(self.experiment_queue))

        while self.experiment_queue:
            exp = self.experiment_queue.pop(0)
            logger.debug(f'Popped exp_id = {exp["exp_id"]} from the queue')
            logger.debug(f'Resource status: {self.status()}')
            reservations = self.resource_request(exp)

            if not reservations:
                logger.debug(f'Unable to schedule exp_id = {exp["exp_id"]}')
                self.experiment_queue.insert(0, exp)
                logger.debug(f'Put exp_id = {exp["exp_id"]} back into the queue')
                self.experiment_check(pbar)
                continue

            parts = []
            for reservation in reservations:
                reservation.slots.sort()
                slots = ",".join(map(str, reservation.slots))
                parts.append(f"{reservation.node.host}:{slots}")
            desc = "@".join(parts)
            logger.debug(f'Running exp_id = {exp["exp_id"]} on {desc}')
            self.run_job(exp, reservations)

        # All pending experiments are scheduled, waiting for them to complete
        while self.running_experiments:
            self.experiment_check(pbar)

    def save_exp_results_to_database(self, message, ranks=None, path=None):
        """Append ``message`` as one JSON line to ``path`` when this process should log.

        The process logs when one of the following holds:
            + not dist.is_initialized()
            + ranks[0] == -1
            + dist.get_rank() is in ranks

        When logging, ``message['rank']`` is set to this process's rank
        (-1 if ``dist`` is not initialized) before it is written.

        Args:
            message (dict): JSON-serializable record; modified in place.
            ranks (list): ranks allowed to log.
            path (str): file to append to; must be given when logging happens.
        """
        initialized = dist.is_initialized()
        should_log = not initialized
        ranks = ranks or []
        my_rank = dist.get_rank() if initialized else -1
        if ranks and not should_log:
            should_log = ranks[0] == -1 or my_rank in set(ranks)
        logger.debug(f"*** Should log: {should_log}")
        if should_log:
            message['rank'] = my_rank
            with open(path, 'a') as outfile:
                json.dump(message, outfile)
                outfile.write('\n')

    def parse_results(self, metric):
        """Pick the finished experiment with the largest value of ``metric``.

        Experiments recorded with an error are skipped. For every other
        experiment whose metric file exists, the parsed file is stored in
        ``exp['results']``.

        Args:
            metric (str): key to read from each experiment's metric file.

        Returns:
            ``(best_exp, best_value)`` for the experiment with the largest
            metric value. If no experiment qualifies, ``(exp, None)`` where
            ``exp`` is the last finished experiment visited, or ``None`` when
            there are no finished experiments at all.
        """
        # NOTE(review): sys.float_info.min is the smallest *positive* float, so
        # metric values <= 0 can never be selected -- confirm this is intended.
        max_throughput = sys.float_info.min
        best_exp_id = -1
        exp = None  # stays None when there are no finished experiments
        for exp_id, (exp, err) in self.finished_experiments.items():
            if err:
                logger.info(
                    f"The experiment exp_id = {exp_id}, exp_name = {exp['name']}, did not run successfully with error = {err}, thus a metrics.txt does not exist for it. Check the stderr.log in {exp['result_dir']}"
                )
                continue

            metric_file = exp["ds_config"][AUTOTUNING][AUTOTUNING_METRIC_PATH]
            if not os.path.exists(metric_file):
                continue

            with open(metric_file, 'r') as f:
                results = hjson.load(f)
            curr_throughput = results[metric]
            if curr_throughput > max_throughput:
                max_throughput = curr_throughput
                best_exp_id = exp_id
            exp['results'] = results

        if best_exp_id != -1:
            best_exp, _ = self.finished_experiments[best_exp_id]
            return best_exp, max_throughput

        return exp, None

    def clear(self):
        """Clear experiment queues, does not reset self.experiment_count.

        ``clean_up`` is called for every experiment that is still running
        before the bookkeeping dictionaries are reset.
        """
        self.experiment_queue = []
        # clean up the running experiments
        for _thread, exp_json, reservations, _start_time in self.running_experiments.values():
            clean_up(exp_json, reservations)
        self.running_experiments = {}
        self.finished_experiments = {}
        self.exp_paths = set()
class Node:
    """A host together with the list of its currently idle GPU slot indices."""

    def __init__(self, host, max_slots):
        """Create a node whose slots ``0 .. max_slots - 1`` are all idle."""
        self.host = host
        self.max_slots = max_slots
        self.idle_slots = list(range(max_slots))

    def reserve_slots(self, slot_request: int):
        """Take ``slot_request`` slots from the front of the idle list.

        Returns:
            The list of reserved slot indices, or ``None`` (nothing is taken)
            when fewer than ``slot_request`` slots are idle.
        """
        if len(self.idle_slots) < slot_request:
            return None
        if slot_request <= 0:
            # Nothing to take; a negative count must not be read as a slice offset.
            return []
        reserved = self.idle_slots[:slot_request]
        del self.idle_slots[:slot_request]
        return reserved

    def restore_slots(self, slots: list):
        """Append ``slots`` back to the end of the idle list."""
        self.idle_slots.extend(slots)
class Reservation:
    """A set of slots reserved on one node."""

    def __init__(self, node, slots):
        """Remember the node and the slot indices reserved on it."""
        self.node = node
        self.slots = slots

    def restore_slots(self):
        """Hand the reserved slots back to the node."""
        self.node.restore_slots(self.slots)

    def desc(self):
        """Return ``"<host>:<slot>,<slot>,...@"`` for this reservation.

        The trailing ``@`` is part of the returned string.
        """
        slots = ",".join(str(slot) for slot in self.slots)
        return f"{self.node.host}:{slots}@"
def get_job_id():
    """Return the infrastructure-specific job id.

    ``DLWS_JOB_ID`` takes precedence over ``DLTS_JOB_ID``; when neither
    environment variable is set, ``"unknown-job-id"`` is returned.
    """
    # A variable that is set but empty still counts as set, hence .get()
    # with a default rather than an ``or`` chain.
    return os.environ.get("DLWS_JOB_ID", os.environ.get("DLTS_JOB_ID", "unknown-job-id"))
def get_user():
    """Return the ``USER`` environment variable, or ``"unknown-user"`` if unset."""
    return os.environ.get("USER", "unknown-user")
def run_experiment(exp: dict, reservations, user_script, user_args):
    """Launch one experiment with the ``deepspeed`` launcher and wait for it.

    The function writes ``ds_config.json``, ``exp.json`` and ``cmd.txt`` into
    ``exp["result_dir"]``, runs the launcher with stdout/stderr redirected to
    ``stdout.log`` / ``stderr.log`` in the same directory, and finally calls
    ``clean_up`` for the reserved nodes.

    Args:
        exp (dict): experiment description; must contain ``master_port``,
            ``result_dir``, ``ds_config``, ``exp_id`` and ``name``. Several
            keys (``launcher_args``, ``user``, ``job_id``, ``ds_config_path``,
            ``ds_config_base64``, ``user_script``, ``user_args``) are added.
        reservations: reservations whose ``node.host`` and ``slots`` define
            the ``--include`` launcher argument; ``slots`` is sorted in place.
        user_script: script passed to the launcher.
        user_args (list): arguments for the user script. The value following
            ``--deepspeed_config`` (or ``--deepspeed``) is replaced by the
            base64-encoded DeepSpeed config. The caller's list is not modified.

    Raises:
        ValueError: if ``user_args`` is non-empty but contains neither
            ``--deepspeed_config`` nor ``--deepspeed``, or if that flag is the
            last argument and therefore has no value to replace.
    """
    host_specs = []
    for reservation in reservations:
        reservation.slots.sort()
        slots = ",".join(map(str, reservation.slots))
        host_specs.append(f"{reservation.node.host}:{slots}")
    include_str = "@".join(host_specs)

    master_port = exp["master_port"]
    exp["launcher_args"] = [
        "--include",
        f"{include_str}",
        "--master_port",
        str(master_port),
    ]
    logger.debug(f'launcher args={exp["launcher_args"]}')

    exp["user"] = get_user()
    exp["job_id"] = get_job_id()

    exp_dir = exp["result_dir"]
    os.makedirs(exp_dir, exist_ok=True)
    ds_config_path = os.path.join(exp_dir, "ds_config.json")
    exp["ds_config_path"] = ds_config_path

    ds_config = copy.deepcopy(exp["ds_config"])
    ds_config_json = json.dumps(ds_config).encode('utf-8')
    exp["ds_config_base64"] = base64.urlsafe_b64encode(ds_config_json).decode('utf-8')

    # flush + fsync so the files are on disk before the launcher starts
    with open(exp["ds_config_path"], "w", buffering=BUFSIZE) as fd:
        json.dump(ds_config, fd)
        fd.flush()
        os.fsync(fd)
    path = exp["ds_config_path"]
    logger.info(f"Scheduler wrote ds_config to {path}, {os.path.abspath(path)}")

    with open(os.path.join(exp_dir, "exp.json"), "w", buffering=BUFSIZE) as fd:
        json.dump(exp, fd)
        fd.flush()
        os.fsync(fd)
    path = os.path.join(exp_dir, "exp.json")
    logger.info(f"Scheduler wrote exp to {path}, {os.path.abspath(path)}")

    # Private copy: this function runs in its own thread, and rewriting a list
    # shared with other experiment threads would let them clobber each other's
    # config argument.
    user_args = list(user_args) if user_args else []

    # replace the config file given after "--deepspeed_config" in user_args
    if user_args:
        # "--deepspeed_config" is omitted in HF, which uses "--deepspeed" instead
        for flag in ("--deepspeed_config", "--deepspeed"):
            if flag in user_args:
                value_idx = user_args.index(flag) + 1
                break
        else:
            raise ValueError("neither --deepspeed_config nor --deepspeed was found in the user arguments")
        if value_idx >= len(user_args):
            raise ValueError("there is no ds_config file specified after --deepspeed_config or --deepspeed")
        # pass base64 serialized ds_config to launcher
        user_args[value_idx] = exp["ds_config_base64"]

    exp["user_script"] = user_script
    exp["user_args"] = user_args

    cmd = ["deepspeed"] + exp["launcher_args"] + [user_script] + user_args

    with open(os.path.join(exp_dir, "cmd.txt"), "w", buffering=BUFSIZE) as fd:
        fd.write(" ".join(cmd))
        fd.write("\n")
        fd.flush()
        os.fsync(fd)

    logger.info(
        f"Launching exp_id = {exp['exp_id']}, exp_name = {exp['name']}, with resource = {include_str}, and ds_config = {os.path.abspath(ds_config_path)}"
    )

    with open(os.path.join(exp_dir, "stdout.log"), "wb") as out, open(os.path.join(exp_dir, "stderr.log"),
                                                                      "wb") as err:
        result = subprocess.Popen(cmd, stdout=out, stderr=err)
        result.wait()
        out.flush()
        err.flush()
        os.fsync(out)
        os.fsync(err)

    clean_up(exp, reservations)

    logger.info(f"Done running exp_id = {exp['exp_id']}, exp_name = {exp['name']}, with resource = {include_str}")
# Value passed to pdsh's ``-f`` (fan-out) flag.
PDSH_MAX_FAN_OUT = 1024


def clean_up(exp: dict, reservations):
    """Kill leftover processes of ``exp`` on every reserved node via pdsh.

    Runs ``pdsh -f <fan-out> -w <hosts> pkill -f <exp name>`` with
    ``PDSH_RCMD_TYPE=ssh`` and waits for it to finish.

    Args:
        exp (dict): experiment description; ``exp['name']`` is the ``pkill -f``
            pattern and ``exp['exp_id']`` is used for logging.
        reservations: reservations whose ``node.host`` values form the host list.

    Raises:
        SystemExit: raised through ``sys.exit`` with the command's return code
            when that code is greater than zero. It is raised in whichever
            thread called this function.
    """
    env = os.environ.copy()
    env['PDSH_RCMD_TYPE'] = 'ssh'

    nodes_str = ",".join(str(reservation.node.host) for reservation in reservations)
    logger.debug(f"Cleaning up exp_id = {exp['exp_id']} on the following workers: {nodes_str}")

    # PDSH flags for max node fan out and specific hosts to launch on
    # See https://linux.die.net/man/1/pdsh for flag details
    pdsh_cmd = ['pdsh', '-f', str(PDSH_MAX_FAN_OUT), '-w', nodes_str]

    kill_cmd = [
        'pkill',
        '-f',
        exp['name'],
    ]
    cmd = pdsh_cmd + kill_cmd
    logger.debug("cmd = {}".format(' '.join(cmd)))

    result = subprocess.Popen(cmd, env=env)
    result.wait()

    # In case of failure must propagate the error-condition back to the caller (usually shell). The
    # actual error and traceback should have been printed in the subprocess, so in order to avoid
    # unnecessary noise we just quietly exit here with the same code as the subprocess
    if result.returncode > 0:
        sys.exit(result.returncode)

    logger.info(f"Done cleaning up exp_id = {exp['exp_id']} on the following workers: {nodes_str}")
|