diff --git a/cmd/config/notify/help.go b/cmd/config/notify/help.go index 92a088943..655bca24d 100644 --- a/cmd/config/notify/help.go +++ b/cmd/config/notify/help.go @@ -213,6 +213,12 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: target.KafkaVersion, + Description: "specify the version of the Kafka cluster", + Optional: true, + Type: "string", + }, config.HelpKV{ Key: config.Comment, Description: config.DefaultComment, diff --git a/cmd/config/notify/parse.go b/cmd/config/notify/parse.go index 944c10f35..4e9ec96b5 100644 --- a/cmd/config/notify/parse.go +++ b/cmd/config/notify/parse.go @@ -384,6 +384,10 @@ var ( Key: target.KafkaQueueDir, Value: "", }, + config.KV{ + Key: target.KafkaVersion, + Value: "", + }, } ) @@ -451,12 +455,18 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs queueDirEnv = queueDirEnv + config.Default + k } + versionEnv := target.EnvKafkaVersion + if k != config.Default { + versionEnv = versionEnv + config.Default + k + } + kafkaArgs := target.KafkaArgs{ Enable: enabled, Brokers: brokers, Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)), QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)), QueueLimit: queueLimit, + Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)), } tlsEnableEnv := target.EnvKafkaTLS diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 8e131d154..9c0871960 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -1069,6 +1069,7 @@ client_tls_cert (path) path to client certificate for mTLS auth client_tls_key (path) path to client key for mTLS auth queue_dir (path) staging dir for undelivered messages e.g. '/home/events' queue_limit (number) maximum limit for undelivered messages, defaults to '10000' +version (string) specify the version of the Kafka cluster e.g '2.2.0' comment (sentence) optionally add a comment to this setting ``` @@ -1092,19 +1093,20 @@ MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY (path) path to client key for mTLS auth MINIO_NOTIFY_KAFKA_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events' MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '10000' MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting +MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0' ``` To update the configuration, use `mc admin config get` command to get the current configuration. ```sh $ mc admin config get myminio/ notify_kafka -notify_kafka:1 tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" +notify_kafka:1 tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version="" ``` Use `mc admin config set` command to update the configuration for the deployment. Restart the MinIO server to put the changes into effect. The server will print a line like `SQS ARNs: arn:minio:sqs::1:kafka` at start-up if there were no errors.`bucketevents` is the topic used by kafka in this example. ```sh -$ mc admin config set myminio notify_kafka:1 tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" client_tls_cert="" client_tls_key="" brokers="localhost:9092,localhost:9093" topic="bucketevents" +$ mc admin config set myminio notify_kafka:1 tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" client_tls_cert="" client_tls_key="" brokers="localhost:9092,localhost:9093" topic="bucketevents" version="" ``` ### Step 3: Enable bucket notification using MinIO client diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 24ef7a85f..e6fb55f38 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -34,7 +34,7 @@ import ( saramatls "github.com/Shopify/sarama/tools/tls" ) -// MQTT input constants +// Kafka input constants const ( KafkaBrokers = "brokers" KafkaTopic = "topic" @@ -48,6 +48,7 @@ const ( KafkaSASLPassword = "sasl_password" KafkaClientTLSCert = "client_tls_cert" KafkaClientTLSKey = "client_tls_key" + KafkaVersion = "version" EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE" EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS" @@ -62,6 +63,7 @@ const ( EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD" EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT" EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY" + EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION" ) // KafkaArgs - Kafka target arguments. @@ -71,6 +73,7 @@ type KafkaArgs struct { Topic string `json:"topic"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` + Version string `json:"version"` TLS struct { Enable bool `json:"enable"` RootCAs *x509.CertPool `json:"-"` @@ -107,6 +110,11 @@ func (k KafkaArgs) Validate() error { if k.QueueLimit > 10000 { return errors.New("queueLimit should not exceed 10000") } + if k.Version != "" { + if _, err := sarama.ParseKafkaVersion(k.Version); err != nil { + return err + } + } return nil } @@ -237,6 +245,14 @@ func (k KafkaArgs) pingBrokers() bool { func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) { config := sarama.NewConfig() + if args.Version != "" { + kafkaVersion, err := sarama.ParseKafkaVersion(args.Version) + if err != nil { + return nil, err + } + config.Version = kafkaVersion + } + config.Net.SASL.User = args.SASL.User config.Net.SASL.Password = args.SASL.Password config.Net.SASL.Enable = args.SASL.Enable