# test_cache.py — tests for petals.server.memory_cache.MemoryCache
  1. import asyncio
  2. import multiprocessing as mp
  3. import random
  4. import time
  5. from typing import Optional
  6. import pytest
  7. import pytest_asyncio # make sure the module exists; otherwise the test will be skipped
  8. import torch
  9. from hivemind import TensorDescriptor
  10. from petals.server.memory_cache import AllocationFailed, MemoryCache
  11. from petals.utils.misc import get_size_in_bytes
  12. def _make_tensor_descriptor(num_bytes: int, dtype: Optional[torch.dtype] = None):
  13. if dtype is None:
  14. dtype = random.choice((torch.int64, torch.int8, torch.uint8, torch.float32, torch.bfloat16, torch.bool))
  15. elem_size_bytes = get_size_in_bytes(dtype)
  16. descr = TensorDescriptor.from_tensor(torch.empty((num_bytes // elem_size_bytes,), dtype=dtype))
  17. return descr
@pytest.mark.asyncio
async def test_cache_timeout():
    """Check that allocate_cache honors per-call timeouts and the cache-wide max_alloc_timeout."""
    cache = MemoryCache(max_size_bytes=1024, max_alloc_timeout=0.5)
    cache.runtime_pid += 1  # pretend we're another process
    # a zero-timeout allocation succeeds immediately when enough memory is free
    async with cache.allocate_cache(_make_tensor_descriptor(768), timeout=0):
        pass
    async with cache.allocate_cache(_make_tensor_descriptor(100), timeout=999):
        async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0):
            async with cache.allocate_cache(_make_tensor_descriptor(128), _make_tensor_descriptor(32), timeout=1):
                # cache is now nearly full (100 + 512 + 128 + 32 = 772 of 1024 bytes)
                t_start = time.perf_counter()
                with pytest.raises(AllocationFailed):
                    async with cache.allocate_cache(_make_tensor_descriptor(768), timeout=0.1):
                        pass
                assert 0.1 < time.perf_counter() - t_start < 0.2, "wait time exceeds alloc timeout"
                # a small request that does fit succeeds despite the failed one above
                async with cache.allocate_cache(_make_tensor_descriptor(128), timeout=float("inf")):
                    pass
                t_start = time.perf_counter()
                with pytest.raises(AllocationFailed):
                    # timeout=1.0 is clamped down to max_alloc_timeout=0.5
                    async with cache.allocate_cache(_make_tensor_descriptor(384), timeout=1.0):  # exceeds max timeout
                        pass
                assert 0.5 < time.perf_counter() - t_start < 0.6, "wait time exceeds max alloc timeout"

                # test memory allocation when another task frees the memory
                async def _klog_the_cache():
                    # queues an oversized request that will time out after 0.2s and unblock the queue
                    async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0.2):
                        pass

                large_alloc_task = asyncio.create_task(_klog_the_cache())
                t_start = time.perf_counter()
                await asyncio.sleep(0.05)  # wait for large alloc to enqueue
                async with cache.allocate_cache(_make_tensor_descriptor(128), timeout=float("inf")):  # exceeds max timeout
                    pass  # this memory should allocate once the background task clears the queue
                assert 0.2 < time.perf_counter() - t_start < 0.3, "memory should be allocated after background task clears"
                with pytest.raises(AllocationFailed):
                    await large_alloc_task

                # test that zero-timeout allocation fails instantaneously even if someone else is awaiting alloc
                large_alloc_task = asyncio.create_task(_klog_the_cache())
                t_start = time.perf_counter()
                await asyncio.sleep(0.05)  # wait for large alloc to enqueue
                with pytest.raises(AllocationFailed):
                    async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0):
                        pass  # this memory should allocate once the background task clears the queue
                assert time.perf_counter() - t_start < 0.1, "zero-timeout task should fail (or succeed) instantaneously"
                with pytest.raises(AllocationFailed):
                    await large_alloc_task
  61. @pytest.mark.asyncio
  62. async def test_unlimited_timeout():
  63. cache = MemoryCache(max_size_bytes=1024)
  64. cache.runtime_pid += 1 # pretend we're another process
  65. t_start = time.perf_counter()
  66. async def _klog_the_cache():
  67. async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0.2):
  68. await asyncio.sleep(0.5)
  69. alloc_task = asyncio.create_task(_klog_the_cache())
  70. await asyncio.sleep(0.1)
  71. async with cache.allocate_cache(_make_tensor_descriptor(768), timeout=float("inf")):
  72. await alloc_task
  73. assert 0.5 < time.perf_counter() - t_start < 0.6, "memory should be allocated after background task clears"
  74. @pytest.mark.asyncio
  75. async def test_cache_usage():
  76. cache = MemoryCache(max_size_bytes=2048)
  77. alloc_event, dealloc_a_event, dealloc_bcd_event, dealloc_e_event, dealloc_f_event = (mp.Event() for _ in range(5))
  78. pipe_receiver, pipe_sender = mp.Pipe(duplex=False)
  79. with pytest.raises(AssertionError):
  80. async with cache.allocate_cache(_make_tensor_descriptor(123), timeout=1):
  81. pass # fails because cache must be allocated from another process
  82. descr_a = TensorDescriptor.from_tensor(torch.empty(768, dtype=torch.uint8)) # 768 bytes
  83. descr_b = TensorDescriptor.from_tensor(torch.empty((), dtype=torch.float64)) # 8 bytes
  84. descr_c = TensorDescriptor.from_tensor(torch.empty((33,), dtype=torch.bool)) # 33 bytes
  85. descr_d = TensorDescriptor.from_tensor(torch.empty((0,), dtype=torch.int64)) # 0 bytes
  86. descr_e = TensorDescriptor.from_tensor(torch.empty((96, 8), dtype=torch.bfloat16)) # 1536 bytes
  87. descr_f = TensorDescriptor.from_tensor(torch.empty((1792,), dtype=torch.uint8)) # 1792 bytes
  88. async def _allocate_and_wait(dealloc_event, *descrs, timeout=None):
  89. loop = asyncio.get_event_loop()
  90. async with cache.allocate_cache(*descrs, timeout=timeout) as handles:
  91. pipe_sender.send(handles)
  92. await loop.run_in_executor(None, dealloc_event.wait)
  93. async def _allocate_af():
  94. alloc_event.wait()
  95. allocate_a_task = asyncio.create_task(_allocate_and_wait(dealloc_a_event, descr_a))
  96. await allocate_a_task
  97. allocate_f_task = asyncio.create_task(_allocate_and_wait(dealloc_f_event, descr_f)) # klogs the cache
  98. await allocate_f_task
  99. alloc_process1 = mp.context.ForkProcess(target=lambda: asyncio.run(_allocate_af()), daemon=True)
  100. alloc_process1.start()
  101. async def _allocate_bcde():
  102. alloc_event.wait()
  103. await asyncio.sleep(0.1) # ensure that the other tensor is always allocated (and sent through pipe) first
  104. allocate_bcd_task = asyncio.create_task(_allocate_and_wait(dealloc_bcd_event, descr_b, descr_c, descr_d))
  105. allocate_e_task = asyncio.create_task(_allocate_and_wait(dealloc_e_event, descr_e)) # doesn't fit
  106. await asyncio.wait({allocate_e_task, allocate_bcd_task}, return_when=asyncio.ALL_COMPLETED)
  107. alloc_process2 = mp.context.ForkProcess(target=lambda: asyncio.run(_allocate_bcde()), daemon=True)
  108. alloc_process2.start()
  109. assert cache.current_size_bytes == 0
  110. alloc_event.set()
  111. (handle_a,) = pipe_receiver.recv()
  112. handle_b, handle_c, handle_d = pipe_receiver.recv()
  113. with cache.use_cache(handle_a) as (tensor_a,):
  114. assert tensor_a.dtype == torch.uint8
  115. tensor_a[2:5] = torch.tensor((42, 43, 44))
  116. with cache.use_cache(handle_a, handle_b, handle_d) as (tensor_a, tensor_b, tensor_d):
  117. assert tensor_b.dtype == torch.float64 and tensor_b.numel() == 1 and tensor_b.ndim == 0
  118. assert tensor_d.dtype == torch.int64 and tensor_d.numel() == 0
  119. tensor_a += 1
  120. tensor_b[...] = -1.337
  121. assert cache.current_size_bytes == 809 # this checks a,b,c,d are allocated but b still awaits memory
  122. dealloc_bcd_event.set()
  123. await asyncio.sleep(0.1)
  124. assert cache.current_size_bytes == 768 # only tensor a should be allocated
  125. with pytest.raises(KeyError):
  126. with cache.use_cache(handle_a, handle_b):
  127. pass # one of handles (c) is deallocated
  128. with pytest.raises(KeyError):
  129. with cache.use_cache(handle_d):
  130. pass # handle_d is deallocated correctly, even though it is never used
  131. with cache.use_cache(handle_a) as (tensor_a,):
  132. assert tuple(tensor_a[2:5]) == (43, 44, 45)
  133. dealloc_a_event.set()
  134. (handle_e,) = pipe_receiver.recv() # e can finally be allocated
  135. await asyncio.sleep(0.1)
  136. assert cache.current_size_bytes == 1536 # tensor e should finally be able to allocate
  137. with pytest.raises(KeyError):
  138. with cache.use_cache(handle_a):
  139. pass # tensor a is no longer allocated
  140. with cache.use_cache(handle_e) as (tensor_e,):
  141. assert tensor_e.dtype == torch.bfloat16 and tensor_e.shape == (96, 8)
  142. dealloc_e_event.set()
  143. await asyncio.sleep(0.1)
  144. assert cache.current_size_bytes == 1792 # only tensor f is still allocated
  145. dealloc_f_event.set()
  146. alloc_process1.join()
  147. alloc_process2.join()
  148. await asyncio.sleep(0.1)
  149. assert cache.current_size_bytes == 0
  150. assert cache.current_size_bytes == 0
  151. assert alloc_process1.exitcode == 0, "allocation process 1 failed or did not finish, see stderr for details"
  152. assert alloc_process2.exitcode == 0, "allocation process 2 failed or did not finish, see stderr for details"