allow bitrot files to be healed in MRF (#18618)

bitrot scanMode was ignored in MRF,
allow it to heal relevant content if
needed when seen as an error.
This commit is contained in:
Harshavardhana 2023-12-08 12:26:01 -08:00 committed by GitHub
parent 6f97663174
commit 196e7e072b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 23 deletions

View File

@ -810,7 +810,7 @@ func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) f
// NOTE: Healing on meta is run regardless
// of any bucket being selected, this is to ensure that
// meta are always upto date and correct.
return objAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, h.settings, func(bucket, object, versionID string) error {
return objAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, h.settings, func(bucket, object, versionID string, scanMode madmin.HealScanMode) error {
if h.isQuitting() {
return errHealStopSignalled
}
@ -867,7 +867,7 @@ func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly
if !h.settings.Recursive {
if h.object != "" {
if err := h.healObject(bucket, h.object, ""); err != nil {
if err := h.healObject(bucket, h.object, "", h.settings.ScanMode); err != nil {
return err
}
}
@ -882,7 +882,7 @@ func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly
}
// healObject - heal the given object and record result
func (h *healSequence) healObject(bucket, object, versionID string) error {
func (h *healSequence) healObject(bucket, object, versionID string, scanMode madmin.HealScanMode) error {
if h.isQuitting() {
return errHealStopSignalled
}

View File

@ -68,7 +68,7 @@ func (fi FileInfo) DataShardFixed() bool {
return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
}
func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(string, metaCacheEntry) error) error {
func (er erasureObjects) listAndHeal(bucket, prefix string, scanMode madmin.HealScanMode, healEntry func(string, metaCacheEntry, madmin.HealScanMode) error) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -101,7 +101,7 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin
minDisks: 1,
reportNotFound: false,
agreed: func(entry metaCacheEntry) {
if err := healEntry(bucket, entry); err != nil {
if err := healEntry(bucket, entry, scanMode); err != nil {
cancel()
}
},
@ -113,7 +113,7 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin
entry, _ = entries.firstFound()
}
if err := healEntry(bucket, *entry); err != nil {
if err := healEntry(bucket, *entry, scanMode); err != nil {
cancel()
return
}

View File

@ -647,8 +647,8 @@ func TestHealingDanglingObject(t *testing.T) {
}
if err = objLayer.HealObjects(ctx, bucket, "", madmin.HealOpts{Remove: true},
func(bucket, object, vid string) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{Remove: true})
func(bucket, object, vid string, scanMode madmin.HealScanMode) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{ScanMode: scanMode, Remove: true})
return err
}); err != nil {
t.Fatal(err)
@ -694,8 +694,8 @@ func TestHealingDanglingObject(t *testing.T) {
}
if err = objLayer.HealObjects(ctx, bucket, "", madmin.HealOpts{Remove: true},
func(bucket, object, vid string) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{Remove: true})
func(bucket, object, vid string, scanMode madmin.HealScanMode) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{ScanMode: scanMode, Remove: true})
return err
}); err != nil {
t.Fatal(err)
@ -743,8 +743,8 @@ func TestHealingDanglingObject(t *testing.T) {
}
if err = objLayer.HealObjects(ctx, bucket, "", madmin.HealOpts{Remove: true},
func(bucket, object, vid string) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{Remove: true})
func(bucket, object, vid string, scanMode madmin.HealScanMode) error {
_, err := objLayer.HealObject(ctx, bucket, object, vid, madmin.HealOpts{ScanMode: scanMode, Remove: true})
return err
}); err != nil {
t.Fatal(err)

View File

@ -413,6 +413,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
queued: time.Now(),
setIndex: er.setIndex,
poolIndex: er.poolIndex,
scanMode: scan,
})
})
// Healing is triggered and we have written
@ -1217,7 +1218,7 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
}
// Heal up to two versions of one object when there is disparity between disks
func healObjectVersionsDisparity(bucket string, entry metaCacheEntry) error {
func healObjectVersionsDisparity(bucket string, entry metaCacheEntry, scanMode madmin.HealScanMode) error {
if entry.isDir() {
return nil
}
@ -1241,13 +1242,13 @@ func healObjectVersionsDisparity(bucket string, entry metaCacheEntry) error {
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
go healObject(bucket, entry.name, "", madmin.HealDeepScan)
healObject(bucket, entry.name, "", madmin.HealDeepScan)
return err
}
if len(fivs.Versions) <= 2 {
for _, version := range fivs.Versions {
go healObject(bucket, entry.name, version.VersionID, madmin.HealNormalScan)
healObject(bucket, entry.name, version.VersionID, scanMode)
}
}

View File

@ -2108,10 +2108,10 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
}
// HealObjectFn closure function heals the object.
type HealObjectFn func(bucket, object, versionID string) error
type HealObjectFn func(bucket, object, versionID string, scanMode madmin.HealScanMode) error
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
healEntry := func(bucket string, entry metaCacheEntry) error {
healEntry := func(bucket string, entry metaCacheEntry, scanMode madmin.HealScanMode) error {
if entry.isDir() {
return nil
}
@ -2134,7 +2134,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
}
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
return healObjectFn(bucket, entry.name, "")
return healObjectFn(bucket, entry.name, "", scanMode)
}
if opts.Remove && !opts.DryRun {
err := z.CheckAbandonedParts(ctx, bucket, entry.name, opts)
@ -2143,7 +2143,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
}
}
for _, version := range fivs.Versions {
err := healObjectFn(bucket, version.Name, version.VersionID)
err := healObjectFn(bucket, version.Name, version.VersionID, scanMode)
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
return err
}
@ -2167,7 +2167,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
go func(idx int, set *erasureObjects) {
defer wg.Done()
errs[idx] = set.listAndHeal(bucket, prefix, healEntry)
errs[idx] = set.listAndHeal(bucket, prefix, opts.ScanMode, healEntry)
}(idx, set)
}
wg.Wait()

View File

@ -38,6 +38,7 @@ type partialOperation struct {
allVersions bool
setIndex, poolIndex int
queued time.Time
scanMode madmin.HealScanMode
}
// mrfState sncapsulates all the information
@ -103,13 +104,17 @@ func (m *mrfState) healRoutine() {
// wait on timer per heal
wait := healSleeper.Timer(context.Background())
scan := madmin.HealNormalScan
if u.scanMode != 0 {
scan = u.scanMode
}
if u.object == "" {
healBucket(u.bucket, madmin.HealNormalScan)
healBucket(u.bucket, scan)
} else {
if u.allVersions {
m.pools.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, healObjectVersionsDisparity)
m.pools.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity)
} else {
healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan)
healObject(u.bucket, u.object, u.versionID, scan)
}
}