From 3f4488c589d1eca5c3249709fbf9aef1871c343b Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Tue, 2 Jan 2024 13:51:24 -0800 Subject: [PATCH] scanner: Allow full throttle if there is no parallel disk ops (#18109) --- cmd/data-scanner.go | 27 ++++++++++++++++++++------- cmd/data-usage_test.go | 16 +++++++++++----- cmd/erasure.go | 2 +- cmd/naughty-disk_test.go | 7 +++++-- cmd/storage-interface.go | 4 ++-- cmd/storage-rest-client.go | 2 +- cmd/storage-rest-server.go | 2 +- cmd/xl-storage-disk-id-check.go | 10 ++++++++-- cmd/xl-storage.go | 4 ++-- 9 files changed, 51 insertions(+), 23 deletions(-) diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 9eeeddbc6..051a8d92a 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -246,6 +246,8 @@ type folderScanner struct { healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude scanMode madmin.HealScanMode + weSleep func() bool + disks []StorageAPI disksQuorum int @@ -299,7 +301,7 @@ type folderScanner struct { // The returned cache will always be valid, but may not be updated from the existing. // Before each operation sleepDuration is called which can be used to temporarily halt the scanner. // If the supplied context is canceled the function will return at the first chance. -func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) { +func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) { switch cache.Info.Name { case "", dataUsageRoot: return cache, errors.New("internal error: root scan attempted") @@ -316,6 +318,7 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, ca dataUsageScannerDebug: false, healObjectSelect: 0, scanMode: scanMode, + weSleep: weSleep, updates: cache.Info.updates, updateCurrentPath: updatePath, disks: disks, @@ -372,6 +375,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int done := ctx.Done() scannerLogPrefix := color.Green("folder-scanner:") + noWait := func() {} + thisHash := hashPath(folder.name) // Store initial compaction state. wasCompacted := into.Compacted @@ -401,8 +406,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) { replicationCfg = f.oldCache.Info.replication } - // Check if we can skip it due to bloom filter... - scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder) + + if f.weSleep() { + scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder) + } var existingFolders, newFolders []cachedFolder var foundObjects bool @@ -453,8 +460,11 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int return nil } - // Dynamic time delay. - wait := scannerSleeper.Timer(ctx) + wait := noWait + if f.weSleep() { + // Dynamic time delay. + wait = scannerSleeper.Timer(ctx) + } // Get file size, ignore errors. item := scannerItem{ @@ -704,8 +714,11 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int // this object might be dangling. entry, _ = entries.firstFound() } - // wait timer per object. - wait := scannerSleeper.Timer(ctx) + wait := noWait + if f.weSleep() { + // wait timer per object. + wait = scannerSleeper.Timer(ctx) + } defer wait() f.updateCurrentPath(entry.name) stopFn := globalScannerMetrics.log(scannerMetricHealAbandonedObject, f.root, entry.name) diff --git a/cmd/data-usage_test.go b/cmd/data-usage_test.go index 17a4cfc34..ae6cbd700 100644 --- a/cmd/data-usage_test.go +++ b/cmd/data-usage_test.go @@ -62,7 +62,9 @@ func TestDataUsageUpdate(t *testing.T) { return } - got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0) + weSleep := func() bool { return false } + + got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep) if err != nil { t.Fatal(err) } @@ -173,7 +175,7 @@ func TestDataUsageUpdate(t *testing.T) { } // Changed dir must be picked up in this many cycles. for i := 0; i < dataUsageUpdateDirCycles; i++ { - got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0) + got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0, weSleep) got.Info.NextCycle++ if err != nil { t.Fatal(err) @@ -280,7 +282,10 @@ func TestDataUsageUpdatePrefix(t *testing.T) { } return } - got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0) + + weSleep := func() bool { return false } + + got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0, weSleep) if err != nil { t.Fatal(err) } @@ -414,7 +419,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) { } // Changed dir must be picked up in this many cycles. for i := 0; i < dataUsageUpdateDirCycles; i++ { - got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0) + got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0, weSleep) got.Info.NextCycle++ if err != nil { t.Fatal(err) @@ -562,7 +567,8 @@ func TestDataUsageCacheSerialize(t *testing.T) { } return } - want, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0) + weSleep := func() bool { return false } + want, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep) if err != nil { t.Fatal(err) } diff --git a/cmd/erasure.go b/cmd/erasure.go index a5b820572..d4d42dec9 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -522,7 +522,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa // Calc usage before := cache.Info.LastUpdate var err error - cache, err = disk.NSScanner(ctx, cache, updates, healScanMode) + cache, err = disk.NSScanner(ctx, cache, updates, healScanMode, nil) if err != nil { if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) { logger.LogIf(ctx, cache.save(ctx, er, cacheName)) diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 701632f3a..ecb48a523 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -112,8 +112,11 @@ func (d *naughtyDisk) SetDiskID(id string) { d.disk.SetDiskID(id) } -func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (info dataUsageCache, err error) { - return d.disk.NSScanner(ctx, cache, updates, scanMode) +func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, weSleep func() bool) (info dataUsageCache, err error) { + if err := d.calcError(); err != nil { + return info, err + } + return d.disk.NSScanner(ctx, cache, updates, scanMode, weSleep) } func (d *naughtyDisk) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index ee9d27809..8c3478519 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -66,7 +66,7 @@ type StorageAPI interface { // has never been replaced. Healing() *healingTracker DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) - NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) + NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, shouldSleep func() bool) (dataUsageCache, error) // Volume operations. MakeVol(ctx context.Context, volume string) (err error) @@ -147,7 +147,7 @@ func (p *unrecognizedDisk) Healing() *healingTracker { return nil } -func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { +func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, shouldSleep func() bool) (dataUsageCache, error) { return dataUsageCache{}, errDiskNotFound } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 6d7232787..56ba937e0 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -213,7 +213,7 @@ func (client *storageRESTClient) Healing() *healingTracker { return nil } -func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { +func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) { atomic.AddInt32(&client.scanning, 1) defer atomic.AddInt32(&client.scanning, -1) defer close(updates) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 34b20dd11..5b405cbdb 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -241,7 +241,7 @@ func (s *storageRESTServer) NSScannerHandler(ctx context.Context, params *nsScan out <- resp } }() - ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode)) + ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode), nil) wg.Wait() if err != nil { return grid.NewRemoteErr(err) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 904b4e285..aa365ff10 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -274,7 +274,7 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker { return p.storage.Healing() } -func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { +func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) { if contextCanceled(ctx) { close(updates) return dataUsageCache{}, ctx.Err() @@ -284,7 +284,13 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac close(updates) return dataUsageCache{}, err } - return p.storage.NSScanner(ctx, cache, updates, scanMode) + + weSleep := func() bool { + // Entire queue is full, so we sleep. + return cap(p.health.tokens) == len(p.health.tokens) + } + + return p.storage.NSScanner(ctx, cache, updates, scanMode, weSleep) } func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 3aa940709..2356e0ed8 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -459,7 +459,7 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, return buf, err } -func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) { +func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) { atomic.AddInt32(&s.scanning, 1) defer atomic.AddInt32(&s.scanning, -1) @@ -663,7 +663,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates return sizeSummary{}, errIgnoreFileContrib } return sizeS, nil - }, scanMode) + }, scanMode, weSleep) if err != nil { return dataUsageInfo, err }