diff --git a/Makefile b/Makefile
index 23ed066f3..c0ad9b712 100644
--- a/Makefile
+++ b/Makefile
@@ -46,7 +46,7 @@ lint-fix: getdeps ## runs golangci-lint suite of linters with automatic fixes
 check: test
 test: verifiers build build-debugging ## builds minio, runs linters, tests
 	@echo "Running unit tests"
-	@MINIO_API_REQUESTS_MAX=10000 CGO_ENABLED=0 go test -tags kqueue ./...
+	@MINIO_API_REQUESTS_MAX=10000 CGO_ENABLED=0 go test -v -tags kqueue ./...
 
 test-root-disable: install-race
 	@echo "Running minio root lockdown tests"
diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index b8f6a8aaa..f6bca13f3 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -2852,7 +2852,7 @@ func getClusterMetaInfo(ctx context.Context) []byte {
 
 	ci := madmin.ClusterRegistrationInfo{}
 	ci.Info.NoOfServerPools = len(globalEndpoints)
-	ci.Info.NoOfServers = len(globalEndpoints.Hostnames())
+	ci.Info.NoOfServers = totalNodeCount()
 	ci.Info.MinioVersion = Version
 
 	si := objectAPI.StorageInfo(ctx, true)
diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go
index d8ead7fa8..8b8efeb34 100644
--- a/cmd/bootstrap-peer-server.go
+++ b/cmd/bootstrap-peer-server.go
@@ -19,71 +19,47 @@ package cmd
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
-	"io"
-	"net/http"
-	"net/url"
-	"os"
+	"math/rand"
 	"reflect"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/minio/minio-go/v7/pkg/set"
-	xhttp "github.com/minio/minio/internal/http"
+	"github.com/minio/minio/internal/grid"
 	"github.com/minio/minio/internal/logger"
-	"github.com/minio/minio/internal/rest"
-	"github.com/minio/mux"
 	"github.com/minio/pkg/v2/env"
 )
 
-const (
-	bootstrapRESTVersion       = "v1"
-	bootstrapRESTVersionPrefix = SlashSeparator + bootstrapRESTVersion
-	bootstrapRESTPrefix        = minioReservedBucketPath + "/bootstrap"
-	bootstrapRESTPath          = bootstrapRESTPrefix + bootstrapRESTVersionPrefix
-)
-
-const (
-	bootstrapRESTMethodHealth = "/health"
-	bootstrapRESTMethodVerify = "/verify"
-)
-
 // To abstract a node over network.
 type bootstrapRESTServer struct{}
 
+//go:generate msgp -file=$GOFILE
+
 // ServerSystemConfig - captures information about server configuration.
 type ServerSystemConfig struct {
-	MinioEndpoints EndpointServerPools
-	MinioEnv       map[string]string
+	NEndpoints int
+	CmdLines   []string
+	MinioEnv   map[string]string
 }
 
 // Diff - returns error on first difference found in two configs.
-func (s1 ServerSystemConfig) Diff(s2 ServerSystemConfig) error {
-	if s1.MinioEndpoints.NEndpoints() != s2.MinioEndpoints.NEndpoints() {
-		return fmt.Errorf("Expected number of endpoints %d, seen %d", s1.MinioEndpoints.NEndpoints(),
-			s2.MinioEndpoints.NEndpoints())
+func (s1 *ServerSystemConfig) Diff(s2 *ServerSystemConfig) error {
+	ns1 := s1.NEndpoints
+	ns2 := s2.NEndpoints
+	if ns1 != ns2 {
+		return fmt.Errorf("Expected number of endpoints %d, seen %d", ns1, ns2)
 	}
 
-	for i, ep := range s1.MinioEndpoints {
-		if ep.CmdLine != s2.MinioEndpoints[i].CmdLine {
-			return fmt.Errorf("Expected command line argument %s, seen %s", ep.CmdLine,
-				s2.MinioEndpoints[i].CmdLine)
-		}
-		if ep.SetCount != s2.MinioEndpoints[i].SetCount {
-			return fmt.Errorf("Expected set count %d, seen %d", ep.SetCount,
-				s2.MinioEndpoints[i].SetCount)
-		}
-		if ep.DrivesPerSet != s2.MinioEndpoints[i].DrivesPerSet {
-			return fmt.Errorf("Expected drives pet set %d, seen %d", ep.DrivesPerSet,
-				s2.MinioEndpoints[i].DrivesPerSet)
-		}
-		if ep.Platform != s2.MinioEndpoints[i].Platform {
-			return fmt.Errorf("Expected platform '%s', found to be on '%s'",
-				ep.Platform, s2.MinioEndpoints[i].Platform)
+	for i, cmdLine := range s1.CmdLines {
+		if cmdLine != s2.CmdLines[i] {
+			return fmt.Errorf("Expected command line argument %s, seen %s", cmdLine,
+				s2.CmdLines[i])
 		}
 	}
+
 	if reflect.DeepEqual(s1.MinioEnv, s2.MinioEnv) {
 		return nil
 	}
@@ -131,7 +107,7 @@ var skipEnvs = map[string]struct{}{
 	"MINIO_SECRET_KEY": {},
 }
 
-func getServerSystemCfg() ServerSystemConfig {
+func getServerSystemCfg() *ServerSystemConfig {
 	envs := env.List("MINIO_")
 	envValues := make(map[string]string, len(envs))
 	for _, envK := range envs {
@@ -144,120 +120,102 @@ func getServerSystemCfg() ServerSystemConfig {
 		}
 		envValues[envK] = logger.HashString(env.Get(envK, ""))
 	}
-	return ServerSystemConfig{
-		MinioEndpoints: globalEndpoints,
-		MinioEnv:       envValues,
+	scfg := &ServerSystemConfig{NEndpoints: globalEndpoints.NEndpoints(), MinioEnv: envValues}
+	var cmdLines []string
+	for _, ep := range globalEndpoints {
+		cmdLines = append(cmdLines, ep.CmdLine)
 	}
+	scfg.CmdLines = cmdLines
+	return scfg
 }
 
-func (b *bootstrapRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
-	w.WriteHeader(http.StatusForbidden)
-	w.Write([]byte(err.Error()))
+func (s *bootstrapRESTServer) VerifyHandler(params *grid.MSS) (*ServerSystemConfig, *grid.RemoteErr) {
+	return getServerSystemCfg(), nil
 }
 
-// HealthHandler returns success if request is valid
-func (b *bootstrapRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {}
-
-func (b *bootstrapRESTServer) VerifyHandler(w http.ResponseWriter, r *http.Request) {
-	ctx := newContext(r, w, "VerifyHandler")
-
-	if err := storageServerRequestValidate(r); err != nil {
-		b.writeErrorResponse(w, err)
-		return
-	}
-
-	cfg := getServerSystemCfg()
-	logger.LogIf(ctx, json.NewEncoder(w).Encode(&cfg))
-}
+var serverVerifyHandler = grid.NewSingleHandler[*grid.MSS, *ServerSystemConfig](grid.HandlerServerVerify, grid.NewMSS, func() *ServerSystemConfig { return &ServerSystemConfig{} })
 
 // registerBootstrapRESTHandlers - register bootstrap rest router.
-func registerBootstrapRESTHandlers(router *mux.Router) {
-	h := func(f http.HandlerFunc) http.HandlerFunc {
-		return collectInternodeStats(httpTraceHdrs(f))
-	}
-
+func registerBootstrapRESTHandlers(gm *grid.Manager) {
 	server := &bootstrapRESTServer{}
-	subrouter := router.PathPrefix(bootstrapRESTPrefix).Subrouter()
-
-	subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodHealth).HandlerFunc(
-		h(server.HealthHandler))
-
-	subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodVerify).HandlerFunc(
-		h(server.VerifyHandler))
+	logger.FatalIf(serverVerifyHandler.Register(gm, server.VerifyHandler), "unable to register handler")
 }
 
 // client to talk to bootstrap NEndpoints.
 type bootstrapRESTClient struct {
-	endpoint   Endpoint
-	restClient *rest.Client
+	gridConn *grid.Connection
}
 
-// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
-// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
-// after verifying format.json
-func (client *bootstrapRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
-	if values == nil {
-		values = make(url.Values)
+// Verify function verifies the server config.
+func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg *ServerSystemConfig) (err error) {
+	if newObjectLayerFn() != nil {
+		return nil
 	}
-	respBody, err = client.restClient.Call(ctx, method, values, body, length)
-	if err == nil {
-		return respBody, nil
+	recvCfg, err := serverVerifyHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{}))
+	if err != nil {
+		return err
 	}
-	return nil, err
+	return srcCfg.Diff(recvCfg)
 }
 
 // Stringer provides a canonicalized representation of node.
 func (client *bootstrapRESTClient) String() string {
-	return client.endpoint.String()
+	return client.gridConn.String()
 }
 
-// Verify - fetches system server config.
-func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg ServerSystemConfig) (err error) {
-	if newObjectLayerFn() != nil {
-		return nil
-	}
-	respBody, err := client.callWithContext(ctx, bootstrapRESTMethodVerify, nil, nil, -1)
-	if err != nil {
-		return
-	}
-	defer xhttp.DrainBody(respBody)
-	recvCfg := ServerSystemConfig{}
-	if err = json.NewDecoder(respBody).Decode(&recvCfg); err != nil {
-		return err
-	}
-	return srcCfg.Diff(recvCfg)
-}
-
-func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointServerPools) error {
+func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointServerPools, gm *grid.Manager) error {
 	srcCfg := getServerSystemCfg()
-	clnts := newBootstrapRESTClients(endpointServerPools)
+	clnts := newBootstrapRESTClients(endpointServerPools, gm)
 	var onlineServers int
 	var offlineEndpoints []error
 	var incorrectConfigs []error
 	var retries int
+	var mu sync.Mutex
 	for onlineServers < len(clnts)/2 {
+		var wg sync.WaitGroup
+		wg.Add(len(clnts))
+		onlineServers = 0
 		for _, clnt := range clnts {
-			if err := clnt.Verify(ctx, srcCfg); err != nil {
-				bootstrapTraceMsg(fmt.Sprintf("clnt.Verify: %v, endpoint: %v", err, clnt.endpoint))
-				if !isNetworkError(err) {
-					logger.LogOnceIf(ctx, fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err), clnt.String())
-					incorrectConfigs = append(incorrectConfigs, fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err))
-				} else {
-					offlineEndpoints = append(offlineEndpoints, fmt.Errorf("%s is unreachable: %w", clnt.String(), err))
+			go func(clnt *bootstrapRESTClient) {
+				defer wg.Done()
+
+				if clnt.gridConn.State() != grid.StateConnected {
+					mu.Lock()
+					offlineEndpoints = append(offlineEndpoints, fmt.Errorf("%s is unreachable: %w", clnt, grid.ErrDisconnected))
+					mu.Unlock()
+					return
 				}
-				continue
-			}
-			onlineServers++
+
+				ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
+				defer cancel()
+
+				err := clnt.Verify(ctx, srcCfg)
+				mu.Lock()
+				if err != nil {
+					bootstrapTraceMsg(fmt.Sprintf("clnt.Verify: %v, endpoint: %s", err, clnt))
+					if !isNetworkError(err) {
+						logger.LogOnceIf(context.Background(), fmt.Errorf("%s has incorrect configuration: %w", clnt, err), "incorrect_"+clnt.String())
+						incorrectConfigs = append(incorrectConfigs, fmt.Errorf("%s has incorrect configuration: %w", clnt, err))
+					} else {
+						offlineEndpoints = append(offlineEndpoints, fmt.Errorf("%s is unreachable: %w", clnt, err))
+					}
+				} else {
+					onlineServers++
+				}
+				mu.Unlock()
+			}(clnt)
 		}
+		wg.Wait()
+
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
-			// Sleep for a while - so that we don't go into
-			// 100% CPU when half the endpoints are offline.
-			time.Sleep(100 * time.Millisecond)
+			// Sleep and stagger to avoid blocked CPU and thundering
+			// herd upon start up sequence.
+			time.Sleep(25*time.Millisecond + time.Duration(rand.Int63n(int64(100*time.Millisecond))))
 			retries++
 			// after 20 retries start logging that servers are not reachable yet
 			if retries >= 20 {
@@ -268,7 +226,7 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointS
 			if len(incorrectConfigs) > 0 {
 				logger.Info(fmt.Sprintf("Following servers have mismatching configuration %s", incorrectConfigs))
 			}
-			retries = 0 // reset to log again after 5 retries.
+			retries = 0 // reset to log again after 20 retries.
 		}
 		offlineEndpoints = nil
 		incorrectConfigs = nil
@@ -277,39 +235,20 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointS
 	return nil
 }
 
-func newBootstrapRESTClients(endpointServerPools EndpointServerPools) []*bootstrapRESTClient {
-	seenHosts := set.NewStringSet()
+func newBootstrapRESTClients(endpointServerPools EndpointServerPools, gm *grid.Manager) []*bootstrapRESTClient {
+	seenClient := set.NewStringSet()
 	var clnts []*bootstrapRESTClient
 	for _, ep := range endpointServerPools {
 		for _, endpoint := range ep.Endpoints {
-			if seenHosts.Contains(endpoint.Host) {
+			if endpoint.IsLocal {
 				continue
 			}
-			seenHosts.Add(endpoint.Host)
-
-			// Only proceed for remote endpoints.
-			if !endpoint.IsLocal {
-				cl := newBootstrapRESTClient(endpoint)
-				if serverDebugLog {
-					cl.restClient.TraceOutput = os.Stdout
-				}
-				clnts = append(clnts, cl)
+			if seenClient.Contains(endpoint.Host) {
+				continue
 			}
+			seenClient.Add(endpoint.Host)
+			clnts = append(clnts, &bootstrapRESTClient{gm.Connection(endpoint.GridHost())})
 		}
 	}
 	return clnts
 }
-
-// Returns a new bootstrap client.
-func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
-	serverURL := &url.URL{
-		Scheme: endpoint.Scheme,
-		Host:   endpoint.Host,
-		Path:   bootstrapRESTPath,
-	}
-
-	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
-	restClient.HealthCheckFn = nil
-
-	return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}
-}
diff --git a/cmd/bootstrap-peer-server_gen.go b/cmd/bootstrap-peer-server_gen.go
new file mode 100644
index 000000000..77f4cde1d
--- /dev/null
+++ b/cmd/bootstrap-peer-server_gen.go
@@ -0,0 +1,270 @@
+package cmd
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+	"github.com/tinylib/msgp/msgp"
+)
+
+// DecodeMsg implements msgp.Decodable
+func (z *ServerSystemConfig) DecodeMsg(dc *msgp.Reader) (err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, err = dc.ReadMapHeader()
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, err = dc.ReadMapKeyPtr()
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "NEndpoints":
+			z.NEndpoints, err = dc.ReadInt()
+			if err != nil {
+				err = msgp.WrapError(err, "NEndpoints")
+				return
+			}
+		case "CmdLines":
+			var zb0002 uint32
+			zb0002, err = dc.ReadArrayHeader()
+			if err != nil {
+				err = msgp.WrapError(err, "CmdLines")
+				return
+			}
+			if cap(z.CmdLines) >= int(zb0002) {
+				z.CmdLines = (z.CmdLines)[:zb0002]
+			} else {
+				z.CmdLines = make([]string, zb0002)
+			}
+			for za0001 := range z.CmdLines {
+				z.CmdLines[za0001], err = dc.ReadString()
+				if err != nil {
+					err = msgp.WrapError(err, "CmdLines", za0001)
+					return
+				}
+			}
+		case "MinioEnv":
+			var zb0003 uint32
+			zb0003, err = dc.ReadMapHeader()
+			if err != nil {
+				err = msgp.WrapError(err, "MinioEnv")
+				return
+			}
+			if z.MinioEnv == nil {
+				z.MinioEnv = make(map[string]string, zb0003)
+			} else if len(z.MinioEnv) > 0 {
+				for key := range z.MinioEnv {
+					delete(z.MinioEnv, key)
+				}
+			}
+			for zb0003 > 0 {
+				zb0003--
+				var za0002 string
+				var za0003 string
+				za0002, err = dc.ReadString()
+				if err != nil {
+					err = msgp.WrapError(err, "MinioEnv")
+					return
+				}
+				za0003, err = dc.ReadString()
+				if err != nil {
+					err = msgp.WrapError(err, "MinioEnv", za0002)
+					return
+				}
+				z.MinioEnv[za0002] = za0003
+			}
+		default:
+			err = dc.Skip()
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *ServerSystemConfig) EncodeMsg(en *msgp.Writer) (err error) {
+	// map header, size 3
+	// write "NEndpoints"
+	err = en.Append(0x83, 0xaa, 0x4e, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73)
+	if err != nil {
+		return
+	}
+	err = en.WriteInt(z.NEndpoints)
+	if err != nil {
+		err = msgp.WrapError(err, "NEndpoints")
+		return
+	}
+	// write "CmdLines"
+	err = en.Append(0xa8, 0x43, 0x6d, 0x64, 0x4c, 0x69, 0x6e, 0x65, 0x73)
+	if err != nil {
+		return
+	}
+	err = en.WriteArrayHeader(uint32(len(z.CmdLines)))
+	if err != nil {
+		err = msgp.WrapError(err, "CmdLines")
+		return
+	}
+	for za0001 := range z.CmdLines {
+		err = en.WriteString(z.CmdLines[za0001])
+		if err != nil {
+			err = msgp.WrapError(err, "CmdLines", za0001)
+			return
+		}
+	}
+	// write "MinioEnv"
+	err = en.Append(0xa8, 0x4d, 0x69, 0x6e, 0x69, 0x6f, 0x45, 0x6e, 0x76)
+	if err != nil {
+		return
+	}
+	err = en.WriteMapHeader(uint32(len(z.MinioEnv)))
+	if err != nil {
+		err = msgp.WrapError(err, "MinioEnv")
+		return
+	}
+	for za0002, za0003 := range z.MinioEnv {
+		err = en.WriteString(za0002)
+		if err != nil {
+			err = msgp.WrapError(err, "MinioEnv")
+			return
+		}
+		err = en.WriteString(za0003)
+		if err != nil {
+			err = msgp.WrapError(err, "MinioEnv", za0002)
+			return
+		}
+	}
+	return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *ServerSystemConfig) MarshalMsg(b []byte) (o []byte, err error) {
+	o = msgp.Require(b, z.Msgsize())
+	// map header, size 3
+	// string "NEndpoints"
+	o = append(o, 0x83, 0xaa, 0x4e, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73)
+	o = msgp.AppendInt(o, z.NEndpoints)
+	// string "CmdLines"
+	o = append(o, 0xa8, 0x43, 0x6d, 0x64, 0x4c, 0x69, 0x6e, 0x65, 0x73)
+	o = msgp.AppendArrayHeader(o, uint32(len(z.CmdLines)))
+	for za0001 := range z.CmdLines {
+		o = msgp.AppendString(o, z.CmdLines[za0001])
+	}
+	// string "MinioEnv"
+	o = append(o, 0xa8, 0x4d, 0x69, 0x6e, 0x69, 0x6f, 0x45, 0x6e, 0x76)
+	o = msgp.AppendMapHeader(o, uint32(len(z.MinioEnv)))
+	for za0002, za0003 := range z.MinioEnv {
+		o = msgp.AppendString(o, za0002)
+		o = msgp.AppendString(o, za0003)
+	}
+	return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *ServerSystemConfig) UnmarshalMsg(bts []byte) (o []byte, err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, bts, err = msgp.ReadMapKeyZC(bts)
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "NEndpoints":
+			z.NEndpoints, bts, err = msgp.ReadIntBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "NEndpoints")
+				return
+			}
+		case "CmdLines":
+			var zb0002 uint32
+			zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "CmdLines")
+				return
+			}
+			if cap(z.CmdLines) >= int(zb0002) {
+				z.CmdLines = (z.CmdLines)[:zb0002]
+			} else {
+				z.CmdLines = make([]string, zb0002)
+			}
+			for za0001 := range z.CmdLines {
+				z.CmdLines[za0001], bts, err = msgp.ReadStringBytes(bts)
+				if err != nil {
+					err = msgp.WrapError(err, "CmdLines", za0001)
+					return
+				}
+			}
+		case "MinioEnv":
+			var zb0003 uint32
+			zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "MinioEnv")
+				return
+			}
+			if z.MinioEnv == nil {
+				z.MinioEnv = make(map[string]string, zb0003)
+			} else if len(z.MinioEnv) > 0 {
+				for key := range z.MinioEnv {
+					delete(z.MinioEnv, key)
+				}
+			}
+			for zb0003 > 0 {
+				var za0002 string
+				var za0003 string
+				zb0003--
+				za0002, bts, err = msgp.ReadStringBytes(bts)
+				if err != nil {
+					err = msgp.WrapError(err, "MinioEnv")
+					return
+				}
+				za0003, bts, err = msgp.ReadStringBytes(bts)
+				if err != nil {
+					err = msgp.WrapError(err, "MinioEnv", za0002)
+					return
+				}
+				z.MinioEnv[za0002] = za0003
+			}
+		default:
+			bts, err = msgp.Skip(bts)
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	o = bts
+	return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *ServerSystemConfig) Msgsize() (s int) {
+	s = 1 + 11 + msgp.IntSize + 9 + msgp.ArrayHeaderSize
+	for za0001 := range z.CmdLines {
+		s += msgp.StringPrefixSize + len(z.CmdLines[za0001])
+	}
+	s += 9 + msgp.MapHeaderSize
+	if z.MinioEnv != nil {
+		for za0002, za0003 := range z.MinioEnv {
+			_ = za0003
+			s += msgp.StringPrefixSize + len(za0002) + msgp.StringPrefixSize + len(za0003)
+		}
+	}
+	return
+}
diff --git a/cmd/bootstrap-peer-server_gen_test.go b/cmd/bootstrap-peer-server_gen_test.go
new file mode 100644
index 000000000..1446451de
--- /dev/null
+++ b/cmd/bootstrap-peer-server_gen_test.go
@@ -0,0 +1,123 @@
+package cmd
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/tinylib/msgp/msgp"
+)
+
+func TestMarshalUnmarshalServerSystemConfig(t *testing.T) {
+	v := ServerSystemConfig{}
+	bts, err := v.MarshalMsg(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	left, err := v.UnmarshalMsg(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+	}
+
+	left, err = msgp.Skip(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+	}
+}
+
+func BenchmarkMarshalMsgServerSystemConfig(b *testing.B) {
+	v := ServerSystemConfig{}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.MarshalMsg(nil)
+	}
+}
+
+func BenchmarkAppendMsgServerSystemConfig(b *testing.B) {
+	v := ServerSystemConfig{}
+	bts := make([]byte, 0, v.Msgsize())
+	bts, _ = v.MarshalMsg(bts[0:0])
+	b.SetBytes(int64(len(bts)))
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		bts, _ = v.MarshalMsg(bts[0:0])
+	}
+}
+
+func BenchmarkUnmarshalServerSystemConfig(b *testing.B) {
+	v := ServerSystemConfig{}
+	bts, _ := v.MarshalMsg(nil)
+	b.ReportAllocs()
+	b.SetBytes(int64(len(bts)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, err := v.UnmarshalMsg(bts)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func TestEncodeDecodeServerSystemConfig(t *testing.T) {
+	v := ServerSystemConfig{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+
+	m := v.Msgsize()
+	if buf.Len() > m {
+		t.Log("WARNING: TestEncodeDecodeServerSystemConfig Msgsize() is inaccurate")
+	}
+
+	vn := ServerSystemConfig{}
+	err := msgp.Decode(&buf, &vn)
+	if err != nil {
+		t.Error(err)
+	}
+
+	buf.Reset()
+	msgp.Encode(&buf, &v)
+	err = msgp.NewReader(&buf).Skip()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func BenchmarkEncodeServerSystemConfig(b *testing.B) {
+	v := ServerSystemConfig{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	en := msgp.NewWriter(msgp.Nowhere)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.EncodeMsg(en)
+	}
+	en.Flush()
+}
+
+func BenchmarkDecodeServerSystemConfig(b *testing.B) {
+	v := ServerSystemConfig{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	rd := msgp.NewEndlessReader(buf.Bytes(), b)
+	dc := msgp.NewReader(rd)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := v.DecodeMsg(dc)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
diff --git a/cmd/endpoint.go b/cmd/endpoint.go
index 84a6a6157..8faaee245 100644
--- a/cmd/endpoint.go
+++ b/cmd/endpoint.go
@@ -386,6 +386,31 @@ func (l EndpointServerPools) NEndpoints() (count int) {
 	return count
 }
 
+// GridHosts will return all peers, including local.
+// in websocket grid compatible format, The local peer
+// is returned as a separate string.
+func (l EndpointServerPools) GridHosts() (gridHosts []string, gridLocal string) {
+	seenHosts := set.NewStringSet()
+	for _, ep := range l {
+		for _, endpoint := range ep.Endpoints {
+			u := endpoint.GridHost()
+			if seenHosts.Contains(u) {
+				continue
+			}
+			seenHosts.Add(u)
+
+			// Set local endpoint
+			if endpoint.IsLocal {
+				gridLocal = u
+			}
+
+			gridHosts = append(gridHosts, u)
+		}
+	}
+
+	return gridHosts, gridLocal
+}
+
 // Hostnames - returns list of unique hostnames
 func (l EndpointServerPools) Hostnames() []string {
 	foundSet := set.NewStringSet()
@@ -435,7 +460,9 @@ func (l EndpointServerPools) peers() (peers []string, local string) {
 			peer := endpoint.Host
 			if endpoint.IsLocal {
 				if _, port := mustSplitHostPort(peer); port == globalMinioPort {
-					local = peer
+					if local == "" {
+						local = peer
+					}
 				}
 			}
 
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 2ba9a895e..690a448bb 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -118,8 +118,10 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 			return nil, fmt.Errorf("parity validation returned an error: %w <- (%d, %d), for pool(%s)", err, commonParityDrives, ep.DrivesPerSet, humanize.Ordinal(i+1))
 		}
 
-		storageDisks[i], formats[i], err = waitForFormatErasure(local, ep.Endpoints, i+1,
-			ep.SetCount, ep.DrivesPerSet, deploymentID, distributionAlgo)
+		bootstrapTrace("waitForFormatErasure: loading disks", func() {
+			storageDisks[i], formats[i], err = waitForFormatErasure(local, ep.Endpoints, i+1,
+				ep.SetCount, ep.DrivesPerSet, deploymentID, distributionAlgo)
+		})
 		if err != nil {
 			return nil, err
 		}
@@ -138,7 +140,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 			return nil, fmt.Errorf("all pools must have same deployment ID - expected %s, got %s for pool(%s)", deploymentID, formats[i].ID, humanize.Ordinal(i+1))
 		}
 
-		z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i)
+		bootstrapTrace(fmt.Sprintf("newErasureSets: initializing %s pool", humanize.Ordinal(i+1)), func() {
+			z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i)
+		})
 		if err != nil {
 			return nil, err
 		}
@@ -176,8 +180,12 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 	setObjectLayer(z)
 
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	attempt := 1
 	for {
-		err := z.Init(ctx) // Initializes all pools.
+		var err error
+		bootstrapTrace(fmt.Sprintf("poolMeta.Init: loading pool metadata, attempt: %d", attempt), func() {
+			err = z.Init(ctx) // Initializes all pools.
+		})
 		if err != nil {
 			if !configRetriableErrors(err) {
 				logger.Fatal(err, "Unable to initialize backend")
@@ -185,6 +193,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 			retry := time.Duration(r.Float64() * float64(5*time.Second))
 			logger.LogIf(ctx, fmt.Errorf("Unable to initialize backend: %w, retrying in %s", err, retry))
 			time.Sleep(retry)
+			attempt++
 			continue
 		}
 		break
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 36a37ca08..0342414b8 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -384,19 +384,27 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
 		}
 	}
 
+	var wg sync.WaitGroup
+	var lk sync.Mutex
 	for i := 0; i < setCount; i++ {
 		lockerEpSet := set.NewStringSet()
 		for j := 0; j < setDriveCount; j++ {
-			endpoint := endpoints.Endpoints[i*setDriveCount+j]
-			// Only add lockers only one per endpoint and per erasure set.
-			if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
-				lockerEpSet.Add(endpoint.Host)
-				s.erasureLockers[i] = append(s.erasureLockers[i], locker)
-			}
+			wg.Add(1)
+			go func(i int, endpoint Endpoint) {
+				defer wg.Done()
+
+				lk.Lock()
+				// Only add lockers only one per endpoint and per erasure set.
+				if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
+					lockerEpSet.Add(endpoint.Host)
+					s.erasureLockers[i] = append(s.erasureLockers[i], locker)
+				}
+				lk.Unlock()
+			}(i, endpoints.Endpoints[i*setDriveCount+j])
 		}
 	}
+	wg.Wait()
 
-	var wg sync.WaitGroup
 	for i := 0; i < setCount; i++ {
 		wg.Add(1)
 		go func(i int) {
@@ -1021,48 +1029,6 @@ func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs
 	return beforeDrives
 }
 
-func getHealDiskInfos(storageDisks []StorageAPI, errs []error) ([]DiskInfo, []error) {
-	infos := make([]DiskInfo, len(storageDisks))
-	g := errgroup.WithNErrs(len(storageDisks))
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if errs[index] != nil && errs[index] != errUnformattedDisk {
-				return errs[index]
-			}
-			if storageDisks[index] == nil {
-				return errDiskNotFound
-			}
-			var err error
-			infos[index], err = storageDisks[index].DiskInfo(context.TODO(), false)
-			return err
-		}, index)
-	}
-	return infos, g.Wait()
-}
-
-// Mark root disks as down so as not to heal them.
-func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
-	if globalIsCICD {
-		// Do nothing
-		return
-	}
-	infos, ierrs := getHealDiskInfos(storageDisks, errs)
-	for i := range storageDisks {
-		if ierrs[i] != nil && ierrs[i] != errUnformattedDisk {
-			storageDisks[i] = nil
-			continue
-		}
-		if storageDisks[i] != nil && infos[i].RootDisk {
-			// We should not heal on root disk. i.e in a situation where the minio-administrator has unmounted a
-			// defective drive we should not heal a path on the root disk.
-			logger.LogIf(GlobalContext, fmt.Errorf("Drive `%s` is part of root drive, will not be used", storageDisks[i]))
-			storageDisks[i].Close()
-			storageDisks[i] = nil
-		}
-	}
-}
-
 // HealFormat - heals missing `format.json` on fresh unformatted disks.
 func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
 	storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
@@ -1081,9 +1047,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 		return madmin.HealResultItem{}, err
 	}
 
-	// Mark all root disks down
-	markRootDisksAsDown(storageDisks, sErrs)
-
 	refFormat, err := getFormatErasureInQuorum(formats)
 	if err != nil {
 		return res, err
diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go
index 5af8b84e6..90e1807a0 100644
--- a/cmd/format-erasure.go
+++ b/cmd/format-erasure.go
@@ -822,9 +822,6 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount,
 		}
 	}
 
-	// Mark all root disks down
-	markRootDisksAsDown(storageDisks, sErrs)
-
 	// Save formats `format.json` across all disks.
 	if err := saveFormatErasureAll(ctx, storageDisks, formats); err != nil {
 		return nil, err
diff --git a/cmd/grid.go b/cmd/grid.go
index bd7029c10..e347e994d 100644
--- a/cmd/grid.go
+++ b/cmd/grid.go
@@ -22,7 +22,6 @@ import (
 	"crypto/tls"
 	"sync/atomic"
 
-	"github.com/minio/minio-go/v7/pkg/set"
 	"github.com/minio/minio/internal/fips"
 	"github.com/minio/minio/internal/grid"
 	xhttp "github.com/minio/minio/internal/http"
@@ -36,28 +35,11 @@ var globalGrid atomic.Pointer[grid.Manager]
 var globalGridStart = make(chan struct{})
 
 func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
-	seenHosts := set.NewStringSet()
-	var hosts []string
-	var local string
-	for _, ep := range eps {
-		for _, endpoint := range ep.Endpoints {
-			u := endpoint.GridHost()
-			if seenHosts.Contains(u) {
-				continue
-			}
-			seenHosts.Add(u)
-
-			// Set local endpoint
-			if endpoint.IsLocal {
-				local = u
-			}
-			hosts = append(hosts, u)
-		}
-	}
 	lookupHost := globalDNSCache.LookupHost
 	if IsKubernetes() || IsDocker() {
 		lookupHost = nil
 	}
+	hosts, local := eps.GridHosts()
 	g, err := grid.NewManager(ctx, grid.ManagerOptions{
 		Dialer: grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions))),
 		Local:  local,
diff --git a/cmd/handler-api.go b/cmd/handler-api.go
index 3f592ffa1..ae7781116 100644
--- a/cmd/handler-api.go
+++ b/cmd/handler-api.go
@@ -133,8 +133,8 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 		}
 	} else {
 		apiRequestsMaxPerNode = cfg.RequestsMax
-		if len(globalEndpoints.Hostnames()) > 0 {
-			apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames())
+		if n := totalNodeCount(); n > 0 {
+			apiRequestsMaxPerNode /= n
 		}
 	}
 
diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go
index c1a428707..1c9357464 100644
--- a/cmd/prepare-storage.go
+++ b/cmd/prepare-storage.go
@@ -231,24 +231,6 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
 		return storageDisks, format, nil
 	}
 
-	// Mark all root disks down
-	markRootDisksAsDown(storageDisks, sErrs)
-
-	// Following function is added to fix a regressions which was introduced
-	// in release RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3.
-	// This migration failed to capture '.This' field properly which indicates
-	// the disk UUID association. Below function is called to handle and fix
-	// this regression, for more info refer https://github.com/minio/minio/issues/5667
-	if err = fixFormatErasureV3(storageDisks, endpoints, formatConfigs); err != nil {
-		logger.LogIf(GlobalContext, err)
-		return nil, nil, err
-	}
-
-	// If any of the .This field is still empty, we return error.
-	if formatErasureV3ThisEmpty(formatConfigs) {
-		return nil, nil, errErasureV3ThisEmpty
-	}
-
 	format, err = getFormatErasureInQuorum(formatConfigs)
 	if err != nil {
 		logger.LogIf(GlobalContext, err)
diff --git a/cmd/routers.go b/cmd/routers.go
index 91f47c739..7ee5446bf 100644
--- a/cmd/routers.go
+++ b/cmd/routers.go
@@ -36,7 +36,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint
 	registerPeerS3Handlers(router)
 
 	// Register bootstrap REST router for distributed setups.
-	registerBootstrapRESTHandlers(router)
+	registerBootstrapRESTHandlers(globalGrid.Load())
 
 	// Register distributed namespace lock routers.
 	registerLockRESTHandlers()
diff --git a/cmd/server-main.go b/cmd/server-main.go
index a13961d37..c2b92536e 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -381,7 +381,7 @@ func initAllSubsystems(ctx context.Context) {
 	}
 
 	// Create the bucket bandwidth monitor
-	globalBucketMonitor = bandwidth.NewMonitor(ctx, totalNodeCount())
+	globalBucketMonitor = bandwidth.NewMonitor(ctx, uint64(totalNodeCount()))
 
 	// Create a new config system.
 	globalConfigSys = NewConfigSys()
@@ -725,7 +725,7 @@ func serverMain(ctx *cli.Context) {
 		getCert = globalTLSCerts.GetCertificate
 	}
 
 	// Initialize grid
 	bootstrapTrace("initGrid", func() {
 		logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services")
 	})
@@ -767,7 +767,7 @@ func serverMain(ctx *cli.Context) {
 	if globalIsDistErasure {
 		bootstrapTrace("verifying system configuration", func() {
 			// Additionally in distributed setup, validate the setup and configuration.
-			if err := verifyServerSystemConfig(GlobalContext, globalEndpoints); err != nil {
+			if err := verifyServerSystemConfig(GlobalContext, globalEndpoints, globalGrid.Load()); err != nil {
 				logger.Fatal(err, "Unable to start the server")
 			}
 		})
diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go
index f31107f53..5aff111f0 100644
--- a/cmd/storage-errors.go
+++ b/cmd/storage-errors.go
@@ -54,8 +54,12 @@ var errDiskNotDir = StorageErr("drive is not directory or mountpoint")
 // errDiskNotFound - cannot find the underlying configured disk anymore.
 var errDiskNotFound = StorageErr("drive not found")
 
+// errDiskOngoingReq - indicates if the disk has an on-going request in progress.
 var errDiskOngoingReq = StorageErr("drive still did not complete the request")
 
+// errDriveIsRoot - cannot use the disk since it's a root disk.
+var errDriveIsRoot = StorageErr("drive is part of root drive, will not be used")
+
 // errFaultyRemoteDisk - remote disk is faulty.
 var errFaultyRemoteDisk = StorageErr("remote drive is faulty")
 
diff --git a/cmd/utils.go b/cmd/utils.go
index a2b989a94..cb1868beb 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -1036,9 +1036,8 @@ func isDirObject(object string) bool {
 }
 
 // Helper method to return total number of nodes in cluster
-func totalNodeCount() uint64 {
-	peers, _ := globalEndpoints.peers()
-	totalNodesCount := uint64(len(peers))
+func totalNodeCount() int {
+	totalNodesCount := len(globalEndpoints.Hostnames())
 	if totalNodesCount == 0 {
 		totalNodesCount = 1 // For standalone erasure coding
 	}
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 3c89e9c83..3ccd769f3 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -101,7 +101,6 @@ type xlStorage struct {
 
 	globalSync bool
 	oDirect    bool // indicates if this disk supports ODirect
-	rootDisk   bool
 
 	diskID string
 
@@ -240,26 +239,28 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
 		return nil, err
 	}
 
-	var rootDisk bool
 	if !globalIsCICD && !globalIsErasureSD {
+		var rootDrive bool
 		if globalRootDiskThreshold > 0 {
 			// Use MINIO_ROOTDISK_THRESHOLD_SIZE to figure out if
 			// this disk is a root disk. treat those disks with
-			// size less than or equal to the threshold as rootDisks.
-			rootDisk = info.Total <= globalRootDiskThreshold
+			// size less than or equal to the threshold as rootDrives.
+			rootDrive = info.Total <= globalRootDiskThreshold
 		} else {
-			rootDisk, err = disk.IsRootDisk(path, SlashSeparator)
+			rootDrive, err = disk.IsRootDisk(path, SlashSeparator)
 			if err != nil {
 				return nil, err
 			}
 		}
+		if rootDrive {
+			return nil, errDriveIsRoot
+		}
 	}
 
 	s = &xlStorage{
 		drivePath:  path,
 		endpoint:   ep,
 		globalSync: globalFSOSync,
-		rootDisk:   rootDisk,
 		poolIndex:  -1,
 		setIndex:   -1,
 		diskIndex:  -1,
@@ -761,7 +762,6 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
 		info = v.(DiskInfo)
 	}
 
-	info.RootDisk = s.rootDisk
 	info.MountPath = s.drivePath
 	info.Endpoint = s.endpoint.String()
 	info.Scanning = atomic.LoadInt32(&s.scanning) == 1
diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go
index 4d40ccda0..e53358a1d 100644
--- a/internal/grid/handlers.go
+++ b/internal/grid/handlers.go
@@ -57,6 +57,7 @@ const (
 	HandlerCheckParts
 	HandlerRenameData
 
+	HandlerServerVerify
 	// Add more above here ^^^
 	// If all handlers are used, the type of Handler can be changed.
 	// Handlers have no versioning, so non-compatible handler changes must result in new IDs.
@@ -86,11 +87,13 @@ var handlerPrefixes = [handlerLast]string{
 	HandlerWriteMetadata: storagePrefix,
 	HandlerCheckParts:    storagePrefix,
 	HandlerRenameData:    storagePrefix,
+	HandlerServerVerify:  bootstrapPrefix,
 }
 
 const (
-	lockPrefix    = "lockR"
-	storagePrefix = "storageR"
+	lockPrefix      = "lockR"
+	storagePrefix   = "storageR"
+	bootstrapPrefix = "bootstrap"
 )
 
 func init() {
diff --git a/internal/grid/handlers_string.go b/internal/grid/handlers_string.go
index 8c20fee5f..a21c3c742 100644
--- a/internal/grid/handlers_string.go
+++ b/internal/grid/handlers_string.go
@@ -27,14 +27,15 @@ func _() {
 	_ = x[HandlerWriteMetadata-16]
 	_ = x[HandlerCheckParts-17]
 	_ = x[HandlerRenameData-18]
-	_ = x[handlerTest-19]
-	_ = x[handlerTest2-20]
-	_ = x[handlerLast-21]
+	_ = x[HandlerServerVerify-19]
+	_ = x[handlerTest-20]
+	_ = x[handlerTest2-21]
+	_ = x[handlerLast-22]
 }
 
-const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDatahandlerTesthandlerTest2handlerLast"
+const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataServerVerifyhandlerTesthandlerTest2handlerLast"
 
-var _HandlerID_index = [...]uint8{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 207, 219, 230}
+var _HandlerID_index = [...]uint8{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 208, 219, 231, 242}
 
 func (i HandlerID) String() string {
 	if i >= HandlerID(len(_HandlerID_index)-1) {