From b5dcaaccb47c6c75f32be47fb8a1bf10651f615c Mon Sep 17 00:00:00 2001 From: Ritesh H Shukla Date: Fri, 19 Mar 2021 00:04:29 -0700 Subject: [PATCH] Introduce metrics caching for performant metrics (#11831) --- cmd/metrics-v2.go | 272 ++++++++++++++++++++++++++++++---------------- 1 file changed, 176 insertions(+), 96 deletions(-) diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index b5adbbed3..1a5980ce2 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -116,8 +116,8 @@ const ( serverName = "server" ) -// GaugeMetricType for the types of metrics supported -type GaugeMetricType string +// MetricType for the types of metrics supported +type MetricType string const ( gaugeMetric = "gaugeMetric" @@ -131,7 +131,7 @@ type MetricDescription struct { Subsystem MetricSubsystem `json:"Subsystem"` Name MetricName `json:"MetricName"` Help string `json:"Help"` - Type GaugeMetricType `json:"Type"` + Type MetricType `json:"Type"` } // Metric captures the details for a metric @@ -144,10 +144,66 @@ type Metric struct { Histogram map[string]uint64 `json:"Histogram"` } +func (m *Metric) copyMetric() Metric { + metric := Metric{ + Description: m.Description, + Value: m.Value, + HistogramBucketLabel: m.HistogramBucketLabel, + StaticLabels: make(map[string]string), + VariableLabels: make(map[string]string), + Histogram: make(map[string]uint64), + } + for k, v := range m.StaticLabels { + metric.StaticLabels[k] = v + } + for k, v := range m.VariableLabels { + metric.VariableLabels[k] = v + } + for k, v := range m.Histogram { + metric.Histogram[k] = v + } + return metric +} + // MetricsGroup are a group of metrics that are initialized together. type MetricsGroup struct { - Metrics []Metric - initialize func(ctx context.Context, m *MetricsGroup) + id string + cacheInterval time.Duration + cachedRead func(ctx context.Context, mg *MetricsGroup) []Metric + read func(ctx context.Context) []Metric +} + +var metricsGroupCache = make(map[string]*timedValue) +var cacheLock sync.Mutex + +func cachedRead(ctx context.Context, mg *MetricsGroup) (metrics []Metric) { + cacheLock.Lock() + defer cacheLock.Unlock() + v, ok := metricsGroupCache[mg.id] + if !ok { + interval := mg.cacheInterval + if interval == 0 { + interval = 30 * time.Second + } + v = &timedValue{} + v.Once.Do(func() { + v.Update = func() (interface{}, error) { + c := mg.read(ctx) + return c, nil + } + v.TTL = interval + }) + metricsGroupCache[mg.id] = v + } + c, err := v.Get() + if err != nil { + return []Metric{} + } + m := c.([]Metric) + for i := range m { + metrics = append(metrics, m[i].copyMetric()) + } + return metrics } // MetricsGenerator are functions that generate metric groups. @@ -674,8 +730,10 @@ func getMinIOProcessStartTimeMD() MetricDescription { } func getMinioProcMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { + id: "MinioProcMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0) p, err := procfs.Self() if err != nil { logger.LogOnceIf(ctx, err, nodeMetricNamespace) @@ -708,70 +766,74 @@ func getMinioProcMetrics() MetricsGroup { return } - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinioFDOpenMD(), Value: float64(openFDs), }, ) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinioFDLimitMD(), Value: float64(l.OpenFiles), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinIOProcessSysCallRMD(), Value: float64(io.SyscR), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinIOProcessSysCallWMD(), Value: float64(io.SyscW), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinioProcessIOReadBytesMD(), Value: float64(io.ReadBytes), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinioProcessIOWriteBytesMD(), Value: float64(io.WriteBytes), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinioProcessIOReadCachedBytesMD(), Value: float64(io.RChar), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinioProcessIOWriteCachedBytesMD(), Value: float64(io.WChar), }) - metrics.Metrics = append(metrics.Metrics, + metrics = append(metrics, Metric{ Description: getMinIOProcessStartTimeMD(), Value: startTime, }) + return }, } } func getGoMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { - metrics.Metrics = append(metrics.Metrics, Metric{ + id: "GoMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = append(metrics, Metric{ Description: getMinIOGORoutineCountMD(), Value: float64(runtime.NumGoroutine()), }) + return }, } } func getS3TTFBMetric() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { + id: "s3TTFBMetric", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { // Read prometheus metric on this channel ch := make(chan prometheus.Metric) @@ -800,7 +862,7 @@ func getS3TTFBMetric() MetricsGroup { VariableLabels: labels, Value: float64(b.GetCumulativeCount()), } - metrics.Metrics = append(metrics.Metrics, metric) + metrics = append(metrics, metric) } } @@ -809,53 +871,54 @@ func getS3TTFBMetric() MetricsGroup { httpRequestsDuration.Collect(ch) close(ch) wg.Wait() + return }, } } func getMinioVersionMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(_ context.Context, m *MetricsGroup) { - m.Metrics = append(m.Metrics, Metric{ + id: "MinioVersionMetrics", + cachedRead: cachedRead, + read: func(_ context.Context) (metrics []Metric) { + metrics = append(metrics, Metric{ Description: getMinIOCommitMD(), VariableLabels: map[string]string{"commit": CommitID}, }) - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getMinIOVersionMD(), VariableLabels: map[string]string{"version": Version}, }) + return }, } } func getNodeHealthMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{ - { - Description: getNodeOnlineTotalMD(), - }, { - Description: getNodeOfflineTotalMD(), - }, - }, - initialize: func(_ context.Context, m *MetricsGroup) { + id: "NodeHealthMetrics", + cachedRead: cachedRead, + read: func(_ context.Context) (metrics []Metric) { nodesUp, nodesDown := GetPeerOnlineCount() - for i := range m.Metrics { - switch { - case m.Metrics[i].Description.Name == onlineTotal: - m.Metrics[i].Value = float64(nodesUp) - case m.Metrics[i].Description.Name == offlineTotal: - m.Metrics[i].Value = float64(nodesDown) - } - } + metrics = append(metrics, Metric{ + Description: getNodeOnlineTotalMD(), + Value: float64(nodesUp), + }) + metrics = append(metrics, Metric{ + Description: getNodeOfflineTotalMD(), + Value: float64(nodesDown), + }) + return }, } } func getMinioHealingMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(_ context.Context, m *MetricsGroup) { + id: "minioHealingMetrics", + cachedRead: cachedRead, + read: func(_ context.Context) (metrics []Metric) { + metrics = make([]Metric, 0) if !globalIsErasure { return } @@ -867,13 +930,14 @@ func getMinioHealingMetrics() MetricsGroup { if !bgSeq.lastHealActivity.IsZero() { dur = time.Since(bgSeq.lastHealActivity) } - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getHealLastActivityTimeMD(), Value: float64(dur), }) - m.Metrics = append(m.Metrics, getObjectsScanned(bgSeq)...) - m.Metrics = append(m.Metrics, getScannedItems(bgSeq)...) - m.Metrics = append(m.Metrics, getFailedItems(bgSeq)...) + metrics = append(metrics, getObjectsScanned(bgSeq)...) + metrics = append(metrics, getScannedItems(bgSeq)...) + metrics = append(metrics, getFailedItems(bgSeq)...) + return }, } } @@ -919,118 +983,127 @@ func getObjectsScanned(seq *healSequence) (m []Metric) { } func getCacheMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, m *MetricsGroup) { + id: "CacheMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0) cacheObjLayer := newCachedObjectLayerFn() // Service not initialized yet if cacheObjLayer == nil { return } - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheHitsTotalMD(), Value: float64(cacheObjLayer.CacheStats().getHits()), }) - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheHitsMissedTotalMD(), Value: float64(cacheObjLayer.CacheStats().getMisses()), }) - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheSentBytesMD(), Value: float64(cacheObjLayer.CacheStats().getBytesServed()), }) for _, cdStats := range cacheObjLayer.CacheStats().GetDiskStats() { - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheUsagePercentMD(), Value: float64(cdStats.UsagePercent), VariableLabels: map[string]string{"disk": cdStats.Dir}, }) - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheUsageInfoMD(), Value: float64(cdStats.UsageState), VariableLabels: map[string]string{"disk": cdStats.Dir, "level": cdStats.GetUsageLevelString()}, }) - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheUsedBytesMD(), Value: float64(cdStats.UsageSize), VariableLabels: map[string]string{"disk": cdStats.Dir}, }) - m.Metrics = append(m.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getCacheTotalBytesMD(), Value: float64(cdStats.TotalCapacity), VariableLabels: map[string]string{"disk": cdStats.Dir}, }) } + return }, } } func getHTTPMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { + id: "httpMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { httpStats := globalHTTPStats.toServerHTTPStats() - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getS3RequestsInQueueMD(), Value: float64(httpStats.S3RequestsInQueue), }) for api, value := range httpStats.CurrentS3Requests.APIStats { - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getS3RequestsInFlightMD(), Value: float64(value), VariableLabels: map[string]string{"api": api}, }) } for api, value := range httpStats.TotalS3Requests.APIStats { - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getS3RequestsTotalMD(), Value: float64(value), VariableLabels: map[string]string{"api": api}, }) } for api, value := range httpStats.TotalS3Errors.APIStats { - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getS3RequestsErrorsMD(), Value: float64(value), VariableLabels: map[string]string{"api": api}, }) } + return }, } } func getNetworkMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { - metrics.Metrics = append(metrics.Metrics, Metric{ + id: "networkMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = append(metrics, Metric{ Description: getInternodeFailedRequests(), Value: float64(loadAndResetRPCNetworkErrsCounter()), }) connStats := globalConnStats.toServerConnStats() - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getInterNodeSentBytesMD(), Value: float64(connStats.TotalOutputBytes), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getInterNodeReceivedBytesMD(), Value: float64(connStats.TotalInputBytes), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getS3SentBytesMD(), Value: float64(connStats.S3OutputBytes), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getS3ReceivedBytesMD(), Value: float64(connStats.S3InputBytes), }) + return }, } } func getBucketUsageMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { + id: "BucketUsageMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0) objLayer := newObjectLayerFn() // Service not initialized yet if objLayer == nil { @@ -1052,42 +1125,42 @@ func getBucketUsageMetrics() MetricsGroup { } for bucket, usage := range dataUsageInfo.BucketsUsage { - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketUsageTotalBytesMD(), Value: float64(usage.Size), VariableLabels: map[string]string{"bucket": bucket}, }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketUsageObjectsTotalMD(), Value: float64(usage.ObjectsCount), VariableLabels: map[string]string{"bucket": bucket}, }) if usage.hasReplicationUsage() { - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketRepPendingBytesMD(), Value: float64(usage.ReplicationPendingSize), VariableLabels: map[string]string{"bucket": bucket}, }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketRepFailedBytesMD(), Value: float64(usage.ReplicationFailedSize), VariableLabels: map[string]string{"bucket": bucket}, }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketRepSentBytesMD(), Value: float64(usage.ReplicatedSize), VariableLabels: map[string]string{"bucket": bucket}, }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketRepReceivedBytesMD(), Value: float64(usage.ReplicaSize), VariableLabels: map[string]string{"bucket": bucket}, }) } - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getBucketObjectDistributionMD(), Histogram: usage.ObjectSizesHistogram, HistogramBucketLabel: "range", @@ -1095,13 +1168,16 @@ func getBucketUsageMetrics() MetricsGroup { }) } + return }, } } func getLocalStorageMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { + id: "localStorageMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0) objLayer := newObjectLayerFn() // Service not initialized yet if objLayer == nil { @@ -1114,31 +1190,34 @@ func getLocalStorageMetrics() MetricsGroup { storageInfo, _ := objLayer.LocalStorageInfo(ctx) for _, disk := range storageInfo.Disks { - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getNodeDiskUsedBytesMD(), Value: float64(disk.UsedSpace), VariableLabels: map[string]string{"disk": disk.DrivePath}, }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getNodeDiskFreeBytesMD(), Value: float64(disk.AvailableSpace), VariableLabels: map[string]string{"disk": disk.DrivePath}, }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getNodeDiskTotalBytesMD(), Value: float64(disk.TotalSpace), VariableLabels: map[string]string{"disk": disk.DrivePath}, }) } + return }, } } func getClusterStorageMetrics() MetricsGroup { return MetricsGroup{ - Metrics: []Metric{}, - initialize: func(ctx context.Context, metrics *MetricsGroup) { + id: "ClusterStorageMetrics", + cachedRead: cachedRead, + read: func(ctx context.Context) (metrics []Metric) { + metrics = make([]Metric, 0) objLayer := newObjectLayerFn() // Service not initialized yet if objLayer == nil { @@ -1154,40 +1233,41 @@ func getClusterStorageMetrics() MetricsGroup { onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks) totalDisks := onlineDisks.Merge(offlineDisks) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterCapacityTotalBytesMD(), Value: float64(GetTotalCapacity(storageInfo.Disks)), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterCapacityFreeBytesMD(), Value: float64(GetTotalCapacityFree(storageInfo.Disks)), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterCapacityUsageBytesMD(), Value: GetTotalUsableCapacity(storageInfo.Disks, storageInfo), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterCapacityUsageFreeBytesMD(), Value: GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterDisksOfflineTotalMD(), Value: float64(offlineDisks.Sum()), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterDisksOnlineTotalMD(), Value: float64(onlineDisks.Sum()), }) - metrics.Metrics = append(metrics.Metrics, Metric{ + metrics = append(metrics, Metric{ Description: getClusterDisksTotalMD(), Value: float64(totalDisks.Sum()), }) + return }, } } @@ -1312,9 +1392,9 @@ func (c *minioCollectorV2) Describe(ch chan<- *prometheus.Desc) { func populateAndPublish(generatorFn func() []MetricsGenerator, publish func(m Metric) bool) { generators := generatorFn() for _, g := range generators { - metrics := g() - metrics.initialize(GlobalContext, &metrics) - for _, metric := range metrics.Metrics { + metricsGroup := g() + metrics := metricsGroup.cachedRead(GlobalContext, &metricsGroup) + for _, metric := range metrics { if !publish(metric) { return }