Do regular checks for healing status while scanning (#19946)

Klaus Post, 2024-06-18 09:11:04 -07:00, committed by GitHub
parent eb990f64a9
commit 2f9018f03b
No known key found for this signature in database; GPG Key ID: B5690EEEBB952194
10 changed files with 214 additions and 659 deletions


@@ -464,7 +464,10 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	}
 
 	// Remove .healing.bin from all disks with similar heal-id
-	disks := z.serverPools[poolIdx].sets[setIdx].getDisks()
+	disks, err := z.GetDisks(poolIdx, setIdx)
+	if err != nil {
+		return err
+	}
 
 	for _, disk := range disks {
 		if disk == nil {


@@ -31,6 +31,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/minio/madmin-go/v3"
@@ -250,6 +251,7 @@ type folderScanner struct {
 	scanMode madmin.HealScanMode
 
 	weSleep    func() bool
+	shouldHeal func() bool
 
 	disks       []StorageAPI
 	disksQuorum int
@@ -304,11 +306,12 @@ type folderScanner struct {
 // The returned cache will always be valid, but may not be updated from the existing.
 // Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
 // If the supplied context is canceled the function will return at the first chance.
-func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, healing bool, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
+func scanDataFolder(ctx context.Context, disks []StorageAPI, drive *xlStorage, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
 	switch cache.Info.Name {
 	case "", dataUsageRoot:
 		return cache, errors.New("internal error: root scan attempted")
 	}
 
+	basePath := drive.drivePath
 	updatePath, closeDisk := globalScannerMetrics.currentPathUpdater(basePath, cache.Info.Name)
 	defer closeDisk()
@@ -319,7 +322,7 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, he
 		newCache:              dataUsageCache{Info: cache.Info},
 		updateCache:           dataUsageCache{Info: cache.Info},
 		dataUsageScannerDebug: false,
-		healObjectSelect:      healObjectSelectProb,
+		healObjectSelect:      0,
 		scanMode:              scanMode,
 		weSleep:               weSleep,
 		updates:               cache.Info.updates,
@@ -328,6 +331,32 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, he
 		disksQuorum: len(disks) / 2,
 	}
 
+	var skipHeal atomic.Bool
+	if !globalIsErasure || cache.Info.SkipHealing {
+		skipHeal.Store(true)
+	}
+
+	// Check if we should do healing at all.
+	s.shouldHeal = func() bool {
+		if skipHeal.Load() {
+			return false
+		}
+		if s.healObjectSelect == 0 {
+			return false
+		}
+		if di, _ := drive.DiskInfo(ctx, DiskInfoOptions{}); di.Healing {
+			skipHeal.Store(true)
+			return false
+		}
+		return true
+	}
+
+	// Enable healing in XL mode.
+	if globalIsErasure && !cache.Info.SkipHealing {
+		// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
+		s.healObjectSelect = healObjectSelectProb
+	}
+
 	done := ctx.Done()
 
 	// Read top level in bucket.
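The shouldHeal closure added above is the heart of this change: rather than computing a single healing flag when the scan starts, the scanner re-checks the drive's status on demand and latches off permanently the first time the drive reports itself as healing. A minimal standalone sketch of the same latch-and-probe pattern (newShouldHeal and probe are hypothetical names for illustration; in the commit the probe is the cached drive.DiskInfo call shown in the hunk):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // newShouldHeal returns a check that permanently disables healing the
    // first time probe reports the drive as healing. probe stands in for
    // the cached drive.DiskInfo(...).Healing lookup in the real code.
    func newShouldHeal(enabled bool, probe func() bool) func() bool {
        var skip atomic.Bool
        skip.Store(!enabled)
        return func() bool {
            if skip.Load() {
                return false
            }
            if probe() {
                skip.Store(true) // latch: never probe again once healing is seen
                return false
            }
            return true
        }
    }

    func main() {
        calls := 0
        probe := func() bool { calls++; return calls > 2 } // healing begins on the 3rd check
        shouldHeal := newShouldHeal(true, probe)
        for i := 0; i < 5; i++ {
            fmt.Println(shouldHeal()) // true, true, false, false, false
        }
        fmt.Println("probe calls:", calls) // 3 -- the latch stops further probes
    }

Because the latch is an atomic.Bool, the check is safe to call from the listing callbacks further down and short-circuits without any disk I/O once set.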
@@ -338,7 +367,7 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, he
 	}
 	root := dataUsageEntry{}
 	folder := cachedFolder{name: cache.Info.Name, objectHealProbDiv: 1}
-	err := s.scanFolder(ctx, folder, healing, &root)
+	err := s.scanFolder(ctx, folder, &root)
 	if err != nil {
 		// No useful information...
 		return cache, err
@@ -369,7 +398,7 @@ func (f *folderScanner) sendUpdate() {
 // Files found in the folders will be added to f.newCache.
 // If final is provided folders will be put into f.newFolders or f.existingFolders.
 // If final is not provided the folders found are returned from the function.
-func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, healing bool, into *dataUsageEntry) error {
+func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, into *dataUsageEntry) error {
 	done := ctx.Done()
 
 	scannerLogPrefix := color.Green("folder-scanner:")
@@ -476,14 +505,9 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, hea
 			replication: replicationCfg,
 		}
 
-		item.heal.enabled = thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure
+		item.heal.enabled = thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healObjectSelect/folder.objectHealProbDiv) && f.shouldHeal()
 		item.heal.bitrot = f.scanMode == madmin.HealDeepScan
-
-		// if the drive belongs to an erasure set
-		// that is already being healed, skip the
-		// healing attempt on this drive.
-		item.heal.enabled = item.heal.enabled && !healing
 
 		sz, err := f.getSize(item)
 		if err != nil && err != errIgnoreFileContrib {
 			wait() // wait to proceed to next entry.
@@ -565,7 +589,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, hea
 			if !into.Compacted {
 				dst = &dataUsageEntry{Compacted: false}
 			}
-			if err := f.scanFolder(ctx, folder, healing, dst); err != nil {
+			if err := f.scanFolder(ctx, folder, dst); err != nil {
 				return
 			}
 			if !into.Compacted {
@@ -646,8 +670,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, hea
 		}
 
 		// Scan for healing
-		if healing || len(abandonedChildren) == 0 {
-			// if disks are already healing or we have no abandoned childrens do not need to heal
+		if len(abandonedChildren) == 0 || !f.shouldHeal() {
+			// If we are not heal scanning, return now.
 			break
 		}
@@ -681,6 +705,9 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, hea
 
 		healObjectsPrefix := color.Green("healObjects:")
 		for k := range abandonedChildren {
+			if !f.shouldHeal() {
+				break
+			}
 			bucket, prefix := path2BucketObject(k)
 			stopFn := globalScannerMetrics.time(scannerMetricCheckMissing)
 			f.updateCurrentPath(k)
@@ -714,6 +741,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, hea
 			},
 			// Some disks have data for this.
 			partial: func(entries metaCacheEntries, errs []error) {
+				if !f.shouldHeal() {
+					cancel()
+					return
+				}
 				entry, ok := entries.resolve(&resolver)
 				if !ok {
 					// check if we can get one entry at least


@@ -352,6 +352,9 @@ type dataUsageCacheInfo struct {
 	Name       string
 	NextCycle  uint32
 	LastUpdate time.Time
+	// indicates if the disk is being healed and scanner
+	// should skip healing the disk
+	SkipHealing bool
 
 	// Active lifecycle, if any on the bucket
 	lifeCycle *lifecycle.Lifecycle `msg:"-"`

(One file's diff is suppressed because it is too large.)


@@ -26,6 +26,9 @@ import (
 	"path"
 	"path/filepath"
 	"testing"
+	"time"
+
+	"github.com/minio/minio/internal/cachevalue"
 )
 
 type usageTestFile struct {
@@ -61,10 +64,13 @@ func TestDataUsageUpdate(t *testing.T) {
 		}
 		return
 	}
 
+	xls := xlStorage{drivePath: base, diskInfoCache: cachevalue.New[DiskInfo]()}
+	xls.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{}, func(ctx context.Context) (DiskInfo, error) {
+		return DiskInfo{Total: 1 << 40, Free: 1 << 40}, nil
+	})
 	weSleep := func() bool { return false }
-
-	got, err := scanDataFolder(context.Background(), nil, base, false, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
+	got, err := scanDataFolder(context.Background(), nil, &xls, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
 	if err != nil {
 		t.Fatal(err)
 	}
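Since scanDataFolder now takes an *xlStorage and probes it for DiskInfo, the tests construct one with a pre-seeded diskInfoCache so shouldHeal sees a healthy drive without touching real hardware; InitOnce registers a loader behind a one-second TTL. A toy generic TTL-cached loader showing the behavior the test relies on (an illustrative stand-in, not the internal/cachevalue implementation):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // cached wraps a loader with a TTL: calls within the TTL return the
    // stored value, so repeated DiskInfo-style probes stay cheap.
    type cached[T any] struct {
        mu     sync.Mutex
        ttl    time.Duration
        loaded time.Time
        value  T
        loader func() (T, error)
    }

    func (c *cached[T]) Get() (T, error) {
        c.mu.Lock()
        defer c.mu.Unlock()
        if time.Since(c.loaded) < c.ttl {
            return c.value, nil // still fresh, skip the loader
        }
        v, err := c.loader()
        if err != nil {
            return c.value, err
        }
        c.value, c.loaded = v, time.Now()
        return v, nil
    }

    func main() {
        loads := 0
        c := &cached[string]{ttl: time.Second, loader: func() (string, error) {
            loads++
            return "disk-info", nil
        }}
        for i := 0; i < 3; i++ {
            v, _ := c.Get()
            fmt.Println(v)
        }
        fmt.Println("loader calls:", loads) // 1 -- later Gets are served from cache
    }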
@@ -174,7 +180,7 @@ func TestDataUsageUpdate(t *testing.T) {
 	}
 	// Changed dir must be picked up in this many cycles.
 	for i := 0; i < dataUsageUpdateDirCycles; i++ {
-		got, err = scanDataFolder(context.Background(), nil, base, false, got, getSize, 0, weSleep)
+		got, err = scanDataFolder(context.Background(), nil, &xls, got, getSize, 0, weSleep)
 		got.Info.NextCycle++
 		if err != nil {
 			t.Fatal(err)
@@ -283,8 +289,12 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
 	}
 
 	weSleep := func() bool { return false }
 
-	got, err := scanDataFolder(context.Background(), nil, base, false, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0, weSleep)
+	xls := xlStorage{drivePath: base, diskInfoCache: cachevalue.New[DiskInfo]()}
+	xls.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{}, func(ctx context.Context) (DiskInfo, error) {
+		return DiskInfo{Total: 1 << 40, Free: 1 << 40}, nil
+	})
+	got, err := scanDataFolder(context.Background(), nil, &xls, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0, weSleep)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -419,7 +429,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
 	}
 	// Changed dir must be picked up in this many cycles.
 	for i := 0; i < dataUsageUpdateDirCycles; i++ {
-		got, err = scanDataFolder(context.Background(), nil, base, false, got, getSize, 0, weSleep)
+		got, err = scanDataFolder(context.Background(), nil, &xls, got, getSize, 0, weSleep)
 		got.Info.NextCycle++
 		if err != nil {
 			t.Fatal(err)
@@ -567,8 +577,12 @@ func TestDataUsageCacheSerialize(t *testing.T) {
 		}
 		return
 	}
+	xls := xlStorage{drivePath: base, diskInfoCache: cachevalue.New[DiskInfo]()}
+	xls.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{}, func(ctx context.Context) (DiskInfo, error) {
+		return DiskInfo{Total: 1 << 40, Free: 1 << 40}, nil
+	})
 	weSleep := func() bool { return false }
-	want, err := scanDataFolder(context.Background(), nil, base, false, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
+	want, err := scanDataFolder(context.Background(), nil, &xls, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
 	if err != nil {
 		t.Fatal(err)
 	}


@@ -233,7 +233,7 @@ func TestListOnlineDisks(t *testing.T) {
 	data := bytes.Repeat([]byte("a"), smallFileThreshold*16)
 
 	z := obj.(*erasureServerPools)
-	erasureDisks, _, err := z.GetDisks(0, 0)
+	erasureDisks, err := z.GetDisks(0, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -409,7 +409,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 	data := bytes.Repeat([]byte("a"), smallFileThreshold/2)
 
 	z := obj.(*erasureServerPools)
-	erasureDisks, _, err := z.GetDisks(0, 0)
+	erasureDisks, err := z.GetDisks(0, 0)
 	if err != nil {
 		t.Fatal(err)
 	}


@@ -302,12 +302,11 @@ func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string
 }
 
 // Return the disks belonging to the poolIdx, and setIdx.
-func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, bool, error) {
+func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) {
 	if poolIdx < len(z.serverPools) && setIdx < len(z.serverPools[poolIdx].sets) {
-		disks, healing := z.serverPools[poolIdx].sets[setIdx].getOnlineDisksWithHealing(true)
-		return disks, healing, nil
+		return z.serverPools[poolIdx].sets[setIdx].getDisks(), nil
 	}
-	return nil, false, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
+	return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
 }
 
 // Return the count of disks in each pool
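The signature change here is also a semantic one: the old GetDisks asked the set for its online drives plus a set-wide healing flag via getOnlineDisksWithHealing(true), while the new one returns raw set membership and leaves healing detection to the scanner's per-drive shouldHeal probe. Side by side:

    // Before: online disks only, plus a set-wide "is healing" flag.
    GetDisks(poolIdx, setIdx int) ([]StorageAPI, bool, error)

    // After: raw set membership; healing is probed per drive, on demand.
    GetDisks(poolIdx, setIdx int) ([]StorageAPI, error)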


@@ -381,7 +381,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 	}
 
 	// Collect disks we can use.
-	disks, _ := er.getOnlineDisksWithHealing(false)
+	disks, healing := er.getOnlineDisksWithHealing(false)
 	if len(disks) == 0 {
 		scannerLogIf(ctx, errors.New("data-scanner: all drives are offline or being healed, skipping scanner cycle"))
 		return nil
@@ -497,6 +497,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 			if cache.Info.Name == "" {
 				cache.Info.Name = bucket.Name
 			}
+			cache.Info.SkipHealing = healing
 			cache.Info.NextCycle = wantCycle
 			if cache.Info.Name != bucket.Name {
 				cache.Info = dataUsageCacheInfo{
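With this hunk the per-cycle healing state travels with the cache itself: nsScanner records whether the erasure set had healing drives when the cycle started, and scanDataFolder folds that into its skipHeal latch (the SkipHealing check in the earlier data-scanner hunk). A condensed view of the flow as it appears in this commit, with surrounding code elided:

    // erasure.go: once per scanner cycle, per erasure set.
    disks, healing := er.getOnlineDisksWithHealing(false)
    // ...
    cache.Info.SkipHealing = healing

    // data-scanner.go: consumed when the folder scan begins.
    if !globalIsErasure || cache.Info.SkipHealing {
        skipHeal.Store(true)
    }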


@@ -287,7 +287,7 @@ type ObjectLayer interface {
 	AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
 	CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
 
-	GetDisks(poolIdx, setIdx int) ([]StorageAPI, bool, error) // return the disks belonging to pool and set.
+	GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) // return the disks belonging to pool and set.
 	SetDriveCounts() []int // list of erasure stripe size for each pool in order.
 
 	// Healing operations.


@@ -554,14 +554,14 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
 	poolIdx, setIdx, _ := s.GetDiskLoc()
 
-	disks, healing, err := objAPI.GetDisks(poolIdx, setIdx)
+	disks, err := objAPI.GetDisks(poolIdx, setIdx)
 	if err != nil {
 		return cache, err
 	}
 
 	cache.Info.updates = updates
 
-	dataUsageInfo, err := scanDataFolder(ctx, disks, s.drivePath, healing, cache, func(item scannerItem) (sizeSummary, error) {
+	dataUsageInfo, err := scanDataFolder(ctx, disks, s, cache, func(item scannerItem) (sizeSummary, error) {
 		// Look for `xl.meta/xl.json' at the leaf.
 		if !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFile) &&
 			!strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFileV1) {