mirror of https://github.com/minio/minio.git
fix: prioritize audit_webhook and logger_webhook ENVs over the config KVS (#17783)
This commit is contained in:
parent
45fb375c41
commit
0285df5a02
|
@ -386,7 +386,7 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if config.LoggerSubSystems.Contains(subSys) {
|
if config.LoggerSubSystems.Contains(subSys) {
|
||||||
if err := logger.ValidateSubSysConfig(s, subSys); err != nil {
|
if err := logger.ValidateSubSysConfig(ctx, s, subSys); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -575,7 +575,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
||||||
scannerCycle.Store(scannerCfg.Cycle)
|
scannerCycle.Store(scannerCfg.Cycle)
|
||||||
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
||||||
case config.LoggerWebhookSubSys:
|
case config.LoggerWebhookSubSys:
|
||||||
loggerCfg, err := logger.LookupConfigForSubSys(s, config.LoggerWebhookSubSys)
|
loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.LoggerWebhookSubSys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
||||||
}
|
}
|
||||||
|
@ -592,7 +592,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs))
|
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs))
|
||||||
}
|
}
|
||||||
case config.AuditWebhookSubSys:
|
case config.AuditWebhookSubSys:
|
||||||
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys)
|
loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.AuditWebhookSubSys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to load audit webhook config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to load audit webhook config: %w", err))
|
||||||
}
|
}
|
||||||
|
@ -610,7 +610,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs))
|
logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs))
|
||||||
}
|
}
|
||||||
case config.AuditKafkaSubSys:
|
case config.AuditKafkaSubSys:
|
||||||
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys)
|
loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.AuditKafkaSubSys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to load audit kafka config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to load audit kafka config: %w", err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package logger
|
package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -98,6 +99,8 @@ const (
|
||||||
auditTargetNamePrefix = "audit-"
|
auditTargetNamePrefix = "audit-"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errInvalidQueueSize = errors.New("invalid queue_size value")
|
||||||
|
|
||||||
// Default KVS for loggerHTTP and loggerAuditHTTP
|
// Default KVS for loggerHTTP and loggerAuditHTTP
|
||||||
var (
|
var (
|
||||||
DefaultLoggerWebhookKVS = config.KVS{
|
DefaultLoggerWebhookKVS = config.KVS{
|
||||||
|
@ -260,7 +263,7 @@ func getCfgVal(envName, key, defaultValue string) string {
|
||||||
return env.Get(envName, defaultValue)
|
return env.Get(envName, defaultValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func lookupLegacyConfigForSubSys(subSys string) Config {
|
func lookupLegacyConfigForSubSys(ctx context.Context, subSys string) Config {
|
||||||
cfg := NewConfig()
|
cfg := NewConfig()
|
||||||
switch subSys {
|
switch subSys {
|
||||||
case config.LoggerWebhookSubSys:
|
case config.LoggerWebhookSubSys:
|
||||||
|
@ -280,9 +283,14 @@ func lookupLegacyConfigForSubSys(subSys string) Config {
|
||||||
if endpoint == "" {
|
if endpoint == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
url, err := xnet.ParseHTTPURL(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
LogOnceIf(ctx, err, "logger-webhook-"+endpoint)
|
||||||
|
continue
|
||||||
|
}
|
||||||
cfg.HTTP[target] = http.Config{
|
cfg.HTTP[target] = http.Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
Endpoint: endpoint,
|
Endpoint: url,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,9 +311,14 @@ func lookupLegacyConfigForSubSys(subSys string) Config {
|
||||||
if endpoint == "" {
|
if endpoint == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
url, err := xnet.ParseHTTPURL(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
LogOnceIf(ctx, err, "audit-webhook-"+endpoint)
|
||||||
|
continue
|
||||||
|
}
|
||||||
cfg.AuditWebhook[target] = http.Config{
|
cfg.AuditWebhook[target] = http.Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
Endpoint: endpoint,
|
Endpoint: url,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,7 +386,7 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
if queueSize <= 0 {
|
if queueSize <= 0 {
|
||||||
return cfg, errors.New("invalid queue_size value")
|
return cfg, errInvalidQueueSize
|
||||||
}
|
}
|
||||||
kafkaArgs.QueueSize = queueSize
|
kafkaArgs.QueueSize = queueSize
|
||||||
|
|
||||||
|
@ -384,214 +397,121 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
||||||
envs := env.List(EnvLoggerWebhookEndpoint)
|
for k, kv := range config.Merge(scfg[config.LoggerWebhookSubSys], EnvLoggerWebhookEnable, DefaultLoggerWebhookKVS) {
|
||||||
var loggerTargets []string
|
if v, ok := cfg.HTTP[k]; ok && v.Enabled {
|
||||||
for _, k := range envs {
|
|
||||||
target := strings.TrimPrefix(k, EnvLoggerWebhookEndpoint+config.Default)
|
|
||||||
if target == EnvLoggerWebhookEndpoint {
|
|
||||||
target = config.Default
|
|
||||||
}
|
|
||||||
loggerTargets = append(loggerTargets, target)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load HTTP logger from the environment if found
|
|
||||||
for _, target := range loggerTargets {
|
|
||||||
if v, ok := cfg.HTTP[target]; ok && v.Enabled {
|
|
||||||
// This target is already enabled using the
|
// This target is already enabled using the
|
||||||
// legacy environment variables, ignore.
|
// legacy environment variables, ignore.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
enableCfgVal := getCfgVal(EnvLoggerWebhookEnable, target, "")
|
|
||||||
enable, err := config.ParseBool(enableCfgVal)
|
|
||||||
if err != nil || !enable {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
clientCert := getCfgVal(EnvLoggerWebhookClientCert, target, "")
|
|
||||||
clientKey := getCfgVal(EnvLoggerWebhookClientKey, target, "")
|
|
||||||
err = config.EnsureCertAndKey(clientCert, clientKey)
|
|
||||||
if err != nil {
|
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
|
|
||||||
queueSizeCfgVal := getCfgVal(EnvLoggerWebhookQueueSize, target, "100000")
|
|
||||||
queueSize, err := strconv.Atoi(queueSizeCfgVal)
|
|
||||||
if err != nil {
|
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
if queueSize <= 0 {
|
|
||||||
return cfg, errors.New("invalid queue_size value")
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.HTTP[target] = http.Config{
|
|
||||||
Enabled: true,
|
|
||||||
Endpoint: getCfgVal(EnvLoggerWebhookEndpoint, target, ""),
|
|
||||||
AuthToken: getCfgVal(EnvLoggerWebhookAuthToken, target, ""),
|
|
||||||
ClientCert: clientCert,
|
|
||||||
ClientKey: clientKey,
|
|
||||||
Proxy: getCfgVal(EnvLoggerWebhookProxy, target, ""),
|
|
||||||
QueueSize: queueSize,
|
|
||||||
QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, target, ""),
|
|
||||||
Name: loggerTargetNamePrefix + target,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for starget, kv := range scfg[config.LoggerWebhookSubSys] {
|
|
||||||
if l, ok := cfg.HTTP[starget]; ok && l.Enabled {
|
|
||||||
// Ignore this HTTP logger config since there is
|
|
||||||
// a target with the same name loaded and enabled
|
|
||||||
// from the environment.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
subSysTarget := config.LoggerWebhookSubSys
|
subSysTarget := config.LoggerWebhookSubSys
|
||||||
if starget != config.Default {
|
if k != config.Default {
|
||||||
subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget
|
subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + k
|
||||||
}
|
}
|
||||||
if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil {
|
if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
enabled, err := config.ParseBool(kv.Get(config.Enable))
|
enableCfgVal := getCfgVal(EnvLoggerWebhookEnable, k, kv.Get(config.Enable))
|
||||||
if err != nil {
|
enable, err := config.ParseBool(enableCfgVal)
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
if !enabled {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey))
|
|
||||||
if err != nil {
|
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
queueSize, err := strconv.Atoi(kv.Get(QueueSize))
|
|
||||||
if err != nil {
|
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
if queueSize <= 0 {
|
|
||||||
return cfg, errors.New("invalid queue_size value")
|
|
||||||
}
|
|
||||||
cfg.HTTP[starget] = http.Config{
|
|
||||||
Enabled: true,
|
|
||||||
Endpoint: kv.Get(Endpoint),
|
|
||||||
AuthToken: kv.Get(AuthToken),
|
|
||||||
ClientCert: kv.Get(ClientCert),
|
|
||||||
ClientKey: kv.Get(ClientKey),
|
|
||||||
Proxy: kv.Get(Proxy),
|
|
||||||
QueueSize: queueSize,
|
|
||||||
QueueDir: kv.Get(QueueDir),
|
|
||||||
Name: loggerTargetNamePrefix + starget,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return cfg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
|
||||||
var loggerAuditTargets []string
|
|
||||||
envs := env.List(EnvAuditWebhookEndpoint)
|
|
||||||
for _, k := range envs {
|
|
||||||
target := strings.TrimPrefix(k, EnvAuditWebhookEndpoint+config.Default)
|
|
||||||
if target == EnvAuditWebhookEndpoint {
|
|
||||||
target = config.Default
|
|
||||||
}
|
|
||||||
loggerAuditTargets = append(loggerAuditTargets, target)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, target := range loggerAuditTargets {
|
|
||||||
if v, ok := cfg.AuditWebhook[target]; ok && v.Enabled {
|
|
||||||
// This target is already enabled using the
|
|
||||||
// legacy environment variables, ignore.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
enable, err := config.ParseBool(getCfgVal(EnvAuditWebhookEnable, target, ""))
|
|
||||||
if err != nil || !enable {
|
if err != nil || !enable {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
var url *xnet.URL
|
||||||
clientCert := getCfgVal(EnvAuditWebhookClientCert, target, "")
|
endpoint := getCfgVal(EnvLoggerWebhookEndpoint, k, kv.Get(Endpoint))
|
||||||
clientKey := getCfgVal(EnvAuditWebhookClientKey, target, "")
|
url, err = xnet.ParseHTTPURL(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return cfg, err
|
||||||
|
}
|
||||||
|
clientCert := getCfgVal(EnvLoggerWebhookClientCert, k, kv.Get(ClientCert))
|
||||||
|
clientKey := getCfgVal(EnvLoggerWebhookClientKey, k, kv.Get(ClientKey))
|
||||||
err = config.EnsureCertAndKey(clientCert, clientKey)
|
err = config.EnsureCertAndKey(clientCert, clientKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
|
queueSizeCfgVal := getCfgVal(EnvLoggerWebhookQueueSize, k, kv.Get(QueueSize))
|
||||||
queueSizeCfgVal := getCfgVal(EnvAuditWebhookQueueSize, target, "100000")
|
|
||||||
queueSize, err := strconv.Atoi(queueSizeCfgVal)
|
queueSize, err := strconv.Atoi(queueSizeCfgVal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
if queueSize <= 0 {
|
if queueSize <= 0 {
|
||||||
return cfg, errors.New("invalid queue_size value")
|
return cfg, errInvalidQueueSize
|
||||||
}
|
}
|
||||||
|
cfg.HTTP[k] = http.Config{
|
||||||
cfg.AuditWebhook[target] = http.Config{
|
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
Endpoint: getCfgVal(EnvAuditWebhookEndpoint, target, ""),
|
Endpoint: url,
|
||||||
AuthToken: getCfgVal(EnvAuditWebhookAuthToken, target, ""),
|
AuthToken: getCfgVal(EnvLoggerWebhookAuthToken, k, kv.Get(AuthToken)),
|
||||||
ClientCert: clientCert,
|
ClientCert: clientCert,
|
||||||
ClientKey: clientKey,
|
ClientKey: clientKey,
|
||||||
|
Proxy: getCfgVal(EnvLoggerWebhookProxy, k, kv.Get(Proxy)),
|
||||||
QueueSize: queueSize,
|
QueueSize: queueSize,
|
||||||
QueueDir: getCfgVal(EnvAuditWebhookQueueDir, target, ""),
|
QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, k, kv.Get(QueueDir)),
|
||||||
Name: auditTargetNamePrefix + target,
|
Name: loggerTargetNamePrefix + k,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
for starget, kv := range scfg[config.AuditWebhookSubSys] {
|
func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
||||||
if l, ok := cfg.AuditWebhook[starget]; ok && l.Enabled {
|
for k, kv := range config.Merge(scfg[config.AuditWebhookSubSys], EnvAuditWebhookEnable, DefaultAuditWebhookKVS) {
|
||||||
// Ignore this audit config since another target
|
if v, ok := cfg.AuditWebhook[k]; ok && v.Enabled {
|
||||||
// with the same name is already loaded and enabled
|
// This target is already enabled using the
|
||||||
// in the shell environment.
|
// legacy environment variables, ignore.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
subSysTarget := config.AuditWebhookSubSys
|
subSysTarget := config.AuditWebhookSubSys
|
||||||
if starget != config.Default {
|
if k != config.Default {
|
||||||
subSysTarget = config.AuditWebhookSubSys + config.SubSystemSeparator + starget
|
subSysTarget = config.AuditWebhookSubSys + config.SubSystemSeparator + k
|
||||||
}
|
}
|
||||||
if err := config.CheckValidKeys(subSysTarget, kv, DefaultAuditWebhookKVS); err != nil {
|
if err := config.CheckValidKeys(subSysTarget, kv, DefaultAuditWebhookKVS); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
enabled, err := config.ParseBool(kv.Get(config.Enable))
|
enable, err := config.ParseBool(getCfgVal(EnvAuditWebhookEnable, k, kv.Get(config.Enable)))
|
||||||
if err != nil {
|
if err != nil || !enable {
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
if !enabled {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey))
|
var url *xnet.URL
|
||||||
|
endpoint := getCfgVal(EnvAuditWebhookEndpoint, k, kv.Get(Endpoint))
|
||||||
|
url, err = xnet.ParseHTTPURL(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
queueSize, err := strconv.Atoi(kv.Get(QueueSize))
|
clientCert := getCfgVal(EnvAuditWebhookClientCert, k, kv.Get(ClientCert))
|
||||||
|
clientKey := getCfgVal(EnvAuditWebhookClientKey, k, kv.Get(ClientKey))
|
||||||
|
err = config.EnsureCertAndKey(clientCert, clientKey)
|
||||||
|
if err != nil {
|
||||||
|
return cfg, err
|
||||||
|
}
|
||||||
|
queueSizeCfgVal := getCfgVal(EnvAuditWebhookQueueSize, k, kv.Get(QueueSize))
|
||||||
|
queueSize, err := strconv.Atoi(queueSizeCfgVal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
if queueSize <= 0 {
|
if queueSize <= 0 {
|
||||||
return cfg, errors.New("invalid queue_size value")
|
return cfg, errInvalidQueueSize
|
||||||
}
|
}
|
||||||
cfg.AuditWebhook[starget] = http.Config{
|
cfg.AuditWebhook[k] = http.Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
Endpoint: kv.Get(Endpoint),
|
Endpoint: url,
|
||||||
AuthToken: kv.Get(AuthToken),
|
AuthToken: getCfgVal(EnvAuditWebhookAuthToken, k, kv.Get(AuthToken)),
|
||||||
ClientCert: kv.Get(ClientCert),
|
ClientCert: clientCert,
|
||||||
ClientKey: kv.Get(ClientKey),
|
ClientKey: clientKey,
|
||||||
QueueSize: queueSize,
|
QueueSize: queueSize,
|
||||||
QueueDir: kv.Get(QueueDir),
|
QueueDir: getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
|
||||||
Name: auditTargetNamePrefix + starget,
|
Name: auditTargetNamePrefix + k,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LookupConfigForSubSys - lookup logger config, override with ENVs if set, for the given sub-system
|
// LookupConfigForSubSys - lookup logger config, override with ENVs if set, for the given sub-system
|
||||||
func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err error) {
|
func LookupConfigForSubSys(ctx context.Context, scfg config.Config, subSys string) (cfg Config, err error) {
|
||||||
switch subSys {
|
switch subSys {
|
||||||
case config.LoggerWebhookSubSys:
|
case config.LoggerWebhookSubSys:
|
||||||
cfg = lookupLegacyConfigForSubSys(config.LoggerWebhookSubSys)
|
cfg = lookupLegacyConfigForSubSys(ctx, config.LoggerWebhookSubSys)
|
||||||
if cfg, err = lookupLoggerWebhookConfig(scfg, cfg); err != nil {
|
if cfg, err = lookupLoggerWebhookConfig(scfg, cfg); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
case config.AuditWebhookSubSys:
|
case config.AuditWebhookSubSys:
|
||||||
cfg = lookupLegacyConfigForSubSys(config.AuditWebhookSubSys)
|
cfg = lookupLegacyConfigForSubSys(ctx, config.AuditWebhookSubSys)
|
||||||
if cfg, err = lookupAuditWebhookConfig(scfg, cfg); err != nil {
|
if cfg, err = lookupAuditWebhookConfig(scfg, cfg); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
|
@ -605,8 +525,8 @@ func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err e
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateSubSysConfig - validates logger related config of given sub-system
|
// ValidateSubSysConfig - validates logger related config of given sub-system
|
||||||
func ValidateSubSysConfig(scfg config.Config, subSys string) error {
|
func ValidateSubSysConfig(ctx context.Context, scfg config.Config, subSys string) error {
|
||||||
// Lookup for legacy environment variables first
|
// Lookup for legacy environment variables first
|
||||||
_, err := LookupConfigForSubSys(scfg, subSys)
|
_, err := LookupConfigForSubSys(ctx, scfg, subSys)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ func SetLoggerHTTPAudit(scfg config.Config, k string, args http.Config) {
|
||||||
},
|
},
|
||||||
config.KV{
|
config.KV{
|
||||||
Key: Endpoint,
|
Key: Endpoint,
|
||||||
Value: args.Endpoint,
|
Value: args.Endpoint.String(),
|
||||||
},
|
},
|
||||||
config.KV{
|
config.KV{
|
||||||
Key: AuthToken,
|
Key: AuthToken,
|
||||||
|
@ -64,7 +64,7 @@ func SetLoggerHTTP(scfg config.Config, k string, args http.Config) {
|
||||||
},
|
},
|
||||||
config.KV{
|
config.KV{
|
||||||
Key: Endpoint,
|
Key: Endpoint,
|
||||||
Value: args.Endpoint,
|
Value: args.Endpoint.String(),
|
||||||
},
|
},
|
||||||
config.KV{
|
config.KV{
|
||||||
Key: AuthToken,
|
Key: AuthToken,
|
||||||
|
|
|
@ -41,7 +41,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Timeout for the webhook http call
|
// Timeout for the webhook http call
|
||||||
webhookCallTimeout = 5 * time.Second
|
webhookCallTimeout = 3 * time.Second
|
||||||
|
|
||||||
// maxWorkers is the maximum number of concurrent http loggers
|
// maxWorkers is the maximum number of concurrent http loggers
|
||||||
maxWorkers = 16
|
maxWorkers = 16
|
||||||
|
@ -61,7 +61,7 @@ type Config struct {
|
||||||
Enabled bool `json:"enabled"`
|
Enabled bool `json:"enabled"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
UserAgent string `json:"userAgent"`
|
UserAgent string `json:"userAgent"`
|
||||||
Endpoint string `json:"endpoint"`
|
Endpoint *xnet.URL `json:"endpoint"`
|
||||||
AuthToken string `json:"authToken"`
|
AuthToken string `json:"authToken"`
|
||||||
ClientCert string `json:"clientCert"`
|
ClientCert string `json:"clientCert"`
|
||||||
ClientKey string `json:"clientKey"`
|
ClientKey string `json:"clientKey"`
|
||||||
|
@ -119,7 +119,7 @@ func (h *Target) Name() string {
|
||||||
|
|
||||||
// Endpoint returns the backend endpoint
|
// Endpoint returns the backend endpoint
|
||||||
func (h *Target) Endpoint() string {
|
func (h *Target) Endpoint() string {
|
||||||
return h.config.Endpoint
|
return h.config.Endpoint.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Target) String() string {
|
func (h *Target) String() string {
|
||||||
|
@ -128,8 +128,8 @@ func (h *Target) String() string {
|
||||||
|
|
||||||
// IsOnline returns true if the target is reachable.
|
// IsOnline returns true if the target is reachable.
|
||||||
func (h *Target) IsOnline(ctx context.Context) bool {
|
func (h *Target) IsOnline(ctx context.Context) bool {
|
||||||
if err := h.checkAlive(ctx); err != nil {
|
if err := h.send(ctx, []byte(`{}`), webhookCallTimeout); err != nil {
|
||||||
return !xnet.IsNetworkOrHostDown(err, false)
|
return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err)
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -148,11 +148,6 @@ func (h *Target) Stats() types.TargetStats {
|
||||||
return stats
|
return stats
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will check if we can reach the remote.
|
|
||||||
func (h *Target) checkAlive(ctx context.Context) (err error) {
|
|
||||||
return h.send(ctx, []byte(`{}`), webhookCallTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init validate and initialize the http target
|
// Init validate and initialize the http target
|
||||||
func (h *Target) Init(ctx context.Context) (err error) {
|
func (h *Target) Init(ctx context.Context) (err error) {
|
||||||
if h.config.QueueDir != "" {
|
if h.config.QueueDir != "" {
|
||||||
|
@ -224,9 +219,9 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration
|
||||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
||||||
h.config.Endpoint, bytes.NewReader(payload))
|
h.Endpoint(), bytes.NewReader(payload))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("invalid configuration for '%s'; %v", h.config.Endpoint, err)
|
return fmt.Errorf("invalid configuration for '%s'; %v", h.Endpoint(), err)
|
||||||
}
|
}
|
||||||
req.Header.Set(xhttp.ContentType, "application/json")
|
req.Header.Set(xhttp.ContentType, "application/json")
|
||||||
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
|
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
|
||||||
|
@ -242,7 +237,7 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration
|
||||||
|
|
||||||
resp, err := h.client.Do(req)
|
resp, err := h.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err)
|
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drain any response.
|
// Drain any response.
|
||||||
|
@ -253,9 +248,9 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration
|
||||||
// accepted HTTP status codes.
|
// accepted HTTP status codes.
|
||||||
return nil
|
return nil
|
||||||
case http.StatusForbidden:
|
case http.StatusForbidden:
|
||||||
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status)
|
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status)
|
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,7 +277,7 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) {
|
||||||
}
|
}
|
||||||
tries++
|
tries++
|
||||||
if err := h.send(ctx, logJSON, webhookCallTimeout); err != nil {
|
if err := h.send(ctx, logJSON, webhookCallTimeout); err != nil {
|
||||||
h.config.LogOnce(ctx, err, h.config.Endpoint)
|
h.config.LogOnce(ctx, err, h.Endpoint())
|
||||||
atomic.AddInt64(&h.failedMessages, 1)
|
atomic.AddInt64(&h.failedMessages, 1)
|
||||||
} else {
|
} else {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue