Mirror of https://github.com/minio/minio.git
shuffle buckets randomly before being scanned (#17644)
This randomness is needed to avoid scanning the same buckets across different erasure sets in the same order; letting each erasure set scan buckets in a random order gives a wider spread for ILM and replication checks. Additionally, do not loop over the buckets twice to fill the channel: fill the channel regardless of whether a bucket is new or old.
Commit bdddf597f6 (parent bb6921bf9c)
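The recurring change in the diff below swaps the deterministic hashOrder(UTCNow().String(), len(disks)) walk (1-based, hence the disks[i-1] indexing) for a freshly seeded math/rand permutation with plain 0-based indexing. A minimal standalone sketch of that pattern, not taken from the patch (the items slice is a placeholder):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	items := []string{"disk-0", "disk-1", "disk-2", "disk-3"}

	// A locally seeded RNG, mirroring r := rand.New(rand.NewSource(time.Now().UnixNano()))
	// in the patch, so each caller walks the slice in a different order.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	// r.Perm(n) returns a random permutation of [0, n), so elements are read
	// as items[i] directly instead of the old 1-based items[i-1] form.
	for _, i := range r.Perm(len(items)) {
		fmt.Println(items[i])
	}
}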
@@ -20,7 +20,9 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"sync"
+	"time"
 
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/sync/errgroup"
@@ -30,15 +32,16 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
 	disks := er.getDisks()
 	var wg sync.WaitGroup
 	var mu sync.Mutex
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, i := range r.Perm(len(disks)) {
 		i := i
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			if disks[i-1] == nil {
+			if disks[i] == nil {
 				return
 			}
-			di, err := disks[i-1].DiskInfo(context.Background())
+			di, err := disks[i].DiskInfo(context.Background())
 			if err != nil || di.Healing {
 				// - Do not consume disks which are not reachable
 				//   unformatted or simply not accessible for some reason.
@@ -50,7 +53,7 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
 			}
 
 			mu.Lock()
-			newDisks = append(newDisks, disks[i-1])
+			newDisks = append(newDisks, disks[i])
 			mu.Unlock()
 		}()
 	}
@@ -61,9 +64,10 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
 func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 	disks := er.getDisks()
 	// Based on the random shuffling return back randomized disks.
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-		if disks[i-1] != nil && disks[i-1].IsLocal() {
-			newDisks = append(newDisks, disks[i-1])
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, i := range r.Perm(len(disks)) {
+		if disks[i] != nil && disks[i].IsLocal() {
+			newDisks = append(newDisks, disks[i])
 		}
 	}
 	return newDisks
@@ -74,10 +78,11 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
 	disks := er.getDisks()
 
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	if !optimized {
 		var newDisks []StorageAPI
-		for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-			newDisks = append(newDisks, disks[i-1])
+		for _, i := range r.Perm(len(disks)) {
+			newDisks = append(newDisks, disks[i])
 		}
 		return newDisks
 	}
@@ -86,15 +91,15 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
 	var mu sync.Mutex
 	newDisks := map[uint64][]StorageAPI{}
 	// Based on the random shuffling return back randomized disks.
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+	for _, i := range r.Perm(len(disks)) {
 		i := i
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			if disks[i-1] == nil {
+			if disks[i] == nil {
 				return
 			}
-			di, err := disks[i-1].DiskInfo(context.Background())
+			di, err := disks[i].DiskInfo(context.Background())
 			if err != nil || di.Healing {
 				// - Do not consume disks which are not reachable
 				//   unformatted or simply not accessible for some reason.
@@ -107,7 +112,7 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
 
 			mu.Lock()
 			// Capture disks usage wise upto resolution of MiB
-			newDisks[di.Used/1024/1024] = append(newDisks[di.Used/1024/1024], disks[i-1])
+			newDisks[di.Used/1024/1024] = append(newDisks[di.Used/1024/1024], disks[i])
 			mu.Unlock()
 		}()
 	}
@@ -242,6 +242,14 @@ func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string
 	return nil
 }
 
+// Return the disks belonging to the poolIdx, and setIdx.
+func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) {
+	if poolIdx < len(z.serverPools) && setIdx < len(z.serverPools[poolIdx].sets) {
+		return z.serverPools[poolIdx].sets[setIdx].getDisks(), nil
+	}
+	return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
+}
+
 // Return the count of disks in each pool
 func (z *erasureServerPools) SetDriveCounts() []int {
 	setDriveCounts := make([]int, len(z.serverPools))
@@ -630,11 +638,6 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, updates chan<- DataU
 		return nil
 	}
 
-	// Scanner latest allBuckets first.
-	sort.Slice(allBuckets, func(i, j int) bool {
-		return allBuckets[i].Created.After(allBuckets[j].Created)
-	})
-
 	// Collect for each set in serverPools.
 	for _, z := range z.serverPools {
 		for _, erObj := range z.sets {
@@ -277,31 +277,33 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea
 	var wg sync.WaitGroup
 	disks := er.getDisks()
 	infos := make([]DiskInfo, len(disks))
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, i := range r.Perm(len(disks)) {
 		i := i
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
 
-			disk := disks[i-1]
+			disk := disks[i]
 			if disk == nil {
-				infos[i-1].Error = "nil drive"
+				infos[i].Error = "offline drive"
 				return
 			}
 
 			di, err := disk.DiskInfo(context.Background())
-			if err != nil {
+			if err != nil || di.Healing {
 				// - Do not consume disks which are not reachable
 				//   unformatted or simply not accessible for some reason.
 				//
 				//
 				// - Future: skip busy disks
-				infos[i-1].Error = err.Error()
+				if err != nil {
+					infos[i].Error = err.Error()
+				}
 				return
 			}
 
-			infos[i-1] = di
+			infos[i] = di
 		}()
 	}
 	wg.Wait()
@@ -373,23 +375,30 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 	// Put all buckets into channel.
 	bucketCh := make(chan BucketInfo, len(buckets))
 
+	// Shuffle buckets to ensure total randomness of buckets, being scanned.
+	// Otherwise same set of buckets get scanned across erasure sets always.
+	// at any given point in time. This allows different buckets to be scanned
+	// in different order per erasure set, this wider spread is needed when
+	// there are lots of buckets with different order of objects in them.
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	permutes := r.Perm(len(buckets))
 	// Add new buckets first
-	for _, b := range buckets {
-		if oldCache.find(b.Name) == nil {
+	for _, idx := range permutes {
+		b := buckets[idx]
+		if e := oldCache.find(b.Name); e == nil {
 			bucketCh <- b
 		}
 	}
 
 	// Add existing buckets.
-	for _, b := range buckets {
-		e := oldCache.find(b.Name)
-		if e != nil {
+	for _, idx := range permutes {
+		b := buckets[idx]
+		if e := oldCache.find(b.Name); e != nil {
 			cache.replace(b.Name, dataUsageRoot, *e)
 			bucketCh <- b
 		}
 	}
 
 	close(bucketCh)
 
 	bucketResults := make(chan dataUsageEntryInfo, len(disks))
 
 	// Start async collector/saver.
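The hunk above carries the headline change: the bucket channel is now filled in a per-scanner random order, so each erasure set visits buckets in a different sequence. A rough, self-contained sketch of that channel-filling pattern follows; the bucket type and its isNew field are stand-ins for the BucketInfo/oldCache.find logic in the real code, not the actual minio types.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type bucket struct {
	name  string
	isNew bool // stands in for "oldCache.find(b.Name) == nil" in the patch
}

func main() {
	buckets := []bucket{{"a", false}, {"b", true}, {"c", false}, {"d", true}}

	// Buffered channel sized for all buckets, filled in one shared random order.
	bucketCh := make(chan bucket, len(buckets))
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	permutes := r.Perm(len(buckets))

	// New buckets first, in permuted order.
	for _, idx := range permutes {
		if buckets[idx].isNew {
			bucketCh <- buckets[idx]
		}
	}
	// Then the already-known buckets, same permutation.
	for _, idx := range permutes {
		if !buckets[idx].isNew {
			bucketCh <- buckets[idx]
		}
	}
	close(bucketCh)

	// Consumers drain the channel in the shuffled order.
	for b := range bucketCh {
		fmt.Println(b.name)
	}
}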
@@ -428,11 +437,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 		}
 	}()
 
-	// Shuffle disks to ensure a total randomness of bucket/disk association to ensure
-	// that objects that are not present in all disks are accounted and ILM applied.
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] })
-
 	// Restrict parallelism for disk usage scanner
 	// upto GOMAXPROCS if GOMAXPROCS is < len(disks)
 	maxProcs := runtime.GOMAXPROCS(0)
@@ -247,7 +247,8 @@ type ObjectLayer interface {
 	AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
 	CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
 
-	SetDriveCounts() []int // list of erasure stripe size for each pool in order.
+	GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) // return the disks belonging to pool and set.
+	SetDriveCounts() []int                              // list of erasure stripe size for each pool in order.
 
 	// Healing operations.
 	HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
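For the new ObjectLayer accessor, a hypothetical caller might look like the sketch below. The types here are trimmed stand-ins rather than the real minio interfaces; only the bounds-checked pool/set lookup added in the diff is reproduced.

package main

import "fmt"

// Trimmed stand-ins for the real minio types; only what the sketch needs.
type StorageAPI interface{ String() string }

type fakeDisk string

func (d fakeDisk) String() string { return string(d) }

// fakePools mimics the bounds-checked lookup the diff adds to erasureServerPools.
type fakePools struct {
	pools [][][]StorageAPI // pools[poolIdx][setIdx] -> disks
}

func (z *fakePools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) {
	if poolIdx < len(z.pools) && setIdx < len(z.pools[poolIdx]) {
		return z.pools[poolIdx][setIdx], nil
	}
	return nil, fmt.Errorf("matching pool %d, set %d not found", poolIdx+1, setIdx+1)
}

func main() {
	// One pool with one erasure set of two disks.
	z := &fakePools{pools: [][][]StorageAPI{{{fakeDisk("d1"), fakeDisk("d2")}}}}

	if disks, err := z.GetDisks(0, 0); err == nil {
		fmt.Println("set 0 of pool 0:", disks)
	}
	if _, err := z.GetDisks(3, 0); err != nil {
		fmt.Println(err) // out-of-range indexes return an error, as in the patch
	}
}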