From bdddf597f6b8d4360a13fe52f26eccf34cbecf24 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 14 Jul 2023 02:25:40 -0700
Subject: [PATCH] shuffle buckets randomly before being scanned (#17644)

This randomness is needed to avoid scanning the same buckets across
different erasure sets in the same order; allow buckets to be scanned
in a random order instead, allowing a wider spread of ILM and
replication checks.

Additionally, do not loop over the buckets twice just to fill the
channel; fill the channel regardless of whether a bucket is new or old.

---
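Reviewer note: the recurring change in this patch is replacing the
deterministic, time-string-hashed iteration order (hashOrder, which
yields 1-based positions, hence the old disks[i-1]) with a time-seeded
rand.Perm permutation (0-based). A minimal, self-contained sketch of
the new pattern, not MinIO code; `disks` is a plain slice standing in
for the StorageAPI slice:

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    func main() {
        disks := []string{"disk0", "disk1", "disk2", "disk3"}

        // A time-seeded permutation: each scanner instance walks the
        // slice in its own random order, so concurrent scanners spread
        // their work instead of all starting at the same element.
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        for _, i := range r.Perm(len(disks)) {
            fmt.Println(disks[i]) // indices are 0-based; no more disks[i-1]
        }
    }

Note the resulting switch from disks[i-1] to disks[i] throughout the
diff below: rand.Perm(n) yields a permutation of 0..n-1.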
 cmd/erasure-common.go       | 31 +++++++++++++++-----------
 cmd/erasure-server-pool.go  | 13 ++++++-----
 cmd/erasure.go              | 44 ++++++++++++++++++++-----------
 cmd/object-api-interface.go |  3 ++-
 4 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index eba322800..f848c2f6a 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -20,7 +20,9 @@ package cmd
 import (
     "context"
     "fmt"
+    "math/rand"
     "sync"
+    "time"

     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/sync/errgroup"
@@ -30,15 +32,16 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
     disks := er.getDisks()
     var wg sync.WaitGroup
     var mu sync.Mutex
-    for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
+    for _, i := range r.Perm(len(disks)) {
         i := i
         wg.Add(1)
         go func() {
             defer wg.Done()
-            if disks[i-1] == nil {
+            if disks[i] == nil {
                 return
             }
-            di, err := disks[i-1].DiskInfo(context.Background())
+            di, err := disks[i].DiskInfo(context.Background())
             if err != nil || di.Healing {
                 // - Do not consume disks which are not reachable
                 //   unformatted or simply not accessible for some reason.
@@ -50,7 +53,7 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
             }

             mu.Lock()
-            newDisks = append(newDisks, disks[i-1])
+            newDisks = append(newDisks, disks[i])
             mu.Unlock()
         }()
     }
@@ -61,9 +64,10 @@
 func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
     disks := er.getDisks()
     // Based on the random shuffling return back randomized disks.
-    for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-        if disks[i-1] != nil && disks[i-1].IsLocal() {
-            newDisks = append(newDisks, disks[i-1])
+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
+    for _, i := range r.Perm(len(disks)) {
+        if disks[i] != nil && disks[i].IsLocal() {
+            newDisks = append(newDisks, disks[i])
         }
     }
     return newDisks
@@ -74,10 +78,11 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
     disks := er.getDisks()

+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
     if !optimized {
         var newDisks []StorageAPI
-        for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-            newDisks = append(newDisks, disks[i-1])
+        for _, i := range r.Perm(len(disks)) {
+            newDisks = append(newDisks, disks[i])
         }
         return newDisks
     }
@@ -86,15 +91,15 @@
     var wg sync.WaitGroup
     var mu sync.Mutex
     newDisks := map[uint64][]StorageAPI{}
     // Based on the random shuffling return back randomized disks.
-    for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+    for _, i := range r.Perm(len(disks)) {
         i := i
         wg.Add(1)
         go func() {
             defer wg.Done()
-            if disks[i-1] == nil {
+            if disks[i] == nil {
                 return
             }
-            di, err := disks[i-1].DiskInfo(context.Background())
+            di, err := disks[i].DiskInfo(context.Background())
             if err != nil || di.Healing {
                 // - Do not consume disks which are not reachable
                 //   unformatted or simply not accessible for some reason.
@@ -107,7 +112,7 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {

             mu.Lock()
             // Capture disks usage wise upto resolution of MiB
-            newDisks[di.Used/1024/1024] = append(newDisks[di.Used/1024/1024], disks[i-1])
+            newDisks[di.Used/1024/1024] = append(newDisks[di.Used/1024/1024], disks[i])
             mu.Unlock()
         }()
     }
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index da82bf60a..5be9e5195 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -242,6 +242,14 @@ func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string
     return nil
 }

+// Return the disks belonging to poolIdx and setIdx.
+func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) {
+    if poolIdx < len(z.serverPools) && setIdx < len(z.serverPools[poolIdx].sets) {
+        return z.serverPools[poolIdx].sets[setIdx].getDisks(), nil
+    }
+    return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
+}
+
 // Return the count of disks in each pool
 func (z *erasureServerPools) SetDriveCounts() []int {
     setDriveCounts := make([]int, len(z.serverPools))
@@ -630,11 +638,6 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataU
         return nil
     }

-    // Scanner latest allBuckets first.
-    sort.Slice(allBuckets, func(i, j int) bool {
-        return allBuckets[i].Created.After(allBuckets[j].Created)
-    })
-
     // Collect for each set in serverPools.
     for _, z := range z.serverPools {
         for _, erObj := range z.sets {
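Reviewer note: the new GetDisks accessor only guards the upper bounds of
poolIdx/setIdx, and reports misses with 1-based ordinals. A standalone
sketch of that pattern (illustrative names, not MinIO code; negative
indices are not expected, matching the patch):

    package main

    import (
        "fmt"

        "github.com/dustin/go-humanize"
    )

    type set struct{ drives []string }
    type pool struct{ sets []set }

    // getDisks returns the drives of one erasure set, or a descriptive
    // error when the pool/set coordinates do not exist.
    func getDisks(pools []pool, poolIdx, setIdx int) ([]string, error) {
        if poolIdx < len(pools) && setIdx < len(pools[poolIdx].sets) {
            return pools[poolIdx].sets[setIdx].drives, nil
        }
        // humanize.Ordinal renders operator-friendly 1-based ordinals
        // such as "2nd" and "1st" for the 0-based indices.
        return nil, fmt.Errorf("matching pool %s, set %s not found",
            humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
    }

    func main() {
        pools := []pool{{sets: []set{{drives: []string{"/mnt/d1", "/mnt/d2"}}}}}
        fmt.Println(getDisks(pools, 0, 0)) // [/mnt/d1 /mnt/d2] <nil>
        fmt.Println(getDisks(pools, 1, 0)) // matching pool 2nd, set 1st not found
    }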
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 4b297bac7..900c50516 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -277,31 +277,33 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea
     var wg sync.WaitGroup
     disks := er.getDisks()
     infos := make([]DiskInfo, len(disks))
-    for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
+    for _, i := range r.Perm(len(disks)) {
         i := i
         wg.Add(1)
         go func() {
             defer wg.Done()
-            disk := disks[i-1]
-
+            disk := disks[i]
             if disk == nil {
-                infos[i-1].Error = "nil drive"
+                infos[i].Error = "offline drive"
                 return
             }
             di, err := disk.DiskInfo(context.Background())
-            if err != nil {
+            if err != nil || di.Healing {
                 // - Do not consume disks which are not reachable
                 //   unformatted or simply not accessible for some reason.
                 //
                 //
                 // - Future: skip busy disks
-                infos[i-1].Error = err.Error()
+                if err != nil {
+                    infos[i].Error = err.Error()
+                }
                 return
             }
-            infos[i-1] = di
+            infos[i] = di
         }()
     }
     wg.Wait()
@@ -373,23 +375,30 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
     // Put all buckets into channel.
     bucketCh := make(chan BucketInfo, len(buckets))
+
+    // Shuffle buckets to ensure total randomness of the buckets being scanned;
+    // otherwise the same set of buckets would always be scanned across erasure
+    // sets at any given point in time. This lets each erasure set scan buckets
+    // in a different order, a wider spread that is needed when there are lots
+    // of buckets with a different order of objects in them.
+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
+    permutes := r.Perm(len(buckets))
     // Add new buckets first
-    for _, b := range buckets {
-        if oldCache.find(b.Name) == nil {
+    for _, idx := range permutes {
+        b := buckets[idx]
+        if e := oldCache.find(b.Name); e == nil {
             bucketCh <- b
         }
     }
-
-    // Add existing buckets.
-    for _, b := range buckets {
-        e := oldCache.find(b.Name)
-        if e != nil {
+    for _, idx := range permutes {
+        b := buckets[idx]
+        if e := oldCache.find(b.Name); e != nil {
             cache.replace(b.Name, dataUsageRoot, *e)
             bucketCh <- b
         }
     }
-    close(bucketCh)

+    close(bucketCh)
     bucketResults := make(chan dataUsageEntryInfo, len(disks))

     // Start async collector/saver.
@@ -428,11 +437,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
         }
     }()

-    // Shuffle disks to ensure a total randomness of bucket/disk association to ensure
-    // that objects that are not present in all disks are accounted and ILM applied.
-    r := rand.New(rand.NewSource(time.Now().UnixNano()))
-    r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] })
-
     // Restrict parallelism for disk usage scanner
     // upto GOMAXPROCS if GOMAXPROCS is < len(disks)
     maxProcs := runtime.GOMAXPROCS(0)
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 56c2b6471..081fb3d5f 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -247,7 +247,8 @@ type ObjectLayer interface {
     AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
     CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)

-    SetDriveCounts() []int // list of erasure stripe size for each pool in order.
+    GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) // return the disks belonging to pool and set.
+    SetDriveCounts() []int                              // list of erasure stripe sizes for each pool, in order.

     // Healing operations.
     HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
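Reviewer note: the nsScanner change queues buckets in two passes over a
single shared permutation: new (uncached) buckets first, previously
seen buckets after, then the channel is closed. A minimal sketch of
that queueing order (illustrative names; `known` stands in for the
oldCache lookup):

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    func main() {
        buckets := []string{"a", "b", "c", "d"}
        known := map[string]bool{"a": true, "c": true} // buckets in the old cache

        // Buffered to len(buckets), so both passes can enqueue without blocking.
        bucketCh := make(chan string, len(buckets))
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        permutes := r.Perm(len(buckets))

        // Pass 1: new buckets first, so their usage gets recorded soonest.
        for _, idx := range permutes {
            if !known[buckets[idx]] {
                bucketCh <- buckets[idx]
            }
        }
        // Pass 2: previously seen buckets, in the same shuffled order.
        for _, idx := range permutes {
            if known[buckets[idx]] {
                bucketCh <- buckets[idx]
            }
        }
        close(bucketCh) // all producers done; workers may range to completion

        for b := range bucketCh {
            fmt.Println(b)
        }
    }

Because each erasure set seeds its own permutation, the sets drain the
same bucket list in different orders, which is the spread the commit
message describes.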