mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
Add additional info for replication metrics API (#17293)
This tracks the replication transfer rate across different nodes, the number of active workers in use, and in-queue stats to give an idea of the current workload. This PR also adds replication metrics to the site replication status API. For site replication, Prometheus metrics are no longer at the bucket level but at the cluster level. It also adds a Prometheus metric to track credential errors since uptime.
This commit is contained in:
@@ -19,12 +19,12 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/bucket/replication"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
)
|
||||
|
||||
func (b *BucketReplicationStats) hasReplicationUsage() bool {
|
||||
@@ -38,12 +38,93 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool {
|
||||
|
||||
// ReplicationStats holds the global in-memory replication stats
|
||||
type ReplicationStats struct {
|
||||
// map of site deployment ID to site replication status
|
||||
// for site replication - maintain stats at global level
|
||||
srStats *SRStats
|
||||
// active worker stats
|
||||
workers *ActiveWorkerStat
|
||||
// queue stats cache
|
||||
qCache queueCache
|
||||
// mrf backlog stats
|
||||
mrfStats ReplicationMRFStats
|
||||
// for bucket replication, continue to use existing cache
|
||||
Cache map[string]*BucketReplicationStats
|
||||
UsageCache map[string]*BucketReplicationStats
|
||||
mostRecentStats BucketStatsMap
|
||||
sync.RWMutex // mutex for Cache
|
||||
ulock sync.RWMutex // mutex for UsageCache
|
||||
mostRecentStatsMu sync.Mutex // mutex for mostRecentStats
|
||||
registry metrics.Registry
|
||||
sync.RWMutex // mutex for Cache
|
||||
mostRecentStatsMu sync.Mutex // mutex for mostRecentStats
|
||||
|
||||
wlock sync.RWMutex // mutex for active workers
|
||||
|
||||
movingAvgTicker *time.Ticker // Ticker for calculating moving averages
|
||||
wTimer *time.Ticker // ticker for calculating active workers
|
||||
qTimer *time.Ticker // ticker for calculating queue stats
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) trackEWMA() {
|
||||
for {
|
||||
select {
|
||||
case <-r.movingAvgTicker.C:
|
||||
r.updateMovingAvg()
|
||||
case <-GlobalContext.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) updateMovingAvg() {
|
||||
r.RLock()
|
||||
for _, s := range r.Cache {
|
||||
for _, st := range s.Stats {
|
||||
st.XferRateLrg.measure.updateExponentialMovingAverage(time.Now())
|
||||
st.XferRateSml.measure.updateExponentialMovingAverage(time.Now())
|
||||
}
|
||||
}
|
||||
r.RUnlock()
|
||||
}
|
||||
|
||||
// ActiveWorkers returns worker stats
|
||||
func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat {
|
||||
r.wlock.RLock()
|
||||
defer r.wlock.RUnlock()
|
||||
w := r.workers.get()
|
||||
return ActiveWorkerStat{
|
||||
Curr: w.Curr,
|
||||
Max: w.Max,
|
||||
Avg: w.Avg,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) collectWorkerMetrics(ctx context.Context) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-r.wTimer.C:
|
||||
r.wlock.Lock()
|
||||
r.workers.update()
|
||||
r.wlock.Unlock()
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) collectQueueMetrics(ctx context.Context) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-r.qTimer.C:
|
||||
r.qCache.update()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete deletes in-memory replication statistics for a bucket.
|
||||
@@ -55,10 +136,6 @@ func (r *ReplicationStats) Delete(bucket string) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
delete(r.Cache, bucket)
|
||||
|
||||
r.ulock.Lock()
|
||||
defer r.ulock.Unlock()
|
||||
delete(r.UsageCache, bucket)
|
||||
}
|
||||
|
||||
// UpdateReplicaStat updates in-memory replica statistics with new values.
|
||||
@@ -71,83 +148,130 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
|
||||
defer r.Unlock()
|
||||
bs, ok := r.Cache[bucket]
|
||||
if !ok {
|
||||
bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
|
||||
bs = newBucketReplicationStats()
|
||||
}
|
||||
bs.ReplicaSize += n
|
||||
bs.ReplicaCount++
|
||||
r.Cache[bucket] = bs
|
||||
r.srUpdateReplicaStat(n)
|
||||
}
|
||||
|
||||
// Update updates in-memory replication statistics with new values.
|
||||
func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration time.Duration, status, prevStatus replication.StatusType, opType replication.Type) {
|
||||
func (r *ReplicationStats) srUpdateReplicaStat(sz int64) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
atomic.AddInt64(&r.srStats.ReplicaSize, sz)
|
||||
atomic.AddInt64(&r.srStats.ReplicaCount, 1)
|
||||
}
|
||||
|
||||
bs, ok := r.Cache[bucket]
|
||||
if !ok {
|
||||
bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
|
||||
r.Cache[bucket] = bs
|
||||
}
|
||||
b, ok := bs.Stats[arn]
|
||||
if !ok {
|
||||
b = &BucketReplicationStat{}
|
||||
bs.Stats[arn] = b
|
||||
}
|
||||
switch status {
|
||||
case replication.Pending:
|
||||
if opType.IsDataReplication() && prevStatus != status {
|
||||
b.PendingSize += n
|
||||
b.PendingCount++
|
||||
}
|
||||
case replication.Completed:
|
||||
switch prevStatus { // adjust counters based on previous state
|
||||
case replication.Pending:
|
||||
b.PendingCount--
|
||||
case replication.Failed:
|
||||
b.FailedCount--
|
||||
}
|
||||
if opType.IsDataReplication() {
|
||||
b.ReplicatedSize += n
|
||||
switch prevStatus {
|
||||
case replication.Pending:
|
||||
b.PendingSize -= n
|
||||
case replication.Failed:
|
||||
b.FailedSize -= n
|
||||
}
|
||||
if duration > 0 {
|
||||
b.Latency.update(n, duration)
|
||||
}
|
||||
}
|
||||
case replication.Failed:
|
||||
if opType.IsDataReplication() {
|
||||
if prevStatus == replication.Pending {
|
||||
b.FailedSize += n
|
||||
b.FailedCount++
|
||||
b.PendingSize -= n
|
||||
b.PendingCount--
|
||||
}
|
||||
}
|
||||
case replication.Replica:
|
||||
if opType == replication.ObjectReplicationType {
|
||||
b.ReplicaSize += n
|
||||
}
|
||||
func (r *ReplicationStats) srUpdate(sr replStat) {
|
||||
dID, err := globalSiteReplicationSys.getDeplIDForEndpoint(sr.endpoint())
|
||||
if err == nil {
|
||||
r.srStats.update(sr, dID)
|
||||
}
|
||||
}
|
||||
|
||||
// GetInitialUsage get replication metrics available at the time of cluster initialization
|
||||
func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats {
|
||||
// Update updates in-memory replication statistics with new values.
|
||||
func (r *ReplicationStats) Update(bucket string, ri replicatedTargetInfo, status, prevStatus replication.StatusType) {
|
||||
if r == nil {
|
||||
return BucketReplicationStats{}
|
||||
return
|
||||
}
|
||||
r.ulock.RLock()
|
||||
defer r.ulock.RUnlock()
|
||||
st, ok := r.UsageCache[bucket]
|
||||
var rs replStat
|
||||
switch status {
|
||||
case replication.Pending:
|
||||
if ri.OpType.IsDataReplication() && prevStatus != status {
|
||||
rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
|
||||
}
|
||||
case replication.Completed:
|
||||
if ri.OpType.IsDataReplication() {
|
||||
rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
|
||||
}
|
||||
case replication.Failed:
|
||||
if ri.OpType.IsDataReplication() && prevStatus == replication.Pending {
|
||||
rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
|
||||
}
|
||||
case replication.Replica:
|
||||
if ri.OpType == replication.ObjectReplicationType {
|
||||
rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, "", false, ri.Err)
|
||||
}
|
||||
}
|
||||
|
||||
// update site-replication in-memory stats
|
||||
if rs.Completed || rs.Failed {
|
||||
r.srUpdate(rs)
|
||||
}
|
||||
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
// update bucket replication in-memory stats
|
||||
bs, ok := r.Cache[bucket]
|
||||
if !ok {
|
||||
return BucketReplicationStats{}
|
||||
bs = newBucketReplicationStats()
|
||||
r.Cache[bucket] = bs
|
||||
}
|
||||
b, ok := bs.Stats[ri.Arn]
|
||||
if !ok {
|
||||
b = &BucketReplicationStat{
|
||||
XferRateLrg: newXferStats(),
|
||||
XferRateSml: newXferStats(),
|
||||
}
|
||||
bs.Stats[ri.Arn] = b
|
||||
}
|
||||
|
||||
switch {
|
||||
case rs.Completed:
|
||||
b.ReplicatedSize += rs.TransferSize
|
||||
b.ReplicatedCount++
|
||||
if rs.TransferDuration > 0 {
|
||||
b.Latency.update(rs.TransferSize, rs.TransferDuration)
|
||||
b.updateXferRate(rs.TransferSize, rs.TransferDuration)
|
||||
}
|
||||
case rs.Failed:
|
||||
b.FailStats.addsize(rs.TransferSize, rs.Err)
|
||||
case rs.Pending:
|
||||
}
|
||||
}
|
||||
|
||||
type replStat struct {
|
||||
Arn string
|
||||
Completed bool
|
||||
Pending bool
|
||||
Failed bool
|
||||
opType replication.Type
|
||||
// transfer size
|
||||
TransferSize int64
|
||||
// transfer duration
|
||||
TransferDuration time.Duration
|
||||
Endpoint string
|
||||
Secure bool
|
||||
Err error
|
||||
}
|
||||
|
||||
func (rs *replStat) endpoint() string {
|
||||
scheme := "http"
|
||||
if rs.Secure {
|
||||
scheme = "https"
|
||||
}
|
||||
return scheme + "://" + rs.Endpoint
|
||||
}
|
||||
|
||||
func (rs *replStat) set(arn string, n int64, duration time.Duration, status replication.StatusType, opType replication.Type, endpoint string, secure bool, err error) {
|
||||
rs.Endpoint = endpoint
|
||||
rs.Secure = secure
|
||||
rs.TransferSize = n
|
||||
rs.Arn = arn
|
||||
rs.TransferDuration = duration
|
||||
rs.opType = opType
|
||||
switch status {
|
||||
case replication.Completed:
|
||||
rs.Completed = true
|
||||
case replication.Pending:
|
||||
rs.Pending = true
|
||||
case replication.Failed:
|
||||
rs.Failed = true
|
||||
rs.Err = err
|
||||
}
|
||||
return st.Clone()
|
||||
}
|
||||
|
||||
// GetAll returns replication metrics for all buckets at once.
|
||||
@@ -157,16 +281,36 @@ func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
|
||||
}
|
||||
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache))
|
||||
for k, v := range r.Cache {
|
||||
bucketReplicationStats[k] = v.Clone()
|
||||
}
|
||||
r.RUnlock()
|
||||
for k, v := range bucketReplicationStats {
|
||||
v.QStat = r.qCache.getBucketStats(k)
|
||||
bucketReplicationStats[k] = v
|
||||
}
|
||||
|
||||
return bucketReplicationStats
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary {
|
||||
if r == nil {
|
||||
return SRMetricsSummary{}
|
||||
}
|
||||
|
||||
m := SRMetricsSummary{
|
||||
Uptime: UTCNow().Unix() - globalBootTime.Unix(),
|
||||
Queued: r.qCache.getSiteStats(),
|
||||
ActiveWorkers: r.ActiveWorkers(),
|
||||
Metrics: r.srStats.get(),
|
||||
ReplicaSize: atomic.LoadInt64(&r.srStats.ReplicaSize),
|
||||
ReplicaCount: atomic.LoadInt64(&r.srStats.ReplicaCount),
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Get replication metrics for a bucket from this node since this node came up.
|
||||
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
|
||||
if r == nil {
|
||||
@@ -178,99 +322,35 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
|
||||
|
||||
st, ok := r.Cache[bucket]
|
||||
if !ok {
|
||||
return BucketReplicationStats{}
|
||||
return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
|
||||
}
|
||||
return st.Clone()
|
||||
}
|
||||
|
||||
// NewReplicationStats initialize in-memory replication statistics
|
||||
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
|
||||
return &ReplicationStats{
|
||||
Cache: make(map[string]*BucketReplicationStats),
|
||||
UsageCache: make(map[string]*BucketReplicationStats),
|
||||
r := metrics.NewRegistry()
|
||||
rs := ReplicationStats{
|
||||
Cache: make(map[string]*BucketReplicationStats),
|
||||
qCache: newQueueCache(r),
|
||||
srStats: newSRStats(),
|
||||
movingAvgTicker: time.NewTicker(2 * time.Second),
|
||||
wTimer: time.NewTicker(2 * time.Second),
|
||||
qTimer: time.NewTicker(2 * time.Second),
|
||||
|
||||
workers: newActiveWorkerStat(r),
|
||||
registry: r,
|
||||
}
|
||||
go rs.collectWorkerMetrics(ctx)
|
||||
go rs.collectQueueMetrics(ctx)
|
||||
return &rs
|
||||
}
|
||||
|
||||
// load replication metrics at cluster start from latest replication stats saved in .minio.sys/buckets/replication/node-name.stats
|
||||
// fallback to replication stats in data usage to be backward compatible
|
||||
func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) {
|
||||
m := make(map[string]*BucketReplicationStats)
|
||||
if stats, err := globalReplicationPool.loadStatsFromDisk(); err == nil {
|
||||
for b, st := range stats {
|
||||
c := st.Clone()
|
||||
m[b] = &c
|
||||
}
|
||||
r.ulock.Lock()
|
||||
r.UsageCache = m
|
||||
r.ulock.Unlock()
|
||||
return
|
||||
}
|
||||
rTimer := time.NewTimer(time.Second * 5)
|
||||
defer rTimer.Stop()
|
||||
var (
|
||||
dui DataUsageInfo
|
||||
err error
|
||||
)
|
||||
outer:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-rTimer.C:
|
||||
dui, err = loadDataUsageFromBackend(GlobalContext, newObjectLayerFn())
|
||||
// If LastUpdate is set, data usage is available.
|
||||
if err == nil {
|
||||
break outer
|
||||
}
|
||||
rTimer.Reset(time.Second * 5)
|
||||
}
|
||||
}
|
||||
for bucket, usage := range dui.BucketsUsage {
|
||||
b := &BucketReplicationStats{
|
||||
Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)),
|
||||
}
|
||||
for arn, uinfo := range usage.ReplicationInfo {
|
||||
b.Stats[arn] = &BucketReplicationStat{
|
||||
FailedSize: int64(uinfo.ReplicationFailedSize),
|
||||
ReplicatedSize: int64(uinfo.ReplicatedSize),
|
||||
ReplicaSize: int64(uinfo.ReplicaSize),
|
||||
FailedCount: int64(uinfo.ReplicationFailedCount),
|
||||
}
|
||||
}
|
||||
b.ReplicaSize += int64(usage.ReplicaSize)
|
||||
if b.hasReplicationUsage() {
|
||||
m[bucket] = b
|
||||
}
|
||||
}
|
||||
r.ulock.Lock()
|
||||
r.UsageCache = m
|
||||
r.ulock.Unlock()
|
||||
}
|
||||
|
||||
// serializeStats will serialize the current stats.
|
||||
// Will return (nil, nil) if no data.
|
||||
func (r *ReplicationStats) serializeStats() ([]byte, error) {
|
||||
if r == nil {
|
||||
return nil, nil
|
||||
}
|
||||
r.mostRecentStatsMu.Lock()
|
||||
defer r.mostRecentStatsMu.Unlock()
|
||||
if len(r.mostRecentStats.Stats) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
data := make([]byte, 4, 4+r.mostRecentStats.Msgsize())
|
||||
// Add the replication stats meta header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], replStatsMetaFormat)
|
||||
binary.LittleEndian.PutUint16(data[2:4], replStatsVersion)
|
||||
// Add data
|
||||
return r.mostRecentStats.MarshalMsg(data)
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
|
||||
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) {
|
||||
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
|
||||
bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage))
|
||||
bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage))
|
||||
|
||||
for bucket, u := range bucketsUsage {
|
||||
for bucket := range bucketsUsage {
|
||||
bucketStats := make([]BucketStats, len(peerBucketStatsList))
|
||||
for i, peerBucketStats := range peerBucketStatsList {
|
||||
bucketStat, ok := peerBucketStats.Stats[bucket]
|
||||
@@ -279,110 +359,126 @@ func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo)
|
||||
}
|
||||
bucketStats[i] = bucketStat
|
||||
}
|
||||
bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, u, bucketStats)
|
||||
bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, bucketStats)
|
||||
}
|
||||
return bucketsReplicationStats
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) {
|
||||
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucketStats []BucketStats) (bs BucketStats) {
|
||||
if r == nil {
|
||||
s = BucketReplicationStats{
|
||||
Stats: make(map[string]*BucketReplicationStat),
|
||||
bs = BucketStats{
|
||||
ReplicationStats: BucketReplicationStats{
|
||||
Stats: make(map[string]*BucketReplicationStat),
|
||||
},
|
||||
QueueStats: ReplicationQueueStats{},
|
||||
}
|
||||
return s
|
||||
return bs
|
||||
}
|
||||
|
||||
var s BucketReplicationStats
|
||||
// accumulate cluster bucket stats
|
||||
stats := make(map[string]*BucketReplicationStat)
|
||||
var totReplicaSize int64
|
||||
var (
|
||||
totReplicaSize, totReplicatedSize int64
|
||||
totReplicaCount, totReplicatedCount int64
|
||||
totFailed RTimedMetrics
|
||||
tq InQueueMetric
|
||||
)
|
||||
for _, bucketStat := range bucketStats {
|
||||
totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
|
||||
totReplicaCount += bucketStat.ReplicationStats.ReplicaCount
|
||||
for _, q := range bucketStat.QueueStats.Nodes {
|
||||
tq = tq.merge(q.QStats)
|
||||
}
|
||||
|
||||
for arn, stat := range bucketStat.ReplicationStats.Stats {
|
||||
oldst := stats[arn]
|
||||
if oldst == nil {
|
||||
oldst = &BucketReplicationStat{}
|
||||
oldst = &BucketReplicationStat{
|
||||
XferRateLrg: newXferStats(),
|
||||
XferRateSml: newXferStats(),
|
||||
}
|
||||
}
|
||||
fstats := stat.FailStats.merge(oldst.FailStats)
|
||||
lrg := oldst.XferRateLrg.merge(*stat.XferRateLrg)
|
||||
sml := oldst.XferRateSml.merge(*stat.XferRateSml)
|
||||
stats[arn] = &BucketReplicationStat{
|
||||
FailedCount: stat.FailedCount + oldst.FailedCount,
|
||||
FailedSize: stat.FailedSize + oldst.FailedSize,
|
||||
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
|
||||
Latency: stat.Latency.merge(oldst.Latency),
|
||||
PendingCount: stat.PendingCount + oldst.PendingCount,
|
||||
PendingSize: stat.PendingSize + oldst.PendingSize,
|
||||
Failed: fstats.toMetric(),
|
||||
FailStats: fstats,
|
||||
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
|
||||
ReplicatedCount: stat.ReplicatedCount + oldst.ReplicatedCount,
|
||||
Latency: stat.Latency.merge(oldst.Latency),
|
||||
XferRateLrg: &lrg,
|
||||
XferRateSml: &sml,
|
||||
}
|
||||
totReplicatedSize += stat.ReplicatedSize
|
||||
totReplicatedCount += stat.ReplicatedCount
|
||||
totFailed = totFailed.merge(stat.FailStats)
|
||||
}
|
||||
}
|
||||
|
||||
// add initial usage stat to cluster stats
|
||||
usageStat := globalReplicationStats.GetInitialUsage(bucket)
|
||||
|
||||
totReplicaSize += usageStat.ReplicaSize
|
||||
for arn, stat := range usageStat.Stats {
|
||||
st, ok := stats[arn]
|
||||
if !ok {
|
||||
st = &BucketReplicationStat{}
|
||||
stats[arn] = st
|
||||
}
|
||||
st.ReplicatedSize += stat.ReplicatedSize
|
||||
st.FailedSize += stat.FailedSize
|
||||
st.FailedCount += stat.FailedCount
|
||||
st.PendingSize += stat.PendingSize
|
||||
st.PendingCount += stat.PendingCount
|
||||
}
|
||||
|
||||
s = BucketReplicationStats{
|
||||
Stats: make(map[string]*BucketReplicationStat, len(stats)),
|
||||
Stats: stats,
|
||||
QStat: tq,
|
||||
ReplicaSize: totReplicaSize,
|
||||
ReplicaCount: totReplicaCount,
|
||||
ReplicatedSize: totReplicatedSize,
|
||||
ReplicatedCount: totReplicatedCount,
|
||||
Failed: totFailed.toMetric(),
|
||||
}
|
||||
|
||||
var latestTotReplicatedSize int64
|
||||
for _, st := range u.ReplicationInfo {
|
||||
latestTotReplicatedSize += int64(st.ReplicatedSize)
|
||||
var qs ReplicationQueueStats
|
||||
for _, bs := range bucketStats {
|
||||
qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...)
|
||||
}
|
||||
|
||||
// normalize computed real time stats with latest usage stat
|
||||
for arn, tgtstat := range stats {
|
||||
st := BucketReplicationStat{}
|
||||
bu, ok := u.ReplicationInfo[arn]
|
||||
if !ok {
|
||||
bu = BucketTargetUsageInfo{}
|
||||
}
|
||||
// use in memory replication stats if it is ahead of usage info.
|
||||
st.ReplicatedSize = int64(bu.ReplicatedSize)
|
||||
if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) {
|
||||
st.ReplicatedSize = tgtstat.ReplicatedSize
|
||||
}
|
||||
s.ReplicatedSize += st.ReplicatedSize
|
||||
// Reset FailedSize and FailedCount to 0 for negative overflows which can
|
||||
// happen since data usage picture can lag behind actual usage state at the time of cluster start
|
||||
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
|
||||
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
|
||||
st.PendingSize = int64(math.Max(float64(tgtstat.PendingSize), 0))
|
||||
st.PendingCount = int64(math.Max(float64(tgtstat.PendingCount), 0))
|
||||
st.Latency = tgtstat.Latency
|
||||
|
||||
s.Stats[arn] = &st
|
||||
s.FailedSize += st.FailedSize
|
||||
s.FailedCount += st.FailedCount
|
||||
s.PendingCount += st.PendingCount
|
||||
s.PendingSize += st.PendingSize
|
||||
qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()
|
||||
bs = BucketStats{
|
||||
ReplicationStats: s,
|
||||
QueueStats: qs,
|
||||
}
|
||||
// normalize overall stats
|
||||
s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
|
||||
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
|
||||
r.mostRecentStatsMu.Lock()
|
||||
if len(r.mostRecentStats.Stats) == 0 {
|
||||
r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
|
||||
}
|
||||
if len(s.Stats) > 0 {
|
||||
r.mostRecentStats.Stats[bucket] = BucketStats{ReplicationStats: s}
|
||||
if len(bs.ReplicationStats.Stats) > 0 {
|
||||
r.mostRecentStats.Stats[bucket] = bs
|
||||
}
|
||||
r.mostRecentStats.Timestamp = UTCNow()
|
||||
r.mostRecentStatsMu.Unlock()
|
||||
return s
|
||||
return bs
|
||||
}
|
||||
|
||||
// get the most current of in-memory replication stats and data usage info from crawler.
|
||||
func (r *ReplicationStats) getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
|
||||
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) {
|
||||
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
|
||||
return r.calculateBucketReplicationStats(bucket, u, bucketStats)
|
||||
return r.calculateBucketReplicationStats(bucket, bucketStats)
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) incQ(bucket string, sz int64, isDeleleRepl bool, opType replication.Type) {
|
||||
r.qCache.Lock()
|
||||
defer r.qCache.Unlock()
|
||||
v, ok := r.qCache.bucketStats[bucket]
|
||||
if !ok {
|
||||
v = newInQueueStats(r.registry, bucket)
|
||||
}
|
||||
atomic.AddInt64(&v.nowBytes, sz)
|
||||
atomic.AddInt64(&v.nowCount, 1)
|
||||
r.qCache.bucketStats[bucket] = v
|
||||
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, sz)
|
||||
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, 1)
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opType replication.Type) {
|
||||
r.qCache.Lock()
|
||||
defer r.qCache.Unlock()
|
||||
v, ok := r.qCache.bucketStats[bucket]
|
||||
if !ok {
|
||||
v = newInQueueStats(r.registry, bucket)
|
||||
}
|
||||
atomic.AddInt64(&v.nowBytes, -1*sz)
|
||||
atomic.AddInt64(&v.nowCount, -1)
|
||||
r.qCache.bucketStats[bucket] = v
|
||||
|
||||
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz)
|
||||
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user