Improvements in logger and audit webhooks (#16102)

This commit is contained in:
Shireesh Anjal 2022-11-28 21:33:26 +05:30 committed by GitHub
parent 9b1e70e4f9
commit 98a67a3776
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 96 additions and 84 deletions

View File

@ -2519,63 +2519,21 @@ func fetchKMSStatus() madmin.KMS {
func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) { func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) {
var loggerInfo []madmin.Logger var loggerInfo []madmin.Logger
var auditloggerInfo []madmin.Audit var auditloggerInfo []madmin.Audit
for _, target := range logger.SystemTargets() { for _, tgt := range logger.SystemTargets() {
if target.Endpoint() != "" { if tgt.Endpoint() != "" {
tgt := target.String() loggerInfo = append(loggerInfo, madmin.Logger{tgt.String(): logger.TargetStatus(tgt)})
err := checkConnection(target.Endpoint(), 15*time.Second)
if err == nil {
mapLog := make(map[string]madmin.Status)
mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOnline)}
loggerInfo = append(loggerInfo, mapLog)
} else {
mapLog := make(map[string]madmin.Status)
mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOffline)}
loggerInfo = append(loggerInfo, mapLog)
}
} }
} }
for _, target := range logger.AuditTargets() { for _, tgt := range logger.AuditTargets() {
if target.Endpoint() != "" { if tgt.Endpoint() != "" {
tgt := target.String() auditloggerInfo = append(auditloggerInfo, madmin.Audit{tgt.String(): logger.TargetStatus(tgt)})
err := checkConnection(target.Endpoint(), 15*time.Second)
if err == nil {
mapAudit := make(map[string]madmin.Status)
mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOnline)}
auditloggerInfo = append(auditloggerInfo, mapAudit)
} else {
mapAudit := make(map[string]madmin.Status)
mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOffline)}
auditloggerInfo = append(auditloggerInfo, mapAudit)
}
} }
} }
return loggerInfo, auditloggerInfo return loggerInfo, auditloggerInfo
} }
// checkConnection - ping an endpoint , return err in case of no connection
func checkConnection(endpointStr string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(GlobalContext, timeout)
defer cancel()
client := &http.Client{
Transport: globalProxyTransport,
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, endpointStr, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer xhttp.DrainBody(resp.Body)
return nil
}
func embedFileInZip(zipWriter *zip.Writer, name string, data []byte) error { func embedFileInZip(zipWriter *zip.Writer, name string, data []byte) error {
// Send profiling data to zip as file // Send profiling data to zip as file
header, zerr := zip.FileInfoHeader(dummyFileInfo{ header, zerr := zip.FileInfoHeader(dummyFileInfo{

View File

@ -590,11 +590,11 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
l.LogOnce = logger.LogOnceConsoleIf l.LogOnce = logger.LogOnceConsoleIf
l.UserAgent = userAgent l.UserAgent = userAgent
l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
}
loggerCfg.HTTP[n] = l loggerCfg.HTTP[n] = l
} }
} if errs := logger.UpdateSystemTargets(loggerCfg); len(errs) > 0 {
if err = logger.UpdateSystemTargets(loggerCfg); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs))
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
} }
case config.AuditWebhookSubSys: case config.AuditWebhookSubSys:
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys) loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys)
@ -607,12 +607,12 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
l.LogOnce = logger.LogOnceConsoleIf l.LogOnce = logger.LogOnceConsoleIf
l.UserAgent = userAgent l.UserAgent = userAgent
l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
}
loggerCfg.AuditWebhook[n] = l loggerCfg.AuditWebhook[n] = l
} }
}
if err = logger.UpdateAuditWebhookTargets(loggerCfg); err != nil { if errs := logger.UpdateAuditWebhookTargets(loggerCfg); len(errs) > 0 {
logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs))
} }
case config.AuditKafkaSubSys: case config.AuditKafkaSubSys:
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys) loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys)
@ -625,8 +625,8 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
loggerCfg.AuditKafka[n] = l loggerCfg.AuditKafka[n] = l
} }
} }
if err = logger.UpdateAuditKafkaTargets(loggerCfg); err != nil { if errs := logger.UpdateAuditKafkaTargets(loggerCfg); len(errs) > 0 {
logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %v", errs))
} }
case config.StorageClassSubSys: case config.StorageClassSubSys:
for i, setDriveCount := range setDriveCounts { for i, setDriveCount := range setDriveCounts {

View File

@ -57,6 +57,11 @@ func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys {
} }
} }
// IsOnline always true in case of console logger
func (sys *HTTPConsoleLoggerSys) IsOnline() bool {
return true
}
// SetNodeName - sets the node name if any after distributed setup has initialized // SetNodeName - sets the node name if any after distributed setup has initialized
func (sys *HTTPConsoleLoggerSys) SetNodeName(nodeName string) { func (sys *HTTPConsoleLoggerSys) SetNodeName(nodeName string) {
if !globalIsDistErasure { if !globalIsDistErasure {

View File

@ -62,6 +62,10 @@ func (t *testingLogger) Type() types.TargetType {
return types.TargetHTTP return types.TargetHTTP
} }
func (t *testingLogger) IsOnline() bool {
return true
}
// Stats returns the target statistics. // Stats returns the target statistics.
func (t *testingLogger) Stats() types.TargetStats { func (t *testingLogger) Stats() types.TargetStats {
return types.TargetStats{} return types.TargetStats{}

View File

@ -459,6 +459,7 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
ClientCert: env.Get(clientCertEnv, ""), ClientCert: env.Get(clientCertEnv, ""),
ClientKey: env.Get(clientKeyEnv, ""), ClientKey: env.Get(clientKeyEnv, ""),
QueueSize: queueSize, QueueSize: queueSize,
Name: target,
} }
} }
@ -501,6 +502,7 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
ClientCert: kv.Get(ClientCert), ClientCert: kv.Get(ClientCert),
ClientKey: kv.Get(ClientKey), ClientKey: kv.Get(ClientKey),
QueueSize: queueSize, QueueSize: queueSize,
Name: starget,
} }
} }
@ -570,6 +572,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
ClientCert: env.Get(clientCertEnv, ""), ClientCert: env.Get(clientCertEnv, ""),
ClientKey: env.Get(clientKeyEnv, ""), ClientKey: env.Get(clientKeyEnv, ""),
QueueSize: queueSize, QueueSize: queueSize,
Name: target,
} }
} }
@ -613,6 +616,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
ClientCert: kv.Get(ClientCert), ClientCert: kv.Get(ClientCert),
ClientKey: kv.Get(ClientKey), ClientKey: kv.Get(ClientKey),
QueueSize: queueSize, QueueSize: queueSize,
Name: starget,
} }
} }

View File

@ -77,6 +77,9 @@ type Target struct {
// Channel of log entries // Channel of log entries
logCh chan interface{} logCh chan interface{}
// is the target online?
online bool
config Config config Config
client *http.Client client *http.Client
} }
@ -90,6 +93,11 @@ func (h *Target) String() string {
return h.config.Name return h.config.Name
} }
// IsOnline returns true if the initialization was successful
func (h *Target) IsOnline() bool {
return h.online
}
// Stats returns the target statistics. // Stats returns the target statistics.
func (h *Target) Stats() types.TargetStats { func (h *Target) Stats() types.TargetStats {
return types.TargetStats{ return types.TargetStats{
@ -140,6 +148,7 @@ func (h *Target) Init() error {
} }
h.lastStarted = time.Now() h.lastStarted = time.Now()
h.online = true
atomic.AddInt64(&h.workers, 1) atomic.AddInt64(&h.workers, 1)
go h.startHTTPLogger() go h.startHTTPLogger()
return nil return nil
@ -230,6 +239,7 @@ func New(config Config) *Target {
logCh: make(chan interface{}, config.QueueSize), logCh: make(chan interface{}, config.QueueSize),
doneCh: make(chan struct{}), doneCh: make(chan struct{}),
config: config, config: config,
online: false,
} }
return h return h
@ -237,6 +247,10 @@ func New(config Config) *Target {
// Send log message 'e' to http target. // Send log message 'e' to http target.
func (h *Target) Send(entry interface{}) error { func (h *Target) Send(entry interface{}) error {
if !h.online {
return nil
}
select { select {
case <-h.doneCh: case <-h.doneCh:
return nil return nil

View File

@ -46,6 +46,9 @@ type Target struct {
// Channel of log entries // Channel of log entries
logCh chan audit.Entry logCh chan audit.Entry
// is the target online?
online bool
producer sarama.SyncProducer producer sarama.SyncProducer
kconfig Config kconfig Config
config *sarama.Config config *sarama.Config
@ -53,6 +56,10 @@ type Target struct {
// Send log message 'e' to kafka target. // Send log message 'e' to kafka target.
func (h *Target) Send(entry interface{}) error { func (h *Target) Send(entry interface{}) error {
if !h.online {
return nil
}
select { select {
case <-h.doneCh: case <-h.doneCh:
return nil return nil
@ -173,6 +180,11 @@ func (h *Target) String() string {
return "kafka" return "kafka"
} }
// IsOnline returns true if the initialization was successful
func (h *Target) IsOnline() bool {
return h.online
}
// Init initialize kafka target // Init initialize kafka target
func (h *Target) Init() error { func (h *Target) Init() error {
if !h.kconfig.Enabled { if !h.kconfig.Enabled {
@ -232,6 +244,7 @@ func (h *Target) Init() error {
} }
h.producer = producer h.producer = producer
h.online = true
go h.startKakfaLogger() go h.startKakfaLogger()
return nil return nil
} }
@ -250,6 +263,7 @@ func New(config Config) *Target {
logCh: make(chan audit.Entry, 10000), logCh: make(chan audit.Entry, 10000),
doneCh: make(chan struct{}), doneCh: make(chan struct{}),
kconfig: config, kconfig: config,
online: false,
} }
return target return target
} }

View File

@ -22,6 +22,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/logger/target/http" "github.com/minio/minio/internal/logger/target/http"
"github.com/minio/minio/internal/logger/target/kafka" "github.com/minio/minio/internal/logger/target/kafka"
"github.com/minio/minio/internal/logger/target/types" "github.com/minio/minio/internal/logger/target/types"
@ -36,6 +37,7 @@ type Target interface {
Endpoint() string Endpoint() string
Stats() types.TargetStats Stats() types.TargetStats
Init() error Init() error
IsOnline() bool
Cancel() Cancel()
Send(entry interface{}) error Send(entry interface{}) error
Type() types.TargetType Type() types.TargetType
@ -54,6 +56,18 @@ var (
consoleTgt Target consoleTgt Target
) )
// TargetStatus returns status of the target (online|offline)
func TargetStatus(h Target) madmin.Status {
if h.IsOnline() {
return madmin.Status{Status: string(madmin.ItemOnline)}
}
// Previous initialization had failed. Try again.
if e := h.Init(); e == nil {
return madmin.Status{Status: string(madmin.ItemOnline)}
}
return madmin.Status{Status: string(madmin.ItemOffline)}
}
// SystemTargets returns active targets. // SystemTargets returns active targets.
// Returned slice may not be modified in any way. // Returned slice may not be modified in any way.
func SystemTargets() []Target { func SystemTargets() []Target {
@ -130,30 +144,38 @@ func AddSystemTarget(t Target) error {
return nil return nil
} }
func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) { func initSystemTargets(cfgMap map[string]http.Config) ([]Target, []error) {
tgts := []Target{}
errs := []error{}
for _, l := range cfgMap { for _, l := range cfgMap {
if l.Enabled { if l.Enabled {
t := http.New(l) t := http.New(l)
if err = t.Init(); err != nil {
return tgts, err
}
tgts = append(tgts, t) tgts = append(tgts, t)
e := t.Init()
if e != nil {
errs = append(errs, e)
} }
} }
return tgts, err }
return tgts, errs
} }
func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error) { func initKafkaTargets(cfgMap map[string]kafka.Config) ([]Target, []error) {
tgts := []Target{}
errs := []error{}
for _, l := range cfgMap { for _, l := range cfgMap {
if l.Enabled { if l.Enabled {
t := kafka.New(l) t := kafka.New(l)
if err = t.Init(); err != nil {
return tgts, err
}
tgts = append(tgts, t) tgts = append(tgts, t)
e := t.Init()
if e != nil {
errs = append(errs, e)
} }
} }
return tgts, err }
return tgts, errs
} }
// Split targets into two groups: // Split targets into two groups:
@ -178,11 +200,8 @@ func cancelTargets(targets []Target) {
} }
// UpdateSystemTargets swaps targets with newly loaded ones from the cfg // UpdateSystemTargets swaps targets with newly loaded ones from the cfg
func UpdateSystemTargets(cfg Config) error { func UpdateSystemTargets(cfg Config) []error {
newTgts, err := initSystemTargets(cfg.HTTP) newTgts, errs := initSystemTargets(cfg.HTTP)
if err != nil {
return err
}
swapSystemMuRW.Lock() swapSystemMuRW.Lock()
consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole) consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole)
@ -191,15 +210,12 @@ func UpdateSystemTargets(cfg Config) error {
swapSystemMuRW.Unlock() swapSystemMuRW.Unlock()
cancelTargets(otherTargets) // cancel running targets cancelTargets(otherTargets) // cancel running targets
return nil return errs
} }
// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg // UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg
func UpdateAuditWebhookTargets(cfg Config) error { func UpdateAuditWebhookTargets(cfg Config) []error {
newWebhookTgts, err := initSystemTargets(cfg.AuditWebhook) newWebhookTgts, errs := initSystemTargets(cfg.AuditWebhook)
if err != nil {
return err
}
swapAuditMuRW.Lock() swapAuditMuRW.Lock()
// Retain kafka targets // Retain kafka targets
@ -209,15 +225,12 @@ func UpdateAuditWebhookTargets(cfg Config) error {
swapAuditMuRW.Unlock() swapAuditMuRW.Unlock()
cancelTargets(oldWebhookTgts) // cancel running targets cancelTargets(oldWebhookTgts) // cancel running targets
return nil return errs
} }
// UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg // UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg
func UpdateAuditKafkaTargets(cfg Config) error { func UpdateAuditKafkaTargets(cfg Config) []error {
newKafkaTgts, err := initKafkaTargets(cfg.AuditKafka) newKafkaTgts, errs := initKafkaTargets(cfg.AuditKafka)
if err != nil {
return err
}
swapAuditMuRW.Lock() swapAuditMuRW.Lock()
// Retain webhook targets // Retain webhook targets
@ -227,5 +240,5 @@ func UpdateAuditKafkaTargets(cfg Config) error {
swapAuditMuRW.Unlock() swapAuditMuRW.Unlock()
cancelTargets(oldKafkaTgts) // cancel running targets cancelTargets(oldKafkaTgts) // cancel running targets
return nil return errs
} }