minio/cmd/bucket-replication-stats.go

364 lines
11 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"math"
"sync"
"time"
"github.com/minio/minio/internal/bucket/replication"
)
// hasReplicationUsage reports whether at least one entry in b.Stats
// reports replication usage.
func (b *BucketReplicationStats) hasReplicationUsage() bool {
	for arn := range b.Stats {
		if b.Stats[arn].hasReplicationUsage() {
			return true
		}
	}
	return false
}
// ReplicationStats holds the global in-memory replication stats.
//
// It carries three independent pieces of state, each guarded by its own
// read/write lock as noted on the fields below.
type ReplicationStats struct {
	// Cache maps a bucket name to the stats accumulated by Update and
	// UpdateReplicaStat. Guarded by the embedded RWMutex.
	Cache map[string]*BucketReplicationStats
	// UsageCache maps a bucket name to the stats installed by
	// loadInitialReplicationMetrics and served by GetInitialUsage.
	// Guarded by ulock.
	UsageCache map[string]*BucketReplicationStats
	// mostRecentStats keeps the latest result stored by
	// calculateBucketReplicationStats for each bucket. Guarded by dlock.
	mostRecentStats BucketStatsMap
	sync.RWMutex // mutex for Cache
	ulock sync.RWMutex // mutex for UsageCache
	dlock sync.RWMutex // mutex for mostRecentStats
}
// Delete removes the in-memory replication statistics kept for bucket
// from both Cache and UsageCache. Calling it on a nil receiver is a no-op.
func (r *ReplicationStats) Delete(bucket string) {
	if r == nil {
		return
	}

	// Acquire the Cache lock first, then the UsageCache lock; both are held
	// until the function returns and released in reverse order.
	r.Lock()
	r.ulock.Lock()
	defer r.Unlock()
	defer r.ulock.Unlock()

	delete(r.Cache, bucket)
	delete(r.UsageCache, bucket)
}
// UpdateReplicaStat adds n to the replica size tracked in Cache for bucket,
// creating the bucket's entry when it does not exist yet.
// Calling it on a nil receiver is a no-op.
func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
	if r == nil {
		return
	}

	r.Lock()
	defer r.Unlock()

	entry, found := r.Cache[bucket]
	if !found {
		entry = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
	}
	entry.ReplicaSize += n
	r.Cache[bucket] = entry
}
// Update applies the outcome of one replication operation to the in-memory
// statistics kept in Cache for bucket and the target identified by arn.
// Missing bucket and target entries are created on demand.
//
// n is the amount added to or subtracted from the size counters, duration
// is fed to the latency tracker for completed data replication when it is
// positive, status is the new replication status, prevStatus the status the
// operation had before, and opType the kind of replication operation.
// Calling it on a nil receiver is a no-op.
func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration time.Duration, status, prevStatus replication.StatusType, opType replication.Type) {
	if r == nil {
		return
	}

	r.Lock()
	defer r.Unlock()

	bucketEntry, found := r.Cache[bucket]
	if !found {
		bucketEntry = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
		r.Cache[bucket] = bucketEntry
	}
	tgt, found := bucketEntry.Stats[arn]
	if !found {
		tgt = &BucketReplicationStat{}
		bucketEntry.Stats[arn] = tgt
	}

	switch status {
	case replication.Pending:
		// Only a data replication that actually changes status counts as
		// newly pending.
		if !opType.IsDataReplication() || prevStatus == status {
			return
		}
		tgt.PendingSize += n
		tgt.PendingCount++

	case replication.Completed:
		// The count of the previous state is adjusted for every operation type.
		switch prevStatus {
		case replication.Pending:
			tgt.PendingCount--
		case replication.Failed:
			tgt.FailedCount--
		}
		// Sizes and latency are tracked for data replication only.
		if !opType.IsDataReplication() {
			return
		}
		tgt.ReplicatedSize += n
		if prevStatus == replication.Pending {
			tgt.PendingSize -= n
		} else if prevStatus == replication.Failed {
			tgt.FailedSize -= n
		}
		if duration > 0 {
			tgt.Latency.update(n, duration)
		}

	case replication.Failed:
		// Move the operation from pending to failed; other transitions are ignored.
		if opType.IsDataReplication() && prevStatus == replication.Pending {
			tgt.FailedSize += n
			tgt.FailedCount++
			tgt.PendingSize -= n
			tgt.PendingCount--
		}

	case replication.Replica:
		if opType == replication.ObjectReplicationType {
			tgt.ReplicaSize += n
		}
	}
}
// GetInitialUsage returns a clone of the replication metrics stored in
// UsageCache for bucket, i.e. the metrics available at the time of cluster
// initialization. A zero BucketReplicationStats is returned when the
// receiver is nil or the bucket has no entry.
func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats {
	if r == nil {
		return BucketReplicationStats{}
	}

	r.ulock.RLock()
	defer r.ulock.RUnlock()

	if cached, found := r.UsageCache[bucket]; found {
		return cached.Clone()
	}
	return BucketReplicationStats{}
}
// GetAll returns a clone of the replication metrics of every bucket present
// in Cache, keyed by bucket name. A nil receiver yields an empty, non-nil map.
func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
	if r == nil {
		return map[string]BucketReplicationStats{}
	}

	r.RLock()
	defer r.RUnlock()

	all := make(map[string]BucketReplicationStats, len(r.Cache))
	for bucket := range r.Cache {
		all[bucket] = r.Cache[bucket].Clone()
	}
	return all
}
// Get returns a clone of the replication metrics recorded in Cache for
// bucket, i.e. the metrics gathered by this node since it came up.
// A nil receiver yields stats with an empty, non-nil Stats map; an unknown
// bucket yields a zero BucketReplicationStats.
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
	if r == nil {
		return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
	}

	r.RLock()
	defer r.RUnlock()

	if cached, found := r.Cache[bucket]; found {
		return cached.Clone()
	}
	return BucketReplicationStats{}
}
// NewReplicationStats returns a ReplicationStats whose Cache and UsageCache
// maps are allocated and empty. Neither ctx nor objectAPI is used here.
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
	stats := new(ReplicationStats)
	stats.Cache = make(map[string]*BucketReplicationStats)
	stats.UsageCache = make(map[string]*BucketReplicationStats)
	return stats
}
// loadInitialReplicationMetrics fills UsageCache with the replication
// metrics available at cluster start.
//
// It first tries the latest replication stats saved in
// .minio.sys/buckets/replication/node-name.stats, as returned by
// globalReplicationPool.loadStatsFromDisk. When that fails it falls back,
// for backward compatibility, to the replication info in data usage: the
// load is attempted every five seconds until it succeeds, and the function
// returns without touching UsageCache if ctx is cancelled first.
func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
	m := make(map[string]*BucketReplicationStats)
	if stats, err := globalReplicationPool.loadStatsFromDisk(); err == nil {
		for b, st := range stats {
			// Copy the value before taking its address. Before Go 1.22 the
			// range variable is reused across iterations, so storing &st
			// directly would make every bucket point at the same, last value.
			st := st
			m[b] = &st
		}
		r.ulock.Lock()
		r.UsageCache = m
		r.ulock.Unlock()
		return
	}

	rTimer := time.NewTimer(time.Second * 5)
	defer rTimer.Stop()
	var (
		dui DataUsageInfo
		err error
	)
outer:
	for {
		select {
		case <-ctx.Done():
			return
		case <-rTimer.C:
			dui, err = loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
			// A successful load means data usage is available; stop retrying.
			if err == nil {
				break outer
			}
			rTimer.Reset(time.Second * 5)
		}
	}

	for bucket, usage := range dui.BucketsUsage {
		b := &BucketReplicationStats{
			Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)),
		}
		for arn, uinfo := range usage.ReplicationInfo {
			b.Stats[arn] = &BucketReplicationStat{
				FailedSize:     int64(uinfo.ReplicationFailedSize),
				ReplicatedSize: int64(uinfo.ReplicatedSize),
				ReplicaSize:    int64(uinfo.ReplicaSize),
				FailedCount:    int64(uinfo.ReplicationFailedCount),
			}
		}
		b.ReplicaSize += int64(usage.ReplicaSize)
		// Keep only buckets for which at least one target reports usage.
		if b.hasReplicationUsage() {
			m[bucket] = b
		}
	}
	r.ulock.Lock()
	r.UsageCache = m
	r.ulock.Unlock()
}
// getAllCachedLatest returns a snapshot of the most recently computed
// bucket stats.
//
// The Stats map is copied while dlock is held. calculateBucketReplicationStats
// writes into r.mostRecentStats.Stats in place, so returning the internal map
// would let callers read it concurrently with those writes after the lock is
// released. A nil Stats map is returned as nil.
func (r *ReplicationStats) getAllCachedLatest() BucketStatsMap {
	r.dlock.RLock()
	defer r.dlock.RUnlock()

	// Start from a struct copy so every other field is carried over as is.
	snapshot := r.mostRecentStats
	if r.mostRecentStats.Stats != nil {
		snapshot.Stats = make(map[string]BucketStats, len(r.mostRecentStats.Stats))
		for bucket, st := range r.mostRecentStats.Stats {
			snapshot.Stats[bucket] = st
		}
	}
	return snapshot
}
// getAllLatest computes the latest replication stats for every bucket in
// bucketsUsage. It fetches the bucket stats of the whole cluster once via
// globalNotificationSys.GetClusterAllBucketStats and then, per bucket,
// hands the stats reported for that bucket to
// calculateBucketReplicationStats. Entries that do not report the bucket
// contribute a zero BucketStats.
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
	peers := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
	bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage))

	for bucket, usage := range bucketsUsage {
		// One slot per entry of the cluster-wide list; slots stay zero when
		// that entry has nothing for this bucket.
		perPeer := make([]BucketStats, len(peers))
		for idx, peer := range peers {
			if st, found := peer.Stats[bucket]; found {
				perPeer[idx] = st
			}
		}
		bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, usage, perPeer)
	}
	return bucketsReplicationStats
}
// calculateBucketReplicationStats combines, for bucket, the replication stats
// found in bucketStats, the initial usage returned by
// globalReplicationStats.GetInitialUsage and the usage info u into a single
// BucketReplicationStats. The result is stored in r.mostRecentStats under
// bucket (with a refreshed timestamp) and returned.
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) {
	// Step 1: sum the per-target stats across all supplied bucket stats.
	merged := make(map[string]*BucketReplicationStat)
	var replicaTotal int64
	for _, peer := range bucketStats {
		replicaTotal += peer.ReplicationStats.ReplicaSize
		for arn, cur := range peer.ReplicationStats.Stats {
			prev := merged[arn]
			if prev == nil {
				prev = &BucketReplicationStat{}
			}
			sum := &BucketReplicationStat{}
			sum.FailedCount = cur.FailedCount + prev.FailedCount
			sum.FailedSize = cur.FailedSize + prev.FailedSize
			sum.ReplicatedSize = cur.ReplicatedSize + prev.ReplicatedSize
			sum.Latency = cur.Latency.merge(prev.Latency)
			sum.PendingCount = cur.PendingCount + prev.PendingCount
			sum.PendingSize = cur.PendingSize + prev.PendingSize
			merged[arn] = sum
		}
	}

	// Step 2: add the initial usage stats on top of the summed stats.
	initial := globalReplicationStats.GetInitialUsage(bucket)
	replicaTotal += initial.ReplicaSize
	for arn, base := range initial.Stats {
		acc, found := merged[arn]
		if !found {
			acc = &BucketReplicationStat{}
			merged[arn] = acc
		}
		acc.ReplicatedSize += base.ReplicatedSize
		acc.FailedSize += base.FailedSize
		acc.FailedCount += base.FailedCount
		acc.PendingSize += base.PendingSize
		acc.PendingCount += base.PendingCount
	}

	// Total replicated size according to the usage info.
	var usageReplicated int64
	for _, tgtUsage := range u.ReplicationInfo {
		usageReplicated += int64(tgtUsage.ReplicatedSize)
	}

	// Step 3: normalize the computed stats against the usage info, target by target.
	s = BucketReplicationStats{
		Stats: make(map[string]*BucketReplicationStat, len(merged)),
	}
	for arn, acc := range merged {
		// A target missing from the usage info reads as a zero value here.
		usageSize := int64(u.ReplicationInfo[arn].ReplicatedSize)

		out := &BucketReplicationStat{Latency: acc.Latency}
		// Prefer the in-memory replicated size when it is ahead of the usage info.
		if acc.ReplicatedSize >= usageSize {
			out.ReplicatedSize = acc.ReplicatedSize
		} else {
			out.ReplicatedSize = usageSize
		}
		// Failed and pending counters can go negative because the data usage
		// picture may lag behind the actual state at cluster start; clamp
		// them at zero.
		out.FailedSize = int64(math.Max(float64(acc.FailedSize), 0))
		out.FailedCount = int64(math.Max(float64(acc.FailedCount), 0))
		out.PendingSize = int64(math.Max(float64(acc.PendingSize), 0))
		out.PendingCount = int64(math.Max(float64(acc.PendingCount), 0))

		s.Stats[arn] = out
		s.ReplicatedSize += out.ReplicatedSize
		s.FailedSize += out.FailedSize
		s.FailedCount += out.FailedCount
		s.PendingCount += out.PendingCount
		s.PendingSize += out.PendingSize
	}

	// Step 4: normalize the bucket-wide totals, taking whichever side is larger.
	s.ReplicaSize = int64(math.Max(float64(replicaTotal), float64(u.ReplicaSize)))
	s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(usageReplicated)))

	// Step 5: remember the result as the most recent stats for this bucket.
	r.dlock.Lock()
	defer r.dlock.Unlock()
	if len(r.mostRecentStats.Stats) == 0 {
		r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
	}
	r.mostRecentStats.Stats[bucket] = BucketStats{ReplicationStats: s}
	r.mostRecentStats.Timestamp = UTCNow()
	return s
}
// getLatestReplicationStats returns the most current replication stats for
// bucket: it fetches the bucket's stats across the cluster via
// globalNotificationSys.GetClusterBucketStats and combines them with the
// data usage info u through calculateBucketReplicationStats.
func (r *ReplicationStats) getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
	s = r.calculateBucketReplicationStats(
		bucket,
		u,
		globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket),
	)
	return s
}