diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index 94daf4e13..78a9b98d8 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -21,6 +21,7 @@ import ( "container/ring" "context" "sync" + "sync/atomic" "github.com/minio/madmin-go" "github.com/minio/minio/internal/logger" @@ -36,6 +37,9 @@ const defaultLogBufferCount = 10000 // HTTPConsoleLoggerSys holds global console logger state type HTTPConsoleLoggerSys struct { + totalMessages int64 + failedMessages int64 + sync.RWMutex pubsub *pubsub.PubSub[log.Info, madmin.LogMask] console *console.Target @@ -133,6 +137,15 @@ func (sys *HTTPConsoleLoggerSys) String() string { return logger.ConsoleLoggerTgt } +// Stats returns the target statistics. +func (sys *HTTPConsoleLoggerSys) Stats() types.TargetStats { + return types.TargetStats{ + TotalMessages: atomic.LoadInt64(&sys.totalMessages), + FailedMessages: atomic.LoadInt64(&sys.failedMessages), + QueueLength: 0, + } +} + // Content returns the console stdout log func (sys *HTTPConsoleLoggerSys) Content() (logs []log.Entry) { sys.RLock() @@ -170,6 +183,7 @@ func (sys *HTTPConsoleLoggerSys) Send(entry interface{}) error { case string: lg = log.Info{ConsoleMsg: e, NodeName: sys.nodeName} } + atomic.AddInt64(&sys.totalMessages, 1) sys.pubsub.Publish(lg) sys.Lock() @@ -177,6 +191,9 @@ func (sys *HTTPConsoleLoggerSys) Send(entry interface{}) error { sys.logBuf.Value = lg sys.logBuf = sys.logBuf.Next() sys.Unlock() - - return sys.console.Send(entry, string(logger.All)) + err := sys.console.Send(entry, string(logger.All)) + if err != nil { + atomic.AddInt64(&sys.failedMessages, 1) + } + return err } diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go index d07bad1d9..8da145b48 100644 --- a/cmd/data-update-tracker_test.go +++ b/cmd/data-update-tracker_test.go @@ -62,6 +62,11 @@ func (t *testingLogger) Type() types.TargetType { return types.TargetHTTP } +// Stats returns the target statistics. +func (t *testingLogger) Stats() types.TargetStats { + return types.TargetStats{} +} + func (t *testingLogger) Send(entry interface{}) error { t.mu.Lock() defer t.mu.Unlock() diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 56a71e449..16f50ef33 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -130,6 +130,7 @@ const ( iamSubsystem MetricSubsystem = "iam" kmsSubsystem MetricSubsystem = "kms" notifySubsystem MetricSubsystem = "notify" + auditSubsystem MetricSubsystem = "audit" ) // MetricName are the individual names for the metric. @@ -1567,6 +1568,43 @@ func getNotificationMetrics() *MetricsGroup { Value: float64(st.CurrentQueue), }) } + // Audit and system: + audit := logger.CurrentStats() + for id, st := range audit { + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: auditSubsystem, + Name: "target_queue_length", + Help: "Number of unsent messages in queue for target", + Type: gaugeMetric, + }, + VariableLabels: map[string]string{"target_id": id}, + Value: float64(st.QueueLength), + }) + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: auditSubsystem, + Name: "total_messages", + Help: "Total number of messages sent since start", + Type: counterMetric, + }, + VariableLabels: map[string]string{"target_id": id}, + Value: float64(st.TotalMessages), + }) + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: auditSubsystem, + Name: "failed_messages", + Help: "Total number of messages that failed to send since start", + Type: counterMetric, + }, + VariableLabels: map[string]string{"target_id": id}, + Value: float64(st.FailedMessages), + }) + } return metrics }) return mg diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 1048220a8..4bbe63d19 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -26,14 +26,20 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger/target/types" ) -// Timeout for the webhook http call -const webhookCallTimeout = 5 * time.Second +const ( + // Timeout for the webhook http call + webhookCallTimeout = 5 * time.Second + + // maxWorkers is the maximum number of concurrent operations. + maxWorkers = 8 +) // Config http logger target type Config struct { @@ -57,6 +63,14 @@ type Config struct { // buffer is full, new logs are just ignored and an error // is returned to the caller. type Target struct { + totalMessages int64 + failedMessages int64 + + // Worker control + workers int64 + workerStartMu sync.Mutex + lastStarted time.Time + wg sync.WaitGroup doneCh chan struct{} @@ -64,6 +78,7 @@ type Target struct { logCh chan interface{} config Config + client *http.Client } // Endpoint returns the backend endpoint @@ -75,6 +90,15 @@ func (h *Target) String() string { return h.config.Name } +// Stats returns the target statistics. +func (h *Target) Stats() types.TargetStats { + return types.TargetStats{ + TotalMessages: atomic.LoadInt64(&h.totalMessages), + FailedMessages: atomic.LoadInt64(&h.failedMessages), + QueueLength: len(h.logCh), + } +} + // Init validate and initialize the http target func (h *Target) Init() error { ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout) @@ -100,6 +124,7 @@ func (h *Target) Init() error { if err != nil { return err } + h.client = &client // Drain any response. xhttp.DrainBody(resp.Body) @@ -114,6 +139,8 @@ func (h *Target) Init() error { h.config.Endpoint, resp.Status) } + h.lastStarted = time.Now() + atomic.AddInt64(&h.workers, 1) go h.startHTTPLogger() return nil } @@ -128,15 +155,17 @@ func acceptedResponseStatusCode(code int) bool { func (h *Target) logEntry(entry interface{}) { logJSON, err := json.Marshal(&entry) if err != nil { + atomic.AddInt64(&h.failedMessages, 1) return } ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) + defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, bytes.NewReader(logJSON)) if err != nil { h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) - cancel() + atomic.AddInt64(&h.failedMessages, 1) return } req.Header.Set(xhttp.ContentType, "application/json") @@ -151,10 +180,9 @@ func (h *Target) logEntry(entry interface{}) { req.Header.Set("Authorization", h.config.AuthToken) } - client := http.Client{Transport: h.config.Transport} - resp, err := client.Do(req) - cancel() + resp, err := h.client.Do(req) if err != nil { + atomic.AddInt64(&h.failedMessages, 1) h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) return } @@ -163,6 +191,7 @@ func (h *Target) logEntry(entry interface{}) { xhttp.DrainBody(resp.Body) if !acceptedResponseStatusCode(resp.StatusCode) { + atomic.AddInt64(&h.failedMessages, 1) switch resp.StatusCode { case http.StatusForbidden: h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint) @@ -177,11 +206,15 @@ func (h *Target) startHTTPLogger() { // from an internal channel. h.wg.Add(1) go func() { - defer h.wg.Done() + defer func() { + h.wg.Done() + atomic.AddInt64(&h.workers, -1) + }() for { select { case entry := <-h.logCh: + atomic.AddInt64(&h.totalMessages, 1) h.logEntry(entry) case <-h.doneCh: return @@ -214,8 +247,30 @@ func (h *Target) Send(entry interface{}) error { case <-h.doneCh: case h.logCh <- entry: default: + nWorkers := atomic.LoadInt64(&h.workers) + if nWorkers < maxWorkers { + // Only have one try to start at the same time. + h.workerStartMu.Lock() + defer h.workerStartMu.Unlock() + // Start one max every second. + if time.Since(h.lastStarted) > time.Second { + if atomic.CompareAndSwapInt64(&h.workers, nWorkers, nWorkers+1) { + // Start another logger. + h.lastStarted = time.Now() + go h.startHTTPLogger() + } + } + // Block to send + select { + case <-h.doneCh: + case h.logCh <- entry: + } + return nil + } // log channel is full, do not wait and return // an error immediately to the caller + atomic.AddInt64(&h.totalMessages, 1) + atomic.AddInt64(&h.failedMessages, 1) return errors.New("log buffer full") } diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 123f9f6df..42512dd9b 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -25,6 +25,7 @@ import ( "errors" "net" "sync" + "sync/atomic" "github.com/Shopify/sarama" saramatls "github.com/Shopify/sarama/tools/tls" @@ -36,11 +37,14 @@ import ( // Target - Kafka target. type Target struct { + totalMessages int64 + failedMessages int64 + wg sync.WaitGroup doneCh chan struct{} // Channel of log entries - logCh chan interface{} + logCh chan audit.Entry producer sarama.SyncProducer kconfig Config @@ -55,37 +59,40 @@ func (h *Target) Send(entry interface{}) error { default: } - select { - case <-h.doneCh: - case h.logCh <- entry: - default: - // log channel is full, do not wait and return - // an error immediately to the caller - return errors.New("log buffer full") + if e, ok := entry.(audit.Entry); ok { + select { + case <-h.doneCh: + case h.logCh <- e: + default: + // log channel is full, do not wait and return + // an error immediately to the caller + atomic.AddInt64(&h.totalMessages, 1) + atomic.AddInt64(&h.failedMessages, 1) + return errors.New("log buffer full") + } } return nil } -func (h *Target) logEntry(entry interface{}) { +func (h *Target) logEntry(entry audit.Entry) { + atomic.AddInt64(&h.totalMessages, 1) logJSON, err := json.Marshal(&entry) if err != nil { + atomic.AddInt64(&h.failedMessages, 1) return } + msg := sarama.ProducerMessage{ + Topic: h.kconfig.Topic, + Key: sarama.StringEncoder(entry.RequestID), + Value: sarama.ByteEncoder(logJSON), + } - ae, ok := entry.(audit.Entry) - if ok { - msg := sarama.ProducerMessage{ - Topic: h.kconfig.Topic, - Key: sarama.StringEncoder(ae.RequestID), - Value: sarama.ByteEncoder(logJSON), - } - - _, _, err = h.producer.SendMessage(&msg) - if err != nil { - h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) - return - } + _, _, err = h.producer.SendMessage(&msg) + if err != nil { + atomic.AddInt64(&h.failedMessages, 1) + h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) + return } } @@ -147,6 +154,15 @@ func (k Config) pingBrokers() error { return err } +// Stats returns the target statistics. +func (h *Target) Stats() types.TargetStats { + return types.TargetStats{ + TotalMessages: atomic.LoadInt64(&h.totalMessages), + FailedMessages: atomic.LoadInt64(&h.failedMessages), + QueueLength: len(h.logCh), + } +} + // Endpoint - return kafka target func (h *Target) Endpoint() string { return "kafka" @@ -231,7 +247,7 @@ func (h *Target) Cancel() { // sends log over http to the specified endpoint func New(config Config) *Target { target := &Target{ - logCh: make(chan interface{}, 10000), + logCh: make(chan audit.Entry, 10000), doneCh: make(chan struct{}), kconfig: config, } diff --git a/internal/logger/target/types/targettype_string.go b/internal/logger/target/types/targettype_string.go new file mode 100644 index 000000000..6aa5e3974 --- /dev/null +++ b/internal/logger/target/types/targettype_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=TargetType -trimprefix=Target types.go"; DO NOT EDIT. + +package types + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[TargetConsole-1] + _ = x[TargetHTTP-2] + _ = x[TargetKafka-3] +} + +const _TargetType_name = "ConsoleHTTPKafka" + +var _TargetType_index = [...]uint8{0, 7, 11, 16} + +func (i TargetType) String() string { + i -= 1 + if i >= TargetType(len(_TargetType_index)-1) { + return "TargetType(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _TargetType_name[_TargetType_index[i]:_TargetType_index[i+1]] +} diff --git a/internal/logger/target/types/types.go b/internal/logger/target/types/types.go index 99618696f..d20b0cc02 100644 --- a/internal/logger/target/types/types.go +++ b/internal/logger/target/types/types.go @@ -20,6 +20,8 @@ package types // TargetType indicates type of the target e.g. console, http, kafka type TargetType uint8 +//go:generate stringer -type=TargetType -trimprefix=Target $GOFILE + // Constants for target types const ( _ TargetType = iota @@ -27,3 +29,15 @@ const ( TargetHTTP TargetKafka ) + +// TargetStats contains statistics for a target. +type TargetStats struct { + // QueueLength is the queue length if any messages are queued. + QueueLength int + + // TotalMessages is the total number of messages sent in the lifetime of the target + TotalMessages int64 + + // FailedMessages should log message count that failed to send. + FailedMessages int64 +} diff --git a/internal/logger/targets.go b/internal/logger/targets.go index c64166dd0..7c5f1a8ed 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -18,6 +18,8 @@ package logger import ( + "fmt" + "strings" "sync" "github.com/minio/minio/internal/logger/target/http" @@ -32,6 +34,7 @@ import ( type Target interface { String() string Endpoint() string + Stats() types.TargetStats Init() error Cancel() Send(entry interface{}) error @@ -71,6 +74,33 @@ func AuditTargets() []Target { return res } +// CurrentStats returns the current statistics. +func CurrentStats() map[string]types.TargetStats { + sys := SystemTargets() + audit := AuditTargets() + res := make(map[string]types.TargetStats, len(sys)+len(audit)) + cnt := make(map[string]int, len(sys)+len(audit)) + + // Add system and audit. + for _, t := range sys { + key := strings.ToLower(t.Type().String()) + n := cnt[key] + cnt[key]++ + key = fmt.Sprintf("sys_%s_%d", key, n) + res[key] = t.Stats() + } + + for _, t := range audit { + key := strings.ToLower(t.Type().String()) + n := cnt[key] + cnt[key]++ + key = fmt.Sprintf("audit_%s_%d", key, n) + res[key] = t.Stats() + } + + return res +} + // auditTargets is the list of enabled audit loggers // Must be immutable at all times. // Can be swapped to another while holding swapMu