fix: Re-use TCP connections for Kafka dials (#18860)

Fixes #18857
This commit is contained in:
Praveen raj Mani 2024-01-25 02:40:52 +05:30 committed by GitHub
parent b6e9d235fe
commit c905d3fe21
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 68 additions and 48 deletions

View File

@ -154,14 +154,15 @@ func (k KafkaArgs) Validate() error {
type KafkaTarget struct { type KafkaTarget struct {
initOnce once.Init initOnce once.Init
id event.TargetID id event.TargetID
args KafkaArgs args KafkaArgs
producer sarama.SyncProducer producer sarama.SyncProducer
config *sarama.Config config *sarama.Config
store store.Store[event.Event] store store.Store[event.Event]
batch *store.Batch[string, *sarama.ProducerMessage] batch *store.Batch[string, *sarama.ProducerMessage]
loggerOnce logger.LogOnce loggerOnce logger.LogOnce
quitCh chan struct{} brokerConns map[string]net.Conn
quitCh chan struct{}
} }
// ID - returns target ID. // ID - returns target ID.
@ -188,7 +189,7 @@ func (target *KafkaTarget) IsActive() (bool, error) {
} }
func (target *KafkaTarget) isActive() (bool, error) { func (target *KafkaTarget) isActive() (bool, error) {
if err := target.args.pingBrokers(); err != nil { if err := target.pingBrokers(); err != nil {
return false, store.ErrNotConnected return false, store.ErrNotConnected
} }
return true, nil return true, nil
@ -202,10 +203,6 @@ func (target *KafkaTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil { if err := target.init(); err != nil {
return err return err
} }
_, err := target.isActive()
if err != nil {
return err
}
return target.send(eventData) return target.send(eventData)
} }
@ -234,12 +231,6 @@ func (target *KafkaTarget) SendFromStore(key store.Key) error {
return target.addToBatch(key) return target.addToBatch(key)
} }
var err error
_, err = target.isActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(key.Name) eventData, eErr := target.store.Get(key.Name)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
@ -250,8 +241,7 @@ func (target *KafkaTarget) SendFromStore(key store.Key) error {
return eErr return eErr
} }
err = target.send(eventData) if err := target.send(eventData); err != nil {
if err != nil {
if isKafkaConnErr(err) { if isKafkaConnErr(err) {
return store.ErrNotConnected return store.ErrNotConnected
} }
@ -292,9 +282,6 @@ func (target *KafkaTarget) addToBatch(key store.Key) error {
} }
func (target *KafkaTarget) commitBatch() error { func (target *KafkaTarget) commitBatch() error {
if _, err := target.isActive(); err != nil {
return err
}
keys, msgs, err := target.batch.GetAll() keys, msgs, err := target.batch.GetAll()
if err != nil { if err != nil {
return err return err
@ -333,23 +320,38 @@ func (target *KafkaTarget) Close() error {
if target.producer != nil { if target.producer != nil {
return target.producer.Close() return target.producer.Close()
} }
for _, conn := range target.brokerConns {
if conn != nil {
conn.Close()
}
}
return nil return nil
} }
// Check if at least one broker in cluster is active // Check if at least one broker in cluster is active
func (k KafkaArgs) pingBrokers() (err error) { func (target *KafkaTarget) pingBrokers() (err error) {
d := net.Dialer{Timeout: 1 * time.Second} d := net.Dialer{Timeout: 1 * time.Second}
errs := make([]error, len(k.Brokers)) errs := make([]error, len(target.args.Brokers))
var wg sync.WaitGroup var wg sync.WaitGroup
for idx, broker := range k.Brokers { for idx, broker := range target.args.Brokers {
broker := broker broker := broker
idx := idx idx := idx
wg.Add(1) wg.Add(1)
go func(broker xnet.Host, idx int) { go func(broker xnet.Host, idx int) {
defer wg.Done() defer wg.Done()
conn, ok := target.brokerConns[broker.String()]
_, errs[idx] = d.Dial("tcp", broker.String()) if !ok || conn == nil {
conn, errs[idx] = d.Dial("tcp", broker.String())
if errs[idx] != nil {
return
}
target.brokerConns[broker.String()] = conn
}
if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil {
conn.Close()
target.brokerConns[broker.String()] = nil
}
}(broker, idx) }(broker, idx)
} }
wg.Wait() wg.Wait()
@ -461,11 +463,12 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk
} }
target := &KafkaTarget{ target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"}, id: event.TargetID{ID: id, Name: "kafka"},
args: args, args: args,
store: queueStore, store: queueStore,
loggerOnce: loggerOnce, loggerOnce: loggerOnce,
quitCh: make(chan struct{}), quitCh: make(chan struct{}),
brokerConns: make(map[string]net.Conn, len(args.Brokers)),
} }
if target.store != nil { if target.store != nil {

View File

@ -77,20 +77,29 @@ type Config struct {
LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
} }
// Check if at least one broker in cluster is active func (h *Target) pingBrokers() (err error) {
func (k Config) pingBrokers() (err error) {
d := net.Dialer{Timeout: 1 * time.Second} d := net.Dialer{Timeout: 1 * time.Second}
errs := make([]error, len(k.Brokers)) errs := make([]error, len(h.kconfig.Brokers))
var wg sync.WaitGroup var wg sync.WaitGroup
for idx, broker := range k.Brokers { for idx, broker := range h.kconfig.Brokers {
broker := broker broker := broker
idx := idx idx := idx
wg.Add(1) wg.Add(1)
go func(broker xnet.Host, idx int) { go func(broker xnet.Host, idx int) {
defer wg.Done() defer wg.Done()
conn, ok := h.brokerConns[broker.String()]
_, errs[idx] = d.Dial("tcp", broker.String()) if !ok || conn == nil {
conn, errs[idx] = d.Dial("tcp", broker.String())
if errs[idx] != nil {
return
}
h.brokerConns[broker.String()] = conn
}
if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil {
conn.Close()
h.brokerConns[broker.String()] = nil
}
}(broker, idx) }(broker, idx)
} }
wg.Wait() wg.Wait()
@ -98,7 +107,7 @@ func (k Config) pingBrokers() (err error) {
var retErr error var retErr error
for _, err := range errs { for _, err := range errs {
if err == nil { if err == nil {
// if one broker is online its enough // if one of them is active we are good.
return nil return nil
} }
retErr = err retErr = err
@ -129,9 +138,10 @@ type Target struct {
initKafkaOnce once.Init initKafkaOnce once.Init
initQueueStoreOnce once.Init initQueueStoreOnce once.Init
producer sarama.SyncProducer producer sarama.SyncProducer
kconfig Config kconfig Config
config *sarama.Config config *sarama.Config
brokerConns map[string]net.Conn
} }
func (h *Target) validate() error { func (h *Target) validate() error {
@ -262,7 +272,7 @@ func (h *Target) send(entry interface{}) error {
// Init initialize kafka target // Init initialize kafka target
func (h *Target) init() error { func (h *Target) init() error {
if err := h.kconfig.pingBrokers(); err != nil { if err := h.pingBrokers(); err != nil {
return err return err
} }
@ -400,6 +410,12 @@ func (h *Target) Cancel() {
h.producer.Close() h.producer.Close()
} }
for _, conn := range h.brokerConns {
if conn != nil {
conn.Close()
}
}
// Wait for messages to be sent... // Wait for messages to be sent...
h.wg.Wait() h.wg.Wait()
} }
@ -408,9 +424,10 @@ func (h *Target) Cancel() {
// sends log over http to the specified endpoint // sends log over http to the specified endpoint
func New(config Config) *Target { func New(config Config) *Target {
target := &Target{ target := &Target{
logCh: make(chan interface{}, config.QueueSize), logCh: make(chan interface{}, config.QueueSize),
kconfig: config, kconfig: config,
status: statusOffline, status: statusOffline,
brokerConns: make(map[string]net.Conn, len(config.Brokers)),
} }
return target return target
} }