From 4eeb48f8e09b86db8ec8ce49ca74404d93912ffe Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Thu, 21 Sep 2023 16:58:24 -0700 Subject: [PATCH] Return cached online/offline status for audit/http loggers (#18083) To avoid having delays in prometheus scrape and in 'mc admin info' command. --- internal/logger/target/http/http.go | 19 ++++++++++++++++--- internal/logger/target/kafka/kafka.go | 20 ++++++++++++++++---- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 512cc2d1f..55a4d3d50 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -127,8 +127,13 @@ func (h *Target) String() string { return h.config.Name } -// IsOnline returns true if the target is reachable. +// IsOnline returns true if the target is reachable using a cached value func (h *Target) IsOnline(ctx context.Context) bool { + return atomic.LoadInt32(&h.status) == statusOnline +} + +// ping returns true if the target is reachable. +func (h *Target) ping(ctx context.Context) bool { if err := h.send(ctx, []byte(`{}`), webhookCallTimeout); err != nil { return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err) } @@ -179,7 +184,7 @@ func (h *Target) init(ctx context.Context) (err error) { return errors.New("target is closed") } - if !h.IsOnline(ctx) { + if !h.ping(ctx) { // Start a goroutine that will continue to check if we can reach h.revive.Do(func() { go func() { @@ -191,7 +196,7 @@ func (h *Target) init(ctx context.Context) (err error) { if atomic.LoadInt32(&h.status) != statusOffline { return } - if h.IsOnline(ctx) { + if h.ping(ctx) { // We are online. if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { h.workerStartMu.Lock() @@ -219,6 +224,14 @@ func (h *Target) init(ctx context.Context) (err error) { } func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration) (err error) { + defer func() { + if err != nil { + atomic.StoreInt32(&h.status, statusOffline) + } else { + atomic.StoreInt32(&h.status, statusOnline) + } + }() + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 9d194f2a7..b04278bf4 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -43,6 +43,12 @@ import ( // the suffix for the configured queue dir where the logs will be persisted. const kafkaLoggerExtension = ".kafka.log" +const ( + statusClosed = iota + statusOffline + statusOnline +) + // Config - kafka target arguments. type Config struct { Enabled bool `json:"enable"` @@ -102,6 +108,8 @@ func (k Config) pingBrokers() (err error) { // Target - Kafka target. type Target struct { + status int32 + totalMessages int64 failedMessages int64 @@ -244,6 +252,11 @@ func (h *Target) send(entry interface{}) error { Value: sarama.ByteEncoder(logJSON), } _, _, err = h.producer.SendMessage(&msg) + if err != nil { + atomic.StoreInt32(&h.status, statusOffline) + } else { + atomic.StoreInt32(&h.status, statusOnline) + } return err } @@ -307,15 +320,13 @@ func (h *Target) init() error { } h.producer = producer + atomic.StoreInt32(&h.status, statusOnline) return nil } // IsOnline returns true if the target is online. func (h *Target) IsOnline(_ context.Context) bool { - if err := h.initKafkaOnce.Do(h.init); err != nil { - return false - } - return h.kconfig.pingBrokers() == nil + return atomic.LoadInt32(&h.status) == statusOnline } // Send log message 'e' to kafka target. @@ -399,6 +410,7 @@ func New(config Config) *Target { target := &Target{ logCh: make(chan interface{}, config.QueueSize), kconfig: config, + status: statusOffline, } return target }