replication: Simplify metrics calculation (#13274)

Also doing some code cleanup
This commit is contained in:
Poorna Krishnamoorthy 2021-09-22 13:48:45 -04:00 committed by GitHub
parent 46724508f8
commit 19ecdc75a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 54 deletions

View File

@ -893,10 +893,6 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}(i, tgt) }(i, tgt)
} }
wg.Wait() wg.Wait()
// metadata update once..
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
// FIXME: add support for missing replication events // FIXME: add support for missing replication events
// - event.ObjectReplicationMissedThreshold // - event.ObjectReplicationMissedThreshold
// - event.ObjectReplicationReplicatedAfterThreshold // - event.ObjectReplicationReplicatedAfterThreshold
@ -962,7 +958,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
// replicateObjectToTarget replicates the specified version of the object to destination bucket // replicateObjectToTarget replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status. // The source object is then updated to reflect the replication status.
func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
objInfo := ri.ObjectInfo objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket bucket := objInfo.Bucket
object := objInfo.Name object := objInfo.Name
var ( var (

View File

@ -51,29 +51,6 @@ func (brs *BucketReplicationStats) Empty() bool {
return len(brs.Stats) == 0 && brs.ReplicaSize == 0 return len(brs.Stats) == 0 && brs.ReplicaSize == 0
} }
// UpdateStat updates replication stats for the target arn
func (brs *BucketReplicationStats) UpdateStat(arn string, stat *BucketReplicationStat) {
var s BucketReplicationStat
if st, ok := brs.Stats[arn]; ok {
s = *st
}
// update target metric
atomic.AddInt64(&s.FailedSize, stat.FailedSize)
atomic.AddInt64(&s.FailedCount, stat.FailedCount)
atomic.AddInt64(&s.PendingCount, stat.PendingCount)
atomic.AddInt64(&s.PendingSize, stat.PendingSize)
atomic.AddInt64(&s.ReplicaSize, stat.ReplicaSize)
atomic.AddInt64(&s.ReplicatedSize, stat.ReplicatedSize)
// update total counts across targets
atomic.AddInt64(&brs.FailedSize, stat.FailedSize)
atomic.AddInt64(&brs.FailedCount, stat.FailedCount)
atomic.AddInt64(&brs.PendingCount, stat.PendingCount)
atomic.AddInt64(&brs.PendingSize, stat.PendingSize)
atomic.AddInt64(&brs.ReplicaSize, stat.ReplicaSize)
atomic.AddInt64(&brs.ReplicatedSize, stat.ReplicatedSize)
brs.Stats[arn] = &s
}
// Clone creates a new BucketReplicationStats copy // Clone creates a new BucketReplicationStats copy
func (brs BucketReplicationStats) Clone() BucketReplicationStats { func (brs BucketReplicationStats) Clone() BucketReplicationStats {
c := BucketReplicationStats{ c := BucketReplicationStats{

View File

@ -1126,29 +1126,25 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
case replication.Pending: case replication.Pending:
tgtSizeS.pendingCount++ tgtSizeS.pendingCount++
tgtSizeS.pendingSize += oi.Size tgtSizeS.pendingSize += oi.Size
sizeS.pendingCount++
sizeS.pendingSize += oi.Size
case replication.Failed: case replication.Failed:
tgtSizeS.failedSize += oi.Size tgtSizeS.failedSize += oi.Size
tgtSizeS.failedCount++ tgtSizeS.failedCount++
sizeS.failedSize += oi.Size
sizeS.failedCount++
case replication.Completed, "COMPLETE": case replication.Completed, "COMPLETE":
tgtSizeS.replicatedSize += oi.Size tgtSizeS.replicatedSize += oi.Size
sizeS.replicatedSize += oi.Size
} }
sizeS.replTargetStats[arn] = tgtSizeS sizeS.replTargetStats[arn] = tgtSizeS
} }
} }
switch oi.ReplicationStatus { switch oi.ReplicationStatus {
case replication.Pending: case replication.Pending, replication.Failed:
sizeS.pendingCount++
sizeS.pendingSize += oi.Size
globalReplicationPool.queueReplicaTask(roi) globalReplicationPool.queueReplicaTask(roi)
return return
case replication.Failed:
sizeS.failedSize += oi.Size
sizeS.failedCount++
globalReplicationPool.queueReplicaTask(roi)
return
case replication.Completed, "COMPLETE":
sizeS.replicatedSize += oi.Size
case replication.Replica: case replication.Replica:
sizeS.replicaSize += oi.Size sizeS.replicaSize += oi.Size
} }

View File

@ -1003,6 +1003,7 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
FailedSize: v.ReplicationStats.FailedSize, FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount, FailedCount: v.ReplicationStats.FailedCount,
PendingSize: v.ReplicationStats.PendingSize, PendingSize: v.ReplicationStats.PendingSize,
PendingCount: v.ReplicationStats.PendingCount,
} }
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
} }
@ -1061,6 +1062,7 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
FailedSize: v.ReplicationStats.FailedSize, FailedSize: v.ReplicationStats.FailedSize,
FailedCount: v.ReplicationStats.FailedCount, FailedCount: v.ReplicationStats.FailedCount,
PendingSize: v.ReplicationStats.PendingSize, PendingSize: v.ReplicationStats.PendingSize,
PendingCount: v.ReplicationStats.PendingCount,
} }
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
} }

View File

@ -435,13 +435,12 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
// get the most current of in-memory replication stats and data usage info from crawler. // get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
s = BucketReplicationStats{}
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
// accumulate cluster bucket stats // accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat) stats := make(map[string]*BucketReplicationStat)
var totReplicaSize int64
for _, bucketStat := range bucketStats { for _, bucketStat := range bucketStats {
s.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
for arn, stat := range bucketStat.ReplicationStats.Stats { for arn, stat := range bucketStat.ReplicationStats.Stats {
oldst, ok := stats[arn] oldst, ok := stats[arn]
if !ok { if !ok {
@ -452,16 +451,13 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
FailedSize: stat.FailedSize + oldst.FailedSize, FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
} }
s.FailedCount += stats[arn].FailedCount
s.FailedSize += stats[arn].FailedSize
s.ReplicatedSize += stats[arn].ReplicatedSize
} }
} }
s.Stats = make(map[string]*BucketReplicationStat, len(stats))
// add initial usage stat to cluster stats // add initial usage stat to cluster stats
usageStat := globalReplicationStats.GetInitialUsage(bucket) usageStat := globalReplicationStats.GetInitialUsage(bucket)
if usageStat.Stats != nil { if usageStat.Stats != nil {
totReplicaSize += usageStat.ReplicaSize
for arn, stat := range usageStat.Stats { for arn, stat := range usageStat.Stats {
st, ok := stats[arn] st, ok := stats[arn]
if !ok { if !ok {
@ -478,34 +474,37 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
stats[arn] = st stats[arn] = st
} }
} }
s.ReplicaSize += usageStat.ReplicaSize s = BucketReplicationStats{
Stats: make(map[string]*BucketReplicationStat, len(stats)),
}
var latestTotReplicatedSize int64
for _, st := range u.ReplicationInfo {
latestTotReplicatedSize += int64(st.ReplicatedSize)
}
// normalize computed real time stats with latest usage stat // normalize computed real time stats with latest usage stat
var usgReplicatedSize int64
for arn, tgtstat := range stats { for arn, tgtstat := range stats {
st := BucketReplicationStat{} st := BucketReplicationStat{}
bu, ok := usageStat.Stats[arn] bu, ok := usageStat.Stats[arn]
if !ok { if !ok {
bu = &BucketReplicationStat{} bu = &BucketReplicationStat{}
} }
usgReplicatedSize += bu.ReplicatedSize
// use in memory replication stats if it is ahead of usage info. // use in memory replication stats if it is ahead of usage info.
st.ReplicatedSize = bu.ReplicatedSize st.ReplicatedSize = bu.ReplicatedSize
if tgtstat.ReplicatedSize >= bu.ReplicatedSize { if tgtstat.ReplicatedSize >= bu.ReplicatedSize {
st.ReplicatedSize = tgtstat.ReplicatedSize st.ReplicatedSize = tgtstat.ReplicatedSize
} }
s.ReplicatedSize += st.ReplicatedSize
// Reset FailedSize and FailedCount to 0 for negative overflows which can // Reset FailedSize and FailedCount to 0 for negative overflows which can
// happen since data usage picture can lag behind actual usage state at the time of cluster start // happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.ReplicaSize = int64(math.Max(float64(tgtstat.ReplicaSize), float64(u.ReplicaSize)))
s.Stats[arn] = &st s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount
} }
// normalize overall stats // normalize overall stats
s.FailedSize = int64(math.Max(float64(s.FailedSize), 0)) s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
s.FailedCount = int64(math.Max(float64(s.FailedCount), 0)) s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
s.ReplicaSize = int64(math.Max(float64(s.ReplicaSize), float64(u.ReplicaSize)))
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(usgReplicatedSize)))
return s return s
} }