add batchSize support for webhook endpoints (#19214)

configure a batch size to send audit/logger events
in batches instead of sending one event per HTTP request.

this is mainly to reduce the number of requests
we make to the webhook endpoint.
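For illustration, batching could be enabled per target through the new batch_size key or the matching environment variables added in this commit; the target name "target1" and the endpoint below are made-up values:

    export MINIO_LOGGER_WEBHOOK_ENABLE_target1="on"
    export MINIO_LOGGER_WEBHOOK_ENDPOINT_target1="http://127.0.0.1:8080/logs"   # stand-in endpoint
    export MINIO_LOGGER_WEBHOOK_BATCH_SIZE_target1="100"
    export MINIO_AUDIT_WEBHOOK_BATCH_SIZE_target1="100"

Per the sending loop below, events are still flushed as soon as the queue drains, so a batch_size larger than the event rate does not delay delivery indefinitely.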
Harshavardhana 2024-03-07 12:17:46 -08:00 committed by GitHub
parent 68dd74c5ab
commit 233cc3905a
3 changed files with 105 additions and 19 deletions


@@ -43,6 +43,7 @@ const (
 	AuthToken  = "auth_token"
 	ClientCert = "client_cert"
 	ClientKey  = "client_key"
+	BatchSize  = "batch_size"
 	QueueSize  = "queue_size"
 	QueueDir   = "queue_dir"
 	Proxy      = "proxy"
@@ -68,6 +69,7 @@ const (
 	EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
 	EnvLoggerWebhookClientKey  = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
 	EnvLoggerWebhookProxy      = "MINIO_LOGGER_WEBHOOK_PROXY"
+	EnvLoggerWebhookBatchSize  = "MINIO_LOGGER_WEBHOOK_BATCH_SIZE"
 	EnvLoggerWebhookQueueSize  = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
 	EnvLoggerWebhookQueueDir   = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"
@@ -76,6 +78,7 @@ const (
 	EnvAuditWebhookAuthToken  = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
 	EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
 	EnvAuditWebhookClientKey  = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
+	EnvAuditWebhookBatchSize  = "MINIO_AUDIT_WEBHOOK_BATCH_SIZE"
 	EnvAuditWebhookQueueSize  = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
 	EnvAuditWebhookQueueDir   = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
@@ -99,7 +102,10 @@ const (
 	auditTargetNamePrefix = "audit-"
 )

-var errInvalidQueueSize = errors.New("invalid queue_size value")
+var (
+	errInvalidQueueSize = errors.New("invalid queue_size value")
+	errInvalidBatchSize = errors.New("invalid batch_size value")
+)

 // Default KVS for loggerHTTP and loggerAuditHTTP
 var (
@@ -128,6 +134,10 @@ var (
 			Key:   Proxy,
 			Value: "",
 		},
+		config.KV{
+			Key:   BatchSize,
+			Value: "1",
+		},
 		config.KV{
 			Key:   QueueSize,
 			Value: "100000",
@@ -159,6 +169,10 @@ var (
 			Key:   ClientKey,
 			Value: "",
 		},
+		config.KV{
+			Key:   BatchSize,
+			Value: "1",
+		},
 		config.KV{
 			Key:   QueueSize,
 			Value: "100000",
@@ -435,6 +449,14 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 		if queueSize <= 0 {
 			return cfg, errInvalidQueueSize
 		}
+		batchSizeCfgVal := getCfgVal(EnvLoggerWebhookBatchSize, k, kv.Get(BatchSize))
+		batchSize, err := strconv.Atoi(batchSizeCfgVal)
+		if err != nil {
+			return cfg, err
+		}
+		if batchSize <= 0 {
+			return cfg, errInvalidBatchSize
+		}
 		cfg.HTTP[k] = http.Config{
 			Enabled:    true,
 			Endpoint:   url,
@@ -442,6 +464,7 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 			ClientCert: clientCert,
 			ClientKey:  clientKey,
 			Proxy:      getCfgVal(EnvLoggerWebhookProxy, k, kv.Get(Proxy)),
+			BatchSize:  batchSize,
 			QueueSize:  queueSize,
 			QueueDir:   getCfgVal(EnvLoggerWebhookQueueDir, k, kv.Get(QueueDir)),
 			Name:       loggerTargetNamePrefix + k,
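A minimal standalone sketch of the lookup above (presumably internal/logger/config.go), assuming a simplified getCfgVal in which the target-scoped environment variable takes precedence over the stored config value; the target names and helper wiring here are hypothetical:

    package main

    import (
    	"errors"
    	"fmt"
    	"os"
    	"strconv"
    )

    var errInvalidBatchSize = errors.New("invalid batch_size value")

    // getCfgVal is a simplified stand-in: the target-scoped environment
    // variable, e.g. MINIO_LOGGER_WEBHOOK_BATCH_SIZE_target1, wins over
    // the value stored in the config KV store.
    func getCfgVal(envName, target, stored string) string {
    	if v, ok := os.LookupEnv(envName + "_" + target); ok {
    		return v
    	}
    	return stored
    }

    // lookupBatchSize mirrors the validation above: parse the value,
    // then reject anything that is not a positive integer.
    func lookupBatchSize(target, stored string) (int, error) {
    	v := getCfgVal("MINIO_LOGGER_WEBHOOK_BATCH_SIZE", target, stored)
    	batchSize, err := strconv.Atoi(v)
    	if err != nil {
    		return 0, err
    	}
    	if batchSize <= 0 {
    		return 0, errInvalidBatchSize
    	}
    	return batchSize, nil
    }

    func main() {
    	os.Setenv("MINIO_LOGGER_WEBHOOK_BATCH_SIZE_target1", "100")
    	fmt.Println(lookupBatchSize("target1", "1")) // 100 <nil>
    	fmt.Println(lookupBatchSize("target2", "0")) // 0 invalid batch_size value
    }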
@@ -488,12 +511,21 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 		if queueSize <= 0 {
 			return cfg, errInvalidQueueSize
 		}
+		batchSizeCfgVal := getCfgVal(EnvAuditWebhookBatchSize, k, kv.Get(BatchSize))
+		batchSize, err := strconv.Atoi(batchSizeCfgVal)
+		if err != nil {
+			return cfg, err
+		}
+		if batchSize <= 0 {
+			return cfg, errInvalidBatchSize
+		}
 		cfg.AuditWebhook[k] = http.Config{
 			Enabled:    true,
 			Endpoint:   url,
 			AuthToken:  getCfgVal(EnvAuditWebhookAuthToken, k, kv.Get(AuthToken)),
 			ClientCert: clientCert,
 			ClientKey:  clientKey,
+			BatchSize:  batchSize,
 			QueueSize:  queueSize,
 			QueueDir:   getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
 			Name:       auditTargetNamePrefix + k,


@@ -52,6 +52,12 @@ var (
 			Type:      "string",
 			Sensitive: true,
 		},
+		config.HelpKV{
+			Key:         BatchSize,
+			Description: "Number of events per HTTP send to webhook target",
+			Optional:    true,
+			Type:        "number",
+		},
 		config.HelpKV{
 			Key:         QueueSize,
 			Description: "configure channel queue size for webhook targets",
@@ -107,6 +113,12 @@ var (
 			Type:      "string",
 			Sensitive: true,
 		},
+		config.HelpKV{
+			Key:         BatchSize,
+			Description: "Number of events per HTTP send to webhook target",
+			Optional:    true,
+			Type:        "number",
+		},
 		config.HelpKV{
 			Key:         QueueSize,
 			Description: "configure channel queue size for webhook targets",


@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2023 MinIO, Inc.
+// Copyright (c) 2015-2024 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -33,12 +33,14 @@ import (
 	"sync/atomic"
 	"time"

+	jsoniter "github.com/json-iterator/go"
 	xhttp "github.com/minio/minio/internal/http"
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger/target/types"
 	"github.com/minio/minio/internal/once"
 	"github.com/minio/minio/internal/store"
 	xnet "github.com/minio/pkg/v2/net"
+	"github.com/valyala/bytebufferpool"
 )

 const (
@@ -48,6 +50,9 @@ const (
 	// maxWorkers is the maximum number of concurrent http loggers
 	maxWorkers = 16

+	// maxWorkersWithBatchEvents is the maximum number of concurrent batch http loggers
+	maxWorkersWithBatchEvents = 4
+
 	// the suffix for the configured queue dir where the logs will be persisted.
 	httpLoggerExtension = ".http.log"
 )
@@ -67,6 +72,7 @@ type Config struct {
 	AuthToken  string `json:"authToken"`
 	ClientCert string `json:"clientCert"`
 	ClientKey  string `json:"clientKey"`
+	BatchSize  int    `json:"batchSize"`
 	QueueSize  int    `json:"queueSize"`
 	QueueDir   string `json:"queueDir"`
 	Proxy      string `json:"string"`
@@ -99,6 +105,13 @@ type Target struct {
 	logCh   chan interface{}
 	logChMu sync.RWMutex

+	// batchSize is the number of events sent per HTTP request to
+	// the webhook target. Batching is useful only if the endpoint
+	// can read multiple events from a single stream, for example
+	// the Splunk HTTP Event Collector; if you are unsure, set
+	// this to '1'.
+	batchSize int
+
 	// If the first init fails, this starts a goroutine that
 	// will attempt to establish the connection.
 	revive sync.Once
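A batched payload carries multiple JSON events in one request body, so the receiving endpoint must be able to read a stream of concatenated events; the comment above names Splunk's HTTP Event Collector as one such endpoint. A sketch of wiring a batched target through the exported Config and New from this file; the import path is assumed to be this file's package (presumably internal/logger/target/http) and every value is a stand-in:

    package main

    import (
    	http "github.com/minio/minio/internal/logger/target/http" // assumed path of this file's package
    )

    func main() {
    	// All values are illustrative stand-ins, not defaults.
    	cfg := http.Config{
    		AuthToken: "Splunk <hec-token>", // stand-in credential
    		BatchSize: 100,                  // events per HTTP send; keep 1 unless the endpoint reads streams
    		QueueSize: 100000,               // capacity of the in-memory event channel
    	}
    	target := http.New(cfg)
    	_ = target // Send(ctx, entry) would enqueue events on this target
    }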
@@ -135,7 +148,7 @@ func (h *Target) IsOnline(ctx context.Context) bool {

 // ping returns true if the target is reachable.
 func (h *Target) ping(ctx context.Context) bool {
-	if err := h.send(ctx, []byte(`{}`), webhookCallTimeout); err != nil {
+	if err := h.send(ctx, []byte(`{}`), "application/json", webhookCallTimeout); err != nil {
 		return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err)
 	}
 	// We are online.
@@ -213,7 +226,7 @@ func (h *Target) init(ctx context.Context) (err error) {
 	return nil
 }

-func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration) (err error) {
+func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) {
 	defer func() {
 		if err != nil {
 			atomic.StoreInt32(&h.status, statusOffline)
@@ -229,7 +242,9 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration) (err error) {
 	if err != nil {
 		return fmt.Errorf("invalid configuration for '%s'; %v", h.Endpoint(), err)
 	}
-	req.Header.Set(xhttp.ContentType, "application/json")
+	if payloadType != "" {
+		req.Header.Set(xhttp.ContentType, payloadType)
+	}
 	req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
 	req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
@@ -260,13 +275,7 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration) (err error) {
 	}
 }

-func (h *Target) logEntry(ctx context.Context, entry interface{}) {
-	logJSON, err := json.Marshal(&entry)
-	if err != nil {
-		atomic.AddInt64(&h.failedMessages, 1)
-		return
-	}
-
+func (h *Target) logEntry(ctx context.Context, payload []byte, payloadType string) {
 	const maxTries = 3
 	tries := 0
 	for tries < maxTries {
@@ -281,7 +290,7 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) {
 		}
 		time.Sleep(sleep)
 		tries++
-		err := h.send(ctx, logJSON, webhookCallTimeout)
+		err := h.send(ctx, payload, payloadType, webhookCallTimeout)
 		if err == nil {
 			return
 		}
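The loop in logEntry (partially visible above) makes up to maxTries attempts per payload, sleeping between attempts. A hypothetical reduction of that shape, assuming the context and time imports, with a fixed sleep standing in for whatever backoff the surrounding code computes:

    // sendWithRetry is a stand-in for logEntry's retry loop: up to
    // maxTries attempts with a pause in between, returning early on
    // success; the payload is dropped after the final failure.
    func sendWithRetry(ctx context.Context, payload []byte, send func(context.Context, []byte) error) {
    	const maxTries = 3
    	for tries := 0; tries < maxTries; tries++ {
    		if tries > 0 {
    			time.Sleep(time.Second) // stand-in for the real backoff
    		}
    		if err := send(ctx, payload); err == nil {
    			return
    		}
    	}
    }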
@@ -309,9 +318,36 @@ func (h *Target) startHTTPLogger(ctx context.Context) {
 		return
 	}

+	buf := bytebufferpool.Get()
+	defer bytebufferpool.Put(buf)
+
+	json := jsoniter.ConfigCompatibleWithStandardLibrary
+	enc := json.NewEncoder(buf)
+
+	batchSize := h.batchSize
+	if batchSize <= 0 {
+		batchSize = 1
+	}
+
+	payloadType := "application/json"
+	if batchSize > 1 {
+		payloadType = ""
+	}
+
+	var nevents int
 	// Send messages until channel is closed.
 	for entry := range logCh {
-		h.logEntry(ctx, entry)
+		atomic.AddInt64(&h.totalMessages, 1)
+		nevents++
+		if err := enc.Encode(&entry); err != nil {
+			atomic.AddInt64(&h.failedMessages, 1)
+			nevents--
+			continue
+		}
+		if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 {
+			h.logEntry(ctx, buf.Bytes(), payloadType)
+			buf.Reset()
+			nevents = 0
+		}
 	}
 }
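The rewritten loop batches by size or by drain: each event is appended to a pooled buffer with the jsoniter encoder (which, like encoding/json, terminates each event with a newline, presumably why batched payloads go out without a single Content-Type), and the buffer is flushed when either batchSize events have accumulated or the channel is momentarily empty, so a slow trickle is never held back waiting for a full batch. A self-contained sketch of the same pattern using only the standard library, with hypothetical names:

    package main

    import (
    	"bytes"
    	"encoding/json"
    	"fmt"
    )

    // batchEvents drains logCh, encoding events into a buffer and
    // flushing when either batchSize events are buffered or the channel
    // is empty, so small trickles are sent promptly. flush stands in for
    // Target.logEntry, which sends one HTTP request per buffered batch.
    func batchEvents(logCh chan interface{}, batchSize int, flush func(payload []byte)) {
    	var buf bytes.Buffer
    	enc := json.NewEncoder(&buf) // Encode writes newline-delimited JSON
    	nevents := 0
    	for entry := range logCh {
    		nevents++
    		if err := enc.Encode(entry); err != nil {
    			nevents--
    			continue
    		}
    		if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 {
    			flush(buf.Bytes())
    			buf.Reset()
    			nevents = 0
    		}
    	}
    }

    func main() {
    	logCh := make(chan interface{}, 10)
    	for i := 0; i < 5; i++ {
    		logCh <- map[string]int{"event": i}
    	}
    	close(logCh)
    	// First flush carries 3 events; the remaining 2 go out when the
    	// channel drains, without waiting for a full batch.
    	batchEvents(logCh, 3, func(p []byte) {
    		fmt.Printf("send %d bytes:\n%s", len(p), p)
    	})
    }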
@@ -319,9 +355,10 @@ func (h *Target) startHTTPLogger(ctx context.Context) {
 // sends log over http to the specified endpoint
 func New(config Config) *Target {
 	h := &Target{
-		logCh:  make(chan interface{}, config.QueueSize),
-		config: config,
-		status: statusOffline,
+		logCh:     make(chan interface{}, config.QueueSize),
+		config:    config,
+		status:    statusOffline,
+		batchSize: config.BatchSize,
 	}

 	// If proxy available, set the same
@@ -353,7 +390,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
 		atomic.AddInt64(&h.failedMessages, 1)
 		return
 	}
-	if err := h.send(context.Background(), logJSON, webhookCallTimeout); err != nil {
+	if err := h.send(context.Background(), logJSON, "application/json", webhookCallTimeout); err != nil {
 		atomic.AddInt64(&h.failedMessages, 1)
 		if xnet.IsNetworkOrHostDown(err, true) {
 			return store.ErrNotConnected
@@ -382,6 +419,11 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
 		return nil
 	}

+	mworkers := maxWorkers
+	if h.batchSize > 100 {
+		mworkers = maxWorkersWithBatchEvents
+	}
+
 retry:
 	select {
 	case h.logCh <- entry:
@@ -394,7 +436,7 @@ retry:
 		return nil
 	default:
 		nWorkers := atomic.LoadInt64(&h.workers)
-		if nWorkers < maxWorkers {
+		if nWorkers < int64(mworkers) {
 			// Only have one try to start at the same time.
 			h.workerStartMu.Lock()
 			if time.Since(h.lastStarted) > time.Second {
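With large batches, each worker already amortizes many events per request, so Send caps concurrency at 4 workers instead of 16 once the configured batch size exceeds 100. The selection reduces to this hypothetical extraction, with the thresholds matching the constants defined earlier in the file:

    // workerCeiling picks the maximum number of concurrent senders for a
    // target: batch sizes over 100 need fewer workers, since each request
    // now carries up to batchSize events.
    func workerCeiling(batchSize int) int64 {
    	const (
    		maxWorkers                = 16 // cap for unbatched targets
    		maxWorkersWithBatchEvents = 4  // cap once batching amortizes requests
    	)
    	if batchSize > 100 {
    		return maxWorkersWithBatchEvents
    	}
    	return maxWorkers
    }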