mrf: Enhance behavior for better results (#11788)

MRF was starting to heal when it receives a disk connection event, which
is not good when a node having multiple disks reconnects to the cluster.

Besides, MRF needs Remove healing option to remove stale files.
This commit is contained in:
Anis Elleuch 2021-03-18 19:19:02 +01:00 committed by GitHub
parent 32b088a2ff
commit 14d89eaae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -45,11 +45,6 @@ import (
// setsDsyncLockers is encapsulated type for Close() // setsDsyncLockers is encapsulated type for Close()
type setsDsyncLockers [][]dsync.NetLocker type setsDsyncLockers [][]dsync.NetLocker
// Information of a new disk connection
type diskConnectInfo struct {
setIndex int
}
const envMinioDeleteCleanupInterval = "MINIO_DELETE_CLEANUP_INTERVAL" const envMinioDeleteCleanupInterval = "MINIO_DELETE_CLEANUP_INTERVAL"
// erasureSets implements ObjectLayer combining a static list of erasure coded // erasureSets implements ObjectLayer combining a static list of erasure coded
@ -89,7 +84,9 @@ type erasureSets struct {
poolIndex int poolIndex int
disksConnectEvent chan diskConnectInfo // A channel to send the set index to the MRF when
// any disk belonging to that set is connected
setReconnectEvent chan int
// Distribution algorithm of choice. // Distribution algorithm of choice.
distributionAlgo string distributionAlgo string
@ -199,6 +196,7 @@ func findDiskIndex(refFormat, format *formatErasureV3) (int, int, error) {
// and re-arranges the disks in proper position. // and re-arranges the disks in proper position.
func (s *erasureSets) connectDisks() { func (s *erasureSets) connectDisks() {
var wg sync.WaitGroup var wg sync.WaitGroup
var setsJustConnected = make([]bool, s.setCount)
diskMap := s.getDiskMap() diskMap := s.getDiskMap()
for _, endpoint := range s.endpoints { for _, endpoint := range s.endpoints {
diskPath := endpoint.String() diskPath := endpoint.String()
@ -253,19 +251,29 @@ func (s *erasureSets) connectDisks() {
disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex) disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String() s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
s.erasureDisksMu.Unlock() s.erasureDisksMu.Unlock()
go func(setIndex int) { setsJustConnected[setIndex] = true
idler := time.NewTimer(100 * time.Millisecond)
defer idler.Stop()
// Send a new disk connect event with a timeout
select {
case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}:
case <-idler.C:
}
}(setIndex)
}(endpoint) }(endpoint)
} }
wg.Wait() wg.Wait()
go func() {
idler := time.NewTimer(100 * time.Millisecond)
defer idler.Stop()
for setIndex, justConnected := range setsJustConnected {
if !justConnected {
continue
}
// Send a new set connect event with a timeout
idler.Reset(100 * time.Millisecond)
select {
case s.setReconnectEvent <- setIndex:
case <-idler.C:
}
}
}()
} }
// monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected
@ -354,7 +362,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
setDriveCount: setDriveCount, setDriveCount: setDriveCount,
defaultParityCount: defaultParityCount, defaultParityCount: defaultParityCount,
format: format, format: format,
disksConnectEvent: make(chan diskConnectInfo), setReconnectEvent: make(chan int),
distributionAlgo: format.Erasure.DistributionAlgo, distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID), deploymentID: uuid.MustParse(format.ID),
mrfOperations: make(map[healSource]int), mrfOperations: make(map[healSource]int),
@ -659,12 +667,12 @@ func (s *erasureSets) Shutdown(ctx context.Context) error {
} }
} }
select { select {
case _, ok := <-s.disksConnectEvent: case _, ok := <-s.setReconnectEvent:
if ok { if ok {
close(s.disksConnectEvent) close(s.setReconnectEvent)
} }
default: default:
close(s.disksConnectEvent) close(s.setReconnectEvent)
} }
return nil return nil
} }
@ -1353,47 +1361,25 @@ func (s *erasureSets) maintainMRFList() {
bucket: fOp.bucket, bucket: fOp.bucket,
object: fOp.object, object: fOp.object,
versionID: fOp.versionID, versionID: fOp.versionID,
opts: &madmin.HealOpts{Remove: true},
}] = fOp.failedSet }] = fOp.failedSet
s.mrfMU.Unlock() s.mrfMU.Unlock()
} }
} }
func toSourceChTimed(t *time.Timer, sourceCh chan healSource, u healSource) {
t.Reset(100 * time.Millisecond)
// No defer, as we don't know which
// case will be selected
select {
case sourceCh <- u:
case <-t.C:
return
}
// We still need to check the return value
// of Stop, because t could have fired
// between the send on sourceCh and this line.
if !t.Stop() {
<-t.C
}
}
// healMRFRoutine monitors new disks connection, sweep the MRF list // healMRFRoutine monitors new disks connection, sweep the MRF list
// to find objects related to the new disk that needs to be healed. // to find objects related to the new disk that needs to be healed.
func (s *erasureSets) healMRFRoutine() { func (s *erasureSets) healMRFRoutine() {
// Wait until background heal state is initialized // Wait until background heal state is initialized
bgSeq := mustGetHealSequence(GlobalContext) bgSeq := mustGetHealSequence(GlobalContext)
idler := time.NewTimer(100 * time.Millisecond) for setIndex := range s.setReconnectEvent {
defer idler.Stop()
for e := range s.disksConnectEvent {
// Get the list of objects related the er.set // Get the list of objects related the er.set
// to which the connected disk belongs. // to which the connected disk belongs.
var mrfOperations []healSource var mrfOperations []healSource
s.mrfMU.Lock() s.mrfMU.Lock()
for k, v := range s.mrfOperations { for k, v := range s.mrfOperations {
if v == e.setIndex { if v == setIndex {
mrfOperations = append(mrfOperations, k) mrfOperations = append(mrfOperations, k)
} }
} }
@ -1401,8 +1387,10 @@ func (s *erasureSets) healMRFRoutine() {
// Heal objects // Heal objects
for _, u := range mrfOperations { for _, u := range mrfOperations {
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
// Send an object to background heal // Send an object to background heal
toSourceChTimed(idler, bgSeq.sourceCh, u) bgSeq.sourceCh <- u
s.mrfMU.Lock() s.mrfMU.Lock()
delete(s.mrfOperations, u) delete(s.mrfOperations, u)