diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go
index 84842b216..6f7d0af59 100644
--- a/internal/event/target/kafka.go
+++ b/internal/event/target/kafka.go
@@ -28,6 +28,7 @@ import (
 	"net/url"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"

 	"github.com/minio/minio/internal/event"
@@ -161,7 +162,7 @@ func (target *KafkaTarget) IsActive() (bool, error) {
 }

 func (target *KafkaTarget) isActive() (bool, error) {
-	if !target.args.pingBrokers() {
+	if err := target.args.pingBrokers(); err != nil {
 		return false, store.ErrNotConnected
 	}
 	return true, nil
@@ -268,14 +269,32 @@ func (target *KafkaTarget) Close() error {
 }

 // Check if atleast one broker in cluster is active
-func (k KafkaArgs) pingBrokers() bool {
-	d := net.Dialer{Timeout: 60 * time.Second}
-	for _, broker := range k.Brokers {
-		if _, err := d.Dial("tcp", broker.String()); err == nil {
-			return true
-		}
+func (k KafkaArgs) pingBrokers() (err error) {
+	d := net.Dialer{Timeout: 1 * time.Second}
+
+	errs := make([]error, len(k.Brokers))
+	var wg sync.WaitGroup
+	for idx, broker := range k.Brokers {
+		broker := broker
+		idx := idx
+		wg.Add(1)
+		go func(broker xnet.Host, idx int) {
+			defer wg.Done()
+
+			_, errs[idx] = d.Dial("tcp", broker.String())
+		}(broker, idx)
 	}
-	return false
+	wg.Wait()
+
+	var retErr error
+	for _, err := range errs {
+		if err == nil {
+			// if one of them is active we are good.
+			return nil
+		}
+		retErr = err
+	}
+	return retErr
 }

 func (target *KafkaTarget) init() error {
@@ -295,6 +314,7 @@ func (target *KafkaTarget) initKafka() error {
 		config.Version = kafkaVersion
 	}

+	config.Net.KeepAlive = 60 * time.Second
 	config.Net.SASL.User = args.SASL.User
 	config.Net.SASL.Password = args.SASL.Password
 	initScramClient(args, config) // initializes configured scram client.
diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go
index c8d290b96..3db24e7dd 100644
--- a/internal/logger/target/http/http.go
+++ b/internal/logger/target/http/http.go
@@ -103,8 +103,9 @@ type Target struct {

 	// store to persist and replay the logs to the target
 	// to avoid missing events when the target is down.
-	store          store.Store[interface{}]
-	storeCtxCancel context.CancelFunc
+	store              store.Store[interface{}]
+	storeCtxCancel     context.CancelFunc
+	initQueueStoreOnce once.Init

 	config Config

@@ -136,12 +137,12 @@ func (h *Target) IsOnline(ctx context.Context) bool {
 // Stats returns the target statistics.
 func (h *Target) Stats() types.TargetStats {
 	h.logChMu.RLock()
-	logCh := h.logCh
+	queueLength := len(h.logCh)
 	h.logChMu.RUnlock()
 	stats := types.TargetStats{
 		TotalMessages:  atomic.LoadInt64(&h.totalMessages),
 		FailedMessages: atomic.LoadInt64(&h.failedMessages),
-		QueueLength:    len(logCh),
+		QueueLength:    queueLength,
 	}

 	return stats
@@ -149,7 +150,7 @@ func (h *Target) Stats() types.TargetStats {

 // This will check if we can reach the remote.
 func (h *Target) checkAlive(ctx context.Context) (err error) {
-	return h.send(ctx, []byte(`{}`), 2*webhookCallTimeout)
+	return h.send(ctx, []byte(`{}`), webhookCallTimeout)
 }

 // Init validate and initialize the http target
@@ -406,7 +407,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
 		// an error immediately to the caller
 		atomic.AddInt64(&h.totalMessages, 1)
 		atomic.AddInt64(&h.failedMessages, 1)
-		return errors.New("log buffer full")
+		return errors.New("log buffer full, remote endpoint is not able to keep up")
 	}

 	return nil
diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go
index 4edb12de3..89cba52f0 100644
--- a/internal/logger/target/kafka/kafka.go
+++ b/internal/logger/target/kafka/kafka.go
@@ -27,14 +27,14 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"reflect"
 	"sync"
 	"sync/atomic"
 	"time"

-	"github.com/minio/pkg/logger/message/audit"
-
 	"github.com/Shopify/sarama"
 	saramatls "github.com/Shopify/sarama/tools/tls"
+	"github.com/tidwall/gjson"

 	"github.com/minio/minio/internal/logger/target/types"
 	"github.com/minio/minio/internal/once"
@@ -75,15 +75,31 @@ type Config struct {

 // Check if atleast one broker in cluster is active
 func (k Config) pingBrokers() (err error) {
-	d := net.Dialer{Timeout: 60 * time.Second}
+	d := net.Dialer{Timeout: 1 * time.Second}

-	for _, broker := range k.Brokers {
-		_, err = d.Dial("tcp", broker.String())
-		if err != nil {
-			return err
-		}
+	errs := make([]error, len(k.Brokers))
+	var wg sync.WaitGroup
+	for idx, broker := range k.Brokers {
+		broker := broker
+		idx := idx
+		wg.Add(1)
+		go func(broker xnet.Host, idx int) {
+			defer wg.Done()
+
+			_, errs[idx] = d.Dial("tcp", broker.String())
+		}(broker, idx)
 	}
-	return nil
+	wg.Wait()
+
+	var retErr error
+	for _, err := range errs {
+		if err == nil {
+			// if one broker is online its enough
+			return nil
+		}
+		retErr = err
+	}
+	return retErr
 }

 // Target - Kafka target.
@@ -91,16 +107,19 @@ type Target struct {
 	totalMessages  int64
 	failedMessages int64

-	wg     sync.WaitGroup
-	doneCh chan struct{}
+	wg sync.WaitGroup

-	// Channel of log entries
-	logCh chan audit.Entry
+	// Channel of log entries.
+	// Reading logCh must hold read lock on logChMu (to avoid read race)
+	// Sending a value on logCh must hold read lock on logChMu (to avoid closing)
+	logCh   chan interface{}
+	logChMu sync.RWMutex

 	// store to persist and replay the logs to the target
 	// to avoid missing events when the target is down.
-	store              store.Store[audit.Entry]
-	storeCtxCancel     context.CancelFunc
+	store          store.Store[interface{}]
+	storeCtxCancel context.CancelFunc
+
 	initKafkaOnce      once.Init
 	initQueueStoreOnce once.Init

@@ -138,10 +157,14 @@ func (h *Target) String() string {

 // Stats returns the target statistics.
 func (h *Target) Stats() types.TargetStats {
+	h.logChMu.RLock()
+	queueLength := len(h.logCh)
+	h.logChMu.RUnlock()
+
 	return types.TargetStats{
 		TotalMessages:  atomic.LoadInt64(&h.totalMessages),
 		FailedMessages: atomic.LoadInt64(&h.failedMessages),
-		QueueLength:    len(h.logCh),
+		QueueLength:    queueLength,
 	}
 }

@@ -167,9 +190,9 @@ func (h *Target) Init(ctx context.Context) error {
 }

 func (h *Target) initQueueStore(ctx context.Context) (err error) {
-	var queueStore store.Store[audit.Entry]
+	var queueStore store.Store[interface{}]
 	queueDir := filepath.Join(h.kconfig.QueueDir, h.Name())
-	queueStore = store.NewQueueStore[audit.Entry](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension)
+	queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension)
 	if err = queueStore.Open(); err != nil {
 		return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
 	}
@@ -181,24 +204,28 @@ func (h *Target) initQueueStore(ctx context.Context) (err error) {
 }

 func (h *Target) startKakfaLogger() {
-	// Create a routine which sends json logs received
-	// from an internal channel.
-	h.wg.Add(1)
-	go func() {
+	h.logChMu.RLock()
+	logCh := h.logCh
+	if logCh != nil {
+		// We are not allowed to add when logCh is nil
+		h.wg.Add(1)
 		defer h.wg.Done()
-		for {
-			select {
-			case entry := <-h.logCh:
-				h.logEntry(entry)
-			case <-h.doneCh:
-				return
-			}
-		}
-	}()
+	}
+	h.logChMu.RUnlock()
+
+	if logCh == nil {
+		return
+	}
+
+	// Create a routine which sends json logs received
+	// from an internal channel.
+	for entry := range logCh {
+		h.logEntry(entry)
+	}
 }

-func (h *Target) logEntry(entry audit.Entry) {
+func (h *Target) logEntry(entry interface{}) {
 	atomic.AddInt64(&h.totalMessages, 1)
 	if err := h.send(entry); err != nil {
 		atomic.AddInt64(&h.failedMessages, 1)
@@ -206,7 +233,7 @@ func (h *Target) logEntry(entry audit.Entry) {
 	}
 }

-func (h *Target) send(entry audit.Entry) error {
+func (h *Target) send(entry interface{}) error {
 	if err := h.initKafkaOnce.Do(h.init); err != nil {
 		return err
 	}
@@ -214,9 +241,14 @@
 	if err != nil {
 		return err
 	}
+	requestID := gjson.GetBytes(logJSON, "requestID").Str
+	if requestID == "" {
+		// unsupported data structure
+		return fmt.Errorf("unsupported data structure: %s must be either audit.Entry or log.Entry", reflect.TypeOf(entry))
+	}
 	msg := sarama.ProducerMessage{
 		Topic: h.kconfig.Topic,
-		Key:   sarama.StringEncoder(entry.RequestID),
+		Key:   sarama.StringEncoder(requestID),
 		Value: sarama.ByteEncoder(logJSON),
 	}
 	_, _, err = h.producer.SendMessage(&msg)
@@ -238,6 +270,7 @@ func (h *Target) init() error {
 		sconfig.Version = kafkaVersion
 	}

+	sconfig.Net.KeepAlive = 60 * time.Second
 	sconfig.Net.SASL.User = h.kconfig.SASL.User
 	sconfig.Net.SASL.Password = h.kconfig.SASL.Password
 	initScramClient(h.kconfig, sconfig) // initializes configured scram client.
@@ -284,32 +317,32 @@ func (h *Target) IsOnline(_ context.Context) bool {
 // Send log message 'e' to kafka target.
 func (h *Target) Send(ctx context.Context, entry interface{}) error {
-	if auditEntry, ok := entry.(audit.Entry); ok {
-		if h.store != nil {
-			// save the entry to the queue store which will be replayed to the target.
-			return h.store.Put(auditEntry)
-		}
-		if err := h.initKafkaOnce.Do(h.init); err != nil {
-			return err
-		}
-		select {
-		case <-h.doneCh:
-		case h.logCh <- auditEntry:
-		default:
-			// log channel is full, do not wait and return
-			// an error immediately to the caller
-			atomic.AddInt64(&h.totalMessages, 1)
-			atomic.AddInt64(&h.failedMessages, 1)
-			return errors.New("log buffer full")
-		}
+	if h.store != nil {
+		// save the entry to the queue store which will be replayed to the target.
+		return h.store.Put(entry)
+	}
+	h.logChMu.RLock()
+	defer h.logChMu.RUnlock()
+	if h.logCh == nil {
+		// We are closing...
+		return nil
+	}
+
+	select {
+	case h.logCh <- entry:
+	default:
+		// log channel is full, do not wait and return
+		// an error immediately to the caller
+		atomic.AddInt64(&h.totalMessages, 1)
+		atomic.AddInt64(&h.failedMessages, 1)
+		return errors.New("log buffer full")
 	}

 	return nil
 }

 // SendFromStore - reads the log from store and sends it to kafka.
 func (h *Target) SendFromStore(key string) (err error) {
-	var auditEntry audit.Entry
-	auditEntry, err = h.store.Get(key)
+	auditEntry, err := h.store.Get(key)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return nil
@@ -328,16 +361,26 @@ func (h *Target) SendFromStore(key string) (err error) {

 // Cancel - cancels the target
 func (h *Target) Cancel() {
-	close(h.doneCh)
-	close(h.logCh)
 	// If queuestore is configured, cancel it's context to
 	// stop the replay go-routine.
 	if h.store != nil {
 		h.storeCtxCancel()
 	}
+
+	// Set logch to nil and close it.
+	// This will block all Send operations,
+	// and finish the existing ones.
+	// All future ones will be discarded.
+	h.logChMu.Lock()
+	close(h.logCh)
+	h.logCh = nil
+	h.logChMu.Unlock()
+
 	if h.producer != nil {
 		h.producer.Close()
 	}
+
+	// Wait for messages to be sent...
 	h.wg.Wait()
 }

@@ -345,8 +388,7 @@ func (h *Target) Cancel() {
 // sends log over http to the specified endpoint
 func New(config Config) *Target {
 	target := &Target{
-		logCh:   make(chan audit.Entry, 10000),
-		doneCh:  make(chan struct{}),
+		logCh:   make(chan interface{}, config.QueueSize),
 		kconfig: config,
 	}
 	return target
diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go
index d4a2968ac..60e8a09fd 100644
--- a/internal/store/queuestore.go
+++ b/internal/store/queuestore.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2023 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
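
For reference, the broker-probing pattern this diff introduces in both `pingBrokers` implementations can be tried in isolation. Below is a minimal standalone sketch of the same idea, not the MinIO code itself: it uses plain `string` addresses instead of `xnet.Host`, a hypothetical `main`, and example broker addresses, but keeps the semantics of the change (dial every broker concurrently with a 1-second timeout and report success if at least one dial succeeds, otherwise return one of the dial errors).

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// pingBrokers dials every broker concurrently with a short timeout.
// It returns nil if any broker is reachable; otherwise it returns
// one of the dial errors.
func pingBrokers(brokers []string) error {
	d := net.Dialer{Timeout: 1 * time.Second}

	errs := make([]error, len(brokers))
	var wg sync.WaitGroup
	for idx, broker := range brokers {
		wg.Add(1)
		go func(broker string, idx int) {
			defer wg.Done()

			conn, err := d.Dial("tcp", broker)
			if err != nil {
				errs[idx] = err
				return
			}
			conn.Close()
		}(broker, idx)
	}
	wg.Wait()

	var retErr error
	for _, err := range errs {
		if err == nil {
			// One reachable broker is enough.
			return nil
		}
		retErr = err
	}
	return retErr
}

func main() {
	// Hypothetical broker addresses, for illustration only.
	brokers := []string{"localhost:9092", "localhost:9093"}
	if err := pingBrokers(brokers); err != nil {
		fmt.Println("no broker reachable:", err)
		return
	}
	fmt.Println("at least one broker is reachable")
}
```

Compared with the previous sequential loop with a 60-second dial timeout, probing all brokers in parallel with a 1-second timeout bounds the health check to roughly one second regardless of how many brokers are down.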