diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index e766e2557..43b38ba73 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -243,6 +243,59 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			disks = disks[:3]
 		}
 
+		// healEntryResult is a single tracker update produced by a healEntry
+		// call and applied by the collector goroutine below.
+		type healEntryResult struct {
+			bytes     uint64 // bytes to account as done or failed
+			success   bool   // selects the healed or the failed counters
+			entryDone bool   // marks the end of an entry; counters are untouched
+			name      string // entry name, set only by healEntryDone
+		}
+		healEntryDone := func(name string) healEntryResult {
+			return healEntryResult{
+				entryDone: true,
+				name:      name,
+			}
+		}
+		healEntrySuccess := func(sz uint64) healEntryResult {
+			return healEntryResult{
+				bytes:   sz,
+				success: true,
+			}
+		}
+		healEntryFailure := func(sz uint64) healEntryResult {
+			return healEntryResult{
+				bytes: sz,
+			}
+		}
+
+		// Collect updates to tracker from concurrent healEntry calls.
+		// resultsDone is closed once the collector has drained results, so
+		// waiting on it guarantees every update has been applied to tracker.
+		results := make(chan healEntryResult)
+		resultsDone := make(chan struct{})
+		go func() {
+			defer close(resultsDone)
+			for res := range results {
+				if res.entryDone {
+					tracker.Object = res.name
+					if time.Since(tracker.LastUpdate) > time.Minute {
+						logger.LogIf(ctx, tracker.update(ctx))
+					}
+					continue
+				}
+
+				if res.success {
+					tracker.ItemsHealed++
+					tracker.BytesDone += res.bytes
+				} else {
+					tracker.ItemsFailed++
+					tracker.BytesFailed += res.bytes
+				}
+			}
+		}()
+
+		// Note: updates from healEntry to tracker must be sent on results channel.
 		healEntry := func(entry metaCacheEntry) {
 			defer jt.Give()
 
@@ -269,6 +322,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 				}
 			}
 
+			var result healEntryResult
 			fivs, err := entry.fileInfoVersions(bucket)
 			if err != nil {
 				err := bgSeq.queueHealTask(healSource{
@@ -277,12 +331,18 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 					versionID: "",
 				}, madmin.HealItemObject)
 				if err != nil {
-					tracker.ItemsFailed++
+					result = healEntryFailure(0)
 					logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
 				} else {
-					tracker.ItemsHealed++
+					result = healEntrySuccess(0)
 				}
-				bgSeq.logHeal(madmin.HealItemObject)
+
+				select {
+				case <-ctx.Done():
+					return
+				case results <- result:
+				}
+
 				return
 			}
 
@@ -296,22 +356,27 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 						Remove:   healDeleteDangling,
 					}); err != nil {
 					// If not deleted, assume they failed.
-					tracker.ItemsFailed++
-					tracker.BytesFailed += uint64(version.Size)
+					result = healEntryFailure(uint64(version.Size))
 					if version.VersionID != "" {
 						logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err))
 					} else {
 						logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, version.Name, err))
 					}
 				} else {
-					tracker.ItemsHealed++
-					tracker.BytesDone += uint64(version.Size)
+					result = healEntrySuccess(uint64(version.Size))
 				}
 				bgSeq.logHeal(madmin.HealItemObject)
+
+				select {
+				case <-ctx.Done():
+					return
+				case results <- result:
+				}
 			}
-			tracker.Object = entry.name
-			if time.Since(tracker.LastUpdate) > time.Minute {
-				logger.LogIf(ctx, tracker.update(ctx))
+			select {
+			case <-ctx.Done():
+				return
+			case results <- healEntryDone(entry.name):
 			}
 
 			// Wait and proceed if there are active requests
@@ -349,6 +414,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			finished: nil,
 		})
 		jt.Wait() // synchronize all the concurrent heal jobs
+		close(results)
+		// Wait for the collector to apply the last results to tracker.
+		<-resultsDone
 		if err != nil {
 			// Set this such that when we return this function
 			// we let the caller retry this disk again for the