add support for configurable replication MRF workers (#12125)
Just like the regular replication workers, allow the number of failed-replication (MRF) workers to be configurable, so that after events such as a DR-site outage the cluster can catch up on pending replication sooner once the DR site is back online.

Signed-off-by: Harshavardhana <harsha@minio.io>
Parent: 014e419151
Commit: 82dc6aff1c
@@ -36,9 +36,9 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool {
 
 // ReplicationStats holds the global in-memory replication stats
 type ReplicationStats struct {
-    sync.RWMutex
     Cache      map[string]*BucketReplicationStats
-    UsageCache map[string]*BucketReplicationStats // initial usage
+    UsageCache map[string]*BucketReplicationStats
+    sync.RWMutex
 }
 
 // Delete deletes in-memory replication statistics for a bucket.
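The reordering keeps the embedded sync.RWMutex alongside the maps it guards; every cache access still goes through the lock. A minimal, self-contained sketch of that access pattern, using simplified stand-in types rather than MinIO's actual methods:

package main

import (
    "fmt"
    "sync"
)

// BucketStats is a simplified stand-in for MinIO's BucketReplicationStats.
type BucketStats struct {
    PendingCount uint64
    FailedCount  uint64
}

// Stats mirrors the shape of ReplicationStats: maps guarded by an embedded RWMutex.
type Stats struct {
    Cache      map[string]*BucketStats
    UsageCache map[string]*BucketStats
    sync.RWMutex
}

// Get returns a copy of the cached stats for a bucket under a read lock.
func (s *Stats) Get(bucket string) BucketStats {
    s.RLock()
    defer s.RUnlock()
    if st, ok := s.Cache[bucket]; ok {
        return *st
    }
    return BucketStats{}
}

// Delete drops a bucket's entries under the write lock.
func (s *Stats) Delete(bucket string) {
    s.Lock()
    defer s.Unlock()
    delete(s.Cache, bucket)
    delete(s.UsageCache, bucket)
}

func main() {
    s := &Stats{
        Cache:      map[string]*BucketStats{"photos": {PendingCount: 2}},
        UsageCache: map[string]*BucketStats{},
    }
    fmt.Println(s.Get("photos").PendingCount) // 2
    s.Delete("photos")
    fmt.Println(s.Get("photos").PendingCount) // 0
}

Embedding the mutex lets callers write s.RLock() directly on the stats value, which is the convention the MinIO struct follows.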
@@ -819,32 +819,34 @@ var (
 
 // ReplicationPool describes replication pool
 type ReplicationPool struct {
-    once               sync.Once
-    mu                 sync.Mutex
-    size               int
+    objLayer           ObjectLayer
+    ctx                context.Context
+    mrfWorkerKillCh    chan struct{}
+    workerKillCh       chan struct{}
+    mrfReplicaDeleteCh chan DeletedObjectVersionInfo
     replicaCh          chan ReplicateObjectInfo
     replicaDeleteCh    chan DeletedObjectVersionInfo
     mrfReplicaCh       chan ReplicateObjectInfo
-    mrfReplicaDeleteCh chan DeletedObjectVersionInfo
-    killCh             chan struct{}
-    wg                 sync.WaitGroup
-    ctx                context.Context
-    objLayer           ObjectLayer
+    workerSize         int
+    mrfWorkerSize      int
+    workerWg           sync.WaitGroup
+    mrfWorkerWg        sync.WaitGroup
+    once               sync.Once
+    mu                 sync.Mutex
 }
 
 // NewReplicationPool creates a pool of replication workers of specified size
-func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool {
+func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
     pool := &ReplicationPool{
-        replicaCh:          make(chan ReplicateObjectInfo, 1000),
-        replicaDeleteCh:    make(chan DeletedObjectVersionInfo, 1000),
+        replicaCh:          make(chan ReplicateObjectInfo, 10000),
+        replicaDeleteCh:    make(chan DeletedObjectVersionInfo, 10000),
         mrfReplicaCh:       make(chan ReplicateObjectInfo, 100000),
         mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
         ctx:                ctx,
         objLayer:           o,
     }
-    pool.Resize(sz)
-    // add long running worker for handling most recent failures/pending replications
-    go pool.AddMRFWorker()
+    pool.ResizeWorkers(opts.Workers)
+    pool.ResizeFailedWorkers(opts.FailedWorkers)
     return pool
 }
 
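Taken together, the struct and constructor changes replace the single long-running MRF worker with a second, independently sized set of workers draining the failed-replication queues. A rough, self-contained sketch of that shape, with hypothetical stand-in types (Task, Pool, PoolOpts) rather than MinIO's real ones:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Task stands in for ReplicateObjectInfo / DeletedObjectVersionInfo.
type Task struct{ Object string }

// PoolOpts mirrors the new replicationPoolOpts: separate counts for the
// regular queue and the most-recent-failures (MRF) queue.
type PoolOpts struct {
    Workers       int
    FailedWorkers int
}

// Pool is a simplified replication pool: two buffered queues, each drained
// by its own set of workers.
type Pool struct {
    ctx         context.Context
    taskCh      chan Task
    mrfTaskCh   chan Task
    workerWg    sync.WaitGroup
    mrfWorkerWg sync.WaitGroup
}

func NewPool(ctx context.Context, opts PoolOpts) *Pool {
    p := &Pool{
        ctx:       ctx,
        taskCh:    make(chan Task, 10000),
        mrfTaskCh: make(chan Task, 100000),
    }
    for i := 0; i < opts.Workers; i++ {
        p.workerWg.Add(1)
        go p.worker(p.taskCh, &p.workerWg)
    }
    for i := 0; i < opts.FailedWorkers; i++ {
        p.mrfWorkerWg.Add(1)
        go p.worker(p.mrfTaskCh, &p.mrfWorkerWg)
    }
    return p
}

func (p *Pool) worker(ch chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-p.ctx.Done():
            return
        case t := <-ch:
            fmt.Println("replicating", t.Object) // stand-in for the real replication call
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    p := NewPool(ctx, PoolOpts{Workers: 2, FailedWorkers: 1})
    p.taskCh <- Task{Object: "bucket/object-1"}
    p.mrfTaskCh <- Task{Object: "bucket/object-2"} // previously failed, retried via the MRF queue
    time.Sleep(100 * time.Millisecond)             // let workers drain the queues
    cancel()
    p.workerWg.Wait()
    p.mrfWorkerWg.Wait()
}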
@@ -871,7 +873,7 @@ func (p *ReplicationPool) AddMRFWorker() {
 
 // AddWorker adds a replication worker to the pool
 func (p *ReplicationPool) AddWorker() {
-    defer p.wg.Done()
+    defer p.workerWg.Done()
     for {
         select {
         case <-p.ctx.Done():
@@ -886,26 +888,42 @@ func (p *ReplicationPool) AddWorker() {
                 return
             }
             replicateDelete(p.ctx, doi, p.objLayer)
-        case <-p.killCh:
+        case <-p.workerKillCh:
             return
         }
     }
 }
 
-//Resize replication pool to new size
-func (p *ReplicationPool) Resize(n int) {
+// ResizeWorkers sets replication workers pool to new size
+func (p *ReplicationPool) ResizeWorkers(n int) {
     p.mu.Lock()
     defer p.mu.Unlock()
 
-    for p.size < n {
-        p.size++
-        p.wg.Add(1)
+    for p.workerSize < n {
+        p.workerSize++
+        p.workerWg.Add(1)
         go p.AddWorker()
     }
-    for p.size > n {
-        p.size--
-        go func() { p.killCh <- struct{}{} }()
+    for p.workerSize > n {
+        p.workerSize--
+        go func() { p.workerKillCh <- struct{}{} }()
     }
 }
+
+// ResizeFailedWorkers sets replication failed workers pool size
+func (p *ReplicationPool) ResizeFailedWorkers(n int) {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    for p.mrfWorkerSize < n {
+        p.mrfWorkerSize++
+        p.mrfWorkerWg.Add(1)
+        go p.AddMRFWorker()
+    }
+    for p.mrfWorkerSize > n {
+        p.mrfWorkerSize--
+        go func() { p.mrfWorkerKillCh <- struct{}{} }()
+    }
+}
 
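Both resize functions use the same mechanism: grow by spawning worker goroutines, shrink by sending on a kill channel so that exactly one idle worker exits per signal. A standalone sketch of that mechanism under simplified, hypothetical types:

package main

import (
    "fmt"
    "sync"
    "time"
)

// WorkerPool is a stripped-down stand-in for ReplicationPool, keeping only
// what the resize mechanism needs.
type WorkerPool struct {
    mu     sync.Mutex
    size   int
    killCh chan struct{}
    wg     sync.WaitGroup
    taskCh chan string
}

func (p *WorkerPool) worker(id int) {
    defer p.wg.Done()
    for {
        select {
        case t := <-p.taskCh:
            fmt.Printf("worker %d handled %s\n", id, t)
        case <-p.killCh:
            // A resize-down request: one worker exits per signal.
            return
        }
    }
}

// Resize grows the pool by starting workers and shrinks it by queueing
// kill signals, mirroring ResizeWorkers/ResizeFailedWorkers.
func (p *WorkerPool) Resize(n int) {
    p.mu.Lock()
    defer p.mu.Unlock()

    for p.size < n {
        p.size++
        p.wg.Add(1)
        go p.worker(p.size)
    }
    for p.size > n {
        p.size--
        go func() { p.killCh <- struct{}{} }()
    }
}

func main() {
    p := &WorkerPool{
        killCh: make(chan struct{}),
        taskCh: make(chan string, 16),
    }
    p.Resize(4) // scale up, e.g. while a DR site catches up
    p.taskCh <- "bucket/object"
    time.Sleep(50 * time.Millisecond)
    p.Resize(1) // scale back down once the backlog drains
    time.Sleep(50 * time.Millisecond)
    fmt.Println("target worker count is now 1")
}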
@@ -943,8 +961,16 @@ func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi DeletedObjectVersionInfo) {
     }
 }
 
+type replicationPoolOpts struct {
+    Workers       int
+    FailedWorkers int
+}
+
 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
-    globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationWorkers())
+    globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
+        Workers:       globalAPIConfig.getReplicationWorkers(),
+        FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(),
+    })
     globalReplicationStats = NewReplicationStats(ctx, objectAPI)
 }
 
@@ -30,24 +30,26 @@ import (
 
 // API sub-system constants
 const (
-    apiRequestsMax             = "requests_max"
-    apiRequestsDeadline        = "requests_deadline"
-    apiClusterDeadline         = "cluster_deadline"
-    apiCorsAllowOrigin         = "cors_allow_origin"
-    apiRemoteTransportDeadline = "remote_transport_deadline"
-    apiListQuorum              = "list_quorum"
-    apiExtendListCacheLife     = "extend_list_cache_life"
-    apiReplicationWorkers      = "replication_workers"
+    apiRequestsMax              = "requests_max"
+    apiRequestsDeadline         = "requests_deadline"
+    apiClusterDeadline          = "cluster_deadline"
+    apiCorsAllowOrigin          = "cors_allow_origin"
+    apiRemoteTransportDeadline  = "remote_transport_deadline"
+    apiListQuorum               = "list_quorum"
+    apiExtendListCacheLife      = "extend_list_cache_life"
+    apiReplicationWorkers       = "replication_workers"
+    apiReplicationFailedWorkers = "replication_failed_workers"
 
-    EnvAPIRequestsMax             = "MINIO_API_REQUESTS_MAX"
-    EnvAPIRequestsDeadline        = "MINIO_API_REQUESTS_DEADLINE"
-    EnvAPIClusterDeadline         = "MINIO_API_CLUSTER_DEADLINE"
-    EnvAPICorsAllowOrigin         = "MINIO_API_CORS_ALLOW_ORIGIN"
-    EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
-    EnvAPIListQuorum              = "MINIO_API_LIST_QUORUM"
-    EnvAPIExtendListCacheLife     = "MINIO_API_EXTEND_LIST_CACHE_LIFE"
-    EnvAPISecureCiphers           = "MINIO_API_SECURE_CIPHERS"
-    EnvAPIReplicationWorkers      = "MINIO_API_REPLICATION_WORKERS"
+    EnvAPIRequestsMax              = "MINIO_API_REQUESTS_MAX"
+    EnvAPIRequestsDeadline         = "MINIO_API_REQUESTS_DEADLINE"
+    EnvAPIClusterDeadline          = "MINIO_API_CLUSTER_DEADLINE"
+    EnvAPICorsAllowOrigin          = "MINIO_API_CORS_ALLOW_ORIGIN"
+    EnvAPIRemoteTransportDeadline  = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
+    EnvAPIListQuorum               = "MINIO_API_LIST_QUORUM"
+    EnvAPIExtendListCacheLife      = "MINIO_API_EXTEND_LIST_CACHE_LIFE"
+    EnvAPISecureCiphers            = "MINIO_API_SECURE_CIPHERS"
+    EnvAPIReplicationWorkers       = "MINIO_API_REPLICATION_WORKERS"
+    EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS"
 )
 
 // Deprecated key and ENVs
@@ -91,19 +93,24 @@ var (
             Key:   apiReplicationWorkers,
             Value: "500",
         },
+        config.KV{
+            Key:   apiReplicationFailedWorkers,
+            Value: "4",
+        },
     }
 )
 
 // Config storage class configuration
 type Config struct {
-    RequestsMax             int           `json:"requests_max"`
-    RequestsDeadline        time.Duration `json:"requests_deadline"`
-    ClusterDeadline         time.Duration `json:"cluster_deadline"`
-    CorsAllowOrigin         []string      `json:"cors_allow_origin"`
-    RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"`
-    ListQuorum              string        `json:"list_strict_quorum"`
-    ExtendListLife          time.Duration `json:"extend_list_cache_life"`
-    ReplicationWorkers      int           `json:"replication_workers"`
+    RequestsMax              int           `json:"requests_max"`
+    RequestsDeadline         time.Duration `json:"requests_deadline"`
+    ClusterDeadline          time.Duration `json:"cluster_deadline"`
+    CorsAllowOrigin          []string      `json:"cors_allow_origin"`
+    RemoteTransportDeadline  time.Duration `json:"remote_transport_deadline"`
+    ListQuorum               string        `json:"list_strict_quorum"`
+    ExtendListLife           time.Duration `json:"extend_list_cache_life"`
+    ReplicationWorkers       int           `json:"replication_workers"`
+    ReplicationFailedWorkers int           `json:"replication_failed_workers"`
 }
 
 // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
@@ -190,14 +197,24 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
         return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1")
     }
 
+    replicationFailedWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationFailedWorkers, kvs.Get(apiReplicationFailedWorkers)))
+    if err != nil {
+        return cfg, err
+    }
+
+    if replicationFailedWorkers <= 0 {
+        return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication failed workers should be 1")
+    }
+
     return Config{
-        RequestsMax:             requestsMax,
-        RequestsDeadline:        requestsDeadline,
-        ClusterDeadline:         clusterDeadline,
-        CorsAllowOrigin:         corsAllowOrigin,
-        RemoteTransportDeadline: remoteTransportDeadline,
-        ListQuorum:              listQuorum,
-        ExtendListLife:          listLife,
-        ReplicationWorkers:      replicationWorkers,
+        RequestsMax:              requestsMax,
+        RequestsDeadline:         requestsDeadline,
+        ClusterDeadline:          clusterDeadline,
+        CorsAllowOrigin:          corsAllowOrigin,
+        RemoteTransportDeadline:  remoteTransportDeadline,
+        ListQuorum:               listQuorum,
+        ExtendListLife:           listLife,
+        ReplicationWorkers:       replicationWorkers,
+        ReplicationFailedWorkers: replicationFailedWorkers,
     }, nil
 }
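The lookup follows the usual precedence: the environment variable overrides the stored config key, the raw value is parsed with strconv.Atoi, and anything below 1 is rejected. A self-contained sketch of that precedence and validation, using os.LookupEnv in place of MinIO's env and config helpers:

package main

import (
    "fmt"
    "os"
    "strconv"
)

// lookupIntSetting resolves a numeric setting: environment variable first,
// then the stored config value, then the compiled-in default, and finally
// validates the minimum.
func lookupIntSetting(envKey, stored, def string, min int) (int, error) {
    raw := def
    if stored != "" {
        raw = stored
    }
    if v, ok := os.LookupEnv(envKey); ok && v != "" {
        raw = v
    }
    n, err := strconv.Atoi(raw)
    if err != nil {
        return 0, fmt.Errorf("invalid value %q for %s: %w", raw, envKey, err)
    }
    if n < min {
        return 0, fmt.Errorf("%s must be at least %d", envKey, min)
    }
    return n, nil
}

func main() {
    // Mirrors replication_failed_workers: default "4", minimum 1.
    n, err := lookupIntSetting("MINIO_API_REPLICATION_FAILED_WORKERS", "", "4", 1)
    if err != nil {
        fmt.Println("config error:", err)
        return
    }
    fmt.Println("failed-replication workers:", n)
}

With neither the environment variable nor a stored value set, the default of "4" applies, matching the config.KV default added above.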
@@ -52,5 +52,11 @@ var (
             Optional:    true,
             Type:        "number",
         },
+        config.HelpKV{
+            Key:         apiReplicationFailedWorkers,
+            Description: `set the number of replication workers for recently failed replicas, defaults to 4`,
+            Optional:    true,
+            Type:        "number",
+        },
     }
 )
@@ -37,8 +37,9 @@ type apiConfig struct {
     extendListLife   time.Duration
     corsAllowOrigins []string
     // total drives per erasure set across pools.
-    totalDriveCount    int
-    replicationWorkers int
+    totalDriveCount          int
+    replicationWorkers       int
+    replicationFailedWorkers int
 }
 
 func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
@@ -83,8 +84,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
     t.extendListLife = cfg.ExtendListLife
     if globalReplicationPool != nil &&
         cfg.ReplicationWorkers != t.replicationWorkers {
-        globalReplicationPool.Resize(cfg.ReplicationWorkers)
+        globalReplicationPool.ResizeFailedWorkers(cfg.ReplicationFailedWorkers)
+        globalReplicationPool.ResizeWorkers(cfg.ReplicationWorkers)
     }
+    t.replicationFailedWorkers = cfg.ReplicationFailedWorkers
     t.replicationWorkers = cfg.ReplicationWorkers
 }
 
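Because the api subsystem is dynamic, a config update resizes the already-running pool instead of requiring a restart. A condensed sketch of that apply-on-reload pattern, with a hypothetical Resizer interface standing in for the global replication pool:

package main

import (
    "fmt"
    "sync"
)

// Resizer stands in for the global replication pool's resize methods.
type Resizer interface {
    ResizeWorkers(n int)
    ResizeFailedWorkers(n int)
}

// logResizer just records the requested sizes, for demonstration.
type logResizer struct{}

func (logResizer) ResizeWorkers(n int)       { fmt.Println("workers ->", n) }
func (logResizer) ResizeFailedWorkers(n int) { fmt.Println("failed workers ->", n) }

// apiSettings holds the currently applied values, guarded like apiConfig.
type apiSettings struct {
    mu                       sync.Mutex
    replicationWorkers       int
    replicationFailedWorkers int
}

// apply pushes new values to the pool when they change, then remembers them.
func (t *apiSettings) apply(pool Resizer, workers, failedWorkers int) {
    t.mu.Lock()
    defer t.mu.Unlock()
    if pool != nil && workers != t.replicationWorkers {
        pool.ResizeFailedWorkers(failedWorkers)
        pool.ResizeWorkers(workers)
    }
    t.replicationFailedWorkers = failedWorkers
    t.replicationWorkers = workers
}

func main() {
    var s apiSettings
    s.apply(logResizer{}, 500, 4) // initial load
    s.apply(logResizer{}, 500, 4) // unchanged: no resize calls
    s.apply(logResizer{}, 800, 8) // changed: pool resized in place
}

As in the diff, the guard compares only the regular worker count; both resize calls run whenever that value changes.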
@@ -166,6 +169,13 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc {
     }
 }
 
+func (t *apiConfig) getReplicationFailedWorkers() int {
+    t.mu.RLock()
+    defer t.mu.RUnlock()
+
+    return t.replicationFailedWorkers
+}
+
 func (t *apiConfig) getReplicationWorkers() int {
     t.mu.RLock()
     defer t.mu.RUnlock()