Add bootstrap REST handler for verifying server config (#8550)

This commit is contained in:
Harshavardhana 2019-11-22 12:45:13 -08:00 committed by GitHub
parent 890b493a2e
commit c3771df641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 276 additions and 19 deletions

View File

@ -0,0 +1,257 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"runtime"
"sync/atomic"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio-go/pkg/set"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
)
const (
bootstrapRESTVersion = "v1"
bootstrapRESTVersionPrefix = SlashSeparator + bootstrapRESTVersion
bootstrapRESTPrefix = minioReservedBucketPath + "/bootstrap"
bootstrapRESTPath = bootstrapRESTPrefix + bootstrapRESTVersionPrefix
)
const (
bootstrapRESTMethodVerify = "/verify"
)
// To abstract a node over network.
type bootstrapRESTServer struct{}
// ServerSystemConfig - captures information about server configuration.
type ServerSystemConfig struct {
MinioPlatform string
MinioRuntime string
MinioEndpoints EndpointZones
}
// Diff - returns error on first difference found in two configs.
func (s1 ServerSystemConfig) Diff(s2 ServerSystemConfig) error {
if s1.MinioPlatform != s2.MinioPlatform {
return fmt.Errorf("Expected platform '%s', found to be running '%s'",
s1.MinioPlatform, s2.MinioPlatform)
}
if s1.MinioEndpoints.Nodes() != s2.MinioEndpoints.Nodes() {
return fmt.Errorf("Expected number of endpoints %d, seen %d", s1.MinioEndpoints.Nodes(),
s2.MinioEndpoints.Nodes())
}
for i, ep := range s1.MinioEndpoints {
if ep.SetCount != s2.MinioEndpoints[i].SetCount {
return fmt.Errorf("Expected set count %d, seen %d", ep.SetCount,
s2.MinioEndpoints[i].SetCount)
}
if ep.DrivesPerSet != s2.MinioEndpoints[i].DrivesPerSet {
return fmt.Errorf("Expected drives pet set %d, seen %d", ep.DrivesPerSet,
s2.MinioEndpoints[i].DrivesPerSet)
}
for j, endpoint := range ep.Endpoints {
if endpoint.String() != s2.MinioEndpoints[i].Endpoints[j].String() {
return fmt.Errorf("Expected endpoint %s, seen %s", endpoint,
s2.MinioEndpoints[i].Endpoints[j])
}
}
}
return nil
}
func getServerSystemCfg() ServerSystemConfig {
return ServerSystemConfig{
MinioPlatform: fmt.Sprintf("OS: %s | Arch: %s", runtime.GOOS, runtime.GOARCH),
MinioEndpoints: globalEndpoints,
}
}
func (b *bootstrapRESTServer) VerifyHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "VerifyHandler")
cfg := getServerSystemCfg()
logger.LogIf(ctx, json.NewEncoder(w).Encode(&cfg))
w.(http.Flusher).Flush()
}
// registerBootstrapRESTHandlers - register bootstrap rest router.
func registerBootstrapRESTHandlers(router *mux.Router) {
server := &bootstrapRESTServer{}
subrouter := router.PathPrefix(bootstrapRESTPrefix).Subrouter()
subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodVerify).HandlerFunc(
httpTraceHdrs(server.VerifyHandler))
}
// client to talk to bootstrap Nodes.
type bootstrapRESTClient struct {
endpoint Endpoint
restClient *rest.Client
connected int32
}
// Reconnect to a bootstrap rest server.k
func (client *bootstrapRESTClient) reConnect() {
atomic.StoreInt32(&client.connected, 1)
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
// after verifying format.json
func (client *bootstrapRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
return client.callWithContext(context.Background(), method, values, body, length)
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
// after verifying format.json
func (client *bootstrapRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
if !client.IsOnline() {
client.reConnect()
}
if values == nil {
values = make(url.Values)
}
respBody, err = client.restClient.CallWithContext(ctx, method, values, body, length)
if err == nil {
return respBody, nil
}
if isNetworkError(err) {
atomic.StoreInt32(&client.connected, 0)
}
return nil, err
}
// Stringer provides a canonicalized representation of node.
func (client *bootstrapRESTClient) String() string {
return client.endpoint.String()
}
// IsOnline - returns whether RPC client failed to connect or not.
func (client *bootstrapRESTClient) IsOnline() bool {
return atomic.LoadInt32(&client.connected) == 1
}
// Close - marks the client as closed.
func (client *bootstrapRESTClient) Close() error {
atomic.StoreInt32(&client.connected, 0)
client.restClient.Close()
return nil
}
// Verify - fetches system server config.
func (client *bootstrapRESTClient) Verify(srcCfg ServerSystemConfig) (err error) {
if newObjectLayerFn() != nil {
return nil
}
respBody, err := client.call(bootstrapRESTMethodVerify, nil, nil, -1)
if err != nil {
return
}
defer xhttp.DrainBody(respBody)
recvCfg := ServerSystemConfig{}
if err = json.NewDecoder(respBody).Decode(&recvCfg); err != nil {
return err
}
return srcCfg.Diff(recvCfg)
}
func verifyServerSystemConfig(endpointZones EndpointZones) error {
srcCfg := getServerSystemCfg()
clnts := newBootstrapRESTClients(endpointZones)
var onlineServers int
for onlineServers < len(clnts)/2 {
for _, clnt := range clnts {
if err := clnt.Verify(srcCfg); err != nil {
if isNetworkError(err) {
continue
}
return fmt.Errorf("%s as has incorrect configuration: %w", clnt.String(), err)
}
onlineServers++
}
// Sleep for a while - so that we don't go into
// 100% CPU when half the endpoints are offline.
time.Sleep(500 * time.Millisecond)
}
return nil
}
func newBootstrapRESTClients(endpointZones EndpointZones) []*bootstrapRESTClient {
seenHosts := set.NewStringSet()
var clnts []*bootstrapRESTClient
for _, ep := range endpointZones {
for _, endpoint := range ep.Endpoints {
if seenHosts.Contains(endpoint.Host) {
continue
}
seenHosts.Add(endpoint.Host)
// Only proceed for remote endpoints.
if !endpoint.IsLocal {
clnt, err := newBootstrapRESTClient(endpoint)
if err != nil {
continue
}
clnts = append(clnts, clnt)
}
}
}
return clnts
}
// Returns a new bootstrap client.
func newBootstrapRESTClient(endpoint Endpoint) (*bootstrapRESTClient, error) {
serverURL := &url.URL{
Scheme: endpoint.Scheme,
Host: endpoint.Host,
Path: bootstrapRESTPath,
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
}
}
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil {
return nil, err
}
return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}, nil
}

View File

@ -196,8 +196,8 @@ type ZoneEndpoints struct {
// EndpointZones - list of list of endpoints // EndpointZones - list of list of endpoints
type EndpointZones []ZoneEndpoints type EndpointZones []ZoneEndpoints
// First returns true if the first endpoint is local. // FirstLocal returns true if the first endpoint is local.
func (l EndpointZones) First() bool { func (l EndpointZones) FirstLocal() bool {
return l[0].Endpoints[0].IsLocal return l[0].Endpoints[0].IsLocal
} }

View File

@ -149,8 +149,4 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
globalLockServers[endpoint] = lockServer.ll globalLockServers[endpoint] = lockServer.ll
} }
} }
// If none of the routes match add default error handler routes
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
} }

View File

@ -1086,11 +1086,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketObjectLockConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketObjectLockConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)
// If none of the routes match add default error handler routes
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
} }

View File

@ -24,15 +24,17 @@ import (
// Composed function registering routers for only distributed XL setup. // Composed function registering routers for only distributed XL setup.
func registerDistXLRouters(router *mux.Router, endpointZones EndpointZones) { func registerDistXLRouters(router *mux.Router, endpointZones EndpointZones) {
// Register storage rpc router only if its a distributed setup. // Register storage REST router only if its a distributed setup.
registerStorageRESTHandlers(router, endpointZones) registerStorageRESTHandlers(router, endpointZones)
// Register peer REST router only if its a distributed setup. // Register peer REST router only if its a distributed setup.
registerPeerRESTHandlers(router) registerPeerRESTHandlers(router)
// Register distributed namespace lock. // Register bootstrap REST router for distributed setups.
registerLockRESTHandlers(router, endpointZones) registerBootstrapRESTHandlers(router)
// Register distributed namespace lock routers.
registerLockRESTHandlers(router, endpointZones)
} }
// List of some generic handlers which are applied for all incoming requests. // List of some generic handlers which are applied for all incoming requests.
@ -112,6 +114,10 @@ func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) {
// but don't allow SSE-KMS. // but don't allow SSE-KMS.
registerAPIRouter(router, true, false) registerAPIRouter(router, true, false)
// If none of the routes match add default error handler routes
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
// Register rest of the handlers. // Register rest of the handlers.
return registerHandlers(router, globalHandlers...), nil return registerHandlers(router, globalHandlers...), nil
} }

View File

@ -365,6 +365,13 @@ func serverMain(ctx *cli.Context) {
globalHTTPServerErrorCh <- globalHTTPServer.Start() globalHTTPServerErrorCh <- globalHTTPServer.Start()
}() }()
if globalIsDistXL && globalEndpoints.FirstLocal() {
// Additionally in distributed setup validate
if err := verifyServerSystemConfig(globalEndpoints); err != nil {
logger.Fatal(err, "Unable to initialize distributed setup")
}
}
newObject, err := newObjectLayer(globalEndpoints) newObject, err := newObjectLayer(globalEndpoints)
logger.SetDeploymentID(globalDeploymentID) logger.SetDeploymentID(globalDeploymentID)
if err != nil { if err != nil {

View File

@ -612,8 +612,4 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTBitrotHash, storageRESTLength, storageRESTShardSize)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTBitrotHash, storageRESTLength, storageRESTShardSize)...)
} }
} }
// If none of the routes match add default error handler routes
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
} }

View File

@ -60,7 +60,7 @@ func newXLZones(endpointZones EndpointZones) (ObjectLayer, error) {
z = &xlZones{zones: make([]*xlSets, len(endpointZones))} z = &xlZones{zones: make([]*xlSets, len(endpointZones))}
) )
for i, ep := range endpointZones { for i, ep := range endpointZones {
formats[i], err = waitForFormatXL(endpointZones.First(), ep.Endpoints, formats[i], err = waitForFormatXL(endpointZones.FirstLocal(), ep.Endpoints,
ep.SetCount, ep.DrivesPerSet, deploymentID) ep.SetCount, ep.DrivesPerSet, deploymentID)
if err != nil { if err != nil {
return nil, err return nil, err