relax pre-emptive GetBucketInfo() for multi-object delete (#19035)

This commit is contained in:
Harshavardhana 2024-02-12 08:46:46 -08:00 committed by GitHub
parent 4fa06aefc6
commit 6d381f7c0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 29 additions and 13 deletions

View File

@ -467,13 +467,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
// Ignore errors here to preserve the S3 error behavior of GetBucketInfo() // Ignore errors here to preserve the S3 error behavior of GetBucketInfo()
checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, "") checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, "")
// Before proceeding validate if bucket exists.
_, err := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
deleteObjectsFn := objectAPI.DeleteObjects deleteObjectsFn := objectAPI.DeleteObjects
// Return Malformed XML as S3 spec if the number of objects is empty // Return Malformed XML as S3 spec if the number of objects is empty
@ -611,6 +604,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
VersionSuspended: vc.Suspended(), VersionSuspended: vc.Suspended(),
}) })
// Are all objects saying bucket not found?
if isAllBucketsNotFound(errs) {
writeErrorResponse(ctx, w, toAPIError(ctx, errs[0]), r.URL)
return
}
for i := range errs { for i := range errs {
// DeleteMarkerVersionID is not used specifically to avoid // DeleteMarkerVersionID is not used specifically to avoid
// lookup errors, since DeleteMarkerVersionID is only // lookup errors, since DeleteMarkerVersionID is only
@ -618,7 +617,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
// specify a versionID. // specify a versionID.
objToDel := ObjectToDelete{ objToDel := ObjectToDelete{
ObjectV: ObjectV{ ObjectV: ObjectV{
ObjectName: dObjects[i].ObjectName, ObjectName: decodeDirObject(dObjects[i].ObjectName),
VersionID: dObjects[i].VersionID, VersionID: dObjects[i].VersionID,
}, },
VersionPurgeStatus: dObjects[i].VersionPurgeStatus(), VersionPurgeStatus: dObjects[i].VersionPurgeStatus(),

View File

@ -880,6 +880,15 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
expectedContent: encodedAnonResponseWithPartialPublicAccess, expectedContent: encodedAnonResponseWithPartialPublicAccess,
expectedRespStatus: http.StatusOK, expectedRespStatus: http.StatusOK,
}, },
// Test case - 7.
// Bucket does not exist.
7: {
bucket: "unknown-bucket-name",
objects: successRequest0,
accessKey: credentials.AccessKey,
secretKey: credentials.SecretKey,
expectedRespStatus: http.StatusNotFound,
},
} }
for i, testCase := range testCases { for i, testCase := range testCases {
@ -888,13 +897,12 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
// Generate a signed or anonymous request based on the testCase // Generate a signed or anonymous request based on the testCase
if testCase.accessKey != "" { if testCase.accessKey != "" {
req, err = newTestSignedRequestV4(http.MethodPost, getDeleteMultipleObjectsURL("", bucketName), req, err = newTestSignedRequestV4(http.MethodPost, getDeleteMultipleObjectsURL("", testCase.bucket),
int64(len(testCase.objects)), bytes.NewReader(testCase.objects), testCase.accessKey, testCase.secretKey, nil) int64(len(testCase.objects)), bytes.NewReader(testCase.objects), testCase.accessKey, testCase.secretKey, nil)
} else { } else {
req, err = newTestRequest(http.MethodPost, getDeleteMultipleObjectsURL("", bucketName), req, err = newTestRequest(http.MethodPost, getDeleteMultipleObjectsURL("", testCase.bucket),
int64(len(testCase.objects)), bytes.NewReader(testCase.objects)) int64(len(testCase.objects)), bytes.NewReader(testCase.objects))
} }
if err != nil { if err != nil {
t.Fatalf("Failed to create HTTP request for DeleteMultipleObjects: <ERROR> %v", err) t.Fatalf("Failed to create HTTP request for DeleteMultipleObjects: <ERROR> %v", err)
} }

View File

@ -861,8 +861,12 @@ func isAllBucketsNotFound(errs []error) bool {
} }
notFoundCount := 0 notFoundCount := 0
for _, err := range errs { for _, err := range errs {
if err != nil && errors.Is(err, errVolumeNotFound) { if err != nil {
notFoundCount++ if errors.Is(err, errVolumeNotFound) {
notFoundCount++
} else if isErrBucketNotFound(err) {
notFoundCount++
}
} }
} }
return len(errs) == notFoundCount return len(errs) == notFoundCount

View File

@ -1085,6 +1085,11 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
} }
if len(buf) == 0 { if len(buf) == 0 {
if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
return errVolumeNotFound
}
}
return errFileNotFound return errFileNotFound
} }