From a22b4adf4c5cc3e4db13fe92da683ef1ce45cd5a Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Fri, 18 Nov 2022 00:20:09 +0100
Subject: [PATCH] distribute replication ops based on names (#16083)

---
 cmd/bucket-replication.go | 231 ++++++++++++++++++++------------------
 1 file changed, 123 insertions(+), 108 deletions(-)

diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index ed02e6672..42bd6c79d 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -48,6 +48,7 @@ import (
     "github.com/minio/minio/internal/hash"
     xhttp "github.com/minio/minio/internal/http"
     "github.com/minio/minio/internal/logger"
+    "github.com/zeebo/xxh3"
 )
 
 const (
@@ -1509,29 +1510,33 @@ var (
 
 // ReplicationPool describes replication pool
 type ReplicationPool struct {
-    // atomics:
+    // atomic ops:
     activeWorkers    int32
     activeMRFWorkers int32
 
-    objLayer                ObjectLayer
-    ctx                     context.Context
-    mrfWorkerKillCh         chan struct{}
-    workerKillCh            chan struct{}
-    replicaCh               chan ReplicateObjectInfo
-    replicaDeleteCh         chan DeletedObjectReplicationInfo
-    mrfReplicaCh            chan ReplicateObjectInfo
-    existingReplicaCh       chan ReplicateObjectInfo
-    existingReplicaDeleteCh chan DeletedObjectReplicationInfo
-    mrfSaveCh               chan MRFReplicateEntry
-    saveStateCh             chan struct{}
-    workerSize              int
-    mrfWorkerSize           int
-    priority                string
-    resyncer                *replicationResyncer
-    workerWg                sync.WaitGroup
-    mrfWorkerWg             sync.WaitGroup
-    once                    sync.Once
-    mu                      sync.RWMutex
+    objLayer ObjectLayer
+    ctx      context.Context
+    priority string
+    mu       sync.RWMutex
+    resyncer *replicationResyncer
+
+    // workers:
+    workers         []chan ReplicationWorkerOperation
+    existingWorkers chan ReplicationWorkerOperation
+    workerWg        sync.WaitGroup
+
+    // mrf:
+    mrfWorkerKillCh chan struct{}
+    mrfReplicaCh    chan ReplicationWorkerOperation
+    mrfSaveCh       chan MRFReplicateEntry
+    mrfWorkerSize   int
+    saveStateCh     chan struct{}
+    mrfWorkerWg     sync.WaitGroup
+}
+
+// ReplicationWorkerOperation is a shared interface of replication operations.
+type ReplicationWorkerOperation interface {
+    ToMRFEntry() MRFReplicateEntry
 }
 
 const (
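Reviewer note: the point of `ReplicationWorkerOperation` is that `ReplicateObjectInfo` and `DeletedObjectReplicationInfo` can now travel through a single channel type, so one worker loop serves both object and delete replication. Below is a minimal, self-contained sketch of the pattern; the `putOp` and `deleteOp` types are illustrative stand-ins, not the MinIO types.

```go
package main

import "fmt"

// workerOperation plays the role of ReplicationWorkerOperation:
// a single interface lets one channel carry different operation kinds.
type workerOperation interface {
	Describe() string
}

type putOp struct{ bucket, object string }
type deleteOp struct{ bucket, object string }

func (o putOp) Describe() string    { return "put " + o.bucket + "/" + o.object }
func (o deleteOp) Describe() string { return "delete " + o.bucket + "/" + o.object }

// worker drains one channel and type-switches per operation,
// just like the patched AddWorker does.
func worker(input <-chan workerOperation, done chan<- struct{}) {
	defer close(done)
	for op := range input {
		switch v := op.(type) {
		case putOp:
			fmt.Println("replicating:", v.Describe())
		case deleteOp:
			fmt.Println("replicating:", v.Describe())
		default:
			fmt.Printf("unknown operation type: %T\n", op)
		}
	}
}

func main() {
	in := make(chan workerOperation, 4)
	done := make(chan struct{})
	go worker(in, done)
	in <- putOp{"bucket", "a.txt"}
	in <- deleteOp{"bucket", "a.txt"}
	close(in)
	<-done
}
```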
@@ -1572,25 +1577,24 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
         workers = WorkerAutoDefault
         failedWorkers = MRFWorkerAutoDefault
     }
+
     pool := &ReplicationPool{
-        replicaCh:               make(chan ReplicateObjectInfo, 100000),
-        replicaDeleteCh:         make(chan DeletedObjectReplicationInfo, 100000),
-        mrfReplicaCh:            make(chan ReplicateObjectInfo, 100000),
-        workerKillCh:            make(chan struct{}, workers),
-        mrfWorkerKillCh:         make(chan struct{}, failedWorkers),
-        existingReplicaCh:       make(chan ReplicateObjectInfo, 100000),
-        existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
-        resyncer:                newresyncer(),
-        mrfSaveCh:               make(chan MRFReplicateEntry, 100000),
-        saveStateCh:             make(chan struct{}, 1),
-        ctx:                     ctx,
-        objLayer:                o,
-        priority:                priority,
+        workers:         make([]chan ReplicationWorkerOperation, 0, workers),
+        existingWorkers: make(chan ReplicationWorkerOperation, 100000),
+        mrfReplicaCh:    make(chan ReplicationWorkerOperation, 100000),
+        mrfWorkerKillCh: make(chan struct{}, failedWorkers),
+        resyncer:        newresyncer(),
+        mrfSaveCh:       make(chan MRFReplicateEntry, 100000),
+        saveStateCh:     make(chan struct{}, 1),
+        ctx:             ctx,
+        objLayer:        o,
+        priority:        priority,
     }
-    pool.ResizeWorkers(workers)
+    pool.ResizeWorkers(workers, 0)
     pool.ResizeFailedWorkers(failedWorkers)
-    go pool.AddExistingObjectReplicateWorker()
+    pool.workerWg.Add(1)
+    go pool.AddWorker(pool.existingWorkers, nil)
     go pool.resyncer.PersistToDisk(ctx, o)
     go pool.processMRF()
     go pool.persistMRF()
@@ -1610,59 +1614,53 @@ func (p *ReplicationPool) AddMRFWorker() {
             if !ok {
                 return
             }
-            atomic.AddInt32(&p.activeMRFWorkers, 1)
-            replicateObject(p.ctx, oi, p.objLayer)
-            atomic.AddInt32(&p.activeMRFWorkers, -1)
+            switch v := oi.(type) {
+            case ReplicateObjectInfo:
+                atomic.AddInt32(&p.activeMRFWorkers, 1)
+                replicateObject(p.ctx, v, p.objLayer)
+                atomic.AddInt32(&p.activeMRFWorkers, -1)
+            default:
+                logger.LogOnceIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type")
+            }
         case <-p.mrfWorkerKillCh:
             return
         }
     }
 }
 
-// AddWorker adds a replication worker to the pool
-func (p *ReplicationPool) AddWorker() {
+// AddWorker adds a replication worker to the pool.
+// An optional pointer to a tracker may be provided; it is atomically
+// incremented while an operation is running and decremented when it completes.
+func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
     defer p.workerWg.Done()
     for {
         select {
         case <-p.ctx.Done():
             return
-        case oi, ok := <-p.replicaCh:
+        case oi, ok := <-input:
             if !ok {
                 return
             }
-            atomic.AddInt32(&p.activeWorkers, 1)
-            replicateObject(p.ctx, oi, p.objLayer)
-            atomic.AddInt32(&p.activeWorkers, -1)
-
-        case doi, ok := <-p.replicaDeleteCh:
-            if !ok {
-                return
+            switch v := oi.(type) {
+            case ReplicateObjectInfo:
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, 1)
+                }
+                replicateObject(p.ctx, v, p.objLayer)
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, -1)
+                }
+            case DeletedObjectReplicationInfo:
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, 1)
+                }
+                replicateDelete(p.ctx, v, p.objLayer)
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, -1)
+                }
+            default:
+                logger.LogOnceIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
             }
-            atomic.AddInt32(&p.activeWorkers, 1)
-            replicateDelete(p.ctx, doi, p.objLayer)
-            atomic.AddInt32(&p.activeWorkers, -1)
-        case <-p.workerKillCh:
-            return
-        }
-    }
-}
-
-// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
-func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
-    for {
-        select {
-        case <-p.ctx.Done():
-            return
-        case oi, ok := <-p.existingReplicaCh:
-            if !ok {
-                return
-            }
-            replicateObject(p.ctx, oi, p.objLayer)
-        case doi, ok := <-p.existingReplicaDeleteCh:
-            if !ok {
-                return
-            }
-            replicateDelete(p.ctx, doi, p.objLayer)
         }
     }
 }
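The new `opTracker` parameter keeps `ActiveWorkers()` lock-free: workers bump a shared `int32` gauge atomically around each operation, and readers sample it with `atomic.LoadInt32`. A hedged sketch of that accounting (all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var active int32 // plays the role of ReplicationPool.activeWorkers
	var wg sync.WaitGroup

	work := func(opTracker *int32) {
		defer wg.Done()
		if opTracker != nil {
			atomic.AddInt32(opTracker, 1)        // operation started
			defer atomic.AddInt32(opTracker, -1) // operation finished
		}
		// ... replicate one object here ...
	}

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go work(&active)
	}
	// Any goroutine can sample the gauge without locks,
	// which is how ActiveWorkers() reads it.
	fmt.Println("active (sampled):", atomic.LoadInt32(&active))
	wg.Wait()
	fmt.Println("active (after):", atomic.LoadInt32(&active))
}
```

Passing `nil`, as `NewReplicationPool` does for the existing-objects worker, simply skips the accounting.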
@@ -1677,19 +1675,28 @@ func (p *ReplicationPool) ActiveMRFWorkers() int {
     return int(atomic.LoadInt32(&p.activeMRFWorkers))
 }
 
-// ResizeWorkers sets replication workers pool to new size
-func (p *ReplicationPool) ResizeWorkers(n int) {
+// ResizeWorkers sets the replication workers pool to a new size.
+// checkOld can be set to the expected current size; if the worker
+// count changed in the meantime, the resize is skipped.
+func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
     p.mu.Lock()
     defer p.mu.Unlock()
 
-    for p.workerSize < n {
-        p.workerSize++
-        p.workerWg.Add(1)
-        go p.AddWorker()
+    if (checkOld > 0 && len(p.workers) != checkOld) || n == len(p.workers) || n < 1 {
+        // Either already satisfied or worker count changed while we waited for the lock.
+        return
     }
-    for p.workerSize > n {
-        p.workerSize--
-        go func() { p.workerKillCh <- struct{}{} }()
+    for len(p.workers) < n {
+        input := make(chan ReplicationWorkerOperation, 10000)
+        p.workers = append(p.workers, input)
+
+        p.workerWg.Add(1)
+        go p.AddWorker(input, &p.activeWorkers)
+    }
+    for len(p.workers) > n {
+        worker := p.workers[len(p.workers)-1]
+        p.workers = p.workers[:len(p.workers)-1]
+        close(worker)
     }
 }
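Since per-worker channels replace the old shared `workerKillCh`, resizing now works differently: growing appends a fresh channel and goroutine, shrinking closes the last channel so its worker drains and returns, and `checkOld` acts as an optimistic guard so the auto-scaling path cannot stomp on a concurrent resize. A simplified, runnable sketch of the grow/shrink mechanics (not the MinIO code):

```go
package main

import (
	"fmt"
	"sync"
)

type op struct{ name string }

// resize grows or shrinks a slice of worker channels the way the
// patched ResizeWorkers does: grow by starting a goroutine per new
// channel, shrink by closing the last channel so its worker returns.
func resize(workers []chan op, n int, wg *sync.WaitGroup) []chan op {
	for len(workers) < n {
		in := make(chan op, 16)
		workers = append(workers, in)
		wg.Add(1)
		go func(in <-chan op) {
			defer wg.Done()
			for o := range in { // a close() ends this loop
				fmt.Println("processed:", o.name)
			}
		}(in)
	}
	for len(workers) > n {
		last := workers[len(workers)-1]
		workers = workers[:len(workers)-1]
		close(last) // graceful stop: the worker drains, then exits
	}
	return workers
}

func main() {
	var wg sync.WaitGroup
	workers := resize(nil, 4, &wg)
	workers[0] <- op{"bucket/obj1"}
	workers = resize(workers, 0, &wg) // shrink to zero closes all channels
	wg.Wait()                         // workers finish queued work first
	fmt.Println("workers now:", len(workers))
}
```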
@@ -1707,8 +1714,8 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
     default:
         workers = WorkerAutoDefault
         mrfWorkers = MRFWorkerAutoDefault
-        if p.workerSize < WorkerAutoDefault {
-            workers = int(math.Min(float64(p.workerSize+1), WorkerAutoDefault))
+        if len(p.workers) < WorkerAutoDefault {
+            workers = int(math.Min(float64(len(p.workers)+1), WorkerAutoDefault))
         }
         if p.mrfWorkerSize < MRFWorkerAutoDefault {
             mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
@@ -1716,7 +1723,7 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
     }
     p.priority = pri
     p.mu.Unlock()
-    p.ResizeWorkers(workers)
+    p.ResizeWorkers(workers, 0)
     p.ResizeFailedWorkers(mrfWorkers)
 }
 
@@ -1736,28 +1743,38 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
     }
 }
 
+// getWorkerCh gets a worker channel deterministically based on bucket and object names.
+// Must be able to grab read lock from p (the caller must not already hold p.mu).
+func (p *ReplicationPool) getWorkerCh(bucket, object string) chan<- ReplicationWorkerOperation {
+    h := xxh3.HashString(bucket + object)
+    p.mu.RLock()
+    defer p.mu.RUnlock()
+    if len(p.workers) == 0 {
+        return nil
+    }
+    return p.workers[h%uint64(len(p.workers))]
+}
+
 func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
     if p == nil {
         return
     }
-    var ch, healCh chan ReplicateObjectInfo
+    var ch, healCh chan<- ReplicationWorkerOperation
     switch ri.OpType {
     case replication.ExistingObjectReplicationType:
-        ch = p.existingReplicaCh
+        ch = p.existingWorkers
     case replication.HealReplicationType:
         ch = p.mrfReplicaCh
-        healCh = p.replicaCh
+        healCh = p.getWorkerCh(ri.Bucket, ri.Name)
     default:
-        ch = p.replicaCh
+        ch = p.getWorkerCh(ri.Bucket, ri.Name)
     }
+    if ch == nil && healCh == nil {
+        return
+    }
+
     select {
-    case <-GlobalContext.Done():
-        p.once.Do(func() {
-            close(p.replicaCh)
-            close(p.mrfReplicaCh)
-            close(p.existingReplicaCh)
-            close(p.saveStateCh)
-        })
+    case <-p.ctx.Done():
     case healCh <- ri:
     case ch <- ri:
     default:
@@ -1773,9 +1790,10 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
         default:
             if p.ActiveWorkers() < WorkerMaxLimit {
                 p.mu.RLock()
-                workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+                workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
+                existing := len(p.workers)
                 p.mu.RUnlock()
-                p.ResizeWorkers(workers)
+                p.ResizeWorkers(workers, existing)
             }
             if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
                 p.mu.RLock()
@@ -1802,22 +1820,18 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
     if p == nil {
         return
     }
-    var ch chan DeletedObjectReplicationInfo
+    var ch chan<- ReplicationWorkerOperation
     switch doi.OpType {
     case replication.ExistingObjectReplicationType:
-        ch = p.existingReplicaDeleteCh
+        ch = p.existingWorkers
     case replication.HealReplicationType:
         fallthrough
     default:
-        ch = p.replicaDeleteCh
+        ch = p.getWorkerCh(doi.Bucket, doi.ObjectName)
     }
     select {
-    case <-GlobalContext.Done():
-        p.once.Do(func() {
-            close(p.replicaDeleteCh)
-            close(p.existingReplicaDeleteCh)
-        })
+    case <-p.ctx.Done():
     case ch <- doi:
     default:
         globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
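This is the heart of the commit title: `getWorkerCh` hashes `bucket+object` with xxh3 and reduces it modulo the worker count, so all operations on one object land on the same worker goroutine (preserving per-object ordering) while different objects spread across workers. A small demonstration using the same `github.com/zeebo/xxh3` dependency the patch imports:

```go
package main

import (
	"fmt"

	"github.com/zeebo/xxh3"
)

// pick returns a deterministic worker index for a bucket/object pair,
// mirroring getWorkerCh: the same names always map to the same worker.
func pick(bucket, object string, workers int) uint64 {
	return xxh3.HashString(bucket+object) % uint64(workers)
}

func main() {
	const workers = 8
	fmt.Println(pick("photos", "2022/cat.jpg", workers)) // stable across calls
	fmt.Println(pick("photos", "2022/cat.jpg", workers)) // same index again
	fmt.Println(pick("photos", "2022/dog.jpg", workers)) // usually a different index
}
```

One trade-off to be aware of: resizing the pool changes the modulus and remaps objects to other workers, which is presumably acceptable since affinity only matters for operations queued close together in time.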
@@ -1832,9 +1846,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
         default:
             if p.ActiveWorkers() < WorkerMaxLimit {
                 p.mu.RLock()
-                workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+                workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
+                existing := len(p.workers)
                 p.mu.RUnlock()
-                p.ResizeWorkers(workers)
+                p.ResizeWorkers(workers, existing)
             }
         }
     }
 }
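Finally, both queue paths stay non-blocking: the `select` with a `default` branch diverts the operation to the MRF (most recent failures) queue when the chosen worker's buffer is full, instead of stalling the caller. A tiny sketch of that backpressure pattern (channel names are illustrative):

```go
package main

import "fmt"

func main() {
	worker := make(chan string, 1) // deliberately tiny buffer
	mrf := make(chan string, 16)   // retry queue, like mrfSaveCh

	enqueue := func(task string) {
		select {
		case worker <- task:
			fmt.Println("queued to worker:", task)
		default:
			// Worker busy: record for retry instead of blocking the caller.
			mrf <- task
			fmt.Println("deferred to MRF queue:", task)
		}
	}

	enqueue("bucket/obj1") // fits in the worker buffer
	enqueue("bucket/obj2") // buffer full, falls through to MRF
}
```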