fix: store notification events immediately for persistent queues (#17112)

This commit is contained in:
Praveen raj Mani 2023-05-02 20:23:13 +05:30 committed by GitHub
parent ab34f0065c
commit 1704abaf6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 49 additions and 71 deletions

View File

@ -19,7 +19,6 @@ package cmd
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
@ -40,7 +39,6 @@ type EventNotifier struct {
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
eventsQueue chan eventArgs
}
// NewEventNotifier - creates new event notification object.
@ -51,7 +49,6 @@ func NewEventNotifier() *EventNotifier {
targetResCh: make(chan event.TargetIDResult),
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
eventsQueue: make(chan eventArgs, 10000),
}
}
@ -105,12 +102,6 @@ func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI Object
return err
}
go func() {
for e := range evnot.eventsQueue {
evnot.send(e)
}
}()
go func() {
for res := range evnot.targetResCh {
if res.Err != nil {
@ -210,16 +201,6 @@ func (evnot *EventNotifier) RemoveAllRemoteTargets() {
// Send - sends the event to all registered notification targets
func (evnot *EventNotifier) Send(args eventArgs) {
select {
case evnot.eventsQueue <- args:
default:
// A new goroutine is created for each notification job, eventsQueue is
// drained quickly and is not expected to be filled with any scenario.
logger.LogIf(context.Background(), errors.New("internal events queue unexpectedly full"))
}
}
func (evnot *EventNotifier) send(args eventArgs) {
evnot.RLock()
targetIDSet := evnot.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
evnot.RUnlock()

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2022 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -274,13 +274,12 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confi
// Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
ch, confirms, err := target.channel()
if err != nil {
return err

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -200,13 +200,12 @@ func (target *ElasticsearchTarget) isActive() (bool, error) {
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

View File

@ -168,13 +168,12 @@ func (target *KafkaTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err

View File

@ -1,5 +1,5 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
* MinIO Object Storage (c) 2021-2023 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2022 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -200,13 +200,12 @@ func (target *MQTTTarget) Send(eventKey string) error {
// Save - saves the events to the store if queuestore is configured, which will
// be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
// Do not send if the connection is not active.
_, err := target.isActive()

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -196,13 +196,13 @@ func (target *MySQLTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -289,13 +289,14 @@ func (target *NATSTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err

View File

@ -1,5 +1,5 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
* MinIO Object Storage (c) 2021-2023 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
* MinIO Object Storage (c) 2021-2023 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -145,13 +145,14 @@ func (target *NSQTarget) isActive() (bool, error) {
// Save - saves the events to the store which will be replayed when the nsq connection is active.
func (target *NSQTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -188,13 +188,14 @@ func (target *PostgreSQLTarget) isActive() (bool, error) {
// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.isActive()
if err != nil {
return err

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -168,13 +168,12 @@ func (target *RedisTarget) isActive() (bool, error) {
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
_, err := target.isActive()
if err != nil {
return err

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -161,13 +161,12 @@ func (target *WebhookTarget) isActive() (bool, error) {
// Save - saves the events to the store if queuestore is configured,
// which will be replayed when the webhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if err := target.init(); err != nil {
return err
}
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.init(); err != nil {
return err
}
err := target.send(eventData)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) {