compaction_trigger_v2_test.go 10 KB


  1. package datacoord
  2. import (
  3. "context"
  4. "strconv"
  5. "testing"
  6. "github.com/samber/lo"
  7. "github.com/stretchr/testify/mock"
  8. "github.com/stretchr/testify/suite"
  9. "go.uber.org/zap"
  10. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  11. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  12. "github.com/milvus-io/milvus/internal/datacoord/allocator"
  13. "github.com/milvus-io/milvus/internal/metastore/model"
  14. "github.com/milvus-io/milvus/internal/proto/datapb"
  15. "github.com/milvus-io/milvus/pkg/common"
  16. "github.com/milvus-io/milvus/pkg/log"
  17. "github.com/milvus-io/milvus/pkg/util/paramtable"
  18. )
  19. func TestCompactionTriggerManagerSuite(t *testing.T) {
  20. suite.Run(t, new(CompactionTriggerManagerSuite))
  21. }
  22. type CompactionTriggerManagerSuite struct {
  23. suite.Suite
  24. mockAlloc *allocator.MockAllocator
  25. handler Handler
  26. mockPlanContext *MockCompactionPlanContext
  27. testLabel *CompactionGroupLabel
  28. meta *meta
  29. triggerManager *CompactionTriggerManager
  30. }
  31. func (s *CompactionTriggerManagerSuite) SetupTest() {
  32. s.mockAlloc = allocator.NewMockAllocator(s.T())
  33. s.handler = NewNMockHandler(s.T())
  34. s.mockPlanContext = NewMockCompactionPlanContext(s.T())
  35. s.testLabel = &CompactionGroupLabel{
  36. CollectionID: 1,
  37. PartitionID: 10,
  38. Channel: "ch-1",
  39. }
  40. segments := genSegmentsForMeta(s.testLabel)
  41. s.meta = &meta{segments: NewSegmentsInfo()}
  42. for id, segment := range segments {
  43. s.meta.segments.SetSegment(id, segment)
  44. }
  45. s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext, s.meta)
  46. }
  47. func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
  48. handler := NewNMockHandler(s.T())
  49. handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
  50. s.triggerManager.handler = handler
  51. collSegs := s.meta.GetCompactableSegmentGroupByCollection()
  52. segments, found := collSegs[1]
  53. s.Require().True(found)
  54. seg1, found := lo.Find(segments, func(info *SegmentInfo) bool {
  55. return info.ID == int64(100) && info.GetLevel() == datapb.SegmentLevel_L0
  56. })
  57. s.Require().True(found)
  58. // Prepare only 1 l0 segment that doesn't meet the Trigger minimum condition
  59. // but ViewIDLE Trigger will still forceTrigger the plan
  60. latestL0Segments := GetViewsByInfo(seg1)
  61. expectedSegID := seg1.ID
  62. s.Require().Equal(1, len(latestL0Segments))
  63. needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments)
  64. s.True(needRefresh)
  65. s.Require().Equal(1, len(levelZeroView))
  66. cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
  67. s.True(ok)
  68. s.NotNil(cView)
  69. log.Info("view", zap.Any("cView", cView))
  70. s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil)
  71. s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
  72. RunAndReturn(func(task *datapb.CompactionTask) error {
  73. s.EqualValues(19530, task.GetTriggerID())
  74. // s.True(signal.isGlobal)
  75. // s.False(signal.isForce)
  76. s.EqualValues(30000, task.GetPos().GetTimestamp())
  77. s.Equal(s.testLabel.CollectionID, task.GetCollectionID())
  78. s.Equal(s.testLabel.PartitionID, task.GetPartitionID())
  79. s.Equal(s.testLabel.Channel, task.GetChannel())
  80. s.Equal(datapb.CompactionType_Level0DeleteCompaction, task.GetType())
  81. expectedSegs := []int64{expectedSegID}
  82. s.ElementsMatch(expectedSegs, task.GetInputSegments())
  83. return nil
  84. }).Return(nil).Once()
  85. s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe()
  86. s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroView)
  87. }
  88. func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
  89. handler := NewNMockHandler(s.T())
  90. handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
  91. s.triggerManager.handler = handler
  92. collSegs := s.meta.GetCompactableSegmentGroupByCollection()
  93. segments, found := collSegs[1]
  94. s.Require().True(found)
  95. levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
  96. return info.GetLevel() == datapb.SegmentLevel_L0
  97. })
  98. latestL0Segments := GetViewsByInfo(levelZeroSegments...)
  99. s.Require().NotEmpty(latestL0Segments)
  100. needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments)
  101. s.Require().True(needRefresh)
  102. s.Require().Equal(1, len(levelZeroView))
  103. cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
  104. s.True(ok)
  105. s.NotNil(cView)
  106. log.Info("view", zap.Any("cView", cView))
  107. s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil)
  108. s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
  109. RunAndReturn(func(task *datapb.CompactionTask) error {
  110. s.EqualValues(19530, task.GetTriggerID())
  111. // s.True(signal.isGlobal)
  112. // s.False(signal.isForce)
  113. s.EqualValues(30000, task.GetPos().GetTimestamp())
  114. s.Equal(s.testLabel.CollectionID, task.GetCollectionID())
  115. s.Equal(s.testLabel.PartitionID, task.GetPartitionID())
  116. s.Equal(s.testLabel.Channel, task.GetChannel())
  117. s.Equal(datapb.CompactionType_Level0DeleteCompaction, task.GetType())
  118. expectedSegs := []int64{100, 101, 102}
  119. s.ElementsMatch(expectedSegs, task.GetInputSegments())
  120. return nil
  121. }).Return(nil).Once()
  122. s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe()
  123. s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
  124. }
  125. func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
  126. var (
  127. collectionID = int64(1000)
  128. fieldID = int64(2000)
  129. indexID = int64(3000)
  130. )
  131. paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, strconv.Itoa(100))
  132. defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
  133. paramtable.Get().Save(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key, strconv.Itoa(200))
  134. defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key)
  135. s.triggerManager.meta = &meta{
  136. indexMeta: &indexMeta{
  137. indexes: map[UniqueID]map[UniqueID]*model.Index{
  138. collectionID: {
  139. indexID + 1: &model.Index{
  140. CollectionID: collectionID,
  141. FieldID: fieldID + 1,
  142. IndexID: indexID + 1,
  143. IndexName: "",
  144. IsDeleted: false,
  145. CreateTime: 0,
  146. TypeParams: nil,
  147. IndexParams: []*commonpb.KeyValuePair{
  148. {Key: common.IndexTypeKey, Value: "DISKANN"},
  149. },
  150. IsAutoIndex: false,
  151. UserIndexParams: nil,
  152. },
  153. indexID + 2: &model.Index{
  154. CollectionID: collectionID,
  155. FieldID: fieldID + 2,
  156. IndexID: indexID + 2,
  157. IndexName: "",
  158. IsDeleted: false,
  159. CreateTime: 0,
  160. TypeParams: nil,
  161. IndexParams: []*commonpb.KeyValuePair{
  162. {Key: common.IndexTypeKey, Value: "DISKANN"},
  163. },
  164. IsAutoIndex: false,
  165. UserIndexParams: nil,
  166. },
  167. },
  168. },
  169. },
  170. }
  171. s.Run("all DISKANN", func() {
  172. collection := &collectionInfo{
  173. ID: collectionID,
  174. Schema: &schemapb.CollectionSchema{
  175. Name: "coll1",
  176. Description: "",
  177. Fields: []*schemapb.FieldSchema{
  178. {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
  179. {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
  180. {FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
  181. },
  182. EnableDynamicField: false,
  183. Properties: nil,
  184. },
  185. }
  186. s.Equal(int64(200*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
  187. })
  188. s.Run("HNSW & DISKANN", func() {
  189. s.triggerManager.meta = &meta{
  190. indexMeta: &indexMeta{
  191. indexes: map[UniqueID]map[UniqueID]*model.Index{
  192. collectionID: {
  193. indexID + 1: &model.Index{
  194. CollectionID: collectionID,
  195. FieldID: fieldID + 1,
  196. IndexID: indexID + 1,
  197. IndexName: "",
  198. IsDeleted: false,
  199. CreateTime: 0,
  200. TypeParams: nil,
  201. IndexParams: []*commonpb.KeyValuePair{
  202. {Key: common.IndexTypeKey, Value: "HNSW"},
  203. },
  204. IsAutoIndex: false,
  205. UserIndexParams: nil,
  206. },
  207. indexID + 2: &model.Index{
  208. CollectionID: collectionID,
  209. FieldID: fieldID + 2,
  210. IndexID: indexID + 2,
  211. IndexName: "",
  212. IsDeleted: false,
  213. CreateTime: 0,
  214. TypeParams: nil,
  215. IndexParams: []*commonpb.KeyValuePair{
  216. {Key: common.IndexTypeKey, Value: "DISKANN"},
  217. },
  218. IsAutoIndex: false,
  219. UserIndexParams: nil,
  220. },
  221. },
  222. },
  223. },
  224. }
  225. collection := &collectionInfo{
  226. ID: collectionID,
  227. Schema: &schemapb.CollectionSchema{
  228. Name: "coll1",
  229. Description: "",
  230. Fields: []*schemapb.FieldSchema{
  231. {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
  232. {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
  233. {FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
  234. },
  235. EnableDynamicField: false,
  236. Properties: nil,
  237. },
  238. }
  239. s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
  240. })
  241. s.Run("some vector has no index", func() {
  242. s.triggerManager.meta = &meta{
  243. indexMeta: &indexMeta{
  244. indexes: map[UniqueID]map[UniqueID]*model.Index{
  245. collectionID: {
  246. indexID + 1: &model.Index{
  247. CollectionID: collectionID,
  248. FieldID: fieldID + 1,
  249. IndexID: indexID + 1,
  250. IndexName: "",
  251. IsDeleted: false,
  252. CreateTime: 0,
  253. TypeParams: nil,
  254. IndexParams: []*commonpb.KeyValuePair{
  255. {Key: common.IndexTypeKey, Value: "HNSW"},
  256. },
  257. IsAutoIndex: false,
  258. UserIndexParams: nil,
  259. },
  260. },
  261. },
  262. },
  263. }
  264. collection := &collectionInfo{
  265. ID: collectionID,
  266. Schema: &schemapb.CollectionSchema{
  267. Name: "coll1",
  268. Description: "",
  269. Fields: []*schemapb.FieldSchema{
  270. {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
  271. {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
  272. {FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
  273. },
  274. EnableDynamicField: false,
  275. Properties: nil,
  276. },
  277. }
  278. s.Equal(int64(100*1024*1024), getExpectedSegmentSize(s.triggerManager.meta, collection))
  279. })
  280. }