From 8724d491164fd246202dafde7c4c479f39cd5ad8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 22 Jan 2021 15:44:39 -0800 Subject: [PATCH] implement Heal sets API to heal erasure sets independently --- cmd/admin-handlers.go | 155 +++++++++++++++++++++++++++- cmd/admin-heal-ops.go | 46 ++++++--- cmd/admin-router.go | 25 +++-- cmd/background-heal-ops.go | 40 ++++--- cmd/background-newdisks-heal-ops.go | 4 +- cmd/data-crawler.go | 11 +- cmd/erasure-healing.go | 35 ++++--- cmd/erasure-metadata-utils.go | 7 +- cmd/global-heal.go | 23 +++-- pkg/madmin/heal-commands.go | 65 ++++++++++++ 10 files changed, 344 insertions(+), 67 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 8b14cb600..6b3cb92f8 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -59,6 +59,11 @@ const ( mgmtClientToken = "clientToken" mgmtForceStart = "forceStart" mgmtForceStop = "forceStop" + + healSetsUUID = "healSetsUUID" + healSetsList = "healSetsList" + healSleepDuration = "healSleepDuration" + healSleepMaxIO = "healSleepMaxIO" ) func updateServer(u *url.URL, sha256Sum []byte, lrTime time.Time, mode string) (us madmin.ServerUpdateStatus, err error) { @@ -635,6 +640,154 @@ func extractHealInitParams(vars map[string]string, qParms url.Values, r io.Reade return } +type healInitSetParams struct { + taskUUID string + setNumbers []int + sleepDuration time.Duration + sleepForIO int + cancel func() +} + +// CancelHealSetsHandler - POST /minio/admin/v3/cancel-heal-sets/ +func (a adminAPIHandlers) CancelHealSetsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "CancelHeal") + + defer logger.AuditLog(w, r, "CancelHeal", mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction) + if objectAPI == nil { + return + } + + // Check if this setup has an erasure coded backend. + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) + return + } + + z, ok := objectAPI.(*erasureServerSets) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) + return + } + + if !z.SingleZone() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + taskUUID := vars[healSetsUUID] + if taskUUID == "" { + writeErrorResponseJSON(ctx, w, APIError{ + Code: "XMinioHealNoSuchProcess", + Description: "No such heal process is running on the server", + HTTPStatusCode: http.StatusNotFound, + }, r.URL) + return + } + + a.mu.Lock() + defer a.mu.Unlock() + opts, ok := a.healSetsMap[taskUUID] + if !ok { + writeErrorResponseJSON(ctx, w, APIError{ + Code: "XMinioHealNoSuchProcess", + Description: "No such heal process is running on the server", + HTTPStatusCode: http.StatusNotFound, + }, r.URL) + return + } + + opts.cancel() + delete(a.healSetsMap, opts.taskUUID) +} + +// HealSetsHandler - POST /minio/admin/v3/heal-sets/ +func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "HealSets") + + defer logger.AuditLog(w, r, "HealSets", mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction) + if objectAPI == nil { + return + } + + // Check if this setup has an erasure coded backend. + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) + return + } + + z, ok := objectAPI.(*erasureServerSets) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) + return + } + + if !z.SingleZone() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + opts := healInitSetParams{ + taskUUID: mustGetUUID(), + } + for _, setIdx := range strings.Split(vars[healSetsList], ",") { + if setIdx == "" { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errors.New("empty values not allowed")), r.URL) + return + } + i, err := strconv.Atoi(setIdx) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + if i == 0 { + i = 1 + } + opts.setNumbers = append(opts.setNumbers, i-1) + } + + opts.sleepDuration = time.Second + var err error + if v := vars[healSleepDuration]; v != "" { + opts.sleepDuration, err = time.ParseDuration(v) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } + + opts.sleepForIO = 10 + if v := vars[healSleepMaxIO]; v != "" { + opts.sleepForIO, err = strconv.Atoi(v) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + } + + buckets, _ := objectAPI.ListBucketsHeal(ctx) + ctx, opts.cancel = context.WithCancel(context.Background()) + for _, setNumber := range opts.setNumbers { + go func(setNumber int) { + lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks() + if err := healErasureSet(ctx, setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil { + logger.LogIf(ctx, err) + } + }(setNumber) + } + + a.mu.Lock() + a.healSetsMap[opts.taskUUID] = opts + a.mu.Unlock() + + writeSuccessResponseJSON(w, []byte(fmt.Sprintf(`"%s"`, opts.taskUUID))) +} + // HealHandler - POST /minio/admin/v3/heal/ // ----------- // Start heal processing and return heal status items. @@ -875,7 +1028,7 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * return } - aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context()) + aggregateHealStateResult, err := getAggregatedBackgroundHealState(ctx) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index b7d71d2e1..85e145795 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -339,7 +339,11 @@ type healSource struct { bucket string object string versionID string - opts *madmin.HealOpts // optional heal option overrides default setting + throttle struct { + maxSleep time.Duration + maxIO int + } + opts *madmin.HealOpts // optional heal option overrides default setting } // healSequence - state for each heal sequence initiated on the @@ -656,19 +660,29 @@ func (h *healSequence) healSequenceStart() { } } -func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { +func (h *healSequence) queueHealTask(ctx context.Context, source healSource, healType madmin.HealItemType) error { // Send heal request task := healTask{ - bucket: source.bucket, - object: source.object, - versionID: source.versionID, - opts: h.settings, - responseCh: h.respCh, + bucket: source.bucket, + object: source.object, + versionID: source.versionID, + opts: h.settings, + responseCh: h.respCh, + sleepForIO: globalEndpoints.NEndpoints(), + sleepDuration: time.Second, } if source.opts != nil { task.opts = *source.opts } + if source.throttle.maxIO > 0 { + task.sleepForIO = source.throttle.maxIO + } + + if source.throttle.maxSleep > 0 { + task.sleepDuration = source.throttle.maxSleep + } + h.mutex.Lock() h.scannedItemsMap[healType]++ h.lastHealActivity = UTCNow() @@ -677,6 +691,8 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem globalBackgroundHealRoutine.queueHealTask(task) select { + case <-ctx.Done(): + return nil case res := <-h.respCh: if !h.reportProgress { // Object might have been deleted, by the time heal @@ -746,12 +762,8 @@ func (h *healSequence) healItemsFromSourceCh() error { itemType = madmin.HealItemObject } - if err := h.queueHealTask(source, itemType); err != nil { - switch err.(type) { - case ObjectExistsAsDirectory: - case ObjectNotFound: - case VersionNotFound: - default: + if err := h.queueHealTask(context.Background(), source, itemType); err != nil { + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w", pathJoin(source.bucket, source.object), err)) } @@ -821,7 +833,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error { return errHealStopSignalled } - err := h.queueHealTask(healSource{ + err := h.queueHealTask(context.Background(), healSource{ bucket: bucket, object: object, versionID: versionID, @@ -849,7 +861,7 @@ func (h *healSequence) healDiskFormat() error { return errServerNotInitialized } - return h.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata) + return h.queueHealTask(context.Background(), healSource{bucket: SlashSeparator}, madmin.HealItemMetadata) } // healBuckets - check for all buckets heal or just particular bucket. @@ -891,7 +903,7 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error { return errServerNotInitialized } - if err := h.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket); err != nil { + if err := h.queueHealTask(context.Background(), healSource{bucket: bucket}, madmin.HealItemBucket); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { return err } @@ -941,7 +953,7 @@ func (h *healSequence) healObject(bucket, object, versionID string) error { return errHealStopSignalled } - err := h.queueHealTask(healSource{ + err := h.queueHealTask(context.Background(), healSource{ bucket: bucket, object: object, versionID: versionID, diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 3c52c1ce5..9d1c34818 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -18,6 +18,7 @@ package cmd import ( "net/http" + "sync" "github.com/gorilla/mux" "github.com/minio/minio/cmd/config" @@ -34,12 +35,18 @@ const ( ) // adminAPIHandlers provides HTTP handlers for MinIO admin API. -type adminAPIHandlers struct{} +type adminAPIHandlers struct { + mu sync.Mutex + healSetsMap map[string]healInitSetParams +} // registerAdminRouter - Add handler functions for each service REST API routes. func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) { - adminAPI := adminAPIHandlers{} + adminAPI := adminAPIHandlers{ + healSetsMap: make(map[string]healInitSetParams), + } + // Admin router adminRouter := router.PathPrefix(adminPathPrefix).Subrouter() @@ -68,11 +75,17 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) /// Heal operations // Heal processing endpoint. - adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/").HandlerFunc(httpTraceAll(adminAPI.HealHandler)) - adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}").HandlerFunc(httpTraceAll(adminAPI.HealHandler)) - adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}/{prefix:.*}").HandlerFunc(httpTraceAll(adminAPI.HealHandler)) + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/").HandlerFunc(httpTraceHdrs(adminAPI.HealHandler)) + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}").HandlerFunc(httpTraceHdrs(adminAPI.HealHandler)) + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}/{prefix:.*}").HandlerFunc(httpTraceHdrs(adminAPI.HealHandler)) - adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(httpTraceAll(adminAPI.BackgroundHealStatusHandler)) + adminRouter.Methods(http.MethodPost).Path(adminVersion+"/cancel-heal-sets"). + HandlerFunc(httpTraceHdrs(adminAPI.CancelHealSetsHandler)). + Queries(healSetsUUID, "{healSetsUUID:.*}") + adminRouter.Methods(http.MethodPost).Path(adminVersion+"/heal-sets"). + HandlerFunc(httpTraceHdrs(adminAPI.HealSetsHandler)). + Queries(healSetsList, "{healSetsList:.*}", healSleepMaxIO, "{healSleepMaxIO:.*}", healSleepDuration, "{healSleepDuration:.*}") + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(httpTraceHdrs(adminAPI.BackgroundHealStatusHandler)) /// Health operations diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index a5f511c50..041345770 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -29,10 +29,12 @@ import ( // path: 'bucket/' or '/bucket/' => Heal bucket // path: 'bucket/object' => Heal object type healTask struct { - bucket string - object string - versionID string - opts madmin.HealOpts + bucket string + object string + versionID string + sleepDuration time.Duration + sleepForIO int + opts madmin.HealOpts // Healing response will be sent here responseCh chan healResult } @@ -54,20 +56,32 @@ func (h *healRoutine) queueHealTask(task healTask) { h.tasks <- task } -func waitForLowHTTPReq(tolerance int32, maxWait time.Duration) { - const wait = 10 * time.Millisecond - waitCount := maxWait / wait +func waitForLowHTTPReq(maxIO int, maxWait time.Duration) { + // No need to wait run at full speed. + if maxIO <= 0 { + return + } + + waitTick := 100 * time.Millisecond // Bucket notification and http trace are not costly, it is okay to ignore them // while counting the number of concurrent connections - tolerance += int32(globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers()) + maxIOFn := func() int { + return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers()) + } if httpServer := newHTTPServerFn(); httpServer != nil { // Any requests in progress, delay the heal. - for (httpServer.GetRequestCount() >= tolerance) && - waitCount > 0 { - waitCount-- - time.Sleep(wait) + for httpServer.GetRequestCount() >= int32(maxIOFn()) { + if maxWait < waitTick { + time.Sleep(maxWait) + } else { + time.Sleep(waitTick) + } + maxWait = maxWait - waitTick + if maxWait <= 0 { + return + } } } } @@ -82,7 +96,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { } // Wait and proceed if there are active requests - waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second) + waitForLowHTTPReq(task.sleepForIO, task.sleepDuration) var res madmin.HealResultItem var err error diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 5bfc28242..15ac3ec8c 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -114,7 +114,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq * case <-ctx.Done(): return case <-time.After(defaultMonitorNewDiskInterval): - waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second) + waitForLowHTTPReq(globalEndpoints.NEndpoints(), time.Second) var erasureSetInZoneDisksToHeal []map[int][]StorageAPI @@ -168,7 +168,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq * logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1)) lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks() - if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil { + if err := healErasureSet(ctx, setIndex, 10, time.Second, buckets, lbDisks); err != nil { logger.LogIf(ctx, err) continue } diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 6d2ee9250..1b980d3f6 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -497,11 +497,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo defer func() { t = UTCNow() }() - return bgSeq.queueHealTask(healSource{ - bucket: bucket, - object: object, - versionID: versionID, - }, madmin.HealItemObject) + return bgSeq.queueHealTask(ctx, + healSource{ + bucket: bucket, + object: object, + versionID: versionID, + }, madmin.HealItemObject) }) sleepDuration(time.Since(t), f.dataUsageCrawlMult) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 754aa5e2c..033ba143a 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -98,16 +98,13 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints errs := g.Wait() - reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1) - if reducedErr == errVolumeNotFound { - return res, nil - } - // Initialize heal result info res = madmin.HealResultItem{ - Type: madmin.HealItemBucket, - Bucket: bucket, - DiskCount: len(storageDisks), + Type: madmin.HealItemBucket, + Bucket: bucket, + DiskCount: len(storageDisks), + ParityBlocks: len(storageDisks) / 2, + DataBlocks: len(storageDisks) / 2, } for i := range beforeState { @@ -118,6 +115,18 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints }) } + reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1) + if reducedErr == errVolumeNotFound { + for i := range beforeState { + res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{ + UUID: "", + Endpoint: storageEndpoints[i], + State: madmin.DriveStateOk, + }) + } + return res, nil + } + // Initialize sync waitgroup. g = errgroup.WithNErrs(len(storageDisks)) @@ -221,8 +230,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s partsMetadata []FileInfo, errs []error, latestFileInfo FileInfo, dryRun bool, remove bool, scanMode madmin.HealScanMode) (result madmin.HealResultItem, err error) { - dataBlocks := latestFileInfo.Erasure.DataBlocks - storageDisks := er.getDisks() storageEndpoints := er.getEndpoints() @@ -306,7 +313,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // If less than read quorum number of disks have all the parts // of the data, we can't reconstruct the erasure-coded data. - if numAvailableDisks < dataBlocks { + if numAvailableDisks < result.DataBlocks { // Check if er.meta, and corresponding parts are also missing. if m, ok := isObjectDangling(partsMetadata, errs, dataErrs); ok { writeQuorum := m.Erasure.DataBlocks + 1 @@ -338,9 +345,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // Latest FileInfo for reference. If a valid metadata is not // present, it is as good as object not found. - latestMeta, pErr := pickValidFileInfo(ctx, partsMetadata, modTime, dataBlocks) - if pErr != nil { - return result, toObjectErr(pErr, bucket, object) + latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks) + if err != nil { + return result, toObjectErr(err, bucket, object) } cleanFileInfo := func(fi FileInfo) FileInfo { diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index 1dbdedbfc..91ab9e63f 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -140,7 +140,12 @@ func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, objec } metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir) if err != nil { - if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound { + if !IsErr(err, []error{ + errFileNotFound, + errVolumeNotFound, + errFileVersionNotFound, + errDiskNotFound, + }...) { logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String()) logger.LogIf(ctx, err) } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 8a2cad8b3..d105d48fe 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -97,7 +97,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error { +func healErasureSet(ctx context.Context, setIndex int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error { // Get background heal sequence to send elements to heal var bgSeq *healSequence var ok bool @@ -114,18 +114,18 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis } } - buckets = append(buckets, BucketInfo{ - Name: pathJoin(minioMetaBucket, minioConfigPrefix), - }, BucketInfo{ - Name: pathJoin(minioMetaBucket, bucketConfigPrefix), - }) // add metadata .minio.sys/ bucket prefixes to heal - // Try to pro-actively heal backend-encrypted file. bgSeq.sourceCh <- healSource{ bucket: minioMetaBucket, object: backendEncryptedFile, } + buckets = append(buckets, BucketInfo{ + Name: pathJoin(minioMetaBucket, minioConfigPrefix), + }, BucketInfo{ + Name: pathJoin(minioMetaBucket, bucketConfigPrefix), + }) // add metadata .minio.sys/ bucket prefixes to heal + // Heal all buckets with all objects for _, bucket := range buckets { // Heal current bucket @@ -165,11 +165,18 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis } for _, version := range entry.Versions { - bgSeq.sourceCh <- healSource{ + hsrc := healSource{ bucket: bucket.Name, object: version.Name, versionID: version.VersionID, } + hsrc.throttle.maxIO = maxIO + hsrc.throttle.maxSleep = maxSleep + if err := bgSeq.queueHealTask(ctx, hsrc, madmin.HealItemObject); err != nil { + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } + } } } } diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index 689c5a848..63877ec6b 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -209,6 +209,71 @@ func (hri *HealResultItem) GetOnlineCounts() (b, a int) { return } +// HealSetsOpts heal sets options +type HealSetsOpts struct { + TaskID string + Sets string // comma separated list of set numbers + SleepMaxIO string // maximum IO tolerance after which healing would sleep + // maximum sleep duration between objects to slow down heal operation + // only applied in conjunction with maxIO. + SleepMax string +} + +// CancelHealSets cancels task started with HealSets() +func (adm *AdminClient) CancelHealSets(ctx context.Context, opts HealSetsOpts) error { + queryVals := make(url.Values) + queryVals.Set("healSetsUUID", opts.TaskID) + + resp, err := adm.executeMethod(ctx, + http.MethodPost, requestData{ + relPath: adminAPIPrefix + "/cancel-heal-sets", + queryValues: queryVals, + }) + defer closeResponse(resp) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return httpRespToErrorResponse(resp) + } + + return nil +} + +// HealSets starts a new background heal sets task in parallel across multiple sets. +func (adm *AdminClient) HealSets(ctx context.Context, opts HealSetsOpts) (string, error) { + queryVals := make(url.Values) + queryVals.Set("healSetsList", opts.Sets) + queryVals.Set("healSleepMaxIO", opts.SleepMaxIO) + queryVals.Set("healSleepDuration", opts.SleepMax) + + resp, err := adm.executeMethod(ctx, + http.MethodPost, requestData{ + relPath: adminAPIPrefix + "/heal-sets", + queryValues: queryVals, + }) + defer closeResponse(resp) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusOK { + return "", httpRespToErrorResponse(resp) + } + + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + var taskID string + if err = json.Unmarshal(respBytes, &taskID); err != nil { + return "", err + } + + return taskID, nil +} + // Heal - API endpoint to start heal and to fetch status // forceStart and forceStop are mutually exclusive, you can either // set one of them to 'true'. If both are set 'forceStart' will be