From c097ce9c324fad06a57ceaf265311ef7b5519b57 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Mon, 24 Aug 2020 13:47:01 -0700
Subject: [PATCH] continuous healing based on crawler (#10103)

Design: https://gist.github.com/klauspost/792fe25c315caf1dd15c8e79df124914
---
 cmd/admin-heal-ops.go      |  22 +++--
 cmd/background-heal-ops.go |  11 +--
 cmd/data-crawler.go        | 169 ++++++++++++++++++++++++++++++-------
 cmd/data-update-tracker.go |   3 -
 cmd/data-usage-cache.go    |  15 ++++
 cmd/data-usage_test.go     |   7 ++
 cmd/erasure-sets.go        |   6 +-
 cmd/erasure-zones.go       |  19 +++--
 cmd/erasure.go             |  13 +--
 cmd/fs-v1.go               |   2 +
 cmd/global-heal.go         |  69 +--------------
 cmd/server-main.go         |   4 -
 cmd/storage-rest-client.go |   1 +
 13 files changed, 205 insertions(+), 136 deletions(-)

diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index 091675c6a..f7835a6a5 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -654,9 +654,11 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
 	if !h.reportProgress {
 		// Object might have been deleted, by the time heal
 		// was attempted, we should ignore this object and
-		// return success.
+		// return the error and not calculate this object
+		// as part of the metrics.
 		if isErrObjectNotFound(res.err) || isErrVersionNotFound(res.err) {
-			return nil
+			// Return the error so that caller can handle it.
+			return res.err
 		}
 
 		h.mutex.Lock()
@@ -720,6 +722,8 @@ func (h *healSequence) healItemsFromSourceCh() error {
 			if err := h.queueHealTask(source, itemType); err != nil {
 				switch err.(type) {
 				case ObjectExistsAsDirectory:
+				case ObjectNotFound:
+				case VersionNotFound:
 				default:
 					logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w",
 						pathJoin(source.bucket, source.object), err))
@@ -793,17 +797,17 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
 				return errHealStopSignalled
 			}
 
-			herr := h.queueHealTask(healSource{
+			err := h.queueHealTask(healSource{
 				bucket:    bucket,
 				object:    object,
 				versionID: versionID,
 			}, madmin.HealItemBucketMetadata)
 			// Object might have been deleted, by the time heal
 			// was attempted we ignore this object an move on.
-			if isErrObjectNotFound(herr) || isErrVersionNotFound(herr) {
+			if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
 				return nil
 			}
-			return herr
+			return err
 		})
 	}
 }
@@ -904,9 +908,15 @@ func (h *healSequence) healObject(bucket, object, versionID string) error {
 		return errHealStopSignalled
 	}
 
-	return h.queueHealTask(healSource{
+	err := h.queueHealTask(healSource{
 		bucket:    bucket,
 		object:    object,
 		versionID: versionID,
 	}, madmin.HealItemObject)
+	// Object might have been deleted, by the time heal
+	// was attempted we ignore this object and move on.
+	if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
+		return nil
+	}
+	return err
 }
diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go
index 704784e8f..cbdaa1fe4 100644
--- a/cmd/background-heal-ops.go
+++ b/cmd/background-heal-ops.go
@@ -55,19 +55,20 @@ func (h *healRoutine) queueHealTask(task healTask) {
 	h.tasks <- task
 }
 
-func waitForLowHTTPReq(tolerance int32) {
+func waitForLowHTTPReq(tolerance int32, maxWait time.Duration) {
+	const wait = 10 * time.Millisecond
+	waitCount := maxWait / wait
+
 	// Bucket notification and http trace are not costly, it is okay to ignore them
 	// while counting the number of concurrent connections
 	tolerance += int32(globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers())
 
 	if httpServer := newHTTPServerFn(); httpServer != nil {
-		// Wait at max 10 minute for an inprogress request before proceeding to heal
-		waitCount := 600
 		// Any requests in progress, delay the heal.
 		for (httpServer.GetRequestCount() >= tolerance) && waitCount > 0 {
 			waitCount--
-			time.Sleep(1 * time.Second)
+			time.Sleep(wait)
 		}
 	}
 }
@@ -82,7 +83,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
 		}
 
 		// Wait and proceed if there are active requests
-		waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()))
+		waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
 
 		var res madmin.HealResultItem
 		var err error
diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go
index ba79d5cf7..2dad8a49a 100644
--- a/cmd/data-crawler.go
+++ b/cmd/data-crawler.go
@@ -27,6 +27,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/minio/minio/pkg/madmin"
+
 	"github.com/minio/minio/cmd/config"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bucket/lifecycle"
@@ -41,9 +43,10 @@ const (
 	dataCrawlSleepPerFolder  = time.Millisecond // Time to wait between folders.
 	dataCrawlSleepDefMult    = 10.0             // Default multiplier for waits between operations.
-	dataCrawlStartDelay      = 5 * time.Minute  // Time to wait on startup and between cycles.
+	dataCrawlStartDelay      = 5 * time.Second  // Time to wait on startup and between cycles.
 	dataUsageUpdateDirCycles = 16               // Visit all folders every n cycles.
 
+	healDeleteDangling = true
 )
 
 // initDataCrawler will start the crawler unless disabled.
@@ -87,12 +90,6 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 		if err == nil {
 			// Store new cycle...
 			nextBloomCycle++
-			if nextBloomCycle%dataUpdateTrackerResetEvery == 0 {
-				if intDataUpdateTracker.debug {
-					logger.Info(color.Green("runDataCrawler:") + " Resetting bloom filter for next runs.")
-				}
-				nextBloomCycle++
-			}
 			var tmp [8]byte
 			binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
 			r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false)
@@ -111,8 +108,9 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 }
 
 type cachedFolder struct {
-	name   string
-	parent *dataUsageHash
+	name              string
+	parent            *dataUsageHash
+	objectHealProbDiv uint32
 }
 
 type folderScanner struct {
@@ -125,6 +123,8 @@ type folderScanner struct {
 	dataUsageCrawlMult  float64
 	dataUsageCrawlDebug bool
 
+	healFolderInclude uint32 // Include a clean folder one in n cycles.
+	healObjectSelect  uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
 	newFolders      []cachedFolder
 	existingFolders []cachedFolder
@@ -167,8 +167,17 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		existingFolders:     nil,
 		dataUsageCrawlMult:  delayMult,
 		dataUsageCrawlDebug: intDataUpdateTracker.debug,
+		healFolderInclude:   0,
+		healObjectSelect:    0,
 	}
 
+	// Enable healing in XL mode.
+	if globalIsErasure {
+		// Include a clean folder one in n cycles.
+		s.healFolderInclude = 32
+		// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
+		s.healObjectSelect = 512
+	}
 	if len(cache.Info.BloomFilter) > 0 {
 		s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
 		_, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter))
@@ -189,7 +198,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 	}
 
 	// Always scan flattenLevels deep. Cache root is level 0.
-	todo := []cachedFolder{{name: cache.Info.Name}}
+	todo := []cachedFolder{{name: cache.Info.Name, objectHealProbDiv: 1}}
 	for i := 0; i < flattenLevels; i++ {
 		if s.dataUsageCrawlDebug {
 			logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo))
@@ -218,7 +227,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 			return s.newCache, ctx.Err()
 		default:
 		}
-		du, err := s.deepScanFolder(ctx, folder.name)
+		du, err := s.deepScanFolder(ctx, folder)
 		if err != nil {
 			logger.LogIf(ctx, err)
 			continue
@@ -249,26 +258,38 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		}
 		h := hashPath(folder.name)
 		if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
-			s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
-			continue
+			if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
+				s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
+				continue
+			} else {
+				folder.objectHealProbDiv = s.healFolderInclude
+			}
+			folder.objectHealProbDiv = dataUsageUpdateDirCycles
 		}
-
 		if s.withFilter != nil {
 			_, prefix := path2BucketObjectWithBasePath(basePath, folder.name)
 			if s.oldCache.Info.lifeCycle == nil || !s.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
 				// If folder isn't in filter, skip it completely.
 				if !s.withFilter.containsDir(folder.name) {
-					if s.dataUsageCrawlDebug {
-						logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder)
+					if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
+						if s.dataUsageCrawlDebug {
+							logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder)
+						}
+						s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
+						continue
+					} else {
+						if s.dataUsageCrawlDebug {
+							logger.Info(logPrefix+"Adding non-updated folder to heal check: %v"+logSuffix, folder.name)
+						}
+						// Update probability of including objects
+						folder.objectHealProbDiv = s.healFolderInclude
 					}
-					s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
-					continue
 				}
 			}
 		}
 
 		// Update on this cycle...
-		du, err := s.deepScanFolder(ctx, folder.name)
+		du, err := s.deepScanFolder(ctx, folder)
 		if err != nil {
 			logger.LogIf(ctx, err)
 			continue
@@ -301,6 +322,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 		default:
 		}
 		thisHash := hashPath(folder.name)
+		existing := f.oldCache.findChildrenCopy(thisHash)
 
 		// If there are lifecycle rules for the prefix, remove the filter.
 		filter := f.withFilter
@@ -309,7 +331,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
 			if f.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
 				if f.dataUsageCrawlDebug {
-					logger.Info(color.Green("data-usage:")+" Prefix %q has active rules", prefix)
+					logger.Info(color.Green("folder-scanner:")+" Prefix %q has active rules", prefix)
 				}
 				activeLifeCycle = f.oldCache.Info.lifeCycle
 				filter = nil
@@ -318,11 +340,19 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 		if _, ok := f.oldCache.Cache[thisHash.Key()]; filter != nil && ok {
 			// If folder isn't in filter and we have data, skip it completely.
 			if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
-				f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
-				if f.dataUsageCrawlDebug {
-					logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name)
+				if !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
+					f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
+					if f.dataUsageCrawlDebug {
+						logger.Info(color.Green("folder-scanner:")+" Skipping non-updated folder: %v", folder.name)
+					}
+					continue
+				} else {
+					if f.dataUsageCrawlDebug {
+						logger.Info(color.Green("folder-scanner:")+" Adding non-updated folder to heal check: %v", folder.name)
+					}
+					// If probability was already crawlerHealFolderInclude, keep it.
+					folder.objectHealProbDiv = f.healFolderInclude
 				}
-				continue
 			}
 		}
 		f.waitForLowActiveIO()
@@ -336,14 +366,14 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			bucket, prefix := path2BucketObjectWithBasePath(f.root, entName)
 			if bucket == "" {
 				if f.dataUsageCrawlDebug {
-					logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName)
+					logger.Info(color.Green("folder-scanner:")+" no bucket (%s,%s)", f.root, entName)
 				}
 				return nil
 			}
 
 			if isReservedOrInvalidBucket(bucket, false) {
 				if f.dataUsageCrawlDebug {
-					logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName)
+					logger.Info(color.Green("folder-scanner:")+" invalid bucket: %v, entry: %v", bucket, entName)
 				}
 				return nil
 			}
@@ -359,7 +389,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			_, exists := f.oldCache.Cache[h.Key()]
 			cache.addChildString(entName)
 
-			this := cachedFolder{name: entName, parent: &thisHash}
+			this := cachedFolder{name: entName, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
+			delete(existing, h.Key())
 			cache.addChild(h)
 			if final {
 				if exists {
@@ -385,11 +416,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				objectName: path.Base(entName),
 				debug:      f.dataUsageCrawlDebug,
 				lifeCycle:  activeLifeCycle,
+				heal:       thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv),
 			}
 
 			size, err := f.getSize(item)
 			sleepDuration(time.Since(t), f.dataUsageCrawlMult)
-			if err == errSkipFile || err == errFileNotFound {
+			if err == errSkipFile {
 				return nil
 			}
 			logger.LogIf(ctx, err)
@@ -402,19 +434,78 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 		if err != nil {
 			return nil, err
 		}
+
+		if f.healObjectSelect == 0 {
+			// If we are not healing, store the result now.
+			f.newCache.replaceHashed(thisHash, folder.parent, cache)
+			continue
+		}
+
+		objAPI := newObjectLayerWithoutSafeModeFn()
+		if objAPI == nil {
+			continue
+		}
+
+		bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
+		if !found {
+			continue
+		}
+
+		// Whatever remains in 'existing' are folders at this level
+		// that existed in the previous run but weren't found now.
+		//
+		// This may be for one of 2 reasons:
+		//
+		// 1) The folder/object was deleted.
+		// 2) We come from another disk and this disk missed the write.
+		//
+		// We therefore perform a heal check.
+		// If that doesn't bring it back we remove the folder and assume it was deleted.
+		// This means that the next run will not look for it.
+		for k := range existing {
+			f.waitForLowActiveIO()
+			bucket, prefix := path2BucketObject(k)
+			if f.dataUsageCrawlDebug {
+				logger.Info(color.Green("folder-scanner:")+" checking disappeared folder: %v/%v", bucket, prefix)
+			}
+
+			err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling},
+				func(bucket, object, versionID string) error {
+					return bgSeq.queueHealTask(healSource{
+						bucket:    bucket,
+						object:    object,
+						versionID: versionID,
+					}, madmin.HealItemObject)
+				})
+
+			if f.dataUsageCrawlDebug && err != nil {
+				logger.Info(color.Green("healObjects:")+" checking returned value %v", err)
+			}
+
+			// Add unless healing returned an error.
+			if err == nil {
+				this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
+				cache.addChild(hashPath(k))
+				if final {
+					f.existingFolders = append(f.existingFolders, this)
+				} else {
+					nextFolders = append(nextFolders, this)
+				}
+			}
+		}
 		f.newCache.replaceHashed(thisHash, folder.parent, cache)
 	}
 	return nextFolders, nil
 }
 
 // deepScanFolder will deep scan a folder and return the size if no error occurs.
-func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) {
+func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) (*dataUsageEntry, error) {
 	var cache dataUsageEntry
 
 	done := ctx.Done()
 
 	var addDir func(entName string, typ os.FileMode) error
-	var dirStack = []string{f.root, folder}
+	var dirStack = []string{f.root, folder.name}
 
 	addDir = func(entName string, typ os.FileMode) error {
 		select {
@@ -445,7 +536,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) (*dat
 		if f.oldCache.Info.lifeCycle != nil {
 			if f.oldCache.Info.lifeCycle.HasActiveRules(prefix, false) {
 				if f.dataUsageCrawlDebug {
-					logger.Info(color.Green("data-usage:")+" Prefix %q has active rules", prefix)
+					logger.Info(color.Green("folder-scanner:")+" Prefix %q has active rules", prefix)
 				}
 				activeLifeCycle = f.oldCache.Info.lifeCycle
 			}
@@ -460,6 +551,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) (*dat
 			objectName: path.Base(entName),
 			debug:      f.dataUsageCrawlDebug,
 			lifeCycle:  activeLifeCycle,
+			heal:       hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv),
 		})
 
 		// Don't sleep for really small amount of time
@@ -490,6 +582,7 @@ type crawlItem struct {
 	prefix     string // Only the prefix if any, does not have final object name.
 	objectName string // Only the object name without prefixes.
 	lifeCycle  *lifecycle.Lifecycle
+	heal       bool // Has the object been selected for heal check?
 	debug      bool
 }
 
@@ -522,6 +615,20 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
 		if i.debug {
 			logger.LogIf(ctx, err)
 		}
+		if i.heal {
+			if i.debug {
+				logger.Info(color.Green("applyActions:")+" heal checking: %v/%v v%s", i.bucket, i.objectPath(), meta.oi.VersionID)
+			}
+			res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, madmin.HealOpts{Remove: healDeleteDangling})
+			if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
+				return 0
+			}
+			if err != nil && !errors.Is(err, NotImplemented{}) {
+				logger.LogIf(ctx, err)
+				return 0
+			}
+			size = res.ObjectSize
+		}
 		if i.lifeCycle == nil {
 			return size
 		}
diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go
index 397aecd39..89130ac7b 100644
--- a/cmd/data-update-tracker.go
+++ b/cmd/data-update-tracker.go
@@ -48,9 +48,6 @@ const (
 	dataUpdateTrackerFilename     = dataUsageBucket + SlashSeparator + ".tracker.bin"
 	dataUpdateTrackerVersion      = 2
 	dataUpdateTrackerSaveInterval = 5 * time.Minute
-
-	// Reset bloom filters every n cycle
-	dataUpdateTrackerResetEvery = 1000
 )
 
 var (
diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index 2a858f921..e7a052912 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -85,7 +85,12 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) {
 }
 
 // mod returns true if the hash mod cycles == cycle.
+// If cycles is 0 false is always returned.
+// If cycles is 1 true is always returned (as expected).
 func (h dataUsageHash) mod(cycle uint32, cycles uint32) bool {
+	if cycles <= 1 {
+		return cycles == 1
+	}
 	return uint32(xxhash.Sum64String(string(h)))%cycles == cycle%cycles
 }
 
@@ -117,6 +122,16 @@ func (d *dataUsageCache) find(path string) *dataUsageEntry {
 	return &due
 }
 
+// findChildrenCopy returns a copy of the children of the supplied hash.
+func (d *dataUsageCache) findChildrenCopy(h dataUsageHash) dataUsageHashMap {
+	ch := d.Cache[h.String()].Children
+	res := make(dataUsageHashMap, len(ch))
+	for k := range ch {
+		res[k] = struct{}{}
+	}
+	return res
+}
+
 // Returns nil if not found.
 func (d *dataUsageCache) subCache(path string) dataUsageCache {
 	dst := dataUsageCache{Info: dataUsageCacheInfo{
diff --git a/cmd/data-usage_test.go b/cmd/data-usage_test.go
index 5255459ee..de7e804f0 100644
--- a/cmd/data-usage_test.go
+++ b/cmd/data-usage_test.go
@@ -360,6 +360,13 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	if got.root() == nil {
+		t.Log("cached folders:")
+		for folder := range got.Cache {
+			t.Log("folder:", folder)
+		}
+		t.Fatal("got nil root.")
+	}
 
 	// Test dirs
 	var want = []struct {
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 68e6acf77..141d470b8 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -1611,10 +1611,10 @@ func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, op
 			continue
 		}
 
-		// Wait and proceed if there are active requests
-		waitForLowHTTPReq(int32(s.drivesPerSet))
-
 		for _, version := range entry.Versions {
+			// Wait and proceed if there are active requests
+			waitForLowHTTPReq(int32(s.drivesPerSet), time.Second)
+
 			if err := healObject(bucket, version.Name, version.VersionID); err != nil {
 				return toObjectErr(err, bucket, version.Name)
 			}
diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go
index 7c7bbfe82..9ffe3fea1 100644
--- a/cmd/erasure-zones.go
+++ b/cmd/erasure-zones.go
@@ -1903,7 +1903,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
 }
 
 // HealObjectFn closure function heals the object.
-type HealObjectFn func(string, string, string) error
+type HealObjectFn func(bucket, object, versionID string) error
 
 func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
 	var zonesEntryChs [][]FileInfoVersionsCh
@@ -1928,28 +1928,37 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o
 		zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
 	}
 
+	// If listing did not return any entries upon first attempt, we
+	// return `ObjectNotFound`, to indicate to the caller any
+	// actions they may want to take as if `prefix` is missing.
+	err := toObjectErr(errFileNotFound, bucket, prefix)
 	for {
 		entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
 		if !ok {
 			break
 		}
+		// Indicate that first attempt was a success and subsequent loop
+		// knows that it's not our first attempt at 'prefix'.
+		err = nil
 
+		if zoneIndex >= len(zoneDrivesPerSet) || zoneIndex < 0 {
+			return fmt.Errorf("invalid zone index returned: %d", zoneIndex)
+		}
 		if quorumCount == zoneDrivesPerSet[zoneIndex] && opts.ScanMode == madmin.HealNormalScan {
 			// Skip good entries.
 			continue
 		}
 
-		// Wait and proceed if there are active requests
-		waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]))
-
 		for _, version := range entry.Versions {
+			// Wait and proceed if there are active requests
+			waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]), time.Second)
+
 			if err := healObject(bucket, version.Name, version.VersionID); err != nil {
 				return toObjectErr(err, bucket, version.Name)
 			}
 		}
 	}
 
-	return nil
+	return err
 }
 
 func (z *erasureZones) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 50a6254a2..95e6099bf 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -26,7 +26,6 @@ import (
 
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bpool"
-	"github.com/minio/minio/pkg/color"
 	"github.com/minio/minio/pkg/dsync"
 	"github.com/minio/minio/pkg/madmin"
 	"github.com/minio/minio/pkg/sync/errgroup"
@@ -294,20 +293,12 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
 		}
 	}
 
-	// Add existing buckets if changes or lifecycles.
+	// Add existing buckets.
 	for _, b := range buckets {
 		e := oldCache.find(b.Name)
 		if e != nil {
 			cache.replace(b.Name, dataUsageRoot, *e)
-			lc, err := globalLifecycleSys.Get(b.Name)
-			activeLC := err == nil && lc.HasActiveRules("", true)
-			if activeLC || bf == nil || bf.containsDir(b.Name) {
-				bucketCh <- b
-			} else {
-				if intDataUpdateTracker.debug {
-					logger.Info(color.Green("crawlAndGetDataUsage:")+" Skipping bucket %v, not updated", b.Name)
-				}
-			}
+			bucketCh <- b
 		}
 	}
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index fc5d55bee..2527d4486 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -352,6 +352,8 @@ func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataU
 		if fiErr != nil {
 			return 0, errSkipFile
 		}
+		// We cannot heal in FS mode.
+		item.heal = false
 		oi := fsMeta.ToObjectInfo(bucket, object, fi)
 		sz := item.applyActions(ctx, fs, actionMeta{oi: oi})
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index d84cc8f67..a68cb6bd5 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -29,8 +29,6 @@ const (
 	// sleep for an hour after a lock timeout
 	// before retrying to acquire lock again.
 	leaderLockTimeoutSleepInterval = time.Hour
-	// heal entire namespace once in 30 days
-	healInterval = 30 * 24 * time.Hour
 )
 
 var leaderLockTimeout = newDynamicTimeout(30*time.Second, time.Minute)
@@ -86,7 +84,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
 		ScannedItemsCount: bgSeq.getScannedItemsCount(),
 		LastHealActivity:  bgSeq.lastHealActivity,
 		HealDisks:         healDisks,
-		NextHealRound:     UTCNow().Add(durationToNextHealRound(bgSeq.lastHealActivity)),
+		NextHealRound:     UTCNow().Add(dataCrawlStartDelay),
 	}, true
 }
 
@@ -184,68 +182,3 @@ func deepHealObject(bucket, object, versionID string) {
 		}
 	}
 }
-
-// Returns the duration to the next background healing round
-func durationToNextHealRound(lastHeal time.Time) time.Duration {
-	if lastHeal.IsZero() {
-		lastHeal = globalBootTime
-	}
-
-	d := lastHeal.Add(healInterval).Sub(UTCNow())
-	if d < 0 {
-		return time.Second
-	}
-	return d
-}
-
-// Healing leader will take the charge of healing all erasure sets
-func execLeaderTasks(ctx context.Context, z *erasureZones) {
-	// So that we don't heal immediately, but after one month.
-	lastScanTime := UTCNow()
-	// Get background heal sequence to send elements to heal
-	var bgSeq *healSequence
-	var ok bool
-	for {
-		bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
-		if ok {
-			break
-		}
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.After(time.Second):
-			continue
-		}
-	}
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.NewTimer(durationToNextHealRound(lastScanTime)).C:
-			bgSeq.resetHealStatusCounters()
-			for _, zone := range z.zones {
-				// Heal set by set
-				for i, set := range zone.sets {
-					if err := healErasureSet(ctx, i, set, zone.drivesPerSet); err != nil {
-						logger.LogIf(ctx, err)
-						continue
-					}
-				}
-			}
-			lastScanTime = UTCNow()
-		}
-	}
-}
-
-func startGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
-	zones, ok := objAPI.(*erasureZones)
-	if !ok {
-		return
-	}
-
-	execLeaderTasks(ctx, zones)
-}
-
-func initGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
-	go startGlobalHeal(ctx, objAPI)
-}
diff --git a/cmd/server-main.go b/cmd/server-main.go
index ff9243b17..b3266dfe4 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -379,10 +379,6 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
 		// No unlock for "leader" lock.
 	}
 
-	if globalIsErasure {
-		initGlobalHeal(ctx, objAPI)
-	}
-
 	initDataCrawler(ctx, objAPI)
 }
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 80ccef71e..ee089e6a2 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -334,6 +334,7 @@ func (client *storageRESTClient) CheckParts(volume, path string, fi FileInfo) er
 	var reader bytes.Buffer
 	if err := gob.NewEncoder(&reader).Encode(fi); err != nil {
+		logger.LogIf(context.Background(), err)
 		return err
 	}
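
A note on the selection scheme (an illustration, not part of the patch): the
crawler-driven healing above leans entirely on dataUsageHash.mod, which
deterministically spreads heal checks across crawl cycles. An item is checked
when its hash modulo `cycles` equals the current cycle modulo `cycles`, so
every item is visited once every `cycles` rounds with no per-object
bookkeeping, and dividing `cycles` by `objectHealProbDiv` makes selection
proportionally more likely for folders already flagged for a heal check. The
standalone sketch below demonstrates that technique under stated assumptions:
it substitutes the standard library's FNV-1a hash where the patch uses
xxhash, and the names `itemHash` and `selectForHeal` are invented for the
example.

package main

import (
	"fmt"
	"hash/fnv"
)

// itemHash stands in for dataUsageHash: a stable hash of the item's path.
// The patch uses xxhash.Sum64String; FNV-1a from the standard library is
// used here only to keep the sketch dependency-free.
func itemHash(path string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(path))
	return h.Sum32()
}

// selectForHeal mirrors dataUsageHash.mod: an item is selected when its
// hash falls on the current cycle. cycles == 0 never selects and
// cycles == 1 always selects, matching the guards the patch adds to mod()
// in cmd/data-usage-cache.go.
func selectForHeal(path string, cycle, cycles uint32) bool {
	if cycles <= 1 {
		return cycles == 1
	}
	return itemHash(path)%cycles == cycle%cycles
}

func main() {
	// healObjectSelect is 512 in the patch: an object gets a heal check
	// roughly once every 512 crawl cycles.
	const healObjectSelect = 512
	// A folder flagged for healing carries objectHealProbDiv = 32
	// (healFolderInclude), shrinking the divisor and making its objects
	// 32x more likely to be checked on any given cycle.
	const objectHealProbDiv = 32

	paths := []string{"bucket/a.txt", "bucket/b.txt", "bucket/deep/c.txt"}
	for cycle := uint32(0); cycle < 4; cycle++ {
		for _, p := range paths {
			if selectForHeal(p, cycle, healObjectSelect/objectHealProbDiv) {
				fmt.Printf("cycle %d: heal check %s\n", cycle, p)
			}
		}
	}
}

Because the hash is stable across runs, a given object always lands on the
same cycle, so a full heal pass over the namespace completes after `cycles`
crawler rounds without any state kept between runs.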