diff --git a/internal/http/headers.go b/internal/http/headers.go index 9b8054e80..d8968910f 100644 --- a/internal/http/headers.go +++ b/internal/http/headers.go @@ -259,5 +259,6 @@ const ( // http headers sent to webhook targets const ( // Reports the version of MinIO server - MinIOVersion = "x-minio-version" + MinIOVersion = "x-minio-version" + WebhookEventPayloadCount = "x-minio-webhook-payload-count" ) diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index e27e1f2d8..8e2a1e2f1 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -26,6 +26,8 @@ import ( "net/url" "os" "path/filepath" + "strconv" + "strings" "sync" "sync/atomic" "time" @@ -42,7 +44,7 @@ import ( const ( // Timeout for the webhook http call - webhookCallTimeout = 3 * time.Second + webhookCallTimeout = 5 * time.Second // maxWorkers is the maximum number of concurrent http loggers maxWorkers = 16 @@ -219,17 +221,23 @@ func (h *Target) initMemoryStore(ctx context.Context) (err error) { return nil } -func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) { +func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, payloadType string, timeout time.Duration) (err error) { + h.failedMessages.Add(int64(payloadCount)) + defer func() { if err != nil { - h.status.Store(statusOffline) + if xnet.IsNetworkOrHostDown(err, false) { + h.status.Store(statusOffline) + } } else { + h.failedMessages.Add(-int64(payloadCount)) h.status.Store(statusOnline) } }() ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.Endpoint(), bytes.NewReader(payload)) if err != nil { @@ -238,6 +246,7 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadType string, t if payloadType != "" { req.Header.Set(xhttp.ContentType, payloadType) } + req.Header.Set(xhttp.WebhookEventPayloadCount, strconv.Itoa(payloadCount)) req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) @@ -353,6 +362,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { return } + var count int if !isTick { h.totalMessages.Add(1) @@ -366,13 +376,15 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { h.failedMessages.Add(1) continue } + count++ + } else { + entries = append(entries, entry) + count++ } - - entries = append(entries, entry) } - if len(entries) != h.batchSize { - if len(h.logCh) > 0 || len(globalBuffer) > 0 || len(entries) == 0 { + if count != h.batchSize { + if len(h.logCh) > 0 || len(globalBuffer) > 0 || count == 0 { continue } @@ -406,16 +418,15 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { } if !isDirQueue { - err = h.send(ctx, buf.Bytes(), h.payloadType, webhookCallTimeout) + err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout) } else { err = h.store.PutMultiple(entries) } if err != nil { - h.config.LogOnceIf( context.Background(), - fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", name, err), + fmt.Errorf("unable to send webhook log entry(s) to '%s' err '%w': %d", name, err, count), name, ) @@ -428,7 +439,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { } entries = make([]interface{}, 0) - + count = 0 if !isDirQueue { buf.Reset() } @@ -529,14 +540,16 @@ func (h *Target) SendFromStore(key store.Key) (err error) { return err } - h.failedMessages.Add(1) - defer func() { - if err == nil { - h.failedMessages.Add(-1) + count := 1 + v := strings.Split(key.Name, ":") + if len(v) == 2 { + count, err = strconv.Atoi(v[0]) + if err != nil { + return err } - }() + } - if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil { + if err := h.send(context.Background(), eventData, count, h.payloadType, webhookCallTimeout); err != nil { return err } diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go index 5f5e3f129..002202f49 100644 --- a/internal/store/queuestore.go +++ b/internal/store/queuestore.go @@ -20,6 +20,7 @@ package store import ( "encoding/json" "errors" + "fmt" "os" "path/filepath" "sort" @@ -107,17 +108,18 @@ func (store *QueueStore[_]) Delete() error { // PutMultiple - puts an item to the store. func (store *QueueStore[I]) PutMultiple(item []I) error { - store.Lock() - defer store.Unlock() - if uint64(len(store.entries)) >= store.entryLimit { - return errLimitExceeded - } // Generate a new UUID for the key. key, err := uuid.NewRandom() if err != nil { return err } - return store.multiWrite(key.String(), item) + + store.Lock() + defer store.Unlock() + if uint64(len(store.entries)) >= store.entryLimit { + return errLimitExceeded + } + return store.multiWrite(fmt.Sprintf("%d:%s", len(item), key.String()), item) } // multiWrite - writes an item to the directory.