fix: Audit kafka configuration loading (#15766)

Anis Elleuch 2022-09-29 16:35:08 +01:00 committed by GitHub
parent d44f3526dc
commit f69a98ce49


@@ -282,17 +282,15 @@ func lookupLegacyConfigForSubSys(subSys string) Config {
 	return cfg
 }
 
-// GetAuditKafka - returns a map of registered notification 'kafka' targets
-func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, error) {
-	kafkaTargets := make(map[string]kafka.Config)
-	for k, kv := range config.Merge(kafkaKVS, EnvKafkaEnable, DefaultAuditKafkaKVS) {
+func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) {
+	for k, kv := range config.Merge(scfg[config.AuditKafkaSubSys], EnvKafkaEnable, DefaultAuditKafkaKVS) {
 		enableEnv := EnvKafkaEnable
 		if k != config.Default {
 			enableEnv = enableEnv + config.Default + k
 		}
 		enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
 		if err != nil {
-			return nil, err
+			return cfg, err
 		}
 		if !enabled {
 			continue
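
In the hunk above, each audit Kafka target resolves its enable flag from an environment variable that overrides the stored key/value, and non-default targets get the target name appended to the env key. Below is a minimal, self-contained sketch of that resolution pattern; the helper and env names are illustrative stand-ins rather than MinIO's actual config/env API, and parseBool is a simplified version of config.ParseBool (which also understands "on"/"off").

// A minimal sketch of the enable-flag resolution pattern, not MinIO's code.
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Stand-in for config.Default ("_"), which serves both as the default
// target name and as the separator when building per-target env names.
const defaultTarget = "_"

// parseBool is a simplified stand-in for config.ParseBool.
func parseBool(v string) (bool, error) {
	switch v {
	case "on":
		return true, nil
	case "off":
		return false, nil
	}
	return strconv.ParseBool(v)
}

// resolveEnabled consults enableEnv (suffixed with the target name for
// non-default targets) and falls back to the stored value when unset.
func resolveEnabled(enableEnv, target, stored string) (bool, error) {
	if target != defaultTarget {
		enableEnv = enableEnv + defaultTarget + target
	}
	v := os.Getenv(enableEnv)
	if v == "" {
		v = stored
	}
	return parseBool(v)
}

func main() {
	enabled, err := resolveEnabled("MINIO_AUDIT_KAFKA_ENABLE", "target1", "off")
	fmt.Println(enabled, err)
}
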
@@ -304,7 +302,7 @@ func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, error) {
 		}
 		kafkaBrokers := env.Get(brokersEnv, kv.Get(KafkaBrokers))
 		if len(kafkaBrokers) == 0 {
-			return nil, config.Errorf("kafka 'brokers' cannot be empty")
+			return cfg, config.Errorf("kafka 'brokers' cannot be empty")
 		}
 		for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
 			var host *xnet.Host
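
The brokers value read above is a single string that is split on config.ValueSeparator and parsed entry by entry with xnet.ParseHost. A rough sketch of that shape follows, assuming the separator is a comma and using net.SplitHostPort from the standard library as a stand-in for xnet.ParseHost (which is more permissive).

// A rough sketch of broker-list parsing, under the assumptions stated above.
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseBrokers rejects an empty list and validates each host:port entry.
func parseBrokers(val string) ([]string, error) {
	if len(val) == 0 {
		return nil, fmt.Errorf("kafka 'brokers' cannot be empty")
	}
	var brokers []string
	for _, s := range strings.Split(val, ",") {
		s = strings.TrimSpace(s)
		if _, _, err := net.SplitHostPort(s); err != nil {
			return nil, err
		}
		brokers = append(brokers, s)
	}
	return brokers, nil
}

func main() {
	brokers, err := parseBrokers("kafka-1:9092,kafka-2:9092")
	fmt.Println(brokers, err)
}
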
@@ -315,7 +313,7 @@ func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, error) {
 			brokers = append(brokers, *host)
 		}
 		if err != nil {
-			return nil, err
+			return cfg, err
 		}
 
 		clientAuthEnv := EnvKafkaTLSClientAuth
@@ -324,7 +322,7 @@ func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, error) {
 		}
 		clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(KafkaTLSClientAuth)))
 		if err != nil {
-			return nil, err
+			return cfg, err
 		}
 
 		topicEnv := EnvKafkaTopic
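
The client-auth value parsed with strconv.Atoi above presumably maps onto crypto/tls's ClientAuthType (0 through 4). The conversion below is a hypothetical illustration, not code from this file; the bounds check is an assumption added here.

// A hypothetical conversion of the parsed integer into tls.ClientAuthType.
package main

import (
	"crypto/tls"
	"fmt"
	"strconv"
)

func toClientAuth(v string) (tls.ClientAuthType, error) {
	n, err := strconv.Atoi(v)
	if err != nil {
		return tls.NoClientCert, err
	}
	// Assumed range check: NoClientCert (0) .. RequireAndVerifyClientCert (4).
	if n < int(tls.NoClientCert) || n > int(tls.RequireAndVerifyClientCert) {
		return tls.NoClientCert, fmt.Errorf("invalid tls client auth value: %d", n)
	}
	return tls.ClientAuthType(n), nil
}

func main() {
	clientAuth, err := toClientAuth("4")
	fmt.Println(clientAuth, err)
}
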
@@ -391,10 +389,10 @@ func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, error) {
 		kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(KafkaSASLPassword))
 		kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(KafkaSASLMechanism))
 
-		kafkaTargets[k] = kafkaArgs
+		cfg.AuditKafka[k] = kafkaArgs
 	}
 
-	return kafkaTargets, nil
+	return cfg, nil
 }
 
 func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
@@ -635,7 +633,8 @@ func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err error) {
 			return cfg, err
 		}
 	case config.AuditKafkaSubSys:
-		if _, err = GetAuditKafka(scfg[config.AuditKafkaSubSys]); err != nil {
+		cfg.AuditKafka = make(map[string]kafka.Config)
+		if cfg, err = lookupAuditKafkaConfig(scfg, cfg); err != nil {
 			return cfg, err
 		}
 	}
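
The last hunk is the heart of the fix: the old call site discarded GetAuditKafka's result (if _, err = ...), so the parsed audit Kafka targets never reached the returned cfg, while the new code allocates cfg.AuditKafka and threads cfg through lookupAuditKafkaConfig like the other lookup helpers in this file. A minimal sketch of that threading pattern follows, using simplified stand-in types rather than the real kafka.Config or logger Config.

// A minimal sketch of the cfg-threading pattern, with stand-in types.
package main

import "fmt"

// Simplified stand-ins for kafka.Config and the logger Config struct.
type kafkaConfig struct{ Brokers []string }

type loggerConfig struct {
	AuditKafka map[string]kafkaConfig
}

// lookupAuditKafka fills cfg.AuditKafka from per-target broker strings
// and returns the updated cfg, mirroring lookupAuditKafkaConfig above.
func lookupAuditKafka(raw map[string]string, cfg loggerConfig) (loggerConfig, error) {
	for target, brokers := range raw {
		cfg.AuditKafka[target] = kafkaConfig{Brokers: []string{brokers}}
	}
	return cfg, nil
}

func main() {
	var cfg loggerConfig
	// The map is allocated at the call site, as in the last hunk.
	cfg.AuditKafka = make(map[string]kafkaConfig)
	cfg, err := lookupAuditKafka(map[string]string{"target1": "kafka-1:9092"}, cfg)
	fmt.Println(cfg.AuditKafka, err)
}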