Allow parallel decom migration threads to be more than erasure sets (#14733)

This commit is contained in:
Krishna Srinivas 2022-04-12 10:49:53 -07:00 committed by GitHub
parent 646350fa7f
commit 5f94cec1e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -583,13 +583,15 @@ func (v versionsSorter) reverse() {
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error { func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
var wg sync.WaitGroup var wg sync.WaitGroup
wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets))) wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
workerSize, _ := strconv.Atoi(wStr) workerSize, err := strconv.Atoi(wStr)
if err != nil {
return err
}
parallelWorkers := make(chan struct{}, workerSize) parallelWorkers := make(chan struct{}, workerSize)
versioned := globalBucketVersioningSys.Enabled(bName) versioned := globalBucketVersioningSys.Enabled(bName)
for _, set := range pool.sets { for _, set := range pool.sets {
parallelWorkers <- struct{}{}
set := set set := set
disks := set.getOnlineDisks() disks := set.getOnlineDisks()
if len(disks) == 0 { if len(disks) == 0 {
@ -599,6 +601,9 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
} }
decommissionEntry := func(entry metaCacheEntry) { decommissionEntry := func(entry metaCacheEntry) {
defer func() {
<-parallelWorkers
}()
if entry.isDir() { if entry.isDir() {
return return
} }
@ -712,17 +717,20 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
forwardTo: "", forwardTo: "",
minDisks: len(disks) / 2, // to capture all quorum ratios minDisks: len(disks) / 2, // to capture all quorum ratios
reportNotFound: false, reportNotFound: false,
agreed: decommissionEntry, agreed: func(entry metaCacheEntry) {
parallelWorkers <- struct{}{}
go decommissionEntry(entry)
},
partial: func(entries metaCacheEntries, nAgreed int, errs []error) { partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, ok := entries.resolve(&resolver) entry, ok := entries.resolve(&resolver)
if ok { if ok {
decommissionEntry(*entry) parallelWorkers <- struct{}{}
go decommissionEntry(*entry)
} }
}, },
finished: nil, finished: nil,
}) })
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
<-parallelWorkers
}() }()
} }
wg.Wait() wg.Wait()