From cef26fd6eac29cbaa436eabb91178a679429068c Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 19 Jul 2016 19:24:32 -0700
Subject: [PATCH] XL: Refactor usage of reduceErrs and consistent behavior.
 (#2240)

This refactor is also needed in light of our quorum requirement change
for the newly understood logic behind the klauspost/reedsolomon
implementation.
---
 object-api-multipart_test.go |  7 +++-
 xl-v1-common.go              |  2 +-
 xl-v1-metadata.go            | 67 ++++++---------------------------
 xl-v1-multipart-common.go    | 35 ++++-------------
 xl-v1-multipart.go           | 19 +++++-----
 xl-v1-object.go              | 73 +++++++++++-------------------------
 xl-v1-utils.go               |  7 +++-
 xl-v1-utils_test.go          | 37 +++++++++++++-----
 xl-v1.go                     | 10 ++---
 xl-v1_test.go                |  2 +-
 10 files changed, 95 insertions(+), 164 deletions(-)

diff --git a/object-api-multipart_test.go b/object-api-multipart_test.go
index 671f8999a..066c416dc 100644
--- a/object-api-multipart_test.go
+++ b/object-api-multipart_test.go
@@ -129,7 +129,7 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [
 	}
 
 	// Remove some random disk.
-	for _, disk := range disks[:7] {
+	for _, disk := range disks[:6] {
 		removeAll(disk)
 	}
 
@@ -165,7 +165,10 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [
 	}
 
 	// This causes quorum failure verify.
-	removeAll(disks[len(disks)-1])
+	disks = disks[len(disks)-3:]
+	for _, disk := range disks {
+		removeAll(disk)
+	}
 
 	// Object part upload should fail with quorum not available.
 	testCase := createPartCases[len(createPartCases)-1]
diff --git a/xl-v1-common.go b/xl-v1-common.go
index c50062527..01185014b 100644
--- a/xl-v1-common.go
+++ b/xl-v1-common.go
@@ -25,7 +25,7 @@ import (
 // randomized quorum disk slice.
 func (xl xlObjects) getLoadBalancedQuorumDisks() (disks []StorageAPI) {
 	// It is okay to have readQuorum disks.
-	return xl.getLoadBalancedDisks()[:xl.readQuorum-1]
+	return xl.getLoadBalancedDisks()[0 : xl.readQuorum-1]
 }
 
 // getLoadBalancedDisks - fetches load balanced (sufficiently
diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go
index 8717e5b3f..814ff2ed5 100644
--- a/xl-v1-metadata.go
+++ b/xl-v1-metadata.go
@@ -314,7 +314,7 @@ func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error
 }
 
 // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
-func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, writeQuorum, readQuorum int) error {
+func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) error {
 	var wg = &sync.WaitGroup{}
 	var mErrs = make([]error, len(disks))
 
@@ -344,38 +344,17 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
 	wg.Wait()
 
 	// Do we have write quorum?.
-	if !isDiskQuorum(mErrs, writeQuorum) {
-		// Validate if we have read quorum.
-		if isDiskQuorum(mErrs, readQuorum) {
-			// Return success.
-			return nil
-		}
+	if !isDiskQuorum(mErrs, quorum) {
 		// Delete all `xl.json` successfully renamed.
 		deleteAllXLMetadata(disks, bucket, prefix, mErrs)
 		return errXLWriteQuorum
 	}
 
-	// Reduce errors and verify quourm and return.
-	if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil {
-		if errCount < writeQuorum {
-			// Delete all `xl.json` successfully renamed.
-			deleteAllXLMetadata(disks, bucket, prefix, mErrs)
-			return errXLWriteQuorum
-		}
-		if isErrIgnored(reducedErr, []error{
-			errDiskNotFound,
-			errDiskAccessDenied,
-			errFaultyDisk,
-			errVolumeNotFound,
-		}) {
-			// Success.
-			return nil
-		}
-		return reducedErr
-	}
-
-	// Success.
-	return nil
+	return reduceErrs(mErrs, []error{
+		errDiskNotFound,
+		errFaultyDisk,
+		errDiskAccessDenied,
+	})
 }
 
 // writeSameXLMetadata - write `xl.json` on all disks in order.
@@ -410,36 +389,14 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
 
 	// Do we have write Quorum?.
 	if !isDiskQuorum(mErrs, writeQuorum) {
-		// Do we have readQuorum?.
-		if isDiskQuorum(mErrs, readQuorum) {
-			// Return success.
-			return nil
-		}
 		// Delete all `xl.json` successfully renamed.
 		deleteAllXLMetadata(disks, bucket, prefix, mErrs)
 		return errXLWriteQuorum
 	}
 
-	// Reduce errors and verify quourm and return.
-	if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil {
-		if errCount < writeQuorum {
-			// Delete all `xl.json` successfully renamed.
-			deleteAllXLMetadata(disks, bucket, prefix, mErrs)
-			return errXLWriteQuorum
-		}
-		// Ignore specific errors if we are under write quorum.
-		if isErrIgnored(reducedErr, []error{
-			errDiskNotFound,
-			errDiskAccessDenied,
-			errFaultyDisk,
-			errVolumeNotFound,
-		}) {
-			// Success.
-			return nil
-		}
-		return reducedErr
-	}
-
-	// Success.
-	return nil
+	return reduceErrs(mErrs, []error{
+		errDiskNotFound,
+		errFaultyDisk,
+		errDiskAccessDenied,
+	})
 }
diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go
index f7d951d74..b3b2b9cea 100644
--- a/xl-v1-multipart-common.go
+++ b/xl-v1-multipart-common.go
@@ -270,7 +270,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
 }
 
 // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
-func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum, readQuorum int) error {
+func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum int) error {
 	var wg = &sync.WaitGroup{}
 	var mErrs = make([]error, len(disks))
 
@@ -303,35 +303,16 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuor
 	wg.Wait()
 
 	// Do we have write Quorum?.
-	if !isDiskQuorum(mErrs, writeQuorum) {
-		// Do we have readQuorum?.
-		if isDiskQuorum(mErrs, readQuorum) {
-			// Return success.
-			return nil
-		}
+	if !isDiskQuorum(mErrs, quorum) {
 		// Delete all `xl.json` successfully renamed.
 		deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs)
 		return errXLWriteQuorum
 	}
 
-	// Reduce errors and verify quourm and return.
-	if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil {
-		if errCount < writeQuorum {
-			// Delete all `xl.json` successfully renamed.
-			deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs)
-			return errXLWriteQuorum
-		}
-		if isErrIgnored(reducedErr, []error{
-			errDiskNotFound,
-			errDiskAccessDenied,
-			errFaultyDisk,
-			errVolumeNotFound,
-		}) {
-			return nil
-		}
-		return reducedErr
-	}
-
-	// Success.
-	return nil
+	return reduceErrs(mErrs, []error{
+		errDiskNotFound,
+		errDiskAccessDenied,
+		errFaultyDisk,
+		errVolumeNotFound,
+	})
 }
diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go
index 0d8493fb0..0c27ad315 100644
--- a/xl-v1-multipart.go
+++ b/xl-v1-multipart.go
@@ -285,7 +285,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
 	if err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
 	}
-	rErr := renameObject(xl.storageDisks, minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath, xl.writeQuorum, xl.readQuorum)
+	rErr := renameObject(xl.storageDisks, minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath, xl.writeQuorum)
 	if rErr == nil {
 		// Return success.
 		return uploadID, nil
@@ -431,7 +431,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 
 	// Rename temporary part file to its final location.
 	partPath := path.Join(uploadIDPath, partSuffix)
-	err = renamePart(onlineDisks, minioMetaBucket, tmpPartPath, minioMetaBucket, partPath, xl.writeQuorum, xl.readQuorum)
+	err = renamePart(onlineDisks, minioMetaBucket, tmpPartPath, minioMetaBucket, partPath, xl.writeQuorum)
 	if err != nil {
 		return "", toObjectErr(err, minioMetaBucket, partPath)
 	}
@@ -466,12 +466,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 	newUUID := getUUID()
 	tempXLMetaPath := path.Join(tmpMetaPrefix, newUUID)
 
-	// Writes a unique `xl.json` each disk carrying new checksum
-	// related information.
-	if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
+	// Writes a unique `xl.json` each disk carrying new checksum related information.
+	if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath)
 	}
-	rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum, xl.readQuorum)
+	rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum)
 	if rErr != nil {
 		return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
 	}
@@ -696,10 +695,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	}
 
 	// Write unique `xl.json` for each disk.
-	if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
+	if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
 	}
-	rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum, xl.readQuorum)
+	rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum)
 	if rErr != nil {
 		return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
 	}
@@ -722,7 +721,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	// Rename if an object already exists to temporary location.
 	uniqueID := getUUID()
 	if xl.isObject(bucket, object) {
-		err = renameObject(xl.storageDisks, bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID), xl.writeQuorum, xl.readQuorum)
+		err = renameObject(xl.storageDisks, bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID), xl.writeQuorum)
 		if err != nil {
 			return "", toObjectErr(err, bucket, object)
 		}
@@ -742,7 +741,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	}
 
 	// Rename the multipart object to final location.
-	if err = renameObject(xl.storageDisks, minioMetaBucket, uploadIDPath, bucket, object, xl.writeQuorum, xl.readQuorum); err != nil {
+	if err = renameObject(xl.storageDisks, minioMetaBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil {
 		return "", toObjectErr(err, bucket, object)
 	}
 
diff --git a/xl-v1-object.go b/xl-v1-object.go
index ae0932516..3554cde93 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -67,12 +67,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 		return toObjectErr(errXLReadQuorum, bucket, object)
 	}
 
-	// If all the disks returned error, we return error.
-	if errCount, err := reduceErrs(errs); err != nil {
-		if errCount < xl.readQuorum {
-			return toObjectErr(errXLReadQuorum, bucket, object)
-		}
-		return toObjectErr(err, bucket, object)
+	if reducedErr := reduceErrs(errs, []error{
+		errDiskNotFound,
+		errFaultyDisk,
+		errDiskAccessDenied,
+	}); reducedErr != nil {
+		return toObjectErr(reducedErr, bucket, object)
 	}
 
 	// List all online disks.
@@ -237,7 +237,7 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er
 	return objInfo, nil
 }
 
-func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error, writeQuorum, readQuorum int) {
+func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) {
 	var wg = &sync.WaitGroup{}
 
 	// Undo rename object on disks where RenameFile succeeded.
@@ -264,21 +264,9 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
 	wg.Wait()
 }
 
-// undoRenameObject - renames back the partially successful rename operations.
-func undoRenameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, errs []error, writeQuorum, readQuorum int) {
-	isPart := false
-	undoRename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, errs, writeQuorum, readQuorum)
-}
-
-// undoRenamePart - renames back the partially successful rename operation.
-func undoRenamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, errs []error, writeQuorum, readQuorum int) {
-	isPart := true
-	undoRename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, errs, writeQuorum, readQuorum)
-}
-
 // rename - common function that renamePart and renameObject use to rename
 // the respective underlying storage layer representations.
-func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, writeQuorum, readQuorum int) error {
+func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, quorum int) error {
 	// Initialize sync waitgroup.
 	var wg = &sync.WaitGroup{}
 
@@ -311,52 +299,35 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
 	// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
 	// otherwise return failure. Cleanup successful renames.
-	if !isDiskQuorum(errs, writeQuorum) {
-		// Check we have successful read quorum.
-		if isDiskQuorum(errs, readQuorum) {
-			return nil // Return success.
-		} // else - failed to acquire read quorum.
+	if !isDiskQuorum(errs, quorum) {
 		// Undo all the partial rename operations.
-		undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum)
+		undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs)
 		return errXLWriteQuorum
 	}
 
 	// Return on first error, also undo any partially successful rename operations.
-	if errCount, reducedErr := reduceErrs(errs); reducedErr != nil {
-		if errCount < writeQuorum {
-			// Undo all the partial rename operations.
-			undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum)
-			return errXLWriteQuorum
-		}
-		if isErrIgnored(reducedErr, []error{
-			errDiskNotFound,
-			errDiskAccessDenied,
-			errFaultyDisk,
-			errVolumeNotFound,
-		}) {
-			// Return success.
-			return nil
-		}
-		return reducedErr
-	}
-	return nil
+	return reduceErrs(errs, []error{
+		errDiskNotFound,
+		errDiskAccessDenied,
+		errFaultyDisk,
+	})
 }
 
 // renamePart - renames a part of the source object to the destination
 // across all disks in parallel. Additionally if we have errors and do
 // not have a readQuorum partially renamed files are renamed back to
 // its proper location.
-func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, writeQuorum, readQuorum int) error {
+func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) error {
 	isPart := true
-	return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, writeQuorum, readQuorum)
+	return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, quorum)
 }
 
 // renameObject - renames all source objects to destination object
 // across all disks in parallel. Additionally if we have errors and do
 // not have a readQuorum partially renamed files are renamed back to
 // its proper location.
-func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, writeQuorum, readQuorum int) error {
+func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) error {
 	isPart := false
-	return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, writeQuorum, readQuorum)
+	return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, quorum)
 }
 
 // PutObject - creates an object upon reading from the input stream
@@ -508,7 +479,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	// Rename if an object already exists to temporary location.
 	newUniqueID := getUUID()
 	if xl.isObject(bucket, object) {
-		err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum, xl.readQuorum)
+		err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum)
 		if err != nil {
 			return "", toObjectErr(err, bucket, object)
 		}
@@ -530,12 +501,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	}
 
 	// Write unique `xl.json` for each disk.
-	if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
+	if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil {
 		return "", toObjectErr(err, bucket, object)
 	}
 
 	// Rename the successfully written temporary object to final location.
-	err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum)
+	err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum)
 	if err != nil {
 		return "", toObjectErr(err, bucket, object)
 	}
diff --git a/xl-v1-utils.go b/xl-v1-utils.go
index 2e38dd2de..98ecefa9e 100644
--- a/xl-v1-utils.go
+++ b/xl-v1-utils.go
@@ -30,9 +30,12 @@ import (
 
 // value is greater than or equal to simple majority, since none of the equally
 // maximal values would occur quorum or more number of times.
-func reduceErrs(errs []error) (int, error) {
+func reduceErrs(errs []error, ignoredErrs []error) error {
 	errorCounts := make(map[error]int)
 	for _, err := range errs {
+		if isErrIgnored(err, ignoredErrs) {
+			continue
+		}
 		errorCounts[err]++
 	}
 	max := 0
@@ -43,7 +46,7 @@
 			errMax = err
 		}
 	}
-	return max, errMax
+	return errMax
 }
 
 // Validates if we have quorum based on the errors related to disk only.
diff --git a/xl-v1-utils_test.go b/xl-v1-utils_test.go
index beb3097e7..c82eae058 100644
--- a/xl-v1-utils_test.go
+++ b/xl-v1-utils_test.go
@@ -20,23 +20,40 @@ import "testing"
 
 // Test for reduceErrs.
 func TestReduceErrs(t *testing.T) {
+	// List all of all test cases to validate various cases of reduce errors.
 	testCases := []struct {
-		errs  []error
-		err   error
-		count int
+		errs        []error
+		ignoredErrs []error
+		err         error
 	}{
-		{[]error{errDiskNotFound, errDiskNotFound, errDiskFull}, errDiskNotFound, 2},
-		{[]error{errDiskFull, errDiskNotFound, nil, nil}, nil, 2},
-		{[]error{}, nil, 0},
+		// Validate if have reduced properly.
+		{[]error{
+			errDiskNotFound,
+			errDiskNotFound,
+			errDiskFull,
+		}, []error{}, errDiskNotFound},
+		// Validate if have no consensus.
+		{[]error{
+			errDiskFull,
+			errDiskNotFound,
+			nil, nil,
+		}, []error{}, nil},
+		// Validate if have consensus and errors ignored.
+		{[]error{
+			errVolumeNotFound,
+			errVolumeNotFound,
+			errVolumeNotFound,
+			errDiskNotFound,
+			errDiskNotFound,
+		}, []error{errDiskNotFound}, errVolumeNotFound},
+		{[]error{}, []error{}, nil},
 	}
 
+	// Validates list of all the testcases for returning valid errors.
 	for i, testCase := range testCases {
-		gotMax, gotErr := reduceErrs(testCase.errs)
+		gotErr := reduceErrs(testCase.errs, testCase.ignoredErrs)
 		if testCase.err != gotErr {
 			t.Errorf("Test %d : expected %s, got %s", i+1, testCase.err, gotErr)
 		}
-		if testCase.count != gotMax {
-			t.Errorf("Test %d : expected %d, got %d", i+1, testCase.count, gotMax)
-		}
 	}
 }
diff --git a/xl-v1.go b/xl-v1.go
index 74460babd..fb86ce232 100644
--- a/xl-v1.go
+++ b/xl-v1.go
@@ -45,7 +45,7 @@ const (
 	maxErasureBlocks = 16
 
 	// Minimum erasure blocks.
-	minErasureBlocks = 6
+	minErasureBlocks = 4
 )
 
 // xlObjects - Implements XL object layer.
@@ -84,7 +84,7 @@ func checkSufficientDisks(disks []string) error {
 	}
 
 	// Verify if we have even number of disks.
-	// only combination of 6, 8, 10, 12, 14, 16 are supported.
+	// only combination of 4, 6, 8, 10, 12, 14, 16 are supported.
 	if !isEven(totalDisks) {
 		return errXLNumDisks
 	}
@@ -194,9 +194,9 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
 	}
 
 	// Figure out read and write quorum based on number of storage disks.
-	// READ and WRITE quorum is always set to (N/2 + 1) number of disks.
-	xl.readQuorum = len(xl.storageDisks)/2 + 1
-	xl.writeQuorum = len(xl.storageDisks)/2 + 1
+	// READ and WRITE quorum is always set to (N/2) number of disks.
+	xl.readQuorum = len(xl.storageDisks) / 2
+	xl.writeQuorum = len(xl.storageDisks) / 2
 
 	// Return successfully initialized object layer.
 	return xl, nil
diff --git a/xl-v1_test.go b/xl-v1_test.go
index 940e85b30..a781be780 100644
--- a/xl-v1_test.go
+++ b/xl-v1_test.go
@@ -73,7 +73,7 @@ func TestCheckSufficientDisks(t *testing.T) {
 		},
 		// Lesser than minimum number of disks < 6.
 		{
-			disks[0:5],
+			disks[0:3],
 			errXLMinDisks,
 		},
 		// Odd number of disks, not divisible by '2'.
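
Not part of the patch: a minimal, self-contained Go sketch of the reduceErrs contract this change moves to, in which ignored errors are dropped before counting and the most frequent remaining error (or nil) is returned. The error variables and the isErrIgnored helper below are local stand-ins for the ones in the MinIO tree.

package main

import (
	"errors"
	"fmt"
)

// Stand-in error values; the real code uses errDiskNotFound, errFaultyDisk, etc.
var (
	errDiskNotFound = errors.New("disk not found")
	errFaultyDisk   = errors.New("faulty disk")
	errDiskFull     = errors.New("disk full")
)

// isErrIgnored reports whether err appears in ignoredErrs, mirroring the
// helper the refactored reduceErrs relies on.
func isErrIgnored(err error, ignoredErrs []error) bool {
	for _, ignored := range ignoredErrs {
		if err == ignored {
			return true
		}
	}
	return false
}

// reduceErrs mirrors the refactored signature: skip ignored errors, then
// return whichever remaining error occurs most often (nil if none remain).
func reduceErrs(errs []error, ignoredErrs []error) error {
	errorCounts := make(map[error]int)
	for _, err := range errs {
		if isErrIgnored(err, ignoredErrs) {
			continue
		}
		errorCounts[err]++
	}
	max := 0
	var errMax error
	for err, count := range errorCounts {
		if max < count {
			max = count
			errMax = err
		}
	}
	return errMax
}

func main() {
	// Two faulty-disk errors, one disk-full error, and one disk-not-found error
	// that is ignored: reduceErrs drops the ignored error and reports "faulty disk".
	errs := []error{errFaultyDisk, errFaultyDisk, errDiskFull, errDiskNotFound}
	fmt.Println(reduceErrs(errs, []error{errDiskNotFound}))
}

Callers such as writeUniqueXLMetadata, commitXLMetadata, and rename then pass the specific errors they consider harmless and treat any non-nil result as fatal, which appears to be the consistent behavior the subject line refers to.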