Add support for audit/logger max retry and retry interval (#20402)

The current implementation retries forever; eventually the
log buffer fills up and we start dropping events.

This PR allows you to configure a retry limit, after which
we give up on the existing audit/logger batch and proceed
to process the new ones.

Bonus:
 - do not grow buffers beyond the configured batchSize value
 - do not leak the ticker when the worker returns
This commit is contained in:
Harshavardhana 2024-09-08 05:15:09 -07:00 committed by GitHub
parent 3f39da48ea
commit 8268c12cfb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 162 additions and 77 deletions

View File

@ -144,7 +144,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl
// Send audit logs only to http targets.
for _, t := range auditTgts {
if err := t.Send(ctx, entry); err != nil {
LogOnceIf(ctx, "logging", fmt.Errorf("Unable to send an audit event to the target `%v`: %v", t, err), "send-audit-event-failure")
LogOnceIf(ctx, "logging", fmt.Errorf("Unable to send audit event(s) to the target `%v`: %v", t, err), "send-audit-event-failure")
}
}
}

View File

@ -21,8 +21,10 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/minio/pkg/v3/env"
xnet "github.com/minio/pkg/v3/net"
@ -39,14 +41,16 @@ type Console struct {
// Audit/Logger constants
const (
Endpoint = "endpoint"
AuthToken = "auth_token"
ClientCert = "client_cert"
ClientKey = "client_key"
BatchSize = "batch_size"
QueueSize = "queue_size"
QueueDir = "queue_dir"
Proxy = "proxy"
Endpoint = "endpoint"
AuthToken = "auth_token"
ClientCert = "client_cert"
ClientKey = "client_key"
BatchSize = "batch_size"
QueueSize = "queue_size"
QueueDir = "queue_dir"
MaxRetry = "max_retry"
RetryInterval = "retry_interval"
Proxy = "proxy"
KafkaBrokers = "brokers"
KafkaTopic = "topic"
@ -63,24 +67,28 @@ const (
KafkaQueueDir = "queue_dir"
KafkaQueueSize = "queue_size"
EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE"
EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
EnvLoggerWebhookClientKey = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
EnvLoggerWebhookProxy = "MINIO_LOGGER_WEBHOOK_PROXY"
EnvLoggerWebhookBatchSize = "MINIO_LOGGER_WEBHOOK_BATCH_SIZE"
EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
EnvLoggerWebhookQueueDir = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"
EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE"
EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
EnvLoggerWebhookClientKey = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
EnvLoggerWebhookProxy = "MINIO_LOGGER_WEBHOOK_PROXY"
EnvLoggerWebhookBatchSize = "MINIO_LOGGER_WEBHOOK_BATCH_SIZE"
EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
EnvLoggerWebhookQueueDir = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"
EnvLoggerWebhookMaxRetry = "MINIO_LOGGER_WEBHOOK_MAX_RETRY"
EnvLoggerWebhookRetryInterval = "MINIO_LOGGER_WEBHOOK_RETRY_INTERVAL"
EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE"
EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
EnvAuditWebhookBatchSize = "MINIO_AUDIT_WEBHOOK_BATCH_SIZE"
EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
EnvAuditWebhookQueueDir = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE"
EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
EnvAuditWebhookBatchSize = "MINIO_AUDIT_WEBHOOK_BATCH_SIZE"
EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
EnvAuditWebhookQueueDir = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
EnvAuditWebhookMaxRetry = "MINIO_AUDIT_WEBHOOK_MAX_RETRY"
EnvAuditWebhookRetryInterval = "MINIO_AUDIT_WEBHOOK_RETRY_INTERVAL"
EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS"
@ -146,6 +154,14 @@ var (
Key: QueueDir,
Value: "",
},
config.KV{
Key: MaxRetry,
Value: "0",
},
config.KV{
Key: RetryInterval,
Value: "3s",
},
}
DefaultAuditWebhookKVS = config.KVS{
@ -181,6 +197,14 @@ var (
Key: QueueDir,
Value: "",
},
config.KV{
Key: MaxRetry,
Value: "0",
},
config.KV{
Key: RetryInterval,
Value: "3s",
},
}
DefaultAuditKafkaKVS = config.KVS{
@ -457,6 +481,19 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
if batchSize <= 0 {
return cfg, errInvalidBatchSize
}
maxRetryCfgVal := getCfgVal(EnvLoggerWebhookMaxRetry, k, kv.Get(MaxRetry))
maxRetry, err := strconv.Atoi(maxRetryCfgVal)
if err != nil {
return cfg, err
}
if maxRetry < 0 {
return cfg, fmt.Errorf("invalid %s max_retry", maxRetryCfgVal)
}
retryIntervalCfgVal := getCfgVal(EnvLoggerWebhookRetryInterval, k, kv.Get(RetryInterval))
retryInterval, err := time.ParseDuration(retryIntervalCfgVal)
if err != nil {
return cfg, err
}
cfg.HTTP[k] = http.Config{
Enabled: true,
Endpoint: url,
@ -467,6 +504,8 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
BatchSize: batchSize,
QueueSize: queueSize,
QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, k, kv.Get(QueueDir)),
MaxRetry: maxRetry,
RetryIntvl: retryInterval,
Name: loggerTargetNamePrefix + k,
}
}
@ -519,6 +558,19 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
if batchSize <= 0 {
return cfg, errInvalidBatchSize
}
maxRetryCfgVal := getCfgVal(EnvAuditWebhookMaxRetry, k, kv.Get(MaxRetry))
maxRetry, err := strconv.Atoi(maxRetryCfgVal)
if err != nil {
return cfg, err
}
if maxRetry < 0 {
return cfg, fmt.Errorf("invalid %s max_retry", maxRetryCfgVal)
}
retryIntervalCfgVal := getCfgVal(EnvAuditWebhookRetryInterval, k, kv.Get(RetryInterval))
retryInterval, err := time.ParseDuration(retryIntervalCfgVal)
if err != nil {
return cfg, err
}
cfg.AuditWebhook[k] = http.Config{
Enabled: true,
Endpoint: url,
@ -528,6 +580,8 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
BatchSize: batchSize,
QueueSize: queueSize,
QueueDir: getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
MaxRetry: maxRetry,
RetryIntvl: retryInterval,
Name: auditTargetNamePrefix + k,
}
}

View File

@ -131,6 +131,18 @@ var (
Optional: true,
Type: "string",
},
config.HelpKV{
Key: MaxRetry,
Description: `maximum retry count before we start dropping upto batch_size events`,
Optional: true,
Type: "number",
},
config.HelpKV{
Key: RetryInterval,
Description: `maximum retry sleeps between each retries`,
Optional: true,
Type: "duration",
},
config.HelpKV{
Key: config.Comment,
Description: config.DefaultComment,

View File

@ -79,6 +79,8 @@ type Config struct {
BatchSize int `json:"batchSize"`
QueueSize int `json:"queueSize"`
QueueDir string `json:"queueDir"`
MaxRetry int `json:"maxRetry"`
RetryIntvl time.Duration `json:"retryInterval"`
Proxy string `json:"string"`
Transport http.RoundTripper `json:"-"`
@ -227,6 +229,7 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
if xnet.IsNetworkOrHostDown(err, false) {
h.status.Store(statusOffline)
}
h.failedMessages.Add(int64(payloadCount))
} else {
h.status.Store(statusOnline)
}
@ -257,7 +260,6 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
resp, err := h.client.Do(req)
if err != nil {
h.failedMessages.Add(int64(payloadCount))
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err)
}
@ -268,10 +270,8 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
// accepted HTTP status codes.
return nil
} else if resp.StatusCode == http.StatusForbidden {
h.failedMessages.Add(int64(payloadCount))
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status)
}
h.failedMessages.Add(int64(payloadCount))
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status)
}
@ -326,9 +326,6 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
}
}()
var entry interface{}
var ok bool
var err error
lastBatchProcess := time.Now()
buf := bytebufferpool.Get()
@ -343,61 +340,76 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
globalBuffer := logChBuffers[name]
logChLock.Unlock()
newTicker := time.NewTicker(time.Second)
isTick := false
var count int
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var count int
for {
isTick = false
select {
case _ = <-newTicker.C:
isTick = true
case entry, _ = <-globalBuffer:
case entry, ok = <-h.logCh:
if !ok {
var (
ok bool
entry any
)
if count < h.batchSize {
tickered := false
select {
case _ = <-ticker.C:
tickered = true
case entry, _ = <-globalBuffer:
case entry, ok = <-h.logCh:
if !ok {
return
}
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
if !isTick {
h.totalMessages.Add(1)
if !isDirQueue {
if err := enc.Encode(&entry); err != nil {
h.config.LogOnceIf(
ctx,
fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
h.Name(),
)
h.failedMessages.Add(1)
continue
if !tickered {
h.totalMessages.Add(1)
if !isDirQueue {
if err := enc.Encode(&entry); err != nil {
h.config.LogOnceIf(
ctx,
fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
h.Name(),
)
h.failedMessages.Add(1)
continue
}
} else {
entries = append(entries, entry)
}
count++
} else {
entries = append(entries, entry)
count++
}
}
if count != h.batchSize {
if len(h.logCh) > 0 || len(globalBuffer) > 0 || count == 0 {
// there is something in the log queue
// process it first, even if we tickered
// first, or we have not received any events
// yet, still wait on it.
continue
}
if h.batchSize > 1 {
// If we are doing batching, we should wait
// at least one second before sending.
// Even if there is nothing in the queue.
if time.Since(lastBatchProcess).Seconds() < 1 {
continue
}
// If we are doing batching, we should wait
// at least for a second, before sending.
// Even if there is nothing in the queue.
if h.batchSize > 1 && time.Since(lastBatchProcess) < time.Second {
continue
}
}
// if we have reached the count send at once
// or we have crossed last second before batch was sent, send at once
lastBatchProcess = time.Now()
var retries int
retryIntvl := h.config.RetryIntvl
if retryIntvl <= 0 {
retryIntvl = 3 * time.Second
}
maxRetries := h.config.MaxRetry
retry:
// If the channel reaches above half capacity
// we spawn more workers. The workers spawned
@ -415,6 +427,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
}
}
var err error
if !isDirQueue {
err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout)
} else {
@ -422,18 +435,24 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
}
if err != nil {
h.config.LogOnceIf(
context.Background(),
fmt.Errorf("unable to send webhook log entry(s) to '%s' err '%w': %d", name, err, count),
name,
)
if errors.Is(err, context.Canceled) {
return
}
time.Sleep(3 * time.Second)
goto retry
h.config.LogOnceIf(
context.Background(),
fmt.Errorf("unable to send audit/log entry(s) to '%s' err '%w': %d", name, err, count),
name,
)
time.Sleep(retryIntvl)
if maxRetries == 0 {
goto retry
}
retries++
if retries <= maxRetries {
goto retry
}
}
entries = make([]interface{}, 0)