From 8ea6fb368ddc23ba0c49fb8d4c4305349e71b78f Mon Sep 17 00:00:00 2001
From: Poorna
Date: Sat, 24 Sep 2022 16:20:28 -0700
Subject: [PATCH] Add auto configuration of replication workers (#15636)
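
Replication workers are now sized from a single replication_priority setting
("slow", "auto" or "fast", default "auto") instead of the removed
replication_workers and replication_failed_workers keys; in "auto" mode the
pools grow on demand up to the per-node limits defined in this patch.

Illustrative usage only ("myminio" is a placeholder alias and the server
invocation is an assumed example, not part of this change):

    mc admin config set myminio/ api replication_priority=fast

    MINIO_API_REPLICATION_PRIORITY=slow minio server /data
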
---
 cmd/bucket-replication.go   | 154 ++++++++++++++++++++++++++++++------
 cmd/handler-api.go          |  27 +++----
 internal/config/api/api.go  |  68 +++++++---------
 internal/config/api/help.go |  12 +--
 4 files changed, 169 insertions(+), 92 deletions(-)

diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 69519659a..83064dcaa 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -23,11 +23,13 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"net/http"
 	"path"
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -1511,23 +1513,62 @@ type ReplicationPool struct {
 	mrfSaveCh   chan MRFReplicateEntry
 	saveStateCh chan struct{}
 
-	workerSize    int
-	mrfWorkerSize int
-	resyncState   replicationResyncState
-	workerWg      sync.WaitGroup
-	mrfWorkerWg   sync.WaitGroup
-	once          sync.Once
-	mu            sync.Mutex
+	workerSize       int
+	mrfWorkerSize    int
+	activeWorkers    int32
+	activeMRFWorkers int32
+	priority         string
+	resyncState      replicationResyncState
+	workerWg         sync.WaitGroup
+	mrfWorkerWg      sync.WaitGroup
+	once             sync.Once
+	mu               sync.RWMutex
 }
 
+const (
+	// WorkerMaxLimit max number of workers per node for "fast" mode
+	WorkerMaxLimit = 500
+
+	// WorkerMinLimit min number of workers per node for "slow" mode
+	WorkerMinLimit = 50
+
+	// WorkerAutoDefault is default number of workers for "auto" mode
+	WorkerAutoDefault = 100
+
+	// MRFWorkerMaxLimit max number of mrf workers per node for "fast" mode
+	MRFWorkerMaxLimit = 8
+
+	// MRFWorkerMinLimit min number of mrf workers per node for "slow" mode
+	MRFWorkerMinLimit = 2
+
+	// MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
+	MRFWorkerAutoDefault = 4
+)
+
 // NewReplicationPool creates a pool of replication workers of specified size
 func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
+	var workers, failedWorkers int
+	priority := "auto"
+	if opts.Priority != "" {
+		priority = opts.Priority
+	}
+	switch priority {
+	case "fast":
+		workers = WorkerMaxLimit
+		failedWorkers = MRFWorkerMaxLimit
+	case "slow":
+		workers = WorkerMinLimit
+		failedWorkers = MRFWorkerMinLimit
+	default:
+		workers = WorkerAutoDefault
+		failedWorkers = MRFWorkerAutoDefault
+	}
 	pool := &ReplicationPool{
 		replicaCh:               make(chan ReplicateObjectInfo, 100000),
 		replicaDeleteCh:         make(chan DeletedObjectReplicationInfo, 100000),
 		mrfReplicaCh:            make(chan ReplicateObjectInfo, 100000),
-		workerKillCh:            make(chan struct{}, opts.Workers),
-		mrfWorkerKillCh:         make(chan struct{}, opts.FailedWorkers),
+		workerKillCh:            make(chan struct{}, workers),
+		mrfWorkerKillCh:         make(chan struct{}, failedWorkers),
 		existingReplicaCh:       make(chan ReplicateObjectInfo, 100000),
 		existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
 		resyncState:             replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
@@ -1535,10 +1576,11 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 		saveStateCh:             make(chan struct{}, 1),
 		ctx:                     ctx,
 		objLayer:                o,
+		priority:                priority,
 	}
 
-	pool.ResizeWorkers(opts.Workers)
-	pool.ResizeFailedWorkers(opts.FailedWorkers)
+	pool.ResizeWorkers(workers)
+	pool.ResizeFailedWorkers(failedWorkers)
 	go pool.AddExistingObjectReplicateWorker()
 	go pool.updateResyncStatus(ctx, o)
 	go pool.processMRF()
@@ -1559,7 +1601,9 @@ func (p *ReplicationPool) AddMRFWorker() {
 			if !ok {
 				return
 			}
+			atomic.AddInt32(&p.activeMRFWorkers, 1)
 			replicateObject(p.ctx, oi, p.objLayer)
+			atomic.AddInt32(&p.activeMRFWorkers, -1)
 		case <-p.mrfWorkerKillCh:
 			return
 		}
@@ -1577,12 +1621,17 @@ func (p *ReplicationPool) AddWorker() {
 			if !ok {
 				return
 			}
+			atomic.AddInt32(&p.activeWorkers, 1)
 			replicateObject(p.ctx, oi, p.objLayer)
+			atomic.AddInt32(&p.activeWorkers, -1)
+
 		case doi, ok := <-p.replicaDeleteCh:
 			if !ok {
 				return
 			}
+			atomic.AddInt32(&p.activeWorkers, 1)
 			replicateDelete(p.ctx, doi, p.objLayer)
+			atomic.AddInt32(&p.activeWorkers, -1)
 		case <-p.workerKillCh:
 			return
 		}
@@ -1609,6 +1658,16 @@ func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
 	}
 }
 
+// ActiveWorkers returns the number of active workers handling replication traffic.
+func (p *ReplicationPool) ActiveWorkers() int {
+	return int(atomic.LoadInt32(&p.activeWorkers))
+}
+
+// ActiveMRFWorkers returns the number of active workers handling replication failures.
+func (p *ReplicationPool) ActiveMRFWorkers() int {
+	return int(atomic.LoadInt32(&p.activeMRFWorkers))
+}
+
 // ResizeWorkers sets replication workers pool to new size
 func (p *ReplicationPool) ResizeWorkers(n int) {
 	p.mu.Lock()
@@ -1625,6 +1684,33 @@ func (p *ReplicationPool) ResizeWorkers(n int) {
 	}
 }
 
+// ResizeWorkerPriority resizes the replication worker and MRF worker pools based on the configured priority
+func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
+	var workers, mrfWorkers int
+	p.mu.Lock()
+	switch pri {
+	case "fast":
+		workers = WorkerMaxLimit
+		mrfWorkers = MRFWorkerMaxLimit
+	case "slow":
+		workers = WorkerMinLimit
+		mrfWorkers = MRFWorkerMinLimit
+	default:
+		workers = WorkerAutoDefault
+		mrfWorkers = MRFWorkerAutoDefault
+		if p.workerSize < WorkerAutoDefault {
+			workers = int(math.Min(float64(p.workerSize+1), WorkerAutoDefault))
+		}
+		if p.mrfWorkerSize < MRFWorkerAutoDefault {
+			mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
+		}
+	}
+	p.priority = pri
+	p.mu.Unlock()
+	p.ResizeWorkers(workers)
+	p.ResizeFailedWorkers(mrfWorkers)
+}
+
 // ResizeFailedWorkers sets replication failed workers pool size
 func (p *ReplicationPool) ResizeFailedWorkers(n int) {
 	p.mu.Lock()
@@ -1641,14 +1727,6 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
 	}
 }
 
-// suggestedWorkers recommends an increase in number of workers to meet replication load.
-func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
-	if failQueue {
-		return int(float64(p.mrfWorkerSize) * ReplicationWorkerMultiplier)
-	}
-	return int(float64(p.workerSize) * ReplicationWorkerMultiplier)
-}
-
 func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	if p == nil {
 		return
@@ -1675,7 +1753,23 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	case ch <- ri:
 	default:
 		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
-		logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
+		p.mu.RLock()
+		prio := p.priority
+		workers, mrfWorkers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit)), int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
+		p.mu.RUnlock()
+		switch prio {
+		case "fast":
+			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic"), string(replicationSubsystem))
+		case "slow":
+			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
+		default:
+			if p.ActiveWorkers() < WorkerMaxLimit {
+				p.ResizeWorkers(workers)
+			}
+			if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
+				p.ResizeFailedWorkers(mrfWorkers)
+			}
+		}
 	}
 }
 
@@ -1713,19 +1807,29 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 	case ch <- doi:
 	default:
 		globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
-		logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
+		p.mu.RLock()
+		prio, workers := p.priority, int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+		p.mu.RUnlock()
+		switch prio {
+		case "fast":
+			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes"), string(replicationSubsystem))
+		case "slow":
+			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
+		default:
+			if p.ActiveWorkers() < WorkerMaxLimit {
+				p.ResizeWorkers(workers)
+			}
+		}
 	}
 }
 
 type replicationPoolOpts struct {
-	Workers       int
-	FailedWorkers int
+	Priority string
 }
 
 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
 	globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
-		Workers:       globalAPIConfig.getReplicationWorkers(),
-		FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(),
+		Priority: globalAPIConfig.getReplicationPriority(),
 	})
 	globalReplicationStats = NewReplicationStats(ctx, objectAPI)
 	go globalReplicationStats.loadInitialReplicationMetrics(ctx)
diff --git a/cmd/handler-api.go b/cmd/handler-api.go
index 4d59a8e69..05d3ef4f8 100644
--- a/cmd/handler-api.go
+++ b/cmd/handler-api.go
@@ -41,10 +41,9 @@ type apiConfig struct {
 	listQuorum       string
 	corsAllowOrigins []string
 	// total drives per erasure set across pools.
-	totalDriveCount          int
-	replicationWorkers       int
-	replicationFailedWorkers int
-	transitionWorkers        int
+	totalDriveCount     int
+	replicationPriority string
+	transitionWorkers   int
 
 	staleUploadsExpiry          time.Duration
 	staleUploadsCleanupInterval time.Duration
@@ -137,12 +136,11 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 	t.requestsDeadline = cfg.RequestsDeadline
 	t.listQuorum = cfg.ListQuorum
 	if globalReplicationPool != nil &&
-		cfg.ReplicationWorkers != t.replicationWorkers {
-		globalReplicationPool.ResizeFailedWorkers(cfg.ReplicationFailedWorkers)
-		globalReplicationPool.ResizeWorkers(cfg.ReplicationWorkers)
+		cfg.ReplicationPriority != t.replicationPriority {
+		globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority)
 	}
-	t.replicationFailedWorkers = cfg.ReplicationFailedWorkers
-	t.replicationWorkers = cfg.ReplicationWorkers
+	t.replicationPriority = cfg.ReplicationPriority
+
 	if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
 		globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
 	}
@@ -289,18 +287,11 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc {
 	}
 }
 
-func (t *apiConfig) getReplicationFailedWorkers() int {
+func (t *apiConfig) getReplicationPriority() string {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
 
-	return t.replicationFailedWorkers
-}
-
-func (t *apiConfig) getReplicationWorkers() int {
-	t.mu.RLock()
-	defer t.mu.RUnlock()
-
-	return t.replicationWorkers
+	return t.replicationPriority
 }
 
 func (t *apiConfig) getTransitionWorkers() int {
diff --git a/internal/config/api/api.go b/internal/config/api/api.go
index 69094eb80..d15d17d38 100644
--- a/internal/config/api/api.go
+++ b/internal/config/api/api.go
@@ -37,8 +37,7 @@ const (
 	apiCorsAllowOrigin             = "cors_allow_origin"
 	apiRemoteTransportDeadline     = "remote_transport_deadline"
 	apiListQuorum                  = "list_quorum"
-	apiReplicationWorkers          = "replication_workers"
-	apiReplicationFailedWorkers    = "replication_failed_workers"
+	apiReplicationPriority         = "replication_priority"
 	apiTransitionWorkers           = "transition_workers"
 	apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
 	apiStaleUploadsExpiry          = "stale_uploads_expiry"
@@ -46,16 +45,14 @@ const (
 	apiDisableODirect              = "disable_odirect"
 	apiGzipObjects                 = "gzip_objects"
 
-	EnvAPIRequestsMax              = "MINIO_API_REQUESTS_MAX"
-	EnvAPIRequestsDeadline         = "MINIO_API_REQUESTS_DEADLINE"
-	EnvAPIClusterDeadline          = "MINIO_API_CLUSTER_DEADLINE"
-	EnvAPICorsAllowOrigin          = "MINIO_API_CORS_ALLOW_ORIGIN"
-	EnvAPIRemoteTransportDeadline  = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
-	EnvAPIListQuorum               = "MINIO_API_LIST_QUORUM"
-	EnvAPISecureCiphers            = "MINIO_API_SECURE_CIPHERS" // default "on"
-	EnvAPIReplicationWorkers       = "MINIO_API_REPLICATION_WORKERS"
-	EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS"
-	EnvAPITransitionWorkers        = "MINIO_API_TRANSITION_WORKERS"
+	EnvAPIRequestsMax             = "MINIO_API_REQUESTS_MAX"
+	EnvAPIRequestsDeadline        = "MINIO_API_REQUESTS_DEADLINE"
+	EnvAPIClusterDeadline         = "MINIO_API_CLUSTER_DEADLINE"
+	EnvAPICorsAllowOrigin         = "MINIO_API_CORS_ALLOW_ORIGIN"
+	EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
+	EnvAPIListQuorum              = "MINIO_API_LIST_QUORUM"
+	EnvAPISecureCiphers           = "MINIO_API_SECURE_CIPHERS" // default "on"
+	EnvAPIReplicationPriority     = "MINIO_API_REPLICATION_PRIORITY"
 
 	EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL"
 	EnvAPIStaleUploadsExpiry          = "MINIO_API_STALE_UPLOADS_EXPIRY"
@@ -67,8 +64,14 @@ const (
 
 // Deprecated key and ENVs
 const (
-	apiReadyDeadline    = "ready_deadline"
"ready_deadline" - EnvAPIReadyDeadline = "MINIO_API_READY_DEADLINE" + apiReadyDeadline = "ready_deadline" + apiReplicationWorkers = "replication_workers" + apiReplicationFailedWorkers = "replication_failed_workers" + + EnvAPIReadyDeadline = "MINIO_API_READY_DEADLINE" + EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" + EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS" + EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS" ) // DefaultKVS - default storage class config @@ -99,12 +102,8 @@ var ( Value: "strict", }, config.KV{ - Key: apiReplicationWorkers, - Value: "250", - }, - config.KV{ - Key: apiReplicationFailedWorkers, - Value: "8", + Key: apiReplicationPriority, + Value: "auto", }, config.KV{ Key: apiTransitionWorkers, @@ -141,8 +140,7 @@ type Config struct { CorsAllowOrigin []string `json:"cors_allow_origin"` RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` ListQuorum string `json:"list_quorum"` - ReplicationWorkers int `json:"replication_workers"` - ReplicationFailedWorkers int `json:"replication_failed_workers"` + ReplicationPriority string `json:"replication_priority"` TransitionWorkers int `json:"transition_workers"` StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"` StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"` @@ -167,6 +165,8 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { // remove this since we have removed this already. kvs.Delete(apiReadyDeadline) kvs.Delete("extend_list_cache_life") + kvs.Delete(apiReplicationWorkers) + kvs.Delete(apiReplicationFailedWorkers) if err = config.CheckValidKeys(config.APISubSys, kvs, DefaultKVS); err != nil { return cfg, err @@ -206,22 +206,11 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { return cfg, errors.New("invalid value for list strict quorum") } - replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.GetWithDefault(apiReplicationWorkers, DefaultKVS))) - if err != nil { - return cfg, err - } - - if replicationWorkers <= 0 { - return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1") - } - - replicationFailedWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationFailedWorkers, kvs.GetWithDefault(apiReplicationFailedWorkers, DefaultKVS))) - if err != nil { - return cfg, err - } - - if replicationFailedWorkers <= 0 { - return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication failed workers should be 1") + replicationPriority := env.Get(EnvAPIReplicationPriority, kvs.GetWithDefault(apiReplicationPriority, DefaultKVS)) + switch replicationPriority { + case "slow", "fast", "auto": + default: + return cfg, errors.New("invalid value for replication priority") } transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS))) @@ -263,8 +252,7 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { CorsAllowOrigin: corsAllowOrigin, RemoteTransportDeadline: remoteTransportDeadline, ListQuorum: listQuorum, - ReplicationWorkers: replicationWorkers, - ReplicationFailedWorkers: replicationFailedWorkers, + ReplicationPriority: replicationPriority, TransitionWorkers: transitionWorkers, StaleUploadsCleanupInterval: staleUploadsCleanupInterval, StaleUploadsExpiry: staleUploadsExpiry, diff --git a/internal/config/api/help.go b/internal/config/api/help.go index eaeab95e9..35cabc876 100644 --- a/internal/config/api/help.go +++ 
@@ -63,16 +63,10 @@ var (
 			Type:        "string",
 		},
 		config.HelpKV{
-			Key:         apiReplicationWorkers,
-			Description: `set the number of replication workers` + defaultHelpPostfix(apiReplicationWorkers),
+			Key:         apiReplicationPriority,
+			Description: `set replication priority` + defaultHelpPostfix(apiReplicationPriority),
 			Optional:    true,
-			Type:        "number",
-		},
-		config.HelpKV{
-			Key:         apiReplicationFailedWorkers,
-			Description: `set the number of replication workers for recently failed replicas` + defaultHelpPostfix(apiReplicationFailedWorkers),
-			Optional:    true,
-			Type:        "number",
+			Type:        "string",
 		},
 		config.HelpKV{
 			Key:         apiTransitionWorkers,