fix: replication auto-scaling deadlock (#16084)

commit b7bb122be8
parent 8441a3bf5f
Author: Klaus Post
Date:   2022-11-17 16:35:02 +01:00
Committed by: GitHub

@@ -1509,6 +1509,10 @@ var (
 // ReplicationPool describes replication pool
 type ReplicationPool struct {
+    // atomics:
+    activeWorkers    int32
+    activeMRFWorkers int32
+
     objLayer        ObjectLayer
     ctx             context.Context
     mrfWorkerKillCh chan struct{}
@@ -1522,8 +1526,6 @@
     saveStateCh      chan struct{}
     workerSize       int
     mrfWorkerSize    int
-    activeWorkers    int32
-    activeMRFWorkers int32
     priority         string
     resyncer         *replicationResyncer
     workerWg         sync.WaitGroup
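
The two counters move to the top of the struct under an "// atomics:" comment because they are meant to be read and written with sync/atomic rather than under p.mu. Below is a minimal, self-contained sketch of how such accessors typically look; the trimmed-down pool type and the accessor bodies are illustrative assumptions, not code from this commit:

package main

import (
    "fmt"
    "sync/atomic"
)

// pool is a trimmed-down stand-in for ReplicationPool (sketch only).
type pool struct {
    // atomics:
    activeWorkers    int32
    activeMRFWorkers int32
}

// Assumed shape of the accessors called in the hunks below: plain atomic
// loads, safe to call without holding any mutex.
func (p *pool) ActiveWorkers() int    { return int(atomic.LoadInt32(&p.activeWorkers)) }
func (p *pool) ActiveMRFWorkers() int { return int(atomic.LoadInt32(&p.activeMRFWorkers)) }

func main() {
    p := &pool{}
    atomic.AddInt32(&p.activeWorkers, 1)                 // a replication worker starts
    fmt.Println(p.ActiveWorkers(), p.ActiveMRFWorkers()) // prints: 1 0
}

If the counters really are accessed only atomically, ActiveWorkers() and ActiveMRFWorkers() can be called in the hunks below without taking the pool's read lock.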
@@ -1761,22 +1763,27 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
     default:
         globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
         p.mu.RLock()
-        switch p.priority {
+        prio := p.priority
+        p.mu.RUnlock()
+        switch prio {
         case "fast":
             logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic"), string(replicationSubsystem))
         case "slow":
             logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
         default:
             if p.ActiveWorkers() < WorkerMaxLimit {
+                p.mu.RLock()
                 workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+                p.mu.RUnlock()
                 p.ResizeWorkers(workers)
             }
             if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
+                p.mu.RLock()
                 workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
+                p.mu.RUnlock()
                 p.ResizeFailedWorkers(workers)
             }
         }
-        p.mu.RUnlock()
     }
 }
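
The deadlock named in the title comes from the removed pattern in this hunk: the goroutine took p.mu.RLock() and kept holding it across the whole switch, including the calls to ResizeWorkers and ResizeFailedWorkers, which are assumed here to take the write lock on the same sync.RWMutex. A goroutine that holds the read lock and then asks for the write lock on the same RWMutex blocks itself forever. A minimal, self-contained reproducer of that pattern; the pool and resize names, and the assumption that resizing takes the write lock internally, are illustrative rather than taken from this commit:

package main

import "sync"

type pool struct {
    mu         sync.RWMutex
    workerSize int
}

// resize takes the write lock, as ResizeWorkers is assumed to do internally.
func (p *pool) resize(n int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.workerSize = n
}

func main() {
    p := &pool{}

    // The pattern removed by this commit: the read lock is still held when
    // resize asks for the write lock on the same RWMutex, so this goroutine
    // blocks on itself and the runtime aborts with
    // "fatal error: all goroutines are asleep - deadlock!".
    p.mu.RLock()
    n := p.workerSize + 1
    p.resize(n) // never returns
    p.mu.RUnlock()
}

With the fix, the read lock is only held for the short copies of p.priority, p.workerSize, and p.mrfWorkerSize, and is always released before ResizeWorkers or ResizeFailedWorkers run.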
@@ -1815,18 +1822,21 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
     default:
         globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
         p.mu.RLock()
-        switch p.priority {
+        prio := p.priority
+        p.mu.RUnlock()
+        switch prio {
         case "fast":
             logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes"), string(replicationSubsystem))
         case "slow":
             logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
         default:
             if p.ActiveWorkers() < WorkerMaxLimit {
+                p.mu.RLock()
                 workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+                p.mu.RUnlock()
                 p.ResizeWorkers(workers)
             }
         }
-        p.mu.RUnlock()
     }
 }
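
The queueReplicaDeleteTask hunk is the same change with a single resize branch: priority is copied out under a brief read lock, the lock is released before the switch, and workerSize is read under its own short read lock before ResizeWorkers is called unlocked. Together with the atomic activity counters above, neither function now holds p.mu for reading at a point where a write lock on it may be requested.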