diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index 5b5155bbf..40ab94dc4 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -20,6 +20,12 @@ package event import ( "fmt" "sync" + "sync/atomic" +) + +const ( + // The maximum allowed number of concurrent Send() calls to all configured notifications targets + maxConcurrentTargetSendCalls = 20000 ) // Target - event target interface @@ -34,6 +40,9 @@ type Target interface { // TargetList - holds list of targets indexed by target ID. type TargetList struct { + // The number of concurrent async Send calls to all targets + currentSendCalls int64 + sync.RWMutex targets map[TargetID]Target } @@ -124,6 +133,14 @@ func (list *TargetList) TargetMap() map[TargetID]Target { // Send - sends events to targets identified by target IDs. func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { + if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls { + err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls) + for id := range targetIDset { + resCh <- TargetIDResult{ID: id, Err: err} + } + return + } + go func() { var wg sync.WaitGroup for id := range targetIDset { @@ -133,6 +150,8 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- if ok { wg.Add(1) go func(id TargetID, target Target) { + atomic.AddInt64(&list.currentSendCalls, 1) + defer atomic.AddInt64(&list.currentSendCalls, -1) defer wg.Done() tgtRes := TargetIDResult{ID: id} if err := target.Save(event); err != nil {