mirror of
https://github.com/minio/minio.git
synced 2025-01-24 21:23:15 -05:00
f23944aed7
Fixes #5659
656 lines
17 KiB
Go
656 lines
17 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/madmin"
|
|
)
|
|
|
|
// healStatusSummary - overall short summary of a healing sequence
|
|
type healStatusSummary string
|
|
|
|
// healStatusSummary constants
|
|
const (
|
|
healNotStartedStatus healStatusSummary = "not started"
|
|
healRunningStatus = "running"
|
|
healStoppedStatus = "stopped"
|
|
healFinishedStatus = "finished"
|
|
)
|
|
|
|
const (
|
|
// a heal sequence with this many un-consumed heal result
|
|
// items blocks until heal-status consumption resumes or is
|
|
// aborted due to timeout.
|
|
maxUnconsumedHealResultItems = 1000
|
|
|
|
// if no heal-results are consumed (via the heal-status API)
|
|
// for this timeout duration, the heal sequence is aborted.
|
|
healUnconsumedTimeout = 24 * time.Hour
|
|
|
|
// time-duration to keep heal sequence state after it
|
|
// completes.
|
|
keepHealSeqStateDuration = time.Minute * 10
|
|
)
|
|
|
|
var (
|
|
errHealIdleTimeout = fmt.Errorf("healing results were not consumed for too long")
|
|
errHealPushStopNDiscard = fmt.Errorf("heal push stopped due to heal stop signal")
|
|
errHealStopSignalled = fmt.Errorf("heal stop signalled")
|
|
|
|
errFnHealFromAPIErr = func(err error) error {
|
|
errCode := toAPIErrorCode(err)
|
|
apiErr := getAPIError(errCode)
|
|
return fmt.Errorf("Heal internal error: %s: %s",
|
|
apiErr.Code, apiErr.Description)
|
|
}
|
|
)
|
|
|
|
// healSequenceStatus - accumulated status of the heal sequence
|
|
type healSequenceStatus struct {
|
|
// lock to update this structure as it is concurrently
|
|
// accessed
|
|
updateLock *sync.RWMutex
|
|
|
|
// summary and detail for failures
|
|
Summary healStatusSummary `json:"Summary"`
|
|
FailureDetail string `json:"Detail,omitempty"`
|
|
StartTime time.Time `json:"StartTime"`
|
|
|
|
// disk information
|
|
NumDisks int `json:"NumDisks"`
|
|
|
|
// settings for the heal sequence
|
|
HealSettings madmin.HealOpts `json:"Settings"`
|
|
|
|
// slice of available heal result records
|
|
Items []madmin.HealResultItem `json:"Items"`
|
|
}
|
|
|
|
// structure to hold state of all heal sequences in server memory
|
|
type allHealState struct {
|
|
sync.Mutex
|
|
|
|
// map of heal path to heal sequence
|
|
healSeqMap map[string]*healSequence
|
|
}
|
|
|
|
var (
|
|
// global server heal state
|
|
globalAllHealState allHealState
|
|
)
|
|
|
|
// initAllHealState - initialize healing apparatus
|
|
func initAllHealState(isErasureMode bool) {
|
|
if !isErasureMode {
|
|
return
|
|
}
|
|
|
|
globalAllHealState = allHealState{
|
|
healSeqMap: make(map[string]*healSequence),
|
|
}
|
|
}
|
|
|
|
// getHealSequence - Retrieve a heal sequence by path. The second
|
|
// argument returns if a heal sequence actually exists.
|
|
func (ahs *allHealState) getHealSequence(path string) (h *healSequence, exists bool) {
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
h, exists = ahs.healSeqMap[path]
|
|
return h, exists
|
|
}
|
|
|
|
// LaunchNewHealSequence - launches a background routine that performs
|
|
// healing according to the healSequence argument. For each heal
|
|
// sequence, state is stored in the `globalAllHealState`, which is a
|
|
// map of the heal path to `healSequence` which holds state about the
|
|
// heal sequence.
|
|
//
|
|
// Heal results are persisted in server memory for
|
|
// `keepHealSeqStateDuration`. This function also launches a
|
|
// background routine to clean up heal results after the
|
|
// aforementioned duration.
|
|
func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) (
|
|
respBytes []byte, errCode APIErrorCode, errMsg string) {
|
|
|
|
existsAndLive := false
|
|
he, exists := ahs.getHealSequence(h.path)
|
|
if exists {
|
|
if !he.hasEnded() || len(he.currentStatus.Items) > 0 {
|
|
existsAndLive = true
|
|
}
|
|
}
|
|
if existsAndLive {
|
|
// A heal sequence exists on the given path.
|
|
if h.forceStarted {
|
|
// stop the running heal sequence - wait for
|
|
// it to finish.
|
|
he.stop()
|
|
for !he.hasEnded() {
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
} else {
|
|
errMsg = "Heal is already running on the given path " +
|
|
"(use force-start option to stop and start afresh). " +
|
|
fmt.Sprintf("The heal was started by IP %s at %s",
|
|
h.clientAddress, h.startTime)
|
|
|
|
return nil, ErrHealAlreadyRunning, errMsg
|
|
}
|
|
}
|
|
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
|
|
// Check if new heal sequence to be started overlaps with any
|
|
// existing, running sequence
|
|
for k, hSeq := range ahs.healSeqMap {
|
|
if !hSeq.hasEnded() && (strings.HasPrefix(k, h.path) ||
|
|
strings.HasPrefix(h.path, k)) {
|
|
|
|
errMsg = "The provided heal sequence path overlaps with an existing " +
|
|
fmt.Sprintf("heal path: %s", k)
|
|
return nil, ErrHealOverlappingPaths, errMsg
|
|
}
|
|
}
|
|
|
|
// Add heal state and start sequence
|
|
ahs.healSeqMap[h.path] = h
|
|
|
|
// Launch top-level background heal go-routine
|
|
go h.healSequenceStart()
|
|
|
|
// Launch clean-up routine to remove this heal sequence (after
|
|
// it ends) from the global state after timeout has elapsed.
|
|
go func() {
|
|
var keepStateTimeout <-chan time.Time
|
|
ticker := time.NewTicker(time.Minute)
|
|
defer ticker.Stop()
|
|
everyMinute := ticker.C
|
|
for {
|
|
select {
|
|
// Check every minute if heal sequence has ended.
|
|
case <-everyMinute:
|
|
if h.hasEnded() {
|
|
keepStateTimeout = time.After(keepHealSeqStateDuration)
|
|
everyMinute = nil
|
|
}
|
|
|
|
// This case does not fire until the heal
|
|
// sequence completes.
|
|
case <-keepStateTimeout:
|
|
// Heal sequence has ended, keep
|
|
// results state duration has elapsed,
|
|
// so purge state.
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
delete(ahs.healSeqMap, h.path)
|
|
return
|
|
|
|
case <-globalServiceDoneCh:
|
|
// server could be restarting - need
|
|
// to exit immediately
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
b, err := json.Marshal(madmin.HealStartSuccess{
|
|
h.clientToken,
|
|
h.clientAddress,
|
|
h.startTime,
|
|
})
|
|
if err != nil {
|
|
errorIf(err, "Failed to marshal heal result into json.")
|
|
return nil, ErrInternalError, ""
|
|
}
|
|
return b, ErrNone, ""
|
|
}
|
|
|
|
// PopHealStatusJSON - Called by heal-status API. It fetches the heal
|
|
// status results from global state and returns its JSON
|
|
// representation. The clientToken helps ensure there aren't
|
|
// conflicting clients fetching status.
|
|
func (ahs *allHealState) PopHealStatusJSON(path string,
|
|
clientToken string) ([]byte, APIErrorCode) {
|
|
|
|
// fetch heal state for given path
|
|
h, exists := ahs.getHealSequence(path)
|
|
if !exists {
|
|
// If there is no such heal sequence, return error.
|
|
return nil, ErrHealNoSuchProcess
|
|
}
|
|
|
|
// Check if client-token is valid
|
|
if clientToken != h.clientToken {
|
|
return nil, ErrHealInvalidClientToken
|
|
}
|
|
|
|
// Take lock to access and update the heal-sequence
|
|
h.currentStatus.updateLock.Lock()
|
|
defer h.currentStatus.updateLock.Unlock()
|
|
|
|
numItems := len(h.currentStatus.Items)
|
|
|
|
// calculate index of most recently available heal result
|
|
// record.
|
|
lastResultIndex := h.lastSentResultIndex
|
|
if numItems > 0 {
|
|
lastResultIndex = h.currentStatus.Items[numItems-1].ResultIndex
|
|
}
|
|
|
|
// After sending status to client, and before relinquishing
|
|
// the updateLock, reset Item to nil and record the result
|
|
// index sent to the client.
|
|
defer func(i int64) {
|
|
h.lastSentResultIndex = i
|
|
h.currentStatus.Items = nil
|
|
}(lastResultIndex)
|
|
|
|
jbytes, err := json.Marshal(h.currentStatus)
|
|
if err != nil {
|
|
errorIf(err, "Failed to marshal heal result into json.")
|
|
return nil, ErrInternalError
|
|
}
|
|
|
|
return jbytes, ErrNone
|
|
}
|
|
|
|
// healSequence - state for each heal sequence initiated on the
|
|
// server.
|
|
type healSequence struct {
|
|
// bucket, and prefix on which heal seq. was initiated
|
|
bucket, objPrefix string
|
|
|
|
// path is just bucket + "/" + objPrefix
|
|
path string
|
|
|
|
// time at which heal sequence was started
|
|
startTime time.Time
|
|
|
|
// Heal client info
|
|
clientToken, clientAddress string
|
|
|
|
// was this heal sequence force started?
|
|
forceStarted bool
|
|
|
|
// heal settings applied to this heal sequence
|
|
settings madmin.HealOpts
|
|
|
|
// current accumulated status of the heal sequence
|
|
currentStatus healSequenceStatus
|
|
|
|
// channel signalled by background routine when traversal has
|
|
// completed
|
|
traverseAndHealDoneCh chan error
|
|
|
|
// channel to signal heal sequence to stop (e.g. from the
|
|
// heal-stop API)
|
|
stopSignalCh chan struct{}
|
|
|
|
// the last result index sent to client
|
|
lastSentResultIndex int64
|
|
|
|
// Holds the request-info for logging
|
|
ctx context.Context
|
|
}
|
|
|
|
// NewHealSequence - creates healSettings, assumes bucket and
|
|
// objPrefix are already validated.
|
|
func newHealSequence(bucket, objPrefix, clientAddr string,
|
|
numDisks int, hs madmin.HealOpts, forceStart bool) *healSequence {
|
|
|
|
ctx := logger.SetContext(context.Background(), &logger.ReqInfo{clientAddr, "", "", "Heal", bucket, objPrefix, nil})
|
|
|
|
return &healSequence{
|
|
bucket: bucket,
|
|
objPrefix: objPrefix,
|
|
path: bucket + "/" + objPrefix,
|
|
startTime: UTCNow(),
|
|
clientToken: mustGetUUID(),
|
|
clientAddress: clientAddr,
|
|
forceStarted: forceStart,
|
|
settings: hs,
|
|
currentStatus: healSequenceStatus{
|
|
Summary: healNotStartedStatus,
|
|
HealSettings: hs,
|
|
NumDisks: numDisks,
|
|
updateLock: &sync.RWMutex{},
|
|
},
|
|
traverseAndHealDoneCh: make(chan error),
|
|
stopSignalCh: make(chan struct{}),
|
|
ctx: ctx,
|
|
}
|
|
}
|
|
|
|
// isQuitting - determines if the heal sequence is quitting (due to an
|
|
// external signal)
|
|
func (h *healSequence) isQuitting() bool {
|
|
select {
|
|
case <-h.stopSignalCh:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// check if the heal sequence has ended
|
|
func (h *healSequence) hasEnded() bool {
|
|
h.currentStatus.updateLock.RLock()
|
|
summary := h.currentStatus.Summary
|
|
h.currentStatus.updateLock.RUnlock()
|
|
return summary == healStoppedStatus || summary == healFinishedStatus
|
|
}
|
|
|
|
// stops the heal sequence - safe to call multiple times.
|
|
func (h *healSequence) stop() {
|
|
select {
|
|
case <-h.stopSignalCh:
|
|
default:
|
|
close(h.stopSignalCh)
|
|
}
|
|
}
|
|
|
|
// pushHealResultItem - pushes a heal result item for consumption in
|
|
// the heal-status API. It blocks if there are
|
|
// maxUnconsumedHealResultItems. When it blocks, the heal sequence
|
|
// routine is effectively paused - this happens when the server has
|
|
// accumulated the maximum number of heal records per heal
|
|
// sequence. When the client consumes further records, the heal
|
|
// sequence automatically resumes. The return value indicates if the
|
|
// operation succeeded.
|
|
func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
|
|
|
|
// start a timer to keep an upper time limit to find an empty
|
|
// slot to add the given heal result - if no slot is found it
|
|
// means that the server is holding the maximum amount of
|
|
// heal-results in memory and the client has not consumed it
|
|
// for too long.
|
|
unconsumedTimer := time.NewTimer(healUnconsumedTimeout)
|
|
defer func() {
|
|
// stop the timeout timer so it is garbage collected.
|
|
if !unconsumedTimer.Stop() {
|
|
<-unconsumedTimer.C
|
|
}
|
|
}()
|
|
|
|
var itemsLen int
|
|
for {
|
|
h.currentStatus.updateLock.Lock()
|
|
itemsLen = len(h.currentStatus.Items)
|
|
if itemsLen == maxUnconsumedHealResultItems {
|
|
// unlock and wait to check again if we can push
|
|
h.currentStatus.updateLock.Unlock()
|
|
|
|
// wait for a second, or quit if an external
|
|
// stop signal is received or the
|
|
// unconsumedTimer fires.
|
|
select {
|
|
// Check after a second
|
|
case <-time.After(time.Second):
|
|
continue
|
|
|
|
case <-h.stopSignalCh:
|
|
// discard result and return.
|
|
return errHealPushStopNDiscard
|
|
|
|
// Timeout if no results consumed for too
|
|
// long.
|
|
case <-unconsumedTimer.C:
|
|
return errHealIdleTimeout
|
|
|
|
}
|
|
}
|
|
break
|
|
}
|
|
|
|
// Set the correct result index for the new result item
|
|
if itemsLen > 0 {
|
|
r.ResultIndex = 1 + h.currentStatus.Items[itemsLen-1].ResultIndex
|
|
} else {
|
|
r.ResultIndex = 1 + h.lastSentResultIndex
|
|
}
|
|
|
|
// append to results
|
|
h.currentStatus.Items = append(h.currentStatus.Items, r)
|
|
|
|
// release lock
|
|
h.currentStatus.updateLock.Unlock()
|
|
|
|
// This is a "safe" point for the heal sequence to quit if
|
|
// signalled externally.
|
|
if h.isQuitting() {
|
|
return errHealStopSignalled
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// healSequenceStart - this is the top-level background heal
|
|
// routine. It launches another go-routine that actually traverses
|
|
// on-disk data, checks and heals according to the selected
|
|
// settings. This go-routine itself, (1) monitors the traversal
|
|
// routine for completion, and (2) listens for external stop
|
|
// signals. When either event happens, it sets the finish status for
|
|
// the heal-sequence.
|
|
func (h *healSequence) healSequenceStart() {
|
|
// Set status as running
|
|
h.currentStatus.updateLock.Lock()
|
|
h.currentStatus.Summary = healRunningStatus
|
|
h.currentStatus.StartTime = UTCNow()
|
|
h.currentStatus.updateLock.Unlock()
|
|
|
|
go h.traverseAndHeal()
|
|
|
|
select {
|
|
case err, ok := <-h.traverseAndHealDoneCh:
|
|
h.currentStatus.updateLock.Lock()
|
|
defer h.currentStatus.updateLock.Unlock()
|
|
// Heal traversal is complete.
|
|
if ok {
|
|
// heal traversal had an error.
|
|
h.currentStatus.Summary = healStoppedStatus
|
|
h.currentStatus.FailureDetail = err.Error()
|
|
} else {
|
|
// heal traversal succeeded.
|
|
h.currentStatus.Summary = healFinishedStatus
|
|
}
|
|
|
|
case <-h.stopSignalCh:
|
|
h.currentStatus.updateLock.Lock()
|
|
h.currentStatus.Summary = healStoppedStatus
|
|
h.currentStatus.FailureDetail = errHealStopSignalled.Error()
|
|
h.currentStatus.updateLock.Unlock()
|
|
|
|
// drain traverse channel so the traversal
|
|
// go-routine does not leak.
|
|
go func() {
|
|
// Eventually the traversal go-routine closes
|
|
// the channel and returns, so this go-routine
|
|
// itself will not leak.
|
|
<-h.traverseAndHealDoneCh
|
|
}()
|
|
}
|
|
}
|
|
|
|
// traverseAndHeal - traverses on-disk data and performs healing
|
|
// according to settings. At each "safe" point it also checks if an
|
|
// external quit signal has been received and quits if so. Since the
|
|
// healing traversal may be mutating on-disk data when an external
|
|
// quit signal is received, this routine cannot quit immediately and
|
|
// has to wait until a safe point is reached, such as between scanning
|
|
// two objects.
|
|
func (h *healSequence) traverseAndHeal() {
|
|
var err error
|
|
checkErr := func(f func() error) {
|
|
switch {
|
|
case err != nil:
|
|
return
|
|
case h.isQuitting():
|
|
err = errHealStopSignalled
|
|
return
|
|
}
|
|
err = f()
|
|
}
|
|
|
|
// Start with format healing
|
|
checkErr(h.healDiskFormat)
|
|
|
|
// Heal buckets and objects
|
|
checkErr(h.healBuckets)
|
|
|
|
if err != nil {
|
|
h.traverseAndHealDoneCh <- err
|
|
}
|
|
|
|
close(h.traverseAndHealDoneCh)
|
|
}
|
|
|
|
// healDiskFormat - heals format.json, return value indicates if a
|
|
// failure error occurred.
|
|
func (h *healSequence) healDiskFormat() error {
|
|
// Get current object layer instance.
|
|
objectAPI := newObjectLayerFn()
|
|
if objectAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
res, err := objectAPI.HealFormat(h.ctx, h.settings.DryRun)
|
|
if err != nil {
|
|
return errFnHealFromAPIErr(err)
|
|
}
|
|
|
|
peersReInitFormat(globalAdminPeers, h.settings.DryRun)
|
|
|
|
// Push format heal result
|
|
return h.pushHealResultItem(res)
|
|
}
|
|
|
|
// healBuckets - check for all buckets heal or just particular bucket.
|
|
func (h *healSequence) healBuckets() error {
|
|
// 1. If a bucket was specified, heal only the bucket.
|
|
if h.bucket != "" {
|
|
return h.healBucket(h.bucket)
|
|
}
|
|
|
|
// Get current object layer instance.
|
|
objectAPI := newObjectLayerFn()
|
|
if objectAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
buckets, err := objectAPI.ListBucketsHeal(h.ctx)
|
|
if err != nil {
|
|
return errFnHealFromAPIErr(err)
|
|
}
|
|
|
|
for _, bucket := range buckets {
|
|
if err = h.healBucket(bucket.Name); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// healBucket - traverses and heals given bucket
|
|
func (h *healSequence) healBucket(bucket string) error {
|
|
if h.isQuitting() {
|
|
return errHealStopSignalled
|
|
}
|
|
|
|
// Get current object layer instance.
|
|
objectAPI := newObjectLayerFn()
|
|
if objectAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
results, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun)
|
|
// push any available results before checking for error
|
|
for _, result := range results {
|
|
if perr := h.pushHealResultItem(result); perr != nil {
|
|
return perr
|
|
}
|
|
}
|
|
// handle heal-bucket error
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !h.settings.Recursive {
|
|
if h.objPrefix != "" {
|
|
// Check if an object named as the objPrefix exists,
|
|
// and if so heal it.
|
|
_, err = objectAPI.GetObjectInfo(h.ctx, bucket, h.objPrefix)
|
|
if err == nil {
|
|
err = h.healObject(bucket, h.objPrefix)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
marker := ""
|
|
isTruncated := true
|
|
for isTruncated {
|
|
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, bucket,
|
|
h.objPrefix, marker, "", 1000)
|
|
if err != nil {
|
|
return errFnHealFromAPIErr(err)
|
|
}
|
|
|
|
for _, o := range objectInfos.Objects {
|
|
if err := h.healObject(o.Bucket, o.Name); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
isTruncated = objectInfos.IsTruncated
|
|
marker = objectInfos.NextMarker
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// healObject - heal the given object and record result
|
|
func (h *healSequence) healObject(bucket, object string) error {
|
|
if h.isQuitting() {
|
|
return errHealStopSignalled
|
|
}
|
|
|
|
// Get current object layer instance.
|
|
objectAPI := newObjectLayerFn()
|
|
if objectAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun)
|
|
if err != nil {
|
|
hri.Detail = err.Error()
|
|
}
|
|
return h.pushHealResultItem(hri)
|
|
}
|