heal: calculate the number of workers based on NRRequests (#17945)

This commit is contained in:
Anis Eleuch
2023-09-11 14:48:54 -07:00
committed by GitHub
parent 9878031cfd
commit 41de53996b
10 changed files with 131 additions and 61 deletions

View File

@@ -20,6 +20,7 @@ package cmd
import (
"context"
"fmt"
"runtime"
"sort"
"time"
@@ -30,6 +31,7 @@ import (
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/wildcard"
"github.com/minio/pkg/v2/workers"
)
const (
@@ -132,30 +134,8 @@ func getLocalBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.Bg
return status, true
}
func mustGetHealSequence(ctx context.Context) *healSequence {
// Get background heal sequence to send elements to heal
for {
globalHealStateLK.RLock()
hstate := globalBackgroundHealState
globalHealStateLK.RUnlock()
if hstate == nil {
time.Sleep(time.Second)
continue
}
bgSeq, ok := hstate.getHealSequenceByToken(bgHealingUUID)
if !ok {
time.Sleep(time.Second)
continue
}
return bgSeq
}
}
// healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
bgSeq := mustGetHealSequence(ctx)
scanMode := madmin.HealNormalScan
// Make sure to copy since `buckets slice`
@@ -173,6 +153,30 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
}
info, err := tracker.disk.DiskInfo(ctx, false)
if err != nil {
return fmt.Errorf("unable to get disk information before healing it: %w", err)
}
var numHealers uint64
if numCores := uint64(runtime.GOMAXPROCS(0)); info.NRRequests > numCores {
numHealers = numCores / 4
} else {
numHealers = info.NRRequests / 4
}
if numHealers < 4 {
numHealers = 4
}
// allow overriding this value as well..
if v := globalHealConfig.GetWorkers(); v > 0 {
numHealers = uint64(v)
}
logger.Info(fmt.Sprintf("Healing drive '%s' - use %d parallel workers.", tracker.disk.String(), numHealers))
jt, _ := workers.New(int(numHealers))
var retErr error
// Heal all buckets with all objects
for _, bucket := range healBuckets {
@@ -267,6 +271,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// Note: updates from healEntry to tracker must be sent on results channel.
healEntry := func(bucket string, entry metaCacheEntry) {
defer jt.Give()
if entry.name == "" && len(entry.metadata) == 0 {
// ignore entries that don't have metadata.
return
@@ -291,14 +297,17 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
}
// erasureObjects layer needs object names to be encoded
encodedEntryName := encodeDirObject(entry.name)
var result healEntryResult
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: entry.name,
versionID: "",
}, madmin.HealItemObject)
_, err := er.HealObject(ctx, bucket, encodedEntryName, "",
madmin.HealOpts{
ScanMode: scanMode,
Remove: healDeleteDangling,
})
if err != nil {
if isErrObjectNotFound(err) {
// queueing happens across namespace, ignore
@@ -321,11 +330,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
if version.ModTime.After(tracker.Started) {
continue
}
if err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: version.Name,
versionID: version.VersionID,
}, madmin.HealItemObject); err != nil {
if _, err := er.HealObject(ctx, bucket, encodedEntryName,
version.VersionID, madmin.HealOpts{
ScanMode: scanMode,
Remove: healDeleteDangling,
}); err != nil {
if isErrObjectNotFound(err) {
// queueing happens across namespace, ignore
// objects that are not found.
@@ -344,7 +353,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
} else {
result = healEntrySuccess(uint64(version.Size))
}
bgSeq.logHeal(madmin.HealItemObject)
if !send(result) {
return
@@ -382,7 +390,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
minDisks: 1,
reportNotFound: false,
agreed: func(entry metaCacheEntry) {
healEntry(actualBucket, entry)
jt.Take()
go healEntry(actualBucket, entry)
},
partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver)
@@ -391,10 +400,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
healEntry(actualBucket, *entry)
jt.Take()
go healEntry(actualBucket, *entry)
},
finished: nil,
})
jt.Wait() // synchronize all the concurrent heal jobs
close(results)
if err != nil {
// Set this such that when we return this function