xl: StatVol and ListVols should handle cases when disks are missing. (#1703)

Remove unnecessary code, handle cases of missing disk.
This commit is contained in:
Harshavardhana 2016-05-20 16:45:53 -07:00
parent 6015a7a3cd
commit f76d975304

View File

@ -188,8 +188,6 @@ func (xl XL) DeleteVol(volume string) error {
// ListVols - list volumes.
func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
emptyCount := 0
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
@ -202,45 +200,37 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
defer wg.Done()
// Initiate listing.
vlsInfo, lErr := disk.ListVols()
if lErr == nil {
if len(vlsInfo) == 0 {
emptyCount++ // Calculate empty count specially.
} else {
successVols[index] = vlsInfo
}
}
vlsInfo, _ := disk.ListVols()
successVols[index] = vlsInfo
}(index, disk)
}
// For all the list volumes running in parallel to finish.
wg.Wait()
// If all list operations resulted in an empty count which is same
// as your total storage disks, then it is a valid case return
// success with empty vols.
if emptyCount == len(xl.storageDisks) {
return []VolInfo{}, nil
} else if len(successVols) < xl.readQuorum {
// If there is data and not empty, then we attempt quorum verification.
// Verify if we have enough quorum to list vols.
return nil, errReadQuorum
}
var total, free int64
// Loop through success vols and get aggregated usage values.
for _, vlsInfo := range successVols {
if len(vlsInfo) == 0 {
var vlsInfo []VolInfo
var total, free int64
for _, vlsInfo = range successVols {
if len(vlsInfo) <= 1 {
continue
}
free += vlsInfo[0].Free
total += vlsInfo[0].Total
var vlInfo VolInfo
for _, vlInfo = range vlsInfo {
if vlInfo.Name == "" {
continue
}
break
}
free += vlInfo.Free
total += vlInfo.Total
}
// Save the updated usage values back into the vols.
for _, volInfo := range successVols[0] {
volInfo.Free = free
volInfo.Total = total
volsInfo = append(volsInfo, volInfo)
for _, vlInfo := range vlsInfo {
vlInfo.Free = free
vlInfo.Total = total
volsInfo = append(volsInfo, vlInfo)
}
// NOTE: The assumption here is that volumes across all disks in
@ -252,10 +242,10 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
// getAllVolInfo - list bucket volume info from all disks.
// Returns error slice indicating the failed volume stat operations.
func (xl XL) getAllVolInfo(volume string) (volsInfo []VolInfo, errs []error) {
func (xl XL) getAllVolInfo(volume string) ([]VolInfo, []error) {
// Create errs and volInfo slices of storageDisks size.
errs = make([]error, len(xl.storageDisks))
volsInfo = make([]VolInfo, len(xl.storageDisks))
var errs = make([]error, len(xl.storageDisks))
var volsInfo = make([]VolInfo, len(xl.storageDisks))
// Allocate a new waitgroup.
var wg = &sync.WaitGroup{}
@ -287,7 +277,6 @@ func (xl XL) getAllVolInfo(volume string) (volsInfo []VolInfo, errs []error) {
// - error if any.
func (xl XL) listAllVolInfo(volume string) ([]VolInfo, bool, error) {
volsInfo, errs := xl.getAllVolInfo(volume)
volsInfo = removeDuplicateVols(volsInfo)
notFoundCount := 0
for _, err := range errs {
if err == errVolumeNotFound {
@ -327,75 +316,42 @@ func (xl XL) listAllVolInfo(volume string) ([]VolInfo, bool, error) {
return volsInfo, heal, nil
}
// healVolume - heals any missing volumes.
func (xl XL) healVolume(volume string) error {
// Lists volume info for all online disks.
volsInfo, heal, err := xl.listAllVolInfo(volume)
if err != nil {
return err
}
if !heal {
return nil
}
// Create volume if missing on online disks.
for index, volInfo := range volsInfo {
if volInfo.Name != "" {
continue
}
// Volinfo name would be an empty string, create it.
if err = xl.storageDisks[index].MakeVol(volume); err != nil {
continue
}
}
return nil
}
// Removes any duplicate vols.
func removeDuplicateVols(volsInfo []VolInfo) []VolInfo {
// Use map to record duplicates as we find them.
result := []VolInfo{}
m := make(map[string]VolInfo)
for _, v := range volsInfo {
if _, found := m[v.Name]; !found {
m[v.Name] = v
}
}
result = make([]VolInfo, 0, len(m))
for _, v := range m {
result = append(result, v)
}
// Return the new slice.
return result
}
// StatVol - get volume stat info.
func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
if !isValidVolname(volume) {
return VolInfo{}, errInvalidArgument
}
// List and figured out if we need healing.
volsInfo, heal, err := xl.listAllVolInfo(volume)
if err != nil {
return VolInfo{}, err
}
// Heal for missing entries.
if heal {
go func() {
hErr := xl.healVolume(volume)
errorIf(hErr, "Unable to heal volume "+volume+".")
// Create volume if missing on disks.
for index, volInfo := range volsInfo {
if volInfo.Name != "" {
continue
}
// Volinfo name would be an empty string, create it.
xl.storageDisks[index].MakeVol(volume)
}
}()
}
// Loop through all statVols, calculate the actual usage values.
var total, free int64
for _, volInfo := range volsInfo {
for _, volInfo = range volsInfo {
if volInfo.Name == "" {
continue
}
free += volInfo.Free
total += volInfo.Total
}
// Filter volsInfo and update the volInfo.
volInfo = volsInfo[0]
// Update the aggregated values.
volInfo.Free = free
volInfo.Total = total
return volInfo, nil