fix: DeleteObject() API with versionId under replication (#16325)

Harshavardhana 2022-12-28 22:48:33 -08:00 committed by GitHub
parent aa56c6d51d
commit 2937711390
6 changed files with 141 additions and 10 deletions
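For orientation before the diffs: the bug is user-visible through an ordinary versioned delete. Below is a minimal client-side sketch (not part of this commit) using the minio-go SDK; the endpoint, credentials, bucket, and version id are placeholders, and it assumes a versioned source bucket with a replication rule, as the test script at the end of this commit sets up.

```go
package main

import (
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Placeholder endpoint and credentials.
	client, err := minio.New("localhost:9001", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	// A delete with an explicit versionId permanently removes that version.
	// Before this fix, issuing it against a replicated bucket could leave a
	// spurious delete marker behind instead of cleanly replicating the removal.
	err = client.RemoveObject(context.Background(), "testbucket", "dir/file",
		minio.RemoveObjectOptions{VersionID: "some-version-id"})
	if err != nil {
		log.Fatal(err)
	}
}
```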

View File

@@ -68,6 +68,7 @@ test-replication: install ## verify multi site replication
 	@echo "Running tests for replicating three sites"
 	@(env bash $(PWD)/docs/bucket/replication/setup_3site_replication.sh)
 	@(env bash $(PWD)/docs/bucket/replication/setup_2site_existing_replication.sh)
+	@(env bash $(PWD)/docs/bucket/replication/delete-replication.sh)
 test-site-replication-ldap: install ## verify automatic site replication
 	@echo "Running tests for automatic site replication of IAM (with LDAP)"

View File

@@ -309,7 +309,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete
 	for _, tgtArn := range tgtArns {
 		opts.TargetArn = tgtArn
 		replicate = rcfg.Replicate(opts)
-		// when incoming delete is removal of a delete marker( a.k.a versioned delete),
+		// when incoming delete is removal of a delete marker(a.k.a versioned delete),
 		// GetObjectInfo returns extra information even though it returns errFileNotFound
 		if gerr != nil {
 			validReplStatus := false
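The comment adjusted above records a subtlety the rest of the fix leans on: for removal of a delete marker (a versioned delete), the object lookup fails with errFileNotFound yet still returns usable replication state. A runnable sketch of that error-plus-value pattern, with stub types standing in for MinIO's internals:

```go
package main

import (
	"errors"
	"fmt"
)

var errFileNotFound = errors.New("file not found")

// objectInfo is a stub; the real ObjectInfo lives in cmd/.
type objectInfo struct {
	DeleteMarker      bool
	ReplicationStatus string
}

// getObjectInfo mimics the behavior the comment describes: the lookup
// fails, but the returned value still carries replication metadata.
func getObjectInfo() (objectInfo, error) {
	return objectInfo{DeleteMarker: true, ReplicationStatus: "COMPLETED"}, errFileNotFound
}

func main() {
	goi, gerr := getObjectInfo()
	if gerr != nil {
		// The error alone decides nothing; the replication state on
		// the returned value does.
		fmt.Println(goi.DeleteMarker, goi.ReplicationStatus)
	}
}
```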

View File

@@ -147,9 +147,9 @@ func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInfo
 	// Add replication status to the object info
 	objInfo.ReplicationStatusInternal = fi.ReplicationState.ReplicationStatusInternal
 	objInfo.VersionPurgeStatusInternal = fi.ReplicationState.VersionPurgeStatusInternal
-	objInfo.ReplicationStatus = fi.ReplicationState.CompositeReplicationStatus()
-	objInfo.VersionPurgeStatus = fi.ReplicationState.CompositeVersionPurgeStatus()
+	objInfo.ReplicationStatus = fi.ReplicationStatus()
+	objInfo.VersionPurgeStatus = fi.VersionPurgeStatus()
 	objInfo.TransitionedObject = TransitionedObject{
 		Name:      fi.TransitionedObjName,
 		VersionID: fi.TransitionVersionID,
@@ -176,7 +176,6 @@ func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInfo
 		objInfo.StorageClass = globalMinioDefaultStorageClass
 	}
-	objInfo.VersionPurgeStatus = fi.VersionPurgeStatus()
 	// set restore status for transitioned object
 	restoreHdr, ok := fi.Metadata[xhttp.AmzRestore]
 	if ok {
@@ -534,6 +533,11 @@ func (fi *FileInfo) VersionPurgeStatus() VersionPurgeStatusType {
 	return fi.ReplicationState.CompositeVersionPurgeStatus()
 }
+
+// ReplicationStatus returns overall version replication status for this object version across targets
+func (fi *FileInfo) ReplicationStatus() replication.StatusType {
+	return fi.ReplicationState.CompositeReplicationStatus()
+}
 // DeleteMarkerReplicationStatus returns overall replication status for this delete marker version across targets
 func (fi *FileInfo) DeleteMarkerReplicationStatus() replication.StatusType {
 	if fi.Deleted {
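The new ReplicationStatus() helper above delegates to CompositeReplicationStatus(), which folds per-target statuses into one value. A minimal sketch of that idea, assuming a simplified map of statuses keyed by target ARN rather than MinIO's actual ReplicationState internals:

```go
package main

import "fmt"

// StatusType mirrors the handful of replication states seen in the diff;
// the real type lives in internal/bucket/replication.
type StatusType string

const (
	Pending   StatusType = "PENDING"
	Completed StatusType = "COMPLETED"
	Failed    StatusType = "FAILED"
	Replica   StatusType = "REPLICA"
)

// compositeStatus is an illustrative fold over per-target statuses: any
// failure wins, then any pending, otherwise completed. MinIO's
// CompositeReplicationStatus() is more involved; this only shows the
// shape of the computation.
func compositeStatus(statusByTarget map[string]StatusType) StatusType {
	if len(statusByTarget) == 0 {
		return ""
	}
	composite := Completed
	for _, st := range statusByTarget {
		switch st {
		case Failed:
			return Failed
		case Pending:
			composite = Pending
		}
	}
	return composite
}

func main() {
	fmt.Println(compositeStatus(map[string]StatusType{
		"arn:minio:replication::target1": Completed,
		"arn:minio:replication::target2": Pending,
	})) // PENDING
}
```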

View File

@@ -1604,11 +1604,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 	markDelete := goi.VersionID != ""
 	// Default deleteMarker to true if object is under versioning
-	// versioning suspended means we add `null` version as
-	// delete marker, if its not decided already.
-	deleteMarker := (opts.Versioned || opts.VersionSuspended) && opts.VersionID == "" || (opts.DeleteMarker && opts.VersionID != "")
+	deleteMarker := opts.Versioned
-	if markDelete && opts.VersionID != "" {
+	if opts.VersionID != "" {
 		// case where replica version needs to be deleted on target cluster
 		if versionFound && opts.DeleteMarkerReplicationStatus() == replication.Replica {
 			markDelete = false
@@ -1619,6 +1617,22 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		if opts.VersionPurgeStatus() == Complete {
 			markDelete = false
 		}
+		// Now that VersionPurgeStatus() is already set, we can let the
+		// lower layers decide this. This fixes a regression introduced
+		// in PR #14555, where !VersionPurgeStatus.Empty() was
+		// automatically treated as delete marker true so that regular
+		// ListObjects() calls would skip such objects. For delete
+		// replication, however, this is a problem: upon a successful
+		// delete it ends up creating a spurious, unnecessary new
+		// delete marker.
+		//
+		// The regression introduced by #14555 was reintroduced in #15564.
+		if versionFound {
+			if !goi.VersionPurgeStatus.Empty() {
+				deleteMarker = false
+			} else if !goi.DeleteMarker { // implies a versioned delete of an object
+				deleteMarker = false
+			}
+		}
 	}

 	modTime := opts.MTime
@@ -1628,9 +1642,14 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 	fvID := mustGetUUID()
 	if markDelete && (opts.Versioned || opts.VersionSuspended) {
+		if !deleteMarker {
+			// versioning suspended means we add a `null` version as
+			// the delete marker, if it's not decided already.
+			deleteMarker = opts.VersionSuspended && opts.VersionID == ""
+		}
 		fi := FileInfo{
 			Name:             object,
-			Deleted:          true,
+			Deleted:          deleteMarker,
 			MarkDeleted:      markDelete,
 			ModTime:          modTime,
 			ReplicationState: opts.DeleteReplication,
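Pulling the two DeleteObject hunks together, here is a runnable condensation of the deleteMarker decision with stub types (illustrative only; it omits the markDelete gating and everything else the real function does): an explicit versionId never synthesizes a new delete marker, a plain delete on a versioned bucket still writes one, and suspended versioning falls back to a `null` delete marker.

```go
package main

import "fmt"

// purgeStatus stands in for VersionPurgeStatusType from the diff.
type purgeStatus string

func (p purgeStatus) Empty() bool { return p == "" }

type opts struct {
	Versioned        bool
	VersionSuspended bool
	VersionID        string
}

type objInfo struct {
	DeleteMarker       bool
	VersionPurgeStatus purgeStatus
}

// writeDeleteMarker condenses the decision from both hunks: the first
// suppresses the marker for versioned deletes of purge targets and of
// plain object versions; the second falls back to a `null` delete
// marker only when versioning is suspended and no versionId was given.
func writeDeleteMarker(o opts, versionFound bool, goi objInfo) bool {
	deleteMarker := o.Versioned
	if o.VersionID != "" && versionFound {
		if !goi.VersionPurgeStatus.Empty() || !goi.DeleteMarker {
			deleteMarker = false
		}
	}
	if !deleteMarker {
		deleteMarker = o.VersionSuspended && o.VersionID == ""
	}
	return deleteMarker
}

func main() {
	// versioned delete of a concrete object version: no new delete marker
	fmt.Println(writeDeleteMarker(opts{Versioned: true, VersionID: "some-id"}, true, objInfo{}))
	// plain delete on a versioned bucket: a delete marker is written
	fmt.Println(writeDeleteMarker(opts{Versioned: true}, true, objInfo{}))
	// plain delete under suspended versioning: a `null` delete marker
	fmt.Println(writeDeleteMarker(opts{VersionSuspended: true}, true, objInfo{}))
}
```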

View File: docs/bucket/replication/delete-replication.sh (new file)

@@ -0,0 +1,107 @@
#!/usr/bin/env bash
if [ -n "$TEST_DEBUG" ]; then
set -x
fi
trap 'catch $LINENO' ERR
# shellcheck disable=SC2120
catch() {
if [ $# -ne 0 ]; then
echo "error on line $1"
echo "dc1 server logs ========="
cat /tmp/dc1.log
echo "dc2 server logs ========="
cat /tmp/dc2.log
fi
echo "Cleaning up instances of MinIO"
set +e
pkill minio
pkill mc
rm -rf /tmp/xl/
}
catch
set -e
export MINIO_CI_CD=1
export MINIO_BROWSER=off
export MINIO_ROOT_USER="minioadmin"
export MINIO_ROOT_PASSWORD="minioadmin"
export MINIO_KMS_AUTO_ENCRYPTION=off
export MINIO_PROMETHEUS_AUTH_TYPE=public
export MINIO_KMS_SECRET_KEY="my-minio-key:OSMM+vkKUTCvQs9YL/CVMIMt43HFhkUpqJxTmGl6rYw="
unset MINIO_KMS_KES_CERT_FILE
unset MINIO_KMS_KES_KEY_FILE
unset MINIO_KMS_KES_ENDPOINT
unset MINIO_KMS_KES_KEY_NAME
if [ ! -f ./mc ]; then
	wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && \
		chmod +x mc
fi
mkdir -p /tmp/xl/1/ /tmp/xl/2/
./minio server --address ":9001" /tmp/xl/1/{1...4}/ 2>&1 > /tmp/dc1.log &
./minio server --address ":9002" /tmp/xl/2/{1...4}/ 2>&1 > /tmp/dc2.log &
sleep 3
export MC_HOST_myminio1=http://minioadmin:minioadmin@localhost:9001
export MC_HOST_myminio2=http://minioadmin:minioadmin@localhost:9002
./mc mb myminio1/testbucket/
./mc version enable myminio1/testbucket/
./mc mb myminio2/testbucket/
./mc version enable myminio2/testbucket/
arn=$(./mc admin bucket remote add myminio1/testbucket/ http://minioadmin:minioadmin@localhost:9002/testbucket/ --service "replication" --json | jq -r .RemoteARN)
./mc replicate add myminio1/testbucket --remote-bucket "$arn" --priority 1
# upload twice to create two versions of the same object
./mc cp README.md myminio1/testbucket/dir/file
./mc cp README.md myminio1/testbucket/dir/file
sleep 1s
echo "=== myminio1"
./mc ls --versions myminio1/testbucket/dir/file
echo "=== myminio2"
./mc ls --versions myminio2/testbucket/dir/file
versionId="$(mc ls --json --versions myminio1/testbucket/dir/ | tail -n1 | jq -r .versionId)"
aws s3api --endpoint-url http://localhost:9001 --profile minio delete-object --bucket testbucket --key dir/file --version-id "$versionId"
./mc ls -r --versions myminio1/testbucket > /tmp/myminio1.txt
./mc ls -r --versions myminio2/testbucket > /tmp/myminio2.txt
# keep diff inside the `if` so set -e and the ERR trap don't fire before we can report
if ! out=$(diff -qpruN /tmp/myminio1.txt /tmp/myminio2.txt); then
	echo "BUG: expected no missing entries after replication: $out"
	exit 1
fi
# delete the object outright (writes a delete marker) and verify the marker replicates
./mc rm myminio1/testbucket/dir/file
sleep 1s
./mc ls -r --versions myminio1/testbucket > /tmp/myminio1.txt
./mc ls -r --versions myminio2/testbucket > /tmp/myminio2.txt
if ! out=$(diff -qpruN /tmp/myminio1.txt /tmp/myminio2.txt); then
	echo "BUG: expected no missing entries after replication: $out"
	exit 1
fi
echo "Success"
catch
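As wired into the Makefile hunk at the top of this commit, the script runs as part of `make test-replication`; it can also be invoked directly as `env bash docs/bucket/replication/delete-replication.sh` from a checkout with a freshly built `./minio` binary (the `mc` client is downloaded on demand).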