mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
xl: Abort multipart upload should honor quorum properly. (#3670)
Current implementation didn't honor quorum properly and didn't handle the errors generated properly. This patch addresses that and also moves common code `cleanupMultipartUploads` into xl specific private function. Fixes #3665
This commit is contained in:
parent
35ca3e5d9b
commit
6a6c930f5b
@ -137,9 +137,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
|
||||
// Wait for all the appends to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?.
|
||||
if !isDiskQuorum(wErrs, writeQuorum) {
|
||||
return traceError(errXLWriteQuorum)
|
||||
}
|
||||
return reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum)
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/pkg/lock"
|
||||
@ -161,45 +160,6 @@ func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wrapper which removes all the uploaded parts.
|
||||
func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error {
|
||||
var errs = make([]error, len(storageDisks))
|
||||
var wg = &sync.WaitGroup{}
|
||||
|
||||
// Construct uploadIDPath.
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
|
||||
// Cleanup uploadID for all disks.
|
||||
for index, disk := range storageDisks {
|
||||
if disk == nil {
|
||||
errs[index] = traceError(errDiskNotFound)
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
// Cleanup each uploadID in a routine.
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
err := cleanupDir(disk, minioMetaMultipartBucket, uploadIDPath)
|
||||
if err != nil {
|
||||
errs[index] = err
|
||||
return
|
||||
}
|
||||
errs[index] = nil
|
||||
}(index, disk)
|
||||
}
|
||||
|
||||
// Wait for all the cleanups to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Return first error.
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'.
|
||||
func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]uploadMetadata, bool, error) {
|
||||
var uploads []uploadMetadata
|
||||
|
31
cmd/to_err_test.go
Normal file
31
cmd/to_err_test.go
Normal file
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestToErrIsNil(t *testing.T) {
|
||||
if toObjectErr(nil) != nil {
|
||||
t.Errorf("Test expected to return nil, failed instead got a non-nil value %s", toObjectErr(nil))
|
||||
}
|
||||
if toStorageErr(nil) != nil {
|
||||
t.Errorf("Test expected to return nil, failed instead got a non-nil value %s", toStorageErr(nil))
|
||||
}
|
||||
if toAPIErrorCode(nil) != ErrNone {
|
||||
t.Errorf("Test expected error code to be ErrNone, failed instead provided %d", toAPIErrorCode(nil))
|
||||
}
|
||||
}
|
@ -62,18 +62,12 @@ func (xl xlObjects) MakeBucket(bucket string) error {
|
||||
// Wait for all make vol to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?.
|
||||
if !isDiskQuorum(dErrs, xl.writeQuorum) {
|
||||
err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
// Purge successfully created buckets if we don't have writeQuorum.
|
||||
undoMakeBucket(xl.storageDisks, bucket)
|
||||
return toObjectErr(traceError(errXLWriteQuorum), bucket)
|
||||
}
|
||||
|
||||
// Verify we have any other errors which should undo make bucket.
|
||||
if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum); reducedErr != nil {
|
||||
return toObjectErr(reducedErr, bucket)
|
||||
}
|
||||
return nil
|
||||
return toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
func (xl xlObjects) undoDeleteBucket(bucket string) {
|
||||
@ -253,15 +247,9 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
|
||||
// Wait for all the delete vols to finish.
|
||||
wg.Wait()
|
||||
|
||||
if !isDiskQuorum(dErrs, xl.writeQuorum) {
|
||||
err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
xl.undoDeleteBucket(bucket)
|
||||
return toObjectErr(traceError(errXLWriteQuorum), bucket)
|
||||
}
|
||||
|
||||
if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum); reducedErr != nil {
|
||||
return toObjectErr(reducedErr, bucket)
|
||||
}
|
||||
|
||||
// Success.
|
||||
return nil
|
||||
return toObjectErr(err, bucket)
|
||||
}
|
||||
|
@ -109,18 +109,12 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error
|
||||
// Wait for all make vol to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?.
|
||||
if !isDiskQuorum(dErrs, writeQuorum) {
|
||||
reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum)
|
||||
if errorCause(reducedErr) == errXLWriteQuorum {
|
||||
// Purge successfully created buckets if we don't have writeQuorum.
|
||||
undoMakeBucket(storageDisks, bucket)
|
||||
return toObjectErr(traceError(errXLWriteQuorum), bucket)
|
||||
}
|
||||
|
||||
// Verify we have any other errors which should be returned as failure.
|
||||
if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum); reducedErr != nil {
|
||||
return toObjectErr(reducedErr, bucket)
|
||||
}
|
||||
return nil
|
||||
return reducedErr
|
||||
}
|
||||
|
||||
// Heals all the metadata associated for a given bucket, this function
|
||||
|
@ -368,13 +368,12 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?.
|
||||
if !isDiskQuorum(mErrs, quorum) {
|
||||
err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
// Delete all `xl.json` successfully renamed.
|
||||
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
|
||||
return traceError(errXLWriteQuorum)
|
||||
}
|
||||
return reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum)
|
||||
return err
|
||||
}
|
||||
|
||||
// writeSameXLMetadata - write `xl.json` on all disks in order.
|
||||
@ -407,11 +406,10 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write Quorum?.
|
||||
if !isDiskQuorum(mErrs, writeQuorum) {
|
||||
err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, writeQuorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
// Delete all `xl.json` successfully renamed.
|
||||
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
|
||||
return traceError(errXLWriteQuorum)
|
||||
}
|
||||
return reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, writeQuorum)
|
||||
return err
|
||||
}
|
||||
|
@ -96,8 +96,8 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
|
||||
// Wait for all the writes to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?
|
||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
||||
err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
// No quorum. Perform cleanup on the minority of disks
|
||||
// on which the operation succeeded.
|
||||
|
||||
@ -131,7 +131,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
return traceError(errXLWriteQuorum)
|
||||
return err
|
||||
}
|
||||
|
||||
// we do have quorum, so in case of delete upload.json file
|
||||
@ -148,11 +148,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
|
||||
}(index, disk)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum); reducedErr != nil {
|
||||
return reducedErr
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// addUploadID - add upload ID and its initiated time to 'uploads.json'.
|
||||
@ -266,17 +262,12 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
|
||||
// Wait for all the routines.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write Quorum?.
|
||||
if !isDiskQuorum(mErrs, quorum) {
|
||||
err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
// Delete all `xl.json` successfully renamed.
|
||||
deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs)
|
||||
return traceError(errXLWriteQuorum)
|
||||
}
|
||||
|
||||
if reducedErr := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum); reducedErr != nil {
|
||||
return reducedErr
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// listMultipartUploads - lists all multipart uploads.
|
||||
@ -600,9 +591,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket,
|
||||
uploadIDPath)
|
||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
||||
reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
|
||||
if errorCause(reducedErr) == errXLWriteQuorum {
|
||||
preUploadIDLock.RUnlock()
|
||||
return PartInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object)
|
||||
return PartInfo{}, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
preUploadIDLock.RUnlock()
|
||||
|
||||
@ -720,8 +712,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
||||
|
||||
// Read metadata again because it might be updated with parallel upload of another part.
|
||||
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaMultipartBucket, uploadIDPath)
|
||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
||||
return PartInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object)
|
||||
reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
|
||||
if errorCause(reducedErr) == errXLWriteQuorum {
|
||||
return PartInfo{}, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
||||
// Get current highest version based on re-read partsMetadata.
|
||||
@ -902,9 +895,9 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
|
||||
// Do we have writeQuorum?.
|
||||
if !isDiskQuorum(errs, xl.writeQuorum) {
|
||||
return ObjectInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object)
|
||||
reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
|
||||
if errorCause(reducedErr) == errXLWriteQuorum {
|
||||
return ObjectInfo{}, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
||||
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
|
||||
@ -1077,13 +1070,44 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
|
||||
return objInfo, nil
|
||||
}
|
||||
|
||||
// Wrapper which removes all the uploaded parts.
|
||||
func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error {
|
||||
var errs = make([]error, len(xl.storageDisks))
|
||||
var wg = &sync.WaitGroup{}
|
||||
|
||||
// Construct uploadIDPath.
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
|
||||
// Cleanup uploadID for all disks.
|
||||
for index, disk := range xl.storageDisks {
|
||||
if disk == nil {
|
||||
errs[index] = traceError(errDiskNotFound)
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
// Cleanup each uploadID in a routine.
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
err := cleanupDir(disk, minioMetaMultipartBucket, uploadIDPath)
|
||||
if err != nil {
|
||||
errs[index] = err
|
||||
}
|
||||
}(index, disk)
|
||||
}
|
||||
|
||||
// Wait for all the cleanups to finish.
|
||||
wg.Wait()
|
||||
|
||||
return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
|
||||
}
|
||||
|
||||
// abortMultipartUpload - wrapper for purging an ongoing multipart
|
||||
// transaction, deletes uploadID entry from `uploads.json` and purges
|
||||
// the directory at '.minio.sys/multipart/bucket/object/uploadID' holding
|
||||
// all the upload parts.
|
||||
func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) {
|
||||
// Cleanup all uploaded parts.
|
||||
if err = cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil {
|
||||
if err = xl.cleanupUploadedParts(bucket, object, uploadID); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
@ -1129,6 +1153,5 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
|
||||
if !xl.isUploadIDExists(bucket, object, uploadID) {
|
||||
return traceError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
err := xl.abortMultipartUpload(bucket, object, uploadID)
|
||||
return err
|
||||
return xl.abortMultipartUpload(bucket, object, uploadID)
|
||||
}
|
||||
|
@ -34,12 +34,7 @@ import (
|
||||
)
|
||||
|
||||
// list all errors which can be ignored in object operations.
|
||||
var objectOpIgnoredErrs = []error{
|
||||
errDiskNotFound,
|
||||
errDiskAccessDenied,
|
||||
errFaultyDisk,
|
||||
errFaultyRemoteDisk,
|
||||
}
|
||||
var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
|
||||
|
||||
/// Object Operations
|
||||
|
||||
@ -49,11 +44,6 @@ var objectOpIgnoredErrs = []error{
|
||||
func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (ObjectInfo, error) {
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllXLMetadata(xl.storageDisks, srcBucket, srcObject)
|
||||
// Do we have read quorum?
|
||||
if !isDiskQuorum(errs, xl.readQuorum) {
|
||||
return ObjectInfo{}, traceError(InsufficientReadQuorum{}, errs...)
|
||||
}
|
||||
|
||||
if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil {
|
||||
return ObjectInfo{}, toObjectErr(reducedErr, srcBucket, srcObject)
|
||||
}
|
||||
@ -159,11 +149,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
||||
// Do we have read quorum?
|
||||
if !isDiskQuorum(errs, xl.readQuorum) {
|
||||
return traceError(InsufficientReadQuorum{}, errs...)
|
||||
}
|
||||
|
||||
if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil {
|
||||
return toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
@ -416,12 +401,12 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
|
||||
|
||||
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
if !isDiskQuorum(errs, quorum) {
|
||||
err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum)
|
||||
if errorCause(err) == errXLWriteQuorum {
|
||||
// Undo all the partial rename operations.
|
||||
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs)
|
||||
return traceError(errXLWriteQuorum)
|
||||
}
|
||||
return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum)
|
||||
return err
|
||||
}
|
||||
|
||||
// renamePart - renames a part of the source object to the destination
|
||||
@ -751,13 +736,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
|
||||
// Wait for all routines to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?
|
||||
if !isDiskQuorum(dErrs, xl.writeQuorum) {
|
||||
// Return errXLWriteQuorum if errors were more than allowed write quorum.
|
||||
return traceError(errXLWriteQuorum)
|
||||
}
|
||||
|
||||
return nil
|
||||
return reduceWriteQuorumErrs(dErrs, objectOpIgnoredErrs, xl.writeQuorum)
|
||||
}
|
||||
|
||||
// DeleteObject - deletes an object, this call doesn't necessary reply
|
||||
|
@ -83,23 +83,6 @@ func reduceWriteQuorumErrs(errs []error, ignoredErrs []error, writeQuorum int) (
|
||||
return reduceQuorumErrs(errs, ignoredErrs, writeQuorum, errXLWriteQuorum)
|
||||
}
|
||||
|
||||
// List of all errors which are ignored while verifying quorum.
|
||||
var quorumIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
|
||||
|
||||
// Validates if we have quorum based on the errors related to disk only.
|
||||
// Returns 'true' if we have quorum, 'false' if we don't.
|
||||
func isDiskQuorum(errs []error, minQuorumCount int) bool {
|
||||
var count int
|
||||
errs = errorsCause(errs)
|
||||
for _, err := range errs {
|
||||
// Check if the error can be ignored for quorum verification.
|
||||
if !isErrIgnored(err, quorumIgnoredErrs...) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count >= minQuorumCount
|
||||
}
|
||||
|
||||
// Similar to 'len(slice)' but returns the actual elements count
|
||||
// skipping the unallocated elements.
|
||||
func diskCount(disks []StorageAPI) int {
|
||||
|
Loading…
x
Reference in New Issue
Block a user