add configuration to limit replication workers (#18601)

Poorna, 2023-12-07 16:22:00 -08:00 (committed by GitHub)
parent 6ca6788bb7
commit 6b06da76cb
4 changed files with 95 additions and 42 deletions

Changed file 1 of 4

@@ -1700,6 +1700,7 @@ type ReplicationPool struct {
     objLayer   ObjectLayer
     ctx        context.Context
     priority   string
+    maxWorkers int
     mu         sync.RWMutex
     mrfMU      sync.Mutex
     resyncer   *replicationResyncer
@@ -1748,9 +1749,13 @@ const (
 func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
     var workers, failedWorkers int
     priority := "auto"
+    maxWorkers := WorkerMaxLimit
     if opts.Priority != "" {
         priority = opts.Priority
     }
+    if opts.MaxWorkers > 0 {
+        maxWorkers = opts.MaxWorkers
+    }
     switch priority {
     case "fast":
         workers = WorkerMaxLimit
@@ -1762,7 +1767,13 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
         workers = WorkerAutoDefault
         failedWorkers = MRFWorkerAutoDefault
     }
+    if maxWorkers > 0 && workers > maxWorkers {
+        workers = maxWorkers
+    }
+    if maxWorkers > 0 && failedWorkers > maxWorkers {
+        failedWorkers = maxWorkers
+    }
     pool := &ReplicationPool{
         workers:    make([]chan ReplicationWorkerOperation, 0, workers),
         lrgworkers: make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
@@ -1774,6 +1785,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
         ctx:        ctx,
         objLayer:   o,
         priority:   priority,
+        maxWorkers: maxWorkers,
     }

     pool.AddLargeWorkers()
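
Read together, the constructor changes make the configured maximum a ceiling on whatever worker counts the priority setting would otherwise pick. A minimal standalone sketch of that clamping (illustrative only; this helper does not exist in the commit):

// clampToMax mirrors the checks added to NewReplicationPool: a positive
// maximum caps both the regular and the failed-operation (MRF) worker counts.
func clampToMax(workers, failedWorkers, maxWorkers int) (int, int) {
    if maxWorkers > 0 && workers > maxWorkers {
        workers = maxWorkers
    }
    if maxWorkers > 0 && failedWorkers > maxWorkers {
        failedWorkers = maxWorkers
    }
    return workers, failedWorkers
}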
@@ -1929,7 +1941,7 @@ func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
 }

 // ResizeWorkerPriority sets replication failed workers pool size
-func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
+func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
     var workers, mrfWorkers int
     p.mu.Lock()
     switch pri {
@@ -1949,7 +1961,15 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
             mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
         }
     }
+    if maxWorkers > 0 && workers > maxWorkers {
+        workers = maxWorkers
+    }
+    if maxWorkers > 0 && mrfWorkers > maxWorkers {
+        mrfWorkers = maxWorkers
+    }
     p.priority = pri
+    p.maxWorkers = maxWorkers
     p.mu.Unlock()
     p.ResizeWorkers(workers, 0)
     p.ResizeFailedWorkers(mrfWorkers)
@@ -2023,6 +2043,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
         globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
         p.mu.RLock()
         prio := p.priority
+        maxWorkers := p.maxWorkers
         p.mu.RUnlock()
         switch prio {
         case "fast":
@@ -2030,16 +2051,18 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
         case "slow":
             logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
         default:
-            if p.ActiveWorkers() < WorkerMaxLimit {
+            maxWorkers = int(math.Min(float64(maxWorkers), WorkerMaxLimit))
+            if p.ActiveWorkers() < maxWorkers {
                 p.mu.RLock()
-                workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
+                workers := int(math.Min(float64(len(p.workers)+1), float64(maxWorkers)))
                 existing := len(p.workers)
                 p.mu.RUnlock()
                 p.ResizeWorkers(workers, existing)
             }
-            if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
+            maxMRFWorkers := int(math.Min(float64(maxWorkers), MRFWorkerMaxLimit))
+            if p.ActiveMRFWorkers() < maxMRFWorkers {
                 p.mu.RLock()
-                workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
+                workers := int(math.Min(float64(p.mrfWorkerSize+1), float64(maxMRFWorkers)))
                 p.mu.RUnlock()
                 p.ResizeFailedWorkers(workers)
             }
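
In the default ("auto") branch the pool still grows on demand, one worker per queued task, but growth is now capped at the smaller of the configured maximum and the built-in hard limit. A self-contained sketch of that capped scale-up (function and parameter names are illustrative, not from the commit):

// nextPoolSize returns the pool size after one demand-driven grow step:
// current+1, but never above min(configuredMax, hardLimit).
func nextPoolSize(current, configuredMax, hardLimit int) int {
    limit := configuredMax
    if limit > hardLimit {
        limit = hardLimit
    }
    if current+1 > limit {
        return limit
    }
    return current + 1
}

The same cap is applied to the MRF (failed-operation) workers, with MRFWorkerMaxLimit as the hard limit.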
@@ -2077,6 +2100,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
         globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
         p.mu.RLock()
         prio := p.priority
+        maxWorkers := p.maxWorkers
         p.mu.RUnlock()
         switch prio {
         case "fast":
@@ -2084,9 +2108,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
         case "slow":
             logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
         default:
-            if p.ActiveWorkers() < WorkerMaxLimit {
+            maxWorkers = int(math.Min(float64(maxWorkers), WorkerMaxLimit))
+            if p.ActiveWorkers() < maxWorkers {
                 p.mu.RLock()
-                workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
+                workers := int(math.Min(float64(len(p.workers)+1), float64(maxWorkers)))
                 existing := len(p.workers)
                 p.mu.RUnlock()
                 p.ResizeWorkers(workers, existing)
@@ -2097,12 +2122,11 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 type replicationPoolOpts struct {
     Priority   string
+    MaxWorkers int
 }

 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
-    globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
-        Priority: globalAPIConfig.getReplicationPriority(),
-    })
+    globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts())
     globalReplicationStats = NewReplicationStats(ctx, objectAPI)
     go globalReplicationStats.trackEWMA()
 }

Changed file 2 of 4

@@ -44,6 +44,7 @@ type apiConfig struct {
     // total drives per erasure set across pools.
     totalDriveCount       int
     replicationPriority   string
+    replicationMaxWorkers int
     transitionWorkers     int

     staleUploadsExpiry time.Duration
@@ -152,10 +153,11 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
     }
     t.listQuorum = listQuorum
     if globalReplicationPool != nil &&
-        cfg.ReplicationPriority != t.replicationPriority {
-        globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority)
+        (cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers) {
+        globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers)
     }
     t.replicationPriority = cfg.ReplicationPriority
+    t.replicationMaxWorkers = cfg.ReplicationMaxWorkers
     if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
         globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
     }
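
The config-apply path now resizes the live pool when either replication knob changes, not just the priority. A sketch of that change-detection using hypothetical types (the real logic lives in apiConfig.init and ResizeWorkerPriority):

// replCfg caches the last applied values; apply only triggers a resize when
// the priority or the max-workers setting actually changed.
type replCfg struct {
    priority   string
    maxWorkers int
}

func (c *replCfg) apply(pri string, maxWorkers int, resize func(string, int)) {
    if pri != c.priority || maxWorkers != c.maxWorkers {
        resize(pri, maxWorkers)
    }
    c.priority, c.maxWorkers = pri, maxWorkers
}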
@@ -334,15 +336,21 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc {
     }
 }

-func (t *apiConfig) getReplicationPriority() string {
+func (t *apiConfig) getReplicationOpts() replicationPoolOpts {
     t.mu.RLock()
     defer t.mu.RUnlock()

     if t.replicationPriority == "" {
-        return "auto"
+        return replicationPoolOpts{
+            Priority:   "auto",
+            MaxWorkers: WorkerMaxLimit,
+        }
     }
-    return t.replicationPriority
+    return replicationPoolOpts{
+        Priority:   t.replicationPriority,
+        MaxWorkers: t.replicationMaxWorkers,
+    }
 }

 func (t *apiConfig) getTransitionWorkers() int {
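
getReplicationOpts replaces the old getReplicationPriority accessor: with no priority configured it falls back to "auto" and the hard worker limit, otherwise it returns the configured pair. A reduced sketch of that defaulting (the struct and function below are stand-ins, not the real types):

// poolOpts stands in for replicationPoolOpts in this sketch.
type poolOpts struct {
    Priority   string
    MaxWorkers int
}

// optsFor returns the "auto"/hard-limit pair when no priority has been set yet.
func optsFor(priority string, maxWorkers, workerMaxLimit int) poolOpts {
    if priority == "" {
        return poolOpts{Priority: "auto", MaxWorkers: workerMaxLimit}
    }
    return poolOpts{Priority: priority, MaxWorkers: maxWorkers}
}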

Changed file 3 of 4

@@ -38,6 +38,8 @@ const (
     apiRemoteTransportDeadline     = "remote_transport_deadline"
     apiListQuorum                  = "list_quorum"
     apiReplicationPriority         = "replication_priority"
+    apiReplicationMaxWorkers       = "replication_max_workers"
     apiTransitionWorkers           = "transition_workers"
     apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
     apiStaleUploadsExpiry          = "stale_uploads_expiry"
@@ -57,7 +59,7 @@ const (
     EnvAPIListQuorum                  = "MINIO_API_LIST_QUORUM"
     EnvAPISecureCiphers               = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
     EnvAPIReplicationPriority         = "MINIO_API_REPLICATION_PRIORITY"
+    EnvAPIReplicationMaxWorkers       = "MINIO_API_REPLICATION_MAX_WORKERS"
     EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL"
     EnvAPIStaleUploadsExpiry          = "MINIO_API_STALE_UPLOADS_EXPIRY"
     EnvAPIDeleteCleanupInterval       = "MINIO_API_DELETE_CLEANUP_INTERVAL"
@@ -107,6 +109,10 @@ var (
         Key:   apiReplicationPriority,
         Value: "auto",
     },
+    config.KV{
+        Key:   apiReplicationMaxWorkers,
+        Value: "500",
+    },
     config.KV{
         Key:   apiTransitionWorkers,
         Value: "100",
@@ -156,6 +162,7 @@ type Config struct {
     RemoteTransportDeadline     time.Duration `json:"remote_transport_deadline"`
     ListQuorum                  string        `json:"list_quorum"`
     ReplicationPriority         string        `json:"replication_priority"`
+    ReplicationMaxWorkers       int           `json:"replication_max_workers"`
     TransitionWorkers           int           `json:"transition_workers"`
     StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
     StaleUploadsExpiry          time.Duration `json:"stale_uploads_expiry"`
@@ -259,7 +266,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
         return cfg, fmt.Errorf("invalid value %v for replication_priority", replicationPriority)
     }
     cfg.ReplicationPriority = replicationPriority
+    replicationMaxWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationMaxWorkers, kvs.GetWithDefault(apiReplicationMaxWorkers, DefaultKVS)))
+    if err != nil {
+        return cfg, err
+    }
+    if replicationMaxWorkers <= 0 || replicationMaxWorkers > 500 {
+        return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers should be between 1 and 500")
+    }
+    cfg.ReplicationMaxWorkers = replicationMaxWorkers

     transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS)))
     if err != nil {
         return cfg, err
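
The new setting is read from the MINIO_API_REPLICATION_MAX_WORKERS environment variable or the replication_max_workers config key (default 500) and is rejected unless it lies between 1 and 500. A self-contained sketch of equivalent parsing and validation (the helper name is hypothetical and the error text paraphrased):

package main

import (
    "fmt"
    "os"
    "strconv"
)

// parseReplicationMaxWorkers mirrors the validation added to LookupConfig:
// the value must be an integer in the range 1-500.
func parseReplicationMaxWorkers(defaultValue string) (int, error) {
    v := os.Getenv("MINIO_API_REPLICATION_MAX_WORKERS")
    if v == "" {
        v = defaultValue
    }
    n, err := strconv.Atoi(v)
    if err != nil {
        return 0, err
    }
    if n <= 0 || n > 500 {
        return 0, fmt.Errorf("number of replication workers should be between 1 and 500, got %d", n)
    }
    return n, nil
}

func main() {
    n, err := parseReplicationMaxWorkers("500")
    if err != nil {
        fmt.Println("invalid replication_max_workers:", err)
        return
    }
    fmt.Println("replication_max_workers =", n)
}

At runtime the key can presumably be applied without a restart in the same way the warning messages above suggest for replication_priority, for example with mc admin config set <alias> api replication_max_workers=100.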

Changed file 4 of 4

@@ -68,6 +68,12 @@ var (
         Optional:    true,
         Type:        "string",
     },
+    config.HelpKV{
+        Key:         apiReplicationMaxWorkers,
+        Description: `set the maximum number of replication workers` + defaultHelpPostfix(apiReplicationMaxWorkers),
+        Optional:    true,
+        Type:        "number",
+    },
     config.HelpKV{
         Key:         apiTransitionWorkers,
         Description: `set the number of transition workers` + defaultHelpPostfix(apiTransitionWorkers),