Mirror of https://github.com/minio/minio.git (synced 2024-12-24 22:25:54 -05:00)
XL: Do not rely on getLoadBalancedQuorumDisks for NS consistency. (#2243)
Any function relying on `getLoadBalancedQuorumDisks` cannot behave idempotently: given a set of N disks, it returns only a shuffled subset of N/2 disks. In a scenario where N/2 disks have failed, the subset returned by `getLoadBalancedQuorumDisks` does not necessarily contain the same failed disks each time, so calls made through it may succeed or fail randomly at different points in time. This change moves callers to `getLoadBalancedDisks()` and uses the shuffled set of all N disks, so the solution space is never reduced and we are far more likely to hit a good disk. This also provides consistent behavior for all functions which rely on shuffled disks. Fixes #2242
Commit a0635dcdd9 (parent 41f4f2806d)
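To make the failure mode described in the commit message concrete, here is a minimal standalone sketch. It is not the MinIO implementation; the helper names, disk count, and readQuorum value are illustrative assumptions. It contrasts looking only at a quorum-sized prefix of a shuffled disk slice (old behavior) with walking the whole shuffled slice (new behavior):

```go
package main

import (
	"fmt"
	"math/rand"
)

// healthy marks disks 0-3 as good; disks 4-7 are treated as failed.
// (Illustrative only; real XL disks are StorageAPI instances.)
var healthy = map[int]bool{0: true, 1: true, 2: true, 3: true}

// shuffled returns a random permutation of all disk indexes,
// similar in spirit to getLoadBalancedDisks().
func shuffled(n int) []int { return rand.Perm(n) }

// firstHealthyQuorumPrefix mimics the old behavior: only the first
// readQuorum entries of the shuffle are consulted, so a shuffle that
// front-loads failed disks makes the whole call fail.
func firstHealthyQuorumPrefix(n, readQuorum int) bool {
	for _, i := range shuffled(n)[:readQuorum] {
		if healthy[i] {
			return true
		}
	}
	return false
}

// firstHealthyAllDisks mimics the new behavior: the entire shuffled
// slice is walked, so any single healthy disk is eventually found.
func firstHealthyAllDisks(n int) bool {
	for _, i := range shuffled(n) {
		if healthy[i] {
			return true
		}
	}
	return false
}

func main() {
	const n, readQuorum = 8, 4
	okOld, okNew := 0, 0
	for trial := 0; trial < 1000; trial++ {
		if firstHealthyQuorumPrefix(n, readQuorum) {
			okOld++
		}
		if firstHealthyAllDisks(n) {
			okNew++
		}
	}
	// The prefix variant fails on some trials; the full-slice variant never does.
	fmt.Printf("quorum-prefix succeeded %d/1000, full-slice succeeded %d/1000\n", okOld, okNew)
}
```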
@@ -30,6 +30,10 @@ func (xl xlObjects) MakeBucket(bucket string) error {
 	if !IsValidBucketName(bucket) {
 		return BucketNameInvalid{Bucket: bucket}
 	}
+	// Verify if bucket is found.
+	if xl.isBucketExist(bucket) {
+		return toObjectErr(errVolumeExists, bucket)
+	}
 
 	nsMutex.Lock(bucket, "")
 	defer nsMutex.Unlock(bucket, "")
@@ -68,20 +72,36 @@ func (xl xlObjects) MakeBucket(bucket string) error {
 	}
 
 	// Verify we have any other errors which should undo make bucket.
-	for _, err := range dErrs {
-		// Bucket already exists, return BucketExists error.
-		if err == errVolumeExists {
-			return toObjectErr(errVolumeExists, bucket)
-		}
-		// Undo make bucket for any other errors.
-		if err != nil && err != errDiskNotFound {
-			xl.undoMakeBucket(bucket)
-			return toObjectErr(err, bucket)
-		}
+	if reducedErr := reduceErrs(dErrs, []error{
+		errDiskNotFound,
+		errFaultyDisk,
+		errDiskAccessDenied,
+	}); reducedErr != nil {
+		return toObjectErr(reducedErr, bucket)
 	}
 	return nil
 }
 
+func (xl xlObjects) undoDeleteBucket(bucket string) {
+	// Initialize sync waitgroup.
+	var wg = &sync.WaitGroup{}
+	// Undo previous make bucket entry on all underlying storage disks.
+	for index, disk := range xl.storageDisks {
+		if disk == nil {
+			continue
+		}
+		wg.Add(1)
+		// Delete a bucket inside a go-routine.
+		go func(index int, disk StorageAPI) {
+			defer wg.Done()
+			_ = disk.MakeVol(bucket)
+		}(index, disk)
+	}
+
+	// Wait for all make vol to finish.
+	wg.Wait()
+}
+
 // undo make bucket operation upon quorum failure.
 func (xl xlObjects) undoMakeBucket(bucket string) {
 	// Initialize sync waitgroup.
@@ -113,7 +133,7 @@ var bucketMetadataOpIgnoredErrs = []error{
 
 // getBucketInfo - returns the BucketInfo from one of the load balanced disks.
 func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) {
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -169,7 +189,7 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
 
 // listBuckets - returns list of all buckets from a disk picked at random.
 func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -220,13 +240,15 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
 	if !IsValidBucketName(bucket) {
 		return BucketNameInvalid{Bucket: bucket}
 	}
+	// Verify if bucket is found.
+	if !xl.isBucketExist(bucket) {
+		return BucketNotFound{Bucket: bucket}
+	}
+
 	nsMutex.Lock(bucket, "")
 	defer nsMutex.Unlock(bucket, "")
 
 	// Collect if all disks report volume not found.
-	var volumeNotFoundErrCnt int
-
 	var wg = &sync.WaitGroup{}
 	var dErrs = make([]error, len(xl.storageDisks))
 
@@ -257,21 +279,17 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
 	// Wait for all the delete vols to finish.
 	wg.Wait()
 
-	// Count the errors for known errors, return quickly if we found an unknown error.
-	for _, err := range dErrs {
-		if err != nil {
-			if isErrIgnored(err, objMetadataOpIgnoredErrs) {
-				volumeNotFoundErrCnt++
-				continue
-			}
-			return toObjectErr(err, bucket)
-		}
+	if !isDiskQuorum(dErrs, xl.writeQuorum) {
+		xl.undoDeleteBucket(bucket)
+		return toObjectErr(errXLWriteQuorum, bucket)
 	}
 
-	// Return errVolumeNotFound if all disks report volume not found.
-	if volumeNotFoundErrCnt == len(xl.storageDisks) {
-		return toObjectErr(errVolumeNotFound, bucket)
+	if reducedErr := reduceErrs(dErrs, []error{
+		errFaultyDisk,
+		errDiskNotFound,
+		errDiskAccessDenied,
+	}); reducedErr != nil {
+		return toObjectErr(reducedErr, bucket)
 	}
 
 	return nil
 }
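For context on the DeleteBucket hunk above: the new code first checks write quorum across the per-disk errors, undoes the partial delete if quorum is lost, and only then reduces the remaining errors. The sketch below shows what such a quorum check amounts to; the function name and semantics are assumptions for illustration, not the actual isDiskQuorum or reduceErrs implementations.

```go
// diskQuorumMet is a hypothetical stand-in for a check like isDiskQuorum:
// the operation is considered successful only if at least `quorum` disks
// returned no error. (Assumed semantics, for illustration only.)
func diskQuorumMet(dErrs []error, quorum int) bool {
	successes := 0
	for _, err := range dErrs {
		if err == nil {
			successes++
		}
	}
	return successes >= quorum
}
```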
@@ -21,15 +21,7 @@ import (
 	"time"
 )
 
-// getLoadBalancedQuorumDisks - fetches load balanced sufficiently
-// randomized quorum disk slice.
-func (xl xlObjects) getLoadBalancedQuorumDisks() (disks []StorageAPI) {
-	// It is okay to have readQuorum disks.
-	return xl.getLoadBalancedDisks()[0 : xl.readQuorum-1]
-}
-
-// getLoadBalancedDisks - fetches load balanced (sufficiently
-// randomized) disk slice.
+// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
 func (xl xlObjects) getLoadBalancedDisks() (disks []StorageAPI) {
 	// Based on the random shuffling return back randomized disks.
 	for _, i := range hashOrder(time.Now().UTC().String(), len(xl.storageDisks)) {
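The remaining hunks below rename every call site from getLoadBalancedQuorumDisks to getLoadBalancedDisks. They all follow the same read pattern: iterate the full shuffled disk slice, skip offline (nil) disks, and fall through to the next disk on failure. A simplified sketch of that pattern is shown here; the helper name and the generic read callback are illustrative assumptions, not the exact MinIO code, which returns concrete types such as BucketInfo or xlMetaV1.

```go
// readFromLoadBalancedDisks walks a load-balanced (shuffled) disk slice
// and returns on the first disk whose read succeeds.
func readFromLoadBalancedDisks(disks []StorageAPI, read func(StorageAPI) error) (err error) {
	for _, disk := range disks {
		if disk == nil {
			continue // Offline disk, try the next one.
		}
		if err = read(disk); err == nil {
			return nil // First healthy disk wins.
		}
		// Keep the last error and keep trying the remaining disks.
	}
	return err
}
```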
@@ -60,7 +52,7 @@ func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
 // isObject - returns `true` if the prefix is an object i.e if
 // `xl.json` exists at the leaf, false otherwise.
 func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -30,7 +30,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 	if walkResultCh == nil {
 		endWalkCh = make(chan struct{})
 		isLeaf := xl.isObject
-		listDir := listDirFactory(isLeaf, xl.getLoadBalancedQuorumDisks()...)
+		listDir := listDirFactory(isLeaf, xl.getLoadBalancedDisks()...)
 		walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
 	}
 
@@ -230,7 +230,7 @@ var objMetadataOpIgnoredErrs = []error{
 // readXLMetadata - returns the object metadata `xl.json` content from
 // one of the disks picked at random.
 func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) {
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -181,7 +181,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
 
 // Returns if the prefix is a multipart upload.
 func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -200,7 +200,7 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
 
 // listUploadsInfo - list all uploads info.
 func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) {
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -252,7 +252,7 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string)
 // statPart - returns fileInfo structure for a successful stat on part file.
 func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) {
 	partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName)
-	for _, disk := range xl.getLoadBalancedQuorumDisks() {
+	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -64,7 +64,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	// uploadIDMarker first.
 	if uploadIDMarker != "" {
 		nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker))
-		for _, disk := range xl.getLoadBalancedQuorumDisks() {
+		for _, disk := range xl.getLoadBalancedDisks() {
 			if disk == nil {
 				continue
 			}
@@ -91,7 +91,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	if walkerCh == nil {
 		walkerDoneCh = make(chan struct{})
 		isLeaf := xl.isMultipartUpload
-		listDir := listDirFactory(isLeaf, xl.getLoadBalancedQuorumDisks()...)
+		listDir := listDirFactory(isLeaf, xl.getLoadBalancedDisks()...)
 		walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh)
 	}
 	// Collect uploads until we have reached maxUploads count to 0.
@@ -130,7 +130,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 			// For the new object entry we get all its pending uploadIDs.
 			nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry))
 			var disk StorageAPI
-			for _, disk = range xl.getLoadBalancedQuorumDisks() {
+			for _, disk = range xl.getLoadBalancedDisks() {
 				if disk == nil {
 					continue
 				}
@@ -757,7 +757,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	// the object, if yes do not attempt to delete 'uploads.json'.
 	var disk StorageAPI
 	var uploadsJSON uploadsV1
-	for _, disk = range xl.getLoadBalancedQuorumDisks() {
+	for _, disk = range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
@@ -811,7 +811,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
 	// the object, if yes do not attempt to delete 'uploads.json'.
 	var disk StorageAPI
 	var uploadsJSON uploadsV1
-	for _, disk = range xl.getLoadBalancedQuorumDisks() {
+	for _, disk = range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}