heal: Better reporting to mc with dangling/timeout errors (#20690)

The code assigns corrupted state to a drive for any unexpected error,
which is confusing for users. This change will make sure to assign
corrupted state only for corrupted parts or xl.meta. Use unknown state
with an explanation for any unexpected error, like canceled, deadline
errors, drive timeout, ...

Also make sure to return the bucket/object name when the object is not
found or marked not found by the heal dangling code.
This commit is contained in:
Anis Eleuch
2024-11-26 19:45:35 +01:00
committed by GitHub
parent 366876e98b
commit 02e93fd6ba
2 changed files with 35 additions and 28 deletions

View File

@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"slices"
"strconv"
"strings"
"sync"
@@ -149,7 +148,10 @@ var errLegacyXLMeta = errors.New("legacy XL meta")
var errOutdatedXLMeta = errors.New("outdated XL meta")
var errPartMissingOrCorrupt = errors.New("part missing or corrupt")
// Sentinel errors that tell apart the two per-part failure modes reported by
// shouldHealObjectOnDisk, so that callers can map them to different drive states.
var (
// errPartCorrupt is returned when a part check result is checkPartFileCorrupt.
errPartCorrupt = errors.New("part corrupt")
// errPartMissing is returned when a part check result is checkPartFileNotFound.
errPartMissing = errors.New("part missing")
)
// Only heal on disks where we are sure that healing is needed. We can expand
// this list as and when we figure out more errors can be added to this list safely.
@@ -169,11 +171,11 @@ func shouldHealObjectOnDisk(erErr error, partsErrs []int, meta FileInfo, latestM
if !meta.Deleted && !meta.IsRemote() {
// If xl.meta was read fine but there may be problem with the part.N files.
for _, partErr := range partsErrs {
if slices.Contains([]int{
checkPartFileNotFound,
checkPartFileCorrupt,
}, partErr) {
return true, errPartMissingOrCorrupt
if partErr == checkPartFileNotFound {
return true, errPartMissing
}
if partErr == checkPartFileCorrupt {
return true, errPartCorrupt
}
}
}
@@ -254,6 +256,21 @@ func (er *erasureObjects) auditHealObject(ctx context.Context, bucket, object, v
auditLogInternal(ctx, opts)
}
// objectErrToDriveState translates the error observed for an object on a
// single drive into the drive state string reported in heal results.
//
// The mapping is evaluated in order:
//   - nil                                   -> madmin.DriveStateOk
//   - errDiskNotFound                       -> madmin.DriveStateOffline
//   - missing file/version/volume/part, or
//     outdated/legacy xl.meta               -> madmin.DriveStateMissing
//   - corrupt file or corrupt part          -> madmin.DriveStateCorrupt
//
// Any other error yields madmin.DriveStateUnknown followed by the error text
// in parentheses, so that unexpected failures are not reported as corruption.
func objectErrToDriveState(reason error) string {
	if reason == nil {
		return madmin.DriveStateOk
	}
	if IsErr(reason, errDiskNotFound) {
		return madmin.DriveStateOffline
	}
	if IsErr(reason, errFileNotFound, errFileVersionNotFound, errVolumeNotFound, errPartMissing, errOutdatedXLMeta, errLegacyXLMeta) {
		return madmin.DriveStateMissing
	}
	if IsErr(reason, errFileCorrupt, errPartCorrupt) {
		return madmin.DriveStateCorrupt
	}
	// Unclassified error: surface its message alongside the unknown state.
	return fmt.Sprintf("%s (%s)", madmin.DriveStateUnknown, reason.Error())
}
// Heals an object by re-writing corrupt/missing erasure blocks.
func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
dryRun := opts.DryRun
@@ -379,18 +396,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
disksToHealCount++
}
driveState := ""
switch {
case reason == nil:
driveState = madmin.DriveStateOk
case IsErr(reason, errDiskNotFound):
driveState = madmin.DriveStateOffline
case IsErr(reason, errFileNotFound, errFileVersionNotFound, errVolumeNotFound, errPartMissingOrCorrupt, errOutdatedXLMeta, errLegacyXLMeta):
driveState = madmin.DriveStateMissing
default:
// all remaining cases imply corrupt data/metadata
driveState = madmin.DriveStateCorrupt
}
driveState := objectErrToDriveState(reason)
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
@@ -817,13 +823,7 @@ func (er *erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []Storage
})
continue
}
driveState := madmin.DriveStateCorrupt
switch errs[index] {
case errFileNotFound, errVolumeNotFound:
driveState = madmin.DriveStateMissing
case nil:
driveState = madmin.DriveStateOk
}
driveState := objectErrToDriveState(errs[index])
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[index].String(),