fix: bug in passing Versioned field set for getHealReplicationInfo() (#17498)

Bonus: rejects prefix deletes on object-locked buckets earlier
This commit is contained in:
Harshavardhana 2023-06-27 09:45:50 -07:00 committed by GitHub
parent d3e5e607a7
commit 1818764840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 37 deletions

View File

@ -83,12 +83,6 @@ func enforceRetentionForDeletion(ctx context.Context, objInfo ObjectInfo) (locke
// governance bypass headers are set and user has governance bypass permissions.
// Objects in "Compliance" mode can be overwritten only if retention date is past.
func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, oi ObjectInfo, gerr error) APIErrorCode {
opts, err := getOpts(ctx, r, bucket, object.ObjectName)
if err != nil {
return toAPIErrorCode(ctx, err)
}
opts.VersionID = object.VersionID
if gerr != nil { // error from GetObjectInfo
if _, ok := gerr.(MethodNotAllowed); ok {
// This happens usually for a delete marker

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -512,7 +512,10 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl
ObjectName: oi.Name,
VersionID: oi.VersionID,
},
}, oi, ObjectOptions{VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name)}, nil)
}, oi, ObjectOptions{
Versioned: globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name),
VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name),
}, nil)
} else {
dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(ObjectInfo{
UserDefined: oi.UserDefined,

View File

@ -2298,6 +2298,14 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
return
}
replica := r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String()
if replica {
if s3Error := checkRequestAuthType(ctx, r, policy.ReplicateDeleteAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
}
if globalDNSConfig != nil {
_, err := globalDNSConfig.Get(bucket)
if err != nil && err != dns.ErrNotImplemented {
@ -2311,10 +2319,12 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
var (
goi ObjectInfo
gerr error
)
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
if rcfg.LockEnabled && opts.DeletePrefix {
writeErrorResponse(ctx, w, toAPIError(ctx, errors.New("force-delete is forbidden in a locked-enabled bucket")), r.URL)
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
@ -2327,7 +2337,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
// the null version's remote object to delete if
// transitioned.
goiOpts := os.GetOpts()
goi, gerr = getObjectInfo(ctx, bucket, object, goiOpts)
goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts)
if gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
@ -2338,17 +2348,12 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
VersionID: opts.VersionID,
},
}, goi, opts, gerr)
vID := opts.VersionID
if dsc.ReplicateAny() {
opts.SetDeleteReplicationState(dsc, opts.VersionID)
}
vID := opts.VersionID
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
// check if replica has permission to be deleted.
if apiErrCode := checkRequestAuthType(ctx, r, policy.ReplicateDeleteAction, bucket, object); apiErrCode != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErrCode), r.URL)
return
}
if replica {
opts.SetReplicaStatus(replication.Replica)
if opts.VersionPurgeStatus().Empty() {
// opts.VersionID holds delete marker version ID to replicate and not yet present on disk
@ -2357,23 +2362,17 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
}
apiErr := ErrNone
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled {
if opts.DeletePrefix {
writeErrorResponse(ctx, w, toAPIError(ctx, errors.New("force-delete is forbidden in a locked-enabled bucket")), r.URL)
if vID != "" {
apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: object,
VersionID: vID,
},
}, goi, gerr)
if apiErr != ErrNone && apiErr != ErrNoSuchKey {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErr), r.URL)
return
}
if vID != "" {
apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{
ObjectV: ObjectV{
ObjectName: object,
VersionID: vID,
},
}, goi, gerr)
if apiErr != ErrNone && apiErr != ErrNoSuchKey {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErr), r.URL)
return
}
}
}
if apiErr == ErrNoSuchKey {

View File

@ -84,7 +84,7 @@ remote_arn=$(./mc replicate ls sitea/bucket --json | jq -r .rule.Destination.Buc
sleep 1
./mc replicate resync start sitea/bucket/ --remote-bucket "${remote_arn}"
sleep 10s ## sleep for 10s idea is that we give 100ms per object.
sleep 20s ## sleep for 20s; the idea is that we allow 200ms per object.
count=$(./mc replicate resync status sitea/bucket --remote-bucket "${remote_arn}" --json | jq .resyncInfo.target[].replicationCount)