do not list buckets without local quorum (#20852)

ListBuckets() would result in listing buckets
without quorum, this PR fixes the behavior.
This commit is contained in:
Harshavardhana
2025-01-19 15:13:17 -08:00
committed by GitHub
parent 3d0f513ee2
commit 779ec8f0d4
5 changed files with 54 additions and 34 deletions

View File

@@ -32,6 +32,7 @@ import (
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v3/sync/errgroup"
"github.com/puzpuzpuz/xsync/v3"
)
//go:generate stringer -type=healingMetric -trimprefix=healingMetric $GOFILE
@@ -117,9 +118,8 @@ func (er erasureObjects) listAndHeal(ctx context.Context, bucket, prefix string,
// listAllBuckets lists all buckets from all disks. It also
// returns the occurrence of each buckets in all disks
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets *xsync.MapOf[string, VolInfo], readQuorum int) error {
g := errgroup.WithNErrs(len(storageDisks))
var mu sync.Mutex
for index := range storageDisks {
index := index
g.Go(func() error {
@@ -131,6 +131,7 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets
if err != nil {
return err
}
for _, volInfo := range volsInfo {
// StorageAPI can send volume names which are
// incompatible with buckets - these are
@@ -138,25 +139,44 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets
if isReservedOrInvalidBucket(volInfo.Name, false) {
continue
}
mu.Lock()
if _, ok := healBuckets[volInfo.Name]; !ok {
healBuckets[volInfo.Name] = volInfo
}
mu.Unlock()
healBuckets.Compute(volInfo.Name, func(oldValue VolInfo, loaded bool) (newValue VolInfo, del bool) {
if loaded {
newValue = oldValue
newValue.count = oldValue.count + 1
return newValue, false
}
return VolInfo{
Name: volInfo.Name,
Created: volInfo.Created,
count: 1,
}, false
})
}
return nil
}, index)
}
return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum)
if err := reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum); err != nil {
return err
}
healBuckets.Range(func(volName string, volInfo VolInfo) bool {
if volInfo.count < readQuorum {
healBuckets.Delete(volName)
}
return true
})
return nil
}
var errLegacyXLMeta = errors.New("legacy XL meta")
var errOutdatedXLMeta = errors.New("outdated XL meta")
var (
errPartCorrupt = errors.New("part corrupt")
errPartMissing = errors.New("part missing")
errLegacyXLMeta = errors.New("legacy XL meta")
errOutdatedXLMeta = errors.New("outdated XL meta")
errPartCorrupt = errors.New("part corrupt")
errPartMissing = errors.New("part missing")
)
// Only heal on disks where we are sure that healing is needed. We can expand