rename all references from crawl -> scanner (#11621)

Author: Harshavardhana (committed by GitHub)
Date:   2021-02-26 15:11:42 -08:00
parent 6386b45c08
commit 9171d6ef65
23 changed files with 133 additions and 133 deletions


@@ -43,9 +43,9 @@ import (
 )
 const (
-	dataCrawlSleepPerFolder  = time.Millisecond // Time to wait between folders.
-	dataCrawlStartDelay      = 1 * time.Minute  // Time to wait on startup and between cycles.
-	dataUsageUpdateDirCycles = 16               // Visit all folders every n cycles.
+	dataScannerSleepPerFolder = time.Millisecond // Time to wait between folders.
+	dataScannerStartDelay     = 1 * time.Minute  // Time to wait on startup and between cycles.
+	dataUsageUpdateDirCycles  = 16               // Visit all folders every n cycles.
 	healDeleteDangling    = true
 	healFolderIncludeProb = 32 // Include a clean folder one in n cycles.
@@ -76,7 +76,7 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
 	for {
 		err := locker.GetLock(ctx, dataScannerLeaderLockTimeout)
 		if err != nil {
-			time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
+			time.Sleep(time.Duration(r.Float64() * float64(dataScannerStartDelay)))
 			continue
 		}
 		break
@@ -100,16 +100,16 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
 		br.Close()
 	}
-	crawlTimer := time.NewTimer(dataCrawlStartDelay)
-	defer crawlTimer.Stop()
+	scannerTimer := time.NewTimer(dataScannerStartDelay)
+	defer scannerTimer.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-crawlTimer.C:
+		case <-scannerTimer.C:
 			// Reset the timer for next cycle.
-			crawlTimer.Reset(dataCrawlStartDelay)
+			scannerTimer.Reset(dataScannerStartDelay)
 			if intDataUpdateTracker.debug {
 				console.Debugln("starting scanner cycle")
@@ -120,7 +120,7 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
 			go storeDataUsageInBackend(ctx, objAPI, results)
 			bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
 			logger.LogIf(ctx, err)
-			err = objAPI.CrawlAndGetDataUsage(ctx, bf, results)
+			err = objAPI.NSScanner(ctx, bf, results)
 			close(results)
 			logger.LogIf(ctx, err)
 			if err == nil {
@@ -156,27 +156,27 @@ type folderScanner struct {
 	newCache   dataUsageCache
 	withFilter *bloomFilter
-	dataUsageCrawlDebug bool
-	healFolderInclude   uint32 // Include a clean folder one in n cycles.
-	healObjectSelect    uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
+	dataUsageScannerDebug bool
+	healFolderInclude     uint32 // Include a clean folder one in n cycles.
+	healObjectSelect      uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
 	newFolders      []cachedFolder
 	existingFolders []cachedFolder
 	disks           []StorageAPI
 }
-// crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache.
+// scanDataFolder will scan the basepath+cache.Info.Name and return an updated cache.
 // The returned cache will always be valid, but may not be updated from the existing.
 // Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
 // If the supplied context is canceled the function will return at the first chance.
-func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) {
+func scanDataFolder(ctx context.Context, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) {
 	t := UTCNow()
 	logPrefix := color.Green("data-usage: ")
 	logSuffix := color.Blue("- %v + %v", basePath, cache.Info.Name)
 	if intDataUpdateTracker.debug {
 		defer func() {
-			console.Debugf(logPrefix+" Crawl time: %v %s\n", time.Since(t), logSuffix)
+			console.Debugf(logPrefix+" Scanner time: %v %s\n", time.Since(t), logSuffix)
 		}()
 	}
@@ -189,15 +189,15 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 	skipHeal := cache.Info.SkipHealing
 	s := folderScanner{
-		root:                basePath,
-		getSize:             getSize,
-		oldCache:            cache,
-		newCache:            dataUsageCache{Info: cache.Info},
-		newFolders:          nil,
-		existingFolders:     nil,
-		dataUsageCrawlDebug: intDataUpdateTracker.debug,
-		healFolderInclude:   0,
-		healObjectSelect:    0,
+		root:                  basePath,
+		getSize:               getSize,
+		oldCache:              cache,
+		newCache:              dataUsageCache{Info: cache.Info},
+		newFolders:            nil,
+		existingFolders:       nil,
+		dataUsageScannerDebug: intDataUpdateTracker.debug,
+		healFolderInclude:     0,
+		healObjectSelect:      0,
 	}
 	// Add disks for set healing.
@@ -227,21 +227,21 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 			s.withFilter = nil
 		}
 	}
-	if s.dataUsageCrawlDebug {
-		console.Debugf(logPrefix+"Start crawling. Bloom filter: %v %s\n", s.withFilter != nil, logSuffix)
+	if s.dataUsageScannerDebug {
+		console.Debugf(logPrefix+"Start scanning. Bloom filter: %v %s\n", s.withFilter != nil, logSuffix)
 	}
 	done := ctx.Done()
 	var flattenLevels = 2
-	if s.dataUsageCrawlDebug {
+	if s.dataUsageScannerDebug {
 		console.Debugf(logPrefix+"Cycle: %v, Entries: %v %s\n", cache.Info.NextCycle, len(cache.Cache), logSuffix)
 	}
 	// Always scan flattenLevels deep. Cache root is level 0.
 	todo := []cachedFolder{{name: cache.Info.Name, objectHealProbDiv: 1}}
 	for i := 0; i < flattenLevels; i++ {
-		if s.dataUsageCrawlDebug {
+		if s.dataUsageScannerDebug {
 			console.Debugf(logPrefix+"Level %v, scanning %v directories. %s\n", i, len(todo), logSuffix)
 		}
 		select {
@@ -257,7 +257,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		}
 	}
-	if s.dataUsageCrawlDebug {
+	if s.dataUsageScannerDebug {
 		console.Debugf(logPrefix+"New folders: %v %s\n", s.newFolders, logSuffix)
 	}
@@ -286,7 +286,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		}
 	}
-	if s.dataUsageCrawlDebug {
+	if s.dataUsageScannerDebug {
 		console.Debugf(logPrefix+"Existing folders: %v %s\n", len(s.existingFolders), logSuffix)
 	}
@@ -313,13 +313,13 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		// If folder isn't in filter, skip it completely.
 		if !s.withFilter.containsDir(folder.name) {
 			if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
-				if s.dataUsageCrawlDebug {
+				if s.dataUsageScannerDebug {
 					console.Debugf(logPrefix+"Skipping non-updated folder: %v %s\n", folder, logSuffix)
 				}
 				s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
 				continue
 			} else {
-				if s.dataUsageCrawlDebug {
+				if s.dataUsageScannerDebug {
 					console.Debugf(logPrefix+"Adding non-updated folder to heal check: %v %s\n", folder.name, logSuffix)
 				}
 				// Update probability of including objects
@@ -341,8 +341,8 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
 		}
 		s.newCache.replaceHashed(h, folder.parent, *du)
 	}
-	if s.dataUsageCrawlDebug {
-		console.Debugf(logPrefix+"Finished crawl, %v entries %s\n", len(s.newCache.Cache), logSuffix)
+	if s.dataUsageScannerDebug {
+		console.Debugf(logPrefix+"Finished scanner, %v entries %s\n", len(s.newCache.Cache), logSuffix)
 	}
 	s.newCache.Info.LastUpdate = UTCNow()
 	s.newCache.Info.NextCycle++
@@ -371,7 +371,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 		_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
 		var activeLifeCycle *lifecycle.Lifecycle
 		if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
-			if f.dataUsageCrawlDebug {
+			if f.dataUsageScannerDebug {
 				console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix)
 			}
 			activeLifeCycle = f.oldCache.Info.lifeCycle
@@ -382,12 +382,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
 				if !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
 					f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
-					if f.dataUsageCrawlDebug {
+					if f.dataUsageScannerDebug {
 						console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
 					}
 					continue
 				} else {
-					if f.dataUsageCrawlDebug {
+					if f.dataUsageScannerDebug {
 						console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
 					}
 					// If probability was already scannerHealFolderInclude, keep it.
@@ -395,7 +395,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				}
 			}
 		}
-		scannerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
+		scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
 		cache := dataUsageEntry{}
@@ -404,14 +404,14 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			entName = path.Clean(path.Join(folder.name, entName))
 			bucket, prefix := path2BucketObjectWithBasePath(f.root, entName)
 			if bucket == "" {
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(scannerLogPrefix+" no bucket (%s,%s)\n", f.root, entName)
 				}
 				return errDoneForNow
 			}
 			if isReservedOrInvalidBucket(bucket, false) {
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(scannerLogPrefix+" invalid bucket: %v, entry: %v\n", bucket, entName)
 				}
 				return errDoneForNow
@@ -450,13 +450,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			wait := scannerSleeper.Timer(ctx)
 			// Get file size, ignore errors.
-			item := crawlItem{
+			item := scannerItem{
 				Path:       path.Join(f.root, entName),
 				Typ:        typ,
 				bucket:     bucket,
 				prefix:     path.Dir(prefix),
 				objectName: path.Base(entName),
-				debug:      f.dataUsageCrawlDebug,
+				debug:      f.dataUsageScannerDebug,
 				lifeCycle:  activeLifeCycle,
 				heal:       thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
 			}
@@ -532,7 +532,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 	healObjectsPrefix := color.Green("healObjects:")
 	for k := range existing {
 		bucket, prefix := path2BucketObject(k)
-		if f.dataUsageCrawlDebug {
+		if f.dataUsageScannerDebug {
 			console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
 		}
@@ -552,13 +552,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			minDisks: len(f.disks), // We want full consistency.
 			// Weird, maybe transient error.
 			agreed: func(entry metaCacheEntry) {
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(healObjectsPrefix+" got agreement: %v\n", entry.name)
 				}
 			},
 			// Some disks have data for this.
 			partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(healObjectsPrefix+" got partial, %d agreed, errs: %v\n", nAgreed, errs)
 				}
@@ -580,7 +580,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 					entry, _ = entries.firstFound()
 				}
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(healObjectsPrefix+" resolved to: %v, dir: %v\n", entry.name, entry.isDir())
 				}
@@ -618,20 +618,20 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			},
 			// Too many disks failed.
 			finished: func(errs []error) {
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(healObjectsPrefix+" too many errors: %v\n", errs)
 				}
 				cancel()
 			},
 		})
-		if f.dataUsageCrawlDebug && err != nil && err != errFileNotFound {
+		if f.dataUsageScannerDebug && err != nil && err != errFileNotFound {
 			console.Debugf(healObjectsPrefix+" checking returned value %v (%T)\n", err, err)
 		}
 		// If we found one or more disks with this folder, delete it.
 		if err == nil && dangling {
-			if f.dataUsageCrawlDebug {
+			if f.dataUsageScannerDebug {
 				console.Debugf(healObjectsPrefix+" deleting dangling directory %s\n", prefix)
 			}
@@ -690,7 +690,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder,
 			dirStack = append(dirStack, entName)
 			err := readDirFn(path.Join(dirStack...), addDir)
 			dirStack = dirStack[:len(dirStack)-1]
-			scannerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
+			scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
 			return err
 		}
@@ -705,19 +705,19 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder,
 			bucket, prefix := path2BucketObjectWithBasePath(f.root, fileName)
 			var activeLifeCycle *lifecycle.Lifecycle
 			if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix, false) {
-				if f.dataUsageCrawlDebug {
+				if f.dataUsageScannerDebug {
 					console.Debugf(deepScannerLogPrefix+" Prefix %q has active rules\n", prefix)
 				}
 				activeLifeCycle = f.oldCache.Info.lifeCycle
 			}
-			item := crawlItem{
+			item := scannerItem{
 				Path:       fileName,
 				Typ:        typ,
 				bucket:     bucket,
 				prefix:     path.Dir(prefix),
 				objectName: path.Base(entName),
-				debug:      f.dataUsageCrawlDebug,
+				debug:      f.dataUsageScannerDebug,
 				lifeCycle:  activeLifeCycle,
 				heal:       hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
 			}
@@ -752,8 +752,8 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder,
 	return &cache, nil
 }
-// crawlItem represents each file while walking.
-type crawlItem struct {
+// scannerItem represents each file while walking.
+type scannerItem struct {
 	Path string
 	Typ  os.FileMode
@@ -773,10 +773,10 @@ type sizeSummary struct {
 	replicaSize int64
 }
-type getSizeFn func(item crawlItem) (sizeSummary, error)
+type getSizeFn func(item scannerItem) (sizeSummary, error)
 // transformMetaDir will transform a directory to prefix/file.ext
-func (i *crawlItem) transformMetaDir() {
+func (i *scannerItem) transformMetaDir() {
 	split := strings.Split(i.prefix, SlashSeparator)
 	if len(split) > 1 {
 		i.prefix = path.Join(split[:len(split)-1]...)
@@ -799,7 +799,7 @@ var applyActionsLogPrefix = color.Green("applyActions:")
 // The resulting size on disk will always be returned.
 // The metadata will be compared to consensus on the object layer before any changes are applied.
 // If no metadata is supplied, -1 is returned if no action is taken.
-func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
+func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
 	size, err := meta.oi.GetActualSize()
 	if i.debug {
 		logger.LogIf(ctx, err)
@@ -1052,12 +1052,12 @@ func applyLifecycleAction(ctx context.Context, action lifecycle.Action, objLayer
 }
 // objectPath returns the prefix and object name.
-func (i *crawlItem) objectPath() string {
+func (i *scannerItem) objectPath() string {
 	return path.Join(i.prefix, i.objectName)
 }
 // healReplication will heal a scanned item that has failed replication.
-func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
+func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
 	if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
 		// heal delete marker replication failure or versioned delete replication failure
 		if oi.ReplicationStatus == replication.Pending ||
@@ -1082,7 +1082,7 @@ func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, oi Objec
 }
 // healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
-func (i *crawlItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
+func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
 	// handle soft delete and permanent delete failures here.
 	if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
 		versionID := ""