feat: Add Metrics V3 API (#19068)

Metrics v3 is mainly a reorganization of metrics into smaller groups of
metrics and the removal of internal aggregation of metrics received from
peer nodes in a MinIO cluster.

This change adds the endpoint `/minio/metrics/v3` as the top-level metrics
endpoint and under this, various sub-endpoints are implemented. These
are currently documented in `docs/metrics/v3.md`

The handler will serve metrics at any path
`/minio/metrics/v3/PATH`, as follows:

when PATH is a sub-endpoint listed above => serves the group of
metrics under that path; or when PATH is a (non-empty) parent 
directory of the sub-endpoints listed above => serves metrics
from each child sub-endpoint of PATH. otherwise, returns a no 
resource found error

All available metrics are listed in the `docs/metrics/v3.md`. More will
be added subsequently.
This commit is contained in:
Aditya Manthramurthy 2024-03-10 01:15:15 -08:00 committed by GitHub
parent 2dfa9adc5d
commit b2c5b75efa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 2920 additions and 755 deletions

View File

@ -140,7 +140,7 @@ func (dui DataUsageInfo) tierStats() []madmin.TierInfo {
return infos return infos
} }
func (dui DataUsageInfo) tierMetrics() (metrics []Metric) { func (dui DataUsageInfo) tierMetrics() (metrics []MetricV2) {
if dui.TierStats == nil { if dui.TierStats == nil {
return nil return nil
} }
@ -148,17 +148,17 @@ func (dui DataUsageInfo) tierMetrics() (metrics []Metric) {
// minio_cluster_ilm_transitioned_objects{tier="S3TIER-1"}=1 // minio_cluster_ilm_transitioned_objects{tier="S3TIER-1"}=1
// minio_cluster_ilm_transitioned_versions{tier="S3TIER-1"}=3 // minio_cluster_ilm_transitioned_versions{tier="S3TIER-1"}=3
for tier, st := range dui.TierStats.Tiers { for tier, st := range dui.TierStats.Tiers {
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: getClusterTransitionedBytesMD(), Description: getClusterTransitionedBytesMD(),
Value: float64(st.TotalSize), Value: float64(st.TotalSize),
VariableLabels: map[string]string{"tier": tier}, VariableLabels: map[string]string{"tier": tier},
}) })
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: getClusterTransitionedObjectsMD(), Description: getClusterTransitionedObjectsMD(),
Value: float64(st.NumObjects), Value: float64(st.NumObjects),
VariableLabels: map[string]string{"tier": tier}, VariableLabels: map[string]string{"tier": tier},
}) })
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: getClusterTransitionedVersionsMD(), Description: getClusterTransitionedVersionsMD(),
Value: float64(st.NumVersions), Value: float64(st.NumVersions),
VariableLabels: map[string]string{"tier": tier}, VariableLabels: map[string]string{"tier": tier},

View File

@ -233,7 +233,8 @@ func guessIsMetricsReq(req *http.Request) bool {
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2ClusterPath || req.URL.Path == minioReservedBucketPath+prometheusMetricsV2ClusterPath ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2NodePath || req.URL.Path == minioReservedBucketPath+prometheusMetricsV2NodePath ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2BucketPath || req.URL.Path == minioReservedBucketPath+prometheusMetricsV2BucketPath ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2ResourcePath req.URL.Path == minioReservedBucketPath+prometheusMetricsV2ResourcePath ||
strings.HasPrefix(req.URL.Path, minioReservedBucketPath+metricsV3Path)
} }
// guessIsRPCReq - returns true if the request is for an RPC endpoint. // guessIsRPCReq - returns true if the request is for an RPC endpoint.

View File

@ -470,6 +470,7 @@ var (
// Indicates if server was started as `--address ":0"` // Indicates if server was started as `--address ":0"`
globalDynamicAPIPort bool globalDynamicAPIPort bool
// Add new variable global values here. // Add new variable global values here.
) )

View File

@ -269,6 +269,28 @@ func (s *bucketConnStats) getS3InOutBytes() map[string]inOutBytes {
return bucketStats return bucketStats
} }
// Return S3 total input/output bytes for each
func (s *bucketConnStats) getBucketS3InOutBytes(buckets []string) map[string]inOutBytes {
s.RLock()
defer s.RUnlock()
if len(s.stats) == 0 || len(buckets) == 0 {
return nil
}
bucketStats := make(map[string]inOutBytes, len(buckets))
for _, bucket := range buckets {
if stats, ok := s.stats[bucket]; ok {
bucketStats[bucket] = inOutBytes{
In: stats.s3InputBytes,
Out: stats.s3OutputBytes,
}
}
}
return bucketStats
}
// delete metrics once bucket is deleted. // delete metrics once bucket is deleted.
func (s *bucketConnStats) delete(bucket string) { func (s *bucketConnStats) delete(bucket string) {
s.Lock() s.Lock()

View File

@ -81,7 +81,7 @@ var (
resourceMetricsMapMu sync.RWMutex resourceMetricsMapMu sync.RWMutex
// resourceMetricsHelpMap maps metric name to its help string // resourceMetricsHelpMap maps metric name to its help string
resourceMetricsHelpMap map[MetricName]string resourceMetricsHelpMap map[MetricName]string
resourceMetricsGroups []*MetricsGroup resourceMetricsGroups []*MetricsGroupV2
// initial values for drives (at the time of server startup) // initial values for drives (at the time of server startup)
// used for calculating avg values for drive metrics // used for calculating avg values for drive metrics
latestDriveStats map[string]madmin.DiskIOStats latestDriveStats map[string]madmin.DiskIOStats
@ -164,7 +164,7 @@ func init() {
cpuLoad5Perc: "CPU load average 5min (percentage)", cpuLoad5Perc: "CPU load average 5min (percentage)",
cpuLoad15Perc: "CPU load average 15min (percentage)", cpuLoad15Perc: "CPU load average 15min (percentage)",
} }
resourceMetricsGroups = []*MetricsGroup{ resourceMetricsGroups = []*MetricsGroupV2{
getResourceMetrics(), getResourceMetrics(),
} }
@ -405,7 +405,7 @@ func startResourceMetricsCollection() {
// minioResourceCollector is the Collector for resource metrics // minioResourceCollector is the Collector for resource metrics
type minioResourceCollector struct { type minioResourceCollector struct {
metricsGroups []*MetricsGroup metricsGroups []*MetricsGroupV2
desc *prometheus.Desc desc *prometheus.Desc
} }
@ -417,7 +417,7 @@ func (c *minioResourceCollector) Describe(ch chan<- *prometheus.Desc) {
// Collect is called by the Prometheus registry when collecting metrics. // Collect is called by the Prometheus registry when collecting metrics.
func (c *minioResourceCollector) Collect(out chan<- prometheus.Metric) { func (c *minioResourceCollector) Collect(out chan<- prometheus.Metric) {
var wg sync.WaitGroup var wg sync.WaitGroup
publish := func(in <-chan Metric) { publish := func(in <-chan MetricV2) {
defer wg.Done() defer wg.Done()
for metric := range in { for metric := range in {
labels, values := getOrderedLabelValueArrays(metric.VariableLabels) labels, values := getOrderedLabelValueArrays(metric.VariableLabels)
@ -436,18 +436,18 @@ func (c *minioResourceCollector) Collect(out chan<- prometheus.Metric) {
// and returns reference of minio resource Collector // and returns reference of minio resource Collector
// It creates the Prometheus Description which is used // It creates the Prometheus Description which is used
// to define Metric and help string // to define Metric and help string
func newMinioResourceCollector(metricsGroups []*MetricsGroup) *minioResourceCollector { func newMinioResourceCollector(metricsGroups []*MetricsGroupV2) *minioResourceCollector {
return &minioResourceCollector{ return &minioResourceCollector{
metricsGroups: metricsGroups, metricsGroups: metricsGroups,
desc: prometheus.NewDesc("minio_resource_stats", "Resource statistics exposed by MinIO server", nil, nil), desc: prometheus.NewDesc("minio_resource_stats", "Resource statistics exposed by MinIO server", nil, nil),
} }
} }
func prepareResourceMetrics(rm ResourceMetric, subSys MetricSubsystem, requireAvgMax bool) []Metric { func prepareResourceMetrics(rm ResourceMetric, subSys MetricSubsystem, requireAvgMax bool) []MetricV2 {
help := resourceMetricsHelpMap[rm.Name] help := resourceMetricsHelpMap[rm.Name]
name := rm.Name name := rm.Name
metrics := make([]Metric, 0, 3) metrics := make([]MetricV2, 0, 3)
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: getResourceMetricDescription(subSys, name, help), Description: getResourceMetricDescription(subSys, name, help),
Value: rm.Current, Value: rm.Current,
VariableLabels: cloneMSS(rm.Labels), VariableLabels: cloneMSS(rm.Labels),
@ -456,7 +456,7 @@ func prepareResourceMetrics(rm ResourceMetric, subSys MetricSubsystem, requireAv
if requireAvgMax { if requireAvgMax {
avgName := MetricName(fmt.Sprintf("%s_avg", name)) avgName := MetricName(fmt.Sprintf("%s_avg", name))
avgHelp := fmt.Sprintf("%s (avg)", help) avgHelp := fmt.Sprintf("%s (avg)", help)
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: getResourceMetricDescription(subSys, avgName, avgHelp), Description: getResourceMetricDescription(subSys, avgName, avgHelp),
Value: math.Round(rm.Avg*100) / 100, Value: math.Round(rm.Avg*100) / 100,
VariableLabels: cloneMSS(rm.Labels), VariableLabels: cloneMSS(rm.Labels),
@ -464,7 +464,7 @@ func prepareResourceMetrics(rm ResourceMetric, subSys MetricSubsystem, requireAv
maxName := MetricName(fmt.Sprintf("%s_max", name)) maxName := MetricName(fmt.Sprintf("%s_max", name))
maxHelp := fmt.Sprintf("%s (max)", help) maxHelp := fmt.Sprintf("%s (max)", help)
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: getResourceMetricDescription(subSys, maxName, maxHelp), Description: getResourceMetricDescription(subSys, maxName, maxHelp),
Value: rm.Max, Value: rm.Max,
VariableLabels: cloneMSS(rm.Labels), VariableLabels: cloneMSS(rm.Labels),
@ -484,12 +484,12 @@ func getResourceMetricDescription(subSys MetricSubsystem, name MetricName, help
} }
} }
func getResourceMetrics() *MetricsGroup { func getResourceMetrics() *MetricsGroupV2 {
mg := &MetricsGroup{ mg := &MetricsGroupV2{
cacheInterval: resourceMetricsCacheInterval, cacheInterval: resourceMetricsCacheInterval,
} }
mg.RegisterRead(func(ctx context.Context) []Metric { mg.RegisterRead(func(ctx context.Context) []MetricV2 {
metrics := []Metric{} metrics := []MetricV2{}
subSystems := []MetricSubsystem{interfaceSubsystem, memSubsystem, driveSubsystem, cpuSubsystem} subSystems := []MetricSubsystem{interfaceSubsystem, memSubsystem, driveSubsystem, cpuSubsystem}
resourceMetricsMapMu.RLock() resourceMetricsMapMu.RLock()

View File

@ -18,6 +18,7 @@
package cmd package cmd
import ( import (
"net/http"
"strings" "strings"
"github.com/minio/mux" "github.com/minio/mux"
@ -30,6 +31,9 @@ const (
prometheusMetricsV2BucketPath = "/v2/metrics/bucket" prometheusMetricsV2BucketPath = "/v2/metrics/bucket"
prometheusMetricsV2NodePath = "/v2/metrics/node" prometheusMetricsV2NodePath = "/v2/metrics/node"
prometheusMetricsV2ResourcePath = "/v2/metrics/resource" prometheusMetricsV2ResourcePath = "/v2/metrics/resource"
// Metrics v3 endpoints
metricsV3Path = "/metrics/v3"
) )
// Standard env prometheus auth type // Standard env prometheus auth type
@ -48,10 +52,10 @@ const (
func registerMetricsRouter(router *mux.Router) { func registerMetricsRouter(router *mux.Router) {
// metrics router // metrics router
metricsRouter := router.NewRoute().PathPrefix(minioReservedBucketPath).Subrouter() metricsRouter := router.NewRoute().PathPrefix(minioReservedBucketPath).Subrouter()
authType := strings.ToLower(env.Get(EnvPrometheusAuthType, string(prometheusJWT))) authType := prometheusAuthType(strings.ToLower(env.Get(EnvPrometheusAuthType, string(prometheusJWT))))
auth := AuthMiddleware auth := AuthMiddleware
if prometheusAuthType(authType) == prometheusPublic { if authType == prometheusPublic {
auth = NoAuthMiddleware auth = NoAuthMiddleware
} }
metricsRouter.Handle(prometheusMetricsPathLegacy, auth(metricsHandler())) metricsRouter.Handle(prometheusMetricsPathLegacy, auth(metricsHandler()))
@ -59,4 +63,11 @@ func registerMetricsRouter(router *mux.Router) {
metricsRouter.Handle(prometheusMetricsV2BucketPath, auth(metricsBucketHandler())) metricsRouter.Handle(prometheusMetricsV2BucketPath, auth(metricsBucketHandler()))
metricsRouter.Handle(prometheusMetricsV2NodePath, auth(metricsNodeHandler())) metricsRouter.Handle(prometheusMetricsV2NodePath, auth(metricsNodeHandler()))
metricsRouter.Handle(prometheusMetricsV2ResourcePath, auth(metricsResourceHandler())) metricsRouter.Handle(prometheusMetricsV2ResourcePath, auth(metricsResourceHandler()))
// Metrics v3!
metricsV3Server := newMetricsV3Server(authType)
// Register metrics v3 handler. It also accepts an optional query
// parameter `?list` - see handler for details.
metricsRouter.Methods(http.MethodGet).Path(metricsV3Path + "{pathComps:.*}").Handler(metricsV3Server)
} }

File diff suppressed because it is too large Load Diff

View File

@ -7,7 +7,223 @@ import (
) )
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *Metric) MarshalMsg(b []byte) (o []byte, err error) { func (z *MetricDescription) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 5
// string "Namespace"
o = append(o, 0x85, 0xa9, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65)
o = msgp.AppendString(o, string(z.Namespace))
// string "Subsystem"
o = append(o, 0xa9, 0x53, 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d)
o = msgp.AppendString(o, string(z.Subsystem))
// string "Name"
o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, string(z.Name))
// string "Help"
o = append(o, 0xa4, 0x48, 0x65, 0x6c, 0x70)
o = msgp.AppendString(o, z.Help)
// string "Type"
o = append(o, 0xa4, 0x54, 0x79, 0x70, 0x65)
o = msgp.AppendString(o, string(z.Type))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricDescription) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Namespace":
{
var zb0002 string
zb0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Namespace")
return
}
z.Namespace = MetricNamespace(zb0002)
}
case "Subsystem":
{
var zb0003 string
zb0003, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Subsystem")
return
}
z.Subsystem = MetricSubsystem(zb0003)
}
case "Name":
{
var zb0004 string
zb0004, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Name")
return
}
z.Name = MetricName(zb0004)
}
case "Help":
z.Help, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Help")
return
}
case "Type":
{
var zb0005 string
zb0005, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Type")
return
}
z.Type = MetricTypeV2(zb0005)
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MetricDescription) Msgsize() (s int) {
s = 1 + 10 + msgp.StringPrefixSize + len(string(z.Namespace)) + 10 + msgp.StringPrefixSize + len(string(z.Subsystem)) + 5 + msgp.StringPrefixSize + len(string(z.Name)) + 5 + msgp.StringPrefixSize + len(z.Help) + 5 + msgp.StringPrefixSize + len(string(z.Type))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricName) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricName) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricName(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricName) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricNamespace) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricNamespace) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricNamespace(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricNamespace) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricSubsystem) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricSubsystem) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricSubsystem(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricSubsystem) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricTypeV2) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricTypeV2) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricTypeV2(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricTypeV2) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z *MetricV2) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 6 // map header, size 6
// string "Description" // string "Description"
@ -48,7 +264,7 @@ func (z *Metric) MarshalMsg(b []byte) (o []byte, err error) {
} }
// UnmarshalMsg implements msgp.Unmarshaler // UnmarshalMsg implements msgp.Unmarshaler
func (z *Metric) UnmarshalMsg(bts []byte) (o []byte, err error) { func (z *MetricV2) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte var field []byte
_ = field _ = field
var zb0001 uint32 var zb0001 uint32
@ -186,7 +402,7 @@ func (z *Metric) UnmarshalMsg(bts []byte) (o []byte, err error) {
} }
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Metric) Msgsize() (s int) { func (z *MetricV2) Msgsize() (s int) {
s = 1 + 12 + z.Description.Msgsize() + 13 + msgp.MapHeaderSize s = 1 + 12 + z.Description.Msgsize() + 13 + msgp.MapHeaderSize
if z.StaticLabels != nil { if z.StaticLabels != nil {
for za0001, za0002 := range z.StaticLabels { for za0001, za0002 := range z.StaticLabels {
@ -211,287 +427,6 @@ func (z *Metric) Msgsize() (s int) {
return return
} }
// MarshalMsg implements msgp.Marshaler
func (z *MetricDescription) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 5
// string "Namespace"
o = append(o, 0x85, 0xa9, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65)
o = msgp.AppendString(o, string(z.Namespace))
// string "Subsystem"
o = append(o, 0xa9, 0x53, 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d)
o = msgp.AppendString(o, string(z.Subsystem))
// string "Name"
o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, string(z.Name))
// string "Help"
o = append(o, 0xa4, 0x48, 0x65, 0x6c, 0x70)
o = msgp.AppendString(o, z.Help)
// string "Type"
o = append(o, 0xa4, 0x54, 0x79, 0x70, 0x65)
o = msgp.AppendString(o, string(z.Type))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricDescription) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Namespace":
{
var zb0002 string
zb0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Namespace")
return
}
z.Namespace = MetricNamespace(zb0002)
}
case "Subsystem":
{
var zb0003 string
zb0003, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Subsystem")
return
}
z.Subsystem = MetricSubsystem(zb0003)
}
case "Name":
{
var zb0004 string
zb0004, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Name")
return
}
z.Name = MetricName(zb0004)
}
case "Help":
z.Help, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Help")
return
}
case "Type":
{
var zb0005 string
zb0005, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Type")
return
}
z.Type = MetricType(zb0005)
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MetricDescription) Msgsize() (s int) {
s = 1 + 10 + msgp.StringPrefixSize + len(string(z.Namespace)) + 10 + msgp.StringPrefixSize + len(string(z.Subsystem)) + 5 + msgp.StringPrefixSize + len(string(z.Name)) + 5 + msgp.StringPrefixSize + len(z.Help) + 5 + msgp.StringPrefixSize + len(string(z.Type))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricName) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricName) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricName(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricName) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricNamespace) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricNamespace) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricNamespace(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricNamespace) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricSubsystem) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricSubsystem) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricSubsystem(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricSubsystem) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z MetricType) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendString(o, string(z))
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricType) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var zb0001 string
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
(*z) = MetricType(zb0001)
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MetricType) Msgsize() (s int) {
s = msgp.StringPrefixSize + len(string(z))
return
}
// MarshalMsg implements msgp.Marshaler
func (z *MetricsGroup) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "cacheInterval"
o = append(o, 0x82, 0xad, 0x63, 0x61, 0x63, 0x68, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c)
o = msgp.AppendDuration(o, z.cacheInterval)
// string "metricsGroupOpts"
o = append(o, 0xb0, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x4f, 0x70, 0x74, 0x73)
o, err = z.metricsGroupOpts.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "metricsGroupOpts")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricsGroup) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "cacheInterval":
z.cacheInterval, bts, err = msgp.ReadDurationBytes(bts)
if err != nil {
err = msgp.WrapError(err, "cacheInterval")
return
}
case "metricsGroupOpts":
bts, err = z.metricsGroupOpts.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "metricsGroupOpts")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MetricsGroup) Msgsize() (s int) {
s = 1 + 14 + msgp.DurationSize + 17 + z.metricsGroupOpts.Msgsize()
return
}
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *MetricsGroupOpts) MarshalMsg(b []byte) (o []byte, err error) { func (z *MetricsGroupOpts) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
@ -642,3 +577,68 @@ func (z *MetricsGroupOpts) Msgsize() (s int) {
s = 1 + 22 + msgp.BoolSize + 24 + msgp.BoolSize + 31 + msgp.BoolSize + 28 + msgp.BoolSize + 16 + msgp.BoolSize + 11 + msgp.BoolSize + 29 + msgp.BoolSize + 19 + msgp.BoolSize + 23 + msgp.BoolSize + 26 + msgp.BoolSize + 32 + msgp.BoolSize + 22 + msgp.BoolSize s = 1 + 22 + msgp.BoolSize + 24 + msgp.BoolSize + 31 + msgp.BoolSize + 28 + msgp.BoolSize + 16 + msgp.BoolSize + 11 + msgp.BoolSize + 29 + msgp.BoolSize + 19 + msgp.BoolSize + 23 + msgp.BoolSize + 26 + msgp.BoolSize + 32 + msgp.BoolSize + 22 + msgp.BoolSize
return return
} }
// MarshalMsg implements msgp.Marshaler
func (z *MetricsGroupV2) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "cacheInterval"
o = append(o, 0x82, 0xad, 0x63, 0x61, 0x63, 0x68, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c)
o = msgp.AppendDuration(o, z.cacheInterval)
// string "metricsGroupOpts"
o = append(o, 0xb0, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x4f, 0x70, 0x74, 0x73)
o, err = z.metricsGroupOpts.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "metricsGroupOpts")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MetricsGroupV2) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "cacheInterval":
z.cacheInterval, bts, err = msgp.ReadDurationBytes(bts)
if err != nil {
err = msgp.WrapError(err, "cacheInterval")
return
}
case "metricsGroupOpts":
bts, err = z.metricsGroupOpts.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "metricsGroupOpts")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MetricsGroupV2) Msgsize() (s int) {
s = 1 + 14 + msgp.DurationSize + 17 + z.metricsGroupOpts.Msgsize()
return
}

View File

@ -8,64 +8,6 @@ import (
"github.com/tinylib/msgp/msgp" "github.com/tinylib/msgp/msgp"
) )
func TestMarshalUnmarshalMetric(t *testing.T) {
v := Metric{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgMetric(b *testing.B) {
v := Metric{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgMetric(b *testing.B) {
v := Metric{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalMetric(b *testing.B) {
v := Metric{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalMetricDescription(t *testing.T) { func TestMarshalUnmarshalMetricDescription(t *testing.T) {
v := MetricDescription{} v := MetricDescription{}
bts, err := v.MarshalMsg(nil) bts, err := v.MarshalMsg(nil)
@ -124,8 +66,8 @@ func BenchmarkUnmarshalMetricDescription(b *testing.B) {
} }
} }
func TestMarshalUnmarshalMetricsGroup(t *testing.T) { func TestMarshalUnmarshalMetricV2(t *testing.T) {
v := MetricsGroup{} v := MetricV2{}
bts, err := v.MarshalMsg(nil) bts, err := v.MarshalMsg(nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -147,8 +89,8 @@ func TestMarshalUnmarshalMetricsGroup(t *testing.T) {
} }
} }
func BenchmarkMarshalMsgMetricsGroup(b *testing.B) { func BenchmarkMarshalMsgMetricV2(b *testing.B) {
v := MetricsGroup{} v := MetricV2{}
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -156,8 +98,8 @@ func BenchmarkMarshalMsgMetricsGroup(b *testing.B) {
} }
} }
func BenchmarkAppendMsgMetricsGroup(b *testing.B) { func BenchmarkAppendMsgMetricV2(b *testing.B) {
v := MetricsGroup{} v := MetricV2{}
bts := make([]byte, 0, v.Msgsize()) bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0]) bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts))) b.SetBytes(int64(len(bts)))
@ -168,8 +110,8 @@ func BenchmarkAppendMsgMetricsGroup(b *testing.B) {
} }
} }
func BenchmarkUnmarshalMetricsGroup(b *testing.B) { func BenchmarkUnmarshalMetricV2(b *testing.B) {
v := MetricsGroup{} v := MetricV2{}
bts, _ := v.MarshalMsg(nil) bts, _ := v.MarshalMsg(nil)
b.ReportAllocs() b.ReportAllocs()
b.SetBytes(int64(len(bts))) b.SetBytes(int64(len(bts)))
@ -239,3 +181,61 @@ func BenchmarkUnmarshalMetricsGroupOpts(b *testing.B) {
} }
} }
} }
func TestMarshalUnmarshalMetricsGroupV2(t *testing.T) {
v := MetricsGroupV2{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgMetricsGroupV2(b *testing.B) {
v := MetricsGroupV2{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgMetricsGroupV2(b *testing.B) {
v := MetricsGroupV2{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalMetricsGroupV2(b *testing.B) {
v := MetricsGroupV2{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}

220
cmd/metrics-v3-api.go Normal file
View File

@ -0,0 +1,220 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
)
const (
apiRejectedAuthTotal MetricName = "rejected_auth_total"
apiRejectedHeaderTotal MetricName = "rejected_header_total"
apiRejectedTimestampTotal MetricName = "rejected_timestamp_total"
apiRejectedInvalidTotal MetricName = "rejected_invalid_total"
apiRequestsWaitingTotal MetricName = "waiting_total"
apiRequestsIncomingTotal MetricName = "incoming_total"
apiRequestsInFlightTotal MetricName = "inflight_total"
apiRequestsTotal MetricName = "total"
apiRequestsErrorsTotal MetricName = "errors_total"
apiRequests5xxErrorsTotal MetricName = "5xx_errors_total"
apiRequests4xxErrorsTotal MetricName = "4xx_errors_total"
apiRequestsCanceledTotal MetricName = "canceled_total"
apiRequestsTTFBSecondsDistribution MetricName = "ttfb_seconds_distribution"
apiTrafficSentBytes MetricName = "traffic_sent_bytes"
apiTrafficRecvBytes MetricName = "traffic_received_bytes"
)
var (
apiRejectedAuthTotalMD = NewCounterMD(apiRejectedAuthTotal,
"Total number of requests rejected for auth failure", "type")
apiRejectedHeaderTotalMD = NewCounterMD(apiRejectedHeaderTotal,
"Total number of requests rejected for invalid header", "type")
apiRejectedTimestampTotalMD = NewCounterMD(apiRejectedTimestampTotal,
"Total number of requests rejected for invalid timestamp", "type")
apiRejectedInvalidTotalMD = NewCounterMD(apiRejectedInvalidTotal,
"Total number of invalid requests", "type")
apiRequestsWaitingTotalMD = NewGaugeMD(apiRequestsWaitingTotal,
"Total number of requests in the waiting queue", "type")
apiRequestsIncomingTotalMD = NewGaugeMD(apiRequestsIncomingTotal,
"Total number of incoming requests", "type")
apiRequestsInFlightTotalMD = NewGaugeMD(apiRequestsInFlightTotal,
"Total number of requests currently in flight", "name", "type")
apiRequestsTotalMD = NewCounterMD(apiRequestsTotal,
"Total number of requests", "name", "type")
apiRequestsErrorsTotalMD = NewCounterMD(apiRequestsErrorsTotal,
"Total number of requests with (4xx and 5xx) errors", "name", "type")
apiRequests5xxErrorsTotalMD = NewCounterMD(apiRequests5xxErrorsTotal,
"Total number of requests with 5xx errors", "name", "type")
apiRequests4xxErrorsTotalMD = NewCounterMD(apiRequests4xxErrorsTotal,
"Total number of requests with 4xx errors", "name", "type")
apiRequestsCanceledTotalMD = NewCounterMD(apiRequestsCanceledTotal,
"Total number of requests canceled by the client", "name", "type")
apiRequestsTTFBSecondsDistributionMD = NewCounterMD(apiRequestsTTFBSecondsDistribution,
"Distribution of time to first byte across API calls", "name", "type", "le")
apiTrafficSentBytesMD = NewCounterMD(apiTrafficSentBytes,
"Total number of bytes sent", "type")
apiTrafficRecvBytesMD = NewCounterMD(apiTrafficRecvBytes,
"Total number of bytes received", "type")
)
// loadAPIRequestsHTTPMetrics - reads S3 HTTP metrics.
//
// This is a `MetricsLoaderFn`.
//
// This includes node level S3 HTTP metrics.
//
// This function currently ignores `opts`.
func loadAPIRequestsHTTPMetrics(ctx context.Context, m MetricValues, _ *metricsCache) error {
// Collect node level S3 HTTP metrics.
httpStats := globalHTTPStats.toServerHTTPStats(false)
// Currently we only collect S3 API related stats, so we set the "type"
// label to "s3".
m.Set(apiRejectedAuthTotal, float64(httpStats.TotalS3RejectedAuth), "type", "s3")
m.Set(apiRejectedTimestampTotal, float64(httpStats.TotalS3RejectedTime), "type", "s3")
m.Set(apiRejectedHeaderTotal, float64(httpStats.TotalS3RejectedHeader), "type", "s3")
m.Set(apiRejectedInvalidTotal, float64(httpStats.TotalS3RejectedInvalid), "type", "s3")
m.Set(apiRequestsWaitingTotal, float64(httpStats.S3RequestsInQueue), "type", "s3")
m.Set(apiRequestsIncomingTotal, float64(httpStats.S3RequestsIncoming), "type", "s3")
for name, value := range httpStats.CurrentS3Requests.APIStats {
m.Set(apiRequestsInFlightTotal, float64(value), "name", name, "type", "s3")
}
for name, value := range httpStats.TotalS3Requests.APIStats {
m.Set(apiRequestsTotal, float64(value), "name", name, "type", "s3")
}
for name, value := range httpStats.TotalS3Errors.APIStats {
m.Set(apiRequestsErrorsTotal, float64(value), "name", name, "type", "s3")
}
for name, value := range httpStats.TotalS35xxErrors.APIStats {
m.Set(apiRequests5xxErrorsTotal, float64(value), "name", name, "type", "s3")
}
for name, value := range httpStats.TotalS34xxErrors.APIStats {
m.Set(apiRequests4xxErrorsTotal, float64(value), "name", name, "type", "s3")
}
for name, value := range httpStats.TotalS3Canceled.APIStats {
m.Set(apiRequestsCanceledTotal, float64(value), "name", name, "type", "s3")
}
return nil
}
// loadAPIRequestsTTFBMetrics - loads S3 TTFB metrics.
//
// This is a `MetricsLoaderFn`.
func loadAPIRequestsTTFBMetrics(ctx context.Context, m MetricValues, _ *metricsCache) error {
renameLabels := map[string]string{"api": "name"}
m.SetHistogram(apiRequestsTTFBSecondsDistribution, httpRequestsDuration, renameLabels, nil,
"type", "s3")
return nil
}
// loadAPIRequestsNetworkMetrics - loads S3 network metrics.
//
// This is a `MetricsLoaderFn`.
func loadAPIRequestsNetworkMetrics(ctx context.Context, m MetricValues, _ *metricsCache) error {
connStats := globalConnStats.toServerConnStats()
m.Set(apiTrafficSentBytes, float64(connStats.s3OutputBytes), "type", "s3")
m.Set(apiTrafficRecvBytes, float64(connStats.s3InputBytes), "type", "s3")
return nil
}
// Metric Descriptions for bucket level S3 metrics.
var (
apiBucketTrafficSentBytesMD = NewCounterMD(apiTrafficSentBytes,
"Total number of bytes received for a bucket", "bucket", "type")
apiBucketTrafficRecvBytesMD = NewCounterMD(apiTrafficRecvBytes,
"Total number of bytes sent for a bucket", "bucket", "type")
apiBucketRequestsInFlightMD = NewGaugeMD(apiRequestsInFlightTotal,
"Total number of requests currently in flight for a bucket", "bucket", "name", "type")
apiBucketRequestsTotalMD = NewCounterMD(apiRequestsTotal,
"Total number of requests for a bucket", "bucket", "name", "type")
apiBucketRequestsCanceledMD = NewCounterMD(apiRequestsCanceledTotal,
"Total number of requests canceled by the client for a bucket", "bucket", "name", "type")
apiBucketRequests4xxErrorsMD = NewCounterMD(apiRequests4xxErrorsTotal,
"Total number of requests with 4xx errors for a bucket", "bucket", "name", "type")
apiBucketRequests5xxErrorsMD = NewCounterMD(apiRequests5xxErrorsTotal,
"Total number of requests with 5xx errors for a bucket", "bucket", "name", "type")
apiBucketRequestsTTFBSecondsDistributionMD = NewCounterMD(apiRequestsTTFBSecondsDistribution,
"Distribution of time to first byte across API calls for a bucket",
"bucket", "name", "le", "type")
)
// loadAPIBucketHTTPMetrics - loads bucket level S3 HTTP metrics.
//
// This is a `MetricsLoaderFn`.
//
// This includes bucket level S3 HTTP metrics and S3 network in/out metrics.
func loadAPIBucketHTTPMetrics(ctx context.Context, m MetricValues, _ *metricsCache, buckets []string) error {
if len(buckets) == 0 {
return nil
}
for bucket, inOut := range globalBucketConnStats.getBucketS3InOutBytes(buckets) {
recvBytes := inOut.In
if recvBytes > 0 {
m.Set(apiTrafficSentBytes, float64(recvBytes), "bucket", bucket, "type", "s3")
}
sentBytes := inOut.Out
if sentBytes > 0 {
m.Set(apiTrafficRecvBytes, float64(sentBytes), "bucket", bucket, "type", "s3")
}
httpStats := globalBucketHTTPStats.load(bucket)
for k, v := range httpStats.currentS3Requests.Load(false) {
m.Set(apiRequestsInFlightTotal, float64(v), "bucket", bucket, "name", k, "type", "s3")
}
for k, v := range httpStats.totalS3Requests.Load(false) {
m.Set(apiRequestsTotal, float64(v), "bucket", bucket, "name", k, "type", "s3")
}
for k, v := range httpStats.totalS3Canceled.Load(false) {
m.Set(apiRequestsCanceledTotal, float64(v), "bucket", bucket, "name", k, "type", "s3")
}
for k, v := range httpStats.totalS34xxErrors.Load(false) {
m.Set(apiRequests4xxErrorsTotal, float64(v), "bucket", bucket, "name", k, "type", "s3")
}
for k, v := range httpStats.totalS35xxErrors.Load(false) {
m.Set(apiRequests5xxErrorsTotal, float64(v), "bucket", bucket, "name", k, "type", "s3")
}
}
return nil
}
// loadAPIBucketTTFBMetrics - loads bucket S3 TTFB metrics.
//
// This is a `MetricsLoaderFn`.
func loadAPIBucketTTFBMetrics(ctx context.Context, m MetricValues, _ *metricsCache, buckets []string) error {
renameLabels := map[string]string{"api": "name"}
m.SetHistogram(apiRequestsTTFBSecondsDistribution, bucketHTTPRequestsDuration, renameLabels,
buckets, "type", "s3")
return nil
}

145
cmd/metrics-v3-cache.go Normal file
View File

@ -0,0 +1,145 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"time"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/cachevalue"
)
// metricsCache - cache for metrics.
//
// When serving metrics, this cache is passed to the MetricsLoaderFn.
//
// This cache is used for metrics that would result in network/storage calls.
type metricsCache struct {
dataUsageInfo *cachevalue.Cache[DataUsageInfo]
esetHealthResult *cachevalue.Cache[HealthResult]
driveMetrics *cachevalue.Cache[storageMetrics]
clusterDriveMetrics *cachevalue.Cache[storageMetrics]
nodesUpDown *cachevalue.Cache[nodesOnline]
}
func newMetricsCache() *metricsCache {
return &metricsCache{
dataUsageInfo: newDataUsageInfoCache(),
esetHealthResult: newESetHealthResultCache(),
driveMetrics: newDriveMetricsCache(),
clusterDriveMetrics: newClusterStorageInfoCache(),
nodesUpDown: newNodesUpDownCache(),
}
}
type nodesOnline struct {
Online, Offline int
}
func newNodesUpDownCache() *cachevalue.Cache[nodesOnline] {
loadNodesUpDown := func() (v nodesOnline, err error) {
v.Online, v.Offline = globalNotificationSys.GetPeerOnlineCount()
return
}
return cachevalue.NewFromFunc(1*time.Minute,
cachevalue.Opts{ReturnLastGood: true},
loadNodesUpDown)
}
type storageMetrics struct {
storageInfo madmin.StorageInfo
onlineDrives, offlineDrives, totalDrives int
}
func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
loadDataUsage := func() (u DataUsageInfo, err error) {
objLayer := newObjectLayerFn()
if objLayer == nil {
return
}
// Collect cluster level object metrics.
u, err = loadDataUsageFromBackend(GlobalContext, objLayer)
return
}
return cachevalue.NewFromFunc(1*time.Minute,
cachevalue.Opts{ReturnLastGood: true},
loadDataUsage)
}
func newESetHealthResultCache() *cachevalue.Cache[HealthResult] {
loadHealth := func() (r HealthResult, err error) {
objLayer := newObjectLayerFn()
if objLayer == nil {
return
}
r = objLayer.Health(GlobalContext, HealthOptions{})
return
}
return cachevalue.NewFromFunc(1*time.Minute,
cachevalue.Opts{ReturnLastGood: true},
loadHealth,
)
}
func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
loadDriveMetrics := func() (v storageMetrics, err error) {
objLayer := newObjectLayerFn()
if objLayer == nil {
return
}
storageInfo := objLayer.LocalStorageInfo(GlobalContext, true)
onlineDrives, offlineDrives := getOnlineOfflineDisksStats(storageInfo.Disks)
totalDrives := onlineDrives.Merge(offlineDrives)
v = storageMetrics{
storageInfo: storageInfo,
onlineDrives: onlineDrives.Sum(),
offlineDrives: offlineDrives.Sum(),
totalDrives: totalDrives.Sum(),
}
return
}
return cachevalue.NewFromFunc(1*time.Minute,
cachevalue.Opts{ReturnLastGood: true},
loadDriveMetrics)
}
func newClusterStorageInfoCache() *cachevalue.Cache[storageMetrics] {
loadStorageInfo := func() (v storageMetrics, err error) {
objLayer := newObjectLayerFn()
if objLayer == nil {
return storageMetrics{}, nil
}
storageInfo := objLayer.StorageInfo(GlobalContext, true)
onlineDrives, offlineDrives := getOnlineOfflineDisksStats(storageInfo.Disks)
totalDrives := onlineDrives.Merge(offlineDrives)
v = storageMetrics{
storageInfo: storageInfo,
onlineDrives: onlineDrives.Sum(),
offlineDrives: offlineDrives.Sum(),
totalDrives: totalDrives.Sum(),
}
return
}
return cachevalue.NewFromFunc(1*time.Minute,
cachevalue.Opts{ReturnLastGood: true},
loadStorageInfo,
)
}

View File

@ -0,0 +1,89 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"strconv"
)
const (
erasureSetOverallWriteQuorum = "overall_write_quorum"
erasureSetOverallHealth = "overall_health"
erasureSetReadQuorum = "read_quorum"
erasureSetWriteQuorum = "write_quorum"
erasureSetOnlineDrivesCount = "online_drives_count"
erasureSetHealingDrivesCount = "healing_drives_count"
erasureSetHealth = "health"
)
const (
poolIDL = "pool_id"
setIDL = "set_id"
)
var (
erasureSetOverallWriteQuorumMD = NewGaugeMD(erasureSetOverallWriteQuorum,
"Overall write quorum across pools and sets")
erasureSetOverallHealthMD = NewGaugeMD(erasureSetOverallHealth,
"Overall health across pools and sets (1=healthy, 0=unhealthy)")
erasureSetReadQuorumMD = NewGaugeMD(erasureSetReadQuorum,
"Read quorum for the erasure set in a pool", poolIDL, setIDL)
erasureSetWriteQuorumMD = NewGaugeMD(erasureSetWriteQuorum,
"Write quorum for the erasure set in a pool", poolIDL, setIDL)
erasureSetOnlineDrivesCountMD = NewGaugeMD(erasureSetOnlineDrivesCount,
"Count of online drives in the erasure set in a pool", poolIDL, setIDL)
erasureSetHealingDrivesCountMD = NewGaugeMD(erasureSetHealingDrivesCount,
"Count of healing drives in the erasure set in a pool", poolIDL, setIDL)
erasureSetHealthMD = NewGaugeMD(erasureSetHealth,
"Health of the erasure set in a pool (1=healthy, 0=unhealthy)",
poolIDL, setIDL)
)
func b2f(v bool) float64 {
if v {
return 1
}
return 0
}
// loadClusterErasureSetMetrics - `MetricsLoaderFn` for cluster storage erasure
// set metrics.
func loadClusterErasureSetMetrics(ctx context.Context, m MetricValues, c *metricsCache) error {
result, _ := c.esetHealthResult.Get()
m.Set(erasureSetOverallWriteQuorum, float64(result.WriteQuorum))
m.Set(erasureSetOverallHealth, b2f(result.Healthy))
for _, h := range result.ESHealth {
poolLV := strconv.Itoa(h.PoolID)
setLV := strconv.Itoa(h.SetID)
m.Set(erasureSetReadQuorum, float64(h.ReadQuorum),
poolIDL, poolLV, setIDL, setLV)
m.Set(erasureSetWriteQuorum, float64(h.WriteQuorum),
poolIDL, poolLV, setIDL, setLV)
m.Set(erasureSetOnlineDrivesCount, float64(h.HealthyDrives),
poolIDL, poolLV, setIDL, setLV)
m.Set(erasureSetHealingDrivesCount, float64(h.HealingDrives),
poolIDL, poolLV, setIDL, setLV)
m.Set(erasureSetHealth, b2f(h.Healthy),
poolIDL, poolLV, setIDL, setLV)
}
return nil
}

View File

@ -0,0 +1,109 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import "context"
const (
healthDrivesOfflineCount = "drives_offline_count"
healthDrivesOnlineCount = "drives_online_count"
healthDrivesCount = "drives_count"
)
var (
healthDrivesOfflineCountMD = NewGaugeMD(healthDrivesOfflineCount,
"Count of offline drives in the cluster")
healthDrivesOnlineCountMD = NewGaugeMD(healthDrivesOnlineCount,
"Count of online drives in the cluster")
healthDrivesCountMD = NewGaugeMD(healthDrivesCount,
"Count of all drives in the cluster")
)
// loadClusterHealthDriveMetrics - `MetricsLoaderFn` for cluster storage drive metrics
// such as online, offline and total drives.
func loadClusterHealthDriveMetrics(ctx context.Context, m MetricValues,
c *metricsCache,
) error {
clusterDriveMetrics, _ := c.clusterDriveMetrics.Get()
m.Set(healthDrivesOfflineCount, float64(clusterDriveMetrics.offlineDrives))
m.Set(healthDrivesOnlineCount, float64(clusterDriveMetrics.onlineDrives))
m.Set(healthDrivesCount, float64(clusterDriveMetrics.totalDrives))
return nil
}
const (
healthNodesOfflineCount = "nodes_offline_count"
healthNodesOnlineCount = "nodes_online_count"
)
var (
healthNodesOfflineCountMD = NewGaugeMD(healthNodesOfflineCount,
"Count of offline nodes in the cluster")
healthNodesOnlineCountMD = NewGaugeMD(healthNodesOnlineCount,
"Count of online nodes in the cluster")
)
// loadClusterHealthNodeMetrics - `MetricsLoaderFn` for cluster health node
// metrics.
func loadClusterHealthNodeMetrics(ctx context.Context, m MetricValues,
c *metricsCache,
) error {
nodesUpDown, _ := c.nodesUpDown.Get()
m.Set(healthNodesOfflineCount, float64(nodesUpDown.Offline))
m.Set(healthNodesOnlineCount, float64(nodesUpDown.Online))
return nil
}
const (
healthCapacityRawTotalBytes = "capacity_raw_total_bytes"
healthCapacityRawFreeBytes = "capacity_raw_free_bytes"
healthCapacityUsableTotalBytes = "capacity_usable_total_bytes"
healthCapacityUsableFreeBytes = "capacity_usable_free_bytes"
)
var (
healthCapacityRawTotalBytesMD = NewGaugeMD(healthCapacityRawTotalBytes,
"Total cluster raw storage capacity in bytes")
healthCapacityRawFreeBytesMD = NewGaugeMD(healthCapacityRawFreeBytes,
"Total cluster raw storage free in bytes")
healthCapacityUsableTotalBytesMD = NewGaugeMD(healthCapacityUsableTotalBytes,
"Total cluster usable storage capacity in bytes")
healthCapacityUsableFreeBytesMD = NewGaugeMD(healthCapacityUsableFreeBytes,
"Total cluster usable storage free in bytes")
)
// loadClusterHealthCapacityMetrics - `MetricsLoaderFn` for cluster storage
// capacity metrics.
func loadClusterHealthCapacityMetrics(ctx context.Context, m MetricValues,
c *metricsCache,
) error {
clusterDriveMetrics, _ := c.clusterDriveMetrics.Get()
storageInfo := clusterDriveMetrics.storageInfo
m.Set(healthCapacityRawTotalBytes, float64(GetTotalCapacity(storageInfo.Disks)))
m.Set(healthCapacityRawFreeBytes, float64(GetTotalCapacityFree(storageInfo.Disks)))
m.Set(healthCapacityUsableTotalBytes, float64(GetTotalUsableCapacity(storageInfo.Disks, storageInfo)))
m.Set(healthCapacityUsableFreeBytes, float64(GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo)))
return nil
}

View File

@ -0,0 +1,189 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"time"
"github.com/minio/minio/internal/logger"
)
const (
usageSinceLastUpdateSeconds = "since_last_update_seconds"
usageTotalBytes = "total_bytes"
usageObjectsCount = "count"
usageVersionsCount = "versions_count"
usageDeleteMarkersCount = "delete_markers_count"
usageBucketsCount = "buckets_count"
usageSizeDistribution = "size_distribution"
usageVersionCountDistribution = "version_count_distribution"
)
var (
usageSinceLastUpdateSecondsMD = NewGaugeMD(usageSinceLastUpdateSeconds,
"Time since last update of usage metrics in seconds")
usageTotalBytesMD = NewGaugeMD(usageTotalBytes,
"Total cluster usage in bytes")
usageObjectsCountMD = NewGaugeMD(usageObjectsCount,
"Total cluster objects count")
usageVersionsCountMD = NewGaugeMD(usageVersionsCount,
"Total cluster object versions (including delete markers) count")
usageDeleteMarkersCountMD = NewGaugeMD(usageDeleteMarkersCount,
"Total cluster delete markers count")
usageBucketsCountMD = NewGaugeMD(usageBucketsCount,
"Total cluster buckets count")
usageObjectsDistributionMD = NewGaugeMD(usageSizeDistribution,
"Cluster object size distribution", "range")
usageVersionsDistributionMD = NewGaugeMD(usageVersionCountDistribution,
"Cluster object version count distribution", "range")
)
// loadClusterUsageObjectMetrics - reads cluster usage metrics.
//
// This is a `MetricsLoaderFn`.
func loadClusterUsageObjectMetrics(ctx context.Context, m MetricValues, c *metricsCache) error {
dataUsageInfo, err := c.dataUsageInfo.Get()
if err != nil {
logger.LogIf(ctx, err)
return nil
}
// data usage has not captured any data yet.
if dataUsageInfo.LastUpdate.IsZero() {
return nil
}
var (
clusterSize uint64
clusterBuckets uint64
clusterObjectsCount uint64
clusterVersionsCount uint64
clusterDeleteMarkersCount uint64
)
clusterObjectSizesHistogram := map[string]uint64{}
clusterVersionsHistogram := map[string]uint64{}
for _, usage := range dataUsageInfo.BucketsUsage {
clusterBuckets++
clusterSize += usage.Size
clusterObjectsCount += usage.ObjectsCount
clusterVersionsCount += usage.VersionsCount
clusterDeleteMarkersCount += usage.DeleteMarkersCount
for k, v := range usage.ObjectSizesHistogram {
clusterObjectSizesHistogram[k] += v
}
for k, v := range usage.ObjectVersionsHistogram {
clusterVersionsHistogram[k] += v
}
}
m.Set(usageSinceLastUpdateSeconds, time.Since(dataUsageInfo.LastUpdate).Seconds())
m.Set(usageTotalBytes, float64(clusterSize))
m.Set(usageObjectsCount, float64(clusterObjectsCount))
m.Set(usageVersionsCount, float64(clusterVersionsCount))
m.Set(usageDeleteMarkersCount, float64(clusterDeleteMarkersCount))
m.Set(usageBucketsCount, float64(clusterBuckets))
for k, v := range clusterObjectSizesHistogram {
m.Set(usageSizeDistribution, float64(v), "range", k)
}
for k, v := range clusterVersionsHistogram {
m.Set(usageVersionCountDistribution, float64(v), "range", k)
}
return nil
}
const (
usageBucketQuotaTotalBytes = "quota_total_bytes"
usageBucketTotalBytes = "total_bytes"
usageBucketObjectsCount = "objects_count"
usageBucketVersionsCount = "versions_count"
usageBucketDeleteMarkersCount = "delete_markers_count"
usageBucketObjectSizeDistribution = "object_size_distribution"
usageBucketObjectVersionCountDistribution = "object_version_count_distribution"
)
var (
usageBucketTotalBytesMD = NewGaugeMD(usageBucketTotalBytes,
"Total bucket size in bytes", "bucket")
usageBucketObjectsTotalMD = NewGaugeMD(usageBucketObjectsCount,
"Total objects count in bucket", "bucket")
usageBucketVersionsCountMD = NewGaugeMD(usageBucketVersionsCount,
"Total object versions (including delete markers) count in bucket", "bucket")
usageBucketDeleteMarkersCountMD = NewGaugeMD(usageBucketDeleteMarkersCount,
"Total delete markers count in bucket", "bucket")
usageBucketQuotaTotalBytesMD = NewGaugeMD(usageBucketQuotaTotalBytes,
"Total bucket quota in bytes", "bucket")
usageBucketObjectSizeDistributionMD = NewGaugeMD(usageBucketObjectSizeDistribution,
"Bucket object size distribution", "range", "bucket")
usageBucketObjectVersionCountDistributionMD = NewGaugeMD(
usageBucketObjectVersionCountDistribution,
"Bucket object version count distribution", "range", "bucket")
)
// loadClusterUsageBucketMetrics - `MetricsLoaderFn` to load bucket usage metrics.
func loadClusterUsageBucketMetrics(ctx context.Context, m MetricValues, c *metricsCache, buckets []string) error {
dataUsageInfo, err := c.dataUsageInfo.Get()
if err != nil {
logger.LogIf(ctx, err)
return nil
}
// data usage has not been captured yet.
if dataUsageInfo.LastUpdate.IsZero() {
return nil
}
m.Set(usageSinceLastUpdateSeconds, float64(time.Since(dataUsageInfo.LastUpdate)))
for _, bucket := range buckets {
usage, ok := dataUsageInfo.BucketsUsage[bucket]
if !ok {
continue
}
quota, err := globalBucketQuotaSys.Get(ctx, bucket)
if err != nil {
// Log and continue if we are unable to retrieve metrics for this
// bucket.
logger.LogIf(ctx, err)
continue
}
m.Set(usageBucketTotalBytes, float64(usage.Size), "bucket", bucket)
m.Set(usageBucketObjectsCount, float64(usage.ObjectsCount), "bucket", bucket)
m.Set(usageBucketVersionsCount, float64(usage.VersionsCount), "bucket", bucket)
m.Set(usageBucketDeleteMarkersCount, float64(usage.DeleteMarkersCount), "bucket", bucket)
if quota != nil && quota.Quota > 0 {
m.Set(usageBucketQuotaTotalBytes, float64(quota.Quota), "bucket", bucket)
}
for k, v := range usage.ObjectSizesHistogram {
m.Set(usageBucketObjectSizeDistribution, float64(v), "range", k, "bucket", bucket)
}
for k, v := range usage.ObjectVersionsHistogram {
m.Set(usageBucketObjectVersionCountDistribution, float64(v), "range", k, "bucket", bucket)
}
}
return nil
}

254
cmd/metrics-v3-handler.go Normal file
View File

@ -0,0 +1,254 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"encoding/json"
"fmt"
"net/http"
"slices"
"strings"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/mcontext"
"github.com/minio/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type promLogger struct{}
func (p promLogger) Println(v ...interface{}) {
s := make([]string, 0, len(v))
for _, val := range v {
s = append(s, fmt.Sprintf("%v", val))
}
err := fmt.Errorf("metrics handler error: %v", strings.Join(s, " "))
logger.LogIf(GlobalContext, err)
}
type metricsV3Server struct {
registry *prometheus.Registry
opts promhttp.HandlerOpts
authFn func(http.Handler) http.Handler
metricsData *metricsV3Collection
}
func newMetricsV3Server(authType prometheusAuthType) *metricsV3Server {
registry := prometheus.NewRegistry()
authFn := AuthMiddleware
if authType == prometheusPublic {
authFn = NoAuthMiddleware
}
metricGroups := newMetricGroups(registry)
return &metricsV3Server{
registry: registry,
opts: promhttp.HandlerOpts{
ErrorLog: promLogger{},
ErrorHandling: promhttp.HTTPErrorOnError,
Registry: registry,
MaxRequestsInFlight: 2,
},
authFn: authFn,
metricsData: metricGroups,
}
}
// metricDisplay - contains info on a metric for display purposes.
type metricDisplay struct {
Name string `json:"name"`
Help string `json:"help"`
Type string `json:"type"`
Labels []string `json:"labels"`
}
func (md metricDisplay) String() string {
return fmt.Sprintf("Name: %s\nType: %s\nHelp: %s\nLabels: {%s}\n", md.Name, md.Type, md.Help, strings.Join(md.Labels, ","))
}
func (md metricDisplay) TableRow() string {
labels := strings.Join(md.Labels, ",")
if labels == "" {
labels = ""
} else {
labels = "`" + labels + "`"
}
return fmt.Sprintf("| `%s` | `%s` | %s | %s |\n", md.Name, md.Type, md.Help, labels)
}
// listMetrics - returns a handler that lists all the metrics that could be
// returned for the requested path.
//
// FIXME: It currently only lists `minio_` prefixed metrics.
func (h *metricsV3Server) listMetrics(path string) http.Handler {
// First collect all matching MetricsGroup's
matchingMG := make(map[collectorPath]*MetricsGroup)
for _, collPath := range h.metricsData.collectorPaths {
if collPath.isDescendantOf(path) {
if v, ok := h.metricsData.mgMap[collPath]; ok {
matchingMG[collPath] = v
} else {
matchingMG[collPath] = h.metricsData.bucketMGMap[collPath]
}
}
}
if len(matchingMG) == 0 {
return nil
}
var metrics []metricDisplay
for _, collectorPath := range h.metricsData.collectorPaths {
if mg, ok := matchingMG[collectorPath]; ok {
var commonLabels []string
for k := range mg.ExtraLabels {
commonLabels = append(commonLabels, k)
}
for _, d := range mg.Descriptors {
labels := slices.Clone(d.VariableLabels)
labels = append(labels, commonLabels...)
metric := metricDisplay{
Name: mg.MetricFQN(d.Name),
Help: d.Help,
Type: d.Type.String(),
Labels: labels,
}
metrics = append(metrics, metric)
}
}
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
if contentType == "application/json" {
w.Header().Set("Content-Type", "application/json")
jsonEncoder := json.NewEncoder(w)
jsonEncoder.Encode(metrics)
return
}
// If not JSON, return plain text. We format it as a markdown table for
// readability.
w.Header().Set("Content-Type", "text/plain")
var b strings.Builder
b.WriteString("| Name | Type | Help | Labels |\n")
b.WriteString("| ---- | ---- | ---- | ------ |\n")
for _, metric := range metrics {
b.WriteString(metric.TableRow())
}
w.Write([]byte(b.String()))
})
}
func (h *metricsV3Server) handle(path string, isListingRequest bool, buckets []string) http.Handler {
var notFoundHandler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Metrics Resource Not found", http.StatusNotFound)
})
// Require that metrics path has at least component.
if path == "/" {
return notFoundHandler
}
if isListingRequest {
handler := h.listMetrics(path)
if handler == nil {
return notFoundHandler
}
return handler
}
// In each of the following cases, we check if the collect path is a
// descendant of `path`, and if so, we add the corresponding gatherer to
// the list of gatherers. This way, /api/a will return all metrics returned
// by /api/a/b and /api/a/c (and any other matching descendant collector
// paths).
var gatherers []prometheus.Gatherer
for _, collectorPath := range h.metricsData.collectorPaths {
if collectorPath.isDescendantOf(path) {
gatherer := h.metricsData.mgGatherers[collectorPath]
// For Bucket metrics we need to set the buckets argument inside the
// metric group, so that it will affect collection. If no buckets
// are provided, we will not return bucket metrics.
if bmg, ok := h.metricsData.bucketMGMap[collectorPath]; ok {
if len(buckets) == 0 {
continue
}
unLocker := bmg.LockAndSetBuckets(buckets)
defer unLocker()
}
gatherers = append(gatherers, gatherer)
}
}
if len(gatherers) == 0 {
return notFoundHandler
}
return promhttp.HandlerFor(prometheus.Gatherers(gatherers), h.opts)
}
// ServeHTTP - implements http.Handler interface.
//
// When the `list` query parameter is provided (its value is ignored), the
// server lists all metrics that could be returned for the requested path.
//
// The (repeatable) `buckets` query parameter is a list of bucket names (or it
// could be a comma separated value) to return metrics with a bucket label.
// Bucket metrics will be returned only for the provided buckets. If no buckets
// parameter is provided, no bucket metrics are returned.
func (h *metricsV3Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pathComponents := mux.Vars(r)["pathComps"]
isListingRequest := r.Form.Has("list")
// Parse optional buckets query parameter.
bucketsParam := r.Form["buckets"]
buckets := make([]string, 0, len(bucketsParam))
for _, bp := range bucketsParam {
bp = strings.TrimSpace(bp)
if bp == "" {
continue
}
splits := strings.Split(bp, ",")
for _, split := range splits {
buckets = append(buckets, strings.TrimSpace(split))
}
}
innerHandler := h.handle(pathComponents, isListingRequest, buckets)
// Add tracing to the prom. handler
tracedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tc, ok := r.Context().Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt)
if ok {
tc.FuncName = "handler.MetricsV3"
tc.ResponseRecorder.LogErrBody = true
}
innerHandler.ServeHTTP(w, r)
})
// Add authentication
h.authFn(tracedHandler).ServeHTTP(w, r)
}

View File

@ -0,0 +1,126 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"strconv"
"github.com/minio/minio/internal/logger"
)
// label constants
const (
driveL = "drive"
poolIndexL = "pool_index"
setIndexL = "set_index"
driveIndexL = "drive_index"
apiL = "api"
)
var allDriveLabels = []string{driveL, poolIndexL, setIndexL, driveIndexL}
const (
driveUsedBytes = "used_bytes"
driveFreeBytes = "free_bytes"
driveTotalBytes = "total_bytes"
driveFreeInodes = "free_inodes"
driveTimeoutErrorsTotal = "timeout_errors_total"
driveAvailabilityErrorsTotal = "availability_errors_total"
driveWaitingIO = "waiting_io"
driveAPILatencyMicros = "api_latency_micros"
driveOfflineCount = "offline_count"
driveOnlineCount = "online_count"
driveCount = "count"
)
var (
driveUsedBytesMD = NewGaugeMD(driveUsedBytes,
"Total storage used on a drive in bytes", allDriveLabels...)
driveFreeBytesMD = NewGaugeMD(driveFreeBytes,
"Total storage free on a drive in bytes", allDriveLabels...)
driveTotalBytesMD = NewGaugeMD(driveTotalBytes,
"Total storage available on a drive in bytes", allDriveLabels...)
driveFreeInodesMD = NewGaugeMD(driveFreeInodes,
"Total free inodes on a drive", allDriveLabels...)
driveTimeoutErrorsMD = NewCounterMD(driveTimeoutErrorsTotal,
"Total timeout errors on a drive", allDriveLabels...)
driveAvailabilityErrorsMD = NewCounterMD(driveAvailabilityErrorsTotal,
"Total availability errors (I/O errors, permission denied and timeouts) on a drive",
allDriveLabels...)
driveWaitingIOMD = NewGaugeMD(driveWaitingIO,
"Total waiting I/O operations on a drive", allDriveLabels...)
driveAPILatencyMD = NewGaugeMD(driveAPILatencyMicros,
"Average last minute latency in µs for drive API storage operations",
append(allDriveLabels, apiL)...)
driveOfflineCountMD = NewGaugeMD(driveOfflineCount,
"Count of offline drives")
driveOnlineCountMD = NewGaugeMD(driveOnlineCount,
"Count of online drives")
driveCountMD = NewGaugeMD(driveCount,
"Count of all drives")
)
// loadDriveMetrics - `MetricsLoaderFn` for node drive metrics.
func loadDriveMetrics(ctx context.Context, m MetricValues, c *metricsCache) error {
driveMetrics, err := c.driveMetrics.Get()
if err != nil {
logger.LogIf(ctx, err)
return nil
}
storageInfo := driveMetrics.storageInfo
for _, disk := range storageInfo.Disks {
labels := []string{
driveL, disk.DrivePath,
poolIndexL, strconv.Itoa(disk.PoolIndex),
setIndexL, strconv.Itoa(disk.SetIndex),
driveIndexL, strconv.Itoa(disk.DiskIndex),
}
m.Set(driveUsedBytes, float64(disk.UsedSpace), labels...)
m.Set(driveFreeBytes, float64(disk.AvailableSpace), labels...)
m.Set(driveTotalBytes, float64(disk.TotalSpace), labels...)
m.Set(driveFreeInodes, float64(disk.FreeInodes), labels...)
if disk.Metrics != nil {
m.Set(driveTimeoutErrorsTotal, float64(disk.Metrics.TotalErrorsTimeout), labels...)
m.Set(driveAvailabilityErrorsTotal, float64(disk.Metrics.TotalErrorsAvailability), labels...)
m.Set(driveWaitingIO, float64(disk.Metrics.TotalWaiting), labels...)
// Append the api label for the drive API latencies.
labels = append(labels, "api", "")
lastIdx := len(labels) - 1
for apiName, latency := range disk.Metrics.LastMinute {
labels[lastIdx] = "storage." + apiName
m.Set(driveAPILatencyMicros, float64(latency.Avg().Microseconds()),
labels...)
}
}
}
m.Set(driveOfflineCount, float64(driveMetrics.offlineDrives))
m.Set(driveOnlineCount, float64(driveMetrics.onlineDrives))
m.Set(driveCount, float64(driveMetrics.totalDrives))
return nil
}

View File

@ -0,0 +1,61 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"github.com/minio/minio/internal/rest"
)
const (
internodeErrorsTotal MetricName = "errors_total"
internodeDialErrorsTotal MetricName = "dial_errors_total"
internodeDialAvgTimeNanos MetricName = "dial_avg_time_nanos"
internodeSentBytesTotal MetricName = "sent_bytes_total"
internodeRecvBytesTotal MetricName = "recv_bytes_total"
)
var (
internodeErrorsTotalMD = NewCounterMD(internodeErrorsTotal,
"Total number of failed internode calls")
internodeDialedErrorsTotalMD = NewCounterMD(internodeDialErrorsTotal,
"Total number of internode TCP dial timeouts and errors")
internodeDialAvgTimeNanosMD = NewGaugeMD(internodeDialAvgTimeNanos,
"Average dial time of internode TCP calls in nanoseconds")
internodeSentBytesTotalMD = NewCounterMD(internodeSentBytesTotal,
"Total number of bytes sent to other peer nodes")
internodeRecvBytesTotalMD = NewCounterMD(internodeRecvBytesTotal,
"Total number of bytes received from other peer nodes")
)
// loadNetworkInternodeMetrics - reads internode network metrics.
//
// This is a `MetricsLoaderFn`.
func loadNetworkInternodeMetrics(ctx context.Context, m MetricValues, _ *metricsCache) error {
connStats := globalConnStats.toServerConnStats()
rpcStats := rest.GetRPCStats()
if globalIsDistErasure {
m.Set(internodeErrorsTotal, float64(rpcStats.Errs))
m.Set(internodeDialErrorsTotal, float64(rpcStats.DialErrs))
m.Set(internodeDialAvgTimeNanos, float64(rpcStats.DialAvgDuration))
m.Set(internodeSentBytesTotal, float64(connStats.internodeOutputBytes))
m.Set(internodeRecvBytesTotal, float64(connStats.internodeInputBytes))
}
return nil
}

487
cmd/metrics-v3-types.go Normal file
View File

@ -0,0 +1,487 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"strings"
"sync"
"github.com/minio/minio/internal/logger"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
)
type collectorPath string
// metricPrefix converts a collector path to a metric name prefix. The path is
// converted to snake-case (by replaced '/' and '-' with '_') and prefixed with
// `minio_`.
func (cp collectorPath) metricPrefix() string {
s := strings.TrimPrefix(string(cp), "/")
s = strings.ReplaceAll(s, "/", "_")
s = strings.ReplaceAll(s, "-", "_")
return "minio_" + s
}
// isDescendantOf returns true if it is a descendant of (or the same as)
// `ancestor`.
//
// For example:
//
// /a, /a/b, /a/b/c are all descendants of /a.
// /abc or /abd/a are not descendants of /ab.
func (cp collectorPath) isDescendantOf(arg string) bool {
descendant := string(cp)
if descendant == arg {
return true
}
if len(arg) >= len(descendant) {
return false
}
if !strings.HasSuffix(arg, "/") {
arg += "/"
}
return strings.HasPrefix(descendant, arg)
}
// MetricType - represents the type of a metric.
type MetricType int
const (
// CounterMT - represents a counter metric.
CounterMT MetricType = iota
// GaugeMT - represents a gauge metric.
GaugeMT
// HistogramMT - represents a histogram metric.
HistogramMT
)
func (mt MetricType) String() string {
switch mt {
case CounterMT:
return "counter"
case GaugeMT:
return "gauge"
case HistogramMT:
return "histogram"
default:
return "*unknown*"
}
}
func (mt MetricType) toProm() prometheus.ValueType {
switch mt {
case CounterMT:
return prometheus.CounterValue
case GaugeMT:
return prometheus.GaugeValue
case HistogramMT:
return prometheus.CounterValue
default:
panic(fmt.Sprintf("unknown metric type: %d", mt))
}
}
// MetricDescriptor - represents a metric descriptor.
type MetricDescriptor struct {
Name MetricName
Type MetricType
Help string
VariableLabels []string
// managed values follow:
labelSet map[string]struct{}
}
func (md *MetricDescriptor) getLabelSet() map[string]struct{} {
if md.labelSet != nil {
return md.labelSet
}
md.labelSet = make(map[string]struct{}, len(md.VariableLabels))
for _, label := range md.VariableLabels {
md.labelSet[label] = struct{}{}
}
return md.labelSet
}
func (md *MetricDescriptor) toPromName(namePrefix string) string {
return prometheus.BuildFQName(namePrefix, "", string(md.Name))
}
func (md *MetricDescriptor) toPromDesc(namePrefix string, extraLabels map[string]string) *prometheus.Desc {
return prometheus.NewDesc(
md.toPromName(namePrefix),
md.Help,
md.VariableLabels, extraLabels,
)
}
// NewCounterMD - creates a new counter metric descriptor.
func NewCounterMD(name MetricName, help string, labels ...string) MetricDescriptor {
return MetricDescriptor{
Name: name,
Type: CounterMT,
Help: help,
VariableLabels: labels,
}
}
// NewGaugeMD - creates a new gauge metric descriptor.
func NewGaugeMD(name MetricName, help string, labels ...string) MetricDescriptor {
return MetricDescriptor{
Name: name,
Type: GaugeMT,
Help: help,
VariableLabels: labels,
}
}
type metricValue struct {
Labels map[string]string
Value float64
}
// MetricValues - type to set metric values retrieved while loading metrics. A
// value of this type is passed to the `MetricsLoaderFn`.
type MetricValues struct {
values map[MetricName][]metricValue
descriptors map[MetricName]MetricDescriptor
}
func newMetricValues(d map[MetricName]MetricDescriptor) MetricValues {
return MetricValues{
values: make(map[MetricName][]metricValue, len(d)),
descriptors: d,
}
}
// ToPromMetrics - converts the internal metric values to Prometheus
// adding the given name prefix. The extraLabels are added to each metric as
// constant labels.
func (m *MetricValues) ToPromMetrics(namePrefix string, extraLabels map[string]string,
) []prometheus.Metric {
metrics := make([]prometheus.Metric, 0, len(m.values))
for metricName, mv := range m.values {
desc := m.descriptors[metricName]
promDesc := desc.toPromDesc(namePrefix, extraLabels)
for _, v := range mv {
// labelValues is in the same order as the variable labels in the
// descriptor.
labelValues := make([]string, 0, len(v.Labels))
for _, k := range desc.VariableLabels {
labelValues = append(labelValues, v.Labels[k])
}
metrics = append(metrics,
prometheus.MustNewConstMetric(promDesc, desc.Type.toProm(), v.Value,
labelValues...))
}
}
return metrics
}
// Set - sets a metric value along with any provided labels. It is used only
// with Gauge and Counter metrics.
//
// If the MetricName given here is not present in the `MetricsGroup`'s
// descriptors, this function panics.
//
// Panics if `labels` is not a list of ordered label name and label value pairs
// or if all labels for the metric are not provided.
func (m *MetricValues) Set(name MetricName, value float64, labels ...string) {
desc, ok := m.descriptors[name]
if !ok {
panic(fmt.Sprintf("metric has no description: %s", name))
}
if len(labels)%2 != 0 {
panic("labels must be a list of ordered key-value pairs")
}
validLabels := desc.getLabelSet()
labelMap := make(map[string]string, len(labels)/2)
for i := 0; i < len(labels); i += 2 {
if _, ok := validLabels[labels[i]]; !ok {
panic(fmt.Sprintf("invalid label: %s (metric: %s)", labels[i], name))
}
labelMap[labels[i]] = labels[i+1]
}
if len(labels)/2 != len(validLabels) {
panic(fmt.Sprintf("not all labels were given values"))
}
v, ok := m.values[name]
if !ok {
v = make([]metricValue, 0, 1)
}
m.values[name] = append(v, metricValue{
Labels: labelMap,
Value: value,
})
}
// SetHistogram - sets values for the given MetricName using the provided
// histogram.
//
// `renameLabels` is a map of label names to rename. The keys are the original
// label names and the values are the new label names.
//
// TODO: bucketFilter doc
//
// `extraLabels` are additional labels to add to each metric. They are ordered
// label name and value pairs.
func (m *MetricValues) SetHistogram(name MetricName, hist *prometheus.HistogramVec,
renameLabels map[string]string, bucketFilter []string, extraLabels ...string,
) {
if _, ok := m.descriptors[name]; !ok {
panic(fmt.Sprintf("metric has no description: %s", name))
}
dummyDesc := MetricDescription{}
metricsV2 := getHistogramMetrics(hist, dummyDesc, false)
for _, metric := range metricsV2 {
// If a bucket filter is provided, only add metrics for the given
// buckets.
if len(bucketFilter) > 0 {
if !slices.Contains(bucketFilter, metric.VariableLabels["bucket"]) {
continue
}
}
labels := make([]string, 0, len(metric.VariableLabels)*2)
for k, v := range metric.VariableLabels {
if newLabel, ok := renameLabels[k]; ok {
labels = append(labels, newLabel, v)
} else {
labels = append(labels, k, v)
}
}
labels = append(labels, extraLabels...)
m.Set(name, metric.Value, labels...)
}
}
// MetricsLoaderFn - represents a function to load metrics from the
// metricsCache.
//
// Note that returning an error here will cause the Metrics handler to return a
// 500 Internal Server Error.
type MetricsLoaderFn func(context.Context, MetricValues, *metricsCache) error
// JoinLoaders - joins multiple loaders into a single loader. The returned
// loader will call each of the given loaders in order. If any of the loaders
// return an error, the returned loader will return that error.
func JoinLoaders(loaders ...MetricsLoaderFn) MetricsLoaderFn {
return func(ctx context.Context, m MetricValues, c *metricsCache) error {
for _, loader := range loaders {
if err := loader(ctx, m, c); err != nil {
return err
}
}
return nil
}
}
// BucketMetricsLoaderFn - represents a function to load metrics from the
// metricsCache and the system for a given list of buckets.
//
// Note that returning an error here will cause the Metrics handler to return a
// 500 Internal Server Error.
type BucketMetricsLoaderFn func(context.Context, MetricValues, *metricsCache, []string) error
// JoinBucketLoaders - joins multiple bucket loaders into a single loader,
// similar to `JoinLoaders`.
func JoinBucketLoaders(loaders ...BucketMetricsLoaderFn) BucketMetricsLoaderFn {
return func(ctx context.Context, m MetricValues, c *metricsCache, b []string) error {
for _, loader := range loaders {
if err := loader(ctx, m, c, b); err != nil {
return err
}
}
return nil
}
}
// MetricsGroup - represents a group of metrics. It includes a `MetricsLoaderFn`
// function that provides a way to load the metrics from the system. The metrics
// are cached and refreshed after a given timeout.
//
// For metrics with a `bucket` dimension, a list of buckets argument is required
// to collect the metrics.
//
// It implements the prometheus.Collector interface for metric groups without a
// bucket dimension. For metric groups with a bucket dimension, use the
// `GetBucketCollector` method to get a `BucketCollector` that implements the
// prometheus.Collector interface.
type MetricsGroup struct {
// Path (relative to the Metrics v3 base endpoint) at which this group of
// metrics is served. This value is converted into a metric name prefix
// using `.metricPrefix()` and is added to each metric returned.
CollectorPath collectorPath
// List of all metric descriptors that could be returned by the loader.
Descriptors []MetricDescriptor
// (Optional) Extra (constant) label KV pairs to be added to each metric in
// the group.
ExtraLabels map[string]string
// Loader functions to load metrics. Only one of these will be set. Metrics
// returned by these functions must be present in the `Descriptors` list.
loader MetricsLoaderFn
bucketLoader BucketMetricsLoaderFn
// Cache for all metrics groups. Set via `.SetCache` method.
cache *metricsCache
// managed values follow:
// map of metric descriptors by metric name.
descriptorMap map[MetricName]MetricDescriptor
// For bucket metrics, the list of buckets is stored here. It is used in the
// Collect() call. This is protected by the `bucketsLock`.
bucketsLock sync.Mutex
buckets []string
}
// NewMetricsGroup creates a new MetricsGroup. To create a metrics group for
// metrics with a `bucket` dimension (label), use `NewBucketMetricsGroup`.
//
// The `loader` function loads metrics from the cache and the system.
func NewMetricsGroup(path collectorPath, descriptors []MetricDescriptor,
loader MetricsLoaderFn,
) *MetricsGroup {
mg := &MetricsGroup{
CollectorPath: path,
Descriptors: descriptors,
loader: loader,
}
mg.validate()
return mg
}
// NewBucketMetricsGroup creates a new MetricsGroup for metrics with a `bucket`
// dimension (label).
//
// The `loader` function loads metrics from the cache and the system for a given
// list of buckets.
func NewBucketMetricsGroup(path collectorPath, descriptors []MetricDescriptor,
loader BucketMetricsLoaderFn,
) *MetricsGroup {
mg := &MetricsGroup{
CollectorPath: path,
Descriptors: descriptors,
bucketLoader: loader,
}
mg.validate()
return mg
}
// AddExtraLabels - adds extra (constant) label KV pairs to the metrics group.
// This is a helper to initialize the `ExtraLabels` field. The argument is a
// list of ordered label name and value pairs.
func (mg *MetricsGroup) AddExtraLabels(labels ...string) {
if len(labels)%2 != 0 {
panic("Labels must be an ordered list of name value pairs")
}
if mg.ExtraLabels == nil {
mg.ExtraLabels = make(map[string]string, len(labels))
}
for i := 0; i < len(labels); i += 2 {
mg.ExtraLabels[labels[i]] = labels[i+1]
}
}
// IsBucketMetricsGroup - returns true if the given MetricsGroup is a bucket
// metrics group.
func (mg *MetricsGroup) IsBucketMetricsGroup() bool {
return mg.bucketLoader != nil
}
// Describe - implements prometheus.Collector interface.
func (mg *MetricsGroup) Describe(ch chan<- *prometheus.Desc) {
for _, desc := range mg.Descriptors {
ch <- desc.toPromDesc(mg.CollectorPath.metricPrefix(), mg.ExtraLabels)
}
}
// Collect - implements prometheus.Collector interface.
func (mg *MetricsGroup) Collect(ch chan<- prometheus.Metric) {
metricValues := newMetricValues(mg.descriptorMap)
var err error
if mg.IsBucketMetricsGroup() {
err = mg.bucketLoader(GlobalContext, metricValues, mg.cache, mg.buckets)
} else {
err = mg.loader(GlobalContext, metricValues, mg.cache)
}
// There is no way to handle errors here, so we panic the current goroutine
// and the Metrics API handler returns a 500 HTTP status code. This should
// normally not happen, and usually indicates a bug.
logger.CriticalIf(GlobalContext, errors.Wrap(err, "failed to get metrics"))
promMetrics := metricValues.ToPromMetrics(mg.CollectorPath.metricPrefix(),
mg.ExtraLabels)
for _, metric := range promMetrics {
ch <- metric
}
}
// LockAndSetBuckets - locks the buckets and sets the given buckets. It returns
// a function to unlock the buckets.
func (mg *MetricsGroup) LockAndSetBuckets(buckets []string) func() {
mg.bucketsLock.Lock()
mg.buckets = buckets
return func() {
mg.bucketsLock.Unlock()
}
}
// MetricFQN - returns the fully qualified name for the given metric name.
func (mg *MetricsGroup) MetricFQN(name MetricName) string {
v, ok := mg.descriptorMap[name]
if !ok {
// This should never happen.
return ""
}
return v.toPromName(mg.CollectorPath.metricPrefix())
}
func (mg *MetricsGroup) validate() {
if len(mg.Descriptors) == 0 {
panic("Descriptors must be set")
}
// For bools A and B, A XOR B <=> A != B.
isExactlyOneSet := (mg.loader == nil) != (mg.bucketLoader == nil)
if !isExactlyOneSet {
panic("Exactly one Loader function must be set")
}
mg.descriptorMap = make(map[MetricName]MetricDescriptor, len(mg.Descriptors))
for _, desc := range mg.Descriptors {
mg.descriptorMap[desc.Name] = desc
}
}
// SetCache is a helper to initialize MetricsGroup. It sets the cache object.
func (mg *MetricsGroup) SetCache(c *metricsCache) {
mg.cache = c
}

272
cmd/metrics-v3.go Normal file
View File

@ -0,0 +1,272 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"slices"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)
// Collector paths.
//
// These are paths under the top-level /minio/metrics/v3 metrics endpoint. Each
// of these paths returns a set of V3 metrics.
const (
apiRequestsCollectorPath collectorPath = "/api/requests"
apiBucketCollectorPath collectorPath = "/api/bucket"
systemNetworkInternodeCollectorPath collectorPath = "/system/network/internode"
systemDriveCollectorPath collectorPath = "/system/drive"
systemProcessCollectorPath collectorPath = "/system/process"
systemGoCollectorPath collectorPath = "/system/go"
clusterHealthCollectorPath collectorPath = "/cluster/health"
clusterUsageObjectsCollectorPath collectorPath = "/cluster/usage/objects"
clusterUsageBucketsCollectorPath collectorPath = "/cluster/usage/buckets"
clusterErasureSetCollectorPath collectorPath = "/cluster/erasure-set"
)
const (
clusterBasePath = "/cluster"
)
type metricsV3Collection struct {
mgMap map[collectorPath]*MetricsGroup
bucketMGMap map[collectorPath]*MetricsGroup
// Gatherers for non-bucket MetricsGroup's
mgGatherers map[collectorPath]prometheus.Gatherer
collectorPaths []collectorPath
}
func newMetricGroups(r *prometheus.Registry) *metricsV3Collection {
// Create all metric groups.
apiRequestsMG := NewMetricsGroup(apiRequestsCollectorPath,
[]MetricDescriptor{
apiRejectedAuthTotalMD,
apiRejectedHeaderTotalMD,
apiRejectedTimestampTotalMD,
apiRejectedInvalidTotalMD,
apiRequestsWaitingTotalMD,
apiRequestsIncomingTotalMD,
apiRequestsInFlightTotalMD,
apiRequestsTotalMD,
apiRequestsErrorsTotalMD,
apiRequests5xxErrorsTotalMD,
apiRequests4xxErrorsTotalMD,
apiRequestsCanceledTotalMD,
apiRequestsTTFBSecondsDistributionMD,
apiTrafficSentBytesMD,
apiTrafficRecvBytesMD,
},
JoinLoaders(loadAPIRequestsHTTPMetrics, loadAPIRequestsTTFBMetrics,
loadAPIRequestsNetworkMetrics),
)
apiBucketMG := NewBucketMetricsGroup(apiBucketCollectorPath,
[]MetricDescriptor{
apiBucketTrafficRecvBytesMD,
apiBucketTrafficSentBytesMD,
apiBucketRequestsInFlightMD,
apiBucketRequestsTotalMD,
apiBucketRequestsCanceledMD,
apiBucketRequests4xxErrorsMD,
apiBucketRequests5xxErrorsMD,
apiBucketRequestsTTFBSecondsDistributionMD,
},
JoinBucketLoaders(loadAPIBucketHTTPMetrics, loadAPIBucketTTFBMetrics),
)
systemNetworkInternodeMG := NewMetricsGroup(systemNetworkInternodeCollectorPath,
[]MetricDescriptor{
internodeErrorsTotalMD,
internodeDialedErrorsTotalMD,
internodeDialAvgTimeNanosMD,
internodeSentBytesTotalMD,
internodeRecvBytesTotalMD,
},
loadNetworkInternodeMetrics,
)
systemDriveMG := NewMetricsGroup(systemDriveCollectorPath,
[]MetricDescriptor{
driveUsedBytesMD,
driveFreeBytesMD,
driveTotalBytesMD,
driveFreeInodesMD,
driveTimeoutErrorsMD,
driveAvailabilityErrorsMD,
driveWaitingIOMD,
driveAPILatencyMD,
driveOfflineCountMD,
driveOnlineCountMD,
driveCountMD,
},
loadDriveMetrics,
)
clusterHealthMG := NewMetricsGroup(clusterHealthCollectorPath,
[]MetricDescriptor{
healthDrivesOfflineCountMD,
healthDrivesOnlineCountMD,
healthDrivesCountMD,
healthNodesOfflineCountMD,
healthNodesOnlineCountMD,
healthCapacityRawTotalBytesMD,
healthCapacityRawFreeBytesMD,
healthCapacityUsableTotalBytesMD,
healthCapacityUsableFreeBytesMD,
},
JoinLoaders(loadClusterHealthDriveMetrics,
loadClusterHealthNodeMetrics,
loadClusterHealthCapacityMetrics),
)
clusterUsageObjectsMG := NewMetricsGroup(clusterUsageObjectsCollectorPath,
[]MetricDescriptor{
usageSinceLastUpdateSecondsMD,
usageTotalBytesMD,
usageObjectsCountMD,
usageVersionsCountMD,
usageDeleteMarkersCountMD,
usageBucketsCountMD,
usageObjectsDistributionMD,
usageVersionsDistributionMD,
},
loadClusterUsageObjectMetrics,
)
clusterUsageBucketsMG := NewBucketMetricsGroup(clusterUsageBucketsCollectorPath,
[]MetricDescriptor{
usageSinceLastUpdateSecondsMD,
usageBucketTotalBytesMD,
usageBucketObjectsTotalMD,
usageBucketVersionsCountMD,
usageBucketDeleteMarkersCountMD,
usageBucketQuotaTotalBytesMD,
usageBucketObjectSizeDistributionMD,
usageBucketObjectVersionCountDistributionMD,
},
loadClusterUsageBucketMetrics,
)
clusterErasureSetMG := NewMetricsGroup(clusterErasureSetCollectorPath,
[]MetricDescriptor{
erasureSetOverallWriteQuorumMD,
erasureSetOverallHealthMD,
erasureSetReadQuorumMD,
erasureSetWriteQuorumMD,
erasureSetOnlineDrivesCountMD,
erasureSetHealingDrivesCountMD,
erasureSetHealthMD,
},
loadClusterErasureSetMetrics,
)
allMetricGroups := []*MetricsGroup{
apiRequestsMG,
apiBucketMG,
systemNetworkInternodeMG,
systemDriveMG,
clusterHealthMG,
clusterUsageObjectsMG,
clusterUsageBucketsMG,
clusterErasureSetMG,
}
// Bucket metrics are special, they always include the bucket label. These
// metrics required a list of buckets to be passed to the loader, and the list
// of buckets is not known until the request is made. So we keep a separate
// map for bucket metrics and handle them specially.
// Add the serverName and poolIndex labels to all non-cluster metrics.
//
// Also create metric group maps and set the cache.
metricsCache := newMetricsCache()
mgMap := make(map[collectorPath]*MetricsGroup)
bucketMGMap := make(map[collectorPath]*MetricsGroup)
for _, mg := range allMetricGroups {
if !strings.HasPrefix(string(mg.CollectorPath), clusterBasePath) {
mg.AddExtraLabels(
serverName, globalLocalNodeName,
// poolIndex, strconv.Itoa(globalLocalPoolIdx),
)
}
mg.SetCache(metricsCache)
if mg.IsBucketMetricsGroup() {
bucketMGMap[mg.CollectorPath] = mg
} else {
mgMap[mg.CollectorPath] = mg
}
}
// Prepare to register the collectors. Other than `MetricGroup` collectors,
// we also have standard collectors like `ProcessCollector` and `GoCollector`.
// Create all Non-`MetricGroup` collectors here.
collectors := map[collectorPath]prometheus.Collector{
systemProcessCollectorPath: collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
ReportErrors: true,
}),
systemGoCollectorPath: collectors.NewGoCollector(),
}
// Add all `MetricGroup` collectors to the map.
for _, mg := range allMetricGroups {
collectors[mg.CollectorPath] = mg
}
// Helper function to register a collector and return a gatherer for it.
mustRegister := func(c ...prometheus.Collector) prometheus.Gatherer {
subRegistry := prometheus.NewRegistry()
for _, col := range c {
subRegistry.MustRegister(col)
}
r.MustRegister(subRegistry)
return subRegistry
}
// Register all collectors and create gatherers for them.
gatherers := make(map[collectorPath]prometheus.Gatherer, len(collectors))
collectorPaths := make([]collectorPath, 0, len(collectors))
for path, collector := range collectors {
gatherers[path] = mustRegister(collector)
collectorPaths = append(collectorPaths, path)
}
slices.Sort(collectorPaths)
return &metricsV3Collection{
mgMap: mgMap,
bucketMGMap: bucketMGMap,
mgGatherers: gatherers,
collectorPaths: collectorPaths,
}
}

View File

@ -870,12 +870,12 @@ func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType,
} }
// GetResourceMetrics - gets the resource metrics from all nodes excluding self. // GetResourceMetrics - gets the resource metrics from all nodes excluding self.
func (sys *NotificationSys) GetResourceMetrics(ctx context.Context) <-chan Metric { func (sys *NotificationSys) GetResourceMetrics(ctx context.Context) <-chan MetricV2 {
if sys == nil { if sys == nil {
return nil return nil
} }
g := errgroup.WithNErrs(len(sys.peerClients)) g := errgroup.WithNErrs(len(sys.peerClients))
peerChannels := make([]<-chan Metric, len(sys.peerClients)) peerChannels := make([]<-chan MetricV2, len(sys.peerClients))
for index := range sys.peerClients { for index := range sys.peerClients {
index := index index := index
g.Go(func() error { g.Go(func() error {
@ -1214,8 +1214,8 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...
return consolidatedReport return consolidatedReport
} }
func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels []<-chan Metric, g *errgroup.Group) <-chan Metric { func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels []<-chan MetricV2, g *errgroup.Group) <-chan MetricV2 {
ch := make(chan Metric) ch := make(chan MetricV2)
var wg sync.WaitGroup var wg sync.WaitGroup
for index, err := range g.Wait() { for index, err := range g.Wait() {
if err != nil { if err != nil {
@ -1229,7 +1229,7 @@ func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels
continue continue
} }
wg.Add(1) wg.Add(1)
go func(ctx context.Context, peerChannel <-chan Metric, wg *sync.WaitGroup) { go func(ctx context.Context, peerChannel <-chan MetricV2, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
@ -1248,7 +1248,7 @@ func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels
} }
}(ctx, peerChannels[index], &wg) }(ctx, peerChannels[index], &wg)
} }
go func(wg *sync.WaitGroup, ch chan Metric) { go func(wg *sync.WaitGroup, ch chan MetricV2) {
wg.Wait() wg.Wait()
xioutil.SafeClose(ch) xioutil.SafeClose(ch)
}(&wg, ch) }(&wg, ch)
@ -1256,12 +1256,12 @@ func (sys *NotificationSys) collectPeerMetrics(ctx context.Context, peerChannels
} }
// GetBucketMetrics - gets the cluster level bucket metrics from all nodes excluding self. // GetBucketMetrics - gets the cluster level bucket metrics from all nodes excluding self.
func (sys *NotificationSys) GetBucketMetrics(ctx context.Context) <-chan Metric { func (sys *NotificationSys) GetBucketMetrics(ctx context.Context) <-chan MetricV2 {
if sys == nil { if sys == nil {
return nil return nil
} }
g := errgroup.WithNErrs(len(sys.peerClients)) g := errgroup.WithNErrs(len(sys.peerClients))
peerChannels := make([]<-chan Metric, len(sys.peerClients)) peerChannels := make([]<-chan MetricV2, len(sys.peerClients))
for index := range sys.peerClients { for index := range sys.peerClients {
index := index index := index
g.Go(func() error { g.Go(func() error {
@ -1277,12 +1277,12 @@ func (sys *NotificationSys) GetBucketMetrics(ctx context.Context) <-chan Metric
} }
// GetClusterMetrics - gets the cluster metrics from all nodes excluding self. // GetClusterMetrics - gets the cluster metrics from all nodes excluding self.
func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) <-chan Metric { func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) <-chan MetricV2 {
if sys == nil { if sys == nil {
return nil return nil
} }
g := errgroup.WithNErrs(len(sys.peerClients)) g := errgroup.WithNErrs(len(sys.peerClients))
peerChannels := make([]<-chan Metric, len(sys.peerClients)) peerChannels := make([]<-chan MetricV2, len(sys.peerClients))
for index := range sys.peerClients { for index := range sys.peerClients {
index := index index := index
g.Go(func() error { g.Go(func() error {

View File

@ -641,13 +641,13 @@ func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []st
return getBandwidthRPC.Call(ctx, client.gridConn(), values) return getBandwidthRPC.Call(ctx, client.gridConn(), values)
} }
func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Metric, error) { func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan MetricV2, error) {
resp, err := getResourceMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) resp, err := getResourceMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS())
if err != nil { if err != nil {
return nil, err return nil, err
} }
ch := make(chan Metric) ch := make(chan MetricV2)
go func(ch chan<- Metric) { go func(ch chan<- MetricV2) {
defer close(ch) defer close(ch)
for _, m := range resp.Value() { for _, m := range resp.Value() {
if m == nil { if m == nil {
@ -663,12 +663,12 @@ func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Me
return ch, nil return ch, nil
} }
func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric, error) { func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan MetricV2, error) {
resp, err := getPeerMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) resp, err := getPeerMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS())
if err != nil { if err != nil {
return nil, err return nil, err
} }
ch := make(chan Metric) ch := make(chan MetricV2)
go func() { go func() {
defer close(ch) defer close(ch)
for _, m := range resp.Value() { for _, m := range resp.Value() {
@ -685,12 +685,12 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric
return ch, nil return ch, nil
} }
func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan Metric, error) { func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan MetricV2, error) {
resp, err := getPeerBucketMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) resp, err := getPeerBucketMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS())
if err != nil { if err != nil {
return nil, err return nil, err
} }
ch := make(chan Metric) ch := make(chan MetricV2)
go func() { go func() {
defer close(ch) defer close(ch)
for _, m := range resp.Value() { for _, m := range resp.Value() {

View File

@ -54,7 +54,7 @@ type peerRESTServer struct{}
var ( var (
// Types & Wrappers // Types & Wrappers
aoBucketInfo = grid.NewArrayOf[*BucketInfo](func() *BucketInfo { return &BucketInfo{} }) aoBucketInfo = grid.NewArrayOf[*BucketInfo](func() *BucketInfo { return &BucketInfo{} })
aoMetricsGroup = grid.NewArrayOf[*Metric](func() *Metric { return &Metric{} }) aoMetricsGroup = grid.NewArrayOf[*MetricV2](func() *MetricV2 { return &MetricV2{} })
madminBgHealState = grid.NewJSONPool[madmin.BgHealState]() madminBgHealState = grid.NewJSONPool[madmin.BgHealState]()
madminCPUs = grid.NewJSONPool[madmin.CPUs]() madminCPUs = grid.NewJSONPool[madmin.CPUs]()
madminMemInfo = grid.NewJSONPool[madmin.MemInfo]() madminMemInfo = grid.NewJSONPool[madmin.MemInfo]()
@ -88,9 +88,9 @@ var (
getNetInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.NetInfo]](grid.HandlerGetNetInfo, grid.NewMSS, madminNetInfo.NewJSON) getNetInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.NetInfo]](grid.HandlerGetNetInfo, grid.NewMSS, madminNetInfo.NewJSON)
getOSInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.OSInfo]](grid.HandlerGetOSInfo, grid.NewMSS, madminOSInfo.NewJSON) getOSInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.OSInfo]](grid.HandlerGetOSInfo, grid.NewMSS, madminOSInfo.NewJSON)
getPartitionsRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.Partitions]](grid.HandlerGetPartitions, grid.NewMSS, madminPartitions.NewJSON) getPartitionsRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.Partitions]](grid.HandlerGetPartitions, grid.NewMSS, madminPartitions.NewJSON)
getPeerBucketMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetPeerBucketMetrics, grid.NewMSS, aoMetricsGroup.New) getPeerBucketMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*MetricV2]](grid.HandlerGetPeerBucketMetrics, grid.NewMSS, aoMetricsGroup.New)
getPeerMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetPeerMetrics, grid.NewMSS, aoMetricsGroup.New) getPeerMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*MetricV2]](grid.HandlerGetPeerMetrics, grid.NewMSS, aoMetricsGroup.New)
getResourceMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetResourceMetrics, grid.NewMSS, aoMetricsGroup.New) getResourceMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*MetricV2]](grid.HandlerGetResourceMetrics, grid.NewMSS, aoMetricsGroup.New)
getProcInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.ProcInfo]](grid.HandlerGetProcInfo, grid.NewMSS, madminProcInfo.NewJSON) getProcInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.ProcInfo]](grid.HandlerGetProcInfo, grid.NewMSS, madminProcInfo.NewJSON)
getSRMetricsRPC = grid.NewSingleHandler[*grid.MSS, *SRMetricsSummary](grid.HandlerGetSRMetrics, grid.NewMSS, func() *SRMetricsSummary { return &SRMetricsSummary{} }) getSRMetricsRPC = grid.NewSingleHandler[*grid.MSS, *SRMetricsSummary](grid.HandlerGetSRMetrics, grid.NewMSS, func() *SRMetricsSummary { return &SRMetricsSummary{} })
getSysConfigRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysConfig]](grid.HandlerGetSysConfig, grid.NewMSS, madminSysConfig.NewJSON) getSysConfigRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysConfig]](grid.HandlerGetSysConfig, grid.NewMSS, madminSysConfig.NewJSON)
@ -1000,9 +1000,9 @@ func (s *peerRESTServer) GetBandwidth(params *grid.URLValues) (*bandwidth.Bucket
return globalBucketMonitor.GetReport(selectBuckets), nil return globalBucketMonitor.GetReport(selectBuckets), nil
} }
func (s *peerRESTServer) GetResourceMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) { func (s *peerRESTServer) GetResourceMetrics(_ *grid.MSS) (*grid.Array[*MetricV2], *grid.RemoteErr) {
res := make([]*Metric, 0, len(resourceMetricsGroups)) res := make([]*MetricV2, 0, len(resourceMetricsGroups))
populateAndPublish(resourceMetricsGroups, func(m Metric) bool { populateAndPublish(resourceMetricsGroups, func(m MetricV2) bool {
if m.VariableLabels == nil { if m.VariableLabels == nil {
m.VariableLabels = make(map[string]string, 1) m.VariableLabels = make(map[string]string, 1)
} }
@ -1014,9 +1014,9 @@ func (s *peerRESTServer) GetResourceMetrics(_ *grid.MSS) (*grid.Array[*Metric],
} }
// GetPeerMetrics gets the metrics to be federated across peers. // GetPeerMetrics gets the metrics to be federated across peers.
func (s *peerRESTServer) GetPeerMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) { func (s *peerRESTServer) GetPeerMetrics(_ *grid.MSS) (*grid.Array[*MetricV2], *grid.RemoteErr) {
res := make([]*Metric, 0, len(peerMetricsGroups)) res := make([]*MetricV2, 0, len(peerMetricsGroups))
populateAndPublish(peerMetricsGroups, func(m Metric) bool { populateAndPublish(peerMetricsGroups, func(m MetricV2) bool {
if m.VariableLabels == nil { if m.VariableLabels == nil {
m.VariableLabels = make(map[string]string, 1) m.VariableLabels = make(map[string]string, 1)
} }
@ -1028,9 +1028,9 @@ func (s *peerRESTServer) GetPeerMetrics(_ *grid.MSS) (*grid.Array[*Metric], *gri
} }
// GetPeerBucketMetrics gets the metrics to be federated across peers. // GetPeerBucketMetrics gets the metrics to be federated across peers.
func (s *peerRESTServer) GetPeerBucketMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) { func (s *peerRESTServer) GetPeerBucketMetrics(_ *grid.MSS) (*grid.Array[*MetricV2], *grid.RemoteErr) {
res := make([]*Metric, 0, len(bucketPeerMetricsGroups)) res := make([]*MetricV2, 0, len(bucketPeerMetricsGroups))
populateAndPublish(bucketPeerMetricsGroups, func(m Metric) bool { populateAndPublish(bucketPeerMetricsGroups, func(m MetricV2) bool {
if m.VariableLabels == nil { if m.VariableLabels == nil {
m.VariableLabels = make(map[string]string, 1) m.VariableLabels = make(map[string]string, 1)
} }

View File

@ -158,17 +158,17 @@ var (
} }
) )
func (t *tierMetrics) Report() []Metric { func (t *tierMetrics) Report() []MetricV2 {
metrics := getHistogramMetrics(t.histogram, tierTTLBMD, true) metrics := getHistogramMetrics(t.histogram, tierTTLBMD, true)
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
for tier, stat := range t.requestsCount { for tier, stat := range t.requestsCount {
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: tierRequestsSuccessMD, Description: tierRequestsSuccessMD,
Value: float64(stat.success), Value: float64(stat.success),
VariableLabels: map[string]string{"tier": tier}, VariableLabels: map[string]string{"tier": tier},
}) })
metrics = append(metrics, Metric{ metrics = append(metrics, MetricV2{
Description: tierRequestsFailureMD, Description: tierRequestsFailureMD,
Value: float64(stat.failure), Value: float64(stat.failure),
VariableLabels: map[string]string{"tier": tier}, VariableLabels: map[string]string{"tier": tier},

178
docs/metrics/v3.md Normal file
View File

@ -0,0 +1,178 @@
# Metrics Version 3
In metrics version 3, all metrics are available under the endpoint:
```
/minio/metrics/v3
```
however, a specific path under this is required.
Metrics are organized into groups at paths **relative** to the top-level endpoint above.
## Metrics Request Handling
Each endpoint below can be queried at different intervals as needed via a scrape configuration in Prometheus or a compatible metrics collection tool.
For ease of configuration, each (non-empty) parent of the path serves all metric endpoints that are at descendant paths. For example, to query all system metrics one needs to only scrape `/minio/metrics/v3/system/`.
Some metrics are bucket specific. These will have a `/bucket` component in their path. As the number of buckets can be large, the metrics scrape operation needs to be provided with a specific list of buckets via the `bucket` query parameter. Only metrics for the given buckets will be returned (with the bucket label set). For example to query API metrics for buckets `test1` and `test2`, make a scrape request to `/minio/metrics/v3/api/bucket?buckets=test1,test2`.
Instead of a metrics scrape, it is also possible to list the metrics that would be returned by a path. This is done by adding a `?list` query parameter. The MinIO server will then list all possible metrics that could be returned. During an actual metrics scrape, only available metrics are returned - not all of them. With the `list` query parameter, the output format can be selected - just set the request `Content-Type` to `application/json` for JSON output, or `text/plain` for a simple markdown formatted table. The latter is the default.
## Request, System and Cluster Metrics
At a high level metrics are grouped into three categories, listed in the following sub-sections. The path in each of the tables is relative to the top-level endpoint.
### Request metrics
These are metrics about requests served by the (current) node.
| Path | Description |
|-----------------|--------------------------------------------------|
| `/api/requests` | Metrics over all requests |
| `/api/bucket` | Metrics over all requests split by bucket labels |
| | |
### System metrics
These are metrics about the minio process and the node.
| Path | Description |
|-----------------------------|---------------------------------------------------|
| `/system/drive` | Metrics about drives on the system |
| `/system/network/internode` | Metrics about internode requests made by the node |
| `/system/process` | Standard process metrics |
| `/system/go` | Standard Go lang metrics |
| | |
### Cluster metrics
These present metrics about the whole MinIO cluster.
| Path | Description |
|--------------------------|-----------------------------|
| `/cluster/health` | Cluster health metrics |
| `/cluster/usage/objects` | Object statistics |
| `/cluster/usage/buckets` | Object statistics by bucket |
| `/cluster/erasure-set` | Erasure set metrics |
| | |
## Metrics Listing
Each of the following sub-sections list metrics returned by each of the endpoints.
The standard metrics groups for ProcessCollector and GoCollector are not shown below.
### `/api/requests`
| Name | Type | Help | Labels |
|------------------------------------------------|-----------|---------------------------------------------------------|----------------------------------|
| `minio_api_requests_rejected_auth_total` | `counter` | Total number of requests rejected for auth failure | `type,pool_index,server` |
| `minio_api_requests_rejected_header_total` | `counter` | Total number of requests rejected for invalid header | `type,pool_index,server` |
| `minio_api_requests_rejected_timestamp_total` | `counter` | Total number of requests rejected for invalid timestamp | `type,pool_index,server` |
| `minio_api_requests_rejected_invalid_total` | `counter` | Total number of invalid requests | `type,pool_index,server` |
| `minio_api_requests_waiting_total` | `gauge` | Total number of requests in the waiting queue | `type,pool_index,server` |
| `minio_api_requests_incoming_total` | `gauge` | Total number of incoming requests | `type,pool_index,server` |
| `minio_api_requests_inflight_total` | `gauge` | Total number of requests currently in flight | `name,type,pool_index,server` |
| `minio_api_requests_total` | `counter` | Total number of requests | `name,type,pool_index,server` |
| `minio_api_requests_errors_total` | `counter` | Total number of requests with (4xx and 5xx) errors | `name,type,pool_index,server` |
| `minio_api_requests_5xx_errors_total` | `counter` | Total number of requests with 5xx errors | `name,type,pool_index,server` |
| `minio_api_requests_4xx_errors_total` | `counter` | Total number of requests with 4xx errors | `name,type,pool_index,server` |
| `minio_api_requests_canceled_total` | `counter` | Total number of requests canceled by the client | `name,type,pool_index,server` |
| `minio_api_requests_ttfb_seconds_distribution` | `counter` | Distribution of time to first byte across API calls | `name,type,le,pool_index,server` |
| `minio_api_requests_traffic_sent_bytes` | `counter` | Total number of bytes sent | `type,pool_index,server` |
| `minio_api_requests_traffic_received_bytes` | `counter` | Total number of bytes received | `type,pool_index,server` |
### `/api/bucket`
| Name | Type | Help | Labels |
|----------------------------------------------|-----------|------------------------------------------------------------------|-----------------------------------------|
| `minio_api_bucket_traffic_received_bytes` | `counter` | Total number of bytes sent for a bucket | `bucket,type,server,pool_index` |
| `minio_api_bucket_traffic_sent_bytes` | `counter` | Total number of bytes received for a bucket | `bucket,type,server,pool_index` |
| `minio_api_bucket_inflight_total` | `gauge` | Total number of requests currently in flight for a bucket | `bucket,name,type,server,pool_index` |
| `minio_api_bucket_total` | `counter` | Total number of requests for a bucket | `bucket,name,type,server,pool_index` |
| `minio_api_bucket_canceled_total` | `counter` | Total number of requests canceled by the client for a bucket | `bucket,name,type,server,pool_index` |
| `minio_api_bucket_4xx_errors_total` | `counter` | Total number of requests with 4xx errors for a bucket | `bucket,name,type,server,pool_index` |
| `minio_api_bucket_5xx_errors_total` | `counter` | Total number of requests with 5xx errors for a bucket | `bucket,name,type,server,pool_index` |
| `minio_api_bucket_ttfb_seconds_distribution` | `counter` | Distribution of time to first byte across API calls for a bucket | `bucket,name,le,type,server,pool_index` |
### `/system/drive`
| Name | Type | Help | Labels |
|------------------------------------------------|-----------|-----------------------------------------------------------------------------------|-----------------------------------------------------|
| `minio_system_drive_used_bytes` | `gauge` | Total storage used on a drive in bytes | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_free_bytes` | `gauge` | Total storage free on a drive in bytes | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_total_bytes` | `gauge` | Total storage available on a drive in bytes | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_free_inodes` | `gauge` | Total free inodes on a drive | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_timeout_errors_total` | `counter` | Total timeout errors on a drive | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_availability_errors_total` | `counter` | Total availability errors (I/O errors, permission denied and timeouts) on a drive | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_waiting_io` | `gauge` | Total waiting I/O operations on a drive | `drive,set_index,drive_index,pool_index,server` |
| `minio_system_drive_api_latency_micros` | `gauge` | Average last minute latency in µs for drive API storage operations | `drive,api,set_index,drive_index,pool_index,server` |
| `minio_system_drive_offline_count` | `gauge` | Count of offline drives | `pool_index,server` |
| `minio_system_drive_online_count` | `gauge` | Count of online drives | `pool_index,server` |
| `minio_system_drive_count` | `gauge` | Count of all drives | `pool_index,server` |
### `/system/network/internode`
| Name | Type | Help | Labels |
|------------------------------------------------------|-----------|----------------------------------------------------------|---------------------|
| `minio_system_network_internode_errors_total` | `counter` | Total number of failed internode calls | `server,pool_index` |
| `minio_system_network_internode_dial_errors_total` | `counter` | Total number of internode TCP dial timeouts and errors | `server,pool_index` |
| `minio_system_network_internode_dial_avg_time_nanos` | `gauge` | Average dial time of internodes TCP calls in nanoseconds | `server,pool_index` |
| `minio_system_network_internode_sent_bytes_total` | `counter` | Total number of bytes sent to other peer nodes | `server,pool_index` |
| `minio_system_network_internode_recv_bytes_total` | `counter` | Total number of bytes received from other peer nodes | `server,pool_index` |
### `/cluster/health`
| Name | Type | Help | Labels |
|----------------------------------------------------|---------|------------------------------------------------|--------|
| `minio_cluster_health_drives_offline_count` | `gauge` | Count of offline drives in the cluster | |
| `minio_cluster_health_drives_online_count` | `gauge` | Count of online drives in the cluster | |
| `minio_cluster_health_drives_count` | `gauge` | Count of all drives in the cluster | |
| `minio_cluster_health_nodes_offline_count` | `gauge` | Count of offline nodes in the cluster | |
| `minio_cluster_health_nodes_online_count` | `gauge` | Count of online nodes in the cluster | |
| `minio_cluster_health_capacity_raw_total_bytes` | `gauge` | Total cluster raw storage capacity in bytes | |
| `minio_cluster_health_capacity_raw_free_bytes` | `gauge` | Total cluster raw storage free in bytes | |
| `minio_cluster_health_capacity_usable_total_bytes` | `gauge` | Total cluster usable storage capacity in bytes | |
| `minio_cluster_health_capacity_usable_free_bytes` | `gauge` | Total cluster usable storage free in bytes | |
### `/cluster/usage/objects`
| Name | Type | Help | Labels |
|----------------------------------------------------------|---------|----------------------------------------------------------------|---------|
| `minio_cluster_usage_objects_since_last_update_seconds` | `gauge` | Time since last update of usage metrics in seconds | |
| `minio_cluster_usage_objects_total_bytes` | `gauge` | Total cluster usage in bytes | |
| `minio_cluster_usage_objects_count` | `gauge` | Total cluster objects count | |
| `minio_cluster_usage_objects_versions_count` | `gauge` | Total cluster object versions (including delete markers) count | |
| `minio_cluster_usage_objects_delete_markers_count` | `gauge` | Total cluster delete markers count | |
| `minio_cluster_usage_objects_buckets_count` | `gauge` | Total cluster buckets count | |
| `minio_cluster_usage_objects_size_distribution` | `gauge` | Cluster object size distribution | `range` |
| `minio_cluster_usage_objects_version_count_distribution` | `gauge` | Cluster object version count distribution | `range` |
### `/cluster/usage/buckets`
| Name | Type | Help | Labels |
|-----------------------------------------------------------------|---------|------------------------------------------------------------------|----------------|
| `minio_cluster_usage_buckets_since_last_update_seconds` | `gauge` | Time since last update of usage metrics in seconds | |
| `minio_cluster_usage_buckets_total_bytes` | `gauge` | Total bucket size in bytes | `bucket` |
| `minio_cluster_usage_buckets_objects_count` | `gauge` | Total objects count in bucket | `bucket` |
| `minio_cluster_usage_buckets_versions_count` | `gauge` | Total object versions (including delete markers) count in bucket | `bucket` |
| `minio_cluster_usage_buckets_delete_markers_count` | `gauge` | Total delete markers count in bucket | `bucket` |
| `minio_cluster_usage_buckets_quota_total_bytes` | `gauge` | Total bucket quota in bytes | `bucket` |
| `minio_cluster_usage_buckets_object_size_distribution` | `gauge` | Bucket object size distribution | `range,bucket` |
| `minio_cluster_usage_buckets_object_version_count_distribution` | `gauge` | Bucket object version count distribution | `range,bucket` |
### `/cluster/erasure-set`
| Name | Type | Help | Labels |
|--------------------------------------------------|---------|---------------------------------------------------------------|------------------|
| `minio_cluster_erasure_set_overall_write_quorum` | `gauge` | Overall write quorum across pools and sets | |
| `minio_cluster_erasure_set_overall_health` | `gauge` | Overall health across pools and sets (1=healthy, 0=unhealthy) | |
| `minio_cluster_erasure_set_read_quorum` | `gauge` | Read quorum for the erasure set in a pool | `pool_id,set_id` |
| `minio_cluster_erasure_set_write_quorum` | `gauge` | Write quorum for the erasure set in a pool | `pool_id,set_id` |
| `minio_cluster_erasure_set_online_drives_count` | `gauge` | Count of online drives in the erasure set in a pool | `pool_id,set_id` |
| `minio_cluster_erasure_set_healing_drives_count` | `gauge` | Count of healing drives in the erasure set in a pool | `pool_id,set_id` |
| `minio_cluster_erasure_set_health` | `gauge` | Health of the erasure set in a pool (1=healthy, 0=unhealthy) | `pool_id,set_id` |