Parallelize the DiskInfo calls in xl.StorageInfo() (#8115)

This commit is contained in:
Praveen raj Mani 2019-08-23 08:32:40 +05:30 committed by Harshavardhana
parent f13f421e84
commit e211f6f52e
2 changed files with 38 additions and 19 deletions

View File

@ -304,14 +304,26 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
// StorageInfo - combines output of StorageInfo across all erasure coded object sets. // StorageInfo - combines output of StorageInfo across all erasure coded object sets.
func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo { func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
var storageInfo StorageInfo var storageInfo StorageInfo
var wg sync.WaitGroup
storageInfos := make([]StorageInfo, len(s.sets))
storageInfo.Backend.Type = BackendErasure storageInfo.Backend.Type = BackendErasure
for _, set := range s.sets { for index, set := range s.sets {
lstorageInfo := set.StorageInfo(ctx) wg.Add(1)
storageInfo.Used = storageInfo.Used + lstorageInfo.Used go func(id int, set *xlObjects) {
storageInfo.Total = storageInfo.Total + lstorageInfo.Total defer wg.Done()
storageInfo.Available = storageInfo.Available + lstorageInfo.Available storageInfos[id] = set.StorageInfo(ctx)
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks + lstorageInfo.Backend.OnlineDisks }(index, set)
storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks + lstorageInfo.Backend.OfflineDisks }
// Wait for the go routines.
wg.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Used += lstorageInfo.Used
storageInfo.Total += lstorageInfo.Total
storageInfo.Available += lstorageInfo.Available
storageInfo.Backend.OnlineDisks += lstorageInfo.Backend.OnlineDisks
storageInfo.Backend.OfflineDisks += lstorageInfo.Backend.OfflineDisks
} }
scData, scParity := getRedundancyCount(standardStorageClass, s.drivesPerSet) scData, scParity := getRedundancyCount(standardStorageClass, s.drivesPerSet)

View File

@ -19,6 +19,7 @@ package cmd
import ( import (
"context" "context"
"sort" "sort"
"sync"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool" "github.com/minio/minio/pkg/bpool"
@ -70,26 +71,32 @@ func (d byDiskTotal) Less(i, j int) bool {
// getDisksInfo - fetch disks info across all other storage API. // getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks int, offlineDisks int) { func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks int, offlineDisks int) {
disksInfo = make([]DiskInfo, len(disks)) disksInfo = make([]DiskInfo, len(disks))
var wg sync.WaitGroup
for i, storageDisk := range disks { for i, storageDisk := range disks {
if storageDisk == nil { if storageDisk == nil {
// Storage disk is empty, perhaps ignored disk or not available. // Storage disk is empty, perhaps ignored disk or not available.
offlineDisks++ offlineDisks++
continue continue
} }
info, err := storageDisk.DiskInfo() wg.Add(1)
go func(id int, sDisk StorageAPI) {
defer wg.Done()
info, err := sDisk.DiskInfo()
if err != nil { if err != nil {
ctx := context.Background() reqInfo := (&logger.ReqInfo{}).AppendTags("disk", sDisk.String())
logger.GetReqInfo(ctx).AppendTags("disk", storageDisk.String()) ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
if IsErr(err, baseErrs...) { if IsErr(err, baseErrs...) {
offlineDisks++ offlineDisks++
continue return
} }
} }
onlineDisks++ onlineDisks++
disksInfo[i] = info disksInfo[id] = info
}(i, storageDisk)
} }
// Wait for the routines.
wg.Wait()
// Success. // Success.
return disksInfo, onlineDisks, offlineDisks return disksInfo, onlineDisks, offlineDisks
} }