mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
XL: ListVols should provide consistent view. (#1648)
Additionally get list of all volumes in parallel for aggregation and quorum verification. Fixes #1647
This commit is contained in:
parent
498ce1e9bb
commit
7de206cb85
@ -262,48 +262,58 @@ func (xl XL) DeleteVol(volume string) error {
|
||||
// ListVols - list volumes.
|
||||
func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
|
||||
emptyCount := 0
|
||||
// Success vols map carries successful results of ListVols from
|
||||
// each disks.
|
||||
var successVolsMap = make(map[int][]VolInfo)
|
||||
|
||||
// Initialize sync waitgroup.
|
||||
var wg = &sync.WaitGroup{}
|
||||
|
||||
// Success vols map carries successful results of ListVols from each disks.
|
||||
var successVols = make([][]VolInfo, len(xl.storageDisks))
|
||||
for index, disk := range xl.storageDisks {
|
||||
var vlsInfo []VolInfo
|
||||
vlsInfo, err = disk.ListVols()
|
||||
if err == nil {
|
||||
if len(vlsInfo) == 0 {
|
||||
emptyCount++
|
||||
} else {
|
||||
successVolsMap[index] = vlsInfo
|
||||
wg.Add(1) // Add each go-routine to wait for.
|
||||
go func(index int, disk StorageAPI) {
|
||||
// Indicate wait group as finished.
|
||||
defer wg.Done()
|
||||
|
||||
// Initiate listing.
|
||||
vlsInfo, lErr := disk.ListVols()
|
||||
if lErr == nil {
|
||||
if len(vlsInfo) == 0 {
|
||||
emptyCount++ // Calculate empty count specially.
|
||||
} else {
|
||||
successVols[index] = vlsInfo
|
||||
}
|
||||
}
|
||||
}
|
||||
}(index, disk)
|
||||
}
|
||||
// For all the list volumes running in parallel to finish.
|
||||
wg.Wait()
|
||||
|
||||
// If all list operations resulted in an empty count which is same
|
||||
// as your total storage disks, then it is a valid case return
|
||||
// success with empty vols.
|
||||
if emptyCount == len(xl.storageDisks) {
|
||||
return []VolInfo{}, nil
|
||||
} else if len(successVolsMap) < xl.readQuorum {
|
||||
} else if len(successVols) < xl.readQuorum {
|
||||
// If there is data and not empty, then we attempt quorum verification.
|
||||
// Verify if we have enough quorum to list vols.
|
||||
return nil, errReadQuorum
|
||||
}
|
||||
|
||||
var total, free int64
|
||||
// Loop through success vols map and get aggregated usage values.
|
||||
for index := range xl.storageDisks {
|
||||
if _, ok := successVolsMap[index]; ok {
|
||||
volsInfo = successVolsMap[index]
|
||||
free += volsInfo[0].Free
|
||||
total += volsInfo[0].Total
|
||||
}
|
||||
}
|
||||
// Save the updated usage values back into the vols.
|
||||
for index := range volsInfo {
|
||||
volsInfo[index].Free = free
|
||||
volsInfo[index].Total = total
|
||||
// Loop through success vols and get aggregated usage values.
|
||||
for _, vlsInfo := range successVols {
|
||||
free += vlsInfo[0].Free
|
||||
total += vlsInfo[0].Total
|
||||
}
|
||||
|
||||
// TODO: the assumption here is that volumes across all disks in
|
||||
// Save the updated usage values back into the vols.
|
||||
for _, volInfo := range successVols[0] {
|
||||
volInfo.Free = free
|
||||
volInfo.Total = total
|
||||
volsInfo = append(volsInfo, volInfo)
|
||||
}
|
||||
|
||||
// NOTE: The assumption here is that volumes across all disks in
|
||||
// readQuorum have consistent view i.e they all have same number
|
||||
// of buckets. This is essentially not verified since healing
|
||||
// should take care of this.
|
||||
|
Loading…
Reference in New Issue
Block a user