# csv_monitor.py
  1. # Copyright (c) Microsoft Corporation.
  2. # SPDX-License-Identifier: Apache-2.0
  3. # DeepSpeed Team
  4. from .monitor import Monitor
  5. import os
  6. import deepspeed.comm as dist
  7. class csvMonitor(Monitor):
  8. def __init__(self, csv_config):
  9. super().__init__(csv_config)
  10. self.filenames = []
  11. self.enabled = csv_config.enabled
  12. self.output_path = csv_config.output_path
  13. self.job_name = csv_config.job_name
  14. self.log_dir = self.setup_log_dir()
  15. def setup_log_dir(self, base=os.path.join(os.path.expanduser("~"), "csv_monitor")):
  16. if self.enabled and dist.get_rank() == 0:
  17. if self.output_path is not None:
  18. log_dir = os.path.join(self.output_path, self.job_name)
  19. # NOTE: This code path currently is never used since the default tensorboard_output_path is an empty string and not None. Saving it in case we want this functionality in the future.
  20. else:
  21. if "DLWS_JOB_ID" in os.environ:
  22. infra_job_id = os.environ["DLWS_JOB_ID"]
  23. elif "DLTS_JOB_ID" in os.environ:
  24. infra_job_id = os.environ["DLTS_JOB_ID"]
  25. else:
  26. infra_job_id = "unknown-job-id"
  27. csv_monitor_dir_name = os.path.join(infra_job_id, "logs")
  28. log_dir = os.path.join(base, csv_monitor_dir_name, self.job_name)
  29. os.makedirs(log_dir, exist_ok=True)
  30. return log_dir
  31. def write_events(self, event_list):
  32. if self.enabled and dist.get_rank() == 0:
  33. import csv
  34. # We assume each event_list element is a tensorboard-style tuple in the format: (log_name: String, value, step: Int)
  35. for event in event_list:
  36. log_name = event[0]
  37. value = event[1]
  38. step = event[2]
  39. # Set the header to the log_name
  40. # Need this check because the deepspeed engine currently formats log strings to separate with '/'
  41. if '/' in log_name:
  42. record_splits = log_name.split('/')
  43. header = record_splits[len(record_splits) - 1]
  44. else:
  45. header = log_name
  46. # sanitize common naming conventions into filename
  47. filename = log_name.replace('/', '_').replace(' ', '_')
  48. fname = self.log_dir + '/' + filename + '.csv'
  49. # Open file and record event. Insert header if this is the first time writing
  50. with open(fname, 'a+') as csv_monitor_file:
  51. csv_monitor_writer = csv.writer(csv_monitor_file)
  52. if filename not in self.filenames:
  53. self.filenames.append(filename)
  54. csv_monitor_writer.writerow(['step', header])
  55. csv_monitor_writer.writerow([step, value])