diff --git a/internal/logger/audit.go b/internal/logger/audit.go
index 4f4ba62d3..d175144a7 100644
--- a/internal/logger/audit.go
+++ b/internal/logger/audit.go
@@ -28,7 +28,6 @@ import (
 	"github.com/minio/minio/internal/mcontext"
 	"github.com/minio/pkg/logger/message/audit"
 
-	"github.com/minio/madmin-go/v3"
 	xhttp "github.com/minio/minio/internal/http"
 )
 
@@ -145,7 +144,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl
 	// Send audit logs only to http targets.
 	for _, t := range auditTgts {
 		if err := t.Send(ctx, entry); err != nil {
-			LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), madmin.LogKindAll)
+			LogOnceIf(ctx, fmt.Errorf("Unable to send an audit event to the target `%v`: %v", t, err), "send-audit-event-failure")
 		}
 	}
 }
diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go
index 3db24e7dd..4ff666a1f 100644
--- a/internal/logger/target/http/http.go
+++ b/internal/logger/target/http/http.go
@@ -43,7 +43,7 @@ const (
 	// Timeout for the webhook http call
 	webhookCallTimeout = 5 * time.Second
 
-	// maxWorkers is the maximum number of concurrent operations.
+	// maxWorkers is the maximum number of concurrent http loggers.
 	maxWorkers = 16
 
 	// the suffix for the configured queue dir where the logs will be persisted.
@@ -361,32 +361,29 @@ func (h *Target) SendFromStore(key string) (err error) {
 	return h.store.Del(key)
 }
 
-// Send log message 'e' to http target.
-// If servers are offline messages are queued until queue is full.
+// Send the log message 'entry' to the http target.
+// Messages are queued on disk if the store is enabled.
 // If Cancel has been called the message is ignored.
 func (h *Target) Send(ctx context.Context, entry interface{}) error {
+	if atomic.LoadInt32(&h.status) == statusClosed {
+		return nil
+	}
 	if h.store != nil {
 		// save the entry to the queue store which will be replayed to the target.
 		return h.store.Put(entry)
 	}
-	if atomic.LoadInt32(&h.status) == statusClosed {
-		return nil
-	}
 	h.logChMu.RLock()
 	defer h.logChMu.RUnlock()
 	if h.logCh == nil {
 		// We are closing...
 		return nil
 	}
+
 	select {
 	case h.logCh <- entry:
+	case <-ctx.Done():
+		return ctx.Err()
 	default:
-		// Drop messages until we are online.
-		if !h.IsOnline(ctx) {
-			atomic.AddInt64(&h.totalMessages, 1)
-			atomic.AddInt64(&h.failedMessages, 1)
-			return errors.New("log buffer full and remote offline")
-		}
 		nWorkers := atomic.LoadInt64(&h.workers)
 		if nWorkers < maxWorkers {
 			// Only have one try to start at the same time.
@@ -403,11 +400,9 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
 			h.logCh <- entry
 			return nil
 		}
-		// log channel is full, do not wait and return
-		// an error immediately to the caller
 		atomic.AddInt64(&h.totalMessages, 1)
 		atomic.AddInt64(&h.failedMessages, 1)
-		return errors.New("log buffer full, remote endpoint is not able to keep up")
+		return errors.New("log buffer full")
 	}
 	return nil
 }
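
The audit.go change swaps LogAlwaysIf for LogOnceIf so that a persistently failing audit target produces one log line keyed by "send-audit-event-failure" rather than one line per request. LogOnceIf is MinIO-internal; the sketch below shows only the once-per-key suppression idea, with hypothetical names (onceLogger, LogOnce), not the actual implementation.

package main

import (
	"fmt"
	"sync"
)

// onceLogger reports each distinct dedup key once and suppresses
// repeats, so a flapping target cannot flood the server log.
// (Illustrative only; mirrors the intent of LogOnceIf.)
type onceLogger struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func (l *onceLogger) LogOnce(err error, key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, dup := l.seen[key]; dup {
		return // this key was already reported; stay quiet
	}
	l.seen[key] = struct{}{}
	fmt.Println("ERROR:", err)
}

func main() {
	l := &onceLogger{seen: make(map[string]struct{})}
	for i := 0; i < 3; i++ {
		err := fmt.Errorf("unable to send an audit event: attempt %d", i)
		l.LogOnce(err, "send-audit-event-failure")
	}
	// Prints a single line: ERROR: unable to send an audit event: attempt 0
}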
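
In http.go, Send now checks statusClosed before consulting the store, honors context cancellation while enqueueing, and no longer special-cases an offline remote: a full buffer is an error regardless. Note that a select with a default clause never blocks, so the <-ctx.Done() case only fires if the context is already cancelled when Send runs. Below is a compressed, self-contained sketch of that enqueue shape; the type and field names are illustrative, and the real code also rate-limits worker starts with a mutex and a one-second cooldown.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

const maxWorkers = 16

// target is an illustrative stand-in for the http logger Target.
type target struct {
	logCh   chan string
	workers int64 // consumers currently running, updated atomically
}

// trySend follows the shape of Target.Send above: non-blocking enqueue,
// context cancellation, a bounded attempt to add one more consumer when
// the buffer is full, and a hard "log buffer full" error past the cap.
func (t *target) trySend(ctx context.Context, entry string) error {
	select {
	case t.logCh <- entry:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	default:
		// Buffer full: reserve a worker slot atomically; back out if over the cap.
		if n := atomic.AddInt64(&t.workers, 1); n > maxWorkers {
			atomic.AddInt64(&t.workers, -1)
			return errors.New("log buffer full")
		}
		go t.consume()
		select { // block until the new consumer drains the channel
		case t.logCh <- entry:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func (t *target) consume() {
	defer atomic.AddInt64(&t.workers, -1)
	for e := range t.logCh {
		_ = e // deliver e to the remote endpoint here
	}
}

func main() {
	t := &target{logCh: make(chan string, 4)}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	for i := 0; i < 8; i++ {
		if err := t.trySend(ctx, fmt.Sprintf("entry-%d", i)); err != nil {
			fmt.Println("dropped:", err)
		}
	}
}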
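
When a queue store is configured, Send returns as soon as h.store.Put persists the entry; delivery happens later when stored keys are replayed through SendFromStore, which (per the context lines above) deletes a key only after a successful send. A toy disk-backed queue showing that put/replay/delete cycle, assuming a one-file-per-entry JSON layout that is not MinIO's actual store format:

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

type diskQueue struct{ dir string }

// Put persists one entry as its own JSON file and returns immediately,
// so callers never block on a slow or offline endpoint.
func (q diskQueue) Put(key string, entry interface{}) error {
	b, err := json.Marshal(entry)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(q.dir, key+".json"), b, 0o644)
}

// Replay hands each stored entry to send; a file is removed only after
// a successful delivery, so failed entries are retried on the next pass.
func (q diskQueue) Replay(send func([]byte) error) error {
	entries, err := os.ReadDir(q.dir)
	if err != nil {
		return err
	}
	for _, e := range entries {
		p := filepath.Join(q.dir, e.Name())
		b, err := os.ReadFile(p)
		if err != nil {
			return err
		}
		if err := send(b); err != nil {
			return err // keep the file; retry later
		}
		if err := os.Remove(p); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	dir, _ := os.MkdirTemp("", "queue")
	defer os.RemoveAll(dir)
	q := diskQueue{dir: dir}
	_ = q.Put("audit-0001", map[string]string{"api": "PutObject"})
	_ = q.Replay(func(b []byte) error { fmt.Printf("deliver %s\n", b); return nil })
}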