Introduce metrics caching for performant metrics (#11831)

This commit is contained in:
Ritesh H Shukla 2021-03-19 00:04:29 -07:00 committed by GitHub
parent 0843280dc3
commit b5dcaaccb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -116,8 +116,8 @@ const (
serverName = "server"
)
// GaugeMetricType for the types of metrics supported
type GaugeMetricType string
// MetricType for the types of metrics supported
type MetricType string
const (
gaugeMetric = "gaugeMetric"
@ -131,7 +131,7 @@ type MetricDescription struct {
Subsystem MetricSubsystem `json:"Subsystem"`
Name MetricName `json:"MetricName"`
Help string `json:"Help"`
Type GaugeMetricType `json:"Type"`
Type MetricType `json:"Type"`
}
// Metric captures the details for a metric
@ -144,10 +144,66 @@ type Metric struct {
Histogram map[string]uint64 `json:"Histogram"`
}
func (m *Metric) copyMetric() Metric {
metric := Metric{
Description: m.Description,
Value: m.Value,
HistogramBucketLabel: m.HistogramBucketLabel,
StaticLabels: make(map[string]string),
VariableLabels: make(map[string]string),
Histogram: make(map[string]uint64),
}
for k, v := range m.StaticLabels {
metric.StaticLabels[k] = v
}
for k, v := range m.VariableLabels {
metric.VariableLabels[k] = v
}
for k, v := range m.Histogram {
metric.Histogram[k] = v
}
return metric
}
// MetricsGroup are a group of metrics that are initialized together.
type MetricsGroup struct {
Metrics []Metric
initialize func(ctx context.Context, m *MetricsGroup)
id string
cacheInterval time.Duration
cachedRead func(ctx context.Context, mg *MetricsGroup) []Metric
read func(ctx context.Context) []Metric
}
var metricsGroupCache = make(map[string]*timedValue)
var cacheLock sync.Mutex
func cachedRead(ctx context.Context, mg *MetricsGroup) (metrics []Metric) {
cacheLock.Lock()
defer cacheLock.Unlock()
v, ok := metricsGroupCache[mg.id]
if !ok {
interval := mg.cacheInterval
if interval == 0 {
interval = 30 * time.Second
}
v = &timedValue{}
v.Once.Do(func() {
v.Update = func() (interface{}, error) {
c := mg.read(ctx)
return c, nil
}
v.TTL = interval
})
metricsGroupCache[mg.id] = v
}
c, err := v.Get()
if err != nil {
return []Metric{}
}
m := c.([]Metric)
for i := range m {
metrics = append(metrics, m[i].copyMetric())
}
return metrics
}
// MetricsGenerator are functions that generate metric groups.
@ -674,8 +730,10 @@ func getMinIOProcessStartTimeMD() MetricDescription {
}
func getMinioProcMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
id: "MinioProcMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = make([]Metric, 0)
p, err := procfs.Self()
if err != nil {
logger.LogOnceIf(ctx, err, nodeMetricNamespace)
@ -708,70 +766,74 @@ func getMinioProcMetrics() MetricsGroup {
return
}
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinioFDOpenMD(),
Value: float64(openFDs),
},
)
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinioFDLimitMD(),
Value: float64(l.OpenFiles),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinIOProcessSysCallRMD(),
Value: float64(io.SyscR),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinIOProcessSysCallWMD(),
Value: float64(io.SyscW),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinioProcessIOReadBytesMD(),
Value: float64(io.ReadBytes),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinioProcessIOWriteBytesMD(),
Value: float64(io.WriteBytes),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinioProcessIOReadCachedBytesMD(),
Value: float64(io.RChar),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinioProcessIOWriteCachedBytesMD(),
Value: float64(io.WChar),
})
metrics.Metrics = append(metrics.Metrics,
metrics = append(metrics,
Metric{
Description: getMinIOProcessStartTimeMD(),
Value: startTime,
})
return
},
}
}
func getGoMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
metrics.Metrics = append(metrics.Metrics, Metric{
id: "GoMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = append(metrics, Metric{
Description: getMinIOGORoutineCountMD(),
Value: float64(runtime.NumGoroutine()),
})
return
},
}
}
func getS3TTFBMetric() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
id: "s3TTFBMetric",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
// Read prometheus metric on this channel
ch := make(chan prometheus.Metric)
@ -800,7 +862,7 @@ func getS3TTFBMetric() MetricsGroup {
VariableLabels: labels,
Value: float64(b.GetCumulativeCount()),
}
metrics.Metrics = append(metrics.Metrics, metric)
metrics = append(metrics, metric)
}
}
@ -809,53 +871,54 @@ func getS3TTFBMetric() MetricsGroup {
httpRequestsDuration.Collect(ch)
close(ch)
wg.Wait()
return
},
}
}
func getMinioVersionMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(_ context.Context, m *MetricsGroup) {
m.Metrics = append(m.Metrics, Metric{
id: "MinioVersionMetrics",
cachedRead: cachedRead,
read: func(_ context.Context) (metrics []Metric) {
metrics = append(metrics, Metric{
Description: getMinIOCommitMD(),
VariableLabels: map[string]string{"commit": CommitID},
})
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getMinIOVersionMD(),
VariableLabels: map[string]string{"version": Version},
})
return
},
}
}
func getNodeHealthMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{
{
Description: getNodeOnlineTotalMD(),
}, {
Description: getNodeOfflineTotalMD(),
},
},
initialize: func(_ context.Context, m *MetricsGroup) {
id: "NodeHealthMetrics",
cachedRead: cachedRead,
read: func(_ context.Context) (metrics []Metric) {
nodesUp, nodesDown := GetPeerOnlineCount()
for i := range m.Metrics {
switch {
case m.Metrics[i].Description.Name == onlineTotal:
m.Metrics[i].Value = float64(nodesUp)
case m.Metrics[i].Description.Name == offlineTotal:
m.Metrics[i].Value = float64(nodesDown)
}
}
metrics = append(metrics, Metric{
Description: getNodeOnlineTotalMD(),
Value: float64(nodesUp),
})
metrics = append(metrics, Metric{
Description: getNodeOfflineTotalMD(),
Value: float64(nodesDown),
})
return
},
}
}
func getMinioHealingMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(_ context.Context, m *MetricsGroup) {
id: "minioHealingMetrics",
cachedRead: cachedRead,
read: func(_ context.Context) (metrics []Metric) {
metrics = make([]Metric, 0)
if !globalIsErasure {
return
}
@ -867,13 +930,14 @@ func getMinioHealingMetrics() MetricsGroup {
if !bgSeq.lastHealActivity.IsZero() {
dur = time.Since(bgSeq.lastHealActivity)
}
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getHealLastActivityTimeMD(),
Value: float64(dur),
})
m.Metrics = append(m.Metrics, getObjectsScanned(bgSeq)...)
m.Metrics = append(m.Metrics, getScannedItems(bgSeq)...)
m.Metrics = append(m.Metrics, getFailedItems(bgSeq)...)
metrics = append(metrics, getObjectsScanned(bgSeq)...)
metrics = append(metrics, getScannedItems(bgSeq)...)
metrics = append(metrics, getFailedItems(bgSeq)...)
return
},
}
}
@ -919,118 +983,127 @@ func getObjectsScanned(seq *healSequence) (m []Metric) {
}
func getCacheMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, m *MetricsGroup) {
id: "CacheMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = make([]Metric, 0)
cacheObjLayer := newCachedObjectLayerFn()
// Service not initialized yet
if cacheObjLayer == nil {
return
}
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheHitsTotalMD(),
Value: float64(cacheObjLayer.CacheStats().getHits()),
})
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheHitsMissedTotalMD(),
Value: float64(cacheObjLayer.CacheStats().getMisses()),
})
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheSentBytesMD(),
Value: float64(cacheObjLayer.CacheStats().getBytesServed()),
})
for _, cdStats := range cacheObjLayer.CacheStats().GetDiskStats() {
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheUsagePercentMD(),
Value: float64(cdStats.UsagePercent),
VariableLabels: map[string]string{"disk": cdStats.Dir},
})
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheUsageInfoMD(),
Value: float64(cdStats.UsageState),
VariableLabels: map[string]string{"disk": cdStats.Dir, "level": cdStats.GetUsageLevelString()},
})
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheUsedBytesMD(),
Value: float64(cdStats.UsageSize),
VariableLabels: map[string]string{"disk": cdStats.Dir},
})
m.Metrics = append(m.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getCacheTotalBytesMD(),
Value: float64(cdStats.TotalCapacity),
VariableLabels: map[string]string{"disk": cdStats.Dir},
})
}
return
},
}
}
func getHTTPMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
id: "httpMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
httpStats := globalHTTPStats.toServerHTTPStats()
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getS3RequestsInQueueMD(),
Value: float64(httpStats.S3RequestsInQueue),
})
for api, value := range httpStats.CurrentS3Requests.APIStats {
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getS3RequestsInFlightMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Requests.APIStats {
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getS3RequestsTotalMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Errors.APIStats {
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getS3RequestsErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
return
},
}
}
func getNetworkMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
metrics.Metrics = append(metrics.Metrics, Metric{
id: "networkMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = append(metrics, Metric{
Description: getInternodeFailedRequests(),
Value: float64(loadAndResetRPCNetworkErrsCounter()),
})
connStats := globalConnStats.toServerConnStats()
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getInterNodeSentBytesMD(),
Value: float64(connStats.TotalOutputBytes),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getInterNodeReceivedBytesMD(),
Value: float64(connStats.TotalInputBytes),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getS3SentBytesMD(),
Value: float64(connStats.S3OutputBytes),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getS3ReceivedBytesMD(),
Value: float64(connStats.S3InputBytes),
})
return
},
}
}
func getBucketUsageMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
id: "BucketUsageMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = make([]Metric, 0)
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
@ -1052,42 +1125,42 @@ func getBucketUsageMetrics() MetricsGroup {
}
for bucket, usage := range dataUsageInfo.BucketsUsage {
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketUsageTotalBytesMD(),
Value: float64(usage.Size),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketUsageObjectsTotalMD(),
Value: float64(usage.ObjectsCount),
VariableLabels: map[string]string{"bucket": bucket},
})
if usage.hasReplicationUsage() {
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketRepPendingBytesMD(),
Value: float64(usage.ReplicationPendingSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketRepFailedBytesMD(),
Value: float64(usage.ReplicationFailedSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketRepSentBytesMD(),
Value: float64(usage.ReplicatedSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketRepReceivedBytesMD(),
Value: float64(usage.ReplicaSize),
VariableLabels: map[string]string{"bucket": bucket},
})
}
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getBucketObjectDistributionMD(),
Histogram: usage.ObjectSizesHistogram,
HistogramBucketLabel: "range",
@ -1095,13 +1168,16 @@ func getBucketUsageMetrics() MetricsGroup {
})
}
return
},
}
}
func getLocalStorageMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
id: "localStorageMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = make([]Metric, 0)
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
@ -1114,31 +1190,34 @@ func getLocalStorageMetrics() MetricsGroup {
storageInfo, _ := objLayer.LocalStorageInfo(ctx)
for _, disk := range storageInfo.Disks {
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getNodeDiskUsedBytesMD(),
Value: float64(disk.UsedSpace),
VariableLabels: map[string]string{"disk": disk.DrivePath},
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getNodeDiskFreeBytesMD(),
Value: float64(disk.AvailableSpace),
VariableLabels: map[string]string{"disk": disk.DrivePath},
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getNodeDiskTotalBytesMD(),
Value: float64(disk.TotalSpace),
VariableLabels: map[string]string{"disk": disk.DrivePath},
})
}
return
},
}
}
func getClusterStorageMetrics() MetricsGroup {
return MetricsGroup{
Metrics: []Metric{},
initialize: func(ctx context.Context, metrics *MetricsGroup) {
id: "ClusterStorageMetrics",
cachedRead: cachedRead,
read: func(ctx context.Context) (metrics []Metric) {
metrics = make([]Metric, 0)
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
@ -1154,40 +1233,41 @@ func getClusterStorageMetrics() MetricsGroup {
onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks)
totalDisks := onlineDisks.Merge(offlineDisks)
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterCapacityTotalBytesMD(),
Value: float64(GetTotalCapacity(storageInfo.Disks)),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterCapacityFreeBytesMD(),
Value: float64(GetTotalCapacityFree(storageInfo.Disks)),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterCapacityUsageBytesMD(),
Value: GetTotalUsableCapacity(storageInfo.Disks, storageInfo),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterCapacityUsageFreeBytesMD(),
Value: GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterDisksOfflineTotalMD(),
Value: float64(offlineDisks.Sum()),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterDisksOnlineTotalMD(),
Value: float64(onlineDisks.Sum()),
})
metrics.Metrics = append(metrics.Metrics, Metric{
metrics = append(metrics, Metric{
Description: getClusterDisksTotalMD(),
Value: float64(totalDisks.Sum()),
})
return
},
}
}
@ -1312,9 +1392,9 @@ func (c *minioCollectorV2) Describe(ch chan<- *prometheus.Desc) {
func populateAndPublish(generatorFn func() []MetricsGenerator, publish func(m Metric) bool) {
generators := generatorFn()
for _, g := range generators {
metrics := g()
metrics.initialize(GlobalContext, &metrics)
for _, metric := range metrics.Metrics {
metricsGroup := g()
metrics := metricsGroup.cachedRead(GlobalContext, &metricsGroup)
for _, metric := range metrics {
if !publish(metric) {
return
}