diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index a72142e47..7a82c0c61 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -62,6 +62,8 @@ const ( ObjectLockRetentionTimestamp = "objectlock-retention-timestamp" // ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp" + // ReplicationWorkerMultiplier is suggested worker multiplier if traffic exceeds replication worker capacity + ReplicationWorkerMultiplier = 1.5 ) // gets replication config associated to a given bucket name. @@ -1412,6 +1414,14 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) { } } +// suggestedWorkers recommends an increase in number of workers to meet replication load. +func (p *ReplicationPool) suggestedWorkers(failQueue bool) int { + if failQueue { + return int(float64(p.mrfWorkerSize) * ReplicationWorkerMultiplier) + } + return int(float64(p.workerSize) * ReplicationWorkerMultiplier) +} + func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { if p == nil { return @@ -1425,6 +1435,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { }) case p.mrfReplicaCh <- ri: default: + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication failed workers could not keep up with healing failures - consider increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), replicationSubsystem) } } @@ -1450,6 +1461,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { }) case ch <- ri: default: + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) } } @@ -1486,6 +1498,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf }) case ch <- doi: default: + logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem) } }