Check for only network errors in audit webhook for reachability (#17228)

This commit is contained in:
Praveen raj Mani 2023-05-17 23:40:33 +05:30 committed by GitHub
parent 876f51a708
commit 85912985b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 172 deletions

View File

@ -253,6 +253,13 @@ func NewConfig() Config {
return cfg
}
func getCfgVal(envName, key, defaultValue string) string {
if key != config.Default {
envName = envName + config.Default + key
}
return env.Get(envName, defaultValue)
}
func lookupLegacyConfigForSubSys(subSys string) Config {
cfg := NewConfig()
switch subSys {
@ -269,11 +276,7 @@ func lookupLegacyConfigForSubSys(subSys string) Config {
// Load HTTP logger from the environment if found
for _, target := range loggerTargets {
endpointEnv := legacyEnvLoggerHTTPEndpoint
if target != config.Default {
endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target
}
endpoint := env.Get(endpointEnv, "")
endpoint := getCfgVal(legacyEnvLoggerHTTPEndpoint, target, "")
if endpoint == "" {
continue
}
@ -296,11 +299,7 @@ func lookupLegacyConfigForSubSys(subSys string) Config {
}
for _, target := range loggerAuditTargets {
endpointEnv := legacyEnvAuditLoggerHTTPEndpoint
if target != config.Default {
endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target
}
endpoint := env.Get(endpointEnv, "")
endpoint := getCfgVal(legacyEnvAuditLoggerHTTPEndpoint, target, "")
if endpoint == "" {
continue
}
@ -316,11 +315,8 @@ func lookupLegacyConfigForSubSys(subSys string) Config {
func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) {
for k, kv := range config.Merge(scfg[config.AuditKafkaSubSys], EnvKafkaEnable, DefaultAuditKafkaKVS) {
enableEnv := EnvKafkaEnable
if k != config.Default {
enableEnv = enableEnv + config.Default + k
}
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
enabledCfgVal := getCfgVal(EnvKafkaEnable, k, kv.Get(config.Enable))
enabled, err := config.ParseBool(enabledCfgVal)
if err != nil {
return cfg, err
}
@ -328,11 +324,7 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) {
continue
}
var brokers []xnet.Host
brokersEnv := EnvKafkaBrokers
if k != config.Default {
brokersEnv = brokersEnv + config.Default + k
}
kafkaBrokers := env.Get(brokersEnv, kv.Get(KafkaBrokers))
kafkaBrokers := getCfgVal(EnvKafkaBrokers, k, kv.Get(KafkaBrokers))
if len(kafkaBrokers) == 0 {
return cfg, config.Errorf("kafka 'brokers' cannot be empty")
}
@ -348,90 +340,35 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) {
return cfg, err
}
clientAuthEnv := EnvKafkaTLSClientAuth
if k != config.Default {
clientAuthEnv = clientAuthEnv + config.Default + k
}
clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(KafkaTLSClientAuth)))
clientAuthCfgVal := getCfgVal(EnvKafkaTLSClientAuth, k, kv.Get(KafkaTLSClientAuth))
clientAuth, err := strconv.Atoi(clientAuthCfgVal)
if err != nil {
return cfg, err
}
topicEnv := EnvKafkaTopic
if k != config.Default {
topicEnv = topicEnv + config.Default + k
}
versionEnv := EnvKafkaVersion
if k != config.Default {
versionEnv = versionEnv + config.Default + k
}
kafkaArgs := kafka.Config{
Enabled: enabled,
Brokers: brokers,
Topic: env.Get(topicEnv, kv.Get(KafkaTopic)),
Version: env.Get(versionEnv, kv.Get(KafkaVersion)),
Topic: getCfgVal(EnvKafkaTopic, k, kv.Get(KafkaTopic)),
Version: getCfgVal(EnvKafkaVersion, k, kv.Get(KafkaVersion)),
}
tlsEnableEnv := EnvKafkaTLS
if k != config.Default {
tlsEnableEnv = tlsEnableEnv + config.Default + k
}
tlsSkipVerifyEnv := EnvKafkaTLSSkipVerify
if k != config.Default {
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
}
tlsClientTLSCertEnv := EnvKafkaClientTLSCert
if k != config.Default {
tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k
}
tlsClientTLSKeyEnv := EnvKafkaClientTLSKey
if k != config.Default {
tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k
}
kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(KafkaTLS)) == config.EnableOn
kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(KafkaTLSSkipVerify)) == config.EnableOn
kafkaArgs.TLS.Enable = getCfgVal(EnvKafkaTLS, k, kv.Get(KafkaTLS)) == config.EnableOn
kafkaArgs.TLS.SkipVerify = getCfgVal(EnvKafkaTLSSkipVerify, k, kv.Get(KafkaTLSSkipVerify)) == config.EnableOn
kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth)
kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(KafkaClientTLSCert))
kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(KafkaClientTLSKey))
kafkaArgs.TLS.ClientTLSCert = getCfgVal(EnvKafkaClientTLSCert, k, kv.Get(KafkaClientTLSCert))
kafkaArgs.TLS.ClientTLSKey = getCfgVal(EnvKafkaClientTLSKey, k, kv.Get(KafkaClientTLSKey))
saslEnableEnv := EnvKafkaSASLEnable
if k != config.Default {
saslEnableEnv = saslEnableEnv + config.Default + k
}
saslUsernameEnv := EnvKafkaSASLUsername
if k != config.Default {
saslUsernameEnv = saslUsernameEnv + config.Default + k
}
saslPasswordEnv := EnvKafkaSASLPassword
if k != config.Default {
saslPasswordEnv = saslPasswordEnv + config.Default + k
}
saslMechanismEnv := EnvKafkaSASLMechanism
if k != config.Default {
saslMechanismEnv = saslMechanismEnv + config.Default + k
}
kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(KafkaSASL)) == config.EnableOn
kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(KafkaSASLUsername))
kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(KafkaSASLPassword))
kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(KafkaSASLMechanism))
kafkaArgs.SASL.Enable = getCfgVal(EnvKafkaSASLEnable, k, kv.Get(KafkaSASL)) == config.EnableOn
kafkaArgs.SASL.User = getCfgVal(EnvKafkaSASLUsername, k, kv.Get(KafkaSASLUsername))
kafkaArgs.SASL.Password = getCfgVal(EnvKafkaSASLPassword, k, kv.Get(KafkaSASLPassword))
kafkaArgs.SASL.Mechanism = getCfgVal(EnvKafkaSASLMechanism, k, kv.Get(KafkaSASLMechanism))
queueDirEnv := EnvKafkaQueueDir
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
kafkaArgs.QueueDir = env.Get(queueDirEnv, kv.Get(KafkaQueueDir))
kafkaArgs.QueueDir = getCfgVal(EnvKafkaQueueDir, k, kv.Get(KafkaQueueDir))
queueSizeEnv := EnvKafkaQueueSize
if k != config.Default {
queueSizeEnv = queueSizeEnv + config.Default + k
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, kv.Get(KafkaQueueSize)))
queueSizeCfgVal := getCfgVal(EnvKafkaQueueSize, k, kv.Get(KafkaQueueSize))
queueSize, err := strconv.Atoi(queueSizeCfgVal)
if err != nil {
return cfg, err
}
@ -464,59 +401,38 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
// legacy environment variables, ignore.
continue
}
enableEnv := EnvLoggerWebhookEnable
if target != config.Default {
enableEnv = EnvLoggerWebhookEnable + config.Default + target
}
enable, err := config.ParseBool(env.Get(enableEnv, ""))
enableCfgVal := getCfgVal(EnvLoggerWebhookEnable, target, "")
enable, err := config.ParseBool(enableCfgVal)
if err != nil || !enable {
continue
}
endpointEnv := EnvLoggerWebhookEndpoint
if target != config.Default {
endpointEnv = EnvLoggerWebhookEndpoint + config.Default + target
}
authTokenEnv := EnvLoggerWebhookAuthToken
if target != config.Default {
authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target
}
clientCertEnv := EnvLoggerWebhookClientCert
if target != config.Default {
clientCertEnv = EnvLoggerWebhookClientCert + config.Default + target
}
clientKeyEnv := EnvLoggerWebhookClientKey
if target != config.Default {
clientKeyEnv = EnvLoggerWebhookClientKey + config.Default + target
}
err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, ""))
clientCert := getCfgVal(EnvLoggerWebhookClientCert, target, "")
clientKey := getCfgVal(EnvLoggerWebhookClientKey, target, "")
err = config.EnsureCertAndKey(clientCert, clientKey)
if err != nil {
return cfg, err
}
proxyEnv := EnvLoggerWebhookProxy
queueSizeEnv := EnvLoggerWebhookQueueSize
if target != config.Default {
queueSizeEnv = EnvLoggerWebhookQueueSize + config.Default + target
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000"))
queueSizeCfgVal := getCfgVal(EnvLoggerWebhookQueueSize, target, "100000")
queueSize, err := strconv.Atoi(queueSizeCfgVal)
if err != nil {
return cfg, err
}
if queueSize <= 0 {
return cfg, errors.New("invalid queue_size value")
}
queueDirEnv := EnvLoggerWebhookQueueDir
if target != config.Default {
queueDirEnv = EnvLoggerWebhookQueueDir + config.Default + target
}
cfg.HTTP[target] = http.Config{
Enabled: true,
Endpoint: env.Get(endpointEnv, ""),
AuthToken: env.Get(authTokenEnv, ""),
ClientCert: env.Get(clientCertEnv, ""),
ClientKey: env.Get(clientKeyEnv, ""),
Proxy: env.Get(proxyEnv, ""),
Endpoint: getCfgVal(EnvLoggerWebhookEndpoint, target, ""),
AuthToken: getCfgVal(EnvLoggerWebhookAuthToken, target, ""),
ClientCert: clientCert,
ClientKey: clientKey,
Proxy: getCfgVal(EnvLoggerWebhookProxy, target, ""),
QueueSize: queueSize,
QueueDir: env.Get(queueDirEnv, ""),
QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, target, ""),
Name: loggerTargetNamePrefix + target,
}
}
@ -586,57 +502,35 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
// legacy environment variables, ignore.
continue
}
enableEnv := EnvAuditWebhookEnable
if target != config.Default {
enableEnv = EnvAuditWebhookEnable + config.Default + target
}
enable, err := config.ParseBool(env.Get(enableEnv, ""))
enable, err := config.ParseBool(getCfgVal(EnvAuditWebhookEnable, target, ""))
if err != nil || !enable {
continue
}
endpointEnv := EnvAuditWebhookEndpoint
if target != config.Default {
endpointEnv = EnvAuditWebhookEndpoint + config.Default + target
}
authTokenEnv := EnvAuditWebhookAuthToken
if target != config.Default {
authTokenEnv = EnvAuditWebhookAuthToken + config.Default + target
}
clientCertEnv := EnvAuditWebhookClientCert
if target != config.Default {
clientCertEnv = EnvAuditWebhookClientCert + config.Default + target
}
clientKeyEnv := EnvAuditWebhookClientKey
if target != config.Default {
clientKeyEnv = EnvAuditWebhookClientKey + config.Default + target
}
err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, ""))
clientCert := getCfgVal(EnvAuditWebhookClientCert, target, "")
clientKey := getCfgVal(EnvAuditWebhookClientKey, target, "")
err = config.EnsureCertAndKey(clientCert, clientKey)
if err != nil {
return cfg, err
}
queueSizeEnv := EnvAuditWebhookQueueSize
if target != config.Default {
queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000"))
queueSizeCfgVal := getCfgVal(EnvAuditWebhookQueueSize, target, "100000")
queueSize, err := strconv.Atoi(queueSizeCfgVal)
if err != nil {
return cfg, err
}
if queueSize <= 0 {
return cfg, errors.New("invalid queue_size value")
}
queueDirEnv := EnvAuditWebhookQueueDir
if target != config.Default {
queueDirEnv = EnvAuditWebhookQueueDir + config.Default + target
}
cfg.AuditWebhook[target] = http.Config{
Enabled: true,
Endpoint: env.Get(endpointEnv, ""),
AuthToken: env.Get(authTokenEnv, ""),
ClientCert: env.Get(clientCertEnv, ""),
ClientKey: env.Get(clientKeyEnv, ""),
Endpoint: getCfgVal(EnvAuditWebhookEndpoint, target, ""),
AuthToken: getCfgVal(EnvAuditWebhookAuthToken, target, ""),
ClientCert: clientCert,
ClientKey: clientKey,
QueueSize: queueSize,
QueueDir: env.Get(queueDirEnv, ""),
QueueDir: getCfgVal(EnvAuditWebhookQueueDir, target, ""),
Name: auditTargetNamePrefix + target,
}
}

View File

@ -127,7 +127,10 @@ func (h *Target) String() string {
// IsOnline returns true if the target is reachable.
func (h *Target) IsOnline(ctx context.Context) bool {
return h.isAlive(ctx) == nil
if err := h.checkAlive(ctx); err != nil {
return !xnet.IsNetworkOrHostDown(err, false)
}
return true
}
// Stats returns the target statistics.
@ -145,7 +148,7 @@ func (h *Target) Stats() types.TargetStats {
}
// This will check if we can reach the remote.
func (h *Target) isAlive(ctx context.Context) (err error) {
func (h *Target) checkAlive(ctx context.Context) (err error) {
return h.send(ctx, []byte(`{}`), 2*webhookCallTimeout)
}
@ -179,8 +182,7 @@ func (h *Target) initLogChannel(ctx context.Context) (err error) {
return errors.New("target is closed")
}
err = h.isAlive(ctx)
if err != nil {
if !h.IsOnline(ctx) {
// Start a goroutine that will continue to check if we can reach
h.revive.Do(func() {
go func() {
@ -190,7 +192,7 @@ func (h *Target) initLogChannel(ctx context.Context) (err error) {
if atomic.LoadInt32(&h.status) != statusOffline {
return
}
if err := h.isAlive(ctx); err == nil {
if h.IsOnline(ctx) {
// We are online.
if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) {
h.workerStartMu.Lock()
@ -223,7 +225,7 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.config.Endpoint, bytes.NewReader(payload))
if err != nil {
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err)
return fmt.Errorf("invalid configuration for '%s'; %v", h.config.Endpoint, err)
}
req.Header.Set(xhttp.ContentType, "application/json")
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)

View File

@ -54,7 +54,7 @@ type Store[I any] interface {
}
// replayItems - Reads the items from the store and replays.
func replayItems[I any](store Store[I], doneCh <-chan struct{}, logger logger, id string) <-chan string {
func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id string) <-chan string {
itemKeyCh := make(chan string)
go func() {
@ -66,7 +66,7 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, logger logger, i
for {
names, err := store.List()
if err != nil {
logger(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id)
log(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id)
} else {
for _, name := range names {
select {