garbage_collector.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"path"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.uber.org/atomic"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/datacoord/broker"
	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
	"github.com/milvus-io/milvus/internal/metastore/model"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/funcutil"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/metautil"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// GcOption garbage collection options
type GcOption struct {
	cli              storage.ChunkManager // client
	enabled          bool                 // enable switch
	checkInterval    time.Duration        // interval between gc checks
	missingTolerance time.Duration        // key missing in meta tolerance time
	dropTolerance    time.Duration        // dropped segment related key tolerance time
	scanInterval     time.Duration        // interval to scan for residue left by interrupted log writes

	broker broker.Broker

	removeObjectPool *conc.Pool[struct{}]
}

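// An illustrative sketch of assembling GcOption. The values and the
// chunkManager/coordinatorBroker wiring below are hypothetical; in practice
// they come from DataCoord configuration:
//
//	opt := GcOption{
//		cli:              chunkManager, // a storage.ChunkManager for object storage
//		enabled:          true,
//		checkInterval:    30 * time.Minute,
//		missingTolerance: 24 * time.Hour,
//		dropTolerance:    3 * time.Hour,
//		scanInterval:     168 * time.Hour, // weekly orphan scan
//		broker:           coordinatorBroker,
//	}
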
// garbageCollector handles garbage files in object storage,
// which could be remnants of dropped collections or traces of data node failures.
type garbageCollector struct {
	ctx    context.Context
	cancel context.CancelFunc

	option  GcOption
	meta    *meta
	handler Handler

	startOnce  sync.Once
	stopOnce   sync.Once
	wg         sync.WaitGroup
	cmdCh      chan gcCmd
	pauseUntil atomic.Time
}

type gcCmd struct {
	cmdType  datapb.GcCommand
	duration time.Duration
	done     chan struct{}
}

// newGarbageCollector creates a garbage collector with the given meta and options.
func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector {
	log.Info("GC with option",
		zap.Bool("enabled", opt.enabled),
		zap.Duration("interval", opt.checkInterval),
		zap.Duration("scanInterval", opt.scanInterval),
		zap.Duration("missingTolerance", opt.missingTolerance),
		zap.Duration("dropTolerance", opt.dropTolerance))
	opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
	ctx, cancel := context.WithCancel(context.Background())
	return &garbageCollector{
		ctx:     ctx,
		cancel:  cancel,
		meta:    meta,
		handler: handler,
		option:  opt,
		cmdCh:   make(chan gcCmd),
	}
}

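// A minimal usage sketch, assuming meta, handler, and opt are already wired up:
//
//	gc := newGarbageCollector(meta, handler, opt)
//	gc.start()       // no-op unless opt.enabled is true and opt.cli is set
//	defer gc.close() // cancels the context and waits for the worker goroutines
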
// start spawns the gc worker goroutines, which perform a gc check every `checkInterval`.
func (gc *garbageCollector) start() {
	if gc.option.enabled {
		if gc.option.cli == nil {
			log.Warn("DataCoord gc enabled, but object storage client is not provided")
			return
		}
		gc.startOnce.Do(func() {
			gc.work(gc.ctx)
		})
	}
}

func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
	if !gc.option.enabled {
		log.Info("garbage collection not enabled")
		return nil
	}
	done := make(chan struct{})
	select {
	case gc.cmdCh <- gcCmd{
		cmdType:  datapb.GcCommand_Pause,
		duration: pauseDuration,
		done:     done,
	}:
		<-done
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (gc *garbageCollector) Resume(ctx context.Context) error {
	if !gc.option.enabled {
		log.Warn("garbage collection not enabled, cannot resume")
		return merr.WrapErrServiceUnavailable("garbage collection not enabled")
	}
	done := make(chan struct{})
	select {
	case gc.cmdCh <- gcCmd{
		cmdType: datapb.GcCommand_Resume,
		done:    done,
	}:
		<-done
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

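// A hedged example of the pause/resume flow, e.g. around a maintenance window
// (the timeout and pause duration are arbitrary):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	if err := gc.Pause(ctx, 30*time.Minute); err != nil {
//		return err
//	}
//	// ... perform work that must not race with GC ...
//	if err := gc.Resume(ctx); err != nil {
//		return err
//	}
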
// work contains actual looping check logic
func (gc *garbageCollector) work(ctx context.Context) {
	// TODO: fast cancel for gc when closing.
	// Run gc tasks in parallel.
	gc.wg.Add(3)
	go func() {
		defer gc.wg.Done()
		gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
			gc.recycleDroppedSegments(ctx)
			gc.recycleChannelCPMeta(ctx)
			gc.recycleUnusedIndexes(ctx)
			gc.recycleUnusedSegIndexes(ctx)
			gc.recycleUnusedAnalyzeFiles(ctx)
			gc.recycleUnusedTextIndexFiles(ctx)
		})
	}()
	go func() {
		defer gc.wg.Done()
		gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context) {
			gc.recycleUnusedBinlogFiles(ctx)
			gc.recycleUnusedIndexFiles(ctx)
		})
	}()
	go func() {
		defer gc.wg.Done()
		gc.startControlLoop(ctx)
	}()
}

// startControlLoop starts the control loop for the garbageCollector,
// serving Pause/Resume commands until the gc context is cancelled.
func (gc *garbageCollector) startControlLoop(_ context.Context) {
	for {
		select {
		case cmd := <-gc.cmdCh:
			switch cmd.cmdType {
			case datapb.GcCommand_Pause:
				pauseUntil := time.Now().Add(cmd.duration)
				if pauseUntil.After(gc.pauseUntil.Load()) {
					log.Info("garbage collection paused", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil))
					gc.pauseUntil.Store(pauseUntil)
				} else {
					log.Info("new pause-until time is before the current one, ignored", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil), zap.Time("oldPauseUntil", gc.pauseUntil.Load()))
				}
			case datapb.GcCommand_Resume:
				// reset to zero value
				gc.pauseUntil.Store(time.Time{})
				log.Info("garbage collection resumed")
			}
			close(cmd.done)
		case <-gc.ctx.Done():
			log.Warn("garbage collector control loop quit")
			return
		}
	}
}

// runRecycleTaskWithPauser runs task every interval, skipping runs that fall
// inside the pause window set via Pause.
func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context)) {
	logger := log.With(zap.String("gcType", name)).With(zap.Duration("interval", interval))
	timer := time.NewTicker(interval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			if time.Now().Before(gc.pauseUntil.Load()) {
				logger.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
				continue
			}
			logger.Info("garbage collector recycle task start...")
			start := time.Now()
			task(ctx)
			logger.Info("garbage collector recycle task done", zap.Duration("timeCost", time.Since(start)))
		}
	}
}

// close stops the garbage collector.
func (gc *garbageCollector) close() {
	gc.stopOnce.Do(func() {
		gc.cancel()
		gc.wg.Wait()
	})
}

// recycleUnusedBinlogFiles loads binlog meta and compares it against OSS keys;
// keys that are missing from meta are garbage collected.
func (gc *garbageCollector) recycleUnusedBinlogFiles(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedBinlogFiles"), zap.Time("startAt", start))
	log.Info("start recycleUnusedBinlogFiles...")
	defer func() { log.Info("recycleUnusedBinlogFiles done", zap.Duration("timeCost", time.Since(start))) }()

	type scanTask struct {
		prefix  string
		checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool
		label   string
	}
	scanTasks := []scanTask{
		{
			prefix: path.Join(gc.option.cli.RootPath(), common.SegmentInsertLogPath),
			checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
				return segment != nil
			},
			label: metrics.InsertFileLabel,
		},
		{
			prefix: path.Join(gc.option.cli.RootPath(), common.SegmentStatslogPath),
			checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
				logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
				if err != nil {
					log.Warn("garbageCollector find dirty stats log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
					return false
				}
				return segment != nil && segment.IsStatsLogExists(logID)
			},
			label: metrics.StatFileLabel,
		},
		{
			prefix: path.Join(gc.option.cli.RootPath(), common.SegmentDeltaLogPath),
			checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
				logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
				if err != nil {
					log.Warn("garbageCollector find dirty delta log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
					return false
				}
				return segment != nil && segment.IsDeltaLogExists(logID)
			},
			label: metrics.DeleteFileLabel,
		},
	}

	for _, task := range scanTasks {
		gc.recycleUnusedBinLogWithChecker(ctx, task.prefix, task.label, task.checker)
	}
	metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
}

// recycleUnusedBinLogWithChecker scans the prefix and checks each path with checker.
// The file is garbage collected if checker returns false.
func (gc *garbageCollector) recycleUnusedBinLogWithChecker(ctx context.Context, prefix string, label string, checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool) {
	logger := log.With(zap.String("prefix", prefix))
	logger.Info("garbageCollector recycleUnusedBinlogFiles start")
	lastFilePath := ""
	total := 0
	valid := 0
	unexpectedFailure := atomic.NewInt32(0)
	removed := atomic.NewInt32(0)
	start := time.Now()

	futures := make([]*conc.Future[struct{}], 0)
	err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(chunkInfo *storage.ChunkObjectInfo) bool {
		total++
		lastFilePath = chunkInfo.FilePath

		// Check file tolerance first to avoid unnecessary operations.
		if time.Since(chunkInfo.ModifyTime) <= gc.option.missingTolerance {
			logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is not expired", zap.String("filePath", chunkInfo.FilePath), zap.Time("modifyTime", chunkInfo.ModifyTime))
			return true
		}

		// Parse segmentID from the file path.
		// TODO: Do all files in the same segment have the same segmentID?
		segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), chunkInfo.FilePath)
		if err != nil {
			unexpectedFailure.Inc()
			logger.Warn("garbageCollector recycleUnusedBinlogFiles parse segment id error",
				zap.String("filePath", chunkInfo.FilePath),
				zap.Error(err))
			return true
		}

		segment := gc.meta.GetSegment(segmentID)
		if checker(chunkInfo, segment) {
			valid++
			logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is valid", zap.String("filePath", chunkInfo.FilePath), zap.Int64("segmentID", segmentID))
			return true
		}

		// ignore removal errors since the file can be cleaned up next time
		file := chunkInfo.FilePath
		future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
			logger := logger.With(zap.String("file", file))
			logger.Info("garbageCollector recycleUnusedBinlogFiles remove file...")
			// use a locally scoped err: the outer err is shared with concurrent closures
			if err := gc.option.cli.Remove(ctx, file); err != nil {
				logger.Warn("garbageCollector recycleUnusedBinlogFiles remove file failed", zap.Error(err))
				unexpectedFailure.Inc()
				return struct{}{}, err
			}
			logger.Info("garbageCollector recycleUnusedBinlogFiles remove file success")
			removed.Inc()
			return struct{}{}, nil
		})
		futures = append(futures, future)
		return true
	})

	// Wait for all remove tasks to finish.
	if err := conc.BlockOnAll(futures...); err != nil {
		// error is logged, and can be ignored here.
		logger.Warn("some task failure in remove object pool", zap.Error(err))
	}

	cost := time.Since(start)
	logger.Info("garbageCollector recycleUnusedBinlogFiles done",
		zap.Int("total", total),
		zap.Int("valid", valid),
		zap.Int("unexpectedFailure", int(unexpectedFailure.Load())),
		zap.Int("removed", int(removed.Load())),
		zap.String("lastFilePath", lastFilePath),
		zap.Duration("cost", cost),
		zap.Error(err))

	metrics.GarbageCollectorFileScanDuration.
		WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), label).
		Observe(float64(cost.Milliseconds()))
}

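// A sketch of a custom checker for this scan: the predicate is the only part
// that varies between tasks, so an insert-log style task reduces to a nil
// check on the segment (the prefix variable below is hypothetical):
//
//	checker := func(info *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
//		return segment != nil // keep the file while its segment is still in meta
//	}
//	gc.recycleUnusedBinLogWithChecker(ctx, prefix, metrics.InsertFileLabel, checker)
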
func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
	childSegment *SegmentInfo,
	indexSet typeutil.UniqueSet,
	cpTimestamp Timestamp,
) bool {
	log := log.With(zap.Int64("segmentID", segment.ID))

	if !gc.isExpire(segment.GetDroppedAt()) {
		return false
	}

	isCompacted := childSegment != nil || segment.GetCompacted()
	if isCompacted {
		// For compaction A, B -> C, don't GC A or B if C is not indexed,
		// to guarantee that replacing A, B with C won't downgrade performance.
		// If the child is GCed first, then childSegment will be nil.
		if childSegment != nil && !indexSet.Contain(childSegment.GetID()) {
			log.WithRateGroup("GC_FAIL_COMPACT_TO_NOT_INDEXED", 1, 60).
				RatedInfo(60, "skipping GC when compact target segment is not indexed",
					zap.Int64("child segment ID", childSegment.GetID()))
			return false
		}
	}

	segInsertChannel := segment.GetInsertChannel()
	// Ignore segments from potentially dropped collections. Check whether the collection
	// is about to be dropped by checking whether its channel has been dropped.
	// We do this because collection meta drop relies on all segments being GCed.
	if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) &&
		segment.GetDmlPosition().GetTimestamp() > cpTimestamp {
		// segment gc shall only happen when channel cp is after segment dml cp.
		log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60).
			RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
				zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
				zap.Uint64("channelCpTs", cpTimestamp),
			)
		return false
	}
	return true
}

// recycleDroppedSegments scans all segments and removes dropped segments from meta and OSS.
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start))
	log.Info("start clear dropped segments...")
	defer func() { log.Info("clear dropped segments done", zap.Duration("timeCost", time.Since(start))) }()

	all := gc.meta.SelectSegments()
	drops := make(map[int64]*SegmentInfo)
	compactTo := make(map[int64]*SegmentInfo)
	channels := typeutil.NewSet[string]()
	for _, segment := range all {
		cloned := segment.Clone()
		binlog.DecompressBinLogs(cloned.SegmentInfo)
		if cloned.GetState() == commonpb.SegmentState_Dropped {
			drops[cloned.GetID()] = cloned
			channels.Insert(cloned.GetInsertChannel())
			// No `continue` here: a dropped segment may itself be a compaction source.
			// For A(indexed), B(indexed) -> C(not indexed), D(not indexed) -> E(not indexed),
			// A and B cannot be GCed yet.
		}
		for _, from := range cloned.GetCompactionFrom() {
			compactTo[from] = cloned
		}
	}

	droppedCompactTo := make(map[*SegmentInfo]struct{})
	for id := range drops {
		if to, ok := compactTo[id]; ok {
			droppedCompactTo[to] = struct{}{}
		}
	}
	indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
	indexedSet := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexedSet.Insert(segment.GetID())
	}

	channelCPs := make(map[string]uint64)
	for channel := range channels {
		pos := gc.meta.GetChannelCheckpoint(channel)
		channelCPs[channel] = pos.GetTimestamp()
	}

	log.Info("start to GC segments", zap.Int("drop_num", len(drops)))
	for segmentID, segment := range drops {
		if ctx.Err() != nil {
			// process canceled, stop.
			return
		}

		log := log.With(zap.Int64("segmentID", segmentID))
		segInsertChannel := segment.GetInsertChannel()
		if !gc.checkDroppedSegmentGC(segment, compactTo[segment.GetID()], indexedSet, channelCPs[segInsertChannel]) {
			continue
		}

		logs := getLogs(segment)
		for key := range getTextLogs(segment) {
			logs[key] = struct{}{}
		}

		log.Info("GC segment start...", zap.Int("insert_logs", len(segment.GetBinlogs())),
			zap.Int("delta_logs", len(segment.GetDeltalogs())),
			zap.Int("stats_logs", len(segment.GetStatslogs())),
			zap.Int("text_logs", len(segment.GetTextStatsLogs())))
		if err := gc.removeObjectFiles(ctx, logs); err != nil {
			log.Warn("GC segment remove logs failed", zap.Error(err))
			continue
		}

		if err := gc.meta.DropSegment(segment.GetID()); err != nil {
			log.Warn("GC segment meta failed to drop segment", zap.Error(err))
			continue
		}
		log.Info("GC segment meta drop segment done")
	}
}

func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
	channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
	if err != nil {
		log.Warn("list channel cp fail during GC", zap.Error(err))
		return
	}

	collectionID2GcStatus := make(map[int64]bool)
	skippedCnt := 0

	log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs)))
	for vChannel := range channelCPs {
		collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)

		// !!! Skip GC if the vChannel format is illegal; it would leak meta in this case.
		if collectionID == -1 {
			skippedCnt++
			log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel))
			continue
		}

		if _, ok := collectionID2GcStatus[collectionID]; !ok {
			// Use a short-lived context and cancel it right after use, so deferred
			// cancels do not pile up inside the loop.
			checkCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			has, err := gc.option.broker.HasCollection(checkCtx, collectionID)
			if err == nil && !has {
				collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(checkCtx, collectionID, -1)
			} else {
				// skip checkpoint GC for this cycle if describe collection fails or the collection still exists.
				log.Debug("skip channel cp GC, the collection state is available",
					zap.Int64("collectionID", collectionID),
					zap.Bool("dropped", has), zap.Error(err))
				collectionID2GcStatus[collectionID] = false
			}
			cancel()
		}

		// Skip GC until all segment meta of the corresponding collection has been removed.
		if gcConfirmed := collectionID2GcStatus[collectionID]; !gcConfirmed {
			skippedCnt++
			continue
		}

		if err := gc.meta.DropChannelCheckpoint(vChannel); err != nil {
			// Retry in the next gc cycle if dropping the channel cp meta fails.
			log.Warn("failed to drop channel checkpoint during gc", zap.String("vchannel", vChannel), zap.Error(err))
		} else {
			log.Info("GC channel cp", zap.String("vchannel", vChannel))
		}
	}
	log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt))
}

func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
	droptime := time.Unix(0, int64(dropts))
	return time.Since(droptime) > gc.option.dropTolerance
}

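// isExpire interprets dropts as Unix nanoseconds. A worked example of the
// drop-tolerance arithmetic (values are illustrative):
//
//	dropts := Timestamp(time.Now().Add(-4 * time.Hour).UnixNano())
//	// with dropTolerance = 3h, isExpire(dropts) == true: the segment has
//	// been dropped long enough for its keys to be recycled.
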
func getLogs(sinfo *SegmentInfo) map[string]struct{} {
	logs := make(map[string]struct{})
	for _, flog := range sinfo.GetBinlogs() {
		for _, l := range flog.GetBinlogs() {
			logs[l.GetLogPath()] = struct{}{}
		}
	}
	for _, flog := range sinfo.GetStatslogs() {
		for _, l := range flog.GetBinlogs() {
			logs[l.GetLogPath()] = struct{}{}
		}
	}
	for _, flog := range sinfo.GetDeltalogs() {
		for _, l := range flog.GetBinlogs() {
			logs[l.GetLogPath()] = struct{}{}
		}
	}
	return logs
}

func getTextLogs(sinfo *SegmentInfo) map[string]struct{} {
	textLogs := make(map[string]struct{})
	for _, flog := range sinfo.GetTextStatsLogs() {
		for _, file := range flog.GetFiles() {
			textLogs[file] = struct{}{}
		}
	}
	return textLogs
}

// removeObjectFiles removes files from OSS storage, returning an error if any removal fails.
// A "key not found" error is tolerated, since the file may have been removed by a previous GC run.
func (gc *garbageCollector) removeObjectFiles(ctx context.Context, filePaths map[string]struct{}) error {
	futures := make([]*conc.Future[struct{}], 0)
	for filePath := range filePaths {
		filePath := filePath
		future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
			err := gc.option.cli.Remove(ctx, filePath)
			// ignore the error Key Not Found
			if err != nil {
				if !errors.Is(err, merr.ErrIoKeyNotFound) {
					return struct{}{}, err
				}
				log.Info("remove log failed, key not found, may be removed at previous GC, ignore the error",
					zap.String("path", filePath),
					zap.Error(err))
			}
			return struct{}{}, nil
		})
		futures = append(futures, future)
	}
	return conc.BlockOnAll(futures...)
}

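// A minimal usage sketch (the object path below is hypothetical):
//
//	paths := map[string]struct{}{
//		"files/insert_log/444/445/446/100/1001": {},
//	}
//	if err := gc.removeObjectFiles(ctx, paths); err != nil {
//		log.Warn("some files could not be removed, will retry next cycle", zap.Error(err))
//	}
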
// recycleUnusedIndexes deletes indexes that were dropped along with their collection.
func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedIndexes"), zap.Time("startAt", start))
	log.Info("start recycleUnusedIndexes...")
	defer func() { log.Info("recycleUnusedIndexes done", zap.Duration("timeCost", time.Since(start))) }()

	deletedIndexes := gc.meta.indexMeta.GetDeletedIndexes()
	for _, index := range deletedIndexes {
		if ctx.Err() != nil {
			// process canceled.
			return
		}

		log := log.With(zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID))
		if err := gc.meta.indexMeta.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
			log.Warn("remove index on collection fail", zap.Error(err))
			continue
		}
		log.Info("remove index on collection done")
	}
}

// recycleUnusedSegIndexes removes a segment's index if either the index or the segment itself has been deleted.
func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedSegIndexes"), zap.Time("startAt", start))
	log.Info("start recycleUnusedSegIndexes...")
	defer func() { log.Info("recycleUnusedSegIndexes done", zap.Duration("timeCost", time.Since(start))) }()

	segIndexes := gc.meta.indexMeta.GetAllSegIndexes()
	for _, segIdx := range segIndexes {
		if ctx.Err() != nil {
			// process canceled.
			return
		}

		// Recycle when:
		// 1. the segment the index belongs to has been deleted, or
		// 2. the index itself has been deleted.
		if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.indexMeta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
			indexFiles := gc.getAllIndexFilesOfIndex(segIdx)
			log := log.With(zap.Int64("collectionID", segIdx.CollectionID),
				zap.Int64("partitionID", segIdx.PartitionID),
				zap.Int64("segmentID", segIdx.SegmentID),
				zap.Int64("indexID", segIdx.IndexID),
				zap.Int64("buildID", segIdx.BuildID),
				zap.Int64("nodeID", segIdx.NodeID),
				zap.Int("indexFiles", len(indexFiles)))
			log.Info("GC Segment Index file start...")

			// Remove index files first.
			if err := gc.removeObjectFiles(ctx, indexFiles); err != nil {
				log.Warn("fail to remove index files for index", zap.Error(err))
				continue
			}

			// Remove meta from index meta.
			if err := gc.meta.indexMeta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
				log.Warn("delete index meta from etcd failed, wait to retry", zap.Error(err))
				continue
			}
			log.Info("index meta recycle success")
		}
	}
}

// recycleUnusedIndexFiles deletes index files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedIndexFiles(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedIndexFiles"), zap.Time("startAt", start))
	log.Info("start recycleUnusedIndexFiles...")

	prefix := path.Join(gc.option.cli.RootPath(), common.SegmentIndexPath) + "/"
	// list dir first
	keyCount := 0
	err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(indexPathInfo *storage.ChunkObjectInfo) bool {
		key := indexPathInfo.FilePath
		keyCount++
		logger := log.With(zap.String("prefix", prefix), zap.String("key", key))

		buildID, err := parseBuildIDFromFilePath(key)
		if err != nil {
			logger.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.Error(err))
			return true
		}
		logger = logger.With(zap.Int64("buildID", buildID))
		logger.Info("garbageCollector will recycle index files")

		canRecycle, segIdx := gc.meta.indexMeta.CheckCleanSegmentIndex(buildID)
		if !canRecycle {
			// Even if the index is marked as deleted, the index files are not recycled now;
			// wait for the next gc and delete all index files of this buildID at once.
			logger.Info("garbageCollector can not recycle index files")
			return true
		}
		if segIdx == nil {
			// buildID no longer exists in meta, remove all index files
			logger.Info("garbageCollector recycleUnusedIndexFiles meta no longer exists, removing index files")
			err = gc.option.cli.RemoveWithPrefix(ctx, key)
			if err != nil {
				logger.Warn("garbageCollector recycleUnusedIndexFiles remove index files failed", zap.Error(err))
				return true
			}
			logger.Info("garbageCollector recycleUnusedIndexFiles remove index files success")
			return true
		}

		filesMap := gc.getAllIndexFilesOfIndex(segIdx)
		logger.Info("recycle index files", zap.Int("metaFilesNum", len(filesMap)))
		deletedFilesNum := atomic.NewInt32(0)
		fileNum := 0

		futures := make([]*conc.Future[struct{}], 0)
		err = gc.option.cli.WalkWithPrefix(ctx, key, true, func(indexFile *storage.ChunkObjectInfo) bool {
			fileNum++
			file := indexFile.FilePath
			if _, ok := filesMap[file]; !ok {
				future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
					logger := logger.With(zap.String("file", file))
					logger.Info("garbageCollector recycleUnusedIndexFiles remove file...")
					if err := gc.option.cli.Remove(ctx, file); err != nil {
						logger.Warn("garbageCollector recycleUnusedIndexFiles remove file failed", zap.Error(err))
						return struct{}{}, err
					}
					deletedFilesNum.Inc()
					logger.Info("garbageCollector recycleUnusedIndexFiles remove file success")
					return struct{}{}, nil
				})
				futures = append(futures, future)
			}
			return true
		})
		// Wait for all remove tasks to finish.
		if err := conc.BlockOnAll(futures...); err != nil {
			// error is logged, and can be ignored here.
			logger.Warn("some task failure in remove object pool", zap.Error(err))
		}

		logger = logger.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
		if err != nil {
			logger.Warn("index files recycle failed when walk with prefix", zap.Error(err))
			return true
		}
		logger.Info("index files recycle done")
		return true
	})
	log = log.With(zap.Duration("timeCost", time.Since(start)), zap.Int("keyCount", keyCount), zap.Error(err))
	if err != nil {
		log.Warn("garbageCollector recycleUnusedIndexFiles failed", zap.Error(err))
		return
	}
	log.Info("recycleUnusedIndexFiles done")
}

// getAllIndexFilesOfIndex returns all index files of the given segment index.
func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentIndex) map[string]struct{} {
	filesMap := make(map[string]struct{})
	for _, fileID := range segmentIndex.IndexFileKeys {
		filepath := metautil.BuildSegmentIndexFilePath(gc.option.cli.RootPath(), segmentIndex.BuildID, segmentIndex.IndexVersion,
			segmentIndex.PartitionID, segmentIndex.SegmentID, fileID)
		filesMap[filepath] = struct{}{}
	}
	return filesMap
}

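// For orientation, a hypothetical shape of the keys this produces (the exact
// layout is owned by metautil.BuildSegmentIndexFilePath):
//
//	<rootPath>/index_files/<buildID>/<indexVersion>/<partitionID>/<segmentID>/<fileID>
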
// recycleUnusedAnalyzeFiles deletes analyze stats files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
	log.Info("start recycleUnusedAnalyzeFiles")
	startTs := time.Now()
	prefix := path.Join(gc.option.cli.RootPath(), common.AnalyzeStatsPath) + "/"
	// list dir first
	keys := make([]string, 0)
	err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(chunkInfo *storage.ChunkObjectInfo) bool {
		keys = append(keys, chunkInfo.FilePath)
		return true
	})
	if err != nil {
		log.Warn("garbageCollector recycleUnusedAnalyzeFiles list keys from chunk manager failed", zap.Error(err))
		return
	}
	log.Info("recycleUnusedAnalyzeFiles, finish list object", zap.Duration("timeSpent", time.Since(startTs)), zap.Int("taskIDCount", len(keys)))
	for _, key := range keys {
		if ctx.Err() != nil {
			// process canceled
			return
		}
		log.Debug("analyze keys", zap.String("key", key))
		taskID, err := parseBuildIDFromFilePath(key)
		if err != nil {
			log.Warn("garbageCollector recycleUnusedAnalyzeFiles parseAnalyzeResult failed", zap.String("key", key), zap.Error(err))
			continue
		}
		log.Info("garbageCollector will recycle analyze stats files", zap.Int64("taskID", taskID))
		canRecycle, task := gc.meta.analyzeMeta.CheckCleanAnalyzeTask(taskID)
		if !canRecycle {
			// Even if the analyze task is marked as deleted, the analyze stats files are
			// not recycled now; wait for the next gc and delete all files of this taskID at once.
			log.Info("garbageCollector no need to recycle analyze stats files", zap.Int64("taskID", taskID))
			continue
		}
		if task == nil {
			// taskID no longer exists in meta, remove all analyze files
			log.Info("garbageCollector recycleUnusedAnalyzeFiles meta no longer exists, remove analyze stats files",
				zap.Int64("taskID", taskID))
			err = gc.option.cli.RemoveWithPrefix(ctx, key)
			if err != nil {
				log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove analyze stats files failed",
					zap.Int64("taskID", taskID), zap.String("prefix", key), zap.Error(err))
				continue
			}
			log.Info("garbageCollector recycleUnusedAnalyzeFiles remove analyze stats files success",
				zap.Int64("taskID", taskID), zap.String("prefix", key))
			continue
		}

		log.Info("remove analyze stats files whose version is less than the current task's",
			zap.Int64("taskID", taskID), zap.Int64("currentVersion", task.Version))
		for i := int64(0); i < task.Version; i++ {
			if ctx.Err() != nil {
				// process canceled.
				return
			}
			// remove the stale version directory of this task
			removePrefix := fmt.Sprintf("%s%d/%d/", prefix, taskID, i)
			if err := gc.option.cli.RemoveWithPrefix(ctx, removePrefix); err != nil {
				log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove files with prefix failed",
					zap.Int64("taskID", taskID), zap.String("removePrefix", removePrefix))
				continue
			}
		}
		log.Info("analyze stats files recycle success", zap.Int64("taskID", taskID))
	}
}

// recycleUnusedTextIndexFiles loads text index meta and compares it against OSS keys;
// stale versions found in storage are garbage collected.
func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
	start := time.Now()
	log := log.With(zap.String("gcName", "recycleUnusedTextIndexFiles"), zap.Time("startAt", start))
	log.Info("start recycleUnusedTextIndexFiles...")
	defer func() { log.Info("recycleUnusedTextIndexFiles done", zap.Duration("timeCost", time.Since(start))) }()

	hasTextIndexSegments := gc.meta.SelectSegments(SegmentFilterFunc(func(info *SegmentInfo) bool {
		return len(info.GetTextStatsLogs()) != 0
	}))
	fileNum := 0
	deletedFilesNum := atomic.NewInt32(0)

	for _, seg := range hasTextIndexSegments {
		for _, fieldStats := range seg.GetTextStatsLogs() {
			log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
			// clear low version task
			for i := int64(1); i < fieldStats.GetVersion(); i++ {
				prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
					seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID(), i)
				futures := make([]*conc.Future[struct{}], 0)
				err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(files *storage.ChunkObjectInfo) bool {
					fileNum++
					file := files.FilePath
					future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
						log := log.With(zap.String("file", file))
						log.Info("garbageCollector recycleUnusedTextIndexFiles remove file...")
						if err := gc.option.cli.Remove(ctx, file); err != nil {
							log.Warn("garbageCollector recycleUnusedTextIndexFiles remove file failed", zap.Error(err))
							return struct{}{}, err
						}
						deletedFilesNum.Inc()
						log.Info("garbageCollector recycleUnusedTextIndexFiles remove file success")
						return struct{}{}, nil
					})
					futures = append(futures, future)
					return true
				})
				// Wait for all remove tasks to finish.
				if err := conc.BlockOnAll(futures...); err != nil {
					// error is logged, and can be ignored here.
					log.Warn("some task failure in remove object pool", zap.Error(err))
				}

				// use a fresh logger so counter fields do not accumulate across iterations
				logger := log.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
				if err != nil {
					logger.Warn("text index files recycle failed when walk with prefix", zap.Error(err))
					return
				}
			}
		}
	}
	log.Info("text index files recycle done", zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
	metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
}