util_collection.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package integration
  2. import (
  3. "context"
  4. "strconv"
  5. "strings"
  6. "go.uber.org/zap"
  7. "google.golang.org/protobuf/proto"
  8. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  9. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  10. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  11. "github.com/milvus-io/milvus/pkg/common"
  12. "github.com/milvus-io/milvus/pkg/log"
  13. "github.com/milvus-io/milvus/pkg/util/merr"
  14. "github.com/milvus-io/milvus/pkg/util/metric"
  15. )
  16. type CreateCollectionConfig struct {
  17. DBName string
  18. CollectionName string
  19. ChannelNum int
  20. SegmentNum int
  21. RowNumPerSegment int
  22. Dim int
  23. ReplicaNumber int32
  24. ResourceGroups []string
  25. }
  26. func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
  27. schema := ConstructSchema(cfg.CollectionName, cfg.Dim, true)
  28. marshaledSchema, err := proto.Marshal(schema)
  29. s.NoError(err)
  30. s.NotNil(marshaledSchema)
  31. createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
  32. DbName: cfg.DBName,
  33. CollectionName: cfg.CollectionName,
  34. Schema: marshaledSchema,
  35. ShardsNum: int32(cfg.ChannelNum),
  36. Properties: []*commonpb.KeyValuePair{
  37. {
  38. Key: common.CollectionReplicaNumber,
  39. Value: strconv.FormatInt(int64(cfg.ReplicaNumber), 10),
  40. },
  41. {
  42. Key: common.CollectionResourceGroups,
  43. Value: strings.Join(cfg.ResourceGroups, ","),
  44. },
  45. },
  46. })
  47. s.NoError(err)
  48. s.True(merr.Ok(createCollectionStatus))
  49. log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
  50. showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{DbName: cfg.DBName})
  51. s.NoError(err)
  52. s.True(merr.Ok(showCollectionsResp.Status))
  53. log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
  54. for i := 0; i < cfg.SegmentNum; i++ {
  55. fVecColumn := NewFloatVectorFieldData(FloatVecField, cfg.RowNumPerSegment, cfg.Dim)
  56. hashKeys := GenerateHashKeys(cfg.RowNumPerSegment)
  57. insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
  58. DbName: cfg.DBName,
  59. CollectionName: cfg.CollectionName,
  60. FieldsData: []*schemapb.FieldData{fVecColumn},
  61. HashKeys: hashKeys,
  62. NumRows: uint32(cfg.RowNumPerSegment),
  63. })
  64. s.NoError(err)
  65. s.True(merr.Ok(insertResult.Status))
  66. flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
  67. DbName: cfg.DBName,
  68. CollectionNames: []string{cfg.CollectionName},
  69. })
  70. s.NoError(err)
  71. segmentIDs, has := flushResp.GetCollSegIDs()[cfg.CollectionName]
  72. ids := segmentIDs.GetData()
  73. s.Require().NotEmpty(segmentIDs)
  74. s.Require().True(has)
  75. flushTs, has := flushResp.GetCollFlushTs()[cfg.CollectionName]
  76. s.True(has)
  77. s.WaitForFlush(ctx, ids, flushTs, cfg.DBName, cfg.CollectionName)
  78. }
  79. // create index
  80. createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
  81. DbName: cfg.DBName,
  82. CollectionName: cfg.CollectionName,
  83. FieldName: FloatVecField,
  84. IndexName: "_default",
  85. ExtraParams: ConstructIndexParam(cfg.Dim, IndexFaissIvfFlat, metric.L2),
  86. })
  87. s.NoError(err)
  88. s.True(merr.Ok(createIndexStatus))
  89. s.WaitForIndexBuiltWithDB(ctx, cfg.DBName, cfg.CollectionName, FloatVecField)
  90. }