Extend further validation of config values (#8469)

- This PR allows config KVS to be validated properly
  without being affected by ENV overrides, rejects
  invalid values during set operation

- Expands unit tests and refactors the error handling
  for notification targets, returns error instead of
  ignoring targets for invalid KVS

- Does all the prep-work for implementing safe-mode
  style operation for MinIO server, introduces a new
  global variable to toggle safe mode based operations
  NOTE: this PR itself doesn't provide safe mode operations
This commit is contained in:
Harshavardhana
2019-10-30 23:39:09 -07:00
committed by kannappanr
parent 599aae5ba6
commit 9e7a3e6adc
53 changed files with 723 additions and 396 deletions

View File

@@ -17,7 +17,6 @@
package notify
import (
"context"
"crypto/tls"
"crypto/x509"
"strconv"
@@ -61,7 +60,57 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
return nil, err
}
for id, args := range GetNotifyAMQP(cfg) {
amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys])
if err != nil {
return nil, err
}
esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys])
if err != nil {
return nil, err
}
kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
if err != nil {
return nil, err
}
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], rootCAs)
if err != nil {
return nil, err
}
mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys])
if err != nil {
return nil, err
}
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys])
if err != nil {
return nil, err
}
nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys])
if err != nil {
return nil, err
}
postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys])
if err != nil {
return nil, err
}
redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys])
if err != nil {
return nil, err
}
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], rootCAs)
if err != nil {
return nil, err
}
for id, args := range amqpTargets {
if !args.Enable {
continue
}
@@ -78,7 +127,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyES(cfg) {
for id, args := range esTargets {
if !args.Enable {
continue
}
@@ -96,7 +145,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyKafka(cfg) {
for id, args := range kafkaTargets {
if !args.Enable {
continue
}
@@ -114,7 +163,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyMQTT(cfg, rootCAs) {
for id, args := range mqttTargets {
if !args.Enable {
continue
}
@@ -132,7 +181,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyMySQL(cfg) {
for id, args := range mysqlTargets {
if !args.Enable {
continue
}
@@ -149,7 +198,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyNATS(cfg) {
for id, args := range natsTargets {
if !args.Enable {
continue
}
@@ -166,7 +215,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyNSQ(cfg) {
for id, args := range nsqTargets {
if !args.Enable {
continue
}
@@ -183,7 +232,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyPostgres(cfg) {
for id, args := range postgresTargets {
if !args.Enable {
continue
}
@@ -200,7 +249,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyRedis(cfg) {
for id, args := range redisTargets {
if !args.Enable {
continue
}
@@ -217,7 +266,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
}
}
for id, args := range GetNotifyWebhook(cfg, rootCAs) {
for id, args := range webhookTargets {
if !args.Enable {
continue
}
@@ -307,17 +356,16 @@ var (
)
// GetNotifyKafka - returns a map of registered notification 'kafka' targets
func GetNotifyKafka(s config.Config) map[string]target.KafkaArgs {
func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) {
kafkaTargets := make(map[string]target.KafkaArgs)
for k, kv := range mergeTargets(s[config.NotifyKafkaSubSys], target.EnvKafkaState, DefaultKafkaKVS) {
for k, kv := range mergeTargets(kafkaKVS, target.EnvKafkaState, DefaultKafkaKVS) {
stateEnv := target.EnvKafkaState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -328,6 +376,9 @@ func GetNotifyKafka(s config.Config) map[string]target.KafkaArgs {
brokersEnv = brokersEnv + config.Default + k
}
kafkaBrokers := env.Get(brokersEnv, kv.Get(target.KafkaBrokers))
if len(kafkaBrokers) == 0 {
return nil, config.Error("kafka 'brokers' cannot be empty")
}
for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
var host *xnet.Host
host, err = xnet.ParseHost(s)
@@ -337,8 +388,7 @@ func GetNotifyKafka(s config.Config) map[string]target.KafkaArgs {
brokers = append(brokers, *host)
}
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvKafkaQueueLimit
@@ -347,8 +397,7 @@ func GetNotifyKafka(s config.Config) map[string]target.KafkaArgs {
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.KafkaQueueLimit)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
clientAuthEnv := target.EnvKafkaTLSClientAuth
@@ -357,8 +406,7 @@ func GetNotifyKafka(s config.Config) map[string]target.KafkaArgs {
}
clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(target.KafkaTLSClientAuth)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
topicEnv := target.EnvKafkaTopic
@@ -408,14 +456,13 @@ func GetNotifyKafka(s config.Config) map[string]target.KafkaArgs {
kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword))
if err = kafkaArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
kafkaTargets[k] = kafkaArgs
}
return kafkaTargets
return kafkaTargets, nil
}
// DefaultMQTTKVS - default MQTT config
@@ -436,9 +483,9 @@ var (
)
// GetNotifyMQTT - returns a map of registered notification 'mqtt' targets
func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQTTArgs {
func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) {
mqttTargets := make(map[string]target.MQTTArgs)
for k, kv := range mergeTargets(s[config.NotifyMQTTSubSys], target.EnvMQTTState, DefaultMQTTKVS) {
for k, kv := range mergeTargets(mqttKVS, target.EnvMQTTState, DefaultMQTTKVS) {
stateEnv := target.EnvMQTTState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
@@ -446,8 +493,7 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -459,8 +505,7 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
}
brokerURL, err := xnet.ParseURL(env.Get(brokerEnv, kv.Get(target.MqttBroker)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
reconnectIntervalEnv := target.EnvMQTTReconnectInterval
@@ -470,8 +515,7 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
reconnectInterval, err := time.ParseDuration(env.Get(reconnectIntervalEnv,
kv.Get(target.MqttReconnectInterval)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
keepAliveIntervalEnv := target.EnvMQTTKeepAliveInterval
@@ -481,8 +525,7 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
keepAliveInterval, err := time.ParseDuration(env.Get(keepAliveIntervalEnv,
kv.Get(target.MqttKeepAliveInterval)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvMQTTQueueLimit
@@ -491,8 +534,7 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MqttQueueLimit)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
qosEnv := target.EnvMQTTQoS
@@ -503,8 +545,7 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
// Parse uint8 value
qos, err := strconv.ParseUint(env.Get(qosEnv, kv.Get(target.MqttQoS)), 10, 8)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
topicEnv := target.EnvMQTTTopic
@@ -542,12 +583,11 @@ func GetNotifyMQTT(s config.Config, rootCAs *x509.CertPool) map[string]target.MQ
}
if err = mqttArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
mqttTargets[k] = mqttArgs
}
return mqttTargets
return mqttTargets, nil
}
// DefaultMySQLKVS - default KV for MySQL
@@ -569,9 +609,9 @@ var (
)
// GetNotifyMySQL - returns a map of registered notification 'mysql' targets
func GetNotifyMySQL(s config.Config) map[string]target.MySQLArgs {
func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) {
mysqlTargets := make(map[string]target.MySQLArgs)
for k, kv := range mergeTargets(s[config.NotifyMySQLSubSys], target.EnvMySQLState, DefaultMySQLKVS) {
for k, kv := range mergeTargets(mysqlKVS, target.EnvMySQLState, DefaultMySQLKVS) {
stateEnv := target.EnvMySQLState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
@@ -579,8 +619,7 @@ func GetNotifyMySQL(s config.Config) map[string]target.MySQLArgs {
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -593,8 +632,7 @@ func GetNotifyMySQL(s config.Config) map[string]target.MySQLArgs {
host, err := xnet.ParseURL(env.Get(hostEnv, kv.Get(target.MySQLHost)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvMySQLQueueLimit
@@ -603,8 +641,7 @@ func GetNotifyMySQL(s config.Config) map[string]target.MySQLArgs {
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MySQLQueueLimit)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
formatEnv := target.EnvMySQLFormat
@@ -653,12 +690,11 @@ func GetNotifyMySQL(s config.Config) map[string]target.MySQLArgs {
QueueLimit: queueLimit,
}
if err = mysqlArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
mysqlTargets[k] = mysqlArgs
}
return mysqlTargets
return mysqlTargets, nil
}
// DefaultNATSKVS - NATS KV for nats config.
@@ -683,9 +719,9 @@ var (
)
// GetNotifyNATS - returns a map of registered notification 'nats' targets
func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, error) {
natsTargets := make(map[string]target.NATSArgs)
for k, kv := range mergeTargets(s[config.NotifyNATSSubSys], target.EnvNATSState, DefaultNATSKVS) {
for k, kv := range mergeTargets(natsKVS, target.EnvNATSState, DefaultNATSKVS) {
stateEnv := target.EnvNATSState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
@@ -693,8 +729,7 @@ func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -707,8 +742,7 @@ func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
address, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NATSAddress)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
pingIntervalEnv := target.EnvNATSPingInterval
@@ -718,8 +752,7 @@ func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
pingInterval, err := strconv.ParseInt(env.Get(pingIntervalEnv, kv.Get(target.NATSPingInterval)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvNATSQueueLimit
@@ -729,8 +762,7 @@ func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NATSQueueLimit)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
secureEnv := target.EnvNATSSecure
@@ -794,8 +826,7 @@ func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
maxPubAcksInflight, err := strconv.Atoi(env.Get(maxPubAcksInflightEnv,
kv.Get(target.NATSStreamingMaxPubAcksInFlight)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
clusterIDEnv := target.EnvNATSStreamingClusterID
if k != config.Default {
@@ -808,13 +839,12 @@ func GetNotifyNATS(s config.Config) map[string]target.NATSArgs {
}
if err = natsArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
natsTargets[k] = natsArgs
}
return natsTargets
return natsTargets, nil
}
// DefaultNSQKVS - NSQ KV for config
@@ -832,9 +862,9 @@ var (
)
// GetNotifyNSQ - returns a map of registered notification 'nsq' targets
func GetNotifyNSQ(s config.Config) map[string]target.NSQArgs {
func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) {
nsqTargets := make(map[string]target.NSQArgs)
for k, kv := range mergeTargets(s[config.NotifyNSQSubSys], target.EnvNSQState, DefaultNSQKVS) {
for k, kv := range mergeTargets(nsqKVS, target.EnvNSQState, DefaultNSQKVS) {
stateEnv := target.EnvNSQState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
@@ -842,8 +872,7 @@ func GetNotifyNSQ(s config.Config) map[string]target.NSQArgs {
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -855,8 +884,7 @@ func GetNotifyNSQ(s config.Config) map[string]target.NSQArgs {
}
nsqdAddress, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NSQAddress)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
tlsEnableEnv := target.EnvNSQTLSEnable
if k != config.Default {
@@ -873,8 +901,7 @@ func GetNotifyNSQ(s config.Config) map[string]target.NSQArgs {
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NSQQueueLimit)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
topicEnv := target.EnvNSQTopic
@@ -897,13 +924,12 @@ func GetNotifyNSQ(s config.Config) map[string]target.NSQArgs {
nsqArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.NSQTLSSkipVerify)) == config.StateOn
if err = nsqArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
nsqTargets[k] = nsqArgs
}
return nsqTargets
return nsqTargets, nil
}
// DefaultPostgresKVS - default Postgres KV for server config.
@@ -925,9 +951,9 @@ var (
)
// GetNotifyPostgres - returns a map of registered notification 'postgres' targets
func GetNotifyPostgres(s config.Config) map[string]target.PostgreSQLArgs {
func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) {
psqlTargets := make(map[string]target.PostgreSQLArgs)
for k, kv := range mergeTargets(s[config.NotifyPostgresSubSys], target.EnvPostgresState, DefaultPostgresKVS) {
for k, kv := range mergeTargets(postgresKVS, target.EnvPostgresState, DefaultPostgresKVS) {
stateEnv := target.EnvPostgresState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
@@ -935,8 +961,7 @@ func GetNotifyPostgres(s config.Config) map[string]target.PostgreSQLArgs {
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -949,8 +974,7 @@ func GetNotifyPostgres(s config.Config) map[string]target.PostgreSQLArgs {
host, err := xnet.ParseHost(env.Get(hostEnv, kv.Get(target.PostgresHost)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvPostgresQueueLimit
@@ -960,8 +984,7 @@ func GetNotifyPostgres(s config.Config) map[string]target.PostgreSQLArgs {
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.PostgresQueueLimit)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
formatEnv := target.EnvPostgresFormat
@@ -1018,12 +1041,12 @@ func GetNotifyPostgres(s config.Config) map[string]target.PostgreSQLArgs {
QueueLimit: uint64(queueLimit),
}
if err = psqlArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
psqlTargets[k] = psqlArgs
}
return psqlTargets
return psqlTargets, nil
}
// DefaultRedisKVS - default KV for redis config
@@ -1041,9 +1064,9 @@ var (
)
// GetNotifyRedis - returns a map of registered notification 'redis' targets
func GetNotifyRedis(s config.Config) map[string]target.RedisArgs {
func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) {
redisTargets := make(map[string]target.RedisArgs)
for k, kv := range mergeTargets(s[config.NotifyRedisSubSys], target.EnvRedisState, DefaultRedisKVS) {
for k, kv := range mergeTargets(redisKVS, target.EnvRedisState, DefaultRedisKVS) {
stateEnv := target.EnvRedisState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
@@ -1051,8 +1074,7 @@ func GetNotifyRedis(s config.Config) map[string]target.RedisArgs {
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -1064,8 +1086,7 @@ func GetNotifyRedis(s config.Config) map[string]target.RedisArgs {
}
addr, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.RedisAddress)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvRedisQueueLimit
if k != config.Default {
@@ -1073,8 +1094,7 @@ func GetNotifyRedis(s config.Config) map[string]target.RedisArgs {
}
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.RedisQueueLimit)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
formatEnv := target.EnvRedisFormat
if k != config.Default {
@@ -1102,12 +1122,11 @@ func GetNotifyRedis(s config.Config) map[string]target.RedisArgs {
QueueLimit: uint64(queueLimit),
}
if err = redisArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
redisTargets[k] = redisArgs
}
return redisTargets
return redisTargets, nil
}
// DefaultWebhookKVS - default KV for webhook config
@@ -1123,17 +1142,16 @@ var (
)
// GetNotifyWebhook - returns a map of registered notification 'webhook' targets
func GetNotifyWebhook(s config.Config, rootCAs *x509.CertPool) map[string]target.WebhookArgs {
func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.WebhookArgs, error) {
webhookTargets := make(map[string]target.WebhookArgs)
for k, kv := range mergeTargets(s[config.NotifyWebhookSubSys], target.EnvWebhookState, DefaultWebhookKVS) {
for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookState, DefaultWebhookKVS) {
stateEnv := target.EnvWebhookState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -1142,10 +1160,9 @@ func GetNotifyWebhook(s config.Config, rootCAs *x509.CertPool) map[string]target
if k != config.Default {
urlEnv = urlEnv + config.Default + k
}
url, err := xnet.ParseURL(env.Get(urlEnv, kv.Get(target.WebhookEndpoint)))
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.WebhookEndpoint)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvWebhookQueueLimit
if k != config.Default {
@@ -1153,8 +1170,7 @@ func GetNotifyWebhook(s config.Config, rootCAs *x509.CertPool) map[string]target
}
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.WebhookQueueLimit)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueDirEnv := target.EnvWebhookQueueDir
if k != config.Default {
@@ -1174,12 +1190,11 @@ func GetNotifyWebhook(s config.Config, rootCAs *x509.CertPool) map[string]target
QueueLimit: uint64(queueLimit),
}
if err = webhookArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
webhookTargets[k] = webhookArgs
}
return webhookTargets
return webhookTargets, nil
}
// DefaultESKVS - default KV config for Elasticsearch target
@@ -1196,17 +1211,16 @@ var (
)
// GetNotifyES - returns a map of registered notification 'elasticsearch' targets
func GetNotifyES(s config.Config) map[string]target.ElasticsearchArgs {
func GetNotifyES(esKVS map[string]config.KVS) (map[string]target.ElasticsearchArgs, error) {
esTargets := make(map[string]target.ElasticsearchArgs)
for k, kv := range mergeTargets(s[config.NotifyESSubSys], target.EnvElasticState, DefaultESKVS) {
for k, kv := range mergeTargets(esKVS, target.EnvElasticState, DefaultESKVS) {
stateEnv := target.EnvElasticState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
@@ -1217,10 +1231,9 @@ func GetNotifyES(s config.Config) map[string]target.ElasticsearchArgs {
urlEnv = urlEnv + config.Default + k
}
url, err := xnet.ParseURL(env.Get(urlEnv, kv.Get(target.ElasticURL)))
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.ElasticURL)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
queueLimitEnv := target.EnvElasticQueueLimit
@@ -1230,8 +1243,7 @@ func GetNotifyES(s config.Config) map[string]target.ElasticsearchArgs {
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.ElasticQueueLimit)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
formatEnv := target.EnvElasticFormat
@@ -1258,12 +1270,11 @@ func GetNotifyES(s config.Config) map[string]target.ElasticsearchArgs {
QueueLimit: uint64(queueLimit),
}
if err = esArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
esTargets[k] = esArgs
}
return esTargets
return esTargets, nil
}
// DefaultAMQPKVS - default KV for AMQP config
@@ -1287,30 +1298,27 @@ var (
)
// GetNotifyAMQP - returns a map of registered notification 'amqp' targets
func GetNotifyAMQP(s config.Config) map[string]target.AMQPArgs {
func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) {
amqpTargets := make(map[string]target.AMQPArgs)
for k, kv := range mergeTargets(s[config.NotifyAMQPSubSys], target.EnvAMQPState, DefaultAMQPKVS) {
for k, kv := range mergeTargets(amqpKVS, target.EnvAMQPState, DefaultAMQPKVS) {
stateEnv := target.EnvAMQPState
if k != config.Default {
stateEnv = stateEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
if !enabled {
continue
}
urlEnv := target.EnvAMQPURL
if k != config.Default {
urlEnv = urlEnv + config.Default + k
}
url, err := xnet.ParseURL(env.Get(urlEnv, kv.Get(target.AmqpURL)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
deliveryModeEnv := target.EnvAMQPDeliveryMode
if k != config.Default {
@@ -1318,8 +1326,7 @@ func GetNotifyAMQP(s config.Config) map[string]target.AMQPArgs {
}
deliveryMode, err := strconv.Atoi(env.Get(deliveryModeEnv, kv.Get(target.AmqpDeliveryMode)))
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
exchangeEnv := target.EnvAMQPExchange
if k != config.Default {
@@ -1367,8 +1374,7 @@ func GetNotifyAMQP(s config.Config) map[string]target.AMQPArgs {
}
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.AmqpQueueLimit)), 10, 64)
if err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
amqpArgs := target.AMQPArgs{
Enable: enabled,
@@ -1387,10 +1393,9 @@ func GetNotifyAMQP(s config.Config) map[string]target.AMQPArgs {
QueueLimit: queueLimit,
}
if err = amqpArgs.Validate(); err != nil {
logger.LogIf(context.Background(), err)
continue
return nil, err
}
amqpTargets[k] = amqpArgs
}
return amqpTargets
return amqpTargets, nil
}