maintenance_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package client
  17. import (
  18. "context"
  19. "fmt"
  20. "math/rand"
  21. "testing"
  22. "time"
  23. "github.com/stretchr/testify/mock"
  24. "github.com/stretchr/testify/suite"
  25. "go.uber.org/atomic"
  26. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  27. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  28. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  29. "github.com/milvus-io/milvus/pkg/util/merr"
  30. )
  31. type MaintenanceSuite struct {
  32. MockSuiteBase
  33. }
  34. func (s *MaintenanceSuite) TestLoadCollection() {
  35. ctx, cancel := context.WithCancel(context.Background())
  36. defer cancel()
  37. s.Run("success", func() {
  38. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  39. fieldNames := []string{"id", "part", "vector"}
  40. replicaNum := rand.Intn(3) + 1
  41. done := atomic.NewBool(false)
  42. s.mock.EXPECT().LoadCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, lcr *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
  43. s.Equal(collectionName, lcr.GetCollectionName())
  44. s.ElementsMatch(fieldNames, lcr.GetLoadFields())
  45. s.True(lcr.SkipLoadDynamicField)
  46. s.EqualValues(replicaNum, lcr.GetReplicaNumber())
  47. return merr.Success(), nil
  48. }).Once()
  49. s.mock.EXPECT().GetLoadingProgress(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, glpr *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
  50. s.Equal(collectionName, glpr.GetCollectionName())
  51. progress := int64(50)
  52. if done.Load() {
  53. progress = 100
  54. }
  55. return &milvuspb.GetLoadingProgressResponse{
  56. Status: merr.Success(),
  57. Progress: progress,
  58. }, nil
  59. })
  60. defer s.mock.EXPECT().GetLoadingProgress(mock.Anything, mock.Anything).Unset()
  61. task, err := s.client.LoadCollection(ctx, NewLoadCollectionOption(collectionName).
  62. WithReplica(replicaNum).
  63. WithLoadFields(fieldNames...).
  64. WithSkipLoadDynamicField(true))
  65. s.NoError(err)
  66. ch := make(chan struct{})
  67. go func() {
  68. defer close(ch)
  69. err := task.Await(ctx)
  70. s.NoError(err)
  71. }()
  72. select {
  73. case <-ch:
  74. s.FailNow("task done before index state set to finish")
  75. case <-time.After(time.Second):
  76. }
  77. done.Store(true)
  78. select {
  79. case <-ch:
  80. case <-time.After(time.Second):
  81. s.FailNow("task not done after index set finished")
  82. }
  83. })
  84. s.Run("failure", func() {
  85. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  86. s.mock.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
  87. _, err := s.client.LoadCollection(ctx, NewLoadCollectionOption(collectionName))
  88. s.Error(err)
  89. })
  90. }
  91. func (s *MaintenanceSuite) TestLoadPartitions() {
  92. ctx, cancel := context.WithCancel(context.Background())
  93. defer cancel()
  94. s.Run("success", func() {
  95. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  96. partitionName := fmt.Sprintf("part_%s", s.randString(6))
  97. fieldNames := []string{"id", "part", "vector"}
  98. replicaNum := rand.Intn(3) + 1
  99. done := atomic.NewBool(false)
  100. s.mock.EXPECT().LoadPartitions(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, lpr *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
  101. s.Equal(collectionName, lpr.GetCollectionName())
  102. s.ElementsMatch([]string{partitionName}, lpr.GetPartitionNames())
  103. s.ElementsMatch(fieldNames, lpr.GetLoadFields())
  104. s.True(lpr.SkipLoadDynamicField)
  105. s.EqualValues(replicaNum, lpr.GetReplicaNumber())
  106. return merr.Success(), nil
  107. }).Once()
  108. s.mock.EXPECT().GetLoadingProgress(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, glpr *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
  109. s.Equal(collectionName, glpr.GetCollectionName())
  110. s.ElementsMatch([]string{partitionName}, glpr.GetPartitionNames())
  111. progress := int64(50)
  112. if done.Load() {
  113. progress = 100
  114. }
  115. return &milvuspb.GetLoadingProgressResponse{
  116. Status: merr.Success(),
  117. Progress: progress,
  118. }, nil
  119. })
  120. defer s.mock.EXPECT().GetLoadingProgress(mock.Anything, mock.Anything).Unset()
  121. task, err := s.client.LoadPartitions(ctx, NewLoadPartitionsOption(collectionName, partitionName).
  122. WithReplica(replicaNum).
  123. WithLoadFields(fieldNames...).
  124. WithSkipLoadDynamicField(true))
  125. s.NoError(err)
  126. ch := make(chan struct{})
  127. go func() {
  128. defer close(ch)
  129. err := task.Await(ctx)
  130. s.NoError(err)
  131. }()
  132. select {
  133. case <-ch:
  134. s.FailNow("task done before index state set to finish")
  135. case <-time.After(time.Second):
  136. }
  137. done.Store(true)
  138. select {
  139. case <-ch:
  140. case <-time.After(time.Second):
  141. s.FailNow("task not done after index set finished")
  142. }
  143. })
  144. s.Run("failure", func() {
  145. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  146. partitionName := fmt.Sprintf("part_%s", s.randString(6))
  147. s.mock.EXPECT().LoadPartitions(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
  148. _, err := s.client.LoadPartitions(ctx, NewLoadPartitionsOption(collectionName, partitionName))
  149. s.Error(err)
  150. })
  151. }
  152. func (s *MaintenanceSuite) TestReleaseCollection() {
  153. ctx, cancel := context.WithCancel(context.Background())
  154. defer cancel()
  155. s.Run("success", func() {
  156. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  157. s.mock.EXPECT().ReleaseCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, rcr *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
  158. s.Equal(collectionName, rcr.GetCollectionName())
  159. return merr.Success(), nil
  160. }).Once()
  161. err := s.client.ReleaseCollection(ctx, NewReleaseCollectionOption(collectionName))
  162. s.NoError(err)
  163. })
  164. s.Run("failure", func() {
  165. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  166. s.mock.EXPECT().ReleaseCollection(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
  167. err := s.client.ReleaseCollection(ctx, NewReleaseCollectionOption(collectionName))
  168. s.Error(err)
  169. })
  170. }
  171. func (s *MaintenanceSuite) TestReleasePartitions() {
  172. ctx, cancel := context.WithCancel(context.Background())
  173. defer cancel()
  174. s.Run("success", func() {
  175. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  176. partitionName := fmt.Sprintf("part_%s", s.randString(6))
  177. s.mock.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, rpr *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
  178. s.Equal(collectionName, rpr.GetCollectionName())
  179. s.ElementsMatch([]string{partitionName}, rpr.GetPartitionNames())
  180. return merr.Success(), nil
  181. }).Once()
  182. err := s.client.ReleasePartitions(ctx, NewReleasePartitionsOptions(collectionName, partitionName))
  183. s.NoError(err)
  184. })
  185. s.Run("failure", func() {
  186. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  187. partitionName := fmt.Sprintf("part_%s", s.randString(6))
  188. s.mock.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
  189. err := s.client.ReleasePartitions(ctx, NewReleasePartitionsOptions(collectionName, partitionName))
  190. s.Error(err)
  191. })
  192. }
  193. func (s *MaintenanceSuite) TestFlush() {
  194. ctx, cancel := context.WithCancel(context.Background())
  195. defer cancel()
  196. s.Run("success", func() {
  197. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  198. done := atomic.NewBool(false)
  199. s.mock.EXPECT().Flush(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, fr *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
  200. s.ElementsMatch([]string{collectionName}, fr.GetCollectionNames())
  201. return &milvuspb.FlushResponse{
  202. Status: merr.Success(),
  203. CollSegIDs: map[string]*schemapb.LongArray{
  204. collectionName: {Data: []int64{1, 2, 3}},
  205. },
  206. CollFlushTs: map[string]uint64{collectionName: 321},
  207. }, nil
  208. }).Once()
  209. s.mock.EXPECT().GetFlushState(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, gfsr *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
  210. s.Equal(collectionName, gfsr.GetCollectionName())
  211. s.ElementsMatch([]int64{1, 2, 3}, gfsr.GetSegmentIDs())
  212. s.EqualValues(321, gfsr.GetFlushTs())
  213. return &milvuspb.GetFlushStateResponse{
  214. Status: merr.Success(),
  215. Flushed: done.Load(),
  216. }, nil
  217. })
  218. defer s.mock.EXPECT().GetFlushState(mock.Anything, mock.Anything).Unset()
  219. task, err := s.client.Flush(ctx, NewFlushOption(collectionName))
  220. s.NoError(err)
  221. ch := make(chan struct{})
  222. go func() {
  223. defer close(ch)
  224. err := task.Await(ctx)
  225. s.NoError(err)
  226. }()
  227. select {
  228. case <-ch:
  229. s.FailNow("task done before index state set to finish")
  230. case <-time.After(time.Second):
  231. }
  232. done.Store(true)
  233. select {
  234. case <-ch:
  235. case <-time.After(time.Second):
  236. s.FailNow("task not done after index set finished")
  237. }
  238. })
  239. s.Run("failure", func() {
  240. collectionName := fmt.Sprintf("coll_%s", s.randString(6))
  241. s.mock.EXPECT().Flush(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
  242. _, err := s.client.Flush(ctx, NewFlushOption(collectionName))
  243. s.Error(err)
  244. })
  245. }
  246. func TestMaintenance(t *testing.T) {
  247. suite.Run(t, new(MaintenanceSuite))
  248. }