test_simd_compat.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import pytest
  2. import time
  3. from pymilvus import connections
  4. from utils.util_log import test_log as log
  5. from base.collection_wrapper import ApiCollectionWrapper
  6. from common import common_func as cf
  7. from common import common_type as ct
  8. from milvus_operator import MilvusOperator
  9. from common.milvus_sys import MilvusSys
  10. from common.common_type import CaseLabel
  11. # sorted by the priority order of the simd
  12. # | configuration | possible returned SIMD |
  13. # |--------|----------|
  14. # | auto | avx512 / avx2 / sse4_2|
  15. # | avx512 | avx512 / avx2 / sse4_2|
  16. # | avx2 | avx2 / sse4_2|
  17. # | avx | sse4_2|
  18. # | sse4_2 | sse4_2|
  19. supported_simd_types = ["avx512", "avx2", "avx", "sse4_2"]
  20. namespace = 'chaos-testing'
  21. def _install_milvus(simd):
  22. release_name = f"mil-{simd.replace('_','-')}-" + cf.gen_digits_by_length(6)
  23. cus_configs = {'spec.components.image': 'harbor.milvus.io/milvus/milvus:master-latest',
  24. 'metadata.namespace': namespace,
  25. 'metadata.name': release_name,
  26. 'spec.config.common.simdType': simd
  27. }
  28. milvus_op = MilvusOperator()
  29. log.info(f"install milvus with configs: {cus_configs}")
  30. milvus_op.install(cus_configs)
  31. healthy = milvus_op.wait_for_healthy(release_name, namespace, timeout=1200)
  32. log.info(f"milvus healthy: {healthy}")
  33. if healthy:
  34. endpoint = milvus_op.endpoint(release_name, namespace).split(':')
  35. log.info(f"milvus endpoint: {endpoint}")
  36. host = endpoint[0]
  37. port = endpoint[1]
  38. return release_name, host, port
  39. else:
  40. return release_name, None, None
  41. class TestSimdCompatibility:
  42. def teardown_method(self):
  43. milvus_op = MilvusOperator()
  44. milvus_op.uninstall(self.release_name, namespace)
  45. @pytest.mark.tags(CaseLabel.L3)
  46. @pytest.mark.parametrize('simd', supported_simd_types)
  47. def test_simd_compat_e2e(self, simd):
  48. """
  49. steps
  50. 1. [test_milvus_install]: set up milvus with customized simd configured
  51. 2. [test_simd_compat_e2e]: verify milvus is working well
  52. 4. [test_milvus_cleanup]: delete milvus instances in teardown
  53. """
  54. log.info(f"start to install milvus with simd {simd}")
  55. release_name, host, port = _install_milvus(simd)
  56. time.sleep(10)
  57. self.release_name = release_name
  58. assert host is not None
  59. conn = connections.connect("default", host=host, port=port)
  60. assert conn is not None
  61. mil = MilvusSys(alias="default")
  62. log.info(f"milvus build version: {mil.build_version}")
  63. log.info(f"milvus simdType: {mil.simd_type}")
  64. assert str(mil.simd_type).lower() == simd.lower()
  65. log.info(f"start to e2e verification: {simd}")
  66. # create
  67. prefix = "simd_"
  68. name = cf.gen_unique_str(prefix)
  69. t0 = time.time()
  70. collection_w = ApiCollectionWrapper()
  71. collection_w.init_collection_wrap(name=name)
  72. tt = time.time() - t0
  73. assert collection_w.name == name
  74. entities = collection_w.num_entities
  75. log.info(f"assert create collection: {tt}, init_entities: {entities}")
  76. # insert
  77. for _ in range(10):
  78. data = cf.gen_default_list_data(nb=300)
  79. t0 = time.time()
  80. _, res = collection_w.insert(data)
  81. tt = time.time() - t0
  82. log.info(f"assert insert: {tt}")
  83. assert res
  84. # flush
  85. t0 = time.time()
  86. _, check_result = collection_w.flush(timeout=180)
  87. assert check_result
  88. assert collection_w.num_entities == len(data[0]) + entities
  89. tt = time.time() - t0
  90. entities = collection_w.num_entities
  91. log.info(f"assert flush: {tt}, entities: {entities}")
  92. # index
  93. index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 64}, "metric_type": "L2"}
  94. t0 = time.time()
  95. index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
  96. index_params=index_params,
  97. index_name=cf.gen_unique_str())
  98. index, _ = collection_w.create_index(field_name=ct.default_string_field_name,
  99. index_params={},
  100. index_name=cf.gen_unique_str())
  101. tt = time.time() - t0
  102. log.info(f"assert index: {tt}")
  103. assert len(collection_w.indexes) == 2
  104. # load
  105. collection_w.load()
  106. # search
  107. search_vectors = cf.gen_vectors(1, ct.default_dim)
  108. search_params = {"metric_type": "L2", "params": {"nprobe": 16}}
  109. t0 = time.time()
  110. res_1, _ = collection_w.search(data=search_vectors,
  111. anns_field=ct.default_float_vec_field_name,
  112. param=search_params, limit=1)
  113. tt = time.time() - t0
  114. log.info(f"assert search: {tt}")
  115. assert len(res_1) == 1
  116. # release
  117. collection_w.release()
  118. # insert
  119. d = cf.gen_default_list_data()
  120. collection_w.insert(d)
  121. # search
  122. t0 = time.time()
  123. collection_w.load()
  124. tt = time.time() - t0
  125. log.info(f"assert load: {tt}")
  126. nq = 5
  127. topk = 5
  128. search_vectors = cf.gen_vectors(nq, ct.default_dim)
  129. t0 = time.time()
  130. res, _ = collection_w.search(data=search_vectors,
  131. anns_field=ct.default_float_vec_field_name,
  132. param=search_params, limit=topk)
  133. tt = time.time() - t0
  134. log.info(f"assert search: {tt}")
  135. assert len(res) == nq
  136. assert len(res[0]) <= topk
  137. # query
  138. term_expr = f'{ct.default_int64_field_name} in [1, 2, 3, 4]'
  139. t0 = time.time()
  140. res, _ = collection_w.query(term_expr)
  141. tt = time.time() - t0
  142. log.info(f"assert query result {len(res)}: {tt}")
  143. assert len(res) >= 4