mirror of
https://github.com/minio/minio.git
synced 2025-11-06 20:33:07 -05:00
fix: rename crawler as scanner in config (#11549)
This commit is contained in:
@@ -50,10 +50,14 @@ import (
|
||||
var serverDebugLog = env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
|
||||
logger.Init(GOPATH, GOROOT)
|
||||
logger.RegisterError(config.FmtError)
|
||||
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
// Inject into config package.
|
||||
config.Logger.Info = logger.Info
|
||||
config.Logger.LogIf = logger.LogIf
|
||||
|
||||
globalDNSCache = xhttp.NewDNSCache(10*time.Second, 10*time.Second, logger.LogOnceIf)
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
"github.com/minio/minio/cmd/config/api"
|
||||
"github.com/minio/minio/cmd/config/cache"
|
||||
"github.com/minio/minio/cmd/config/compress"
|
||||
"github.com/minio/minio/cmd/config/crawler"
|
||||
"github.com/minio/minio/cmd/config/dns"
|
||||
"github.com/minio/minio/cmd/config/etcd"
|
||||
"github.com/minio/minio/cmd/config/heal"
|
||||
@@ -35,6 +34,7 @@ import (
|
||||
"github.com/minio/minio/cmd/config/identity/openid"
|
||||
"github.com/minio/minio/cmd/config/notify"
|
||||
"github.com/minio/minio/cmd/config/policy/opa"
|
||||
"github.com/minio/minio/cmd/config/scanner"
|
||||
"github.com/minio/minio/cmd/config/storageclass"
|
||||
"github.com/minio/minio/cmd/crypto"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
@@ -60,7 +60,7 @@ func initHelp() {
|
||||
config.LoggerWebhookSubSys: logger.DefaultKVS,
|
||||
config.AuditWebhookSubSys: logger.DefaultAuditKVS,
|
||||
config.HealSubSys: heal.DefaultKVS,
|
||||
config.CrawlerSubSys: crawler.DefaultKVS,
|
||||
config.ScannerSubSys: scanner.DefaultKVS,
|
||||
}
|
||||
for k, v := range notify.DefaultNotificationKVS {
|
||||
kvs[k] = v
|
||||
@@ -117,8 +117,8 @@ func initHelp() {
|
||||
Description: "manage object healing frequency and bitrot verification checks",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: config.CrawlerSubSys,
|
||||
Description: "manage crawling for usage calculation, lifecycle, healing and more",
|
||||
Key: config.ScannerSubSys,
|
||||
Description: "manage scanner for usage calculation, lifecycle, healing and more",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: config.LoggerWebhookSubSys,
|
||||
@@ -200,7 +200,7 @@ func initHelp() {
|
||||
config.CacheSubSys: cache.Help,
|
||||
config.CompressionSubSys: compress.Help,
|
||||
config.HealSubSys: heal.Help,
|
||||
config.CrawlerSubSys: crawler.Help,
|
||||
config.ScannerSubSys: scanner.Help,
|
||||
config.IdentityOpenIDSubSys: openid.Help,
|
||||
config.IdentityLDAPSubSys: xldap.Help,
|
||||
config.PolicyOPASubSys: opa.Help,
|
||||
@@ -274,11 +274,11 @@ func validateConfig(s config.Config, setDriveCounts []int) error {
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil {
|
||||
if _, err = heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default]); err != nil {
|
||||
if _, err = scanner.LookupConfig(s[config.ScannerSubSys][config.Default]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -606,10 +606,10 @@ func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config
|
||||
return fmt.Errorf("Unable to apply heal config: %w", err)
|
||||
}
|
||||
|
||||
// Crawler
|
||||
crawlerCfg, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default])
|
||||
// Scanner
|
||||
scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default])
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to apply crawler config: %w", err)
|
||||
return fmt.Errorf("Unable to apply scanner config: %w", err)
|
||||
}
|
||||
|
||||
// Apply configurations.
|
||||
@@ -624,7 +624,7 @@ func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config
|
||||
globalHealConfig = healCfg
|
||||
globalHealConfigMu.Unlock()
|
||||
|
||||
logger.LogIf(ctx, crawlerSleeper.Update(crawlerCfg.Delay, crawlerCfg.MaxWait))
|
||||
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
||||
|
||||
// Update all dynamic config values in memory.
|
||||
globalServerConfigMu.Lock()
|
||||
|
||||
@@ -77,6 +77,7 @@ const (
|
||||
LoggerWebhookSubSys = "logger_webhook"
|
||||
AuditWebhookSubSys = "audit_webhook"
|
||||
HealSubSys = "heal"
|
||||
ScannerSubSys = "scanner"
|
||||
CrawlerSubSys = "crawler"
|
||||
|
||||
// Add new constants here if you add new fields to config.
|
||||
@@ -114,7 +115,7 @@ var SubSystems = set.CreateStringSet(
|
||||
PolicyOPASubSys,
|
||||
IdentityLDAPSubSys,
|
||||
IdentityOpenIDSubSys,
|
||||
CrawlerSubSys,
|
||||
ScannerSubSys,
|
||||
HealSubSys,
|
||||
NotifyAMQPSubSys,
|
||||
NotifyESSubSys,
|
||||
@@ -132,7 +133,7 @@ var SubSystems = set.CreateStringSet(
|
||||
var SubSystemsDynamic = set.CreateStringSet(
|
||||
APISubSys,
|
||||
CompressionSubSys,
|
||||
CrawlerSubSys,
|
||||
ScannerSubSys,
|
||||
HealSubSys,
|
||||
)
|
||||
|
||||
@@ -151,7 +152,7 @@ var SubSystemsSingleTargets = set.CreateStringSet([]string{
|
||||
IdentityLDAPSubSys,
|
||||
IdentityOpenIDSubSys,
|
||||
HealSubSys,
|
||||
CrawlerSubSys,
|
||||
ScannerSubSys,
|
||||
}...)
|
||||
|
||||
// Constant separators
|
||||
@@ -462,6 +463,13 @@ func LookupWorm() (bool, error) {
|
||||
return ParseBool(env.Get(EnvWorm, EnableOff))
|
||||
}
|
||||
|
||||
// Carries all the renamed sub-systems from their
|
||||
// previously known names
|
||||
var renamedSubsys = map[string]string{
|
||||
CrawlerSubSys: ScannerSubSys,
|
||||
// Add future sub-system renames
|
||||
}
|
||||
|
||||
// Merge - merges a new config with all the
|
||||
// missing values for default configs,
|
||||
// returns a config.
|
||||
@@ -477,9 +485,21 @@ func (c Config) Merge() Config {
|
||||
}
|
||||
}
|
||||
if _, ok := cp[subSys]; !ok {
|
||||
// A config subsystem was removed or server was downgraded.
|
||||
Logger.Info("config: ignoring unknown subsystem config %q\n", subSys)
|
||||
continue
|
||||
rnSubSys, ok := renamedSubsys[subSys]
|
||||
if !ok {
|
||||
// A config subsystem was removed or server was downgraded.
|
||||
Logger.Info("config: ignoring unknown subsystem config %q\n", subSys)
|
||||
continue
|
||||
}
|
||||
// Copy over settings from previous sub-system
|
||||
// to newly renamed sub-system
|
||||
for _, kv := range cp[rnSubSys][Default] {
|
||||
_, ok := c[subSys][tgt].Lookup(kv.Key)
|
||||
if !ok {
|
||||
ckvs.Set(kv.Key, kv.Value)
|
||||
}
|
||||
}
|
||||
subSys = rnSubSys
|
||||
}
|
||||
cp[subSys][tgt] = ckvs
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package crawler
|
||||
package scanner
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
@@ -29,8 +29,10 @@ const (
|
||||
Delay = "delay"
|
||||
MaxWait = "max_wait"
|
||||
|
||||
EnvDelay = "MINIO_CRAWLER_DELAY"
|
||||
EnvMaxWait = "MINIO_CRAWLER_MAX_WAIT"
|
||||
EnvDelay = "MINIO_SCANNER_DELAY"
|
||||
EnvDelayLegacy = "MINIO_CRAWLER_DELAY"
|
||||
EnvMaxWait = "MINIO_SCANNER_MAX_WAIT"
|
||||
EnvMaxWaitLegacy = "MINIO_CRAWLER_MAX_WAIT"
|
||||
)
|
||||
|
||||
// Config represents the heal settings.
|
||||
@@ -58,7 +60,7 @@ var (
|
||||
Help = config.HelpKVS{
|
||||
config.HelpKV{
|
||||
Key: Delay,
|
||||
Description: `crawler delay multiplier, defaults to '10.0'`,
|
||||
Description: `scanner delay multiplier, defaults to '10.0'`,
|
||||
Optional: true,
|
||||
Type: "float",
|
||||
},
|
||||
@@ -73,14 +75,22 @@ var (
|
||||
|
||||
// LookupConfig - lookup config and override with valid environment settings if any.
|
||||
func LookupConfig(kvs config.KVS) (cfg Config, err error) {
|
||||
if err = config.CheckValidKeys(config.CrawlerSubSys, kvs, DefaultKVS); err != nil {
|
||||
if err = config.CheckValidKeys(config.ScannerSubSys, kvs, DefaultKVS); err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
cfg.Delay, err = strconv.ParseFloat(env.Get(EnvDelay, kvs.Get(Delay)), 64)
|
||||
delay := env.Get(EnvDelayLegacy, "")
|
||||
if delay == "" {
|
||||
delay = env.Get(EnvDelay, kvs.Get(Delay))
|
||||
}
|
||||
cfg.Delay, err = strconv.ParseFloat(delay, 64)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
cfg.MaxWait, err = time.ParseDuration(env.Get(EnvMaxWait, kvs.Get(MaxWait)))
|
||||
maxWait := env.Get(EnvMaxWaitLegacy, "")
|
||||
if maxWait == "" {
|
||||
maxWait = env.Get(EnvMaxWait, kvs.Get(MaxWait))
|
||||
}
|
||||
cfg.MaxWait, err = time.ParseDuration(maxWait)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
@@ -56,25 +56,25 @@ var (
|
||||
globalHealConfig heal.Config
|
||||
globalHealConfigMu sync.Mutex
|
||||
|
||||
dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
||||
dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
||||
// Sleeper values are updated when config is loaded.
|
||||
crawlerSleeper = newDynamicSleeper(10, 10*time.Second)
|
||||
scannerSleeper = newDynamicSleeper(10, 10*time.Second)
|
||||
)
|
||||
|
||||
// initDataCrawler will start the crawler in the background.
|
||||
func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
go runDataCrawler(ctx, objAPI)
|
||||
// initDataScanner will start the scanner in the background.
|
||||
func initDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
||||
go runDataScanner(ctx, objAPI)
|
||||
}
|
||||
|
||||
// runDataCrawler will start a data crawler.
|
||||
// runDataScanner will start a data scanner.
|
||||
// The function will block until the context is canceled.
|
||||
// There should only ever be one crawler running per cluster.
|
||||
func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Make sure only 1 crawler is running on the cluster.
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, "runDataCrawler.lock")
|
||||
// There should only ever be one scanner running per cluster.
|
||||
func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Make sure only 1 scanner is running on the cluster.
|
||||
locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
for {
|
||||
err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout)
|
||||
err := locker.GetLock(ctx, dataScannerLeaderLockTimeout)
|
||||
if err != nil {
|
||||
time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
|
||||
continue
|
||||
@@ -112,7 +112,7 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
crawlTimer.Reset(dataCrawlStartDelay)
|
||||
|
||||
if intDataUpdateTracker.debug {
|
||||
console.Debugln("starting crawler cycle")
|
||||
console.Debugln("starting scanner cycle")
|
||||
}
|
||||
|
||||
// Wait before starting next cycle and wait on startup.
|
||||
@@ -167,7 +167,7 @@ type folderScanner struct {
|
||||
|
||||
// crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache.
|
||||
// The returned cache will always be valid, but may not be updated from the existing.
|
||||
// Before each operation sleepDuration is called which can be used to temporarily halt the crawler.
|
||||
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
|
||||
// If the supplied context is canceled the function will return at the first chance.
|
||||
func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) {
|
||||
t := UTCNow()
|
||||
@@ -390,12 +390,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
if f.dataUsageCrawlDebug {
|
||||
console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
|
||||
}
|
||||
// If probability was already crawlerHealFolderInclude, keep it.
|
||||
// If probability was already scannerHealFolderInclude, keep it.
|
||||
folder.objectHealProbDiv = f.healFolderInclude
|
||||
}
|
||||
}
|
||||
}
|
||||
crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
|
||||
scannerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
|
||||
|
||||
cache := dataUsageEntry{}
|
||||
|
||||
@@ -447,7 +447,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
}
|
||||
|
||||
// Dynamic time delay.
|
||||
wait := crawlerSleeper.Timer(ctx)
|
||||
wait := scannerSleeper.Timer(ctx)
|
||||
|
||||
// Get file size, ignore errors.
|
||||
item := crawlItem{
|
||||
@@ -537,7 +537,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
}
|
||||
|
||||
// Dynamic time delay.
|
||||
wait := crawlerSleeper.Timer(ctx)
|
||||
wait := scannerSleeper.Timer(ctx)
|
||||
resolver.bucket = bucket
|
||||
|
||||
foundObjs := false
|
||||
@@ -567,7 +567,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
|
||||
// Sleep and reset.
|
||||
wait()
|
||||
wait = crawlerSleeper.Timer(ctx)
|
||||
wait = scannerSleeper.Timer(ctx)
|
||||
entry, ok := entries.resolve(&resolver)
|
||||
if !ok {
|
||||
for _, err := range errs {
|
||||
@@ -604,7 +604,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
for _, ver := range fiv.Versions {
|
||||
// Sleep and reset.
|
||||
wait()
|
||||
wait = crawlerSleeper.Timer(ctx)
|
||||
wait = scannerSleeper.Timer(ctx)
|
||||
err := bgSeq.queueHealTask(healSource{
|
||||
bucket: bucket,
|
||||
object: fiv.Name,
|
||||
@@ -640,9 +640,9 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
Remove: true,
|
||||
},
|
||||
func(bucket, object, versionID string) error {
|
||||
// Wait for each heal as per crawler frequency.
|
||||
// Wait for each heal as per scanner frequency.
|
||||
wait()
|
||||
wait = crawlerSleeper.Timer(ctx)
|
||||
wait = scannerSleeper.Timer(ctx)
|
||||
return bgSeq.queueHealTask(healSource{
|
||||
bucket: bucket,
|
||||
object: object,
|
||||
@@ -690,12 +690,12 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder,
|
||||
dirStack = append(dirStack, entName)
|
||||
err := readDirFn(path.Join(dirStack...), addDir)
|
||||
dirStack = dirStack[:len(dirStack)-1]
|
||||
crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
|
||||
scannerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
|
||||
return err
|
||||
}
|
||||
|
||||
// Dynamic time delay.
|
||||
wait := crawlerSleeper.Timer(ctx)
|
||||
wait := scannerSleeper.Timer(ctx)
|
||||
|
||||
// Get file size, ignore errors.
|
||||
dirStack = append(dirStack, entName)
|
||||
@@ -91,7 +91,7 @@ type dataUsageCacheInfo struct {
|
||||
Name string
|
||||
LastUpdate time.Time
|
||||
NextCycle uint32
|
||||
// indicates if the disk is being healed and crawler
|
||||
// indicates if the disk is being healed and scanner
|
||||
// should skip healing the disk
|
||||
SkipHealing bool
|
||||
BloomFilter []byte `msg:"BloomFilter,omitempty"`
|
||||
|
||||
@@ -401,7 +401,7 @@ func (z *erasureServerPools) CrawlAndGetDataUsage(ctx context.Context, bf *bloom
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
// Start crawler. Blocks until done.
|
||||
// Start scanner. Blocks until done.
|
||||
err := erObj.crawlAndGetDataUsage(ctx, allBuckets, bf, updates)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
|
||||
@@ -260,7 +260,7 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea
|
||||
|
||||
for i, info := range infos {
|
||||
// Check if one of the drives in the set is being healed.
|
||||
// this information is used by crawler to skip healing
|
||||
// this information is used by scanner to skip healing
|
||||
// this erasure set while it calculates the usage.
|
||||
if info.Healing || info.Error != "" {
|
||||
healing = true
|
||||
@@ -378,7 +378,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
}
|
||||
}()
|
||||
|
||||
// Start one crawler per disk
|
||||
// Start one scanner per disk
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(disks))
|
||||
for i := range disks {
|
||||
|
||||
@@ -56,12 +56,6 @@ const (
|
||||
EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
|
||||
)
|
||||
|
||||
// Inject into config package.
|
||||
func init() {
|
||||
config.Logger.Info = Info
|
||||
config.Logger.LogIf = LogIf
|
||||
}
|
||||
|
||||
// Default KVS for loggerHTTP and loggerAuditHTTP
|
||||
var (
|
||||
DefaultKVS = config.KVS{
|
||||
|
||||
@@ -145,7 +145,7 @@ func (m *metacache) worthKeeping(currentCycle uint64) bool {
|
||||
// Cycle is somehow bigger.
|
||||
return false
|
||||
case cache.finished() && time.Since(cache.lastHandout) > 48*time.Hour:
|
||||
// Keep only for 2 days. Fallback if crawler is clogged.
|
||||
// Keep only for 2 days. Fallback if scanner is clogged.
|
||||
return false
|
||||
case cache.finished() && currentCycle >= dataUsageUpdateDirCycles && cache.startedCycle < currentCycle-dataUsageUpdateDirCycles:
|
||||
// Cycle is too old to be valuable.
|
||||
|
||||
@@ -419,7 +419,7 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
|
||||
}
|
||||
|
||||
// Populates prometheus with bucket usage metrics, this metrics
|
||||
// is only enabled if crawler is enabled.
|
||||
// is only enabled if scanner is enabled.
|
||||
func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
|
||||
objLayer := newObjectLayerFn()
|
||||
// Service not initialized yet
|
||||
|
||||
@@ -509,7 +509,7 @@ func serverMain(ctx *cli.Context) {
|
||||
initBackgroundExpiry(GlobalContext, newObject)
|
||||
}
|
||||
|
||||
initDataCrawler(GlobalContext, newObject)
|
||||
initDataScanner(GlobalContext, newObject)
|
||||
|
||||
if err = initServer(GlobalContext, newObject); err != nil {
|
||||
var cerr config.Err
|
||||
|
||||
Reference in New Issue
Block a user