xl: Handle read quorum for StatVol, ListVols

This commit is contained in:
Harshavardhana 2016-04-21 12:57:14 -07:00 committed by Harshavardhana
parent 91588209fa
commit f3784d1087
4 changed files with 102 additions and 32 deletions

View File

@ -348,7 +348,13 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
return
}
errorIf(err.Trace(), "ListBuckets failed.", nil)
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
switch err.ToGoError().(type) {
case StorageInsufficientReadResources:
writeErrorResponse(w, r, ErrInsufficientReadResources, r.URL.Path)
default:
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
}
}
// DeleteMultipleObjectsHandler - deletes multiple objects.
@ -628,6 +634,8 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path)
case BucketNameInvalid:
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
case StorageInsufficientReadResources:
writeErrorResponse(w, r, ErrInsufficientReadResources, r.URL.Path)
default:
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
}

36
fs.go
View File

@ -132,25 +132,24 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
return nil
}
// removeDuplicateVols - remove duplicate volumes.
func removeDuplicateVols(vols []VolInfo) []VolInfo {
length := len(vols) - 1
for i := 0; i < length; i++ {
for j := i + 1; j <= length; j++ {
if vols[i].Name == vols[j].Name {
// Pick the latest volume, if there is a duplicate.
if vols[i].Created.Sub(vols[j].Created) > 0 {
vols[i] = vols[length]
} else {
vols[j] = vols[length]
}
vols = vols[0:length]
length--
j--
}
func removeDuplicateVols(volsInfo []VolInfo) []VolInfo {
// Use map to record duplicates as we find them.
result := []VolInfo{}
m := make(map[string]VolInfo)
for _, v := range volsInfo {
if _, found := m[v.Name]; !found {
m[v.Name] = v
}
}
return vols
result = make([]VolInfo, 0, len(m))
for _, v := range m {
result = append(result, v)
}
// Return the new slice.
return result
}
// gets all the unique directories from diskPath.
@ -184,8 +183,7 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
Created: fi.ModTime(),
})
}
volsInfo = removeDuplicateVols(volsInfo)
return volsInfo, nil
return removeDuplicateVols(volsInfo), nil
}
// getVolumeDir - will convert incoming volume names to

View File

@ -22,6 +22,7 @@ import (
"errors"
"io"
"path/filepath"
"sort"
"strings"
"github.com/minio/minio/pkg/mimedb"
@ -78,6 +79,13 @@ func (o objectAPI) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) {
}, nil
}
// byBucketName is a collection satisfying sort.Interface.
type byBucketName []BucketInfo
func (d byBucketName) Len() int { return len(d) }
func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name }
// ListBuckets - list buckets.
func (o objectAPI) ListBuckets() ([]BucketInfo, *probe.Error) {
var bucketInfos []BucketInfo
@ -98,6 +106,7 @@ func (o objectAPI) ListBuckets() ([]BucketInfo, *probe.Error) {
Free: vol.Free,
})
}
sort.Sort(byBucketName(bucketInfos))
return bucketInfos, nil
}

View File

@ -172,6 +172,10 @@ func (xl XL) MakeVol(volume string) error {
// Make a volume entry on all underlying storage disks.
for _, disk := range xl.storageDisks {
if err := disk.MakeVol(volume); err != nil {
// We ignore error if errVolumeExists and creating a volume again.
if err == errVolumeExists {
continue
}
return err
}
}
@ -185,6 +189,10 @@ func (xl XL) DeleteVol(volume string) error {
}
for _, disk := range xl.storageDisks {
if err := disk.DeleteVol(volume); err != nil {
// We ignore error if errVolumeNotFound.
if err == errVolumeNotFound {
continue
}
return err
}
}
@ -193,13 +201,40 @@ func (xl XL) DeleteVol(volume string) error {
// ListVols - list volumes.
func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
// Pick the first node and list there always.
disk := xl.storageDisks[0]
volsInfo, err = disk.ListVols()
if err == nil {
return volsInfo, nil
emptyCount := 0
// Success vols map carries successful results of ListVols from
// each disks.
var successVolsMap = make(map[int][]VolInfo)
for index, disk := range xl.storageDisks {
var vlsInfo []VolInfo
vlsInfo, err = disk.ListVols()
if err == nil {
if len(vlsInfo) == 0 {
emptyCount++
} else {
successVolsMap[index] = vlsInfo
}
}
}
return nil, err
// If all list operations resulted in an empty count which is same
// as your total storage disks, then it is a valid case return
// success with empty vols.
if emptyCount == len(xl.storageDisks) {
return []VolInfo{}, nil
} else if len(successVolsMap) < xl.readQuorum {
// If there is data and not empty, then we attempt quorum verification.
// Verify if we have enough quorum to list vols.
return nil, errReadQuorum
}
// Loop through success vols map and return the first value.
for index := range xl.storageDisks {
if _, ok := successVolsMap[index]; ok {
volsInfo = successVolsMap[index]
break
}
}
return volsInfo, nil
}
// StatVol - get volume stat info.
@ -207,13 +242,33 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
if !isValidVolname(volume) {
return VolInfo{}, errInvalidArgument
}
// Pick the first node and list there always.
disk := xl.storageDisks[0]
volInfo, err = disk.StatVol(volume)
if err == nil {
return volInfo, nil
var statVols []VolInfo
volumeNotFoundErrCnt := 0
for _, disk := range xl.storageDisks {
volInfo, err = disk.StatVol(volume)
if err == nil {
// Collect all the successful attempts to verify quorum
// subsequently.
statVols = append(statVols, volInfo)
} else if err == errVolumeNotFound {
// Count total amount of volume not found errors.
volumeNotFoundErrCnt++
}
}
return VolInfo{}, err
// If volume not found err count is same as total storage disks, we
// really don't have the bucket, report a valid error.
if volumeNotFoundErrCnt == len(xl.storageDisks) {
return VolInfo{}, errVolumeNotFound
} else if len(statVols) < xl.readQuorum {
// If one of the disks have bucket we need to validate if we
// have read quorum, if not fail.
return VolInfo{}, errReadQuorum
}
// If successful remove all the duplicates and keep the latest one.
volInfo = removeDuplicateVols(statVols)[0]
return volInfo, nil
}
// isLeafDirectory - check if a given path is leaf directory. i.e