From ec427152198601ea65b435e7e42ddaebafeb993e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 2 Dec 2021 21:24:12 -0800 Subject: [PATCH] configure audit queue_size --- cmd/config-current.go | 2 ++ cmd/logger/config.go | 24 ++++++++++++++++++++++++ cmd/logger/target/http/http.go | 23 +++++++++++++++++------ 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/cmd/config-current.go b/cmd/config-current.go index 3d026aca8..bcf75b997 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -506,6 +506,7 @@ func lookupConfigs(s config.Config, setDriveCount int) { http.WithTargetName(k), http.WithEndpoint(l.Endpoint), http.WithAuthToken(l.AuthToken), + http.WithQueueSize(l.QueueSize), http.WithUserAgent(loggerUserAgent), http.WithLogKind(string(logger.All)), http.WithTransport(NewGatewayHTTPTransport()), @@ -524,6 +525,7 @@ func lookupConfigs(s config.Config, setDriveCount int) { http.WithTargetName(k), http.WithEndpoint(l.Endpoint), http.WithAuthToken(l.AuthToken), + http.WithQueueSize(l.QueueSize), http.WithUserAgent(loggerUserAgent), http.WithLogKind(string(logger.All)), http.WithTransport(NewGatewayHTTPTransport()), diff --git a/cmd/logger/config.go b/cmd/logger/config.go index 2cc6e71f3..14dbb6469 100644 --- a/cmd/logger/config.go +++ b/cmd/logger/config.go @@ -17,6 +17,7 @@ package logger import ( + "strconv" "strings" "github.com/minio/minio/cmd/config" @@ -33,6 +34,7 @@ type HTTP struct { Enabled bool `json:"enabled"` Endpoint string `json:"endpoint"` AuthToken string `json:"authToken"` + QueueSize int `json:"queueSize"` } // Config console and http logger targets @@ -49,10 +51,12 @@ const ( EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE" EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT" + EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE" EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN" EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE" EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT" + EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE" EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN" ) @@ -220,6 +224,14 @@ func LookupConfig(scfg config.Config) (Config, error) { if err != nil || !enable { continue } + queueSizeEnv := EnvLoggerWebhookQueueSize + if target != config.Default { + queueSizeEnv = EnvLoggerWebhookQueueSize + config.Default + target + } + queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) + if err != nil { + continue + } endpointEnv := EnvLoggerWebhookEndpoint if target != config.Default { endpointEnv = EnvLoggerWebhookEndpoint + config.Default + target @@ -232,6 +244,7 @@ func LookupConfig(scfg config.Config) (Config, error) { Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), + QueueSize: queueSize, } } @@ -249,6 +262,14 @@ func LookupConfig(scfg config.Config) (Config, error) { if err != nil || !enable { continue } + queueSizeEnv := EnvAuditWebhookQueueSize + if target != config.Default { + queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target + } + queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) + if err != nil { + continue + } endpointEnv := EnvAuditWebhookEndpoint if target != config.Default { endpointEnv = EnvAuditWebhookEndpoint + config.Default + target @@ -261,6 +282,7 @@ func LookupConfig(scfg config.Config) (Config, error) { Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), + QueueSize: queueSize, } } @@ -289,6 +311,7 @@ func LookupConfig(scfg config.Config) (Config, error) { Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), + QueueSize: 100000, } } @@ -317,6 +340,7 @@ func LookupConfig(scfg config.Config) (Config, error) { Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), + QueueSize: 100000, } } diff --git a/cmd/logger/target/http/http.go b/cmd/logger/target/http/http.go index fbdde3deb..ccec1abb5 100644 --- a/cmd/logger/target/http/http.go +++ b/cmd/logger/target/http/http.go @@ -61,7 +61,7 @@ func (h *Target) String() string { // Validate validate the http target func (h *Target) Validate() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.endpoint, strings.NewReader(`{}`)) @@ -107,6 +107,7 @@ func (h *Target) startHTTPLogger() { for entry := range h.logCh { logJSON, err := json.Marshal(&entry) if err != nil { + logger.LogIf(context.Background(), fmt.Errorf("failed to marshal %v: %v", entry, err)) continue } @@ -114,6 +115,8 @@ func (h *Target) startHTTPLogger() { req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.endpoint, bytes.NewReader(logJSON)) if err != nil { + logger.LogIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", + h.endpoint, err)) cancel() continue } @@ -130,7 +133,7 @@ func (h *Target) startHTTPLogger() { resp, err := h.client.Do(req) cancel() if err != nil { - logger.LogIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration\n", + logger.LogIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.endpoint, err)) continue } @@ -199,12 +202,18 @@ func WithTransport(transport *http.Transport) Option { } } +// WithQueueSize adds a custom queue size to control the +// in-flux of requeusts +func WithQueueSize(size int) Option { + return func(t *Target) { + t.logCh = make(chan interface{}, size) + } +} + // New initializes a new logger target which // sends log over http to the specified endpoint func New(opts ...Option) *Target { - h := &Target{ - logCh: make(chan interface{}, 10000), - } + h := &Target{} // Loop through each option for _, opt := range opts { @@ -226,9 +235,11 @@ func (h *Target) Send(entry interface{}, errKind string) error { select { case h.logCh <- entry: default: + err := errors.New("log buffer full") + logger.LogIf(context.Background(), fmt.Errorf("audit event lost %v: %v", entry, err)) // log channel is full, do not wait and return // an error immediately to the caller - return errors.New("log buffer full") + return err } return nil