relax write quorum requirement for ListBuckets()/HeadBucket() (#18288)

Also fix error handling for HeadBucket() to be pool specific
This commit is contained in:
Harshavardhana 2023-10-20 17:50:21 -07:00 committed by GitHub
parent 8cd80fec8c
commit aa703dc903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -131,7 +131,6 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
g := errgroup.WithNErrs(len(sys.peerClients)) g := errgroup.WithNErrs(len(sys.peerClients))
nodeBuckets := make([][]BucketInfo, len(sys.peerClients)) nodeBuckets := make([][]BucketInfo, len(sys.peerClients))
errs := []error{nil}
for idx, client := range sys.peerClients { for idx, client := range sys.peerClients {
idx := idx idx := idx
@ -149,7 +148,7 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
}, idx) }, idx)
} }
errs = append(errs, g.Wait()...) errs := g.Wait()
// The list of buckets in a map to avoid duplication // The list of buckets in a map to avoid duplication
resultMap := make(map[string]BucketInfo) resultMap := make(map[string]BucketInfo)
@ -161,7 +160,7 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
perPoolErrs = append(perPoolErrs, errs[i]) perPoolErrs = append(perPoolErrs, errs[i])
} }
} }
quorum := len(perPoolErrs)/2 + 1 quorum := len(perPoolErrs) / 2
if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil { if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
return nil, poolErr return nil, poolErr
} }
@ -181,7 +180,7 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
continue continue
} }
bucketsMap[bi.Name]++ bucketsMap[bi.Name]++
if bucketsMap[bi.Name] == quorum { if bucketsMap[bi.Name] >= quorum {
resultMap[bi.Name] = bi resultMap[bi.Name] = bi
} }
} }
@ -223,9 +222,17 @@ func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts Buc
errs := g.Wait() errs := g.Wait()
quorum := len(sys.peerClients)/2 + 1 for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil { perPoolErrs := make([]error, 0, len(sys.peerClients))
return BucketInfo{}, toObjectErr(err, bucket) for i, client := range sys.peerClients {
if slices.Contains(client.GetPools(), poolIdx) {
perPoolErrs = append(perPoolErrs, errs[i])
}
}
quorum := len(perPoolErrs) / 2
if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
return BucketInfo{}, poolErr
}
} }
for i, err := range errs { for i, err := range errs {