Add subsystem level validation on config set (#14269)

When setting a config of a particular sub-system, validate the existing
config and notification targets of only that sub-system, so that
existing errors related to one sub-system (e.g. notification target
offline) do not result in errors for other sub-systems.
This commit is contained in:
Shireesh Anjal
2022-02-09 00:06:41 +05:30
committed by GitHub
parent 2ee337ead5
commit 9890f579f8
5 changed files with 539 additions and 423 deletions

View File

@@ -58,6 +58,27 @@ func TestNotificationTargets(ctx context.Context, cfg config.Config, transport *
return err
}
// TestSubSysNotificationTargets - tests notification targets of given subsystem
func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID, subSys string) error {
if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
return err
}
targetList := event.NewTargetList()
targetsOffline, err := fetchSubSysTargets(ctx, cfg, transport, true, true, subSys, targetList)
if err == nil {
// Close all targets since we are only testing connections.
for _, t := range targetList.TargetMap() {
_ = t.Close()
}
}
if targetsOffline {
return ErrTargetsOffline
}
return err
}
// GetNotificationTargets registers and initializes all notification
// targets, returns error if any.
func GetNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, test bool) (*event.TargetList, error) {
@@ -91,6 +112,257 @@ func RegisterNotificationTargets(ctx context.Context, cfg config.Config, transpo
return targetList, nil
}
func fetchSubSysTargets(ctx context.Context, cfg config.Config,
transport *http.Transport, test bool, returnOnTargetError bool,
subSys string, targetList *event.TargetList) (targetsOffline bool, err error) {
targetsOffline = false
if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
return targetsOffline, err
}
switch subSys {
case config.NotifyAMQPSubSys:
amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys])
if err != nil {
return targetsOffline, err
}
for id, args := range amqpTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewAMQPTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
}
case config.NotifyESSubSys:
esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport)
if err != nil {
return targetsOffline, err
}
for id, args := range esTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewElasticsearchTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyKafkaSubSys:
kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
if err != nil {
return targetsOffline, err
}
for id, args := range kafkaTargets {
if !args.Enable {
continue
}
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewKafkaTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyMQTTSubSys:
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return targetsOffline, err
}
for id, args := range mqttTargets {
if !args.Enable {
continue
}
args.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewMQTTTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyMySQLSubSys:
mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys])
if err != nil {
return targetsOffline, err
}
for id, args := range mysqlTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewMySQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyNATSSubSys:
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return targetsOffline, err
}
for id, args := range natsTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewNATSTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyNSQSubSys:
nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys])
if err != nil {
return targetsOffline, err
}
for id, args := range nsqTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewNSQTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyPostgresSubSys:
postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys])
if err != nil {
return targetsOffline, err
}
for id, args := range postgresTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewPostgreSQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyRedisSubSys:
redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys])
if err != nil {
return targetsOffline, err
}
for id, args := range redisTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewRedisTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
case config.NotifyWebhookSubSys:
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport)
if err != nil {
return targetsOffline, err
}
for id, args := range webhookTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return targetsOffline, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return targetsOffline, err
}
}
}
}
return targetsOffline, nil
}
// FetchRegisteredTargets - Returns a set of configured TargetList
// If `returnOnTargetError` is set to true, The function returns when a target initialization fails
// Else, the function will return a complete TargetList irrespective of errors
@@ -109,265 +381,13 @@ func FetchRegisteredTargets(ctx context.Context, cfg config.Config, transport *h
}
}()
if err = checkValidNotificationKeys(cfg); err != nil {
return nil, err
}
amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys])
if err != nil {
return nil, err
}
esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport)
if err != nil {
return nil, err
}
kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
if err != nil {
return nil, err
}
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return nil, err
}
mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys])
if err != nil {
return nil, err
}
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return nil, err
}
nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys])
if err != nil {
return nil, err
}
postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys])
if err != nil {
return nil, err
}
redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys])
if err != nil {
return nil, err
}
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport)
if err != nil {
return nil, err
}
for id, args := range amqpTargets {
if !args.Enable {
continue
for _, subSys := range config.NotifySubSystems.ToSlice() {
if targetsOffline, err = fetchSubSysTargets(ctx, cfg, transport, test, returnOnTargetError, subSys, targetList); err != nil {
return targetList, err
}
newTarget, err := target.NewAMQPTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
if targetsOffline {
return targetList, ErrTargetsOffline
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range esTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewElasticsearchTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range kafkaTargets {
if !args.Enable {
continue
}
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewKafkaTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range mqttTargets {
if !args.Enable {
continue
}
args.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewMQTTTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range mysqlTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewMySQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range natsTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewNATSTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range nsqTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewNSQTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range postgresTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewPostgreSQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range redisTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewRedisTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
for id, args := range webhookTargets {
if !args.Enable {
continue
}
newTarget, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport, test)
if err != nil {
targetsOffline = true
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
if targetsOffline {
return targetList, ErrTargetsOffline
}
return targetList, nil
@@ -389,21 +409,19 @@ var (
}
)
func checkValidNotificationKeys(cfg config.Config) error {
for subSys, tgt := range cfg {
validKVS, ok := DefaultNotificationKVS[subSys]
if !ok {
continue
func checkValidNotificationKeysForSubSys(subSys string, tgt map[string]config.KVS) error {
validKVS, ok := DefaultNotificationKVS[subSys]
if !ok {
return nil
}
for tname, kv := range tgt {
subSysTarget := subSys
if tname != config.Default {
subSysTarget = subSys + config.SubSystemSeparator + tname
}
for tname, kv := range tgt {
subSysTarget := subSys
if tname != config.Default {
subSysTarget = subSys + config.SubSystemSeparator + tname
}
if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn {
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
return err
}
if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn {
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
return err
}
}
}