simplify broker healthcheck by following kafka guidelines (#19082)

fixes #19081
This commit is contained in:
Harshavardhana 2024-02-20 00:16:35 -08:00 committed by GitHub
parent e06168596f
commit cd419a35fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 64 additions and 136 deletions

View File

@ -24,12 +24,10 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net"
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
@ -154,15 +152,15 @@ func (k KafkaArgs) Validate() error {
type KafkaTarget struct { type KafkaTarget struct {
initOnce once.Init initOnce once.Init
id event.TargetID id event.TargetID
args KafkaArgs args KafkaArgs
producer sarama.SyncProducer client sarama.Client
config *sarama.Config producer sarama.SyncProducer
store store.Store[event.Event] config *sarama.Config
batch *store.Batch[string, *sarama.ProducerMessage] store store.Store[event.Event]
loggerOnce logger.LogOnce batch *store.Batch[string, *sarama.ProducerMessage]
brokerConns map[string]net.Conn loggerOnce logger.LogOnce
quitCh chan struct{} quitCh chan struct{}
} }
// ID - returns target ID. // ID - returns target ID.
@ -189,7 +187,9 @@ func (target *KafkaTarget) IsActive() (bool, error) {
} }
func (target *KafkaTarget) isActive() (bool, error) { func (target *KafkaTarget) isActive() (bool, error) {
if err := target.pingBrokers(); err != nil { // Refer https://github.com/IBM/sarama/issues/1341
brokers := target.client.Brokers()
if len(brokers) == 0 {
return false, store.ErrNotConnected return false, store.ErrNotConnected
} }
return true, nil return true, nil
@ -317,56 +317,15 @@ func (target *KafkaTarget) toProducerMessage(eventData event.Event) (*sarama.Pro
// Close - closes underneath kafka connection. // Close - closes underneath kafka connection.
func (target *KafkaTarget) Close() error { func (target *KafkaTarget) Close() error {
close(target.quitCh) close(target.quitCh)
if target.producer != nil { if target.producer != nil {
return target.producer.Close() target.producer.Close()
} return target.client.Close()
for _, conn := range target.brokerConns {
if conn != nil {
conn.Close()
}
} }
return nil return nil
} }
// Check if at least one broker in cluster is active
func (target *KafkaTarget) pingBrokers() (err error) {
d := net.Dialer{Timeout: 1 * time.Second}
errs := make([]error, len(target.args.Brokers))
var wg sync.WaitGroup
for idx, broker := range target.args.Brokers {
broker := broker
idx := idx
wg.Add(1)
go func(broker xnet.Host, idx int) {
defer wg.Done()
conn, ok := target.brokerConns[broker.String()]
if !ok || conn == nil {
conn, errs[idx] = d.Dial("tcp", broker.String())
if errs[idx] != nil {
return
}
target.brokerConns[broker.String()] = conn
}
if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil {
conn.Close()
target.brokerConns[broker.String()] = nil
}
}(broker, idx)
}
wg.Wait()
var retErr error
for _, err := range errs {
if err == nil {
// if one of them is active we are good.
return nil
}
retErr = err
}
return retErr
}
func (target *KafkaTarget) init() error { func (target *KafkaTarget) init() error {
return target.initOnce.Do(target.initKafka) return target.initOnce.Do(target.initKafka)
} }
@ -431,13 +390,22 @@ func (target *KafkaTarget) initKafka() error {
brokers = append(brokers, broker.String()) brokers = append(brokers, broker.String())
} }
producer, err := sarama.NewSyncProducer(brokers, config) client, err := sarama.NewClient(brokers, config)
if err != nil { if err != nil {
if err != sarama.ErrOutOfBrokers { if !errors.Is(err, sarama.ErrOutOfBrokers) {
target.loggerOnce(context.Background(), err, target.ID().String()) target.loggerOnce(context.Background(), err, target.ID().String())
} }
return err return err
} }
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
if !errors.Is(err, sarama.ErrOutOfBrokers) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
return err
}
target.client = client
target.producer = producer target.producer = producer
yes, err := target.isActive() yes, err := target.isActive()
@ -463,12 +431,11 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
} }
target := &KafkaTarget{ target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"}, id: event.TargetID{ID: id, Name: "kafka"},
args: args, args: args,
store: queueStore, store: queueStore,
loggerOnce: loggerOnce, loggerOnce: loggerOnce,
quitCh: make(chan struct{}), quitCh: make(chan struct{}),
brokerConns: make(map[string]net.Conn, len(args.Brokers)),
} }
if target.store != nil { if target.store != nil {

View File

@ -24,7 +24,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
@ -78,44 +77,6 @@ type Config struct {
LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
} }
func (h *Target) pingBrokers() (err error) {
d := net.Dialer{Timeout: 1 * time.Second}
errs := make([]error, len(h.kconfig.Brokers))
var wg sync.WaitGroup
for idx, broker := range h.kconfig.Brokers {
broker := broker
idx := idx
wg.Add(1)
go func(broker xnet.Host, idx int) {
defer wg.Done()
conn, ok := h.brokerConns[broker.String()]
if !ok || conn == nil {
conn, errs[idx] = d.Dial("tcp", broker.String())
if errs[idx] != nil {
return
}
h.brokerConns[broker.String()] = conn
}
if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil {
conn.Close()
h.brokerConns[broker.String()] = nil
}
}(broker, idx)
}
wg.Wait()
var retErr error
for _, err := range errs {
if err == nil {
// if one of them is active we are good.
return nil
}
retErr = err
}
return retErr
}
// Target - Kafka target. // Target - Kafka target.
type Target struct { type Target struct {
status int32 status int32
@ -139,10 +100,10 @@ type Target struct {
initKafkaOnce once.Init initKafkaOnce once.Init
initQueueStoreOnce once.Init initQueueStoreOnce once.Init
producer sarama.SyncProducer client sarama.Client
kconfig Config producer sarama.SyncProducer
config *sarama.Config kconfig Config
brokerConns map[string]net.Conn config *sarama.Config
} }
func (h *Target) validate() error { func (h *Target) validate() error {
@ -273,10 +234,6 @@ func (h *Target) send(entry interface{}) error {
// Init initialize kafka target // Init initialize kafka target
func (h *Target) init() error { func (h *Target) init() error {
if err := h.pingBrokers(); err != nil {
return err
}
sconfig := sarama.NewConfig() sconfig := sarama.NewConfig()
if h.kconfig.Version != "" { if h.kconfig.Version != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(h.kconfig.Version) kafkaVersion, err := sarama.ParseKafkaVersion(h.kconfig.Version)
@ -325,13 +282,23 @@ func (h *Target) init() error {
brokers = append(brokers, broker.String()) brokers = append(brokers, broker.String())
} }
producer, err := sarama.NewSyncProducer(brokers, sconfig) client, err := sarama.NewClient(brokers, sconfig)
if err != nil { if err != nil {
return err return err
} }
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return err
}
h.client = client
h.producer = producer h.producer = producer
atomic.StoreInt32(&h.status, statusOnline)
if len(h.client.Brokers()) > 0 {
// Refer https://github.com/IBM/sarama/issues/1341
atomic.StoreInt32(&h.status, statusOnline)
}
return nil return nil
} }
@ -409,12 +376,7 @@ func (h *Target) Cancel() {
if h.producer != nil { if h.producer != nil {
h.producer.Close() h.producer.Close()
} h.client.Close()
for _, conn := range h.brokerConns {
if conn != nil {
conn.Close()
}
} }
// Wait for messages to be sent... // Wait for messages to be sent...
@ -425,10 +387,9 @@ func (h *Target) Cancel() {
// sends log over http to the specified endpoint // sends log over http to the specified endpoint
func New(config Config) *Target { func New(config Config) *Target {
target := &Target{ target := &Target{
logCh: make(chan interface{}, config.QueueSize), logCh: make(chan interface{}, config.QueueSize),
kconfig: config, kconfig: config,
status: statusOffline, status: statusOffline,
brokerConns: make(map[string]net.Conn, len(config.Brokers)),
} }
return target return target
} }

View File

@ -20,11 +20,11 @@
package s3select package s3select
/////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////
// //
// Validation errors. // Validation errors.
// //
/////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////
func errExpressionTooLong(err error) *s3Error { func errExpressionTooLong(err error) *s3Error {
return &s3Error{ return &s3Error{
code: "ExpressionTooLong", code: "ExpressionTooLong",
@ -151,11 +151,11 @@ func errUnrecognizedFormatException(err error) *s3Error {
} }
} }
////////////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////////////
// //
// SQL parsing errors. // SQL parsing errors.
// //
////////////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////////////
func errLexerInvalidChar(err error) *s3Error { func errLexerInvalidChar(err error) *s3Error {
return &s3Error{ return &s3Error{
code: "LexerInvalidChar", code: "LexerInvalidChar",
@ -498,11 +498,11 @@ func errParseCannotMixSqbAndWildcardInSelectList(err error) *s3Error {
} }
} }
////////////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////////////
// //
// CAST() related errors. // CAST() related errors.
// //
////////////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////////////
func errCastFailed(err error) *s3Error { func errCastFailed(err error) *s3Error {
return &s3Error{ return &s3Error{
code: "CastFailed", code: "CastFailed",
@ -593,11 +593,11 @@ func errEvaluatorInvalidTimestampFormatPatternSymbol(err error) *s3Error {
} }
} }
//////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////
// //
// Generic S3 HTTP handler errors. // Generic S3 HTTP handler errors.
// //
//////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////
func errBusy(err error) *s3Error { func errBusy(err error) *s3Error {
return &s3Error{ return &s3Error{
code: "Busy", code: "Busy",