Mirror of https://github.com/minio/minio.git, synced 2025-11-07 12:52:58 -05:00
avoid tying hot-tier SLA to warm-tier SLA (#18581)
It is okay if the warm tier cannot keep up; we should continue to accept I/O at the hot tier, and only fail or block hot-tier I/O when disks are full. Bonus: add a metrics counter for these missed tasks, so we will know for sure if one of the nodes is lagging behind or losing too many tasks during transitioning.
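The diff below implements this with a non-blocking channel send. Here is a minimal, self-contained sketch of the same pattern, where queue, missed, and tryEnqueue are hypothetical stand-ins for transitionCh, missedImmediateTasks, and queueTransitionTask:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var (
	queue  = make(chan int, 2) // tiny buffer to force misses
	missed atomic.Int64        // counter for dropped tasks
)

// tryEnqueue never blocks the caller: if the queue is full, the task
// is dropped and counted, so hot-tier I/O is never held hostage to a
// slow warm tier; a background scanner retries the dropped work later.
func tryEnqueue(task int) {
	select {
	case queue <- task:
	default:
		missed.Add(1)
	}
}

func main() {
	for i := 0; i < 5; i++ {
		tryEnqueue(i)
	}
	fmt.Println("missed:", missed.Load()) // prints: missed: 3
}
```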
```diff
@@ -219,24 +219,23 @@ type transitionState struct {
 	numWorkers int
 	killCh     chan struct{}
 
-	activeTasks int32
+	activeTasks          atomic.Int64
+	missedImmediateTasks atomic.Int64
 
 	lastDayMu    sync.RWMutex
 	lastDayStats map[string]*lastDayTierStats
 }
 
-func (t *transitionState) queueTransitionTask(oi ObjectInfo, event lifecycle.Event, src lcEventSrc, blocking bool) {
+func (t *transitionState) queueTransitionTask(oi ObjectInfo, event lifecycle.Event, src lcEventSrc) {
 	task := transitionTask{objInfo: oi, event: event, src: src}
-	if blocking {
-		select {
-		case <-t.ctx.Done():
-		case t.transitionCh <- task:
-		}
-	} else {
-		select {
-		case <-t.ctx.Done():
-		case t.transitionCh <- task:
-		default:
+	select {
+	case <-t.ctx.Done():
+	case t.transitionCh <- task:
+	default:
+		switch src {
+		case lcEventSrc_s3PutObject, lcEventSrc_s3CopyObject, lcEventSrc_s3CompleteMultipartUpload:
+			// Update missed immediate tasks only for incoming requests.
+			t.missedImmediateTasks.Add(1)
 		}
 	}
 }
```
```diff
@@ -272,8 +271,14 @@ func (t *transitionState) PendingTasks() int {
 }
 
 // ActiveTasks returns the number of active (ongoing) ILM transition tasks.
-func (t *transitionState) ActiveTasks() int {
-	return int(atomic.LoadInt32(&t.activeTasks))
+func (t *transitionState) ActiveTasks() int64 {
+	return t.activeTasks.Load()
 }
 
+// MissedImmediateTasks returns the number of tasks deferred to the scanner
+// due to the tasks channel being backlogged.
+func (t *transitionState) MissedImmediateTasks() int64 {
+	return t.missedImmediateTasks.Load()
+}
+
 // worker waits for transition tasks
```
```diff
@@ -288,7 +293,7 @@ func (t *transitionState) worker(objectAPI ObjectLayer) {
 			if !ok {
 				return
 			}
-			atomic.AddInt32(&t.activeTasks, 1)
+			t.activeTasks.Add(1)
 			if err := transitionObject(t.ctx, objectAPI, task.objInfo, newLifecycleAuditEvent(task.src, task.event)); err != nil {
 				if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) && !xnet.IsNetworkOrHostDown(err, false) {
 					if !strings.Contains(err.Error(), "use of closed network connection") {
```
```diff
@@ -306,7 +311,7 @@ func (t *transitionState) worker(objectAPI ObjectLayer) {
 				}
 				t.addLastDayStats(task.event.StorageClass, ts)
 			}
-			atomic.AddInt32(&t.activeTasks, -1)
+			t.activeTasks.Add(-1)
 		}
 	}
 }
```
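The worker hunks above also complete the migration from a bare int32 manipulated via atomic.AddInt32/atomic.LoadInt32 to Go 1.19's atomic.Int64 type. A small sketch of the two styles side by side (the counter names here are illustrative, not MinIO's):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Old style: a plain integer field plus package-level helpers.
	// Every access must remember to go through sync/atomic, and
	// 64-bit fields need careful alignment on 32-bit platforms.
	var oldCounter int32
	atomic.AddInt32(&oldCounter, 1)
	fmt.Println(atomic.LoadInt32(&oldCounter))

	// New style (Go 1.19+): the type itself guarantees atomic,
	// alignment-safe access, and widening to int64 removes any
	// realistic risk of the task counter overflowing.
	var counter atomic.Int64
	counter.Add(1)
	fmt.Println(counter.Load())
}
```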
```diff
@@ -379,7 +384,7 @@ func enqueueTransitionImmediate(obj ObjectInfo, src lcEventSrc) {
 	if lc, err := globalLifecycleSys.Get(obj.Bucket); err == nil {
 		switch event := lc.Eval(obj.ToLifecycleOpts()); event.Action {
 		case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
-			globalTransitionState.queueTransitionTask(obj, event, src, true)
+			globalTransitionState.queueTransitionTask(obj, event, src)
 		}
 	}
 }
```
```diff
@@ -1190,7 +1190,7 @@ func applyTransitionRule(event lifecycle.Event, src lcEventSrc, obj ObjectInfo)
 	if obj.DeleteMarker {
 		return false
 	}
-	globalTransitionState.queueTransitionTask(obj, event, src, false)
+	globalTransitionState.queueTransitionTask(obj, event, src)
 	return true
 }
 
```
```diff
@@ -256,6 +256,7 @@ const (
 	expiryPendingTasks     MetricName = "expiry_pending_tasks"
 	transitionPendingTasks MetricName = "transition_pending_tasks"
 	transitionActiveTasks  MetricName = "transition_active_tasks"
+	transitionMissedTasks  MetricName = "transition_missed_immediate_tasks"
 
 	transitionedBytes   MetricName = "transitioned_bytes"
 	transitionedObjects MetricName = "transitioned_objects"
```
```diff
@@ -1707,6 +1708,16 @@ func getTransitionActiveTasksMD() MetricDescription {
 	}
 }
 
+func getTransitionMissedTasksMD() MetricDescription {
+	return MetricDescription{
+		Namespace: nodeMetricNamespace,
+		Subsystem: ilmSubsystem,
+		Name:      transitionMissedTasks,
+		Help:      "Number of missed immediate ILM transition tasks",
+		Type:      gaugeMetric,
+	}
+}
+
 func getExpiryPendingTasksMD() MetricDescription {
 	return MetricDescription{
 		Namespace: nodeMetricNamespace,
```
```diff
@@ -1781,17 +1792,22 @@ func getILMNodeMetrics() *MetricsGroup {
 		trActiveTasks := Metric{
 			Description: getTransitionActiveTasksMD(),
 		}
+		trMissedTasks := Metric{
+			Description: getTransitionMissedTasksMD(),
+		}
 		if globalExpiryState != nil {
 			expPendingTasks.Value = float64(globalExpiryState.PendingTasks())
 		}
 		if globalTransitionState != nil {
 			trPendingTasks.Value = float64(globalTransitionState.PendingTasks())
 			trActiveTasks.Value = float64(globalTransitionState.ActiveTasks())
+			trMissedTasks.Value = float64(globalTransitionState.MissedImmediateTasks())
 		}
 		return []Metric{
 			expPendingTasks,
 			trPendingTasks,
 			trActiveTasks,
+			trMissedTasks,
 		}
 	})
 	return mg
```
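Given the MetricDescription above, the new gauge should surface on the node metrics endpoint as something like minio_node_ilm_transition_missed_immediate_tasks (assuming nodeMetricNamespace is "minio_node" and ilmSubsystem is "ilm"; both are assumptions, not confirmed by this diff). A quick sketch for spot-checking it against a running node:

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Assumes MinIO's v2 node metrics endpoint and that unauthenticated
	// scraping is enabled (MINIO_PROMETHEUS_AUTH_TYPE=public); adjust
	// the host and port for your deployment.
	resp, err := http.Get("http://localhost:9000/minio/v2/metrics/node")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Print only the missed-tasks gauge; a steadily growing value means
	// the warm tier is not keeping up with immediate transitions.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if strings.Contains(sc.Text(), "transition_missed_immediate_tasks") {
			fmt.Println(sc.Text())
		}
	}
}
```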