# test_ds_aio_utils.py
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
"""
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""
  7. BYTES_PER_GB = 1024**3
  8. LOG_TIDS = [0]
  9. def task_log(tid, msg):
  10. if tid in LOG_TIDS:
  11. print(f'tid {tid}: {msg}')
  12. def task_barrier(barrier, num_parties):
  13. assert barrier.parties == num_parties
  14. barrier.wait()
  15. assert barrier.broken == False
  16. def report_results(args, read_op, pool_results):
  17. #print(f'pool_results = {pool_results}')
  18. io_string = 'Read' if read_op else 'Write'
  19. if None in pool_results:
  20. print(f'Failure in one of {args.threads} {io_string} processes')
  21. return
  22. total_bytes = sum([num_bytes for _, _, num_bytes in pool_results])
  23. task_latency_sec = max([sec for _, sec, _ in pool_results])
  24. task_speed_GB = total_bytes / task_latency_sec / BYTES_PER_GB
  25. print(f'Task {io_string} Latency = {task_latency_sec} sec')
  26. print(f'Task {io_string} Speed = {task_speed_GB} GB/sec')
  27. e2e_latency_sec = max([sec for sec, _, _ in pool_results])
  28. e2e_speed_GB = total_bytes / e2e_latency_sec / BYTES_PER_GB
  29. print(f'E2E {io_string} Latency = {e2e_latency_sec} sec')
  30. print(f'E2E {io_string} Speed = {e2e_speed_GB} GB/sec')
  31. def refine_integer_value(value):
  32. unit_dict = {'K': 1024, 'M': 1024**2, 'G': 1024**3}
  33. if value[-1] in list(unit_dict.keys()):
  34. int_value = int(value[:-1]) * unit_dict[value[-1]]
  35. return int_value
  36. return int(value)
  37. def refine_args(args):
  38. if args.write_size and type(args.write_size) == str:
  39. args.write_size = refine_integer_value(args.write_size)
  40. if args.block_size and type(args.block_size) == str:
  41. args.block_size = refine_integer_value(args.block_size)