mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
fix: crawler to skip healing the drives in a set being healed (#11274)
If an erasure set had a drive replacement recently, we don't need to attempt healing on another drive with in the same erasure set - this would ensure we do not double heal the same content and also prioritizes usage for such an erasure set to be calculated sooner.
This commit is contained in:
parent
e8ce348da1
commit
e0055609bb
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/color"
|
||||
"github.com/minio/minio/pkg/console"
|
||||
)
|
||||
|
||||
@ -142,7 +143,7 @@ wait:
|
||||
}
|
||||
|
||||
if serverDebugLog {
|
||||
console.Debugf("disk check timer fired, attempting to heal %d drives\n", len(healDisks))
|
||||
console.Debugf(color.Green("healDisk:")+" disk check timer fired, attempting to heal %d drives\n", len(healDisks))
|
||||
}
|
||||
|
||||
// heal only if new disks found.
|
||||
|
@ -182,6 +182,8 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
return cache, errors.New("internal error: root scan attempted")
|
||||
}
|
||||
|
||||
skipHeal := cache.Info.SkipHealing
|
||||
|
||||
s := folderScanner{
|
||||
root: basePath,
|
||||
getSize: getSize,
|
||||
@ -244,7 +246,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
default:
|
||||
}
|
||||
var err error
|
||||
todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1)
|
||||
todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1, skipHeal)
|
||||
if err != nil {
|
||||
// No useful information...
|
||||
return cache, err
|
||||
@ -262,7 +264,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
return s.newCache, ctx.Err()
|
||||
default:
|
||||
}
|
||||
du, err := s.deepScanFolder(ctx, folder)
|
||||
du, err := s.deepScanFolder(ctx, folder, skipHeal)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
@ -324,7 +326,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
}
|
||||
|
||||
// Update on this cycle...
|
||||
du, err := s.deepScanFolder(ctx, folder)
|
||||
du, err := s.deepScanFolder(ctx, folder, skipHeal)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
@ -347,7 +349,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
// Files found in the folders will be added to f.newCache.
|
||||
// If final is provided folders will be put into f.newFolders or f.existingFolders.
|
||||
// If final is not provided the folders found are returned from the function.
|
||||
func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) {
|
||||
func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool, skipHeal bool) ([]cachedFolder, error) {
|
||||
var nextFolders []cachedFolder
|
||||
done := ctx.Done()
|
||||
scannerLogPrefix := color.Green("folder-scanner:")
|
||||
@ -455,6 +457,11 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
heal: thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
|
||||
}
|
||||
|
||||
// if the drive belongs to an erasure set
|
||||
// that is already being healed, skip the
|
||||
// healing attempt on this drive.
|
||||
item.heal = item.heal && !skipHeal
|
||||
|
||||
sizeSummary, err := f.getSize(item)
|
||||
if err == errSkipFile {
|
||||
wait() // wait to proceed to next entry.
|
||||
@ -659,7 +666,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
}
|
||||
|
||||
// deepScanFolder will deep scan a folder and return the size if no error occurs.
|
||||
func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) (*dataUsageEntry, error) {
|
||||
func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder, skipHeal bool) (*dataUsageEntry, error) {
|
||||
var cache dataUsageEntry
|
||||
|
||||
done := ctx.Done()
|
||||
@ -700,18 +707,23 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder)
|
||||
activeLifeCycle = f.oldCache.Info.lifeCycle
|
||||
}
|
||||
|
||||
sizeSummary, err := f.getSize(
|
||||
crawlItem{
|
||||
Path: fileName,
|
||||
Typ: typ,
|
||||
bucket: bucket,
|
||||
prefix: path.Dir(prefix),
|
||||
objectName: path.Base(entName),
|
||||
debug: f.dataUsageCrawlDebug,
|
||||
lifeCycle: activeLifeCycle,
|
||||
heal: hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
|
||||
})
|
||||
item := crawlItem{
|
||||
Path: fileName,
|
||||
Typ: typ,
|
||||
bucket: bucket,
|
||||
prefix: path.Dir(prefix),
|
||||
objectName: path.Base(entName),
|
||||
debug: f.dataUsageCrawlDebug,
|
||||
lifeCycle: activeLifeCycle,
|
||||
heal: hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
|
||||
}
|
||||
|
||||
// if the drive belongs to an erasure set
|
||||
// that is already being healed, skip the
|
||||
// healing attempt on this drive.
|
||||
item.heal = item.heal && !skipHeal
|
||||
|
||||
sizeSummary, err := f.getSize(item)
|
||||
if err == errSkipFile {
|
||||
// Wait to throttle IO
|
||||
wait()
|
||||
|
@ -88,9 +88,12 @@ type dataUsageEntryInfo struct {
|
||||
|
||||
type dataUsageCacheInfo struct {
|
||||
// Name of the bucket. Also root element.
|
||||
Name string
|
||||
LastUpdate time.Time
|
||||
NextCycle uint32
|
||||
Name string
|
||||
LastUpdate time.Time
|
||||
NextCycle uint32
|
||||
// indicates if the disk is being healed and crawler
|
||||
// should skip healing the disk
|
||||
SkipHealing bool
|
||||
BloomFilter []byte `msg:"BloomFilter,omitempty"`
|
||||
lifeCycle *lifecycle.Lifecycle `msg:"-"`
|
||||
}
|
||||
|
@ -313,6 +313,12 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err, "NextCycle")
|
||||
return
|
||||
}
|
||||
case "SkipHealing":
|
||||
z.SkipHealing, err = dc.ReadBool()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "SkipHealing")
|
||||
return
|
||||
}
|
||||
case "BloomFilter":
|
||||
z.BloomFilter, err = dc.ReadBytes(z.BloomFilter)
|
||||
if err != nil {
|
||||
@ -333,11 +339,11 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// omitempty: check for empty values
|
||||
zb0001Len := uint32(4)
|
||||
var zb0001Mask uint8 /* 4 bits */
|
||||
zb0001Len := uint32(5)
|
||||
var zb0001Mask uint8 /* 5 bits */
|
||||
if z.BloomFilter == nil {
|
||||
zb0001Len--
|
||||
zb0001Mask |= 0x8
|
||||
zb0001Mask |= 0x10
|
||||
}
|
||||
// variable map header, size zb0001Len
|
||||
err = en.Append(0x80 | uint8(zb0001Len))
|
||||
@ -377,7 +383,17 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "NextCycle")
|
||||
return
|
||||
}
|
||||
if (zb0001Mask & 0x8) == 0 { // if not empty
|
||||
// write "SkipHealing"
|
||||
err = en.Append(0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteBool(z.SkipHealing)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "SkipHealing")
|
||||
return
|
||||
}
|
||||
if (zb0001Mask & 0x10) == 0 { // if not empty
|
||||
// write "BloomFilter"
|
||||
err = en.Append(0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
|
||||
if err != nil {
|
||||
@ -396,11 +412,11 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// omitempty: check for empty values
|
||||
zb0001Len := uint32(4)
|
||||
var zb0001Mask uint8 /* 4 bits */
|
||||
zb0001Len := uint32(5)
|
||||
var zb0001Mask uint8 /* 5 bits */
|
||||
if z.BloomFilter == nil {
|
||||
zb0001Len--
|
||||
zb0001Mask |= 0x8
|
||||
zb0001Mask |= 0x10
|
||||
}
|
||||
// variable map header, size zb0001Len
|
||||
o = append(o, 0x80|uint8(zb0001Len))
|
||||
@ -416,7 +432,10 @@ func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
// string "NextCycle"
|
||||
o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65)
|
||||
o = msgp.AppendUint32(o, z.NextCycle)
|
||||
if (zb0001Mask & 0x8) == 0 { // if not empty
|
||||
// string "SkipHealing"
|
||||
o = append(o, 0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
|
||||
o = msgp.AppendBool(o, z.SkipHealing)
|
||||
if (zb0001Mask & 0x10) == 0 { // if not empty
|
||||
// string "BloomFilter"
|
||||
o = append(o, 0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
|
||||
o = msgp.AppendBytes(o, z.BloomFilter)
|
||||
@ -460,6 +479,12 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "NextCycle")
|
||||
return
|
||||
}
|
||||
case "SkipHealing":
|
||||
z.SkipHealing, bts, err = msgp.ReadBoolBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "SkipHealing")
|
||||
return
|
||||
}
|
||||
case "BloomFilter":
|
||||
z.BloomFilter, bts, err = msgp.ReadBytesBytes(bts, z.BloomFilter)
|
||||
if err != nil {
|
||||
@ -480,7 +505,7 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *dataUsageCacheInfo) Msgsize() (s int) {
|
||||
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BytesPrefixSize + len(z.BloomFilter)
|
||||
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BoolSize + 12 + msgp.BytesPrefixSize + len(z.BloomFilter)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
@ -37,65 +36,6 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
|
||||
return newDisks
|
||||
}
|
||||
|
||||
type sortSlices struct {
|
||||
disks []StorageAPI
|
||||
infos []DiskInfo
|
||||
}
|
||||
|
||||
type sortByOther sortSlices
|
||||
|
||||
func (sbo sortByOther) Len() int {
|
||||
return len(sbo.disks)
|
||||
}
|
||||
|
||||
func (sbo sortByOther) Swap(i, j int) {
|
||||
sbo.disks[i], sbo.disks[j] = sbo.disks[j], sbo.disks[i]
|
||||
sbo.infos[i], sbo.infos[j] = sbo.infos[j], sbo.infos[i]
|
||||
}
|
||||
|
||||
func (sbo sortByOther) Less(i, j int) bool {
|
||||
return sbo.infos[i].UsedInodes < sbo.infos[j].UsedInodes
|
||||
}
|
||||
|
||||
func (er erasureObjects) getOnlineDisksSortedByUsedInodes() (newDisks []StorageAPI) {
|
||||
disks := er.getDisks()
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var infos []DiskInfo
|
||||
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
|
||||
i := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if disks[i-1] == nil {
|
||||
return
|
||||
}
|
||||
di, err := disks[i-1].DiskInfo(context.Background())
|
||||
if err != nil || di.Healing {
|
||||
|
||||
// - Do not consume disks which are not reachable
|
||||
// unformatted or simply not accessible for some reason.
|
||||
//
|
||||
// - Do not consume disks which are being healed
|
||||
//
|
||||
// - Future: skip busy disks
|
||||
return
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
newDisks = append(newDisks, disks[i-1])
|
||||
infos = append(infos, di)
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
slices := sortSlices{newDisks, infos}
|
||||
sort.Sort(sortByOther(slices))
|
||||
|
||||
return slices.disks
|
||||
}
|
||||
|
||||
func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
|
||||
disks := er.getDisks()
|
||||
var wg sync.WaitGroup
|
||||
|
@ -223,6 +223,51 @@ func (er erasureObjects) StorageInfo(ctx context.Context) (StorageInfo, []error)
|
||||
return getStorageInfo(disks, endpoints)
|
||||
}
|
||||
|
||||
func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
|
||||
var wg sync.WaitGroup
|
||||
disks := er.getDisks()
|
||||
infos := make([]DiskInfo, len(disks))
|
||||
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
|
||||
i := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
disk := disks[i-1]
|
||||
|
||||
if disk == nil {
|
||||
return
|
||||
}
|
||||
|
||||
di, err := disk.DiskInfo(context.Background())
|
||||
if err != nil {
|
||||
// - Do not consume disks which are not reachable
|
||||
// unformatted or simply not accessible for some reason.
|
||||
//
|
||||
//
|
||||
// - Future: skip busy disks
|
||||
return
|
||||
}
|
||||
|
||||
infos[i-1] = di
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i, info := range infos {
|
||||
// Check if one of the drives in the set is being healed.
|
||||
// this information is used by crawler to skip healing
|
||||
// this erasure set while it calculates the usage.
|
||||
if info.Healing {
|
||||
healing = true
|
||||
continue
|
||||
}
|
||||
newDisks = append(newDisks, disks[i])
|
||||
}
|
||||
|
||||
return newDisks, healing
|
||||
}
|
||||
|
||||
// CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
|
||||
// Updates are sent on a regular basis and the caller *must* consume them.
|
||||
func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
|
||||
@ -231,8 +276,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
return nil
|
||||
}
|
||||
|
||||
// Collect disks we can use, sorted by least inode usage.
|
||||
disks := er.getOnlineDisksSortedByUsedInodes()
|
||||
// Collect disks we can use.
|
||||
disks, healing := er.getOnlineDisksWithHealing()
|
||||
if len(disks) == 0 {
|
||||
logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl")
|
||||
return nil
|
||||
@ -247,6 +292,11 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
continue
|
||||
}
|
||||
id, _ := disk.GetDiskID()
|
||||
if id == "" {
|
||||
// its possible that disk is unformatted
|
||||
// or just went offline
|
||||
continue
|
||||
}
|
||||
allDiskIDs = append(allDiskIDs, id)
|
||||
}
|
||||
|
||||
@ -348,6 +398,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
cache.Info.Name = bucket.Name
|
||||
}
|
||||
cache.Info.BloomFilter = bloom
|
||||
cache.Info.SkipHealing = healing
|
||||
cache.Disks = allDiskIDs
|
||||
if cache.Info.Name != bucket.Name {
|
||||
logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
|
||||
|
@ -22,6 +22,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/color"
|
||||
"github.com/minio/minio/pkg/console"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
||||
@ -146,6 +148,10 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
|
||||
}
|
||||
}
|
||||
|
||||
if serverDebugLog {
|
||||
console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, setIndex+1)
|
||||
}
|
||||
|
||||
var entryChs []FileInfoVersionsCh
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
Loading…
Reference in New Issue
Block a user