mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
Heal buckets at node level (#18504)
This commit is contained in:
@@ -25,7 +25,9 @@ import (
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/minio/madmin-go/v3"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
"github.com/minio/minio/internal/rest"
|
||||
"github.com/minio/pkg/v2/sync/errgroup"
|
||||
@@ -36,6 +38,7 @@ var errPeerOffline = errors.New("peer is offline")
|
||||
|
||||
type peerS3Client interface {
|
||||
ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error)
|
||||
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error)
|
||||
MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
|
||||
DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
|
||||
@@ -67,6 +70,10 @@ func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions)
|
||||
return listBucketsLocal(ctx, opts)
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||
return healBucketLocal(ctx, bucket, opts)
|
||||
}
|
||||
|
||||
func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
|
||||
return getBucketInfoLocal(ctx, bucket, opts)
|
||||
}
|
||||
@@ -124,6 +131,51 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
|
||||
}
|
||||
}
|
||||
|
||||
// HealBucket - heals buckets at node level
|
||||
func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
|
||||
healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
idx := idx
|
||||
client := client
|
||||
g.Go(func() error {
|
||||
if client == nil {
|
||||
return errPeerOffline
|
||||
}
|
||||
res, err := client.HealBucket(ctx, bucket, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
healBucketResults[idx] = res
|
||||
return nil
|
||||
}, idx)
|
||||
}
|
||||
|
||||
errs := g.Wait()
|
||||
|
||||
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
|
||||
perPoolErrs := make([]error, 0, len(sys.peerClients))
|
||||
for i, client := range sys.peerClients {
|
||||
if slices.Contains(client.GetPools(), poolIdx) {
|
||||
perPoolErrs = append(perPoolErrs, errs[i])
|
||||
}
|
||||
}
|
||||
quorum := len(perPoolErrs) / 2
|
||||
if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
|
||||
return madmin.HealResultItem{}, poolErr
|
||||
}
|
||||
}
|
||||
|
||||
for i, err := range errs {
|
||||
if err == nil {
|
||||
return healBucketResults[i], nil
|
||||
}
|
||||
}
|
||||
|
||||
return madmin.HealResultItem{}, toObjectErr(errVolumeNotFound, bucket)
|
||||
}
|
||||
|
||||
// ListBuckets lists buckets across all nodes and returns a consistent view:
|
||||
// - Return an error when a pool cannot return N/2+1 valid bucket information
|
||||
// - For each pool, check if the bucket exists in N/2+1 nodes before including it in the final result
|
||||
@@ -185,6 +237,22 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
|
||||
}
|
||||
}
|
||||
}
|
||||
// loop through buckets and see if some with lost quorum
|
||||
// these could be stale buckets lying around, queue a heal
|
||||
// of such a bucket. This is needed here as we identify such
|
||||
// buckets here while listing buckets. As part of regular
|
||||
// globalBucketMetadataSys.Init() call would get a valid
|
||||
// buckets only and not the quourum lost ones like this, so
|
||||
// explicit call
|
||||
for bktName, count := range bucketsMap {
|
||||
if count < quorum {
|
||||
// Queue a bucket heal task
|
||||
globalMRFState.addPartialOp(partialOperation{
|
||||
bucket: bktName,
|
||||
queued: time.Now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]BucketInfo, 0, len(resultMap))
|
||||
@@ -259,6 +327,23 @@ func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOp
|
||||
return buckets, err
|
||||
}
|
||||
|
||||
func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||
v := url.Values{}
|
||||
v.Set(peerS3Bucket, bucket)
|
||||
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Remove))
|
||||
|
||||
respBody, err := client.call(peerS3MethodHealBucket, v, nil, -1)
|
||||
if err != nil {
|
||||
return madmin.HealResultItem{}, err
|
||||
}
|
||||
defer xhttp.DrainBody(respBody)
|
||||
|
||||
var res madmin.HealResultItem
|
||||
err = gob.NewDecoder(respBody).Decode(&res)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket stat info from a peer
|
||||
func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
|
||||
v := url.Values{}
|
||||
|
||||
Reference in New Issue
Block a user