event_agent.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import os
  2. import asyncio
  3. import logging
  4. from typing import Union
  5. import ray.experimental.internal_kv as internal_kv
  6. import ray.ray_constants as ray_constants
  7. import ray._private.utils as utils
  8. import ray.dashboard.utils as dashboard_utils
  9. import ray.dashboard.consts as dashboard_consts
  10. from ray.dashboard.utils import async_loop_forever, create_task
  11. from ray.dashboard.modules.event import event_consts
  12. from ray.dashboard.modules.event.event_utils import monitor_events
  13. from ray.core.generated import event_pb2
  14. from ray.core.generated import event_pb2_grpc
  15. logger = logging.getLogger(__name__)
  class EventAgent(dashboard_utils.DashboardAgentModule):
      """Agent module that forwards monitored event data to the dashboard.

      Event data found under ``<log_dir>/events`` is buffered in a bounded
      asyncio queue and sent to the dashboard through the
      ``ReportEventService`` gRPC stub.
      """

      def __init__(self, dashboard_agent):
          """Create the events directory and the bounded in-memory cache.

          Args:
              dashboard_agent: The owning dashboard agent, handed to the
                  base class; its ``log_dir`` is the parent of the events
                  directory.
          """
          super().__init__(dashboard_agent)
          self._event_dir = os.path.join(self._dashboard_agent.log_dir, "events")
          os.makedirs(self._event_dir, exist_ok=True)
          # Both are populated later by run().
          self._monitor: Union[asyncio.Task, None] = None
          self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None
          # Bounded queue of event batches waiting to be reported.
          self._cached_events = asyncio.Queue(event_consts.EVENT_AGENT_CACHE_SIZE)
          logger.info(
              "Event agent cache buffer size: %s", self._cached_events.maxsize
          )

      async def _connect_to_dashboard(self):
          """Block until a gRPC stub for the dashboard can be created.

          The dashboard RPC address is looked up in the internal KV store.
          While the address is missing, or the lookup/channel setup raises,
          the attempt is repeated after a fixed sleep, so this coroutine
          does not return until the dashboard is reachable.

          Returns:
              The ReportEventServiceStub object.
          """
          while True:
              try:
                  # TODO: Use async version if performance is an issue
                  address = internal_kv._internal_kv_get(
                      dashboard_consts.DASHBOARD_RPC_ADDRESS,
                      namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
                  )
                  if address:
                      logger.info("Report events to %s", address)
                      grpc_channel = utils.init_grpc_channel(
                          address,
                          options=(("grpc.enable_http_proxy", 0),),
                          asynchronous=True,
                      )
                      return event_pb2_grpc.ReportEventServiceStub(grpc_channel)
              except Exception:
                  logger.exception("Connect to dashboard failed.")
              # Reached both when the address is not published yet and after
              # a failed attempt: wait before trying again.
              await asyncio.sleep(
                  event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS
              )

      @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
      async def report_events(self):
          """Send one cached batch of events to the dashboard.

          Waits for the next batch in the cache queue and tries to report
          it up to EVENT_AGENT_RETRY_TIMES times, reconnecting to the
          dashboard after each failure. If every attempt fails, a truncated
          copy of the batch is logged as an error and the batch is dropped.

          Because of the ``async_loop_forever`` decorator this method is
          not expected to return to its caller.
          """
          batch = await self._cached_events.get()
          for _attempt in range(event_consts.EVENT_AGENT_RETRY_TIMES):
              try:
                  logger.info("Report %s events.", len(batch))
                  await self._stub.ReportEvents(
                      event_pb2.ReportEventsRequest(event_strings=batch)
                  )
                  # Reported successfully; nothing left to do for this batch.
                  return
              except Exception:
                  logger.exception("Report event failed, reconnect to the dashboard.")
                  self._stub = await self._connect_to_dashboard()

          # All retries failed: log a length-limited rendering of the batch.
          batch_text = str(batch)
          max_len = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
          shown = batch_text[:max_len]
          if batch_text[max_len:]:
              # Mark that the logged text was cut short.
              shown += "..."
          logger.error("Report event failed: %s", shown)

      async def run(self, server):
          """Connect to the dashboard, start monitoring, then report events.

          Args:
              server: Supplied by the caller; not used by this module.
          """

          def _enqueue(data):
              # Schedule the put as a task so a full queue does not block
              # the monitor callback.
              return create_task(self._cached_events.put(data))

          # Connect to dashboard.
          self._stub = await self._connect_to_dashboard()
          # Start monitor task.
          self._monitor = monitor_events(
              self._event_dir,
              _enqueue,
              source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES,
          )
          # Start reporting events.
          await self.report_events()

      @staticmethod
      def is_minimal_module():
          """Return False: this module is not flagged as a minimal module."""
          return False