heal: Fix heal sequences cleanup process (#6780)

The current code triggers a timeout to cleanup a heal seq from
healSeqMap, but we don't know if the user did or not launch a new
healing sequence with the same path.

Add endTime to healSequence struct and add a periodic heal-sequence
cleaner to remove heal sequences only if this latter is older than
10 minutes.
This commit is contained in:
Anis Elleuch 2018-11-16 23:59:51 +01:00 committed by kannappanr
parent 272b8003d6
commit 7b579caf68

View File

@ -113,6 +113,32 @@ func initAllHealState(isErasureMode bool) {
globalAllHealState = allHealState{
healSeqMap: make(map[string]*healSequence),
}
go globalAllHealState.periodicHealSeqsClean()
}
func (ahs *allHealState) periodicHealSeqsClean() {
// Launch clean-up routine to remove this heal sequence (after
// it ends) from the global state after timeout has elapsed.
ticker := time.NewTicker(time.Minute * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := UTCNow()
ahs.Lock()
for path, h := range ahs.healSeqMap {
if h.hasEnded() && h.endTime.Add(keepHealSeqStateDuration).Before(now) {
delete(ahs.healSeqMap, path)
}
}
ahs.Unlock()
case <-globalServiceDoneCh:
// server could be restarting - need
// to exit immediately
return
}
}
}
// getHealSequence - Retrieve a heal sequence by path. The second
@ -213,41 +239,6 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) (
// Launch top-level background heal go-routine
go h.healSequenceStart()
// Launch clean-up routine to remove this heal sequence (after
// it ends) from the global state after timeout has elapsed.
go func() {
var keepStateTimeout <-chan time.Time
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
everyMinute := ticker.C
for {
select {
// Check every minute if heal sequence has ended.
case <-everyMinute:
if h.hasEnded() {
keepStateTimeout = time.After(keepHealSeqStateDuration)
everyMinute = nil
}
// This case does not fire until the heal
// sequence completes.
case <-keepStateTimeout:
// Heal sequence has ended, keep
// results state duration has elapsed,
// so purge state.
ahs.Lock()
defer ahs.Unlock()
delete(ahs.healSeqMap, h.path)
return
case <-globalServiceDoneCh:
// server could be restarting - need
// to exit immediately
return
}
}
}()
b, err := json.Marshal(madmin.HealStartSuccess{
ClientToken: h.clientToken,
ClientAddress: h.clientAddress,
@ -321,6 +312,9 @@ type healSequence struct {
// time at which heal sequence was started
startTime time.Time
// time at which heal sequence has ended
endTime time.Time
// Heal client info
clientToken, clientAddress string
@ -498,6 +492,7 @@ func (h *healSequence) healSequenceStart() {
select {
case err, ok := <-h.traverseAndHealDoneCh:
h.endTime = UTCNow()
h.currentStatus.updateLock.Lock()
defer h.currentStatus.updateLock.Unlock()
// Heal traversal is complete.
@ -511,6 +506,7 @@ func (h *healSequence) healSequenceStart() {
}
case <-h.stopSignalCh:
h.endTime = UTCNow()
h.currentStatus.updateLock.Lock()
h.currentStatus.Summary = healStoppedStatus
h.currentStatus.FailureDetail = errHealStopSignalled.Error()