replication: make large workers configurable (#20077)

This PR also improves throttling by reducing the number of tokens requested
from the rate limiter based on the tokens currently available, so that waits
do not exceed the throttle's wait deadline.
Poorna, 2024-07-12 07:57:31 -07:00, committed by GitHub
parent ef802f2b2c
commit 989c318a28
4 changed files with 129 additions and 55 deletions


@@ -1803,15 +1803,18 @@ var (
type ReplicationPool struct {
// atomic ops:
activeWorkers int32
activeLrgWorkers int32
activeMRFWorkers int32
objLayer ObjectLayer
ctx context.Context
priority string
maxWorkers int
mu sync.RWMutex
mrfMU sync.Mutex
resyncer *replicationResyncer
objLayer ObjectLayer
ctx context.Context
priority string
maxWorkers int
maxLWorkers int
mu sync.RWMutex
mrfMU sync.Mutex
resyncer *replicationResyncer
// workers:
workers []chan ReplicationWorkerOperation
@@ -1882,9 +1885,13 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
if maxWorkers > 0 && failedWorkers > maxWorkers {
failedWorkers = maxWorkers
}
maxLWorkers := LargeWorkerCount
if opts.MaxLWorkers > 0 {
maxLWorkers = opts.MaxLWorkers
}
pool := &ReplicationPool{
workers: make([]chan ReplicationWorkerOperation, 0, workers),
lrgworkers: make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
lrgworkers: make([]chan ReplicationWorkerOperation, 0, maxLWorkers),
mrfReplicaCh: make(chan ReplicationWorkerOperation, 100000),
mrfWorkerKillCh: make(chan struct{}, failedWorkers),
resyncer: newresyncer(),
@@ -1894,9 +1901,10 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
objLayer: o,
priority: priority,
maxWorkers: maxWorkers,
maxLWorkers: maxLWorkers,
}
pool.AddLargeWorkers()
pool.ResizeLrgWorkers(maxLWorkers, 0)
pool.ResizeWorkers(workers, 0)
pool.ResizeFailedWorkers(failedWorkers)
go pool.resyncer.PersistToDisk(ctx, o)
@@ -1975,23 +1983,8 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
}
}
// AddLargeWorkers adds a static number of workers to handle large uploads
func (p *ReplicationPool) AddLargeWorkers() {
for i := 0; i < LargeWorkerCount; i++ {
p.lrgworkers = append(p.lrgworkers, make(chan ReplicationWorkerOperation, 100000))
i := i
go p.AddLargeWorker(p.lrgworkers[i])
}
go func() {
<-p.ctx.Done()
for i := 0; i < LargeWorkerCount; i++ {
xioutil.SafeClose(p.lrgworkers[i])
}
}()
}
// AddLargeWorker adds a replication worker to the pool for large uploads.
func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation) {
func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
for {
select {
case <-p.ctx.Done():
@@ -2002,11 +1995,23 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation
}
switch v := oi.(type) {
case ReplicateObjectInfo:
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
replicateObject(p.ctx, v, p.objLayer)
globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
}
case DeletedObjectReplicationInfo:
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
replicateDelete(p.ctx, v, p.objLayer)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
}
default:
bugLogIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
}
@@ -2014,6 +2019,30 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation
}
}
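The opTracker added above is an in-flight counter maintained with atomic adds around each operation, which is what ActiveLrgWorkers later reads. A minimal standalone sketch of the pattern, with illustrative names only:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Sketch of the opTracker idea: bump a shared int32 when an operation starts,
// drop it when the operation ends, and read it with atomic.LoadInt32 to report
// how many operations are currently in flight.
func main() {
	var inFlight int32
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&inFlight, 1)
			defer atomic.AddInt32(&inFlight, -1)
			// ... replication work would happen here ...
		}()
	}
	wg.Wait()
	fmt.Println("in flight after Wait:", atomic.LoadInt32(&inFlight)) // always 0
}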
// ResizeLrgWorkers sets the replication workers pool for large transfers (>=128MiB) to a new size.
// checkOld can be set to an expected current size; if the worker count changed
// while waiting for the lock, the resize is skipped.
func (p *ReplicationPool) ResizeLrgWorkers(n, checkOld int) {
p.mu.Lock()
defer p.mu.Unlock()
if (checkOld > 0 && len(p.lrgworkers) != checkOld) || n == len(p.lrgworkers) || n < 1 {
// Either already satisfied or worker count changed while we waited for the lock.
return
}
for len(p.lrgworkers) < n {
input := make(chan ReplicationWorkerOperation, 100000)
p.lrgworkers = append(p.lrgworkers, input)
go p.AddLargeWorker(input, &p.activeLrgWorkers)
}
for len(p.lrgworkers) > n {
worker := p.lrgworkers[len(p.lrgworkers)-1]
p.lrgworkers = p.lrgworkers[:len(p.lrgworkers)-1]
xioutil.SafeClose(worker)
}
}
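ResizeLrgWorkers grows the pool by appending buffered channels (one worker goroutine each) and shrinks it by closing the trailing channels so their goroutines exit. A stripped-down, standalone sketch of that grow/shrink pattern follows; the type and names are illustrative and not taken from the MinIO code.

package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	mu      sync.Mutex
	workers []chan int
}

// resize grows the worker slice by appending channels (spawning one goroutine
// per channel) and shrinks it by closing channels at the tail, which ends each
// of those workers' range loops.
func (p *pool) resize(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.workers) < n {
		ch := make(chan int, 8)
		p.workers = append(p.workers, ch)
		go func(in <-chan int) {
			for v := range in {
				fmt.Println("processed", v)
			}
		}(ch)
	}
	for len(p.workers) > n {
		last := len(p.workers) - 1
		close(p.workers[last])
		p.workers = p.workers[:last]
	}
}

func main() {
	p := &pool{}
	p.resize(4)
	p.workers[0] <- 42
	p.resize(2)
	time.Sleep(100 * time.Millisecond) // let the remaining worker drain before exit
}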
// ActiveWorkers returns the number of active workers handling replication traffic.
func (p *ReplicationPool) ActiveWorkers() int {
return int(atomic.LoadInt32(&p.activeWorkers))
@@ -2024,6 +2053,11 @@ func (p *ReplicationPool) ActiveMRFWorkers() int {
return int(atomic.LoadInt32(&p.activeMRFWorkers))
}
// ActiveLrgWorkers returns the number of active workers handling replication of objects >= 128MiB in size.
func (p *ReplicationPool) ActiveLrgWorkers() int {
return int(atomic.LoadInt32(&p.activeLrgWorkers))
}
// ResizeWorkers sets the replication workers pool to a new size.
// checkOld can be set to an expected current size; if the worker count changed
// while waiting for the lock, the resize is skipped.
@@ -2049,7 +2083,7 @@ func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
}
// ResizeWorkerPriority resizes the replication worker pools based on the requested priority and worker limits
func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers, maxLWorkers int) {
var workers, mrfWorkers int
p.mu.Lock()
switch pri {
@@ -2076,11 +2110,15 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
if maxWorkers > 0 && mrfWorkers > maxWorkers {
mrfWorkers = maxWorkers
}
if maxLWorkers <= 0 {
maxLWorkers = LargeWorkerCount
}
p.priority = pri
p.maxWorkers = maxWorkers
p.mu.Unlock()
p.ResizeWorkers(workers, 0)
p.ResizeFailedWorkers(mrfWorkers)
p.ResizeLrgWorkers(maxLWorkers, 0)
}
// ResizeFailedWorkers sets replication failed workers pool size
@@ -2127,6 +2165,15 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
case p.lrgworkers[h%LargeWorkerCount] <- ri:
default:
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
p.mu.RLock()
maxLWorkers := p.maxLWorkers
existing := len(p.lrgworkers)
p.mu.RUnlock()
maxLWorkers = min(maxLWorkers, LargeWorkerCount)
if p.ActiveLrgWorkers() < maxLWorkers {
workers := min(existing+1, maxLWorkers)
p.ResizeLrgWorkers(workers, existing)
}
}
return
}
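The default: branch above detects a saturated large worker with a non-blocking send: when the hashed worker's buffer is full, the entry is queued for MRF retry and the pool is grown by one worker, capped at maxLWorkers. A tiny sketch of that try-send pattern (not MinIO code):

package main

import "fmt"

// trySend attempts a non-blocking send. A false return means the buffer is
// full; the caller can persist the item for retry and scale the pool up,
// mirroring the default: branch in queueReplicaTask above.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: buffer had room
	fmt.Println(trySend(ch, 2)) // false: buffer full, take the scale-up path
}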
@@ -2229,8 +2276,9 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
}
type replicationPoolOpts struct {
Priority string
MaxWorkers int
Priority string
MaxWorkers int
MaxLWorkers int
}
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {


@@ -39,14 +39,15 @@ import (
type apiConfig struct {
mu sync.RWMutex
requestsDeadline time.Duration
requestsPool chan struct{}
clusterDeadline time.Duration
listQuorum string
corsAllowOrigins []string
replicationPriority string
replicationMaxWorkers int
transitionWorkers int
requestsDeadline time.Duration
requestsPool chan struct{}
clusterDeadline time.Duration
listQuorum string
corsAllowOrigins []string
replicationPriority string
replicationMaxWorkers int
replicationMaxLWorkers int
transitionWorkers int
staleUploadsExpiry time.Duration
staleUploadsCleanupInterval time.Duration
@@ -170,11 +171,12 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int, legacy bool) {
}
t.listQuorum = listQuorum
if globalReplicationPool != nil &&
(cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers) {
globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers)
(cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers || cfg.ReplicationMaxLWorkers != t.replicationMaxLWorkers) {
globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers)
}
t.replicationPriority = cfg.ReplicationPriority
t.replicationMaxWorkers = cfg.ReplicationMaxWorkers
t.replicationMaxLWorkers = cfg.ReplicationMaxLWorkers
// N.B. api.transition_workers will be deprecated
if globalTransitionState != nil {
@@ -381,14 +383,16 @@ func (t *apiConfig) getReplicationOpts() replicationPoolOpts {
if t.replicationPriority == "" {
return replicationPoolOpts{
Priority: "auto",
MaxWorkers: WorkerMaxLimit,
Priority: "auto",
MaxWorkers: WorkerMaxLimit,
MaxLWorkers: LargeWorkerCount,
}
}
return replicationPoolOpts{
Priority: t.replicationPriority,
MaxWorkers: t.replicationMaxWorkers,
Priority: t.replicationPriority,
MaxWorkers: t.replicationMaxWorkers,
MaxLWorkers: t.replicationMaxLWorkers,
}
}


@@ -74,12 +74,16 @@ func (r *MonitoredReader) Read(buf []byte) (n int, err error) {
need = int(math.Min(float64(b), float64(need)))
tokens = need
}
// reduce tokens requested according to availability
av := int(r.throttle.Tokens())
if av < tokens && av > 0 {
tokens = av
need = int(math.Min(float64(tokens), float64(need)))
}
err = r.throttle.WaitN(r.ctx, tokens)
if err != nil {
return
}
n, err = r.r.Read(buf[:need])
if err != nil {
r.lastErr = err
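This hunk implements the throttling improvement from the commit message: the requested token count is capped at what the limiter currently has available, so WaitN returns promptly with a smaller read size instead of blocking past the wait deadline. Below is a self-contained sketch of the same idea against golang.org/x/time/rate; the function and values here are illustrative, not from the MinIO code.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// throttledChunk shrinks the requested token count to what is currently
// available (when that is positive and smaller), so WaitN completes quickly
// and the caller simply reads fewer bytes this iteration.
func throttledChunk(ctx context.Context, l *rate.Limiter, want int) (int, error) {
	if av := int(l.Tokens()); av > 0 && av < want {
		want = av
	}
	if err := l.WaitN(ctx, want); err != nil {
		return 0, err
	}
	return want, nil
}

func main() {
	// 1 KiB/s with a 4 KiB burst; the limiter starts with a full burst.
	l := rate.NewLimiter(rate.Limit(1024), 4096)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Asking for 64 KiB outright would exceed the burst and fail WaitN;
	// the capped request succeeds immediately with a smaller chunk.
	n, err := throttledChunk(ctx, l, 64*1024)
	fmt.Println(n, err) // 4096 <nil>
}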


@@ -40,6 +40,7 @@ const (
apiListQuorum = "list_quorum"
apiReplicationPriority = "replication_priority"
apiReplicationMaxWorkers = "replication_max_workers"
apiReplicationMaxLWorkers = "replication_max_lrg_workers"
apiTransitionWorkers = "transition_workers"
apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
@@ -52,16 +53,18 @@ const (
apiSyncEvents = "sync_events"
apiObjectMaxVersions = "object_max_versions"
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE"
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS"
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY"
EnvAPIReplicationMaxWorkers = "MINIO_API_REPLICATION_MAX_WORKERS"
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE"
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS"
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY"
EnvAPIReplicationMaxWorkers = "MINIO_API_REPLICATION_MAX_WORKERS"
EnvAPIReplicationMaxLWorkers = "MINIO_API_REPLICATION_MAX_LRG_WORKERS"
EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL"
EnvAPIStaleUploadsExpiry = "MINIO_API_STALE_UPLOADS_EXPIRY"
EnvAPIDeleteCleanupInterval = "MINIO_API_DELETE_CLEANUP_INTERVAL"
@@ -117,6 +120,10 @@ var (
Key: apiReplicationMaxWorkers,
Value: "500",
},
config.KV{
Key: apiReplicationMaxLWorkers,
Value: "10",
},
config.KV{
Key: apiTransitionWorkers,
Value: "100",
@@ -171,6 +178,7 @@ type Config struct {
ListQuorum string `json:"list_quorum"`
ReplicationPriority string `json:"replication_priority"`
ReplicationMaxWorkers int `json:"replication_max_workers"`
ReplicationMaxLWorkers int `json:"replication_max_lrg_workers"`
TransitionWorkers int `json:"transition_workers"`
StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"`
@@ -280,11 +288,21 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err != nil {
return cfg, err
}
if replicationMaxWorkers <= 0 || replicationMaxWorkers > 500 {
return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers should be between 1 and 500")
}
cfg.ReplicationMaxWorkers = replicationMaxWorkers
replicationMaxLWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationMaxLWorkers, kvs.GetWithDefault(apiReplicationMaxLWorkers, DefaultKVS)))
if err != nil {
return cfg, err
}
if replicationMaxLWorkers <= 0 || replicationMaxLWorkers > 10 {
return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers for transfers >=128MiB should be between 1 and 10 per node")
}
cfg.ReplicationMaxLWorkers = replicationMaxLWorkers
transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS)))
if err != nil {
return cfg, err
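For reference, the new setting resolves from the MINIO_API_REPLICATION_MAX_LRG_WORKERS environment variable, then the stored replication_max_lrg_workers key, then the default of 10, and must fall in the range 1..10. A standalone sketch of that lookup order and validation; the helper below is illustrative and not part of the MinIO config API.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// lookupLrgWorkers mirrors the precedence used by LookupConfig above:
// environment variable first, then the stored config value, then the
// default "10", with values outside 1..10 rejected.
func lookupLrgWorkers(stored string) (int, error) {
	s := os.Getenv("MINIO_API_REPLICATION_MAX_LRG_WORKERS")
	if s == "" {
		s = stored
	}
	if s == "" {
		s = "10"
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, err
	}
	if n <= 0 || n > 10 {
		return 0, fmt.Errorf("replication_max_lrg_workers must be between 1 and 10")
	}
	return n, nil
}

func main() {
	n, err := lookupLrgWorkers("")
	fmt.Println(n, err) // 10 <nil> when nothing is configured
}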