From dfaf7350738bbec2970400dca72aaf3944e0306d Mon Sep 17 00:00:00 2001 From: Poorna Date: Thu, 10 Aug 2023 15:48:42 -0700 Subject: [PATCH] replication: fix queuing of large uploads (#17831) Fixes regression from #17687 --- cmd/bucket-replication.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 0f77219d5..b160cc2f9 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1786,6 +1786,8 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT func (p *ReplicationPool) AddLargeWorkers() { for i := 0; i < LargeWorkerCount; i++ { p.lrgworkers = append(p.lrgworkers, make(chan ReplicationWorkerOperation, 100000)) + i := i + go p.AddLargeWorker(p.lrgworkers[i]) } go func() { <-p.ctx.Done() @@ -1795,6 +1797,28 @@ func (p *ReplicationPool) AddLargeWorkers() { }() } +// AddLargeWorker adds a replication worker to the static pool for large uploads. +func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation) { + for { + select { + case <-p.ctx.Done(): + return + case oi, ok := <-input: + if !ok { + return + } + switch v := oi.(type) { + case ReplicateObjectInfo: + replicateObject(p.ctx, v, p.objLayer) + case DeletedObjectReplicationInfo: + replicateDelete(p.ctx, v, p.objLayer) + default: + logger.LogOnceIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type") + } + } + } +} + // ActiveWorkers returns the number of active workers handling replication traffic. func (p *ReplicationPool) ActiveWorkers() int { return int(atomic.LoadInt32(&p.activeWorkers))