Fix evaluation of NewerNoncurrentVersions (#21096)

- Move VersionPurgeStatus into replication package
- ilm: Evaluate policy w/ obj retention/replication
- lifecycle: Use Evaluator to enforce ILM in scanner
- Unit tests covering ILM, replication and retention
- Simplify NewEvaluator constructor
This commit is contained in:
Krishnan Parthasarathi
2025-04-02 23:45:06 -07:00
committed by GitHub
parent 07f31e574c
commit 01447d2438
20 changed files with 916 additions and 471 deletions

View File

@@ -38,7 +38,6 @@ import (
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/heal"
"github.com/minio/minio/internal/event"
@@ -950,10 +949,7 @@ func (i *scannerItem) transformMetaDir() {
i.objectName = split[len(split)-1]
}
var (
applyActionsLogPrefix = color.Green("applyActions:")
applyVersionActionsLogPrefix = color.Green("applyVersionActions:")
)
var applyActionsLogPrefix = color.Green("applyActions:")
func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) {
if i.debug {
@@ -978,153 +974,8 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi Object
return 0
}
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (action lifecycle.Action, size int64) {
size, err := oi.GetActualSize()
if i.debug {
scannerLogIf(ctx, err)
}
if i.lifeCycle == nil {
return action, size
}
versionID := oi.VersionID
var vc *versioning.Versioning
var lr lock.Retention
var rcfg *replication.Config
if !isMinioMetaBucketName(i.bucket) {
vc, err = globalBucketVersioningSys.Get(i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}
// Check if bucket is object locked.
lr, err = globalBucketObjectLockSys.Get(i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}
rcfg, err = getReplicationConfig(ctx, i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}
}
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, lr, rcfg, oi)
if i.debug {
if versionID != "" {
console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, lcEvt.Action)
} else {
console.Debugf(applyActionsLogPrefix+" lifecycle: %q Initial scan: %v\n", i.objectPath(), lcEvt.Action)
}
}
switch lcEvt.Action {
// This version doesn't contribute towards sizeS only when it is permanently deleted.
// This can happen when,
// - ExpireObjectAllVersions flag is enabled
// - NoncurrentVersionExpiration is applicable
case lifecycle.DeleteVersionAction, lifecycle.DeleteAllVersionsAction, lifecycle.DelMarkerDeleteAllVersionsAction:
size = 0
case lifecycle.DeleteAction:
// On a non-versioned bucket, DeleteObject removes the only version permanently.
if !vc.PrefixEnabled(oi.Name) {
size = 0
}
}
applyLifecycleAction(lcEvt, lcEventSrc_Scanner, oi)
return lcEvt.Action, size
}
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) {
done := globalScannerMetrics.time(scannerMetricApplyNonCurrent)
defer done()
rcfg, _ := globalBucketObjectLockSys.Get(i.bucket)
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
versioned := vcfg != nil && vcfg.Versioned(i.objectPath())
objectInfos := make([]ObjectInfo, 0, len(fivs))
if i.lifeCycle == nil {
for _, fi := range fivs {
objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned))
}
return objectInfos, nil
}
event := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
lim := event.NewerNoncurrentVersions
if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
for _, fi := range fivs {
objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned))
}
return objectInfos, nil
}
overflowVersions := fivs[lim+1:]
// Retain the current version + most recent lim noncurrent versions
for _, fi := range fivs[:lim+1] {
objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned))
}
toDel := make([]ObjectToDelete, 0, len(overflowVersions))
for _, fi := range overflowVersions {
obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)
// skip versions with object locking enabled
if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
if i.debug {
if obj.VersionID != "" {
console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
} else {
console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
}
}
// add this version back to remaining versions for
// subsequent lifecycle policy applications
objectInfos = append(objectInfos, obj)
continue
}
// NoncurrentDays not passed yet.
if time.Now().UTC().Before(lifecycle.ExpectedExpiryTime(obj.SuccessorModTime, event.NoncurrentDays)) {
// add this version back to remaining versions for
// subsequent lifecycle policy applications
objectInfos = append(objectInfos, obj)
continue
}
toDel = append(toDel, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: obj.Name,
VersionID: obj.VersionID,
},
})
}
if len(toDel) > 0 {
expState.enqueueByNewerNoncurrent(i.bucket, toDel, event)
}
return objectInfos, nil
}
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
// after applying lifecycle checks configured.
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) {
objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs, expState)
if err != nil {
return nil, err
}
// Check if we have many versions after applyNewerNoncurrentVersionLimit.
if len(objInfos) >= int(scannerExcessObjectVersions.Load()) {
func (i *scannerItem) alertExcessiveVersions(remainingVersions int, cumulativeSize int64) {
if remainingVersions >= int(scannerExcessObjectVersions.Load()) {
// Notify object accessed via a GET request.
sendEvent(eventArgs{
EventName: event.ObjectManyVersions,
@@ -1134,7 +985,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
},
UserAgent: "Scanner",
Host: globalLocalNodeName,
RespElements: map[string]string{"x-minio-versions": strconv.Itoa(len(objInfos))},
RespElements: map[string]string{"x-minio-versions": strconv.Itoa(remainingVersions)},
})
auditLogInternal(context.Background(), AuditLogOptions{
@@ -1143,15 +994,11 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
Bucket: i.bucket,
Object: i.objectPath(),
Tags: map[string]string{
"x-minio-versions": strconv.Itoa(len(objInfos)),
"x-minio-versions": strconv.Itoa(remainingVersions),
},
})
}
cumulativeSize := int64(0)
for _, objInfo := range objInfos {
cumulativeSize += objInfo.Size
}
// Check if the cumulative size of all versions of this object is high.
if cumulativeSize >= scannerExcessObjectVersionsTotalSize.Load() {
// Notify object accessed via a GET request.
@@ -1164,7 +1011,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
UserAgent: "Scanner",
Host: globalLocalNodeName,
RespElements: map[string]string{
"x-minio-versions-count": strconv.Itoa(len(objInfos)),
"x-minio-versions-count": strconv.Itoa(remainingVersions),
"x-minio-versions-size": strconv.FormatInt(cumulativeSize, 10),
},
})
@@ -1175,43 +1022,30 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
Bucket: i.bucket,
Object: i.objectPath(),
Tags: map[string]string{
"x-minio-versions-count": strconv.Itoa(len(objInfos)),
"x-minio-versions-count": strconv.Itoa(remainingVersions),
"x-minio-versions-size": strconv.FormatInt(cumulativeSize, 10),
},
})
}
return objInfos, nil
}
type actionsAccountingFn func(oi ObjectInfo, sz, actualSz int64, sizeS *sizeSummary)
// applyActions will apply lifecycle checks on to a scanned item.
// The resulting size on disk will always be returned.
// The metadata will be compared to consensus on the object layer before any changes are applied.
// If no metadata is supplied, -1 is returned if no action is taken.
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) (objDeleted bool, size int64) {
done := globalScannerMetrics.time(scannerMetricILM)
var action lifecycle.Action
action, size = i.applyLifecycle(ctx, o, oi)
done()
// Note: objDeleted is true if and only if action ==
// lifecycle.DeleteAllVersionsAction
if action.DeleteAll() {
return true, 0
}
// For instance, an applied lifecycle means we remove/transitioned an object
// from the current deployment, which means we don't have to call healing
// routine even if we are asked to do via heal flag.
if action == lifecycle.NoneAction {
func (i *scannerItem) applyActions(ctx context.Context, objAPI ObjectLayer, objInfos []ObjectInfo, lr lock.Retention, sizeS *sizeSummary, fn actionsAccountingFn) {
healActions := func(oi ObjectInfo, actualSz int64) int64 {
size := actualSz
if i.heal.enabled {
done := globalScannerMetrics.time(scannerMetricHealCheck)
size = i.applyHealing(ctx, o, oi)
size = i.applyHealing(ctx, objAPI, oi)
done()
if healDeleteDangling {
done := globalScannerMetrics.time(scannerMetricCleanAbandoned)
err := o.CheckAbandonedParts(ctx, i.bucket, i.objectPath(), madmin.HealOpts{Remove: healDeleteDangling})
err := objAPI.CheckAbandonedParts(ctx, i.bucket, i.objectPath(), madmin.HealOpts{Remove: healDeleteDangling})
done()
if err != nil {
healingLogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", i.bucket, i.objectPath(), err), i.objectPath())
@@ -1221,10 +1055,109 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi Object
// replicate only if lifecycle rules are not applied.
done := globalScannerMetrics.time(scannerMetricCheckReplication)
i.healReplication(ctx, o, oi.Clone(), sizeS)
i.healReplication(ctx, oi.Clone(), sizeS)
done()
return size
}
return false, size
vc, err := globalBucketVersioningSys.Get(i.bucket)
if err != nil {
scannerLogOnceIf(ctx, err, i.bucket)
return
}
// start ILM check timer
done := globalScannerMetrics.timeN(scannerMetricILM)
if i.lifeCycle == nil { // no ILM configured, apply healing and replication checks
var cumulativeSize int64
for _, oi := range objInfos {
actualSz, err := oi.GetActualSize()
if err != nil {
scannerLogIf(ctx, err)
continue
}
size := healActions(oi, actualSz)
if fn != nil { // call accountingfn
fn(oi, size, actualSz, sizeS)
}
cumulativeSize += size
}
// end ILM check timer
done(len(objInfos))
i.alertExcessiveVersions(len(objInfos), cumulativeSize)
return
}
objOpts := make([]lifecycle.ObjectOpts, len(objInfos))
for i, oi := range objInfos {
objOpts[i] = oi.ToLifecycleOpts()
}
evaluator := lifecycle.NewEvaluator(*i.lifeCycle).WithLockRetention(&lr).WithReplicationConfig(i.replication.Config)
events, err := evaluator.Eval(objOpts)
if err != nil {
// This error indicates that the objOpts passed to Eval is invalid.
bugLogIf(ctx, err, i.bucket, i.objectPath())
done(len(objInfos)) // end ILM check timer
return
}
done(len(objInfos)) // end ILM check timer
var (
toDel []ObjectToDelete
noncurrentEvents []lifecycle.Event
cumulativeSize int64
)
remainingVersions := len(objInfos)
eventLoop:
for idx, event := range events {
oi := objInfos[idx]
actualSz, err := oi.GetActualSize()
if i.debug {
scannerLogIf(ctx, err)
}
size := actualSz
switch event.Action {
case lifecycle.DeleteAllVersionsAction, lifecycle.DelMarkerDeleteAllVersionsAction:
remainingVersions = 0
applyExpiryRule(event, lcEventSrc_Scanner, oi)
break eventLoop
case lifecycle.DeleteAction, lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
if !vc.PrefixEnabled(i.objectPath()) && event.Action == lifecycle.DeleteAction {
remainingVersions--
size = 0
}
applyExpiryRule(event, lcEventSrc_Scanner, oi)
case lifecycle.DeleteVersionAction: // noncurrent versions expiration
opts := objOpts[idx]
remainingVersions--
size = 0
toDel = append(toDel, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: opts.Name,
VersionID: opts.VersionID,
},
})
noncurrentEvents = append(noncurrentEvents, event)
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
applyTransitionRule(event, lcEventSrc_Scanner, oi)
case lifecycle.NoneAction:
size = healActions(oi, actualSz)
}
// NB fn must be called for every object version except if it is
// expired or was a dangling object.
if fn != nil {
fn(oi, size, actualSz, sizeS)
}
cumulativeSize += size
}
if len(toDel) > 0 {
globalExpiryState.enqueueNoncurrentVersions(i.bucket, toDel, noncurrentEvents)
}
i.alertExcessiveVersions(remainingVersions, cumulativeSize)
}
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, rcfg *replication.Config, obj ObjectInfo) lifecycle.Event {
@@ -1371,22 +1304,8 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay
}
// Apply object, object version, restored object or restored object version action on the given object
func applyExpiryRule(event lifecycle.Event, src lcEventSrc, obj ObjectInfo) bool {
func applyExpiryRule(event lifecycle.Event, src lcEventSrc, obj ObjectInfo) {
globalExpiryState.enqueueByDays(obj, event, src)
return true
}
// Perform actions (removal or transitioning of objects), return true the action is successfully performed
func applyLifecycleAction(event lifecycle.Event, src lcEventSrc, obj ObjectInfo) (success bool) {
switch action := event.Action; action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction,
lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction,
lifecycle.DeleteAllVersionsAction, lifecycle.DelMarkerDeleteAllVersionsAction:
success = applyExpiryRule(event, src, obj)
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
success = applyTransitionRule(event, src, obj)
}
return
}
// objectPath returns the prefix and object name.
@@ -1395,7 +1314,7 @@ func (i *scannerItem) objectPath() string {
}
// healReplication will heal a scanned item that has failed replication.
func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
func (i *scannerItem) healReplication(ctx context.Context, oi ObjectInfo, sizeS *sizeSummary) {
if oi.VersionID == "" {
return
}