diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go
index aae462b06..6823e31b1 100644
--- a/cmd/background-newdisks-heal-ops.go
+++ b/cmd/background-newdisks-heal-ops.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/dustin/go-humanize"
 	"github.com/minio/minio/cmd/logger"
+	"github.com/minio/minio/pkg/color"
 	"github.com/minio/minio/pkg/console"
 )
 
@@ -142,7 +143,7 @@ wait:
 		}
 
 		if serverDebugLog {
-			console.Debugf("disk check timer fired, attempting to heal %d drives\n", len(healDisks))
+			console.Debugf(color.Green("healDisk:")+" disk check timer fired, attempting to heal %d drives\n", len(healDisks))
 		}
 
 		// heal only if new disks found.
diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go
index 8d07135cf..532ddf497 100644
--- a/cmd/data-crawler.go
+++ b/cmd/data-crawler.go
@@ -182,6 +182,8 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		return cache, errors.New("internal error: root scan attempted")
 	}
 
+	skipHeal := cache.Info.SkipHealing
+
 	s := folderScanner{
 		root:       basePath,
 		getSize:    getSize,
@@ -244,7 +246,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		default:
 		}
 		var err error
-		todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1)
+		todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1, skipHeal)
 		if err != nil {
 			// No useful information...
 			return cache, err
@@ -262,7 +264,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 			return s.newCache, ctx.Err()
 		default:
 		}
-		du, err := s.deepScanFolder(ctx, folder)
+		du, err := s.deepScanFolder(ctx, folder, skipHeal)
 		if err != nil {
 			logger.LogIf(ctx, err)
 			continue
@@ -324,7 +326,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		}
 
 		// Update on this cycle...
-		du, err := s.deepScanFolder(ctx, folder)
+		du, err := s.deepScanFolder(ctx, folder, skipHeal)
 		if err != nil {
 			logger.LogIf(ctx, err)
 			continue
@@ -347,7 +349,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 // Files found in the folders will be added to f.newCache.
 // If final is provided folders will be put into f.newFolders or f.existingFolders.
 // If final is not provided the folders found are returned from the function.
-func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) {
+func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool, skipHeal bool) ([]cachedFolder, error) {
 	var nextFolders []cachedFolder
 	done := ctx.Done()
 	scannerLogPrefix := color.Green("folder-scanner:")
@@ -455,6 +457,11 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				heal: thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
 			}
 
+			// if the drive belongs to an erasure set
+			// that is already being healed, skip the
+			// healing attempt on this drive.
+			item.heal = item.heal && !skipHeal
+
 			sizeSummary, err := f.getSize(item)
 			if err == errSkipFile {
 				wait() // wait to proceed to next entry.
@@ -659,7 +666,7 @@
 }
 
 // deepScanFolder will deep scan a folder and return the size if no error occurs.
-func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) (*dataUsageEntry, error) {
+func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder, skipHeal bool) (*dataUsageEntry, error) {
 	var cache dataUsageEntry
 	done := ctx.Done()
 
@@ -700,18 +707,23 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder)
 				activeLifeCycle = f.oldCache.Info.lifeCycle
 			}
 
-			sizeSummary, err := f.getSize(
-				crawlItem{
-					Path:       fileName,
-					Typ:        typ,
-					bucket:     bucket,
-					prefix:     path.Dir(prefix),
-					objectName: path.Base(entName),
-					debug:      f.dataUsageCrawlDebug,
-					lifeCycle:  activeLifeCycle,
-					heal:       hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
-				})
+			item := crawlItem{
+				Path:       fileName,
+				Typ:        typ,
+				bucket:     bucket,
+				prefix:     path.Dir(prefix),
+				objectName: path.Base(entName),
+				debug:      f.dataUsageCrawlDebug,
+				lifeCycle:  activeLifeCycle,
+				heal:       hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
+			}
+			// if the drive belongs to an erasure set
+			// that is already being healed, skip the
+			// healing attempt on this drive.
+			item.heal = item.heal && !skipHeal
+
+			sizeSummary, err := f.getSize(item)
 			if err == errSkipFile {
 				// Wait to throttle IO
 				wait()
 
diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index b3760d6bc..c6d7cf467 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -88,9 +88,12 @@ type dataUsageEntryInfo struct {
 
 type dataUsageCacheInfo struct {
 	// Name of the bucket. Also root element.
-	Name       string
-	LastUpdate time.Time
-	NextCycle  uint32
+	Name        string
+	LastUpdate  time.Time
+	NextCycle   uint32
+	// indicates if the disk is being healed and crawler
+	// should skip healing the disk
+	SkipHealing bool
 	BloomFilter []byte               `msg:"BloomFilter,omitempty"`
 	lifeCycle   *lifecycle.Lifecycle `msg:"-"`
 }
diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go
index 2366b1aac..a686d2de6 100644
--- a/cmd/data-usage-cache_gen.go
+++ b/cmd/data-usage-cache_gen.go
@@ -313,6 +313,12 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "NextCycle")
 				return
 			}
+		case "SkipHealing":
+			z.SkipHealing, err = dc.ReadBool()
+			if err != nil {
+				err = msgp.WrapError(err, "SkipHealing")
+				return
+			}
 		case "BloomFilter":
 			z.BloomFilter, err = dc.ReadBytes(z.BloomFilter)
 			if err != nil {
@@ -333,11 +339,11 @@
 // EncodeMsg implements msgp.Encodable
 func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
 	// omitempty: check for empty values
-	zb0001Len := uint32(4)
-	var zb0001Mask uint8 /* 4 bits */
+	zb0001Len := uint32(5)
+	var zb0001Mask uint8 /* 5 bits */
 	if z.BloomFilter == nil {
 		zb0001Len--
-		zb0001Mask |= 0x8
+		zb0001Mask |= 0x10
 	}
 	// variable map header, size zb0001Len
 	err = en.Append(0x80 | uint8(zb0001Len))
@@ -377,7 +383,17 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "NextCycle")
 		return
 	}
-	if (zb0001Mask & 0x8) == 0 { // if not empty
+	// write "SkipHealing"
+	err = en.Append(0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
+	if err != nil {
+		return
+	}
+	err = en.WriteBool(z.SkipHealing)
+	if err != nil {
+		err = msgp.WrapError(err, "SkipHealing")
+		return
+	}
+	if (zb0001Mask & 0x10) == 0 { // if not empty
 		// write "BloomFilter"
 		err = en.Append(0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
 		if err != nil {
@@ -396,11 +412,11 @@
 func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
 	// omitempty: check for empty values
-	zb0001Len := uint32(4)
-	var zb0001Mask uint8 /* 4 bits */
+	zb0001Len := uint32(5)
+	var zb0001Mask uint8 /* 5 bits */
 	if z.BloomFilter == nil {
 		zb0001Len--
-		zb0001Mask |= 0x8
+		zb0001Mask |= 0x10
 	}
 	// variable map header, size zb0001Len
 	o = append(o, 0x80|uint8(zb0001Len))
@@ -416,7 +432,10 @@ func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) {
 	// string "NextCycle"
 	o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65)
 	o = msgp.AppendUint32(o, z.NextCycle)
-	if (zb0001Mask & 0x8) == 0 { // if not empty
+	// string "SkipHealing"
+	o = append(o, 0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
+	o = msgp.AppendBool(o, z.SkipHealing)
+	if (zb0001Mask & 0x10) == 0 { // if not empty
 		// string "BloomFilter"
 		o = append(o, 0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72)
 		o = msgp.AppendBytes(o, z.BloomFilter)
@@ -460,6 +479,12 @@
 				err = msgp.WrapError(err, "NextCycle")
 				return
 			}
+		case "SkipHealing":
+			z.SkipHealing, bts, err = msgp.ReadBoolBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "SkipHealing")
+				return
+			}
 		case "BloomFilter":
 			z.BloomFilter, bts, err = msgp.ReadBytesBytes(bts, z.BloomFilter)
 			if err != nil {
@@ -480,7 +505,7 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *dataUsageCacheInfo) Msgsize() (s int) {
-	s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BytesPrefixSize + len(z.BloomFilter)
+	s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BoolSize + 12 + msgp.BytesPrefixSize + len(z.BloomFilter)
 	return
 }
 
diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 017c405dc..1b3ae6a21 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -18,7 +18,6 @@ package cmd
 
 import (
 	"context"
-	"sort"
 	"sync"
 
 	"github.com/minio/minio/pkg/sync/errgroup"
@@ -37,65 +36,6 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 	return newDisks
 }
 
-type sortSlices struct {
-	disks []StorageAPI
-	infos []DiskInfo
-}
-
-type sortByOther sortSlices
-
-func (sbo sortByOther) Len() int {
-	return len(sbo.disks)
-}
-
-func (sbo sortByOther) Swap(i, j int) {
-	sbo.disks[i], sbo.disks[j] = sbo.disks[j], sbo.disks[i]
-	sbo.infos[i], sbo.infos[j] = sbo.infos[j], sbo.infos[i]
-}
-
-func (sbo sortByOther) Less(i, j int) bool {
-	return sbo.infos[i].UsedInodes < sbo.infos[j].UsedInodes
-}
-
-func (er erasureObjects) getOnlineDisksSortedByUsedInodes() (newDisks []StorageAPI) {
-	disks := er.getDisks()
-	var wg sync.WaitGroup
-	var mu sync.Mutex
-	var infos []DiskInfo
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-		i := i
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			if disks[i-1] == nil {
-				return
-			}
-			di, err := disks[i-1].DiskInfo(context.Background())
-			if err != nil || di.Healing {
-
-				// - Do not consume disks which are not reachable
-				//   unformatted or simply not accessible for some reason.
-				//
-				// - Do not consume disks which are being healed
-				//
-				// - Future: skip busy disks
-				return
-			}
-
-			mu.Lock()
-			newDisks = append(newDisks, disks[i-1])
-			infos = append(infos, di)
-			mu.Unlock()
-		}()
-	}
-	wg.Wait()
-
-	slices := sortSlices{newDisks, infos}
-	sort.Sort(sortByOther(slices))
-
-	return slices.disks
-}
-
 func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
 	disks := er.getDisks()
 	var wg sync.WaitGroup
diff --git a/cmd/erasure.go b/cmd/erasure.go
index fea593436..f44cac7a9 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -223,6 +223,51 @@ func (er erasureObjects) StorageInfo(ctx context.Context) (StorageInfo, []error)
 	return getStorageInfo(disks, endpoints)
 }
 
+func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
+	var wg sync.WaitGroup
+	disks := er.getDisks()
+	infos := make([]DiskInfo, len(disks))
+	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			disk := disks[i-1]
+
+			if disk == nil {
+				return
+			}
+
+			di, err := disk.DiskInfo(context.Background())
+			if err != nil {
+				// - Do not consume disks which are not reachable
+				//   unformatted or simply not accessible for some reason.
+				//
+				//
+				// - Future: skip busy disks
+				return
+			}
+
+			infos[i-1] = di
+		}()
+	}
+	wg.Wait()
+
+	for i, info := range infos {
+		// Check if one of the drives in the set is being healed.
+		// this information is used by crawler to skip healing
+		// this erasure set while it calculates the usage.
+		if info.Healing {
+			healing = true
+			continue
+		}
+		newDisks = append(newDisks, disks[i])
+	}
+
+	return newDisks, healing
+}
+
 // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
 // Updates are sent on a regular basis and the caller *must* consume them.
 func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
@@ -231,8 +276,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
 		return nil
 	}
-	// Collect disks we can use, sorted by least inode usage.
-	disks := er.getOnlineDisksSortedByUsedInodes()
+	// Collect disks we can use.
+	disks, healing := er.getOnlineDisksWithHealing()
 	if len(disks) == 0 {
 		logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl")
 		return nil
 	}
@@ -247,6 +292,11 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
 			continue
 		}
 		id, _ := disk.GetDiskID()
+		if id == "" {
+			// its possible that disk is unformatted
+			// or just went offline
+			continue
+		}
 		allDiskIDs = append(allDiskIDs, id)
 	}
 
@@ -348,6 +398,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
 			cache.Info.Name = bucket.Name
 		}
 		cache.Info.BloomFilter = bloom
+		cache.Info.SkipHealing = healing
 		cache.Disks = allDiskIDs
 		if cache.Info.Name != bucket.Name {
 			logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index b42d313b8..53d97312f 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -22,6 +22,8 @@ import (
 	"time"
 
 	"github.com/minio/minio/cmd/logger"
+	"github.com/minio/minio/pkg/color"
+	"github.com/minio/minio/pkg/console"
 	"github.com/minio/minio/pkg/madmin"
 )
 
@@ -146,6 +148,10 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
 			}
 		}
 
+		if serverDebugLog {
+			console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, setIndex+1)
+		}
+
 		var entryChs []FileInfoVersionsCh
 		var mu sync.Mutex
 		var wg sync.WaitGroup