From 6b9fd256e1f6eee7c6f97a87b240da3e3e0e3eb5 Mon Sep 17 00:00:00 2001 From: Poorna Date: Mon, 12 Sep 2022 12:40:02 -0700 Subject: [PATCH] Persist in-memory replication stats to disk (#15594) to avoid relying on scanner-calculated replication metrics. This will improve the accuracy of the replication stats reported. This PR also adds on to #15556 by handing replication traffic that could not be queued by available workers to the MRF queue so that entries in `PENDING` status are healed faster. --- cmd/bucket-replication-handlers.go | 2 +- cmd/bucket-replication-stats.go | 167 +++++++++++-- cmd/bucket-replication-utils.go | 11 + cmd/bucket-replication.go | 193 +++++++------- cmd/bucket-stats.go | 14 +- cmd/bucket-stats_gen.go | 277 +++++++++++++-------- cmd/metrics-v2.go | 2 +- cmd/metrics.go | 2 +- cmd/notification.go | 9 +- cmd/peer-rest-client.go | 2 +- cmd/peer-rest-common.go | 2 +- cmd/peer-rest-server.go | 3 +- cmd/signals.go | 2 + internal/bucket/replication/replication.go | 9 + 14 files changed, 476 insertions(+), 219 deletions(-) diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index c0bc1ac24..68d96f641 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -217,7 +217,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW w.Header().Set(xhttp.ContentType, string(mimeJSON)) enc := json.NewEncoder(w) - if err = enc.Encode(getLatestReplicationStats(bucket, usageInfo)); err != nil { + if err = enc.Encode(globalReplicationStats.getLatestReplicationStats(bucket, usageInfo)); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 5cb65d695..5441166e6 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "math" "sync" "time" @@ -36,10 +37,12 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool { // ReplicationStats holds the global in-memory replication stats type ReplicationStats struct { - Cache map[string]*BucketReplicationStats - UsageCache map[string]*BucketReplicationStats - sync.RWMutex - ulock sync.RWMutex + Cache map[string]*BucketReplicationStats + UsageCache map[string]*BucketReplicationStats + mostRecentStats BucketStatsMap + sync.RWMutex // mutex for Cache + ulock sync.RWMutex // mutex for UsageCache + dlock sync.RWMutex // mutex for mostRecentStats } // Delete deletes in-memory replication statistics for a bucket. 
@@ -92,14 +95,23 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t bs.Stats[arn] = b } switch status { + case replication.Pending: + if opType.IsDataReplication() && prevStatus != status { + b.PendingSize += n + b.PendingCount++ + } case replication.Completed: switch prevStatus { // adjust counters based on previous state + case replication.Pending: + b.PendingCount-- case replication.Failed: b.FailedCount-- } - if opType == replication.ObjectReplicationType { + if opType.IsDataReplication() { b.ReplicatedSize += n switch prevStatus { + case replication.Pending: + b.PendingSize -= n case replication.Failed: b.FailedSize -= n } @@ -108,10 +120,12 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration t } } case replication.Failed: - if opType == replication.ObjectReplicationType { + if opType.IsDataReplication() { if prevStatus == replication.Pending { b.FailedSize += n b.FailedCount++ + b.PendingSize -= n + b.PendingCount-- } } case replication.Replica: @@ -176,9 +190,20 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio } } -// load replication metrics at cluster start from initial data usage +// load replication metrics at cluster start from latest replication stats saved in .minio.sys/buckets/replication/node-name.stats +// fallback to replication stats in data usage to be backward compatible func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) { - rTimer := time.NewTimer(time.Minute) + m := make(map[string]*BucketReplicationStats) + if stats, err := globalReplicationPool.loadStatsFromDisk(); err == nil { + for b, st := range stats { + m[b] = &st + } + r.ulock.Lock() + r.UsageCache = m + r.ulock.Unlock() + return + } + rTimer := time.NewTimer(time.Second * 5) defer rTimer.Stop() var ( dui DataUsageInfo @@ -192,15 +217,12 @@ outer: case <-rTimer.C: dui, err = loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) // If LastUpdate is set, data usage is available. 
- if err == nil && !dui.LastUpdate.IsZero() { + if err == nil { break outer } - - rTimer.Reset(time.Minute) + rTimer.Reset(time.Second * 5) } } - - m := make(map[string]*BucketReplicationStats) for bucket, usage := range dui.BucketsUsage { b := &BucketReplicationStats{ Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), @@ -219,6 +241,123 @@ outer: } } r.ulock.Lock() - defer r.ulock.Unlock() r.UsageCache = m + r.ulock.Unlock() +} + +func (r *ReplicationStats) getAllCachedLatest() BucketStatsMap { + r.dlock.RLock() + defer r.dlock.RUnlock() + return r.mostRecentStats +} + +func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) { + peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext) + bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage)) + + for bucket, u := range bucketsUsage { + bucketStats := make([]BucketStats, len(peerBucketStatsList)) + for i, peerBucketStats := range peerBucketStatsList { + bucketStat, ok := peerBucketStats.Stats[bucket] + if !ok { + continue + } + bucketStats[i] = bucketStat + } + bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, u, bucketStats) + } + return bucketsReplicationStats +} + +func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) { + // accumulate cluster bucket stats + stats := make(map[string]*BucketReplicationStat) + var totReplicaSize int64 + for _, bucketStat := range bucketStats { + totReplicaSize += bucketStat.ReplicationStats.ReplicaSize + for arn, stat := range bucketStat.ReplicationStats.Stats { + oldst := stats[arn] + if oldst == nil { + oldst = &BucketReplicationStat{} + } + stats[arn] = &BucketReplicationStat{ + FailedCount: stat.FailedCount + oldst.FailedCount, + FailedSize: stat.FailedSize + oldst.FailedSize, + ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, + Latency: stat.Latency.merge(oldst.Latency), + PendingCount: stat.PendingCount + oldst.PendingCount, + PendingSize: stat.PendingSize + oldst.PendingSize, + } + } + } + + // add initial usage stat to cluster stats + usageStat := globalReplicationStats.GetInitialUsage(bucket) + + totReplicaSize += usageStat.ReplicaSize + for arn, stat := range usageStat.Stats { + st, ok := stats[arn] + if !ok { + st = &BucketReplicationStat{} + stats[arn] = st + } + st.ReplicatedSize += stat.ReplicatedSize + st.FailedSize += stat.FailedSize + st.FailedCount += stat.FailedCount + st.PendingSize += stat.PendingSize + st.PendingCount += stat.PendingCount + } + + s = BucketReplicationStats{ + Stats: make(map[string]*BucketReplicationStat, len(stats)), + } + var latestTotReplicatedSize int64 + for _, st := range u.ReplicationInfo { + latestTotReplicatedSize += int64(st.ReplicatedSize) + } + + // normalize computed real time stats with latest usage stat + for arn, tgtstat := range stats { + st := BucketReplicationStat{} + bu, ok := u.ReplicationInfo[arn] + if !ok { + bu = BucketTargetUsageInfo{} + } + // use in memory replication stats if it is ahead of usage info. 
+ st.ReplicatedSize = int64(bu.ReplicatedSize) + if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) { + st.ReplicatedSize = tgtstat.ReplicatedSize + } + s.ReplicatedSize += st.ReplicatedSize + // Reset FailedSize and FailedCount to 0 for negative overflows which can + // happen since data usage picture can lag behind actual usage state at the time of cluster start + st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) + st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) + st.PendingSize = int64(math.Max(float64(tgtstat.PendingSize), 0)) + st.PendingCount = int64(math.Max(float64(tgtstat.PendingCount), 0)) + st.Latency = tgtstat.Latency + + s.Stats[arn] = &st + s.FailedSize += st.FailedSize + s.FailedCount += st.FailedCount + s.PendingCount += st.PendingCount + s.PendingSize += st.PendingSize + } + // normalize overall stats + s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize))) + s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize))) + r.dlock.Lock() + if len(r.mostRecentStats.Stats) == 0 { + r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()} + } + r.mostRecentStats.Stats[bucket] = BucketStats{ReplicationStats: s} + r.mostRecentStats.Timestamp = UTCNow() + r.dlock.Unlock() + return s +} + +// get the most current of in-memory replication stats and data usage info from crawler. +func (r *ReplicationStats) getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { + bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) + return r.calculateBucketReplicationStats(bucket, u, bucketStats) } diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 975f7efa3..55b58dc2a 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -766,3 +766,14 @@ func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry { versionID: ri.VersionID, } } + +func getReplicationStatsPath(nodeName string) string { + return bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + nodeName + ".stats" +} + +const ( + replStatsMetaFormat = 1 + replStatsVersionV1 = 1 + replStatsVersion = replStatsVersionV1 + replStatsSaveInterval = time.Minute * 5 +) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 1b08144f4..578ba97d9 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "io" - "math" "net/http" "path" "reflect" @@ -1369,6 +1368,7 @@ type ReplicationPool struct { existingReplicaCh chan ReplicateObjectInfo existingReplicaDeleteCh chan DeletedObjectReplicationInfo mrfSaveCh chan MRFReplicateEntry + saveStateCh chan struct{} workerSize int mrfWorkerSize int @@ -1391,6 +1391,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000), resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)}, mrfSaveCh: make(chan MRFReplicateEntry, 100000), + saveStateCh: make(chan struct{}, 1), ctx: ctx, objLayer: o, } @@ -1401,6 +1402,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool go pool.updateResyncStatus(ctx, o) go pool.processMRF() go pool.persistMRF() + go pool.saveStatsToDisk() return pool } @@ -1526,10 +1528,12 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { close(p.replicaCh) 
close(p.mrfReplicaCh) close(p.existingReplicaCh) + close(p.saveStateCh) }) case healCh <- ri: case ch <- ri: default: + globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem)) } } @@ -1567,6 +1571,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf }) case ch <- doi: default: + globalReplicationPool.queueMRFSave(doi.ToMRFEntry()) logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing number of replicate workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem)) } } @@ -1886,100 +1891,6 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim return } -func getAllLatestReplicationStats(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) { - peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext) - bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage)) - - for bucket, u := range bucketsUsage { - bucketStats := make([]BucketStats, len(peerBucketStatsList)) - for i, peerBucketStats := range peerBucketStatsList { - bucketStat, ok := peerBucketStats[bucket] - if !ok { - continue - } - bucketStats[i] = bucketStat - } - bucketsReplicationStats[bucket] = calculateBucketReplicationStats(bucket, u, bucketStats) - } - - return bucketsReplicationStats -} - -func calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) { - // accumulate cluster bucket stats - stats := make(map[string]*BucketReplicationStat) - var totReplicaSize int64 - for _, bucketStat := range bucketStats { - totReplicaSize += bucketStat.ReplicationStats.ReplicaSize - for arn, stat := range bucketStat.ReplicationStats.Stats { - oldst := stats[arn] - if oldst == nil { - oldst = &BucketReplicationStat{} - } - stats[arn] = &BucketReplicationStat{ - FailedCount: stat.FailedCount + oldst.FailedCount, - FailedSize: stat.FailedSize + oldst.FailedSize, - ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, - Latency: stat.Latency.merge(oldst.Latency), - } - } - } - - // add initial usage stat to cluster stats - usageStat := globalReplicationStats.GetInitialUsage(bucket) - totReplicaSize += usageStat.ReplicaSize - for arn, stat := range usageStat.Stats { - st, ok := stats[arn] - if !ok { - st = &BucketReplicationStat{} - stats[arn] = st - } - st.ReplicatedSize += stat.ReplicatedSize - st.FailedSize += stat.FailedSize - st.FailedCount += stat.FailedCount - } - s = BucketReplicationStats{ - Stats: make(map[string]*BucketReplicationStat, len(stats)), - } - var latestTotReplicatedSize int64 - for _, st := range u.ReplicationInfo { - latestTotReplicatedSize += int64(st.ReplicatedSize) - } - // normalize computed real time stats with latest usage stat - for arn, tgtstat := range stats { - st := BucketReplicationStat{} - bu, ok := u.ReplicationInfo[arn] - if !ok { - bu = BucketTargetUsageInfo{} - } - // use in memory replication stats if it is ahead of usage info. 
- st.ReplicatedSize = int64(bu.ReplicatedSize) - if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) { - st.ReplicatedSize = tgtstat.ReplicatedSize - } - s.ReplicatedSize += st.ReplicatedSize - // Reset FailedSize and FailedCount to 0 for negative overflows which can - // happen since data usage picture can lag behind actual usage state at the time of cluster start - st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) - st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) - st.Latency = tgtstat.Latency - - s.Stats[arn] = &st - s.FailedSize += st.FailedSize - s.FailedCount += st.FailedCount - } - // normalize overall stats - s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize))) - s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize))) - return s -} - -// get the most current of in-memory replication stats and data usage info from crawler. -func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { - bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) - return calculateBucketReplicationStats(bucket, u, bucketStats) -} - const resyncTimeInterval = time.Minute * 1 // updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals @@ -2520,7 +2431,6 @@ func (p *ReplicationPool) persistMRF() { logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem)) } entries = make(map[string]MRFReplicateEntry) - return } for { select { @@ -2531,6 +2441,9 @@ func (p *ReplicationPool) persistMRF() { close(p.mrfSaveCh) saveMRFToDisk(true) return + case <-p.saveStateCh: + saveMRFToDisk(true) + return case e, ok := <-p.mrfSaveCh: if !ok { return @@ -2683,3 +2596,91 @@ func (p *ReplicationPool) queueMRFHeal(file string) error { } return nil } + +// load replication stats from disk +func (p *ReplicationPool) loadStatsFromDisk() (rs map[string]BucketReplicationStats, e error) { + data, err := readConfig(p.ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName)) + if err != nil { + if !errors.Is(err, errConfigNotFound) { + return rs, nil + } + return rs, err + } + + if len(data) <= 4 { + logger.LogIf(p.ctx, fmt.Errorf("replication stats: no data")) + return map[string]BucketReplicationStats{}, nil + } + // Read repl stats meta header + switch binary.LittleEndian.Uint16(data[0:2]) { + case replStatsMetaFormat: + default: + return rs, fmt.Errorf("replication stats: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case replStatsVersion: + default: + return rs, fmt.Errorf("replication stats: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + ss := BucketStatsMap{} + if _, err = ss.UnmarshalMsg(data[4:]); err != nil { + return rs, err + } + rs = make(map[string]BucketReplicationStats, len(ss.Stats)) + for bucket, st := range ss.Stats { + rs[bucket] = st.ReplicationStats + } + + return rs, nil +} + +func (p *ReplicationPool) saveStatsToDisk() { + if p == nil || p.objLayer == nil { + return + } + sTimer := time.NewTimer(replStatsSaveInterval) + defer sTimer.Stop() + for { + select { + case <-sTimer.C: + dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) + if err == nil && !dui.LastUpdate.IsZero() { + globalReplicationStats.getAllLatest(dui.BucketsUsage) + } + p.saveStats(p.ctx) + sTimer.Reset(replStatsSaveInterval) + case <-p.ctx.Done(): + return + } + } +} + +// save 
replication stats to .minio.sys/buckets/replication/node-name.stats +func (p *ReplicationPool) saveStats(ctx context.Context) error { + bsm := globalReplicationStats.getAllCachedLatest() + if len(bsm.Stats) == 0 { + return nil + } + data := make([]byte, 4, 4+bsm.Msgsize()) + // Add the replication stats meta header. + binary.LittleEndian.PutUint16(data[0:2], replStatsMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], replStatsVersion) + // Add data + data, err := bsm.MarshalMsg(data) + if err != nil { + return err + } + return saveConfig(ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName), data) +} + +// SaveState saves replication stats and mrf data before server restart +func (p *ReplicationPool) SaveState(ctx context.Context) error { + go func() { + select { + case p.saveStateCh <- struct{}{}: + case <-p.ctx.Done(): + return + } + }() + return p.saveStats(ctx) +} diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index acaf87885..918861e7f 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -18,6 +18,7 @@ package cmd import ( + "fmt" "time" ) @@ -52,7 +53,10 @@ func (rl *ReplicationLatency) update(size int64, duration time.Duration) { } // BucketStatsMap captures bucket statistics for all buckets -type BucketStatsMap map[string]BucketStats +type BucketStatsMap struct { + Stats map[string]BucketStats + Timestamp time.Time +} // BucketStats bucket statistics type BucketStats struct { @@ -126,3 +130,11 @@ func (bs *BucketReplicationStat) hasReplicationUsage() bool { bs.PendingCount > 0 || bs.PendingSize > 0 } + +func (brs BucketReplicationStats) String() string { + s := "ReplicatedSize=" + fmt.Sprintf("%d", brs.ReplicatedSize) + "+\n ReplicaSize=" + fmt.Sprintf("%d", brs.ReplicaSize) + for arn, st := range brs.Stats { + s += "\n arn: " + arn + " ReplicatedSize=" + fmt.Sprintf("%d", st.ReplicatedSize) + " +::ReplicaSize=" + fmt.Sprintf("%d", st.ReplicaSize) + } + return s +} diff --git a/cmd/bucket-stats_gen.go b/cmd/bucket-stats_gen.go index 23ccc3fdc..1a16d6a7a 100644 --- a/cmd/bucket-stats_gen.go +++ b/cmd/bucket-stats_gen.go @@ -794,74 +794,109 @@ func (z *BucketStats) Msgsize() (s int) { // DecodeMsg implements msgp.Decodable func (z *BucketStatsMap) DecodeMsg(dc *msgp.Reader) (err error) { - var zb0003 uint32 - zb0003, err = dc.ReadMapHeader() + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() if err != nil { err = msgp.WrapError(err) return } - if (*z) == nil { - (*z) = make(BucketStatsMap, zb0003) - } else if len((*z)) > 0 { - for key := range *z { - delete((*z), key) - } - } - for zb0003 > 0 { - zb0003-- - var zb0001 string - var zb0002 BucketStats - zb0001, err = dc.ReadString() + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() if err != nil { err = msgp.WrapError(err) return } - var field []byte - _ = field - var zb0004 uint32 - zb0004, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err, zb0001) - return - } - for zb0004 > 0 { - zb0004-- - field, err = dc.ReadMapKeyPtr() + switch msgp.UnsafeString(field) { + case "Stats": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() if err != nil { - err = msgp.WrapError(err, zb0001) + err = msgp.WrapError(err, "Stats") return } - switch msgp.UnsafeString(field) { - case "ReplicationStats": - err = zb0002.ReplicationStats.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, zb0001, "ReplicationStats") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err, zb0001) - return + if z.Stats == nil { + z.Stats 
= make(map[string]BucketStats, zb0002) + } else if len(z.Stats) > 0 { + for key := range z.Stats { + delete(z.Stats, key) } } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 BucketStats + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + for zb0003 > 0 { + zb0003-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + switch msgp.UnsafeString(field) { + case "ReplicationStats": + err = za0002.ReplicationStats.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + } + z.Stats[za0001] = za0002 + } + case "Timestamp": + z.Timestamp, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "Timestamp") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } } - (*z)[zb0001] = zb0002 } return } // EncodeMsg implements msgp.Encodable -func (z BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) { - err = en.WriteMapHeader(uint32(len(z))) +func (z *BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Stats" + err = en.Append(0x82, 0xa5, 0x53, 0x74, 0x61, 0x74, 0x73) if err != nil { - err = msgp.WrapError(err) return } - for zb0005, zb0006 := range z { - err = en.WriteString(zb0005) + err = en.WriteMapHeader(uint32(len(z.Stats))) + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + for za0001, za0002 := range z.Stats { + err = en.WriteString(za0001) if err != nil { - err = msgp.WrapError(err) + err = msgp.WrapError(err, "Stats") return } // map header, size 1 @@ -870,102 +905,148 @@ func (z BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - err = zb0006.ReplicationStats.EncodeMsg(en) + err = za0002.ReplicationStats.EncodeMsg(en) if err != nil { - err = msgp.WrapError(err, zb0005, "ReplicationStats") + err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats") return } } + // write "Timestamp" + err = en.Append(0xa9, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70) + if err != nil { + return + } + err = en.WriteTime(z.Timestamp) + if err != nil { + err = msgp.WrapError(err, "Timestamp") + return + } return } // MarshalMsg implements msgp.Marshaler -func (z BucketStatsMap) MarshalMsg(b []byte) (o []byte, err error) { +func (z *BucketStatsMap) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - o = msgp.AppendMapHeader(o, uint32(len(z))) - for zb0005, zb0006 := range z { - o = msgp.AppendString(o, zb0005) + // map header, size 2 + // string "Stats" + o = append(o, 0x82, 0xa5, 0x53, 0x74, 0x61, 0x74, 0x73) + o = msgp.AppendMapHeader(o, uint32(len(z.Stats))) + for za0001, za0002 := range z.Stats { + o = msgp.AppendString(o, za0001) // map header, size 1 // string "ReplicationStats" o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73) - o, err = zb0006.ReplicationStats.MarshalMsg(o) + o, err = za0002.ReplicationStats.MarshalMsg(o) if err != nil { - err = msgp.WrapError(err, zb0005, "ReplicationStats") + err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats") return } } + // string "Timestamp" + o = append(o, 0xa9, 0x54, 0x69, 0x6d, 0x65, 
0x73, 0x74, 0x61, 0x6d, 0x70) + o = msgp.AppendTime(o, z.Timestamp) return } // UnmarshalMsg implements msgp.Unmarshaler func (z *BucketStatsMap) UnmarshalMsg(bts []byte) (o []byte, err error) { - var zb0003 uint32 - zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { err = msgp.WrapError(err) return } - if (*z) == nil { - (*z) = make(BucketStatsMap, zb0003) - } else if len((*z)) > 0 { - for key := range *z { - delete((*z), key) - } - } - for zb0003 > 0 { - var zb0001 string - var zb0002 BucketStats - zb0003-- - zb0001, bts, err = msgp.ReadStringBytes(bts) + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { err = msgp.WrapError(err) return } - var field []byte - _ = field - var zb0004 uint32 - zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, zb0001) - return - } - for zb0004 > 0 { - zb0004-- - field, bts, err = msgp.ReadMapKeyZC(bts) + switch msgp.UnsafeString(field) { + case "Stats": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, zb0001) + err = msgp.WrapError(err, "Stats") return } - switch msgp.UnsafeString(field) { - case "ReplicationStats": - bts, err = zb0002.ReplicationStats.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, zb0001, "ReplicationStats") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err, zb0001) - return + if z.Stats == nil { + z.Stats = make(map[string]BucketStats, zb0002) + } else if len(z.Stats) > 0 { + for key := range z.Stats { + delete(z.Stats, key) } } + for zb0002 > 0 { + var za0001 string + var za0002 BucketStats + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + for zb0003 > 0 { + zb0003-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + switch msgp.UnsafeString(field) { + case "ReplicationStats": + bts, err = za0002.ReplicationStats.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001, "ReplicationStats") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + } + z.Stats[za0001] = za0002 + } + case "Timestamp": + z.Timestamp, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Timestamp") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } } - (*z)[zb0001] = zb0002 } o = bts return } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z BucketStatsMap) Msgsize() (s int) { - s = msgp.MapHeaderSize - if z != nil { - for zb0005, zb0006 := range z { - _ = zb0006 - s += msgp.StringPrefixSize + len(zb0005) + 1 + 17 + zb0006.ReplicationStats.Msgsize() +func (z *BucketStatsMap) Msgsize() (s int) { + s = 1 + 6 + msgp.MapHeaderSize + if z.Stats != nil { + for za0001, za0002 := range z.Stats { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + 1 + 17 + za0002.ReplicationStats.Msgsize() } } + s += 10 + msgp.TimeSize return } diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 5449c08fd..66d7dede3 100644 
--- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -1661,7 +1661,7 @@ func getBucketUsageMetrics() *MetricsGroup { Value: float64(time.Since(dataUsageInfo.LastUpdate)), }) - bucketReplStats := getAllLatestReplicationStats(dataUsageInfo.BucketsUsage) + bucketReplStats := globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage) for bucket, usage := range dataUsageInfo.BucketsUsage { stats := bucketReplStats[bucket] diff --git a/cmd/metrics.go b/cmd/metrics.go index 6ef78625c..4e7423878 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -458,7 +458,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { } for bucket, usageInfo := range dataUsageInfo.BucketsUsage { - stat := getLatestReplicationStats(bucket, usageInfo) + stat := globalReplicationStats.getLatestReplicationStats(bucket, usageInfo) // Total space used by bucket ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( diff --git a/cmd/notification.go b/cmd/notification.go index 102ce15cf..2474d5934 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -569,14 +569,17 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck } replicationStatsList := globalReplicationStats.GetAll() - bucketStatsMap := make(map[string]BucketStats, len(replicationStatsList)) + bucketStatsMap := BucketStatsMap{ + Stats: make(map[string]BucketStats, len(replicationStatsList)), + Timestamp: UTCNow(), + } for k, replicationStats := range replicationStatsList { - bucketStatsMap[k] = BucketStats{ + bucketStatsMap.Stats[k] = BucketStats{ ReplicationStats: replicationStats, } } - replicationStats = append(replicationStats, BucketStatsMap(bucketStatsMap)) + replicationStats = append(replicationStats, bucketStatsMap) return replicationStats } diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index a668734db..03e91e04f 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -263,7 +263,7 @@ func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) { values := make(url.Values) respBody, err := client.call(peerRESTMethodGetAllBucketStats, values, nil, -1) if err != nil { - return nil, err + return BucketStatsMap{}, err } bsMap := BucketStatsMap{} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index bcfec96e1..f0ba0c89b 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - peerRESTVersion = "v26" // Add user-type to LoadPolicyMapping + peerRESTVersion = "v27" // change in GetAllBucketStats response. 
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index f7ffec67e..27764865b 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -549,8 +549,7 @@ func (s *peerRESTServer) GetAllBucketStatsHandler(w http.ResponseWriter, r *http ReplicationStats: v, } } - - logger.LogIf(r.Context(), msgp.Encode(w, BucketStatsMap(bucketStatsMap))) + logger.LogIf(r.Context(), msgp.Encode(w, &BucketStatsMap{Stats: bucketStatsMap, Timestamp: UTCNow()})) } // GetBucketStatsHandler - fetches current in-memory bucket stats, currently only diff --git a/cmd/signals.go b/cmd/signals.go index 9fbb42814..14beb900c 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -78,9 +78,11 @@ func handleSignals() { case <-globalHTTPServerErrorCh: exit(stopProcess()) case osSignal := <-globalOSSignalCh: + globalReplicationPool.SaveState(context.Background()) logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String())) exit(stopProcess()) case signal := <-globalServiceSignalCh: + globalReplicationPool.SaveState(context.Background()) switch signal { case serviceRestart: logger.Info("Restarting on service signal") diff --git a/internal/bucket/replication/replication.go b/internal/bucket/replication/replication.go index 206dc1741..673a851a7 100644 --- a/internal/bucket/replication/replication.go +++ b/internal/bucket/replication/replication.go @@ -138,6 +138,15 @@ func (t Type) Valid() bool { return t > 0 } +// IsDataReplication returns true if content being replicated +func (t Type) IsDataReplication() bool { + switch t { + case ObjectReplicationType, HealReplicationType, ExistingObjectReplicationType: + return true + } + return false +} + // ObjectOpts provides information to deduce whether replication // can be triggered on the resultant object. type ObjectOpts struct {
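
For readers of this patch, the two mechanisms described in the commit message can be summarized outside the diff. The stats file that (*ReplicationPool).saveStats writes to .minio.sys/buckets/replication/<node-name>.stats is a 4-byte little-endian header (format identifier, then version) followed by the msgp-encoded BucketStatsMap, and loadStatsFromDisk validates that same header before unmarshaling. The second mechanism is the non-blocking handoff added to queueReplicaTask/queueReplicaDeleteTask: when no worker channel can accept a task immediately, the entry is pushed to the MRF queue via queueMRFSave instead of being dropped. The sketch below is an editor's illustration of both patterns under those assumptions; encodeReplStatsFile, decodeReplStatsFile, queueWithMRFFallback and replTask are hypothetical names, not symbols from the patch.

// replication_sketch.go -- editor's illustration only, not MinIO code.
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Header constants mirror replStatsMetaFormat/replStatsVersion in the patch.
const (
	statsMetaFormat = 1
	statsVersion    = 1
)

// encodeReplStatsFile prepends the 4-byte little-endian header (format, version)
// that saveStats writes ahead of the msgp-encoded BucketStatsMap payload.
func encodeReplStatsFile(msgpPayload []byte) []byte {
	data := make([]byte, 4, 4+len(msgpPayload))
	binary.LittleEndian.PutUint16(data[0:2], statsMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], statsVersion)
	return append(data, msgpPayload...)
}

// decodeReplStatsFile checks the header the way loadStatsFromDisk does and
// returns the msgp payload for unmarshaling.
func decodeReplStatsFile(data []byte) ([]byte, error) {
	if len(data) <= 4 {
		return nil, errors.New("replication stats: no data")
	}
	if f := binary.LittleEndian.Uint16(data[0:2]); f != statsMetaFormat {
		return nil, fmt.Errorf("replication stats: unknown format: %d", f)
	}
	if v := binary.LittleEndian.Uint16(data[2:4]); v != statsVersion {
		return nil, fmt.Errorf("replication stats: unknown version: %d", v)
	}
	return data[4:], nil
}

// replTask stands in for ReplicateObjectInfo / DeletedObjectReplicationInfo.
type replTask struct{ object string }

// queueWithMRFFallback shows the non-blocking handoff the patch adds in the
// default branch of queueReplicaTask/queueReplicaDeleteTask: if no worker can
// accept the task right away, it is recorded in the MRF queue so the object
// does not stay in PENDING until the next scanner pass.
func queueWithMRFFallback(workers, mrf chan<- replTask, t replTask) {
	select {
	case workers <- t: // a replication worker is free
	default:
		select {
		case mrf <- t: // all workers busy: hand off for MRF-driven healing
		default: // MRF queue full as well; caller may log a warning
		}
	}
}

func main() {
	framed := encodeReplStatsFile([]byte("msgp-encoded BucketStatsMap"))
	payload, err := decodeReplStatsFile(framed)
	fmt.Printf("payload=%q err=%v\n", payload, err)

	workers := make(chan replTask, 1)
	mrf := make(chan replTask, 4)
	queueWithMRFFallback(workers, mrf, replTask{"bucket/a"}) // taken by a worker
	queueWithMRFFallback(workers, mrf, replTask{"bucket/b"}) // workers full, goes to MRF
	fmt.Println("workers:", len(workers), "mrf:", len(mrf))
}

In the patch itself, the persisted snapshot is refreshed every replStatsSaveInterval (five minutes) by saveStatsToDisk and flushed once more from SaveState when the process receives a shutdown or restart signal, which is what keeps reported replication totals accurate across restarts without depending on scanner-calculated metrics.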