diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index b5f3f4e9b..35697d050 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "net/http" "net/url" "strings" @@ -39,7 +40,13 @@ const ( webhookCallTimeout = 5 * time.Second // maxWorkers is the maximum number of concurrent operations. - maxWorkers = 8 + maxWorkers = 16 +) + +const ( + statusOffline = iota + statusOnline + statusClosed ) // Config http logger target @@ -67,20 +74,24 @@ type Config struct { type Target struct { totalMessages int64 failedMessages int64 + status int32 // Worker control workers int64 workerStartMu sync.Mutex lastStarted time.Time - wg sync.WaitGroup - doneCh chan struct{} + wg sync.WaitGroup - // Channel of log entries - logCh chan interface{} + // Channel of log entries. + // Reading logCh must hold read lock on logChMu (to avoid read race) + // Sending a value on logCh must hold read lock on logChMu (to avoid closing) + logCh chan interface{} + logChMu sync.RWMutex - // is the target online? - online bool + // If the first init fails, this starts a goroutine that + // will attempt to establish the connection. + revive sync.Once config Config client *http.Client @@ -97,70 +108,104 @@ func (h *Target) String() string { // IsOnline returns true if the initialization was successful func (h *Target) IsOnline() bool { - return h.online + return atomic.LoadInt32(&h.status) == statusOnline } // Stats returns the target statistics. func (h *Target) Stats() types.TargetStats { - return types.TargetStats{ + h.logChMu.RLock() + logCh := h.logCh + h.logChMu.RUnlock() + stats := types.TargetStats{ TotalMessages: atomic.LoadInt64(&h.totalMessages), FailedMessages: atomic.LoadInt64(&h.failedMessages), - QueueLength: len(h.logCh), + QueueLength: len(logCh), } + + return stats } // Init validate and initialize the http target -func (h *Target) Init() error { - ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`)) - if err != nil { - return err +func (h *Target) Init() (err error) { + switch atomic.LoadInt32(&h.status) { + case statusOnline: + return nil + case statusClosed: + return errors.New("target is closed") } - req.Header.Set(xhttp.ContentType, "application/json") + // This will check if we can reach the remote. + checkAlive := func() error { + ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout) + defer cancel() - // Set user-agent to indicate MinIO release - // version to the configured log endpoint - req.Header.Set("User-Agent", h.config.UserAgent) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`)) + if err != nil { + return err + } - if h.config.AuthToken != "" { - req.Header.Set("Authorization", h.config.AuthToken) - } + req.Header.Set(xhttp.ContentType, "application/json") - // If proxy available, set the same - if h.config.Proxy != "" { - proxyURL, _ := url.Parse(h.config.Proxy) - transport := h.config.Transport - ctransport := transport.(*http.Transport).Clone() - ctransport.Proxy = http.ProxyURL(proxyURL) - h.config.Transport = ctransport - } + // Set user-agent to indicate MinIO release + // version to the configured log endpoint + req.Header.Set("User-Agent", h.config.UserAgent) - client := http.Client{Transport: h.config.Transport} - resp, err := client.Do(req) - if err != nil { - return err - } - h.client = &client + if h.config.AuthToken != "" { + req.Header.Set("Authorization", h.config.AuthToken) + } - // Drain any response. - xhttp.DrainBody(resp.Body) + resp, err := h.client.Do(req) + if err != nil { + return err + } + // Drain any response. + xhttp.DrainBody(resp.Body) - if !acceptedResponseStatusCode(resp.StatusCode) { - if resp.StatusCode == http.StatusForbidden { - return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", + if !acceptedResponseStatusCode(resp.StatusCode) { + if resp.StatusCode == http.StatusForbidden { + return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", + h.config.Endpoint, resp.Status) + } + return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status) } - return fmt.Errorf("%s returned '%s', please check your endpoint configuration", - h.config.Endpoint, resp.Status) + return nil } - h.lastStarted = time.Now() - h.online = true - atomic.AddInt64(&h.workers, 1) - go h.startHTTPLogger() + err = checkAlive() + if err != nil { + // Start a goroutine that will continue to check if we can reach + h.revive.Do(func() { + go func() { + t := time.NewTicker(time.Second) + defer t.Stop() + for range t.C { + if atomic.LoadInt32(&h.status) != statusOffline { + return + } + if err := checkAlive(); err == nil { + // We are online. + if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { + h.workerStartMu.Lock() + h.lastStarted = time.Now() + h.workerStartMu.Unlock() + atomic.AddInt64(&h.workers, 1) + go h.startHTTPLogger() + } + return + } + } + }() + }) + return err + } + + if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { + h.workerStartMu.Lock() + h.lastStarted = time.Now() + h.workerStartMu.Unlock() + go h.startHTTPLogger() + } return nil } @@ -178,38 +223,56 @@ func (h *Target) logEntry(entry interface{}) { return } - ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) - defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, - h.config.Endpoint, bytes.NewReader(logJSON)) - if err != nil { - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) - atomic.AddInt64(&h.failedMessages, 1) - return - } - req.Header.Set(xhttp.ContentType, "application/json") - req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) - req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) + tries := 0 + for { + if tries > 0 { + if tries >= 10 || atomic.LoadInt32(&h.status) == statusClosed { + // Don't retry when closing... + return + } + // sleep = (tries+2) ^ 2 milliseconds. + sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond + if sleep > time.Second { + sleep = time.Second + } + time.Sleep(sleep) + } + tries++ + ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + h.config.Endpoint, bytes.NewReader(logJSON)) + if err != nil { + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) + atomic.AddInt64(&h.failedMessages, 1) + continue + } + req.Header.Set(xhttp.ContentType, "application/json") + req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) + req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) - // Set user-agent to indicate MinIO release - // version to the configured log endpoint - req.Header.Set("User-Agent", h.config.UserAgent) + // Set user-agent to indicate MinIO release + // version to the configured log endpoint + req.Header.Set("User-Agent", h.config.UserAgent) - if h.config.AuthToken != "" { - req.Header.Set("Authorization", h.config.AuthToken) - } + if h.config.AuthToken != "" { + req.Header.Set("Authorization", h.config.AuthToken) + } - resp, err := h.client.Do(req) - if err != nil { - atomic.AddInt64(&h.failedMessages, 1) - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) - return - } + resp, err := h.client.Do(req) + if err != nil { + atomic.AddInt64(&h.failedMessages, 1) + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) + continue + } - // Drain any response. - xhttp.DrainBody(resp.Body) + // Drain any response. + xhttp.DrainBody(resp.Body) - if !acceptedResponseStatusCode(resp.StatusCode) { + if acceptedResponseStatusCode(resp.StatusCode) { + return + } + // Log failure, retry atomic.AddInt64(&h.failedMessages, 1) switch resp.StatusCode { case http.StatusForbidden: @@ -221,25 +284,25 @@ func (h *Target) logEntry(entry interface{}) { } func (h *Target) startHTTPLogger() { - // Create a routine which sends json logs received - // from an internal channel. - h.wg.Add(1) - go func() { - defer func() { - h.wg.Done() - atomic.AddInt64(&h.workers, -1) - }() + h.logChMu.RLock() + logCh := h.logCh + if logCh != nil { + // We are not allowed to add when logCh is nil + h.wg.Add(1) + defer h.wg.Done() + } + h.logChMu.RUnlock() - for { - select { - case entry := <-h.logCh: - atomic.AddInt64(&h.totalMessages, 1) - h.logEntry(entry) - case <-h.doneCh: - return - } - } - }() + defer atomic.AddInt64(&h.workers, -1) + + if logCh == nil { + return + } + // Send messages until channel is closed. + for entry := range logCh { + atomic.AddInt64(&h.totalMessages, 1) + h.logEntry(entry) + } } // New initializes a new logger target which @@ -247,30 +310,43 @@ func (h *Target) startHTTPLogger() { func New(config Config) *Target { h := &Target{ logCh: make(chan interface{}, config.QueueSize), - doneCh: make(chan struct{}), config: config, - online: false, + status: statusOffline, } + // If proxy available, set the same + if h.config.Proxy != "" { + proxyURL, _ := url.Parse(h.config.Proxy) + transport := h.config.Transport + ctransport := transport.(*http.Transport).Clone() + ctransport.Proxy = http.ProxyURL(proxyURL) + h.config.Transport = ctransport + } + h.client = &http.Client{Transport: h.config.Transport} + return h } // Send log message 'e' to http target. +// If servers are offline messages are queued until queue is full. +// If Cancel has been called the message is ignored. func (h *Target) Send(entry interface{}) error { - if !h.online { + if atomic.LoadInt32(&h.status) == statusClosed { return nil } - - select { - case <-h.doneCh: + h.logChMu.RLock() + defer h.logChMu.RUnlock() + if h.logCh == nil { + // We are closing... return nil - default: } - select { - case <-h.doneCh: case h.logCh <- entry: default: + // Drop messages until we are online. + if !h.IsOnline() { + return errors.New("log buffer full and remote offline") + } nWorkers := atomic.LoadInt64(&h.workers) if nWorkers < maxWorkers { // Only have one try to start at the same time. @@ -284,11 +360,7 @@ func (h *Target) Send(entry interface{}) error { go h.startHTTPLogger() } } - // Block to send - select { - case <-h.doneCh: - case h.logCh <- entry: - } + h.logCh <- entry return nil } // log channel is full, do not wait and return @@ -301,10 +373,22 @@ func (h *Target) Send(entry interface{}) error { return nil } -// Cancel - cancels the target +// Cancel - cancels the target. +// All queued messages are flushed and the function returns afterwards. +// All messages sent to the target after this function has been called will be dropped. func (h *Target) Cancel() { - close(h.doneCh) + atomic.StoreInt32(&h.status, statusClosed) + + // Set logch to nil and close it. + // This will block all Send operations, + // and finish the existing ones. + // All future ones will be discarded. + h.logChMu.Lock() close(h.logCh) + h.logCh = nil + h.logChMu.Unlock() + + // Wait for messages to be sent... h.wg.Wait() }