fix: set 20000 as maximum parallel event calls (#15435)

This is needed to avoid consuming a lot of goroutines when a target is
very slow or there is a bug in a target library.
This commit is contained in:
Anis Elleuch 2022-07-30 20:12:33 +01:00 committed by GitHub
parent 6b4cb35f4f
commit 3856d078d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,6 +20,12 @@ package event
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
)
const (
// The maximum allowed number of concurrent Send() calls to all configured notifications targets
maxConcurrentTargetSendCalls = 20000
) )
// Target - event target interface // Target - event target interface
@ -34,6 +40,9 @@ type Target interface {
// TargetList - holds list of targets indexed by target ID. // TargetList - holds list of targets indexed by target ID.
type TargetList struct { type TargetList struct {
// The number of concurrent async Send calls to all targets
currentSendCalls int64
sync.RWMutex sync.RWMutex
targets map[TargetID]Target targets map[TargetID]Target
} }
@ -124,6 +133,14 @@ func (list *TargetList) TargetMap() map[TargetID]Target {
// Send - sends events to targets identified by target IDs. // Send - sends events to targets identified by target IDs.
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls {
err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls)
for id := range targetIDset {
resCh <- TargetIDResult{ID: id, Err: err}
}
return
}
go func() { go func() {
var wg sync.WaitGroup var wg sync.WaitGroup
for id := range targetIDset { for id := range targetIDset {
@ -133,6 +150,8 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<-
if ok { if ok {
wg.Add(1) wg.Add(1)
go func(id TargetID, target Target) { go func(id TargetID, target Target) {
atomic.AddInt64(&list.currentSendCalls, 1)
defer atomic.AddInt64(&list.currentSendCalls, -1)
defer wg.Done() defer wg.Done()
tgtRes := TargetIDResult{ID: id} tgtRes := TargetIDResult{ID: id}
if err := target.Save(event); err != nil { if err := target.Save(event); err != nil {