meta_watcher_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package integration
  17. import (
  18. "context"
  19. "strconv"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/suite"
  23. "go.uber.org/zap"
  24. "google.golang.org/protobuf/proto"
  25. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  26. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  27. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  28. "github.com/milvus-io/milvus/pkg/common"
  29. "github.com/milvus-io/milvus/pkg/log"
  30. "github.com/milvus-io/milvus/pkg/util/funcutil"
  31. "github.com/milvus-io/milvus/pkg/util/metric"
  32. )
  33. type MetaWatcherSuite struct {
  34. MiniClusterSuite
  35. }
  36. func (s *MetaWatcherSuite) TestShowSessions() {
  37. sessions, err := s.Cluster.MetaWatcher.ShowSessions()
  38. s.NoError(err)
  39. for _, session := range sessions {
  40. log.Info("ShowSessions result", zap.Any("session", session))
  41. }
  42. log.Info("TestShowSessions succeed")
  43. }
  44. func (s *MetaWatcherSuite) TestShowSegments() {
  45. c := s.Cluster
  46. ctx, cancel := context.WithCancel(c.GetContext())
  47. defer cancel()
  48. prefix := "TestShowSegments"
  49. dbName := ""
  50. collectionName := prefix + funcutil.GenRandomStr()
  51. int64Field := "int64"
  52. floatVecField := "fvec"
  53. dim := 128
  54. rowNum := 3000
  55. constructCollectionSchema := func() *schemapb.CollectionSchema {
  56. pk := &schemapb.FieldSchema{
  57. FieldID: 0,
  58. Name: int64Field,
  59. IsPrimaryKey: true,
  60. Description: "",
  61. DataType: schemapb.DataType_Int64,
  62. TypeParams: nil,
  63. IndexParams: nil,
  64. AutoID: true,
  65. }
  66. fVec := &schemapb.FieldSchema{
  67. FieldID: 0,
  68. Name: floatVecField,
  69. IsPrimaryKey: false,
  70. Description: "",
  71. DataType: schemapb.DataType_FloatVector,
  72. TypeParams: []*commonpb.KeyValuePair{
  73. {
  74. Key: common.DimKey,
  75. Value: strconv.Itoa(dim),
  76. },
  77. },
  78. IndexParams: nil,
  79. AutoID: false,
  80. }
  81. return &schemapb.CollectionSchema{
  82. Name: collectionName,
  83. Description: "",
  84. AutoID: false,
  85. Fields: []*schemapb.FieldSchema{
  86. pk,
  87. fVec,
  88. },
  89. }
  90. }
  91. schema := constructCollectionSchema()
  92. marshaledSchema, err := proto.Marshal(schema)
  93. s.NoError(err)
  94. createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
  95. DbName: dbName,
  96. CollectionName: collectionName,
  97. Schema: marshaledSchema,
  98. ShardsNum: common.DefaultShardsNum,
  99. })
  100. s.NoError(err)
  101. s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
  102. log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
  103. showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
  104. s.NoError(err)
  105. s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  106. log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
  107. fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim)
  108. hashKeys := GenerateHashKeys(rowNum)
  109. insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
  110. DbName: dbName,
  111. CollectionName: collectionName,
  112. FieldsData: []*schemapb.FieldData{fVecColumn},
  113. HashKeys: hashKeys,
  114. NumRows: uint32(rowNum),
  115. })
  116. s.NoError(err)
  117. s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  118. // flush
  119. flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
  120. DbName: dbName,
  121. CollectionNames: []string{collectionName},
  122. })
  123. s.NoError(err)
  124. segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
  125. ids := segmentIDs.GetData()
  126. s.Require().NotEmpty(segmentIDs)
  127. s.Require().True(has)
  128. flushTs, has := flushResp.GetCollFlushTs()[collectionName]
  129. s.True(has)
  130. s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
  131. segments, err := c.MetaWatcher.ShowSegments()
  132. s.NoError(err)
  133. s.NotEmpty(segments)
  134. for _, segment := range segments {
  135. log.Info("ShowSegments result", zap.String("segment", segment.String()))
  136. }
  137. log.Info("TestShowSegments succeed")
  138. }
  139. func (s *MetaWatcherSuite) TestShowReplicas() {
  140. c := s.Cluster
  141. ctx, cancel := context.WithCancel(c.GetContext())
  142. defer cancel()
  143. prefix := "TestShowReplicas"
  144. dbName := ""
  145. collectionName := prefix + funcutil.GenRandomStr()
  146. int64Field := "int64"
  147. floatVecField := "fvec"
  148. dim := 128
  149. rowNum := 3000
  150. constructCollectionSchema := func() *schemapb.CollectionSchema {
  151. pk := &schemapb.FieldSchema{
  152. FieldID: 0,
  153. Name: int64Field,
  154. IsPrimaryKey: true,
  155. Description: "",
  156. DataType: schemapb.DataType_Int64,
  157. TypeParams: nil,
  158. IndexParams: nil,
  159. AutoID: true,
  160. }
  161. fVec := &schemapb.FieldSchema{
  162. FieldID: 0,
  163. Name: floatVecField,
  164. IsPrimaryKey: false,
  165. Description: "",
  166. DataType: schemapb.DataType_FloatVector,
  167. TypeParams: []*commonpb.KeyValuePair{
  168. {
  169. Key: common.DimKey,
  170. Value: strconv.Itoa(dim),
  171. },
  172. },
  173. IndexParams: nil,
  174. AutoID: false,
  175. }
  176. return &schemapb.CollectionSchema{
  177. Name: collectionName,
  178. Description: "",
  179. AutoID: false,
  180. Fields: []*schemapb.FieldSchema{
  181. pk,
  182. fVec,
  183. },
  184. }
  185. }
  186. schema := constructCollectionSchema()
  187. marshaledSchema, err := proto.Marshal(schema)
  188. s.NoError(err)
  189. createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
  190. DbName: dbName,
  191. CollectionName: collectionName,
  192. Schema: marshaledSchema,
  193. ShardsNum: common.DefaultShardsNum,
  194. })
  195. s.NoError(err)
  196. if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  197. log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason()))
  198. }
  199. s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
  200. log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
  201. showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
  202. s.NoError(err)
  203. s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  204. log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
  205. fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim)
  206. hashKeys := GenerateHashKeys(rowNum)
  207. insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
  208. DbName: dbName,
  209. CollectionName: collectionName,
  210. FieldsData: []*schemapb.FieldData{fVecColumn},
  211. HashKeys: hashKeys,
  212. NumRows: uint32(rowNum),
  213. })
  214. s.NoError(err)
  215. s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  216. // flush
  217. flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
  218. DbName: dbName,
  219. CollectionNames: []string{collectionName},
  220. })
  221. s.NoError(err)
  222. segments, err := c.MetaWatcher.ShowSegments()
  223. s.NoError(err)
  224. s.NotEmpty(segments)
  225. for _, segment := range segments {
  226. log.Info("ShowSegments result", zap.String("segment", segment.String()))
  227. }
  228. segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
  229. ids := segmentIDs.GetData()
  230. s.Require().NotEmpty(segmentIDs)
  231. s.Require().True(has)
  232. flushTs, has := flushResp.GetCollFlushTs()[collectionName]
  233. s.True(has)
  234. s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
  235. // create index
  236. createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
  237. CollectionName: collectionName,
  238. FieldName: floatVecField,
  239. IndexName: "_default",
  240. ExtraParams: []*commonpb.KeyValuePair{
  241. {
  242. Key: common.DimKey,
  243. Value: strconv.Itoa(dim),
  244. },
  245. {
  246. Key: common.MetricTypeKey,
  247. Value: metric.L2,
  248. },
  249. {
  250. Key: common.IndexTypeKey,
  251. Value: "IVF_FLAT",
  252. },
  253. {
  254. Key: "nlist",
  255. Value: strconv.Itoa(10),
  256. },
  257. },
  258. })
  259. s.NoError(err)
  260. if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  261. log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
  262. }
  263. s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
  264. waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
  265. // load
  266. loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
  267. DbName: dbName,
  268. CollectionName: collectionName,
  269. })
  270. s.NoError(err)
  271. if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  272. log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
  273. }
  274. s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
  275. for {
  276. loadProgress, err := c.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
  277. CollectionName: collectionName,
  278. })
  279. if err != nil {
  280. panic("GetLoadingProgress fail")
  281. }
  282. if loadProgress.GetProgress() == 100 {
  283. break
  284. }
  285. time.Sleep(500 * time.Millisecond)
  286. }
  287. replicas, err := c.MetaWatcher.ShowReplicas()
  288. s.NoError(err)
  289. s.NotEmpty(replicas)
  290. for _, replica := range replicas {
  291. log.Info("ShowReplicas result", zap.String("replica", PrettyReplica(replica)))
  292. }
  293. log.Info("TestShowReplicas succeed")
  294. }
  295. func TestMetaWatcher(t *testing.T) {
  296. suite.Run(t, new(MetaWatcherSuite))
  297. }