Fix race get/set system/audit targest to avoid race errors (#19790)

This commit is contained in:
Anis Eleuch 2024-05-22 17:23:03 +01:00 committed by GitHub
parent 391baa1c9a
commit d0e0b81d8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -43,19 +43,45 @@ type Target interface {
Type() types.TargetType
}
type targetsList struct {
list []Target
mu *sync.RWMutex
}
func newTargetsList() *targetsList {
return &targetsList{
mu: &sync.RWMutex{},
}
}
func (tl targetsList) get() []Target {
tl.mu.RLock()
defer tl.mu.RUnlock()
return tl.list
}
func (tl *targetsList) add(t Target) {
tl.mu.Lock()
defer tl.mu.Unlock()
tl.list = append(tl.list, t)
}
func (tl *targetsList) set(tgts []Target) {
tl.mu.Lock()
defer tl.mu.Unlock()
tl.list = tgts
}
var (
// systemTargets is the set of enabled loggers.
// Must be immutable at all times.
// Can be swapped to another while holding swapMu
systemTargets = []Target{}
swapSystemMuRW sync.RWMutex
systemTargets = newTargetsList()
// auditTargets is the list of enabled audit loggers
// Must be immutable at all times.
// Can be swapped to another while holding swapMu
auditTargets = []Target{}
swapAuditMuRW sync.RWMutex
auditTargets = newTargetsList()
// This is always set represent /dev/console target
consoleTgt Target
@ -64,21 +90,13 @@ var (
// SystemTargets returns active targets.
// Returned slice may not be modified in any way.
func SystemTargets() []Target {
swapSystemMuRW.RLock()
defer swapSystemMuRW.RUnlock()
res := systemTargets
return res
return systemTargets.get()
}
// AuditTargets returns active audit targets.
// Returned slice may not be modified in any way.
func AuditTargets() []Target {
swapAuditMuRW.RLock()
defer swapAuditMuRW.RUnlock()
res := auditTargets
return res
return auditTargets.get()
}
// CurrentStats returns the current statistics.
@ -115,18 +133,13 @@ func AddSystemTarget(ctx context.Context, t Target) error {
return err
}
swapSystemMuRW.Lock()
defer swapSystemMuRW.Unlock()
if consoleTgt == nil {
if t.Type() == types.TargetConsole {
consoleTgt = t
}
}
updated := append(make([]Target, 0, len(systemTargets)+1), systemTargets...)
updated = append(updated, t)
systemTargets = updated
systemTargets.add(t)
return nil
}
@ -170,15 +183,15 @@ func cancelTargets(targets []Target) {
// UpdateHTTPWebhooks swaps system webhook targets with newly loaded ones from the cfg
func UpdateHTTPWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) {
return updateHTTPTargets(ctx, cfgs, &systemTargets)
return updateHTTPTargets(ctx, cfgs, systemTargets)
}
// UpdateAuditWebhooks swaps audit webhook targets with newly loaded ones from the cfg
func UpdateAuditWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) {
return updateHTTPTargets(ctx, cfgs, &auditTargets)
return updateHTTPTargets(ctx, cfgs, auditTargets)
}
func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetList *[]Target) (errs []error) {
func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetsList *targetsList) (errs []error) {
tgts := make([]*http.Target, 0)
newWebhooks := make([]Target, 0)
for _, cfg := range cfgs {
@ -192,7 +205,7 @@ func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetL
}
}
oldTargets, others := splitTargets(*targetList, types.TargetHTTP)
oldTargets, others := splitTargets(targetsList.get(), types.TargetHTTP)
newWebhooks = append(newWebhooks, others...)
for i := range oldTargets {
@ -219,9 +232,7 @@ func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetL
}
}
swapAuditMuRW.Lock()
*targetList = newWebhooks
swapAuditMuRW.Unlock()
targetsList.set(newWebhooks)
cancelTargets(oldTargets)
@ -232,12 +243,10 @@ func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetL
func UpdateAuditKafkaTargets(ctx context.Context, cfg Config) []error {
newKafkaTgts, errs := initKafkaTargets(ctx, cfg.AuditKafka)
swapAuditMuRW.Lock()
// Retain webhook targets
oldKafkaTgts, otherTgts := splitTargets(auditTargets, types.TargetKafka)
oldKafkaTgts, otherTgts := splitTargets(auditTargets.get(), types.TargetKafka)
newKafkaTgts = append(newKafkaTgts, otherTgts...)
auditTargets = newKafkaTgts
swapAuditMuRW.Unlock()
auditTargets.set(newKafkaTgts)
cancelTargets(oldKafkaTgts) // cancel running targets
return errs