heal: List and heal again for any listing error (#19999)

When a fresh drive healing is finished, add more checks for the drive listing
errors. If any, re-list and heal again. Although this is an infrequent use
case to have listPathRaw() returning nil when minDisks is set to 1, we
still need to handle all possible use cases to avoid missing healing
any object.

Also, check for HealObject result to decide of an object is healed in the
fresh disk since HealObject returns nil if an object is healed in any
disk, and not in the new fresh drive.
This commit is contained in:
Anis Eleuch 2024-07-10 17:55:36 +01:00 committed by GitHub
parent b3bac73c0f
commit ce183cb2b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 22 additions and 6 deletions

View File

@ -629,7 +629,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
}
for i, v := range result.Before.Drives {
if v.Endpoint == disk.String() {
if v.Endpoint == disk.Endpoint().String() {
result.After.Drives[i].State = madmin.DriveStateOk
}
}

View File

@ -441,6 +441,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
continue
}
var versionHealed bool
res, err := er.HealObject(ctx, bucket, encodedEntryName,
version.VersionID, madmin.HealOpts{
ScanMode: scanMode,
@ -453,15 +455,22 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
versionNotFound++
continue
}
// If not deleted, assume they failed.
} else {
// Look for the healing results
if res.After.Drives[tracker.DiskIndex].State == madmin.DriveStateOk {
versionHealed = true
}
}
if versionHealed {
result = healEntrySuccess(uint64(version.Size))
} else {
result = healEntryFailure(uint64(version.Size))
if version.VersionID != "" {
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err))
} else {
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, version.Name, err))
}
} else {
result = healEntrySuccess(uint64(res.ObjectSize))
}
if !send(result) {
@ -509,7 +518,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
jt.Take()
go healEntry(bucket, *entry)
},
finished: nil,
finished: func(errs []error) {
if countErrs(errs, nil) != len(errs) {
retErr = fmt.Errorf("one or more errors reported during listing: %v", errors.Join(errs...))
}
},
})
jt.Wait() // synchronize all the concurrent heal jobs
if err != nil {
@ -517,7 +530,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// we let the caller retry this disk again for the
// buckets it failed to list.
retErr = err
healingLogIf(ctx, fmt.Errorf("listing failed with: %v on bucket: %v", err, bucket))
}
if retErr != nil {
healingLogIf(ctx, fmt.Errorf("listing failed with: %v on bucket: %v", retErr, bucket))
continue
}