fix: retries and failed message counter (#20401)

This commit is contained in:
Sveinn 2024-09-08 00:13:57 +00:00 committed by GitHub
parent 9d5cdaa2e3
commit 3f39da48ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -222,15 +222,12 @@ func (h *Target) initMemoryStore(ctx context.Context) (err error) {
} }
func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, payloadType string, timeout time.Duration) (err error) { func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, payloadType string, timeout time.Duration) (err error) {
h.failedMessages.Add(int64(payloadCount))
defer func() { defer func() {
if err != nil { if err != nil {
if xnet.IsNetworkOrHostDown(err, false) { if xnet.IsNetworkOrHostDown(err, false) {
h.status.Store(statusOffline) h.status.Store(statusOffline)
} }
} else { } else {
h.failedMessages.Add(-int64(payloadCount))
h.status.Store(statusOnline) h.status.Store(statusOnline)
} }
}() }()
@ -260,6 +257,7 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
resp, err := h.client.Do(req) resp, err := h.client.Do(req)
if err != nil { if err != nil {
h.failedMessages.Add(int64(payloadCount))
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err) return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err)
} }
@ -270,8 +268,10 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadCount int, pay
// accepted HTTP status codes. // accepted HTTP status codes.
return nil return nil
} else if resp.StatusCode == http.StatusForbidden { } else if resp.StatusCode == http.StatusForbidden {
h.failedMessages.Add(int64(payloadCount))
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status) return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status)
} }
h.failedMessages.Add(int64(payloadCount))
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status) return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status)
} }
@ -345,6 +345,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
newTicker := time.NewTicker(time.Second) newTicker := time.NewTicker(time.Second)
isTick := false isTick := false
var count int
for { for {
isTick = false isTick = false
@ -360,7 +361,6 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
return return
} }
var count int
if !isTick { if !isTick {
h.totalMessages.Add(1) h.totalMessages.Add(1)