Mirror of https://github.com/minio/minio.git, synced 2025-11-07 21:02:58 -05:00
simplify MRF, converge it to regular healing (#17026)
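In short, this change removes the MRF subsystem's pending-operations map, its per-set disk-reconnection events, and its per-round healing statistics. Partial operations now flow through a single buffered channel (enlarged from 10000 to 100000 entries), and healRoutine drains that channel directly into the regular healing path via healBucket and healObject, pacing itself with a dynamic sleeper.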
cmd/mrf.go (183 changed lines)

--- a/cmd/mrf.go
+++ b/cmd/mrf.go
@@ -20,16 +20,13 @@ package cmd
 import (
 	"context"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/minio/madmin-go/v2"
-	"github.com/minio/minio/internal/logger"
 )
 
 const (
-	mrfInfoResetInterval = 10 * time.Second
-	mrfOpsQueueSize      = 10000
+	mrfOpsQueueSize = 100000
 )
 
 // partialOperation is a successful upload/delete of an object
@@ -38,35 +35,17 @@ type partialOperation struct {
 	bucket    string
 	object    string
 	versionID string
-	size      int64
-	setIndex  int
-	poolIndex int
-}
-
-type setInfo struct {
-	index, pool int
+	queued    time.Time
 }
 
 // mrfState sncapsulates all the information
 // related to the global background MRF.
 type mrfState struct {
-	ready int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
-	_     int32 // For 64 bits alignment
-
 	ctx       context.Context
 	objectAPI ObjectLayer
 
-	mu                sync.Mutex
-	opCh              chan partialOperation
-	pendingOps        map[partialOperation]setInfo
-	setReconnectEvent chan setInfo
-
-	itemsHealed  uint64
-	bytesHealed  uint64
-	pendingItems uint64
-	pendingBytes uint64
-
-	triggeredAt time.Time
+	mu   sync.Mutex
+	opCh chan partialOperation
 }
 
 // Initialize healing MRF subsystem
@@ -75,24 +54,15 @@ func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) {
 	defer m.mu.Unlock()
 
 	m.ctx = ctx
 	m.objectAPI = objAPI
 	m.opCh = make(chan partialOperation, mrfOpsQueueSize)
-	m.pendingOps = make(map[partialOperation]setInfo)
-	m.setReconnectEvent = make(chan setInfo)
 
-	go globalMRFState.maintainMRFList()
 	go globalMRFState.healRoutine()
-
-	atomic.StoreInt32(&m.ready, 1)
-}
-
-func (m *mrfState) initialized() bool {
-	return atomic.LoadInt32(&m.ready) != 0
 }
 
 // Add a partial S3 operation (put/delete) when one or more disks are offline.
 func (m *mrfState) addPartialOp(op partialOperation) {
-	if !m.initialized() {
+	if m == nil {
 		return
 	}
 
@@ -102,140 +72,39 @@ func (m *mrfState) addPartialOp(op partialOperation) {
 	}
 }
 
-// Receive the new set (disk) reconnection event
-func (m *mrfState) newSetReconnected(pool, set int) {
-	if !m.initialized() {
-		return
-	}
-
-	idler := time.NewTimer(100 * time.Millisecond)
-	defer idler.Stop()
-
-	select {
-	case m.setReconnectEvent <- setInfo{index: set, pool: pool}:
-	case <-idler.C:
-	}
-}
-
-// Get current MRF stats of the last MRF activity
-func (m *mrfState) getCurrentMRFRoundInfo() madmin.MRFStatus {
-	m.mu.Lock()
-	triggeredAt := m.triggeredAt
-	itemsHealed := m.itemsHealed
-	bytesHealed := m.bytesHealed
-	pendingItems := m.pendingItems
-	pendingBytes := m.pendingBytes
-	m.mu.Unlock()
-
-	if pendingItems == 0 {
-		return madmin.MRFStatus{}
-	}
-
-	return madmin.MRFStatus{
-		Started:     triggeredAt,
-		ItemsHealed: itemsHealed,
-		BytesHealed: bytesHealed,
-		TotalItems:  itemsHealed + pendingItems,
-		TotalBytes:  bytesHealed + pendingBytes,
-	}
-}
-
-// maintainMRFList gathers the list of successful partial uploads
-// from all underlying er.sets and puts them in a global map which
-// should not have more than 10000 entries.
-func (m *mrfState) maintainMRFList() {
-	for fOp := range m.opCh {
-		m.mu.Lock()
-		if len(m.pendingOps) > mrfOpsQueueSize {
-			m.mu.Unlock()
-			continue
-		}
-
-		m.pendingOps[fOp] = setInfo{index: fOp.setIndex, pool: fOp.poolIndex}
-		m.pendingItems++
-		if fOp.size > 0 {
-			m.pendingBytes += uint64(fOp.size)
-		}
-
-		m.mu.Unlock()
-	}
-}
-
-// Reset current MRF stats
-func (m *mrfState) resetMRFInfoIfNoPendingOps() {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-
-	if m.pendingItems > 0 {
-		return
-	}
-
-	m.itemsHealed = 0
-	m.bytesHealed = 0
-	m.pendingItems = 0
-	m.pendingBytes = 0
-	m.triggeredAt = time.Time{}
-}
+var healSleeper = newDynamicSleeper(5, time.Second, false)
 
 // healRoutine listens to new disks reconnection events and
 // issues healing requests for queued objects belonging to the
 // corresponding erasure set
 func (m *mrfState) healRoutine() {
-	idler := time.NewTimer(mrfInfoResetInterval)
-	defer idler.Stop()
-
-	mrfHealingOpts := madmin.HealOpts{
-		ScanMode: madmin.HealNormalScan,
-		Remove:   healDeleteDangling,
-	}
-
 	for {
 		select {
 		case <-m.ctx.Done():
 			return
-		case <-idler.C:
-			m.resetMRFInfoIfNoPendingOps()
-			idler.Reset(mrfInfoResetInterval)
-		case setInfo := <-m.setReconnectEvent:
-			// Get the list of objects related the er.set
-			// to which the connected disk belongs.
-			var mrfOperations []partialOperation
-			m.mu.Lock()
-			for k, v := range m.pendingOps {
-				if v == setInfo {
-					mrfOperations = append(mrfOperations, k)
-				}
-			}
-			m.mu.Unlock()
-
-			if len(mrfOperations) == 0 {
-				continue
+		case u, ok := <-m.opCh:
+			if !ok {
+				return
 			}
 
-			m.mu.Lock()
-			m.triggeredAt = time.Now().UTC()
-			m.mu.Unlock()
-
-			// Heal objects
-			for _, u := range mrfOperations {
-				waitForLowHTTPReq()
-				_, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts)
-				m.mu.Lock()
-				if err == nil {
-					m.itemsHealed++
-					m.bytesHealed += uint64(u.size)
-				}
-				m.pendingItems--
-				m.pendingBytes -= uint64(u.size)
-				delete(m.pendingOps, u)
-				m.mu.Unlock()
-
-				if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
-					// Log healing error if any
-					logger.LogIf(m.ctx, err)
-				}
+			now := time.Now()
+			if now.Sub(u.queued) < time.Second {
+				// let recently failed networks to reconnect
+				// making MRF wait for 1s before retrying,
+				// i.e 4 reconnect attempts.
+				time.Sleep(1 * time.Second)
 			}
+
+			// wait on timer per heal
+			wait := healSleeper.Timer(context.Background())
+
+			if u.object == "" {
+				healBucket(u.bucket, madmin.HealNormalScan)
+			} else {
+				healObject(u.bucket, u.object, u.versionID, madmin.HealNormalScan)
+			}
+
+			wait()
 		}
 	}
 }
-
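To make the converged flow concrete, the following is a minimal, self-contained sketch of the queue-and-drain pattern the new mrfState implements. The names partialOp, mrf, and add, and the fmt.Println placeholder, are simplified stand-ins invented for this sketch, not MinIO identifiers; only the shapes of partialOperation, addPartialOp, and healRoutine are taken from the diff above.

package main

import (
	"context"
	"fmt"
	"time"
)

// partialOp mirrors the commit's trimmed-down partialOperation: just
// enough to identify the object, plus the time it was queued.
type partialOp struct {
	bucket, object, versionID string
	queued                    time.Time
}

// mrf mirrors the simplified mrfState: one buffered channel, no pending
// map, no reconnection events, no per-round statistics.
type mrf struct {
	opCh chan partialOp
}

// add is the non-blocking enqueue from the diff: when the buffer is full
// the op is dropped, leaving the object for regular healing to find.
func (m *mrf) add(op partialOp) {
	select {
	case m.opCh <- op:
	default:
	}
}

// healRoutine drains the queue. Very fresh ops are delayed for a second,
// giving a transiently failed network a chance to reconnect first.
func (m *mrf) healRoutine(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case u := <-m.opCh:
			if time.Since(u.queued) < time.Second {
				time.Sleep(time.Second)
			}
			// The real code converges here with regular healing via
			// healBucket (when u.object == "") or healObject.
			fmt.Println("heal:", u.bucket, u.object, u.versionID)
		}
	}
}

func main() {
	m := &mrf{opCh: make(chan partialOp, 100000)}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	go m.healRoutine(ctx)
	m.add(partialOp{bucket: "mybucket", object: "a/b.txt", queued: time.Now()})
	<-ctx.Done()
}

Dropping ops on overflow is deliberate: the MRF queue is an optimization, and anything it misses is eventually repaired by the regular scanner-driven healing, which is exactly the convergence this commit is about.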
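The wait := healSleeper.Timer(...) / wait() pairing in the new healRoutine paces heals with MinIO's newDynamicSleeper helper. The sketch below is a hypothetical reimplementation of a throttle of that general shape, assuming its behavior is "sleep factor times the measured work duration, capped at a maximum"; it is not MinIO's actual helper.

package main

import (
	"fmt"
	"time"
)

// sleeper is a hypothetical stand-in for the commit's healSleeper: after a
// unit of work, sleep for factor x the time the work took, capped at
// maxSleep, so heavier heals are followed by proportionally longer pauses.
type sleeper struct {
	factor   float64
	maxSleep time.Duration
}

// Timer records a start time and returns a func that performs the sleep.
func (s sleeper) Timer() func() {
	start := time.Now()
	return func() {
		d := time.Duration(float64(time.Since(start)) * s.factor)
		if d > s.maxSleep {
			d = s.maxSleep
		}
		time.Sleep(d)
	}
}

func main() {
	// Mirrors the shape of newDynamicSleeper(5, time.Second, false):
	// sleep 5x the work duration, but never more than one second.
	healPacer := sleeper{factor: 5, maxSleep: time.Second}

	wait := healPacer.Timer()
	time.Sleep(50 * time.Millisecond) // pretend this is one heal
	wait()                            // pauses ~250ms before the next heal

	fmt.Println("done")
}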