From 438becfde87fafec31a06bbdcde2f89a67a9177b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 30 Mar 2021 17:15:36 -0700 Subject: [PATCH] fix: delete/delete marker replication versions consistent (#11932) replication didn't work as expected when deletion of delete markers was requested in DeleteMultipleObjects API, this is due to incorrect lookup elements being used to look for delete markers. --- cmd/bucket-handlers.go | 22 ++++++++++++---------- cmd/bucket-replication.go | 12 ++++++------ cmd/object-handlers.go | 4 +++- cmd/web-handlers.go | 4 ++-- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 49031c9dc..91d673e0e 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -496,7 +496,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, object.PurgeTransitioned = goi.TransitionStatus } if replicateDeletes { - delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ + replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ ObjectName: object.ObjectName, VersionID: object.VersionID, }, goi, gerr) @@ -511,9 +511,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } if object.VersionID != "" { object.VersionPurgeStatus = Pending - if delMarker { - object.DeleteMarkerVersionID = object.VersionID - } } else { object.DeleteMarkerReplicationStatus = string(replication.Pending) } @@ -557,13 +554,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, }) deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) for i := range errs { - dindex := objectsToDelete[ObjectToDelete{ + // DeleteMarkerVersionID is not used specifically to avoid + // lookup errors, since DeleteMarkerVersionID is only + // created during DeleteMarker creation when client didn't + // specify a versionID. + objToDel := ObjectToDelete{ ObjectName: dObjects[i].ObjectName, VersionID: dObjects[i].VersionID, VersionPurgeStatus: dObjects[i].VersionPurgeStatus, DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus, PurgeTransitioned: dObjects[i].PurgeTransitioned, - }] + } + dindex := objectsToDelete[objToDel] if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) { if replicateDeletes { dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus @@ -619,12 +621,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, eventName := event.ObjectRemovedDelete objInfo := ObjectInfo{ - Name: dobj.ObjectName, - VersionID: dobj.VersionID, + Name: dobj.ObjectName, + VersionID: dobj.VersionID, + DeleteMarker: dobj.DeleteMarker, } - if dobj.DeleteMarker { - objInfo.DeleteMarker = dobj.DeleteMarker + if objInfo.DeleteMarker { objInfo.VersionID = dobj.DeleteMarkerVersionID eventName = event.ObjectRemovedDeleteMarkerCreated } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 69ec796ab..7633f159b 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -175,10 +175,10 @@ func isStandardHeader(matchHeaderKey string) bool { } // returns whether object version is a deletemarker and if object qualifies for replication -func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) { +func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) { rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { - return false, false, sync + return false, sync } opts := replication.ObjectOpts{ Name: dobj.ObjectName, @@ -198,19 +198,19 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet validReplStatus = true } if oi.DeleteMarker && (validReplStatus || replicate) { - return oi.DeleteMarker, true, sync + return true, sync } // can be the case that other cluster is down and duplicate `mc rm --vid` // is issued - this still needs to be replicated back to the other target - return oi.DeleteMarker, oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync + return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) // the target online status should not be used here while deciding // whether to replicate deletes as the target could be temporarily down if tgt == nil { - return oi.DeleteMarker, false, false + return false, false } - return oi.DeleteMarker, replicate, tgt.replicateSync + return replicate, tgt.replicateSync } // replicate deletes to the designated replication target if replication configuration diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index fabaeb4cf..94ffbed9e 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2817,7 +2817,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. VersionID: opts.VersionID, }) } - _, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) + + replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) if replicateDel { if opts.VersionID != "" { opts.VersionPurgeStatus = Pending @@ -2825,6 +2826,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. opts.DeleteMarkerReplicationStatus = string(replication.Pending) } } + vID := opts.VersionID if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { // check if replica has permission to be deleted. diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index e3591cbbe..f743510f4 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -767,7 +767,7 @@ next: } if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig { goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts) - if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ + if replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ ObjectName: objectName, VersionID: goi.VersionID, }, goi, gerr); replicateDel { @@ -903,7 +903,7 @@ next: } } } - _, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil) + replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil) // since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication objToDel := ObjectToDelete{ObjectName: obj.Name} if replicateDel {