Add dynamic reconfiguration of number of transition workers (#12926)

Krishnan Parthasarathi
2021-08-11 22:23:56 -07:00
committed by GitHub
parent 9e88941515
commit 65b6f4aa31
7 changed files with 88 additions and 36 deletions

View File

@@ -24,7 +24,6 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"strings"
"sync"
"time"
@@ -115,9 +114,14 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
 }
 
 type transitionState struct {
-	once sync.Once
 	// add future metrics here
+	once         sync.Once
 	transitionCh chan ObjectInfo
+	ctx          context.Context
+	objAPI       ObjectLayer
+	mu           sync.Mutex
+	numWorkers   int
+	killCh       chan struct{}
 }
 
 func (t *transitionState) queueTransitionTask(oi ObjectInfo) {
@@ -132,50 +136,59 @@ func (t *transitionState) queueTransitionTask(oi ObjectInfo) {
 }
 
 var (
-	globalTransitionState      *transitionState
-	globalTransitionConcurrent = runtime.GOMAXPROCS(0) / 2
+	globalTransitionState *transitionState
 )
 
-func newTransitionState() *transitionState {
-	// fix minimum concurrent transition to 1 for single CPU setup
-	if globalTransitionConcurrent == 0 {
-		globalTransitionConcurrent = 1
-	}
-
+func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionState {
 	return &transitionState{
 		transitionCh: make(chan ObjectInfo, 10000),
+		ctx:          ctx,
+		objAPI:       objAPI,
+		killCh:       make(chan struct{}),
 	}
 }
 
-// addWorker creates a new worker to process tasks
-func (t *transitionState) addWorker(ctx context.Context, objectAPI ObjectLayer) {
-	// Add a new worker.
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case oi, ok := <-t.transitionCh:
-				if !ok {
-					return
-				}
-
-				if err := transitionObject(ctx, objectAPI, oi); err != nil {
-					logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err))
-				}
-			}
-		}
-	}()
-}
+// worker waits for transition tasks
+func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) {
+	for {
+		select {
+		case <-t.killCh:
+			return
+		case <-ctx.Done():
+			return
+		case oi, ok := <-t.transitionCh:
+			if !ok {
+				return
+			}
+			if err := transitionObject(ctx, objectAPI, oi); err != nil {
+				logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err))
+			}
+		}
+	}
+}
+
+// UpdateWorkers at the end of this function leaves n goroutines waiting for
+// transition tasks
+func (t *transitionState) UpdateWorkers(n int) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	for t.numWorkers < n {
+		go t.worker(t.ctx, t.objAPI)
+		t.numWorkers++
+	}
+
+	for t.numWorkers > n {
+		go func() { t.killCh <- struct{}{} }()
+		t.numWorkers--
+	}
+}
 
 func initBackgroundTransition(ctx context.Context, objectAPI ObjectLayer) {
-	if globalTransitionState == nil {
-		return
-	}
-
-	// Start with globalTransitionConcurrent.
-	for i := 0; i < globalTransitionConcurrent; i++ {
-		globalTransitionState.addWorker(ctx, objectAPI)
-	}
+	globalTransitionState = newTransitionState(ctx, objectAPI)
+	n := globalAPIConfig.getTransitionWorkers()
+	globalTransitionState.UpdateWorkers(n)
 }
 
 var errInvalidStorageClass = errors.New("invalid storage class")
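
The heart of the change is a worker pool that can be resized at runtime: `UpdateWorkers(n)` grows the pool by starting `worker` goroutines and shrinks it by sending on `killCh`, which each worker selects on alongside the task channel and the server context. Below is a minimal, self-contained sketch of the same pattern; the names (`pool`, `taskCh`, the `int` task type) are illustrative, not MinIO's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pool mirrors transitionState's resize technique: grow by spawning
// goroutines, shrink by signalling a kill channel that workers select on.
type pool struct {
	mu         sync.Mutex
	numWorkers int
	taskCh     chan int
	killCh     chan struct{}
}

func newPool() *pool {
	return &pool{
		taskCh: make(chan int, 100),
		killCh: make(chan struct{}),
	}
}

func (p *pool) worker(ctx context.Context, id int) {
	for {
		select {
		case <-p.killCh: // asked to shrink the pool
			return
		case <-ctx.Done(): // global shutdown
			return
		case t, ok := <-p.taskCh:
			if !ok {
				return
			}
			fmt.Printf("worker %d processed task %d\n", id, t)
		}
	}
}

// UpdateWorkers leaves exactly n goroutines waiting for tasks.
func (p *pool) UpdateWorkers(ctx context.Context, n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.numWorkers < n {
		go p.worker(ctx, p.numWorkers)
		p.numWorkers++
	}
	for p.numWorkers > n {
		// Fire-and-forget: whichever worker becomes idle first consumes the
		// kill signal, so UpdateWorkers itself never blocks on busy workers.
		go func() { p.killCh <- struct{}{} }()
		p.numWorkers--
	}
}

func main() {
	ctx := context.Background()
	p := newPool()
	p.UpdateWorkers(ctx, 4) // scale up to 4 workers
	for i := 0; i < 8; i++ {
		p.taskCh <- i
	}
	p.UpdateWorkers(ctx, 1) // scale down; queued tasks are still drained
	time.Sleep(200 * time.Millisecond)
}
```

Because the kill signal is delivered by a channel send from a separate goroutine, shrinking only takes effect as workers become idle, and anything already queued in the buffered task channel is still processed by the remaining workers.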

View File

@@ -92,8 +92,6 @@ func init() {
 		},
 	})
 
-	globalTransitionState = newTransitionState()
-
 	console.SetColor("Debug", fcolor.New())
 
 	gob.Register(StorageErr(""))

View File

@@ -39,6 +39,7 @@ type apiConfig struct {
 	totalDriveCount          int
 	replicationWorkers       int
 	replicationFailedWorkers int
+	transitionWorkers        int
 }
 
 func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
@@ -87,6 +88,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 	}
 	t.replicationFailedWorkers = cfg.ReplicationFailedWorkers
 	t.replicationWorkers = cfg.ReplicationWorkers
+	if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
+		globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
+	}
+	t.transitionWorkers = cfg.TransitionWorkers
 }
 
 func (t *apiConfig) getListQuorum() int {
@@ -173,3 +178,10 @@ func (t *apiConfig) getReplicationWorkers() int {
 	return t.replicationWorkers
 }
+
+func (t *apiConfig) getTransitionWorkers() int {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	return t.transitionWorkers
+}
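
Together these hooks make the transition worker count part of the dynamic `api` configuration: `apiConfig.init` runs on every config (re)load, resizes the live pool only when the incoming `TransitionWorkers` value actually changed, and `getTransitionWorkers` hands the current value to `initBackgroundTransition`. Presumably the value is adjusted at runtime with something like `mc admin config set ALIAS api transition_workers=<n>` and takes effect without a restart, though the exact key name should be checked against the config changes in this commit. A rough, self-contained sketch of the reload-hook pattern; `apiConf`, `updater`, and `fakePool` are illustrative stand-ins, not MinIO types.

```go
package main

import (
	"fmt"
	"sync"
)

// updater is anything whose worker pool can be resized; transitionState's
// UpdateWorkers has this shape.
type updater interface {
	UpdateWorkers(n int)
}

// apiConf mimics the relevant slice of apiConfig: a lock-guarded value plus a
// hook that fires on every dynamic config reload.
type apiConf struct {
	mu                sync.RWMutex
	transitionWorkers int
}

// init applies a freshly parsed config; the pool is only touched when the
// requested count differs from the stored one, so no-op reloads stay cheap.
func (c *apiConf) init(newWorkers int, pool updater) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if pool != nil && newWorkers != c.transitionWorkers {
		pool.UpdateWorkers(newWorkers)
	}
	c.transitionWorkers = newWorkers
}

func (c *apiConf) getTransitionWorkers() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.transitionWorkers
}

// fakePool records the last requested size, standing in for the real pool.
type fakePool struct{ n int }

func (f *fakePool) UpdateWorkers(n int) { f.n = n }

func main() {
	cfg := &apiConf{}
	p := &fakePool{}
	cfg.init(4, p)                               // first load: pool sized to 4
	cfg.init(4, p)                               // reload, unchanged: no resize
	cfg.init(8, p)                               // operator raised the value
	fmt.Println(cfg.getTransitionWorkers(), p.n) // 8 8
}
```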

View File

@@ -530,7 +530,6 @@ func serverMain(ctx *cli.Context) {
 	if globalIsErasure {
 		initAutoHeal(GlobalContext, newObject)
 		initHealMRF(GlobalContext, newObject)
-		initBackgroundTransition(GlobalContext, newObject)
 	}
 
 	initBackgroundExpiry(GlobalContext, newObject)
@@ -558,6 +557,7 @@ func serverMain(ctx *cli.Context) {
 	if globalIsErasure { // to be done after config init
 		initBackgroundReplication(GlobalContext, newObject)
+		initBackgroundTransition(GlobalContext, newObject)
 
 		globalTierJournal, err = initTierDeletionJournal(GlobalContext)
 		if err != nil {
 			logger.FatalIf(err, "Unable to initialize remote tier pending deletes journal")
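
The call to `initBackgroundTransition` moves from the early erasure block into the block that runs after server configuration is initialized, because the worker count now comes from `globalAPIConfig.getTransitionWorkers()` rather than from `runtime.GOMAXPROCS(0) / 2`. A toy sketch of that ordering constraint; `loadConfig` and `startTransitionWorkers` are stand-ins for the real initialization steps, not MinIO functions.

```go
package main

import "fmt"

// serverConfig stands in for the dynamically loaded server configuration.
type serverConfig struct{ transitionWorkers int }

var globalConfig *serverConfig // nil until loadConfig has run

func loadConfig() { globalConfig = &serverConfig{transitionWorkers: 4} }

// startTransitionWorkers can only size the pool once the config exists.
func startTransitionWorkers() {
	if globalConfig == nil {
		fmt.Println("config not loaded yet; cannot size the worker pool")
		return
	}
	fmt.Printf("starting %d transition workers\n", globalConfig.transitionWorkers)
}

func main() {
	// Ordering mirrors the new serverMain: load the config first, then start
	// the background workers that read it.
	loadConfig()
	startTransitionWorkers()
}
```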