mirror of
				https://github.com/minio/minio.git
				synced 2025-10-29 15:55:00 -04:00 
			
		
		
		
	Fixes to replication metrics (#13493)
For reporting ReplicaSize and loading initial replication metrics correctly.
This commit is contained in:
		
							parent
							
								
									52c5f6e152
								
							
						
					
					
						commit
						e7f559c582
					
				| @ -21,6 +21,7 @@ import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/minio/minio/internal/bucket/replication" | ||||
| ) | ||||
| @ -67,7 +68,7 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) { | ||||
| 	if !ok { | ||||
| 		bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} | ||||
| 	} | ||||
| 	atomic.StoreInt64(&bs.ReplicaSize, n) | ||||
| 	atomic.AddInt64(&bs.ReplicaSize, n) | ||||
| 	r.Cache[bucket] = bs | ||||
| } | ||||
| 
 | ||||
| @ -122,44 +123,13 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats | ||||
| 	if r == nil { | ||||
| 		return BucketReplicationStats{} | ||||
| 	} | ||||
| 
 | ||||
| 	r.ulock.RLock() | ||||
| 
 | ||||
| 	brs := BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} | ||||
| 
 | ||||
| 	defer r.ulock.RUnlock() | ||||
| 	st, ok := r.UsageCache[bucket] | ||||
| 	if ok { | ||||
| 		return st.Clone() | ||||
| 	} | ||||
| 	r.ulock.RUnlock() | ||||
| 
 | ||||
| 	dataUsageInfo, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) | ||||
| 	if err != nil { | ||||
| 		return brs | ||||
| 	} | ||||
| 	// data usage has not captured any data yet. | ||||
| 	if dataUsageInfo.LastUpdate.IsZero() { | ||||
| 		return brs | ||||
| 	} | ||||
| 	usage, ok := dataUsageInfo.BucketsUsage[bucket] | ||||
| 	if ok && usage.ReplicationInfo != nil { | ||||
| 		brs.ReplicaSize = int64(usage.ReplicaSize) | ||||
| 		for arn, uinfo := range usage.ReplicationInfo { | ||||
| 			brs.Stats[arn] = &BucketReplicationStat{ | ||||
| 				FailedSize:     int64(uinfo.ReplicationFailedSize), | ||||
| 				ReplicatedSize: int64(uinfo.ReplicatedSize), | ||||
| 				ReplicaSize:    int64(uinfo.ReplicaSize), | ||||
| 				FailedCount:    int64(uinfo.ReplicationFailedCount), | ||||
| 			} | ||||
| 		} | ||||
| 		if brs.hasReplicationUsage() { | ||||
| 			r.ulock.Lock() | ||||
| 			defer r.ulock.Unlock() | ||||
| 			r.UsageCache[bucket] = &brs | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 	return brs | ||||
| 	return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} | ||||
| } | ||||
| 
 | ||||
| // Get replication metrics for a bucket from this node since this node came up. | ||||
| @ -180,38 +150,51 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats { | ||||
| 
 | ||||
| // NewReplicationStats initialize in-memory replication statistics | ||||
| func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats { | ||||
| 	st := &ReplicationStats{ | ||||
| 	return &ReplicationStats{ | ||||
| 		Cache:      make(map[string]*BucketReplicationStats), | ||||
| 		UsageCache: make(map[string]*BucketReplicationStats), | ||||
| 	} | ||||
| 
 | ||||
| 	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) | ||||
| 	if err != nil { | ||||
| 		return st | ||||
| 	} | ||||
| 
 | ||||
| 	// data usage has not captured any data yet. | ||||
| 	if dataUsageInfo.LastUpdate.IsZero() { | ||||
| 		return st | ||||
| 	} | ||||
| 
 | ||||
| 	for bucket, usage := range dataUsageInfo.BucketsUsage { | ||||
| 		b := &BucketReplicationStats{ | ||||
| 			Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), | ||||
| 		} | ||||
| 		for arn, uinfo := range usage.ReplicationInfo { | ||||
| 			b.Stats[arn] = &BucketReplicationStat{ | ||||
| 				FailedSize:     int64(uinfo.ReplicationFailedSize), | ||||
| 				ReplicatedSize: int64(uinfo.ReplicatedSize), | ||||
| 				ReplicaSize:    int64(uinfo.ReplicaSize), | ||||
| 				FailedCount:    int64(uinfo.ReplicationFailedCount), | ||||
| 			} | ||||
| 		} | ||||
| 		b.ReplicaSize += int64(usage.ReplicaSize) | ||||
| 		if b.hasReplicationUsage() { | ||||
| 			st.UsageCache[bucket] = b | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return st | ||||
| } | ||||
| 
 | ||||
| // load replication metrics at cluster start from initial data usage | ||||
| func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) { | ||||
| 	rTimer := time.NewTimer(time.Minute * 1) | ||||
| 	defer rTimer.Stop() | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case <-rTimer.C: | ||||
| 			dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) | ||||
| 			if err != nil { | ||||
| 				continue | ||||
| 			} | ||||
| 			// data usage has not captured any data yet. | ||||
| 			if dui.LastUpdate.IsZero() { | ||||
| 				continue | ||||
| 			} | ||||
| 			m := make(map[string]*BucketReplicationStats) | ||||
| 			for bucket, usage := range dui.BucketsUsage { | ||||
| 				b := &BucketReplicationStats{ | ||||
| 					Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), | ||||
| 				} | ||||
| 				for arn, uinfo := range usage.ReplicationInfo { | ||||
| 					b.Stats[arn] = &BucketReplicationStat{ | ||||
| 						FailedSize:     int64(uinfo.ReplicationFailedSize), | ||||
| 						ReplicatedSize: int64(uinfo.ReplicatedSize), | ||||
| 						ReplicaSize:    int64(uinfo.ReplicaSize), | ||||
| 						FailedCount:    int64(uinfo.ReplicationFailedCount), | ||||
| 					} | ||||
| 				} | ||||
| 				b.ReplicaSize += int64(usage.ReplicaSize) | ||||
| 				if b.hasReplicationUsage() { | ||||
| 					m[bucket] = b | ||||
| 				} | ||||
| 			} | ||||
| 			r.ulock.Lock() | ||||
| 			defer r.ulock.Unlock() | ||||
| 			r.UsageCache = m | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -1497,6 +1497,7 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { | ||||
| 		FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(), | ||||
| 	}) | ||||
| 	globalReplicationStats = NewReplicationStats(ctx, objectAPI) | ||||
| 	go globalReplicationStats.loadInitialReplicationMetrics(ctx) | ||||
| } | ||||
| 
 | ||||
| // get Reader from replication target if active-active replication is in place and | ||||
|  | ||||
| @ -442,8 +442,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic | ||||
| 	for _, bucketStat := range bucketStats { | ||||
| 		totReplicaSize += bucketStat.ReplicationStats.ReplicaSize | ||||
| 		for arn, stat := range bucketStat.ReplicationStats.Stats { | ||||
| 			oldst, ok := stats[arn] | ||||
| 			if !ok { | ||||
| 			oldst := stats[arn] | ||||
| 			if oldst == nil { | ||||
| 				oldst = &BucketReplicationStat{} | ||||
| 			} | ||||
| 			stats[arn] = &BucketReplicationStat{ | ||||
| @ -459,8 +459,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic | ||||
| 	if usageStat.Stats != nil { | ||||
| 		totReplicaSize += usageStat.ReplicaSize | ||||
| 		for arn, stat := range usageStat.Stats { | ||||
| 			st, ok := stats[arn] | ||||
| 			if !ok { | ||||
| 			st := stats[arn] | ||||
| 			if st == nil { | ||||
| 				st = &BucketReplicationStat{ | ||||
| 					ReplicatedSize: stat.ReplicatedSize, | ||||
| 					FailedSize:     stat.FailedSize, | ||||
| @ -484,13 +484,13 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic | ||||
| 	// normalize computed real time stats with latest usage stat | ||||
| 	for arn, tgtstat := range stats { | ||||
| 		st := BucketReplicationStat{} | ||||
| 		bu, ok := usageStat.Stats[arn] | ||||
| 		bu, ok := u.ReplicationInfo[arn] | ||||
| 		if !ok { | ||||
| 			bu = &BucketReplicationStat{} | ||||
| 			bu = BucketTargetUsageInfo{} | ||||
| 		} | ||||
| 		// use in memory replication stats if it is ahead of usage info. | ||||
| 		st.ReplicatedSize = bu.ReplicatedSize | ||||
| 		if tgtstat.ReplicatedSize >= bu.ReplicatedSize { | ||||
| 		st.ReplicatedSize = int64(bu.ReplicatedSize) | ||||
| 		if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) { | ||||
| 			st.ReplicatedSize = tgtstat.ReplicatedSize | ||||
| 		} | ||||
| 		s.ReplicatedSize += st.ReplicatedSize | ||||
|  | ||||
| @ -3278,9 +3278,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite | ||||
| 	if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { | ||||
| 		scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) | ||||
| 	} | ||||
| 	if objInfo.ReplicationStatus == replication.Replica { | ||||
| 	if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { | ||||
| 		actualSize, _ := objInfo.GetActualSize() | ||||
| 		globalReplicationStats.UpdateReplicaStat(bucket, actualSize) | ||||
| 		defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) | ||||
| 	} | ||||
| 
 | ||||
| 	// Write success response. | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user