Support TLS auth for Kafka notification target (#8609)

This commit is contained in:
Aleksandr Petruhin
2019-12-06 02:31:46 +03:00
committed by Harshavardhana
parent d8e3de0cae
commit d2dc964cb5
7 changed files with 84 additions and 13 deletions

View File

@@ -30,7 +30,8 @@ import (
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
sarama "gopkg.in/Shopify/sarama.v1"
sarama "github.com/Shopify/sarama"
saramatls "github.com/Shopify/sarama/tools/tls"
)
// MQTT input constants
@@ -45,6 +46,8 @@ const (
KafkaSASL = "sasl"
KafkaSASLUsername = "sasl_username"
KafkaSASLPassword = "sasl_password"
KafkaClientTLSCert = "client_tls_cert"
KafkaClientTLSKey = "client_tls_key"
EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
@@ -57,6 +60,8 @@ const (
EnvKafkaSASLEnable = "MINIO_NOTIFY_KAFKA_SASL"
EnvKafkaSASLUsername = "MINIO_NOTIFY_KAFKA_SASL_USERNAME"
EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
)
// KafkaArgs - Kafka target arguments.
@@ -67,10 +72,12 @@ type KafkaArgs struct {
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
TLS struct {
Enable bool `json:"enable"`
RootCAs *x509.CertPool `json:"-"`
SkipVerify bool `json:"skipVerify"`
ClientAuth tls.ClientAuthType `json:"clientAuth"`
Enable bool `json:"enable"`
RootCAs *x509.CertPool `json:"-"`
SkipVerify bool `json:"skipVerify"`
ClientAuth tls.ClientAuthType `json:"clientAuth"`
ClientTLSCert string `json:"clientTLSCert"`
ClientTLSKey string `json:"clientTLSKey"`
} `json:"tls"`
SASL struct {
Enable bool `json:"enable"`
@@ -225,13 +232,17 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
config.Net.SASL.Password = args.SASL.Password
config.Net.SASL.Enable = args.SASL.Enable
config.Net.TLS.Enable = args.TLS.Enable
tlsConfig := &tls.Config{
ClientAuth: args.TLS.ClientAuth,
InsecureSkipVerify: args.TLS.SkipVerify,
RootCAs: args.TLS.RootCAs,
tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey)
if err != nil {
return nil, err
}
config.Net.TLS.Enable = args.TLS.Enable
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Config.InsecureSkipVerify = args.TLS.SkipVerify
config.Net.TLS.Config.ClientAuth = args.TLS.ClientAuth
config.Net.TLS.Config.RootCAs = args.TLS.RootCAs
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10