resize replication worker pool dynamically after config update (#11737)

Poorna Krishnamoorthy 2021-03-09 02:56:42 -08:00 committed by GitHub
parent 209fe61dcc
commit 2f29719e6b
4 changed files with 95 additions and 70 deletions

View File

@@ -22,6 +22,7 @@ import (
 	"net/http"
 	"reflect"
 	"strings"
+	"sync"
 	"time"
 
 	minio "github.com/minio/minio-go/v7"
@@ -770,81 +771,102 @@ type DeletedObjectVersionInfo struct {
 	DeletedObject
 	Bucket string
 }
 
-type replicationState struct {
-	// add future metrics here
-	replicaCh       chan ObjectInfo
-	replicaDeleteCh chan DeletedObjectVersionInfo
-}
-
-func (r *replicationState) queueReplicaTask(oi ObjectInfo) {
-	if r == nil {
-		return
-	}
-	select {
-	case r.replicaCh <- oi:
-	default:
-	}
-}
-
-func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
-	if r == nil {
-		return
-	}
-	select {
-	case r.replicaDeleteCh <- doi:
-	default:
-	}
-}
-
 var (
-	globalReplicationState *replicationState
+	globalReplicationPool *ReplicationPool
 )
 
-func newReplicationState() *replicationState {
-	rs := &replicationState{
-		replicaCh:       make(chan ObjectInfo, 10000),
-		replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
-	}
-	go func() {
-		<-GlobalContext.Done()
-		close(rs.replicaCh)
-		close(rs.replicaDeleteCh)
-	}()
-	return rs
+// ReplicationPool describes replication pool
+type ReplicationPool struct {
+	mu              sync.Mutex
+	size            int
+	replicaCh       chan ObjectInfo
+	replicaDeleteCh chan DeletedObjectVersionInfo
+	killCh          chan struct{}
+	wg              sync.WaitGroup
+	ctx             context.Context
+	objLayer        ObjectLayer
 }
 
-// addWorker creates a new worker to process tasks
-func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) {
-	// Add a new worker.
+// NewReplicationPool creates a pool of replication workers of specified size
+func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool {
+	pool := &ReplicationPool{
+		replicaCh:       make(chan ObjectInfo, 10000),
+		replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
+		ctx:             ctx,
+		objLayer:        o,
+	}
 	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case oi, ok := <-r.replicaCh:
-				if !ok {
-					return
-				}
-				replicateObject(ctx, oi, objectAPI)
-			case doi, ok := <-r.replicaDeleteCh:
-				if !ok {
-					return
-				}
-				replicateDelete(ctx, doi, objectAPI)
-			}
-		}
+		<-ctx.Done()
+		close(pool.replicaCh)
+		close(pool.replicaDeleteCh)
 	}()
+	pool.Resize(sz)
+	return pool
+}
+
+// AddWorker adds a replication worker to the pool
+func (p *ReplicationPool) AddWorker() {
+	defer p.wg.Done()
+	for {
+		select {
+		case <-p.ctx.Done():
+			return
+		case oi, ok := <-p.replicaCh:
+			if !ok {
+				return
+			}
+			replicateObject(p.ctx, oi, p.objLayer)
+		case doi, ok := <-p.replicaDeleteCh:
+			if !ok {
+				return
+			}
+			replicateDelete(p.ctx, doi, p.objLayer)
+		case <-p.killCh:
+			return
+		}
+	}
+}
+
+//Resize replication pool to new size
+func (p *ReplicationPool) Resize(n int) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for p.size < n {
+		p.size++
+		p.wg.Add(1)
+		go p.AddWorker()
+	}
+	for p.size > n {
+		p.size--
+		go func() { p.killCh <- struct{}{} }()
+	}
+}
+
+func (p *ReplicationPool) queueReplicaTask(oi ObjectInfo) {
+	if p == nil {
+		return
+	}
+	select {
+	case p.replicaCh <- oi:
+	default:
+	}
+}
+
+func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
+	if p == nil {
+		return
+	}
+	select {
+	case p.replicaDeleteCh <- doi:
+	default:
+	}
 }
 
 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
-	if globalReplicationState == nil {
-		return
-	}
-
-	// Start replication workers per count set in api config or MINIO_API_REPLICATION_WORKERS.
-	for i := 0; i < globalAPIConfig.getReplicationWorkers(); i++ {
-		globalReplicationState.addWorker(ctx, objectAPI)
-	}
+	globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationWorkers())
 }
 
 // get Reader from replication target if active-active replication is in place and
@@ -984,7 +1006,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
 	if sync {
 		replicateObject(ctx, objInfo, o)
 	} else {
-		globalReplicationState.queueReplicaTask(objInfo)
+		globalReplicationPool.queueReplicaTask(objInfo)
 	}
 }
@@ -992,6 +1014,6 @@ func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo,
 	if sync {
 		replicateDelete(ctx, dv, o)
 	} else {
-		globalReplicationState.queueReplicaDeleteTask(dv)
+		globalReplicationPool.queueReplicaDeleteTask(dv)
 	}
 }
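
Taken together, this file replaces the fixed set of workers with a pool whose size can change at runtime: Resize grows the pool by spawning workers registered with the WaitGroup, and shrinks it by sending one signal per surplus worker on killCh. Below is a minimal, self-contained sketch of the same grow/shrink pattern; workerPool, taskCh, and the int task type are hypothetical stand-ins for ReplicationPool, its channels, and ObjectInfo, not MinIO code. One detail the sketch makes explicit: the kill channel is created with make, since the pool literal in the excerpt above leaves killCh nil, and a send on a nil channel blocks forever.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// workerPool is a stand-in for ReplicationPool with a toy task type.
type workerPool struct {
	mu     sync.Mutex
	size   int
	taskCh chan int      // stands in for replicaCh/replicaDeleteCh
	killCh chan struct{} // must be non-nil for shrink signals to be delivered
	wg     sync.WaitGroup
	ctx    context.Context
}

func newWorkerPool(ctx context.Context, sz int) *workerPool {
	p := &workerPool{
		taskCh: make(chan int, 100),
		killCh: make(chan struct{}),
		ctx:    ctx,
	}
	go func() {
		<-ctx.Done()
		close(p.taskCh) // mirror NewReplicationPool: close task channels on shutdown
	}()
	p.resize(sz)
	return p
}

func (p *workerPool) worker() {
	defer p.wg.Done()
	for {
		select {
		case <-p.ctx.Done():
			return
		case t, ok := <-p.taskCh:
			if !ok {
				return
			}
			fmt.Println("processed", t) // stands in for replicateObject/replicateDelete
		case <-p.killCh:
			return // exactly one surplus worker exits per signal sent by resize
		}
	}
}

func (p *workerPool) resize(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.size < n { // grow: spawn and track new workers
		p.size++
		p.wg.Add(1)
		go p.worker()
	}
	for p.size > n { // shrink: one async kill signal per surplus worker
		p.size--
		go func() { p.killCh <- struct{}{} }()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p := newWorkerPool(ctx, 4)
	for i := 0; i < 8; i++ {
		p.taskCh <- i // buffered: never blocks here
	}
	p.resize(1) // three of the four workers drain a kill signal and exit
	time.Sleep(200 * time.Millisecond)
}

Sending the kill signal from a fresh goroutine keeps Resize itself non-blocking: if every worker is mid-task, the signal simply waits until one of them returns to its select.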

View File

@@ -73,7 +73,6 @@ func init() {
 		},
 	})
 
-	globalReplicationState = newReplicationState()
 	globalTransitionState = newTransitionState()
 
 	console.SetColor("Debug", color.New())

View File

@@ -1076,10 +1076,10 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
 	switch oi.ReplicationStatus {
 	case replication.Pending:
 		sizeS.pendingSize += oi.Size
-		globalReplicationState.queueReplicaTask(oi)
+		globalReplicationPool.queueReplicaTask(oi)
 	case replication.Failed:
 		sizeS.failedSize += oi.Size
-		globalReplicationState.queueReplicaTask(oi)
+		globalReplicationPool.queueReplicaTask(oi)
 	case replication.Completed, "COMPLETE":
 		sizeS.replicatedSize += oi.Size
 	case replication.Replica:
@@ -1098,7 +1098,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer,
 	} else {
 		versionID = oi.VersionID
 	}
-	globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
+	globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{
 		DeletedObject: DeletedObject{
 			ObjectName:            oi.Name,
 			DeleteMarkerVersionID: dmVersionID,
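
The scanner hunks are a pure rename at the call sites, but they are also where the pool's backpressure behavior matters: healReplication re-queues every Pending or Failed object on each scan, and both queueReplica* methods enqueue with a non-blocking send, so once the 10000-entry buffer is full further tasks are dropped rather than stalling the scanner, and get picked up again on the next scan cycle. A tiny sketch of that send pattern (hypothetical helper, not from the MinIO tree; int stands in for ObjectInfo):

// tryEnqueue mirrors queueReplicaTask: the default arm makes the send
// non-blocking, returning immediately when the buffer is full.
func tryEnqueue(ch chan<- int, task int) bool {
	select {
	case ch <- task:
		return true // queued for a worker
	default:
		return false // buffer full: dropped; a later scan pass re-queues it
	}
}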

View File

@@ -80,6 +80,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 	t.requestsDeadline = cfg.RequestsDeadline
 	t.listQuorum = cfg.GetListQuorum()
 	t.extendListLife = cfg.ExtendListLife
+	if globalReplicationPool != nil &&
+		cfg.ReplicationWorkers != t.replicationWorkers {
+		globalReplicationPool.Resize(cfg.ReplicationWorkers)
+	}
 	t.replicationWorkers = cfg.ReplicationWorkers
 }
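
This last hunk is what makes the pool dynamic: apiConfig.init runs again whenever the api subsystem configuration is reloaded, so an updated cfg.ReplicationWorkers (the setting behind MINIO_API_REPLICATION_WORKERS) resizes the running pool with no server restart. The nil check covers startup, when init can run before initBackgroundReplication has created the pool, and the inequality check avoids tearing workers up and down when an unrelated api setting changed. Condensed into a hypothetical helper:

// applyReplicationWorkers condenses the guard in apiConfig.init above:
// resize only a live pool, and only when the configured count changed.
func applyReplicationWorkers(pool *ReplicationPool, prev, next int) int {
	if pool != nil && next != prev {
		pool.Resize(next)
	}
	return next // caller stores this back as t.replicationWorkers
}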