deepspeed_py_aio_handle.cpp

// Copyright (c) Microsoft Corporation.
// SPDX-License-Identifier: Apache-2.0

// DeepSpeed Team

/*
Copyright 2020 The Microsoft DeepSpeed Team
Licensed under the MIT license.
Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
*/

#include "deepspeed_py_aio_handle.h"

using namespace std;

static void _start_aio_thread(std::shared_ptr<struct deepspeed_aio_thread_t> ctxt) { ctxt->run(); }
deepspeed_aio_handle_t::deepspeed_aio_handle_t(const int block_size,
                                               const int queue_depth,
                                               const bool single_submit,
                                               const bool overlap_events,
                                               const int num_threads)
    : _aio_ctxt(new aio_context(block_size, queue_depth)),
      _single_submit(single_submit),
      _overlap_events(overlap_events),
      _num_threads(num_threads),
      _aio_config(block_size, queue_depth, single_submit, overlap_events, false),
      _num_pending_ops(0),
      _pinned_tensor_mgr(new deepspeed_pin_tensor_t())
{
    // Create one worker context per thread, then launch a worker for each context.
    // Each worker runs deepspeed_aio_thread_t::run() until _stop_threads() sets _time_to_exit.
    for (auto i = 0; i < num_threads; ++i) {
        _thread_contexts.push_back(std::make_shared<deepspeed_aio_thread_t>(i, _aio_config));
    }

    for (auto& ctxt : _thread_contexts) {
        _threads.push_back(std::thread(_start_aio_thread, ctxt));
    }
}
deepspeed_aio_handle_t::~deepspeed_aio_handle_t()
{
    _stop_threads();
    for (auto& thr : _threads) { thr.join(); }
}

const int deepspeed_aio_handle_t::get_block_size() const
{
    return _aio_ctxt ? _aio_ctxt->_block_size : -1;
}

const int deepspeed_aio_handle_t::get_queue_depth() const
{
    return _aio_ctxt ? _aio_ctxt->_queue_depth : -1;
}

const bool deepspeed_aio_handle_t::get_single_submit() const { return _single_submit; }

const bool deepspeed_aio_handle_t::get_overlap_events() const { return _overlap_events; }

const int deepspeed_aio_handle_t::get_thread_count() const { return _num_threads; }
// Synchronous, single-threaded read of an entire file into `buffer` on the calling thread.
int deepspeed_aio_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate)
{
    const auto start_time = std::chrono::high_resolution_clock::now();

    assert(_aio_ctxt);

    long long num_file_bytes;
    if (-1 == get_file_size(filename, num_file_bytes)) {
        const auto error_code = errno;
        report_file_error(filename, " fstat for read", error_code);
        return -1;
    }
    assert(static_cast<long long int>(buffer.nbytes()) == num_file_bytes);

    const auto fd = open_file(filename, true);
    if (fd == -1) { return -1; }

    auto read_buffer = (char*)buffer.data_ptr();
    std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer));

    if (_aio_config._overlap_events) {
        do_aio_operation_overlap(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
    } else {
        do_aio_operation_sequential(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
    }

    close(fd);
    const std::chrono::duration<double> aio_time =
        std::chrono::high_resolution_clock::now() - start_time;

    if (validate) { validate_aio_operation(true, filename, read_buffer, num_file_bytes); }

    const std::chrono::duration<double> fn_time =
        std::chrono::high_resolution_clock::now() - start_time;
    std::cout << "Elapsed time(usec): "
              << "aio = " << aio_time.count() * 1e6 << " call = " << fn_time.count() * 1e6
              << std::endl;
    return 0;
}
// Synchronous, single-threaded write of `buffer` to `filename` on the calling thread.
int deepspeed_aio_handle_t::write(const torch::Tensor& buffer,
                                  const char* filename,
                                  const bool validate)
{
    assert(_aio_ctxt);

    const auto start_time = std::chrono::high_resolution_clock::now();

    const auto fd = open_file(filename, false);
    if (fd == -1) { return -1; }

    auto write_buffer = (char*)buffer.data_ptr();
    const auto num_write_bytes = static_cast<long long int>(buffer.nbytes());
    std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer));

    if (_aio_config._overlap_events) {
        do_aio_operation_overlap(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
    } else {
        do_aio_operation_sequential(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
    }

    const std::chrono::duration<double> aio_time =
        std::chrono::high_resolution_clock::now() - start_time;

    close(fd);

    if (validate) { validate_aio_operation(false, filename, write_buffer, num_write_bytes); }

    const std::chrono::duration<double> fn_time =
        std::chrono::high_resolution_clock::now() - start_time;
    std::cout << "Elapsed time(usec): "
              << "aio = " << aio_time.count() * 1e6 << " call = " << fn_time.count() * 1e6
              << std::endl;
    return 0;
}
void deepspeed_aio_handle_t::_schedule_aio_work(std::shared_ptr<struct io_op_desc_t> scheduled_op)
{
    // Broadcast the shared descriptor to every worker thread; each worker handles its own
    // slice of the transfer (the descriptor carries the per-thread byte count).
    for (auto& ctxt : _thread_contexts) {
        {
            std::lock_guard<std::mutex> lock(ctxt->_work_sync._mutex);
            ctxt->_work_queue.push(scheduled_op);
        }
        ctxt->_work_sync._cond_var.notify_one();
    }
    _num_pending_ops++;
}
std::shared_ptr<struct io_op_desc_t> deepspeed_aio_handle_t::_wait_for_aio_work()
{
    // Block until each worker thread has posted a completion, then return the shared
    // descriptor popped from the last completion queue.
    std::shared_ptr<struct io_op_desc_t> completed_op = nullptr;
    for (auto& ctxt : _thread_contexts) {
        std::unique_lock<std::mutex> lock(ctxt->_complete_sync._mutex);
        ctxt->_complete_sync._cond_var.wait(lock,
                                            [ctxt] { return !ctxt->_complete_queue.empty(); });
        completed_op = ctxt->_complete_queue.front();
        ctxt->_complete_queue.pop();
    }
    return completed_op;
}
void deepspeed_aio_handle_t::_stop_threads()
{
    assert(0 == _num_pending_ops);
    for (auto& ctxt : _thread_contexts) {
        {
            std::lock_guard<std::mutex> lock(ctxt->_work_sync._mutex);
            ctxt->_time_to_exit = true;
        }
        ctxt->_work_sync._cond_var.notify_one();
    }
}
// Drain every pending parallel op. Note that _num_bytes is the per-thread byte count,
// so the total transfer size used for validation is _num_threads * _num_bytes.
int deepspeed_aio_handle_t::wait()
{
    assert(_num_pending_ops > 0);
    auto num_completed_ops = 0;

    while (_num_pending_ops > 0) {
        auto completed_op = _wait_for_aio_work();

        completed_op->fini();

        close(completed_op->_fd);

        if (completed_op->_validate) {
            validate_aio_operation(completed_op->_read_op,
                                   completed_op->_filename.c_str(),
                                   completed_op->data_ptr(),
                                   _num_threads * completed_op->_num_bytes);
        }
        --_num_pending_ops;
        ++num_completed_ops;
    }

    return num_completed_ops;
}
bool deepspeed_aio_handle_t::_is_valid_parallel_aio_op(const bool read_op,
                                                       const long long int num_bytes)
{
    const auto op_string = read_op ? "Read" : "Write";
    if (num_bytes % get_thread_count()) {
        std::cout << "deepspeed_aio failure: parallel " << op_string << " num_bytes = " << num_bytes
                  << " not divisible by thread count = " << get_thread_count() << std::endl;
        return false;
    }

    return true;
}
int deepspeed_aio_handle_t::pread(const torch::Tensor& buffer,
                                  const char* filename,
                                  const bool validate,
                                  const bool async)
{
    long long num_file_bytes;
    if (-1 == get_file_size(filename, num_file_bytes)) {
        const auto error_code = errno;
        report_file_error(filename, " fstat for read", error_code);
        return -1;
    }
    const auto buffer_bytes = static_cast<long long int>(buffer.nbytes());
    if (buffer_bytes != num_file_bytes) {
        std::cout << filename << ": buffer nbytes != file bytes " << buffer_bytes
                  << " != " << num_file_bytes << std::endl;
    }
    assert(static_cast<long long int>(buffer.nbytes()) == num_file_bytes);
    assert((num_file_bytes % _num_threads) == 0);

    if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; }

    const auto fd = open_file(filename, true);
    if (fd == -1) { return -1; }

    // Each worker thread reads an equal share of the file: num_file_bytes / _num_threads.
    auto scheduled_op = std::make_shared<io_op_desc_t>(
        true, buffer, fd, filename, (num_file_bytes / _num_threads), validate);

    _schedule_aio_work(scheduled_op);

    if (async) { return 0; }

    return wait();
}
int deepspeed_aio_handle_t::pwrite(const torch::Tensor& buffer,
                                   const char* filename,
                                   const bool validate,
                                   const bool async)
{
    const auto num_write_bytes = static_cast<long long int>(buffer.nbytes());
    assert((num_write_bytes % _num_threads) == 0);

    if (!_is_valid_parallel_aio_op(false, num_write_bytes)) { return -1; }

    const auto fd = open_file(filename, false);
    if (fd == -1) { return -1; }

    // Each worker thread writes an equal share of the buffer: num_write_bytes / _num_threads.
    auto scheduled_op = std::make_shared<io_op_desc_t>(
        false, buffer, fd, filename, (num_write_bytes / _num_threads), validate);

    _schedule_aio_work(scheduled_op);

    if (async) { return 0; }

    return wait();
}
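
// Illustrative async usage (not part of this file; `handle` and the file path are example
// values). A parallel write scheduled with async=true returns immediately after enqueueing
// work; the caller must later invoke wait() to reap the completion, which also closes the
// file descriptor and runs optional validation:
//
//   handle.async_pwrite(tensor, "/local_nvme/param_shard.swp");
//   /* ... overlap computation here ... */
//   const auto completed = handle.wait();  // blocks until all pending ops finish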
int deepspeed_aio_handle_t::sync_pread(torch::Tensor& buffer, const char* filename)
{
    return pread(buffer, filename, false, false);
}

int deepspeed_aio_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* filename)
{
    return pwrite(buffer, filename, false, false);
}

int deepspeed_aio_handle_t::async_pread(torch::Tensor& buffer, const char* filename)
{
    return pread(buffer, filename, false, true);
}

int deepspeed_aio_handle_t::async_pwrite(const torch::Tensor& buffer, const char* filename)
{
    return pwrite(buffer, filename, false, true);
}

at::Tensor deepspeed_aio_handle_t::new_cpu_locked_tensor(const size_t num_elem,
                                                         const torch::Tensor& example_tensor)
{
    return _pinned_tensor_mgr->alloc(num_elem, example_tensor.scalar_type());
}

bool deepspeed_aio_handle_t::free_cpu_locked_tensor(torch::Tensor& locked_tensor)
{
    return _pinned_tensor_mgr->free(locked_tensor);
}