minio/minio (mirror of https://github.com/minio/minio.git)
fix: timer deadlock on expired timers (#11124)
The issue was introduced in #11106 by the following pattern:

    <-t.C // timer fired
    if !t.Stop() {
        <-t.C // timer hangs
    }

The code hangs at the second `<-t.C`: once a timer has fired it can no longer be Stop()ed, t.Stop() returns `false`, and because the fired value was already received there is nothing left in the channel to drain, so the receive blocks forever. Refactor the code to use timers appropriately, with the exact requirements of each call site in place.
commit 7c9ef76f66
parent cffdb01279
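For illustration, a minimal, self-contained sketch (not part of the commit) that reproduces the broken pattern; the inner select with a one-second fallback exists only so the program reports the stall instead of hanging:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        t := time.NewTimer(10 * time.Millisecond)

        <-t.C // timer fired; its single buffered value has been consumed

        // Stop returns false because the timer already expired, but the usual
        // "drain after a failed Stop" idiom now has nothing left to receive:
        // a bare <-t.C at this point would block forever.
        if !t.Stop() {
            select {
            case <-t.C:
                fmt.Println("unreachable: the tick was already consumed above")
            case <-time.After(time.Second):
                fmt.Println("a bare <-t.C would have deadlocked here")
            }
        }
    }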
@@ -145,9 +145,13 @@ func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) {
 func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
 	// Launch clean-up routine to remove this heal sequence (after
 	// it ends) from the global state after timeout has elapsed.
+	periodicTimer := time.NewTimer(time.Minute * 5)
+	defer periodicTimer.Stop()
+
 	for {
 		select {
-		case <-time.After(time.Minute * 5):
+		case <-periodicTimer.C:
+			periodicTimer.Reset(time.Minute * 5)
 			now := UTCNow()
 			ahs.Lock()
 			for path, h := range ahs.healSeqMap {
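The same replacement recurs in the hunks below. As a generic sketch under assumed names (periodicWorker and work are illustrative, not MinIO code), the safe shape of such a loop is: one timer created up front, received from inside the select, and re-armed only after its value has been consumed:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func periodicWorker(ctx context.Context, interval time.Duration, work func()) {
        t := time.NewTimer(interval)
        defer t.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-t.C:
                // The tick was just received, so t.C is empty and the timer
                // can be Reset directly, without Stop or draining.
                t.Reset(interval)
                work()
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
        defer cancel()
        periodicWorker(ctx, 100*time.Millisecond, func() { fmt.Println("tick") })
    }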
@@ -110,12 +110,17 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
 // 2. Only the node hosting the disk is responsible to perform the heal
 func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq *healSequence) {
 	// Perform automatic disk healing when a disk is replaced locally.
+	diskCheckTimer := time.NewTimer(defaultMonitorNewDiskInterval)
+	defer diskCheckTimer.Stop()
 wait:
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-time.After(defaultMonitorNewDiskInterval):
+		case <-diskCheckTimer.C:
+			// Reset to next interval.
+			diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
+
 			var erasureSetInZoneDisksToHeal []map[int][]StorageAPI
 
 			healDisks := globalBackgroundHealState.getHealLocalDisks()
@@ -99,11 +99,17 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 		}
 	}
 
+	crawlTimer := time.NewTimer(dataCrawlStartDelay)
+	defer crawlTimer.Stop()
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-time.After(dataCrawlStartDelay):
+		case <-crawlTimer.C:
+			// Reset the timer for next cycle.
+			crawlTimer.Reset(dataCrawlStartDelay)
+
 			// Wait before starting next cycle and wait on startup.
 			results := make(chan DataUsageInfo, 1)
 			go storeDataUsageInBackend(ctx, objAPI, results)
@@ -278,14 +278,11 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
 		case <-ctx.Done():
 			return
 		case <-monitor.C:
+			// Reset the timer once fired for required interval.
+			monitor.Reset(monitorInterval)
+
 			s.connectDisks()
 		}
-
-		if !monitor.Stop() {
-			<-monitor.C
-		}
-
-		monitor.Reset(monitorInterval)
 	}
 }
 
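This hunk is where the deadlock actually bit: the select had already consumed the tick from monitor.C, so the following !monitor.Stop() drain waited on an empty channel. The fix moves the Reset into the fired case. For contrast, a sketch of a situation where Stop-and-drain is still reasonable, re-arming a timer whose channel the goroutine has not read in the current cycle; rearm is a hypothetical helper, and the non-blocking drain is what keeps it from reintroducing the hang:

    package main

    import "time"

    // rearm re-arms a timer that may or may not have fired, without ever
    // blocking on t.C (illustrative helper, not part of this commit).
    func rearm(t *time.Timer, d time.Duration) {
        if !t.Stop() {
            // The timer already expired; discard the pending tick, if any.
            // The default case keeps this from blocking when the tick has
            // already been consumed elsewhere.
            select {
            case <-t.C:
            default:
            }
        }
        t.Reset(d)
    }

    func main() {
        t := time.NewTimer(10 * time.Millisecond)
        defer t.Stop()

        time.Sleep(20 * time.Millisecond) // let the timer expire without reading t.C
        rearm(t, 10*time.Millisecond)     // drains the stale tick, then re-arms
        <-t.C                             // receives the fresh tick
    }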
@@ -1369,6 +1366,26 @@ func (s *erasureSets) maintainMRFList() {
 	}
 }
 
+func toSourceChTimed(t *time.Timer, sourceCh chan healSource, u healSource) {
+	t.Reset(100 * time.Millisecond)
+
+	// No defer, as we don't know which
+	// case will be selected
+
+	select {
+	case sourceCh <- u:
+	case <-t.C:
+		return
+	}
+
+	// We still need to check the return value
+	// of Stop, because t could have fired
+	// between the send on sourceCh and this line.
+	if !t.Stop() {
+		<-t.C
+	}
+}
+
 // healMRFRoutine monitors new disks connection, sweep the MRF list
 // to find objects related to the new disk that needs to be healed.
 func (s *erasureSets) healMRFRoutine() {
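The helper works because of the invariant it maintains on the shared timer: the timer comes back stopped (or already expired) and drained after every call, so the next Reset is safe, and the `<-t.C` after a failed Stop cannot block because the select above took the send case and therefore did not consume the tick. A self-contained sketch of the same technique with plain ints (sendTimed and the channel are illustrative, not the commit's code):

    package main

    import (
        "fmt"
        "time"
    )

    // sendTimed mirrors the toSourceChTimed technique: send v, or give up
    // once the shared timer fires. Pre- and post-condition: the timer is
    // stopped (or expired) and drained, so Reset is always safe.
    func sendTimed(t *time.Timer, ch chan<- int, v int) bool {
        t.Reset(100 * time.Millisecond)
        select {
        case ch <- v:
            // We did not take the <-t.C case, so if the timer fired in the
            // meantime its tick is still pending; draining it cannot block.
            if !t.Stop() {
                <-t.C
            }
            return true
        case <-t.C:
            // Timed out; this receive already consumed the tick.
            return false
        }
    }

    func main() {
        t := time.NewTimer(time.Hour)
        t.Stop() // start from the stopped-and-drained state
        defer t.Stop()

        ch := make(chan int, 1)
        fmt.Println(sendTimed(t, ch, 1)) // true: the buffered send succeeds
        fmt.Println(sendTimed(t, ch, 2)) // false: channel full, times out
    }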
@@ -1392,16 +1409,8 @@ func (s *erasureSets) healMRFRoutine() {
 
 		// Heal objects
 		for _, u := range mrfOperations {
-			// Send an object to be healed with a timeout
-			select {
-			case bgSeq.sourceCh <- u:
-			case <-idler.C:
-			}
-
-			if !idler.Stop() {
-				<-idler.C
-			}
-			idler.Reset(100 * time.Millisecond)
+			// Send an object to background heal
+			toSourceChTimed(idler, bgSeq.sourceCh, u)
 
 			s.mrfMU.Lock()
 			delete(s.mrfOperations, u)