deepspeed_aio_common.cpp 13 KB


  1. // Copyright (c) Microsoft Corporation.
  2. // SPDX-License-Identifier: Apache-2.0
  3. // DeepSpeed Team
  4. /*
  5. Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
  6. */
  7. #include <assert.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <fcntl.h>
  11. #include <libaio.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/types.h>
  15. #include <time.h>
  16. #include <unistd.h>
  17. #include <algorithm>
  18. #include <chrono>
  19. #include <cmath>
  20. #include <cstring>
  21. #include <fstream>
  22. #include <iostream>
  23. #include <limits>
  24. #include <numeric>
  25. #include <string>
  26. #include <vector>
  27. #include "deepspeed_aio_common.h"
  28. using namespace std;
  29. using namespace std::chrono;
  30. #define DEBUG_DS_AIO_PERF 0
  31. #define DEBUG_DS_AIO_SUBMIT_PERF 0
  32. static const std::string c_library_name = "deepspeed_aio";
  33. static void _report_aio_statistics(const char* tag,
  34. const std::vector<std::chrono::duration<double>>& latencies)
  35. __attribute__((unused));
  36. static void _report_aio_statistics(const char* tag,
  37. const std::vector<std::chrono::duration<double>>& latencies)
  38. {
  39. std::vector<double> lat_usec;
  40. for (auto& lat : latencies) { lat_usec.push_back(lat.count() * 1e6); }
  41. const auto min_lat = *(std::min_element(lat_usec.begin(), lat_usec.end()));
  42. const auto max_lat = *(std::max_element(lat_usec.begin(), lat_usec.end()));
  43. const auto avg_lat = std::accumulate(lat_usec.begin(), lat_usec.end(), 0) / lat_usec.size();
  44. std::cout << c_library_name << ": latency statistics(usec) " << tag
  45. << " min/max/avg = " << min_lat << " " << max_lat << " " << avg_lat << std::endl;
  46. }
  47. static void _get_aio_latencies(std::vector<std::chrono::duration<double>>& raw_latencies,
  48. struct deepspeed_aio_latency_t& summary_latencies)
  49. {
  50. std::vector<double> lat_usec;
  51. for (auto& lat : raw_latencies) { lat_usec.push_back(lat.count() * 1e6); }
  52. summary_latencies._min_usec = *(std::min_element(lat_usec.begin(), lat_usec.end()));
  53. summary_latencies._max_usec = *(std::max_element(lat_usec.begin(), lat_usec.end()));
  54. summary_latencies._avg_usec =
  55. std::accumulate(lat_usec.begin(), lat_usec.end(), 0) / lat_usec.size();
  56. }
  57. static void _do_io_submit_singles(const long long int n_iocbs,
  58. const long long int iocb_index,
  59. std::unique_ptr<aio_context>& aio_ctxt,
  60. std::vector<std::chrono::duration<double>>& submit_times)
  61. {
  62. for (auto i = 0; i < n_iocbs; ++i) {
  63. const auto st = std::chrono::high_resolution_clock::now();
  64. const auto submit_ret = io_submit(aio_ctxt->_io_ctxt, 1, aio_ctxt->_iocbs.data() + i);
  65. submit_times.push_back(std::chrono::high_resolution_clock::now() - st);
  66. #if DEBUG_DS_AIO_SUBMIT_PERF
  67. printf("submit(usec) %f io_index=%lld buf=%p len=%lu off=%llu \n",
  68. submit_times.back().count() * 1e6,
  69. iocb_index,
  70. aio_ctxt->_iocbs[i]->u.c.buf,
  71. aio_ctxt->_iocbs[i]->u.c.nbytes,
  72. aio_ctxt->_iocbs[i]->u.c.offset);
  73. #endif
  74. assert(submit_ret > 0);
  75. }
  76. }
  77. static void _do_io_submit_block(const long long int n_iocbs,
  78. const long long int iocb_index,
  79. std::unique_ptr<aio_context>& aio_ctxt,
  80. std::vector<std::chrono::duration<double>>& submit_times)
  81. {
  82. const auto st = std::chrono::high_resolution_clock::now();
  83. const auto submit_ret = io_submit(aio_ctxt->_io_ctxt, n_iocbs, aio_ctxt->_iocbs.data());
  84. submit_times.push_back(std::chrono::high_resolution_clock::now() - st);
  85. #if DEBUG_DS_AIO_SUBMIT_PERF
  86. printf("submit(usec) %f io_index=%lld nr=%lld buf=%p len=%lu off=%llu \n",
  87. submit_times.back().count() * 1e6,
  88. iocb_index,
  89. n_iocbs,
  90. aio_ctxt->_iocbs[0]->u.c.buf,
  91. aio_ctxt->_iocbs[0]->u.c.nbytes,
  92. aio_ctxt->_iocbs[0]->u.c.offset);
  93. #endif
  94. assert(submit_ret > 0);
  95. }
  96. static int _do_io_complete(const long long int min_completes,
  97. const long long int max_completes,
  98. std::unique_ptr<aio_context>& aio_ctxt,
  99. std::vector<std::chrono::duration<double>>& reap_times)
  100. {
  101. const auto start_time = std::chrono::high_resolution_clock::now();
  102. long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
  103. min_completes,
  104. max_completes,
  105. aio_ctxt->_io_events.data(),
  106. nullptr,
  107. nullptr);
  108. reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);
  109. assert(n_completes >= min_completes);
  110. return n_completes;
  111. }
  112. void do_aio_operation_sequential(const bool read_op,
  113. std::unique_ptr<aio_context>& aio_ctxt,
  114. std::unique_ptr<io_xfer_ctxt>& xfer_ctxt,
  115. deepspeed_aio_config_t* config,
  116. deepspeed_aio_perf_t* perf)
  117. {
  118. struct io_prep_context prep_ctxt(read_op, xfer_ctxt, aio_ctxt->_block_size, &aio_ctxt->_iocbs);
  119. const auto num_io_blocks = static_cast<long long int>(
  120. ceil(static_cast<double>(xfer_ctxt->_num_bytes) / aio_ctxt->_block_size));
  121. #if DEBUG_DS_AIO_PERF
  122. const auto io_op_name = std::string(read_op ? "read" : "write");
  123. std::cout << c_library_name << ": start " << io_op_name << " " << xfer_ctxt->_num_bytes
  124. << " bytes with " << num_io_blocks << " io blocks" << std::endl;
  125. #endif
  126. std::vector<std::chrono::duration<double>> submit_times;
  127. std::vector<std::chrono::duration<double>> reap_times;
  128. const auto max_queue_bytes =
  129. static_cast<long long int>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);
  130. auto start = std::chrono::high_resolution_clock::now();
  131. for (long long iocb_index = 0; iocb_index < num_io_blocks;
  132. iocb_index += aio_ctxt->_queue_depth) {
  133. const auto start_offset = iocb_index * aio_ctxt->_block_size;
  134. const auto start_buffer = (char*)xfer_ctxt->_mem_buffer + start_offset;
  135. const auto n_iocbs =
  136. min(static_cast<long long>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
  137. const auto num_bytes = min(max_queue_bytes, (xfer_ctxt->_num_bytes - start_offset));
  138. prep_ctxt.prep_iocbs(n_iocbs, num_bytes, start_buffer, start_offset);
  139. if (config->_single_submit) {
  140. _do_io_submit_singles(n_iocbs, iocb_index, aio_ctxt, submit_times);
  141. } else {
  142. _do_io_submit_block(n_iocbs, iocb_index, aio_ctxt, submit_times);
  143. }
  144. _do_io_complete(n_iocbs, n_iocbs, aio_ctxt, reap_times);
  145. }
  146. const std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
  147. if (perf) {
  148. _get_aio_latencies(submit_times, perf->_submit);
  149. _get_aio_latencies(reap_times, perf->_complete);
  150. perf->_e2e_usec = elapsed.count() * 1e6;
  151. perf->_e2e_rate_GB = (xfer_ctxt->_num_bytes / elapsed.count() / 1e9);
  152. }
  153. #if DEBUG_DS_AIO_PERF
  154. _report_aio_statistics("submit", submit_times);
  155. _report_aio_statistics("complete", reap_times);
  156. #endif
  157. #if DEBUG_DS_AIO_PERF
  158. std::cout << c_library_name << ": runtime(usec) " << elapsed.count() * 1e6
  159. << " rate(GB/sec) = " << (xfer_ctxt->_num_bytes / elapsed.count() / 1e9) << std::endl;
  160. #endif
  161. #if DEBUG_DS_AIO_PERF
  162. std::cout << c_library_name << ": finish " << io_op_name << " " << xfer_ctxt->_num_bytes
  163. << " bytes " << std::endl;
  164. #endif
  165. }
  166. void do_aio_operation_overlap(const bool read_op,
  167. std::unique_ptr<aio_context>& aio_ctxt,
  168. std::unique_ptr<io_xfer_ctxt>& xfer_ctxt,
  169. deepspeed_aio_config_t* config,
  170. deepspeed_aio_perf_t* perf)
  171. {
  172. struct io_prep_generator io_gen(read_op, xfer_ctxt, aio_ctxt->_block_size);
  173. #if DEBUG_DS_AIO_PERF
  174. const auto io_op_name = std::string(read_op ? "read" : "write");
  175. std::cout << c_library_name << ": start " << io_op_name << " " << xfer_ctxt->_num_bytes
  176. << " bytes with " << io_gen._num_io_blocks << " io blocks" << std::endl;
  177. #endif
  178. std::vector<std::chrono::duration<double>> submit_times;
  179. std::vector<std::chrono::duration<double>> reap_times;
  180. auto request_iocbs = aio_ctxt->_queue_depth;
  181. auto n_pending_iocbs = 0;
  182. const auto min_completes = 1;
  183. auto start = std::chrono::high_resolution_clock::now();
  184. while (true) {
  185. const auto n_iocbs = io_gen.prep_iocbs(request_iocbs - n_pending_iocbs, &aio_ctxt->_iocbs);
  186. if (n_iocbs > 0) {
  187. if (config->_single_submit) {
  188. _do_io_submit_singles(
  189. n_iocbs, (io_gen._next_iocb_index - n_iocbs), aio_ctxt, submit_times);
  190. } else {
  191. _do_io_submit_block(
  192. n_iocbs, (io_gen._next_iocb_index - n_iocbs), aio_ctxt, submit_times);
  193. }
  194. }
  195. n_pending_iocbs += n_iocbs;
  196. assert(n_pending_iocbs <= aio_ctxt->_queue_depth);
  197. if (n_pending_iocbs == 0) { break; }
  198. const auto n_complete =
  199. _do_io_complete(min_completes, n_pending_iocbs, aio_ctxt, reap_times);
  200. n_pending_iocbs -= n_complete;
  201. }
  202. const std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
  203. if (perf) {
  204. _get_aio_latencies(submit_times, perf->_submit);
  205. _get_aio_latencies(reap_times, perf->_complete);
  206. perf->_e2e_usec = elapsed.count() * 1e6;
  207. perf->_e2e_rate_GB = (xfer_ctxt->_num_bytes / elapsed.count() / 1e9);
  208. }
  209. #if DEBUG_DS_AIO_PERF
  210. _report_aio_statistics("submit", submit_times);
  211. _report_aio_statistics("complete", reap_times);
  212. #endif
  213. #if DEBUG_DS_AIO_PERF
  214. std::cout << c_library_name << ": runtime(usec) " << elapsed.count() * 1e6
  215. << " rate(GB/sec) = " << (xfer_ctxt->_num_bytes / elapsed.count() / 1e9) << std::endl;
  216. #endif
  217. #if DEBUG_DS_AIO_PERF
  218. std::cout << c_library_name << ": finish " << io_op_name << " " << xfer_ctxt->_num_bytes
  219. << " bytes " << std::endl;
  220. #endif
  221. }
  222. void report_file_error(const char* filename, const std::string file_op, const int error_code)
  223. {
  224. std::string err_msg = file_op + std::string(" failed on ") + std::string(filename) +
  225. " error = " + std::to_string(error_code);
  226. std::cerr << c_library_name << ": " << err_msg << std::endl;
  227. }
  228. int open_file(const char* filename, const bool read_op)
  229. {
  230. const int flags = read_op ? (O_RDONLY | O_DIRECT) : (O_WRONLY | O_CREAT | O_DIRECT);
  231. #if defined(__ENABLE_CANN__)
  232. int* flags_ptr = (int*)&flags;
  233. *flags_ptr = read_op ? (O_RDONLY) : (O_WRONLY | O_CREAT);
  234. #endif
  235. const int mode = 0600;
  236. const auto fd = open(filename, flags, mode);
  237. if (fd == -1) {
  238. const auto error_code = errno;
  239. const auto error_msg = read_op ? " open for read " : " open for write ";
  240. report_file_error(filename, error_msg, error_code);
  241. return -1;
  242. }
  243. return fd;
  244. }
  245. int regular_read(const char* filename, std::vector<char>& buffer)
  246. {
  247. long long int num_bytes;
  248. const auto f_size = get_file_size(filename, num_bytes);
  249. assert(f_size != -1);
  250. buffer.resize(num_bytes);
  251. const auto fd = open(filename, O_RDONLY, 0600);
  252. assert(fd != -1);
  253. long long int read_bytes = 0;
  254. auto r = 0;
  255. do {
  256. const auto buffer_ptr = buffer.data() + read_bytes;
  257. const auto bytes_to_read = num_bytes - read_bytes;
  258. r = read(fd, buffer_ptr, bytes_to_read);
  259. read_bytes += r;
  260. } while (r > 0);
  261. if (read_bytes != num_bytes) {
  262. std::cerr << "read error "
  263. << " read_bytes (read) = " << read_bytes << " num_bytes (fstat) = " << num_bytes
  264. << std::endl;
  265. }
  266. assert(read_bytes == num_bytes);
  267. close(fd);
  268. return 0;
  269. }
  270. static bool _validate_buffer(const char* filename, void* aio_buffer, const long long int num_bytes)
  271. {
  272. std::vector<char> regular_buffer;
  273. const auto reg_ret = regular_read(filename, regular_buffer);
  274. assert(0 == reg_ret);
  275. std::cout << "regular read of " << filename << " returned " << regular_buffer.size() << " bytes"
  276. << std::endl;
  277. if (static_cast<long long int>(regular_buffer.size()) != num_bytes) { return false; }
  278. return (0 == memcmp(aio_buffer, regular_buffer.data(), regular_buffer.size()));
  279. }
  280. bool validate_aio_operation(const bool read_op,
  281. const char* filename,
  282. void* aio_buffer,
  283. const long long int num_bytes)
  284. {
  285. const auto msg_suffix = std::string("deepspeed_aio_") +
  286. std::string(read_op ? "read()" : "write()") +
  287. std::string("using read()");
  288. if (false == _validate_buffer(filename, aio_buffer, num_bytes)) {
  289. std::cout << "Fail: correctness of " << msg_suffix << std::endl;
  290. return false;
  291. }
  292. std::cout << "Pass: correctness of " << msg_suffix << std::endl;
  293. return true;
  294. }