mirror of
				https://github.com/minio/minio.git
				synced 2025-10-29 15:55:00 -04:00 
			
		
		
		
	Add a custom healthcheck function for online status (#9858)
- Add changes to ensure remote disks are not incorrectly taken online if their order has changed or they are the wrong disks. - Bring changes to peer to detect disconnection with a separate Health handler, to avoid a rather expensive call to GetLocalDiskIDs(). - Follow up with the same changes for Lockers as well.
This commit is contained in:
		
							parent
							
								
									16d7b90adf
								
							
						
					
					
						commit
						7ed1077879
					
				| @ -401,7 +401,7 @@ func errorResponseHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 		writeErrorResponseString(r.Context(), w, APIError{ | ||||
| 			Code:           "XMinioPeerVersionMismatch", | ||||
| 			Description:    desc, | ||||
| 			HTTPStatusCode: http.StatusBadRequest, | ||||
| 			HTTPStatusCode: http.StatusUpgradeRequired, | ||||
| 		}, r.URL) | ||||
| 	case strings.HasPrefix(r.URL.Path, storageRESTPrefix): | ||||
| 		desc := fmt.Sprintf("Expected 'storage' API version '%s', instead found '%s', please upgrade the servers", | ||||
| @ -409,7 +409,7 @@ func errorResponseHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 		writeErrorResponseString(r.Context(), w, APIError{ | ||||
| 			Code:           "XMinioStorageVersionMismatch", | ||||
| 			Description:    desc, | ||||
| 			HTTPStatusCode: http.StatusBadRequest, | ||||
| 			HTTPStatusCode: http.StatusUpgradeRequired, | ||||
| 		}, r.URL) | ||||
| 	case strings.HasPrefix(r.URL.Path, lockRESTPrefix): | ||||
| 		desc := fmt.Sprintf("Expected 'lock' API version '%s', instead found '%s', please upgrade the servers", | ||||
| @ -417,7 +417,7 @@ func errorResponseHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 		writeErrorResponseString(r.Context(), w, APIError{ | ||||
| 			Code:           "XMinioLockVersionMismatch", | ||||
| 			Description:    desc, | ||||
| 			HTTPStatusCode: http.StatusBadRequest, | ||||
| 			HTTPStatusCode: http.StatusUpgradeRequired, | ||||
| 		}, r.URL) | ||||
| 	case strings.HasPrefix(r.URL.Path, adminPathPrefix): | ||||
| 		var desc string | ||||
| @ -431,7 +431,7 @@ func errorResponseHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 		writeErrorResponseJSON(r.Context(), w, APIError{ | ||||
| 			Code:           "XMinioAdminVersionMismatch", | ||||
| 			Description:    desc, | ||||
| 			HTTPStatusCode: http.StatusBadRequest, | ||||
| 			HTTPStatusCode: http.StatusUpgradeRequired, | ||||
| 		}, r.URL) | ||||
| 	default: | ||||
| 		desc := fmt.Sprintf("Unknown API request at %s", r.URL.Path) | ||||
|  | ||||
| @ -18,11 +18,14 @@ package cmd | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"net/url" | ||||
| 
 | ||||
| 	"github.com/minio/minio/cmd/http" | ||||
| 	xhttp "github.com/minio/minio/cmd/http" | ||||
| 	"github.com/minio/minio/cmd/logger" | ||||
| 	"github.com/minio/minio/cmd/rest" | ||||
| 	"github.com/minio/minio/pkg/dsync" | ||||
| @ -156,7 +159,14 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { | ||||
| 	if err != nil { | ||||
| 		logger.Fatal(err, "Unable to create lock rest client") | ||||
| 	} | ||||
| 	restClient.HealthCheckPath = "/" | ||||
| 	restClient.HealthCheckFn = func() bool { | ||||
| 		ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) | ||||
| 		respBody, err := restClient.CallWithContext(ctx, lockRESTMethodHealth, nil, nil, -1) | ||||
| 		xhttp.DrainBody(respBody) | ||||
| 		cancel() | ||||
| 		var ne *rest.NetworkError | ||||
| 		return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) | ||||
| 	} | ||||
| 
 | ||||
| 	return &lockRESTClient{endpoint: endpoint, restClient: restClient} | ||||
| } | ||||
|  | ||||
| @ -27,6 +27,7 @@ const ( | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	lockRESTMethodHealth  = "/health" | ||||
| 	lockRESTMethodLock    = "/lock" | ||||
| 	lockRESTMethodRLock   = "/rlock" | ||||
| 	lockRESTMethodUnlock  = "/unlock" | ||||
|  | ||||
| @ -78,6 +78,11 @@ func getLockArgs(r *http.Request) (args dsync.LockArgs, err error) { | ||||
| 	return args, nil | ||||
| } | ||||
| 
 | ||||
| // HealthHandler returns success if request is authenticated. | ||||
| func (l *lockRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	l.IsValid(w, r) | ||||
| } | ||||
| 
 | ||||
| // LockHandler - Acquires a lock. | ||||
| func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	if !l.IsValid(w, r) { | ||||
| @ -345,6 +350,7 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) { | ||||
| 			} | ||||
| 
 | ||||
| 			subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter() | ||||
| 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)).Queries(queries...) | ||||
| 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)).Queries(queries...) | ||||
| 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...) | ||||
| 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...) | ||||
|  | ||||
| @ -21,6 +21,7 @@ import ( | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"encoding/gob" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"math" | ||||
| @ -32,6 +33,7 @@ import ( | ||||
| 
 | ||||
| 	"github.com/dustin/go-humanize" | ||||
| 	"github.com/minio/minio/cmd/http" | ||||
| 	xhttp "github.com/minio/minio/cmd/http" | ||||
| 	"github.com/minio/minio/cmd/logger" | ||||
| 	"github.com/minio/minio/cmd/rest" | ||||
| 	"github.com/minio/minio/pkg/event" | ||||
| @ -881,7 +883,16 @@ func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) { | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	restClient.HealthCheckPath = peerRESTMethodGetLocalDiskIDs | ||||
| 
 | ||||
| 	// Construct a new health function. | ||||
| 	restClient.HealthCheckFn = func() bool { | ||||
| 		ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) | ||||
| 		respBody, err := restClient.CallWithContext(ctx, peerRESTMethodHealth, nil, nil, -1) | ||||
| 		xhttp.DrainBody(respBody) | ||||
| 		cancel() | ||||
| 		var ne *rest.NetworkError | ||||
| 		return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) | ||||
| 	} | ||||
| 
 | ||||
| 	return &peerRESTClient{host: peer, restClient: restClient}, nil | ||||
| } | ||||
|  | ||||
| @ -24,6 +24,7 @@ const ( | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	peerRESTMethodHealth                = "/health" | ||||
| 	peerRESTMethodServerInfo            = "/serverinfo" | ||||
| 	peerRESTMethodDriveOBDInfo          = "/driveobdinfo" | ||||
| 	peerRESTMethodNetOBDInfo            = "/netobdinfo" | ||||
|  | ||||
| @ -729,6 +729,11 @@ func getLocalDiskIDs(z *erasureZones) []string { | ||||
| 	return ids | ||||
| } | ||||
| 
 | ||||
| // HealthHandler - responds to health checks; it only validates the request via IsValid. | ||||
| func (s *peerRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	s.IsValid(w, r) | ||||
| } | ||||
| 
 | ||||
| // GetLocalDiskIDs - Return disk IDs of all the local disks. | ||||
| func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request) { | ||||
| 	if !s.IsValid(w, r) { | ||||
| @ -1020,6 +1025,7 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { | ||||
| func registerPeerRESTHandlers(router *mux.Router) { | ||||
| 	server := &peerRESTServer{} | ||||
| 	subrouter := router.PathPrefix(peerRESTPrefix).Subrouter() | ||||
| 	subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodHealth).HandlerFunc(httpTraceHdrs(server.HealthHandler)) | ||||
| 	subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler)) | ||||
| 	subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler)) | ||||
| 	subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodProcOBDInfo).HandlerFunc(httpTraceHdrs(server.ProcOBDInfoHandler)) | ||||
|  | ||||
| @ -1,5 +1,5 @@ | ||||
| /* | ||||
|  * MinIO Cloud Storage, (C) 2018 MinIO, Inc. | ||||
|  * MinIO Cloud Storage, (C) 2018-2020 MinIO, Inc. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
| @ -56,10 +56,11 @@ func (n *NetworkError) Unwrap() error { | ||||
| 
 | ||||
| // Client - http based RPC client. | ||||
| type Client struct { | ||||
| 	// HealthCheckPath is the path to test for health. | ||||
| 	// If left empty the client will not keep track of health. | ||||
| 	// Calling this can return any http status code/contents. | ||||
| 	HealthCheckPath string | ||||
| 	// HealthCheckFn is the function set to test for health. | ||||
| 	// If not set the client will not keep track of health. | ||||
| 	// Calling this returns true if the target is online | ||||
| 	// and false if it is offline. | ||||
| 	HealthCheckFn func() bool | ||||
| 
 | ||||
| 	// HealthCheckInterval will be the duration between re-connection attempts | ||||
| 	// when a call has failed with a network error. | ||||
| @ -116,6 +117,18 @@ func (c *Client) CallWithContext(ctx context.Context, method string, values url. | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.StatusCode != http.StatusOK { | ||||
| 		// If the server returns 412 (Precondition Failed), it | ||||
| 		// means that authentication succeeded but another | ||||
| 		// side-channel check has failed; we take the client | ||||
| 		// offline in such situations. | ||||
| 		// Generally all implementations should simply return | ||||
| 		// 403, but where the caller depends on the client | ||||
| 		// being taken offline purposefully, the server should | ||||
| 		// make sure to respond with '412' instead, see | ||||
| 		// cmd/storage-rest-server.go for ideas. | ||||
| 		if resp.StatusCode == http.StatusPreconditionFailed { | ||||
| 			c.MarkOffline() | ||||
| 		} | ||||
| 		defer xhttp.DrainBody(resp.Body) | ||||
| 		// Limit the ReadAll(), just in case, because of a bug, the server responds with large data. | ||||
| 		b, err := ioutil.ReadAll(io.LimitReader(resp.Body, c.MaxErrResponseSize)) | ||||
| @ -157,7 +170,6 @@ func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthT | ||||
| 		connected:           online, | ||||
| 
 | ||||
| 		MaxErrResponseSize:  4096, | ||||
| 		HealthCheckPath:     "", | ||||
| 		HealthCheckInterval: 200 * time.Millisecond, | ||||
| 		HealthCheckTimeout:  time.Second, | ||||
| 	}, nil | ||||
| @ -169,11 +181,11 @@ func (c *Client) IsOnline() bool { | ||||
| } | ||||
| 
 | ||||
| // MarkOffline - will mark a client as being offline and spawns | ||||
| // a goroutine that will attempt to reconnect if a HealthCheckPath is set. | ||||
| // a goroutine that will attempt to reconnect if HealthCheckFn is set. | ||||
| func (c *Client) MarkOffline() { | ||||
| 	// Start goroutine that will attempt to reconnect. | ||||
| 	// If server is already trying to reconnect this will have no effect. | ||||
| 	if len(c.HealthCheckPath) > 0 && atomic.CompareAndSwapInt32(&c.connected, online, offline) { | ||||
| 	if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) { | ||||
| 		if c.httpIdleConnsCloser != nil { | ||||
| 			c.httpIdleConnsCloser() | ||||
| 		} | ||||
| @ -184,12 +196,7 @@ func (c *Client) MarkOffline() { | ||||
| 				if status := atomic.LoadInt32(&c.connected); status == closed { | ||||
| 					return | ||||
| 				} | ||||
| 				ctx, cancel := context.WithTimeout(context.Background(), c.HealthCheckTimeout) | ||||
| 				respBody, err := c.CallWithContext(ctx, c.HealthCheckPath, nil, nil, -1) | ||||
| 				xhttp.DrainBody(respBody) | ||||
| 				cancel() | ||||
| 				var ne *NetworkError | ||||
| 				if !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) { | ||||
| 				if c.HealthCheckFn() { | ||||
| 					atomic.CompareAndSwapInt32(&c.connected, offline, online) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| @ -22,14 +22,17 @@ import ( | ||||
| 	"crypto/tls" | ||||
| 	"encoding/gob" | ||||
| 	"encoding/hex" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"net/url" | ||||
| 	"path" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/minio/minio/cmd/http" | ||||
| 	xhttp "github.com/minio/minio/cmd/http" | ||||
| 	"github.com/minio/minio/cmd/logger" | ||||
| 	"github.com/minio/minio/cmd/rest" | ||||
| 	xnet "github.com/minio/minio/pkg/net" | ||||
| @ -656,6 +659,15 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient { | ||||
| 	if err != nil { | ||||
| 		logger.Fatal(err, "Unable to initialize remote REST disks") | ||||
| 	} | ||||
| 	restClient.HealthCheckPath = "/" | ||||
| 
 | ||||
| 	restClient.HealthCheckInterval = 500 * time.Millisecond | ||||
| 	restClient.HealthCheckFn = func() bool { | ||||
| 		ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) | ||||
| 		respBody, err := restClient.CallWithContext(ctx, storageRESTMethodHealth, nil, nil, -1) | ||||
| 		xhttp.DrainBody(respBody) | ||||
| 		cancel() | ||||
| 		return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound | ||||
| 	} | ||||
| 
 | ||||
| 	return &storageRESTClient{endpoint: endpoint, restClient: restClient} | ||||
| } | ||||
|  | ||||
| @ -23,6 +23,7 @@ const ( | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	storageRESTMethodHealth               = "/health" | ||||
| 	storageRESTMethodDiskInfo             = "/diskinfo" | ||||
| 	storageRESTMethodCrawlAndGetDataUsage = "/crawlandgetdatausage" | ||||
| 	storageRESTMethodMakeVol              = "/makevol" | ||||
|  | ||||
| @ -47,7 +47,11 @@ type storageRESTServer struct { | ||||
| } | ||||
| 
 | ||||
| func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { | ||||
| 	w.WriteHeader(http.StatusForbidden) | ||||
| 	if errors.Is(err, errDiskStale) { | ||||
| 		w.WriteHeader(http.StatusPreconditionFailed) | ||||
| 	} else { | ||||
| 		w.WriteHeader(http.StatusForbidden) | ||||
| 	} | ||||
| 	w.Write([]byte(err.Error())) | ||||
| 	w.(http.Flusher).Flush() | ||||
| } | ||||
| @ -118,6 +122,11 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| // HealthHandler - health check handler, checks if disk is stale by validating the request via IsValid. | ||||
| func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	s.IsValid(w, r) | ||||
| } | ||||
| 
 | ||||
| // DiskInfoHandler - returns disk info. | ||||
| func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	if !s.IsValid(w, r) { | ||||
| @ -828,6 +837,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones | ||||
| 
 | ||||
| 			subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter() | ||||
| 
 | ||||
| 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodHealth).HandlerFunc(httpTraceHdrs(server.HealthHandler)) | ||||
| 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler)) | ||||
| 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCrawlAndGetDataUsage).HandlerFunc(httpTraceHdrs(server.CrawlAndGetDataUsageHandler)) | ||||
| 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler)).Queries(restQueries(storageRESTVolume)...) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user