heal: Report correctly in multip-pools setup (#15117)

`mc admin heal -r <alias>` in a multi setup pools returns incorrectly
grey objects. The reason is that erasure-server-pools.HealObject() runs
HealObject in all pools and returns the result of the first nil
error. However, in the lower erasureObject level, HealObject() returns
nil if an object does not exist + missing error in each disk of the object
in that pool, therefore confusing mc.

Make erasureObject.HealObject() to return not found error in the lower
level, so at least erasureServerPools will know what pools to ignore.
This commit is contained in:
Anis Elleuch
2022-06-20 16:07:45 +01:00
committed by GitHub
parent ce6c23a360
commit 73733a8fb9
3 changed files with 137 additions and 119 deletions

View File

@@ -306,9 +306,13 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Re-read when we have lock...
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
if isAllNotFound(errs) {
err := errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
// Nothing to do, file is already gone.
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
errs, bucket, object, versionID), nil
errs, bucket, object, versionID), err
}
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
@@ -327,7 +331,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// present, it is as good as object not found.
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
if err != nil {
return result, toObjectErr(err, bucket, object, versionID)
return result, err
}
// List of disks having all parts as per latest metadata.
@@ -397,8 +401,12 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
if isAllNotFound(errs) {
// File is fully gone, fileInfo is empty.
err := errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs,
bucket, object, versionID), nil
bucket, object, versionID), err
}
// If less than read quorum number of disks have all the parts
@@ -485,7 +493,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks,
latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
if err != nil {
return result, toObjectErr(err, bucket, object)
return result, err
}
erasureInfo := latestMeta.Erasure
@@ -523,7 +531,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
closeBitrotReaders(readers)
closeBitrotWriters(writers)
if err != nil {
return result, toObjectErr(err, bucket, object)
return result, err
}
// outDatedDisks that had write errors should not be
@@ -578,7 +586,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Attempt a rename now from healed data to final location.
if err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object); err != nil {
logger.LogIf(ctx, err)
return result, toObjectErr(err, bucket, object)
return result, err
}
// Remove any remaining parts from outdated disks from before transition.
@@ -674,10 +682,16 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateCorrupt}
}
}
if dryRun || danglingObject || isAllNotFound(errs) {
if danglingObject || isAllNotFound(errs) {
// Nothing to do, file is already gone.
return hr, errFileNotFound
}
if dryRun {
// Quit without try to heal the object dir
return hr, nil
}
for i, err := range errs {
if err == errVolumeNotFound || err == errFileNotFound {
// Bucket or prefix/directory not found
@@ -837,8 +851,13 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
// Dangling object successfully purged, size is '0'
m.Size = 0
}
// Generate file/version not found with default heal result
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), nil
errs, bucket, object, versionID), err
}
// Object is considered dangling/corrupted if any only
@@ -908,12 +927,6 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
// HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.
func (er erasureObjects) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (hr madmin.HealResultItem, err error) {
defer func() {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
err = nil
}
}()
// Create context that also contains information about the object and bucket.
// The top level handler might not have this information.
reqInfo := logger.GetReqInfo(ctx)
@@ -927,7 +940,8 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
// Healing directories handle it separately.
if HasSuffix(object, SlashSeparator) {
return er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
hr, err := er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
return hr, toObjectErr(err, bucket, object)
}
storageDisks := er.getDisks()
@@ -942,9 +956,13 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
// This allows to quickly check if all is ok or all are missing.
_, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
if isAllNotFound(errs) {
err := errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
// Nothing to do, file is already gone.
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
errs, bucket, object, versionID), nil
errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID)
}
// Heal the object.
@@ -955,5 +973,5 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
opts.ScanMode = madmin.HealDeepScan
hr, err = er.healObject(healCtx, bucket, object, versionID, opts)
}
return hr, err
return hr, toObjectErr(err, bucket, object, versionID)
}