fix: delete/delete marker replication versions consistent (#11932)

replication didn't work as expected when deletion of
delete markers was requested in DeleteMultipleObjects
API, this is due to incorrect lookup elements being
used to look for delete markers.
This commit is contained in:
Harshavardhana 2021-03-30 17:15:36 -07:00
parent 16ef338649
commit 438becfde8
4 changed files with 23 additions and 19 deletions

View File

@ -496,7 +496,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
object.PurgeTransitioned = goi.TransitionStatus object.PurgeTransitioned = goi.TransitionStatus
} }
if replicateDeletes { if replicateDeletes {
delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectName: object.ObjectName, ObjectName: object.ObjectName,
VersionID: object.VersionID, VersionID: object.VersionID,
}, goi, gerr) }, goi, gerr)
@ -511,9 +511,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
} }
if object.VersionID != "" { if object.VersionID != "" {
object.VersionPurgeStatus = Pending object.VersionPurgeStatus = Pending
if delMarker {
object.DeleteMarkerVersionID = object.VersionID
}
} else { } else {
object.DeleteMarkerReplicationStatus = string(replication.Pending) object.DeleteMarkerReplicationStatus = string(replication.Pending)
} }
@ -557,13 +554,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}) })
deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
for i := range errs { for i := range errs {
dindex := objectsToDelete[ObjectToDelete{ // DeleteMarkerVersionID is not used specifically to avoid
// lookup errors, since DeleteMarkerVersionID is only
// created during DeleteMarker creation when client didn't
// specify a versionID.
objToDel := ObjectToDelete{
ObjectName: dObjects[i].ObjectName, ObjectName: dObjects[i].ObjectName,
VersionID: dObjects[i].VersionID, VersionID: dObjects[i].VersionID,
VersionPurgeStatus: dObjects[i].VersionPurgeStatus, VersionPurgeStatus: dObjects[i].VersionPurgeStatus,
DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus, DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus,
PurgeTransitioned: dObjects[i].PurgeTransitioned, PurgeTransitioned: dObjects[i].PurgeTransitioned,
}] }
dindex := objectsToDelete[objToDel]
if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) { if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) {
if replicateDeletes { if replicateDeletes {
dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus
@ -619,12 +621,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
eventName := event.ObjectRemovedDelete eventName := event.ObjectRemovedDelete
objInfo := ObjectInfo{ objInfo := ObjectInfo{
Name: dobj.ObjectName, Name: dobj.ObjectName,
VersionID: dobj.VersionID, VersionID: dobj.VersionID,
DeleteMarker: dobj.DeleteMarker,
} }
if dobj.DeleteMarker { if objInfo.DeleteMarker {
objInfo.DeleteMarker = dobj.DeleteMarker
objInfo.VersionID = dobj.DeleteMarkerVersionID objInfo.VersionID = dobj.DeleteMarkerVersionID
eventName = event.ObjectRemovedDeleteMarkerCreated eventName = event.ObjectRemovedDeleteMarkerCreated
} }

View File

@ -175,10 +175,10 @@ func isStandardHeader(matchHeaderKey string) bool {
} }
// returns whether object version is a deletemarker and if object qualifies for replication // returns whether object version is a deletemarker and if object qualifies for replication
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) { func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) {
rcfg, err := getReplicationConfig(ctx, bucket) rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil { if err != nil || rcfg == nil {
return false, false, sync return false, sync
} }
opts := replication.ObjectOpts{ opts := replication.ObjectOpts{
Name: dobj.ObjectName, Name: dobj.ObjectName,
@ -198,19 +198,19 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
validReplStatus = true validReplStatus = true
} }
if oi.DeleteMarker && (validReplStatus || replicate) { if oi.DeleteMarker && (validReplStatus || replicate) {
return oi.DeleteMarker, true, sync return true, sync
} }
// can be the case that other cluster is down and duplicate `mc rm --vid` // can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target // is issued - this still needs to be replicated back to the other target
return oi.DeleteMarker, oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
} }
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
// the target online status should not be used here while deciding // the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down // whether to replicate deletes as the target could be temporarily down
if tgt == nil { if tgt == nil {
return oi.DeleteMarker, false, false return false, false
} }
return oi.DeleteMarker, replicate, tgt.replicateSync return replicate, tgt.replicateSync
} }
// replicate deletes to the designated replication target if replication configuration // replicate deletes to the designated replication target if replication configuration

View File

@ -2817,7 +2817,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
VersionID: opts.VersionID, VersionID: opts.VersionID,
}) })
} }
_, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
if replicateDel { if replicateDel {
if opts.VersionID != "" { if opts.VersionID != "" {
opts.VersionPurgeStatus = Pending opts.VersionPurgeStatus = Pending
@ -2825,6 +2826,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
opts.DeleteMarkerReplicationStatus = string(replication.Pending) opts.DeleteMarkerReplicationStatus = string(replication.Pending)
} }
} }
vID := opts.VersionID vID := opts.VersionID
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
// check if replica has permission to be deleted. // check if replica has permission to be deleted.

View File

@ -767,7 +767,7 @@ next:
} }
if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig { if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig {
goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts) goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts)
if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ if replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
ObjectName: objectName, ObjectName: objectName,
VersionID: goi.VersionID, VersionID: goi.VersionID,
}, goi, gerr); replicateDel { }, goi, gerr); replicateDel {
@ -903,7 +903,7 @@ next:
} }
} }
} }
_, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil) replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
// since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication // since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication
objToDel := ObjectToDelete{ObjectName: obj.Name} objToDel := ObjectToDelete{ObjectName: obj.Name}
if replicateDel { if replicateDel {