root_coord.go 104 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package rootcoord
  17. import (
  18. "context"
  19. "fmt"
  20. "math/rand"
  21. "os"
  22. "sync"
  23. "time"
  24. "github.com/cockroachdb/errors"
  25. "github.com/samber/lo"
  26. "github.com/tikv/client-go/v2/txnkv"
  27. clientv3 "go.etcd.io/etcd/client/v3"
  28. "go.uber.org/atomic"
  29. "go.uber.org/zap"
  30. "golang.org/x/sync/errgroup"
  31. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  32. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  33. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  34. "github.com/milvus-io/milvus/internal/allocator"
  35. etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
  36. "github.com/milvus-io/milvus/internal/kv/tikv"
  37. "github.com/milvus-io/milvus/internal/metastore"
  38. kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
  39. "github.com/milvus-io/milvus/internal/metastore/model"
  40. pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
  41. "github.com/milvus-io/milvus/internal/proto/internalpb"
  42. "github.com/milvus-io/milvus/internal/proto/proxypb"
  43. "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
  44. tso2 "github.com/milvus-io/milvus/internal/tso"
  45. "github.com/milvus-io/milvus/internal/types"
  46. "github.com/milvus-io/milvus/internal/util/dependency"
  47. "github.com/milvus-io/milvus/internal/util/proxyutil"
  48. "github.com/milvus-io/milvus/internal/util/sessionutil"
  49. "github.com/milvus-io/milvus/internal/util/streamingutil"
  50. tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil"
  51. "github.com/milvus-io/milvus/pkg/common"
  52. "github.com/milvus-io/milvus/pkg/kv"
  53. "github.com/milvus-io/milvus/pkg/log"
  54. "github.com/milvus-io/milvus/pkg/metrics"
  55. "github.com/milvus-io/milvus/pkg/util"
  56. "github.com/milvus-io/milvus/pkg/util/commonpbutil"
  57. "github.com/milvus-io/milvus/pkg/util/crypto"
  58. "github.com/milvus-io/milvus/pkg/util/expr"
  59. "github.com/milvus-io/milvus/pkg/util/funcutil"
  60. "github.com/milvus-io/milvus/pkg/util/logutil"
  61. "github.com/milvus-io/milvus/pkg/util/merr"
  62. "github.com/milvus-io/milvus/pkg/util/metricsinfo"
  63. "github.com/milvus-io/milvus/pkg/util/paramtable"
  64. "github.com/milvus-io/milvus/pkg/util/retry"
  65. "github.com/milvus-io/milvus/pkg/util/timerecord"
  66. "github.com/milvus-io/milvus/pkg/util/tsoutil"
  67. "github.com/milvus-io/milvus/pkg/util/typeutil"
  68. )
  69. // UniqueID is an alias of typeutil.UniqueID.
  70. type UniqueID = typeutil.UniqueID
  71. // Timestamp is an alias of typeutil.Timestamp
  72. type Timestamp = typeutil.Timestamp
  73. const InvalidCollectionID = UniqueID(0)
  74. var Params *paramtable.ComponentParam = paramtable.Get()
  75. type Opt func(*Core)
  76. type metaKVCreator func() (kv.MetaKv, error)
  77. // Core root coordinator core
  78. type Core struct {
  79. ctx context.Context
  80. cancel context.CancelFunc
  81. wg sync.WaitGroup
  82. etcdCli *clientv3.Client
  83. tikvCli *txnkv.Client
  84. address string
  85. meta IMetaTable
  86. scheduler IScheduler
  87. broker Broker
  88. ddlTsLockManager DdlTsLockManager
  89. garbageCollector GarbageCollector
  90. stepExecutor StepExecutor
  91. metaKVCreator metaKVCreator
  92. proxyCreator proxyutil.ProxyCreator
  93. proxyWatcher *proxyutil.ProxyWatcher
  94. proxyClientManager proxyutil.ProxyClientManagerInterface
  95. metricsCacheManager *metricsinfo.MetricsCacheManager
  96. chanTimeTick *timetickSync
  97. idAllocator allocator.Interface
  98. tsoAllocator tso2.Allocator
  99. dataCoord types.DataCoordClient
  100. queryCoord types.QueryCoordClient
  101. quotaCenter *QuotaCenter
  102. stateCode atomic.Int32
  103. initOnce sync.Once
  104. startOnce sync.Once
  105. session *sessionutil.Session
  106. factory dependency.Factory
  107. enableActiveStandBy bool
  108. activateFunc func() error
  109. }
  110. // --------------------- function --------------------------
  111. // NewCore creates a new rootcoord core
  112. func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
  113. ctx, cancel := context.WithCancel(c)
  114. rand.Seed(time.Now().UnixNano())
  115. core := &Core{
  116. ctx: ctx,
  117. cancel: cancel,
  118. factory: factory,
  119. enableActiveStandBy: Params.RootCoordCfg.EnableActiveStandby.GetAsBool(),
  120. }
  121. core.UpdateStateCode(commonpb.StateCode_Abnormal)
  122. core.SetProxyCreator(proxyutil.DefaultProxyCreator)
  123. expr.Register("rootcoord", core)
  124. return core, nil
  125. }
  126. // UpdateStateCode update state code
  127. func (c *Core) UpdateStateCode(code commonpb.StateCode) {
  128. c.stateCode.Store(int32(code))
  129. log.Info("update rootcoord state", zap.String("state", code.String()))
  130. }
  131. func (c *Core) GetStateCode() commonpb.StateCode {
  132. return commonpb.StateCode(c.stateCode.Load())
  133. }
  134. func (c *Core) sendTimeTick(t Timestamp, reason string) error {
  135. pc := c.chanTimeTick.listDmlChannels()
  136. pt := make([]uint64, len(pc))
  137. for i := 0; i < len(pt); i++ {
  138. pt[i] = t
  139. }
  140. ttMsg := internalpb.ChannelTimeTickMsg{
  141. Base: commonpbutil.NewMsgBase(
  142. commonpbutil.WithMsgType(commonpb.MsgType_TimeTick),
  143. commonpbutil.WithTimeStamp(t),
  144. commonpbutil.WithSourceID(ddlSourceID),
  145. ),
  146. ChannelNames: pc,
  147. Timestamps: pt,
  148. DefaultTimestamp: t,
  149. }
  150. return c.chanTimeTick.updateTimeTick(&ttMsg, reason)
  151. }
  152. func (c *Core) sendMinDdlTsAsTt() {
  153. if !paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() {
  154. return
  155. }
  156. code := c.GetStateCode()
  157. if code != commonpb.StateCode_Healthy {
  158. log.Warn("rootCoord is not healthy, skip send timetick")
  159. return
  160. }
  161. minBgDdlTs := c.ddlTsLockManager.GetMinDdlTs()
  162. minNormalDdlTs := c.scheduler.GetMinDdlTs()
  163. minDdlTs := funcutil.Min(minBgDdlTs, minNormalDdlTs)
  164. // zero -> ddlTsLockManager and scheduler not started.
  165. if minDdlTs == typeutil.ZeroTimestamp {
  166. log.Warn("zero ts was met, this should be only occurred in starting state", zap.Uint64("minBgDdlTs", minBgDdlTs), zap.Uint64("minNormalDdlTs", minNormalDdlTs))
  167. return
  168. }
  169. // max -> abnormal case, impossible.
  170. if minDdlTs == typeutil.MaxTimestamp {
  171. log.Warn("ddl ts is abnormal, max ts was met", zap.Uint64("minBgDdlTs", minBgDdlTs), zap.Uint64("minNormalDdlTs", minNormalDdlTs))
  172. return
  173. }
  174. if err := c.sendTimeTick(minDdlTs, "timetick loop"); err != nil {
  175. log.Warn("failed to send timetick", zap.Error(err))
  176. }
  177. }
  178. func (c *Core) startTimeTickLoop() {
  179. defer c.wg.Done()
  180. ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
  181. defer ticker.Stop()
  182. for {
  183. select {
  184. case <-c.ctx.Done():
  185. log.Info("rootcoord's timetick loop quit!")
  186. return
  187. case <-ticker.C:
  188. c.sendMinDdlTsAsTt()
  189. }
  190. }
  191. }
  192. func (c *Core) tsLoop() {
  193. defer c.wg.Done()
  194. tsoTicker := time.NewTicker(tso2.UpdateTimestampStep)
  195. defer tsoTicker.Stop()
  196. ctx, cancel := context.WithCancel(c.ctx)
  197. defer cancel()
  198. for {
  199. select {
  200. case <-tsoTicker.C:
  201. if err := c.tsoAllocator.UpdateTSO(); err != nil {
  202. log.Warn("failed to update tso", zap.Error(err))
  203. continue
  204. }
  205. ts := c.tsoAllocator.GetLastSavedTime()
  206. metrics.RootCoordTimestampSaved.Set(float64(ts.Unix()))
  207. case <-ctx.Done():
  208. log.Info("rootcoord's ts loop quit!")
  209. return
  210. }
  211. }
  212. }
  213. func (c *Core) SetProxyCreator(f func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) {
  214. c.proxyCreator = f
  215. }
  216. func (c *Core) SetDataCoordClient(s types.DataCoordClient) error {
  217. if s == nil {
  218. return errors.New("null DataCoord interface")
  219. }
  220. c.dataCoord = s
  221. return nil
  222. }
  223. func (c *Core) SetQueryCoordClient(s types.QueryCoordClient) error {
  224. if s == nil {
  225. return errors.New("null QueryCoord interface")
  226. }
  227. c.queryCoord = s
  228. return nil
  229. }
  230. // Register register rootcoord at etcd
  231. func (c *Core) Register() error {
  232. c.session.Register()
  233. afterRegister := func() {
  234. metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc()
  235. log.Info("RootCoord Register Finished")
  236. c.session.LivenessCheck(c.ctx, func() {
  237. log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
  238. os.Exit(1)
  239. })
  240. }
  241. if c.enableActiveStandBy {
  242. go func() {
  243. if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil {
  244. log.Warn("failed to activate standby rootcoord server", zap.Error(err))
  245. panic(err)
  246. }
  247. afterRegister()
  248. }()
  249. } else {
  250. afterRegister()
  251. }
  252. return nil
  253. }
  254. func (c *Core) SetAddress(address string) {
  255. c.address = address
  256. }
  257. // SetEtcdClient sets the etcdCli of Core
  258. func (c *Core) SetEtcdClient(etcdClient *clientv3.Client) {
  259. c.etcdCli = etcdClient
  260. }
  261. // SetTiKVClient sets the tikvCli of Core
  262. func (c *Core) SetTiKVClient(client *txnkv.Client) {
  263. c.tikvCli = client
  264. }
  265. func (c *Core) initSession() error {
  266. c.session = sessionutil.NewSession(c.ctx)
  267. if c.session == nil {
  268. return fmt.Errorf("session is nil, the etcd client connection may have failed")
  269. }
  270. c.session.Init(typeutil.RootCoordRole, c.address, true, true)
  271. c.session.SetEnableActiveStandBy(c.enableActiveStandBy)
  272. return nil
  273. }
  274. func (c *Core) initKVCreator() {
  275. if c.metaKVCreator == nil {
  276. if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
  277. c.metaKVCreator = func() (kv.MetaKv, error) {
  278. return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(),
  279. tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
  280. }
  281. } else {
  282. c.metaKVCreator = func() (kv.MetaKv, error) {
  283. return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
  284. etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
  285. }
  286. }
  287. }
  288. }
  289. func (c *Core) initMetaTable() error {
  290. fn := func() error {
  291. var catalog metastore.RootCoordCatalog
  292. var err error
  293. switch Params.MetaStoreCfg.MetaStoreType.GetValue() {
  294. case util.MetaStoreTypeEtcd:
  295. log.Info("Using etcd as meta storage.")
  296. var metaKV kv.MetaKv
  297. var ss *kvmetestore.SuffixSnapshot
  298. var err error
  299. if metaKV, err = c.metaKVCreator(); err != nil {
  300. return err
  301. }
  302. if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
  303. return err
  304. }
  305. catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
  306. case util.MetaStoreTypeTiKV:
  307. log.Info("Using tikv as meta storage.")
  308. var metaKV kv.MetaKv
  309. var ss *kvmetestore.SuffixSnapshot
  310. var err error
  311. if metaKV, err = c.metaKVCreator(); err != nil {
  312. return err
  313. }
  314. if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
  315. return err
  316. }
  317. catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
  318. default:
  319. return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", Params.MetaStoreCfg.MetaStoreType.GetValue()))
  320. }
  321. if c.meta, err = NewMetaTable(c.ctx, catalog, c.tsoAllocator); err != nil {
  322. return err
  323. }
  324. return nil
  325. }
  326. return retry.Do(c.ctx, fn, retry.Attempts(10))
  327. }
  328. func (c *Core) initIDAllocator() error {
  329. var tsoKV kv.TxnKV
  330. var kvPath string
  331. if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
  332. kvPath = Params.TiKVCfg.KvRootPath.GetValue()
  333. tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, kvPath, globalIDAllocatorSubPath)
  334. } else {
  335. kvPath = Params.EtcdCfg.KvRootPath.GetValue()
  336. tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, kvPath, globalIDAllocatorSubPath)
  337. }
  338. idAllocator := allocator.NewGlobalIDAllocator(globalIDAllocatorKey, tsoKV)
  339. if err := idAllocator.Initialize(); err != nil {
  340. return err
  341. }
  342. c.idAllocator = idAllocator
  343. log.Info("id allocator initialized",
  344. zap.String("root_path", kvPath),
  345. zap.String("sub_path", globalIDAllocatorSubPath),
  346. zap.String("key", globalIDAllocatorKey))
  347. return nil
  348. }
  349. func (c *Core) initTSOAllocator() error {
  350. var tsoKV kv.TxnKV
  351. var kvPath string
  352. if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
  353. kvPath = Params.TiKVCfg.KvRootPath.GetValue()
  354. tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, Params.TiKVCfg.KvRootPath.GetValue(), globalIDAllocatorSubPath)
  355. } else {
  356. kvPath = Params.EtcdCfg.KvRootPath.GetValue()
  357. tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath.GetValue(), globalIDAllocatorSubPath)
  358. }
  359. tsoAllocator := tso2.NewGlobalTSOAllocator(globalTSOAllocatorKey, tsoKV)
  360. if err := tsoAllocator.Initialize(); err != nil {
  361. return err
  362. }
  363. c.tsoAllocator = tsoAllocator
  364. log.Info("tso allocator initialized",
  365. zap.String("root_path", kvPath),
  366. zap.String("sub_path", globalIDAllocatorSubPath),
  367. zap.String("key", globalIDAllocatorKey))
  368. return nil
  369. }
  370. func (c *Core) initInternal() error {
  371. c.UpdateStateCode(commonpb.StateCode_Initializing)
  372. c.initKVCreator()
  373. if err := c.initIDAllocator(); err != nil {
  374. return err
  375. }
  376. if err := c.initTSOAllocator(); err != nil {
  377. return err
  378. }
  379. if err := c.initMetaTable(); err != nil {
  380. return err
  381. }
  382. c.scheduler = newScheduler(c.ctx, c.idAllocator, c.tsoAllocator)
  383. c.factory.Init(Params)
  384. chanMap := c.meta.ListCollectionPhysicalChannels()
  385. c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap)
  386. log.Info("create TimeTick sync done")
  387. c.proxyClientManager = proxyutil.NewProxyClientManager(c.proxyCreator)
  388. c.broker = newServerBroker(c)
  389. c.ddlTsLockManager = newDdlTsLockManager(c.tsoAllocator)
  390. c.garbageCollector = newBgGarbageCollector(c)
  391. c.stepExecutor = newBgStepExecutor(c.ctx)
  392. c.proxyWatcher = proxyutil.NewProxyWatcher(
  393. c.etcdCli,
  394. c.chanTimeTick.initSessions,
  395. c.proxyClientManager.AddProxyClients,
  396. )
  397. c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
  398. c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
  399. log.Info("init proxy manager done")
  400. c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
  401. c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator, c.meta)
  402. log.Debug("RootCoord init QuotaCenter done")
  403. if err := c.initCredentials(); err != nil {
  404. return err
  405. }
  406. log.Info("init credentials done")
  407. if err := c.initRbac(); err != nil {
  408. return err
  409. }
  410. log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address))
  411. return nil
  412. }
  413. // Init initialize routine
  414. func (c *Core) Init() error {
  415. var initError error
  416. c.factory.Init(Params)
  417. if err := c.initSession(); err != nil {
  418. return err
  419. }
  420. if c.enableActiveStandBy {
  421. c.activateFunc = func() error {
  422. log.Info("RootCoord switch from standby to active, activating")
  423. var err error
  424. c.initOnce.Do(func() {
  425. if err = c.initInternal(); err != nil {
  426. log.Error("RootCoord init failed", zap.Error(err))
  427. }
  428. })
  429. if err != nil {
  430. return err
  431. }
  432. c.startOnce.Do(func() {
  433. if err = c.startInternal(); err != nil {
  434. log.Error("RootCoord start failed", zap.Error(err))
  435. }
  436. })
  437. log.Info("RootCoord startup success", zap.String("address", c.session.Address))
  438. return err
  439. }
  440. c.UpdateStateCode(commonpb.StateCode_StandBy)
  441. log.Info("RootCoord enter standby mode successfully")
  442. } else {
  443. c.initOnce.Do(func() {
  444. initError = c.initInternal()
  445. })
  446. }
  447. return initError
  448. }
  449. func (c *Core) initCredentials() error {
  450. credInfo, _ := c.meta.GetCredential(util.UserRoot)
  451. if credInfo == nil {
  452. log.Debug("RootCoord init user root")
  453. encryptedRootPassword, _ := crypto.PasswordEncrypt(Params.CommonCfg.DefaultRootPassword.GetValue())
  454. err := c.meta.AddCredential(&internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
  455. return err
  456. }
  457. return nil
  458. }
  459. func (c *Core) initRbac() error {
  460. var err error
  461. // create default roles, including admin, public
  462. for _, role := range util.DefaultRoles {
  463. err = c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
  464. if err != nil && !common.IsIgnorableError(err) {
  465. return errors.Wrap(err, "failed to create role")
  466. }
  467. }
  468. if Params.ProxyCfg.EnablePublicPrivilege.GetAsBool() {
  469. err = c.initPublicRolePrivilege()
  470. if err != nil {
  471. return err
  472. }
  473. }
  474. if Params.RoleCfg.Enabled.GetAsBool() {
  475. return c.initBuiltinRoles()
  476. }
  477. return nil
  478. }
  479. func (c *Core) initPublicRolePrivilege() error {
  480. // grant privileges for the public role
  481. globalPrivileges := []string{
  482. commonpb.ObjectPrivilege_PrivilegeDescribeCollection.String(),
  483. }
  484. collectionPrivileges := []string{
  485. commonpb.ObjectPrivilege_PrivilegeIndexDetail.String(),
  486. }
  487. var err error
  488. for _, globalPrivilege := range globalPrivileges {
  489. err = c.meta.OperatePrivilege(util.DefaultTenant, &milvuspb.GrantEntity{
  490. Role: &milvuspb.RoleEntity{Name: util.RolePublic},
  491. Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Global.String()},
  492. ObjectName: util.AnyWord,
  493. DbName: util.AnyWord,
  494. Grantor: &milvuspb.GrantorEntity{
  495. User: &milvuspb.UserEntity{Name: util.UserRoot},
  496. Privilege: &milvuspb.PrivilegeEntity{Name: globalPrivilege},
  497. },
  498. }, milvuspb.OperatePrivilegeType_Grant)
  499. if err != nil && !common.IsIgnorableError(err) {
  500. return errors.Wrap(err, "failed to grant global privilege")
  501. }
  502. }
  503. for _, collectionPrivilege := range collectionPrivileges {
  504. err = c.meta.OperatePrivilege(util.DefaultTenant, &milvuspb.GrantEntity{
  505. Role: &milvuspb.RoleEntity{Name: util.RolePublic},
  506. Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Collection.String()},
  507. ObjectName: util.AnyWord,
  508. DbName: util.AnyWord,
  509. Grantor: &milvuspb.GrantorEntity{
  510. User: &milvuspb.UserEntity{Name: util.UserRoot},
  511. Privilege: &milvuspb.PrivilegeEntity{Name: collectionPrivilege},
  512. },
  513. }, milvuspb.OperatePrivilegeType_Grant)
  514. if err != nil && !common.IsIgnorableError(err) {
  515. return errors.Wrap(err, "failed to grant collection privilege")
  516. }
  517. }
  518. return nil
  519. }
  520. func (c *Core) initBuiltinRoles() error {
  521. rolePrivilegesMap := Params.RoleCfg.Roles.GetAsRoleDetails()
  522. for role, privilegesJSON := range rolePrivilegesMap {
  523. err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
  524. if err != nil && !common.IsIgnorableError(err) {
  525. log.Error("create a builtin role fail", zap.String("roleName", role), zap.Error(err))
  526. return errors.Wrapf(err, "failed to create a builtin role: %s", role)
  527. }
  528. for _, privilege := range privilegesJSON[util.RoleConfigPrivileges] {
  529. privilegeName := privilege[util.RoleConfigPrivilege]
  530. if !util.IsAnyWord(privilege[util.RoleConfigPrivilege]) {
  531. privilegeName = util.PrivilegeNameForMetastore(privilege[util.RoleConfigPrivilege])
  532. }
  533. err := c.meta.OperatePrivilege(util.DefaultTenant, &milvuspb.GrantEntity{
  534. Role: &milvuspb.RoleEntity{Name: role},
  535. Object: &milvuspb.ObjectEntity{Name: privilege[util.RoleConfigObjectType]},
  536. ObjectName: privilege[util.RoleConfigObjectName],
  537. DbName: privilege[util.RoleConfigDBName],
  538. Grantor: &milvuspb.GrantorEntity{
  539. User: &milvuspb.UserEntity{Name: util.UserRoot},
  540. Privilege: &milvuspb.PrivilegeEntity{Name: privilegeName},
  541. },
  542. }, milvuspb.OperatePrivilegeType_Grant)
  543. if err != nil && !common.IsIgnorableError(err) {
  544. log.Error("grant privilege to builtin role fail", zap.String("roleName", role), zap.Any("privilege", privilege), zap.Error(err))
  545. return errors.Wrapf(err, "failed to grant privilege: <%s, %s, %s> of db: %s to role: %s", privilege[util.RoleConfigObjectType], privilege[util.RoleConfigObjectName], privilege[util.RoleConfigPrivilege], privilege[util.RoleConfigDBName], role)
  546. }
  547. }
  548. util.BuiltinRoles = append(util.BuiltinRoles, role)
  549. log.Info("init a builtin role successfully", zap.String("roleName", role))
  550. }
  551. return nil
  552. }
  553. func (c *Core) restore(ctx context.Context) error {
  554. dbs, err := c.meta.ListDatabases(ctx, typeutil.MaxTimestamp)
  555. if err != nil {
  556. return err
  557. }
  558. for _, db := range dbs {
  559. colls, err := c.meta.ListCollections(ctx, db.Name, typeutil.MaxTimestamp, false)
  560. if err != nil {
  561. return err
  562. }
  563. for _, coll := range colls {
  564. ts, err := c.tsoAllocator.GenerateTSO(1)
  565. if err != nil {
  566. return err
  567. }
  568. if coll.Available() {
  569. for _, part := range coll.Partitions {
  570. switch part.State {
  571. case pb.PartitionState_PartitionDropping:
  572. go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, coll.VirtualChannelNames, part.Clone(), ts)
  573. case pb.PartitionState_PartitionCreating:
  574. go c.garbageCollector.RemoveCreatingPartition(coll.DBID, part.Clone(), ts)
  575. default:
  576. }
  577. }
  578. } else {
  579. switch coll.State {
  580. case pb.CollectionState_CollectionDropping:
  581. go c.garbageCollector.ReDropCollection(coll.Clone(), ts)
  582. case pb.CollectionState_CollectionCreating:
  583. go c.garbageCollector.RemoveCreatingCollection(coll.Clone())
  584. default:
  585. }
  586. }
  587. }
  588. }
  589. return nil
  590. }
  591. func (c *Core) startInternal() error {
  592. if err := c.proxyWatcher.WatchProxy(c.ctx); err != nil {
  593. log.Fatal("rootcoord failed to watch proxy", zap.Error(err))
  594. // you can not just stuck here,
  595. panic(err)
  596. }
  597. if err := c.restore(c.ctx); err != nil {
  598. panic(err)
  599. }
  600. if Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
  601. c.quotaCenter.Start()
  602. }
  603. c.scheduler.Start()
  604. c.stepExecutor.Start()
  605. go func() {
  606. // refresh rbac cache
  607. if err := retry.Do(c.ctx, func() error {
  608. if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
  609. OpType: int32(typeutil.CacheRefresh),
  610. }); err != nil {
  611. log.RatedWarn(60, "fail to refresh policy info cache", zap.Error(err))
  612. return err
  613. }
  614. return nil
  615. }, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
  616. log.Warn("fail to refresh policy info cache", zap.Error(err))
  617. }
  618. }()
  619. c.startServerLoop()
  620. c.UpdateStateCode(commonpb.StateCode_Healthy)
  621. sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
  622. logutil.Logger(c.ctx).Info("rootcoord startup successfully")
  623. return nil
  624. }
  625. func (c *Core) startServerLoop() {
  626. c.wg.Add(2)
  627. go c.startTimeTickLoop()
  628. go c.tsLoop()
  629. if !streamingutil.IsStreamingServiceEnabled() {
  630. c.wg.Add(1)
  631. go c.chanTimeTick.startWatch(&c.wg)
  632. }
  633. }
  634. // Start starts RootCoord.
  635. func (c *Core) Start() error {
  636. var err error
  637. if !c.enableActiveStandBy {
  638. c.startOnce.Do(func() {
  639. err = c.startInternal()
  640. })
  641. }
  642. return err
  643. }
  644. func (c *Core) stopExecutor() {
  645. if c.stepExecutor != nil {
  646. c.stepExecutor.Stop()
  647. log.Info("stop rootcoord executor")
  648. }
  649. }
  650. func (c *Core) stopScheduler() {
  651. if c.scheduler != nil {
  652. c.scheduler.Stop()
  653. log.Info("stop rootcoord scheduler")
  654. }
  655. }
  656. func (c *Core) cancelIfNotNil() {
  657. if c.cancel != nil {
  658. c.cancel()
  659. log.Info("cancel rootcoord goroutines")
  660. }
  661. }
  662. func (c *Core) revokeSession() {
  663. if c.session != nil {
  664. // wait at most one second to revoke
  665. c.session.Stop()
  666. log.Info("rootcoord session stop")
  667. }
  668. }
  669. // Stop stops rootCoord.
  670. func (c *Core) Stop() error {
  671. c.UpdateStateCode(commonpb.StateCode_Abnormal)
  672. c.stopExecutor()
  673. c.stopScheduler()
  674. if c.proxyWatcher != nil {
  675. c.proxyWatcher.Stop()
  676. }
  677. if c.quotaCenter != nil {
  678. c.quotaCenter.stop()
  679. }
  680. c.revokeSession()
  681. c.cancelIfNotNil()
  682. c.wg.Wait()
  683. return nil
  684. }
  685. // GetComponentStates get states of components
  686. func (c *Core) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
  687. code := c.GetStateCode()
  688. log.Debug("RootCoord current state", zap.String("StateCode", code.String()))
  689. nodeID := common.NotRegisteredID
  690. if c.session != nil && c.session.Registered() {
  691. nodeID = c.session.ServerID
  692. }
  693. return &milvuspb.ComponentStates{
  694. State: &milvuspb.ComponentInfo{
  695. // NodeID: c.session.ServerID, // will race with Core.Register()
  696. NodeID: nodeID,
  697. Role: typeutil.RootCoordRole,
  698. StateCode: code,
  699. ExtraInfo: nil,
  700. },
  701. Status: merr.Success(),
  702. SubcomponentStates: []*milvuspb.ComponentInfo{
  703. {
  704. NodeID: nodeID,
  705. Role: typeutil.RootCoordRole,
  706. StateCode: code,
  707. ExtraInfo: nil,
  708. },
  709. },
  710. }, nil
  711. }
  712. // GetTimeTickChannel get timetick channel name
  713. func (c *Core) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
  714. return &milvuspb.StringResponse{
  715. Status: merr.Success(),
  716. Value: Params.CommonCfg.RootCoordTimeTick.GetValue(),
  717. }, nil
  718. }
  719. // GetStatisticsChannel get statistics channel name
  720. func (c *Core) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
  721. return &milvuspb.StringResponse{
  722. Status: merr.Success(),
  723. Value: Params.CommonCfg.RootCoordStatistics.GetValue(),
  724. }, nil
  725. }
  726. func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) {
  727. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  728. return merr.Status(err), nil
  729. }
  730. method := "CreateDatabase"
  731. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  732. tr := timerecord.NewTimeRecorder("CreateDatabase")
  733. log.Ctx(ctx).Info("received request to create database", zap.String("role", typeutil.RootCoordRole),
  734. zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
  735. t := &createDatabaseTask{
  736. baseTask: newBaseTask(ctx, c),
  737. Req: in,
  738. }
  739. if err := c.scheduler.AddTask(t); err != nil {
  740. log.Ctx(ctx).Info("failed to enqueue request to create database",
  741. zap.String("role", typeutil.RootCoordRole),
  742. zap.Error(err),
  743. zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
  744. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  745. return merr.Status(err), nil
  746. }
  747. if err := t.WaitToFinish(); err != nil {
  748. log.Ctx(ctx).Info("failed to create database",
  749. zap.String("role", typeutil.RootCoordRole),
  750. zap.Error(err),
  751. zap.String("dbName", in.GetDbName()),
  752. zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
  753. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  754. return merr.Status(err), nil
  755. }
  756. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  757. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  758. log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole),
  759. zap.String("dbName", in.GetDbName()),
  760. zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
  761. return merr.Success(), nil
  762. }
  763. func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) {
  764. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  765. return merr.Status(err), nil
  766. }
  767. method := "DropDatabase"
  768. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  769. tr := timerecord.NewTimeRecorder("DropDatabase")
  770. log.Ctx(ctx).Info("received request to drop database", zap.String("role", typeutil.RootCoordRole),
  771. zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
  772. t := &dropDatabaseTask{
  773. baseTask: newBaseTask(ctx, c),
  774. Req: in,
  775. }
  776. if err := c.scheduler.AddTask(t); err != nil {
  777. log.Ctx(ctx).Info("failed to enqueue request to drop database", zap.String("role", typeutil.RootCoordRole),
  778. zap.Error(err),
  779. zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
  780. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  781. return merr.Status(err), nil
  782. }
  783. if err := t.WaitToFinish(); err != nil {
  784. log.Ctx(ctx).Info("failed to drop database", zap.String("role", typeutil.RootCoordRole),
  785. zap.Error(err),
  786. zap.String("dbName", in.GetDbName()),
  787. zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
  788. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  789. return merr.Status(err), nil
  790. }
  791. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  792. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  793. metrics.CleanupRootCoordDBMetrics(in.GetDbName())
  794. log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole),
  795. zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
  796. zap.Uint64("ts", t.GetTs()))
  797. return merr.Success(), nil
  798. }
  799. func (c *Core) ListDatabases(ctx context.Context, in *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) {
  800. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  801. ret := &milvuspb.ListDatabasesResponse{Status: merr.Status(err)}
  802. return ret, nil
  803. }
  804. method := "ListDatabases"
  805. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  806. tr := timerecord.NewTimeRecorder("ListDatabases")
  807. log := log.Ctx(ctx).With(zap.Int64("msgID", in.GetBase().GetMsgID()))
  808. log.Info("received request to list databases")
  809. t := &listDatabaseTask{
  810. baseTask: newBaseTask(ctx, c),
  811. Req: in,
  812. Resp: &milvuspb.ListDatabasesResponse{},
  813. }
  814. if err := c.scheduler.AddTask(t); err != nil {
  815. log.Info("failed to enqueue request to list databases", zap.Error(err))
  816. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  817. return &milvuspb.ListDatabasesResponse{
  818. Status: merr.Status(err),
  819. }, nil
  820. }
  821. if err := t.WaitToFinish(); err != nil {
  822. log.Info("failed to list databases", zap.Error(err))
  823. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  824. return &milvuspb.ListDatabasesResponse{
  825. Status: merr.Status(err),
  826. }, nil
  827. }
  828. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  829. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  830. log.Info("done to list databases", zap.Int("num of databases", len(t.Resp.GetDbNames())))
  831. return t.Resp, nil
  832. }
  833. // CreateCollection create collection
  834. func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
  835. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  836. return merr.Status(err), nil
  837. }
  838. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc()
  839. tr := timerecord.NewTimeRecorder("CreateCollection")
  840. log.Ctx(ctx).Info("received request to create collection",
  841. zap.String("dbName", in.GetDbName()),
  842. zap.String("name", in.GetCollectionName()),
  843. zap.String("role", typeutil.RootCoordRole))
  844. t := &createCollectionTask{
  845. baseTask: newBaseTask(ctx, c),
  846. Req: in,
  847. }
  848. if err := c.scheduler.AddTask(t); err != nil {
  849. log.Ctx(ctx).Info("failed to enqueue request to create collection",
  850. zap.String("role", typeutil.RootCoordRole),
  851. zap.Error(err),
  852. zap.String("name", in.GetCollectionName()))
  853. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
  854. return merr.Status(err), nil
  855. }
  856. if err := t.WaitToFinish(); err != nil {
  857. log.Ctx(ctx).Info("failed to create collection",
  858. zap.String("role", typeutil.RootCoordRole),
  859. zap.Error(err),
  860. zap.String("name", in.GetCollectionName()),
  861. zap.Uint64("ts", t.GetTs()))
  862. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
  863. return merr.Status(err), nil
  864. }
  865. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc()
  866. metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
  867. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateCollection").Observe(float64(t.queueDur.Milliseconds()))
  868. log.Ctx(ctx).Info("done to create collection",
  869. zap.String("role", typeutil.RootCoordRole),
  870. zap.String("name", in.GetCollectionName()),
  871. zap.Uint64("ts", t.GetTs()))
  872. return merr.Success(), nil
  873. }
  874. // DropCollection drop collection
  875. func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
  876. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  877. return merr.Status(err), nil
  878. }
  879. metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.TotalLabel).Inc()
  880. tr := timerecord.NewTimeRecorder("DropCollection")
  881. log.Ctx(ctx).Info("received request to drop collection",
  882. zap.String("role", typeutil.RootCoordRole),
  883. zap.String("dbName", in.GetDbName()),
  884. zap.String("name", in.GetCollectionName()))
  885. t := &dropCollectionTask{
  886. baseTask: newBaseTask(ctx, c),
  887. Req: in,
  888. }
  889. if err := c.scheduler.AddTask(t); err != nil {
  890. log.Ctx(ctx).Info("failed to enqueue request to drop collection", zap.String("role", typeutil.RootCoordRole),
  891. zap.Error(err),
  892. zap.String("name", in.GetCollectionName()))
  893. metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
  894. return merr.Status(err), nil
  895. }
  896. if err := t.WaitToFinish(); err != nil {
  897. log.Ctx(ctx).Info("failed to drop collection", zap.String("role", typeutil.RootCoordRole),
  898. zap.Error(err),
  899. zap.String("name", in.GetCollectionName()),
  900. zap.Uint64("ts", t.GetTs()))
  901. metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
  902. return merr.Status(err), nil
  903. }
  904. metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc()
  905. metrics.RootCoordDDLReqLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
  906. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropCollection").Observe(float64(t.queueDur.Milliseconds()))
  907. log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole),
  908. zap.String("name", in.GetCollectionName()),
  909. zap.Uint64("ts", t.GetTs()))
  910. return merr.Success(), nil
  911. }
  912. // HasCollection check collection existence
  913. func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
  914. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  915. return &milvuspb.BoolResponse{
  916. Status: merr.Status(err),
  917. }, nil
  918. }
  919. metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.TotalLabel).Inc()
  920. tr := timerecord.NewTimeRecorder("HasCollection")
  921. ts := getTravelTs(in)
  922. log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()),
  923. zap.Uint64("ts", ts))
  924. t := &hasCollectionTask{
  925. baseTask: newBaseTask(ctx, c),
  926. Req: in,
  927. Rsp: &milvuspb.BoolResponse{},
  928. }
  929. if err := c.scheduler.AddTask(t); err != nil {
  930. log.Info("failed to enqueue request to has collection", zap.Error(err))
  931. metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
  932. return &milvuspb.BoolResponse{
  933. Status: merr.Status(err),
  934. }, nil
  935. }
  936. if err := t.WaitToFinish(); err != nil {
  937. log.Info("failed to has collection", zap.Error(err))
  938. metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
  939. return &milvuspb.BoolResponse{
  940. Status: merr.Status(err),
  941. }, nil
  942. }
  943. metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.SuccessLabel).Inc()
  944. metrics.RootCoordDDLReqLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
  945. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasCollection").Observe(float64(t.queueDur.Milliseconds()))
  946. return t.Rsp, nil
  947. }
  948. func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) {
  949. ts := getTravelTs(in)
  950. if in.GetCollectionName() != "" {
  951. return c.meta.GetCollectionByName(ctx, in.GetDbName(), in.GetCollectionName(), ts)
  952. }
  953. return c.meta.GetCollectionByID(ctx, in.GetDbName(), in.GetCollectionID(), ts, allowUnavailable)
  954. }
  955. func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName string) *milvuspb.DescribeCollectionResponse {
  956. resp := &milvuspb.DescribeCollectionResponse{
  957. Status: merr.Success(),
  958. DbName: dbName,
  959. }
  960. resp.Schema = &schemapb.CollectionSchema{
  961. Name: collInfo.Name,
  962. Description: collInfo.Description,
  963. AutoID: collInfo.AutoID,
  964. Fields: model.MarshalFieldModels(collInfo.Fields),
  965. Functions: model.MarshalFunctionModels(collInfo.Functions),
  966. EnableDynamicField: collInfo.EnableDynamicField,
  967. }
  968. resp.CollectionID = collInfo.CollectionID
  969. resp.VirtualChannelNames = collInfo.VirtualChannelNames
  970. resp.PhysicalChannelNames = collInfo.PhysicalChannelNames
  971. if collInfo.ShardsNum == 0 {
  972. collInfo.ShardsNum = int32(len(collInfo.VirtualChannelNames))
  973. }
  974. resp.ShardsNum = collInfo.ShardsNum
  975. resp.ConsistencyLevel = collInfo.ConsistencyLevel
  976. resp.CreatedTimestamp = collInfo.CreateTime
  977. createdPhysicalTime, _ := tsoutil.ParseHybridTs(collInfo.CreateTime)
  978. resp.CreatedUtcTimestamp = uint64(createdPhysicalTime)
  979. resp.Aliases = aliases
  980. resp.StartPositions = collInfo.StartPositions
  981. resp.CollectionName = resp.Schema.Name
  982. resp.Properties = collInfo.Properties
  983. resp.NumPartitions = int64(len(collInfo.Partitions))
  984. resp.DbId = collInfo.DBID
  985. return resp
  986. }
  987. func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) {
  988. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  989. return &milvuspb.DescribeCollectionResponse{
  990. Status: merr.Status(err),
  991. }, nil
  992. }
  993. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.TotalLabel).Inc()
  994. tr := timerecord.NewTimeRecorder("DescribeCollection")
  995. ts := getTravelTs(in)
  996. log := log.Ctx(ctx).With(zap.String("collectionName", in.GetCollectionName()),
  997. zap.String("dbName", in.GetDbName()),
  998. zap.Int64("id", in.GetCollectionID()),
  999. zap.Uint64("ts", ts),
  1000. zap.Bool("allowUnavailable", allowUnavailable))
  1001. t := &describeCollectionTask{
  1002. baseTask: newBaseTask(ctx, c),
  1003. Req: in,
  1004. Rsp: &milvuspb.DescribeCollectionResponse{Status: merr.Success()},
  1005. allowUnavailable: allowUnavailable,
  1006. }
  1007. if err := c.scheduler.AddTask(t); err != nil {
  1008. log.Info("failed to enqueue request to describe collection", zap.Error(err))
  1009. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
  1010. return &milvuspb.DescribeCollectionResponse{
  1011. Status: merr.Status(err),
  1012. }, nil
  1013. }
  1014. if err := t.WaitToFinish(); err != nil {
  1015. log.Warn("failed to describe collection", zap.Error(err))
  1016. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
  1017. return &milvuspb.DescribeCollectionResponse{
  1018. Status: merr.Status(err),
  1019. }, nil
  1020. }
  1021. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.SuccessLabel).Inc()
  1022. metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1023. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DescribeCollection").Observe(float64(t.queueDur.Milliseconds()))
  1024. return t.Rsp, nil
  1025. }
  1026. // DescribeCollection return collection info
  1027. func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
  1028. return c.describeCollectionImpl(ctx, in, false)
  1029. }
  1030. // DescribeCollectionInternal same to DescribeCollection, but will return unavailable collections and
  1031. // only used in internal RPC.
  1032. // When query cluster tried to do recovery, it'll be healthy until all collections' targets were recovered,
  1033. // so during this time, releasing request generated by rootcoord's recovery won't succeed. So in theory, rootcoord goes
  1034. // to be healthy, querycoord recovers all collections' targets, and then querycoord serves the releasing request sent
  1035. // by rootcoord, eventually, the dropping collections will be released.
  1036. func (c *Core) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
  1037. return c.describeCollectionImpl(ctx, in, true)
  1038. }
  1039. // ShowCollections list all collection names
  1040. func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
  1041. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1042. return &milvuspb.ShowCollectionsResponse{
  1043. Status: merr.Status(err),
  1044. }, nil
  1045. }
  1046. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.TotalLabel).Inc()
  1047. tr := timerecord.NewTimeRecorder("ShowCollections")
  1048. ts := getTravelTs(in)
  1049. log := log.Ctx(ctx).With(zap.String("dbname", in.GetDbName()),
  1050. zap.Uint64("ts", ts))
  1051. t := &showCollectionTask{
  1052. baseTask: newBaseTask(ctx, c),
  1053. Req: in,
  1054. Rsp: &milvuspb.ShowCollectionsResponse{},
  1055. }
  1056. if err := c.scheduler.AddTask(t); err != nil {
  1057. log.Info("failed to enqueue request to show collections", zap.Error(err))
  1058. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
  1059. return &milvuspb.ShowCollectionsResponse{
  1060. Status: merr.Status(err),
  1061. }, nil
  1062. }
  1063. if err := t.WaitToFinish(); err != nil {
  1064. log.Info("failed to show collections", zap.Error(err))
  1065. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
  1066. return &milvuspb.ShowCollectionsResponse{
  1067. Status: merr.Status(err),
  1068. }, nil
  1069. }
  1070. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.SuccessLabel).Inc()
  1071. metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1072. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowCollections").Observe(float64(t.queueDur.Milliseconds()))
  1073. return t.Rsp, nil
  1074. }
  1075. func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
  1076. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1077. return merr.Status(err), nil
  1078. }
  1079. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc()
  1080. tr := timerecord.NewTimeRecorder("AlterCollection")
  1081. log.Ctx(ctx).Info("received request to alter collection",
  1082. zap.String("role", typeutil.RootCoordRole),
  1083. zap.String("name", in.GetCollectionName()),
  1084. zap.Any("props", in.Properties))
  1085. t := &alterCollectionTask{
  1086. baseTask: newBaseTask(ctx, c),
  1087. Req: in,
  1088. }
  1089. if err := c.scheduler.AddTask(t); err != nil {
  1090. log.Warn("failed to enqueue request to alter collection",
  1091. zap.String("role", typeutil.RootCoordRole),
  1092. zap.Error(err),
  1093. zap.String("name", in.GetCollectionName()))
  1094. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
  1095. return merr.Status(err), nil
  1096. }
  1097. if err := t.WaitToFinish(); err != nil {
  1098. log.Warn("failed to alter collection",
  1099. zap.String("role", typeutil.RootCoordRole),
  1100. zap.Error(err),
  1101. zap.String("name", in.GetCollectionName()),
  1102. zap.Uint64("ts", t.GetTs()))
  1103. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
  1104. return merr.Status(err), nil
  1105. }
  1106. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc()
  1107. metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1108. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.queueDur.Milliseconds()))
  1109. log.Info("done to alter collection",
  1110. zap.String("role", typeutil.RootCoordRole),
  1111. zap.String("name", in.GetCollectionName()),
  1112. zap.Uint64("ts", t.GetTs()))
  1113. return merr.Success(), nil
  1114. }
  1115. func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) {
  1116. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1117. return merr.Status(err), nil
  1118. }
  1119. method := "AlterDatabase"
  1120. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1121. tr := timerecord.NewTimeRecorder(method)
  1122. log.Ctx(ctx).Info("received request to alter database",
  1123. zap.String("role", typeutil.RootCoordRole),
  1124. zap.String("name", in.GetDbName()),
  1125. zap.Any("props", in.Properties))
  1126. t := &alterDatabaseTask{
  1127. baseTask: newBaseTask(ctx, c),
  1128. Req: in,
  1129. }
  1130. if err := c.scheduler.AddTask(t); err != nil {
  1131. log.Warn("failed to enqueue request to alter database",
  1132. zap.String("role", typeutil.RootCoordRole),
  1133. zap.String("name", in.GetDbName()),
  1134. zap.Error(err))
  1135. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1136. return merr.Status(err), nil
  1137. }
  1138. if err := t.WaitToFinish(); err != nil {
  1139. log.Warn("failed to alter database",
  1140. zap.String("role", typeutil.RootCoordRole),
  1141. zap.Error(err),
  1142. zap.String("name", in.GetDbName()),
  1143. zap.Uint64("ts", t.GetTs()))
  1144. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1145. return merr.Status(err), nil
  1146. }
  1147. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1148. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1149. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds()))
  1150. log.Ctx(ctx).Info("done to alter database",
  1151. zap.String("role", typeutil.RootCoordRole),
  1152. zap.String("name", in.GetDbName()),
  1153. zap.Uint64("ts", t.GetTs()))
  1154. return merr.Success(), nil
  1155. }
  1156. // CreatePartition create partition
  1157. func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
  1158. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1159. return merr.Status(err), nil
  1160. }
  1161. metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc()
  1162. tr := timerecord.NewTimeRecorder("CreatePartition")
  1163. log.Ctx(ctx).Info("received request to create partition",
  1164. zap.String("role", typeutil.RootCoordRole),
  1165. zap.String("collection", in.GetCollectionName()),
  1166. zap.String("partition", in.GetPartitionName()))
  1167. t := &createPartitionTask{
  1168. baseTask: newBaseTask(ctx, c),
  1169. Req: in,
  1170. }
  1171. if err := c.scheduler.AddTask(t); err != nil {
  1172. log.Ctx(ctx).Info("failed to enqueue request to create partition",
  1173. zap.String("role", typeutil.RootCoordRole),
  1174. zap.Error(err),
  1175. zap.String("collection", in.GetCollectionName()),
  1176. zap.String("partition", in.GetPartitionName()))
  1177. metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
  1178. return merr.Status(err), nil
  1179. }
  1180. if err := t.WaitToFinish(); err != nil {
  1181. log.Ctx(ctx).Info("failed to create partition",
  1182. zap.String("role", typeutil.RootCoordRole),
  1183. zap.Error(err),
  1184. zap.String("collection", in.GetCollectionName()),
  1185. zap.String("partition", in.GetPartitionName()),
  1186. zap.Uint64("ts", t.GetTs()))
  1187. metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
  1188. return merr.Status(err), nil
  1189. }
  1190. metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc()
  1191. metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1192. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreatePartition").Observe(float64(t.queueDur.Milliseconds()))
  1193. log.Ctx(ctx).Info("done to create partition",
  1194. zap.String("role", typeutil.RootCoordRole),
  1195. zap.String("collection", in.GetCollectionName()),
  1196. zap.String("partition", in.GetPartitionName()),
  1197. zap.Uint64("ts", t.GetTs()))
  1198. return merr.Success(), nil
  1199. }
  1200. // DropPartition drop partition
  1201. func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
  1202. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1203. return merr.Status(err), nil
  1204. }
  1205. metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc()
  1206. tr := timerecord.NewTimeRecorder("DropPartition")
  1207. log.Ctx(ctx).Info("received request to drop partition",
  1208. zap.String("role", typeutil.RootCoordRole),
  1209. zap.String("collection", in.GetCollectionName()),
  1210. zap.String("partition", in.GetPartitionName()))
  1211. t := &dropPartitionTask{
  1212. baseTask: newBaseTask(ctx, c),
  1213. Req: in,
  1214. }
  1215. if err := c.scheduler.AddTask(t); err != nil {
  1216. log.Ctx(ctx).Info("failed to enqueue request to drop partition",
  1217. zap.String("role", typeutil.RootCoordRole),
  1218. zap.Error(err),
  1219. zap.String("collection", in.GetCollectionName()),
  1220. zap.String("partition", in.GetPartitionName()))
  1221. metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
  1222. return merr.Status(err), nil
  1223. }
  1224. if err := t.WaitToFinish(); err != nil {
  1225. log.Ctx(ctx).Info("failed to drop partition",
  1226. zap.String("role", typeutil.RootCoordRole),
  1227. zap.Error(err),
  1228. zap.String("collection", in.GetCollectionName()),
  1229. zap.String("partition", in.GetPartitionName()),
  1230. zap.Uint64("ts", t.GetTs()))
  1231. metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
  1232. return merr.Status(err), nil
  1233. }
  1234. metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc()
  1235. metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1236. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropPartition").Observe(float64(t.queueDur.Milliseconds()))
  1237. log.Ctx(ctx).Info("done to drop partition",
  1238. zap.String("role", typeutil.RootCoordRole),
  1239. zap.String("collection", in.GetCollectionName()),
  1240. zap.String("partition", in.GetPartitionName()),
  1241. zap.Uint64("ts", t.GetTs()))
  1242. return merr.Success(), nil
  1243. }
  1244. // HasPartition check partition existence
  1245. func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
  1246. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1247. return &milvuspb.BoolResponse{
  1248. Status: merr.Status(err),
  1249. }, nil
  1250. }
  1251. metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.TotalLabel).Inc()
  1252. tr := timerecord.NewTimeRecorder("HasPartition")
  1253. // TODO(longjiquan): why HasPartitionRequest doesn't contain Timestamp but other requests do.
  1254. ts := typeutil.MaxTimestamp
  1255. log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()),
  1256. zap.String("partition", in.GetPartitionName()),
  1257. zap.Uint64("ts", ts))
  1258. t := &hasPartitionTask{
  1259. baseTask: newBaseTask(ctx, c),
  1260. Req: in,
  1261. Rsp: &milvuspb.BoolResponse{},
  1262. }
  1263. if err := c.scheduler.AddTask(t); err != nil {
  1264. log.Info("failed to enqueue request to has partition", zap.Error(err))
  1265. metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
  1266. return &milvuspb.BoolResponse{
  1267. Status: merr.Status(err),
  1268. }, nil
  1269. }
  1270. if err := t.WaitToFinish(); err != nil {
  1271. log.Info("failed to has partition", zap.Error(err))
  1272. metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
  1273. return &milvuspb.BoolResponse{
  1274. Status: merr.Status(err),
  1275. }, nil
  1276. }
  1277. metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.SuccessLabel).Inc()
  1278. metrics.RootCoordDDLReqLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1279. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasPartition").Observe(float64(t.queueDur.Milliseconds()))
  1280. return t.Rsp, nil
  1281. }
  1282. func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitionsRequest, allowUnavailable bool) (*milvuspb.ShowPartitionsResponse, error) {
  1283. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1284. return &milvuspb.ShowPartitionsResponse{
  1285. Status: merr.Status(err),
  1286. }, nil
  1287. }
  1288. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.TotalLabel).Inc()
  1289. tr := timerecord.NewTimeRecorder("ShowPartitions")
  1290. log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()),
  1291. zap.Int64("collection_id", in.GetCollectionID()),
  1292. zap.Strings("partitions", in.GetPartitionNames()),
  1293. zap.Bool("allowUnavailable", allowUnavailable))
  1294. t := &showPartitionTask{
  1295. baseTask: newBaseTask(ctx, c),
  1296. Req: in,
  1297. Rsp: &milvuspb.ShowPartitionsResponse{},
  1298. allowUnavailable: allowUnavailable,
  1299. }
  1300. if err := c.scheduler.AddTask(t); err != nil {
  1301. log.Info("failed to enqueue request to show partitions", zap.Error(err))
  1302. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
  1303. return &milvuspb.ShowPartitionsResponse{
  1304. Status: merr.Status(err),
  1305. // Status: common.StatusFromError(err),
  1306. }, nil
  1307. }
  1308. if err := t.WaitToFinish(); err != nil {
  1309. log.Info("failed to show partitions", zap.Error(err))
  1310. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
  1311. return &milvuspb.ShowPartitionsResponse{
  1312. Status: merr.Status(err),
  1313. // Status: common.StatusFromError(err),
  1314. }, nil
  1315. }
  1316. metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.SuccessLabel).Inc()
  1317. metrics.RootCoordDDLReqLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1318. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowPartitions").Observe(float64(t.queueDur.Milliseconds()))
  1319. return t.Rsp, nil
  1320. }
  1321. // ShowPartitions list all partition names
  1322. func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
  1323. return c.showPartitionsImpl(ctx, in, false)
  1324. }
  1325. // ShowPartitionsInternal same to ShowPartitions, only used in internal RPC.
  1326. func (c *Core) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
  1327. return c.showPartitionsImpl(ctx, in, true)
  1328. }
  1329. // ShowSegments list all segments
  1330. func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
  1331. // ShowSegments Only used in GetPersistentSegmentInfo, it's already deprecated for a long time.
  1332. // Though we continue to keep current logic, it's not right enough since RootCoord only contains indexed segments.
  1333. return &milvuspb.ShowSegmentsResponse{Status: merr.Success()}, nil
  1334. }
  1335. // GetPChannelInfo get pchannel info.
  1336. func (c *Core) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
  1337. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1338. return &rootcoordpb.GetPChannelInfoResponse{
  1339. Status: merr.Status(err),
  1340. }, nil
  1341. }
  1342. return c.meta.GetPChannelInfo(in.GetPchannel()), nil
  1343. }
  1344. // AllocTimestamp alloc timestamp
  1345. func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
  1346. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1347. return &rootcoordpb.AllocTimestampResponse{
  1348. Status: merr.Status(err),
  1349. }, nil
  1350. }
  1351. ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
  1352. if err != nil {
  1353. log.Ctx(ctx).Error("failed to allocate timestamp", zap.String("role", typeutil.RootCoordRole),
  1354. zap.Error(err))
  1355. return &rootcoordpb.AllocTimestampResponse{
  1356. Status: merr.Status(err),
  1357. }, nil
  1358. }
  1359. // return first available timestamp
  1360. ts = ts - uint64(in.GetCount()) + 1
  1361. metrics.RootCoordTimestamp.Set(float64(ts))
  1362. return &rootcoordpb.AllocTimestampResponse{
  1363. Status: merr.Success(),
  1364. Timestamp: ts,
  1365. Count: in.GetCount(),
  1366. }, nil
  1367. }
  1368. // AllocID alloc ids
  1369. func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
  1370. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1371. return &rootcoordpb.AllocIDResponse{
  1372. Status: merr.Status(err),
  1373. }, nil
  1374. }
  1375. start, _, err := c.idAllocator.Alloc(in.Count)
  1376. if err != nil {
  1377. log.Ctx(ctx).Error("failed to allocate id",
  1378. zap.String("role", typeutil.RootCoordRole),
  1379. zap.Error(err))
  1380. return &rootcoordpb.AllocIDResponse{
  1381. Status: merr.Status(err),
  1382. Count: in.Count,
  1383. }, nil
  1384. }
  1385. metrics.RootCoordIDAllocCounter.Add(float64(in.Count))
  1386. return &rootcoordpb.AllocIDResponse{
  1387. Status: merr.Success(),
  1388. ID: start,
  1389. Count: in.Count,
  1390. }, nil
  1391. }
  1392. // UpdateChannelTimeTick used to handle ChannelTimeTickMsg
  1393. func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
  1394. log := log.Ctx(ctx)
  1395. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1396. log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Error(err))
  1397. return merr.Status(err), nil
  1398. }
  1399. if in.Base.MsgType != commonpb.MsgType_TimeTick {
  1400. log.Warn("failed to updateTimeTick because base messasge is not timetick, state", zap.Any("base message type", in.Base.MsgType))
  1401. return merr.Status(merr.WrapErrParameterInvalid(commonpb.MsgType_TimeTick.String(), in.Base.MsgType.String(), "invalid message type")), nil
  1402. }
  1403. err := c.chanTimeTick.updateTimeTick(in, "gRPC")
  1404. if err != nil {
  1405. log.Warn("failed to updateTimeTick",
  1406. zap.String("role", typeutil.RootCoordRole),
  1407. zap.Error(err))
  1408. return merr.Status(err), nil
  1409. }
  1410. return merr.Success(), nil
  1411. }
  1412. // InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
  1413. func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
  1414. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1415. return merr.Status(err), nil
  1416. }
  1417. err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in)
  1418. if err != nil {
  1419. return merr.Status(err), nil
  1420. }
  1421. return merr.Success(), nil
  1422. }
  1423. // ShowConfigurations returns the configurations of RootCoord matching req.Pattern
  1424. func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
  1425. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1426. return &internalpb.ShowConfigurationsResponse{
  1427. Status: merr.Status(err),
  1428. Configuations: nil,
  1429. }, nil
  1430. }
  1431. configList := make([]*commonpb.KeyValuePair, 0)
  1432. for key, value := range Params.GetComponentConfigurations("rootcoord", req.Pattern) {
  1433. configList = append(configList,
  1434. &commonpb.KeyValuePair{
  1435. Key: key,
  1436. Value: value,
  1437. })
  1438. }
  1439. return &internalpb.ShowConfigurationsResponse{
  1440. Status: merr.Success(),
  1441. Configuations: configList,
  1442. }, nil
  1443. }
  1444. // GetMetrics get metrics
  1445. func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
  1446. if err := merr.CheckHealthyStandby(c.GetStateCode()); err != nil {
  1447. return &milvuspb.GetMetricsResponse{
  1448. Status: merr.Status(err),
  1449. Response: "",
  1450. }, nil
  1451. }
  1452. metricType, err := metricsinfo.ParseMetricType(in.Request)
  1453. if err != nil {
  1454. log.Warn("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole),
  1455. zap.Int64("nodeID", c.session.ServerID), zap.String("req", in.Request), zap.Error(err))
  1456. return &milvuspb.GetMetricsResponse{
  1457. Status: merr.Status(err),
  1458. Response: "",
  1459. }, nil
  1460. }
  1461. if metricType == metricsinfo.SystemInfoMetrics {
  1462. metrics, err := c.metricsCacheManager.GetSystemInfoMetrics()
  1463. if err != nil {
  1464. metrics, err = c.getSystemInfoMetrics(ctx, in)
  1465. }
  1466. if err != nil {
  1467. log.Warn("GetSystemInfoMetrics failed",
  1468. zap.String("role", typeutil.RootCoordRole),
  1469. zap.String("metricType", metricType),
  1470. zap.Error(err))
  1471. return &milvuspb.GetMetricsResponse{
  1472. Status: merr.Status(err),
  1473. Response: "",
  1474. }, nil
  1475. }
  1476. c.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
  1477. return metrics, err
  1478. }
  1479. log.RatedWarn(60, "GetMetrics failed, metric type not implemented", zap.String("role", typeutil.RootCoordRole),
  1480. zap.String("metricType", metricType))
  1481. return &milvuspb.GetMetricsResponse{
  1482. Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
  1483. Response: "",
  1484. }, nil
  1485. }
  1486. // CreateAlias create collection alias
  1487. func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
  1488. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1489. return merr.Status(err), nil
  1490. }
  1491. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc()
  1492. tr := timerecord.NewTimeRecorder("CreateAlias")
  1493. log.Ctx(ctx).Info("received request to create alias",
  1494. zap.String("role", typeutil.RootCoordRole),
  1495. zap.String("alias", in.GetAlias()),
  1496. zap.String("collection", in.GetCollectionName()))
  1497. t := &createAliasTask{
  1498. baseTask: newBaseTask(ctx, c),
  1499. Req: in,
  1500. }
  1501. if err := c.scheduler.AddTask(t); err != nil {
  1502. log.Ctx(ctx).Info("failed to enqueue request to create alias",
  1503. zap.String("role", typeutil.RootCoordRole),
  1504. zap.Error(err),
  1505. zap.String("alias", in.GetAlias()),
  1506. zap.String("collection", in.GetCollectionName()))
  1507. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
  1508. return merr.Status(err), nil
  1509. }
  1510. if err := t.WaitToFinish(); err != nil {
  1511. log.Ctx(ctx).Info("failed to create alias",
  1512. zap.String("role", typeutil.RootCoordRole),
  1513. zap.Error(err),
  1514. zap.String("alias", in.GetAlias()),
  1515. zap.String("collection", in.GetCollectionName()),
  1516. zap.Uint64("ts", t.GetTs()))
  1517. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
  1518. return merr.Status(err), nil
  1519. }
  1520. metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
  1521. metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1522. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateAlias").Observe(float64(t.queueDur.Milliseconds()))
  1523. log.Ctx(ctx).Info("done to create alias",
  1524. zap.String("role", typeutil.RootCoordRole),
  1525. zap.String("alias", in.GetAlias()),
  1526. zap.String("collection", in.GetCollectionName()),
  1527. zap.Uint64("ts", t.GetTs()))
  1528. return merr.Success(), nil
  1529. }
  1530. // DropAlias drop collection alias
  1531. func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
  1532. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1533. return merr.Status(err), nil
  1534. }
  1535. metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
  1536. tr := timerecord.NewTimeRecorder("DropAlias")
  1537. log.Ctx(ctx).Info("received request to drop alias",
  1538. zap.String("role", typeutil.RootCoordRole),
  1539. zap.String("alias", in.GetAlias()))
  1540. t := &dropAliasTask{
  1541. baseTask: newBaseTask(ctx, c),
  1542. Req: in,
  1543. }
  1544. if err := c.scheduler.AddTask(t); err != nil {
  1545. log.Ctx(ctx).Info("failed to enqueue request to drop alias",
  1546. zap.String("role", typeutil.RootCoordRole),
  1547. zap.Error(err),
  1548. zap.String("alias", in.GetAlias()))
  1549. metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
  1550. return merr.Status(err), nil
  1551. }
  1552. if err := t.WaitToFinish(); err != nil {
  1553. log.Ctx(ctx).Info("failed to drop alias",
  1554. zap.String("role", typeutil.RootCoordRole),
  1555. zap.Error(err),
  1556. zap.String("alias", in.GetAlias()),
  1557. zap.Uint64("ts", t.GetTs()))
  1558. metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
  1559. return merr.Status(err), nil
  1560. }
  1561. metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
  1562. metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1563. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropAlias").Observe(float64(t.queueDur.Milliseconds()))
  1564. log.Ctx(ctx).Info("done to drop alias",
  1565. zap.String("role", typeutil.RootCoordRole),
  1566. zap.String("alias", in.GetAlias()),
  1567. zap.Uint64("ts", t.GetTs()))
  1568. return merr.Success(), nil
  1569. }
  1570. // AlterAlias alter collection alias
  1571. func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
  1572. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1573. return merr.Status(err), nil
  1574. }
  1575. metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
  1576. tr := timerecord.NewTimeRecorder("AlterAlias")
  1577. log.Ctx(ctx).Info("received request to alter alias",
  1578. zap.String("role", typeutil.RootCoordRole),
  1579. zap.String("alias", in.GetAlias()),
  1580. zap.String("collection", in.GetCollectionName()))
  1581. t := &alterAliasTask{
  1582. baseTask: newBaseTask(ctx, c),
  1583. Req: in,
  1584. }
  1585. if err := c.scheduler.AddTask(t); err != nil {
  1586. log.Ctx(ctx).Info("failed to enqueue request to alter alias",
  1587. zap.String("role", typeutil.RootCoordRole),
  1588. zap.Error(err),
  1589. zap.String("alias", in.GetAlias()),
  1590. zap.String("collection", in.GetCollectionName()))
  1591. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
  1592. return merr.Status(err), nil
  1593. }
  1594. if err := t.WaitToFinish(); err != nil {
  1595. log.Ctx(ctx).Info("failed to alter alias",
  1596. zap.String("role", typeutil.RootCoordRole),
  1597. zap.Error(err),
  1598. zap.String("alias", in.GetAlias()),
  1599. zap.String("collection", in.GetCollectionName()),
  1600. zap.Uint64("ts", t.GetTs()))
  1601. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
  1602. return merr.Status(err), nil
  1603. }
  1604. metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
  1605. metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
  1606. metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterAlias").Observe(float64(t.queueDur.Milliseconds()))
  1607. log.Info("done to alter alias",
  1608. zap.String("role", typeutil.RootCoordRole),
  1609. zap.String("alias", in.GetAlias()),
  1610. zap.String("collection", in.GetCollectionName()),
  1611. zap.Uint64("ts", t.GetTs()))
  1612. return merr.Success(), nil
  1613. }
  1614. // DescribeAlias describe collection alias
  1615. func (c *Core) DescribeAlias(ctx context.Context, in *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) {
  1616. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1617. return &milvuspb.DescribeAliasResponse{
  1618. Status: merr.Status(err),
  1619. }, nil
  1620. }
  1621. log := log.Ctx(ctx).With(
  1622. zap.String("role", typeutil.RootCoordRole),
  1623. zap.String("db", in.GetDbName()),
  1624. zap.String("alias", in.GetAlias()))
  1625. method := "DescribeAlias"
  1626. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1627. tr := timerecord.NewTimeRecorder("DescribeAlias")
  1628. log.Info("received request to describe alias")
  1629. if in.GetAlias() == "" {
  1630. return &milvuspb.DescribeAliasResponse{
  1631. Status: merr.Status(merr.WrapErrParameterMissing("alias", "no input alias")),
  1632. }, nil
  1633. }
  1634. collectionName, err := c.meta.DescribeAlias(ctx, in.GetDbName(), in.GetAlias(), 0)
  1635. if err != nil {
  1636. log.Warn("fail to DescribeAlias", zap.Error(err))
  1637. return &milvuspb.DescribeAliasResponse{
  1638. Status: merr.Status(err),
  1639. }, nil
  1640. }
  1641. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1642. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1643. log.Info("done to describe alias")
  1644. return &milvuspb.DescribeAliasResponse{
  1645. Status: merr.Status(nil),
  1646. DbName: in.GetDbName(),
  1647. Alias: in.GetAlias(),
  1648. Collection: collectionName,
  1649. }, nil
  1650. }
  1651. // ListAliases list aliases
  1652. func (c *Core) ListAliases(ctx context.Context, in *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) {
  1653. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1654. return &milvuspb.ListAliasesResponse{
  1655. Status: merr.Status(err),
  1656. }, nil
  1657. }
  1658. method := "ListAliases"
  1659. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1660. tr := timerecord.NewTimeRecorder(method)
  1661. log := log.Ctx(ctx).With(
  1662. zap.String("role", typeutil.RootCoordRole),
  1663. zap.String("db", in.GetDbName()),
  1664. zap.String("collectionName", in.GetCollectionName()))
  1665. log.Info("received request to list aliases")
  1666. aliases, err := c.meta.ListAliases(ctx, in.GetDbName(), in.GetCollectionName(), 0)
  1667. if err != nil {
  1668. log.Warn("fail to ListAliases", zap.Error(err))
  1669. return &milvuspb.ListAliasesResponse{
  1670. Status: merr.Status(err),
  1671. }, nil
  1672. }
  1673. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1674. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1675. log.Info("done to list aliases")
  1676. return &milvuspb.ListAliasesResponse{
  1677. Status: merr.Status(nil),
  1678. DbName: in.GetDbName(),
  1679. CollectionName: in.GetCollectionName(),
  1680. Aliases: aliases,
  1681. }, nil
  1682. }
  1683. // ExpireCredCache will call invalidate credential cache
  1684. func (c *Core) ExpireCredCache(ctx context.Context, username string) error {
  1685. req := proxypb.InvalidateCredCacheRequest{
  1686. Base: commonpbutil.NewMsgBase(
  1687. commonpbutil.WithSourceID(c.session.ServerID),
  1688. ),
  1689. Username: username,
  1690. }
  1691. return c.proxyClientManager.InvalidateCredentialCache(ctx, &req)
  1692. }
  1693. // UpdateCredCache will call update credential cache
  1694. func (c *Core) UpdateCredCache(ctx context.Context, credInfo *internalpb.CredentialInfo) error {
  1695. req := proxypb.UpdateCredCacheRequest{
  1696. Base: commonpbutil.NewMsgBase(
  1697. commonpbutil.WithSourceID(c.session.ServerID),
  1698. ),
  1699. Username: credInfo.Username,
  1700. Password: credInfo.Sha256Password,
  1701. }
  1702. return c.proxyClientManager.UpdateCredentialCache(ctx, &req)
  1703. }
  1704. // CreateCredential create new user and password
  1705. // 1. decode ciphertext password to raw password
  1706. // 2. encrypt raw password
  1707. // 3. save in to etcd
  1708. func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
  1709. method := "CreateCredential"
  1710. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1711. tr := timerecord.NewTimeRecorder(method)
  1712. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
  1713. ctxLog.Debug(method)
  1714. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1715. return merr.Status(err), nil
  1716. }
  1717. // insert to db
  1718. err := c.meta.AddCredential(credInfo)
  1719. if err != nil {
  1720. ctxLog.Warn("CreateCredential save credential failed", zap.Error(err))
  1721. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1722. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateCredentialFailure), nil
  1723. }
  1724. // update proxy's local cache
  1725. err = c.UpdateCredCache(ctx, credInfo)
  1726. if err != nil {
  1727. ctxLog.Warn("CreateCredential add cache failed", zap.Error(err))
  1728. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1729. }
  1730. log.Debug("CreateCredential success", zap.String("role", typeutil.RootCoordRole),
  1731. zap.String("username", credInfo.Username))
  1732. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1733. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1734. metrics.RootCoordNumOfCredentials.Inc()
  1735. return merr.Success(), nil
  1736. }
  1737. // GetCredential get credential by username
  1738. func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
  1739. method := "GetCredential"
  1740. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1741. tr := timerecord.NewTimeRecorder(method)
  1742. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
  1743. ctxLog.Debug(method)
  1744. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1745. return &rootcoordpb.GetCredentialResponse{Status: merr.Status(err)}, nil
  1746. }
  1747. credInfo, err := c.meta.GetCredential(in.Username)
  1748. if err != nil {
  1749. ctxLog.Warn("GetCredential query credential failed", zap.Error(err))
  1750. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1751. return &rootcoordpb.GetCredentialResponse{
  1752. Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_GetCredentialFailure),
  1753. }, nil
  1754. }
  1755. ctxLog.Debug("GetCredential success")
  1756. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1757. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1758. return &rootcoordpb.GetCredentialResponse{
  1759. Status: merr.Success(),
  1760. Username: credInfo.Username,
  1761. Password: credInfo.EncryptedPassword,
  1762. }, nil
  1763. }
  1764. // UpdateCredential update password for a user
  1765. func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
  1766. method := "UpdateCredential"
  1767. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1768. tr := timerecord.NewTimeRecorder(method)
  1769. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
  1770. ctxLog.Debug(method)
  1771. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1772. return merr.Status(err), nil
  1773. }
  1774. // update data on storage
  1775. err := c.meta.AlterCredential(credInfo)
  1776. if err != nil {
  1777. ctxLog.Warn("UpdateCredential save credential failed", zap.Error(err))
  1778. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1779. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
  1780. }
  1781. // update proxy's local cache
  1782. err = c.UpdateCredCache(ctx, credInfo)
  1783. if err != nil {
  1784. ctxLog.Warn("UpdateCredential update cache failed", zap.Error(err))
  1785. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1786. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
  1787. }
  1788. log.Debug("UpdateCredential success")
  1789. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1790. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1791. return merr.Success(), nil
  1792. }
  1793. // DeleteCredential delete a user
  1794. func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
  1795. method := "DeleteCredential"
  1796. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1797. tr := timerecord.NewTimeRecorder(method)
  1798. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
  1799. ctxLog.Debug(method)
  1800. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1801. return merr.Status(err), nil
  1802. }
  1803. var status *commonpb.Status
  1804. defer func() {
  1805. if status.Code != 0 {
  1806. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1807. }
  1808. }()
  1809. redoTask := newBaseRedoTask(c.stepExecutor)
  1810. redoTask.AddSyncStep(NewSimpleStep("delete credential meta data", func(ctx context.Context) ([]nestedStep, error) {
  1811. err := c.meta.DeleteCredential(in.Username)
  1812. if err != nil {
  1813. ctxLog.Warn("delete credential meta data failed", zap.Error(err))
  1814. }
  1815. return nil, err
  1816. }))
  1817. redoTask.AddAsyncStep(NewSimpleStep("delete credential cache", func(ctx context.Context) ([]nestedStep, error) {
  1818. err := c.ExpireCredCache(ctx, in.Username)
  1819. if err != nil {
  1820. ctxLog.Warn("delete credential cache failed", zap.Error(err))
  1821. }
  1822. return nil, err
  1823. }))
  1824. redoTask.AddAsyncStep(NewSimpleStep("delete user role cache for the user", func(ctx context.Context) ([]nestedStep, error) {
  1825. err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
  1826. OpType: int32(typeutil.CacheDeleteUser),
  1827. OpKey: in.Username,
  1828. })
  1829. if err != nil {
  1830. ctxLog.Warn("delete user role cache failed for the user", zap.Error(err))
  1831. }
  1832. return nil, err
  1833. }))
  1834. err := redoTask.Execute(ctx)
  1835. if err != nil {
  1836. errMsg := "fail to execute task when deleting the user"
  1837. ctxLog.Warn(errMsg, zap.Error(err))
  1838. status = merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DeleteCredentialFailure)
  1839. return status, nil
  1840. }
  1841. ctxLog.Debug("DeleteCredential success")
  1842. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1843. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1844. metrics.RootCoordNumOfCredentials.Dec()
  1845. status = merr.Success()
  1846. return status, nil
  1847. }
  1848. // ListCredUsers list all usernames
  1849. func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
  1850. method := "ListCredUsers"
  1851. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1852. tr := timerecord.NewTimeRecorder(method)
  1853. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
  1854. ctxLog.Debug(method)
  1855. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1856. return &milvuspb.ListCredUsersResponse{Status: merr.Status(err)}, nil
  1857. }
  1858. credInfo, err := c.meta.ListCredentialUsernames()
  1859. if err != nil {
  1860. ctxLog.Warn("ListCredUsers query usernames failed", zap.Error(err))
  1861. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
  1862. status := merr.Status(err)
  1863. return &milvuspb.ListCredUsersResponse{Status: status}, nil
  1864. }
  1865. ctxLog.Debug("ListCredUsers success")
  1866. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1867. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1868. return &milvuspb.ListCredUsersResponse{
  1869. Status: merr.Success(),
  1870. Usernames: credInfo.Usernames,
  1871. }, nil
  1872. }
  1873. // CreateRole create role
  1874. // - check the node health
  1875. // - check if the role is existed
  1876. // - check if the role num has reached the limit
  1877. // - create the role by the meta api
  1878. func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
  1879. method := "CreateRole"
  1880. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1881. tr := timerecord.NewTimeRecorder(method)
  1882. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  1883. ctxLog.Debug(method + " begin")
  1884. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1885. return merr.Status(err), nil
  1886. }
  1887. entity := in.Entity
  1888. err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name})
  1889. if err != nil {
  1890. errMsg := "fail to create role"
  1891. ctxLog.Warn(errMsg, zap.Error(err))
  1892. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_CreateRoleFailure), nil
  1893. }
  1894. ctxLog.Debug(method + " success")
  1895. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1896. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1897. metrics.RootCoordNumOfRoles.Inc()
  1898. return merr.Success(), nil
  1899. }
  1900. // DropRole drop role
  1901. // - check the node health
  1902. // - check if the role name is existed
  1903. // - check if the role has some grant info
  1904. // - get all role mapping of this role
  1905. // - drop these role mappings
  1906. // - drop the role by the meta api
  1907. func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
  1908. method := "DropRole"
  1909. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1910. tr := timerecord.NewTimeRecorder(method)
  1911. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("role_name", in.RoleName))
  1912. ctxLog.Debug(method)
  1913. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1914. return merr.Status(err), nil
  1915. }
  1916. for util.IsBuiltinRole(in.GetRoleName()) {
  1917. err := merr.WrapErrPrivilegeNotPermitted("the role[%s] is a builtin role, which can't be dropped", in.GetRoleName())
  1918. return merr.Status(err), nil
  1919. }
  1920. if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil {
  1921. errMsg := "not found the role, maybe the role isn't existed or internal system error"
  1922. ctxLog.Warn(errMsg, zap.Error(err))
  1923. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
  1924. }
  1925. if !in.ForceDrop {
  1926. grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, &milvuspb.GrantEntity{
  1927. Role: &milvuspb.RoleEntity{Name: in.RoleName},
  1928. })
  1929. if len(grantEntities) != 0 {
  1930. errMsg := "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges"
  1931. ctxLog.Warn(errMsg, zap.Any("grants", grantEntities), zap.Error(err))
  1932. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
  1933. }
  1934. }
  1935. redoTask := newBaseRedoTask(c.stepExecutor)
  1936. redoTask.AddSyncStep(NewSimpleStep("drop role meta data", func(ctx context.Context) ([]nestedStep, error) {
  1937. err := c.meta.DropRole(util.DefaultTenant, in.RoleName)
  1938. if err != nil {
  1939. ctxLog.Warn("drop role mata data failed", zap.Error(err))
  1940. }
  1941. return nil, err
  1942. }))
  1943. redoTask.AddAsyncStep(NewSimpleStep("drop the privilege list of this role", func(ctx context.Context) ([]nestedStep, error) {
  1944. if !in.ForceDrop {
  1945. return nil, nil
  1946. }
  1947. err := c.meta.DropGrant(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName})
  1948. if err != nil {
  1949. ctxLog.Warn("drop the privilege list failed for the role", zap.Error(err))
  1950. }
  1951. return nil, err
  1952. }))
  1953. redoTask.AddAsyncStep(NewSimpleStep("drop role cache", func(ctx context.Context) ([]nestedStep, error) {
  1954. err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
  1955. OpType: int32(typeutil.CacheDropRole),
  1956. OpKey: in.RoleName,
  1957. })
  1958. if err != nil {
  1959. ctxLog.Warn("delete user role cache failed for the role", zap.Error(err))
  1960. }
  1961. return nil, err
  1962. }))
  1963. err := redoTask.Execute(ctx)
  1964. if err != nil {
  1965. errMsg := "fail to execute task when dropping the role"
  1966. ctxLog.Warn(errMsg, zap.Error(err))
  1967. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_DropRoleFailure), nil
  1968. }
  1969. ctxLog.Debug(method+" success", zap.String("role_name", in.RoleName))
  1970. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  1971. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  1972. metrics.RootCoordNumOfRoles.Dec()
  1973. return merr.Success(), nil
  1974. }
  1975. // OperateUserRole operate the relationship between a user and a role
  1976. // - check the node health
  1977. // - check if the role is valid
  1978. // - check if the user is valid
  1979. // - operate the user-role by the meta api
  1980. // - update the policy cache
  1981. func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
  1982. method := "OperateUserRole-" + in.Type.String()
  1983. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  1984. tr := timerecord.NewTimeRecorder(method)
  1985. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  1986. ctxLog.Debug(method)
  1987. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  1988. return merr.Status(err), nil
  1989. }
  1990. if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.RoleName}, false); err != nil {
  1991. errMsg := "not found the role, maybe the role isn't existed or internal system error"
  1992. ctxLog.Warn(errMsg, zap.Error(err))
  1993. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
  1994. }
  1995. if in.Type != milvuspb.OperateUserRoleType_RemoveUserFromRole {
  1996. if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, false); err != nil {
  1997. errMsg := "not found the user, maybe the user isn't existed or internal system error"
  1998. ctxLog.Warn(errMsg, zap.Error(err))
  1999. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
  2000. }
  2001. }
  2002. redoTask := newBaseRedoTask(c.stepExecutor)
  2003. redoTask.AddSyncStep(NewSimpleStep("operate user role meta data", func(ctx context.Context) ([]nestedStep, error) {
  2004. err := c.meta.OperateUserRole(util.DefaultTenant, &milvuspb.UserEntity{Name: in.Username}, &milvuspb.RoleEntity{Name: in.RoleName}, in.Type)
  2005. if err != nil && !common.IsIgnorableError(err) {
  2006. log.Warn("operate user role mata data failed", zap.Error(err))
  2007. return nil, err
  2008. }
  2009. return nil, nil
  2010. }))
  2011. redoTask.AddAsyncStep(NewSimpleStep("operate user role cache", func(ctx context.Context) ([]nestedStep, error) {
  2012. var opType int32
  2013. switch in.Type {
  2014. case milvuspb.OperateUserRoleType_AddUserToRole:
  2015. opType = int32(typeutil.CacheAddUserToRole)
  2016. case milvuspb.OperateUserRoleType_RemoveUserFromRole:
  2017. opType = int32(typeutil.CacheRemoveUserFromRole)
  2018. default:
  2019. errMsg := "invalid operate type for the OperateUserRole api"
  2020. log.Warn(errMsg, zap.Any("in", in))
  2021. return nil, nil
  2022. }
  2023. if err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
  2024. OpType: opType,
  2025. OpKey: funcutil.EncodeUserRoleCache(in.Username, in.RoleName),
  2026. }); err != nil {
  2027. log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
  2028. return nil, err
  2029. }
  2030. return nil, nil
  2031. }))
  2032. err := redoTask.Execute(ctx)
  2033. if err != nil {
  2034. errMsg := "fail to execute task when operate the user and role"
  2035. log.Warn(errMsg, zap.Error(err))
  2036. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperateUserRoleFailure), nil
  2037. }
  2038. ctxLog.Debug(method + " success")
  2039. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2040. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2041. return merr.Success(), nil
  2042. }
  2043. // SelectRole select role
  2044. // - check the node health
  2045. // - check if the role is valid when this param is provided
  2046. // - select role by the meta api
  2047. func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
  2048. method := "SelectRole"
  2049. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2050. tr := timerecord.NewTimeRecorder(method)
  2051. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  2052. ctxLog.Debug(method)
  2053. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2054. return &milvuspb.SelectRoleResponse{Status: merr.Status(err)}, nil
  2055. }
  2056. if in.Role != nil {
  2057. if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, false); err != nil {
  2058. if errors.Is(err, merr.ErrIoKeyNotFound) {
  2059. return &milvuspb.SelectRoleResponse{
  2060. Status: merr.Success(),
  2061. }, nil
  2062. }
  2063. errMsg := "fail to select the role to check the role name"
  2064. ctxLog.Warn(errMsg, zap.Error(err))
  2065. return &milvuspb.SelectRoleResponse{
  2066. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
  2067. }, nil
  2068. }
  2069. }
  2070. roleResults, err := c.meta.SelectRole(util.DefaultTenant, in.Role, in.IncludeUserInfo)
  2071. if err != nil {
  2072. errMsg := "fail to select the role"
  2073. ctxLog.Warn(errMsg, zap.Error(err))
  2074. return &milvuspb.SelectRoleResponse{
  2075. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
  2076. }, nil
  2077. }
  2078. ctxLog.Debug(method + " success")
  2079. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2080. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2081. return &milvuspb.SelectRoleResponse{
  2082. Status: merr.Success(),
  2083. Results: roleResults,
  2084. }, nil
  2085. }
  2086. // SelectUser select user
  2087. // - check the node health
  2088. // - check if the user is valid when this param is provided
  2089. // - select user by the meta api
  2090. func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
  2091. method := "SelectUser"
  2092. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2093. tr := timerecord.NewTimeRecorder(method)
  2094. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  2095. ctxLog.Debug(method)
  2096. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2097. return &milvuspb.SelectUserResponse{Status: merr.Status(err)}, nil
  2098. }
  2099. if in.User != nil {
  2100. if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil {
  2101. if errors.Is(err, merr.ErrIoKeyNotFound) {
  2102. return &milvuspb.SelectUserResponse{
  2103. Status: merr.Success(),
  2104. }, nil
  2105. }
  2106. errMsg := "fail to select the user to check the username"
  2107. ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err))
  2108. return &milvuspb.SelectUserResponse{
  2109. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
  2110. }, nil
  2111. }
  2112. }
  2113. userResults, err := c.meta.SelectUser(util.DefaultTenant, in.User, in.IncludeRoleInfo)
  2114. if err != nil {
  2115. errMsg := "fail to select the user"
  2116. ctxLog.Warn(errMsg, zap.Error(err))
  2117. return &milvuspb.SelectUserResponse{
  2118. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
  2119. }, nil
  2120. }
  2121. ctxLog.Debug(method + " success")
  2122. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2123. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2124. return &milvuspb.SelectUserResponse{
  2125. Status: merr.Success(),
  2126. Results: userResults,
  2127. }, nil
  2128. }
  2129. func (c *Core) isValidRole(entity *milvuspb.RoleEntity) error {
  2130. if entity == nil {
  2131. return errors.New("the role entity is nil")
  2132. }
  2133. if entity.Name == "" {
  2134. return errors.New("the name in the role entity is empty")
  2135. }
  2136. if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}, false); err != nil {
  2137. log.Warn("fail to select the role", zap.String("role_name", entity.Name), zap.Error(err))
  2138. return errors.New("not found the role, maybe the role isn't existed or internal system error")
  2139. }
  2140. return nil
  2141. }
  2142. func (c *Core) isValidObject(entity *milvuspb.ObjectEntity) error {
  2143. if entity == nil {
  2144. return errors.New("the object entity is nil")
  2145. }
  2146. if _, ok := commonpb.ObjectType_value[entity.Name]; !ok {
  2147. return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", entity.Name, lo.Keys(commonpb.ObjectType_value))
  2148. }
  2149. return nil
  2150. }
  2151. func (c *Core) isValidGrantor(entity *milvuspb.GrantorEntity, object string) error {
  2152. if entity == nil {
  2153. return errors.New("the grantor entity is nil")
  2154. }
  2155. if entity.User == nil {
  2156. return errors.New("the user entity in the grantor entity is nil")
  2157. }
  2158. if entity.User.Name == "" {
  2159. return errors.New("the name in the user entity of the grantor entity is empty")
  2160. }
  2161. if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: entity.User.Name}, false); err != nil {
  2162. log.Warn("fail to select the user", zap.String("username", entity.User.Name), zap.Error(err))
  2163. return errors.New("not found the user, maybe the user isn't existed or internal system error")
  2164. }
  2165. if entity.Privilege == nil {
  2166. return errors.New("the privilege entity in the grantor entity is nil")
  2167. }
  2168. if util.IsAnyWord(entity.Privilege.Name) {
  2169. return nil
  2170. }
  2171. if privilegeName := util.PrivilegeNameForMetastore(entity.Privilege.Name); privilegeName == "" {
  2172. return fmt.Errorf("not found the privilege name[%s]", entity.Privilege.Name)
  2173. }
  2174. privileges, ok := util.ObjectPrivileges[object]
  2175. if !ok {
  2176. return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", object, lo.Keys(commonpb.ObjectType_value))
  2177. }
  2178. for _, privilege := range privileges {
  2179. if privilege == entity.Privilege.Name {
  2180. return nil
  2181. }
  2182. }
  2183. return fmt.Errorf("not found the privilege name[%s] in object[%s]", entity.Privilege.Name, object)
  2184. }
  2185. // OperatePrivilege operate the privilege, including grant and revoke
  2186. // - check the node health
  2187. // - check if the operating type is valid
  2188. // - check if the entity is nil
  2189. // - check if the params, including the resource entity, the principal entity, the grantor entity, is valid
  2190. // - operate the privilege by the meta api
  2191. // - update the policy cache
  2192. func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
  2193. method := "OperatePrivilege"
  2194. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2195. tr := timerecord.NewTimeRecorder(method)
  2196. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  2197. ctxLog.Debug(method)
  2198. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2199. return merr.Status(err), nil
  2200. }
  2201. if in.Type != milvuspb.OperatePrivilegeType_Grant && in.Type != milvuspb.OperatePrivilegeType_Revoke {
  2202. errMsg := fmt.Sprintf("invalid operate privilege type, current type: %s, valid value: [%s, %s]", in.Type, milvuspb.OperatePrivilegeType_Grant, milvuspb.OperatePrivilegeType_Revoke)
  2203. ctxLog.Warn(errMsg)
  2204. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2205. }
  2206. if in.Entity == nil {
  2207. errMsg := "the grant entity in the request is nil"
  2208. ctxLog.Error(errMsg)
  2209. return merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2210. }
  2211. if err := c.isValidObject(in.Entity.Object); err != nil {
  2212. ctxLog.Warn("", zap.Error(err))
  2213. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2214. }
  2215. if err := c.isValidRole(in.Entity.Role); err != nil {
  2216. ctxLog.Warn("", zap.Error(err))
  2217. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2218. }
  2219. if err := c.isValidGrantor(in.Entity.Grantor, in.Entity.Object.Name); err != nil {
  2220. ctxLog.Error("", zap.Error(err))
  2221. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2222. }
  2223. ctxLog.Debug("before PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
  2224. if !util.IsAnyWord(in.Entity.Grantor.Privilege.Name) {
  2225. in.Entity.Grantor.Privilege.Name = util.PrivilegeNameForMetastore(in.Entity.Grantor.Privilege.Name)
  2226. }
  2227. ctxLog.Debug("after PrivilegeNameForMetastore", zap.String("privilege", in.Entity.Grantor.Privilege.Name))
  2228. if in.Entity.Object.Name == commonpb.ObjectType_Global.String() {
  2229. in.Entity.ObjectName = util.AnyWord
  2230. }
  2231. redoTask := newBaseRedoTask(c.stepExecutor)
  2232. redoTask.AddSyncStep(NewSimpleStep("operate privilege meta data", func(ctx context.Context) ([]nestedStep, error) {
  2233. err := c.meta.OperatePrivilege(util.DefaultTenant, in.Entity, in.Type)
  2234. if err != nil && !common.IsIgnorableError(err) {
  2235. log.Warn("fail to operate the privilege", zap.Any("in", in), zap.Error(err))
  2236. return nil, err
  2237. }
  2238. return nil, nil
  2239. }))
  2240. redoTask.AddAsyncStep(NewSimpleStep("operate privilege cache", func(ctx context.Context) ([]nestedStep, error) {
  2241. var opType int32
  2242. switch in.Type {
  2243. case milvuspb.OperatePrivilegeType_Grant:
  2244. opType = int32(typeutil.CacheGrantPrivilege)
  2245. case milvuspb.OperatePrivilegeType_Revoke:
  2246. opType = int32(typeutil.CacheRevokePrivilege)
  2247. default:
  2248. log.Warn("invalid operate type for the OperatePrivilege api", zap.Any("in", in))
  2249. return nil, nil
  2250. }
  2251. if err := c.proxyClientManager.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
  2252. OpType: opType,
  2253. OpKey: funcutil.PolicyForPrivilege(in.Entity.Role.Name, in.Entity.Object.Name, in.Entity.ObjectName, in.Entity.Grantor.Privilege.Name, in.Entity.DbName),
  2254. }); err != nil {
  2255. log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
  2256. return nil, err
  2257. }
  2258. return nil, nil
  2259. }))
  2260. err := redoTask.Execute(ctx)
  2261. if err != nil {
  2262. errMsg := "fail to execute task when operating the privilege"
  2263. log.Warn(errMsg, zap.Error(err))
  2264. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2265. }
  2266. ctxLog.Debug(method + " success")
  2267. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2268. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2269. return merr.Success(), nil
  2270. }
  2271. // SelectGrant select grant
  2272. // - check the node health
  2273. // - check if the principal entity is valid
  2274. // - check if the resource entity which is provided by the user is valid
  2275. // - select grant by the meta api
  2276. func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
  2277. method := "SelectGrant"
  2278. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2279. tr := timerecord.NewTimeRecorder(method)
  2280. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  2281. ctxLog.Debug(method)
  2282. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2283. return &milvuspb.SelectGrantResponse{
  2284. Status: merr.Status(err),
  2285. }, nil
  2286. }
  2287. if in.Entity == nil {
  2288. errMsg := "the grant entity in the request is nil"
  2289. ctxLog.Warn(errMsg)
  2290. return &milvuspb.SelectGrantResponse{
  2291. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure),
  2292. }, nil
  2293. }
  2294. if err := c.isValidRole(in.Entity.Role); err != nil {
  2295. ctxLog.Warn("", zap.Error(err))
  2296. return &milvuspb.SelectGrantResponse{
  2297. Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure),
  2298. }, nil
  2299. }
  2300. if in.Entity.Object != nil {
  2301. if err := c.isValidObject(in.Entity.Object); err != nil {
  2302. ctxLog.Warn("", zap.Error(err))
  2303. return &milvuspb.SelectGrantResponse{
  2304. Status: merr.StatusWithErrorCode(err, commonpb.ErrorCode_SelectGrantFailure),
  2305. }, nil
  2306. }
  2307. }
  2308. grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, in.Entity)
  2309. if errors.Is(err, merr.ErrIoKeyNotFound) {
  2310. return &milvuspb.SelectGrantResponse{
  2311. Status: merr.Success(),
  2312. Entities: grantEntities,
  2313. }, nil
  2314. }
  2315. if err != nil {
  2316. errMsg := "fail to select the grant"
  2317. ctxLog.Warn(errMsg, zap.Error(err))
  2318. return &milvuspb.SelectGrantResponse{
  2319. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectGrantFailure),
  2320. }, nil
  2321. }
  2322. ctxLog.Debug(method + " success")
  2323. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2324. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2325. return &milvuspb.SelectGrantResponse{
  2326. Status: merr.Success(),
  2327. Entities: grantEntities,
  2328. }, nil
  2329. }
  2330. func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
  2331. method := "PolicyList"
  2332. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2333. tr := timerecord.NewTimeRecorder(method)
  2334. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  2335. ctxLog.Debug(method)
  2336. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2337. return &internalpb.ListPolicyResponse{
  2338. Status: merr.Status(err),
  2339. }, nil
  2340. }
  2341. policies, err := c.meta.ListPolicy(util.DefaultTenant)
  2342. if err != nil {
  2343. errMsg := "fail to list policy"
  2344. ctxLog.Warn(errMsg, zap.Error(err))
  2345. return &internalpb.ListPolicyResponse{
  2346. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure),
  2347. }, nil
  2348. }
  2349. userRoles, err := c.meta.ListUserRole(util.DefaultTenant)
  2350. if err != nil {
  2351. errMsg := "fail to list user-role"
  2352. ctxLog.Warn(errMsg, zap.Any("in", in), zap.Error(err))
  2353. return &internalpb.ListPolicyResponse{
  2354. Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_ListPolicyFailure),
  2355. }, nil
  2356. }
  2357. ctxLog.Debug(method + " success")
  2358. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2359. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2360. return &internalpb.ListPolicyResponse{
  2361. Status: merr.Success(),
  2362. PolicyInfos: policies,
  2363. UserRoles: userRoles,
  2364. }, nil
  2365. }
  2366. func (c *Core) BackupRBAC(ctx context.Context, in *milvuspb.BackupRBACMetaRequest) (*milvuspb.BackupRBACMetaResponse, error) {
  2367. method := "BackupRBAC"
  2368. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2369. tr := timerecord.NewTimeRecorder(method)
  2370. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
  2371. ctxLog.Debug(method)
  2372. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2373. return &milvuspb.BackupRBACMetaResponse{
  2374. Status: merr.Status(err),
  2375. }, nil
  2376. }
  2377. rbacMeta, err := c.meta.BackupRBAC(ctx, util.DefaultTenant)
  2378. if err != nil {
  2379. return &milvuspb.BackupRBACMetaResponse{
  2380. Status: merr.Status(err),
  2381. }, nil
  2382. }
  2383. ctxLog.Debug(method + " success")
  2384. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2385. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2386. return &milvuspb.BackupRBACMetaResponse{
  2387. Status: merr.Success(),
  2388. RBACMeta: rbacMeta,
  2389. }, nil
  2390. }
  2391. func (c *Core) RestoreRBAC(ctx context.Context, in *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) {
  2392. method := "RestoreRBAC"
  2393. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
  2394. tr := timerecord.NewTimeRecorder(method)
  2395. ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
  2396. ctxLog.Debug(method)
  2397. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2398. return merr.Status(err), nil
  2399. }
  2400. redoTask := newBaseRedoTask(c.stepExecutor)
  2401. redoTask.AddSyncStep(NewSimpleStep("restore rbac meta data", func(ctx context.Context) ([]nestedStep, error) {
  2402. if err := c.meta.RestoreRBAC(ctx, util.DefaultTenant, in.RBACMeta); err != nil {
  2403. log.Warn("fail to restore rbac meta data", zap.Any("in", in), zap.Error(err))
  2404. return nil, err
  2405. }
  2406. return nil, nil
  2407. }))
  2408. redoTask.AddAsyncStep(NewSimpleStep("operate privilege cache", func(ctx context.Context) ([]nestedStep, error) {
  2409. if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
  2410. OpType: int32(typeutil.CacheRefresh),
  2411. }); err != nil {
  2412. log.Warn("fail to refresh policy info cache", zap.Any("in", in), zap.Error(err))
  2413. return nil, err
  2414. }
  2415. return nil, nil
  2416. }))
  2417. err := redoTask.Execute(ctx)
  2418. if err != nil {
  2419. errMsg := "fail to execute task when restore rbac meta data"
  2420. log.Warn(errMsg, zap.Error(err))
  2421. return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
  2422. }
  2423. ctxLog.Debug(method + " success")
  2424. metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
  2425. metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
  2426. return merr.Success(), nil
  2427. }
  2428. func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
  2429. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2430. return merr.Status(err), nil
  2431. }
  2432. log := log.Ctx(ctx).With(zap.String("oldCollectionName", req.GetOldName()), zap.String("newCollectionName", req.GetNewName()))
  2433. log.Info("received request to rename collection")
  2434. metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.TotalLabel).Inc()
  2435. tr := timerecord.NewTimeRecorder("RenameCollection")
  2436. t := &renameCollectionTask{
  2437. baseTask: newBaseTask(ctx, c),
  2438. Req: req,
  2439. }
  2440. if err := c.scheduler.AddTask(t); err != nil {
  2441. log.Warn("failed to enqueue request to rename collection", zap.Error(err))
  2442. metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc()
  2443. return merr.Status(err), nil
  2444. }
  2445. if err := t.WaitToFinish(); err != nil {
  2446. log.Warn("failed to rename collection", zap.Uint64("ts", t.GetTs()), zap.Error(err))
  2447. metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc()
  2448. return merr.Status(err), nil
  2449. }
  2450. metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.SuccessLabel).Inc()
  2451. metrics.RootCoordDDLReqLatency.WithLabelValues("RenameCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
  2452. log.Info("done to rename collection", zap.Uint64("ts", t.GetTs()))
  2453. return merr.Success(), nil
  2454. }
  2455. func (c *Core) DescribeDatabase(ctx context.Context, req *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) {
  2456. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2457. return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil
  2458. }
  2459. log := log.Ctx(ctx).With(zap.String("dbName", req.GetDbName()))
  2460. log.Info("received request to describe database ")
  2461. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.TotalLabel).Inc()
  2462. tr := timerecord.NewTimeRecorder("DescribeDatabase")
  2463. t := &describeDBTask{
  2464. baseTask: newBaseTask(ctx, c),
  2465. Req: req,
  2466. }
  2467. if err := c.scheduler.AddTask(t); err != nil {
  2468. log.Warn("failed to enqueue request to describe database", zap.Error(err))
  2469. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.FailLabel).Inc()
  2470. return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil
  2471. }
  2472. if err := t.WaitToFinish(); err != nil {
  2473. log.Warn("failed to describe database", zap.Uint64("ts", t.GetTs()), zap.Error(err))
  2474. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.FailLabel).Inc()
  2475. return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(err)}, nil
  2476. }
  2477. metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeDatabase", metrics.SuccessLabel).Inc()
  2478. metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeDatabase").Observe(float64(tr.ElapseSpan().Milliseconds()))
  2479. log.Info("done to describe database", zap.Uint64("ts", t.GetTs()))
  2480. return t.Rsp, nil
  2481. }
  2482. func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
  2483. if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
  2484. return &milvuspb.CheckHealthResponse{
  2485. Status: merr.Status(err),
  2486. IsHealthy: false,
  2487. Reasons: []string{fmt.Sprintf("serverID=%d: %v", c.session.ServerID, err)},
  2488. }, nil
  2489. }
  2490. group, ctx := errgroup.WithContext(ctx)
  2491. errs := typeutil.NewConcurrentSet[error]()
  2492. proxyClients := c.proxyClientManager.GetProxyClients()
  2493. proxyClients.Range(func(key int64, value types.ProxyClient) bool {
  2494. nodeID := key
  2495. proxyClient := value
  2496. group.Go(func() error {
  2497. sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
  2498. if err != nil {
  2499. errs.Insert(err)
  2500. return err
  2501. }
  2502. err = merr.AnalyzeState("Proxy", nodeID, sta)
  2503. if err != nil {
  2504. errs.Insert(err)
  2505. }
  2506. return err
  2507. })
  2508. return true
  2509. })
  2510. maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
  2511. if maxDelay > 0 {
  2512. group.Go(func() error {
  2513. err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay)
  2514. if err != nil {
  2515. errs.Insert(err)
  2516. }
  2517. return err
  2518. })
  2519. }
  2520. err := group.Wait()
  2521. if err != nil {
  2522. return &milvuspb.CheckHealthResponse{
  2523. Status: merr.Success(),
  2524. IsHealthy: false,
  2525. Reasons: lo.Map(errs.Collect(), func(e error, i int) string {
  2526. return err.Error()
  2527. }),
  2528. }, nil
  2529. }
  2530. return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
  2531. }