From f3784d108777f3a6829653bf9ae50bd1cc885f1e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 21 Apr 2016 12:57:14 -0700 Subject: [PATCH] xl: Handle read quorum for StatVol, ListVols --- bucket-handlers.go | 10 +++++- fs.go | 36 ++++++++++----------- object-api.go | 9 ++++++ xl-v1.go | 79 +++++++++++++++++++++++++++++++++++++++------- 4 files changed, 102 insertions(+), 32 deletions(-) diff --git a/bucket-handlers.go b/bucket-handlers.go index 7344193fe..d0a9fbbc5 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -348,7 +348,13 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R return } errorIf(err.Trace(), "ListBuckets failed.", nil) - writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + switch err.ToGoError().(type) { + case StorageInsufficientReadResources: + writeErrorResponse(w, r, ErrInsufficientReadResources, r.URL.Path) + default: + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + } + } // DeleteMultipleObjectsHandler - deletes multiple objects. @@ -628,6 +634,8 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path) case BucketNameInvalid: writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) + case StorageInsufficientReadResources: + writeErrorResponse(w, r, ErrInsufficientReadResources, r.URL.Path) default: writeErrorResponse(w, r, ErrInternalError, r.URL.Path) } diff --git a/fs.go b/fs.go index d2488ac00..ba6f90bd4 100644 --- a/fs.go +++ b/fs.go @@ -132,25 +132,24 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) { return nil } -// removeDuplicateVols - remove duplicate volumes. -func removeDuplicateVols(vols []VolInfo) []VolInfo { - length := len(vols) - 1 - for i := 0; i < length; i++ { - for j := i + 1; j <= length; j++ { - if vols[i].Name == vols[j].Name { - // Pick the latest volume, if there is a duplicate. - if vols[i].Created.Sub(vols[j].Created) > 0 { - vols[i] = vols[length] - } else { - vols[j] = vols[length] - } - vols = vols[0:length] - length-- - j-- - } +func removeDuplicateVols(volsInfo []VolInfo) []VolInfo { + // Use map to record duplicates as we find them. + result := []VolInfo{} + + m := make(map[string]VolInfo) + for _, v := range volsInfo { + if _, found := m[v.Name]; !found { + m[v.Name] = v } } - return vols + + result = make([]VolInfo, 0, len(m)) + for _, v := range m { + result = append(result, v) + } + + // Return the new slice. + return result } // gets all the unique directories from diskPath. @@ -184,8 +183,7 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) { Created: fi.ModTime(), }) } - volsInfo = removeDuplicateVols(volsInfo) - return volsInfo, nil + return removeDuplicateVols(volsInfo), nil } // getVolumeDir - will convert incoming volume names to diff --git a/object-api.go b/object-api.go index 58914ef5d..1fc40c50a 100644 --- a/object-api.go +++ b/object-api.go @@ -22,6 +22,7 @@ import ( "errors" "io" "path/filepath" + "sort" "strings" "github.com/minio/minio/pkg/mimedb" @@ -78,6 +79,13 @@ func (o objectAPI) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) { }, nil } +// byBucketName is a collection satisfying sort.Interface. +type byBucketName []BucketInfo + +func (d byBucketName) Len() int { return len(d) } +func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } +func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name } + // ListBuckets - list buckets. func (o objectAPI) ListBuckets() ([]BucketInfo, *probe.Error) { var bucketInfos []BucketInfo @@ -98,6 +106,7 @@ func (o objectAPI) ListBuckets() ([]BucketInfo, *probe.Error) { Free: vol.Free, }) } + sort.Sort(byBucketName(bucketInfos)) return bucketInfos, nil } diff --git a/xl-v1.go b/xl-v1.go index d919f04b1..67ab7a464 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -172,6 +172,10 @@ func (xl XL) MakeVol(volume string) error { // Make a volume entry on all underlying storage disks. for _, disk := range xl.storageDisks { if err := disk.MakeVol(volume); err != nil { + // We ignore error if errVolumeExists and creating a volume again. + if err == errVolumeExists { + continue + } return err } } @@ -185,6 +189,10 @@ func (xl XL) DeleteVol(volume string) error { } for _, disk := range xl.storageDisks { if err := disk.DeleteVol(volume); err != nil { + // We ignore error if errVolumeNotFound. + if err == errVolumeNotFound { + continue + } return err } } @@ -193,13 +201,40 @@ func (xl XL) DeleteVol(volume string) error { // ListVols - list volumes. func (xl XL) ListVols() (volsInfo []VolInfo, err error) { - // Pick the first node and list there always. - disk := xl.storageDisks[0] - volsInfo, err = disk.ListVols() - if err == nil { - return volsInfo, nil + emptyCount := 0 + // Success vols map carries successful results of ListVols from + // each disks. + var successVolsMap = make(map[int][]VolInfo) + for index, disk := range xl.storageDisks { + var vlsInfo []VolInfo + vlsInfo, err = disk.ListVols() + if err == nil { + if len(vlsInfo) == 0 { + emptyCount++ + } else { + successVolsMap[index] = vlsInfo + } + } } - return nil, err + + // If all list operations resulted in an empty count which is same + // as your total storage disks, then it is a valid case return + // success with empty vols. + if emptyCount == len(xl.storageDisks) { + return []VolInfo{}, nil + } else if len(successVolsMap) < xl.readQuorum { + // If there is data and not empty, then we attempt quorum verification. + // Verify if we have enough quorum to list vols. + return nil, errReadQuorum + } + // Loop through success vols map and return the first value. + for index := range xl.storageDisks { + if _, ok := successVolsMap[index]; ok { + volsInfo = successVolsMap[index] + break + } + } + return volsInfo, nil } // StatVol - get volume stat info. @@ -207,13 +242,33 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { if !isValidVolname(volume) { return VolInfo{}, errInvalidArgument } - // Pick the first node and list there always. - disk := xl.storageDisks[0] - volInfo, err = disk.StatVol(volume) - if err == nil { - return volInfo, nil + var statVols []VolInfo + volumeNotFoundErrCnt := 0 + for _, disk := range xl.storageDisks { + volInfo, err = disk.StatVol(volume) + if err == nil { + // Collect all the successful attempts to verify quorum + // subsequently. + statVols = append(statVols, volInfo) + } else if err == errVolumeNotFound { + // Count total amount of volume not found errors. + volumeNotFoundErrCnt++ + } } - return VolInfo{}, err + + // If volume not found err count is same as total storage disks, we + // really don't have the bucket, report a valid error. + if volumeNotFoundErrCnt == len(xl.storageDisks) { + return VolInfo{}, errVolumeNotFound + } else if len(statVols) < xl.readQuorum { + // If one of the disks have bucket we need to validate if we + // have read quorum, if not fail. + return VolInfo{}, errReadQuorum + } + + // If successful remove all the duplicates and keep the latest one. + volInfo = removeDuplicateVols(statVols)[0] + return volInfo, nil } // isLeafDirectory - check if a given path is leaf directory. i.e