mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
fix: refactor background heal for cluster health (#10225)
This commit is contained in:
parent
8049184dcc
commit
2a9819aff8
@ -802,8 +802,15 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func getAggregatedBackgroundHealState(ctx context.Context, failOnErr bool) (madmin.BgHealState, error) {
|
||||
var bgHealStates []madmin.BgHealState
|
||||
|
||||
localHealState, ok := getLocalBackgroundHealStatus()
|
||||
if !ok {
|
||||
if failOnErr {
|
||||
return madmin.BgHealState{}, errServerNotInitialized
|
||||
}
|
||||
}
|
||||
|
||||
// Get local heal status first
|
||||
bgHealStates = append(bgHealStates, getLocalBackgroundHealStatus())
|
||||
bgHealStates = append(bgHealStates, localHealState)
|
||||
|
||||
if globalIsDistErasure {
|
||||
// Get heal status from other peers
|
||||
|
@ -112,11 +112,11 @@ func (ahs *allHealState) getHealLocalDisks() []Endpoints {
|
||||
return healLocalDisks
|
||||
}
|
||||
|
||||
func (ahs *allHealState) updateHealLocalDisks(eps []Endpoints) {
|
||||
func (ahs *allHealState) updateHealLocalDisks(healLocalDisks []Endpoints) {
|
||||
ahs.Lock()
|
||||
defer ahs.Unlock()
|
||||
|
||||
ahs.healLocalDisks = eps
|
||||
ahs.healLocalDisks = healLocalDisks
|
||||
}
|
||||
|
||||
func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
|
||||
@ -502,6 +502,10 @@ func (h *healSequence) isQuitting() bool {
|
||||
// check if the heal sequence has ended
|
||||
func (h *healSequence) hasEnded() bool {
|
||||
h.mutex.RLock()
|
||||
// background heal never ends
|
||||
if h.clientToken == bgHealingUUID {
|
||||
return false
|
||||
}
|
||||
ended := len(h.currentStatus.Items) == 0 || h.currentStatus.Summary == healStoppedStatus || h.currentStatus.Summary == healFinishedStatus
|
||||
h.mutex.RUnlock()
|
||||
return ended
|
||||
|
@ -90,9 +90,6 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
case task.bucket == nopHeal:
|
||||
continue
|
||||
case task.bucket == SlashSeparator:
|
||||
// Quickly check if drives need healing upon start-up
|
||||
globalBackgroundHealState.updateHealLocalDisks(getLocalDisksToHeal(objAPI))
|
||||
|
||||
res, err = healDiskFormat(ctx, objAPI, task.opts)
|
||||
case task.bucket != "" && task.object == "":
|
||||
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts.DryRun, task.opts.Remove)
|
||||
@ -119,24 +116,6 @@ func newHealRoutine() *healRoutine {
|
||||
|
||||
}
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
go globalBackgroundHealRoutine.run(ctx, objAPI)
|
||||
|
||||
nh := newBgHealSequence()
|
||||
// Heal any disk format and metadata early, if possible.
|
||||
if err := nh.healDiskMeta(); err != nil {
|
||||
if newObjectLayerFn() != nil {
|
||||
// log only in situations, when object layer
|
||||
// has fully initialized.
|
||||
logger.LogIf(nh.ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
globalBackgroundHealState.LaunchNewHealSequence(nh)
|
||||
}
|
||||
|
||||
// healDiskFormat - heals format.json, return value indicates if a
|
||||
// failure error occurred.
|
||||
func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||
|
@ -27,8 +27,46 @@ import (
|
||||
|
||||
const defaultMonitorNewDiskInterval = time.Minute * 3
|
||||
|
||||
func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
go monitorLocalDisksAndHeal(ctx, objAPI)
|
||||
func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
z, ok := objAPI.(*erasureZones)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
initBackgroundHealing(ctx, objAPI) // start quick background healing
|
||||
|
||||
localDisksInZoneHeal := getLocalDisksToHeal(objAPI)
|
||||
globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal)
|
||||
|
||||
drivesToHeal := getDrivesToHealCount(localDisksInZoneHeal)
|
||||
if drivesToHeal != 0 {
|
||||
logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...",
|
||||
drivesToHeal, defaultMonitorNewDiskInterval))
|
||||
}
|
||||
|
||||
var bgSeq *healSequence
|
||||
var found bool
|
||||
|
||||
for {
|
||||
bgSeq, found = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
if found {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if drivesToHeal != 0 {
|
||||
// Heal any disk format and metadata early, if possible.
|
||||
if err := bgSeq.healDiskMeta(); err != nil {
|
||||
if newObjectLayerFn() != nil {
|
||||
// log only in situations, when object layer
|
||||
// has fully initialized.
|
||||
logger.LogIf(bgSeq.ctx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go monitorLocalDisksAndHeal(ctx, z, drivesToHeal, localDisksInZoneHeal, bgSeq)
|
||||
}
|
||||
|
||||
func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints {
|
||||
@ -71,36 +109,18 @@ func getDrivesToHealCount(localDisksInZoneHeal []Endpoints) int {
|
||||
return drivesToHeal
|
||||
}
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
go globalBackgroundHealRoutine.run(ctx, objAPI)
|
||||
|
||||
globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence())
|
||||
}
|
||||
|
||||
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
|
||||
// 1. Only the concerned erasure set will be listed and healed
|
||||
// 2. Only the node hosting the disk is responsible to perform the heal
|
||||
func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
z, ok := objAPI.(*erasureZones)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var bgSeq *healSequence
|
||||
var found bool
|
||||
|
||||
for {
|
||||
bgSeq, found = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
if found {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
localDisksInZoneHeal := globalBackgroundHealState.getHealLocalDisks()
|
||||
|
||||
drivesToHeal := getDrivesToHealCount(localDisksInZoneHeal)
|
||||
if drivesToHeal != 0 {
|
||||
logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...",
|
||||
drivesToHeal, defaultMonitorNewDiskInterval))
|
||||
}
|
||||
|
||||
firstTime := true
|
||||
|
||||
func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, drivesToHeal int, localDisksInZoneHeal []Endpoints, bgSeq *healSequence) {
|
||||
// Perform automatic disk healing when a disk is replaced locally.
|
||||
for {
|
||||
select {
|
||||
@ -109,7 +129,6 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
case <-time.After(defaultMonitorNewDiskInterval):
|
||||
// heal only if new disks found.
|
||||
if drivesToHeal == 0 {
|
||||
firstTime = false
|
||||
localDisksInZoneHeal = getLocalDisksToHeal(z)
|
||||
drivesToHeal = getDrivesToHealCount(localDisksInZoneHeal)
|
||||
if drivesToHeal == 0 {
|
||||
@ -118,9 +137,10 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
continue
|
||||
}
|
||||
globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal)
|
||||
}
|
||||
|
||||
if !firstTime {
|
||||
logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
|
||||
drivesToHeal))
|
||||
|
||||
// Reformat disks
|
||||
bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
|
||||
|
||||
|
@ -2099,6 +2099,10 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
|
||||
}
|
||||
}
|
||||
|
||||
if len(aggHealStateResult.HealDisks) > 0 {
|
||||
logger.LogIf(ctx, fmt.Errorf("Total drives to be healed %d", len(aggHealStateResult.HealDisks)))
|
||||
}
|
||||
|
||||
healthy := len(aggHealStateResult.HealDisks) == 0
|
||||
|
||||
return HealthResult{
|
||||
|
@ -53,6 +53,8 @@ func newBgHealSequence() *healSequence {
|
||||
respCh: make(chan healResult),
|
||||
startTime: UTCNow(),
|
||||
clientToken: bgHealingUUID,
|
||||
// run-background heal with reserved bucket
|
||||
bucket: minioReservedBucket,
|
||||
settings: hs,
|
||||
currentStatus: healSequenceStatus{
|
||||
Summary: healNotStartedStatus,
|
||||
@ -67,10 +69,10 @@ func newBgHealSequence() *healSequence {
|
||||
}
|
||||
}
|
||||
|
||||
func getLocalBackgroundHealStatus() madmin.BgHealState {
|
||||
func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
|
||||
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
if !ok {
|
||||
return madmin.BgHealState{}
|
||||
return madmin.BgHealState{}, false
|
||||
}
|
||||
|
||||
var healDisks []string
|
||||
@ -85,7 +87,7 @@ func getLocalBackgroundHealStatus() madmin.BgHealState {
|
||||
LastHealActivity: bgSeq.lastHealActivity,
|
||||
HealDisks: healDisks,
|
||||
NextHealRound: UTCNow().Add(durationToNextHealRound(bgSeq.lastHealActivity)),
|
||||
}
|
||||
}, true
|
||||
}
|
||||
|
||||
// healErasureSet lists and heals all objects in a specific erasure set
|
||||
@ -172,14 +174,15 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, dr
|
||||
// deepHealObject heals given object path in deep to fix bitrot.
|
||||
func deepHealObject(bucket, object, versionID string) {
|
||||
// Get background heal sequence to send elements to heal
|
||||
bgSeq, _ := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
|
||||
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
if ok {
|
||||
bgSeq.sourceCh <- healSource{
|
||||
bucket: bucket,
|
||||
object: object,
|
||||
versionID: versionID,
|
||||
opts: &madmin.HealOpts{ScanMode: madmin.HealDeepScan},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the duration to the next background healing round
|
||||
|
@ -276,6 +276,7 @@ func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []Noti
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
idx := idx
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
st, err := client.BackgroundHealStatus()
|
||||
|
@ -976,7 +976,11 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
|
||||
|
||||
ctx := newContext(r, w, "BackgroundHealStatus")
|
||||
|
||||
state := getLocalBackgroundHealStatus()
|
||||
state, ok := getLocalBackgroundHealStatus()
|
||||
if !ok {
|
||||
s.writeErrorResponse(w, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
|
||||
defer w.(http.Flusher).Flush()
|
||||
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
|
||||
|
@ -221,8 +221,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
|
||||
// Enable healing to heal drives if possible
|
||||
if globalIsErasure {
|
||||
initBackgroundHealing(ctx, newObject)
|
||||
initLocalDisksAutoHeal(ctx, newObject)
|
||||
initAutoHeal(ctx, newObject)
|
||||
}
|
||||
|
||||
// **** WARNING ****
|
||||
|
Loading…
Reference in New Issue
Block a user