implement Heal sets API to heal erasure sets independently

This commit is contained in:
Harshavardhana 2021-01-22 15:44:39 -08:00
parent 006c69f716
commit 8724d49116
10 changed files with 344 additions and 67 deletions

View File

@ -59,6 +59,11 @@ const (
mgmtClientToken = "clientToken"
mgmtForceStart = "forceStart"
mgmtForceStop = "forceStop"
healSetsUUID = "healSetsUUID"
healSetsList = "healSetsList"
healSleepDuration = "healSleepDuration"
healSleepMaxIO = "healSleepMaxIO"
)
func updateServer(u *url.URL, sha256Sum []byte, lrTime time.Time, mode string) (us madmin.ServerUpdateStatus, err error) {
@ -635,6 +640,154 @@ func extractHealInitParams(vars map[string]string, qParms url.Values, r io.Reade
return
}
type healInitSetParams struct {
taskUUID string
setNumbers []int
sleepDuration time.Duration
sleepForIO int
cancel func()
}
// CancelHealSetsHandler - POST /minio/admin/v3/cancel-heal-sets/
func (a adminAPIHandlers) CancelHealSetsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "CancelHeal")
defer logger.AuditLog(w, r, "CancelHeal", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction)
if objectAPI == nil {
return
}
// Check if this setup has an erasure coded backend.
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
z, ok := objectAPI.(*erasureServerSets)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
if !z.SingleZone() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
taskUUID := vars[healSetsUUID]
if taskUUID == "" {
writeErrorResponseJSON(ctx, w, APIError{
Code: "XMinioHealNoSuchProcess",
Description: "No such heal process is running on the server",
HTTPStatusCode: http.StatusNotFound,
}, r.URL)
return
}
a.mu.Lock()
defer a.mu.Unlock()
opts, ok := a.healSetsMap[taskUUID]
if !ok {
writeErrorResponseJSON(ctx, w, APIError{
Code: "XMinioHealNoSuchProcess",
Description: "No such heal process is running on the server",
HTTPStatusCode: http.StatusNotFound,
}, r.URL)
return
}
opts.cancel()
delete(a.healSetsMap, opts.taskUUID)
}
// HealSetsHandler - POST /minio/admin/v3/heal-sets/
func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "HealSets")
defer logger.AuditLog(w, r, "HealSets", mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction)
if objectAPI == nil {
return
}
// Check if this setup has an erasure coded backend.
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
z, ok := objectAPI.(*erasureServerSets)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
if !z.SingleZone() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
opts := healInitSetParams{
taskUUID: mustGetUUID(),
}
for _, setIdx := range strings.Split(vars[healSetsList], ",") {
if setIdx == "" {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errors.New("empty values not allowed")), r.URL)
return
}
i, err := strconv.Atoi(setIdx)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if i == 0 {
i = 1
}
opts.setNumbers = append(opts.setNumbers, i-1)
}
opts.sleepDuration = time.Second
var err error
if v := vars[healSleepDuration]; v != "" {
opts.sleepDuration, err = time.ParseDuration(v)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}
opts.sleepForIO = 10
if v := vars[healSleepMaxIO]; v != "" {
opts.sleepForIO, err = strconv.Atoi(v)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}
buckets, _ := objectAPI.ListBucketsHeal(ctx)
ctx, opts.cancel = context.WithCancel(context.Background())
for _, setNumber := range opts.setNumbers {
go func(setNumber int) {
lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks()
if err := healErasureSet(ctx, setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
logger.LogIf(ctx, err)
}
}(setNumber)
}
a.mu.Lock()
a.healSetsMap[opts.taskUUID] = opts
a.mu.Unlock()
writeSuccessResponseJSON(w, []byte(fmt.Sprintf(`"%s"`, opts.taskUUID)))
}
// HealHandler - POST /minio/admin/v3/heal/
// -----------
// Start heal processing and return heal status items.
@ -875,7 +1028,7 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
return
}
aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context())
aggregateHealStateResult, err := getAggregatedBackgroundHealState(ctx)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return

View File

@ -339,6 +339,10 @@ type healSource struct {
bucket string
object string
versionID string
throttle struct {
maxSleep time.Duration
maxIO int
}
opts *madmin.HealOpts // optional heal option overrides default setting
}
@ -656,7 +660,7 @@ func (h *healSequence) healSequenceStart() {
}
}
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
func (h *healSequence) queueHealTask(ctx context.Context, source healSource, healType madmin.HealItemType) error {
// Send heal request
task := healTask{
bucket: source.bucket,
@ -664,11 +668,21 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
versionID: source.versionID,
opts: h.settings,
responseCh: h.respCh,
sleepForIO: globalEndpoints.NEndpoints(),
sleepDuration: time.Second,
}
if source.opts != nil {
task.opts = *source.opts
}
if source.throttle.maxIO > 0 {
task.sleepForIO = source.throttle.maxIO
}
if source.throttle.maxSleep > 0 {
task.sleepDuration = source.throttle.maxSleep
}
h.mutex.Lock()
h.scannedItemsMap[healType]++
h.lastHealActivity = UTCNow()
@ -677,6 +691,8 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
globalBackgroundHealRoutine.queueHealTask(task)
select {
case <-ctx.Done():
return nil
case res := <-h.respCh:
if !h.reportProgress {
// Object might have been deleted, by the time heal
@ -746,12 +762,8 @@ func (h *healSequence) healItemsFromSourceCh() error {
itemType = madmin.HealItemObject
}
if err := h.queueHealTask(source, itemType); err != nil {
switch err.(type) {
case ObjectExistsAsDirectory:
case ObjectNotFound:
case VersionNotFound:
default:
if err := h.queueHealTask(context.Background(), source, itemType); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w",
pathJoin(source.bucket, source.object), err))
}
@ -821,7 +833,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
return errHealStopSignalled
}
err := h.queueHealTask(healSource{
err := h.queueHealTask(context.Background(), healSource{
bucket: bucket,
object: object,
versionID: versionID,
@ -849,7 +861,7 @@ func (h *healSequence) healDiskFormat() error {
return errServerNotInitialized
}
return h.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata)
return h.queueHealTask(context.Background(), healSource{bucket: SlashSeparator}, madmin.HealItemMetadata)
}
// healBuckets - check for all buckets heal or just particular bucket.
@ -891,7 +903,7 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error {
return errServerNotInitialized
}
if err := h.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket); err != nil {
if err := h.queueHealTask(context.Background(), healSource{bucket: bucket}, madmin.HealItemBucket); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
return err
}
@ -941,7 +953,7 @@ func (h *healSequence) healObject(bucket, object, versionID string) error {
return errHealStopSignalled
}
err := h.queueHealTask(healSource{
err := h.queueHealTask(context.Background(), healSource{
bucket: bucket,
object: object,
versionID: versionID,

View File

@ -18,6 +18,7 @@ package cmd
import (
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
@ -34,12 +35,18 @@ const (
)
// adminAPIHandlers provides HTTP handlers for MinIO admin API.
type adminAPIHandlers struct{}
type adminAPIHandlers struct {
mu sync.Mutex
healSetsMap map[string]healInitSetParams
}
// registerAdminRouter - Add handler functions for each service REST API routes.
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) {
adminAPI := adminAPIHandlers{}
adminAPI := adminAPIHandlers{
healSetsMap: make(map[string]healInitSetParams),
}
// Admin router
adminRouter := router.PathPrefix(adminPathPrefix).Subrouter()
@ -68,11 +75,17 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
/// Heal operations
// Heal processing endpoint.
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/").HandlerFunc(httpTraceAll(adminAPI.HealHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}").HandlerFunc(httpTraceAll(adminAPI.HealHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}/{prefix:.*}").HandlerFunc(httpTraceAll(adminAPI.HealHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/").HandlerFunc(httpTraceHdrs(adminAPI.HealHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}").HandlerFunc(httpTraceHdrs(adminAPI.HealHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}/{prefix:.*}").HandlerFunc(httpTraceHdrs(adminAPI.HealHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(httpTraceAll(adminAPI.BackgroundHealStatusHandler))
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/cancel-heal-sets").
HandlerFunc(httpTraceHdrs(adminAPI.CancelHealSetsHandler)).
Queries(healSetsUUID, "{healSetsUUID:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/heal-sets").
HandlerFunc(httpTraceHdrs(adminAPI.HealSetsHandler)).
Queries(healSetsList, "{healSetsList:.*}", healSleepMaxIO, "{healSleepMaxIO:.*}", healSleepDuration, "{healSleepDuration:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(httpTraceHdrs(adminAPI.BackgroundHealStatusHandler))
/// Health operations

View File

@ -32,6 +32,8 @@ type healTask struct {
bucket string
object string
versionID string
sleepDuration time.Duration
sleepForIO int
opts madmin.HealOpts
// Healing response will be sent here
responseCh chan healResult
@ -54,20 +56,32 @@ func (h *healRoutine) queueHealTask(task healTask) {
h.tasks <- task
}
func waitForLowHTTPReq(tolerance int32, maxWait time.Duration) {
const wait = 10 * time.Millisecond
waitCount := maxWait / wait
func waitForLowHTTPReq(maxIO int, maxWait time.Duration) {
// No need to wait run at full speed.
if maxIO <= 0 {
return
}
waitTick := 100 * time.Millisecond
// Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections
tolerance += int32(globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers())
maxIOFn := func() int {
return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers())
}
if httpServer := newHTTPServerFn(); httpServer != nil {
// Any requests in progress, delay the heal.
for (httpServer.GetRequestCount() >= tolerance) &&
waitCount > 0 {
waitCount--
time.Sleep(wait)
for httpServer.GetRequestCount() >= int32(maxIOFn()) {
if maxWait < waitTick {
time.Sleep(maxWait)
} else {
time.Sleep(waitTick)
}
maxWait = maxWait - waitTick
if maxWait <= 0 {
return
}
}
}
}
@ -82,7 +96,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
}
// Wait and proceed if there are active requests
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
waitForLowHTTPReq(task.sleepForIO, task.sleepDuration)
var res madmin.HealResultItem
var err error

View File

@ -114,7 +114,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *
case <-ctx.Done():
return
case <-time.After(defaultMonitorNewDiskInterval):
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
waitForLowHTTPReq(globalEndpoints.NEndpoints(), time.Second)
var erasureSetInZoneDisksToHeal []map[int][]StorageAPI
@ -168,7 +168,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks()
if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil {
if err := healErasureSet(ctx, setIndex, 10, time.Second, buckets, lbDisks); err != nil {
logger.LogIf(ctx, err)
continue
}

View File

@ -497,7 +497,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
defer func() {
t = UTCNow()
}()
return bgSeq.queueHealTask(healSource{
return bgSeq.queueHealTask(ctx,
healSource{
bucket: bucket,
object: object,
versionID: versionID,

View File

@ -98,16 +98,13 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
errs := g.Wait()
reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1)
if reducedErr == errVolumeNotFound {
return res, nil
}
// Initialize heal result info
res = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: len(storageDisks),
ParityBlocks: len(storageDisks) / 2,
DataBlocks: len(storageDisks) / 2,
}
for i := range beforeState {
@ -118,6 +115,18 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
})
}
reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1)
if reducedErr == errVolumeNotFound {
for i := range beforeState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
State: madmin.DriveStateOk,
})
}
return res, nil
}
// Initialize sync waitgroup.
g = errgroup.WithNErrs(len(storageDisks))
@ -221,8 +230,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
partsMetadata []FileInfo, errs []error, latestFileInfo FileInfo,
dryRun bool, remove bool, scanMode madmin.HealScanMode) (result madmin.HealResultItem, err error) {
dataBlocks := latestFileInfo.Erasure.DataBlocks
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
@ -306,7 +313,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// If less than read quorum number of disks have all the parts
// of the data, we can't reconstruct the erasure-coded data.
if numAvailableDisks < dataBlocks {
if numAvailableDisks < result.DataBlocks {
// Check if er.meta, and corresponding parts are also missing.
if m, ok := isObjectDangling(partsMetadata, errs, dataErrs); ok {
writeQuorum := m.Erasure.DataBlocks + 1
@ -338,9 +345,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Latest FileInfo for reference. If a valid metadata is not
// present, it is as good as object not found.
latestMeta, pErr := pickValidFileInfo(ctx, partsMetadata, modTime, dataBlocks)
if pErr != nil {
return result, toObjectErr(pErr, bucket, object)
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
if err != nil {
return result, toObjectErr(err, bucket, object)
}
cleanFileInfo := func(fi FileInfo) FileInfo {

View File

@ -140,7 +140,12 @@ func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, objec
}
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
if err != nil {
if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound {
if !IsErr(err, []error{
errFileNotFound,
errVolumeNotFound,
errFileVersionNotFound,
errDiskNotFound,
}...) {
logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String())
logger.LogIf(ctx, err)
}

View File

@ -97,7 +97,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
}
// healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error {
func healErasureSet(ctx context.Context, setIndex int, maxIO int, maxSleep time.Duration, buckets []BucketInfo, disks []StorageAPI) error {
// Get background heal sequence to send elements to heal
var bgSeq *healSequence
var ok bool
@ -114,18 +114,18 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
}
}
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
}, BucketInfo{
Name: pathJoin(minioMetaBucket, bucketConfigPrefix),
}) // add metadata .minio.sys/ bucket prefixes to heal
// Try to pro-actively heal backend-encrypted file.
bgSeq.sourceCh <- healSource{
bucket: minioMetaBucket,
object: backendEncryptedFile,
}
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
}, BucketInfo{
Name: pathJoin(minioMetaBucket, bucketConfigPrefix),
}) // add metadata .minio.sys/ bucket prefixes to heal
// Heal all buckets with all objects
for _, bucket := range buckets {
// Heal current bucket
@ -165,11 +165,18 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
}
for _, version := range entry.Versions {
bgSeq.sourceCh <- healSource{
hsrc := healSource{
bucket: bucket.Name,
object: version.Name,
versionID: version.VersionID,
}
hsrc.throttle.maxIO = maxIO
hsrc.throttle.maxSleep = maxSleep
if err := bgSeq.queueHealTask(ctx, hsrc, madmin.HealItemObject); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err)
}
}
}
}
}

View File

@ -209,6 +209,71 @@ func (hri *HealResultItem) GetOnlineCounts() (b, a int) {
return
}
// HealSetsOpts heal sets options
type HealSetsOpts struct {
TaskID string
Sets string // comma separated list of set numbers
SleepMaxIO string // maximum IO tolerance after which healing would sleep
// maximum sleep duration between objects to slow down heal operation
// only applied in conjunction with maxIO.
SleepMax string
}
// CancelHealSets cancels task started with HealSets()
func (adm *AdminClient) CancelHealSets(ctx context.Context, opts HealSetsOpts) error {
queryVals := make(url.Values)
queryVals.Set("healSetsUUID", opts.TaskID)
resp, err := adm.executeMethod(ctx,
http.MethodPost, requestData{
relPath: adminAPIPrefix + "/cancel-heal-sets",
queryValues: queryVals,
})
defer closeResponse(resp)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp)
}
return nil
}
// HealSets starts a new background heal sets task in parallel across multiple sets.
func (adm *AdminClient) HealSets(ctx context.Context, opts HealSetsOpts) (string, error) {
queryVals := make(url.Values)
queryVals.Set("healSetsList", opts.Sets)
queryVals.Set("healSleepMaxIO", opts.SleepMaxIO)
queryVals.Set("healSleepDuration", opts.SleepMax)
resp, err := adm.executeMethod(ctx,
http.MethodPost, requestData{
relPath: adminAPIPrefix + "/heal-sets",
queryValues: queryVals,
})
defer closeResponse(resp)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", httpRespToErrorResponse(resp)
}
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
var taskID string
if err = json.Unmarshal(respBytes, &taskID); err != nil {
return "", err
}
return taskID, nil
}
// Heal - API endpoint to start heal and to fetch status
// forceStart and forceStop are mutually exclusive, you can either
// set one of them to 'true'. If both are set 'forceStart' will be