count metrics properly for any failures during drive heal (#20193)

or via `mc admin heal --set 1 --pool 1`
This commit is contained in:
Harshavardhana 2024-07-30 22:46:26 -07:00 committed by GitHub
parent 01a8c09920
commit a9dc061d84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 26 additions and 9 deletions

View File

@ -761,6 +761,15 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
return nil return nil
} }
// countOKDrives walks the supplied drive list and reports how many
// entries have their State equal to madmin.DriveStateOk.
countOKDrives := func(drives []madmin.HealDriveInfo) (count int) {
	for i := range drives {
		// Anything that is not explicitly OK does not contribute to the tally.
		if drives[i].State != madmin.DriveStateOk {
			continue
		}
		count++
	}
	return count
}
// task queued, now wait for the response. // task queued, now wait for the response.
select { select {
case res := <-task.respCh: case res := <-task.respCh:
@ -781,6 +790,11 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
if res.err != nil { if res.err != nil {
res.result.Detail = res.err.Error() res.result.Detail = res.err.Error()
} }
if res.result.ParityBlocks > 0 && res.result.DataBlocks > 0 && res.result.DataBlocks > res.result.ParityBlocks {
if got := countOKDrives(res.result.After.Drives); got < res.result.ParityBlocks {
res.result.Detail = fmt.Sprintf("quorum loss - expected %d minimum, got drive states in OK %d", res.result.ParityBlocks, got)
}
}
return h.pushHealResultItem(res.result) return h.pushHealResultItem(res.result)
case <-h.ctx.Done(): case <-h.ctx.Done():
return nil return nil

View File

@ -150,6 +150,11 @@ type healEntryResult struct {
// healErasureSet lists and heals all objects in a specific erasure set // healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error { func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if !found {
return errors.New("no local healing sequence initialized, unable to heal the drive")
}
scanMode := madmin.HealNormalScan scanMode := madmin.HealNormalScan
// Make sure to copy since `buckets slice` // Make sure to copy since `buckets slice`
@ -163,11 +168,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
} }
for _, bucket := range healBuckets { for _, bucket := range healBuckets {
_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
Recreate: true,
ScanMode: scanMode,
})
if err != nil {
// Log bucket healing error if any, we shall retry again. // Log bucket healing error if any, we shall retry again.
healingLogIf(ctx, err) healingLogIf(ctx, err)
} }
@ -264,10 +265,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
tracker.setBucket(bucket) tracker.setBucket(bucket)
// Heal current bucket again in case if it is failed // Heal current bucket again in case if it is failed
// in the beginning of erasure set healing // in the beginning of erasure set healing
if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
Recreate: true,
ScanMode: scanMode,
}); err != nil {
// Set this such that when we return this function // Set this such that when we return this function
// we let the caller retry this disk again for the // we let the caller retry this disk again for the
// buckets that failed healing. // buckets that failed healing.
@ -366,6 +364,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
} }
return false return false
case results <- result: case results <- result:
bgSeq.countScanned(madmin.HealItemObject)
return true return true
} }
} }
@ -416,8 +415,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
return return
} }
result = healEntryFailure(0) result = healEntryFailure(0)
bgSeq.countFailed(madmin.HealItemObject)
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err)) healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
} else { } else {
bgSeq.countHealed(madmin.HealItemObject)
result = healEntrySuccess(uint64(res.ObjectSize)) result = healEntrySuccess(uint64(res.ObjectSize))
} }
@ -463,8 +464,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
} }
if versionHealed { if versionHealed {
bgSeq.countHealed(madmin.HealItemObject)
result = healEntrySuccess(uint64(version.Size)) result = healEntrySuccess(uint64(version.Size))
} else { } else {
bgSeq.countFailed(madmin.HealItemObject)
result = healEntryFailure(uint64(version.Size)) result = healEntryFailure(uint64(version.Size))
if version.VersionID != "" { if version.VersionID != "" {
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err)) healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err))