Make logger webhook config dynamic (#14289)

It should not be required to restart the 
server after setting the logger webhook config.
This commit is contained in:
Shireesh Anjal 2022-02-18 00:41:15 +05:30 committed by GitHub
parent b29224f62f
commit 28f188e3ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 136 additions and 54 deletions

View File

@ -574,18 +574,6 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger/audit targets: %w", err))
}
for _, l := range loggerCfg.HTTP {
if l.Enabled {
l.LogOnce = logger.LogOnceIf
l.UserAgent = loggerUserAgent
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
// Enable http logging
if err = logger.AddTarget(http.New(l)); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize server logger HTTP target: %w", err))
}
}
}
for _, l := range loggerCfg.AuditWebhook {
if l.Enabled {
l.LogOnce = logger.LogOnceIf
@ -663,6 +651,25 @@ func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config
return fmt.Errorf("Unable to apply scanner config: %w", err)
}
// Logger webhook
loggerCfg, err := logger.LookupConfig(s)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
}
userAgent := getUserAgent(getMinioMode())
for n, l := range loggerCfg.HTTP {
if l.Enabled {
l.LogOnce = logger.LogOnceIf
l.UserAgent = userAgent
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
loggerCfg.HTTP[n] = l
}
}
err = logger.UpdateTargets(loggerCfg)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
}
// Apply configurations.
// We should not fail after this.
var setDriveCounts []int

View File

@ -150,6 +150,10 @@ func (sys *HTTPConsoleLoggerSys) Content() (logs []log.Entry) {
return
}
// Cancel - cancels the target
func (sys *HTTPConsoleLoggerSys) Cancel() {
}
// Send log message 'e' to console and publish to console
// log pubsub system
func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error {

View File

@ -55,6 +55,9 @@ func (t *testingLogger) Init() error {
return nil
}
func (t *testingLogger) Cancel() {
}
func (t *testingLogger) Send(entry interface{}, errKind string) error {
t.mu.Lock()
defer t.mu.Unlock()

View File

@ -166,6 +166,7 @@ var SubSystemsDynamic = set.CreateStringSet(
ScannerSubSys,
HealSubSys,
SubnetSubSys,
LoggerWebhookSubSys,
)
// SubSystemsSingleTargets - subsystems which only support single target.

View File

@ -25,6 +25,8 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
xhttp "github.com/minio/minio/internal/http"
@ -55,6 +57,9 @@ type Config struct {
// buffer is full, new logs are just ignored and an error
// is returned to the caller.
type Target struct {
status int32
wg sync.WaitGroup
// Channel of log entries
logCh chan interface{}
@ -109,6 +114,7 @@ func (h *Target) Init() error {
h.config.Endpoint, resp.Status)
}
h.status = 1
go h.startHTTPLogger()
return nil
}
@ -120,53 +126,60 @@ func acceptedResponseStatusCode(code int) bool {
return acceptedStatusCodeMap[code]
}
func (h *Target) logEntry(entry interface{}) {
h.wg.Add(1)
defer h.wg.Done()
logJSON, err := json.Marshal(&entry)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.config.Endpoint, bytes.NewReader(logJSON))
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
cancel()
return
}
req.Header.Set(xhttp.ContentType, "application/json")
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
client := http.Client{Transport: h.config.Transport}
resp, err := client.Do(req)
cancel()
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
return
}
// Drain any response.
xhttp.DrainBody(resp.Body)
if !acceptedResponseStatusCode(resp.StatusCode) {
switch resp.StatusCode {
case http.StatusForbidden:
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint)
default:
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint)
}
}
}
func (h *Target) startHTTPLogger() {
// Create a routine which sends json logs received
// from an internal channel.
go func() {
for entry := range h.logCh {
logJSON, err := json.Marshal(&entry)
if err != nil {
continue
}
ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.config.Endpoint, bytes.NewReader(logJSON))
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
cancel()
continue
}
req.Header.Set(xhttp.ContentType, "application/json")
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.config.UserAgent)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
client := http.Client{Transport: h.config.Transport}
resp, err := client.Do(req)
cancel()
if err != nil {
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
continue
}
// Drain any response.
xhttp.DrainBody(resp.Body)
if !acceptedResponseStatusCode(resp.StatusCode) {
switch resp.StatusCode {
case http.StatusForbidden:
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint)
default:
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint)
}
}
h.logEntry(entry)
}
}()
}
@ -184,6 +197,11 @@ func New(config Config) *Target {
// Send log message 'e' to http target.
func (h *Target) Send(entry interface{}, errKind string) error {
if atomic.LoadInt32(&h.status) == 0 {
// Channel was closed or used before init.
return nil
}
select {
case h.logCh <- entry:
default:
@ -194,3 +212,11 @@ func (h *Target) Send(entry interface{}, errKind string) error {
return nil
}
// Cancel - cancels the target
func (h *Target) Cancel() {
if atomic.CompareAndSwapInt32(&h.status, 1, 0) {
close(h.logCh)
}
h.wg.Wait()
}

View File

@ -197,6 +197,10 @@ func (h *Target) Init() error {
return nil
}
// Cancel - cancels the target
func (h *Target) Cancel() {
}
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(config Config) *Target {

View File

@ -20,6 +20,8 @@ package logger
import (
"sync"
"sync/atomic"
"github.com/minio/minio/internal/logger/target/http"
)
// Target is the entity that we will receive
@ -29,6 +31,7 @@ type Target interface {
String() string
Endpoint() string
Init() error
Cancel()
Send(entry interface{}, errKind string) error
}
@ -108,3 +111,37 @@ func AddTarget(t Target) error {
return nil
}
func cancelAllTargets() {
for _, tgt := range targets {
tgt.Cancel()
}
}
func initTargets(cfg Config) (tgts []Target, err error) {
for _, l := range cfg.HTTP {
if l.Enabled {
t := http.New(l)
if err = t.Init(); err != nil {
return tgts, err
}
tgts = append(tgts, t)
}
}
return tgts, err
}
// UpdateTargets swaps targets with newly loaded ones from the cfg
func UpdateTargets(cfg Config) error {
updated, err := initTargets(cfg)
if err != nil {
return err
}
swapMu.Lock()
atomic.StoreInt32(&nTargets, int32(len(updated)))
cancelAllTargets() // cancel running targets
targets = updated
swapMu.Unlock()
return nil
}