diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index b14b26f72..6e41b3640 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -780,7 +780,8 @@ func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request go func(setNumber int) { defer wg.Done() lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks() - if err := healErasureSet(ctx, setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil { + setDriveCount := z.SetDriveCount() + if err := healErasureSet(ctx, setNumber, setDriveCount, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil { logger.LogIf(ctx, err) } }(setNumber) diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 15ac3ec8c..017e819b0 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -168,7 +168,8 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq * logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1)) lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks() - if err := healErasureSet(ctx, setIndex, 10, time.Second, buckets, lbDisks); err != nil { + setDriveCount := z.SetDriveCount() + if err := healErasureSet(ctx, setIndex, setDriveCount, 10, time.Second, buckets, lbDisks); err != nil { logger.LogIf(ctx, err) continue } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 2d2973fb5..03d91c2c1 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -98,7 +98,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error { +func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error { // Get background heal sequence to send elements to heal var bgSeq *healSequence var ok bool @@ -128,59 +128,69 @@ func healErasureSet(ctx context.Context, setIndex int, maxIO int, maxSleep time. }) // add metadata .minio.sys/ bucket prefixes to heal // Heal all buckets with all objects + var wg sync.WaitGroup for _, bucket := range buckets { - // Heal current bucket - bgSeq.sourceCh <- healSource{ - bucket: bucket.Name, - } + wg.Add(1) + go func(bucket BucketInfo, disks []StorageAPI) { + defer wg.Done() - var entryChs []FileInfoVersionsCh - var mu sync.Mutex - var wg sync.WaitGroup - for _, disk := range disks { - disk := disk - wg.Add(1) - go func() { - defer wg.Done() - entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done()) - if err != nil { - // Disk walk returned error, ignore it. - return - } - mu.Lock() - entryChs = append(entryChs, FileInfoVersionsCh{ - Ch: entryCh, - }) - mu.Unlock() - }() - } - wg.Wait() - - entriesValid := make([]bool, len(entryChs)) - entries := make([]FileInfoVersions, len(entryChs)) - - for { - entry, _, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) - if !ok { - break + // Heal current bucket + bgSeq.sourceCh <- healSource{ + bucket: bucket.Name, } - for _, version := range entry.Versions { - hsrc := healSource{ - bucket: bucket.Name, - object: version.Name, - versionID: version.VersionID, + var entryChs []FileInfoVersionsCh + var mu sync.Mutex + var wwg sync.WaitGroup + for _, disk := range disks { + wwg.Add(1) + go func(disk StorageAPI) { + defer wwg.Done() + entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done()) + if err != nil { + // Disk walk returned error, ignore it. + return + } + mu.Lock() + entryChs = append(entryChs, FileInfoVersionsCh{ + Ch: entryCh, + }) + mu.Unlock() + }(disk) + } + wwg.Wait() + + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfoVersions, len(entryChs)) + + for { + entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) + if !ok { + break } - hsrc.throttle.maxIO = maxIO - hsrc.throttle.maxSleep = maxSleep - if err := bgSeq.queueHealTask(ctx, hsrc, madmin.HealItemObject); err != nil { - if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { - logger.LogIf(ctx, err) + + if quorumCount == setDriveCount { + continue + } + + for _, version := range entry.Versions { + hsrc := healSource{ + bucket: bucket.Name, + object: version.Name, + versionID: version.VersionID, + } + hsrc.throttle.maxIO = maxIO + hsrc.throttle.maxSleep = maxSleep + if err := bgSeq.queueHealTask(ctx, hsrc, madmin.HealItemObject); err != nil { + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } } } } - } + }(bucket, disks) } + wg.Wait() return nil }