Parallelize new disks healing of different erasure sets (#15112)

- Always reformat all disks when a new disk is detected, this will
  ensure new uploads to be written in new fresh disks
- Always heal all buckets first when an erasure set started to be healed
- Use a lock to prevent two disks belonging to different nodes but in
  the same erasure set to be healed in parallel
- Heal different sets in parallel

Bonus:
- Avoid logging errUnformattedDisk when a new fresh disk is inserted but
  not detected by healing mechanism yet (10 seconds lag)
This commit is contained in:
Anis Elleuch
2022-06-21 15:53:55 +01:00
committed by GitHub
parent 95b51c48be
commit b3eda248a3
4 changed files with 154 additions and 171 deletions

View File

@@ -91,8 +91,11 @@ type allHealState struct {
sync.RWMutex
// map of heal path to heal sequence
healSeqMap map[string]*healSequence // Indexed by endpoint
healLocalDisks map[Endpoint]struct{}
healSeqMap map[string]*healSequence // Indexed by endpoint
// keep track of the healing status of disks in the memory
// false: the disk needs to be healed but no healing routine is started
// true: the disk is currently healing
healLocalDisks map[Endpoint]bool
healStatus map[string]healingTracker // Indexed by disk ID
}
@@ -100,7 +103,7 @@ type allHealState struct {
func newHealState(cleanup bool) *allHealState {
hstate := &allHealState{
healSeqMap: make(map[string]*healSequence),
healLocalDisks: map[Endpoint]struct{}{},
healLocalDisks: make(map[Endpoint]bool),
healStatus: make(map[string]healingTracker),
}
if cleanup {
@@ -109,13 +112,6 @@ func newHealState(cleanup bool) *allHealState {
return hstate
}
func (ahs *allHealState) healDriveCount() int {
ahs.RLock()
defer ahs.RUnlock()
return len(ahs.healLocalDisks)
}
func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock()
defer ahs.Unlock()
@@ -165,23 +161,34 @@ func (ahs *allHealState) getLocalHealingDisks() map[string]madmin.HealingDisk {
return dst
}
// getHealLocalDiskEndpoints() returns the list of disks that need
// to be healed but there is no healing routine in progress on them.
func (ahs *allHealState) getHealLocalDiskEndpoints() Endpoints {
ahs.RLock()
defer ahs.RUnlock()
var endpoints Endpoints
for ep := range ahs.healLocalDisks {
endpoints = append(endpoints, ep)
for ep, healing := range ahs.healLocalDisks {
if !healing {
endpoints = append(endpoints, ep)
}
}
return endpoints
}
func (ahs *allHealState) markDiskForHealing(ep Endpoint) {
ahs.Lock()
defer ahs.Unlock()
ahs.healLocalDisks[ep] = true
}
func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock()
defer ahs.Unlock()
for _, ep := range healLocalDisks {
ahs.healLocalDisks[ep] = struct{}{}
ahs.healLocalDisks[ep] = false
}
}
@@ -804,16 +811,6 @@ func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) f
}
}
// healDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func (h *healSequence) healDiskFormat() error {
if h.isQuitting() {
return errHealStopSignalled
}
return h.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata)
}
// healBuckets - check for all buckets heal or just particular bucket.
func (h *healSequence) healBuckets(objAPI ObjectLayer, bucketsOnly bool) error {
if h.isQuitting() {