test_monitor.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # Copyright (c) Microsoft Corporation.
  2. # SPDX-License-Identifier: Apache-2.0
  3. # DeepSpeed Team
  4. from deepspeed.monitor.tensorboard import TensorBoardMonitor
  5. from deepspeed.monitor.wandb import WandbMonitor
  6. from deepspeed.monitor.csv_monitor import csvMonitor
  7. from deepspeed.monitor.config import DeepSpeedMonitorConfig
  8. from deepspeed.monitor.comet import CometMonitor
  9. from unit.common import DistributedTest
  10. from unittest.mock import Mock, patch
  11. from deepspeed.runtime.config import DeepSpeedConfig
  12. import deepspeed.comm as dist
  13. class TestTensorBoard(DistributedTest):
  14. world_size = 2
  15. def test_tensorboard(self):
  16. config_dict = {
  17. "train_batch_size": 2,
  18. "tensorboard": {
  19. "enabled": True,
  20. "output_path": "test_output/ds_logs/",
  21. "job_name": "test"
  22. }
  23. }
  24. ds_config = DeepSpeedConfig(config_dict)
  25. tb_monitor = TensorBoardMonitor(ds_config.monitor_config.tensorboard)
  26. assert tb_monitor.enabled == True
  27. assert tb_monitor.output_path == "test_output/ds_logs/"
  28. assert tb_monitor.job_name == "test"
  29. def test_empty_tensorboard(self):
  30. config_dict = {"train_batch_size": 2, "tensorboard": {}}
  31. ds_config = DeepSpeedConfig(config_dict)
  32. tb_monitor = TensorBoardMonitor(ds_config.monitor_config.tensorboard)
  33. defaults = DeepSpeedMonitorConfig().tensorboard
  34. assert tb_monitor.enabled == defaults.enabled
  35. assert tb_monitor.output_path == defaults.output_path
  36. assert tb_monitor.job_name == defaults.job_name
  37. class TestWandB(DistributedTest):
  38. world_size = 2
  39. def test_wandb(self):
  40. config_dict = {
  41. "train_batch_size": 2,
  42. "wandb": {
  43. "enabled": False,
  44. "group": "my_group",
  45. "team": "my_team",
  46. "project": "my_project"
  47. }
  48. }
  49. ds_config = DeepSpeedConfig(config_dict)
  50. wandb_monitor = WandbMonitor(ds_config.monitor_config.wandb)
  51. assert wandb_monitor.enabled == False
  52. assert wandb_monitor.group == "my_group"
  53. assert wandb_monitor.team == "my_team"
  54. assert wandb_monitor.project == "my_project"
  55. def test_empty_wandb(self):
  56. config_dict = {"train_batch_size": 2, "wandb": {}}
  57. ds_config = DeepSpeedConfig(config_dict)
  58. wandb_monitor = WandbMonitor(ds_config.monitor_config.wandb)
  59. defaults = DeepSpeedMonitorConfig().wandb
  60. assert wandb_monitor.enabled == defaults.enabled
  61. assert wandb_monitor.group == defaults.group
  62. assert wandb_monitor.team == defaults.team
  63. assert wandb_monitor.project == defaults.project
  64. class TestCSVMonitor(DistributedTest):
  65. world_size = 2
  66. def test_csv_monitor(self):
  67. config_dict = {
  68. "train_batch_size": 2,
  69. "csv_monitor": {
  70. "enabled": True,
  71. "output_path": "test_output/ds_logs/",
  72. "job_name": "test"
  73. }
  74. }
  75. ds_config = DeepSpeedConfig(config_dict)
  76. csv_monitor = csvMonitor(ds_config.monitor_config.csv_monitor)
  77. assert csv_monitor.enabled == True
  78. assert csv_monitor.output_path == "test_output/ds_logs/"
  79. assert csv_monitor.job_name == "test"
  80. def test_empty_csv_monitor(self):
  81. config_dict = {"train_batch_size": 2, "csv_monitor": {}}
  82. ds_config = DeepSpeedConfig(config_dict)
  83. csv_monitor = csvMonitor(ds_config.monitor_config.csv_monitor)
  84. defaults = DeepSpeedMonitorConfig().csv_monitor
  85. assert csv_monitor.enabled == defaults.enabled
  86. assert csv_monitor.output_path == defaults.output_path
  87. assert csv_monitor.job_name == defaults.job_name
  88. class TestCometMonitor(DistributedTest):
  89. world_size = 2
  90. def test_comet_monitor(self):
  91. import comet_ml
  92. mock_experiment = Mock()
  93. mock_start = Mock(return_value=mock_experiment)
  94. config_dict = {
  95. "train_batch_size": 2,
  96. "comet": {
  97. "enabled": True,
  98. "samples_log_interval": 42,
  99. "workspace": "some-workspace",
  100. "project": "some-project",
  101. "api_key": "some-api-key",
  102. "experiment_name": "some-experiment-name",
  103. "experiment_key": "some-experiment-key",
  104. "mode": "get_or_create",
  105. "online": True
  106. }
  107. }
  108. ds_config = DeepSpeedConfig(config_dict)
  109. with patch.object(comet_ml, "start", mock_start):
  110. comet_monitor = CometMonitor(ds_config.monitor_config.comet)
  111. assert comet_monitor.enabled is True
  112. assert comet_monitor.samples_log_interval == 42
  113. # experiment should be initialized via comet_ml.start only if rank == 0
  114. if dist.get_rank() == 0:
  115. mock_start.assert_called_once_with(
  116. api_key="some-api-key",
  117. project="some-project",
  118. workspace="some-workspace",
  119. experiment_key="some-experiment-key",
  120. mode="get_or_create",
  121. online=True,
  122. )
  123. mock_experiment.set_name.assert_called_once_with("some-experiment-name")
  124. assert comet_monitor.experiment is mock_experiment
  125. else:
  126. mock_start.assert_not_called()
  127. def test_empty_comet(self):
  128. import comet_ml
  129. mock_start = Mock()
  130. config_dict = {"train_batch_size": 2, "comet": {}}
  131. ds_config = DeepSpeedConfig(config_dict)
  132. with patch.object(comet_ml, "start", mock_start):
  133. comet_monitor = CometMonitor(ds_config.monitor_config.comet)
  134. defaults = DeepSpeedMonitorConfig().comet
  135. assert comet_monitor.enabled == defaults.enabled
  136. assert comet_monitor.samples_log_interval == defaults.samples_log_interval
  137. mock_start.assert_not_called()