distribute replication ops based on names (#16083)

This commit is contained in:
Klaus Post 2022-11-18 00:20:09 +01:00 committed by GitHub
parent b7bb122be8
commit a22b4adf4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -48,6 +48,7 @@ import (
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/zeebo/xxh3"
)
const (
@ -1509,29 +1510,33 @@ var (
// ReplicationPool describes replication pool
type ReplicationPool struct {
// atomics:
// atomic ops:
activeWorkers int32
activeMRFWorkers int32
objLayer ObjectLayer
ctx context.Context
mrfWorkerKillCh chan struct{}
workerKillCh chan struct{}
replicaCh chan ReplicateObjectInfo
replicaDeleteCh chan DeletedObjectReplicationInfo
mrfReplicaCh chan ReplicateObjectInfo
existingReplicaCh chan ReplicateObjectInfo
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
mrfSaveCh chan MRFReplicateEntry
saveStateCh chan struct{}
workerSize int
mrfWorkerSize int
priority string
resyncer *replicationResyncer
workerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup
once sync.Once
mu sync.RWMutex
resyncer *replicationResyncer
// workers:
workers []chan ReplicationWorkerOperation
existingWorkers chan ReplicationWorkerOperation
workerWg sync.WaitGroup
// mrf:
mrfWorkerKillCh chan struct{}
mrfReplicaCh chan ReplicationWorkerOperation
mrfSaveCh chan MRFReplicateEntry
mrfWorkerSize int
saveStateCh chan struct{}
mrfWorkerWg sync.WaitGroup
}
// ReplicationWorkerOperation is a shared interface of replication operations.
type ReplicationWorkerOperation interface {
ToMRFEntry() MRFReplicateEntry
}
const (
@ -1572,14 +1577,12 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
workers = WorkerAutoDefault
failedWorkers = MRFWorkerAutoDefault
}
pool := &ReplicationPool{
replicaCh: make(chan ReplicateObjectInfo, 100000),
replicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
mrfReplicaCh: make(chan ReplicateObjectInfo, 100000),
workerKillCh: make(chan struct{}, workers),
workers: make([]chan ReplicationWorkerOperation, 0, workers),
existingWorkers: make(chan ReplicationWorkerOperation, 100000),
mrfReplicaCh: make(chan ReplicationWorkerOperation, 100000),
mrfWorkerKillCh: make(chan struct{}, failedWorkers),
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
resyncer: newresyncer(),
mrfSaveCh: make(chan MRFReplicateEntry, 100000),
saveStateCh: make(chan struct{}, 1),
@ -1588,9 +1591,10 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
priority: priority,
}
pool.ResizeWorkers(workers)
pool.ResizeWorkers(workers, 0)
pool.ResizeFailedWorkers(failedWorkers)
go pool.AddExistingObjectReplicateWorker()
pool.workerWg.Add(1)
go pool.AddWorker(pool.existingWorkers, nil)
go pool.resyncer.PersistToDisk(ctx, o)
go pool.processMRF()
go pool.persistMRF()
@ -1610,59 +1614,53 @@ func (p *ReplicationPool) AddMRFWorker() {
if !ok {
return
}
switch v := oi.(type) {
case ReplicateObjectInfo:
atomic.AddInt32(&p.activeMRFWorkers, 1)
replicateObject(p.ctx, oi, p.objLayer)
replicateObject(p.ctx, v, p.objLayer)
atomic.AddInt32(&p.activeMRFWorkers, -1)
default:
logger.LogOnceIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type")
}
case <-p.mrfWorkerKillCh:
return
}
}
}
// AddWorker adds a replication worker to the pool
func (p *ReplicationPool) AddWorker() {
// AddWorker adds a replication worker to the pool.
// An optional pointer to a tracker that will be atomically
// incremented when operations are running can be provided.
func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
defer p.workerWg.Done()
for {
select {
case <-p.ctx.Done():
return
case oi, ok := <-p.replicaCh:
case oi, ok := <-input:
if !ok {
return
}
atomic.AddInt32(&p.activeWorkers, 1)
replicateObject(p.ctx, oi, p.objLayer)
atomic.AddInt32(&p.activeWorkers, -1)
case doi, ok := <-p.replicaDeleteCh:
if !ok {
return
switch v := oi.(type) {
case ReplicateObjectInfo:
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
atomic.AddInt32(&p.activeWorkers, 1)
replicateDelete(p.ctx, doi, p.objLayer)
atomic.AddInt32(&p.activeWorkers, -1)
case <-p.workerKillCh:
return
replicateObject(p.ctx, v, p.objLayer)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
}
case DeletedObjectReplicationInfo:
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
replicateDelete(p.ctx, v, p.objLayer)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
}
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
for {
select {
case <-p.ctx.Done():
return
case oi, ok := <-p.existingReplicaCh:
if !ok {
return
default:
logger.LogOnceIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
}
replicateObject(p.ctx, oi, p.objLayer)
case doi, ok := <-p.existingReplicaDeleteCh:
if !ok {
return
}
replicateDelete(p.ctx, doi, p.objLayer)
}
}
}
@ -1677,19 +1675,28 @@ func (p *ReplicationPool) ActiveMRFWorkers() int {
return int(atomic.LoadInt32(&p.activeMRFWorkers))
}
// ResizeWorkers sets replication workers pool to new size
func (p *ReplicationPool) ResizeWorkers(n int) {
// ResizeWorkers sets replication workers pool to new size.
// checkOld can be set to an expected value.
// If the worker count changed
func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
p.mu.Lock()
defer p.mu.Unlock()
for p.workerSize < n {
p.workerSize++
p.workerWg.Add(1)
go p.AddWorker()
if (checkOld > 0 && len(p.workers) != checkOld) || n == len(p.workers) || n < 1 {
// Either already satisfied or worker count changed while we waited for the lock.
return
}
for p.workerSize > n {
p.workerSize--
go func() { p.workerKillCh <- struct{}{} }()
for len(p.workers) < n {
input := make(chan ReplicationWorkerOperation, 10000)
p.workers = append(p.workers, input)
p.workerWg.Add(1)
go p.AddWorker(input, &p.activeWorkers)
}
for len(p.workers) > n {
worker := p.workers[len(p.workers)-1]
p.workers = p.workers[:len(p.workers)-1]
close(worker)
}
}
@ -1707,8 +1714,8 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
default:
workers = WorkerAutoDefault
mrfWorkers = MRFWorkerAutoDefault
if p.workerSize < WorkerAutoDefault {
workers = int(math.Min(float64(p.workerSize+1), WorkerAutoDefault))
if len(p.workers) < WorkerAutoDefault {
workers = int(math.Min(float64(len(p.workers)+1), WorkerAutoDefault))
}
if p.mrfWorkerSize < MRFWorkerAutoDefault {
mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
@ -1716,7 +1723,7 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
}
p.priority = pri
p.mu.Unlock()
p.ResizeWorkers(workers)
p.ResizeWorkers(workers, 0)
p.ResizeFailedWorkers(mrfWorkers)
}
@ -1736,28 +1743,38 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
}
}
// getWorkerCh gets a worker channel deterministically based on bucket and object names.
// Must be able to grab read lock from p.
func (p *ReplicationPool) getWorkerCh(bucket, object string) chan<- ReplicationWorkerOperation {
h := xxh3.HashString(bucket + object)
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.workers) == 0 {
return nil
}
return p.workers[h%uint64(len(p.workers))]
}
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
if p == nil {
return
}
var ch, healCh chan ReplicateObjectInfo
var ch, healCh chan<- ReplicationWorkerOperation
switch ri.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaCh
ch = p.existingWorkers
case replication.HealReplicationType:
ch = p.mrfReplicaCh
healCh = p.replicaCh
healCh = p.getWorkerCh(ri.Name, ri.Bucket)
default:
ch = p.replicaCh
ch = p.getWorkerCh(ri.Name, ri.Bucket)
}
if ch == nil && healCh == nil {
return
}
select {
case <-GlobalContext.Done():
p.once.Do(func() {
close(p.replicaCh)
close(p.mrfReplicaCh)
close(p.existingReplicaCh)
close(p.saveStateCh)
})
case <-p.ctx.Done():
case healCh <- ri:
case ch <- ri:
default:
@ -1773,9 +1790,10 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
default:
if p.ActiveWorkers() < WorkerMaxLimit {
p.mu.RLock()
workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
existing := len(p.workers)
p.mu.RUnlock()
p.ResizeWorkers(workers)
p.ResizeWorkers(workers, existing)
}
if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
p.mu.RLock()
@ -1802,22 +1820,18 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
if p == nil {
return
}
var ch chan DeletedObjectReplicationInfo
var ch chan<- ReplicationWorkerOperation
switch doi.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaDeleteCh
ch = p.existingWorkers
case replication.HealReplicationType:
fallthrough
default:
ch = p.replicaDeleteCh
ch = p.getWorkerCh(doi.Bucket, doi.ObjectName)
}
select {
case <-GlobalContext.Done():
p.once.Do(func() {
close(p.replicaDeleteCh)
close(p.existingReplicaDeleteCh)
})
case <-p.ctx.Done():
case ch <- doi:
default:
globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
@ -1832,9 +1846,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
default:
if p.ActiveWorkers() < WorkerMaxLimit {
p.mu.RLock()
workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
existing := len(p.workers)
p.mu.RUnlock()
p.ResizeWorkers(workers)
p.ResizeWorkers(workers, existing)
}
}
}