Add auto configuration of replication workers (#15636)

This commit is contained in:
Poorna 2022-09-24 16:20:28 -07:00 committed by GitHub
parent 5fd5ddea23
commit 8ea6fb368d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 92 deletions

View File

@ -23,11 +23,13 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"path"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dustin/go-humanize"
@ -1513,21 +1515,60 @@ type ReplicationPool struct {
workerSize int
mrfWorkerSize int
activeWorkers int32
activeMRFWorkers int32
priority string
resyncState replicationResyncState
workerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup
once sync.Once
mu sync.Mutex
mu sync.RWMutex
}
const (
// WorkerMaxLimit max number of workers per node for "fast" mode
WorkerMaxLimit = 500
// WorkerMinLimit min number of workers per node for "slow" mode
WorkerMinLimit = 50
// WorkerAutoDefault is default number of workers for "auto" mode
WorkerAutoDefault = 100
// MRFWorkerMaxLimit max number of mrf workers per node for "fast" mode
MRFWorkerMaxLimit = 8
// MRFWorkerMinLimit min number of mrf workers per node for "slow" mode
MRFWorkerMinLimit = 2
// MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
MRFWorkerAutoDefault = 4
)
// NewReplicationPool creates a pool of replication workers of specified size
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
var workers, failedWorkers int
priority := "auto"
if opts.Priority != "" {
priority = opts.Priority
}
switch priority {
case "fast":
workers = WorkerMaxLimit
failedWorkers = MRFWorkerMaxLimit
case "slow":
workers = WorkerMinLimit
failedWorkers = MRFWorkerMinLimit
default:
workers = WorkerAutoDefault
failedWorkers = MRFWorkerAutoDefault
}
pool := &ReplicationPool{
replicaCh: make(chan ReplicateObjectInfo, 100000),
replicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
workerKillCh: make(chan struct{}, opts.Workers),
mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers),
workerKillCh: make(chan struct{}, workers),
mrfWorkerKillCh: make(chan struct{}, failedWorkers),
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
@ -1535,10 +1576,11 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
saveStateCh: make(chan struct{}, 1),
ctx: ctx,
objLayer: o,
priority: priority,
}
pool.ResizeWorkers(opts.Workers)
pool.ResizeFailedWorkers(opts.FailedWorkers)
pool.ResizeWorkers(workers)
pool.ResizeFailedWorkers(failedWorkers)
go pool.AddExistingObjectReplicateWorker()
go pool.updateResyncStatus(ctx, o)
go pool.processMRF()
@ -1559,7 +1601,9 @@ func (p *ReplicationPool) AddMRFWorker() {
if !ok {
return
}
atomic.AddInt32(&p.activeMRFWorkers, 1)
replicateObject(p.ctx, oi, p.objLayer)
atomic.AddInt32(&p.activeMRFWorkers, -1)
case <-p.mrfWorkerKillCh:
return
}
@ -1577,12 +1621,17 @@ func (p *ReplicationPool) AddWorker() {
if !ok {
return
}
atomic.AddInt32(&p.activeWorkers, 1)
replicateObject(p.ctx, oi, p.objLayer)
atomic.AddInt32(&p.activeWorkers, -1)
case doi, ok := <-p.replicaDeleteCh:
if !ok {
return
}
atomic.AddInt32(&p.activeWorkers, 1)
replicateDelete(p.ctx, doi, p.objLayer)
atomic.AddInt32(&p.activeWorkers, -1)
case <-p.workerKillCh:
return
}
@ -1609,6 +1658,16 @@ func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
}
}
// ActiveWorkers returns the number of active workers handling replication traffic.
func (p *ReplicationPool) ActiveWorkers() int {
return int(atomic.LoadInt32(&p.activeWorkers))
}
// ActiveMRFWorkers returns the number of active workers handling replication failures.
func (p *ReplicationPool) ActiveMRFWorkers() int {
return int(atomic.LoadInt32(&p.activeMRFWorkers))
}
// ResizeWorkers sets replication workers pool to new size
func (p *ReplicationPool) ResizeWorkers(n int) {
p.mu.Lock()
@ -1625,6 +1684,33 @@ func (p *ReplicationPool) ResizeWorkers(n int) {
}
}
// ResizeWorkerPriority sets replication failed workers pool size
func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
var workers, mrfWorkers int
p.mu.Lock()
switch pri {
case "fast":
workers = WorkerMaxLimit
mrfWorkers = MRFWorkerMaxLimit
case "slow":
workers = WorkerMinLimit
mrfWorkers = MRFWorkerMinLimit
default:
workers = WorkerAutoDefault
mrfWorkers = MRFWorkerAutoDefault
if p.workerSize < WorkerAutoDefault {
workers = int(math.Min(float64(p.workerSize+1), WorkerAutoDefault))
}
if p.mrfWorkerSize < MRFWorkerAutoDefault {
mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
}
}
p.priority = pri
p.mu.Unlock()
p.ResizeWorkers(workers)
p.ResizeFailedWorkers(mrfWorkers)
}
// ResizeFailedWorkers sets replication failed workers pool size
func (p *ReplicationPool) ResizeFailedWorkers(n int) {
p.mu.Lock()
@ -1641,14 +1727,6 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
}
}
// suggestedWorkers recommends an increase in number of workers to meet replication load.
func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
if failQueue {
return int(float64(p.mrfWorkerSize) * ReplicationWorkerMultiplier)
}
return int(float64(p.workerSize) * ReplicationWorkerMultiplier)
}
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
if p == nil {
return
@ -1675,7 +1753,23 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
case ch <- ri:
default:
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
p.mu.RLock()
switch p.priority {
case "fast":
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic"), string(replicationSubsystem))
case "slow":
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
default:
if p.ActiveWorkers() < WorkerMaxLimit {
workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
p.ResizeWorkers(workers)
}
if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
p.ResizeFailedWorkers(workers)
}
}
p.mu.RUnlock()
}
}
@ -1713,19 +1807,29 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
case ch <- doi:
default:
globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
p.mu.RLock()
switch p.priority {
case "fast":
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes"), string(replicationSubsystem))
case "slow":
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
default:
if p.ActiveWorkers() < WorkerMaxLimit {
workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
p.ResizeWorkers(workers)
}
}
p.mu.RUnlock()
}
}
type replicationPoolOpts struct {
Workers int
FailedWorkers int
Priority string
}
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
Workers: globalAPIConfig.getReplicationWorkers(),
FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(),
Priority: globalAPIConfig.getReplicationPriority(),
})
globalReplicationStats = NewReplicationStats(ctx, objectAPI)
go globalReplicationStats.loadInitialReplicationMetrics(ctx)

View File

@ -42,8 +42,7 @@ type apiConfig struct {
corsAllowOrigins []string
// total drives per erasure set across pools.
totalDriveCount int
replicationWorkers int
replicationFailedWorkers int
replicationPriority string
transitionWorkers int
staleUploadsExpiry time.Duration
@ -137,12 +136,11 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
t.requestsDeadline = cfg.RequestsDeadline
t.listQuorum = cfg.ListQuorum
if globalReplicationPool != nil &&
cfg.ReplicationWorkers != t.replicationWorkers {
globalReplicationPool.ResizeFailedWorkers(cfg.ReplicationFailedWorkers)
globalReplicationPool.ResizeWorkers(cfg.ReplicationWorkers)
cfg.ReplicationPriority != t.replicationPriority {
globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority)
}
t.replicationFailedWorkers = cfg.ReplicationFailedWorkers
t.replicationWorkers = cfg.ReplicationWorkers
t.replicationPriority = cfg.ReplicationPriority
if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
}
@ -289,18 +287,11 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc {
}
}
func (t *apiConfig) getReplicationFailedWorkers() int {
func (t *apiConfig) getReplicationPriority() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.replicationFailedWorkers
}
func (t *apiConfig) getReplicationWorkers() int {
t.mu.RLock()
defer t.mu.RUnlock()
return t.replicationWorkers
return t.replicationPriority
}
func (t *apiConfig) getTransitionWorkers() int {

View File

@ -37,8 +37,7 @@ const (
apiCorsAllowOrigin = "cors_allow_origin"
apiRemoteTransportDeadline = "remote_transport_deadline"
apiListQuorum = "list_quorum"
apiReplicationWorkers = "replication_workers"
apiReplicationFailedWorkers = "replication_failed_workers"
apiReplicationPriority = "replication_priority"
apiTransitionWorkers = "transition_workers"
apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
apiStaleUploadsExpiry = "stale_uploads_expiry"
@ -53,9 +52,7 @@ const (
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default "on"
EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS"
EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS"
EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS"
EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY"
EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL"
EnvAPIStaleUploadsExpiry = "MINIO_API_STALE_UPLOADS_EXPIRY"
@ -68,7 +65,13 @@ const (
// Deprecated key and ENVs
const (
apiReadyDeadline = "ready_deadline"
apiReplicationWorkers = "replication_workers"
apiReplicationFailedWorkers = "replication_failed_workers"
EnvAPIReadyDeadline = "MINIO_API_READY_DEADLINE"
EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS"
EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS"
EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS"
)
// DefaultKVS - default storage class config
@ -99,12 +102,8 @@ var (
Value: "strict",
},
config.KV{
Key: apiReplicationWorkers,
Value: "250",
},
config.KV{
Key: apiReplicationFailedWorkers,
Value: "8",
Key: apiReplicationPriority,
Value: "auto",
},
config.KV{
Key: apiTransitionWorkers,
@ -141,8 +140,7 @@ type Config struct {
CorsAllowOrigin []string `json:"cors_allow_origin"`
RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"`
ListQuorum string `json:"list_quorum"`
ReplicationWorkers int `json:"replication_workers"`
ReplicationFailedWorkers int `json:"replication_failed_workers"`
ReplicationPriority string `json:"replication_priority"`
TransitionWorkers int `json:"transition_workers"`
StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"`
@ -167,6 +165,8 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
// remove this since we have removed this already.
kvs.Delete(apiReadyDeadline)
kvs.Delete("extend_list_cache_life")
kvs.Delete(apiReplicationWorkers)
kvs.Delete(apiReplicationFailedWorkers)
if err = config.CheckValidKeys(config.APISubSys, kvs, DefaultKVS); err != nil {
return cfg, err
@ -206,22 +206,11 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
return cfg, errors.New("invalid value for list strict quorum")
}
replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.GetWithDefault(apiReplicationWorkers, DefaultKVS)))
if err != nil {
return cfg, err
}
if replicationWorkers <= 0 {
return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1")
}
replicationFailedWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationFailedWorkers, kvs.GetWithDefault(apiReplicationFailedWorkers, DefaultKVS)))
if err != nil {
return cfg, err
}
if replicationFailedWorkers <= 0 {
return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication failed workers should be 1")
replicationPriority := env.Get(EnvAPIReplicationPriority, kvs.GetWithDefault(apiReplicationPriority, DefaultKVS))
switch replicationPriority {
case "slow", "fast", "auto":
default:
return cfg, errors.New("invalid value for replication priority")
}
transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS)))
@ -263,8 +252,7 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
CorsAllowOrigin: corsAllowOrigin,
RemoteTransportDeadline: remoteTransportDeadline,
ListQuorum: listQuorum,
ReplicationWorkers: replicationWorkers,
ReplicationFailedWorkers: replicationFailedWorkers,
ReplicationPriority: replicationPriority,
TransitionWorkers: transitionWorkers,
StaleUploadsCleanupInterval: staleUploadsCleanupInterval,
StaleUploadsExpiry: staleUploadsExpiry,

View File

@ -63,16 +63,10 @@ var (
Type: "string",
},
config.HelpKV{
Key: apiReplicationWorkers,
Description: `set the number of replication workers` + defaultHelpPostfix(apiReplicationWorkers),
Key: apiReplicationPriority,
Description: `set replication priority` + defaultHelpPostfix(apiReplicationPriority),
Optional: true,
Type: "number",
},
config.HelpKV{
Key: apiReplicationFailedWorkers,
Description: `set the number of replication workers for recently failed replicas` + defaultHelpPostfix(apiReplicationFailedWorkers),
Optional: true,
Type: "number",
Type: "string",
},
config.HelpKV{
Key: apiTransitionWorkers,