Mirror of https://github.com/minio/minio.git
Synced 2025-02-23 03:22:30 -05:00
skip healing properly in the scanner when a drive is hotplugged (#19939)
Skip healing properly in the scanner when a drive is hotplugged. Because of how the state is passed around, SkipHealing might not always reflect the true state of the system, which can cause the scanner to start healing on the same drive that is already being healed. As a result, competing heals get triggered that slow each other down.
This commit is contained in:
parent 7bd1d899bc
commit bbb64eaade
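In short, the fix threads the live healing state of the erasure set into the scanner as an explicit argument instead of reading it back from the persisted cache's SkipHealing flag, which can go stale after a hotplug. A minimal sketch of the idea, using simplified stand-in types (driveSet, scanFolder and friends are illustrative only, not MinIO's actual API):

package main

import "fmt"

// driveSet is a stand-in for an erasure set; healingDrives counts drives that
// currently carry a healing marker (e.g. after a hotplug).
type driveSet struct {
	healingDrives int
}

// isHealing samples the live state of the set at call time.
func (d *driveSet) isHealing() bool { return d.healingDrives > 0 }

// scanFolder mirrors the shape of the change: the healing flag is passed in
// explicitly, and scanner-triggered healing is disabled while the set is
// already being healed (compare: item.heal.enabled = item.heal.enabled && !healing).
func scanFolder(healing bool, objects []string) {
	for _, obj := range objects {
		healNow := !healing
		fmt.Printf("object=%s scanned, heal-from-scanner=%v\n", obj, healNow)
	}
}

func main() {
	set := &driveSet{healingDrives: 1} // a freshly hotplugged drive is being healed
	// The flag is computed when the scan starts, not read from a stale cache.
	scanFolder(set.isHealing(), []string{"bucket/a.txt", "bucket/b.txt"})
}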
@@ -464,10 +464,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
     }

     // Remove .healing.bin from all disks with similar heal-id
-    disks, err := z.GetDisks(poolIdx, setIdx)
-    if err != nil {
-        return err
-    }
+    disks := z.serverPools[poolIdx].sets[setIdx].getDisks()

     for _, disk := range disks {
         if disk == nil {
@@ -304,7 +304,7 @@ type folderScanner struct {
 // The returned cache will always be valid, but may not be updated from the existing.
 // Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
 // If the supplied context is canceled the function will return at the first chance.
-func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
+func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, healing bool, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
     switch cache.Info.Name {
     case "", dataUsageRoot:
         return cache, errors.New("internal error: root scan attempted")
@@ -319,7 +319,7 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, ca
         newCache:              dataUsageCache{Info: cache.Info},
         updateCache:           dataUsageCache{Info: cache.Info},
         dataUsageScannerDebug: false,
-        healObjectSelect:      0,
+        healObjectSelect:      healObjectSelectProb,
         scanMode:              scanMode,
         weSleep:               weSleep,
         updates:               cache.Info.updates,
@@ -328,12 +328,6 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, ca
         disksQuorum:           len(disks) / 2,
     }

-    // Enable healing in XL mode.
-    if globalIsErasure && !cache.Info.SkipHealing {
-        // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
-        s.healObjectSelect = healObjectSelectProb
-    }
-
     done := ctx.Done()

     // Read top level in bucket.
@@ -344,7 +338,7 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, ca
     }
     root := dataUsageEntry{}
     folder := cachedFolder{name: cache.Info.Name, objectHealProbDiv: 1}
-    err := s.scanFolder(ctx, folder, &root)
+    err := s.scanFolder(ctx, folder, healing, &root)
     if err != nil {
         // No useful information...
         return cache, err
@@ -375,7 +369,7 @@ func (f *folderScanner) sendUpdate() {
 // Files found in the folders will be added to f.newCache.
 // If final is provided folders will be put into f.newFolders or f.existingFolders.
 // If final is not provided the folders found are returned from the function.
-func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, into *dataUsageEntry) error {
+func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, healing bool, into *dataUsageEntry) error {
     done := ctx.Done()
     scannerLogPrefix := color.Green("folder-scanner:")

@@ -488,7 +482,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
         // if the drive belongs to an erasure set
         // that is already being healed, skip the
         // healing attempt on this drive.
-        item.heal.enabled = item.heal.enabled && f.healObjectSelect > 0
+        item.heal.enabled = item.heal.enabled && !healing

         sz, err := f.getSize(item)
         if err != nil && err != errIgnoreFileContrib {
@@ -571,7 +565,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
         if !into.Compacted {
             dst = &dataUsageEntry{Compacted: false}
         }
-        if err := f.scanFolder(ctx, folder, dst); err != nil {
+        if err := f.scanFolder(ctx, folder, healing, dst); err != nil {
             return
         }
         if !into.Compacted {
@@ -652,8 +646,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
         }

-        // Scan for healing
-        if f.healObjectSelect == 0 || len(abandonedChildren) == 0 {
+        // If we are not heal scanning, return now.
+        if healing || len(abandonedChildren) == 0 {
             // if disks are already healing or we have no abandoned childrens do not need to heal
             break
         }

@@ -352,9 +352,6 @@ type dataUsageCacheInfo struct {
     Name        string
     NextCycle   uint32
     LastUpdate  time.Time
-    // indicates if the disk is being healed and scanner
-    // should skip healing the disk
-    SkipHealing bool

     // Active lifecycle, if any on the bucket
     lifeCycle *lifecycle.Lifecycle `msg:"-"`
File diff suppressed because it is too large
@@ -64,7 +64,7 @@ func TestDataUsageUpdate(t *testing.T) {

     weSleep := func() bool { return false }

-    got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
+    got, err := scanDataFolder(context.Background(), nil, base, false, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
     if err != nil {
         t.Fatal(err)
     }
@@ -174,7 +174,7 @@ func TestDataUsageUpdate(t *testing.T) {
     }
     // Changed dir must be picked up in this many cycles.
     for i := 0; i < dataUsageUpdateDirCycles; i++ {
-        got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0, weSleep)
+        got, err = scanDataFolder(context.Background(), nil, base, false, got, getSize, 0, weSleep)
         got.Info.NextCycle++
         if err != nil {
             t.Fatal(err)
@@ -284,7 +284,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {

     weSleep := func() bool { return false }

-    got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0, weSleep)
+    got, err := scanDataFolder(context.Background(), nil, base, false, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0, weSleep)
     if err != nil {
         t.Fatal(err)
     }
@@ -419,7 +419,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
     }
     // Changed dir must be picked up in this many cycles.
     for i := 0; i < dataUsageUpdateDirCycles; i++ {
-        got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0, weSleep)
+        got, err = scanDataFolder(context.Background(), nil, base, false, got, getSize, 0, weSleep)
         got.Info.NextCycle++
         if err != nil {
             t.Fatal(err)
@@ -568,7 +568,7 @@ func TestDataUsageCacheSerialize(t *testing.T) {
         return
     }
     weSleep := func() bool { return false }
-    want, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
+    want, err := scanDataFolder(context.Background(), nil, base, false, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
     if err != nil {
         t.Fatal(err)
     }
@@ -233,7 +233,7 @@ func TestListOnlineDisks(t *testing.T) {
     data := bytes.Repeat([]byte("a"), smallFileThreshold*16)
     z := obj.(*erasureServerPools)

-    erasureDisks, err := z.GetDisks(0, 0)
+    erasureDisks, _, err := z.GetDisks(0, 0)
     if err != nil {
         t.Fatal(err)
     }
@@ -409,7 +409,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
     data := bytes.Repeat([]byte("a"), smallFileThreshold/2)
     z := obj.(*erasureServerPools)

-    erasureDisks, err := z.GetDisks(0, 0)
+    erasureDisks, _, err := z.GetDisks(0, 0)
     if err != nil {
         t.Fatal(err)
     }
@@ -302,11 +302,12 @@ func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string
 }

 // Return the disks belonging to the poolIdx, and setIdx.
-func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) {
+func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, bool, error) {
     if poolIdx < len(z.serverPools) && setIdx < len(z.serverPools[poolIdx].sets) {
-        return z.serverPools[poolIdx].sets[setIdx].getDisks(), nil
+        disks, healing := z.serverPools[poolIdx].sets[setIdx].getOnlineDisksWithHealing(true)
+        return disks, healing, nil
     }
-    return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
+    return nil, false, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
 }

 // Return the count of disks in each pool
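The healing flag returned here comes from the set's getOnlineDisksWithHealing helper rather than from the raw disk slice. A rough sketch of what that kind of check looks like, written against a hypothetical Drive interface rather than MinIO's StorageAPI:

package example

// Drive is an illustrative stand-in for a storage drive; Healing would report
// whether a .healing.bin style marker is still present on the drive.
type Drive interface {
	IsOnline() bool
	Healing() bool
}

// onlineDisksWithHealing returns the online drives of a set and whether any of
// them is still being healed, which is what GetDisks now surfaces to callers.
func onlineDisksWithHealing(drives []Drive) (online []Drive, healing bool) {
	for _, d := range drives {
		if d == nil || !d.IsOnline() {
			continue
		}
		if d.Healing() {
			healing = true
		}
		online = append(online, d)
	}
	return online, healing
}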
@@ -381,7 +381,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
     }

     // Collect disks we can use.
-    disks, healing := er.getOnlineDisksWithHealing(false)
+    disks, _ := er.getOnlineDisksWithHealing(false)
     if len(disks) == 0 {
         scannerLogIf(ctx, errors.New("data-scanner: all drives are offline or being healed, skipping scanner cycle"))
         return nil
@@ -497,7 +497,6 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
         if cache.Info.Name == "" {
             cache.Info.Name = bucket.Name
         }
-        cache.Info.SkipHealing = healing
         cache.Info.NextCycle = wantCycle
         if cache.Info.Name != bucket.Name {
             cache.Info = dataUsageCacheInfo{
@@ -287,7 +287,7 @@ type ObjectLayer interface {
     AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
     CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)

-    GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) // return the disks belonging to pool and set.
+    GetDisks(poolIdx, setIdx int) ([]StorageAPI, bool, error) // return the disks belonging to pool and set.
     SetDriveCounts() []int // list of erasure stripe size for each pool in order.

     // Healing operations.
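Any out-of-tree ObjectLayer implementation has to adopt the widened method as well. A hypothetical stub, only to show the shape of the change (storageAPI and singleDiskLayer are placeholders, not MinIO types):

package example

// storageAPI stands in for MinIO's much larger StorageAPI interface.
type storageAPI interface {
	String() string
}

// singleDiskLayer is a hypothetical ObjectLayer backed by one drive.
type singleDiskLayer struct {
	disk storageAPI
}

// GetDisks matches the widened signature: it now also reports whether the
// addressed erasure set is currently healing (never the case here).
func (s *singleDiskLayer) GetDisks(poolIdx, setIdx int) ([]storageAPI, bool, error) {
	return []storageAPI{s.disk}, false, nil
}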
@@ -554,14 +554,14 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates

     poolIdx, setIdx, _ := s.GetDiskLoc()

-    disks, err := objAPI.GetDisks(poolIdx, setIdx)
+    disks, healing, err := objAPI.GetDisks(poolIdx, setIdx)
     if err != nil {
         return cache, err
     }

     cache.Info.updates = updates

-    dataUsageInfo, err := scanDataFolder(ctx, disks, s.drivePath, cache, func(item scannerItem) (sizeSummary, error) {
+    dataUsageInfo, err := scanDataFolder(ctx, disks, s.drivePath, healing, cache, func(item scannerItem) (sizeSummary, error) {
         // Look for `xl.meta/xl.json' at the leaf.
         if !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFile) &&
             !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFileV1) {