Add lazy init of audit logger (#16842)

This commit is contained in:
Klaus Post 2023-03-21 10:50:40 -07:00 committed by GitHub
parent 0448728228
commit 11d04279c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,6 +23,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -39,7 +40,13 @@ const (
webhookCallTimeout = 5 * time.Second webhookCallTimeout = 5 * time.Second
// maxWorkers is the maximum number of concurrent operations. // maxWorkers is the maximum number of concurrent operations.
maxWorkers = 8 maxWorkers = 16
)
const (
statusOffline = iota
statusOnline
statusClosed
) )
// Config http logger target // Config http logger target
@ -67,20 +74,24 @@ type Config struct {
type Target struct { type Target struct {
totalMessages int64 totalMessages int64
failedMessages int64 failedMessages int64
status int32
// Worker control // Worker control
workers int64 workers int64
workerStartMu sync.Mutex workerStartMu sync.Mutex
lastStarted time.Time lastStarted time.Time
wg sync.WaitGroup wg sync.WaitGroup
doneCh chan struct{}
// Channel of log entries // Channel of log entries.
logCh chan interface{} // Reading logCh must hold read lock on logChMu (to avoid read race)
// Sending a value on logCh must hold read lock on logChMu (to avoid closing)
logCh chan interface{}
logChMu sync.RWMutex
// is the target online? // If the first init fails, this starts a goroutine that
online bool // will attempt to establish the connection.
revive sync.Once
config Config config Config
client *http.Client client *http.Client
@ -97,70 +108,104 @@ func (h *Target) String() string {
// IsOnline returns true if the initialization was successful // IsOnline returns true if the initialization was successful
func (h *Target) IsOnline() bool { func (h *Target) IsOnline() bool {
return h.online return atomic.LoadInt32(&h.status) == statusOnline
} }
// Stats returns the target statistics. // Stats returns the target statistics.
func (h *Target) Stats() types.TargetStats { func (h *Target) Stats() types.TargetStats {
return types.TargetStats{ h.logChMu.RLock()
logCh := h.logCh
h.logChMu.RUnlock()
stats := types.TargetStats{
TotalMessages: atomic.LoadInt64(&h.totalMessages), TotalMessages: atomic.LoadInt64(&h.totalMessages),
FailedMessages: atomic.LoadInt64(&h.failedMessages), FailedMessages: atomic.LoadInt64(&h.failedMessages),
QueueLength: len(h.logCh), QueueLength: len(logCh),
} }
return stats
} }
// Init validate and initialize the http target // Init validate and initialize the http target
func (h *Target) Init() error { func (h *Target) Init() (err error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout) switch atomic.LoadInt32(&h.status) {
defer cancel() case statusOnline:
return nil
req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`)) case statusClosed:
if err != nil { return errors.New("target is closed")
return err
} }
req.Header.Set(xhttp.ContentType, "application/json") // This will check if we can reach the remote.
checkAlive := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout)
defer cancel()
// Set user-agent to indicate MinIO release req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`))
// version to the configured log endpoint if err != nil {
req.Header.Set("User-Agent", h.config.UserAgent) return err
}
if h.config.AuthToken != "" { req.Header.Set(xhttp.ContentType, "application/json")
req.Header.Set("Authorization", h.config.AuthToken)
}
// If proxy available, set the same // Set user-agent to indicate MinIO release
if h.config.Proxy != "" { // version to the configured log endpoint
proxyURL, _ := url.Parse(h.config.Proxy) req.Header.Set("User-Agent", h.config.UserAgent)
transport := h.config.Transport
ctransport := transport.(*http.Transport).Clone()
ctransport.Proxy = http.ProxyURL(proxyURL)
h.config.Transport = ctransport
}
client := http.Client{Transport: h.config.Transport} if h.config.AuthToken != "" {
resp, err := client.Do(req) req.Header.Set("Authorization", h.config.AuthToken)
if err != nil { }
return err
}
h.client = &client
// Drain any response. resp, err := h.client.Do(req)
xhttp.DrainBody(resp.Body) if err != nil {
return err
}
// Drain any response.
xhttp.DrainBody(resp.Body)
if !acceptedResponseStatusCode(resp.StatusCode) { if !acceptedResponseStatusCode(resp.StatusCode) {
if resp.StatusCode == http.StatusForbidden { if resp.StatusCode == http.StatusForbidden {
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set",
h.config.Endpoint, resp.Status)
}
return fmt.Errorf("%s returned '%s', please check your endpoint configuration",
h.config.Endpoint, resp.Status) h.config.Endpoint, resp.Status)
} }
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", return nil
h.config.Endpoint, resp.Status)
} }
h.lastStarted = time.Now() err = checkAlive()
h.online = true if err != nil {
atomic.AddInt64(&h.workers, 1) // Start a goroutine that will continue to check if we can reach
go h.startHTTPLogger() h.revive.Do(func() {
go func() {
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
if atomic.LoadInt32(&h.status) != statusOffline {
return
}
if err := checkAlive(); err == nil {
// We are online.
if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) {
h.workerStartMu.Lock()
h.lastStarted = time.Now()
h.workerStartMu.Unlock()
atomic.AddInt64(&h.workers, 1)
go h.startHTTPLogger()
}
return
}
}
}()
})
return err
}
if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) {
h.workerStartMu.Lock()
h.lastStarted = time.Now()
h.workerStartMu.Unlock()
go h.startHTTPLogger()
}
return nil return nil
} }
@ -178,38 +223,56 @@ func (h *Target) logEntry(entry interface{}) {
return return
} }
ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) tries := 0
defer cancel() for {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, if tries > 0 {
h.config.Endpoint, bytes.NewReader(logJSON)) if tries >= 10 || atomic.LoadInt32(&h.status) == statusClosed {
if err != nil { // Don't retry when closing...
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) return
atomic.AddInt64(&h.failedMessages, 1) }
return // sleep = (tries+2) ^ 2 milliseconds.
} sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond
req.Header.Set(xhttp.ContentType, "application/json") if sleep > time.Second {
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) sleep = time.Second
req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) }
time.Sleep(sleep)
}
tries++
ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.config.Endpoint, bytes.NewReader(logJSON))
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
atomic.AddInt64(&h.failedMessages, 1)
continue
}
req.Header.Set(xhttp.ContentType, "application/json")
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
// Set user-agent to indicate MinIO release // Set user-agent to indicate MinIO release
// version to the configured log endpoint // version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent) req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" { if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken) req.Header.Set("Authorization", h.config.AuthToken)
} }
resp, err := h.client.Do(req) resp, err := h.client.Do(req)
if err != nil { if err != nil {
atomic.AddInt64(&h.failedMessages, 1) atomic.AddInt64(&h.failedMessages, 1)
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
return continue
} }
// Drain any response. // Drain any response.
xhttp.DrainBody(resp.Body) xhttp.DrainBody(resp.Body)
if !acceptedResponseStatusCode(resp.StatusCode) { if acceptedResponseStatusCode(resp.StatusCode) {
return
}
// Log failure, retry
atomic.AddInt64(&h.failedMessages, 1) atomic.AddInt64(&h.failedMessages, 1)
switch resp.StatusCode { switch resp.StatusCode {
case http.StatusForbidden: case http.StatusForbidden:
@ -221,25 +284,25 @@ func (h *Target) logEntry(entry interface{}) {
} }
func (h *Target) startHTTPLogger() { func (h *Target) startHTTPLogger() {
// Create a routine which sends json logs received h.logChMu.RLock()
// from an internal channel. logCh := h.logCh
h.wg.Add(1) if logCh != nil {
go func() { // We are not allowed to add when logCh is nil
defer func() { h.wg.Add(1)
h.wg.Done() defer h.wg.Done()
atomic.AddInt64(&h.workers, -1) }
}() h.logChMu.RUnlock()
for { defer atomic.AddInt64(&h.workers, -1)
select {
case entry := <-h.logCh: if logCh == nil {
atomic.AddInt64(&h.totalMessages, 1) return
h.logEntry(entry) }
case <-h.doneCh: // Send messages until channel is closed.
return for entry := range logCh {
} atomic.AddInt64(&h.totalMessages, 1)
} h.logEntry(entry)
}() }
} }
// New initializes a new logger target which // New initializes a new logger target which
@ -247,30 +310,43 @@ func (h *Target) startHTTPLogger() {
func New(config Config) *Target { func New(config Config) *Target {
h := &Target{ h := &Target{
logCh: make(chan interface{}, config.QueueSize), logCh: make(chan interface{}, config.QueueSize),
doneCh: make(chan struct{}),
config: config, config: config,
online: false, status: statusOffline,
} }
// If proxy available, set the same
if h.config.Proxy != "" {
proxyURL, _ := url.Parse(h.config.Proxy)
transport := h.config.Transport
ctransport := transport.(*http.Transport).Clone()
ctransport.Proxy = http.ProxyURL(proxyURL)
h.config.Transport = ctransport
}
h.client = &http.Client{Transport: h.config.Transport}
return h return h
} }
// Send log message 'e' to http target. // Send log message 'e' to http target.
// If servers are offline messages are queued until queue is full.
// If Cancel has been called the message is ignored.
func (h *Target) Send(entry interface{}) error { func (h *Target) Send(entry interface{}) error {
if !h.online { if atomic.LoadInt32(&h.status) == statusClosed {
return nil return nil
} }
h.logChMu.RLock()
select { defer h.logChMu.RUnlock()
case <-h.doneCh: if h.logCh == nil {
// We are closing...
return nil return nil
default:
} }
select { select {
case <-h.doneCh:
case h.logCh <- entry: case h.logCh <- entry:
default: default:
// Drop messages until we are online.
if !h.IsOnline() {
return errors.New("log buffer full and remote offline")
}
nWorkers := atomic.LoadInt64(&h.workers) nWorkers := atomic.LoadInt64(&h.workers)
if nWorkers < maxWorkers { if nWorkers < maxWorkers {
// Only have one try to start at the same time. // Only have one try to start at the same time.
@ -284,11 +360,7 @@ func (h *Target) Send(entry interface{}) error {
go h.startHTTPLogger() go h.startHTTPLogger()
} }
} }
// Block to send h.logCh <- entry
select {
case <-h.doneCh:
case h.logCh <- entry:
}
return nil return nil
} }
// log channel is full, do not wait and return // log channel is full, do not wait and return
@ -301,10 +373,22 @@ func (h *Target) Send(entry interface{}) error {
return nil return nil
} }
// Cancel - cancels the target // Cancel - cancels the target.
// All queued messages are flushed and the function returns afterwards.
// All messages sent to the target after this function has been called will be dropped.
func (h *Target) Cancel() { func (h *Target) Cancel() {
close(h.doneCh) atomic.StoreInt32(&h.status, statusClosed)
// Set logch to nil and close it.
// This will block all Send operations,
// and finish the existing ones.
// All future ones will be discarded.
h.logChMu.Lock()
close(h.logCh) close(h.logCh)
h.logCh = nil
h.logChMu.Unlock()
// Wait for messages to be sent...
h.wg.Wait() h.wg.Wait()
} }