Cancel old logger/audit targets outside lock (#14927)

When configuring a new target, such as an audit target, the server waits
until all audit events are sent to the audit target before doing the
swap from the old to the new audit target. Therefore current S3 operations
can suffer from this since the audit swap lock will be held.

This behavior is unnecessary as the new audit target can enter in a
functional mode immediately and the old audit will just cancel itself
at its own pace.
This commit is contained in:
Anis Elleuch 2022-05-16 21:32:36 +01:00 committed by GitHub
parent d324c0a1c3
commit 05685863e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 38 additions and 50 deletions

View File

@ -99,12 +99,6 @@ func AddSystemTarget(t Target) error {
return nil return nil
} }
func cancelAllSystemTargets() {
for _, tgt := range systemTargets {
tgt.Cancel()
}
}
func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) { func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) {
for _, l := range cfgMap { for _, l := range cfgMap {
if l.Enabled { if l.Enabled {
@ -131,6 +125,26 @@ func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error)
return tgts, err return tgts, err
} }
// Split targets into two groups:
// group1 contains all targets of type t
// group2 contains the remaining targets
func splitTargets(targets []Target, t types.TargetType) (group1 []Target, group2 []Target) {
for _, target := range targets {
if target.Type() == t {
group1 = append(group1, target)
} else {
group2 = append(group2, target)
}
}
return
}
func cancelTargets(targets []Target) {
for _, target := range targets {
target.Cancel()
}
}
// UpdateSystemTargets swaps targets with newly loaded ones from the cfg // UpdateSystemTargets swaps targets with newly loaded ones from the cfg
func UpdateSystemTargets(cfg Config) error { func UpdateSystemTargets(cfg Config) error {
newTgts, err := initSystemTargets(cfg.HTTP) newTgts, err := initSystemTargets(cfg.HTTP)
@ -139,73 +153,47 @@ func UpdateSystemTargets(cfg Config) error {
} }
swapSystemMuRW.Lock() swapSystemMuRW.Lock()
defer swapSystemMuRW.Unlock() consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole)
newTgts = append(newTgts, consoleTargets...)
for _, tgt := range systemTargets {
// Preserve console target when dynamically updating
// other HTTP targets, console target is always present.
if tgt.Type() == types.TargetConsole {
newTgts = append(newTgts, tgt)
break
}
}
cancelAllSystemTargets() // cancel running targets
systemTargets = newTgts systemTargets = newTgts
swapSystemMuRW.Unlock()
cancelTargets(otherTargets) // cancel running targets
return nil return nil
} }
func cancelAuditTargetType(t types.TargetType) {
for _, tgt := range auditTargets {
if tgt.Type() == t {
tgt.Cancel()
}
}
}
func existingAuditTargets(t types.TargetType) []Target {
tgts := make([]Target, 0, len(auditTargets))
for _, tgt := range auditTargets {
if tgt.Type() == t {
tgts = append(tgts, tgt)
}
}
return tgts
}
// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg // UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg
func UpdateAuditWebhookTargets(cfg Config) error { func UpdateAuditWebhookTargets(cfg Config) error {
newTgts, err := initSystemTargets(cfg.AuditWebhook) newWebhookTgts, err := initSystemTargets(cfg.AuditWebhook)
if err != nil { if err != nil {
return err return err
} }
// retain kafka targets
swapAuditMuRW.Lock() swapAuditMuRW.Lock()
defer swapAuditMuRW.Unlock() // Retain kafka targets
oldWebhookTgts, otherTgts := splitTargets(auditTargets, types.TargetHTTP)
newTgts = append(existingAuditTargets(types.TargetKafka), newTgts...) newWebhookTgts = append(newWebhookTgts, otherTgts...)
cancelAuditTargetType(types.TargetHTTP) // cancel running targets auditTargets = newWebhookTgts
auditTargets = newTgts swapAuditMuRW.Unlock()
cancelTargets(oldWebhookTgts) // cancel running targets
return nil return nil
} }
// UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg // UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg
func UpdateAuditKafkaTargets(cfg Config) error { func UpdateAuditKafkaTargets(cfg Config) error {
updated, err := initKafkaTargets(cfg.AuditKafka) newKafkaTgts, err := initKafkaTargets(cfg.AuditKafka)
if err != nil { if err != nil {
return err return err
} }
swapAuditMuRW.Lock() swapAuditMuRW.Lock()
defer swapAuditMuRW.Unlock() // Retain webhook targets
oldKafkaTgts, otherTgts := splitTargets(auditTargets, types.TargetKafka)
// retain HTTP targets newKafkaTgts = append(newKafkaTgts, otherTgts...)
updated = append(existingAuditTargets(types.TargetHTTP), updated...) auditTargets = newKafkaTgts
cancelAuditTargetType(types.TargetKafka) // cancel running targets swapAuditMuRW.Unlock()
auditTargets = updated
cancelTargets(oldKafkaTgts) // cancel running targets
return nil return nil
} }