From 1a42693d688ee9b4589a9cf0a604c953cf7df1cf Mon Sep 17 00:00:00 2001
From: Poorna
Date: Tue, 25 Jul 2023 20:02:02 -0700
Subject: [PATCH] replication: limit larger uploads to a subset of workers
 (#17687)

Limit large uploads (>= 128MiB) to a maximum of 10 workers. The intent is
to keep larger uploads from consuming all replication bandwidth, leaving
room for smaller uploads to sync faster.
---
 cmd/bucket-replication.go | 45 +++++++++++++++++++++++++++++++++++----
 1 file changed, 41 insertions(+), 4 deletions(-)

diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 0f06dec86..e7b2fb37e 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1645,6 +1645,7 @@ type ReplicationPool struct {
 
 	// workers:
 	workers         []chan ReplicationWorkerOperation
+	lrgworkers      []chan ReplicationWorkerOperation
 	existingWorkers chan ReplicationWorkerOperation
 
 	// mrf:
@@ -1679,6 +1680,9 @@
 
 	// MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
 	MRFWorkerAutoDefault = 4
+
+	// LargeWorkerCount is default number of workers assigned to large uploads ( >= 128MiB)
+	LargeWorkerCount = 10
 )
 
 // NewReplicationPool creates a pool of replication workers of specified size
@@ -1702,7 +1706,9 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 
 	pool := &ReplicationPool{
 		workers:         make([]chan ReplicationWorkerOperation, 0, workers),
+		lrgworkers:      make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
 		existingWorkers: make(chan ReplicationWorkerOperation, 100000),
+
 		mrfReplicaCh:    make(chan ReplicationWorkerOperation, 100000),
 		mrfWorkerKillCh: make(chan struct{}, failedWorkers),
 		resyncer:        newresyncer(),
@@ -1714,6 +1720,7 @@
 		priority:        priority,
 	}
 
+	pool.AddLargeWorkers()
 	pool.ResizeWorkers(workers, 0)
 	pool.ResizeFailedWorkers(failedWorkers)
 	go pool.AddWorker(pool.existingWorkers, nil)
@@ -1785,6 +1792,19 @@
 	}
 }
+// AddLargeWorkers adds a static number of workers to handle large uploads
+func (p *ReplicationPool) AddLargeWorkers() {
+	for i := 0; i < LargeWorkerCount; i++ {
+		p.lrgworkers = append(p.lrgworkers, make(chan ReplicationWorkerOperation, 100000))
+	}
+	go func() {
+		<-p.ctx.Done()
+		for i := 0; i < LargeWorkerCount; i++ {
+			close(p.lrgworkers[i])
+		}
+	}()
+}
+
 // ActiveWorkers returns the number of active workers handling replication traffic.
 func (p *ReplicationPool) ActiveWorkers() int {
 	return int(atomic.LoadInt32(&p.activeWorkers))
 }
@@ -1861,9 +1881,14 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
 	}
 }
 
+const (
+	minLargeObjSize = 128 * humanize.MiByte // 128MiB
+)
+
 // getWorkerCh gets a worker channel deterministically based on bucket and object names.
 // Must be able to grab read lock from p.
-func (p *ReplicationPool) getWorkerCh(bucket, object string) chan<- ReplicationWorkerOperation {
+
+func (p *ReplicationPool) getWorkerCh(bucket, object string, sz int64) chan<- ReplicationWorkerOperation {
 	h := xxh3.HashString(bucket + object)
 	p.mu.RLock()
 	defer p.mu.RUnlock()
@@ -1877,15 +1902,27 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	if p == nil {
 		return
 	}
+	// if object is large, queue it to a static set of large workers
+	if ri.Size >= int64(minLargeObjSize) {
+		h := xxh3.HashString(ri.Bucket + ri.Name)
+		select {
+		case <-p.ctx.Done():
+		case p.lrgworkers[h%LargeWorkerCount] <- ri:
+		default:
+			globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
+		}
+		return
+	}
+
 	var ch, healCh chan<- ReplicationWorkerOperation
 	switch ri.OpType {
 	case replication.ExistingObjectReplicationType:
 		ch = p.existingWorkers
 	case replication.HealReplicationType:
 		ch = p.mrfReplicaCh
-		healCh = p.getWorkerCh(ri.Name, ri.Bucket)
+		healCh = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
 	default:
-		ch = p.getWorkerCh(ri.Name, ri.Bucket)
+		ch = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
 	}
 	if ch == nil && healCh == nil {
 		return
@@ -1945,7 +1982,7 @@
 	case replication.HealReplicationType:
 		fallthrough
 	default:
-		ch = p.getWorkerCh(doi.Bucket, doi.ObjectName)
+		ch = p.getWorkerCh(doi.Bucket, doi.ObjectName, 0)
 	}
 
 	select {
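
The routing policy above is easiest to see in isolation. Below is a minimal,
self-contained Go sketch of the same idea: hash the bucket/object key, send
anything at or above the size cutoff to a small static pool, and defer the
task to a retry queue when the chosen worker's buffer is full. The names here
(task, router, retry, hashKey) are illustrative stand-ins rather than MinIO's
types, hash/fnv stands in for the xxh3 hash the patch uses, and the consumer
loop is an assumed detail -- the diff shows the channels being created and
routed to, not drained.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const (
	largeWorkerCount = 10        // mirrors LargeWorkerCount in the patch
	minLargeObjSize  = 128 << 20 // mirrors minLargeObjSize: 128MiB
)

// task stands in for ReplicationWorkerOperation.
type task struct {
	bucket, object string
	size           int64
}

// router mirrors the two worker pools plus the MRF-style fallback queue.
type router struct {
	workers    []chan task // main pool, sized by configuration
	lrgWorkers []chan task // small static pool reserved for large uploads
	retry      chan task   // stand-in for the MRF save queue
}

// hashKey plays the role of xxh3.HashString(bucket + object).
func hashKey(bucket, object string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(bucket + object))
	return h.Sum64()
}

// queue routes a task the way queueReplicaTask does: large objects go to the
// static pool, everything else to the main pool, and a full channel defers
// the task to the retry queue instead of blocking the caller.
func (r *router) queue(t task) {
	h := hashKey(t.bucket, t.object)
	ch := r.workers[h%uint64(len(r.workers))]
	if t.size >= minLargeObjSize {
		ch = r.lrgWorkers[h%largeWorkerCount]
	}
	select {
	case ch <- t:
	default:
		r.retry <- t
	}
}

func main() {
	var wg sync.WaitGroup
	spawn := func(n int) []chan task {
		pool := make([]chan task, n)
		for i := range pool {
			pool[i] = make(chan task, 8)
			wg.Add(1)
			go func(in <-chan task) {
				defer wg.Done()
				for t := range in { // each worker drains its own channel
					fmt.Printf("replicated %s/%s (%d bytes)\n", t.bucket, t.object, t.size)
				}
			}(pool[i])
		}
		return pool
	}
	r := &router{
		workers:    spawn(100),              // main pool
		lrgWorkers: spawn(largeWorkerCount), // capped pool for large uploads
		retry:      make(chan task, 100),
	}

	r.queue(task{"photos", "small.jpg", 4 << 10})   // routed to the main pool
	r.queue(task{"backups", "dump.tar", 512 << 20}) // >= 128MiB: large pool

	for _, ch := range append(r.workers, r.lrgWorkers...) {
		close(ch)
	}
	wg.Wait()
}

Two details carry the intent of the patch: the large pool has a fixed cap, so
a burst of big uploads can occupy at most that many workers while the main
pool stays free for small objects, and the send is non-blocking, so a
saturated worker defers the object to the retry (MRF) queue rather than
stalling the caller.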