Heal buckets at node level (#18612)

Signed-off-by: Shubhendu Ram Tripathi <shubhendu@minio.io>
This commit is contained in:
Shubhendu
2024-01-10 10:04:04 +05:30
committed by GitHub
parent f02d282754
commit e31081d79d
12 changed files with 326 additions and 197 deletions

View File

@@ -25,7 +25,9 @@ import (
"net/url"
"sort"
"strconv"
"time"
"github.com/minio/madmin-go/v3"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/v2/sync/errgroup"
@@ -36,6 +38,7 @@ var errPeerOffline = errors.New("peer is offline")
type peerS3Client interface {
ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error)
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error)
MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
@@ -67,6 +70,10 @@ func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions)
return listBucketsLocal(ctx, opts)
}
func (l localPeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
return healBucketLocal(ctx, bucket, opts)
}
func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
return getBucketInfoLocal(ctx, bucket, opts)
}
@@ -124,6 +131,51 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
}
}
// HealBucket - heals buckets at node level
func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
g := errgroup.WithNErrs(len(sys.peerClients))
healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
for idx, client := range sys.peerClients {
idx := idx
client := client
g.Go(func() error {
if client == nil {
return errPeerOffline
}
res, err := client.HealBucket(ctx, bucket, opts)
if err != nil {
return err
}
healBucketResults[idx] = res
return nil
}, idx)
}
errs := g.Wait()
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
perPoolErrs := make([]error, 0, len(sys.peerClients))
for i, client := range sys.peerClients {
if slices.Contains(client.GetPools(), poolIdx) {
perPoolErrs = append(perPoolErrs, errs[i])
}
}
quorum := len(perPoolErrs) / 2
if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
return madmin.HealResultItem{}, poolErr
}
}
for i, err := range errs {
if err == nil {
return healBucketResults[i], nil
}
}
return madmin.HealResultItem{}, toObjectErr(errVolumeNotFound, bucket)
}
// ListBuckets lists buckets across all nodes and returns a consistent view:
// - Return an error when a pool cannot return N/2+1 valid bucket information
// - For each pool, check if the bucket exists in N/2+1 nodes before including it in the final result
@@ -185,6 +237,22 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
}
}
}
// loop through buckets and see if some with lost quorum
// these could be stale buckets lying around, queue a heal
// of such a bucket. This is needed here as we identify such
// buckets here while listing buckets. As part of regular
// globalBucketMetadataSys.Init() call would get a valid
// buckets only and not the quourum lost ones like this, so
// explicit call
for bktName, count := range bucketsMap {
if count < quorum {
// Queue a bucket heal task
globalMRFState.addPartialOp(partialOperation{
bucket: bktName,
queued: time.Now(),
})
}
}
}
result := make([]BucketInfo, 0, len(resultMap))
@@ -259,6 +327,23 @@ func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOp
return buckets, err
}
func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
v := url.Values{}
v.Set(peerS3Bucket, bucket)
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Remove))
respBody, err := client.call(peerS3MethodHealBucket, v, nil, -1)
if err != nil {
return madmin.HealResultItem{}, err
}
defer xhttp.DrainBody(respBody)
var res madmin.HealResultItem
err = gob.NewDecoder(respBody).Decode(&res)
return res, err
}
// GetBucketInfo returns bucket stat info from a peer
func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
v := url.Values{}