Make number of replication workers configurable. (#11379)

MINIO_API_REPLICATION_WORKERS env.var and
`mc admin config set api` allow number of replication
workers to be configurable. Defaults to half the number
of cpus available.

Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
This commit is contained in:
Poorna Krishnamoorthy 2021-02-02 03:15:06 -08:00 committed by GitHub
parent c4848f9b4f
commit fe3aca70c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 24 deletions

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"runtime"
"strings" "strings"
"time" "time"
@ -631,19 +630,9 @@ func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo)
var ( var (
globalReplicationState *replicationState globalReplicationState *replicationState
// TODO: currently keeping it conservative
// but eventually can be tuned in future,
// take only half the CPUs for replication
// conservatively.
globalReplicationConcurrent = runtime.GOMAXPROCS(0) / 2
) )
func newReplicationState() *replicationState { func newReplicationState() *replicationState {
// fix minimum concurrent replication to 1 for single CPU setup
if globalReplicationConcurrent == 0 {
globalReplicationConcurrent = 1
}
rs := &replicationState{ rs := &replicationState{
replicaCh: make(chan ObjectInfo, 10000), replicaCh: make(chan ObjectInfo, 10000),
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
@ -684,8 +673,8 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
return return
} }
// Start with globalReplicationConcurrent. // Start replication workers per count set in api config or MINIO_API_REPLICATION_WORKERS.
for i := 0; i < globalReplicationConcurrent; i++ { for i := 0; i < globalAPIConfig.getReplicationWorkers(); i++ {
globalReplicationState.addWorker(ctx, objectAPI) globalReplicationState.addWorker(ctx, objectAPI)
} }
} }

View File

@ -29,14 +29,14 @@ import (
// API sub-system constants // API sub-system constants
const ( const (
apiRequestsMax = "requests_max" apiRequestsMax = "requests_max"
apiRequestsDeadline = "requests_deadline" apiRequestsDeadline = "requests_deadline"
apiClusterDeadline = "cluster_deadline" apiClusterDeadline = "cluster_deadline"
apiCorsAllowOrigin = "cors_allow_origin" apiCorsAllowOrigin = "cors_allow_origin"
apiRemoteTransportDeadline = "remote_transport_deadline" apiRemoteTransportDeadline = "remote_transport_deadline"
apiListQuorum = "list_quorum" apiListQuorum = "list_quorum"
apiExtendListCacheLife = "extend_list_cache_life" apiExtendListCacheLife = "extend_list_cache_life"
apiReplicationWorkers = "replication_workers"
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE" EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE"
@ -45,6 +45,7 @@ const (
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE" EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE"
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS"
EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS"
) )
// Deprecated key and ENVs // Deprecated key and ENVs
@ -84,6 +85,10 @@ var (
Key: apiExtendListCacheLife, Key: apiExtendListCacheLife,
Value: "0s", Value: "0s",
}, },
config.KV{
Key: apiReplicationWorkers,
Value: "100",
},
} }
) )
@ -96,6 +101,7 @@ type Config struct {
RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"`
ListQuorum string `json:"list_strict_quorum"` ListQuorum string `json:"list_strict_quorum"`
ExtendListLife time.Duration `json:"extend_list_cache_life"` ExtendListLife time.Duration `json:"extend_list_cache_life"`
ReplicationWorkers int `json:"replication_workers"`
} }
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
@ -172,7 +178,13 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err != nil { if err != nil {
return cfg, err return cfg, err
} }
replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.Get(apiReplicationWorkers)))
if err != nil {
return cfg, err
}
if replicationWorkers <= 0 {
return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1")
}
return Config{ return Config{
RequestsMax: requestsMax, RequestsMax: requestsMax,
RequestsDeadline: requestsDeadline, RequestsDeadline: requestsDeadline,
@ -181,5 +193,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
RemoteTransportDeadline: remoteTransportDeadline, RemoteTransportDeadline: remoteTransportDeadline,
ListQuorum: listQuorum, ListQuorum: listQuorum,
ExtendListLife: listLife, ExtendListLife: listLife,
ReplicationWorkers: replicationWorkers,
}, nil }, nil
} }

View File

@ -45,5 +45,11 @@ var (
Optional: true, Optional: true,
Type: "duration", Type: "duration",
}, },
config.HelpKV{
Key: apiReplicationWorkers,
Description: `set the number of replication workers, defaults to 100`,
Optional: true,
Type: "number",
},
} }
) )

View File

@ -281,4 +281,10 @@ Example 1:
"", "",
"Refer to https://docs.min.io/docs/minio-kms-quickstart-guide.html for setting up SSE", "Refer to https://docs.min.io/docs/minio-kms-quickstart-guide.html for setting up SSE",
) )
ErrInvalidReplicationWorkersValue = newErrFn(
"Invalid value for replication workers",
"",
"MINIO_API_REPLICATION_WORKERS: should be > 0",
)
) )

View File

@ -36,7 +36,8 @@ type apiConfig struct {
extendListLife time.Duration extendListLife time.Duration
corsAllowOrigins []string corsAllowOrigins []string
// total drives per erasure set across pools. // total drives per erasure set across pools.
totalDriveCount int totalDriveCount int
replicationWorkers int
} }
func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
@ -78,6 +79,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
t.requestsDeadline = cfg.RequestsDeadline t.requestsDeadline = cfg.RequestsDeadline
t.listQuorum = cfg.GetListQuorum() t.listQuorum = cfg.GetListQuorum()
t.extendListLife = cfg.ExtendListLife t.extendListLife = cfg.ExtendListLife
t.replicationWorkers = cfg.ReplicationWorkers
} }
func (t *apiConfig) getListQuorum() int { func (t *apiConfig) getListQuorum() int {
@ -152,3 +154,10 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc {
} }
} }
} }
func (t *apiConfig) getReplicationWorkers() int {
t.mu.RLock()
defer t.mu.RUnlock()
return t.replicationWorkers
}

View File

@ -492,7 +492,6 @@ func serverMain(ctx *cli.Context) {
// Enable background operations for erasure coding // Enable background operations for erasure coding
if globalIsErasure { if globalIsErasure {
initAutoHeal(GlobalContext, newObject) initAutoHeal(GlobalContext, newObject)
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject) initBackgroundTransition(GlobalContext, newObject)
initBackgroundExpiry(GlobalContext, newObject) initBackgroundExpiry(GlobalContext, newObject)
} }
@ -513,6 +512,9 @@ func serverMain(ctx *cli.Context) {
} }
} }
if globalIsErasure { // to be done after config init
initBackgroundReplication(GlobalContext, newObject)
}
if globalCacheConfig.Enabled { if globalCacheConfig.Enabled {
// initialize the new disk cache objects. // initialize the new disk cache objects.
var cacheAPI CacheObjectLayer var cacheAPI CacheObjectLayer