Klaus Post 472c2d828c
Fix waitgroup add after wait on config reload (#14584)
Fix `panic: "POST /minio/peer/v21/signalservice?signal=2": sync: WaitGroup is reused before previous Wait has returned`

Log entries already on the channel would cause `logEntry` to increment the
 waitgroup when sending messages, after Cancel has been called.

Instead of tracking every single message, just check the send goroutine. Faster 
and safe, since it will not decrement until the channel is closed.

Regression from #14289
2022-03-19 09:15:45 -07:00

230 lines
6.3 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger/target/types"
)
// Timeout for the webhook http call
const webhookCallTimeout = 5 * time.Second
// Config http logger target
type Config struct {
Enabled bool `json:"enabled"`
Name string `json:"name"`
UserAgent string `json:"userAgent"`
Endpoint string `json:"endpoint"`
AuthToken string `json:"authToken"`
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
QueueSize int `json:"queueSize"`
Transport http.RoundTripper `json:"-"`
// Custom logger
LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"`
}
// Target implements logger.Target and sends the json
// format of a log entry to the configured http endpoint.
// An internal buffer of logs is maintained but when the
// buffer is full, new logs are just ignored and an error
// is returned to the caller.
type Target struct {
status int32
wg sync.WaitGroup
// Channel of log entries
logCh chan interface{}
config Config
}
// Endpoint returns the backend endpoint
func (h *Target) Endpoint() string {
return h.config.Endpoint
}
func (h *Target) String() string {
return h.config.Name
}
// Init validate and initialize the http target
func (h *Target) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`))
if err != nil {
return err
}
req.Header.Set(xhttp.ContentType, "application/json")
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
client := http.Client{Transport: h.config.Transport}
resp, err := client.Do(req)
if err != nil {
return err
}
// Drain any response.
xhttp.DrainBody(resp.Body)
if !acceptedResponseStatusCode(resp.StatusCode) {
switch resp.StatusCode {
case http.StatusForbidden:
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set",
h.config.Endpoint, resp.Status)
}
return fmt.Errorf("%s returned '%s', please check your endpoint configuration",
h.config.Endpoint, resp.Status)
}
h.status = 1
go h.startHTTPLogger()
return nil
}
// Accepted HTTP Status Codes
var acceptedStatusCodeMap = map[int]bool{http.StatusOK: true, http.StatusCreated: true, http.StatusAccepted: true, http.StatusNoContent: true}
func acceptedResponseStatusCode(code int) bool {
return acceptedStatusCodeMap[code]
}
func (h *Target) logEntry(entry interface{}) {
logJSON, err := json.Marshal(&entry)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.config.Endpoint, bytes.NewReader(logJSON))
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
cancel()
return
}
req.Header.Set(xhttp.ContentType, "application/json")
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
client := http.Client{Transport: h.config.Transport}
resp, err := client.Do(req)
cancel()
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
return
}
// Drain any response.
xhttp.DrainBody(resp.Body)
if !acceptedResponseStatusCode(resp.StatusCode) {
switch resp.StatusCode {
case http.StatusForbidden:
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint)
default:
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint)
}
}
}
func (h *Target) startHTTPLogger() {
// Create a routine which sends json logs received
// from an internal channel.
go func() {
h.wg.Add(1)
defer h.wg.Done()
for entry := range h.logCh {
h.logEntry(entry)
}
}()
}
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(config Config) *Target {
h := &Target{
logCh: make(chan interface{}, config.QueueSize),
config: config,
}
return h
}
// Send log message 'e' to http target.
func (h *Target) Send(entry interface{}, errKind string) error {
if atomic.LoadInt32(&h.status) == 0 {
// Channel was closed or used before init.
return nil
}
select {
case h.logCh <- entry:
default:
// log channel is full, do not wait and return
// an error immediately to the caller
return errors.New("log buffer full")
}
return nil
}
// Cancel - cancels the target
func (h *Target) Cancel() {
if atomic.CompareAndSwapInt32(&h.status, 1, 0) {
close(h.logCh)
}
h.wg.Wait()
}
// Type - returns type of the target
func (h *Target) Type() types.TargetType {
return types.TargetHTTP
}