mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Apply dynamic config at sub-system level (#14369)
Currently, when applying any dynamic config, the system reloads and re-applies the config of all the dynamic sub-systems. This PR refactors the code in such a way that changing config of a given dynamic sub-system will work on only that sub-system.
This commit is contained in:
parent
0cbdc458c5
commit
94d37d05e5
@ -64,6 +64,12 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
subSys, _, _, err := config.GetSubSys(string(kvBytes))
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
cfg, err := readServerConfig(ctx, objectAPI)
|
cfg, err := readServerConfig(ctx, objectAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
@ -74,7 +80,7 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = validateConfig(cfg, ""); err != nil {
|
if err = validateConfig(cfg, subSys); err != nil {
|
||||||
writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
|
writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -84,15 +90,16 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
dynamic := config.SubSystemsDynamic.Contains(string(kvBytes))
|
dynamic := config.SubSystemsDynamic.Contains(subSys)
|
||||||
if dynamic {
|
if dynamic {
|
||||||
applyDynamic(ctx, objectAPI, cfg, r, w)
|
applyDynamic(ctx, objectAPI, cfg, subSys, r, w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyDynamic(ctx context.Context, objectAPI ObjectLayer, cfg config.Config, r *http.Request, w http.ResponseWriter) {
|
func applyDynamic(ctx context.Context, objectAPI ObjectLayer, cfg config.Config, subSys string,
|
||||||
|
r *http.Request, w http.ResponseWriter) {
|
||||||
// Apply dynamic values.
|
// Apply dynamic values.
|
||||||
if err := applyDynamicConfig(GlobalContext, objectAPI, cfg); err != nil {
|
if err := applyDynamicConfigForSubSys(GlobalContext, objectAPI, cfg, subSys); err != nil {
|
||||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -162,7 +169,7 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
}
|
}
|
||||||
|
|
||||||
if dynamic {
|
if dynamic {
|
||||||
applyDynamic(ctx, objectAPI, cfg, r, w)
|
applyDynamic(ctx, objectAPI, cfg, subSys, r, w)
|
||||||
}
|
}
|
||||||
writeSuccessResponseHeadersOnly(w)
|
writeSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
@ -616,84 +616,80 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// applyDynamicConfig will apply dynamic config values.
|
func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s config.Config, subSys string) error {
|
||||||
// Dynamic systems should be in config.SubSystemsDynamic as well.
|
switch subSys {
|
||||||
func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config) error {
|
case config.APISubSys:
|
||||||
// Read all dynamic configs.
|
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
|
||||||
// API
|
if err != nil {
|
||||||
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
|
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
||||||
if err != nil {
|
}
|
||||||
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
var setDriveCounts []int
|
||||||
}
|
if objAPI != nil {
|
||||||
|
setDriveCounts = objAPI.SetDriveCounts()
|
||||||
// Compression
|
}
|
||||||
cmpCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
|
globalAPIConfig.init(apiConfig, setDriveCounts)
|
||||||
if err != nil {
|
case config.CompressionSubSys:
|
||||||
return fmt.Errorf("Unable to setup Compression: %w", err)
|
cmpCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
|
||||||
}
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to setup Compression: %w", err)
|
||||||
// Validate if the object layer supports compression.
|
}
|
||||||
if objAPI != nil {
|
// Validate if the object layer supports compression.
|
||||||
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
|
if objAPI != nil {
|
||||||
return fmt.Errorf("Backend does not support compression")
|
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
|
||||||
|
return fmt.Errorf("Backend does not support compression")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
globalCompressConfigMu.Lock()
|
||||||
|
globalCompressConfig = cmpCfg
|
||||||
|
globalCompressConfigMu.Unlock()
|
||||||
|
case config.HealSubSys:
|
||||||
|
healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to apply heal config: %w", err)
|
||||||
|
}
|
||||||
|
globalHealConfig.Update(healCfg)
|
||||||
|
case config.ScannerSubSys:
|
||||||
|
scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to apply scanner config: %w", err)
|
||||||
|
}
|
||||||
|
// update dynamic scanner values.
|
||||||
|
scannerCycle.Update(scannerCfg.Cycle)
|
||||||
|
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
||||||
|
case config.LoggerWebhookSubSys:
|
||||||
|
loggerCfg, err := logger.LookupConfig(s)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
||||||
|
}
|
||||||
|
userAgent := getUserAgent(getMinioMode())
|
||||||
|
for n, l := range loggerCfg.HTTP {
|
||||||
|
if l.Enabled {
|
||||||
|
l.LogOnce = logger.LogOnceIf
|
||||||
|
l.UserAgent = userAgent
|
||||||
|
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
|
||||||
|
loggerCfg.HTTP[n] = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = logger.UpdateTargets(loggerCfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Heal
|
|
||||||
healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default])
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Unable to apply heal config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Scanner
|
|
||||||
scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default])
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Unable to apply scanner config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logger webhook
|
|
||||||
loggerCfg, err := logger.LookupConfig(s)
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
|
||||||
}
|
|
||||||
userAgent := getUserAgent(getMinioMode())
|
|
||||||
for n, l := range loggerCfg.HTTP {
|
|
||||||
if l.Enabled {
|
|
||||||
l.LogOnce = logger.LogOnceIf
|
|
||||||
l.UserAgent = userAgent
|
|
||||||
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
|
|
||||||
loggerCfg.HTTP[n] = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = logger.UpdateTargets(loggerCfg)
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply configurations.
|
|
||||||
// We should not fail after this.
|
|
||||||
var setDriveCounts []int
|
|
||||||
if objAPI != nil {
|
|
||||||
setDriveCounts = objAPI.SetDriveCounts()
|
|
||||||
}
|
|
||||||
globalAPIConfig.init(apiConfig, setDriveCounts)
|
|
||||||
|
|
||||||
globalCompressConfigMu.Lock()
|
|
||||||
globalCompressConfig = cmpCfg
|
|
||||||
globalCompressConfigMu.Unlock()
|
|
||||||
|
|
||||||
globalHealConfig.Update(healCfg)
|
|
||||||
|
|
||||||
// update dynamic scanner values.
|
|
||||||
scannerCycle.Update(scannerCfg.Cycle)
|
|
||||||
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
|
||||||
|
|
||||||
// Update all dynamic config values in memory.
|
|
||||||
globalServerConfigMu.Lock()
|
globalServerConfigMu.Lock()
|
||||||
defer globalServerConfigMu.Unlock()
|
defer globalServerConfigMu.Unlock()
|
||||||
if globalServerConfig != nil {
|
if globalServerConfig != nil {
|
||||||
for k := range config.SubSystemsDynamic {
|
globalServerConfig[subSys] = s[subSys]
|
||||||
globalServerConfig[k] = s[k]
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyDynamicConfig will apply dynamic config values.
|
||||||
|
// Dynamic systems should be in config.SubSystemsDynamic as well.
|
||||||
|
func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config) error {
|
||||||
|
for subSys := range config.SubSystemsDynamic {
|
||||||
|
err := applyDynamicConfigForSubSys(ctx, objAPI, s, subSys)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -813,18 +813,16 @@ func GetSubSys(s string) (subSys string, inputs []string, tgt string, e error) {
|
|||||||
return subSys, inputs, tgt, Errorf("input arguments cannot be empty")
|
return subSys, inputs, tgt, Errorf("input arguments cannot be empty")
|
||||||
}
|
}
|
||||||
inputs = strings.SplitN(s, KvSpaceSeparator, 2)
|
inputs = strings.SplitN(s, KvSpaceSeparator, 2)
|
||||||
if len(inputs) <= 1 {
|
|
||||||
return subSys, inputs, tgt, Errorf("invalid number of arguments '%s'", s)
|
|
||||||
}
|
|
||||||
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
|
|
||||||
if len(subSystemValue) == 0 {
|
|
||||||
return subSys, inputs, tgt, Errorf("invalid number of arguments %s", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !SubSystems.Contains(subSystemValue[0]) {
|
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
|
||||||
|
subSys = subSystemValue[0]
|
||||||
|
if !SubSystems.Contains(subSys) {
|
||||||
return subSys, inputs, tgt, Errorf("unknown sub-system %s", s)
|
return subSys, inputs, tgt, Errorf("unknown sub-system %s", s)
|
||||||
}
|
}
|
||||||
subSys = subSystemValue[0]
|
|
||||||
|
if len(inputs) == 1 {
|
||||||
|
return subSys, inputs, tgt, nil
|
||||||
|
}
|
||||||
|
|
||||||
if SubSystemsSingleTargets.Contains(subSystemValue[0]) && len(subSystemValue) == 2 {
|
if SubSystemsSingleTargets.Contains(subSystemValue[0]) && len(subSystemValue) == 2 {
|
||||||
return subSys, inputs, tgt, Errorf("sub-system '%s' only supports single target", subSystemValue[0])
|
return subSys, inputs, tgt, Errorf("sub-system '%s' only supports single target", subSystemValue[0])
|
||||||
|
Loading…
Reference in New Issue
Block a user