From 040ac5cad8750a981184f91cb87ab8991aa8edd3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 16 May 2022 16:10:51 -0700 Subject: [PATCH] fix: when logger queue is full exit quickly upon doneCh (#14928) Additionally only reload requested sub-system not everything --- cmd/admin-handlers-config-kv.go | 2 +- cmd/notification.go | 19 +++++++++++++++++-- cmd/peer-rest-client.go | 3 ++- cmd/peer-rest-common.go | 1 + cmd/peer-rest-server.go | 9 ++++++++- internal/logger/target/http/http.go | 24 ++++++++++++++---------- internal/logger/target/kafka/kafka.go | 25 ++++++++++++++++--------- 7 files changed, 59 insertions(+), 24 deletions(-) diff --git a/cmd/admin-handlers-config-kv.go b/cmd/admin-handlers-config-kv.go index fb4c683f0..d119a81ff 100644 --- a/cmd/admin-handlers-config-kv.go +++ b/cmd/admin-handlers-config-kv.go @@ -104,7 +104,7 @@ func applyDynamic(ctx context.Context, objectAPI ObjectLayer, cfg config.Config, writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - globalNotificationSys.SignalService(serviceReloadDynamic) + globalNotificationSys.SignalConfigReload(subSys) // Tell the client that dynamic config was applied. w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue) } diff --git a/cmd/notification.go b/cmd/notification.go index 5f1ba6e51..6f076bfa7 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -414,6 +414,21 @@ func (sys *NotificationSys) ServerUpdate(ctx context.Context, u *url.URL, sha256 return ng.Wait() } +// SignalConfigReload reloads requested sub-system on a remote peer dynamically. +func (sys *NotificationSys) SignalConfigReload(subSys string) []NotificationPeerErr { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(GlobalContext, func() error { + return client.SignalService(serviceReloadDynamic, subSys) + }, idx, *client.host) + } + return ng.Wait() +} + // SignalService - calls signal service RPC call on all peers. func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)) @@ -423,7 +438,7 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE } client := client ng.Go(GlobalContext, func() error { - return client.SignalService(sig) + return client.SignalService(sig, "") }, idx, *client.host) } return ng.Wait() @@ -1541,7 +1556,7 @@ func (sys *NotificationSys) ServiceFreeze(ctx context.Context, freeze bool) []No } client := client ng.Go(GlobalContext, func() error { - return client.SignalService(serviceSig) + return client.SignalService(serviceSig, "") }, idx, *client.host) } nerrs := ng.Wait() diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 22c1039b5..8d1d85912 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -688,9 +688,10 @@ func (client *peerRESTClient) ServerUpdate(ctx context.Context, u *url.URL, sha2 } // SignalService - sends signal to peer nodes. -func (client *peerRESTClient) SignalService(sig serviceSignal) error { +func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string) error { values := make(url.Values) values.Set(peerRESTSignal, strconv.Itoa(int(sig))) + values.Set(peerRESTSubSys, subSys) respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1) if err != nil { return err diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 5bfbd61af..9333ebc49 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -84,6 +84,7 @@ const ( peerRESTUserOrGroup = "user-or-group" peerRESTIsGroup = "is-group" peerRESTSignal = "signal" + peerRESTSubSys = "sub-sys" peerRESTProfiler = "profiler" peerRESTTraceErr = "err" peerRESTTraceInternal = "internal" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 8b6a39c59..bc613db33 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -837,7 +837,14 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req s.writeErrorResponse(w, err) return } - if err = applyDynamicConfig(r.Context(), objAPI, srvCfg); err != nil { + subSys := r.Form.Get(peerRESTSubSys) + // Apply dynamic values. + if subSys == "" { + err = applyDynamicConfig(r.Context(), objAPI, srvCfg) + } else { + err = applyDynamicConfigForSubSys(r.Context(), objAPI, srvCfg, subSys) + } + if err != nil { s.writeErrorResponse(w, err) } return diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 56b18661b..056ad354f 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -26,7 +26,6 @@ import ( "net/http" "strings" "sync" - "sync/atomic" "time" xhttp "github.com/minio/minio/internal/http" @@ -58,8 +57,8 @@ type Config struct { // buffer is full, new logs are just ignored and an error // is returned to the caller. type Target struct { - status int32 wg sync.WaitGroup + doneCh chan struct{} // Channel of log entries logCh chan interface{} @@ -115,7 +114,6 @@ func (h *Target) Init() error { h.config.Endpoint, resp.Status) } - h.status = 1 go h.startHTTPLogger() return nil } @@ -177,11 +175,15 @@ func (h *Target) logEntry(entry interface{}) { func (h *Target) startHTTPLogger() { // Create a routine which sends json logs received // from an internal channel. + h.wg.Add(1) go func() { - h.wg.Add(1) defer h.wg.Done() - for entry := range h.logCh { + + select { + case entry := <-h.logCh: h.logEntry(entry) + case <-h.doneCh: + return } }() } @@ -191,6 +193,7 @@ func (h *Target) startHTTPLogger() { func New(config Config) *Target { h := &Target{ logCh: make(chan interface{}, config.QueueSize), + doneCh: make(chan struct{}), config: config, } @@ -199,12 +202,14 @@ func New(config Config) *Target { // Send log message 'e' to http target. func (h *Target) Send(entry interface{}, errKind string) error { - if atomic.LoadInt32(&h.status) == 0 { - // Channel was closed or used before init. + select { + case <-h.doneCh: return nil + default: } select { + case <-h.doneCh: case h.logCh <- entry: default: // log channel is full, do not wait and return @@ -217,9 +222,8 @@ func (h *Target) Send(entry interface{}, errKind string) error { // Cancel - cancels the target func (h *Target) Cancel() { - if atomic.CompareAndSwapInt32(&h.status, 1, 0) { - close(h.logCh) - } + close(h.doneCh) + close(h.logCh) h.wg.Wait() } diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 39f63a87c..02a47eb61 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -25,7 +25,6 @@ import ( "errors" "net" "sync" - "sync/atomic" "github.com/Shopify/sarama" saramatls "github.com/Shopify/sarama/tools/tls" @@ -37,8 +36,8 @@ import ( // Target - Kafka target. type Target struct { - status int32 wg sync.WaitGroup + doneCh chan struct{} // Channel of log entries logCh chan interface{} @@ -51,6 +50,13 @@ type Target struct { // Send log message 'e' to kafka target. func (h *Target) Send(entry interface{}, errKind string) error { select { + case <-h.doneCh: + return nil + default: + } + + select { + case <-h.doneCh: case h.logCh <- entry: default: // log channel is full, do not wait and return @@ -86,11 +92,15 @@ func (h *Target) logEntry(entry interface{}) { func (h *Target) startKakfaLogger() { // Create a routine which sends json logs received // from an internal channel. + h.wg.Add(1) go func() { - h.wg.Add(1) defer h.wg.Done() - for entry := range h.logCh { + + select { + case entry := <-h.logCh: h.logEntry(entry) + case <-h.doneCh: + return } }() } @@ -204,17 +214,14 @@ func (h *Target) Init() error { } h.producer = producer - - h.status = 1 go h.startKakfaLogger() return nil } // Cancel - cancels the target func (h *Target) Cancel() { - if atomic.CompareAndSwapInt32(&h.status, 1, 0) { - close(h.logCh) - } + close(h.doneCh) + close(h.logCh) h.wg.Wait() }