minio/cmd/background-heal-ops.go

148 lines
3.7 KiB
Go
Raw Permalink Normal View History

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
)
// healTask represents what to heal along with options
// path: '/' => Heal disk formats along with metadata
// path: 'bucket/' or '/bucket/' => Heal bucket
// path: 'bucket/object' => Heal object
type healTask struct {
bucket string
object string
versionID string
opts madmin.HealOpts
// Healing response will be sent here
responseCh chan healResult
}
// healResult represents a healing result with a possible error
type healResult struct {
result madmin.HealResultItem
err error
}
// healRoutine receives heal tasks, to heal buckets, objects and format.json
type healRoutine struct {
tasks chan healTask
doneCh chan struct{}
}
// Add a new task in the tasks queue
func (h *healRoutine) queueHealTask(task healTask) {
h.tasks <- task
}
func waitForLowHTTPReq(maxIO int, maxWait time.Duration) {
// No need to wait run at full speed.
if maxIO <= 0 {
return
}
// At max 10 attempts to wait with 100 millisecond interval before proceeding
waitTick := 100 * time.Millisecond
// Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections
maxIOFn := func() int {
return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers())
}
tmpMaxWait := maxWait
if httpServer := newHTTPServerFn(); httpServer != nil {
// Any requests in progress, delay the heal.
for httpServer.GetRequestCount() >= maxIOFn() {
if tmpMaxWait > 0 {
if tmpMaxWait < waitTick {
time.Sleep(tmpMaxWait)
} else {
time.Sleep(waitTick)
}
tmpMaxWait = tmpMaxWait - waitTick
}
if tmpMaxWait <= 0 {
if intDataUpdateTracker.debug {
logger.Info("waitForLowHTTPReq: waited max %s, resuming", maxWait)
}
break
}
}
}
}
// Wait for heal requests and process them
func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
for {
select {
case task, ok := <-h.tasks:
if !ok {
return
}
var res madmin.HealResultItem
var err error
switch task.bucket {
case nopHeal:
continue
case SlashSeparator:
res, err = healDiskFormat(ctx, objAPI, task.opts)
default:
if task.object == "" {
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts)
} else {
res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts)
}
}
task.responseCh <- healResult{result: res, err: err}
case <-h.doneCh:
return
case <-ctx.Done():
return
}
}
}
func newHealRoutine() *healRoutine {
return &healRoutine{
tasks: make(chan healTask),
doneCh: make(chan struct{}),
}
}
// healDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpts) (madmin.HealResultItem, error) {
res, err := objAPI.HealFormat(ctx, opts.DryRun)
// return any error, ignore error returned when disks have
// already healed.
if err != nil && err != errNoHealRequired {
return madmin.HealResultItem{}, err
}
return res, nil
}