Fix uninitialized replication stats (#20260)

Services are unfrozen before `initBackgroundReplication` is finished. This means that 
the globalReplicationStats write is racy. Switch to an atomic pointer.

Provide the `ReplicationPool` with the stats, so it doesn't have to be grabbed 
from the atomic pointer on every use.

All other loads and checks are nil, and calls return empty values when stats 
still haven't been initialized.
This commit is contained in:
Klaus Post 2024-08-15 05:04:40 -07:00 committed by GitHub
parent 3b1aa40372
commit f1302c40fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 175 additions and 101 deletions

View File

@ -1682,7 +1682,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
}
globalNotificationSys.DeleteBucketMetadata(ctx, bucket)
globalReplicationPool.deleteResyncMetadata(ctx, bucket)
globalReplicationPool.Get().deleteResyncMetadata(ctx, bucket)
// Call site replication hook.
replLogIf(ctx, globalSiteReplicationSys.DeleteBucketHook(ctx, bucket, forceDelete))

View File

@ -230,7 +230,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
w.Header().Set(xhttp.ContentType, string(mimeJSON))
enc := json.NewEncoder(w)
stats := globalReplicationStats.getLatestReplicationStats(bucket)
stats := globalReplicationStats.Load().getLatestReplicationStats(bucket)
bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket)
bwMap := bwRpt.BucketStats
for arn, st := range stats.ReplicationStats.Stats {
@ -286,7 +286,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsV2Handler(w http.Respons
w.Header().Set(xhttp.ContentType, string(mimeJSON))
enc := json.NewEncoder(w)
stats := globalReplicationStats.getLatestReplicationStats(bucket)
stats := globalReplicationStats.Load().getLatestReplicationStats(bucket)
bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket)
bwMap := bwRpt.BucketStats
for arn, st := range stats.ReplicationStats.Stats {
@ -422,7 +422,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseW
return
}
if err := globalReplicationPool.resyncer.start(ctx, objectAPI, resyncOpts{
if err := globalReplicationPool.Get().resyncer.start(ctx, objectAPI, resyncOpts{
bucket: bucket,
arn: arn,
resyncID: resetID,

View File

@ -119,7 +119,7 @@ func (a *ActiveWorkerStat) update() {
if a == nil {
return
}
a.Curr = globalReplicationPool.ActiveWorkers()
a.Curr = globalReplicationPool.Get().ActiveWorkers()
a.hist.Update(int64(a.Curr))
a.Avg = float32(a.hist.Mean())
a.Max = int(a.hist.Max())

View File

@ -87,6 +87,9 @@ func (r *ReplicationStats) updateMovingAvg() {
// ActiveWorkers returns worker stats
func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat {
if r == nil {
return ActiveWorkerStat{}
}
r.wlock.RLock()
defer r.wlock.RUnlock()
w := r.workers.get()
@ -351,6 +354,9 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
}
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) {
if r == nil {
return nil
}
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage))
@ -460,6 +466,9 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucket
// get the most current of in-memory replication stats and data usage info from crawler.
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) {
if r == nil {
return s
}
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
return r.calculateBucketReplicationStats(bucket, bucketStats)
}
@ -495,9 +504,14 @@ func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opTyp
// incProxy increments proxy metrics for proxied calls
func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) {
r.pCache.inc(bucket, api, isErr)
if r != nil {
r.pCache.inc(bucket, api, isErr)
}
}
func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric {
if r == nil {
return ProxyMetric{}
}
return r.pCache.getBucketStats(bucket)
}

View File

@ -50,6 +50,7 @@ import (
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/once"
"github.com/tinylib/msgp/msgp"
"github.com/zeebo/xxh3"
"golang.org/x/exp/maps"
@ -478,7 +479,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry())
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
@ -548,7 +549,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
// to decrement pending count later.
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
globalReplicationStats.Update(dobj.Bucket, rinfo, replicationStatus,
globalReplicationStats.Load().Update(dobj.Bucket, rinfo, replicationStatus,
prevStatus)
}
}
@ -556,7 +557,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
eventName := event.ObjectReplicationComplete
if replicationStatus == replication.Failed {
eventName = event.ObjectReplicationFailed
globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry())
}
drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
if replicationStatus != prevStatus {
@ -1054,7 +1055,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
UserAgent: "Internal: [Replication]",
Host: globalLocalNodeName,
})
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
return
}
ctx = lkctx.Context()
@ -1139,7 +1140,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
rinfo.OpType = opType // update optype to reflect correct operation.
globalReplicationStats.Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus)
globalReplicationStats.Load().Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus)
}
}
}
@ -1159,7 +1160,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
ri.EventType = ReplicateMRF
ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
ri.RetryCount++
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
}
}
@ -1787,8 +1788,8 @@ const (
)
var (
globalReplicationPool *ReplicationPool
globalReplicationStats *ReplicationStats
globalReplicationPool = once.NewSingleton[ReplicationPool]()
globalReplicationStats atomic.Pointer[ReplicationStats]
)
// ReplicationPool describes replication pool
@ -1803,6 +1804,7 @@ type ReplicationPool struct {
priority string
maxWorkers int
maxLWorkers int
stats *ReplicationStats
mu sync.RWMutex
mrfMU sync.Mutex
@ -1849,7 +1851,7 @@ const (
)
// NewReplicationPool creates a pool of replication workers of specified size
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts, stats *ReplicationStats) *ReplicationPool {
var workers, failedWorkers int
priority := "auto"
maxWorkers := WorkerMaxLimit
@ -1891,6 +1893,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
mrfStopCh: make(chan struct{}, 1),
ctx: ctx,
objLayer: o,
stats: stats,
priority: priority,
maxWorkers: maxWorkers,
maxLWorkers: maxLWorkers,
@ -1918,11 +1921,11 @@ func (p *ReplicationPool) AddMRFWorker() {
}
switch v := oi.(type) {
case ReplicateObjectInfo:
globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
atomic.AddInt32(&p.activeMRFWorkers, 1)
replicateObject(p.ctx, v, p.objLayer)
atomic.AddInt32(&p.activeMRFWorkers, -1)
globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
default:
bugLogIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type")
@ -1950,9 +1953,9 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
replicateObject(p.ctx, v, p.objLayer)
globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
}
@ -1960,10 +1963,10 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
globalReplicationStats.incQ(v.Bucket, 0, true, v.OpType)
p.stats.incQ(v.Bucket, 0, true, v.OpType)
replicateDelete(p.ctx, v, p.objLayer)
globalReplicationStats.decQ(v.Bucket, 0, true, v.OpType)
p.stats.decQ(v.Bucket, 0, true, v.OpType)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
@ -1990,9 +1993,9 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation
if opTracker != nil {
atomic.AddInt32(opTracker, 1)
}
globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
replicateObject(p.ctx, v, p.objLayer)
globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
if opTracker != nil {
atomic.AddInt32(opTracker, -1)
}
@ -2156,7 +2159,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
case <-p.ctx.Done():
case p.lrgworkers[h%uint64(len(p.lrgworkers))] <- ri:
default:
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
p.queueMRFSave(ri.ToMRFEntry())
p.mu.RLock()
maxLWorkers := p.maxLWorkers
existing := len(p.lrgworkers)
@ -2187,7 +2190,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
case healCh <- ri:
case ch <- ri:
default:
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
p.mu.RLock()
prio := p.priority
maxWorkers := p.maxWorkers
@ -2223,7 +2226,7 @@ func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObje
doi.ResetID = v.ResetID
doi.TargetArn = k
globalReplicationPool.queueReplicaDeleteTask(doi)
globalReplicationPool.Get().queueReplicaDeleteTask(doi)
}
}
}
@ -2244,7 +2247,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
case <-p.ctx.Done():
case ch <- doi:
default:
globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
p.queueMRFSave(doi.ToMRFEntry())
p.mu.RLock()
prio := p.priority
maxWorkers := p.maxWorkers
@ -2274,9 +2277,10 @@ type replicationPoolOpts struct {
}
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts())
globalReplicationStats = NewReplicationStats(ctx, objectAPI)
go globalReplicationStats.trackEWMA()
stats := NewReplicationStats(ctx, objectAPI)
globalReplicationPool.Set(NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts(), stats))
globalReplicationStats.Store(stats)
go stats.trackEWMA()
}
type proxyResult struct {
@ -2482,7 +2486,7 @@ func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc
if dsc.Synchronous() {
replicateObject(ctx, ri, o)
} else {
globalReplicationPool.queueReplicaTask(ri)
globalReplicationPool.Get().queueReplicaTask(ri)
}
}
@ -2610,9 +2614,9 @@ func proxyGetTaggingToRepTarget(ctx context.Context, bucket, object string, opts
}
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
globalReplicationPool.queueReplicaDeleteTask(dv)
globalReplicationPool.Get().queueReplicaDeleteTask(dv)
for arn := range dv.ReplicationState.Targets {
globalReplicationStats.Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType(""))
globalReplicationStats.Load().Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType(""))
}
}
@ -3042,9 +3046,9 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt
if len(tgtArns) == 0 {
return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn)
}
globalReplicationPool.resyncer.RLock()
data, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
globalReplicationPool.resyncer.RUnlock()
globalReplicationPool.Get().resyncer.RLock()
data, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket]
globalReplicationPool.Get().resyncer.RUnlock()
if !ok {
data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI)
if err != nil {
@ -3070,9 +3074,9 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt
return err
}
globalReplicationPool.resyncer.Lock()
defer globalReplicationPool.resyncer.Unlock()
brs, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
globalReplicationPool.Get().resyncer.Lock()
defer globalReplicationPool.Get().resyncer.Unlock()
brs, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket]
if !ok {
brs = BucketReplicationResyncStatus{
Version: resyncMetaVersion,
@ -3080,8 +3084,8 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt
}
}
brs.TargetsMap[opts.arn] = status
globalReplicationPool.resyncer.statusMap[opts.bucket] = brs
go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts)
globalReplicationPool.Get().resyncer.statusMap[opts.bucket] = brs
go globalReplicationPool.Get().resyncer.resyncBucket(GlobalContext, objAPI, false, opts)
return nil
}
@ -3413,7 +3417,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
if roi.ReplicationStatus == replication.Pending ||
roi.ReplicationStatus == replication.Failed ||
roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending {
globalReplicationPool.queueReplicaDeleteTask(dv)
globalReplicationPool.Get().queueReplicaDeleteTask(dv)
return
}
// if replication status is Complete on DeleteMarker and existing object resync required
@ -3429,12 +3433,12 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
switch roi.ReplicationStatus {
case replication.Pending, replication.Failed:
roi.EventType = ReplicateHeal
globalReplicationPool.queueReplicaTask(roi)
globalReplicationPool.Get().queueReplicaTask(roi)
return
}
if roi.ExistingObjResync.mustResync() {
roi.EventType = ReplicateExisting
globalReplicationPool.queueReplicaTask(roi)
globalReplicationPool.Get().queueReplicaTask(roi)
}
return
}
@ -3499,8 +3503,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
return
}
if entry.RetryCount > mrfRetryLimit { // let scanner catch up if retry count exceeded
atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1)
atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz))
atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1)
atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz))
return
}
@ -3513,8 +3517,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
select {
case p.mrfSaveCh <- entry:
default:
atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1)
atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz))
atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1)
atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz))
}
}
}
@ -3563,7 +3567,7 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
if !p.initialized() {
return
}
atomic.StoreUint64(&globalReplicationStats.mrfStats.LastFailedCount, uint64(len(entries)))
atomic.StoreUint64(&p.stats.mrfStats.LastFailedCount, uint64(len(entries)))
if len(entries) == 0 {
return
}

View File

@ -310,7 +310,7 @@ type ReplQNodeStats struct {
func (r *ReplicationStats) getNodeQueueStats(bucket string) (qs ReplQNodeStats) {
qs.NodeName = globalLocalNodeName
qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()
qs.ActiveWorkers = globalReplicationStats.ActiveWorkers()
qs.ActiveWorkers = globalReplicationStats.Load().ActiveWorkers()
qs.XferStats = make(map[RMetricName]XferStats)
qs.QStats = r.qCache.getBucketStats(bucket)
qs.TgtXferStats = make(map[string]map[RMetricName]XferStats)
@ -402,7 +402,7 @@ func (r *ReplicationStats) getNodeQueueStats(bucket string) (qs ReplQNodeStats)
func (r *ReplicationStats) getNodeQueueStatsSummary() (qs ReplQNodeStats) {
qs.NodeName = globalLocalNodeName
qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()
qs.ActiveWorkers = globalReplicationStats.ActiveWorkers()
qs.ActiveWorkers = globalReplicationStats.Load().ActiveWorkers()
qs.XferStats = make(map[RMetricName]XferStats)
qs.QStats = r.qCache.getSiteStats()
qs.MRFStats = ReplicationMRFStats{

View File

@ -170,9 +170,9 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int, legacy bool) {
listQuorum = "strict"
}
t.listQuorum = listQuorum
if globalReplicationPool != nil &&
if r := globalReplicationPool.GetNonBlocking(); r != nil &&
(cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers || cfg.ReplicationMaxLWorkers != t.replicationMaxLWorkers) {
globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers)
r.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers)
}
t.replicationPriority = cfg.ReplicationPriority
t.replicationMaxWorkers = cfg.ReplicationMaxWorkers

View File

@ -2313,8 +2313,8 @@ func getReplicationNodeMetrics(opts MetricsGroupOpts) *MetricsGroupV2 {
var ml []MetricV2
// common operational metrics for bucket replication and site replication - published
// at cluster level
if globalReplicationStats != nil {
qs := globalReplicationStats.getNodeQueueStatsSummary()
if rStats := globalReplicationStats.Load(); rStats != nil {
qs := rStats.getNodeQueueStatsSummary()
activeWorkersCount := MetricV2{
Description: getClusterReplActiveWorkersCountMD(),
}
@ -3245,7 +3245,7 @@ func getBucketUsageMetrics(opts MetricsGroupOpts) *MetricsGroupV2 {
var bucketReplStats map[string]BucketStats
if !globalSiteReplicationSys.isEnabled() {
bucketReplStats = globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage)
bucketReplStats = globalReplicationStats.Load().getAllLatest(dataUsageInfo.BucketsUsage)
}
for bucket, usage := range dataUsageInfo.BucketsUsage {
quota, _ := globalBucketQuotaSys.Get(ctx, bucket)

View File

@ -119,7 +119,7 @@ func loadBucketReplicationMetrics(ctx context.Context, m MetricValues, c *metric
return nil
}
bucketReplStats := globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage)
bucketReplStats := globalReplicationStats.Load().getAllLatest(dataUsageInfo.BucketsUsage)
for _, bucket := range buckets {
labels := []string{bucketL, bucket}
if s, ok := bucketReplStats[bucket]; ok {

View File

@ -69,11 +69,12 @@ var (
// loadClusterReplicationMetrics - `MetricsLoaderFn` for cluster replication metrics
// such as transfer rate and objects queued.
func loadClusterReplicationMetrics(ctx context.Context, m MetricValues, c *metricsCache) error {
if globalReplicationStats == nil {
st := globalReplicationStats.Load()
if st == nil {
return nil
}
qs := globalReplicationStats.getNodeQueueStatsSummary()
qs := st.getNodeQueueStatsSummary()
qt := qs.QStats
m.Set(replicationAverageQueuedBytes, float64(qt.Avg.Bytes))

View File

@ -300,7 +300,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
}
for bucket, usageInfo := range dataUsageInfo.BucketsUsage {
stat := globalReplicationStats.getLatestReplicationStats(bucket)
stat := globalReplicationStats.Load().getLatestReplicationStats(bucket)
// Total space used by bucket
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(

View File

@ -537,7 +537,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s
// DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers
func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) {
globalReplicationStats.Delete(bucketName)
globalReplicationStats.Load().Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName)
globalBucketTargetSys.Delete(bucketName)
globalEventNotifier.RemoveNotification(bucketName)
@ -591,7 +591,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck
}
}
replicationStatsList := globalReplicationStats.GetAll()
replicationStatsList := globalReplicationStats.Load().GetAll()
bucketStatsMap := BucketStatsMap{
Stats: make(map[string]BucketStats, len(replicationStatsList)),
Timestamp: UTCNow(),
@ -599,7 +599,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck
for k, replicationStats := range replicationStatsList {
bucketStatsMap.Stats[k] = BucketStats{
ReplicationStats: replicationStats,
ProxyStats: globalReplicationStats.getProxyStats(k),
ProxyStats: globalReplicationStats.Load().getProxyStats(k),
}
}
@ -632,11 +632,13 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam
peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String())
}
}
bucketStats = append(bucketStats, BucketStats{
ReplicationStats: globalReplicationStats.Get(bucketName),
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}},
ProxyStats: globalReplicationStats.getProxyStats(bucketName),
})
if st := globalReplicationStats.Load(); st != nil {
bucketStats = append(bucketStats, BucketStats{
ReplicationStats: st.Get(bucketName),
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{st.getNodeQueueStats(bucketName)}},
ProxyStats: st.getProxyStats(bucketName),
})
}
return bucketStats
}
@ -665,7 +667,7 @@ func (sys *NotificationSys) GetClusterSiteMetrics(ctx context.Context) []SRMetri
peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String())
}
}
siteStats = append(siteStats, globalReplicationStats.getSRMetricsForNode())
siteStats = append(siteStats, globalReplicationStats.Load().getSRMetricsForNode())
return siteStats
}
@ -1605,7 +1607,7 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node
if node != "all" && node != globalLocalNodeName {
return nil
}
mCh, err := globalReplicationPool.getMRF(ctx, bucket)
mCh, err := globalReplicationPool.Get().getMRF(ctx, bucket)
if err != nil {
return err
}

View File

@ -505,11 +505,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
if (isErrObjectNotFound(err) || isErrVersionNotFound(err) || isErrReadQuorum(err)) && !(gr != nil && gr.ObjInfo.DeleteMarker) {
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.incProxy(bucket, getObjectAPI, false)
globalReplicationStats.Load().incProxy(bucket, getObjectAPI, false)
// proxy to replication target if active-active replication is in place.
reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts)
if perr != nil {
globalReplicationStats.incProxy(bucket, getObjectAPI, true)
globalReplicationStats.Load().incProxy(bucket, getObjectAPI, true)
proxyGetErr := ErrorRespToObjectError(perr, bucket, object)
if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) &&
!isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) {
@ -1025,14 +1025,14 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
// proxy HEAD to replication target if active-active replication configured on bucket
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.incProxy(bucket, headObjectAPI, false)
globalReplicationStats.Load().incProxy(bucket, headObjectAPI, false)
var oi ObjectInfo
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
if proxy.Proxy {
objInfo = oi
}
if proxy.Err != nil {
globalReplicationStats.incProxy(bucket, headObjectAPI, true)
globalReplicationStats.Load().incProxy(bucket, headObjectAPI, true)
writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err))
return
}
@ -2090,7 +2090,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
defer globalReplicationStats.UpdateReplicaStat(bucket, size)
defer globalReplicationStats.Load().UpdateReplicaStat(bucket, size)
}
// Check if bucket encryption is enabled
@ -3301,11 +3301,11 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, false)
globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, false)
// proxy to replication target if site replication is in place.
tags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts)
if gerr.Err != nil || tags == nil {
globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, true)
globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, true)
writeErrorResponse(ctx, w, toAPIError(ctx, gerr.Err), r.URL)
return
} // overlay tags from peer site.
@ -3404,11 +3404,11 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, false)
globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, false)
// proxy to replication target if site replication is in place.
perr := proxyTaggingToRepTarget(ctx, bucket, object, tags, opts, proxytgts)
if perr.Err != nil {
globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, true)
globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, true)
writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
return
}
@ -3501,11 +3501,11 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, false)
globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, false)
// proxy to replication target if active-active replication is in place.
perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts)
if perr.Err != nil {
globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, true)
globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, true)
writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
return
}

View File

@ -1029,7 +1029,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
actualSize, _ := objInfo.GetActualSize()
defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize)
defer globalReplicationStats.Load().UpdateReplicaStat(bucket, actualSize)
}
// Get object location.

View File

@ -469,7 +469,7 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoP
return np, grid.NewRemoteErr(errors.New("Bucket name is missing"))
}
globalReplicationStats.Delete(bucketName)
globalReplicationStats.Load().Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName)
globalBucketTargetSys.Delete(bucketName)
globalEventNotifier.RemoveNotification(bucketName)
@ -483,12 +483,12 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoP
// GetAllBucketStatsHandler - fetches bucket replication stats for all buckets from this peer.
func (s *peerRESTServer) GetAllBucketStatsHandler(mss *grid.MSS) (*BucketStatsMap, *grid.RemoteErr) {
replicationStats := globalReplicationStats.GetAll()
replicationStats := globalReplicationStats.Load().GetAll()
bucketStatsMap := make(map[string]BucketStats, len(replicationStats))
for k, v := range replicationStats {
bucketStatsMap[k] = BucketStats{
ReplicationStats: v,
ProxyStats: globalReplicationStats.getProxyStats(k),
ProxyStats: globalReplicationStats.Load().getProxyStats(k),
}
}
return &BucketStatsMap{Stats: bucketStatsMap, Timestamp: time.Now()}, nil
@ -501,11 +501,14 @@ func (s *peerRESTServer) GetBucketStatsHandler(vars *grid.MSS) (*BucketStats, *g
if bucketName == "" {
return nil, grid.NewRemoteErrString("Bucket name is missing")
}
st := globalReplicationStats.Load()
if st == nil {
return &BucketStats{}, nil
}
bs := BucketStats{
ReplicationStats: globalReplicationStats.Get(bucketName),
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}},
ProxyStats: globalReplicationStats.getProxyStats(bucketName),
ReplicationStats: st.Get(bucketName),
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{st.getNodeQueueStats(bucketName)}},
ProxyStats: st.getProxyStats(bucketName),
}
return &bs, nil
}
@ -516,9 +519,11 @@ func (s *peerRESTServer) GetSRMetricsHandler(mss *grid.MSS) (*SRMetricsSummary,
if objAPI == nil {
return nil, grid.NewRemoteErr(errServerNotInitialized)
}
sm := globalReplicationStats.getSRMetricsForNode()
return &sm, nil
if st := globalReplicationStats.Load(); st != nil {
sm := st.getSRMetricsForNode()
return &sm, nil
}
return &SRMetricsSummary{}, nil
}
// LoadBucketMetadataHandler - reloads in memory bucket metadata
@ -1173,7 +1178,7 @@ func (s *peerRESTServer) GetReplicationMRFHandler(w http.ResponseWriter, r *http
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
ctx := newContext(r, w, "GetReplicationMRF")
re, err := globalReplicationPool.getMRF(ctx, bucketName)
re, err := globalReplicationPool.Get().getMRF(ctx, bucketName)
if err != nil {
s.writeErrorResponse(w, err)
return

View File

@ -1082,7 +1082,7 @@ func serverMain(ctx *cli.Context) {
// initialize replication resync state.
bootstrapTrace("initResync", func() {
globalReplicationPool.initResync(GlobalContext, buckets, newObject)
globalReplicationPool.Get().initResync(GlobalContext, buckets, newObject)
})
// Initialize site replication manager after bucket metadata

View File

@ -5858,7 +5858,7 @@ func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer
})
continue
}
if err := globalReplicationPool.resyncer.start(ctx, objAPI, resyncOpts{
if err := globalReplicationPool.Get().resyncer.start(ctx, objAPI, resyncOpts{
bucket: bucket,
arn: tgtArn,
resyncID: rs.ResyncID,
@ -5953,8 +5953,8 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
continue
}
// update resync state for the bucket
globalReplicationPool.resyncer.Lock()
m, ok := globalReplicationPool.resyncer.statusMap[bucket]
globalReplicationPool.Get().resyncer.Lock()
m, ok := globalReplicationPool.Get().resyncer.statusMap[bucket]
if !ok {
m = newBucketResyncStatus(bucket)
}
@ -5964,8 +5964,8 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
m.TargetsMap[t.Arn] = st
m.LastUpdate = UTCNow()
}
globalReplicationPool.resyncer.statusMap[bucket] = m
globalReplicationPool.resyncer.Unlock()
globalReplicationPool.Get().resyncer.statusMap[bucket] = m
globalReplicationPool.Get().resyncer.Unlock()
}
}
@ -5975,7 +5975,7 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
return res, err
}
select {
case globalReplicationPool.resyncer.resyncCancelCh <- struct{}{}:
case globalReplicationPool.Get().resyncer.resyncCancelCh <- struct{}{}:
case <-ctx.Done():
}

View File

@ -351,7 +351,9 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer
// Test Server needs to start before formatting of disks.
// Get credential.
credentials := globalActiveCred
if !globalReplicationPool.IsSet() {
globalReplicationPool.Set(nil)
}
testServer.Obj = objLayer
testServer.rawDiskPaths = disks
testServer.Disks = mustGetPoolEndpoints(0, disks...)

View File

@ -0,0 +1,46 @@
package once
// Singleton contains a pointer to T that must be set once.
// Until the value is set all Get() calls will block.
type Singleton[T any] struct {
v *T
set chan struct{}
}
// NewSingleton creates a new unset singleton.
func NewSingleton[T any]() *Singleton[T] {
return &Singleton[T]{set: make(chan struct{}), v: nil}
}
// Get will return the singleton value.
func (s *Singleton[T]) Get() *T {
<-s.set
return s.v
}
// GetNonBlocking will return the singleton value or nil if not set yet.
func (s *Singleton[T]) GetNonBlocking() *T {
select {
case <-s.set:
return s.v
default:
return nil
}
}
// IsSet will return whether the singleton has been set.
func (s *Singleton[T]) IsSet() bool {
select {
case <-s.set:
return true
default:
return false
}
}
// Set the value and unblock all Get requests.
// This may only be called once, a second call will panic.
func (s *Singleton[T]) Set(v *T) {
s.v = v
close(s.set)
}