12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922 |
- // Licensed to the LF AI & Data foundation under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package rootcoord
- import (
- "context"
- "fmt"
- "math/rand"
- "os"
- "sync"
- "time"
- "github.com/cockroachdb/errors"
- "github.com/samber/lo"
- "github.com/tikv/client-go/v2/txnkv"
- clientv3 "go.etcd.io/etcd/client/v3"
- "go.uber.org/atomic"
- "go.uber.org/zap"
- "golang.org/x/sync/errgroup"
- "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
- "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
- "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
- "github.com/milvus-io/milvus/internal/allocator"
- etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
- "github.com/milvus-io/milvus/internal/kv/tikv"
- "github.com/milvus-io/milvus/internal/metastore"
- kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
- "github.com/milvus-io/milvus/internal/metastore/model"
- pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
- "github.com/milvus-io/milvus/internal/proto/internalpb"
- "github.com/milvus-io/milvus/internal/proto/proxypb"
- "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
- tso2 "github.com/milvus-io/milvus/internal/tso"
- "github.com/milvus-io/milvus/internal/types"
- "github.com/milvus-io/milvus/internal/util/dependency"
- "github.com/milvus-io/milvus/internal/util/proxyutil"
- "github.com/milvus-io/milvus/internal/util/sessionutil"
- "github.com/milvus-io/milvus/internal/util/streamingutil"
- tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil"
- "github.com/milvus-io/milvus/pkg/common"
- "github.com/milvus-io/milvus/pkg/kv"
- "github.com/milvus-io/milvus/pkg/log"
- "github.com/milvus-io/milvus/pkg/metrics"
- "github.com/milvus-io/milvus/pkg/util"
- "github.com/milvus-io/milvus/pkg/util/commonpbutil"
- "github.com/milvus-io/milvus/pkg/util/crypto"
- "github.com/milvus-io/milvus/pkg/util/expr"
- "github.com/milvus-io/milvus/pkg/util/funcutil"
- "github.com/milvus-io/milvus/pkg/util/logutil"
- "github.com/milvus-io/milvus/pkg/util/merr"
- "github.com/milvus-io/milvus/pkg/util/metricsinfo"
- "github.com/milvus-io/milvus/pkg/util/paramtable"
- "github.com/milvus-io/milvus/pkg/util/retry"
- "github.com/milvus-io/milvus/pkg/util/timerecord"
- "github.com/milvus-io/milvus/pkg/util/tsoutil"
- "github.com/milvus-io/milvus/pkg/util/typeutil"
- )
- // UniqueID is an alias of typeutil.UniqueID.
- type UniqueID = typeutil.UniqueID
- // Timestamp is an alias of typeutil.Timestamp
- type Timestamp = typeutil.Timestamp
- const InvalidCollectionID = UniqueID(0)
- var Params *paramtable.ComponentParam = paramtable.Get()
- type Opt func(*Core)
- type metaKVCreator func() (kv.MetaKv, error)
// Core root coordinator core.
//
// Core owns rootcoord's metadata, ID/TSO allocators, DDL scheduling
// machinery, proxy bookkeeping and background loops. Its lifecycle is driven
// by NewCore, Init, Start and Stop.
type Core struct {
	// ctx is canceled via cancel; the loops started in startServerLoop exit
	// once it is done.
	ctx    context.Context
	cancel context.CancelFunc
	// wg tracks the goroutines started by startServerLoop.
	wg sync.WaitGroup
	// etcdCli is always handed to the proxy watcher. Either etcdCli or
	// tikvCli backs the meta/ID/TSO KV stores, selected by
	// Params.MetaStoreCfg.MetaStoreType.
	etcdCli *clientv3.Client
	tikvCli *txnkv.Client
	// address is passed to session.Init when the session is created.
	address string
	// meta is loaded in initMetaTable.
	meta IMetaTable
	// scheduler, broker, ddlTsLockManager, garbageCollector and stepExecutor
	// are created in initInternal.
	scheduler        IScheduler
	broker           Broker
	ddlTsLockManager DdlTsLockManager
	garbageCollector GarbageCollector
	stepExecutor     StepExecutor
	// metaKVCreator may be injected before Init; otherwise initKVCreator
	// installs an etcd or TiKV based one.
	metaKVCreator metaKVCreator
	// proxyCreator defaults to proxyutil.DefaultProxyCreator (set in NewCore)
	// and is consumed when the proxy client manager is built.
	proxyCreator        proxyutil.ProxyCreator
	proxyWatcher        *proxyutil.ProxyWatcher
	proxyClientManager  proxyutil.ProxyClientManagerInterface
	metricsCacheManager *metricsinfo.MetricsCacheManager
	// chanTimeTick tracks physical channels and forwards time ticks.
	chanTimeTick *timetickSync
	idAllocator  allocator.Interface
	tsoAllocator tso2.Allocator
	// dataCoord and queryCoord are injected via SetDataCoordClient and
	// SetQueryCoordClient.
	dataCoord  types.DataCoordClient
	queryCoord types.QueryCoordClient
	// quotaCenter is started only when quota and limits are enabled.
	quotaCenter *QuotaCenter
	// stateCode holds a commonpb.StateCode; read/written atomically via
	// GetStateCode / UpdateStateCode.
	stateCode atomic.Int32
	// initOnce and startOnce guard initInternal and startInternal.
	initOnce  sync.Once
	startOnce sync.Once
	session   *sessionutil.Session
	factory   dependency.Factory
	// enableActiveStandBy is read from config in NewCore; when true, init and
	// start are deferred to activateFunc (installed by Init).
	enableActiveStandBy bool
	activateFunc        func() error
}
- // --------------------- function --------------------------
- // NewCore creates a new rootcoord core
- func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
- ctx, cancel := context.WithCancel(c)
- rand.Seed(time.Now().UnixNano())
- core := &Core{
- ctx: ctx,
- cancel: cancel,
- factory: factory,
- enableActiveStandBy: Params.RootCoordCfg.EnableActiveStandby.GetAsBool(),
- }
- core.UpdateStateCode(commonpb.StateCode_Abnormal)
- core.SetProxyCreator(proxyutil.DefaultProxyCreator)
- expr.Register("rootcoord", core)
- return core, nil
- }
- // UpdateStateCode update state code
- func (c *Core) UpdateStateCode(code commonpb.StateCode) {
- c.stateCode.Store(int32(code))
- log.Info("update rootcoord state", zap.String("state", code.String()))
- }
- func (c *Core) GetStateCode() commonpb.StateCode {
- return commonpb.StateCode(c.stateCode.Load())
- }
- func (c *Core) sendTimeTick(t Timestamp, reason string) error {
- pc := c.chanTimeTick.listDmlChannels()
- pt := make([]uint64, len(pc))
- for i := 0; i < len(pt); i++ {
- pt[i] = t
- }
- ttMsg := internalpb.ChannelTimeTickMsg{
- Base: commonpbutil.NewMsgBase(
- commonpbutil.WithMsgType(commonpb.MsgType_TimeTick),
- commonpbutil.WithTimeStamp(t),
- commonpbutil.WithSourceID(ddlSourceID),
- ),
- ChannelNames: pc,
- Timestamps: pt,
- DefaultTimestamp: t,
- }
- return c.chanTimeTick.updateTimeTick(&ttMsg, reason)
- }
- func (c *Core) sendMinDdlTsAsTt() {
- if !paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() {
- return
- }
- code := c.GetStateCode()
- if code != commonpb.StateCode_Healthy {
- log.Warn("rootCoord is not healthy, skip send timetick")
- return
- }
- minBgDdlTs := c.ddlTsLockManager.GetMinDdlTs()
- minNormalDdlTs := c.scheduler.GetMinDdlTs()
- minDdlTs := funcutil.Min(minBgDdlTs, minNormalDdlTs)
- // zero -> ddlTsLockManager and scheduler not started.
- if minDdlTs == typeutil.ZeroTimestamp {
- log.Warn("zero ts was met, this should be only occurred in starting state", zap.Uint64("minBgDdlTs", minBgDdlTs), zap.Uint64("minNormalDdlTs", minNormalDdlTs))
- return
- }
- // max -> abnormal case, impossible.
- if minDdlTs == typeutil.MaxTimestamp {
- log.Warn("ddl ts is abnormal, max ts was met", zap.Uint64("minBgDdlTs", minBgDdlTs), zap.Uint64("minNormalDdlTs", minNormalDdlTs))
- return
- }
- if err := c.sendTimeTick(minDdlTs, "timetick loop"); err != nil {
- log.Warn("failed to send timetick", zap.Error(err))
- }
- }
- func (c *Core) startTimeTickLoop() {
- defer c.wg.Done()
- ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
- defer ticker.Stop()
- for {
- select {
- case <-c.ctx.Done():
- log.Info("rootcoord's timetick loop quit!")
- return
- case <-ticker.C:
- c.sendMinDdlTsAsTt()
- }
- }
- }
- func (c *Core) tsLoop() {
- defer c.wg.Done()
- tsoTicker := time.NewTicker(tso2.UpdateTimestampStep)
- defer tsoTicker.Stop()
- ctx, cancel := context.WithCancel(c.ctx)
- defer cancel()
- for {
- select {
- case <-tsoTicker.C:
- if err := c.tsoAllocator.UpdateTSO(); err != nil {
- log.Warn("failed to update tso", zap.Error(err))
- continue
- }
- ts := c.tsoAllocator.GetLastSavedTime()
- metrics.RootCoordTimestampSaved.Set(float64(ts.Unix()))
- case <-ctx.Done():
- log.Info("rootcoord's ts loop quit!")
- return
- }
- }
- }
- func (c *Core) SetProxyCreator(f func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) {
- c.proxyCreator = f
- }
- func (c *Core) SetDataCoordClient(s types.DataCoordClient) error {
- if s == nil {
- return errors.New("null DataCoord interface")
- }
- c.dataCoord = s
- return nil
- }
- func (c *Core) SetQueryCoordClient(s types.QueryCoordClient) error {
- if s == nil {
- return errors.New("null QueryCoord interface")
- }
- c.queryCoord = s
- return nil
- }
- // Register register rootcoord at etcd
- func (c *Core) Register() error {
- c.session.Register()
- afterRegister := func() {
- metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc()
- log.Info("RootCoord Register Finished")
- c.session.LivenessCheck(c.ctx, func() {
- log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
- os.Exit(1)
- })
- }
- if c.enableActiveStandBy {
- go func() {
- if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil {
- log.Warn("failed to activate standby rootcoord server", zap.Error(err))
- panic(err)
- }
- afterRegister()
- }()
- } else {
- afterRegister()
- }
- return nil
- }
- func (c *Core) SetAddress(address string) {
- c.address = address
- }
- // SetEtcdClient sets the etcdCli of Core
- func (c *Core) SetEtcdClient(etcdClient *clientv3.Client) {
- c.etcdCli = etcdClient
- }
- // SetTiKVClient sets the tikvCli of Core
- func (c *Core) SetTiKVClient(client *txnkv.Client) {
- c.tikvCli = client
- }
- func (c *Core) initSession() error {
- c.session = sessionutil.NewSession(c.ctx)
- if c.session == nil {
- return fmt.Errorf("session is nil, the etcd client connection may have failed")
- }
- c.session.Init(typeutil.RootCoordRole, c.address, true, true)
- c.session.SetEnableActiveStandBy(c.enableActiveStandBy)
- return nil
- }
- func (c *Core) initKVCreator() {
- if c.metaKVCreator == nil {
- if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
- c.metaKVCreator = func() (kv.MetaKv, error) {
- return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(),
- tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
- }
- } else {
- c.metaKVCreator = func() (kv.MetaKv, error) {
- return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
- etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
- }
- }
- }
- }
- func (c *Core) initMetaTable() error {
- fn := func() error {
- var catalog metastore.RootCoordCatalog
- var err error
- switch Params.MetaStoreCfg.MetaStoreType.GetValue() {
- case util.MetaStoreTypeEtcd:
- log.Info("Using etcd as meta storage.")
- var metaKV kv.MetaKv
- var ss *kvmetestore.SuffixSnapshot
- var err error
- if metaKV, err = c.metaKVCreator(); err != nil {
- return err
- }
- if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
- return err
- }
- catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
- case util.MetaStoreTypeTiKV:
- log.Info("Using tikv as meta storage.")
- var metaKV kv.MetaKv
- var ss *kvmetestore.SuffixSnapshot
- var err error
- if metaKV, err = c.metaKVCreator(); err != nil {
- return err
- }
- if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
- return err
- }
- catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
- default:
- return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", Params.MetaStoreCfg.MetaStoreType.GetValue()))
- }
- if c.meta, err = NewMetaTable(c.ctx, catalog, c.tsoAllocator); err != nil {
- return err
- }
- return nil
- }
- return retry.Do(c.ctx, fn, retry.Attempts(10))
- }
- func (c *Core) initIDAllocator() error {
- var tsoKV kv.TxnKV
- var kvPath string
- if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
- kvPath = Params.TiKVCfg.KvRootPath.GetValue()
- tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, kvPath, globalIDAllocatorSubPath)
- } else {
- kvPath = Params.EtcdCfg.KvRootPath.GetValue()
- tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, kvPath, globalIDAllocatorSubPath)
- }
- idAllocator := allocator.NewGlobalIDAllocator(globalIDAllocatorKey, tsoKV)
- if err := idAllocator.Initialize(); err != nil {
- return err
- }
- c.idAllocator = idAllocator
- log.Info("id allocator initialized",
- zap.String("root_path", kvPath),
- zap.String("sub_path", globalIDAllocatorSubPath),
- zap.String("key", globalIDAllocatorKey))
- return nil
- }
- func (c *Core) initTSOAllocator() error {
- var tsoKV kv.TxnKV
- var kvPath string
- if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
- kvPath = Params.TiKVCfg.KvRootPath.GetValue()
- tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, Params.TiKVCfg.KvRootPath.GetValue(), globalIDAllocatorSubPath)
- } else {
- kvPath = Params.EtcdCfg.KvRootPath.GetValue()
- tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath.GetValue(), globalIDAllocatorSubPath)
- }
- tsoAllocator := tso2.NewGlobalTSOAllocator(globalTSOAllocatorKey, tsoKV)
- if err := tsoAllocator.Initialize(); err != nil {
- return err
- }
- c.tsoAllocator = tsoAllocator
- log.Info("tso allocator initialized",
- zap.String("root_path", kvPath),
- zap.String("sub_path", globalIDAllocatorSubPath),
- zap.String("key", globalIDAllocatorKey))
- return nil
- }
// initInternal performs the actual rootcoord initialization.
//
// It first moves the state to Initializing, then:
//   - builds the ID/TSO allocators and the meta table;
//   - wires up the components that depend on them: scheduler, time-tick
//     sync, proxy client manager/watcher, broker, DDL ts lock manager,
//     garbage collector, step executor, metrics cache and quota center;
//   - bootstraps the root credential and RBAC roles.
//
// It does not move the state to Healthy; startInternal does that.
func (c *Core) initInternal() error {
	c.UpdateStateCode(commonpb.StateCode_Initializing)
	// Install the etcd/TiKV meta KV factory unless one was injected.
	c.initKVCreator()

	// Allocators come first: the meta table, scheduler and ddl ts lock
	// manager below take c.tsoAllocator (the scheduler also c.idAllocator).
	if err := c.initIDAllocator(); err != nil {
		return err
	}
	if err := c.initTSOAllocator(); err != nil {
		return err
	}
	if err := c.initMetaTable(); err != nil {
		return err
	}
	c.scheduler = newScheduler(c.ctx, c.idAllocator, c.tsoAllocator)
	// NOTE(review): Init already called c.factory.Init(Params); this second
	// call is presumably idempotent — confirm.
	c.factory.Init(Params)
	// Seed the time-tick sync with the physical channels of known collections.
	chanMap := c.meta.ListCollectionPhysicalChannels()
	c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap)
	log.Info("create TimeTick sync done")
	c.proxyClientManager = proxyutil.NewProxyClientManager(c.proxyCreator)
	c.broker = newServerBroker(c)
	c.ddlTsLockManager = newDdlTsLockManager(c.tsoAllocator)
	c.garbageCollector = newBgGarbageCollector(c)
	c.stepExecutor = newBgStepExecutor(c.ctx)
	// Proxy sessions seen by the watcher feed both the time-tick sync and the
	// proxy client manager (initial set, additions and deletions).
	c.proxyWatcher = proxyutil.NewProxyWatcher(
		c.etcdCli,
		c.chanTimeTick.initSessions,
		c.proxyClientManager.AddProxyClients,
	)
	c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
	c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
	log.Info("init proxy manager done")
	c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	// The quota center receives the current dataCoord/queryCoord values, so
	// the Set*CoordClient setters presumably must be called before Init.
	c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator, c.meta)
	log.Debug("RootCoord init QuotaCenter done")
	if err := c.initCredentials(); err != nil {
		return err
	}
	log.Info("init credentials done")
	if err := c.initRbac(); err != nil {
		return err
	}
	log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address))
	return nil
}
- // Init initialize routine
- func (c *Core) Init() error {
- var initError error
- c.factory.Init(Params)
- if err := c.initSession(); err != nil {
- return err
- }
- if c.enableActiveStandBy {
- c.activateFunc = func() error {
- log.Info("RootCoord switch from standby to active, activating")
- var err error
- c.initOnce.Do(func() {
- if err = c.initInternal(); err != nil {
- log.Error("RootCoord init failed", zap.Error(err))
- }
- })
- if err != nil {
- return err
- }
- c.startOnce.Do(func() {
- if err = c.startInternal(); err != nil {
- log.Error("RootCoord start failed", zap.Error(err))
- }
- })
- log.Info("RootCoord startup success", zap.String("address", c.session.Address))
- return err
- }
- c.UpdateStateCode(commonpb.StateCode_StandBy)
- log.Info("RootCoord enter standby mode successfully")
- } else {
- c.initOnce.Do(func() {
- initError = c.initInternal()
- })
- }
- return initError
- }
- func (c *Core) initCredentials() error {
- credInfo, _ := c.meta.GetCredential(util.UserRoot)
- if credInfo == nil {
- log.Debug("RootCoord init user root")
- encryptedRootPassword, _ := crypto.PasswordEncrypt(Params.CommonCfg.DefaultRootPassword.GetValue())
- err := c.meta.AddCredential(&internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
- return err
- }
- return nil
- }
- func (c *Core) initRbac() error {
- var err error
- // create default roles, including admin, public
- for _, role := range util.DefaultRoles {
- err = c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
- if err != nil && !common.IsIgnorableError(err) {
- return errors.Wrap(err, "failed to create role")
- }
- }
- if Params.ProxyCfg.EnablePublicPrivilege.GetAsBool() {
- err = c.initPublicRolePrivilege()
- if err != nil {
- return err
- }
- }
- if Params.RoleCfg.Enabled.GetAsBool() {
- return c.initBuiltinRoles()
- }
- return nil
- }
- func (c *Core) initPublicRolePrivilege() error {
- // grant privileges for the public role
- globalPrivileges := []string{
- commonpb.ObjectPrivilege_PrivilegeDescribeCollection.String(),
- }
- collectionPrivileges := []string{
- commonpb.ObjectPrivilege_PrivilegeIndexDetail.String(),
- }
- var err error
- for _, globalPrivilege := range globalPrivileges {
- err = c.meta.OperatePrivilege(util.DefaultTenant, &milvuspb.GrantEntity{
- Role: &milvuspb.RoleEntity{Name: util.RolePublic},
- Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Global.String()},
- ObjectName: util.AnyWord,
- DbName: util.AnyWord,
- Grantor: &milvuspb.GrantorEntity{
- User: &milvuspb.UserEntity{Name: util.UserRoot},
- Privilege: &milvuspb.PrivilegeEntity{Name: globalPrivilege},
- },
- }, milvuspb.OperatePrivilegeType_Grant)
- if err != nil && !common.IsIgnorableError(err) {
- return errors.Wrap(err, "failed to grant global privilege")
- }
- }
- for _, collectionPrivilege := range collectionPrivileges {
- err = c.meta.OperatePrivilege(util.DefaultTenant, &milvuspb.GrantEntity{
- Role: &milvuspb.RoleEntity{Name: util.RolePublic},
- Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Collection.String()},
- ObjectName: util.AnyWord,
- DbName: util.AnyWord,
- Grantor: &milvuspb.GrantorEntity{
- User: &milvuspb.UserEntity{Name: util.UserRoot},
- Privilege: &milvuspb.PrivilegeEntity{Name: collectionPrivilege},
- },
- }, milvuspb.OperatePrivilegeType_Grant)
- if err != nil && !common.IsIgnorableError(err) {
- return errors.Wrap(err, "failed to grant collection privilege")
- }
- }
- return nil
- }
- func (c *Core) initBuiltinRoles() error {
- rolePrivilegesMap := Params.RoleCfg.Roles.GetAsRoleDetails()
- for role, privilegesJSON := range rolePrivilegesMap {
- err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
- if err != nil && !common.IsIgnorableError(err) {
- log.Error("create a builtin role fail", zap.String("roleName", role), zap.Error(err))
- return errors.Wrapf(err, "failed to create a builtin role: %s", role)
- }
- for _, privilege := range privilegesJSON[util.RoleConfigPrivileges] {
- privilegeName := privilege[util.RoleConfigPrivilege]
- if !util.IsAnyWord(privilege[util.RoleConfigPrivilege]) {
- privilegeName = util.PrivilegeNameForMetastore(privilege[util.RoleConfigPrivilege])
- }
- err := c.meta.OperatePrivilege(util.DefaultTenant, &milvuspb.GrantEntity{
- Role: &milvuspb.RoleEntity{Name: role},
- Object: &milvuspb.ObjectEntity{Name: privilege[util.RoleConfigObjectType]},
- ObjectName: privilege[util.RoleConfigObjectName],
- DbName: privilege[util.RoleConfigDBName],
- Grantor: &milvuspb.GrantorEntity{
- User: &milvuspb.UserEntity{Name: util.UserRoot},
- Privilege: &milvuspb.PrivilegeEntity{Name: privilegeName},
- },
- }, milvuspb.OperatePrivilegeType_Grant)
- if err != nil && !common.IsIgnorableError(err) {
- log.Error("grant privilege to builtin role fail", zap.String("roleName", role), zap.Any("privilege", privilege), zap.Error(err))
- return errors.Wrapf(err, "failed to grant privilege: <%s, %s, %s> of db: %s to role: %s", privilege[util.RoleConfigObjectType], privilege[util.RoleConfigObjectName], privilege[util.RoleConfigPrivilege], privilege[util.RoleConfigDBName], role)
- }
- }
- util.BuiltinRoles = append(util.BuiltinRoles, role)
- log.Info("init a builtin role successfully", zap.String("roleName", role))
- }
- return nil
- }
- func (c *Core) restore(ctx context.Context) error {
- dbs, err := c.meta.ListDatabases(ctx, typeutil.MaxTimestamp)
- if err != nil {
- return err
- }
- for _, db := range dbs {
- colls, err := c.meta.ListCollections(ctx, db.Name, typeutil.MaxTimestamp, false)
- if err != nil {
- return err
- }
- for _, coll := range colls {
- ts, err := c.tsoAllocator.GenerateTSO(1)
- if err != nil {
- return err
- }
- if coll.Available() {
- for _, part := range coll.Partitions {
- switch part.State {
- case pb.PartitionState_PartitionDropping:
- go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, coll.VirtualChannelNames, part.Clone(), ts)
- case pb.PartitionState_PartitionCreating:
- go c.garbageCollector.RemoveCreatingPartition(coll.DBID, part.Clone(), ts)
- default:
- }
- }
- } else {
- switch coll.State {
- case pb.CollectionState_CollectionDropping:
- go c.garbageCollector.ReDropCollection(coll.Clone(), ts)
- case pb.CollectionState_CollectionCreating:
- go c.garbageCollector.RemoveCreatingCollection(coll.Clone())
- default:
- }
- }
- }
- }
- return nil
- }
// startInternal starts rootcoord's runtime machinery and marks the node
// Healthy. In order, it:
//   - starts watching proxy sessions;
//   - restores unfinished DDL work from meta;
//   - starts the quota center (if enabled);
//   - starts the scheduler and step executor;
//   - kicks off an asynchronous RBAC policy-cache refresh on proxies;
//   - starts the background loops;
//   - records the Healthy state and the server info.
//
// A failure to watch proxies or to restore is not returned: the former is
// logged with log.Fatal (presumably exiting the process) and both panic.
// The returned error is currently always nil.
func (c *Core) startInternal() error {
	if err := c.proxyWatcher.WatchProxy(c.ctx); err != nil {
		log.Fatal("rootcoord failed to watch proxy", zap.Error(err))
		// Never continue startup without a proxy watch. NOTE(review): if
		// log.Fatal exits the process (as zap's Fatal does), this panic is
		// unreachable and only acts as a safety net.
		panic(err)
	}
	if err := c.restore(c.ctx); err != nil {
		panic(err)
	}
	if Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
		c.quotaCenter.Start()
	}
	c.scheduler.Start()
	c.stepExecutor.Start()
	// Refresh the RBAC policy cache on all proxies in the background: up to
	// 100 attempts, one second apart, with warnings rate-limited via
	// log.RatedWarn. This goroutine is not tracked by c.wg; it presumably
	// stops early once c.ctx is canceled (retry.Do receives c.ctx).
	go func() {
		// refresh rbac cache
		if err := retry.Do(c.ctx, func() error {
			if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
				OpType: int32(typeutil.CacheRefresh),
			}); err != nil {
				log.RatedWarn(60, "fail to refresh policy info cache", zap.Error(err))
				return err
			}
			return nil
		}, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
			log.Warn("fail to refresh policy info cache", zap.Error(err))
		}
	}()
	c.startServerLoop()
	c.UpdateStateCode(commonpb.StateCode_Healthy)
	sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
	logutil.Logger(c.ctx).Info("rootcoord startup successfully")
	return nil
}
- func (c *Core) startServerLoop() {
- c.wg.Add(2)
- go c.startTimeTickLoop()
- go c.tsLoop()
- if !streamingutil.IsStreamingServiceEnabled() {
- c.wg.Add(1)
- go c.chanTimeTick.startWatch(&c.wg)
- }
- }
- // Start starts RootCoord.
- func (c *Core) Start() error {
- var err error
- if !c.enableActiveStandBy {
- c.startOnce.Do(func() {
- err = c.startInternal()
- })
- }
- return err
- }
- func (c *Core) stopExecutor() {
- if c.stepExecutor != nil {
- c.stepExecutor.Stop()
- log.Info("stop rootcoord executor")
- }
- }
- func (c *Core) stopScheduler() {
- if c.scheduler != nil {
- c.scheduler.Stop()
- log.Info("stop rootcoord scheduler")
- }
- }
- func (c *Core) cancelIfNotNil() {
- if c.cancel != nil {
- c.cancel()
- log.Info("cancel rootcoord goroutines")
- }
- }
- func (c *Core) revokeSession() {
- if c.session != nil {
- // wait at most one second to revoke
- c.session.Stop()
- log.Info("rootcoord session stop")
- }
- }
- // Stop stops rootCoord.
- func (c *Core) Stop() error {
- c.UpdateStateCode(commonpb.StateCode_Abnormal)
- c.stopExecutor()
- c.stopScheduler()
- if c.proxyWatcher != nil {
- c.proxyWatcher.Stop()
- }
- if c.quotaCenter != nil {
- c.quotaCenter.stop()
- }
- c.revokeSession()
- c.cancelIfNotNil()
- c.wg.Wait()
- return nil
- }
- // GetComponentStates get states of components
- func (c *Core) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
- code := c.GetStateCode()
- log.Debug("RootCoord current state", zap.String("StateCode", code.String()))
- nodeID := common.NotRegisteredID
- if c.session != nil && c.session.Registered() {
- nodeID = c.session.ServerID
- }
- return &milvuspb.ComponentStates{
- State: &milvuspb.ComponentInfo{
- // NodeID: c.session.ServerID, // will race with Core.Register()
- NodeID: nodeID,
- Role: typeutil.RootCoordRole,
- StateCode: code,
- ExtraInfo: nil,
- },
- Status: merr.Success(),
- SubcomponentStates: []*milvuspb.ComponentInfo{
- {
- NodeID: nodeID,
- Role: typeutil.RootCoordRole,
- StateCode: code,
- ExtraInfo: nil,
- },
- },
- }, nil
- }
- // GetTimeTickChannel get timetick channel name
- func (c *Core) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
- return &milvuspb.StringResponse{
- Status: merr.Success(),
- Value: Params.CommonCfg.RootCoordTimeTick.GetValue(),
- }, nil
- }
- // GetStatisticsChannel get statistics channel name
- func (c *Core) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
- return &milvuspb.StringResponse{
- Status: merr.Success(),
- Value: Params.CommonCfg.RootCoordStatistics.GetValue(),
- }, nil
- }
- func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- method := "CreateDatabase"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("CreateDatabase")
- log.Ctx(ctx).Info("received request to create database", zap.String("role", typeutil.RootCoordRole),
- zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
- t := &createDatabaseTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to create database",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to create database",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("dbName", in.GetDbName()),
- zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole),
- zap.String("dbName", in.GetDbName()),
- zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- method := "DropDatabase"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DropDatabase")
- log.Ctx(ctx).Info("received request to drop database", zap.String("role", typeutil.RootCoordRole),
- zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
- t := &dropDatabaseTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to drop database", zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to drop database", zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("dbName", in.GetDbName()),
- zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.CleanupRootCoordDBMetrics(in.GetDbName())
- log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole),
- zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- func (c *Core) ListDatabases(ctx context.Context, in *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- ret := &milvuspb.ListDatabasesResponse{Status: merr.Status(err)}
- return ret, nil
- }
- method := "ListDatabases"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("ListDatabases")
- log := log.Ctx(ctx).With(zap.Int64("msgID", in.GetBase().GetMsgID()))
- log.Info("received request to list databases")
- t := &listDatabaseTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- Resp: &milvuspb.ListDatabasesResponse{},
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Info("failed to enqueue request to list databases", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return &milvuspb.ListDatabasesResponse{
- Status: merr.Status(err),
- }, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Info("failed to list databases", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return &milvuspb.ListDatabasesResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- log.Info("done to list databases", zap.Int("num of databases", len(t.Resp.GetDbNames())))
- return t.Resp, nil
- }
- // CreateCollection create collection
- func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("CreateCollection")
- log.Ctx(ctx).Info("received request to create collection",
- zap.String("dbName", in.GetDbName()),
- zap.String("name", in.GetCollectionName()),
- zap.String("role", typeutil.RootCoordRole))
- t := &createCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to create collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetCollectionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to create collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateCollection").Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to create collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // DropCollection drop collection
- func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DropCollection")
- log.Ctx(ctx).Info("received request to drop collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("dbName", in.GetDbName()),
- zap.String("name", in.GetCollectionName()))
- t := &dropCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to drop collection", zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetCollectionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to drop collection", zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropCollection").Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // HasCollection check collection existence
- func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.BoolResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("HasCollection")
- ts := getTravelTs(in)
- log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()),
- zap.Uint64("ts", ts))
- t := &hasCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- Rsp: &milvuspb.BoolResponse{},
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Info("failed to enqueue request to has collection", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
- return &milvuspb.BoolResponse{
- Status: merr.Status(err),
- }, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Info("failed to has collection", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
- return &milvuspb.BoolResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasCollection").Observe(float64(t.queueDur.Milliseconds()))
- return t.Rsp, nil
- }
- func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) {
- ts := getTravelTs(in)
- if in.GetCollectionName() != "" {
- return c.meta.GetCollectionByName(ctx, in.GetDbName(), in.GetCollectionName(), ts)
- }
- return c.meta.GetCollectionByID(ctx, in.GetDbName(), in.GetCollectionID(), ts, allowUnavailable)
- }
- func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName string) *milvuspb.DescribeCollectionResponse {
- resp := &milvuspb.DescribeCollectionResponse{
- Status: merr.Success(),
- DbName: dbName,
- }
- resp.Schema = &schemapb.CollectionSchema{
- Name: collInfo.Name,
- Description: collInfo.Description,
- AutoID: collInfo.AutoID,
- Fields: model.MarshalFieldModels(collInfo.Fields),
- Functions: model.MarshalFunctionModels(collInfo.Functions),
- EnableDynamicField: collInfo.EnableDynamicField,
- }
- resp.CollectionID = collInfo.CollectionID
- resp.VirtualChannelNames = collInfo.VirtualChannelNames
- resp.PhysicalChannelNames = collInfo.PhysicalChannelNames
- if collInfo.ShardsNum == 0 {
- collInfo.ShardsNum = int32(len(collInfo.VirtualChannelNames))
- }
- resp.ShardsNum = collInfo.ShardsNum
- resp.ConsistencyLevel = collInfo.ConsistencyLevel
- resp.CreatedTimestamp = collInfo.CreateTime
- createdPhysicalTime, _ := tsoutil.ParseHybridTs(collInfo.CreateTime)
- resp.CreatedUtcTimestamp = uint64(createdPhysicalTime)
- resp.Aliases = aliases
- resp.StartPositions = collInfo.StartPositions
- resp.CollectionName = resp.Schema.Name
- resp.Properties = collInfo.Properties
- resp.NumPartitions = int64(len(collInfo.Partitions))
- resp.DbId = collInfo.DBID
- return resp
- }
- func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.DescribeCollectionResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DescribeCollection")
- ts := getTravelTs(in)
- log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()),
- zap.String("dbName", in.GetDbName()),
- zap.Int64("id", in.GetCollectionID()),
- zap.Uint64("ts", ts),
- zap.Bool("allowUnavailable", allowUnavailable))
- t := &describeCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- Rsp: &milvuspb.DescribeCollectionResponse{Status: merr.Success()},
- allowUnavailable: allowUnavailable,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Info("failed to enqueue request to describe collection", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
- return &milvuspb.DescribeCollectionResponse{
- Status: merr.Status(err),
- }, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Warn("failed to describe collection", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
- return &milvuspb.DescribeCollectionResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DescribeCollection").Observe(float64(t.queueDur.Milliseconds()))
- return t.Rsp, nil
- }
- // DescribeCollection return collection info
- func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
- return c.describeCollectionImpl(ctx, in, false)
- }
- // DescribeCollectionInternal same to DescribeCollection, but will return unavailable collections and
- // only used in internal RPC.
- // When query cluster tried to do recovery, it'll be healthy until all collections' targets were recovered,
- // so during this time, releasing request generated by rootcoord's recovery won't succeed. So in theory, rootcoord goes
- // to be healthy, querycoord recovers all collections' targets, and then querycoord serves the releasing request sent
- // by rootcoord, eventually, the dropping collections will be released.
- func (c *Core) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
- return c.describeCollectionImpl(ctx, in, true)
- }
- // ShowCollections list all collection names
- func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.ShowCollectionsResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("ShowCollections")
- ts := getTravelTs(in)
- log := log.Ctx(ctx).With(zap.String("dbname", in.GetDbName()),
- zap.Uint64("ts", ts))
- t := &showCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- Rsp: &milvuspb.ShowCollectionsResponse{},
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Info("failed to enqueue request to show collections", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
- return &milvuspb.ShowCollectionsResponse{
- Status: merr.Status(err),
- }, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Info("failed to show collections", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
- return &milvuspb.ShowCollectionsResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowCollections").Observe(float64(t.queueDur.Milliseconds()))
- return t.Rsp, nil
- }
- func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("AlterCollection")
- log.Ctx(ctx).Info("received request to alter collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetCollectionName()),
- zap.Any("props", in.Properties))
- t := &alterCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Warn("failed to enqueue request to alter collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetCollectionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Warn("failed to alter collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.queueDur.Milliseconds()))
- log.Info("done to alter collection",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- method := "AlterDatabase"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- log.Ctx(ctx).Info("received request to alter database",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetDbName()),
- zap.Any("props", in.Properties))
- t := &alterDatabaseTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Warn("failed to enqueue request to alter database",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetDbName()),
- zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Warn("failed to alter database",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("name", in.GetDbName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to alter database",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("name", in.GetDbName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // CreatePartition create partition
- func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("CreatePartition")
- log.Ctx(ctx).Info("received request to create partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()))
- t := &createPartitionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to create partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to create partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreatePartition").Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to create partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // DropPartition drop partition
- func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DropPartition")
- log.Ctx(ctx).Info("received request to drop partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()))
- t := &dropPartitionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to drop partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to drop partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropPartition").Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to drop partition",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // HasPartition check partition existence
- func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.BoolResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("HasPartition")
- // TODO(longjiquan): why HasPartitionRequest doesn't contain Timestamp but other requests do.
- ts := typeutil.MaxTimestamp
- log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()),
- zap.String("partition", in.GetPartitionName()),
- zap.Uint64("ts", ts))
- t := &hasPartitionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- Rsp: &milvuspb.BoolResponse{},
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Info("failed to enqueue request to has partition", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
- return &milvuspb.BoolResponse{
- Status: merr.Status(err),
- }, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Info("failed to has partition", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
- return &milvuspb.BoolResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasPartition").Observe(float64(t.queueDur.Milliseconds()))
- return t.Rsp, nil
- }
- func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitionsRequest, allowUnavailable bool) (*milvuspb.ShowPartitionsResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.ShowPartitionsResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("ShowPartitions")
- log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()),
- zap.Int64("collection_id", in.GetCollectionID()),
- zap.Strings("partitions", in.GetPartitionNames()),
- zap.Bool("allowUnavailable", allowUnavailable))
- t := &showPartitionTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- Rsp: &milvuspb.ShowPartitionsResponse{},
- allowUnavailable: allowUnavailable,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Info("failed to enqueue request to show partitions", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
- return &milvuspb.ShowPartitionsResponse{
- Status: merr.Status(err),
- // Status: common.StatusFromError(err),
- }, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Info("failed to show partitions", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
- return &milvuspb.ShowPartitionsResponse{
- Status: merr.Status(err),
- // Status: common.StatusFromError(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowPartitions").Observe(float64(t.queueDur.Milliseconds()))
- return t.Rsp, nil
- }
- // ShowPartitions list all partition names
- func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
- return c.showPartitionsImpl(ctx, in, false)
- }
- // ShowPartitionsInternal same to ShowPartitions, only used in internal RPC.
- func (c *Core) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
- return c.showPartitionsImpl(ctx, in, true)
- }
- // ShowSegments list all segments
- func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
- // ShowSegments Only used in GetPersistentSegmentInfo, it's already deprecated for a long time.
- // Though we continue to keep current logic, it's not right enough since RootCoord only contains indexed segments.
- return &milvuspb.ShowSegmentsResponse{Status: merr.Success()}, nil
- }
- // GetPChannelInfo get pchannel info.
- func (c *Core) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &rootcoordpb.GetPChannelInfoResponse{
- Status: merr.Status(err),
- }, nil
- }
- return c.meta.GetPChannelInfo(in.GetPchannel()), nil
- }
- // AllocTimestamp alloc timestamp
- func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &rootcoordpb.AllocTimestampResponse{
- Status: merr.Status(err),
- }, nil
- }
- ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
- if err != nil {
- log.Ctx(ctx).Error("failed to allocate timestamp", zap.String("role", typeutil.RootCoordRole),
- zap.Error(err))
- return &rootcoordpb.AllocTimestampResponse{
- Status: merr.Status(err),
- }, nil
- }
- // return first available timestamp
- ts = ts - uint64(in.GetCount()) + 1
- metrics.RootCoordTimestamp.Set(float64(ts))
- return &rootcoordpb.AllocTimestampResponse{
- Status: merr.Success(),
- Timestamp: ts,
- Count: in.GetCount(),
- }, nil
- }
- // AllocID alloc ids
- func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &rootcoordpb.AllocIDResponse{
- Status: merr.Status(err),
- }, nil
- }
- start, _, err := c.idAllocator.Alloc(in.Count)
- if err != nil {
- log.Ctx(ctx).Error("failed to allocate id",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err))
- return &rootcoordpb.AllocIDResponse{
- Status: merr.Status(err),
- Count: in.Count,
- }, nil
- }
- metrics.RootCoordIDAllocCounter.Add(float64(in.Count))
- return &rootcoordpb.AllocIDResponse{
- Status: merr.Success(),
- ID: start,
- Count: in.Count,
- }, nil
- }
- // UpdateChannelTimeTick used to handle ChannelTimeTickMsg
- func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
- log := log.Ctx(ctx)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Error(err))
- return merr.Status(err), nil
- }
- if in.Base.MsgType != commonpb.MsgType_TimeTick {
- log.Warn("failed to updateTimeTick because base messasge is not timetick, state", zap.Any("base message type", in.Base.MsgType))
- return merr.Status(merr.WrapErrParameterInvalid(commonpb.MsgType_TimeTick.String(), in.Base.MsgType.String(), "invalid message type")), nil
- }
- err := c.chanTimeTick.updateTimeTick(in, "gRPC")
- if err != nil {
- log.Warn("failed to updateTimeTick",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err))
- return merr.Status(err), nil
- }
- return merr.Success(), nil
- }
- // InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
- func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in)
- if err != nil {
- return merr.Status(err), nil
- }
- return merr.Success(), nil
- }
- // ShowConfigurations returns the configurations of RootCoord matching req.Pattern
- func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &internalpb.ShowConfigurationsResponse{
- Status: merr.Status(err),
- Configuations: nil,
- }, nil
- }
- configList := make([]*commonpb.KeyValuePair, 0)
- for key, value := range Params.GetComponentConfigurations("rootcoord", req.Pattern) {
- configList = append(configList,
- &commonpb.KeyValuePair{
- Key: key,
- Value: value,
- })
- }
- return &internalpb.ShowConfigurationsResponse{
- Status: merr.Success(),
- Configuations: configList,
- }, nil
- }
- // GetMetrics get metrics
- func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
- if err := merr.CheckHealthyStandby(c.GetStateCode()); err != nil {
- return &milvuspb.GetMetricsResponse{
- Status: merr.Status(err),
- Response: "",
- }, nil
- }
- metricType, err := metricsinfo.ParseMetricType(in.Request)
- if err != nil {
- log.Warn("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole),
- zap.Int64("nodeID", c.session.ServerID), zap.String("req", in.Request), zap.Error(err))
- return &milvuspb.GetMetricsResponse{
- Status: merr.Status(err),
- Response: "",
- }, nil
- }
- if metricType == metricsinfo.SystemInfoMetrics {
- metrics, err := c.metricsCacheManager.GetSystemInfoMetrics()
- if err != nil {
- metrics, err = c.getSystemInfoMetrics(ctx, in)
- }
- if err != nil {
- log.Warn("GetSystemInfoMetrics failed",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("metricType", metricType),
- zap.Error(err))
- return &milvuspb.GetMetricsResponse{
- Status: merr.Status(err),
- Response: "",
- }, nil
- }
- c.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
- return metrics, err
- }
- log.RatedWarn(60, "GetMetrics failed, metric type not implemented", zap.String("role", typeutil.RootCoordRole),
- zap.String("metricType", metricType))
- return &milvuspb.GetMetricsResponse{
- Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
- Response: "",
- }, nil
- }
- // CreateAlias create collection alias
- func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("CreateAlias")
- log.Ctx(ctx).Info("received request to create alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()))
- t := &createAliasTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to create alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to create alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateAlias").Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to create alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // DropAlias drop collection alias
- func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DropAlias")
- log.Ctx(ctx).Info("received request to drop alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("alias", in.GetAlias()))
- t := &dropAliasTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to drop alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("alias", in.GetAlias()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to drop alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("alias", in.GetAlias()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropAlias").Observe(float64(t.queueDur.Milliseconds()))
- log.Ctx(ctx).Info("done to drop alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("alias", in.GetAlias()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // AlterAlias alter collection alias
- func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("AlterAlias")
- log.Ctx(ctx).Info("received request to alter alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()))
- t := &alterAliasTask{
- baseTask: newBaseTask(ctx, c),
- Req: in,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Ctx(ctx).Info("failed to enqueue request to alter alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Ctx(ctx).Info("failed to alter alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.Error(err),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterAlias").Observe(float64(t.queueDur.Milliseconds()))
- log.Info("done to alter alias",
- zap.String("role", typeutil.RootCoordRole),
- zap.String("alias", in.GetAlias()),
- zap.String("collection", in.GetCollectionName()),
- zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- // DescribeAlias describe collection alias
- func (c *Core) DescribeAlias(ctx context.Context, in *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.DescribeAliasResponse{
- Status: merr.Status(err),
- }, nil
- }
- log := log.Ctx(ctx).With(
- zap.String("role", typeutil.RootCoordRole),
- zap.String("db", in.GetDbName()),
- zap.String("alias", in.GetAlias()))
- method := "DescribeAlias"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DescribeAlias")
- log.Info("received request to describe alias")
- if in.GetAlias() == "" {
- return &milvuspb.DescribeAliasResponse{
- Status: merr.Status(merr.WrapErrParameterMissing("alias", "no input alias")),
- }, nil
- }
- collectionName, err := c.meta.DescribeAlias(ctx, in.GetDbName(), in.GetAlias(), 0)
- if err != nil {
- log.Warn("fail to DescribeAlias", zap.Error(err))
- return &milvuspb.DescribeAliasResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- log.Info("done to describe alias")
- return &milvuspb.DescribeAliasResponse{
- Status: merr.Status(nil),
- DbName: in.GetDbName(),
- Alias: in.GetAlias(),
- Collection: collectionName,
- }, nil
- }
- // ListAliases list aliases
- func (c *Core) ListAliases(ctx context.Context, in *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.ListAliasesResponse{
- Status: merr.Status(err),
- }, nil
- }
- method := "ListAliases"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- log := log.Ctx(ctx).With(
- zap.String("role", typeutil.RootCoordRole),
- zap.String("db", in.GetDbName()),
- zap.String("collectionName", in.GetCollectionName()))
- log.Info("received request to list aliases")
- aliases, err := c.meta.ListAliases(ctx, in.GetDbName(), in.GetCollectionName(), 0)
- if err != nil {
- log.Warn("fail to ListAliases", zap.Error(err))
- return &milvuspb.ListAliasesResponse{
- Status: merr.Status(err),
- }, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- log.Info("done to list aliases")
- return &milvuspb.ListAliasesResponse{
- Status: merr.Status(nil),
- DbName: in.GetDbName(),
- CollectionName: in.GetCollectionName(),
- Aliases: aliases,
- }, nil
- }
- // ExpireCredCache will call invalidate credential cache
- func (c *Core) ExpireCredCache(ctx context.Context, username string) error {
- req := proxypb.InvalidateCredCacheRequest{
- Base: commonpbutil.NewMsgBase(
- commonpbutil.WithSourceID(c.session.ServerID),
- ),
- Username: username,
- }
- return c.proxyClientManager.InvalidateCredentialCache(ctx, &req)
- }
- // UpdateCredCache will call update credential cache
- func (c *Core) UpdateCredCache(ctx context.Context, credInfo *internalpb.CredentialInfo) error {
- req := proxypb.UpdateCredCacheRequest{
- Base: commonpbutil.NewMsgBase(
- commonpbutil.WithSourceID(c.session.ServerID),
- ),
- Username: credInfo.Username,
- Password: credInfo.Sha256Password,
- }
- return c.proxyClientManager.UpdateCredentialCache(ctx, &req)
- }
- // CreateCredential create new user and password
- // 1. decode ciphertext password to raw password
- // 2. encrypt raw password
- // 3. save in to etcd
- func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
- method := "CreateCredential"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- // insert to db
- err := c.meta.AddCredential(credInfo)
- if err != nil {
- ctxLog.Warn("CreateCredential save credential failed", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateCredentialFailure), nil
- }
- // update proxy's local cache
- err = c.UpdateCredCache(ctx, credInfo)
- if err != nil {
- ctxLog.Warn("CreateCredential add cache failed", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- }
- log.Debug("CreateCredential success", zap.String("role", typeutil.RootCoordRole),
- zap.String("username", credInfo.Username))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordNumOfCredentials.Inc()
- return merr.Success(), nil
- }
- // GetCredential get credential by username
- func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
- method := "GetCredential"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &rootcoordpb.GetCredentialResponse{Status: merr.Status(err)}, nil
- }
- credInfo, err := c.meta.GetCredential(in.Username)
- if err != nil {
- ctxLog.Warn("GetCredential query credential failed", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return &rootcoordpb.GetCredentialResponse{
- Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_GetCredentialFailure),
- }, nil
- }
- ctxLog.Debug("GetCredential success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &rootcoordpb.GetCredentialResponse{
- Status: merr.Success(),
- Username: credInfo.Username,
- Password: credInfo.EncryptedPassword,
- }, nil
- }
- // UpdateCredential update password for a user
- func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
- method := "UpdateCredential"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- // update data on storage
- err := c.meta.AlterCredential(credInfo)
- if err != nil {
- ctxLog.Warn("UpdateCredential save credential failed", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
- }
- // update proxy's local cache
- err = c.UpdateCredCache(ctx, credInfo)
- if err != nil {
- ctxLog.Warn("UpdateCredential update cache failed", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
- }
- log.Debug("UpdateCredential success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return merr.Success(), nil
- }
- // DeleteCredential delete a user
- func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
- method := "DeleteCredential"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- var status *commonpb.Status
- defer func() {
- if status.Code != 0 {
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- }
- }()
- redoTask := newBaseRedoTask(c.stepExecutor)
- redoTask.AddSyncStep(NewSimpleStep("delete credential meta data", func(ctx context.Context) ([]nestedStep, error) {
- err := c.meta.DeleteCredential(in.Username)
- if err != nil {
- ctxLog.Warn("delete credential meta data failed", zap.Error(err))
- }
- return nil, err
- }))
- redoTask.AddAsyncStep(NewSimpleStep("delete credential cache", func(ctx context.Context) ([]nestedStep, error) {
- err := c.ExpireCredCache(ctx, in.Username)
- if err != nil {
- ctxLog.Warn("delete credential cache failed", zap.Error(err))
- }
- return nil, err
- }))
- redoTask.AddAsyncStep(NewSimpleStep("delete user role cache for the user", func(ctx context.Context) ([]nestedStep, error) {
- err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
- OpType: int32(typeutil.CacheDeleteUser),
- OpKey: in.Username,
- })
- if err != nil {
- ctxLog.Warn("delete user role cache failed for the user", zap.Error(err))
- }
- return nil, err
- }))
- err := redoTask.Execute(ctx)
- if err != nil {
- errMsg := "fail to execute task when deleting the user"
- ctxLog.Warn(errMsg, zap.Error(err))
- status = merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DeleteCredentialFailure)
- return status, nil
- }
- ctxLog.Debug("DeleteCredential success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordNumOfCredentials.Dec()
- status = merr.Success()
- return status, nil
- }
- // ListCredUsers list all usernames
- func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
- method := "ListCredUsers"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.ListCredUsersResponse{Status: merr.Status(err)}, nil
- }
- credInfo, err := c.meta.ListCredentialUsernames()
- if err != nil {
- ctxLog.Warn("ListCredUsers query usernames failed", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
- status := merr.Status(err)
- return &milvuspb.ListCredUsersResponse{Status: status}, nil
- }
- ctxLog.Debug("ListCredUsers success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &milvuspb.ListCredUsersResponse{
- Status: merr.Success(),
- Usernames: credInfo.Usernames,
- }, nil
- }
- // CreateRole create role
- // - check the node health
- // - check if the role is existed
- // - check if the role num has reached the limit
- // - create the role by the meta api
- func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
- method := "CreateRole"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method + " begin")
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- entity := in.Entity
- err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name})
- if err != nil {
- errMsg := "fail to create role"
- ctxLog.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateRoleFailure), nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordNumOfRoles.Inc()
- return merr.Success(), nil
- }
- // DropRole drop role
- // - check the node health
- // - check if the role name is existed
- // - check if the role has some grant info
- // - get all role mapping of this role
- // - drop these role mappings
- // - drop the role by the meta api
- func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
- method := "DropRole"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("role_name", in.RoleName))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- for util.IsBuiltinRole(in.GetRoleName()) {
- err := merr.WrapErrPrivilegeNotPermitted("the role[%s] is a builtin role, which can't be dropped", in.GetRoleName())
- return merr.Status(err), nil
- }
- if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil {
- errMsg := "not found the role, maybe the role isn't existed or internal system error"
- ctxLog.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
- }
- if !in.ForceDrop {
- grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, &milvuspb.GrantEntity{
- Role: &milvuspb.RoleEntity{Name: in.RoleName},
- })
- if len(grantEntities) != 0 {
- errMsg := "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges"
- ctxLog.Warn(errMsg, zap.Any("grants", grantEntities), zap.Error(err))
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
- }
- }
- redoTask := newBaseRedoTask(c.stepExecutor)
- redoTask.AddSyncStep(NewSimpleStep("drop role meta data", func(ctx context.Context) ([]nestedStep, error) {
- err := c.meta.DropRole(util.DefaultTenant, in.RoleName)
- if err != nil {
- ctxLog.Warn("drop role mata data failed", zap.Error(err))
- }
- return nil, err
- }))
- redoTask.AddAsyncStep(NewSimpleStep("drop the privilege list of this role", func(ctx context.Context) ([]nestedStep, error) {
- if !in.ForceDrop {
- return nil, nil
- }
- err := c.meta.DropGrant(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName})
- if err != nil {
- ctxLog.Warn("drop the privilege list failed for the role", zap.Error(err))
- }
- return nil, err
- }))
- redoTask.AddAsyncStep(NewSimpleStep("drop role cache", func(ctx context.Context) ([]nestedStep, error) {
- err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
- OpType: int32(typeutil.CacheDropRole),
- OpKey: in.RoleName,
- })
- if err != nil {
- ctxLog.Warn("delete user role cache failed for the role", zap.Error(err))
- }
- return nil, err
- }))
- err := redoTask.Execute(ctx)
- if err != nil {
- errMsg := "fail to execute task when dropping the role"
- ctxLog.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
- }
- ctxLog.Debug(method+" success", zap.String("role_name", in.RoleName))
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.RootCoordNumOfRoles.Dec()
- return merr.Success(), nil
- }
- // OperateUserRole operate the relationship between a user and a role
- // - check the node health
- // - check if the role is valid
- // - check if the user is valid
- // - operate the user-role by the meta api
- // - update the policy cache
- func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
- method := "OperateUserRole-" + in.Type.String()
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil {
- errMsg := "not found the role, maybe the role isn't existed or internal system error"
- ctxLog.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
- }
- if in.Type != milvuspb.OperateUserRoleType_RemoveUserFromRole {
- if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, false); err != nil {
- errMsg := "not found the user, maybe the user isn't existed or internal system error"
- ctxLog.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
- }
- }
- redoTask := newBaseRedoTask(c.stepExecutor)
- redoTask.AddSyncStep(NewSimpleStep("operate user role meta data", func(ctx context.Context) ([]nestedStep, error) {
- err := c.meta.OperateUserRole(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, &milvuspb.RoleEntity{Name: in.RoleName}, in.Type)
- if err != nil && !common.IsIgnorableError(err) {
- log.Warn("operate user role mata data failed", zap.Error(err))
- return nil, err
- }
- return nil, nil
- }))
- redoTask.AddAsyncStep(NewSimpleStep("operate user role cache", func(ctx context.Context) ([]nestedStep, error) {
- var opType int32
- switch in.Type {
- case milvuspb.OperateUserRoleType_AddUserToRole:
- opType = int32(typeutil.CacheAddUserToRole)
- case milvuspb.OperateUserRoleType_RemoveUserFromRole:
- opType = int32(typeutil.CacheRemoveUserFromRole)
- default:
- errMsg := "invalid operate type for the OperateUserRole api"
- log.Warn(errMsg, zap.Any("in", in))
- return nil, nil
- }
- if err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
- OpType: opType,
- OpKey: funcutil.EncodeUserRoleCache(in.Username, in.RoleName),
- }); err != nil {
- log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
- return nil, err
- }
- return nil, nil
- }))
- err := redoTask.Execute(ctx)
- if err != nil {
- errMsg := "fail to execute task when operate the user and role"
- log.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return merr.Success(), nil
- }
- // SelectRole select role
- // - check the node health
- // - check if the role is valid when this param is provided
- // - select role by the meta api
- func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
- method := "SelectRole"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.SelectRoleResponse{Status: merr.Status(err)}, nil
- }
- if in.Role != nil {
- if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, false); err != nil {
- if errors.Is(err, merr.ErrIoKeyNotFound) {
- return &milvuspb.SelectRoleResponse{
- Status: merr.Success(),
- }, nil
- }
- errMsg := "fail to select the role to check the role name"
- ctxLog.Warn(errMsg, zap.Error(err))
- return &milvuspb.SelectRoleResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
- }, nil
- }
- }
- roleResults, err := c.meta.SelectRole(util.DefaultTenant, in.Role, in.IncludeUserInfo)
- if err != nil {
- errMsg := "fail to select the role"
- ctxLog.Warn(errMsg, zap.Error(err))
- return &milvuspb.SelectRoleResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
- }, nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &milvuspb.SelectRoleResponse{
- Status: merr.Success(),
- Results: roleResults,
- }, nil
- }
- // SelectUser select user
- // - check the node health
- // - check if the user is valid when this param is provided
- // - select user by the meta api
- func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
- method := "SelectUser"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.SelectUserResponse{Status: merr.Status(err)}, nil
- }
- if in.User != nil {
- if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil {
- if errors.Is(err, merr.ErrIoKeyNotFound) {
- return &milvuspb.SelectUserResponse{
- Status: merr.Success(),
- }, nil
- }
- errMsg := "fail to select the user to check the username"
- ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err))
- return &milvuspb.SelectUserResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
- }, nil
- }
- }
- userResults, err := c.meta.SelectUser(util.DefaultTenant, in.User, in.IncludeRoleInfo)
- if err != nil {
- errMsg := "fail to select the user"
- ctxLog.Warn(errMsg, zap.Error(err))
- return &milvuspb.SelectUserResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
- }, nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &milvuspb.SelectUserResponse{
- Status: merr.Success(),
- Results: userResults,
- }, nil
- }
- func (c *Core) isValidRole(entity *milvuspb.RoleEntity) error {
- if entity == nil {
- return errors.New("the role entity is nil")
- }
- if entity.Name == "" {
- return errors.New("the name in the role entity is empty")
- }
- if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}, false); err != nil {
- log.Warn("fail to select the role", zap.String("role_name", entity.Name), zap.Error(err))
- return errors.New("not found the role, maybe the role isn't existed or internal system error")
- }
- return nil
- }
- func (c *Core) isValidObject(entity *milvuspb.ObjectEntity) error {
- if entity == nil {
- return errors.New("the object entity is nil")
- }
- if _, ok := commonpb.ObjectType_value[entity.Name]; !ok {
- return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", entity.Name, lo.Keys(commonpb.ObjectType_value))
- }
- return nil
- }
- func (c *Core) isValidGrantor(entity *milvuspb.GrantorEntity, object string) error {
- if entity == nil {
- return errors.New("the grantor entity is nil")
- }
- if entity.User == nil {
- return errors.New("the user entity in the grantor entity is nil")
- }
- if entity.User.Name == "" {
- return errors.New("the name in the user entity of the grantor entity is empty")
- }
- if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: entity.User.Name}, false); err != nil {
- log.Warn("fail to select the user", zap.String("username", entity.User.Name), zap.Error(err))
- return errors.New("not found the user, maybe the user isn't existed or internal system error")
- }
- if entity.Privilege == nil {
- return errors.New("the privilege entity in the grantor entity is nil")
- }
- if util.IsAnyWord(entity.Privilege.Name) {
- return nil
- }
- if privilegeName := util.PrivilegeNameForMetastore(entity.Privilege.Name); privilegeName == "" {
- return fmt.Errorf("not found the privilege name[%s]", entity.Privilege.Name)
- }
- privileges, ok := util.ObjectPrivileges[object]
- if !ok {
- return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", object, lo.Keys(commonpb.ObjectType_value))
- }
- for _, privilege := range privileges {
- if privilege == entity.Privilege.Name {
- return nil
- }
- }
- return fmt.Errorf("not found the privilege name[%s] in object[%s]", entity.Privilege.Name, object)
- }
- // OperatePrivilege operate the privilege, including grant and revoke
- // - check the node health
- // - check if the operating type is valid
- // - check if the entity is nil
- // - check if the params, including the resource entity, the principal entity, the grantor entity, is valid
- // - operate the privilege by the meta api
- // - update the policy cache
- func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
- method := "OperatePrivilege"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- if in.Type != milvuspb.OperatePrivilegeType_Grant && in.Type != milvuspb.OperatePrivilegeType_Revoke {
- errMsg := fmt.Sprintf("invalid operate privilege type, current type: %s, valid value: [%s, %s]", in.Type, milvuspb.OperatePrivilegeType_Grant, milvuspb.OperatePrivilegeType_Revoke)
- ctxLog.Warn(errMsg)
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- if in.Entity == nil {
- errMsg := "the grant entity in the request is nil"
- ctxLog.Error(errMsg)
- return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- if err := c.isValidObject(in.Entity.Object); err != nil {
- ctxLog.Warn("", zap.Error(err))
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- if err := c.isValidRole(in.Entity.Role); err != nil {
- ctxLog.Warn("", zap.Error(err))
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- if err := c.isValidGrantor(in.Entity.Grantor, in.Entity.Object.Name); err != nil {
- ctxLog.Error("", zap.Error(err))
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- ctxLog.Debug("before PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
- if !util.IsAnyWord(in.Entity.Grantor.Privilege.Name) {
- in.Entity.Grantor.Privilege.Name = util.PrivilegeNameForMetastore(in.Entity.Grantor.Privilege.Name)
- }
- ctxLog.Debug("after PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
- if in.Entity.Object.Name == commonpb.ObjectType_Global.String() {
- in.Entity.ObjectName = util.AnyWord
- }
- redoTask := newBaseRedoTask(c.stepExecutor)
- redoTask.AddSyncStep(NewSimpleStep("operate privilege meta data", func(ctx context.Context) ([]nestedStep, error) {
- err := c.meta.OperatePrivilege(util.DefaultTenant, in.Entity, in.Type)
- if err != nil && !common.IsIgnorableError(err) {
- log.Warn("fail to operate the privilege", zap.Any("in", in), zap.Error(err))
- return nil, err
- }
- return nil, nil
- }))
- redoTask.AddAsyncStep(NewSimpleStep("operate privilege cache", func(ctx context.Context) ([]nestedStep, error) {
- var opType int32
- switch in.Type {
- case milvuspb.OperatePrivilegeType_Grant:
- opType = int32(typeutil.CacheGrantPrivilege)
- case milvuspb.OperatePrivilegeType_Revoke:
- opType = int32(typeutil.CacheRevokePrivilege)
- default:
- log.Warn("invalid operate type for the OperatePrivilege api", zap.Any("in", in))
- return nil, nil
- }
- if err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
- OpType: opType,
- OpKey: funcutil.PolicyForPrivilege(in.Entity.Role.Name, in.Entity.Object.Name, in.Entity.ObjectName, in.Entity.Grantor.Privilege.Name, in.Entity.DbName),
- }); err != nil {
- log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
- return nil, err
- }
- return nil, nil
- }))
- err := redoTask.Execute(ctx)
- if err != nil {
- errMsg := "fail to execute task when operating the privilege"
- log.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return merr.Success(), nil
- }
- // SelectGrant select grant
- // - check the node health
- // - check if the principal entity is valid
- // - check if the resource entity which is provided by the user is valid
- // - select grant by the meta api
- func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
- method := "SelectGrant"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.SelectGrantResponse{
- Status: merr.Status(err),
- }, nil
- }
- if in.Entity == nil {
- errMsg := "the grant entity in the request is nil"
- ctxLog.Warn(errMsg)
- return &milvuspb.SelectGrantResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure),
- }, nil
- }
- if err := c.isValidRole(in.Entity.Role); err != nil {
- ctxLog.Warn("", zap.Error(err))
- return &milvuspb.SelectGrantResponse{
- Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure),
- }, nil
- }
- if in.Entity.Object != nil {
- if err := c.isValidObject(in.Entity.Object); err != nil {
- ctxLog.Warn("", zap.Error(err))
- return &milvuspb.SelectGrantResponse{
- Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure),
- }, nil
- }
- }
- grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, in.Entity)
- if errors.Is(err, merr.ErrIoKeyNotFound) {
- return &milvuspb.SelectGrantResponse{
- Status: merr.Success(),
- Entities: grantEntities,
- }, nil
- }
- if err != nil {
- errMsg := "fail to select the grant"
- ctxLog.Warn(errMsg, zap.Error(err))
- return &milvuspb.SelectGrantResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure),
- }, nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &milvuspb.SelectGrantResponse{
- Status: merr.Success(),
- Entities: grantEntities,
- }, nil
- }
- func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
- method := "PolicyList"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &internalpb.ListPolicyResponse{
- Status: merr.Status(err),
- }, nil
- }
- policies, err := c.meta.ListPolicy(util.DefaultTenant)
- if err != nil {
- errMsg := "fail to list policy"
- ctxLog.Warn(errMsg, zap.Error(err))
- return &internalpb.ListPolicyResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure),
- }, nil
- }
- userRoles, err := c.meta.ListUserRole(util.DefaultTenant)
- if err != nil {
- errMsg := "fail to list user-role"
- ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err))
- return &internalpb.ListPolicyResponse{
- Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure),
- }, nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &internalpb.ListPolicyResponse{
- Status: merr.Success(),
- PolicyInfos: policies,
- UserRoles: userRoles,
- }, nil
- }
- func (c *Core) BackupRBAC(ctx context.Context, in *milvuspb.BackupRBACMetaRequest) (*milvuspb.BackupRBACMetaResponse, error) {
- method := "BackupRBAC"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.BackupRBACMetaResponse{
- Status: merr.Status(err),
- }, nil
- }
- rbacMeta, err := c.meta.BackupRBAC(ctx, util.DefaultTenant)
- if err != nil {
- return &milvuspb.BackupRBACMetaResponse{
- Status: merr.Status(err),
- }, nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return &milvuspb.BackupRBACMetaResponse{
- Status: merr.Success(),
- RBACMeta: rbacMeta,
- }, nil
- }
- func (c *Core) RestoreRBAC(ctx context.Context, in *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) {
- method := "RestoreRBAC"
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder(method)
- ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
- ctxLog.Debug(method)
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- redoTask := newBaseRedoTask(c.stepExecutor)
- redoTask.AddSyncStep(NewSimpleStep("restore rbac meta data", func(ctx context.Context) ([]nestedStep, error) {
- if err := c.meta.RestoreRBAC(ctx, util.DefaultTenant, in.RBACMeta); err != nil {
- log.Warn("fail to restore rbac meta data", zap.Any("in", in), zap.Error(err))
- return nil, err
- }
- return nil, nil
- }))
- redoTask.AddAsyncStep(NewSimpleStep("operate privilege cache", func(ctx context.Context) ([]nestedStep, error) {
- if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
- OpType: int32(typeutil.CacheRefresh),
- }); err != nil {
- log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
- return nil, err
- }
- return nil, nil
- }))
- err := redoTask.Execute(ctx)
- if err != nil {
- errMsg := "fail to execute task when restore rbac meta data"
- log.Warn(errMsg, zap.Error(err))
- return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
- }
- ctxLog.Debug(method + " success")
- metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
- return merr.Success(), nil
- }
- func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return merr.Status(err), nil
- }
- log := log.Ctx(ctx).With(zap.String("oldCollectionName", req.GetOldName()), zap.String("newCollectionName", req.GetNewName()))
- log.Info("received request to rename collection")
- metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("RenameCollection")
- t := &renameCollectionTask{
- baseTask: newBaseTask(ctx, c),
- Req: req,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Warn("failed to enqueue request to rename collection", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Warn("failed to rename collection", zap.Uint64("ts", t.GetTs()), zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc()
- return merr.Status(err), nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("RenameCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
- log.Info("done to rename collection", zap.Uint64("ts", t.GetTs()))
- return merr.Success(), nil
- }
- func (c *Core) DescribeDatabase(ctx context.Context, req *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil
- }
- log := log.Ctx(ctx).With(zap.String("dbName", req.GetDbName()))
- log.Info("received request to describe database ")
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.TotalLabel).Inc()
- tr := timerecord.NewTimeRecorder("DescribeDatabase")
- t := &describeDBTask{
- baseTask: newBaseTask(ctx, c),
- Req: req,
- }
- if err := c.scheduler.AddTask(t); err != nil {
- log.Warn("failed to enqueue request to describe database", zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.FailLabel).Inc()
- return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil
- }
- if err := t.WaitToFinish(); err != nil {
- log.Warn("failed to describe database", zap.Uint64("ts", t.GetTs()), zap.Error(err))
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.FailLabel).Inc()
- return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil
- }
- metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.SuccessLabel).Inc()
- metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeDatabase").Observe(float64(tr.ElapseSpan().Milliseconds()))
- log.Info("done to describe database", zap.Uint64("ts", t.GetTs()))
- return t.Rsp, nil
- }
- func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
- if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
- return &milvuspb.CheckHealthResponse{
- Status: merr.Status(err),
- IsHealthy: false,
- Reasons: []string{fmt.Sprintf("serverID=%d: %v", c.session.ServerID, err)},
- }, nil
- }
- group, ctx := errgroup.WithContext(ctx)
- errs := typeutil.NewConcurrentSet[error]()
- proxyClients := c.proxyClientManager.GetProxyClients()
- proxyClients.Range(func(key int64, value types.ProxyClient) bool {
- nodeID := key
- proxyClient := value
- group.Go(func() error {
- sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
- if err != nil {
- errs.Insert(err)
- return err
- }
- err = merr.AnalyzeState("Proxy", nodeID, sta)
- if err != nil {
- errs.Insert(err)
- }
- return err
- })
- return true
- })
- maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
- if maxDelay > 0 {
- group.Go(func() error {
- err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay)
- if err != nil {
- errs.Insert(err)
- }
- return err
- })
- }
- err := group.Wait()
- if err != nil {
- return &milvuspb.CheckHealthResponse{
- Status: merr.Success(),
- IsHealthy: false,
- Reasons: lo.Map(errs.Collect(), func(e error, i int) string {
- return err.Error()
- }),
- }, nil
- }
- return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
- }
|