mirror of
https://github.com/minio/minio.git
synced 2024-12-26 23:25:54 -05:00
95bf4a57b6
Create new code paths for multiple subsystems in the code. This will make maintaing this easier later. Also introduce bugLogIf() for errors that should not happen in the first place.
915 lines
25 KiB
Go
915 lines
25 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/madmin-go/v3"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/minio/minio/internal/logger"
|
|
)
|
|
|
|
// healStatusSummary - overall short summary of a healing sequence
|
|
type healStatusSummary string
|
|
|
|
// healStatusSummary constants
|
|
const (
|
|
healNotStartedStatus healStatusSummary = "not started"
|
|
healRunningStatus = "running"
|
|
healStoppedStatus = "stopped"
|
|
healFinishedStatus = "finished"
|
|
)
|
|
|
|
const (
|
|
// a heal sequence with this many un-consumed heal result
|
|
// items blocks until heal-status consumption resumes or is
|
|
// aborted due to timeout.
|
|
maxUnconsumedHealResultItems = 1000
|
|
|
|
// if no heal-results are consumed (via the heal-status API)
|
|
// for this timeout duration, the heal sequence is aborted.
|
|
healUnconsumedTimeout = 24 * time.Hour
|
|
|
|
// time-duration to keep heal sequence state after it
|
|
// completes.
|
|
keepHealSeqStateDuration = time.Minute * 10
|
|
|
|
// nopHeal is a no operating healing action to
|
|
// wait for the current healing operation to finish
|
|
nopHeal = ""
|
|
)
|
|
|
|
var (
|
|
errHealIdleTimeout = fmt.Errorf("healing results were not consumed for too long")
|
|
errHealStopSignalled = fmt.Errorf("heal stop signaled")
|
|
|
|
errFnHealFromAPIErr = func(ctx context.Context, err error) error {
|
|
apiErr := toAdminAPIErr(ctx, err)
|
|
return fmt.Errorf("Heal internal error: %s: %s",
|
|
apiErr.Code, apiErr.Description)
|
|
}
|
|
)
|
|
|
|
// healSequenceStatus - accumulated status of the heal sequence
|
|
type healSequenceStatus struct {
|
|
// summary and detail for failures
|
|
Summary healStatusSummary `json:"Summary"`
|
|
FailureDetail string `json:"Detail,omitempty"`
|
|
StartTime time.Time `json:"StartTime"`
|
|
|
|
// settings for the heal sequence
|
|
HealSettings madmin.HealOpts `json:"Settings"`
|
|
|
|
// slice of available heal result records
|
|
Items []madmin.HealResultItem `json:"Items"`
|
|
}
|
|
|
|
// structure to hold state of all heal sequences in server memory
|
|
type allHealState struct {
|
|
sync.RWMutex
|
|
|
|
// map of heal path to heal sequence
|
|
healSeqMap map[string]*healSequence // Indexed by endpoint
|
|
// keep track of the healing status of disks in the memory
|
|
// false: the disk needs to be healed but no healing routine is started
|
|
// true: the disk is currently healing
|
|
healLocalDisks map[Endpoint]bool
|
|
healStatus map[string]healingTracker // Indexed by disk ID
|
|
}
|
|
|
|
// newHealState - initialize global heal state management
|
|
func newHealState(ctx context.Context, cleanup bool) *allHealState {
|
|
hstate := &allHealState{
|
|
healSeqMap: make(map[string]*healSequence),
|
|
healLocalDisks: make(map[Endpoint]bool),
|
|
healStatus: make(map[string]healingTracker),
|
|
}
|
|
if cleanup {
|
|
go hstate.periodicHealSeqsClean(ctx)
|
|
}
|
|
return hstate
|
|
}
|
|
|
|
func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
|
|
for _, ep := range healLocalDisks {
|
|
delete(ahs.healLocalDisks, ep)
|
|
}
|
|
for id, disk := range ahs.healStatus {
|
|
for _, ep := range healLocalDisks {
|
|
if disk.Endpoint == ep.String() {
|
|
delete(ahs.healStatus, id)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateHealStatus will update the heal status.
|
|
func (ahs *allHealState) updateHealStatus(tracker *healingTracker) {
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
|
|
tracker.mu.RLock()
|
|
t := *tracker
|
|
t.QueuedBuckets = append(make([]string, 0, len(tracker.QueuedBuckets)), tracker.QueuedBuckets...)
|
|
t.HealedBuckets = append(make([]string, 0, len(tracker.HealedBuckets)), tracker.HealedBuckets...)
|
|
ahs.healStatus[tracker.ID] = t
|
|
tracker.mu.RUnlock()
|
|
}
|
|
|
|
// Sort by zone, set and disk index
|
|
func sortDisks(disks []madmin.Disk) {
|
|
sort.Slice(disks, func(i, j int) bool {
|
|
a, b := &disks[i], &disks[j]
|
|
if a.PoolIndex != b.PoolIndex {
|
|
return a.PoolIndex < b.PoolIndex
|
|
}
|
|
if a.SetIndex != b.SetIndex {
|
|
return a.SetIndex < b.SetIndex
|
|
}
|
|
return a.DiskIndex < b.DiskIndex
|
|
})
|
|
}
|
|
|
|
// getLocalHealingDisks returns local healing disks indexed by endpoint.
|
|
func (ahs *allHealState) getLocalHealingDisks() map[string]madmin.HealingDisk {
|
|
ahs.RLock()
|
|
defer ahs.RUnlock()
|
|
dst := make(map[string]madmin.HealingDisk, len(ahs.healStatus))
|
|
for _, v := range ahs.healStatus {
|
|
dst[v.Endpoint] = v.toHealingDisk()
|
|
}
|
|
|
|
return dst
|
|
}
|
|
|
|
// getHealLocalDiskEndpoints() returns the list of disks that need
|
|
// to be healed but there is no healing routine in progress on them.
|
|
func (ahs *allHealState) getHealLocalDiskEndpoints() Endpoints {
|
|
ahs.RLock()
|
|
defer ahs.RUnlock()
|
|
|
|
var endpoints Endpoints
|
|
for ep, healing := range ahs.healLocalDisks {
|
|
if !healing {
|
|
endpoints = append(endpoints, ep)
|
|
}
|
|
}
|
|
return endpoints
|
|
}
|
|
|
|
// Set, in the memory, the state of the disk as currently healing or not
|
|
func (ahs *allHealState) setDiskHealingStatus(ep Endpoint, healing bool) {
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
|
|
ahs.healLocalDisks[ep] = healing
|
|
}
|
|
|
|
func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) {
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
|
|
for _, ep := range healLocalDisks {
|
|
ahs.healLocalDisks[ep] = false
|
|
}
|
|
}
|
|
|
|
func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
|
|
// Launch clean-up routine to remove this heal sequence (after
|
|
// it ends) from the global state after timeout has elapsed.
|
|
periodicTimer := time.NewTimer(time.Minute * 5)
|
|
defer periodicTimer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-periodicTimer.C:
|
|
now := UTCNow()
|
|
ahs.Lock()
|
|
for path, h := range ahs.healSeqMap {
|
|
if h.hasEnded() && h.endTime.Add(keepHealSeqStateDuration).Before(now) {
|
|
delete(ahs.healSeqMap, path)
|
|
}
|
|
}
|
|
ahs.Unlock()
|
|
|
|
periodicTimer.Reset(time.Minute * 5)
|
|
case <-ctx.Done():
|
|
// server could be restarting - need
|
|
// to exit immediately
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// getHealSequenceByToken - Retrieve a heal sequence by token. The second
|
|
// argument returns if a heal sequence actually exists.
|
|
func (ahs *allHealState) getHealSequenceByToken(token string) (h *healSequence, exists bool) {
|
|
ahs.RLock()
|
|
defer ahs.RUnlock()
|
|
for _, healSeq := range ahs.healSeqMap {
|
|
if healSeq.clientToken == token {
|
|
return healSeq, true
|
|
}
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// getHealSequence - Retrieve a heal sequence by path. The second
|
|
// argument returns if a heal sequence actually exists.
|
|
func (ahs *allHealState) getHealSequence(path string) (h *healSequence, exists bool) {
|
|
ahs.RLock()
|
|
defer ahs.RUnlock()
|
|
h, exists = ahs.healSeqMap[path]
|
|
return h, exists
|
|
}
|
|
|
|
func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) {
|
|
var hsp madmin.HealStopSuccess
|
|
he, exists := ahs.getHealSequence(path)
|
|
if !exists {
|
|
hsp = madmin.HealStopSuccess{
|
|
ClientToken: "unknown",
|
|
StartTime: UTCNow(),
|
|
}
|
|
} else {
|
|
clientToken := he.clientToken
|
|
if globalIsDistErasure {
|
|
clientToken = fmt.Sprintf("%s:%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
|
|
}
|
|
|
|
hsp = madmin.HealStopSuccess{
|
|
ClientToken: clientToken,
|
|
ClientAddress: he.clientAddress,
|
|
StartTime: he.startTime,
|
|
}
|
|
|
|
he.stop()
|
|
for !he.hasEnded() {
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
// Heal sequence explicitly stopped, remove it.
|
|
delete(ahs.healSeqMap, path)
|
|
}
|
|
|
|
b, err := json.Marshal(&hsp)
|
|
return b, toAdminAPIErr(GlobalContext, err)
|
|
}
|
|
|
|
// LaunchNewHealSequence - launches a background routine that performs
|
|
// healing according to the healSequence argument. For each heal
|
|
// sequence, state is stored in the `globalAllHealState`, which is a
|
|
// map of the heal path to `healSequence` which holds state about the
|
|
// heal sequence.
|
|
//
|
|
// Heal results are persisted in server memory for
|
|
// `keepHealSeqStateDuration`. This function also launches a
|
|
// background routine to clean up heal results after the
|
|
// aforementioned duration.
|
|
func (ahs *allHealState) LaunchNewHealSequence(h *healSequence, objAPI ObjectLayer) (
|
|
respBytes []byte, apiErr APIError, errMsg string,
|
|
) {
|
|
if h.forceStarted {
|
|
_, apiErr = ahs.stopHealSequence(pathJoin(h.bucket, h.object))
|
|
if apiErr.Code != "" {
|
|
return respBytes, apiErr, ""
|
|
}
|
|
} else {
|
|
oh, exists := ahs.getHealSequence(pathJoin(h.bucket, h.object))
|
|
if exists && !oh.hasEnded() {
|
|
errMsg = "Heal is already running on the given path " +
|
|
"(use force-start option to stop and start afresh). " +
|
|
fmt.Sprintf("The heal was started by IP %s at %s, token is %s",
|
|
h.clientAddress, h.startTime.Format(http.TimeFormat), h.clientToken)
|
|
return nil, errorCodes.ToAPIErr(ErrHealAlreadyRunning), errMsg
|
|
}
|
|
}
|
|
|
|
ahs.Lock()
|
|
defer ahs.Unlock()
|
|
|
|
// Check if new heal sequence to be started overlaps with any
|
|
// existing, running sequence
|
|
hpath := pathJoin(h.bucket, h.object)
|
|
for k, hSeq := range ahs.healSeqMap {
|
|
if !hSeq.hasEnded() && (HasPrefix(k, hpath) || HasPrefix(hpath, k)) {
|
|
errMsg = "The provided heal sequence path overlaps with an existing " +
|
|
fmt.Sprintf("heal path: %s", k)
|
|
return nil, errorCodes.ToAPIErr(ErrHealOverlappingPaths), errMsg
|
|
}
|
|
}
|
|
|
|
// Add heal state and start sequence
|
|
ahs.healSeqMap[hpath] = h
|
|
|
|
clientToken := h.clientToken
|
|
if globalIsDistErasure {
|
|
clientToken = fmt.Sprintf("%s:%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
|
|
}
|
|
|
|
if h.clientToken == bgHealingUUID {
|
|
// For background heal do nothing, do not spawn an unnecessary goroutine.
|
|
} else {
|
|
// Launch top-level background heal go-routine
|
|
go h.healSequenceStart(objAPI)
|
|
}
|
|
|
|
b, err := json.Marshal(madmin.HealStartSuccess{
|
|
ClientToken: clientToken,
|
|
ClientAddress: h.clientAddress,
|
|
StartTime: h.startTime,
|
|
})
|
|
if err != nil {
|
|
bugLogIf(h.ctx, err)
|
|
return nil, toAdminAPIErr(h.ctx, err), ""
|
|
}
|
|
return b, noError, ""
|
|
}
|
|
|
|
// PopHealStatusJSON - Called by heal-status API. It fetches the heal
|
|
// status results from global state and returns its JSON
|
|
// representation. The clientToken helps ensure there aren't
|
|
// conflicting clients fetching status.
|
|
func (ahs *allHealState) PopHealStatusJSON(hpath string,
|
|
clientToken string) ([]byte, APIErrorCode,
|
|
) {
|
|
// fetch heal state for given path
|
|
h, exists := ahs.getHealSequence(hpath)
|
|
if !exists {
|
|
// heal sequence doesn't exist, must have finished.
|
|
jbytes, err := json.Marshal(healSequenceStatus{
|
|
Summary: healFinishedStatus,
|
|
})
|
|
return jbytes, toAdminAPIErrCode(GlobalContext, err)
|
|
}
|
|
|
|
// Check if client-token is valid
|
|
if clientToken != h.clientToken {
|
|
return nil, ErrHealInvalidClientToken
|
|
}
|
|
|
|
// Take lock to access and update the heal-sequence
|
|
h.mutex.Lock()
|
|
defer h.mutex.Unlock()
|
|
|
|
numItems := len(h.currentStatus.Items)
|
|
|
|
// calculate index of most recently available heal result
|
|
// record.
|
|
lastResultIndex := h.lastSentResultIndex
|
|
if numItems > 0 {
|
|
lastResultIndex = h.currentStatus.Items[numItems-1].ResultIndex
|
|
}
|
|
|
|
h.lastSentResultIndex = lastResultIndex
|
|
|
|
jbytes, err := json.Marshal(h.currentStatus)
|
|
if err != nil {
|
|
h.currentStatus.Items = nil
|
|
|
|
bugLogIf(h.ctx, err)
|
|
return nil, ErrInternalError
|
|
}
|
|
|
|
h.currentStatus.Items = nil
|
|
|
|
return jbytes, ErrNone
|
|
}
|
|
|
|
// healSource denotes single entity and heal option.
|
|
type healSource struct {
|
|
bucket string
|
|
object string
|
|
versionID string
|
|
noWait bool // a non blocking call, if task queue is full return right away.
|
|
opts *madmin.HealOpts // optional heal option overrides default setting
|
|
}
|
|
|
|
// healSequence - state for each heal sequence initiated on the
|
|
// server.
|
|
type healSequence struct {
|
|
// bucket, and object on which heal seq. was initiated
|
|
bucket, object string
|
|
|
|
// Report healing progress
|
|
reportProgress bool
|
|
|
|
// time at which heal sequence was started
|
|
startTime time.Time
|
|
|
|
// time at which heal sequence has ended
|
|
endTime time.Time
|
|
|
|
// Heal client info
|
|
clientToken, clientAddress string
|
|
|
|
// was this heal sequence force started?
|
|
forceStarted bool
|
|
|
|
// heal settings applied to this heal sequence
|
|
settings madmin.HealOpts
|
|
|
|
// current accumulated status of the heal sequence
|
|
currentStatus healSequenceStatus
|
|
|
|
// channel signaled by background routine when traversal has
|
|
// completed
|
|
traverseAndHealDoneCh chan error
|
|
|
|
// canceler to cancel heal sequence.
|
|
cancelCtx context.CancelFunc
|
|
|
|
// the last result index sent to client
|
|
lastSentResultIndex int64
|
|
|
|
// Number of total items scanned against item type
|
|
scannedItemsMap map[madmin.HealItemType]int64
|
|
|
|
// Number of total items healed against item type
|
|
healedItemsMap map[madmin.HealItemType]int64
|
|
|
|
// Number of total items where healing failed against endpoint and drive state
|
|
healFailedItemsMap map[string]int64
|
|
|
|
// The time of the last scan/heal activity
|
|
lastHealActivity time.Time
|
|
|
|
// Holds the request-info for logging
|
|
ctx context.Context
|
|
|
|
// used to lock this structure as it is concurrently accessed
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewHealSequence - creates healSettings, assumes bucket and
|
|
// objPrefix are already validated.
|
|
func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
|
|
hs madmin.HealOpts, forceStart bool,
|
|
) *healSequence {
|
|
reqInfo := &logger.ReqInfo{RemoteHost: clientAddr, API: "Heal", BucketName: bucket}
|
|
reqInfo.AppendTags("prefix", objPrefix)
|
|
ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo))
|
|
|
|
clientToken := mustGetUUID()
|
|
|
|
return &healSequence{
|
|
bucket: bucket,
|
|
object: objPrefix,
|
|
reportProgress: true,
|
|
startTime: UTCNow(),
|
|
clientToken: clientToken,
|
|
clientAddress: clientAddr,
|
|
forceStarted: forceStart,
|
|
settings: hs,
|
|
currentStatus: healSequenceStatus{
|
|
Summary: healNotStartedStatus,
|
|
HealSettings: hs,
|
|
},
|
|
traverseAndHealDoneCh: make(chan error),
|
|
cancelCtx: cancel,
|
|
ctx: ctx,
|
|
scannedItemsMap: make(map[madmin.HealItemType]int64),
|
|
healedItemsMap: make(map[madmin.HealItemType]int64),
|
|
healFailedItemsMap: make(map[string]int64),
|
|
}
|
|
}
|
|
|
|
// getScannedItemsCount - returns a count of all scanned items
|
|
func (h *healSequence) getScannedItemsCount() int64 {
|
|
var count int64
|
|
h.mutex.RLock()
|
|
defer h.mutex.RUnlock()
|
|
|
|
for _, v := range h.scannedItemsMap {
|
|
count += v
|
|
}
|
|
return count
|
|
}
|
|
|
|
// getScannedItemsMap - returns map of all scanned items against type
|
|
func (h *healSequence) getScannedItemsMap() map[madmin.HealItemType]int64 {
|
|
h.mutex.RLock()
|
|
defer h.mutex.RUnlock()
|
|
|
|
// Make a copy before returning the value
|
|
retMap := make(map[madmin.HealItemType]int64, len(h.scannedItemsMap))
|
|
for k, v := range h.scannedItemsMap {
|
|
retMap[k] = v
|
|
}
|
|
|
|
return retMap
|
|
}
|
|
|
|
// getHealedItemsMap - returns the map of all healed items against type
|
|
func (h *healSequence) getHealedItemsMap() map[madmin.HealItemType]int64 {
|
|
h.mutex.RLock()
|
|
defer h.mutex.RUnlock()
|
|
|
|
// Make a copy before returning the value
|
|
retMap := make(map[madmin.HealItemType]int64, len(h.healedItemsMap))
|
|
for k, v := range h.healedItemsMap {
|
|
retMap[k] = v
|
|
}
|
|
|
|
return retMap
|
|
}
|
|
|
|
// getHealFailedItemsMap - returns map of all items where heal failed against
|
|
// drive endpoint and status
|
|
func (h *healSequence) getHealFailedItemsMap() map[string]int64 {
|
|
h.mutex.RLock()
|
|
defer h.mutex.RUnlock()
|
|
|
|
// Make a copy before returning the value
|
|
retMap := make(map[string]int64, len(h.healFailedItemsMap))
|
|
for k, v := range h.healFailedItemsMap {
|
|
retMap[k] = v
|
|
}
|
|
|
|
return retMap
|
|
}
|
|
|
|
func (h *healSequence) countFailed(res madmin.HealResultItem) {
|
|
h.mutex.Lock()
|
|
defer h.mutex.Unlock()
|
|
|
|
for _, d := range res.After.Drives {
|
|
// For failed items we report the endpoint and drive state
|
|
// This will help users take corrective actions for drives
|
|
h.healFailedItemsMap[d.Endpoint+","+d.State]++
|
|
}
|
|
|
|
h.lastHealActivity = UTCNow()
|
|
}
|
|
|
|
func (h *healSequence) countHeals(healType madmin.HealItemType, healed bool) {
|
|
h.mutex.Lock()
|
|
defer h.mutex.Unlock()
|
|
|
|
if !healed {
|
|
h.scannedItemsMap[healType]++
|
|
} else {
|
|
h.healedItemsMap[healType]++
|
|
}
|
|
|
|
h.lastHealActivity = UTCNow()
|
|
}
|
|
|
|
// isQuitting - determines if the heal sequence is quitting (due to an
|
|
// external signal)
|
|
func (h *healSequence) isQuitting() bool {
|
|
select {
|
|
case <-h.ctx.Done():
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// check if the heal sequence has ended
|
|
func (h *healSequence) hasEnded() bool {
|
|
h.mutex.RLock()
|
|
defer h.mutex.RUnlock()
|
|
// background heal never ends
|
|
if h.clientToken == bgHealingUUID {
|
|
return false
|
|
}
|
|
return !h.endTime.IsZero()
|
|
}
|
|
|
|
// stops the heal sequence - safe to call multiple times.
|
|
func (h *healSequence) stop() {
|
|
h.cancelCtx()
|
|
}
|
|
|
|
// pushHealResultItem - pushes a heal result item for consumption in
|
|
// the heal-status API. It blocks if there are
|
|
// maxUnconsumedHealResultItems. When it blocks, the heal sequence
|
|
// routine is effectively paused - this happens when the server has
|
|
// accumulated the maximum number of heal records per heal
|
|
// sequence. When the client consumes further records, the heal
|
|
// sequence automatically resumes. The return value indicates if the
|
|
// operation succeeded.
|
|
func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
|
|
// start a timer to keep an upper time limit to find an empty
|
|
// slot to add the given heal result - if no slot is found it
|
|
// means that the server is holding the maximum amount of
|
|
// heal-results in memory and the client has not consumed it
|
|
// for too long.
|
|
unconsumedTimer := time.NewTimer(healUnconsumedTimeout)
|
|
defer unconsumedTimer.Stop()
|
|
|
|
var itemsLen int
|
|
for {
|
|
h.mutex.Lock()
|
|
itemsLen = len(h.currentStatus.Items)
|
|
if itemsLen == maxUnconsumedHealResultItems {
|
|
// wait for a second, or quit if an external
|
|
// stop signal is received or the
|
|
// unconsumedTimer fires.
|
|
select {
|
|
// Check after a second
|
|
case <-time.After(time.Second):
|
|
h.mutex.Unlock()
|
|
continue
|
|
|
|
case <-h.ctx.Done():
|
|
h.mutex.Unlock()
|
|
// discard result and return.
|
|
return errHealStopSignalled
|
|
|
|
// Timeout if no results consumed for too long.
|
|
case <-unconsumedTimer.C:
|
|
h.mutex.Unlock()
|
|
return errHealIdleTimeout
|
|
}
|
|
}
|
|
break
|
|
}
|
|
|
|
// Set the correct result index for the new result item
|
|
if itemsLen > 0 {
|
|
r.ResultIndex = 1 + h.currentStatus.Items[itemsLen-1].ResultIndex
|
|
} else {
|
|
r.ResultIndex = 1 + h.lastSentResultIndex
|
|
}
|
|
|
|
// append to results
|
|
h.currentStatus.Items = append(h.currentStatus.Items, r)
|
|
|
|
// release lock
|
|
h.mutex.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// healSequenceStart - this is the top-level background heal
|
|
// routine. It launches another go-routine that actually traverses
|
|
// on-disk data, checks and heals according to the selected
|
|
// settings. This go-routine itself, (1) monitors the traversal
|
|
// routine for completion, and (2) listens for external stop
|
|
// signals. When either event happens, it sets the finish status for
|
|
// the heal-sequence.
|
|
func (h *healSequence) healSequenceStart(objAPI ObjectLayer) {
|
|
// Set status as running
|
|
h.mutex.Lock()
|
|
h.currentStatus.Summary = healRunningStatus
|
|
h.currentStatus.StartTime = UTCNow()
|
|
h.mutex.Unlock()
|
|
|
|
go h.traverseAndHeal(objAPI)
|
|
|
|
select {
|
|
case err, ok := <-h.traverseAndHealDoneCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
h.mutex.Lock()
|
|
h.endTime = UTCNow()
|
|
// Heal traversal is complete.
|
|
if err == nil {
|
|
// heal traversal succeeded.
|
|
h.currentStatus.Summary = healFinishedStatus
|
|
} else {
|
|
// heal traversal had an error.
|
|
h.currentStatus.Summary = healStoppedStatus
|
|
h.currentStatus.FailureDetail = err.Error()
|
|
}
|
|
h.mutex.Unlock()
|
|
case <-h.ctx.Done():
|
|
h.mutex.Lock()
|
|
h.endTime = UTCNow()
|
|
h.currentStatus.Summary = healFinishedStatus
|
|
h.mutex.Unlock()
|
|
|
|
// drain traverse channel so the traversal
|
|
// go-routine does not leak.
|
|
go func() {
|
|
// Eventually the traversal go-routine closes
|
|
// the channel and returns, so this go-routine
|
|
// itself will not leak.
|
|
<-h.traverseAndHealDoneCh
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
|
|
// Send heal request
|
|
task := healTask{
|
|
bucket: source.bucket,
|
|
object: source.object,
|
|
versionID: source.versionID,
|
|
opts: h.settings,
|
|
}
|
|
if source.opts != nil {
|
|
task.opts = *source.opts
|
|
} else {
|
|
task.opts.ScanMode = madmin.HealNormalScan
|
|
}
|
|
|
|
h.countHeals(healType, false)
|
|
|
|
if source.noWait {
|
|
select {
|
|
case globalBackgroundHealRoutine.tasks <- task:
|
|
if serverDebugLog {
|
|
fmt.Printf("Task in the queue: %#v\n", task)
|
|
}
|
|
default:
|
|
// task queue is full, no more workers, we shall move on and heal later.
|
|
return nil
|
|
}
|
|
// Don't wait for result
|
|
return nil
|
|
}
|
|
|
|
// respCh must be set to wait for result.
|
|
// We make it size 1, so a result can always be written
|
|
// even if we aren't listening.
|
|
task.respCh = make(chan healResult, 1)
|
|
select {
|
|
case globalBackgroundHealRoutine.tasks <- task:
|
|
if serverDebugLog {
|
|
fmt.Printf("Task in the queue: %#v\n", task)
|
|
}
|
|
case <-h.ctx.Done():
|
|
return nil
|
|
}
|
|
|
|
// task queued, now wait for the response.
|
|
select {
|
|
case res := <-task.respCh:
|
|
if !h.reportProgress {
|
|
if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal
|
|
return nil
|
|
}
|
|
|
|
// Report caller of any failure
|
|
return res.err
|
|
}
|
|
res.result.Type = healType
|
|
if res.err != nil {
|
|
res.result.Detail = res.err.Error()
|
|
}
|
|
return h.pushHealResultItem(res.result)
|
|
case <-h.ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (h *healSequence) healDiskMeta(objAPI ObjectLayer) error {
|
|
// Start healing the config prefix.
|
|
return h.healMinioSysMeta(objAPI, minioConfigPrefix)()
|
|
}
|
|
|
|
func (h *healSequence) healItems(objAPI ObjectLayer, bucketsOnly bool) error {
|
|
if h.clientToken == bgHealingUUID {
|
|
// For background heal do nothing.
|
|
return nil
|
|
}
|
|
|
|
if err := h.healDiskMeta(objAPI); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Heal buckets and objects
|
|
return h.healBuckets(objAPI, bucketsOnly)
|
|
}
|
|
|
|
// traverseAndHeal - traverses on-disk data and performs healing
|
|
// according to settings. At each "safe" point it also checks if an
|
|
// external quit signal has been received and quits if so. Since the
|
|
// healing traversal may be mutating on-disk data when an external
|
|
// quit signal is received, this routine cannot quit immediately and
|
|
// has to wait until a safe point is reached, such as between scanning
|
|
// two objects.
|
|
func (h *healSequence) traverseAndHeal(objAPI ObjectLayer) {
|
|
bucketsOnly := false // Heals buckets and objects also.
|
|
h.traverseAndHealDoneCh <- h.healItems(objAPI, bucketsOnly)
|
|
xioutil.SafeClose(h.traverseAndHealDoneCh)
|
|
}
|
|
|
|
// healMinioSysMeta - heals all files under a given meta prefix, returns a function
|
|
// which in-turn heals the respective meta directory path and any files in int.
|
|
func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) func() error {
|
|
return func() error {
|
|
// NOTE: Healing on meta is run regardless
|
|
// of any bucket being selected, this is to ensure that
|
|
// meta are always upto date and correct.
|
|
return objAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, h.settings, func(bucket, object, versionID string, scanMode madmin.HealScanMode) error {
|
|
if h.isQuitting() {
|
|
return errHealStopSignalled
|
|
}
|
|
|
|
err := h.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: object,
|
|
versionID: versionID,
|
|
}, madmin.HealItemBucketMetadata)
|
|
return err
|
|
})
|
|
}
|
|
}
|
|
|
|
// healBuckets - check for all buckets heal or just particular bucket.
|
|
func (h *healSequence) healBuckets(objAPI ObjectLayer, bucketsOnly bool) error {
|
|
if h.isQuitting() {
|
|
return errHealStopSignalled
|
|
}
|
|
|
|
// 1. If a bucket was specified, heal only the bucket.
|
|
if h.bucket != "" {
|
|
return h.healBucket(objAPI, h.bucket, bucketsOnly)
|
|
}
|
|
|
|
buckets, err := objAPI.ListBuckets(h.ctx, BucketOptions{})
|
|
if err != nil {
|
|
return errFnHealFromAPIErr(h.ctx, err)
|
|
}
|
|
|
|
// Heal latest buckets first.
|
|
sort.Slice(buckets, func(i, j int) bool {
|
|
return buckets[i].Created.After(buckets[j].Created)
|
|
})
|
|
|
|
for _, bucket := range buckets {
|
|
if err = h.healBucket(objAPI, bucket.Name, bucketsOnly); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// healBucket - traverses and heals given bucket
|
|
func (h *healSequence) healBucket(objAPI ObjectLayer, bucket string, bucketsOnly bool) error {
|
|
if err := h.queueHealTask(healSource{bucket: bucket}, madmin.HealItemBucket); err != nil {
|
|
return err
|
|
}
|
|
|
|
if bucketsOnly {
|
|
return nil
|
|
}
|
|
|
|
if !h.settings.Recursive {
|
|
if h.object != "" {
|
|
if err := h.healObject(bucket, h.object, "", h.settings.ScanMode); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if err := objAPI.HealObjects(h.ctx, bucket, h.object, h.settings, h.healObject); err != nil {
|
|
return errFnHealFromAPIErr(h.ctx, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// healObject - heal the given object and record result
|
|
func (h *healSequence) healObject(bucket, object, versionID string, scanMode madmin.HealScanMode) error {
|
|
if h.isQuitting() {
|
|
return errHealStopSignalled
|
|
}
|
|
|
|
err := h.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: object,
|
|
versionID: versionID,
|
|
opts: &h.settings,
|
|
}, madmin.HealItemObject)
|
|
|
|
// Wait and proceed if there are active requests
|
|
waitForLowHTTPReq()
|
|
|
|
return err
|
|
}
|