# test_customize_segment_size.py
  1. import pytest
  2. import time
  3. from pymilvus import connections
  4. from utils.util_log import test_log as log
  5. from base.collection_wrapper import ApiCollectionWrapper
  6. from base.utility_wrapper import ApiUtilityWrapper
  7. from common import common_func as cf
  8. from common import common_type as ct
  9. from milvus_operator import MilvusOperator
  10. from common.milvus_sys import MilvusSys
  11. from common.common_type import CaseLabel
  12. namespace = 'chaos-testing'
  13. def _install_milvus(seg_size):
  14. release_name = f"mil-segsize-{seg_size}-" + cf.gen_digits_by_length(6)
  15. cus_configs = {'spec.components.image': 'milvusdb/milvus:master-latest',
  16. 'metadata.namespace': namespace,
  17. 'metadata.name': release_name,
  18. 'spec.components.proxy.serviceType': 'LoadBalancer',
  19. 'spec.config.dataCoord.segment.maxSize': seg_size
  20. }
  21. milvus_op = MilvusOperator()
  22. log.info(f"install milvus with configs: {cus_configs}")
  23. milvus_op.install(cus_configs)
  24. healthy = milvus_op.wait_for_healthy(release_name, namespace, timeout=1200)
  25. log.info(f"milvus healthy: {healthy}")
  26. if healthy:
  27. endpoint = milvus_op.endpoint(release_name, namespace).split(':')
  28. log.info(f"milvus endpoint: {endpoint}")
  29. host = endpoint[0]
  30. port = endpoint[1]
  31. return release_name, host, port
  32. else:
  33. return release_name, None, None
  34. class TestCustomizeSegmentSize:
  35. def teardown_method(self):
  36. pass
  37. milvus_op = MilvusOperator()
  38. milvus_op.uninstall(self.release_name, namespace)
  39. connections.disconnect("default")
  40. connections.remove_connection("default")
  41. @pytest.mark.tags(CaseLabel.L3)
  42. @pytest.mark.parametrize('seg_size, seg_count', [(128, 10), (1024, 2)])
  43. def test_customize_segment_size(self, seg_size, seg_count):
  44. """
  45. steps
  46. """
  47. log.info(f"start to install milvus with segment size {seg_size}")
  48. release_name, host, port = _install_milvus(seg_size)
  49. self.release_name = release_name
  50. assert host is not None
  51. conn = connections.connect("default", host=host, port=port)
  52. assert conn is not None
  53. mil = MilvusSys(alias="default")
  54. log.info(f"milvus build version: {mil.build_version}")
  55. log.info(f"start to e2e verification: {seg_size}")
  56. # create
  57. name = cf.gen_unique_str("segsiz")
  58. t0 = time.time()
  59. collection_w = ApiCollectionWrapper()
  60. collection_w.init_collection(name=name,
  61. schema=cf.gen_default_collection_schema(),
  62. timeout=40)
  63. tt = time.time() - t0
  64. assert collection_w.name == name
  65. entities = collection_w.num_entities
  66. log.info(f"assert create collection: {tt}, init_entities: {entities}")
  67. # insert
  68. nb = 50000
  69. data = cf.gen_default_list_data(nb=nb)
  70. t0 = time.time()
  71. _, res = collection_w.insert(data)
  72. tt = time.time() - t0
  73. log.info(f"assert insert: {tt}")
  74. assert res
  75. # insert 2 million entities
  76. rounds = 40
  77. for _ in range(rounds-1):
  78. _, res = collection_w.insert(data)
  79. entities = collection_w.num_entities
  80. assert entities == nb * rounds
  81. # load
  82. collection_w.load()
  83. utility_wrap = ApiUtilityWrapper()
  84. segs, _ = utility_wrap.get_query_segment_info(collection_w.name)
  85. log.info(f"assert segments: {len(segs)}")
  86. assert len(segs) == seg_count
  87. # search
  88. search_vectors = cf.gen_vectors(1, ct.default_dim)
  89. search_params = {"metric_type": "L2", "params": {"nprobe": 16}}
  90. t0 = time.time()
  91. res_1, _ = collection_w.search(data=search_vectors,
  92. anns_field=ct.default_float_vec_field_name,
  93. param=search_params, limit=1, timeout=30)
  94. tt = time.time() - t0
  95. log.info(f"assert search: {tt}")
  96. assert len(res_1) == 1
  97. collection_w.release()
  98. # index
  99. d = cf.gen_default_list_data()
  100. collection_w.insert(d)
  101. log.info(f"assert index entities: {collection_w.num_entities}")
  102. _index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 64}, "metric_type": "L2"}
  103. t0 = time.time()
  104. index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
  105. index_params=_index_params,
  106. name=cf.gen_unique_str(), timeout=120)
  107. tt = time.time() - t0
  108. log.info(f"assert index: {tt}")
  109. assert len(collection_w.indexes) == 1
  110. # search
  111. t0 = time.time()
  112. collection_w.load()
  113. tt = time.time() - t0
  114. log.info(f"assert load: {tt}")
  115. search_vectors = cf.gen_vectors(1, ct.default_dim)
  116. t0 = time.time()
  117. res_1, _ = collection_w.search(data=search_vectors,
  118. anns_field=ct.default_float_vec_field_name,
  119. param=search_params, limit=1, timeout=30)
  120. tt = time.time() - t0
  121. log.info(f"assert search: {tt}")
  122. # query
  123. term_expr = f'{ct.default_int64_field_name} in [1001,1201,4999,2999]'
  124. t0 = time.time()
  125. res, _ = collection_w.query(term_expr, timeout=30)
  126. tt = time.time() - t0
  127. log.info(f"assert query result {len(res)}: {tt}")