From 289223b6de752e20e2d77c59ade64e026431008a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 27 Mar 2024 23:44:52 -0700 Subject: [PATCH] expire ILM all versions verify quorum on action (#19359) --- cmd/erasure-object.go | 56 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index e87313d13..5e2599eef 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1821,10 +1821,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string auditObjectErasureSet(ctx, object, &er) } - if opts.DeletePrefix { - return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object) - } - var lc *lifecycle.Lifecycle var rcfg lock.Retention var replcfg *replication.Config @@ -1851,12 +1847,63 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string } } + if opts.DeletePrefix { + if opts.Expiration.Expire { + // Expire all versions expiration must still verify the state() on disk + // via a getObjectInfo() call as follows, any read quorum issues we + // must not proceed further for safety reasons. attempt a MRF heal + // while we see such quorum errors. + goi, _, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts) + if gerr != nil && goi.Name == "" { + if _, ok := gerr.(InsufficientReadQuorum); ok { + // Add an MRF heal for next time. + er.addPartial(bucket, object, opts.VersionID) + + return objInfo, InsufficientWriteQuorum{} + } + return objInfo, gerr + } + + // Add protection and re-verify the ILM rules for qualification + // based on the latest objectInfo and see if the object still + // qualifies for deletion. + if gerr == nil { + evt := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, goi) + var isErr bool + switch evt.Action { + case lifecycle.NoneAction: + isErr = true + case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: + isErr = true + } + if isErr { + if goi.VersionID != "" { + return goi, VersionNotFound{ + Bucket: bucket, + Object: object, + VersionID: goi.VersionID, + } + } + return goi, ObjectNotFound{ + Bucket: bucket, + Object: object, + } + } + } + } // Delete marker and any latest that qualifies shall be expired permanently. + + return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object) + } + storageDisks := er.getDisks() versionFound := true objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response. goi, _, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts) if gerr != nil && goi.Name == "" { if _, ok := gerr.(InsufficientReadQuorum); ok { + // Add an MRF heal for next time. + er.addPartial(bucket, object, opts.VersionID) + return objInfo, InsufficientWriteQuorum{} } // For delete marker replication, versionID being replicated will not exist on disk @@ -1866,6 +1913,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string return objInfo, gerr } } + if opts.EvalMetadataFn != nil { dsc, err := opts.EvalMetadataFn(&goi, err) if err != nil {