event_utils.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import os
  2. import time
  3. import mmap
  4. import json
  5. import fnmatch
  6. import asyncio
  7. import itertools
  8. import collections
  9. import logging.handlers
  10. from ray.dashboard.modules.event import event_consts
  11. from ray.dashboard.utils import async_loop_forever, create_task
  12. logger = logging.getLogger(__name__)
  13. def _get_source_files(event_dir, source_types=None, event_file_filter=None):
  14. event_log_names = os.listdir(event_dir)
  15. source_files = {}
  16. all_source_types = set(event_consts.EVENT_SOURCE_ALL)
  17. for source_type in source_types or event_consts.EVENT_SOURCE_ALL:
  18. assert source_type in all_source_types, f"Invalid source type: {source_type}"
  19. files = []
  20. for n in event_log_names:
  21. if fnmatch.fnmatch(n, f"*{source_type}*"):
  22. f = os.path.join(event_dir, n)
  23. if event_file_filter is not None and not event_file_filter(f):
  24. continue
  25. files.append(f)
  26. if files:
  27. source_files[source_type] = files
  28. return source_files
  29. def _restore_newline(event_dict):
  30. try:
  31. event_dict["message"] = (
  32. event_dict["message"].replace("\\n", "\n").replace("\\r", "\n")
  33. )
  34. except Exception:
  35. logger.exception("Restore newline for event failed: %s", event_dict)
  36. return event_dict
  37. def _parse_line(event_str):
  38. return _restore_newline(json.loads(event_str))
  39. def parse_event_strings(event_string_list):
  40. events = []
  41. for data in event_string_list:
  42. if not data:
  43. continue
  44. try:
  45. event = _parse_line(data)
  46. events.append(event)
  47. except Exception:
  48. logger.exception("Parse event line failed: %s", repr(data))
  49. return events
# Result of one _read_file() call: `fid` is the file id (inode, or the path
# when st_ino is 0), `size`/`mtime` are the stat values at read time,
# `position` is the byte offset just past the last full line consumed, and
# `lines` holds the decoded line strings.
ReadFileResult = collections.namedtuple(
    "ReadFileResult", ["fid", "size", "mtime", "position", "lines"]
)
  53. def _read_file(
  54. file, pos, n_lines=event_consts.EVENT_READ_LINE_COUNT_LIMIT, closefd=True
  55. ):
  56. with open(file, "rb", closefd=closefd) as f:
  57. # The ino may be 0 on Windows.
  58. stat = os.stat(f.fileno())
  59. fid = stat.st_ino or file
  60. lines = []
  61. with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
  62. start = pos
  63. for _ in range(n_lines):
  64. sep = mm.find(b"\n", start)
  65. if sep == -1:
  66. break
  67. if sep - start <= event_consts.EVENT_READ_LINE_LENGTH_LIMIT:
  68. lines.append(mm[start:sep].decode("utf-8"))
  69. else:
  70. truncated_size = min(100, event_consts.EVENT_READ_LINE_LENGTH_LIMIT)
  71. logger.warning(
  72. "Ignored long string: %s...(%s chars)",
  73. mm[start : start + truncated_size].decode("utf-8"),
  74. sep - start,
  75. )
  76. start = sep + 1
  77. return ReadFileResult(fid, stat.st_size, stat.st_mtime, start, lines)
  78. def monitor_events(
  79. event_dir,
  80. callback,
  81. scan_interval_seconds=event_consts.SCAN_EVENT_DIR_INTERVAL_SECONDS,
  82. start_mtime=time.time() + event_consts.SCAN_EVENT_START_OFFSET_SECONDS,
  83. monitor_files=None,
  84. source_types=None,
  85. ):
  86. """Monitor events in directory. New events will be read and passed to the
  87. callback.
  88. Args:
  89. event_dir (str): The event log directory.
  90. callback (def callback(List[str]): pass): A callback accepts a list of
  91. event strings.
  92. scan_interval_seconds (float): An interval seconds between two scans.
  93. start_mtime (float): Only the event log files whose last modification
  94. time is greater than start_mtime are monitored.
  95. monitor_files (Dict[int, MonitorFile]): The map from event log file id
  96. to MonitorFile object. Monitor all files start from the beginning
  97. if the value is None.
  98. source_types (List[str]): A list of source type name from
  99. event_pb2.Event.SourceType.keys(). Monitor all source types if the
  100. value is None.
  101. """
  102. loop = asyncio.get_event_loop()
  103. if monitor_files is None:
  104. monitor_files = {}
  105. logger.info(
  106. "Monitor events logs modified after %s on %s, " "the source types are %s.",
  107. start_mtime,
  108. event_dir,
  109. "all" if source_types is None else source_types,
  110. )
  111. MonitorFile = collections.namedtuple("MonitorFile", ["size", "mtime", "position"])
  112. def _source_file_filter(source_file):
  113. stat = os.stat(source_file)
  114. return stat.st_mtime > start_mtime
  115. def _read_monitor_file(file, pos):
  116. assert isinstance(
  117. file, str
  118. ), f"File should be a str, but a {type(file)}({file}) found"
  119. fd = os.open(file, os.O_RDONLY)
  120. try:
  121. stat = os.stat(fd)
  122. # Check the file size to avoid raising the exception
  123. # ValueError: cannot mmap an empty file
  124. if stat.st_size <= 0:
  125. return []
  126. fid = stat.st_ino or file
  127. monitor_file = monitor_files.get(fid)
  128. if monitor_file:
  129. if (
  130. monitor_file.position == monitor_file.size
  131. and monitor_file.size == stat.st_size
  132. and monitor_file.mtime == stat.st_mtime
  133. ):
  134. logger.debug(
  135. "Skip reading the file because " "there is no change: %s", file
  136. )
  137. return []
  138. position = monitor_file.position
  139. else:
  140. logger.info("Found new event log file: %s", file)
  141. position = pos
  142. # Close the fd in finally.
  143. r = _read_file(fd, position, closefd=False)
  144. # It should be fine to update the dict in executor thread.
  145. monitor_files[r.fid] = MonitorFile(r.size, r.mtime, r.position)
  146. loop.call_soon_threadsafe(callback, r.lines)
  147. except Exception as e:
  148. raise Exception(f"Read event file failed: {file}") from e
  149. finally:
  150. os.close(fd)
  151. @async_loop_forever(scan_interval_seconds, cancellable=True)
  152. async def _scan_event_log_files():
  153. # Scan event files.
  154. source_files = await loop.run_in_executor(
  155. None, _get_source_files, event_dir, source_types, _source_file_filter
  156. )
  157. # Limit concurrent read to avoid fd exhaustion.
  158. semaphore = asyncio.Semaphore(event_consts.CONCURRENT_READ_LIMIT)
  159. async def _concurrent_coro(filename):
  160. async with semaphore:
  161. return await loop.run_in_executor(None, _read_monitor_file, filename, 0)
  162. # Read files.
  163. await asyncio.gather(
  164. *[
  165. _concurrent_coro(filename)
  166. for filename in list(itertools.chain(*source_files.values()))
  167. ]
  168. )
  169. return create_task(_scan_event_log_files())