diff --git a/cmd/endpoint-ellipses.go b/cmd/endpoint-ellipses.go index ae5350faa..417efe1b4 100644 --- a/cmd/endpoint-ellipses.go +++ b/cmd/endpoint-ellipses.go @@ -359,6 +359,9 @@ func createServerEndpoints(serverAddr string, args ...string) ( if err != nil { return nil, -1, err } + for i := range endpointList { + endpointList[i].SetPool(0) + } endpointServerPools = append(endpointServerPools, PoolEndpoints{ Legacy: true, SetCount: len(setArgs), diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 7b886d4a4..091777c2c 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -38,6 +38,7 @@ import ( "github.com/minio/minio/internal/mountinfo" "github.com/minio/pkg/env" xnet "github.com/minio/pkg/net" + "golang.org/x/exp/slices" ) // EndpointType - enum for endpoint type. @@ -58,9 +59,17 @@ type ProxyEndpoint struct { Transport http.RoundTripper } +// Node holds information about a node in this cluster +type Node struct { + *url.URL + Pools []int + IsLocal bool +} + // Endpoint - any type of endpoint. type Endpoint struct { *url.URL + Pool int IsLocal bool } @@ -97,6 +106,11 @@ func (endpoint *Endpoint) UpdateIsLocal() (err error) { return nil } +// SetPool sets a specific pool number to this node +func (endpoint *Endpoint) SetPool(i int) { + endpoint.Pool = i +} + // NewEndpoint - returns new endpoint based on given arguments. func NewEndpoint(arg string) (ep Endpoint, e error) { // isEmptyPath - check whether given path is not empty. @@ -209,6 +223,35 @@ type PoolEndpoints struct { // EndpointServerPools - list of list of endpoints type EndpointServerPools []PoolEndpoints +// GetNodes returns a sorted list of nodes in this cluster +func (l EndpointServerPools) GetNodes() (nodes []Node) { + nodesMap := make(map[string]Node) + for _, pool := range l { + for _, ep := range pool.Endpoints { + node, ok := nodesMap[ep.Host] + if !ok { + node.IsLocal = ep.IsLocal + node.URL = &url.URL{ + Scheme: ep.Scheme, + Host: ep.Host, + } + } + if !slices.Contains(node.Pools, ep.Pool) { + node.Pools = append(node.Pools, ep.Pool) + } + nodesMap[ep.Host] = node + } + } + nodes = make([]Node, 0, len(nodesMap)) + for _, v := range nodesMap { + nodes = append(nodes, v) + } + sort.Slice(nodes, func(i, j int) bool { + return nodes[i].Host < nodes[j].Host + }) + return +} + // GetPoolIdx return pool index func (l EndpointServerPools) GetPoolIdx(pool string) int { for id, ep := range globalEndpoints { @@ -768,6 +811,8 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints return nil, setupType, config.ErrInvalidEndpoint(nil).Msg("use path style endpoint for single node setup") } + endpoint.SetPool(0) + var endpoints Endpoints endpoints = append(endpoints, endpoint) setupType = ErasureSDSetupType @@ -781,7 +826,7 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints return poolEndpoints, setupType, nil } - for i, args := range poolArgs { + for poolIdx, args := range poolArgs { var endpoints Endpoints for _, iargs := range args { // Convert args to endpoints @@ -795,6 +840,10 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error()) } + for i := range eps { + eps[i].SetPool(poolIdx) + } + endpoints = append(endpoints, eps...) } @@ -802,7 +851,7 @@ func CreatePoolEndpoints(serverAddr string, poolArgs ...[][]string) ([]Endpoints return nil, setupType, config.ErrInvalidErasureEndpoints(nil).Msg("invalid number of endpoints") } - poolEndpoints[i] = endpoints + poolEndpoints[poolIdx] = endpoints } for _, endpoints := range poolEndpoints { @@ -960,7 +1009,7 @@ func CreateEndpoints(serverAddr string, args ...[]string) (Endpoints, SetupType, _, serverAddrPort := mustSplitHostPort(serverAddr) - // For single arg, return FS setup. + // For single arg, return single drive setup. if len(args) == 1 && len(args[0]) == 1 { var endpoint Endpoint endpoint, err = NewEndpoint(args[0][0]) diff --git a/cmd/globals.go b/cmd/globals.go index 55edd12eb..fff57c512 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -224,7 +224,10 @@ var ( // registered listeners globalConsoleSys *HTTPConsoleLoggerSys + // All unique drives for this deployment globalEndpoints EndpointServerPools + // All unique nodes for this deployment + globalNodes []Node // The name of this local node, fetched from arguments globalLocalNodeName string @@ -237,8 +240,6 @@ var ( // The global callhome config globalCallhomeConfig callhome.Config - globalRemoteEndpoints map[string]Endpoint - // Global server's network statistics globalConnStats = newConnStats() diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index cca0c1532..cb8380ab0 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -21,37 +21,81 @@ import ( "context" "encoding/gob" "errors" - "fmt" "io" "net/url" "strconv" xhttp "github.com/minio/minio/internal/http" - "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" - xnet "github.com/minio/pkg/net" "github.com/minio/pkg/sync/errgroup" + "golang.org/x/exp/slices" ) var errPeerOffline = errors.New("peer is offline") +type peerS3Client interface { + ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) + GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) + MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error + DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error + + GetHost() string + SetPools([]int) + GetPools() []int +} + +type localPeerS3Client struct { + host string + pools []int +} + +func (l *localPeerS3Client) GetHost() string { + return l.host +} + +func (l *localPeerS3Client) SetPools(p []int) { + l.pools = make([]int, len(p)) + copy(l.pools, p) +} + +func (l localPeerS3Client) GetPools() []int { + return l.pools +} + +func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) { + return listBucketsLocal(ctx, opts) +} + +func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) { + return getBucketInfoLocal(ctx, bucket, opts) +} + +func (l localPeerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { + return makeBucketLocal(ctx, bucket, opts) +} + +func (l localPeerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { + return deleteBucketLocal(ctx, bucket, opts) +} + // client to talk to peer Nodes. -type peerS3Client struct { - host *xnet.Host +type remotePeerS3Client struct { + host string + pools []int restClient *rest.Client } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json -func (client *peerS3Client) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { +func (client *remotePeerS3Client) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { return client.callWithContext(GlobalContext, method, values, body, length) } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json -func (client *peerS3Client) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { +func (client *remotePeerS3Client) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { if values == nil { values = make(url.Values) } @@ -67,16 +111,15 @@ func (client *peerS3Client) callWithContext(ctx context.Context, method string, // S3PeerSys - S3 peer call system. type S3PeerSys struct { - peerClients []*peerS3Client // Excludes self - allPeerClients []*peerS3Client // Includes nil client for self + peerClients []peerS3Client // Excludes self + poolsCount int } // NewS3PeerSys - creates new S3 peer calls. func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys { - remote, all := newPeerS3Clients(endpoints) return &S3PeerSys{ - peerClients: remote, - allPeerClients: all, + peerClients: newPeerS3Clients(endpoints.GetNodes()), + poolsCount: len(endpoints), } } @@ -84,14 +127,8 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys { func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) (result []BucketInfo, err error) { g := errgroup.WithNErrs(len(sys.peerClients)) - localBuckets, err := listBucketsLocal(ctx, opts) - if err != nil { - return nil, err - } - - nodeBuckets := make([][]BucketInfo, len(sys.peerClients)+1) + nodeBuckets := make([][]BucketInfo, len(sys.peerClients)) errs := []error{nil} - nodeBuckets[0] = localBuckets for idx, client := range sys.peerClients { idx := idx @@ -104,14 +141,14 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) (resu if err != nil { return err } - nodeBuckets[idx+1] = localBuckets + nodeBuckets[idx] = localBuckets return nil }, idx) } errs = append(errs, g.Wait()...) - quorum := (len(sys.allPeerClients) / 2) + quorum := len(sys.peerClients)/2 + 1 if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil { return nil, err } @@ -156,11 +193,7 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc errs := g.Wait() - bucketInfo, err := getBucketInfoLocal(ctx, bucket, opts) - errs = append(errs, err) - bucketInfos = append(bucketInfos, bucketInfo) - - quorum := (len(sys.allPeerClients) / 2) + quorum := len(sys.peerClients)/2 + 1 if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil { return BucketInfo{}, toObjectErr(err, bucket) } @@ -174,7 +207,7 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc return BucketInfo{}, toObjectErr(errVolumeNotFound, bucket) } -func (client *peerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) { +func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) { v := url.Values{} v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted)) @@ -190,7 +223,7 @@ func (client *peerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) } // GetBucketInfo returns bucket stat info from a peer -func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) { +func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) { v := url.Values{} v.Set(peerS3Bucket, bucket) v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted)) @@ -209,7 +242,6 @@ func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, op // MakeBucket creates bucket across all peers func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { g := errgroup.WithNErrs(len(sys.peerClients)) - for idx, client := range sys.peerClients { client := client g.Go(func() error { @@ -219,30 +251,24 @@ func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBu return client.MakeBucket(ctx, bucket, opts) }, idx) } - errs := g.Wait() - errs = append(errs, makeBucketLocal(ctx, bucket, opts)) - quorum := (len(sys.allPeerClients) / 2) + 1 - err := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum) - - // Perform MRF on missing buckets for temporary errors. - for _, err := range errs { - if err == nil { - continue + for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ { + perPoolErrs := make([]error, 0, len(sys.peerClients)) + for i, client := range sys.peerClients { + if slices.Contains(client.GetPools(), poolIdx) { + perPoolErrs = append(perPoolErrs, errs[i]) + } } - if errors.Is(err, errPeerOffline) || errors.Is(err, errDiskNotFound) || - isNetworkError(err) { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - }) + if poolErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil { + return toObjectErr(poolErr, bucket) } } - return toObjectErr(err, bucket) + return nil } // MakeBucket creates a bucket on a peer -func (client *peerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { +func (client *remotePeerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { v := url.Values{} v.Set(peerS3Bucket, bucket) v.Set(peerS3BucketForceCreate, strconv.FormatBool(opts.ForceCreate)) @@ -259,7 +285,6 @@ func (client *peerS3Client) MakeBucket(ctx context.Context, bucket string, opts // DeleteBucket deletes bucket across all peers func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { g := errgroup.WithNErrs(len(sys.peerClients)) - for idx, client := range sys.peerClients { client := client g.Go(func() error { @@ -269,32 +294,26 @@ func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts Dele return client.DeleteBucket(ctx, bucket, opts) }, idx) } - errs := g.Wait() - errs = append(errs, deleteBucketLocal(ctx, bucket, opts)) - var errReturn error - for _, err := range errs { - if errReturn == nil && err != nil { - // always return first error - errReturn = toObjectErr(err, bucket) - break + for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ { + perPoolErrs := make([]error, 0, len(sys.peerClients)) + for i, client := range sys.peerClients { + if slices.Contains(client.GetPools(), poolIdx) { + perPoolErrs = append(perPoolErrs, errs[i]) + } } - } - - for _, err := range errs { - if err == nil && errReturn != nil { + if poolErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil { // re-create successful deletes, since we are return an error. sys.MakeBucket(ctx, bucket, MakeBucketOptions{}) - break + return toObjectErr(poolErr, bucket) } } - - return errReturn + return nil } // DeleteBucket deletes bucket on a peer -func (client *peerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { +func (client *remotePeerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { v := url.Values{} v.Set(peerS3Bucket, bucket) v.Set(peerS3BucketForceDelete, strconv.FormatBool(opts.Force)) @@ -308,33 +327,35 @@ func (client *peerS3Client) DeleteBucket(ctx context.Context, bucket string, opt return nil } +func (client remotePeerS3Client) GetHost() string { + return client.host +} + +func (client remotePeerS3Client) GetPools() []int { + return client.pools +} + +func (client *remotePeerS3Client) SetPools(p []int) { + client.pools = make([]int, len(p)) + copy(client.pools, p) +} + // newPeerS3Clients creates new peer clients. -// The two slices will point to the same clients, -// but 'all' will contain nil entry for local client. -// The 'all' slice will be in the same order across the cluster. -func newPeerS3Clients(endpoints EndpointServerPools) (remote, all []*peerS3Client) { - if !globalIsDistErasure { - // Only useful in distributed setups - return nil, nil - } - hosts := endpoints.hostsSorted() - remote = make([]*peerS3Client, 0, len(hosts)) - all = make([]*peerS3Client, len(hosts)) - for i, host := range hosts { - if host == nil { - continue +func newPeerS3Clients(nodes []Node) (peers []peerS3Client) { + peers = make([]peerS3Client, len(nodes)) + for i, node := range nodes { + if node.IsLocal { + peers[i] = &localPeerS3Client{host: node.Host} + } else { + peers[i] = newPeerS3Client(node.Host) } - all[i] = newPeerS3Client(host) - remote = append(remote, all[i]) + peers[i].SetPools(node.Pools) } - if len(all) != len(remote)+1 { - logger.LogIf(context.Background(), fmt.Errorf("WARNING: Expected number of all hosts (%v) to be remote +1 (%v)", len(all), len(remote))) - } - return remote, all + return } // Returns a peer S3 client. -func newPeerS3Client(peer *xnet.Host) *peerS3Client { +func newPeerS3Client(peer string) peerS3Client { scheme := "http" if globalIsTLS { scheme = "https" @@ -342,7 +363,7 @@ func newPeerS3Client(peer *xnet.Host) *peerS3Client { serverURL := &url.URL{ Scheme: scheme, - Host: peer.String(), + Host: peer, Path: peerS3Path, } @@ -360,5 +381,5 @@ func newPeerS3Client(peer *xnet.Host) *peerS3Client { return !isNetworkError(err) } - return &peerS3Client{host: peer, restClient: restClient} + return &remotePeerS3Client{host: peer, restClient: restClient} } diff --git a/cmd/server-main.go b/cmd/server-main.go index 6c83c1f41..b8fdd0c73 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -246,23 +246,19 @@ func serverHandleCmdArgs(ctx *cli.Context) { globalEndpoints, setupType, err = createServerEndpoints(globalMinioAddr, serverCmdArgs(ctx)...) logger.FatalIf(err, "Invalid command line arguments") + globalNodes = globalEndpoints.GetNodes() globalLocalNodeName = GetLocalPeer(globalEndpoints, globalMinioHost, globalMinioPort) nodeNameSum := sha256.Sum256([]byte(globalLocalNodeName)) globalLocalNodeNameHex = hex.EncodeToString(nodeNameSum[:]) globalNodeNamesHex = make(map[string]struct{}) - globalRemoteEndpoints = make(map[string]Endpoint) - for _, z := range globalEndpoints { - for _, ep := range z.Endpoints { - if ep.IsLocal { - globalRemoteEndpoints[globalLocalNodeName] = ep - } else { - globalRemoteEndpoints[ep.Host] = ep - } - nodeNameSum := sha256.Sum256([]byte(ep.Host)) - globalNodeNamesHex[hex.EncodeToString(nodeNameSum[:])] = struct{}{} - + for _, n := range globalNodes { + nodeName := n.Host + if n.IsLocal { + nodeName = globalLocalNodeName } + nodeNameSum := sha256.Sum256([]byte(nodeName)) + globalNodeNamesHex[hex.EncodeToString(nodeNameSum[:])] = struct{}{} } // allow transport to be HTTP/1.1 for proxying.