diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 3658cba38..a9c62ac72 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -22,6 +22,7 @@ import ( "net/http" "reflect" "strings" + "sync" "time" minio "github.com/minio/minio-go/v7" @@ -770,81 +771,102 @@ type DeletedObjectVersionInfo struct { DeletedObject Bucket string } -type replicationState struct { - // add future metrics here - replicaCh chan ObjectInfo - replicaDeleteCh chan DeletedObjectVersionInfo -} - -func (r *replicationState) queueReplicaTask(oi ObjectInfo) { - if r == nil { - return - } - select { - case r.replicaCh <- oi: - default: - } -} - -func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) { - if r == nil { - return - } - select { - case r.replicaDeleteCh <- doi: - default: - } -} var ( - globalReplicationState *replicationState + globalReplicationPool *ReplicationPool ) -func newReplicationState() *replicationState { - rs := &replicationState{ - replicaCh: make(chan ObjectInfo, 10000), - replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), - } - go func() { - <-GlobalContext.Done() - close(rs.replicaCh) - close(rs.replicaDeleteCh) - }() - return rs +// ReplicationPool describes replication pool +type ReplicationPool struct { + mu sync.Mutex + size int + replicaCh chan ObjectInfo + replicaDeleteCh chan DeletedObjectVersionInfo + killCh chan struct{} + wg sync.WaitGroup + ctx context.Context + objLayer ObjectLayer } -// addWorker creates a new worker to process tasks -func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) { - // Add a new worker. +// NewReplicationPool creates a pool of replication workers of specified size +func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool { + pool := &ReplicationPool{ + replicaCh: make(chan ObjectInfo, 10000), + replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), + ctx: ctx, + objLayer: o, + } go func() { - for { - select { - case <-ctx.Done(): - return - case oi, ok := <-r.replicaCh: - if !ok { - return - } - replicateObject(ctx, oi, objectAPI) - case doi, ok := <-r.replicaDeleteCh: - if !ok { - return - } - replicateDelete(ctx, doi, objectAPI) - } - } + <-ctx.Done() + close(pool.replicaCh) + close(pool.replicaDeleteCh) }() + pool.Resize(sz) + return pool +} + +// AddWorker adds a replication worker to the pool +func (p *ReplicationPool) AddWorker() { + defer p.wg.Done() + for { + select { + case <-p.ctx.Done(): + return + case oi, ok := <-p.replicaCh: + if !ok { + return + } + replicateObject(p.ctx, oi, p.objLayer) + case doi, ok := <-p.replicaDeleteCh: + if !ok { + return + } + replicateDelete(p.ctx, doi, p.objLayer) + case <-p.killCh: + return + } + } + +} + +//Resize replication pool to new size +func (p *ReplicationPool) Resize(n int) { + p.mu.Lock() + defer p.mu.Unlock() + + for p.size < n { + p.size++ + p.wg.Add(1) + go p.AddWorker() + } + for p.size > n { + p.size-- + go func() { p.killCh <- struct{}{} }() + } +} + +func (p *ReplicationPool) queueReplicaTask(oi ObjectInfo) { + if p == nil { + return + } + select { + case p.replicaCh <- oi: + default: + } +} + +func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) { + if p == nil { + return + } + select { + case p.replicaDeleteCh <- doi: + default: + } } func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { - if globalReplicationState == nil { - return - } - - // Start replication workers per count set in api config or MINIO_API_REPLICATION_WORKERS. - for i := 0; i < globalAPIConfig.getReplicationWorkers(); i++ { - globalReplicationState.addWorker(ctx, objectAPI) - } + globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationWorkers()) } // get Reader from replication target if active-active replication is in place and @@ -984,7 +1006,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, if sync { replicateObject(ctx, objInfo, o) } else { - globalReplicationState.queueReplicaTask(objInfo) + globalReplicationPool.queueReplicaTask(objInfo) } } @@ -992,6 +1014,6 @@ func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, if sync { replicateDelete(ctx, dv, o) } else { - globalReplicationState.queueReplicaDeleteTask(dv) + globalReplicationPool.queueReplicaDeleteTask(dv) } } diff --git a/cmd/common-main.go b/cmd/common-main.go index 2270c174d..79319ad8f 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -73,7 +73,6 @@ func init() { }, }) - globalReplicationState = newReplicationState() globalTransitionState = newTransitionState() console.SetColor("Debug", color.New()) diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index f387aa848..25b5c5cd0 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1076,10 +1076,10 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj switch oi.ReplicationStatus { case replication.Pending: sizeS.pendingSize += oi.Size - globalReplicationState.queueReplicaTask(oi) + globalReplicationPool.queueReplicaTask(oi) case replication.Failed: sizeS.failedSize += oi.Size - globalReplicationState.queueReplicaTask(oi) + globalReplicationPool.queueReplicaTask(oi) case replication.Completed, "COMPLETE": sizeS.replicatedSize += oi.Size case replication.Replica: @@ -1098,7 +1098,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, } else { versionID = oi.VersionID } - globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{ DeletedObject: DeletedObject{ ObjectName: oi.Name, DeleteMarkerVersionID: dmVersionID, diff --git a/cmd/handler-api.go b/cmd/handler-api.go index f14db6220..ee7067d11 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -80,6 +80,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { t.requestsDeadline = cfg.RequestsDeadline t.listQuorum = cfg.GetListQuorum() t.extendListLife = cfg.ExtendListLife + if globalReplicationPool != nil && + cfg.ReplicationWorkers != t.replicationWorkers { + globalReplicationPool.Resize(cfg.ReplicationWorkers) + } t.replicationWorkers = cfg.ReplicationWorkers }