From 1472875670b5d5244402bce2bf0b16c498abf8b5 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 21 Sep 2023 11:24:56 -0700 Subject: [PATCH] fix: failed messages counting in audit_http metrics (#18075) Retries must not each be counted as a failed message: a failed message increments the counter once, not once per retry. This PR fixes that. Also, we do not need to retry 10 times; instead we retry at most 3 times, with some jitter, to deliver the messages. --- internal/logger/target/http/http.go | 47 ++++++++++++++++------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 40e033d86..512cc2d1f 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "math" + "math/rand" "net/http" "net/url" "os" @@ -153,7 +154,7 @@ func (h *Target) Init(ctx context.Context) (err error) { if h.config.QueueDir != "" { return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore) } - return h.initLogChannel(ctx) + return h.init(ctx) } func (h *Target) initQueueStore(ctx context.Context) (err error) { @@ -170,7 +171,7 @@ func (h *Target) initQueueStore(ctx context.Context) (err error) { return } -func (h *Target) initLogChannel(ctx context.Context) (err error) { +func (h *Target) init(ctx context.Context) (err error) { switch atomic.LoadInt32(&h.status) { case statusOnline: return nil @@ -182,8 +183,10 @@ func (h *Target) initLogChannel(ctx context.Context) (err error) { // Start a goroutine that will continue to check if we can reach h.revive.Do(func() { go func() { - t := time.NewTicker(time.Second) + // Avoid thundering herd, add jitter.
+ t := time.NewTicker(time.Second + time.Duration(rand.Int63n(int64(5*time.Second)))) defer t.Stop() + for range t.C { if atomic.LoadInt32(&h.status) != statusOffline { return @@ -261,27 +264,29 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) { return } + const maxTries = 3 tries := 0 - for { - if tries > 0 { - if tries >= 10 || atomic.LoadInt32(&h.status) == statusClosed { - // Don't retry when closing... - return - } - // sleep = (tries+2) ^ 2 milliseconds. - sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond - if sleep > time.Second { - sleep = time.Second - } - time.Sleep(sleep) - } - tries++ - if err := h.send(ctx, logJSON, webhookCallTimeout); err != nil { - h.config.LogOnce(ctx, err, h.Endpoint()) - atomic.AddInt64(&h.failedMessages, 1) - } else { + for tries < maxTries { + if atomic.LoadInt32(&h.status) == statusClosed { + // Don't retry when closing... return } + // sleep = (tries+2) ^ 2 milliseconds. + sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond + if sleep > time.Second { + sleep = time.Second + } + time.Sleep(sleep) + tries++ + err := h.send(ctx, logJSON, webhookCallTimeout) + if err == nil { + return + } + h.config.LogOnce(ctx, err, h.Endpoint()) + } + if tries == maxTries { + // Even with multiple retries, count failed messages as only one. + atomic.AddInt64(&h.failedMessages, 1) } }