replication: limit larger uploads to a subset of workers (#17687)

Limit large uploads (> 128MiB)  to a max of 10 workers, intent is to avoid
larger uploads from using all replication bandwidth, giving room for smaller
uploads to sync faster.
This commit is contained in:
Poorna 2023-07-25 20:02:02 -07:00 committed by GitHub
parent e7b60c4d65
commit 1a42693d68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1645,6 +1645,7 @@ type ReplicationPool struct {
// workers:
workers []chan ReplicationWorkerOperation
lrgworkers []chan ReplicationWorkerOperation
existingWorkers chan ReplicationWorkerOperation
// mrf:
@ -1679,6 +1680,9 @@ const (
// MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
MRFWorkerAutoDefault = 4
// LargeWorkerCount is default number of workers assigned to large uploads ( >= 128MiB)
LargeWorkerCount = 10
)
// NewReplicationPool creates a pool of replication workers of specified size
@ -1702,7 +1706,9 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
pool := &ReplicationPool{
workers: make([]chan ReplicationWorkerOperation, 0, workers),
lrgworkers: make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
existingWorkers: make(chan ReplicationWorkerOperation, 100000),
mrfReplicaCh: make(chan ReplicationWorkerOperation, 100000),
mrfWorkerKillCh: make(chan struct{}, failedWorkers),
resyncer: newresyncer(),
@ -1714,6 +1720,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
priority: priority,
}
pool.AddLargeWorkers()
pool.ResizeWorkers(workers, 0)
pool.ResizeFailedWorkers(failedWorkers)
go pool.AddWorker(pool.existingWorkers, nil)
@ -1785,6 +1792,19 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
}
}
// AddLargeWorkers adds a static number of workers to handle large uploads
func (p *ReplicationPool) AddLargeWorkers() {
for i := 0; i < LargeWorkerCount; i++ {
p.lrgworkers = append(p.lrgworkers, make(chan ReplicationWorkerOperation, 100000))
}
go func() {
<-p.ctx.Done()
for i := 0; i < LargeWorkerCount; i++ {
close(p.lrgworkers[i])
}
}()
}
// ActiveWorkers returns the number of active workers handling replication traffic.
func (p *ReplicationPool) ActiveWorkers() int {
return int(atomic.LoadInt32(&p.activeWorkers))
@ -1861,9 +1881,14 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
}
}
const (
minLargeObjSize = 128 * humanize.MiByte // 128MiB
)
// getWorkerCh gets a worker channel deterministically based on bucket and object names.
// Must be able to grab read lock from p.
func (p *ReplicationPool) getWorkerCh(bucket, object string) chan<- ReplicationWorkerOperation {
func (p *ReplicationPool) getWorkerCh(bucket, object string, sz int64) chan<- ReplicationWorkerOperation {
h := xxh3.HashString(bucket + object)
p.mu.RLock()
defer p.mu.RUnlock()
@ -1877,15 +1902,27 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
if p == nil {
return
}
// if object is large, queue it to a static set of large workers
if ri.Size >= int64(minLargeObjSize) {
h := xxh3.HashString(ri.Bucket + ri.Name)
select {
case <-p.ctx.Done():
case p.lrgworkers[h%LargeWorkerCount] <- ri:
default:
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
}
return
}
var ch, healCh chan<- ReplicationWorkerOperation
switch ri.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingWorkers
case replication.HealReplicationType:
ch = p.mrfReplicaCh
healCh = p.getWorkerCh(ri.Name, ri.Bucket)
healCh = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
default:
ch = p.getWorkerCh(ri.Name, ri.Bucket)
ch = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
}
if ch == nil && healCh == nil {
return
@ -1945,7 +1982,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
case replication.HealReplicationType:
fallthrough
default:
ch = p.getWorkerCh(doi.Bucket, doi.ObjectName)
ch = p.getWorkerCh(doi.Bucket, doi.ObjectName, 0)
}
select {