mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: timer usage across codebase (#14935)
it seems in some places we have been wrongly using the timer.Reset() function, nicely exposed by an example shared by @donatello https://go.dev/play/p/qoF71_D1oXD this PR fixes all the usage comprehensively
This commit is contained in:
parent
2dc8ac1e62
commit
6cfb1cb6fd
@ -654,12 +654,10 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
globalProfilerMu.Unlock()
|
globalProfilerMu.Unlock()
|
||||||
|
|
||||||
timer := time.NewTimer(duration)
|
timer := time.NewTimer(duration)
|
||||||
|
defer timer.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
if !timer.Stop() {
|
|
||||||
<-timer.C
|
|
||||||
}
|
|
||||||
for k, v := range globalProfiler {
|
for k, v := range globalProfiler {
|
||||||
v.Stop()
|
v.Stop()
|
||||||
delete(globalProfiler, k)
|
delete(globalProfiler, k)
|
||||||
|
@ -194,7 +194,6 @@ func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-periodicTimer.C:
|
case <-periodicTimer.C:
|
||||||
periodicTimer.Reset(time.Minute * 5)
|
|
||||||
now := UTCNow()
|
now := UTCNow()
|
||||||
ahs.Lock()
|
ahs.Lock()
|
||||||
for path, h := range ahs.healSeqMap {
|
for path, h := range ahs.healSeqMap {
|
||||||
@ -203,6 +202,8 @@ func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
ahs.Unlock()
|
ahs.Unlock()
|
||||||
|
|
||||||
|
periodicTimer.Reset(time.Minute * 5)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// server could be restarting - need
|
// server could be restarting - need
|
||||||
// to exit immediately
|
// to exit immediately
|
||||||
@ -581,12 +582,7 @@ func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
|
|||||||
// heal-results in memory and the client has not consumed it
|
// heal-results in memory and the client has not consumed it
|
||||||
// for too long.
|
// for too long.
|
||||||
unconsumedTimer := time.NewTimer(healUnconsumedTimeout)
|
unconsumedTimer := time.NewTimer(healUnconsumedTimeout)
|
||||||
defer func() {
|
defer unconsumedTimer.Stop()
|
||||||
// stop the timeout timer so it is garbage collected.
|
|
||||||
if !unconsumedTimer.Stop() {
|
|
||||||
<-unconsumedTimer.C
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var itemsLen int
|
var itemsLen int
|
||||||
for {
|
for {
|
||||||
|
@ -312,9 +312,6 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-diskCheckTimer.C:
|
case <-diskCheckTimer.C:
|
||||||
// Reset to next interval.
|
|
||||||
diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
|
|
||||||
|
|
||||||
var erasureSetInPoolDisksToHeal []map[int][]StorageAPI
|
var erasureSetInPoolDisksToHeal []map[int][]StorageAPI
|
||||||
|
|
||||||
healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints()
|
healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints()
|
||||||
@ -448,6 +445,9 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// Reset for next interval.
|
||||||
|
diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -161,7 +161,7 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
|
|||||||
|
|
||||||
// load replication metrics at cluster start from initial data usage
|
// load replication metrics at cluster start from initial data usage
|
||||||
func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
|
func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
|
||||||
rTimer := time.NewTimer(time.Minute * 1)
|
rTimer := time.NewTimer(time.Minute)
|
||||||
defer rTimer.Stop()
|
defer rTimer.Stop()
|
||||||
var (
|
var (
|
||||||
dui DataUsageInfo
|
dui DataUsageInfo
|
||||||
@ -169,6 +169,7 @@ func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
|
|||||||
)
|
)
|
||||||
outer:
|
outer:
|
||||||
for {
|
for {
|
||||||
|
rTimer.Reset(time.Minute)
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
@ -1926,7 +1926,6 @@ func (p *ReplicationPool) periodicResyncMetaSave(ctx context.Context, objectAPI
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-resyncTimer.C:
|
case <-resyncTimer.C:
|
||||||
resyncTimer.Reset(resyncTimeInterval)
|
|
||||||
now := UTCNow()
|
now := UTCNow()
|
||||||
p.resyncState.RLock()
|
p.resyncState.RLock()
|
||||||
for bucket, brs := range p.resyncState.statusMap {
|
for bucket, brs := range p.resyncState.statusMap {
|
||||||
@ -1947,6 +1946,8 @@ func (p *ReplicationPool) periodicResyncMetaSave(ctx context.Context, objectAPI
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.resyncState.RUnlock()
|
p.resyncState.RUnlock()
|
||||||
|
|
||||||
|
resyncTimer.Reset(resyncTimeInterval)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// server could be restarting - need
|
// server could be restarting - need
|
||||||
// to exit immediately
|
// to exit immediately
|
||||||
|
@ -197,9 +197,6 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-scannerTimer.C:
|
case <-scannerTimer.C:
|
||||||
// Reset the timer for next cycle.
|
|
||||||
scannerTimer.Reset(scannerCycle.Get())
|
|
||||||
|
|
||||||
if intDataUpdateTracker.debug {
|
if intDataUpdateTracker.debug {
|
||||||
console.Debugln("starting scanner cycle")
|
console.Debugln("starting scanner cycle")
|
||||||
}
|
}
|
||||||
@ -232,6 +229,9 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
|
|||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset the timer for next cycle.
|
||||||
|
scannerTimer.Reset(scannerCycle.Get())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1630,8 +1630,6 @@ func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// Reset for the next interval
|
|
||||||
timer.Reset(cacheStaleUploadCleanupInterval)
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir), func(shaDir string, typ os.FileMode) error {
|
readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir), func(shaDir string, typ os.FileMode) error {
|
||||||
return readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir, shaDir), func(uploadIDDir string, typ os.FileMode) error {
|
return readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir, shaDir), func(uploadIDDir string, typ os.FileMode) error {
|
||||||
@ -1662,6 +1660,9 @@ func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Reset for the next interval
|
||||||
|
timer.Reset(cacheStaleUploadCleanupInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,14 +301,14 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-monitor.C:
|
case <-monitor.C:
|
||||||
// Reset the timer once fired for required interval.
|
|
||||||
monitor.Reset(monitorInterval)
|
|
||||||
|
|
||||||
if serverDebugLog {
|
if serverDebugLog {
|
||||||
console.Debugln("running disk monitoring")
|
console.Debugln("running disk monitoring")
|
||||||
}
|
}
|
||||||
|
|
||||||
s.connectDisks()
|
s.connectDisks()
|
||||||
|
|
||||||
|
// Reset the timer for next interval
|
||||||
|
monitor.Reset(monitorInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -504,9 +504,6 @@ func (s *erasureSets) cleanupDeletedObjects(ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// Reset for the next interval
|
|
||||||
timer.Reset(globalAPIConfig.getDeleteCleanupInterval())
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, set := range s.sets {
|
for _, set := range s.sets {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -519,6 +516,9 @@ func (s *erasureSets) cleanupDeletedObjects(ctx context.Context) {
|
|||||||
}(set)
|
}(set)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// Reset for the next interval
|
||||||
|
timer.Reset(globalAPIConfig.getDeleteCleanupInterval())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -532,9 +532,6 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// Reset for the next interval
|
|
||||||
timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, set := range s.sets {
|
for _, set := range s.sets {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -547,6 +544,9 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context) {
|
|||||||
}(set)
|
}(set)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// Reset for the next interval
|
||||||
|
timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -887,8 +887,6 @@ func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-bgAppendTmpCleaner.C:
|
case <-bgAppendTmpCleaner.C:
|
||||||
bgAppendTmpCleaner.Reset(bgAppendsCleanupInterval)
|
|
||||||
|
|
||||||
foundUploadIDs := fs.getAllUploadIDs(ctx)
|
foundUploadIDs := fs.getAllUploadIDs(ctx)
|
||||||
|
|
||||||
// Remove background append map from the memory
|
// Remove background append map from the memory
|
||||||
@ -915,10 +913,8 @@ func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bgAppendTmpCleaner.Reset(bgAppendsCleanupInterval)
|
||||||
case <-expiryUploadsTimer.C:
|
case <-expiryUploadsTimer.C:
|
||||||
// Reset for the next interval
|
|
||||||
expiryUploadsTimer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
|
|
||||||
|
|
||||||
expiry := globalAPIConfig.getStaleUploadsExpiry()
|
expiry := globalAPIConfig.getStaleUploadsExpiry()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
@ -944,6 +940,9 @@ func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
|
|||||||
fs.appendFileMapMu.Unlock()
|
fs.appendFileMapMu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset for the next interval
|
||||||
|
expiryUploadsTimer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -239,10 +239,10 @@ func lockMaintenance(ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-lkTimer.C:
|
case <-lkTimer.C:
|
||||||
|
globalLockServer.expireOldLocks(lockValidityDuration)
|
||||||
|
|
||||||
// Reset the timer for next cycle.
|
// Reset the timer for next cycle.
|
||||||
lkTimer.Reset(lockMaintenanceInterval)
|
lkTimer.Reset(lockMaintenanceInterval)
|
||||||
|
|
||||||
globalLockServer.expireOldLocks(lockValidityDuration)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3330,10 +3330,10 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object
|
|||||||
defer healTimer.Stop()
|
defer healTimer.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
|
||||||
case <-healTimer.C:
|
|
||||||
healTimer.Reset(siteHealTimeInterval)
|
healTimer.Reset(siteHealTimeInterval)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-healTimer.C:
|
||||||
c.RLock()
|
c.RLock()
|
||||||
enabled := c.enabled
|
enabled := c.enabled
|
||||||
c.RUnlock()
|
c.RUnlock()
|
||||||
|
@ -255,12 +255,12 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
|
|||||||
defer refreshTimer.Stop()
|
defer refreshTimer.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
refreshTimer.Reset(dm.refreshInterval)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-refreshTimer.C:
|
case <-refreshTimer.C:
|
||||||
refreshTimer.Reset(dm.refreshInterval)
|
|
||||||
|
|
||||||
noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
|
noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
|
||||||
if err == nil && noQuorum {
|
if err == nil && noQuorum {
|
||||||
// Clean the lock locally and in remote nodes
|
// Clean the lock locally and in remote nodes
|
||||||
|
Loading…
Reference in New Issue
Block a user