diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index a18fdedbf..c8e5ebe45 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -2519,63 +2519,21 @@ func fetchKMSStatus() madmin.KMS { func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) { var loggerInfo []madmin.Logger var auditloggerInfo []madmin.Audit - for _, target := range logger.SystemTargets() { - if target.Endpoint() != "" { - tgt := target.String() - err := checkConnection(target.Endpoint(), 15*time.Second) - if err == nil { - mapLog := make(map[string]madmin.Status) - mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOnline)} - loggerInfo = append(loggerInfo, mapLog) - } else { - mapLog := make(map[string]madmin.Status) - mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOffline)} - loggerInfo = append(loggerInfo, mapLog) - } + for _, tgt := range logger.SystemTargets() { + if tgt.Endpoint() != "" { + loggerInfo = append(loggerInfo, madmin.Logger{tgt.String(): logger.TargetStatus(tgt)}) } } - for _, target := range logger.AuditTargets() { - if target.Endpoint() != "" { - tgt := target.String() - err := checkConnection(target.Endpoint(), 15*time.Second) - if err == nil { - mapAudit := make(map[string]madmin.Status) - mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOnline)} - auditloggerInfo = append(auditloggerInfo, mapAudit) - } else { - mapAudit := make(map[string]madmin.Status) - mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOffline)} - auditloggerInfo = append(auditloggerInfo, mapAudit) - } + for _, tgt := range logger.AuditTargets() { + if tgt.Endpoint() != "" { + auditloggerInfo = append(auditloggerInfo, madmin.Audit{tgt.String(): logger.TargetStatus(tgt)}) } } return loggerInfo, auditloggerInfo } -// checkConnection - ping an endpoint , return err in case of no connection -func checkConnection(endpointStr string, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(GlobalContext, timeout) - defer cancel() - - client := &http.Client{ - Transport: globalProxyTransport, - } - - req, err := http.NewRequestWithContext(ctx, http.MethodHead, endpointStr, nil) - if err != nil { - return err - } - - resp, err := client.Do(req) - if err != nil { - return err - } - defer xhttp.DrainBody(resp.Body) - return nil -} - func embedFileInZip(zipWriter *zip.Writer, name string, data []byte) error { // Send profiling data to zip as file header, zerr := zip.FileInfoHeader(dummyFileInfo{ diff --git a/cmd/config-current.go b/cmd/config-current.go index 80a3c000c..b1be6960a 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -590,11 +590,11 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf l.LogOnce = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) - loggerCfg.HTTP[n] = l } + loggerCfg.HTTP[n] = l } - if err = logger.UpdateSystemTargets(loggerCfg); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err)) + if errs := logger.UpdateSystemTargets(loggerCfg); len(errs) > 0 { + logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs)) } case config.AuditWebhookSubSys: loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys) @@ -607,12 +607,12 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf l.LogOnce = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) - loggerCfg.AuditWebhook[n] = l } + loggerCfg.AuditWebhook[n] = l } - if err = logger.UpdateAuditWebhookTargets(loggerCfg); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %w", err)) + if errs := logger.UpdateAuditWebhookTargets(loggerCfg); len(errs) > 0 { + logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs)) } case config.AuditKafkaSubSys: loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys) @@ -625,8 +625,8 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf loggerCfg.AuditKafka[n] = l } } - if err = logger.UpdateAuditKafkaTargets(loggerCfg); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err)) + if errs := logger.UpdateAuditKafkaTargets(loggerCfg); len(errs) > 0 { + logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %v", errs)) } case config.StorageClassSubSys: for i, setDriveCount := range setDriveCounts { diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index 78a9b98d8..925ba6b62 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -57,6 +57,11 @@ func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys { } } +// IsOnline always true in case of console logger +func (sys *HTTPConsoleLoggerSys) IsOnline() bool { + return true +} + // SetNodeName - sets the node name if any after distributed setup has initialized func (sys *HTTPConsoleLoggerSys) SetNodeName(nodeName string) { if !globalIsDistErasure { diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go index 8da145b48..217853d34 100644 --- a/cmd/data-update-tracker_test.go +++ b/cmd/data-update-tracker_test.go @@ -62,6 +62,10 @@ func (t *testingLogger) Type() types.TargetType { return types.TargetHTTP } +func (t *testingLogger) IsOnline() bool { + return true +} + // Stats returns the target statistics. func (t *testingLogger) Stats() types.TargetStats { return types.TargetStats{} diff --git a/internal/logger/config.go b/internal/logger/config.go index 7a6cc16db..478e61b37 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -459,6 +459,7 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientCert: env.Get(clientCertEnv, ""), ClientKey: env.Get(clientKeyEnv, ""), QueueSize: queueSize, + Name: target, } } @@ -501,6 +502,7 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientCert: kv.Get(ClientCert), ClientKey: kv.Get(ClientKey), QueueSize: queueSize, + Name: starget, } } @@ -570,6 +572,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientCert: env.Get(clientCertEnv, ""), ClientKey: env.Get(clientKeyEnv, ""), QueueSize: queueSize, + Name: target, } } @@ -613,6 +616,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientCert: kv.Get(ClientCert), ClientKey: kv.Get(ClientKey), QueueSize: queueSize, + Name: starget, } } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 4bbe63d19..4bb7b6105 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -77,6 +77,9 @@ type Target struct { // Channel of log entries logCh chan interface{} + // is the target online? + online bool + config Config client *http.Client } @@ -90,6 +93,11 @@ func (h *Target) String() string { return h.config.Name } +// IsOnline returns true if the initialization was successful +func (h *Target) IsOnline() bool { + return h.online +} + // Stats returns the target statistics. func (h *Target) Stats() types.TargetStats { return types.TargetStats{ @@ -140,6 +148,7 @@ func (h *Target) Init() error { } h.lastStarted = time.Now() + h.online = true atomic.AddInt64(&h.workers, 1) go h.startHTTPLogger() return nil @@ -230,6 +239,7 @@ func New(config Config) *Target { logCh: make(chan interface{}, config.QueueSize), doneCh: make(chan struct{}), config: config, + online: false, } return h @@ -237,6 +247,10 @@ func New(config Config) *Target { // Send log message 'e' to http target. func (h *Target) Send(entry interface{}) error { + if !h.online { + return nil + } + select { case <-h.doneCh: return nil diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 42512dd9b..a43e0ace9 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -46,6 +46,9 @@ type Target struct { // Channel of log entries logCh chan audit.Entry + // is the target online? + online bool + producer sarama.SyncProducer kconfig Config config *sarama.Config @@ -53,6 +56,10 @@ type Target struct { // Send log message 'e' to kafka target. func (h *Target) Send(entry interface{}) error { + if !h.online { + return nil + } + select { case <-h.doneCh: return nil @@ -173,6 +180,11 @@ func (h *Target) String() string { return "kafka" } +// IsOnline returns true if the initialization was successful +func (h *Target) IsOnline() bool { + return h.online +} + // Init initialize kafka target func (h *Target) Init() error { if !h.kconfig.Enabled { @@ -232,6 +244,7 @@ func (h *Target) Init() error { } h.producer = producer + h.online = true go h.startKakfaLogger() return nil } @@ -250,6 +263,7 @@ func New(config Config) *Target { logCh: make(chan audit.Entry, 10000), doneCh: make(chan struct{}), kconfig: config, + online: false, } return target } diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 7c5f1a8ed..91961191f 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -22,6 +22,7 @@ import ( "strings" "sync" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/logger/target/http" "github.com/minio/minio/internal/logger/target/kafka" "github.com/minio/minio/internal/logger/target/types" @@ -36,6 +37,7 @@ type Target interface { Endpoint() string Stats() types.TargetStats Init() error + IsOnline() bool Cancel() Send(entry interface{}) error Type() types.TargetType @@ -54,6 +56,18 @@ var ( consoleTgt Target ) +// TargetStatus returns status of the target (online|offline) +func TargetStatus(h Target) madmin.Status { + if h.IsOnline() { + return madmin.Status{Status: string(madmin.ItemOnline)} + } + // Previous initialization had failed. Try again. + if e := h.Init(); e == nil { + return madmin.Status{Status: string(madmin.ItemOnline)} + } + return madmin.Status{Status: string(madmin.ItemOffline)} +} + // SystemTargets returns active targets. // Returned slice may not be modified in any way. func SystemTargets() []Target { @@ -130,30 +144,38 @@ func AddSystemTarget(t Target) error { return nil } -func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) { +func initSystemTargets(cfgMap map[string]http.Config) ([]Target, []error) { + tgts := []Target{} + errs := []error{} for _, l := range cfgMap { if l.Enabled { t := http.New(l) - if err = t.Init(); err != nil { - return tgts, err - } tgts = append(tgts, t) + + e := t.Init() + if e != nil { + errs = append(errs, e) + } } } - return tgts, err + return tgts, errs } -func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error) { +func initKafkaTargets(cfgMap map[string]kafka.Config) ([]Target, []error) { + tgts := []Target{} + errs := []error{} for _, l := range cfgMap { if l.Enabled { t := kafka.New(l) - if err = t.Init(); err != nil { - return tgts, err - } tgts = append(tgts, t) + + e := t.Init() + if e != nil { + errs = append(errs, e) + } } } - return tgts, err + return tgts, errs } // Split targets into two groups: @@ -178,11 +200,8 @@ func cancelTargets(targets []Target) { } // UpdateSystemTargets swaps targets with newly loaded ones from the cfg -func UpdateSystemTargets(cfg Config) error { - newTgts, err := initSystemTargets(cfg.HTTP) - if err != nil { - return err - } +func UpdateSystemTargets(cfg Config) []error { + newTgts, errs := initSystemTargets(cfg.HTTP) swapSystemMuRW.Lock() consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole) @@ -191,15 +210,12 @@ func UpdateSystemTargets(cfg Config) error { swapSystemMuRW.Unlock() cancelTargets(otherTargets) // cancel running targets - return nil + return errs } // UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg -func UpdateAuditWebhookTargets(cfg Config) error { - newWebhookTgts, err := initSystemTargets(cfg.AuditWebhook) - if err != nil { - return err - } +func UpdateAuditWebhookTargets(cfg Config) []error { + newWebhookTgts, errs := initSystemTargets(cfg.AuditWebhook) swapAuditMuRW.Lock() // Retain kafka targets @@ -209,15 +225,12 @@ func UpdateAuditWebhookTargets(cfg Config) error { swapAuditMuRW.Unlock() cancelTargets(oldWebhookTgts) // cancel running targets - return nil + return errs } // UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg -func UpdateAuditKafkaTargets(cfg Config) error { - newKafkaTgts, err := initKafkaTargets(cfg.AuditKafka) - if err != nil { - return err - } +func UpdateAuditKafkaTargets(cfg Config) []error { + newKafkaTgts, errs := initKafkaTargets(cfg.AuditKafka) swapAuditMuRW.Lock() // Retain webhook targets @@ -227,5 +240,5 @@ func UpdateAuditKafkaTargets(cfg Config) error { swapAuditMuRW.Unlock() cancelTargets(oldKafkaTgts) // cancel running targets - return nil + return errs }