fix: delete/delete marker replication versions consistent (#11932)

replication didn't work as expected when deletion of
delete markers was requested in DeleteMultipleObjects
API, this is due to incorrect lookup elements being
used to look for delete markers.
This commit is contained in:
Harshavardhana 2021-03-30 17:15:36 -07:00 committed by GitHub
parent 014edd3462
commit 8e6e287729
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 19 deletions

View File

@ -496,7 +496,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
object.PurgeTransitioned = goi.TransitionStatus
}
if replicateDeletes {
delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectName: object.ObjectName,
VersionID: object.VersionID,
}, goi, gerr)
@ -511,9 +511,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
if object.VersionID != "" {
object.VersionPurgeStatus = Pending
if delMarker {
object.DeleteMarkerVersionID = object.VersionID
}
} else {
object.DeleteMarkerReplicationStatus = string(replication.Pending)
}
@ -557,13 +554,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
})
deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
for i := range errs {
dindex := objectsToDelete[ObjectToDelete{
// DeleteMarkerVersionID is not used specifically to avoid
// lookup errors, since DeleteMarkerVersionID is only
// created during DeleteMarker creation when client didn't
// specify a versionID.
objToDel := ObjectToDelete{
ObjectName: dObjects[i].ObjectName,
VersionID: dObjects[i].VersionID,
VersionPurgeStatus: dObjects[i].VersionPurgeStatus,
DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus,
PurgeTransitioned: dObjects[i].PurgeTransitioned,
}]
}
dindex := objectsToDelete[objToDel]
if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) {
if replicateDeletes {
dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus
@ -619,12 +621,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
eventName := event.ObjectRemovedDelete
objInfo := ObjectInfo{
Name: dobj.ObjectName,
VersionID: dobj.VersionID,
Name: dobj.ObjectName,
VersionID: dobj.VersionID,
DeleteMarker: dobj.DeleteMarker,
}
if dobj.DeleteMarker {
objInfo.DeleteMarker = dobj.DeleteMarker
if objInfo.DeleteMarker {
objInfo.VersionID = dobj.DeleteMarkerVersionID
eventName = event.ObjectRemovedDeleteMarkerCreated
}

View File

@ -175,10 +175,10 @@ func isStandardHeader(matchHeaderKey string) bool {
}
// returns whether object version is a deletemarker and if object qualifies for replication
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) {
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) {
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
return false, false, sync
return false, sync
}
opts := replication.ObjectOpts{
Name: dobj.ObjectName,
@ -198,19 +198,19 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
validReplStatus = true
}
if oi.DeleteMarker && (validReplStatus || replicate) {
return oi.DeleteMarker, true, sync
return true, sync
}
// can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target
return oi.DeleteMarker, oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
if tgt == nil {
return oi.DeleteMarker, false, false
return false, false
}
return oi.DeleteMarker, replicate, tgt.replicateSync
return replicate, tgt.replicateSync
}
// replicate deletes to the designated replication target if replication configuration

View File

@ -3094,7 +3094,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
VersionID: opts.VersionID,
})
}
_, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
if replicateDel {
if opts.VersionID != "" {
opts.VersionPurgeStatus = Pending
@ -3102,6 +3103,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
opts.DeleteMarkerReplicationStatus = string(replication.Pending)
}
}
vID := opts.VersionID
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
// check if replica has permission to be deleted.

View File

@ -767,7 +767,7 @@ next:
}
if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig {
goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts)
if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
if replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
ObjectName: objectName,
VersionID: goi.VersionID,
}, goi, gerr); replicateDel {
@ -903,7 +903,7 @@ next:
}
}
}
_, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
// since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication
objToDel := ObjectToDelete{ObjectName: obj.Name}
if replicateDel {