// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/minio/minio/internal/bucket/replication"
	"github.com/rcrowley/go-metrics"
)

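// hasReplicationUsage returns true if replication activity has been recorded
// for any replication target (ARN) of this bucket.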
func (b *BucketReplicationStats) hasReplicationUsage() bool {
	for _, s := range b.Stats {
		if s.hasReplicationUsage() {
			return true
		}
	}
	return false
}

// ReplicationStats holds the global in-memory replication stats
type ReplicationStats struct {
	// map of site deployment ID to site replication status
	// for site replication - maintain stats at global level
	srStats *SRStats
	// active worker stats
	workers *ActiveWorkerStat
	// queue stats cache
	qCache queueCache
	// proxy stats cache
	pCache proxyStatsCache
	// mrf backlog stats
	mrfStats ReplicationMRFStats
	// for bucket replication, continue to use existing cache
	Cache             map[string]*BucketReplicationStats
	mostRecentStats   BucketStatsMap
	registry          metrics.Registry
	sync.RWMutex                 // mutex for Cache
	mostRecentStatsMu sync.Mutex // mutex for mostRecentStats

	wlock sync.RWMutex // mutex for active workers

	movingAvgTicker *time.Ticker // Ticker for calculating moving averages
	wTimer          *time.Ticker // ticker for calculating active workers
	qTimer          *time.Ticker // ticker for calculating queue stats
}

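// trackEWMA refreshes the transfer-rate moving averages on every tick of
// movingAvgTicker until the global context is canceled.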
func (r *ReplicationStats) trackEWMA() {
	for {
		select {
		case <-r.movingAvgTicker.C:
			r.updateMovingAvg()
		case <-GlobalContext.Done():
			return
		}
	}
}

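// updateMovingAvg updates the exponential moving averages of the large- and
// small-object transfer rates for every cached bucket target.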
func (r *ReplicationStats) updateMovingAvg() {
	r.RLock()
	for _, s := range r.Cache {
		for _, st := range s.Stats {
			st.XferRateLrg.measure.updateExponentialMovingAverage(time.Now())
			st.XferRateSml.measure.updateExponentialMovingAverage(time.Now())
		}
	}
	r.RUnlock()
}

// ActiveWorkers returns a snapshot of the current, maximum and average
// active replication worker counts.
func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat {
	if r == nil {
		return ActiveWorkerStat{}
	}
	r.wlock.RLock()
	defer r.wlock.RUnlock()
	w := r.workers.get()
	return ActiveWorkerStat{
		Curr: w.Curr,
		Max:  w.Max,
		Avg:  w.Avg,
	}
}

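// collectWorkerMetrics samples the active worker counts on every tick of
// wTimer until the context is canceled.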
func (r *ReplicationStats) collectWorkerMetrics(ctx context.Context) {
	if r == nil {
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-r.wTimer.C:
			r.wlock.Lock()
			r.workers.update()
			r.wlock.Unlock()
		}
	}
}

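// collectQueueMetrics refreshes the queue stats cache on every tick of
// qTimer until the context is canceled.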
func (r *ReplicationStats) collectQueueMetrics(ctx context.Context) {
	if r == nil {
		return
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-r.qTimer.C:
			r.qCache.update()
		}
	}
}

// Delete deletes in-memory replication statistics for a bucket.
func (r *ReplicationStats) Delete(bucket string) {
	if r == nil {
		return
	}

	r.Lock()
	defer r.Unlock()
	delete(r.Cache, bucket)
}

// UpdateReplicaStat updates in-memory replica statistics with new values.
func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
	if r == nil {
		return
	}

	r.Lock()
	defer r.Unlock()
	bs, ok := r.Cache[bucket]
	if !ok {
		bs = newBucketReplicationStats()
	}
	bs.ReplicaSize += n
	bs.ReplicaCount++
	r.Cache[bucket] = bs
	r.srUpdateReplicaStat(n)
}

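// srUpdateReplicaStat atomically bumps the site-replication replica size
// and count.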
func (r *ReplicationStats) srUpdateReplicaStat(sz int64) {
	if r == nil {
		return
	}
	atomic.AddInt64(&r.srStats.ReplicaSize, sz)
	atomic.AddInt64(&r.srStats.ReplicaCount, 1)
}

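// srUpdate attributes a replication event to the deployment ID resolved from
// its target endpoint; events whose endpoint does not map to a known site
// are dropped.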
func (r *ReplicationStats) srUpdate(sr replStat) {
	dID, err := globalSiteReplicationSys.getDeplIDForEndpoint(sr.endpoint())
	if err == nil {
		r.srStats.update(sr, dID)
	}
}

// Update updates in-memory replication statistics with new values.
func (r *ReplicationStats) Update(bucket string, ri replicatedTargetInfo, status, prevStatus replication.StatusType) {
	if r == nil {
		return
	}
	var rs replStat
	switch status {
	case replication.Pending:
		if ri.OpType.IsDataReplication() && prevStatus != status {
			rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
		}
	case replication.Completed:
		if ri.OpType.IsDataReplication() {
			rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
		}
	case replication.Failed:
		if ri.OpType.IsDataReplication() && prevStatus == replication.Pending {
			rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err)
		}
	case replication.Replica:
		if ri.OpType == replication.ObjectReplicationType {
			rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, "", false, ri.Err)
		}
	}

	// update site-replication in-memory stats
	if rs.Completed || rs.Failed {
		r.srUpdate(rs)
	}

	r.Lock()
	defer r.Unlock()

	// update bucket replication in-memory stats
	bs, ok := r.Cache[bucket]
	if !ok {
		bs = newBucketReplicationStats()
		r.Cache[bucket] = bs
	}
	b, ok := bs.Stats[ri.Arn]
	if !ok {
		b = &BucketReplicationStat{
			XferRateLrg: newXferStats(),
			XferRateSml: newXferStats(),
		}
		bs.Stats[ri.Arn] = b
	}

	switch {
	case rs.Completed:
		b.ReplicatedSize += rs.TransferSize
		b.ReplicatedCount++
		if rs.TransferDuration > 0 {
			b.Latency.update(rs.TransferSize, rs.TransferDuration)
			b.updateXferRate(rs.TransferSize, rs.TransferDuration)
		}
	case rs.Failed:
		b.FailStats.addsize(rs.TransferSize, rs.Err)
	case rs.Pending:
		// no counters to update here; in-queue totals are tracked
		// separately via incQ/decQ.
	}
}

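// replStat captures a single replication event: the target ARN, the outcome
// flags, the transfer size and duration, and the target endpoint.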
type replStat struct {
	Arn       string
	Completed bool
	Pending   bool
	Failed    bool
	opType    replication.Type
	// transfer size
	TransferSize int64
	// transfer duration
	TransferDuration time.Duration
	Endpoint         string
	Secure           bool
	Err              error
}

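// endpoint returns the target endpoint URL with the appropriate scheme.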
func (rs *replStat) endpoint() string {
	scheme := "http"
	if rs.Secure {
		scheme = "https"
	}
	return scheme + "://" + rs.Endpoint
}

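// set fills in a replStat from a replication event, marking the flag that
// corresponds to the given status.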
func (rs *replStat) set(arn string, n int64, duration time.Duration, status replication.StatusType, opType replication.Type, endpoint string, secure bool, err error) {
	rs.Endpoint = endpoint
	rs.Secure = secure
	rs.TransferSize = n
	rs.Arn = arn
	rs.TransferDuration = duration
	rs.opType = opType
	switch status {
	case replication.Completed:
		rs.Completed = true
	case replication.Pending:
		rs.Pending = true
	case replication.Failed:
		rs.Failed = true
		rs.Err = err
	}
}

// GetAll returns replication metrics for all buckets at once.
func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
	if r == nil {
		return map[string]BucketReplicationStats{}
	}

	r.RLock()

	bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache))
	for k, v := range r.Cache {
		bucketReplicationStats[k] = v.Clone()
	}
	r.RUnlock()
	for k, v := range bucketReplicationStats {
		v.QStat = r.qCache.getBucketStats(k)
		bucketReplicationStats[k] = v
	}

	return bucketReplicationStats
}

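// getSRMetricsForNode returns a summary of the site-replication metrics
// collected on this node since it came up.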
func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary {
	if r == nil {
		return SRMetricsSummary{}
	}

	m := SRMetricsSummary{
		Uptime:        UTCNow().Unix() - globalBootTime.Unix(),
		Queued:        r.qCache.getSiteStats(),
		ActiveWorkers: r.ActiveWorkers(),
		Metrics:       r.srStats.get(),
		Proxied:       r.pCache.getSiteStats(),
		ReplicaSize:   atomic.LoadInt64(&r.srStats.ReplicaSize),
		ReplicaCount:  atomic.LoadInt64(&r.srStats.ReplicaCount),
	}
	return m
}

// Get returns replication metrics for a bucket from this node since it came up.
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
	if r == nil {
		return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
	}

	r.RLock()
	defer r.RUnlock()

	st, ok := r.Cache[bucket]
	if !ok {
		return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
	}
	return st.Clone()
}

// NewReplicationStats initializes in-memory replication statistics
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
	r := metrics.NewRegistry()
	rs := ReplicationStats{
		Cache:           make(map[string]*BucketReplicationStats),
		qCache:          newQueueCache(r),
		pCache:          newProxyStatsCache(),
		srStats:         newSRStats(),
		movingAvgTicker: time.NewTicker(2 * time.Second),
		wTimer:          time.NewTicker(2 * time.Second),
		qTimer:          time.NewTicker(2 * time.Second),

		workers:  newActiveWorkerStat(r),
		registry: r,
	}
	go rs.collectWorkerMetrics(ctx)
	go rs.collectQueueMetrics(ctx)
	return &rs
}

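// getAllLatest returns cluster-wide replication stats for every bucket in
// bucketsUsage, aggregated from all peer nodes.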
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) {
	if r == nil {
		return nil
	}
	peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
	bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage))

	for bucket := range bucketsUsage {
		bucketStats := make([]BucketStats, len(peerBucketStatsList))
		for i, peerBucketStats := range peerBucketStatsList {
			bucketStat, ok := peerBucketStats.Stats[bucket]
			if !ok {
				continue
			}
			bucketStats[i] = bucketStat
		}
		bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, bucketStats)
	}
	return bucketsReplicationStats
}

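// calculateBucketReplicationStats merges per-node bucket stats into a single
// cluster-wide view and records it as the most recent snapshot.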
func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucketStats []BucketStats) (bs BucketStats) {
	if r == nil {
		bs = BucketStats{
			ReplicationStats: BucketReplicationStats{
				Stats: make(map[string]*BucketReplicationStat),
			},
			QueueStats: ReplicationQueueStats{},
			ProxyStats: ProxyMetric{},
		}
		return bs
	}
	var s BucketReplicationStats
	// accumulate cluster bucket stats
	stats := make(map[string]*BucketReplicationStat)
	var (
		totReplicaSize, totReplicatedSize   int64
		totReplicaCount, totReplicatedCount int64
		totFailed                           RTimedMetrics
		tq                                  InQueueMetric
	)
	for _, bucketStat := range bucketStats {
		totReplicaSize += bucketStat.ReplicationStats.ReplicaSize
		totReplicaCount += bucketStat.ReplicationStats.ReplicaCount
		for _, q := range bucketStat.QueueStats.Nodes {
			tq = tq.merge(q.QStats)
		}

		for arn, stat := range bucketStat.ReplicationStats.Stats {
			oldst := stats[arn]
			if oldst == nil {
				oldst = &BucketReplicationStat{
					XferRateLrg: newXferStats(),
					XferRateSml: newXferStats(),
				}
			}
			fstats := stat.FailStats.merge(oldst.FailStats)
			lrg := oldst.XferRateLrg.merge(*stat.XferRateLrg)
			sml := oldst.XferRateSml.merge(*stat.XferRateSml)
			stats[arn] = &BucketReplicationStat{
				Failed:          fstats.toMetric(),
				FailStats:       fstats,
				ReplicatedSize:  stat.ReplicatedSize + oldst.ReplicatedSize,
				ReplicatedCount: stat.ReplicatedCount + oldst.ReplicatedCount,
				Latency:         stat.Latency.merge(oldst.Latency),
				XferRateLrg:     &lrg,
				XferRateSml:     &sml,
			}
			totReplicatedSize += stat.ReplicatedSize
			totReplicatedCount += stat.ReplicatedCount
			totFailed = totFailed.merge(stat.FailStats)
		}
	}

	s = BucketReplicationStats{
		Stats:           stats,
		QStat:           tq,
		ReplicaSize:     totReplicaSize,
		ReplicaCount:    totReplicaCount,
		ReplicatedSize:  totReplicatedSize,
		ReplicatedCount: totReplicatedCount,
		Failed:          totFailed.toMetric(),
	}

	var qs ReplicationQueueStats
	for _, bs := range bucketStats {
		qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...)
	}
	qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()

	var ps ProxyMetric
	for _, bs := range bucketStats {
		ps.add(bs.ProxyStats)
	}
	bs = BucketStats{
		ReplicationStats: s,
		QueueStats:       qs,
		ProxyStats:       ps,
	}
	r.mostRecentStatsMu.Lock()
	if len(r.mostRecentStats.Stats) == 0 {
		r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
	}
	if len(bs.ReplicationStats.Stats) > 0 {
		r.mostRecentStats.Stats[bucket] = bs
	}
	r.mostRecentStats.Timestamp = UTCNow()
	r.mostRecentStatsMu.Unlock()
	return bs
}

// getLatestReplicationStats returns the most current in-memory replication
// stats for a bucket, aggregated across all nodes in the cluster.
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) {
	if r == nil {
		return s
	}
	bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
	return r.calculateBucketReplicationStats(bucket, bucketStats)
}

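// incQ increments the in-queue replication counters (bytes and count) for a
// bucket and for the site-replication totals.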
func (r *ReplicationStats) incQ(bucket string, sz int64, isDeleteRepl bool, opType replication.Type) {
	r.qCache.Lock()
	defer r.qCache.Unlock()
	v, ok := r.qCache.bucketStats[bucket]
	if !ok {
		v = newInQueueStats(r.registry, bucket)
	}
	atomic.AddInt64(&v.nowBytes, sz)
	atomic.AddInt64(&v.nowCount, 1)
	r.qCache.bucketStats[bucket] = v
	atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, sz)
	atomic.AddInt64(&r.qCache.srQueueStats.nowCount, 1)
}

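// decQ decrements the in-queue replication counters (bytes and count) for a
// bucket and for the site-replication totals.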
func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opType replication.Type) {
	r.qCache.Lock()
	defer r.qCache.Unlock()
	v, ok := r.qCache.bucketStats[bucket]
	if !ok {
		v = newInQueueStats(r.registry, bucket)
	}
	atomic.AddInt64(&v.nowBytes, -1*sz)
	atomic.AddInt64(&v.nowCount, -1)
	r.qCache.bucketStats[bucket] = v

	atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz)
	atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1)
}

// incProxy increments proxy metrics for proxied calls
func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) {
	if r != nil {
		r.pCache.inc(bucket, api, isErr)
	}
}

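// getProxyStats returns the proxied-request metrics recorded for a bucket.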
func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric {
	if r == nil {
		return ProxyMetric{}
	}
	return r.pCache.getBucketStats(bucket)
}