mirror of https://github.com/minio/minio.git
Do lockless last minute latency metrics (#17576)
Collect metrics in one second and accumulate lockless before sending upstream.
This commit is contained in:
parent
0bc34952eb
commit
6efcf9c982
|
@ -90,16 +90,6 @@ func (a *AccElem) add(dur time.Duration) {
|
|||
a.N++
|
||||
}
|
||||
|
||||
// Add a duration to a single element.
|
||||
func (a *AccElem) addSize(dur time.Duration, sz int64) {
|
||||
if dur < 0 {
|
||||
dur = 0
|
||||
}
|
||||
a.Total += int64(dur)
|
||||
a.Size += sz
|
||||
a.N++
|
||||
}
|
||||
|
||||
// Merge b into a.
|
||||
func (a *AccElem) merge(b AccElem) {
|
||||
a.N += b.N
|
||||
|
@ -156,11 +146,10 @@ func (l *lastMinuteLatency) add(t time.Duration) {
|
|||
}
|
||||
|
||||
// Add a new duration data
|
||||
func (l *lastMinuteLatency) addSize(t time.Duration, sz int64) {
|
||||
sec := time.Now().Unix()
|
||||
func (l *lastMinuteLatency) addAll(sec int64, a AccElem) {
|
||||
l.forwardTo(sec)
|
||||
winIdx := sec % 60
|
||||
l.Totals[winIdx].addSize(t, sz)
|
||||
l.Totals[winIdx].merge(a)
|
||||
l.LastSec = sec
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -105,28 +106,57 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
|
|||
return m.(DiskMetrics)
|
||||
}
|
||||
|
||||
// lockedLastMinuteLatency accumulates totals lockless for each second.
|
||||
type lockedLastMinuteLatency struct {
|
||||
sync.Mutex
|
||||
cachedSec int64
|
||||
cached atomic.Pointer[AccElem]
|
||||
mu sync.Mutex
|
||||
init sync.Once
|
||||
lastMinuteLatency
|
||||
}
|
||||
|
||||
func (e *lockedLastMinuteLatency) add(value time.Duration) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
e.lastMinuteLatency.add(value)
|
||||
e.addSize(value, 0)
|
||||
}
|
||||
|
||||
// addSize will add a duration and size.
|
||||
func (e *lockedLastMinuteLatency) addSize(value time.Duration, sz int64) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
e.lastMinuteLatency.addSize(value, sz)
|
||||
// alloc on every call, so we have a clean entry to swap in.
|
||||
t := time.Now().Unix()
|
||||
e.init.Do(func() {
|
||||
e.cached.Store(&AccElem{})
|
||||
atomic.StoreInt64(&e.cachedSec, t)
|
||||
})
|
||||
acc := e.cached.Load()
|
||||
if lastT := atomic.LoadInt64(&e.cachedSec); lastT != t {
|
||||
// Check if lastT was changed by someone else.
|
||||
if atomic.CompareAndSwapInt64(&e.cachedSec, lastT, t) {
|
||||
// Now we swap in a new.
|
||||
newAcc := &AccElem{}
|
||||
old := e.cached.Swap(newAcc)
|
||||
var a AccElem
|
||||
a.Size = atomic.LoadInt64(&old.Size)
|
||||
a.Total = atomic.LoadInt64(&old.Total)
|
||||
a.N = atomic.LoadInt64(&old.N)
|
||||
e.mu.Lock()
|
||||
e.lastMinuteLatency.addAll(t-1, a)
|
||||
e.mu.Unlock()
|
||||
acc = newAcc
|
||||
} else {
|
||||
// We may be able to grab the new accumulator by yielding.
|
||||
runtime.Gosched()
|
||||
acc = e.cached.Load()
|
||||
}
|
||||
}
|
||||
atomic.AddInt64(&acc.N, 1)
|
||||
atomic.AddInt64(&acc.Total, int64(value))
|
||||
atomic.AddInt64(&acc.Size, sz)
|
||||
}
|
||||
|
||||
// total returns the total call count and latency for the last minute.
|
||||
func (e *lockedLastMinuteLatency) total() AccElem {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
return e.lastMinuteLatency.getTotal()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue