diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 67c0bcc5e..c71a5fc17 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1803,15 +1803,18 @@ var (
 type ReplicationPool struct {
     // atomic ops:
     activeWorkers    int32
+    activeLrgWorkers int32
     activeMRFWorkers int32
 
-    objLayer   ObjectLayer
-    ctx        context.Context
-    priority   string
-    maxWorkers int
-    mu         sync.RWMutex
-    mrfMU      sync.Mutex
-    resyncer   *replicationResyncer
+    objLayer    ObjectLayer
+    ctx         context.Context
+    priority    string
+    maxWorkers  int
+    maxLWorkers int
+
+    mu       sync.RWMutex
+    mrfMU    sync.Mutex
+    resyncer *replicationResyncer
 
     // workers:
     workers    []chan ReplicationWorkerOperation
@@ -1882,9 +1885,13 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
     if maxWorkers > 0 && failedWorkers > maxWorkers {
         failedWorkers = maxWorkers
     }
+    maxLWorkers := LargeWorkerCount
+    if opts.MaxLWorkers > 0 {
+        maxLWorkers = opts.MaxLWorkers
+    }
     pool := &ReplicationPool{
         workers:         make([]chan ReplicationWorkerOperation, 0, workers),
-        lrgworkers:      make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
+        lrgworkers:      make([]chan ReplicationWorkerOperation, 0, maxLWorkers),
         mrfReplicaCh:    make(chan ReplicationWorkerOperation, 100000),
         mrfWorkerKillCh: make(chan struct{}, failedWorkers),
         resyncer:        newresyncer(),
@@ -1894,9 +1901,10 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
         objLayer:        o,
         priority:        priority,
         maxWorkers:      maxWorkers,
+        maxLWorkers:     maxLWorkers,
     }
 
-    pool.AddLargeWorkers()
+    pool.ResizeLrgWorkers(maxLWorkers, 0)
     pool.ResizeWorkers(workers, 0)
     pool.ResizeFailedWorkers(failedWorkers)
     go pool.resyncer.PersistToDisk(ctx, o)
@@ -1975,23 +1983,8 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
     }
 }
 
-// AddLargeWorkers adds a static number of workers to handle large uploads
-func (p *ReplicationPool) AddLargeWorkers() {
-    for i := 0; i < LargeWorkerCount; i++ {
-        p.lrgworkers = append(p.lrgworkers, make(chan ReplicationWorkerOperation, 100000))
-        i := i
-        go p.AddLargeWorker(p.lrgworkers[i])
-    }
-    go func() {
-        <-p.ctx.Done()
-        for i := 0; i < LargeWorkerCount; i++ {
-            xioutil.SafeClose(p.lrgworkers[i])
-        }
-    }()
-}
-
 // AddLargeWorker adds a replication worker to the static pool for large uploads.
-func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation) {
+func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
     for {
         select {
         case <-p.ctx.Done():
@@ -2002,11 +1995,23 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation
             }
             switch v := oi.(type) {
             case ReplicateObjectInfo:
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, 1)
+                }
                 globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
                 replicateObject(p.ctx, v, p.objLayer)
                 globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, -1)
+                }
             case DeletedObjectReplicationInfo:
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, 1)
+                }
                 replicateDelete(p.ctx, v, p.objLayer)
+                if opTracker != nil {
+                    atomic.AddInt32(opTracker, -1)
+                }
             default:
                 bugLogIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
             }
@@ -2014,6 +2019,30 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation
     }
 }
 
+// ResizeLrgWorkers sets the replication workers pool for large transfers (>=128MiB) to a new size.
+// checkOld can be set to an expected value.
+// If the worker count changed in the meantime, the resize is skipped.
+func (p *ReplicationPool) ResizeLrgWorkers(n, checkOld int) {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    if (checkOld > 0 && len(p.lrgworkers) != checkOld) || n == len(p.lrgworkers) || n < 1 {
+        // Either already satisfied or worker count changed while we waited for the lock.
+        return
+    }
+    for len(p.lrgworkers) < n {
+        input := make(chan ReplicationWorkerOperation, 100000)
+        p.lrgworkers = append(p.lrgworkers, input)
+
+        go p.AddLargeWorker(input, &p.activeLrgWorkers)
+    }
+    for len(p.lrgworkers) > n {
+        worker := p.lrgworkers[len(p.lrgworkers)-1]
+        p.lrgworkers = p.lrgworkers[:len(p.lrgworkers)-1]
+        xioutil.SafeClose(worker)
+    }
+}
+
 // ActiveWorkers returns the number of active workers handling replication traffic.
 func (p *ReplicationPool) ActiveWorkers() int {
     return int(atomic.LoadInt32(&p.activeWorkers))
@@ -2024,6 +2053,11 @@ func (p *ReplicationPool) ActiveMRFWorkers() int {
     return int(atomic.LoadInt32(&p.activeMRFWorkers))
 }
 
+// ActiveLrgWorkers returns the number of active workers replicating objects >= 128MiB in size.
+func (p *ReplicationPool) ActiveLrgWorkers() int {
+    return int(atomic.LoadInt32(&p.activeLrgWorkers))
+}
+
 // ResizeWorkers sets replication workers pool to new size.
 // checkOld can be set to an expected value.
 // If the worker count changed
@@ -2049,7 +2083,7 @@ func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
 }
 
 // ResizeWorkerPriority sets replication failed workers pool size
-func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
+func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers, maxLWorkers int) {
     var workers, mrfWorkers int
     p.mu.Lock()
     switch pri {
@@ -2076,11 +2110,15 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
     if maxWorkers > 0 && mrfWorkers > maxWorkers {
         mrfWorkers = maxWorkers
     }
+    if maxLWorkers <= 0 {
+        maxLWorkers = LargeWorkerCount
+    }
     p.priority = pri
     p.maxWorkers = maxWorkers
     p.mu.Unlock()
     p.ResizeWorkers(workers, 0)
     p.ResizeFailedWorkers(mrfWorkers)
+    p.ResizeLrgWorkers(maxLWorkers, 0)
 }
 
 // ResizeFailedWorkers sets replication failed workers pool size
@@ -2127,6 +2165,15 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
         case p.lrgworkers[h%LargeWorkerCount] <- ri:
         default:
             globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
+            p.mu.RLock()
+            maxLWorkers := p.maxLWorkers
+            existing := len(p.lrgworkers)
+            p.mu.RUnlock()
+            maxLWorkers = min(maxLWorkers, LargeWorkerCount)
+            if p.ActiveLrgWorkers() < maxLWorkers {
+                workers := min(existing+1, maxLWorkers)
+                p.ResizeLrgWorkers(workers, existing)
+            }
         }
         return
     }
@@ -2229,8 +2276,9 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 }
 
 type replicationPoolOpts struct {
-    Priority   string
-    MaxWorkers int
+    Priority    string
+    MaxWorkers  int
+    MaxLWorkers int
 }
 
 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
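The queueReplicaTask hunk above is the behavioral core of this patch: large-object workers are no longer a fixed set of LargeWorkerCount goroutines but start at the configured count and grow one at a time whenever a hashed worker's channel is already full, capped by maxLWorkers; the overflowing task is parked in the MRF queue for retry. The following is a standalone sketch of that grow-on-backpressure pattern, not MinIO's actual types: pool, queue, resize, and the tiny channel buffer are illustrative stand-ins.

package main

import (
    "fmt"
    "hash/fnv"
    "sync"
    "sync/atomic"
    "time"
)

type pool struct {
    mu      sync.RWMutex
    workers []chan string
    active  int32 // held high for the duration of each task, like activeLrgWorkers
    max     int
}

func (p *pool) worker(in <-chan string) {
    for range in { // drain tasks; the body stands in for replicateObject
        atomic.AddInt32(&p.active, 1)
        time.Sleep(10 * time.Millisecond)
        atomic.AddInt32(&p.active, -1)
    }
}

// resize grows the pool to n workers; checkOld, when non-zero, makes the
// call a no-op if another goroutine resized the pool first.
func (p *pool) resize(n, checkOld int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if (checkOld > 0 && len(p.workers) != checkOld) || n <= len(p.workers) {
        return
    }
    for len(p.workers) < n {
        in := make(chan string, 4) // tiny buffer so backpressure shows up quickly
        p.workers = append(p.workers, in)
        go p.worker(in)
    }
}

func (p *pool) queue(task string) {
    h := fnv.New64a()
    h.Write([]byte(task)) // the same object always hashes to the same worker
    p.mu.RLock()
    w := p.workers[h.Sum64()%uint64(len(p.workers))]
    existing := len(p.workers)
    p.mu.RUnlock()
    select {
    case w <- task: // fast path: the worker's channel has room
    default:
        // Full channel: MinIO parks the task in the MRF queue at this point,
        // then grows the pool by one worker while active workers are below the cap.
        if int(atomic.LoadInt32(&p.active)) < p.max {
            p.resize(min(existing+1, p.max), existing)
        }
    }
}

func main() {
    p := &pool{max: 4}
    p.resize(1, 0)
    for i := 0; i < 64; i++ {
        p.queue(fmt.Sprintf("bucket/object-%d", i))
    }
    fmt.Println("workers after burst:", len(p.workers))
}

Passing existing as checkOld makes the resize behave like a compare-and-set: if another goroutine already grew the pool while this one waited on the lock, the call is dropped rather than compounding the growth.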
diff --git a/cmd/handler-api.go b/cmd/handler-api.go
index ad9d5903b..11b67411b 100644
--- a/cmd/handler-api.go
+++ b/cmd/handler-api.go
@@ -39,14 +39,15 @@ import (
 type apiConfig struct {
     mu sync.RWMutex
 
-    requestsDeadline      time.Duration
-    requestsPool          chan struct{}
-    clusterDeadline       time.Duration
-    listQuorum            string
-    corsAllowOrigins      []string
-    replicationPriority   string
-    replicationMaxWorkers int
-    transitionWorkers     int
+    requestsDeadline       time.Duration
+    requestsPool           chan struct{}
+    clusterDeadline        time.Duration
+    listQuorum             string
+    corsAllowOrigins       []string
+    replicationPriority    string
+    replicationMaxWorkers  int
+    replicationMaxLWorkers int
+    transitionWorkers      int
 
     staleUploadsExpiry          time.Duration
     staleUploadsCleanupInterval time.Duration
@@ -170,11 +171,12 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int, legacy bool) {
     }
     t.listQuorum = listQuorum
     if globalReplicationPool != nil &&
-        (cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers) {
-        globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers)
+        (cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers || cfg.ReplicationMaxLWorkers != t.replicationMaxLWorkers) {
+        globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers)
     }
     t.replicationPriority = cfg.ReplicationPriority
     t.replicationMaxWorkers = cfg.ReplicationMaxWorkers
+    t.replicationMaxLWorkers = cfg.ReplicationMaxLWorkers
 
     // N B api.transition_workers will be deprecated
     if globalTransitionState != nil {
@@ -381,14 +383,16 @@ func (t *apiConfig) getReplicationOpts() replicationPoolOpts {
 
     if t.replicationPriority == "" {
         return replicationPoolOpts{
-            Priority:   "auto",
-            MaxWorkers: WorkerMaxLimit,
+            Priority:    "auto",
+            MaxWorkers:  WorkerMaxLimit,
+            MaxLWorkers: LargeWorkerCount,
         }
     }
 
     return replicationPoolOpts{
-        Priority:   t.replicationPriority,
-        MaxWorkers: t.replicationMaxWorkers,
+        Priority:    t.replicationPriority,
+        MaxWorkers:  t.replicationMaxWorkers,
+        MaxLWorkers: t.replicationMaxLWorkers,
     }
 }
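Note how the defaults are applied twice: getReplicationOpts treats an empty priority as "nothing configured yet" and hands NewReplicationPool full defaults, and NewReplicationPool separately re-defaults MaxLWorkers when it arrives as zero. A condensed standalone sketch of that double defaulting follows; the constant values (500 and 10) are assumptions inferred from this patch's validation limits, and the real constants live in the cmd package.

package main

import "fmt"

// Assumed from this patch's bounds: 1-500 regular workers, 1-10 large workers.
const (
    WorkerMaxLimit   = 500
    LargeWorkerCount = 10
)

type replicationPoolOpts struct {
    Priority    string
    MaxWorkers  int
    MaxLWorkers int
}

// getOpts mirrors apiConfig.getReplicationOpts: an unset priority means
// nothing was configured, so every knob takes its default.
func getOpts(priority string, maxWorkers, maxLWorkers int) replicationPoolOpts {
    if priority == "" {
        return replicationPoolOpts{Priority: "auto", MaxWorkers: WorkerMaxLimit, MaxLWorkers: LargeWorkerCount}
    }
    return replicationPoolOpts{Priority: priority, MaxWorkers: maxWorkers, MaxLWorkers: maxLWorkers}
}

// newPoolSize mirrors the re-check in NewReplicationPool: even if a caller
// passes zero, the pool still starts with LargeWorkerCount large workers.
func newPoolSize(o replicationPoolOpts) int {
    if o.MaxLWorkers > 0 {
        return o.MaxLWorkers
    }
    return LargeWorkerCount
}

func main() {
    fmt.Printf("%+v\n", getOpts("", 0, 0))            // {Priority:auto MaxWorkers:500 MaxLWorkers:10}
    fmt.Println(newPoolSize(getOpts("slow", 100, 0))) // 10: zero still falls back
}

The guard in init also matters operationally: ResizeWorkerPriority runs only when one of the three knobs actually changed, so unrelated config reloads never touch the worker pools.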
diff --git a/internal/bucket/bandwidth/reader.go b/internal/bucket/bandwidth/reader.go
index 3ec765321..e82199bde 100644
--- a/internal/bucket/bandwidth/reader.go
+++ b/internal/bucket/bandwidth/reader.go
@@ -74,12 +74,16 @@ func (r *MonitoredReader) Read(buf []byte) (n int, err error) {
         need = int(math.Min(float64(b), float64(need)))
         tokens = need
     }
-
+    // reduce tokens requested according to availability
+    av := int(r.throttle.Tokens())
+    if av < tokens && av > 0 {
+        tokens = av
+        need = int(math.Min(float64(tokens), float64(need)))
+    }
     err = r.throttle.WaitN(r.ctx, tokens)
     if err != nil {
         return
     }
-
     n, err = r.r.Read(buf[:need])
     if err != nil {
         r.lastErr = err
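The bandwidth reader tweak is subtle: previously WaitN always demanded the full clamped request, so a reader could sleep while tokens accrued even when some bandwidth was available right now; clamping to Tokens() converts that wait into a shorter read that proceeds immediately. A self-contained illustration against golang.org/x/time/rate, which the Tokens/WaitN calls in this patch suggest backs MonitoredReader's throttle (the limiter here is a stand-in, not the actual throttle type):

package main

import (
    "context"
    "fmt"

    "golang.org/x/time/rate"
)

// throttledChunk asks WaitN only for what Tokens() reports as immediately
// available (when positive), shrinking the read size to match, instead of
// sleeping until the full request accrues.
func throttledChunk(ctx context.Context, l *rate.Limiter, want int) (int, error) {
    tokens := want
    if av := int(l.Tokens()); av > 0 && av < tokens {
        tokens = av // read fewer bytes now rather than wait for all of them
    }
    if err := l.WaitN(ctx, tokens); err != nil {
        return 0, err
    }
    return tokens, nil
}

func main() {
    l := rate.NewLimiter(1024, 1024) // 1 KiB/s with a 1 KiB burst
    n, err := throttledChunk(context.Background(), l, 4096)
    fmt.Println(n, err) // 1024 <nil>: served from the burst without blocking
}

In the reader itself, need is already clamped to the burst before this point, so the new check only bites when the bucket has been partially drained; the av > 0 guard keeps an empty bucket on the normal WaitN path instead of degenerating to zero-byte reads.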
diff --git a/internal/config/api/api.go b/internal/config/api/api.go
index 7f7563334..d7f493bb0 100644
--- a/internal/config/api/api.go
+++ b/internal/config/api/api.go
@@ -40,6 +40,7 @@ const (
     apiListQuorum              = "list_quorum"
     apiReplicationPriority     = "replication_priority"
     apiReplicationMaxWorkers   = "replication_max_workers"
+    apiReplicationMaxLWorkers  = "replication_max_lrg_workers"
     apiTransitionWorkers       = "transition_workers"
 
     apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
@@ -52,16 +53,18 @@ const (
     apiSyncEvents        = "sync_events"
     apiObjectMaxVersions = "object_max_versions"
 
-    EnvAPIRequestsMax                 = "MINIO_API_REQUESTS_MAX"
-    EnvAPIRequestsDeadline            = "MINIO_API_REQUESTS_DEADLINE"
-    EnvAPIClusterDeadline             = "MINIO_API_CLUSTER_DEADLINE"
-    EnvAPICorsAllowOrigin             = "MINIO_API_CORS_ALLOW_ORIGIN"
-    EnvAPIRemoteTransportDeadline     = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
-    EnvAPITransitionWorkers           = "MINIO_API_TRANSITION_WORKERS"
-    EnvAPIListQuorum                  = "MINIO_API_LIST_QUORUM"
-    EnvAPISecureCiphers               = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
-    EnvAPIReplicationPriority         = "MINIO_API_REPLICATION_PRIORITY"
-    EnvAPIReplicationMaxWorkers       = "MINIO_API_REPLICATION_MAX_WORKERS"
+    EnvAPIRequestsMax             = "MINIO_API_REQUESTS_MAX"
+    EnvAPIRequestsDeadline        = "MINIO_API_REQUESTS_DEADLINE"
+    EnvAPIClusterDeadline         = "MINIO_API_CLUSTER_DEADLINE"
+    EnvAPICorsAllowOrigin         = "MINIO_API_CORS_ALLOW_ORIGIN"
+    EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
+    EnvAPITransitionWorkers       = "MINIO_API_TRANSITION_WORKERS"
+    EnvAPIListQuorum              = "MINIO_API_LIST_QUORUM"
+    EnvAPISecureCiphers           = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
+    EnvAPIReplicationPriority     = "MINIO_API_REPLICATION_PRIORITY"
+    EnvAPIReplicationMaxWorkers   = "MINIO_API_REPLICATION_MAX_WORKERS"
+    EnvAPIReplicationMaxLWorkers  = "MINIO_API_REPLICATION_MAX_LRG_WORKERS"
+
     EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL"
     EnvAPIStaleUploadsExpiry          = "MINIO_API_STALE_UPLOADS_EXPIRY"
     EnvAPIDeleteCleanupInterval       = "MINIO_API_DELETE_CLEANUP_INTERVAL"
@@ -117,6 +120,10 @@ var (
             Key:   apiReplicationMaxWorkers,
             Value: "500",
         },
+        config.KV{
+            Key:   apiReplicationMaxLWorkers,
+            Value: "10",
+        },
         config.KV{
             Key:   apiTransitionWorkers,
             Value: "100",
@@ -171,6 +178,7 @@ type Config struct {
     ListQuorum                  string        `json:"list_quorum"`
     ReplicationPriority         string        `json:"replication_priority"`
     ReplicationMaxWorkers       int           `json:"replication_max_workers"`
+    ReplicationMaxLWorkers      int           `json:"replication_max_lrg_workers"`
     TransitionWorkers           int           `json:"transition_workers"`
     StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
     StaleUploadsExpiry          time.Duration `json:"stale_uploads_expiry"`
@@ -280,11 +288,21 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
     if err != nil {
         return cfg, err
     }
-
     if replicationMaxWorkers <= 0 || replicationMaxWorkers > 500 {
         return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers should be between 1 and 500")
     }
     cfg.ReplicationMaxWorkers = replicationMaxWorkers
+
+    replicationMaxLWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationMaxLWorkers, kvs.GetWithDefault(apiReplicationMaxLWorkers, DefaultKVS)))
+    if err != nil {
+        return cfg, err
+    }
+    if replicationMaxLWorkers <= 0 || replicationMaxLWorkers > 10 {
+        return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers for transfers >=128MiB should be between 1 and 10 per node")
+    }
+
+    cfg.ReplicationMaxLWorkers = replicationMaxLWorkers
+
     transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS)))
     if err != nil {
         return cfg, err
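Operationally, the new knob follows the same precedence as every other api.* setting: the MINIO_API_REPLICATION_MAX_LRG_WORKERS environment variable wins over the stored replication_max_lrg_workers key, which falls back to the built-in default of "10", and the accepted range of 1 to 10 is per node. A standalone approximation of that lookup order; env.Get and kvs.GetWithDefault are MinIO helpers, so os.LookupEnv and a plain map stand in for them here.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// lookupMaxLWorkers approximates the precedence in LookupConfig above:
// environment variable, then stored config key, then built-in default,
// followed by the same 1-10 bounds check.
func lookupMaxLWorkers(stored map[string]string) (int, error) {
    v, ok := os.LookupEnv("MINIO_API_REPLICATION_MAX_LRG_WORKERS")
    if !ok || v == "" {
        if v, ok = stored["replication_max_lrg_workers"]; !ok || v == "" {
            v = "10" // DefaultKVS value
        }
    }
    n, err := strconv.Atoi(v)
    if err != nil {
        return 0, err
    }
    if n <= 0 || n > 10 {
        return 0, fmt.Errorf("replication_max_lrg_workers must be between 1 and 10, got %d", n)
    }
    return n, nil
}

func main() {
    n, err := lookupMaxLWorkers(nil) // nothing stored, nothing exported
    fmt.Println(n, err)              // 10 <nil>
}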