Fix cluster bucket stats API for prometheus (#11970)

Metrics calculation was accumulating initial usage across all nodes
rather than using initial usage only once.

Also fixing:
- bug where all peer traffic was going to the same node.
- reset counters when replication status changes from
PENDING -> FAILED
This commit is contained in:
Poorna Krishnamoorthy 2021-04-06 08:36:54 -07:00 committed by GitHub
parent 0276652f26
commit 075bccda42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 5 deletions

View File

@ -37,6 +37,7 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool {
type ReplicationStats struct { type ReplicationStats struct {
sync.RWMutex sync.RWMutex
Cache map[string]*BucketReplicationStats Cache map[string]*BucketReplicationStats
UsageCache map[string]*BucketReplicationStats // initial usage
} }
// Delete deletes in-memory replication statistics for a bucket. // Delete deletes in-memory replication statistics for a bucket.
@ -48,6 +49,8 @@ func (r *ReplicationStats) Delete(bucket string) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
delete(r.Cache, bucket) delete(r.Cache, bucket)
delete(r.UsageCache, bucket)
} }
// Update updates in-memory replication statistics with new values. // Update updates in-memory replication statistics with new values.
@ -62,7 +65,6 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep
b = &BucketReplicationStats{} b = &BucketReplicationStats{}
} }
r.RUnlock() r.RUnlock()
switch status { switch status {
case replication.Pending: case replication.Pending:
if opType == replication.ObjectReplicationType { if opType == replication.ObjectReplicationType {
@ -87,10 +89,15 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep
} }
case replication.Failed: case replication.Failed:
// count failures only once - not on every retry // count failures only once - not on every retry
switch prevStatus { // adjust counters based on previous state
case replication.Pending:
atomic.AddUint64(&b.PendingCount, ^uint64(0))
}
if opType == replication.ObjectReplicationType { if opType == replication.ObjectReplicationType {
if prevStatus == replication.Pending { if prevStatus == replication.Pending {
atomic.AddUint64(&b.FailedSize, uint64(n)) atomic.AddUint64(&b.FailedSize, uint64(n))
atomic.AddUint64(&b.FailedCount, 1) atomic.AddUint64(&b.FailedCount, 1)
atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
} }
} }
case replication.Replica: case replication.Replica:
@ -98,6 +105,32 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep
atomic.AddUint64(&b.ReplicaSize, uint64(n)) atomic.AddUint64(&b.ReplicaSize, uint64(n))
} }
} }
r.Lock()
r.Cache[bucket] = b
r.Unlock()
}
// GetInitialUsage gets initial usage from the time of cluster initialization
func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats {
if r == nil {
return BucketReplicationStats{}
}
r.RLock()
defer r.RUnlock()
st, ok := r.UsageCache[bucket]
if !ok {
return BucketReplicationStats{}
}
return BucketReplicationStats{
PendingSize: atomic.LoadUint64(&st.PendingSize),
FailedSize: atomic.LoadUint64(&st.FailedSize),
ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
ReplicaSize: atomic.LoadUint64(&st.ReplicaSize),
PendingCount: atomic.LoadUint64(&st.PendingCount),
FailedCount: atomic.LoadUint64(&st.FailedCount),
}
} }
// Get total bytes pending replication for a bucket // Get total bytes pending replication for a bucket
@ -128,6 +161,7 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats { func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
st := &ReplicationStats{ st := &ReplicationStats{
Cache: make(map[string]*BucketReplicationStats), Cache: make(map[string]*BucketReplicationStats),
UsageCache: make(map[string]*BucketReplicationStats),
} }
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
@ -150,7 +184,7 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
FailedCount: usage.ReplicationFailedCount, FailedCount: usage.ReplicationFailedCount,
} }
if b.hasReplicationUsage() { if b.hasReplicationUsage() {
st.Cache[bucket] = b st.UsageCache[bucket] = b
} }
} }

View File

@ -444,9 +444,16 @@ func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) (s Bucke
replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
} }
usageStat := globalReplicationStats.GetInitialUsage(bucket)
replStats.FailedCount += usageStat.FailedCount
replStats.FailedSize += usageStat.FailedSize
replStats.PendingCount += usageStat.PendingCount
replStats.PendingSize += usageStat.PendingSize
replStats.ReplicaSize += usageStat.ReplicaSize
replStats.ReplicatedSize += usageStat.ReplicatedSize
// use in memory replication stats if it is ahead of usage info. // use in memory replication stats if it is ahead of usage info.
if replStats.ReplicatedSize > u.ReplicatedSize { if replStats.ReplicatedSize >= u.ReplicatedSize {
s.ReplicatedSize = replStats.ReplicatedSize s.ReplicatedSize = replStats.ReplicatedSize
} else { } else {
s.ReplicatedSize = u.ReplicatedSize s.ReplicatedSize = u.ReplicatedSize

View File

@ -704,6 +704,7 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam
bucketStats := make([]BucketStats, len(sys.peerClients)) bucketStats := make([]BucketStats, len(sys.peerClients))
for index, client := range sys.peerClients { for index, client := range sys.peerClients {
index := index index := index
client := client
ng.Go(ctx, func() error { ng.Go(ctx, func() error {
if client == nil { if client == nil {
return errPeerNotReachable return errPeerNotReachable