add more details on the payload sent to webhook audit (#20335)

This commit is contained in:
Harshavardhana 2024-08-28 08:31:56 -07:00 committed by GitHub
parent fb2360ff88
commit c65e67c357
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 40 additions and 24 deletions

View File

@ -259,5 +259,6 @@ const (
// http headers sent to webhook targets // http headers sent to webhook targets
const ( const (
// Reports the version of MinIO server // Reports the version of MinIO server
MinIOVersion = "x-minio-version" MinIOVersion = "x-minio-version"
WebhookEventPayloadCount = "x-minio-webhook-payload-count"
) )

View File

@ -26,6 +26,8 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -42,7 +44,7 @@ import (
const ( const (
// Timeout for the webhook http call // Timeout for the webhook http call
webhookCallTimeout = 3 * time.Second webhookCallTimeout = 5 * time.Second
// maxWorkers is the maximum number of concurrent http loggers // maxWorkers is the maximum number of concurrent http loggers
maxWorkers = 16 maxWorkers = 16
@ -219,17 +221,23 @@ func (h *Target) initMemoryStore(ctx context.Context) (err error) {
return nil return nil
} }
func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) { func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, payloadType string, timeout time.Duration) (err error) {
h.failedMessages.Add(int64(payloadCount))
defer func() { defer func() {
if err != nil { if err != nil {
h.status.Store(statusOffline) if xnet.IsNetworkOrHostDown(err, false) {
h.status.Store(statusOffline)
}
} else { } else {
h.failedMessages.Add(-int64(payloadCount))
h.status.Store(statusOnline) h.status.Store(statusOnline)
} }
}() }()
ctx, cancel := context.WithTimeout(ctx, timeout) ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel() defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.Endpoint(), bytes.NewReader(payload)) h.Endpoint(), bytes.NewReader(payload))
if err != nil { if err != nil {
@ -238,6 +246,7 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadType string, t
if payloadType != "" { if payloadType != "" {
req.Header.Set(xhttp.ContentType, payloadType) req.Header.Set(xhttp.ContentType, payloadType)
} }
req.Header.Set(xhttp.WebhookEventPayloadCount, strconv.Itoa(payloadCount))
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
@ -353,6 +362,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
return return
} }
var count int
if !isTick { if !isTick {
h.totalMessages.Add(1) h.totalMessages.Add(1)
@ -366,13 +376,15 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
h.failedMessages.Add(1) h.failedMessages.Add(1)
continue continue
} }
count++
} else {
entries = append(entries, entry)
count++
} }
entries = append(entries, entry)
} }
if len(entries) != h.batchSize { if count != h.batchSize {
if len(h.logCh) > 0 || len(globalBuffer) > 0 || len(entries) == 0 { if len(h.logCh) > 0 || len(globalBuffer) > 0 || count == 0 {
continue continue
} }
@ -406,16 +418,15 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
} }
if !isDirQueue { if !isDirQueue {
err = h.send(ctx, buf.Bytes(), h.payloadType, webhookCallTimeout) err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout)
} else { } else {
err = h.store.PutMultiple(entries) err = h.store.PutMultiple(entries)
} }
if err != nil { if err != nil {
h.config.LogOnceIf( h.config.LogOnceIf(
context.Background(), context.Background(),
fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", name, err), fmt.Errorf("unable to send webhook log entry(s) to '%s' err '%w': %d", name, err, count),
name, name,
) )
@ -428,7 +439,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
} }
entries = make([]interface{}, 0) entries = make([]interface{}, 0)
count = 0
if !isDirQueue { if !isDirQueue {
buf.Reset() buf.Reset()
} }
@ -529,14 +540,16 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
return err return err
} }
h.failedMessages.Add(1) count := 1
defer func() { v := strings.Split(key.Name, ":")
if err == nil { if len(v) == 2 {
h.failedMessages.Add(-1) count, err = strconv.Atoi(v[0])
if err != nil {
return err
} }
}() }
if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil { if err := h.send(context.Background(), eventData, count, h.payloadType, webhookCallTimeout); err != nil {
return err return err
} }

View File

@ -20,6 +20,7 @@ package store
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@ -107,17 +108,18 @@ func (store *QueueStore[_]) Delete() error {
// PutMultiple - puts an item to the store. // PutMultiple - puts an item to the store.
func (store *QueueStore[I]) PutMultiple(item []I) error { func (store *QueueStore[I]) PutMultiple(item []I) error {
store.Lock()
defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit {
return errLimitExceeded
}
// Generate a new UUID for the key. // Generate a new UUID for the key.
key, err := uuid.NewRandom() key, err := uuid.NewRandom()
if err != nil { if err != nil {
return err return err
} }
return store.multiWrite(key.String(), item)
store.Lock()
defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit {
return errLimitExceeded
}
return store.multiWrite(fmt.Sprintf("%d:%s", len(item), key.String()), item)
} }
// multiWrite - writes an item to the directory. // multiWrite - writes an item to the directory.