mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: background heal to call HealFormat only if needed (#9491)
In large setups this avoids unnecessary data transfer across nodes and potential locks. This PR also optimizes heal result channel, which should be avoided for each queueHealTask as its expensive to create/close channels for large number of objects.
This commit is contained in:
parent
5205c9591f
commit
71ce63f79c
@ -308,8 +308,8 @@ func (ahs *allHealState) PopHealStatusJSON(path string,
|
||||
|
||||
// healSource denotes single entity and heal option.
|
||||
type healSource struct {
|
||||
path string // entity path (format, buckets, objects) to heal
|
||||
opts *madmin.HealOpts // optional heal option overrides default setting
|
||||
path string // entity path (format, buckets, objects) to heal
|
||||
opts madmin.HealOpts // optional heal option overrides default setting
|
||||
}
|
||||
|
||||
// healSequence - state for each heal sequence initiated on the
|
||||
@ -321,9 +321,12 @@ type healSequence struct {
|
||||
// path is just pathJoin(bucket, objPrefix)
|
||||
path string
|
||||
|
||||
// List of entities (format, buckets, objects) to heal
|
||||
// A channel of entities (format, buckets, objects) to heal
|
||||
sourceCh chan healSource
|
||||
|
||||
// A channel of entities with heal result
|
||||
respCh chan healResult
|
||||
|
||||
// Report healing progress
|
||||
reportProgress bool
|
||||
|
||||
@ -385,6 +388,7 @@ func newHealSequence(bucket, objPrefix, clientAddr string,
|
||||
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
|
||||
|
||||
return &healSequence{
|
||||
respCh: make(chan healResult),
|
||||
bucket: bucket,
|
||||
objPrefix: objPrefix,
|
||||
path: pathJoin(bucket, objPrefix),
|
||||
@ -636,53 +640,58 @@ func (h *healSequence) healSequenceStart() {
|
||||
}
|
||||
|
||||
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
|
||||
var respCh = make(chan healResult)
|
||||
defer close(respCh)
|
||||
// Send heal request
|
||||
task := healTask{
|
||||
path: source.path,
|
||||
responseCh: respCh,
|
||||
opts: h.settings,
|
||||
responseCh: h.respCh,
|
||||
}
|
||||
if source.opts != nil {
|
||||
task.opts = *source.opts
|
||||
if !source.opts.Equal(h.settings) {
|
||||
task.opts = source.opts
|
||||
}
|
||||
globalBackgroundHealRoutine.queueHealTask(task)
|
||||
// Wait for answer and push result to the client
|
||||
res := <-respCh
|
||||
if !h.reportProgress {
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
|
||||
// Progress is not reported in case of background heal processing.
|
||||
// Instead we increment relevant counter based on the heal result
|
||||
// for prometheus reporting.
|
||||
if res.err != nil && !isErrObjectNotFound(res.err) {
|
||||
for _, d := range res.result.After.Drives {
|
||||
// For failed items we report the endpoint and drive state
|
||||
// This will help users take corrective actions for drives
|
||||
h.healFailedItemsMap[d.Endpoint+","+d.State]++
|
||||
select {
|
||||
case res := <-h.respCh:
|
||||
if !h.reportProgress {
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
|
||||
// Progress is not reported in case of background heal processing.
|
||||
// Instead we increment relevant counter based on the heal result
|
||||
// for prometheus reporting.
|
||||
if res.err != nil && !isErrObjectNotFound(res.err) {
|
||||
for _, d := range res.result.After.Drives {
|
||||
// For failed items we report the endpoint and drive state
|
||||
// This will help users take corrective actions for drives
|
||||
h.healFailedItemsMap[d.Endpoint+","+d.State]++
|
||||
}
|
||||
} else {
|
||||
// Only object type reported for successful healing
|
||||
h.healedItemsMap[res.result.Type]++
|
||||
}
|
||||
} else {
|
||||
// Only object type reported for successful healing
|
||||
h.healedItemsMap[res.result.Type]++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
res.result.Type = healType
|
||||
if res.err != nil {
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted, we should ignore this object and return success.
|
||||
if isErrObjectNotFound(res.err) {
|
||||
return nil
|
||||
}
|
||||
// Only report object error
|
||||
if healType != madmin.HealItemObject {
|
||||
return res.err
|
||||
res.result.Type = healType
|
||||
if res.err != nil {
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted, we should ignore this object and return success.
|
||||
if isErrObjectNotFound(res.err) {
|
||||
return nil
|
||||
}
|
||||
// Only report object error
|
||||
if healType != madmin.HealItemObject {
|
||||
return res.err
|
||||
}
|
||||
res.result.Detail = res.err.Error()
|
||||
}
|
||||
res.result.Detail = res.err.Error()
|
||||
return h.pushHealResultItem(res.result)
|
||||
case <-h.ctx.Done():
|
||||
return nil
|
||||
case <-h.traverseAndHealDoneCh:
|
||||
return nil
|
||||
}
|
||||
return h.pushHealResultItem(res.result)
|
||||
|
||||
}
|
||||
|
||||
func (h *healSequence) healItemsFromSourceCh() error {
|
||||
|
@ -18,7 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
@ -89,10 +88,10 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
case bucket != "" && object != "":
|
||||
res, err = objAPI.HealObject(ctx, bucket, object, task.opts)
|
||||
}
|
||||
ObjectPathUpdated(path.Join(bucket, object))
|
||||
if task.responseCh != nil {
|
||||
task.responseCh <- healResult{result: res, err: err}
|
||||
if task.path != slashSeparator && task.path != nopHeal {
|
||||
ObjectPathUpdated(task.path)
|
||||
}
|
||||
task.responseCh <- healResult{result: res, err: err}
|
||||
case <-h.doneCh:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
|
@ -57,6 +57,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
case <-time.After(defaultMonitorNewDiskInterval):
|
||||
// Attempt a heal as the server starts-up first.
|
||||
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
|
||||
var healNewDisks bool
|
||||
for i, ep := range globalEndpoints {
|
||||
localDisksToHeal := Endpoints{}
|
||||
for _, endpoint := range ep.Endpoints {
|
||||
@ -74,6 +75,12 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
continue
|
||||
}
|
||||
localDisksInZoneHeal[i] = localDisksToHeal
|
||||
healNewDisks = true
|
||||
}
|
||||
|
||||
// Reformat disks only if needed.
|
||||
if !healNewDisks {
|
||||
continue
|
||||
}
|
||||
|
||||
// Reformat disks
|
||||
|
@ -48,6 +48,7 @@ func newBgHealSequence(numDisks int) *healSequence {
|
||||
|
||||
return &healSequence{
|
||||
sourceCh: make(chan healSource),
|
||||
respCh: make(chan healResult),
|
||||
startTime: UTCNow(),
|
||||
clientToken: bgHealingUUID,
|
||||
settings: hs,
|
||||
@ -125,7 +126,7 @@ func deepHealObject(objectPath string) {
|
||||
|
||||
bgSeq.sourceCh <- healSource{
|
||||
path: objectPath,
|
||||
opts: &madmin.HealOpts{ScanMode: madmin.HealDeepScan},
|
||||
opts: madmin.HealOpts{ScanMode: madmin.HealDeepScan},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,20 @@ type HealOpts struct {
|
||||
ScanMode HealScanMode `json:"scanMode"`
|
||||
}
|
||||
|
||||
// Equal returns true if no is same as o.
|
||||
func (o HealOpts) Equal(no HealOpts) bool {
|
||||
if o.Recursive != no.Recursive {
|
||||
return false
|
||||
}
|
||||
if o.DryRun != no.DryRun {
|
||||
return false
|
||||
}
|
||||
if o.Remove != no.Remove {
|
||||
return false
|
||||
}
|
||||
return o.ScanMode == no.ScanMode
|
||||
}
|
||||
|
||||
// HealStartSuccess - holds information about a successfully started
|
||||
// heal operation
|
||||
type HealStartSuccess struct {
|
||||
|
Loading…
Reference in New Issue
Block a user