From 9890f579f8ae47f7dd0aaf152fd926e9245bb9f0 Mon Sep 17 00:00:00 2001 From: Shireesh Anjal <355479+anjalshireesh@users.noreply.github.com> Date: Wed, 9 Feb 2022 00:06:41 +0530 Subject: [PATCH] Add subsystem level validation on `config set` (#14269) When setting a config of a particular sub-system, validate the existing config and notification targets of only that sub-system, so that existing errors related to one sub-system (e.g. notification target offline) do not result in errors for other sub-systems. --- cmd/admin-handlers-config-kv.go | 14 +- cmd/config-current.go | 176 +++++----- internal/config/config.go | 54 ++- internal/config/notify/parse.go | 560 ++++++++++++++++---------------- internal/logger/config.go | 158 +++++---- 5 files changed, 539 insertions(+), 423 deletions(-) diff --git a/cmd/admin-handlers-config-kv.go b/cmd/admin-handlers-config-kv.go index d4c34678a..35a6fdd1e 100644 --- a/cmd/admin-handlers-config-kv.go +++ b/cmd/admin-handlers-config-kv.go @@ -74,7 +74,7 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - if err = validateConfig(cfg); err != nil { + if err = validateConfig(cfg, ""); err != nil { writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL) return } @@ -138,7 +138,13 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ return } - if err = validateConfig(cfg); err != nil { + subSys, _, _, err := config.GetSubSys(string(kvBytes)) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + if err = validateConfig(cfg, subSys); err != nil { writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL) return } @@ -260,7 +266,7 @@ func (a adminAPIHandlers) RestoreConfigHistoryKVHandler(w http.ResponseWriter, r return } - if err = validateConfig(cfg); err != nil { + if err = validateConfig(cfg, ""); err != nil { writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL) return } @@ -371,7 +377,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques return } - if err = validateConfig(cfg); err != nil { + if err = validateConfig(cfg, ""); err != nil { writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL) return } diff --git a/cmd/config-current.go b/cmd/config-current.go index d035d8c9d..9b7c87ad5 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -255,65 +255,55 @@ var ( globalServerConfigMu sync.RWMutex ) -func validateConfig(s config.Config) error { - objAPI := newObjectLayerFn() - - // We must have a global lock for this so nobody else modifies env while we do. - defer env.LockSetEnv()() - - // Disable merging env values with config for validation. - env.SetEnvOff() - - // Enable env values to validate KMS. - defer env.SetEnvOn() - - if _, err := config.LookupCreds(s[config.CredentialsSubSys][config.Default]); err != nil { - return err - } - - if _, err := config.LookupSite(s[config.SiteSubSys][config.Default], s[config.RegionSubSys][config.Default]); err != nil { - return err - } - - if _, err := api.LookupConfig(s[config.APISubSys][config.Default]); err != nil { - return err - } - - if globalIsErasure { - if objAPI == nil { - return errServerNotInitialized +func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) error { + switch subSys { + case config.CredentialsSubSys: + if _, err := config.LookupCreds(s[config.CredentialsSubSys][config.Default]); err != nil { + return err } - for _, setDriveCount := range objAPI.SetDriveCounts() { - if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil { - return err + case config.SiteSubSys: + if _, err := config.LookupSite(s[config.SiteSubSys][config.Default], s[config.RegionSubSys][config.Default]); err != nil { + return err + } + case config.APISubSys: + if _, err := api.LookupConfig(s[config.APISubSys][config.Default]); err != nil { + return err + } + case config.StorageClassSubSys: + if globalIsErasure { + if objAPI == nil { + return errServerNotInitialized + } + for _, setDriveCount := range objAPI.SetDriveCounts() { + if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil { + return err + } } } - } - - if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default]); err != nil { - return err - } - - compCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]) - if err != nil { - return err - } - - if objAPI != nil { - if compCfg.Enabled && !objAPI.IsCompressionSupported() { - return fmt.Errorf("Backend does not support compression") + case config.CacheSubSys: + if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default]); err != nil { + return err + } + case config.CompressionSubSys: + compCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]) + if err != nil { + return err } - } - if _, err = heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil { - return err - } - - if _, err = scanner.LookupConfig(s[config.ScannerSubSys][config.Default]); err != nil { - return err - } - - { + if objAPI != nil { + if compCfg.Enabled && !objAPI.IsCompressionSupported() { + return fmt.Errorf("Backend does not support compression") + } + } + case config.HealSubSys: + if _, err := heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil { + return err + } + case config.ScannerSubSys: + if _, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default]); err != nil { + return err + } + case config.EtcdSubSys: etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs) if err != nil { return err @@ -325,15 +315,13 @@ func validateConfig(s config.Config) error { } etcdClnt.Close() } - } - if _, err := openid.LookupConfig(s[config.IdentityOpenIDSubSys][config.Default], - NewGatewayHTTPTransport(), xhttp.DrainBody, globalSite.Region); err != nil { - return err - } - - { - cfg, err := xldap.Lookup(s[config.IdentityLDAPSubSys][config.Default], - globalRootCAs) + case config.IdentityOpenIDSubSys: + if _, err := openid.LookupConfig(s[config.IdentityOpenIDSubSys][config.Default], + NewGatewayHTTPTransport(), xhttp.DrainBody, globalSite.Region); err != nil { + return err + } + case config.IdentityLDAPSubSys: + cfg, err := xldap.Lookup(s[config.IdentityLDAPSubSys][config.Default], globalRootCAs) if err != nil { return err } @@ -344,28 +332,64 @@ func validateConfig(s config.Config) error { } conn.Close() } + case config.IdentityTLSSubSys: + if _, err := xtls.Lookup(s[config.IdentityTLSSubSys][config.Default]); err != nil { + return err + } + case config.SubnetSubSys: + if _, err := subnet.LookupConfig(s[config.SubnetSubSys][config.Default]); err != nil { + return err + } + case config.PolicyOPASubSys: + if _, err := opa.LookupConfig(s[config.PolicyOPASubSys][config.Default], + NewGatewayHTTPTransport(), xhttp.DrainBody); err != nil { + return err + } + default: + if config.LoggerSubSystems.Contains(subSys) { + if err := logger.ValidateSubSysConfig(s, subSys); err != nil { + return err + } + } } - { - _, err := xtls.Lookup(s[config.IdentityTLSSubSys][config.Default]) - if err != nil { + + if config.LoggerSubSystems.Contains(subSys) { + if err := logger.ValidateSubSysConfig(s, subSys); err != nil { return err } } - if _, err := opa.LookupConfig(s[config.PolicyOPASubSys][config.Default], - NewGatewayHTTPTransport(), xhttp.DrainBody); err != nil { - return err + if config.NotifySubSystems.Contains(subSys) { + if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs(), subSys); err != nil { + return err + } + } + return nil +} + +func validateConfig(s config.Config, subSys string) error { + objAPI := newObjectLayerFn() + + // We must have a global lock for this so nobody else modifies env while we do. + defer env.LockSetEnv()() + + // Disable merging env values with config for validation. + env.SetEnvOff() + + // Enable env values to validate KMS. + defer env.SetEnvOn() + if subSys != "" { + return validateSubSysConfig(s, subSys, objAPI) } - if _, err := logger.LookupConfig(s); err != nil { - return err + // No sub-system passed. Validate all of them. + for _, ss := range config.SubSystems.ToSlice() { + if err := validateSubSysConfig(s, ss, objAPI); err != nil { + return err + } } - if _, err = subnet.LookupConfig(s[config.SubnetSubSys][config.Default]); err != nil { - return err - } - - return notify.TestNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs()) + return nil } func lookupConfigs(s config.Config, objAPI ObjectLayer) { diff --git a/internal/config/config.go b/internal/config/config.go index 6e3c9a684..d835533cf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -106,6 +106,27 @@ const ( // Add new constants here if you add new fields to config. ) +// NotifySubSystems - all notification sub-systems +var NotifySubSystems = set.CreateStringSet( + NotifyKafkaSubSys, + NotifyMQTTSubSys, + NotifyMySQLSubSys, + NotifyNATSSubSys, + NotifyNSQSubSys, + NotifyESSubSys, + NotifyAMQPSubSys, + NotifyPostgresSubSys, + NotifyRedisSubSys, + NotifyWebhookSubSys, +) + +// LoggerSubSystems - all sub-systems related to logger +var LoggerSubSystems = set.CreateStringSet( + LoggerWebhookSubSys, + AuditWebhookSubSys, + AuditKafkaSubSys, +) + // SubSystems - all supported sub-systems var SubSystems = set.CreateStringSet( CredentialsSubSys, @@ -784,35 +805,46 @@ func (c Config) Clone() Config { return cp } -// SetKVS - set specific key values per sub-system. -func (c Config) SetKVS(s string, defaultKVS map[string]KVS) (dynamic bool, err error) { +// GetSubSys - extracts subssystem info from given config string +func GetSubSys(s string) (subSys string, inputs []string, tgt string, e error) { + tgt = Default if len(s) == 0 { - return false, Errorf("input arguments cannot be empty") + return subSys, inputs, tgt, Errorf("input arguments cannot be empty") } - inputs := strings.SplitN(s, KvSpaceSeparator, 2) + inputs = strings.SplitN(s, KvSpaceSeparator, 2) if len(inputs) <= 1 { - return false, Errorf("invalid number of arguments '%s'", s) + return subSys, inputs, tgt, Errorf("invalid number of arguments '%s'", s) } subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2) if len(subSystemValue) == 0 { - return false, Errorf("invalid number of arguments %s", s) + return subSys, inputs, tgt, Errorf("invalid number of arguments %s", s) } if !SubSystems.Contains(subSystemValue[0]) { - return false, Errorf("unknown sub-system %s", s) + return subSys, inputs, tgt, Errorf("unknown sub-system %s", s) } + subSys = subSystemValue[0] if SubSystemsSingleTargets.Contains(subSystemValue[0]) && len(subSystemValue) == 2 { - return false, Errorf("sub-system '%s' only supports single target", subSystemValue[0]) + return subSys, inputs, tgt, Errorf("sub-system '%s' only supports single target", subSystemValue[0]) } - dynamic = SubSystemsDynamic.Contains(subSystemValue[0]) - tgt := Default - subSys := subSystemValue[0] if len(subSystemValue) == 2 { tgt = subSystemValue[1] } + return subSys, inputs, tgt, e +} + +// SetKVS - set specific key values per sub-system. +func (c Config) SetKVS(s string, defaultKVS map[string]KVS) (dynamic bool, err error) { + subSys, inputs, tgt, err := GetSubSys(s) + if err != nil { + return false, err + } + + dynamic = SubSystemsDynamic.Contains(subSys) + fields := madmin.KvFields(inputs[1], defaultKVS[subSys].Keys()) if len(fields) == 0 { return false, Errorf("sub-system '%s' cannot have empty keys", subSys) diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 556ec78de..b2e71f447 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -58,6 +58,27 @@ func TestNotificationTargets(ctx context.Context, cfg config.Config, transport * return err } +// TestSubSysNotificationTargets - tests notification targets of given subsystem +func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID, subSys string) error { + if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil { + return err + } + targetList := event.NewTargetList() + targetsOffline, err := fetchSubSysTargets(ctx, cfg, transport, true, true, subSys, targetList) + if err == nil { + // Close all targets since we are only testing connections. + for _, t := range targetList.TargetMap() { + _ = t.Close() + } + } + + if targetsOffline { + return ErrTargetsOffline + } + + return err +} + // GetNotificationTargets registers and initializes all notification // targets, returns error if any. func GetNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, test bool) (*event.TargetList, error) { @@ -91,6 +112,257 @@ func RegisterNotificationTargets(ctx context.Context, cfg config.Config, transpo return targetList, nil } +func fetchSubSysTargets(ctx context.Context, cfg config.Config, + transport *http.Transport, test bool, returnOnTargetError bool, + subSys string, targetList *event.TargetList) (targetsOffline bool, err error) { + targetsOffline = false + if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil { + return targetsOffline, err + } + + switch subSys { + case config.NotifyAMQPSubSys: + amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys]) + if err != nil { + return targetsOffline, err + } + for id, args := range amqpTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewAMQPTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + } + case config.NotifyESSubSys: + esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport) + if err != nil { + return targetsOffline, err + } + for id, args := range esTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewElasticsearchTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyKafkaSubSys: + kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys]) + if err != nil { + return targetsOffline, err + } + for id, args := range kafkaTargets { + if !args.Enable { + continue + } + args.TLS.RootCAs = transport.TLSClientConfig.RootCAs + newTarget, err := target.NewKafkaTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + + case config.NotifyMQTTSubSys: + mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs) + if err != nil { + return targetsOffline, err + } + for id, args := range mqttTargets { + if !args.Enable { + continue + } + args.RootCAs = transport.TLSClientConfig.RootCAs + newTarget, err := target.NewMQTTTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyMySQLSubSys: + mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys]) + if err != nil { + return targetsOffline, err + } + for id, args := range mysqlTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewMySQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyNATSSubSys: + natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs) + if err != nil { + return targetsOffline, err + } + for id, args := range natsTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewNATSTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyNSQSubSys: + nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys]) + if err != nil { + return targetsOffline, err + } + for id, args := range nsqTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewNSQTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyPostgresSubSys: + postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys]) + if err != nil { + return targetsOffline, err + } + for id, args := range postgresTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewPostgreSQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyRedisSubSys: + redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys]) + if err != nil { + return targetsOffline, err + } + for id, args := range redisTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewRedisTarget(id, args, ctx.Done(), logger.LogOnceIf, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + case config.NotifyWebhookSubSys: + webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport) + if err != nil { + return targetsOffline, err + } + for id, args := range webhookTargets { + if !args.Enable { + continue + } + newTarget, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport, test) + if err != nil { + targetsOffline = true + if returnOnTargetError { + return targetsOffline, err + } + _ = newTarget.Close() + } + if err = targetList.Add(newTarget); err != nil { + logger.LogIf(context.Background(), err) + if returnOnTargetError { + return targetsOffline, err + } + } + } + + } + return targetsOffline, nil +} + // FetchRegisteredTargets - Returns a set of configured TargetList // If `returnOnTargetError` is set to true, The function returns when a target initialization fails // Else, the function will return a complete TargetList irrespective of errors @@ -109,265 +381,13 @@ func FetchRegisteredTargets(ctx context.Context, cfg config.Config, transport *h } }() - if err = checkValidNotificationKeys(cfg); err != nil { - return nil, err - } - - amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys]) - if err != nil { - return nil, err - } - - esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport) - if err != nil { - return nil, err - } - - kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys]) - if err != nil { - return nil, err - } - - mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs) - if err != nil { - return nil, err - } - - mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys]) - if err != nil { - return nil, err - } - - natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs) - if err != nil { - return nil, err - } - - nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys]) - if err != nil { - return nil, err - } - - postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys]) - if err != nil { - return nil, err - } - - redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys]) - if err != nil { - return nil, err - } - - webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport) - if err != nil { - return nil, err - } - - for id, args := range amqpTargets { - if !args.Enable { - continue + for _, subSys := range config.NotifySubSystems.ToSlice() { + if targetsOffline, err = fetchSubSysTargets(ctx, cfg, transport, test, returnOnTargetError, subSys, targetList); err != nil { + return targetList, err } - newTarget, err := target.NewAMQPTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() + if targetsOffline { + return targetList, ErrTargetsOffline } - - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range esTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewElasticsearchTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range kafkaTargets { - if !args.Enable { - continue - } - args.TLS.RootCAs = transport.TLSClientConfig.RootCAs - newTarget, err := target.NewKafkaTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range mqttTargets { - if !args.Enable { - continue - } - args.RootCAs = transport.TLSClientConfig.RootCAs - newTarget, err := target.NewMQTTTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range mysqlTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewMySQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range natsTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewNATSTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range nsqTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewNSQTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range postgresTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewPostgreSQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range redisTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewRedisTarget(id, args, ctx.Done(), logger.LogOnceIf, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - for id, args := range webhookTargets { - if !args.Enable { - continue - } - newTarget, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport, test) - if err != nil { - targetsOffline = true - if returnOnTargetError { - return nil, err - } - _ = newTarget.Close() - } - if err = targetList.Add(newTarget); err != nil { - logger.LogIf(context.Background(), err) - if returnOnTargetError { - return nil, err - } - } - } - - if targetsOffline { - return targetList, ErrTargetsOffline } return targetList, nil @@ -389,21 +409,19 @@ var ( } ) -func checkValidNotificationKeys(cfg config.Config) error { - for subSys, tgt := range cfg { - validKVS, ok := DefaultNotificationKVS[subSys] - if !ok { - continue +func checkValidNotificationKeysForSubSys(subSys string, tgt map[string]config.KVS) error { + validKVS, ok := DefaultNotificationKVS[subSys] + if !ok { + return nil + } + for tname, kv := range tgt { + subSysTarget := subSys + if tname != config.Default { + subSysTarget = subSys + config.SubSystemSeparator + tname } - for tname, kv := range tgt { - subSysTarget := subSys - if tname != config.Default { - subSysTarget = subSys + config.SubSystemSeparator + tname - } - if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn { - if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil { - return err - } + if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn { + if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil { + return err } } } diff --git a/internal/logger/config.go b/internal/logger/config.go index 0119f46ab..8f7c14cf6 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -394,14 +394,7 @@ func GetAuditKafka(kafkaKVS map[string]config.KVS) (map[string]kafka.Config, err return kafkaTargets, nil } -// LookupConfig - lookup logger config, override with ENVs if set. -func LookupConfig(scfg config.Config) (Config, error) { - // Lookup for legacy environment variables first - cfg, err := lookupLegacyConfig() - if err != nil { - return cfg, err - } - +func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { envs := env.List(EnvLoggerWebhookEndpoint) var loggerTargets []string for _, k := range envs { @@ -412,16 +405,6 @@ func LookupConfig(scfg config.Config) (Config, error) { loggerTargets = append(loggerTargets, target) } - var loggerAuditTargets []string - envs = env.List(EnvAuditWebhookEndpoint) - for _, k := range envs { - target := strings.TrimPrefix(k, EnvAuditWebhookEndpoint+config.Default) - if target == EnvAuditWebhookEndpoint { - target = config.Default - } - loggerAuditTargets = append(loggerAuditTargets, target) - } - // Load HTTP logger from the environment if found for _, target := range loggerTargets { if v, ok := cfg.HTTP[target]; ok && v.Enabled { @@ -478,6 +461,62 @@ func LookupConfig(scfg config.Config) (Config, error) { } } + for starget, kv := range scfg[config.LoggerWebhookSubSys] { + if l, ok := cfg.HTTP[starget]; ok && l.Enabled { + // Ignore this HTTP logger config since there is + // a target with the same name loaded and enabled + // from the environment. + continue + } + subSysTarget := config.LoggerWebhookSubSys + if starget != config.Default { + subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget + } + if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil { + return cfg, err + } + enabled, err := config.ParseBool(kv.Get(config.Enable)) + if err != nil { + return cfg, err + } + if !enabled { + continue + } + err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) + if err != nil { + return cfg, err + } + queueSize, err := strconv.Atoi(kv.Get(QueueSize)) + if err != nil { + return cfg, err + } + if queueSize <= 0 { + return cfg, errors.New("invalid queue_size value") + } + cfg.HTTP[starget] = http.Config{ + Enabled: true, + Endpoint: kv.Get(Endpoint), + AuthToken: kv.Get(AuthToken), + ClientCert: kv.Get(ClientCert), + ClientKey: kv.Get(ClientKey), + QueueSize: queueSize, + } + } + + return cfg, nil +} + +func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { + var loggerAuditTargets []string + envs := env.List(EnvAuditWebhookEndpoint) + for _, k := range envs { + target := strings.TrimPrefix(k, EnvAuditWebhookEndpoint+config.Default) + if target == EnvAuditWebhookEndpoint { + target = config.Default + } + loggerAuditTargets = append(loggerAuditTargets, target) + } + for _, target := range loggerAuditTargets { if v, ok := cfg.AuditWebhook[target]; ok && v.Enabled { // This target is already enabled using the @@ -533,48 +572,6 @@ func LookupConfig(scfg config.Config) (Config, error) { } } - for starget, kv := range scfg[config.LoggerWebhookSubSys] { - if l, ok := cfg.HTTP[starget]; ok && l.Enabled { - // Ignore this HTTP logger config since there is - // a target with the same name loaded and enabled - // from the environment. - continue - } - subSysTarget := config.LoggerWebhookSubSys - if starget != config.Default { - subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget - } - if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil { - return cfg, err - } - enabled, err := config.ParseBool(kv.Get(config.Enable)) - if err != nil { - return cfg, err - } - if !enabled { - continue - } - err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) - if err != nil { - return cfg, err - } - queueSize, err := strconv.Atoi(kv.Get(QueueSize)) - if err != nil { - return cfg, err - } - if queueSize <= 0 { - return cfg, errors.New("invalid queue_size value") - } - cfg.HTTP[starget] = http.Config{ - Enabled: true, - Endpoint: kv.Get(Endpoint), - AuthToken: kv.Get(AuthToken), - ClientCert: kv.Get(ClientCert), - ClientKey: kv.Get(ClientKey), - QueueSize: queueSize, - } - } - for starget, kv := range scfg[config.AuditWebhookSubSys] { if l, ok := cfg.AuditWebhook[starget]; ok && l.Enabled { // Ignore this audit config since another target @@ -617,10 +614,49 @@ func LookupConfig(scfg config.Config) (Config, error) { } } - cfg.AuditKafka, err = GetAuditKafka(scfg[config.AuditKafkaSubSys]) + return cfg, nil +} + +// LookupConfig - lookup logger config, override with ENVs if set. +func LookupConfig(scfg config.Config) (Config, error) { + // Lookup for legacy environment variables first + cfg, err := lookupLegacyConfig() if err != nil { return cfg, err } + for _, ss := range config.LoggerSubSystems.ToSlice() { + lookupConfigForSubSys(scfg, cfg, ss) + } + return cfg, nil } + +func lookupConfigForSubSys(scfg config.Config, cfg Config, subSys string) (Config, error) { + switch subSys { + case config.LoggerWebhookSubSys: + if _, err := lookupLoggerWebhookConfig(scfg, cfg); err != nil { + return cfg, err + } + case config.AuditWebhookSubSys: + if _, err := lookupAuditWebhookConfig(scfg, cfg); err != nil { + return cfg, err + } + case config.AuditKafkaSubSys: + if _, err := GetAuditKafka(scfg[config.AuditKafkaSubSys]); err != nil { + return cfg, err + } + } + return cfg, nil +} + +// ValidateSubSysConfig - validates logger related config of given sub-system +func ValidateSubSysConfig(scfg config.Config, subSys string) error { + // Lookup for legacy environment variables first + cfg, err := lookupLegacyConfig() + if err != nil { + return err + } + _, err = lookupConfigForSubSys(scfg, cfg, subSys) + return err +}