From 3934700a085911195aba27f224f385b998a386cb Mon Sep 17 00:00:00 2001 From: Shireesh Anjal <355479+anjalshireesh@users.noreply.github.com> Date: Thu, 24 Feb 2022 22:35:33 +0530 Subject: [PATCH] Make audit webhook and kafka config dynamic (#14390) --- cmd/admin-handlers.go | 2 +- cmd/config-current.go | 76 +++++++------- cmd/consolelogger.go | 8 +- cmd/data-update-tracker_test.go | 7 +- cmd/gateway-main.go | 2 +- cmd/server-main.go | 2 +- internal/config/config.go | 2 + internal/logger/config.go | 142 ++++++++++++-------------- internal/logger/logger.go | 2 +- internal/logger/target/http/http.go | 6 ++ internal/logger/target/kafka/kafka.go | 63 ++++++++---- internal/logger/target/types/types.go | 29 ++++++ internal/logger/targets.go | 128 ++++++++++++++++------- 13 files changed, 288 insertions(+), 181 deletions(-) create mode 100644 internal/logger/target/types/types.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index bf1660aa7..6a50f3ab1 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -2222,7 +2222,7 @@ func fetchKMSStatus() madmin.KMS { func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) { var loggerInfo []madmin.Logger var auditloggerInfo []madmin.Audit - for _, target := range logger.Targets() { + for _, target := range logger.SystemTargets() { if target.Endpoint() != "" { tgt := target.String() err := checkConnection(target.Endpoint(), 15*time.Second) diff --git a/cmd/config-current.go b/cmd/config-current.go index 3e268a114..3e8e922bf 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -44,8 +44,6 @@ import ( xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/logger/target/http" - "github.com/minio/minio/internal/logger/target/kafka" "github.com/minio/pkg/env" ) @@ -353,12 +351,6 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er } } - if config.LoggerSubSystems.Contains(subSys) { - if err := logger.ValidateSubSysConfig(s, subSys); err != nil { - return err - } - } - if config.NotifySubSystems.Contains(subSys) { if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs(), subSys); err != nil { return err @@ -566,36 +558,6 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { logger.LogIf(ctx, fmt.Errorf("Unable to parse subnet configuration: %w", err)) } - // Load logger targets based on user's configuration - loggerUserAgent := getUserAgent(getMinioMode()) - - loggerCfg, err := logger.LookupConfig(s) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger/audit targets: %w", err)) - } - - for _, l := range loggerCfg.AuditWebhook { - if l.Enabled { - l.LogOnce = logger.LogOnceIf - l.UserAgent = loggerUserAgent - l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) - // Enable http audit logging - if err = logger.AddAuditTarget(http.New(l)); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit HTTP target: %w", err)) - } - } - } - - for _, l := range loggerCfg.AuditKafka { - if l.Enabled { - l.LogOnce = logger.LogOnceIf - // Enable Kafka audit logging - if err = logger.AddAuditTarget(kafka.New(l)); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit Kafka target: %w", err)) - } - } - } - globalConfigTargetList, err = notify.GetNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), false) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err)) @@ -657,7 +619,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf scannerCycle.Update(scannerCfg.Cycle) logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait)) case config.LoggerWebhookSubSys: - loggerCfg, err := logger.LookupConfig(s) + loggerCfg, err := logger.LookupConfigForSubSys(s, config.LoggerWebhookSubSys) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err)) } @@ -670,10 +632,44 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf loggerCfg.HTTP[n] = l } } - err = logger.UpdateTargets(loggerCfg) + err = logger.UpdateSystemTargets(loggerCfg) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err)) } + case config.AuditWebhookSubSys: + loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to load audit webhook config: %w", err)) + } + userAgent := getUserAgent(getMinioMode()) + for n, l := range loggerCfg.AuditWebhook { + if l.Enabled { + l.LogOnce = logger.LogOnceIf + l.UserAgent = userAgent + l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) + loggerCfg.AuditWebhook[n] = l + } + } + + err = logger.UpdateAuditWebhookTargets(loggerCfg) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %w", err)) + } + case config.AuditKafkaSubSys: + loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to load audit kafka config: %w", err)) + } + for n, l := range loggerCfg.AuditKafka { + if l.Enabled { + l.LogOnce = logger.LogOnceIf + loggerCfg.AuditKafka[n] = l + } + } + err = logger.UpdateAuditKafkaTargets(loggerCfg) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err)) + } } globalServerConfigMu.Lock() defer globalServerConfigMu.Unlock() diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index ce22c21e1..e1d1479b8 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -25,6 +25,7 @@ import ( "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger/message/log" "github.com/minio/minio/internal/logger/target/console" + "github.com/minio/minio/internal/logger/target/types" "github.com/minio/minio/internal/pubsub" xnet "github.com/minio/pkg/net" ) @@ -77,7 +78,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool { func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) { // Enable console logging for remote client. if !sys.HasLogListeners() { - logger.AddTarget(sys) + logger.AddSystemTarget(sys) } cnt := 0 @@ -154,6 +155,11 @@ func (sys *HTTPConsoleLoggerSys) Content() (logs []log.Entry) { func (sys *HTTPConsoleLoggerSys) Cancel() { } +// Type - returns type of the target +func (sys *HTTPConsoleLoggerSys) Type() types.TargetType { + return types.TargetConsole +} + // Send log message 'e' to console and publish to console // log pubsub system func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error { diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go index 995c00453..c76d9ed1c 100644 --- a/cmd/data-update-tracker_test.go +++ b/cmd/data-update-tracker_test.go @@ -31,6 +31,7 @@ import ( "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger/message/log" + "github.com/minio/minio/internal/logger/target/types" ) type testLoggerI interface { @@ -58,6 +59,10 @@ func (t *testingLogger) Init() error { func (t *testingLogger) Cancel() { } +func (t *testingLogger) Type() types.TargetType { + return types.TargetHTTP +} + func (t *testingLogger) Send(entry interface{}, errKind string) error { t.mu.Lock() defer t.mu.Unlock() @@ -76,7 +81,7 @@ func (t *testingLogger) Send(entry interface{}, errKind string) error { func addTestingLogging(t testLoggerI) func() { tl := &testingLogger{t: t} - logger.AddTarget(tl) + logger.AddSystemTarget(tl) return func() { tl.mu.Lock() defer tl.mu.Unlock() diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 6b10a02ba..e3298b574 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -176,7 +176,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { // Initialize globalConsoleSys system globalConsoleSys = NewConsoleLogger(GlobalContext) - logger.AddTarget(globalConsoleSys) + logger.AddSystemTarget(globalConsoleSys) // Handle common command args. handleCommonCmdArgs(ctx) diff --git a/cmd/server-main.go b/cmd/server-main.go index 9ecd102d5..2056678c9 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -401,7 +401,7 @@ func serverMain(ctx *cli.Context) { // Initialize globalConsoleSys system globalConsoleSys = NewConsoleLogger(GlobalContext) - logger.AddTarget(globalConsoleSys) + logger.AddSystemTarget(globalConsoleSys) // Perform any self-tests bitrotSelfTest() diff --git a/internal/config/config.go b/internal/config/config.go index 35731150f..400cc7191 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -167,6 +167,8 @@ var SubSystemsDynamic = set.CreateStringSet( HealSubSys, SubnetSubSys, LoggerWebhookSubSys, + AuditWebhookSubSys, + AuditKafkaSubSys, ) // SubSystemsSingleTargets - subsystems which only support single target. diff --git a/internal/logger/config.go b/internal/logger/config.go index 8f7c14cf6..957e8228f 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -221,62 +221,65 @@ func NewConfig() Config { return cfg } -func lookupLegacyConfig() (Config, error) { +func lookupLegacyConfigForSubSys(subSys string) Config { cfg := NewConfig() - - var loggerTargets []string - envs := env.List(legacyEnvLoggerHTTPEndpoint) - for _, k := range envs { - target := strings.TrimPrefix(k, legacyEnvLoggerHTTPEndpoint+config.Default) - if target == legacyEnvLoggerHTTPEndpoint { - target = config.Default + switch subSys { + case config.LoggerWebhookSubSys: + var loggerTargets []string + envs := env.List(legacyEnvLoggerHTTPEndpoint) + for _, k := range envs { + target := strings.TrimPrefix(k, legacyEnvLoggerHTTPEndpoint+config.Default) + if target == legacyEnvLoggerHTTPEndpoint { + target = config.Default + } + loggerTargets = append(loggerTargets, target) } - loggerTargets = append(loggerTargets, target) + + // Load HTTP logger from the environment if found + for _, target := range loggerTargets { + endpointEnv := legacyEnvLoggerHTTPEndpoint + if target != config.Default { + endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target + } + endpoint := env.Get(endpointEnv, "") + if endpoint == "" { + continue + } + cfg.HTTP[target] = http.Config{ + Enabled: true, + Endpoint: endpoint, + } + } + + case config.AuditWebhookSubSys: + // List legacy audit ENVs if any. + var loggerAuditTargets []string + envs := env.List(legacyEnvAuditLoggerHTTPEndpoint) + for _, k := range envs { + target := strings.TrimPrefix(k, legacyEnvAuditLoggerHTTPEndpoint+config.Default) + if target == legacyEnvAuditLoggerHTTPEndpoint { + target = config.Default + } + loggerAuditTargets = append(loggerAuditTargets, target) + } + + for _, target := range loggerAuditTargets { + endpointEnv := legacyEnvAuditLoggerHTTPEndpoint + if target != config.Default { + endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target + } + endpoint := env.Get(endpointEnv, "") + if endpoint == "" { + continue + } + cfg.AuditWebhook[target] = http.Config{ + Enabled: true, + Endpoint: endpoint, + } + } + } - - // Load HTTP logger from the environment if found - for _, target := range loggerTargets { - endpointEnv := legacyEnvLoggerHTTPEndpoint - if target != config.Default { - endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target - } - endpoint := env.Get(endpointEnv, "") - if endpoint == "" { - continue - } - cfg.HTTP[target] = http.Config{ - Enabled: true, - Endpoint: endpoint, - } - } - - // List legacy audit ENVs if any. - var loggerAuditTargets []string - envs = env.List(legacyEnvAuditLoggerHTTPEndpoint) - for _, k := range envs { - target := strings.TrimPrefix(k, legacyEnvAuditLoggerHTTPEndpoint+config.Default) - if target == legacyEnvAuditLoggerHTTPEndpoint { - target = config.Default - } - loggerAuditTargets = append(loggerAuditTargets, target) - } - - for _, target := range loggerAuditTargets { - endpointEnv := legacyEnvAuditLoggerHTTPEndpoint - if target != config.Default { - endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target - } - endpoint := env.Get(endpointEnv, "") - if endpoint == "" { - continue - } - cfg.AuditWebhook[target] = http.Config{ - Enabled: true, - Endpoint: endpoint, - } - } - - return cfg, nil + return cfg } // GetAuditKafka - returns a map of registered notification 'kafka' targets @@ -604,6 +607,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } + cfg.AuditWebhook[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), @@ -617,33 +621,21 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { return cfg, nil } -// LookupConfig - lookup logger config, override with ENVs if set. -func LookupConfig(scfg config.Config) (Config, error) { - // Lookup for legacy environment variables first - cfg, err := lookupLegacyConfig() - if err != nil { - return cfg, err - } - - for _, ss := range config.LoggerSubSystems.ToSlice() { - lookupConfigForSubSys(scfg, cfg, ss) - } - - return cfg, nil -} - -func lookupConfigForSubSys(scfg config.Config, cfg Config, subSys string) (Config, error) { +// LookupConfigForSubSys - lookup logger config, override with ENVs if set, for the given sub-system +func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err error) { switch subSys { case config.LoggerWebhookSubSys: - if _, err := lookupLoggerWebhookConfig(scfg, cfg); err != nil { + cfg = lookupLegacyConfigForSubSys(config.LoggerWebhookSubSys) + if cfg, err = lookupLoggerWebhookConfig(scfg, cfg); err != nil { return cfg, err } case config.AuditWebhookSubSys: - if _, err := lookupAuditWebhookConfig(scfg, cfg); err != nil { + cfg = lookupLegacyConfigForSubSys(config.AuditWebhookSubSys) + if cfg, err = lookupAuditWebhookConfig(scfg, cfg); err != nil { return cfg, err } case config.AuditKafkaSubSys: - if _, err := GetAuditKafka(scfg[config.AuditKafkaSubSys]); err != nil { + if _, err = GetAuditKafka(scfg[config.AuditKafkaSubSys]); err != nil { return cfg, err } } @@ -653,10 +645,6 @@ func lookupConfigForSubSys(scfg config.Config, cfg Config, subSys string) (Confi // ValidateSubSysConfig - validates logger related config of given sub-system func ValidateSubSysConfig(scfg config.Config, subSys string) error { // Lookup for legacy environment variables first - cfg, err := lookupLegacyConfig() - if err != nil { - return err - } - _, err = lookupConfigForSubSys(scfg, cfg, subSys) + _, err := LookupConfigForSubSys(scfg, subSys) return err } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 9388a10b9..5ef5244c7 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -358,7 +358,7 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) { } // Iterate over all logger targets to send the log entry - for _, t := range Targets() { + for _, t := range SystemTargets() { if err := t.Send(entry, entry.LogKind); err != nil { LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Logger target (%v): %v", entry, t, err), entry.LogKind) } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 38ed9ebb2..f811965a6 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -30,6 +30,7 @@ import ( "time" xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger/target/types" ) // Timeout for the webhook http call @@ -222,3 +223,8 @@ func (h *Target) Cancel() { } h.wg.Wait() } + +// Type - returns type of the target +func (h *Target) Type() types.TargetType { + return types.TargetHTTP +} diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 7a8db3a8c..08003a4d0 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2022 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -24,16 +24,22 @@ import ( "encoding/json" "errors" "net" + "sync" + "sync/atomic" sarama "github.com/Shopify/sarama" saramatls "github.com/Shopify/sarama/tools/tls" "github.com/minio/minio/internal/logger/message/audit" + "github.com/minio/minio/internal/logger/target/types" xnet "github.com/minio/pkg/net" ) // Target - Kafka target. type Target struct { + status int32 + wg sync.WaitGroup + // Channel of log entries logCh chan interface{} @@ -55,30 +61,37 @@ func (h *Target) Send(entry interface{}, errKind string) error { return nil } +func (h *Target) logEntry(entry interface{}) { + h.wg.Add(1) + defer h.wg.Done() + + logJSON, err := json.Marshal(&entry) + if err != nil { + return + } + + ae, ok := entry.(audit.Entry) + if ok { + msg := sarama.ProducerMessage{ + Topic: h.kconfig.Topic, + Key: sarama.StringEncoder(ae.RequestID), + Value: sarama.ByteEncoder(logJSON), + } + + _, _, err = h.producer.SendMessage(&msg) + if err != nil { + h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) + return + } + } +} + func (h *Target) startKakfaLogger() { // Create a routine which sends json logs received // from an internal channel. go func() { for entry := range h.logCh { - logJSON, err := json.Marshal(&entry) - if err != nil { - continue - } - - ae, ok := entry.(audit.Entry) - if ok { - msg := sarama.ProducerMessage{ - Topic: h.kconfig.Topic, - Key: sarama.StringEncoder(ae.RequestID), - Value: sarama.ByteEncoder(logJSON), - } - - _, _, err = h.producer.SendMessage(&msg) - if err != nil { - h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) - continue - } - } + h.logEntry(entry) } }() } @@ -193,12 +206,17 @@ func (h *Target) Init() error { h.producer = producer + h.status = 1 go h.startKakfaLogger() return nil } // Cancel - cancels the target func (h *Target) Cancel() { + if atomic.CompareAndSwapInt32(&h.status, 1, 0) { + close(h.logCh) + } + h.wg.Wait() } // New initializes a new logger target which @@ -210,3 +228,8 @@ func New(config Config) *Target { } return target } + +// Type - returns type of the target +func (h *Target) Type() types.TargetType { + return types.TargetKafka +} diff --git a/internal/logger/target/types/types.go b/internal/logger/target/types/types.go new file mode 100644 index 000000000..99618696f --- /dev/null +++ b/internal/logger/target/types/types.go @@ -0,0 +1,29 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package types + +// TargetType indicates type of the target e.g. console, http, kafka +type TargetType uint8 + +// Constants for target types +const ( + _ TargetType = iota + TargetConsole + TargetHTTP + TargetKafka +) diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 18560410e..4146a9053 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2022 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -22,6 +22,8 @@ import ( "sync/atomic" "github.com/minio/minio/internal/logger/target/http" + "github.com/minio/minio/internal/logger/target/kafka" + "github.com/minio/minio/internal/logger/target/types" ) // Target is the entity that we will receive @@ -33,28 +35,29 @@ type Target interface { Init() error Cancel() Send(entry interface{}, errKind string) error + Type() types.TargetType } var ( // swapMu must be held while reading slice info or swapping targets or auditTargets. swapMu sync.Mutex - // targets is the set of enabled loggers. + // systemTargets is the set of enabled loggers. // Must be immutable at all times. // Can be swapped to another while holding swapMu - targets = []Target{} - nTargets int32 // atomic count of len(targets) + systemTargets = []Target{} + nTargets int32 // atomic count of len(targets) ) -// Targets returns active targets. +// SystemTargets returns active targets. // Returned slice may not be modified in any way. -func Targets() []Target { +func SystemTargets() []Target { if atomic.LoadInt32(&nTargets) == 0 { // Lock free if none... return nil } swapMu.Lock() - res := targets + res := systemTargets swapMu.Unlock() return res } @@ -80,46 +83,30 @@ var ( nAuditTargets int32 // atomic count of len(auditTargets) ) -// AddAuditTarget adds a new audit logger target to the +// AddSystemTarget adds a new logger target to the // list of enabled loggers -func AddAuditTarget(t Target) error { - if err := t.Init(); err != nil { - return err - } - - swapMu.Lock() - updated := append(make([]Target, 0, len(auditTargets)+1), auditTargets...) - updated = append(updated, t) - auditTargets = updated - atomic.StoreInt32(&nAuditTargets, int32(len(updated))) - swapMu.Unlock() - return nil -} - -// AddTarget adds a new logger target to the -// list of enabled loggers -func AddTarget(t Target) error { +func AddSystemTarget(t Target) error { if err := t.Init(); err != nil { return err } swapMu.Lock() - updated := append(make([]Target, 0, len(targets)+1), targets...) + updated := append(make([]Target, 0, len(systemTargets)+1), systemTargets...) updated = append(updated, t) - targets = updated + systemTargets = updated atomic.StoreInt32(&nTargets, int32(len(updated))) swapMu.Unlock() return nil } -func cancelAllTargets() { - for _, tgt := range targets { +func cancelAllSystemTargets() { + for _, tgt := range systemTargets { tgt.Cancel() } } -func initTargets(cfg Config) (tgts []Target, err error) { - for _, l := range cfg.HTTP { +func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) { + for _, l := range cfgMap { if l.Enabled { t := http.New(l) if err = t.Init(); err != nil { @@ -131,25 +118,90 @@ func initTargets(cfg Config) (tgts []Target, err error) { return tgts, err } -// UpdateTargets swaps targets with newly loaded ones from the cfg -func UpdateTargets(cfg Config) error { - updated, err := initTargets(cfg) +func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error) { + for _, l := range cfgMap { + if l.Enabled { + t := kafka.New(l) + if err = t.Init(); err != nil { + return tgts, err + } + tgts = append(tgts, t) + } + } + return tgts, err +} + +// UpdateSystemTargets swaps targets with newly loaded ones from the cfg +func UpdateSystemTargets(cfg Config) error { + updated, err := initSystemTargets(cfg.HTTP) if err != nil { return err } swapMu.Lock() - for _, tgt := range targets { + for _, tgt := range systemTargets { // Preserve console target when dynamically updating // other HTTP targets, console target is always present. - if tgt.String() == ConsoleLoggerTgt { + if tgt.Type() == types.TargetConsole { updated = append(updated, tgt) break } } atomic.StoreInt32(&nTargets, int32(len(updated))) - cancelAllTargets() // cancel running targets - targets = updated + cancelAllSystemTargets() // cancel running targets + systemTargets = updated + swapMu.Unlock() + return nil +} + +func cancelAuditTargetType(t types.TargetType) { + for _, tgt := range auditTargets { + if tgt.Type() == t { + tgt.Cancel() + } + } +} + +func existingAuditTargets(t types.TargetType) []Target { + tgts := make([]Target, 0, len(auditTargets)) + for _, tgt := range auditTargets { + if tgt.Type() == t { + tgts = append(tgts, tgt) + } + } + return tgts +} + +// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg +func UpdateAuditWebhookTargets(cfg Config) error { + updated, err := initSystemTargets(cfg.AuditWebhook) + if err != nil { + return err + } + // retain kafka targets + updated = append(existingAuditTargets(types.TargetKafka), updated...) + + swapMu.Lock() + atomic.StoreInt32(&nAuditTargets, int32(len(updated))) + cancelAuditTargetType(types.TargetHTTP) // cancel running targets + auditTargets = updated + swapMu.Unlock() + return nil +} + +// UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg +func UpdateAuditKafkaTargets(cfg Config) error { + updated, err := initKafkaTargets(cfg.AuditKafka) + if err != nil { + return err + } + // retain HTTP targets + updated = append(existingAuditTargets(types.TargetHTTP), updated...) + + swapMu.Lock() + atomic.StoreInt32(&nAuditTargets, int32(len(updated))) + cancelAuditTargetType(types.TargetKafka) // cancel running targets + auditTargets = updated swapMu.Unlock() return nil }