diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 5a5801b3a..c1483354e 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1721,13 +1721,13 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW usageInfo = dataUsageInfo.BucketsUsage[bucket] } - bucketReplStats := getLatestReplicationStats(bucket, usageInfo) - jsonData, err := json.Marshal(bucketReplStats) - if err != nil { + w.Header().Set(xhttp.ContentType, string(mimeJSON)) + + enc := json.NewEncoder(w) + if err = enc.Encode(getLatestReplicationStats(bucket, usageInfo)); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - writeSuccessResponseJSON(w, jsonData) } // ResetBucketReplicationStateHandler - starts a replication reset for all objects in a bucket which diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 57bb334ed..a72142e47 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "reflect" "strings" @@ -1770,3 +1771,81 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate) return } + +// get the most current of in-memory replication stats and data usage info from crawler. +func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { + bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) + // accumulate cluster bucket stats + stats := make(map[string]*BucketReplicationStat) + var totReplicaSize int64 + for _, bucketStat := range bucketStats { + totReplicaSize += bucketStat.ReplicationStats.ReplicaSize + for arn, stat := range bucketStat.ReplicationStats.Stats { + oldst := stats[arn] + if oldst == nil { + oldst = &BucketReplicationStat{} + } + stats[arn] = &BucketReplicationStat{ + FailedCount: stat.FailedCount + oldst.FailedCount, + FailedSize: stat.FailedSize + oldst.FailedSize, + ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, + Latency: stat.Latency.merge(oldst.Latency), + } + } + } + + // add initial usage stat to cluster stats + usageStat := globalReplicationStats.GetInitialUsage(bucket) + totReplicaSize += usageStat.ReplicaSize + if usageStat.Stats != nil { + for arn, stat := range usageStat.Stats { + st := stats[arn] + if st == nil { + st = &BucketReplicationStat{ + ReplicatedSize: stat.ReplicatedSize, + FailedSize: stat.FailedSize, + FailedCount: stat.FailedCount, + } + } else { + st.ReplicatedSize += stat.ReplicatedSize + st.FailedSize += stat.FailedSize + st.FailedCount += stat.FailedCount + } + stats[arn] = st + } + } + s = BucketReplicationStats{ + Stats: make(map[string]*BucketReplicationStat, len(stats)), + } + var latestTotReplicatedSize int64 + for _, st := range u.ReplicationInfo { + latestTotReplicatedSize += int64(st.ReplicatedSize) + } + // normalize computed real time stats with latest usage stat + for arn, tgtstat := range stats { + st := BucketReplicationStat{} + bu, ok := u.ReplicationInfo[arn] + if !ok { + bu = BucketTargetUsageInfo{} + } + // use in memory replication stats if it is ahead of usage info. + st.ReplicatedSize = int64(bu.ReplicatedSize) + if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) { + st.ReplicatedSize = tgtstat.ReplicatedSize + } + s.ReplicatedSize += st.ReplicatedSize + // Reset FailedSize and FailedCount to 0 for negative overflows which can + // happen since data usage picture can lag behind actual usage state at the time of cluster start + st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) + st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) + st.Latency = tgtstat.Latency + + s.Stats[arn] = &st + s.FailedSize += st.FailedSize + s.FailedCount += st.FailedCount + } + // normalize overall stats + s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize))) + s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize))) + return s +} diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 681b1b8fe..8aceb316d 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -1071,19 +1071,17 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { Children: v.Children, } if v.ReplicatedSize > 0 || v.ReplicaSize > 0 || v.ReplicationFailedSize > 0 || v.ReplicationPendingSize > 0 { - due.ReplicationStats = &replicationAllStats{ - Targets: make(map[string]replicationStats), - } - cfg, err := getReplicationConfig(GlobalContext, d.Info.Name) - if err != nil { - return err - } - due.ReplicationStats.ReplicaSize = v.ReplicaSize - - due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{ - ReplicatedSize: v.ReplicatedSize, - FailedSize: v.ReplicationFailedSize, - PendingSize: v.ReplicationPendingSize, + cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name) + if cfg != nil && cfg.RoleArn != "" { + due.ReplicationStats = &replicationAllStats{ + Targets: make(map[string]replicationStats), + } + due.ReplicationStats.ReplicaSize = v.ReplicaSize + due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{ + ReplicatedSize: v.ReplicatedSize, + FailedSize: v.ReplicationFailedSize, + PendingSize: v.ReplicationPendingSize, + } } } due.Compacted = len(due.Children) == 0 && k != d.Info.Name @@ -1115,21 +1113,20 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { empty := replicationStatsV1{} if v.ReplicationStats != empty { - due.ReplicationStats = &replicationAllStats{ - Targets: make(map[string]replicationStats), + cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name) + if cfg != nil && cfg.RoleArn != "" { + due.ReplicationStats = &replicationAllStats{ + Targets: make(map[string]replicationStats), + } + due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{ + ReplicatedSize: v.ReplicationStats.ReplicatedSize, + FailedSize: v.ReplicationStats.FailedSize, + FailedCount: v.ReplicationStats.FailedCount, + PendingSize: v.ReplicationStats.PendingSize, + PendingCount: v.ReplicationStats.PendingCount, + } + due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize } - cfg, err := getReplicationConfig(GlobalContext, d.Info.Name) - if err != nil { - return err - } - due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{ - ReplicatedSize: v.ReplicationStats.ReplicatedSize, - FailedSize: v.ReplicationStats.FailedSize, - FailedCount: v.ReplicationStats.FailedCount, - PendingSize: v.ReplicationStats.PendingSize, - PendingCount: v.ReplicationStats.PendingCount, - } - due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize } due.Compacted = len(due.Children) == 0 && k != d.Info.Name @@ -1158,7 +1155,6 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { d.Info = dold.Info d.Disks = dold.Disks d.Cache = make(map[string]dataUsageEntry, len(dold.Cache)) - var arn string for k, v := range dold.Cache { due := dataUsageEntry{ Size: v.Size, @@ -1167,21 +1163,14 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { Children: v.Children, } if v.ReplicationStats != nil && !v.ReplicationStats.Empty() { - if arn == "" { - cfg, err := getReplicationConfig(GlobalContext, d.Info.Name) - if err != nil { - return err + cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name) + if cfg != nil && cfg.RoleArn != "" { + due.ReplicationStats = &replicationAllStats{ + Targets: make(map[string]replicationStats), } d.Info.replication = replicationConfig{Config: cfg} - arn = d.Info.replication.Config.RoleArn - } - due.ReplicationStats = &replicationAllStats{ - Targets: make(map[string]replicationStats), - } - - if arn != "" { - due.ReplicationStats.Targets[arn] = replicationStats{ + due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{ ReplicatedSize: v.ReplicationStats.ReplicatedSize, FailedSize: v.ReplicationStats.FailedSize, FailedCount: v.ReplicationStats.FailedCount, diff --git a/cmd/data-usage.go b/cmd/data-usage.go index c804d2841..48b64302e 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -128,17 +128,16 @@ func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsag for bucket, bui := range dataUsageInfo.BucketsUsage { if bui.ReplicatedSizeV1 > 0 || bui.ReplicationFailedCountV1 > 0 || bui.ReplicationFailedSizeV1 > 0 || bui.ReplicationPendingCountV1 > 0 { - dataUsageInfo.ReplicationInfo = make(map[string]BucketTargetUsageInfo) - cfg, err := getReplicationConfig(GlobalContext, bucket) - if err != nil { - return DataUsageInfo{}, err - } - dataUsageInfo.ReplicationInfo[cfg.RoleArn] = BucketTargetUsageInfo{ - ReplicationFailedSize: bui.ReplicationFailedSizeV1, - ReplicationFailedCount: bui.ReplicationFailedCountV1, - ReplicatedSize: bui.ReplicatedSizeV1, - ReplicationPendingCount: bui.ReplicationPendingCountV1, - ReplicationPendingSize: bui.ReplicationPendingSizeV1, + cfg, _ := getReplicationConfig(GlobalContext, bucket) + if cfg != nil && cfg.RoleArn != "" { + dataUsageInfo.ReplicationInfo = make(map[string]BucketTargetUsageInfo) + dataUsageInfo.ReplicationInfo[cfg.RoleArn] = BucketTargetUsageInfo{ + ReplicationFailedSize: bui.ReplicationFailedSizeV1, + ReplicationFailedCount: bui.ReplicationFailedCountV1, + ReplicatedSize: bui.ReplicatedSizeV1, + ReplicationPendingCount: bui.ReplicationPendingCountV1, + ReplicationPendingSize: bui.ReplicationPendingSizeV1, + } } } } diff --git a/cmd/metrics.go b/cmd/metrics.go index d7de59506..a919aa820 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -18,7 +18,6 @@ package cmd import ( - "math" "net/http" "strings" "sync/atomic" @@ -437,84 +436,6 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) { ) } -// get the most current of in-memory replication stats and data usage info from crawler. -func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { - bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) - // accumulate cluster bucket stats - stats := make(map[string]*BucketReplicationStat) - var totReplicaSize int64 - for _, bucketStat := range bucketStats { - totReplicaSize += bucketStat.ReplicationStats.ReplicaSize - for arn, stat := range bucketStat.ReplicationStats.Stats { - oldst := stats[arn] - if oldst == nil { - oldst = &BucketReplicationStat{} - } - stats[arn] = &BucketReplicationStat{ - FailedCount: stat.FailedCount + oldst.FailedCount, - FailedSize: stat.FailedSize + oldst.FailedSize, - ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, - Latency: stat.Latency.merge(oldst.Latency), - } - } - } - - // add initial usage stat to cluster stats - usageStat := globalReplicationStats.GetInitialUsage(bucket) - totReplicaSize += usageStat.ReplicaSize - if usageStat.Stats != nil { - for arn, stat := range usageStat.Stats { - st := stats[arn] - if st == nil { - st = &BucketReplicationStat{ - ReplicatedSize: stat.ReplicatedSize, - FailedSize: stat.FailedSize, - FailedCount: stat.FailedCount, - } - } else { - st.ReplicatedSize += stat.ReplicatedSize - st.FailedSize += stat.FailedSize - st.FailedCount += stat.FailedCount - } - stats[arn] = st - } - } - s = BucketReplicationStats{ - Stats: make(map[string]*BucketReplicationStat, len(stats)), - } - var latestTotReplicatedSize int64 - for _, st := range u.ReplicationInfo { - latestTotReplicatedSize += int64(st.ReplicatedSize) - } - // normalize computed real time stats with latest usage stat - for arn, tgtstat := range stats { - st := BucketReplicationStat{} - bu, ok := u.ReplicationInfo[arn] - if !ok { - bu = BucketTargetUsageInfo{} - } - // use in memory replication stats if it is ahead of usage info. - st.ReplicatedSize = int64(bu.ReplicatedSize) - if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) { - st.ReplicatedSize = tgtstat.ReplicatedSize - } - s.ReplicatedSize += st.ReplicatedSize - // Reset FailedSize and FailedCount to 0 for negative overflows which can - // happen since data usage picture can lag behind actual usage state at the time of cluster start - st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) - st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) - st.Latency = tgtstat.Latency - - s.Stats[arn] = &st - s.FailedSize += st.FailedSize - s.FailedCount += st.FailedCount - } - // normalize overall stats - s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize))) - s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize))) - return s -} - // Populates prometheus with bucket usage metrics, this metrics // is only enabled if scanner is enabled. func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {