fix: make sure to use new restClient for healthcheck (#10026)

Without instantiating a new rest client, we can
hit a recursive error that leads to the
healthcheck always returning offline; this can
prematurely take the servers offline.
This commit is contained in:
Harshavardhana 2020-07-11 22:19:38 -07:00 committed by GitHub
parent c2fdf73491
commit 3b9fbf80ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 43 additions and 59 deletions

View File

@ -205,11 +205,7 @@ func newBootstrapRESTClients(endpointZones EndpointZones) []*bootstrapRESTClient
// Only proceed for remote endpoints. // Only proceed for remote endpoints.
if !endpoint.IsLocal { if !endpoint.IsLocal {
clnt, err := newBootstrapRESTClient(endpoint) clnts = append(clnts, newBootstrapRESTClient(endpoint))
if err != nil {
continue
}
clnts = append(clnts, clnt)
} }
} }
} }
@ -217,7 +213,7 @@ func newBootstrapRESTClients(endpointZones EndpointZones) []*bootstrapRESTClient
} }
// Returns a new bootstrap client. // Returns a new bootstrap client.
func newBootstrapRESTClient(endpoint Endpoint) (*bootstrapRESTClient, error) { func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
serverURL := &url.URL{ serverURL := &url.URL{
Scheme: endpoint.Scheme, Scheme: endpoint.Scheme,
Host: endpoint.Host, Host: endpoint.Host,
@ -233,19 +229,17 @@ func newBootstrapRESTClient(endpoint Endpoint) (*bootstrapRESTClient, error) {
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
return nil, err
}
restClient.HealthCheckFn = func() bool { restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
respBody, err := restClient.CallWithContext(ctx, bootstrapRESTMethodHealth, nil, nil, -1) // Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).CallWithContext(ctx, bootstrapRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
cancel() cancel()
var ne *rest.NetworkError var ne *rest.NetworkError
return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
} }
return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}, nil return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}
} }

View File

@ -462,7 +462,7 @@ func azureToObjectError(err error, params ...string) error {
func azureCodesToObjectError(err error, serviceCode string, statusCode int, bucket string, object string) error { func azureCodesToObjectError(err error, serviceCode string, statusCode int, bucket string, object string) error {
switch serviceCode { switch serviceCode {
case "ContainerNotFound": case "ContainerNotFound", "ContainerBeingDeleted":
err = minio.BucketNotFound{Bucket: bucket} err = minio.BucketNotFound{Bucket: bucket}
case "ContainerAlreadyExists": case "ContainerAlreadyExists":
err = minio.BucketExists{Bucket: bucket} err = minio.BucketExists{Bucket: bucket}

View File

@ -26,7 +26,6 @@ import (
"github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/http"
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest" "github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/dsync"
) )
@ -155,13 +154,12 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
logger.Fatal(err, "Unable to create lock rest client")
}
restClient.HealthCheckFn = func() bool { restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
respBody, err := restClient.CallWithContext(ctx, lockRESTMethodHealth, nil, nil, -1) // Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).CallWithContext(ctx, lockRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
cancel() cancel()
var ne *rest.NetworkError var ne *rest.NetworkError

View File

@ -846,19 +846,14 @@ func newPeerRestClients(endpoints EndpointZones) []*peerRESTClient {
peerHosts := getRemoteHosts(endpoints) peerHosts := getRemoteHosts(endpoints)
restClients := make([]*peerRESTClient, len(peerHosts)) restClients := make([]*peerRESTClient, len(peerHosts))
for i, host := range peerHosts { for i, host := range peerHosts {
client, err := newPeerRESTClient(host) restClients[i] = newPeerRESTClient(host)
if err != nil {
logger.LogIf(GlobalContext, err)
continue
}
restClients[i] = client
} }
return restClients return restClients
} }
// Returns a peer rest client. // Returns a peer rest client.
func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) { func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
scheme := "http" scheme := "http"
if globalIsSSL { if globalIsSSL {
scheme = "https" scheme = "https"
@ -879,20 +874,19 @@ func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
return nil, err
}
// Construct a new health function. // Construct a new health function.
restClient.HealthCheckFn = func() bool { restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
respBody, err := restClient.CallWithContext(ctx, peerRESTMethodHealth, nil, nil, -1) // Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).CallWithContext(ctx, peerRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
cancel() cancel()
var ne *rest.NetworkError var ne *rest.NetworkError
return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
} }
return &peerRESTClient{host: peer, restClient: restClient}, nil return &peerRESTClient{host: peer, restClient: restClient}
} }

View File

@ -158,7 +158,7 @@ func (c *Client) Close() {
} }
// NewClient - returns new REST client. // NewClient - returns new REST client.
func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthToken func(aud string) string) (*Client, error) { func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthToken func(aud string) string) *Client {
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig. // except custom DialContext and TLSClientConfig.
tr := newCustomTransport() tr := newCustomTransport()
@ -172,7 +172,7 @@ func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthT
MaxErrResponseSize: 4096, MaxErrResponseSize: 4096,
HealthCheckInterval: 200 * time.Millisecond, HealthCheckInterval: 200 * time.Millisecond,
HealthCheckTimeout: time.Second, HealthCheckTimeout: time.Second,
}, nil }
} }
// IsOnline returns whether the client is likely to be online. // IsOnline returns whether the client is likely to be online.

View File

@ -660,15 +660,13 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
logger.Fatal(err, "Unable to initialize remote REST disks")
}
restClient.HealthCheckInterval = 500 * time.Millisecond restClient.HealthCheckInterval = 500 * time.Millisecond
restClient.HealthCheckFn = func() bool { restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
respBody, err := restClient.CallWithContext(ctx, storageRESTMethodHealth, nil, nil, -1) // Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).CallWithContext(ctx, storageRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody) xhttp.DrainBody(respBody)
cancel() cancel()
return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound

View File

@ -66,6 +66,13 @@ import (
// Tests should initNSLock only once. // Tests should initNSLock only once.
func init() { func init() {
globalActiveCred = auth.Credentials{
AccessKey: auth.DefaultAccessKey,
SecretKey: auth.DefaultSecretKey,
}
globalConfigEncrypted = true
// disable ENVs which interfere with tests. // disable ENVs which interfere with tests.
for _, env := range []string{ for _, env := range []string{
crypto.EnvAutoEncryptionLegacy, crypto.EnvAutoEncryptionLegacy,
@ -463,13 +470,6 @@ func newTestConfig(bucketLocation string, obj ObjectLayer) (err error) {
return err return err
} }
globalActiveCred = auth.Credentials{
AccessKey: auth.DefaultAccessKey,
SecretKey: auth.DefaultSecretKey,
}
globalConfigEncrypted = true
// Set a default region. // Set a default region.
config.SetRegion(globalServerConfig, bucketLocation) config.SetRegion(globalServerConfig, bucketLocation)

View File

@ -19,12 +19,12 @@ package dsync
import ( import (
"context" "context"
"errors" "errors"
golog "log"
"math/rand" "math/rand"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/retry" "github.com/minio/minio/pkg/retry"
) )
@ -37,9 +37,9 @@ func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
func log(msg ...interface{}) { func log(format string, data ...interface{}) {
if dsyncLog { if dsyncLog {
golog.Println(msg...) console.Printf(format, data...)
} }
} }
@ -185,7 +185,7 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa
g := Granted{index: index} g := Granted{index: index}
if c == nil { if c == nil {
log("lock: nil locker") log("dsync: nil locker")
ch <- g ch <- g
return return
} }
@ -200,11 +200,11 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa
var err error var err error
if isReadLock { if isReadLock {
if locked, err = c.RLock(args); err != nil { if locked, err = c.RLock(args); err != nil {
log("Unable to call RLock", err) log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c)
} }
} else { } else {
if locked, err = c.Lock(args); err != nil { if locked, err = c.Lock(args); err != nil {
log("Unable to call Lock", err) log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c)
} }
} }
@ -259,10 +259,10 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa
// timeout happened, maybe one of the nodes is slow, count // timeout happened, maybe one of the nodes is slow, count
// number of locks to check whether we have quorum or not // number of locks to check whether we have quorum or not
if !quorumMet(locks, isReadLock, dquorum, dquorumReads) { if !quorumMet(locks, isReadLock, dquorum, dquorumReads) {
log("Quorum not met after timeout") log("Quorum not met after timeout\n")
releaseAll(ds, locks, isReadLock, restClnts, lockNames...) releaseAll(ds, locks, isReadLock, restClnts, lockNames...)
} else { } else {
log("Quorum met after timeout") log("Quorum met after timeout\n")
} }
} }
@ -402,7 +402,7 @@ func unlock(ds *Dsync, locks []string, isReadLock bool, restClnts []NetLocker, n
// sendRelease sends a release message to a node that previously granted a lock // sendRelease sends a release message to a node that previously granted a lock
func sendRelease(ds *Dsync, c NetLocker, uid string, isReadLock bool, names ...string) { func sendRelease(ds *Dsync, c NetLocker, uid string, isReadLock bool, names ...string) {
if c == nil { if c == nil {
log("Unable to call RUnlock", errors.New("netLocker is offline")) log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline"))
return return
} }
@ -412,11 +412,11 @@ func sendRelease(ds *Dsync, c NetLocker, uid string, isReadLock bool, names ...s
} }
if isReadLock { if isReadLock {
if _, err := c.RUnlock(args); err != nil { if _, err := c.RUnlock(args); err != nil {
log("Unable to call RUnlock", err) log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c)
} }
} else { } else {
if _, err := c.Unlock(args); err != nil { if _, err := c.Unlock(args); err != nil {
log("Unable to call Unlock", err) log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c)
} }
} }
} }