diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index 3173e9c10..c57837593 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -24,12 +24,10 @@ import ( "encoding/json" "errors" "fmt" - "net" "net/url" "os" "path/filepath" "strings" - "sync" "time" "github.com/minio/minio/internal/event" @@ -154,15 +152,15 @@ func (k KafkaArgs) Validate() error { type KafkaTarget struct { initOnce once.Init - id event.TargetID - args KafkaArgs - producer sarama.SyncProducer - config *sarama.Config - store store.Store[event.Event] - batch *store.Batch[string, *sarama.ProducerMessage] - loggerOnce logger.LogOnce - brokerConns map[string]net.Conn - quitCh chan struct{} + id event.TargetID + args KafkaArgs + client sarama.Client + producer sarama.SyncProducer + config *sarama.Config + store store.Store[event.Event] + batch *store.Batch[string, *sarama.ProducerMessage] + loggerOnce logger.LogOnce + quitCh chan struct{} } // ID - returns target ID. @@ -189,7 +187,9 @@ func (target *KafkaTarget) IsActive() (bool, error) { } func (target *KafkaTarget) isActive() (bool, error) { - if err := target.pingBrokers(); err != nil { + // Refer https://github.com/IBM/sarama/issues/1341 + brokers := target.client.Brokers() + if len(brokers) == 0 { return false, store.ErrNotConnected } return true, nil @@ -317,56 +317,15 @@ func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.Pro // Close - closes underneath kafka connection. func (target *KafkaTarget) Close() error { close(target.quitCh) + if target.producer != nil { - return target.producer.Close() - } - for _, conn := range target.brokerConns { - if conn != nil { - conn.Close() - } + target.producer.Close() + return target.client.Close() } + return nil } -// Check if at least one broker in cluster is active -func (target *KafkaTarget) pingBrokers() (err error) { - d := net.Dialer{Timeout: 1 * time.Second} - - errs := make([]error, len(target.args.Brokers)) - var wg sync.WaitGroup - for idx, broker := range target.args.Brokers { - broker := broker - idx := idx - wg.Add(1) - go func(broker xnet.Host, idx int) { - defer wg.Done() - conn, ok := target.brokerConns[broker.String()] - if !ok || conn == nil { - conn, errs[idx] = d.Dial("tcp", broker.String()) - if errs[idx] != nil { - return - } - target.brokerConns[broker.String()] = conn - } - if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil { - conn.Close() - target.brokerConns[broker.String()] = nil - } - }(broker, idx) - } - wg.Wait() - - var retErr error - for _, err := range errs { - if err == nil { - // if one of them is active we are good. - return nil - } - retErr = err - } - return retErr -} - func (target *KafkaTarget) init() error { return target.initOnce.Do(target.initKafka) } @@ -431,13 +390,22 @@ func (target *KafkaTarget) initKafka() error { brokers = append(brokers, broker.String()) } - producer, err := sarama.NewSyncProducer(brokers, config) + client, err := sarama.NewClient(brokers, config) if err != nil { - if err != sarama.ErrOutOfBrokers { + if !errors.Is(err, sarama.ErrOutOfBrokers) { target.loggerOnce(context.Background(), err, target.ID().String()) } return err } + + producer, err := sarama.NewSyncProducerFromClient(client) + if err != nil { + if !errors.Is(err, sarama.ErrOutOfBrokers) { + target.loggerOnce(context.Background(), err, target.ID().String()) + } + return err + } + target.client = client target.producer = producer yes, err := target.isActive() @@ -463,12 +431,11 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk } target := &KafkaTarget{ - id: event.TargetID{ID: id, Name: "kafka"}, - args: args, - store: queueStore, - loggerOnce: loggerOnce, - quitCh: make(chan struct{}), - brokerConns: make(map[string]net.Conn, len(args.Brokers)), + id: event.TargetID{ID: id, Name: "kafka"}, + args: args, + store: queueStore, + loggerOnce: loggerOnce, + quitCh: make(chan struct{}), } if target.store != nil { diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index f96976435..b6a77abf8 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -24,7 +24,6 @@ import ( "encoding/json" "errors" "fmt" - "net" "os" "path/filepath" "sync" @@ -78,44 +77,6 @@ type Config struct { LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` } -func (h *Target) pingBrokers() (err error) { - d := net.Dialer{Timeout: 1 * time.Second} - - errs := make([]error, len(h.kconfig.Brokers)) - var wg sync.WaitGroup - for idx, broker := range h.kconfig.Brokers { - broker := broker - idx := idx - wg.Add(1) - go func(broker xnet.Host, idx int) { - defer wg.Done() - conn, ok := h.brokerConns[broker.String()] - if !ok || conn == nil { - conn, errs[idx] = d.Dial("tcp", broker.String()) - if errs[idx] != nil { - return - } - h.brokerConns[broker.String()] = conn - } - if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil { - conn.Close() - h.brokerConns[broker.String()] = nil - } - }(broker, idx) - } - wg.Wait() - - var retErr error - for _, err := range errs { - if err == nil { - // if one of them is active we are good. - return nil - } - retErr = err - } - return retErr -} - // Target - Kafka target. type Target struct { status int32 @@ -139,10 +100,10 @@ type Target struct { initKafkaOnce once.Init initQueueStoreOnce once.Init - producer sarama.SyncProducer - kconfig Config - config *sarama.Config - brokerConns map[string]net.Conn + client sarama.Client + producer sarama.SyncProducer + kconfig Config + config *sarama.Config } func (h *Target) validate() error { @@ -273,10 +234,6 @@ func (h *Target) send(entry interface{}) error { // Init initialize kafka target func (h *Target) init() error { - if err := h.pingBrokers(); err != nil { - return err - } - sconfig := sarama.NewConfig() if h.kconfig.Version != "" { kafkaVersion, err := sarama.ParseKafkaVersion(h.kconfig.Version) @@ -325,13 +282,23 @@ func (h *Target) init() error { brokers = append(brokers, broker.String()) } - producer, err := sarama.NewSyncProducer(brokers, sconfig) + client, err := sarama.NewClient(brokers, sconfig) if err != nil { return err } + producer, err := sarama.NewSyncProducerFromClient(client) + if err != nil { + return err + } + h.client = client h.producer = producer - atomic.StoreInt32(&h.status, statusOnline) + + if len(h.client.Brokers()) > 0 { + // Refer https://github.com/IBM/sarama/issues/1341 + atomic.StoreInt32(&h.status, statusOnline) + } + return nil } @@ -409,12 +376,7 @@ func (h *Target) Cancel() { if h.producer != nil { h.producer.Close() - } - - for _, conn := range h.brokerConns { - if conn != nil { - conn.Close() - } + h.client.Close() } // Wait for messages to be sent... @@ -425,10 +387,9 @@ func (h *Target) Cancel() { // sends log over http to the specified endpoint func New(config Config) *Target { target := &Target{ - logCh: make(chan interface{}, config.QueueSize), - kconfig: config, - status: statusOffline, - brokerConns: make(map[string]net.Conn, len(config.Brokers)), + logCh: make(chan interface{}, config.QueueSize), + kconfig: config, + status: statusOffline, } return target } diff --git a/internal/s3select/unused-errors.go b/internal/s3select/unused-errors.go index 818215e9e..d40310af6 100644 --- a/internal/s3select/unused-errors.go +++ b/internal/s3select/unused-errors.go @@ -20,11 +20,11 @@ package s3select -/////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////// // -// Validation errors. +// Validation errors. // -/////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////// func errExpressionTooLong(err error) *s3Error { return &s3Error{ code: "ExpressionTooLong", @@ -151,11 +151,11 @@ func errUnrecognizedFormatException(err error) *s3Error { } } -////////////////////////////////////////////////////////////////////////////////////// +// //////////////////////////////////////////////////////////////////////////////////// // -// SQL parsing errors. +// SQL parsing errors. // -////////////////////////////////////////////////////////////////////////////////////// +// //////////////////////////////////////////////////////////////////////////////////// func errLexerInvalidChar(err error) *s3Error { return &s3Error{ code: "LexerInvalidChar", @@ -498,11 +498,11 @@ func errParseCannotMixSqbAndWildcardInSelectList(err error) *s3Error { } } -////////////////////////////////////////////////////////////////////////////////////// +// //////////////////////////////////////////////////////////////////////////////////// // -// CAST() related errors. +// CAST() related errors. // -////////////////////////////////////////////////////////////////////////////////////// +// //////////////////////////////////////////////////////////////////////////////////// func errCastFailed(err error) *s3Error { return &s3Error{ code: "CastFailed", @@ -593,11 +593,11 @@ func errEvaluatorInvalidTimestampFormatPatternSymbol(err error) *s3Error { } } -//////////////////////////////////////////////////////////////////////// +// ////////////////////////////////////////////////////////////////////// // -// Generic S3 HTTP handler errors. +// Generic S3 HTTP handler errors. // -//////////////////////////////////////////////////////////////////////// +// ////////////////////////////////////////////////////////////////////// func errBusy(err error) *s3Error { return &s3Error{ code: "Busy",