fix: healing of replication delete markers (#13933)

A corner case can occur where the delete-marker was propagated 
but the metadata could not be updated on the primary. Sending 
a RemoveObject call with the Delete marker version would end 
up permanently deleting the version on target. Instead, perform 
a Stat on the delete-marker version on target and redo replication 
only if the delete-marker is missing on target.
This commit is contained in:
Poorna K 2021-12-16 15:34:55 -08:00 committed by GitHub
parent 926373f9c1
commit e270ab65b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 4 additions and 5 deletions

View File

@ -517,8 +517,8 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
} }
return return
} }
// early return if already replicated delete marker for existing object replication // early return if already replicated delete marker for existing object replication/ healing delete markers
if dobj.DeleteMarkerVersionID != "" && dobj.OpType == replication.ExistingObjectReplicationType { if dobj.DeleteMarkerVersionID != "" && (dobj.OpType == replication.ExistingObjectReplicationType || dobj.OpType == replication.HealReplicationType) {
if _, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.StatObjectOptions{ if _, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.StatObjectOptions{
VersionID: versionID, VersionID: versionID,
Internal: miniogo.AdvancedGetOptions{ Internal: miniogo.AdvancedGetOptions{
@ -526,12 +526,10 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
}}); isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) { }}); isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
if dobj.VersionID == "" { if dobj.VersionID == "" {
rinfo.ReplicationStatus = replication.Completed rinfo.ReplicationStatus = replication.Completed
} else {
rinfo.VersionPurgeStatus = Complete
}
return return
} }
} }
}
rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{ rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
VersionID: versionID, VersionID: versionID,

View File

@ -1269,6 +1269,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer,
DeleteMarker: roi.DeleteMarker, DeleteMarker: roi.DeleteMarker,
}, },
Bucket: roi.Bucket, Bucket: roi.Bucket,
OpType: replication.HealReplicationType,
} }
if roi.ExistingObjResync.mustResync() { if roi.ExistingObjResync.mustResync() {
doi.OpType = replication.ExistingObjectReplicationType doi.OpType = replication.ExistingObjectReplicationType