mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
504 lines
13 KiB
Go
504 lines
13 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/rcrowley/go-metrics"
|
|
)
|
|
|
|
func (b *BucketReplicationStats) hasReplicationUsage() bool {
|
|
for _, s := range b.Stats {
|
|
if s.hasReplicationUsage() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ReplicationStats holds the global in-memory replication stats
|
|
type ReplicationStats struct {
|
|
// map of site deployment ID to site replication status
|
|
// for site replication - maintain stats at global level
|
|
srStats *SRStats
|
|
// active worker stats
|
|
workers *ActiveWorkerStat
|
|
// queue stats cache
|
|
qCache queueCache
|
|
|
|
pCache proxyStatsCache
|
|
// mrf backlog stats
|
|
mrfStats ReplicationMRFStats
|
|
// for bucket replication, continue to use existing cache
|
|
Cache map[string]*BucketReplicationStats
|
|
mostRecentStats BucketStatsMap
|
|
registry metrics.Registry
|
|
sync.RWMutex // mutex for Cache
|
|
mostRecentStatsMu sync.Mutex // mutex for mostRecentStats
|
|
|
|
wlock sync.RWMutex // mutex for active workers
|
|
|
|
movingAvgTicker *time.Ticker // Ticker for calculating moving averages
|
|
wTimer *time.Ticker // ticker for calculating active workers
|
|
qTimer *time.Ticker // ticker for calculating queue stats
|
|
}
|
|
|
|
func (r *ReplicationStats) trackEWMA() {
|
|
for {
|
|
select {
|
|
case <-r.movingAvgTicker.C:
|
|
r.updateMovingAvg()
|
|
case <-GlobalContext.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *ReplicationStats) updateMovingAvg() {
|
|
r.RLock()
|
|
for _, s := range r.Cache {
|
|
for _, st := range s.Stats {
|
|
st.XferRateLrg.measure.updateExponentialMovingAverage(time.Now())
|
|
st.XferRateSml.measure.updateExponentialMovingAverage(time.Now())
|
|
}
|
|
}
|
|
r.RUnlock()
|
|
}
|
|
|
|
// ActiveWorkers returns worker stats
|
|
func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat {
|
|
r.wlock.RLock()
|
|
defer r.wlock.RUnlock()
|
|
w := r.workers.get()
|
|
return ActiveWorkerStat{
|
|
Curr: w.Curr,
|
|
Max: w.Max,
|
|
Avg: w.Avg,
|
|
}
|
|
}
|
|
|
|
func (r *ReplicationStats) collectWorkerMetrics(ctx context.Context) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-r.wTimer.C:
|
|
r.wlock.Lock()
|
|
r.workers.update()
|
|
r.wlock.Unlock()
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *ReplicationStats) collectQueueMetrics(ctx context.Context) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-r.qTimer.C:
|
|
r.qCache.update()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete deletes in-memory replication statistics for a bucket.
|
|
func (r *ReplicationStats) Delete(bucket string) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
delete(r.Cache, bucket)
|
|
}
|
|
|
|
// UpdateReplicaStat updates in-memory replica statistics with new values.
|
|
func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
bs, ok := r.Cache[bucket]
|
|
if !ok {
|
|
bs = newBucketReplicationStats()
|
|
}
|
|
bs.ReplicaSize += n
|
|
bs.ReplicaCount++
|
|
r.Cache[bucket] = bs
|
|
r.srUpdateReplicaStat(n)
|
|
}
|
|
|
|
func (r *ReplicationStats) srUpdateReplicaStat(sz int64) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
atomic.AddInt64(&r.srStats.ReplicaSize, sz)
|
|
atomic.AddInt64(&r.srStats.ReplicaCount, 1)
|
|
}
|
|
|
|
func (r *ReplicationStats) srUpdate(sr replStat) {
|
|
dID, err := globalSiteReplicationSys.getDeplIDForEndpoint(sr.endpoint())
|
|
if err == nil {
|
|
r.srStats.update(sr, dID)
|
|
}
|
|
}
|
|
|
|
// Update updates in-memory replication statistics with new values.
|
|
func (r *ReplicationStats) Update(bucket string, ri replicatedTargetInfo, status, prevStatus replication.StatusType) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
var rs replStat
|
|
switch status {
|
|
case replication.Pending:
|
|
if ri.OpType.IsDataReplication() && prevStatus != status {
|
|
rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
|
|
}
|
|
case replication.Completed:
|
|
if ri.OpType.IsDataReplication() {
|
|
rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
|
|
}
|
|
case replication.Failed:
|
|
if ri.OpType.IsDataReplication() && prevStatus == replication.Pending {
|
|
rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
|
|
}
|
|
case replication.Replica:
|
|
if ri.OpType == replication.ObjectReplicationType {
|
|
rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, "", false, ri.Err)
|
|
}
|
|
}
|
|
|
|
// update site-replication in-memory stats
|
|
if rs.Completed || rs.Failed {
|
|
r.srUpdate(rs)
|
|
}
|
|
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
// update bucket replication in-memory stats
|
|
bs, ok := r.Cache[bucket]
|
|
if !ok {
|
|
bs = newBucketReplicationStats()
|
|
r.Cache[bucket] = bs
|
|
}
|
|
b, ok := bs.Stats[ri.Arn]
|
|
if !ok {
|
|
b = &BucketReplicationStat{
|
|
XferRateLrg: newXferStats(),
|
|
XferRateSml: newXferStats(),
|
|
}
|
|
bs.Stats[ri.Arn] = b
|
|
}
|
|
|
|
switch {
|
|
case rs.Completed:
|
|
b.ReplicatedSize += rs.TransferSize
|
|
b.ReplicatedCount++
|
|
if rs.TransferDuration > 0 {
|
|
b.Latency.update(rs.TransferSize, rs.TransferDuration)
|
|
b.updateXferRate(rs.TransferSize, rs.TransferDuration)
|
|
}
|
|
case rs.Failed:
|
|
b.FailStats.addsize(rs.TransferSize, rs.Err)
|
|
case rs.Pending:
|
|
}
|
|
}
|
|
|
|
type replStat struct {
|
|
Arn string
|
|
Completed bool
|
|
Pending bool
|
|
Failed bool
|
|
opType replication.Type
|
|
// transfer size
|
|
TransferSize int64
|
|
// transfer duration
|
|
TransferDuration time.Duration
|
|
Endpoint string
|
|
Secure bool
|
|
Err error
|
|
}
|
|
|
|
func (rs *replStat) endpoint() string {
|
|
scheme := "http"
|
|
if rs.Secure {
|
|
scheme = "https"
|
|
}
|
|
return scheme + "://" + rs.Endpoint
|
|
}
|
|
|
|
func (rs *replStat) set(arn string, n int64, duration time.Duration, status replication.StatusType, opType replication.Type, endpoint string, secure bool, err error) {
|
|
rs.Endpoint = endpoint
|
|
rs.Secure = secure
|
|
rs.TransferSize = n
|
|
rs.Arn = arn
|
|
rs.TransferDuration = duration
|
|
rs.opType = opType
|
|
switch status {
|
|
case replication.Completed:
|
|
rs.Completed = true
|
|
case replication.Pending:
|
|
rs.Pending = true
|
|
case replication.Failed:
|
|
rs.Failed = true
|
|
rs.Err = err
|
|
}
|
|
}
|
|
|
|
// GetAll returns replication metrics for all buckets at once.
|
|
func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
|
|
if r == nil {
|
|
return map[string]BucketReplicationStats{}
|
|
}
|
|
|
|
r.RLock()
|
|
|
|
bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache))
|
|
for k, v := range r.Cache {
|
|
bucketReplicationStats[k] = v.Clone()
|
|
}
|
|
r.RUnlock()
|
|
for k, v := range bucketReplicationStats {
|
|
v.QStat = r.qCache.getBucketStats(k)
|
|
bucketReplicationStats[k] = v
|
|
}
|
|
|
|
return bucketReplicationStats
|
|
}
|
|
|
|
func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary {
|
|
if r == nil {
|
|
return SRMetricsSummary{}
|
|
}
|
|
|
|
m := SRMetricsSummary{
|
|
Uptime: UTCNow().Unix() - globalBootTime.Unix(),
|
|
Queued: r.qCache.getSiteStats(),
|
|
ActiveWorkers: r.ActiveWorkers(),
|
|
Metrics: r.srStats.get(),
|
|
Proxied: r.pCache.getSiteStats(),
|
|
ReplicaSize: atomic.LoadInt64(&r.srStats.ReplicaSize),
|
|
ReplicaCount: atomic.LoadInt64(&r.srStats.ReplicaCount),
|
|
}
|
|
return m
|
|
}
|
|
|
|
// Get replication metrics for a bucket from this node since this node came up.
|
|
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
|
|
if r == nil {
|
|
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
|
|
}
|
|
|
|
r.RLock()
|
|
defer r.RUnlock()
|
|
|
|
st, ok := r.Cache[bucket]
|
|
if !ok {
|
|
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
|
|
}
|
|
return st.Clone()
|
|
}
|
|
|
|
// NewReplicationStats initialize in-memory replication statistics
|
|
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
|
|
r := metrics.NewRegistry()
|
|
rs := ReplicationStats{
|
|
Cache: make(map[string]*BucketReplicationStats),
|
|
qCache: newQueueCache(r),
|
|
pCache: newProxyStatsCache(),
|
|
srStats: newSRStats(),
|
|
movingAvgTicker: time.NewTicker(2 * time.Second),
|
|
wTimer: time.NewTicker(2 * time.Second),
|
|
qTimer: time.NewTicker(2 * time.Second),
|
|
|
|
workers: newActiveWorkerStat(r),
|
|
registry: r,
|
|
}
|
|
go rs.collectWorkerMetrics(ctx)
|
|
go rs.collectQueueMetrics(ctx)
|
|
return &rs
|
|
}
|
|
|
|
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) {
|
|
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
|
|
bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage))
|
|
|
|
for bucket := range bucketsUsage {
|
|
bucketStats := make([]BucketStats, len(peerBucketStatsList))
|
|
for i, peerBucketStats := range peerBucketStatsList {
|
|
bucketStat, ok := peerBucketStats.Stats[bucket]
|
|
if !ok {
|
|
continue
|
|
}
|
|
bucketStats[i] = bucketStat
|
|
}
|
|
bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, bucketStats)
|
|
}
|
|
return bucketsReplicationStats
|
|
}
|
|
|
|
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucketStats []BucketStats) (bs BucketStats) {
|
|
if r == nil {
|
|
bs = BucketStats{
|
|
ReplicationStats: BucketReplicationStats{
|
|
Stats: make(map[string]*BucketReplicationStat),
|
|
},
|
|
QueueStats: ReplicationQueueStats{},
|
|
ProxyStats: ProxyMetric{},
|
|
}
|
|
return bs
|
|
}
|
|
var s BucketReplicationStats
|
|
// accumulate cluster bucket stats
|
|
stats := make(map[string]*BucketReplicationStat)
|
|
var (
|
|
totReplicaSize, totReplicatedSize int64
|
|
totReplicaCount, totReplicatedCount int64
|
|
totFailed RTimedMetrics
|
|
tq InQueueMetric
|
|
)
|
|
for _, bucketStat := range bucketStats {
|
|
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
|
|
totReplicaCount += bucketStat.ReplicationStats.ReplicaCount
|
|
for _, q := range bucketStat.QueueStats.Nodes {
|
|
tq = tq.merge(q.QStats)
|
|
}
|
|
|
|
for arn, stat := range bucketStat.ReplicationStats.Stats {
|
|
oldst := stats[arn]
|
|
if oldst == nil {
|
|
oldst = &BucketReplicationStat{
|
|
XferRateLrg: newXferStats(),
|
|
XferRateSml: newXferStats(),
|
|
}
|
|
}
|
|
fstats := stat.FailStats.merge(oldst.FailStats)
|
|
lrg := oldst.XferRateLrg.merge(*stat.XferRateLrg)
|
|
sml := oldst.XferRateSml.merge(*stat.XferRateSml)
|
|
stats[arn] = &BucketReplicationStat{
|
|
Failed: fstats.toMetric(),
|
|
FailStats: fstats,
|
|
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
|
|
ReplicatedCount: stat.ReplicatedCount + oldst.ReplicatedCount,
|
|
Latency: stat.Latency.merge(oldst.Latency),
|
|
XferRateLrg: &lrg,
|
|
XferRateSml: &sml,
|
|
}
|
|
totReplicatedSize += stat.ReplicatedSize
|
|
totReplicatedCount += stat.ReplicatedCount
|
|
totFailed = totFailed.merge(stat.FailStats)
|
|
}
|
|
}
|
|
|
|
s = BucketReplicationStats{
|
|
Stats: stats,
|
|
QStat: tq,
|
|
ReplicaSize: totReplicaSize,
|
|
ReplicaCount: totReplicaCount,
|
|
ReplicatedSize: totReplicatedSize,
|
|
ReplicatedCount: totReplicatedCount,
|
|
Failed: totFailed.toMetric(),
|
|
}
|
|
|
|
var qs ReplicationQueueStats
|
|
for _, bs := range bucketStats {
|
|
qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...)
|
|
}
|
|
qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()
|
|
|
|
var ps ProxyMetric
|
|
for _, bs := range bucketStats {
|
|
ps.add(bs.ProxyStats)
|
|
}
|
|
bs = BucketStats{
|
|
ReplicationStats: s,
|
|
QueueStats: qs,
|
|
ProxyStats: ps,
|
|
}
|
|
r.mostRecentStatsMu.Lock()
|
|
if len(r.mostRecentStats.Stats) == 0 {
|
|
r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
|
|
}
|
|
if len(bs.ReplicationStats.Stats) > 0 {
|
|
r.mostRecentStats.Stats[bucket] = bs
|
|
}
|
|
r.mostRecentStats.Timestamp = UTCNow()
|
|
r.mostRecentStatsMu.Unlock()
|
|
return bs
|
|
}
|
|
|
|
// get the most current of in-memory replication stats and data usage info from crawler.
|
|
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) {
|
|
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
|
|
return r.calculateBucketReplicationStats(bucket, bucketStats)
|
|
}
|
|
|
|
func (r *ReplicationStats) incQ(bucket string, sz int64, isDeleleRepl bool, opType replication.Type) {
|
|
r.qCache.Lock()
|
|
defer r.qCache.Unlock()
|
|
v, ok := r.qCache.bucketStats[bucket]
|
|
if !ok {
|
|
v = newInQueueStats(r.registry, bucket)
|
|
}
|
|
atomic.AddInt64(&v.nowBytes, sz)
|
|
atomic.AddInt64(&v.nowCount, 1)
|
|
r.qCache.bucketStats[bucket] = v
|
|
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, sz)
|
|
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, 1)
|
|
}
|
|
|
|
func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opType replication.Type) {
|
|
r.qCache.Lock()
|
|
defer r.qCache.Unlock()
|
|
v, ok := r.qCache.bucketStats[bucket]
|
|
if !ok {
|
|
v = newInQueueStats(r.registry, bucket)
|
|
}
|
|
atomic.AddInt64(&v.nowBytes, -1*sz)
|
|
atomic.AddInt64(&v.nowCount, -1)
|
|
r.qCache.bucketStats[bucket] = v
|
|
|
|
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz)
|
|
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1)
|
|
}
|
|
|
|
// incProxy increments proxy metrics for proxied calls
|
|
func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) {
|
|
r.pCache.inc(bucket, api, isErr)
|
|
}
|
|
|
|
func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric {
|
|
return r.pCache.getBucketStats(bucket)
|
|
}
|