123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package rg
- import (
- "context"
- "sync"
- "testing"
- "time"
- "github.com/samber/lo"
- "github.com/stretchr/testify/suite"
- "google.golang.org/protobuf/proto"
- "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
- "github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
- "github.com/milvus-io/milvus/pkg/util/merr"
- "github.com/milvus-io/milvus/pkg/util/paramtable"
- "github.com/milvus-io/milvus/tests/integration"
- )
// DefaultResourceGroup is the name of the resource group that this suite
// never creates itself: initResourceGroup skips it on creation and only
// updates its config.
const DefaultResourceGroup = "__default_resource_group"

// RecycleResourceGroup is the name of the resource group the suite creates
// first and that newRGConfig wires in as both the transfer source and the
// transfer target of every other group.
const RecycleResourceGroup = "__recycle_resource_group"
- type collectionConfig struct {
- resourceGroups []string
- createCfg *integration.CreateCollectionConfig
- }
- type resourceGroupConfig struct {
- expectedNodeNum int
- rgCfg *rgpb.ResourceGroupConfig
- }
- type ResourceGroupTestSuite struct {
- integration.MiniClusterSuite
- rgs map[string]*resourceGroupConfig
- collections map[string]*collectionConfig
- }
- func (s *ResourceGroupTestSuite) SetupSuite() {
- paramtable.Init()
- paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000")
- paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CheckNodeInReplicaInterval.Key, "1")
- paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1")
- s.MiniClusterSuite.SetupSuite()
- }
- func (s *ResourceGroupTestSuite) TestResourceGroup() {
- ctx := context.Background()
- s.rgs = map[string]*resourceGroupConfig{
- DefaultResourceGroup: {
- expectedNodeNum: 1,
- rgCfg: newRGConfig(1, 1),
- },
- RecycleResourceGroup: {
- expectedNodeNum: 0,
- rgCfg: &rgpb.ResourceGroupConfig{
- Requests: &rgpb.ResourceGroupLimit{
- NodeNum: 0,
- },
- Limits: &rgpb.ResourceGroupLimit{
- NodeNum: 10000,
- },
- },
- },
- "rg1": {
- expectedNodeNum: 0,
- rgCfg: newRGConfig(0, 0),
- },
- "rg2": {
- expectedNodeNum: 0,
- rgCfg: newRGConfig(0, 0),
- },
- }
- s.initResourceGroup(ctx)
- s.assertResourceGroup(ctx)
- // only one node in rg
- s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 2
- s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
- s.syncResourceConfig(ctx)
- s.assertResourceGroup(ctx)
- s.rgs[DefaultResourceGroup].expectedNodeNum = 2
- s.Cluster.AddQueryNode()
- s.syncResourceConfig(ctx)
- s.assertResourceGroup(ctx)
- s.rgs[RecycleResourceGroup].expectedNodeNum = 3
- s.Cluster.AddQueryNodes(3)
- s.syncResourceConfig(ctx)
- s.assertResourceGroup(ctx)
- // node in recycle rg should be balanced to rg1 and rg2
- s.rgs["rg1"].rgCfg.Requests.NodeNum = 1
- s.rgs["rg1"].rgCfg.Limits.NodeNum = 1
- s.rgs["rg1"].expectedNodeNum = 1
- s.rgs["rg2"].rgCfg.Requests.NodeNum = 2
- s.rgs["rg2"].rgCfg.Limits.NodeNum = 2
- s.rgs["rg2"].expectedNodeNum = 2
- s.rgs[RecycleResourceGroup].expectedNodeNum = 0
- s.syncResourceConfig(ctx)
- s.assertResourceGroup(ctx)
- s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 1
- s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
- s.rgs[DefaultResourceGroup].expectedNodeNum = 2
- s.syncResourceConfig(ctx)
- s.assertResourceGroup(ctx)
- // redundant node in default rg should be balanced to recycle rg
- s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 1
- s.rgs[DefaultResourceGroup].expectedNodeNum = 1
- s.rgs[RecycleResourceGroup].expectedNodeNum = 1
- s.syncResourceConfig(ctx)
- s.assertResourceGroup(ctx)
- }
- func (s *ResourceGroupTestSuite) TestWithReplica() {
- ctx := context.Background()
- s.rgs = map[string]*resourceGroupConfig{
- DefaultResourceGroup: {
- expectedNodeNum: 1,
- rgCfg: newRGConfig(1, 1),
- },
- RecycleResourceGroup: {
- expectedNodeNum: 0,
- rgCfg: &rgpb.ResourceGroupConfig{
- Requests: &rgpb.ResourceGroupLimit{
- NodeNum: 0,
- },
- Limits: &rgpb.ResourceGroupLimit{
- NodeNum: 10000,
- },
- },
- },
- "rg1": {
- expectedNodeNum: 1,
- rgCfg: newRGConfig(1, 1),
- },
- "rg2": {
- expectedNodeNum: 2,
- rgCfg: newRGConfig(2, 2),
- },
- }
- s.collections = map[string]*collectionConfig{
- "c1": {
- resourceGroups: []string{DefaultResourceGroup},
- createCfg: newCreateCollectionConfig("c1"),
- },
- "c2": {
- resourceGroups: []string{"rg1"},
- createCfg: newCreateCollectionConfig("c2"),
- },
- "c3": {
- resourceGroups: []string{"rg2"},
- createCfg: newCreateCollectionConfig("c3"),
- },
- }
- // create resource group
- s.initResourceGroup(ctx)
- s.Cluster.AddQueryNodes(3)
- time.Sleep(100 * time.Millisecond)
- s.assertResourceGroup(ctx)
- // create and load replicas for testing.
- s.createAndLoadCollections(ctx)
- s.assertReplica(ctx)
- // TODO: current balancer is not working well on move segment between nodes, open following test after fix it.
- // // test transfer replica and nodes.
- // // transfer one of replica in c3 from rg2 into DEFAULT rg.
- // s.collections["c3"].resourceGroups = []string{DefaultResourceGroup, "rg2"}
- //
- // status, err := s.Cluster.Proxy.TransferReplica(ctx, &milvuspb.TransferReplicaRequest{
- // DbName: s.collections["c3"].createCfg.DBName,
- // CollectionName: s.collections["c3"].createCfg.CollectionName,
- // SourceResourceGroup: "rg2",
- // TargetResourceGroup: DefaultResourceGroup,
- // NumReplica: 1,
- // })
- //
- // s.NoError(err)
- // s.True(merr.Ok(status))
- //
- // // test transfer node from rg2 into DEFAULT_RESOURCE_GROUP
- // s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 2
- // s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
- // s.rgs[DefaultResourceGroup].expectedNodeNum = 2
- // s.rgs["rg2"].rgCfg.Requests.NodeNum = 1
- // s.rgs["rg2"].rgCfg.Limits.NodeNum = 1
- // s.rgs["rg2"].expectedNodeNum = 1
- // s.syncResourceConfig(ctx)
- //
- // s.Eventually(func() bool {
- // return s.assertReplica(ctx)
- // }, 10*time.Minute, 30*time.Second)
- }
- func (s *ResourceGroupTestSuite) syncResourceConfig(ctx context.Context) {
- req := &milvuspb.UpdateResourceGroupsRequest{
- ResourceGroups: make(map[string]*rgpb.ResourceGroupConfig),
- }
- for rgName, cfg := range s.rgs {
- req.ResourceGroups[rgName] = cfg.rgCfg
- }
- status, err := s.Cluster.Proxy.UpdateResourceGroups(ctx, req)
- s.NoError(err)
- s.True(merr.Ok(status))
- // wait for recovery.
- time.Sleep(100 * time.Millisecond)
- }
- func (s *ResourceGroupTestSuite) assertResourceGroup(ctx context.Context) {
- resp, err := s.Cluster.Proxy.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
- s.NoError(err)
- s.True(merr.Ok(resp.Status))
- s.ElementsMatch(resp.ResourceGroups, lo.Keys(s.rgs))
- for _, rg := range resp.ResourceGroups {
- resp, err := s.Cluster.Proxy.DescribeResourceGroup(ctx, &milvuspb.DescribeResourceGroupRequest{
- ResourceGroup: rg,
- })
- s.NoError(err)
- s.True(merr.Ok(resp.Status))
- s.Equal(s.rgs[rg].expectedNodeNum, len(resp.ResourceGroup.Nodes))
- s.True(proto.Equal(s.rgs[rg].rgCfg, resp.ResourceGroup.Config))
- }
- }
- func (s *ResourceGroupTestSuite) initResourceGroup(ctx context.Context) {
- status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
- ResourceGroup: RecycleResourceGroup,
- Config: s.rgs[RecycleResourceGroup].rgCfg,
- })
- s.NoError(err)
- s.True(merr.Ok(status))
- for rgName, cfg := range s.rgs {
- if rgName == RecycleResourceGroup || rgName == DefaultResourceGroup {
- continue
- }
- status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
- ResourceGroup: rgName,
- Config: cfg.rgCfg,
- })
- s.NoError(err)
- s.True(merr.Ok(status))
- }
- status, err = s.Cluster.Proxy.UpdateResourceGroups(ctx, &milvuspb.UpdateResourceGroupsRequest{
- ResourceGroups: map[string]*rgpb.ResourceGroupConfig{
- DefaultResourceGroup: s.rgs[DefaultResourceGroup].rgCfg,
- },
- })
- s.NoError(err)
- s.True(merr.Ok(status))
- }
- func (s *ResourceGroupTestSuite) createAndLoadCollections(ctx context.Context) {
- wg := &sync.WaitGroup{}
- for _, cfg := range s.collections {
- cfg := cfg
- wg.Add(1)
- go func() {
- defer wg.Done()
- s.CreateCollectionWithConfiguration(ctx, cfg.createCfg)
- loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
- DbName: cfg.createCfg.DBName,
- CollectionName: cfg.createCfg.CollectionName,
- ReplicaNumber: int32(len(cfg.resourceGroups)),
- ResourceGroups: cfg.resourceGroups,
- })
- s.NoError(err)
- s.True(merr.Ok(loadStatus))
- s.WaitForLoad(ctx, cfg.createCfg.CollectionName)
- }()
- }
- wg.Wait()
- }
- func (s *ResourceGroupTestSuite) assertReplica(ctx context.Context) bool {
- for _, cfg := range s.collections {
- resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
- CollectionName: cfg.createCfg.CollectionName,
- DbName: cfg.createCfg.DBName,
- })
- s.NoError(err)
- s.True(merr.Ok(resp.Status))
- rgs := make(map[string]int)
- for _, rg := range cfg.resourceGroups {
- rgs[rg]++
- }
- for _, replica := range resp.GetReplicas() {
- s.True(rgs[replica.ResourceGroupName] > 0)
- rgs[replica.ResourceGroupName]--
- s.NotZero(len(replica.NodeIds))
- if len(replica.NumOutboundNode) > 0 {
- return false
- }
- }
- for _, v := range rgs {
- s.Zero(v)
- }
- }
- return true
- }
- func newRGConfig(request int, limit int) *rgpb.ResourceGroupConfig {
- return &rgpb.ResourceGroupConfig{
- Requests: &rgpb.ResourceGroupLimit{
- NodeNum: int32(request),
- },
- Limits: &rgpb.ResourceGroupLimit{
- NodeNum: int32(limit),
- },
- TransferFrom: []*rgpb.ResourceGroupTransfer{
- {
- ResourceGroup: RecycleResourceGroup,
- },
- },
- TransferTo: []*rgpb.ResourceGroupTransfer{
- {
- ResourceGroup: RecycleResourceGroup,
- },
- },
- }
- }
- func newCreateCollectionConfig(collectionName string) *integration.CreateCollectionConfig {
- return &integration.CreateCollectionConfig{
- DBName: "",
- CollectionName: collectionName,
- ChannelNum: 2,
- SegmentNum: 2,
- RowNumPerSegment: 100,
- Dim: 128,
- }
- }
- func TestResourceGroup(t *testing.T) {
- suite.Run(t, new(ResourceGroupTestSuite))
- }
|