meta_watcher.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package integration
  17. import (
  18. "context"
  19. "encoding/json"
  20. "fmt"
  21. "path"
  22. "sort"
  23. "time"
  24. clientv3 "go.etcd.io/etcd/client/v3"
  25. "go.uber.org/zap"
  26. "google.golang.org/protobuf/proto"
  27. "github.com/milvus-io/milvus/internal/proto/datapb"
  28. "github.com/milvus-io/milvus/internal/proto/querypb"
  29. "github.com/milvus-io/milvus/internal/util/sessionutil"
  30. "github.com/milvus-io/milvus/pkg/log"
  31. )
  32. // MetaWatcher to observe meta data of milvus cluster
  33. type MetaWatcher interface {
  34. ShowSessions() ([]*sessionutil.SessionRaw, error)
  35. ShowSegments() ([]*datapb.SegmentInfo, error)
  36. ShowReplicas() ([]*querypb.Replica, error)
  37. }
  // EtcdMetaWatcher implements MetaWatcher by reading keys directly from etcd
  // under rootPath.
  type EtcdMetaWatcher struct {
  	// NOTE(review): the embedded interface is never assigned in this file. All
  	// three MetaWatcher methods are defined on *EtcdMetaWatcher, so it is only
  	// a placeholder. If it stays nil, any interface method added later but not
  	// defined here would panic when called.
  	MetaWatcher
  	// rootPath is the etcd key prefix that "/meta/..." paths are joined onto.
  	rootPath string
  	// etcdCli is the client used for every read.
  	etcdCli *clientv3.Client
  }
  // ShowSessions returns the sessions stored under <rootPath>/meta/session.
  // Values that are not valid session JSON are skipped by listSessionsByPrefix.
  func (watcher *EtcdMetaWatcher) ShowSessions() ([]*sessionutil.SessionRaw, error) {
  	// Use path.Join like the other Show* methods: it tolerates a rootPath with a
  	// trailing slash, which plain concatenation would turn into "//meta/session".
  	metaPath := path.Join(watcher.rootPath, "/meta/session")
  	return listSessionsByPrefix(watcher.etcdCli, metaPath)
  }
  // ShowSegments returns every segment stored under
  // <rootPath>/meta/datacoord-meta/s/, sorted by segment ID, with binlog,
  // deltalog and statslog entries loaded by listSegments.
  func (watcher *EtcdMetaWatcher) ShowSegments() ([]*datapb.SegmentInfo, error) {
  	segmentPrefix := path.Join(watcher.rootPath, "/meta/datacoord-meta/s/") + "/"
  	acceptAll := func(*datapb.SegmentInfo) bool { return true }
  	return listSegments(watcher.etcdCli, watcher.rootPath, segmentPrefix, acceptAll)
  }
  // ShowReplicas returns every replica stored under
  // <rootPath>/meta/querycoord-replica/. Values that fail to unmarshal are
  // logged and skipped by listReplicas.
  func (watcher *EtcdMetaWatcher) ShowReplicas() ([]*querypb.Replica, error) {
  	// path.Join strips a trailing "/". Re-append it, as ShowSegments does, so the
  	// prefix only matches keys inside the replica directory and not sibling
  	// keys that merely start with "querycoord-replica".
  	metaBasePath := path.Join(watcher.rootPath, "/meta/querycoord-replica") + "/"
  	return listReplicas(watcher.etcdCli, metaBasePath)
  }
  57. //=================== Below largely copied from birdwatcher ========================
  58. // listSessions returns all session
  59. func listSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*sessionutil.SessionRaw, error) {
  60. ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
  61. defer cancel()
  62. resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
  63. if err != nil {
  64. return nil, err
  65. }
  66. sessions := make([]*sessionutil.SessionRaw, 0, len(resp.Kvs))
  67. for _, kv := range resp.Kvs {
  68. session := &sessionutil.SessionRaw{}
  69. err := json.Unmarshal(kv.Value, session)
  70. if err != nil {
  71. continue
  72. }
  73. sessions = append(sessions, session)
  74. }
  75. return sessions, nil
  76. }
  // listSegments reads every key under prefix and decodes each value as a
  // datapb.SegmentInfo. It keeps the segments accepted by filter (a nil filter
  // keeps all) and sorts them by ascending segment ID. It then fills in each
  // segment's binlog, deltalog and statslog entries through getSegmentBinlogs,
  // using rootPath.
  // Values that fail to unmarshal are skipped. An etcd read error, or any
  // getSegmentBinlogs error, aborts the call and returns that error.
  func listSegments(cli *clientv3.Client, rootPath string, prefix string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) {
  	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
  	defer cancel()

  	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
  	if err != nil {
  		return nil, err
  	}

  	keep := func(info *datapb.SegmentInfo) bool { return filter == nil || filter(info) }
  	segments := make([]*datapb.SegmentInfo, 0, len(resp.Kvs))
  	for _, kv := range resp.Kvs {
  		info := new(datapb.SegmentInfo)
  		if proto.Unmarshal(kv.Value, info) != nil || !keep(info) {
  			continue
  		}
  		segments = append(segments, info)
  	}

  	sort.Slice(segments, func(a, b int) bool { return segments[a].GetID() < segments[b].GetID() })

  	for i := range segments {
  		binlogs, deltalogs, statslogs, loadErr := getSegmentBinlogs(cli, rootPath, segments[i])
  		if loadErr != nil {
  			return nil, loadErr
  		}
  		segments[i].Binlogs, segments[i].Deltalogs, segments[i].Statslogs = binlogs, deltalogs, statslogs
  	}
  	return segments, nil
  }
  // getSegmentBinlogs loads segment's binlog, deltalog and statslog entries and
  // returns them in that order. The entries are read from
  // <rootPath>/meta/datacoord-meta/{binlog,deltalog,statslog}/<collectionID>/<partitionID>/<segmentID>/.
  // Each etcd read has its own 3-second timeout. If any read or unmarshal
  // fails, that error is returned and all three slices are nil.
  func getSegmentBinlogs(cli *clientv3.Client, rootPath string, segment *datapb.SegmentInfo) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, []*datapb.FieldBinlog, error) {
  	// logPrefix builds the key prefix of one log kind for this segment.
  	// The trailing "/" is required. Without it, the prefix for segment 1 also
  	// matches the keys of segments 10, 11, 100, ... in the same partition, and
  	// their logs would be attached to this segment.
  	logPrefix := func(kind string) string {
  		segmentPath := fmt.Sprintf("%s/%d/%d/%d", kind, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
  		return path.Join(rootPath, "/meta/datacoord-meta", segmentPath) + "/"
  	}

  	// fetch reads and decodes every FieldBinlog stored under prefix.
  	fetch := func(prefix string) ([]*datapb.FieldBinlog, error) {
  		ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
  		defer cancel()
  		resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
  		if err != nil {
  			return nil, err
  		}
  		fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(resp.Kvs))
  		for _, kv := range resp.Kvs {
  			info := &datapb.FieldBinlog{}
  			if err := proto.Unmarshal(kv.Value, info); err != nil {
  				return nil, fmt.Errorf("unmarshal field binlog %q: %w", kv.Key, err)
  			}
  			fieldBinlogs = append(fieldBinlogs, info)
  		}
  		return fieldBinlogs, nil
  	}

  	binlogs, err := fetch(logPrefix("binlog"))
  	if err != nil {
  		return nil, nil, nil, err
  	}
  	deltalogs, err := fetch(logPrefix("deltalog"))
  	if err != nil {
  		return nil, nil, nil, err
  	}
  	statslogs, err := fetch(logPrefix("statslog"))
  	if err != nil {
  		return nil, nil, nil, err
  	}
  	return binlogs, deltalogs, statslogs, nil
  }
  // listReplicas reads every key under prefix and decodes each value as a
  // querypb.Replica. Only a failed etcd read is reported as an error. Values
  // that fail to unmarshal are logged at warn level and skipped. The read is
  // bounded by a 3-second timeout.
  func listReplicas(cli *clientv3.Client, prefix string) ([]*querypb.Replica, error) {
  	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  	defer cancel()

  	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
  	if err != nil {
  		return nil, err
  	}

  	out := make([]*querypb.Replica, 0, len(resp.Kvs))
  	for _, kv := range resp.Kvs {
  		replica := new(querypb.Replica)
  		unmarshalErr := proto.Unmarshal(kv.Value, replica)
  		if unmarshalErr == nil {
  			out = append(out, replica)
  			continue
  		}
  		log.Warn("failed to unmarshal replica info", zap.Error(unmarshalErr))
  	}
  	return out, nil
  }
  // PrettyReplica renders replica as two newline-terminated lines. The first
  // line holds its replica and collection IDs, and the second its node list.
  // A nil replica renders with zero IDs and an empty node list instead of
  // panicking.
  func PrettyReplica(replica *querypb.Replica) string {
  	// The generated getters are nil-safe, unlike direct field access.
  	return fmt.Sprintf("ReplicaID: %d CollectionID: %d\nNodes:%v\n",
  		replica.GetID(), replica.GetCollectionID(), replica.GetNodes())
  }