From 6a6c930f5b13f4416da85cec7e37ea52e77b8245 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 1 Feb 2017 11:16:17 -0800 Subject: [PATCH] xl: Abort multipart upload should honor quorum properly. (#3670) Current implementation didn't honor quorum properly and didn't handle the errors generated properly. This patch addresses that and also moves common code `cleanupMultipartUploads` into xl specific private function. Fixes #3665 --- cmd/erasure-createfile.go | 4 -- cmd/object-api-multipart-common.go | 40 ---------------- cmd/to_err_test.go | 31 ++++++++++++ cmd/xl-v1-bucket.go | 24 +++------- cmd/xl-v1-healing.go | 12 ++--- cmd/xl-v1-metadata.go | 14 +++--- cmd/xl-v1-multipart.go | 75 +++++++++++++++++++----------- cmd/xl-v1-object.go | 31 ++---------- cmd/xl-v1-utils.go | 17 ------- 9 files changed, 100 insertions(+), 148 deletions(-) create mode 100644 cmd/to_err_test.go diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go index d20ca8060..f95c0c18c 100644 --- a/cmd/erasure-createfile.go +++ b/cmd/erasure-createfile.go @@ -137,9 +137,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash // Wait for all the appends to finish. wg.Wait() - // Do we have write quorum?. - if !isDiskQuorum(wErrs, writeQuorum) { - return traceError(errXLWriteQuorum) - } return reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum) } diff --git a/cmd/object-api-multipart-common.go b/cmd/object-api-multipart-common.go index 448cca141..5540e6a43 100644 --- a/cmd/object-api-multipart-common.go +++ b/cmd/object-api-multipart-common.go @@ -22,7 +22,6 @@ import ( "io/ioutil" "path" "sort" - "sync" "time" "github.com/minio/minio/pkg/lock" @@ -161,45 +160,6 @@ func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) return nil } -// Wrapper which removes all the uploaded parts. -func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error { - var errs = make([]error, len(storageDisks)) - var wg = &sync.WaitGroup{} - - // Construct uploadIDPath. - uploadIDPath := path.Join(bucket, object, uploadID) - - // Cleanup uploadID for all disks. - for index, disk := range storageDisks { - if disk == nil { - errs[index] = traceError(errDiskNotFound) - continue - } - wg.Add(1) - // Cleanup each uploadID in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - err := cleanupDir(disk, minioMetaMultipartBucket, uploadIDPath) - if err != nil { - errs[index] = err - return - } - errs[index] = nil - }(index, disk) - } - - // Wait for all the cleanups to finish. - wg.Wait() - - // Return first error. - for _, err := range errs { - if err != nil { - return err - } - } - return nil -} - // listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]uploadMetadata, bool, error) { var uploads []uploadMetadata diff --git a/cmd/to_err_test.go b/cmd/to_err_test.go new file mode 100644 index 000000000..2311b3619 --- /dev/null +++ b/cmd/to_err_test.go @@ -0,0 +1,31 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "testing" + +func TestToErrIsNil(t *testing.T) { + if toObjectErr(nil) != nil { + t.Errorf("Test expected to return nil, failed instead got a non-nil value %s", toObjectErr(nil)) + } + if toStorageErr(nil) != nil { + t.Errorf("Test expected to return nil, failed instead got a non-nil value %s", toStorageErr(nil)) + } + if toAPIErrorCode(nil) != ErrNone { + t.Errorf("Test expected error code to be ErrNone, failed instead provided %d", toAPIErrorCode(nil)) + } +} diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 52e993dcf..4078b7292 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -62,18 +62,12 @@ func (xl xlObjects) MakeBucket(bucket string) error { // Wait for all make vol to finish. wg.Wait() - // Do we have write quorum?. - if !isDiskQuorum(dErrs, xl.writeQuorum) { + err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum) + if errorCause(err) == errXLWriteQuorum { // Purge successfully created buckets if we don't have writeQuorum. undoMakeBucket(xl.storageDisks, bucket) - return toObjectErr(traceError(errXLWriteQuorum), bucket) } - - // Verify we have any other errors which should undo make bucket. - if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { - return toObjectErr(reducedErr, bucket) - } - return nil + return toObjectErr(err, bucket) } func (xl xlObjects) undoDeleteBucket(bucket string) { @@ -253,15 +247,9 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Wait for all the delete vols to finish. wg.Wait() - if !isDiskQuorum(dErrs, xl.writeQuorum) { + err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum) + if errorCause(err) == errXLWriteQuorum { xl.undoDeleteBucket(bucket) - return toObjectErr(traceError(errXLWriteQuorum), bucket) } - - if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { - return toObjectErr(reducedErr, bucket) - } - - // Success. - return nil + return toObjectErr(err, bucket) } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 1cdb9ce6f..d0aaf56a7 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -109,18 +109,12 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error // Wait for all make vol to finish. wg.Wait() - // Do we have write quorum?. - if !isDiskQuorum(dErrs, writeQuorum) { + reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum) + if errorCause(reducedErr) == errXLWriteQuorum { // Purge successfully created buckets if we don't have writeQuorum. undoMakeBucket(storageDisks, bucket) - return toObjectErr(traceError(errXLWriteQuorum), bucket) } - - // Verify we have any other errors which should be returned as failure. - if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum); reducedErr != nil { - return toObjectErr(reducedErr, bucket) - } - return nil + return reducedErr } // Heals all the metadata associated for a given bucket, this function diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 05149c48e..243dab9fe 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -368,13 +368,12 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] // Wait for all the routines. wg.Wait() - // Do we have write quorum?. - if !isDiskQuorum(mErrs, quorum) { + err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum) + if errorCause(err) == errXLWriteQuorum { // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, bucket, prefix, mErrs) - return traceError(errXLWriteQuorum) } - return reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum) + return err } // writeSameXLMetadata - write `xl.json` on all disks in order. @@ -407,11 +406,10 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet // Wait for all the routines. wg.Wait() - // Do we have write Quorum?. - if !isDiskQuorum(mErrs, writeQuorum) { + err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, writeQuorum) + if errorCause(err) == errXLWriteQuorum { // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, bucket, prefix, mErrs) - return traceError(errXLWriteQuorum) } - return reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, writeQuorum) + return err } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index cbaa58f7f..51624784c 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -96,8 +96,8 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated // Wait for all the writes to finish. wg.Wait() - // Do we have write quorum? - if !isDiskQuorum(errs, xl.writeQuorum) { + err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + if errorCause(err) == errXLWriteQuorum { // No quorum. Perform cleanup on the minority of disks // on which the operation succeeded. @@ -131,7 +131,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated }(index, disk) } wg.Wait() - return traceError(errXLWriteQuorum) + return err } // we do have quorum, so in case of delete upload.json file @@ -148,11 +148,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated }(index, disk) } wg.Wait() - - if reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { - return reducedErr - } - return nil + return err } // addUploadID - add upload ID and its initiated time to 'uploads.json'. @@ -266,17 +262,12 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr // Wait for all the routines. wg.Wait() - // Do we have write Quorum?. - if !isDiskQuorum(mErrs, quorum) { + err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum) + if errorCause(err) == errXLWriteQuorum { // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs) - return traceError(errXLWriteQuorum) } - - if reducedErr := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum); reducedErr != nil { - return reducedErr - } - return nil + return err } // listMultipartUploads - lists all multipart uploads. @@ -600,9 +591,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath) - if !isDiskQuorum(errs, xl.writeQuorum) { + reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + if errorCause(reducedErr) == errXLWriteQuorum { preUploadIDLock.RUnlock() - return PartInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object) + return PartInfo{}, toObjectErr(reducedErr, bucket, object) } preUploadIDLock.RUnlock() @@ -720,8 +712,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Read metadata again because it might be updated with parallel upload of another part. partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaMultipartBucket, uploadIDPath) - if !isDiskQuorum(errs, xl.writeQuorum) { - return PartInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object) + reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + if errorCause(reducedErr) == errXLWriteQuorum { + return PartInfo{}, toObjectErr(reducedErr, bucket, object) } // Get current highest version based on re-read partsMetadata. @@ -902,9 +895,9 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Read metadata associated with the object from all disks. partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath) - // Do we have writeQuorum?. - if !isDiskQuorum(errs, xl.writeQuorum) { - return ObjectInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object) + reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + if errorCause(reducedErr) == errXLWriteQuorum { + return ObjectInfo{}, toObjectErr(reducedErr, bucket, object) } onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs) @@ -1077,13 +1070,44 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return objInfo, nil } +// Wrapper which removes all the uploaded parts. +func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error { + var errs = make([]error, len(xl.storageDisks)) + var wg = &sync.WaitGroup{} + + // Construct uploadIDPath. + uploadIDPath := path.Join(bucket, object, uploadID) + + // Cleanup uploadID for all disks. + for index, disk := range xl.storageDisks { + if disk == nil { + errs[index] = traceError(errDiskNotFound) + continue + } + wg.Add(1) + // Cleanup each uploadID in a routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + err := cleanupDir(disk, minioMetaMultipartBucket, uploadIDPath) + if err != nil { + errs[index] = err + } + }(index, disk) + } + + // Wait for all the cleanups to finish. + wg.Wait() + + return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) +} + // abortMultipartUpload - wrapper for purging an ongoing multipart // transaction, deletes uploadID entry from `uploads.json` and purges // the directory at '.minio.sys/multipart/bucket/object/uploadID' holding // all the upload parts. func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) { // Cleanup all uploaded parts. - if err = cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil { + if err = xl.cleanupUploadedParts(bucket, object, uploadID); err != nil { return toObjectErr(err, bucket, object) } @@ -1129,6 +1153,5 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error if !xl.isUploadIDExists(bucket, object, uploadID) { return traceError(InvalidUploadID{UploadID: uploadID}) } - err := xl.abortMultipartUpload(bucket, object, uploadID) - return err + return xl.abortMultipartUpload(bucket, object, uploadID) } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index d25c41d2e..134af0a43 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -34,12 +34,7 @@ import ( ) // list all errors which can be ignored in object operations. -var objectOpIgnoredErrs = []error{ - errDiskNotFound, - errDiskAccessDenied, - errFaultyDisk, - errFaultyRemoteDisk, -} +var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied) /// Object Operations @@ -49,11 +44,6 @@ var objectOpIgnoredErrs = []error{ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (ObjectInfo, error) { // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, srcBucket, srcObject) - // Do we have read quorum? - if !isDiskQuorum(errs, xl.readQuorum) { - return ObjectInfo{}, traceError(InsufficientReadQuorum{}, errs...) - } - if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil { return ObjectInfo{}, toObjectErr(reducedErr, srcBucket, srcObject) } @@ -159,11 +149,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) - // Do we have read quorum? - if !isDiskQuorum(errs, xl.readQuorum) { - return traceError(InsufficientReadQuorum{}, errs...) - } - if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil { return toObjectErr(reducedErr, bucket, object) } @@ -416,12 +401,12 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum // otherwise return failure. Cleanup successful renames. - if !isDiskQuorum(errs, quorum) { + err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum) + if errorCause(err) == errXLWriteQuorum { // Undo all the partial rename operations. undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs) - return traceError(errXLWriteQuorum) } - return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum) + return err } // renamePart - renames a part of the source object to the destination @@ -751,13 +736,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Wait for all routines to finish. wg.Wait() - // Do we have write quorum? - if !isDiskQuorum(dErrs, xl.writeQuorum) { - // Return errXLWriteQuorum if errors were more than allowed write quorum. - return traceError(errXLWriteQuorum) - } - - return nil + return reduceWriteQuorumErrs(dErrs, objectOpIgnoredErrs, xl.writeQuorum) } // DeleteObject - deletes an object, this call doesn't necessary reply diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index 4a40fdae7..ecedbad1e 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -83,23 +83,6 @@ func reduceWriteQuorumErrs(errs []error, ignoredErrs []error, writeQuorum int) ( return reduceQuorumErrs(errs, ignoredErrs, writeQuorum, errXLWriteQuorum) } -// List of all errors which are ignored while verifying quorum. -var quorumIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied) - -// Validates if we have quorum based on the errors related to disk only. -// Returns 'true' if we have quorum, 'false' if we don't. -func isDiskQuorum(errs []error, minQuorumCount int) bool { - var count int - errs = errorsCause(errs) - for _, err := range errs { - // Check if the error can be ignored for quorum verification. - if !isErrIgnored(err, quorumIgnoredErrs...) { - count++ - } - } - return count >= minQuorumCount -} - // Similar to 'len(slice)' but returns the actual elements count // skipping the unallocated elements. func diskCount(disks []StorageAPI) int {