call_core.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import time
  2. from threading import Thread
  3. from aios.hooks.stores._global import global_llm_req_queue_add_message
  4. from .agent_process import AgentProcess
  5. from ..utils.logger import AgentLogger
  6. class CustomizedThread(Thread):
  7. def __init__(self, target, args=()):
  8. super().__init__()
  9. self.target = target
  10. self.args = args
  11. self.result = None
  12. def run(self):
  13. self.result = self.target(*self.args)
  14. def join(self):
  15. super().join()
  16. return self.result
  17. class CallCore:
  18. """
  19. Simplify BaseAgent to provide an interface for external frameworks to make LLM requests using aios.
  20. """
  21. def __init__(self,
  22. agent_name,
  23. agent_process_factory,
  24. log_mode: str = "console"
  25. ):
  26. self.agent_name = agent_name
  27. self.agent_process_factory = agent_process_factory
  28. self.log_mode = log_mode
  29. self.logger = self.setup_logger()
  30. # the default method used for getting response from AIOS
  31. def get_response(self,
  32. query,
  33. temperature=0.0
  34. ):
  35. thread = CustomizedThread(target=self.query_loop, args=(query,))
  36. thread.start()
  37. return thread.join()
  38. def query_loop(self, query):
  39. agent_process = self.create_agent_request(query)
  40. completed_response, start_times, end_times, waiting_times, turnaround_times = "", [], [], [], []
  41. while agent_process.get_status() != "done":
  42. thread = Thread(target=self.listen, args=(agent_process,))
  43. current_time = time.time()
  44. # reinitialize agent status
  45. agent_process.set_created_time(current_time)
  46. agent_process.set_response(None)
  47. global_llm_req_queue_add_message(agent_process)
  48. # LLMRequestQueue.add_message(agent_process)
  49. thread.start()
  50. thread.join()
  51. completed_response = agent_process.get_response()
  52. if agent_process.get_status() != "done":
  53. self.logger.log(
  54. f"Suspended due to the reach of time limit ({agent_process.get_time_limit()}s). Current result is: {completed_response.response_message}\n",
  55. level="suspending"
  56. )
  57. start_time = agent_process.get_start_time()
  58. end_time = agent_process.get_end_time()
  59. waiting_time = start_time - agent_process.get_created_time()
  60. turnaround_time = end_time - agent_process.get_created_time()
  61. start_times.append(start_time)
  62. end_times.append(end_time)
  63. waiting_times.append(waiting_time)
  64. turnaround_times.append(turnaround_time)
  65. # Re-start the thread if not done
  66. # self.agent_process_factory.deactivate_agent_process(agent_process.get_pid())
  67. return completed_response, start_times, end_times, waiting_times, turnaround_times
  68. def create_agent_request(self, query):
  69. agent_process = self.agent_process_factory.activate_agent_process(
  70. agent_name=self.agent_name,
  71. query=query
  72. )
  73. agent_process.set_created_time(time.time())
  74. # print("Already put into the queue")
  75. return agent_process
  76. def listen(self, agent_process: AgentProcess):
  77. """Response Listener for agent
  78. Args:
  79. agent_process (AgentProcess): Listened AgentProcess
  80. Returns:
  81. str: LLM response of Agent Process
  82. """
  83. while agent_process.get_response() is None:
  84. time.sleep(0.2)
  85. return agent_process.get_response()
  86. def setup_logger(self):
  87. logger = AgentLogger(self.agent_name, self.log_mode)
  88. return logger