# util.py
  1. # Copyright (c) Microsoft Corporation.
  2. # SPDX-License-Identifier: Apache-2.0
  3. # DeepSpeed Team
  4. import pytest
  5. import torch
  6. import deepspeed
  7. from deepspeed.accelerator import get_accelerator, is_current_accelerator_supported
  8. from deepspeed.git_version_info import torch_info
  9. def skip_on_arch(min_arch=7):
  10. if get_accelerator().device_name() == 'cuda':
  11. if torch.cuda.get_device_capability()[0] < min_arch: #ignore-cuda
  12. pytest.skip(f"needs higher compute capability than {min_arch}")
  13. else:
  14. assert is_current_accelerator_supported()
  15. return
  16. def skip_on_cuda(valid_cuda):
  17. split_version = lambda x: map(int, x.split('.')[:2])
  18. if get_accelerator().device_name() == 'cuda':
  19. CUDA_MAJOR, CUDA_MINOR = split_version(torch_info['cuda_version'])
  20. CUDA_VERSION = (CUDA_MAJOR * 10) + CUDA_MINOR
  21. if valid_cuda.count(CUDA_VERSION) == 0:
  22. pytest.skip(f"requires cuda versions {valid_cuda}")
  23. else:
  24. assert is_current_accelerator_supported()
  25. return
  26. def bf16_required_version_check(accelerator_check=True):
  27. split_version = lambda x: map(int, x.split('.')[:2])
  28. TORCH_MAJOR, TORCH_MINOR = split_version(torch_info['version'])
  29. NCCL_MAJOR, NCCL_MINOR = split_version(torch_info['nccl_version'])
  30. CUDA_MAJOR, CUDA_MINOR = split_version(torch_info['cuda_version'])
  31. # Sometimes bf16 tests are runnable even if not natively supported by accelerator
  32. if accelerator_check:
  33. accelerator_pass = get_accelerator().is_bf16_supported()
  34. else:
  35. accelerator_pass = True
  36. torch_version_available = TORCH_MAJOR > 1 or (TORCH_MAJOR == 1 and TORCH_MINOR >= 10)
  37. cuda_version_available = CUDA_MAJOR >= 11
  38. nccl_version_available = NCCL_MAJOR > 2 or (NCCL_MAJOR == 2 and NCCL_MINOR >= 10)
  39. npu_available = get_accelerator().device_name() == 'npu'
  40. hpu_available = get_accelerator().device_name() == 'hpu'
  41. xpu_available = get_accelerator().device_name() == 'xpu'
  42. if torch_version_available and cuda_version_available and nccl_version_available and accelerator_pass:
  43. return True
  44. elif npu_available:
  45. return True
  46. elif hpu_available:
  47. return True
  48. elif xpu_available:
  49. return True
  50. else:
  51. return False
  52. def required_amp_check():
  53. from importlib.util import find_spec
  54. if find_spec('apex') is None:
  55. return False
  56. else:
  57. return True
  58. class no_child_process_in_deepspeed_io:
  59. def __enter__(self):
  60. # deepspeed_io defaults to creating a dataloader that uses a
  61. # multiprocessing pool. Our tests use pools and we cannot nest pools in
  62. # python. Therefore we're injecting this kwarg to ensure that no pools
  63. # are used in the dataloader.
  64. self.old_method = deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io
  65. def new_method(*args, **kwargs):
  66. kwargs["num_local_io_workers"] = 0
  67. return self.old_method(*args, **kwargs)
  68. deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io = new_method
  69. def __exit__(self, *_):
  70. deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io = self.old_method