avoid limits on the number of parallel trace/bucket notifications listeners (#14799)

Simplifies overall limits on the incoming listeners for notifications.

Fixes #14566
This commit is contained in:
Anis Elleuch 2022-06-05 22:29:12 +01:00 committed by GitHub
parent addfa35d93
commit fd02492cb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 113 additions and 38 deletions

View File

@ -1409,9 +1409,15 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
peers, _ := newPeerRestClients(globalEndpoints) peers, _ := newPeerRestClients(globalEndpoints)
globalTrace.Subscribe(traceCh, ctx.Done(), func(entry interface{}) bool { traceFn := func(entry interface{}) bool {
return mustTrace(entry, traceOpts) return mustTrace(entry, traceOpts)
}) }
err = globalTrace.Subscribe(traceCh, ctx.Done(), traceFn)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL)
return
}
for _, peer := range peers { for _, peer := range peers {
if peer == nil { if peer == nil {
@ -1483,7 +1489,11 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
peers, _ := newPeerRestClients(globalEndpoints) peers, _ := newPeerRestClients(globalEndpoints)
globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil) err = globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL)
return
}
for _, peer := range peers { for _, peer := range peers {
if peer == nil { if peer == nil {

View File

@ -342,7 +342,7 @@ func registerAPIRouter(router *mux.Router) {
collectAPIStats("getbucketnotification", maxClients(gz(httpTraceAll(api.GetBucketNotificationHandler))))).Queries("notification", "") collectAPIStats("getbucketnotification", maxClients(gz(httpTraceAll(api.GetBucketNotificationHandler))))).Queries("notification", "")
// ListenNotification // ListenNotification
router.Methods(http.MethodGet).HandlerFunc( router.Methods(http.MethodGet).HandlerFunc(
collectAPIStats("listennotification", maxClients(gz(httpTraceAll(api.ListenNotificationHandler))))).Queries("events", "{events:.*}") collectAPIStats("listennotification", gz(httpTraceAll(api.ListenNotificationHandler)))).Queries("events", "{events:.*}")
// ResetBucketReplicationStatus - MinIO extension API // ResetBucketReplicationStatus - MinIO extension API
router.Methods(http.MethodGet).HandlerFunc( router.Methods(http.MethodGet).HandlerFunc(
collectAPIStats("resetbucketreplicationstatus", maxClients(gz(httpTraceAll(api.ResetBucketReplicationStatusHandler))))).Queries("replication-reset-status", "") collectAPIStats("resetbucketreplicationstatus", maxClients(gz(httpTraceAll(api.ResetBucketReplicationStatusHandler))))).Queries("replication-reset-status", "")
@ -474,7 +474,7 @@ func registerAPIRouter(router *mux.Router) {
// ListenNotification // ListenNotification
apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc( apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc(
collectAPIStats("listennotification", maxClients(gz(httpTraceAll(api.ListenNotificationHandler))))).Queries("events", "{events:.*}") collectAPIStats("listennotification", gz(httpTraceAll(api.ListenNotificationHandler)))).Queries("events", "{events:.*}")
// ListBuckets // ListBuckets
apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc( apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc(

View File

@ -45,7 +45,7 @@ type HTTPConsoleLoggerSys struct {
// NewConsoleLogger - creates new HTTPConsoleLoggerSys with all nodes subscribed to // NewConsoleLogger - creates new HTTPConsoleLoggerSys with all nodes subscribed to
// the console logging pub sub system // the console logging pub sub system
func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys { func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys {
ps := pubsub.New() ps := pubsub.New(8)
return &HTTPConsoleLoggerSys{ return &HTTPConsoleLoggerSys{
pubsub: ps, pubsub: ps,
console: console.New(), console: console.New(),
@ -75,7 +75,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
} }
// Subscribe starts console logging for this node. // Subscribe starts console logging for this node.
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) { func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) error {
// Enable console logging for remote client. // Enable console logging for remote client.
if !sys.HasLogListeners() { if !sys.HasLogListeners() {
logger.AddSystemTarget(sys) logger.AddSystemTarget(sys)
@ -111,11 +111,12 @@ func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan
select { select {
case subCh <- entry: case subCh <- entry:
case <-doneCh: case <-doneCh:
return return nil
} }
} }
} }
sys.pubsub.Subscribe(subCh, doneCh, filter)
return sys.pubsub.Subscribe(subCh, doneCh, filter)
} }
// Init if HTTPConsoleLoggerSys is valid, always returns nil right now // Init if HTTPConsoleLoggerSys is valid, always returns nil right now

View File

@ -226,10 +226,10 @@ var (
// global Trace system to send HTTP request/response // global Trace system to send HTTP request/response
// and Storage/OS calls info to registered listeners. // and Storage/OS calls info to registered listeners.
globalTrace = pubsub.New() globalTrace = pubsub.New(8)
// global Listen system to send S3 API events to registered listeners // global Listen system to send S3 API events to registered listeners
globalHTTPListen = pubsub.New() globalHTTPListen = pubsub.New(0)
// global console system to send console logs to // global console system to send console logs to
// registered listeners // registered listeners

View File

@ -127,7 +127,7 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
peers, _ := newPeerRestClients(globalEndpoints) peers, _ := newPeerRestClients(globalEndpoints)
globalHTTPListen.Subscribe(listenCh, ctx.Done(), func(evI interface{}) bool { listenFn := func(evI interface{}) bool {
ev, ok := evI.(event.Event) ev, ok := evI.(event.Event)
if !ok { if !ok {
return false return false
@ -138,7 +138,13 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
} }
} }
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
}) }
err := globalHTTPListen.Subscribe(listenCh, ctx.Done(), listenFn)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL)
return
}
if bucketName != "" { if bucketName != "" {
values.Set(peerRESTListenBucket, bucketName) values.Set(peerRESTListenBucket, bucketName)

View File

@ -933,7 +933,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000) ch := make(chan interface{}, 2000)
globalHTTPListen.Subscribe(ch, doneCh, func(evI interface{}) bool { listenFn := func(evI interface{}) bool {
ev, ok := evI.(event.Event) ev, ok := evI.(event.Event)
if !ok { if !ok {
return false return false
@ -944,7 +944,13 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
}) }
err := globalHTTPListen.Subscribe(ch, doneCh, listenFn)
if err != nil {
s.writeErrorResponse(w, err)
return
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond) keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()
@ -1005,9 +1011,15 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000) ch := make(chan interface{}, 2000)
globalTrace.Subscribe(ch, doneCh, func(entry interface{}) bool { traceFn := func(entry interface{}) bool {
return mustTrace(entry, traceOpts) return mustTrace(entry, traceOpts)
}) }
err = globalTrace.Subscribe(ch, doneCh, traceFn)
if err != nil {
s.writeErrorResponse(w, err)
return
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond) keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()
@ -1092,7 +1104,11 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
defer close(doneCh) defer close(doneCh)
ch := make(chan interface{}, 2000) ch := make(chan interface{}, 2000)
globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil) err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil)
if err != nil {
s.writeErrorResponse(w, err)
return
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond) keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()

View File

@ -18,6 +18,7 @@
package pubsub package pubsub
import ( import (
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@ -32,6 +33,7 @@ type Sub struct {
type PubSub struct { type PubSub struct {
subs []*Sub subs []*Sub
numSubscribers int32 numSubscribers int32
maxSubscribers int32
sync.RWMutex sync.RWMutex
} }
@ -53,13 +55,18 @@ func (ps *PubSub) Publish(item interface{}) {
} }
// Subscribe - Adds a subscriber to pubsub system // Subscribe - Adds a subscriber to pubsub system
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) { func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) error {
totalSubs := atomic.AddInt32(&ps.numSubscribers, 1)
if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers {
atomic.AddInt32(&ps.numSubscribers, -1)
return fmt.Errorf("the limit of `%d` subscribers is reached", ps.maxSubscribers)
}
ps.Lock() ps.Lock()
defer ps.Unlock() defer ps.Unlock()
sub := &Sub{subCh, filter} sub := &Sub{subCh, filter}
ps.subs = append(ps.subs, sub) ps.subs = append(ps.subs, sub)
atomic.AddInt32(&ps.numSubscribers, 1)
go func() { go func() {
<-doneCh <-doneCh
@ -74,6 +81,8 @@ func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filt
} }
atomic.AddInt32(&ps.numSubscribers, -1) atomic.AddInt32(&ps.numSubscribers, -1)
}() }()
return nil
} }
// NumSubscribers returns the number of current subscribers // NumSubscribers returns the number of current subscribers
@ -81,7 +90,8 @@ func (ps *PubSub) NumSubscribers() int32 {
return atomic.LoadInt32(&ps.numSubscribers) return atomic.LoadInt32(&ps.numSubscribers)
} }
// New inits a PubSub system // New inits a PubSub system with a limit of maximum
func New() *PubSub { // subscribers unless zero is specified
return &PubSub{} func New(maxSubscribers int32) *PubSub {
return &PubSub{maxSubscribers: maxSubscribers}
} }

View File

@ -24,68 +24,100 @@ import (
) )
func TestSubscribe(t *testing.T) { func TestSubscribe(t *testing.T) {
ps := New() ps := New(2)
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
ps.Subscribe(ch1, doneCh, nil) if err := ps.Subscribe(ch1, doneCh, nil); err != nil {
ps.Subscribe(ch2, doneCh, nil) t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
ps.Lock() ps.Lock()
defer ps.Unlock() defer ps.Unlock()
if len(ps.subs) != 2 { if len(ps.subs) != 2 {
t.Errorf("expected 2 subscribers") t.Fatalf("expected 2 subscribers")
}
}
func TestSubscribeExceedingLimit(t *testing.T) {
ps := New(2)
ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1)
ch3 := make(chan interface{}, 1)
doneCh := make(chan struct{})
defer close(doneCh)
if err := ps.Subscribe(ch1, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch3, doneCh, nil); err == nil {
t.Fatalf("unexpected nil err")
} }
} }
func TestUnsubscribe(t *testing.T) { func TestUnsubscribe(t *testing.T) {
ps := New() ps := New(2)
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
doneCh1 := make(chan struct{}) doneCh1 := make(chan struct{})
doneCh2 := make(chan struct{}) doneCh2 := make(chan struct{})
ps.Subscribe(ch1, doneCh1, nil) if err := ps.Subscribe(ch1, doneCh1, nil); err != nil {
ps.Subscribe(ch2, doneCh2, nil) t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh2, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
close(doneCh1) close(doneCh1)
// Allow for the above statement to take effect. // Allow for the above statement to take effect.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
ps.Lock() ps.Lock()
if len(ps.subs) != 1 { if len(ps.subs) != 1 {
t.Errorf("expected 1 subscriber") t.Fatal("expected 1 subscriber")
} }
ps.Unlock() ps.Unlock()
close(doneCh2) close(doneCh2)
} }
func TestPubSub(t *testing.T) { func TestPubSub(t *testing.T) {
ps := New() ps := New(1)
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
doneCh1 := make(chan struct{}) doneCh1 := make(chan struct{})
defer close(doneCh1) defer close(doneCh1)
ps.Subscribe(ch1, doneCh1, func(entry interface{}) bool { return true }) if err := ps.Subscribe(ch1, doneCh1, func(entry interface{}) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
val := "hello" val := "hello"
ps.Publish(val) ps.Publish(val)
msg := <-ch1 msg := <-ch1
if msg != "hello" { if msg != "hello" {
t.Errorf(fmt.Sprintf("expected %s , found %s", val, msg)) t.Fatalf(fmt.Sprintf("expected %s , found %s", val, msg))
} }
} }
func TestMultiPubSub(t *testing.T) { func TestMultiPubSub(t *testing.T) {
ps := New() ps := New(2)
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
ps.Subscribe(ch1, doneCh, func(entry interface{}) bool { return true }) if err := ps.Subscribe(ch1, doneCh, func(entry interface{}) bool { return true }); err != nil {
ps.Subscribe(ch2, doneCh, func(entry interface{}) bool { return true }) t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh, func(entry interface{}) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
val := "hello" val := "hello"
ps.Publish(val) ps.Publish(val)
msg1 := <-ch1 msg1 := <-ch1
msg2 := <-ch2 msg2 := <-ch2
if msg1 != "hello" && msg2 != "hello" { if msg1 != "hello" && msg2 != "hello" {
t.Errorf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2)) t.Fatalf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2))
} }
} }