diff --git a/cmd/event-notification.go b/cmd/event-notification.go index 001c142fb..09e8c3237 100644 --- a/cmd/event-notification.go +++ b/cmd/event-notification.go @@ -19,7 +19,6 @@ package cmd import ( "context" - "errors" "fmt" "net/url" "strings" @@ -40,7 +39,6 @@ type EventNotifier struct { targetResCh chan event.TargetIDResult bucketRulesMap map[string]event.RulesMap bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap - eventsQueue chan eventArgs } // NewEventNotifier - creates new event notification object. @@ -51,7 +49,6 @@ func NewEventNotifier() *EventNotifier { targetResCh: make(chan event.TargetIDResult), bucketRulesMap: make(map[string]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), - eventsQueue: make(chan eventArgs, 10000), } } @@ -105,12 +102,6 @@ func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI Object return err } - go func() { - for e := range evnot.eventsQueue { - evnot.send(e) - } - }() - go func() { for res := range evnot.targetResCh { if res.Err != nil { @@ -210,16 +201,6 @@ func (evnot *EventNotifier) RemoveAllRemoteTargets() { // Send - sends the event to all registered notification targets func (evnot *EventNotifier) Send(args eventArgs) { - select { - case evnot.eventsQueue <- args: - default: - // A new goroutine is created for each notification job, eventsQueue is - // drained quickly and is not expected to be filled with any scenario. - logger.LogIf(context.Background(), errors.New("internal events queue unexpectedly full")) - } -} - -func (evnot *EventNotifier) send(args eventArgs) { evnot.RLock() targetIDSet := evnot.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) evnot.RUnlock() diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index cc2356a72..2762f48de 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2022 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -274,13 +274,12 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confi // Save - saves the events to the store which will be replayed when the amqp connection is active. func (target *AMQPTarget) Save(eventData event.Event) error { - if err := target.init(); err != nil { - return err - } - if target.store != nil { return target.store.Put(eventData) } + if err := target.init(); err != nil { + return err + } ch, confirms, err := target.channel() if err != nil { return err diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index b8f6eee38..5e91b1bb9 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -200,13 +200,12 @@ func (target *ElasticsearchTarget) isActive() (bool, error) { // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. func (target *ElasticsearchTarget) Save(eventData event.Event) error { - if err := target.init(); err != nil { - return err - } - if target.store != nil { return target.store.Put(eventData) } + if err := target.init(); err != nil { + return err + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index 9e24b8805..a8e128b1e 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -168,13 +168,12 @@ func (target *KafkaTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the Kafka connection is active. func (target *KafkaTarget) Save(eventData event.Event) error { - if err := target.init(); err != nil { - return err - } - if target.store != nil { return target.store.Put(eventData) } + if err := target.init(); err != nil { + return err + } _, err := target.isActive() if err != nil { return err diff --git a/internal/event/target/kafka_scram_client_contrib.go b/internal/event/target/kafka_scram_client_contrib.go index ba8d2ad8a..95ec4fb7d 100644 --- a/internal/event/target/kafka_scram_client_contrib.go +++ b/internal/event/target/kafka_scram_client_contrib.go @@ -1,5 +1,5 @@ /* - * MinIO Object Storage (c) 2021 MinIO, Inc. + * MinIO Object Storage (c) 2021-2023 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/internal/event/target/lazyinit.go b/internal/event/target/lazyinit.go index 695da1978..c7abf38d9 100644 --- a/internal/event/target/lazyinit.go +++ b/internal/event/target/lazyinit.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2022 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index c5f315c1d..f20b10adf 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -200,13 +200,12 @@ func (target *MQTTTarget) Send(eventKey string) error { // Save - saves the events to the store if queuestore is configured, which will // be replayed when the mqtt connection is active. func (target *MQTTTarget) Save(eventData event.Event) error { - if err := target.init(); err != nil { - return err - } - if target.store != nil { return target.store.Put(eventData) } + if err := target.init(); err != nil { + return err + } // Do not send if the connection is not active. _, err := target.isActive() diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index e5ca61515..ffae34cb9 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -196,13 +196,13 @@ func (target *MySQLTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the SQL connection is active. func (target *MySQLTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } if err := target.init(); err != nil { return err } - if target.store != nil { - return target.store.Put(eventData) - } _, err := target.isActive() if err != nil { return err diff --git a/internal/event/target/mysql_test.go b/internal/event/target/mysql_test.go index 4b08aa3d9..9a59b28d5 100644 --- a/internal/event/target/mysql_test.go +++ b/internal/event/target/mysql_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index ceeec6e32..e38fb7396 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -289,13 +289,14 @@ func (target *NATSTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the Nats connection is active. func (target *NATSTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if err := target.init(); err != nil { return err } - if target.store != nil { - return target.store.Put(eventData) - } _, err := target.isActive() if err != nil { return err diff --git a/internal/event/target/nats_contrib_test.go b/internal/event/target/nats_contrib_test.go index 5413d3767..a895d87be 100644 --- a/internal/event/target/nats_contrib_test.go +++ b/internal/event/target/nats_contrib_test.go @@ -1,5 +1,5 @@ /* - * MinIO Object Storage (c) 2021 MinIO, Inc. + * MinIO Object Storage (c) 2021-2023 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/internal/event/target/nats_tls_contrib_test.go b/internal/event/target/nats_tls_contrib_test.go index 4bbcc0bb0..43e918ff3 100644 --- a/internal/event/target/nats_tls_contrib_test.go +++ b/internal/event/target/nats_tls_contrib_test.go @@ -1,5 +1,5 @@ /* - * MinIO Object Storage (c) 2021 MinIO, Inc. + * MinIO Object Storage (c) 2021-2023 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index 82ce09ba0..d77707bae 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -145,13 +145,14 @@ func (target *NSQTarget) isActive() (bool, error) { // Save - saves the events to the store which will be replayed when the nsq connection is active. func (target *NSQTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if err := target.init(); err != nil { return err } - if target.store != nil { - return target.store.Put(eventData) - } _, err := target.isActive() if err != nil { return err diff --git a/internal/event/target/nsq_test.go b/internal/event/target/nsq_test.go index e28161bb9..fb5123206 100644 --- a/internal/event/target/nsq_test.go +++ b/internal/event/target/nsq_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index e91fc7b40..240444bab 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -188,13 +188,14 @@ func (target *PostgreSQLTarget) isActive() (bool, error) { // Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active. func (target *PostgreSQLTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if err := target.init(); err != nil { return err } - if target.store != nil { - return target.store.Put(eventData) - } _, err := target.isActive() if err != nil { return err diff --git a/internal/event/target/postgresql_test.go b/internal/event/target/postgresql_test.go index d624aa9db..0ec94f6f1 100644 --- a/internal/event/target/postgresql_test.go +++ b/internal/event/target/postgresql_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index 40f7ba256..b85b80222 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -168,13 +168,12 @@ func (target *RedisTarget) isActive() (bool, error) { // Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. func (target *RedisTarget) Save(eventData event.Event) error { - if err := target.init(); err != nil { - return err - } - if target.store != nil { return target.store.Put(eventData) } + if err := target.init(); err != nil { + return err + } _, err := target.isActive() if err != nil { return err diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index b7feb9ad1..fbce36c6a 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -161,13 +161,12 @@ func (target *WebhookTarget) isActive() (bool, error) { // Save - saves the events to the store if queuestore is configured, // which will be replayed when the webhook connection is active. func (target *WebhookTarget) Save(eventData event.Event) error { - if err := target.init(); err != nil { - return err - } - if target.store != nil { return target.store.Put(eventData) } + if err := target.init(); err != nil { + return err + } err := target.send(eventData) if err != nil { if xnet.IsNetworkOrHostDown(err, false) {