configure audit queue_size

This commit is contained in:
Harshavardhana 2021-12-02 21:24:12 -08:00 committed by Minio Trusted
parent 7e8767c1c1
commit ec42715219
3 changed files with 43 additions and 6 deletions

View File

@ -506,6 +506,7 @@ func lookupConfigs(s config.Config, setDriveCount int) {
http.WithTargetName(k),
http.WithEndpoint(l.Endpoint),
http.WithAuthToken(l.AuthToken),
http.WithQueueSize(l.QueueSize),
http.WithUserAgent(loggerUserAgent),
http.WithLogKind(string(logger.All)),
http.WithTransport(NewGatewayHTTPTransport()),
@ -524,6 +525,7 @@ func lookupConfigs(s config.Config, setDriveCount int) {
http.WithTargetName(k),
http.WithEndpoint(l.Endpoint),
http.WithAuthToken(l.AuthToken),
http.WithQueueSize(l.QueueSize),
http.WithUserAgent(loggerUserAgent),
http.WithLogKind(string(logger.All)),
http.WithTransport(NewGatewayHTTPTransport()),

View File

@ -17,6 +17,7 @@
package logger
import (
"strconv"
"strings"
"github.com/minio/minio/cmd/config"
@ -33,6 +34,7 @@ type HTTP struct {
Enabled bool `json:"enabled"`
Endpoint string `json:"endpoint"`
AuthToken string `json:"authToken"`
QueueSize int `json:"queueSize"`
}
// Config console and http logger targets
@ -49,10 +51,12 @@ const (
EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE"
EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
EnvLoggerWebhookAuthToken = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE"
EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
EnvAuditWebhookAuthToken = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
)
@ -220,6 +224,14 @@ func LookupConfig(scfg config.Config) (Config, error) {
if err != nil || !enable {
continue
}
queueSizeEnv := EnvLoggerWebhookQueueSize
if target != config.Default {
queueSizeEnv = EnvLoggerWebhookQueueSize + config.Default + target
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000"))
if err != nil {
continue
}
endpointEnv := EnvLoggerWebhookEndpoint
if target != config.Default {
endpointEnv = EnvLoggerWebhookEndpoint + config.Default + target
@ -232,6 +244,7 @@ func LookupConfig(scfg config.Config) (Config, error) {
Enabled: true,
Endpoint: env.Get(endpointEnv, ""),
AuthToken: env.Get(authTokenEnv, ""),
QueueSize: queueSize,
}
}
@ -249,6 +262,14 @@ func LookupConfig(scfg config.Config) (Config, error) {
if err != nil || !enable {
continue
}
queueSizeEnv := EnvAuditWebhookQueueSize
if target != config.Default {
queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target
}
queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000"))
if err != nil {
continue
}
endpointEnv := EnvAuditWebhookEndpoint
if target != config.Default {
endpointEnv = EnvAuditWebhookEndpoint + config.Default + target
@ -261,6 +282,7 @@ func LookupConfig(scfg config.Config) (Config, error) {
Enabled: true,
Endpoint: env.Get(endpointEnv, ""),
AuthToken: env.Get(authTokenEnv, ""),
QueueSize: queueSize,
}
}
@ -289,6 +311,7 @@ func LookupConfig(scfg config.Config) (Config, error) {
Enabled: true,
Endpoint: kv.Get(Endpoint),
AuthToken: kv.Get(AuthToken),
QueueSize: 100000,
}
}
@ -317,6 +340,7 @@ func LookupConfig(scfg config.Config) (Config, error) {
Enabled: true,
Endpoint: kv.Get(Endpoint),
AuthToken: kv.Get(AuthToken),
QueueSize: 100000,
}
}

View File

@ -61,7 +61,7 @@ func (h *Target) String() string {
// Validate validate the http target
func (h *Target) Validate() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.endpoint, strings.NewReader(`{}`))
@ -107,6 +107,7 @@ func (h *Target) startHTTPLogger() {
for entry := range h.logCh {
logJSON, err := json.Marshal(&entry)
if err != nil {
logger.LogIf(context.Background(), fmt.Errorf("failed to marshal %v: %v", entry, err))
continue
}
@ -114,6 +115,8 @@ func (h *Target) startHTTPLogger() {
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.endpoint, bytes.NewReader(logJSON))
if err != nil {
logger.LogIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration",
h.endpoint, err))
cancel()
continue
}
@ -130,7 +133,7 @@ func (h *Target) startHTTPLogger() {
resp, err := h.client.Do(req)
cancel()
if err != nil {
logger.LogIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration\n",
logger.LogIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration",
h.endpoint, err))
continue
}
@ -199,12 +202,18 @@ func WithTransport(transport *http.Transport) Option {
}
}
// WithQueueSize adds a custom queue size to control the
// in-flux of requeusts
func WithQueueSize(size int) Option {
return func(t *Target) {
t.logCh = make(chan interface{}, size)
}
}
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(opts ...Option) *Target {
h := &Target{
logCh: make(chan interface{}, 10000),
}
h := &Target{}
// Loop through each option
for _, opt := range opts {
@ -226,9 +235,11 @@ func (h *Target) Send(entry interface{}, errKind string) error {
select {
case h.logCh <- entry:
default:
err := errors.New("log buffer full")
logger.LogIf(context.Background(), fmt.Errorf("audit event lost %v: %v", entry, err))
// log channel is full, do not wait and return
// an error immediately to the caller
return errors.New("log buffer full")
return err
}
return nil