Use exponential backoff algorithm for internode reconnections (#17052)

Anis Eleuch 2023-05-02 20:35:52 +01:00 committed by GitHub
parent 1704abaf6b
commit 4640b13c66


@@ -75,9 +75,9 @@ type Client struct {
 	// is online or offline.
 	HealthCheckFn func() bool
-	// HealthCheckInterval will be the duration between re-connection attempts
-	// when a call has failed with a network error.
-	HealthCheckInterval time.Duration
+	// HealthCheckReconnectUnit will be used to calculate the exponential
+	// backoff when trying to reconnect to an offline node
+	HealthCheckReconnectUnit time.Duration
 	// HealthCheckTimeout determines timeout for each call.
 	HealthCheckTimeout time.Duration
@@ -309,14 +309,14 @@ func NewClient(url *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
 	// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
 	// except custom DialContext and TLSClientConfig.
 	return &Client{
-		httpClient:          &http.Client{Transport: tr},
-		url:                 url,
-		newAuthToken:        newAuthToken,
-		connected:           online,
-		lastConn:            time.Now().UnixNano(),
-		MaxErrResponseSize:  4096,
-		HealthCheckInterval: 200 * time.Millisecond,
-		HealthCheckTimeout:  time.Second,
+		httpClient:               &http.Client{Transport: tr},
+		url:                      url,
+		newAuthToken:             newAuthToken,
+		connected:                online,
+		lastConn:                 time.Now().UnixNano(),
+		MaxErrResponseSize:       4096,
+		HealthCheckReconnectUnit: 200 * time.Millisecond,
+		HealthCheckTimeout:       time.Second,
 	}
 }
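As a usage sketch (not part of this commit), the call-site shape for the constructor above: NewClient fills in the renamed HealthCheckReconnectUnit default of 200ms alongside the 1s HealthCheckTimeout. The import path, the peer URL, and the nil auth-token callback below are placeholders for illustration only.

package main

import (
	"net/http"
	"net/url"

	"example.com/minio/internal/rest" // placeholder import path for the package changed in this diff
)

func main() {
	u, _ := url.Parse("http://peer-node:9000") // illustrative peer address

	// The returned *Client starts with HealthCheckReconnectUnit = 200ms and
	// HealthCheckTimeout = 1s, per the hunk above; both fields are exported
	// and can be adjusted before the client is used.
	c := rest.NewClient(u, http.DefaultTransport, nil)
	_ = c
}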
@@ -337,6 +337,28 @@ func (c *Client) LastError() error {
 	return c.lastErr
 }
 
+// exponentialBackoffWait computes the exponential backoff duration according to
+// https://www.awsarchitectureblog.com/2015/03/backoff.html
+func exponentialBackoffWait(r *rand.Rand, unit, cap time.Duration) func(uint) time.Duration {
+	if unit > time.Hour {
+		// Protect against integer overflow
+		panic("unit cannot exceed one hour")
+	}
+	return func(attempt uint) time.Duration {
+		if attempt > 16 {
+			// Protect against integer overflow
+			attempt = 16
+		}
+		// sleep = random_between(unit, min(cap, unit * 2 ** attempt))
+		sleep := unit * time.Duration(1<<attempt)
+		if sleep > cap {
+			sleep = cap
+		}
+		sleep -= time.Duration(r.Float64() * float64(sleep-unit))
+		return sleep
+	}
+}
+
 // MarkOffline - will mark a client as being offline and spawns
 // a goroutine that will attempt to reconnect if HealthCheckFn is set.
 // returns true if the node changed state from online to offline
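To make the growth pattern concrete, here is an illustrative helper that is not part of the commit. It assumes it sits in the same package as the diff (the package name is not shown in this excerpt, so "rest" is an assumption), and it simply calls the new function with the same 200ms unit and 30s cap that MarkOffline passes below.

package rest // assumed package name; not shown in this excerpt

import (
	"fmt"
	"math/rand"
	"time"
)

// printBackoffSamples prints one sampled wait per attempt; illustration only,
// reusing the unexported helper added in the hunk above.
func printBackoffSamples() {
	backOff := exponentialBackoffWait(
		rand.New(rand.NewSource(time.Now().UnixNano())),
		200*time.Millisecond,
		30*time.Second,
	)
	for attempt := uint(1); attempt <= 10; attempt++ {
		fmt.Printf("attempt %2d: sleep %v\n", attempt, backOff(attempt))
	}
}

Attempts 1 through 7 roughly double the upper bound (400ms, 800ms, ... 25.6s); from attempt 8 onward the bound stays pinned at the 30-second cap, and the random subtraction keeps a fleet of reconnecting nodes from retrying in lockstep.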
@@ -347,8 +369,14 @@ func (c *Client) MarkOffline(err error) bool {
 	// Start goroutine that will attempt to reconnect.
 	// If server is already trying to reconnect this will have no effect.
 	if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
-		r := rand.New(rand.NewSource(time.Now().UnixNano()))
 		go func() {
+			backOff := exponentialBackoffWait(
+				rand.New(rand.NewSource(time.Now().UnixNano())),
+				200*time.Millisecond,
+				30*time.Second,
+			)
+			attempt := uint(0)
 			for {
 				if atomic.LoadInt32(&c.connected) == closed {
 					return
@@ -362,7 +390,8 @@
 					}
 					return
 				}
-				time.Sleep(time.Duration(r.Float64() * float64(c.HealthCheckInterval)))
+				attempt++
+				time.Sleep(backOff(attempt))
 			}
 		}()
 		return true
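Finally, a hedged test sketch for the invariant the reconnect loop above relies on: every computed wait stays within [unit, cap] no matter how large the attempt counter grows. It assumes the test file lives in the same package as the diff (again, "rest" is an assumption), since exponentialBackoffWait is unexported; the test name is invented for this example.

package rest // assumed package name; white-box test alongside the code above

import (
	"math/rand"
	"testing"
	"time"
)

// TestExponentialBackoffBounds verifies that every wait produced by the
// backoff closure stays between the unit and the cap.
func TestExponentialBackoffBounds(t *testing.T) {
	unit, cap := 200*time.Millisecond, 30*time.Second
	backOff := exponentialBackoffWait(rand.New(rand.NewSource(1)), unit, cap)

	for attempt := uint(0); attempt <= 64; attempt++ {
		if d := backOff(attempt); d < unit || d > cap {
			t.Fatalf("attempt %d: wait %v outside [%v, %v]", attempt, d, unit, cap)
		}
	}
}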