12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898 |
- // Licensed to the LF AI & Data foundation under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package datacoord
- import (
- "context"
- "fmt"
- "path"
- "sync"
- "time"
- "github.com/cockroachdb/errors"
- "github.com/samber/lo"
- "go.uber.org/atomic"
- "go.uber.org/zap"
- "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
- "github.com/milvus-io/milvus/internal/datacoord/broker"
- "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
- "github.com/milvus-io/milvus/internal/metastore/model"
- "github.com/milvus-io/milvus/internal/proto/datapb"
- "github.com/milvus-io/milvus/internal/storage"
- "github.com/milvus-io/milvus/pkg/common"
- "github.com/milvus-io/milvus/pkg/log"
- "github.com/milvus-io/milvus/pkg/metrics"
- "github.com/milvus-io/milvus/pkg/util/conc"
- "github.com/milvus-io/milvus/pkg/util/funcutil"
- "github.com/milvus-io/milvus/pkg/util/merr"
- "github.com/milvus-io/milvus/pkg/util/metautil"
- "github.com/milvus-io/milvus/pkg/util/paramtable"
- "github.com/milvus-io/milvus/pkg/util/typeutil"
- )
// GcOption garbage collection options
type GcOption struct {
	cli              storage.ChunkManager // client for object storage access
	enabled          bool                 // enable switch
	checkInterval    time.Duration        // interval between meta-driven recycle runs
	missingTolerance time.Duration        // key missing in meta tolerance time
	dropTolerance    time.Duration        // dropped segment related key tolerance time
	scanInterval     time.Duration        // interval for scanning residue of interrupted log writes
	broker           broker.Broker        // coordinator broker, used to check collection existence
	removeObjectPool *conc.Pool[struct{}] // worker pool used for concurrent object removals
}
// garbageCollector handles garbage files in object storage
// which could be dropped collection remanent or data node failure traces
type garbageCollector struct {
	ctx    context.Context    // lifetime context for all background loops
	cancel context.CancelFunc // cancels ctx; invoked by close()

	option  GcOption // tuning options; see GcOption
	meta    *meta    // datacoord meta, the source of truth for segments/indexes
	handler Handler  // used to filter indexed segments during dropped-segment GC

	startOnce sync.Once      // guards start()
	stopOnce  sync.Once      // guards close()
	wg        sync.WaitGroup // waits for the three background goroutines on close
	cmdCh     chan gcCmd     // control channel serving Pause/Resume commands
	pauseUntil atomic.Time   // recycle tasks are skipped while now < pauseUntil
}
// gcCmd is a control command sent to the garbage collector's control loop.
type gcCmd struct {
	cmdType  datapb.GcCommand // Pause or Resume
	duration time.Duration    // pause duration; only meaningful for Pause
	done     chan struct{}    // closed by the control loop once the command is applied
}
- // newGarbageCollector create garbage collector with meta and option
- func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector {
- log.Info("GC with option",
- zap.Bool("enabled", opt.enabled),
- zap.Duration("interval", opt.checkInterval),
- zap.Duration("scanInterval", opt.scanInterval),
- zap.Duration("missingTolerance", opt.missingTolerance),
- zap.Duration("dropTolerance", opt.dropTolerance))
- opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
- ctx, cancel := context.WithCancel(context.Background())
- return &garbageCollector{
- ctx: ctx,
- cancel: cancel,
- meta: meta,
- handler: handler,
- option: opt,
- cmdCh: make(chan gcCmd),
- }
- }
- // start a goroutine and perform gc check every `checkInterval`
- func (gc *garbageCollector) start() {
- if gc.option.enabled {
- if gc.option.cli == nil {
- log.Warn("DataCoord gc enabled, but SSO client is not provided")
- return
- }
- gc.startOnce.Do(func() {
- gc.work(gc.ctx)
- })
- }
- }
- func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
- if !gc.option.enabled {
- log.Info("garbage collection not enabled")
- return nil
- }
- done := make(chan struct{})
- select {
- case gc.cmdCh <- gcCmd{
- cmdType: datapb.GcCommand_Pause,
- duration: pauseDuration,
- done: done,
- }:
- <-done
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- func (gc *garbageCollector) Resume(ctx context.Context) error {
- if !gc.option.enabled {
- log.Warn("garbage collection not enabled, cannot resume")
- return merr.WrapErrServiceUnavailable("garbage collection not enabled")
- }
- done := make(chan struct{})
- select {
- case gc.cmdCh <- gcCmd{
- cmdType: datapb.GcCommand_Resume,
- done: done,
- }:
- <-done
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }
// work contains actual looping check logic.
// It launches three goroutines (tracked by gc.wg): a meta-driven recycle loop,
// a storage-scan recycle loop, and the Pause/Resume control loop.
func (gc *garbageCollector) work(ctx context.Context) {
	// TODO: fast cancel for gc when closing.
	// Run gc tasks in parallel.
	gc.wg.Add(3)
	go func() {
		defer gc.wg.Done()
		// Meta-driven recycling: remove dropped/unused entries found in meta
		// together with their backing files.
		gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
			gc.recycleDroppedSegments(ctx)
			gc.recycleChannelCPMeta(ctx)
			gc.recycleUnusedIndexes(ctx)
			gc.recycleUnusedSegIndexes(ctx)
			gc.recycleUnusedAnalyzeFiles(ctx)
			gc.recycleUnusedTextIndexFiles(ctx)
		})
	}()
	go func() {
		defer gc.wg.Done()
		// Storage-driven recycling: walk object storage and delete orphan files
		// that no longer appear in meta (e.g. residue of interrupted writes).
		gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context) {
			gc.recycleUnusedBinlogFiles(ctx)
			gc.recycleUnusedIndexFiles(ctx)
		})
	}()
	go func() {
		defer gc.wg.Done()
		// Serve Pause/Resume commands until shutdown.
		gc.startControlLoop(ctx)
	}()
}
// startControlLoop start a control loop for garbageCollector.
// It applies Pause/Resume commands to gc.pauseUntil and quits when gc.ctx is done.
func (gc *garbageCollector) startControlLoop(_ context.Context) {
	for {
		select {
		case cmd := <-gc.cmdCh:
			switch cmd.cmdType {
			case datapb.GcCommand_Pause:
				pauseUntil := time.Now().Add(cmd.duration)
				// Only extend the pause deadline; a shorter pause never
				// shrinks a longer one already in effect.
				if pauseUntil.After(gc.pauseUntil.Load()) {
					log.Info("garbage collection paused", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil))
					gc.pauseUntil.Store(pauseUntil)
				} else {
					log.Info("new pause until before current value", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil), zap.Time("oldPauseUntil", gc.pauseUntil.Load()))
				}
			case datapb.GcCommand_Resume:
				// reset to zero value
				gc.pauseUntil.Store(time.Time{})
				log.Info("garbage collection resumed")
			}
			// Signal the sender (Pause/Resume) that the command is applied.
			close(cmd.done)
		case <-gc.ctx.Done():
			log.Warn("garbage collector control loop quit")
			return
		}
	}
}
// runRecycleTaskWithPauser is a helper function to create a task with pauser.
// It runs task every interval, skipping a tick while now < gc.pauseUntil,
// and returns when ctx is canceled.
func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context)) {
	logger := log.With(zap.String("gcType", name)).With(zap.Duration("interval", interval))
	timer := time.NewTicker(interval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			// Honor an active pause set via the control loop.
			if time.Now().Before(gc.pauseUntil.Load()) {
				logger.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
				continue
			}
			logger.Info("garbage collector recycle task start...")
			start := time.Now()
			task(ctx)
			logger.Info("garbage collector recycle task done", zap.Duration("timeCost", time.Since(start)))
		}
	}
}
// close stop the garbage collector.
// It cancels the lifetime context and waits for all background goroutines
// started by work() to exit. Safe to call multiple times.
func (gc *garbageCollector) close() {
	gc.stopOnce.Do(func() {
		gc.cancel()
		gc.wg.Wait()
	})
}
- // recycleUnusedBinlogFiles load meta file info and compares OSS keys
- // if missing found, performs gc cleanup
- func (gc *garbageCollector) recycleUnusedBinlogFiles(ctx context.Context) {
- start := time.Now()
- log := log.With(zap.String("gcName", "recycleUnusedBinlogFiles"), zap.Time("startAt", start))
- log.Info("start recycleUnusedBinlogFiles...")
- defer func() { log.Info("recycleUnusedBinlogFiles done", zap.Duration("timeCost", time.Since(start))) }()
- type scanTask struct {
- prefix string
- checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool
- label string
- }
- scanTasks := []scanTask{
- {
- prefix: path.Join(gc.option.cli.RootPath(), common.SegmentInsertLogPath),
- checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
- return segment != nil
- },
- label: metrics.InsertFileLabel,
- },
- {
- prefix: path.Join(gc.option.cli.RootPath(), common.SegmentStatslogPath),
- checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
- logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
- if err != nil {
- log.Warn("garbageCollector find dirty stats log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
- return false
- }
- return segment != nil && segment.IsStatsLogExists(logID)
- },
- label: metrics.StatFileLabel,
- },
- {
- prefix: path.Join(gc.option.cli.RootPath(), common.SegmentDeltaLogPath),
- checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
- logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
- if err != nil {
- log.Warn("garbageCollector find dirty dleta log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
- return false
- }
- return segment != nil && segment.IsDeltaLogExists(logID)
- },
- label: metrics.DeleteFileLabel,
- },
- }
- for _, task := range scanTasks {
- gc.recycleUnusedBinLogWithChecker(ctx, task.prefix, task.label, task.checker)
- }
- metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
- }
- // recycleUnusedBinLogWithChecker scans the prefix and checks the path with checker.
- // GC the file if checker returns false.
- func (gc *garbageCollector) recycleUnusedBinLogWithChecker(ctx context.Context, prefix string, label string, checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool) {
- logger := log.With(zap.String("prefix", prefix))
- logger.Info("garbageCollector recycleUnusedBinlogFiles start", zap.String("prefix", prefix))
- lastFilePath := ""
- total := 0
- valid := 0
- unexpectedFailure := atomic.NewInt32(0)
- removed := atomic.NewInt32(0)
- start := time.Now()
- futures := make([]*conc.Future[struct{}], 0)
- err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(chunkInfo *storage.ChunkObjectInfo) bool {
- total++
- lastFilePath = chunkInfo.FilePath
- // Check file tolerance first to avoid unnecessary operation.
- if time.Since(chunkInfo.ModifyTime) <= gc.option.missingTolerance {
- logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is not expired", zap.String("filePath", chunkInfo.FilePath), zap.Time("modifyTime", chunkInfo.ModifyTime))
- return true
- }
- // Parse segmentID from file path.
- // TODO: Does all files in the same segment have the same segmentID?
- segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), chunkInfo.FilePath)
- if err != nil {
- unexpectedFailure.Inc()
- logger.Warn("garbageCollector recycleUnusedBinlogFiles parse segment id error",
- zap.String("filePath", chunkInfo.FilePath),
- zap.Error(err))
- return true
- }
- segment := gc.meta.GetSegment(segmentID)
- if checker(chunkInfo, segment) {
- valid++
- logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is valid", zap.String("filePath", chunkInfo.FilePath), zap.Int64("segmentID", segmentID))
- return true
- }
- // ignore error since it could be cleaned up next time
- file := chunkInfo.FilePath
- future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
- logger := logger.With(zap.String("file", file))
- logger.Info("garbageCollector recycleUnusedBinlogFiles remove file...")
- if err = gc.option.cli.Remove(ctx, file); err != nil {
- log.Warn("garbageCollector recycleUnusedBinlogFiles remove file failed", zap.Error(err))
- unexpectedFailure.Inc()
- return struct{}{}, err
- }
- log.Info("garbageCollector recycleUnusedBinlogFiles remove file success")
- removed.Inc()
- return struct{}{}, nil
- })
- futures = append(futures, future)
- return true
- })
- // Wait for all remove tasks done.
- if err := conc.BlockOnAll(futures...); err != nil {
- // error is logged, and can be ignored here.
- logger.Warn("some task failure in remove object pool", zap.Error(err))
- }
- cost := time.Since(start)
- logger.Info("garbageCollector recycleUnusedBinlogFiles done",
- zap.Int("total", total),
- zap.Int("valid", valid),
- zap.Int("unexpectedFailure", int(unexpectedFailure.Load())),
- zap.Int("removed", int(removed.Load())),
- zap.String("lastFilePath", lastFilePath),
- zap.Duration("cost", cost),
- zap.Error(err))
- metrics.GarbageCollectorFileScanDuration.
- WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), label).
- Observe(float64(cost.Milliseconds()))
- }
// checkDroppedSegmentGC reports whether a dropped segment may be garbage
// collected. It requires:
//  1. the segment's drop time exceeds the configured drop tolerance;
//  2. for compacted segments, the compaction target (childSegment) is indexed
//     (or already GC'ed, in which case childSegment is nil);
//  3. the channel checkpoint has advanced past the segment's dml position,
//     unless the channel itself no longer exists in the catalog.
func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
	childSegment *SegmentInfo,
	indexSet typeutil.UniqueSet,
	cpTimestamp Timestamp,
) bool {
	log := log.With(zap.Int64("segmentID", segment.ID))

	if !gc.isExpire(segment.GetDroppedAt()) {
		return false
	}

	isCompacted := childSegment != nil || segment.GetCompacted()
	if isCompacted {
		// For compact A, B -> C, don't GC A or B if C is not indexed,
		// guarantee replacing A, B with C won't downgrade performance
		// If the child is GC'ed first, then childSegment will be nil.
		if childSegment != nil && !indexSet.Contain(childSegment.GetID()) {
			log.WithRateGroup("GC_FAIL_COMPACT_TO_NOT_INDEXED", 1, 60).
				RatedInfo(60, "skipping GC when compact target segment is not indexed",
					zap.Int64("child segment ID", childSegment.GetID()))
			return false
		}
	}

	segInsertChannel := segment.GetInsertChannel()
	// Ignore segments from potentially dropped collection. Check if collection is to be dropped by checking if channel is dropped.
	// We do this because collection meta drop relies on all segment being GCed.
	if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) &&
		segment.GetDmlPosition().GetTimestamp() > cpTimestamp {
		// segment gc shall only happen when channel cp is after segment dml cp.
		log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60).
			RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
				zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
				zap.Uint64("channelCpTs", cpTimestamp),
			)
		return false
	}
	return true
}
// recycleDroppedSegments scans all segments and remove those dropped segments from meta and oss.
// For each dropped segment that passes checkDroppedSegmentGC, it first removes
// the segment's binlog/statslog/deltalog/text-log files from object storage,
// then drops the segment from meta (files first so a crash cannot orphan meta).
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start))
	log.Info("start clear dropped segments...")
	defer func() { log.Info("clear dropped segments done", zap.Duration("timeCost", time.Since(start))) }()

	all := gc.meta.SelectSegments()
	drops := make(map[int64]*SegmentInfo, 0)
	compactTo := make(map[int64]*SegmentInfo)
	channels := typeutil.NewSet[string]()
	for _, segment := range all {
		// Work on a clone with decompressed binlog paths so meta is untouched.
		cloned := segment.Clone()
		binlog.DecompressBinLogs(cloned.SegmentInfo)
		if cloned.GetState() == commonpb.SegmentState_Dropped {
			drops[cloned.GetID()] = cloned
			channels.Insert(cloned.GetInsertChannel())
			// continue
			// A(indexed), B(indexed) -> C(no indexed), D(no indexed) -> E(no indexed), A, B can not be GC
		}
		// Map each compaction source to the segment it was compacted into.
		for _, from := range cloned.GetCompactionFrom() {
			compactTo[from] = cloned
		}
	}

	// Collect the compaction targets of dropped segments; only their index
	// state matters for the GC decision in checkDroppedSegmentGC.
	droppedCompactTo := make(map[*SegmentInfo]struct{})
	for id := range drops {
		if to, ok := compactTo[id]; ok {
			droppedCompactTo[to] = struct{}{}
		}
	}
	indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
	indexedSet := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexedSet.Insert(segment.GetID())
	}

	// Snapshot channel checkpoints once for all segments of each channel.
	channelCPs := make(map[string]uint64)
	for channel := range channels {
		pos := gc.meta.GetChannelCheckpoint(channel)
		channelCPs[channel] = pos.GetTimestamp()
	}

	log.Info("start to GC segments", zap.Int("drop_num", len(drops)))
	for segmentID, segment := range drops {
		if ctx.Err() != nil {
			// process canceled, stop.
			return
		}

		log := log.With(zap.Int64("segmentID", segmentID))
		segInsertChannel := segment.GetInsertChannel()
		if !gc.checkDroppedSegmentGC(segment, compactTo[segment.GetID()], indexedSet, channelCPs[segInsertChannel]) {
			continue
		}

		logs := getLogs(segment)
		for key := range getTextLogs(segment) {
			logs[key] = struct{}{}
		}

		log.Info("GC segment start...", zap.Int("insert_logs", len(segment.GetBinlogs())),
			zap.Int("delta_logs", len(segment.GetDeltalogs())),
			zap.Int("stats_logs", len(segment.GetStatslogs())),
			zap.Int("text_logs", len(segment.GetTextStatsLogs())))
		// Remove files before meta so a failure here retries next cycle.
		if err := gc.removeObjectFiles(ctx, logs); err != nil {
			log.Warn("GC segment remove logs failed", zap.Error(err))
			continue
		}

		if err := gc.meta.DropSegment(segment.GetID()); err != nil {
			log.Warn("GC segment meta failed to drop segment", zap.Error(err))
			continue
		}
		log.Info("GC segment meta drop segment done")
	}
}
- func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
- channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
- if err != nil {
- log.Warn("list channel cp fail during GC", zap.Error(err))
- return
- }
- collectionID2GcStatus := make(map[int64]bool)
- skippedCnt := 0
- log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs)))
- for vChannel := range channelCPs {
- collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)
- // !!! Skip to GC if vChannel format is illegal, it will lead meta leak in this case
- if collectionID == -1 {
- skippedCnt++
- log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel))
- continue
- }
- _, ok := collectionID2GcStatus[collectionID]
- if !ok {
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- has, err := gc.option.broker.HasCollection(ctx, collectionID)
- if err == nil && !has {
- collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
- } else {
- // skip checkpoints GC of this cycle if describe collection fails or the collection state is available.
- log.Debug("skip channel cp GC, the collection state is available",
- zap.Int64("collectionID", collectionID),
- zap.Bool("dropped", has), zap.Error(err))
- collectionID2GcStatus[collectionID] = false
- }
- }
- // Skip to GC if all segments meta of the corresponding collection are not removed
- if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed {
- skippedCnt++
- continue
- }
- err := gc.meta.DropChannelCheckpoint(vChannel)
- if err != nil {
- // Try to GC in the next gc cycle if drop channel cp meta fail.
- log.Warn("failed to drop channelcp check point during gc", zap.String("vchannel", vChannel), zap.Error(err))
- } else {
- log.Info("GC channel cp", zap.String("vchannel", vChannel))
- }
- }
- log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt))
- }
- func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
- droptime := time.Unix(0, int64(dropts))
- return time.Since(droptime) > gc.option.dropTolerance
- }
- func getLogs(sinfo *SegmentInfo) map[string]struct{} {
- logs := make(map[string]struct{})
- for _, flog := range sinfo.GetBinlogs() {
- for _, l := range flog.GetBinlogs() {
- logs[l.GetLogPath()] = struct{}{}
- }
- }
- for _, flog := range sinfo.GetStatslogs() {
- for _, l := range flog.GetBinlogs() {
- logs[l.GetLogPath()] = struct{}{}
- }
- }
- for _, flog := range sinfo.GetDeltalogs() {
- for _, l := range flog.GetBinlogs() {
- logs[l.GetLogPath()] = struct{}{}
- }
- }
- return logs
- }
- func getTextLogs(sinfo *SegmentInfo) map[string]struct{} {
- textLogs := make(map[string]struct{})
- for _, flog := range sinfo.GetTextStatsLogs() {
- for _, file := range flog.GetFiles() {
- textLogs[file] = struct{}{}
- }
- }
- return textLogs
- }
// removeObjectFiles remove file from oss storage, return error if any log failed to remove.
// Removals run concurrently on removeObjectPool; a "key not found" error is
// treated as success (the file may have been removed by a previous GC cycle).
func (gc *garbageCollector) removeObjectFiles(ctx context.Context, filePaths map[string]struct{}) error {
	futures := make([]*conc.Future[struct{}], 0)
	for filePath := range filePaths {
		filePath := filePath // capture per-iteration value for the closure
		future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
			err := gc.option.cli.Remove(ctx, filePath)
			// ignore the error Key Not Found
			if err != nil {
				if !errors.Is(err, merr.ErrIoKeyNotFound) {
					return struct{}{}, err
				}
				log.Info("remove log failed, key not found, may be removed at previous GC, ignore the error",
					zap.String("path", filePath),
					zap.Error(err))
			}
			return struct{}{}, nil
		})
		futures = append(futures, future)
	}
	// Block until every removal finished; first error (if any) is returned.
	return conc.BlockOnAll(futures...)
}
- // recycleUnusedIndexes is used to delete those indexes that is deleted by collection.
- func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
- start := time.Now()
- log := log.With(zap.String("gcName", "recycleUnusedIndexes"), zap.Time("startAt", start))
- log.Info("start recycleUnusedIndexes...")
- defer func() { log.Info("recycleUnusedIndexes done", zap.Duration("timeCost", time.Since(start))) }()
- deletedIndexes := gc.meta.indexMeta.GetDeletedIndexes()
- for _, index := range deletedIndexes {
- if ctx.Err() != nil {
- // process canceled.
- return
- }
- log := log.With(zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID))
- if err := gc.meta.indexMeta.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
- log.Warn("remove index on collection fail", zap.Error(err))
- continue
- }
- log.Info("remove index on collection done")
- }
- }
// recycleUnusedSegIndexes remove the index of segment if index is deleted or segment itself is deleted.
// For each stale segment index it removes the index files from storage first,
// then the index meta record, so a crash in between only leaves retryable meta.
func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedSegIndexes"), zap.Time("startAt", start))
	log.Info("start recycleUnusedSegIndexes...")
	defer func() { log.Info("recycleUnusedSegIndexes done", zap.Duration("timeCost", time.Since(start))) }()

	segIndexes := gc.meta.indexMeta.GetAllSegIndexes()
	for _, segIdx := range segIndexes {
		if ctx.Err() != nil {
			// process canceled.
			return
		}

		// 1. segment belongs to is deleted.
		// 2. index is deleted.
		if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.indexMeta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
			indexFiles := gc.getAllIndexFilesOfIndex(segIdx)
			log := log.With(zap.Int64("collectionID", segIdx.CollectionID),
				zap.Int64("partitionID", segIdx.PartitionID),
				zap.Int64("segmentID", segIdx.SegmentID),
				zap.Int64("indexID", segIdx.IndexID),
				zap.Int64("buildID", segIdx.BuildID),
				zap.Int64("nodeID", segIdx.NodeID),
				zap.Int("indexFiles", len(indexFiles)))
			log.Info("GC Segment Index file start...")

			// Remove index files first.
			if err := gc.removeObjectFiles(ctx, indexFiles); err != nil {
				log.Warn("fail to remove index files for index", zap.Error(err))
				continue
			}

			// Remove meta from index meta.
			if err := gc.meta.indexMeta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
				log.Warn("delete index meta from etcd failed, wait to retry", zap.Error(err))
				continue
			}
			log.Info("index meta recycle success")
		}
	}
}
// recycleUnusedIndexFiles is used to delete those index files that no longer exist in the meta.
// It walks the build-ID directories under the segment index path (non-recursive),
// and for each build that is confirmed clean, either removes the whole prefix
// (build no longer in meta) or removes individual files not referenced by meta.
func (gc *garbageCollector) recycleUnusedIndexFiles(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedIndexFiles"), zap.Time("startAt", start))
	log.Info("start recycleUnusedIndexFiles...")

	prefix := path.Join(gc.option.cli.RootPath(), common.SegmentIndexPath) + "/"
	// list dir first
	keyCount := 0
	err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(indexPathInfo *storage.ChunkObjectInfo) bool {
		key := indexPathInfo.FilePath
		keyCount++
		logger := log.With(zap.String("prefix", prefix), zap.String("key", key))

		buildID, err := parseBuildIDFromFilePath(key)
		if err != nil {
			logger.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.Error(err))
			return true
		}
		logger = logger.With(zap.Int64("buildID", buildID))
		logger.Info("garbageCollector will recycle index files")
		canRecycle, segIdx := gc.meta.indexMeta.CheckCleanSegmentIndex(buildID)
		if !canRecycle {
			// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
			// and delete all index files about the buildID at one time.
			logger.Info("garbageCollector can not recycle index files")
			return true
		}
		if segIdx == nil {
			// buildID no longer exists in meta, remove all index files
			logger.Info("garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files")
			err = gc.option.cli.RemoveWithPrefix(ctx, key)
			if err != nil {
				logger.Warn("garbageCollector recycleUnusedIndexFiles remove index files failed", zap.Error(err))
				return true
			}
			logger.Info("garbageCollector recycleUnusedIndexFiles remove index files success")
			return true
		}
		// Build still exists in meta: keep files referenced by meta, remove the rest.
		filesMap := gc.getAllIndexFilesOfIndex(segIdx)

		logger.Info("recycle index files", zap.Int("meta files num", len(filesMap)))
		deletedFilesNum := atomic.NewInt32(0)
		fileNum := 0

		futures := make([]*conc.Future[struct{}], 0)
		// Recursive walk over this build's files; unreferenced ones are removed
		// concurrently on the pool.
		err = gc.option.cli.WalkWithPrefix(ctx, key, true, func(indexFile *storage.ChunkObjectInfo) bool {
			fileNum++
			file := indexFile.FilePath
			if _, ok := filesMap[file]; !ok {
				future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
					logger := logger.With(zap.String("file", file))
					logger.Info("garbageCollector recycleUnusedIndexFiles remove file...")

					if err := gc.option.cli.Remove(ctx, file); err != nil {
						logger.Warn("garbageCollector recycleUnusedIndexFiles remove file failed", zap.Error(err))
						return struct{}{}, err
					}
					deletedFilesNum.Inc()
					logger.Info("garbageCollector recycleUnusedIndexFiles remove file success")
					return struct{}{}, nil
				})
				futures = append(futures, future)
			}
			return true
		})
		// Wait for all remove tasks done.
		if err := conc.BlockOnAll(futures...); err != nil {
			// error is logged, and can be ignored here.
			logger.Warn("some task failure in remove object pool", zap.Error(err))
		}

		logger = logger.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
		if err != nil {
			logger.Warn("index files recycle failed when walk with prefix", zap.Error(err))
			return true
		}
		logger.Info("index files recycle done")
		return true
	})
	log = log.With(zap.Duration("timeCost", time.Since(start)), zap.Int("keyCount", keyCount), zap.Error(err))
	if err != nil {
		log.Warn("garbageCollector recycleUnusedIndexFiles failed", zap.Error(err))
		return
	}
	log.Info("recycleUnusedIndexFiles done")
}
- // getAllIndexFilesOfIndex returns the all index files of index.
- func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentIndex) map[string]struct{} {
- filesMap := make(map[string]struct{})
- for _, fileID := range segmentIndex.IndexFileKeys {
- filepath := metautil.BuildSegmentIndexFilePath(gc.option.cli.RootPath(), segmentIndex.BuildID, segmentIndex.IndexVersion,
- segmentIndex.PartitionID, segmentIndex.SegmentID, fileID)
- filesMap[filepath] = struct{}{}
- }
- return filesMap
- }
- // recycleUnusedAnalyzeFiles is used to delete those analyze stats files that no longer exist in the meta.
- func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
- log.Info("start recycleUnusedAnalyzeFiles")
- startTs := time.Now()
- prefix := path.Join(gc.option.cli.RootPath(), common.AnalyzeStatsPath) + "/"
- // list dir first
- keys := make([]string, 0)
- err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(chunkInfo *storage.ChunkObjectInfo) bool {
- keys = append(keys, chunkInfo.FilePath)
- return true
- })
- if err != nil {
- log.Warn("garbageCollector recycleUnusedAnalyzeFiles list keys from chunk manager failed", zap.Error(err))
- return
- }
- log.Info("recycleUnusedAnalyzeFiles, finish list object", zap.Duration("time spent", time.Since(startTs)), zap.Int("task ids", len(keys)))
- for _, key := range keys {
- if ctx.Err() != nil {
- // process canceled
- return
- }
- log.Debug("analyze keys", zap.String("key", key))
- taskID, err := parseBuildIDFromFilePath(key)
- if err != nil {
- log.Warn("garbageCollector recycleUnusedAnalyzeFiles parseAnalyzeResult failed", zap.String("key", key), zap.Error(err))
- continue
- }
- log.Info("garbageCollector will recycle analyze stats files", zap.Int64("taskID", taskID))
- canRecycle, task := gc.meta.analyzeMeta.CheckCleanAnalyzeTask(taskID)
- if !canRecycle {
- // Even if the analysis task is marked as deleted, the analysis stats file will not be recycled, wait for the next gc,
- // and delete all index files about the taskID at one time.
- log.Info("garbageCollector no need to recycle analyze stats files", zap.Int64("taskID", taskID))
- continue
- }
- if task == nil {
- // taskID no longer exists in meta, remove all analysis files
- log.Info("garbageCollector recycleUnusedAnalyzeFiles find meta has not exist, remove index files",
- zap.Int64("taskID", taskID))
- err = gc.option.cli.RemoveWithPrefix(ctx, key)
- if err != nil {
- log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove analyze stats files failed",
- zap.Int64("taskID", taskID), zap.String("prefix", key), zap.Error(err))
- continue
- }
- log.Info("garbageCollector recycleUnusedAnalyzeFiles remove analyze stats files success",
- zap.Int64("taskID", taskID), zap.String("prefix", key))
- continue
- }
- log.Info("remove analyze stats files which version is less than current task",
- zap.Int64("taskID", taskID), zap.Int64("current version", task.Version))
- var i int64
- for i = 0; i < task.Version; i++ {
- if ctx.Err() != nil {
- // process canceled.
- return
- }
- removePrefix := prefix + fmt.Sprintf("%d/", task.Version)
- if err := gc.option.cli.RemoveWithPrefix(ctx, removePrefix); err != nil {
- log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove files with prefix failed",
- zap.Int64("taskID", taskID), zap.String("removePrefix", removePrefix))
- continue
- }
- }
- log.Info("analyze stats files recycle success", zap.Int64("taskID", taskID))
- }
- }
- // recycleUnusedTextIndexFiles load meta file info and compares OSS keys
- // if missing found, performs gc cleanup
- func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
- start := time.Now()
- log := log.With(zap.String("gcName", "recycleUnusedTextIndexFiles"), zap.Time("startAt", start))
- log.Info("start recycleUnusedTextIndexFiles...")
- defer func() { log.Info("recycleUnusedTextIndexFiles done", zap.Duration("timeCost", time.Since(start))) }()
- hasTextIndexSegments := gc.meta.SelectSegments(SegmentFilterFunc(func(info *SegmentInfo) bool {
- return len(info.GetTextStatsLogs()) != 0
- }))
- fileNum := 0
- deletedFilesNum := atomic.NewInt32(0)
- for _, seg := range hasTextIndexSegments {
- for _, fieldStats := range seg.GetTextStatsLogs() {
- log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
- // clear low version task
- for i := int64(1); i < fieldStats.GetVersion(); i++ {
- prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
- seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID(), i)
- futures := make([]*conc.Future[struct{}], 0)
- err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(files *storage.ChunkObjectInfo) bool {
- file := files.FilePath
- future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
- log := log.With(zap.String("file", file))
- log.Info("garbageCollector recycleUnusedTextIndexFiles remove file...")
- if err := gc.option.cli.Remove(ctx, file); err != nil {
- log.Warn("garbageCollector recycleUnusedTextIndexFiles remove file failed", zap.Error(err))
- return struct{}{}, err
- }
- deletedFilesNum.Inc()
- log.Info("garbageCollector recycleUnusedTextIndexFiles remove file success")
- return struct{}{}, nil
- })
- futures = append(futures, future)
- return true
- })
- // Wait for all remove tasks done.
- if err := conc.BlockOnAll(futures...); err != nil {
- // error is logged, and can be ignored here.
- log.Warn("some task failure in remove object pool", zap.Error(err))
- }
- log = log.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
- if err != nil {
- log.Warn("text index files recycle failed when walk with prefix", zap.Error(err))
- return
- }
- }
- }
- }
- log.Info("text index files recycle done")
- metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
- }
|