heal multiple buckets in parallel

Harshavardhana 2021-02-01 20:20:12 -08:00
parent 745a4b31ba
commit fa1cd6dcce
3 changed files with 59 additions and 47 deletions
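The change fans out one goroutine per bucket and waits for all of them with a sync.WaitGroup, instead of healing buckets one after another. A minimal, self-contained sketch of that fan-out pattern (healBucket and healAllBuckets are hypothetical stand-ins for the per-bucket work that healErasureSet performs, not MinIO's actual API):

package main

import (
	"context"
	"fmt"
	"sync"
)

// healBucket is a hypothetical stand-in for the per-bucket work that
// healErasureSet performs (walking versions and queueing heal tasks).
func healBucket(ctx context.Context, bucket string) error {
	fmt.Println("healing bucket:", bucket)
	return nil
}

// healAllBuckets heals every bucket concurrently and waits for all of them,
// mirroring the fan-out pattern this commit introduces.
func healAllBuckets(ctx context.Context, buckets []string) {
	var wg sync.WaitGroup
	for _, bucket := range buckets {
		wg.Add(1)
		// Pass the loop variable as an argument so each goroutine works on
		// its own bucket (loop variables were shared before Go 1.22).
		go func(bucket string) {
			defer wg.Done()
			if err := healBucket(ctx, bucket); err != nil {
				fmt.Println("heal failed for", bucket, err)
			}
		}(bucket)
	}
	wg.Wait() // block until every bucket's heal goroutine finishes
}

func main() {
	healAllBuckets(context.Background(), []string{"photos", "logs", "backups"})
}

With this shape a slow or large bucket no longer blocks healing of the others; passing (bucket, disks) into the goroutine, as the diff below does, keeps each worker on its own copy of the loop variables.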

@@ -780,7 +780,8 @@ func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request
 		go func(setNumber int) {
 			defer wg.Done()
 			lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks()
-			if err := healErasureSet(ctx, setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
+			setDriveCount := z.SetDriveCount()
+			if err := healErasureSet(ctx, setNumber, setDriveCount, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
 				logger.LogIf(ctx, err)
 			}
 		}(setNumber)

@@ -168,7 +168,8 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *
 			logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
 			lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks()
-			if err := healErasureSet(ctx, setIndex, 10, time.Second, buckets, lbDisks); err != nil {
+			setDriveCount := z.SetDriveCount()
+			if err := healErasureSet(ctx, setIndex, setDriveCount, 10, time.Second, buckets, lbDisks); err != nil {
 				logger.LogIf(ctx, err)
 				continue
 			}
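For reference, the two call sites throttle healing differently: the admin handler forwards opts.sleepForIO and opts.sleepDuration, while the local-disk heal monitor hard-codes 10 IOs and a one-second sleep. A minimal sketch of such a throttle, where healThrottle is an illustrative simplification rather than MinIO's actual type:

package main

import (
	"fmt"
	"time"
)

// healThrottle is an illustrative simplification of the throttle fields
// (maxIO, maxSleep) that healErasureSet copies onto each heal source.
type healThrottle struct {
	maxIO    int           // upper bound on heal IO operations
	maxSleep time.Duration // pause between heal tasks to limit load on the set
}

func main() {
	// Background disk-heal monitor: fixed throttle (10 IOs, 1s), as in the hunk above.
	// The admin-triggered path takes these values from the request's heal options instead.
	background := healThrottle{maxIO: 10, maxSleep: time.Second}
	fmt.Printf("background heal throttle: %+v\n", background)
}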


@@ -98,7 +98,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
 }
 
 // healErasureSet lists and heals all objects in a specific erasure set
-func healErasureSet(ctx context.Context, setIndex int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error {
+func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error {
 	// Get background heal sequence to send elements to heal
 	var bgSeq *healSequence
 	var ok bool
@@ -128,59 +128,69 @@ func healErasureSet(ctx context.Context, setIndex int, maxIO int, maxSleep time.
 	}) // add metadata .minio.sys/ bucket prefixes to heal
 
 	// Heal all buckets with all objects
+	var wg sync.WaitGroup
 	for _, bucket := range buckets {
-		// Heal current bucket
-		bgSeq.sourceCh <- healSource{
-			bucket: bucket.Name,
-		}
-
-		var entryChs []FileInfoVersionsCh
-		var mu sync.Mutex
-		var wg sync.WaitGroup
-		for _, disk := range disks {
-			disk := disk
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done())
-				if err != nil {
-					// Disk walk returned error, ignore it.
-					return
-				}
-				mu.Lock()
-				entryChs = append(entryChs, FileInfoVersionsCh{
-					Ch: entryCh,
-				})
-				mu.Unlock()
-			}()
-		}
-		wg.Wait()
-
-		entriesValid := make([]bool, len(entryChs))
-		entries := make([]FileInfoVersions, len(entryChs))
-		for {
-			entry, _, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
-			if !ok {
-				break
-			}
-
-			for _, version := range entry.Versions {
-				hsrc := healSource{
-					bucket:    bucket.Name,
-					object:    version.Name,
-					versionID: version.VersionID,
-				}
-				hsrc.throttle.maxIO = maxIO
-				hsrc.throttle.maxSleep = maxSleep
-				if err := bgSeq.queueHealTask(ctx, hsrc, madmin.HealItemObject); err != nil {
-					if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
-						logger.LogIf(ctx, err)
-					}
-				}
-			}
-		}
+		wg.Add(1)
+		go func(bucket BucketInfo, disks []StorageAPI) {
+			defer wg.Done()
+
+			// Heal current bucket
+			bgSeq.sourceCh <- healSource{
+				bucket: bucket.Name,
+			}
+
+			var entryChs []FileInfoVersionsCh
+			var mu sync.Mutex
+			var wwg sync.WaitGroup
+			for _, disk := range disks {
+				wwg.Add(1)
+				go func(disk StorageAPI) {
+					defer wwg.Done()
+					entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done())
+					if err != nil {
+						// Disk walk returned error, ignore it.
+						return
+					}
+					mu.Lock()
+					entryChs = append(entryChs, FileInfoVersionsCh{
+						Ch: entryCh,
+					})
+					mu.Unlock()
+				}(disk)
+			}
+			wwg.Wait()
+
+			entriesValid := make([]bool, len(entryChs))
+			entries := make([]FileInfoVersions, len(entryChs))
+			for {
+				entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
+				if !ok {
+					break
+				}
+
+				if quorumCount == setDriveCount {
+					continue
+				}
+
+				for _, version := range entry.Versions {
+					hsrc := healSource{
+						bucket:    bucket.Name,
+						object:    version.Name,
+						versionID: version.VersionID,
+					}
+					hsrc.throttle.maxIO = maxIO
+					hsrc.throttle.maxSleep = maxSleep
+					if err := bgSeq.queueHealTask(ctx, hsrc, madmin.HealItemObject); err != nil {
+						if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
+							logger.LogIf(ctx, err)
+						}
+					}
+				}
+			}
+		}(bucket, disks)
 	}
+	wg.Wait()
 
 	return nil
 }
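The new setDriveCount argument drives the other behavioral change in this hunk: an entry whose versions were listed by every drive in the set (quorumCount == setDriveCount) is assumed intact and skipped, so only entries missing from at least one drive are queued for healing. A self-contained sketch of that filter, using a hypothetical simplified entry type:

package main

import "fmt"

// listedEntry is a hypothetical, simplified stand-in for a listed object
// version: quorumCount is how many drives in the set returned its metadata.
type listedEntry struct {
	name        string
	quorumCount int
}

// needsHeal mirrors the skip condition added in healErasureSet: an entry
// present on every drive of the set needs no repair and is not queued.
func needsHeal(e listedEntry, setDriveCount int) bool {
	return e.quorumCount != setDriveCount
}

func main() {
	const setDriveCount = 16
	entries := []listedEntry{
		{name: "photos/a.jpg", quorumCount: 16}, // healthy: skipped
		{name: "photos/b.jpg", quorumCount: 12}, // degraded: queued for heal
	}
	for _, e := range entries {
		fmt.Println(e.name, "needs heal:", needsHeal(e, setDriveCount))
	}
}

In the diff above, the comparison uses the setDriveCount value threaded in from z.SetDriveCount() at both call sites.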