Mirror of https://github.com/minio/minio.git
avoid ListBuckets returning quorum errors when node is down (#10555)
Also, revamp the way ListBuckets works and make a few portions of the healing logic parallel:

- walk objects for healing disks in parallel
- collect the list of buckets in parallel across drives
- provide a consistent view for listBuckets()
Parent: d778d034e7
Commit: ca989eb0b3
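The heart of the change is how bucket listings are collected: every drive is queried concurrently and the per-drive errors are folded with a read-quorum style decision, so a single down node no longer bubbles up as a quorum error from ListBuckets. Below is a minimal, self-contained Go sketch of that pattern; the Disk interface, VolInfo struct, and the simplified listAllBuckets here are stand-ins for MinIO's internal StorageAPI types and its errgroup/reduceReadQuorumErrs helpers, not the actual implementation.

package listing

import (
	"context"
	"fmt"
	"sync"
)

// VolInfo is a simplified stand-in for MinIO's volume metadata.
type VolInfo struct {
	Name string
}

// Disk is a simplified stand-in for a single drive (MinIO's StorageAPI).
type Disk interface {
	ListVols(ctx context.Context) ([]VolInfo, error)
}

// listAllBuckets queries every drive in parallel, merges the unique bucket
// names under a mutex, and only fails when more than half of the drives
// returned an error (a read-quorum style decision).
func listAllBuckets(ctx context.Context, disks []Disk, healBuckets map[string]VolInfo) error {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	errs := make([]error, len(disks))

	for i, disk := range disks {
		if disk == nil {
			continue // offline drives are skipped, not treated as fatal
		}
		wg.Add(1)
		go func(i int, disk Disk) {
			defer wg.Done()
			vols, err := disk.ListVols(ctx)
			if err != nil {
				errs[i] = err
				return
			}
			mu.Lock()
			defer mu.Unlock()
			for _, vol := range vols {
				// Save each bucket once, no matter how many drives report it.
				if _, ok := healBuckets[vol.Name]; !ok {
					healBuckets[vol.Name] = vol
				}
			}
		}(i, disk)
	}
	wg.Wait()

	// Quorum reduction: tolerate failures on up to half of the drives.
	var failed int
	for _, err := range errs {
		if err != nil {
			failed++
		}
	}
	if failed > len(disks)/2 {
		return fmt.Errorf("listing failed on %d of %d drives: read quorum not met", failed, len(disks))
	}
	return nil
}

In the actual diff below, the per-drive calls run through MinIO's internal errgroup.WithNErrs and the collected errors are reduced by reduceReadQuorumErrs against bucketMetadataOpIgnoredErrs.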
@@ -153,11 +153,22 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
 			for _, ep := range endpoints {
 				logger.Info("Healing disk '%s' on %s zone", ep, humanize.Ordinal(i+1))
 
-				if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil {
+				buckets, err := z.ListBucketsHeal(ctx)
+				if err != nil {
 					logger.LogIf(ctx, err)
 					continue
 				}
 
+				if len(buckets) > 0 {
+					disks := z.zones[i].sets[setIndex].getLoadBalancedDisks()
+					if err := healErasureSet(ctx, setIndex, buckets, disks, z.zones[i].setDriveCount); err != nil {
+						logger.LogIf(ctx, err)
+						continue
+					}
+				}
+
+				logger.Info("Healing disk '%s' on %s zone complete", ep, humanize.Ordinal(i+1))
+
 				// Only upon success pop the healed disk.
 				globalBackgroundHealState.popHealLocalDisks(ep)
 			}
@@ -188,7 +188,7 @@ func verifyServerSystemConfig(ctx context.Context, endpointZones EndpointZones)
 		retries++
 		// after 5 retries start logging that servers are not reachable yet
 		if retries >= 5 {
-			logger.Info(fmt.Sprintf("Waiting for atleast %d servers to be online for bootstrap check", len(clnts)/2))
+			logger.Info(fmt.Sprintf("Waiting for atleast %d remote servers to be online for bootstrap check", len(clnts)/2))
 			logger.Info(fmt.Sprintf("Following servers are currently offline or unreachable %s", offlineEndpoints))
 			retries = 0 // reset to log again after 5 retries.
 		}
@@ -18,7 +18,6 @@ package cmd
 
 import (
 	"context"
-	"sort"
 
 	"github.com/minio/minio-go/v7/pkg/s3utils"
 	"github.com/minio/minio/cmd/logger"
@@ -122,53 +121,6 @@ func (er erasureObjects) GetBucketInfo(ctx context.Context, bucket string) (bi B
 	return bucketInfo, nil
 }
 
-// listBuckets - returns list of all buckets from a disk picked at random.
-func (er erasureObjects) listBuckets(ctx context.Context) (bucketsInfo []BucketInfo, err error) {
-	for _, disk := range er.getLoadBalancedDisks() {
-		if disk == nil {
-			continue
-		}
-		var volsInfo []VolInfo
-		volsInfo, err = disk.ListVols(ctx)
-		if err == nil {
-			// NOTE: The assumption here is that volumes across all disks in
-			// readQuorum have consistent view i.e they all have same number
-			// of buckets.
-			var bucketsInfo []BucketInfo
-			for _, volInfo := range volsInfo {
-				if isReservedOrInvalidBucket(volInfo.Name, true) {
-					continue
-				}
-				bucketsInfo = append(bucketsInfo, BucketInfo(volInfo))
-			}
-			// For buckets info empty, loop once again to check
-			// if we have, can happen if disks were down.
-			if len(bucketsInfo) == 0 {
-				continue
-			}
-			return bucketsInfo, nil
-		}
-		logger.LogIf(ctx, err)
-		// Ignore any disks not found.
-		if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
-			continue
-		}
-		break
-	}
-	return nil, err
-}
-
-// ListBuckets - lists all the buckets, sorted by its name.
-func (er erasureObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
-	bucketInfos, err := er.listBuckets(ctx)
-	if err != nil {
-		return nil, toObjectErr(err)
-	}
-	// Sort by bucket name before returning.
-	sort.Sort(byBucketName(bucketInfos))
-	return bucketInfos, nil
-}
-
 // Dangling buckets should be handled appropriately, in this following situation
 // we actually have quorum error to be `nil` but we have some disks where
 // the bucket delete returned `errVolumeNotEmpty` but this is not correct
@@ -155,17 +155,18 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 
 // listAllBuckets lists all buckets from all disks. It also
 // returns the occurrence of each buckets in all disks
-func listAllBuckets(storageDisks []StorageAPI, healBuckets map[string]VolInfo) (err error) {
-	for _, disk := range storageDisks {
-		if disk == nil {
-			continue
+func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo) error {
+	g := errgroup.WithNErrs(len(storageDisks))
+	var mu sync.Mutex
+	for index := range storageDisks {
+		index := index
+		g.Go(func() error {
+			if storageDisks[index] == nil {
+				// we ignore disk not found errors
+				return nil
 			}
-		var volsInfo []VolInfo
-		volsInfo, err = disk.ListVols(context.TODO())
+			volsInfo, err := storageDisks[index].ListVols(ctx)
 			if err != nil {
-			if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
-				continue
-			}
 				return err
 			}
 			for _, volInfo := range volsInfo {
@@ -175,14 +176,16 @@ func listAllBuckets(storageDisks []StorageAPI, healBuckets map[string]VolInfo) (
 				if isReservedOrInvalidBucket(volInfo.Name, false) {
 					continue
 				}
-				// always save unique buckets across drives.
+				mu.Lock()
 				if _, ok := healBuckets[volInfo.Name]; !ok {
 					healBuckets[volInfo.Name] = volInfo
 				}
-			}
+				mu.Unlock()
 			}
 			return nil
+		}, index)
+	}
+	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, len(storageDisks)/2)
 }
 
 // Only heal on disks where we are sure that healing is needed. We can expand
@@ -665,7 +665,7 @@ func undoDeleteBucketSets(ctx context.Context, bucket string, sets []*erasureObj
 // that all buckets are present on all sets.
 func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
 	// Always lists from the same set signified by the empty string.
-	return s.getHashedSet("").ListBuckets(ctx)
+	return s.ListBucketsHeal(ctx)
 }
 
 // --- Object Operations ---
@@ -1465,7 +1465,7 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error)
 	var healBuckets = map[string]VolInfo{}
 	for _, set := range s.sets {
 		// lists all unique buckets across drives.
-		if err := listAllBuckets(set.getDisks(), healBuckets); err != nil {
+		if err := listAllBuckets(ctx, set.getDisks(), healBuckets); err != nil {
 			return nil, err
 		}
 	}
@@ -298,12 +298,11 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter
 
 	// Collect for each set in zones.
 	for _, z := range z.zones {
-		for _, erObj := range z.sets {
-			// Add new buckets.
-			buckets, err := erObj.ListBuckets(ctx)
+		buckets, err := z.ListBuckets(ctx)
 		if err != nil {
 			return err
 		}
+		// Add new buckets.
 		for _, b := range buckets {
 			if _, ok := knownBuckets[b.Name]; ok {
 				continue
@@ -311,6 +310,7 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter
 			allBuckets = append(allBuckets, b)
 			knownBuckets[b.Name] = struct{}{}
 		}
+		for _, erObj := range z.sets {
 			wg.Add(1)
 			results = append(results, dataUsageCache{})
 			go func(i int, erObj *erasureObjects) {
@@ -18,6 +18,7 @@ package cmd
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/minio/minio/cmd/logger"
@@ -89,12 +90,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
 }
 
 // healErasureSet lists and heals all objects in a specific erasure set
-func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, setDriveCount int) error {
-	buckets, err := xlObj.ListBuckets(ctx)
-	if err != nil {
-		return err
-	}
-
+func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI, setDriveCount int) error {
 	// Get background heal sequence to send elements to heal
 	var bgSeq *healSequence
 	var ok bool
@@ -125,22 +121,30 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, se
 		}
 
 		var entryChs []FileInfoVersionsCh
-		for _, disk := range xlObj.getLoadBalancedDisks() {
+		var mu sync.Mutex
+		var wg sync.WaitGroup
+		for _, disk := range disks {
 			if disk == nil {
 				// Disk can be offline
 				continue
 			}
+			disk := disk
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
 				entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done())
 				if err != nil {
 					// Disk walk returned error, ignore it.
-				continue
+					return
 				}
+				mu.Lock()
 				entryChs = append(entryChs, FileInfoVersionsCh{
 					Ch: entryCh,
 				})
+				mu.Unlock()
+			}()
 		}
+		wg.Wait()
 
 		entriesValid := make([]bool, len(entryChs))
 		entries := make([]FileInfoVersions, len(entryChs))
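The same fan-out-and-gather shape appears above when walking objects for healing: one goroutine per drive opens a walk channel, and the successfully opened channels are collected under a mutex before they are consumed. A compact illustration of that shape follows, using plain string channels instead of MinIO's FileInfoVersionsCh; the openFunc type and fanOutWalks helper below are illustrative names, not MinIO APIs.

package walk

import "sync"

// openFunc opens one walk stream for a single source (e.g. one drive) and
// returns a channel of entries, or an error if the source cannot be walked.
type openFunc func() (<-chan string, error)

// fanOutWalks starts one walker goroutine per source and returns every
// channel that was opened successfully. Unavailable or failing sources are
// skipped, mirroring how offline disks are ignored during healing.
func fanOutWalks(sources []openFunc) []<-chan string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		chs []<-chan string
	)
	for _, open := range sources {
		if open == nil {
			continue // source unavailable, e.g. an offline disk
		}
		open := open
		wg.Add(1)
		go func() {
			defer wg.Done()
			ch, err := open()
			if err != nil {
				return // a failed walk on one source is ignored
			}
			mu.Lock()
			chs = append(chs, ch)
			mu.Unlock()
		}()
	}
	wg.Wait()
	return chs
}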
@@ -305,31 +305,13 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
 	// are modifying this code that you do so, if and when
 	// you want to add extra context to your error. This
 	// ensures top level retry works accordingly.
-	var buckets []BucketInfo
-	if globalIsDistErasure || globalIsErasure {
-		// List buckets to heal, and be re-used for loading configs.
+	// List buckets to heal, and be re-used for loading configs.
+	var buckets []BucketInfo
+	if globalIsErasure {
 		buckets, err = newObject.ListBucketsHeal(ctx)
 		if err != nil {
 			return fmt.Errorf("Unable to list buckets to heal: %w", err)
 		}
-		// Attempt a heal if possible and re-use the bucket names
-		// to reload their config.
-		wquorum := &InsufficientWriteQuorum{}
-		rquorum := &InsufficientReadQuorum{}
-		for _, bucket := range buckets {
-			if err = newObject.MakeBucketWithLocation(ctx, bucket.Name, BucketOptions{}); err != nil {
-				if errors.As(err, &wquorum) || errors.As(err, &rquorum) {
-					// Return the error upwards for the caller to retry.
-					return fmt.Errorf("Unable to heal bucket: %w", err)
-				}
-				if _, ok := err.(BucketExists); !ok {
-					// ignore any other error and log for investigation.
-					logger.LogIf(ctx, err)
-					continue
-				}
-				// Bucket already exists, nothing that needs to be done.
-			}
-		}
 	} else {
 		buckets, err = newObject.ListBuckets(ctx)
 		if err != nil {
@@ -349,7 +331,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
 	}
 
 	// Initialize bucket metadata sub-system.
-	if err := globalBucketMetadataSys.Init(ctx, buckets, newObject); err != nil {
+	if err = globalBucketMetadataSys.Init(ctx, buckets, newObject); err != nil {
 		return fmt.Errorf("Unable to initialize bucket metadata sub-system: %w", err)
 	}
 