Mirror of https://github.com/minio/minio.git, synced 2025-11-07 04:42:56 -05:00
mrf: Avoid rare data race and more simplification (#12791)
This change avoids a rare data race and simplifies the function that returns the last MRF activity information.
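The race is avoided with a common Go pattern: initialize the shared fields under a mutex, then publish readiness through an atomic flag that callers check before touching the state. Below is a minimal, self-contained sketch of that pattern; the state type, its fields, and the init/add methods are illustrative stand-ins, not MinIO code.

package main

import (
	"sync"
	"sync/atomic"
)

// state mirrors the shape of the fix: a mutex guards the fields, and an
// atomic flag records whether init has completed, so goroutines that call
// add before initialization simply return instead of racing on the fields.
type state struct {
	mu    sync.Mutex
	ready int32 // read and written only via sync/atomic

	items map[string]int
}

// init populates the fields under the lock, then publishes readiness.
func (s *state) init() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.items = make(map[string]int)
	atomic.StoreInt32(&s.ready, 1)
}

// initialized reports whether init has run, without taking the lock.
func (s *state) initialized() bool {
	return atomic.LoadInt32(&s.ready) != 0
}

// add is safe to call from any goroutine; it is a no-op until init runs.
func (s *state) add(key string) {
	if !s.initialized() {
		return
	}
	s.mu.Lock()
	s.items[key]++
	s.mu.Unlock()
}

func main() {
	var s state
	go s.add("early") // harmlessly dropped if it runs before init
	s.init()
	s.add("x")
}

This is the same ordering the commit applies to mrfState: fields are assigned while the mutex is held, the atomic store is the last step of init, and every entry point checks the flag instead of relying on a nil-pointer test against the global.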
cmd/mrf.go (66 changed lines)
@@ -20,6 +20,7 @@ package cmd
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/minio/madmin-go"
@@ -48,19 +49,11 @@ type setInfo struct {
 	index, pool int
 }
 
-type mrfStats struct {
-	triggeredAt time.Time
-
-	itemsHealed uint64
-	bytesHealed uint64
-
-	pendingItems uint64
-	pendingBytes uint64
-}
-
 // mrfState sncapsulates all the information
 // related to the global background MRF.
 type mrfState struct {
+	ready int32
+
 	ctx       context.Context
 	objectAPI ObjectLayer
 
@@ -77,9 +70,30 @@ type mrfState struct {
 	triggeredAt time.Time
 }
 
+// Initialize healing MRF subsystem
+func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.ctx = ctx
+	m.objectAPI = objAPI
+	m.opCh = make(chan partialOperation, mrfOpsQueueSize)
+	m.pendingOps = make(map[partialOperation]setInfo)
+	m.setReconnectEvent = make(chan setInfo)
+
+	go globalMRFState.maintainMRFList()
+	go globalMRFState.healRoutine()
+
+	atomic.StoreInt32(&m.ready, 1)
+}
+
+func (m *mrfState) initialized() bool {
+	return atomic.LoadInt32(&m.ready) != 0
+}
+
 // Add a partial S3 operation (put/delete) when one or more disks are offline.
 func (m *mrfState) addPartialOp(op partialOperation) {
-	if m == nil {
+	if !m.initialized() {
 		return
 	}
 
@@ -91,7 +105,7 @@ func (m *mrfState) addPartialOp(op partialOperation) {
 
 // Receive the new set (disk) reconnection event
 func (m *mrfState) newSetReconnected(pool, set int) {
-	if m == nil {
+	if !m.initialized() {
 		return
 	}
 
@@ -104,8 +118,8 @@ func (m *mrfState) newSetReconnected(pool, set int) {
 	}
 }
 
-// Get current MRF stats
-func (m *mrfState) getCurrentMRFRoundInfo() mrfStats {
+// Get current MRF stats of the last MRF activity
+func (m *mrfState) getCurrentMRFRoundInfo() madmin.MRFStatus {
 	m.mu.Lock()
 	triggeredAt := m.triggeredAt
 	itemsHealed := m.itemsHealed
@@ -115,15 +129,15 @@ func (m *mrfState) getCurrentMRFRoundInfo() mrfStats {
 	m.mu.Unlock()
 
 	if pendingItems == 0 {
-		return mrfStats{}
+		return madmin.MRFStatus{}
 	}
 
-	return mrfStats{
-		triggeredAt:  triggeredAt,
-		itemsHealed:  itemsHealed,
-		bytesHealed:  bytesHealed,
-		pendingItems: pendingItems,
-		pendingBytes: pendingBytes,
+	return madmin.MRFStatus{
+		Started:     triggeredAt,
+		ItemsHealed: itemsHealed,
+		BytesHealed: bytesHealed,
+		TotalItems:  itemsHealed + pendingItems,
+		TotalBytes:  bytesHealed + pendingBytes,
 	}
 }
 
@@ -230,13 +244,5 @@ func (m *mrfState) healRoutine() {
 
 // Initialize healing MRF
 func initHealMRF(ctx context.Context, obj ObjectLayer) {
-	globalMRFState = &mrfState{
-		ctx:               ctx,
-		objectAPI:         obj,
-		opCh:              make(chan partialOperation, mrfOpsQueueSize),
-		pendingOps:        make(map[partialOperation]setInfo),
-		setReconnectEvent: make(chan setInfo),
-	}
-	go globalMRFState.maintainMRFList()
-	go globalMRFState.healRoutine()
+	globalMRFState.init(ctx, obj)
 }
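A usage note, not part of the commit: races of this kind are what Go's built-in race detector reports, so a typical way to confirm such a fix is to run the affected package's tests with the -race flag, for example:

	go test -race ./cmd/...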