move Make,Delete,Head,Heal bucket calls to websockets (#18951)

This commit is contained in:
Harshavardhana 2024-02-02 14:54:54 -08:00 committed by GitHub
parent 99fde2ba85
commit ff80cfd83d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 256 additions and 151 deletions

View File

@ -62,8 +62,9 @@ type ProxyEndpoint struct {
// Node holds information about a node in this cluster
type Node struct {
*url.URL
Pools []int
IsLocal bool
Pools []int
IsLocal bool
GridHost string
}
// Endpoint - any type of endpoint.
@ -254,6 +255,7 @@ func (l EndpointServerPools) GetNodes() (nodes []Node) {
Scheme: ep.Scheme,
Host: ep.Host,
}
node.GridHost = ep.GridHost()
}
if !slices.Contains(node.Pools, ep.PoolIdx) {
node.Pools = append(node.Pools, ep.PoolIdx)
@ -411,6 +413,47 @@ func (l EndpointServerPools) GridHosts() (gridHosts []string, gridLocal string)
return gridHosts, gridLocal
}
// FindGridHostsFromPeerPool will return a matching peerPool from provided peer (as string)
func (l EndpointServerPools) FindGridHostsFromPeerPool(peer string) []int {
if peer == "" {
return nil
}
var pools []int
for _, ep := range l {
for _, endpoint := range ep.Endpoints {
if endpoint.IsLocal {
continue
}
if !slices.Contains(pools, endpoint.PoolIdx) {
pools = append(pools, endpoint.PoolIdx)
}
}
}
return pools
}
// FindGridHostsFromPeerStr will return a matching peer from provided peer (as string)
func (l EndpointServerPools) FindGridHostsFromPeerStr(peer string) (peerGrid string) {
if peer == "" {
return ""
}
for _, ep := range l {
for _, endpoint := range ep.Endpoints {
if endpoint.IsLocal {
continue
}
if endpoint.Host == peer {
return endpoint.GridHost()
}
}
}
return ""
}
// FindGridHostsFromPeer will return a matching peer from provided peer.
func (l EndpointServerPools) FindGridHostsFromPeer(peer *xnet.Host) (peerGrid string) {
if peer == nil {

View File

@ -1421,6 +1421,89 @@ func (s *peerRESTServer) NetSpeedTestHandler(w http.ResponseWriter, r *http.Requ
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result))
}
var healBucketHandler = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerHealBucket, grid.NewMSS, grid.NewNoPayload)
func (s *peerRESTServer) HealBucketHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
bucket := mss.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
return np, grid.NewRemoteErr(errInvalidArgument)
}
bucketDeleted := mss.Get(peerS3BucketDeleted) == "true"
_, err := healBucketLocal(context.Background(), bucket, madmin.HealOpts{
Remove: bucketDeleted,
})
if err != nil {
return np, grid.NewRemoteErr(err)
}
return np, nil
}
var headBucketHandler = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerHeadBucket, grid.NewMSS, func() *VolInfo { return &VolInfo{} })
// HeadBucketHandler implements peer BuckeInfo call, returns bucket create date.
func (s *peerRESTServer) HeadBucketHandler(mss *grid.MSS) (info *VolInfo, nerr *grid.RemoteErr) {
bucket := mss.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
return info, grid.NewRemoteErr(errInvalidArgument)
}
bucketDeleted := mss.Get(peerS3BucketDeleted) == "true"
bucketInfo, err := getBucketInfoLocal(context.Background(), bucket, BucketOptions{
Deleted: bucketDeleted,
})
if err != nil {
return info, grid.NewRemoteErr(err)
}
return &VolInfo{
Name: bucketInfo.Name,
Created: bucketInfo.Created,
}, nil
}
var deleteBucketHandler = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerDeleteBucket, grid.NewMSS, grid.NewNoPayload)
// DeleteBucketHandler implements peer delete bucket call.
func (s *peerRESTServer) DeleteBucketHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
bucket := mss.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
return np, grid.NewRemoteErr(errInvalidArgument)
}
forceDelete := mss.Get(peerS3BucketForceDelete) == "true"
err := deleteBucketLocal(context.Background(), bucket, DeleteBucketOptions{
Force: forceDelete,
})
if err != nil {
return np, grid.NewRemoteErr(err)
}
return np, nil
}
var makeBucketHandler = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerMakeBucket, grid.NewMSS, grid.NewNoPayload)
// MakeBucketHandler implements peer create bucket call.
func (s *peerRESTServer) MakeBucketHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
bucket := mss.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
return np, grid.NewRemoteErr(errInvalidArgument)
}
forceCreate := mss.Get(peerS3BucketForceCreate) == "true"
err := makeBucketLocal(context.Background(), bucket, MakeBucketOptions{
ForceCreate: forceCreate,
})
if err != nil {
return np, grid.NewRemoteErr(err)
}
return np, nil
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) {
h := func(f http.HandlerFunc) http.HandlerFunc {
@ -1467,6 +1550,11 @@ func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(h(server.DevNull))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(h(server.GetLastDayTierStatsHandler))
logger.FatalIf(makeBucketHandler.Register(gm, server.MakeBucketHandler), "unable to register handler")
logger.FatalIf(deleteBucketHandler.Register(gm, server.DeleteBucketHandler), "unable to register handler")
logger.FatalIf(headBucketHandler.Register(gm, server.HeadBucketHandler), "unable to register handler")
logger.FatalIf(healBucketHandler.Register(gm, server.HealBucketHandler), "unable to register handler")
logger.FatalIf(deletePolicyHandler.Register(gm, server.DeletePolicyHandler), "unable to register handler")
logger.FatalIf(loadPolicyHandler.Register(gm, server.LoadPolicyHandler), "unable to register handler")
logger.FatalIf(loadPolicyMappingHandler.Register(gm, server.LoadPolicyMappingHandler), "unable to register handler")

View File

@ -21,14 +21,18 @@ import (
"context"
"encoding/gob"
"errors"
"fmt"
"io"
"net/url"
"sort"
"strconv"
"sync/atomic"
"time"
"github.com/minio/madmin-go/v3"
grid "github.com/minio/minio/internal/grid"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/v2/sync/errgroup"
"golang.org/x/exp/slices"
@ -49,12 +53,12 @@ type peerS3Client interface {
}
type localPeerS3Client struct {
host string
node Node
pools []int
}
func (l *localPeerS3Client) GetHost() string {
return l.host
return l.node.Host
}
func (l *localPeerS3Client) SetPools(p []int) {
@ -88,9 +92,13 @@ func (l localPeerS3Client) DeleteBucket(ctx context.Context, bucket string, opts
// client to talk to peer Nodes.
type remotePeerS3Client struct {
host string
node Node
pools []int
restClient *rest.Client
// Function that returns the grid connection for this peer when initialized.
// Will return nil if the grid connection is not initialized yet.
gridConn func() *grid.Connection
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
@ -126,7 +134,7 @@ type S3PeerSys struct {
// NewS3PeerSys - creates new S3 peer calls.
func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
return &S3PeerSys{
peerClients: newPeerS3Clients(endpoints.GetNodes()),
peerClients: newPeerS3Clients(endpoints),
poolsCount: len(endpoints),
}
}
@ -358,37 +366,47 @@ func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOp
}
func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
v := url.Values{}
v.Set(peerS3Bucket, bucket)
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Remove))
respBody, err := client.call(peerS3MethodHealBucket, v, nil, -1)
if err != nil {
return madmin.HealResultItem{}, err
conn := client.gridConn()
if conn == nil {
return madmin.HealResultItem{}, nil
}
defer xhttp.DrainBody(respBody)
var res madmin.HealResultItem
err = gob.NewDecoder(respBody).Decode(&res)
mss := grid.NewMSSWith(map[string]string{
peerS3Bucket: bucket,
peerS3BucketDeleted: strconv.FormatBool(opts.Remove),
})
return res, err
_, err := healBucketHandler.Call(ctx, conn, mss)
// Initialize heal result info
return madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
SetCount: -1, // explicitly set an invalid value -1, for bucket heal scenario
}, toStorageErr(err)
}
// GetBucketInfo returns bucket stat info from a peer
func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
v := url.Values{}
v.Set(peerS3Bucket, bucket)
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted))
conn := client.gridConn()
if conn == nil {
return BucketInfo{}, nil
}
respBody, err := client.call(peerS3MethodGetBucketInfo, v, nil, -1)
mss := grid.NewMSSWith(map[string]string{
peerS3Bucket: bucket,
peerS3BucketDeleted: strconv.FormatBool(opts.Deleted),
})
volInfo, err := headBucketHandler.Call(ctx, conn, mss)
if err != nil {
return BucketInfo{}, err
}
defer xhttp.DrainBody(respBody)
var bucketInfo BucketInfo
err = gob.NewDecoder(respBody).Decode(&bucketInfo)
return bucketInfo, err
return BucketInfo{
Name: volInfo.Name,
Created: volInfo.Created,
}, nil
}
// MakeBucket creates bucket across all peers
@ -421,17 +439,18 @@ func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBu
// MakeBucket creates a bucket on a peer
func (client *remotePeerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
v := url.Values{}
v.Set(peerS3Bucket, bucket)
v.Set(peerS3BucketForceCreate, strconv.FormatBool(opts.ForceCreate))
respBody, err := client.call(peerS3MethodMakeBucket, v, nil, -1)
if err != nil {
return err
conn := client.gridConn()
if conn == nil {
return nil
}
defer xhttp.DrainBody(respBody)
return nil
mss := grid.NewMSSWith(map[string]string{
peerS3Bucket: bucket,
peerS3BucketForceCreate: strconv.FormatBool(opts.ForceCreate),
})
_, err := makeBucketHandler.Call(ctx, conn, mss)
return toStorageErr(err)
}
// DeleteBucket deletes bucket across all peers
@ -466,21 +485,22 @@ func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts Dele
// DeleteBucket deletes bucket on a peer
func (client *remotePeerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
v := url.Values{}
v.Set(peerS3Bucket, bucket)
v.Set(peerS3BucketForceDelete, strconv.FormatBool(opts.Force))
respBody, err := client.call(peerS3MethodDeleteBucket, v, nil, -1)
if err != nil {
return err
conn := client.gridConn()
if conn == nil {
return nil
}
defer xhttp.DrainBody(respBody)
return nil
mss := grid.NewMSSWith(map[string]string{
peerS3Bucket: bucket,
peerS3BucketForceDelete: strconv.FormatBool(opts.Force),
})
_, err := deleteBucketHandler.Call(ctx, conn, mss)
return toStorageErr(err)
}
func (client remotePeerS3Client) GetHost() string {
return client.host
return client.node.Host
}
func (client remotePeerS3Client) GetPools() []int {
@ -493,21 +513,23 @@ func (client *remotePeerS3Client) SetPools(p []int) {
}
// newPeerS3Clients creates new peer clients.
func newPeerS3Clients(nodes []Node) (peers []peerS3Client) {
func newPeerS3Clients(endpoints EndpointServerPools) (peers []peerS3Client) {
nodes := endpoints.GetNodes()
peers = make([]peerS3Client, len(nodes))
for i, node := range nodes {
if node.IsLocal {
peers[i] = &localPeerS3Client{host: node.Host}
peers[i] = &localPeerS3Client{node: node}
} else {
peers[i] = newPeerS3Client(node.Host)
peers[i] = newPeerS3Client(node)
}
peers[i].SetPools(node.Pools)
}
return
return peers
}
// Returns a peer S3 client.
func newPeerS3Client(peer string) peerS3Client {
func newPeerS3Client(node Node) peerS3Client {
scheme := "http"
if globalIsTLS {
scheme = "https"
@ -515,7 +537,7 @@ func newPeerS3Client(peer string) peerS3Client {
serverURL := &url.URL{
Scheme: scheme,
Host: peer,
Host: node.Host,
Path: peerS3Path,
}
@ -533,5 +555,32 @@ func newPeerS3Client(peer string) peerS3Client {
return !isNetworkError(err)
}
return &remotePeerS3Client{host: peer, restClient: restClient}
var gridConn atomic.Pointer[grid.Connection]
return &remotePeerS3Client{
node: node, restClient: restClient,
gridConn: func() *grid.Connection {
// Lazy initialization of grid connection.
// When we create this peer client, the grid connection is likely not yet initialized.
if node.GridHost == "" {
logger.LogOnceIf(context.Background(), fmt.Errorf("gridHost is empty for peer %s", node.Host), node.Host+":gridHost")
return nil
}
gc := gridConn.Load()
if gc != nil {
return gc
}
gm := globalGrid.Load()
if gm == nil {
return nil
}
gc = gm.Connection(node.GridHost)
if gc == nil {
logger.LogOnceIf(context.Background(), fmt.Errorf("gridHost %s not found for peer %s", node.GridHost, node.Host), node.Host+":gridHost")
return nil
}
gridConn.Store(gc)
return gc
},
}
}

View File

@ -385,90 +385,6 @@ func (s *peerS3Server) ListBucketsHandler(w http.ResponseWriter, r *http.Request
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(buckets))
}
func (s *peerS3Server) HealBucketHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true"
bucket := r.Form.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
s.writeErrorResponse(w, errInvalidArgument)
return
}
res, err := healBucketLocal(r.Context(), bucket, madmin.HealOpts{
Remove: bucketDeleted,
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(res))
}
// GetBucketInfoHandler implements peer BuckeInfo call, returns bucket create date.
func (s *peerS3Server) GetBucketInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
bucket := r.Form.Get(peerS3Bucket)
bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true"
bucketInfo, err := getBucketInfoLocal(r.Context(), bucket, BucketOptions{
Deleted: bucketDeleted,
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(bucketInfo))
}
// DeleteBucketHandler implements peer delete bucket call.
func (s *peerS3Server) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
bucket := r.Form.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
s.writeErrorResponse(w, errInvalidArgument)
return
}
forceDelete := r.Form.Get(peerS3BucketForceDelete) == "true"
err := deleteBucketLocal(r.Context(), bucket, DeleteBucketOptions{
Force: forceDelete,
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
}
// MakeBucketHandler implements peer create bucket call.
func (s *peerS3Server) MakeBucketHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
bucket := r.Form.Get(peerS3Bucket)
forceCreate := r.Form.Get(peerS3BucketForceCreate) == "true"
err := makeBucketLocal(r.Context(), bucket, MakeBucketOptions{
ForceCreate: forceCreate,
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
}
// registerPeerS3Handlers - register peer s3 router.
func registerPeerS3Handlers(router *mux.Router) {
server := &peerS3Server{}
@ -479,9 +395,5 @@ func registerPeerS3Handlers(router *mux.Router) {
}
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealth).HandlerFunc(h(server.HealthHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodMakeBucket).HandlerFunc(h(server.MakeBucketHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodDeleteBucket).HandlerFunc(h(server.DeleteBucketHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodGetBucketInfo).HandlerFunc(h(server.GetBucketInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodListBuckets).HandlerFunc(h(server.ListBucketsHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealBucket).HandlerFunc(h(server.HealBucketHandler))
}

View File

@ -80,6 +80,11 @@ const (
HandlerLoadUser
HandlerLoadGroup
HandlerHealBucket
HandlerMakeBucket
HandlerHeadBucket
HandlerDeleteBucket
// Add more above here ^^^
// If all handlers are used, the type of Handler can be changed.
// Handlers have no versioning, so non-compatible handler changes must result in new IDs.
@ -130,6 +135,10 @@ var handlerPrefixes = [handlerLast]string{
HandlerDeleteUser: peerPrefix,
HandlerLoadUser: peerPrefix,
HandlerLoadGroup: peerPrefix,
HandlerMakeBucket: peerPrefixS3,
HandlerHeadBucket: peerPrefixS3,
HandlerDeleteBucket: peerPrefixS3,
HandlerHealBucket: healPrefix,
}
const (
@ -137,6 +146,8 @@ const (
storagePrefix = "storageR"
bootstrapPrefix = "bootstrap"
peerPrefix = "peer"
peerPrefixS3 = "peerS3"
healPrefix = "heal"
)
func init() {
@ -472,9 +483,10 @@ func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Re
return resp, err
}
switch any(req).(type) {
case *MSS, *Bytes, *URLValues:
case *MSS, *URLValues:
ctx = context.WithValue(ctx, TraceParamsKey{}, req)
case *NoPayload:
case *NoPayload, *Bytes:
// do not need to trace nopayload and bytes payload
default:
ctx = context.WithValue(ctx, TraceParamsKey{}, fmt.Sprintf("type=%T", req))
}

View File

@ -48,14 +48,18 @@ func _() {
_ = x[HandlerDeleteUser-37]
_ = x[HandlerLoadUser-38]
_ = x[HandlerLoadGroup-39]
_ = x[handlerTest-40]
_ = x[handlerTest2-41]
_ = x[handlerLast-42]
_ = x[HandlerHealBucket-40]
_ = x[HandlerMakeBucket-41]
_ = x[HandlerHeadBucket-42]
_ = x[HandlerDeleteBucket-43]
_ = x[handlerTest-44]
_ = x[handlerTest2-45]
_ = x[handlerLast-46]
}
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenGetLocalDiskIDsDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGrouphandlerTesthandlerTest2handlerLast"
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenGetLocalDiskIDsDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBuckethandlerTesthandlerTest2handlerLast"
var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 251, 271, 289, 316, 330, 343, 360, 384, 396, 406, 423, 443, 461, 471, 479, 488, 499, 511, 522}
var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 251, 271, 289, 316, 330, 343, 360, 384, 396, 406, 423, 443, 461, 471, 479, 488, 498, 508, 518, 530, 541, 553, 564}
func (i HandlerID) String() string {
if i >= HandlerID(len(_HandlerID_index)-1) {

View File

@ -139,11 +139,8 @@ func (c *muxClient) traceRoundtrip(ctx context.Context, t *tracer, h HandlerID,
trace.Path += m.ToQuery()
case *URLValues:
trace.Path += typed.Values().Encode()
case *NoPayload:
case *Bytes:
if typed != nil {
trace.Path = fmt.Sprintf("%s?bytes=%d", trace.Path, len(*typed))
}
case *NoPayload, *Bytes:
trace.Path = fmt.Sprintf("%s?payload=%T", trace.Path, typed)
case string:
trace.Path = fmt.Sprintf("%s?%s", trace.Path, typed)
default: