// Source: minio/cmd/site-replication-metrics.go
// Commit: b48bbe08b2 (Poorna), 2023-08-30 01:00:59 -07:00 — 290 lines, 8.2 KiB, Go
// "Add additional info for replication metrics API (#17293)"
//
// That change tracks the replication transfer rate across different nodes,
// the number of active workers in use, and in-queue stats to gauge the
// current workload. It also adds replication metrics to the site replication
// status API; for site replication, Prometheus metrics are no longer at the
// bucket level but at the cluster level. It additionally adds a Prometheus
// metric to track credential errors since uptime.

// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio-go/v7"
)
//go:generate msgp -file $GOFILE
// RStat has replication error stats
type RStat struct {
	// Count is the number of failed replication operations recorded
	// (incremented once per RTimedMetrics.addsize call).
	Count int64 `json:"count"`
	// Bytes is the cumulative transfer size of those failed operations.
	Bytes int64 `json:"bytes"`
}
// RTimedMetrics has replication error stats for various time windows
type RTimedMetrics struct {
	// LastHour holds errors seen in the trailing hour.
	LastHour ReplicationLastHour `json:"lastHour"`
	// SinceUptime accumulates errors since process start; its fields are
	// updated with atomic operations (see addsize/merge).
	SinceUptime RStat `json:"sinceUptime"`
	// LastMinute holds errors seen in the trailing minute.
	// NOTE(review): unlike the sibling fields this has no json tag, so it
	// serializes under the default key "LastMinute" — confirm intentional.
	LastMinute ReplicationLastMinute
	// Error counts
	// Keyed by S3 error code; only "AccessDenied" is recorded today (see addsize).
	ErrCounts map[string]int `json:"errCounts"` // Count of credential errors
}
// String returns a human-readable summary of the error counts for the
// last-minute, last-hour and since-uptime windows.
func (rt *RTimedMetrics) String() string {
	m := rt.toMetric()
	return fmt.Sprintf("Errors in LastMinute: %v, LastHour: %v, SinceUptime: %v",
		m.LastMinute.Count, m.LastHour.Count, m.Totals.Count)
}
// toMetric converts the in-memory timed error stats into the
// madmin.TimedErrStats wire representation. A nil receiver yields the
// zero value, so callers need not guard against it.
func (rt *RTimedMetrics) toMetric() madmin.TimedErrStats {
	if rt == nil {
		return madmin.TimedErrStats{}
	}
	// Copy the error-count map so callers cannot mutate our state.
	counts := make(map[string]int, len(rt.ErrCounts))
	for code, n := range rt.ErrCounts {
		counts[code] = n
	}
	minTotals := rt.LastMinute.getTotal()
	hourTotals := rt.LastHour.getTotal()
	return madmin.TimedErrStats{
		LastMinute: madmin.RStat{Count: float64(minTotals.N), Bytes: minTotals.Size},
		LastHour:   madmin.RStat{Count: float64(hourTotals.N), Bytes: hourTotals.Size},
		Totals: madmin.RStat{
			Count: float64(rt.SinceUptime.Count),
			Bytes: rt.SinceUptime.Bytes,
		},
		ErrCounts: counts,
	}
}
// addsize records a failed replication of size bytes in every time
// window. When err maps to an AccessDenied API error the per-code
// error counter is bumped as well.
func (rt *RTimedMetrics) addsize(size int64, err error) {
	// failures seen since uptime
	atomic.AddInt64(&rt.SinceUptime.Bytes, size)
	atomic.AddInt64(&rt.SinceUptime.Count, 1)
	rt.LastMinute.addsize(size)
	rt.LastHour.addsize(size)
	if err == nil {
		return
	}
	if minio.ToErrorResponse(err).Code != "AccessDenied" {
		return
	}
	// Lazily allocate the map; the zero value is nil.
	if rt.ErrCounts == nil {
		rt.ErrCounts = make(map[string]int)
	}
	rt.ErrCounts["AccessDenied"]++
}
// merge combines the stats held by rt and o into a fresh RTimedMetrics,
// leaving both inputs unchanged.
func (rt *RTimedMetrics) merge(o RTimedMetrics) (n RTimedMetrics) {
	n.SinceUptime.Bytes = atomic.LoadInt64(&rt.SinceUptime.Bytes) + atomic.LoadInt64(&o.SinceUptime.Bytes)
	n.SinceUptime.Count = atomic.LoadInt64(&rt.SinceUptime.Count) + atomic.LoadInt64(&o.SinceUptime.Count)
	n.LastMinute = n.LastMinute.merge(rt.LastMinute)
	n.LastMinute = n.LastMinute.merge(o.LastMinute)
	n.LastHour = n.LastHour.merge(rt.LastHour)
	n.LastHour = n.LastHour.merge(o.LastHour)
	// Sum the per-code error counters from both sides into a new map.
	n.ErrCounts = make(map[string]int, len(rt.ErrCounts)+len(o.ErrCounts))
	for code, c := range rt.ErrCounts {
		n.ErrCounts[code] += c
	}
	for code, c := range o.ErrCounts {
		n.ErrCounts[code] += c
	}
	return n
}
// SRStats has replication stats at site level
type SRStats struct {
	// Total Replica size in bytes
	ReplicaSize int64 `json:"replicaSize"`
	// Total Replica received
	ReplicaCount int64 `json:"replicaCount"`
	// M maps a peer deployment ID to its replication status; guarded by lock.
	M map[string]*SRStatus `json:"srStatusMap"`
	movingAvgTicker *time.Ticker // Ticker for calculating moving averages
	lock sync.RWMutex // mutex for srStats
}
// SRStatus has replication stats at deployment level
type SRStatus struct {
	// ReplicatedSize is the total bytes of completed replication transfers.
	ReplicatedSize int64 `json:"completedReplicationSize"`
	// Total number of failed operations including metadata updates in the last minute
	Failed RTimedMetrics `json:"failedReplication"`
	// Total number of completed operations
	ReplicatedCount int64 `json:"replicationCount"`
	// Replication latency information
	Latency ReplicationLatency `json:"replicationLatency"`
	// transfer rate for large uploads
	XferRateLrg *XferStats `json:"largeTransferRate" msg:"lt"`
	// transfer rate for small uploads
	XferRateSml *XferStats `json:"smallTransferRate" msg:"st"`
	// Endpoint is the replication target endpoint
	Endpoint string `json:"-"`
	// Secure is true if the replication target endpoint is secure
	Secure bool `json:"-"`
}
// update applies an incoming replication event st to the stats for
// deployment dID, creating the per-deployment entry on first use.
func (sr *SRStats) update(st replStat, dID string) {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	srs := sr.M[dID]
	if srs == nil {
		srs = &SRStatus{
			XferRateLrg: newXferStats(),
			XferRateSml: newXferStats(),
		}
	}
	srs.Endpoint = st.Endpoint
	srs.Secure = st.Secure
	if st.Completed {
		srs.ReplicatedSize += st.TransferSize
		srs.ReplicatedCount++
		// Zero-duration transfers carry no usable rate/latency signal.
		if st.TransferDuration > 0 {
			srs.Latency.update(st.TransferSize, st.TransferDuration)
			srs.updateXferRate(st.TransferSize, st.TransferDuration)
		}
	} else if st.Failed {
		srs.Failed.addsize(st.TransferSize, st.Err)
	}
	// Pending events only refresh the endpoint fields above.
	sr.M[dID] = srs
}
// get returns a snapshot of per-deployment replication metrics, merging
// the large- and small-object transfer rates into a Total entry and
// attaching endpoint health information where available.
func (sr *SRStats) get() map[string]SRMetric {
	health := globalBucketTargetSys.healthStats()
	sr.lock.RLock()
	defer sr.lock.RUnlock()
	out := make(map[string]SRMetric, len(sr.M))
	for dID, st := range sr.M {
		total := newXferStats()
		xfer := make(map[RMetricName]XferStats)
		if st.XferRateLrg != nil {
			xfer[Large] = *st.XferRateLrg.Clone()
			merged := total.merge(*st.XferRateLrg)
			total = &merged
		}
		if st.XferRateSml != nil {
			xfer[Small] = *st.XferRateSml.Clone()
			merged := total.merge(*st.XferRateSml)
			total = &merged
		}
		xfer[Total] = *total
		metric := SRMetric{
			ReplicatedSize:  st.ReplicatedSize,
			ReplicatedCount: st.ReplicatedCount,
			DeploymentID:    dID,
			Failed:          st.Failed.toMetric(),
			XferStats:       xfer,
		}
		// Endpoint health is keyed by target endpoint; absent entries
		// leave the health fields at their zero values.
		if ep, ok := health[st.Endpoint]; ok {
			metric.Endpoint = ep.Endpoint
			metric.TotalDowntime = ep.offlineDuration
			metric.LastOnline = ep.lastOnline
			metric.Online = ep.Online
			metric.Latency = madmin.LatencyStat{
				Curr: ep.latency.curr,
				Avg:  ep.latency.avg,
				Max:  ep.latency.peak,
			}
		}
		out[dID] = metric
	}
	return out
}
// updateXferRate feeds a completed transfer of sz bytes taking duration
// into the small- or large-object rate tracker, split at minLargeObjSize.
func (srs *SRStatus) updateXferRate(sz int64, duration time.Duration) {
	if sz <= minLargeObjSize {
		srs.XferRateSml.addSize(sz, duration)
	} else {
		srs.XferRateLrg.addSize(sz, duration)
	}
}
// newSRStats allocates site-replication stats tracking and starts the
// background goroutine that maintains the moving averages.
func newSRStats() *SRStats {
	s := &SRStats{
		M:               make(map[string]*SRStatus),
		movingAvgTicker: time.NewTicker(2 * time.Second),
	}
	go s.trackEWMA()
	return s
}
// trackEWMA refreshes the exponential moving averages on every ticker
// fire; it exits when the global context is canceled.
func (sr *SRStats) trackEWMA() {
	for {
		select {
		case <-GlobalContext.Done():
			return
		case <-sr.movingAvgTicker.C:
			sr.updateMovingAvg()
		}
	}
}
// updateMovingAvg advances the exponential moving averages of the
// large- and small-object transfer rates for every known deployment.
// Called periodically from trackEWMA under sr.lock.
func (sr *SRStats) updateMovingAvg() {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	// Hoist the loop-invariant timestamp: every deployment is updated
	// against the same instant and time.Now is called once instead of
	// twice per deployment.
	now := time.Now()
	for _, s := range sr.M {
		s.XferRateLrg.measure.updateExponentialMovingAverage(now)
		s.XferRateSml.measure.updateExponentialMovingAverage(now)
	}
}
// SRMetric captures replication metrics for a deployment
type SRMetric struct {
	// DeploymentID identifies the peer deployment these metrics refer to.
	DeploymentID string `json:"deploymentID"`
	// Endpoint is the replication target endpoint (filled from health stats).
	Endpoint string `json:"endpoint"`
	// TotalDowntime is the cumulative time the endpoint was offline.
	TotalDowntime time.Duration `json:"totalDowntime"`
	// LastOnline is the last instant the endpoint was seen online.
	LastOnline time.Time `json:"lastOnline"`
	// Online reports the endpoint's current reachability.
	Online bool `json:"isOnline"`
	// Latency summarizes current/average/peak endpoint latency.
	Latency madmin.LatencyStat `json:"latency"`
	// replication metrics across buckets roll up
	ReplicatedSize int64 `json:"replicatedSize"`
	// Total number of completed operations
	ReplicatedCount int64 `json:"replicatedCount"`
	// Failed captures replication errors in various time windows
	Failed madmin.TimedErrStats `json:"failed,omitempty"`
	// XferStats holds transfer-rate stats keyed by Large/Small/Total.
	XferStats map[RMetricName]XferStats `json:"transferSummary"`
}
// SRMetricsSummary captures summary of replication counts across buckets on site
// along with op metrics rollup.
type SRMetricsSummary struct {
	// op metrics roll up
	ActiveWorkers ActiveWorkerStat `json:"activeWorkers"`
	// Total Replica size in bytes
	ReplicaSize int64 `json:"replicaSize"`
	// Total number of replica received
	ReplicaCount int64 `json:"replicaCount"`
	// Queued operations
	Queued InQueueMetric `json:"queued"`
	// replication metrics summary for each site replication peer
	Metrics map[string]SRMetric `json:"replMetrics"`
	// uptime of node being queried for site replication metrics
	Uptime int64 `json:"uptime"`
}