milvus_operator.py 9.4 KB


  1. import json
  2. import os
  3. import time
  4. from benedict import benedict
  5. from utils.util_log import test_log as log
  6. from utils.util_k8s import get_pod_ip_name_pairs
  7. from common.cus_resource_opts import CustomResourceOperations as CusResource
  8. template_yaml = os.path.join(os.path.dirname(__file__), 'template/default.yaml')
  9. MILVUS_GRP = 'milvus.io'
  10. # MILVUS_VER = 'v1alpha1'
  11. MILVUS_VER = 'v1beta1'
  12. # MILVUS_PLURAL = 'milvusclusters'
  13. MILVUS_PLURAL = 'milvuses'
  14. # MILVUS_KIND = 'MilvusCluster'
  15. MILVUS_KIND = 'Milvus'
  16. class MilvusOperator(object):
  17. def __init__(self):
  18. self.group = MILVUS_GRP
  19. self.version = MILVUS_VER
  20. self.plural = MILVUS_PLURAL.lower()
  21. @staticmethod
  22. def _update_configs(configs, template=None):
  23. """
  24. Method: update the template with customized configs
  25. Params:
  26. configs: a dict type of configurations that describe the properties of milvus to be deployed
  27. template: Optional. Pass the template file location if there is a template to apply
  28. Return: a dict type customized configs
  29. """
  30. if not isinstance(configs, dict):
  31. log.error("customize configurations must be in dict type")
  32. return None
  33. if template is None:
  34. # d_configs = benedict()
  35. log.debug(f'template yaml {template_yaml}')
  36. d_configs = benedict.from_yaml(template_yaml)
  37. d_configs['apiVersion'] = f'{MILVUS_GRP}/{MILVUS_VER}'
  38. d_configs['kind'] = MILVUS_KIND
  39. else:
  40. d_configs = benedict.from_yaml(template)
  41. for key in configs.keys():
  42. d_configs[key] = configs[key]
  43. # return a python dict if it is not none
  44. return d_configs._dict if d_configs._dict is not None else d_configs
  45. def install(self, configs, template=None):
  46. """
  47. Method: apply a custom resource object to install milvus
  48. Params:
  49. configs: a dict type of configurations that describe the properties of milvus to be deployed
  50. template: Optional. Pass the template file location if there is a template to apply
  51. Return: custom resource object instance
  52. """
  53. new_configs = self._update_configs(configs, template)
  54. log.debug(new_configs)
  55. namespace = new_configs['metadata'].get('namespace', 'default')
  56. # apply custom resource object to deploy milvus
  57. cus_res = CusResource(kind=self.plural, group=self.group,
  58. version=self.version, namespace=namespace)
  59. log.info(f'install milvus with configs: {json.dumps(new_configs, indent=4)}')
  60. return cus_res.create(new_configs)
  61. def uninstall(self, release_name, namespace='default', delete_depends=True, delete_pvc=True):
  62. """
  63. Method: delete custom resource object to uninstall milvus
  64. Params:
  65. release_name: release name of milvus
  66. namespace: namespace that the milvus is running in
  67. delete_depends: whether to delete the dependent etcd, pulsar and minio services. default: True
  68. delete_pvc: whether to delete the data persistent pvc volumes. default: True
  69. """
  70. cus_res = CusResource(kind=self.plural, group=self.group,
  71. version=self.version, namespace=namespace)
  72. del_configs = {}
  73. if delete_depends:
  74. del_configs = {'spec.dependencies.etcd.inCluster.deletionPolicy': 'Delete',
  75. 'spec.dependencies.pulsar.inCluster.deletionPolicy': 'Delete',
  76. 'spec.dependencies.kafka.inCluster.deletionPolicy': 'Delete',
  77. 'spec.dependencies.storage.inCluster.deletionPolicy': 'Delete'
  78. }
  79. if delete_pvc:
  80. del_configs.update({'spec.dependencies.etcd.inCluster.pvcDeletion': True,
  81. 'spec.dependencies.pulsar.inCluster.pvcDeletion': True,
  82. 'spec.dependencies.kafka.inCluster.pvcDeletion': True,
  83. 'spec.dependencies.storage.inCluster.pvcDeletion': True
  84. })
  85. if delete_depends or delete_pvc:
  86. self.upgrade(release_name, del_configs, namespace=namespace)
  87. cus_res.delete(release_name)
  88. def upgrade(self, release_name, configs, namespace='default'):
  89. """
  90. Method: patch custom resource object to upgrade milvus
  91. Params:
  92. release_name: release name of milvus
  93. configs: a dict type like configurations to be upgrade milvus
  94. namespace: namespace that the milvus is running in
  95. """
  96. if not isinstance(configs, dict):
  97. log.error("customize configurations must be in dict type")
  98. return None
  99. d_configs = benedict()
  100. for key in configs.keys():
  101. d_configs[key] = configs[key]
  102. cus_res = CusResource(kind=self.plural, group=self.group,
  103. version=self.version, namespace=namespace)
  104. log.debug(f"upgrade milvus with configs: {d_configs}")
  105. cus_res.patch(release_name, d_configs)
  106. self.wait_for_healthy(release_name, namespace=namespace)
  107. def rolling_update(self, release_name, new_image_name, namespace='default'):
  108. """
  109. Method: patch custom resource object to rolling update milvus
  110. Params:
  111. release_name: release name of milvus
  112. namespace: namespace that the milvus is running in
  113. """
  114. cus_res = CusResource(kind=self.plural, group=self.group,
  115. version=self.version, namespace=namespace)
  116. rolling_configs = {'spec.components.enableRollingUpdate': True,
  117. 'spec.components.imageUpdateMode': "rollingUpgrade",
  118. 'spec.components.image': new_image_name}
  119. log.debug(f"rolling update milvus with configs: {rolling_configs}")
  120. cus_res.patch(release_name, rolling_configs)
  121. self.wait_for_healthy(release_name, namespace=namespace)
  122. def scale(self, release_name, component, replicas, namespace='default'):
  123. """
  124. Method: scale milvus components by replicas
  125. Params:
  126. release_name: release name of milvus
  127. replicas: the number of replicas to scale
  128. component: the component to scale, e.g: dataNode, queryNode, indexNode, proxy
  129. namespace: namespace that the milvus is running in
  130. """
  131. cus_res = CusResource(kind=self.plural, group=self.group,
  132. version=self.version, namespace=namespace)
  133. component = component.replace('node', 'Node')
  134. scale_configs = {f'spec.components.{component}.replicas': replicas}
  135. log.info(f"scale milvus with configs: {scale_configs}")
  136. self.upgrade(release_name, scale_configs, namespace=namespace)
  137. self.wait_for_healthy(release_name, namespace=namespace)
  138. def wait_for_healthy(self, release_name, namespace='default', timeout=600):
  139. """
  140. Method: wait a milvus instance until healthy or timeout
  141. Params:
  142. release_name: release name of milvus
  143. namespace: namespace that the milvus is running in
  144. timeout: default: 600 seconds
  145. """
  146. cus_res = CusResource(kind=self.plural, group=self.group,
  147. version=self.version, namespace=namespace)
  148. starttime = time.time()
  149. log.info(f"start to check healthy: {starttime}")
  150. while time.time() < starttime + timeout:
  151. time.sleep(10)
  152. res_object = cus_res.get(release_name)
  153. mic_status = res_object.get('status', None)
  154. if mic_status is not None:
  155. if 'Healthy' == mic_status.get('status'):
  156. log.info(f"milvus healthy in {time.time() - starttime} seconds")
  157. return True
  158. else:
  159. log.info(f"milvus status is: {mic_status.get('status')}")
  160. log.info(f"end to check healthy until timeout {timeout}")
  161. return False
  162. def endpoint(self, release_name, namespace='default'):
  163. """
  164. Method: get Milvus endpoint by name and namespace
  165. Return: a string type endpoint. e.g: host:port
  166. """
  167. endpoint = None
  168. cus_res = CusResource(kind=self.plural, group=self.group,
  169. version=self.version, namespace=namespace)
  170. res_object = cus_res.get(release_name)
  171. if res_object.get('status', None) is not None:
  172. endpoint = res_object['status']['endpoint']
  173. return endpoint
  174. def etcd_endpoints(self, release_name, namespace='default'):
  175. """
  176. Method: get etcd endpoints by name and namespace
  177. Return: a string type etcd endpoints. e.g: host:port
  178. """
  179. etcd_endpoints = None
  180. cus_res = CusResource(kind=self.plural, group=self.group,
  181. version=self.version, namespace=namespace)
  182. res_object = cus_res.get(release_name)
  183. try:
  184. etcd_endpoints = res_object['spec']['dependencies']['etcd']['endpoints']
  185. except KeyError:
  186. log.info("etcd endpoints not found")
  187. # get pod ip by pod name
  188. label_selector = f"app.kubernetes.io/instance={release_name}-etcd, app.kubernetes.io/name=etcd"
  189. res = get_pod_ip_name_pairs(namespace, label_selector)
  190. if res:
  191. etcd_endpoints = [f"{pod_ip}:2379" for pod_ip in res.keys()]
  192. return etcd_endpoints[0]