diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 2e1c2e5be..5273af4e7 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -715,9 +715,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem task.opts.ScanMode = madmin.HealDeepScan } - // Wait and proceed if there are active requests - waitForLowHTTPReq(opts.IOCount, opts.Sleep) - h.mutex.Lock() h.scannedItemsMap[healType]++ h.lastHealActivity = UTCNow() @@ -963,5 +960,9 @@ func (h *healSequence) healObject(bucket, object, versionID string) error { object: object, versionID: versionID, }, madmin.HealItemObject) + + // Wait and proceed if there are active requests + waitForLowHTTPReq() + return err } diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 49bc008fc..07d7d69c3 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -52,10 +52,17 @@ type healRoutine struct { // Add a new task in the tasks queue func (h *healRoutine) queueHealTask(task healTask) { - h.tasks <- task + select { + case h.tasks <- task: + default: + } } -func waitForLowHTTPReq(maxIO int, maxWait time.Duration) { +func waitForLowHTTPReq() { + globalHealConfigMu.Lock() + maxIO, maxWait := globalHealConfig.IOCount, globalHealConfig.Sleep + globalHealConfigMu.Unlock() + // No need to wait run at full speed. if maxIO <= 0 { return @@ -115,7 +122,11 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts) } } - task.responseCh <- healResult{result: res, err: err} + + select { + case task.responseCh <- healResult{result: res, err: err}: + default: + } case <-h.doneCh: return @@ -127,7 +138,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { func newHealRoutine() *healRoutine { return &healRoutine{ - tasks: make(chan healTask), + tasks: make(chan healTask, 50000), doneCh: make(chan struct{}), } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index faaa79f1b..2053360d6 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1672,7 +1672,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str cancel() return } - waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) + for _, version := range fivs.Versions { if err := healObject(bucket, version.Name, version.VersionID); err != nil { errCh <- err diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 3c6a0b3ee..87435ae1e 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -41,15 +41,24 @@ func newBgHealSequence() *healSequence { reqInfo := &logger.ReqInfo{API: "BackgroundHeal"} ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo)) + globalHealConfigMu.Lock() + opts := globalHealConfig + globalHealConfigMu.Unlock() + + scanMode := madmin.HealNormalScan + if opts.Bitrot { + scanMode = madmin.HealDeepScan + } + hs := madmin.HealOpts{ // Remove objects that do not have read-quorum - Remove: true, - ScanMode: madmin.HealNormalScan, + Remove: healDeleteDangling, + ScanMode: scanMode, } return &healSequence{ - sourceCh: make(chan healSource), - respCh: make(chan healResult), + sourceCh: make(chan healSource, 50000), + respCh: make(chan healResult, 50000), startTime: UTCNow(), clientToken: bgHealingUUID, // run-background heal with reserved bucket @@ -170,6 +179,15 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn Name: pathJoin(minioMetaBucket, minioConfigPrefix), }) + globalHealConfigMu.Lock() + opts := globalHealConfig + globalHealConfigMu.Unlock() + + scanMode := madmin.HealNormalScan + if opts.Bitrot { + scanMode = madmin.HealDeepScan + } + // Heal all buckets with all objects for _, bucket := range buckets { if tracker.isHealed(bucket.Name) { @@ -188,7 +206,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn tracker.Object = "" tracker.Bucket = bucket.Name // Heal current bucket - if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil { + if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{ + ScanMode: scanMode, + }); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { logger.LogIf(ctx, err) } @@ -236,10 +256,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn logger.LogIf(ctx, err) return } - waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) + for _, version := range fivs.Versions { if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ - ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil { + ScanMode: scanMode, + Remove: healDeleteDangling, + }); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed. tracker.ItemsFailed++ @@ -256,6 +278,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn if time.Since(tracker.LastUpdate) > time.Minute { logger.LogIf(ctx, tracker.update(ctx)) } + + // Wait and proceed if there are active requests + waitForLowHTTPReq() } // How to resolve partial results. @@ -307,15 +332,19 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn func healObject(bucket, object, versionID string, scan madmin.HealScanMode) { // Get background heal sequence to send elements to heal bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - if ok { - bgSeq.sourceCh <- healSource{ - bucket: bucket, - object: object, - versionID: versionID, - opts: &madmin.HealOpts{ - Remove: true, // if found dangling purge it. - ScanMode: scan, - }, - } + if !ok { + return + } + select { + case bgSeq.sourceCh <- healSource{ + bucket: bucket, + object: object, + versionID: versionID, + opts: &madmin.HealOpts{ + Remove: healDeleteDangling, // if found dangling purge it. + ScanMode: scan, + }, + }: + default: } } diff --git a/cmd/mrf.go b/cmd/mrf.go index f2c8ced47..decd2eca8 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -27,8 +27,6 @@ import ( "github.com/minio/minio/internal/logger" ) -var mrfHealingOpts = madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling} - const ( mrfInfoResetInterval = 10 * time.Second mrfOpsQueueSize = 10000 @@ -185,6 +183,20 @@ func (m *mrfState) healRoutine() { idler := time.NewTimer(mrfInfoResetInterval) defer idler.Stop() + globalHealConfigMu.Lock() + opts := globalHealConfig + globalHealConfigMu.Unlock() + + scanMode := madmin.HealNormalScan + if opts.Bitrot { + scanMode = madmin.HealDeepScan + } + + var mrfHealingOpts = madmin.HealOpts{ + ScanMode: scanMode, + Remove: healDeleteDangling, + } + for { idler.Reset(mrfInfoResetInterval) select { @@ -214,7 +226,6 @@ func (m *mrfState) healRoutine() { // Heal objects for _, u := range mrfOperations { - waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) if _, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed. @@ -238,6 +249,8 @@ func (m *mrfState) healRoutine() { delete(m.pendingOps, u) m.mu.Unlock() } + + waitForLowHTTPReq() } } } diff --git a/internal/config/heal/heal.go b/internal/config/heal/heal.go index e82001783..31151b1bf 100644 --- a/internal/config/heal/heal.go +++ b/internal/config/heal/heal.go @@ -59,7 +59,7 @@ var ( }, config.KV{ Key: IOCount, - Value: "10", + Value: "100", }, }