migrate bootstrap logic directly to websockets (#18855)

Improve startup sequence performance by 2x for deployments with 300+ nodes.
Harshavardhana 2024-01-24 13:36:44 -08:00 committed by GitHub
parent c905d3fe21
commit e377bb949a
19 changed files with 568 additions and 269 deletions

View File

@@ -46,7 +46,7 @@ lint-fix: getdeps ## runs golangci-lint suite of linters with automatic fixes
check: test
test: verifiers build build-debugging ## builds minio, runs linters, tests
@echo "Running unit tests"
@MINIO_API_REQUESTS_MAX=10000 CGO_ENABLED=0 go test -tags kqueue ./...
@MINIO_API_REQUESTS_MAX=10000 CGO_ENABLED=0 go test -v -tags kqueue ./...
test-root-disable: install-race
@echo "Running minio root lockdown tests"

View File

@@ -2852,7 +2852,7 @@ func getClusterMetaInfo(ctx context.Context) []byte {
ci := madmin.ClusterRegistrationInfo{}
ci.Info.NoOfServerPools = len(globalEndpoints)
ci.Info.NoOfServers = len(globalEndpoints.Hostnames())
ci.Info.NoOfServers = totalNodeCount()
ci.Info.MinioVersion = Version
si := objectAPI.StorageInfo(ctx, true)

View File

@@ -19,71 +19,47 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"math/rand"
"reflect"
"strings"
"sync"
"time"
"github.com/minio/minio-go/v7/pkg/set"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/mux"
"github.com/minio/pkg/v2/env"
)
const (
bootstrapRESTVersion = "v1"
bootstrapRESTVersionPrefix = SlashSeparator + bootstrapRESTVersion
bootstrapRESTPrefix = minioReservedBucketPath + "/bootstrap"
bootstrapRESTPath = bootstrapRESTPrefix + bootstrapRESTVersionPrefix
)
const (
bootstrapRESTMethodHealth = "/health"
bootstrapRESTMethodVerify = "/verify"
)
// To abstract a node over the network.
type bootstrapRESTServer struct{}
//go:generate msgp -file=$GOFILE
// ServerSystemConfig - captures information about server configuration.
type ServerSystemConfig struct {
MinioEndpoints EndpointServerPools
MinioEnv map[string]string
NEndpoints int
CmdLines []string
MinioEnv map[string]string
}
// Diff - returns error on first difference found in two configs.
func (s1 ServerSystemConfig) Diff(s2 ServerSystemConfig) error {
if s1.MinioEndpoints.NEndpoints() != s2.MinioEndpoints.NEndpoints() {
return fmt.Errorf("Expected number of endpoints %d, seen %d", s1.MinioEndpoints.NEndpoints(),
s2.MinioEndpoints.NEndpoints())
func (s1 *ServerSystemConfig) Diff(s2 *ServerSystemConfig) error {
ns1 := s1.NEndpoints
ns2 := s2.NEndpoints
if ns1 != ns2 {
return fmt.Errorf("Expected number of endpoints %d, seen %d", ns1, ns2)
}
for i, ep := range s1.MinioEndpoints {
if ep.CmdLine != s2.MinioEndpoints[i].CmdLine {
return fmt.Errorf("Expected command line argument %s, seen %s", ep.CmdLine,
s2.MinioEndpoints[i].CmdLine)
}
if ep.SetCount != s2.MinioEndpoints[i].SetCount {
return fmt.Errorf("Expected set count %d, seen %d", ep.SetCount,
s2.MinioEndpoints[i].SetCount)
}
if ep.DrivesPerSet != s2.MinioEndpoints[i].DrivesPerSet {
return fmt.Errorf("Expected drives pet set %d, seen %d", ep.DrivesPerSet,
s2.MinioEndpoints[i].DrivesPerSet)
}
if ep.Platform != s2.MinioEndpoints[i].Platform {
return fmt.Errorf("Expected platform '%s', found to be on '%s'",
ep.Platform, s2.MinioEndpoints[i].Platform)
for i, cmdLine := range s1.CmdLines {
if cmdLine != s2.CmdLines[i] {
return fmt.Errorf("Expected command line argument %s, seen %s", cmdLine,
s2.CmdLines[i])
}
}
if reflect.DeepEqual(s1.MinioEnv, s2.MinioEnv) {
return nil
}
@@ -131,7 +107,7 @@ var skipEnvs = map[string]struct{}{
"MINIO_SECRET_KEY": {},
}
func getServerSystemCfg() ServerSystemConfig {
func getServerSystemCfg() *ServerSystemConfig {
envs := env.List("MINIO_")
envValues := make(map[string]string, len(envs))
for _, envK := range envs {
@@ -144,120 +120,102 @@ func getServerSystemCfg() ServerSystemConfig {
}
envValues[envK] = logger.HashString(env.Get(envK, ""))
}
return ServerSystemConfig{
MinioEndpoints: globalEndpoints,
MinioEnv: envValues,
scfg := &ServerSystemConfig{NEndpoints: globalEndpoints.NEndpoints(), MinioEnv: envValues}
var cmdLines []string
for _, ep := range globalEndpoints {
cmdLines = append(cmdLines, ep.CmdLine)
}
scfg.CmdLines = cmdLines
return scfg
}
func (b *bootstrapRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
func (s *bootstrapRESTServer) VerifyHandler(params *grid.MSS) (*ServerSystemConfig, *grid.RemoteErr) {
return getServerSystemCfg(), nil
}
// HealthHandler returns success if request is valid
func (b *bootstrapRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {}
func (b *bootstrapRESTServer) VerifyHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "VerifyHandler")
if err := storageServerRequestValidate(r); err != nil {
b.writeErrorResponse(w, err)
return
}
cfg := getServerSystemCfg()
logger.LogIf(ctx, json.NewEncoder(w).Encode(&cfg))
}
var serverVerifyHandler = grid.NewSingleHandler[*grid.MSS, *ServerSystemConfig](grid.HandlerServerVerify, grid.NewMSS, func() *ServerSystemConfig { return &ServerSystemConfig{} })
// registerBootstrapRESTHandlers - register bootstrap rest router.
func registerBootstrapRESTHandlers(router *mux.Router) {
h := func(f http.HandlerFunc) http.HandlerFunc {
return collectInternodeStats(httpTraceHdrs(f))
}
func registerBootstrapRESTHandlers(gm *grid.Manager) {
server := &bootstrapRESTServer{}
subrouter := router.PathPrefix(bootstrapRESTPrefix).Subrouter()
subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodHealth).HandlerFunc(
h(server.HealthHandler))
subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodVerify).HandlerFunc(
h(server.VerifyHandler))
logger.FatalIf(serverVerifyHandler.Register(gm, server.VerifyHandler), "unable to register handler")
}
// client to talk to bootstrap endpoints.
type bootstrapRESTClient struct {
endpoint Endpoint
restClient *rest.Client
gridConn *grid.Connection
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
// after verifying format.json
func (client *bootstrapRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
if values == nil {
values = make(url.Values)
// Verify function verifies the server config.
func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg *ServerSystemConfig) (err error) {
if newObjectLayerFn() != nil {
return nil
}
respBody, err = client.restClient.Call(ctx, method, values, body, length)
if err == nil {
return respBody, nil
recvCfg, err := serverVerifyHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{}))
if err != nil {
return err
}
return nil, err
return srcCfg.Diff(recvCfg)
}
// Stringer provides a canonicalized representation of node.
func (client *bootstrapRESTClient) String() string {
return client.endpoint.String()
return client.gridConn.String()
}
// Verify - fetches system server config.
func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg ServerSystemConfig) (err error) {
if newObjectLayerFn() != nil {
return nil
}
respBody, err := client.callWithContext(ctx, bootstrapRESTMethodVerify, nil, nil, -1)
if err != nil {
return
}
defer xhttp.DrainBody(respBody)
recvCfg := ServerSystemConfig{}
if err = json.NewDecoder(respBody).Decode(&recvCfg); err != nil {
return err
}
return srcCfg.Diff(recvCfg)
}
func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointServerPools) error {
func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointServerPools, gm *grid.Manager) error {
srcCfg := getServerSystemCfg()
clnts := newBootstrapRESTClients(endpointServerPools)
clnts := newBootstrapRESTClients(endpointServerPools, gm)
var onlineServers int
var offlineEndpoints []error
var incorrectConfigs []error
var retries int
var mu sync.Mutex
for onlineServers < len(clnts)/2 {
var wg sync.WaitGroup
wg.Add(len(clnts))
onlineServers = 0
for _, clnt := range clnts {
if err := clnt.Verify(ctx, srcCfg); err != nil {
bootstrapTraceMsg(fmt.Sprintf("clnt.Verify: %v, endpoint: %v", err, clnt.endpoint))
if !isNetworkError(err) {
logger.LogOnceIf(ctx, fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err), clnt.String())
incorrectConfigs = append(incorrectConfigs, fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err))
} else {
offlineEndpoints = append(offlineEndpoints, fmt.Errorf("%s is unreachable: %w", clnt.String(), err))
go func(clnt *bootstrapRESTClient) {
defer wg.Done()
if clnt.gridConn.State() != grid.StateConnected {
mu.Lock()
offlineEndpoints = append(offlineEndpoints, fmt.Errorf("%s is unreachable: %w", clnt, grid.ErrDisconnected))
mu.Unlock()
return
}
continue
}
onlineServers++
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
err := clnt.Verify(ctx, srcCfg)
mu.Lock()
if err != nil {
bootstrapTraceMsg(fmt.Sprintf("clnt.Verify: %v, endpoint: %s", err, clnt))
if !isNetworkError(err) {
logger.LogOnceIf(context.Background(), fmt.Errorf("%s has incorrect configuration: %w", clnt, err), "incorrect_"+clnt.String())
incorrectConfigs = append(incorrectConfigs, fmt.Errorf("%s has incorrect configuration: %w", clnt, err))
} else {
offlineEndpoints = append(offlineEndpoints, fmt.Errorf("%s is unreachable: %w", clnt, err))
}
} else {
onlineServers++
}
mu.Unlock()
}(clnt)
}
wg.Wait()
select {
case <-ctx.Done():
return ctx.Err()
default:
// Sleep for a while - so that we don't go into
// 100% CPU when half the endpoints are offline.
time.Sleep(100 * time.Millisecond)
// Sleep and stagger to avoid hogging the CPU and a thundering
// herd during the startup sequence.
time.Sleep(25*time.Millisecond + time.Duration(rand.Int63n(int64(100*time.Millisecond))))
retries++
// after 20 retries start logging that servers are not reachable yet
if retries >= 20 {
@@ -268,7 +226,7 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointS
if len(incorrectConfigs) > 0 {
logger.Info(fmt.Sprintf("Following servers have mismatching configuration %s", incorrectConfigs))
}
retries = 0 // reset to log again after 5 retries.
retries = 0 // reset to log again after 20 retries.
}
offlineEndpoints = nil
incorrectConfigs = nil
@@ -277,39 +235,20 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointS
return nil
}
func newBootstrapRESTClients(endpointServerPools EndpointServerPools) []*bootstrapRESTClient {
seenHosts := set.NewStringSet()
func newBootstrapRESTClients(endpointServerPools EndpointServerPools, gm *grid.Manager) []*bootstrapRESTClient {
seenClient := set.NewStringSet()
var clnts []*bootstrapRESTClient
for _, ep := range endpointServerPools {
for _, endpoint := range ep.Endpoints {
if seenHosts.Contains(endpoint.Host) {
if endpoint.IsLocal {
continue
}
seenHosts.Add(endpoint.Host)
// Only proceed for remote endpoints.
if !endpoint.IsLocal {
cl := newBootstrapRESTClient(endpoint)
if serverDebugLog {
cl.restClient.TraceOutput = os.Stdout
}
clnts = append(clnts, cl)
if seenClient.Contains(endpoint.Host) {
continue
}
seenClient.Add(endpoint.Host)
clnts = append(clnts, &bootstrapRESTClient{gm.Connection(endpoint.GridHost())})
}
}
return clnts
}
// Returns a new bootstrap client.
func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
serverURL := &url.URL{
Scheme: endpoint.Scheme,
Host: endpoint.Host,
Path: bootstrapRESTPath,
}
restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
restClient.HealthCheckFn = nil
return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}
}
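For orientation, here is a minimal sketch (not part of the commit) of how the new pieces fit together: the server exposes its ServerSystemConfig through a single grid handler, and each peer calls that handler over its existing websocket connection and diffs the reply against its own config. registerBootstrapVerify and verifyPeer are hypothetical helper names; gm and endpoint stand in for the grid manager and a remote endpoint available at startup.

// Sketch only: wiring of the grid-based bootstrap verification.
// Server side: expose the local config over the grid.
func registerBootstrapVerify(gm *grid.Manager) {
	server := &bootstrapRESTServer{}
	logger.FatalIf(serverVerifyHandler.Register(gm, server.VerifyHandler), "unable to register handler")
}

// Client side: fetch the remote config over the peer's websocket
// connection and compare it with the local one.
func verifyPeer(ctx context.Context, gm *grid.Manager, endpoint Endpoint) error {
	clnt := &bootstrapRESTClient{gridConn: gm.Connection(endpoint.GridHost())}
	remoteCfg, err := serverVerifyHandler.Call(ctx, clnt.gridConn, grid.NewMSSWith(map[string]string{}))
	if err != nil {
		return err
	}
	return getServerSystemCfg().Diff(remoteCfg)
}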

View File

@@ -0,0 +1,270 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *ServerSystemConfig) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "NEndpoints":
z.NEndpoints, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "NEndpoints")
return
}
case "CmdLines":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "CmdLines")
return
}
if cap(z.CmdLines) >= int(zb0002) {
z.CmdLines = (z.CmdLines)[:zb0002]
} else {
z.CmdLines = make([]string, zb0002)
}
for za0001 := range z.CmdLines {
z.CmdLines[za0001], err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "CmdLines", za0001)
return
}
}
case "MinioEnv":
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "MinioEnv")
return
}
if z.MinioEnv == nil {
z.MinioEnv = make(map[string]string, zb0003)
} else if len(z.MinioEnv) > 0 {
for key := range z.MinioEnv {
delete(z.MinioEnv, key)
}
}
for zb0003 > 0 {
zb0003--
var za0002 string
var za0003 string
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "MinioEnv")
return
}
za0003, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "MinioEnv", za0002)
return
}
z.MinioEnv[za0002] = za0003
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *ServerSystemConfig) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "NEndpoints"
err = en.Append(0x83, 0xaa, 0x4e, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73)
if err != nil {
return
}
err = en.WriteInt(z.NEndpoints)
if err != nil {
err = msgp.WrapError(err, "NEndpoints")
return
}
// write "CmdLines"
err = en.Append(0xa8, 0x43, 0x6d, 0x64, 0x4c, 0x69, 0x6e, 0x65, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.CmdLines)))
if err != nil {
err = msgp.WrapError(err, "CmdLines")
return
}
for za0001 := range z.CmdLines {
err = en.WriteString(z.CmdLines[za0001])
if err != nil {
err = msgp.WrapError(err, "CmdLines", za0001)
return
}
}
// write "MinioEnv"
err = en.Append(0xa8, 0x4d, 0x69, 0x6e, 0x69, 0x6f, 0x45, 0x6e, 0x76)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.MinioEnv)))
if err != nil {
err = msgp.WrapError(err, "MinioEnv")
return
}
for za0002, za0003 := range z.MinioEnv {
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "MinioEnv")
return
}
err = en.WriteString(za0003)
if err != nil {
err = msgp.WrapError(err, "MinioEnv", za0002)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *ServerSystemConfig) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "NEndpoints"
o = append(o, 0x83, 0xaa, 0x4e, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73)
o = msgp.AppendInt(o, z.NEndpoints)
// string "CmdLines"
o = append(o, 0xa8, 0x43, 0x6d, 0x64, 0x4c, 0x69, 0x6e, 0x65, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.CmdLines)))
for za0001 := range z.CmdLines {
o = msgp.AppendString(o, z.CmdLines[za0001])
}
// string "MinioEnv"
o = append(o, 0xa8, 0x4d, 0x69, 0x6e, 0x69, 0x6f, 0x45, 0x6e, 0x76)
o = msgp.AppendMapHeader(o, uint32(len(z.MinioEnv)))
for za0002, za0003 := range z.MinioEnv {
o = msgp.AppendString(o, za0002)
o = msgp.AppendString(o, za0003)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *ServerSystemConfig) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "NEndpoints":
z.NEndpoints, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "NEndpoints")
return
}
case "CmdLines":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "CmdLines")
return
}
if cap(z.CmdLines) >= int(zb0002) {
z.CmdLines = (z.CmdLines)[:zb0002]
} else {
z.CmdLines = make([]string, zb0002)
}
for za0001 := range z.CmdLines {
z.CmdLines[za0001], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "CmdLines", za0001)
return
}
}
case "MinioEnv":
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "MinioEnv")
return
}
if z.MinioEnv == nil {
z.MinioEnv = make(map[string]string, zb0003)
} else if len(z.MinioEnv) > 0 {
for key := range z.MinioEnv {
delete(z.MinioEnv, key)
}
}
for zb0003 > 0 {
var za0002 string
var za0003 string
zb0003--
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "MinioEnv")
return
}
za0003, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "MinioEnv", za0002)
return
}
z.MinioEnv[za0002] = za0003
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *ServerSystemConfig) Msgsize() (s int) {
s = 1 + 11 + msgp.IntSize + 9 + msgp.ArrayHeaderSize
for za0001 := range z.CmdLines {
s += msgp.StringPrefixSize + len(z.CmdLines[za0001])
}
s += 9 + msgp.MapHeaderSize
if z.MinioEnv != nil {
for za0002, za0003 := range z.MinioEnv {
_ = za0003
s += msgp.StringPrefixSize + len(za0002) + msgp.StringPrefixSize + len(za0003)
}
}
return
}
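The generated MessagePack code above is what lets ServerSystemConfig travel over the grid instead of JSON over REST. Below is a small sanity-check sketch, not part of the commit; exampleRoundTrip and its field values are made up, and real MinioEnv values are hashed by getServerSystemCfg.

// Sketch only: round-trip a populated config through the generated
// MessagePack code; the grid transport does the equivalent on the wire.
func exampleRoundTrip() error {
	src := &ServerSystemConfig{
		NEndpoints: 4,
		CmdLines:   []string{"http://server{1...4}/data{1...4}"},
		MinioEnv:   map[string]string{"MINIO_REGION": "us-east-1"},
	}
	buf, err := src.MarshalMsg(nil)
	if err != nil {
		return err
	}
	var dst ServerSystemConfig
	if _, err = dst.UnmarshalMsg(buf); err != nil {
		return err
	}
	return src.Diff(&dst) // nil when both sides agree
}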

View File

@@ -0,0 +1,123 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalServerSystemConfig(t *testing.T) {
v := ServerSystemConfig{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgServerSystemConfig(b *testing.B) {
v := ServerSystemConfig{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgServerSystemConfig(b *testing.B) {
v := ServerSystemConfig{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalServerSystemConfig(b *testing.B) {
v := ServerSystemConfig{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeServerSystemConfig(t *testing.T) {
v := ServerSystemConfig{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeServerSystemConfig Msgsize() is inaccurate")
}
vn := ServerSystemConfig{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeServerSystemConfig(b *testing.B) {
v := ServerSystemConfig{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeServerSystemConfig(b *testing.B) {
v := ServerSystemConfig{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@@ -386,6 +386,31 @@ func (l EndpointServerPools) NEndpoints() (count int) {
return count
}
// GridHosts returns all peers, including the local one, in a
// websocket grid compatible format. The local peer is also
// returned as a separate string.
func (l EndpointServerPools) GridHosts() (gridHosts []string, gridLocal string) {
seenHosts := set.NewStringSet()
for _, ep := range l {
for _, endpoint := range ep.Endpoints {
u := endpoint.GridHost()
if seenHosts.Contains(u) {
continue
}
seenHosts.Add(u)
// Set local endpoint
if endpoint.IsLocal {
gridLocal = u
}
gridHosts = append(gridHosts, u)
}
}
return gridHosts, gridLocal
}
// Hostnames - returns list of unique hostnames
func (l EndpointServerPools) Hostnames() []string {
foundSet := set.NewStringSet()
@@ -435,7 +460,9 @@ func (l EndpointServerPools) peers() (peers []string, local string) {
peer := endpoint.Host
if endpoint.IsLocal {
if _, port := mustSplitHostPort(peer); port == globalMinioPort {
local = peer
if local == "" {
local = peer
}
}
}

View File

@@ -118,8 +118,10 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
return nil, fmt.Errorf("parity validation returned an error: %w <- (%d, %d), for pool(%s)", err, commonParityDrives, ep.DrivesPerSet, humanize.Ordinal(i+1))
}
storageDisks[i], formats[i], err = waitForFormatErasure(local, ep.Endpoints, i+1,
ep.SetCount, ep.DrivesPerSet, deploymentID, distributionAlgo)
bootstrapTrace("waitForFormatErasure: loading disks", func() {
storageDisks[i], formats[i], err = waitForFormatErasure(local, ep.Endpoints, i+1,
ep.SetCount, ep.DrivesPerSet, deploymentID, distributionAlgo)
})
if err != nil {
return nil, err
}
@@ -138,7 +140,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
return nil, fmt.Errorf("all pools must have same deployment ID - expected %s, got %s for pool(%s)", deploymentID, formats[i].ID, humanize.Ordinal(i+1))
}
z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i)
bootstrapTrace(fmt.Sprintf("newErasureSets: initializing %s pool", humanize.Ordinal(i+1)), func() {
z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i)
})
if err != nil {
return nil, err
}
@@ -176,8 +180,12 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
setObjectLayer(z)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
attempt := 1
for {
err := z.Init(ctx) // Initializes all pools.
var err error
bootstrapTrace(fmt.Sprintf("poolMeta.Init: loading pool metadata, attempt: %d", attempt), func() {
err = z.Init(ctx) // Initializes all pools.
})
if err != nil {
if !configRetriableErrors(err) {
logger.Fatal(err, "Unable to initialize backend")
@@ -185,6 +193,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
retry := time.Duration(r.Float64() * float64(5*time.Second))
logger.LogIf(ctx, fmt.Errorf("Unable to initialize backend: %w, retrying in %s", err, retry))
time.Sleep(retry)
attempt++
continue
}
break
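In the hunks above, err is declared outside the closure because bootstrapTrace's worker returns nothing; the assignment happens inside and is checked afterwards. A minimal sketch of that pattern, assuming bootstrapTrace's (message, closure) signature seen in this diff and the surrounding erasureServerPools type; initWithTrace itself is hypothetical:

// Sketch only: capture the error outside the traced closure.
func initWithTrace(ctx context.Context, z *erasureServerPools) error {
	var err error // declared outside: the closure cannot return it
	bootstrapTrace("poolMeta.Init: loading pool metadata", func() {
		err = z.Init(ctx)
	})
	return err
}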

View File

@@ -384,19 +384,27 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
}
}
var wg sync.WaitGroup
var lk sync.Mutex
for i := 0; i < setCount; i++ {
lockerEpSet := set.NewStringSet()
for j := 0; j < setDriveCount; j++ {
endpoint := endpoints.Endpoints[i*setDriveCount+j]
// Only add lockers only one per endpoint and per erasure set.
if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
lockerEpSet.Add(endpoint.Host)
s.erasureLockers[i] = append(s.erasureLockers[i], locker)
}
wg.Add(1)
go func(i int, endpoint Endpoint) {
defer wg.Done()
lk.Lock()
// Only add one locker per endpoint and per erasure set.
if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
lockerEpSet.Add(endpoint.Host)
s.erasureLockers[i] = append(s.erasureLockers[i], locker)
}
lk.Unlock()
}(i, endpoints.Endpoints[i*setDriveCount+j])
}
}
wg.Wait()
var wg sync.WaitGroup
for i := 0; i < setCount; i++ {
wg.Add(1)
go func(i int) {
@@ -1021,48 +1029,6 @@ func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs
return beforeDrives
}
func getHealDiskInfos(storageDisks []StorageAPI, errs []error) ([]DiskInfo, []error) {
infos := make([]DiskInfo, len(storageDisks))
g := errgroup.WithNErrs(len(storageDisks))
for index := range storageDisks {
index := index
g.Go(func() error {
if errs[index] != nil && errs[index] != errUnformattedDisk {
return errs[index]
}
if storageDisks[index] == nil {
return errDiskNotFound
}
var err error
infos[index], err = storageDisks[index].DiskInfo(context.TODO(), false)
return err
}, index)
}
return infos, g.Wait()
}
// Mark root disks as down so as not to heal them.
func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
if globalIsCICD {
// Do nothing
return
}
infos, ierrs := getHealDiskInfos(storageDisks, errs)
for i := range storageDisks {
if ierrs[i] != nil && ierrs[i] != errUnformattedDisk {
storageDisks[i] = nil
continue
}
if storageDisks[i] != nil && infos[i].RootDisk {
// We should not heal on root disk. i.e in a situation where the minio-administrator has unmounted a
// defective drive we should not heal a path on the root disk.
logger.LogIf(GlobalContext, fmt.Errorf("Drive `%s` is part of root drive, will not be used", storageDisks[i]))
storageDisks[i].Close()
storageDisks[i] = nil
}
}
}
// HealFormat - heals missing `format.json` on fresh unformatted disks.
func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
@@ -1081,9 +1047,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
return madmin.HealResultItem{}, err
}
// Mark all root disks down
markRootDisksAsDown(storageDisks, sErrs)
refFormat, err := getFormatErasureInQuorum(formats)
if err != nil {
return res, err

View File

@@ -822,9 +822,6 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount,
}
}
// Mark all root disks down
markRootDisksAsDown(storageDisks, sErrs)
// Save formats `format.json` across all disks.
if err := saveFormatErasureAll(ctx, storageDisks, formats); err != nil {
return nil, err

View File

@@ -22,7 +22,6 @@ import (
"crypto/tls"
"sync/atomic"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/fips"
"github.com/minio/minio/internal/grid"
xhttp "github.com/minio/minio/internal/http"
@@ -36,28 +35,11 @@ var globalGrid atomic.Pointer[grid.Manager]
var globalGridStart = make(chan struct{})
func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
seenHosts := set.NewStringSet()
var hosts []string
var local string
for _, ep := range eps {
for _, endpoint := range ep.Endpoints {
u := endpoint.GridHost()
if seenHosts.Contains(u) {
continue
}
seenHosts.Add(u)
// Set local endpoint
if endpoint.IsLocal {
local = u
}
hosts = append(hosts, u)
}
}
lookupHost := globalDNSCache.LookupHost
if IsKubernetes() || IsDocker() {
lookupHost = nil
}
hosts, local := eps.GridHosts()
g, err := grid.NewManager(ctx, grid.ManagerOptions{
Dialer: grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions))),
Local: local,

View File

@@ -133,8 +133,8 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
}
} else {
apiRequestsMaxPerNode = cfg.RequestsMax
if len(globalEndpoints.Hostnames()) > 0 {
apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames())
if n := totalNodeCount(); n > 0 {
apiRequestsMaxPerNode /= n
}
}

View File

@@ -231,24 +231,6 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
return storageDisks, format, nil
}
// Mark all root disks down
markRootDisksAsDown(storageDisks, sErrs)
// Following function is added to fix a regressions which was introduced
// in release RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3.
// This migration failed to capture '.This' field properly which indicates
// the disk UUID association. Below function is called to handle and fix
// this regression, for more info refer https://github.com/minio/minio/issues/5667
if err = fixFormatErasureV3(storageDisks, endpoints, formatConfigs); err != nil {
logger.LogIf(GlobalContext, err)
return nil, nil, err
}
// If any of the .This field is still empty, we return error.
if formatErasureV3ThisEmpty(formatConfigs) {
return nil, nil, errErasureV3ThisEmpty
}
format, err = getFormatErasureInQuorum(formatConfigs)
if err != nil {
logger.LogIf(GlobalContext, err)

View File

@@ -36,7 +36,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint
registerPeerS3Handlers(router)
// Register bootstrap REST router for distributed setups.
registerBootstrapRESTHandlers(router)
registerBootstrapRESTHandlers(globalGrid.Load())
// Register distributed namespace lock routers.
registerLockRESTHandlers()

View File

@@ -381,7 +381,7 @@ func initAllSubsystems(ctx context.Context) {
}
// Create the bucket bandwidth monitor
globalBucketMonitor = bandwidth.NewMonitor(ctx, totalNodeCount())
globalBucketMonitor = bandwidth.NewMonitor(ctx, uint64(totalNodeCount()))
// Create a new config system.
globalConfigSys = NewConfigSys()
@@ -725,7 +725,7 @@ func serverMain(ctx *cli.Context) {
getCert = globalTLSCerts.GetCertificate
}
// Initialize grid
// Initialize grid
bootstrapTrace("initGrid", func() {
logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services")
})
@@ -767,7 +767,7 @@ func serverMain(ctx *cli.Context) {
if globalIsDistErasure {
bootstrapTrace("verifying system configuration", func() {
// Additionally in distributed setup, validate the setup and configuration.
if err := verifyServerSystemConfig(GlobalContext, globalEndpoints); err != nil {
if err := verifyServerSystemConfig(GlobalContext, globalEndpoints, globalGrid.Load()); err != nil {
logger.Fatal(err, "Unable to start the server")
}
})
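Because verifyServerSystemConfig now takes the grid manager, bootstrap verification depends on the grid being initialized first. A condensed sketch of the resulting ordering in serverMain, using only calls visible in this diff:

// Sketch only: grid first, then bootstrap verification over it.
bootstrapTrace("initGrid", func() {
	logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints),
		"Unable to configure server grid RPC services")
})
if globalIsDistErasure {
	bootstrapTrace("verifying system configuration", func() {
		if err := verifyServerSystemConfig(GlobalContext, globalEndpoints, globalGrid.Load()); err != nil {
			logger.Fatal(err, "Unable to start the server")
		}
	})
}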

View File

@@ -54,8 +54,12 @@ var errDiskNotDir = StorageErr("drive is not directory or mountpoint")
// errDiskNotFound - cannot find the underlying configured disk anymore.
var errDiskNotFound = StorageErr("drive not found")
// errDiskOngoingReq - indicates if the disk has an on-going request in progress.
var errDiskOngoingReq = StorageErr("drive still did not complete the request")
// errDriveIsRoot - cannot use the disk since its a root disk.
var errDriveIsRoot = StorageErr("drive is part of root drive, will not be used")
// errFaultyRemoteDisk - remote disk is faulty.
var errFaultyRemoteDisk = StorageErr("remote drive is faulty")

View File

@@ -1036,9 +1036,8 @@ func isDirObject(object string) bool {
}
// Helper method to return total number of nodes in cluster
func totalNodeCount() uint64 {
peers, _ := globalEndpoints.peers()
totalNodesCount := uint64(len(peers))
func totalNodeCount() int {
totalNodesCount := len(globalEndpoints.Hostnames())
if totalNodesCount == 0 {
totalNodesCount = 1 // For standalone erasure coding
}

View File

@@ -101,7 +101,6 @@ type xlStorage struct {
globalSync bool
oDirect bool // indicates if this disk supports ODirect
rootDisk bool
diskID string
@@ -240,26 +239,28 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
return nil, err
}
var rootDisk bool
if !globalIsCICD && !globalIsErasureSD {
var rootDrive bool
if globalRootDiskThreshold > 0 {
// Use MINIO_ROOTDISK_THRESHOLD_SIZE to figure out if
// this disk is a root disk. treat those disks with
// size less than or equal to the threshold as rootDisks.
rootDisk = info.Total <= globalRootDiskThreshold
// size less than or equal to the threshold as rootDrives.
rootDrive = info.Total <= globalRootDiskThreshold
} else {
rootDisk, err = disk.IsRootDisk(path, SlashSeparator)
rootDrive, err = disk.IsRootDisk(path, SlashSeparator)
if err != nil {
return nil, err
}
}
if rootDrive {
return nil, errDriveIsRoot
}
}
s = &xlStorage{
drivePath: path,
endpoint: ep,
globalSync: globalFSOSync,
rootDisk: rootDisk,
poolIndex: -1,
setIndex: -1,
diskIndex: -1,
@@ -761,7 +762,6 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
info = v.(DiskInfo)
}
info.RootDisk = s.rootDisk
info.MountPath = s.drivePath
info.Endpoint = s.endpoint.String()
info.Scanning = atomic.LoadInt32(&s.scanning) == 1

View File

@@ -57,6 +57,7 @@ const (
HandlerCheckParts
HandlerRenameData
HandlerServerVerify
// Add more above here ^^^
// If all handlers are used, the type of Handler can be changed.
// Handlers have no versioning, so non-compatible handler changes must result in new IDs.
@@ -86,11 +87,13 @@ var handlerPrefixes = [handlerLast]string{
HandlerWriteMetadata: storagePrefix,
HandlerCheckParts: storagePrefix,
HandlerRenameData: storagePrefix,
HandlerServerVerify: bootstrapPrefix,
}
const (
lockPrefix = "lockR"
storagePrefix = "storageR"
lockPrefix = "lockR"
storagePrefix = "storageR"
bootstrapPrefix = "bootstrap"
)
func init() {

View File

@@ -27,14 +27,15 @@ func _() {
_ = x[HandlerWriteMetadata-16]
_ = x[HandlerCheckParts-17]
_ = x[HandlerRenameData-18]
_ = x[handlerTest-19]
_ = x[handlerTest2-20]
_ = x[handlerLast-21]
_ = x[HandlerServerVerify-19]
_ = x[handlerTest-20]
_ = x[handlerTest2-21]
_ = x[handlerLast-22]
}
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDatahandlerTesthandlerTest2handlerLast"
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataServerVerifyhandlerTesthandlerTest2handlerLast"
var _HandlerID_index = [...]uint8{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 207, 219, 230}
var _HandlerID_index = [...]uint8{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 208, 219, 231, 242}
func (i HandlerID) String() string {
if i >= HandlerID(len(_HandlerID_index)-1) {