fix: start using pkg/workers to spawn parallel workers (#17170)

Harshavardhana, 2023-05-09 16:37:31 -07:00 (committed by GitHub)
parent 5e629a99af
commit b92cdea578

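The change replaces the hand-rolled concurrency plumbing in decommissionPool (a sync.WaitGroup paired with a buffered channel used as a semaphore) with github.com/minio/pkg/workers, which covers both roles through Take/Give/Wait. Below is a minimal sketch of that pattern, using only the calls visible in the diff (workers.New, wk.Take, wk.Give, wk.Wait); the surrounding program is illustrative, not part of the patch:

package main

import (
	"fmt"

	"github.com/minio/pkg/workers"
)

func main() {
	// Bound concurrency to 4 in-flight jobs. workers.New also returns
	// an error, which the patch checks the same way.
	wk, err := workers.New(4)
	if err != nil {
		panic(err)
	}
	for i := 0; i < 10; i++ {
		i := i
		wk.Take() // blocks until a worker slot is free
		go func() {
			defer wk.Give() // return the slot when the job is done
			fmt.Println("job", i)
		}()
	}
	wk.Wait() // returns once every taken slot has been given back
}

Compared to the removed version, wk.Take() replaces both wg.Add(1) and the parallelWorkers <- struct{}{} send, and wk.Give() replaces both the channel receive and wg.Done(), so the acquire and release steps can no longer drift apart.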

@@ -27,7 +27,6 @@ import (
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
 	"time"

 	"github.com/dustin/go-humanize"
@@ -36,6 +35,7 @@ import (
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/console"
 	"github.com/minio/pkg/env"
+	"github.com/minio/pkg/workers"
 )

 // PoolDecommissionInfo currently decommissioning information
@@ -708,14 +708,20 @@ func (set *erasureObjects) listObjectsToDecommission(ctx context.Context, bi dec
 func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
 	ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})

-	var wg sync.WaitGroup
 	wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
 	workerSize, err := strconv.Atoi(wStr)
 	if err != nil {
 		return err
 	}
-	parallelWorkers := make(chan struct{}, workerSize)
+
+	// each set get its own thread separate from the concurrent
+	// objects/versions being decommissioned.
+	workerSize += len(pool.sets)
+
+	wk, err := workers.New(workerSize)
+	if err != nil {
+		return err
+	}

 	for _, set := range pool.sets {
 		set := set
@@ -749,10 +755,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 		}

 		decommissionEntry := func(entry metaCacheEntry) {
-			defer func() {
-				<-parallelWorkers
-				wg.Done()
-			}()
+			defer wk.Give()

 			if entry.isDir() {
 				return
@@ -902,21 +905,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			z.poolMetaMutex.Unlock()
 		}

-		wg.Add(1)
+		wk.Take()
 		go func() {
-			defer wg.Done()
+			defer wk.Give()
 			err := set.listObjectsToDecommission(ctx, bi,
 				func(entry metaCacheEntry) {
-					// Wait must be synchronized here.
-					wg.Add(1)
-					parallelWorkers <- struct{}{}
+					wk.Take()
 					go decommissionEntry(entry)
 				},
 			)
 			logger.LogIf(ctx, err)
 		}()
 	}
-	wg.Wait()
+	wk.Wait()
 	return nil
 }
@@ -1280,7 +1281,7 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, indices ...i
 		z.HealBucket(ctx, bucket.Name, madmin.HealOpts{})
 	}

-	// Create .minio.sys/conifg, .minio.sys/buckets paths if missing,
+	// Create .minio.sys/config, .minio.sys/buckets paths if missing,
 	// this code is present to avoid any missing meta buckets on other
 	// pools.
 	for _, metaBucket := range []string{
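A note on the sizing in the hunks above: the wk pool is shared by two kinds of goroutines, one long-lived lister per erasure set (the wk.Take() before go func()) and one short-lived worker per object entry (the wk.Take() before go decommissionEntry(entry)). Growing workerSize by len(pool.sets) reserves a slot for every lister, so the listers cannot starve the per-entry workers of slots; that is what the added comment about each set getting its own thread is saying. The final wk.Wait() then blocks until all listers and entry workers have called wk.Give().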