diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index 1308799ea..38d0659a2 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -30,6 +30,10 @@ func (xl xlObjects) MakeBucket(bucket string) error { if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } + // Verify if bucket already exists. + if xl.isBucketExist(bucket) { + return toObjectErr(errVolumeExists, bucket) + } nsMutex.Lock(bucket, "") defer nsMutex.Unlock(bucket, "") @@ -68,20 +72,36 @@ func (xl xlObjects) MakeBucket(bucket string) error { } // Verify we have any other errors which should undo make bucket. - for _, err := range dErrs { - // Bucket already exists, return BucketExists error. - if err == errVolumeExists { - return toObjectErr(errVolumeExists, bucket) - } - // Undo make bucket for any other errors. - if err != nil && err != errDiskNotFound { - xl.undoMakeBucket(bucket) - return toObjectErr(err, bucket) - } + if reducedErr := reduceErrs(dErrs, []error{ + errDiskNotFound, + errFaultyDisk, + errDiskAccessDenied, + }); reducedErr != nil { + return toObjectErr(reducedErr, bucket) } return nil } +func (xl xlObjects) undoDeleteBucket(bucket string) { + // Initialize sync waitgroup. + var wg = &sync.WaitGroup{} + // Undo previous delete bucket on all underlying storage disks. + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + // Recreate the deleted bucket inside a go-routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + _ = disk.MakeVol(bucket) + }(index, disk) + } + + // Wait for all make vol to finish. + wg.Wait() +} + // undo make bucket operation upon quorum failure. func (xl xlObjects) undoMakeBucket(bucket string) { // Initialize sync waitgroup. @@ -113,7 +133,7 @@ var bucketMetadataOpIgnoredErrs = []error{ // getBucketInfo - returns the BucketInfo from one of the load balanced disks. 
func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) { - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -169,7 +189,7 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { // listBuckets - returns list of all buckets from a disk picked at random. func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) { - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -220,13 +240,15 @@ func (xl xlObjects) DeleteBucket(bucket string) error { if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } + // Verify if bucket is found. + if !xl.isBucketExist(bucket) { + return BucketNotFound{Bucket: bucket} + } nsMutex.Lock(bucket, "") defer nsMutex.Unlock(bucket, "") // Collect if all disks report volume not found. - var volumeNotFoundErrCnt int - var wg = &sync.WaitGroup{} var dErrs = make([]error, len(xl.storageDisks)) @@ -257,21 +279,17 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Wait for all the delete vols to finish. wg.Wait() - // Count the errors for known errors, return quickly if we found an unknown error. - for _, err := range dErrs { - if err != nil { - if isErrIgnored(err, objMetadataOpIgnoredErrs) { - volumeNotFoundErrCnt++ - continue - } - return toObjectErr(err, bucket) - } + if !isDiskQuorum(dErrs, xl.writeQuorum) { + xl.undoDeleteBucket(bucket) + return toObjectErr(errXLWriteQuorum, bucket) } - // Return errVolumeNotFound if all disks report volume not found. 
- if volumeNotFoundErrCnt == len(xl.storageDisks) { - return toObjectErr(errVolumeNotFound, bucket) + if reducedErr := reduceErrs(dErrs, []error{ + errFaultyDisk, + errDiskNotFound, + errDiskAccessDenied, + }); reducedErr != nil { + return toObjectErr(reducedErr, bucket) } return nil } diff --git a/xl-v1-common.go b/xl-v1-common.go index 01185014b..f0f9edc59 100644 --- a/xl-v1-common.go +++ b/xl-v1-common.go @@ -21,15 +21,7 @@ import ( "time" ) -// getLoadBalancedQuorumDisks - fetches load balanced sufficiently -// randomized quorum disk slice. -func (xl xlObjects) getLoadBalancedQuorumDisks() (disks []StorageAPI) { - // It is okay to have readQuorum disks. - return xl.getLoadBalancedDisks()[0 : xl.readQuorum-1] -} - -// getLoadBalancedDisks - fetches load balanced (sufficiently -// randomized) disk slice. +// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. func (xl xlObjects) getLoadBalancedDisks() (disks []StorageAPI) { // Based on the random shuffling return back randomized disks. for _, i := range hashOrder(time.Now().UTC().String(), len(xl.storageDisks)) { @@ -60,7 +52,7 @@ func (xl xlObjects) parentDirIsObject(bucket, parent string) bool { // isObject - returns `true` if the prefix is an object i.e if // `xl.json` exists at the leaf, false otherwise. func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go index c96176666..fc49a7e3c 100644 --- a/xl-v1-list-objects.go +++ b/xl-v1-list-objects.go @@ -30,7 +30,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := xl.isObject - listDir := listDirFactory(isLeaf, xl.getLoadBalancedQuorumDisks()...) + listDir := listDirFactory(isLeaf, xl.getLoadBalancedDisks()...) 
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) } diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 814ff2ed5..32e343fc6 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -230,7 +230,7 @@ var objMetadataOpIgnoredErrs = []error{ // readXLMetadata - returns the object metadata `xl.json` content from // one of the disks picked at random. func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) { - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index b3b2b9cea..2cec00227 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -181,7 +181,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t // Returns if the prefix is a multipart upload. func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -200,7 +200,7 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { // listUploadsInfo - list all uploads info. func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) { - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -252,7 +252,7 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) // statPart - returns fileInfo structure for a successful stat on part file. 
func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) { partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName) - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 0c27ad315..325ce3543 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -64,7 +64,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // uploadIDMarker first. if uploadIDMarker != "" { nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) - for _, disk := range xl.getLoadBalancedQuorumDisks() { + for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -91,7 +91,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark if walkerCh == nil { walkerDoneCh = make(chan struct{}) isLeaf := xl.isMultipartUpload - listDir := listDirFactory(isLeaf, xl.getLoadBalancedQuorumDisks()...) + listDir := listDirFactory(isLeaf, xl.getLoadBalancedDisks()...) walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh) } // Collect uploads until we have reached maxUploads count to 0. @@ -130,7 +130,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // For the new object entry we get all its pending uploadIDs. nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) var disk StorageAPI - for _, disk = range xl.getLoadBalancedQuorumDisks() { + for _, disk = range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -757,7 +757,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // the object, if yes do not attempt to delete 'uploads.json'. 
var disk StorageAPI var uploadsJSON uploadsV1 - for _, disk = range xl.getLoadBalancedQuorumDisks() { + for _, disk = range xl.getLoadBalancedDisks() { if disk == nil { continue } @@ -811,7 +811,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e // the object, if yes do not attempt to delete 'uploads.json'. var disk StorageAPI var uploadsJSON uploadsV1 - for _, disk = range xl.getLoadBalancedQuorumDisks() { + for _, disk = range xl.getLoadBalancedDisks() { if disk == nil { continue }