Final changes to config sub-system (#8600)

- Introduces changes such as certain types of
  errors that can be ignored or which need to 
  go into safe mode.
- Update help text as per the review
This commit is contained in:
Harshavardhana
2019-12-04 15:32:37 -08:00
committed by kannappanr
parent 794eb54da8
commit c9940d8c3f
65 changed files with 605 additions and 1033 deletions

View File

@@ -341,8 +341,8 @@ func mergeTargets(cfgTargets map[string]config.KVS, envname string, defaultKVS c
var (
DefaultKafkaKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaTopic,
@@ -366,15 +366,15 @@ var (
},
config.KV{
Key: target.KafkaSASL,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaTLS,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaTLSSkipVerify,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.KafkaQueueLimit,
@@ -390,12 +390,12 @@ var (
// GetNotifyKafka - returns a map of registered notification 'kafka' targets
func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) {
kafkaTargets := make(map[string]target.KafkaArgs)
for k, kv := range mergeTargets(kafkaKVS, target.EnvKafkaState, DefaultKafkaKVS) {
stateEnv := target.EnvKafkaState
for k, kv := range mergeTargets(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) {
enableEnv := target.EnvKafkaEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -409,7 +409,7 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
}
kafkaBrokers := env.Get(brokersEnv, kv.Get(target.KafkaBrokers))
if len(kafkaBrokers) == 0 {
return nil, config.Error("kafka 'brokers' cannot be empty")
return nil, config.Errorf(config.SafeModeKind, "kafka 'brokers' cannot be empty")
}
for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
var host *xnet.Host
@@ -467,8 +467,8 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
if k != config.Default {
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
}
kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.KafkaTLS)) == config.StateOn
kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.KafkaTLSSkipVerify)) == config.StateOn
kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.KafkaTLS)) == config.EnableOn
kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.KafkaTLSSkipVerify)) == config.EnableOn
kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth)
saslEnableEnv := target.EnvKafkaSASLEnable
@@ -483,7 +483,7 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
if k != config.Default {
saslPasswordEnv = saslPasswordEnv + config.Default + k
}
kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.StateOn
kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.EnableOn
kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(target.KafkaSASLUsername))
kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword))
@@ -501,8 +501,8 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
var (
DefaultMQTTKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.MqttBroker,
@@ -546,13 +546,13 @@ var (
// GetNotifyMQTT - returns a map of registered notification 'mqtt' targets
func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) {
mqttTargets := make(map[string]target.MQTTArgs)
for k, kv := range mergeTargets(mqttKVS, target.EnvMQTTState, DefaultMQTTKVS) {
stateEnv := target.EnvMQTTState
for k, kv := range mergeTargets(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) {
enableEnv := target.EnvMQTTEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -656,8 +656,8 @@ func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s
var (
DefaultMySQLKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.MySQLFormat,
@@ -705,13 +705,13 @@ var (
// GetNotifyMySQL - returns a map of registered notification 'mysql' targets
func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) {
mysqlTargets := make(map[string]target.MySQLArgs)
for k, kv := range mergeTargets(mysqlKVS, target.EnvMySQLState, DefaultMySQLKVS) {
stateEnv := target.EnvMySQLState
for k, kv := range mergeTargets(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) {
enableEnv := target.EnvMySQLEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -795,8 +795,8 @@ func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs
var (
DefaultNATSKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSAddress,
@@ -832,7 +832,7 @@ var (
},
config.KV{
Key: target.NATSSecure,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSPingInterval,
@@ -840,11 +840,11 @@ var (
},
config.KV{
Key: target.NATSStreaming,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSStreamingAsync,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.NATSStreamingMaxPubAcksInFlight,
@@ -868,13 +868,13 @@ var (
// GetNotifyNATS - returns a map of registered notification 'nats' targets
func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, error) {
natsTargets := make(map[string]target.NATSArgs)
for k, kv := range mergeTargets(natsKVS, target.EnvNATSState, DefaultNATSKVS) {
stateEnv := target.EnvNATSState
for k, kv := range mergeTargets(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) {
enableEnv := target.EnvNATSEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -967,7 +967,7 @@ func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, e
ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)),
ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)),
Token: env.Get(tokenEnv, kv.Get(target.NATSToken)),
Secure: env.Get(secureEnv, kv.Get(target.NATSSecure)) == config.StateOn,
Secure: env.Get(secureEnv, kv.Get(target.NATSSecure)) == config.EnableOn,
PingInterval: pingInterval,
QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)),
QueueLimit: queueLimit,
@@ -978,7 +978,7 @@ func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, e
streamingEnableEnv = streamingEnableEnv + config.Default + k
}
streamingEnabled := env.Get(streamingEnableEnv, kv.Get(target.NATSStreaming)) == config.StateOn
streamingEnabled := env.Get(streamingEnableEnv, kv.Get(target.NATSStreaming)) == config.EnableOn
if streamingEnabled {
asyncEnv := target.EnvNATSStreamingAsync
if k != config.Default {
@@ -999,7 +999,7 @@ func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, e
}
natsArgs.Streaming.Enable = streamingEnabled
natsArgs.Streaming.ClusterID = env.Get(clusterIDEnv, kv.Get(target.NATSStreamingClusterID))
natsArgs.Streaming.Async = env.Get(asyncEnv, kv.Get(target.NATSStreamingAsync)) == config.StateOn
natsArgs.Streaming.Async = env.Get(asyncEnv, kv.Get(target.NATSStreamingAsync)) == config.EnableOn
natsArgs.Streaming.MaxPubAcksInflight = maxPubAcksInflight
}
@@ -1016,8 +1016,8 @@ func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, e
var (
DefaultNSQKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.NSQAddress,
@@ -1029,11 +1029,11 @@ var (
},
config.KV{
Key: target.NSQTLS,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.NSQTLSSkipVerify,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.NSQQueueDir,
@@ -1049,13 +1049,13 @@ var (
// GetNotifyNSQ - returns a map of registered notification 'nsq' targets
func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) {
nsqTargets := make(map[string]target.NSQArgs)
for k, kv := range mergeTargets(nsqKVS, target.EnvNSQState, DefaultNSQKVS) {
stateEnv := target.EnvNSQState
for k, kv := range mergeTargets(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) {
enableEnv := target.EnvNSQEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -1105,8 +1105,8 @@ func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, erro
QueueDir: env.Get(queueDirEnv, kv.Get(target.NSQQueueDir)),
QueueLimit: queueLimit,
}
nsqArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.NSQTLS)) == config.StateOn
nsqArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.NSQTLSSkipVerify)) == config.StateOn
nsqArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.NSQTLS)) == config.EnableOn
nsqArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.NSQTLSSkipVerify)) == config.EnableOn
if err = nsqArgs.Validate(); err != nil {
return nil, err
@@ -1121,8 +1121,8 @@ func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, erro
var (
DefaultPostgresKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.PostgresFormat,
@@ -1170,13 +1170,13 @@ var (
// GetNotifyPostgres - returns a map of registered notification 'postgres' targets
func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) {
psqlTargets := make(map[string]target.PostgreSQLArgs)
for k, kv := range mergeTargets(postgresKVS, target.EnvPostgresState, DefaultPostgresKVS) {
stateEnv := target.EnvPostgresState
for k, kv := range mergeTargets(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) {
enableEnv := target.EnvPostgresEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -1270,8 +1270,8 @@ func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.Pos
var (
DefaultRedisKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.RedisFormat,
@@ -1303,13 +1303,13 @@ var (
// GetNotifyRedis - returns a map of registered notification 'redis' targets
func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) {
redisTargets := make(map[string]target.RedisArgs)
for k, kv := range mergeTargets(redisKVS, target.EnvRedisState, DefaultRedisKVS) {
stateEnv := target.EnvRedisState
for k, kv := range mergeTargets(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) {
enableEnv := target.EnvRedisEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -1370,8 +1370,8 @@ func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs
var (
DefaultWebhookKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.WebhookEndpoint,
@@ -1395,12 +1395,12 @@ var (
// GetNotifyWebhook - returns a map of registered notification 'webhook' targets
func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.WebhookArgs, error) {
webhookTargets := make(map[string]target.WebhookArgs)
for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookState, DefaultWebhookKVS) {
stateEnv := target.EnvWebhookState
for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) {
enableEnv := target.EnvWebhookEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -1452,8 +1452,8 @@ func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool)
var (
DefaultESKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.ElasticURL,
@@ -1481,12 +1481,12 @@ var (
// GetNotifyES - returns a map of registered notification 'elasticsearch' targets
func GetNotifyES(esKVS map[string]config.KVS) (map[string]target.ElasticsearchArgs, error) {
esTargets := make(map[string]target.ElasticsearchArgs)
for k, kv := range mergeTargets(esKVS, target.EnvElasticState, DefaultESKVS) {
stateEnv := target.EnvElasticState
for k, kv := range mergeTargets(esKVS, target.EnvElasticEnable, DefaultESKVS) {
enableEnv := target.EnvElasticEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -1549,8 +1549,8 @@ func GetNotifyES(esKVS map[string]config.KVS) (map[string]target.ElasticsearchAr
var (
DefaultAMQPKVS = config.KVS{
config.KV{
Key: config.State,
Value: config.StateOff,
Key: config.Enable,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpURL,
@@ -1570,23 +1570,23 @@ var (
},
config.KV{
Key: target.AmqpMandatory,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpDurable,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpNoWait,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpInternal,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpAutoDeleted,
Value: config.StateOff,
Value: config.EnableOff,
},
config.KV{
Key: target.AmqpDeliveryMode,
@@ -1606,12 +1606,12 @@ var (
// GetNotifyAMQP - returns a map of registered notification 'amqp' targets
func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) {
amqpTargets := make(map[string]target.AMQPArgs)
for k, kv := range mergeTargets(amqpKVS, target.EnvAMQPState, DefaultAMQPKVS) {
stateEnv := target.EnvAMQPState
for k, kv := range mergeTargets(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) {
enableEnv := target.EnvAMQPEnable
if k != config.Default {
stateEnv = stateEnv + config.Default + k
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(stateEnv, kv.Get(config.State)))
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
if err != nil {
return nil, err
}
@@ -1689,12 +1689,12 @@ func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, e
RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)),
ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)),
DeliveryMode: uint8(deliveryMode),
Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.StateOn,
Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.StateOn,
Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.StateOn,
Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.StateOn,
NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.StateOn,
AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.StateOn,
Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn,
Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn,
Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn,
Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn,
NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn,
AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn,
QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)),
QueueLimit: queueLimit,
}