mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
add cluster maintenance healthcheck drive heal affinity (#10218)
This commit is contained in:
parent
19c4f3082b
commit
6c6137b2e7
@ -799,6 +799,52 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
|
||||
keepConnLive(w, r, respCh)
|
||||
}
|
||||
|
||||
func getAggregatedBackgroundHealState(ctx context.Context, failOnErr bool) (madmin.BgHealState, error) {
|
||||
var bgHealStates []madmin.BgHealState
|
||||
|
||||
// Get local heal status first
|
||||
bgHealStates = append(bgHealStates, getLocalBackgroundHealStatus())
|
||||
|
||||
if globalIsDistErasure {
|
||||
// Get heal status from other peers
|
||||
peersHealStates, nerrs := globalNotificationSys.BackgroundHealStatus()
|
||||
for _, nerr := range nerrs {
|
||||
if nerr.Err != nil {
|
||||
if failOnErr {
|
||||
return madmin.BgHealState{}, nerr.Err
|
||||
}
|
||||
logger.LogIf(ctx, nerr.Err)
|
||||
}
|
||||
}
|
||||
bgHealStates = append(bgHealStates, peersHealStates...)
|
||||
}
|
||||
|
||||
// Aggregate healing result
|
||||
var aggregatedHealStateResult = madmin.BgHealState{
|
||||
ScannedItemsCount: bgHealStates[0].ScannedItemsCount,
|
||||
LastHealActivity: bgHealStates[0].LastHealActivity,
|
||||
NextHealRound: bgHealStates[0].NextHealRound,
|
||||
HealDisks: bgHealStates[0].HealDisks,
|
||||
}
|
||||
|
||||
bgHealStates = bgHealStates[1:]
|
||||
|
||||
for _, state := range bgHealStates {
|
||||
aggregatedHealStateResult.ScannedItemsCount += state.ScannedItemsCount
|
||||
aggregatedHealStateResult.HealDisks = append(aggregatedHealStateResult.HealDisks, state.HealDisks...)
|
||||
if !state.LastHealActivity.IsZero() && aggregatedHealStateResult.LastHealActivity.Before(state.LastHealActivity) {
|
||||
aggregatedHealStateResult.LastHealActivity = state.LastHealActivity
|
||||
// The node which has the last heal activity means its
|
||||
// is the node that is orchestrating self healing operations,
|
||||
// which also means it is the same node which decides when
|
||||
// the next self healing operation will be done.
|
||||
aggregatedHealStateResult.NextHealRound = state.NextHealRound
|
||||
}
|
||||
}
|
||||
|
||||
return aggregatedHealStateResult, nil
|
||||
}
|
||||
|
||||
func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "HealBackgroundStatus")
|
||||
|
||||
@ -815,39 +861,8 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
|
||||
return
|
||||
}
|
||||
|
||||
var bgHealStates []madmin.BgHealState
|
||||
|
||||
// Get local heal status first
|
||||
bgHealStates = append(bgHealStates, getLocalBackgroundHealStatus())
|
||||
|
||||
if globalIsDistErasure {
|
||||
// Get heal status from other peers
|
||||
peersHealStates := globalNotificationSys.BackgroundHealStatus()
|
||||
bgHealStates = append(bgHealStates, peersHealStates...)
|
||||
}
|
||||
|
||||
// Aggregate healing result
|
||||
var aggregatedHealStateResult = madmin.BgHealState{
|
||||
ScannedItemsCount: bgHealStates[0].ScannedItemsCount,
|
||||
LastHealActivity: bgHealStates[0].LastHealActivity,
|
||||
NextHealRound: bgHealStates[0].NextHealRound,
|
||||
}
|
||||
|
||||
bgHealStates = bgHealStates[1:]
|
||||
|
||||
for _, state := range bgHealStates {
|
||||
aggregatedHealStateResult.ScannedItemsCount += state.ScannedItemsCount
|
||||
if !state.LastHealActivity.IsZero() && aggregatedHealStateResult.LastHealActivity.Before(state.LastHealActivity) {
|
||||
aggregatedHealStateResult.LastHealActivity = state.LastHealActivity
|
||||
// The node which has the last heal activity means its
|
||||
// is the node that is orchestrating self healing operations,
|
||||
// which also means it is the same node which decides when
|
||||
// the next self healing operation will be done.
|
||||
aggregatedHealStateResult.NextHealRound = state.NextHealRound
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(aggregatedHealStateResult); err != nil {
|
||||
aggregateHealStateResult, _ := getAggregatedBackgroundHealState(r.Context(), false)
|
||||
if err := json.NewEncoder(w).Encode(aggregateHealStateResult); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
@ -89,6 +89,7 @@ type allHealState struct {
|
||||
|
||||
// map of heal path to heal sequence
|
||||
healSeqMap map[string]*healSequence
|
||||
healLocalDisks []Endpoints
|
||||
}
|
||||
|
||||
// newHealState - initialize global heal state management
|
||||
@ -102,6 +103,22 @@ func newHealState() *allHealState {
|
||||
return healState
|
||||
}
|
||||
|
||||
func (ahs *allHealState) getHealLocalDisks() []Endpoints {
|
||||
ahs.Lock()
|
||||
defer ahs.Unlock()
|
||||
|
||||
healLocalDisks := make([]Endpoints, len(ahs.healLocalDisks))
|
||||
copy(healLocalDisks, ahs.healLocalDisks)
|
||||
return healLocalDisks
|
||||
}
|
||||
|
||||
func (ahs *allHealState) updateHealLocalDisks(eps []Endpoints) {
|
||||
ahs.Lock()
|
||||
defer ahs.Unlock()
|
||||
|
||||
ahs.healLocalDisks = eps
|
||||
}
|
||||
|
||||
func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
|
||||
// Launch clean-up routine to remove this heal sequence (after
|
||||
// it ends) from the global state after timeout has elapsed.
|
||||
|
@ -90,6 +90,9 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
case task.bucket == nopHeal:
|
||||
continue
|
||||
case task.bucket == SlashSeparator:
|
||||
// Quickly check if drives need healing upon start-up
|
||||
globalBackgroundHealState.updateHealLocalDisks(getLocalDisksToHeal(objAPI))
|
||||
|
||||
res, err = healDiskFormat(ctx, objAPI, task.opts)
|
||||
case task.bucket != "" && task.object == "":
|
||||
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts.DryRun, task.opts.Remove)
|
||||
|
@ -18,6 +18,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
@ -30,6 +31,46 @@ func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
go monitorLocalDisksAndHeal(ctx, objAPI)
|
||||
}
|
||||
|
||||
func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints {
|
||||
z, ok := objAPI.(*erasureZones)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Attempt a heal as the server starts-up first.
|
||||
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
|
||||
for i, ep := range globalEndpoints {
|
||||
localDisksToHeal := Endpoints{}
|
||||
for _, endpoint := range ep.Endpoints {
|
||||
if !endpoint.IsLocal {
|
||||
continue
|
||||
}
|
||||
// Try to connect to the current endpoint
|
||||
// and reformat if the current disk is not formatted
|
||||
_, _, err := connectEndpoint(endpoint)
|
||||
if err == errUnformattedDisk {
|
||||
localDisksToHeal = append(localDisksToHeal, endpoint)
|
||||
}
|
||||
}
|
||||
if len(localDisksToHeal) == 0 {
|
||||
continue
|
||||
}
|
||||
localDisksInZoneHeal[i] = localDisksToHeal
|
||||
}
|
||||
return localDisksInZoneHeal
|
||||
|
||||
}
|
||||
|
||||
func getDrivesToHealCount(localDisksInZoneHeal []Endpoints) int {
|
||||
var drivesToHeal int
|
||||
for _, eps := range localDisksInZoneHeal {
|
||||
for range eps {
|
||||
drivesToHeal++
|
||||
}
|
||||
}
|
||||
return drivesToHeal
|
||||
}
|
||||
|
||||
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
|
||||
// 1. Only the concerned erasure set will be listed and healed
|
||||
// 2. Only the node hosting the disk is responsible to perform the heal
|
||||
@ -50,52 +91,42 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
localDisksInZoneHeal := globalBackgroundHealState.getHealLocalDisks()
|
||||
|
||||
drivesToHeal := getDrivesToHealCount(localDisksInZoneHeal)
|
||||
if drivesToHeal != 0 {
|
||||
logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...",
|
||||
drivesToHeal, defaultMonitorNewDiskInterval))
|
||||
}
|
||||
|
||||
firstTime := true
|
||||
|
||||
// Perform automatic disk healing when a disk is replaced locally.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(defaultMonitorNewDiskInterval):
|
||||
// Attempt a heal as the server starts-up first.
|
||||
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
|
||||
var healNewDisks bool
|
||||
for i, ep := range globalEndpoints {
|
||||
localDisksToHeal := Endpoints{}
|
||||
for _, endpoint := range ep.Endpoints {
|
||||
if !endpoint.IsLocal {
|
||||
// heal only if new disks found.
|
||||
if drivesToHeal == 0 {
|
||||
firstTime = false
|
||||
localDisksInZoneHeal = getLocalDisksToHeal(z)
|
||||
drivesToHeal = getDrivesToHealCount(localDisksInZoneHeal)
|
||||
if drivesToHeal == 0 {
|
||||
// No drives to heal.
|
||||
globalBackgroundHealState.updateHealLocalDisks(nil)
|
||||
continue
|
||||
}
|
||||
// Try to connect to the current endpoint
|
||||
// and reformat if the current disk is not formatted
|
||||
_, _, err := connectEndpoint(endpoint)
|
||||
if err == errUnformattedDisk {
|
||||
localDisksToHeal = append(localDisksToHeal, endpoint)
|
||||
}
|
||||
}
|
||||
if len(localDisksToHeal) == 0 {
|
||||
continue
|
||||
}
|
||||
localDisksInZoneHeal[i] = localDisksToHeal
|
||||
healNewDisks = true
|
||||
}
|
||||
|
||||
// Reformat disks only if needed.
|
||||
if !healNewDisks {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info("New unformatted drives detected attempting to heal...")
|
||||
for i, disks := range localDisksInZoneHeal {
|
||||
for _, disk := range disks {
|
||||
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
|
||||
}
|
||||
globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal)
|
||||
}
|
||||
|
||||
if !firstTime {
|
||||
// Reformat disks
|
||||
bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
|
||||
|
||||
// Ensure that reformatting disks is finished
|
||||
bgSeq.sourceCh <- healSource{bucket: nopHeal}
|
||||
}
|
||||
|
||||
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
|
||||
// Compute the list of erasure set to heal
|
||||
@ -108,6 +139,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
printEndpointError(endpoint, err, true)
|
||||
continue
|
||||
}
|
||||
|
||||
// Calculate the set index where the current endpoint belongs
|
||||
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
|
||||
if err != nil {
|
||||
@ -120,6 +152,13 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
erasureSetInZoneToHeal[i] = erasureSetToHeal
|
||||
}
|
||||
|
||||
logger.Info("New unformatted drives detected attempting to heal the content...")
|
||||
for i, disks := range localDisksInZoneHeal {
|
||||
for _, disk := range disks {
|
||||
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
|
||||
}
|
||||
}
|
||||
|
||||
// Heal all erasure sets that need
|
||||
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
|
||||
for _, setIndex := range erasureSetToHeal {
|
||||
@ -127,6 +166,11 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
||||
// Only upon success reduce the counter
|
||||
if err == nil {
|
||||
drivesToHeal--
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1414,6 +1414,11 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
|
||||
return madmin.HealResultItem{}, err
|
||||
}
|
||||
|
||||
refFormat, err = getFormatErasureInQuorum(tmpNewFormats)
|
||||
if err != nil {
|
||||
return madmin.HealResultItem{}, err
|
||||
}
|
||||
|
||||
// kill the monitoring loop such that we stop writing
|
||||
// to indicate that we will re-initialize everything
|
||||
// with new format.
|
||||
|
@ -2032,6 +2032,7 @@ type HealthOptions struct {
|
||||
// was queried
|
||||
type HealthResult struct {
|
||||
Healthy bool
|
||||
HealingDrives int
|
||||
ZoneID, SetID int
|
||||
WriteQuorum int
|
||||
}
|
||||
@ -2086,8 +2087,23 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if local disks are being healed, if they are being healed
|
||||
// we need to tell healthy status as 'false' so that this server
|
||||
// is not taken down for maintenance
|
||||
aggHealStateResult, err := getAggregatedBackgroundHealState(ctx, true)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to verify global heal status: %w", err))
|
||||
return HealthResult{
|
||||
Healthy: true,
|
||||
Healthy: false,
|
||||
}
|
||||
}
|
||||
|
||||
healthy := len(aggHealStateResult.HealDisks) == 0
|
||||
|
||||
return HealthResult{
|
||||
Healthy: healthy,
|
||||
HealingDrives: len(aggHealStateResult.HealDisks),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,9 +73,17 @@ func getLocalBackgroundHealStatus() madmin.BgHealState {
|
||||
return madmin.BgHealState{}
|
||||
}
|
||||
|
||||
var healDisks []string
|
||||
for _, eps := range globalBackgroundHealState.getHealLocalDisks() {
|
||||
for _, ep := range eps {
|
||||
healDisks = append(healDisks, ep.String())
|
||||
}
|
||||
}
|
||||
|
||||
return madmin.BgHealState{
|
||||
ScannedItemsCount: bgSeq.getScannedItemsCount(),
|
||||
LastHealActivity: bgSeq.lastHealActivity,
|
||||
HealDisks: healDisks,
|
||||
NextHealRound: UTCNow().Add(durationToNextHealRound(bgSeq.lastHealActivity)),
|
||||
}
|
||||
}
|
||||
|
@ -269,21 +269,25 @@ func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationP
|
||||
}
|
||||
|
||||
// BackgroundHealStatus - returns background heal status of all peers
|
||||
func (sys *NotificationSys) BackgroundHealStatus() []madmin.BgHealState {
|
||||
func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []NotificationPeerErr) {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
states := make([]madmin.BgHealState, len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
client := client
|
||||
ng.Go(GlobalContext, func() error {
|
||||
st, err := client.BackgroundHealStatus()
|
||||
if err != nil {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
} else {
|
||||
states[idx] = st
|
||||
return err
|
||||
}
|
||||
states[idx] = st
|
||||
return nil
|
||||
}, idx, *client.host)
|
||||
}
|
||||
|
||||
return states
|
||||
return states, ng.Wait()
|
||||
}
|
||||
|
||||
// StartProfiling - start profiling on remote peers, by initiating a remote RPC.
|
||||
|
@ -2103,9 +2103,6 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
|
||||
legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
|
||||
// legacy data dir means its old content, honor system umask.
|
||||
if err = os.MkdirAll(legacyDataPath, 0777); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
@ -2116,14 +2113,11 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
|
||||
|
||||
for _, entry := range entries {
|
||||
// Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
|
||||
if entry == xlStorageFormatFile || strings.HasPrefix(entry, SlashSeparator) {
|
||||
if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err = os.Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
@ -2159,20 +2153,14 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
|
||||
}
|
||||
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
if srcDataPath != "" {
|
||||
removeAll(oldDstDataPath)
|
||||
removeAll(dstDataPath)
|
||||
if err = renameAll(srcDataPath, dstDataPath); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2265,10 +2253,7 @@ func (s *xlStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (e
|
||||
}
|
||||
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// Remove parent dir of the source file if empty
|
||||
|
@ -294,6 +294,7 @@ type BgHealState struct {
|
||||
ScannedItemsCount int64
|
||||
LastHealActivity time.Time
|
||||
NextHealRound time.Time
|
||||
HealDisks []string
|
||||
}
|
||||
|
||||
// BackgroundHealStatus returns the background heal status of the
|
||||
|
Loading…
Reference in New Issue
Block a user