replication: fix queuing of large uploads (#17831)

Fixes regression from #17687
This commit is contained in:
Poorna 2023-08-10 15:48:42 -07:00 committed by GitHub
parent 0d2b7bf94d
commit dfaf735073
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1786,6 +1786,8 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
func (p *ReplicationPool) AddLargeWorkers() {
for i := 0; i < LargeWorkerCount; i++ {
p.lrgworkers = append(p.lrgworkers, make(chan ReplicationWorkerOperation, 100000))
i := i
go p.AddLargeWorker(p.lrgworkers[i])
}
go func() {
<-p.ctx.Done()
@ -1795,6 +1797,28 @@ func (p *ReplicationPool) AddLargeWorkers() {
}()
}
// AddLargeWorker adds a replication worker to the static pool for large uploads.
func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation) {
for {
select {
case <-p.ctx.Done():
return
case oi, ok := <-input:
if !ok {
return
}
switch v := oi.(type) {
case ReplicateObjectInfo:
replicateObject(p.ctx, v, p.objLayer)
case DeletedObjectReplicationInfo:
replicateDelete(p.ctx, v, p.objLayer)
default:
logger.LogOnceIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
}
}
}
}
// ActiveWorkers returns the number of active workers handling replication traffic.
func (p *ReplicationPool) ActiveWorkers() int {
return int(atomic.LoadInt32(&p.activeWorkers))