From b1d98febfd2fdc6f9617603f4fbb3a07b52aa79f Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 8 Feb 2023 18:25:29 +0100 Subject: [PATCH] New disk healing goes through the healing workers (#16568) --- cmd/background-heal-ops.go | 14 ++++++++++++ cmd/global-heal.go | 44 +++++++------------------------------- 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 3550f06d5..ea55c968f 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -19,9 +19,13 @@ package cmd import ( "context" + "fmt" "runtime" + "strconv" "github.com/minio/madmin-go/v2" + "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/env" ) // healTask represents what to heal along with options @@ -111,9 +115,19 @@ func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) { func newHealRoutine() *healRoutine { workers := runtime.GOMAXPROCS(0) / 2 + + if envHealWorkers := env.Get("_MINIO_HEAL_WORKERS", ""); envHealWorkers != "" { + if numHealers, err := strconv.Atoi(envHealWorkers); err != nil { + logger.LogIf(context.Background(), fmt.Errorf("invalid _MINIO_HEAL_WORKERS value: %w", err)) + } else { + workers = numHealers + } + } + if workers == 0 { workers = 4 } + return &healRoutine{ tasks: make(chan healTask), workers: workers, diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 9f21dd3f4..a95c6a59e 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -21,17 +21,14 @@ import ( "context" "fmt" "sort" - "strconv" "time" "github.com/dustin/go-humanize" "github.com/minio/madmin-go/v2" "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config/storageclass" - "github.com/minio/minio/internal/jobtokens" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/console" - "github.com/minio/pkg/env" "github.com/minio/pkg/wildcard" ) @@ -163,8 +160,6 @@ func mustGetHealSequence(ctx context.Context) *healSequence { } } -const envHealWorkers = "_MINIO_HEAL_WORKERS" - // healErasureSet lists and heals all objects in a specific erasure set func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error { bgSeq := mustGetHealSequence(ctx) @@ -185,16 +180,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, } } - // numHealers - number of concurrent heal jobs, defaults to 1 - numHealers, err := strconv.Atoi(env.Get(envHealWorkers, "1")) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("invalid %s value %v, defaulting to 1", envHealWorkers, err)) - } - if numHealers < 1 { - numHealers = 1 - } - // jt will never be nil since we ensure that numHealers > 0 - jt, _ := jobtokens.New(numHealers) var retErr error // Heal all buckets with all objects for _, bucket := range healBuckets { @@ -290,8 +275,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, // Note: updates from healEntry to tracker must be sent on results channel. healEntry := func(entry metaCacheEntry) { - defer jt.Give() - if entry.name == "" && len(entry.metadata) == 0 { // ignore entries that don't have metadata. return @@ -339,20 +322,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, return } - // erasureObjects layer needs object names to be encoded - encodedEntryName := encodeDirObject(entry.name) - if healDeleteDangling { - err := er.checkAbandonedParts(ctx, bucket, encodedEntryName, madmin.HealOpts{Remove: healDeleteDangling}) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", bucket, entry.name, err)) - } - } for _, version := range fivs.Versions { - if _, err := er.HealObject(ctx, bucket, encodedEntryName, - version.VersionID, madmin.HealOpts{ - ScanMode: scanMode, - Remove: healDeleteDangling, - }); err != nil { + if err := bgSeq.queueHealTask(healSource{ + bucket: bucket, + object: version.Name, + versionID: version.VersionID, + }, madmin.HealItemObject); err != nil { // If not deleted, assume they failed. result = healEntryFailure(uint64(version.Size)) if version.VersionID != "" { @@ -388,7 +363,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, bucket: bucket, } - err = listPathRaw(ctx, listPathRawOptions{ + err := listPathRaw(ctx, listPathRawOptions{ disks: disks, bucket: bucket, recursive: true, @@ -396,8 +371,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, minDisks: 1, reportNotFound: false, agreed: func(entry metaCacheEntry) { - jt.Take() - go healEntry(entry) + healEntry(entry) }, partial: func(entries metaCacheEntries, _ []error) { entry, ok := entries.resolve(&resolver) @@ -406,12 +380,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, // proceed to heal nonetheless. entry, _ = entries.firstFound() } - jt.Take() - go healEntry(*entry) + healEntry(*entry) }, finished: nil, }) - jt.Wait() // synchronize all the concurrent heal jobs close(results) if err != nil { // Set this such that when we return this function