replication: queue existing objects to same workers as incoming (#18020)

Previously, existing objects were queued to a single worker, and MRF re-queues
were handled by that same worker as well. This does not fully use the available
replication bandwidth when there is no incoming workload.
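
The fix routes existing-object tasks through the same hashed per-object worker
channels used for incoming replication (getWorkerCh in the diff below). As a
rough sketch of that routing idea; the pickWorker helper and the FNV hash are
illustrative assumptions, not MinIO's actual implementation:

    // Sketch only: hypothetical hash-based routing that lets every task type
    // share one worker pool. The same object key always lands on the same
    // worker, while distinct keys spread across all workers.
    package main

    import (
    	"fmt"
    	"hash/fnv"
    )

    type task struct{ bucket, object string }

    // pickWorker selects a worker channel by hashing bucket+object.
    func pickWorker(workers []chan task, bucket, object string) chan task {
    	h := fnv.New64a()
    	h.Write([]byte(bucket + "/" + object))
    	return workers[h.Sum64()%uint64(len(workers))]
    }

    func main() {
    	workers := make([]chan task, 4)
    	for i := range workers {
    		workers[i] = make(chan task, 8)
    	}
    	t := task{bucket: "mybucket", object: "obj-1"}
    	pickWorker(workers, t.bucket, t.object) <- t
    	fmt.Printf("queued %s/%s\n", t.bucket, t.object)
    }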
Poorna authored 2023-09-12 21:59:15 -07:00, committed by GitHub
parent c8a57a8fa2
commit 96fbf18201
1 changed file with 4 additions and 12 deletions

@@ -1663,7 +1663,6 @@ type ReplicationPool struct {
 	// workers:
-	workers         []chan ReplicationWorkerOperation
-	lrgworkers      []chan ReplicationWorkerOperation
-	existingWorkers chan ReplicationWorkerOperation
+	workers    []chan ReplicationWorkerOperation
+	lrgworkers []chan ReplicationWorkerOperation
 
 	// mrf:
 	mrfWorkerKillCh chan struct{}
@@ -1723,8 +1722,6 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 	pool := &ReplicationPool{
 		workers:         make([]chan ReplicationWorkerOperation, 0, workers),
 		lrgworkers:      make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
-		existingWorkers: make(chan ReplicationWorkerOperation, 100000),
 		mrfReplicaCh:    make(chan ReplicationWorkerOperation, 100000),
 		mrfWorkerKillCh: make(chan struct{}, failedWorkers),
 		resyncer:        newresyncer(),
@@ -1738,7 +1735,6 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 	pool.AddLargeWorkers()
 	pool.ResizeWorkers(workers, 0)
 	pool.ResizeFailedWorkers(failedWorkers)
-	go pool.AddWorker(pool.existingWorkers, nil)
 	go pool.resyncer.PersistToDisk(ctx, o)
 	go pool.processMRF()
 	go pool.persistMRF()
@@ -1965,9 +1961,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	var ch, healCh chan<- ReplicationWorkerOperation
 	switch ri.OpType {
-	case replication.ExistingObjectReplicationType:
-		ch = p.existingWorkers
-	case replication.HealReplicationType:
+	case replication.HealReplicationType, replication.ExistingObjectReplicationType:
 		ch = p.mrfReplicaCh
 		healCh = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
 	default:
@@ -2026,9 +2020,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 	}
 	var ch chan<- ReplicationWorkerOperation
 	switch doi.OpType {
-	case replication.ExistingObjectReplicationType:
-		ch = p.existingWorkers
-	case replication.HealReplicationType:
+	case replication.HealReplicationType, replication.ExistingObjectReplicationType:
 		fallthrough
 	default:
 		ch = p.getWorkerCh(doi.Bucket, doi.ObjectName, 0)
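
After this change, queueReplicaTask offers a heal or existing-object task to
both the MRF re-queue channel and a hashed worker channel, taking whichever is
ready. A minimal sketch of that two-channel handoff pattern follows; the queue
helper is an assumption, simplified from the diff above (the real code also
handles context cancellation and persists an MRF entry on fallback). It relies
on the Go rule that a send on a nil channel blocks forever and so is never
selected:

    // Sketch only: non-blocking dispatch to whichever channel is ready.
    package main

    import "fmt"

    // queue offers a task to ch and healCh; a nil healCh is simply never
    // chosen, and default keeps the caller from blocking when all workers
    // are busy.
    func queue(ch, healCh chan string, task string) {
    	select {
    	case ch <- task:
    	case healCh <- task: // nil when the op type has no heal path
    	default:
    		fmt.Println("workers busy; fall back (e.g. persist an MRF entry)")
    	}
    }

    func main() {
    	ch := make(chan string, 1)
    	queue(ch, nil, "mybucket/obj-1")
    	fmt.Println("dispatched:", <-ch)
    }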