From 65b6f4aa31c2f6e49cb4e4b29e2a148b1990c354 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 11 Aug 2021 22:23:56 -0700 Subject: [PATCH] Add dynamic reconfiguration of number of transition workers (#12926) --- cmd/bucket-lifecycle.go | 79 +++++++++++++++++++++---------------- cmd/common-main.go | 2 - cmd/handler-api.go | 12 ++++++ cmd/server-main.go | 2 +- internal/config/api/api.go | 17 ++++++++ internal/config/api/help.go | 6 +++ internal/config/errors.go | 6 +++ 7 files changed, 88 insertions(+), 36 deletions(-) diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 511e5d38e..341de22c2 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "net/http" - "runtime" "strings" "sync" "time" @@ -115,9 +114,14 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { } type transitionState struct { - once sync.Once - // add future metrics here + once sync.Once transitionCh chan ObjectInfo + + ctx context.Context + objAPI ObjectLayer + mu sync.Mutex + numWorkers int + killCh chan struct{} } func (t *transitionState) queueTransitionTask(oi ObjectInfo) { @@ -132,50 +136,59 @@ func (t *transitionState) queueTransitionTask(oi ObjectInfo) { } var ( - globalTransitionState *transitionState - globalTransitionConcurrent = runtime.GOMAXPROCS(0) / 2 + globalTransitionState *transitionState ) -func newTransitionState() *transitionState { - // fix minimum concurrent transition to 1 for single CPU setup - if globalTransitionConcurrent == 0 { - globalTransitionConcurrent = 1 - } +func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionState { return &transitionState{ transitionCh: make(chan ObjectInfo, 10000), + ctx: ctx, + objAPI: objAPI, + killCh: make(chan struct{}), } } -// addWorker creates a new worker to process tasks -func (t *transitionState) addWorker(ctx context.Context, objectAPI ObjectLayer) { - // Add a new worker. - go func() { - for { - select { - case <-ctx.Done(): +// worker waits for transition tasks +func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) { + for { + select { + case <-t.killCh: + return + case <-ctx.Done(): + return + case oi, ok := <-t.transitionCh: + if !ok { return - case oi, ok := <-t.transitionCh: - if !ok { - return - } + } - if err := transitionObject(ctx, objectAPI, oi); err != nil { - logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err)) - } + if err := transitionObject(ctx, objectAPI, oi); err != nil { + logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err)) } } - }() + } +} + +// UpdateWorkers at the end of this function leaves n goroutines waiting for +// transition tasks +func (t *transitionState) UpdateWorkers(n int) { + t.mu.Lock() + defer t.mu.Unlock() + + for t.numWorkers < n { + go t.worker(t.ctx, t.objAPI) + t.numWorkers++ + } + + for t.numWorkers > n { + go func() { t.killCh <- struct{}{} }() + t.numWorkers-- + } } func initBackgroundTransition(ctx context.Context, objectAPI ObjectLayer) { - if globalTransitionState == nil { - return - } - - // Start with globalTransitionConcurrent. - for i := 0; i < globalTransitionConcurrent; i++ { - globalTransitionState.addWorker(ctx, objectAPI) - } + globalTransitionState = newTransitionState(ctx, objectAPI) + n := globalAPIConfig.getTransitionWorkers() + globalTransitionState.UpdateWorkers(n) } var errInvalidStorageClass = errors.New("invalid storage class") diff --git a/cmd/common-main.go b/cmd/common-main.go index 55a8aa8cf..a3bb03701 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -92,8 +92,6 @@ func init() { }, }) - globalTransitionState = newTransitionState() - console.SetColor("Debug", fcolor.New()) gob.Register(StorageErr("")) diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 69d650140..80284fe54 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -39,6 +39,7 @@ type apiConfig struct { totalDriveCount int replicationWorkers int replicationFailedWorkers int + transitionWorkers int } func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { @@ -87,6 +88,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { } t.replicationFailedWorkers = cfg.ReplicationFailedWorkers t.replicationWorkers = cfg.ReplicationWorkers + if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers { + globalTransitionState.UpdateWorkers(cfg.TransitionWorkers) + } + t.transitionWorkers = cfg.TransitionWorkers } func (t *apiConfig) getListQuorum() int { @@ -173,3 +178,10 @@ func (t *apiConfig) getReplicationWorkers() int { return t.replicationWorkers } + +func (t *apiConfig) getTransitionWorkers() int { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.transitionWorkers +} diff --git a/cmd/server-main.go b/cmd/server-main.go index f56429d84..80ce7e96b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -530,7 +530,6 @@ func serverMain(ctx *cli.Context) { if globalIsErasure { initAutoHeal(GlobalContext, newObject) initHealMRF(GlobalContext, newObject) - initBackgroundTransition(GlobalContext, newObject) } initBackgroundExpiry(GlobalContext, newObject) @@ -558,6 +557,7 @@ func serverMain(ctx *cli.Context) { if globalIsErasure { // to be done after config init initBackgroundReplication(GlobalContext, newObject) + initBackgroundTransition(GlobalContext, newObject) globalTierJournal, err = initTierDeletionJournal(GlobalContext) if err != nil { logger.FatalIf(err, "Unable to initialize remote tier pending deletes journal") diff --git a/internal/config/api/api.go b/internal/config/api/api.go index d19e716c1..611dc4d93 100644 --- a/internal/config/api/api.go +++ b/internal/config/api/api.go @@ -20,6 +20,7 @@ package api import ( "encoding/json" "errors" + "runtime" "strconv" "strings" "time" @@ -38,6 +39,7 @@ const ( apiListQuorum = "list_quorum" apiReplicationWorkers = "replication_workers" apiReplicationFailedWorkers = "replication_failed_workers" + apiTransitionWorkers = "transition_workers" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" @@ -48,6 +50,7 @@ const ( EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS" + EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS" ) // Deprecated key and ENVs @@ -91,6 +94,10 @@ var ( Key: apiReplicationFailedWorkers, Value: "8", }, + config.KV{ + Key: apiTransitionWorkers, + Value: "100", + }, } ) @@ -104,6 +111,7 @@ type Config struct { ListQuorum string `json:"list_quorum"` ReplicationWorkers int `json:"replication_workers"` ReplicationFailedWorkers int `json:"replication_failed_workers"` + TransitionWorkers int `json:"transition_workers"` } // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. @@ -195,6 +203,14 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication failed workers should be 1") } + transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.Get(apiTransitionWorkers))) + if err != nil { + return cfg, err + } + if transitionWorkers < runtime.GOMAXPROCS(0)/2 { + return cfg, config.ErrInvalidTransitionWorkersValue(nil) + } + return Config{ RequestsMax: requestsMax, RequestsDeadline: requestsDeadline, @@ -204,5 +220,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { ListQuorum: listQuorum, ReplicationWorkers: replicationWorkers, ReplicationFailedWorkers: replicationFailedWorkers, + TransitionWorkers: transitionWorkers, }, nil } diff --git a/internal/config/api/help.go b/internal/config/api/help.go index 2e04c39d9..899b05e55 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -58,5 +58,11 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: apiTransitionWorkers, + Description: `set the number of transition workers, defaults to 100`, + Optional: true, + Type: "number", + }, } ) diff --git a/internal/config/errors.go b/internal/config/errors.go index b01f3f122..05c0b2b7f 100644 --- a/internal/config/errors.go +++ b/internal/config/errors.go @@ -289,4 +289,10 @@ Example 1: "", "MINIO_API_REPLICATION_WORKERS: should be > 0", ) + + ErrInvalidTransitionWorkersValue = newErrFn( + "Invalid value for transition workers", + "", + "MINIO_API_TRANSITION_WORKERS: should be >= GOMAXPROCS/2", + ) )