mirror of https://github.com/minio/minio.git
Add http_timeout to audit webhook configurations (#20421)
This commit is contained in:
parent
bc527eceda
commit
3bae73fb42
|
@ -51,6 +51,7 @@ const (
|
|||
MaxRetry = "max_retry"
|
||||
RetryInterval = "retry_interval"
|
||||
Proxy = "proxy"
|
||||
httpTimeout = "http_timeout"
|
||||
|
||||
KafkaBrokers = "brokers"
|
||||
KafkaTopic = "topic"
|
||||
|
@ -89,6 +90,7 @@ const (
|
|||
EnvAuditWebhookQueueDir = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
|
||||
EnvAuditWebhookMaxRetry = "MINIO_AUDIT_WEBHOOK_MAX_RETRY"
|
||||
EnvAuditWebhookRetryInterval = "MINIO_AUDIT_WEBHOOK_RETRY_INTERVAL"
|
||||
EnvAuditWebhookHTTPTimeout = "MINIO_AUDIT_WEBHOOK_HTTP_TIMEOUT"
|
||||
|
||||
EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE"
|
||||
EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS"
|
||||
|
@ -162,6 +164,10 @@ var (
|
|||
Key: RetryInterval,
|
||||
Value: "3s",
|
||||
},
|
||||
config.KV{
|
||||
Key: httpTimeout,
|
||||
Value: "5s",
|
||||
},
|
||||
}
|
||||
|
||||
DefaultAuditWebhookKVS = config.KVS{
|
||||
|
@ -569,6 +575,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
|||
if maxRetry < 0 {
|
||||
return cfg, fmt.Errorf("invalid %s max_retry", maxRetryCfgVal)
|
||||
}
|
||||
|
||||
retryIntervalCfgVal := getCfgVal(EnvAuditWebhookRetryInterval, k, kv.Get(RetryInterval))
|
||||
retryInterval, err := time.ParseDuration(retryIntervalCfgVal)
|
||||
if err != nil {
|
||||
|
@ -577,18 +584,29 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
|||
if retryInterval > time.Minute {
|
||||
return cfg, fmt.Errorf("maximum allowed value for retry interval is '1m': %s", retryIntervalCfgVal)
|
||||
}
|
||||
|
||||
httpTimeoutCfgVal := getCfgVal(EnvAuditWebhookHTTPTimeout, k, kv.Get(httpTimeout))
|
||||
httpTimeout, err := time.ParseDuration(httpTimeoutCfgVal)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
if httpTimeout < time.Second {
|
||||
return cfg, fmt.Errorf("minimum value allowed for http_timeout is '1s': %s", httpTimeout)
|
||||
}
|
||||
|
||||
cfg.AuditWebhook[k] = http.Config{
|
||||
Enabled: true,
|
||||
Endpoint: url,
|
||||
AuthToken: getCfgVal(EnvAuditWebhookAuthToken, k, kv.Get(AuthToken)),
|
||||
ClientCert: clientCert,
|
||||
ClientKey: clientKey,
|
||||
BatchSize: batchSize,
|
||||
QueueSize: queueSize,
|
||||
QueueDir: getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
|
||||
MaxRetry: maxRetry,
|
||||
RetryIntvl: retryInterval,
|
||||
Name: auditTargetNamePrefix + k,
|
||||
HTTPTimeout: httpTimeout,
|
||||
Enabled: true,
|
||||
Endpoint: url,
|
||||
AuthToken: getCfgVal(EnvAuditWebhookAuthToken, k, kv.Get(AuthToken)),
|
||||
ClientCert: clientCert,
|
||||
ClientKey: clientKey,
|
||||
BatchSize: batchSize,
|
||||
QueueSize: queueSize,
|
||||
QueueDir: getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
|
||||
MaxRetry: maxRetry,
|
||||
RetryIntvl: retryInterval,
|
||||
Name: auditTargetNamePrefix + k,
|
||||
}
|
||||
}
|
||||
return cfg, nil
|
||||
|
|
|
@ -88,6 +88,12 @@ var (
|
|||
Optional: true,
|
||||
Type: "duration",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: httpTimeout,
|
||||
Description: `defines the maximum duration for each http request`,
|
||||
Optional: true,
|
||||
Type: "duration",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: config.Comment,
|
||||
Description: config.DefaultComment,
|
||||
|
@ -155,6 +161,12 @@ var (
|
|||
Optional: true,
|
||||
Type: "duration",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: httpTimeout,
|
||||
Description: `defines the maximum duration for each http request`,
|
||||
Optional: true,
|
||||
Type: "duration",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: config.Comment,
|
||||
Description: config.DefaultComment,
|
||||
|
|
|
@ -43,8 +43,6 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// Timeout for the webhook http call
|
||||
webhookCallTimeout = 5 * time.Second
|
||||
|
||||
// maxWorkers is the maximum number of concurrent http loggers
|
||||
maxWorkers = 16
|
||||
|
@ -69,20 +67,21 @@ var (
|
|||
|
||||
// Config http logger target
|
||||
type Config struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
Name string `json:"name"`
|
||||
UserAgent string `json:"userAgent"`
|
||||
Endpoint *xnet.URL `json:"endpoint"`
|
||||
AuthToken string `json:"authToken"`
|
||||
ClientCert string `json:"clientCert"`
|
||||
ClientKey string `json:"clientKey"`
|
||||
BatchSize int `json:"batchSize"`
|
||||
QueueSize int `json:"queueSize"`
|
||||
QueueDir string `json:"queueDir"`
|
||||
MaxRetry int `json:"maxRetry"`
|
||||
RetryIntvl time.Duration `json:"retryInterval"`
|
||||
Proxy string `json:"string"`
|
||||
Transport http.RoundTripper `json:"-"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Name string `json:"name"`
|
||||
UserAgent string `json:"userAgent"`
|
||||
Endpoint *xnet.URL `json:"endpoint"`
|
||||
AuthToken string `json:"authToken"`
|
||||
ClientCert string `json:"clientCert"`
|
||||
ClientKey string `json:"clientKey"`
|
||||
BatchSize int `json:"batchSize"`
|
||||
QueueSize int `json:"queueSize"`
|
||||
QueueDir string `json:"queueDir"`
|
||||
MaxRetry int `json:"maxRetry"`
|
||||
RetryIntvl time.Duration `json:"retryInterval"`
|
||||
Proxy string `json:"string"`
|
||||
Transport http.RoundTripper `json:"-"`
|
||||
HTTPTimeout time.Duration `json:"httpTimeout"`
|
||||
|
||||
// Custom logger
|
||||
LogOnceIf func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
|
||||
|
@ -137,8 +136,9 @@ type Target struct {
|
|||
|
||||
initQueueOnce once.Init
|
||||
|
||||
config Config
|
||||
client *http.Client
|
||||
config Config
|
||||
client *http.Client
|
||||
httpTimeout time.Duration
|
||||
}
|
||||
|
||||
// Name returns the name of the target
|
||||
|
@ -429,7 +429,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
|
|||
|
||||
var err error
|
||||
if !isDirQueue {
|
||||
err = h.send(ctx, buf.Bytes(), count, h.payloadType, webhookCallTimeout)
|
||||
err = h.send(ctx, buf.Bytes(), count, h.payloadType, h.httpTimeout)
|
||||
} else {
|
||||
_, err = h.store.PutMultiple(entries)
|
||||
}
|
||||
|
@ -520,10 +520,11 @@ func New(config Config) (*Target, error) {
|
|||
}
|
||||
|
||||
h := &Target{
|
||||
logCh: make(chan interface{}, config.QueueSize),
|
||||
config: config,
|
||||
batchSize: config.BatchSize,
|
||||
maxWorkers: int64(maxWorkers),
|
||||
logCh: make(chan interface{}, config.QueueSize),
|
||||
config: config,
|
||||
batchSize: config.BatchSize,
|
||||
maxWorkers: int64(maxWorkers),
|
||||
httpTimeout: config.HTTPTimeout,
|
||||
}
|
||||
h.status.Store(statusOffline)
|
||||
|
||||
|
@ -566,7 +567,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := h.send(context.Background(), eventData, count, h.payloadType, webhookCallTimeout); err != nil {
|
||||
if err := h.send(context.Background(), eventData, count, h.payloadType, h.httpTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue