12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953 |
- import json
- import requests
- import time
- import uuid
- from utils.util_log import test_log as logger
- from minio import Minio
- from minio.error import S3Error
- from minio.commonconfig import CopySource
- from tenacity import retry, retry_if_exception_type, stop_after_attempt
- from requests.exceptions import ConnectionError
- import urllib.parse
# When true, logger_request_response() also appends every request/response
# pair to 'request_response.jsonl' in the working directory.
ENABLE_LOG_SAVE = False
def simplify_list(lst):
    """Shorten a long list for logging.

    Lists with more than 20 elements are collapsed to
    ``[first, '...', last]``; shorter lists are returned unchanged (the very
    same object, not a copy).
    """
    return [lst[0], '...', lst[-1]] if len(lst) > 20 else lst
def simplify_dict(d):
    """Return a shortened copy of ``d`` that is suitable for logging.

    ``None`` is treated as an empty dict. A dict with more than 20 keys is
    reduced to its first key, a ``'...'`` marker entry and its last key.
    List values are shortened with ``simplify_list`` (their dict / list
    elements are shortened one level first); dict values are simplified
    recursively; any other value is copied as is.
    """
    source = {} if d is None else d
    if len(source) > 20:
        names = list(source.keys())
        first, last = names[0], names[-1]
        source = {first: source[first], '...': '...', last: source[last]}

    def _shrink_element(item):
        # Elements of a list value: dicts recurse, lists are only truncated.
        if isinstance(item, dict):
            return simplify_dict(item)
        if isinstance(item, list):
            return simplify_list(item)
        return item

    simplified = {}
    for name, value in source.items():
        if isinstance(value, list):
            simplified[name] = simplify_list([_shrink_element(item) for item in value])
        elif isinstance(value, dict):
            simplified[name] = simplify_dict(value)
        else:
            simplified[name] = value
    return simplified
def _shell_single_quote(text):
    """Wrap ``text`` in single quotes, escaping embedded single quotes for a POSIX shell."""
    return "'" + str(text).replace("'", "'\\''") + "'"


def build_curl_command(method, url, headers, data=None, params=None):
    """Render a request as a copy-pastable multi-line ``curl`` command.

    Args:
        method: HTTP verb placed after ``-X``.
        url: Target URL; ``params`` are appended to it as a query string.
        headers: Mapping of header names to values; ``None`` means no headers.
        data: JSON-serialisable body. Falsy values produce no ``-d`` argument.
        params: Optional dict of query parameters. An empty dict adds nothing.

    Returns:
        The command as a string whose parts are joined by ``" \\\n"``.
    """
    if isinstance(params, dict) and params:
        query_string = urllib.parse.urlencode(params)
        # Extend an existing query string instead of adding a second '?'.
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}{query_string}"
    curl_cmd = [f"curl -X {method} {_shell_single_quote(url)}"]
    for key, value in (headers or {}).items():
        curl_cmd.append(f" -H {_shell_single_quote(f'{key}: {value}')}")
    if data:
        body = json.dumps(data, indent=4)
        curl_cmd.append(f" -d {_shell_single_quote(body)}")
    return " \\\n".join(curl_cmd)
def logger_request_response(response, url, tt, headers, data, str_data, str_response, method, params=None):
    """Log one HTTP exchange at debug level, optionally saving it to a JSONL file.

    Args:
        response: Response object; ``status_code``, ``text`` and ``json()`` are used.
        url: Request URL.
        tt: Elapsed request time, as measured by the caller.
        headers: Request headers.
        data: JSON-encoded request body (a string) or a falsy value.
        str_data: Truncated request body; accepted for call compatibility, not used.
        str_response: Truncated response text, logged when the call succeeded.
        method: HTTP method name used in the log line.
        params: Optional query parameters (only written to the JSONL file).

    The payload is shortened with ``simplify_dict`` before being logged. For
    a 200 response whose JSON body has ``code``/``Code`` equal to 0 the
    truncated response is logged; in every other case the full text is.
    """
    data_dict = json.loads(data) if data else {}
    data_dict_simple = simplify_dict(data_dict)
    if ENABLE_LOG_SAVE:
        # A non-JSON body (e.g. a proxy error page) must not make the logging
        # helper raise, so fall back to the raw text.
        try:
            saved_response = response.json()
        except ValueError:
            saved_response = response.text
        with open('request_response.jsonl', 'a') as f:
            f.write(json.dumps({
                "method": method,
                "url": url,
                "headers": headers,
                "params": params,
                "data": data_dict_simple,
                "response": saved_response
            }) + "\n")
    data = json.dumps(data_dict_simple, indent=4)
    try:
        if response.status_code == 200:
            # Parse the body once instead of once per membership/lookup.
            body = response.json()
            succeeded = ('code' in body and body["code"] == 0) or (
                'Code' in body and body["Code"] == 0)
            shown = str_response if succeeded else response.text
            logger.debug(
                f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {shown}")
        else:
            logger.debug(
                f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}")
    except Exception as e:
        # Best effort: a malformed body is reported in the log line, not raised.
        logger.debug(
            f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}, \nerror: {e}")
class Requests():
    """Base HTTP helper: JSON-encodes payloads, logs each exchange and retries.

    ``post``/``get``/``put``/``delete`` retry up to three attempts when
    ``requests`` raises ``ConnectionError``.
    """

    # Request id shared through the class; replaced with update_uuid().
    uuid = str(uuid.uuid1())
    # NOTE(review): update_headers() reads this class-level value, while
    # __init__ stores the token on the instance; confirm that is intended.
    api_key = None

    def __init__(self, url=None, api_key=None):
        self.url = url
        self.api_key = api_key
        if self.uuid is None:
            self.uuid = str(uuid.uuid1())
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.api_key}',
            'RequestId': self.uuid
        }

    @classmethod
    def update_uuid(cls, _uuid):
        """Replace the class-level request id."""
        cls.uuid = _uuid

    @classmethod
    def update_headers(cls):
        """Return fresh default headers built from the class-level attributes."""
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }

    @staticmethod
    def _clip(text):
        """Keep the first and last 200 characters of texts longer than 400."""
        if len(text) > 400:
            return text[:200] + '...' + text[-200:]
        return text

    def _send(self, verb, url, headers, data, **extra):
        """Issue one request through ``requests.<verb>`` and log the exchange.

        ``extra`` carries the keyword arguments only some verbs forward
        (currently ``params``).
        """
        if headers is None:
            headers = self.update_headers()
        body = json.dumps(data)
        str_data = self._clip(body)
        call_kwargs = {"headers": headers, **extra}
        # GET omits the body when there is no payload (json.dumps(None) == "null").
        if not (verb == "get" and body == "null"):
            call_kwargs["data"] = body
        started = time.time()
        response = getattr(requests, verb)(url, **call_kwargs)
        elapsed = time.time() - started
        str_response = self._clip(response.text)
        logger_request_response(response, url, elapsed, headers, body, str_data, str_response, verb,
                                params=extra.get("params"))
        return response

    # retry when request failed caused by network or server error
    @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
    def post(self, url, headers=None, data=None, params=None):
        """POST ``data`` as JSON and return the response object."""
        return self._send("post", url, headers, data, params=params)

    @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
    def get(self, url, headers=None, params=None, data=None):
        """GET ``url``; a JSON body is sent only when ``data`` is not None."""
        return self._send("get", url, headers, data, params=params)

    @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
    def put(self, url, headers=None, data=None):
        """PUT ``data`` as JSON and return the response object."""
        return self._send("put", url, headers, data)

    @retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
    def delete(self, url, headers=None, data=None):
        """DELETE with ``data`` as JSON body and return the response object."""
        return self._send("delete", url, headers, data)
class VectorClient(Requests):
    """Client for the ``/v2/vectordb/entities/*`` endpoints.

    Every method takes a ``payload`` dict, which is modified in place when a
    database name applies: ``self.db_name`` (if set) is written to
    ``payload["dbName"]`` and an explicit ``db_name`` other than
    ``"default"`` overrides it. Methods return the decoded JSON response.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.token = token
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'Accept-Type-Allow-Int64': "true",
            'RequestId': cls.uuid
        }
        return headers

    def _apply_db_name(self, payload, db_name):
        """Write the effective database name into ``payload`` (explicit argument wins)."""
        if self.db_name is not None:
            payload["dbName"] = self.db_name
        if db_name != "default":
            payload["dbName"] = db_name

    def _post_entities(self, action, payload):
        """POST ``payload`` to ``entities/<action>`` and return the decoded JSON."""
        url = f'{self.endpoint}/v2/vectordb/entities/{action}'
        response = self.post(url, headers=self.update_headers(), data=payload)
        return response.json()

    @staticmethod
    def _has_empty_data(rsp):
        """True only when ``rsp`` carries a ``data`` field that is empty."""
        if not isinstance(rsp, dict):
            return False
        data = rsp.get("data")
        return data is not None and len(data) == 0

    def _read_until_data(self, action, payload, db_name, timeout):
        """Run a read request, re-sending it while the result set is empty.

        After an initial 1s pause the request is sent once. If it returns an
        empty ``data`` list, it is repeated about once per second for up to
        ``timeout`` seconds, then one last time. A response without a
        ``data`` field (for example an error) stops the polling and is
        returned as is, rather than raising ``KeyError``.
        """
        time.sleep(1)
        self._apply_db_name(payload, db_name)
        rsp = self._post_entities(action, payload)
        if not self._has_empty_data(rsp):
            return rsp
        t0 = time.time()
        while time.time() - t0 < timeout:
            rsp = self._post_entities(action, payload)
            if not self._has_empty_data(rsp):
                break
            time.sleep(1)
        else:
            # The window elapsed without a break: try once more and report.
            rsp = self._post_entities(action, payload)
            if self._has_empty_data(rsp):
                logger.info(f"after {timeout}s, still no data")
        return rsp

    def vector_search(self, payload, db_name="default", timeout=10):
        """Search entities, retrying while the result is empty."""
        return self._read_until_data("search", payload, db_name, timeout)

    def vector_advanced_search(self, payload, db_name="default", timeout=10):
        """Advanced search, retrying while the result is empty."""
        return self._read_until_data("advanced_search", payload, db_name, timeout)

    def vector_hybrid_search(self, payload, db_name="default", timeout=10):
        """Hybrid search, retrying while the result is empty."""
        return self._read_until_data("hybrid_search", payload, db_name, timeout)

    def vector_query(self, payload, db_name="default", timeout=5):
        """Query entities, retrying while the result is empty."""
        return self._read_until_data("query", payload, db_name, timeout)

    def vector_get(self, payload, db_name="default"):
        """Get entities; waits 1s before sending a single request."""
        time.sleep(1)
        self._apply_db_name(payload, db_name)
        return self._post_entities("get", payload)

    def vector_delete(self, payload, db_name="default"):
        """Delete entities."""
        self._apply_db_name(payload, db_name)
        return self._post_entities("delete", payload)

    def vector_insert(self, payload, db_name="default"):
        """Insert entities."""
        self._apply_db_name(payload, db_name)
        return self._post_entities("insert", payload)

    def vector_upsert(self, payload, db_name="default"):
        """Upsert entities."""
        self._apply_db_name(payload, db_name)
        return self._post_entities("upsert", payload)
class CollectionClient(Requests):
    """Client for the ``/v2/vectordb/collections/*`` endpoints.

    All methods POST a JSON body and return the decoded JSON response.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls, headers=None):
        """Return ``headers`` when given, else defaults built from class attributes."""
        if headers is None:
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {cls.api_key}',
                'RequestId': cls.uuid
            }
        return headers

    def _post_json(self, action, data=None, params=None):
        """POST to ``collections/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/collections/{action}'
        reply = self.post(url, headers=self.update_headers(), data=data, params=params)
        return reply.json()

    def _scoped(self, db_name, collection_name):
        """Body with ``dbName``/``collectionName``; ``self.db_name`` wins when set."""
        effective = db_name if self.db_name is None else self.db_name
        return {"dbName": effective, "collectionName": collection_name}

    def _stamp_db_name(self, payload, db_name):
        """Write the database name into ``payload``; a non-default argument wins."""
        if self.db_name is not None:
            payload["dbName"] = self.db_name
        if db_name != "default":
            payload["dbName"] = db_name

    def collection_has(self, db_name="default", collection_name=None):
        """Check whether a collection exists."""
        return self._post_json("has", self._scoped(db_name, collection_name))

    def collection_rename(self, payload, db_name="default"):
        """Rename a collection as described by ``payload``."""
        self._stamp_db_name(payload, db_name)
        return self._post_json("rename", payload)

    def collection_stats(self, db_name="default", collection_name=None):
        """Fetch collection statistics."""
        return self._post_json("get_stats", self._scoped(db_name, collection_name))

    def collection_load(self, db_name="default", collection_name=None):
        """Load a collection."""
        return self._post_json("load", self._scoped(db_name, collection_name))

    def collection_release(self, db_name="default", collection_name=None):
        """Release a collection."""
        return self._post_json("release", self._scoped(db_name, collection_name))

    def collection_load_state(self, db_name="default", collection_name=None, partition_names=None):
        """Fetch the load state, optionally limited to ``partition_names``."""
        body = self._scoped(db_name, collection_name)
        if partition_names is not None:
            body["partitionNames"] = partition_names
        return self._post_json("get_load_state", body)

    def collection_list(self, db_name="default"):
        """List collections; the database name travels as a query parameter."""
        query = {}
        if self.db_name is not None:
            query = {"dbName": self.db_name}
        if db_name != "default":
            query = {"dbName": db_name}
        return self._post_json("list", params=query)

    def collection_create(self, payload, db_name="default"):
        """Create a collection; ``consistencyLevel`` defaults to ``"Strong"``."""
        time.sleep(1)  # wait for collection created and in case of rate limit
        self._stamp_db_name(payload, db_name)
        if "params" not in payload:
            payload["params"] = {}
        if "consistencyLevel" not in payload["params"]:
            payload["params"]["consistencyLevel"] = "Strong"
        return self._post_json("create", payload)

    def collection_describe(self, collection_name, db_name="default"):
        """Describe a collection."""
        body = {"collectionName": collection_name}
        if self.db_name is not None:
            body["dbName"] = self.db_name
        if db_name != "default":
            body["dbName"] = db_name
        return self._post_json("describe", body)

    def collection_drop(self, payload, db_name="default"):
        """Drop the collection named in ``payload``."""
        time.sleep(1)  # wait for collection drop and in case of rate limit
        self._stamp_db_name(payload, db_name)
        return self._post_json("drop", payload)
class PartitionClient(Requests):
    """Client for the ``/v2/vectordb/partitions/*`` endpoints.

    All methods POST a JSON body and return the decoded JSON response.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }

    def _post_json(self, action, data):
        """POST ``data`` to ``partitions/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/partitions/{action}'
        reply = self.post(url, headers=self.update_headers(), data=data)
        return reply.json()

    def _scoped(self, db_name, collection_name, **extra):
        """Body with ``dbName``, ``collectionName`` and ``extra``; ``self.db_name`` wins when set."""
        effective = db_name if self.db_name is None else self.db_name
        return {"dbName": effective, "collectionName": collection_name, **extra}

    def partition_list(self, db_name="default", collection_name=None):
        """List partitions; ``dbName`` is sent only when one is configured or passed."""
        body = {"collectionName": collection_name}
        if self.db_name is not None:
            body = {"dbName": self.db_name, "collectionName": collection_name}
        if db_name != "default":
            body = {"dbName": db_name, "collectionName": collection_name}
        return self._post_json("list", body)

    def partition_create(self, db_name="default", collection_name=None, partition_name=None):
        """Create a partition."""
        return self._post_json("create", self._scoped(db_name, collection_name, partitionName=partition_name))

    def partition_drop(self, db_name="default", collection_name=None, partition_name=None):
        """Drop a partition."""
        return self._post_json("drop", self._scoped(db_name, collection_name, partitionName=partition_name))

    def partition_load(self, db_name="default", collection_name=None, partition_names=None):
        """Load the given partitions."""
        return self._post_json("load", self._scoped(db_name, collection_name, partitionNames=partition_names))

    def partition_release(self, db_name="default", collection_name=None, partition_names=None):
        """Release the given partitions."""
        return self._post_json("release", self._scoped(db_name, collection_name, partitionNames=partition_names))

    def partition_has(self, db_name="default", collection_name=None, partition_name=None):
        """Check whether a partition exists."""
        return self._post_json("has", self._scoped(db_name, collection_name, partitionName=partition_name))

    def partition_stats(self, db_name="default", collection_name=None, partition_name=None):
        """Fetch partition statistics."""
        return self._post_json("get_stats", self._scoped(db_name, collection_name, partitionName=partition_name))
class UserClient(Requests):
    """Client for the ``/v2/vectordb/users/*`` endpoints.

    All methods POST a JSON body and return the decoded JSON response.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }

    def _post_json(self, action, data=None):
        """POST ``data`` to ``users/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/users/{action}'
        reply = self.post(url, headers=self.update_headers(), data=data)
        return reply.json()

    def user_list(self):
        """List users."""
        return self._post_json("list")

    def user_create(self, payload):
        """Create a user from ``payload``."""
        return self._post_json("create", payload)

    def user_password_update(self, payload):
        """Update a user's password."""
        return self._post_json("update_password", payload)

    def user_describe(self, user_name):
        """Describe the user called ``user_name``."""
        return self._post_json("describe", {"userName": user_name})

    def user_drop(self, payload):
        """Drop the user named in ``payload``."""
        return self._post_json("drop", payload)

    def user_grant(self, payload):
        """Grant a role to a user."""
        return self._post_json("grant_role", payload)

    def user_revoke(self, payload):
        """Revoke a role from a user."""
        return self._post_json("revoke_role", payload)
class RoleClient(Requests):
    """Client for the ``/v2/vectordb/roles/*`` endpoints.

    Names of roles created successfully through ``role_create`` are
    collected in ``self.role_names``.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()
        self.role_names = []

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }

    def _post_json(self, action, data=None):
        """POST ``data`` to ``roles/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/roles/{action}'
        reply = self.post(url, headers=self.update_headers(), data=data)
        return reply.json()

    def role_list(self):
        """List roles."""
        return self._post_json("list")

    def role_create(self, payload):
        """Create a role and remember its name when the reply has ``code`` 0."""
        result = self._post_json("create", payload)
        if result["code"] == 0:
            self.role_names.append(payload["roleName"])
        return result

    def role_describe(self, role_name):
        """Describe the role called ``role_name``."""
        return self._post_json("describe", {"roleName": role_name})

    def role_drop(self, payload):
        """Drop the role named in ``payload``."""
        return self._post_json("drop", payload)

    def role_grant(self, payload):
        """Grant a privilege to a role."""
        return self._post_json("grant_privilege", payload)

    def role_revoke(self, payload):
        """Revoke a privilege from a role."""
        return self._post_json("revoke_privilege", payload)
class IndexClient(Requests):
    """Client for the ``/v2/vectordb/indexes/*`` endpoints.

    ``self.db_name``, when set, takes precedence over the ``db_name``
    argument of every method.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }

    def _post_json(self, action, data):
        """POST ``data`` to ``indexes/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/indexes/{action}'
        reply = self.post(url, headers=self.update_headers(), data=data)
        return reply.json()

    def _effective_db(self, db_name):
        """Return ``self.db_name`` when set, otherwise the given ``db_name``."""
        return db_name if self.db_name is None else self.db_name

    def index_create(self, payload, db_name="default"):
        """Create an index; ``payload["dbName"]`` is always overwritten."""
        payload["dbName"] = self._effective_db(db_name)
        return self._post_json("create", payload)

    def index_describe(self, db_name="default", collection_name=None, index_name=None):
        """Describe one index of a collection."""
        body = {
            "dbName": self._effective_db(db_name),
            "collectionName": collection_name,
            "indexName": index_name
        }
        return self._post_json("describe", body)

    def index_list(self, collection_name=None, db_name="default"):
        """List the indexes of a collection."""
        body = {
            "dbName": self._effective_db(db_name),
            "collectionName": collection_name
        }
        return self._post_json("list", body)

    def index_drop(self, payload, db_name="default"):
        """Drop an index; ``payload["dbName"]`` is always overwritten."""
        payload["dbName"] = self._effective_db(db_name)
        return self._post_json("drop", payload)
class AliasClient(Requests):
    """Client for the ``/v2/vectordb/aliases/*`` endpoints.

    All methods POST a JSON body and return the decoded JSON response.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }

    def _post_json(self, action, data=None):
        """POST ``data`` to ``aliases/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/aliases/{action}'
        reply = self.post(url, headers=self.update_headers(), data=data)
        return reply.json()

    def list_alias(self):
        """List aliases."""
        return self._post_json("list")

    def describe_alias(self, alias_name):
        """Describe the alias called ``alias_name``."""
        return self._post_json("describe", {"aliasName": alias_name})

    def alter_alias(self, payload):
        """Alter an alias as described by ``payload``."""
        return self._post_json("alter", payload)

    def drop_alias(self, payload):
        """Drop the alias named in ``payload``."""
        return self._post_json("drop", payload)

    def create_alias(self, payload):
        """Create an alias from ``payload``."""
        return self._post_json("create", payload)
class ImportJobClient(Requests):
    """Client for the ``/v2/vectordb/jobs/import/*`` endpoints.

    ``self.db_name``, when set, takes precedence over the ``db_name``
    argument of every method.
    """

    def __init__(self, endpoint, token):
        super().__init__(url=endpoint, api_key=token)
        self.endpoint = endpoint
        self.api_key = token
        self.db_name = None
        self.headers = self.update_headers()

    @classmethod
    def update_headers(cls):
        """Return request headers built from the class-level ``api_key`` and ``uuid``."""
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {cls.api_key}',
            'RequestId': cls.uuid
        }
        return headers

    def _post_job(self, action, payload):
        """POST ``payload`` to ``jobs/import/<action>`` and decode the JSON reply."""
        url = f'{self.endpoint}/v2/vectordb/jobs/import/{action}'
        response = self.post(url, headers=self.update_headers(), data=payload)
        return response.json()

    def list_import_jobs(self, payload, db_name="default"):
        """List import jobs; passing ``db_name=None`` removes ``dbName`` from ``payload``."""
        if self.db_name is not None:
            db_name = self.db_name
        if db_name is None:
            payload.pop("dbName", None)
        else:
            payload["dbName"] = db_name
        return self._post_job("list", payload)

    def create_import_jobs(self, payload, db_name="default"):
        """Create an import job; ``payload["dbName"]`` is always overwritten."""
        if self.db_name is not None:
            db_name = self.db_name
        payload["dbName"] = db_name
        return self._post_job("create", payload)

    def get_import_job_progress(self, job_id, db_name="default"):
        """Fetch the progress of ``job_id``; ``None`` arguments are left out of the body."""
        if self.db_name is not None:
            db_name = self.db_name
        payload = {}
        if db_name is not None:
            payload["dbName"] = db_name
        if job_id is not None:
            payload["jobID"] = job_id
        return self._post_job("get_progress", payload)

    def wait_import_job_completed(self, job_id, timeout=120, interval=5):
        """Poll an import job until it completes, fails or ``timeout`` elapses.

        Args:
            job_id: Identifier of the import job.
            timeout: Maximum number of seconds to keep polling.
            interval: Seconds to sleep between two polls.

        Returns:
            ``(rsp, finished)``: the last progress response and whether the
            job reached the ``"Completed"`` state.
        """
        finished = False
        t0 = time.time()
        while True:
            rsp = self.get_import_job_progress(job_id)
            # Error replies may carry no "data"; treat them as "not done yet".
            data = rsp.get("data") if isinstance(rsp, dict) else None
            state = data.get("state") if isinstance(data, dict) else None
            if state == "Completed":
                finished = True
                break
            if state == "Failed":
                # Stop polling a job that has already failed.
                break
            if time.time() - t0 > timeout:
                break
            time.sleep(interval)
        return rsp, finished
class StorageClient():
    """Small MinIO helper for uploading, copying and listing objects.

    The connection is created with ``secure=False``.
    """

    def __init__(self, endpoint, access_key, secret_key, bucket_name, root_path="file"):
        self.endpoint = endpoint
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket_name = bucket_name
        self.root_path = root_path
        self.client = Minio(
            self.endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=False,
        )

    def upload_file(self, file_path, object_name):
        """Upload the local ``file_path`` to ``object_name`` in the configured bucket.

        ``S3Error`` is logged and not re-raised.
        """
        try:
            self.client.fput_object(self.bucket_name, object_name, file_path)
        except S3Error as exc:
            # Interpolate the error into the message; passing it as a bare
            # extra argument leaves the logger with nothing to format it into.
            logger.error(f"fail to upload file to minio: {exc}")

    def copy_file(self, src_bucket, src_object, dst_bucket, dst_object):
        """Copy an object between buckets, creating ``dst_bucket`` when missing.

        ``S3Error`` is logged and not re-raised.
        """
        try:
            # if dst bucket not exist, create it
            if not self.client.bucket_exists(dst_bucket):
                self.client.make_bucket(dst_bucket)
            self.client.copy_object(dst_bucket, dst_object, CopySource(src_bucket, src_object))
        except S3Error as exc:
            logger.error(f"fail to copy files to minio: {exc}")

    def get_collection_binlog(self, collection_id):
        """List objects under the delta/insert log prefixes of ``collection_id``.

        Returns:
            A list of ``"<bucket>/<object name>"`` strings, which is also printed.
        """
        log_dirs = ("delta_log", "insert_log")
        binlog_list = []
        # list objects dir/collection_id in bucket
        for log_dir in log_dirs:
            prefix = f"{self.root_path}/{log_dir}/{collection_id}/"
            objects = self.client.list_objects(self.bucket_name, prefix=prefix)
            binlog_list.extend(f"{self.bucket_name}/{obj.object_name}" for obj in objects)
        print(binlog_list)
        return binlog_list
if __name__ == "__main__":
    # Ad-hoc manual check: print the binlog objects of one collection.
    storage = StorageClient("10.104.19.57:9000", "minioadmin", "minioadmin", "milvus-bucket")
    storage.get_collection_binlog("448305293023730313")
|