diff --git a/erasure-createfile.go b/erasure-createfile.go index 874fb242e..78cc581bf 100644 --- a/erasure-createfile.go +++ b/erasure-createfile.go @@ -160,7 +160,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist wg.Wait() // Do we have write quorum?. - if !isQuorum(wErrs, writeQuorum) { + if !isDiskQuorum(wErrs, writeQuorum) { return toObjectErr(errXLWriteQuorum, volume, path) } return nil diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index 148a9508f..6a51d6e3d 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -61,7 +61,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { wg.Wait() // Do we have write quorum?. - if !isQuorum(dErrs, xl.writeQuorum) { + if !isDiskQuorum(dErrs, xl.writeQuorum) { // Purge successfully created buckets if we don't have writeQuorum. xl.undoMakeBucket(bucket) return toObjectErr(errXLWriteQuorum, bucket) diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 846d908f3..57cb72a55 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -322,9 +322,9 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] wg.Wait() // Do we have write quorum?. - if !isQuorum(mErrs, writeQuorum) { + if !isDiskQuorum(mErrs, writeQuorum) { // Validate if we have read quorum. - if isQuorum(mErrs, readQuorum) { + if isDiskQuorum(mErrs, readQuorum) { // Return success. return nil } @@ -375,9 +375,9 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet wg.Wait() // Do we have write Quorum?. - if !isQuorum(mErrs, writeQuorum) { + if !isDiskQuorum(mErrs, writeQuorum) { // Do we have readQuorum?. - if isQuorum(mErrs, readQuorum) { + if isDiskQuorum(mErrs, readQuorum) { // Return success. return nil } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index 55a51bab4..f518ece25 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -62,7 +62,7 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads wg.Wait() // Count all the errors and validate if we have write quorum. - if !isQuorum(errs, xl.writeQuorum) { + if !isDiskQuorum(errs, xl.writeQuorum) { // Rename `uploads.json` left over back to tmp location. for index, disk := range xl.storageDisks { if disk == nil { @@ -149,7 +149,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t wg.Wait() // Count all the errors and validate if we have write quorum. - if !isQuorum(errs, xl.writeQuorum) { + if !isDiskQuorum(errs, xl.writeQuorum) { // Rename `uploads.json` left over back to tmp location. for index, disk := range xl.storageDisks { if disk == nil { @@ -295,9 +295,10 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuor wg.Wait() // Do we have write quorum?. - if !isQuorum(mErrs, writeQuorum) { + if !isDiskQuorum(mErrs, writeQuorum) { return errXLWriteQuorum } + // For all other errors return. for _, err := range mErrs { if err != nil && err != errDiskNotFound { diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index e4539aec9..e3df1b12f 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -347,7 +347,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) - if !isQuorum(errs, xl.writeQuorum) { + if !isDiskQuorum(errs, xl.writeQuorum) { nsMutex.RUnlock(minioMetaBucket, uploadIDPath) return "", toObjectErr(errXLWriteQuorum, bucket, object) } @@ -435,7 +435,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Read metadata (again) associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath) - if !isQuorum(errs, xl.writeQuorum) { + if !isDiskQuorum(errs, xl.writeQuorum) { return "", toObjectErr(errXLWriteQuorum, bucket, object) } @@ -626,7 +626,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Read metadata associated with the object from all disks. partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) // Do we have writeQuorum?. - if !isQuorum(errs, xl.writeQuorum) { + if !isDiskQuorum(errs, xl.writeQuorum) { return "", toObjectErr(errXLWriteQuorum, bucket, object) } diff --git a/xl-v1-object.go b/xl-v1-object.go index b15c74deb..cdde92308 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -63,7 +63,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) // Do we have read quorum? - if !isQuorum(errs, xl.readQuorum) { + if !isDiskQuorum(errs, xl.readQuorum) { return toObjectErr(errXLReadQuorum, bucket, object) } @@ -302,9 +302,9 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum // otherwise return failure. Cleanup successful renames. - if !isQuorum(errs, writeQuorum) { + if !isDiskQuorum(errs, writeQuorum) { // Check we have successful read quorum. - if isQuorum(errs, readQuorum) { + if isDiskQuorum(errs, readQuorum) { return nil // Return success. } // else - failed to acquire read quorum. // Undo all the partial rename operations. @@ -378,7 +378,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Read metadata associated with the object from all disks. partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) // Do we have write quroum?. - if !isQuorum(errs, xl.writeQuorum) { + if !isDiskQuorum(errs, xl.writeQuorum) { return "", toObjectErr(errXLWriteQuorum, bucket, object) } @@ -564,9 +564,9 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Wait for all routines to finish. wg.Wait() - if !isQuorum(dErrs, xl.writeQuorum) { - // Return errXLWriteQuorum if errors were more than - // allowed write quorum. + // Do we have write quorum? + if !isDiskQuorum(dErrs, xl.writeQuorum) { + // Return errXLWriteQuorum if errors were more than allowed write quorum. return errXLWriteQuorum } diff --git a/xl-v1-utils.go b/xl-v1-utils.go index 636e97ff1..a879281f0 100644 --- a/xl-v1-utils.go +++ b/xl-v1-utils.go @@ -76,16 +76,18 @@ func reduceErrs(errs []error) error { return errTypes[max].err } -// Validates if we have quorum based on the errors with errDiskNotFound. -func isQuorum(errs []error, minQuorumCount int) bool { - var errCount int +// Validates if we have quorum based on the errors related to disk only. +// Returns 'true' if we have quorum, 'false' if we don't. +func isDiskQuorum(errs []error, minQuorumCount int) bool { + var count int for _, err := range errs { - if err == errDiskNotFound || err == errFaultyDisk || err == errDiskAccessDenied { + switch err { + case errDiskNotFound, errFaultyDisk, errDiskAccessDenied: continue } - errCount++ + count++ } - return errCount >= minQuorumCount + return count >= minQuorumCount } // Similar to 'len(slice)' but returns the actual elements count