fix: resync of replication of delete markers (#12932)

Fixes #12919
This commit is contained in:
Poorna Krishnamoorthy 2021-08-23 17:48:22 -04:00 committed by GitHub
parent db35bcf2ce
commit 674c6f7a7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 3 deletions

View File

@ -337,6 +337,17 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
// early return if already replicated delete marker for existing object replication
if dobj.DeleteMarkerVersionID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
_, err := tgt.StatObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.StatObjectOptions{
VersionID: versionID,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: "false",
}})
if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
return
}
}
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
VersionID: versionID,
@ -766,7 +777,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
if rtype == replicateNone {
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed
if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed {
if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
// if metadata is not updated for some reason after replication, such as
// 503 encountered while updating metadata - make sure to set ReplicationStatus
// as Completed.
@ -782,6 +793,9 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
for k, v := range objInfo.UserDefined {
popts.UserDefined[k] = v
}
if ri.OpType == replication.ExistingObjectReplicationType {
popts.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID)
}
popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
if objInfo.UserTags != "" {
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags

View File

@ -1173,7 +1173,7 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
return
}
// if replication status is Complete on DeleteMarker and existing object resync required
if existingObjResync && oi.ReplicationStatus == replication.Completed {
if existingObjResync && (oi.ReplicationStatus == replication.Completed) {
i.healReplicationDeletes(ctx, o, oi, existingObjResync)
return
}

View File

@ -441,7 +441,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
}
objInfo = fi.ToObjectInfo(bucket, object)
if !fi.VersionPurgeStatus.Empty() {
if !fi.VersionPurgeStatus.Empty() && opts.VersionID != "" {
// Make sure to return object info to provide extra information.
return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
}

View File

@ -333,6 +333,10 @@ func ErrorRespToObjectError(err error, params ...string) error {
err = PartTooSmall{}
}
switch minioErr.StatusCode {
case http.StatusMethodNotAllowed:
err = toObjectErr(errMethodNotAllowed, bucket, object)
}
return err
}

View File

@ -686,3 +686,9 @@ func isErrPreconditionFailed(err error) bool {
_, ok := err.(PreConditionFailed)
return ok
}
// isErrMethodNotAllowed - Check if error type is MethodNotAllowed.
func isErrMethodNotAllowed(err error) bool {
var methodNotAllowed MethodNotAllowed
return errors.As(err, &methodNotAllowed)
}