milvus.py 34 KB


  1. import json
  2. import requests
  3. import time
  4. import uuid
  5. from utils.util_log import test_log as logger
  6. from minio import Minio
  7. from minio.error import S3Error
  8. from minio.commonconfig import CopySource
  9. from tenacity import retry, retry_if_exception_type, stop_after_attempt
  10. from requests.exceptions import ConnectionError
  11. import urllib.parse
# When True, logger_request_response() appends every request/response pair
# as one JSON line to ``request_response.jsonl`` in the working directory.
ENABLE_LOG_SAVE = False
  13. def simplify_list(lst):
  14. if len(lst) > 20:
  15. return [lst[0], '...', lst[-1]]
  16. return lst
  17. def simplify_dict(d):
  18. if d is None:
  19. d = {}
  20. if len(d) > 20:
  21. keys = list(d.keys())
  22. d = {keys[0]: d[keys[0]], '...': '...', keys[-1]: d[keys[-1]]}
  23. simplified = {}
  24. for k, v in d.items():
  25. if isinstance(v, list):
  26. simplified[k] = simplify_list([simplify_dict(item) if isinstance(item, dict) else simplify_list(
  27. item) if isinstance(item, list) else item for item in v])
  28. elif isinstance(v, dict):
  29. simplified[k] = simplify_dict(v)
  30. else:
  31. simplified[k] = v
  32. return simplified
  33. def build_curl_command(method, url, headers, data=None, params=None):
  34. if isinstance(params, dict):
  35. query_string = urllib.parse.urlencode(params)
  36. url = f"{url}?{query_string}"
  37. curl_cmd = [f"curl -X {method} '{url}'"]
  38. for key, value in headers.items():
  39. curl_cmd.append(f" -H '{key}: {value}'")
  40. if data:
  41. # process_and_simplify(data)
  42. data = json.dumps(data, indent=4)
  43. curl_cmd.append(f" -d '{data}'")
  44. return " \\\n".join(curl_cmd)
  45. def logger_request_response(response, url, tt, headers, data, str_data, str_response, method, params=None):
  46. # save data to jsonl file
  47. data_dict = json.loads(data) if data else {}
  48. data_dict_simple = simplify_dict(data_dict)
  49. if ENABLE_LOG_SAVE:
  50. with open('request_response.jsonl', 'a') as f:
  51. f.write(json.dumps({
  52. "method": method,
  53. "url": url,
  54. "headers": headers,
  55. "params": params,
  56. "data": data_dict_simple,
  57. "response": response.json()
  58. }) + "\n")
  59. data = json.dumps(data_dict_simple, indent=4)
  60. try:
  61. if response.status_code == 200:
  62. if ('code' in response.json() and response.json()["code"] == 0) or (
  63. 'Code' in response.json() and response.json()["Code"] == 0):
  64. logger.debug(
  65. f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {str_response}")
  66. else:
  67. logger.debug(
  68. f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}")
  69. else:
  70. logger.debug(
  71. f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}")
  72. except Exception as e:
  73. logger.debug(
  74. f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}, \nerror: {e}")
  75. class Requests():
  76. uuid = str(uuid.uuid1())
  77. api_key = None
  78. def __init__(self, url=None, api_key=None):
  79. self.url = url
  80. self.api_key = api_key
  81. if self.uuid is None:
  82. self.uuid = str(uuid.uuid1())
  83. self.headers = {
  84. 'Content-Type': 'application/json',
  85. 'Authorization': f'Bearer {self.api_key}',
  86. 'RequestId': self.uuid
  87. }
  88. @classmethod
  89. def update_uuid(cls, _uuid):
  90. cls.uuid = _uuid
  91. @classmethod
  92. def update_headers(cls):
  93. headers = {
  94. 'Content-Type': 'application/json',
  95. 'Authorization': f'Bearer {cls.api_key}',
  96. 'RequestId': cls.uuid
  97. }
  98. return headers
  99. # retry when request failed caused by network or server error
  100. @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
  101. def post(self, url, headers=None, data=None, params=None):
  102. headers = headers if headers is not None else self.update_headers()
  103. data = json.dumps(data)
  104. str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
  105. t0 = time.time()
  106. response = requests.post(url, headers=headers, data=data, params=params)
  107. tt = time.time() - t0
  108. str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
  109. logger_request_response(response, url, tt, headers, data, str_data, str_response, "post", params=params)
  110. return response
  111. @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
  112. def get(self, url, headers=None, params=None, data=None):
  113. headers = headers if headers is not None else self.update_headers()
  114. data = json.dumps(data)
  115. str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
  116. t0 = time.time()
  117. if data is None or data == "null":
  118. response = requests.get(url, headers=headers, params=params)
  119. else:
  120. response = requests.get(url, headers=headers, params=params, data=data)
  121. tt = time.time() - t0
  122. str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
  123. logger_request_response(response, url, tt, headers, data, str_data, str_response, "get", params=params)
  124. return response
  125. @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
  126. def put(self, url, headers=None, data=None):
  127. headers = headers if headers is not None else self.update_headers()
  128. data = json.dumps(data)
  129. str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
  130. t0 = time.time()
  131. response = requests.put(url, headers=headers, data=data)
  132. tt = time.time() - t0
  133. str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
  134. logger_request_response(response, url, tt, headers, data, str_data, str_response, "put")
  135. return response
  136. @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
  137. def delete(self, url, headers=None, data=None):
  138. headers = headers if headers is not None else self.update_headers()
  139. data = json.dumps(data)
  140. str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
  141. t0 = time.time()
  142. response = requests.delete(url, headers=headers, data=data)
  143. tt = time.time() - t0
  144. str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
  145. logger_request_response(response, url, tt, headers, data, str_data, str_response, "delete")
  146. return response
  147. class VectorClient(Requests):
  148. def __init__(self, endpoint, token):
  149. super().__init__(url=endpoint, api_key=token)
  150. self.endpoint = endpoint
  151. self.token = token
  152. self.api_key = token
  153. self.db_name = None
  154. self.headers = self.update_headers()
  155. @classmethod
  156. def update_headers(cls):
  157. headers = {
  158. 'Content-Type': 'application/json',
  159. 'Authorization': f'Bearer {cls.api_key}',
  160. 'Accept-Type-Allow-Int64': "true",
  161. 'RequestId': cls.uuid
  162. }
  163. return headers
  164. def vector_search(self, payload, db_name="default", timeout=10):
  165. time.sleep(1)
  166. url = f'{self.endpoint}/v2/vectordb/entities/search'
  167. if self.db_name is not None:
  168. payload["dbName"] = self.db_name
  169. if db_name != "default":
  170. payload["dbName"] = db_name
  171. response = self.post(url, headers=self.update_headers(), data=payload)
  172. rsp = response.json()
  173. if "data" in rsp and len(rsp["data"]) == 0:
  174. t0 = time.time()
  175. while time.time() - t0 < timeout:
  176. response = self.post(url, headers=self.update_headers(), data=payload)
  177. rsp = response.json()
  178. if len(rsp["data"]) > 0:
  179. break
  180. time.sleep(1)
  181. else:
  182. response = self.post(url, headers=self.update_headers(), data=payload)
  183. rsp = response.json()
  184. if "data" in rsp and len(rsp["data"]) == 0:
  185. logger.info(f"after {timeout}s, still no data")
  186. return response.json()
  187. def vector_advanced_search(self, payload, db_name="default", timeout=10):
  188. time.sleep(1)
  189. url = f'{self.endpoint}/v2/vectordb/entities/advanced_search'
  190. if self.db_name is not None:
  191. payload["dbName"] = self.db_name
  192. if db_name != "default":
  193. payload["dbName"] = db_name
  194. response = self.post(url, headers=self.update_headers(), data=payload)
  195. rsp = response.json()
  196. if "data" in rsp and len(rsp["data"]) == 0:
  197. t0 = time.time()
  198. while time.time() - t0 < timeout:
  199. response = self.post(url, headers=self.update_headers(), data=payload)
  200. rsp = response.json()
  201. if len(rsp["data"]) > 0:
  202. break
  203. time.sleep(1)
  204. else:
  205. response = self.post(url, headers=self.update_headers(), data=payload)
  206. rsp = response.json()
  207. if "data" in rsp and len(rsp["data"]) == 0:
  208. logger.info(f"after {timeout}s, still no data")
  209. return response.json()
  210. def vector_hybrid_search(self, payload, db_name="default", timeout=10):
  211. time.sleep(1)
  212. url = f'{self.endpoint}/v2/vectordb/entities/hybrid_search'
  213. if self.db_name is not None:
  214. payload["dbName"] = self.db_name
  215. if db_name != "default":
  216. payload["dbName"] = db_name
  217. response = self.post(url, headers=self.update_headers(), data=payload)
  218. rsp = response.json()
  219. if "data" in rsp and len(rsp["data"]) == 0:
  220. t0 = time.time()
  221. while time.time() - t0 < timeout:
  222. response = self.post(url, headers=self.update_headers(), data=payload)
  223. rsp = response.json()
  224. if len(rsp["data"]) > 0:
  225. break
  226. time.sleep(1)
  227. else:
  228. response = self.post(url, headers=self.update_headers(), data=payload)
  229. rsp = response.json()
  230. if "data" in rsp and len(rsp["data"]) == 0:
  231. logger.info(f"after {timeout}s, still no data")
  232. return response.json()
  233. def vector_query(self, payload, db_name="default", timeout=5):
  234. time.sleep(1)
  235. url = f'{self.endpoint}/v2/vectordb/entities/query'
  236. if self.db_name is not None:
  237. payload["dbName"] = self.db_name
  238. if db_name != "default":
  239. payload["dbName"] = db_name
  240. response = self.post(url, headers=self.update_headers(), data=payload)
  241. rsp = response.json()
  242. if "data" in rsp and len(rsp["data"]) == 0:
  243. t0 = time.time()
  244. while time.time() - t0 < timeout:
  245. response = self.post(url, headers=self.update_headers(), data=payload)
  246. rsp = response.json()
  247. if len(rsp["data"]) > 0:
  248. break
  249. time.sleep(1)
  250. else:
  251. response = self.post(url, headers=self.update_headers(), data=payload)
  252. rsp = response.json()
  253. if "data" in rsp and len(rsp["data"]) == 0:
  254. logger.info(f"after {timeout}s, still no data")
  255. return response.json()
  256. def vector_get(self, payload, db_name="default"):
  257. time.sleep(1)
  258. url = f'{self.endpoint}/v2/vectordb/entities/get'
  259. if self.db_name is not None:
  260. payload["dbName"] = self.db_name
  261. if db_name != "default":
  262. payload["dbName"] = db_name
  263. response = self.post(url, headers=self.update_headers(), data=payload)
  264. return response.json()
  265. def vector_delete(self, payload, db_name="default"):
  266. url = f'{self.endpoint}/v2/vectordb/entities/delete'
  267. if self.db_name is not None:
  268. payload["dbName"] = self.db_name
  269. if db_name != "default":
  270. payload["dbName"] = db_name
  271. response = self.post(url, headers=self.update_headers(), data=payload)
  272. return response.json()
  273. def vector_insert(self, payload, db_name="default"):
  274. url = f'{self.endpoint}/v2/vectordb/entities/insert'
  275. if self.db_name is not None:
  276. payload["dbName"] = self.db_name
  277. if db_name != "default":
  278. payload["dbName"] = db_name
  279. response = self.post(url, headers=self.update_headers(), data=payload)
  280. return response.json()
  281. def vector_upsert(self, payload, db_name="default"):
  282. url = f'{self.endpoint}/v2/vectordb/entities/upsert'
  283. if self.db_name is not None:
  284. payload["dbName"] = self.db_name
  285. if db_name != "default":
  286. payload["dbName"] = db_name
  287. response = self.post(url, headers=self.update_headers(), data=payload)
  288. return response.json()
  289. class CollectionClient(Requests):
  290. def __init__(self, endpoint, token):
  291. super().__init__(url=endpoint, api_key=token)
  292. self.endpoint = endpoint
  293. self.api_key = token
  294. self.db_name = None
  295. self.headers = self.update_headers()
  296. @classmethod
  297. def update_headers(cls, headers=None):
  298. if headers is not None:
  299. return headers
  300. headers = {
  301. 'Content-Type': 'application/json',
  302. 'Authorization': f'Bearer {cls.api_key}',
  303. 'RequestId': cls.uuid
  304. }
  305. return headers
  306. def collection_has(self, db_name="default", collection_name=None):
  307. url = f'{self.endpoint}/v2/vectordb/collections/has'
  308. if self.db_name is not None:
  309. db_name = self.db_name
  310. data = {
  311. "dbName": db_name,
  312. "collectionName": collection_name
  313. }
  314. response = self.post(url, headers=self.update_headers(), data=data)
  315. res = response.json()
  316. return res
  317. def collection_rename(self, payload, db_name="default"):
  318. url = f'{self.endpoint}/v2/vectordb/collections/rename'
  319. if self.db_name is not None:
  320. payload["dbName"] = self.db_name
  321. if db_name != "default":
  322. payload["dbName"] = db_name
  323. response = self.post(url, headers=self.update_headers(), data=payload)
  324. return response.json()
  325. def collection_stats(self, db_name="default", collection_name=None):
  326. url = f'{self.endpoint}/v2/vectordb/collections/get_stats'
  327. if self.db_name is not None:
  328. db_name = self.db_name
  329. data = {
  330. "dbName": db_name,
  331. "collectionName": collection_name
  332. }
  333. response = self.post(url, headers=self.update_headers(), data=data)
  334. res = response.json()
  335. return res
  336. def collection_load(self, db_name="default", collection_name=None):
  337. url = f'{self.endpoint}/v2/vectordb/collections/load'
  338. if self.db_name is not None:
  339. db_name = self.db_name
  340. payload = {
  341. "dbName": db_name,
  342. "collectionName": collection_name
  343. }
  344. response = self.post(url, headers=self.update_headers(), data=payload)
  345. res = response.json()
  346. return res
  347. def collection_release(self, db_name="default", collection_name=None):
  348. url = f'{self.endpoint}/v2/vectordb/collections/release'
  349. if self.db_name is not None:
  350. db_name = self.db_name
  351. payload = {
  352. "dbName": db_name,
  353. "collectionName": collection_name
  354. }
  355. response = self.post(url, headers=self.update_headers(), data=payload)
  356. res = response.json()
  357. return res
  358. def collection_load_state(self, db_name="default", collection_name=None, partition_names=None):
  359. url = f'{self.endpoint}/v2/vectordb/collections/get_load_state'
  360. if self.db_name is not None:
  361. db_name = self.db_name
  362. data = {
  363. "dbName": db_name,
  364. "collectionName": collection_name,
  365. }
  366. if partition_names is not None:
  367. data["partitionNames"] = partition_names
  368. response = self.post(url, headers=self.update_headers(), data=data)
  369. res = response.json()
  370. return res
  371. def collection_list(self, db_name="default"):
  372. url = f'{self.endpoint}/v2/vectordb/collections/list'
  373. params = {}
  374. if self.db_name is not None:
  375. params = {
  376. "dbName": self.db_name
  377. }
  378. if db_name != "default":
  379. params = {
  380. "dbName": db_name
  381. }
  382. response = self.post(url, headers=self.update_headers(), params=params)
  383. res = response.json()
  384. return res
  385. def collection_create(self, payload, db_name="default"):
  386. time.sleep(1) # wait for collection created and in case of rate limit
  387. url = f'{self.endpoint}/v2/vectordb/collections/create'
  388. if self.db_name is not None:
  389. payload["dbName"] = self.db_name
  390. if db_name != "default":
  391. payload["dbName"] = db_name
  392. if not ("params" in payload and "consistencyLevel" in payload["params"]):
  393. if "params" not in payload:
  394. payload["params"] = {}
  395. payload["params"]["consistencyLevel"] = "Strong"
  396. response = self.post(url, headers=self.update_headers(), data=payload)
  397. return response.json()
  398. def collection_describe(self, collection_name, db_name="default"):
  399. url = f'{self.endpoint}/v2/vectordb/collections/describe'
  400. data = {"collectionName": collection_name}
  401. if self.db_name is not None:
  402. data = {
  403. "collectionName": collection_name,
  404. "dbName": self.db_name
  405. }
  406. if db_name != "default":
  407. data = {
  408. "collectionName": collection_name,
  409. "dbName": db_name
  410. }
  411. response = self.post(url, headers=self.update_headers(), data=data)
  412. return response.json()
  413. def collection_drop(self, payload, db_name="default"):
  414. time.sleep(1) # wait for collection drop and in case of rate limit
  415. url = f'{self.endpoint}/v2/vectordb/collections/drop'
  416. if self.db_name is not None:
  417. payload["dbName"] = self.db_name
  418. if db_name != "default":
  419. payload["dbName"] = db_name
  420. response = self.post(url, headers=self.update_headers(), data=payload)
  421. return response.json()
  422. class PartitionClient(Requests):
  423. def __init__(self, endpoint, token):
  424. super().__init__(url=endpoint, api_key=token)
  425. self.endpoint = endpoint
  426. self.api_key = token
  427. self.db_name = None
  428. self.headers = self.update_headers()
  429. @classmethod
  430. def update_headers(cls):
  431. headers = {
  432. 'Content-Type': 'application/json',
  433. 'Authorization': f'Bearer {cls.api_key}',
  434. 'RequestId': cls.uuid
  435. }
  436. return headers
  437. def partition_list(self, db_name="default", collection_name=None):
  438. url = f'{self.endpoint}/v2/vectordb/partitions/list'
  439. data = {
  440. "collectionName": collection_name
  441. }
  442. if self.db_name is not None:
  443. data = {
  444. "dbName": self.db_name,
  445. "collectionName": collection_name
  446. }
  447. if db_name != "default":
  448. data = {
  449. "dbName": db_name,
  450. "collectionName": collection_name
  451. }
  452. response = self.post(url, headers=self.update_headers(), data=data)
  453. res = response.json()
  454. return res
  455. def partition_create(self, db_name="default", collection_name=None, partition_name=None):
  456. url = f'{self.endpoint}/v2/vectordb/partitions/create'
  457. if self.db_name is not None:
  458. db_name = self.db_name
  459. payload = {
  460. "dbName": db_name,
  461. "collectionName": collection_name,
  462. "partitionName": partition_name
  463. }
  464. response = self.post(url, headers=self.update_headers(), data=payload)
  465. res = response.json()
  466. return res
  467. def partition_drop(self, db_name="default", collection_name=None, partition_name=None):
  468. url = f'{self.endpoint}/v2/vectordb/partitions/drop'
  469. if self.db_name is not None:
  470. db_name = self.db_name
  471. payload = {
  472. "dbName": db_name,
  473. "collectionName": collection_name,
  474. "partitionName": partition_name
  475. }
  476. response = self.post(url, headers=self.update_headers(), data=payload)
  477. res = response.json()
  478. return res
  479. def partition_load(self, db_name="default", collection_name=None, partition_names=None):
  480. url = f'{self.endpoint}/v2/vectordb/partitions/load'
  481. if self.db_name is not None:
  482. db_name = self.db_name
  483. payload = {
  484. "dbName": db_name,
  485. "collectionName": collection_name,
  486. "partitionNames": partition_names
  487. }
  488. response = self.post(url, headers=self.update_headers(), data=payload)
  489. res = response.json()
  490. return res
  491. def partition_release(self, db_name="default", collection_name=None, partition_names=None):
  492. url = f'{self.endpoint}/v2/vectordb/partitions/release'
  493. if self.db_name is not None:
  494. db_name = self.db_name
  495. payload = {
  496. "dbName": db_name,
  497. "collectionName": collection_name,
  498. "partitionNames": partition_names
  499. }
  500. response = self.post(url, headers=self.update_headers(), data=payload)
  501. res = response.json()
  502. return res
  503. def partition_has(self, db_name="default", collection_name=None, partition_name=None):
  504. url = f'{self.endpoint}/v2/vectordb/partitions/has'
  505. if self.db_name is not None:
  506. db_name = self.db_name
  507. data = {
  508. "dbName": db_name,
  509. "collectionName": collection_name,
  510. "partitionName": partition_name
  511. }
  512. response = self.post(url, headers=self.update_headers(), data=data)
  513. res = response.json()
  514. return res
  515. def partition_stats(self, db_name="default", collection_name=None, partition_name=None):
  516. url = f'{self.endpoint}/v2/vectordb/partitions/get_stats'
  517. if self.db_name is not None:
  518. db_name = self.db_name
  519. data = {
  520. "dbName": db_name,
  521. "collectionName": collection_name,
  522. "partitionName": partition_name
  523. }
  524. response = self.post(url, headers=self.update_headers(), data=data)
  525. res = response.json()
  526. return res
  527. class UserClient(Requests):
  528. def __init__(self, endpoint, token):
  529. super().__init__(url=endpoint, api_key=token)
  530. self.endpoint = endpoint
  531. self.api_key = token
  532. self.db_name = None
  533. self.headers = self.update_headers()
  534. @classmethod
  535. def update_headers(cls):
  536. headers = {
  537. 'Content-Type': 'application/json',
  538. 'Authorization': f'Bearer {cls.api_key}',
  539. 'RequestId': cls.uuid
  540. }
  541. return headers
  542. def user_list(self):
  543. url = f'{self.endpoint}/v2/vectordb/users/list'
  544. response = self.post(url, headers=self.update_headers())
  545. res = response.json()
  546. return res
  547. def user_create(self, payload):
  548. url = f'{self.endpoint}/v2/vectordb/users/create'
  549. response = self.post(url, headers=self.update_headers(), data=payload)
  550. res = response.json()
  551. return res
  552. def user_password_update(self, payload):
  553. url = f'{self.endpoint}/v2/vectordb/users/update_password'
  554. response = self.post(url, headers=self.update_headers(), data=payload)
  555. res = response.json()
  556. return res
  557. def user_describe(self, user_name):
  558. url = f'{self.endpoint}/v2/vectordb/users/describe'
  559. data = {
  560. "userName": user_name
  561. }
  562. response = self.post(url, headers=self.update_headers(), data=data)
  563. res = response.json()
  564. return res
  565. def user_drop(self, payload):
  566. url = f'{self.endpoint}/v2/vectordb/users/drop'
  567. response = self.post(url, headers=self.update_headers(), data=payload)
  568. res = response.json()
  569. return res
  570. def user_grant(self, payload):
  571. url = f'{self.endpoint}/v2/vectordb/users/grant_role'
  572. response = self.post(url, headers=self.update_headers(), data=payload)
  573. res = response.json()
  574. return res
  575. def user_revoke(self, payload):
  576. url = f'{self.endpoint}/v2/vectordb/users/revoke_role'
  577. response = self.post(url, headers=self.update_headers(), data=payload)
  578. res = response.json()
  579. return res
  580. class RoleClient(Requests):
  581. def __init__(self, endpoint, token):
  582. super().__init__(url=endpoint, api_key=token)
  583. self.endpoint = endpoint
  584. self.api_key = token
  585. self.db_name = None
  586. self.headers = self.update_headers()
  587. self.role_names = []
  588. @classmethod
  589. def update_headers(cls):
  590. headers = {
  591. 'Content-Type': 'application/json',
  592. 'Authorization': f'Bearer {cls.api_key}',
  593. 'RequestId': cls.uuid
  594. }
  595. return headers
  596. def role_list(self):
  597. url = f'{self.endpoint}/v2/vectordb/roles/list'
  598. response = self.post(url, headers=self.update_headers())
  599. res = response.json()
  600. return res
  601. def role_create(self, payload):
  602. url = f'{self.endpoint}/v2/vectordb/roles/create'
  603. response = self.post(url, headers=self.update_headers(), data=payload)
  604. res = response.json()
  605. if res["code"] == 0:
  606. self.role_names.append(payload["roleName"])
  607. return res
  608. def role_describe(self, role_name):
  609. url = f'{self.endpoint}/v2/vectordb/roles/describe'
  610. data = {
  611. "roleName": role_name
  612. }
  613. response = self.post(url, headers=self.update_headers(), data=data)
  614. res = response.json()
  615. return res
  616. def role_drop(self, payload):
  617. url = f'{self.endpoint}/v2/vectordb/roles/drop'
  618. response = self.post(url, headers=self.update_headers(), data=payload)
  619. res = response.json()
  620. return res
  621. def role_grant(self, payload):
  622. url = f'{self.endpoint}/v2/vectordb/roles/grant_privilege'
  623. response = self.post(url, headers=self.update_headers(), data=payload)
  624. res = response.json()
  625. return res
  626. def role_revoke(self, payload):
  627. url = f'{self.endpoint}/v2/vectordb/roles/revoke_privilege'
  628. response = self.post(url, headers=self.update_headers(), data=payload)
  629. res = response.json()
  630. return res
  631. class IndexClient(Requests):
  632. def __init__(self, endpoint, token):
  633. super().__init__(url=endpoint, api_key=token)
  634. self.endpoint = endpoint
  635. self.api_key = token
  636. self.db_name = None
  637. self.headers = self.update_headers()
  638. @classmethod
  639. def update_headers(cls):
  640. headers = {
  641. 'Content-Type': 'application/json',
  642. 'Authorization': f'Bearer {cls.api_key}',
  643. 'RequestId': cls.uuid
  644. }
  645. return headers
  646. def index_create(self, payload, db_name="default"):
  647. url = f'{self.endpoint}/v2/vectordb/indexes/create'
  648. if self.db_name is not None:
  649. db_name = self.db_name
  650. payload["dbName"] = db_name
  651. response = self.post(url, headers=self.update_headers(), data=payload)
  652. res = response.json()
  653. return res
  654. def index_describe(self, db_name="default", collection_name=None, index_name=None):
  655. url = f'{self.endpoint}/v2/vectordb/indexes/describe'
  656. if self.db_name is not None:
  657. db_name = self.db_name
  658. data = {
  659. "dbName": db_name,
  660. "collectionName": collection_name,
  661. "indexName": index_name
  662. }
  663. response = self.post(url, headers=self.update_headers(), data=data)
  664. res = response.json()
  665. return res
  666. def index_list(self, collection_name=None, db_name="default"):
  667. url = f'{self.endpoint}/v2/vectordb/indexes/list'
  668. if self.db_name is not None:
  669. db_name = self.db_name
  670. data = {
  671. "dbName": db_name,
  672. "collectionName": collection_name
  673. }
  674. response = self.post(url, headers=self.update_headers(), data=data)
  675. res = response.json()
  676. return res
  677. def index_drop(self, payload, db_name="default"):
  678. url = f'{self.endpoint}/v2/vectordb/indexes/drop'
  679. if self.db_name is not None:
  680. db_name = self.db_name
  681. payload["dbName"] = db_name
  682. response = self.post(url, headers=self.update_headers(), data=payload)
  683. res = response.json()
  684. return res
  685. class AliasClient(Requests):
  686. def __init__(self, endpoint, token):
  687. super().__init__(url=endpoint, api_key=token)
  688. self.endpoint = endpoint
  689. self.api_key = token
  690. self.db_name = None
  691. self.headers = self.update_headers()
  692. @classmethod
  693. def update_headers(cls):
  694. headers = {
  695. 'Content-Type': 'application/json',
  696. 'Authorization': f'Bearer {cls.api_key}',
  697. 'RequestId': cls.uuid
  698. }
  699. return headers
  700. def list_alias(self):
  701. url = f'{self.endpoint}/v2/vectordb/aliases/list'
  702. response = self.post(url, headers=self.update_headers())
  703. res = response.json()
  704. return res
  705. def describe_alias(self, alias_name):
  706. url = f'{self.endpoint}/v2/vectordb/aliases/describe'
  707. data = {
  708. "aliasName": alias_name
  709. }
  710. response = self.post(url, headers=self.update_headers(), data=data)
  711. res = response.json()
  712. return res
  713. def alter_alias(self, payload):
  714. url = f'{self.endpoint}/v2/vectordb/aliases/alter'
  715. response = self.post(url, headers=self.update_headers(), data=payload)
  716. res = response.json()
  717. return res
  718. def drop_alias(self, payload):
  719. url = f'{self.endpoint}/v2/vectordb/aliases/drop'
  720. response = self.post(url, headers=self.update_headers(), data=payload)
  721. res = response.json()
  722. return res
  723. def create_alias(self, payload):
  724. url = f'{self.endpoint}/v2/vectordb/aliases/create'
  725. response = self.post(url, headers=self.update_headers(), data=payload)
  726. res = response.json()
  727. return res
  728. class ImportJobClient(Requests):
  729. def __init__(self, endpoint, token):
  730. super().__init__(url=endpoint, api_key=token)
  731. self.endpoint = endpoint
  732. self.api_key = token
  733. self.db_name = None
  734. self.headers = self.update_headers()
  735. @classmethod
  736. def update_headers(cls):
  737. headers = {
  738. 'Content-Type': 'application/json',
  739. 'Authorization': f'Bearer {cls.api_key}',
  740. 'RequestId': cls.uuid
  741. }
  742. return headers
  743. def list_import_jobs(self, payload, db_name="default"):
  744. if self.db_name is not None:
  745. db_name = self.db_name
  746. payload["dbName"] = db_name
  747. if db_name is None:
  748. payload.pop("dbName")
  749. url = f'{self.endpoint}/v2/vectordb/jobs/import/list'
  750. response = self.post(url, headers=self.update_headers(), data=payload)
  751. res = response.json()
  752. return res
  753. def create_import_jobs(self, payload, db_name="default"):
  754. if self.db_name is not None:
  755. db_name = self.db_name
  756. url = f'{self.endpoint}/v2/vectordb/jobs/import/create'
  757. payload["dbName"] = db_name
  758. response = self.post(url, headers=self.update_headers(), data=payload)
  759. res = response.json()
  760. return res
  761. def get_import_job_progress(self, job_id, db_name="default"):
  762. if self.db_name is not None:
  763. db_name = self.db_name
  764. payload = {
  765. "dbName": db_name,
  766. "jobID": job_id
  767. }
  768. if db_name is None:
  769. payload.pop("dbName")
  770. if job_id is None:
  771. payload.pop("jobID")
  772. url = f'{self.endpoint}/v2/vectordb/jobs/import/get_progress'
  773. response = self.post(url, headers=self.update_headers(), data=payload)
  774. res = response.json()
  775. return res
  776. def wait_import_job_completed(self, job_id):
  777. finished = False
  778. t0 = time.time()
  779. rsp = self.get_import_job_progress(job_id)
  780. while not finished:
  781. rsp = self.get_import_job_progress(job_id)
  782. if rsp['data']['state'] == "Completed":
  783. finished = True
  784. time.sleep(5)
  785. if time.time() - t0 > 120:
  786. break
  787. return rsp, finished
  788. class StorageClient():
  789. def __init__(self, endpoint, access_key, secret_key, bucket_name, root_path="file"):
  790. self.endpoint = endpoint
  791. self.access_key = access_key
  792. self.secret_key = secret_key
  793. self.bucket_name = bucket_name
  794. self.root_path = root_path
  795. self.client = Minio(
  796. self.endpoint,
  797. access_key=access_key,
  798. secret_key=secret_key,
  799. secure=False,
  800. )
  801. def upload_file(self, file_path, object_name):
  802. try:
  803. self.client.fput_object(self.bucket_name, object_name, file_path)
  804. except S3Error as exc:
  805. logger.error("fail to copy files to minio", exc)
  806. def copy_file(self, src_bucket, src_object, dst_bucket, dst_object):
  807. try:
  808. # if dst bucket not exist, create it
  809. if not self.client.bucket_exists(dst_bucket):
  810. self.client.make_bucket(dst_bucket)
  811. self.client.copy_object(dst_bucket, dst_object, CopySource(src_bucket, src_object))
  812. except S3Error as exc:
  813. logger.error("fail to copy files to minio", exc)
  814. def get_collection_binlog(self, collection_id):
  815. dir_list = [
  816. "delta_log",
  817. "insert_log"
  818. ]
  819. binlog_list = []
  820. # list objects dir/collection_id in bucket
  821. for dir in dir_list:
  822. prefix = f"{self.root_path}/{dir}/{collection_id}/"
  823. objects = self.client.list_objects(self.bucket_name, prefix=prefix)
  824. for obj in objects:
  825. binlog_list.append(f"{self.bucket_name}/{obj.object_name}")
  826. print(binlog_list)
  827. return binlog_list
  828. if __name__ == "__main__":
  829. sc = StorageClient(
  830. endpoint="10.104.19.57:9000",
  831. access_key="minioadmin",
  832. secret_key="minioadmin",
  833. bucket_name="milvus-bucket"
  834. )
  835. sc.get_collection_binlog("448305293023730313")