From 4c204707fd5f4fa772f40d782b66e78bc4e00b46 Mon Sep 17 00:00:00 2001 From: Shubhendu Date: Fri, 7 Apr 2023 02:40:01 +0530 Subject: [PATCH] Correct to remove `null` version while ILM rule application (#16971) Signed-off-by: Shubhendu Ram Tripathi Co-authored-by: Harshavardhana --- cmd/data-scanner.go | 53 ++++++++++++++++++++++++++------------------- cmd/xl-storage.go | 5 ++--- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 03ce556fd..d68116590 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -991,27 +991,35 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O // applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured. // Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return. -func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]FileInfo, error) { - if i.lifeCycle == nil { - return fivs, nil - } +func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) { done := globalScannerMetrics.time(scannerMetricApplyNonCurrent) defer done() - _, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()}) - if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions - return fivs, nil - } - - overflowVersions := fivs[lim+1:] - // current version + most recent lim noncurrent versions - fivs = fivs[:lim+1] - rcfg, _ := globalBucketObjectLockSys.Get(i.bucket) vcfg, _ := globalBucketVersioningSys.Get(i.bucket) versioned := vcfg != nil && vcfg.Versioned(i.objectPath()) + // current version + most recent lim noncurrent versions + objectInfos := make([]ObjectInfo, 0, len(fivs)) + + if i.lifeCycle == nil { + for _, fi := range fivs { + objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)) + } + return objectInfos, nil + } + + _, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()}) + if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions + for _, fi := range fivs { + objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)) + } + return objectInfos, nil + } + + overflowVersions := fivs[lim+1:] + toDel := make([]ObjectToDelete, 0, len(overflowVersions)) for _, fi := range overflowVersions { obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned) @@ -1026,7 +1034,7 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob } // add this version back to remaining versions for // subsequent lifecycle policy applications - fivs = append(fivs, fi) + objectInfos = append(objectInfos, obj) continue } @@ -1040,19 +1048,19 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob toDel = append(toDel, ObjectToDelete{ ObjectV: ObjectV{ - ObjectName: fi.Name, - VersionID: fi.VersionID, + ObjectName: obj.Name, + VersionID: obj.VersionID, }, }) } globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel) - return fivs, nil + return objectInfos, nil } // applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain // after applying lifecycle checks configured. -func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) { +func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]ObjectInfo, error) { if i.heal.enabled { if healDeleteDangling { done := globalScannerMetrics.time(scannerMetricCleanAbandoned) @@ -1063,13 +1071,14 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi } } } - fivs, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs) + + objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs) if err != nil { - return fivs, err + return nil, err } // Check if we have many versions after applyNewerNoncurrentVersionLimit. - if len(fivs) > dataScannerExcessiveVersionsThreshold { + if len(objInfos) > dataScannerExcessiveVersionsThreshold { // Notify object accessed via a GET request. sendEvent(eventArgs{ EventName: event.ObjectManyVersions, @@ -1083,7 +1092,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi }) } - return fivs, nil + return objInfos, nil } // applyActions will apply lifecycle checks on to a scanned item. diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 1e90c4496..d64f5d3e5 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -514,7 +514,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } done := globalScannerMetrics.time(scannerMetricApplyAll) - fivs.Versions, err = item.applyVersionActions(ctx, objAPI, fivs.Versions) + objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions) done() if err != nil { @@ -524,8 +524,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates versioned := vcfg != nil && vcfg.Versioned(item.objectPath()) - for _, version := range fivs.Versions { - oi := version.ToObjectInfo(item.bucket, item.objectPath(), versioned) + for _, oi := range objInfos { done = globalScannerMetrics.time(scannerMetricApplyVersion) sz := item.applyActions(ctx, objAPI, oi, &sizeS) done()