diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index eef0ee8f5..04f346cc4 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -91,8 +91,11 @@ type allHealState struct {
 	sync.RWMutex
 
 	// map of heal path to heal sequence
-	healSeqMap     map[string]*healSequence // Indexed by endpoint
-	healLocalDisks map[Endpoint]struct{}
+	healSeqMap map[string]*healSequence // Indexed by endpoint
+	// keep track of the healing status of disks in memory
+	//   false: the disk needs to be healed but no healing routine has started yet
+	//   true: the disk is currently being healed
+	healLocalDisks map[Endpoint]bool
 	healStatus     map[string]healingTracker // Indexed by disk ID
 }
 
@@ -100,7 +103,7 @@ type allHealState struct {
 func newHealState(cleanup bool) *allHealState {
 	hstate := &allHealState{
 		healSeqMap:     make(map[string]*healSequence),
-		healLocalDisks: map[Endpoint]struct{}{},
+		healLocalDisks: make(map[Endpoint]bool),
 		healStatus:     make(map[string]healingTracker),
 	}
 	if cleanup {
@@ -109,13 +112,6 @@ func newHealState(cleanup bool) *allHealState {
 	return hstate
 }
 
-func (ahs *allHealState) healDriveCount() int {
-	ahs.RLock()
-	defer ahs.RUnlock()
-
-	return len(ahs.healLocalDisks)
-}
-
 func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {
 	ahs.Lock()
 	defer ahs.Unlock()
@@ -165,23 +161,34 @@ func (ahs *allHealState) getLocalHealingDisks() map[string]madmin.HealingDisk {
 	return dst
 }
 
+// getHealLocalDiskEndpoints() returns the list of disks that need
+// to be healed but have no healing routine in progress on them.
 func (ahs *allHealState) getHealLocalDiskEndpoints() Endpoints {
 	ahs.RLock()
 	defer ahs.RUnlock()
 
 	var endpoints Endpoints
-	for ep := range ahs.healLocalDisks {
-		endpoints = append(endpoints, ep)
+	for ep, healing := range ahs.healLocalDisks {
+		if !healing {
+			endpoints = append(endpoints, ep)
+		}
 	}
 	return endpoints
 }
 
+func (ahs *allHealState) markDiskForHealing(ep Endpoint) {
+	ahs.Lock()
+	defer ahs.Unlock()
+
+	ahs.healLocalDisks[ep] = true
+}
+
 func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) {
 	ahs.Lock()
 	defer ahs.Unlock()
 
 	for _, ep := range healLocalDisks {
-		ahs.healLocalDisks[ep] = struct{}{}
+		ahs.healLocalDisks[ep] = false
 	}
 }
 
@@ -804,16 +811,6 @@ func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) f
 	}
 }
 
-// healDiskFormat - heals format.json, return value indicates if a
-// failure error occurred.
-func (h *healSequence) healDiskFormat() error {
-	if h.isQuitting() {
-		return errHealStopSignalled
-	}
-
-	return h.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata)
-}
-
 // healBuckets - check for all buckets heal or just particular bucket.
 func (h *healSequence) healBuckets(objAPI ObjectLayer, bucketsOnly bool) error {
 	if h.isQuitting() {
diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go
index 590bf3d75..4758c9727 100644
--- a/cmd/background-newdisks-heal-ops.go
+++ b/cmd/background-newdisks-heal-ops.go
@@ -26,15 +26,12 @@ import (
 	"os"
 	"sort"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/dustin/go-humanize"
 	"github.com/minio/madmin-go"
 	"github.com/minio/minio-go/v7/pkg/set"
-	"github.com/minio/minio/internal/color"
 	"github.com/minio/minio/internal/logger"
-	"github.com/minio/pkg/console"
 )
 
 const (
@@ -258,26 +255,9 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
 
 	initBackgroundHealing(ctx, objAPI) // start quick background healing
 
-	bgSeq := mustGetHealSequence(ctx)
-	globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...)
-	if drivesToHeal := globalBackgroundHealState.healDriveCount(); drivesToHeal > 0 {
-		logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content - use 'mc admin heal alias/ --verbose' to check the status",
-			drivesToHeal, defaultMonitorNewDiskInterval))
-
-		// Heal any disk format and metadata early, if possible.
-		// Start with format healing
-		if err := bgSeq.healDiskFormat(); err != nil {
-			if newObjectLayerFn() != nil {
-				// log only in situations, when object layer
-				// has fully initialized.
-				logger.LogIf(bgSeq.ctx, err)
-			}
-		}
-	}
-
-	go monitorLocalDisksAndHeal(ctx, z, bgSeq)
+	go monitorLocalDisksAndHeal(ctx, z)
 }
 
 func getLocalDisksToHeal() (disksToHeal Endpoints) {
@@ -299,10 +279,108 @@
 	return disksToHeal
 }
 
+var newDiskHealingTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
+
+func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint) error {
+	logger.Info(fmt.Sprintf("Proceeding to heal '%s' - 'mc admin heal alias/ --verbose' to check the status.", endpoint))
+
+	disk, format, err := connectEndpoint(endpoint)
+	if err != nil {
+		return fmt.Errorf("Error: %w, %s", err, endpoint)
+	}
+
+	poolIdx := globalEndpoints.GetLocalPoolIdx(disk.Endpoint())
+	if poolIdx < 0 {
+		return fmt.Errorf("unexpected pool index (%d) found in %s", poolIdx, disk.Endpoint())
+	}
+
+	// Calculate the set index where the current endpoint belongs
+	z.serverPools[poolIdx].erasureDisksMu.RLock()
+	setIdx, _, err := findDiskIndex(z.serverPools[poolIdx].format, format)
+	z.serverPools[poolIdx].erasureDisksMu.RUnlock()
+	if err != nil {
+		return err
+	}
+	if setIdx < 0 {
+		return fmt.Errorf("unexpected set index (%d) found in %s", setIdx, disk.Endpoint())
+	}
+
+	// Prevent parallel erasure set healing
+	locker := z.NewNSLock(minioMetaBucket, fmt.Sprintf("new-disk-healing/%s/%d/%d", endpoint, poolIdx, setIdx))
+	lkctx, err := locker.GetLock(ctx, newDiskHealingTimeout)
+	if err != nil {
+		return err
+	}
+	ctx = lkctx.Context()
+	defer locker.Unlock(lkctx.Cancel)
+
+	buckets, _ := z.ListBuckets(ctx)
+
+	// Bucket data is dispersed across multiple zones/sets, so make
+	// sure to heal all bucket metadata configuration.
+	buckets = append(buckets, BucketInfo{
+		Name: pathJoin(minioMetaBucket, minioConfigPrefix),
+	}, BucketInfo{
+		Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
+	})
+
+	// Heal latest buckets first.
+	sort.Slice(buckets, func(i, j int) bool {
+		a, b := strings.HasPrefix(buckets[i].Name, minioMetaBucket), strings.HasPrefix(buckets[j].Name, minioMetaBucket)
+		if a != b {
+			return a
+		}
+		return buckets[i].Created.After(buckets[j].Created)
+	})
+
+	if serverDebugLog {
+		logger.Info("Healing disk '%v' on %s pool", disk, humanize.Ordinal(poolIdx+1))
+	}
+
+	// Load the healing tracker on this disk
+	tracker, err := loadHealingTracker(ctx, disk)
+	if err != nil {
+		// So someone changed the drives underneath, healing tracker missing.
+		logger.LogIf(ctx, fmt.Errorf("Healing tracker missing on '%s', disk was swapped again on %s pool: %w",
+			disk, humanize.Ordinal(poolIdx+1), err))
+		tracker = newHealingTracker(disk)
+	}
+
+	// Load bucket totals
+	cache := dataUsageCache{}
+	if err := cache.load(ctx, z.serverPools[poolIdx].sets[setIdx], dataUsageCacheName); err == nil {
+		dataUsageInfo := cache.dui(dataUsageRoot, nil)
+		tracker.ObjectsTotalCount = dataUsageInfo.ObjectsTotalCount
+		tracker.ObjectsTotalSize = dataUsageInfo.ObjectsTotalSize
+	}
+
+	tracker.PoolIndex, tracker.SetIndex, tracker.DiskIndex = disk.GetDiskLoc()
+	tracker.setQueuedBuckets(buckets)
+	if err := tracker.save(ctx); err != nil {
+		return err
+	}
+
+	// Start or resume healing of this erasure set
+	err = z.serverPools[poolIdx].sets[setIdx].healErasureSet(ctx, tracker.QueuedBuckets, tracker)
+	if err != nil {
+		return err
+	}
+
+	logger.Info("Healing disk '%s' is complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
+
+	if serverDebugLog {
+		tracker.printTo(os.Stdout)
+		logger.Info("\n")
+	}
+
+	logger.LogIf(ctx, tracker.delete(ctx))
+	return nil
+}
+
 // monitorLocalDisksAndHeal - ensures that detected new disks are healed
 // 1. Only the concerned erasure set will be listed and healed
 // 2. Only the node hosting the disk is responsible to perform the heal
-func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq *healSequence) {
+func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools) {
 	// Perform automatic disk healing when a disk is replaced locally.
 	diskCheckTimer := time.NewTimer(defaultMonitorNewDiskInterval)
 	defer diskCheckTimer.Stop()
@@ -312,139 +390,35 @@
 		case <-ctx.Done():
 			return
 		case <-diskCheckTimer.C:
-			var erasureSetInPoolDisksToHeal []map[int][]StorageAPI
 			healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints()
-			if len(healDisks) > 0 {
-				// Reformat disks
-				bgSeq.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata)
-
-				// Ensure that reformatting disks is finished
-				bgSeq.queueHealTask(healSource{bucket: nopHeal}, madmin.HealItemMetadata)
-
-				logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal - 'mc admin heal alias/ --verbose' to check the status.",
-					len(healDisks)))
-
-				erasureSetInPoolDisksToHeal = make([]map[int][]StorageAPI, len(z.serverPools))
-				for i := range z.serverPools {
-					erasureSetInPoolDisksToHeal[i] = map[int][]StorageAPI{}
-				}
+			if len(healDisks) == 0 {
+				// Reset for next interval.
+				diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
+				break
 			}
 
-			if serverDebugLog && len(healDisks) > 0 {
-				console.Debugf(color.Green("healDisk:")+" disk check timer fired, attempting to heal %d drives\n", len(healDisks))
+			// Reformat disks immediately
+			_, err := z.HealFormat(context.Background(), false)
+			if err != nil && !errors.Is(err, errNoHealRequired) {
+				logger.LogIf(ctx, err)
+				// Reset for next interval.
+				diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
+				break
 			}
 
-			// heal only if new disks found.
-			for _, endpoint := range healDisks {
-				disk, format, err := connectEndpoint(endpoint)
-				if err != nil {
-					printEndpointError(endpoint, err, true)
-					continue
-				}
-
-				poolIdx := globalEndpoints.GetLocalPoolIdx(disk.Endpoint())
-				if poolIdx < 0 {
-					continue
-				}
-
-				// Calculate the set index where the current endpoint belongs
-				z.serverPools[poolIdx].erasureDisksMu.RLock()
-				// Protect reading reference format.
-				setIndex, _, err := findDiskIndex(z.serverPools[poolIdx].format, format)
-				z.serverPools[poolIdx].erasureDisksMu.RUnlock()
-				if err != nil {
-					printEndpointError(endpoint, err, false)
-					continue
-				}
-
-				erasureSetInPoolDisksToHeal[poolIdx][setIndex] = append(erasureSetInPoolDisksToHeal[poolIdx][setIndex], disk)
-			}
-
-			buckets, _ := z.ListBuckets(ctx)
-
-			// Buckets data are dispersed in multiple zones/sets, make
-			// sure to heal all bucket metadata configuration.
-			buckets = append(buckets, BucketInfo{
-				Name: pathJoin(minioMetaBucket, minioConfigPrefix),
-			}, BucketInfo{
-				Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
-			})
-
-			// Heal latest buckets first.
-			sort.Slice(buckets, func(i, j int) bool {
-				a, b := strings.HasPrefix(buckets[i].Name, minioMetaBucket), strings.HasPrefix(buckets[j].Name, minioMetaBucket)
-				if a != b {
-					return a
-				}
-				return buckets[i].Created.After(buckets[j].Created)
-			})
-
-			// TODO(klauspost): This will block until all heals are done,
-			// in the future this should be able to start healing other sets at once.
-			var wg sync.WaitGroup
-			for i, setMap := range erasureSetInPoolDisksToHeal {
-				i := i
-				for setIndex, disks := range setMap {
-					if len(disks) == 0 {
-						continue
+			for _, disk := range healDisks {
+				go func(disk Endpoint) {
+					globalBackgroundHealState.markDiskForHealing(disk)
+					err := healFreshDisk(ctx, z, disk)
+					if err != nil {
+						printEndpointError(disk, err, false)
+						return
 					}
-					wg.Add(1)
-					go func(setIndex int, disks []StorageAPI) {
-						defer wg.Done()
-						for _, disk := range disks {
-							if serverDebugLog {
-								logger.Info("Healing disk '%v' on %s pool", disk, humanize.Ordinal(i+1))
-							}
-
-							// So someone changed the drives underneath, healing tracker missing.
-							tracker, err := loadHealingTracker(ctx, disk)
-							if err != nil {
-								logger.LogIf(ctx, fmt.Errorf("Healing tracker missing on '%s', disk was swapped again on %s pool: %w",
-									disk, humanize.Ordinal(i+1), err))
-								tracker = newHealingTracker(disk)
-							}
-
-							// Load bucket totals
-							cache := dataUsageCache{}
-							if err := cache.load(ctx, z.serverPools[i].sets[setIndex], dataUsageCacheName); err == nil {
-								dataUsageInfo := cache.dui(dataUsageRoot, nil)
-								tracker.ObjectsTotalCount = dataUsageInfo.ObjectsTotalCount
-								tracker.ObjectsTotalSize = dataUsageInfo.ObjectsTotalSize
-							}
-
-							tracker.PoolIndex, tracker.SetIndex, tracker.DiskIndex = disk.GetDiskLoc()
-							tracker.setQueuedBuckets(buckets)
-							if err := tracker.save(ctx); err != nil {
-								logger.LogIf(ctx, err)
-								// Unable to write healing tracker, permission denied or some
-								// other unexpected error occurred. Proceed to look for new
-								// disks to be healed again, we cannot proceed further.
-								return
-							}
-
-							err = z.serverPools[i].sets[setIndex].healErasureSet(ctx, tracker.QueuedBuckets, tracker)
-							if err != nil {
-								logger.LogIf(ctx, err)
-								continue
-							}
-
-							if serverDebugLog {
-								logger.Info("Healing disk '%s' on %s pool, %s set complete", disk,
-									humanize.Ordinal(i+1), humanize.Ordinal(setIndex+1))
-								logger.Info("Summary:\n")
-								tracker.printTo(os.Stdout)
-								logger.Info("\n")
-							}
-							logger.LogIf(ctx, tracker.delete(ctx))
-
-							// Only upon success pop the healed disk.
-							globalBackgroundHealState.popHealLocalDisks(disk.Endpoint())
-						}
-					}(setIndex, disks)
-				}
+					// Only upon success pop the healed disk.
+					globalBackgroundHealState.popHealLocalDisks(disk)
+				}(disk)
 			}
-			wg.Wait()
 
 			// Reset for next interval.
 			diskCheckTimer.Reset(defaultMonitorNewDiskInterval)
diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go
index 47f7bb319..423ce19fc 100644
--- a/cmd/erasure-metadata-utils.go
+++ b/cmd/erasure-metadata-utils.go
@@ -151,6 +151,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
 			errVolumeNotFound,
 			errFileVersionNotFound,
 			errDiskNotFound,
+			errUnformattedDisk,
 		}...) {
 			logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
 				disks[index], bucket, object, err),
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 397b9af84..3da2f33cd 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -171,6 +171,16 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 	healBuckets := make([]string, len(buckets))
 	copy(healBuckets, buckets)
 
+	// Heal all buckets first in this erasure set - this is useful
+	// so that new object uploads to different buckets succeed
+	for _, bucket := range healBuckets {
+		_, err := er.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
+		if err != nil {
+			// Log any bucket healing error, we shall retry again.
+			logger.LogIf(ctx, err)
+		}
+	}
+
 	var retErr error
 	// Heal all buckets with all objects
 	for _, bucket := range healBuckets {
@@ -189,7 +199,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 		}
 		tracker.Object = ""
 		tracker.Bucket = bucket
-		// Heal current bucket
+		// Heal the current bucket again in case it failed
+		// at the beginning of erasure set healing
 		if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
 			ScanMode: scanMode,
 		}); err != nil {
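For reviewers skimming the first file: healLocalDisks changes from a set (map[Endpoint]struct{}) to a status map (map[Endpoint]bool), where false means "needs healing, no routine started" and true means "healing in progress". Below is a minimal, self-contained Go sketch of that bookkeeping, using hypothetical names (diskHealState, push, pending, mark, pop) rather than MinIO's actual types.

package main

import (
	"fmt"
	"sync"
)

// diskHealState is a simplified stand-in for allHealState.healLocalDisks:
// false = the disk needs healing but no routine has picked it up yet,
// true  = a healing routine is currently working on the disk.
type diskHealState struct {
	mu    sync.RWMutex
	disks map[string]bool // keyed by an endpoint string for simplicity
}

func newDiskHealState() *diskHealState {
	return &diskHealState{disks: make(map[string]bool)}
}

// push registers newly detected fresh disks as pending (false),
// mirroring pushHealLocalDisks.
func (s *diskHealState) push(endpoints ...string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ep := range endpoints {
		s.disks[ep] = false
	}
}

// pending returns only the disks with no healing routine yet,
// mirroring getHealLocalDiskEndpoints.
func (s *diskHealState) pending() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var eps []string
	for ep, healing := range s.disks {
		if !healing {
			eps = append(eps, ep)
		}
	}
	return eps
}

// mark flags a disk as having an active healing routine,
// mirroring markDiskForHealing.
func (s *diskHealState) mark(ep string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.disks[ep] = true
}

// pop drops a disk once healing finished successfully,
// mirroring popHealLocalDisks.
func (s *diskHealState) pop(ep string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.disks, ep)
}

func main() {
	s := newDiskHealState()
	s.push("http://node1/disk1", "http://node1/disk2")
	for _, ep := range s.pending() {
		s.mark(ep) // a goroutine would run healFreshDisk for ep here
	}
	fmt.Println(len(s.pending())) // 0: both disks are now marked as healing
	s.pop("http://node1/disk1")   // healing of disk1 succeeded
	fmt.Println(len(s.disks))     // 1: disk2 is still being healed
}

Keeping the flag in the same map, instead of a second set, is what lets getHealLocalDiskEndpoints skip disks that already have a goroutine, so monitorLocalDisksAndHeal can start one healFreshDisk goroutine per disk without double-scheduling, and popHealLocalDisks still removes an entry only after a successful heal.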