123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612 |
- // Licensed to the LF AI & Data foundation under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package integration
- import (
- "context"
- "fmt"
- "math"
- "net"
- "path"
- "sync"
- "time"
- "github.com/cockroachdb/errors"
- grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
- clientv3 "go.etcd.io/etcd/client/v3"
- "go.uber.org/atomic"
- "go.uber.org/zap"
- "google.golang.org/grpc"
- "google.golang.org/grpc/backoff"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/keepalive"
- "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
- grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord"
- grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
- grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode"
- grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
- grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
- grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
- grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
- grpcproxyclient "github.com/milvus-io/milvus/internal/distributed/proxy/client"
- grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
- grpcquerycoordclient "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
- grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode"
- grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
- grpcrootcoord "github.com/milvus-io/milvus/internal/distributed/rootcoord"
- grpcrootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
- "github.com/milvus-io/milvus/internal/distributed/streaming"
- "github.com/milvus-io/milvus/internal/distributed/streamingnode"
- "github.com/milvus-io/milvus/internal/storage"
- "github.com/milvus-io/milvus/internal/types"
- "github.com/milvus-io/milvus/internal/util/dependency"
- kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
- "github.com/milvus-io/milvus/internal/util/hookutil"
- "github.com/milvus-io/milvus/internal/util/streamingutil"
- "github.com/milvus-io/milvus/pkg/log"
- "github.com/milvus-io/milvus/pkg/util/etcd"
- "github.com/milvus-io/milvus/pkg/util/paramtable"
- "github.com/milvus-io/milvus/pkg/util/typeutil"
- )
// params is the process-wide component parameter table shared by every
// server and client created by the mini cluster.
var params *paramtable.ComponentParam = paramtable.Get()

var (
	// initOnce guards the one-time construction of configMap in DefaultParams.
	initOnce sync.Once
	// configMap holds the default test configuration overrides; it is built
	// once per process and reused by every cluster instance.
	configMap map[string]string
)
- func DefaultParams() map[string]string {
- initOnce.Do(func() {
- testPath := fmt.Sprintf("integration-test-%d", time.Now().Unix())
- // Notice: don't use ParamItem.Key here, the config key will be empty before param table init
- configMap = map[string]string{
- "etcd.rootPath": testPath,
- "minio.rootPath": testPath,
- "localStorage.path": path.Join("/tmp", testPath),
- "common.storageType": "local",
- "dataNode.memory.forceSyncEnable": "false", // local execution will print too many logs
- "common.gracefulStopTimeout": "30",
- }
- })
- return configMap
- }
// MiniClusterV2 is an in-process Milvus cluster used by integration tests.
// It owns one instance of every coordinator/worker server, the gRPC clients
// that point at them, and bookkeeping for extra nodes added during a test.
type MiniClusterV2 struct {
	ctx context.Context

	mu     sync.RWMutex
	params map[string]string

	factory      dependency.Factory
	ChunkManager storage.ChunkManager

	EtcdCli *clientv3.Client

	// primary servers, one per role
	Proxy      *grpcproxy.Server
	DataCoord  *grpcdatacoord.Server
	RootCoord  *grpcrootcoord.Server
	QueryCoord *grpcquerycoord.Server

	// clients used to talk to the servers above
	DataCoordClient  types.DataCoordClient
	RootCoordClient  types.RootCoordClient
	QueryCoordClient types.QueryCoordClient

	MilvusClient    milvuspb.MilvusServiceClient
	ProxyClient     types.ProxyClient
	DataNodeClient  types.DataNodeClient
	QueryNodeClient types.QueryNodeClient
	IndexNodeClient types.IndexNodeClient

	DataNode      *grpcdatanode.Server
	StreamingNode *streamingnode.Server
	QueryNode     *grpcquerynode.Server
	IndexNode     *grpcindexnode.Server

	MetaWatcher MetaWatcher
	// ptmu serializes the global paramtable node-id swap performed while
	// spawning extra nodes (see AddQueryNode / AddDataNode).
	ptmu sync.Mutex

	// extra nodes added during the test, plus their node-id counters
	querynodes []*grpcquerynode.Server
	qnid       atomic.Int64
	datanodes  []*grpcdatanode.Server
	dnid       atomic.Int64

	streamingnodes []*streamingnode.Server

	clientConn *grpc.ClientConn
	Extension  *ReportChanExtension
}
// OptionV2 customizes a MiniClusterV2 before its configuration is applied.
type OptionV2 func(cluster *MiniClusterV2)
- func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2, error) {
- cluster := &MiniClusterV2{
- ctx: ctx,
- qnid: *atomic.NewInt64(10000),
- dnid: *atomic.NewInt64(20000),
- }
- paramtable.Init()
- cluster.Extension = InitReportExtension()
- cluster.params = DefaultParams()
- for _, opt := range opts {
- opt(cluster)
- }
- for k, v := range cluster.params {
- params.Save(k, v)
- }
- // setup etcd client
- etcdConfig := ¶mtable.Get().EtcdCfg
- etcdCli, err := etcd.GetEtcdClient(
- etcdConfig.UseEmbedEtcd.GetAsBool(),
- etcdConfig.EtcdUseSSL.GetAsBool(),
- etcdConfig.Endpoints.GetAsStrings(),
- etcdConfig.EtcdTLSCert.GetValue(),
- etcdConfig.EtcdTLSKey.GetValue(),
- etcdConfig.EtcdTLSCACert.GetValue(),
- etcdConfig.EtcdTLSMinVersion.GetValue())
- if err != nil {
- return nil, err
- }
- cluster.EtcdCli = etcdCli
- if streamingutil.IsStreamingServiceEnabled() {
- streaming.Init()
- }
- cluster.MetaWatcher = &EtcdMetaWatcher{
- rootPath: etcdConfig.RootPath.GetValue(),
- etcdCli: cluster.EtcdCli,
- }
- ports, err := cluster.GetAvailablePorts(7)
- if err != nil {
- return nil, err
- }
- log.Info("minicluster ports", zap.Ints("ports", ports))
- params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0]))
- params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1]))
- params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2]))
- params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3]))
- params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4]))
- params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5]))
- params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6]))
- // setup clients
- cluster.RootCoordClient, err = grpcrootcoordclient.NewClient(ctx)
- if err != nil {
- return nil, err
- }
- cluster.DataCoordClient, err = grpcdatacoordclient.NewClient(ctx)
- if err != nil {
- return nil, err
- }
- cluster.QueryCoordClient, err = grpcquerycoordclient.NewClient(ctx)
- if err != nil {
- return nil, err
- }
- cluster.ProxyClient, err = grpcproxyclient.NewClient(ctx, paramtable.Get().ProxyGrpcClientCfg.GetInternalAddress(), 0)
- if err != nil {
- return nil, err
- }
- cluster.DataNodeClient, err = grpcdatanodeclient.NewClient(ctx, paramtable.Get().DataNodeGrpcClientCfg.GetAddress(), 0)
- if err != nil {
- return nil, err
- }
- cluster.QueryNodeClient, err = grpcquerynodeclient.NewClient(ctx, paramtable.Get().QueryNodeGrpcClientCfg.GetAddress(), 0)
- if err != nil {
- return nil, err
- }
- cluster.IndexNodeClient, err = grpcindexnodeclient.NewClient(ctx, paramtable.Get().IndexNodeGrpcClientCfg.GetAddress(), 0, false)
- if err != nil {
- return nil, err
- }
- // setup servers
- cluster.factory = dependency.MockDefaultFactory(true, params)
- chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
- if err != nil {
- return nil, err
- }
- cluster.ChunkManager = chunkManager
- cluster.RootCoord, err = grpcrootcoord.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- cluster.DataCoord, err = grpcdatacoord.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- cluster.QueryCoord, err = grpcquerycoord.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- cluster.Proxy, err = grpcproxy.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- cluster.DataNode, err = grpcdatanode.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- if streamingutil.IsStreamingServiceEnabled() {
- cluster.StreamingNode, err = streamingnode.NewServer(cluster.factory)
- if err != nil {
- return nil, err
- }
- }
- cluster.QueryNode, err = grpcquerynode.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- cluster.IndexNode, err = grpcindexnode.NewServer(ctx, cluster.factory)
- if err != nil {
- return nil, err
- }
- return cluster, nil
- }
- func (cluster *MiniClusterV2) AddQueryNodes(k int) []*grpcquerynode.Server {
- servers := make([]*grpcquerynode.Server, k)
- for i := 0; i < k; i++ {
- servers = append(servers, cluster.AddQueryNode())
- }
- return servers
- }
- func (cluster *MiniClusterV2) AddQueryNode() *grpcquerynode.Server {
- cluster.ptmu.Lock()
- defer cluster.ptmu.Unlock()
- cluster.qnid.Inc()
- id := cluster.qnid.Load()
- oid := paramtable.GetNodeID()
- log.Info(fmt.Sprintf("adding extra querynode with id:%d", id))
- paramtable.SetNodeID(id)
- node, err := grpcquerynode.NewServer(context.TODO(), cluster.factory)
- if err != nil {
- return nil
- }
- runComponent(node)
- paramtable.SetNodeID(oid)
- req := &milvuspb.GetComponentStatesRequest{}
- resp, err := node.GetComponentStates(context.TODO(), req)
- if err != nil {
- return nil
- }
- log.Info(fmt.Sprintf("querynode %d ComponentStates:%v", id, resp))
- cluster.querynodes = append(cluster.querynodes, node)
- return node
- }
- func (cluster *MiniClusterV2) AddDataNode() *grpcdatanode.Server {
- cluster.ptmu.Lock()
- defer cluster.ptmu.Unlock()
- cluster.qnid.Inc()
- id := cluster.qnid.Load()
- oid := paramtable.GetNodeID()
- log.Info(fmt.Sprintf("adding extra datanode with id:%d", id))
- paramtable.SetNodeID(id)
- node, err := grpcdatanode.NewServer(context.TODO(), cluster.factory)
- if err != nil {
- return nil
- }
- runComponent(node)
- paramtable.SetNodeID(oid)
- req := &milvuspb.GetComponentStatesRequest{}
- resp, err := node.GetComponentStates(context.TODO(), req)
- if err != nil {
- return nil
- }
- log.Info(fmt.Sprintf("datanode %d ComponentStates:%v", id, resp))
- cluster.datanodes = append(cluster.datanodes, node)
- return node
- }
- func (cluster *MiniClusterV2) AddStreamingNode() {
- cluster.ptmu.Lock()
- defer cluster.ptmu.Unlock()
- node, err := streamingnode.NewServer(cluster.factory)
- if err != nil {
- panic(err)
- }
- runComponent(node)
- cluster.streamingnodes = append(cluster.streamingnodes, node)
- }
- func (cluster *MiniClusterV2) Start() error {
- log.Info("mini cluster start")
- runComponent(cluster.RootCoord)
- runComponent(cluster.DataCoord)
- runComponent(cluster.QueryCoord)
- runComponent(cluster.Proxy)
- runComponent(cluster.DataNode)
- runComponent(cluster.QueryNode)
- runComponent(cluster.IndexNode)
- ctx2, cancel := context.WithTimeout(context.Background(), time.Second*120)
- defer cancel()
- healthy := false
- for !healthy {
- checkHealthResp, _ := cluster.Proxy.CheckHealth(ctx2, &milvuspb.CheckHealthRequest{})
- healthy = checkHealthResp.IsHealthy
- time.Sleep(time.Second * 1)
- }
- if !healthy {
- return errors.New("minicluster is not healthy after 120s")
- }
- if streamingutil.IsStreamingServiceEnabled() {
- runComponent(cluster.StreamingNode)
- }
- port := params.ProxyGrpcServerCfg.Port.GetAsInt()
- var err error
- cluster.clientConn, err = grpc.DialContext(cluster.ctx, fmt.Sprintf("localhost:%d", port), getGrpcDialOpt()...)
- if err != nil {
- return err
- }
- cluster.MilvusClient = milvuspb.NewMilvusServiceClient(cluster.clientConn)
- log.Info("minicluster started")
- return nil
- }
- func (cluster *MiniClusterV2) StopRootCoord() {
- if err := cluster.RootCoord.Stop(); err != nil {
- panic(err)
- }
- cluster.RootCoord = nil
- }
- func (cluster *MiniClusterV2) StartRootCoord() {
- if cluster.RootCoord == nil {
- var err error
- if cluster.RootCoord, err = grpcrootcoord.NewServer(cluster.ctx, cluster.factory); err != nil {
- panic(err)
- }
- runComponent(cluster.RootCoord)
- }
- }
- func (cluster *MiniClusterV2) StopDataCoord() {
- if err := cluster.DataCoord.Stop(); err != nil {
- panic(err)
- }
- cluster.DataCoord = nil
- }
- func (cluster *MiniClusterV2) StartDataCoord() {
- if cluster.DataCoord == nil {
- var err error
- if cluster.DataCoord, err = grpcdatacoord.NewServer(cluster.ctx, cluster.factory); err != nil {
- panic(err)
- }
- runComponent(cluster.DataCoord)
- }
- }
- func (cluster *MiniClusterV2) StopQueryCoord() {
- if err := cluster.QueryCoord.Stop(); err != nil {
- panic(err)
- }
- cluster.QueryCoord = nil
- }
- func (cluster *MiniClusterV2) StartQueryCoord() {
- if cluster.QueryCoord == nil {
- var err error
- if cluster.QueryCoord, err = grpcquerycoord.NewServer(cluster.ctx, cluster.factory); err != nil {
- panic(err)
- }
- runComponent(cluster.QueryCoord)
- }
- }
- func getGrpcDialOpt() []grpc.DialOption {
- return []grpc.DialOption{
- grpc.WithBlock(),
- grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: 5 * time.Second,
- Timeout: 10 * time.Second,
- PermitWithoutStream: true,
- }),
- grpc.WithConnectParams(grpc.ConnectParams{
- Backoff: backoff.Config{
- BaseDelay: 100 * time.Millisecond,
- Multiplier: 1.6,
- Jitter: 0.2,
- MaxDelay: 3 * time.Second,
- },
- MinConnectTimeout: 3 * time.Second,
- }),
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithChainUnaryInterceptor(grpc_retry.UnaryClientInterceptor(
- grpc_retry.WithMax(6),
- grpc_retry.WithBackoff(func(attempt uint) time.Duration {
- return 60 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
- }),
- grpc_retry.WithCodes(codes.Unavailable, codes.ResourceExhausted)),
- ),
- }
- }
- func (cluster *MiniClusterV2) Stop() error {
- log.Info("mini cluster stop")
- if cluster.clientConn != nil {
- cluster.clientConn.Close()
- }
- cluster.RootCoord.Stop()
- log.Info("mini cluster rootCoord stopped")
- cluster.DataCoord.Stop()
- log.Info("mini cluster dataCoord stopped")
- cluster.QueryCoord.Stop()
- log.Info("mini cluster queryCoord stopped")
- cluster.Proxy.Stop()
- log.Info("mini cluster proxy stopped")
- cluster.StopAllDataNodes()
- cluster.StopAllStreamingNodes()
- cluster.StopAllQueryNodes()
- if streamingutil.IsStreamingServiceEnabled() {
- streaming.Release()
- }
- cluster.IndexNode.Stop()
- log.Info("mini cluster indexNode stopped")
- cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
- defer cluster.EtcdCli.Close()
- if cluster.ChunkManager == nil {
- chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
- if err != nil {
- log.Warn("fail to create chunk manager to clean test data", zap.Error(err))
- } else {
- cluster.ChunkManager = chunkManager
- }
- }
- cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath())
- kvfactory.CloseEtcdClient()
- return nil
- }
- func (cluster *MiniClusterV2) GetAllQueryNodes() []*grpcquerynode.Server {
- ret := make([]*grpcquerynode.Server, 0)
- ret = append(ret, cluster.QueryNode)
- ret = append(ret, cluster.querynodes...)
- return ret
- }
- func (cluster *MiniClusterV2) StopAllQueryNodes() {
- cluster.QueryNode.Stop()
- log.Info("mini cluster main queryNode stopped")
- numExtraQN := len(cluster.querynodes)
- for _, node := range cluster.querynodes {
- node.Stop()
- }
- cluster.querynodes = nil
- log.Info(fmt.Sprintf("mini cluster stopped %d extra querynode", numExtraQN))
- }
- func (cluster *MiniClusterV2) StopAllDataNodes() {
- cluster.DataNode.Stop()
- log.Info("mini cluster main dataNode stopped")
- numExtraDN := len(cluster.datanodes)
- for _, node := range cluster.datanodes {
- node.Stop()
- }
- cluster.datanodes = nil
- log.Info(fmt.Sprintf("mini cluster stopped %d extra datanode", numExtraDN))
- }
- func (cluster *MiniClusterV2) StopAllStreamingNodes() {
- if cluster.StreamingNode != nil {
- cluster.StreamingNode.Stop()
- log.Info("mini cluster main streamingnode stopped")
- }
- for _, node := range cluster.streamingnodes {
- node.Stop()
- }
- log.Info(fmt.Sprintf("mini cluster stopped %d streaming nodes", len(cluster.streamingnodes)))
- cluster.streamingnodes = nil
- }
// GetContext returns the context the cluster was started with.
func (cluster *MiniClusterV2) GetContext() context.Context {
	return cluster.ctx
}
// GetFactory returns the dependency factory shared by all servers of the
// cluster.
func (cluster *MiniClusterV2) GetFactory() dependency.Factory {
	return cluster.factory
}
- func (cluster *MiniClusterV2) GetAvailablePorts(n int) ([]int, error) {
- ports := typeutil.NewSet[int]()
- for ports.Len() < n {
- port, err := cluster.GetAvailablePort()
- if err != nil {
- return nil, err
- }
- ports.Insert(port)
- }
- return ports.Collect(), nil
- }
- func (cluster *MiniClusterV2) GetAvailablePort() (int, error) {
- address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "0.0.0.0"))
- if err != nil {
- return 0, err
- }
- listener, err := net.ListenTCP("tcp", address)
- if err != nil {
- return 0, err
- }
- defer listener.Close()
- return listener.Addr().(*net.TCPAddr).Port, nil
- }
- func InitReportExtension() *ReportChanExtension {
- e := NewReportChanExtension()
- hookutil.InitOnceHook()
- hookutil.SetTestExtension(e)
- return e
- }
// ReportChanExtension is a hook extension that forwards report payloads to a
// channel so tests can observe them. Delivery is best-effort: when no
// receiver is ready, the payload is dropped rather than blocking the caller.
type ReportChanExtension struct {
	reportChan chan any
}

// NewReportChanExtension creates a ReportChanExtension backed by an
// unbuffered channel.
func NewReportChanExtension() *ReportChanExtension {
	ext := &ReportChanExtension{
		reportChan: make(chan any),
	}
	return ext
}

// Report publishes info to the report channel without blocking. It always
// returns 1.
func (r *ReportChanExtension) Report(info any) int {
	select {
	case r.reportChan <- info:
	default:
		// no receiver ready — drop the report instead of blocking
	}
	return 1
}

// GetReportChan exposes the receive side of the report channel.
func (r *ReportChanExtension) GetReportChan() <-chan any {
	return r.reportChan
}
// component is the minimal lifecycle contract shared by every server run by
// the mini cluster.
type component interface {
	Prepare() error
	Run() error
}

// runComponent prepares and then runs c, panicking on the first failure.
// Panicking is acceptable here because this helper only runs inside tests.
func runComponent(c component) {
	for _, step := range []func() error{c.Prepare, c.Run} {
		if err := step(); err != nil {
			panic(err)
		}
	}
}
|