diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 923f592fa..804b82261 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -89,13 +89,14 @@ type allHealState struct { // map of heal path to heal sequence healSeqMap map[string]*healSequence - healLocalDisks []Endpoints + healLocalDisks map[Endpoint]struct{} } // newHealState - initialize global heal state management func newHealState() *allHealState { healState := &allHealState{ - healSeqMap: make(map[string]*healSequence), + healSeqMap: make(map[string]*healSequence), + healLocalDisks: map[Endpoint]struct{}{}, } go healState.periodicHealSeqsClean(GlobalContext) @@ -103,20 +104,43 @@ func newHealState() *allHealState { return healState } -func (ahs *allHealState) getHealLocalDisks() []Endpoints { +func (ahs *allHealState) healDriveCount() int { ahs.Lock() defer ahs.Unlock() - healLocalDisks := make([]Endpoints, len(ahs.healLocalDisks)) - copy(healLocalDisks, ahs.healLocalDisks) + fmt.Println(ahs.healLocalDisks) + return len(ahs.healLocalDisks) +} + +func (ahs *allHealState) getHealLocalDisks() Endpoints { + ahs.Lock() + defer ahs.Unlock() + + var healLocalDisks Endpoints + for ep := range ahs.healLocalDisks { + healLocalDisks = append(healLocalDisks, ep) + } return healLocalDisks } -func (ahs *allHealState) updateHealLocalDisks(healLocalDisks []Endpoints) { +func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) { ahs.Lock() defer ahs.Unlock() - ahs.healLocalDisks = healLocalDisks + for _, ep := range healLocalDisks { + delete(ahs.healLocalDisks, ep) + } + fmt.Println(ahs.healLocalDisks) +} + +func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) { + ahs.Lock() + defer ahs.Unlock() + + for _, ep := range healLocalDisks { + ahs.healLocalDisks[ep] = struct{}{} + } + fmt.Println(ahs.healLocalDisks) } func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) { diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index ec4d9a1b6..10fd76b9c 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -26,7 +26,7 @@ import ( "github.com/minio/minio/cmd/logger" ) -const defaultMonitorNewDiskInterval = time.Minute * 3 +const defaultMonitorNewDiskInterval = time.Second * 10 func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { z, ok := objAPI.(*erasureZones) @@ -36,15 +36,6 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { initBackgroundHealing(ctx, objAPI) // start quick background healing - localDisksInZoneHeal := getLocalDisksToHeal(objAPI) - globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal) - - drivesToHeal := getDrivesToHealCount(localDisksInZoneHeal) - if drivesToHeal != 0 { - logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...", - drivesToHeal, defaultMonitorNewDiskInterval)) - } - var bgSeq *healSequence var found bool @@ -56,7 +47,14 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { time.Sleep(time.Second) } - if drivesToHeal != 0 { + for _, ep := range getLocalDisksToHeal() { + globalBackgroundHealState.pushHealLocalDisks(ep) + } + + if drivesToHeal := globalBackgroundHealState.healDriveCount(); drivesToHeal > 0 { + logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...", + drivesToHeal, defaultMonitorNewDiskInterval)) + // Heal any disk format and metadata early, if possible. if err := bgSeq.healDiskMeta(); err != nil { if newObjectLayerFn() != nil { @@ -67,19 +65,11 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { } } - go monitorLocalDisksAndHeal(ctx, z, drivesToHeal, localDisksInZoneHeal, bgSeq) + go monitorLocalDisksAndHeal(ctx, z, bgSeq) } -func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints { - z, ok := objAPI.(*erasureZones) - if !ok { - return nil - } - - // Attempt a heal as the server starts-up first. - localDisksInZoneHeal := make([]Endpoints, len(z.zones)) - for i, ep := range globalEndpoints { - localDisksToHeal := Endpoints{} +func getLocalDisksToHeal() (disksToHeal Endpoints) { + for _, ep := range globalEndpoints { for _, endpoint := range ep.Endpoints { if !endpoint.IsLocal { continue @@ -88,28 +78,14 @@ func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints { // and reformat if the current disk is not formatted _, _, err := connectEndpoint(endpoint) if errors.Is(err, errUnformattedDisk) { - localDisksToHeal = append(localDisksToHeal, endpoint) + disksToHeal = append(disksToHeal, endpoint) } } - if len(localDisksToHeal) == 0 { - continue - } - localDisksInZoneHeal[i] = localDisksToHeal } - return localDisksInZoneHeal + return disksToHeal } -func getDrivesToHealCount(localDisksInZoneHeal []Endpoints) int { - var drivesToHeal int - for _, eps := range localDisksInZoneHeal { - for range eps { - drivesToHeal++ - } - } - return drivesToHeal -} - func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { // Run the background healer globalBackgroundHealRoutine = newHealRoutine() @@ -121,77 +97,65 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { // monitorLocalDisksAndHeal - ensures that detected new disks are healed // 1. Only the concerned erasure set will be listed and healed // 2. Only the node hosting the disk is responsible to perform the heal -func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, drivesToHeal int, localDisksInZoneHeal []Endpoints, bgSeq *healSequence) { +func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healSequence) { // Perform automatic disk healing when a disk is replaced locally. for { select { case <-ctx.Done(): return case <-time.After(defaultMonitorNewDiskInterval): - // heal only if new disks found. - if drivesToHeal == 0 { - localDisksInZoneHeal = getLocalDisksToHeal(z) - drivesToHeal = getDrivesToHealCount(localDisksInZoneHeal) - if drivesToHeal == 0 { - // No drives to heal. - globalBackgroundHealState.updateHealLocalDisks(nil) - continue - } - globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal) + waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second) + var erasureSetInZoneEndpointToHeal = make([]map[int]Endpoint, len(z.zones)) + for i := range z.zones { + erasureSetInZoneEndpointToHeal[i] = map[int]Endpoint{} + } + + healDisks := globalBackgroundHealState.getHealLocalDisks() + // heal only if new disks found. + for _, endpoint := range healDisks { logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...", - drivesToHeal)) + len(healDisks))) // Reformat disks bgSeq.sourceCh <- healSource{bucket: SlashSeparator} // Ensure that reformatting disks is finished bgSeq.sourceCh <- healSource{bucket: nopHeal} - } - var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal)) - // Compute the list of erasure set to heal - for i, localDisksToHeal := range localDisksInZoneHeal { - var erasureSetToHeal []int - for _, endpoint := range localDisksToHeal { - // Load the new format of this passed endpoint - _, format, err := connectEndpoint(endpoint) - if err != nil { - printEndpointError(endpoint, err, true) - continue - } - - // Calculate the set index where the current endpoint belongs - setIndex, _, err := findDiskIndex(z.zones[i].format, format) - if err != nil { - printEndpointError(endpoint, err, false) - continue - } - - erasureSetToHeal = append(erasureSetToHeal, setIndex) + // Load the new format of this passed endpoint + _, format, err := connectEndpoint(endpoint) + if err != nil { + printEndpointError(endpoint, err, true) + continue } - erasureSetInZoneToHeal[i] = erasureSetToHeal - } - logger.Info("New unformatted drives detected attempting to heal the content...") - for i, disks := range localDisksInZoneHeal { - for _, disk := range disks { - logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1)) + zoneIdx := globalEndpoints.GetLocalZoneIdx(endpoint) + if zoneIdx < 0 { + continue } + + // Calculate the set index where the current endpoint belongs + setIndex, _, err := findDiskIndex(z.zones[zoneIdx].format, format) + if err != nil { + printEndpointError(endpoint, err, false) + continue + } + + erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = endpoint } - // Heal all erasure sets that need - for i, erasureSetToHeal := range erasureSetInZoneToHeal { - for _, setIndex := range erasureSetToHeal { - err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount) - if err != nil { + for i, setMap := range erasureSetInZoneEndpointToHeal { + for setIndex, endpoint := range setMap { + logger.Info("Healing disk '%s' on %s zone", endpoint, humanize.Ordinal(i+1)) + + if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil { logger.LogIf(ctx, err) + continue } - // Only upon success reduce the counter - if err == nil { - drivesToHeal-- - } + // Only upon success pop the healed disk. + globalBackgroundHealState.popHealLocalDisks(endpoint) } } } diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 8c9a5ada1..9cc447b96 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -24,6 +24,7 @@ import ( "net/url" "path" "path/filepath" + "reflect" "runtime" "strconv" "strings" @@ -203,6 +204,21 @@ type ZoneEndpoints struct { // EndpointZones - list of list of endpoints type EndpointZones []ZoneEndpoints +// GetLocalZoneIdx returns the zone which endpoint belongs to locally. +// if ep is remote this code will return -1 zoneIndex +func (l EndpointZones) GetLocalZoneIdx(ep Endpoint) int { + for i, zep := range l { + for _, cep := range zep.Endpoints { + if cep.IsLocal && ep.IsLocal { + if reflect.DeepEqual(cep, ep) { + return i + } + } + } + } + return -1 +} + // Add add zone endpoints func (l *EndpointZones) Add(zeps ZoneEndpoints) error { existSet := set.NewStringSet() diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 0e78e9b25..c8865e320 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -137,13 +137,10 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) { format, err := loadFormatErasure(disk) if err != nil { - // Close the internal connection to avoid connection leaks. - disk.Close() if errors.Is(err, errUnformattedDisk) { info, derr := disk.DiskInfo(context.TODO()) if derr != nil && info.RootDisk { - return nil, nil, fmt.Errorf("Disk: %s returned %w but its a root disk refusing to use it", - disk, derr) // make sure to '%w' to wrap the error + return nil, nil, fmt.Errorf("Disk: %s returned %w", disk, derr) // make sure to '%w' to wrap the error } } return nil, nil, fmt.Errorf("Disk: %s returned %w", disk, err) // make sure to '%w' to wrap the error @@ -213,14 +210,22 @@ func (s *erasureSets) connectDisks() { defer wg.Done() disk, format, err := connectEndpoint(endpoint) if err != nil { - printEndpointError(endpoint, err, true) + if endpoint.IsLocal && errors.Is(err, errUnformattedDisk) { + logger.Info(fmt.Sprintf("Found unformatted drive %s, attempting to heal...", endpoint)) + globalBackgroundHealState.pushHealLocalDisks(endpoint) + } else { + printEndpointError(endpoint, err, true) + } return } setIndex, diskIndex, err := findDiskIndex(s.format, format) if err != nil { - // Close the internal connection to avoid connection leaks. - disk.Close() - printEndpointError(endpoint, err, false) + if endpoint.IsLocal { + globalBackgroundHealState.pushHealLocalDisks(endpoint) + logger.Info(fmt.Sprintf("Found inconsistent drive %s with format.json, attempting to heal...", endpoint)) + } else { + printEndpointError(endpoint, err, false) + } return } disk.SetDiskID(format.Erasure.This) @@ -291,7 +296,9 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI { } } -const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs. +// defaultMonitorConnectEndpointInterval is the interval to monitor endpoint connections. +// Must be bigger than defaultMonitorNewDiskInterval. +const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5 // Initialize new set of erasure coded sets. func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3) (*erasureSets, error) { @@ -342,12 +349,10 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto } diskID, derr := disk.GetDiskID() if derr != nil { - disk.Close() continue } m, n, err := findDiskIndexByDiskID(format, diskID) if err != nil { - disk.Close() continue } s.endpointStrings[m*setDriveCount+n] = disk.String() @@ -1218,13 +1223,11 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) diskID, err := disk.GetDiskID() if err != nil { - disk.Close() continue } m, n, err := findDiskIndexByDiskID(refFormat, diskID) if err != nil { - disk.Close() continue } @@ -1248,17 +1251,14 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) func isTestSetup(infos []DiskInfo, errs []error) bool { rootDiskCount := 0 for i := range errs { - if errs[i] != nil && errs[i] != errUnformattedDisk { - // On any error which is not unformatted disk - // it is safer to reject healing. - return false - } - if infos[i].RootDisk { - rootDiskCount++ + if errs[i] == nil || errs[i] == errUnformattedDisk { + if infos[i].RootDisk { + rootDiskCount++ + } } } - // It is a test setup if all disks are root disks. - return rootDiskCount == len(infos) + // It is a test setup if all disks are root disks in quorum. + return rootDiskCount >= len(infos)/2+1 } func getHealDiskInfos(storageDisks []StorageAPI, errs []error) ([]DiskInfo, []error) { @@ -1321,6 +1321,19 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H // Mark all root disks down markRootDisksAsDown(storageDisks, sErrs) + refFormat, err := getFormatErasureInQuorum(formats) + if err != nil { + return res, err + } + + for i, format := range formats { + if format != nil { + if ferr := formatErasureV3Check(refFormat, format); ferr != nil { + sErrs[i] = errUnformattedDisk + } + } + } + // Prepare heal-result res = madmin.HealResultItem{ Type: madmin.HealItemMetadata, @@ -1346,11 +1359,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H return res, errNoHealRequired } - refFormat, err := getFormatErasureInQuorum(formats) - if err != nil { - return res, err - } - // Mark all UUIDs which might be offline, use list // of formats to mark them appropriately. markUUIDsOffline(refFormat, formats) @@ -1424,13 +1432,11 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H diskID, err := disk.GetDiskID() if err != nil { - disk.Close() continue } m, n, err := findDiskIndexByDiskID(refFormat, diskID) if err != nil { - disk.Close() continue } diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 49e024320..05e818406 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -2056,6 +2056,25 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes writeQuorum++ } + var aggHealStateResult madmin.BgHealState + if opts.Maintenance { + // check if local disks are being healed, if they are being healed + // we need to tell healthy status as 'false' so that this server + // is not taken down for maintenance + var err error + aggHealStateResult, err = getAggregatedBackgroundHealState(ctx) + if err != nil { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err)) + return HealthResult{ + Healthy: false, + } + } + + if len(aggHealStateResult.HealDisks) > 0 { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Total drives to be healed %d", len(aggHealStateResult.HealDisks))) + } + } + for zoneIdx := range erasureSetUpCount { for setIdx := range erasureSetUpCount[zoneIdx] { if erasureSetUpCount[zoneIdx][setIdx] < writeQuorum { @@ -2063,10 +2082,11 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes fmt.Errorf("Write quorum may be lost on zone: %d, set: %d, expected write quorum: %d", zoneIdx, setIdx, writeQuorum)) return HealthResult{ - Healthy: false, - ZoneID: zoneIdx, - SetID: setIdx, - WriteQuorum: writeQuorum, + Healthy: false, + HealingDrives: len(aggHealStateResult.HealDisks), + ZoneID: zoneIdx, + SetID: setIdx, + WriteQuorum: writeQuorum, } } } @@ -2081,21 +2101,6 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes } } - // check if local disks are being healed, if they are being healed - // we need to tell healthy status as 'false' so that this server - // is not taken down for maintenance - aggHealStateResult, err := getAggregatedBackgroundHealState(ctx) - if err != nil { - logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err)) - return HealthResult{ - Healthy: false, - } - } - - if len(aggHealStateResult.HealDisks) > 0 { - logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Total drives to be healed %d", len(aggHealStateResult.HealDisks))) - } - return HealthResult{ Healthy: len(aggHealStateResult.HealDisks) == 0, HealingDrives: len(aggHealStateResult.HealDisks), diff --git a/cmd/global-heal.go b/cmd/global-heal.go index a0e04720b..29a7e6004 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -73,11 +73,14 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { return madmin.BgHealState{}, false } + objAPI := newObjectLayerWithoutSafeModeFn() + if objAPI == nil { + return madmin.BgHealState{}, false + } + var healDisks []string - for _, eps := range globalBackgroundHealState.getHealLocalDisks() { - for _, ep := range eps { - healDisks = append(healDisks, ep.String()) - } + for _, ep := range getLocalDisksToHeal() { + healDisks = append(healDisks, ep.String()) } return madmin.BgHealState{ diff --git a/cmd/healthcheck-handler.go b/cmd/healthcheck-handler.go index 795d1ee87..b99ed379e 100644 --- a/cmd/healthcheck-handler.go +++ b/cmd/healthcheck-handler.go @@ -43,7 +43,9 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { } if !result.Healthy { // return how many drives are being healed if any - w.Header().Set("X-Minio-Healing-Drives", strconv.Itoa(result.HealingDrives)) + if result.HealingDrives > 0 { + w.Header().Set("X-Minio-Healing-Drives", strconv.Itoa(result.HealingDrives)) + } // As a maintenance call we are purposefully asked to be taken // down, this is for orchestrators to know if we can safely // take this server down, return appropriate error.