fix: allow replication of 'null' delete markers (#16773)

This commit is contained in:
Poorna 2023-03-08 07:03:29 -08:00 committed by GitHub
parent 56c57e2c53
commit fb6ab1cca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 8 deletions

View File

@ -586,6 +586,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
return return
} }
} }
if !isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
// mark delete marker replication as failed if target cluster not ready to receive // mark delete marker replication as failed if target cluster not ready to receive
// this request yet (object version not replicated yet) // this request yet (object version not replicated yet)
if err != nil && !toi.ReplicationReady { if err != nil && !toi.ReplicationReady {
@ -593,6 +594,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
return return
} }
} }
}
rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, minio.RemoveObjectOptions{ rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, minio.RemoveObjectOptions{
VersionID: versionID, VersionID: versionID,

View File

@ -421,6 +421,7 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
return mtime1.After(mtime2) return mtime1.After(mtime2)
}) })
defPool := PoolObjInfo{Index: -1}
for _, pinfo := range poolObjInfos { for _, pinfo := range poolObjInfos {
// skip all objects from suspended pools if asked by the // skip all objects from suspended pools if asked by the
// caller. // caller.
@ -435,14 +436,13 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) { if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return pinfo, pinfo.Err return pinfo, pinfo.Err
} }
defPool = pinfo
if isErrObjectNotFound(pinfo.Err) { if isErrObjectNotFound(pinfo.Err) {
// No object exists or its a delete marker, // No object exists or its a delete marker,
// check objInfo to confirm. // check objInfo to confirm.
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" { if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
return pinfo, nil return pinfo, nil
} }
// objInfo is not valid, truly the object doesn't // objInfo is not valid, truly the object doesn't
// exist proceed to next pool. // exist proceed to next pool.
continue continue
@ -450,7 +450,12 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
return pinfo, nil return pinfo, nil
} }
if opts.ReplicationRequest && opts.DeleteMarker && defPool.Index >= 0 {
// If the request is a delete marker replication request, return a default pool
// in cases where the object does not exist.
// This is to ensure that the delete marker is replicated to the destination.
return defPool, nil
}
return PoolObjInfo{}, toObjectErr(errFileNotFound, bucket, object) return PoolObjInfo{}, toObjectErr(errFileNotFound, bucket, object)
} }