diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 54196245d..54211f845 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -68,17 +68,6 @@ func newBgHealSequence() *healSequence { } } -func getCurrentMRFStatus() madmin.MRFStatus { - mrfInfo := globalMRFState.getCurrentMRFRoundInfo() - return madmin.MRFStatus{ - BytesHealed: mrfInfo.bytesHealed, - ItemsHealed: mrfInfo.itemsHealed, - TotalItems: mrfInfo.itemsHealed + mrfInfo.pendingItems, - TotalBytes: mrfInfo.bytesHealed + mrfInfo.pendingBytes, - Started: mrfInfo.triggeredAt, - } -} - // getBackgroundHealStatus will return the func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealState, bool) { if globalBackgroundHealState == nil { @@ -94,9 +83,9 @@ func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealS ScannedItemsCount: bgSeq.getScannedItemsCount(), } - if globalMRFState != nil { + if globalMRFState.initialized() { status.MRF = map[string]madmin.MRFStatus{ - globalLocalNodeName: getCurrentMRFStatus(), + globalLocalNodeName: globalMRFState.getCurrentMRFRoundInfo(), } } diff --git a/cmd/globals.go b/cmd/globals.go index 34baf3614..49267a0fc 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -293,7 +293,7 @@ var ( globalBackgroundHealRoutine *healRoutine globalBackgroundHealState *allHealState - globalMRFState *mrfState + globalMRFState mrfState // If writes to FS backend should be O_SYNC. globalFSOSync bool diff --git a/cmd/mrf.go b/cmd/mrf.go index 00e53ce5d..f2c8ced47 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -20,6 +20,7 @@ package cmd import ( "context" "sync" + "sync/atomic" "time" "github.com/minio/madmin-go" @@ -48,19 +49,11 @@ type setInfo struct { index, pool int } -type mrfStats struct { - triggeredAt time.Time - - itemsHealed uint64 - bytesHealed uint64 - - pendingItems uint64 - pendingBytes uint64 -} - // mrfState sncapsulates all the information // related to the global background MRF. type mrfState struct { + ready int32 + ctx context.Context objectAPI ObjectLayer @@ -77,9 +70,30 @@ type mrfState struct { triggeredAt time.Time } +// Initialize healing MRF subsystem +func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) { + m.mu.Lock() + defer m.mu.Unlock() + + m.ctx = ctx + m.objectAPI = objAPI + m.opCh = make(chan partialOperation, mrfOpsQueueSize) + m.pendingOps = make(map[partialOperation]setInfo) + m.setReconnectEvent = make(chan setInfo) + + go globalMRFState.maintainMRFList() + go globalMRFState.healRoutine() + + atomic.StoreInt32(&m.ready, 1) +} + +func (m *mrfState) initialized() bool { + return atomic.LoadInt32(&m.ready) != 0 +} + // Add a partial S3 operation (put/delete) when one or more disks are offline. func (m *mrfState) addPartialOp(op partialOperation) { - if m == nil { + if !m.initialized() { return } @@ -91,7 +105,7 @@ func (m *mrfState) addPartialOp(op partialOperation) { // Receive the new set (disk) reconnection event func (m *mrfState) newSetReconnected(pool, set int) { - if m == nil { + if !m.initialized() { return } @@ -104,8 +118,8 @@ func (m *mrfState) newSetReconnected(pool, set int) { } } -// Get current MRF stats -func (m *mrfState) getCurrentMRFRoundInfo() mrfStats { +// Get current MRF stats of the last MRF activity +func (m *mrfState) getCurrentMRFRoundInfo() madmin.MRFStatus { m.mu.Lock() triggeredAt := m.triggeredAt itemsHealed := m.itemsHealed @@ -115,15 +129,15 @@ func (m *mrfState) getCurrentMRFRoundInfo() mrfStats { m.mu.Unlock() if pendingItems == 0 { - return mrfStats{} + return madmin.MRFStatus{} } - return mrfStats{ - triggeredAt: triggeredAt, - itemsHealed: itemsHealed, - bytesHealed: bytesHealed, - pendingItems: pendingItems, - pendingBytes: pendingBytes, + return madmin.MRFStatus{ + Started: triggeredAt, + ItemsHealed: itemsHealed, + BytesHealed: bytesHealed, + TotalItems: itemsHealed + pendingItems, + TotalBytes: bytesHealed + pendingBytes, } } @@ -230,13 +244,5 @@ func (m *mrfState) healRoutine() { // Initialize healing MRF func initHealMRF(ctx context.Context, obj ObjectLayer) { - globalMRFState = &mrfState{ - ctx: ctx, - objectAPI: obj, - opCh: make(chan partialOperation, mrfOpsQueueSize), - pendingOps: make(map[partialOperation]setInfo), - setReconnectEvent: make(chan setInfo), - } - go globalMRFState.maintainMRFList() - go globalMRFState.healRoutine() + globalMRFState.init(ctx, obj) }