Skip versions expired by DeleteAllVersionsAction (#18537)

Object versions expired by DeleteAllVersionsAction must not be included
toward data-usage accounting.
This commit is contained in:
Krishnan Parthasarathi 2023-11-28 08:39:21 -08:00 committed by GitHub
parent b0264bdb90
commit 9fbd931058
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 18 deletions

View File

@ -477,7 +477,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
item.heal.enabled = item.heal.enabled && f.healObjectSelect > 0 item.heal.enabled = item.heal.enabled && f.healObjectSelect > 0
sz, err := f.getSize(item) sz, err := f.getSize(item)
if err != nil { if err != nil && err != errIgnoreFileContrib {
wait() // wait to proceed to next entry. wait() // wait to proceed to next entry.
if err != errSkipFile && f.dataUsageScannerDebug { if err != errSkipFile && f.dataUsageScannerDebug {
console.Debugf(scannerLogPrefix+" getSize \"%v/%v\" returned err: %v\n", bucket, item.objectPath(), err) console.Debugf(scannerLogPrefix+" getSize \"%v/%v\" returned err: %v\n", bucket, item.objectPath(), err)
@ -495,8 +495,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
// object. // object.
delete(abandonedChildren, pathJoin(item.bucket, item.objectPath())) delete(abandonedChildren, pathJoin(item.bucket, item.objectPath()))
into.addSizes(sz) if err != errIgnoreFileContrib {
into.Objects++ into.addSizes(sz)
into.Objects++
}
wait() // wait to proceed to next entry. wait() // wait to proceed to next entry.
@ -917,16 +919,17 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi Object
return 0 return 0
} }
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (applied bool, size int64) { func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (action lifecycle.Action, size int64) {
size, err := oi.GetActualSize() size, err := oi.GetActualSize()
if i.debug { if i.debug {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
if i.lifeCycle == nil { if i.lifeCycle == nil {
return false, size return action, size
} }
versionID := oi.VersionID versionID := oi.VersionID
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
rCfg, _ := globalBucketObjectLockSys.Get(i.bucket) rCfg, _ := globalBucketObjectLockSys.Get(i.bucket)
replcfg, _ := getReplicationConfig(ctx, i.bucket) replcfg, _ := getReplicationConfig(ctx, i.bucket)
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, replcfg, oi) lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, replcfg, oi)
@ -938,7 +941,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
} }
} }
defer func() { defer func() {
if applied { if lcEvt.Action != lifecycle.NoneAction {
numVersions := uint64(1) numVersions := uint64(1)
if lcEvt.Action == lifecycle.DeleteAllVersionsAction { if lcEvt.Action == lifecycle.DeleteAllVersionsAction {
numVersions = uint64(oi.NumVersions) numVersions = uint64(oi.NumVersions)
@ -948,14 +951,21 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
}() }()
switch lcEvt.Action { switch lcEvt.Action {
case lifecycle.DeleteAction, lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction, lifecycle.DeleteAllVersionsAction: // This version doesn't contribute towards sizeS only when it is permanently deleted.
return applyLifecycleAction(lcEvt, lcEventSrc_Scanner, oi), 0 // This can happen when,
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: // - ExpireObjectAllVersions flag is enabled
return applyLifecycleAction(lcEvt, lcEventSrc_Scanner, oi), size // - NoncurrentVersionExpiration is applicable
default: case lifecycle.DeleteVersionAction, lifecycle.DeleteAllVersionsAction:
// No action. size = 0
return false, size case lifecycle.DeleteAction:
// On a non-versioned bucket, DeleteObject removes the only version permanently.
if !vcfg.PrefixEnabled(oi.Name) {
size = 0
}
} }
applyLifecycleAction(lcEvt, lcEventSrc_Scanner, oi)
return lcEvt.Action, size
} }
// applyTierObjSweep removes remote object pending deletion and the free-version // applyTierObjSweep removes remote object pending deletion and the free-version
@ -1097,15 +1107,22 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
// The resulting size on disk will always be returned. // The resulting size on disk will always be returned.
// The metadata will be compared to consensus on the object layer before any changes are applied. // The metadata will be compared to consensus on the object layer before any changes are applied.
// If no metadata is supplied, -1 is returned if no action is taken. // If no metadata is supplied, -1 is returned if no action is taken.
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 { func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) (objDeleted bool, size int64) {
done := globalScannerMetrics.time(scannerMetricILM) done := globalScannerMetrics.time(scannerMetricILM)
applied, size := i.applyLifecycle(ctx, o, oi) var action lifecycle.Action
action, size = i.applyLifecycle(ctx, o, oi)
done() done()
// Note: objDeleted is true if and only if action ==
// lifecycle.DeleteAllVersionsAction
if action == lifecycle.DeleteAllVersionsAction {
return true, 0
}
// For instance, an applied lifecycle means we remove/transitioned an object // For instance, an applied lifecycle means we remove/transitioned an object
// from the current deployment, which means we don't have to call healing // from the current deployment, which means we don't have to call healing
// routine even if we are asked to do via heal flag. // routine even if we are asked to do via heal flag.
if !applied { if action == lifecycle.NoneAction {
if i.heal.enabled { if i.heal.enabled {
done := globalScannerMetrics.time(scannerMetricHealCheck) done := globalScannerMetrics.time(scannerMetricHealCheck)
size = i.applyHealing(ctx, o, oi) size = i.applyHealing(ctx, o, oi)
@ -1126,7 +1143,7 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi Object
i.healReplication(ctx, o, oi.Clone(), sizeS) i.healReplication(ctx, o, oi.Clone(), sizeS)
done() done()
} }
return size return false, size
} }
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, rcfg *replication.Config, obj ObjectInfo) lifecycle.Event { func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, rcfg *replication.Config, obj ObjectInfo) lifecycle.Event {

View File

@ -118,6 +118,8 @@ var errDoneForNow = errors.New("done for now")
// to proceed to next entry. // to proceed to next entry.
var errSkipFile = errors.New("skip this file") var errSkipFile = errors.New("skip this file")
var errIgnoreFileContrib = errors.New("ignore this file's contribution toward data-usage")
// errXLBackend XL drive mode requires fresh deployment. // errXLBackend XL drive mode requires fresh deployment.
var errXLBackend = errors.New("XL backend requires fresh drive") var errXLBackend = errors.New("XL backend requires fresh drive")

View File

@ -567,11 +567,19 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
versioned := vcfg != nil && vcfg.Versioned(item.objectPath()) versioned := vcfg != nil && vcfg.Versioned(item.objectPath())
var objDeleted bool
for _, oi := range objInfos { for _, oi := range objInfos {
done = globalScannerMetrics.time(scannerMetricApplyVersion) done = globalScannerMetrics.time(scannerMetricApplyVersion)
sz := item.applyActions(ctx, objAPI, oi, &sizeS) var sz int64
objDeleted, sz = item.applyActions(ctx, objAPI, oi, &sizeS)
done() done()
// DeleteAllVersionsAction: The object and all its
// versions are expired and
// doesn't contribute toward data usage.
if objDeleted {
break
}
actualSz, err := oi.GetActualSize() actualSz, err := oi.GetActualSize()
if err != nil { if err != nil {
continue continue
@ -644,6 +652,12 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
} }
} }
} }
if objDeleted {
// we return errIgnoreFileContrib to signal this function's
// callers to skip this object's contribution towards
// usage.
return sizeSummary{}, errIgnoreFileContrib
}
return sizeS, nil return sizeS, nil
}, scanMode) }, scanMode)
if err != nil { if err != nil {