added support for SASL/SCRAM on Kafka bucket notifications. (#9168)

fixes #9167
This commit is contained in:
Stephen N
2020-03-20 18:10:27 +00:00
committed by GitHub
parent ecf1566266
commit 1ffa983a9d
7 changed files with 121 additions and 18 deletions

View File

@@ -165,6 +165,12 @@ var (
Optional: true,
Type: "string",
},
config.HelpKV{
Key: target.KafkaSASLMechanism,
Description: "sasl authentication mechanism, default 'plain'",
Optional: true,
Type: "string",
},
config.HelpKV{
Key: target.KafkaTLSClientAuth,
Description: "clientAuth determines the Kafka server's policy for TLS client auth",

View File

@@ -352,6 +352,10 @@ var (
Key: target.KafkaSASLPassword,
Value: "",
},
config.KV{
Key: target.KafkaSASLMechanism,
Value: "plain",
},
config.KV{
Key: target.KafkaClientTLSCert,
Value: "",
@@ -507,9 +511,14 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
if k != config.Default {
saslPasswordEnv = saslPasswordEnv + config.Default + k
}
saslMechanismEnv := target.EnvKafkaSASLMechanism
if k != config.Default {
saslMechanismEnv = saslMechanismEnv + config.Default + k
}
kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.EnableOn
kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(target.KafkaSASLUsername))
kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword))
kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(target.KafkaSASLMechanism))
if err = kafkaArgs.Validate(); err != nil {
return nil, err