diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 1cd1be4ee..583eccd45 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -657,6 +657,8 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) { } func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { + opts := globalHealConfig + // Send heal request task := healTask{ bucket: source.bucket, @@ -667,8 +669,15 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem } if source.opts != nil { task.opts = *source.opts + } else { + if opts.Bitrot { + task.opts.ScanMode = madmin.HealDeepScan + } } + // Wait and proceed if there are active requests + waitForLowHTTPReq(opts.IOCount, opts.Sleep) + h.mutex.Lock() h.scannedItemsMap[healType]++ h.lastHealActivity = UTCNow() diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 2bd487f38..a4886d64f 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -20,6 +20,7 @@ import ( "context" "time" + "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/madmin" ) @@ -53,20 +54,28 @@ func (h *healRoutine) queueHealTask(task healTask) { h.tasks <- task } -func waitForLowHTTPReq(tolerance int32, maxWait time.Duration) { - const wait = 10 * time.Millisecond - waitCount := maxWait / wait +func waitForLowHTTPReq(tolerance int, maxWait time.Duration) { + // At max 10 attempts to wait with 100 millisecond interval before proceeding + waitCount := 10 + waitTick := 100 * time.Millisecond // Bucket notification and http trace are not costly, it is okay to ignore them // while counting the number of concurrent connections - tolerance += int32(globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers()) + toleranceFn := func() int { + return tolerance + globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers() + } if httpServer := newHTTPServerFn(); httpServer != nil { // Any requests in progress, delay the heal. - for (httpServer.GetRequestCount() >= tolerance) && - waitCount > 0 { + for httpServer.GetRequestCount() >= toleranceFn() { + time.Sleep(waitTick) waitCount-- - time.Sleep(wait) + if waitCount == 0 { + if intDataUpdateTracker.debug { + logger.Info("waitForLowHTTPReq: waited %d times, resuming", waitCount) + } + break + } } } } @@ -80,9 +89,6 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { break } - // Wait and proceed if there are active requests - waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second) - var res madmin.HealResultItem var err error switch { diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 5cc52f603..0e273f075 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -124,8 +124,6 @@ wait: case <-ctx.Done(): return case <-time.After(defaultMonitorNewDiskInterval): - waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second) - var erasureSetInZoneDisksToHeal []map[int][]StorageAPI healDisks := globalBackgroundHealState.getHealLocalDisks() diff --git a/cmd/config/heal/heal.go b/cmd/config/heal/heal.go index 17d35b2fe..a0e37ca9e 100644 --- a/cmd/config/heal/heal.go +++ b/cmd/config/heal/heal.go @@ -17,20 +17,32 @@ package heal import ( - "errors" + "fmt" + "strconv" + "time" "github.com/minio/minio/cmd/config" + "github.com/minio/minio/pkg/env" ) // Compression environment variables const ( - Bitrot = "bitrotscan" + Bitrot = "bitrotscan" + Sleep = "max_sleep" + IOCount = "max_io" + + EnvBitrot = "MINIO_HEAL_BITROTSCAN" + EnvSleep = "MINIO_HEAL_MAX_SLEEP" + EnvIOCount = "MINIO_HEAL_MAX_IO" ) // Config represents the heal settings. type Config struct { // Bitrot will perform bitrot scan on local disk when checking objects. Bitrot bool `json:"bitrotscan"` + // maximum sleep duration between objects to slow down heal operation. + Sleep time.Duration `json:"sleep"` + IOCount int `json:"iocount"` } var ( @@ -40,6 +52,14 @@ var ( Key: Bitrot, Value: config.EnableOff, }, + config.KV{ + Key: Sleep, + Value: "1s", + }, + config.KV{ + Key: IOCount, + Value: "10", + }, } // Help provides help for config values @@ -50,6 +70,18 @@ var ( Optional: true, Type: "on|off", }, + config.HelpKV{ + Key: Sleep, + Description: `maximum sleep duration between objects to slow down heal operation. eg. 2s`, + Optional: true, + Type: "duration", + }, + config.HelpKV{ + Key: IOCount, + Description: `maximum IO requests allowed between objects to slow down heal operation. eg. 3`, + Optional: true, + Type: "int", + }, } ) @@ -58,10 +90,17 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { if err = config.CheckValidKeys(config.HealSubSys, kvs, DefaultKVS); err != nil { return cfg, err } - bitrot := kvs.Get(Bitrot) - if bitrot != config.EnableOn && bitrot != config.EnableOff { - return cfg, errors.New(Bitrot + ": must be 'on' or 'off'") + cfg.Bitrot, err = config.ParseBool(env.Get(EnvBitrot, kvs.Get(Bitrot))) + if err != nil { + return cfg, fmt.Errorf("'heal:bitrotscan' value invalid: %w", err) + } + cfg.Sleep, err = time.ParseDuration(env.Get(EnvSleep, kvs.Get(Sleep))) + if err != nil { + return cfg, fmt.Errorf("'heal:max_sleep' value invalid: %w", err) + } + cfg.IOCount, err = strconv.Atoi(env.Get(EnvIOCount, kvs.Get(IOCount))) + if err != nil { + return cfg, fmt.Errorf("'heal:max_io' value invalid: %w", err) } - cfg.Bitrot = bitrot == config.EnableOn return cfg, nil } diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 461068004..536df90fe 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -153,7 +153,7 @@ type folderScanner struct { // crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache. // The returned cache will always be valid, but may not be updated from the existing. -// Before each operation waitForLowActiveIO is called which can be used to temporarily halt the crawler. +// Before each operation sleepDuration is called which can be used to temporarily halt the crawler. // If the supplied context is canceled the function will return at the first chance. func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) { t := UTCNow() @@ -507,6 +507,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // Dynamic time delay. t := UTCNow() + resolver.bucket = bucket foundObjs := false @@ -604,12 +605,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // If we have quorum, found directories, but no objects, issue heal to delete the dangling. objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: true}, func(bucket, object, versionID string) error { - // Wait for each heal as per crawler frequency. - sleepDuration(time.Since(t), f.dataUsageCrawlMult) - - defer func() { - t = UTCNow() - }() return bgSeq.queueHealTask(healSource{ bucket: bucket, object: object, diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index ce0ac6998..9228e34e1 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -55,8 +55,6 @@ type FSObjects struct { // The count of concurrent calls on FSObjects API activeIOCount int64 - // The active IO count ceiling for crawling to work - maxActiveIOCount int64 // Path to be exported over S3 API. fsPath string @@ -168,8 +166,6 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { listPool: NewTreeWalkPool(globalLookupTimeout), appendFileMap: make(map[string]*fsAppendFile), diskMount: mountinfo.IsLikelyMountPoint(fsPath), - - maxActiveIOCount: 10, } // Once the filesystem has initialized hold the read lock for @@ -230,12 +226,6 @@ func (fs *FSObjects) StorageInfo(ctx context.Context, _ bool) (StorageInfo, []er return storageInfo, nil } -func (fs *FSObjects) waitForLowActiveIO() { - for atomic.LoadInt64(&fs.activeIOCount) >= fs.maxActiveIOCount { - time.Sleep(lowActiveIOWaitTick) - } -} - // CrawlAndGetDataUsage returns data usage stats of the current FS deployment func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { // Load bucket totals diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 8a2cad8b3..5f1660dc4 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -186,7 +186,10 @@ func deepHealObject(bucket, object, versionID string) { bucket: bucket, object: object, versionID: versionID, - opts: &madmin.HealOpts{ScanMode: madmin.HealDeepScan}, + opts: &madmin.HealOpts{ + Remove: true, // if found dangling purge it. + ScanMode: madmin.HealDeepScan, + }, } } } diff --git a/cmd/http/server.go b/cmd/http/server.go index e6edd0854..49c38ad49 100644 --- a/cmd/http/server.go +++ b/cmd/http/server.go @@ -57,8 +57,8 @@ type Server struct { } // GetRequestCount - returns number of request in progress. -func (srv *Server) GetRequestCount() int32 { - return atomic.LoadInt32(&srv.requestCount) +func (srv *Server) GetRequestCount() int { + return int(atomic.LoadInt32(&srv.requestCount)) } // Start - start HTTP server diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index f003d597b..1f828b22d 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -63,12 +63,6 @@ const ( // Size of each buffer. readAheadBufSize = 1 << 20 - // Wait interval to check if active IO count is low - // to proceed crawling to compute data usage. - // Wait up to lowActiveIOWaitMaxN times. - lowActiveIOWaitTick = 100 * time.Millisecond - lowActiveIOWaitMaxN = 10 - // XL metadata file carries per object metadata. xlStorageFormatFile = "xl.meta" ) @@ -90,8 +84,7 @@ func isValidVolname(volname string) bool { // xlStorage - implements StorageAPI interface. type xlStorage struct { - maxActiveIOCount int32 - activeIOCount int32 + activeIOCount int32 diskPath string endpoint Endpoint @@ -262,13 +255,8 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { }, }, globalSync: env.Get(config.EnvFSOSync, config.EnableOff) == config.EnableOn, - // Allow disk usage crawler to run with up to 2 concurrent - // I/O ops, if and when activeIOCount reaches this - // value disk usage routine suspends the crawler - // and waits until activeIOCount reaches below this threshold. - maxActiveIOCount: 3, - ctx: GlobalContext, - rootDisk: rootDisk, + ctx: GlobalContext, + rootDisk: rootDisk, } // Success. @@ -336,20 +324,6 @@ func (s *xlStorage) Healing() bool { return err == nil } -func (s *xlStorage) waitForLowActiveIO() { - max := lowActiveIOWaitMaxN - for atomic.LoadInt32(&s.activeIOCount) >= s.maxActiveIOCount { - time.Sleep(lowActiveIOWaitTick) - max-- - if max == 0 { - if intDataUpdateTracker.debug { - logger.Info("waitForLowActiveIO: waited %d times, resuming", lowActiveIOWaitMaxN) - } - break - } - } -} - func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { // Check if the current bucket has a configured lifecycle policy lc, err := globalLifecycleSys.Get(cache.Info.Name) @@ -402,24 +376,18 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac if !version.Deleted { // Bitrot check local data if size > 0 && item.heal && opts.Bitrot { - s.waitForLowActiveIO() - err := s.VerifyFile(ctx, item.bucket, item.objectPath(), version) - switch err { - case errFileCorrupt: - res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{ - Remove: healDeleteDangling, - ScanMode: madmin.HealDeepScan, - }) - if err != nil { - if !errors.Is(err, NotImplemented{}) { - logger.LogIf(ctx, err) - } - size = 0 - } else { - size = res.ObjectSize + // HealObject verifies bitrot requirement internally + res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{ + Remove: healDeleteDangling, + ScanMode: madmin.HealDeepScan, + }) + if err != nil { + if !errors.Is(err, NotImplemented{}) { + logger.LogIf(ctx, err) } - default: - // VerifyFile already logs errors + size = 0 + } else { + size = res.ObjectSize } } totalSize += size