resource_group_test.go 9.6 KB


  1. package rg
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/samber/lo"
  8. "github.com/stretchr/testify/suite"
  9. "google.golang.org/protobuf/proto"
  10. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  11. "github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
  12. "github.com/milvus-io/milvus/pkg/util/merr"
  13. "github.com/milvus-io/milvus/pkg/util/paramtable"
  14. "github.com/milvus-io/milvus/tests/integration"
  15. )
  16. const (
  17. DefaultResourceGroup = "__default_resource_group"
  18. RecycleResourceGroup = "__recycle_resource_group"
  19. )
  20. type collectionConfig struct {
  21. resourceGroups []string
  22. createCfg *integration.CreateCollectionConfig
  23. }
  24. type resourceGroupConfig struct {
  25. expectedNodeNum int
  26. rgCfg *rgpb.ResourceGroupConfig
  27. }
  28. type ResourceGroupTestSuite struct {
  29. integration.MiniClusterSuite
  30. rgs map[string]*resourceGroupConfig
  31. collections map[string]*collectionConfig
  32. }
  33. func (s *ResourceGroupTestSuite) SetupSuite() {
  34. paramtable.Init()
  35. paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000")
  36. paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CheckNodeInReplicaInterval.Key, "1")
  37. paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1")
  38. s.MiniClusterSuite.SetupSuite()
  39. }
  40. func (s *ResourceGroupTestSuite) TestResourceGroup() {
  41. ctx := context.Background()
  42. s.rgs = map[string]*resourceGroupConfig{
  43. DefaultResourceGroup: {
  44. expectedNodeNum: 1,
  45. rgCfg: newRGConfig(1, 1),
  46. },
  47. RecycleResourceGroup: {
  48. expectedNodeNum: 0,
  49. rgCfg: &rgpb.ResourceGroupConfig{
  50. Requests: &rgpb.ResourceGroupLimit{
  51. NodeNum: 0,
  52. },
  53. Limits: &rgpb.ResourceGroupLimit{
  54. NodeNum: 10000,
  55. },
  56. },
  57. },
  58. "rg1": {
  59. expectedNodeNum: 0,
  60. rgCfg: newRGConfig(0, 0),
  61. },
  62. "rg2": {
  63. expectedNodeNum: 0,
  64. rgCfg: newRGConfig(0, 0),
  65. },
  66. }
  67. s.initResourceGroup(ctx)
  68. s.assertResourceGroup(ctx)
  69. // only one node in rg
  70. s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 2
  71. s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
  72. s.syncResourceConfig(ctx)
  73. s.assertResourceGroup(ctx)
  74. s.rgs[DefaultResourceGroup].expectedNodeNum = 2
  75. s.Cluster.AddQueryNode()
  76. s.syncResourceConfig(ctx)
  77. s.assertResourceGroup(ctx)
  78. s.rgs[RecycleResourceGroup].expectedNodeNum = 3
  79. s.Cluster.AddQueryNodes(3)
  80. s.syncResourceConfig(ctx)
  81. s.assertResourceGroup(ctx)
  82. // node in recycle rg should be balanced to rg1 and rg2
  83. s.rgs["rg1"].rgCfg.Requests.NodeNum = 1
  84. s.rgs["rg1"].rgCfg.Limits.NodeNum = 1
  85. s.rgs["rg1"].expectedNodeNum = 1
  86. s.rgs["rg2"].rgCfg.Requests.NodeNum = 2
  87. s.rgs["rg2"].rgCfg.Limits.NodeNum = 2
  88. s.rgs["rg2"].expectedNodeNum = 2
  89. s.rgs[RecycleResourceGroup].expectedNodeNum = 0
  90. s.syncResourceConfig(ctx)
  91. s.assertResourceGroup(ctx)
  92. s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 1
  93. s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
  94. s.rgs[DefaultResourceGroup].expectedNodeNum = 2
  95. s.syncResourceConfig(ctx)
  96. s.assertResourceGroup(ctx)
  97. // redundant node in default rg should be balanced to recycle rg
  98. s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 1
  99. s.rgs[DefaultResourceGroup].expectedNodeNum = 1
  100. s.rgs[RecycleResourceGroup].expectedNodeNum = 1
  101. s.syncResourceConfig(ctx)
  102. s.assertResourceGroup(ctx)
  103. }
  104. func (s *ResourceGroupTestSuite) TestWithReplica() {
  105. ctx := context.Background()
  106. s.rgs = map[string]*resourceGroupConfig{
  107. DefaultResourceGroup: {
  108. expectedNodeNum: 1,
  109. rgCfg: newRGConfig(1, 1),
  110. },
  111. RecycleResourceGroup: {
  112. expectedNodeNum: 0,
  113. rgCfg: &rgpb.ResourceGroupConfig{
  114. Requests: &rgpb.ResourceGroupLimit{
  115. NodeNum: 0,
  116. },
  117. Limits: &rgpb.ResourceGroupLimit{
  118. NodeNum: 10000,
  119. },
  120. },
  121. },
  122. "rg1": {
  123. expectedNodeNum: 1,
  124. rgCfg: newRGConfig(1, 1),
  125. },
  126. "rg2": {
  127. expectedNodeNum: 2,
  128. rgCfg: newRGConfig(2, 2),
  129. },
  130. }
  131. s.collections = map[string]*collectionConfig{
  132. "c1": {
  133. resourceGroups: []string{DefaultResourceGroup},
  134. createCfg: newCreateCollectionConfig("c1"),
  135. },
  136. "c2": {
  137. resourceGroups: []string{"rg1"},
  138. createCfg: newCreateCollectionConfig("c2"),
  139. },
  140. "c3": {
  141. resourceGroups: []string{"rg2"},
  142. createCfg: newCreateCollectionConfig("c3"),
  143. },
  144. }
  145. // create resource group
  146. s.initResourceGroup(ctx)
  147. s.Cluster.AddQueryNodes(3)
  148. time.Sleep(100 * time.Millisecond)
  149. s.assertResourceGroup(ctx)
  150. // create and load replicas for testing.
  151. s.createAndLoadCollections(ctx)
  152. s.assertReplica(ctx)
  153. // TODO: current balancer is not working well on move segment between nodes, open following test after fix it.
  154. // // test transfer replica and nodes.
  155. // // transfer one of replica in c3 from rg2 into DEFAULT rg.
  156. // s.collections["c3"].resourceGroups = []string{DefaultResourceGroup, "rg2"}
  157. //
  158. // status, err := s.Cluster.Proxy.TransferReplica(ctx, &milvuspb.TransferReplicaRequest{
  159. // DbName: s.collections["c3"].createCfg.DBName,
  160. // CollectionName: s.collections["c3"].createCfg.CollectionName,
  161. // SourceResourceGroup: "rg2",
  162. // TargetResourceGroup: DefaultResourceGroup,
  163. // NumReplica: 1,
  164. // })
  165. //
  166. // s.NoError(err)
  167. // s.True(merr.Ok(status))
  168. //
  169. // // test transfer node from rg2 into DEFAULT_RESOURCE_GROUP
  170. // s.rgs[DefaultResourceGroup].rgCfg.Requests.NodeNum = 2
  171. // s.rgs[DefaultResourceGroup].rgCfg.Limits.NodeNum = 2
  172. // s.rgs[DefaultResourceGroup].expectedNodeNum = 2
  173. // s.rgs["rg2"].rgCfg.Requests.NodeNum = 1
  174. // s.rgs["rg2"].rgCfg.Limits.NodeNum = 1
  175. // s.rgs["rg2"].expectedNodeNum = 1
  176. // s.syncResourceConfig(ctx)
  177. //
  178. // s.Eventually(func() bool {
  179. // return s.assertReplica(ctx)
  180. // }, 10*time.Minute, 30*time.Second)
  181. }
  182. func (s *ResourceGroupTestSuite) syncResourceConfig(ctx context.Context) {
  183. req := &milvuspb.UpdateResourceGroupsRequest{
  184. ResourceGroups: make(map[string]*rgpb.ResourceGroupConfig),
  185. }
  186. for rgName, cfg := range s.rgs {
  187. req.ResourceGroups[rgName] = cfg.rgCfg
  188. }
  189. status, err := s.Cluster.Proxy.UpdateResourceGroups(ctx, req)
  190. s.NoError(err)
  191. s.True(merr.Ok(status))
  192. // wait for recovery.
  193. time.Sleep(100 * time.Millisecond)
  194. }
  195. func (s *ResourceGroupTestSuite) assertResourceGroup(ctx context.Context) {
  196. resp, err := s.Cluster.Proxy.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
  197. s.NoError(err)
  198. s.True(merr.Ok(resp.Status))
  199. s.ElementsMatch(resp.ResourceGroups, lo.Keys(s.rgs))
  200. for _, rg := range resp.ResourceGroups {
  201. resp, err := s.Cluster.Proxy.DescribeResourceGroup(ctx, &milvuspb.DescribeResourceGroupRequest{
  202. ResourceGroup: rg,
  203. })
  204. s.NoError(err)
  205. s.True(merr.Ok(resp.Status))
  206. s.Equal(s.rgs[rg].expectedNodeNum, len(resp.ResourceGroup.Nodes))
  207. s.True(proto.Equal(s.rgs[rg].rgCfg, resp.ResourceGroup.Config))
  208. }
  209. }
  210. func (s *ResourceGroupTestSuite) initResourceGroup(ctx context.Context) {
  211. status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
  212. ResourceGroup: RecycleResourceGroup,
  213. Config: s.rgs[RecycleResourceGroup].rgCfg,
  214. })
  215. s.NoError(err)
  216. s.True(merr.Ok(status))
  217. for rgName, cfg := range s.rgs {
  218. if rgName == RecycleResourceGroup || rgName == DefaultResourceGroup {
  219. continue
  220. }
  221. status, err := s.Cluster.Proxy.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
  222. ResourceGroup: rgName,
  223. Config: cfg.rgCfg,
  224. })
  225. s.NoError(err)
  226. s.True(merr.Ok(status))
  227. }
  228. status, err = s.Cluster.Proxy.UpdateResourceGroups(ctx, &milvuspb.UpdateResourceGroupsRequest{
  229. ResourceGroups: map[string]*rgpb.ResourceGroupConfig{
  230. DefaultResourceGroup: s.rgs[DefaultResourceGroup].rgCfg,
  231. },
  232. })
  233. s.NoError(err)
  234. s.True(merr.Ok(status))
  235. }
  236. func (s *ResourceGroupTestSuite) createAndLoadCollections(ctx context.Context) {
  237. wg := &sync.WaitGroup{}
  238. for _, cfg := range s.collections {
  239. cfg := cfg
  240. wg.Add(1)
  241. go func() {
  242. defer wg.Done()
  243. s.CreateCollectionWithConfiguration(ctx, cfg.createCfg)
  244. loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
  245. DbName: cfg.createCfg.DBName,
  246. CollectionName: cfg.createCfg.CollectionName,
  247. ReplicaNumber: int32(len(cfg.resourceGroups)),
  248. ResourceGroups: cfg.resourceGroups,
  249. })
  250. s.NoError(err)
  251. s.True(merr.Ok(loadStatus))
  252. s.WaitForLoad(ctx, cfg.createCfg.CollectionName)
  253. }()
  254. }
  255. wg.Wait()
  256. }
  257. func (s *ResourceGroupTestSuite) assertReplica(ctx context.Context) bool {
  258. for _, cfg := range s.collections {
  259. resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
  260. CollectionName: cfg.createCfg.CollectionName,
  261. DbName: cfg.createCfg.DBName,
  262. })
  263. s.NoError(err)
  264. s.True(merr.Ok(resp.Status))
  265. rgs := make(map[string]int)
  266. for _, rg := range cfg.resourceGroups {
  267. rgs[rg]++
  268. }
  269. for _, replica := range resp.GetReplicas() {
  270. s.True(rgs[replica.ResourceGroupName] > 0)
  271. rgs[replica.ResourceGroupName]--
  272. s.NotZero(len(replica.NodeIds))
  273. if len(replica.NumOutboundNode) > 0 {
  274. return false
  275. }
  276. }
  277. for _, v := range rgs {
  278. s.Zero(v)
  279. }
  280. }
  281. return true
  282. }
  283. func newRGConfig(request int, limit int) *rgpb.ResourceGroupConfig {
  284. return &rgpb.ResourceGroupConfig{
  285. Requests: &rgpb.ResourceGroupLimit{
  286. NodeNum: int32(request),
  287. },
  288. Limits: &rgpb.ResourceGroupLimit{
  289. NodeNum: int32(limit),
  290. },
  291. TransferFrom: []*rgpb.ResourceGroupTransfer{
  292. {
  293. ResourceGroup: RecycleResourceGroup,
  294. },
  295. },
  296. TransferTo: []*rgpb.ResourceGroupTransfer{
  297. {
  298. ResourceGroup: RecycleResourceGroup,
  299. },
  300. },
  301. }
  302. }
  303. func newCreateCollectionConfig(collectionName string) *integration.CreateCollectionConfig {
  304. return &integration.CreateCollectionConfig{
  305. DBName: "",
  306. CollectionName: collectionName,
  307. ChannelNum: 2,
  308. SegmentNum: 2,
  309. RowNumPerSegment: 100,
  310. Dim: 128,
  311. }
  312. }
  313. func TestResourceGroup(t *testing.T) {
  314. suite.Run(t, new(ResourceGroupTestSuite))
  315. }