diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 34e4f8a92..29d60d659 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -26,6 +26,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -83,6 +84,11 @@ type expiryState struct { expiryCh chan expiryTask } +// PendingTasks returns the number of pending ILM expiry tasks. +func (es *expiryState) PendingTasks() int { + return len(es.expiryCh) +} + func (es *expiryState) queueExpiryTask(oi ObjectInfo, rmVersion bool) { select { case <-GlobalContext.Done(): @@ -122,6 +128,8 @@ type transitionState struct { mu sync.Mutex numWorkers int killCh chan struct{} + + activeTasks int32 } func (t *transitionState) queueTransitionTask(oi ObjectInfo) { @@ -148,6 +156,17 @@ func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionStat } } +// PendingTasks returns the number of ILM transition tasks waiting for a worker +// goroutine. +func (t *transitionState) PendingTasks() int { + return len(globalTransitionState.transitionCh) +} + +// ActiveTasks returns the number of active (ongoing) ILM transition tasks. +func (t *transitionState) ActiveTasks() int { + return int(atomic.LoadInt32(&t.activeTasks)) +} + // worker waits for transition tasks func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) { for { @@ -160,10 +179,11 @@ func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) { if !ok { return } - + atomic.AddInt32(&t.activeTasks, 1) if err := transitionObject(ctx, objectAPI, oi); err != nil { logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err)) } + atomic.AddInt32(&t.activeTasks, -1) } } } diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index c0f69a61c..3247d07dd 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -68,6 +68,7 @@ const ( softwareSubsystem MetricSubsystem = "software" sysCallSubsystem MetricSubsystem = "syscall" usageSubsystem MetricSubsystem = "usage" + ilmSubsystem MetricSubsystem = "ilm" ) // MetricName are the individual names for the metric. @@ -121,6 +122,10 @@ const ( upTime = "uptime_seconds" memory = "resident_memory_bytes" cpu = "cpu_total_seconds" + + expiryPendingTasks MetricName = "expiry_pending_tasks" + transitionPendingTasks MetricName = "transition_pending_tasks" + transitionActiveTasks MetricName = "transition_active_tasks" ) const ( @@ -249,6 +254,7 @@ func GetGeneratorsForPeer() []MetricsGenerator { getMinioVersionMetrics, getNetworkMetrics, getS3TTFBMetric, + getILMNodeMetrics, } return g } @@ -1000,6 +1006,66 @@ func getS3TTFBMetric() MetricsGroup { } } +func getTransitionPendingTasksMD() MetricDescription { + return MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: transitionPendingTasks, + Help: "Number of pending ILM transition tasks in the queue.", + Type: gaugeMetric, + } +} + +func getTransitionActiveTasksMD() MetricDescription { + return MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: transitionActiveTasks, + Help: "Number of active ILM transition tasks.", + Type: gaugeMetric, + } +} + +func getExpiryPendingTasksMD() MetricDescription { + return MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: expiryPendingTasks, + Help: "Number of pending ILM expiry tasks in the queue.", + Type: gaugeMetric, + } +} + +func getILMNodeMetrics() MetricsGroup { + return MetricsGroup{ + id: "ILMNodeMetrics", + cachedRead: cachedRead, + read: func(_ context.Context) []Metric { + expPendingTasks := Metric{ + Description: getExpiryPendingTasksMD(), + } + trPendingTasks := Metric{ + Description: getTransitionPendingTasksMD(), + } + trActiveTasks := Metric{ + Description: getTransitionActiveTasksMD(), + } + if globalExpiryState != nil { + expPendingTasks.Value = float64(globalExpiryState.PendingTasks()) + } + if globalTransitionState != nil { + trPendingTasks.Value = float64(globalTransitionState.PendingTasks()) + trActiveTasks.Value = float64(globalTransitionState.ActiveTasks()) + } + return []Metric{ + expPendingTasks, + trPendingTasks, + trActiveTasks, + } + }, + } +} + func getMinioVersionMetrics() MetricsGroup { return MetricsGroup{ id: "MinioVersionMetrics",