mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
b48bbe08b2
to track the replication transfer rate across different nodes, number of active workers in use and in-queue stats to get an idea of the current workload. This PR also adds replication metrics to the site replication status API. For site replication, prometheus metrics are no longer at the bucket level - but at the cluster level. Add prometheus metric to track credential errors since uptime
290 lines
8.2 KiB
Go
290 lines
8.2 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio-go/v7"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE
|
|
|
|
// RStat has replication error stats
|
|
type RStat struct {
|
|
Count int64 `json:"count"`
|
|
Bytes int64 `json:"bytes"`
|
|
}
|
|
|
|
// RTimedMetrics has replication error stats for various time windows
|
|
type RTimedMetrics struct {
|
|
LastHour ReplicationLastHour `json:"lastHour"`
|
|
SinceUptime RStat `json:"sinceUptime"`
|
|
LastMinute ReplicationLastMinute
|
|
// Error counts
|
|
ErrCounts map[string]int `json:"errCounts"` // Count of credential errors
|
|
}
|
|
|
|
func (rt *RTimedMetrics) String() string {
|
|
s := rt.toMetric()
|
|
return fmt.Sprintf("Errors in LastMinute: %v, LastHour: %v, SinceUptime: %v", s.LastMinute.Count, s.LastHour.Count, s.Totals.Count)
|
|
}
|
|
|
|
func (rt *RTimedMetrics) toMetric() madmin.TimedErrStats {
|
|
if rt == nil {
|
|
return madmin.TimedErrStats{}
|
|
}
|
|
errCounts := make(map[string]int)
|
|
for k, v := range rt.ErrCounts {
|
|
errCounts[k] = v
|
|
}
|
|
minuteTotals := rt.LastMinute.getTotal()
|
|
hourTotals := rt.LastHour.getTotal()
|
|
return madmin.TimedErrStats{
|
|
LastMinute: madmin.RStat{
|
|
Count: float64(minuteTotals.N),
|
|
Bytes: minuteTotals.Size,
|
|
},
|
|
LastHour: madmin.RStat{
|
|
Count: float64(hourTotals.N),
|
|
Bytes: hourTotals.Size,
|
|
},
|
|
Totals: madmin.RStat{
|
|
Count: float64(rt.SinceUptime.Count),
|
|
Bytes: rt.SinceUptime.Bytes,
|
|
},
|
|
ErrCounts: errCounts,
|
|
}
|
|
}
|
|
|
|
func (rt *RTimedMetrics) addsize(size int64, err error) {
|
|
// failures seen since uptime
|
|
atomic.AddInt64(&rt.SinceUptime.Bytes, size)
|
|
atomic.AddInt64(&rt.SinceUptime.Count, 1)
|
|
rt.LastMinute.addsize(size)
|
|
rt.LastHour.addsize(size)
|
|
if err != nil && minio.ToErrorResponse(err).Code == "AccessDenied" {
|
|
if rt.ErrCounts == nil {
|
|
rt.ErrCounts = make(map[string]int)
|
|
}
|
|
rt.ErrCounts["AccessDenied"]++
|
|
}
|
|
}
|
|
|
|
func (rt *RTimedMetrics) merge(o RTimedMetrics) (n RTimedMetrics) {
|
|
n.SinceUptime.Bytes = atomic.LoadInt64(&rt.SinceUptime.Bytes) + atomic.LoadInt64(&o.SinceUptime.Bytes)
|
|
n.SinceUptime.Count = atomic.LoadInt64(&rt.SinceUptime.Count) + atomic.LoadInt64(&o.SinceUptime.Count)
|
|
|
|
n.LastMinute = n.LastMinute.merge(rt.LastMinute)
|
|
n.LastMinute = n.LastMinute.merge(o.LastMinute)
|
|
n.LastHour = n.LastHour.merge(rt.LastHour)
|
|
n.LastHour = n.LastHour.merge(o.LastHour)
|
|
n.ErrCounts = make(map[string]int)
|
|
for k, v := range rt.ErrCounts {
|
|
n.ErrCounts[k] = v
|
|
}
|
|
for k, v := range o.ErrCounts {
|
|
n.ErrCounts[k] += v
|
|
}
|
|
return n
|
|
}
|
|
|
|
// SRStats has replication stats at site level
|
|
type SRStats struct {
|
|
// Total Replica size in bytes
|
|
ReplicaSize int64 `json:"replicaSize"`
|
|
// Total Replica received
|
|
ReplicaCount int64 `json:"replicaCount"`
|
|
M map[string]*SRStatus `json:"srStatusMap"`
|
|
|
|
movingAvgTicker *time.Ticker // Ticker for calculating moving averages
|
|
lock sync.RWMutex // mutex for srStats
|
|
}
|
|
|
|
// SRStatus has replication stats at deployment level
|
|
type SRStatus struct {
|
|
ReplicatedSize int64 `json:"completedReplicationSize"`
|
|
// Total number of failed operations including metadata updates in the last minute
|
|
Failed RTimedMetrics `json:"failedReplication"`
|
|
// Total number of completed operations
|
|
ReplicatedCount int64 `json:"replicationCount"`
|
|
// Replication latency information
|
|
Latency ReplicationLatency `json:"replicationLatency"`
|
|
// transfer rate for large uploads
|
|
XferRateLrg *XferStats `json:"largeTransferRate" msg:"lt"`
|
|
// transfer rate for small uploads
|
|
XferRateSml *XferStats `json:"smallTransferRate" msg:"st"`
|
|
// Endpoint is the replication target endpoint
|
|
Endpoint string `json:"-"`
|
|
// Secure is true if the replication target endpoint is secure
|
|
Secure bool `json:"-"`
|
|
}
|
|
|
|
func (sr *SRStats) update(st replStat, dID string) {
|
|
sr.lock.Lock()
|
|
defer sr.lock.Unlock()
|
|
srs, ok := sr.M[dID]
|
|
if !ok {
|
|
srs = &SRStatus{
|
|
XferRateLrg: newXferStats(),
|
|
XferRateSml: newXferStats(),
|
|
}
|
|
}
|
|
srs.Endpoint = st.Endpoint
|
|
srs.Secure = st.Secure
|
|
switch {
|
|
case st.Completed:
|
|
srs.ReplicatedSize += st.TransferSize
|
|
srs.ReplicatedCount++
|
|
if st.TransferDuration > 0 {
|
|
srs.Latency.update(st.TransferSize, st.TransferDuration)
|
|
srs.updateXferRate(st.TransferSize, st.TransferDuration)
|
|
}
|
|
case st.Failed:
|
|
srs.Failed.addsize(st.TransferSize, st.Err)
|
|
case st.Pending:
|
|
}
|
|
sr.M[dID] = srs
|
|
}
|
|
|
|
func (sr *SRStats) get() map[string]SRMetric {
|
|
epMap := globalBucketTargetSys.healthStats()
|
|
|
|
sr.lock.RLock()
|
|
defer sr.lock.RUnlock()
|
|
m := make(map[string]SRMetric, len(sr.M))
|
|
for dID, v := range sr.M {
|
|
t := newXferStats()
|
|
mx := make(map[RMetricName]XferStats)
|
|
|
|
if v.XferRateLrg != nil {
|
|
mx[Large] = *v.XferRateLrg.Clone()
|
|
m := t.merge(*v.XferRateLrg)
|
|
t = &m
|
|
}
|
|
if v.XferRateSml != nil {
|
|
mx[Small] = *v.XferRateSml.Clone()
|
|
m := t.merge(*v.XferRateSml)
|
|
t = &m
|
|
}
|
|
|
|
mx[Total] = *t
|
|
metric := SRMetric{
|
|
ReplicatedSize: v.ReplicatedSize,
|
|
ReplicatedCount: v.ReplicatedCount,
|
|
DeploymentID: dID,
|
|
Failed: v.Failed.toMetric(),
|
|
XferStats: mx,
|
|
}
|
|
epHealth, ok := epMap[v.Endpoint]
|
|
if ok {
|
|
metric.Endpoint = epHealth.Endpoint
|
|
metric.TotalDowntime = epHealth.offlineDuration
|
|
metric.LastOnline = epHealth.lastOnline
|
|
metric.Online = epHealth.Online
|
|
metric.Latency = madmin.LatencyStat{
|
|
Curr: epHealth.latency.curr,
|
|
Avg: epHealth.latency.avg,
|
|
Max: epHealth.latency.peak,
|
|
}
|
|
}
|
|
m[dID] = metric
|
|
}
|
|
return m
|
|
}
|
|
|
|
func (srs *SRStatus) updateXferRate(sz int64, duration time.Duration) {
|
|
if sz > minLargeObjSize {
|
|
srs.XferRateLrg.addSize(sz, duration)
|
|
} else {
|
|
srs.XferRateSml.addSize(sz, duration)
|
|
}
|
|
}
|
|
|
|
func newSRStats() *SRStats {
|
|
s := SRStats{
|
|
M: make(map[string]*SRStatus),
|
|
movingAvgTicker: time.NewTicker(time.Second * 2),
|
|
}
|
|
go s.trackEWMA()
|
|
return &s
|
|
}
|
|
|
|
func (sr *SRStats) trackEWMA() {
|
|
for {
|
|
select {
|
|
case <-sr.movingAvgTicker.C:
|
|
sr.updateMovingAvg()
|
|
case <-GlobalContext.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sr *SRStats) updateMovingAvg() {
|
|
sr.lock.Lock()
|
|
defer sr.lock.Unlock()
|
|
for _, s := range sr.M {
|
|
s.XferRateLrg.measure.updateExponentialMovingAverage(time.Now())
|
|
s.XferRateSml.measure.updateExponentialMovingAverage(time.Now())
|
|
}
|
|
}
|
|
|
|
// SRMetric captures replication metrics for a deployment
|
|
type SRMetric struct {
|
|
DeploymentID string `json:"deploymentID"`
|
|
Endpoint string `json:"endpoint"`
|
|
TotalDowntime time.Duration `json:"totalDowntime"`
|
|
LastOnline time.Time `json:"lastOnline"`
|
|
Online bool `json:"isOnline"`
|
|
Latency madmin.LatencyStat `json:"latency"`
|
|
|
|
// replication metrics across buckets roll up
|
|
ReplicatedSize int64 `json:"replicatedSize"`
|
|
// Total number of completed operations
|
|
ReplicatedCount int64 `json:"replicatedCount"`
|
|
// Failed captures replication errors in various time windows
|
|
|
|
Failed madmin.TimedErrStats `json:"failed,omitempty"`
|
|
|
|
XferStats map[RMetricName]XferStats `json:"transferSummary"`
|
|
}
|
|
|
|
// SRMetricsSummary captures summary of replication counts across buckets on site
|
|
// along with op metrics rollup.
|
|
type SRMetricsSummary struct {
|
|
// op metrics roll up
|
|
ActiveWorkers ActiveWorkerStat `json:"activeWorkers"`
|
|
|
|
// Total Replica size in bytes
|
|
ReplicaSize int64 `json:"replicaSize"`
|
|
|
|
// Total number of replica received
|
|
ReplicaCount int64 `json:"replicaCount"`
|
|
// Queued operations
|
|
Queued InQueueMetric `json:"queued"`
|
|
// replication metrics summary for each site replication peer
|
|
Metrics map[string]SRMetric `json:"replMetrics"`
|
|
// uptime of node being queried for site replication metrics
|
|
Uptime int64 `json:"uptime"`
|
|
}
|