From 9c0e8cd15baffaf063a7524300fb62deea1f91e0 Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Sat, 29 Jul 2023 20:49:18 +0100 Subject: [PATCH] logger: Avoid slow calls in http logger Send() function (#17747) Send() is synchronous and can affect the latency of S3 requests when the logger buffer is full. Avoid checking if the HTTP target is online or not and increase the workers anyway since the buffer is already full. Also, avoid logs flooding when the audit target is down. --- internal/logger/audit.go | 3 +-- internal/logger/target/http/http.go | 25 ++++++++++--------------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/internal/logger/audit.go b/internal/logger/audit.go index 4f4ba62d3..d175144a7 100644 --- a/internal/logger/audit.go +++ b/internal/logger/audit.go @@ -28,7 +28,6 @@ import ( "github.com/minio/minio/internal/mcontext" "github.com/minio/pkg/logger/message/audit" - "github.com/minio/madmin-go/v3" xhttp "github.com/minio/minio/internal/http" ) @@ -145,7 +144,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl // Send audit logs only to http targets. for _, t := range auditTgts { if err := t.Send(ctx, entry); err != nil { - LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), madmin.LogKindAll) + LogOnceIf(ctx, fmt.Errorf("Unable to send an audit event to the target `%v`: %v", t, err), "send-audit-event-failure") } } } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 3db24e7dd..4ff666a1f 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -43,7 +43,7 @@ const ( // Timeout for the webhook http call webhookCallTimeout = 5 * time.Second - // maxWorkers is the maximum number of concurrent operations. + // maxWorkers is the maximum number of concurrent http loggers maxWorkers = 16 // the suffix for the configured queue dir where the logs will be persisted. @@ -361,32 +361,29 @@ func (h *Target) SendFromStore(key string) (err error) { return h.store.Del(key) } -// Send log message 'e' to http target. -// If servers are offline messages are queued until queue is full. +// Send the log message 'entry' to the http target. +// Messages are queued in the disk if the store is enabled // If Cancel has been called the message is ignored. func (h *Target) Send(ctx context.Context, entry interface{}) error { + if atomic.LoadInt32(&h.status) == statusClosed { + return nil + } if h.store != nil { // save the entry to the queue store which will be replayed to the target. return h.store.Put(entry) } - if atomic.LoadInt32(&h.status) == statusClosed { - return nil - } h.logChMu.RLock() defer h.logChMu.RUnlock() if h.logCh == nil { // We are closing... return nil } + select { case h.logCh <- entry: + case <-ctx.Done(): + return ctx.Err() default: - // Drop messages until we are online. - if !h.IsOnline(ctx) { - atomic.AddInt64(&h.totalMessages, 1) - atomic.AddInt64(&h.failedMessages, 1) - return errors.New("log buffer full and remote offline") - } nWorkers := atomic.LoadInt64(&h.workers) if nWorkers < maxWorkers { // Only have one try to start at the same time. @@ -403,11 +400,9 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { h.logCh <- entry return nil } - // log channel is full, do not wait and return - // an error immediately to the caller atomic.AddInt64(&h.totalMessages, 1) atomic.AddInt64(&h.failedMessages, 1) - return errors.New("log buffer full, remote endpoint is not able to keep up") + return errors.New("log buffer full") } return nil