cross_cluster_routing_test.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package crossclusterrouting
  17. import (
  18. "fmt"
  19. "math/rand"
  20. "strconv"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/stretchr/testify/suite"
  25. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  26. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  27. "github.com/milvus-io/milvus/internal/proto/datapb"
  28. "github.com/milvus-io/milvus/internal/proto/proxypb"
  29. "github.com/milvus-io/milvus/internal/proto/querypb"
  30. "github.com/milvus-io/milvus/internal/proto/workerpb"
  31. "github.com/milvus-io/milvus/pkg/util/commonpbutil"
  32. "github.com/milvus-io/milvus/pkg/util/merr"
  33. "github.com/milvus-io/milvus/pkg/util/paramtable"
  34. "github.com/milvus-io/milvus/tests/integration"
  35. )
  36. type CrossClusterRoutingSuite struct {
  37. integration.MiniClusterSuite
  38. }
  39. func (s *CrossClusterRoutingSuite) SetupSuite() {
  40. rand.Seed(time.Now().UnixNano())
  41. s.Require().NoError(s.SetupEmbedEtcd())
  42. paramtable.Init()
  43. paramtable.Get().Save("grpc.client.maxMaxAttempts", "1")
  44. }
  45. func (s *CrossClusterRoutingSuite) TearDownSuite() {
  46. s.TearDownEmbedEtcd()
  47. paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10))
  48. }
  49. func (s *CrossClusterRoutingSuite) TestCrossClusterRouting() {
  50. const (
  51. waitFor = time.Second * 10
  52. duration = time.Millisecond * 10
  53. )
  54. go func() {
  55. for {
  56. select {
  57. case <-time.After(15 * time.Second):
  58. return
  59. default:
  60. err := paramtable.Get().Save(paramtable.Get().CommonCfg.ClusterPrefix.Key, fmt.Sprintf("%d", rand.Int()))
  61. if err != nil {
  62. panic(err)
  63. }
  64. }
  65. }
  66. }()
  67. // test rootCoord
  68. s.Eventually(func() bool {
  69. resp, err := s.Cluster.RootCoordClient.ShowCollections(s.Cluster.GetContext(), &milvuspb.ShowCollectionsRequest{
  70. Base: commonpbutil.NewMsgBase(
  71. commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
  72. ),
  73. DbName: "fake_db_name",
  74. })
  75. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  76. if err != nil {
  77. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  78. }
  79. return false
  80. }, waitFor, duration)
  81. // test dataCoord
  82. s.Eventually(func() bool {
  83. resp, err := s.Cluster.DataCoordClient.GetRecoveryInfoV2(s.Cluster.GetContext(), &datapb.GetRecoveryInfoRequestV2{})
  84. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  85. if err != nil {
  86. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  87. }
  88. return false
  89. }, waitFor, duration)
  90. // test queryCoord
  91. s.Eventually(func() bool {
  92. resp, err := s.Cluster.QueryCoordClient.LoadCollection(s.Cluster.GetContext(), &querypb.LoadCollectionRequest{})
  93. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  94. if err != nil {
  95. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  96. }
  97. return false
  98. }, waitFor, duration)
  99. // test proxy
  100. s.Eventually(func() bool {
  101. resp, err := s.Cluster.ProxyClient.InvalidateCollectionMetaCache(s.Cluster.GetContext(), &proxypb.InvalidateCollMetaCacheRequest{})
  102. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  103. if err != nil {
  104. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  105. }
  106. return false
  107. }, waitFor, duration)
  108. // test dataNode
  109. s.Eventually(func() bool {
  110. resp, err := s.Cluster.DataNodeClient.FlushSegments(s.Cluster.GetContext(), &datapb.FlushSegmentsRequest{})
  111. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  112. if err != nil {
  113. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  114. }
  115. return false
  116. }, waitFor, duration)
  117. // test queryNode
  118. s.Eventually(func() bool {
  119. resp, err := s.Cluster.QueryNodeClient.Search(s.Cluster.GetContext(), &querypb.SearchRequest{})
  120. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  121. if err != nil {
  122. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  123. }
  124. return false
  125. }, waitFor, duration)
  126. // test indexNode
  127. s.Eventually(func() bool {
  128. resp, err := s.Cluster.IndexNodeClient.CreateJob(s.Cluster.GetContext(), &workerpb.CreateJobRequest{})
  129. s.Suite.T().Logf("resp: %s, err: %s", resp, err)
  130. if err != nil {
  131. return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
  132. }
  133. return false
  134. }, waitFor, duration)
  135. }
  136. func TestCrossClusterRoutingSuite(t *testing.T) {
  137. suite.Run(t, new(CrossClusterRoutingSuite))
  138. }