mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
converge listBuckets() as a peer call (#16346)
This commit is contained in:
@@ -80,6 +80,51 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
|
||||
}
|
||||
}
|
||||
|
||||
// ListBuckets lists buckets across all servers and returns a possible consistent view
|
||||
func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
|
||||
localBuckets, err := listBucketsLocal(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeBuckets := make([][]BucketInfo, len(sys.peerClients)+1)
|
||||
errs := []error{nil}
|
||||
nodeBuckets[0] = localBuckets
|
||||
|
||||
for idx, client := range sys.peerClients {
|
||||
idx := idx
|
||||
client := client
|
||||
g.Go(func() error {
|
||||
if client == nil {
|
||||
return errPeerOffline
|
||||
}
|
||||
localBuckets, err := client.ListBuckets(ctx, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nodeBuckets[idx+1] = localBuckets
|
||||
return nil
|
||||
}, idx)
|
||||
}
|
||||
|
||||
errs = append(errs, g.Wait()...)
|
||||
|
||||
quorum := (len(sys.allPeerClients) / 2)
|
||||
if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for idx, buckets := range nodeBuckets {
|
||||
if errs[idx] == nil {
|
||||
return buckets, nil
|
||||
}
|
||||
}
|
||||
|
||||
return []BucketInfo{}, nil
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket stat info about bucket on disk across all peers
|
||||
func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (binfo BucketInfo, err error) {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
@@ -127,6 +172,21 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc
|
||||
return bucketInfo, nil
|
||||
}
|
||||
|
||||
func (client *peerS3Client) ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error) {
|
||||
v := url.Values{}
|
||||
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted))
|
||||
|
||||
respBody, err := client.call(peerS3MethodListBuckets, v, nil, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer xhttp.DrainBody(respBody)
|
||||
|
||||
var buckets []BucketInfo
|
||||
err = gob.NewDecoder(respBody).Decode(&buckets)
|
||||
return buckets, err
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket stat info from a peer
|
||||
func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
|
||||
v := url.Values{}
|
||||
|
||||
Reference in New Issue
Block a user