Support for Kafka version in the config (#9001)

Add a field for the Kafka version in the config. The user can explicitly 
set the version of the Kafka cluster.

Fixes #8768
This commit is contained in:
Praveen raj Mani
2020-02-17 07:56:34 +05:30
committed by GitHub
parent 02acff7fac
commit 1b427ddb69
4 changed files with 37 additions and 3 deletions

View File

@@ -34,7 +34,7 @@ import (
saramatls "github.com/Shopify/sarama/tools/tls"
)
// MQTT input constants
// Kafka input constants
const (
KafkaBrokers = "brokers"
KafkaTopic = "topic"
@@ -48,6 +48,7 @@ const (
KafkaSASLPassword = "sasl_password"
KafkaClientTLSCert = "client_tls_cert"
KafkaClientTLSKey = "client_tls_key"
KafkaVersion = "version"
EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
@@ -62,6 +63,7 @@ const (
EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION"
)
// KafkaArgs - Kafka target arguments.
@@ -71,6 +73,7 @@ type KafkaArgs struct {
Topic string `json:"topic"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Version string `json:"version"`
TLS struct {
Enable bool `json:"enable"`
RootCAs *x509.CertPool `json:"-"`
@@ -107,6 +110,11 @@ func (k KafkaArgs) Validate() error {
if k.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
if k.Version != "" {
if _, err := sarama.ParseKafkaVersion(k.Version); err != nil {
return err
}
}
return nil
}
@@ -237,6 +245,14 @@ func (k KafkaArgs) pingBrokers() bool {
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) {
config := sarama.NewConfig()
if args.Version != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(args.Version)
if err != nil {
return nil, err
}
config.Version = kafkaVersion
}
config.Net.SASL.User = args.SASL.User
config.Net.SASL.Password = args.SASL.Password
config.Net.SASL.Enable = args.SASL.Enable