diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go index eeadef267..62bd21715 100644 --- a/cmd/erasure-createfile.go +++ b/cmd/erasure-createfile.go @@ -141,5 +141,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash if !isDiskQuorum(wErrs, writeQuorum) { return traceError(errXLWriteQuorum) } - return nil + return reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum) } diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index baa90a199..3c0fb8c44 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -80,7 +80,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { } // Verify we have any other errors which should undo make bucket. - if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil { + if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { return toObjectErr(reducedErr, bucket) } return nil @@ -289,7 +289,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error { return toObjectErr(traceError(errXLWriteQuorum), bucket) } - if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil { + if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { return toObjectErr(reducedErr, bucket) } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 9bd74e856..1dd3b8f9f 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -74,7 +74,7 @@ func (xl xlObjects) HealBucket(bucket string) error { } // Proceed to heal bucket metadata. - return healBucketMetadata(xl.storageDisks, bucket) + return healBucketMetadata(xl.storageDisks, bucket, xl.readQuorum) } // Heal bucket - create buckets on disks where it does not exist. @@ -122,7 +122,7 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error } // Verify we have any other errors which should be returned as failure. - if reducedErr := reduceErrs(dErrs, bucketOpIgnoredErrs); reducedErr != nil { + if reducedErr := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum); reducedErr != nil { return toObjectErr(reducedErr, bucket) } return nil @@ -130,13 +130,13 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error // Heals all the metadata associated for a given bucket, this function // heals `policy.json`, `notification.xml` and `listeners.json`. -func healBucketMetadata(storageDisks []StorageAPI, bucket string) error { +func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int) error { healBucketMetaFn := func(metaPath string) error { metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath) metaLock.RLock() defer metaLock.RUnlock() // Heals the given file at metaPath. - if err := healObject(storageDisks, minioMetaBucket, metaPath); err != nil && !isErrObjectNotFound(err) { + if err := healObject(storageDisks, minioMetaBucket, metaPath, readQuorum); err != nil && !isErrObjectNotFound(err) { return err } // Success. return nil @@ -200,7 +200,7 @@ func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{} // TODO :- // - add support for healing dangling `uploads.json`. // - add support for healing dangling `xl.json`. -func quickHeal(storageDisks []StorageAPI, writeQuorum int) error { +func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error { // List all bucket names from all disks. bucketNames, err := listBucketNames(storageDisks) if err != nil { @@ -210,7 +210,7 @@ func quickHeal(storageDisks []StorageAPI, writeQuorum int) error { for bucketName := range bucketNames { // Heal bucket and then proceed to heal bucket metadata. if err = healBucket(storageDisks, bucketName, writeQuorum); err == nil { - if err = healBucketMetadata(storageDisks, bucketName); err == nil { + if err = healBucketMetadata(storageDisks, bucketName, readQuorum); err == nil { continue } return err @@ -221,10 +221,10 @@ func quickHeal(storageDisks []StorageAPI, writeQuorum int) error { } // Heals an object only the corrupted/missing erasure blocks. -func healObject(storageDisks []StorageAPI, bucket string, object string) error { +func healObject(storageDisks []StorageAPI, bucket string, object string, quorum int) error { partsMetadata, errs := readAllXLMetadata(storageDisks, bucket, object) - if err := reduceErrs(errs, nil); err != nil { - return toObjectErr(err, bucket, object) + if reducedErr := reduceReadQuorumErrs(errs, nil, quorum); reducedErr != nil { + return toObjectErr(reducedErr, bucket, object) } if !xlShouldHeal(partsMetadata, errs) { @@ -363,5 +363,5 @@ func (xl xlObjects) HealObject(bucket, object string) error { defer objectLock.RUnlock() // Heal the object. - return healObject(xl.storageDisks, bucket, object) + return healObject(xl.storageDisks, bucket, object, xl.readQuorum) } diff --git a/cmd/xl-v1-healing_test.go b/cmd/xl-v1-healing_test.go index d1b0f347e..ed3bd5f69 100644 --- a/cmd/xl-v1-healing_test.go +++ b/cmd/xl-v1-healing_test.go @@ -353,7 +353,7 @@ func TestQuickHeal(t *testing.T) { } // Heal the missing buckets. - if err = quickHeal(xl.storageDisks, xl.writeQuorum); err != nil { + if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { t.Fatal(err) } @@ -370,7 +370,7 @@ func TestQuickHeal(t *testing.T) { t.Fatal("storage disk is not *posix type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errUnformattedDisk) - if err = quickHeal(xl.storageDisks, xl.writeQuorum); err != errUnformattedDisk { + if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != errUnformattedDisk { t.Fatal(err) } @@ -392,7 +392,7 @@ func TestQuickHeal(t *testing.T) { } xl = obj.(*xlObjects) xl.storageDisks[0] = nil - if err = quickHeal(xl.storageDisks, xl.writeQuorum); err != nil { + if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { t.Fatal("Got an unexpected error: ", err) } @@ -419,7 +419,7 @@ func TestQuickHeal(t *testing.T) { t.Fatal("storage disk is not *posix type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errDiskNotFound) - if err = quickHeal(xl.storageDisks, xl.writeQuorum); err != nil { + if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { t.Fatal("Got an unexpected error: ", err) } } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 5698b325d..970e1719a 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -335,8 +335,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] deleteAllXLMetadata(disks, bucket, prefix, mErrs) return traceError(errXLWriteQuorum) } - - return reduceErrs(mErrs, objectOpIgnoredErrs) + return reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum) } // writeSameXLMetadata - write `xl.json` on all disks in order. @@ -375,6 +374,5 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet deleteAllXLMetadata(disks, bucket, prefix, mErrs) return traceError(errXLWriteQuorum) } - - return reduceErrs(mErrs, objectOpIgnoredErrs) + return reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, writeQuorum) } diff --git a/cmd/xl-v1-multipart-common.go b/cmd/xl-v1-multipart-common.go index c9f47598f..2f731ca41 100644 --- a/cmd/xl-v1-multipart-common.go +++ b/cmd/xl-v1-multipart-common.go @@ -140,7 +140,10 @@ func (xl xlObjects) updateUploadJSON(bucket, object string, uCh uploadIDChange) } wg.Wait() - return reduceErrs(errs, objectOpIgnoredErrs) + if reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { + return reducedErr + } + return nil } // Returns if the prefix is a multipart upload. @@ -251,5 +254,8 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in return traceError(errXLWriteQuorum) } - return reduceErrs(mErrs, objectOpIgnoredErrs) + if reducedErr := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum); reducedErr != nil { + return reducedErr + } + return nil } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index f38f1ef2b..6a11336ea 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -79,7 +79,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return traceError(InsufficientReadQuorum{}, errs...) } - if reducedErr := reduceErrs(errs, objectOpIgnoredErrs); reducedErr != nil { + if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil { return toObjectErr(reducedErr, bucket, object) } @@ -339,8 +339,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) return traceError(errXLWriteQuorum) } - // Return on first error, also undo any partially successful rename operations. - return reduceErrs(errs, objectOpIgnoredErrs) + return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum) } // renamePart - renames a part of the source object to the destination diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index 3b6305fb6..6a88fabc2 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -31,8 +31,7 @@ import ( // golang's map orders keys. This doesn't affect correctness as long as quorum // value is greater than or equal to simple majority, since none of the equally // maximal values would occur quorum or more number of times. - -func reduceErrs(errs []error, ignoredErrs []error) error { +func reduceErrs(errs []error, ignoredErrs []error) (maxCount int, maxErr error) { errorCounts := make(map[error]int) errs = errorsCause(errs) for _, err := range errs { @@ -42,14 +41,45 @@ func reduceErrs(errs []error, ignoredErrs []error) error { errorCounts[err]++ } max := 0 - var errMax error for err, count := range errorCounts { if max < count { max = count - errMax = err + maxErr = err } } - return traceError(errMax, errs...) + return max, maxErr +} + +// reduceQuorumErrs behaves like reduceErrs by only for returning +// values of maximally occurring errors validated against a generic +// quorum number can be read or write quorum depending on usage. +// additionally a special error is provided as well to be returned +// in case quorum is not satisfied. +func reduceQuorumErrs(errs []error, ignoredErrs []error, quorum int, quorumErr error) (maxErr error) { + maxCount, maxErr := reduceErrs(errs, ignoredErrs) + if maxErr == nil && maxCount >= quorum { + // Success in quorum. + return nil + } + if maxErr != nil && maxCount >= quorum { + // Errors in quorum. + return traceError(maxErr, errs...) + } + // No quorum satisfied. + maxErr = traceError(quorumErr, errs...) + return +} + +// reduceReadQuorumErrs behaves like reduceErrs but only for returning +// values of maximally occurring errors validated against readQuorum. +func reduceReadQuorumErrs(errs []error, ignoredErrs []error, readQuorum int) (maxErr error) { + return reduceQuorumErrs(errs, ignoredErrs, readQuorum, errXLReadQuorum) +} + +// reduceWriteQuorumErrs behaves like reduceErrs but only for returning +// values of maximally occurring errors validated against writeQuorum. +func reduceWriteQuorumErrs(errs []error, ignoredErrs []error, writeQuorum int) (maxErr error) { + return reduceQuorumErrs(errs, ignoredErrs, writeQuorum, errXLWriteQuorum) } // List of all errors which are ignored while verifying quorum. diff --git a/cmd/xl-v1-utils_test.go b/cmd/xl-v1-utils_test.go index 34563c564..d66a85c63 100644 --- a/cmd/xl-v1-utils_test.go +++ b/cmd/xl-v1-utils_test.go @@ -63,29 +63,35 @@ func TestReduceErrs(t *testing.T) { errDiskNotFound, errDiskNotFound, errDiskFull, - }, []error{}, errDiskNotFound}, + }, []error{}, errXLReadQuorum}, // Validate if have no consensus. {[]error{ errDiskFull, errDiskNotFound, nil, nil, - }, []error{}, nil}, + }, []error{}, errXLReadQuorum}, // Validate if have consensus and errors ignored. {[]error{ + errVolumeNotFound, + errVolumeNotFound, errVolumeNotFound, errVolumeNotFound, errVolumeNotFound, errDiskNotFound, errDiskNotFound, }, []error{errDiskNotFound}, errVolumeNotFound}, - {[]error{}, []error{}, nil}, + {[]error{}, []error{}, errXLReadQuorum}, } // Validates list of all the testcases for returning valid errors. for i, testCase := range testCases { - gotErr := reduceErrs(testCase.errs, testCase.ignoredErrs) + gotErr := reduceReadQuorumErrs(testCase.errs, testCase.ignoredErrs, 5) if errorCause(gotErr) != testCase.err { t.Errorf("Test %d : expected %s, got %s", i+1, testCase.err, gotErr) } + gotNewErr := reduceWriteQuorumErrs(testCase.errs, testCase.ignoredErrs, 6) + if errorCause(gotNewErr) != errXLWriteQuorum { + t.Errorf("Test %d : expected %s, got %s", i+1, errXLWriteQuorum, gotErr) + } } } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 87d78e3b5..47600b983 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -124,7 +124,7 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { xl.writeQuorum = writeQuorum // Do a quick heal on the buckets themselves for any discrepancies. - if err := quickHeal(xl.storageDisks, xl.writeQuorum); err != nil { + if err := quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { return xl, err }