diff --git a/internal/logger/audit.go b/internal/logger/audit.go
index 41f5cdac8..cd60c6004 100644
--- a/internal/logger/audit.go
+++ b/internal/logger/audit.go
@@ -144,7 +144,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl
 	// Send audit logs only to http targets.
 	for _, t := range auditTgts {
 		if err := t.Send(ctx, entry); err != nil {
-			LogOnceIf(ctx, "logging", fmt.Errorf("Unable to send an audit event to the target `%v`: %v", t, err), "send-audit-event-failure")
+			LogOnceIf(ctx, "logging", fmt.Errorf("Unable to send audit event(s) to the target `%v`: %v", t, err), "send-audit-event-failure")
 		}
 	}
 }
diff --git a/internal/logger/config.go b/internal/logger/config.go
index 293d8fbe4..389b23021 100644
--- a/internal/logger/config.go
+++ b/internal/logger/config.go
@@ -21,8 +21,10 @@ import (
 	"context"
 	"crypto/tls"
 	"errors"
+	"fmt"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/minio/pkg/v3/env"
 	xnet "github.com/minio/pkg/v3/net"
@@ -39,14 +41,16 @@ type Console struct {
 
 // Audit/Logger constants
 const (
-	Endpoint   = "endpoint"
-	AuthToken  = "auth_token"
-	ClientCert = "client_cert"
-	ClientKey  = "client_key"
-	BatchSize  = "batch_size"
-	QueueSize  = "queue_size"
-	QueueDir   = "queue_dir"
-	Proxy      = "proxy"
+	Endpoint      = "endpoint"
+	AuthToken     = "auth_token"
+	ClientCert    = "client_cert"
+	ClientKey     = "client_key"
+	BatchSize     = "batch_size"
+	QueueSize     = "queue_size"
+	QueueDir      = "queue_dir"
+	MaxRetry      = "max_retry"
+	RetryInterval = "retry_interval"
+	Proxy         = "proxy"
 
 	KafkaBrokers = "brokers"
 	KafkaTopic   = "topic"
@@ -63,24 +67,28 @@ const (
 	KafkaQueueDir  = "queue_dir"
 	KafkaQueueSize = "queue_size"
 
-	EnvLoggerWebhookEnable     = "MINIO_LOGGER_WEBHOOK_ENABLE"
-	EnvLoggerWebhookEndpoint   = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
-	EnvLoggerWebhookAuthToken  = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
-	EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
-	EnvLoggerWebhookClientKey  = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
-	EnvLoggerWebhookProxy      = "MINIO_LOGGER_WEBHOOK_PROXY"
-	EnvLoggerWebhookBatchSize  = "MINIO_LOGGER_WEBHOOK_BATCH_SIZE"
-	EnvLoggerWebhookQueueSize  = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
-	EnvLoggerWebhookQueueDir   = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"
+	EnvLoggerWebhookEnable        = "MINIO_LOGGER_WEBHOOK_ENABLE"
+	EnvLoggerWebhookEndpoint      = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
+	EnvLoggerWebhookAuthToken     = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
+	EnvLoggerWebhookClientCert    = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
+	EnvLoggerWebhookClientKey     = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
+	EnvLoggerWebhookProxy         = "MINIO_LOGGER_WEBHOOK_PROXY"
+	EnvLoggerWebhookBatchSize     = "MINIO_LOGGER_WEBHOOK_BATCH_SIZE"
+	EnvLoggerWebhookQueueSize     = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
+	EnvLoggerWebhookQueueDir      = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"
+	EnvLoggerWebhookMaxRetry      = "MINIO_LOGGER_WEBHOOK_MAX_RETRY"
+	EnvLoggerWebhookRetryInterval = "MINIO_LOGGER_WEBHOOK_RETRY_INTERVAL"
 
-	EnvAuditWebhookEnable     = "MINIO_AUDIT_WEBHOOK_ENABLE"
-	EnvAuditWebhookEndpoint   = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
-	EnvAuditWebhookAuthToken  = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
-	EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
-	EnvAuditWebhookClientKey  = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
-	EnvAuditWebhookBatchSize  = "MINIO_AUDIT_WEBHOOK_BATCH_SIZE"
-	EnvAuditWebhookQueueSize  = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
-	EnvAuditWebhookQueueDir   = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
+	EnvAuditWebhookEnable        = "MINIO_AUDIT_WEBHOOK_ENABLE"
+	EnvAuditWebhookEndpoint      = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
+	EnvAuditWebhookAuthToken     = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
+	EnvAuditWebhookClientCert    = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
+	EnvAuditWebhookClientKey     = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
+	EnvAuditWebhookBatchSize     = "MINIO_AUDIT_WEBHOOK_BATCH_SIZE"
+	EnvAuditWebhookQueueSize     = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
+	EnvAuditWebhookQueueDir      = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
+	EnvAuditWebhookMaxRetry      = "MINIO_AUDIT_WEBHOOK_MAX_RETRY"
+	EnvAuditWebhookRetryInterval = "MINIO_AUDIT_WEBHOOK_RETRY_INTERVAL"
 
 	EnvKafkaEnable  = "MINIO_AUDIT_KAFKA_ENABLE"
 	EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS"
@@ -146,6 +154,14 @@
 			Key:   QueueDir,
 			Value: "",
 		},
+		config.KV{
+			Key:   MaxRetry,
+			Value: "0",
+		},
+		config.KV{
+			Key:   RetryInterval,
+			Value: "3s",
+		},
 	}
 
 	DefaultAuditWebhookKVS = config.KVS{
@@ -181,6 +197,14 @@
 			Key:   QueueDir,
 			Value: "",
 		},
+		config.KV{
+			Key:   MaxRetry,
+			Value: "0",
+		},
+		config.KV{
+			Key:   RetryInterval,
+			Value: "3s",
+		},
 	}
 
 	DefaultAuditKafkaKVS = config.KVS{
@@ -457,6 +481,19 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 		if batchSize <= 0 {
 			return cfg, errInvalidBatchSize
 		}
+		maxRetryCfgVal := getCfgVal(EnvLoggerWebhookMaxRetry, k, kv.Get(MaxRetry))
+		maxRetry, err := strconv.Atoi(maxRetryCfgVal)
+		if err != nil {
+			return cfg, err
+		}
+		if maxRetry < 0 {
+			return cfg, fmt.Errorf("invalid max_retry value: %s", maxRetryCfgVal)
+		}
+		retryIntervalCfgVal := getCfgVal(EnvLoggerWebhookRetryInterval, k, kv.Get(RetryInterval))
+		retryInterval, err := time.ParseDuration(retryIntervalCfgVal)
+		if err != nil {
+			return cfg, err
+		}
 		cfg.HTTP[k] = http.Config{
 			Enabled:  true,
 			Endpoint: url,
@@ -467,6 +504,8 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 			BatchSize:  batchSize,
 			QueueSize:  queueSize,
 			QueueDir:   getCfgVal(EnvLoggerWebhookQueueDir, k, kv.Get(QueueDir)),
+			MaxRetry:   maxRetry,
+			RetryIntvl: retryInterval,
 			Name:       loggerTargetNamePrefix + k,
 		}
 	}
@@ -519,6 +558,19 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 		if batchSize <= 0 {
 			return cfg, errInvalidBatchSize
 		}
+		maxRetryCfgVal := getCfgVal(EnvAuditWebhookMaxRetry, k, kv.Get(MaxRetry))
+		maxRetry, err := strconv.Atoi(maxRetryCfgVal)
+		if err != nil {
+			return cfg, err
+		}
+		if maxRetry < 0 {
+			return cfg, fmt.Errorf("invalid max_retry value: %s", maxRetryCfgVal)
+		}
+		retryIntervalCfgVal := getCfgVal(EnvAuditWebhookRetryInterval, k, kv.Get(RetryInterval))
+		retryInterval, err := time.ParseDuration(retryIntervalCfgVal)
+		if err != nil {
+			return cfg, err
+		}
 		cfg.AuditWebhook[k] = http.Config{
 			Enabled:  true,
 			Endpoint: url,
@@ -528,6 +580,8 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 			BatchSize:  batchSize,
 			QueueSize:  queueSize,
 			QueueDir:   getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
+			MaxRetry:   maxRetry,
+			RetryIntvl: retryInterval,
 			Name:       auditTargetNamePrefix + k,
 		}
 	}
diff --git a/internal/logger/help.go b/internal/logger/help.go
index b540774e7..d751200e5 100644
--- a/internal/logger/help.go
+++ b/internal/logger/help.go
@@ -131,6 +131,18 @@
 			Optional:    true,
 			Type:        "string",
 		},
+		config.HelpKV{
+			Key:         MaxRetry,
+			Description: `maximum retry count before we start dropping up to batch_size events`,
+			Optional:    true,
+			Type:        "number",
+		},
+		config.HelpKV{
+			Key:         RetryInterval,
+			Description: `sleep interval between each retry`,
+			Optional:    true,
+			Type:        "duration",
+		},
 		config.HelpKV{
 			Key:         config.Comment,
 			Description: config.DefaultComment,
diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go
index 67022538a..17deadd41 100644
--- a/internal/logger/target/http/http.go
+++ b/internal/logger/target/http/http.go
@@ -79,6 +79,8 @@ type Config struct {
 	BatchSize int               `json:"batchSize"`
 	QueueSize int               `json:"queueSize"`
 	QueueDir  string            `json:"queueDir"`
+	MaxRetry   int              `json:"maxRetry"`
+	RetryIntvl time.Duration    `json:"retryInterval"`
 	Proxy     string            `json:"string"`
 
 	Transport http.RoundTripper `json:"-"`
@@ -227,6 +229,7 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
 		if xnet.IsNetworkOrHostDown(err, false) {
 			h.status.Store(statusOffline)
 		}
+		h.failedMessages.Add(int64(payloadCount))
 	} else {
 		h.status.Store(statusOnline)
 	}
@@ -257,7 +260,6 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
 
 	resp, err := h.client.Do(req)
 	if err != nil {
-		h.failedMessages.Add(int64(payloadCount))
 		return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err)
 	}
 
@@ -268,10 +270,8 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
 		// accepted HTTP status codes.
 		return nil
 	} else if resp.StatusCode == http.StatusForbidden {
-		h.failedMessages.Add(int64(payloadCount))
 		return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status)
 	}
-	h.failedMessages.Add(int64(payloadCount))
 	return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status)
 }
 
@@ -326,9 +326,6 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 		}
 	}()
 
-	var entry interface{}
-	var ok bool
-	var err error
 	lastBatchProcess := time.Now()
 
 	buf := bytebufferpool.Get()
@@ -343,61 +340,76 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 	globalBuffer := logChBuffers[name]
 	logChLock.Unlock()
 
-	newTicker := time.NewTicker(time.Second)
-	isTick := false
-
-	var count int
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	var count int
 	for {
-		isTick = false
-		select {
-		case _ = <-newTicker.C:
-			isTick = true
-		case entry, _ = <-globalBuffer:
-		case entry, ok = <-h.logCh:
-			if !ok {
+		var (
+			ok    bool
+			entry any
+		)
+
+		if count < h.batchSize {
+			tickered := false
+			select {
+			case _ = <-ticker.C:
+				tickered = true
+			case entry, _ = <-globalBuffer:
+			case entry, ok = <-h.logCh:
+				if !ok {
+					return
+				}
+			case <-ctx.Done():
 				return
 			}
-		case <-ctx.Done():
-			return
-		}
 
-		if !isTick {
-			h.totalMessages.Add(1)
-
-			if !isDirQueue {
-				if err := enc.Encode(&entry); err != nil {
-					h.config.LogOnceIf(
-						ctx,
-						fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
-						h.Name(),
-					)
-					h.failedMessages.Add(1)
-					continue
+			if !tickered {
+				h.totalMessages.Add(1)
+				if !isDirQueue {
+					if err := enc.Encode(&entry); err != nil {
+						h.config.LogOnceIf(
+							ctx,
+							fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
+							h.Name(),
+						)
+						h.failedMessages.Add(1)
+						continue
+					}
+				} else {
+					entries = append(entries, entry)
 				}
 				count++
-			} else {
-				entries = append(entries, entry)
-				count++
 			}
-		}
 
-		if count != h.batchSize {
 			if len(h.logCh) > 0 || len(globalBuffer) > 0 || count == 0 {
+				// there is something in the log queue
+				// process it first, even if we tickered
+				// first, or we have not received any events
+				// yet, still wait on it.
 				continue
 			}
 
-			if h.batchSize > 1 {
-				// If we are doing batching, we should wait
-				// at least one second before sending.
-				// Even if there is nothing in the queue.
-				if time.Since(lastBatchProcess).Seconds() < 1 {
-					continue
-				}
+			// If we are doing batching, we should wait
+			// at least for a second, before sending.
+			// Even if there is nothing in the queue.
+			if h.batchSize > 1 && time.Since(lastBatchProcess) < time.Second {
+				continue
 			}
 		}
 
+		// if we have reached the count send at once
+		// or we have crossed last second before batch was sent, send at once
 		lastBatchProcess = time.Now()
 
+		var retries int
+		retryIntvl := h.config.RetryIntvl
+		if retryIntvl <= 0 {
+			retryIntvl = 3 * time.Second
+		}
+
+		maxRetries := h.config.MaxRetry
+
 	retry:
 		// If the channel reaches above half capacity
 		// we spawn more workers. The workers spawned
@@ -415,6 +427,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 			}
 		}
 
+		var err error
 		if !isDirQueue {
 			err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout)
 		} else {
@@ -422,18 +435,24 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 		}
 
 		if err != nil {
-			h.config.LogOnceIf(
-				context.Background(),
-				fmt.Errorf("unable to send webhook log entry(s) to '%s' err '%w': %d", name, err, count),
-				name,
-			)
-
 			if errors.Is(err, context.Canceled) {
 				return
 			}
 
-			time.Sleep(3 * time.Second)
-			goto retry
+			h.config.LogOnceIf(
+				context.Background(),
+				fmt.Errorf("unable to send audit/log entry(s) to '%s' err '%w': %d", name, err, count),
+				name,
+			)
+
+			time.Sleep(retryIntvl)
+			if maxRetries == 0 {
+				goto retry
+			}
+			retries++
+			if retries <= maxRetries {
+				goto retry
+			}
 		}
 
 		entries = make([]interface{}, 0)