diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index c79edb494..6e1b9e072 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -35,6 +35,51 @@ import ( "github.com/prometheus/procfs" ) +var ( + nodeCollector *minioNodeCollector + clusterCollector *minioClusterCollector + peerMetricsGroups []*MetricsGroup +) + +func init() { + clusterMetricsGroups := []*MetricsGroup{ + getBucketUsageMetrics(), + getMinioHealingMetrics(), + getNodeHealthMetrics(), + getClusterStorageMetrics(), + } + + peerMetricsGroups = []*MetricsGroup{ + getCacheMetrics(), + getGoMetrics(), + getHTTPMetrics(), + getLocalStorageMetrics(), + getMinioProcMetrics(), + getMinioVersionMetrics(), + getNetworkMetrics(), + getS3TTFBMetric(), + getILMNodeMetrics(), + getScannerNodeMetrics(), + } + + allMetricsGroups := func() (allMetrics []*MetricsGroup) { + allMetrics = append(allMetrics, clusterMetricsGroups...) + allMetrics = append(allMetrics, peerMetricsGroups...) + return allMetrics + }() + + nodeCollector = newMinioCollectorNode([]*MetricsGroup{ + getNodeHealthMetrics(), + getCacheMetrics(), + getHTTPMetrics(), + getNetworkMetrics(), + getMinioVersionMetrics(), + getS3TTFBMetric(), + }) + + clusterCollector = newMinioClusterCollector(allMetricsGroups) +} + // MetricNamespace is top level grouping of metrics to create the metric name. 
type MetricNamespace string @@ -164,117 +209,37 @@ type Metric struct { Histogram map[string]uint64 `json:"Histogram"` } -func (m *Metric) copyMetric() Metric { - metric := Metric{ - Description: m.Description, - Value: m.Value, - HistogramBucketLabel: m.HistogramBucketLabel, - StaticLabels: make(map[string]string), - VariableLabels: make(map[string]string), - Histogram: make(map[string]uint64), - } - for k, v := range m.StaticLabels { - metric.StaticLabels[k] = v - } - for k, v := range m.VariableLabels { - metric.VariableLabels[k] = v - } - for k, v := range m.Histogram { - metric.Histogram[k] = v - } - return metric -} - // MetricsGroup are a group of metrics that are initialized together. type MetricsGroup struct { - id string - cacheInterval time.Duration - cachedRead func(ctx context.Context, mg *MetricsGroup) []Metric - read func(ctx context.Context) []Metric + metricsCache timedValue } -var metricsGroupCache = make(map[string]*timedValue) -var cacheLock sync.Mutex - -func cachedRead(ctx context.Context, mg *MetricsGroup) (metrics []Metric) { - cacheLock.Lock() - defer cacheLock.Unlock() - v, ok := metricsGroupCache[mg.id] - if !ok { - interval := mg.cacheInterval - if interval == 0 { - interval = 30 * time.Second +// RegisterRead registers the metrics populator function to be used +// to populate new values upon cache invalidation. +func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) { + g.metricsCache.Once.Do(func() { + g.metricsCache.TTL = 10 * time.Second + g.metricsCache.Update = func() (interface{}, error) { + return read(GlobalContext), nil } - v = &timedValue{} - v.Once.Do(func() { - v.Update = func() (interface{}, error) { - c := mg.read(ctx) - return c, nil - } - v.TTL = interval - }) - metricsGroupCache[mg.id] = v - } - c, err := v.Get() + }) +} + +// Get - returns cached value always up to the configured TTL, +// once the TTL expires "read()" registered function is called +// to return the new values and updated. 
+func (g *MetricsGroup) Get() []Metric { + c, err := g.metricsCache.Get() if err != nil { return []Metric{} } - m := c.([]Metric) - for i := range m { - metrics = append(metrics, m[i].copyMetric()) + + m, ok := c.([]Metric) + if !ok { + return []Metric{} } - return metrics -} -// MetricsGenerator are functions that generate metric groups. -type MetricsGenerator func() MetricsGroup - -// GetGlobalGenerators gets all the generators the report global metrics pre calculated. -func GetGlobalGenerators() []MetricsGenerator { - g := []MetricsGenerator{ - getBucketUsageMetrics, - getMinioHealingMetrics, - getNodeHealthMetrics, - getClusterStorageMetrics, - } - return g -} - -// GetAllGenerators gets all the metric generators. -func GetAllGenerators() []MetricsGenerator { - g := GetGlobalGenerators() - g = append(g, GetGeneratorsForPeer()...) - return g -} - -// GetGeneratorsForPeer - gets the generators to report to peer. -func GetGeneratorsForPeer() []MetricsGenerator { - g := []MetricsGenerator{ - getCacheMetrics, - getGoMetrics, - getHTTPMetrics, - getLocalStorageMetrics, - getMinioProcMetrics, - getMinioVersionMetrics, - getNetworkMetrics, - getS3TTFBMetric, - getILMNodeMetrics, - getScannerNodeMetrics, - } - return g -} - -// GetSingleNodeGenerators gets the metrics that are local -func GetSingleNodeGenerators() []MetricsGenerator { - g := []MetricsGenerator{ - getNodeHealthMetrics, - getCacheMetrics, - getHTTPMetrics, - getNetworkMetrics, - getMinioVersionMetrics, - getS3TTFBMetric, - } - return g + return m } func getClusterCapacityTotalBytesMD() MetricDescription { @@ -286,6 +251,7 @@ func getClusterCapacityTotalBytesMD() MetricDescription { Type: gaugeMetric, } } + func getClusterCapacityFreeBytesMD() MetricDescription { return MetricDescription{ Namespace: clusterMetricNamespace, @@ -295,6 +261,7 @@ func getClusterCapacityFreeBytesMD() MetricDescription { Type: gaugeMetric, } } + func getClusterCapacityUsageBytesMD() MetricDescription { return MetricDescription{ 
Namespace: clusterMetricNamespace, @@ -304,6 +271,7 @@ func getClusterCapacityUsageBytesMD() MetricDescription { Type: gaugeMetric, } } + func getClusterCapacityUsageFreeBytesMD() MetricDescription { return MetricDescription{ Namespace: clusterMetricNamespace, @@ -323,6 +291,7 @@ func getNodeDiskUsedBytesMD() MetricDescription { Type: gaugeMetric, } } + func getNodeDiskFreeBytesMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -332,6 +301,7 @@ func getNodeDiskFreeBytesMD() MetricDescription { Type: gaugeMetric, } } + func getClusterDisksOfflineTotalMD() MetricDescription { return MetricDescription{ Namespace: clusterMetricNamespace, @@ -381,6 +351,7 @@ func getNodeDiskTotalBytesMD() MetricDescription { Type: gaugeMetric, } } + func getUsageLastScanActivityMD() MetricDescription { return MetricDescription{ Namespace: minioMetricNamespace, @@ -429,6 +400,7 @@ func getBucketRepFailedBytesMD() MetricDescription { Type: gaugeMetric, } } + func getBucketRepSentBytesMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -438,6 +410,7 @@ func getBucketRepSentBytesMD() MetricDescription { Type: gaugeMetric, } } + func getBucketRepReceivedBytesMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -457,6 +430,7 @@ func getBucketRepFailedOperationsMD() MetricDescription { Type: gaugeMetric, } } + func getBucketObjectDistributionMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -466,6 +440,7 @@ func getBucketObjectDistributionMD() MetricDescription { Type: histogramMetric, } } + func getInternodeFailedRequests() MetricDescription { return MetricDescription{ Namespace: interNodeMetricNamespace, @@ -485,6 +460,7 @@ func getInterNodeSentBytesMD() MetricDescription { Type: counterMetric, } } + func getInterNodeReceivedBytesMD() MetricDescription { return MetricDescription{ Namespace: interNodeMetricNamespace, @@ -494,6 +470,7 @@ func 
getInterNodeReceivedBytesMD() MetricDescription { Type: counterMetric, } } + func getS3SentBytesMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -503,6 +480,7 @@ func getS3SentBytesMD() MetricDescription { Type: counterMetric, } } + func getS3ReceivedBytesMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -512,6 +490,7 @@ func getS3ReceivedBytesMD() MetricDescription { Type: counterMetric, } } + func getS3RequestsInFlightMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -521,6 +500,7 @@ func getS3RequestsInFlightMD() MetricDescription { Type: gaugeMetric, } } + func getS3RequestsInQueueMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -530,6 +510,7 @@ func getS3RequestsInQueueMD() MetricDescription { Type: gaugeMetric, } } + func getS3RequestsTotalMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -539,6 +520,7 @@ func getS3RequestsTotalMD() MetricDescription { Type: counterMetric, } } + func getS3RequestsErrorsMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -548,6 +530,7 @@ func getS3RequestsErrorsMD() MetricDescription { Type: counterMetric, } } + func getS3RequestsCanceledMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -557,6 +540,7 @@ func getS3RequestsCanceledMD() MetricDescription { Type: counterMetric, } } + func getS3RejectedAuthRequestsTotalMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -566,6 +550,7 @@ func getS3RejectedAuthRequestsTotalMD() MetricDescription { Type: counterMetric, } } + func getS3RejectedHeaderRequestsTotalMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -575,6 +560,7 @@ func getS3RejectedHeaderRequestsTotalMD() MetricDescription { Type: counterMetric, } } + func getS3RejectedTimestampRequestsTotalMD() MetricDescription { return 
MetricDescription{ Namespace: s3MetricNamespace, @@ -584,6 +570,7 @@ func getS3RejectedTimestampRequestsTotalMD() MetricDescription { Type: counterMetric, } } + func getS3RejectedInvalidRequestsTotalMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -593,6 +580,7 @@ func getS3RejectedInvalidRequestsTotalMD() MetricDescription { Type: counterMetric, } } + func getCacheHitsTotalMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -602,6 +590,7 @@ func getCacheHitsTotalMD() MetricDescription { Type: counterMetric, } } + func getCacheHitsMissedTotalMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -611,6 +600,7 @@ func getCacheHitsMissedTotalMD() MetricDescription { Type: counterMetric, } } + func getCacheUsagePercentMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -620,6 +610,7 @@ func getCacheUsagePercentMD() MetricDescription { Type: gaugeMetric, } } + func getCacheUsageInfoMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -629,6 +620,7 @@ func getCacheUsageInfoMD() MetricDescription { Type: gaugeMetric, } } + func getCacheUsedBytesMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -638,6 +630,7 @@ func getCacheUsedBytesMD() MetricDescription { Type: gaugeMetric, } } + func getCacheTotalBytesMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -647,6 +640,7 @@ func getCacheTotalBytesMD() MetricDescription { Type: gaugeMetric, } } + func getCacheSentBytesMD() MetricDescription { return MetricDescription{ Namespace: minioNamespace, @@ -656,6 +650,7 @@ func getCacheSentBytesMD() MetricDescription { Type: counterMetric, } } + func getHealObjectsTotalMD() MetricDescription { return MetricDescription{ Namespace: healMetricNamespace, @@ -665,6 +660,7 @@ func getHealObjectsTotalMD() MetricDescription { Type: gaugeMetric, } } + func getHealObjectsHealTotalMD() 
MetricDescription { return MetricDescription{ Namespace: healMetricNamespace, @@ -684,6 +680,7 @@ func getHealObjectsFailTotalMD() MetricDescription { Type: gaugeMetric, } } + func getHealLastActivityTimeMD() MetricDescription { return MetricDescription{ Namespace: healMetricNamespace, @@ -693,6 +690,7 @@ func getHealLastActivityTimeMD() MetricDescription { Type: gaugeMetric, } } + func getNodeOnlineTotalMD() MetricDescription { return MetricDescription{ Namespace: clusterMetricNamespace, @@ -702,6 +700,7 @@ func getNodeOnlineTotalMD() MetricDescription { Type: gaugeMetric, } } + func getNodeOfflineTotalMD() MetricDescription { return MetricDescription{ Namespace: clusterMetricNamespace, @@ -711,6 +710,7 @@ func getNodeOfflineTotalMD() MetricDescription { Type: gaugeMetric, } } + func getMinIOVersionMD() MetricDescription { return MetricDescription{ Namespace: minioMetricNamespace, @@ -720,6 +720,7 @@ func getMinIOVersionMD() MetricDescription { Type: gaugeMetric, } } + func getMinIOCommitMD() MetricDescription { return MetricDescription{ Namespace: minioMetricNamespace, @@ -729,6 +730,7 @@ func getMinIOCommitMD() MetricDescription { Type: gaugeMetric, } } + func getS3TTFBDistributionMD() MetricDescription { return MetricDescription{ Namespace: s3MetricNamespace, @@ -738,6 +740,7 @@ func getS3TTFBDistributionMD() MetricDescription { Type: gaugeMetric, } } + func getMinioFDOpenMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -747,6 +750,7 @@ func getMinioFDOpenMD() MetricDescription { Type: gaugeMetric, } } + func getMinioFDLimitMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -756,6 +760,7 @@ func getMinioFDLimitMD() MetricDescription { Type: gaugeMetric, } } + func getMinioProcessIOWriteBytesMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -765,6 +770,7 @@ func getMinioProcessIOWriteBytesMD() MetricDescription { Type: counterMetric, } } + func 
getMinioProcessIOReadBytesMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -774,6 +780,7 @@ func getMinioProcessIOReadBytesMD() MetricDescription { Type: counterMetric, } } + func getMinioProcessIOWriteCachedBytesMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -783,6 +790,7 @@ func getMinioProcessIOWriteCachedBytesMD() MetricDescription { Type: counterMetric, } } + func getMinioProcessIOReadCachedBytesMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -792,6 +800,7 @@ func getMinioProcessIOReadCachedBytesMD() MetricDescription { Type: counterMetric, } } + func getMinIOProcessSysCallRMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -801,6 +810,7 @@ func getMinIOProcessSysCallRMD() MetricDescription { Type: counterMetric, } } + func getMinIOProcessSysCallWMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -810,6 +820,7 @@ func getMinIOProcessSysCallWMD() MetricDescription { Type: counterMetric, } } + func getMinIOGORoutineCountMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -819,6 +830,7 @@ func getMinIOGORoutineCountMD() MetricDescription { Type: gaugeMetric, } } + func getMinIOProcessStartTimeMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -828,6 +840,7 @@ func getMinIOProcessStartTimeMD() MetricDescription { Type: gaugeMetric, } } + func getMinIOProcessUptimeMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -837,6 +850,7 @@ func getMinIOProcessUptimeMD() MetricDescription { Type: gaugeMetric, } } + func getMinIOProcessResidentMemory() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -846,6 +860,7 @@ func getMinIOProcessResidentMemory() MetricDescription { Type: gaugeMetric, } } + func getMinIOProcessCPUTime() MetricDescription { return 
MetricDescription{ Namespace: nodeMetricNamespace, @@ -855,170 +870,166 @@ func getMinIOProcessCPUTime() MetricDescription { Type: counterMetric, } } -func getMinioProcMetrics() MetricsGroup { - return MetricsGroup{ - id: "MinioProcMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - if runtime.GOOS == "windows" { - return nil - } - metrics = make([]Metric, 0, 20) - p, err := procfs.Self() - if err != nil { - logger.LogOnceIf(ctx, err, nodeMetricNamespace) - return - } - var openFDs int - openFDs, err = p.FileDescriptorsLen() - if err != nil { - logger.LogOnceIf(ctx, err, getMinioFDOpenMD()) - return - } - l, err := p.Limits() - if err != nil { - logger.LogOnceIf(ctx, err, getMinioFDLimitMD()) - return - } - io, err := p.IO() - if err != nil { - logger.LogOnceIf(ctx, err, ioSubsystem) - return - } - stat, err := p.Stat() - if err != nil { - logger.LogOnceIf(ctx, err, processSubsystem) - return - } - startTime, err := stat.StartTime() - if err != nil { - logger.LogOnceIf(ctx, err, startTime) - return - } - metrics = append(metrics, - Metric{ - Description: getMinioFDOpenMD(), - Value: float64(openFDs), - }, - ) - metrics = append(metrics, - Metric{ - Description: getMinioFDLimitMD(), - Value: float64(l.OpenFiles), - }) - metrics = append(metrics, - Metric{ - Description: getMinIOProcessSysCallRMD(), - Value: float64(io.SyscR), - }) - metrics = append(metrics, - Metric{ - Description: getMinIOProcessSysCallWMD(), - Value: float64(io.SyscW), - }) - metrics = append(metrics, - Metric{ - Description: getMinioProcessIOReadBytesMD(), - Value: float64(io.ReadBytes), - }) - metrics = append(metrics, - Metric{ - Description: getMinioProcessIOWriteBytesMD(), - Value: float64(io.WriteBytes), - }) - metrics = append(metrics, - Metric{ - Description: getMinioProcessIOReadCachedBytesMD(), - Value: float64(io.RChar), - }) - metrics = append(metrics, - Metric{ - Description: getMinioProcessIOWriteCachedBytesMD(), - Value: float64(io.WChar), - 
}) - metrics = append(metrics, - Metric{ - Description: getMinIOProcessStartTimeMD(), - Value: startTime, - }) - metrics = append(metrics, - Metric{ - Description: getMinIOProcessUptimeMD(), - Value: time.Since(globalBootTime).Seconds(), - }) - metrics = append(metrics, - Metric{ - Description: getMinIOProcessResidentMemory(), - Value: float64(stat.ResidentMemory()), - }) - metrics = append(metrics, - Metric{ - Description: getMinIOProcessCPUTime(), - Value: stat.CPUTime(), - }) +func getMinioProcMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + if runtime.GOOS == "windows" { + return nil + } + metrics = make([]Metric, 0, 20) + p, err := procfs.Self() + if err != nil { + logger.LogOnceIf(ctx, err, nodeMetricNamespace) return - }, - } -} -func getGoMetrics() MetricsGroup { - return MetricsGroup{ - id: "GoMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - metrics = append(metrics, Metric{ - Description: getMinIOGORoutineCountMD(), - Value: float64(runtime.NumGoroutine()), + } + var openFDs int + openFDs, err = p.FileDescriptorsLen() + if err != nil { + logger.LogOnceIf(ctx, err, getMinioFDOpenMD()) + return + } + l, err := p.Limits() + if err != nil { + logger.LogOnceIf(ctx, err, getMinioFDLimitMD()) + return + } + io, err := p.IO() + if err != nil { + logger.LogOnceIf(ctx, err, ioSubsystem) + return + } + stat, err := p.Stat() + if err != nil { + logger.LogOnceIf(ctx, err, processSubsystem) + return + } + startTime, err := stat.StartTime() + if err != nil { + logger.LogOnceIf(ctx, err, startTime) + return + } + + metrics = append(metrics, + Metric{ + Description: getMinioFDOpenMD(), + Value: float64(openFDs), + }, + ) + metrics = append(metrics, + Metric{ + Description: getMinioFDLimitMD(), + Value: float64(l.OpenFiles), }) - return - }, - } + metrics = append(metrics, + Metric{ + Description: getMinIOProcessSysCallRMD(), + Value: float64(io.SyscR), + }) + 
metrics = append(metrics, + Metric{ + Description: getMinIOProcessSysCallWMD(), + Value: float64(io.SyscW), + }) + metrics = append(metrics, + Metric{ + Description: getMinioProcessIOReadBytesMD(), + Value: float64(io.ReadBytes), + }) + metrics = append(metrics, + Metric{ + Description: getMinioProcessIOWriteBytesMD(), + Value: float64(io.WriteBytes), + }) + metrics = append(metrics, + Metric{ + Description: getMinioProcessIOReadCachedBytesMD(), + Value: float64(io.RChar), + }) + metrics = append(metrics, + Metric{ + Description: getMinioProcessIOWriteCachedBytesMD(), + Value: float64(io.WChar), + }) + metrics = append(metrics, + Metric{ + Description: getMinIOProcessStartTimeMD(), + Value: startTime, + }) + metrics = append(metrics, + Metric{ + Description: getMinIOProcessUptimeMD(), + Value: time.Since(globalBootTime).Seconds(), + }) + metrics = append(metrics, + Metric{ + Description: getMinIOProcessResidentMemory(), + Value: float64(stat.ResidentMemory()), + }) + metrics = append(metrics, + Metric{ + Description: getMinIOProcessCPUTime(), + Value: stat.CPUTime(), + }) + return + }) + return mg } -func getS3TTFBMetric() MetricsGroup { - return MetricsGroup{ - id: "s3TTFBMetric", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - // Read prometheus metric on this channel - ch := make(chan prometheus.Metric) - var wg sync.WaitGroup - wg.Add(1) +func getGoMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + metrics = append(metrics, Metric{ + Description: getMinIOGORoutineCountMD(), + Value: float64(runtime.NumGoroutine()), + }) + return + }) + return mg +} - // Read prometheus histogram data and convert it to internal metric data - go func() { - defer wg.Done() - for promMetric := range ch { - dtoMetric := &dto.Metric{} - err := promMetric.Write(dtoMetric) - if err != nil { - logger.LogIf(GlobalContext, err) - return - } - h := dtoMetric.GetHistogram() - for _, b := 
range h.Bucket { - labels := make(map[string]string) - for _, lp := range dtoMetric.GetLabel() { - labels[*lp.Name] = *lp.Value - } - labels["le"] = fmt.Sprintf("%.3f", *b.UpperBound) - metric := Metric{ - Description: getS3TTFBDistributionMD(), - VariableLabels: labels, - Value: float64(b.GetCumulativeCount()), - } - metrics = append(metrics, metric) - } +func getS3TTFBMetric() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + // Read prometheus metric on this channel + ch := make(chan prometheus.Metric) + var wg sync.WaitGroup + wg.Add(1) + + // Read prometheus histogram data and convert it to internal metric data + go func() { + defer wg.Done() + for promMetric := range ch { + dtoMetric := &dto.Metric{} + err := promMetric.Write(dtoMetric) + if err != nil { + logger.LogIf(GlobalContext, err) + return } + h := dtoMetric.GetHistogram() + for _, b := range h.Bucket { + labels := make(map[string]string) + for _, lp := range dtoMetric.GetLabel() { + labels[*lp.Name] = *lp.Value + } + labels["le"] = fmt.Sprintf("%.3f", *b.UpperBound) + metric := Metric{ + Description: getS3TTFBDistributionMD(), + VariableLabels: labels, + Value: float64(b.GetCumulativeCount()), + } + metrics = append(metrics, metric) + } + } - }() + }() - httpRequestsDuration.Collect(ch) - close(ch) - wg.Wait() - return - }, - } + httpRequestsDuration.Collect(ch) + close(ch) + wg.Wait() + return + }) + return mg } func getTransitionPendingTasksMD() MetricDescription { @@ -1051,194 +1062,184 @@ func getExpiryPendingTasksMD() MetricDescription { } } -func getILMNodeMetrics() MetricsGroup { - return MetricsGroup{ - id: "ILMNodeMetrics", - cachedRead: cachedRead, - read: func(_ context.Context) []Metric { - expPendingTasks := Metric{ - Description: getExpiryPendingTasksMD(), - } - trPendingTasks := Metric{ - Description: getTransitionPendingTasksMD(), - } - trActiveTasks := Metric{ - Description: getTransitionActiveTasksMD(), - } - if 
globalExpiryState != nil { - expPendingTasks.Value = float64(globalExpiryState.PendingTasks()) - } - if globalTransitionState != nil { - trPendingTasks.Value = float64(globalTransitionState.PendingTasks()) - trActiveTasks.Value = float64(globalTransitionState.ActiveTasks()) - } - return []Metric{ - expPendingTasks, - trPendingTasks, - trActiveTasks, - } - }, - } +func getILMNodeMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(_ context.Context) []Metric { + expPendingTasks := Metric{ + Description: getExpiryPendingTasksMD(), + } + trPendingTasks := Metric{ + Description: getTransitionPendingTasksMD(), + } + trActiveTasks := Metric{ + Description: getTransitionActiveTasksMD(), + } + if globalExpiryState != nil { + expPendingTasks.Value = float64(globalExpiryState.PendingTasks()) + } + if globalTransitionState != nil { + trPendingTasks.Value = float64(globalTransitionState.PendingTasks()) + trActiveTasks.Value = float64(globalTransitionState.ActiveTasks()) + } + return []Metric{ + expPendingTasks, + trPendingTasks, + trActiveTasks, + } + }) + return mg } -func getScannerNodeMetrics() MetricsGroup { - return MetricsGroup{ - id: "ScannerNodeMetrics", - cachedRead: cachedRead, - read: func(_ context.Context) []Metric { - metrics := []Metric{ - { - Description: MetricDescription{ - Namespace: nodeMetricNamespace, - Subsystem: scannerSubsystem, - Name: "objects_scanned", - Help: "Total number of unique objects scanned since server start.", - Type: counterMetric, - }, - Value: float64(atomic.LoadUint64(&globalScannerStats.accTotalObjects)), +func getScannerNodeMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(_ context.Context) []Metric { + metrics := []Metric{ + { + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: scannerSubsystem, + Name: "objects_scanned", + Help: "Total number of unique objects scanned since server start.", + Type: counterMetric, }, - { - Description: MetricDescription{ - 
Namespace: nodeMetricNamespace, - Subsystem: scannerSubsystem, - Name: "versions_scanned", - Help: "Total number of object versions scanned since server start.", - Type: counterMetric, - }, - Value: float64(atomic.LoadUint64(&globalScannerStats.accTotalVersions)), + Value: float64(atomic.LoadUint64(&globalScannerStats.accTotalObjects)), + }, + { + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: scannerSubsystem, + Name: "versions_scanned", + Help: "Total number of object versions scanned since server start.", + Type: counterMetric, }, - { - Description: MetricDescription{ - Namespace: nodeMetricNamespace, - Subsystem: scannerSubsystem, - Name: "directories_scanned", - Help: "Total number of directories scanned since server start.", - Type: counterMetric, - }, - Value: float64(atomic.LoadUint64(&globalScannerStats.accFolders)), + Value: float64(atomic.LoadUint64(&globalScannerStats.accTotalVersions)), + }, + { + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: scannerSubsystem, + Name: "directories_scanned", + Help: "Total number of directories scanned since server start.", + Type: counterMetric, }, - { - Description: MetricDescription{ - Namespace: nodeMetricNamespace, - Subsystem: scannerSubsystem, - Name: "bucket_scans_started", - Help: "Total number of bucket scans started since server start", - Type: counterMetric, - }, - Value: float64(atomic.LoadUint64(&globalScannerStats.bucketsStarted)), + Value: float64(atomic.LoadUint64(&globalScannerStats.accFolders)), + }, + { + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: scannerSubsystem, + Name: "bucket_scans_started", + Help: "Total number of bucket scans started since server start", + Type: counterMetric, }, - { - Description: MetricDescription{ - Namespace: nodeMetricNamespace, - Subsystem: scannerSubsystem, - Name: "bucket_scans_finished", - Help: "Total number of bucket scans finished since server start", - Type: 
counterMetric, - }, - Value: float64(atomic.LoadUint64(&globalScannerStats.bucketsFinished)), + Value: float64(atomic.LoadUint64(&globalScannerStats.bucketsStarted)), + }, + { + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: scannerSubsystem, + Name: "bucket_scans_finished", + Help: "Total number of bucket scans finished since server start", + Type: counterMetric, }, - { - Description: MetricDescription{ - Namespace: nodeMetricNamespace, - Subsystem: ilmSubsystem, - Name: "versions_scanned", - Help: "Total number of object versions checked for ilm actions since server start", - Type: counterMetric, - }, - Value: float64(atomic.LoadUint64(&globalScannerStats.ilmChecks)), + Value: float64(atomic.LoadUint64(&globalScannerStats.bucketsFinished)), + }, + { + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: "versions_scanned", + Help: "Total number of object versions checked for ilm actions since server start", + Type: counterMetric, }, + Value: float64(atomic.LoadUint64(&globalScannerStats.ilmChecks)), + }, + } + for i := range globalScannerStats.actions { + action := lifecycle.Action(i) + v := atomic.LoadUint64(&globalScannerStats.actions[action]) + if v == 0 { + continue } - for i := range globalScannerStats.actions { - action := lifecycle.Action(i) - v := atomic.LoadUint64(&globalScannerStats.actions[action]) - if v == 0 { - continue - } - metrics = append(metrics, Metric{ - Description: MetricDescription{ - Namespace: nodeMetricNamespace, - Subsystem: ilmSubsystem, - Name: MetricName("action_count_" + toSnake(action.String())), - Help: "Total action outcome of lifecycle checks since server start", - Type: counterMetric, - }, - Value: float64(v), - }) - } - return metrics - }, - } + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: ilmSubsystem, + Name: MetricName("action_count_" + toSnake(action.String())), + Help: 
"Total action outcome of lifecycle checks since server start", + Type: counterMetric, + }, + Value: float64(v), + }) + } + return metrics + }) + return mg } -func getMinioVersionMetrics() MetricsGroup { - return MetricsGroup{ - id: "MinioVersionMetrics", - cachedRead: cachedRead, - read: func(_ context.Context) (metrics []Metric) { - metrics = append(metrics, Metric{ - Description: getMinIOCommitMD(), - VariableLabels: map[string]string{"commit": CommitID}, - }) - metrics = append(metrics, Metric{ - Description: getMinIOVersionMD(), - VariableLabels: map[string]string{"version": Version}, - }) +func getMinioVersionMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(_ context.Context) (metrics []Metric) { + metrics = append(metrics, Metric{ + Description: getMinIOCommitMD(), + VariableLabels: map[string]string{"commit": CommitID}, + }) + metrics = append(metrics, Metric{ + Description: getMinIOVersionMD(), + VariableLabels: map[string]string{"version": Version}, + }) + return + }) + return mg +} + +func getNodeHealthMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(_ context.Context) (metrics []Metric) { + if globalIsGateway { return - }, - } + } + metrics = make([]Metric, 0, 16) + nodesUp, nodesDown := GetPeerOnlineCount() + metrics = append(metrics, Metric{ + Description: getNodeOnlineTotalMD(), + Value: float64(nodesUp), + }) + metrics = append(metrics, Metric{ + Description: getNodeOfflineTotalMD(), + Value: float64(nodesDown), + }) + return + }) + return mg } -func getNodeHealthMetrics() MetricsGroup { - return MetricsGroup{ - id: "NodeHealthMetrics", - cachedRead: cachedRead, - read: func(_ context.Context) (metrics []Metric) { - if globalIsGateway { - return - } - metrics = make([]Metric, 0, 16) - nodesUp, nodesDown := GetPeerOnlineCount() - metrics = append(metrics, Metric{ - Description: getNodeOnlineTotalMD(), - Value: float64(nodesUp), - }) - metrics = append(metrics, Metric{ - Description: 
getNodeOfflineTotalMD(), - Value: float64(nodesDown), - }) +func getMinioHealingMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(_ context.Context) (metrics []Metric) { + metrics = make([]Metric, 0, 5) + if !globalIsErasure { return - }, - } -} - -func getMinioHealingMetrics() MetricsGroup { - return MetricsGroup{ - id: "minioHealingMetrics", - cachedRead: cachedRead, - read: func(_ context.Context) (metrics []Metric) { - metrics = make([]Metric, 0, 5) - if !globalIsErasure { - return - } - bgSeq, exists := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - if !exists { - return - } - - if bgSeq.lastHealActivity.IsZero() { - return - } - - metrics = append(metrics, Metric{ - Description: getHealLastActivityTimeMD(), - Value: float64(time.Since(bgSeq.lastHealActivity)), - }) - metrics = append(metrics, getObjectsScanned(bgSeq)...) - metrics = append(metrics, getHealedItems(bgSeq)...) - metrics = append(metrics, getFailedItems(bgSeq)...) + } + bgSeq, exists := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) + if !exists { return - }, - } + } + + if bgSeq.lastHealActivity.IsZero() { + return + } + + metrics = append(metrics, Metric{ + Description: getHealLastActivityTimeMD(), + Value: float64(time.Since(bgSeq.lastHealActivity)), + }) + metrics = append(metrics, getObjectsScanned(bgSeq)...) + metrics = append(metrics, getHealedItems(bgSeq)...) + metrics = append(metrics, getFailedItems(bgSeq)...) 
+ return + }) + return mg } func getFailedItems(seq *healSequence) (m []Metric) { @@ -1283,343 +1284,335 @@ func getObjectsScanned(seq *healSequence) (m []Metric) { return } -func getCacheMetrics() MetricsGroup { - return MetricsGroup{ - id: "CacheMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - metrics = make([]Metric, 0, 20) - cacheObjLayer := newCachedObjectLayerFn() - // Service not initialized yet - if cacheObjLayer == nil { - return - } - metrics = append(metrics, Metric{ - Description: getCacheHitsTotalMD(), - Value: float64(cacheObjLayer.CacheStats().getHits()), - }) - metrics = append(metrics, Metric{ - Description: getCacheHitsMissedTotalMD(), - Value: float64(cacheObjLayer.CacheStats().getMisses()), - }) - metrics = append(metrics, Metric{ - Description: getCacheSentBytesMD(), - Value: float64(cacheObjLayer.CacheStats().getBytesServed()), - }) - for _, cdStats := range cacheObjLayer.CacheStats().GetDiskStats() { - metrics = append(metrics, Metric{ - Description: getCacheUsagePercentMD(), - Value: float64(cdStats.UsagePercent), - VariableLabels: map[string]string{"disk": cdStats.Dir}, - }) - metrics = append(metrics, Metric{ - Description: getCacheUsageInfoMD(), - Value: float64(cdStats.UsageState), - VariableLabels: map[string]string{"disk": cdStats.Dir, "level": cdStats.GetUsageLevelString()}, - }) - metrics = append(metrics, Metric{ - Description: getCacheUsedBytesMD(), - Value: float64(cdStats.UsageSize), - VariableLabels: map[string]string{"disk": cdStats.Dir}, - }) - metrics = append(metrics, Metric{ - Description: getCacheTotalBytesMD(), - Value: float64(cdStats.TotalCapacity), - VariableLabels: map[string]string{"disk": cdStats.Dir}, - }) - } +func getCacheMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0, 20) + cacheObjLayer := newCachedObjectLayerFn() + // Service not initialized yet + if cacheObjLayer == nil 
{ return - }, - } + } + metrics = append(metrics, Metric{ + Description: getCacheHitsTotalMD(), + Value: float64(cacheObjLayer.CacheStats().getHits()), + }) + metrics = append(metrics, Metric{ + Description: getCacheHitsMissedTotalMD(), + Value: float64(cacheObjLayer.CacheStats().getMisses()), + }) + metrics = append(metrics, Metric{ + Description: getCacheSentBytesMD(), + Value: float64(cacheObjLayer.CacheStats().getBytesServed()), + }) + for _, cdStats := range cacheObjLayer.CacheStats().GetDiskStats() { + metrics = append(metrics, Metric{ + Description: getCacheUsagePercentMD(), + Value: float64(cdStats.UsagePercent), + VariableLabels: map[string]string{"disk": cdStats.Dir}, + }) + metrics = append(metrics, Metric{ + Description: getCacheUsageInfoMD(), + Value: float64(cdStats.UsageState), + VariableLabels: map[string]string{"disk": cdStats.Dir, "level": cdStats.GetUsageLevelString()}, + }) + metrics = append(metrics, Metric{ + Description: getCacheUsedBytesMD(), + Value: float64(cdStats.UsageSize), + VariableLabels: map[string]string{"disk": cdStats.Dir}, + }) + metrics = append(metrics, Metric{ + Description: getCacheTotalBytesMD(), + Value: float64(cdStats.TotalCapacity), + VariableLabels: map[string]string{"disk": cdStats.Dir}, + }) + } + return + }) + return mg } -func getHTTPMetrics() MetricsGroup { - return MetricsGroup{ - id: "httpMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - httpStats := globalHTTPStats.toServerHTTPStats() - metrics = make([]Metric, 0, 3+ - len(httpStats.CurrentS3Requests.APIStats)+ - len(httpStats.TotalS3Requests.APIStats)+ - len(httpStats.TotalS3Errors.APIStats)) +func getHTTPMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + httpStats := globalHTTPStats.toServerHTTPStats() + metrics = make([]Metric, 0, 3+ + len(httpStats.CurrentS3Requests.APIStats)+ + len(httpStats.TotalS3Requests.APIStats)+ + 
len(httpStats.TotalS3Errors.APIStats)) + metrics = append(metrics, Metric{ + Description: getS3RejectedAuthRequestsTotalMD(), + Value: float64(httpStats.TotalS3RejectedAuth), + }) + metrics = append(metrics, Metric{ + Description: getS3RejectedTimestampRequestsTotalMD(), + Value: float64(httpStats.TotalS3RejectedTime), + }) + metrics = append(metrics, Metric{ + Description: getS3RejectedHeaderRequestsTotalMD(), + Value: float64(httpStats.TotalS3RejectedHeader), + }) + metrics = append(metrics, Metric{ + Description: getS3RejectedInvalidRequestsTotalMD(), + Value: float64(httpStats.TotalS3RejectedInvalid), + }) + metrics = append(metrics, Metric{ + Description: getS3RequestsInQueueMD(), + Value: float64(httpStats.S3RequestsInQueue), + }) + for api, value := range httpStats.CurrentS3Requests.APIStats { metrics = append(metrics, Metric{ - Description: getS3RejectedAuthRequestsTotalMD(), - Value: float64(httpStats.TotalS3RejectedAuth), + Description: getS3RequestsInFlightMD(), + Value: float64(value), + VariableLabels: map[string]string{"api": api}, }) + } + for api, value := range httpStats.TotalS3Requests.APIStats { metrics = append(metrics, Metric{ - Description: getS3RejectedTimestampRequestsTotalMD(), - Value: float64(httpStats.TotalS3RejectedTime), + Description: getS3RequestsTotalMD(), + Value: float64(value), + VariableLabels: map[string]string{"api": api}, }) + } + for api, value := range httpStats.TotalS3Errors.APIStats { metrics = append(metrics, Metric{ - Description: getS3RejectedHeaderRequestsTotalMD(), - Value: float64(httpStats.TotalS3RejectedHeader), + Description: getS3RequestsErrorsMD(), + Value: float64(value), + VariableLabels: map[string]string{"api": api}, }) + } + for api, value := range httpStats.TotalS3Canceled.APIStats { metrics = append(metrics, Metric{ - Description: getS3RejectedInvalidRequestsTotalMD(), - Value: float64(httpStats.TotalS3RejectedInvalid), + Description: getS3RequestsCanceledMD(), + Value: float64(value), + VariableLabels: 
map[string]string{"api": api}, }) - metrics = append(metrics, Metric{ - Description: getS3RequestsInQueueMD(), - Value: float64(httpStats.S3RequestsInQueue), - }) - for api, value := range httpStats.CurrentS3Requests.APIStats { - metrics = append(metrics, Metric{ - Description: getS3RequestsInFlightMD(), - Value: float64(value), - VariableLabels: map[string]string{"api": api}, - }) - } - for api, value := range httpStats.TotalS3Requests.APIStats { - metrics = append(metrics, Metric{ - Description: getS3RequestsTotalMD(), - Value: float64(value), - VariableLabels: map[string]string{"api": api}, - }) - } - for api, value := range httpStats.TotalS3Errors.APIStats { - metrics = append(metrics, Metric{ - Description: getS3RequestsErrorsMD(), - Value: float64(value), - VariableLabels: map[string]string{"api": api}, - }) - } - for api, value := range httpStats.TotalS3Canceled.APIStats { - metrics = append(metrics, Metric{ - Description: getS3RequestsCanceledMD(), - Value: float64(value), - VariableLabels: map[string]string{"api": api}, - }) - } - return - }, - } + } + return + }) + return mg } -func getNetworkMetrics() MetricsGroup { - return MetricsGroup{ - id: "networkMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - metrics = make([]Metric, 0, 10) - metrics = append(metrics, Metric{ - Description: getInternodeFailedRequests(), - Value: float64(loadAndResetRPCNetworkErrsCounter()), - }) - connStats := globalConnStats.toServerConnStats() - metrics = append(metrics, Metric{ - Description: getInterNodeSentBytesMD(), - Value: float64(connStats.TotalOutputBytes), - }) - metrics = append(metrics, Metric{ - Description: getInterNodeReceivedBytesMD(), - Value: float64(connStats.TotalInputBytes), - }) - metrics = append(metrics, Metric{ - Description: getS3SentBytesMD(), - Value: float64(connStats.S3OutputBytes), - }) - metrics = append(metrics, Metric{ - Description: getS3ReceivedBytesMD(), - Value: float64(connStats.S3InputBytes), - 
}) - return - }, - } +func getNetworkMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0, 10) + metrics = append(metrics, Metric{ + Description: getInternodeFailedRequests(), + Value: float64(loadAndResetRPCNetworkErrsCounter()), + }) + connStats := globalConnStats.toServerConnStats() + metrics = append(metrics, Metric{ + Description: getInterNodeSentBytesMD(), + Value: float64(connStats.TotalOutputBytes), + }) + metrics = append(metrics, Metric{ + Description: getInterNodeReceivedBytesMD(), + Value: float64(connStats.TotalInputBytes), + }) + metrics = append(metrics, Metric{ + Description: getS3SentBytesMD(), + Value: float64(connStats.S3OutputBytes), + }) + metrics = append(metrics, Metric{ + Description: getS3ReceivedBytesMD(), + Value: float64(connStats.S3InputBytes), + }) + return + }) + return mg } -func getBucketUsageMetrics() MetricsGroup { - return MetricsGroup{ - id: "BucketUsageMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - objLayer := newObjectLayerFn() - // Service not initialized yet - if objLayer == nil || globalIsGateway { - return - } +func getBucketUsageMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + objLayer := newObjectLayerFn() + // Service not initialized yet + if objLayer == nil || globalIsGateway { + return + } - metrics = make([]Metric, 0, 50) - dataUsageInfo, err := loadDataUsageFromBackend(ctx, objLayer) - if err != nil { - return - } + metrics = make([]Metric, 0, 50) + dataUsageInfo, err := loadDataUsageFromBackend(ctx, objLayer) + if err != nil { + return + } - // data usage has not captured any data yet. - if dataUsageInfo.LastUpdate.IsZero() { - return - } + // data usage has not captured any data yet. 
+ if dataUsageInfo.LastUpdate.IsZero() { + return + } + + metrics = append(metrics, Metric{ + Description: getUsageLastScanActivityMD(), + Value: float64(time.Since(dataUsageInfo.LastUpdate)), + }) + + for bucket, usage := range dataUsageInfo.BucketsUsage { + stats := getLatestReplicationStats(bucket, usage) metrics = append(metrics, Metric{ - Description: getUsageLastScanActivityMD(), - Value: float64(time.Since(dataUsageInfo.LastUpdate)), + Description: getBucketUsageTotalBytesMD(), + Value: float64(usage.Size), + VariableLabels: map[string]string{"bucket": bucket}, }) - for bucket, usage := range dataUsageInfo.BucketsUsage { - stats := getLatestReplicationStats(bucket, usage) + metrics = append(metrics, Metric{ + Description: getBucketUsageObjectsTotalMD(), + Value: float64(usage.ObjectsCount), + VariableLabels: map[string]string{"bucket": bucket}, + }) - metrics = append(metrics, Metric{ - Description: getBucketUsageTotalBytesMD(), - Value: float64(usage.Size), - VariableLabels: map[string]string{"bucket": bucket}, - }) + metrics = append(metrics, Metric{ + Description: getBucketRepReceivedBytesMD(), + Value: float64(stats.ReplicaSize), + VariableLabels: map[string]string{"bucket": bucket}, + }) - metrics = append(metrics, Metric{ - Description: getBucketUsageObjectsTotalMD(), - Value: float64(usage.ObjectsCount), - VariableLabels: map[string]string{"bucket": bucket}, - }) + if stats.hasReplicationUsage() { + for arn, stat := range stats.Stats { + metrics = append(metrics, Metric{ + Description: getBucketRepFailedBytesMD(), + Value: float64(stat.FailedSize), + VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, + }) + metrics = append(metrics, Metric{ + Description: getBucketRepSentBytesMD(), + Value: float64(stat.ReplicatedSize), + VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, + }) + metrics = append(metrics, Metric{ + Description: getBucketRepFailedOperationsMD(), + Value: float64(stat.FailedCount), + 
VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, + }) + metrics = append(metrics, Metric{ + Description: getBucketRepLatencyMD(), + HistogramBucketLabel: "range", + Histogram: stat.Latency.getUploadLatency(), + VariableLabels: map[string]string{"bucket": bucket, "operation": "upload", "targetArn": arn}, + }) - metrics = append(metrics, Metric{ - Description: getBucketRepReceivedBytesMD(), - Value: float64(stats.ReplicaSize), - VariableLabels: map[string]string{"bucket": bucket}, - }) - - if stats.hasReplicationUsage() { - for arn, stat := range stats.Stats { - metrics = append(metrics, Metric{ - Description: getBucketRepFailedBytesMD(), - Value: float64(stat.FailedSize), - VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, - }) - metrics = append(metrics, Metric{ - Description: getBucketRepSentBytesMD(), - Value: float64(stat.ReplicatedSize), - VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, - }) - metrics = append(metrics, Metric{ - Description: getBucketRepFailedOperationsMD(), - Value: float64(stat.FailedCount), - VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, - }) - metrics = append(metrics, Metric{ - Description: getBucketRepLatencyMD(), - HistogramBucketLabel: "range", - Histogram: stat.Latency.getUploadLatency(), - VariableLabels: map[string]string{"bucket": bucket, "operation": "upload", "targetArn": arn}, - }) - - } } - - metrics = append(metrics, Metric{ - Description: getBucketObjectDistributionMD(), - Histogram: usage.ObjectSizesHistogram, - HistogramBucketLabel: "range", - VariableLabels: map[string]string{"bucket": bucket}, - }) - } - return - }, - } + + metrics = append(metrics, Metric{ + Description: getBucketObjectDistributionMD(), + Histogram: usage.ObjectSizesHistogram, + HistogramBucketLabel: "range", + VariableLabels: map[string]string{"bucket": bucket}, + }) + + } + return + }) + return mg } -func getLocalStorageMetrics() MetricsGroup { - return 
MetricsGroup{ - id: "localStorageMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - objLayer := newObjectLayerFn() - // Service not initialized yet - if objLayer == nil || globalIsGateway { - return - } - metrics = make([]Metric, 0, 50) - storageInfo, _ := objLayer.LocalStorageInfo(ctx) - for _, disk := range storageInfo.Disks { - metrics = append(metrics, Metric{ - Description: getNodeDiskUsedBytesMD(), - Value: float64(disk.UsedSpace), - VariableLabels: map[string]string{"disk": disk.DrivePath}, - }) - - metrics = append(metrics, Metric{ - Description: getNodeDiskFreeBytesMD(), - Value: float64(disk.AvailableSpace), - VariableLabels: map[string]string{"disk": disk.DrivePath}, - }) - - metrics = append(metrics, Metric{ - Description: getNodeDiskTotalBytesMD(), - Value: float64(disk.TotalSpace), - VariableLabels: map[string]string{"disk": disk.DrivePath}, - }) - - metrics = append(metrics, Metric{ - Description: getClusterDisksFreeInodes(), - Value: float64(disk.FreeInodes), - VariableLabels: map[string]string{"disk": disk.DrivePath}, - }) - } +func getLocalStorageMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + objLayer := newObjectLayerFn() + // Service not initialized yet + if objLayer == nil || globalIsGateway { return - }, - } + } + + metrics = make([]Metric, 0, 50) + storageInfo, _ := objLayer.LocalStorageInfo(ctx) + for _, disk := range storageInfo.Disks { + metrics = append(metrics, Metric{ + Description: getNodeDiskUsedBytesMD(), + Value: float64(disk.UsedSpace), + VariableLabels: map[string]string{"disk": disk.DrivePath}, + }) + + metrics = append(metrics, Metric{ + Description: getNodeDiskFreeBytesMD(), + Value: float64(disk.AvailableSpace), + VariableLabels: map[string]string{"disk": disk.DrivePath}, + }) + + metrics = append(metrics, Metric{ + Description: getNodeDiskTotalBytesMD(), + Value: float64(disk.TotalSpace), + VariableLabels: 
map[string]string{"disk": disk.DrivePath}, + }) + + metrics = append(metrics, Metric{ + Description: getClusterDisksFreeInodes(), + Value: float64(disk.FreeInodes), + VariableLabels: map[string]string{"disk": disk.DrivePath}, + }) + } + return + }) + return mg } -func getClusterStorageMetrics() MetricsGroup { - return MetricsGroup{ - id: "ClusterStorageMetrics", - cachedRead: cachedRead, - read: func(ctx context.Context) (metrics []Metric) { - objLayer := newObjectLayerFn() - // Service not initialized yet - if objLayer == nil || !globalIsErasure { - return - } - // Fetch disk space info, ignore errors - metrics = make([]Metric, 0, 10) - storageInfo, _ := objLayer.StorageInfo(ctx) - onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks) - totalDisks := onlineDisks.Merge(offlineDisks) - - metrics = append(metrics, Metric{ - Description: getClusterCapacityTotalBytesMD(), - Value: float64(GetTotalCapacity(storageInfo.Disks)), - }) - - metrics = append(metrics, Metric{ - Description: getClusterCapacityFreeBytesMD(), - Value: float64(GetTotalCapacityFree(storageInfo.Disks)), - }) - - metrics = append(metrics, Metric{ - Description: getClusterCapacityUsageBytesMD(), - Value: GetTotalUsableCapacity(storageInfo.Disks, storageInfo), - }) - - metrics = append(metrics, Metric{ - Description: getClusterCapacityUsageFreeBytesMD(), - Value: GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo), - }) - - metrics = append(metrics, Metric{ - Description: getClusterDisksOfflineTotalMD(), - Value: float64(offlineDisks.Sum()), - }) - - metrics = append(metrics, Metric{ - Description: getClusterDisksOnlineTotalMD(), - Value: float64(onlineDisks.Sum()), - }) - - metrics = append(metrics, Metric{ - Description: getClusterDisksTotalMD(), - Value: float64(totalDisks.Sum()), - }) +func getClusterStorageMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + objLayer := newObjectLayerFn() + // Service not 
initialized yet + if objLayer == nil || !globalIsErasure { return - }, - } + } + + // Fetch disk space info, ignore errors + metrics = make([]Metric, 0, 10) + storageInfo, _ := objLayer.StorageInfo(ctx) + onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks) + totalDisks := onlineDisks.Merge(offlineDisks) + + metrics = append(metrics, Metric{ + Description: getClusterCapacityTotalBytesMD(), + Value: float64(GetTotalCapacity(storageInfo.Disks)), + }) + + metrics = append(metrics, Metric{ + Description: getClusterCapacityFreeBytesMD(), + Value: float64(GetTotalCapacityFree(storageInfo.Disks)), + }) + + metrics = append(metrics, Metric{ + Description: getClusterCapacityUsageBytesMD(), + Value: GetTotalUsableCapacity(storageInfo.Disks, storageInfo), + }) + + metrics = append(metrics, Metric{ + Description: getClusterCapacityUsageFreeBytesMD(), + Value: GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo), + }) + + metrics = append(metrics, Metric{ + Description: getClusterDisksOfflineTotalMD(), + Value: float64(offlineDisks.Sum()), + }) + + metrics = append(metrics, Metric{ + Description: getClusterDisksOnlineTotalMD(), + Value: float64(onlineDisks.Sum()), + }) + + metrics = append(metrics, Metric{ + Description: getClusterDisksTotalMD(), + Value: float64(totalDisks.Sum()), + }) + return + }) + return mg } type minioClusterCollector struct { - desc *prometheus.Desc + metricsGroups []*MetricsGroup + desc *prometheus.Desc } -func newMinioClusterCollector() *minioClusterCollector { +func newMinioClusterCollector(metricsGroups []*MetricsGroup) *minioClusterCollector { return &minioClusterCollector{ - desc: prometheus.NewDesc("minio_stats", "Statistics exposed by MinIO server", nil, nil), + metricsGroups: metricsGroups, + desc: prometheus.NewDesc("minio_stats", "Statistics exposed by MinIO server per cluster", nil, nil), } } @@ -1630,7 +1623,6 @@ func (c *minioClusterCollector) Describe(ch chan<- *prometheus.Desc) { // Collect is called by the 
Prometheus registry when collecting metrics. func (c *minioClusterCollector) Collect(out chan<- prometheus.Metric) { - var wg sync.WaitGroup publish := func(in <-chan Metric) { defer wg.Done() @@ -1679,7 +1671,7 @@ func (c *minioClusterCollector) Collect(out chan<- prometheus.Metric) { // Call peer api to fetch metrics peerCh := globalNotificationSys.GetClusterMetrics(GlobalContext) - selfCh := ReportMetrics(GlobalContext, GetAllGenerators) + selfCh := ReportMetrics(GlobalContext, c.metricsGroups) wg.Add(2) go publish(peerCh) go publish(selfCh) @@ -1687,11 +1679,11 @@ func (c *minioClusterCollector) Collect(out chan<- prometheus.Metric) { } // ReportMetrics reports serialized metrics to the channel passed for the metrics generated. -func ReportMetrics(ctx context.Context, generators func() []MetricsGenerator) <-chan Metric { +func ReportMetrics(ctx context.Context, metricsGroups []*MetricsGroup) <-chan Metric { ch := make(chan Metric) go func() { defer close(ch) - populateAndPublish(generators, func(m Metric) bool { + populateAndPublish(metricsGroups, func(m Metric) bool { if m.VariableLabels == nil { m.VariableLabels = make(map[string]string) } @@ -1709,24 +1701,24 @@ func ReportMetrics(ctx context.Context, generators func() []MetricsGenerator) <- return ch } -// minioCollectorV2 is the Custom Collector -type minioCollectorV2 struct { - generator func() []MetricsGenerator - desc *prometheus.Desc +// minioNodeCollector is the Custom Collector +type minioNodeCollector struct { + metricsGroups []*MetricsGroup + desc *prometheus.Desc } // Describe sends the super-set of all possible descriptors of metrics -func (c *minioCollectorV2) Describe(ch chan<- *prometheus.Desc) { +func (c *minioNodeCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc } // populateAndPublish populates and then publishes the metrics generated by the generator function. 
-func populateAndPublish(generatorFn func() []MetricsGenerator, publish func(m Metric) bool) { - generators := generatorFn() - for _, g := range generators { - metricsGroup := g() - metrics := metricsGroup.cachedRead(GlobalContext, &metricsGroup) - for _, metric := range metrics { +func populateAndPublish(metricsGroups []*MetricsGroup, publish func(m Metric) bool) { + for _, mg := range metricsGroups { + if mg == nil { + continue + } + for _, metric := range mg.Get() { if !publish(metric) { return } @@ -1735,12 +1727,12 @@ func populateAndPublish(generatorFn func() []MetricsGenerator, publish func(m Me } // Collect is called by the Prometheus registry when collecting metrics. -func (c *minioCollectorV2) Collect(ch chan<- prometheus.Metric) { +func (c *minioNodeCollector) Collect(ch chan<- prometheus.Metric) { // Expose MinIO's version information minioVersionInfo.WithLabelValues(Version, CommitID).Set(1.0) - populateAndPublish(c.generator, func(metric Metric) bool { + populateAndPublish(c.metricsGroups, func(metric Metric) bool { labels, values := getOrderedLabelValueArrays(metric.VariableLabels) values = append(values, globalLocalNodeName) labels = append(labels, serverName) @@ -1799,30 +1791,31 @@ func getOrderedLabelValueArrays(labelsWithValue map[string]string) (labels, valu return } -// newMinioCollectorV2 describes the collector +// newMinioCollectorNode describes the collector // and returns reference of minioCollector for version 2 // It creates the Prometheus Description which is used // to define Metric and help string -func newMinioCollectorV2(generator func() []MetricsGenerator) *minioCollectorV2 { - return &minioCollectorV2{ - generator: generator, - desc: prometheus.NewDesc("minio_stats", "Statistics exposed by MinIO server", nil, nil), +func newMinioCollectorNode(metricsGroups []*MetricsGroup) *minioNodeCollector { + return &minioNodeCollector{ + metricsGroups: metricsGroups, + desc: prometheus.NewDesc("minio_stats", "Statistics exposed by MinIO 
server per node", nil, nil), } } func metricsServerHandler() http.Handler { - registry := prometheus.NewRegistry() // Report all other metrics - err := registry.Register(newMinioClusterCollector()) + err := registry.Register(clusterCollector) if err != nil { logger.CriticalIf(GlobalContext, err) } + // DefaultGatherers include golang metrics and process metrics. gatherers := prometheus.Gatherers{ registry, } + // Delegate http serving to Prometheus client library, which will call collector.Collect. return promhttp.InstrumentMetricHandler( registry, @@ -1836,7 +1829,7 @@ func metricsServerHandler() http.Handler { func metricsNodeHandler() http.Handler { registry := prometheus.NewRegistry() - err := registry.Register(newMinioCollectorV2(GetSingleNodeGenerators)) + err := registry.Register(nodeCollector) if err != nil { logger.CriticalIf(GlobalContext, err) } diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index a27b697e6..d25d509c7 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1113,7 +1113,7 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request) enc := gob.NewEncoder(w) - ch := ReportMetrics(r.Context(), GetGeneratorsForPeer) + ch := ReportMetrics(r.Context(), peerMetricsGroups) for m := range ch { if err := enc.Encode(m); err != nil { s.writeErrorResponse(w, errors.New("Encoding metric failed: "+err.Error())) diff --git a/cmd/utils_test.go b/cmd/utils_test.go index e6499cf91..b9931a696 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -29,6 +29,7 @@ import ( "reflect" "strings" "testing" + "time" ) // Tests maximum object size. 
@@ -486,3 +487,32 @@ func TestGetMinioMode(t *testing.T) { testMinioMode(globalMinioModeGatewayPrefix + globalGatewayName) } + +func TestTimedValue(t *testing.T) { + var cache timedValue + t.Parallel() + cache.Once.Do(func() { + cache.TTL = 2 * time.Second + cache.Update = func() (interface{}, error) { + return time.Now(), nil + } + }) + + i, _ := cache.Get() + t1 := i.(time.Time) + + j, _ := cache.Get() + t2 := j.(time.Time) + + if !t1.Equal(t2) { + t.Fatalf("expected time to be equal: %s != %s", t1, t2) + } + + time.Sleep(3 * time.Second) + k, _ := cache.Get() + t3 := k.(time.Time) + + if t1.Equal(t3) { + t.Fatalf("expected time to be un-equal: %s == %s", t1, t3) + } +}