Label the notification target metrics by their target IDs (#18633)

This patch adds the targetID to the existing notification target metrics
and deprecates the current target metrics which points to the overall
event notification subsystem
This commit is contained in:
Praveen raj Mani
2023-12-14 22:39:26 +05:30
committed by GitHub
parent b3314e97a6
commit 10ca0a6936
3 changed files with 157 additions and 32 deletions

View File

@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -50,22 +49,23 @@ type TargetStore interface {
Len() int
}
// TargetStats is a collection of stats for multiple targets.
type TargetStats struct {
// CurrentSendCalls is the number of concurrent async Send calls to all targets
CurrentSendCalls int64
TotalEvents int64
// Stats is a collection of stats for multiple targets.
type Stats struct {
TotalEvents int64 // Deprecated
EventsSkipped int64
CurrentQueuedCalls int64
EventsErrorsTotal int64
CurrentQueuedCalls int64 // Deprecated
EventsErrorsTotal int64 // Deprecated
CurrentSendCalls int64 // Deprecated
TargetStats map[string]TargetStat
TargetStats map[TargetID]TargetStat
}
// TargetStat is the stats of a single target.
type TargetStat struct {
ID TargetID
CurrentQueue int // Populated if target has a store.
CurrentSendCalls int64 // CurrentSendCalls is the number of concurrent async Send calls to all targets
CurrentQueue int // Populated if target has a store.
TotalEvents int64
FailedEvents int64 // Number of failed events per target
}
// TargetList - holds list of targets indexed by target ID.
@@ -80,6 +80,82 @@ type TargetList struct {
targets map[TargetID]Target
queue chan asyncEvent
ctx context.Context
statLock sync.RWMutex
targetStats map[TargetID]targetStat
}
type targetStat struct {
// The number of concurrent async Send calls per targets
currentSendCalls int64
// The number of total events per target
totalEvents int64
// The number of failed events per target
failedEvents int64
}
func (list *TargetList) getStatsByTargetID(id TargetID) (stat targetStat) {
list.statLock.RLock()
defer list.statLock.RUnlock()
return list.targetStats[id]
}
func (list *TargetList) incCurrentSendCalls(id TargetID) {
list.statLock.Lock()
defer list.statLock.Unlock()
stats, ok := list.targetStats[id]
if !ok {
stats = targetStat{}
}
stats.currentSendCalls++
list.targetStats[id] = stats
return
}
func (list *TargetList) decCurrentSendCalls(id TargetID) {
list.statLock.Lock()
defer list.statLock.Unlock()
stats, ok := list.targetStats[id]
if !ok {
// should not happen
return
}
stats.currentSendCalls--
list.targetStats[id] = stats
return
}
func (list *TargetList) incFailedEvents(id TargetID) {
list.statLock.Lock()
defer list.statLock.Unlock()
stats, ok := list.targetStats[id]
if !ok {
stats = targetStat{}
}
stats.failedEvents++
list.targetStats[id] = stats
return
}
func (list *TargetList) incTotalEvents(id TargetID) {
list.statLock.Lock()
defer list.statLock.Unlock()
stats, ok := list.targetStats[id]
if !ok {
stats = targetStat{}
}
stats.totalEvents++
list.targetStats[id] = stats
return
}
type asyncEvent struct {
@@ -203,11 +279,15 @@ func (list *TargetList) sendSync(event Event, targetIDset TargetIDSet) {
wg.Add(1)
go func(id TargetID, target Target) {
list.currentSendCalls.Add(1)
list.incCurrentSendCalls(id)
list.incTotalEvents(id)
defer list.decCurrentSendCalls(id)
defer list.currentSendCalls.Add(-1)
defer wg.Done()
if err := target.Save(event); err != nil {
list.eventsErrorsTotal.Add(1)
list.incFailedEvents(id)
reqInfo := &logger.ReqInfo{}
reqInfo.AppendTags("targetID", id.String())
logger.LogOnceIf(logger.SetReqInfo(context.Background(), reqInfo), err, id.String())
@@ -240,8 +320,8 @@ func (list *TargetList) sendAsync(event Event, targetIDset TargetIDSet) {
}
// Stats returns stats for targets.
func (list *TargetList) Stats() TargetStats {
t := TargetStats{}
func (list *TargetList) Stats() Stats {
t := Stats{}
if list == nil {
return t
}
@@ -253,14 +333,21 @@ func (list *TargetList) Stats() TargetStats {
list.RLock()
defer list.RUnlock()
t.TargetStats = make(map[string]TargetStat, len(list.targets))
t.TargetStats = make(map[TargetID]TargetStat, len(list.targets))
for id, target := range list.targets {
ts := TargetStat{ID: id}
var currentQueue int
if st := target.Store(); st != nil {
ts.CurrentQueue = st.Len()
currentQueue = st.Len()
}
stats := list.getStatsByTargetID(id)
t.TargetStats[id] = TargetStat{
CurrentSendCalls: stats.currentSendCalls,
CurrentQueue: currentQueue,
FailedEvents: stats.failedEvents,
TotalEvents: stats.totalEvents,
}
t.TargetStats[strings.ReplaceAll(id.String(), ":", "_")] = ts
}
return t
}
@@ -303,9 +390,10 @@ func (list *TargetList) Init(workers int) *TargetList {
// NewTargetList - creates TargetList.
func NewTargetList(ctx context.Context) *TargetList {
list := &TargetList{
targets: make(map[TargetID]Target),
queue: make(chan asyncEvent, maxConcurrentAsyncSend),
ctx: ctx,
targets: make(map[TargetID]Target),
queue: make(chan asyncEvent, maxConcurrentAsyncSend),
targetStats: make(map[TargetID]targetStat),
ctx: ctx,
}
return list
}