# agent_manager.py
#
# Client-side manager for uploading, downloading, caching and loading agents.
import base64
import importlib
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional

import requests
  11. class AgentManager:
  12. def __init__(self, base_url: str):
  13. self.base_url = base_url
  14. self.base_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../")
  15. self.cache_dir = Path(f'{self.base_path}/agenthub/cache')
  16. self.cache_dir.mkdir(parents=True, exist_ok=True)
  17. def _version_to_path(self, version: str) -> str:
  18. return version.replace('.', '-')
  19. def _path_to_version(self, path_version: str) -> str:
  20. return path_version.replace('-', '.')
  21. def upload_agent(self, folder_path: str):
  22. agent_files = self._get_agent_files(folder_path)
  23. metadata = self._get_agent_metadata(folder_path)
  24. payload = {
  25. "author": metadata.get("meta").get('author'),
  26. "name": metadata.get('name'),
  27. "version": metadata.get("meta").get('version'),
  28. "license": metadata.get("license", "Unknown"),
  29. "files": agent_files,
  30. "entry": metadata.get("build", {}).get("entry", "agent.py"),
  31. "module": metadata.get("build", {}).get("module", "Agent")
  32. }
  33. response = requests.post(f"{self.base_url}/api/upload", json=payload)
  34. response.raise_for_status()
  35. print(
  36. f"Agent {payload.get('author')}/{payload.get('name')} (v{payload.get('version')}) uploaded successfully.")
  37. def download_agent(self, author: str, name: str, version: str = "latest") -> tuple[str, str, str]:
  38. params = {
  39. "author": author,
  40. "name": name,
  41. }
  42. if version != 'latest':
  43. params['version'] = version
  44. cache_path = self._get_cache_path(author, name, version)
  45. if cache_path.exists():
  46. print(f"Using cached version of {author}/{name} (v{version})")
  47. return author, name, version
  48. else:
  49. cached_versions = sorted(
  50. self._get_cached_versions(author, name), reverse=True)
  51. if cached_versions:
  52. latest_cached = self._path_to_version(cached_versions[0])
  53. print(
  54. f"Using latest cached version of {author}/{name} (v{latest_cached})")
  55. return author, name, latest_cached
  56. response = requests.get(f"{self.base_url}/api/download", params=params)
  57. response.raise_for_status()
  58. agent_data = response.json()
  59. actual_version = agent_data.get('version', 'unknown')
  60. cache_path = self._get_cache_path(author, name, actual_version)
  61. self._save_agent_to_cache(agent_data, cache_path)
  62. print(
  63. f"Agent {author}/{name} (v{actual_version}) downloaded and cached successfully.")
  64. if not self.check_reqs_installed(cache_path):
  65. self.install_agent_reqs(cache_path)
  66. return author, name, actual_version
  67. def _get_cached_versions(self, author: str, name: str) -> List[str]:
  68. agent_dir = self.cache_dir / author / name
  69. if agent_dir.exists():
  70. return [v.name for v in agent_dir.iterdir() if v.is_dir()]
  71. return []
  72. def _get_cache_path(self, author: str, name: str, version: str) -> Path:
  73. return self.cache_dir / author / name / self._version_to_path(version)
  74. def _save_agent_to_cache(self, agent_data: Dict, cache_path: Path):
  75. cache_path.mkdir(parents=True, exist_ok=True)
  76. for file_data in agent_data["files"]:
  77. file_path = cache_path / file_data["path"]
  78. file_path.parent.mkdir(parents=True, exist_ok=True)
  79. with open(file_path, "wb") as f:
  80. f.write(base64.b64decode(file_data["content"]))
  81. def _get_agent_files(self, folder_path: str) -> List[Dict[str, str]]:
  82. files = []
  83. for root, _, filenames in os.walk(folder_path):
  84. for filename in filenames:
  85. file_path = os.path.join(root, filename)
  86. relative_path = os.path.relpath(file_path, folder_path)
  87. with open(file_path, "rb") as f:
  88. content = base64.b64encode(f.read()).decode('utf-8')
  89. files.append({
  90. "path": relative_path,
  91. "content": content
  92. })
  93. return files
  94. def _get_agent_metadata(self, folder_path: str) -> Dict[str, str]:
  95. config_path = os.path.join(folder_path, "config.json")
  96. if os.path.exists(config_path):
  97. with open(config_path, "r") as f:
  98. return json.load(f)
  99. return {}
  100. def list_available_agents(self) -> List[Dict[str, str]]:
  101. response = requests.get(f"{self.base_url}/api/get_all_agents")
  102. response.raise_for_status()
  103. response: dict = response.json()
  104. agent_list = []
  105. for v in list(response.values())[:-1]:
  106. agent_list.append({
  107. "agent": "/".join([v["author"], v["name"], v['version']])
  108. })
  109. return agent_list
  110. def check_agent_updates(self, author: str, name: str, current_version: str) -> bool:
  111. response = requests.get(f"{self.base_url}/api/check_updates", params={
  112. "author": author,
  113. "name": name,
  114. "current_version": current_version
  115. })
  116. response.raise_for_status()
  117. return response.json()["update_available"]
  118. def check_reqs_installed(self, agent_path: Path) -> bool:
  119. reqs_path = agent_path / "meta_requirements.txt"
  120. if not reqs_path.exists():
  121. return True # No requirements file, consider it as installed
  122. try:
  123. result = subprocess.run(
  124. ['conda', 'list'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  125. except Exception:
  126. result = subprocess.run(
  127. ['pip', 'list', '--format=freeze'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  128. with open(reqs_path, "r") as f:
  129. reqs = [line.strip().split("==")[0]
  130. for line in f if line.strip() and not line.startswith("#")]
  131. output = result.stdout.decode('utf-8')
  132. installed_packages = [line.split()[0]
  133. for line in output.splitlines() if line]
  134. return all(req in installed_packages for req in reqs)
  135. def install_agent_reqs(self, agent_path: Path):
  136. reqs_path = agent_path / "meta_requirements.txt"
  137. if not reqs_path.exists():
  138. print("No meta_requirements.txt found. Skipping dependency installation.")
  139. return
  140. log_path = agent_path / "deplogs.txt"
  141. print(f"Installing dependencies for agent. Writing to {log_path}")
  142. with open(log_path, "a") as f:
  143. subprocess.check_call([
  144. sys.executable,
  145. "-m",
  146. "pip",
  147. "install",
  148. "-r",
  149. str(reqs_path)
  150. ], stdout=f, stderr=f)
  151. def load_agent(self, author: str, name: str, version: str = "latest"):
  152. path_version = self._version_to_path(version)
  153. agent_config = self._get_agent_metadata(
  154. f'{self.cache_dir / author / name / path_version}')
  155. entry, module = agent_config.get("build", {}).get(
  156. "entry", "agent.py"), agent_config.get("build", {}).get("module", "Agent")
  157. module_name = ".".join(
  158. ["agenthub", "cache", author, name, path_version, entry[:-3]])
  159. print(module_name)
  160. agent_module = importlib.import_module(module_name)
  161. agent_class = getattr(agent_module, module)
  162. return agent_class
  163. if __name__ == '__main__':
  164. manager = AgentManager('http://localhost:3000/')
  165. manager.upload_agent('pyopenagi/agents/example/academic_agent')