Updated Prometheus metrics (#11141)

* Add metrics for nodes online and offline
* Add cluster capacity metrics
* Introduce v2 metrics
This commit is contained in:
Ritesh H Shukla
2021-01-18 20:35:38 -08:00
committed by GitHub
parent 3bda8f755c
commit b4add82bb6
27 changed files with 1669 additions and 252 deletions

View File

@@ -69,3 +69,35 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req
Disks: storageInfo.Disks,
}
}
// getLocalDisks returns madmin.Disk information for every endpoint that is
// local to this server process. While walking the pools it also probes each
// remote host (at most once per node) and records its reachability in the
// network map.
//
// NOTE(review): network is populated but never returned or read afterwards —
// confirm whether callers were meant to receive it.
func getLocalDisks(endpointServerPools EndpointServerPools) []madmin.Disk {
	var localEndpoints Endpoints
	// Per-node reachability: "online" or "offline".
	network := make(map[string]string)
	for _, ep := range endpointServerPools {
		for _, endpoint := range ep.Endpoints {
			nodeName := endpoint.Host
			if nodeName == "" {
				// Standalone deployments have no host component.
				nodeName = "localhost"
			}
			if endpoint.IsLocal {
				// Only proceed for local endpoints
				network[nodeName] = "online"
				localEndpoints = append(localEndpoints, endpoint)
				continue
			}
			_, present := network[nodeName]
			if !present {
				// Probe each remote node only once.
				if err := isServerResolvable(endpoint); err == nil {
					network[nodeName] = "online"
				} else {
					network[nodeName] = "offline"
				}
			}
		}
	}
	// Best-effort: errors are deliberately ignored so that whatever storage
	// info is available from partially-initialized disks is still returned.
	localDisks, _ := initStorageDisksWithErrors(localEndpoints)
	defer closeStorageDisks(localDisks)
	storageInfo, _ := getStorageInfo(localDisks, localEndpoints.GetAllStrings())
	return storageInfo.Disks
}

View File

@@ -34,6 +34,14 @@ type CacheDiskStats struct {
Dir string
}
// GetUsageLevelString gets the string representation for the usage level.
func (c *CacheDiskStats) GetUsageLevelString() (u string) {
	// UsageState is updated concurrently, so read it atomically; any
	// non-zero value means cache usage is above the configured watermark.
	if atomic.LoadInt32(&c.UsageState) != 0 {
		return "high"
	}
	return "low"
}
// CacheStats - represents bytes served from cache,
// cache hits and cache misses.
type CacheStats struct {

View File

@@ -1377,9 +1377,9 @@ func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, ver
}
// GetMetrics - no op
func (z *erasureServerPools) GetMetrics(ctx context.Context) (*Metrics, error) {
func (z *erasureServerPools) GetMetrics(ctx context.Context) (*BackendMetrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
return &BackendMetrics{}, NotImplemented{}
}
func (z *erasureServerPools) getZoneAndSet(id string) (int, int, error) {

View File

@@ -1554,9 +1554,9 @@ func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, opt
}
// GetMetrics - no op
func (fs *FSObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
func (fs *FSObjects) GetMetrics(ctx context.Context) (*BackendMetrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
return &BackendMetrics{}, NotImplemented{}
}
// ListObjectsV2 lists all blobs in bucket filtered by prefix

View File

@@ -389,7 +389,7 @@ func shouldMeterRequest(req *http.Request) bool {
// MetricsTransport is a custom wrapper around Transport to track metrics
type MetricsTransport struct {
Transport *http.Transport
Metrics *Metrics
Metrics *BackendMetrics
}
// RoundTrip implements the RoundTrip method for MetricsTransport

View File

@@ -29,36 +29,28 @@ type RequestStats struct {
Post uint64 `json:"Post"`
}
// Metrics - represents bytes served from backend
// only implemented for S3 Gateway
type Metrics struct {
bytesReceived uint64
bytesSent uint64
requestStats RequestStats
}
// IncBytesReceived - Increase total bytes received from gateway backend
func (s *Metrics) IncBytesReceived(n uint64) {
func (s *BackendMetrics) IncBytesReceived(n uint64) {
atomic.AddUint64(&s.bytesReceived, n)
}
// GetBytesReceived - Get total bytes received from gateway backend
func (s *Metrics) GetBytesReceived() uint64 {
func (s *BackendMetrics) GetBytesReceived() uint64 {
return atomic.LoadUint64(&s.bytesReceived)
}
// IncBytesSent - Increase total bytes sent to gateway backend
func (s *Metrics) IncBytesSent(n uint64) {
func (s *BackendMetrics) IncBytesSent(n uint64) {
atomic.AddUint64(&s.bytesSent, n)
}
// GetBytesSent - Get total bytes received from gateway backend
func (s *Metrics) GetBytesSent() uint64 {
func (s *BackendMetrics) GetBytesSent() uint64 {
return atomic.LoadUint64(&s.bytesSent)
}
// IncRequests - Increase request count sent to gateway backend by 1
func (s *Metrics) IncRequests(method string) {
func (s *BackendMetrics) IncRequests(method string) {
// Only increment for Head & Get requests, else no op
if method == http.MethodGet {
atomic.AddUint64(&s.requestStats.Get, 1)
@@ -72,11 +64,11 @@ func (s *Metrics) IncRequests(method string) {
}
// GetRequests - Get total number of Get & Head requests sent to gateway backend
func (s *Metrics) GetRequests() RequestStats {
func (s *BackendMetrics) GetRequests() RequestStats {
return s.requestStats
}
// NewMetrics - Prepare new Metrics structure
func NewMetrics() *Metrics {
return &Metrics{}
// NewMetrics - Prepare new BackendMetrics structure
func NewMetrics() *BackendMetrics {
return &BackendMetrics{}
}

View File

@@ -202,9 +202,9 @@ func (a GatewayUnsupported) CopyObject(ctx context.Context, srcBucket string, sr
}
// GetMetrics - no op
func (a GatewayUnsupported) GetMetrics(ctx context.Context) (*Metrics, error) {
func (a GatewayUnsupported) GetMetrics(ctx context.Context) (*BackendMetrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
return &BackendMetrics{}, NotImplemented{}
}
// PutObjectTags - not implemented.

View File

@@ -419,7 +419,7 @@ type azureObjects struct {
minio.GatewayUnsupported
endpoint *url.URL
httpClient *http.Client
metrics *minio.Metrics
metrics *minio.BackendMetrics
client azblob.ServiceURL // Azure sdk client
}
@@ -533,7 +533,7 @@ func parseAzurePart(metaPartFileName, prefix string) (partID int, err error) {
}
// GetMetrics returns this gateway's metrics
func (a *azureObjects) GetMetrics(ctx context.Context) (*minio.Metrics, error) {
func (a *azureObjects) GetMetrics(ctx context.Context) (*minio.BackendMetrics, error) {
return a.metrics, nil
}

View File

@@ -341,7 +341,7 @@ type gcsGateway struct {
minio.GatewayUnsupported
client *storage.Client
httpClient *http.Client
metrics *minio.Metrics
metrics *minio.BackendMetrics
projectID string
}
@@ -359,7 +359,7 @@ func gcsParseProjectID(credsFile string) (projectID string, err error) {
}
// GetMetrics returns this gateway's metrics
func (l *gcsGateway) GetMetrics(ctx context.Context) (*minio.Metrics, error) {
func (l *gcsGateway) GetMetrics(ctx context.Context) (*minio.BackendMetrics, error) {
return l.metrics, nil
}

View File

@@ -259,11 +259,11 @@ type s3Objects struct {
minio.GatewayUnsupported
Client *miniogo.Core
HTTPClient *http.Client
Metrics *minio.Metrics
Metrics *minio.BackendMetrics
}
// GetMetrics returns this gateway's metrics
func (l *s3Objects) GetMetrics(ctx context.Context) (*minio.Metrics, error) {
func (l *s3Objects) GetMetrics(ctx context.Context) (*minio.BackendMetrics, error) {
return l.Metrics, nil
}

View File

@@ -228,7 +228,9 @@ func guessIsMetricsReq(req *http.Request) bool {
}
aType := getRequestAuthType(req)
return (aType == authTypeAnonymous || aType == authTypeJWT) &&
req.URL.Path == minioReservedBucketPath+prometheusMetricsPath
req.URL.Path == minioReservedBucketPath+prometheusMetricsPathLegacy ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2ClusterPath ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2NodePath
}
// guessIsRPCReq - returns true if the request is for an RPC endpoint.

View File

@@ -79,10 +79,10 @@ func (s *ConnStats) getS3OutputBytes() uint64 {
// Return connection stats (total input/output bytes and total s3 input/output bytes)
func (s *ConnStats) toServerConnStats() ServerConnStats {
return ServerConnStats{
TotalInputBytes: s.getTotalInputBytes(),
TotalOutputBytes: s.getTotalOutputBytes(),
S3InputBytes: s.getS3InputBytes(),
S3OutputBytes: s.getS3OutputBytes(),
TotalInputBytes: s.getTotalInputBytes(), // Traffic including reserved bucket
TotalOutputBytes: s.getTotalOutputBytes(), // Traffic including reserved bucket
S3InputBytes: s.getS3InputBytes(), // Traffic for client buckets
S3OutputBytes: s.getS3OutputBytes(), // Traffic for client buckets
}
}
@@ -163,9 +163,11 @@ func (st *HTTPStats) toServerHTTPStats() ServerHTTPStats {
// Update statistics from http request and response data
func (st *HTTPStats) updateStats(api string, r *http.Request, w *logger.ResponseWriter) {
// A successful request has a 2xx response code
successReq := (w.StatusCode >= 200 && w.StatusCode < 300)
successReq := w.StatusCode >= 200 && w.StatusCode < 300
if !strings.HasSuffix(r.URL.Path, prometheusMetricsPath) {
if !strings.HasSuffix(r.URL.Path, prometheusMetricsPathLegacy) ||
!strings.HasSuffix(r.URL.Path, prometheusMetricsV2ClusterPath) ||
!strings.HasSuffix(r.URL.Path, prometheusMetricsV2NodePath) {
st.totalS3Requests.Inc(api)
if !successReq && w.StatusCode != 0 {
st.totalS3Errors.Inc(api)

View File

@@ -24,7 +24,9 @@ import (
)
const (
prometheusMetricsPath = "/prometheus/metrics"
prometheusMetricsPathLegacy = "/prometheus/metrics"
prometheusMetricsV2ClusterPath = "/v2/metrics/cluster"
prometheusMetricsV2NodePath = "/v2/metrics/node"
)
// Standard env prometheus auth type
@@ -43,14 +45,17 @@ const (
func registerMetricsRouter(router *mux.Router) {
// metrics router
metricsRouter := router.NewRoute().PathPrefix(minioReservedBucketPath).Subrouter()
authType := strings.ToLower(os.Getenv(EnvPrometheusAuthType))
switch prometheusAuthType(authType) {
case prometheusPublic:
metricsRouter.Handle(prometheusMetricsPath, metricsHandler())
metricsRouter.Handle(prometheusMetricsPathLegacy, metricsHandler())
metricsRouter.Handle(prometheusMetricsV2ClusterPath, metricsServerHandler())
metricsRouter.Handle(prometheusMetricsV2NodePath, metricsNodeHandler())
case prometheusJWT:
fallthrough
default:
metricsRouter.Handle(prometheusMetricsPath, AuthMiddleware(metricsHandler()))
metricsRouter.Handle(prometheusMetricsPathLegacy, AuthMiddleware(metricsHandler()))
metricsRouter.Handle(prometheusMetricsV2ClusterPath, AuthMiddleware(metricsServerHandler()))
metricsRouter.Handle(prometheusMetricsV2NodePath, AuthMiddleware(metricsNodeHandler()))
}
}

1187
cmd/metrics-v2.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -51,6 +51,17 @@ var (
)
)
const (
healMetricsNamespace = "self_heal"
gatewayNamespace = "gateway"
cacheNamespace = "cache"
s3Namespace = "s3"
bucketNamespace = "bucket"
minioNamespace = "minio"
diskNamespace = "disk"
interNodeNamespace = "internode"
)
func init() {
prometheus.MustRegister(httpRequestsDuration)
prometheus.MustRegister(newMinioCollector())
@@ -81,9 +92,10 @@ func (c *minioCollector) Describe(ch chan<- *prometheus.Desc) {
func (c *minioCollector) Collect(ch chan<- prometheus.Metric) {
// Expose MinIO's version information
minioVersionInfo.WithLabelValues(Version, CommitID).Set(float64(1.0))
minioVersionInfo.WithLabelValues(Version, CommitID).Set(1.0)
storageMetricsPrometheus(ch)
nodeHealthMetricsPrometheus(ch)
bucketUsageMetricsPrometheus(ch)
networkMetricsPrometheus(ch)
httpMetricsPrometheus(ch)
@@ -92,6 +104,26 @@ func (c *minioCollector) Collect(ch chan<- prometheus.Metric) {
healingMetricsPrometheus(ch)
}
// nodeHealthMetricsPrometheus publishes the number of online and offline
// MinIO nodes as gauges on the given Prometheus metrics channel.
func nodeHealthMetricsPrometheus(ch chan<- prometheus.Metric) {
	nodesUp, nodesDown := GetPeerOnlineCount()
	// Emit both gauges from one table to keep name/help/value aligned.
	for _, g := range []struct {
		name  string
		help  string
		value float64
	}{
		{"online", "Total number of MinIO nodes online", float64(nodesUp)},
		{"offline", "Total number of MinIO nodes offline", float64(nodesDown)},
	} {
		ch <- prometheus.MustNewConstMetric(
			prometheus.NewDesc(
				prometheus.BuildFQName(minioNamespace, "nodes", g.name),
				g.help,
				nil, nil),
			prometheus.GaugeValue,
			g.value,
		)
	}
}
// collects healing specific metrics for MinIO instance in Prometheus specific format
// and sends to given channel
func healingMetricsPrometheus(ch chan<- prometheus.Metric) {
@@ -102,7 +134,6 @@ func healingMetricsPrometheus(ch chan<- prometheus.Metric) {
if !exists {
return
}
healMetricsNamespace := "self_heal"
var dur time.Duration
if !bgSeq.lastHealActivity.IsZero() {
@@ -172,7 +203,7 @@ func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "bytes_received"),
prometheus.BuildFQName(gatewayNamespace, globalGatewayName, "bytes_received"),
"Total number of bytes received by current MinIO Gateway "+globalGatewayName+" backend",
nil, nil),
prometheus.CounterValue,
@@ -180,7 +211,7 @@ func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "bytes_sent"),
prometheus.BuildFQName(gatewayNamespace, globalGatewayName, "bytes_sent"),
"Total number of bytes sent by current MinIO Gateway to "+globalGatewayName+" backend",
nil, nil),
prometheus.CounterValue,
@@ -189,7 +220,7 @@ func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
s := m.GetRequests()
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
prometheus.BuildFQName(gatewayNamespace, globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
@@ -198,7 +229,7 @@ func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
prometheus.BuildFQName(gatewayNamespace, globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
@@ -207,7 +238,7 @@ func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
prometheus.BuildFQName(gatewayNamespace, globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
@@ -216,7 +247,7 @@ func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
prometheus.BuildFQName(gatewayNamespace, globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
@@ -236,7 +267,7 @@ func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "hits", "total"),
prometheus.BuildFQName(cacheNamespace, "hits", "total"),
"Total number of disk cache hits in current MinIO instance",
nil, nil),
prometheus.CounterValue,
@@ -244,7 +275,7 @@ func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "misses", "total"),
prometheus.BuildFQName(cacheNamespace, "misses", "total"),
"Total number of disk cache misses in current MinIO instance",
nil, nil),
prometheus.CounterValue,
@@ -252,7 +283,7 @@ func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "data", "served"),
prometheus.BuildFQName(cacheNamespace, "data", "served"),
"Total number of bytes served from cache of current MinIO instance",
nil, nil),
prometheus.CounterValue,
@@ -262,7 +293,7 @@ func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
// Cache disk usage percentage
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "usage", "percent"),
prometheus.BuildFQName(cacheNamespace, "usage", "percent"),
"Total percentage cache usage",
[]string{"disk"}, nil),
prometheus.GaugeValue,
@@ -271,7 +302,7 @@ func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "usage", "high"),
prometheus.BuildFQName(cacheNamespace, "usage", "high"),
"Indicates cache usage is high or low, relative to current cache 'quota' settings",
[]string{"disk"}, nil),
prometheus.GaugeValue,
@@ -309,7 +340,7 @@ func httpMetricsPrometheus(ch chan<- prometheus.Metric) {
for api, value := range httpStats.CurrentS3Requests.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "requests", "current"),
prometheus.BuildFQName(s3Namespace, "requests", "current"),
"Total number of running s3 requests in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
@@ -321,7 +352,7 @@ func httpMetricsPrometheus(ch chan<- prometheus.Metric) {
for api, value := range httpStats.TotalS3Requests.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "requests", "total"),
prometheus.BuildFQName(s3Namespace, "requests", "total"),
"Total number of s3 requests in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
@@ -333,7 +364,7 @@ func httpMetricsPrometheus(ch chan<- prometheus.Metric) {
for api, value := range httpStats.TotalS3Errors.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "errors", "total"),
prometheus.BuildFQName(s3Namespace, "errors", "total"),
"Total number of s3 errors in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
@@ -351,7 +382,7 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
// Network Sent/Received Bytes (internode)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("internode", "tx", "bytes_total"),
prometheus.BuildFQName(interNodeNamespace, "tx", "bytes_total"),
"Total number of bytes sent to the other peer nodes by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
@@ -360,7 +391,7 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("internode", "rx", "bytes_total"),
prometheus.BuildFQName(interNodeNamespace, "rx", "bytes_total"),
"Total number of internode bytes received by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
@@ -370,7 +401,7 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
// Network Sent/Received Bytes (Outbound)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "tx", "bytes_total"),
prometheus.BuildFQName(s3Namespace, "tx", "bytes_total"),
"Total number of s3 bytes sent by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
@@ -379,7 +410,7 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "rx", "bytes_total"),
prometheus.BuildFQName(s3Namespace, "rx", "bytes_total"),
"Total number of s3 bytes received by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
@@ -414,7 +445,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
// Total space used by bucket
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("bucket", "usage", "size"),
prometheus.BuildFQName(bucketNamespace, "usage", "size"),
"Total bucket size",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
@@ -423,7 +454,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("bucket", "objects", "count"),
prometheus.BuildFQName(bucketNamespace, "objects", "count"),
"Total number of objects in a bucket",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
@@ -469,7 +500,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
for k, v := range usageInfo.ObjectSizesHistogram {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("bucket", "objects", "histogram"),
prometheus.BuildFQName(bucketNamespace, "objects", "histogram"),
"Total number of objects of different sizes in a bucket",
[]string{"bucket", "object_size"}, nil),
prometheus.GaugeValue,
@@ -497,10 +528,50 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
onlineDisks, offlineDisks := getOnlineOfflineDisksStats(server.Disks)
totalDisks := offlineDisks.Merge(onlineDisks)
// Report total capacity
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(minioNamespace, "capacity_raw", "total"),
"Total capacity online in the cluster",
nil, nil),
prometheus.GaugeValue,
float64(GetTotalCapacity(GlobalContext)),
)
// Report total capacity free
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(minioNamespace, "capacity_raw_free", "total"),
"Total free capacity online in the cluster",
nil, nil),
prometheus.GaugeValue,
float64(GetTotalCapacityFree(GlobalContext)),
)
s, _ := objLayer.StorageInfo(GlobalContext)
// Report total usable capacity
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(minioNamespace, "capacity_usable", "total"),
"Total usable capacity online in the cluster",
nil, nil),
prometheus.GaugeValue,
GetTotalUsableCapacity(GlobalContext, s),
)
// Report total usable capacity free
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(minioNamespace, "capacity_usable_free", "total"),
"Total free usable capacity online in the cluster",
nil, nil),
prometheus.GaugeValue,
GetTotalUsableCapacityFree(GlobalContext, s),
)
// MinIO Offline Disks per node
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "disks", "offline"),
prometheus.BuildFQName(minioNamespace, "disks", "offline"),
"Total number of offline disks in current MinIO server instance",
nil, nil),
prometheus.GaugeValue,
@@ -510,7 +581,7 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
// MinIO Total Disks per node
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("minio", "disks", "total"),
prometheus.BuildFQName(minioNamespace, "disks", "total"),
"Total number of disks for current MinIO server instance",
nil, nil),
prometheus.GaugeValue,
@@ -521,7 +592,7 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
// Total disk usage by the disk
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("disk", "storage", "used"),
prometheus.BuildFQName(diskNamespace, "storage", "used"),
"Total disk storage used on the disk",
[]string{"disk"}, nil),
prometheus.GaugeValue,
@@ -532,7 +603,7 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
// Total available space in the disk
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("disk", "storage", "available"),
prometheus.BuildFQName(diskNamespace, "storage", "available"),
"Total available space left on the disk",
[]string{"disk"}, nil),
prometheus.GaugeValue,
@@ -543,7 +614,7 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
// Total storage space of the disk
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("disk", "storage", "total"),
prometheus.BuildFQName(diskNamespace, "storage", "total"),
"Total space on the disk",
[]string{"disk"}, nil),
prometheus.GaugeValue,

View File

@@ -0,0 +1,54 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cmd
import (
"context"
)
// GetTotalCapacity gets the total raw capacity in the cluster.
func GetTotalCapacity(ctx context.Context) (capacity uint64) {
	// Sum the disk hardware totals reported by every node.
	for _, hwInfo := range globalNotificationSys.DiskHwInfo(ctx) {
		capacity += hwInfo.GetTotalCapacity()
	}
	return capacity
}
// GetTotalUsableCapacity gets the total usable capacity in the cluster.
// Usable capacity is the raw capacity scaled by the erasure-coding
// data-to-total-shards ratio of the standard storage class.
func GetTotalUsableCapacity(ctx context.Context, s StorageInfo) (capacity float64) {
	raw := GetTotalCapacity(ctx)
	totalShards := s.Backend.StandardSCData + s.Backend.StandardSCParity
	if totalShards == 0 {
		// FIX: avoid 0/0 producing NaN when erasure backend fields are not
		// populated (e.g. gateway or FS mode).
		return 0
	}
	ratio := float64(s.Backend.StandardSCData) / float64(totalShards)
	return float64(raw) * ratio
}
// GetTotalCapacityFree gets the total capacity free in the cluster.
func GetTotalCapacityFree(ctx context.Context) (capacity uint64) {
	// Sum the free-capacity figures reported by every node.
	for _, hwInfo := range globalNotificationSys.DiskHwInfo(ctx) {
		capacity += hwInfo.GetTotalFreeCapacity()
	}
	return capacity
}
// GetTotalUsableCapacityFree gets the total usable capacity free in the
// cluster. Free usable capacity is the raw free capacity scaled by the
// erasure-coding data-to-total-shards ratio of the standard storage class.
func GetTotalUsableCapacityFree(ctx context.Context, s StorageInfo) (capacity float64) {
	raw := GetTotalCapacityFree(ctx)
	totalShards := s.Backend.StandardSCData + s.Backend.StandardSCParity
	if totalShards == 0 {
		// FIX: avoid 0/0 producing NaN when erasure backend fields are not
		// populated (e.g. gateway or FS mode).
		return 0
	}
	ratio := float64(s.Backend.StandardSCData) / float64(totalShards)
	return float64(raw) * ratio
}

View File

@@ -51,8 +51,8 @@ type NotificationSys struct {
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerClients []*peerRESTClient
allPeerClients []*peerRESTClient
peerClients []*peerRESTClient // Excludes self
allPeerClients []*peerRESTClient // Includes nil client for self
}
// GetARNList - returns available ARNs.
@@ -1294,6 +1294,21 @@ func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys {
}
}
// GetPeerOnlineCount gets the count of online and offline nodes.
func GetPeerOnlineCount() (nodesOnline, nodesOffline int) {
	// The local node does not appear in ServerInfo() and is always
	// counted as online.
	nodesOnline = 1
	for _, server := range globalNotificationSys.ServerInfo() {
		if server.State == "ok" {
			nodesOnline++
		} else {
			nodesOffline++
		}
	}
	return nodesOnline, nodesOffline
}
type eventArgs struct {
EventName event.Name
BucketName string
@@ -1428,3 +1443,52 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...
}
return consolidatedReport
}
// GetClusterMetrics - gets the cluster metrics from all nodes excluding self.
// It fans out one request per peer, merges every peer's metric stream into a
// single channel, and closes that channel once all peer streams end or the
// context is canceled.
func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) chan Metric {
	// Fan out the metrics request to every remote peer concurrently.
	g := errgroup.WithNErrs(len(sys.peerClients))
	peerChannels := make([]<-chan Metric, len(sys.peerClients))
	for index := range sys.peerClients {
		if sys.peerClients[index] == nil {
			continue
		}
		index := index // capture loop variable for the closure below
		g.Go(func() error {
			var err error
			peerChannels[index], err = sys.peerClients[index].GetPeerMetrics(ctx)
			return err
		}, index)
	}
	ch := make(chan Metric)
	var wg sync.WaitGroup
	for index, err := range g.Wait() {
		// NOTE(review): this dereferences sys.peerClients[index].host for
		// every index, including any skipped above for being nil — confirm
		// peerClients can never contain a nil entry here, or this panics.
		reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress",
			sys.peerClients[index].host.String())
		ctx := logger.SetReqInfo(ctx, reqInfo)
		if err != nil {
			// Log once per peer to avoid flooding on persistent failures.
			logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
			continue
		}
		wg.Add(1)
		// Forward this peer's metrics into the merged channel until the
		// peer stream closes or the context is canceled.
		go func(ctx context.Context, peerChannel <-chan Metric, wg *sync.WaitGroup) {
			defer wg.Done()
			for {
				select {
				case m, ok := <-peerChannel:
					if !ok {
						return
					}
					ch <- m
				case <-ctx.Done():
					return
				}
			}
		}(ctx, peerChannels[index], &wg)
	}
	// Close the merged channel only after every forwarder has finished.
	go func(wg *sync.WaitGroup, ch chan Metric) {
		wg.Wait()
		close(ch)
	}(&wg, ch)
	return ch
}

View File

@@ -72,6 +72,13 @@ const (
writeLock
)
// BackendMetrics - represents bytes served from backend
type BackendMetrics struct {
bytesReceived uint64
bytesSent uint64
requestStats RequestStats
}
// ObjectLayer implements primitives for object API layer.
type ObjectLayer interface {
SetDriveCount() int // Only implemented by erasure layer
@@ -143,7 +150,7 @@ type ObjectLayer interface {
IsCompressionSupported() bool
// Backend related metrics
GetMetrics(ctx context.Context) (*Metrics, error)
GetMetrics(ctx context.Context) (*BackendMetrics, error)
// Returns health of the backend
Health(ctx context.Context, opts HealthOptions) HealthResult

View File

@@ -749,7 +749,7 @@ func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh <-chan
dec := gob.NewDecoder(respBody)
for {
var ev event.Event
if err = dec.Decode(&ev); err != nil {
if err := dec.Decode(&ev); err != nil {
return
}
if len(ev.EventVersion) > 0 {
@@ -906,3 +906,24 @@ func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []st
err = dec.Decode(&bandwidthReport)
return &bandwidthReport, err
}
// GetPeerMetrics fetches this peer's metric stream. The returned channel is
// closed when the peer's stream ends, a decode error occurs, or ctx is
// canceled.
func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric, error) {
	respBody, err := client.callWithContext(ctx, peerRESTMethodGetPeerMetrics, nil, nil, -1)
	if err != nil {
		return nil, err
	}
	dec := gob.NewDecoder(respBody)
	ch := make(chan Metric)
	go func(ch chan<- Metric) {
		// Drain and release the response body on every exit path so the
		// connection can be reused.
		defer func() {
			http.DrainBody(respBody)
			close(ch)
		}()
		for {
			var metric Metric
			if err := dec.Decode(&metric); err != nil {
				return
			}
			select {
			case ch <- metric:
			case <-ctx.Done():
				// FIX: previously a blocked send leaked this goroutine (and
				// the response body) if the consumer stopped reading.
				return
			}
		}
	}(ch)
	return ch, nil
}

View File

@@ -58,6 +58,7 @@ const (
peerRESTMethodGetBandwidth = "/bandwidth"
peerRESTMethodGetMetacacheListing = "/getmetacache"
peerRESTMethodUpdateMetacacheListing = "/updatemetacache"
peerRESTMethodGetPeerMetrics = "/peermetrics"
)
const (

View File

@@ -801,7 +801,7 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req
// ListenHandler sends http trace messages back to peer rest client
func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
@@ -809,7 +809,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
var prefix string
if len(values[peerRESTListenPrefix]) > 1 {
s.writeErrorResponse(w, errors.New("Invalid request"))
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
@@ -824,7 +824,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
var suffix string
if len(values[peerRESTListenSuffix]) > 1 {
s.writeErrorResponse(w, errors.New("Invalid request"))
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
@@ -1004,7 +1004,7 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
// GetBandwidth gets the bandwidth for the buckets requested.
func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
bucketsString := r.URL.Query().Get("buckets")
@@ -1025,6 +1025,29 @@ func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) {
w.(http.Flusher).Flush()
}
// GetPeerMetrics gets the metrics to be federated across peers. The metrics
// are gob-encoded one at a time onto the response stream.
func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request) {
	if !s.IsValid(w, r) {
		s.writeErrorResponse(w, errors.New("invalid request"))
		// FIX: previously fell through and streamed metrics even for
		// invalid requests.
		return
	}

	w.WriteHeader(http.StatusOK)
	w.(http.Flusher).Flush()

	enc := gob.NewEncoder(w)
	// Stream each generated metric as it arrives; the channel is closed by
	// the producer when reporting completes or the request context ends.
	ch := ReportMetrics(r.Context(), GetGeneratorsForPeer)
	for m := range ch {
		if err := enc.Encode(m); err != nil {
			s.writeErrorResponse(w, errors.New("Encoding metric failed: "+err.Error()))
			return
		}
	}
	w.(http.Flusher).Flush()
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
@@ -1064,4 +1087,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBandwidth).HandlerFunc(httpTraceHdrs(server.GetBandwidth))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetMetacacheListing).HandlerFunc(httpTraceHdrs(server.GetMetacacheListingHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodUpdateMetacacheListing).HandlerFunc(httpTraceHdrs(server.UpdateMetacacheListingHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(httpTraceHdrs(server.GetPeerMetrics))
}