add more dangling heal related tests (#13140)

also make sure that HealObject() never returns
'ObjectNotFound' or 'VersionNotFound' errors,
as those are meaningless and not useful for the
caller.
This commit is contained in:
Harshavardhana 2021-09-02 20:56:13 -07:00 committed by GitHub
parent 495c55e6a5
commit a19e3bc9d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 73 additions and 62 deletions

View File

@ -720,14 +720,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal
return nil return nil
} }
// Object might have been deleted, by the time heal
// was attempted, we should ignore this object and
// return the error and not calculate this object
// as part of the metrics.
if isErrObjectNotFound(res.err) || isErrVersionNotFound(res.err) {
// Return the error so that caller can handle it.
return res.err
}
h.mutex.Lock() h.mutex.Lock()
defer h.mutex.Unlock() defer h.mutex.Unlock()
@ -751,11 +743,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
} }
res.result.Type = healType res.result.Type = healType
if res.err != nil { if res.err != nil {
// Object might have been deleted, by the time heal
// was attempted, we should ignore this object and return success.
if isErrObjectNotFound(res.err) || isErrVersionNotFound(res.err) {
return nil
}
// Only report object error // Only report object error
if healType != madmin.HealItemObject { if healType != madmin.HealItemObject {
return res.err return res.err
@ -812,11 +799,6 @@ func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) f
object: object, object: object,
versionID: versionID, versionID: versionID,
}, madmin.HealItemBucketMetadata) }, madmin.HealItemBucketMetadata)
// Object might have been deleted, by the time heal
// was attempted we ignore this object an move on.
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return nil
}
return err return err
}) })
} }
@ -865,9 +847,7 @@ func (h *healSequence) healBuckets(objAPI ObjectLayer, bucketsOnly bool) error {
// healBucket - traverses and heals given bucket // healBucket - traverses and heals given bucket
func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly bool) error { func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly bool) error {
if err := h.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket); err != nil { if err := h.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { return err
return err
}
} }
if bucketsOnly { if bucketsOnly {
@ -881,9 +861,6 @@ func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly
oi, err := objAPI.GetObjectInfo(h.ctx, bucket, h.object, ObjectOptions{}) oi, err := objAPI.GetObjectInfo(h.ctx, bucket, h.object, ObjectOptions{})
if err == nil { if err == nil {
if err = h.healObject(bucket, h.object, oi.VersionID); err != nil { if err = h.healObject(bucket, h.object, oi.VersionID); err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return nil
}
return err return err
} }
} }
@ -893,11 +870,7 @@ func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly
} }
if err := objAPI.HealObjects(h.ctx, bucket, h.object, h.settings, h.healObject); err != nil { if err := objAPI.HealObjects(h.ctx, bucket, h.object, h.settings, h.healObject); err != nil {
// Object might have been deleted, by the time heal return errFnHealFromAPIErr(h.ctx, err)
// was attempted we ignore this object an move on.
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
return errFnHealFromAPIErr(h.ctx, err)
}
} }
return nil return nil
} }

View File

@ -699,9 +699,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
object: entry.name, object: entry.name,
versionID: "", versionID: "",
}, madmin.HealItemObject) }, madmin.HealItemObject)
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { logger.LogIf(ctx, err)
logger.LogIf(ctx, err)
}
foundObjs = foundObjs || err == nil foundObjs = foundObjs || err == nil
return return
} }
@ -716,9 +714,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
object: fiv.Name, object: fiv.Name,
versionID: ver.VersionID, versionID: ver.VersionID,
}, madmin.HealItemObject) }, madmin.HealItemObject)
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { logger.LogIf(ctx, err)
logger.LogIf(ctx, err)
}
foundObjs = foundObjs || err == nil foundObjs = foundObjs || err == nil
} }
}, },
@ -872,9 +868,6 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi Object
ScanMode: globalHealConfig.ScanMode(), ScanMode: globalHealConfig.ScanMode(),
} }
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts) res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts)
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
}
if err != nil && !errors.Is(err, NotImplemented{}) { if err != nil && !errors.Is(err, NotImplemented{}) {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return 0 return 0

View File

@ -885,6 +885,12 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
// HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true. // HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.
func (er erasureObjects) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (hr madmin.HealResultItem, err error) { func (er erasureObjects) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (hr madmin.HealResultItem, err error) {
defer func() {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
err = nil
}
}()
// Create context that also contains information about the object and bucket. // Create context that also contains information about the object and bucket.
// The top level handler might not have this information. // The top level handler might not have this information.
reqInfo := logger.GetReqInfo(ctx) reqInfo := logger.GetReqInfo(ctx)

View File

@ -288,6 +288,61 @@ func TestHealingDanglingObject(t *testing.T) {
if fileInfoPostHeal.NumVersions != 2 { if fileInfoPostHeal.NumVersions != 2 {
t.Fatalf("Expected versions 2, got %d", fileInfoPreHeal.NumVersions) t.Fatalf("Expected versions 2, got %d", fileInfoPreHeal.NumVersions)
} }
rd = mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", "")
objInfo, err = objLayer.PutObject(ctx, bucket, object, rd, ObjectOptions{
Versioned: true,
})
if err != nil {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0400); err != nil {
t.Fatal(err)
}
}
// Create delete marker under quorum.
_, err = objLayer.DeleteObject(ctx, bucket, object, ObjectOptions{
Versioned: true,
VersionID: objInfo.VersionID,
})
if err != nil {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0755); err != nil {
t.Fatal(err)
}
}
fileInfoPreHeal, err = disks[0].ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
t.Fatal(err)
}
if fileInfoPreHeal.NumVersions != 3 {
t.Fatalf("Expected versions 3, got %d", fileInfoPreHeal.NumVersions)
}
if err = objLayer.HealObjects(ctx, bucket, "", madmin.HealOpts{Remove: true},
func(bucket, object, vid string) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{Remove: true})
return err
}); err != nil {
t.Fatal(err)
}
fileInfoPostHeal, err = disks[0].ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
t.Fatal(err)
}
if fileInfoPostHeal.NumVersions != 2 {
t.Fatalf("Expected versions 2, got %d", fileInfoPreHeal.NumVersions)
}
} }
func TestHealObjectCorrupted(t *testing.T) { func TestHealObjectCorrupted(t *testing.T) {

View File

@ -1738,9 +1738,6 @@ func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, ver
result, err := pool.HealObject(ctx, bucket, object, versionID, opts) result, err := pool.HealObject(ctx, bucket, object, versionID, opts)
result.Object = decodeDirObject(result.Object) result.Object = decodeDirObject(result.Object)
if err != nil { if err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue
}
return result, err return result, err
} }
return result, nil return result, nil

View File

@ -192,9 +192,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{ if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{
ScanMode: scanMode, ScanMode: scanMode,
}); err != nil { }); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { logger.LogIf(ctx, err)
logger.LogIf(ctx, err)
}
} }
if serverDebugLog { if serverDebugLog {
@ -242,9 +240,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
object: entry.name, object: entry.name,
versionID: "", versionID: "",
}, madmin.HealItemObject) }, madmin.HealItemObject)
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { logger.LogIf(ctx, err)
logger.LogIf(ctx, err)
}
return return
} }
@ -254,12 +250,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
ScanMode: scanMode, ScanMode: scanMode,
Remove: healDeleteDangling, Remove: healDeleteDangling,
}); err != nil { }); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed.
// If not deleted, assume they failed. tracker.ItemsFailed++
tracker.ItemsFailed++ tracker.BytesFailed += uint64(version.Size)
tracker.BytesFailed += uint64(version.Size) logger.LogIf(ctx, err)
logger.LogIf(ctx, err)
}
} else { } else {
tracker.ItemsHealed++ tracker.ItemsHealed++
tracker.BytesDone += uint64(version.Size) tracker.BytesDone += uint64(version.Size)

View File

@ -218,15 +218,8 @@ func (m *mrfState) healRoutine() {
// Heal objects // Heal objects
for _, u := range mrfOperations { for _, u := range mrfOperations {
if _, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts); err != nil { if _, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed.
// If not deleted, assume they failed. logger.LogIf(m.ctx, err)
logger.LogIf(m.ctx, err)
} else {
m.mu.Lock()
m.itemsHealed++
m.pendingItems--
m.mu.Unlock()
}
} else { } else {
m.mu.Lock() m.mu.Lock()
m.itemsHealed++ m.itemsHealed++