heal: Reset healing params when a retry is decided (#20285)

Currently, retrying the healing of a new drive does not reset
HealedBuckets, which means that the next healing retry will skip
those buckets. This commit fixes that behavior.

Also, the skipped objects counter will include objects that are
uploaded after the healing is started.
This commit is contained in:
Anis Eleuch
2024-08-22 13:35:43 +01:00
committed by GitHub
parent 2d44c161c7
commit a8f143298f
4 changed files with 46 additions and 16 deletions

View File

@@ -148,6 +148,26 @@ func initHealingTracker(disk StorageAPI, healID string) *healingTracker {
return h
}
// resetHealing clears all healing progress counters and per-bucket/object
// position so that a subsequent healing retry starts from a clean slate.
// In particular, HealedBuckets is dropped; leaving it populated would cause
// the retry to skip buckets healed in the previous attempt.
func (h *healingTracker) resetHealing() {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Zero the cumulative progress counters.
	h.ItemsHealed, h.ItemsFailed = 0, 0
	h.BytesDone, h.BytesFailed = 0, 0
	h.ItemsSkipped, h.BytesSkipped = 0, 0

	// Zero the resume checkpoints as well.
	h.ResumeItemsHealed, h.ResumeItemsFailed = 0, 0
	h.ResumeBytesDone, h.ResumeBytesFailed = 0, 0

	// Forget which buckets were healed and the current position.
	h.HealedBuckets = nil
	h.Bucket, h.Object = "", ""
}
func (h *healingTracker) getLastUpdate() time.Time {
h.mu.RLock()
defer h.mu.RUnlock()
@@ -349,6 +369,7 @@ func (h *healingTracker) toHealingDisk() madmin.HealingDisk {
Object: h.Object,
QueuedBuckets: h.QueuedBuckets,
HealedBuckets: h.HealedBuckets,
RetryAttempts: h.RetryAttempts,
ObjectsHealed: h.ItemsHealed, // Deprecated July 2021
ObjectsFailed: h.ItemsFailed, // Deprecated July 2021
@@ -482,16 +503,19 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
// if objects have failed healing, we attempt a retry to heal the drive upto 3 times before giving up.
if tracker.ItemsFailed > 0 && tracker.RetryAttempts < 4 {
tracker.RetryAttempts++
bugLogIf(ctx, tracker.update(ctx))
healingLogEvent(ctx, "Healing of drive '%s' is incomplete, retrying %s time (healed: %d, skipped: %d, failed: %d).", disk,
humanize.Ordinal(int(tracker.RetryAttempts)), tracker.ItemsHealed, tracker.ItemsSkipped, tracker.ItemsFailed)
tracker.resetHealing()
bugLogIf(ctx, tracker.update(ctx))
return errRetryHealing
}
if tracker.ItemsFailed > 0 {
healingLogEvent(ctx, "Healing of drive '%s' is incomplete, retried %d times (healed: %d, skipped: %d, failed: %d).", disk,
tracker.RetryAttempts-1, tracker.ItemsHealed, tracker.ItemsSkipped, tracker.ItemsFailed)
tracker.RetryAttempts, tracker.ItemsHealed, tracker.ItemsSkipped, tracker.ItemsFailed)
} else {
if tracker.RetryAttempts > 0 {
healingLogEvent(ctx, "Healing of drive '%s' is complete, retried %d times (healed: %d, skipped: %d).", disk,

View File

@@ -167,6 +167,19 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
return errServerNotInitialized
}
started := tracker.Started
if started.IsZero() || started.Equal(timeSentinel) {
healingLogIf(ctx, fmt.Errorf("unexpected tracker healing start time found: %v", started))
started = time.Time{}
}
// Final tracer update before quitting
defer func() {
tracker.setObject("")
tracker.setBucket("")
healingLogIf(ctx, tracker.update(ctx))
}()
for _, bucket := range healBuckets {
if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
// Log bucket healing error if any, we shall retry again.
@@ -435,13 +448,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
var versionNotFound int
for _, version := range fivs.Versions {
// Ignore a version with a modtime newer than healing start time.
if version.ModTime.After(tracker.Started) {
continue
}
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle(bucket, version.Name, version) {
// Ignore healing a version if:
// - It is uploaded after the drive healing is started
// - An object that is already expired by ILM rule.
if !started.IsZero() && version.ModTime.After(started) || filterLifecycle(bucket, version.Name, version) {
versionNotFound++
if !send(healEntrySkipped(uint64(version.Size))) {
return
@@ -556,10 +566,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
healingLogIf(ctx, tracker.update(ctx))
}
}
tracker.setObject("")
tracker.setBucket("")
if retErr != nil {
return retErr
}