mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
add healing workers support to parallelize healing (#13081)
Faster healing as well as making healing more responsive for faster scanner times. also fixes a bug introduced in #13079, newly replaced disks were not healing automatically.
This commit is contained in:
parent
27f895cf2c
commit
ed16ce9b73
@ -20,6 +20,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
@ -691,11 +692,11 @@ func (h *healSequence) logHeal(healType madmin.HealItemType) {
|
||||
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
|
||||
// Send heal request
|
||||
task := healTask{
|
||||
bucket: source.bucket,
|
||||
object: source.object,
|
||||
versionID: source.versionID,
|
||||
opts: h.settings,
|
||||
responseCh: h.respCh,
|
||||
bucket: source.bucket,
|
||||
object: source.object,
|
||||
versionID: source.versionID,
|
||||
opts: h.settings,
|
||||
respCh: h.respCh,
|
||||
}
|
||||
if source.opts != nil {
|
||||
task.opts = *source.opts
|
||||
@ -707,11 +708,18 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
|
||||
h.lastHealActivity = UTCNow()
|
||||
h.mutex.Unlock()
|
||||
|
||||
globalBackgroundHealRoutine.queueHealTask(task)
|
||||
select {
|
||||
case globalBackgroundHealRoutine.tasks <- task:
|
||||
case <-h.ctx.Done():
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case res := <-h.respCh:
|
||||
if !h.reportProgress {
|
||||
if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal
|
||||
return nil
|
||||
}
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted, we should ignore this object and
|
||||
// return the error and not calculate this object
|
||||
|
@ -19,6 +19,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
|
||||
"github.com/minio/madmin-go"
|
||||
)
|
||||
@ -33,7 +34,7 @@ type healTask struct {
|
||||
versionID string
|
||||
opts madmin.HealOpts
|
||||
// Healing response will be sent here
|
||||
responseCh chan healResult
|
||||
respCh chan healResult
|
||||
}
|
||||
|
||||
// healResult represents a healing result with a possible error
|
||||
@ -44,13 +45,8 @@ type healResult struct {
|
||||
|
||||
// healRoutine receives heal tasks, to heal buckets, objects and format.json
|
||||
type healRoutine struct {
|
||||
tasks chan healTask
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// Add a new task in the tasks queue
|
||||
func (h *healRoutine) queueHealTask(task healTask) {
|
||||
h.tasks <- task
|
||||
tasks chan healTask
|
||||
workers int
|
||||
}
|
||||
|
||||
func systemIO() int {
|
||||
@ -68,8 +64,18 @@ func waitForLowHTTPReq() {
|
||||
globalHealConfig.Wait(currentIO, systemIO)
|
||||
}
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
for i := 0; i < globalBackgroundHealRoutine.workers; i++ {
|
||||
go globalBackgroundHealRoutine.AddWorker(ctx, objAPI)
|
||||
}
|
||||
|
||||
globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence(), objAPI)
|
||||
}
|
||||
|
||||
// Wait for heal requests and process them
|
||||
func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) {
|
||||
for {
|
||||
select {
|
||||
case task, ok := <-h.tasks:
|
||||
@ -81,6 +87,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
var err error
|
||||
switch task.bucket {
|
||||
case nopHeal:
|
||||
task.respCh <- healResult{err: errSkipFile}
|
||||
continue
|
||||
case SlashSeparator:
|
||||
res, err = healDiskFormat(ctx, objAPI, task.opts)
|
||||
@ -92,10 +99,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
}
|
||||
}
|
||||
|
||||
task.responseCh <- healResult{result: res, err: err}
|
||||
|
||||
case <-h.doneCh:
|
||||
return
|
||||
task.respCh <- healResult{result: res, err: err}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
@ -103,9 +107,13 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
}
|
||||
|
||||
func newHealRoutine() *healRoutine {
|
||||
workers := runtime.GOMAXPROCS(0) / 2
|
||||
if workers == 0 {
|
||||
workers = 4
|
||||
}
|
||||
return &healRoutine{
|
||||
tasks: make(chan healTask),
|
||||
doneCh: make(chan struct{}),
|
||||
tasks: make(chan healTask),
|
||||
workers: workers,
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -308,14 +308,6 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
|
||||
|
||||
}
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
go globalBackgroundHealRoutine.run(ctx, objAPI)
|
||||
|
||||
globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence(), objAPI)
|
||||
}
|
||||
|
||||
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
|
||||
// 1. Only the concerned erasure set will be listed and healed
|
||||
// 2. Only the node hosting the disk is responsible to perform the heal
|
||||
|
@ -674,13 +674,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
||||
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if !ok {
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If no errors, queue it for healing.
|
||||
// check if we can get one entry atleast
|
||||
// proceed to heal nonetheless.
|
||||
entry, _ = entries.firstFound()
|
||||
}
|
||||
|
||||
|
@ -234,17 +234,26 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
fivs, err := entry.fileInfoVersions(bucket.Name)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
err := bgSeq.queueHealTask(healSource{
|
||||
bucket: bucket.Name,
|
||||
object: entry.name,
|
||||
versionID: "",
|
||||
}, madmin.HealItemObject)
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for _, version := range fivs.Versions {
|
||||
if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{
|
||||
ScanMode: scanMode,
|
||||
Remove: healDeleteDangling,
|
||||
}); err != nil {
|
||||
if _, err := er.HealObject(ctx, bucket.Name, version.Name,
|
||||
version.VersionID, madmin.HealOpts{
|
||||
ScanMode: scanMode,
|
||||
Remove: healDeleteDangling,
|
||||
}); err != nil {
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
// If not deleted, assume they failed.
|
||||
tracker.ItemsFailed++
|
||||
@ -283,9 +292,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
|
||||
agreed: healEntry,
|
||||
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if ok {
|
||||
healEntry(*entry)
|
||||
if !ok {
|
||||
// check if we can get one entry atleast
|
||||
// proceed to heal nonetheless.
|
||||
entry, _ = entries.firstFound()
|
||||
}
|
||||
healEntry(*entry)
|
||||
},
|
||||
finished: nil,
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user