Add configurable channel queue_size for audit/logger webhook targets (#13819)

Also log all the missed events and logs instead of silently
swallowing the events.

Bonus: Extend the logger webhook to support mTLS
similar to audit webhook target.
This commit is contained in:
Harshavardhana 2021-12-20 13:16:53 -08:00 committed by GitHub
parent 5cc16e098c
commit 499872f31d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 19 deletions

View File

@ -62,7 +62,7 @@ func initHelp() {
config.RegionSubSys: config.DefaultRegionKVS, config.RegionSubSys: config.DefaultRegionKVS,
config.APISubSys: api.DefaultKVS, config.APISubSys: api.DefaultKVS,
config.CredentialsSubSys: config.DefaultCredentialKVS, config.CredentialsSubSys: config.DefaultCredentialKVS,
config.LoggerWebhookSubSys: logger.DefaultKVS, config.LoggerWebhookSubSys: logger.DefaultLoggerWebhookKVS,
config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS, config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS,
config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS, config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS,
config.HealSubSys: heal.DefaultKVS, config.HealSubSys: heal.DefaultKVS,
@ -543,7 +543,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
loggerCfg, err := logger.LookupConfig(s) loggerCfg, err := logger.LookupConfig(s)
if err != nil { if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger/audit targets: %w", err))
} }
for _, l := range loggerCfg.HTTP { for _, l := range loggerCfg.HTTP {
@ -553,7 +553,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
// Enable http logging // Enable http logging
if err = logger.AddTarget(http.New(l)); err != nil { if err = logger.AddTarget(http.New(l)); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize console HTTP target: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to initialize server logger HTTP target: %w", err))
} }
} }
} }
@ -565,7 +565,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
// Enable http audit logging // Enable http audit logging
if err = logger.AddAuditTarget(http.New(l)); err != nil { if err = logger.AddAuditTarget(http.New(l)); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize audit HTTP target: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit HTTP target: %w", err))
} }
} }
} }
@ -575,7 +575,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
l.LogOnce = logger.LogOnceIf l.LogOnce = logger.LogOnceIf
// Enable Kafka audit logging // Enable Kafka audit logging
if err = logger.AddAuditTarget(kafka.New(l)); err != nil { if err = logger.AddAuditTarget(kafka.New(l)); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize audit Kafka target: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit Kafka target: %w", err))
} }
} }
} }

View File

@ -227,6 +227,8 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl
// Send audit logs only to http targets. // Send audit logs only to http targets.
for _, t := range AuditTargets() { for _, t := range AuditTargets() {
_ = t.Send(entry, string(All)) if err := t.Send(entry, string(All)); err != nil {
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), All)
}
} }
} }

View File

@ -19,6 +19,7 @@ package logger
import ( import (
"crypto/tls" "crypto/tls"
"errors"
"strconv" "strconv"
"strings" "strings"
@ -41,6 +42,7 @@ const (
AuthToken = "auth_token" AuthToken = "auth_token"
ClientCert = "client_cert" ClientCert = "client_cert"
ClientKey = "client_key" ClientKey = "client_key"
QueueSize = "queue_size"
KafkaBrokers = "brokers" KafkaBrokers = "brokers"
KafkaTopic = "topic" KafkaTopic = "topic"
@ -58,12 +60,16 @@ const (
EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE" EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE"
EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT" EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN" EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
EnvLoggerWebhookClientKey = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE" EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE"
EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT" EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN" EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT" EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY" EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE" EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS" EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS"
@ -82,7 +88,7 @@ const (
// Default KVS for loggerHTTP and loggerAuditHTTP // Default KVS for loggerHTTP and loggerAuditHTTP
var ( var (
DefaultKVS = config.KVS{ DefaultLoggerWebhookKVS = config.KVS{
config.KV{ config.KV{
Key: config.Enable, Key: config.Enable,
Value: config.EnableOff, Value: config.EnableOff,
@ -95,6 +101,18 @@ var (
Key: AuthToken, Key: AuthToken,
Value: "", Value: "",
}, },
config.KV{
Key: ClientCert,
Value: "",
},
config.KV{
Key: ClientKey,
Value: "",
},
config.KV{
Key: QueueSize,
Value: "100000",
},
} }
DefaultAuditWebhookKVS = config.KVS{ DefaultAuditWebhookKVS = config.KVS{
@ -118,6 +136,10 @@ var (
Key: ClientKey, Key: ClientKey,
Value: "", Value: "",
}, },
config.KV{
Key: QueueSize,
Value: "100000",
},
} }
DefaultAuditKafkaKVS = config.KVS{ DefaultAuditKafkaKVS = config.KVS{
@ -424,10 +446,36 @@ func LookupConfig(scfg config.Config) (Config, error) {
if target != config.Default { if target != config.Default {
authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target
} }
clientCertEnv := EnvLoggerWebhookClientCert
if target != config.Default {
clientCertEnv = EnvLoggerWebhookClientCert + config.Default + target
}
clientKeyEnv := EnvLoggerWebhookClientKey
if target != config.Default {
clientKeyEnv = EnvLoggerWebhookClientKey + config.Default + target
}
err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, ""))
if err != nil {
return cfg, err
}
queueSizeEnv := EnvAuditWebhookQueueSize
if target != config.Default {
queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000"))
if err != nil {
return cfg, err
}
if queueSize <= 0 {
return cfg, errors.New("invalid queue_size value")
}
cfg.HTTP[target] = http.Config{ cfg.HTTP[target] = http.Config{
Enabled: true, Enabled: true,
Endpoint: env.Get(endpointEnv, ""), Endpoint: env.Get(endpointEnv, ""),
AuthToken: env.Get(authTokenEnv, ""), AuthToken: env.Get(authTokenEnv, ""),
ClientCert: env.Get(clientCertEnv, ""),
ClientKey: env.Get(clientKeyEnv, ""),
QueueSize: queueSize,
} }
} }
@ -465,12 +513,24 @@ func LookupConfig(scfg config.Config) (Config, error) {
if err != nil { if err != nil {
return cfg, err return cfg, err
} }
queueSizeEnv := EnvAuditWebhookQueueSize
if target != config.Default {
queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000"))
if err != nil {
return cfg, err
}
if queueSize <= 0 {
return cfg, errors.New("invalid queue_size value")
}
cfg.AuditWebhook[target] = http.Config{ cfg.AuditWebhook[target] = http.Config{
Enabled: true, Enabled: true,
Endpoint: env.Get(endpointEnv, ""), Endpoint: env.Get(endpointEnv, ""),
AuthToken: env.Get(authTokenEnv, ""), AuthToken: env.Get(authTokenEnv, ""),
ClientCert: env.Get(clientCertEnv, ""), ClientCert: env.Get(clientCertEnv, ""),
ClientKey: env.Get(clientKeyEnv, ""), ClientKey: env.Get(clientKeyEnv, ""),
QueueSize: queueSize,
} }
} }
@ -485,7 +545,7 @@ func LookupConfig(scfg config.Config) (Config, error) {
if starget != config.Default { if starget != config.Default {
subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget
} }
if err := config.CheckValidKeys(subSysTarget, kv, DefaultKVS); err != nil { if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil {
return cfg, err return cfg, err
} }
enabled, err := config.ParseBool(kv.Get(config.Enable)) enabled, err := config.ParseBool(kv.Get(config.Enable))
@ -495,10 +555,24 @@ func LookupConfig(scfg config.Config) (Config, error) {
if !enabled { if !enabled {
continue continue
} }
err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey))
if err != nil {
return cfg, err
}
queueSize, err := strconv.Atoi(kv.Get(QueueSize))
if err != nil {
return cfg, err
}
if queueSize <= 0 {
return cfg, errors.New("invalid queue_size value")
}
cfg.HTTP[starget] = http.Config{ cfg.HTTP[starget] = http.Config{
Enabled: true, Enabled: true,
Endpoint: kv.Get(Endpoint), Endpoint: kv.Get(Endpoint),
AuthToken: kv.Get(AuthToken), AuthToken: kv.Get(AuthToken),
ClientCert: kv.Get(ClientCert),
ClientKey: kv.Get(ClientKey),
QueueSize: queueSize,
} }
} }
@ -527,12 +601,20 @@ func LookupConfig(scfg config.Config) (Config, error) {
if err != nil { if err != nil {
return cfg, err return cfg, err
} }
queueSize, err := strconv.Atoi(kv.Get(QueueSize))
if err != nil {
return cfg, err
}
if queueSize <= 0 {
return cfg, errors.New("invalid queue_size value")
}
cfg.AuditWebhook[starget] = http.Config{ cfg.AuditWebhook[starget] = http.Config{
Enabled: true, Enabled: true,
Endpoint: kv.Get(Endpoint), Endpoint: kv.Get(Endpoint),
AuthToken: kv.Get(AuthToken), AuthToken: kv.Get(AuthToken),
ClientCert: kv.Get(ClientCert), ClientCert: kv.Get(ClientCert),
ClientKey: kv.Get(ClientKey), ClientKey: kv.Get(ClientKey),
QueueSize: queueSize,
} }
} }

View File

@ -37,6 +37,26 @@ var (
Type: "string", Type: "string",
Sensitive: true, Sensitive: true,
}, },
config.HelpKV{
Key: ClientCert,
Description: "mTLS certificate for Logger Webhook authentication",
Optional: true,
Type: "string",
Sensitive: true,
},
config.HelpKV{
Key: ClientKey,
Description: "mTLS certificate key for Logger Webhook authentication",
Optional: true,
Type: "string",
Sensitive: true,
},
config.HelpKV{
Key: QueueSize,
Description: "configure channel queue size for Logger Webhook targets",
Optional: true,
Type: "number",
},
config.HelpKV{ config.HelpKV{
Key: config.Comment, Key: config.Comment,
Description: config.DefaultComment, Description: config.DefaultComment,
@ -73,6 +93,12 @@ var (
Type: "string", Type: "string",
Sensitive: true, Sensitive: true,
}, },
config.HelpKV{
Key: QueueSize,
Description: "configure channel queue size for Audit Webhook targets",
Optional: true,
Type: "number",
},
config.HelpKV{ config.HelpKV{
Key: config.Comment, Key: config.Comment,
Description: config.DefaultComment, Description: config.DefaultComment,

View File

@ -374,7 +374,10 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
// Iterate over all logger targets to send the log entry // Iterate over all logger targets to send the log entry
for _, t := range Targets() { for _, t := range Targets() {
t.Send(entry, entry.LogKind) if err := t.Send(entry, entry.LogKind); err != nil {
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Logger target (%v): %v", entry, t, err), entry.LogKind)
}
} }
} }

View File

@ -42,6 +42,7 @@ type Config struct {
AuthToken string `json:"authToken"` AuthToken string `json:"authToken"`
ClientCert string `json:"clientCert"` ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"` ClientKey string `json:"clientKey"`
QueueSize int `json:"queueSize"`
Transport http.RoundTripper `json:"-"` Transport http.RoundTripper `json:"-"`
// Custom logger // Custom logger
@ -133,6 +134,7 @@ func (h *Target) startHTTPLogger() {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.config.Endpoint, bytes.NewReader(logJSON)) h.config.Endpoint, bytes.NewReader(logJSON))
if err != nil { if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
cancel() cancel()
continue continue
} }
@ -173,7 +175,7 @@ func (h *Target) startHTTPLogger() {
// sends log over http to the specified endpoint // sends log over http to the specified endpoint
func New(config Config) *Target { func New(config Config) *Target {
h := &Target{ h := &Target{
logCh: make(chan interface{}, 10000), logCh: make(chan interface{}, config.QueueSize),
config: config, config: config,
} }