609 lines
15 KiB
Go
Raw Normal View History

// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
"sync"
2022-11-10 19:20:21 +01:00
"sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger/target/types"
"github.com/minio/minio/internal/once"
"github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/v3/net"
"github.com/valyala/bytebufferpool"
)
2022-11-10 19:20:21 +01:00
const (
// Timeout for the webhook http call
webhookCallTimeout = 3 * time.Second
2022-11-10 19:20:21 +01:00
// maxWorkers is the maximum number of concurrent http loggers
2023-03-21 10:50:40 -07:00
maxWorkers = 16
// maxWorkers is the maximum number of concurrent batch http loggers
maxWorkersWithBatchEvents = 4
// the suffix for the configured queue dir where the logs will be persisted.
httpLoggerExtension = ".http.log"
2023-03-21 10:50:40 -07:00
)
const (
statusOffline = iota
statusOnline
statusClosed
2022-11-10 19:20:21 +01:00
)
var (
logChBuffers = make(map[string]chan interface{})
logChLock = sync.Mutex{}
)
// Config http logger target
type Config struct {
Enabled bool `json:"enabled"`
Name string `json:"name"`
UserAgent string `json:"userAgent"`
Endpoint *xnet.URL `json:"endpoint"`
AuthToken string `json:"authToken"`
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
BatchSize int `json:"batchSize"`
QueueSize int `json:"queueSize"`
QueueDir string `json:"queueDir"`
Proxy string `json:"string"`
Transport http.RoundTripper `json:"-"`
// Custom logger
LogOnceIf func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
}
// Target implements logger.Target and sends the json
// format of a log entry to the configured http endpoint.
// An internal buffer of logs is maintained but when the
// buffer is full, new logs are just ignored and an error
// is returned to the caller.
type Target struct {
totalMessages atomic.Int64
failedMessages atomic.Int64
status atomic.Int32
2022-11-10 19:20:21 +01:00
// Worker control
workers atomic.Int64
maxWorkers int64
// workerStartMu sync.Mutex
lastStarted time.Time
2022-11-10 19:20:21 +01:00
2023-03-21 10:50:40 -07:00
wg sync.WaitGroup
2023-03-21 10:50:40 -07:00
// Channel of log entries.
// Reading logCh must hold read lock on logChMu (to avoid read race)
// Sending a value on logCh must hold read lock on logChMu (to avoid closing)
logCh chan interface{}
logChMu sync.RWMutex
// If this webhook is being re-configured we will
// assign the new webhook target to this field.
// The Send() method will then re-direct entries
// to the new target when the current one
// has been set to status "statusClosed".
// Once the glogal target slice has been migrated
// the current target will stop receiving entries.
migrateTarget *Target
// Number of events per HTTP send to webhook target
// this is ideally useful only if your endpoint can
// support reading multiple events on a stream for example
// like : Splunk HTTP Event collector, if you are unsure
// set this to '1'.
batchSize int
payloadType string
// store to persist and replay the logs to the target
// to avoid missing events when the target is down.
store store.Store[interface{}]
storeCtxCancel context.CancelFunc
initQueueOnce once.Init
config Config
2022-11-10 19:20:21 +01:00
client *http.Client
}
// Name returns the name of the target
func (h *Target) Name() string {
return "minio-http-" + h.config.Name
}
// Type - returns type of the target
func (h *Target) Type() types.TargetType {
return types.TargetHTTP
}
// Endpoint returns the backend endpoint
func (h *Target) Endpoint() string {
return h.config.Endpoint.String()
}
func (h *Target) String() string {
return h.config.Name
}
// IsOnline returns true if the target is reachable using a cached value
func (h *Target) IsOnline(ctx context.Context) bool {
return h.status.Load() == statusOnline
}
2022-11-10 19:20:21 +01:00
// Stats returns the target statistics.
func (h *Target) Stats() types.TargetStats {
2023-03-21 10:50:40 -07:00
h.logChMu.RLock()
queueLength := len(h.logCh)
2023-03-21 10:50:40 -07:00
h.logChMu.RUnlock()
stats := types.TargetStats{
TotalMessages: h.totalMessages.Load(),
FailedMessages: h.failedMessages.Load(),
QueueLength: queueLength,
2022-11-10 19:20:21 +01:00
}
2023-03-21 10:50:40 -07:00
return stats
2022-11-10 19:20:21 +01:00
}
// AssignMigrateTarget assigns a target
// which will eventually replace the current target.
func (h *Target) AssignMigrateTarget(migrateTgt *Target) {
h.migrateTarget = migrateTgt
}
// Init validate and initialize the http target
func (h *Target) Init(ctx context.Context) (err error) {
if h.config.QueueDir != "" {
return h.initQueueOnce.DoWithContext(ctx, h.initDiskStore)
}
return h.initQueueOnce.DoWithContext(ctx, h.initMemoryStore)
}
func (h *Target) initDiskStore(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
h.storeCtxCancel = cancel
h.lastStarted = time.Now()
go h.startQueueProcessor(ctx, true)
queueStore := store.NewQueueStore[interface{}](
filepath.Join(h.config.QueueDir, h.Name()),
uint64(h.config.QueueSize),
httpLoggerExtension,
)
if err := queueStore.Open(); err != nil {
return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
}
h.store = queueStore
store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnceIf)
return nil
}
func (h *Target) initMemoryStore(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
h.storeCtxCancel = cancel
h.lastStarted = time.Now()
go h.startQueueProcessor(ctx, true)
return nil
}
func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) {
defer func() {
if err != nil {
h.status.Store(statusOffline)
} else {
h.status.Store(statusOnline)
}
}()
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.Endpoint(), bytes.NewReader(payload))
if err != nil {
return fmt.Errorf("invalid configuration for '%s'; %v", h.Endpoint(), err)
}
if payloadType != "" {
req.Header.Set(xhttp.ContentType, payloadType)
}
req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
resp, err := h.client.Do(req)
if err != nil {
return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err)
}
// Drain any response.
xhttp.DrainBody(resp.Body)
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent:
// accepted HTTP status codes.
return nil
case http.StatusForbidden:
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status)
default:
return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status)
}
}
func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
h.logChMu.RLock()
if h.logCh == nil {
h.logChMu.RUnlock()
return
}
h.logChMu.RUnlock()
h.workers.Add(1)
defer h.workers.Add(-1)
h.wg.Add(1)
defer h.wg.Done()
entries := make([]interface{}, 0)
name := h.Name()
defer func() {
// re-load the global buffer pointer
// in case it was modified by a new target.
logChLock.Lock()
currentGlobalBuffer, ok := logChBuffers[name]
logChLock.Unlock()
if !ok {
return
}
for _, v := range entries {
select {
case currentGlobalBuffer <- v:
default:
}
}
if mainWorker {
drain:
for {
select {
case v, ok := <-h.logCh:
if !ok {
break drain
}
currentGlobalBuffer <- v
default:
break drain
}
}
}
}()
var entry interface{}
var ok bool
var err error
lastBatchProcess := time.Now()
buf := bytebufferpool.Get()
enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
defer bytebufferpool.Put(buf)
isDirQueue := h.config.QueueDir != ""
// globalBuffer is always created or adjusted
// before this method is launched.
logChLock.Lock()
globalBuffer := logChBuffers[name]
logChLock.Unlock()
newTicker := time.NewTicker(time.Second)
isTick := false
for {
isTick = false
select {
case _ = <-newTicker.C:
isTick = true
case entry, _ = <-globalBuffer:
case entry, ok = <-h.logCh:
if !ok {
return
}
case <-ctx.Done():
return
}
if !isTick {
h.totalMessages.Add(1)
if !isDirQueue {
if err := enc.Encode(&entry); err != nil {
h.config.LogOnceIf(
ctx,
fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
h.Name(),
)
h.failedMessages.Add(1)
continue
}
}
entries = append(entries, entry)
}
if len(entries) != h.batchSize {
if len(h.logCh) > 0 || len(globalBuffer) > 0 || len(entries) == 0 {
continue
}
if h.batchSize > 1 {
// If we are doing batching, we should wait
// at least one second before sending.
// Even if there is nothing in the queue.
if time.Since(lastBatchProcess).Seconds() < 1 {
continue
}
}
}
lastBatchProcess = time.Now()
retry:
// If the channel reaches above half capacity
// we spawn more workers. The workers spawned
// from this main worker routine will exit
// once the channel drops below half capacity
// and when it's been at least 30 seconds since
// we launched a new worker.
if mainWorker && len(h.logCh) > cap(h.logCh)/2 {
nWorkers := h.workers.Load()
if nWorkers < h.maxWorkers {
if time.Since(h.lastStarted).Milliseconds() > 10 {
h.lastStarted = time.Now()
go h.startQueueProcessor(ctx, false)
}
}
}
if !isDirQueue {
err = h.send(ctx, buf.Bytes(), h.payloadType, webhookCallTimeout)
} else {
err = h.store.PutMultiple(entries)
}
if err != nil {
h.config.LogOnceIf(
context.Background(),
fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", name, err),
name,
)
if errors.Is(err, context.Canceled) {
return
}
time.Sleep(3 * time.Second)
goto retry
}
entries = make([]interface{}, 0)
if !isDirQueue {
buf.Reset()
}
if !mainWorker && len(h.logCh) < cap(h.logCh)/2 {
if time.Since(h.lastStarted).Seconds() > 30 {
return
}
}
}
}
// CreateOrAdjustGlobalBuffer will create or adjust the global log entry buffers
// which are used to migrate log entries between old and new targets.
func CreateOrAdjustGlobalBuffer(currentTgt *Target, newTgt *Target) {
logChLock.Lock()
defer logChLock.Unlock()
requiredCap := currentTgt.config.QueueSize + (currentTgt.config.BatchSize * int(currentTgt.maxWorkers))
currentCap := 0
name := newTgt.Name()
currentBuff, ok := logChBuffers[name]
if !ok {
logChBuffers[name] = make(chan interface{}, requiredCap)
currentCap = requiredCap
} else {
currentCap = cap(currentBuff)
requiredCap += len(currentBuff)
}
if requiredCap > currentCap {
logChBuffers[name] = make(chan interface{}, requiredCap)
if len(currentBuff) > 0 {
drain:
for {
select {
case v, ok := <-currentBuff:
if !ok {
break drain
}
logChBuffers[newTgt.Name()] <- v
default:
break drain
}
}
}
2023-03-21 10:50:40 -07:00
}
}
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(config Config) (*Target, error) {
maxWorkers := maxWorkers
if config.BatchSize > 100 {
maxWorkers = maxWorkersWithBatchEvents
} else if config.BatchSize <= 0 {
config.BatchSize = 1
}
h := &Target{
logCh: make(chan interface{}, config.QueueSize),
config: config,
batchSize: config.BatchSize,
maxWorkers: int64(maxWorkers),
}
h.status.Store(statusOffline)
if config.BatchSize > 1 {
h.payloadType = ""
} else {
h.payloadType = "application/json"
2023-03-21 10:50:40 -07:00
}
// If proxy available, set the same
if h.config.Proxy != "" {
proxyURL, _ := url.Parse(h.config.Proxy)
transport := h.config.Transport
ctransport := transport.(*http.Transport).Clone()
ctransport.Proxy = http.ProxyURL(proxyURL)
h.config.Transport = ctransport
}
2023-03-21 10:50:40 -07:00
h.client = &http.Client{Transport: h.config.Transport}
return h, nil
}
// SendFromStore - reads the log from store and sends it to webhook.
func (h *Target) SendFromStore(key store.Key) (err error) {
var eventData []byte
eventData, err = h.store.GetRaw(key.Name)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
h.failedMessages.Add(1)
defer func() {
if err == nil {
h.failedMessages.Add(-1)
}
}()
if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil {
return err
}
// Delete the event from store.
return h.store.Del(key.Name)
}
// Send the log message 'entry' to the http target.
// Messages are queued in the disk if the store is enabled
2023-03-21 10:50:40 -07:00
// If Cancel has been called the message is ignored.
func (h *Target) Send(ctx context.Context, entry interface{}) error {
if h.status.Load() == statusClosed {
if h.migrateTarget != nil {
return h.migrateTarget.Send(ctx, entry)
}
return nil
}
2023-03-21 10:50:40 -07:00
h.logChMu.RLock()
defer h.logChMu.RUnlock()
if h.logCh == nil {
// We are closing...
return nil
}
retry:
select {
case h.logCh <- entry:
h.totalMessages.Add(1)
case <-ctx.Done():
// return error only for context timedout.
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return ctx.Err()
}
return nil
default:
nWorkers := h.workers.Load()
if nWorkers < h.maxWorkers {
// Just sleep to avoid any possible hot-loops.
time.Sleep(50 * time.Millisecond)
goto retry
2022-11-10 19:20:21 +01:00
}
h.totalMessages.Add(1)
h.failedMessages.Add(1)
return errors.New("log buffer full")
}
return nil
}
2023-03-21 10:50:40 -07:00
// Cancel - cancels the target.
// All queued messages are flushed and the function returns afterwards.
// All messages sent to the target after this function has been called will be dropped.
func (h *Target) Cancel() {
h.status.Store(statusClosed)
h.storeCtxCancel()
2023-03-21 10:50:40 -07:00
// Wait for messages to be sent...
h.wg.Wait()
2023-03-21 10:50:40 -07:00
// Set logch to nil and close it.
// This will block all Send operations,
// and finish the existing ones.
// All future ones will be discarded.
h.logChMu.Lock()
xioutil.SafeClose(h.logCh)
2023-03-21 10:50:40 -07:00
h.logCh = nil
h.logChMu.Unlock()
}