From 075bccda4212301995820b7e87e21304aacc92f2 Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Tue, 6 Apr 2021 08:36:54 -0700 Subject: [PATCH] Fix cluster bucket stats API for prometheus (#11970) Metrics calculation was accumulating initial usage across all nodes rather than using initial usage only once. Also fixing: - bug where all peer traffic was going to the same node. - reset counters when replication status changes from PENDING -> FAILED --- cmd/bucket-replication-stats.go | 42 +++++++++++++++++++++++++++++---- cmd/metrics.go | 9 ++++++- cmd/notification.go | 1 + 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 4ce47ae48..255b429c9 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -36,7 +36,8 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool { // ReplicationStats holds the global in-memory replication stats type ReplicationStats struct { sync.RWMutex - Cache map[string]*BucketReplicationStats + Cache map[string]*BucketReplicationStats + UsageCache map[string]*BucketReplicationStats // initial usage } // Delete deletes in-memory replication statistics for a bucket. @@ -48,6 +49,8 @@ func (r *ReplicationStats) Delete(bucket string) { r.Lock() defer r.Unlock() delete(r.Cache, bucket) + delete(r.UsageCache, bucket) + } // Update updates in-memory replication statistics with new values. 
@@ -62,7 +65,6 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep b = &BucketReplicationStats{} } r.RUnlock() - switch status { case replication.Pending: if opType == replication.ObjectReplicationType { @@ -87,10 +89,15 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep } case replication.Failed: // count failures only once - not on every retry + switch prevStatus { // adjust counters based on previous state + case replication.Pending: + atomic.AddUint64(&b.PendingCount, ^uint64(0)) + } if opType == replication.ObjectReplicationType { if prevStatus == replication.Pending { atomic.AddUint64(&b.FailedSize, uint64(n)) atomic.AddUint64(&b.FailedCount, 1) + atomic.AddUint64(&b.PendingSize, ^uint64(n-1)) } } case replication.Replica: @@ -98,6 +105,32 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep atomic.AddUint64(&b.ReplicaSize, uint64(n)) } } + r.Lock() + r.Cache[bucket] = b + r.Unlock() +} + +// GetInitialUsage gets initial usage from the time of cluster initialization +func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats { + if r == nil { + return BucketReplicationStats{} + } + + r.RLock() + defer r.RUnlock() + + st, ok := r.UsageCache[bucket] + if !ok { + return BucketReplicationStats{} + } + return BucketReplicationStats{ + PendingSize: atomic.LoadUint64(&st.PendingSize), + FailedSize: atomic.LoadUint64(&st.FailedSize), + ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize), + ReplicaSize: atomic.LoadUint64(&st.ReplicaSize), + PendingCount: atomic.LoadUint64(&st.PendingCount), + FailedCount: atomic.LoadUint64(&st.FailedCount), + } } // Get total bytes pending replication for a bucket @@ -127,7 +160,8 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats { // NewReplicationStats initialize in-memory replication statistics func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats { st := 
&ReplicationStats{ - Cache: make(map[string]*BucketReplicationStats), + Cache: make(map[string]*BucketReplicationStats), + UsageCache: make(map[string]*BucketReplicationStats), } dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) @@ -150,7 +184,7 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio FailedCount: usage.ReplicationFailedCount, } if b.hasReplicationUsage() { - st.Cache[bucket] = b + st.UsageCache[bucket] = b } } diff --git a/cmd/metrics.go b/cmd/metrics.go index 2eb0b5260..804f6b402 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -444,9 +444,16 @@ func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) (s Bucke replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize } + usageStat := globalReplicationStats.GetInitialUsage(bucket) + replStats.FailedCount += usageStat.FailedCount + replStats.FailedSize += usageStat.FailedSize + replStats.PendingCount += usageStat.PendingCount + replStats.PendingSize += usageStat.PendingSize + replStats.ReplicaSize += usageStat.ReplicaSize + replStats.ReplicatedSize += usageStat.ReplicatedSize // use in memory replication stats if it is ahead of usage info. - if replStats.ReplicatedSize > u.ReplicatedSize { + if replStats.ReplicatedSize >= u.ReplicatedSize { s.ReplicatedSize = replStats.ReplicatedSize } else { s.ReplicatedSize = u.ReplicatedSize diff --git a/cmd/notification.go b/cmd/notification.go index 7c7612d65..998315774 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -704,6 +704,7 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam bucketStats := make([]BucketStats, len(sys.peerClients)) for index, client := range sys.peerClients { index := index + client := client ng.Go(ctx, func() error { if client == nil { return errPeerNotReachable