mirror of
https://github.com/minio/minio.git
synced 2025-01-12 07:23:23 -05:00
replication: warn if insufficient workers (#13899)
This should give an early warning if configured replication workers are insufficient to meet application workload.
This commit is contained in:
parent
109c927dad
commit
d422d24278
@ -62,6 +62,8 @@ const (
|
|||||||
ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"
|
ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"
|
||||||
// ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version
|
// ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version
|
||||||
ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"
|
ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"
|
||||||
|
// ReplicationWorkerMultiplier is suggested worker multiplier if traffic exceeds replication worker capacity
|
||||||
|
ReplicationWorkerMultiplier = 1.5
|
||||||
)
|
)
|
||||||
|
|
||||||
// gets replication config associated to a given bucket name.
|
// gets replication config associated to a given bucket name.
|
||||||
@ -1412,6 +1414,14 @@ func (p *ReplicationPool) ResizeFailedWorkers(n int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// suggestedWorkers recommends an increase in number of workers to meet replication load.
|
||||||
|
func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
|
||||||
|
if failQueue {
|
||||||
|
return int(float64(p.mrfWorkerSize) * ReplicationWorkerMultiplier)
|
||||||
|
}
|
||||||
|
return int(float64(p.workerSize) * ReplicationWorkerMultiplier)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
|
func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return
|
return
|
||||||
@ -1425,6 +1435,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
|
|||||||
})
|
})
|
||||||
case p.mrfReplicaCh <- ri:
|
case p.mrfReplicaCh <- ri:
|
||||||
default:
|
default:
|
||||||
|
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication failed workers could not keep up with healing failures - consider increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), replicationSubsystem)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1450,6 +1461,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
|
|||||||
})
|
})
|
||||||
case ch <- ri:
|
case ch <- ri:
|
||||||
default:
|
default:
|
||||||
|
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1486,6 +1498,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
|
|||||||
})
|
})
|
||||||
case ch <- doi:
|
case ch <- doi:
|
||||||
default:
|
default:
|
||||||
|
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Replication workers could not keep up with incoming traffic - consider increasing number of replication workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), replicationSubsystem)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user