mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
update buffer channels for both trace and listen events (#18171)
- Trace needs higher buffered channels than 4000 to ensure when we run `mc admin trace -a` it captures all information sufficiently. - Listen event notification needs the event channel to be `apiRequestsMaxPerNode` * number of nodes
This commit is contained in:
parent
bb77b89da0
commit
1971c54a50
@ -1581,7 +1581,9 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers.
|
// Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers.
|
||||||
// Use buffered channel to take care of burst sends or slow w.Write()
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
||||||
traceCh := make(chan madmin.TraceInfo, 4000)
|
|
||||||
|
// Keep 100k buffered channel, should be sufficient to ensure we do not lose any events.
|
||||||
|
traceCh := make(chan madmin.TraceInfo, 100000)
|
||||||
|
|
||||||
peers, _ := newPeerRestClients(globalEndpoints)
|
peers, _ := newPeerRestClients(globalEndpoints)
|
||||||
|
|
||||||
@ -1605,7 +1607,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
peer.Trace(traceCh, ctx.Done(), traceOpts)
|
peer.Trace(traceCh, ctx.Done(), traceOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
|
keepAliveTicker := time.NewTicker(time.Second)
|
||||||
defer keepAliveTicker.Stop()
|
defer keepAliveTicker.Stop()
|
||||||
|
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
|
@ -116,7 +116,7 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
|
|||||||
|
|
||||||
// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
|
// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
|
||||||
// Use buffered channel to take care of burst sends or slow w.Write()
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
||||||
listenCh := make(chan event.Event, 4000)
|
listenCh := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity()*len(globalEndpoints.Hostnames()))
|
||||||
|
|
||||||
peers, _ := newPeerRestClients(globalEndpoints)
|
peers, _ := newPeerRestClients(globalEndpoints)
|
||||||
|
|
||||||
|
@ -955,7 +955,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers.
|
// Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers.
|
||||||
// Use buffered channel to take care of burst sends or slow w.Write()
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
||||||
ch := make(chan event.Event, 2000)
|
ch := make(chan event.Event, globalAPIConfig.getRequestsPoolCapacity())
|
||||||
|
|
||||||
err := globalHTTPListen.Subscribe(mask, ch, doneCh, func(ev event.Event) bool {
|
err := globalHTTPListen.Subscribe(mask, ch, doneCh, func(ev event.Event) bool {
|
||||||
if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" {
|
if ev.S3.Bucket.Name != "" && values.Get(peerRESTListenBucket) != "" {
|
||||||
@ -1010,7 +1010,7 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
|
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
|
||||||
// Use buffered channel to take care of burst sends or slow w.Write()
|
// Use buffered channel to take care of burst sends or slow w.Write()
|
||||||
ch := make(chan madmin.TraceInfo, 2000)
|
ch := make(chan madmin.TraceInfo, 100000)
|
||||||
err = globalTrace.Subscribe(traceOpts.TraceTypes(), ch, r.Context().Done(), func(entry madmin.TraceInfo) bool {
|
err = globalTrace.Subscribe(traceOpts.TraceTypes(), ch, r.Context().Done(), func(entry madmin.TraceInfo) bool {
|
||||||
return shouldTrace(entry, traceOpts)
|
return shouldTrace(entry, traceOpts)
|
||||||
})
|
})
|
||||||
@ -1159,13 +1159,13 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
|
|||||||
doneCh := make(chan struct{})
|
doneCh := make(chan struct{})
|
||||||
defer close(doneCh)
|
defer close(doneCh)
|
||||||
|
|
||||||
ch := make(chan log.Info, 2000)
|
ch := make(chan log.Info, 100000)
|
||||||
err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, madmin.LogMaskAll, nil)
|
err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, madmin.LogMaskAll, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.writeErrorResponse(w, err)
|
s.writeErrorResponse(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
|
keepAliveTicker := time.NewTicker(time.Second)
|
||||||
defer keepAliveTicker.Stop()
|
defer keepAliveTicker.Stop()
|
||||||
|
|
||||||
enc := gob.NewEncoder(w)
|
enc := gob.NewEncoder(w)
|
||||||
|
Loading…
Reference in New Issue
Block a user