logger lock should be more granular (#14901)

This PR simplifies few things by splitting
the locks between audit, logger targets to
avoid potential contention between them.

any failures inside audit/logger HTTP
targets must only log to console instead
of other targets to avoid cyclical dependency.

avoids unneeded atomic variables instead
uses RWLock to differentiate a more common
read phase v/s lock phase.
This commit is contained in:
Harshavardhana
2022-05-12 07:20:58 -07:00
committed by GitHub
parent 88dd83a365
commit 9341201132
6 changed files with 114 additions and 58 deletions

View File

@@ -24,7 +24,6 @@ import (
"io"
"net/http"
"strconv"
"sync/atomic"
"time"
"github.com/klauspost/compress/gzhttp"
@@ -158,13 +157,12 @@ func GetAuditEntry(ctx context.Context) *audit.Entry {
// AuditLog - logs audit logs to all audit targets.
func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqClaims map[string]interface{}, filterKeys ...string) {
// Fast exit if there is not audit target configured
if atomic.LoadInt32(&nAuditTargets) == 0 {
auditTgts := AuditTargets()
if len(auditTgts) == 0 {
return
}
var entry audit.Entry
if w != nil && r != nil {
reqInfo := GetReqInfo(ctx)
if reqInfo == nil {
@@ -234,7 +232,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl
}
// Send audit logs only to http targets.
for _, t := range AuditTargets() {
for _, t := range auditTgts {
if err := t.Send(entry, string(All)); err != nil {
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), All)
}

View File

@@ -278,12 +278,7 @@ func LogIf(ctx context.Context, err error, errKind ...interface{}) {
logIf(ctx, err, errKind...)
}
// logIf prints a detailed error message during
// the execution of the server.
func logIf(ctx context.Context, err error, errKind ...interface{}) {
if Disable {
return
}
func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entry {
logKind := string(Minio)
if len(errKind) > 0 {
if ek, ok := errKind[0].(Kind); ok {
@@ -357,8 +352,37 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
entry.Trace.Variables = make(map[string]interface{})
}
return entry
}
// consoleLogIf prints a detailed error message during
// the execution of the server.
func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) {
if Disable {
return
}
if consoleTgt != nil {
entry := errToEntry(ctx, err, errKind...)
consoleTgt.Send(entry, entry.LogKind)
}
}
// logIf prints a detailed error message during
// the execution of the server.
func logIf(ctx context.Context, err error, errKind ...interface{}) {
if Disable {
return
}
systemTgts := SystemTargets()
if len(systemTgts) == 0 {
return
}
entry := errToEntry(ctx, err, errKind...)
// Iterate over all logger targets to send the log entry
for _, t := range SystemTargets() {
for _, t := range systemTgts {
if err := t.Send(entry, entry.LogKind); err != nil {
if consoleTgt != nil {
entry.Trace.Message = fmt.Sprintf("event(%#v) was not sent to Logger target (%#v): %#v", entry, t, err)

View File

@@ -31,6 +31,27 @@ type logOnceType struct {
sync.Mutex
}
func (l *logOnceType) logOnceConsoleIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) {
if err == nil {
return
}
l.Lock()
shouldLog := false
prevErr := l.IDMap[id]
if prevErr == nil {
l.IDMap[id] = err
shouldLog = true
} else if prevErr.Error() != err.Error() {
l.IDMap[id] = err
shouldLog = true
}
l.Unlock()
if shouldLog {
consoleLogIf(ctx, err, errKind...)
}
}
// One log message per error.
func (l *logOnceType) logOnceIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) {
if err == nil {
@@ -91,3 +112,20 @@ func LogOnceIf(ctx context.Context, err error, id interface{}, errKind ...interf
logOnce.logOnceIf(ctx, err, id, errKind...)
}
// LogOnceConsoleIf - similar to LogOnceIf but exclusively only logs to console target.
func LogOnceConsoleIf(ctx context.Context, err error, id interface{}, errKind ...interface{}) {
if err == nil {
return
}
if errors.Is(err, context.Canceled) {
return
}
if err.Error() == http.ErrServerClosed.Error() || err.Error() == "disk not found" {
return
}
logOnce.logOnceConsoleIf(ctx, err, id, errKind...)
}

View File

@@ -19,7 +19,6 @@ package logger
import (
"sync"
"sync/atomic"
"github.com/minio/minio/internal/logger/target/http"
"github.com/minio/minio/internal/logger/target/kafka"
@@ -39,8 +38,8 @@ type Target interface {
}
var (
// swapMu must be held while reading slice info or swapping targets or auditTargets.
swapMu sync.Mutex
swapAuditMuRW sync.RWMutex
swapSystemMuRW sync.RWMutex
// systemTargets is the set of enabled loggers.
// Must be immutable at all times.
@@ -49,33 +48,25 @@ var (
// This is always set represent /dev/console target
consoleTgt Target
nTargets int32 // atomic count of len(targets)
)
// SystemTargets returns active targets.
// Returned slice may not be modified in any way.
func SystemTargets() []Target {
if atomic.LoadInt32(&nTargets) == 0 {
// Lock free if none...
return nil
}
swapMu.Lock()
swapSystemMuRW.RLock()
defer swapSystemMuRW.RUnlock()
res := systemTargets
swapMu.Unlock()
return res
}
// AuditTargets returns active audit targets.
// Returned slice may not be modified in any way.
func AuditTargets() []Target {
if atomic.LoadInt32(&nAuditTargets) == 0 {
// Lock free if none...
return nil
}
swapMu.Lock()
swapAuditMuRW.RLock()
defer swapAuditMuRW.RUnlock()
res := auditTargets
swapMu.Unlock()
return res
}
@@ -83,8 +74,7 @@ func AuditTargets() []Target {
// Must be immutable at all times.
// Can be swapped to another while holding swapMu
var (
auditTargets = []Target{}
nAuditTargets int32 // atomic count of len(auditTargets)
auditTargets = []Target{}
)
// AddSystemTarget adds a new logger target to the
@@ -93,7 +83,10 @@ func AddSystemTarget(t Target) error {
if err := t.Init(); err != nil {
return err
}
swapMu.Lock()
swapSystemMuRW.Lock()
defer swapSystemMuRW.Unlock()
if consoleTgt == nil {
if t.Type() == types.TargetConsole {
consoleTgt = t
@@ -102,8 +95,6 @@ func AddSystemTarget(t Target) error {
updated := append(make([]Target, 0, len(systemTargets)+1), systemTargets...)
updated = append(updated, t)
systemTargets = updated
atomic.StoreInt32(&nTargets, int32(len(updated)))
swapMu.Unlock()
return nil
}
@@ -142,24 +133,26 @@ func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error)
// UpdateSystemTargets swaps targets with newly loaded ones from the cfg
func UpdateSystemTargets(cfg Config) error {
updated, err := initSystemTargets(cfg.HTTP)
newTgts, err := initSystemTargets(cfg.HTTP)
if err != nil {
return err
}
swapMu.Lock()
swapSystemMuRW.Lock()
defer swapSystemMuRW.Unlock()
for _, tgt := range systemTargets {
// Preserve console target when dynamically updating
// other HTTP targets, console target is always present.
if tgt.Type() == types.TargetConsole {
updated = append(updated, tgt)
newTgts = append(newTgts, tgt)
break
}
}
atomic.StoreInt32(&nTargets, int32(len(updated)))
cancelAllSystemTargets() // cancel running targets
systemTargets = updated
swapMu.Unlock()
systemTargets = newTgts
return nil
}
@@ -183,18 +176,19 @@ func existingAuditTargets(t types.TargetType) []Target {
// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg
func UpdateAuditWebhookTargets(cfg Config) error {
updated, err := initSystemTargets(cfg.AuditWebhook)
newTgts, err := initSystemTargets(cfg.AuditWebhook)
if err != nil {
return err
}
// retain kafka targets
updated = append(existingAuditTargets(types.TargetKafka), updated...)
swapMu.Lock()
atomic.StoreInt32(&nAuditTargets, int32(len(updated)))
// retain kafka targets
swapAuditMuRW.Lock()
defer swapAuditMuRW.Unlock()
newTgts = append(existingAuditTargets(types.TargetKafka), newTgts...)
cancelAuditTargetType(types.TargetHTTP) // cancel running targets
auditTargets = updated
swapMu.Unlock()
auditTargets = newTgts
return nil
}
@@ -204,13 +198,14 @@ func UpdateAuditKafkaTargets(cfg Config) error {
if err != nil {
return err
}
swapAuditMuRW.Lock()
defer swapAuditMuRW.Unlock()
// retain HTTP targets
updated = append(existingAuditTargets(types.TargetHTTP), updated...)
swapMu.Lock()
atomic.StoreInt32(&nAuditTargets, int32(len(updated)))
cancelAuditTargetType(types.TargetKafka) // cancel running targets
auditTargets = updated
swapMu.Unlock()
return nil
}