mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
s3: Make/Delete buckets to use error quorum per pool (#17467)
This commit is contained in:
@@ -21,37 +21,81 @@ import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/minio/internal/rest"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
"github.com/minio/pkg/sync/errgroup"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
var errPeerOffline = errors.New("peer is offline")
|
||||
|
||||
type peerS3Client interface {
|
||||
ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error)
|
||||
GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error)
|
||||
MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
|
||||
DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
|
||||
|
||||
GetHost() string
|
||||
SetPools([]int)
|
||||
GetPools() []int
|
||||
}
|
||||
|
||||
type localPeerS3Client struct {
|
||||
host string
|
||||
pools []int
|
||||
}
|
||||
|
||||
func (l *localPeerS3Client) GetHost() string {
|
||||
return l.host
|
||||
}
|
||||
|
||||
func (l *localPeerS3Client) SetPools(p []int) {
|
||||
l.pools = make([]int, len(p))
|
||||
copy(l.pools, p)
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) GetPools() []int {
|
||||
return l.pools
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) {
|
||||
return listBucketsLocal(ctx, opts)
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
|
||||
return getBucketInfoLocal(ctx, bucket, opts)
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
|
||||
return makeBucketLocal(ctx, bucket, opts)
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
|
||||
return deleteBucketLocal(ctx, bucket, opts)
|
||||
}
|
||||
|
||||
// client to talk to peer Nodes.
|
||||
type peerS3Client struct {
|
||||
host *xnet.Host
|
||||
type remotePeerS3Client struct {
|
||||
host string
|
||||
pools []int
|
||||
restClient *rest.Client
|
||||
}
|
||||
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
||||
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
||||
// after verifying format.json
|
||||
func (client *peerS3Client) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
func (client *remotePeerS3Client) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
return client.callWithContext(GlobalContext, method, values, body, length)
|
||||
}
|
||||
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
||||
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
||||
// after verifying format.json
|
||||
func (client *peerS3Client) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
func (client *remotePeerS3Client) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
if values == nil {
|
||||
values = make(url.Values)
|
||||
}
|
||||
@@ -67,16 +111,15 @@ func (client *peerS3Client) callWithContext(ctx context.Context, method string,
|
||||
|
||||
// S3PeerSys - S3 peer call system.
|
||||
type S3PeerSys struct {
|
||||
peerClients []*peerS3Client // Excludes self
|
||||
allPeerClients []*peerS3Client // Includes nil client for self
|
||||
peerClients []peerS3Client // Excludes self
|
||||
poolsCount int
|
||||
}
|
||||
|
||||
// NewS3PeerSys - creates new S3 peer calls.
|
||||
func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
|
||||
remote, all := newPeerS3Clients(endpoints)
|
||||
return &S3PeerSys{
|
||||
peerClients: remote,
|
||||
allPeerClients: all,
|
||||
peerClients: newPeerS3Clients(endpoints.GetNodes()),
|
||||
poolsCount: len(endpoints),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,14 +127,8 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
|
||||
func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) (result []BucketInfo, err error) {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
|
||||
localBuckets, err := listBucketsLocal(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeBuckets := make([][]BucketInfo, len(sys.peerClients)+1)
|
||||
nodeBuckets := make([][]BucketInfo, len(sys.peerClients))
|
||||
errs := []error{nil}
|
||||
nodeBuckets[0] = localBuckets
|
||||
|
||||
for idx, client := range sys.peerClients {
|
||||
idx := idx
|
||||
@@ -104,14 +141,14 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) (resu
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nodeBuckets[idx+1] = localBuckets
|
||||
nodeBuckets[idx] = localBuckets
|
||||
return nil
|
||||
}, idx)
|
||||
}
|
||||
|
||||
errs = append(errs, g.Wait()...)
|
||||
|
||||
quorum := (len(sys.allPeerClients) / 2)
|
||||
quorum := len(sys.peerClients)/2 + 1
|
||||
if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -156,11 +193,7 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc
|
||||
|
||||
errs := g.Wait()
|
||||
|
||||
bucketInfo, err := getBucketInfoLocal(ctx, bucket, opts)
|
||||
errs = append(errs, err)
|
||||
bucketInfos = append(bucketInfos, bucketInfo)
|
||||
|
||||
quorum := (len(sys.allPeerClients) / 2)
|
||||
quorum := len(sys.peerClients)/2 + 1
|
||||
if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil {
|
||||
return BucketInfo{}, toObjectErr(err, bucket)
|
||||
}
|
||||
@@ -174,7 +207,7 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc
|
||||
return BucketInfo{}, toObjectErr(errVolumeNotFound, bucket)
|
||||
}
|
||||
|
||||
func (client *peerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) {
|
||||
func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) {
|
||||
v := url.Values{}
|
||||
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted))
|
||||
|
||||
@@ -190,7 +223,7 @@ func (client *peerS3Client) ListBuckets(ctx context.Context, opts BucketOptions)
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket stat info from a peer
|
||||
func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
|
||||
func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
|
||||
v := url.Values{}
|
||||
v.Set(peerS3Bucket, bucket)
|
||||
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted))
|
||||
@@ -209,7 +242,6 @@ func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, op
|
||||
// MakeBucket creates bucket across all peers
|
||||
func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
g.Go(func() error {
|
||||
@@ -219,30 +251,24 @@ func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBu
|
||||
return client.MakeBucket(ctx, bucket, opts)
|
||||
}, idx)
|
||||
}
|
||||
|
||||
errs := g.Wait()
|
||||
errs = append(errs, makeBucketLocal(ctx, bucket, opts))
|
||||
|
||||
quorum := (len(sys.allPeerClients) / 2) + 1
|
||||
err := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum)
|
||||
|
||||
// Perform MRF on missing buckets for temporary errors.
|
||||
for _, err := range errs {
|
||||
if err == nil {
|
||||
continue
|
||||
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
|
||||
perPoolErrs := make([]error, 0, len(sys.peerClients))
|
||||
for i, client := range sys.peerClients {
|
||||
if slices.Contains(client.GetPools(), poolIdx) {
|
||||
perPoolErrs = append(perPoolErrs, errs[i])
|
||||
}
|
||||
}
|
||||
if errors.Is(err, errPeerOffline) || errors.Is(err, errDiskNotFound) ||
|
||||
isNetworkError(err) {
|
||||
globalMRFState.addPartialOp(partialOperation{
|
||||
bucket: bucket,
|
||||
})
|
||||
if poolErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil {
|
||||
return toObjectErr(poolErr, bucket)
|
||||
}
|
||||
}
|
||||
return toObjectErr(err, bucket)
|
||||
return nil
|
||||
}
|
||||
|
||||
// MakeBucket creates a bucket on a peer
|
||||
func (client *peerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
|
||||
func (client *remotePeerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error {
|
||||
v := url.Values{}
|
||||
v.Set(peerS3Bucket, bucket)
|
||||
v.Set(peerS3BucketForceCreate, strconv.FormatBool(opts.ForceCreate))
|
||||
@@ -259,7 +285,6 @@ func (client *peerS3Client) MakeBucket(ctx context.Context, bucket string, opts
|
||||
// DeleteBucket deletes bucket across all peers
|
||||
func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
|
||||
for idx, client := range sys.peerClients {
|
||||
client := client
|
||||
g.Go(func() error {
|
||||
@@ -269,32 +294,26 @@ func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts Dele
|
||||
return client.DeleteBucket(ctx, bucket, opts)
|
||||
}, idx)
|
||||
}
|
||||
|
||||
errs := g.Wait()
|
||||
errs = append(errs, deleteBucketLocal(ctx, bucket, opts))
|
||||
|
||||
var errReturn error
|
||||
for _, err := range errs {
|
||||
if errReturn == nil && err != nil {
|
||||
// always return first error
|
||||
errReturn = toObjectErr(err, bucket)
|
||||
break
|
||||
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
|
||||
perPoolErrs := make([]error, 0, len(sys.peerClients))
|
||||
for i, client := range sys.peerClients {
|
||||
if slices.Contains(client.GetPools(), poolIdx) {
|
||||
perPoolErrs = append(perPoolErrs, errs[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, err := range errs {
|
||||
if err == nil && errReturn != nil {
|
||||
if poolErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil {
|
||||
// re-create successful deletes, since we are return an error.
|
||||
sys.MakeBucket(ctx, bucket, MakeBucketOptions{})
|
||||
break
|
||||
return toObjectErr(poolErr, bucket)
|
||||
}
|
||||
}
|
||||
|
||||
return errReturn
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBucket deletes bucket on a peer
|
||||
func (client *peerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
|
||||
func (client *remotePeerS3Client) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
|
||||
v := url.Values{}
|
||||
v.Set(peerS3Bucket, bucket)
|
||||
v.Set(peerS3BucketForceDelete, strconv.FormatBool(opts.Force))
|
||||
@@ -308,33 +327,35 @@ func (client *peerS3Client) DeleteBucket(ctx context.Context, bucket string, opt
|
||||
return nil
|
||||
}
|
||||
|
||||
func (client remotePeerS3Client) GetHost() string {
|
||||
return client.host
|
||||
}
|
||||
|
||||
func (client remotePeerS3Client) GetPools() []int {
|
||||
return client.pools
|
||||
}
|
||||
|
||||
func (client *remotePeerS3Client) SetPools(p []int) {
|
||||
client.pools = make([]int, len(p))
|
||||
copy(client.pools, p)
|
||||
}
|
||||
|
||||
// newPeerS3Clients creates new peer clients.
|
||||
// The two slices will point to the same clients,
|
||||
// but 'all' will contain nil entry for local client.
|
||||
// The 'all' slice will be in the same order across the cluster.
|
||||
func newPeerS3Clients(endpoints EndpointServerPools) (remote, all []*peerS3Client) {
|
||||
if !globalIsDistErasure {
|
||||
// Only useful in distributed setups
|
||||
return nil, nil
|
||||
}
|
||||
hosts := endpoints.hostsSorted()
|
||||
remote = make([]*peerS3Client, 0, len(hosts))
|
||||
all = make([]*peerS3Client, len(hosts))
|
||||
for i, host := range hosts {
|
||||
if host == nil {
|
||||
continue
|
||||
func newPeerS3Clients(nodes []Node) (peers []peerS3Client) {
|
||||
peers = make([]peerS3Client, len(nodes))
|
||||
for i, node := range nodes {
|
||||
if node.IsLocal {
|
||||
peers[i] = &localPeerS3Client{host: node.Host}
|
||||
} else {
|
||||
peers[i] = newPeerS3Client(node.Host)
|
||||
}
|
||||
all[i] = newPeerS3Client(host)
|
||||
remote = append(remote, all[i])
|
||||
peers[i].SetPools(node.Pools)
|
||||
}
|
||||
if len(all) != len(remote)+1 {
|
||||
logger.LogIf(context.Background(), fmt.Errorf("WARNING: Expected number of all hosts (%v) to be remote +1 (%v)", len(all), len(remote)))
|
||||
}
|
||||
return remote, all
|
||||
return
|
||||
}
|
||||
|
||||
// Returns a peer S3 client.
|
||||
func newPeerS3Client(peer *xnet.Host) *peerS3Client {
|
||||
func newPeerS3Client(peer string) peerS3Client {
|
||||
scheme := "http"
|
||||
if globalIsTLS {
|
||||
scheme = "https"
|
||||
@@ -342,7 +363,7 @@ func newPeerS3Client(peer *xnet.Host) *peerS3Client {
|
||||
|
||||
serverURL := &url.URL{
|
||||
Scheme: scheme,
|
||||
Host: peer.String(),
|
||||
Host: peer,
|
||||
Path: peerS3Path,
|
||||
}
|
||||
|
||||
@@ -360,5 +381,5 @@ func newPeerS3Client(peer *xnet.Host) *peerS3Client {
|
||||
return !isNetworkError(err)
|
||||
}
|
||||
|
||||
return &peerS3Client{host: peer, restClient: restClient}
|
||||
return &remotePeerS3Client{host: peer, restClient: restClient}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user