cache DiskInfo at storage layer for performance (#10586)

`mc admin info` on busy setups will no longer move HDD
heads unnecessarily for repeated calls, which provides
better responsiveness for the call overall.

Bonus change: allow listTolerancePerSet to be N-1
for good entries, to avoid skipping entries when,
for some reason, one of the disks went offline.
This commit is contained in:
Harshavardhana 2020-09-29 09:54:41 -07:00 committed by GitHub
parent 66174692a2
commit 00eb6f6bc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 78 additions and 38 deletions

View File

@ -28,7 +28,7 @@ verifiers: getdeps fmt lint ruleguard check-gen
check-gen:
@go generate ./... >/dev/null
@git diff --exit-code >/dev/null || echo "Non-committed changes in auto-generated code are detected, please check."
@git diff --exit-code >/dev/null || echo "Non-committed changes detected, please commit them to proceed."
fmt:
@echo "Running $@ check"

View File

@ -164,8 +164,8 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
for _, disk := range disks {
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
lbDisks := z.zones[i].sets[setIndex].getLoadBalancedDisks()
if err := healErasureSet(ctx, setIndex, buckets, lbDisks, z.zones[i].setDriveCount); err != nil {
lbDisks := z.zones[i].sets[setIndex].getLoadBalancedNDisks(z.zones[i].listTolerancePerSet)
if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil {
logger.LogIf(ctx, err)
continue
}

View File

@ -19,6 +19,7 @@ package cmd
import (
"context"
"path"
"sync"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -53,16 +54,37 @@ func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAP
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
// It skips disks that are being healed or that are not online/reachable.
func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) {
func (er erasureObjects) getLoadBalancedDisks() []StorageAPI {
disks := er.getDisks()
var wg sync.WaitGroup
var mu sync.Mutex
var newDisks []StorageAPI
// Based on the random shuffling return back randomized disks.
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
// Do not consume disks which are being healed.
if disks[i-1] != nil && !disks[i-1].Healing() && disks[i-1].IsOnline() {
i := i
wg.Add(1)
go func() {
defer wg.Done()
if disks[i-1] == nil {
return
}
di, err := disks[i-1].DiskInfo(context.Background())
if err != nil || di.Healing {
// - Do not consume disks which are not reachable,
//   unformatted, or simply not accessible for some reason.
//
// - Do not consume disks which are being healed
//
// - Future: skip busy disks
return
}
mu.Lock()
newDisks = append(newDisks, disks[i-1])
mu.Unlock()
}()
}
}
wg.Wait()
return newDisks
}

View File

@ -662,7 +662,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false))
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1)
}
var objInfos []ObjectInfo
@ -784,7 +784,7 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
@ -876,7 +876,7 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
@ -1278,7 +1278,7 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1)
}
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)

View File

@ -97,7 +97,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
}
// healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI, setDriveCount int) error {
func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error {
// Get background heal sequence to send elements to heal
var bgSeq *healSequence
var ok bool
@ -131,10 +131,6 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
var mu sync.Mutex
var wg sync.WaitGroup
for _, disk := range disks {
if disk == nil {
// Disk can be offline
continue
}
disk := disk
wg.Add(1)
go func() {
@ -157,16 +153,11 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
entries := make([]FileInfoVersions, len(entryChs))
for {
entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
entry, _, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
if !ok {
break
}
if quorumCount == setDriveCount {
// Skip good entries.
continue
}
for _, version := range entry.Versions {
bgSeq.sourceCh <- healSource{
bucket: bucket.Name,

View File

@ -161,6 +161,12 @@ func (client *storageRESTClient) Endpoint() Endpoint {
}
// Healing - always returns false for a remote storage client.
func (client *storageRESTClient) Healing() bool {
// This method must never be served over the network, so
// for a remote disk it unconditionally reports 'false'.
//
// To find out whether a remote disk is being healed,
// perform a DiskInfo() call instead, which returns the
// correct healing state for the disk.
return false
}

View File

@ -105,6 +105,8 @@ type xlStorage struct {
formatFileInfo os.FileInfo
formatLastCheck time.Time
diskInfoCache timedValue
ctx context.Context
sync.RWMutex
}
@ -446,6 +448,7 @@ type DiskInfo struct {
Total uint64
Free uint64
Used uint64
FSType string
RootDisk bool
Healing bool
Endpoint string
@ -462,23 +465,41 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
atomic.AddInt32(&s.activeIOCount, -1)
}()
di, err := getDiskInfo(s.diskPath)
if err != nil {
return info, err
}
info = DiskInfo{
Total: di.Total,
Free: di.Free,
Used: di.Total - di.Free,
Healing: s.Healing(),
s.diskInfoCache.Once.Do(func() {
s.diskInfoCache.TTL = time.Second
s.diskInfoCache.Update = func() (interface{}, error) {
dcinfo := DiskInfo{
RootDisk: s.rootDisk,
MountPath: s.diskPath,
Endpoint: s.endpoint.String(),
}
di, err := getDiskInfo(s.diskPath)
if err != nil {
return dcinfo, err
}
dcinfo.Total = di.Total
dcinfo.Free = di.Free
dcinfo.Used = di.Total - di.Free
dcinfo.FSType = di.FSType
diskID, err := s.GetDiskID()
info.ID = diskID
if errors.Is(err, errUnformattedDisk) {
// if we found an unformatted disk then
// healing is automatically true.
dcinfo.Healing = true
} else {
// Check if the disk is being healed if GetDiskID
// returned any error other than fresh disk
dcinfo.Healing = s.Healing()
}
dcinfo.ID = diskID
return dcinfo, err
}
})
v, err := s.diskInfoCache.Get()
info = v.(DiskInfo)
return info, err
}
@ -503,7 +524,7 @@ func (s *xlStorage) GetDiskID() (string, error) {
s.RUnlock()
// check if we have a valid disk ID that is less than 1 second old.
if fileInfo != nil && diskID != "" && time.Now().Before(lastCheck.Add(time.Second)) {
if fileInfo != nil && diskID != "" && time.Since(lastCheck) <= time.Second {
return diskID, nil
}