Add support for self-healing related metrics in Prometheus (#9079)

Fixes #8988

Co-authored-by: Anis Elleuch <vadmeste@users.noreply.github.com>
Co-authored-by: Harshavardhana <harsha@minio.io>
Nitish Tiwari 2020-03-25 11:10:45 +05:30 committed by GitHub
parent 813e0fc1a8
commit 6b984410d5
4 changed files with 379 additions and 155 deletions


@@ -318,9 +318,7 @@ type healSequence struct {
// List of entities (format, buckets, objects) to heal
sourceCh chan string
// Report healing progress
reportProgress bool
// time at which heal sequence was started
@@ -352,14 +350,23 @@ type healSequence struct {
// the last result index sent to client
lastSentResultIndex int64
// Number of total items scanned against item type
scannedItemsMap map[madmin.HealItemType]int64
// Number of total items healed against item type
healedItemsMap map[madmin.HealItemType]int64
// Number of total items where healing failed against endpoint and drive state
healFailedItemsMap map[string]int64
// The time of the last scan/heal activity
lastHealActivity time.Time
// Holds the request-info for logging
ctx context.Context
// used to lock this structure as it is concurrently accessed
mutex sync.RWMutex
}
// NewHealSequence - creates healSettings, assumes bucket and
@@ -390,9 +397,83 @@ func newHealSequence(bucket, objPrefix, clientAddr string,
traverseAndHealDoneCh: make(chan error),
stopSignalCh: make(chan struct{}),
ctx: ctx,
scannedItemsMap: make(map[madmin.HealItemType]int64),
healedItemsMap: make(map[madmin.HealItemType]int64),
healFailedItemsMap: make(map[string]int64),
}
}
// resetHealStatusCounters - reset the healSequence status counters between
// each monthly background heal scanning activity.
// This is used only in the background healing scenario, where
// we use a single long-running healSequence that reactively heals
// objects passed to its sourceCh.
func (h *healSequence) resetHealStatusCounters() {
h.mutex.Lock()
defer h.mutex.Unlock()
h.currentStatus.Items = []madmin.HealResultItem{}
h.lastSentResultIndex = 0
h.scannedItemsMap = make(map[madmin.HealItemType]int64)
h.healedItemsMap = make(map[madmin.HealItemType]int64)
h.healFailedItemsMap = make(map[string]int64)
}
// getScannedItemsCount - returns a count of all scanned items
func (h *healSequence) getScannedItemsCount() int64 {
var count int64
h.mutex.RLock()
defer h.mutex.RUnlock()
for _, v := range h.scannedItemsMap {
count = count + v
}
return count
}
// getScannedItemsMap - returns map of all scanned items against type
func (h *healSequence) getScannedItemsMap() map[madmin.HealItemType]int64 {
h.mutex.RLock()
defer h.mutex.RUnlock()
// Make a copy before returning the value
retMap := make(map[madmin.HealItemType]int64, len(h.scannedItemsMap))
for k, v := range h.scannedItemsMap {
retMap[k] = v
}
return retMap
}
// getHealedItemsMap - returns the map of all healed items against type
func (h *healSequence) getHealedItemsMap() map[madmin.HealItemType]int64 {
h.mutex.RLock()
defer h.mutex.RUnlock()
// Make a copy before returning the value
retMap := make(map[madmin.HealItemType]int64, len(h.healedItemsMap))
for k, v := range h.healedItemsMap {
retMap[k] = v
}
return retMap
}
// gethealFailedItemsMap - returns map of all items where heal failed against
// drive endpoint and status
func (h *healSequence) gethealFailedItemsMap() map[string]int64 {
h.mutex.RLock()
defer h.mutex.RUnlock()
// Make a copy before returning the value
retMap := make(map[string]int64, len(h.healFailedItemsMap))
for k, v := range h.healFailedItemsMap {
retMap[k] = v
}
return retMap
}
// isQuitting - determines if the heal sequence is quitting (due to an
// external signal)
func (h *healSequence) isQuitting() bool {
@@ -556,6 +637,22 @@ func (h *healSequence) queueHealTask(path string, healType madmin.HealItemType)
// Wait for answer and push result to the client
res := <-respCh
if !h.reportProgress {
h.mutex.Lock()
defer h.mutex.Unlock()
// Progress is not reported in case of background heal processing.
// Instead we increment relevant counter based on the heal result
// for prometheus reporting.
if res.err != nil && !isErrObjectNotFound(res.err) {
for _, d := range res.result.After.Drives {
// For failed items we report the endpoint and drive state
// This will help users take corrective actions for drives
h.healFailedItemsMap[d.Endpoint+","+d.State]++
}
} else {
// Only object type reported for successful healing
h.healedItemsMap[res.result.Type]++
}
return nil
}
res.result.Type = healType
@@ -599,7 +696,7 @@ func (h *healSequence) healItemsFromSourceCh() error {
logger.LogIf(h.ctx, err)
}
h.scannedItemsMap[itemType]++
h.lastHealActivity = UTCNow()
case <-h.traverseAndHealDoneCh:
return nil


@@ -61,6 +61,9 @@ func newBgHealSequence(numDisks int) *healSequence {
stopSignalCh: make(chan struct{}),
ctx: ctx,
reportProgress: false,
scannedItemsMap: make(map[madmin.HealItemType]int64),
healedItemsMap: make(map[madmin.HealItemType]int64),
healFailedItemsMap: make(map[string]int64),
}
}
@@ -71,7 +74,7 @@ func getLocalBackgroundHealStatus() madmin.BgHealState {
}
return madmin.BgHealState{
ScannedItemsCount: bgSeq.getScannedItemsCount(),
LastHealActivity: bgSeq.lastHealActivity,
NextHealRound: UTCNow().Add(durationToNextHealRound(bgSeq.lastHealActivity)),
}
@@ -126,12 +129,24 @@ func durationToNextHealRound(lastHeal time.Time) time.Duration {
// Healing leader will take charge of healing all erasure sets
func execLeaderTasks(ctx context.Context, z *xlZones) {
// So that we don't heal immediately, but after one month.
lastScanTime := UTCNow()
// Get background heal sequence to send elements to heal
var bgSeq *healSequence
var ok bool
for {
bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if ok {
break
}
time.Sleep(time.Second)
}
for {
select {
case <-ctx.Done():
return
case <-time.NewTimer(durationToNextHealRound(lastScanTime)).C:
bgSeq.resetHealStatusCounters()
for _, zone := range z.zones {
// Heal set by set
for i, set := range zone.sets {


@@ -19,6 +19,8 @@ package cmd
import (
"context"
"net/http"
"strings"
"time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -81,13 +83,259 @@ func (c *minioCollector) Collect(ch chan<- prometheus.Metric) {
// Expose MinIO's version information
minioVersionInfo.WithLabelValues(Version, CommitID).Set(float64(1.0))
storageMetricsPrometheus(ch)
networkMetricsPrometheus(ch)
httpMetricsPrometheus(ch)
gatewayMetricsPrometheus(ch)
healingMetricsPrometheus(ch)
}
// collects healing specific metrics for MinIO instance in Prometheus specific format
// and sends to given channel
func healingMetricsPrometheus(ch chan<- prometheus.Metric) {
if !globalIsXL {
return
}
bgSeq, exists := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if !exists {
return
}
healMetricsNamespace := "self_heal"
dur := time.Duration(-1)
if !bgSeq.lastHealActivity.IsZero() {
dur = time.Since(bgSeq.lastHealActivity)
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(healMetricsNamespace, "time", "since_last_activity"),
"Time elapsed (in nano seconds) since last self healing activity. This is set to -1 until initial self heal activity",
nil, nil),
prometheus.GaugeValue,
float64(dur),
)
for k, v := range bgSeq.getScannedItemsMap() {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(healMetricsNamespace, "objects", "scanned"),
"Objects scanned in current self healing run",
[]string{"type"}, nil),
prometheus.GaugeValue,
float64(v), string(k),
)
}
for k, v := range bgSeq.getHealedItemsMap() {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(healMetricsNamespace, "objects", "healed"),
"Objects healed in current self healing run",
[]string{"type"}, nil),
prometheus.GaugeValue,
float64(v), string(k),
)
}
for k, v := range bgSeq.gethealFailedItemsMap() {
// healFailedItemsMap stores the endpoint and volume state separated by comma,
// split the fields and pass to channel at correct index
s := strings.Split(k, ",")
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(healMetricsNamespace, "objects", "heal_failed"),
"Objects for which healing failed in current self healing run",
[]string{"mount_path", "volume_status"}, nil),
prometheus.GaugeValue,
float64(v), string(s[0]), string(s[1]),
)
}
}
// collects gateway specific metrics for MinIO instance in Prometheus specific format
// and sends to given channel
func gatewayMetricsPrometheus(ch chan<- prometheus.Metric) {
if !globalIsGateway || (globalGatewayName != "s3" && globalGatewayName != "azure" && globalGatewayName != "gcs") {
return
}
objLayer := newObjectLayerWithoutSafeModeFn()
// Service not initialized yet
if objLayer == nil {
return
}
m, err := objLayer.GetMetrics(context.Background())
if err != nil {
return
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "bytes_received"),
"Total number of bytes received by current MinIO Gateway "+globalGatewayName+" backend",
nil, nil),
prometheus.CounterValue,
float64(m.GetBytesReceived()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "bytes_sent"),
"Total number of bytes sent by current MinIO Gateway to "+globalGatewayName+" backend",
nil, nil),
prometheus.CounterValue,
float64(m.GetBytesSent()),
)
s := m.GetRequests()
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
float64(s.Get.Load()),
http.MethodGet,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
float64(s.Head.Load()),
http.MethodHead,
)
}
// collects cache metrics for MinIO server in Prometheus specific format
// and sends to given channel
func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
cacheObjLayer := newCachedObjectLayerFn()
// Service not initialized yet
if cacheObjLayer == nil {
return
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "hits", "total"),
"Total number of disk cache hits in current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(cacheObjLayer.CacheStats().getHits()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "misses", "total"),
"Total number of disk cache misses in current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(cacheObjLayer.CacheStats().getMisses()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "data", "served"),
"Total number of bytes served from cache of current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(cacheObjLayer.CacheStats().getBytesServed()),
)
}
// collects http metrics for MinIO server in Prometheus specific format
// and sends to given channel
func httpMetricsPrometheus(ch chan<- prometheus.Metric) {
httpStats := globalHTTPStats.toServerHTTPStats()
for api, value := range httpStats.CurrentS3Requests.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "requests", "current"),
"Total number of running s3 requests in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
float64(value),
api,
)
}
for api, value := range httpStats.TotalS3Requests.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "requests", "total"),
"Total number of s3 requests in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
float64(value),
api,
)
}
for api, value := range httpStats.TotalS3Errors.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "errors", "total"),
"Total number of s3 errors in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
float64(value),
api,
)
}
}
// collects network metrics for MinIO server in Prometheus specific format
// and sends to given channel
func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
connStats := globalConnStats.toServerConnStats()
// Network Sent/Received Bytes (internode)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("internode", "tx", "bytes_total"),
"Total number of bytes sent to the other peer nodes by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.TotalOutputBytes),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("internode", "rx", "bytes_total"),
"Total number of internode bytes received by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.TotalInputBytes),
)
// Network Sent/Received Bytes (Outbound)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "tx", "bytes_total"),
"Total number of s3 bytes sent by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.S3OutputBytes),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "rx", "bytes_total"),
"Total number of s3 bytes received by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.S3InputBytes),
)
}
// collects storage metrics for MinIO server in Prometheus specific format
// and sends to given channel
func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
objLayer := newObjectLayerWithoutSafeModeFn()
// Service not initialized yet
if objLayer == nil {
return
}
// Fetch disk space info
storageInfo := objLayer.StorageInfo(context.Background(), true)
offlineDisks := storageInfo.Backend.OfflineDisks
@@ -151,151 +399,6 @@ func (c *minioCollector) Collect(ch chan<- prometheus.Metric) {
mountPath,
)
}
connStats := globalConnStats.toServerConnStats()
// Network Sent/Received Bytes (internode)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("internode", "tx", "bytes_total"),
"Total number of bytes sent to the other peer nodes by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.TotalOutputBytes),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("internode", "rx", "bytes_total"),
"Total number of internode bytes received by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.TotalInputBytes),
)
// Network Sent/Received Bytes (Outbound)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "tx", "bytes_total"),
"Total number of s3 bytes sent by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.S3OutputBytes),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "rx", "bytes_total"),
"Total number of s3 bytes received by current MinIO server instance",
nil, nil),
prometheus.CounterValue,
float64(connStats.S3InputBytes),
)
httpStats := globalHTTPStats.toServerHTTPStats()
for api, value := range httpStats.CurrentS3Requests.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "requests", "current"),
"Total number of running s3 requests in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
float64(value),
api,
)
}
for api, value := range httpStats.TotalS3Requests.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "requests", "total"),
"Total number of s3 requests in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
float64(value),
api,
)
}
for api, value := range httpStats.TotalS3Errors.APIStats {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("s3", "errors", "total"),
"Total number of s3 errors in current MinIO server instance",
[]string{"api"}, nil),
prometheus.CounterValue,
float64(value),
api,
)
}
// Cache related metrics
if globalCacheConfig.Enabled {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "hits", "total"),
"Total number of disk cache hits in current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(newCachedObjectLayerFn().CacheStats().getHits()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "misses", "total"),
"Total number of disk cache misses in current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(newCachedObjectLayerFn().CacheStats().getMisses()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "data", "served"),
"Total number of bytes served from cache of current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(newCachedObjectLayerFn().CacheStats().getBytesServed()),
)
}
if globalIsGateway && (globalGatewayName == "s3" || globalGatewayName == "azure" || globalGatewayName == "gcs") {
m, _ := globalObjectAPI.GetMetrics(context.Background())
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "bytes_received"),
"Total number of bytes received by current MinIO Gateway "+globalGatewayName+" backend",
nil, nil),
prometheus.CounterValue,
float64(m.GetBytesReceived()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "bytes_sent"),
"Total number of bytes sent by current MinIO Gateway to "+globalGatewayName+" backend",
nil, nil),
prometheus.CounterValue,
float64(m.GetBytesSent()),
)
s := m.GetRequests()
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
float64(s.Get.Load()),
http.MethodGet,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("gateway", globalGatewayName, "requests"),
"Total number of requests made to "+globalGatewayName+" by current MinIO Gateway",
[]string{"method"}, nil),
prometheus.CounterValue,
float64(s.Head.Load()),
http.MethodHead,
)
}
}
func metricsHandler() http.Handler {


@@ -157,6 +157,15 @@ MinIO Gateway instance exposes metrics related to Gateway communication with the
Note that this is currently only supported for Azure, S3 and GCS Gateway.
### MinIO self-healing metrics - `self_heal_*`
MinIO exposes self-healing related metrics for erasure-code deployments _only_. These metrics are _not_ available on Gateway or Single Node, Single Drive deployments. Note that these metrics will be exposed _only_ when there is a relevant event happening on MinIO server.
- `self_heal_time_since_last_activity`: Time elapsed since last self-healing related activity.
- `self_heal_objects_scanned`: Number of objects scanned by self-healing thread in its current run. This will reset when a fresh self-healing run starts. This is labeled with the object type scanned.
- `self_heal_objects_healed`: Number of objects healed by the self-healing thread in its current run. This will reset when a fresh self-healing run starts. This is labeled with the object type healed.
- `self_heal_objects_heal_failed`: Number of objects for which self-healing failed in its current run. This will reset when a fresh self-healing run starts. This is labeled with disk status and its endpoint.
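
For illustration, a scrape of the MinIO Prometheus endpoint on an erasure-coded deployment with background heal activity could include lines like the ones below; the metric names and label names come from this change, while the values and label values shown here are hypothetical. `self_heal_time_since_last_activity` is a duration in nanoseconds and stays at -1 until the first heal activity occurs.

```
# Hypothetical sample output; actual values and label values depend on the deployment
self_heal_time_since_last_activity 2.3847e+10
self_heal_objects_scanned{type="object"} 512
self_heal_objects_healed{type="object"} 12
self_heal_objects_heal_failed{mount_path="http://node2:9000/data1",volume_status="offline"} 3
```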
## Migration guide for the new set of metrics
This migration guide applies for older releases or any releases before `RELEASE.2019-10-23*`