diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 255173e86..d069658d0 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -36,9 +36,9 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool { // ReplicationStats holds the global in-memory replication stats type ReplicationStats struct { - sync.RWMutex Cache map[string]*BucketReplicationStats - UsageCache map[string]*BucketReplicationStats // initial usage + UsageCache map[string]*BucketReplicationStats + sync.RWMutex } // Delete deletes in-memory replication statistics for a bucket. diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index bc0808549..0d704a61f 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -819,32 +819,34 @@ var ( // ReplicationPool describes replication pool type ReplicationPool struct { - once sync.Once - mu sync.Mutex - size int + objLayer ObjectLayer + ctx context.Context + mrfWorkerKillCh chan struct{} + workerKillCh chan struct{} + mrfReplicaDeleteCh chan DeletedObjectVersionInfo replicaCh chan ReplicateObjectInfo replicaDeleteCh chan DeletedObjectVersionInfo mrfReplicaCh chan ReplicateObjectInfo - mrfReplicaDeleteCh chan DeletedObjectVersionInfo - killCh chan struct{} - wg sync.WaitGroup - ctx context.Context - objLayer ObjectLayer + workerSize int + mrfWorkerSize int + workerWg sync.WaitGroup + mrfWorkerWg sync.WaitGroup + once sync.Once + mu sync.Mutex } // NewReplicationPool creates a pool of replication workers of specified size -func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool { +func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool { pool := &ReplicationPool{ - replicaCh: make(chan ReplicateObjectInfo, 1000), - replicaDeleteCh: make(chan DeletedObjectVersionInfo, 1000), + replicaCh: make(chan ReplicateObjectInfo, 10000), + replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), mrfReplicaCh: make(chan ReplicateObjectInfo, 100000), mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000), ctx: ctx, objLayer: o, } - pool.Resize(sz) - // add long running worker for handling most recent failures/pending replications - go pool.AddMRFWorker() + pool.ResizeWorkers(opts.Workers) + pool.ResizeFailedWorkers(opts.FailedWorkers) return pool } @@ -871,7 +873,7 @@ func (p *ReplicationPool) AddMRFWorker() { // AddWorker adds a replication worker to the pool func (p *ReplicationPool) AddWorker() { - defer p.wg.Done() + defer p.workerWg.Done() for { select { case <-p.ctx.Done(): @@ -886,26 +888,42 @@ func (p *ReplicationPool) AddWorker() { return } replicateDelete(p.ctx, doi, p.objLayer) - case <-p.killCh: + case <-p.workerKillCh: return } } } -//Resize replication pool to new size -func (p *ReplicationPool) Resize(n int) { +// ResizeWorkers sets replication workers pool to new size +func (p *ReplicationPool) ResizeWorkers(n int) { p.mu.Lock() defer p.mu.Unlock() - for p.size < n { - p.size++ - p.wg.Add(1) + for p.workerSize < n { + p.workerSize++ + p.workerWg.Add(1) go p.AddWorker() } - for p.size > n { - p.size-- - go func() { p.killCh <- struct{}{} }() + for p.workerSize > n { + p.workerSize-- + go func() { p.workerKillCh <- struct{}{} }() + } +} + +// ResizeFailedWorkers sets replication failed workers pool size +func (p *ReplicationPool) ResizeFailedWorkers(n int) { + p.mu.Lock() + defer p.mu.Unlock() + + for p.mrfWorkerSize < n { + p.mrfWorkerSize++ + p.mrfWorkerWg.Add(1) + go p.AddMRFWorker() + } + for p.mrfWorkerSize > n { + p.mrfWorkerSize-- + go func() { p.mrfWorkerKillCh <- struct{}{} }() } } @@ -943,8 +961,16 @@ func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi Delete } } +type replicationPoolOpts struct { + Workers int + FailedWorkers int +} + func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { - globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationWorkers()) + globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{ + Workers: globalAPIConfig.getReplicationWorkers(), + FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(), + }) globalReplicationStats = NewReplicationStats(ctx, objectAPI) } diff --git a/cmd/config/api/api.go b/cmd/config/api/api.go index 4b1f5225e..a64d4873d 100644 --- a/cmd/config/api/api.go +++ b/cmd/config/api/api.go @@ -30,24 +30,26 @@ import ( // API sub-system constants const ( - apiRequestsMax = "requests_max" - apiRequestsDeadline = "requests_deadline" - apiClusterDeadline = "cluster_deadline" - apiCorsAllowOrigin = "cors_allow_origin" - apiRemoteTransportDeadline = "remote_transport_deadline" - apiListQuorum = "list_quorum" - apiExtendListCacheLife = "extend_list_cache_life" - apiReplicationWorkers = "replication_workers" + apiRequestsMax = "requests_max" + apiRequestsDeadline = "requests_deadline" + apiClusterDeadline = "cluster_deadline" + apiCorsAllowOrigin = "cors_allow_origin" + apiRemoteTransportDeadline = "remote_transport_deadline" + apiListQuorum = "list_quorum" + apiExtendListCacheLife = "extend_list_cache_life" + apiReplicationWorkers = "replication_workers" + apiReplicationFailedWorkers = "replication_failed_workers" - EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" - EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" - EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE" - EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" - EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE" - EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" - EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE" - EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" - EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" + EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" + EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" + EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE" + EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" + EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE" + EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" + EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE" + EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" + EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" + EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS" ) // Deprecated key and ENVs @@ -91,19 +93,24 @@ var ( Key: apiReplicationWorkers, Value: "500", }, + config.KV{ + Key: apiReplicationFailedWorkers, + Value: "4", + }, } ) // Config storage class configuration type Config struct { - RequestsMax int `json:"requests_max"` - RequestsDeadline time.Duration `json:"requests_deadline"` - ClusterDeadline time.Duration `json:"cluster_deadline"` - CorsAllowOrigin []string `json:"cors_allow_origin"` - RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` - ListQuorum string `json:"list_strict_quorum"` - ExtendListLife time.Duration `json:"extend_list_cache_life"` - ReplicationWorkers int `json:"replication_workers"` + RequestsMax int `json:"requests_max"` + RequestsDeadline time.Duration `json:"requests_deadline"` + ClusterDeadline time.Duration `json:"cluster_deadline"` + CorsAllowOrigin []string `json:"cors_allow_origin"` + RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` + ListQuorum string `json:"list_strict_quorum"` + ExtendListLife time.Duration `json:"extend_list_cache_life"` + ReplicationWorkers int `json:"replication_workers"` + ReplicationFailedWorkers int `json:"replication_failed_workers"` } // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. @@ -190,14 +197,24 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1") } + replicationFailedWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationFailedWorkers, kvs.Get(apiReplicationFailedWorkers))) + if err != nil { + return cfg, err + } + + if replicationFailedWorkers <= 0 { + return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication failed workers should be 1") + } + return Config{ - RequestsMax: requestsMax, - RequestsDeadline: requestsDeadline, - ClusterDeadline: clusterDeadline, - CorsAllowOrigin: corsAllowOrigin, - RemoteTransportDeadline: remoteTransportDeadline, - ListQuorum: listQuorum, - ExtendListLife: listLife, - ReplicationWorkers: replicationWorkers, + RequestsMax: requestsMax, + RequestsDeadline: requestsDeadline, + ClusterDeadline: clusterDeadline, + CorsAllowOrigin: corsAllowOrigin, + RemoteTransportDeadline: remoteTransportDeadline, + ListQuorum: listQuorum, + ExtendListLife: listLife, + ReplicationWorkers: replicationWorkers, + ReplicationFailedWorkers: replicationFailedWorkers, }, nil } diff --git a/cmd/config/api/help.go b/cmd/config/api/help.go index 4b4196a76..e8f63d901 100644 --- a/cmd/config/api/help.go +++ b/cmd/config/api/help.go @@ -52,5 +52,11 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: apiReplicationFailedWorkers, + Description: `set the number of replication workers for recently failed replicas, defaults to 4`, + Optional: true, + Type: "number", + }, } ) diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 84533d5c3..7049f0252 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -37,8 +37,9 @@ type apiConfig struct { extendListLife time.Duration corsAllowOrigins []string // total drives per erasure set across pools. - totalDriveCount int - replicationWorkers int + totalDriveCount int + replicationWorkers int + replicationFailedWorkers int } func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { @@ -83,8 +84,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { t.extendListLife = cfg.ExtendListLife if globalReplicationPool != nil && cfg.ReplicationWorkers != t.replicationWorkers { - globalReplicationPool.Resize(cfg.ReplicationWorkers) + globalReplicationPool.ResizeFailedWorkers(cfg.ReplicationFailedWorkers) + globalReplicationPool.ResizeWorkers(cfg.ReplicationWorkers) } + t.replicationFailedWorkers = cfg.ReplicationFailedWorkers t.replicationWorkers = cfg.ReplicationWorkers } @@ -166,6 +169,13 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc { } } +func (t *apiConfig) getReplicationFailedWorkers() int { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.replicationFailedWorkers +} + func (t *apiConfig) getReplicationWorkers() int { t.mu.RLock() defer t.mu.RUnlock()