diff --git a/cmd/config-current.go b/cmd/config-current.go index 6093fbf53..0e4176294 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -62,7 +62,7 @@ func initHelp() { config.RegionSubSys: config.DefaultRegionKVS, config.APISubSys: api.DefaultKVS, config.CredentialsSubSys: config.DefaultCredentialKVS, - config.LoggerWebhookSubSys: logger.DefaultKVS, + config.LoggerWebhookSubSys: logger.DefaultLoggerWebhookKVS, config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS, config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS, config.HealSubSys: heal.DefaultKVS, @@ -543,7 +543,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { loggerCfg, err := logger.LookupConfig(s) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger: %w", err)) + logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger/audit targets: %w", err)) } for _, l := range loggerCfg.HTTP { @@ -553,7 +553,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) // Enable http logging if err = logger.AddTarget(http.New(l)); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize console HTTP target: %w", err)) + logger.LogIf(ctx, fmt.Errorf("Unable to initialize server logger HTTP target: %w", err)) } } } @@ -565,7 +565,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) // Enable http audit logging if err = logger.AddAuditTarget(http.New(l)); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize audit HTTP target: %w", err)) + logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit HTTP target: %w", err)) } } } @@ -575,7 +575,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { l.LogOnce = logger.LogOnceIf // Enable Kafka audit logging if err = logger.AddAuditTarget(kafka.New(l)); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize audit Kafka target: %w", err)) + logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit Kafka target: %w", err)) } } } diff --git a/internal/logger/audit.go b/internal/logger/audit.go index e001819b3..28c65e816 100644 --- a/internal/logger/audit.go +++ b/internal/logger/audit.go @@ -227,6 +227,8 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl // Send audit logs only to http targets. for _, t := range AuditTargets() { - _ = t.Send(entry, string(All)) + if err := t.Send(entry, string(All)); err != nil { + LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), All) + } } } diff --git a/internal/logger/config.go b/internal/logger/config.go index 8a9508354..76d375902 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -19,6 +19,7 @@ package logger import ( "crypto/tls" + "errors" "strconv" "strings" @@ -41,6 +42,7 @@ const ( AuthToken = "auth_token" ClientCert = "client_cert" ClientKey = "client_key" + QueueSize = "queue_size" KafkaBrokers = "brokers" KafkaTopic = "topic" @@ -55,15 +57,19 @@ const ( KafkaClientTLSKey = "client_tls_key" KafkaVersion = "version" - EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE" - EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT" - EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN" + EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE" + EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT" + EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN" + EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT" + EnvLoggerWebhookClientKey = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY" + EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE" EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE" EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT" EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN" EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT" EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY" + EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE" EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE" EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS" @@ -82,7 +88,7 @@ const ( // Default KVS for loggerHTTP and loggerAuditHTTP var ( - DefaultKVS = config.KVS{ + DefaultLoggerWebhookKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, @@ -95,6 +101,18 @@ var ( Key: AuthToken, Value: "", }, + config.KV{ + Key: ClientCert, + Value: "", + }, + config.KV{ + Key: ClientKey, + Value: "", + }, + config.KV{ + Key: QueueSize, + Value: "100000", + }, } DefaultAuditWebhookKVS = config.KVS{ @@ -118,6 +136,10 @@ var ( Key: ClientKey, Value: "", }, + config.KV{ + Key: QueueSize, + Value: "100000", + }, } DefaultAuditKafkaKVS = config.KVS{ @@ -424,10 +446,36 @@ func LookupConfig(scfg config.Config) (Config, error) { if target != config.Default { authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target } + clientCertEnv := EnvLoggerWebhookClientCert + if target != config.Default { + clientCertEnv = EnvLoggerWebhookClientCert + config.Default + target + } + clientKeyEnv := EnvLoggerWebhookClientKey + if target != config.Default { + clientKeyEnv = EnvLoggerWebhookClientKey + config.Default + target + } + err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, "")) + if err != nil { + return cfg, err + } + queueSizeEnv := EnvAuditWebhookQueueSize + if target != config.Default { + queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target + } + queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) + if err != nil { + return cfg, err + } + if queueSize <= 0 { + return cfg, errors.New("invalid queue_size value") + } cfg.HTTP[target] = http.Config{ - Enabled: true, - Endpoint: env.Get(endpointEnv, ""), - AuthToken: env.Get(authTokenEnv, ""), + Enabled: true, + Endpoint: env.Get(endpointEnv, ""), + AuthToken: env.Get(authTokenEnv, ""), + ClientCert: env.Get(clientCertEnv, ""), + ClientKey: env.Get(clientKeyEnv, ""), + QueueSize: queueSize, } } @@ -465,12 +513,24 @@ func LookupConfig(scfg config.Config) (Config, error) { if err != nil { return cfg, err } + queueSizeEnv := EnvAuditWebhookQueueSize + if target != config.Default { + queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target + } + queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) + if err != nil { + return cfg, err + } + if queueSize <= 0 { + return cfg, errors.New("invalid queue_size value") + } cfg.AuditWebhook[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), ClientCert: env.Get(clientCertEnv, ""), ClientKey: env.Get(clientKeyEnv, ""), + QueueSize: queueSize, } } @@ -485,7 +545,7 @@ func LookupConfig(scfg config.Config) (Config, error) { if starget != config.Default { subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget } - if err := config.CheckValidKeys(subSysTarget, kv, DefaultKVS); err != nil { + if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil { return cfg, err } enabled, err := config.ParseBool(kv.Get(config.Enable)) @@ -495,10 +555,24 @@ func LookupConfig(scfg config.Config) (Config, error) { if !enabled { continue } + err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) + if err != nil { + return cfg, err + } + queueSize, err := strconv.Atoi(kv.Get(QueueSize)) + if err != nil { + return cfg, err + } + if queueSize <= 0 { + return cfg, errors.New("invalid queue_size value") + } cfg.HTTP[starget] = http.Config{ - Enabled: true, - Endpoint: kv.Get(Endpoint), - AuthToken: kv.Get(AuthToken), + Enabled: true, + Endpoint: kv.Get(Endpoint), + AuthToken: kv.Get(AuthToken), + ClientCert: kv.Get(ClientCert), + ClientKey: kv.Get(ClientKey), + QueueSize: queueSize, } } @@ -527,12 +601,20 @@ func LookupConfig(scfg config.Config) (Config, error) { if err != nil { return cfg, err } + queueSize, err := strconv.Atoi(kv.Get(QueueSize)) + if err != nil { + return cfg, err + } + if queueSize <= 0 { + return cfg, errors.New("invalid queue_size value") + } cfg.AuditWebhook[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), ClientCert: kv.Get(ClientCert), ClientKey: kv.Get(ClientKey), + QueueSize: queueSize, } } diff --git a/internal/logger/help.go b/internal/logger/help.go index dd9b1cc1b..cf2121b18 100644 --- a/internal/logger/help.go +++ b/internal/logger/help.go @@ -37,6 +37,26 @@ var ( Type: "string", Sensitive: true, }, + config.HelpKV{ + Key: ClientCert, + Description: "mTLS certificate for Logger Webhook authentication", + Optional: true, + Type: "string", + Sensitive: true, + }, + config.HelpKV{ + Key: ClientKey, + Description: "mTLS certificate key for Logger Webhook authentication", + Optional: true, + Type: "string", + Sensitive: true, + }, + config.HelpKV{ + Key: QueueSize, + Description: "configure channel queue size for Logger Webhook targets", + Optional: true, + Type: "number", + }, config.HelpKV{ Key: config.Comment, Description: config.DefaultComment, @@ -73,6 +93,12 @@ var ( Type: "string", Sensitive: true, }, + config.HelpKV{ + Key: QueueSize, + Description: "configure channel queue size for Audit Webhook targets", + Optional: true, + Type: "number", + }, config.HelpKV{ Key: config.Comment, Description: config.DefaultComment, diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 226048ca7..b18e093bc 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -374,7 +374,10 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) { // Iterate over all logger targets to send the log entry for _, t := range Targets() { - t.Send(entry, entry.LogKind) + if err := t.Send(entry, entry.LogKind); err != nil { + LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Logger target (%v): %v", entry, t, err), entry.LogKind) + } + } } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index d08a9de9d..e2d57898e 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -42,6 +42,7 @@ type Config struct { AuthToken string `json:"authToken"` ClientCert string `json:"clientCert"` ClientKey string `json:"clientKey"` + QueueSize int `json:"queueSize"` Transport http.RoundTripper `json:"-"` // Custom logger @@ -133,6 +134,7 @@ func (h *Target) startHTTPLogger() { req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, bytes.NewReader(logJSON)) if err != nil { + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) cancel() continue } @@ -173,7 +175,7 @@ func (h *Target) startHTTPLogger() { // sends log over http to the specified endpoint func New(config Config) *Target { h := &Target{ - logCh: make(chan interface{}, 10000), + logCh: make(chan interface{}, config.QueueSize), config: config, }