fix: allow compaction on replicated buckets (#13711)

currently getReplicationConfig() failure incorrectly
returns error on unexpected buckets upon upgrade, we
should always calculate usage as much as possible.
This commit is contained in:
Harshavardhana 2021-11-19 14:46:14 -08:00 committed by GitHub
parent 40244994ad
commit 914bfb2d9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 134 deletions

View File

@ -1721,13 +1721,13 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
usageInfo = dataUsageInfo.BucketsUsage[bucket] usageInfo = dataUsageInfo.BucketsUsage[bucket]
} }
bucketReplStats := getLatestReplicationStats(bucket, usageInfo) w.Header().Set(xhttp.ContentType, string(mimeJSON))
jsonData, err := json.Marshal(bucketReplStats)
if err != nil { enc := json.NewEncoder(w)
if err = enc.Encode(getLatestReplicationStats(bucket, usageInfo)); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return return
} }
writeSuccessResponseJSON(w, jsonData)
} }
// ResetBucketReplicationStateHandler - starts a replication reset for all objects in a bucket which // ResetBucketReplicationStateHandler - starts a replication reset for all objects in a bucket which

View File

@ -21,6 +21,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math"
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
@ -1770,3 +1771,81 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim
rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate) rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate)
return return
} }
// get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
// accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat)
var totReplicaSize int64
for _, bucketStat := range bucketStats {
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
for arn, stat := range bucketStat.ReplicationStats.Stats {
oldst := stats[arn]
if oldst == nil {
oldst = &BucketReplicationStat{}
}
stats[arn] = &BucketReplicationStat{
FailedCount: stat.FailedCount + oldst.FailedCount,
FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
Latency: stat.Latency.merge(oldst.Latency),
}
}
}
// add initial usage stat to cluster stats
usageStat := globalReplicationStats.GetInitialUsage(bucket)
totReplicaSize += usageStat.ReplicaSize
if usageStat.Stats != nil {
for arn, stat := range usageStat.Stats {
st := stats[arn]
if st == nil {
st = &BucketReplicationStat{
ReplicatedSize: stat.ReplicatedSize,
FailedSize: stat.FailedSize,
FailedCount: stat.FailedCount,
}
} else {
st.ReplicatedSize += stat.ReplicatedSize
st.FailedSize += stat.FailedSize
st.FailedCount += stat.FailedCount
}
stats[arn] = st
}
}
s = BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(stats)),
}
var latestTotReplicatedSize int64
for _, st := range u.ReplicationInfo {
latestTotReplicatedSize += int64(st.ReplicatedSize)
}
// normalize computed real time stats with latest usage stat
for arn, tgtstat := range stats {
st := BucketReplicationStat{}
bu, ok := u.ReplicationInfo[arn]
if !ok {
bu = BucketTargetUsageInfo{}
}
// use in memory replication stats if it is ahead of usage info.
st.ReplicatedSize = int64(bu.ReplicatedSize)
if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) {
st.ReplicatedSize = tgtstat.ReplicatedSize
}
s.ReplicatedSize += st.ReplicatedSize
// Reset FailedSize and FailedCount to 0 for negative overflows which can
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.Latency = tgtstat.Latency
s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount
}
// normalize overall stats
s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
return s
}

View File

@ -1071,19 +1071,17 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
Children: v.Children, Children: v.Children,
} }
if v.ReplicatedSize > 0 || v.ReplicaSize > 0 || v.ReplicationFailedSize > 0 || v.ReplicationPendingSize > 0 { if v.ReplicatedSize > 0 || v.ReplicaSize > 0 || v.ReplicationFailedSize > 0 || v.ReplicationPendingSize > 0 {
due.ReplicationStats = &replicationAllStats{ cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
Targets: make(map[string]replicationStats), if cfg != nil && cfg.RoleArn != "" {
} due.ReplicationStats = &replicationAllStats{
cfg, err := getReplicationConfig(GlobalContext, d.Info.Name) Targets: make(map[string]replicationStats),
if err != nil { }
return err due.ReplicationStats.ReplicaSize = v.ReplicaSize
} due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
due.ReplicationStats.ReplicaSize = v.ReplicaSize ReplicatedSize: v.ReplicatedSize,
FailedSize: v.ReplicationFailedSize,
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{ PendingSize: v.ReplicationPendingSize,
ReplicatedSize: v.ReplicatedSize, }
FailedSize: v.ReplicationFailedSize,
PendingSize: v.ReplicationPendingSize,
} }
} }
due.Compacted = len(due.Children) == 0 && k != d.Info.Name due.Compacted = len(due.Children) == 0 && k != d.Info.Name
@ -1115,21 +1113,20 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
empty := replicationStatsV1{} empty := replicationStatsV1{}
if v.ReplicationStats != empty { if v.ReplicationStats != empty {
due.ReplicationStats = &replicationAllStats{ cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
Targets: make(map[string]replicationStats), if cfg != nil && cfg.RoleArn != "" {
due.ReplicationStats = &replicationAllStats{
Targets: make(map[string]replicationStats),
}
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount,
PendingSize: v.ReplicationStats.PendingSize,
PendingCount: v.ReplicationStats.PendingCount,
}
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
} }
cfg, err := getReplicationConfig(GlobalContext, d.Info.Name)
if err != nil {
return err
}
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount,
PendingSize: v.ReplicationStats.PendingSize,
PendingCount: v.ReplicationStats.PendingCount,
}
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
} }
due.Compacted = len(due.Children) == 0 && k != d.Info.Name due.Compacted = len(due.Children) == 0 && k != d.Info.Name
@ -1158,7 +1155,6 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
d.Info = dold.Info d.Info = dold.Info
d.Disks = dold.Disks d.Disks = dold.Disks
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache)) d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
var arn string
for k, v := range dold.Cache { for k, v := range dold.Cache {
due := dataUsageEntry{ due := dataUsageEntry{
Size: v.Size, Size: v.Size,
@ -1167,21 +1163,14 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
Children: v.Children, Children: v.Children,
} }
if v.ReplicationStats != nil && !v.ReplicationStats.Empty() { if v.ReplicationStats != nil && !v.ReplicationStats.Empty() {
if arn == "" { cfg, _ := getReplicationConfig(GlobalContext, d.Info.Name)
cfg, err := getReplicationConfig(GlobalContext, d.Info.Name) if cfg != nil && cfg.RoleArn != "" {
if err != nil { due.ReplicationStats = &replicationAllStats{
return err Targets: make(map[string]replicationStats),
} }
d.Info.replication = replicationConfig{Config: cfg} d.Info.replication = replicationConfig{Config: cfg}
arn = d.Info.replication.Config.RoleArn
}
due.ReplicationStats = &replicationAllStats{ due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
Targets: make(map[string]replicationStats),
}
if arn != "" {
due.ReplicationStats.Targets[arn] = replicationStats{
ReplicatedSize: v.ReplicationStats.ReplicatedSize, ReplicatedSize: v.ReplicationStats.ReplicatedSize,
FailedSize: v.ReplicationStats.FailedSize, FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount, FailedCount: v.ReplicationStats.FailedCount,

View File

@ -128,17 +128,16 @@ func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsag
for bucket, bui := range dataUsageInfo.BucketsUsage { for bucket, bui := range dataUsageInfo.BucketsUsage {
if bui.ReplicatedSizeV1 > 0 || bui.ReplicationFailedCountV1 > 0 || if bui.ReplicatedSizeV1 > 0 || bui.ReplicationFailedCountV1 > 0 ||
bui.ReplicationFailedSizeV1 > 0 || bui.ReplicationPendingCountV1 > 0 { bui.ReplicationFailedSizeV1 > 0 || bui.ReplicationPendingCountV1 > 0 {
dataUsageInfo.ReplicationInfo = make(map[string]BucketTargetUsageInfo) cfg, _ := getReplicationConfig(GlobalContext, bucket)
cfg, err := getReplicationConfig(GlobalContext, bucket) if cfg != nil && cfg.RoleArn != "" {
if err != nil { dataUsageInfo.ReplicationInfo = make(map[string]BucketTargetUsageInfo)
return DataUsageInfo{}, err dataUsageInfo.ReplicationInfo[cfg.RoleArn] = BucketTargetUsageInfo{
} ReplicationFailedSize: bui.ReplicationFailedSizeV1,
dataUsageInfo.ReplicationInfo[cfg.RoleArn] = BucketTargetUsageInfo{ ReplicationFailedCount: bui.ReplicationFailedCountV1,
ReplicationFailedSize: bui.ReplicationFailedSizeV1, ReplicatedSize: bui.ReplicatedSizeV1,
ReplicationFailedCount: bui.ReplicationFailedCountV1, ReplicationPendingCount: bui.ReplicationPendingCountV1,
ReplicatedSize: bui.ReplicatedSizeV1, ReplicationPendingSize: bui.ReplicationPendingSizeV1,
ReplicationPendingCount: bui.ReplicationPendingCountV1, }
ReplicationPendingSize: bui.ReplicationPendingSizeV1,
} }
} }
} }

View File

@ -18,7 +18,6 @@
package cmd package cmd
import ( import (
"math"
"net/http" "net/http"
"strings" "strings"
"sync/atomic" "sync/atomic"
@ -437,84 +436,6 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
) )
} }
// get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
// accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat)
var totReplicaSize int64
for _, bucketStat := range bucketStats {
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
for arn, stat := range bucketStat.ReplicationStats.Stats {
oldst := stats[arn]
if oldst == nil {
oldst = &BucketReplicationStat{}
}
stats[arn] = &BucketReplicationStat{
FailedCount: stat.FailedCount + oldst.FailedCount,
FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
Latency: stat.Latency.merge(oldst.Latency),
}
}
}
// add initial usage stat to cluster stats
usageStat := globalReplicationStats.GetInitialUsage(bucket)
totReplicaSize += usageStat.ReplicaSize
if usageStat.Stats != nil {
for arn, stat := range usageStat.Stats {
st := stats[arn]
if st == nil {
st = &BucketReplicationStat{
ReplicatedSize: stat.ReplicatedSize,
FailedSize: stat.FailedSize,
FailedCount: stat.FailedCount,
}
} else {
st.ReplicatedSize += stat.ReplicatedSize
st.FailedSize += stat.FailedSize
st.FailedCount += stat.FailedCount
}
stats[arn] = st
}
}
s = BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(stats)),
}
var latestTotReplicatedSize int64
for _, st := range u.ReplicationInfo {
latestTotReplicatedSize += int64(st.ReplicatedSize)
}
// normalize computed real time stats with latest usage stat
for arn, tgtstat := range stats {
st := BucketReplicationStat{}
bu, ok := u.ReplicationInfo[arn]
if !ok {
bu = BucketTargetUsageInfo{}
}
// use in memory replication stats if it is ahead of usage info.
st.ReplicatedSize = int64(bu.ReplicatedSize)
if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) {
st.ReplicatedSize = tgtstat.ReplicatedSize
}
s.ReplicatedSize += st.ReplicatedSize
// Reset FailedSize and FailedCount to 0 for negative overflows which can
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.Latency = tgtstat.Latency
s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount
}
// normalize overall stats
s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
return s
}
// Populates prometheus with bucket usage metrics, this metrics // Populates prometheus with bucket usage metrics, this metrics
// is only enabled if scanner is enabled. // is only enabled if scanner is enabled.
func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {