diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 81b58ead6..13bed4ab8 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1682,7 +1682,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. } globalNotificationSys.DeleteBucketMetadata(ctx, bucket) - globalReplicationPool.deleteResyncMetadata(ctx, bucket) + globalReplicationPool.Get().deleteResyncMetadata(ctx, bucket) // Call site replication hook. replLogIf(ctx, globalSiteReplicationSys.DeleteBucketHook(ctx, bucket, forceDelete)) diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index f593f7666..05a9d2e85 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -230,7 +230,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW w.Header().Set(xhttp.ContentType, string(mimeJSON)) enc := json.NewEncoder(w) - stats := globalReplicationStats.getLatestReplicationStats(bucket) + stats := globalReplicationStats.Load().getLatestReplicationStats(bucket) bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket) bwMap := bwRpt.BucketStats for arn, st := range stats.ReplicationStats.Stats { @@ -286,7 +286,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsV2Handler(w http.Respons w.Header().Set(xhttp.ContentType, string(mimeJSON)) enc := json.NewEncoder(w) - stats := globalReplicationStats.getLatestReplicationStats(bucket) + stats := globalReplicationStats.Load().getLatestReplicationStats(bucket) bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket) bwMap := bwRpt.BucketStats for arn, st := range stats.ReplicationStats.Stats { @@ -422,7 +422,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseW return } - if err := globalReplicationPool.resyncer.start(ctx, objectAPI, resyncOpts{ + if err := globalReplicationPool.Get().resyncer.start(ctx, objectAPI, resyncOpts{ bucket: bucket, arn: arn, resyncID: resetID, diff --git a/cmd/bucket-replication-metrics.go b/cmd/bucket-replication-metrics.go index 3b3b56af6..aa4cfb963 100644 --- a/cmd/bucket-replication-metrics.go +++ b/cmd/bucket-replication-metrics.go @@ -119,7 +119,7 @@ func (a *ActiveWorkerStat) update() { if a == nil { return } - a.Curr = globalReplicationPool.ActiveWorkers() + a.Curr = globalReplicationPool.Get().ActiveWorkers() a.hist.Update(int64(a.Curr)) a.Avg = float32(a.hist.Mean()) a.Max = int(a.hist.Max()) diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 1df4a6827..d20a0ff47 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -87,6 +87,9 @@ func (r *ReplicationStats) updateMovingAvg() { // ActiveWorkers returns worker stats func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat { + if r == nil { + return ActiveWorkerStat{} + } r.wlock.RLock() defer r.wlock.RUnlock() w := r.workers.get() @@ -351,6 +354,9 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio } func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) { + if r == nil { + return nil + } peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext) bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage)) @@ -460,6 +466,9 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucket // get the most current of in-memory replication stats and data usage info from crawler. 
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) { + if r == nil { + return s + } bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) return r.calculateBucketReplicationStats(bucket, bucketStats) } @@ -495,9 +504,14 @@ func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opTyp // incProxy increments proxy metrics for proxied calls func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) { - r.pCache.inc(bucket, api, isErr) + if r != nil { + r.pCache.inc(bucket, api, isErr) + } } func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric { + if r == nil { + return ProxyMetric{} + } return r.pCache.getBucketStats(bucket) } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index b7c63b044..5902c3ee6 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -50,6 +50,7 @@ import ( xhttp "github.com/minio/minio/internal/http" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/tinylib/msgp/msgp" "github.com/zeebo/xxh3" "golang.org/x/exp/maps" @@ -478,7 +479,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { - globalReplicationPool.queueMRFSave(dobj.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry()) sendEvent(eventArgs{ BucketName: bucket, Object: ObjectInfo{ @@ -548,7 +549,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj // to decrement pending count later. for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { - globalReplicationStats.Update(dobj.Bucket, rinfo, replicationStatus, + globalReplicationStats.Load().Update(dobj.Bucket, rinfo, replicationStatus, prevStatus) } } @@ -556,7 +557,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj eventName := event.ObjectReplicationComplete if replicationStatus == replication.Failed { eventName = event.ObjectReplicationFailed - globalReplicationPool.queueMRFSave(dobj.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry()) } drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID) if replicationStatus != prevStatus { @@ -1054,7 +1055,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) - globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry()) return } ctx = lkctx.Context() @@ -1139,7 +1140,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { rinfo.OpType = opType // update optype to reflect correct operation. 
- globalReplicationStats.Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus) + globalReplicationStats.Load().Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus) } } } @@ -1159,7 +1160,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje ri.EventType = ReplicateMRF ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal() ri.RetryCount++ - globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry()) } } @@ -1787,8 +1788,8 @@ const ( ) var ( - globalReplicationPool *ReplicationPool - globalReplicationStats *ReplicationStats + globalReplicationPool = once.NewSingleton[ReplicationPool]() + globalReplicationStats atomic.Pointer[ReplicationStats] ) // ReplicationPool describes replication pool @@ -1803,6 +1804,7 @@ type ReplicationPool struct { priority string maxWorkers int maxLWorkers int + stats *ReplicationStats mu sync.RWMutex mrfMU sync.Mutex @@ -1849,7 +1851,7 @@ const ( ) // NewReplicationPool creates a pool of replication workers of specified size -func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool { +func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts, stats *ReplicationStats) *ReplicationPool { var workers, failedWorkers int priority := "auto" maxWorkers := WorkerMaxLimit @@ -1891,6 +1893,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool mrfStopCh: make(chan struct{}, 1), ctx: ctx, objLayer: o, + stats: stats, priority: priority, maxWorkers: maxWorkers, maxLWorkers: maxLWorkers, @@ -1918,11 +1921,11 @@ func (p *ReplicationPool) AddMRFWorker() { } switch v := oi.(type) { case ReplicateObjectInfo: - globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) + p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) atomic.AddInt32(&p.activeMRFWorkers, 1) replicateObject(p.ctx, v, p.objLayer) atomic.AddInt32(&p.activeMRFWorkers, -1) - globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) + p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) default: bugLogIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type") @@ -1950,9 +1953,9 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT if opTracker != nil { atomic.AddInt32(opTracker, 1) } - globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) + p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) replicateObject(p.ctx, v, p.objLayer) - globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) + p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) if opTracker != nil { atomic.AddInt32(opTracker, -1) } @@ -1960,10 +1963,10 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT if opTracker != nil { atomic.AddInt32(opTracker, 1) } - globalReplicationStats.incQ(v.Bucket, 0, true, v.OpType) + p.stats.incQ(v.Bucket, 0, true, v.OpType) replicateDelete(p.ctx, v, p.objLayer) - globalReplicationStats.decQ(v.Bucket, 0, true, v.OpType) + p.stats.decQ(v.Bucket, 0, true, v.OpType) if opTracker != nil { atomic.AddInt32(opTracker, -1) @@ -1990,9 +1993,9 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation if opTracker != nil { atomic.AddInt32(opTracker, 1) } - globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) + p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) 
replicateObject(p.ctx, v, p.objLayer) - globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) + p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType) if opTracker != nil { atomic.AddInt32(opTracker, -1) } @@ -2156,7 +2159,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { case <-p.ctx.Done(): case p.lrgworkers[h%uint64(len(p.lrgworkers))] <- ri: default: - globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) + p.queueMRFSave(ri.ToMRFEntry()) p.mu.RLock() maxLWorkers := p.maxLWorkers existing := len(p.lrgworkers) @@ -2187,7 +2190,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { case healCh <- ri: case ch <- ri: default: - globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry()) p.mu.RLock() prio := p.priority maxWorkers := p.maxWorkers @@ -2223,7 +2226,7 @@ func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObje doi.ResetID = v.ResetID doi.TargetArn = k - globalReplicationPool.queueReplicaDeleteTask(doi) + globalReplicationPool.Get().queueReplicaDeleteTask(doi) } } } @@ -2244,7 +2247,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf case <-p.ctx.Done(): case ch <- doi: default: - globalReplicationPool.queueMRFSave(doi.ToMRFEntry()) + p.queueMRFSave(doi.ToMRFEntry()) p.mu.RLock() prio := p.priority maxWorkers := p.maxWorkers @@ -2274,9 +2277,10 @@ type replicationPoolOpts struct { } func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { - globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts()) - globalReplicationStats = NewReplicationStats(ctx, objectAPI) - go globalReplicationStats.trackEWMA() + stats := NewReplicationStats(ctx, objectAPI) + globalReplicationPool.Set(NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts(), stats)) + globalReplicationStats.Store(stats) + go stats.trackEWMA() } type proxyResult struct { @@ -2482,7 +2486,7 @@ func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc if dsc.Synchronous() { replicateObject(ctx, ri, o) } else { - globalReplicationPool.queueReplicaTask(ri) + globalReplicationPool.Get().queueReplicaTask(ri) } } @@ -2610,9 +2614,9 @@ func proxyGetTaggingToRepTarget(ctx context.Context, bucket, object string, opts } func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) { - globalReplicationPool.queueReplicaDeleteTask(dv) + globalReplicationPool.Get().queueReplicaDeleteTask(dv) for arn := range dv.ReplicationState.Targets { - globalReplicationStats.Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType("")) + globalReplicationStats.Load().Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType("")) } } @@ -3042,9 +3046,9 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt if len(tgtArns) == 0 { return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn) } - globalReplicationPool.resyncer.RLock() - data, ok := globalReplicationPool.resyncer.statusMap[opts.bucket] - globalReplicationPool.resyncer.RUnlock() + globalReplicationPool.Get().resyncer.RLock() + data, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket] + 
globalReplicationPool.Get().resyncer.RUnlock() if !ok { data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI) if err != nil { @@ -3070,9 +3074,9 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt return err } - globalReplicationPool.resyncer.Lock() - defer globalReplicationPool.resyncer.Unlock() - brs, ok := globalReplicationPool.resyncer.statusMap[opts.bucket] + globalReplicationPool.Get().resyncer.Lock() + defer globalReplicationPool.Get().resyncer.Unlock() + brs, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket] if !ok { brs = BucketReplicationResyncStatus{ Version: resyncMetaVersion, @@ -3080,8 +3084,8 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt } } brs.TargetsMap[opts.arn] = status - globalReplicationPool.resyncer.statusMap[opts.bucket] = brs - go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts) + globalReplicationPool.Get().resyncer.statusMap[opts.bucket] = brs + go globalReplicationPool.Get().resyncer.resyncBucket(GlobalContext, objAPI, false, opts) return nil } @@ -3413,7 +3417,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf if roi.ReplicationStatus == replication.Pending || roi.ReplicationStatus == replication.Failed || roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending { - globalReplicationPool.queueReplicaDeleteTask(dv) + globalReplicationPool.Get().queueReplicaDeleteTask(dv) return } // if replication status is Complete on DeleteMarker and existing object resync required @@ -3429,12 +3433,12 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf switch roi.ReplicationStatus { case replication.Pending, replication.Failed: roi.EventType = ReplicateHeal - globalReplicationPool.queueReplicaTask(roi) + globalReplicationPool.Get().queueReplicaTask(roi) return } if roi.ExistingObjResync.mustResync() { roi.EventType = ReplicateExisting - globalReplicationPool.queueReplicaTask(roi) + globalReplicationPool.Get().queueReplicaTask(roi) } return } @@ -3499,8 +3503,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { return } if entry.RetryCount > mrfRetryLimit { // let scanner catch up if retry count exceeded - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1) - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) return } @@ -3513,8 +3517,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { select { case p.mrfSaveCh <- entry: default: - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1) - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) } } } @@ -3563,7 +3567,7 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string if !p.initialized() { return } - atomic.StoreUint64(&globalReplicationStats.mrfStats.LastFailedCount, uint64(len(entries))) + atomic.StoreUint64(&p.stats.mrfStats.LastFailedCount, uint64(len(entries))) if len(entries) == 0 { return } diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index a51fbf41d..82f8e885a 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -310,7 +310,7 @@ type ReplQNodeStats struct { func (r 
*ReplicationStats) getNodeQueueStats(bucket string) (qs ReplQNodeStats) { qs.NodeName = globalLocalNodeName qs.Uptime = UTCNow().Unix() - globalBootTime.Unix() - qs.ActiveWorkers = globalReplicationStats.ActiveWorkers() + qs.ActiveWorkers = globalReplicationStats.Load().ActiveWorkers() qs.XferStats = make(map[RMetricName]XferStats) qs.QStats = r.qCache.getBucketStats(bucket) qs.TgtXferStats = make(map[string]map[RMetricName]XferStats) @@ -402,7 +402,7 @@ func (r *ReplicationStats) getNodeQueueStats(bucket string) (qs ReplQNodeStats) func (r *ReplicationStats) getNodeQueueStatsSummary() (qs ReplQNodeStats) { qs.NodeName = globalLocalNodeName qs.Uptime = UTCNow().Unix() - globalBootTime.Unix() - qs.ActiveWorkers = globalReplicationStats.ActiveWorkers() + qs.ActiveWorkers = globalReplicationStats.Load().ActiveWorkers() qs.XferStats = make(map[RMetricName]XferStats) qs.QStats = r.qCache.getSiteStats() qs.MRFStats = ReplicationMRFStats{ diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 11b67411b..8f6eb3172 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -170,9 +170,9 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int, legacy bool) { listQuorum = "strict" } t.listQuorum = listQuorum - if globalReplicationPool != nil && + if r := globalReplicationPool.GetNonBlocking(); r != nil && (cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers || cfg.ReplicationMaxLWorkers != t.replicationMaxLWorkers) { - globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers) + r.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers) } t.replicationPriority = cfg.ReplicationPriority t.replicationMaxWorkers = cfg.ReplicationMaxWorkers diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 98c0161e8..3dd3ac192 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -2313,8 +2313,8 @@ func getReplicationNodeMetrics(opts MetricsGroupOpts) *MetricsGroupV2 { var ml []MetricV2 // common operational metrics for bucket replication and site replication - published // at cluster level - if globalReplicationStats != nil { - qs := globalReplicationStats.getNodeQueueStatsSummary() + if rStats := globalReplicationStats.Load(); rStats != nil { + qs := rStats.getNodeQueueStatsSummary() activeWorkersCount := MetricV2{ Description: getClusterReplActiveWorkersCountMD(), } @@ -3245,7 +3245,7 @@ func getBucketUsageMetrics(opts MetricsGroupOpts) *MetricsGroupV2 { var bucketReplStats map[string]BucketStats if !globalSiteReplicationSys.isEnabled() { - bucketReplStats = globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage) + bucketReplStats = globalReplicationStats.Load().getAllLatest(dataUsageInfo.BucketsUsage) } for bucket, usage := range dataUsageInfo.BucketsUsage { quota, _ := globalBucketQuotaSys.Get(ctx, bucket) diff --git a/cmd/metrics-v3-bucket-replication.go b/cmd/metrics-v3-bucket-replication.go index 64f65e832..ef341801a 100644 --- a/cmd/metrics-v3-bucket-replication.go +++ b/cmd/metrics-v3-bucket-replication.go @@ -119,7 +119,7 @@ func loadBucketReplicationMetrics(ctx context.Context, m MetricValues, c *metric return nil } - bucketReplStats := globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage) + bucketReplStats := globalReplicationStats.Load().getAllLatest(dataUsageInfo.BucketsUsage) for _, bucket := range buckets { labels := []string{bucketL, bucket} if s, ok := bucketReplStats[bucket]; ok { diff --git 
a/cmd/metrics-v3-replication.go b/cmd/metrics-v3-replication.go index da26e0956..44a8e87ae 100644 --- a/cmd/metrics-v3-replication.go +++ b/cmd/metrics-v3-replication.go @@ -69,11 +69,12 @@ var ( // loadClusterReplicationMetrics - `MetricsLoaderFn` for cluster replication metrics // such as transfer rate and objects queued. func loadClusterReplicationMetrics(ctx context.Context, m MetricValues, c *metricsCache) error { - if globalReplicationStats == nil { + st := globalReplicationStats.Load() + if st == nil { return nil } - qs := globalReplicationStats.getNodeQueueStatsSummary() + qs := st.getNodeQueueStatsSummary() qt := qs.QStats m.Set(replicationAverageQueuedBytes, float64(qt.Avg.Bytes)) diff --git a/cmd/metrics.go b/cmd/metrics.go index 42ae31a7b..0548ff2b0 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -300,7 +300,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { } for bucket, usageInfo := range dataUsageInfo.BucketsUsage { - stat := globalReplicationStats.getLatestReplicationStats(bucket) + stat := globalReplicationStats.Load().getLatestReplicationStats(bucket) // Total space used by bucket ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( diff --git a/cmd/notification.go b/cmd/notification.go index e37577566..5d65a8130 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -537,7 +537,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s // DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) { - globalReplicationStats.Delete(bucketName) + globalReplicationStats.Load().Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) globalBucketTargetSys.Delete(bucketName) globalEventNotifier.RemoveNotification(bucketName) @@ -591,7 +591,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck } } - replicationStatsList := globalReplicationStats.GetAll() + replicationStatsList := globalReplicationStats.Load().GetAll() bucketStatsMap := BucketStatsMap{ Stats: make(map[string]BucketStats, len(replicationStatsList)), Timestamp: UTCNow(), @@ -599,7 +599,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck for k, replicationStats := range replicationStatsList { bucketStatsMap.Stats[k] = BucketStats{ ReplicationStats: replicationStats, - ProxyStats: globalReplicationStats.getProxyStats(k), + ProxyStats: globalReplicationStats.Load().getProxyStats(k), } } @@ -632,11 +632,13 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String()) } } - bucketStats = append(bucketStats, BucketStats{ - ReplicationStats: globalReplicationStats.Get(bucketName), - QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}}, - ProxyStats: globalReplicationStats.getProxyStats(bucketName), - }) + if st := globalReplicationStats.Load(); st != nil { + bucketStats = append(bucketStats, BucketStats{ + ReplicationStats: st.Get(bucketName), + QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{st.getNodeQueueStats(bucketName)}}, + ProxyStats: st.getProxyStats(bucketName), + }) + } return bucketStats } @@ -665,7 +667,7 @@ func (sys *NotificationSys) GetClusterSiteMetrics(ctx context.Context) []SRMetri peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String()) } } - siteStats = append(siteStats, 
globalReplicationStats.getSRMetricsForNode()) + siteStats = append(siteStats, globalReplicationStats.Load().getSRMetricsForNode()) return siteStats } @@ -1605,7 +1607,7 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node if node != "all" && node != globalLocalNodeName { return nil } - mCh, err := globalReplicationPool.getMRF(ctx, bucket) + mCh, err := globalReplicationPool.Get().getMRF(ctx, bucket) if err != nil { return err } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 4ff2dcc0a..f1bf1f5b7 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -505,11 +505,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj if (isErrObjectNotFound(err) || isErrVersionNotFound(err) || isErrReadQuorum(err)) && !(gr != nil && gr.ObjInfo.DeleteMarker) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, getObjectAPI, false) + globalReplicationStats.Load().incProxy(bucket, getObjectAPI, false) // proxy to replication target if active-active replication is in place. reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts) if perr != nil { - globalReplicationStats.incProxy(bucket, getObjectAPI, true) + globalReplicationStats.Load().incProxy(bucket, getObjectAPI, true) proxyGetErr := ErrorRespToObjectError(perr, bucket, object) if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) && !isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) { @@ -1025,14 +1025,14 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob // proxy HEAD to replication target if active-active replication configured on bucket proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, headObjectAPI, false) + globalReplicationStats.Load().incProxy(bucket, headObjectAPI, false) var oi ObjectInfo oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts) if proxy.Proxy { objInfo = oi } if proxy.Err != nil { - globalReplicationStats.incProxy(bucket, headObjectAPI, true) + globalReplicationStats.Load().incProxy(bucket, headObjectAPI, true) writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err)) return } @@ -2090,7 +2090,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String() metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano) - defer globalReplicationStats.UpdateReplicaStat(bucket, size) + defer globalReplicationStats.Load().UpdateReplicaStat(bucket, size) } // Check if bucket encryption is enabled @@ -3301,11 +3301,11 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, false) + globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, false) // proxy to replication target if site replication is in place. 
tags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts) if gerr.Err != nil || tags == nil { - globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, true) + globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, gerr.Err), r.URL) return } // overlay tags from peer site. @@ -3404,11 +3404,11 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, false) + globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, false) // proxy to replication target if site replication is in place. perr := proxyTaggingToRepTarget(ctx, bucket, object, tags, opts, proxytgts) if perr.Err != nil { - globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, true) + globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) return } @@ -3501,11 +3501,11 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, false) + globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, false) // proxy to replication target if active-active replication is in place. perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts) if perr.Err != nil { - globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, true) + globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) return } diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index fe267b6ee..512dcec88 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -1029,7 +1029,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { actualSize, _ := objInfo.GetActualSize() - defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) + defer globalReplicationStats.Load().UpdateReplicaStat(bucket, actualSize) } // Get object location. diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index ef18d0aea..3a383a156 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -469,7 +469,7 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoP return np, grid.NewRemoteErr(errors.New("Bucket name is missing")) } - globalReplicationStats.Delete(bucketName) + globalReplicationStats.Load().Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) globalBucketTargetSys.Delete(bucketName) globalEventNotifier.RemoveNotification(bucketName) @@ -483,12 +483,12 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoP // GetAllBucketStatsHandler - fetches bucket replication stats for all buckets from this peer. 
func (s *peerRESTServer) GetAllBucketStatsHandler(mss *grid.MSS) (*BucketStatsMap, *grid.RemoteErr) { - replicationStats := globalReplicationStats.GetAll() + replicationStats := globalReplicationStats.Load().GetAll() bucketStatsMap := make(map[string]BucketStats, len(replicationStats)) for k, v := range replicationStats { bucketStatsMap[k] = BucketStats{ ReplicationStats: v, - ProxyStats: globalReplicationStats.getProxyStats(k), + ProxyStats: globalReplicationStats.Load().getProxyStats(k), } } return &BucketStatsMap{Stats: bucketStatsMap, Timestamp: time.Now()}, nil @@ -501,11 +501,14 @@ func (s *peerRESTServer) GetBucketStatsHandler(vars *grid.MSS) (*BucketStats, *g if bucketName == "" { return nil, grid.NewRemoteErrString("Bucket name is missing") } - + st := globalReplicationStats.Load() + if st == nil { + return &BucketStats{}, nil + } bs := BucketStats{ - ReplicationStats: globalReplicationStats.Get(bucketName), - QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}}, - ProxyStats: globalReplicationStats.getProxyStats(bucketName), + ReplicationStats: st.Get(bucketName), + QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{st.getNodeQueueStats(bucketName)}}, + ProxyStats: st.getProxyStats(bucketName), } return &bs, nil } @@ -516,9 +519,11 @@ func (s *peerRESTServer) GetSRMetricsHandler(mss *grid.MSS) (*SRMetricsSummary, if objAPI == nil { return nil, grid.NewRemoteErr(errServerNotInitialized) } - - sm := globalReplicationStats.getSRMetricsForNode() - return &sm, nil + if st := globalReplicationStats.Load(); st != nil { + sm := st.getSRMetricsForNode() + return &sm, nil + } + return &SRMetricsSummary{}, nil } // LoadBucketMetadataHandler - reloads in memory bucket metadata @@ -1173,7 +1178,7 @@ func (s *peerRESTServer) GetReplicationMRFHandler(w http.ResponseWriter, r *http vars := mux.Vars(r) bucketName := vars[peerRESTBucket] ctx := newContext(r, w, "GetReplicationMRF") - re, err := globalReplicationPool.getMRF(ctx, bucketName) + re, err := globalReplicationPool.Get().getMRF(ctx, bucketName) if err != nil { s.writeErrorResponse(w, err) return diff --git a/cmd/server-main.go b/cmd/server-main.go index b006f900c..19036e5cf 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -1082,7 +1082,7 @@ func serverMain(ctx *cli.Context) { // initialize replication resync state. 
 	bootstrapTrace("initResync", func() {
-		globalReplicationPool.initResync(GlobalContext, buckets, newObject)
+		globalReplicationPool.Get().initResync(GlobalContext, buckets, newObject)
 	})
 
 	// Initialize site replication manager after bucket metadata
diff --git a/cmd/site-replication.go b/cmd/site-replication.go
index 265c99821..728dc80b0 100644
--- a/cmd/site-replication.go
+++ b/cmd/site-replication.go
@@ -5858,7 +5858,7 @@ func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer
 			})
 			continue
 		}
-		if err := globalReplicationPool.resyncer.start(ctx, objAPI, resyncOpts{
+		if err := globalReplicationPool.Get().resyncer.start(ctx, objAPI, resyncOpts{
 			bucket:   bucket,
 			arn:      tgtArn,
 			resyncID: rs.ResyncID,
@@ -5953,8 +5953,8 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
 			continue
 		}
 		// update resync state for the bucket
-		globalReplicationPool.resyncer.Lock()
-		m, ok := globalReplicationPool.resyncer.statusMap[bucket]
+		globalReplicationPool.Get().resyncer.Lock()
+		m, ok := globalReplicationPool.Get().resyncer.statusMap[bucket]
 		if !ok {
 			m = newBucketResyncStatus(bucket)
 		}
@@ -5964,8 +5964,8 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
 			m.TargetsMap[t.Arn] = st
 			m.LastUpdate = UTCNow()
 		}
-		globalReplicationPool.resyncer.statusMap[bucket] = m
-		globalReplicationPool.resyncer.Unlock()
+		globalReplicationPool.Get().resyncer.statusMap[bucket] = m
+		globalReplicationPool.Get().resyncer.Unlock()
 	}
 }
@@ -5975,7 +5975,7 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
 		return res, err
 	}
 	select {
-	case globalReplicationPool.resyncer.resyncCancelCh <- struct{}{}:
+	case globalReplicationPool.Get().resyncer.resyncCancelCh <- struct{}{}:
 	case <-ctx.Done():
 	}
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index d7a999733..1e36df60d 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -351,7 +351,9 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer
 	// Test Server needs to start before formatting of disks.
 	// Get credential.
 	credentials := globalActiveCred
-
+	if !globalReplicationPool.IsSet() {
+		globalReplicationPool.Set(nil)
+	}
 	testServer.Obj = objLayer
 	testServer.rawDiskPaths = disks
 	testServer.Disks = mustGetPoolEndpoints(0, disks...)
diff --git a/internal/once/singleton.go b/internal/once/singleton.go
new file mode 100644
index 000000000..7d7f30450
--- /dev/null
+++ b/internal/once/singleton.go
@@ -0,0 +1,46 @@
+package once
+
+// Singleton contains a pointer to T that must be set once.
+// Until the value is set all Get() calls will block.
+type Singleton[T any] struct {
+	v   *T
+	set chan struct{}
+}
+
+// NewSingleton creates a new unset singleton.
+func NewSingleton[T any]() *Singleton[T] {
+	return &Singleton[T]{set: make(chan struct{}), v: nil}
+}
+
+// Get will return the singleton value.
+func (s *Singleton[T]) Get() *T {
+	<-s.set
+	return s.v
+}
+
+// GetNonBlocking will return the singleton value or nil if not set yet.
+func (s *Singleton[T]) GetNonBlocking() *T {
+	select {
+	case <-s.set:
+		return s.v
+	default:
+		return nil
+	}
+}
+
+// IsSet will return whether the singleton has been set.
+func (s *Singleton[T]) IsSet() bool {
+	select {
+	case <-s.set:
+		return true
+	default:
+		return false
+	}
+}
+
+// Set the value and unblock all Get requests.
+// This may only be called once, a second call will panic.
+func (s *Singleton[T]) Set(v *T) {
+	s.v = v
+	close(s.set)
+}
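
Note on usage (not part of the patch): the sketch below is a minimal, self-contained illustration of the two access patterns this change introduces — a blocking/non-blocking once.Singleton for the replication pool, and an atomic.Pointer plus nil-receiver guards for the replication stats. Everything in it (package main, replStats, replPool, statsPtr, poolOnce, and the inlined singleton mirror of internal/once.Singleton) is a hypothetical stand-in written only to show the semantics, not MinIO code.

// Hypothetical stand-ins for illustration only; not MinIO code.
package main

import (
	"fmt"
	"sync/atomic"
)

// replStats stands in for ReplicationStats: its methods guard against a nil
// receiver, so calling them on the result of Load() is safe even before
// initBackgroundReplication has stored a value.
type replStats struct{ active int }

func (r *replStats) ActiveWorkers() int {
	if r == nil { // same guard the patch adds to ReplicationStats methods
		return 0
	}
	return r.active
}

// replPool stands in for ReplicationPool.
type replPool struct{ workers int }

// singleton mirrors the semantics of internal/once.Singleton:
// Get blocks until Set is called, GetNonBlocking returns nil until then.
type singleton[T any] struct {
	v   *T
	set chan struct{}
}

func newSingleton[T any]() *singleton[T] { return &singleton[T]{set: make(chan struct{})} }
func (s *singleton[T]) Set(v *T)         { s.v = v; close(s.set) }
func (s *singleton[T]) Get() *T          { <-s.set; return s.v }

func (s *singleton[T]) GetNonBlocking() *T {
	select {
	case <-s.set:
		return s.v
	default:
		return nil
	}
}

var (
	statsPtr atomic.Pointer[replStats]  // plays the role of globalReplicationStats
	poolOnce = newSingleton[replPool]() // plays the role of globalReplicationPool
)

func main() {
	// Before initialization Load() returns nil, but the nil-guarded method
	// still returns a zero value instead of panicking.
	fmt.Println("workers before init:", statsPtr.Load().ActiveWorkers()) // 0

	// Non-blocking access for callers that cannot afford to wait,
	// as apiConfig.init does in the patch.
	if p := poolOnce.GetNonBlocking(); p == nil {
		fmt.Println("pool not ready yet")
	}

	// Initialization, analogous to initBackgroundReplication.
	statsPtr.Store(&replStats{active: 4})
	poolOnce.Set(&replPool{workers: 8})

	fmt.Println("workers after init:", statsPtr.Load().ActiveWorkers()) // 4
	fmt.Println("pool workers:", poolOnce.Get().workers)                // Get no longer blocks
}

Because every ReplicationStats method now checks for a nil receiver, call sites can use globalReplicationStats.Load() unconditionally; only paths that must not block on startup (for example apiConfig.init in cmd/handler-api.go) switch to GetNonBlocking() on the pool singleton, and the IsSet()/Set(nil) pair in cmd/test-utils_test.go presumably exists so Get() does not block in tests that never start replication.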