Parallelize decommissioning process (#14704)

Krishna Srinivas 2022-04-07 23:19:13 -07:00 committed by GitHub
parent b35b9dcff7
commit 48594617b5


@@ -24,12 +24,15 @@ import (
     "fmt"
     "net/http"
     "sort"
+    "strconv"
+    "sync"
     "time"

     "github.com/dustin/go-humanize"
     "github.com/minio/minio/internal/hash"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/console"
+    "github.com/minio/pkg/env"
 )

 // PoolDecommissionInfo currently decommissioning information
@ -578,15 +581,16 @@ func (v versionsSorter) reverse() {
} }
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error { func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
var forwardTo string var wg sync.WaitGroup
// If we resume to the same bucket, forward to last known item. wStr := env.Get("_MINIO_DECOMMISSION_WORKERS", strconv.Itoa(len(pool.sets)))
rbucket, robject := z.poolMeta.ResumeBucketObject(idx) workerSize, _ := strconv.Atoi(wStr)
if rbucket != "" && rbucket == bName {
forwardTo = robject parallelWorkers := make(chan struct{}, workerSize)
}
versioned := globalBucketVersioningSys.Enabled(bName) versioned := globalBucketVersioningSys.Enabled(bName)
for _, set := range pool.sets { for _, set := range pool.sets {
parallelWorkers <- struct{}{}
set := set
disks := set.getOnlineDisks() disks := set.getOnlineDisks()
if len(disks) == 0 { if len(disks) == 0 {
logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s", logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s",
@@ -698,26 +702,30 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
             bucket: bName,
         }

-        if err := listPathRaw(ctx, listPathRawOptions{
-            disks:          disks,
-            bucket:         bName,
-            recursive:      true,
-            forwardTo:      forwardTo,
-            minDisks:       len(disks) / 2, // to capture all quorum ratios
-            reportNotFound: false,
-            agreed:         decommissionEntry,
-            partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
-                entry, ok := entries.resolve(&resolver)
-                if ok {
-                    decommissionEntry(*entry)
-                }
-            },
-            finished: nil,
-        }); err != nil {
-            // Decommissioning failed and won't continue
-            return err
-        }
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err := listPathRaw(ctx, listPathRawOptions{
+                disks:          disks,
+                bucket:         bName,
+                recursive:      true,
+                forwardTo:      "",
+                minDisks:       len(disks) / 2, // to capture all quorum ratios
+                reportNotFound: false,
+                agreed:         decommissionEntry,
+                partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
+                    entry, ok := entries.resolve(&resolver)
+                    if ok {
+                        decommissionEntry(*entry)
+                    }
+                },
+                finished: nil,
+            })
+            logger.LogIf(ctx, err)
+            <-parallelWorkers
+        }()
     }
+    wg.Wait()
     return nil
 }
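
The concurrency pattern this commit introduces is a bounded fan-out: a buffered channel acts as a semaphore that caps how many erasure sets are listed at once, and a sync.WaitGroup makes the function block until every goroutine finishes. The sketch below is a minimal, self-contained illustration of that pattern, not code from the commit; listSet and decommissionAll are hypothetical stand-ins for the per-set listPathRaw walk, and the _MINIO_DECOMMISSION_WORKERS default (one worker per set) mirrors the diff above, read here with os.Getenv instead of minio/pkg's env.Get.

// Minimal sketch of the bounded-worker pattern used by this change.
// listSet is a hypothetical placeholder for the real per-set work.
package main

import (
    "fmt"
    "os"
    "strconv"
    "sync"
)

func listSet(id int) error {
    // Placeholder for listing one erasure set (listPathRaw + decommissionEntry).
    fmt.Println("listing set", id)
    return nil
}

func decommissionAll(numSets int) {
    // Worker count comes from the environment, defaulting to one worker per set.
    wStr := os.Getenv("_MINIO_DECOMMISSION_WORKERS")
    if wStr == "" {
        wStr = strconv.Itoa(numSets)
    }
    workers, err := strconv.Atoi(wStr)
    if err != nil || workers <= 0 {
        workers = numSets
    }

    var wg sync.WaitGroup
    sem := make(chan struct{}, workers) // semaphore: at most `workers` goroutines in flight

    for i := 0; i < numSets; i++ {
        sem <- struct{}{} // acquire a slot first, so the loop itself blocks when workers are busy
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot when this set is done
            if err := listSet(id); err != nil {
                // Errors are logged rather than returned, as in the diff: a plain
                // `return err` inside a goroutine cannot reach the caller.
                fmt.Println("set", id, "failed:", err)
            }
        }(i)
    }
    wg.Wait() // return only after every set has been walked
}

func main() {
    decommissionAll(4)
}

Acquiring the semaphore before spawning keeps the fan-out bounded regardless of how many sets the pool has, and moving error handling to logging is the behavioral trade-off visible in the diff: the old code aborted decommissioning on the first listPathRaw error, while the parallel version logs per-set errors and lets the remaining sets continue.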