From 128256e3abdbe092a895ba14758f95a4ca086850 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 12 Oct 2023 15:39:22 -0700 Subject: [PATCH] Add event counters (#18232) Export metric for global events sent and skipped for the lifetime of the server. --- cmd/metrics-v2.go | 20 ++++++++++++++++++++ internal/event/targetlist.go | 13 ++++++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 2ac65ba19..d2e16e63b 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -2446,6 +2446,26 @@ func getNotificationMetrics() *MetricsGroup { }, Value: float64(nstats.CurrentSendCalls), }) + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: notifySubsystem, + Name: "events_skipped_total", + Help: "Events that were skipped due to full queue", + Type: counterMetric, + }, + Value: float64(nstats.EventsSkipped), + }) + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: notifySubsystem, + Name: "events_sent_total", + Help: "Total number of events sent since start", + Type: counterMetric, + }, + Value: float64(nstats.TotalEvents), + }) for _, st := range nstats.TargetStats { metrics = append(metrics, Metric{ Description: MetricDescription{ diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index b30b9f915..8755f613f 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -50,6 +50,8 @@ type TargetStore interface { type TargetStats struct { // CurrentSendCalls is the number of concurrent async Send calls to all targets CurrentSendCalls int64 + TotalEvents int64 + EventsSkipped int64 TargetStats map[string]TargetStat } @@ -64,6 +66,8 @@ type TargetStat struct { type TargetList struct { // The number of concurrent async Send calls to all targets currentSendCalls int64 + totalEvents int64 + eventsSkipped int64 sync.RWMutex targets map[TargetID]Target @@ -161,6 +165,7 @@ func (list *TargetList) TargetMap() map[TargetID]Target { // Send - sends events to targets identified by target IDs. func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult, synchronous bool) { if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls { + atomic.AddInt64(&list.eventsSkipped, 1) err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls) for id := range targetIDset { resCh <- TargetIDResult{ID: id, Err: err} @@ -171,9 +176,7 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- list.send(event, targetIDset, resCh) return } - go func() { - list.send(event, targetIDset, resCh) - }() + go list.send(event, targetIDset, resCh) } func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { @@ -199,6 +202,7 @@ func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- } } wg.Wait() + atomic.AddInt64(&list.totalEvents, 1) } // Stats returns stats for targets. @@ -208,6 +212,9 @@ func (list *TargetList) Stats() TargetStats { return t } t.CurrentSendCalls = atomic.LoadInt64(&list.currentSendCalls) + t.EventsSkipped = atomic.LoadInt64(&list.eventsSkipped) + t.TotalEvents = atomic.LoadInt64(&list.totalEvents) + list.RLock() defer list.RUnlock() t.TargetStats = make(map[string]TargetStat, len(list.targets))