123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- import os
- import json
- import time
- from threading import Thread
- import threading
- from ..utils.logger import AgentLogger
- from ..utils.chat_template import Query
- import importlib
- from aios.hooks.request import send_request
- class BaseAgent:
- def __init__(self, agent_name, task_input, log_mode: str):
- # super().__init__()
- self.agent_name = agent_name
- self.config = self.load_config()
- self.tool_names = self.config["tools"]
-
- self.plan_max_fail_times = 3
- self.tool_call_max_fail_times = 3
- # self.agent_process_factory = agent_process_factory
- self.tool_list = dict()
- self.tools = []
- self.tool_info = (
- []
- ) # simplified information of the tool: {"name": "xxx", "description": "xxx"}
- self.load_tools(self.tool_names)
- self.start_time = None
- self.end_time = None
- self.request_waiting_times: list = []
- self.request_turnaround_times: list = []
- self.task_input = task_input
- self.messages = []
- self.workflow_mode = "manual" # (mannual, automatic)
- self.rounds = 0
- self.log_mode = log_mode
- self.logger = self.setup_logger()
- self.set_status("active")
- self.set_created_time(time.time())
- def run(self):
- """Execute each step to finish the task."""
- # self.set_aid(threading.get_ident())
- self.logger.log(
- f"{self.agent_name} starts running. Agent ID is {self.get_aid()}\n",
- level="info",
- )
- # can be customization
- def build_system_instruction(self):
- pass
- def check_workflow(self, message):
- try:
- # print(f"Workflow message: {message}")
- workflow = json.loads(message)
- if not isinstance(workflow, list):
- return None
- for step in workflow:
- if "message" not in step or "tool_use" not in step:
- return None
- return workflow
- except json.JSONDecodeError:
- return None
- def automatic_workflow(self):
- for i in range(self.plan_max_fail_times):
- response, start_times, end_times, waiting_times, turnaround_times = send_request(
- agent_name = self.agent_name,
- query=Query(
- messages=self.messages, tools=None, message_return_type="json"
- )
- )
- if self.rounds == 0:
- self.set_start_time(start_times[0])
- self.request_waiting_times.extend(waiting_times)
- self.request_turnaround_times.extend(turnaround_times)
- workflow = self.check_workflow(response.response_message)
- self.rounds += 1
- if workflow:
- return workflow
- else:
- self.messages.append(
- {
- "role": "assistant",
- "content": f"Fail {i+1} times to generate a valid plan. I need to regenerate a plan",
- }
- )
- return None
- def manual_workflow(self):
- pass
- def check_path(self, tool_calls):
- script_path = os.path.abspath(__file__)
- save_dir = os.path.join(
- os.path.dirname(script_path), "output"
- ) # modify the customized output path for saving outputs
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- for tool_call in tool_calls:
- try:
- for k in tool_call["parameters"]:
- if "path" in k:
- path = tool_call["parameters"][k]
- if not path.startswith(save_dir):
- tool_call["parameters"][k] = os.path.join(
- save_dir, os.path.basename(path)
- )
- except Exception:
- continue
- return tool_calls
- def snake_to_camel(self, snake_str):
- components = snake_str.split("_")
- return "".join(x.title() for x in components)
- def load_tools(self, tool_names):
- if tool_names == "None":
- return
- for tool_name in tool_names:
- org, name = tool_name.split("/")
- module_name = ".".join(["pyopenagi", "tools", org, name])
- class_name = self.snake_to_camel(name)
- tool_module = importlib.import_module(module_name)
- tool_class = getattr(tool_module, class_name)
- self.tool_list[name] = tool_class()
- tool_format = tool_class().get_tool_call_format()
- self.tools.append(tool_format)
- self.tool_info.append(
- {
- "name": tool_format["function"]["name"],
- "description": tool_format["function"]["description"],
- }
- )
- def pre_select_tools(self, tool_names):
- pre_selected_tools = []
- for tool_name in tool_names:
- for tool in self.tools:
- if tool["function"]["name"] == tool_name:
- pre_selected_tools.append(tool)
- break
- return pre_selected_tools
- def setup_logger(self):
- logger = AgentLogger(self.agent_name, self.log_mode)
- return logger
- def load_config(self):
- script_path = os.path.abspath(__file__)
- script_dir = os.path.dirname(script_path)
- config_file = os.path.join(script_dir, self.agent_name, "config.json")
- with open(config_file, "r") as f:
- config = json.load(f)
- return config
- def set_aid(self, aid):
- self.aid = aid
- def get_aid(self):
- return self.aid
- def get_agent_name(self):
- return self.agent_name
- def set_status(self, status):
- """
- Status type: Waiting, Running, Done, Inactive
- """
- self.status = status
- def get_status(self):
- return self.status
- def set_created_time(self, time):
- self.created_time = time
- def get_created_time(self):
- return self.created_time
- def set_start_time(self, time):
- self.start_time = time
- def get_start_time(self):
- return self.start_time
- def set_end_time(self, time):
- self.end_time = time
- def get_end_time(self):
- return self.end_time
|