fix: HealBucket regression for empty buckets, simplify it (#18815)

This commit is contained in:
Harshavardhana
2024-01-17 15:19:09 -08:00
committed by GitHub
parent 479940b7d0
commit 9588978028
6 changed files with 93 additions and 94 deletions

View File

@@ -1931,76 +1931,12 @@ func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmi
}
// HealBucket heals the named bucket, pool by pool, and returns a single
// madmin.HealResultItem that accumulates the per-pool results.
//
// For every pool it stats the bucket on each online disk, groups the outcome
// by host, decides from that whether the bucket should be removed or
// recreated, and then delegates the actual heal to z.s3Peer.HealBucket.
//
// NOTE(review): this text looks like a rendered diff in which removed and
// added lines are shown together without +/- markers — confirm against the
// real file. As written, the function always leaves via `return r, nil`
// (or the early error return inside the loop), and the final
// `return z.s3Peer.HealBucket(...)` statement is unreachable.
func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// r is the aggregate result; disk/set counts and drive states from each
// pool are folded into it below.
r := madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
}
// Attempt heal on the bucket metadata, ignore any failures.
// hopts is a copy of opts with Recreate forced off. The call is deferred, so
// it runs when HealBucket returns; its arguments (including hopts) are
// evaluated here, and its return values are discarded.
hopts := opts
hopts.Recreate = false
defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", hopts)
// DiskStat collects, for one host, the StatVol result and error of every
// disk that was probed on that host (the two slices are appended in step).
type DiskStat struct {
VolInfos []VolInfo
Errs []error
}
for _, pool := range z.serverPools {
// map of node wise disk stats, keyed by host name
diskStats := make(map[string]DiskStat)
for _, set := range pool.sets {
for _, disk := range set.getDisks() {
// Offline disks are not probed and do not contribute to any host's stats.
if disk == OfflineDisk {
continue
}
vi, err := disk.StatVol(ctx, bucket)
hostName := disk.Hostname()
// All local disks are grouped under the single key "local".
if disk.IsLocal() {
hostName = "local"
}
ds, ok := diskStats[hostName]
if !ok {
// First disk seen for this host: start a new entry.
newds := DiskStat{
VolInfos: []VolInfo{vi},
Errs: []error{err},
}
diskStats[hostName] = newds
} else {
// ds is a copy of the map value, so it must be stored back after appending.
ds.VolInfos = append(ds.VolInfos, vi)
ds.Errs = append(ds.Errs, err)
diskStats[hostName] = ds
}
}
}
// Count the hosts on which isAllBucketsNotFound reports true for the
// collected StatVol errors.
nodeCount := len(diskStats)
bktNotFoundCount := 0
for _, ds := range diskStats {
if isAllBucketsNotFound(ds.Errs) {
bktNotFoundCount++
}
}
// If the bucket is not found on more than half of the nodes, it is dangling.
// NOTE(review): opts is not reset between pools, so a Remove or Recreate flag
// set for one pool stays set for the pools that follow — confirm this is
// intended.
if bktNotFoundCount > nodeCount/2 {
opts.Remove = true
} else {
opts.Recreate = true
}
result, err := z.s3Peer.HealBucket(ctx, bucket, opts)
if err != nil {
// A BucketNotFound error skips this pool (nothing is added to r); any other
// error aborts the loop and is returned together with the result of the
// failing call.
if _, ok := err.(BucketNotFound); ok {
continue
}
return result, err
}
// Fold this pool's outcome into the aggregate result.
r.DiskCount += result.DiskCount
r.SetCount += result.SetCount
r.Before.Drives = append(r.Before.Drives, result.Before.Drives...)
r.After.Drives = append(r.After.Drives, result.After.Drives...)
}
return r, nil
// NOTE(review): unreachable as written — it follows an unconditional return.
// Presumably this is the replacement body line from the commit ("simplify
// it"); verify against the committed file.
return z.s3Peer.HealBucket(ctx, bucket, opts)
}
// Walk a bucket, optionally prefix recursively, until we have returned

View File

@@ -135,6 +135,35 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
g := errgroup.WithNErrs(len(sys.peerClients))
for idx, client := range sys.peerClients {
idx := idx
client := client
g.Go(func() error {
if client == nil {
return errPeerOffline
}
_, err := client.GetBucketInfo(ctx, bucket, BucketOptions{})
return err
}, idx)
}
errs := g.Wait()
var poolErrs []error
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
perPoolErrs := make([]error, 0, len(sys.peerClients))
for i, client := range sys.peerClients {
if slices.Contains(client.GetPools(), poolIdx) {
perPoolErrs = append(perPoolErrs, errs[i])
}
}
quorum := len(perPoolErrs) / 2
poolErrs = append(poolErrs, reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum))
}
opts.Remove = isAllBucketsNotFound(poolErrs)
opts.Recreate = !opts.Remove
healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
for idx, client := range sys.peerClients {
idx := idx
@@ -152,7 +181,7 @@ func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin
}, idx)
}
errs := g.Wait()
errs = g.Wait()
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
perPoolErrs := make([]error, 0, len(sys.peerClients))