123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import time
- from threading import Thread
- from aios.hooks.stores._global import global_llm_req_queue_add_message
- from .agent_process import AgentProcess
- from ..utils.logger import AgentLogger
- class CustomizedThread(Thread):
- def __init__(self, target, args=()):
- super().__init__()
- self.target = target
- self.args = args
- self.result = None
- def run(self):
- self.result = self.target(*self.args)
- def join(self):
- super().join()
- return self.result
- class CallCore:
- """
- Simplify BaseAgent to provide an interface for external frameworks to make LLM requests using aios.
- """
- def __init__(self,
- agent_name,
- agent_process_factory,
- log_mode: str = "console"
- ):
- self.agent_name = agent_name
- self.agent_process_factory = agent_process_factory
- self.log_mode = log_mode
- self.logger = self.setup_logger()
- # the default method used for getting response from AIOS
- def get_response(self,
- query,
- temperature=0.0
- ):
- thread = CustomizedThread(target=self.query_loop, args=(query,))
- thread.start()
- return thread.join()
- def query_loop(self, query):
- agent_process = self.create_agent_request(query)
- completed_response, start_times, end_times, waiting_times, turnaround_times = "", [], [], [], []
- while agent_process.get_status() != "done":
- thread = Thread(target=self.listen, args=(agent_process,))
- current_time = time.time()
- # reinitialize agent status
- agent_process.set_created_time(current_time)
- agent_process.set_response(None)
- global_llm_req_queue_add_message(agent_process)
- # LLMRequestQueue.add_message(agent_process)
- thread.start()
- thread.join()
- completed_response = agent_process.get_response()
- if agent_process.get_status() != "done":
- self.logger.log(
- f"Suspended due to the reach of time limit ({agent_process.get_time_limit()}s). Current result is: {completed_response.response_message}\n",
- level="suspending"
- )
- start_time = agent_process.get_start_time()
- end_time = agent_process.get_end_time()
- waiting_time = start_time - agent_process.get_created_time()
- turnaround_time = end_time - agent_process.get_created_time()
- start_times.append(start_time)
- end_times.append(end_time)
- waiting_times.append(waiting_time)
- turnaround_times.append(turnaround_time)
- # Re-start the thread if not done
- # self.agent_process_factory.deactivate_agent_process(agent_process.get_pid())
- return completed_response, start_times, end_times, waiting_times, turnaround_times
- def create_agent_request(self, query):
- agent_process = self.agent_process_factory.activate_agent_process(
- agent_name=self.agent_name,
- query=query
- )
- agent_process.set_created_time(time.time())
- # print("Already put into the queue")
- return agent_process
- def listen(self, agent_process: AgentProcess):
- """Response Listener for agent
- Args:
- agent_process (AgentProcess): Listened AgentProcess
- Returns:
- str: LLM response of Agent Process
- """
- while agent_process.get_response() is None:
- time.sleep(0.2)
- return agent_process.get_response()
- def setup_logger(self):
- logger = AgentLogger(self.agent_name, self.log_mode)
- return logger
|