mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: reuser timers in erasure set hotpaths (#11106)
reuser timers in - connectDisks() monitoring - healMRFRoutine() channel timeouts
This commit is contained in:
parent
cce5d7152a
commit
b390a2a0b9
@ -245,10 +245,13 @@ func (s *erasureSets) connectDisks() {
|
|||||||
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
|
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
|
||||||
s.erasureDisksMu.Unlock()
|
s.erasureDisksMu.Unlock()
|
||||||
go func(setIndex int) {
|
go func(setIndex int) {
|
||||||
|
idler := time.NewTimer(100 * time.Millisecond)
|
||||||
|
defer idler.Stop()
|
||||||
|
|
||||||
// Send a new disk connect event with a timeout
|
// Send a new disk connect event with a timeout
|
||||||
select {
|
select {
|
||||||
case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}:
|
case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}:
|
||||||
case <-time.After(100 * time.Millisecond):
|
case <-idler.C:
|
||||||
}
|
}
|
||||||
}(setIndex)
|
}(setIndex)
|
||||||
}(endpoint)
|
}(endpoint)
|
||||||
@ -267,13 +270,22 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
|
|||||||
// Pre-emptively connect the disks if possible.
|
// Pre-emptively connect the disks if possible.
|
||||||
s.connectDisks()
|
s.connectDisks()
|
||||||
|
|
||||||
|
monitor := time.NewTimer(monitorInterval)
|
||||||
|
defer monitor.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-time.After(monitorInterval):
|
case <-monitor.C:
|
||||||
s.connectDisks()
|
s.connectDisks()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !monitor.Stop() {
|
||||||
|
<-monitor.C
|
||||||
|
}
|
||||||
|
|
||||||
|
monitor.Reset(monitorInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1363,6 +1375,9 @@ func (s *erasureSets) healMRFRoutine() {
|
|||||||
// Wait until background heal state is initialized
|
// Wait until background heal state is initialized
|
||||||
bgSeq := mustGetHealSequence(GlobalContext)
|
bgSeq := mustGetHealSequence(GlobalContext)
|
||||||
|
|
||||||
|
idler := time.NewTimer(100 * time.Millisecond)
|
||||||
|
defer idler.Stop()
|
||||||
|
|
||||||
for e := range s.disksConnectEvent {
|
for e := range s.disksConnectEvent {
|
||||||
// Get the list of objects related the er.set
|
// Get the list of objects related the er.set
|
||||||
// to which the connected disk belongs.
|
// to which the connected disk belongs.
|
||||||
@ -1380,9 +1395,14 @@ func (s *erasureSets) healMRFRoutine() {
|
|||||||
// Send an object to be healed with a timeout
|
// Send an object to be healed with a timeout
|
||||||
select {
|
select {
|
||||||
case bgSeq.sourceCh <- u:
|
case bgSeq.sourceCh <- u:
|
||||||
case <-time.After(100 * time.Millisecond):
|
case <-idler.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !idler.Stop() {
|
||||||
|
<-idler.C
|
||||||
|
}
|
||||||
|
idler.Reset(100 * time.Millisecond)
|
||||||
|
|
||||||
s.mrfMU.Lock()
|
s.mrfMU.Lock()
|
||||||
delete(s.mrfOperations, u)
|
delete(s.mrfOperations, u)
|
||||||
s.mrfMU.Unlock()
|
s.mrfMU.Unlock()
|
||||||
|
@ -87,7 +87,8 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
case <-time.After(250 * time.Millisecond):
|
default:
|
||||||
|
time.Sleep(250 * time.Millisecond)
|
||||||
}
|
}
|
||||||
objAPI = newObjectLayerFn()
|
objAPI = newObjectLayerFn()
|
||||||
if objAPI == nil {
|
if objAPI == nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user