mirror of
https://github.com/minio/minio.git
synced 2025-01-26 06:03:17 -05:00
scanner: Allow full throttle if there is no parallel disk ops (#18109)
This commit is contained in:
parent
9434fff215
commit
3f4488c589
@ -246,6 +246,8 @@ type folderScanner struct {
|
|||||||
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
||||||
scanMode madmin.HealScanMode
|
scanMode madmin.HealScanMode
|
||||||
|
|
||||||
|
weSleep func() bool
|
||||||
|
|
||||||
disks []StorageAPI
|
disks []StorageAPI
|
||||||
disksQuorum int
|
disksQuorum int
|
||||||
|
|
||||||
@ -299,7 +301,7 @@ type folderScanner struct {
|
|||||||
// The returned cache will always be valid, but may not be updated from the existing.
|
// The returned cache will always be valid, but may not be updated from the existing.
|
||||||
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
|
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
|
||||||
// If the supplied context is canceled the function will return at the first chance.
|
// If the supplied context is canceled the function will return at the first chance.
|
||||||
func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
|
func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
|
||||||
switch cache.Info.Name {
|
switch cache.Info.Name {
|
||||||
case "", dataUsageRoot:
|
case "", dataUsageRoot:
|
||||||
return cache, errors.New("internal error: root scan attempted")
|
return cache, errors.New("internal error: root scan attempted")
|
||||||
@ -316,6 +318,7 @@ func scanDataFolder(ctx context.Context, disks []StorageAPI, basePath string, ca
|
|||||||
dataUsageScannerDebug: false,
|
dataUsageScannerDebug: false,
|
||||||
healObjectSelect: 0,
|
healObjectSelect: 0,
|
||||||
scanMode: scanMode,
|
scanMode: scanMode,
|
||||||
|
weSleep: weSleep,
|
||||||
updates: cache.Info.updates,
|
updates: cache.Info.updates,
|
||||||
updateCurrentPath: updatePath,
|
updateCurrentPath: updatePath,
|
||||||
disks: disks,
|
disks: disks,
|
||||||
@ -372,6 +375,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||||||
done := ctx.Done()
|
done := ctx.Done()
|
||||||
scannerLogPrefix := color.Green("folder-scanner:")
|
scannerLogPrefix := color.Green("folder-scanner:")
|
||||||
|
|
||||||
|
noWait := func() {}
|
||||||
|
|
||||||
thisHash := hashPath(folder.name)
|
thisHash := hashPath(folder.name)
|
||||||
// Store initial compaction state.
|
// Store initial compaction state.
|
||||||
wasCompacted := into.Compacted
|
wasCompacted := into.Compacted
|
||||||
@ -401,8 +406,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||||||
if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) {
|
if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) {
|
||||||
replicationCfg = f.oldCache.Info.replication
|
replicationCfg = f.oldCache.Info.replication
|
||||||
}
|
}
|
||||||
// Check if we can skip it due to bloom filter...
|
|
||||||
|
if f.weSleep() {
|
||||||
scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
|
scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
|
||||||
|
}
|
||||||
|
|
||||||
var existingFolders, newFolders []cachedFolder
|
var existingFolders, newFolders []cachedFolder
|
||||||
var foundObjects bool
|
var foundObjects bool
|
||||||
@ -453,8 +460,11 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wait := noWait
|
||||||
|
if f.weSleep() {
|
||||||
// Dynamic time delay.
|
// Dynamic time delay.
|
||||||
wait := scannerSleeper.Timer(ctx)
|
wait = scannerSleeper.Timer(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
// Get file size, ignore errors.
|
// Get file size, ignore errors.
|
||||||
item := scannerItem{
|
item := scannerItem{
|
||||||
@ -704,8 +714,11 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
|
|||||||
// this object might be dangling.
|
// this object might be dangling.
|
||||||
entry, _ = entries.firstFound()
|
entry, _ = entries.firstFound()
|
||||||
}
|
}
|
||||||
|
wait := noWait
|
||||||
|
if f.weSleep() {
|
||||||
// wait timer per object.
|
// wait timer per object.
|
||||||
wait := scannerSleeper.Timer(ctx)
|
wait = scannerSleeper.Timer(ctx)
|
||||||
|
}
|
||||||
defer wait()
|
defer wait()
|
||||||
f.updateCurrentPath(entry.name)
|
f.updateCurrentPath(entry.name)
|
||||||
stopFn := globalScannerMetrics.log(scannerMetricHealAbandonedObject, f.root, entry.name)
|
stopFn := globalScannerMetrics.log(scannerMetricHealAbandonedObject, f.root, entry.name)
|
||||||
|
@ -62,7 +62,9 @@ func TestDataUsageUpdate(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
|
weSleep := func() bool { return false }
|
||||||
|
|
||||||
|
got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -173,7 +175,7 @@ func TestDataUsageUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Changed dir must be picked up in this many cycles.
|
// Changed dir must be picked up in this many cycles.
|
||||||
for i := 0; i < dataUsageUpdateDirCycles; i++ {
|
for i := 0; i < dataUsageUpdateDirCycles; i++ {
|
||||||
got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0)
|
got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0, weSleep)
|
||||||
got.Info.NextCycle++
|
got.Info.NextCycle++
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -280,7 +282,10 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0)
|
|
||||||
|
weSleep := func() bool { return false }
|
||||||
|
|
||||||
|
got, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0, weSleep)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -414,7 +419,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Changed dir must be picked up in this many cycles.
|
// Changed dir must be picked up in this many cycles.
|
||||||
for i := 0; i < dataUsageUpdateDirCycles; i++ {
|
for i := 0; i < dataUsageUpdateDirCycles; i++ {
|
||||||
got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0)
|
got, err = scanDataFolder(context.Background(), nil, base, got, getSize, 0, weSleep)
|
||||||
got.Info.NextCycle++
|
got.Info.NextCycle++
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -562,7 +567,8 @@ func TestDataUsageCacheSerialize(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
want, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
|
weSleep := func() bool { return false }
|
||||||
|
want, err := scanDataFolder(context.Background(), nil, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0, weSleep)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -522,7 +522,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
|
|||||||
// Calc usage
|
// Calc usage
|
||||||
before := cache.Info.LastUpdate
|
before := cache.Info.LastUpdate
|
||||||
var err error
|
var err error
|
||||||
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode)
|
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
|
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
|
||||||
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
|
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
|
||||||
|
@ -112,8 +112,11 @@ func (d *naughtyDisk) SetDiskID(id string) {
|
|||||||
d.disk.SetDiskID(id)
|
d.disk.SetDiskID(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (info dataUsageCache, err error) {
|
func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, weSleep func() bool) (info dataUsageCache, err error) {
|
||||||
return d.disk.NSScanner(ctx, cache, updates, scanMode)
|
if err := d.calcError(); err != nil {
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
return d.disk.NSScanner(ctx, cache, updates, scanMode, weSleep)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) {
|
func (d *naughtyDisk) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) {
|
||||||
|
@ -66,7 +66,7 @@ type StorageAPI interface {
|
|||||||
// has never been replaced.
|
// has never been replaced.
|
||||||
Healing() *healingTracker
|
Healing() *healingTracker
|
||||||
DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error)
|
DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error)
|
||||||
NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error)
|
NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, shouldSleep func() bool) (dataUsageCache, error)
|
||||||
|
|
||||||
// Volume operations.
|
// Volume operations.
|
||||||
MakeVol(ctx context.Context, volume string) (err error)
|
MakeVol(ctx context.Context, volume string) (err error)
|
||||||
@ -147,7 +147,7 @@ func (p *unrecognizedDisk) Healing() *healingTracker {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
|
func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, shouldSleep func() bool) (dataUsageCache, error) {
|
||||||
return dataUsageCache{}, errDiskNotFound
|
return dataUsageCache{}, errDiskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +213,7 @@ func (client *storageRESTClient) Healing() *healingTracker {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
|
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) {
|
||||||
atomic.AddInt32(&client.scanning, 1)
|
atomic.AddInt32(&client.scanning, 1)
|
||||||
defer atomic.AddInt32(&client.scanning, -1)
|
defer atomic.AddInt32(&client.scanning, -1)
|
||||||
defer close(updates)
|
defer close(updates)
|
||||||
|
@ -241,7 +241,7 @@ func (s *storageRESTServer) NSScannerHandler(ctx context.Context, params *nsScan
|
|||||||
out <- resp
|
out <- resp
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode))
|
ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode), nil)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return grid.NewRemoteErr(err)
|
return grid.NewRemoteErr(err)
|
||||||
|
@ -274,7 +274,7 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
|
|||||||
return p.storage.Healing()
|
return p.storage.Healing()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
|
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, _ func() bool) (dataUsageCache, error) {
|
||||||
if contextCanceled(ctx) {
|
if contextCanceled(ctx) {
|
||||||
close(updates)
|
close(updates)
|
||||||
return dataUsageCache{}, ctx.Err()
|
return dataUsageCache{}, ctx.Err()
|
||||||
@ -284,7 +284,13 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
|
|||||||
close(updates)
|
close(updates)
|
||||||
return dataUsageCache{}, err
|
return dataUsageCache{}, err
|
||||||
}
|
}
|
||||||
return p.storage.NSScanner(ctx, cache, updates, scanMode)
|
|
||||||
|
weSleep := func() bool {
|
||||||
|
// Entire queue is full, so we sleep.
|
||||||
|
return cap(p.health.tokens) == len(p.health.tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.storage.NSScanner(ctx, cache, updates, scanMode, weSleep)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
|
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
|
||||||
|
@ -459,7 +459,7 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte,
|
|||||||
return buf, err
|
return buf, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
|
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
|
||||||
atomic.AddInt32(&s.scanning, 1)
|
atomic.AddInt32(&s.scanning, 1)
|
||||||
defer atomic.AddInt32(&s.scanning, -1)
|
defer atomic.AddInt32(&s.scanning, -1)
|
||||||
|
|
||||||
@ -663,7 +663,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
|||||||
return sizeSummary{}, errIgnoreFileContrib
|
return sizeSummary{}, errIgnoreFileContrib
|
||||||
}
|
}
|
||||||
return sizeS, nil
|
return sizeS, nil
|
||||||
}, scanMode)
|
}, scanMode, weSleep)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dataUsageInfo, err
|
return dataUsageInfo, err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user