mirror of
synced 2025-03-10 11:40:08 -04:00
Configure a batch size so that audit/logger events are sent in batches instead of one event per connection. This is mainly to reduce the number of requests made to the webhook endpoint.
487 lines
13 KiB
487 lines
13 KiB
// Copyright (c) 2015-2024 MinIO, Inc.
// This file is part of MinIO Object Storage stack
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
jsoniter "github.com/json-iterator/go"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
xnet "github.com/minio/pkg/v2/net"
// Tunables for the HTTP logger target.
const (
// Timeout for the webhook http call
webhookCallTimeout = 3 * time.Second
// maxWorkers is the maximum number of concurrent http loggers
maxWorkers = 16
// maxWorkersWithBatchEvents is the maximum number of concurrent batch http
// loggers; Send uses this cap instead of maxWorkers when the configured
// batch size is greater than 100.
maxWorkersWithBatchEvents = 4
// the suffix for the configured queue dir where the logs will be persisted.
// It is passed to store.NewQueueStore when the queue store is created.
httpLoggerExtension = ".http.log"
// Values stored in Target.status, which is read and written with sync/atomic.
// NOTE(review): statusOnline and statusClosed are used by the methods in this
// file but their declarations are not visible in this listing — presumably
// they follow statusOffline in this iota block; confirm against the full file.
const (
// statusOffline is the state New starts a target in and the state send
// records after any failed request.
statusOffline = iota
// Config http logger target
//
// Config carries the settings from which New builds a Target.
type Config struct {
// Enabled is not read by any code visible in this listing — presumably it is
// consulted by the caller that decides whether to create the target; verify.
Enabled bool `json:"enabled"`
// Name is appended to "minio-http-" to form the target name.
Name string `json:"name"`
// UserAgent is sent verbatim as the User-Agent header of every request.
UserAgent string `json:"userAgent"`
// Endpoint is the URL that receives the POST requests.
Endpoint *xnet.URL `json:"endpoint"`
// AuthToken, when non-empty, is sent verbatim as the Authorization header.
AuthToken string `json:"authToken"`
// ClientCert and ClientKey are not read by any code visible in this listing
// — presumably they are used elsewhere to build Transport; verify.
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
// BatchSize is the number of events a worker accumulates before one HTTP
// send; values <= 0 are treated as 1 by the worker.
BatchSize int `json:"batchSize"`
// QueueSize is the capacity of the in-memory log channel and also the limit
// passed to the on-disk queue store when QueueDir is set.
QueueSize int `json:"queueSize"`
// QueueDir, when non-empty, makes Init set up an on-disk queue store in a
// sub-directory named after the target instead of probing the endpoint.
QueueDir string `json:"queueDir"`
// Proxy, when non-empty, is parsed as a URL by New and installed as the proxy
// of a clone of Transport.
// NOTE(review): the JSON key is "string" rather than "proxy" — this looks
// like a typo, but renaming it changes the serialized form; confirm before
// changing.
Proxy string `json:"string"`
// Transport is the round tripper used by the target's http.Client.
Transport http.RoundTripper `json:"-"`
// Custom logger
// LogOnce is called with delivery errors and the endpoint as id; it is also
// handed to store.StreamItems when the queue store is in use.
LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
// Target implements logger.Target and sends the json
// format of a log entry to the configured http endpoint.
// An internal buffer of logs is maintained but when the
// buffer is full, new logs are just ignored and an error
// is returned to the caller.
type Target struct {
// totalMessages and failedMessages are the delivery counters reported by
// Stats; status holds one of the status* constants. All three are accessed
// with sync/atomic.
totalMessages int64
failedMessages int64
status int32
// Worker control
// workers is the number of running startHTTPLogger goroutines (atomic).
workers int64
// NOTE(review): workerStartMu is not used by any code visible in this
// listing — presumably it is meant to guard lastStarted; confirm.
workerStartMu sync.Mutex
// lastStarted is when a worker was last launched; Send uses it to start at
// most one additional worker per second.
lastStarted time.Time
// wg is marked Done by every worker that was started with a non-nil logCh.
wg sync.WaitGroup
// Channel of log entries.
// Reading logCh must hold read lock on logChMu (to avoid read race)
// Sending a value on logCh must hold read lock on logChMu (to avoid closing)
logCh chan interface{}
logChMu sync.RWMutex
// Number of events per HTTP send to webhook target
// this is ideally useful only if your endpoint can
// support reading multiple events on a stream for example
// like : Splunk HTTP Event collector, if you are unsure
// set this to '1'.
batchSize int
// If the first init fails, this starts a goroutine that
// will attempt to establish the connection.
revive sync.Once
// store to persist and replay the logs to the target
// to avoid missing events when the target is down.
// It is nil unless a queue directory is configured; when set, Send writes
// entries to it instead of to logCh.
store store.Store[interface{}]
// storeCtxCancel cancels the context whose Done channel is passed to
// store.StreamItems.
storeCtxCancel context.CancelFunc
// initQueueStoreOnce is used by Init to run initQueueStore.
initQueueStoreOnce once.Init
// config is the configuration given to New; New may replace its Transport
// with a proxy-enabled clone.
config Config
// client performs the webhook requests using config.Transport.
client *http.Client
// Name returns the name of the target: the configured Config.Name prefixed
// with "minio-http-". The same name is used for the queue store directory.
func (h *Target) Name() string {
return "minio-http-" + h.config.Name
// Endpoint returns the backend endpoint, i.e. the string form of the
// configured Config.Endpoint URL. It is used both as the request URL and as
// the identifier in error messages.
func (h *Target) Endpoint() string {
return h.config.Endpoint.String()
// String returns the configured target name as-is, without the "minio-http-"
// prefix that Name adds.
func (h *Target) String() string {
return h.config.Name
// IsOnline returns true if the target is reachable using a cached value.
// The cached value is the status last stored atomically (send records it
// after every request); no request is made here and ctx is not used.
func (h *Target) IsOnline(ctx context.Context) bool {
return atomic.LoadInt32(&h.status) == statusOnline
// ping returns true if the target is reachable.
//
// It POSTs an empty JSON object ("{}") to the endpoint with the standard
// webhook timeout. If that send fails, the target still counts as reachable
// unless the error is classified as network/host down or connection refused;
// in that error path no worker is started. If the send succeeds, the worker
// start time is recorded and one startHTTPLogger goroutine is launched.
func (h *Target) ping(ctx context.Context) bool {
if err := h.send(ctx, []byte(`{}`), "application/json", webhookCallTimeout); err != nil {
return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err)
// We are online.
// NOTE(review): lastStarted is written here (and in Send) with no lock
// visible in this listing, although Target declares workerStartMu — confirm
// whether lock/unlock calls are missing.
h.lastStarted = time.Now()
go h.startHTTPLogger(ctx)
return true
// Stats returns the target statistics: the total and failed message counters
// (loaded atomically) and the number of entries currently buffered in logCh.
// NOTE(review): logCh is read here without logChMu in the visible code,
// whereas the Target field comments require the read lock for reads —
// confirm whether the lock calls are missing from this listing.
func (h *Target) Stats() types.TargetStats {
queueLength := len(h.logCh)
stats := types.TargetStats{
TotalMessages: atomic.LoadInt64(&h.totalMessages),
FailedMessages: atomic.LoadInt64(&h.failedMessages),
QueueLength: queueLength,
return stats
// Init validates and initializes the http target.
//
// When a queue directory is configured, initialization is delegated to
// initQueueStore through initQueueStoreOnce and the endpoint is not probed
// here; otherwise init probes the endpoint directly.
func (h *Target) Init(ctx context.Context) (err error) {
if h.config.QueueDir != "" {
return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore)
return h.init(ctx)
// initQueueStore creates and opens the on-disk queue store in the directory
// <QueueDir>/<Name()>, limited to QueueSize entries and using the
// httpLoggerExtension file suffix, then hands it to store.StreamItems with
// this target as the receiver.
//
// It returns a wrapped error if the store cannot be opened. The context
// passed to StreamItems (as a Done channel) is a cancellable child of ctx
// whose cancel func is kept in h.storeCtxCancel.
// NOTE(review): StreamItems presumably replays stored entries via
// SendFromStore until the Done channel closes — confirm in the store package.
func (h *Target) initQueueStore(ctx context.Context) (err error) {
var queueStore store.Store[interface{}]
queueDir := filepath.Join(h.config.QueueDir, h.Name())
queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.config.QueueSize), httpLoggerExtension)
if err = queueStore.Open(); err != nil {
return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
ctx, cancel := context.WithCancel(ctx)
h.store = queueStore
h.storeCtxCancel = cancel
store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnce)
// init brings the target online when no queue store is used.
//
// It is a no-op when the target is already online and fails with
// "target is closed" once the target has been cancelled. Otherwise it pings
// the endpoint; if the ping reports the endpoint unreachable, a single
// background goroutine (guarded by h.revive) keeps re-pinging on a ticker.
// NOTE(review): the exit conditions of that goroutine's loop are not fully
// visible in this listing (no return statements follow the two ifs) —
// confirm against the complete file.
func (h *Target) init(ctx context.Context) (err error) {
switch atomic.LoadInt32(&h.status) {
case statusOnline:
return nil
case statusClosed:
return errors.New("target is closed")
if !h.ping(ctx) {
// Start a goroutine that will continue to check if we can reach
// the endpoint. revive ensures this goroutine is started at most once.
h.revive.Do(func() {
go func() {
// Avoid a stampeding herd, add jitter: the tick period is one second plus
// a random extra of up to five seconds, chosen once when the ticker is made.
t := time.NewTicker(time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
defer t.Stop()
for range t.C {
if atomic.LoadInt32(&h.status) != statusOffline {
if h.ping(ctx) {
return err
return nil
// send POSTs payload to the configured endpoint and reports whether the
// endpoint accepted it.
//
// payloadType, when non-empty, is sent as the Content-Type header. timeout
// bounds the whole request through a context derived from ctx. The MinIO
// version, deployment ID and User-Agent headers are always set; the
// Authorization header is set only when an auth token is configured.
//
// A nil error is returned only for 200, 201, 202 and 204 responses. Request
// construction failures, transport failures and every other status produce
// an error; 403 gets a hint about the auth token.
//
// Side effect: every call stores the outcome in h.status — online on
// success, offline on any error.
// NOTE(review): that store is unconditional, so a send finishing after
// Cancel would overwrite statusClosed — confirm this is intended.
func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) {
// Runs on return and inspects the named result err.
defer func() {
if err != nil {
atomic.StoreInt32(&h.status, statusOffline)
} else {
atomic.StoreInt32(&h.status, statusOnline)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.Endpoint(), bytes.NewReader(payload))
if err != nil {
return fmt.Errorf("invalid configuration for '%s'; %v", h.Endpoint(), err)
if payloadType != "" {
req.Header.Set(xhttp.ContentType, payloadType)
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
resp, err := h.client.Do(req)
if err != nil {
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err)
// Drain any response.
// NOTE(review): no call that drains or closes resp.Body is visible after
// this comment in this listing. If it is really absent, the connection
// cannot be reused and the body leaks — confirm against the complete file.
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent:
// accepted HTTP status codes.
return nil
case http.StatusForbidden:
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status)
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status)
// logEntry delivers payload to the endpoint via send, bounded by maxTries
// attempts, and checks for a closed target before each attempt.
//
// A back-off of (tries+2)^2 milliseconds, capped at one second, is computed
// per attempt. Each failed attempt is reported through config.LogOnce with
// the endpoint as id. If the attempt budget is exhausted, failedMessages is
// incremented once for the whole payload.
// NOTE(review): in this listing no statement sleeps for `sleep`, increments
// `tries`, or returns on success or on a closed target — those lines appear
// to be missing; confirm against the complete file before relying on the
// loop terminating as described.
func (h *Target) logEntry(ctx context.Context, payload []byte, payloadType string) {
const maxTries = 3
tries := 0
for tries < maxTries {
if atomic.LoadInt32(&h.status) == statusClosed {
// Don't retry when closing...
// sleep = (tries+2) ^ 2 milliseconds.
sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond
if sleep > time.Second {
sleep = time.Second
err := h.send(ctx, payload, payloadType, webhookCallTimeout)
if err == nil {
h.config.LogOnce(ctx, err, h.Endpoint())
if tries == maxTries {
// Even with multiple retries, count failed messages as only one.
atomic.AddInt64(&h.failedMessages, 1)
// startHTTPLogger is the body of one worker goroutine: it drains logCh,
// JSON-encodes each entry into a pooled buffer and ships the buffer through
// logEntry.
//
// The worker counts itself in h.workers for its lifetime. A buffer is sent
// when it is non-empty and either the batch is full or the channel is
// momentarily empty. With a batch size above 1 the Content-Type is left
// empty; otherwise it is "application/json". A batch size <= 0 is treated
// as 1. The loop ends when logCh is closed.
// NOTE(review): no statement that increments nevents, resets buf after a
// send, or registers with h.wg before the deferred Done is visible in this
// listing — confirm against the complete file.
func (h *Target) startHTTPLogger(ctx context.Context) {
atomic.AddInt64(&h.workers, 1)
defer atomic.AddInt64(&h.workers, -1)
// Snapshot the channel once; the field is set to nil when the target closes.
logCh := h.logCh
if logCh != nil {
// We are not allowed to add when logCh is nil
defer h.wg.Done()
if logCh == nil {
buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf)
json := jsoniter.ConfigCompatibleWithStandardLibrary
enc := json.NewEncoder(buf)
batchSize := h.batchSize
if batchSize <= 0 {
batchSize = 1
payloadType := "application/json"
if batchSize > 1 {
payloadType = ""
var nevents int
// Send messages until channel is closed.
for entry := range logCh {
// NOTE(review): Send also increments totalMessages when it queues an entry,
// so each delivered entry appears to be counted twice — confirm.
atomic.AddInt64(&h.totalMessages, 1)
if err := enc.Encode(&entry); err != nil {
atomic.AddInt64(&h.failedMessages, 1)
if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 {
h.logEntry(ctx, buf.Bytes(), payloadType)
nevents = 0
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(config Config) *Target {
h := &Target{
logCh: make(chan interface{}, config.QueueSize),
config: config,
status: statusOffline,
batchSize: config.BatchSize,
// If proxy available, set the same
if h.config.Proxy != "" {
proxyURL, _ := url.Parse(h.config.Proxy)
transport := h.config.Transport
ctransport := transport.(*http.Transport).Clone()
ctransport.Proxy = http.ProxyURL(proxyURL)
h.config.Transport = ctransport
h.client = &http.Client{Transport: h.config.Transport}
return h
// SendFromStore - reads the log from store and sends it to webhook.
//
// The entry stored under key.Name is loaded, marshalled to JSON and sent
// with the standard webhook timeout using a background context; once sent it
// is deleted from the store. A key that no longer exists is treated as
// success. A send failure classified as network/host down is reported as
// store.ErrNotConnected; any other send failure is returned as-is. Both
// marshalling and send failures increment failedMessages.
// NOTE(review): no return is visible after the Marshal failure branch in
// this listing — confirm against the complete file.
func (h *Target) SendFromStore(key store.Key) (err error) {
var eventData interface{}
eventData, err = h.store.Get(key.Name)
if err != nil {
if os.IsNotExist(err) {
return nil
return err
atomic.AddInt64(&h.totalMessages, 1)
logJSON, err := json.Marshal(&eventData)
if err != nil {
atomic.AddInt64(&h.failedMessages, 1)
if err := h.send(context.Background(), logJSON, "application/json", webhookCallTimeout); err != nil {
atomic.AddInt64(&h.failedMessages, 1)
if xnet.IsNetworkOrHostDown(err, true) {
return store.ErrNotConnected
return err
// Delete the event from store.
return h.store.Del(key.Name)
// Send the log message 'entry' to the http target.
// Messages are queued in the disk if the store is enabled
// If Cancel has been called the message is ignored.
//
// Without a store the entry is offered to logCh. If ctx ends first, an error
// is returned only when the cause is a deadline; plain cancellation returns
// nil. When the entry cannot be queued and fewer workers are running than
// the cap (maxWorkers, or maxWorkersWithBatchEvents for batch sizes above
// 100), an extra worker is started — at most one per second — and the send is
// retried. Otherwise the message is counted as failed and "log buffer full"
// is returned.
// NOTE(review): the `retry:` label, the select's `default:` case and the
// RLock matching the deferred RUnlock are not visible in this listing —
// confirm against the complete file.
func (h *Target) Send(ctx context.Context, entry interface{}) error {
if atomic.LoadInt32(&h.status) == statusClosed {
return nil
if h.store != nil {
// save the entry to the queue store which will be replayed to the target.
return h.store.Put(entry)
defer h.logChMu.RUnlock()
if h.logCh == nil {
// We are closing...
return nil
// Pick the worker cap: large batches use the smaller pool.
mworkers := maxWorkers
if h.batchSize > 100 {
mworkers = maxWorkersWithBatchEvents
select {
case h.logCh <- entry:
atomic.AddInt64(&h.totalMessages, 1)
case <-ctx.Done():
// return error only for context timedout.
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return ctx.Err()
return nil
nWorkers := atomic.LoadInt64(&h.workers)
if nWorkers < int64(mworkers) {
// Only have one try to start at the same time.
if time.Since(h.lastStarted) > time.Second {
h.lastStarted = time.Now()
go h.startHTTPLogger(ctx)
goto retry
// Buffer is full and the worker cap is reached: count the drop.
atomic.AddInt64(&h.totalMessages, 1)
atomic.AddInt64(&h.failedMessages, 1)
return errors.New("log buffer full")
return nil
// Cancel - cancels the target.
// All queued messages are flushed and the function returns afterwards.
// All messages sent to the target after this function has been called will be dropped.
//
// The status is set to closed first, so Send and init stop accepting work.
// NOTE(review): the calls that cancel the store context, lock logChMu, close
// the channel and wait on the worker WaitGroup are not visible in this
// listing; only the comments describing them remain — confirm against the
// complete file.
func (h *Target) Cancel() {
atomic.StoreInt32(&h.status, statusClosed)
// If queuestore is configured, cancel its context to
// stop the replay go-routine.
if h.store != nil {
// Set logch to nil and close it.
// This will block all Send operations,
// and finish the existing ones.
// All future ones will be discarded.
h.logCh = nil
// Wait for messages to be sent...
// Type - returns type of the target, which is always types.TargetHTTP.
func (h *Target) Type() types.TargetType {
return types.TargetHTTP