heal sets with optional prefix input

This commit is contained in:
Harshavardhana 2021-02-05 11:03:42 -08:00
parent 42157eb218
commit 0615d85384
5 changed files with 28 additions and 9 deletions

View File

@ -63,6 +63,7 @@ const (
healSetsUUID = "healSetsUUID" healSetsUUID = "healSetsUUID"
healSetsList = "healSetsList" healSetsList = "healSetsList"
healSetsPrefix = "healSetsPrefix"
healSleepDuration = "healSleepDuration" healSleepDuration = "healSleepDuration"
healSleepMaxIO = "healSleepMaxIO" healSleepMaxIO = "healSleepMaxIO"
) )
@ -779,9 +780,8 @@ func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request
wg.Add(1) wg.Add(1)
go func(setNumber int) { go func(setNumber int) {
defer wg.Done() defer wg.Done()
lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks() lbDisks := z.serverSets[0].sets[setNumber].getDisks()
setDriveCount := z.SetDriveCount() if err := healErasureSet(ctx, vars[healSetsPrefix], setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
if err := healErasureSet(ctx, setNumber, setDriveCount, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
}(setNumber) }(setNumber)
@ -791,7 +791,11 @@ func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request
opts.cancel() opts.cancel()
delete(a.healSetsMap, opts.taskUUID) delete(a.healSetsMap, opts.taskUUID)
a.mu.Unlock() a.mu.Unlock()
logger.Info("Healing finished for %v", vars[healSetsList]) if vars[healSetsPrefix] != "" {
logger.Info("Healing finished for %v at %s", vars[healSetsList], vars[healSetsPrefix])
} else {
logger.Info("Healing finished for %v", vars[healSetsList])
}
}() }()
a.mu.Lock() a.mu.Lock()

View File

@ -85,7 +85,10 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
Queries(healSetsUUID, "{healSetsUUID:.*}") Queries(healSetsUUID, "{healSetsUUID:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/heal-sets"). adminRouter.Methods(http.MethodPost).Path(adminVersion+"/heal-sets").
HandlerFunc(httpTraceHdrs(adminAPI.HealSetsHandler)). HandlerFunc(httpTraceHdrs(adminAPI.HealSetsHandler)).
Queries(healSetsList, "{healSetsList:.*}", healSleepMaxIO, "{healSleepMaxIO:.*}", healSleepDuration, "{healSleepDuration:.*}") Queries(healSetsList, "{healSetsList:.*}",
healSetsPrefix, "{healSetsPrefix:.*}",
healSleepMaxIO, "{healSleepMaxIO:.*}",
healSleepDuration, "{healSleepDuration:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(httpTraceHdrs(adminAPI.BackgroundHealStatusHandler)) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(httpTraceHdrs(adminAPI.BackgroundHealStatusHandler))
/// Health operations /// Health operations

View File

@ -168,8 +168,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1)) logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks() lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks()
setDriveCount := z.SetDriveCount() if err := healErasureSet(ctx, "", setIndex, 100, time.Second, buckets, lbDisks); err != nil {
if err := healErasureSet(ctx, setIndex, setDriveCount, 10, time.Second, buckets, lbDisks); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }

View File

@ -98,7 +98,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
} }
// healErasureSet lists and heals all objects in a specific erasure set // healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error { func healErasureSet(ctx context.Context, prefix string, setIndex int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error {
// Get background heal sequence to send elements to heal // Get background heal sequence to send elements to heal
var bgSeq *healSequence var bgSeq *healSequence
var ok bool var ok bool
@ -115,6 +115,13 @@ func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO
} }
} }
obj := newObjectLayerFn()
if obj == nil {
return errServerNotInitialized
}
setDriveCount := obj.SetDriveCount()
// Try to pro-actively heal backend-encrypted file. // Try to pro-actively heal backend-encrypted file.
bgSeq.sourceCh <- healSource{ bgSeq.sourceCh <- healSource{
bucket: minioMetaBucket, bucket: minioMetaBucket,
@ -146,7 +153,11 @@ func healErasureSet(ctx context.Context, setIndex int, setDriveCount int, maxIO
wwg.Add(1) wwg.Add(1)
go func(disk StorageAPI) { go func(disk StorageAPI) {
defer wwg.Done() defer wwg.Done()
entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done()) if disk == nil {
// disk is nil and not available.
return
}
entryCh, err := disk.WalkVersions(ctx, bucket.Name, prefix, "", true, ctx.Done())
if err != nil { if err != nil {
// Disk walk returned error, ignore it. // Disk walk returned error, ignore it.
return return

View File

@ -213,6 +213,7 @@ func (hri *HealResultItem) GetOnlineCounts() (b, a int) {
type HealSetsOpts struct { type HealSetsOpts struct {
TaskID string TaskID string
Sets string // comma separated list of set numbers Sets string // comma separated list of set numbers
Prefix string // folder at which healing should trigger
SleepMaxIO string // maximum IO tolerance after which healing would sleep SleepMaxIO string // maximum IO tolerance after which healing would sleep
// maximum sleep duration between objects to slow down heal operation // maximum sleep duration between objects to slow down heal operation
// only applied in conjunction with maxIO. // only applied in conjunction with maxIO.
@ -245,6 +246,7 @@ func (adm *AdminClient) CancelHealSets(ctx context.Context, opts HealSetsOpts) e
func (adm *AdminClient) HealSets(ctx context.Context, opts HealSetsOpts) (string, error) { func (adm *AdminClient) HealSets(ctx context.Context, opts HealSetsOpts) (string, error) {
queryVals := make(url.Values) queryVals := make(url.Values)
queryVals.Set("healSetsList", opts.Sets) queryVals.Set("healSetsList", opts.Sets)
queryVals.Set("healSetsPrefix", opts.Prefix)
queryVals.Set("healSleepMaxIO", opts.SleepMaxIO) queryVals.Set("healSleepMaxIO", opts.SleepMaxIO)
queryVals.Set("healSleepDuration", opts.SleepMax) queryVals.Set("healSleepDuration", opts.SleepMax)