fix: kafka broker pings must not be greater than 1sec (#17376)

This commit is contained in:
Harshavardhana 2023-06-07 11:47:00 -07:00 committed by GitHub
parent 49ce85ee3d
commit dbd4c2425e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 136 additions and 73 deletions

View File

@ -28,6 +28,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time" "time"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
@ -161,7 +162,7 @@ func (target *KafkaTarget) IsActive() (bool, error) {
} }
func (target *KafkaTarget) isActive() (bool, error) { func (target *KafkaTarget) isActive() (bool, error) {
if !target.args.pingBrokers() { if err := target.args.pingBrokers(); err != nil {
return false, store.ErrNotConnected return false, store.ErrNotConnected
} }
return true, nil return true, nil
@ -268,14 +269,32 @@ func (target *KafkaTarget) Close() error {
} }
// Check if atleast one broker in cluster is active // Check if atleast one broker in cluster is active
func (k KafkaArgs) pingBrokers() bool { func (k KafkaArgs) pingBrokers() (err error) {
d := net.Dialer{Timeout: 60 * time.Second} d := net.Dialer{Timeout: 1 * time.Second}
for _, broker := range k.Brokers {
if _, err := d.Dial("tcp", broker.String()); err == nil { errs := make([]error, len(k.Brokers))
return true var wg sync.WaitGroup
for idx, broker := range k.Brokers {
broker := broker
idx := idx
wg.Add(1)
go func(broker xnet.Host, idx int) {
defer wg.Done()
_, errs[idx] = d.Dial("tcp", broker.String())
}(broker, idx)
} }
wg.Wait()
var retErr error
for _, err := range errs {
if err == nil {
// if one of them is active we are good.
return nil
} }
return false retErr = err
}
return retErr
} }
func (target *KafkaTarget) init() error { func (target *KafkaTarget) init() error {
@ -295,6 +314,7 @@ func (target *KafkaTarget) initKafka() error {
config.Version = kafkaVersion config.Version = kafkaVersion
} }
config.Net.KeepAlive = 60 * time.Second
config.Net.SASL.User = args.SASL.User config.Net.SASL.User = args.SASL.User
config.Net.SASL.Password = args.SASL.Password config.Net.SASL.Password = args.SASL.Password
initScramClient(args, config) // initializes configured scram client. initScramClient(args, config) // initializes configured scram client.

View File

@ -105,6 +105,7 @@ type Target struct {
// to avoid missing events when the target is down. // to avoid missing events when the target is down.
store store.Store[interface{}] store store.Store[interface{}]
storeCtxCancel context.CancelFunc storeCtxCancel context.CancelFunc
initQueueStoreOnce once.Init initQueueStoreOnce once.Init
config Config config Config
@ -136,12 +137,12 @@ func (h *Target) IsOnline(ctx context.Context) bool {
// Stats returns the target statistics. // Stats returns the target statistics.
func (h *Target) Stats() types.TargetStats { func (h *Target) Stats() types.TargetStats {
h.logChMu.RLock() h.logChMu.RLock()
logCh := h.logCh queueLength := len(h.logCh)
h.logChMu.RUnlock() h.logChMu.RUnlock()
stats := types.TargetStats{ stats := types.TargetStats{
TotalMessages: atomic.LoadInt64(&h.totalMessages), TotalMessages: atomic.LoadInt64(&h.totalMessages),
FailedMessages: atomic.LoadInt64(&h.failedMessages), FailedMessages: atomic.LoadInt64(&h.failedMessages),
QueueLength: len(logCh), QueueLength: queueLength,
} }
return stats return stats
@ -149,7 +150,7 @@ func (h *Target) Stats() types.TargetStats {
// This will check if we can reach the remote. // This will check if we can reach the remote.
func (h *Target) checkAlive(ctx context.Context) (err error) { func (h *Target) checkAlive(ctx context.Context) (err error) {
return h.send(ctx, []byte(`{}`), 2*webhookCallTimeout) return h.send(ctx, []byte(`{}`), webhookCallTimeout)
} }
// Init validate and initialize the http target // Init validate and initialize the http target
@ -406,7 +407,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
// an error immediately to the caller // an error immediately to the caller
atomic.AddInt64(&h.totalMessages, 1) atomic.AddInt64(&h.totalMessages, 1)
atomic.AddInt64(&h.failedMessages, 1) atomic.AddInt64(&h.failedMessages, 1)
return errors.New("log buffer full") return errors.New("log buffer full, remote endpoint is not able to keep up")
} }
return nil return nil

View File

@ -27,14 +27,14 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"reflect"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/minio/pkg/logger/message/audit"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
saramatls "github.com/Shopify/sarama/tools/tls" saramatls "github.com/Shopify/sarama/tools/tls"
"github.com/tidwall/gjson"
"github.com/minio/minio/internal/logger/target/types" "github.com/minio/minio/internal/logger/target/types"
"github.com/minio/minio/internal/once" "github.com/minio/minio/internal/once"
@ -75,16 +75,32 @@ type Config struct {
// Check if atleast one broker in cluster is active // Check if atleast one broker in cluster is active
func (k Config) pingBrokers() (err error) { func (k Config) pingBrokers() (err error) {
d := net.Dialer{Timeout: 60 * time.Second} d := net.Dialer{Timeout: 1 * time.Second}
for _, broker := range k.Brokers { errs := make([]error, len(k.Brokers))
_, err = d.Dial("tcp", broker.String()) var wg sync.WaitGroup
if err != nil { for idx, broker := range k.Brokers {
return err broker := broker
} idx := idx
wg.Add(1)
go func(broker xnet.Host, idx int) {
defer wg.Done()
_, errs[idx] = d.Dial("tcp", broker.String())
}(broker, idx)
} }
wg.Wait()
var retErr error
for _, err := range errs {
if err == nil {
// if one broker is online its enough
return nil return nil
} }
retErr = err
}
return retErr
}
// Target - Kafka target. // Target - Kafka target.
type Target struct { type Target struct {
@ -92,15 +108,18 @@ type Target struct {
failedMessages int64 failedMessages int64
wg sync.WaitGroup wg sync.WaitGroup
doneCh chan struct{}
// Channel of log entries // Channel of log entries.
logCh chan audit.Entry // Reading logCh must hold read lock on logChMu (to avoid read race)
// Sending a value on logCh must hold read lock on logChMu (to avoid closing)
logCh chan interface{}
logChMu sync.RWMutex
// store to persist and replay the logs to the target // store to persist and replay the logs to the target
// to avoid missing events when the target is down. // to avoid missing events when the target is down.
store store.Store[audit.Entry] store store.Store[interface{}]
storeCtxCancel context.CancelFunc storeCtxCancel context.CancelFunc
initKafkaOnce once.Init initKafkaOnce once.Init
initQueueStoreOnce once.Init initQueueStoreOnce once.Init
@ -138,10 +157,14 @@ func (h *Target) String() string {
// Stats returns the target statistics. // Stats returns the target statistics.
func (h *Target) Stats() types.TargetStats { func (h *Target) Stats() types.TargetStats {
h.logChMu.RLock()
queueLength := len(h.logCh)
h.logChMu.RUnlock()
return types.TargetStats{ return types.TargetStats{
TotalMessages: atomic.LoadInt64(&h.totalMessages), TotalMessages: atomic.LoadInt64(&h.totalMessages),
FailedMessages: atomic.LoadInt64(&h.failedMessages), FailedMessages: atomic.LoadInt64(&h.failedMessages),
QueueLength: len(h.logCh), QueueLength: queueLength,
} }
} }
@ -167,9 +190,9 @@ func (h *Target) Init(ctx context.Context) error {
} }
func (h *Target) initQueueStore(ctx context.Context) (err error) { func (h *Target) initQueueStore(ctx context.Context) (err error) {
var queueStore store.Store[audit.Entry] var queueStore store.Store[interface{}]
queueDir := filepath.Join(h.kconfig.QueueDir, h.Name()) queueDir := filepath.Join(h.kconfig.QueueDir, h.Name())
queueStore = store.NewQueueStore[audit.Entry](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension) queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension)
if err = queueStore.Open(); err != nil { if err = queueStore.Open(); err != nil {
return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
} }
@ -181,24 +204,28 @@ func (h *Target) initQueueStore(ctx context.Context) (err error) {
} }
func (h *Target) startKakfaLogger() { func (h *Target) startKakfaLogger() {
// Create a routine which sends json logs received h.logChMu.RLock()
// from an internal channel. logCh := h.logCh
if logCh != nil {
// We are not allowed to add when logCh is nil
h.wg.Add(1) h.wg.Add(1)
go func() {
defer h.wg.Done() defer h.wg.Done()
for { }
select { h.logChMu.RUnlock()
case entry := <-h.logCh:
h.logEntry(entry) if logCh == nil {
case <-h.doneCh:
return return
} }
// Create a routine which sends json logs received
// from an internal channel.
for entry := range logCh {
h.logEntry(entry)
} }
}()
} }
func (h *Target) logEntry(entry audit.Entry) { func (h *Target) logEntry(entry interface{}) {
atomic.AddInt64(&h.totalMessages, 1) atomic.AddInt64(&h.totalMessages, 1)
if err := h.send(entry); err != nil { if err := h.send(entry); err != nil {
atomic.AddInt64(&h.failedMessages, 1) atomic.AddInt64(&h.failedMessages, 1)
@ -206,7 +233,7 @@ func (h *Target) logEntry(entry audit.Entry) {
} }
} }
func (h *Target) send(entry audit.Entry) error { func (h *Target) send(entry interface{}) error {
if err := h.initKafkaOnce.Do(h.init); err != nil { if err := h.initKafkaOnce.Do(h.init); err != nil {
return err return err
} }
@ -214,9 +241,14 @@ func (h *Target) send(entry audit.Entry) error {
if err != nil { if err != nil {
return err return err
} }
requestID := gjson.GetBytes(logJSON, "requestID").Str
if requestID == "" {
// unsupported data structure
return fmt.Errorf("unsupported data structure: %s must be either audit.Entry or log.Entry", reflect.TypeOf(entry))
}
msg := sarama.ProducerMessage{ msg := sarama.ProducerMessage{
Topic: h.kconfig.Topic, Topic: h.kconfig.Topic,
Key: sarama.StringEncoder(entry.RequestID), Key: sarama.StringEncoder(requestID),
Value: sarama.ByteEncoder(logJSON), Value: sarama.ByteEncoder(logJSON),
} }
_, _, err = h.producer.SendMessage(&msg) _, _, err = h.producer.SendMessage(&msg)
@ -238,6 +270,7 @@ func (h *Target) init() error {
sconfig.Version = kafkaVersion sconfig.Version = kafkaVersion
} }
sconfig.Net.KeepAlive = 60 * time.Second
sconfig.Net.SASL.User = h.kconfig.SASL.User sconfig.Net.SASL.User = h.kconfig.SASL.User
sconfig.Net.SASL.Password = h.kconfig.SASL.Password sconfig.Net.SASL.Password = h.kconfig.SASL.Password
initScramClient(h.kconfig, sconfig) // initializes configured scram client. initScramClient(h.kconfig, sconfig) // initializes configured scram client.
@ -284,17 +317,19 @@ func (h *Target) IsOnline(_ context.Context) bool {
// Send log message 'e' to kafka target. // Send log message 'e' to kafka target.
func (h *Target) Send(ctx context.Context, entry interface{}) error { func (h *Target) Send(ctx context.Context, entry interface{}) error {
if auditEntry, ok := entry.(audit.Entry); ok {
if h.store != nil { if h.store != nil {
// save the entry to the queue store which will be replayed to the target. // save the entry to the queue store which will be replayed to the target.
return h.store.Put(auditEntry) return h.store.Put(entry)
} }
if err := h.initKafkaOnce.Do(h.init); err != nil { h.logChMu.RLock()
return err defer h.logChMu.RUnlock()
if h.logCh == nil {
// We are closing...
return nil
} }
select { select {
case <-h.doneCh: case h.logCh <- entry:
case h.logCh <- auditEntry:
default: default:
// log channel is full, do not wait and return // log channel is full, do not wait and return
// an error immediately to the caller // an error immediately to the caller
@ -302,14 +337,12 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
atomic.AddInt64(&h.failedMessages, 1) atomic.AddInt64(&h.failedMessages, 1)
return errors.New("log buffer full") return errors.New("log buffer full")
} }
}
return nil return nil
} }
// SendFromStore - reads the log from store and sends it to kafka. // SendFromStore - reads the log from store and sends it to kafka.
func (h *Target) SendFromStore(key string) (err error) { func (h *Target) SendFromStore(key string) (err error) {
var auditEntry audit.Entry auditEntry, err := h.store.Get(key)
auditEntry, err = h.store.Get(key)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil return nil
@ -328,16 +361,26 @@ func (h *Target) SendFromStore(key string) (err error) {
// Cancel - cancels the target // Cancel - cancels the target
func (h *Target) Cancel() { func (h *Target) Cancel() {
close(h.doneCh)
close(h.logCh)
// If queuestore is configured, cancel it's context to // If queuestore is configured, cancel it's context to
// stop the replay go-routine. // stop the replay go-routine.
if h.store != nil { if h.store != nil {
h.storeCtxCancel() h.storeCtxCancel()
} }
// Set logch to nil and close it.
// This will block all Send operations,
// and finish the existing ones.
// All future ones will be discarded.
h.logChMu.Lock()
close(h.logCh)
h.logCh = nil
h.logChMu.Unlock()
if h.producer != nil { if h.producer != nil {
h.producer.Close() h.producer.Close()
} }
// Wait for messages to be sent...
h.wg.Wait() h.wg.Wait()
} }
@ -345,8 +388,7 @@ func (h *Target) Cancel() {
// sends log over http to the specified endpoint // sends log over http to the specified endpoint
func New(config Config) *Target { func New(config Config) *Target {
target := &Target{ target := &Target{
logCh: make(chan audit.Entry, 10000), logCh: make(chan interface{}, config.QueueSize),
doneCh: make(chan struct{}),
kconfig: config, kconfig: config,
} }
return target return target

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc. // Copyright (c) 2015-2023 MinIO, Inc.
// //
// This file is part of MinIO Object Storage stack // This file is part of MinIO Object Storage stack
// //