// deepspeed_aio_thread.cpp
/*
Copyright 2020 The Microsoft DeepSpeed Team
Licensed under the MIT license.
Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
*/
  6. #include "deepspeed_aio_thread.h"
  7. using namespace std;
  8. io_op_desc_t::io_op_desc_t(const bool read_op,
  9. const torch::Tensor& buffer,
  10. const int fd,
  11. const char* filename,
  12. const long long int num_bytes,
  13. const bool validate)
  14. : _read_op(read_op),
  15. _buffer(buffer),
  16. _fd(fd),
  17. _filename(filename),
  18. _num_bytes(num_bytes),
  19. _validate(validate)
  20. {
  21. _cpu_buffer = _buffer.is_cuda() ? _buffer.to(torch::kCPU).pin_memory() : _buffer;
  22. _contiguous_buffer = _cpu_buffer.contiguous();
  23. }
  24. char* io_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr(); }
  25. void io_op_desc_t::fini()
  26. {
  27. if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
  28. }
// Construct one worker in the AIO thread pool.
//
// tid        - this worker's index within the pool (used by run() to derive
//              its file-offset stripe).
// aio_config - shared AIO tuning parameters; held by reference, so it must
//              outlive this thread object.
//
// Each worker owns a private aio_context sized by the configured block size
// and queue depth, so submissions from different threads never contend on a
// single kernel AIO context. _time_to_exit starts false; stop() elsewhere is
// expected to flip it (NOTE(review): setter not visible in this file).
deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config)
    : _tid(tid),
      _aio_config(aio_config),
      _aio_ctxt(new aio_context(aio_config._block_size, aio_config._queue_depth)),
      _time_to_exit(false)
{
}
  36. deepspeed_aio_thread_t::~deepspeed_aio_thread_t() {}
// Worker-thread main loop: block until work (or shutdown) is signaled, execute
// one I/O request at a time against this thread's private aio_context, and
// hand the finished descriptor back on the completion queue.
void deepspeed_aio_thread_t::run()
{
    while (true) {
        std::shared_ptr<struct io_op_desc_t> next_io_op = nullptr;

        {
            // Sleep until the producer enqueues work or requests shutdown.
            // The predicate guards against spurious wakeups.
            std::unique_lock<std::mutex> lock(_work_sync._mutex);
            _work_sync._cond_var.wait(lock,
                                      [this] { return (!_work_queue.empty() || _time_to_exit); });
            // Pop exactly one op while holding the lock; the lock is released
            // before the (slow) I/O below so producers are not blocked.
            if (!_work_queue.empty()) {
                next_io_op = _work_queue.front();
                _work_queue.pop();
            }
        }

        if (next_io_op) {
            // Offset stripe for this worker: thread `tid` handles bytes
            // [num_bytes * tid, num_bytes * (tid + 1)). NOTE(review): this
            // assumes every pool thread is given the same per-thread
            // _num_bytes slice of one logical transfer — confirm at call site.
            const auto base_offset = next_io_op->_num_bytes * _tid;

            std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(
                next_io_op->_fd, base_offset, next_io_op->_num_bytes, next_io_op->data_ptr()));

            // Dispatch per configuration: overlapped submission vs. one-at-a-time.
            if (_aio_config._overlap_events) {
                do_aio_operation_overlap(
                    next_io_op->_read_op, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
            } else {
                do_aio_operation_sequential(
                    next_io_op->_read_op, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
            }

            {
                // Publish the completed op, then notify the waiter. notify_one
                // is called after the lock is released to avoid waking a
                // thread that would immediately block on the mutex.
                std::lock_guard<std::mutex> lock(_complete_sync._mutex);
                _complete_queue.push(next_io_op);
            }
            _complete_sync._cond_var.notify_one();
        }

        // Drain-then-exit: shutdown is only honored after the op (if any)
        // fetched this iteration has been completed and published.
        if (_time_to_exit) { break; }
    }
}
  69. }