mirror of
https://github.com/minio/minio.git
synced 2025-05-21 09:33:50 -04:00
Add event counters (#18232)
Export metric for global events sent and skipped for the lifetime of the server.
This commit is contained in:
parent
a66a7f3e97
commit
128256e3ab
@ -2446,6 +2446,26 @@ func getNotificationMetrics() *MetricsGroup {
|
|||||||
},
|
},
|
||||||
Value: float64(nstats.CurrentSendCalls),
|
Value: float64(nstats.CurrentSendCalls),
|
||||||
})
|
})
|
||||||
|
metrics = append(metrics, Metric{
|
||||||
|
Description: MetricDescription{
|
||||||
|
Namespace: minioNamespace,
|
||||||
|
Subsystem: notifySubsystem,
|
||||||
|
Name: "events_skipped_total",
|
||||||
|
Help: "Events that were skipped due to full queue",
|
||||||
|
Type: counterMetric,
|
||||||
|
},
|
||||||
|
Value: float64(nstats.EventsSkipped),
|
||||||
|
})
|
||||||
|
metrics = append(metrics, Metric{
|
||||||
|
Description: MetricDescription{
|
||||||
|
Namespace: minioNamespace,
|
||||||
|
Subsystem: notifySubsystem,
|
||||||
|
Name: "events_sent_total",
|
||||||
|
Help: "Total number of events sent since start",
|
||||||
|
Type: counterMetric,
|
||||||
|
},
|
||||||
|
Value: float64(nstats.TotalEvents),
|
||||||
|
})
|
||||||
for _, st := range nstats.TargetStats {
|
for _, st := range nstats.TargetStats {
|
||||||
metrics = append(metrics, Metric{
|
metrics = append(metrics, Metric{
|
||||||
Description: MetricDescription{
|
Description: MetricDescription{
|
||||||
|
@ -50,6 +50,8 @@ type TargetStore interface {
|
|||||||
type TargetStats struct {
|
type TargetStats struct {
|
||||||
// CurrentSendCalls is the number of concurrent async Send calls to all targets
|
// CurrentSendCalls is the number of concurrent async Send calls to all targets
|
||||||
CurrentSendCalls int64
|
CurrentSendCalls int64
|
||||||
|
TotalEvents int64
|
||||||
|
EventsSkipped int64
|
||||||
|
|
||||||
TargetStats map[string]TargetStat
|
TargetStats map[string]TargetStat
|
||||||
}
|
}
|
||||||
@ -64,6 +66,8 @@ type TargetStat struct {
|
|||||||
type TargetList struct {
|
type TargetList struct {
|
||||||
// The number of concurrent async Send calls to all targets
|
// The number of concurrent async Send calls to all targets
|
||||||
currentSendCalls int64
|
currentSendCalls int64
|
||||||
|
totalEvents int64
|
||||||
|
eventsSkipped int64
|
||||||
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
targets map[TargetID]Target
|
targets map[TargetID]Target
|
||||||
@ -161,6 +165,7 @@ func (list *TargetList) TargetMap() map[TargetID]Target {
|
|||||||
// Send - sends events to targets identified by target IDs.
|
// Send - sends events to targets identified by target IDs.
|
||||||
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult, synchronous bool) {
|
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult, synchronous bool) {
|
||||||
if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls {
|
if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls {
|
||||||
|
atomic.AddInt64(&list.eventsSkipped, 1)
|
||||||
err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls)
|
err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls)
|
||||||
for id := range targetIDset {
|
for id := range targetIDset {
|
||||||
resCh <- TargetIDResult{ID: id, Err: err}
|
resCh <- TargetIDResult{ID: id, Err: err}
|
||||||
@ -171,9 +176,7 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<-
|
|||||||
list.send(event, targetIDset, resCh)
|
list.send(event, targetIDset, resCh)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go func() {
|
go list.send(event, targetIDset, resCh)
|
||||||
list.send(event, targetIDset, resCh)
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
|
func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
|
||||||
@ -199,6 +202,7 @@ func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<-
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
atomic.AddInt64(&list.totalEvents, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stats returns stats for targets.
|
// Stats returns stats for targets.
|
||||||
@ -208,6 +212,9 @@ func (list *TargetList) Stats() TargetStats {
|
|||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
t.CurrentSendCalls = atomic.LoadInt64(&list.currentSendCalls)
|
t.CurrentSendCalls = atomic.LoadInt64(&list.currentSendCalls)
|
||||||
|
t.EventsSkipped = atomic.LoadInt64(&list.eventsSkipped)
|
||||||
|
t.TotalEvents = atomic.LoadInt64(&list.totalEvents)
|
||||||
|
|
||||||
list.RLock()
|
list.RLock()
|
||||||
defer list.RUnlock()
|
defer list.RUnlock()
|
||||||
t.TargetStats = make(map[string]TargetStat, len(list.targets))
|
t.TargetStats = make(map[string]TargetStat, len(list.targets))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user