allow protection from invalid config values (#19460)

we have had numerous reports on some config
values not having default values, causing
features misbehaving and not having default
values set properly.

This PR tries to address all these concerns
once and for all.

Each new sub-system that gets added

- must check for invalid keys
- must have default values set
- must not "return err" when being saved into
  a global state() instead collate as part of
  other subsystem errors allow other sub-systems
  to independently initialize.
This commit is contained in:
Harshavardhana 2024-04-10 18:10:30 -07:00 committed by GitHub
parent 9b926f7dbe
commit 0c31e61343
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 118 additions and 72 deletions

View File

@ -566,6 +566,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
return errServerNotInitialized return errServerNotInitialized
} }
var errs []error
setDriveCounts := objAPI.SetDriveCounts() setDriveCounts := objAPI.SetDriveCounts()
switch subSys { switch subSys {
case config.APISubSys: case config.APISubSys:
@ -588,26 +589,29 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
case config.HealSubSys: case config.HealSubSys:
healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default]) healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default])
if err != nil { if err != nil {
return fmt.Errorf("Unable to apply heal config: %w", err) errs = append(errs, fmt.Errorf("Unable to apply heal config: %w", err))
} } else {
globalHealConfig.Update(healCfg) globalHealConfig.Update(healCfg)
}
case config.BatchSubSys: case config.BatchSubSys:
batchCfg, err := batch.LookupConfig(s[config.BatchSubSys][config.Default]) batchCfg, err := batch.LookupConfig(s[config.BatchSubSys][config.Default])
if err != nil { if err != nil {
return fmt.Errorf("Unable to apply batch config: %w", err) errs = append(errs, fmt.Errorf("Unable to apply batch config: %w", err))
} } else {
globalBatchConfig.Update(batchCfg) globalBatchConfig.Update(batchCfg)
}
case config.ScannerSubSys: case config.ScannerSubSys:
scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default]) scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default])
if err != nil { if err != nil {
return fmt.Errorf("Unable to apply scanner config: %w", err) errs = append(errs, fmt.Errorf("Unable to apply scanner config: %w", err))
} } else {
// update dynamic scanner values. // update dynamic scanner values.
scannerIdleMode.Store(scannerCfg.IdleMode) scannerIdleMode.Store(scannerCfg.IdleMode)
scannerCycle.Store(scannerCfg.Cycle) scannerCycle.Store(scannerCfg.Cycle)
scannerExcessObjectVersions.Store(scannerCfg.ExcessVersions) scannerExcessObjectVersions.Store(scannerCfg.ExcessVersions)
scannerExcessFolders.Store(scannerCfg.ExcessFolders) scannerExcessFolders.Store(scannerCfg.ExcessFolders)
configLogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait)) configLogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
}
case config.LoggerWebhookSubSys: case config.LoggerWebhookSubSys:
loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.LoggerWebhookSubSys) loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.LoggerWebhookSubSys)
if err != nil { if err != nil {
@ -693,11 +697,11 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
} }
} }
case config.DriveSubSys: case config.DriveSubSys:
if driveConfig, err := drive.LookupConfig(s[config.DriveSubSys][config.Default]); err != nil { driveConfig, err := drive.LookupConfig(s[config.DriveSubSys][config.Default])
if err != nil {
configLogIf(ctx, fmt.Errorf("Unable to load drive config: %w", err)) configLogIf(ctx, fmt.Errorf("Unable to load drive config: %w", err))
} else { } else {
err := globalDriveConfig.Update(driveConfig) if err = globalDriveConfig.Update(driveConfig); err != nil {
if err != nil {
configLogIf(ctx, fmt.Errorf("Unable to update drive config: %v", err)) configLogIf(ctx, fmt.Errorf("Unable to update drive config: %v", err))
} }
} }
@ -711,14 +715,15 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
case config.BrowserSubSys: case config.BrowserSubSys:
browserCfg, err := browser.LookupConfig(s[config.BrowserSubSys][config.Default]) browserCfg, err := browser.LookupConfig(s[config.BrowserSubSys][config.Default])
if err != nil { if err != nil {
return fmt.Errorf("Unable to apply browser config: %w", err) errs = append(errs, fmt.Errorf("Unable to apply browser config: %w", err))
} } else {
globalBrowserConfig.Update(browserCfg) globalBrowserConfig.Update(browserCfg)
}
case config.ILMSubSys: case config.ILMSubSys:
ilmCfg, err := ilm.LookupConfig(s[config.ILMSubSys][config.Default]) ilmCfg, err := ilm.LookupConfig(s[config.ILMSubSys][config.Default])
if err != nil { if err != nil {
return fmt.Errorf("Unable to apply ilm config: %w", err) errs = append(errs, fmt.Errorf("Unable to apply ilm config: %w", err))
} } else {
if globalTransitionState != nil { if globalTransitionState != nil {
globalTransitionState.UpdateWorkers(ilmCfg.TransitionWorkers) globalTransitionState.UpdateWorkers(ilmCfg.TransitionWorkers)
} }
@ -727,11 +732,15 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
} }
globalILMConfig.update(ilmCfg) globalILMConfig.update(ilmCfg)
} }
}
globalServerConfigMu.Lock() globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock() defer globalServerConfigMu.Unlock()
if globalServerConfig != nil { if globalServerConfig != nil {
globalServerConfig[subSys] = s[subSys] globalServerConfig[subSys] = s[subSys]
} }
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil return nil
} }

View File

@ -626,6 +626,10 @@ func NewHTTPTransportWithClientCerts(clientCert, clientKey string) *http.Transpo
if err != nil { if err != nil {
internalLogIf(ctx, fmt.Errorf("Unable to load client key and cert, please check your client certificate configuration: %w", err)) internalLogIf(ctx, fmt.Errorf("Unable to load client key and cert, please check your client certificate configuration: %w", err))
} }
if transport == nil {
// Client certs are not readable return default transport.
return s.NewHTTPTransportWithTimeout(1 * time.Minute)
}
return transport return transport
} }

View File

@ -97,15 +97,30 @@ func (browseCfg *Config) Update(newCfg Config) {
// LookupConfig - lookup api config and override with valid environment settings if any. // LookupConfig - lookup api config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) { func LookupConfig(kvs config.KVS) (cfg Config, err error) {
cspPolicy := env.Get(EnvBrowserCSPPolicy, kvs.GetWithDefault(browserCSPPolicy, DefaultKVS)) cfg = Config{
hstsSeconds, err := strconv.Atoi(env.Get(EnvBrowserHSTSSeconds, kvs.GetWithDefault(browserHSTSSeconds, DefaultKVS))) CSPPolicy: env.Get(EnvBrowserCSPPolicy, kvs.GetWithDefault(browserCSPPolicy, DefaultKVS)),
if err != nil { HSTSSeconds: 0,
HSTSIncludeSubdomains: true,
HSTSPreload: true,
ReferrerPolicy: "strict-origin-when-cross-origin",
}
if err = config.CheckValidKeys(config.BrowserSubSys, kvs, DefaultKVS); err != nil {
return cfg, err return cfg, err
} }
hstsIncludeSubdomains := env.Get(EnvBrowserHSTSIncludeSubdomains, kvs.GetWithDefault(browserHSTSIncludeSubdomains, DefaultKVS)) == config.EnableOn hstsIncludeSubdomains := env.Get(EnvBrowserHSTSIncludeSubdomains, kvs.GetWithDefault(browserHSTSIncludeSubdomains, DefaultKVS)) == config.EnableOn
hstsPreload := env.Get(EnvBrowserHSTSPreload, kvs.Get(browserHSTSPreload)) == config.EnableOn hstsPreload := env.Get(EnvBrowserHSTSPreload, kvs.Get(browserHSTSPreload)) == config.EnableOn
hstsSeconds, err := strconv.Atoi(env.Get(EnvBrowserHSTSSeconds, kvs.GetWithDefault(browserHSTSSeconds, DefaultKVS)))
if err != nil {
return cfg, err
}
cfg.HSTSSeconds = hstsSeconds
cfg.HSTSIncludeSubdomains = hstsIncludeSubdomains
cfg.HSTSPreload = hstsPreload
referrerPolicy := env.Get(EnvBrowserReferrerPolicy, kvs.GetWithDefault(browserReferrerPolicy, DefaultKVS)) referrerPolicy := env.Get(EnvBrowserReferrerPolicy, kvs.GetWithDefault(browserReferrerPolicy, DefaultKVS))
switch referrerPolicy { switch referrerPolicy {
case "no-referrer", "no-referrer-when-downgrade", "origin", "origin-when-cross-origin", "same-origin", "strict-origin", "strict-origin-when-cross-origin", "unsafe-url": case "no-referrer", "no-referrer-when-downgrade", "origin", "origin-when-cross-origin", "same-origin", "strict-origin", "strict-origin-when-cross-origin", "unsafe-url":
@ -114,11 +129,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
return cfg, fmt.Errorf("invalid value %v for %s", referrerPolicy, browserReferrerPolicy) return cfg, fmt.Errorf("invalid value %v for %s", referrerPolicy, browserReferrerPolicy)
} }
cfg.CSPPolicy = cspPolicy
cfg.HSTSSeconds = hstsSeconds
cfg.HSTSIncludeSubdomains = hstsIncludeSubdomains
cfg.HSTSPreload = hstsPreload
return cfg, nil return cfg, nil
} }

View File

@ -33,30 +33,35 @@ var DefaultKVS = config.KVS{
}, },
} }
var configLk sync.RWMutex
// Config represents the subnet related configuration // Config represents the subnet related configuration
type Config struct { type Config struct {
// MaxTimeout - maximum timeout for a drive operation // MaxTimeout - maximum timeout for a drive operation
MaxTimeout time.Duration `json:"maxTimeout"` MaxTimeout time.Duration `json:"maxTimeout"`
mutex sync.RWMutex
} }
// Update - updates the config with latest values // Update - updates the config with latest values
func (c *Config) Update(new *Config) error { func (c *Config) Update(new Config) error {
c.mutex.Lock() configLk.Lock()
defer c.mutex.Unlock() defer configLk.Unlock()
c.MaxTimeout = getMaxTimeout(new.MaxTimeout) c.MaxTimeout = getMaxTimeout(new.MaxTimeout)
return nil return nil
} }
// GetMaxTimeout - returns the max timeout value. // GetMaxTimeout - returns the max timeout value.
func (c *Config) GetMaxTimeout() time.Duration { func (c *Config) GetMaxTimeout() time.Duration {
c.mutex.RLock() configLk.RLock()
defer c.mutex.RUnlock() defer configLk.RUnlock()
return getMaxTimeout(c.MaxTimeout) return getMaxTimeout(c.MaxTimeout)
} }
// LookupConfig - lookup config and override with valid environment settings if any. // LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg *Config, err error) { func LookupConfig(kvs config.KVS) (cfg Config, err error) {
cfg = Config{
MaxTimeout: 30 * time.Second,
}
if err = config.CheckValidKeys(config.DriveSubSys, kvs, DefaultKVS); err != nil { if err = config.CheckValidKeys(config.DriveSubSys, kvs, DefaultKVS); err != nil {
return cfg, err return cfg, err
} }
@ -68,9 +73,7 @@ func LookupConfig(kvs config.KVS) (cfg *Config, err error) {
d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "") d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
} }
} }
cfg = &Config{
mutex: sync.RWMutex{},
}
dur, _ := time.ParseDuration(d) dur, _ := time.ParseDuration(d)
if dur < time.Second { if dur < time.Second {
cfg.MaxTimeout = 30 * time.Second cfg.MaxTimeout = 30 * time.Second

View File

@ -157,11 +157,14 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.HealSubSys, kvs, DefaultKVS); err != nil { if err = config.CheckValidKeys(config.HealSubSys, kvs, DefaultKVS); err != nil {
return cfg, err return cfg, err
} }
cfg.Bitrot = env.Get(EnvBitrot, kvs.GetWithDefault(Bitrot, DefaultKVS))
_, err = parseBitrotConfig(cfg.Bitrot) bitrot := env.Get(EnvBitrot, kvs.GetWithDefault(Bitrot, DefaultKVS))
if err != nil { if _, err = parseBitrotConfig(bitrot); err != nil {
return cfg, fmt.Errorf("'heal:bitrotscan' value invalid: %w", err) return cfg, fmt.Errorf("'heal:bitrotscan' value invalid: %w", err)
} }
cfg.Bitrot = bitrot
cfg.Sleep, err = time.ParseDuration(env.Get(EnvSleep, kvs.GetWithDefault(Sleep, DefaultKVS))) cfg.Sleep, err = time.ParseDuration(env.Get(EnvSleep, kvs.GetWithDefault(Sleep, DefaultKVS)))
if err != nil { if err != nil {
return cfg, fmt.Errorf("'heal:max_sleep' value invalid: %w", err) return cfg, fmt.Errorf("'heal:max_sleep' value invalid: %w", err)

View File

@ -44,6 +44,15 @@ type Config struct {
// LookupConfig - lookup ilm config and override with valid environment settings if any. // LookupConfig - lookup ilm config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) { func LookupConfig(kvs config.KVS) (cfg Config, err error) {
cfg = Config{
TransitionWorkers: 100,
ExpirationWorkers: 100,
}
if err = config.CheckValidKeys(config.ILMSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
tw, err := strconv.Atoi(env.Get(EnvILMTransitionWorkers, kvs.GetWithDefault(transitionWorkers, DefaultKVS))) tw, err := strconv.Atoi(env.Get(EnvILMTransitionWorkers, kvs.GetWithDefault(transitionWorkers, DefaultKVS)))
if err != nil { if err != nil {
return cfg, err return cfg, err

View File

@ -114,15 +114,44 @@ var DefaultKVS = config.KVS{
// LookupConfig - lookup config and override with valid environment settings if any. // LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) { func LookupConfig(kvs config.KVS) (cfg Config, err error) {
cfg = Config{
ExcessVersions: 100,
ExcessFolders: 50000,
IdleMode: 0, // Default is on
}
if err = config.CheckValidKeys(config.ScannerSubSys, kvs, DefaultKVS); err != nil { if err = config.CheckValidKeys(config.ScannerSubSys, kvs, DefaultKVS); err != nil {
return cfg, err return cfg, err
} }
excessVersions, err := strconv.ParseInt(env.Get(EnvExcessVersions, kvs.GetWithDefault(ExcessVersions, DefaultKVS)), 10, 64)
if err != nil {
return cfg, err
}
cfg.ExcessVersions = excessVersions
excessFolders, err := strconv.ParseInt(env.Get(EnvExcessFolders, kvs.GetWithDefault(ExcessFolders, DefaultKVS)), 10, 64)
if err != nil {
return cfg, err
}
cfg.ExcessFolders = excessFolders
switch idleSpeed := env.Get(EnvIdleSpeed, kvs.GetWithDefault(IdleSpeed, DefaultKVS)); idleSpeed {
case "", config.EnableOn:
cfg.IdleMode = 0
case config.EnableOff:
cfg.IdleMode = 1
default:
return cfg, fmt.Errorf("unknown value: '%s'", idleSpeed)
}
// Stick to loading deprecated config/env if they are already set, and the Speed value // Stick to loading deprecated config/env if they are already set, and the Speed value
// has not been changed from its "default" value, if it has been changed honor new settings. // has not been changed from its "default" value, if it has been changed honor new settings.
if kvs.GetWithDefault(Speed, DefaultKVS) == "default" { if kvs.GetWithDefault(Speed, DefaultKVS) == "default" {
if kvs.Get(Delay) != "" && kvs.Get(MaxWait) != "" { if kvs.Get(Delay) != "" && kvs.Get(MaxWait) != "" {
return lookupDeprecatedScannerConfig(kvs) if err = lookupDeprecatedScannerConfig(kvs, &cfg); err != nil {
return cfg, err
}
} }
} }
@ -141,38 +170,17 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
return cfg, fmt.Errorf("unknown '%s' value", speed) return cfg, fmt.Errorf("unknown '%s' value", speed)
} }
switch idleSpeed := env.Get(EnvIdleSpeed, kvs.GetWithDefault(IdleSpeed, DefaultKVS)); idleSpeed {
case "", config.EnableOn:
cfg.IdleMode = 0
case config.EnableOff:
cfg.IdleMode = 1
default:
return cfg, fmt.Errorf("unknown value: '%s'", idleSpeed)
}
excessVersions, err := strconv.ParseInt(env.Get(EnvExcessVersions, kvs.GetWithDefault(ExcessVersions, DefaultKVS)), 10, 64)
if err != nil {
return cfg, err
}
cfg.ExcessVersions = excessVersions
excessFolders, err := strconv.ParseInt(env.Get(EnvExcessFolders, kvs.GetWithDefault(ExcessFolders, DefaultKVS)), 10, 64)
if err != nil {
return cfg, err
}
cfg.ExcessFolders = excessFolders
return cfg, nil return cfg, nil
} }
func lookupDeprecatedScannerConfig(kvs config.KVS) (cfg Config, err error) { func lookupDeprecatedScannerConfig(kvs config.KVS, cfg *Config) (err error) {
delay := env.Get(EnvDelayLegacy, "") delay := env.Get(EnvDelayLegacy, "")
if delay == "" { if delay == "" {
delay = env.Get(EnvDelay, kvs.GetWithDefault(Delay, DefaultKVS)) delay = env.Get(EnvDelay, kvs.GetWithDefault(Delay, DefaultKVS))
} }
cfg.Delay, err = strconv.ParseFloat(delay, 64) cfg.Delay, err = strconv.ParseFloat(delay, 64)
if err != nil { if err != nil {
return cfg, err return err
} }
maxWait := env.Get(EnvMaxWaitLegacy, "") maxWait := env.Get(EnvMaxWaitLegacy, "")
if maxWait == "" { if maxWait == "" {
@ -180,7 +188,7 @@ func lookupDeprecatedScannerConfig(kvs config.KVS) (cfg Config, err error) {
} }
cfg.MaxWait, err = time.ParseDuration(maxWait) cfg.MaxWait, err = time.ParseDuration(maxWait)
if err != nil { if err != nil {
return cfg, err return err
} }
cycle := env.Get(EnvCycle, kvs.GetWithDefault(Cycle, DefaultKVS)) cycle := env.Get(EnvCycle, kvs.GetWithDefault(Cycle, DefaultKVS))
if cycle == "" { if cycle == "" {
@ -188,7 +196,7 @@ func lookupDeprecatedScannerConfig(kvs config.KVS) (cfg Config, err error) {
} }
cfg.Cycle, err = time.ParseDuration(cycle) cfg.Cycle, err = time.ParseDuration(cycle)
if err != nil { if err != nil {
return cfg, err return err
} }
return cfg, nil return nil
} }