single_memory.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. # This file makes it easier for threads to access memory for each agent by
  2. # placing a lock on memory that needs to be read later. This is implemented as
  3. # wrapper upon the access methods in BaseMemoryManager
  4. from aios.memory.base import (
  5. MemoryRequest,
  6. BaseMemoryManager
  7. )
  8. # allows for lists to be heapify'd so the blocks are in order
  9. import heapq
  10. # FIFO queue for whichever thread stops blocking first
  11. from queue import Queue, Empty
  12. from utils.compressor import (
  13. ZLIBCompressor
  14. )
  15. from aios.memory.base import (
  16. Memory
  17. )
  18. from threading import Thread
  19. class UniformedMemoryManager(BaseMemoryManager):
  20. def __init__(self, max_memory_block_size, memory_block_num):
  21. super().__init__(max_memory_block_size, memory_block_num)
  22. """ initiate the memory manager in a manner similar to malloc(3) """
  23. self.memory_blocks = [
  24. Memory(max_memory_block_size) for _ in range(memory_block_num)
  25. ]
  26. self.free_memory_blocks = [i for i in range(0, memory_block_num)]
  27. self.thread = Thread(target=self.run)
  28. self.aid_to_memory = dict() # map agent id to memory block id, address, size
  29. # {
  30. # agent_id: {
  31. # round_id: {"memory_block_id": int, "address": int, size: int}
  32. # }
  33. # }
  34. self.compressor = ZLIBCompressor() # used for compressing data
  35. """ maintain a min heap structure for free memory blocks """
  36. heapq.heapify(self.free_memory_blocks)
  37. self.memory_operation_queue = Queue() # TODO add lock to ensure parallel
  38. def run(self):
  39. while self.active:
  40. try:
  41. """ give a reasonable timeout between iterations here """
  42. memory_request = self.memory_operation_queue.get(block=True, timeout=0.1)
  43. self.execute_operation(memory_request)
  44. except Empty:
  45. pass
  46. def start(self):
  47. """start the scheduler"""
  48. self.active = True
  49. self.thread.start()
  50. def execute_operation(self, memory_request: MemoryRequest):
  51. operation_type = memory_request.operation_type
  52. if operation_type == "write":
  53. self.mem_write(
  54. agent_id=memory_request.agent_id, content=memory_request.content
  55. )
  56. elif operation_type == "read":
  57. self.mem_read(
  58. agent_id=memory_request.agent_id, round_id=memory_request.round_id
  59. )
  60. def stop(self):
  61. """stop the scheduler"""
  62. self.active = False
  63. self.thread.join()
  64. def mem_write(self, agent_id, round_id: str, content: str):
  65. """ write to memory given agent id """
  66. compressed_content = self.compressor.compress(content)
  67. size = len(compressed_content)
  68. address = self.memory_blocks[
  69. self.aid_to_memory[agent_id][round_id]["memory_block_id"]
  70. ].mem_alloc(size)
  71. self.memory_blocks[
  72. self.aid_to_memory[agent_id][round_id]["memory_block_id"]
  73. ].mem_write(address, compressed_content)
  74. def mem_read(self, agent_id, round_id):
  75. """ read memory given agent id """
  76. memory_block_id = self.aid_to_memory[agent_id][round_id]
  77. data = self.memory_blocks[memory_block_id].mem_read(
  78. self.aid_to_memory[agent_id][round_id]["address"],
  79. self.aid_to_memory[agent_id][round_id]["size"]
  80. )
  81. return data
  82. def mem_alloc(self, agent_id):
  83. memory_block_id = heapq.heappop(self.free_memory_blocks)
  84. self.aid_to_memory[agent_id] = {
  85. "memory_block_id": memory_block_id
  86. }
  87. def mem_clear(self, agent_id):
  88. memory_block = self.aid_to_memory.pop(agent_id)
  89. memory_block_id = memory_block['memory_block_id']
  90. heapq.heappush(self.free_memory_blocks, memory_block_id)