1
0
mirror of https://github.com/minio/minio.git synced 2025-01-26 22:23:15 -05:00
minio/cmd/background-heal-ops.go
Anis Elleuch d4dcf1d722
metrics: Use StorageInfo() instead to have consistent info ()
Metrics used to have its own code to calculate offline disks.
StorageInfo() was avoided because it is an expensive operation
by sending calls to all nodes.

To make metrics & server info share the same code, a new
argument `local` is added to StorageInfo() so it will only
query local disks when needed.

Metrics now calls StorageInfo() as server info handler does
but with the local flag set to false.

Co-authored-by: Praveen raj Mani <praveen@minio.io>
Co-authored-by: Harshavardhana <harsha@minio.io>
2020-02-20 09:21:33 +05:30

191 lines
5.2 KiB
Go

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
)
// healTask represents what to heal along with options
// path: '/' => Heal disk formats along with metadata
// path: 'bucket/' or '/bucket/' => Heal bucket
// path: 'bucket/object' => Heal object
type healTask struct {
path string
opts madmin.HealOpts
// Healing response will be sent here
responseCh chan healResult
}
// healResult represents a healing result with a possible error
type healResult struct {
result madmin.HealResultItem
err error
}
// healRoutine receives heal tasks, to heal buckets, objects and format.json
type healRoutine struct {
tasks chan healTask
doneCh chan struct{}
}
// Add a new task in the tasks queue
func (h *healRoutine) queueHealTask(task healTask) {
h.tasks <- task
}
func waitForLowHTTPReq(tolerance int32) {
if httpServer := newHTTPServerFn(); httpServer != nil {
// Wait at max 10 minute for an inprogress request before proceeding to heal
waitCount := 600
// Any requests in progress, delay the heal.
for (httpServer.GetRequestCount() >= tolerance) &&
waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
}
// Wait for heal requests and process them
func (h *healRoutine) run() {
ctx := context.Background()
for {
select {
case task, ok := <-h.tasks:
if !ok {
break
}
// Wait and proceed if there are active requests
waitForLowHTTPReq(int32(globalEndpoints.Nodes()))
var res madmin.HealResultItem
var err error
bucket, object := path2BucketObject(task.path)
switch {
case bucket == "" && object == "":
res, err = bgHealDiskFormat(ctx, task.opts)
case bucket != "" && object == "":
res, err = bgHealBucket(ctx, bucket, task.opts)
case bucket != "" && object != "":
res, err = bgHealObject(ctx, bucket, object, task.opts)
}
if task.responseCh != nil {
task.responseCh <- healResult{result: res, err: err}
}
case <-h.doneCh:
return
case <-GlobalServiceDoneCh:
return
}
}
}
func initHealRoutine() *healRoutine {
return &healRoutine{
tasks: make(chan healTask),
doneCh: make(chan struct{}),
}
}
func startBackgroundHealing() {
ctx := context.Background()
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Run the background healer
globalBackgroundHealRoutine = initHealRoutine()
go globalBackgroundHealRoutine.run()
// Launch the background healer sequence to track
// background healing operations
info := objAPI.StorageInfo(ctx, false)
numDisks := info.Backend.OnlineDisks.Sum() + info.Backend.OfflineDisks.Sum()
nh := newBgHealSequence(numDisks)
globalBackgroundHealState.LaunchNewHealSequence(nh)
}
func initBackgroundHealing() {
go startBackgroundHealing()
}
// bgHealDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
res, err := objectAPI.HealFormat(ctx, opts.DryRun)
// return any error, ignore error returned when disks have
// already healed.
if err != nil && err != errNoHealRequired {
return madmin.HealResultItem{}, err
}
// Healing succeeded notify the peers to reload format and re-initialize disks.
// We will not notify peers if healing is not required.
if err == nil {
for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
return res, nil
}
// bghealBucket - traverses and heals given bucket
func bgHealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealBucket(ctx, bucket, opts.DryRun, opts.Remove)
}
// bgHealObject - heal the given object and record result
func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealObject(ctx, bucket, object, opts.DryRun, opts.Remove, opts.ScanMode)
}