Avoid ILM expiry on deleted versions that are yet to replicate (#18175)

Fixes #18167
This commit is contained in:
Poorna 2023-10-06 05:55:15 -07:00 committed by GitHub
parent 72871dbb9a
commit 9dc29d7687
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 23 additions and 12 deletions

View File

@ -929,7 +929,8 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
versionID := oi.VersionID
rCfg, _ := globalBucketObjectLockSys.Get(i.bucket)
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, oi)
replcfg, _ := getReplicationConfig(ctx, i.bucket)
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, replcfg, oi)
if i.debug {
if versionID != "" {
console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, lcEvt.Action)
@ -1129,7 +1130,7 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi Object
return size
}
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, obj ObjectInfo) lifecycle.Event {
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, rcfg *replication.Config, obj ObjectInfo) lifecycle.Event {
event := lc.Eval(obj.ToLifecycleOpts())
if serverDebugLog {
console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", event.Action)
@ -1161,6 +1162,9 @@ func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr loc
}
return lifecycle.Event{Action: lifecycle.NoneAction}
}
if rcfg != nil && !obj.VersionPurgeStatus.Empty() && rcfg.HasActiveRules(obj.Name, true) {
return lifecycle.Event{Action: lifecycle.NoneAction}
}
}
return event

View File

@ -1628,10 +1628,12 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
var lc *lifecycle.Lifecycle
var rcfg lock.Retention
var replcfg *replication.Config
if opts.Expiration.Expire {
// Check if the current bucket has a configured lifecycle policy
lc, _ = globalLifecycleSys.Get(bucket)
rcfg, _ = globalBucketObjectLockSys.Get(bucket)
replcfg, _ = getReplicationConfig(ctx, bucket)
}
// expiration attempted on a bucket with no lifecycle
@ -1684,7 +1686,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
if opts.Expiration.Expire {
if gerr == nil {
evt := evalActionFromLifecycle(ctx, *lc, rcfg, goi)
evt := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, goi)
var isErr bool
switch evt.Action {
case lifecycle.NoneAction:

View File

@ -734,6 +734,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
// Check if bucket is object locked.
lr, _ := globalBucketObjectLockSys.Get(bi.Name)
rcfg, _ := getReplicationConfig(ctx, bi.Name)
for setIdx, set := range pool.sets {
set := set
@ -745,7 +746,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
switch {
case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
applyExpiryOnTransitionedObject(ctx, z, objInfo, evt, lcEventSrc_Decom)
@ -1048,6 +1049,7 @@ func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error
// Check if bucket is object locked.
lr, _ := globalBucketObjectLockSys.Get(bi.Name)
rcfg, _ := getReplicationConfig(ctx, bi.Name)
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
if lc == nil {
@ -1056,7 +1058,7 @@ func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
switch {
case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
applyExpiryOnTransitionedObject(ctx, z, objInfo, evt, lcEventSrc_Decom)

View File

@ -440,6 +440,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
lc, _ := globalLifecycleSys.Get(bucket)
// Check if bucket is object locked.
lr, _ := globalBucketObjectLockSys.Get(bucket)
rcfg, _ := getReplicationConfig(ctx, bucket)
pool := z.serverPools[poolIdx]
const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
@ -467,7 +468,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Rebal)
return true

View File

@ -1390,7 +1390,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
if err == nil {
if opts.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo)
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
if !evt.Action.DeleteRestored() {
@ -1414,7 +1414,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
if err == nil {
if opts.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo)
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, opts.Replication.Config, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
if !evt.Action.DeleteRestored() {

View File

@ -370,7 +370,7 @@ func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCa
objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo).Action
act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo).Action
skip = act.Delete()
if act.DeleteRestored() {
// do not skip DeleteRestored* actions
@ -398,7 +398,7 @@ func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCa
objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo)
evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
if !evt.Action.DeleteRestored() {

View File

@ -474,7 +474,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
// Automatically remove the object/version if an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
event := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo)
replcfg, _ := getReplicationConfig(ctx, bucket)
event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
if event.Action.Delete() {
// apply whatever the expiry rule is.
applyExpiryRule(event, lcEventSrc_s3GetObject, objInfo)
@ -732,7 +733,8 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
// Automatically remove the object/version if an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
event := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo)
replcfg, _ := getReplicationConfig(ctx, bucket)
event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
if event.Action.Delete() {
// apply whatever the expiry rule is.
applyExpiryRule(event, lcEventSrc_s3HeadObject, objInfo)