multi-delete: Avoid empty Delete tag in the response (#13725)

When removing an object fails, such as when it is WORM protected, a
wrong <Delete> will still be in the response. This commit fixes it.
This commit is contained in:
Anis Elleuch 2021-11-24 19:01:07 +01:00 committed by GitHub
parent fe3e47b1e8
commit 55d4cdd464
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 25 deletions

View File

@ -724,9 +724,6 @@ func generateMultiDeleteResponse(quiet bool, deletedObjects []DeletedObject, err
if !quiet {
deleteResp.DeletedObjects = deletedObjects
}
if len(errs) == len(deletedObjects) {
deleteResp.DeletedObjects = nil
}
deleteResp.Errors = errs
return deleteResp
}

View File

@ -415,16 +415,16 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
const maxBodySize = 2 * 100000 * 1024
// Unmarshal list of keys to be deleted.
deleteObjects := &DeleteObjectsRequest{}
if err := xmlDecoder(r.Body, deleteObjects, maxBodySize); err != nil {
deleteObjectsReq := &DeleteObjectsRequest{}
if err := xmlDecoder(r.Body, deleteObjectsReq, maxBodySize); err != nil {
logger.LogIf(ctx, err, logger.Application)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Convert object name delete objects if it has `/` in the beginning.
for i := range deleteObjects.Objects {
deleteObjects.Objects[i].ObjectName = trimLeadingSlash(deleteObjects.Objects[i].ObjectName)
for i := range deleteObjectsReq.Objects {
deleteObjectsReq.Objects[i].ObjectName = trimLeadingSlash(deleteObjectsReq.Objects[i].ObjectName)
}
// Call checkRequestAuthType to populate ReqInfo.AccessKey before GetBucketInfo()
@ -444,7 +444,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
// Return Malformed XML as S3 spec if the number of objects is empty
if len(deleteObjects.Objects) == 0 || len(deleteObjects.Objects) > maxDeleteList {
if len(deleteObjectsReq.Objects) == 0 || len(deleteObjectsReq.Objects) > maxDeleteList {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL)
return
}
@ -461,7 +461,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
goi ObjectInfo
gerr error
)
replicateDeletes := hasReplicationRules(ctx, bucket, deleteObjects.Objects)
replicateDeletes := hasReplicationRules(ctx, bucket, deleteObjectsReq.Objects)
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled {
hasLockEnabled = true
}
@ -469,16 +469,23 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
versioned := globalBucketVersioningSys.Enabled(bucket)
suspended := globalBucketVersioningSys.Suspended(bucket)
dErrs := make([]DeleteError, len(deleteObjects.Objects))
oss := make([]*objSweeper, len(deleteObjects.Objects))
for index, object := range deleteObjects.Objects {
type deleteResult struct {
delInfo DeletedObject
errInfo DeleteError
}
deleteResults := make([]deleteResult, len(deleteObjectsReq.Objects))
oss := make([]*objSweeper, len(deleteObjectsReq.Objects))
for index, object := range deleteObjectsReq.Objects {
if apiErrCode := checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object.ObjectName); apiErrCode != ErrNone {
if apiErrCode == ErrSignatureDoesNotMatch || apiErrCode == ErrInvalidAccessKeyID {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErrCode), r.URL)
return
}
apiErr := errorCodes.ToAPIErr(apiErrCode)
dErrs[index] = DeleteError{
deleteResults[index].errInfo = DeleteError{
Code: apiErr.Code,
Message: apiErr.Description,
Key: object.ObjectName,
@ -490,7 +497,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
if _, err := uuid.Parse(object.VersionID); err != nil {
logger.LogIf(ctx, fmt.Errorf("invalid version-id specified %w", err))
apiErr := errorCodes.ToAPIErr(ErrNoSuchVersion)
dErrs[index] = DeleteError{
deleteResults[index].errInfo = DeleteError{
Code: apiErr.Code,
Message: apiErr.Description,
Key: object.ObjectName,
@ -536,7 +543,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
if object.VersionID != "" && hasLockEnabled {
if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone {
apiErr := errorCodes.ToAPIErr(apiErrCode)
dErrs[index] = DeleteError{
deleteResults[index].errInfo = DeleteError{
Code: apiErr.Code,
Message: apiErr.Description,
Key: object.ObjectName,
@ -567,7 +574,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
Versioned: versioned,
VersionSuspended: suspended,
})
deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
for i := range errs {
// DeleteMarkerVersionID is not used specifically to avoid
// lookup errors, since DeleteMarkerVersionID is only
@ -586,11 +593,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
if replicateDeletes {
dObjects[i].ReplicationState = deleteList[i].ReplicationState()
}
deletedObjects[dindex] = dObjects[i]
deleteResults[dindex].delInfo = dObjects[i]
continue
}
apiErr := toAPIError(ctx, errs[i])
dErrs[dindex] = DeleteError{
deleteResults[dindex].errInfo = DeleteError{
Code: apiErr.Code,
Message: apiErr.Description,
Key: deleteList[i].ObjectName,
@ -598,15 +605,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
}
var deleteErrors []DeleteError
for _, dErr := range dErrs {
if dErr.Code != "" {
deleteErrors = append(deleteErrors, dErr)
// Generate response
var deleteErrors = make([]DeleteError, 0, len(deleteObjectsReq.Objects))
var deletedObjects = make([]DeletedObject, 0, len(deleteObjectsReq.Objects))
for _, deleteResult := range deleteResults {
if deleteResult.errInfo.Code != "" {
deleteErrors = append(deleteErrors, deleteResult.errInfo)
} else {
deletedObjects = append(deletedObjects, deleteResult.delInfo)
}
}
// Generate response
response := generateMultiDeleteResponse(deleteObjects.Quiet, deletedObjects, deleteErrors)
response := generateMultiDeleteResponse(deleteObjectsReq.Quiet, deletedObjects, deleteErrors)
encodedSuccessResponse := encodeResponse(response)
// Write success response.

View File

@ -20,6 +20,7 @@ package cmd
import (
"bytes"
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -656,7 +657,7 @@ func testListBucketsHandler(obj ObjectLayer, instanceType, bucketName string, ap
// Wrapper for calling DeleteMultipleObjects HTTP handler tests for both Erasure multiple disks and single node setup.
func TestAPIDeleteMultipleObjectsHandler(t *testing.T) {
ExecObjectLayerAPITest(t, testAPIDeleteMultipleObjectsHandler, []string{"DeleteMultipleObjects"})
ExecObjectLayerAPITest(t, testAPIDeleteMultipleObjectsHandler, []string{"DeleteMultipleObjects", "PutBucketPolicy"})
}
func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
@ -680,6 +681,29 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
objectNames = append(objectNames, objectName)
}
for _, name := range []string{"private/object", "public/object"} {
// Uploading the object with retention enabled
_, err = obj.PutObject(GlobalContext, bucketName, name, mustGetPutObjReader(t, bytes.NewReader(contentBytes), int64(len(contentBytes)), "", sha256sum), ObjectOptions{})
// if object upload fails stop the test.
if err != nil {
t.Fatalf("Put Object %s: Error uploading object: <ERROR> %v", name, err)
}
}
// The following block will create a bucket policy with delete object to 'public/*'. This is
// to test a mixed response of a successful & failure while deleting objects in a single request
policyBytes := []byte(fmt.Sprintf(`{"Id": "Policy1637752602639", "Version": "2012-10-17", "Statement": [{"Sid": "Stmt1637752600730", "Action": "s3:DeleteObject", "Effect": "Allow", "Resource": "arn:aws:s3:::%s/public/*", "Principal": "*"}]}`, bucketName))
rec := httptest.NewRecorder()
req, err := newTestSignedRequestV4(http.MethodPut, getPutPolicyURL("", bucketName), int64(len(policyBytes)), bytes.NewReader(policyBytes),
credentials.AccessKey, credentials.SecretKey, nil)
if err != nil {
t.Fatalf("Failed to create HTTP request for PutBucketPolicyHandler: <ERROR> %v", err)
}
apiRouter.ServeHTTP(rec, req)
if rec.Code != http.StatusNoContent {
t.Errorf("Expected the response status to be `%d`, but instead found `%d`", 200, rec.Code)
}
getObjectToDeleteList := func(objectNames []string) (objectList []ObjectToDelete) {
for _, objectName := range objectNames {
objectList = append(objectList, ObjectToDelete{
@ -705,6 +729,7 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
requestList := []DeleteObjectsRequest{
{Quiet: false, Objects: getObjectToDeleteList(objectNames[:5])},
{Quiet: true, Objects: getObjectToDeleteList(objectNames[5:])},
{Quiet: false, Objects: []ObjectToDelete{{ObjectName: "private/object"}, {ObjectName: "public/object"}}},
}
// generate multi objects delete response.
@ -741,6 +766,20 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
anonResponse := generateMultiDeleteResponse(requestList[0].Quiet, nil, getDeleteErrorList(requestList[0].Objects))
encodedAnonResponse := encodeResponse(anonResponse)
anonRequestWithPartialPublicAccess := encodeResponse(requestList[2])
anonResponseWithPartialPublicAccess := generateMultiDeleteResponse(requestList[2].Quiet,
[]DeletedObject{
{ObjectName: "public/object"},
},
[]DeleteError{
{
Code: errorCodes[ErrAccessDenied].Code,
Message: errorCodes[ErrAccessDenied].Description,
Key: "private/object",
},
})
encodedAnonResponseWithPartialPublicAccess := encodeResponse(anonResponseWithPartialPublicAccess)
testCases := []struct {
bucket string
objects []byte
@ -800,6 +839,17 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
expectedContent: encodedAnonResponse,
expectedRespStatus: http.StatusOK,
},
// Test case - 6.
// Anonymous user has access to some public folder, issue removing with
// another private object as well
{
bucket: bucketName,
objects: anonRequestWithPartialPublicAccess,
accessKey: "",
secretKey: "",
expectedContent: encodedAnonResponseWithPartialPublicAccess,
expectedRespStatus: http.StatusOK,
},
}
for i, testCase := range testCases {